ruapc_bufpool/pool.rs
//! Buffer pool implementation with buddy memory allocation.
//!
//! This module provides the [`BufferPool`] and [`BufferPoolBuilder`] types for
//! managing a pool of memory buffers using the buddy allocation algorithm.
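//!
//! # Example
//!
//! A minimal usage sketch (it mirrors the [`BufferPool`] example below):
//!
//! ```rust
//! use ruapc_bufpool::BufferPoolBuilder;
//!
//! # fn main() -> std::io::Result<()> {
//! let pool = BufferPoolBuilder::new().build();
//! let buffer = pool.allocate(1024 * 1024)?; // at least 1 MiB
//! drop(buffer); // memory is returned to the pool
//! # Ok(())
//! # }
//! ```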

use std::collections::VecDeque;
use std::io::{Error, ErrorKind, Result};
use std::ptr::NonNull;
use std::sync::{Arc, Mutex};

use tokio::sync::oneshot;

use crate::allocator::{Allocator, DefaultAllocator};
use crate::buddy::{BuddyBlock, NUM_LEVELS, NodeState, SIZE_64MIB, size_to_level};
use crate::buffer::Buffer;
use crate::intrusive_list::IntrusiveList;

/// Default maximum memory limit (256 MiB).
const DEFAULT_MAX_MEMORY: usize = 256 * 1024 * 1024;

/// Builder for creating a [`BufferPool`] with custom configuration.
///
/// # Example
///
/// ```rust
/// use ruapc_bufpool::BufferPoolBuilder;
///
/// let pool = BufferPoolBuilder::new()
///     .max_memory(512 * 1024 * 1024) // 512 MiB
///     .build();
/// ```
pub struct BufferPoolBuilder {
    max_memory: usize,
    allocator: Box<dyn Allocator>,
}

impl Default for BufferPoolBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl BufferPoolBuilder {
    /// Creates a new builder with default settings.
    ///
    /// Default settings:
    /// - Max memory: 256 MiB
    /// - Allocator: [`DefaultAllocator`]
    #[must_use]
    pub fn new() -> Self {
        Self {
            max_memory: DEFAULT_MAX_MEMORY,
            allocator: Box::new(DefaultAllocator::new()),
        }
    }

    /// Sets the maximum memory limit for the pool.
    ///
    /// When this limit is reached:
    /// - Synchronous allocations will return an error
    /// - Asynchronous allocations will wait for memory to be freed
    ///
    /// The limit should be a multiple of 64 MiB for optimal utilization.
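    ///
    /// # Example
    ///
    /// A minimal configuration sketch:
    ///
    /// ```rust
    /// use ruapc_bufpool::BufferPoolBuilder;
    ///
    /// // Room for two 64 MiB blocks.
    /// let pool = BufferPoolBuilder::new()
    ///     .max_memory(128 * 1024 * 1024)
    ///     .build();
    /// assert_eq!(pool.max_memory(), 128 * 1024 * 1024);
    /// ```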
    #[must_use]
    pub const fn max_memory(mut self, max_memory: usize) -> Self {
        self.max_memory = max_memory;
        self
    }

    /// Sets a custom allocator for the pool.
    ///
    /// The allocator is used to allocate and deallocate the underlying 64 MiB
    /// memory blocks.
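    ///
    /// # Example
    ///
    /// A sketch that passes the default allocator explicitly; it is marked
    /// `ignore` because it assumes [`DefaultAllocator`] is publicly reachable
    /// under `ruapc_bufpool::allocator`:
    ///
    /// ```ignore
    /// use ruapc_bufpool::BufferPoolBuilder;
    /// use ruapc_bufpool::allocator::DefaultAllocator; // assumed public path
    ///
    /// let pool = BufferPoolBuilder::new()
    ///     .allocator(Box::new(DefaultAllocator::new()))
    ///     .build();
    /// ```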
    #[must_use]
    pub fn allocator(mut self, allocator: Box<dyn Allocator>) -> Self {
        self.allocator = allocator;
        self
    }

    /// Builds the buffer pool with the configured settings.
    ///
    /// Returns an `Arc<BufferPool>` to enable efficient sharing across threads
    /// and ensure buffers can maintain references to their owning pool.
    #[must_use]
    pub fn build(self) -> Arc<BufferPool> {
        let inner = PoolInner {
            allocator: self.allocator,
            max_memory: self.max_memory,
            allocated_memory: 0,
            blocks: Vec::new(),
            free_lists: std::array::from_fn(|_| IntrusiveList::new()),
            waiting_lists: std::array::from_fn(|_| VecDeque::new()),
            min_waiting_level: None,
        };

        Arc::new(BufferPool {
            inner: Mutex::new(inner),
        })
    }
}

/// A high-performance memory pool using buddy memory allocation.
///
/// The pool manages memory in 64 MiB blocks and supports allocation of buffers
/// at four size levels: 1 MiB, 4 MiB, 16 MiB, and 64 MiB.
///
/// # Thread Safety
///
/// The pool guards its internal state with a `std::sync::Mutex`, making it
/// safe to share across threads. [`async_allocate`](Self::async_allocate)
/// never holds the lock across an `.await`, so the pool is suitable for both
/// synchronous and asynchronous contexts.
///
/// # Buffer Lifetime
///
/// Each allocated [`Buffer`] holds an `Arc<BufferPool>` reference, ensuring the
/// pool remains valid for the lifetime of all buffers. When a buffer is dropped,
/// its memory is immediately returned to the pool in O(1) time (buddy merging
/// visits at most the four levels).
///
/// # Example
///
/// ```rust
/// use ruapc_bufpool::BufferPoolBuilder;
///
/// # fn main() -> std::io::Result<()> {
/// let pool = BufferPoolBuilder::new()
///     .max_memory(128 * 1024 * 1024)
///     .build();
///
/// // Allocate a 1 MiB buffer
/// let buffer = pool.allocate(1024 * 1024)?;
/// assert!(buffer.len() >= 1024 * 1024);
///
/// // Buffer is automatically returned when dropped
/// drop(buffer);
/// # Ok(())
/// # }
/// ```
pub struct BufferPool {
    inner: Mutex<PoolInner>,
}

impl BufferPool {
    /// Creates a new buffer pool with default settings.
    ///
    /// This is equivalent to `BufferPoolBuilder::new().build()`.
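    ///
    /// # Example
    ///
    /// A minimal sketch (assuming `BufferPool` is re-exported at the crate
    /// root alongside `BufferPoolBuilder`):
    ///
    /// ```rust
    /// use ruapc_bufpool::BufferPool;
    ///
    /// let pool = BufferPool::new();
    /// assert_eq!(pool.allocated_memory(), 0);
    /// ```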
    #[must_use]
    pub fn new() -> Arc<Self> {
        BufferPoolBuilder::new().build()
    }

    /// Returns a buffer to the pool.
    ///
    /// This is called automatically when a [`Buffer`] is dropped. Deallocation
    /// itself is O(1); buddy merging walks up at most `NUM_LEVELS` (4) levels,
    /// so its cost is bounded by a constant.
    ///
    /// # Arguments
    ///
    /// * `level` - The allocation level (0-3)
    /// * `index` - The index within the level
    /// * `block` - Pointer to the buddy block
    pub(crate) fn return_buffer(
        self: &Arc<Self>,
        level: usize,
        index: usize,
        block: NonNull<BuddyBlock>,
    ) {
        let mut inner = self.inner.lock().expect("BufferPool mutex poisoned");
        inner.deallocate_buffer(level, index, block);
    }

    /// Allocates a buffer of at least the specified size.
    ///
    /// The returned buffer may be larger than requested, rounded up to the
    /// nearest allocation level (1 MiB, 4 MiB, 16 MiB, or 64 MiB).
    ///
    /// This method blocks the current thread while waiting for the lock.
    /// For async contexts, use [`async_allocate`](Self::async_allocate) instead.
    ///
    /// # Arguments
    ///
    /// * `size` - The minimum size of the buffer in bytes.
    ///
    /// # Returns
    ///
    /// A [`Buffer`] of at least `size` bytes on success.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `size` is 0 or exceeds 64 MiB (`InvalidInput`)
    /// - The memory limit has been reached (`OutOfMemory`)
    /// - The underlying allocator fails (`OutOfMemory`)
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned (which only happens if another
    /// thread panicked while holding the lock).
    ///
    /// # Example
    ///
    /// ```rust
    /// use ruapc_bufpool::BufferPoolBuilder;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let pool = BufferPoolBuilder::new().build();
    /// let buffer = pool.allocate(2 * 1024 * 1024)?; // Request 2 MiB
    /// assert!(buffer.len() >= 4 * 1024 * 1024); // Gets 4 MiB (next level up)
    /// # Ok(())
    /// # }
    /// ```
    pub fn allocate(self: &Arc<Self>, size: usize) -> Result<Buffer> {
        let mut inner = self.inner.lock().expect("BufferPool mutex poisoned");
        inner.allocate_sync(size, Arc::clone(self))
    }

    /// Allocates a buffer asynchronously.
    ///
    /// This is the async version of [`allocate`](Self::allocate). If the memory
    /// limit has been reached, this method waits for other buffers to be freed
    /// rather than returning an error.
    ///
    /// # Arguments
    ///
    /// * `size` - The minimum size of the buffer in bytes.
    ///
    /// # Returns
    ///
    /// A [`Buffer`] of at least `size` bytes on success.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `size` is 0 or exceeds 64 MiB (`InvalidInput`)
    /// - The underlying allocator fails (`OutOfMemory`)
    ///
    /// Note: Unlike [`allocate`](Self::allocate), this method waits instead of
    /// returning `OutOfMemory` when the pool's memory limit is reached.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned (which only happens if another
    /// thread panicked while holding the lock).
    ///
    /// # Example
    ///
    /// ```rust
    /// use ruapc_bufpool::BufferPoolBuilder;
    ///
    /// # async fn example() -> std::io::Result<()> {
    /// let pool = BufferPoolBuilder::new().build();
    /// let buffer = pool.async_allocate(1024 * 1024).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn async_allocate(self: &Arc<Self>, size: usize) -> Result<Buffer> {
        let level = size_to_level(size).ok_or_else(|| {
            Error::new(
                ErrorKind::InvalidInput,
                format!("invalid size: {size} (must be 1-67108864 bytes)"),
            )
        })?;

        loop {
            let receiver = {
                let mut inner = self.inner.lock().expect("BufferPool mutex poisoned");

                // Try to allocate directly
                match inner.try_allocate(level, Arc::clone(self)) {
                    Ok(buffer) => return Ok(buffer),
                    Err(e) if e.kind() == ErrorKind::OutOfMemory => {
                        // Memory limit reached, wait for a buffer
                        let (sender, receiver) = oneshot::channel();
                        inner.waiting_lists[level].push_back(sender);
                        inner.update_min_waiting_level_on_add(level);
                        receiver
                    }
                    Err(e) => return Err(e),
                }
            };

            // Wait for a buffer to become available (the lock is released here)
            if let Ok(buffer) = receiver.await {
                return Ok(buffer);
            }
            // Sender was dropped without sending, retry allocation
        }
    }

    /// Returns the current amount of allocated memory in bytes.
    ///
    /// This includes all 64 MiB blocks that have been allocated from the
    /// underlying allocator, regardless of how much is currently in use.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
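    ///
    /// # Example
    ///
    /// A sketch of the block-granular accounting:
    ///
    /// ```rust
    /// use ruapc_bufpool::BufferPoolBuilder;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let pool = BufferPoolBuilder::new().build();
    /// assert_eq!(pool.allocated_memory(), 0);
    ///
    /// // Even a 1 MiB buffer pulls in a whole 64 MiB block.
    /// let _buffer = pool.allocate(1024 * 1024)?;
    /// assert_eq!(pool.allocated_memory(), 64 * 1024 * 1024);
    /// # Ok(())
    /// # }
    /// ```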
    pub fn allocated_memory(&self) -> usize {
        self.inner
            .lock()
            .expect("BufferPool mutex poisoned")
            .allocated_memory
    }

    /// Returns the maximum memory limit in bytes.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn max_memory(&self) -> usize {
        self.inner
            .lock()
            .expect("BufferPool mutex poisoned")
            .max_memory
    }

    /// Returns the number of free buffers at each level.
    ///
    /// The returned array contains counts for levels 0-3 (1 MiB to 64 MiB).
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
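    ///
    /// # Example
    ///
    /// A sketch of the split bookkeeping: carving a 1 MiB buffer out of a
    /// fresh 64 MiB block leaves three free siblings at each lower level:
    ///
    /// ```rust
    /// use ruapc_bufpool::BufferPoolBuilder;
    ///
    /// # fn main() -> std::io::Result<()> {
    /// let pool = BufferPoolBuilder::new().build();
    /// let _buffer = pool.allocate(1024 * 1024)?;
    /// assert_eq!(pool.free_counts(), [3, 3, 3, 0]);
    /// # Ok(())
    /// # }
    /// ```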
    pub fn free_counts(&self) -> [usize; NUM_LEVELS] {
        let inner = self.inner.lock().expect("BufferPool mutex poisoned");
        std::array::from_fn(|i| inner.free_lists[i].len())
    }
}

/// Sender type for waiting list entries.
type WaitingSender = oneshot::Sender<Buffer>;

/// Internal pool state protected by the mutex.
struct PoolInner {
    /// The allocator used for allocating 64 MiB blocks.
    allocator: Box<dyn Allocator>,

    /// Maximum allowed total memory.
    max_memory: usize,

    /// Current total allocated memory (from the underlying allocator).
    allocated_memory: usize,

    /// All allocated 64 MiB blocks.
    /// Note: We use Box to ensure stable addresses, as free list nodes
    /// reference the blocks via raw pointers.
    #[allow(clippy::vec_box)]
    blocks: Vec<Box<BuddyBlock>>,

    /// Free lists for each level (0 = 1MiB, 3 = 64MiB).
    free_lists: [IntrusiveList<crate::buddy::FreeNodeData>; NUM_LEVELS],

    /// Waiting lists for each level (async waiters).
    waiting_lists: [VecDeque<WaitingSender>; NUM_LEVELS],

    /// Minimum level that has waiting allocations.
    min_waiting_level: Option<usize>,
}

impl PoolInner {
    /// Synchronous allocation - returns error if memory limit reached.
    fn allocate_sync(&mut self, size: usize, pool: Arc<BufferPool>) -> Result<Buffer> {
        let level = size_to_level(size).ok_or_else(|| {
            Error::new(
                ErrorKind::InvalidInput,
                format!("invalid size: {size} (must be 1-67108864 bytes)"),
            )
        })?;

        self.try_allocate(level, pool)
    }

    /// Tries to allocate a buffer at the given level.
    ///
    /// Returns `OutOfMemory` error if the memory limit is reached.
    fn try_allocate(&mut self, level: usize, pool: Arc<BufferPool>) -> Result<Buffer> {
        // Try to find a free buffer at this level or split from a larger one
        if let Some(buffer) = self.try_allocate_from_free_lists(level, Arc::clone(&pool)) {
            return Ok(buffer);
        }

        // Need to allocate a new 64 MiB block
        self.allocate_new_block()?;

        // Try again - should succeed now
        self.try_allocate_from_free_lists(level, pool)
            .ok_or_else(|| Error::other("allocation failed unexpectedly"))
    }

    /// Tries to allocate from existing free lists.
    fn try_allocate_from_free_lists(
        &mut self,
        level: usize,
        pool: Arc<BufferPool>,
    ) -> Option<Buffer> {
        // Look for a free buffer at this level or higher
        for search_level in level..NUM_LEVELS {
            if !self.free_lists[search_level].is_empty() {
                // Found a free buffer, potentially need to split
                return Some(self.allocate_at_level(search_level, level, pool));
            }
        }
        None
    }

    /// Allocates a buffer by taking from `from_level` and splitting down to `target_level`.
    fn allocate_at_level(
        &mut self,
        from_level: usize,
        target_level: usize,
        pool: Arc<BufferPool>,
    ) -> Buffer {
        // Pop a free node from the source level
        let node = self.free_lists[from_level].pop_front().unwrap();

        // SAFETY: node is valid and was in the free list
        let (block, index_in_level) = unsafe {
            let node_ref = &*node.as_ptr();
            (node_ref.data.block, node_ref.data.index_in_level)
        };

        // SAFETY: block pointer is valid
        let block_ref = unsafe { &mut *block.as_ptr() };

        if from_level == target_level {
            // No splitting needed
            block_ref.set_state(from_level, index_in_level, NodeState::Allocated);

            let ptr = block_ref.get_memory_addr(from_level, index_in_level);

            // SAFETY: ptr is valid, block is valid, level/index are correct
            unsafe {
                Buffer::new(
                    NonNull::new(ptr).unwrap(),
                    from_level,
                    index_in_level,
                    block,
                    pool,
                )
            }
        } else {
            // Need to split
            self.split_and_allocate(block, from_level, index_in_level, target_level, pool)
        }
    }

    /// Splits a block from `from_level` down to `target_level` and allocates.
    fn split_and_allocate(
        &mut self,
        block: NonNull<BuddyBlock>,
        from_level: usize,
        from_index: usize,
        target_level: usize,
        pool: Arc<BufferPool>,
    ) -> Buffer {
        // SAFETY: block pointer is valid
        let block_ref = unsafe { &mut *block.as_ptr() };

        let mut current_level = from_level;
        let mut current_index = from_index;

        while current_level > target_level {
            // Mark current node as split
            block_ref.set_state(current_level, current_index, NodeState::Split);

            // Get first child
            let (child_level, first_child_index) =
                BuddyBlock::get_first_child(current_level, current_index).unwrap();

            // Add siblings 1-3 to free list (child 0 will be used or split further)
            for i in 1..4 {
                let child_index = first_child_index + i;
                block_ref.set_state(child_level, child_index, NodeState::Free);

                let node = block_ref.get_free_node_mut(child_level, child_index);
                // SAFETY: node is valid and not in any list
                unsafe {
                    self.free_lists[child_level].push_front(node);
                }
            }

            // Continue with child 0
            current_level = child_level;
            current_index = first_child_index;
        }

        // Allocate at target level
        block_ref.set_state(current_level, current_index, NodeState::Allocated);

        let ptr = block_ref.get_memory_addr(current_level, current_index);

        // SAFETY: ptr is valid, block is valid, level/index are correct
        unsafe {
            Buffer::new(
                NonNull::new(ptr).unwrap(),
                current_level,
                current_index,
                block,
                pool,
            )
        }
    }

    /// Allocates a new 64 MiB block from the underlying allocator.
    fn allocate_new_block(&mut self) -> Result<()> {
        // Check memory limit
        if self.allocated_memory + SIZE_64MIB > self.max_memory {
            return Err(Error::new(ErrorKind::OutOfMemory, "memory limit reached"));
        }

        // Allocate memory
        let memory = self.allocator.allocate(SIZE_64MIB)?;

        // Create buddy block
        // SAFETY: memory is valid and points to SIZE_64MIB bytes
        let block = unsafe { BuddyBlock::new(memory) };

        // Get pointer to block before adding to list
        // SAFETY: We need a raw pointer to push the free node. The block is heap-allocated.
        let block_ptr =
            NonNull::new(std::ptr::from_ref::<BuddyBlock>(block.as_ref()).cast_mut()).unwrap();

        // Add root node to level 3 free list
        // SAFETY: block is valid, and the level 3 node exists
        unsafe {
            let node = (*block_ptr.as_ptr()).get_free_node_mut(3, 0);
            self.free_lists[3].push_front(node);
        }

        // Add the block to our list. The pointer obtained above stays valid
        // because the Box keeps the block at a stable heap address.
        self.blocks.push(block);
        self.allocated_memory += SIZE_64MIB;

        Ok(())
    }

    /// Deallocates a buffer and returns it to the appropriate free list.
    ///
    /// Deallocation itself is O(1); buddy merging walks up at most
    /// `NUM_LEVELS` (4) levels, so its cost is bounded by a constant. If
    /// there are waiters for memory, one of them is notified to retry its
    /// allocation.
    pub(crate) fn deallocate_buffer(
        &mut self,
        level: usize,
        index: usize,
        block: NonNull<BuddyBlock>,
    ) {
        // SAFETY: block pointer is valid
        let block_ref = unsafe { &mut *block.as_ptr() };

        // Try to merge with siblings
        let (final_level, _final_index) = self.try_merge(block_ref, level, index);

        // Check if we should notify a waiting allocation
        if let Some(min_waiting) = self.min_waiting_level {
            // We can satisfy a waiter if the freed buffer's level is >= the waiting level
            if final_level >= min_waiting {
                // Find the smallest waiting level we can satisfy
                for wait_level in min_waiting..=final_level {
                    if self.waiting_lists[wait_level].pop_front().is_some() {
                        // Sender is dropped here, signaling the waiter to retry.
                        // The waiter will re-acquire the lock and allocate.
                        self.update_min_waiting_level_on_remove(wait_level);
                        return;
                    }
                }
            }
        }

        // No waiters to satisfy, buffer is already in free list from try_merge
    }

    /// Tries to merge freed buffer with its buddies, returns the final level and index.
    fn try_merge(&mut self, block: &mut BuddyBlock, level: usize, index: usize) -> (usize, usize) {
        let mut current_level = level;
        let mut current_index = index;

        loop {
            // Check if we can merge with siblings
            if current_level >= 3 {
                // At root level, just mark as free
                block.set_state(current_level, current_index, NodeState::Free);
                let node = block.get_free_node_mut(current_level, current_index);
                // SAFETY: node is valid and not in any list
                unsafe {
                    self.free_lists[current_level].push_front(node);
                }
                return (current_level, current_index);
            }

            // Get sibling indices
            let siblings = BuddyBlock::get_siblings(current_index);

            // Check if all siblings are free
            let all_free = siblings.iter().all(|&idx| {
                idx == current_index || block.get_state(current_level, idx) == NodeState::Free
            });

            if !all_free {
                // Cannot merge, just mark as free
                block.set_state(current_level, current_index, NodeState::Free);
                let node = block.get_free_node_mut(current_level, current_index);
                // SAFETY: node is valid and not in any list
                unsafe {
                    self.free_lists[current_level].push_front(node);
                }
                return (current_level, current_index);
            }

            // All siblings are free, remove them from free list and merge
            for &sibling_idx in &siblings {
                if sibling_idx != current_index {
                    // Remove from free list
                    let node = block.get_free_node_mut(current_level, sibling_idx);
                    // SAFETY: node is valid and in the free list
                    unsafe {
                        self.free_lists[current_level].remove(node);
                    }
                }
                // Mark as allocated (will be managed by parent)
                block.set_state(current_level, sibling_idx, NodeState::Allocated);
            }

            // Move up to parent
            let (parent_level, parent_index) =
                BuddyBlock::get_parent(current_level, current_index).unwrap();
            current_level = parent_level;
            current_index = parent_index;
        }
    }

    /// Updates the minimum waiting level after adding a waiter.
    #[allow(clippy::missing_const_for_fn)]
    fn update_min_waiting_level_on_add(&mut self, added_level: usize) {
        match self.min_waiting_level {
            None => self.min_waiting_level = Some(added_level),
            Some(min_level) if added_level < min_level => {
                self.min_waiting_level = Some(added_level);
            }
            _ => {}
        }
    }

    /// Updates the minimum waiting level after removing a waiter.
    fn update_min_waiting_level_on_remove(&mut self, removed_level: usize) {
        if self.min_waiting_level == Some(removed_level)
            && self.waiting_lists[removed_level].is_empty()
        {
            // Find the next non-empty waiting list
            self.min_waiting_level =
                (removed_level..NUM_LEVELS).find(|&level| !self.waiting_lists[level].is_empty());
        }
    }
}


impl Drop for PoolInner {
    fn drop(&mut self) {
        // Deallocate all 64 MiB blocks
        for block in &self.blocks {
            // SAFETY: memory was allocated by our allocator with SIZE_64MIB
            unsafe {
                self.allocator.deallocate(block.memory, SIZE_64MIB);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::buddy::{LEVEL_SIZES, SIZE_1MIB};

    #[test]
    fn test_pool_builder_defaults() {
        let pool = BufferPoolBuilder::new().build();
        // Just verify it builds without error
        drop(pool);
    }

    #[test]
    fn test_pool_builder_custom() {
        let pool = BufferPoolBuilder::new()
            .max_memory(128 * 1024 * 1024)
            .build();
        drop(pool);
    }

    #[test]
    fn test_simple_allocation() {
        let pool = BufferPoolBuilder::new().build();
        let buffer = pool.allocate(SIZE_1MIB).unwrap();
        assert_eq!(buffer.len(), SIZE_1MIB);
    }

    #[test]
    fn test_allocation_sizes() {
        let pool = BufferPoolBuilder::new().build();

        // 1 MiB
        let b1 = pool.allocate(1).unwrap();
        assert_eq!(b1.len(), SIZE_1MIB);

        // 4 MiB
        let b2 = pool.allocate(SIZE_1MIB + 1).unwrap();
        assert_eq!(b2.len(), LEVEL_SIZES[1]);

        // 16 MiB
        let b3 = pool.allocate(LEVEL_SIZES[1] + 1).unwrap();
        assert_eq!(b3.len(), LEVEL_SIZES[2]);

        // 64 MiB
        let b4 = pool.allocate(LEVEL_SIZES[2] + 1).unwrap();
        assert_eq!(b4.len(), LEVEL_SIZES[3]);
    }

    #[test]
    fn test_allocation_reuse() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Allocate and drop
        let addr1 = {
            let buffer = pool.allocate(SIZE_1MIB).unwrap();
            buffer.as_ptr() as usize
        };

        // Allocate again - should reuse
        let buffer2 = pool.allocate(SIZE_1MIB).unwrap();
        let addr2 = buffer2.as_ptr() as usize;

        // In buddy allocation, we might not get the exact same address, but
        // the memory limit prevents a second block, so the same 64MiB block
        // must be reused
        assert!(addr1 > 0);
        assert!(addr2 > 0);
    }

    #[test]
    fn test_memory_limit_sync() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Allocate the entire limit
        let _b1 = pool.allocate(SIZE_64MIB).unwrap();

        // This should fail
        let result = pool.allocate(SIZE_1MIB);
        assert!(result.is_err());
        assert_eq!(result.unwrap_err().kind(), ErrorKind::OutOfMemory);
    }

    #[test]
    fn test_invalid_size() {
        let pool = BufferPoolBuilder::new().build();

        // Size 0
        let result = pool.allocate(0);
        assert!(result.is_err());

        // Size too large
        let result = pool.allocate(SIZE_64MIB + 1);
        assert!(result.is_err());
    }

    #[test]
    fn test_buddy_splitting() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Allocate 64 1MiB buffers (should all fit in one 64MiB block)
        let buffers: Vec<_> = (0..64).map(|_| pool.allocate(SIZE_1MIB).unwrap()).collect();

        assert_eq!(buffers.len(), 64);

        // All buffers should lie within one 64MiB range of the first buffer
        // (allocation order within the block may vary)
        let base = buffers[0].as_ptr() as usize;
        for buf in &buffers {
            let addr = buf.as_ptr() as usize;
            assert!(addr >= base - SIZE_64MIB && addr < base + SIZE_64MIB);
            assert_eq!(buf.len(), SIZE_1MIB);
        }
    }

    #[test]
    fn test_buddy_merging() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Allocate 4 x 16MiB (fills entire 64MiB)
        let b1 = pool.allocate(LEVEL_SIZES[2]).unwrap();
        let b2 = pool.allocate(LEVEL_SIZES[2]).unwrap();
        let b3 = pool.allocate(LEVEL_SIZES[2]).unwrap();
        let b4 = pool.allocate(LEVEL_SIZES[2]).unwrap();

        // Should be at limit now
        assert!(pool.allocate(SIZE_1MIB).is_err());

        // Free all 4 (should merge back to one 64MiB)
        drop(b1);
        drop(b2);
        drop(b3);
        drop(b4);

        // Should be able to allocate 64MiB now
        let b5 = pool.allocate(SIZE_64MIB).unwrap();
        assert_eq!(b5.len(), SIZE_64MIB);
    }

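    // Merging only happens once *all four* buddies in a quad are free: with
    // two live 1MiB buffers from the same quad, dropping one must not
    // coalesce anything, while dropping the second merges the whole chain
    // back into one free 64MiB block. (A sketch relying only on the
    // documented free_counts() bookkeeping.)
    #[test]
    fn test_merge_blocked_by_live_sibling() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Splitting for the first 1MiB leaves [3, 3, 3, 0]; the second
        // allocation takes one of the free 1MiB siblings.
        let b1 = pool.allocate(SIZE_1MIB).unwrap();
        let b2 = pool.allocate(SIZE_1MIB).unwrap();
        assert_eq!(pool.free_counts(), [2, 3, 3, 0]);

        // b2 is still live, so b1 just returns to the free list.
        drop(b1);
        assert_eq!(pool.free_counts(), [3, 3, 3, 0]);

        // Dropping the last sibling merges all the way back to the root.
        drop(b2);
        assert_eq!(pool.free_counts(), [0, 0, 0, 1]);
    }
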
    #[tokio::test]
    async fn test_async_allocation() {
        let pool = BufferPoolBuilder::new().build();
        let buffer = pool.async_allocate(SIZE_1MIB).await.unwrap();
        assert_eq!(buffer.len(), SIZE_1MIB);
    }

    #[tokio::test]
    async fn test_async_allocation_waiting() {
        use std::time::Duration;
        use tokio::time::timeout;

        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        // Allocate the entire limit
        let buffer = pool.async_allocate(SIZE_64MIB).await.unwrap();

        let pool_clone = pool.clone();

        // Start an async allocation that will have to wait
        let handle = tokio::spawn(async move { pool_clone.async_allocate(SIZE_1MIB).await });

        // Give the async allocation time to start waiting
        tokio::time::sleep(Duration::from_millis(10)).await;

        // Free the buffer
        drop(buffer);

        // The async allocation should complete
        let result = timeout(Duration::from_secs(1), handle).await;
        assert!(result.is_ok());
        let buffer = result.unwrap().unwrap().unwrap();
        assert_eq!(buffer.len(), SIZE_1MIB);
    }

    #[tokio::test]
    async fn test_pool_stats() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB * 2).build();

        assert_eq!(pool.allocated_memory(), 0);
        assert_eq!(pool.max_memory(), SIZE_64MIB * 2);

        let _buffer = pool.async_allocate(SIZE_1MIB).await.unwrap();
        assert_eq!(pool.allocated_memory(), SIZE_64MIB);
    }

    #[test]
    fn test_buffer_write_read() {
        let pool = BufferPoolBuilder::new().build();
        let mut buffer = pool.allocate(SIZE_1MIB).unwrap();

        // Write pattern
        for (i, byte) in buffer.iter_mut().enumerate() {
            *byte = (i % 256) as u8;
        }

        // Read back and verify
        for (i, byte) in buffer.iter().enumerate() {
            assert_eq!(*byte, (i % 256) as u8);
        }
    }

    #[test]
    fn test_multiple_pools() {
        let pool1 = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();
        let pool2 = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        let b1 = pool1.allocate(SIZE_64MIB).unwrap();
        let b2 = pool2.allocate(SIZE_64MIB).unwrap();

        // Both should succeed as they're separate pools
        assert_eq!(b1.len(), SIZE_64MIB);
        assert_eq!(b2.len(), SIZE_64MIB);
    }

    #[test]
    fn test_clone_pool() {
        let pool = BufferPoolBuilder::new().max_memory(SIZE_64MIB).build();

        let pool_clone = pool.clone();

        let b1 = pool.allocate(SIZE_64MIB).unwrap();

        // Clone shares the same state, so this should fail
        let result = pool_clone.allocate(SIZE_1MIB);
        assert!(result.is_err());

        drop(b1);

        // Now it should work
        let _b2 = pool_clone.allocate(SIZE_1MIB).unwrap();
    }
}
906}