limen_core/memory/concurrent_manager.rs
//! Concurrent heap-backed memory manager with lock-free freelist.
//!
//! This module implements a production-ready concurrent memory manager that
//! satisfies the C1a requirements:
//!
//! - Per-slot locking: reads/writes only lock the targeted slot using an
//!   `RwLock`, giving fine-grained isolation and good concurrency for reads.
//! - No whole-manager lock: allocation and free are lock-free using a Treiber
//!   stack freelist implemented with atomics + a 32-bit tag to avoid ABA.
//! - Heap-backed capacity: the manager takes a `capacity` at construction and
//!   stores slots/next pointers in `Vec`s, so the capacity is chosen at runtime
//!   wherever `alloc` is available.
//! - Ergonomic concurrent API: `store_shared`, `read_shared`, etc. take `&self`
//!   and are safe for concurrent use; trait methods required by
//!   `MemoryManager` delegate to these shared methods.
//!
//! Rationale
//! ---------
//! - A Treiber stack with a tagged head (index + u32 tag packed into `u64`)
//!   gives a simple, fast lock-free freelist and mitigates ABA by advancing
//!   the tag on each successful CAS. The tag wraps naturally and safely.
//! - Per-slot `RwLock` ensures that reads are concurrent and writes are
//!   exclusive on a per-slot basis. Combined with the lock-free freelist this
//!   means no global lock is held during steady-state reads and writes.
//! - `available_count` is an `AtomicUsize` for cheap diagnostics without
//!   locking and is kept consistent with successful freelist operations.
//!
//! Safety notes
//! ------------
//! - `free_shared` uses a write lock on the slot, which waits for existing
//!   readers to finish. This ensures correctness: freeing a slot cannot race
//!   with readers that still see the message as allocated, because readers
//!   hold the read lock while accessing `message`.
//! - `pop_free` / `push_free` are lock-free loops using atomic CAS on the
//!   tagged head; they may spin under heavy contention. Backoff/yielding is
//!   used to avoid burning a single core indefinitely.
//!
//! Performance notes
//! -----------------
//! - Reads are highly concurrent and cheap (single `RwLock::read` + access).
//! - Writes are exclusive per slot and only block readers of the same slot.
//! - Allocation/free operations are lock-free and fast; the only blocking
//!   synchronization remaining is the per-slot `RwLock` for the actual
//!   store/free.
//!
//! Tests in this module exercise correctness under concurrent usage and
//! allocation reuse.
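//!
//! Example
//! -------
//! A minimal usage sketch (illustrative only; it uses the test tensor helpers
//! from `crate::prelude`, so it is not compiled as a doctest):
//!
//! ```ignore
//! use std::sync::Arc;
//! use std::thread;
//!
//! let mgr = Arc::new(ConcurrentMemoryManager::<TestTensor>::new(4));
//! let msg = Message::new(MessageHeader::empty(), create_test_tensor_filled_with(7));
//! let token = mgr.store_shared(msg).unwrap();
//!
//! let m = Arc::clone(&mgr);
//! let reader = thread::spawn(move || {
//!     // Any number of threads may hold read guards on the same slot.
//!     let guard = m.read_shared(token).unwrap();
//!     assert_eq!(*guard.payload(), create_test_tensor_filled_with(7));
//! });
//! reader.join().unwrap();
//!
//! // Freeing returns the slot to the lock-free freelist for reuse.
//! mgr.free_shared(token).unwrap();
//! assert_eq!(mgr.available(), 4);
//! ```
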
use core::ops::{Deref, DerefMut};
use std::sync::{
    atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering},
    Arc, RwLock, RwLockReadGuard, RwLockWriteGuard,
};

use crate::errors::MemoryError;
use crate::memory::header_store::HeaderStore;
use crate::memory::manager::MemoryManager;
use crate::memory::MemoryClass;
use crate::message::payload::Payload;
use crate::message::{Message, MessageHeader};
use crate::prelude::ScopedManager;
use crate::types::MessageToken;

/// Special value meaning "no index" for the freelist next pointer and head.
const EMPTY_INDEX: u32 = u32::MAX;

/// Pack helper for the freelist head: upper 32 bits contain a tag (u32),
/// lower 32 bits contain the head index (u32). The tagged head prevents ABA
/// by changing the tag on each successful CAS update.
#[inline]
fn pack_head(tag: u32, idx: u32) -> u64 {
    ((tag as u64) << 32) | (idx as u64)
}

/// Unpack helper returning (tag, index).
#[inline]
fn unpack_head(v: u64) -> (u32, u32) {
    ((v >> 32) as u32, (v & 0xffff_ffff) as u32)
}

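// A small sanity check of the packing layout, added for illustration: the tag
// occupies the high 32 bits and the index the low 32 bits, exactly as the
// helpers above describe.
#[cfg(test)]
#[test]
fn pack_unpack_roundtrip() {
    assert_eq!(pack_head(3, 5), 0x0000_0003_0000_0005);
    assert_eq!(unpack_head(pack_head(3, 5)), (3, 5));
    assert_eq!(unpack_head(pack_head(0, EMPTY_INDEX)).1, EMPTY_INDEX);
}
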
/// The mutable per-slot payload state protected by an `RwLock`.
///
/// The `message` field is `Option<Message<P>>`. While it is `Some`, the slot
/// is considered allocated and its contents must only be accessed by readers
/// and writers while the appropriate locks are held.
struct ConcurrentSlotState<P: Payload> {
    message: Option<Message<P>>,
}

impl<P: Payload> ConcurrentSlotState<P> {
    /// Construct an empty slot state (unallocated).
    fn new() -> Self {
        Self { message: None }
    }
}

/// A slot combines the `RwLock` with its `ConcurrentSlotState`.
///
/// We keep a `ConcurrentSlot` abstraction so the shared vector type is clear
/// and we can initialize slots uniformly.
struct ConcurrentSlot<P: Payload> {
    state: RwLock<ConcurrentSlotState<P>>,
}

impl<P: Payload> ConcurrentSlot<P> {
    /// Construct a new slot with an empty state.
    fn new() -> Self {
        Self {
            state: RwLock::new(ConcurrentSlotState::new()),
        }
    }
}

/// The shared, heap-backed state of the concurrent manager.
///
/// This struct is reference-counted by `Arc` inside `ConcurrentMemoryManager` so
/// multiple handles can be cloned cheaply and share the underlying storage.
struct ConcurrentMemoryManagerShared<P: Payload> {
    /// The per-slot RwLocks and states.
    slots: Vec<ConcurrentSlot<P>>,

    /// Per-slot "next" pointers used by the Treiber freelist. `next_free[i]`
    /// stores the index of the successor of `i` in the freelist, or
    /// `EMPTY_INDEX` for none.
    next_free: Vec<AtomicU32>,

    /// The freelist head is a packed (tag, index) in an AtomicU64. Lower 32
    /// bits: index of head or EMPTY_INDEX; upper 32 bits: tag counter.
    free_head: AtomicU64,

    /// Number of free slots, updated on successful push/pop. Used for quick
    /// diagnostics and admission control. Relaxed loads are acceptable.
    available_count: AtomicUsize,

    /// Memory class metadata for diagnostics / policy.
    mem_class: MemoryClass,
}

impl<P: Payload> ConcurrentMemoryManagerShared<P> {
    /// Create a new shared state object with `capacity` slots and initial
    /// freelist linking 0 -> 1 -> 2 -> ... -> capacity-1 -> EMPTY.
    ///
    /// The `mem_class` parameter describes where the memory logically resides.
    fn new(mem_class: MemoryClass, capacity: usize) -> Self {
        assert!(capacity <= u32::MAX as usize);
        // Initialize slots vector with RwLocks protecting empty states.
        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            slots.push(ConcurrentSlot::new());
        }

        // Initialize the per-slot next_free pointers to form a singly-linked
        // list of free indices. This is the initial freelist state.
        let mut next_free = Vec::with_capacity(capacity);
        for i in 0..capacity {
            let next = if i + 1 < capacity {
                (i + 1) as u32
            } else {
                EMPTY_INDEX
            };
            next_free.push(AtomicU32::new(next));
        }

        // Head initially points to index 0 with tag 0 (unless capacity == 0).
        let head_index = if capacity > 0 { 0 } else { EMPTY_INDEX };
        let free_head = AtomicU64::new(pack_head(0, head_index));
        let available_count = AtomicUsize::new(capacity);

        Self {
            slots,
            next_free,
            free_head,
            available_count,
            mem_class,
        }
    }

    // ---------------------------------------------------------------------
    // Lock-free freelist operations (Treiber stack)
    // ---------------------------------------------------------------------

    /// Pop (allocate) an index from the freelist in a lock-free manner.
    ///
    /// Returns `Some(index)` on success, or `None` if the freelist is empty.
    /// On success the `available_count` is decremented. The caller must then
    /// acquire the slot's write lock before storing into the slot.
    ///
    /// Correctness notes:
    /// - This is a classic Treiber stack pop with a packed (tag, index) head to
    ///   avoid ABA. The tag is incremented on every successful CAS.
    /// - We use `Ordering::Acquire` for loads and `AcqRel` for the successful
    ///   CAS to ensure proper memory ordering with respect to `next_free`.
    fn pop_free(&self) -> Option<usize> {
        let mut spins = 0u32;
        loop {
            // Load the packed head (Acquire).
            let head = self.free_head.load(Ordering::Acquire);
            let (tag, idx) = unpack_head(head);

            // Empty?
            if idx == EMPTY_INDEX {
                return None;
            }

            // Read the successor index of the head. Acquire to synchronize.
            let next = self.next_free[idx as usize].load(Ordering::Acquire);

            // Prepare a new head with incremented tag and successor index.
            let new_tag = tag.wrapping_add(1);
            let new_head = pack_head(new_tag, next);

            // Attempt CAS to move head -> successor.
            if self
                .free_head
                .compare_exchange(head, new_head, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // Successful pop: update diagnostics and return index.
                self.available_count.fetch_sub(1, Ordering::AcqRel);
                return Some(idx as usize);
            }

            // CAS failed due to concurrent updates. Backoff occasionally.
            spins = spins.wrapping_add(1);
            if spins & 0xFF == 0 {
                std::thread::yield_now();
            }
        }
    }

    /// Push (free) an index back onto the freelist in a lock-free manner.
    ///
    /// On success the `available_count` is incremented. The caller must ensure
    /// the slot is already cleared (message removed) before pushing it back.
    ///
    /// Correctness notes:
    /// - We store the current head into `next_free[idx]` (Release) and then try
    ///   to CAS the head to `(tag+1, idx)`. A successful CAS inserts `idx`
    ///   as the new head. We increment the available counter only on success.
    fn push_free(&self, idx: usize) {
        let mut spins = 0u32;
        loop {
            // Load the packed head (Acquire).
            let head = self.free_head.load(Ordering::Acquire);
            let (tag, head_idx) = unpack_head(head);

            // Publish the current head as the successor of the index being pushed.
            // Release ordering ensures the successor becomes visible before the CAS.
            self.next_free[idx].store(head_idx, Ordering::Release);

            // New head has incremented tag and index == idx.
            let new_tag = tag.wrapping_add(1);
            let new_head = pack_head(new_tag, idx as u32);

            if self
                .free_head
                .compare_exchange(head, new_head, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // Successful push: update diagnostics and return.
                self.available_count.fetch_add(1, Ordering::AcqRel);
                return;
            }

            // CAS failed; retry with occasional yield.
            spins = spins.wrapping_add(1);
            if spins & 0xFF == 0 {
                std::thread::yield_now();
            }
        }
    }
}

/// Public concurrent manager handle.
///
/// This is cheap to clone (clones the `Arc`), so multiple threads can share
/// a `ConcurrentMemoryManager` instance and call the `*_shared(&self)`
/// methods concurrently.
pub struct ConcurrentMemoryManager<P: Payload> {
    shared: Arc<ConcurrentMemoryManagerShared<P>>,
}

impl<P: Payload> Clone for ConcurrentMemoryManager<P> {
    fn clone(&self) -> Self {
        Self {
            shared: Arc::clone(&self.shared),
        }
    }
}

impl<P: Payload> ConcurrentMemoryManager<P> {
    // ---------------------------------------------------------------------
    // Construction
    // ---------------------------------------------------------------------

    /// Create a new manager with the given `capacity`. The capacity is fixed
    /// at construction; small-to-moderate values (roughly 4..1024) perform best.
    ///
    /// The manager defaults to `MemoryClass::Host`.
    pub fn new(capacity: usize) -> Self {
        Self::with_memory_class(capacity, MemoryClass::Host)
    }

    /// Create a new manager with explicit `mem_class` metadata.
    ///
    /// `mem_class` is useful for telemetry and for future routing decisions
    /// when different storage classes exist.
    pub fn with_memory_class(capacity: usize, mem_class: MemoryClass) -> Self {
        let shared = ConcurrentMemoryManagerShared::new(mem_class, capacity);
        Self {
            shared: Arc::new(shared),
        }
    }

    // ---------------------------------------------------------------------
    // Ergonomic shared (&self) API for concurrent use
    // ---------------------------------------------------------------------
    //
    // These methods all take `&self` so multiple threads can call them on the
    // same manager handle. The trait `MemoryManager` requires `&mut self`,
    // so we implement the trait by delegating to these methods below.

    /// Store a `Message<P>` and return its `MessageToken`.
    ///
    /// This method:
    /// - Pops a free index from the freelist (lock-free).
    /// - Acquires the slot write lock and sets `message = Some(value)`.
    ///
    /// Returns `MemoryError::NoFreeSlots` if the freelist is empty.
    ///
    /// Concurrency: multiple threads may concurrently allocate and store into
    /// different slots. Only the chosen slot is write-locked during the store.
    pub fn store_shared(&self, value: Message<P>) -> Result<MessageToken, MemoryError> {
        // Obtain a free slot index from the lock-free freelist.
        let idx = match self.shared.pop_free() {
            None => return Err(MemoryError::NoFreeSlots),
            Some(i) => i,
        };

        // Acquire the slot's write lock (this is fine: this lock only affects
        // the single slot) and store the message.
        let slot = &self.shared.slots[idx];
        let mut guard = slot.state.write().map_err(|_| MemoryError::Poisoned)?;
        guard.message = Some(value);
        drop(guard);

        Ok(MessageToken::new(idx as u32))
    }

    /// Borrow a stored message immutably, returning a guard that keeps the
    /// slot read-locked for the lifetime of the guard.
    ///
    /// Returns:
    /// - `BadToken` if the token index is out of range,
    /// - `NotAllocated` if the slot is currently empty,
    /// - `Poisoned` if the slot lock is poisoned.
    ///
    /// Concurrency: read locks allow many concurrent readers for the same slot.
    pub fn read_shared(
        &self,
        token: MessageToken,
    ) -> Result<ConcurrentReadGuard<'_, P>, MemoryError> {
        let idx = token.index();
        if idx >= self.shared.slots.len() {
            return Err(MemoryError::BadToken);
        }
        let slot = &self.shared.slots[idx];

        // Acquire read lock and validate allocation.
        let guard = slot.state.read().map_err(|_| MemoryError::Poisoned)?;
        if guard.message.is_none() {
            return Err(MemoryError::NotAllocated);
        }

        Ok(ConcurrentReadGuard { guard })
    }

    /// Borrow a stored message mutably, returning a guard that keeps the slot
    /// write-locked for the lifetime of the guard.
    ///
    /// This enforces exclusive access to the stored message while the guard is live.
    pub fn read_mut_shared(
        &self,
        token: MessageToken,
    ) -> Result<ConcurrentWriteGuard<'_, P>, MemoryError> {
        let idx = token.index();
        if idx >= self.shared.slots.len() {
            return Err(MemoryError::BadToken);
        }
        let slot = &self.shared.slots[idx];

        // Acquire the write lock, which excludes readers/writers on the slot.
        let guard = slot.state.write().map_err(|_| MemoryError::Poisoned)?;
        if guard.message.is_none() {
            return Err(MemoryError::NotAllocated);
        }
        Ok(ConcurrentWriteGuard { guard })
    }

    /// Free a previously allocated token/slot.
    ///
    /// Behavior:
    /// - Acquire the slot write lock (this will wait for readers/writers).
    /// - Ensure the slot is allocated; set `message = None`.
    /// - Push the index back onto the lock-free freelist.
    ///
    /// Returns `NotAllocated` if the slot is already empty, or `BadToken` if
    /// the token is invalid.
    pub fn free_shared(&self, token: MessageToken) -> Result<(), MemoryError> {
        let idx = token.index();
        if idx >= self.shared.slots.len() {
            return Err(MemoryError::BadToken);
        }
        let slot = &self.shared.slots[idx];

        // Acquire write lock to ensure exclusivity for clearing the slot.
        let mut guard = slot.state.write().map_err(|_| MemoryError::Poisoned)?;
        if guard.message.is_none() {
            return Err(MemoryError::NotAllocated);
        }
        guard.message = None;
        drop(guard);

        // Return the index to the freelist lock-free.
        self.shared.push_free(idx);
        Ok(())
    }

    /// Return the approximate number of free slots. This is updated atomically
    /// on each successful freelist push/pop. The value is diagnostic and may be
    /// slightly stale if concurrent operations are in flight.
    pub fn available(&self) -> usize {
        self.shared.available_count.load(Ordering::Relaxed)
    }

    /// Return the configured capacity (number of slots).
    pub fn capacity(&self) -> usize {
        self.shared.slots.len()
    }

    /// Return the memory class attached to this manager.
    pub fn memory_class(&self) -> MemoryClass {
        self.shared.mem_class
    }
}

// ---------------------------------------------------------------------
// Guard wrappers that hold the slot locks while the guard is live.
// ---------------------------------------------------------------------

/// Header guard: holds a read lock on the slot and derefs to `MessageHeader`.
pub struct ConcurrentHeaderGuard<'a, P: Payload> {
    guard: RwLockReadGuard<'a, ConcurrentSlotState<P>>,
}

impl<'a, P: Payload> Deref for ConcurrentHeaderGuard<'a, P> {
    type Target = MessageHeader;
    fn deref(&self) -> &Self::Target {
        self.guard
            .message
            .as_ref()
            .expect("header guard constructed only when Some")
            .header()
    }
}

/// Read guard: holds a read lock and dereferences to `Message<P>`.
pub struct ConcurrentReadGuard<'a, P: Payload> {
    guard: RwLockReadGuard<'a, ConcurrentSlotState<P>>,
}

impl<'a, P: Payload> Deref for ConcurrentReadGuard<'a, P> {
    type Target = Message<P>;
    fn deref(&self) -> &Self::Target {
        self.guard
            .message
            .as_ref()
            .expect("read guard constructed only when Some")
    }
}

/// Write guard: holds a write lock and dereferences (mutably) to `Message<P>`.
pub struct ConcurrentWriteGuard<'a, P: Payload> {
    guard: RwLockWriteGuard<'a, ConcurrentSlotState<P>>,
}

impl<'a, P: Payload> Deref for ConcurrentWriteGuard<'a, P> {
    type Target = Message<P>;
    fn deref(&self) -> &Self::Target {
        self.guard
            .message
            .as_ref()
            .expect("write guard constructed only when Some")
    }
}

impl<'a, P: Payload> DerefMut for ConcurrentWriteGuard<'a, P> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.guard
            .message
            .as_mut()
            .expect("write guard constructed only when Some")
    }
}

// ---------------------------------------------------------------------
// Trait implementations
// ---------------------------------------------------------------------

impl<P: Payload> HeaderStore for ConcurrentMemoryManager<P> {
    type HeaderGuard<'a>
        = ConcurrentHeaderGuard<'a, P>
    where
        Self: 'a;

    /// Peek the header of the stored message identified by `token`.
    ///
    /// This acquires a slot read lock and returns a guard that keeps the lock
    /// for the lifetime of the returned guard.
    fn peek_header(&self, token: MessageToken) -> Result<Self::HeaderGuard<'_>, MemoryError> {
        let idx = token.index();
        if idx >= self.shared.slots.len() {
            return Err(MemoryError::BadToken);
        }
        let slot = &self.shared.slots[idx];

        // Acquire read lock and validate that the slot is allocated.
        let guard = slot.state.read().map_err(|_| MemoryError::Poisoned)?;
        if guard.message.is_none() {
            return Err(MemoryError::NotAllocated);
        }

        Ok(ConcurrentHeaderGuard { guard })
    }
}

impl<P: Payload> MemoryManager<P> for ConcurrentMemoryManager<P> {
    type ReadGuard<'a>
        = ConcurrentReadGuard<'a, P>
    where
        Self: 'a;

    type WriteGuard<'a>
        = ConcurrentWriteGuard<'a, P>
    where
        Self: 'a;

    // The trait requires `&mut self` for store/read_mut/free. For the concurrent
    // manager we delegate those trait methods to the `&self` shared methods so
    // callers can use either the trait (with a mutable handle) or the shared
    // API for concurrency.
    fn store(&mut self, value: Message<P>) -> Result<MessageToken, MemoryError> {
        // Delegate to the &self method: safe since we use internal Arc-shared state.
        self.store_shared(value)
    }

    fn read(&self, token: MessageToken) -> Result<Self::ReadGuard<'_>, MemoryError> {
        self.read_shared(token)
    }

    fn read_mut(&mut self, token: MessageToken) -> Result<Self::WriteGuard<'_>, MemoryError> {
        self.read_mut_shared(token)
    }

    fn free(&mut self, token: MessageToken) -> Result<(), MemoryError> {
        self.free_shared(token)
    }

    fn available(&self) -> usize {
        self.available()
    }

    fn capacity(&self) -> usize {
        self.capacity()
    }

    fn memory_class(&self) -> MemoryClass {
        self.memory_class()
    }
}

impl<P: Payload + Send + Sync> ScopedManager<P> for ConcurrentMemoryManager<P> {
    type Handle<'a>
        = ConcurrentMemoryManager<P>
    where
        Self: 'a;

    fn scoped_handle<'a>(&'a self) -> Self::Handle<'a>
    where
        Self: 'a,
    {
        self.clone()
    }
}

// ---------------------------------------------------------------------
// Tests (std only)
// ---------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::MessageHeader;
    use crate::prelude::{create_test_tensor_filled_with, TestTensor, TEST_TENSOR_BYTE_COUNT};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    // Helper: build a simple Message<TestTensor>.
    fn make_msg(val: u32) -> Message<TestTensor> {
        Message::new(MessageHeader::empty(), create_test_tensor_filled_with(val))
    }

    // --- Concurrency-oriented tests ------------------------------------------------

    #[test]
    fn basic_store_read_free() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);

        let t = mgr.store_shared(make_msg(10)).unwrap();
        assert_eq!(mgr.available(), 3);

        {
            let g = mgr.read_shared(t).unwrap();
            assert_eq!(*g.payload(), create_test_tensor_filled_with(10));
        }

        mgr.free_shared(t).unwrap();
        assert_eq!(mgr.available(), 4);
    }

    #[test]
    fn concurrent_reads_same_slot() {
        let mgr = Arc::new(ConcurrentMemoryManager::<TestTensor>::new(4));
        let t = mgr.store_shared(make_msg(5)).unwrap();

        let m1 = mgr.clone();
        let th1 = thread::spawn(move || {
            let g = m1.read_shared(t).unwrap();
            assert_eq!(*g.payload(), create_test_tensor_filled_with(5));
        });

        let m2 = mgr.clone();
        let th2 = thread::spawn(move || {
            let g = m2.read_shared(t).unwrap();
            assert_eq!(*g.payload(), create_test_tensor_filled_with(5));
        });

        th1.join().unwrap();
        th2.join().unwrap();

        mgr.free_shared(t).unwrap();
    }

    #[test]
    fn write_excludes_read() {
        use std::sync::Barrier;

        let mgr = Arc::new(ConcurrentMemoryManager::<TestTensor>::new(4));
        let t = mgr.store_shared(make_msg(7)).unwrap();

        // Barrier: writer signals after acquiring the lock, reader waits before reading.
        let barrier = Arc::new(Barrier::new(2));

        let mwriter = mgr.clone();
        let bwriter = barrier.clone();
        let writer = thread::spawn(move || {
            let mut w = mwriter.read_mut_shared(t).unwrap();
            *w.payload_mut() = create_test_tensor_filled_with(42);
            // Signal: lock is held and value is written.
            bwriter.wait();
            // Hold the lock until the reader has had a chance to block on it.
            std::thread::sleep(Duration::from_millis(50));
        });

        // Wait until the writer confirms it holds the lock with the value written.
        barrier.wait();

        // Now read_shared must block until the writer releases; when it returns, the value is 42.
        let g = mgr.read_shared(t).unwrap();
        assert_eq!(*g.payload(), create_test_tensor_filled_with(42));

        writer.join().unwrap();
    }

    #[test]
    fn allocate_exhaustion_and_reuse() {
        let mgr = ConcurrentMemoryManager::<TestTensor>::new(2);
        let t0 = mgr.store_shared(make_msg(1)).unwrap();
        let t1 = mgr.store_shared(make_msg(2)).unwrap();
        assert_eq!(mgr.available(), 0);
        assert!(matches!(
            mgr.store_shared(make_msg(3)),
            Err(MemoryError::NoFreeSlots)
        ));

        mgr.free_shared(t0).unwrap();
        assert_eq!(mgr.available(), 1);

        // t0 slot reused
        let t2 = mgr.store_shared(make_msg(4)).unwrap();
        assert_eq!(t2.index(), t0.index());
        mgr.free_shared(t1).unwrap();
        mgr.free_shared(t2).unwrap();
    }

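    // Additional illustrative stress test (an added sketch using the same
    // helpers as the tests above): several threads repeatedly allocate, read
    // back, and free slots, exercising the lock-free freelist under
    // contention. When all threads have joined, every slot must be free again.
    #[test]
    fn concurrent_alloc_free_stress() {
        let capacity = 8;
        let mgr = Arc::new(ConcurrentMemoryManager::<TestTensor>::new(capacity));

        let mut handles = Vec::new();
        for thread_id in 0..4u32 {
            let m = mgr.clone();
            handles.push(thread::spawn(move || {
                for i in 0..200u32 {
                    let val = thread_id * 1_000 + i;
                    // Allocation can transiently fail if all slots happen to be
                    // taken; retry until a slot becomes available.
                    loop {
                        match m.store_shared(make_msg(val)) {
                            Ok(token) => {
                                {
                                    let g = m.read_shared(token).unwrap();
                                    assert_eq!(*g.payload(), create_test_tensor_filled_with(val));
                                }
                                m.free_shared(token).unwrap();
                                break;
                            }
                            Err(MemoryError::NoFreeSlots) => thread::yield_now(),
                            Err(other) => panic!("unexpected error: {:?}", other),
                        }
                    }
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // All slots should be back on the freelist.
        assert_eq!(mgr.available(), capacity);
    }
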
    // --- Parity tests mirroring StaticMemoryManager contract -----------------------

    #[test]
    fn store_read_free_cycle() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        assert_eq!(mgr.available(), 4);
        assert_eq!(mgr.capacity(), 4);

        let token = mgr.store_shared(make_msg(42)).unwrap();
        assert_eq!(mgr.available(), 3);

        {
            let msg = mgr.read_shared(token).unwrap();
            assert_eq!(*msg.payload(), create_test_tensor_filled_with(42));
        }

        mgr.free_shared(token).unwrap();
        assert_eq!(mgr.available(), 4);
    }

    #[test]
    fn read_mut_works() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        let token = mgr.store_shared(make_msg(10)).unwrap();

        {
            // Mutable borrow; the binding must be declared `mut`.
            let mut msg = mgr.read_mut_shared(token).unwrap();
            *msg.payload_mut() = create_test_tensor_filled_with(99);
            // mutable guard dropped here
        }

        {
            // Now we can take an immutable borrow safely.
            let msg = mgr.read_shared(token).unwrap();
            assert_eq!(*msg.payload(), create_test_tensor_filled_with(99));
        }

        // Free now that no borrows exist.
        mgr.free_shared(token).unwrap();
    }

    #[test]
    fn peek_header_works() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        let token = mgr.store_shared(make_msg(7)).unwrap();

        {
            let header = mgr.peek_header(token).unwrap();
            assert_eq!(*header.payload_size_bytes(), TEST_TENSOR_BYTE_COUNT);
            // header dropped at end of scope
        }

        mgr.free_shared(token).unwrap();
    }

    #[test]
    fn capacity_exhaustion() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(2);
        let _t0 = mgr.store_shared(make_msg(1)).unwrap();
        let _t1 = mgr.store_shared(make_msg(2)).unwrap();
        assert_eq!(mgr.available(), 0);

        let err = mgr.store_shared(make_msg(3));
        assert_eq!(err, Err(MemoryError::NoFreeSlots));
    }

    #[test]
    fn double_free_detected() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        let token = mgr.store_shared(make_msg(1)).unwrap();
        mgr.free_shared(token).unwrap();

        let err = mgr.free_shared(token);
        assert_eq!(err, Err(MemoryError::NotAllocated));
    }

    #[test]
    fn bad_token_detected() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        let bad = MessageToken::new(99);

        assert!(matches!(mgr.read_shared(bad), Err(MemoryError::BadToken)));
        assert!(matches!(mgr.peek_header(bad), Err(MemoryError::BadToken)));
    }

    #[test]
    fn read_freed_slot_is_not_allocated() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        let token = mgr.store_shared(make_msg(1)).unwrap();
        mgr.free_shared(token).unwrap();

        assert!(matches!(
            mgr.read_shared(token),
            Err(MemoryError::NotAllocated)
        ));
        assert!(matches!(
            mgr.peek_header(token),
            Err(MemoryError::NotAllocated)
        ));
    }

    #[test]
    fn slot_reuse_after_free() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(1);
        let t0 = mgr.store_shared(make_msg(10)).unwrap();
        mgr.free_shared(t0).unwrap();

        // Slot 0 should be reused.
        let t1 = mgr.store_shared(make_msg(20)).unwrap();
        assert_eq!(t1.index(), 0);
        assert_eq!(
            *mgr.read_shared(t1).unwrap().payload(),
            create_test_tensor_filled_with(20)
        );
    }

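    // Illustrative smoke test for the `ScopedManager` impl above (an added
    // sketch): a scoped handle is just a cheap clone sharing the same slots,
    // so data stored through the original manager is visible via the handle.
    #[test]
    fn scoped_handle_shares_state() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(2);
        let token = mgr.store_shared(make_msg(3)).unwrap();

        let handle = mgr.scoped_handle();
        assert_eq!(handle.capacity(), 2);
        assert_eq!(
            *handle.read_shared(token).unwrap().payload(),
            create_test_tensor_filled_with(3)
        );

        handle.free_shared(token).unwrap();
        assert_eq!(mgr.available(), 2);
    }
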
    #[test]
    fn memory_class_configurable() {
        let mgr: ConcurrentMemoryManager<TestTensor> =
            ConcurrentMemoryManager::with_memory_class(4, MemoryClass::Device(0));
        assert_eq!(mgr.memory_class(), MemoryClass::Device(0));
    }

    #[test]
    fn default_memory_class_is_host() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        assert_eq!(mgr.memory_class(), MemoryClass::Host);
    }
}
833}