atomic_matrix/matrix.rs
1//! # Atomic Matrix Core
2//!
3//! This module implements a high-velocity, lock-free memory arena designed for
4//! ultra-low latency IPC (Inter-Process Communication).
5//!
6//! # Theory of Operation: The Propagation Principle of Atomic Coalescence
7//! Unlike traditional allocators that use centralized mutexes or complex
8//! background garbage collection, the **AtomicMatrix** treats memory
9//! fragmentation as a fluid dynamics problem.
10//!
11//! 1. **Kinetic Healing:** Freeing a block triggers a "Ripple" ('coalesce')
12//! that propagates through the sector.
13//! 2. **Monotonicity:** Ripples only move backward (towards the sector origin)
14//! to prevent circular atomic dependencies and deadlocks.
15//! 3. **Permissive Concurrency:** If a thread encounters contention, it skips the
16//! block rather than blocking, relying on the high frequency of future operations
17//! to complete the healing.
18//!
19//! # Memory Topography
20//! The matrix is laid out linearly in a shared memory segment:
21//! ```text
22//! [ Init Guard (16b) ] [ AtomicMatrix Struct ] [ Padding ] [ Sector 0 ] [ Sector 1 ] ...
23//! ```
24//! Each **Sector** acts as a self-contained fault domain with its own boundary,
25//! preventing local fragmentation from "bleeding" into the entire matrix.
26//!
27//! # Safety & Atomicity
28//! All state transitions follow a strict 'STATE_FREE -> STATE_ALLOCATED ->
29//! STATE_ACKED -> STATE_COALESCING' lifecycle. Hardware-level memory fences
30//! (std::sync::atomic::fence) are utilized to ensure visibility across 16+ CPU
31//! cores without locking.
32
33use std::sync::atomic::{ AtomicU32, Ordering, fence };
34use std::marker::PhantomData;
35use std::fs::{ OpenOptions };
36use memmap2::MmapMut;
37use uuid::Uuid;
38
39pub mod helpers {
40 use super::*;
41
    /// System initialization flags
    ///
    /// These values live in the 16-byte init guard at the very start of the SHM
    /// segment; `bootstrap` advances them via CAS so that exactly one process
    /// formats the matrix while late-comers spin until it is ready.
    pub const SYS_UNINITIALIZED: u32 = 0; // zero-filled (fresh) segment
    pub const SYS_FORMATTING: u32 = 1; // one process is laying out the matrix
    pub const SYS_READY: u32 = 2; // formatting done; attachers may proceed

    /// Header lifecycle flags
    ///
    /// Intended lifecycle: FREE -> ALLOCATED -> ACKED -> COALESCING, after which
    /// a merged block is published as FREE again (see `ack` / `coalesce`).
    pub const STATE_FREE: u32 = 0; // linked (or linkable) in a free bucket
    pub const STATE_ALLOCATED: u32 = 1; // owned by a caller
    pub const STATE_ACKED: u32 = 2; // released by its owner, awaiting coalescing
    pub const STATE_COALESCING: u32 = 3; // currently being absorbed by a ripple

    // Bytes reserved in front of every payload: the 16-byte-aligned header
    // struct plus 16 bytes of padding.
    pub const HEADER_SPACE: u32 = std::mem::size_of::<core::BlockHeader>() as u32 + 16;
54
55 /// A helper struct that provided the O(1) calculations to find the coordinates of
56 /// a block that suits exactly the requested buffer size, or the next available one
57 /// that can fit the message as well.
58 pub struct Mapping;
59
60 impl Mapping {
61 /// Maps a block size to its corresponding (First-Level, Second-Level) indices.
62 ///
63 /// This function implements a two-level mapping strategy used for O(1) free-block
64 /// lookup, optimized for both high-velocity small allocations and logarithmic
65 /// scaling of large blocks.
66 ///
67 /// ### Mapping Logic:
68 /// - **Linear (Small):** For sizes < 128, it uses a fixed FL (0) and 16-byte SL
69 /// subdivisions. This minimizes fragmentation for tiny objects.
70 /// - **Logarithmic (Large):** For sizes >= 128, FL is the power of 2 (determined via
71 /// `leading_zeros`), and SL is a 3-bit subdivider of the range between 2^n and 2^(n+1).
72 ///
73 /// ### Mathematical Transformation:
74 /// - `FL = log2(size)`
75 /// - `SL = (size - 2^FL) / (2^(FL - 3))`
76 ///
77 /// ### Bounds:
78 /// Indices are clamped to `(31, 7)` to prevent overflow in the matrix bitmask.
79 ///
80 /// # Arguments
81 /// * `size` - The total byte size of the memory block.
82 ///
83 /// # Returns
84 /// A tuple of `(fl, sl)` indices.
85 pub fn find_indices(size: u32) -> (u32, u32) {
86 if size < 128 {
87 (0, (size / 16).min(7))
88 } else {
89 let fl = 31 - size.leading_zeros();
90 let sl = ((size >> (fl - 3)) & 0x7).min(7);
91 (fl.min(31), sl)
92 }
93 }
94 }
95}
96
97pub mod core {
98 use crate::matrix::helpers::HEADER_SPACE;
99
100 use super::*;
101
    /// Header structure that is written at the beginning of each block/sector
    ///
    /// The block is made entirely of atomic primitives to ensure safe reading
    /// and manipulation across participant modules in the matrix.
    #[derive(Debug)]
    #[repr(C, align(16))]
    pub struct BlockHeader {
        // Total span of this block in bytes, header included (`allocate` stores
        // `payload + HEADER_SPACE` here; `bootstrap` stores the whole region).
        pub size: AtomicU32,
        // Lifecycle flag: one of helpers::STATE_FREE / STATE_ALLOCATED /
        // STATE_ACKED / STATE_COALESCING.
        pub state: AtomicU32,
        // Offset of the physically preceding block; 0 means "no known
        // predecessor" and stops a backward coalescing ripple.
        pub prev_phys: AtomicU32,
        // Intrusive free-list link: offset of the next free block in the same
        // (fl, sl) bucket, 0-terminated.
        pub next_free: AtomicU32,
        // NOTE(review): never read or written anywhere in this file — presumably
        // reserved for a doubly-linked free list. Confirm before relying on it.
        pub prev_free: AtomicU32,
    }
115
    /// The structural core of the matrix.
    ///
    /// It is the non-blocking, SHM-backed memory arena, utilizing a segmented **TLSF
    /// (Two-Level Segregated Fit)** inspired mapping for O(1) allocation, paired with
    /// a custom **Kinetic Coalescing** logic.
    ///
    /// # Memory Layout
    /// The matrix is designed to be mapped directly into '/dev/shm'. It starts with
    /// a 16-byte 'init_guard' followed by the struct itself, and then the sectorized
    /// raw memory blocks.
    #[repr(C)]
    pub struct AtomicMatrix {
        // Identity of this matrix instance; doubles as the /dev/shm file name.
        pub id: Uuid,
        // One bit per first level believed to contain at least one free block.
        pub fl_bitmap: AtomicU32,
        // Per-row bitmap: one bit per second-level bucket believed non-empty.
        pub sl_bitmaps: [AtomicU32; 32],
        // 32x8 free-list heads: offset of the first free block of each (fl, sl)
        // bucket; 0 means "empty bucket".
        pub matrix: [[AtomicU32; 8]; 32],
        // NOTE(review): `MmapMut` owns process-local pointers, yet this struct
        // is itself written into the shared mapping (see `bootstrap`). Other
        // attaching processes must never touch this field — confirm the
        // intended ownership story.
        pub mmap: MmapMut,
        // Exclusive end offsets of up to four sectors; a 0 entry terminates the
        // list (see `sector_end_offset`).
        pub sector_boundaries: [AtomicU32; 4],
        // Total byte size of the SHM segment, stamped by `init`.
        pub total_size: u32,
    }
136
137 /// A Relative Pointer to the block memory address, relative to the start of the
138 /// matrix inside the process memory scope.
139 ///
140 /// This RelativePointer is used to calculate the accurate address of the block its
141 /// related to. Providing a way for independent process to localize the data inside
142 /// their own mappings of the SHM segment.
143 ///
144 /// It also receives a PhantomData to inform the compiler we safely own whatever
145 /// generic type the caller has passed to this pointer.
146 #[derive(Debug)]
147 pub struct RelativePtr<T> {
148 offset: u32,
149 _marker: PhantomData<T>,
150 }
151
152
153
154 impl AtomicMatrix {
155 /// Initialized the matrix struct and returns it.
156 ///
157 /// This function will initialize both TLSF level flags, the matrix map for free
158 /// blocks, assign all the require metadata and return the ready to use object
159 ///
160 /// ### Params
161 /// @ptr: The pointer to the beginning of the matrix segment \
162 /// @id: The ID of this matrix instance \
163 /// @size: The total size of the SHM allocation
164 ///
165 /// ### Returns
166 /// A static, lifetime specified, reference to the matrix struct.
167 fn init(ptr: *mut AtomicMatrix, id: Uuid, size: u32) -> &'static mut Self {
168 unsafe {
169 let matrix = &mut *ptr;
170 matrix.fl_bitmap.store(0, Ordering::Release);
171 for i in 0..32 {
172 matrix.sl_bitmaps[i].store(0, Ordering::Release);
173 for j in 0..8 {
174 matrix.matrix[i][j].store(0, Ordering::Release);
175 }
176 }
177 matrix.id = id;
178 matrix.total_size = size;
179 matrix
180 }
181 }
182
        /// The entry point of the matrix struct.
        ///
        /// It opens (or creates) the SHM segment, maps it, and then races the
        /// init guard: the single process that wins the guard CAS formats the
        /// matrix and publishes the initial free block; every other process
        /// spins until the guard reads `SYS_READY` and simply binds. Finally the
        /// high-level handler API is returned to the caller.
        ///
        /// ### Params:
        /// @id: The ID of a new or existing matrix. Whether formatting happens
        /// is decided by the init guard word, not by the presence of an id \
        /// @size: The SHM allocation size
        ///
        /// ### Returns
        /// The matrix handler api, or an error to be handled
        pub fn bootstrap(
            id: Option<Uuid>,
            size: usize
        ) -> Result<crate::handlers::MatrixHandler, String> {
            let path_id = id.unwrap_or_else(Uuid::new_v4);
            let path = format!("/dev/shm/{}", path_id);
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(&path)
                .map_err(|e| e.to_string())?;

            // set_len zero-fills a fresh file, so the guard word below starts
            // out as SYS_UNINITIALIZED for the first process to arrive.
            file.set_len(size as u64).map_err(|e| e.to_string())?;

            let mut mmap = unsafe { MmapMut::map_mut(&file).map_err(|e| e.to_string())? };
            let base_ptr = mmap.as_mut_ptr();

            // Guard occupies the first 16 bytes; the matrix struct follows it.
            let init_guard = unsafe { &*(base_ptr as *const AtomicU32) };
            let matrix_ptr = unsafe { base_ptr.add(16) as *mut AtomicMatrix };
            let mut current_offset: u32 = 0;

            // Exactly one process wins this CAS and formats the segment; losers
            // fall through to the spin-wait below.
            if
                init_guard
                    .compare_exchange(
                        helpers::SYS_UNINITIALIZED,
                        helpers::SYS_FORMATTING,
                        Ordering::SeqCst,
                        Ordering::Relaxed
                    )
                    .is_ok()
            {
                // NOTE(review): `size as u32` silently truncates segments of
                // 4 GiB or more — confirm an upper bound is enforced elsewhere.
                let matrix = AtomicMatrix::init(matrix_ptr, path_id, size as u32);

                // First usable block starts after guard + struct, rounded up to
                // the 16-byte grid.
                let matrix_size = std::mem::size_of::<AtomicMatrix>();
                current_offset = (16 + (matrix_size as u32)+ 15) & !15;
                let remaining_size = size - (current_offset as usize);

                let (fl, sl) = helpers::Mapping::find_indices(remaining_size as u32);

                let header: &mut BlockHeader;
                unsafe {
                    header = &mut *(base_ptr.add(current_offset as usize) as *mut BlockHeader);
                }

                // Seed the whole remaining region as one giant free block.
                // NOTE(review): `prev_free` is left untouched here (only matters
                // if that field is ever used).
                header.size.store(remaining_size as u32, Ordering::Release);
                header.state.store(helpers::STATE_FREE, Ordering::Release);
                header.prev_phys.store(0, Ordering::Release);
                header.next_free.store(0, Ordering::Release);

                matrix.insert_free_block(base_ptr, current_offset, fl, sl);
                matrix.sector_boundaries[0].store(size as u32, Ordering::Release);

                // Publish readiness last so attachers only see a fully formatted
                // matrix.
                init_guard.store(helpers::SYS_READY, Ordering::SeqCst);
            } else {
                // Attach path: wait for the formatter to finish.
                while init_guard.load(Ordering::Acquire) != helpers::SYS_READY {
                    std::hint::spin_loop();
                }
            }

            // NOTE(review): on the attach path `current_offset` is still 0, so
            // the handler receives a zero offset — confirm MatrixHandler treats
            // that as "unknown" rather than as a real block offset.
            Ok(crate::handlers::MatrixHandler::new(
                unsafe { &mut *matrix_ptr },
                mmap,
                current_offset
            ))
        }
262
263 /// Allocates a block in the matrix for the caller
264 ///
265 /// It acts as a greed allocator, ensuring each call will either get a block allocated
266 /// in the matrix, or it throws a OOM Contention flag. It achieves this by politely
267 /// trying to claim a block for itself. In case the CAS loop fails, it will simply jump
268 /// to the next free block on the chain, granting a lock-free allocation paradigm.
269 ///
270 /// Each allocation is allowed to retry itself 512 times to confirm the matrix is
271 /// indeed out of memory before killing the execution of the function.
272 ///
273 /// ### Params:
274 /// @base_ptr: The starting offset of the SHM mapping. \
275 /// @size: The allocation size of the block
276 ///
277 /// ### Returns:
278 /// Either the relative pointer to the allocated block, or the OOM Contention flag.
279 pub fn allocate(&self, base_ptr: *const u8, size: u32) -> Result<RelativePtr<u8>, String> {
280 let size = (size + 15) & !15;
281 let size = size.max(32);
282 let (fl, sl) = helpers::Mapping::find_indices(size);
283
284 for _ in 0..512 {
285 if let Some((f_fl, f_sl)) = self.find_suitable_block(fl, sl) {
286 if let Ok(offset) = self.remove_free_block(base_ptr, f_fl, f_sl) {
287 unsafe {
288 let header = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
289 let total_size = header.size.load(Ordering::Acquire);
290
291 if total_size >= size + helpers::HEADER_SPACE {
292 let rem_size = total_size - (size + helpers::HEADER_SPACE);
293 let next_off = offset + size + helpers::HEADER_SPACE;
294 let next_h = &mut *(
295 base_ptr.add(next_off as usize) as *mut BlockHeader
296 );
297
298 next_h.size.store(rem_size, Ordering::Release);
299 next_h.state.store(helpers::STATE_FREE, Ordering::Release);
300 next_h.prev_phys.store(offset, Ordering::Release);
301 next_h.next_free.store(0, Ordering::Release);
302
303 header.size.store(size + helpers::HEADER_SPACE, Ordering::Release);
304 fence(Ordering::SeqCst);
305
306 let (r_fl, r_sl) = helpers::Mapping::find_indices(rem_size);
307 self.insert_free_block(base_ptr, next_off, r_fl, r_sl);
308 }
309 header.state.store(helpers::STATE_ALLOCATED, Ordering::Release);
310 return Ok(RelativePtr::new(offset + helpers::HEADER_SPACE));
311 }
312 }
313 }
314 std::hint::spin_loop();
315 }
316 Err("OOM: Contention".into())
317 }
318
319 /// Acknowledges the freedon of a block and pushes it to the to_be_freed queue.
320 ///
321 /// If the to_be_freed queue is full, it will imediatelly trigger the drainage
322 /// of the queue and coalesce every block present before trying to push the
323 /// newly ack block into the queue. If there is space available, simply push
324 /// it and move on
325 ///
326 /// ### Params:
327 /// @ptr: The relative pointer of the block to acknowledge \
328 /// @base_ptr: The offset from the start of the SHM segment.
329 pub fn ack(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
330 unsafe {
331 let header = ptr.resolve_mut(base_ptr);
332
333 header.state.store(helpers::STATE_ACKED, Ordering::Release);
334 }
335
336 self.coalesce(ptr, base_ptr);
337 }
338
        /// Tries to merge neighbouring blocks to the left until the start of the
        /// matrix is reached or the neighbour block is not ACKED/FREE.
        ///
        /// This is the implementation of the Kinetic Coalescence process. It
        /// receives the initial block that will start the ripple, and traverses
        /// the matrix to the left (monotonicity guard). If any race conditions
        /// are met in the middle (another coalescing just started, or a module
        /// just claimed a block), it stops the coalescing and moves on
        /// (permissive healing).
        ///
        /// Then it tries to update the next neighbour's previous-physical-offset
        /// metadata to point at the start of the new free block. If that block
        /// lies beyond the sector end, the marking is skipped in the hope that a
        /// future ripple will passively repair it (horizon boundary).
        ///
        /// ### Params:
        /// @ptr: The relative pointer of the block to coalesce. \
        /// @base_ptr: The local address where the SHM segment is mapped.
        ///
        /// ### Throws:
        /// TidalRippleContentionError: Two coalescing ripples executing
        /// simultaneously on the same blocks.
        pub fn coalesce(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
            unsafe {
                let current_offset = ptr.offset();
                let mut current_header = ptr.resolve_mut(base_ptr);
                let mut total_size = current_header.size.load(Ordering::Acquire);
                // Degenerate or already-absorbed blocks (size swapped to 0 by a
                // competing ripple) are not worth starting a ripple from.
                if total_size < 32 {
                    return;
                }

                let mut final_offset = current_offset;

                // NOTE(review): `current_offset` never changes inside this loop,
                // so the guard is effectively constant — termination relies
                // entirely on the `break` arms below. Confirm that is intended.
                while current_offset > 16 {
                    let prev_phys_offset = current_header.prev_phys.load(Ordering::Acquire);

                    // 0 means "no known physical predecessor": ripple is done.
                    if prev_phys_offset == 0 {
                        break;
                    }

                    let prev_header_ptr = base_ptr.add(
                        prev_phys_offset as usize
                    ) as *mut BlockHeader;
                    let prev_header = &mut *prev_header_ptr;

                    // Claim the neighbour only if it is FREE or ACKED; anything
                    // else (allocated, or mid-coalesce) stops the ripple.
                    let res = prev_header.state.compare_exchange(
                        helpers::STATE_FREE,
                        helpers::STATE_COALESCING,
                        Ordering::Acquire,
                        Ordering::Relaxed
                    );

                    let claimed = if res.is_ok() {
                        true
                    } else {
                        prev_header.state
                            .compare_exchange(
                                helpers::STATE_ACKED,
                                helpers::STATE_COALESCING,
                                Ordering::Acquire,
                                Ordering::Relaxed
                            )
                            .is_ok()
                    };

                    if claimed {
                        // swap(0) both reads the neighbour's size and marks it
                        // as absorbed, so a competing ripple sees 0 and bails.
                        let size_to_add = prev_header.size.swap(0, Ordering::Acquire);
                        // Sanity guards: already-absorbed neighbour, or a
                        // prev_phys link that points forward (corrupt chain).
                        if size_to_add == 0 || prev_phys_offset >= current_offset {
                            break;
                        }

                        if size_to_add > self.total_size {
                            break;
                        }

                        let (fl, sl) = helpers::Mapping::find_indices(size_to_add);

                        total_size = total_size
                            .checked_add(size_to_add)
                            .expect("TidalRippleCoalescingError.");
                        final_offset = prev_phys_offset;
                        current_header = prev_header;
                        // NOTE(review): this pops the *head* of the neighbour's
                        // bucket, which is not necessarily the neighbour itself —
                        // an unrelated block can be unlinked while the absorbed
                        // one stays in its free list. Confirm acceptable.
                        self.remove_free_block(base_ptr, fl, sl).ok();
                    } else {
                        break;
                    }
                }

                // Repair the following neighbour's back-link so future ripples
                // can find the merged block — skipped past the sector end.
                let sector_limit = self.sector_end_offset(final_offset);
                if let Some(next_h_offset) = final_offset.checked_add(total_size) {
                    if next_h_offset < sector_limit {
                        let next_h = &*(base_ptr.add(next_h_offset as usize) as *const BlockHeader);
                        next_h.prev_phys.store(final_offset, Ordering::Release);
                    }
                }

                // Publish the merged block as FREE and re-link it.
                // NOTE(review): coalescing the same relative pointer twice adds
                // its stale (never-zeroed) size again — see the 448 assertion in
                // test_ack_and_coalesce. Confirm callers never double-ripple.
                current_header.size.store(total_size, Ordering::Release);
                current_header.state.store(helpers::STATE_FREE, Ordering::Release);

                let (fl, sl) = helpers::Mapping::find_indices(total_size);
                self.insert_free_block(base_ptr, final_offset, fl, sl);
            }
        }
445
446 /// Queries a block offset inside of the matrix.
447 ///
448 /// Not much to say about this, the name is pretty self explanatory.
449 ///
450 /// ### Params:
451 /// @offset: The offset of the block to be queried
452 ///
453 /// ### Returns:
454 /// The Relative Pointer to the queried block
455 pub fn query(&self, offset: u32) -> RelativePtr<u8> {
456 RelativePtr::new(offset + helpers::HEADER_SPACE)
457 }
458
459 /// Queries the TLSF bitmaps in search of a block.
460 ///
461 /// It acquires the first most suitable index flag (according to the find
462 /// _indices function) and does a bitwise operation to check if it possesses an
463 /// available block. If it matches, return the coordinates of the FL and the
464 /// CTZ result from the SL. If it doesn't match, performs CTZ on the first level
465 /// to return the first available coordinate.
466 ///
467 /// ### Params:
468 /// @fl: Calculated first level coordinate \
469 /// @sl: Calculated second level coordinate
470 ///
471 /// ### Returns:
472 /// A tuple containing the FL/SL coordinates or nothing if there is no space
473 /// available in the matrix.
474 pub fn find_suitable_block(&self, fl: u32, sl: u32) -> Option<(u32, u32)> {
475 let sl_map = self.sl_bitmaps[fl as usize].load(Ordering::Acquire);
476 let m_sl = sl_map & (!0u32 << sl);
477 if m_sl != 0 {
478 return Some((fl, m_sl.trailing_zeros()));
479 }
480
481 let fl_map = self.fl_bitmap.load(Ordering::Acquire);
482 let m_fl = fl_map & (!0u32 << (fl + 1));
483 if m_fl != 0 {
484 let f_fl = m_fl.trailing_zeros();
485 if f_fl < 32 {
486 let s_map = self.sl_bitmaps[f_fl as usize].load(Ordering::Acquire);
487 if s_map != 0 {
488 return Some((f_fl, s_map.trailing_zeros()));
489 }
490 }
491 }
492 None
493 }
494
        /// Pops a free block from the head of a TLSF bucket.
        ///
        /// Atomically swings the bucket head from the current block to its
        /// `next_free` successor via CAS. On CAS failure another participant won
        /// the head first, so the loop spins and retries. If the bucket ever
        /// reads as empty (head == 0), the function bails out with an error.
        ///
        /// ### Params:
        /// @base_ptr: The local address where the SHM segment is mapped \
        /// @fl: First level coordinates of the bucket \
        /// @sl: Second level coordinates of the head.
        ///
        /// ### Returns
        /// A result containing either the offset of the newly acquired block, or
        /// an EmptyBitmapError
        pub fn remove_free_block(&self, base_ptr: *const u8, fl: u32, sl: u32) -> Result<u32, String> {
            let head = &self.matrix[fl as usize][sl as usize];
            loop {
                let off = head.load(Ordering::Acquire);
                // Bucket drained (or never populated): report instead of spinning.
                if off == 0 {
                    return Err("EmptyBitmapError".into());
                }
                // Read the head's successor so it can become the new head.
                // NOTE(review): classic tag-less pop ABA hazard — between this
                // read and the CAS below, `off` may be popped and re-pushed with
                // a different successor, silently detaching part of the chain.
                // Confirm this is acceptable under "permissive concurrency".
                let next = unsafe {
                    (*(base_ptr.add(off as usize) as *const BlockHeader)).next_free.load(
                        Ordering::Acquire
                    )
                };
                if head.compare_exchange(off, next, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
                    if next == 0 {
                        // Last block in the bucket: clear its SL bit, then the FL
                        // bit once the whole row reads empty.
                        // NOTE(review): this clear races with concurrent inserts
                        // into the same bucket — a presence bit can be dropped
                        // while the list is non-empty, hiding blocks from future
                        // searches. Confirm intended.
                        self.sl_bitmaps[fl as usize].fetch_and(!(1 << sl), Ordering::Release);
                        if self.sl_bitmaps[fl as usize].load(Ordering::Acquire) == 0 {
                            self.fl_bitmap.fetch_and(!(1 << fl), Ordering::Release);
                        }
                    }
                    return Ok(off);
                }
                std::hint::spin_loop();
            }
        }
536
        /// Pushes a block onto the head of a TLSF bucket.
        ///
        /// The mirror image of `remove_free_block`: links the block's
        /// `next_free` to the current head, CASes the head to the new block
        /// (retrying on contention), then raises the FL/SL presence bits.
        ///
        /// ### Params:
        /// @base_ptr: The local address where the SHM segment is mapped \
        /// @offset: The header offset to be inserted into the bucket \
        /// @fl: The first level insertion coordinates \
        /// @sl: The second level insertion coordinates
        pub fn insert_free_block(&self, base_ptr: *const u8, offset: u32, fl: u32, sl: u32) {
            let head = &self.matrix[fl as usize][sl as usize];
            unsafe {
                let h = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
                loop {
                    // Standard lock-free push: point at the current head, then
                    // try to swing the head to this block.
                    let old = head.load(Ordering::Acquire);
                    h.next_free.store(old, Ordering::Release);
                    if
                        head
                            .compare_exchange(old, offset, Ordering::AcqRel, Ordering::Relaxed)
                            .is_ok()
                    {
                        break;
                    }
                    std::hint::spin_loop();
                }
            }
            // Presence bits are published after linking: a searcher may briefly
            // miss the fresh block, which only costs it a retry.
            self.fl_bitmap.fetch_or(1 << fl, Ordering::Release);
            self.sl_bitmaps[fl as usize].fetch_or(1 << sl, Ordering::Release);
        }
566
567 /// Returns the boundary of the current sector
568 ///
569 /// It queries the boundaries from the metadata and check wheter the block fits or
570 /// not inside this sector.
571 ///
572 /// ### Params:
573 /// @current_offset: The offset to check against the sector.
574 ///
575 /// ### Returns:
576 /// Either the boundary value of the current sector, or the end of the segment.
577 pub fn sector_end_offset(&self, current_offset: u32) -> u32 {
578 for i in 0..4 {
579 let boundary = self.sector_boundaries[i].load(Ordering::Acquire);
580 if boundary == 0 {
581 break;
582 }
583 if current_offset < boundary {
584 return boundary;
585 }
586 }
587
588 self.mmap.len() as u32
589 }
590 }
591
592 impl<T> RelativePtr<T> {
593 /// Creates a new relative pointer based on the provided offset
594 ///
595 /// This initializes the pointer with the PhantomData ownership over the type we
596 /// are passing the the parameter
597 ///
598 /// ### Params:
599 /// @offset: The offset value to be wrapped in the pointer.
600 ///
601 /// ### Returns:
602 /// A instance of Self.
603 pub fn new(offset: u32) -> Self {
604 Self { offset, _marker: PhantomData }
605 }
606
607 /// Returns the offset value in the pointer
608 pub fn offset(&self) -> u32 {
609 self.offset
610 }
611
612 /// Resolves the header based on the base_ptr of the current caller process.
613 ///
614 /// This ensures that the pointer returned is actually mapped to the process local
615 /// memory scope
616 ///
617 /// ### Params:
618 /// @base_ptr: The offset from the start of the SHM segment.
619 ///
620 /// ### Returns:
621 /// A life time speficied reference to the header of this block
622 pub unsafe fn resolve_header<'a>(&self, base_ptr: *const u8) -> &'a BlockHeader {
623 unsafe { &*(base_ptr.add((self.offset as usize) - helpers::HEADER_SPACE as usize) as *mut BlockHeader) }
624 }
625
626 /// Resolves the header based on the base_ptr of the current caller process.
627 ///
628 /// This ensures that the pointer returned is actually mapped to the process local
629 /// memory scope
630 ///
631 /// ### Params:
632 /// @base_ptr: The offset from the start of the SHM segment.
633 ///
634 /// ### Returns:
635 /// A life time speficied mutable reference to the header of this block
636 pub unsafe fn resolve_header_mut<'a>(&self, base_ptr: *const u8) -> &'a mut BlockHeader {
637 unsafe { &mut *(base_ptr.add((self.offset as usize) - helpers::HEADER_SPACE as usize) as *mut BlockHeader) }
638 }
639
640 /// Resolves the block scope based on the base_ptr of the current caller process.
641 ///
642 /// This ensures that the pointer returned is actually mapped to the process local
643 /// memory scope
644 ///
645 /// ### Params:
646 /// @base_ptr: The offset from the start of the SHM segment.
647 ///
648 /// ### Returns:
649 /// A life time specified reference to the block scope.
650 pub unsafe fn resolve<'a>(&self, base_ptr: *const u8) -> &'a T {
651 unsafe { &*(base_ptr.add(self.offset as usize) as *mut T) }
652 }
653
654 /// Resolves the block scope based on the base_ptr of the current caller process.
655 ///
656 /// This ensures that the pointer returned is actually mapped to the process local
657 /// memory scope
658 ///
659 /// ### Params:
660 /// @base_ptr: The offset from the start of the SHM segment.
661 ///
662 /// ### Returns:
663 /// A life time specified mutable reference to the block scope.
664 pub unsafe fn resolve_mut<'a>(&self, base_ptr: *const u8) -> &'a mut T {
665 unsafe { &mut *(base_ptr.add(self.offset as usize) as *mut T) }
666 }
667
668 pub unsafe fn write(&self, base_ptr: *const u8, value: T) {
669 unsafe { std::ptr::write(self.resolve_mut(base_ptr), value) }
670 }
671 }
672
673
674}
675
676#[cfg(test)]
677mod tests {
678 use crate::matrix::core::{ BlockHeader, RelativePtr };
679 use crate::handlers::HandlerFunctions;
680
681 use super::*;
682
683 /// Test if the mapping function can return the correct indexes.
684 #[test]
685 fn test_mapping() {
686 assert_eq!(helpers::Mapping::find_indices(16), (0, 1));
687 assert_eq!(helpers::Mapping::find_indices(64), (0, 4));
688 assert_eq!(helpers::Mapping::find_indices(128), (7, 0));
689
690 let (fl, sl) = helpers::Mapping::find_indices(1024);
691 assert_eq!(fl, 10);
692 assert_eq!(sl, 0);
693 }
694
695 /// Test if the bootstrap function actually initializes the matrix, and allocates the
696 /// blocks on the correct bitmaps.
697 #[test]
698 fn test_initial_bootstrap() {
699 let size = 1024 * 1024;
700 let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
701
702 let bitmap = handler.matrix().fl_bitmap.load(Ordering::Acquire);
703
704 assert!(bitmap != 0, "FL bitmap should not be zero after bootstrap");
705 }
706
707 /// Test allocation and sppliting logic by comparing the size of our buffer.
708 #[test]
709 fn test_allocation_and_spliting() {
710 let size = 1024 * 1024;
711 let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
712 let base_ptr = handler.base_ptr();
713 let matrix = handler.matrix();
714
715 unsafe {
716 let rel_ptr = matrix.allocate(base_ptr, 64).unwrap();
717
718 let header = rel_ptr.resolve_header(base_ptr);
719
720 assert_eq!(header.size.load(Ordering::Acquire), 64 + helpers::HEADER_SPACE);
721 assert_eq!(header.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);
722 }
723 }
724
    /// Test coalesce logic to see if blocks will merge correctly.
    ///
    /// Each 64-byte allocation occupies 64 + HEADER_SPACE = 112 bytes, so five
    /// allocations a..e sit physically back-to-back.
    #[test]
    fn test_ack_and_coalesce() {
        let size = 1024 * 1024;
        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
        let base_ptr = handler.base_ptr();
        let matrix = handler.matrix();

        unsafe {
            let ptr_a = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_b = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_c = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_d = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_e = matrix.allocate(base_ptr, 64).unwrap();

            // Header-addressed pointers for c and d (payload offset minus the
            // header overhead), as ack/coalesce expect.
            let h_b = ptr_b.resolve_header(base_ptr);
            let rel_c = RelativePtr::<BlockHeader>::new(ptr_c.offset() - helpers::HEADER_SPACE);
            let rel_d = RelativePtr::<BlockHeader>::new(ptr_d.offset() - helpers::HEADER_SPACE);

            // b is marked FREE by hand (simulating a released block), then c and
            // d are acked: the ripples fold c into b (224) and then d into b (336).
            h_b.state.store(helpers::STATE_FREE, Ordering::Release);
            matrix.ack(&rel_c, base_ptr);
            matrix.ack(&rel_d, base_ptr);

            // NOTE(review): this second ripple from d re-adds d's stale
            // (never-zeroed) 112-byte size on top of the 336-byte merge, which
            // is how the 448 below arises even though b..d physically span only
            // 336 bytes — the assertion documents current behavior, not a sound
            // invariant. Confirm double-coalescing is meant to be legal.
            matrix.coalesce(&rel_d, base_ptr);

            // a was never released and must survive untouched.
            let h_a = ptr_a.resolve_header(base_ptr);
            assert_eq!(h_a.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);

            // The merged region is published at b's header as one FREE block.
            let h_merged = ptr_b.resolve_header(base_ptr);
            assert_eq!(h_merged.state.load(Ordering::Acquire), helpers::STATE_FREE);
            assert_eq!(h_merged.size.load(Ordering::Acquire), 448);

            let h_e = ptr_e.resolve_header(base_ptr);
            assert_eq!(h_e.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);
        }
    }
761
762 /// ----------------------------------------------------------------------------
763 /// STRESS TESTS
764 ///
765 /// These are all ignored from the correctness suite so github doesn't get mad at
    /// me. Before shipping, running these is explicitly required.
767 /// ----------------------------------------------------------------------------
768
    /// Run 800,000 allocations in parallel (100,000 per thread across 8 threads)
    /// to test if the matrix can hold without race conditions.
    #[test]
    #[ignore]
    fn test_multithreaded_stress() {
        use std::sync::{ Arc, Barrier };
        use std::thread;
        use std::collections::HashSet;

        // 50 MiB arena backing all the buffers below.
        let size = 50 * 1024 * 1024;
        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();

        // NOTE(review): `allocs_per_second` is a misnomer — it is the number of
        // allocations per *thread*, with no time component.
        let thread_count = 8;
        let allocs_per_second = 100_000;
        let barrier = Arc::new(Barrier::new(thread_count));

        // Track the failed allocs
        let fail_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));

        let mut handles = vec![];

        // Spawn all worker threads; the barrier releases them simultaneously to
        // maximize contention.
        for _ in 0..thread_count {
            let b = Arc::clone(&barrier);
            // Addresses are smuggled across threads as usize because raw
            // pointers are not Send.
            let base_addr = handler.base_ptr() as usize;
            let matrix_addr = handler.matrix() as *const core::AtomicMatrix as usize;
            let fail_count_clone = Arc::clone(&fail_count);

            handles.push(
                thread::spawn(move || {
                    let base_ptr = base_addr as *mut u8;
                    let matrix = unsafe { &*(matrix_addr as *const core::AtomicMatrix) };
                    let mut my_offsets = Vec::new();

                    b.wait();

                    // Each allocation gets up to 10 attempts before being
                    // counted as failed; nothing is ever deallocated here.
                    for _ in 0..allocs_per_second {
                        for _ in 0..10 {
                            if let Ok(rel_ptr) = matrix.allocate(base_ptr, 64) {
                                my_offsets.push(rel_ptr.offset());
                                break;
                            }
                            fail_count_clone.fetch_add(1, Ordering::Relaxed);
                            std::hint::spin_loop();
                        }
                    }

                    my_offsets
                })
            );
        }

        // Collect everything we did and check
        let mut all_offsets = Vec::new();
        for h in handles {
            all_offsets.extend(h.join().unwrap());
        }

        let total_obtained = all_offsets.len();
        let unique_offsets: HashSet<_> = all_offsets.into_iter().collect();

        // We allow for a 0.5% failure margin, as this stress test does not account for deallocations.
        let success_percentage = ((thread_count * allocs_per_second) as f64) * 0.995;

        // Assert we can obtain at least 99.5% of the expected allocations without collisions, which would
        // indicate a potential race condition.
        assert!(
            total_obtained >= (success_percentage as usize),
            "Total allocations should match expected count"
        );
        assert_eq!(
            unique_offsets.len(),
            total_obtained,
            "RACE CONDITION DETECTED: Duplicate offsets found"
        );
        println!(
            "Successfully allocated {} unique blocks across {} threads without collisions! {} allocations failed",
            total_obtained,
            thread_count,
            fail_count.load(Ordering::Relaxed)
        );
    }
856
    /// Test if the matrix can hold 10 minutes of 8 threads executing random alloc
    /// and dealloc operations to ensure the Propagation Principle of Atomic
    /// Coalescence works.
    #[test]
    #[ignore]
    fn test_long_term_fragmentation_healing() {
        use std::sync::{ Arc, Barrier };
        use std::thread;
        use std::time::{ Instant, Duration };

        // Endurance parameters: 8 threads hammering the arena for 600 seconds.
        const DURATION: u32 = 600;
        const THREADS: u32 = 8;

        let size = 50 * 1024 * 1024;
        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
        let handler_arc = Arc::new(handler);
        let barrier = Arc::new(Barrier::new(THREADS as usize));
        let start_time = Instant::now();
        let duration = Duration::from_secs(DURATION as u64);

        let mut handles = vec![];

        for t_id in 0..THREADS {
            let h = Arc::clone(&handler_arc);
            let b = Arc::clone(&barrier);

            handles.push(
                thread::spawn(move || {
                    let base_ptr = h.base_ptr() as *mut u8;
                    let matrix = &h.matrix();
                    let mut my_blocks = Vec::new();
                    // Cheap per-thread LCG (glibc constants), seeded by thread id.
                    let mut rng = t_id + 1;
                    let mut total_ops = 0u64;

                    b.wait();

                    while start_time.elapsed() < duration {
                        rng = rng.wrapping_mul(1103515245).wrapping_add(12345);

                        // ~70% of iterations allocate (capped at 200 live blocks
                        // per thread); the rest release a random held block.
                        if rng % 10 < 7 && my_blocks.len() < 200 {
                            // May be 0; allocate clamps the payload to >= 32.
                            let alloc_size = (rng % 512);
                            if let Ok(ptr) = matrix.allocate(base_ptr, alloc_size as u32) {
                                my_blocks.push(ptr);
                            }
                        } else if !my_blocks.is_empty() {
                            let idx = (rng as usize) % my_blocks.len();
                            let ptr = my_blocks.swap_remove(idx);

                            // Rebuild a header-addressed pointer from the payload
                            // offset before acking.
                            let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - helpers::HEADER_SPACE);
                            matrix.ack(&header_ptr, base_ptr);

                            // Every fifth operation also fires an explicit
                            // ripple on the just-released block.
                            if total_ops % 5 == 0 {
                                matrix.coalesce(&header_ptr, base_ptr);
                            }
                        }
                        total_ops += 1;
                    }
                    (my_blocks, total_ops)
                })
            );
        }

        // Drain every thread's leftover blocks so the final arena state should
        // heal back towards a handful of large free fragments.
        let mut total_work = 0u64;
        for h in handles {
            let (remaining, thread_ops) = h.join().unwrap();
            total_work += thread_ops;
            let base_ptr = handler_arc.base_ptr() as *mut u8;

            for ptr in remaining {
                let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - helpers::HEADER_SPACE);
                handler_arc.matrix().ack(&header_ptr, base_ptr);
                handler_arc.matrix().coalesce(&header_ptr, base_ptr);
            }
        }

        // Count non-empty buckets; this approximates surviving fragmentation
        // (it counts buckets, not blocks — chains longer than one undercount).
        let mut free_count = 0;
        for fl in 0..32 {
            for sl in 0..8 {
                if handler_arc.matrix().matrix[fl][sl].load(Ordering::Acquire) != 0 {
                    free_count += 1;
                }
            }
        }

        // Fragmentation ratio: leftover fragments per operation performed.
        let entrophy_percentage = ((free_count as f64) / (total_work as f64)) * 100.0;
        let mops = (total_work as f64) / (DURATION as f64) / 1_000_000.0;

        println!("Endurance Results (Duration of {} seconds)", DURATION);
        println!("Num of threads: {}", THREADS);
        println!("Total operations: {}", total_work);
        println!("Throughput: {:.2} Mop/s", (total_work as f64) / (DURATION as f64) / 1_000_000.0);
        println!("Final free fragments: {}", free_count);
        println!("Entrophy percentage: {}%", entrophy_percentage);

        assert!(entrophy_percentage < 0.001, "Excessive Fragmentation");
        assert!(mops > 1.0, "Throughput regression: {:.2} Mop/s", mops);
    }
957 }
958}