atomic_matrix/matrix.rs
//! # Atomic Matrix Core
2//!
3//! This module implements a high-velocity, lock-free memory arena designed for
4//! ultra-low latency IPC (Inter-Process Communication).
5//!
6//! # Theory of Operation: The Propagation Principle of Atomic Coalescence
7//! Unlike traditional allocators that use centralized mutexes or complex
8//! background garbage collection, the **AtomicMatrix** treats memory
9//! fragmentation as a fluid dynamics problem.
10//!
11//! 1. **Kinetic Healing:** Freeing a block triggers a "Ripple" ('coalesce')
12//! that propagates through the sector.
13//! 2. **Monotonicity:** Ripples only move backward (towards the sector origin)
14//! to prevent circular atomic dependencies and deadlocks.
15//! 3. **Permissive Concurrency:** If a thread encounters contention, it skips the
16//! block rather than blocking, relying on the high frequency of future operations
17//! to complete the healing.
18//!
19//! # Memory Topography
20//! The matrix is laid out linearly in a shared memory segment:
21//! ```text
22//! [ Init Guard (16b) ] [ AtomicMatrix Struct ] [ Padding ] [ Sector 0 ] [ Sector 1 ] ...
23//! ```
24//! Each **Sector** acts as a self-contained fault domain with its own boundary,
25//! preventing local fragmentation from "bleeding" into the entire matrix.
26//!
27//! # Safety & Atomicity
28//! All state transitions follow a strict 'STATE_FREE -> STATE_ALLOCATED ->
29//! STATE_ACKED -> STATE_COALESCING' lifecycle. Hardware-level memory fences
30//! (std::sync::atomic::fence) are utilized to ensure visibility across 16+ CPU
31//! cores without locking.
32
33use std::sync::atomic::{ AtomicU32, Ordering, fence };
34use std::marker::PhantomData;
35use std::fs::OpenOptions;
36use memmap2::MmapMut;
37use uuid::Uuid;
38
// Initialization guard states for the 16-byte header at the start of the SHM
// segment. Exactly one process wins the UNINITIALIZED -> FORMATTING CAS in
// `bootstrap`; every other process spins until the guard reads READY.
const SYS_UNINITIALIZED: u32 = 0;
const SYS_FORMATTING: u32 = 1;
const SYS_READY: u32 = 2;

// Per-block lifecycle states stored in `BlockHeader::state`.
// NOTE(review): the value 2 is skipped — confirm this gap is deliberate
// before introducing a new state.
const STATE_FREE: u32 = 0;
const STATE_ALLOCATED: u32 = 1;
const STATE_ACKED: u32 = 3;
const STATE_COALESCING: u32 = 4;
47
48pub mod core {
49 use super::*;
50
    /// Header structure that is written at the beginning of each block/sector.
    ///
    /// The header is made entirely of atomic primitives to ensure safe reading
    /// and manipulation across participant modules in the matrix. `align(16)`
    /// keeps headers on 16-byte boundaries, matching the 16-byte alignment
    /// masks used throughout the allocator.
    #[derive(Debug)]
    #[repr(C, align(16))]
    pub struct BlockHeader {
        // Total size of the block in bytes (header area included).
        pub size: AtomicU32,
        // One of STATE_FREE / STATE_ALLOCATED / STATE_ACKED / STATE_COALESCING.
        pub state: AtomicU32,
        // Offset of the physically preceding block (0 = none known).
        pub prev_phys: AtomicU32,
        // Next block in the free-list bucket chain (0 = end of chain).
        pub next_free: AtomicU32,
        // NOTE(review): never written anywhere in this file — confirm it is
        // maintained elsewhere or consider removing it.
        pub prev_free: AtomicU32,
    }
64
    /// The structural core of the matrix.
    ///
    /// A non-blocking, SHM-backed memory arena, utilizing a segmented **TLSF
    /// (Two-Level Segregated Fit)** inspired mapping for O(1) allocation,
    /// paired with a custom **Kinetic Coalescing** logic.
    ///
    /// # Memory Layout
    /// The matrix is designed to be mapped directly into `/dev/shm`. The
    /// segment starts with a 16-byte `init_guard`, followed by this struct,
    /// and then the sectorized raw memory blocks.
    ///
    /// NOTE(review): this struct is cast directly onto shared memory, yet it
    /// contains `mmap: MmapMut` — an owning handle whose heap pointers are
    /// only meaningful inside one process — and `init` never writes it.
    /// Confirm that no cross-process code ever reads `mmap` through the SHM
    /// mapping.
    #[repr(C)]
    pub struct AtomicMatrix {
        pub id: Uuid,
        // TLSF first level: bit N set => sl_bitmaps[N] may have free blocks.
        pub fl_bitmap: AtomicU32,
        // TLSF second-level bitmaps, one per first-level class.
        pub sl_bitmaps: [AtomicU32; 32],
        // Free-list heads: matrix[fl][sl] holds the offset of the first free
        // block in that bucket (0 = empty bucket).
        pub matrix: [[AtomicU32; 8]; 32],
        pub mmap: MmapMut,
        // End offsets of up to four sectors; 0 marks an unused slot.
        pub sector_boundaries: [AtomicU32; 4],
        pub total_size: u32,
    }
85
    /// A pointer to block memory, stored as an offset relative to the start of
    /// the matrix segment rather than as an absolute address.
    ///
    /// Each process maps the SHM segment at a different base address, so raw
    /// pointers cannot be shared. A `RelativePtr` is resolved against the
    /// caller's own `base_ptr` to obtain an address valid in that process.
    ///
    /// The `PhantomData<T>` records the pointee type for the resolve methods
    /// without storing an actual `T`.
    pub struct RelativePtr<T> {
        offset: u32,
        _marker: PhantomData<T>,
    }
99
    /// Stateless helper providing the O(1) size -> (first-level, second-level)
    /// index calculation used to locate a free-list bucket that exactly fits a
    /// requested buffer size, or the next larger one that can hold it.
    pub struct Mapping;
104
105 impl AtomicMatrix {
        /// Initializes the matrix struct in place and returns it.
        ///
        /// Clears both TLSF levels (the first-level bitmap and every
        /// second-level bitmap) plus every free-list head, then assigns the
        /// identity metadata and returns the ready-to-use object.
        ///
        /// NOTE(review): `sector_boundaries`, `mmap` and the init guard are
        /// not touched here; this is only sound because `bootstrap` runs it on
        /// freshly zeroed SHM pages — confirm before reusing on a dirty
        /// segment.
        ///
        /// ### Params
        /// @ptr: The pointer to the beginning of the matrix struct in the segment \
        /// @id: The ID of this matrix instance \
        /// @size: The total size of the SHM allocation
        ///
        /// ### Returns
        /// A static, lifetime specified, reference to the matrix struct.
        ///
        /// ### Safety (caller contract)
        /// `ptr` must point at writable memory large enough for
        /// `AtomicMatrix`, and the mapping must outlive every use of the
        /// returned `'static` reference.
        pub fn init(ptr: *mut AtomicMatrix, id: Uuid, size: u32) -> &'static mut Self {
            unsafe {
                let matrix = &mut *ptr;
                matrix.fl_bitmap.store(0, Ordering::Release);
                for i in 0..32 {
                    matrix.sl_bitmaps[i].store(0, Ordering::Release);
                    for j in 0..8 {
                        matrix.matrix[i][j].store(0, Ordering::Release);
                    }
                }
                matrix.id = id;
                matrix.total_size = size;
                matrix
            }
        }
133
        /// The entry point of the matrix struct.
        ///
        /// Creates (or opens) the SHM segment, maps it, performs the initial
        /// formatting exactly once across all processes via the init guard,
        /// and returns the high-level handler API to the caller.
        ///
        /// Exactly one caller wins the UNINITIALIZED -> FORMATTING CAS and
        /// formats the segment; every other caller spins until the guard
        /// reads READY.
        ///
        /// NOTE(review): `set_len` runs unconditionally, so attaching to an
        /// existing matrix with a different `size` resizes (and can truncate)
        /// the live segment — confirm callers always pass the original size.
        /// The READY spin loop also has no timeout: a formatter that crashes
        /// mid-format leaves attachers spinning forever.
        ///
        /// ### Params:
        /// @id: The ID of a new or existing matrix (if existing, formatting is
        /// skipped and we just bind to it) \
        /// @size: The SHM allocation size \
        /// @sector_barriers: (small %, medium %) split forwarded to `sectorize`
        ///
        /// ### Returns
        /// The matrix handler api, or an error to be handled
        pub fn bootstrap(
            id: Option<Uuid>,
            size: usize,
            sector_barriers: (u32, u32)
        ) -> Result<crate::handlers::matrix_handler::MatrixHandler, String> {
            let path_id = id.unwrap_or_else(Uuid::new_v4);
            let path = format!("/dev/shm/{}", path_id);
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .open(&path)
                .map_err(|e| e.to_string())?;

            file.set_len(size as u64).map_err(|e| e.to_string())?;

            let mut mmap = unsafe { MmapMut::map_mut(&file).map_err(|e| e.to_string())? };
            let base_ptr = mmap.as_mut_ptr();

            // The first 16 bytes of the segment are the cross-process init
            // guard; the matrix struct itself starts right after it.
            let init_guard = unsafe { &*(base_ptr as *const AtomicU32) };
            let matrix_ptr = unsafe { base_ptr.add(16) as *mut AtomicMatrix };

            if
                init_guard
                    .compare_exchange(
                        SYS_UNINITIALIZED,
                        SYS_FORMATTING,
                        Ordering::SeqCst,
                        Ordering::Relaxed
                    )
                    .is_ok()
            {
                // We won the formatting race: lay out the matrix and its
                // sectors, then publish READY for every other process.
                let matrix = AtomicMatrix::init(matrix_ptr, path_id, size as u32);
                matrix
                    .sectorize(base_ptr, size, sector_barriers.0 as u8, sector_barriers.1 as u8)
                    .unwrap();
                init_guard.store(SYS_READY, Ordering::SeqCst);
            } else {
                // Someone else is (or already finished) formatting: wait.
                while init_guard.load(Ordering::Acquire) != SYS_READY {
                    std::hint::spin_loop();
                }
            }

            Ok(crate::handlers::matrix_handler::MatrixHandler {
                matrix: unsafe {
                    &mut *matrix_ptr
                },
                mmap,
            })
        }
197
198 /// Sectorizes the SHM segment into three different zones of allocation. These
199 /// zones are classified as Small, Medium and Large.
200 ///
201 /// - **Small Sector:** For data objects between 32 bytes and 1 KB.
202 /// - **Medium Sector:** For data objects between 1 KB and 1 MB.
203 /// - **Large Sector:** For data objects bigger than 1 MB.
204 ///
205 /// This ensures three main safeties for the matrix:
206 ///
207 /// - **Size integrity:** Blocks with similar sizes are required to stay together,
208 /// ensuring that we don't deal with a huge size variety in coalescing.
209 /// - **Propagation granularity:** The healing propagation only occurs inside the
210 /// block sector, ensuring that high operation sectors dont cause a tide of coalescing
211 /// into lower operation sectors.
212 /// - **Seach optimization:** Since small blocks are always together, it reduces
213 /// the TLSF searching index as the size you need is almost always garanteed to
214 /// exist.
215 ///
216 /// The sectorize also limits sectors based on the choosen size for the matrix to
217 /// ensure that if we have a small matrix (e.g.: 1mb) we don't allocate a unneces-
218 /// sary large sector.
219 ///
220 /// ### Params:
221 /// @base_ptr: The starting offset of the SHM mapping. \
222 /// @total_file_size: The total size of SHM segment \
223 /// @mut small_percent: The desired size percentage of the small sector \
224 /// @mut medium_sector: The desired size percentage of the medium sector
225 ///
226 /// ### Returns:
227 /// Any error that arises from the sectorizing. Otherwise, an Ok flag.
228 pub fn sectorize(
229 &self,
230 base_ptr: *const u8,
231 total_file_size: usize,
232 mut small_percent: u8,
233 mut medium_percent: u8
234 ) -> Result<(), String> {
235 let matrix_size = std::mem::size_of::<AtomicMatrix>();
236 let mut current_offset = (16 + (matrix_size as u32) + 15) & !15;
237 let usable_space = (total_file_size as u32).saturating_sub(current_offset);
238
239 if total_file_size < 5 * 1024 * 1024 {
240 small_percent = 30;
241 medium_percent = 70;
242 }
243
244 let small_size =
245 ((((usable_space as u64) * (small_percent as u64)) / 100) as u32) & !15;
246 let medium_size =
247 ((((usable_space as u64) * (medium_percent as u64)) / 100) as u32) & !15;
248 let large_size = usable_space.saturating_sub(small_size).saturating_sub(medium_size);
249
250 let mut prev_phys = 0u32;
251 let mut sizes = vec![small_size, medium_size, large_size];
252 sizes.retain(|&s| s > 64);
253
254 for (i, size) in sizes.iter().enumerate() {
255 let (fl, sl) = Mapping::find_indices(*size);
256 self.create_and_insert_sector(base_ptr, current_offset, *size, prev_phys, fl, sl);
257 prev_phys = current_offset;
258 current_offset += size;
259
260 if i < 4 {
261 self.sector_boundaries[i].store(current_offset, Ordering::Release);
262 }
263 }
264 Ok(())
265 }
266
        /// Allocates a block in the matrix for the caller.
        ///
        /// It acts as a greedy allocator: each call either claims a block or
        /// throws an OOM Contention flag. If the claim races with another
        /// participant, it simply retries against the free lists, granting a
        /// lock-free allocation paradigm. Each allocation retries up to 512
        /// times to confirm the matrix is indeed out of memory before giving
        /// up.
        ///
        /// NOTE(review): the rounded `size` carries no headroom for the
        /// 32-byte header area, yet the returned pointer is `offset + 32` —
        /// so a caller requesting N bytes gets a block whose header and
        /// payload share N bytes (usable N - 32). Confirm this accounting is
        /// intended. Also, on a split, the block physically after the
        /// remainder does not get its `prev_phys` updated here; it appears to
        /// be fixed lazily by `coalesce`'s horizon-boundary pass — verify.
        ///
        /// ### Params:
        /// @base_ptr: The local base address of the SHM mapping \
        /// @size: The allocation size of the block
        ///
        /// ### Returns:
        /// Either the relative pointer to the allocated block's payload, or
        /// the OOM Contention flag.
        pub fn allocate(&self, base_ptr: *const u8, size: u32) -> Result<RelativePtr<u8>, String> {
            // Round up to the 16-byte grid, with a 32-byte floor (one header).
            let size = (size + 15) & !15;
            let size = size.max(32);
            let (fl, sl) = Mapping::find_indices(size);

            for _ in 0..512 {
                if let Some((f_fl, f_sl)) = self.find_suitable_block(fl, sl) {
                    if let Ok(offset) = self.remove_free_block(base_ptr, f_fl, f_sl) {
                        unsafe {
                            let header = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
                            let total_size = header.size.load(Ordering::Acquire);

                            // Split only when the remainder can hold a header
                            // (32) plus a minimal payload slot (16).
                            if total_size >= size + 32 + 16 {
                                let rem_size = total_size - size;
                                let next_off = offset + size;
                                let next_h = &mut *(
                                    base_ptr.add(next_off as usize) as *mut BlockHeader
                                );

                                next_h.size.store(rem_size, Ordering::Release);
                                next_h.state.store(STATE_FREE, Ordering::Release);
                                next_h.prev_phys.store(offset, Ordering::Release);
                                next_h.next_free.store(0, Ordering::Release);

                                header.size.store(size, Ordering::Release);
                                // Publish the remainder header before linking
                                // it into the free lists.
                                fence(Ordering::SeqCst);

                                let (r_fl, r_sl) = Mapping::find_indices(rem_size);
                                self.insert_free_block(base_ptr, next_off, r_fl, r_sl);
                            }
                            header.state.store(STATE_ALLOCATED, Ordering::Release);
                            // Payload starts 32 bytes past the header.
                            return Ok(RelativePtr::new(offset + 32));
                        }
                    }
                }
                std::hint::spin_loop();
            }
            Err("OOM: Contention".into())
        }
322
323 /// Acknowledges the freedon of a block and pushes it to the to_be_freed queue.
324 ///
325 /// If the to_be_freed queue is full, it will imediatelly trigger the drainage
326 /// of the queue and coalesce every block present before trying to push the
327 /// newly ack block into the queue. If there is space available, simply push
328 /// it and move on
329 ///
330 /// ### Params:
331 /// @ptr: The relative pointer of the block to acknowledge \
332 /// @base_ptr: The offset from the start of the SHM segment.
333 pub fn ack(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
334 unsafe {
335 let header = ptr.resolve_mut(base_ptr);
336
337 header.state.store(STATE_ACKED, Ordering::Release);
338 }
339
340 self.coalesce(ptr, base_ptr);
341 }
342
        /// Tries to merge neighbouring blocks to the left until the sector
        /// origin is reached or the neighbour block is not ACKED/FREE.
        ///
        /// This is the implementation of the Kinetic Coalescence process. It
        /// receives the block that starts the ripple and traverses the matrix
        /// to the left (monotonicity guard). If any race is met along the way
        /// (another coalescing just started, or a module just claimed this
        /// block), it stops and moves on (permissive healing).
        ///
        /// It then tries to update the next physical neighbour's `prev_phys`
        /// to the start of the merged block. If that block lies past the
        /// sector boundary the fix is skipped, in the hope that a later
        /// ripple will passively repair it (horizon boundary).
        ///
        /// NOTE(review): `while current_offset > 16` tests a value that never
        /// changes inside the loop — only the inner `break`s terminate it;
        /// the condition was presumably meant to track the moving
        /// `final_offset`. Confirm intent.
        ///
        /// ### Params:
        /// @ptr: The relative pointer of the block to coalesce \
        /// @base_ptr: The local base address of the SHM mapping.
        pub fn coalesce(&self, ptr: &RelativePtr<BlockHeader>, base_ptr: *const u8) {
            unsafe {
                let current_offset = ptr.offset();
                let mut current_header = ptr.resolve_mut(base_ptr);
                let mut total_size = current_header.size.load(Ordering::Acquire);
                // A size below one header's worth means this block was already
                // swallowed by another ripple; nothing to do.
                if total_size < 32 {
                    return;
                }

                let mut final_offset = current_offset;

                while current_offset > 16 {
                    let prev_phys_offset = current_header.prev_phys.load(Ordering::Acquire);

                    if prev_phys_offset == 0 {
                        break;
                    }

                    let prev_header_ptr = base_ptr.add(
                        prev_phys_offset as usize
                    ) as *mut BlockHeader;
                    let prev_header = &mut *prev_header_ptr;

                    // Claim the neighbour only if it is FREE or ACKED; any
                    // other state means it is live or already being rippled.
                    let res = prev_header.state.compare_exchange(
                        STATE_FREE,
                        STATE_COALESCING,
                        Ordering::Acquire,
                        Ordering::Relaxed
                    );

                    let claimed = if res.is_ok() {
                        true
                    } else {
                        prev_header.state
                            .compare_exchange(
                                STATE_ACKED,
                                STATE_COALESCING,
                                Ordering::Acquire,
                                Ordering::Relaxed
                            )
                            .is_ok()
                    };

                    if claimed {
                        // Zero the victim's size so a concurrent ripple that
                        // also reached it reads 0 and backs off.
                        let size_to_add = prev_header.size.swap(0, Ordering::Acquire);
                        if size_to_add == 0 || prev_phys_offset >= current_offset {
                            break;
                        }

                        // Corrupted-size guard: never trust a size larger
                        // than the whole matrix.
                        if size_to_add > self.total_size {
                            break;
                        }

                        let (fl, sl) = Mapping::find_indices(size_to_add);

                        total_size = total_size
                            .checked_add(size_to_add)
                            .expect("TidalRippleCoalescingError.");
                        final_offset = prev_phys_offset;
                        current_header = prev_header;
                        // NOTE(review): this pops the *head* of bucket
                        // (fl, sl), which is not necessarily the block just
                        // claimed — a different free block can be unlinked
                        // while the claimed one stays chained. Confirm the
                        // free lists tolerate this.
                        self.remove_free_block(base_ptr, fl, sl).ok();
                    } else {
                        break;
                    }
                }

                // Horizon boundary: best-effort back-link fix for the block
                // after the merged span, skipped at the end of the sector.
                let sector_limit = self.sector_end_offset(final_offset);
                if let Some(next_h_offset) = final_offset.checked_add(total_size) {
                    if next_h_offset < sector_limit {
                        let next_h = &*(base_ptr.add(next_h_offset as usize) as *const BlockHeader);
                        next_h.prev_phys.store(final_offset, Ordering::Release);
                    }
                }

                // Publish the merged block and re-link it as free.
                current_header.size.store(total_size, Ordering::Release);
                current_header.state.store(STATE_FREE, Ordering::Release);

                let (fl, sl) = Mapping::find_indices(total_size);
                self.insert_free_block(base_ptr, final_offset, fl, sl);
            }
        }
449
450 /// Queries a block offset inside of the matrix.
451 ///
452 /// Not much to say about this, the name is pretty self explanatory.
453 ///
454 /// ### Params:
455 /// @offset: The offset of the block to be queried
456 ///
457 /// ### Returns:
458 /// The Relative Pointer to the queried block
459 pub fn query(&self, offset: u32) -> RelativePtr<u8> {
460 RelativePtr::new(offset + 32)
461 }
462
463 /// Queries the TLSF bitmaps in search of a block.
464 ///
465 /// It acquires the first most suitable index flag (according to the find
466 /// _indices function) and does a bitwise operation to check if it possesses an
467 /// available block. If it matches, return the coordinates of the FL and the
468 /// CTZ result from the SL. If it doesn't match, performs CTZ on the first level
469 /// to return the first available coordinate.
470 ///
471 /// ### Params:
472 /// @fl: Calculated first level coordinate \
473 /// @sl: Calculated second level coordinate
474 ///
475 /// ### Returns:
476 /// A tuple containing the FL/SL coordinates or nothing if there is no space
477 /// available in the matrix.
478 fn find_suitable_block(&self, fl: u32, sl: u32) -> Option<(u32, u32)> {
479 let sl_map = self.sl_bitmaps[fl as usize].load(Ordering::Acquire);
480 let m_sl = sl_map & (!0u32 << sl);
481 if m_sl != 0 {
482 return Some((fl, m_sl.trailing_zeros()));
483 }
484
485 let fl_map = self.fl_bitmap.load(Ordering::Acquire);
486 let m_fl = fl_map & (!0u32 << (fl + 1));
487 if m_fl != 0 {
488 let f_fl = m_fl.trailing_zeros();
489 if f_fl < 32 {
490 let s_map = self.sl_bitmaps[f_fl as usize].load(Ordering::Acquire);
491 if s_map != 0 {
492 return Some((f_fl, s_map.trailing_zeros()));
493 }
494 }
495 }
496 None
497 }
498
        /// Pops a free block from the head of the (fl, sl) free-list bucket.
        ///
        /// Treiber-style lock-free pop: load the head, load its `next_free`,
        /// then CAS the head forward. On contention it spins and retries; an
        /// empty bucket (head == 0) aborts with an error. When the bucket
        /// drains, the matching SL bit is cleared — and if the whole SL map is
        /// then empty, the FL bit too.
        ///
        /// NOTE(review): the head CAS is ABA-prone (the same offset can be
        /// re-pushed with a different `next_free` between the load and the
        /// CAS), and the SL/FL bit clearing is a check-then-act across two
        /// atomics that can race with a concurrent insert. Confirm the retry
        /// loops upstream absorb both.
        ///
        /// ### Params:
        /// @base_ptr: The local base address of the SHM mapping \
        /// @fl: First level coordinates of the bucket \
        /// @sl: Second level coordinates of the head.
        ///
        /// ### Returns
        /// A result containing either the offset of the popped block, or an
        /// EmptyBitmapError.
        fn remove_free_block(&self, base_ptr: *const u8, fl: u32, sl: u32) -> Result<u32, String> {
            let head = &self.matrix[fl as usize][sl as usize];
            loop {
                let off = head.load(Ordering::Acquire);
                if off == 0 {
                    return Err("EmptyBitmapError".into());
                }
                let next = unsafe {
                    (*(base_ptr.add(off as usize) as *const BlockHeader)).next_free.load(
                        Ordering::Acquire
                    )
                };
                if head.compare_exchange(off, next, Ordering::AcqRel, Ordering::Relaxed).is_ok() {
                    if next == 0 {
                        // Bucket drained: retire its SL bit (and the FL bit
                        // if the whole second level is now empty).
                        self.sl_bitmaps[fl as usize].fetch_and(!(1 << sl), Ordering::Release);
                        if self.sl_bitmaps[fl as usize].load(Ordering::Acquire) == 0 {
                            self.fl_bitmap.fetch_and(!(1 << fl), Ordering::Release);
                        }
                    }
                    return Ok(off);
                }
                std::hint::spin_loop();
            }
        }
540
        /// Pushes a block onto the head of the (fl, sl) free-list bucket.
        ///
        /// The exact opposite of `remove_free_block`: a Treiber-style push —
        /// point the block's `next_free` at the current head, then CAS the
        /// head to the new block, spinning on contention. The FL/SL bitmap
        /// bits are set only after the push succeeds, re-publishing the
        /// bucket.
        ///
        /// ### Params:
        /// @base_ptr: The local base address of the SHM mapping \
        /// @offset: The header offset to be inserted into the bucket \
        /// @fl: The first level insertion coordinates \
        /// @sl: The second level insertion coordinates
        fn insert_free_block(&self, base_ptr: *const u8, offset: u32, fl: u32, sl: u32) {
            let head = &self.matrix[fl as usize][sl as usize];
            unsafe {
                let h = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
                loop {
                    let old = head.load(Ordering::Acquire);
                    h.next_free.store(old, Ordering::Release);
                    if
                        head
                            .compare_exchange(old, offset, Ordering::AcqRel, Ordering::Relaxed)
                            .is_ok()
                    {
                        break;
                    }
                    std::hint::spin_loop();
                }
            }
            self.fl_bitmap.fetch_or(1 << fl, Ordering::Release);
            self.sl_bitmaps[fl as usize].fetch_or(1 << sl, Ordering::Release);
        }
570
571 /// Creates and formates the header of the sector, as well as pushing it into its
572 /// corresponding boundary position inside the matrix.
573 ///
574 /// ### Params:
575 /// @base_ptr: The offset from the start of the SHM segment \
576 /// @size: The size of the sector \
577 /// @prev: The previous sector, if any \
578 /// @fl: The first level coordinate based on po2 size scalling \
579 /// @sl: The second level coordinate based on 8 steps division size scalling
580 fn create_and_insert_sector(
581 &self,
582 base_ptr: *const u8,
583 offset: u32,
584 size: u32,
585 prev: u32,
586 fl: u32,
587 sl: u32
588 ) {
589 unsafe {
590 let h = &mut *(base_ptr.add(offset as usize) as *mut BlockHeader);
591 h.size.store(size, Ordering::Release);
592 h.state.store(STATE_FREE, Ordering::Release);
593 h.prev_phys.store(prev, Ordering::Release);
594 h.next_free.store(0, Ordering::Release);
595 self.insert_free_block(base_ptr, offset, fl, sl);
596 }
597 }
598
599 /// Returns the boundary of the current sector
600 ///
601 /// It queries the boundaries from the metadata and check wheter the block fits or
602 /// not inside this sector.
603 ///
604 /// ### Params:
605 /// @current_offset: The offset to check against the sector.
606 ///
607 /// ### Returns:
608 /// Either the boundary value of the current sector, or the end of the segment.
609 fn sector_end_offset(&self, current_offset: u32) -> u32 {
610 for i in 0..4 {
611 let boundary = self.sector_boundaries[i].load(Ordering::Acquire);
612 if boundary == 0 {
613 break;
614 }
615 if current_offset < boundary {
616 return boundary;
617 }
618 }
619
620 self.mmap.len() as u32
621 }
622 }
623
    impl<T> RelativePtr<T> {
        /// Creates a new relative pointer wrapping the provided offset.
        ///
        /// The `PhantomData` records the pointee type for the resolve methods.
        ///
        /// ### Params:
        /// @offset: The offset value (relative to the matrix base) to wrap.
        ///
        /// ### Returns:
        /// An instance of Self.
        pub fn new(offset: u32) -> Self {
            Self { offset, _marker: PhantomData }
        }

        /// Returns the raw offset stored in the pointer.
        pub fn offset(&self) -> u32 {
            self.offset
        }

        /// Resolves this block's header within the caller's local mapping.
        ///
        /// Assumes `self.offset` points at the payload, 32 bytes past the
        /// header, and steps back to it.
        ///
        /// NOTE(review): `offset - 32` underflows (panicking in debug builds)
        /// when `offset < 32`; callers must only pass payload pointers here.
        ///
        /// ### Params:
        /// @base_ptr: The local base address of the SHM mapping.
        ///
        /// ### Returns:
        /// A lifetime-specified reference to the header of this block.
        ///
        /// ### Safety
        /// `base_ptr` must be the live mapping base and `offset - 32` must be
        /// a valid, initialized `BlockHeader` inside it.
        pub unsafe fn resolve_header<'a>(&self, base_ptr: *const u8) -> &'a BlockHeader {
            unsafe { &*(base_ptr.add((self.offset as usize) - 32) as *mut BlockHeader) }
        }

        /// Mutable variant of `resolve_header`; same contract and caveats.
        pub unsafe fn resolve_header_mut<'a>(&self, base_ptr: *const u8) -> &'a mut BlockHeader {
            unsafe { &mut *(base_ptr.add((self.offset as usize) - 32) as *mut BlockHeader) }
        }

        /// Resolves the block scope within the caller's local mapping.
        ///
        /// This ensures the returned reference is valid inside the calling
        /// process's own view of the SHM segment.
        ///
        /// ### Params:
        /// @base_ptr: The local base address of the SHM mapping.
        ///
        /// ### Returns:
        /// A lifetime-specified reference to the block scope.
        ///
        /// ### Safety
        /// `base_ptr + offset` must point at a valid, initialized `T` that
        /// outlives the chosen lifetime `'a`.
        pub unsafe fn resolve<'a>(&self, base_ptr: *const u8) -> &'a T {
            unsafe { &*(base_ptr.add(self.offset as usize) as *mut T) }
        }

        /// Mutable variant of `resolve`; same contract, plus the caller must
        /// guarantee no aliasing mutable access exists.
        ///
        /// ### Params:
        /// @base_ptr: The local base address of the SHM mapping.
        ///
        /// ### Returns:
        /// A lifetime-specified mutable reference to the block scope.
        pub unsafe fn resolve_mut<'a>(&self, base_ptr: *const u8) -> &'a mut T {
            unsafe { &mut *(base_ptr.add(self.offset as usize) as *mut T) }
        }
    }
690
691 impl Mapping {
692 /// Maps a block size to its corresponding (First-Level, Second-Level) indices.
693 ///
694 /// This function implements a two-level mapping strategy used for O(1) free-block
695 /// lookup, optimized for both high-velocity small allocations and logarithmic
696 /// scaling of large blocks.
697 ///
698 /// ### Mapping Logic:
699 /// - **Linear (Small):** For sizes < 128, it uses a fixed FL (0) and 16-byte SL
700 /// subdivisions. This minimizes fragmentation for tiny objects.
701 /// - **Logarithmic (Large):** For sizes >= 128, FL is the power of 2 (determined via
702 /// `leading_zeros`), and SL is a 3-bit subdivider of the range between 2^n and 2^(n+1).
703 ///
704 /// ### Mathematical Transformation:
705 /// - `FL = log2(size)`
706 /// - `SL = (size - 2^FL) / (2^(FL - 3))`
707 ///
708 /// ### Bounds:
709 /// Indices are clamped to `(31, 7)` to prevent overflow in the matrix bitmask.
710 ///
711 /// # Arguments
712 /// * `size` - The total byte size of the memory block.
713 ///
714 /// # Returns
715 /// A tuple of `(fl, sl)` indices.
716 pub fn find_indices(size: u32) -> (u32, u32) {
717 if size < 128 {
718 (0, (size / 16).min(7))
719 } else {
720 let fl = 31 - size.leading_zeros();
721 let sl = ((size >> (fl - 3)) & 0x7).min(7);
722 (fl.min(31), sl)
723 }
724 }
725 }
726}
727
728#[cfg(test)]
729mod tests {
730 use crate::matrix::core::{ BlockHeader, RelativePtr };
731
732 use super::*;
733
734 /// Test if the mapping function can return the correct indexes.
735 #[test]
736 fn test_mapping() {
737 assert_eq!(core::Mapping::find_indices(16), (0, 1));
738 assert_eq!(core::Mapping::find_indices(64), (0, 4));
739 assert_eq!(core::Mapping::find_indices(128), (7, 0));
740
741 let (fl, sl) = core::Mapping::find_indices(1024);
742 assert_eq!(fl, 10);
743 assert_eq!(sl, 0);
744 }
745
746 /// Test if the bootstrap function actually initializes the matrix, and allocates the
747 /// blocks on the correct bitmaps.
748 #[test]
749 fn test_initial_bootstrap() {
750 let size = 1024 * 1024;
751 let handler = core::AtomicMatrix
752 ::bootstrap(Some(uuid::Uuid::new_v4()), size, (100, 0))
753 .unwrap();
754
755 let bitmap = handler.matrix.fl_bitmap.load(Ordering::Acquire);
756
757 assert!(bitmap != 0, "FL bitmap should not be zero after sectorization");
758 assert!(
759 (bitmap & ((1 << 19) | (1 << 18))) != 0,
760 "FL bitmap should have bits set for the expected sectors (19 and 18 for 512KB and 256KB"
761 );
762 }
763
    /// Test if the matrix can sectorize itself over a heap-backed fake SHM
    /// buffer (no real /dev/shm involvement).
    ///
    /// NOTE(review): `Vec<u8>` only guarantees 1-byte alignment, so casting
    /// `base_ptr.add(16)` to `*mut AtomicMatrix` may produce an under-aligned
    /// reference (UB). Consider an explicitly aligned buffer.
    #[test]
    fn test_initialization_integrity() {
        let mut fake_shm = vec![0u8; 1024 * 1024];
        let base_ptr = fake_shm.as_mut_ptr();

        unsafe {
            // Matrix struct lives 16 bytes in, after the init-guard area.
            let matrix_ptr = base_ptr.add(16) as *mut core::AtomicMatrix;
            let matrix = core::AtomicMatrix::init(
                matrix_ptr,
                uuid::Uuid::new_v4(),
                fake_shm.len() as u32
            );

            matrix.sectorize(base_ptr, 1024 * 1024, 10, 20).unwrap();

            // Sectorization must have published at least one free sector.
            assert!(matrix.fl_bitmap.load(Ordering::Relaxed) != 0);
        }
    }
783
    /// Test allocation and splitting logic by checking the resulting header.
    ///
    /// NOTE(review): the asserted size of 64 is the *block* size including the
    /// 32-byte header area (the payload pointer is offset + 32), so the
    /// caller's usable space is smaller than requested — confirm intended.
    #[test]
    fn test_allocation_and_spliting() {
        let size = 1024 * 1024;
        let mut handler = core::AtomicMatrix
            ::bootstrap(Some(uuid::Uuid::new_v4()), size, (100, 0))
            .unwrap();
        let base_ptr = handler.mmap.as_mut_ptr();
        let matrix = &mut *handler.matrix;

        unsafe {
            let rel_ptr = matrix.allocate(base_ptr, 64).unwrap();

            let header = rel_ptr.resolve_header(base_ptr);

            // A fresh 64-byte request splits the sector and leaves the block
            // sized exactly 64 and marked allocated.
            assert_eq!(header.size.load(Ordering::Acquire), 64);
            assert_eq!(header.state.load(Ordering::Acquire), STATE_ALLOCATED);
        }
    }
803
    /// Test coalesce logic: freed neighbours should merge into one block.
    ///
    /// Layout after the five allocations: [A][B][C][D][E], 64 bytes each.
    /// B is forced straight to FREE (bypassing `ack`), then C and D are
    /// acked, which triggers backward ripples into B.
    ///
    /// NOTE(review): the expected merged size of 256 exceeds the 192-byte
    /// physical span of B+C+D; it appears to count C twice because the first
    /// ripple leaves C's interior header (size 64, ACKED) inside the merged
    /// B block and the second ripple re-claims it. Confirm whether 256 is the
    /// intended invariant or an encoded bug.
    #[test]
    fn test_ack_and_coalesce() {
        let size = 1024 * 1024;
        let mut handler = core::AtomicMatrix
            ::bootstrap(Some(uuid::Uuid::new_v4()), size, (100, 0))
            .unwrap();
        let base_ptr = handler.mmap.as_mut_ptr();
        let matrix = &mut *handler.matrix;

        unsafe {
            let ptr_a = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_b = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_c = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_d = matrix.allocate(base_ptr, 64).unwrap();
            let ptr_e = matrix.allocate(base_ptr, 64).unwrap();

            let h_b = ptr_b.resolve_header(base_ptr);
            // Header-typed relative pointers for C and D (payload - 32).
            let rel_c = RelativePtr::<BlockHeader>::new(ptr_c.offset() - 32);
            let rel_d = RelativePtr::<BlockHeader>::new(ptr_d.offset() - 32);

            h_b.state.store(STATE_FREE, Ordering::Release);
            matrix.ack(&rel_c, base_ptr);
            matrix.ack(&rel_d, base_ptr);

            // Extra explicit ripple from D on top of the acks.
            matrix.coalesce(&rel_d, base_ptr);

            // A and E bound the merged region and must stay allocated.
            let h_a = ptr_a.resolve_header(base_ptr);
            assert_eq!(h_a.state.load(Ordering::Acquire), STATE_ALLOCATED);

            let h_merged = ptr_b.resolve_header(base_ptr);
            assert_eq!(h_merged.state.load(Ordering::Acquire), STATE_FREE);
            assert_eq!(h_merged.size.load(Ordering::Acquire), 256);

            let h_e = ptr_e.resolve_header(base_ptr);
            assert_eq!(h_e.state.load(Ordering::Acquire), STATE_ALLOCATED);
        }
    }
842
    /// ----------------------------------------------------------------------------
    /// STRESS TESTS
    ///
    /// These are excluded from the default correctness suite because they are
    /// heavy and long-running. Before shipping, running them explicitly is
    /// required.
    /// ----------------------------------------------------------------------------

    /// Run 800,000 allocations in parallel (8 threads x 100,000 each) to test
    /// that the matrix holds up without race conditions.
    #[test]
    #[ignore]
    fn test_multithreaded_stress() {
        use std::sync::{ Arc, Barrier };
        use std::thread;
        use std::collections::HashSet;

        // A 50MB matrix backs all the buffers.
        let size = 50 * 1024 * 1024;
        let handler = core::AtomicMatrix
            ::bootstrap(Some(uuid::Uuid::new_v4()), size, (40, 30))
            .unwrap();

        let thread_count = 8;
        // NOTE(review): misnomer — this is allocations *per thread*, not per
        // second.
        let allocs_per_second = 100_000;
        let barrier = Arc::new(Barrier::new(thread_count));

        // Track the failed allocs.
        let fail_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));

        let mut handles = vec![];

        // Spawn the workers; each hammers the allocator as fast as it can.
        for _ in 0..thread_count {
            let b = Arc::clone(&barrier);
            // Raw addresses are smuggled as usize so the closure is Send.
            let base_addr = handler.mmap.as_ptr() as usize;
            let matrix_addr = handler.matrix as *const core::AtomicMatrix as usize;
            let fail_count_clone = Arc::clone(&fail_count);

            handles.push(
                thread::spawn(move || {
                    let base_ptr = base_addr as *mut u8;
                    let matrix = unsafe { &*(matrix_addr as *const core::AtomicMatrix) };
                    let mut my_offsets = Vec::new();

                    // Line all threads up for a simultaneous start.
                    b.wait();

                    for _ in 0..allocs_per_second {
                        // Up to 10 attempts per allocation before giving up.
                        for _ in 0..10 {
                            if let Ok(rel_ptr) = matrix.allocate(base_ptr, 64) {
                                my_offsets.push(rel_ptr.offset());
                                break;
                            }
                            fail_count_clone.fetch_add(1, Ordering::Relaxed);
                            std::hint::spin_loop();
                        }
                    }

                    my_offsets
                })
            );
        }

        // Collect every offset handed out and check for duplicates.
        let mut all_offsets = Vec::new();
        for h in handles {
            all_offsets.extend(h.join().unwrap());
        }

        let total_obtained = all_offsets.len();
        let unique_offsets: HashSet<_> = all_offsets.into_iter().collect();

        // We allow a 0.5% failure margin, as this stress test does not account for deallocations.
        let success_percentage = ((thread_count * allocs_per_second) as f64) * 0.995;

        // Assert we obtain at least 99.5% of the expected allocations, with no
        // duplicated offsets (a duplicate would indicate a race condition).
        assert!(
            total_obtained >= (success_percentage as usize),
            "Total allocations should match expected count"
        );
        assert_eq!(
            unique_offsets.len(),
            total_obtained,
            "RACE CONDITION DETECTED: Duplicate offsets found"
        );
        println!(
            "Successfully allocated {} unique blocks across {} threads without collisions! {} allocations failed",
            total_obtained,
            thread_count,
            fail_count.load(Ordering::Relaxed)
        );
    }
939
940 /// Test if the matrix can hold 10 minutes of 8 threads executing random alloc
941 /// and dealloc operations to ensure the Propagation Principle of Atomic
942 /// Coalescence works.
943 #[test]
944 #[ignore]
945 fn test_long_term_fragmentation_healing() {
946 use std::sync::{ Arc, Barrier };
947 use std::thread;
948 use std::time::{ Instant, Duration };
949
950 const DURATION: u32 = 600;
951 const THREADS: u32 = 8;
952
953 let size = 50 * 1024 * 1024;
954 let handler = core::AtomicMatrix
955 ::bootstrap(Some(uuid::Uuid::new_v4()), size, (40, 30))
956 .unwrap();
957 let handler_arc = Arc::new(handler);
958 let barrier = Arc::new(Barrier::new(THREADS as usize));
959 let start_time = Instant::now();
960 let duration = Duration::from_secs(DURATION as u64);
961
962 let mut handles = vec![];
963
964 for t_id in 0..THREADS {
965 let h = Arc::clone(&handler_arc);
966 let b = Arc::clone(&barrier);
967
968 handles.push(
969 thread::spawn(move || {
970 let base_ptr = h.mmap.as_ptr() as *mut u8;
971 let matrix = &h.matrix;
972 let mut my_blocks = Vec::new();
973 let mut rng = t_id + 1;
974 let mut total_ops = 0u64;
975
976 b.wait();
977
978 while start_time.elapsed() < duration {
979 rng = rng.wrapping_mul(1103515245).wrapping_add(12345);
980
981 if rng % 10 < 7 && my_blocks.len() < 200 {
982 let alloc_size = (rng % 512) + 32;
983 if let Ok(ptr) = matrix.allocate(base_ptr, alloc_size as u32) {
984 my_blocks.push(ptr);
985 }
986 } else if !my_blocks.is_empty() {
987 let idx = (rng as usize) % my_blocks.len();
988 let ptr = my_blocks.swap_remove(idx);
989
990 let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - 32);
991 matrix.ack(&header_ptr, base_ptr);
992
993 if total_ops % 5 == 0 {
994 matrix.coalesce(&header_ptr, base_ptr);
995 }
996 }
997 total_ops += 1;
998 }
999 (my_blocks, total_ops)
1000 })
1001 );
1002 }
1003
1004 let mut total_work = 0u64;
1005 for h in handles {
1006 let (remaining, thread_ops) = h.join().unwrap();
1007 total_work += thread_ops;
1008 let base_ptr = handler_arc.mmap.as_ptr() as *mut u8;
1009
1010 for ptr in remaining {
1011 let header_ptr = RelativePtr::<BlockHeader>::new(ptr.offset() - 32);
1012 handler_arc.matrix.ack(&header_ptr, base_ptr);
1013 handler_arc.matrix.coalesce(&header_ptr, base_ptr);
1014 }
1015 }
1016
1017 let mut free_count = 0;
1018 for fl in 0..32 {
1019 for sl in 0..8 {
1020 if handler_arc.matrix.matrix[fl][sl].load(Ordering::Acquire) != 0 {
1021 free_count += 1;
1022 }
1023 }
1024 }
1025
1026 let entrophy_percentage = ((free_count as f64) / (total_work as f64)) * 100.0;
1027 let mops = (total_work as f64) / (DURATION as f64) / 1_000_000.0;
1028
1029 println!("Endurance Results (Duration of {} seconds)", DURATION);
1030 println!("Num of threads: {}", THREADS);
1031 println!("Total operations: {}", total_work);
1032 println!("Throughput: {:.2} Mop/s", (total_work as f64) / (DURATION as f64) / 1_000_000.0);
1033 println!("Final free fragments: {}", free_count);
1034 println!("Entrophy percentage: {}%", entrophy_percentage);
1035
1036 assert!(entrophy_percentage < 0.001, "Excessive Fragmentation");
1037 assert!(mops > 1.0, "Throughput regression: {:.2} Mop/s", mops);
1038 }
1039}