//! Buffer pool and per-socket buffer groups for io_uring buffer selection.
use crate::sync::Waiter;
use crate::sync::lock::Mutex;
use alloc::collections::VecDeque;
use alloc::vec::Vec;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicU16, AtomicUsize, Ordering};

/// Buffer group ID for io_uring.
///
/// In this file the value also serves as the index of the group's slot in the
/// pool's group table: IDs are produced by casting the slot index.
pub type BufferGroupId = u16;

/// Buffer ID within a group.
///
/// In this file the value also serves as the index of the buffer inside its
/// group's buffer list.
pub type BufferId = u16;

/// A single buffer in the pool.
///
/// This is a plain descriptor: it carries a raw pointer plus bookkeeping and
/// has no destructor of its own, so whoever allocated the memory is
/// responsible for freeing it.
pub struct Buffer {
    /// Start of the buffer's memory.
    pub ptr: NonNull<u8>,
    /// Size of the buffer in bytes.
    pub len: usize,
    /// Group this buffer belongs to.
    pub group_id: BufferGroupId,
    /// Identifier of this buffer within its group.
    pub buffer_id: BufferId,
}

// `NonNull<u8>` is not `Send`, so `Buffer` needs this manual impl before it
// can be moved to another thread.
// SAFETY: NOTE(review): this relies on the pointed-to memory not being tied to
// the allocating thread and on accesses being synchronized externally —
// confirm against the code that reads/writes through `ptr`.
unsafe impl Send for Buffer {}

/// Sizing parameters for the global buffer pool.
pub struct BufferPoolConfig {
    /// Size in bytes of every buffer (default: 64KB for jumbo frames).
    pub buffer_size: usize,
    /// How many buffers each group holds (default: 256).
    pub buffers_per_group: usize,
    /// Upper bound on the number of groups that can exist at once (default: 16).
    pub max_groups: usize,
}

impl Default for BufferPoolConfig {
    /// Returns the stock configuration: 64KB buffers, 256 buffers per group,
    /// and at most 16 groups.
    fn default() -> Self {
        const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;
        const DEFAULT_BUFFERS_PER_GROUP: usize = 256;
        const DEFAULT_MAX_GROUPS: usize = 16;

        Self {
            buffer_size: DEFAULT_BUFFER_SIZE,
            buffers_per_group: DEFAULT_BUFFERS_PER_GROUP,
            max_groups: DEFAULT_MAX_GROUPS,
        }
    }
}

/// A group of buffers for a specific socket/connection
struct BufferGroup {
    /// Every buffer owned by the group; a buffer's `BufferId` is its index here.
    buffers: Vec<Buffer>,
    /// IDs of buffers that are currently available. New and returned IDs are
    /// appended at the back; the front entry is the one `with_buffer` exposes.
    free_list: VecDeque<BufferId>,
    /// Usage counter: bumped by `mark_used`, lowered by `return_buffer`.
    allocated: AtomicUsize,
}

/// Global buffer pool shared across all sockets
pub struct BufferPool {
    /// Sizing parameters supplied at construction.
    config: BufferPoolConfig,
    /// Group table with one slot per possible group; `None` marks a vacant
    /// slot and the slot index is the group's ID.
    groups: Mutex<Vec<Option<BufferGroup>>>,
    /// Initialised to 0 in `new`; not otherwise referenced in the visible code.
    next_group_id: AtomicU16,
}

impl BufferPool {
    pub fn new(config: BufferPoolConfig) -> Self {
        let mut groups = Vec::with_capacity(config.max_groups);
        for _ in 0..config.max_groups {
            groups.push(None);
        }

        Self {
            config,
            groups: Mutex::new(groups),
            next_group_id: AtomicU16::new(0),
        }
    }

    /// Allocate a new buffer group for a socket
    pub async fn allocate_group(&self) -> Option<BufferGroupId> {
        let mut groups = self.groups.lock(Waiter::default()).await;

        // Find an empty slot
        for (idx, slot) in groups.iter_mut().enumerate() {
            if slot.is_none() {
                let group_id = idx as BufferGroupId;
                let mut buffers = Vec::with_capacity(self.config.buffers_per_group);
                let mut free_list = VecDeque::with_capacity(self.config.buffers_per_group);

                // Allocate all buffers for this group
                for buffer_id in 0..self.config.buffers_per_group {
                    let layout = alloc::alloc::Layout::from_size_align(
                        self.config.buffer_size,
                        64, // Cache line alignment
                    )
                    .unwrap();

                    let ptr = unsafe { alloc::alloc::alloc(layout) };
                    if ptr.is_null() {
                        return None;
                    }

                    buffers.push(Buffer {
                        ptr: NonNull::new(ptr).unwrap(),
                        len: self.config.buffer_size,
                        group_id,
                        buffer_id: buffer_id as BufferId,
                    });

                    free_list.push_back(buffer_id as BufferId);
                }

                *slot = Some(BufferGroup {
                    buffers,
                    free_list,
                    allocated: AtomicUsize::new(0),
                });

                return Some(group_id);
            }
        }

        None
    }

    /// Release a buffer group.
    ///
    /// Frees every buffer the group owns and marks its slot vacant. Does
    /// nothing when `group_id` is out of range or the slot is already vacant.
    pub async fn release_group(&self, group_id: BufferGroupId) {
        let mut groups = self.groups.lock(Waiter::default()).await;

        let slot = match groups.get_mut(group_id as usize) {
            Some(slot) => slot,
            None => return,
        };
        let group = match slot.as_ref() {
            Some(group) => group,
            None => return,
        };

        // Deallocation has to describe the block exactly as it was requested.
        let layout = alloc::alloc::Layout::from_size_align(self.config.buffer_size, 64).unwrap();

        // Deallocate all buffers
        for owned in group.buffers.iter() {
            unsafe {
                alloc::alloc::dealloc(owned.ptr.as_ptr(), layout);
            }
        }

        *slot = None;
    }

    /// Run `f` on the next free buffer of a group (for io_uring to use).
    ///
    /// The buffer at the front of the group's free list is only peeked: the
    /// free list is left untouched. `f` is invoked while the pool lock is held.
    ///
    /// Returns `Some` with the closure's result, or `None` when the group does
    /// not exist, its free list is empty, or the front ID has no matching
    /// buffer.
    pub async fn with_buffer<F, R>(&self, group_id: BufferGroupId, f: F) -> Option<R>
    where
        F: FnOnce(&Buffer) -> R,
    {
        let groups = self.groups.lock(Waiter::default()).await;

        let group = groups.get(group_id as usize)?.as_ref()?;
        let next_free = *group.free_list.front()?;

        group.buffers.get(next_free as usize).map(f)
    }

    /// Mark a buffer as used (when io_uring consumes it).
    ///
    /// Removes `buffer_id` from the group's free list and bumps the group's
    /// usage counter. The counter is only bumped when the ID was actually on
    /// the free list, so marking the same buffer twice (or passing an unknown
    /// ID) does not inflate it. Does nothing when the group does not exist.
    pub async fn mark_used(&self, group_id: BufferGroupId, buffer_id: BufferId) {
        let mut groups = self.groups.lock(Waiter::default()).await;

        if let Some(Some(group)) = groups.get_mut(group_id as usize) {
            let free_before = group.free_list.len();
            group.free_list.retain(|&id| id != buffer_id);

            // Mirror `return_buffer`, which only decrements on a real
            // state change.
            if group.free_list.len() != free_before {
                group.allocated.fetch_add(1, Ordering::Relaxed);
            }
        }
    }

    /// Return a buffer to the pool.
    ///
    /// Appends `buffer_id` to the back of the group's free list and lowers the
    /// group's usage counter. The call is ignored when the group does not
    /// exist, when `buffer_id` does not name a buffer of the group, or when
    /// the buffer is already free.
    pub async fn return_buffer(&self, group_id: BufferGroupId, buffer_id: BufferId) {
        let mut groups = self.groups.lock(Waiter::default()).await;

        if let Some(Some(group)) = groups.get_mut(group_id as usize) {
            // An ID outside the group must never reach the free list: it has
            // no backing buffer, and accepting it would also decrement the
            // counter for a buffer that was never handed out (wrapping at 0).
            let is_known = (buffer_id as usize) < group.buffers.len();

            if is_known && !group.free_list.contains(&buffer_id) {
                group.free_list.push_back(buffer_id);
                group.allocated.fetch_sub(1, Ordering::Relaxed);
            }
        }
    }

    /// Get all buffers in a group (for registering with io_uring).
    ///
    /// Returns one `(pointer, length)` pair per buffer, in buffer-ID order, or
    /// `None` when `group_id` does not name an existing group.
    pub async fn get_group_buffers(
        &self,
        group_id: BufferGroupId,
    ) -> Option<Vec<(NonNull<u8>, usize)>> {
        let groups = self.groups.lock(Waiter::default()).await;

        groups
            .get(group_id as usize)
            .and_then(|slot| slot.as_ref())
            .map(|group| {
                group
                    .buffers
                    .iter()
                    .map(|buffer| (buffer.ptr, buffer.len))
                    .collect()
            })
    }
}

/// Per-socket buffer allocation
pub struct SocketBufferAllocation {
    /// Shared pool the group was allocated from.
    pool: Arc<BufferPool>,
    /// Group reserved for this socket when the allocation was created.
    group_id: BufferGroupId,
    /// Buffers currently in use by this socket
    // NOTE(review): the visible code only ever removes IDs from this list
    // (in `release_buffer`); confirm where IDs are meant to be recorded.
    in_use: Mutex<Vec<BufferId>>,
}

use alloc::sync::Arc;

impl SocketBufferAllocation {
    /// Reserves a buffer group from `pool` for one socket.
    ///
    /// Returns `None` when the pool cannot provide a group.
    pub async fn new(pool: Arc<BufferPool>) -> Option<Self> {
        let group_id = pool.allocate_group().await?;
        let in_use = Mutex::new(Vec::new());

        Some(Self {
            in_use,
            group_id,
            pool,
        })
    }

    /// ID of the buffer group reserved for this socket.
    pub fn group_id(&self) -> BufferGroupId {
        self.group_id
    }

    /// Always returns `None`.
    ///
    /// Buffer selection is handled by io_uring itself; this type only tracks
    /// which buffers are in use.
    pub async fn acquire_buffer(&self) -> Option<BufferId> {
        None
    }

    /// Hands `buffer_id` back to the pool, then drops it from this socket's
    /// in-use list.
    pub async fn release_buffer(&self, buffer_id: BufferId) {
        self.pool.return_buffer(self.group_id, buffer_id).await;

        let mut tracked = self.in_use.lock(Waiter::default()).await;
        tracked.retain(|tracked_id| *tracked_id != buffer_id);
    }
}

impl Drop for SocketBufferAllocation {
    fn drop(&mut self) {
        // Schedule cleanup task
        let pool = self.pool.clone();
        let group_id = self.group_id;

        crate::runtime::spawn_local(async move {
            pool.release_group(group_id).await;
        });
    }
}