atomic_matrix/matrix.rs
1//! # Atomic Matrix Core
2//!
3//! This module implements a high-velocity, lock-free memory arena designed for
4//! ultra-low latency IPC (Inter-Process Communication).
5//!
6//! # Theory of Operation: The Propagation Principle of Atomic Coalescence
7//! Unlike traditional allocators that use centralized mutexes or complex
8//! background garbage collection, the **AtomicMatrix** treats memory
9//! fragmentation as a fluid dynamics problem.
10//!
11//! 1. **Kinetic Healing:** Freeing a block triggers a "Ripple" ('coalesce')
12//! that propagates through the sector.
13//! 2. **Monotonicity:** Ripples only move backward (towards the sector origin)
14//! to prevent circular atomic dependencies and deadlocks.
15//! 3. **Permissive Concurrency:** If a thread encounters contention, it skips the
16//! block rather than blocking, relying on the high frequency of future operations
17//! to complete the healing.
18//!
19//! # Memory Topography
20//! The matrix is laid out linearly in a shared memory segment:
21//! ```text
22//! [ Init Guard (16b) ] [ AtomicMatrix Struct ] [ Padding ] [ Sector 0 ] [ Sector 1 ] ...
23//! ```
24//! Each **Sector** acts as a self-contained fault domain with its own boundary,
25//! preventing local fragmentation from "bleeding" into the entire matrix.
26//!
27//! # Safety & Atomicity
28//! All state transitions follow a strict 'STATE_FREE -> STATE_ALLOCATED ->
29//! STATE_ACKED -> STATE_COALESCING' lifecycle. Hardware-level memory fences
30//! (std::sync::atomic::fence) are utilized to ensure visibility across 16+ CPU
31//! cores without locking.
32
33use std::sync::atomic::{ AtomicU32, Ordering, fence };
34use std::marker::PhantomData;
35use std::fs::OpenOptions;
36use memmap2::MmapMut;
37use uuid::Uuid;
38
pub mod helpers {
    /// Lifecycle flags for the 16-byte init guard at the start of the segment.
    pub const SYS_UNINITIALIZED: u32 = 0;
    pub const SYS_FORMATTING: u32 = 1;
    pub const SYS_READY: u32 = 2;

    /// Lifecycle flags for a block header's `state` field.
    pub const STATE_FREE: u32 = 0;
    pub const STATE_ALLOCATED: u32 = 1;
    pub const STATE_ACKED: u32 = 2;
    pub const STATE_COALESCING: u32 = 3;

    /// Namespace for the O(1) size-to-bucket coordinate computations used by
    /// the two-level (TLSF-style) free-block index.
    pub struct Mapping;

    impl Mapping {
        /// Translates a block size into its (first-level, second-level)
        /// bucket coordinates.
        ///
        /// Small sizes (< 128 bytes) all live in first level 0, subdivided
        /// into 16-byte second-level slots. Larger sizes use the position of
        /// the most significant bit as the first level, and the three bits
        /// just below it as the second level, giving logarithmic scaling with
        /// eight subdivisions per power of two:
        ///
        /// - `FL = log2(size)`
        /// - `SL = (size - 2^FL) / (2^(FL - 3))`
        ///
        /// Results never exceed `(31, 7)`, keeping them valid indices for the
        /// 32x8 bucket matrix.
        ///
        /// # Arguments
        /// * `size` - The total byte size of the memory block.
        ///
        /// # Returns
        /// The `(fl, sl)` coordinate pair.
        pub fn find_indices(size: u32) -> (u32, u32) {
            if size < 128 {
                // Linear regime: fixed FL, one slot per 16 bytes, capped at 7.
                return (0, (size / 16).min(7));
            }
            // Logarithmic regime: FL is the index of the highest set bit,
            // SL is the 3-bit subdivision directly below it.
            let msb = 31 - size.leading_zeros();
            let subdivision = (size >> (msb - 3)) & 0x7;
            (msb.min(31), subdivision.min(7))
        }
    }
}
92
93pub mod core {
94 use super::*;
95
    /// Header written at the beginning of each block/sector.
    ///
    /// Every field is an atomic primitive so that any process mapping the
    /// shared segment can read and update the header without locks. The
    /// struct is 16-byte aligned; the allocator addresses user payload at
    /// header offset + 32 (see `allocate`/`query`), which covers the padded
    /// header footprint.
    #[derive(Debug)]
    #[repr(C, align(16))]
    pub struct BlockHeader {
        // Total byte size of the block (written on split and coalesce).
        pub size: AtomicU32,
        // One of the helpers::STATE_* lifecycle values.
        pub state: AtomicU32,
        // Offset of the physically preceding block (0 = none known).
        pub prev_phys: AtomicU32,
        // Offset of the next block in the free-list bucket chain (0 = end).
        pub next_free: AtomicU32,
        // NOTE(review): prev_free is never read or written in this file —
        // presumably reserved for a doubly linked free list; confirm usage.
        pub prev_free: AtomicU32,
    }
109
    /// The structural core of the matrix.
    ///
    /// A non-blocking, SHM-backed memory arena, utilizing a segmented TLSF
    /// (Two-Level Segregated Fit) inspired mapping for O(1) allocation,
    /// paired with the custom "Kinetic Coalescing" logic in `coalesce`.
    ///
    /// # Memory Layout
    /// The matrix is designed to be mapped directly into /dev/shm: a 16-byte
    /// init guard, then this struct, then the sectorized raw memory blocks
    /// (see `bootstrap`).
    ///
    /// NOTE(review): because this struct lives inside the shared mapping, the
    /// non-atomic fields (`id`, `mmap`, `total_size`) are only written by the
    /// process that formats the segment (`init`), and `mmap` — which owns
    /// process-local resources — is never initialized there. Confirm that
    /// binding processes never rely on it (`sector_end_offset` reads it).
    #[repr(C)]
    pub struct AtomicMatrix {
        // Identifier of this matrix instance (also its /dev/shm file name).
        pub id: Uuid,
        // First-level bitmap: bit n set => sl_bitmaps[n] has a non-empty bucket.
        pub fl_bitmap: AtomicU32,
        // Second-level bitmaps: bit m of entry n => matrix[n][m] has a free head.
        pub sl_bitmaps: [AtomicU32; 32],
        // Free-list heads (block offsets, 0 = empty) for each (fl, sl) bucket.
        pub matrix: [[AtomicU32; 8]; 32],
        // The shared-memory mapping backing the arena.
        pub mmap: MmapMut,
        // Exclusive end offsets of up to four sectors (a 0 entry ends the list).
        pub sector_boundaries: [AtomicU32; 4],
        // Total byte size of the SHM segment, set at format time.
        pub total_size: u32,
    }
130
    /// A pointer to block memory expressed as an offset relative to the start
    /// of the matrix mapping inside the current process.
    ///
    /// Because every participating process maps the SHM segment at a
    /// different base address, offsets — not absolute pointers — are the only
    /// portable way to reference a block across processes. The `resolve*`
    /// methods turn the offset back into a local reference given the caller's
    /// own base pointer.
    ///
    /// `PhantomData<T>` records the pointee type so the resolve methods can
    /// return typed references without actually storing a `T`.
    pub struct RelativePtr<T> {
        // Byte offset from the mapping base (payload offsets are header + 32).
        offset: u32,
        _marker: PhantomData<T>,
    }
144
145
146
147 impl AtomicMatrix {
        /// Formats the matrix struct in place and returns a reference to it.
        ///
        /// Clears the first-level bitmap, every second-level bitmap and every
        /// free-list head, then stamps the identity metadata.
        ///
        /// NOTE(review): `mmap` and `sector_boundaries` are left untouched
        /// here (`bootstrap` stores boundary 0 afterwards). Forming
        /// `&mut *ptr` over memory whose non-POD fields (`MmapMut`) were
        /// never initialized is questionable under strict Rust aliasing /
        /// validity rules — confirm with an unsafe review or Miri.
        ///
        /// ### Params
        /// @ptr: The pointer to the beginning of the matrix segment \
        /// @id: The ID of this matrix instance \
        /// @size: The total size of the SHM allocation
        ///
        /// ### Returns
        /// A static, lifetime specified, reference to the matrix struct.
        fn init(ptr: *mut AtomicMatrix, id: Uuid, size: u32) -> &'static mut Self {
            unsafe {
                let matrix = &mut *ptr;
                // Zero the whole two-level index before publishing anything.
                matrix.fl_bitmap.store(0, Ordering::Release);
                for i in 0..32 {
                    matrix.sl_bitmaps[i].store(0, Ordering::Release);
                    for j in 0..8 {
                        matrix.matrix[i][j].store(0, Ordering::Release);
                    }
                }
                matrix.id = id;
                matrix.total_size = size;
                matrix
            }
        }
175
        /// The entry point of the matrix struct.
        ///
        /// Creates (or reopens) the SHM segment, maps it, runs the one-time
        /// formatting if this process wins the init-guard CAS, and returns
        /// the high-level handler API to the caller. Losers of the CAS spin
        /// until the winner publishes SYS_READY, then bind to the already
        /// formatted matrix.
        ///
        /// NOTE(review): when binding to an existing matrix, `current_offset`
        /// is handed to the handler as 0 — confirm the handler tolerates
        /// that. Also, the initial free block is placed at
        /// `(16 + size_of::<AtomicMatrix>() + 15) & !15` but is sized
        /// `size - size_of::<AtomicMatrix>()`, so it extends past the end of
        /// the mapping by the 16-byte guard plus alignment padding — the last
        /// bytes of that block are not backed by the file.
        ///
        /// ### Params:
        /// @id: The ID of a new or existing matrix (if existing, will skip
        /// formatting and just bind to it) \
        /// @size: The SHM allocation size
        ///
        /// ### Returns
        /// The matrix handler api, or an error to be handled
        pub fn bootstrap(
            id: Option<Uuid>,
            size: usize
        ) -> Result<crate::handlers::MatrixHandler, String> {
            let path_id = id.unwrap_or_else(Uuid::new_v4);
            let path = format!("/dev/shm/{}", path_id);
            // Reopens an existing segment without truncating it, so a second
            // bootstrap with the same id binds instead of reformatting.
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(&path)
                .map_err(|e| e.to_string())?;

            file.set_len(size as u64).map_err(|e| e.to_string())?;

            let mut mmap = unsafe { MmapMut::map_mut(&file).map_err(|e| e.to_string())? };
            let base_ptr = mmap.as_mut_ptr();

            // The first 16 bytes of the segment act as the init guard; the
            // matrix struct itself starts right after them.
            let init_guard = unsafe { &*(base_ptr as *const AtomicU32) };
            let matrix_ptr = unsafe { base_ptr.add(16) as *mut AtomicMatrix };
            let mut current_offset: u32 = 0;

            // Exactly one process transitions UNINITIALIZED -> FORMATTING and
            // performs the format; everyone else waits for SYS_READY below.
            if
                init_guard
                    .compare_exchange(
                        helpers::SYS_UNINITIALIZED,
                        helpers::SYS_FORMATTING,
                        Ordering::SeqCst,
                        Ordering::Relaxed
                    )
                    .is_ok()
            {
                let matrix = AtomicMatrix::init(matrix_ptr, path_id, size as u32);

                let matrix_size = std::mem::size_of::<AtomicMatrix>();
                let remaining_size = size - matrix_size;
                // First block starts after guard + struct, rounded up to 16.
                current_offset = (16 + (matrix_size as u32)+ 15) & !15;

                let (fl, sl) = helpers::Mapping::find_indices(remaining_size as u32);

                let header: &mut BlockHeader;
                unsafe {
                    header = &mut *(base_ptr.add(current_offset as usize) as *mut BlockHeader);
                }

                // Publish the whole remaining arena as one big free block.
                header.size.store(remaining_size as u32, Ordering::Release);
                header.state.store(helpers::STATE_FREE, Ordering::Release);
                header.prev_phys.store(0, Ordering::Release);
                header.next_free.store(0, Ordering::Release);

                matrix.insert_free_block(base_ptr, current_offset, fl, sl);
                matrix.sector_boundaries[0].store(size as u32, Ordering::Release);

                init_guard.store(helpers::SYS_READY, Ordering::SeqCst);
            } else {
                // Lost the race: spin until the formatter publishes readiness.
                while init_guard.load(Ordering::Acquire) != helpers::SYS_READY {
                    std::hint::spin_loop();
                }
            }

            Ok(crate::handlers::MatrixHandler::new(
                unsafe { &mut *matrix_ptr },
                mmap,
                current_offset
            ))
        }
255
        /// Allocates a block in the matrix for the caller.
        ///
        /// Rounds the request up to a 16-byte multiple (minimum 32), locates
        /// a suitable TLSF bucket, and pops its head. If the popped block is
        /// large enough to leave a usable remainder (at least request + 48
        /// bytes total), it is split and the tail is re-inserted as a new
        /// free block. On contention the whole lookup is retried; after 512
        /// failed rounds the call reports OOM contention rather than
        /// blocking.
        ///
        /// NOTE(review): the stored `size` is the rounded request and the
        /// payload starts at offset + 32, so a request of N bytes appears to
        /// yield only N - 32 usable payload bytes — confirm callers account
        /// for the header footprint. Also, blocks in the *same* bucket as the
        /// request can be slightly smaller than the request (classic TLSF
        /// rounds up to the next bucket to avoid this) — verify.
        ///
        /// ### Params:
        /// @base_ptr: The local address of the start of the SHM mapping. \
        /// @size: The allocation size of the block
        ///
        /// ### Returns:
        /// Either the relative pointer to the allocated payload, or the OOM
        /// Contention flag.
        pub fn allocate(&self, base_ptr: *const u8, size: u32) -> Result<RelativePtr<u8>, String> {
            // Normalize the request: 16-byte granularity, 32-byte minimum.
            let size = (size + 15) & !15;
            let size = size.max(32);
            let (fl, sl) = helpers::Mapping::find_indices(size);

            for _ in 0..512 {
                if let Some((f_fl, f_sl)) = self.find_suitable_block(fl, sl) {
                    if let Ok(offset) = self.remove_free_block(base_ptr, f_fl, f_sl) {
                        unsafe {
                            let header = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
                            let total_size = header.size.load(Ordering::Acquire);

                            // Split only when the tail can hold a 32-byte
                            // header plus a minimal 16-byte payload.
                            if total_size >= size + 32 + 16 {
                                let rem_size = total_size - size;
                                let next_off = offset + size;
                                let next_h = &mut *(
                                    base_ptr.add(next_off as usize) as *mut BlockHeader
                                );

                                next_h.size.store(rem_size, Ordering::Release);
                                next_h.state.store(helpers::STATE_FREE, Ordering::Release);
                                next_h.prev_phys.store(offset, Ordering::Release);
                                next_h.next_free.store(0, Ordering::Release);

                                header.size.store(size, Ordering::Release);
                                // Make the split visible before advertising
                                // the tail in the free lists.
                                fence(Ordering::SeqCst);

                                let (r_fl, r_sl) = helpers::Mapping::find_indices(rem_size);
                                self.insert_free_block(base_ptr, next_off, r_fl, r_sl);
                            }
                            header.state.store(helpers::STATE_ALLOCATED, Ordering::Release);
                            // Payload lives past the 32-byte padded header.
                            return Ok(RelativePtr::new(offset + 32));
                        }
                    }
                }
                std::hint::spin_loop();
            }
            Err("OOM: Contention".into())
        }
311
312 /// Acknowledges the freedon of a block and pushes it to the to_be_freed queue.
313 ///
314 /// If the to_be_freed queue is full, it will imediatelly trigger the drainage
315 /// of the queue and coalesce every block present before trying to push the
316 /// newly ack block into the queue. If there is space available, simply push
317 /// it and move on
318 ///
319 /// ### Params:
320 /// @ptr: The relative pointer of the block to acknowledge \
321 /// @base_ptr: The offset from the start of the SHM segment.
322 pub fn ack(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
323 unsafe {
324 let header = ptr.resolve_mut(base_ptr);
325
326 header.state.store(helpers::STATE_ACKED, Ordering::Release);
327 }
328
329 self.coalesce(ptr, base_ptr);
330 }
331
        /// Tries to merge neighbouring blocks to the left until the origin is
        /// reached or the neighbour block is not ACKED/FREE.
        ///
        /// This implements the Kinetic Coalescence process: starting from the
        /// given block, it walks backwards via `prev_phys` (monotonicity
        /// guard), claiming each neighbour with a FREE/ACKED -> COALESCING
        /// CAS. Any contention (a concurrent ripple or a fresh claim) stops
        /// the walk rather than blocking (permissive healing). Finally, the
        /// next physical neighbour's `prev_phys` is repaired when it lies
        /// inside the sector; if not, the fix is deferred to a future ripple
        /// (horizon boundary).
        ///
        /// NOTE(review):
        /// - the `while current_offset > 16` condition is loop-invariant
        ///   (`current_offset` never changes); termination relies solely on
        ///   the `break`s — presumably `prev_phys_offset` was intended here.
        /// - `remove_free_block(base_ptr, fl, sl)` pops the *head* of the
        ///   bucket the absorbed block maps to, which is not necessarily the
        ///   absorbed block itself — confirm this cannot strand a stale entry
        ///   pointing into merged memory.
        /// - on the early `break`s after a successful claim, the neighbour is
        ///   left in STATE_COALESCING with size 0 and is never re-published.
        ///
        /// ### Params:
        /// @ptr: The relative pointer of the block header to coalesce. \
        /// @base_ptr: The local address of the start of the SHM segment.
        pub fn coalesce(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
            unsafe {
                let current_offset = ptr.offset();
                let mut current_header = ptr.resolve_mut(base_ptr);
                let mut total_size = current_header.size.load(Ordering::Acquire);
                // A size below the header footprint means a stale/empty
                // header; nothing to do.
                if total_size < 32 {
                    return;
                }

                let mut final_offset = current_offset;

                while current_offset > 16 {
                    let prev_phys_offset = current_header.prev_phys.load(Ordering::Acquire);

                    // 0 means "no known physical predecessor".
                    if prev_phys_offset == 0 {
                        break;
                    }

                    let prev_header_ptr = base_ptr.add(
                        prev_phys_offset as usize
                    ) as *mut BlockHeader;
                    let prev_header = &mut *prev_header_ptr;

                    // Claim the neighbour: first try FREE -> COALESCING ...
                    let res = prev_header.state.compare_exchange(
                        helpers::STATE_FREE,
                        helpers::STATE_COALESCING,
                        Ordering::Acquire,
                        Ordering::Relaxed
                    );

                    // ... and fall back to ACKED -> COALESCING.
                    let claimed = if res.is_ok() {
                        true
                    } else {
                        prev_header.state
                            .compare_exchange(
                                helpers::STATE_ACKED,
                                helpers::STATE_COALESCING,
                                Ordering::Acquire,
                                Ordering::Relaxed
                            )
                            .is_ok()
                    };

                    if claimed {
                        // Take exclusive ownership of the neighbour's size;
                        // a 0 result means another ripple got here first.
                        let size_to_add = prev_header.size.swap(0, Ordering::Acquire);
                        if size_to_add == 0 || prev_phys_offset >= current_offset {
                            break;
                        }

                        // Corrupt/implausible size: abort the ripple.
                        if size_to_add > self.total_size {
                            break;
                        }

                        let (fl, sl) = helpers::Mapping::find_indices(size_to_add);

                        total_size = total_size
                            .checked_add(size_to_add)
                            .expect("TidalRippleCoalescingError.");
                        final_offset = prev_phys_offset;
                        current_header = prev_header;
                        // Best-effort removal from the free lists (see NOTE).
                        self.remove_free_block(base_ptr, fl, sl).ok();
                    } else {
                        break;
                    }
                }

                // Repair the right neighbour's back-link when it is still
                // inside this sector; otherwise defer (horizon boundary).
                let sector_limit = self.sector_end_offset(final_offset);
                if let Some(next_h_offset) = final_offset.checked_add(total_size) {
                    if next_h_offset < sector_limit {
                        let next_h = &*(base_ptr.add(next_h_offset as usize) as *const BlockHeader);
                        next_h.prev_phys.store(final_offset, Ordering::Release);
                    }
                }

                // Publish the merged block and re-advertise it.
                current_header.size.store(total_size, Ordering::Release);
                current_header.state.store(helpers::STATE_FREE, Ordering::Release);

                let (fl, sl) = helpers::Mapping::find_indices(total_size);
                self.insert_free_block(base_ptr, final_offset, fl, sl);
            }
        }
438
439 /// Queries a block offset inside of the matrix.
440 ///
441 /// Not much to say about this, the name is pretty self explanatory.
442 ///
443 /// ### Params:
444 /// @offset: The offset of the block to be queried
445 ///
446 /// ### Returns:
447 /// The Relative Pointer to the queried block
448 pub fn query(&self, offset: u32) -> RelativePtr<u8> {
449 RelativePtr::new(offset + 32)
450 }
451
452 /// Queries the TLSF bitmaps in search of a block.
453 ///
454 /// It acquires the first most suitable index flag (according to the find
455 /// _indices function) and does a bitwise operation to check if it possesses an
456 /// available block. If it matches, return the coordinates of the FL and the
457 /// CTZ result from the SL. If it doesn't match, performs CTZ on the first level
458 /// to return the first available coordinate.
459 ///
460 /// ### Params:
461 /// @fl: Calculated first level coordinate \
462 /// @sl: Calculated second level coordinate
463 ///
464 /// ### Returns:
465 /// A tuple containing the FL/SL coordinates or nothing if there is no space
466 /// available in the matrix.
467 pub fn find_suitable_block(&self, fl: u32, sl: u32) -> Option<(u32, u32)> {
468 let sl_map = self.sl_bitmaps[fl as usize].load(Ordering::Acquire);
469 let m_sl = sl_map & (!0u32 << sl);
470 if m_sl != 0 {
471 return Some((fl, m_sl.trailing_zeros()));
472 }
473
474 let fl_map = self.fl_bitmap.load(Ordering::Acquire);
475 let m_fl = fl_map & (!0u32 << (fl + 1));
476 if m_fl != 0 {
477 let f_fl = m_fl.trailing_zeros();
478 if f_fl < 32 {
479 let s_map = self.sl_bitmaps[f_fl as usize].load(Ordering::Acquire);
480 if s_map != 0 {
481 return Some((f_fl, s_map.trailing_zeros()));
482 }
483 }
484 }
485 None
486 }
487
        /// Pops the head block from a TLSF bucket's free list.
        ///
        /// Classic lock-free list pop: load the head, load its `next_free`,
        /// then CAS the head forward. A CAS failure means another thread won
        /// the pop, so it spins and retries; an empty bucket (head == 0)
        /// returns an error. When the bucket drains, its SL bit is cleared,
        /// and the FL bit too once the whole SL map reads empty.
        ///
        /// NOTE(review): this pop is ABA-prone — if the head block is removed
        /// and re-inserted between the head load and the CAS, the CAS can
        /// succeed with a stale `next` value. The SL/FL bit clearing also
        /// races with concurrent inserts (a bit may be cleared just after
        /// another thread pushed); `allocate`'s 512-round retry loop appears
        /// to be the mitigation — confirm.
        ///
        /// ### Params:
        /// @base_ptr: The local address of the start of the SHM segment \
        /// @fl: First level coordinates of the bucket \
        /// @sl: Second level coordinates of the head.
        ///
        /// ### Returns
        /// A result containing either the offset of the popped block, or an
        /// EmptyBitmapError
        pub fn remove_free_block(&self, base_ptr: *const u8, fl: u32, sl: u32) -> Result<u32, String> {
            let head = &self.matrix[fl as usize][sl as usize];
            loop {
                let off = head.load(Ordering::Acquire);
                if off == 0 {
                    return Err("EmptyBitmapError".into());
                }
                // Read the successor out of the candidate head's header.
                let next = unsafe {
                    (*(base_ptr.add(off as usize) as *const BlockHeader)).next_free.load(
                        Ordering::Acquire
                    )
                };
                if head.compare_exchange(off, next, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
                    // Bucket drained: retire its bits from the two-level index.
                    if next == 0 {
                        self.sl_bitmaps[fl as usize].fetch_and(!(1 << sl), Ordering::Release);
                        if self.sl_bitmaps[fl as usize].load(Ordering::Acquire) == 0 {
                            self.fl_bitmap.fetch_and(!(1 << fl), Ordering::Release);
                        }
                    }
                    return Ok(off);
                }
                std::hint::spin_loop();
            }
        }
529
        /// Pushes a block onto the head of a TLSF bucket's free list.
        ///
        /// The mirror image of `remove_free_block`: link the block's
        /// `next_free` to the current head, then CAS the head to the new
        /// block, spinning on contention. The SL and FL bitmap bits are set
        /// *after* the list insert, so a reader can momentarily see a
        /// non-empty list with an unset bit — NOTE(review): presumably benign
        /// (the block is simply invisible until the bits land); confirm.
        ///
        /// ### Params:
        /// @base_ptr: The local address of the start of the SHM segment \
        /// @offset: The header offset to be inserted into the bucket \
        /// @fl: The first level insertion coordinates \
        /// @sl: The second level insertion coordinates
        pub fn insert_free_block(&self, base_ptr: *const u8, offset: u32, fl: u32, sl: u32) {
            let head = &self.matrix[fl as usize][sl as usize];
            unsafe {
                let h = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
                loop {
                    let old = head.load(Ordering::Acquire);
                    // Link before publish, so the chain is intact the moment
                    // the CAS lands.
                    h.next_free.store(old, Ordering::Release);
                    if
                        head
                            .compare_exchange(old, offset, Ordering::AcqRel, Ordering::Relaxed)
                            .is_ok()
                    {
                        break;
                    }
                    std::hint::spin_loop();
                }
            }
            self.fl_bitmap.fetch_or(1 << fl, Ordering::Release);
            self.sl_bitmaps[fl as usize].fetch_or(1 << sl, Ordering::Release);
        }
559
560 /// Returns the boundary of the current sector
561 ///
562 /// It queries the boundaries from the metadata and check wheter the block fits or
563 /// not inside this sector.
564 ///
565 /// ### Params:
566 /// @current_offset: The offset to check against the sector.
567 ///
568 /// ### Returns:
569 /// Either the boundary value of the current sector, or the end of the segment.
570 pub fn sector_end_offset(&self, current_offset: u32) -> u32 {
571 for i in 0..4 {
572 let boundary = self.sector_boundaries[i].load(Ordering::Acquire);
573 if boundary == 0 {
574 break;
575 }
576 if current_offset < boundary {
577 return boundary;
578 }
579 }
580
581 self.mmap.len() as u32
582 }
583 }
584
585 impl<T> RelativePtr<T> {
586 /// Creates a new relative pointer based on the provided offset
587 ///
588 /// This initializes the pointer with the PhantomData ownership over the type we
589 /// are passing the the parameter
590 ///
591 /// ### Params:
592 /// @offset: The offset value to be wrapped in the pointer.
593 ///
594 /// ### Returns:
595 /// A instance of Self.
596 pub fn new(offset: u32) -> Self {
597 Self { offset, _marker: PhantomData }
598 }
599
600 /// Returns the offset value in the pointer
601 pub fn offset(&self) -> u32 {
602 self.offset
603 }
604
605 /// Resolves the header based on the base_ptr of the current caller process.
606 ///
607 /// This ensures that the pointer returned is actually mapped to the process local
608 /// memory scope
609 ///
610 /// ### Params:
611 /// @base_ptr: The offset from the start of the SHM segment.
612 ///
613 /// ### Returns:
614 /// A life time speficied reference to the header of this block
615 pub unsafe fn resolve_header<'a>(&self, base_ptr: *const u8) -> &'a BlockHeader {
616 unsafe { &*(base_ptr.add((self.offset as usize) - 32) as *mut BlockHeader) }
617 }
618
619 /// Resolves the header based on the base_ptr of the current caller process.
620 ///
621 /// This ensures that the pointer returned is actually mapped to the process local
622 /// memory scope
623 ///
624 /// ### Params:
625 /// @base_ptr: The offset from the start of the SHM segment.
626 ///
627 /// ### Returns:
628 /// A life time speficied mutable reference to the header of this block
629 pub unsafe fn resolve_header_mut<'a>(&self, base_ptr: *const u8) -> &'a mut BlockHeader {
630 unsafe { &mut *(base_ptr.add((self.offset as usize) - 32) as *mut BlockHeader) }
631 }
632
633 /// Resolves the block scope based on the base_ptr of the current caller process.
634 ///
635 /// This ensures that the pointer returned is actually mapped to the process local
636 /// memory scope
637 ///
638 /// ### Params:
639 /// @base_ptr: The offset from the start of the SHM segment.
640 ///
641 /// ### Returns:
642 /// A life time specified reference to the block scope.
643 pub unsafe fn resolve<'a>(&self, base_ptr: *const u8) -> &'a T {
644 unsafe { &*(base_ptr.add(self.offset as usize) as *mut T) }
645 }
646
647 /// Resolves the block scope based on the base_ptr of the current caller process.
648 ///
649 /// This ensures that the pointer returned is actually mapped to the process local
650 /// memory scope
651 ///
652 /// ### Params:
653 /// @base_ptr: The offset from the start of the SHM segment.
654 ///
655 /// ### Returns:
656 /// A life time specified mutable reference to the block scope.
657 pub unsafe fn resolve_mut<'a>(&self, base_ptr: *const u8) -> &'a mut T {
658 unsafe { &mut *(base_ptr.add(self.offset as usize) as *mut T) }
659 }
660
661 pub unsafe fn write(&self, base_ptr: *const u8, value: T) {
662 unsafe { std::ptr::write(self.resolve_mut(base_ptr), value) }
663 }
664 }
665
666
667}
668
669#[cfg(test)]
670mod tests {
671 use crate::matrix::core::{ BlockHeader, RelativePtr };
672 use crate::handlers::HandlerFunctions;
673
674 use super::*;
675
676 /// Test if the mapping function can return the correct indexes.
677 #[test]
678 fn test_mapping() {
679 assert_eq!(helpers::Mapping::find_indices(16), (0, 1));
680 assert_eq!(helpers::Mapping::find_indices(64), (0, 4));
681 assert_eq!(helpers::Mapping::find_indices(128), (7, 0));
682
683 let (fl, sl) = helpers::Mapping::find_indices(1024);
684 assert_eq!(fl, 10);
685 assert_eq!(sl, 0);
686 }
687
    /// Verifies that bootstrap formats a fresh matrix and advertises the
    /// initial free block in the first-level bitmap.
    ///
    /// NOTE(review): creates a real file under /dev/shm (Linux only) that is
    /// never removed — consider cleaning up after the test.
    #[test]
    fn test_initial_bootstrap() {
        let size = 1024 * 1024;
        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();

        let bitmap = handler.matrix().fl_bitmap.load(Ordering::Acquire);

        assert!(bitmap != 0, "FL bitmap should not be zero after bootstrap");
    }
699
    /// Verifies allocation and splitting by inspecting the header behind the
    /// returned payload pointer: the stored size is the rounded request (64)
    /// and the state is ALLOCATED. The header sits 32 bytes before the
    /// payload offset that `allocate` returns.
    #[test]
    fn test_allocation_and_spliting() {
        let size = 1024 * 1024;
        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
        let base_ptr = handler.base_ptr();
        let matrix = handler.matrix();

        unsafe {
            let rel_ptr = matrix.allocate(base_ptr, 64).unwrap();

            let header = rel_ptr.resolve_header(base_ptr);

            assert_eq!(header.size.load(Ordering::Acquire), 64);
            assert_eq!(header.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);
        }
    }
717
    /// Verifies backward coalescing: five adjacent 64-byte blocks are carved
    /// out, `b` is force-marked FREE, `c` and `d` are acked. The ripples
    /// merge d (64) + c (64) + b (128, after b already absorbed c during
    /// c's ack) into one 256-byte free block, while `a` and `e` stay
    /// allocated and act as boundaries.
    #[test]
    fn test_ack_and_coalesce() {
        let size = 1024 * 1024;
        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
        let base_ptr = handler.base_ptr();
        let matrix = handler.matrix();

        unsafe {
            let ptr_a = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_b = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_c = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_d = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_e = matrix.allocate(base_ptr, 64).unwrap();

            // Coalesce operates on header offsets; payload pointers are +32.
            let h_b = ptr_b.resolve_header(base_ptr);
            let rel_c = RelativePtr::<BlockHeader>::new(ptr_c.offset() - 32);
            let rel_d = RelativePtr::<BlockHeader>::new(ptr_d.offset() - 32);

            // Simulate a directly-freed block, then ack c and d (ack itself
            // starts a ripple for each).
            h_b.state.store(helpers::STATE_FREE, Ordering::Release);
            matrix.ack(&rel_c, base_ptr);
            matrix.ack(&rel_d, base_ptr);

            // NOTE(review): this extra ripple runs on d's stale header (still
            // sized 64) and re-inserts it into the free lists even though d's
            // bytes already belong to the merged block — verify intent.
            matrix.coalesce(&rel_d, base_ptr);

            let h_a = ptr_a.resolve_header(base_ptr);
            assert_eq!(h_a.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);

            let h_merged = ptr_b.resolve_header(base_ptr);
            assert_eq!(h_merged.state.load(Ordering::Acquire), helpers::STATE_FREE);
            assert_eq!(h_merged.size.load(Ordering::Acquire), 256);

            let h_e = ptr_e.resolve_header(base_ptr);
            assert_eq!(h_e.state.load(Ordering::Acquire), helpers::STATE_ALLOCATED);
        }
    }
754
    /// ----------------------------------------------------------------------------
    /// STRESS TESTS
    ///
    /// These are excluded (`#[ignore]`) from the correctness suite so that CI
    /// stays fast. Running them is explicitly required before shipping.
    /// ----------------------------------------------------------------------------
761
    /// Runs 800,000 allocations in parallel (100,000 per thread across 8
    /// threads) to verify the matrix hands out unique blocks without race
    /// conditions.
    #[test]
    #[ignore]
    fn test_multithreaded_stress() {
        use std::sync::{ Arc, Barrier };
        use std::thread;
        use std::collections::HashSet;

        // A 50 MiB matrix backs all the buffers below.
        let size = 50 * 1024 * 1024;
        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();

        let thread_count = 8;
        // NOTE(review): despite the name, this is allocations *per thread*,
        // not a rate.
        let allocs_per_second = 100_000;
        let barrier = Arc::new(Barrier::new(thread_count));

        // Counts individual failed allocation attempts (one per retry).
        let fail_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));

        let mut handles = vec![];

        // Launch all threads; the barrier releases them simultaneously to
        // maximize contention.
        for _ in 0..thread_count {
            let b = Arc::clone(&barrier);
            // Addresses cross the thread boundary as usize because raw
            // pointers are not Send.
            let base_addr = handler.base_ptr() as usize;
            let matrix_addr = handler.matrix() as *const core::AtomicMatrix as usize;
            let fail_count_clone = Arc::clone(&fail_count);

            handles.push(
                thread::spawn(move || {
                    let base_ptr = base_addr as *mut u8;
                    let matrix = unsafe { &*(matrix_addr as *const core::AtomicMatrix) };
                    let mut my_offsets = Vec::new();

                    b.wait();

                    for _ in 0..allocs_per_second {
                        // Budget of 10 attempts per allocation before giving up.
                        for _ in 0..10 {
                            if let Ok(rel_ptr) = matrix.allocate(base_ptr, 64) {
                                my_offsets.push(rel_ptr.offset());
                                break;
                            }
                            fail_count_clone.fetch_add(1, Ordering::Relaxed);
                            std::hint::spin_loop();
                        }
                    }

                    my_offsets
                })
            );
        }

        // Collect every offset obtained by every thread.
        let mut all_offsets = Vec::new();
        for h in handles {
            all_offsets.extend(h.join().unwrap());
        }

        let total_obtained = all_offsets.len();
        let unique_offsets: HashSet<_> = all_offsets.into_iter().collect();

        // Allow a 0.5% failure margin, as this stress test never deallocates.
        let success_percentage = ((thread_count * allocs_per_second) as f64) * 0.995;

        // At least 99.5% of the expected allocations must succeed, and every
        // returned offset must be unique — a duplicate indicates a race.
        assert!(
            total_obtained >= (success_percentage as usize),
            "Total allocations should match expected count"
        );
        assert_eq!(
            unique_offsets.len(),
            total_obtained,
            "RACE CONDITION DETECTED: Duplicate offsets found"
        );
        println!(
            "Successfully allocated {} unique blocks across {} threads without collisions! {} allocations failed",
            total_obtained,
            thread_count,
            fail_count.load(Ordering::Relaxed)
        );
    }
849
    /// Runs 8 threads of randomized alloc/dealloc traffic for 10 minutes to
    /// verify that the Propagation Principle of Atomic Coalescence keeps
    /// long-term fragmentation bounded.
    #[test]
    #[ignore]
    fn test_long_term_fragmentation_healing() {
        use std::sync::{ Arc, Barrier };
        use std::thread;
        use std::time::{ Instant, Duration };

        const DURATION: u32 = 600;
        const THREADS: u32 = 8;

        let size = 50 * 1024 * 1024;
        let handler = core::AtomicMatrix::bootstrap(Some(uuid::Uuid::new_v4()), size).unwrap();
        let handler_arc = Arc::new(handler);
        let barrier = Arc::new(Barrier::new(THREADS as usize));
        let start_time = Instant::now();
        let duration = Duration::from_secs(DURATION as u64);

        let mut handles = vec![];

        for t_id in 0..THREADS {
            let h = Arc::clone(&handler_arc);
            let b = Arc::clone(&barrier);

            handles.push(
                thread::spawn(move || {
                    let base_ptr = h.base_ptr() as *mut u8;
                    let matrix = &h.matrix();
                    let mut my_blocks = Vec::new();
                    // Cheap per-thread LCG, seeded by thread id so streams differ.
                    let mut rng = t_id + 1;
                    let mut total_ops = 0u64;

                    b.wait();

                    while start_time.elapsed() < duration {
                        rng = rng.wrapping_mul(1103515245).wrapping_add(12345);

                        // ~70% of ops allocate (capped at 200 live blocks);
                        // otherwise free a random held block.
                        if rng % 10 < 7 && my_blocks.len() < 200 {
                            let alloc_size = (rng % 512) + 32;
                            if let Ok(ptr) = matrix.allocate(base_ptr, alloc_size as u32) {
                                my_blocks.push(ptr);
                            }
                        } else if !my_blocks.is_empty() {
                            let idx = (rng as usize) % my_blocks.len();
                            let ptr = my_blocks.swap_remove(idx);

                            // Payload offsets are header offset + 32; step
                            // back to address the header itself.
                            let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - 32);
                            matrix.ack(&header_ptr, base_ptr);

                            // Every fifth op, force an extra coalescing ripple.
                            if total_ops % 5 == 0 {
                                matrix.coalesce(&header_ptr, base_ptr);
                            }
                        }
                        total_ops += 1;
                    }
                    (my_blocks, total_ops)
                })
            );
        }

        // Drain every surviving block so the fragmentation measurement below
        // reflects a fully released matrix.
        let mut total_work = 0u64;
        for h in handles {
            let (remaining, thread_ops) = h.join().unwrap();
            total_work += thread_ops;
            let base_ptr = handler_arc.base_ptr() as *mut u8;

            for ptr in remaining {
                let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - 32);
                handler_arc.matrix().ack(&header_ptr, base_ptr);
                handler_arc.matrix().coalesce(&header_ptr, base_ptr);
            }
        }

        // Count non-empty free-list buckets; each is a surviving fragment.
        let mut free_count = 0;
        for fl in 0..32 {
            for sl in 0..8 {
                if handler_arc.matrix().matrix[fl][sl].load(Ordering::Acquire) != 0 {
                    free_count += 1;
                }
            }
        }

        let entrophy_percentage = ((free_count as f64) / (total_work as f64)) * 100.0;
        let mops = (total_work as f64) / (DURATION as f64) / 1_000_000.0;

        println!("Endurance Results (Duration of {} seconds)", DURATION);
        println!("Num of threads: {}", THREADS);
        println!("Total operations: {}", total_work);
        println!("Throughput: {:.2} Mop/s", (total_work as f64) / (DURATION as f64) / 1_000_000.0);
        println!("Final free fragments: {}", free_count);
        println!("Entrophy percentage: {}%", entrophy_percentage);

        assert!(entrophy_percentage < 0.001, "Excessive Fragmentation");
        assert!(mops > 1.0, "Throughput regression: {:.2} Mop/s", mops);
    }
951}