Skip to main content

atomic_matrix/
matrix.rs

1//! # Atomic Matrix Core
2//!
3//! This module implements a high-velocity, lock-free memory arena designed for
4//! ultra-low latency IPC (Inter-Process Communication).
5//!
6//! # Theory of Operation: The Propagation Principle of Atomic Coalescence
7//! Unlike traditional allocators that use centralized mutexes or complex
8//! background garbage collection, the **AtomicMatrix** treats memory
9//! fragmentation as a fluid dynamics problem.
10//!
11//! 1. **Kinetic Healing:** Freeing a block triggers a "Ripple" ('coalesce')
12//! that propagates through the sector.
13//! 2. **Monotonicity:** Ripples only move backward (towards the sector origin)
14//! to prevent circular atomic dependencies and deadlocks.
15//! 3. **Permissive Concurrency:** If a thread encounters contention, it skips the
16//! block rather than blocking, relying on the high frequency of future operations
17//! to complete the healing.
18//!
19//! # Memory Topography
20//! The matrix is laid out linearly in a shared memory segment:
21//! ```text
22//! [ Init Guard (16b) ] [ AtomicMatrix Struct ] [ Padding ] [ Sector 0 ] [ Sector 1 ] ...
23//! ```
24//! Each **Sector** acts as a self-contained fault domain with its own boundary,
25//! preventing local fragmentation from "bleeding" into the entire matrix.
26//!
27//! # Safety & Atomicity
28//! All state transitions follow a strict 'STATE_FREE -> STATE_ALLOCATED ->
29//! STATE_ACKED -> STATE_COALESCING' lifecycle. Hardware-level memory fences
30//! (std::sync::atomic::fence) are utilized to ensure visibility across 16+ CPU
31//! cores without locking.
32
33use std::sync::atomic::{ AtomicU32, Ordering, fence };
34use std::marker::PhantomData;
35use std::fs::OpenOptions;
36use memmap2::MmapMut;
37use uuid::Uuid;
38
39pub mod helpers {
40    /// System initialization flags
41    pub const SYS_UNINITIALIZED: u32 = 0;
42    pub const SYS_FORMATTING: u32 = 1;
43    pub const SYS_READY: u32 = 2;
44
45    /// Header lifecycle flags
46    pub const STATE_FREE: u32 = 0;
47    pub const STATE_ALLOCATED: u32 = 1;
48    pub const STATE_ACKED: u32 = 2;
49    pub const STATE_COALESCING: u32 = 3;
50
51    /// A helper struct that provided the O(1) calculations to find the coordinates of
52    /// a block that suits exactly the requested buffer size, or the next available one
53    /// that can fit the message as well.
54    pub struct Mapping;
55
56    impl Mapping {
57        /// Maps a block size to its corresponding (First-Level, Second-Level) indices.
58        ///
59        /// This function implements a two-level mapping strategy used for O(1) free-block
60        /// lookup, optimized for both high-velocity small allocations and logarithmic
61        /// scaling of large blocks.
62        ///
63        /// ### Mapping Logic:
64        /// - **Linear (Small):** For sizes < 128, it uses a fixed FL (0) and 16-byte SL
65        ///   subdivisions. This minimizes fragmentation for tiny objects.
66        /// - **Logarithmic (Large):** For sizes >= 128, FL is the power of 2 (determined via
67        ///   `leading_zeros`), and SL is a 3-bit subdivider of the range between 2^n and 2^(n+1).
68        ///
69        /// ### Mathematical Transformation:
70        /// - `FL = log2(size)`
71        /// - `SL = (size - 2^FL) / (2^(FL - 3))`
72        ///
73        /// ### Bounds:
74        /// Indices are clamped to `(31, 7)` to prevent overflow in the matrix bitmask.
75        ///
76        /// # Arguments
77        /// * `size` - The total byte size of the memory block.
78        ///
79        /// # Returns
80        /// A tuple of `(fl, sl)` indices.
81        pub fn find_indices(size: u32) -> (u32, u32) {
82            if size < 128 {
83                (0, (size / 16).min(7))
84            } else {
85                let fl = 31 - size.leading_zeros();
86                let sl = ((size >> (fl - 3)) & 0x7).min(7);
87                (fl.min(31), sl)
88            }
89        }
90    }
91}
92
93pub mod core {
94    use super::*;
95
96    /// Header structure that is written at the beginning of each block/sector
97    ///
98    /// The block is made entirely of atomic primitives to ensure safe reading
99    /// and manipulation across participant modules in the matrix.
100    #[derive(Debug)]
101    #[repr(C, align(16))]
102    pub struct BlockHeader {
103        pub size: AtomicU32,
104        pub state: AtomicU32,
105        pub prev_phys: AtomicU32,
106        pub next_free: AtomicU32,
107        pub prev_free: AtomicU32,
108    }
109
110    /// The structural core of the matrix.
111    ///
112    /// Its the non-blocking, SHM-backed memory arena, utilizing a segmented **TLSF
113    /// (Two-Level segregated fit)** inspired mapping for O(1) allocation, paired with
114    /// a custom **Kinetic Coalescing** logic.
115    ///
116    /// # Memory Layout
117    /// The matrix is designed to be mapped directly into '/dev/shm". It starts with
118    /// a 16-byte 'init_guard' followed by the struct itself, and then the sectorized
119    /// raw memory blocks.
120    #[repr(C)]
121    pub struct AtomicMatrix {
122        pub id: Uuid,
123        pub fl_bitmap: AtomicU32,
124        pub sl_bitmaps: [AtomicU32; 32],
125        pub matrix: [[AtomicU32; 8]; 32],
126        pub mmap: MmapMut,
127        pub sector_boundaries: [AtomicU32; 4],
128        pub total_size: u32,
129    }
130
131    /// A Relative Pointer to the block memory address, relative to the start of the
132    /// matrix inside the process memory scope.
133    ///
134    /// This RelativePointer is used to calculate the accurate address of the block its
135    /// related to. Providing a way for independent process to localize the data inside
136    /// their own mappings of the SHM segment.
137    ///
138    /// It also receives a PhantomData to inform the compiler we safely own whatever
139    /// generic type the caller has passed to this pointer.
140    pub struct RelativePtr<T> {
141        offset: u32,
142        _marker: PhantomData<T>,
143    }
144
145    
146
147    impl AtomicMatrix {
148        /// Initialized the matrix struct and returns it.
149        ///
150        /// This function will initialize both TLSF level flags, the matrix map for free
151        /// blocks, assign all the require metadata and return the ready to use object
152        ///
153        /// ### Params
154        /// @ptr: The pointer to the beginning of the matrix segment \
155        /// @id: The ID of this matrix instance \
156        /// @size: The total size of the SHM allocation
157        ///
158        /// ### Returns
159        /// A static, lifetime specified, reference to the matrix struct.
160        fn init(ptr: *mut AtomicMatrix, id: Uuid, size: u32) -> &'static mut Self {
161            unsafe {
162                let matrix = &mut *ptr;
163                matrix.fl_bitmap.store(0, Ordering::Release);
164                for i in 0..32 {
165                    matrix.sl_bitmaps[i].store(0, Ordering::Release);
166                    for j in 0..8 {
167                        matrix.matrix[i][j].store(0, Ordering::Release);
168                    }
169                }
170                matrix.id = id;
171                matrix.total_size = size;
172                matrix
173            }
174        }
175
176        /// The entry point of the matrix struct.
177        ///
178        /// It initializes SHM segment, bind to it, executes the initial formatting,
179        /// prepares both the matrix and handler structs and return the High-Level API
180        /// to the caller.
181        ///
182        /// ### Params:
183        /// @id: The ID of a new or existing matrix (if existing, will skip formatting and
184        /// just bind to it) \
185        /// @size: The SHM allocation size
186        ///
187        /// ### Returns
188        /// The matrix handler api, or an error to be handled
189        pub fn bootstrap(
190            id: Option<Uuid>,
191            size: usize
192        ) -> Result<crate::handlers::MatrixHandler, String> {
193            let path_id = id.unwrap_or_else(Uuid::new_v4);
194            let path = format!("/dev/shm/{}", path_id);
195            let file = OpenOptions::new()
196                .read(true)
197                .write(true)
198                .create(true)
199                .open(&path)
200                .map_err(|e| e.to_string())?;
201
202            file.set_len(size as u64).map_err(|e| e.to_string())?;
203
204            let mut mmap = unsafe { MmapMut::map_mut(&file).map_err(|e| e.to_string())? };
205            let base_ptr = mmap.as_mut_ptr();
206
207            let init_guard = unsafe { &*(base_ptr as *const AtomicU32) };
208            let matrix_ptr = unsafe { base_ptr.add(16) as *mut AtomicMatrix };
209            let mut current_offset: u32 = 0;
210
211            if
212                init_guard
213                    .compare_exchange(
214                        helpers::SYS_UNINITIALIZED,
215                        helpers::SYS_FORMATTING,
216                        Ordering::SeqCst,
217                        Ordering::Relaxed
218                    )
219                    .is_ok()
220            {
221                let matrix = AtomicMatrix::init(matrix_ptr, path_id, size as u32);
222
223                let matrix_size = std::mem::size_of::<AtomicMatrix>();
224                let remaining_size = size - matrix_size;
225                current_offset = (16 + (matrix_size as u32)+ 15) & !15;
226
227                let (fl, sl) = helpers::Mapping::find_indices(remaining_size as u32);
228
229                let header: &mut BlockHeader;
230                unsafe {
231                    header = &mut *(base_ptr.add(current_offset as usize) as *mut BlockHeader);
232                }
233
234                header.size.store(remaining_size as u32, Ordering::Release);
235                header.state.store(helpers::STATE_FREE, Ordering::Release);
236                header.prev_phys.store(0, Ordering::Release);
237                header.next_free.store(0, Ordering::Release);
238
239                matrix.insert_free_block(base_ptr, current_offset, fl, sl);
240                matrix.sector_boundaries[0].store(size as u32, Ordering::Release);
241
242                init_guard.store(helpers::SYS_READY, Ordering::SeqCst);
243            } else {
244                while init_guard.load(Ordering::Acquire) != helpers::SYS_READY {
245                    std::hint::spin_loop();
246                }
247            }
248
249            Ok(crate::handlers::MatrixHandler::new(
250                unsafe { &mut *matrix_ptr }, 
251                mmap, 
252                current_offset
253            ))
254        }
255
256        /// Allocates a block in the matrix for the caller
257        ///
258        /// It acts as a greed allocator, ensuring each call will either get a block allocated
259        /// in the matrix, or it throws a OOM Contention flag. It achieves this by politely
260        /// trying to claim a block for itself. In case the CAS loop fails, it will simply jump
261        /// to the next free block on the chain, granting a lock-free allocation paradigm.
262        ///
263        /// Each allocation is allowed to retry itself 512 times to confirm the matrix is
264        /// indeed out of memory before killing the execution of the function.
265        ///
266        /// ### Params:
267        /// @base_ptr: The starting offset of the SHM mapping. \
268        /// @size: The allocation size of the block
269        ///
270        /// ### Returns:
271        /// Either the relative pointer to the allocated block, or the OOM Contention flag.
272        pub fn allocate(&self, base_ptr: *const u8, size: u32) -> Result<RelativePtr<u8>, String> {
273            let size = (size + 15) & !15;
274            let size = size.max(32);
275            let (fl, sl) = helpers::Mapping::find_indices(size);
276
277            for _ in 0..512 {
278                if let Some((f_fl, f_sl)) = self.find_suitable_block(fl, sl) {
279                    if let Ok(offset) = self.remove_free_block(base_ptr, f_fl, f_sl) {
280                        unsafe {
281                            let header = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
282                            let total_size = header.size.load(Ordering::Acquire);
283
284                            if total_size >= size + 32 + 16 {
285                                let rem_size = total_size - size;
286                                let next_off = offset + size;
287                                let next_h = &mut *(
288                                    base_ptr.add(next_off as usize) as *mut BlockHeader
289                                );
290
291                                next_h.size.store(rem_size, Ordering::Release);
292                                next_h.state.store(helpers::STATE_FREE, Ordering::Release);
293                                next_h.prev_phys.store(offset, Ordering::Release);
294                                next_h.next_free.store(0, Ordering::Release);
295
296                                header.size.store(size, Ordering::Release);
297                                fence(Ordering::SeqCst);
298
299                                let (r_fl, r_sl) = helpers::Mapping::find_indices(rem_size);
300                                self.insert_free_block(base_ptr, next_off, r_fl, r_sl);
301                            }
302                            header.state.store(helpers::STATE_ALLOCATED, Ordering::Release);
303                            return Ok(RelativePtr::new(offset + 32));
304                        }
305                    }
306                }
307                std::hint::spin_loop();
308            }
309            Err("OOM: Contention".into())
310        }
311
312        /// Acknowledges the freedon of a block and pushes it to the to_be_freed queue.
313        ///
314        /// If the to_be_freed queue is full, it will imediatelly trigger the drainage
315        /// of the queue and coalesce every block present before trying to push the
316        /// newly ack block into the queue. If there is space available, simply push
317        /// it and move on
318        ///
319        /// ### Params:
320        /// @ptr: The relative pointer of the block to acknowledge \
321        /// @base_ptr: The offset from the start of the SHM segment.
322        pub fn ack(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
323            unsafe {
324                let header = ptr.resolve_mut(base_ptr);
325
326                header.state.store(helpers::STATE_ACKED, Ordering::Release);
327            }
328
329            self.coalesce(ptr, base_ptr);
330        }
331
332        /// Tries to merge neighbouring blocks to the left until the end of the matrix is
333        /// reached or the neighbour block is not ACKED/FREE.
334        ///
335        /// This is the elegant implementation of the Kinetic Coalescence processes. It
336        /// receives the initial block that will start the ripple, and traverse the matrix
337        /// to the left (monotonicity guard). If any race conditions are met in the middle
338        /// (another coalescing just start, or a module just claimed this block), it will
339        /// stop the coalescing and move on (permissive healing).
340        ///
341        /// Then it tries to update the next neighbour previous physical offset metadata to
342        /// the start of the new free block. If this exchange fails due to end of sector, or
343        /// just claimed blocks, it will skip this marking in hopes that when this block is
344        /// eventually coalesced, it will passivelly merge backwards with the ripple and fix
345        /// the marking on its header by himself (horizon boundary).
346        ///
347        /// This three core implementations together composes the Propagation Principle of
348        /// Atomic Coalescence and enables the matrix to have such high throughput speeds.
349        ///
350        /// ### Params:
351        /// @ptr: The relative pointer of the block to coalesce. \
352        /// @base_ptr: The offset from the start of the SHM segment.
353        ///
354        /// ### Throws:
355        /// TidalRippleContentionError: Two coalescing ripples executing simultaneously on
356        /// the same blocks.
357        pub fn coalesce(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
358            unsafe {
359                let current_offset = ptr.offset();
360                let mut current_header = ptr.resolve_mut(base_ptr);
361                let mut total_size = current_header.size.load(Ordering::Acquire);
362                if total_size < 32 {
363                    return;
364                }
365
366                let mut final_offset = current_offset;
367
368                while current_offset > 16 {
369                    let prev_phys_offset = current_header.prev_phys.load(Ordering::Acquire);
370
371                    if prev_phys_offset == 0 {
372                        break;
373                    }
374
375                    let prev_header_ptr = base_ptr.add(
376                        prev_phys_offset as usize
377                    ) as *mut BlockHeader;
378                    let prev_header = &mut *prev_header_ptr;
379
380                    let res = prev_header.state.compare_exchange(
381                        helpers::STATE_FREE,
382                        helpers::STATE_COALESCING,
383                        Ordering::Acquire,
384                        Ordering::Relaxed
385                    );
386
387                    let claimed = if res.is_ok() {
388                        true
389                    } else {
390                        prev_header.state
391                            .compare_exchange(
392                                helpers::STATE_ACKED,
393                                helpers::STATE_COALESCING,
394                                Ordering::Acquire,
395                                Ordering::Relaxed
396                            )
397                            .is_ok()
398                    };
399
400                    if claimed {
401                        let size_to_add = prev_header.size.swap(0, Ordering::Acquire);
402                        if size_to_add == 0 || prev_phys_offset >= current_offset {
403                            break;
404                        }
405
406                        if size_to_add > self.total_size {
407                            break;
408                        }
409
410                        let (fl, sl) = helpers::Mapping::find_indices(size_to_add);
411
412                        total_size = total_size
413                            .checked_add(size_to_add)
414                            .expect("TidalRippleCoalescingError.");
415                        final_offset = prev_phys_offset;
416                        current_header = prev_header;
417                        self.remove_free_block(base_ptr, fl, sl).ok();
418                    } else {
419                        break;
420                    }
421                }
422
423                let sector_limit = self.sector_end_offset(final_offset);
424                if let Some(next_h_offset) = final_offset.checked_add(total_size) {
425                    if next_h_offset < sector_limit {
426                        let next_h = &*(base_ptr.add(next_h_offset as usize) as *const BlockHeader);
427                        next_h.prev_phys.store(final_offset, Ordering::Release);
428                    }
429                }
430
431                current_header.size.store(total_size, Ordering::Release);
432                current_header.state.store(helpers::STATE_FREE, Ordering::Release);
433
434                let (fl, sl) = helpers::Mapping::find_indices(total_size);
435                self.insert_free_block(base_ptr, final_offset, fl, sl);
436            }
437        }
438
439        /// Queries a block offset inside of the matrix.
440        ///
441        /// Not much to say about this, the name is pretty self explanatory.
442        ///
443        /// ### Params:
444        /// @offset: The offset of the block to be queried
445        ///
446        /// ### Returns:
447        /// The Relative Pointer to the queried block
448        pub fn query(&self, offset: u32) -> RelativePtr<u8> {
449            RelativePtr::new(offset + 32)
450        }
451
452        /// Queries the TLSF bitmaps in search of a block.
453        ///
454        /// It acquires the first most suitable index flag (according to the find
455        /// _indices function) and does a bitwise operation to check if it possesses an
456        /// available block. If it matches, return the coordinates of the FL and the
457        /// CTZ result from the SL. If it doesn't match, performs CTZ on the first level
458        /// to return the first available coordinate.
459        ///
460        /// ### Params:
461        /// @fl: Calculated first level coordinate \
462        /// @sl: Calculated second level coordinate
463        ///
464        /// ### Returns:
465        /// A tuple containing the FL/SL coordinates or nothing if there is no space
466        /// available in the matrix.
467        pub fn find_suitable_block(&self, fl: u32, sl: u32) -> Option<(u32, u32)> {
468            let sl_map = self.sl_bitmaps[fl as usize].load(Ordering::Acquire);
469            let m_sl = sl_map & (!0u32 << sl);
470            if m_sl != 0 {
471                return Some((fl, m_sl.trailing_zeros()));
472            }
473
474            let fl_map = self.fl_bitmap.load(Ordering::Acquire);
475            let m_fl = fl_map & (!0u32 << (fl + 1));
476            if m_fl != 0 {
477                let f_fl = m_fl.trailing_zeros();
478                if f_fl < 32 {
479                    let s_map = self.sl_bitmaps[f_fl as usize].load(Ordering::Acquire);
480                    if s_map != 0 {
481                        return Some((f_fl, s_map.trailing_zeros()));
482                    }
483                }
484            }
485            None
486        }
487
488        /// Pops a free block from the TLSF bitmap.
489        ///
490        /// It tries atomically claims ownership over the header inside the map. If
491        /// successful, swap the current head to next free head in the chain, or 0 if
492        /// there is none. If it fails, it automatically assumes someone claimed the
493        /// buffer first and calls a hint::spin loop instruction to retry claiming a
494        /// head. If, in one of the interactions, the bucket returs 0, it breaks the
495        /// function with an error.
496        ///
497        /// ### Params:
498        /// @base_ptr: The offset from the start of the SHM segment \
499        /// @fl: First level coordinates of the bucket \
500        /// @sl: Second level coordinates of the head.
501        ///
502        /// ### Returns
503        /// A result containing either the head of the newly acquired block, or an
504        /// EmptyBitmapError
505        pub fn remove_free_block(&self, base_ptr: *const u8, fl: u32, sl: u32) -> Result<u32, String> {
506            let head = &self.matrix[fl as usize][sl as usize];
507            loop {
508                let off = head.load(Ordering::Acquire);
509                if off == 0 {
510                    return Err("EmptyBitmapError".into());
511                }
512                let next = unsafe {
513                    (*(base_ptr.add(off as usize) as *const BlockHeader)).next_free.load(
514                        Ordering::Acquire
515                    )
516                };
517                if head.compare_exchange(off, next, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
518                    if next == 0 {
519                        self.sl_bitmaps[fl as usize].fetch_and(!(1 << sl), Ordering::Release);
520                        if self.sl_bitmaps[fl as usize].load(Ordering::Acquire) == 0 {
521                            self.fl_bitmap.fetch_and(!(1 << fl), Ordering::Release);
522                        }
523                    }
524                    return Ok(off);
525                }
526                std::hint::spin_loop();
527            }
528        }
529
530        /// Stores a new header inside a bucket
531        ///
532        /// It does the exact oposite of the remove_free_block basically.
533        ///
534        /// ### Params:
535        /// @base_ptr: The offset from the beginning of the SHM segment \
536        /// @offset: The header offset to be inserted into the bucket \
537        /// @fl: The first level insertion coordinates \
538        /// @sl: The second level insertion coordinates
539        pub fn insert_free_block(&self, base_ptr: *const u8, offset: u32, fl: u32, sl: u32) {
540            let head = &self.matrix[fl as usize][sl as usize];
541            unsafe {
542                let h = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
543                loop {
544                    let old = head.load(Ordering::Acquire);
545                    h.next_free.store(old, Ordering::Release);
546                    if
547                        head
548                            .compare_exchange(old, offset, Ordering::AcqRel, Ordering::Relaxed)
549                            .is_ok()
550                    {
551                        break;
552                    }
553                    std::hint::spin_loop();
554                }
555            }
556            self.fl_bitmap.fetch_or(1 << fl, Ordering::Release);
557            self.sl_bitmaps[fl as usize].fetch_or(1 << sl, Ordering::Release);
558        }
559
560        /// Returns the boundary of the current sector
561        ///
562        /// It queries the boundaries from the metadata and check wheter the block fits or
563        /// not inside this sector.
564        ///
565        /// ### Params:
566        /// @current_offset: The offset to check against the sector.
567        ///
568        /// ### Returns:
569        /// Either the boundary value of the current sector, or the end of the segment.
570        pub fn sector_end_offset(&self, current_offset: u32) -> u32 {
571            for i in 0..4 {
572                let boundary = self.sector_boundaries[i].load(Ordering::Acquire);
573                if boundary == 0 {
574                    break;
575                }
576                if current_offset < boundary {
577                    return boundary;
578                }
579            }
580
581            self.mmap.len() as u32
582        }
583    }
584
585    impl<T> RelativePtr<T> {
586        /// Creates a new relative pointer based on the provided offset
587        ///
588        /// This initializes the pointer with the PhantomData ownership over the type we
589        /// are passing the the parameter
590        ///
591        /// ### Params:
592        /// @offset: The offset value to be wrapped in the pointer.
593        ///
594        /// ### Returns:
595        /// A instance of Self.
596        pub fn new(offset: u32) -> Self {
597            Self { offset, _marker: PhantomData }
598        }
599
600        /// Returns the offset value in the pointer
601        pub fn offset(&self) -> u32 {
602            self.offset
603        }
604
605        /// Resolves the header based on the base_ptr of the current caller process.
606        ///
607        /// This ensures that the pointer returned is actually mapped to the process local
608        /// memory scope
609        ///
610        /// ### Params:
611        /// @base_ptr: The offset from the start of the SHM segment.
612        ///
613        /// ### Returns:
614        /// A life time speficied reference to the header of this block
615        pub unsafe fn resolve_header<'a>(&self, base_ptr: *const u8) -> &'a BlockHeader {
616            unsafe { &*(base_ptr.add((self.offset as usize) - 32) as *mut BlockHeader) }
617        }
618
619        /// Resolves the header based on the base_ptr of the current caller process.
620        ///
621        /// This ensures that the pointer returned is actually mapped to the process local
622        /// memory scope
623        ///
624        /// ### Params:
625        /// @base_ptr: The offset from the start of the SHM segment.
626        ///
627        /// ### Returns:
628        /// A life time speficied mutable reference to the header of this block
629        pub unsafe fn resolve_header_mut<'a>(&self, base_ptr: *const u8) -> &'a mut BlockHeader {
630            unsafe { &mut *(base_ptr.add((self.offset as usize) - 32) as *mut BlockHeader) }
631        }
632
633        /// Resolves the block scope based on the base_ptr of the current caller process.
634        ///
635        /// This ensures that the pointer returned is actually mapped to the process local
636        /// memory scope
637        ///
638        /// ### Params:
639        /// @base_ptr: The offset from the start of the SHM segment.
640        ///
641        /// ### Returns:
642        /// A life time specified reference to the block scope.
643        pub unsafe fn resolve<'a>(&self, base_ptr: *const u8) -> &'a T {
644            unsafe { &*(base_ptr.add(self.offset as usize) as *mut T) }
645        }
646
647        /// Resolves the block scope based on the base_ptr of the current caller process.
648        ///
649        /// This ensures that the pointer returned is actually mapped to the process local
650        /// memory scope
651        ///
652        /// ### Params:
653        /// @base_ptr: The offset from the start of the SHM segment.
654        ///
655        /// ### Returns:
656        /// A life time specified mutable reference to the block scope.
657        pub unsafe fn resolve_mut<'a>(&self, base_ptr: *const u8) -> &'a mut T {
658            unsafe { &mut *(base_ptr.add(self.offset as usize) as *mut T) }
659        }
660
661        pub unsafe fn write(&self, base_ptr: *const u8, value: T) {
662            unsafe { std::ptr::write(self.resolve_mut(base_ptr), value) }
663        }
664    }
665
666    
667}
668
669#[cfg(test)]
670mod tests {
671    use crate::matrix::core::{ BlockHeader, RelativePtr };
672    use crate::handlers::HandlerFunctions;
673
674    use super::*;
675
676    /// Test if the mapping function can return the correct indexes.
677    #[test]
678    fn test_mapping() {
679        assert_eq!(helpers::Mapping::find_indices(16), (0, 1));
680        assert_eq!(helpers::Mapping::find_indices(64), (0, 4));
681        assert_eq!(helpers::Mapping::find_indices(128), (7, 0));
682
683        let (fl, sl) = helpers::Mapping::find_indices(1024);
684        assert_eq!(fl, 10);
685        assert_eq!(sl, 0);
686    }
687
688    /// Test if the bootstrap function actually initializes the matrix, and allocates the
689    /// blocks on the correct bitmaps.
690    #[test]
691    fn test_initial_bootstrap() {
692        let size = 1024 * 1024;
693        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
694
695        let bitmap = handler.matrix().fl_bitmap.load(Ordering::Acquire);
696
697        assert!(bitmap != 0, "FL bitmap should not be zero after bootstrap");
698    }
699
700    /// Test allocation and sppliting logic by comparing the size of our buffer.
701    #[test]
702    fn test_allocation_and_spliting() {
703        let size = 1024 * 1024;
704        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
705        let base_ptr = handler.base_ptr();
706        let matrix = handler.matrix();
707
708        unsafe {
709            let rel_ptr = matrix.allocate(base_ptr, 64).unwrap();
710
711            let header = rel_ptr.resolve_header(base_ptr);
712
713            assert_eq!(header.size.load(Ordering::Acquire), 64);
714            assert_eq!(header.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);
715        }
716    }
717
718    /// Test coalesce logic to see if blocks will merge correctly.
719    #[test]
720    fn test_ack_and_coalesce() {
721        let size = 1024 * 1024;
722        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
723        let base_ptr = handler.base_ptr();
724        let matrix = handler.matrix();
725
726        unsafe {
727            let ptr_a = matrix.allocate(base_ptr, 64).unwrap();
728            let ptr_b = matrix.allocate(base_ptr, 64).unwrap();
729            let ptr_c = matrix.allocate(base_ptr, 64).unwrap();
730            let ptr_d = matrix.allocate(base_ptr, 64).unwrap();
731            let ptr_e = matrix.allocate(base_ptr, 64).unwrap();
732
733            let h_b = ptr_b.resolve_header(base_ptr);
734            let rel_c = RelativePtr::<BlockHeader>::new(ptr_c.offset() - 32);
735            let rel_d = RelativePtr::<BlockHeader>::new(ptr_d.offset() - 32);
736
737            h_b.state.store(helpers::STATE_FREE, Ordering::Release);
738            matrix.ack(&rel_c, base_ptr);
739            matrix.ack(&rel_d, base_ptr);
740
741            matrix.coalesce(&rel_d, base_ptr);
742
743            let h_a = ptr_a.resolve_header(base_ptr);
744            assert_eq!(h_a.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);
745
746            let h_merged = ptr_b.resolve_header(base_ptr);
747            assert_eq!(h_merged.state.load(Ordering::Acquire), helpers::STATE_FREE);
748            assert_eq!(h_merged.size.load(Ordering::Acquire), 256);
749
750            let h_e = ptr_e.resolve_header(base_ptr);
751            assert_eq!(h_e.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);
752        }
753    }
754
755    /// ----------------------------------------------------------------------------
756    /// STRESS TESTS
757    ///
758    /// These are all ignored from the correctness suite so github doesn't get mad at
759    /// me. Before shipping, running these are explicitly required.
760    /// ----------------------------------------------------------------------------
761
762    /// Run 8.000.000 allocations in parallel (1.000.000 each) to test if the matrix
763    /// can hold without race conditions.
764    #[test]
765    #[ignore]
766    fn test_multithreaded_stress() {
767        // Quick author note:
768        //
769        // May God help us all at this moment.
770
771        use std::sync::{ Arc, Barrier };
772        use std::thread;
773        use std::collections::HashSet;
774
775        // We use 500MB matrix to allocate all the buffers
776        let size = 50 * 1024 * 1024;
777        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
778
779        let thread_count = 8;
780        let allocs_per_second = 100_000;
781        let barrier = Arc::new(Barrier::new(thread_count));
782
783        // Track the failed allocs
784        let fail_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
785
786        let mut handles = vec![];
787
788        // Fuck the matrix! GO GO GO
789        for _ in 0..thread_count {
790            let b = Arc::clone(&barrier);
791            let base_addr = handler.base_ptr() as usize;
792            let matrix_addr = handler.matrix() as *const core::AtomicMatrix as usize;
793            let fail_count_clone = Arc::clone(&fail_count);
794
795            handles.push(
796                thread::spawn(move || {
797                    let base_ptr = base_addr as *mut u8;
798                    let matrix = unsafe { &*(matrix_addr as *const core::AtomicMatrix) };
799                    let mut my_offsets = Vec::new();
800
801                    b.wait();
802
803                    for _ in 0..allocs_per_second {
804                        for _ in 0..10 {
805                            if let Ok(rel_ptr) = matrix.allocate(base_ptr, 64) {
806                                my_offsets.push(rel_ptr.offset());
807                                break;
808                            }
809                            fail_count_clone.fetch_add(1, Ordering::Relaxed);
810                            std::hint::spin_loop();
811                        }
812                    }
813
814                    my_offsets
815                })
816            );
817        }
818
819        // Collect everything we did and check
820        let mut all_offsets = Vec::new();
821        for h in handles {
822            all_offsets.extend(h.join().unwrap());
823        }
824
825        let total_obtained = all_offsets.len();
826        let unique_offsets: HashSet<_> = all_offsets.into_iter().collect();
827
828        // We allow for a 0.5% failure marging, as this stress test does not account for deallocations.
829        let success_percentage = ((thread_count * allocs_per_second) as f64) * 0.995;
830
831        // Assert we can obtain at least 99.5% of the expected allocations without collisions, which would
832        // indicate a potential race condition.
833        assert!(
834            total_obtained >= (success_percentage as usize),
835            "Total allocations should match expected count"
836        );
837        assert_eq!(
838            unique_offsets.len(),
839            total_obtained,
840            "RACE CONDITION DETECTED: Duplicate offsets found"
841        );
842        println!(
843            "Successfully allocated {} unique blocks across {} threads without collisions! {} allocations failed",
844            total_obtained,
845            thread_count,
846            fail_count.load(Ordering::Relaxed)
847        );
848    }
849
850    /// Test if the matrix can hold 10 minutes of 8 threads executing random alloc
851    /// and dealloc operations to ensure the Propagation Principle of Atomic
852    /// Coalescence works.
853    #[test]
854    #[ignore]
855    fn test_long_term_fragmentation_healing() {
856        // Quick author note:
857        //
858        // May god help your device at this moment
859
860        use std::sync::{ Arc, Barrier };
861        use std::thread;
862        use std::time::{ Instant, Duration };
863
864        const DURATION: u32 = 600;
865        const THREADS: u32 = 8;
866
867        let size = 50 * 1024 * 1024;
868        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
869        let handler_arc = Arc::new(handler);
870        let barrier = Arc::new(Barrier::new(THREADS as usize));
871        let start_time = Instant::now();
872        let duration = Duration::from_secs(DURATION as u64);
873
874        let mut handles = vec![];
875
876        for t_id in 0..THREADS {
877            let h = Arc::clone(&handler_arc);
878            let b = Arc::clone(&barrier);
879
880            handles.push(
881                thread::spawn(move || {
882                    let base_ptr = h.base_ptr() as *mut u8;
883                    let matrix = &h.matrix();
884                    let mut my_blocks = Vec::new();
885                    let mut rng = t_id + 1;
886                    let mut total_ops = 0u64;
887
888                    b.wait();
889
890                    while start_time.elapsed() < duration {
891                        rng = rng.wrapping_mul(1103515245).wrapping_add(12345);
892
893                        if rng % 10 < 7 && my_blocks.len() < 200 {
894                            let alloc_size = (rng % 512) + 32;
895                            if let Ok(ptr) = matrix.allocate(base_ptr, alloc_size as u32) {
896                                my_blocks.push(ptr);
897                            }
898                        } else if !my_blocks.is_empty() {
899                            let idx = (rng as usize) % my_blocks.len();
900                            let ptr = my_blocks.swap_remove(idx);
901
902                            let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - 32);
903                            matrix.ack(&header_ptr, base_ptr);
904
905                            if total_ops % 5 == 0 {
906                                matrix.coalesce(&header_ptr, base_ptr);
907                            }
908                        }
909                        total_ops += 1;
910                    }
911                    (my_blocks, total_ops)
912                })
913            );
914        }
915
916        let mut total_work = 0u64;
917        for h in handles {
918            let (remaining, thread_ops) = h.join().unwrap();
919            total_work += thread_ops;
920            let base_ptr = handler_arc.base_ptr() as *mut u8;
921
922            for ptr in remaining {
923                let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - 32);
924                handler_arc.matrix().ack(&header_ptr, base_ptr);
925                handler_arc.matrix().coalesce(&header_ptr, base_ptr);
926            }
927        }
928
929        let mut free_count = 0;
930        for fl in 0..32 {
931            for sl in 0..8 {
932                if handler_arc.matrix().matrix[fl][sl].load(Ordering::Acquire) != 0 {
933                    free_count += 1;
934                }
935            }
936        }
937
938        let entrophy_percentage = ((free_count as f64) / (total_work as f64)) * 100.0;
939        let mops = (total_work as f64) / (DURATION as f64) / 1_000_000.0;
940
941        println!("Endurance Results (Duration of {} seconds)", DURATION);
942        println!("Num of threads: {}", THREADS);
943        println!("Total operations: {}", total_work);
944        println!("Throughput: {:.2} Mop/s", (total_work as f64) / (DURATION as f64) / 1_000_000.0);
945        println!("Final free fragments: {}", free_count);
946        println!("Entrophy percentage: {}%", entrophy_percentage);
947
948        assert!(entrophy_percentage < 0.001, "Excessive Fragmentation");
949        assert!(mops > 1.0, "Throughput regression: {:.2} Mop/s", mops);
950    }
951}