bf_tree/circular_buffer/
mod.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4mod freelist;
5mod metrics;
6
7use std::{cell::UnsafeCell, marker::PhantomData, sync::TryLockError};
8
9use crate::{
10    circular_buffer::freelist::FreeList,
11    sync::{
12        atomic::{AtomicU8, AtomicUsize, Ordering},
13        Mutex, MutexGuard,
14    },
15    utils::Backoff,
16};
17
18use std::convert::Into;
19
20pub use self::metrics::CircularBufferMetrics;
21
/// Size of the `AllocMeta` header that is written immediately before every
/// allocation handed out by the circular buffer.
pub(crate) const CB_ALLOC_META_SIZE: usize = std::mem::size_of::<AllocMeta>();

/// Alignment of the backing buffer allocation (4 KiB, page-sized).
const BUFFER_ALIGNMENT: usize = 4096;
25
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MetaState {
    NotReady = 0,
    Ready = 1,
    Tombstone = 2,
    BeginTombStone = 3,
    FreeListed = 4,
    Evicted = 5, // the head address can only pass this memory if it is evicted.
}

impl From<MetaState> for u8 {
    fn from(state: MetaState) -> u8 {
        state as u8
    }
}

impl MetaState {
    /// Decode a raw state byte back into a `MetaState`.
    ///
    /// Panics on a byte outside `0..=5`, which would indicate memory
    /// corruption (previously this decode was an unchecked `transmute`,
    /// i.e. undefined behavior on an invalid byte).
    fn from_u8(v: u8) -> MetaState {
        match v {
            0 => MetaState::NotReady,
            1 => MetaState::Ready,
            2 => MetaState::Tombstone,
            3 => MetaState::BeginTombStone,
            4 => MetaState::FreeListed,
            5 => MetaState::Evicted,
            other => panic!("invalid MetaState byte: {}", other),
        }
    }
}

/// Life cycle of a piece of memory in the circular buffer:
/// 1. NOT_READY (allocated)
/// 2. READY (used by caller, i.e., the `CircularBufferPtr` is dropped)
/// 3. BEGIN_TOMBSTONE (before deallocating/evicting the memory, it is essentially a x-lock, whoever wins gets to deallocate/evict)
/// 4. TOMBSTONE (not accessible to any thread, free to be reused)
///
/// Side branches:
/// - BEGIN_TOMBSTONE -> FREE_LISTED (region parked on the free list) -> TOMBSTONE or back to NOT_READY (reused by `alloc`)
/// - TOMBSTONE -> EVICTED (the head address may now pass this memory)
///
/// You can only be in one state at any given moment.
struct MetaRawState {
    state: AtomicU8,
}

impl MetaRawState {
    /// Fresh allocation: caller has not published it yet.
    fn new_not_ready() -> Self {
        Self {
            state: AtomicU8::new(MetaState::NotReady.into()),
        }
    }

    /// Filler/tombstone entry, immediately reclaimable.
    fn new_tombstoned() -> Self {
        Self {
            state: AtomicU8::new(MetaState::Tombstone.into()),
        }
    }

    /// CAS the state from `from` to `to`, panicking if the current state is
    /// not `from`. All hard (must-succeed) transitions share this helper.
    fn transition(&self, from: MetaState, to: MetaState) {
        if let Err(actual) = self.state.compare_exchange(
            from.into(),
            to.into(),
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            panic!("Meta state incorrect, expected {:?}, actual {}", from, actual);
        }
    }

    /// Will panic if the old state is not `MetaState::NotReady`.
    fn to_ready(&self) {
        self.transition(MetaState::NotReady, MetaState::Ready);
    }

    /// Try to win the exclusive right to deallocate/evict this memory.
    /// Returns false if the state is not `Ready` (someone else won, or the
    /// memory is not published yet).
    fn try_begin_tombstone(&self) -> bool {
        self.state
            .compare_exchange(
                MetaState::Ready.into(),
                MetaState::BeginTombStone.into(),
                Ordering::AcqRel,
                Ordering::Relaxed,
            )
            .is_ok()
    }

    fn is_tombstoned(&self) -> bool {
        self.load() == u8::from(MetaState::Tombstone)
    }

    fn is_evicted(&self) -> bool {
        self.load() == u8::from(MetaState::Evicted)
    }

    fn is_freelisted(&self) -> bool {
        self.load() == u8::from(MetaState::FreeListed)
    }

    fn load(&self) -> u8 {
        self.state.load(Ordering::Acquire)
    }

    /// Decode the current state; panics on a corrupted state byte.
    fn state(&self) -> MetaState {
        MetaState::from_u8(self.load())
    }

    /// Give up the exclusive claim taken by [MetaRawState::try_begin_tombstone].
    /// Panics if the old state is not `BeginTombStone`.
    fn revert_to_ready(&self) {
        self.transition(MetaState::BeginTombStone, MetaState::Ready);
    }

    /// Panics if the old state is not `FreeListed`.
    fn free_list_to_tombstone(&self) {
        self.transition(MetaState::FreeListed, MetaState::Tombstone);
    }

    /// Panics if the old state is not `BeginTombStone`.
    /// (Bug fix: the panic message used to report `Ready` as the expected
    /// state, while the CAS has always required `BeginTombStone`.)
    fn to_freelist(&self) {
        self.transition(MetaState::BeginTombStone, MetaState::FreeListed);
    }

    /// Previous state must be `MetaState::BeginTombStone`.
    fn to_tombstone(&self) {
        self.transition(MetaState::BeginTombStone, MetaState::Tombstone);
    }

    /// Panics if the old state is not `Tombstone`.
    fn tombstone_to_evicted(&self) {
        self.transition(MetaState::Tombstone, MetaState::Evicted);
    }
}
208
/// Header written immediately before every allocation in the circular buffer.
/// Shuttle-test variant: padded to 256 bytes.
/// NOTE(review): the reason for the 256-byte alignment under shuttle tests is
/// not visible here — presumably to reduce the number of entries per test
/// buffer or interleavings to explore; confirm.
#[cfg(all(feature = "shuttle", test))]
#[repr(C, align(256))]
struct AllocMeta {
    /// the allocated size, not including the meta itself
    pub(crate) size: u32,

    /// Current life-cycle state of this allocation.
    states: MetaRawState,
}

/// Header written immediately before every allocation in the circular buffer.
/// Exactly 8 bytes in normal builds (see the debug assertion in `AllocMeta::new`).
#[cfg(not(all(feature = "shuttle", test)))]
#[repr(C, align(8))]
struct AllocMeta {
    /// the allocated size, not including the meta itself
    pub(crate) size: u32,

    /// Current life-cycle state of this allocation.
    states: MetaRawState,
}
226
227impl AllocMeta {
228    fn new(size: u32, tombstone: bool) -> Self {
229        #[cfg(not(feature = "shuttle"))]
230        debug_assert_eq!(std::mem::size_of::<AllocMeta>(), 8);
231
232        let states = if tombstone {
233            MetaRawState::new_tombstoned()
234        } else {
235            MetaRawState::new_not_ready()
236        };
237
238        Self { size, states }
239    }
240
241    fn data_ptr(&self) -> *mut u8 {
242        unsafe { (self as *const Self as *mut u8).add(std::mem::size_of::<Self>()) }
243    }
244
245    fn state(&self) -> MetaState {
246        self.states.state()
247    }
248}
249
/// Round `addr` up to the next multiple of `align`.
///
/// `align` must be a power of two — the bit trick below is only correct in
/// that case (callers pass `CB_ALLOC_META_SIZE` or similar constants).
fn align_up(addr: usize, align: usize) -> usize {
    debug_assert!(align.is_power_of_two());
    (addr + align - 1) & !(align - 1)
}
253
/// The guard returned by [CircularBuffer::alloc].
/// While this guard is being held, the allocated memory is not allowed to be evicted.
/// This means that you may block the evicting thread if you hold this guard for too long.
///
/// The lock will be released once the guard is dropped.
pub struct CircularBufferPtr<'a> {
    /// Pointer to the usable data region (just past the `AllocMeta` header).
    ptr: *mut u8,
    /// Ties the guard's lifetime to the buffer it was allocated from.
    _pt: PhantomData<&'a ()>,
}
263
264impl CircularBufferPtr<'_> {
265    fn new(ptr: *mut u8) -> Self {
266        Self {
267            ptr,
268            _pt: PhantomData,
269        }
270    }
271
272    /// Get the actual pointer to the allocated memory.
273    pub fn as_ptr(&self) -> *mut u8 {
274        self.ptr
275    }
276}
277
impl Drop for CircularBufferPtr<'_> {
    fn drop(&mut self) {
        // Publishing point: flip the allocation from `NotReady` to `Ready`
        // so eviction/deallocation may now claim it.
        // (`to_ready` panics if the state is not `NotReady`.)
        let meta = CircularBuffer::get_meta_from_data_ptr(self.ptr);
        meta.states.to_ready();
    }
}
285
/// This is an opaque handle that you can use to deallocate the ptr.
/// You need to acquire this handle by [CircularBuffer::acquire_exclusive_dealloc_handle] before [CircularBuffer::dealloc] any memory.
///
/// Dropping the handle without deallocating reverts the allocation's state
/// back to `Ready` (see the `Drop` impl).
#[derive(Debug)]
pub struct TombstoneHandle {
    /// Pointer to the data region of the allocation being tombstoned.
    pub(crate) ptr: *mut u8,
}
293
294impl TombstoneHandle {
295    fn into_ptr(self) -> *mut u8 {
296        let ptr = self.ptr;
297        std::mem::forget(self);
298        ptr
299    }
300
301    pub fn as_ptr(&self) -> *mut u8 {
302        self.ptr
303    }
304}
305
impl Drop for TombstoneHandle {
    fn drop(&mut self) {
        // A handle dropped without being consumed gives up its exclusive
        // claim: state reverts from `BeginTombStone` back to `Ready`.
        let meta = CircularBuffer::get_meta_from_data_ptr(self.ptr);
        meta.states.revert_to_ready();
    }
}
312
/// Errors returned by [CircularBuffer] operations.
#[derive(Debug)]
pub enum CircularBufferError {
    /// No space left; the caller should evict before retrying.
    Full,
    /// A zero-sized allocation was requested.
    EmptyAlloc,
    /// The operation would have blocked on a contended lock.
    WouldBlock,
}

impl std::fmt::Display for CircularBufferError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let msg = match self {
            CircularBufferError::Full => "CircularBuffer is full",
            CircularBufferError::EmptyAlloc => "Empty allocation",
            CircularBufferError::WouldBlock => "Would block",
        };
        f.write_str(msg)
    }
}

impl std::error::Error for CircularBufferError {}
331
/// Cursor state of the ring: `head_addr`, `evicting_addr` and `tail_addr`
/// are logical (monotonically increasing) addresses, guarded by the buffer's
/// lock except for the atomic head.
#[derive(Debug)]
struct States {
    head_addr: AtomicUsize,
    evicting_addr: usize,
    tail_addr: usize,
}

impl States {
    /// All cursors start at logical address zero.
    fn new() -> Self {
        States {
            tail_addr: 0,
            evicting_addr: 0,
            head_addr: AtomicUsize::new(0),
        }
    }

    /// Current head address (relaxed atomic load).
    fn head_addr(&self) -> usize {
        self.head_addr.load(Ordering::Relaxed)
    }

    /// Current tail address.
    fn tail_addr(&self) -> usize {
        self.tail_addr
    }
}
356
/// The circular buffer inspired by FASTER's ring buffer.
/// It acts mostly like a variable length buffer pool, except that evicting entries are handled by the callback.
///
///
/// Getting this to be correct is quite challenging, especially that we need to support concurrent allocation/deallocation/eviction,
/// and we don't want a big lock on everything.
#[derive(Debug)]
pub struct CircularBuffer {
    /// Head/evicting/tail cursors; mutated only while holding `lock`.
    states: UnsafeCell<States>,
    /// Buffer size in bytes; always a power of two.
    capacity: usize,
    /// Base of the backing allocation, aligned to `BUFFER_ALIGNMENT`.
    data_ptr: *mut u8,
    /// Guards mutation of `states`.
    lock: Mutex<()>,

    /// Recycles deallocated regions by size so `alloc` can reuse them
    /// before having to advance the tail.
    free_list: FreeList,

    /// if true, when dropping the circular buffer, we will check and panic if there is any elements that are not marked as tombstone.
    check_tombstone_on_drop: bool,

    /// Allocations whose distance to the tail is at or beyond this threshold
    /// are in the copy-on-access region (i.e., close to the head).
    copy_on_access_threshold: usize,
}
377
impl Drop for CircularBuffer {
    fn drop(&mut self) {
        // Sanity check: every allocation must have been deallocated
        // (tombstoned) or parked on the free list before the buffer goes away.
        if self.check_tombstone_on_drop {
            let iter = self.iter().unwrap();
            for meta in iter {
                assert!(meta.states.is_tombstoned() || meta.states.is_freelisted());
            }
        }

        // NOTE(review): this deallocates unconditionally, even when the buffer
        // was constructed with `pre_alloc_ptr` (memory this buffer did not
        // allocate) — confirm the caller's pointer is meant to be owned here.
        let layout = std::alloc::Layout::from_size_align(self.capacity, BUFFER_ALIGNMENT).unwrap();
        unsafe { std::alloc::dealloc(self.data_ptr, layout) };
    }
}
391
392impl CircularBuffer {
    /// Create a new circular buffer with the given capacity, the capacity has to be a power of two and large enough to hold at least one leaf page.
    ///
    /// * `capacity` - size of the buffer in bytes; must be a power of two.
    /// * `copy_on_access_percent` - fraction of the buffer (from the head) treated as copy-on-access.
    /// * `min_record_size`, `max_record_size`, `leaf_page_size`, `max_fence_len`, `cache_only` - forwarded to the free list.
    /// * `pre_alloc_ptr` - if `Some`, use this caller-provided memory instead of allocating.
    ///
    /// TODO: I don't like the fact that we require users to set cache capacity to be power of two just because it is easy to do modulo.
    /// We should actually investigate how much performance is actually gained by requiring power of two.
    ///
    /// TODO: I don't think we should ever expose the `copy_on_access_percent` to user, it is an internal implementation detail.
    ///
    ///
    /// ```
    /// use bf_tree::circular_buffer::CircularBuffer;
    /// let buffer = CircularBuffer::new(4096 * 2, 0.1, 64, 1952, 4096, 32, None, false);
    /// ```
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        capacity: usize,
        copy_on_access_percent: f64,
        min_record_size: usize,
        max_record_size: usize,
        leaf_page_size: usize,
        max_fence_len: usize,
        pre_alloc_ptr: Option<*mut u8>,
        cache_only: bool,
    ) -> Self {
        assert!(capacity.is_power_of_two());
        // It needs to accommodate at least one full page plus its meta header.
        assert!(capacity >= leaf_page_size + std::mem::size_of::<AllocMeta>());

        let layout = std::alloc::Layout::from_size_align(capacity, BUFFER_ALIGNMENT).unwrap();
        let ptr = match pre_alloc_ptr {
            Some(p) => {
                // NOTE(review): `layout` is built from `capacity`, so this
                // assertion is trivially true — presumably it was meant to
                // validate the pre-allocated buffer's size; confirm.
                assert_eq!(layout.size(), capacity);
                p
            }
            None => unsafe { std::alloc::alloc(layout) },
        };

        // Anything at or beyond this distance from the tail is copy-on-access
        // (see `ptr_is_copy_on_access`).
        let copy_on_access_threshold = (capacity as f64 * (1.0 - copy_on_access_percent)) as usize;

        Self {
            states: UnsafeCell::new(States::new()),
            capacity,
            free_list: FreeList::new(
                min_record_size,
                max_record_size,
                leaf_page_size,
                max_fence_len,
                cache_only,
            ),
            data_ptr: ptr,
            lock: Mutex::new(()),
            check_tombstone_on_drop: true,
            copy_on_access_threshold,
        }
    }
447
    /// Returns the metrics of CircularBuffer.
    /// Note that this is a very slow, exclusive operation,
    /// it essentially stops all other operations,
    /// so use it with caution.
    ///
    /// You should only use it for debugging and testing.
    pub fn get_metrics(&self) -> CircularBufferMetrics {
        let (lock, states) = self.lock_states();

        let mut metrics = CircularBufferMetrics::new(self.capacity, states);

        // Walk every allocation between head and tail; the iterator owns the
        // lock guard, so the buffer stays frozen for the whole scan.
        let iter = AllocatedIter {
            _lock: lock,
            buffer: self,
            head_addr: states.head_addr(),
            tail_addr: states.tail_addr(),
        };

        let mut tombstone_size = 0;

        for meta in iter {
            // Tally one counter per life-cycle state.
            match meta.state() {
                MetaState::Ready => metrics.ready_cnt += 1,
                MetaState::NotReady => metrics.not_ready_cnt += 1,
                MetaState::Tombstone => {
                    metrics.tombstone_cnt += 1;
                    tombstone_size += meta.size as usize;
                }
                MetaState::BeginTombStone => metrics.begin_tombstone_cnt += 1,
                MetaState::FreeListed => metrics.free_listed_cnt += 1,
                MetaState::Evicted => metrics.evicted_cnt += 1,
            }
            metrics.allocated_cnt += 1;
            // Histogram of allocation sizes.
            let alloc_size = meta.size as usize;
            metrics
                .size_cnt
                .entry(alloc_size)
                .and_modify(|v| *v += 1)
                .or_insert(1);
        }
        metrics.tombstone_size = tombstone_size;
        metrics
    }
491
492    #[allow(clippy::mut_from_ref)]
493    fn try_get_states(&self) -> Result<(MutexGuard<'_, ()>, &mut States), CircularBufferError> {
494        let lock = match self.lock.try_lock() {
495            Ok(v) => v,
496            Err(TryLockError::Poisoned(_)) => {
497                panic!("Poisoned lock")
498            }
499            Err(TryLockError::WouldBlock) => return Err(CircularBufferError::WouldBlock),
500        };
501
502        let states = unsafe { &mut *self.states.get() };
503        Ok((lock, states))
504    }
505
506    #[allow(clippy::mut_from_ref)]
507    fn lock_states(&self) -> (MutexGuard<'_, ()>, &mut States) {
508        (self.lock.lock().unwrap(), unsafe {
509            &mut *self.states.get()
510        })
511    }
512
    /// Allocate a piece of memory from the circular buffer, returns a guard that will panic if not used.
    ///
    /// Ignores alignment, always align to 8.
    /// Returns [CircularBufferError::Full] if we have no free space, in which case the
    /// caller needs to call [CircularBuffer::evict_one] or [CircularBuffer::evict_n].
    ///
    /// ```
    /// use bf_tree::circular_buffer::CircularBuffer;
    /// let mut buffer = CircularBuffer::new(4096 * 2, 0.1, 64, 1952, 4096, 32, None, false);
    ///
    /// let allocated = buffer.alloc(128);
    /// let ptr = allocated.unwrap().as_ptr();
    ///
    /// let v = unsafe { buffer.acquire_exclusive_dealloc_handle(ptr).unwrap() };
    /// buffer.dealloc(v); // dealloc is mandatory before buffer being dropped.
    /// ```
    #[cfg_attr(feature = "tracing", tracing::instrument)]
    pub fn alloc(&self, size: usize) -> Result<CircularBufferPtr<'_>, CircularBufferError> {
        if size == 0 {
            return Err(CircularBufferError::EmptyAlloc);
        }

        // The allocated space has to be greater than or equal to smallest mini-page size.
        // NOTE(review): this indexes the LAST element of `size_classes`, i.e. it
        // assumes size classes are sorted largest-to-smallest — confirm.
        assert!(size >= self.free_list.size_classes[self.free_list.size_classes.len() - 1]);

        let (lock_guard, states) = self.lock_states();

        // Fast path: reuse a free-listed region instead of bumping the tail.
        while let Some(ptr) = self.free_list.remove(size) {
            let raw_ptr: *mut u8 = ptr.as_ptr();

            let old_meta = CircularBuffer::get_meta_from_data_ptr(raw_ptr);

            if self.ptr_is_copy_on_access(raw_ptr) {
                // here we might fail because ptr might be also evicted by evict_n,
                // nevertheless, someone will tombstone it, so we are fine.
                old_meta.states.free_list_to_tombstone();
                // retry
                continue;
            }

            // Free-listed regions are recycled whole, so they may be larger
            // than the requested size.
            assert!(old_meta.size as usize >= size);

            // we need to ensure while we are allocating this, no one else is evicting it.
            match old_meta.states.state.compare_exchange_weak(
                MetaState::FreeListed.into(),
                MetaState::NotReady.into(),
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    return Ok(CircularBufferPtr::new(raw_ptr));
                }
                Err(_) => {
                    // Lost the race (or a spurious weak-CAS failure): try the
                    // next free-list entry.
                    continue;
                }
            };
        }

        let logical_remaining = self.capacity - (states.tail_addr() - states.head_addr()); // Total amount of un-used memory
        let physical_remaining = self.capacity - (states.tail_addr & (self.capacity - 1)); // Total amount of contiguous memory starting from the tail

        // Round up so every allocation (and thus the next meta) stays aligned
        // to the meta size (8 bytes).
        let aligned_size = align_up(size, CB_ALLOC_META_SIZE);
        let required = aligned_size + std::mem::size_of::<AllocMeta>();

        if logical_remaining < required {
            return Err(CircularBufferError::Full);
        }

        if physical_remaining < required {
            // fill the remaining physical space with a tombstone meta.
            assert!(physical_remaining >= CB_ALLOC_META_SIZE);
            let physical_addr = self.logical_to_physical(states.tail_addr);
            let meta = AllocMeta::new((physical_remaining - CB_ALLOC_META_SIZE) as u32, true);
            unsafe {
                physical_addr.cast::<AllocMeta>().write(meta);
            }
            states.tail_addr += physical_remaining;
            // Release the lock before recursing; the retry now starts from a
            // wrap-free tail.
            std::mem::drop(lock_guard);
            return self.alloc(size);
        }

        // Slow path: carve a fresh region off the tail, writing its meta first.
        let meta = AllocMeta::new(aligned_size as u32, false);

        unsafe {
            let physical_addr = self.logical_to_physical(states.tail_addr);
            physical_addr.cast::<AllocMeta>().write(meta);
        }
        // The caller's pointer starts right after the meta header.
        let return_addr = states.tail_addr + std::mem::size_of::<AllocMeta>();
        states.tail_addr += required;

        let ptr = CircularBufferPtr::new(self.logical_to_physical(return_addr));
        Ok(ptr)
    }
605
606    fn logical_to_physical(&self, addr: usize) -> *mut u8 {
607        let offset = addr & (self.capacity - 1);
608        unsafe { self.data_ptr.add(offset) }
609    }
610
    /// Debug-only sanity check that `ptr` points into this buffer's memory.
    /// NOTE(review): a ptr below `data_ptr` underflows the subtraction
    /// (panicking in debug builds), and `offset == capacity` is accepted —
    /// confirm whether the bound should be exclusive.
    fn debug_check_ptr_is_from_me(&self, ptr: *mut u8) {
        let offset = ptr as usize - self.data_ptr as usize;
        debug_assert!(offset <= self.capacity);
    }
615
    /// Returns whether the pointer is inside copy-on-access region.
    /// Useful to detect if the ptr is about to be evicted.
    ///
    /// If a ptr is close to head, it is copy on access.
    /// If a ptr is close to tail, it is inplace updatable.
    pub fn ptr_is_copy_on_access(&self, ptr: *mut u8) -> bool {
        // Distance is measured backwards from the (fuzzy) tail: a large
        // distance means the ptr is near the head, i.e. next in line for eviction.
        let distance = self.distance_to_tail(ptr);
        distance >= self.copy_on_access_threshold
    }
625
    /// Distance in bytes from `ptr` forward to the current (fuzzy, lock-free)
    /// tail, accounting for physical wrap-around of the ring.
    fn distance_to_tail(&self, ptr: *mut u8) -> usize {
        let ptr_usize = ptr as usize;
        let tail_ptr = self.logical_to_physical(self.get_fuzzy_tail_addr());
        let tail_usize = tail_ptr as usize;

        if tail_usize >= ptr_usize {
            tail_usize - ptr_usize
        } else {
            // The tail has physically wrapped past the start of the buffer.
            self.capacity - (ptr_usize - tail_usize)
        }
    }
637
638    fn get_fuzzy_tail_addr(&self) -> usize {
639        unsafe { &*self.states.get() }.tail_addr()
640    }
641
    /// This is used to sanity check that
    /// the given address has already been tombstoned, if not, return false.
    ///
    /// # Safety
    /// The addr must be allocated by this buffer.
    #[allow(dead_code)]
    pub(crate) unsafe fn addr_is_tombstoned(addr: *mut u8) -> bool {
        let meta = CircularBuffer::get_meta_from_data_ptr(addr);

        // Only reads the state byte; the data region itself is never touched.
        meta.states.is_tombstoned()
    }
653
    /// Deallocates the given address.
    /// Deallocate is mandatory before the buffer being dropped.
    ///
    /// It panics if the ptr is already dealloced, so double free is not allowed.
    ///
    /// ```
    /// use bf_tree::circular_buffer::CircularBuffer;
    /// let mut buffer = CircularBuffer::new(4096 * 2, 0.1, 64, 1952, 4096, 32, None, false);
    ///
    /// let allocated = buffer.alloc(128);
    /// let ptr = allocated.unwrap().as_ptr();
    ///
    /// let v = unsafe{ buffer.acquire_exclusive_dealloc_handle(ptr).unwrap() };
    /// buffer.dealloc(v); // dealloc is mandatory before buffer being dropped.
    /// ```
    ///
    #[cfg_attr(feature = "tracing", tracing::instrument)]
    pub fn dealloc(&self, ptr: TombstoneHandle) {
        // The public path allows the freed region to be recycled via the free list.
        self.dealloc_inner(ptr, true);
    }
674
    /// Shared implementation of [CircularBuffer::dealloc] and eviction.
    /// Consumes the handle (so its `Drop` does not revert the state) and moves
    /// the allocation to either `FreeListed` or `Tombstone`.
    fn dealloc_inner(&self, ptr: TombstoneHandle, add_to_freelist: bool) {
        self.debug_check_ptr_is_from_me(ptr.as_ptr());
        // `into_ptr` forgets the handle; from here on this function owns the
        // state transition that the handle's Drop would otherwise perform.
        let ptr = ptr.into_ptr();
        let meta = CircularBuffer::get_meta_from_data_ptr(ptr);

        // Memory in the copy-on-access region is about to be evicted, so it is
        // not worth parking on the free list.
        if !add_to_freelist || self.ptr_is_copy_on_access(ptr) {
            meta.states.to_tombstone();
            return;
        }

        // we don't want to pollute dealloc with a Result<>, so if there's contention, we don't add it to free list.
        match self.free_list.try_add(ptr, meta.size as usize) {
            Ok(_lock) => {
                meta.states.to_freelist();
            }
            Err(_) => {
                meta.states.to_tombstone();
            }
        }
    }
695
    /// Check if the ptr is accessible.
    ///
    /// Panics if the allocation is in any state other than `Ready`.
    ///
    /// # Safety
    /// The ptr must be allocated by this buffer.
    pub unsafe fn check_ptr_is_ready(ptr: *mut u8) {
        let meta = CircularBuffer::get_meta_from_data_ptr(ptr);

        assert!(meta.states.state() == MetaState::Ready);
    }
705
    /// Set the ptr's state to be tombstoning, no future access is allowed, no concurrent tombstoning is allowed.
    /// This is the required call before you can deallocate the ptr.
    ///
    /// Returns the handle that you can use to deallocate the ptr.
    ///     Or Err if contention happened.
    ///
    /// This call is necessary because two concurrent threads can deallocating the same ptr at the same time:
    ///    1. thread A deallocates the ptr normally,
    ///    2. thread B deallocates the ptr by calling evict_n
    ///
    /// This causes contention and unnecessary complexity to handle the race.
    /// It is possible for user to coordinate the two threads, but I feel like it is better to handle it in the library.
    /// I'm not 100% sure this is the best way to do it. If you are reading this, it's a good time to revisit the design.
    ///
    /// Some other thoughts:
    /// This is essentially a x-lock, whoever wins gets to deallocate/evict.
    /// Why not directly expose a locking API?
    ///     While it is possible, I don't like it because we will have too many places to lock.
    ///     In a complex system like bf-tree, the more place to lock, the higher mental burden to the maintainer.
    ///     I want bf-tree to be simple to maintain.
    ///
    /// The next question is: if we don't want so many places to lock, why do we have to lock here?
    /// Why not directly expose the raw bare minimal API, and let users to coordinate the locking?
    /// Readers of this comment should think carefully and consider it as a refactoring opportunity.
    ///
    /// The very high level question here is: what is the safest and efficient interface of a circular buffer that serves our use case?
    ///
    /// # Safety
    /// The ptr must be allocated by this buffer.
    pub unsafe fn acquire_exclusive_dealloc_handle(
        &self,
        ptr: *mut u8,
    ) -> Result<TombstoneHandle, CircularBufferError> {
        self.debug_check_ptr_is_from_me(ptr);

        let meta = CircularBuffer::get_meta_from_data_ptr(ptr);

        // Win the Ready -> BeginTombStone race, or report contention.
        if meta.states.try_begin_tombstone() {
            Ok(TombstoneHandle { ptr })
        } else {
            Err(CircularBufferError::WouldBlock)
        }
    }
749
    /// Returns an iterator that allows you to iterate over the allocated items in the buffer, from head to tail.
    /// Useful for sanity check.
    ///
    /// The iterator owns the state lock guard, so the buffer stays frozen for
    /// the iterator's lifetime; returns `WouldBlock` if the lock is contended.
    fn iter(&self) -> Result<AllocatedIter<'_>, CircularBufferError> {
        let (lock, states) = self.try_get_states()?;
        Ok(AllocatedIter {
            _lock: lock,
            buffer: self,
            head_addr: states.head_addr(),
            tail_addr: states.tail_addr(),
        })
    }
761
    /// Evict n items from the buffer, calling callback on each item,
    /// and returning the total number of bytes the head advanced.
    /// This is necessary when the buffer is full, i.e., failed to allocate a new item.
    ///
    ///
    /// The call back is called on each item.
    /// The input handle gives exclusive access to the item, i.e., no other thread can deallocate/evict it.
    /// If you failed to evict the item, return Err, and eviction will release the handle and restart the eviction again.
    ///
    ///
    /// ```
    /// use bf_tree::circular_buffer::CircularBuffer;
    /// let mut buffer = CircularBuffer::new(1024 * 2, 0.1, 64, 256, 1024, 32, None, true);
    ///
    /// for _i in 0..7 {
    ///     let alloc = buffer.alloc(256).unwrap();
    ///     unsafe { *alloc.as_ptr() = 42 };
    ///     drop(alloc);
    /// }
    ///
    /// let not_allocated = buffer.alloc(400);
    /// assert!(not_allocated.is_err());
    /// drop(not_allocated);
    ///
    /// buffer.evict_n(
    ///     usize::MAX,
    ///     |h| {
    ///         let ptr = h.as_ptr();
    ///         assert_eq!(unsafe { *ptr }, 42);
    ///         Ok(h)
    ///     },
    /// );
    ///
    /// let allocated = buffer.alloc(400).unwrap();
    /// let ptr = allocated.as_ptr();
    /// drop(allocated);
    /// let v = unsafe { buffer.acquire_exclusive_dealloc_handle(ptr).unwrap() };
    /// buffer.dealloc(v);
    /// ```
    pub fn evict_n<T>(&self, n: usize, mut callback: T) -> Result<u32, CircularBufferError>
    where
        T: FnMut(TombstoneHandle) -> Result<TombstoneHandle, TombstoneHandle>,
    {
        let mut cur_n = 0;
        let mut cur_evicted = 0;
        while cur_n < n {
            // Each round evicts one entry; None means the buffer is drained.
            let evicted = self.evict_one(&mut callback);
            match evicted {
                None => return Ok(cur_evicted),
                Some(v) => {
                    cur_evicted += v;
                    cur_n += 1;
                }
            }
        }
        Ok(cur_evicted)
    }
819
820    fn get_meta(&self, logical_address: usize) -> &AllocMeta {
821        let ptr = self.logical_to_physical(logical_address);
822        self.debug_check_ptr_is_from_me(ptr);
823        let meta_ptr = ptr.cast::<AllocMeta>();
824        unsafe { &*meta_ptr }
825    }
826
    /// Recover the `AllocMeta` header that sits immediately before a data
    /// pointer handed out by [CircularBuffer::alloc].
    fn get_meta_from_data_ptr<'a>(data_ptr: *mut u8) -> &'a AllocMeta {
        // Data pointers are always 8-byte aligned: alloc rounds sizes up to a
        // multiple of the meta size.
        debug_assert_eq!(data_ptr as usize % 8, 0);
        let meta_ptr = unsafe { data_ptr.sub(CB_ALLOC_META_SIZE) } as *mut AllocMeta;
        unsafe { &*meta_ptr }
    }
832
    /// Advance the head address over entries that are already `Evicted`,
    /// stopping at the current evicting address.
    ///
    /// Returns how many bytes the head moved, or `WouldBlock` if the next
    /// entry has not reached the `Evicted` state yet.
    #[cfg_attr(feature = "tracing", tracing::instrument)]
    fn try_bump_head_address_to_evicting_addr(
        &self,
        states: &mut States,
    ) -> Result<u32, CircularBufferError> {
        let mut head_addr = states.head_addr();
        let old_addr = head_addr;
        let evicting_addr = states.evicting_addr;
        while head_addr < evicting_addr {
            let meta = self.get_meta(head_addr);
            if !meta.states.is_evicted() {
                // Someone is still evicting this entry; the head cannot pass it.
                #[cfg(all(feature = "shuttle", test))]
                {
                    shuttle::thread::yield_now();
                }
                return Err(CircularBufferError::WouldBlock);
            }

            // Skip over the whole entry: its data plus the meta header.
            let to_add = meta.size as usize + CB_ALLOC_META_SIZE;
            states.head_addr.fetch_add(to_add, Ordering::Relaxed);
            head_addr += to_add;
        }
        Ok((head_addr - old_addr) as u32)
    }
857
    /// Drain the buffer, calling the callback on each item.
    ///
    /// After everything is evicted, spins until the head catches up with the
    /// evicting address, i.e. until all three cursors meet at the tail.
    pub fn drain<T>(&self, mut callback: T)
    where
        T: FnMut(TombstoneHandle) -> Result<TombstoneHandle, TombstoneHandle>,
    {
        // Phase 1: evict every remaining element.
        loop {
            let evicted = self.evict_one(&mut callback);
            if evicted.is_none() {
                break;
            }
        }
        // Phase 2: wait for in-flight evictions to finish and bump the head.
        let backoff = Backoff::new();
        let (_lock, states) = self.lock_states();
        loop {
            if self.try_bump_head_address_to_evicting_addr(states).is_ok() {
                // Fully drained: head == evicting == tail.
                assert_eq!(states.evicting_addr, states.head_addr());
                assert_eq!(states.head_addr(), states.tail_addr());
                return;
            } else {
                backoff.snooze();
            }
        }
    }
881
882    /// Evict one element from the buffer, it never fails.
883    /// Returns the number of bytes the head advanced.
884    /// Return None if the buffer is empty.
885    ///
886    /// This is a complex function, the design goal is to not holding a lock while waiting for IO.
887    /// This is two step process:
888    /// (1) take the lock and make the reservation: bump the evicting address
889    /// (2) evict the data, potentially long running IO call.
890    /// (3) finish the reservation: bump the head address to the evicting address
891    pub fn evict_one<T>(&self, callback: &mut T) -> Option<u32>
892    where
893        T: FnMut(TombstoneHandle) -> Result<TombstoneHandle, TombstoneHandle>,
894    {
895        let (start_addr, end_addr) = {
896            let (lock, states) = self.lock_states();
897
898            let evicting_addr = states.evicting_addr;
899
900            if evicting_addr == states.tail_addr() {
901                // we are behind schedule here, should we call bump head address?
902                #[cfg(all(feature = "shuttle", test))]
903                {
904                    shuttle::thread::yield_now();
905                }
906                return None;
907            }
908
909            let evicting_meta = self.get_meta(evicting_addr);
910            let size = evicting_meta.size as usize;
911
912            let advance = size + CB_ALLOC_META_SIZE;
913
914            states.evicting_addr += advance;
915            drop(lock);
916            (evicting_addr, evicting_addr + advance)
917        };
918
919        let meta = self.get_meta(start_addr);
920        let data_ptr = meta.data_ptr();
921
922        let backoff = Backoff::new();
923
924        // evict the data using the callback, IO long running call.
925        loop {
926            let h = unsafe { self.acquire_exclusive_dealloc_handle(data_ptr) };
927            match h {
928                Ok(v) => {
929                    match callback(v) {
930                        Ok(h) => {
931                            self.dealloc_inner(h, false);
932                            meta.states.tombstone_to_evicted();
933                            break;
934                        }
935                        Err(h) => {
936                            drop(h);
937                            backoff.spin();
938                        }
939                    };
940                }
941                Err(_) => {
942                    let state = meta.states.state();
943
944                    if state == MetaState::NotReady {
945                        // do nothing and wait for the ptr to be ready.
946                    } else {
947                        if state == MetaState::Tombstone {
948                            meta.states.tombstone_to_evicted();
949                            break;
950                        }
951                        if state == MetaState::FreeListed {
952                            let found =
953                                self.free_list.find_and_remove(data_ptr, meta.size as usize);
954                            if found {
955                                meta.states.free_list_to_tombstone();
956                                meta.states.tombstone_to_evicted();
957                                break;
958                            }
959                        }
960                    }
961                    backoff.spin();
962                }
963            }
964        }
965
966        let (_lock, states) = self.lock_states();
967        _ = self.try_bump_head_address_to_evicting_addr(states);
968        Some((end_addr - start_addr) as u32)
969    }
970}
971
/// An iterator that allows you to iterate over the allocated items in the buffer, from head to tail.
/// This iterator holds a lock on the entire buffer, which prevents concurrent allocation/eviction.
/// Use it wisely.
///
/// The only proper use case I can think of is to sanity check every allocated item in the buffer.
struct AllocatedIter<'a> {
    /// Guard keeping the buffer's state lock held for the iterator's lifetime.
    _lock: MutexGuard<'a, ()>,
    /// The buffer whose allocated items are being walked.
    buffer: &'a CircularBuffer,
    /// Address of the next item to yield; advances toward `tail_addr`.
    head_addr: usize,
    /// Address at which iteration stops (one past the last allocated item).
    tail_addr: usize,
}
983
984impl<'a> Iterator for AllocatedIter<'a> {
985    type Item = &'a AllocMeta;
986
987    fn next(&mut self) -> Option<Self::Item> {
988        if self.head_addr == self.tail_addr {
989            return None;
990        }
991
992        let meta = self.buffer.get_meta(self.head_addr);
993
994        let size = meta.size as usize;
995        let advance = size + CB_ALLOC_META_SIZE;
996
997        self.head_addr += advance;
998
999        Some(meta)
1000    }
1001}
1002
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{BfTree, Config};
    use rstest::rstest;

    #[rstest]
    #[case(64, 1952, 4096)] // 1 leaf page = 1 disk page
    #[case(3072, 3072, 8192)] // 1 leaf page = 1 disk page, uniform record size
    #[case(64, 2048, 16384)] // 1 leaf page = 4 disk page
    fn test_circular_buffer_initialization(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
    ) {
        let capacity = leaf_page_size * 2; // Use a valid power of two and greater than 1024
        let buffer = CircularBuffer::new(
            capacity,
            0.1,
            min_record_size,
            max_record_size,
            leaf_page_size,
            32,
            None,
            false,
        );

        // A fresh buffer must be empty (head == tail == 0) and fully backed.
        let (_lock, states) = buffer.try_get_states().unwrap();
        assert_eq!(states.head_addr(), 0);
        assert_eq!(states.tail_addr(), 0);
        assert!(!buffer.data_ptr.is_null());
        assert_eq!(buffer.capacity, capacity);
    }

    #[rstest]
    #[case(64, 1952, 4096, false)] // 1 leaf page = 1 disk page
    #[case(3072, 3072, 8192, false)] // 1 leaf page = 1 disk page, uniform record size
    #[case(64, 2048, 16384, true)] // 1 leaf page = 4 disk page
    fn test_circular_buffer_alloc_and_dealloc(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
        #[case] pre_allocated_buffer: bool,
    ) {
        // NOTE(review): the pre-allocated buffer is never freed in this test —
        // presumably `CircularBuffer` takes ownership of it; confirm.
        let buffer_ptr = if pre_allocated_buffer {
            let layout =
                std::alloc::Layout::from_size_align(leaf_page_size * 2, BUFFER_ALIGNMENT).unwrap();
            let ptr = unsafe { std::alloc::alloc(layout) };
            Some(ptr)
        } else {
            None
        };

        let buffer = CircularBuffer::new(
            leaf_page_size * 2,
            0.1,
            min_record_size,
            max_record_size,
            leaf_page_size,
            32,
            buffer_ptr,
            false,
        );

        // Allocate a page of the smallest and the largest mini-page size class.
        let mini_page_sizes = [
            buffer.free_list.size_classes[0],
            *buffer.free_list.size_classes.last().unwrap(),
        ];

        for &size in &mini_page_sizes {
            // This size cannot be smaller than the mini-page size.
            let alloc_ptr = buffer.alloc(size).expect("Allocation failed").ptr;
            assert!(!alloc_ptr.is_null());

            unsafe {
                let p = buffer.acquire_exclusive_dealloc_handle(alloc_ptr).unwrap();
                buffer.dealloc(p);
            }

            // After dealloc the slot is either tombstoned or parked on the free list.
            let meta = CircularBuffer::get_meta_from_data_ptr(alloc_ptr);
            assert!(meta.states.is_tombstoned() || meta.states.is_freelisted());
        }
    }

    #[rstest]
    #[case(32, 1952, 4096)] // 1 leaf page = 1 disk page
    #[case(3072, 3072, 8192)] // 1 leaf page = 1 disk page, uniform record size
    #[case(64, 2048, 16384)] // 1 leaf page = 4 disk page
    fn test_circular_buffer_evict_n(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
    ) {
        let buffer = CircularBuffer::new(
            leaf_page_size * 2,
            0.1,
            min_record_size,
            max_record_size,
            leaf_page_size,
            32,
            None,
            false,
        );
        let size = buffer.free_list.size_classes[0]; // Smallest mini-page size

        // Allocate one item, then evict it.
        let _ = buffer.alloc(size).expect("Allocation failed");
        let bytes_advanced = buffer.evict_n(1, |h| Ok(h)).unwrap() as usize;

        // The head advances by the aligned payload plus the metadata header.
        assert_eq!(
            bytes_advanced,
            align_up(size, CB_ALLOC_META_SIZE) + CB_ALLOC_META_SIZE
        );
    }

    #[test]
    fn test_circular_buffer_evict_more_than_present() {
        let buffer = CircularBuffer::new(4096 * 2, 0.1, 64, 1952, 4096, 64, None, true);

        // Evicting more items than are in the buffer is a no-op, not an error.
        let bytes_advanced = buffer.evict_n(10, |h| Ok(h)).unwrap();
        assert_eq!(bytes_advanced, 0);
    }

    #[test]
    fn test_align_up_function() {
        let addr = 123;
        let align = 8;
        let aligned_addr = align_up(addr, align);

        // The result must be a multiple of the alignment...
        assert_eq!(aligned_addr % align, 0);
        // ...and must never be below the original address.
        assert!(aligned_addr >= addr);
    }

    #[test]
    fn alloc_and_evict() {
        let buffer = CircularBuffer::new(4096 * 2, 0.1, 64, 1952, 4096, 32, None, true);

        // Fill up the circular buffer
        for _ in 0..3 {
            let alloc = buffer.alloc(2048).unwrap();
            unsafe { *alloc.as_ptr() = 42 };
            drop(alloc);
        }

        // New allocation fails
        let not_allocated = buffer.alloc(2048);
        assert!(matches!(not_allocated, Err(CircularBufferError::Full)));
        drop(not_allocated);

        // Evict everything in the circular buffer, checking the payload survived.
        buffer
            .evict_n(usize::MAX, |h| {
                assert_eq!(unsafe { *(h.as_ptr()) }, 42);
                Ok(h)
            })
            .unwrap();

        // Allocation succeeds again now that space was reclaimed.
        let allocated = buffer.alloc(2048).unwrap();
        let ptr = allocated.as_ptr();
        drop(allocated);
        unsafe {
            let p = buffer.acquire_exclusive_dealloc_handle(ptr).unwrap();
            buffer.dealloc(p);
        }
    }

    #[test]
    fn identical_mini_page_classes() {
        // Create a regular bf-tree and check the BfTree's mini-page classes are
        // identical to its circular buffer's mini-page classes.
        let mut config = Config::default();
        config.cb_max_record_size(1928);

        let mut tree = BfTree::with_config(config.clone(), None);

        let a = tree.mini_page_size_classes.clone();
        let mut b = tree.storage.circular_buffer.free_list.size_classes.clone();
        b.reverse();
        assert_eq!(a, b);
        drop(tree);

        // The same invariant must hold in cache-only mode, with identical classes.
        config.cache_only = true;
        tree = BfTree::with_config(config.clone(), None);
        let c = tree.mini_page_size_classes.clone();
        let mut d = tree.storage.circular_buffer.free_list.size_classes.clone();
        d.reverse();
        assert_eq!(c, d);
        assert_eq!(a, c);

        drop(tree);
    }
}