rarena_allocator/sync.rs

1use core::{
2  fmt,
3  mem::{self, MaybeUninit},
4  ptr::{self, NonNull},
5  slice,
6};
7
8use crossbeam_utils::Backoff;
9
10use super::{common::*, sealed::Sealed, *};
11
12#[allow(unused_imports)]
13use std::boxed::Box;
14
15#[cfg(feature = "std")]
16type Memory =
17  super::memory::Memory<AtomicUsize, std::sync::Arc<std::path::PathBuf>, sealed::Header>;
18
19#[cfg(not(feature = "std"))]
20type Memory = super::memory::Memory<AtomicUsize, std::sync::Arc<()>, sealed::Header>;
21
22const SEGMENT_NODE_SIZE: usize = mem::size_of::<SegmentNode>();
23const REMOVED_SEGMENT_NODE: u32 = 0;
24
25mod sealed {
26  use super::*;
27
28  #[derive(Debug)]
29  #[repr(C, align(8))]
30  pub struct Header {
31    /// The sentinel node for the ordered free list.
32    pub(super) sentinel: SegmentNode,
33    pub(super) allocated: AtomicU32,
34    pub(super) min_segment_size: AtomicU32,
35    pub(super) discarded: AtomicU32,
36  }
37
38  impl super::super::sealed::Header for Header {
39    #[inline]
40    fn new(size: u32, min_segment_size: u32) -> Self {
41      Self {
42        allocated: AtomicU32::new(size),
43        sentinel: SegmentNode::sentinel(),
44        min_segment_size: AtomicU32::new(min_segment_size),
45        discarded: AtomicU32::new(0),
46      }
47    }
48
49    #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
50    #[inline]
51    fn load_allocated(&self) -> u32 {
52      self.allocated.load(Ordering::Acquire)
53    }
54
55    #[inline]
56    fn load_min_segment_size(&self) -> u32 {
57      self.min_segment_size.load(Ordering::Acquire)
58    }
59  }
60}
61
62#[repr(transparent)]
63struct SegmentNode {
64  /// The first 32 bits are the size of the memory,
65  /// the last 32 bits are the offset of the next segment node.
66  size_and_next: AtomicU64,
67}
68
69impl core::fmt::Debug for SegmentNode {
70  fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
71    let (size, next) = decode_segment_node(self.size_and_next.load(Ordering::Acquire));
72    f.debug_struct("SegmentNode")
73      .field("size", &size)
74      .field("next", &next)
75      .finish()
76  }
77}
78
79impl core::ops::Deref for SegmentNode {
80  type Target = AtomicU64;
81
82  #[inline]
83  fn deref(&self) -> &Self::Target {
84    &self.size_and_next
85  }
86}
87
88impl SegmentNode {
89  #[inline]
90  fn sentinel() -> Self {
91    Self {
92      size_and_next: AtomicU64::new(encode_segment_node(
93        SENTINEL_SEGMENT_NODE_OFFSET,
94        SENTINEL_SEGMENT_NODE_OFFSET,
95      )),
96    }
97  }
98}
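
// A minimal sketch of how the packed `size_and_next` word is used, assuming
// `encode_segment_node`/`decode_segment_node` (brought in by the `super` imports)
// simply pack and unpack the two `u32` halves:
//
//   let word = encode_segment_node(1024, 256); // 1024-byte segment, next node at offset 256
//   let (size, next) = decode_segment_node(word);
//   assert_eq!((size, next), (1024, 256));
//
// A size of `REMOVED_SEGMENT_NODE` (0) marks a node as logically removed, and
// `SENTINEL_SEGMENT_NODE_OFFSET` in the next field marks the end of the list.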
99
100#[derive(Debug, Copy, Clone, PartialEq, Eq)]
101struct Segment {
102  ptr: *mut u8,
103  ptr_offset: u32,
104  data_offset: u32,
105  data_size: u32,
106}
107
108impl Segment {
109  /// ## Safety
110  /// - `offset` must refer to a well-aligned, in-bounds `AtomicU64` within the arena.
111  #[inline]
112  unsafe fn from_offset(arena: &Arena, offset: u32, data_size: u32) -> Self {
113    Self {
114      ptr: arena.ptr,
115      ptr_offset: offset,
116      data_offset: offset + SEGMENT_NODE_SIZE as u32,
117      data_size,
118    }
119  }
120
121  #[inline]
122  fn as_ref(&self) -> &SegmentNode {
123    // Safety: when constructing the Segment, we have checked the ptr_offset is in bounds and well-aligned.
124    unsafe {
125      let ptr = self.ptr.add(self.ptr_offset as usize);
126      &*ptr.cast::<SegmentNode>()
127    }
128  }
129
130  #[inline]
131  fn update_next_node(&self, next: u32) {
132    self
133      .as_ref()
134      .store(encode_segment_node(self.data_size, next), Ordering::Release);
135  }
136}
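
// Illustrative layout of a freed range managed as a `Segment`
// (`SEGMENT_NODE_SIZE` is `size_of::<AtomicU64>()`, i.e. 8 bytes):
//
//   ptr_offset          data_offset = ptr_offset + SEGMENT_NODE_SIZE
//   |                   |
//   v                   v
//   +-------------------+------------------------------+
//   | SegmentNode (u64) | data_size reusable bytes     |
//   +-------------------+------------------------------+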
137
138/// An arena allocator designed to be lock-free.
139pub struct Arena {
140  ptr: *mut u8,
141  data_offset: u32,
142  reserved: usize,
143  flag: MemoryFlags,
144  max_retries: u8,
145  inner: NonNull<Memory>,
146  unify: bool,
147  magic_version: u16,
148  version: u16,
149  ro: bool,
150  cap: u32,
151  freelist: Freelist,
152  page_size: u32,
153}
154
155impl fmt::Debug for Arena {
156  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157    let header = self.header();
158    let allocated = header.allocated.load(Ordering::Acquire);
159
160    f.debug_struct("Arena")
161      .field("cap", &self.cap)
162      .field("data_offset", &self.data_offset)
163      .field("allocated", &allocated)
164      .field("discarded", &header.discarded.load(Ordering::Acquire))
165      .field("freelist", &self.freelist)
166      .field("page_size", &self.page_size)
167      .field("reserved", &self.reserved)
168      .field("magic_version", &self.magic_version)
169      .field("version", &self.version)
170      .field("read_only", &self.ro)
171      .field("unify", &self.unify)
172      .finish()
173  }
174}
175
176impl Clone for Arena {
177  fn clone(&self) -> Self {
178    unsafe {
179      let memory = self.inner.as_ref();
180
181      let old_size = memory.refs().fetch_add(1, Ordering::Release);
182      if old_size > usize::MAX >> 1 {
183        dbutils::abort();
184      }
185
186      // Safety:
187      // The ptr is always non-null, and the data is only deallocated when the
188      // last Arena is dropped.
189      Self {
190        reserved: self.reserved,
191        max_retries: self.max_retries,
192        flag: self.flag,
193        magic_version: self.magic_version,
194        version: self.version,
195        ptr: self.ptr,
196        data_offset: self.data_offset,
197        ro: self.ro,
198        inner: self.inner,
199        unify: self.unify,
200        cap: self.cap,
201        freelist: self.freelist,
202        page_size: self.page_size,
203      }
204    }
205  }
206}
207
208impl From<Memory> for Arena {
209  fn from(memory: Memory) -> Self {
210    let ptr = memory.as_mut_ptr();
211
212    Self {
213      freelist: memory.freelist(),
214      reserved: memory.reserved(),
215      cap: memory.cap(),
216      flag: memory.flag(),
217      unify: memory.unify(),
218      magic_version: memory.magic_version(),
219      version: memory.version(),
220      ptr,
221      ro: memory.read_only(),
222      max_retries: memory.maximum_retries(),
223      data_offset: memory.data_offset() as u32,
224      inner: unsafe { NonNull::new_unchecked(Box::into_raw(Box::new(memory)) as _) },
225      page_size: *PAGE_SIZE,
226    }
227  }
228}
229
230impl Sealed for Arena {
231  type Header = sealed::Header;
232  type RefCounter = AtomicUsize;
233  #[cfg(feature = "std")]
234  type PathRefCounter = std::sync::Arc<std::path::PathBuf>;
235
236  #[cfg(not(feature = "std"))]
237  type PathRefCounter = std::sync::Arc<()>;
238}
239
240impl AsRef<Memory> for Arena {
241  #[inline]
242  fn as_ref(&self) -> &Memory {
243    // Safety: the inner is always non-null.
244    unsafe { self.inner.as_ref() }
245  }
246}
247
248impl Allocator for Arena {
249  #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
250  #[cfg_attr(docsrs, doc(cfg(all(feature = "memmap", not(target_family = "wasm")))))]
251  type Path = std::sync::Arc<std::path::PathBuf>;
252
253  #[inline]
254  fn reserved_bytes(&self) -> usize {
255    self.reserved
256  }
257
258  #[inline]
259  fn reserved_slice(&self) -> &[u8] {
260    if self.reserved == 0 {
261      return &[];
262    }
263
264    // Safety: the ptr is always non-null.
265    unsafe { slice::from_raw_parts(self.ptr, self.reserved) }
266  }
267
268  #[inline]
269  unsafe fn reserved_slice_mut(&self) -> &mut [u8] {
270    unsafe {
271      if self.reserved == 0 {
272        return &mut [];
273      }
274
275      if self.ro {
276        panic!("ARENA is read-only");
277      }
278
279      slice::from_raw_parts_mut(self.ptr, self.reserved)
280    }
281  }
282
283  #[inline]
284  unsafe fn alloc<T>(&self) -> Result<RefMut<'_, T, Self>, Error> {
285    if mem::size_of::<T>() == 0 {
286      return Ok(RefMut::new_zst(self));
287    }
288
289    let allocated = self
290      .alloc_in::<T>()?
291      .expect("allocated size is not zero, but got None");
292    let ptr = unsafe { self.get_aligned_pointer_mut::<T>(allocated.memory_offset as usize) };
293    if mem::needs_drop::<T>() {
294      unsafe {
295        let ptr: *mut MaybeUninit<T> = ptr.as_ptr().cast();
296        ptr::write(ptr, MaybeUninit::uninit());
297
298        Ok(RefMut::new(ptr::read(ptr), allocated, self))
299      }
300    } else {
301      Ok(RefMut::new_inline(ptr, allocated, self))
302    }
303  }
304
305  #[inline]
306  fn alloc_aligned_bytes<T>(&self, size: u32) -> Result<BytesRefMut<'_, Self>, Error> {
307    self.alloc_aligned_bytes_in::<T>(size).map(|a| match a {
308      None => BytesRefMut::null(self),
309      Some(allocated) => unsafe { BytesRefMut::new(self, allocated) },
310    })
311  }
312
313  // #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
314  // #[cfg_attr(docsrs, doc(cfg(all(feature = "memmap", not(target_family = "wasm")))))]
315  // #[inline]
316  // fn alloc_aligned_bytes_within_page<T>(&self, size: u32) -> Result<BytesRefMut<'_, Self>, Error> {
317  //   self
318  //     .alloc_aligned_bytes_within_page_in::<T>(size)
319  //     .map(|a| match a {
320  //       None => BytesRefMut::null(self),
321  //       Some(allocated) => unsafe { BytesRefMut::new(self, allocated) },
322  //     })
323  // }
324
325  #[inline]
326  fn alloc_bytes(&self, size: u32) -> Result<BytesRefMut<'_, Self>, Error> {
327    self.alloc_bytes_in(size).map(|a| match a {
328      None => BytesRefMut::null(self),
329      Some(allocated) => unsafe { BytesRefMut::new(self, allocated) },
330    })
331  }
332
333  // #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
334  // #[cfg_attr(docsrs, doc(cfg(all(feature = "memmap", not(target_family = "wasm")))))]
335  // #[inline]
336  // fn alloc_bytes_within_page(&self, size: u32) -> Result<BytesRefMut<'_, Self>, Error> {
337  //   self.alloc_bytes_within_page_in(size).map(|a| match a {
338  //     None => BytesRefMut::null(self),
339  //     Some(allocated) => unsafe { BytesRefMut::new(self, allocated) },
340  //   })
341  // }
342
343  // #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
344  // #[cfg_attr(docsrs, doc(cfg(all(feature = "memmap", not(target_family = "wasm")))))]
345  // #[inline]
346  // unsafe fn alloc_within_page<T>(&self) -> Result<RefMut<'_, T, Self>, Error> {
347  //   if mem::size_of::<T>() == 0 {
348  //     return Ok(RefMut::new_zst(self));
349  //   }
350
351  //   let allocated = self
352  //     .alloc_within_page_in::<T>()?
353  //     .expect("allocated size is not zero, but get None");
354  //   let ptr = unsafe { self.get_aligned_pointer_mut::<T>(allocated.memory_offset as usize) };
355  //   if mem::needs_drop::<T>() {
356  //     unsafe {
357  //       let ptr: *mut MaybeUninit<T> = ptr.as_ptr().cast();
358  //       ptr::write(ptr, MaybeUninit::uninit());
359
360  //       Ok(RefMut::new(ptr::read(ptr), allocated, self))
361  //     }
362  //   } else {
363  //     Ok(RefMut::new_inline(ptr, allocated, self))
364  //   }
365  // }
366
367  #[inline]
368  fn allocated(&self) -> usize {
369    self.header().allocated.load(Ordering::Acquire) as usize
370  }
371
372  #[inline]
373  fn raw_mut_ptr(&self) -> *mut u8 {
374    self.ptr
375  }
376
377  #[inline]
378  fn raw_ptr(&self) -> *const u8 {
379    self.ptr
380  }
381
382  unsafe fn clear(&self) -> Result<(), Error> {
383    unsafe {
384      if self.ro {
385        return Err(Error::ReadOnly);
386      }
387
388      let memory = &mut *self.inner.as_ptr();
389      memory.clear();
390
391      Ok(())
392    }
393  }
394
395  #[inline]
396  unsafe fn dealloc(&self, offset: u32, size: u32) -> bool {
397    // First, try to return the memory to the main allocation region.
398    let header = self.header();
399    // If `offset + size` equals the current allocated size, we can simply roll the bump pointer back to `offset`.
400    if header
401      .allocated
402      .compare_exchange(offset + size, offset, Ordering::SeqCst, Ordering::Relaxed)
403      .is_ok()
404    {
405      return true;
406    }
407
408    match self.freelist {
409      Freelist::None => {
410        self.increase_discarded(size);
411        true
412      }
413      Freelist::Optimistic => self.optimistic_dealloc(offset, size),
414      Freelist::Pessimistic => self.pessimistic_dealloc(offset, size),
415    }
416  }
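
  // Worked example of the fast path above: with `allocated == 128`, freeing
  // `(offset = 96, size = 32)` means `offset + size == allocated`, so the CAS
  // simply rewinds the bump pointer from 128 back to 96. Any other range goes
  // through the configured freelist (or is only counted as discarded when the
  // freelist is `Freelist::None`).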
417
418  #[inline]
419  fn discard_freelist(&self) -> Result<u32, Error> {
420    if self.ro {
421      return Err(Error::ReadOnly);
422    }
423
424    Ok(match self.freelist {
425      Freelist::None => 0,
426      _ => self.discard_freelist_in(),
427    })
428  }
429
430  #[inline]
431  fn discarded(&self) -> u32 {
432    self.header().discarded.load(Ordering::Acquire)
433  }
434
435  #[inline]
436  fn increase_discarded(&self, size: u32) {
437    #[cfg(feature = "tracing")]
438    tracing::debug!("discard {size} bytes");
439
440    self.header().discarded.fetch_add(size, Ordering::Release);
441  }
442
443  #[inline]
444  fn magic_version(&self) -> u16 {
445    self.magic_version
446  }
447
448  #[inline]
449  fn minimum_segment_size(&self) -> u32 {
450    self.header().min_segment_size.load(Ordering::Acquire)
451  }
452
453  #[inline]
454  fn set_minimum_segment_size(&self, size: u32) {
455    self
456      .header()
457      .min_segment_size
458      .store(size, Ordering::Release);
459  }
460
461  #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
462  #[cfg_attr(docsrs, doc(cfg(all(feature = "memmap", not(target_family = "wasm")))))]
463  #[inline]
464  fn path(&self) -> Option<&Self::Path> {
465    // Safety: the inner is always non-null, we only deallocate it when the memory refs is 1.
466    unsafe { self.inner.as_ref().path() }
467  }
468
469  #[inline]
470  unsafe fn offset(&self, ptr: *const u8) -> usize {
471    unsafe {
472      let offset = ptr.offset_from(self.ptr);
473      offset as usize
474    }
475  }
476
477  #[inline]
478  fn page_size(&self) -> usize {
479    self.page_size as usize
480  }
481
482  #[inline]
483  fn read_only(&self) -> bool {
484    self.ro
485  }
486
487  #[inline]
488  fn refs(&self) -> usize {
489    unsafe { self.inner.as_ref().refs().load(Ordering::Acquire) }
490  }
491
492  #[inline]
493  fn remaining(&self) -> usize {
494    (self.cap as usize).saturating_sub(self.allocated())
495  }
496
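  // Worked example for `rewind` below, assuming `cap == 1024`,
  // `data_offset == 32` and `allocated == 512`:
  // - `ArenaPosition::Start(8)`      -> clamped up to `data_offset`, i.e. 32.
  // - `ArenaPosition::Current(-600)` -> negative, clamped to `data_offset` (32).
  // - `ArenaPosition::Current(600)`  -> 1112 >= cap, clamped to 1024.
  // - `ArenaPosition::End(100)`      -> 1024 - 100 = 924.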
497  unsafe fn rewind(&self, pos: ArenaPosition) {
498    let header = self.header();
499    let allocated = header.allocated.load(Ordering::Acquire);
500    let data_offset = self.data_offset;
501    let cap = self.cap;
502    let final_offset = match pos {
503      ArenaPosition::Start(offset) => offset.max(data_offset).min(cap),
504      ArenaPosition::Current(offset) => {
505        let offset = allocated as i64 + offset;
506        #[allow(clippy::comparison_chain)]
507        if offset > 0 {
508          if offset >= (cap as i64) {
509            cap
510          } else {
511            let offset = offset as u32;
512            offset.max(data_offset).min(cap)
513          }
514        } else if offset < 0 {
515          data_offset
516        } else {
517          return;
518        }
519      }
520      ArenaPosition::End(offset) => match cap.checked_sub(offset) {
521        Some(val) => val.max(data_offset),
522        None => data_offset,
523      },
524    };
525
526    header.allocated.store(final_offset, Ordering::Release);
527  }
528
529  #[inline]
530  fn version(&self) -> u16 {
531    self.version
532  }
533}
534
535unsafe impl Send for Arena {}
536unsafe impl Sync for Arena {}
537
538impl Arena {
539  #[inline]
540  fn header(&self) -> &sealed::Header {
541    // Safety:
542    // The inner is always non-null, we only deallocate it when the memory refs is 1.
543    unsafe { (*self.inner.as_ptr()).header() }
544  }
545}
546
547impl Arena {
548  /// Returns the free-list node after which the value should be inserted
549  /// (`current -> new -> next`), together with that node's currently loaded
550  /// encoded value. If the list is empty, the sentinel itself is returned.
551  fn find_position(&self, val: u32, check: impl Fn(u32, u32) -> bool) -> (u64, &AtomicU64) {
552    let header = self.header();
553    let mut current: &AtomicU64 = &header.sentinel;
554    let mut current_node = current.load(Ordering::Acquire);
555    let (mut current_node_size, mut next_offset) = decode_segment_node(current_node);
556    let backoff = Backoff::new();
557    loop {
558      // the list is empty
559      if current_node_size == SENTINEL_SEGMENT_NODE_SIZE
560        && next_offset == SENTINEL_SEGMENT_NODE_OFFSET
561      {
562        return (current_node, current);
563      }
564
565      // the current node is marked as removed and the next is the tail.
566      if current_node_size == REMOVED_SEGMENT_NODE && next_offset == SENTINEL_SEGMENT_NODE_OFFSET {
567        return (current_node, current);
568      }
569
570      if current_node_size == REMOVED_SEGMENT_NODE {
571        current = if next_offset == SENTINEL_SEGMENT_NODE_OFFSET {
572          backoff.snooze();
573          &header.sentinel
574        } else {
575          self.get_segment_node(next_offset)
576        };
577        current_node = current.load(Ordering::Acquire);
578        (current_node_size, next_offset) = decode_segment_node(current_node);
579        continue;
580      }
581
582      // the next is the tail, so we should insert the value after the current node.
583      if next_offset == SENTINEL_SEGMENT_NODE_OFFSET {
584        return (current_node, current);
585      }
586
587      let next = self.get_segment_node(next_offset);
588      let next_node = next.load(Ordering::Acquire);
589      let (next_node_size, next_next_offset) = decode_segment_node(next_node);
590      if next_node_size == REMOVED_SEGMENT_NODE {
591        backoff.snooze();
592        continue;
593      }
594
595      if check(val, next_node_size) {
596        return (current_node, current);
597      }
598
599      current = next;
600      current_node = next_node;
601      current_node_size = next_node_size;
602      next_offset = next_next_offset;
603    }
604  }
605
606  #[allow(clippy::type_complexity)]
607  fn find_prev_and_next(
608    &self,
609    val: u32,
610    check: impl Fn(u32, u32) -> bool,
611  ) -> Option<((u64, &AtomicU64), (u64, &AtomicU64))> {
612    let header = self.header();
613    let mut current: &AtomicU64 = &header.sentinel;
614    let mut current_node = current.load(Ordering::Acquire);
615    let (mut current_node_size, mut next_offset) = decode_segment_node(current_node);
616    let backoff = Backoff::new();
617    loop {
618      // the list is empty
619      if current_node_size == SENTINEL_SEGMENT_NODE_SIZE
620        && next_offset == SENTINEL_SEGMENT_NODE_OFFSET
621      {
622        return None;
623      }
624
625      // the current node is marked as removed and the next is the tail.
626      if current_node_size == REMOVED_SEGMENT_NODE && next_offset == SENTINEL_SEGMENT_NODE_OFFSET {
627        return None;
628      }
629
630      if current_node_size == REMOVED_SEGMENT_NODE {
631        current = if next_offset == SENTINEL_SEGMENT_NODE_OFFSET {
632          return None;
633        } else {
634          self.get_segment_node(next_offset)
635        };
636        current_node = current.load(Ordering::Acquire);
637        (current_node_size, next_offset) = decode_segment_node(current_node);
638        continue;
639      }
640
641      // the next is the tail
642      if next_offset == SENTINEL_SEGMENT_NODE_OFFSET {
643        return None;
644      }
645
646      let next = self.get_segment_node(next_offset);
647      let next_node = next.load(Ordering::Acquire);
648      let (next_node_size, next_next_offset) = decode_segment_node(next_node);
649
650      if check(val, next_node_size) {
651        if next_node_size == REMOVED_SEGMENT_NODE {
652          backoff.snooze();
653          continue;
654        }
655
656        return Some(((current_node, current), (next_node, next)));
657      }
658
659      current = self.get_segment_node(next_offset);
660      current_node = next_node;
661      current_node_size = next_node_size;
662      next_offset = next_next_offset;
663    }
664  }
665
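  // How the two freelist flavours use the helpers above:
  // - `Freelist::Optimistic` keeps the list ordered from largest to smallest
  //   (`val >= next_node_size`) and always allocates from the head, i.e. the
  //   largest segment.
  // - `Freelist::Pessimistic` keeps the list ordered from smallest to largest
  //   (`val <= next_node_size`) and allocates from the first segment that is
  //   large enough (a best-fit strategy).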
666  fn optimistic_dealloc(&self, offset: u32, size: u32) -> bool {
667    // check if the freed range is large enough to hold a new segment node.
668    let Some(segment_node) = self.try_new_segment(offset, size) else {
669      return false;
670    };
671
672    let backoff = Backoff::new();
673
674    loop {
675      let (current_node_size_and_next_node_offset, current) = self
676        .find_position(segment_node.data_size, |val, next_node_size| {
677          val >= next_node_size
678        });
679      let (node_size, next_node_offset) =
680        decode_segment_node(current_node_size_and_next_node_offset);
681
682      if node_size == REMOVED_SEGMENT_NODE {
683        // wait for other threads to make progress.
684        backoff.snooze();
685        continue;
686      }
687
688      if segment_node.ptr_offset == next_node_offset {
689        // we found ourselves, so we need to re-find the position.
690        backoff.snooze();
691        continue;
692      }
693
694      segment_node.update_next_node(next_node_offset);
695
696      match current.compare_exchange(
697        current_node_size_and_next_node_offset,
698        encode_segment_node(node_size, segment_node.ptr_offset),
699        Ordering::AcqRel,
700        Ordering::Relaxed,
701      ) {
702        Ok(_) => {
703          #[cfg(feature = "tracing")]
704          tracing::debug!(
705            "create segment node ({} bytes) at {}, next segment {next_node_offset}",
706            segment_node.data_size,
707            segment_node.ptr_offset
708          );
709
710          self.increase_discarded(segment_node.data_offset - segment_node.ptr_offset);
711          return true;
712        }
713        Err(current) => {
714          let (size, _) = decode_segment_node(current);
715          // the current node was removed from the list, so we need to re-find the position.
716          if size == REMOVED_SEGMENT_NODE {
717            // wait for other threads to make progress.
718            backoff.snooze();
719          } else {
720            backoff.spin();
721          }
722        }
723      }
724    }
725  }
726
727  fn pessimistic_dealloc(&self, offset: u32, size: u32) -> bool {
728    // check if the freed range is large enough to hold a new segment node.
729    let Some(segment_node) = self.try_new_segment(offset, size) else {
730      return false;
731    };
732
733    let backoff = Backoff::new();
734
735    loop {
736      let (current_node_size_and_next_node_offset, current) = self
737        .find_position(segment_node.data_size, |val, next_node_size| {
738          val <= next_node_size
739        });
740      let (node_size, next_node_offset) =
741        decode_segment_node(current_node_size_and_next_node_offset);
742
743      if node_size == REMOVED_SEGMENT_NODE {
744        // wait for other threads to make progress.
745        backoff.snooze();
746        continue;
747      }
748
749      if segment_node.ptr_offset == next_node_offset {
750        // we found ourselves, so we need to re-find the position.
751        backoff.snooze();
752        continue;
753      }
754
755      segment_node.update_next_node(next_node_offset);
756
757      match current.compare_exchange(
758        current_node_size_and_next_node_offset,
759        encode_segment_node(node_size, segment_node.ptr_offset),
760        Ordering::AcqRel,
761        Ordering::Relaxed,
762      ) {
763        Ok(_) => {
764          #[cfg(feature = "tracing")]
765          tracing::debug!(
766            "create segment node ({} bytes) at {}, next segment {next_node_offset}",
767            segment_node.data_size,
768            segment_node.ptr_offset
769          );
770
771          self.increase_discarded(segment_node.data_offset - segment_node.ptr_offset);
772          return true;
773        }
774        Err(current) => {
775          let (size, _) = decode_segment_node(current);
776          // the current node was removed from the list, so we need to re-find the position.
777          if size == REMOVED_SEGMENT_NODE {
778            // wait for other threads to make progress.
779            backoff.snooze();
780          } else {
781            backoff.spin();
782          }
783        }
784      }
785    }
786  }
787
788  fn alloc_bytes_in(&self, size: u32) -> Result<Option<Meta>, Error> {
789    if self.ro {
790      return Err(Error::ReadOnly);
791    }
792
793    if size == 0 {
794      return Ok(None);
795    }
796    let header = self.header();
797    let mut allocated = header.allocated.load(Ordering::Acquire);
798
799    loop {
800      let want = allocated + size;
801      if want > self.cap {
802        break;
803      }
804
805      match header.allocated.compare_exchange_weak(
806        allocated,
807        want,
808        Ordering::SeqCst,
809        Ordering::Acquire,
810      ) {
811        Ok(offset) => {
812          #[cfg(feature = "tracing")]
813          tracing::debug!("allocate {} bytes at offset {} from memory", size, offset);
814
815          let allocated = Meta::new(self.ptr as _, offset, size);
816          unsafe { allocated.clear(self) };
817          return Ok(Some(allocated));
818        }
819        Err(x) => allocated = x,
820      }
821    }
822
823    // allocate through slow path
824    let mut i = 0;
825
826    loop {
827      match self.freelist {
828        Freelist::None => {
829          return Err(Error::InsufficientSpace {
830            requested: size,
831            available: self.remaining() as u32,
832          });
833        }
834        Freelist::Optimistic => match self.alloc_slow_path_optimistic(size) {
835          Ok(bytes) => return Ok(Some(bytes)),
836          Err(e) => {
837            if i == self.max_retries - 1 {
838              return Err(e);
839            }
840          }
841        },
842        Freelist::Pessimistic => match self.alloc_slow_path_pessimistic(size) {
843          Ok(bytes) => return Ok(Some(bytes)),
844          Err(e) => {
845            if i == self.max_retries - 1 {
846              return Err(e);
847            }
848          }
849        },
850      }
851
852      i += 1;
853    }
854  }
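
  // Fast-path example for `alloc_bytes_in` above: with `allocated == 100` and
  // `size == 28`, the CAS tries to move `allocated` from 100 to 128; on
  // success the caller owns the 28 bytes at offsets [100, 128). Only when
  // `allocated + size` would exceed `cap` do we fall through to the freelist
  // slow path (or return an error when no freelist is configured).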
855
856  // #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
857  // fn alloc_bytes_within_page_in(&self, size: u32) -> Result<Option<Meta>, Error> {
858  //   if self.ro {
859  //     return Err(Error::ReadOnly);
860  //   }
861
862  //   if size == 0 {
863  //     return Ok(None);
864  //   }
865
866  //   if size > self.page_size {
867  //     return Err(Error::larger_than_page_size(size, self.page_size));
868  //   }
869
870  //   let header = self.header();
871  //   let mut allocated = header.allocated.load(Ordering::Acquire);
872  //   let mut padding_to_next_page = 0;
873  //   let want = loop {
874  //     let page_boundary = self.nearest_page_boundary(allocated);
875  //     let mut want = allocated + size;
876
877  //     // Ensure that the allocation will fit within page
878  //     if want > page_boundary {
879  //       // Adjust the allocation to start at the next page boundary
880  //       padding_to_next_page = page_boundary - allocated;
881  //       want += padding_to_next_page;
882  //     }
883
884  //     if want > self.cap {
885  //       break want;
886  //     }
887
888  //     match header.allocated.compare_exchange_weak(
889  //       allocated,
890  //       want,
891  //       Ordering::SeqCst,
892  //       Ordering::Acquire,
893  //     ) {
894  //       Ok(offset) => {
895  //         #[cfg(feature = "tracing")]
896  //         tracing::debug!(
897  //           "allocate {} bytes at offset {} from memory",
898  //           size + padding_to_next_page,
899  //           offset
900  //         );
901
902  //         let mut allocated = Meta::new(self.ptr as _, offset, size + padding_to_next_page);
903  //         allocated.ptr_offset = allocated.memory_offset + padding_to_next_page;
904  //         allocated.ptr_size = size;
905  //         unsafe { allocated.clear(self) };
906
907  //         #[cfg(all(test, feature = "memmap", not(target_family = "wasm")))]
908  //         self.check_page_boundary(allocated.ptr_offset, allocated.ptr_size);
909
910  //         return Ok(Some(allocated));
911  //       }
912  //       Err(x) => allocated = x,
913  //     }
914  //   };
915
916  //   Err(Error::InsufficientSpace {
917  //     requested: want - allocated,
918  //     available: self.remaining() as u32,
919  //   })
920  // }
921
922  // #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
923  // #[inline]
924  // fn nearest_page_boundary(&self, offset: u32) -> u32 {
925  //   // Calculate the nearest page boundary after the offset
926  //   let remainder = offset % self.page_size;
927  //   if remainder == 0 {
928  //     offset // Already at a page boundary
929  //   } else {
930  //     offset + (self.page_size - remainder)
931  //   }
932  // }
933
934  fn alloc_aligned_bytes_in<T>(&self, extra: u32) -> Result<Option<Meta>, Error> {
935    if self.ro {
936      return Err(Error::ReadOnly);
937    }
938
939    if mem::size_of::<T>() == 0 {
940      return self.alloc_bytes_in(extra);
941    }
942
943    let header = self.header();
944    let mut allocated = header.allocated.load(Ordering::Acquire);
945
946    let want = loop {
947      let aligned_offset = align_offset::<T>(allocated);
948      let size = mem::size_of::<T>() as u32;
949      let want = aligned_offset + size + extra;
950      if want > self.cap {
951        break size + extra;
952      }
953
954      match header.allocated.compare_exchange_weak(
955        allocated,
956        want,
957        Ordering::SeqCst,
958        Ordering::Acquire,
959      ) {
960        Ok(offset) => {
961          let mut allocated = Meta::new(self.ptr as _, offset, want - offset);
962          allocated.align_bytes_to::<T>();
963          #[cfg(feature = "tracing")]
964          tracing::debug!(
965            "allocate {} bytes at offset {} from memory",
966            want - offset,
967            offset
968          );
969          return Ok(Some(allocated));
970        }
971        Err(x) => allocated = x,
972      }
973    };
974
975    // allocate through slow path
976    let mut i = 0;
977    loop {
978      match self.freelist {
979        Freelist::None => {
980          return Err(Error::InsufficientSpace {
981            requested: want,
982            available: self.remaining() as u32,
983          });
984        }
985        Freelist::Optimistic => {
986          match self.alloc_slow_path_optimistic(Self::pad::<T>() as u32 + extra) {
987            Ok(mut bytes) => {
988              bytes.align_bytes_to::<T>();
989              return Ok(Some(bytes));
990            }
991            Err(e) => {
992              if i == self.max_retries - 1 {
993                return Err(e);
994              }
995            }
996          }
997        }
998        Freelist::Pessimistic => {
999          match self.alloc_slow_path_pessimistic(Self::pad::<T>() as u32 + extra) {
1000            Ok(mut bytes) => {
1001              bytes.align_bytes_to::<T>();
1002              return Ok(Some(bytes));
1003            }
1004            Err(e) => {
1005              if i == self.max_retries - 1 {
1006                return Err(e);
1007              }
1008            }
1009          }
1010        }
1011      }
1012      i += 1;
1013    }
1014  }
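
  // Worked example for the aligned fast path above (assuming `align_offset`
  // rounds up to `T`'s alignment): for `T = u64` (align 8) with
  // `allocated == 100` and `extra == 16`, the aligned offset is 104, so
  // `want = 104 + 8 + 16 = 128`. A successful CAS claims [100, 128), and
  // `align_bytes_to::<u64>()` then aligns the returned bytes within that range.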
1015
1016  // #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
1017  // fn alloc_aligned_bytes_within_page_in<T>(&self, extra: u32) -> Result<Option<Meta>, Error> {
1018  //   if self.ro {
1019  //     return Err(Error::ReadOnly);
1020  //   }
1021
1022  //   let t_size = mem::size_of::<T>();
1023
1024  //   if t_size == 0 {
1025  //     return self.alloc_bytes_within_page_in(extra);
1026  //   }
1027
1028  //   let header = self.header();
1029  //   let mut allocated = header.allocated.load(Ordering::Acquire);
1030  //   let want = loop {
1031  //     let page_boundary = self.nearest_page_boundary(allocated);
1032  //     let mut aligned_offset = align_offset::<T>(allocated);
1033  //     let size = t_size as u32;
1034  //     let mut want = aligned_offset + size + extra;
1035  //     let mut estimated_size = want - allocated;
1036
1037  //     // Ensure that the allocation will fit within page
1038  //     if want > page_boundary {
1039  //       aligned_offset = align_offset::<T>(page_boundary);
1040  //       want = aligned_offset + size + extra;
1041  //       estimated_size = (aligned_offset - page_boundary) + size + extra;
1042  //     }
1043
1044  //     if estimated_size > self.page_size {
1045  //       return Err(Error::larger_than_page_size(estimated_size, self.page_size));
1046  //     }
1047
1048  //     if want > self.cap {
1049  //       break want;
1050  //     }
1051
1052  //     match header.allocated.compare_exchange_weak(
1053  //       allocated,
1054  //       want,
1055  //       Ordering::SeqCst,
1056  //       Ordering::Acquire,
1057  //     ) {
1058  //       Ok(offset) => {
1059  //         let mut allocated = Meta::new(self.ptr as _, offset, want - offset);
1060  //         allocated.ptr_offset = aligned_offset;
1061  //         allocated.ptr_size = size + extra;
1062  //         unsafe { allocated.clear(self) };
1063
1064  //         #[cfg(all(test, feature = "memmap", not(target_family = "wasm")))]
1065  //         self.check_page_boundary(allocated.ptr_offset, allocated.ptr_size);
1066
1067  //         #[cfg(feature = "tracing")]
1068  //         tracing::debug!(
1069  //           "allocate {} bytes at offset {} from memory",
1070  //           want - offset,
1071  //           offset
1072  //         );
1073  //         return Ok(Some(allocated));
1074  //       }
1075  //       Err(x) => allocated = x,
1076  //     }
1077  //   };
1078
1079  //   Err(Error::InsufficientSpace {
1080  //     requested: want,
1081  //     available: self.remaining() as u32,
1082  //   })
1083  // }
1084
1085  fn alloc_in<T>(&self) -> Result<Option<Meta>, Error> {
1086    if self.ro {
1087      return Err(Error::ReadOnly);
1088    }
1089
1090    let t_size = mem::size_of::<T>();
1091    if t_size == 0 {
1092      return Ok(None);
1093    }
1094
1095    let header = self.header();
1096    let mut allocated = header.allocated.load(Ordering::Acquire);
1097    let want = loop {
1098      let align_offset = align_offset::<T>(allocated);
1099      let size = t_size as u32;
1100      let want = align_offset + size;
1101      if want > self.cap {
1102        break size;
1103      }
1104
1105      match header.allocated.compare_exchange_weak(
1106        allocated,
1107        want,
1108        Ordering::SeqCst,
1109        Ordering::Acquire,
1110      ) {
1111        Ok(offset) => {
1112          let mut allocated = Meta::new(self.ptr as _, offset, want - offset);
1113          allocated.align_to::<T>();
1114
1115          #[cfg(feature = "tracing")]
1116          tracing::debug!(
1117            "allocate {} bytes at offset {} from memory",
1118            want - offset,
1119            offset
1120          );
1121
1122          unsafe { allocated.clear(self) };
1123          return Ok(Some(allocated));
1124        }
1125        Err(x) => allocated = x,
1126      }
1127    };
1128
1129    // allocate through slow path
1130    let mut i = 0;
1131
1132    loop {
1133      match self.freelist {
1134        Freelist::None => {
1135          return Err(Error::InsufficientSpace {
1136            requested: want,
1137            available: self.remaining() as u32,
1138          });
1139        }
1140        Freelist::Optimistic => match self.alloc_slow_path_optimistic(Self::pad::<T>() as u32) {
1141          Ok(mut allocated) => {
1142            allocated.align_to::<T>();
1143            return Ok(Some(allocated));
1144          }
1145          Err(e) => {
1146            if i == self.max_retries - 1 {
1147              return Err(e);
1148            }
1149          }
1150        },
1151        Freelist::Pessimistic => match self.alloc_slow_path_pessimistic(Self::pad::<T>() as u32) {
1152          Ok(mut allocated) => {
1153            allocated.align_to::<T>();
1154            return Ok(Some(allocated));
1155          }
1156          Err(e) => {
1157            if i == self.max_retries - 1 {
1158              return Err(e);
1159            }
1160          }
1161        },
1162      }
1163
1164      i += 1;
1165    }
1166  }
1167
1168  // #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
1169  // fn alloc_within_page_in<T>(&self) -> Result<Option<Meta>, Error> {
1170  //   if self.ro {
1171  //     return Err(Error::ReadOnly);
1172  //   }
1173
1174  //   let t_size = mem::size_of::<T>();
1175
1176  //   if t_size == 0 {
1177  //     return Ok(None);
1178  //   }
1179
1180  //   if t_size as u32 > self.page_size {
1181  //     return Err(Error::larger_than_page_size(t_size as u32, self.page_size));
1182  //   }
1183
1184  //   let header = self.header();
1185  //   let mut allocated = header.allocated.load(Ordering::Acquire);
1186  //   let want = loop {
1187  //     let page_boundary = self.nearest_page_boundary(allocated);
1188  //     let mut aligned_offset = align_offset::<T>(allocated);
1189  //     let size = mem::size_of::<T>() as u32;
1190  //     let mut want = aligned_offset + size;
1191  //     let mut estimated_size = want - allocated;
1192
1193  //     // Ensure that the allocation will fit within page
1194  //     if want > page_boundary {
1195  //       aligned_offset = align_offset::<T>(page_boundary);
1196  //       want = aligned_offset + size;
1197  //       estimated_size = (aligned_offset - page_boundary) + size;
1198  //     }
1199
1200  //     if estimated_size > self.page_size {
1201  //       return Err(Error::larger_than_page_size(estimated_size, self.page_size));
1202  //     }
1203
1204  //     if want > self.cap {
1205  //       break want;
1206  //     }
1207
1208  //     match header.allocated.compare_exchange_weak(
1209  //       allocated,
1210  //       want,
1211  //       Ordering::SeqCst,
1212  //       Ordering::Acquire,
1213  //     ) {
1214  //       Ok(offset) => {
1215  //         let mut allocated = Meta::new(self.ptr as _, offset, want - offset);
1216  //         allocated.ptr_offset = aligned_offset;
1217  //         allocated.ptr_size = size;
1218  //         unsafe { allocated.clear(self) };
1219
1220  //         #[cfg(all(test, feature = "memmap", not(target_family = "wasm")))]
1221  //         self.check_page_boundary(allocated.ptr_offset, allocated.ptr_size);
1222
1223  //         #[cfg(feature = "tracing")]
1224  //         tracing::debug!(
1225  //           "allocate {} bytes at offset {} from memory",
1226  //           want - offset,
1227  //           offset
1228  //         );
1229
1230  //         unsafe { allocated.clear(self) };
1231  //         return Ok(Some(allocated));
1232  //       }
1233  //       Err(x) => allocated = x,
1234  //     }
1235  //   };
1236
1237  //   Err(Error::InsufficientSpace {
1238  //     requested: want,
1239  //     available: self.remaining() as u32,
1240  //   })
1241  // }
1242
1243  fn alloc_slow_path_pessimistic(&self, size: u32) -> Result<Meta, Error> {
1244    if self.ro {
1245      return Err(Error::ReadOnly);
1246    }
1247
1248    let backoff = Backoff::new();
1249
1250    loop {
1251      let Some(((prev_node_val, prev_node), (next_node_val, next_node))) =
1252        self.find_prev_and_next(size, |val, next_node_size| val <= next_node_size)
1253      else {
1254        return Err(Error::InsufficientSpace {
1255          requested: size,
1256          available: self.remaining() as u32,
1257        });
1258      };
1259
1260      let (prev_node_size, next_node_offset) = decode_segment_node(prev_node_val);
1261      if prev_node_size == REMOVED_SEGMENT_NODE {
1262        // the current node is marked as removed, wait for other threads to make progress.
1263        backoff.snooze();
1264        continue;
1265      }
1266
1267      let (next_node_size, next_next_node_offset) = decode_segment_node(next_node_val);
1268      if next_node_size == REMOVED_SEGMENT_NODE {
1269        // the current node is marked as removed, wait for other threads to make progress.
1270        backoff.snooze();
1271        continue;
1272      }
1273
1274      // mark next node as removed
1275      let removed_next = encode_segment_node(REMOVED_SEGMENT_NODE, next_next_node_offset);
1276      if next_node
1277        .compare_exchange(
1278          next_node_val,
1279          removed_next,
1280          Ordering::AcqRel,
1281          Ordering::Relaxed,
1282        )
1283        .is_err()
1284      {
1285        // wait for other threads to make progress.
1286        backoff.snooze();
1287        continue;
1288      }
1289
1290      let remaining = next_node_size - size;
1291
1292      let segment_node = unsafe { Segment::from_offset(self, next_node_offset, next_node_size) };
1293
1294      // update the prev node to point to the next next node.
1295      let updated_prev = encode_segment_node(prev_node_size, next_next_node_offset);
1296      match prev_node.compare_exchange(
1297        prev_node_val,
1298        updated_prev,
1299        Ordering::AcqRel,
1300        Ordering::Relaxed,
1301      ) {
1302        Ok(_) => {
1303          #[cfg(feature = "tracing")]
1304          tracing::debug!(
1305            "allocate {} bytes at offset {} from segment",
1306            size,
1307            next_node_offset
1308          );
1309
1310          let mut memory_size = next_node_size;
1311          let data_end_offset = segment_node.data_offset + size;
1312          // check if the remaining is enough to allocate a new segment.
1313          if self.validate_segment(data_end_offset, remaining) {
1314            memory_size -= remaining;
1315            // We have successfully removed the segment node from the list,
1316            // so we can allocate the memory and give the remaining bytes
1317            // back to the free list.
1318
1319            // Safety: the `next + size` is in bounds, and `node_size - size` is also in bounds.
1320            self.pessimistic_dealloc(data_end_offset, remaining);
1321          }
1322
1323          let mut allocated = Meta::new(self.ptr as _, segment_node.ptr_offset, memory_size);
1324          allocated.ptr_offset = segment_node.data_offset;
1325          allocated.ptr_size = size;
1326          unsafe {
1327            allocated.clear(self);
1328          }
1329          return Ok(allocated);
1330        }
1331        Err(current) => {
1332          let (node_size, _) = decode_segment_node(current);
1333          if node_size == REMOVED_SEGMENT_NODE {
1334            // the current node is marked as removed, wait for other threads to make progress.
1335            backoff.snooze();
1336          } else {
1337            backoff.spin();
1338          }
1339        }
1340      }
1341    }
1342  }
1343
1344  /// Like a pop operation: we always allocate from the largest (head) segment.
1345  fn alloc_slow_path_optimistic(&self, size: u32) -> Result<Meta, Error> {
1346    if self.ro {
1347      return Err(Error::ReadOnly);
1348    }
1349
1350    let backoff = Backoff::new();
1351    let header = self.header();
1352
1353    loop {
1354      let sentinel = header.sentinel.load(Ordering::Acquire);
1355      let (sentinel_node_size, head_node_offset) = decode_segment_node(sentinel);
1356
1357      // free list is empty
1358      if sentinel_node_size == SENTINEL_SEGMENT_NODE_SIZE
1359        && head_node_offset == SENTINEL_SEGMENT_NODE_OFFSET
1360      {
1361        return Err(Error::InsufficientSpace {
1362          requested: size,
1363          available: self.remaining() as u32,
1364        });
1365      }
1366
1367      if head_node_offset == REMOVED_SEGMENT_NODE {
1368        // the head node is marked as removed, wait for other threads to make progress.
1369        backoff.snooze();
1370        continue;
1371      }
1372
1373      let head = self.get_segment_node(head_node_offset);
1374      let head_node_size_and_next_node_offset = head.load(Ordering::Acquire);
1375      let (head_node_size, next_node_offset) =
1376        decode_segment_node(head_node_size_and_next_node_offset);
1377
1378      if head_node_size == REMOVED_SEGMENT_NODE {
1379        // the head node is marked as removed, wait for other threads to make progress.
1380        backoff.snooze();
1381        continue;
1382      }
1383
1384      // The largest segment does not have enough space for this allocation, so just return an error.
1385      if size > head_node_size {
1386        return Err(Error::InsufficientSpace {
1387          requested: size,
1388          available: head_node_size,
1389        });
1390      }
1391
1392      let remaining = head_node_size - size;
1393
1394      // Safety: the `next` and `node_size` are valid, because they just come from the sentinel.
1395      let segment_node = unsafe { Segment::from_offset(self, head_node_offset, head_node_size) };
1396
1397      if head_node_size == REMOVED_SEGMENT_NODE {
1398        // the head node is marked as removed, wait for other threads to make progress.
1399        backoff.snooze();
1400        continue;
1401      }
1402
1403      // CAS to remove the current head
1404      let removed_head = encode_segment_node(REMOVED_SEGMENT_NODE, next_node_offset);
1405      if head
1406        .compare_exchange(
1407          head_node_size_and_next_node_offset,
1408          removed_head,
1409          Ordering::AcqRel,
1410          Ordering::Relaxed,
1411        )
1412        .is_err()
1413      {
1414        // wait for other threads to make progress.
1415        backoff.snooze();
1416        continue;
1417      }
1418
1419      // We have successfully marked the head as removed; now make the sentinel node point to the next node.
1420      match header.sentinel.compare_exchange(
1421        sentinel,
1422        encode_segment_node(sentinel_node_size, next_node_offset),
1423        Ordering::AcqRel,
1424        Ordering::Relaxed,
1425      ) {
1426        Ok(_) => {
1427          #[cfg(feature = "tracing")]
1428          tracing::debug!(
1429            "allocate {} bytes at offset {} from segment",
1430            size,
1431            segment_node.data_offset
1432          );
1433
1434          let mut memory_size = head_node_size;
1435          let data_end_offset = segment_node.data_offset + size;
1436          // check if the remaining is enough to allocate a new segment.
1437          if self.validate_segment(data_end_offset, remaining) {
1438            memory_size -= remaining;
1439            // We have successfully removed the head node from the list,
1440            // so we can allocate the memory and give the remaining bytes
1441            // back to the free list.
1442
1443            // Safety: the `next + size` is in bounds, and `node_size - size` is also in bounds.
1444            self.optimistic_dealloc(data_end_offset, remaining);
1445          }
1446
1447          let mut allocated = Meta::new(self.ptr as _, segment_node.ptr_offset, memory_size);
1448          allocated.ptr_offset = segment_node.data_offset;
1449          allocated.ptr_size = size;
1450          unsafe {
1451            allocated.clear(self);
1452          }
1453          return Ok(allocated);
1454        }
1455        Err(current) => {
1456          let (node_size, _) = decode_segment_node(current);
1457          if node_size == REMOVED_SEGMENT_NODE {
1458            // The current head was removed from the list, wait for other threads to make progress.
1459            backoff.snooze();
1460          } else {
1461            backoff.spin();
1462          }
1463        }
1464      }
1465    }
1466  }
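
  // The head removal in `alloc_slow_path_optimistic` above (and in
  // `discard_freelist_in` below) is the usual two-step lock-free unlink:
  // (1) CAS the head node's own word to carry the `REMOVED_SEGMENT_NODE`
  // marker so concurrent traversals skip it, then (2) CAS the sentinel to
  // point at the head's successor. A thread that loses either CAS backs off
  // and retries.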
1467
1468  fn discard_freelist_in(&self) -> u32 {
1469    let backoff = Backoff::new();
1470    let header = self.header();
1471    let mut discarded = 0;
1472    loop {
1473      let sentinel = header.sentinel.load(Ordering::Acquire);
1474      let (sentinel_node_size, head_node_offset) = decode_segment_node(sentinel);
1475
1476      // free list is empty
1477      if sentinel_node_size == SENTINEL_SEGMENT_NODE_SIZE
1478        && head_node_offset == SENTINEL_SEGMENT_NODE_OFFSET
1479      {
1480        return discarded;
1481      }
1482
1483      if head_node_offset == REMOVED_SEGMENT_NODE {
1484        // the head node is marked as removed, wait for other threads to make progress.
1485        backoff.snooze();
1486        continue;
1487      }
1488
1489      let head = self.get_segment_node(head_node_offset);
1490      let head_node_size_and_next_node_offset = head.load(Ordering::Acquire);
1491      let (head_node_size, next_node_offset) =
1492        decode_segment_node(head_node_size_and_next_node_offset);
1493
1494      if head_node_size == REMOVED_SEGMENT_NODE {
1495        // the head node is marked as removed, wait for other threads to make progress.
1496        backoff.snooze();
1497        continue;
1498      }
1499
1500      // Safety: the `next` and `node_size` are valid, because they just come from the sentinel.
1501      let segment_node = unsafe { Segment::from_offset(self, head_node_offset, head_node_size) };
1502
1503      if head_node_size == REMOVED_SEGMENT_NODE {
1504        // the head node is marked as removed, wait for other threads to make progress.
1505        backoff.snooze();
1506        continue;
1507      }
1508
1509      // CAS to remove the current head
1510      let removed_head = encode_segment_node(REMOVED_SEGMENT_NODE, next_node_offset);
1511      if head
1512        .compare_exchange(
1513          head_node_size_and_next_node_offset,
1514          removed_head,
1515          Ordering::AcqRel,
1516          Ordering::Relaxed,
1517        )
1518        .is_err()
1519      {
1520        // wait for other threads to make progress.
1521        backoff.snooze();
1522        continue;
1523      }
1524
1525      // We have successfully marked the head as removed; now make the sentinel node point to the next node.
1526      match header.sentinel.compare_exchange(
1527        sentinel,
1528        encode_segment_node(sentinel_node_size, next_node_offset),
1529        Ordering::AcqRel,
1530        Ordering::Relaxed,
1531      ) {
1532        Ok(_) => {
1533          // increase the discarded memory.
1534          self.increase_discarded(segment_node.data_size);
1535          discarded += segment_node.data_size;
1536          continue;
1537        }
1538        Err(current) => {
1539          let (node_size, _) = decode_segment_node(current);
1540          if node_size == REMOVED_SEGMENT_NODE {
1541            // The current head was removed from the list, wait for other threads to make progress.
1542            backoff.snooze();
1543          } else {
1544            backoff.spin();
1545          }
1546        }
1547      }
1548    }
1549  }
1550
1551  /// Returns `true` if this offset and size are valid for a segment node.
1552  #[inline]
1553  fn validate_segment(&self, offset: u32, size: u32) -> bool {
1554    if offset == 0 || size == 0 {
1555      return false;
1556    }
1557
1558    let aligned_offset = align_offset::<AtomicU64>(offset) as usize;
1559    let padding = aligned_offset - offset as usize;
1560    let segmented_node_size = padding + SEGMENT_NODE_SIZE;
1561    if segmented_node_size >= size as usize {
1562      return false;
1563    }
1564
1565    let available_bytes = size - segmented_node_size as u32;
1566    if available_bytes < self.header().min_segment_size.load(Ordering::Acquire) {
1567      return false;
1568    }
1569
1570    true
1571  }
1572
1573  #[inline]
1574  fn try_new_segment(&self, offset: u32, size: u32) -> Option<Segment> {
1575    if offset == 0 || size == 0 {
1576      return None;
1577    }
1578
1579    let aligned_offset = align_offset::<AtomicU64>(offset) as usize;
1580    let padding = aligned_offset - offset as usize;
1581    let segmented_node_size = padding + SEGMENT_NODE_SIZE;
1582    if segmented_node_size >= size as usize {
1583      self.increase_discarded(size);
1584      return None;
1585    }
1586
1587    let available_bytes = size - segmented_node_size as u32;
1588    if available_bytes < self.header().min_segment_size.load(Ordering::Acquire) {
1589      self.increase_discarded(size);
1590      return None;
1591    }
1592
1593    Some(Segment {
1594      ptr: self.ptr,
1595      ptr_offset: aligned_offset as u32,
1596      data_offset: (aligned_offset + SEGMENT_NODE_SIZE) as u32,
1597      data_size: available_bytes,
1598    })
1599  }
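
  // Worked example for `try_new_segment` above, assuming `align_offset`
  // rounds up to the next multiple of `align_of::<AtomicU64>()` (8): freeing
  // `(offset = 101, size = 64)` gives `aligned_offset = 104`, `padding = 3`
  // and a node-plus-padding cost of 11 bytes, leaving `data_size = 53`. The
  // resulting segment is `{ ptr_offset: 104, data_offset: 112, data_size: 53 }`,
  // provided 53 is at least the configured minimum segment size.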
1600
1601  #[inline]
1602  fn get_segment_node(&self, offset: u32) -> &AtomicU64 {
1603    // Safety: the offset is in bounds and well aligned.
1604    unsafe {
1605      let ptr = self.ptr.add(offset as usize);
1606      &*(ptr as *const _)
1607    }
1608  }
1609
1610  #[inline]
1611  fn pad<T>() -> usize {
1612    let size = mem::size_of::<T>();
1613    let align = mem::align_of::<T>();
1614    size + align - 1
1615  }
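
  // E.g. `pad::<u64>()` is `8 + 8 - 1 = 15`: requesting `size + align - 1`
  // bytes guarantees that a properly aligned `T` fits no matter where the
  // returned range happens to start.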
1616
1617  // #[cfg(test)]
1618  // #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
1619  // #[inline]
1620  // fn check_page_boundary(&self, offset: u32, len: u32) {
1621  //   if len == 0 {
1622  //     return; // A zero-length range is trivially within the same "page"
1623  //   }
1624
1625  //   // Calculate the page boundary of the start and end of the range
1626  //   let start_page = offset / self.page_size;
1627  //   let end_page = (offset + len - 1) / self.page_size;
1628
1629  //   assert_eq!(
1630  //     start_page, end_page,
1631  //     "start and end of range must be in the same page"
1632  //   );
1633  // }
1634
1635  #[cfg(test)]
1636  #[cfg(feature = "std")]
1637  #[allow(dead_code)]
1638  pub(super) fn print_segment_list(&self) {
1639    let header = self.header();
1640    let mut current: &AtomicU64 = &header.sentinel;
1641
1642    loop {
1643      let current_node = current.load(Ordering::Acquire);
1644      let (node_size, next_node_offset) = decode_segment_node(current_node);
1645
1646      if node_size == SENTINEL_SEGMENT_NODE_SIZE {
1647        if next_node_offset == SENTINEL_SEGMENT_NODE_OFFSET {
1648          break;
1649        }
1650
1651        current = self.get_segment_node(next_node_offset);
1652        continue;
1653      }
1654
1655      std::println!("node_size: {node_size}, next_node_offset: {next_node_offset}",);
1656
1657      if next_node_offset == SENTINEL_SEGMENT_NODE_OFFSET {
1658        break;
1659      }
1660      current = self.get_segment_node(next_node_offset);
1661    }
1662  }
1663}
1664
1665impl Drop for Arena {
1666  fn drop(&mut self) {
1667    unsafe {
1668      let memory_ptr = self.inner.as_ptr();
1669      let memory = &*memory_ptr;
1670      // `Memory` storage... follow the drop steps from Arc.
1671      if memory.refs().fetch_sub(1, Ordering::Release) != 1 {
1672        return;
1673      }
1674
1675      // This fence is needed to prevent reordering of use of the data and
1676      // deletion of the data.  Because it is marked `Release`, the decreasing
1677      // of the reference count synchronizes with this `Acquire` fence. This
1678      // means that use of the data happens before decreasing the reference
1679      // count, which happens before this fence, which happens before the
1680      // deletion of the data.
1681      //
1682      // As explained in the [Boost documentation][1],
1683      //
1684      // > It is important to enforce any possible access to the object in one
1685      // > thread (through an existing reference) to *happen before* deleting
1686      // > the object in a different thread. This is achieved by a "release"
1687      // > operation after dropping a reference (any access to the object
1688      // > through this reference must obviously happened before), and an
1689      // > "acquire" operation before deleting the object.
1690      //
1691      // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
1692      //
1693      // Thread sanitizer does not support atomic fences. Use an atomic load
1694      // instead.
1695      memory.refs().load(Ordering::Acquire);
1696      // Drop the data
1697      let mut memory = Box::from_raw(memory_ptr);
1698
1699      // Relaxed is enough here as we're in a drop, no one else can
1700      // access this memory anymore.
1701      memory.unmount();
1702    }
1703  }
1704}
1705
1706#[cfg(test)]
1707mod tests;