ruapc_bufpool/
pool.rs

1//! Buffer pool implementation with buddy memory allocation.
2//!
3//! This module provides the [`BufferPool`] and [`BufferPoolBuilder`] types for
4//! managing a pool of memory buffers using the buddy allocation algorithm.
5
6use std::collections::VecDeque;
7use std::io::{Error, ErrorKind, Result};
8use std::ptr::NonNull;
9use std::sync::{Arc, Mutex};
10
11use tokio::sync::oneshot;
12
13use crate::allocator::{Allocator, DefaultAllocator};
14use crate::buddy::{BuddyBlock, NUM_LEVELS, NodeState, SIZE_64MIB, size_to_level};
15use crate::buffer::Buffer;
16use crate::intrusive_list::IntrusiveList;
17
18/// Default maximum memory limit (256 MiB).
19const DEFAULT_MAX_MEMORY: usize = 256 * 1024 * 1024;
20
/// Builder for creating a [`BufferPool`] with custom configuration.
///
/// Options that are not set explicitly keep the defaults chosen by
/// [`BufferPoolBuilder::new`].
///
/// # Example
///
/// ```rust
/// use ruapc_bufpool::BufferPoolBuilder;
///
/// let pool = BufferPoolBuilder::new()
///     .max_memory(512 * 1024 * 1024) // 512 MiB
///     .build();
/// ```
pub struct BufferPoolBuilder {
    /// Upper bound, in bytes, on the memory the pool may take from the allocator.
    max_memory: usize,
    /// Allocator handed to the pool for obtaining its backing memory blocks.
    allocator: Box<dyn Allocator>,
}
36
37impl Default for BufferPoolBuilder {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl BufferPoolBuilder {
44    /// Creates a new builder with default settings.
45    ///
46    /// Default settings:
47    /// - Max memory: 256 MiB
48    /// - Allocator: [`DefaultAllocator`]
49    #[must_use]
50    pub fn new() -> Self {
51        Self {
52            max_memory: DEFAULT_MAX_MEMORY,
53            allocator: Box::new(DefaultAllocator::new()),
54        }
55    }
56
57    /// Sets the maximum memory limit for the pool.
58    ///
59    /// When this limit is reached:
60    /// - Synchronous allocations will return an error
61    /// - Asynchronous allocations will wait for memory to be freed
62    ///
63    /// The limit should be a multiple of 64 MiB for optimal utilization.
64    #[must_use]
65    pub const fn max_memory(mut self, max_memory: usize) -> Self {
66        self.max_memory = max_memory;
67        self
68    }
69
70    /// Sets a custom allocator for the pool.
71    ///
72    /// The allocator is used to allocate and deallocate the underlying 64 MiB
73    /// memory blocks.
74    #[must_use]
75    pub fn allocator(mut self, allocator: Box<dyn Allocator>) -> Self {
76        self.allocator = allocator;
77        self
78    }
79
80    /// Builds the buffer pool with the configured settings.
81    ///
82    /// Returns an `Arc<BufferPool>` to enable efficient sharing across threads
83    /// and ensure buffers can maintain references to their owning pool.
84    #[must_use]
85    pub fn build(self) -> Arc<BufferPool> {
86        let inner = PoolInner {
87            allocator: self.allocator,
88            max_memory: self.max_memory,
89            allocated_memory: 0,
90            blocks: Vec::new(),
91            free_lists: std::array::from_fn(|_| IntrusiveList::new()),
92            waiting_lists: std::array::from_fn(|_| VecDeque::new()),
93            min_waiting_level: None,
94        };
95
96        Arc::new(BufferPool {
97            inner: Mutex::new(inner),
98        })
99    }
100}
101
/// A high-performance memory pool using buddy memory allocation.
///
/// The pool manages memory in 64 MiB blocks and supports allocation of buffers
/// at four size levels: 1 MiB, 4 MiB, 16 MiB, and 64 MiB.
///
/// # Thread Safety
///
/// All pool state lives behind a `std::sync::Mutex`. Every method takes the
/// lock only for the duration of a short, non-blocking critical section, and
/// the async allocation path releases the lock before awaiting, so the pool
/// can be used from both synchronous and asynchronous contexts. If a thread
/// panics while holding the lock, the mutex is poisoned and later calls panic.
///
/// # Buffer Lifetime
///
/// Each allocated [`Buffer`] holds an `Arc<BufferPool>` reference, ensuring the
/// pool remains valid for the lifetime of all buffers. When a buffer is dropped,
/// its memory is immediately returned to the pool with O(1) complexity.
///
/// # Example
///
/// ```rust
/// use ruapc_bufpool::BufferPoolBuilder;
///
/// # fn main() -> std::io::Result<()> {
/// let pool = BufferPoolBuilder::new()
///     .max_memory(128 * 1024 * 1024)
///     .build();
///
/// // Allocate a 1 MiB buffer
/// let buffer = pool.allocate(1024 * 1024)?;
/// assert!(buffer.len() >= 1024 * 1024);
///
/// // Buffer is automatically returned when dropped
/// drop(buffer);
/// # Ok(())
/// # }
/// ```
pub struct BufferPool {
    /// All mutable pool state (blocks, free lists, waiters), guarded by one lock.
    inner: Mutex<PoolInner>,
}
140
141impl BufferPool {
142    /// Creates a new buffer pool with default settings.
143    ///
144    /// This is equivalent to `BufferPoolBuilder::new().build()`.
145    #[must_use]
146    pub fn new() -> Arc<Self> {
147        BufferPoolBuilder::new().build()
148    }
149
150    /// Returns a buffer to the pool.
151    ///
152    /// This is called automatically when a [`Buffer`] is dropped. The operation
153    /// is O(1) for deallocation and O(log n) for buddy merging (where n is the
154    /// number of levels, which is constant at 4).
155    ///
156    /// # Arguments
157    ///
158    /// * `level` - The allocation level (0-3)
159    /// * `index` - The index within the level
160    /// * `block` - Pointer to the buddy block
161    pub(crate) fn return_buffer(
162        self: &Arc<Self>,
163        level: usize,
164        index: usize,
165        block: NonNull<BuddyBlock>,
166    ) {
167        let mut inner = self.inner.lock().expect("BufferPool mutex poisoned");
168        inner.deallocate_buffer(level, index, block);
169    }
170
171    /// Allocates a buffer of at least the specified size.
172    ///
173    /// The returned buffer may be larger than requested, rounded up to the
174    /// nearest allocation level (1 MiB, 4 MiB, 16 MiB, or 64 MiB).
175    ///
176    /// This method blocks the current thread while waiting for the lock.
177    /// For async contexts, use [`async_allocate`](Self::async_allocate) instead.
178    ///
179    /// # Arguments
180    ///
181    /// * `size` - The minimum size of the buffer in bytes.
182    ///
183    /// # Returns
184    ///
185    /// Returns a [`Buffer`] on success, or an error if:
186    /// - The requested size is 0 or exceeds 64 MiB
187    /// - The memory limit has been reached
188    /// - Memory allocation fails
189    ///
190    /// # Errors
191    ///
192    /// Returns an error if:
193    /// - `size` is 0 or exceeds 64 MiB (`InvalidInput`)
194    /// - Memory limit has been reached (`OutOfMemory`)
195    /// - Underlying allocator fails (`OutOfMemory`)
196    ///
197    /// # Panics
198    ///
199    /// Panics if the internal mutex is poisoned (which only happens if another
200    /// thread panicked while holding the lock).
201    ///
202    /// # Example
203    ///
204    /// ```rust
205    /// use ruapc_bufpool::BufferPoolBuilder;
206    ///
207    /// # fn main() -> std::io::Result<()> {
208    /// let pool = BufferPoolBuilder::new().build();
209    /// let buffer = pool.allocate(2 * 1024 * 1024)?; // Request 2 MiB
210    /// assert!(buffer.len() >= 4 * 1024 * 1024); // Gets 4 MiB (next level up)
211    /// # Ok(())
212    /// # }
213    /// ```
214    pub fn allocate(self: &Arc<Self>, size: usize) -> Result<Buffer> {
215        let mut inner = self.inner.lock().expect("BufferPool mutex poisoned");
216        inner.allocate_sync(size, Arc::clone(self))
217    }
218
219    /// Allocates a buffer asynchronously.
220    ///
221    /// This is the async version of [`allocate`](Self::allocate). If the memory
222    /// limit has been reached, this method will wait for other buffers to be
223    /// freed rather than returning an error.
224    ///
225    /// # Arguments
226    ///
227    /// * `size` - The minimum size of the buffer in bytes.
228    ///
229    /// # Returns
230    ///
231    /// Returns a [`Buffer`] on success, or an error if:
232    /// - The requested size is 0 or exceeds 64 MiB
233    /// - Memory allocation fails
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if:
238    /// - `size` is 0 or exceeds 64 MiB (`InvalidInput`)
239    /// - Underlying allocator fails (`OutOfMemory`)
240    ///
241    /// Note: Unlike [`allocate`](Self::allocate), this method will wait instead
242    /// of returning `OutOfMemory` when the pool's memory limit is reached.
243    ///
244    /// # Panics
245    ///
246    /// Panics if the internal mutex is poisoned (which only happens if another
247    /// thread panicked while holding the lock).
248    ///
249    /// # Example
250    ///
251    /// ```rust
252    /// use ruapc_bufpool::BufferPoolBuilder;
253    ///
254    /// # async fn example() -> std::io::Result<()> {
255    /// let pool = BufferPoolBuilder::new().build();
256    /// let buffer = pool.async_allocate(1024 * 1024).await?;
257    /// # Ok(())
258    /// # }
259    /// ```
260    pub async fn async_allocate(self: &Arc<Self>, size: usize) -> Result<Buffer> {
261        let level = size_to_level(size).ok_or_else(|| {
262            Error::new(
263                ErrorKind::InvalidInput,
264                format!("invalid size: {size} (must be 1-67108864 bytes)"),
265            )
266        })?;
267
268        loop {
269            let receiver = {
270                let mut inner = self.inner.lock().expect("BufferPool mutex poisoned");
271
272                // Try to allocate directly
273                match inner.try_allocate(level, Arc::clone(self)) {
274                    Ok(buffer) => return Ok(buffer),
275                    Err(e) if e.kind() == ErrorKind::OutOfMemory => {
276                        // Memory limit reached, wait for a buffer
277                        let (sender, receiver) = oneshot::channel();
278                        inner.waiting_lists[level].push_back(sender);
279                        inner.update_min_waiting_level_on_add(level);
280                        receiver
281                    }
282                    Err(e) => return Err(e),
283                }
284            };
285
286            // Wait for buffer to be available (lock is released)
287            if let Ok(buffer) = receiver.await {
288                return Ok(buffer);
289            }
290            // Sender was dropped without sending, retry allocation
291        }
292    }
293
294    /// Returns the current amount of allocated memory in bytes.
295    ///
296    /// This includes all 64 MiB blocks that have been allocated from the
297    /// underlying allocator, regardless of how much is currently in use.
298    ///
299    /// # Panics
300    ///
301    /// Panics if the internal mutex is poisoned.
302    pub fn allocated_memory(&self) -> usize {
303        self.inner
304            .lock()
305            .expect("BufferPool mutex poisoned")
306            .allocated_memory
307    }
308
309    /// Returns the maximum memory limit in bytes.
310    ///
311    /// # Panics
312    ///
313    /// Panics if the internal mutex is poisoned.
314    pub fn max_memory(&self) -> usize {
315        self.inner
316            .lock()
317            .expect("BufferPool mutex poisoned")
318            .max_memory
319    }
320
321    /// Returns the number of free buffers at each level.
322    ///
323    /// The returned array contains counts for levels 0-3 (1 MiB to 64 MiB).
324    ///
325    /// # Panics
326    ///
327    /// Panics if the internal mutex is poisoned.
328    pub fn free_counts(&self) -> [usize; NUM_LEVELS] {
329        let inner = self.inner.lock().expect("BufferPool mutex poisoned");
330        std::array::from_fn(|i| inner.free_lists[i].len())
331    }
332}
333
/// Sender type for waiting list entries.
///
/// One sender is queued per pending `async_allocate` call. In this file no
/// [`Buffer`] is ever sent through it: `deallocate_buffer` wakes a waiter by
/// dropping the sender, and the woken waiter retries the allocation itself.
type WaitingSender = oneshot::Sender<Buffer>;
336
/// Internal pool state protected by the mutex.
///
/// Every field is read and written only while the `BufferPool` lock is held.
struct PoolInner {
    /// The allocator used for allocating 64 MiB blocks.
    allocator: Box<dyn Allocator>,

    /// Maximum allowed total memory, in bytes.
    max_memory: usize,

    /// Current total allocated memory (from the underlying allocator), in bytes.
    /// Grows by one block size each time a new block is added.
    allocated_memory: usize,

    /// All allocated 64 MiB blocks.
    /// Note: We use Box to ensure stable addresses, as free list nodes
    /// reference the blocks via raw pointers.
    #[allow(clippy::vec_box)]
    blocks: Vec<Box<BuddyBlock>>,

    /// Free lists for each level (0 = 1MiB, 3 = 64MiB).
    free_lists: [IntrusiveList<crate::buddy::FreeNodeData>; NUM_LEVELS],

    /// Waiting lists for each level (async waiters), served front to back.
    waiting_lists: [VecDeque<WaitingSender>; NUM_LEVELS],

    /// Minimum level that has waiting allocations, or `None` when no waiter
    /// has been recorded. Maintained by `update_min_waiting_level_on_add`
    /// and `update_min_waiting_level_on_remove`.
    min_waiting_level: Option<usize>,
}
363
364impl PoolInner {
365    /// Synchronous allocation - returns error if memory limit reached.
366    fn allocate_sync(&mut self, size: usize, pool: Arc<BufferPool>) -> Result<Buffer> {
367        let level = size_to_level(size).ok_or_else(|| {
368            Error::new(
369                ErrorKind::InvalidInput,
370                format!("invalid size: {size} (must be 1-67108864 bytes)"),
371            )
372        })?;
373
374        self.try_allocate(level, pool)
375    }
376
377    /// Tries to allocate a buffer at the given level.
378    ///
379    /// Returns `OutOfMemory` error if the memory limit is reached.
380    fn try_allocate(&mut self, level: usize, pool: Arc<BufferPool>) -> Result<Buffer> {
381        // Try to find a free buffer at this level or split from a larger one
382        if let Some(buffer) = self.try_allocate_from_free_lists(level, Arc::clone(&pool)) {
383            return Ok(buffer);
384        }
385
386        // Need to allocate a new 64 MiB block
387        self.allocate_new_block()?;
388
389        // Try again - should succeed now
390        self.try_allocate_from_free_lists(level, pool)
391            .ok_or_else(|| Error::other("allocation failed unexpectedly"))
392    }
393
394    /// Tries to allocate from existing free lists.
395    fn try_allocate_from_free_lists(
396        &mut self,
397        level: usize,
398        pool: Arc<BufferPool>,
399    ) -> Option<Buffer> {
400        // Look for a free buffer at this level or higher
401        for search_level in level..NUM_LEVELS {
402            if !self.free_lists[search_level].is_empty() {
403                // Found a free buffer, potentially need to split
404                return Some(self.allocate_at_level(search_level, level, pool));
405            }
406        }
407        None
408    }
409
410    /// Allocates a buffer by taking from `from_level` and splitting down to `target_level`.
411    fn allocate_at_level(
412        &mut self,
413        from_level: usize,
414        target_level: usize,
415        pool: Arc<BufferPool>,
416    ) -> Buffer {
417        // Pop a free node from the source level
418        let node = self.free_lists[from_level].pop_front().unwrap();
419
420        // SAFETY: node is valid and was in the free list
421        let (block, index_in_level) = unsafe {
422            let node_ref = &*node.as_ptr();
423            (node_ref.data.block, node_ref.data.index_in_level)
424        };
425
426        // SAFETY: block pointer is valid
427        let block_ref = unsafe { &mut *block.as_ptr() };
428
429        if from_level == target_level {
430            // No splitting needed
431            block_ref.set_state(from_level, index_in_level, NodeState::Allocated);
432
433            let ptr = block_ref.get_memory_addr(from_level, index_in_level);
434
435            // SAFETY: ptr is valid, block is valid, level/index are correct
436            unsafe {
437                Buffer::new(
438                    NonNull::new(ptr).unwrap(),
439                    from_level,
440                    index_in_level,
441                    block,
442                    pool,
443                )
444            }
445        } else {
446            // Need to split
447            self.split_and_allocate(block, from_level, index_in_level, target_level, pool)
448        }
449    }
450
    /// Splits a block from `from_level` down to `target_level` and allocates.
    ///
    /// Starting at node (`from_level`, `from_index`), which the caller has
    /// already removed from its free list, each step marks the current node
    /// as split, pushes children 1-3 onto the child level's free list, and
    /// descends into child 0. The node reached at `target_level` is marked
    /// allocated and wrapped in a [`Buffer`].
    ///
    /// Panics if `get_first_child` returns `None` for a node that still needs
    /// splitting, or if the computed memory address is null.
    fn split_and_allocate(
        &mut self,
        block: NonNull<BuddyBlock>,
        from_level: usize,
        from_index: usize,
        target_level: usize,
        pool: Arc<BufferPool>,
    ) -> Buffer {
        // SAFETY: block pointer is valid
        let block_ref = unsafe { &mut *block.as_ptr() };

        let mut current_level = from_level;
        let mut current_index = from_index;

        // Walk down one level per iteration until the requested level is reached.
        while current_level > target_level {
            // Mark current node as split
            block_ref.set_state(current_level, current_index, NodeState::Split);

            // Get first child
            let (child_level, first_child_index) =
                BuddyBlock::get_first_child(current_level, current_index).unwrap();

            // Add siblings 1-3 to free list (child 0 will be used or split further).
            // The range hard-codes four children per node.
            for i in 1..4 {
                let child_index = first_child_index + i;
                block_ref.set_state(child_level, child_index, NodeState::Free);

                let node = block_ref.get_free_node_mut(child_level, child_index);
                // SAFETY: node is valid and not in any list
                unsafe {
                    self.free_lists[child_level].push_front(node);
                }
            }

            // Continue with child 0
            current_level = child_level;
            current_index = first_child_index;
        }

        // Allocate at target level
        block_ref.set_state(current_level, current_index, NodeState::Allocated);

        let ptr = block_ref.get_memory_addr(current_level, current_index);

        // SAFETY: ptr is valid, block is valid, level/index are correct
        unsafe {
            Buffer::new(
                NonNull::new(ptr).unwrap(),
                current_level,
                current_index,
                block,
                pool,
            )
        }
    }
507
508    /// Allocates a new 64 MiB block from the underlying allocator.
509    fn allocate_new_block(&mut self) -> Result<()> {
510        // Check memory limit
511        if self.allocated_memory + SIZE_64MIB > self.max_memory {
512            return Err(Error::new(ErrorKind::OutOfMemory, "memory limit reached"));
513        }
514
515        // Allocate memory
516        let memory = self.allocator.allocate(SIZE_64MIB)?;
517
518        // Create buddy block
519        // SAFETY: memory is valid and points to SIZE_64MIB bytes
520        let block = unsafe { BuddyBlock::new(memory) };
521
522        // Get pointer to block before adding to list
523        // SAFETY: We need a raw pointer to push the free node. The block is heap-allocated.
524        let block_ptr =
525            NonNull::new(std::ptr::from_ref::<BuddyBlock>(block.as_ref()).cast_mut()).unwrap();
526
527        // Add root node to level 3 free list
528        // SAFETY: block is valid, and the level 3 node exists
529        unsafe {
530            let node = (*block_ptr.as_ptr()).get_free_node_mut(3, 0);
531            self.free_lists[3].push_front(node);
532        }
533
534        // Add block to our list
535        // Note: We need to be careful here. After the block is added to the list,
536        // the pointer we got earlier is still valid because Box is heap-allocated.
537        self.blocks.push(block);
538        self.allocated_memory += SIZE_64MIB;
539
540        Ok(())
541    }
542
543    /// Deallocates a buffer and returns it to the appropriate free list.
544    ///
545    /// This method performs O(1) deallocation and O(log n) buddy merging where
546    /// n is the number of levels (constant at 4). If there are waiters for
547    /// memory, they will be notified to retry allocation.
548    pub(crate) fn deallocate_buffer(
549        &mut self,
550        level: usize,
551        index: usize,
552        block: NonNull<BuddyBlock>,
553    ) {
554        // SAFETY: block pointer is valid
555        let block_ref = unsafe { &mut *block.as_ptr() };
556
557        // Try to merge with siblings
558        let (final_level, _final_index) = self.try_merge(block_ref, level, index);
559
560        // Check if we should notify a waiting allocation
561        if let Some(min_waiting) = self.min_waiting_level {
562            // Check if we can satisfy any waiting allocation
563            // We can satisfy if the freed buffer is >= the waiting level
564            if final_level >= min_waiting {
565                // Find the smallest waiting level we can satisfy
566                for wait_level in min_waiting..=final_level {
567                    if self.waiting_lists[wait_level].pop_front().is_some() {
568                        // Sender is dropped here, signaling the waiter to retry.
569                        // The waiter will re-acquire the lock and allocate.
570                        self.update_min_waiting_level_on_remove(wait_level);
571                        return;
572                    }
573                }
574            }
575        }
576
577        // No waiters to satisfy, buffer is already in free list from try_merge
578    }
579
580    /// Tries to merge freed buffer with its buddies, returns the final level and index.
581    fn try_merge(&mut self, block: &mut BuddyBlock, level: usize, index: usize) -> (usize, usize) {
582        let mut current_level = level;
583        let mut current_index = index;
584
585        loop {
586            // Check if we can merge with siblings
587            if current_level >= 3 {
588                // At root level, just mark as free
589                block.set_state(current_level, current_index, NodeState::Free);
590                let node = block.get_free_node_mut(current_level, current_index);
591                // SAFETY: node is valid and not in any list
592                unsafe {
593                    self.free_lists[current_level].push_front(node);
594                }
595                return (current_level, current_index);
596            }
597
598            // Get sibling indices
599            let siblings = BuddyBlock::get_siblings(current_index);
600
601            // Check if all siblings are free
602            let all_free = siblings.iter().all(|&idx| {
603                idx == current_index || block.get_state(current_level, idx) == NodeState::Free
604            });
605
606            if !all_free {
607                // Cannot merge, just mark as free
608                block.set_state(current_level, current_index, NodeState::Free);
609                let node = block.get_free_node_mut(current_level, current_index);
610                // SAFETY: node is valid and not in any list
611                unsafe {
612                    self.free_lists[current_level].push_front(node);
613                }
614                return (current_level, current_index);
615            }
616
617            // All siblings are free, remove them from free list and merge
618            for &sibling_idx in &siblings {
619                if sibling_idx != current_index {
620                    // Remove from free list
621                    let node = block.get_free_node_mut(current_level, sibling_idx);
622                    // SAFETY: node is valid and in the free list
623                    unsafe {
624                        self.free_lists[current_level].remove(node);
625                    }
626                }
627                // Mark as allocated (will be managed by parent)
628                block.set_state(current_level, sibling_idx, NodeState::Allocated);
629            }
630
631            // Move up to parent
632            let (parent_level, parent_index) =
633                BuddyBlock::get_parent(current_level, current_index).unwrap();
634            current_level = parent_level;
635            current_index = parent_index;
636        }
637    }
638
639    /// Updates the minimum waiting level after adding a waiter.
640    #[allow(clippy::missing_const_for_fn)]
641    fn update_min_waiting_level_on_add(&mut self, added_level: usize) {
642        match self.min_waiting_level {
643            None => self.min_waiting_level = Some(added_level),
644            Some(min_level) if added_level < min_level => {
645                self.min_waiting_level = Some(added_level);
646            }
647            _ => {}
648        }
649    }
650
651    /// Updates the minimum waiting level after removing a waiter.
652    fn update_min_waiting_level_on_remove(&mut self, removed_level: usize) {
653        if self.min_waiting_level == Some(removed_level)
654            && self.waiting_lists[removed_level].is_empty()
655        {
656            // Find the next non-empty waiting list
657            self.min_waiting_level =
658                (removed_level..NUM_LEVELS).find(|&level| !self.waiting_lists[level].is_empty());
659        }
660    }
661}
662
663impl Drop for PoolInner {
664    fn drop(&mut self) {
665        // Deallocate all 64 MiB blocks
666        for block in &self.blocks {
667            // SAFETY: memory was allocated by our allocator with SIZE_64MIB
668            unsafe {
669                self.allocator.deallocate(block.memory, SIZE_64MIB);
670            }
671        }
672    }
673}
674
#[cfg(test)]
mod tests {
    //! Unit tests for the pool: builder configuration, size rounding, memory
    //! limits, buddy splitting/merging and the async waiting path.

    use super::*;
    use crate::buddy::{LEVEL_SIZES, SIZE_1MIB};

    /// A default-configured pool can be built and dropped.
    #[test]
    fn test_pool_builder_defaults() {
        let pool = BufferPoolBuilder::new().build();
        // Just verify it builds without error
        drop(pool);
    }

    /// A custom memory limit is accepted by the builder.
    #[test]
    fn test_pool_builder_custom() {
        let pool = BufferPoolBuilder::new()
            .max_memory(128 * 1024 * 1024)
            .build();
        drop(pool);
    }

    /// A request for exactly 1 MiB yields a 1 MiB buffer.
    #[test]
    fn test_simple_allocation() {
        let pool = BufferPoolBuilder::new().build();
        let buffer = pool.allocate(SIZE_1MIB).unwrap();
        assert_eq!(buffer.len(), SIZE_1MIB);
    }

    /// Requests are rounded up to the next allocation level.
    #[test]
    fn test_allocation_sizes() {
        let pool = BufferPoolBuilder::new().build();

        // 1 MiB
        let b1 = pool.allocate(1).unwrap();
        assert_eq!(b1.len(), SIZE_1MIB);

        // 4 MiB
        let b2 = pool.allocate(SIZE_1MIB + 1).unwrap();
        assert_eq!(b2.len(), LEVEL_SIZES[1]);

        // 16 MiB
        let b3 = pool.allocate(LEVEL_SIZES[1] + 1).unwrap();
        assert_eq!(b3.len(), LEVEL_SIZES[2]);

        // 64 MiB
        let b4 = pool.allocate(LEVEL_SIZES[2] + 1).unwrap();
        assert_eq!(b4.len(), LEVEL_SIZES[3]);
    }

    /// Memory released by a dropped buffer is reused instead of growing the pool.
    #[test]
    fn test_allocation_reuse() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Allocate and drop
        let addr1 = {
            let buffer = pool.allocate(SIZE_1MIB).unwrap();
            buffer.as_ptr() as usize
        };

        // Allocate again - should reuse
        let buffer2 = pool.allocate(SIZE_1MIB).unwrap();
        let addr2 = buffer2.as_ptr() as usize;

        assert_ne!(addr1, 0);
        assert_ne!(addr2, 0);

        // In buddy allocation, we might not get the exact same address, but
        // the limit admits a single 64 MiB block, so both addresses must lie
        // inside that one block.
        assert!(addr1.abs_diff(addr2) < SIZE_64MIB);
        assert_eq!(pool.allocated_memory(), SIZE_64MIB);
    }

    /// The synchronous path reports `OutOfMemory` once the limit is exhausted.
    #[test]
    fn test_memory_limit_sync() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Allocate the entire limit
        let _b1 = pool.allocate(SIZE_64MIB).unwrap();

        // This should fail
        let result = pool.allocate(SIZE_1MIB);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), ErrorKind::OutOfMemory);
    }

    /// Sizes of 0 and above 64 MiB are rejected as invalid input.
    #[test]
    fn test_invalid_size() {
        let pool = BufferPoolBuilder::new().build();

        // Size 0
        let result = pool.allocate(0);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);

        // Size too large
        let result = pool.allocate(SIZE_64MIB + 1);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), ErrorKind::InvalidInput);
    }

    /// One 64 MiB block can be split into 64 distinct 1 MiB buffers.
    #[test]
    fn test_buddy_splitting() {
        use std::collections::HashSet;

        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Allocate 64 1MiB buffers (should all fit in one 64MiB block)
        let buffers: Vec<_> = (0..64).map(|_| pool.allocate(SIZE_1MIB).unwrap()).collect();

        assert_eq!(buffers.len(), 64);
        assert_eq!(pool.allocated_memory(), SIZE_64MIB);

        // All buffers should be distinct and lie within one 64MiB range.
        // `abs_diff` avoids underflow when `base` is a low address.
        let base = buffers[0].as_ptr() as usize;
        let mut seen = HashSet::new();
        for buf in &buffers {
            let addr = buf.as_ptr() as usize;
            assert!(addr.abs_diff(base) < SIZE_64MIB);
            assert_eq!(buf.len(), SIZE_1MIB);
            assert!(seen.insert(addr), "two live buffers share an address");
        }

        // The block is fully handed out, so the pool is at its limit.
        assert!(pool.allocate(SIZE_1MIB).is_err());
    }

    /// Freeing all four 16 MiB siblings merges them back into one 64 MiB buffer.
    #[test]
    fn test_buddy_merging() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Allocate 4 x 16MiB (fills entire 64MiB)
        let b1 = pool.allocate(LEVEL_SIZES[2]).unwrap();
        let b2 = pool.allocate(LEVEL_SIZES[2]).unwrap();
        let b3 = pool.allocate(LEVEL_SIZES[2]).unwrap();
        let b4 = pool.allocate(LEVEL_SIZES[2]).unwrap();

        // Should be at limit now
        assert!(pool.allocate(SIZE_1MIB).is_err());

        // Free all 4 (should merge back to one 64MiB)
        drop(b1);
        drop(b2);
        drop(b3);
        drop(b4);

        // Should be able to allocate 64MiB now
        let b5 = pool.allocate(SIZE_64MIB).unwrap();
        assert_eq!(b5.len(), SIZE_64MIB);
    }

    /// The async path allocates immediately when memory is available.
    #[tokio::test]
    async fn test_async_allocation() {
        let pool = BufferPoolBuilder::new().build();
        let buffer = pool.async_allocate(SIZE_1MIB).await.unwrap();
        assert_eq!(buffer.len(), SIZE_1MIB);
    }

    /// An async allocation blocked on the limit completes once memory is freed.
    #[tokio::test]
    async fn test_async_allocation_waiting() {
        use std::time::Duration;
        use tokio::time::timeout;

        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Allocate the entire limit
        let buffer = pool.async_allocate(SIZE_64MIB).await.unwrap();

        let pool_clone = pool.clone();

        // Start an async allocation that will have to wait
        let handle = tokio::spawn(async move { pool_clone.async_allocate(SIZE_1MIB).await });

        // Give the async allocation time to start waiting
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Free the buffer
        drop(buffer);

        // The async allocation should complete
        let result = timeout(Duration::from_secs(1), handle).await;
        assert!(result.is_ok());
        let buffer = result.unwrap().unwrap().unwrap();
        assert_eq!(buffer.len(), SIZE_1MIB);
    }

    /// The statistics accessors reflect the configured limit and block usage.
    #[tokio::test]
    async fn test_pool_stats() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB * 2).build();

        assert_eq!(pool.allocated_memory(), 0);
        assert_eq!(pool.max_memory(), SIZE_64MIB * 2);

        let _buffer = pool.async_allocate(SIZE_1MIB).await.unwrap();
        assert_eq!(pool.allocated_memory(), SIZE_64MIB);
    }

    /// Buffer contents can be written and read back unchanged.
    #[test]
    fn test_buffer_write_read() {
        let pool = BufferPoolBuilder::new().build();
        let mut buffer = pool.allocate(SIZE_1MIB).unwrap();

        // Write pattern (the cast is lossless: `i % 256` always fits in a u8)
        for (i, byte) in buffer.iter_mut().enumerate() {
            *byte = (i % 256) as u8;
        }

        // Read back and verify
        for (i, byte) in buffer.iter().enumerate() {
            assert_eq!(*byte, (i % 256) as u8);
        }
    }

    /// Separate pools have independent memory limits.
    #[test]
    fn test_multiple_pools() {
        let pool1 = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();
        let pool2 = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        let b1 = pool1.allocate(SIZE_64MIB).unwrap();
        let b2 = pool2.allocate(SIZE_64MIB).unwrap();

        // Both should succeed as they're separate pools
        assert_eq!(b1.len(), SIZE_64MIB);
        assert_eq!(b2.len(), SIZE_64MIB);
    }

    /// Cloning the `Arc` shares one pool state between both handles.
    #[test]
    fn test_clone_pool() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        let pool_clone = pool.clone();

        let b1 = pool.allocate(SIZE_64MIB).unwrap();

        // Clone shares the same state, so this should fail
        let result = pool_clone.allocate(SIZE_1MIB);
        assert!(result.is_err());

        drop(b1);

        // Now it should work
        let _b2 = pool_clone.allocate(SIZE_1MIB).unwrap();
    }
}