mod freelist;
mod metrics;

use std::{cell::UnsafeCell, marker::PhantomData, sync::TryLockError};

use crate::{
    circular_buffer::freelist::FreeList,
    sync::{
        atomic::{AtomicU8, AtomicUsize, Ordering},
        Mutex, MutexGuard,
    },
    utils::Backoff,
};

use std::convert::Into;

pub use self::metrics::CircularBufferMetrics;

pub(crate) const CB_ALLOC_META_SIZE: usize = std::mem::size_of::<AllocMeta>();

const BUFFER_ALIGNMENT: usize = 4096;

#[repr(u8)]
#[derive(Debug, PartialEq, Eq)]
enum MetaState {
    NotReady = 0,
    Ready = 1,
    Tombstone = 2,
    BeginTombStone = 3,
    FreeListed = 4,
    Evicted = 5,
}

impl From<MetaState> for u8 {
    fn from(state: MetaState) -> u8 {
        state as u8
    }
}

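/// Atomic wrapper around a [`MetaState`].
///
/// Most transitions are compare-exchange operations that panic on an
/// unexpected current state, following the lifecycle
/// `NotReady -> Ready -> BeginTombStone -> (FreeListed ->) Tombstone -> Evicted`,
/// with `BeginTombStone -> Ready` as the revert path.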
struct MetaRawState {
    state: AtomicU8,
}

impl MetaRawState {
    fn new_not_ready() -> Self {
        Self {
            state: AtomicU8::new(MetaState::NotReady.into()),
        }
    }

    fn new_tombstoned() -> Self {
        Self {
            state: AtomicU8::new(MetaState::Tombstone.into()),
        }
    }

    fn to_ready(&self) {
        match self.state.compare_exchange(
            MetaState::NotReady.into(),
            MetaState::Ready.into(),
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => {}
            Err(v) => {
                panic!(
                    "Meta state incorrect, expected {:?}, actual {}",
                    MetaState::NotReady,
                    v
                );
            }
        }
    }

    fn try_begin_tombstone(&self) -> bool {
        self.state
            .compare_exchange(
                MetaState::Ready.into(),
                MetaState::BeginTombStone.into(),
                Ordering::AcqRel,
                Ordering::Relaxed,
            )
            .is_ok()
    }

    fn is_tombstoned(&self) -> bool {
        self.load() == u8::from(MetaState::Tombstone)
    }

    fn is_evicted(&self) -> bool {
        self.load() == u8::from(MetaState::Evicted)
    }

    fn is_freelisted(&self) -> bool {
        self.load() == u8::from(MetaState::FreeListed)
    }

    fn load(&self) -> u8 {
        self.state.load(Ordering::Acquire)
    }

    fn state(&self) -> MetaState {
        let v = self.load();
        unsafe { std::mem::transmute(v) }
    }

    fn revert_to_ready(&self) {
        match self.state.compare_exchange(
            MetaState::BeginTombStone.into(),
            MetaState::Ready.into(),
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => {}
            Err(v) => {
                panic!(
                    "Meta state incorrect, expected {:?}, actual {}",
                    MetaState::BeginTombStone,
                    v
                );
            }
        }
    }

    fn free_list_to_tombstone(&self) {
        match self.state.compare_exchange(
            MetaState::FreeListed.into(),
            MetaState::Tombstone.into(),
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => {}
            Err(v) => {
                panic!(
                    "Meta state incorrect, expected {:?}, actual {}",
                    MetaState::FreeListed,
                    v
                );
            }
        }
    }

    fn to_freelist(&self) {
        match self.state.compare_exchange(
            MetaState::BeginTombStone.into(),
            MetaState::FreeListed.into(),
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => {}
            Err(v) => {
                panic!(
                    "Meta state incorrect, expected {:?}, actual {}",
                    MetaState::BeginTombStone,
                    v
                );
            }
        }
    }

    fn to_tombstone(&self) {
        match self.state.compare_exchange(
            MetaState::BeginTombStone.into(),
            MetaState::Tombstone.into(),
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => {}
            Err(v) => {
                panic!(
                    "Meta state incorrect, expected {:?}, actual {}",
                    MetaState::BeginTombStone,
                    v
                );
            }
        }
    }

    fn tombstone_to_evicted(&self) {
        match self.state.compare_exchange(
            MetaState::Tombstone.into(),
            MetaState::Evicted.into(),
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => {}
            Err(v) => {
                panic!(
                    "Meta state incorrect, expected {:?}, actual {}",
                    MetaState::Tombstone,
                    v
                );
            }
        }
    }
}

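// Per-allocation header written immediately before the user data.
// `size` is the usable payload size in bytes (excluding this header) and
// `states` tracks the allocation's lifecycle. The shuttle test build pads the
// header to 256 bytes; the regular build keeps it at exactly 8 bytes.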
#[cfg(all(feature = "shuttle", test))]
#[repr(C, align(256))]
struct AllocMeta {
    pub(crate) size: u32,

    states: MetaRawState,
}

#[cfg(not(all(feature = "shuttle", test)))]
#[repr(C, align(8))]
struct AllocMeta {
    pub(crate) size: u32,

    states: MetaRawState,
}

impl AllocMeta {
    fn new(size: u32, tombstone: bool) -> Self {
        #[cfg(not(feature = "shuttle"))]
        debug_assert_eq!(std::mem::size_of::<AllocMeta>(), 8);

        let states = if tombstone {
            MetaRawState::new_tombstoned()
        } else {
            MetaRawState::new_not_ready()
        };

        Self { size, states }
    }

    fn data_ptr(&self) -> *mut u8 {
        unsafe { (self as *const Self as *mut u8).add(std::mem::size_of::<Self>()) }
    }

    fn state(&self) -> MetaState {
        self.states.state()
    }
}

fn align_up(addr: usize, align: usize) -> usize {
    (addr + align - 1) & !(align - 1)
}

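/// RAII handle returned by [`CircularBuffer::alloc`].
///
/// While the handle is alive the allocation stays in the `NotReady` state;
/// dropping it publishes the allocation as `Ready`.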
pub struct CircularBufferPtr<'a> {
    ptr: *mut u8,
    _pt: PhantomData<&'a ()>,
}

impl CircularBufferPtr<'_> {
    fn new(ptr: *mut u8) -> Self {
        Self {
            ptr,
            _pt: PhantomData,
        }
    }

    pub fn as_ptr(&self) -> *mut u8 {
        self.ptr
    }
}

impl Drop for CircularBufferPtr<'_> {
    fn drop(&mut self) {
        let meta = CircularBuffer::get_meta_from_data_ptr(self.ptr);
        meta.states.to_ready();
    }
}

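/// Exclusive handle over an allocation that is being deallocated or evicted.
///
/// The handle is created by [`CircularBuffer::acquire_exclusive_dealloc_handle`]
/// (state `Ready` -> `BeginTombStone`). Dropping it reverts the allocation to
/// `Ready`; passing it to [`CircularBuffer::dealloc`] consumes it instead.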
#[derive(Debug)]
pub struct TombstoneHandle {
    pub(crate) ptr: *mut u8,
}

impl TombstoneHandle {
    fn into_ptr(self) -> *mut u8 {
        let ptr = self.ptr;
        std::mem::forget(self);
        ptr
    }

    pub fn as_ptr(&self) -> *mut u8 {
        self.ptr
    }
}

impl Drop for TombstoneHandle {
    fn drop(&mut self) {
        let meta = CircularBuffer::get_meta_from_data_ptr(self.ptr);
        meta.states.revert_to_ready();
    }
}

#[derive(Debug)]
pub enum CircularBufferError {
    Full,
    EmptyAlloc,
    WouldBlock,
}

impl std::fmt::Display for CircularBufferError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            CircularBufferError::Full => write!(f, "CircularBuffer is full"),
            CircularBufferError::EmptyAlloc => write!(f, "Empty allocation"),
            CircularBufferError::WouldBlock => write!(f, "Would block"),
        }
    }
}

impl std::error::Error for CircularBufferError {}

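/// Logical cursors of the ring buffer.
///
/// Addresses grow monotonically and are mapped onto the backing buffer with
/// `addr & (capacity - 1)`; `head_addr <= evicting_addr <= tail_addr` is
/// maintained.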
#[derive(Debug)]
struct States {
    head_addr: AtomicUsize,
    evicting_addr: usize,
    tail_addr: usize,
}

impl States {
    fn new() -> Self {
        Self {
            head_addr: AtomicUsize::new(0),
            evicting_addr: 0,
            tail_addr: 0,
        }
    }

    fn head_addr(&self) -> usize {
        self.head_addr.load(Ordering::Relaxed)
    }

    fn tail_addr(&self) -> usize {
        self.tail_addr
    }
}

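/// A fixed-capacity ring allocator.
///
/// New allocations are bump-allocated at the tail (or reused from the free
/// list), and space is reclaimed in FIFO order by the eviction cursor.
/// Allocations close to the head (the copy-on-access region) are never handed
/// back out from the free list.
///
/// A minimal usage sketch (marked `ignore`; the arguments mirror the unit
/// tests below and are illustrative, not prescriptive):
///
/// ```ignore
/// let buffer = CircularBuffer::new(4096 * 2, 0.1, 64, 1952, 4096, 32, None, true);
/// let alloc = buffer.alloc(2048).unwrap();   // slot starts out NotReady
/// unsafe { *alloc.as_ptr() = 42 };
/// drop(alloc);                               // dropping publishes it as Ready
/// buffer.evict_n(1, |handle| Ok(handle)).unwrap();
/// ```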
#[derive(Debug)]
pub struct CircularBuffer {
    states: UnsafeCell<States>,
    capacity: usize,
    data_ptr: *mut u8,
    lock: Mutex<()>,

    free_list: FreeList,

    check_tombstone_on_drop: bool,

    copy_on_access_threshold: usize,
}

impl Drop for CircularBuffer {
    fn drop(&mut self) {
        if self.check_tombstone_on_drop {
            let iter = self.iter().unwrap();
            for meta in iter {
                assert!(meta.states.is_tombstoned() || meta.states.is_freelisted());
            }
        }

        let layout = std::alloc::Layout::from_size_align(self.capacity, BUFFER_ALIGNMENT).unwrap();
        unsafe { std::alloc::dealloc(self.data_ptr, layout) };
    }
}

impl CircularBuffer {
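    /// Creates a new buffer.
    ///
    /// `capacity` must be a power of two and large enough for one leaf page
    /// plus its allocation header. `copy_on_access_percent` is the fraction of
    /// the capacity, measured from the head (the oldest data), inside which
    /// allocations are treated as copy-on-access and never recycled through
    /// the free list. If `pre_alloc_ptr` is given it must point to `capacity`
    /// bytes aligned to `BUFFER_ALIGNMENT` and is used as the backing buffer.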
393 #[allow(clippy::too_many_arguments)]
406 pub fn new(
407 capacity: usize,
408 copy_on_access_percent: f64,
409 min_record_size: usize,
410 max_record_size: usize,
411 leaf_page_size: usize,
412 max_fence_len: usize,
413 pre_alloc_ptr: Option<*mut u8>,
414 cache_only: bool,
415 ) -> Self {
416 assert!(capacity.is_power_of_two());
417 assert!(capacity >= leaf_page_size + std::mem::size_of::<AllocMeta>());
419
420 let layout = std::alloc::Layout::from_size_align(capacity, BUFFER_ALIGNMENT).unwrap();
421 let ptr = match pre_alloc_ptr {
422 Some(p) => {
423 assert_eq!(layout.size(), capacity);
424 p
425 }
426 None => unsafe { std::alloc::alloc(layout) },
427 };
428
429 let copy_on_access_threshold = (capacity as f64 * (1.0 - copy_on_access_percent)) as usize;
430
431 Self {
432 states: UnsafeCell::new(States::new()),
433 capacity,
434 free_list: FreeList::new(
435 min_record_size,
436 max_record_size,
437 leaf_page_size,
438 max_fence_len,
439 cache_only,
440 ),
441 data_ptr: ptr,
442 lock: Mutex::new(()),
443 check_tombstone_on_drop: true,
444 copy_on_access_threshold,
445 }
446 }
447
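    /// Takes the buffer lock and walks every allocation header between the
    /// head and the tail, counting allocations per state and per size.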
    pub fn get_metrics(&self) -> CircularBufferMetrics {
        let (lock, states) = self.lock_states();

        let mut metrics = CircularBufferMetrics::new(self.capacity, states);

        let iter = AllocatedIter {
            _lock: lock,
            buffer: self,
            head_addr: states.head_addr(),
            tail_addr: states.tail_addr(),
        };

        let mut tombstone_size = 0;

        for meta in iter {
            match meta.state() {
                MetaState::Ready => metrics.ready_cnt += 1,
                MetaState::NotReady => metrics.not_ready_cnt += 1,
                MetaState::Tombstone => {
                    metrics.tombstone_cnt += 1;
                    tombstone_size += meta.size as usize;
                }
                MetaState::BeginTombStone => metrics.begin_tombstone_cnt += 1,
                MetaState::FreeListed => metrics.free_listed_cnt += 1,
                MetaState::Evicted => metrics.evicted_cnt += 1,
            }
            metrics.allocated_cnt += 1;
            let alloc_size = meta.size as usize;
            metrics
                .size_cnt
                .entry(alloc_size)
                .and_modify(|v| *v += 1)
                .or_insert(1);
        }
        metrics.tombstone_size = tombstone_size;
        metrics
    }

    #[allow(clippy::mut_from_ref)]
    fn try_get_states(&self) -> Result<(MutexGuard<'_, ()>, &mut States), CircularBufferError> {
        let lock = match self.lock.try_lock() {
            Ok(v) => v,
            Err(TryLockError::Poisoned(_)) => {
                panic!("Poisoned lock")
            }
            Err(TryLockError::WouldBlock) => return Err(CircularBufferError::WouldBlock),
        };

        let states = unsafe { &mut *self.states.get() };
        Ok((lock, states))
    }

    #[allow(clippy::mut_from_ref)]
    fn lock_states(&self) -> (MutexGuard<'_, ()>, &mut States) {
        (self.lock.lock().unwrap(), unsafe {
            &mut *self.states.get()
        })
    }

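    /// Allocates `size` bytes and returns a pointer handle to the data region.
    ///
    /// The free list is consulted first; otherwise the allocation is bumped at
    /// the tail, writing a wrap-around tombstone when the request does not fit
    /// in the remaining physical space. Returns [`CircularBufferError::Full`]
    /// when there is not enough logical space and
    /// [`CircularBufferError::EmptyAlloc`] for zero-sized requests.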
    #[cfg_attr(feature = "tracing", tracing::instrument)]
    pub fn alloc(&self, size: usize) -> Result<CircularBufferPtr<'_>, CircularBufferError> {
        if size == 0 {
            return Err(CircularBufferError::EmptyAlloc);
        }

        assert!(size >= self.free_list.size_classes[self.free_list.size_classes.len() - 1]);

        let (lock_guard, states) = self.lock_states();

        while let Some(ptr) = self.free_list.remove(size) {
            let raw_ptr: *mut u8 = ptr.as_ptr();

            let old_meta = CircularBuffer::get_meta_from_data_ptr(raw_ptr);

            if self.ptr_is_copy_on_access(raw_ptr) {
                // Entries inside the copy-on-access region are not reused;
                // tombstone them and keep scanning the free list.
                old_meta.states.free_list_to_tombstone();
                continue;
            }

            assert!(old_meta.size as usize >= size);

            match old_meta.states.state.compare_exchange_weak(
                MetaState::FreeListed.into(),
                MetaState::NotReady.into(),
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    return Ok(CircularBufferPtr::new(raw_ptr));
                }
                Err(_) => {
                    continue;
                }
            };
        }

        let logical_remaining = self.capacity - (states.tail_addr() - states.head_addr());
        let physical_remaining = self.capacity - (states.tail_addr & (self.capacity - 1));
        let aligned_size = align_up(size, CB_ALLOC_META_SIZE);
        let required = aligned_size + std::mem::size_of::<AllocMeta>();

        if logical_remaining < required {
            return Err(CircularBufferError::Full);
        }

        if physical_remaining < required {
            // Not enough room before the physical end of the buffer: fill the
            // remainder with a tombstone allocation and retry from the start.
            assert!(physical_remaining >= CB_ALLOC_META_SIZE);
            let physical_addr = self.logical_to_physical(states.tail_addr);
            let meta = AllocMeta::new((physical_remaining - CB_ALLOC_META_SIZE) as u32, true);
            unsafe {
                physical_addr.cast::<AllocMeta>().write(meta);
            }
            states.tail_addr += physical_remaining;
            std::mem::drop(lock_guard);
            return self.alloc(size);
        }

        let meta = AllocMeta::new(aligned_size as u32, false);

        unsafe {
            let physical_addr = self.logical_to_physical(states.tail_addr);
            physical_addr.cast::<AllocMeta>().write(meta);
        }
        let return_addr = states.tail_addr + std::mem::size_of::<AllocMeta>();
        states.tail_addr += required;

        let ptr = CircularBufferPtr::new(self.logical_to_physical(return_addr));
        Ok(ptr)
    }

    fn logical_to_physical(&self, addr: usize) -> *mut u8 {
        let offset = addr & (self.capacity - 1);
        unsafe { self.data_ptr.add(offset) }
    }

    fn debug_check_ptr_is_from_me(&self, ptr: *mut u8) {
        let offset = ptr as usize - self.data_ptr as usize;
        debug_assert!(offset <= self.capacity);
    }

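    /// Returns `true` when `ptr` is far enough behind the tail to fall inside
    /// the copy-on-access region (the oldest part of the buffer), i.e.
    /// allocations there are not recycled through the free list.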
    pub fn ptr_is_copy_on_access(&self, ptr: *mut u8) -> bool {
        let distance = self.distance_to_tail(ptr);
        distance >= self.copy_on_access_threshold
    }

    fn distance_to_tail(&self, ptr: *mut u8) -> usize {
        let ptr_usize = ptr as usize;
        let tail_ptr = self.logical_to_physical(self.get_fuzzy_tail_addr());
        let tail_usize = tail_ptr as usize;

        if tail_usize >= ptr_usize {
            tail_usize - ptr_usize
        } else {
            self.capacity - (ptr_usize - tail_usize)
        }
    }

    fn get_fuzzy_tail_addr(&self) -> usize {
        unsafe { &*self.states.get() }.tail_addr()
    }

    #[allow(dead_code)]
    pub(crate) unsafe fn addr_is_tombstoned(addr: *mut u8) -> bool {
        let meta = CircularBuffer::get_meta_from_data_ptr(addr);

        meta.states.is_tombstoned()
    }

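    /// Deallocates the slot behind an exclusive [`TombstoneHandle`].
    ///
    /// The slot is pushed onto the free list when possible; otherwise (or when
    /// it lies in the copy-on-access region) it is tombstoned and reclaimed
    /// later by eviction.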
    #[cfg_attr(feature = "tracing", tracing::instrument)]
    pub fn dealloc(&self, ptr: TombstoneHandle) {
        self.dealloc_inner(ptr, true);
    }

    fn dealloc_inner(&self, ptr: TombstoneHandle, add_to_freelist: bool) {
        self.debug_check_ptr_is_from_me(ptr.as_ptr());
        let ptr = ptr.into_ptr();
        let meta = CircularBuffer::get_meta_from_data_ptr(ptr);

        if !add_to_freelist || self.ptr_is_copy_on_access(ptr) {
            meta.states.to_tombstone();
            return;
        }

        match self.free_list.try_add(ptr, meta.size as usize) {
            Ok(_lock) => {
                meta.states.to_freelist();
            }
            Err(_) => {
                meta.states.to_tombstone();
            }
        }
    }

    pub unsafe fn check_ptr_is_ready(ptr: *mut u8) {
        let meta = CircularBuffer::get_meta_from_data_ptr(ptr);

        assert!(meta.states.state() == MetaState::Ready);
    }

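    /// Tries to take exclusive ownership of an allocation for deallocation or
    /// eviction, moving it from `Ready` to `BeginTombStone`. Fails with
    /// [`CircularBufferError::WouldBlock`] if the allocation is in any other
    /// state.
    ///
    /// # Safety
    ///
    /// `ptr` must be a data pointer previously returned by this buffer's
    /// allocator.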
    pub unsafe fn acquire_exclusive_dealloc_handle(
        &self,
        ptr: *mut u8,
    ) -> Result<TombstoneHandle, CircularBufferError> {
        self.debug_check_ptr_is_from_me(ptr);

        let meta = CircularBuffer::get_meta_from_data_ptr(ptr);

        if meta.states.try_begin_tombstone() {
            Ok(TombstoneHandle { ptr })
        } else {
            Err(CircularBufferError::WouldBlock)
        }
    }

    fn iter(&self) -> Result<AllocatedIter<'_>, CircularBufferError> {
        let (lock, states) = self.try_get_states()?;
        Ok(AllocatedIter {
            _lock: lock,
            buffer: self,
            head_addr: states.head_addr(),
            tail_addr: states.tail_addr(),
        })
    }

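    /// Evicts up to `n` allocations starting at the eviction cursor.
    ///
    /// `callback` receives an exclusive handle for each victim and must return
    /// it (`Ok` to proceed, `Err` to retry the same victim later). Returns the
    /// number of bytes advanced, including allocation headers.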
762 pub fn evict_n<T>(&self, n: usize, mut callback: T) -> Result<u32, CircularBufferError>
802 where
803 T: FnMut(TombstoneHandle) -> Result<TombstoneHandle, TombstoneHandle>,
804 {
805 let mut cur_n = 0;
806 let mut cur_evicted = 0;
807 while cur_n < n {
808 let evicted = self.evict_one(&mut callback);
809 match evicted {
810 None => return Ok(cur_evicted),
811 Some(v) => {
812 cur_evicted += v;
813 cur_n += 1;
814 }
815 }
816 }
817 Ok(cur_evicted)
818 }
819
820 fn get_meta(&self, logical_address: usize) -> &AllocMeta {
821 let ptr = self.logical_to_physical(logical_address);
822 self.debug_check_ptr_is_from_me(ptr);
823 let meta_ptr = ptr.cast::<AllocMeta>();
824 unsafe { &*meta_ptr }
825 }
826
827 fn get_meta_from_data_ptr<'a>(data_ptr: *mut u8) -> &'a AllocMeta {
828 debug_assert_eq!(data_ptr as usize % 8, 0);
829 let meta_ptr = unsafe { data_ptr.sub(CB_ALLOC_META_SIZE) } as *mut AllocMeta;
830 unsafe { &*meta_ptr }
831 }
832
833 #[cfg_attr(feature = "tracing", tracing::instrument)]
834 fn try_bump_head_address_to_evicting_addr(
835 &self,
836 states: &mut States,
837 ) -> Result<u32, CircularBufferError> {
838 let mut head_addr = states.head_addr();
839 let old_addr = head_addr;
840 let evicting_addr = states.evicting_addr;
841 while head_addr < evicting_addr {
842 let meta = self.get_meta(head_addr);
843 if !meta.states.is_evicted() {
844 #[cfg(all(feature = "shuttle", test))]
845 {
846 shuttle::thread::yield_now();
847 }
848 return Err(CircularBufferError::WouldBlock);
849 }
850
851 let to_add = meta.size as usize + CB_ALLOC_META_SIZE;
852 states.head_addr.fetch_add(to_add, Ordering::Relaxed);
853 head_addr += to_add;
854 }
855 Ok((head_addr - old_addr) as u32)
856 }
857
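    /// Evicts every remaining allocation and blocks until the head catches up
    /// with the tail, leaving the buffer empty.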
    pub fn drain<T>(&self, mut callback: T)
    where
        T: FnMut(TombstoneHandle) -> Result<TombstoneHandle, TombstoneHandle>,
    {
        loop {
            let evicted = self.evict_one(&mut callback);
            if evicted.is_none() {
                break;
            }
        }
        let backoff = Backoff::new();
        let (_lock, states) = self.lock_states();
        loop {
            if self.try_bump_head_address_to_evicting_addr(states).is_ok() {
                assert_eq!(states.evicting_addr, states.head_addr());
                assert_eq!(states.head_addr(), states.tail_addr());
                return;
            } else {
                backoff.snooze();
            }
        }
    }

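    /// Evicts the single allocation at the eviction cursor.
    ///
    /// Returns `None` when the cursor has reached the tail, otherwise the
    /// number of bytes advanced (payload plus header). The call spins until it
    /// can acquire the allocation exclusively, or until it observes that the
    /// slot is already tombstoned or can be pulled off the free list.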
    pub fn evict_one<T>(&self, callback: &mut T) -> Option<u32>
    where
        T: FnMut(TombstoneHandle) -> Result<TombstoneHandle, TombstoneHandle>,
    {
        let (start_addr, end_addr) = {
            let (lock, states) = self.lock_states();

            let evicting_addr = states.evicting_addr;

            if evicting_addr == states.tail_addr() {
                #[cfg(all(feature = "shuttle", test))]
                {
                    shuttle::thread::yield_now();
                }
                return None;
            }

            let evicting_meta = self.get_meta(evicting_addr);
            let size = evicting_meta.size as usize;

            let advance = size + CB_ALLOC_META_SIZE;

            states.evicting_addr += advance;
            drop(lock);
            (evicting_addr, evicting_addr + advance)
        };

        let meta = self.get_meta(start_addr);
        let data_ptr = meta.data_ptr();

        let backoff = Backoff::new();

        loop {
            let h = unsafe { self.acquire_exclusive_dealloc_handle(data_ptr) };
            match h {
                Ok(v) => {
                    match callback(v) {
                        Ok(h) => {
                            self.dealloc_inner(h, false);
                            meta.states.tombstone_to_evicted();
                            break;
                        }
                        Err(h) => {
                            drop(h);
                            backoff.spin();
                        }
                    };
                }
                Err(_) => {
                    let state = meta.states.state();

                    // A NotReady slot is still being written; just back off and
                    // retry. Tombstoned and free-listed slots can be reclaimed
                    // directly.
                    if state != MetaState::NotReady {
                        if state == MetaState::Tombstone {
                            meta.states.tombstone_to_evicted();
                            break;
                        }
                        if state == MetaState::FreeListed {
                            let found =
                                self.free_list.find_and_remove(data_ptr, meta.size as usize);
                            if found {
                                meta.states.free_list_to_tombstone();
                                meta.states.tombstone_to_evicted();
                                break;
                            }
                        }
                    }
                    backoff.spin();
                }
            }
        }

        let (_lock, states) = self.lock_states();
        _ = self.try_bump_head_address_to_evicting_addr(states);
        Some((end_addr - start_addr) as u32)
    }
}

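/// Iterator over the allocation headers between the head and the tail.
///
/// Holds the buffer lock for its whole lifetime so the walked region cannot
/// change underneath it.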
struct AllocatedIter<'a> {
    _lock: MutexGuard<'a, ()>,
    buffer: &'a CircularBuffer,
    head_addr: usize,
    tail_addr: usize,
}

impl<'a> Iterator for AllocatedIter<'a> {
    type Item = &'a AllocMeta;

    fn next(&mut self) -> Option<Self::Item> {
        if self.head_addr == self.tail_addr {
            return None;
        }

        let meta = self.buffer.get_meta(self.head_addr);

        let size = meta.size as usize;
        let advance = size + CB_ALLOC_META_SIZE;

        self.head_addr += advance;

        Some(meta)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{BfTree, Config};
    use rstest::rstest;

    #[rstest]
    #[case(64, 1952, 4096)]
    #[case(3072, 3072, 8192)]
    #[case(64, 2048, 16384)]
    fn test_circular_buffer_initialization(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
    ) {
        let capacity = leaf_page_size * 2;
        let buffer = CircularBuffer::new(
            capacity,
            0.1,
            min_record_size,
            max_record_size,
            leaf_page_size,
            32,
            None,
            false,
        );

        let (_lock, states) = buffer.try_get_states().unwrap();
        assert_eq!(states.head_addr(), 0);
        assert_eq!(states.tail_addr(), 0);
        assert!(!buffer.data_ptr.is_null());
        assert_eq!(buffer.capacity, capacity);
    }

    #[rstest]
    #[case(64, 1952, 4096, false)]
    #[case(3072, 3072, 8192, false)]
    #[case(64, 2048, 16384, true)]
    fn test_circular_buffer_alloc_and_dealloc(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
        #[case] pre_allocated_buffer: bool,
    ) {
        let buffer_ptr = if pre_allocated_buffer {
            let layout =
                std::alloc::Layout::from_size_align(leaf_page_size * 2, BUFFER_ALIGNMENT).unwrap();
            let ptr = unsafe { std::alloc::alloc(layout) };
            Some(ptr)
        } else {
            None
        };

        let buffer = CircularBuffer::new(
            leaf_page_size * 2,
            0.1,
            min_record_size,
            max_record_size,
            leaf_page_size,
            32,
            buffer_ptr,
            false,
        );

        let mini_page_size = vec![
            buffer.free_list.size_classes[0],
            buffer.free_list.size_classes[buffer.free_list.size_classes.len() - 1],
        ];

        for i in 0..mini_page_size.len() {
            let size = mini_page_size[i];
            let alloc_ptr = buffer.alloc(size).expect("Allocation failed").ptr;
            assert!(!alloc_ptr.is_null());

            unsafe {
                let p = buffer.acquire_exclusive_dealloc_handle(alloc_ptr).unwrap();
                buffer.dealloc(p);
            }

            let meta = CircularBuffer::get_meta_from_data_ptr(alloc_ptr);
            assert!(meta.states.is_tombstoned() || meta.states.is_freelisted());
        }
    }

    #[rstest]
    #[case(32, 1952, 4096)]
    #[case(3072, 3072, 8192)]
    #[case(64, 2048, 16384)]
    fn test_circular_buffer_evict_n(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
    ) {
        let buffer = CircularBuffer::new(
            leaf_page_size * 2,
            0.1,
            min_record_size,
            max_record_size,
            leaf_page_size,
            32,
            None,
            false,
        );
        let size = buffer.free_list.size_classes[0];
        let _ = buffer.alloc(size).expect("Allocation failed");
        let bytes_advanced = buffer.evict_n(1, |h| Ok(h)).unwrap() as usize;

        assert_eq!(
            bytes_advanced,
            align_up(size, CB_ALLOC_META_SIZE) + CB_ALLOC_META_SIZE
        );
    }

    #[test]
    fn test_circular_buffer_evict_more_than_present() {
        let buffer = CircularBuffer::new(4096 * 2, 0.1, 64, 1952, 4096, 64, None, true);

        let bytes_advanced = buffer.evict_n(10, |h| Ok(h)).unwrap();
        assert_eq!(bytes_advanced, 0);
    }

    #[test]
    fn test_align_up_function() {
        let addr = 123;
        let align = 8;
        let aligned_addr = align_up(addr, align);

        assert_eq!(aligned_addr % align, 0);
    }

    #[test]
    fn alloc_and_evict() {
        let buffer = CircularBuffer::new(4096 * 2, 0.1, 64, 1952, 4096, 32, None, true);

        for _i in 0..3 {
            let alloc = buffer.alloc(2048).unwrap();
            unsafe { *alloc.as_ptr() = 42 };
            drop(alloc);
        }

        let not_allocated = buffer.alloc(2048);
        assert!(matches!(not_allocated, Err(CircularBufferError::Full)));
        drop(not_allocated);

        buffer
            .evict_n(usize::MAX, |h| {
                assert_eq!(unsafe { *(h.as_ptr()) }, 42);
                Ok(h)
            })
            .unwrap();

        let allocated = buffer.alloc(2048).unwrap();
        let ptr = allocated.as_ptr();
        drop(allocated);
        unsafe {
            let p = buffer.acquire_exclusive_dealloc_handle(ptr).unwrap();
            buffer.dealloc(p);
        }
    }

    #[test]
    fn identical_mini_page_classes() {
        let mut config = Config::default();
        config.cb_max_record_size(1928);

        let mut tree = BfTree::with_config(config.clone(), None);

        let a = tree.mini_page_size_classes.clone();
        let mut b = tree.storage.circular_buffer.free_list.size_classes.clone();
        b.reverse();
        assert_eq!(a, b);
        drop(tree);

        config.cache_only = true;
        tree = BfTree::with_config(config.clone(), None);
        let c = tree.mini_page_size_classes.clone();
        let mut d = tree.storage.circular_buffer.free_list.size_classes.clone();
        d.reverse();
        assert_eq!(c, d);
        assert_eq!(a, c);

        drop(tree);
    }
}