Skip to main content

atomic_matrix/
matrix.rs

1//! # Atomic Matrix Core
2//!
3//! This module implements a high-velocity, lock-free memory arena designed for
4//! ultra-low latency IPC (Inter-Process Communication).
5//!
6//! # Theory of Operation: The Propagation Principle of Atomic Coalescence
7//! Unlike traditional allocators that use centralized mutexes or complex
8//! background garbage collection, the **AtomicMatrix** treats memory
9//! fragmentation as a fluid dynamics problem.
10//!
11//! 1. **Kinetic Healing:** Freeing a block triggers a "Ripple" ('coalesce')
12//! that propagates through the sector.
13//! 2. **Monotonicity:** Ripples only move backward (towards the sector origin)
14//! to prevent circular atomic dependencies and deadlocks.
15//! 3. **Permissive Concurrency:** If a thread encounters contention, it skips the
16//! block rather than blocking, relying on the high frequency of future operations
17//! to complete the healing.
18//!
19//! # Memory Topography
20//! The matrix is laid out linearly in a shared memory segment:
21//! ```text
22//! [ Init Guard (16 bytes) ] [ AtomicMatrix Struct ] [ Padding ] [ Sector 0 ] [ Sector 1 ] ...
23//! ```
24//! Each **Sector** acts as a self-contained fault domain with its own boundary,
25//! preventing local fragmentation from "bleeding" into the entire matrix.
26//!
27//! # Safety & Atomicity
28//! All state transitions follow a strict 'STATE_FREE -> STATE_ALLOCATED ->
29//! STATE_ACKED -> STATE_COALESCING' lifecycle. Hardware-level memory fences
30//! (std::sync::atomic::fence) are utilized to ensure visibility across 16+ CPU
31//! cores without locking.
32
33use std::sync::atomic::{ AtomicU32, Ordering, fence };
34use std::marker::PhantomData;
35use std::fs::OpenOptions;
36use memmap2::MmapMut;
37use uuid::Uuid;
38
// Values of the init guard word stored at byte 0 of the mapped segment.

/// Guard value before any process has claimed the formatting role.
const SYS_UNINITIALIZED: u32 = 0;
/// Guard value while the process that won the claim is formatting the segment.
const SYS_FORMATTING: u32 = 1;
/// Guard value once formatting is complete; late joiners spin until they read it.
const SYS_READY: u32 = 2;

// Values stored in `BlockHeader::state`.

/// The block is free (set when a sector/remainder is created and when a ripple ends).
const STATE_FREE: u32 = 0;
/// The block has been handed out by `allocate`.
const STATE_ALLOCATED: u32 = 1;
/// The block was released through `ack` and is awaiting coalescing.
// NOTE(review): value 2 is skipped; presumably reserved — confirm.
const STATE_ACKED: u32 = 3;
/// The block is currently claimed by a coalescing ripple.
const STATE_COALESCING: u32 = 4;
47
48pub mod core {
49    use super::*;
50
/// Header written at the beginning of each block/sector.
///
/// Every field is an atomic so that all participants mapping the segment can
/// read and update it without locks. Offsets stored here are byte offsets from
/// the start of the mapping, never process-local addresses.
#[derive(Debug)]
#[repr(C, align(16))]
pub struct BlockHeader {
    /// Size recorded for the block; the next physical header is located at
    /// `block offset + size`.
    pub size: AtomicU32,
    /// One of the `STATE_*` constants.
    pub state: AtomicU32,
    /// Offset of the header that physically precedes this block; `0` stops a
    /// backward coalescing ripple.
    pub prev_phys: AtomicU32,
    /// Offset of the next block in the same free bucket; `0` ends the list.
    pub next_free: AtomicU32,
    /// NOTE(review): never read or written by the code visible in this file —
    /// presumably reserved for a doubly linked free list; confirm.
    pub prev_free: AtomicU32,
}
64
/// The structural core of the matrix.
///
/// It is the non-blocking, SHM-backed memory arena, utilizing a segmented **TLSF
/// (Two-Level Segregated Fit)** inspired mapping for O(1) allocation, paired with
/// a custom **Kinetic Coalescing** logic.
///
/// # Memory Layout
/// The matrix is designed to be mapped directly into `/dev/shm`. It starts with
/// a 16-byte `init_guard` followed by the struct itself, and then the sectorized
/// raw memory blocks.
#[repr(C)]
pub struct AtomicMatrix {
    /// Identifier of this matrix instance (also used to build the segment path).
    pub id: Uuid,
    /// First-level bitmap: bit `fl` is set when a block is inserted at level `fl`.
    pub fl_bitmap: AtomicU32,
    /// Second-level bitmaps, one per first level: bit `sl` marks a populated bucket.
    pub sl_bitmaps: [AtomicU32; 32],
    /// Free-list heads indexed as `[fl][sl]`; each holds a block offset, `0` = empty.
    pub matrix: [[AtomicU32; 8]; 32],
    /// NOTE(review): `init` never writes this field, and the struct itself is
    /// accessed through a raw pointer into the shared mapping — confirm that a
    /// valid `MmapMut` is ever stored here before anything reads it.
    pub mmap: MmapMut,
    /// End offset of each sector, written by `sectorize`; `0` marks an unused slot.
    pub sector_boundaries: [AtomicU32; 4],
    /// Total size of the segment in bytes, as passed to `init`.
    pub total_size: u32,
}
85
/// A pointer expressed as a byte offset from the start of the matrix mapping.
///
/// Because every process maps the SHM segment at a different address, blocks are
/// identified by offset and turned into a real address only when combined with
/// the caller's own base pointer (see the `resolve*` methods).
///
/// The `PhantomData<T>` marker records the type the offset is resolved to; no
/// value of `T` is stored.
pub struct RelativePtr<T> {
    /// Byte offset from the start of the mapping.
    offset: u32,
    /// Zero-sized marker tying the pointer to its target type.
    _marker: PhantomData<T>,
}
99
/// A stateless helper that provides the O(1) calculation mapping a block size to
/// the (first-level, second-level) coordinates of its free-list bucket.
pub struct Mapping;
104
105    impl AtomicMatrix {
106        /// Initialized the matrix struct and returns it.
107        ///
108        /// This function will initialize both TLSF level flags, the matrix map for free
109        /// blocks, assign all the require metadata and return the ready to use object
110        ///
111        /// ### Params
112        /// @ptr: The pointer to the beginning of the matrix segment \
113        /// @id: The ID of this matrix instance \
114        /// @size: The total size of the SHM allocation
115        ///
116        /// ### Returns
117        /// A static, lifetime specified, reference to the matrix struct.
118        pub fn init(ptr: *mut AtomicMatrix, id: Uuid, size: u32) -> &'static mut Self {
119            unsafe {
120                let matrix = &mut *ptr;
121                matrix.fl_bitmap.store(0, Ordering::Release);
122                for i in 0..32 {
123                    matrix.sl_bitmaps[i].store(0, Ordering::Release);
124                    for j in 0..8 {
125                        matrix.matrix[i][j].store(0, Ordering::Release);
126                    }
127                }
128                matrix.id = id;
129                matrix.total_size = size;
130                matrix
131            }
132        }
133
134        /// The entry point of the matrix struct.
135        ///
136        /// It initializes SHM segment, bind to it, executes the initial formatting,
137        /// prepares both the matrix and handler structs and return the High-Level API
138        /// to the caller.
139        ///
140        /// ### Params:
141        /// @id: The ID of a new or existing matrix (if existing, will skip formatting and
142        /// just bind to it) \
143        /// @size: The SHM allocation size
144        ///
145        /// ### Returns
146        /// The matrix handler api, or an error to be handled
147        pub fn bootstrap(
148            id: Option<Uuid>,
149            size: usize,
150            sector_barriers: (u32, u32)
151        ) -> Result<crate::handlers::matrix_handler::MatrixHandler, String> {
152            let path_id = id.unwrap_or_else(Uuid::new_v4);
153            let path = format!("/dev/shm/{}", path_id);
154            let file = OpenOptions::new()
155                .read(true)
156                .write(true)
157                .create(true)
158                .open(&path)
159                .map_err(|e| e.to_string())?;
160
161            file.set_len(size as u64).map_err(|e| e.to_string())?;
162
163            let mut mmap = unsafe { MmapMut::map_mut(&file).map_err(|e| e.to_string())? };
164            let base_ptr = mmap.as_mut_ptr();
165
166            let init_guard = unsafe { &*(base_ptr as *const AtomicU32) };
167            let matrix_ptr = unsafe { base_ptr.add(16) as *mut AtomicMatrix };
168
169            if
170                init_guard
171                    .compare_exchange(
172                        SYS_UNINITIALIZED,
173                        SYS_FORMATTING,
174                        Ordering::SeqCst,
175                        Ordering::Relaxed
176                    )
177                    .is_ok()
178            {
179                let matrix = AtomicMatrix::init(matrix_ptr, path_id, size as u32);
180                matrix
181                    .sectorize(base_ptr, size, sector_barriers.0 as u8, sector_barriers.1 as u8)
182                    .unwrap();
183                init_guard.store(SYS_READY, Ordering::SeqCst);
184            } else {
185                while init_guard.load(Ordering::Acquire) != SYS_READY {
186                    std::hint::spin_loop();
187                }
188            }
189
190            Ok(crate::handlers::matrix_handler::MatrixHandler {
191                matrix: unsafe {
192                    &mut *matrix_ptr
193                },
194                mmap,
195            })
196        }
197
198        /// Sectorizes the SHM segment into three different zones of allocation. These
199        /// zones are classified as Small, Medium and Large.
200        ///
201        /// - **Small Sector:** For data objects between 32 bytes and 1 KB.
202        /// - **Medium Sector:** For data objects between 1 KB and 1 MB.
203        /// - **Large Sector:** For data objects bigger than 1 MB.
204        ///
205        /// This ensures three main safeties for the matrix:
206        ///
207        /// - **Size integrity:** Blocks with similar sizes are required to stay together,
208        /// ensuring that we don't deal with a huge size variety in coalescing.
209        /// - **Propagation granularity:** The healing propagation only occurs inside the
210        /// block sector, ensuring that high operation sectors dont cause a tide of coalescing
211        /// into lower operation sectors.
212        /// - **Seach optimization:** Since small blocks are always together, it reduces
213        /// the TLSF searching index as the size you need is almost always garanteed to
214        /// exist.
215        ///
216        /// The sectorize also limits sectors based on the choosen size for the matrix to
217        /// ensure that if we have a small matrix (e.g.: 1mb) we don't allocate a unneces-
218        /// sary large sector.
219        ///
220        /// ### Params:
221        /// @base_ptr: The starting offset of the SHM mapping. \
222        /// @total_file_size: The total size of SHM segment \
223        /// @mut small_percent: The desired size percentage of the small sector \
224        /// @mut medium_sector: The desired size percentage of the medium sector
225        ///
226        /// ### Returns:
227        /// Any error that arises from the sectorizing. Otherwise, an Ok flag.
228        pub fn sectorize(
229            &self,
230            base_ptr: *const u8,
231            total_file_size: usize,
232            mut small_percent: u8,
233            mut medium_percent: u8
234        ) -> Result<(), String> {
235            let matrix_size = std::mem::size_of::<AtomicMatrix>();
236            let mut current_offset = (16 + (matrix_size as u32) + 15) & !15;
237            let usable_space = (total_file_size as u32).saturating_sub(current_offset);
238
239            if total_file_size < 5 * 1024 * 1024 {
240                small_percent = 30;
241                medium_percent = 70;
242            }
243
244            let small_size =
245                ((((usable_space as u64) * (small_percent as u64)) / 100) as u32) & !15;
246            let medium_size =
247                ((((usable_space as u64) * (medium_percent as u64)) / 100) as u32) & !15;
248            let large_size = usable_space.saturating_sub(small_size).saturating_sub(medium_size);
249
250            let mut prev_phys = 0u32;
251            let mut sizes = vec![small_size, medium_size, large_size];
252            sizes.retain(|&s| s > 64);
253
254            for (i, size) in sizes.iter().enumerate() {
255                let (fl, sl) = Mapping::find_indices(*size);
256                self.create_and_insert_sector(base_ptr, current_offset, *size, prev_phys, fl, sl);
257                prev_phys = current_offset;
258                current_offset += size;
259
260                if i < 4 {
261                    self.sector_boundaries[i].store(current_offset, Ordering::Release);
262                }
263            }
264            Ok(())
265        }
266
267        /// Allocates a block in the matrix for the caller
268        ///
269        /// It acts as a greed allocator, ensuring each call will either get a block allocated
270        /// in the matrix, or it throws a OOM Contention flag. It achieves this by politely
271        /// trying to claim a block for itself. In case the CAS loop fails, it will simply jump
272        /// to the next free block on the chain, granting a lock-free allocation paradigm.
273        ///
274        /// Each allocation is allowed to retry itself 512 times to confirm the matrix is
275        /// indeed out of memory before killing the execution of the function.
276        ///
277        /// ### Params:
278        /// @base_ptr: The starting offset of the SHM mapping. \
279        /// @size: The allocation size of the block
280        ///
281        /// ### Returns:
282        /// Either the relative pointer to the allocated block, or the OOM Contention flag.
283        pub fn allocate(&self, base_ptr: *const u8, size: u32) -> Result<RelativePtr<u8>, String> {
284            let size = (size + 15) & !15;
285            let size = size.max(32);
286            let (fl, sl) = Mapping::find_indices(size);
287
288            for _ in 0..512 {
289                if let Some((f_fl, f_sl)) = self.find_suitable_block(fl, sl) {
290                    if let Ok(offset) = self.remove_free_block(base_ptr, f_fl, f_sl) {
291                        unsafe {
292                            let header = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
293                            let total_size = header.size.load(Ordering::Acquire);
294
295                            if total_size >= size + 32 + 16 {
296                                let rem_size = total_size - size;
297                                let next_off = offset + size;
298                                let next_h = &mut *(
299                                    base_ptr.add(next_off as usize) as *mut BlockHeader
300                                );
301
302                                next_h.size.store(rem_size, Ordering::Release);
303                                next_h.state.store(STATE_FREE, Ordering::Release);
304                                next_h.prev_phys.store(offset, Ordering::Release);
305                                next_h.next_free.store(0, Ordering::Release);
306
307                                header.size.store(size, Ordering::Release);
308                                fence(Ordering::SeqCst);
309
310                                let (r_fl, r_sl) = Mapping::find_indices(rem_size);
311                                self.insert_free_block(base_ptr, next_off, r_fl, r_sl);
312                            }
313                            header.state.store(STATE_ALLOCATED, Ordering::Release);
314                            return Ok(RelativePtr::new(offset + 32));
315                        }
316                    }
317                }
318                std::hint::spin_loop();
319            }
320            Err("OOM: Contention".into())
321        }
322
323        /// Acknowledges the freedon of a block and pushes it to the to_be_freed queue.
324        ///
325        /// If the to_be_freed queue is full, it will imediatelly trigger the drainage
326        /// of the queue and coalesce every block present before trying to push the
327        /// newly ack block into the queue. If there is space available, simply push
328        /// it and move on
329        ///
330        /// ### Params:
331        /// @ptr: The relative pointer of the block to acknowledge \
332        /// @base_ptr: The offset from the start of the SHM segment.
333        pub fn ack(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
334            unsafe {
335                let header = ptr.resolve_mut(base_ptr);
336
337                header.state.store(STATE_ACKED, Ordering::Release);
338            }
339
340            self.coalesce(ptr, base_ptr);
341        }
342
343        /// Tries to merge neighbouring blocks to the left until the end of the matrix is
344        /// reached or the neighbour block is not ACKED/FREE.
345        ///
346        /// This is the elegant implementation of the Kinetic Coalescence processes. It
347        /// receives the initial block that will start the ripple, and traverse the matrix
348        /// to the left (monotonicity guard). If any race conditions are met in the middle
349        /// (another coalescing just start, or a module just claimed this block), it will
350        /// stop the coalescing and move on (permissive healing).
351        ///
352        /// Then it tries to update the next neighbour previous physical offset metadata to
353        /// the start of the new free block. If this exchange fails due to end of sector, or
354        /// just claimed blocks, it will skip this marking in hopes that when this block is
355        /// eventually coalesced, it will passivelly merge backwards with the ripple and fix
356        /// the marking on its header by himself (horizon boundary).
357        ///
358        /// This three core implementations together composes the Propagation Principle of
359        /// Atomic Coalescence and enables the matrix to have such high throughput speeds.
360        ///
361        /// ### Params:
362        /// @ptr: The relative pointer of the block to coalesce. \
363        /// @base_ptr: The offset from the start of the SHM segment.
364        ///
365        /// ### Throws:
366        /// TidalRippleContentionError: Two coalescing ripples executing simultaneously on
367        /// the same blocks.
368        pub fn coalesce(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
369            unsafe {
370                let current_offset = ptr.offset();
371                let mut current_header = ptr.resolve_mut(base_ptr);
372                let mut total_size = current_header.size.load(Ordering::Acquire);
373                if total_size < 32 {
374                    return;
375                }
376
377                let mut final_offset = current_offset;
378
379                while current_offset > 16 {
380                    let prev_phys_offset = current_header.prev_phys.load(Ordering::Acquire);
381
382                    if prev_phys_offset == 0 {
383                        break;
384                    }
385
386                    let prev_header_ptr = base_ptr.add(
387                        prev_phys_offset as usize
388                    ) as *mut BlockHeader;
389                    let prev_header = &mut *prev_header_ptr;
390
391                    let res = prev_header.state.compare_exchange(
392                        STATE_FREE,
393                        STATE_COALESCING,
394                        Ordering::Acquire,
395                        Ordering::Relaxed
396                    );
397
398                    let claimed = if res.is_ok() {
399                        true
400                    } else {
401                        prev_header.state
402                            .compare_exchange(
403                                STATE_ACKED,
404                                STATE_COALESCING,
405                                Ordering::Acquire,
406                                Ordering::Relaxed
407                            )
408                            .is_ok()
409                    };
410
411                    if claimed {
412                        let size_to_add = prev_header.size.swap(0, Ordering::Acquire);
413                        if size_to_add == 0 || prev_phys_offset >= current_offset {
414                            break;
415                        }
416
417                        if size_to_add > self.total_size {
418                            break;
419                        }
420
421                        let (fl, sl) = Mapping::find_indices(size_to_add);
422
423                        total_size = total_size
424                            .checked_add(size_to_add)
425                            .expect("TidalRippleCoalescingError.");
426                        final_offset = prev_phys_offset;
427                        current_header = prev_header;
428                        self.remove_free_block(base_ptr, fl, sl).ok();
429                    } else {
430                        break;
431                    }
432                }
433
434                let sector_limit = self.sector_end_offset(final_offset);
435                if let Some(next_h_offset) = final_offset.checked_add(total_size) {
436                    if next_h_offset < sector_limit {
437                        let next_h = &*(base_ptr.add(next_h_offset as usize) as *const BlockHeader);
438                        next_h.prev_phys.store(final_offset, Ordering::Release);
439                    }
440                }
441
442                current_header.size.store(total_size, Ordering::Release);
443                current_header.state.store(STATE_FREE, Ordering::Release);
444
445                let (fl, sl) = Mapping::find_indices(total_size);
446                self.insert_free_block(base_ptr, final_offset, fl, sl);
447            }
448        }
449
450        /// Queries a block offset inside of the matrix.
451        ///
452        /// Not much to say about this, the name is pretty self explanatory.
453        ///
454        /// ### Params:
455        /// @offset: The offset of the block to be queried
456        ///
457        /// ### Returns:
458        /// The Relative Pointer to the queried block
459        pub fn query(&self, offset: u32) -> RelativePtr<u8> {
460            RelativePtr::new(offset + 32)
461        }
462
463        /// Queries the TLSF bitmaps in search of a block.
464        ///
465        /// It acquires the first most suitable index flag (according to the find
466        /// _indices function) and does a bitwise operation to check if it possesses an
467        /// available block. If it matches, return the coordinates of the FL and the
468        /// CTZ result from the SL. If it doesn't match, performs CTZ on the first level
469        /// to return the first available coordinate.
470        ///
471        /// ### Params:
472        /// @fl: Calculated first level coordinate \
473        /// @sl: Calculated second level coordinate
474        ///
475        /// ### Returns:
476        /// A tuple containing the FL/SL coordinates or nothing if there is no space
477        /// available in the matrix.
478        fn find_suitable_block(&self, fl: u32, sl: u32) -> Option<(u32, u32)> {
479            let sl_map = self.sl_bitmaps[fl as usize].load(Ordering::Acquire);
480            let m_sl = sl_map & (!0u32 << sl);
481            if m_sl != 0 {
482                return Some((fl, m_sl.trailing_zeros()));
483            }
484
485            let fl_map = self.fl_bitmap.load(Ordering::Acquire);
486            let m_fl = fl_map & (!0u32 << (fl + 1));
487            if m_fl != 0 {
488                let f_fl = m_fl.trailing_zeros();
489                if f_fl < 32 {
490                    let s_map = self.sl_bitmaps[f_fl as usize].load(Ordering::Acquire);
491                    if s_map != 0 {
492                        return Some((f_fl, s_map.trailing_zeros()));
493                    }
494                }
495            }
496            None
497        }
498
499        /// Pops a free block from the TLSF bitmap.
500        ///
501        /// It tries atomically claims ownership over the header inside the map. If
502        /// successful, swap the current head to next free head in the chain, or 0 if
503        /// there is none. If it fails, it automatically assumes someone claimed the
504        /// buffer first and calls a hint::spin loop instruction to retry claiming a
505        /// head. If, in one of the interactions, the bucket returs 0, it breaks the
506        /// function with an error.
507        ///
508        /// ### Params:
509        /// @base_ptr: The offset from the start of the SHM segment \
510        /// @fl: First level coordinates of the bucket \
511        /// @sl: Second level coordinates of the head.
512        ///
513        /// ### Returns
514        /// A result containing either the head of the newly acquired block, or an
515        /// EmptyBitmapError
516        fn remove_free_block(&self, base_ptr: *const u8, fl: u32, sl: u32) -> Result<u32, String> {
517            let head = &self.matrix[fl as usize][sl as usize];
518            loop {
519                let off = head.load(Ordering::Acquire);
520                if off == 0 {
521                    return Err("EmptyBitmapError".into());
522                }
523                let next = unsafe {
524                    (*(base_ptr.add(off as usize) as *const BlockHeader)).next_free.load(
525                        Ordering::Acquire
526                    )
527                };
528                if head.compare_exchange(off, next, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
529                    if next == 0 {
530                        self.sl_bitmaps[fl as usize].fetch_and(!(1 << sl), Ordering::Release);
531                        if self.sl_bitmaps[fl as usize].load(Ordering::Acquire) == 0 {
532                            self.fl_bitmap.fetch_and(!(1 << fl), Ordering::Release);
533                        }
534                    }
535                    return Ok(off);
536                }
537                std::hint::spin_loop();
538            }
539        }
540
541        /// Stores a new header inside a bucket
542        ///
543        /// It does the exact oposite of the remove_free_block basically.
544        ///
545        /// ### Params:
546        /// @base_ptr: The offset from the beginning of the SHM segment \
547        /// @offset: The header offset to be inserted into the bucket \
548        /// @fl: The first level insertion coordinates \
549        /// @sl: The second level insertion coordinates
550        fn insert_free_block(&self, base_ptr: *const u8, offset: u32, fl: u32, sl: u32) {
551            let head = &self.matrix[fl as usize][sl as usize];
552            unsafe {
553                let h = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
554                loop {
555                    let old = head.load(Ordering::Acquire);
556                    h.next_free.store(old, Ordering::Release);
557                    if
558                        head
559                            .compare_exchange(old, offset, Ordering::AcqRel, Ordering::Relaxed)
560                            .is_ok()
561                    {
562                        break;
563                    }
564                    std::hint::spin_loop();
565                }
566            }
567            self.fl_bitmap.fetch_or(1 << fl, Ordering::Release);
568            self.sl_bitmaps[fl as usize].fetch_or(1 << sl, Ordering::Release);
569        }
570
571        /// Creates and formates the header of the sector, as well as pushing it into its
572        /// corresponding boundary position inside the matrix.
573        ///
574        /// ### Params:
575        /// @base_ptr: The offset from the start of the SHM segment \
576        /// @size: The size of the sector \
577        /// @prev: The previous sector, if any \
578        /// @fl: The first level coordinate based on po2 size scalling \
579        /// @sl: The second level coordinate based on 8 steps division size scalling
580        fn create_and_insert_sector(
581            &self,
582            base_ptr: *const u8,
583            offset: u32,
584            size: u32,
585            prev: u32,
586            fl: u32,
587            sl: u32
588        ) {
589            unsafe {
590                let h = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
591                h.size.store(size, Ordering::Release);
592                h.state.store(STATE_FREE, Ordering::Release);
593                h.prev_phys.store(prev, Ordering::Release);
594                h.next_free.store(0, Ordering::Release);
595                self.insert_free_block(base_ptr, offset, fl, sl);
596            }
597        }
598
599        /// Returns the boundary of the current sector
600        ///
601        /// It queries the boundaries from the metadata and check wheter the block fits or
602        /// not inside this sector.
603        ///
604        /// ### Params:
605        /// @current_offset: The offset to check against the sector.
606        ///
607        /// ### Returns:
608        /// Either the boundary value of the current sector, or the end of the segment.
609        fn sector_end_offset(&self, current_offset: u32) -> u32 {
610            for i in 0..4 {
611                let boundary = self.sector_boundaries[i].load(Ordering::Acquire);
612                if boundary == 0 {
613                    break;
614                }
615                if current_offset < boundary {
616                    return boundary;
617                }
618            }
619
620            self.mmap.len() as u32
621        }
622    }
623
624    impl<T> RelativePtr<T> {
625        /// Creates a new relative pointer based on the provided offset
626        ///
627        /// This initializes the pointer with the PhantomData ownership over the type we
628        /// are passing the the parameter
629        ///
630        /// ### Params:
631        /// @offset: The offset value to be wrapped in the pointer.
632        ///
633        /// ### Returns:
634        /// A instance of Self.
635        pub fn new(offset: u32) -> Self {
636            Self { offset, _marker: PhantomData }
637        }
638
639        /// Returns the offset value in the pointer
640        pub fn offset(&self) -> u32 {
641            self.offset
642        }
643
644        /// Resolves the header based on the base_ptr of the current caller process.
645        ///
646        /// This ensures that the pointer returned is actually mapped to the process local
647        /// memory scope
648        ///
649        /// ### Params:
650        /// @base_ptr: The offset from the start of the SHM segment.
651        ///
652        /// ### Returns:
653        /// A life time speficied reference to the header of this block
654        pub unsafe fn resolve_header<'a>(&self, base_ptr: *const u8) -> &'a BlockHeader {
655            unsafe { &*(base_ptr.add((self.offset as usize) - 32) as *mut BlockHeader) }
656        }
657
658        pub unsafe fn resolve_header_mut<'a>(&self, base_ptr: *const u8) -> &'a mut BlockHeader {
659            unsafe { &mut *(base_ptr.add((self.offset as usize) - 32) as *mut BlockHeader) }
660        }
661
662        /// Resolves the block scope based on the base_ptr of the current caller process.
663        ///
664        /// This ensures that the pointer returned is actually mapped to the process local
665        /// memory scope
666        ///
667        /// ### Params:
668        /// @base_ptr: The offset from the start of the SHM segment.
669        ///
670        /// ### Returns:
671        /// A life time specified reference to the block scope.
672        pub unsafe fn resolve<'a>(&self, base_ptr: *const u8) -> &'a T {
673            unsafe { &*(base_ptr.add(self.offset as usize) as *mut T) }
674        }
675
676        /// Resolves the block scope based on the base_ptr of the current caller process.
677        ///
678        /// This ensures that the pointer returned is actually mapped to the process local
679        /// memory scope
680        ///
681        /// ### Params:
682        /// @base_ptr: The offset from the start of the SHM segment.
683        ///
684        /// ### Returns:
685        /// A life time specified mutable reference to the block scope.
686        pub unsafe fn resolve_mut<'a>(&self, base_ptr: *const u8) -> &'a mut T {
687            unsafe { &mut *(base_ptr.add(self.offset as usize) as *mut T) }
688        }
689    }
690
691    impl Mapping {
692        /// Maps a block size to its corresponding (First-Level, Second-Level) indices.
693        ///
694        /// This function implements a two-level mapping strategy used for O(1) free-block
695        /// lookup, optimized for both high-velocity small allocations and logarithmic
696        /// scaling of large blocks.
697        ///
698        /// ### Mapping Logic:
699        /// - **Linear (Small):** For sizes < 128, it uses a fixed FL (0) and 16-byte SL
700        ///   subdivisions. This minimizes fragmentation for tiny objects.
701        /// - **Logarithmic (Large):** For sizes >= 128, FL is the power of 2 (determined via
702        ///   `leading_zeros`), and SL is a 3-bit subdivider of the range between 2^n and 2^(n+1).
703        ///
704        /// ### Mathematical Transformation:
705        /// - `FL = log2(size)`
706        /// - `SL = (size - 2^FL) / (2^(FL - 3))`
707        ///
708        /// ### Bounds:
709        /// Indices are clamped to `(31, 7)` to prevent overflow in the matrix bitmask.
710        ///
711        /// # Arguments
712        /// * `size` - The total byte size of the memory block.
713        ///
714        /// # Returns
715        /// A tuple of `(fl, sl)` indices.
716        pub fn find_indices(size: u32) -> (u32, u32) {
717            if size < 128 {
718                (0, (size / 16).min(7))
719            } else {
720                let fl = 31 - size.leading_zeros();
721                let sl = ((size >> (fl - 3)) & 0x7).min(7);
722                (fl.min(31), sl)
723            }
724        }
725    }
726}
727
728#[cfg(test)]
729mod tests {
730    use crate::matrix::core::{ BlockHeader, RelativePtr };
731
732    use super::*;
733
734    /// Test if the mapping function can return the correct indexes.
735    #[test]
736    fn test_mapping() {
737        assert_eq!(core::Mapping::find_indices(16), (0, 1));
738        assert_eq!(core::Mapping::find_indices(64), (0, 4));
739        assert_eq!(core::Mapping::find_indices(128), (7, 0));
740
741        let (fl, sl) = core::Mapping::find_indices(1024);
742        assert_eq!(fl, 10);
743        assert_eq!(sl, 0);
744    }
745
746    /// Test if the bootstrap function actually initializes the matrix, and allocates the
747    /// blocks on the correct bitmaps.
748    #[test]
749    fn test_initial_bootstrap() {
750        let size = 1024 * 1024;
751        let handler = core::AtomicMatrix
752            ::bootstrap(Some(uuid::Uuid::new_v4()), size, (100, 0))
753            .unwrap();
754
755        let bitmap = handler.matrix.fl_bitmap.load(Ordering::Acquire);
756
757        assert!(bitmap != 0, "FL bitmap should not be zero after sectorization");
758        assert!(
759            (bitmap & ((1 << 19) | (1 << 18))) != 0,
760            "FL bitmap should have bits set for the expected sectors (19 and 18 for 512KB and 256KB"
761        );
762    }
763
764    /// Test if the matrix can sectorize itself.
765    #[test]
766    fn test_initialization_integrity() {
767        let mut fake_shm = vec![0u8; 1024 * 1024];
768        let base_ptr = fake_shm.as_mut_ptr();
769
770        unsafe {
771            let matrix_ptr = base_ptr.add(16) as *mut core::AtomicMatrix;
772            let matrix = core::AtomicMatrix::init(
773                matrix_ptr,
774                uuid::Uuid::new_v4(),
775                fake_shm.len() as u32
776            );
777
778            matrix.sectorize(base_ptr, 1024 * 1024, 10, 20).unwrap();
779
780            assert!(matrix.fl_bitmap.load(Ordering::Relaxed) != 0);
781        }
782    }
783
784    /// Test allocation and sppliting logic by comparing the size of our buffer.
785    #[test]
786    fn test_allocation_and_spliting() {
787        let size = 1024 * 1024;
788        let mut handler = core::AtomicMatrix
789            ::bootstrap(Some(uuid::Uuid::new_v4()), size, (100, 0))
790            .unwrap();
791        let base_ptr = handler.mmap.as_mut_ptr();
792        let matrix = &mut *handler.matrix;
793
794        unsafe {
795            let rel_ptr = matrix.allocate(base_ptr, 64).unwrap();
796
797            let header = rel_ptr.resolve_header(base_ptr);
798
799            assert_eq!(header.size.load(Ordering::Acquire), 64);
800            assert_eq!(header.state.load(Ordering::Acquire), STATE_ALLOCATED);
801        }
802    }
803
804    /// Test coalesce logic to see if blocks will merge correctly.
805    #[test]
806    fn test_ack_and_coalesce() {
807        let size = 1024 * 1024;
808        let mut handler = core::AtomicMatrix
809            ::bootstrap(Some(uuid::Uuid::new_v4()), size, (100, 0))
810            .unwrap();
811        let base_ptr = handler.mmap.as_mut_ptr();
812        let matrix = &mut *handler.matrix;
813
814        unsafe {
815            let ptr_a = matrix.allocate(base_ptr, 64).unwrap();
816            let ptr_b = matrix.allocate(base_ptr, 64).unwrap();
817            let ptr_c = matrix.allocate(base_ptr, 64).unwrap();
818            let ptr_d = matrix.allocate(base_ptr, 64).unwrap();
819            let ptr_e = matrix.allocate(base_ptr, 64).unwrap();
820
821            let h_b = ptr_b.resolve_header(base_ptr);
822            let rel_c = RelativePtr::<BlockHeader>::new(ptr_c.offset() - 32);
823            let rel_d = RelativePtr::<BlockHeader>::new(ptr_d.offset() - 32);
824
825            h_b.state.store(STATE_FREE, Ordering::Release);
826            matrix.ack(&rel_c, base_ptr);
827            matrix.ack(&rel_d, base_ptr);
828
829            matrix.coalesce(&rel_d, base_ptr);
830
831            let h_a = ptr_a.resolve_header(base_ptr);
832            assert_eq!(h_a.state.load(Ordering::Acquire), STATE_ALLOCATED);
833
834            let h_merged = ptr_b.resolve_header(base_ptr);
835            assert_eq!(h_merged.state.load(Ordering::Acquire), STATE_FREE);
836            assert_eq!(h_merged.size.load(Ordering::Acquire), 256);
837
838            let h_e = ptr_e.resolve_header(base_ptr);
839            assert_eq!(h_e.state.load(Ordering::Acquire), STATE_ALLOCATED);
840        }
841    }
842
843    /// ----------------------------------------------------------------------------
844    /// STRESS TESTS
845    /// 
    /// These are all excluded from the correctness suite (`#[ignore]`) so GitHub
    /// doesn't get mad at me. Running them before shipping is explicitly required.
848    /// ----------------------------------------------------------------------------
849
850    /// Run 8.000.000 allocations in parallel (1.000.000 each) to test if the matrix
851    /// can hold without race conditions.
852    #[test]
853    #[ignore]
854    fn test_multithreaded_stress() {
855        // Quick author note:
856        //
857        // May God help us all at this moment.
858
859        use std::sync::{ Arc, Barrier };
860        use std::thread;
861        use std::collections::HashSet;
862
863        // We use 500MB matrix to allocate all the buffers
864        let size = 50 * 1024 * 1024;
865        let handler = core::AtomicMatrix
866            ::bootstrap(Some(uuid::Uuid::new_v4()), size, (40, 30))
867            .unwrap();
868
869        let thread_count = 8;
870        let allocs_per_second = 100_000;
871        let barrier = Arc::new(Barrier::new(thread_count));
872
873        // Track the failed allocs
874        let fail_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
875
876        let mut handles = vec![];
877
878        // Fuck the matrix! GO GO GO
879        for _ in 0..thread_count {
880            let b = Arc::clone(&barrier);
881            let base_addr = handler.mmap.as_ptr() as usize;
882            let matrix_addr = handler.matrix as *const core::AtomicMatrix as usize;
883            let fail_count_clone = Arc::clone(&fail_count);
884
885            handles.push(
886                thread::spawn(move || {
887                    let base_ptr = base_addr as *mut u8;
888                    let matrix = unsafe { &*(matrix_addr as *const core::AtomicMatrix) };
889                    let mut my_offsets = Vec::new();
890
891                    b.wait();
892
893                    for _ in 0..allocs_per_second {
894                        for _ in 0..10 {
895                            if let Ok(rel_ptr) = matrix.allocate(base_ptr, 64) {
896                                my_offsets.push(rel_ptr.offset());
897                                break;
898                            }
899                            fail_count_clone.fetch_add(1, Ordering::Relaxed);
900                            std::hint::spin_loop();
901                        }
902                    }
903
904                    my_offsets
905                })
906            );
907        }
908
909        // Collect everything we did and check
910        let mut all_offsets = Vec::new();
911        for h in handles {
912            all_offsets.extend(h.join().unwrap());
913        }
914
915        let total_obtained = all_offsets.len();
916        let unique_offsets: HashSet<_> = all_offsets.into_iter().collect();
917
918        // We allow for a 0.5% failure marging, as this stress test does not account for deallocations.
919        let success_percentage = ((thread_count * allocs_per_second) as f64) * 0.995;
920
921        // Assert we can obtain at least 99.5% of the expected allocations without collisions, which would
922        // indicate a potential race condition.
923        assert!(
924            total_obtained >= (success_percentage as usize),
925            "Total allocations should match expected count"
926        );
927        assert_eq!(
928            unique_offsets.len(),
929            total_obtained,
930            "RACE CONDITION DETECTED: Duplicate offsets found"
931        );
932        println!(
933            "Successfully allocated {} unique blocks across {} threads without collisions! {} allocations failed",
934            total_obtained,
935            thread_count,
936            fail_count.load(Ordering::Relaxed)
937        );
938    }
939
940    /// Test if the matrix can hold 10 minutes of 8 threads executing random alloc
941    /// and dealloc operations to ensure the Propagation Principle of Atomic 
942    /// Coalescence works.
943    #[test]
944    #[ignore]
945    fn test_long_term_fragmentation_healing() {
946        use std::sync::{ Arc, Barrier };
947        use std::thread;
948        use std::time::{ Instant, Duration };
949
950        const DURATION: u32 = 600;
951        const THREADS: u32 = 8;
952
953        let size = 50 * 1024 * 1024;
954        let handler = core::AtomicMatrix
955            ::bootstrap(Some(uuid::Uuid::new_v4()), size, (40, 30))
956            .unwrap();
957        let handler_arc = Arc::new(handler);
958        let barrier = Arc::new(Barrier::new(THREADS as usize));
959        let start_time = Instant::now();
960        let duration = Duration::from_secs(DURATION as u64);
961
962        let mut handles = vec![];
963
964        for t_id in 0..THREADS {
965            let h = Arc::clone(&handler_arc);
966            let b = Arc::clone(&barrier);
967
968            handles.push(
969                thread::spawn(move || {
970                    let base_ptr = h.mmap.as_ptr() as *mut u8;
971                    let matrix = &h.matrix;
972                    let mut my_blocks = Vec::new();
973                    let mut rng = t_id + 1;
974                    let mut total_ops = 0u64;
975
976                    b.wait();
977
978                    while start_time.elapsed() < duration {
979                        rng = rng.wrapping_mul(1103515245).wrapping_add(12345);
980
981                        if rng % 10 < 7 && my_blocks.len() < 200 {
982                            let alloc_size = (rng % 512) + 32;
983                            if let Ok(ptr) = matrix.allocate(base_ptr, alloc_size as u32) {
984                                my_blocks.push(ptr);
985                            }
986                        } else if !my_blocks.is_empty() {
987                            let idx = (rng as usize) % my_blocks.len();
988                            let ptr = my_blocks.swap_remove(idx);
989
990                            let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - 32);
991                            matrix.ack(&header_ptr, base_ptr);
992
993                            if total_ops % 5 == 0 {
994                                matrix.coalesce(&header_ptr, base_ptr);
995                            }
996                        }
997                        total_ops += 1;
998                    }
999                    (my_blocks, total_ops)
1000                })
1001            );
1002        }
1003
1004        let mut total_work = 0u64;
1005        for h in handles {
1006            let (remaining, thread_ops) = h.join().unwrap();
1007            total_work += thread_ops;
1008            let base_ptr = handler_arc.mmap.as_ptr() as *mut u8;
1009
1010            for ptr in remaining {
1011                let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - 32);
1012                handler_arc.matrix.ack(&header_ptr, base_ptr);
1013                handler_arc.matrix.coalesce(&header_ptr, base_ptr);
1014            }
1015        }
1016
1017        let mut free_count = 0;
1018        for fl in 0..32 {
1019            for sl in 0..8 {
1020                if handler_arc.matrix.matrix[fl][sl].load(Ordering::Acquire) != 0 {
1021                    free_count += 1;
1022                }
1023            }
1024        }
1025
1026        let entrophy_percentage = ((free_count as f64) / (total_work as f64)) * 100.0;
1027        let mops = (total_work as f64) / (DURATION as f64) / 1_000_000.0;
1028
1029        println!("Endurance Results (Duration of {} seconds)", DURATION);
1030        println!("Num of threads: {}", THREADS);
1031        println!("Total operations: {}", total_work);
1032        println!("Throughput: {:.2} Mop/s", (total_work as f64) / (DURATION as f64) / 1_000_000.0);
1033        println!("Final free fragments: {}", free_count);
1034        println!("Entrophy percentage: {}%", entrophy_percentage);
1035
1036        assert!(entrophy_percentage < 0.001, "Excessive Fragmentation");
1037        assert!(mops > 1.0, "Throughput regression: {:.2} Mop/s", mops);
1038    }
1039}