// atomic_matrix/matrix.rs

1//! # Atomic Matrix Core
2//!
3//! This module implements a high-velocity, lock-free memory arena designed for
4//! ultra-low latency IPC (Inter-Process Communication).
5//!
6//! # Theory of Operation: The Propagation Principle of Atomic Coalescence
7//! Unlike traditional allocators that use centralized mutexes or complex
8//! background garbage collection, the **AtomicMatrix** treats memory
9//! fragmentation as a fluid dynamics problem.
10//!
11//! 1. **Kinetic Healing:** Freeing a block triggers a "Ripple" ('coalesce')
12//! that propagates through the sector.
13//! 2. **Monotonicity:** Ripples only move backward (towards the sector origin)
14//! to prevent circular atomic dependencies and deadlocks.
15//! 3. **Permissive Concurrency:** If a thread encounters contention, it skips the
16//! block rather than blocking, relying on the high frequency of future operations
17//! to complete the healing.
18//!
19//! # Memory Topography
20//! The matrix is laid out linearly in a shared memory segment:
21//! ```text
22//! [ Init Guard (16b) ] [ AtomicMatrix Struct ] [ Padding ] [ Sector 0 ] [ Sector 1 ] ...
23//! ```
24//! Each **Sector** acts as a self-contained fault domain with its own boundary,
25//! preventing local fragmentation from "bleeding" into the entire matrix.
26//!
27//! # Safety & Atomicity
28//! All state transitions follow a strict 'STATE_FREE -> STATE_ALLOCATED ->
29//! STATE_ACKED -> STATE_COALESCING' lifecycle. Hardware-level memory fences
30//! (std::sync::atomic::fence) are utilized to ensure visibility across 16+ CPU
31//! cores without locking.
32
33use std::sync::atomic::{ AtomicU32, Ordering, fence };
34use std::marker::PhantomData;
35use std::fs::{ OpenOptions };
36use memmap2::MmapMut;
37use uuid::Uuid;
38
39pub mod helpers {
40    use super::*;
41    
    /// Init-guard value: the SHM segment has not been formatted yet.
    pub const SYS_UNINITIALIZED: u32 = 0;
    /// Init-guard value: exactly one process is currently formatting the segment.
    pub const SYS_FORMATTING: u32 = 1;
    /// Init-guard value: the matrix is formatted and safe to bind to.
    pub const SYS_READY: u32 = 2;

    /// Block lifecycle: linked into a free-list bucket, claimable.
    pub const STATE_FREE: u32 = 0;
    /// Block lifecycle: owned by a caller after `allocate`.
    pub const STATE_ALLOCATED: u32 = 1;
    /// Block lifecycle: released by the caller, awaiting coalescing.
    pub const STATE_ACKED: u32 = 2;
    /// Block lifecycle: claimed by a coalescing ripple.
    pub const STATE_COALESCING: u32 = 3;

    /// Bytes reserved in front of every block payload: the 16-byte-aligned
    /// `BlockHeader` (5 x u32 rounded up to 32 bytes by `align(16)`) plus
    /// 16 bytes of padding — i.e. 48 with the current header layout.
    pub const HEADER_SPACE: u32 = std::mem::size_of::<core::BlockHeader>() as u32 + 16;
54
55    /// A helper struct that provided the O(1) calculations to find the coordinates of
56    /// a block that suits exactly the requested buffer size, or the next available one
57    /// that can fit the message as well.
58    pub struct Mapping;
59
60    impl Mapping {
61        /// Maps a block size to its corresponding (First-Level, Second-Level) indices.
62        ///
63        /// This function implements a two-level mapping strategy used for O(1) free-block
64        /// lookup, optimized for both high-velocity small allocations and logarithmic
65        /// scaling of large blocks.
66        ///
67        /// ### Mapping Logic:
68        /// - **Linear (Small):** For sizes < 128, it uses a fixed FL (0) and 16-byte SL
69        ///   subdivisions. This minimizes fragmentation for tiny objects.
70        /// - **Logarithmic (Large):** For sizes >= 128, FL is the power of 2 (determined via
71        ///   `leading_zeros`), and SL is a 3-bit subdivider of the range between 2^n and 2^(n+1).
72        ///
73        /// ### Mathematical Transformation:
74        /// - `FL = log2(size)`
75        /// - `SL = (size - 2^FL) / (2^(FL - 3))`
76        ///
77        /// ### Bounds:
78        /// Indices are clamped to `(31, 7)` to prevent overflow in the matrix bitmask.
79        ///
80        /// # Arguments
81        /// * `size` - The total byte size of the memory block.
82        ///
83        /// # Returns
84        /// A tuple of `(fl, sl)` indices.
85        pub fn find_indices(size: u32) -> (u32, u32) {
86            if size < 128 {
87                (0, (size / 16).min(7))
88            } else {
89                let fl = 31 - size.leading_zeros();
90                let sl = ((size >> (fl - 3)) & 0x7).min(7);
91                (fl.min(31), sl)
92            }
93        }
94    }
95}
96
97pub mod core {
98    use crate::matrix::helpers::HEADER_SPACE;
99
100    use super::*;
101
    /// Header structure that is written at the beginning of each block/sector.
    ///
    /// The header is made entirely of atomic primitives to ensure safe reading
    /// and manipulation across participant processes sharing the matrix. All
    /// offsets stored here are relative to the start of the SHM segment, with
    /// `0` doubling as the "none" sentinel.
    #[derive(Debug)]
    #[repr(C, align(16))]
    pub struct BlockHeader {
        /// Total byte size of the block, `HEADER_SPACE` included.
        pub size: AtomicU32,
        /// Lifecycle flag: one of `helpers::STATE_FREE` / `STATE_ALLOCATED` /
        /// `STATE_ACKED` / `STATE_COALESCING`.
        pub state: AtomicU32,
        /// Offset of the physically preceding block; walked backwards by the
        /// coalescing ripple.
        pub prev_phys: AtomicU32,
        /// Offset of the next block in the same free-list bucket.
        pub next_free: AtomicU32,
        /// Offset of the previous block in the free-list bucket.
        /// NOTE(review): no code path visible in this file ever writes this
        /// field — confirm it is maintained elsewhere or intentionally unused.
        pub prev_free: AtomicU32,
    }
115
    /// The structural core of the matrix.
    ///
    /// It's the non-blocking, SHM-backed memory arena, utilizing a segmented **TLSF
    /// (Two-Level Segregated Fit)** inspired mapping for O(1) allocation, paired with
    /// a custom **Kinetic Coalescing** logic.
    ///
    /// # Memory Layout
    /// The matrix is designed to be mapped directly into '/dev/shm'. It starts with
    /// a 16-byte 'init_guard' followed by the struct itself, and then the sectorized
    /// raw memory blocks.
    ///
    /// NOTE(review): this struct is cast in place onto shared memory (see
    /// `bootstrap`/`init`), yet it mixes atomics with process-local,
    /// non-atomic fields (`id`, `mmap`, `total_size`). In particular the
    /// SHM-resident copy's `mmap` field is never assigned, while
    /// `sector_end_offset` reads `self.mmap.len()` — confirm which copy
    /// callers actually hold.
    #[repr(C)]
    pub struct AtomicMatrix {
        /// Identity of this matrix instance; also names the /dev/shm file.
        pub id: Uuid,
        /// First-level bitmap: bit `fl` set => `sl_bitmaps[fl]` has a set bit.
        pub fl_bitmap: AtomicU32,
        /// Second-level bitmaps: bit `sl` set => `matrix[fl][sl]` has a head.
        pub sl_bitmaps: [AtomicU32; 32],
        /// Free-list heads indexed by (fl, sl); each entry is a block offset,
        /// with 0 meaning "empty bucket".
        pub matrix: [[AtomicU32; 8]; 32],
        /// Process-local mapping handle for the SHM segment.
        pub mmap: MmapMut,
        /// Exclusive end offsets of up to 4 sectors; a 0 entry ends the list.
        pub sector_boundaries: [AtomicU32; 4],
        /// Total byte size of the SHM allocation.
        pub total_size: u32,
    }
136
    /// A relative pointer to a block, expressed as a byte offset from the
    /// start of the matrix mapping inside the current process.
    ///
    /// Because every participant process maps the same SHM segment at a
    /// different virtual address, absolute pointers cannot be shared. A
    /// `RelativePtr` stores only the offset and is resolved against each
    /// process's own `base_ptr` (see the `resolve*` methods).
    ///
    /// The `PhantomData<T>` marker ties the pointer to its pointee type for
    /// the compiler without actually storing a `T`.
    #[derive(Debug)]
    pub struct RelativePtr<T> {
        /// Byte offset from the start of the SHM mapping.
        offset: u32,
        _marker: PhantomData<T>,
    }
151
152    
153
154    impl AtomicMatrix {
        /// Initializes the matrix struct in place and returns it.
        ///
        /// Zeroes the first-level bitmap, all second-level bitmaps and every
        /// free-list head, then assigns the instance metadata (`id`,
        /// `total_size`).
        ///
        /// NOTE(review): `sector_boundaries` and `mmap` are left untouched
        /// here — `bootstrap` stores boundary 0 afterwards, but the remaining
        /// entries and the `mmap` field keep whatever bytes the segment held.
        ///
        /// ### Params
        /// @ptr: The pointer to the beginning of the matrix segment \
        /// @id: The ID of this matrix instance \
        /// @size: The total size of the SHM allocation
        ///
        /// ### Returns
        /// A static, lifetime specified, reference to the matrix struct.
        fn init(ptr: *mut AtomicMatrix, id: Uuid, size: u32) -> &'static mut Self {
            // SAFETY(review): assumes `ptr` addresses a writable region large
            // enough for `AtomicMatrix`, and that reinterpreting those raw
            // bytes as an initialized struct is sound — confirm against the
            // bootstrap path that produces `ptr`.
            unsafe {
                let matrix = &mut *ptr;
                matrix.fl_bitmap.store(0, Ordering::Release);
                for i in 0..32 {
                    matrix.sl_bitmaps[i].store(0, Ordering::Release);
                    for j in 0..8 {
                        matrix.matrix[i][j].store(0, Ordering::Release);
                    }
                }
                // Plain (non-atomic) writes: only sound while this thread is
                // the sole formatter, which the SYS_FORMATTING CAS in
                // `bootstrap` is meant to guarantee.
                matrix.id = id;
                matrix.total_size = size;
                matrix
            }
        }
182
        /// The entry point of the matrix struct.
        ///
        /// Creates (or reopens) the SHM segment, maps it, runs the one-time
        /// formatting if this process wins the init-guard CAS, and returns the
        /// high-level handler API.
        ///
        /// ### Params:
        /// @id: The ID of a new or existing matrix. If one is provided, will skip
        /// formatting and just bind to it \
        /// @size: The SHM allocation size
        ///
        /// ### Returns
        /// The matrix handler api, or an error to be handled
        pub fn bootstrap(
            id: Option<Uuid>,
            size: usize
        ) -> Result<crate::handlers::MatrixHandler, String> {
            let path_id = id.unwrap_or_else(Uuid::new_v4);
            let path = format!("/dev/shm/{}", path_id);
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(&path)
                .map_err(|e| e.to_string())?;

            // NOTE(review): set_len runs unconditionally, so rebinding to an
            // existing matrix with a different `size` will grow/truncate the
            // live segment — confirm callers always pass the original size.
            file.set_len(size as u64).map_err(|e| e.to_string())?;

            let mut mmap = unsafe { MmapMut::map_mut(&file).map_err(|e| e.to_string())? };
            let base_ptr = mmap.as_mut_ptr();

            // Layout: [ init guard (16 bytes) ][ AtomicMatrix ][ pad ][ blocks... ]
            let init_guard = unsafe { &*(base_ptr as *const AtomicU32) };
            let matrix_ptr = unsafe { base_ptr.add(16) as *mut AtomicMatrix };
            let mut current_offset: u32 = 0;

            // Exactly one process wins this CAS and formats the segment; the
            // losers spin in the `else` branch until the guard reads SYS_READY.
            if
                init_guard
                    .compare_exchange(
                        helpers::SYS_UNINITIALIZED,
                        helpers::SYS_FORMATTING,
                        Ordering::SeqCst,
                        Ordering::Relaxed
                    )
                    .is_ok()
            {
                let matrix = AtomicMatrix::init(matrix_ptr, path_id, size as u32);

                // First block starts right after the struct, rounded up to a
                // 16-byte boundary.
                let matrix_size = std::mem::size_of::<AtomicMatrix>();
                current_offset = (16 + (matrix_size as u32)+ 15) & !15;
                let remaining_size = size - (current_offset as usize);

                let (fl, sl) = helpers::Mapping::find_indices(remaining_size as u32);

                let header: &mut BlockHeader;
                unsafe {
                    header = &mut *(base_ptr.add(current_offset as usize) as *mut BlockHeader);
                }

                // The whole remainder becomes one large initial free block.
                // NOTE(review): `prev_free` is never initialized here.
                header.size.store(remaining_size as u32, Ordering::Release);
                header.state.store(helpers::STATE_FREE, Ordering::Release);
                header.prev_phys.store(0, Ordering::Release);
                header.next_free.store(0, Ordering::Release);

                matrix.insert_free_block(base_ptr, current_offset, fl, sl);
                matrix.sector_boundaries[0].store(size as u32, Ordering::Release);

                // Publish the fully formatted segment to any spinning binders.
                init_guard.store(helpers::SYS_READY, Ordering::SeqCst);
            } else {
                // Another process is (or was) formatting: wait until done.
                while init_guard.load(Ordering::Acquire) != helpers::SYS_READY {
                    std::hint::spin_loop();
                }
                // NOTE(review): on this bind path `current_offset` stays 0 —
                // confirm MatrixHandler::new tolerates that.
            }

            Ok(crate::handlers::MatrixHandler::new(
                unsafe { &mut *matrix_ptr }, 
                mmap, 
                current_offset
            ))
        }
262
        /// Allocates a block in the matrix for the caller.
        ///
        /// Greedy, lock-free allocator: each attempt picks a candidate bucket,
        /// pops its head, and — when the block is larger than needed — splits
        /// it, returning the remainder to the free lists. On contention it
        /// simply retries with whatever block is available next (up to 512
        /// attempts) before reporting OOM/contention.
        ///
        /// ### Params:
        /// @base_ptr: The address of the start of the SHM mapping. \
        /// @size: The requested payload size of the block
        ///
        /// ### Returns:
        /// Either the relative pointer to the allocated payload, or the OOM
        /// Contention flag.
        pub fn allocate(&self, base_ptr: *const u8, size: u32) -> Result<RelativePtr<u8>, String> {
            // Round the request up to a 16-byte multiple, with a 32-byte floor.
            let size = (size + 15) & !15;
            let size = size.max(32);
            let (fl, sl) = helpers::Mapping::find_indices(size);

            for _ in 0..512 {
                if let Some((f_fl, f_sl)) = self.find_suitable_block(fl, sl) {
                    if let Ok(offset) = self.remove_free_block(base_ptr, f_fl, f_sl) {
                        // SAFETY(review): trusts that `offset` addresses a
                        // valid header inside the mapping — upheld by the free
                        // lists only containing formatted blocks.
                        unsafe {
                            let header = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
                            let total_size = header.size.load(Ordering::Acquire);

                            // Split when the block can host the request plus a
                            // remainder header.
                            // NOTE(review): if total_size is only slightly
                            // larger, `rem_size` can be smaller than a header
                            // (even 0) and is still inserted as a free block —
                            // confirm this is intended.
                            if total_size >= size + helpers::HEADER_SPACE {
                                let rem_size = total_size - (size + helpers::HEADER_SPACE);
                                let next_off = offset + size + helpers::HEADER_SPACE;
                                let next_h = &mut *(
                                    base_ptr.add(next_off as usize) as *mut BlockHeader
                                );

                                next_h.size.store(rem_size, Ordering::Release);
                                next_h.state.store(helpers::STATE_FREE, Ordering::Release);
                                next_h.prev_phys.store(offset, Ordering::Release);
                                next_h.next_free.store(0, Ordering::Release);

                                header.size.store(size + helpers::HEADER_SPACE, Ordering::Release);
                                // Make the remainder header fully visible
                                // before it is published to the free lists.
                                fence(Ordering::SeqCst);

                                let (r_fl, r_sl) = helpers::Mapping::find_indices(rem_size);
                                self.insert_free_block(base_ptr, next_off, r_fl, r_sl);
                                // NOTE(review): the block physically after the
                                // original one still has prev_phys == offset;
                                // it is not re-pointed at `next_off` here.
                            }
                            header.state.store(helpers::STATE_ALLOCATED, Ordering::Release);
                            // The caller receives a pointer past the header.
                            return Ok(RelativePtr::new(offset + helpers::HEADER_SPACE));
                        }
                    }
                }
                std::hint::spin_loop();
            }
            Err("OOM: Contention".into())
        }
318
319        /// Acknowledges the freedon of a block and pushes it to the to_be_freed queue.
320        ///
321        /// If the to_be_freed queue is full, it will imediatelly trigger the drainage
322        /// of the queue and coalesce every block present before trying to push the
323        /// newly ack block into the queue. If there is space available, simply push
324        /// it and move on
325        ///
326        /// ### Params:
327        /// @ptr: The relative pointer of the block to acknowledge \
328        /// @base_ptr: The offset from the start of the SHM segment.
329        pub fn ack(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
330            unsafe {
331                let header = ptr.resolve_mut(base_ptr);
332
333                header.state.store(helpers::STATE_ACKED, Ordering::Release);
334            }
335
336            self.coalesce(ptr, base_ptr);
337        }
338
        /// Tries to merge neighbouring blocks to the left until the sector
        /// origin is reached or the neighbour block is not ACKED/FREE.
        ///
        /// This is the implementation of the Kinetic Coalescence process.
        /// Starting from the block that triggers the ripple, it walks
        /// backwards through `prev_phys` links (monotonicity guard), claiming
        /// each FREE/ACKED neighbour via CAS. Any contention (a concurrent
        /// ripple, or a module that just claimed the block) stops the walk —
        /// permissive healing.
        ///
        /// After the walk, the block physically following the merged region
        /// gets its `prev_phys` re-pointed at the merged origin; if that store
        /// is skipped (sector end), a future ripple is expected to repair it
        /// passively (horizon boundary).
        ///
        /// NOTE(review):
        /// * `self.remove_free_block(base_ptr, fl, sl)` below pops the *head*
        ///   of the (fl, sl) bucket — not necessarily the `prev_header` block
        ///   that was just claimed. If another block sits at that head, free
        ///   lists and physical layout can diverge. Confirm intended.
        /// * When a neighbour is claimed but then rejected by the sanity
        ///   checks, it is left in STATE_COALESCING with size 0 and never
        ///   re-published — possible block leak. Confirm intended.
        /// * The `while` guard compares the *original* `current_offset`,
        ///   which never changes inside the loop, so it only gates entry.
        ///
        /// ### Params:
        /// @ptr: The relative pointer of the block to coalesce. \
        /// @base_ptr: The address of the start of the SHM segment.
        ///
        /// ### Panics:
        /// "TidalRippleCoalescingError." when the merged size overflows u32.
        pub fn coalesce(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
            unsafe {
                let current_offset = ptr.offset();
                let mut current_header = ptr.resolve_mut(base_ptr);
                let mut total_size = current_header.size.load(Ordering::Acquire);
                // A size below the 32-byte minimum means the block was already
                // drained (or never valid): nothing to merge.
                if total_size < 32 {
                    return;
                }

                let mut final_offset = current_offset;

                // Walk backwards; offset 16 is where the AtomicMatrix header
                // region begins, so no block can start at or before it.
                while current_offset > 16 {
                    let prev_phys_offset = current_header.prev_phys.load(Ordering::Acquire);

                    if prev_phys_offset == 0 {
                        break;
                    }

                    let prev_header_ptr = base_ptr.add(
                        prev_phys_offset as usize
                    ) as *mut BlockHeader;
                    let prev_header = &mut *prev_header_ptr;

                    // Try to claim the neighbour: FREE first, then ACKED.
                    let res = prev_header.state.compare_exchange(
                        helpers::STATE_FREE,
                        helpers::STATE_COALESCING,
                        Ordering::Acquire,
                        Ordering::Relaxed
                    );

                    let claimed = if res.is_ok() {
                        true
                    } else {
                        prev_header.state
                            .compare_exchange(
                                helpers::STATE_ACKED,
                                helpers::STATE_COALESCING,
                                Ordering::Acquire,
                                Ordering::Relaxed
                            )
                            .is_ok()
                    };

                    if claimed {
                        // Drain the neighbour's size so no one else can use it.
                        let size_to_add = prev_header.size.swap(0, Ordering::Acquire);
                        // Sanity: a zero size means another ripple drained it;
                        // a forward-pointing prev_phys breaks monotonicity.
                        if size_to_add == 0 || prev_phys_offset >= current_offset {
                            break;
                        }

                        // Sanity: a corrupt header cannot exceed the arena.
                        if size_to_add > self.total_size {
                            break;
                        }

                        let (fl, sl) = helpers::Mapping::find_indices(size_to_add);

                        total_size = total_size
                            .checked_add(size_to_add)
                            .expect("TidalRippleCoalescingError.");
                        final_offset = prev_phys_offset;
                        current_header = prev_header;
                        // Best-effort unlink of the absorbed block's bucket
                        // (see NOTE(review) above about head-vs-claimed).
                        self.remove_free_block(base_ptr, fl, sl).ok();
                    } else {
                        break;
                    }
                }

                // Re-point the physical successor at the merged origin, unless
                // the merged region runs to the sector limit.
                let sector_limit = self.sector_end_offset(final_offset);
                if let Some(next_h_offset) = final_offset.checked_add(total_size) {
                    if next_h_offset < sector_limit {
                        let next_h = &*(base_ptr.add(next_h_offset as usize) as *const BlockHeader);
                        next_h.prev_phys.store(final_offset, Ordering::Release);
                    }
                }

                // Publish the merged block as one FREE region.
                current_header.size.store(total_size, Ordering::Release);
                current_header.state.store(helpers::STATE_FREE, Ordering::Release);

                let (fl, sl) = helpers::Mapping::find_indices(total_size);
                self.insert_free_block(base_ptr, final_offset, fl, sl);
            }
        }
445
446        /// Queries a block offset inside of the matrix.
447        ///
448        /// Not much to say about this, the name is pretty self explanatory.
449        ///
450        /// ### Params:
451        /// @offset: The offset of the block to be queried
452        ///
453        /// ### Returns:
454        /// The Relative Pointer to the queried block
455        pub fn query(&self, offset: u32) -> RelativePtr<u8> {
456            RelativePtr::new(offset + helpers::HEADER_SPACE)
457        }
458
459        /// Queries the TLSF bitmaps in search of a block.
460        ///
461        /// It acquires the first most suitable index flag (according to the find
462        /// _indices function) and does a bitwise operation to check if it possesses an
463        /// available block. If it matches, return the coordinates of the FL and the
464        /// CTZ result from the SL. If it doesn't match, performs CTZ on the first level
465        /// to return the first available coordinate.
466        ///
467        /// ### Params:
468        /// @fl: Calculated first level coordinate \
469        /// @sl: Calculated second level coordinate
470        ///
471        /// ### Returns:
472        /// A tuple containing the FL/SL coordinates or nothing if there is no space
473        /// available in the matrix.
474        pub fn find_suitable_block(&self, fl: u32, sl: u32) -> Option<(u32, u32)> {
475            let sl_map = self.sl_bitmaps[fl as usize].load(Ordering::Acquire);
476            let m_sl = sl_map & (!0u32 << sl);
477            if m_sl != 0 {
478                return Some((fl, m_sl.trailing_zeros()));
479            }
480
481            let fl_map = self.fl_bitmap.load(Ordering::Acquire);
482            let m_fl = fl_map & (!0u32 << (fl + 1));
483            if m_fl != 0 {
484                let f_fl = m_fl.trailing_zeros();
485                if f_fl < 32 {
486                    let s_map = self.sl_bitmaps[f_fl as usize].load(Ordering::Acquire);
487                    if s_map != 0 {
488                        return Some((f_fl, s_map.trailing_zeros()));
489                    }
490                }
491            }
492            None
493        }
494
        /// Pops the head block from a (fl, sl) free-list bucket.
        ///
        /// CAS loop: load the bucket head, load that block's `next_free`, and
        /// try to swing the head onto it. A failed CAS means another thread
        /// won the race — spin and retry. An empty bucket (head == 0) aborts
        /// with an error. When the popped block was the last one, the SL (and
        /// possibly FL) bitmap bits are cleared.
        ///
        /// NOTE(review): this is an untagged lock-free pop; a classic ABA
        /// interleaving (head popped and re-pushed between the `load` and the
        /// CAS) could install a stale `next` — confirm this is acceptable
        /// under the matrix's usage pattern.
        /// NOTE(review): the bitmap clearing below is not atomic with the pop;
        /// a concurrent insert can have its freshly set bit masked out.
        ///
        /// ### Params:
        /// @base_ptr: The address of the start of the SHM segment \
        /// @fl: First level coordinates of the bucket \
        /// @sl: Second level coordinates of the head.
        ///
        /// ### Returns
        /// A result containing either the offset of the newly acquired block,
        /// or an EmptyBitmapError
        pub fn remove_free_block(&self, base_ptr: *const u8, fl: u32, sl: u32) -> Result<u32, String> {
            let head = &self.matrix[fl as usize][sl as usize];
            loop {
                let off = head.load(Ordering::Acquire);
                if off == 0 {
                    return Err("EmptyBitmapError".into());
                }
                // SAFETY(review): trusts that `off` addresses a valid header
                // inside the mapping — upheld by the insert path.
                let next = unsafe {
                    (*(base_ptr.add(off as usize) as *const BlockHeader)).next_free.load(
                        Ordering::Acquire
                    )
                };
                if head.compare_exchange(off, next, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
                    // Bucket drained: clear the SL bit, and the FL bit too if
                    // no SL bucket under this FL remains populated.
                    if next == 0 {
                        self.sl_bitmaps[fl as usize].fetch_and(!(1 << sl), Ordering::Release);
                        if self.sl_bitmaps[fl as usize].load(Ordering::Acquire) == 0 {
                            self.fl_bitmap.fetch_and(!(1 << fl), Ordering::Release);
                        }
                    }
                    return Ok(off);
                }
                std::hint::spin_loop();
            }
        }
536
        /// Pushes a block onto the head of a (fl, sl) free-list bucket.
        ///
        /// The mirror of `remove_free_block`: link the block's `next_free` to
        /// the current head, then CAS the head onto this block, retrying on
        /// contention. Afterwards both bitmap levels are set so lookups can
        /// find the bucket again.
        ///
        /// ### Params:
        /// @base_ptr: The address of the start of the SHM segment \
        /// @offset: The header offset to be inserted into the bucket \
        /// @fl: The first level insertion coordinates \
        /// @sl: The second level insertion coordinates
        pub fn insert_free_block(&self, base_ptr: *const u8, offset: u32, fl: u32, sl: u32) {
            let head = &self.matrix[fl as usize][sl as usize];
            // SAFETY(review): trusts that `offset` addresses a valid header
            // inside the mapping — confirm with callers.
            unsafe {
                let h = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
                loop {
                    let old = head.load(Ordering::Acquire);
                    // Link before publishing, so a concurrent pop that wins the
                    // head sees a consistent chain.
                    h.next_free.store(old, Ordering::Release);
                    if
                        head
                            .compare_exchange(old, offset, Ordering::AcqRel, Ordering::Relaxed)
                            .is_ok()
                    {
                        break;
                    }
                    std::hint::spin_loop();
                }
            }
            // Mark both levels populated (idempotent fetch_or).
            self.fl_bitmap.fetch_or(1 << fl, Ordering::Release);
            self.sl_bitmaps[fl as usize].fetch_or(1 << sl, Ordering::Release);
        }
566
567        /// Returns the boundary of the current sector
568        ///
569        /// It queries the boundaries from the metadata and check wheter the block fits or
570        /// not inside this sector.
571        ///
572        /// ### Params:
573        /// @current_offset: The offset to check against the sector.
574        ///
575        /// ### Returns:
576        /// Either the boundary value of the current sector, or the end of the segment.
577        pub fn sector_end_offset(&self, current_offset: u32) -> u32 {
578            for i in 0..4 {
579                let boundary = self.sector_boundaries[i].load(Ordering::Acquire);
580                if boundary == 0 {
581                    break;
582                }
583                if current_offset < boundary {
584                    return boundary;
585                }
586            }
587
588            self.mmap.len() as u32
589        }
590    }
591
    impl<T> RelativePtr<T> {
        /// Creates a new relative pointer wrapping the provided offset.
        ///
        /// ### Params:
        /// @offset: The byte offset (from the SHM segment start) to wrap.
        ///
        /// ### Returns:
        /// An instance of Self.
        pub fn new(offset: u32) -> Self {
            Self { offset, _marker: PhantomData }
        }

        /// Returns the raw offset stored in the pointer.
        pub fn offset(&self) -> u32 {
            self.offset
        }

        /// Resolves the block's header by stepping `HEADER_SPACE` bytes back
        /// from this payload offset, in the caller's own mapping.
        ///
        /// # Safety
        /// `base_ptr` must be the live mapping of the segment this offset was
        /// created for, and `self.offset` must be a payload offset of at least
        /// `HEADER_SPACE` — otherwise the subtraction underflows. The returned
        /// lifetime is unbounded (caller-chosen); it must not outlive the map.
        ///
        /// ### Returns:
        /// A reference to the header of this block.
        pub unsafe fn resolve_header<'a>(&self, base_ptr: *const u8) -> &'a BlockHeader {
            unsafe { &*(base_ptr.add((self.offset as usize) - helpers::HEADER_SPACE as usize) as *mut BlockHeader) }
        }

        /// Mutable variant of [`Self::resolve_header`].
        ///
        /// # Safety
        /// Same contract as `resolve_header`, plus exclusivity: creating two
        /// live `&mut` to the same header is UB. NOTE(review): nothing here
        /// enforces that — confirm callers serialize mutable access.
        ///
        /// ### Returns:
        /// A mutable reference to the header of this block.
        pub unsafe fn resolve_header_mut<'a>(&self, base_ptr: *const u8) -> &'a mut BlockHeader {
            unsafe { &mut *(base_ptr.add((self.offset as usize) - helpers::HEADER_SPACE as usize) as *mut BlockHeader) }
        }

        /// Resolves the pointee itself at `base_ptr + offset` in the caller's
        /// own mapping.
        ///
        /// # Safety
        /// `base_ptr` must be the live mapping of the segment this offset was
        /// created for, the offset must point at a valid, initialized `T`, and
        /// the unbounded returned lifetime must not outlive the mapping.
        ///
        /// ### Returns:
        /// A reference to the pointee.
        pub unsafe fn resolve<'a>(&self, base_ptr: *const u8) -> &'a T {
            unsafe { &*(base_ptr.add(self.offset as usize) as *mut T) }
        }

        /// Mutable variant of [`Self::resolve`].
        ///
        /// # Safety
        /// Same contract as `resolve`, plus exclusivity of the `&mut`.
        ///
        /// ### Returns:
        /// A mutable reference to the pointee.
        pub unsafe fn resolve_mut<'a>(&self, base_ptr: *const u8) -> &'a mut T {
            unsafe { &mut *(base_ptr.add(self.offset as usize) as *mut T) }
        }

        /// Writes `value` into the pointee slot without reading/dropping the
        /// previous contents (`ptr::write`).
        ///
        /// # Safety
        /// Same contract as `resolve_mut`; additionally, any previous value at
        /// the slot is overwritten without being dropped.
        pub unsafe fn write(&self, base_ptr: *const u8, value: T) {
            unsafe { std::ptr::write(self.resolve_mut(base_ptr), value) }
        }
    }
672
673    
674}
675
676#[cfg(test)]
677mod tests {
678    use crate::matrix::core::{ BlockHeader, RelativePtr };
679    use crate::handlers::HandlerFunctions;
680
681    use super::*;
682
683    /// Test if the mapping function can return the correct indexes.
684    #[test]
685    fn test_mapping() {
686        assert_eq!(helpers::Mapping::find_indices(16), (0, 1));
687        assert_eq!(helpers::Mapping::find_indices(64), (0, 4));
688        assert_eq!(helpers::Mapping::find_indices(128), (7, 0));
689
690        let (fl, sl) = helpers::Mapping::find_indices(1024);
691        assert_eq!(fl, 10);
692        assert_eq!(sl, 0);
693    }
694
    /// Test if the bootstrap function actually initializes the matrix, and
    /// publishes the initial free block on the TLSF bitmaps.
    ///
    /// NOTE(review): bootstrap creates a file under /dev/shm named after the
    /// random UUID and no cleanup is visible here — the segment file persists
    /// after the test run.
    #[test]
    fn test_initial_bootstrap() {
        let size = 1024 * 1024;
        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();

        // A fresh matrix holds one big free block, so at least one FL bit set.
        let bitmap = handler.matrix().fl_bitmap.load(Ordering::Acquire);

        assert!(bitmap != 0, "FL bitmap should not be zero after bootstrap");
    }
706
    /// Test allocation and splitting logic by comparing the size of our buffer.
    ///
    /// A 64-byte request against a fresh 1 MiB arena must split the initial
    /// block: the returned block's recorded size is the rounded request plus
    /// `HEADER_SPACE`, and its state is ALLOCATED.
    #[test]
    fn test_allocation_and_spliting() {
        let size = 1024 * 1024;
        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
        let base_ptr = handler.base_ptr();
        let matrix = handler.matrix();

        unsafe {
            let rel_ptr = matrix.allocate(base_ptr, 64).unwrap();

            // Step back from the payload pointer to inspect the header.
            let header = rel_ptr.resolve_header(base_ptr);

            assert_eq!(header.size.load(Ordering::Acquire), 64 + helpers::HEADER_SPACE);
            assert_eq!(header.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);
        }
    }
724
725    /// Test coalesce logic to see if blocks will merge correctly.
726    #[test]
727    fn test_ack_and_coalesce() {
728        let size = 1024 * 1024;
729        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
730        let base_ptr = handler.base_ptr();
731        let matrix = handler.matrix();
732
733        unsafe {
734            let ptr_a = matrix.allocate(base_ptr, 64).unwrap();
735            let ptr_b = matrix.allocate(base_ptr, 64).unwrap();
736            let ptr_c = matrix.allocate(base_ptr, 64).unwrap();
737            let ptr_d = matrix.allocate(base_ptr, 64).unwrap();
738            let ptr_e = matrix.allocate(base_ptr, 64).unwrap();
739
740            let h_b = ptr_b.resolve_header(base_ptr);
741            let rel_c = RelativePtr::<BlockHeader>::new(ptr_c.offset() - helpers::HEADER_SPACE);
742            let rel_d = RelativePtr::<BlockHeader>::new(ptr_d.offset() - helpers::HEADER_SPACE);
743
744            h_b.state.store(helpers::STATE_FREE, Ordering::Release);
745            matrix.ack(&rel_c, base_ptr);
746            matrix.ack(&rel_d, base_ptr);
747
748            matrix.coalesce(&rel_d, base_ptr);
749
750            let h_a = ptr_a.resolve_header(base_ptr);
751            assert_eq!(h_a.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);
752
753            let h_merged = ptr_b.resolve_header(base_ptr);
754            assert_eq!(h_merged.state.load(Ordering::Acquire), helpers::STATE_FREE);
755            assert_eq!(h_merged.size.load(Ordering::Acquire), 448);
756
757            let h_e = ptr_e.resolve_header(base_ptr);
758            assert_eq!(h_e.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);
759        }
760    }
761
762    /// ----------------------------------------------------------------------------
763    /// STRESS TESTS
764    ///
765    /// These are all ignored from the correctness suite so github doesn't get mad at
766    /// me. Before shipping, running these are explicitly required.
767    /// ----------------------------------------------------------------------------
768
769    /// Run 8.000.000 allocations in parallel (1.000.000 each) to test if the matrix
770    /// can hold without race conditions.
771    #[test]
772    #[ignore]
773    fn test_multithreaded_stress() {
774        // Quick author note:
775        //
776        // May God help us all at this moment.
777
778        use std::sync::{ Arc, Barrier };
779        use std::thread;
780        use std::collections::HashSet;
781
782        // We use 500MB matrix to allocate all the buffers
783        let size = 50 * 1024 * 1024;
784        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
785
786        let thread_count = 8;
787        let allocs_per_second = 100_000;
788        let barrier = Arc::new(Barrier::new(thread_count));
789
790        // Track the failed allocs
791        let fail_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
792
793        let mut handles = vec![];
794
795        // Fuck the matrix! GO GO GO
796        for _ in 0..thread_count {
797            let b = Arc::clone(&barrier);
798            let base_addr = handler.base_ptr() as usize;
799            let matrix_addr = handler.matrix() as *const core::AtomicMatrix as usize;
800            let fail_count_clone = Arc::clone(&fail_count);
801
802            handles.push(
803                thread::spawn(move || {
804                    let base_ptr = base_addr as *mut u8;
805                    let matrix = unsafe { &*(matrix_addr as *const core::AtomicMatrix) };
806                    let mut my_offsets = Vec::new();
807
808                    b.wait();
809
810                    for _ in 0..allocs_per_second {
811                        for _ in 0..10 {
812                            if let Ok(rel_ptr) = matrix.allocate(base_ptr, 64) {
813                                my_offsets.push(rel_ptr.offset());
814                                break;
815                            }
816                            fail_count_clone.fetch_add(1, Ordering::Relaxed);
817                            std::hint::spin_loop();
818                        }
819                    }
820
821                    my_offsets
822                })
823            );
824        }
825
826        // Collect everything we did and check
827        let mut all_offsets = Vec::new();
828        for h in handles {
829            all_offsets.extend(h.join().unwrap());
830        }
831
832        let total_obtained = all_offsets.len();
833        let unique_offsets: HashSet<_> = all_offsets.into_iter().collect();
834
835        // We allow for a 0.5% failure marging, as this stress test does not account for deallocations.
836        let success_percentage = ((thread_count * allocs_per_second) as f64) * 0.995;
837
838        // Assert we can obtain at least 99.5% of the expected allocations without collisions, which would
839        // indicate a potential race condition.
840        assert!(
841            total_obtained >= (success_percentage as usize),
842            "Total allocations should match expected count"
843        );
844        assert_eq!(
845            unique_offsets.len(),
846            total_obtained,
847            "RACE CONDITION DETECTED: Duplicate offsets found"
848        );
849        println!(
850            "Successfully allocated {} unique blocks across {} threads without collisions! {} allocations failed",
851            total_obtained,
852            thread_count,
853            fail_count.load(Ordering::Relaxed)
854        );
855    }
856
857    /// Test if the matrix can hold 10 minutes of 8 threads executing random alloc
858    /// and dealloc operations to ensure the Propagation Principle of Atomic
859    /// Coalescence works.
860    #[test]
861    #[ignore]
862    fn test_long_term_fragmentation_healing() {
863        // Quick author note:
864        //
865        // May god help your device at this moment
866
867        use std::sync::{ Arc, Barrier };
868        use std::thread;
869        use std::time::{ Instant, Duration };
870
871        const DURATION: u32 = 600;
872        const THREADS: u32 = 8;
873
874        let size = 50 * 1024 * 1024;
875        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
876        let handler_arc = Arc::new(handler);
877        let barrier = Arc::new(Barrier::new(THREADS as usize));
878        let start_time = Instant::now();
879        let duration = Duration::from_secs(DURATION as u64);
880
881        let mut handles = vec![];
882
883        for t_id in 0..THREADS {
884            let h = Arc::clone(&handler_arc);
885            let b = Arc::clone(&barrier);
886
887            handles.push(
888                thread::spawn(move || {
889                    let base_ptr = h.base_ptr() as *mut u8;
890                    let matrix = &h.matrix();
891                    let mut my_blocks = Vec::new();
892                    let mut rng = t_id + 1;
893                    let mut total_ops = 0u64;
894
895                    b.wait();
896
897                    while start_time.elapsed() < duration {
898                        rng = rng.wrapping_mul(1103515245).wrapping_add(12345);
899
900                        if rng % 10 < 7 && my_blocks.len() < 200 {
901                            let alloc_size = (rng % 512);
902                            if let Ok(ptr) = matrix.allocate(base_ptr, alloc_size as u32) {
903                                my_blocks.push(ptr);
904                            }
905                        } else if !my_blocks.is_empty() {
906                            let idx = (rng as usize) % my_blocks.len();
907                            let ptr = my_blocks.swap_remove(idx);
908
909                            let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - helpers::HEADER_SPACE);
910                            matrix.ack(&header_ptr, base_ptr);
911
912                            if total_ops % 5 == 0 {
913                                matrix.coalesce(&header_ptr, base_ptr);
914                            }
915                        }
916                        total_ops += 1;
917                    }
918                    (my_blocks, total_ops)
919                })
920            );
921        }
922
923        let mut total_work = 0u64;
924        for h in handles {
925            let (remaining, thread_ops) = h.join().unwrap();
926            total_work += thread_ops;
927            let base_ptr = handler_arc.base_ptr() as *mut u8;
928
929            for ptr in remaining {
930                let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - helpers::HEADER_SPACE);
931                handler_arc.matrix().ack(&header_ptr, base_ptr);
932                handler_arc.matrix().coalesce(&header_ptr, base_ptr);
933            }
934        }
935
936        let mut free_count = 0;
937        for fl in 0..32 {
938            for sl in 0..8 {
939                if handler_arc.matrix().matrix[fl][sl].load(Ordering::Acquire) != 0 {
940                    free_count += 1;
941                }
942            }
943        }
944
945        let entrophy_percentage = ((free_count as f64) / (total_work as f64)) * 100.0;
946        let mops = (total_work as f64) / (DURATION as f64) / 1_000_000.0;
947
948        println!("Endurance Results (Duration of {} seconds)", DURATION);
949        println!("Num of threads: {}", THREADS);
950        println!("Total operations: {}", total_work);
951        println!("Throughput: {:.2} Mop/s", (total_work as f64) / (DURATION as f64) / 1_000_000.0);
952        println!("Final free fragments: {}", free_count);
953        println!("Entrophy percentage: {}%", entrophy_percentage);
954
955        assert!(entrophy_percentage < 0.001, "Excessive Fragmentation");
956        assert!(mops > 1.0, "Throughput regression: {:.2} Mop/s", mops);
957    }
958}