
limen_core/memory/concurrent_manager.rs

//! Concurrent heap-backed memory manager with lock-free freelist.
//!
//! This module implements a production-ready concurrent memory manager that
//! satisfies the C1a requirements:
//!
//! - Per-slot locking: reads/writes only lock the targeted slot using an
//!   `RwLock`, giving fine-grained isolation and good concurrency for reads.
//! - No whole-manager lock: allocation and free are lock-free using a Treiber
//!   stack free-list implemented with atomics + a 32-bit tag to avoid ABA.
//! - Heap-backed capacity: the manager takes a `capacity` at construction and
//!   stores slots/next pointers in `Vec`s, so the capacity is chosen at runtime
//!   wherever heap allocation (`alloc`) is available.
//! - Ergonomic concurrent API: `store_shared`, `read_shared`, etc. take `&self`
//!   and are safe for concurrent use; trait methods required by
//!   `MemoryManager` delegate to these shared methods.
//!
//! Rationale
//! ---------
//! - A Treiber stack with a tagged head (index + u32 tag packed into `u64`)
//!   gives a simple, fast lock-free freelist and mitigates ABA by advancing
//!   the tag on each successful CAS. The tag wraps naturally and safely.
//! - Per-slot `RwLock` ensures that reads are concurrent and writes are
//!   exclusive on a per-slot basis. Combined with the lock-free freelist this
//!   means no global lock is held during steady-state reads and writes.
//! - `available_count` is an `AtomicUsize` for cheap diagnostics without
//!   locking and is kept consistent with successful freelist operations.
//!
//! Safety notes
//! ------------
//! - `free_shared` takes the slot's write lock, which waits for existing
//!   readers to finish. This ensures correctness: a slot cannot be freed out
//!   from under an in-flight reader, because readers hold the slot's read lock
//!   for as long as they access `message`.
//! - `pop_free` / `push_free` are lock-free loops using atomic CAS on the
//!   tagged head; they may spin under heavy contention. Backoff/yielding is
//!   used to avoid burning a single core indefinitely.
//!
//! Performance notes
//! -----------------
//! - Reads are highly concurrent and cheap (single `RwLock::read` + access).
//! - Writes are exclusive per-slot and only block readers of the same slot.
//! - Allocation/free operations are lock-free and fast; the only blocking
//!   synchronization remaining is the per-slot `RwLock` for the actual
//!   store/free.
//!
//! Tests in this module exercise correctness under concurrent usage and
//! allocation reuse.
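//!
//! Example
//! -------
//! A minimal usage sketch (illustrative only; `MyPayload` and `build_message`
//! are placeholders for a concrete `Payload` type and message constructor, not
//! items defined in this crate):
//!
//! ```ignore
//! let mgr: ConcurrentMemoryManager<MyPayload> = ConcurrentMemoryManager::new(8);
//! let token = mgr.store_shared(build_message())?; // lock-free slot allocation + per-slot write
//! {
//!     let msg = mgr.read_shared(token)?;           // holds the slot read lock while in scope
//!     let _ = msg.payload();
//! }
//! mgr.free_shared(token)?;                          // slot returns to the lock-free freelist
//! ```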

use core::ops::{Deref, DerefMut};
use std::sync::{
    atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering},
    Arc, RwLock, RwLockReadGuard, RwLockWriteGuard,
};

use crate::errors::MemoryError;
use crate::memory::header_store::HeaderStore;
use crate::memory::manager::MemoryManager;
use crate::memory::MemoryClass;
use crate::message::payload::Payload;
use crate::message::{Message, MessageHeader};
use crate::prelude::ScopedManager;
use crate::types::MessageToken;

/// Special value meaning "no index" for the freelist next pointer and head.
const EMPTY_INDEX: u32 = u32::MAX;

/// Pack helper for the free-list head: upper 32 bits contain a tag (u32),
/// lower 32 bits contain the head index (u32). The tagged head prevents ABA
/// by changing the tag on each successful CAS update.
#[inline]
fn pack_head(tag: u32, idx: u32) -> u64 {
    ((tag as u64) << 32) | (idx as u64)
}
/// Unpack helper returning (tag, index).
#[inline]
fn unpack_head(v: u64) -> (u32, u32) {
    ((v >> 32) as u32, (v & 0xffff_ffff) as u32)
}
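
// Worked example (illustrative values only): pack_head(3, 7) == 0x0000_0003_0000_0007,
// and unpack_head(pack_head(3, 7)) == (3, 7) -- the tag lives in the upper 32 bits,
// the slot index in the lower 32.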

/// The mutable per-slot payload state protected by an `RwLock`.
///
/// The `message` field is `Option<Message<P>>`. While `Some`, the slot is
/// considered allocated and its content must be accessible to readers and
/// writers only while appropriate locks are held.
struct ConcurrentSlotState<P: Payload> {
    message: Option<Message<P>>,
}

impl<P: Payload> ConcurrentSlotState<P> {
    /// Construct an empty slot state (unallocated).
    fn new() -> Self {
        Self { message: None }
    }
}

/// A slot combines the `RwLock` with its `ConcurrentSlotState`.
///
/// We keep a `ConcurrentSlot` abstraction so the shared vector type is clear
/// and we can initialize slots uniformly.
struct ConcurrentSlot<P: Payload> {
    state: RwLock<ConcurrentSlotState<P>>,
}

impl<P: Payload> ConcurrentSlot<P> {
    /// Construct a new slot with an empty state.
    fn new() -> Self {
        Self {
            state: RwLock::new(ConcurrentSlotState::new()),
        }
    }
}

/// The shared, heap-backed state of the concurrent manager.
///
/// This struct is reference-counted by `Arc` inside `ConcurrentMemoryManager` so
/// multiple handles can be cloned cheaply and share the underlying storage.
struct ConcurrentMemoryManagerShared<P: Payload> {
    /// The per-slot RwLocks and states.
    slots: Vec<ConcurrentSlot<P>>,

    /// Per-slot "next" pointers used by the Treiber free-list. `next_free[i]`
    /// stores the index of the successor of `i` in the freelist, or
    /// `EMPTY_INDEX` for none.
    next_free: Vec<AtomicU32>,

    /// The freelist head is a packed (tag, index) in an AtomicU64. Lower 32
    /// bits: index of head or EMPTY_INDEX; upper 32 bits: tag counter.
    free_head: AtomicU64,

    /// Number of free slots, updated on successful push/pop. Used for quick
    /// diagnostics and admission control. Relaxed loads are acceptable.
    available_count: AtomicUsize,

    /// Memory class metadata for diagnostics / policy.
    mem_class: MemoryClass,
}

impl<P: Payload> ConcurrentMemoryManagerShared<P> {
    /// Create a new shared state object with `capacity` slots and initial
    /// freelist linking 0 -> 1 -> 2 -> ... -> capacity-1 -> EMPTY.
    ///
    /// The `mem_class` parameter describes where the memory logically resides.
    fn new(mem_class: MemoryClass, capacity: usize) -> Self {
        assert!(capacity <= u32::MAX as usize);
        // Initialize slots vector with RwLocks protecting empty states.
        let mut slots = Vec::with_capacity(capacity);
        for _ in 0..capacity {
            slots.push(ConcurrentSlot::new());
        }

        // Initialize the per-slot next_free pointers to form a singly-linked
        // list of free indices. This is the initial freelist state.
        let mut next_free = Vec::with_capacity(capacity);
        for i in 0..capacity {
            let next = if i + 1 < capacity {
                (i + 1) as u32
            } else {
                EMPTY_INDEX
            };
            next_free.push(AtomicU32::new(next));
        }

        // Head initially points to index 0 with tag 0 (unless capacity==0).
        let head_index = if capacity > 0 { 0 } else { EMPTY_INDEX };
        let free_head = AtomicU64::new(pack_head(0, head_index));
        let available_count = AtomicUsize::new(capacity);

        Self {
            slots,
            next_free,
            free_head,
            available_count,
            mem_class,
        }
    }
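
    // Illustrative initial layout for `capacity == 4` (values follow directly
    // from `new` above):
    //   free_head       == pack_head(0, 0)          // tag 0, head index 0
    //   next_free       == [1, 2, 3, EMPTY_INDEX]   // freelist 0 -> 1 -> 2 -> 3 -> EMPTY
    //   available_count == 4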

    // ---------------------------------------------------------------------
    // Lock-free freelist operations (Treiber stack)
    // ---------------------------------------------------------------------

    /// Pop (allocate) an index from the freelist in a lock-free manner.
    ///
    /// Returns `Some(index)` on success, or `None` if the freelist is empty.
    /// On success the `available_count` is decremented. The caller must then
    /// acquire the slot's write lock before storing into the slot.
    ///
    /// Correctness notes:
    /// - This is a classic Treiber stack pop with a packed (tag,index) head to
    ///   avoid ABA. The tag is incremented on every successful CAS.
    /// - We use `Ordering::Acquire` for loads and `AcqRel` for the successful
    ///   CAS to ensure proper memory ordering with respect to `next_free`.
    fn pop_free(&self) -> Option<usize> {
        let mut spins = 0u32;
        loop {
            // Load the packed head (Acquire)
            let head = self.free_head.load(Ordering::Acquire);
            let (tag, idx) = unpack_head(head);

            // Empty?
            if idx == EMPTY_INDEX {
                return None;
            }

            // Read the successor index of the head. Acquire to synchronize.
            let next = self.next_free[idx as usize].load(Ordering::Acquire);

            // Prepare a new head with incremented tag and successor index.
            let new_tag = tag.wrapping_add(1);
            let new_head = pack_head(new_tag, next);

            // Attempt CAS to move head -> successor.
            if self
                .free_head
                .compare_exchange(head, new_head, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // Successful pop: update diagnostics and return index.
                self.available_count.fetch_sub(1, Ordering::AcqRel);
                return Some(idx as usize);
            }

            // CAS failed due to concurrent updates. Backoff occasionally.
            spins = spins.wrapping_add(1);
            if spins & 0xFF == 0 {
                std::thread::yield_now();
            }
        }
    }

    /// Push (free) an index back onto the freelist in a lock-free manner.
    ///
    /// On success the `available_count` is incremented. The caller must ensure
    /// the slot is already cleared (message removed) before pushing it back.
    ///
    /// Correctness notes:
    /// - We store the current head into `next_free[idx]` (Release) and then try
    ///   to CAS the head to `(tag+1, idx)`. A successful CAS inserts `idx`
    ///   as the new head. We increment the available counter only on success.
    fn push_free(&self, idx: usize) {
        let mut spins = 0u32;
        loop {
            // Load the packed head (Acquire)
            let head = self.free_head.load(Ordering::Acquire);
            let (tag, head_idx) = unpack_head(head);

            // Publish the current head as the successor of the index being pushed.
            // Release ordering ensures successor becomes visible before CAS.
            self.next_free[idx].store(head_idx, Ordering::Release);

            // New head has incremented tag and index==idx.
            let new_tag = tag.wrapping_add(1);
            let new_head = pack_head(new_tag, idx as u32);

            if self
                .free_head
                .compare_exchange(head, new_head, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // Successful push: update diagnostics and return.
                self.available_count.fetch_add(1, Ordering::AcqRel);
                return;
            }

            // CAS failed; retry with occasional yield.
            spins = spins.wrapping_add(1);
            if spins & 0xFF == 0 {
                std::thread::yield_now();
            }
        }
    }
}

/// Public concurrent manager handle.
///
/// This is cheap to clone (clones the `Arc`), so multiple threads can share
/// a `ConcurrentMemoryManager` instance and call the `*_shared(&self)`
/// methods concurrently.
pub struct ConcurrentMemoryManager<P: Payload> {
    shared: Arc<ConcurrentMemoryManagerShared<P>>,
}

impl<P: Payload> Clone for ConcurrentMemoryManager<P> {
    fn clone(&self) -> Self {
        Self {
            shared: Arc::clone(&self.shared),
        }
    }
}
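
// Illustrative sharing pattern (a sketch, not part of the tested API surface;
// `MyPayload` and `build_message` are placeholders): clone the handle per
// thread and call the `*_shared` methods concurrently.
//
//     let mgr = ConcurrentMemoryManager::<MyPayload>::new(64);
//     let handle = mgr.clone();
//     std::thread::spawn(move || {
//         if let Ok(token) = handle.store_shared(build_message()) {
//             handle.free_shared(token).ok();
//         }
//     });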

impl<P: Payload> ConcurrentMemoryManager<P> {
    // ---------------------------------------------------------------------
    // Construction
    // ---------------------------------------------------------------------

    /// Create a new manager with the given `capacity`. Capacity must be finite
    /// and typically small-to-moderate (4..1024) for best performance.
    ///
    /// The manager defaults to `MemoryClass::Host`.
    pub fn new(capacity: usize) -> Self {
        Self::with_memory_class(capacity, MemoryClass::Host)
    }

    /// Create a new manager with explicit `memory_class` metadata.
    ///
    /// `memory_class` is useful for telemetry and for future routing decisions
    /// when different storage classes exist.
    pub fn with_memory_class(capacity: usize, mem_class: MemoryClass) -> Self {
        let shared = ConcurrentMemoryManagerShared::new(mem_class, capacity);
        Self {
            shared: Arc::new(shared),
        }
    }

    // ---------------------------------------------------------------------
    // Ergonomic shared (&self) API for concurrent use
    // ---------------------------------------------------------------------
    //
    // These methods all take `&self` so multiple threads can call them on the
    // same manager handle. The trait `MemoryManager` requires `&mut self`,
    // so we implement the trait by delegating to these methods below.

    /// Store a `Message<P>` and return its `MessageToken`.
    ///
    /// This method:
    /// - Pops a free index from the freelist (lock-free).
    /// - Acquires the slot write lock and sets `message = Some(value)`.
    ///
    /// Returns `MemoryError::NoFreeSlots` if the freelist is empty.
    ///
    /// Concurrency: multiple threads may concurrently allocate and store into
    /// different slots. Only the chosen slot is write-locked during the store.
    pub fn store_shared(&self, value: Message<P>) -> Result<MessageToken, MemoryError> {
        // Obtain a free slot index from the lock-free freelist.
        let idx = match self.shared.pop_free() {
            None => return Err(MemoryError::NoFreeSlots),
            Some(i) => i,
        };

        // Acquire the slot's write lock (this is fine: this lock only affects
        // the single slot) and store the message.
        let slot = &self.shared.slots[idx];
        let mut guard = slot.state.write().map_err(|_| MemoryError::Poisoned)?;
        guard.message = Some(value);
        drop(guard);

        Ok(MessageToken::new(idx as u32))
    }

    /// Borrow a stored message immutably, returning a guard that keeps the
    /// slot read-locked for the lifetime of the guard.
    ///
    /// Returns:
    /// - `BadToken` if the token index is out of range,
    /// - `NotAllocated` if the slot is currently empty,
    /// - `Poisoned` if the slot lock is poisoned.
    ///
    /// Concurrency: read locks allow many concurrent readers for the same slot.
    pub fn read_shared(
        &self,
        token: MessageToken,
    ) -> Result<ConcurrentReadGuard<'_, P>, MemoryError> {
        let idx = token.index();
        if idx >= self.shared.slots.len() {
            return Err(MemoryError::BadToken);
        }
        let slot = &self.shared.slots[idx];

        // Acquire read lock and validate allocation.
        let guard = slot.state.read().map_err(|_| MemoryError::Poisoned)?;
        if guard.message.is_none() {
            return Err(MemoryError::NotAllocated);
        }

        Ok(ConcurrentReadGuard { guard })
    }

    /// Borrow a stored message mutably, returning a guard that keeps the slot
    /// write-locked for the lifetime of the guard.
    ///
    /// This enforces exclusive access to the stored message while the guard is live.
    pub fn read_mut_shared(
        &self,
        token: MessageToken,
    ) -> Result<ConcurrentWriteGuard<'_, P>, MemoryError> {
        let idx = token.index();
        if idx >= self.shared.slots.len() {
            return Err(MemoryError::BadToken);
        }
        let slot = &self.shared.slots[idx];

        // Acquire write lock which excludes readers/writers on the slot.
        let guard = slot.state.write().map_err(|_| MemoryError::Poisoned)?;
        if guard.message.is_none() {
            return Err(MemoryError::NotAllocated);
        }
        Ok(ConcurrentWriteGuard { guard })
    }

    /// Free a previously allocated token/slot.
    ///
    /// Behavior:
    /// - Acquire the slot write lock (this will wait for readers/writers).
    /// - Ensure the slot is allocated; set `message = None`.
    /// - Push the index back onto the lock-free freelist.
    ///
    /// Returns `NotAllocated` if slot already empty or `BadToken` if token invalid.
    pub fn free_shared(&self, token: MessageToken) -> Result<(), MemoryError> {
        let idx = token.index();
        if idx >= self.shared.slots.len() {
            return Err(MemoryError::BadToken);
        }
        let slot = &self.shared.slots[idx];

        // Acquire write lock to ensure exclusivity for clearing the slot.
        let mut guard = slot.state.write().map_err(|_| MemoryError::Poisoned)?;
        if guard.message.is_none() {
            return Err(MemoryError::NotAllocated);
        }
        guard.message = None;
        drop(guard);

        // Return index to freelist lock-free.
        self.shared.push_free(idx);
        Ok(())
    }

    /// Return the approximate number of free slots. This is updated atomically
    /// on each successful freelist push/pop. The value is diagnostic and may be
    /// slightly stale if concurrent operations are in-flight.
    pub fn available(&self) -> usize {
        self.shared.available_count.load(Ordering::Relaxed)
    }

    /// Return the configured capacity (number of slots).
    pub fn capacity(&self) -> usize {
        self.shared.slots.len()
    }

    /// Return the memory class attached to this manager.
    pub fn memory_class(&self) -> MemoryClass {
        self.shared.mem_class
    }
}

// ---------------------------------------------------------------------
// Guard wrappers that hold the slot locks while the guard is live.
// ---------------------------------------------------------------------

/// Header guard — holds a read lock on the slot and derefs to `MessageHeader`.
pub struct ConcurrentHeaderGuard<'a, P: Payload> {
    guard: RwLockReadGuard<'a, ConcurrentSlotState<P>>,
}

impl<'a, P: Payload> Deref for ConcurrentHeaderGuard<'a, P> {
    type Target = MessageHeader;
    fn deref(&self) -> &Self::Target {
        self.guard
            .message
            .as_ref()
            .expect("header guard constructed only when Some")
            .header()
    }
}

/// Read guard — holds a read lock and dereferences to `Message<P>`.
pub struct ConcurrentReadGuard<'a, P: Payload> {
    guard: RwLockReadGuard<'a, ConcurrentSlotState<P>>,
}

impl<'a, P: Payload> Deref for ConcurrentReadGuard<'a, P> {
    type Target = Message<P>;
    fn deref(&self) -> &Self::Target {
        self.guard
            .message
            .as_ref()
            .expect("read guard constructed only when Some")
    }
}

/// Write guard — holds a write lock and dereferences (mutably) to `Message<P>`.
pub struct ConcurrentWriteGuard<'a, P: Payload> {
    guard: RwLockWriteGuard<'a, ConcurrentSlotState<P>>,
}

impl<'a, P: Payload> Deref for ConcurrentWriteGuard<'a, P> {
    type Target = Message<P>;
    fn deref(&self) -> &Self::Target {
        self.guard
            .message
            .as_ref()
            .expect("write guard constructed only when Some")
    }
}

impl<'a, P: Payload> DerefMut for ConcurrentWriteGuard<'a, P> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.guard
            .message
            .as_mut()
            .expect("write guard constructed only when Some")
    }
}

// ---------------------------------------------------------------------
// Trait implementations
// ---------------------------------------------------------------------

impl<P: Payload> HeaderStore for ConcurrentMemoryManager<P> {
    type HeaderGuard<'a>
        = ConcurrentHeaderGuard<'a, P>
    where
        Self: 'a;

    /// Peek the header of the stored message identified by `token`.
    ///
    /// This acquires a slot read lock and returns a guard that keeps the lock
    /// for the lifetime of the returned guard.
    fn peek_header(&self, token: MessageToken) -> Result<Self::HeaderGuard<'_>, MemoryError> {
        let idx = token.index();
        if idx >= self.shared.slots.len() {
            return Err(MemoryError::BadToken);
        }
        let slot = &self.shared.slots[idx];

        // Acquire read lock and validate that slot is allocated.
        let guard = slot.state.read().map_err(|_| MemoryError::Poisoned)?;
        if guard.message.is_none() {
            return Err(MemoryError::NotAllocated);
        }

        Ok(ConcurrentHeaderGuard { guard })
    }
}

impl<P: Payload> MemoryManager<P> for ConcurrentMemoryManager<P> {
    type ReadGuard<'a>
        = ConcurrentReadGuard<'a, P>
    where
        Self: 'a;

    type WriteGuard<'a>
        = ConcurrentWriteGuard<'a, P>
    where
        Self: 'a;

    // The trait requires &mut self for store/read_mut/free. For the concurrent
    // manager we delegate those trait methods to the `&self` shared methods so
    // callers can use either the trait (with a mutable handle) or the shared
    // API for concurrency.
    fn store(&mut self, value: Message<P>) -> Result<MessageToken, MemoryError> {
        // Delegate to the &self method: safe since we use internal Arc-shared state.
        self.store_shared(value)
    }

    fn read(&self, token: MessageToken) -> Result<Self::ReadGuard<'_>, MemoryError> {
        self.read_shared(token)
    }

    fn read_mut(&mut self, token: MessageToken) -> Result<Self::WriteGuard<'_>, MemoryError> {
        self.read_mut_shared(token)
    }

    fn free(&mut self, token: MessageToken) -> Result<(), MemoryError> {
        self.free_shared(token)
    }

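    // Note: the calls below resolve to the inherent `available` / `capacity` /
    // `memory_class` methods above (inherent methods take precedence over
    // trait methods), so this delegation does not recurse.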
    fn available(&self) -> usize {
        self.available()
    }

    fn capacity(&self) -> usize {
        self.capacity()
    }

    fn memory_class(&self) -> MemoryClass {
        self.memory_class()
    }
}

impl<P: Payload + Send + Sync> ScopedManager<P> for ConcurrentMemoryManager<P> {
    type Handle<'a>
        = ConcurrentMemoryManager<P>
    where
        Self: 'a;

    fn scoped_handle<'a>(&'a self) -> Self::Handle<'a>
    where
        Self: 'a,
    {
        self.clone()
    }
}

// ---------------------------------------------------------------------
// Tests (std only)
// ---------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::MessageHeader;
    use crate::prelude::{create_test_tensor_filled_with, TestTensor, TEST_TENSOR_BYTE_COUNT};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    // Helper: build a simple Message<TestTensor>.
    fn make_msg(val: u32) -> Message<TestTensor> {
        Message::new(MessageHeader::empty(), create_test_tensor_filled_with(val))
    }

    // --- Concurrency-oriented tests ------------------------------------------------

    #[test]
    fn basic_store_read_free() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);

        let t = mgr.store_shared(make_msg(10)).unwrap();
        assert_eq!(mgr.available(), 3);

        {
            let g = mgr.read_shared(t).unwrap();
            assert_eq!(*g.payload(), create_test_tensor_filled_with(10));
        }

        mgr.free_shared(t).unwrap();
        assert_eq!(mgr.available(), 4);
    }

    #[test]
    fn concurrent_reads_same_slot() {
        let mgr = Arc::new(ConcurrentMemoryManager::<TestTensor>::new(4));
        let t = mgr.store_shared(make_msg(5)).unwrap();

        let m1 = mgr.clone();
        let th1 = thread::spawn(move || {
            let g = m1.read_shared(t).unwrap();
            assert_eq!(*g.payload(), create_test_tensor_filled_with(5));
        });

        let m2 = mgr.clone();
        let th2 = thread::spawn(move || {
            let g = m2.read_shared(t).unwrap();
            assert_eq!(*g.payload(), create_test_tensor_filled_with(5));
        });

        th1.join().unwrap();
        th2.join().unwrap();

        mgr.free_shared(t).unwrap();
    }

    #[test]
    fn write_excludes_read() {
        use std::sync::Barrier;

        let mgr = Arc::new(ConcurrentMemoryManager::<TestTensor>::new(4));
        let t = mgr.store_shared(make_msg(7)).unwrap();

        // Barrier: writer signals after acquiring the lock, reader waits before reading
        let barrier = Arc::new(Barrier::new(2));

        let mwriter = mgr.clone();
        let bwriter = barrier.clone();
        let writer = thread::spawn(move || {
            let mut w = mwriter.read_mut_shared(t).unwrap();
            *w.payload_mut() = create_test_tensor_filled_with(42);
            // Signal: lock is held and value is written
            bwriter.wait();
            // Hold lock until reader has had a chance to block on it
            std::thread::sleep(Duration::from_millis(50));
        });

        // Wait until writer confirms it holds the lock with value written
        barrier.wait();

        // Now read_shared must block until writer releases; when it returns, value is 42
        let g = mgr.read_shared(t).unwrap();
        assert_eq!(*g.payload(), create_test_tensor_filled_with(42));

        writer.join().unwrap();
    }

    #[test]
    fn allocate_exhaustion_and_reuse() {
        let mgr = ConcurrentMemoryManager::<TestTensor>::new(2);
        let t0 = mgr.store_shared(make_msg(1)).unwrap();
        let t1 = mgr.store_shared(make_msg(2)).unwrap();
        assert_eq!(mgr.available(), 0);
        assert!(matches!(
            mgr.store_shared(make_msg(3)),
            Err(MemoryError::NoFreeSlots)
        ));

        mgr.free_shared(t0).unwrap();
        assert_eq!(mgr.available(), 1);

        // t0 slot reused
        let t2 = mgr.store_shared(make_msg(4)).unwrap();
        assert_eq!(t2.index(), t0.index());
        mgr.free_shared(t1).unwrap();
        mgr.free_shared(t2).unwrap();
    }
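
    // Added stress sketch: hammer the lock-free freelist from several threads
    // and check that every slot is returned; the thread/iteration counts are
    // arbitrary illustrative values.
    #[test]
    fn concurrent_store_free_stress() {
        let mgr = Arc::new(ConcurrentMemoryManager::<TestTensor>::new(8));
        let mut handles = Vec::new();
        for t in 0..4u32 {
            let m = mgr.clone();
            handles.push(thread::spawn(move || {
                for i in 0..100u32 {
                    // Allocation can fail transiently if all slots happen to be taken.
                    if let Ok(tok) = m.store_shared(make_msg(t * 1000 + i)) {
                        m.free_shared(tok).unwrap();
                    }
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        // Every successful store was freed, so the freelist is full again.
        assert_eq!(mgr.available(), 8);
    }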

    // --- Parity tests mirroring StaticMemoryManager contract -----------------------

    #[test]
    fn store_read_free_cycle() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        assert_eq!(mgr.available(), 4);
        assert_eq!(mgr.capacity(), 4);

        let token = mgr.store_shared(make_msg(42)).unwrap();
        assert_eq!(mgr.available(), 3);

        {
            let msg = mgr.read_shared(token).unwrap();
            assert_eq!(*msg.payload(), create_test_tensor_filled_with(42));
        }

        mgr.free_shared(token).unwrap();
        assert_eq!(mgr.available(), 4);
    }

    #[test]
    fn read_mut_works() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        let token = mgr.store_shared(make_msg(10)).unwrap();

        {
            // mutable borrow, must be declared mut binding
            let mut msg = mgr.read_mut_shared(token).unwrap();
            *msg.payload_mut() = create_test_tensor_filled_with(99);
            // mutable guard dropped here
        }

        {
            // now we can take an immutable borrow safely
            let msg = mgr.read_shared(token).unwrap();
            assert_eq!(*msg.payload(), create_test_tensor_filled_with(99));
        }

        // free now that no borrows exist
        mgr.free_shared(token).unwrap();
    }

    #[test]
    fn peek_header_works() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        let token = mgr.store_shared(make_msg(7)).unwrap();

        {
            let header = mgr.peek_header(token).unwrap();
            assert_eq!(*header.payload_size_bytes(), TEST_TENSOR_BYTE_COUNT);
            // header dropped at end of scope
        }

        mgr.free_shared(token).unwrap();
    }

    #[test]
    fn capacity_exhaustion() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(2);
        let _t0 = mgr.store_shared(make_msg(1)).unwrap();
        let _t1 = mgr.store_shared(make_msg(2)).unwrap();
        assert_eq!(mgr.available(), 0);

        let err = mgr.store_shared(make_msg(3));
        assert_eq!(err, Err(MemoryError::NoFreeSlots));
    }

    #[test]
    fn double_free_detected() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        let token = mgr.store_shared(make_msg(1)).unwrap();
        mgr.free_shared(token).unwrap();

        let err = mgr.free_shared(token);
        assert_eq!(err, Err(MemoryError::NotAllocated));
    }

    #[test]
    fn bad_token_detected() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        let bad = MessageToken::new(99);

        assert!(matches!(mgr.read_shared(bad), Err(MemoryError::BadToken)));
        assert!(matches!(mgr.peek_header(bad), Err(MemoryError::BadToken)));
    }

    #[test]
    fn read_freed_slot_is_not_allocated() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        let token = mgr.store_shared(make_msg(1)).unwrap();
        mgr.free_shared(token).unwrap();

        assert!(matches!(
            mgr.read_shared(token),
            Err(MemoryError::NotAllocated)
        ));
        assert!(matches!(
            mgr.peek_header(token),
            Err(MemoryError::NotAllocated)
        ));
    }

    #[test]
    fn slot_reuse_after_free() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(1);
        let t0 = mgr.store_shared(make_msg(10)).unwrap();
        mgr.free_shared(t0).unwrap();

        // Slot 0 should be reused.
        let t1 = mgr.store_shared(make_msg(20)).unwrap();
        assert_eq!(t1.index(), 0);
        assert_eq!(
            *mgr.read_shared(t1).unwrap().payload(),
            create_test_tensor_filled_with(20)
        );
    }

    #[test]
    fn memory_class_configurable() {
        let mgr: ConcurrentMemoryManager<TestTensor> =
            ConcurrentMemoryManager::with_memory_class(4, MemoryClass::Device(0));
        assert_eq!(mgr.memory_class(), MemoryClass::Device(0));
    }

    #[test]
    fn default_memory_class_is_host() {
        let mgr: ConcurrentMemoryManager<TestTensor> = ConcurrentMemoryManager::new(4);
        assert_eq!(mgr.memory_class(), MemoryClass::Host);
    }
}
833}