1use alloc::sync::Arc;
13use alloc::vec::Vec;
14
15#[cfg(feature = "std")]
16use std::sync::{Condvar, Mutex};
17
18use crate::slot::{ReaderMask, SLOT_HEADER_SIZE, SlotHeader};
19
/// Change-notification primitive: a generation counter behind a mutex,
/// paired with a condition variable that is signalled when the counter moves.
#[cfg(feature = "std")]
#[derive(Default, Debug)]
struct ChangeNotify {
    /// Counter advanced by `bump`; starts at 0 through `Default`.
    generation: Mutex<u64>,
    /// Wakes threads blocked in `wait_change` when `generation` changes.
    cvar: Condvar,
}
32
#[cfg(feature = "std")]
impl ChangeNotify {
    /// Returns the current generation value.
    ///
    /// A poisoned mutex is reported as generation `0` rather than as an error.
    fn current(&self) -> u64 {
        match self.generation.lock() {
            Ok(guard) => *guard,
            Err(_) => 0,
        }
    }

    /// Advances the generation by one (wrapping on overflow) and wakes every
    /// waiter. Does nothing when the mutex is poisoned.
    fn bump(&self) {
        let Ok(mut guard) = self.generation.lock() else {
            return;
        };
        *guard = guard.wrapping_add(1);
        // Signalled while the guard is still held, as before.
        self.cvar.notify_all();
    }

    /// Blocks until the generation differs from `last` or `timeout` elapses.
    ///
    /// Returns immediately when the mutex is poisoned. The outcome of the
    /// wait (changed vs. timed out) is deliberately discarded; callers are
    /// expected to re-read the generation themselves.
    fn wait_change(&self, last: u64, timeout: core::time::Duration) {
        let Ok(guard) = self.generation.lock() else {
            return;
        };
        let _ = self
            .cvar
            .wait_timeout_while(guard, timeout, |current| *current == last);
    }
}
55
/// Lightweight, copyable reference to one slot inside a segment.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct SlotHandle {
    /// Identifier of the segment that owns the slot.
    pub segment_id: u64,
    /// Zero-based position of the slot within that segment.
    pub slot_index: u32,
}
64
/// Errors reported by slot allocation, commit and read operations.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SlotError {
    /// No slot in the segment is currently available for reservation.
    NoFreeSlot,
    /// The handle's slot index does not refer to a usable slot.
    OutOfBounds,
    /// The sample does not fit into a single slot.
    SampleTooLarge {
        /// Size of the rejected sample, in bytes.
        sample: usize,
        /// Payload capacity of one slot, in bytes.
        slot_capacity: usize,
    },
    /// The mutex guarding the slots was poisoned.
    LockPoisoned,
    /// The backend cannot hand out in-place loans.
    InPlaceUnsupported,
}
85
86impl core::fmt::Display for SlotError {
87 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
88 match self {
89 Self::NoFreeSlot => f.write_str("no free slot in segment"),
90 Self::OutOfBounds => f.write_str("slot index out of bounds"),
91 Self::SampleTooLarge {
92 sample,
93 slot_capacity,
94 } => write!(
95 f,
96 "sample {sample} byte does not fit in slot capacity {slot_capacity}"
97 ),
98 Self::LockPoisoned => f.write_str("slot lock poisoned"),
99 Self::InPlaceUnsupported => f.write_str("backend does not support in-place loan"),
100 }
101 }
102}
103
// `std::error::Error` only exists with `std`, hence the feature gate. The
// default trait methods are sufficient: `SlotError` wraps no source error.
#[cfg(feature = "std")]
impl std::error::Error for SlotError {}
106
/// Slot allocator that keeps every slot in process memory behind a mutex.
///
/// Only available with the `std` feature.
#[cfg(feature = "std")]
pub struct InMemorySlotAllocator {
    /// All slots of the segment; every operation locks this mutex.
    slots: Arc<Mutex<Vec<Slot>>>,
    /// Segment identifier stamped into every handle this allocator returns.
    segment_id: u64,
    /// Payload capacity of each slot, in bytes.
    slot_capacity: usize,
    /// Sequence number that the next successful commit will receive.
    next_sn: Arc<core::sync::atomic::AtomicU32>,
    /// Optional 16-byte type hash set through `with_type_hash`.
    type_hash: Option<[u8; 16]>,
    /// Generation counter and condvar bumped whenever slot state changes.
    notify: Arc<ChangeNotify>,
}
124
/// One slot of the in-memory segment: header, payload buffer and bookkeeping.
///
/// Gated on `std` because `committed_at` stores a `std::time::Instant`, and
/// because the only user of this type (`InMemorySlotAllocator`) is itself
/// `std`-only. Without the gate a build without the `std` feature fails on
/// the unresolved `std` path.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
struct Slot {
    /// Header of the most recently committed sample (sequence number, sample
    /// size, reader mask).
    header: SlotHeader,
    /// Payload buffer; allocated once with the allocator's slot capacity.
    data: Vec<u8>,
    /// `true` while a writer holds the slot between reserve and
    /// commit/discard.
    loaned: bool,
    /// When the slot was last committed; `None` until the first commit.
    /// Consulted by `evict_stale` to decide whether a sample is too old.
    committed_at: Option<std::time::Instant>,
}
137
// Trait adapter: every method forwards to the inherent method of the same
// purpose on `InMemorySlotAllocator`. Fully qualified paths are used so the
// calls visibly target the inherent implementation rather than this trait.
#[cfg(feature = "std")]
impl crate::backend::SlotBackend for InMemorySlotAllocator {
    fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
        InMemorySlotAllocator::reserve_slot(self, active_readers_mask)
    }

    fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
        InMemorySlotAllocator::commit_slot(self, handle, bytes)
    }

    fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
        InMemorySlotAllocator::discard_slot(self, handle)
    }

    fn slot_data_ptr(&self, handle: SlotHandle) -> Result<(*mut u8, usize), SlotError> {
        InMemorySlotAllocator::slot_data_ptr(self, handle)
    }

    fn commit_in_place(&self, handle: SlotHandle, len: usize) -> Result<u32, SlotError> {
        InMemorySlotAllocator::commit_in_place(self, handle, len)
    }

    fn slot_read_ptr(&self, handle: SlotHandle) -> Result<(*const u8, usize), SlotError> {
        InMemorySlotAllocator::slot_read_ptr(self, handle)
    }

    fn next_unread_slot(&self, reader_index: u8) -> Result<Option<SlotHandle>, SlotError> {
        InMemorySlotAllocator::next_unread_slot(self, reader_index)
    }

    fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
        InMemorySlotAllocator::read_slot(self, handle)
    }

    fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
        InMemorySlotAllocator::mark_read(self, handle, reader_index)
    }

    fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
        InMemorySlotAllocator::mark_reader_disconnected(self, reader_index)
    }

    fn slot_count(&self) -> Result<usize, SlotError> {
        InMemorySlotAllocator::slot_count(self)
    }

    fn slot_total_size(&self) -> usize {
        InMemorySlotAllocator::slot_total_size(self)
    }

    fn slot_capacity(&self) -> usize {
        InMemorySlotAllocator::slot_capacity(self)
    }

    /// Returns the type hash configured through `with_type_hash`, if any.
    fn type_hash(&self) -> Option<[u8; 16]> {
        self.type_hash
    }

    fn notify_generation(&self) -> u64 {
        InMemorySlotAllocator::notify_gen(self)
    }

    fn wait_for_change(&self, last: u64, timeout: core::time::Duration) {
        Self::wait_for_change(self, last, timeout);
    }
}
189
190#[cfg(feature = "std")]
191impl InMemorySlotAllocator {
192 #[must_use]
195 pub fn new(segment_id: u64, slot_count: usize, slot_capacity: usize) -> Self {
196 let mut slots = Vec::with_capacity(slot_count);
197 for _ in 0..slot_count {
198 slots.push(Slot {
199 header: SlotHeader::new(0, 0),
200 data: alloc::vec![0u8; slot_capacity],
201 loaned: false,
202 committed_at: None,
203 });
204 }
205 Self {
206 slots: Arc::new(Mutex::new(slots)),
207 segment_id,
208 slot_capacity,
209 next_sn: Arc::new(core::sync::atomic::AtomicU32::new(0)),
210 type_hash: None,
211 notify: Arc::new(ChangeNotify::default()),
212 }
213 }
214
215 #[must_use]
218 pub fn notify_gen(&self) -> u64 {
219 self.notify.current()
220 }
221
222 pub fn wait_for_change(&self, last: u64, timeout: core::time::Duration) {
226 self.notify.wait_change(last, timeout);
227 }
228
229 #[must_use]
233 pub fn with_type_hash(mut self, hash: [u8; 16]) -> Self {
234 self.type_hash = Some(hash);
235 self
236 }
237
238 pub fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
244 let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
245 for (idx, slot) in slots.iter_mut().enumerate() {
246 if slot.loaned {
247 continue;
248 }
249 if slot.header.sample_size == 0 || slot.header.all_read(active_readers_mask) {
252 slot.loaned = true;
253 return Ok(SlotHandle {
254 segment_id: self.segment_id,
255 slot_index: idx as u32,
256 });
257 }
258 }
259 Err(SlotError::NoFreeSlot)
260 }
261
262 pub fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
268 if bytes.len() > self.slot_capacity {
269 return Err(SlotError::SampleTooLarge {
270 sample: bytes.len(),
271 slot_capacity: self.slot_capacity,
272 });
273 }
274 let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
275 let idx = handle.slot_index as usize;
276 if idx >= slots.len() {
277 return Err(SlotError::OutOfBounds);
278 }
279 let sn = self
280 .next_sn
281 .fetch_add(1, core::sync::atomic::Ordering::Relaxed);
282 let slot = &mut slots[idx];
283 let sample_size = u32::try_from(bytes.len()).unwrap_or(u32::MAX);
284 slot.header = SlotHeader::new(sn, sample_size);
285 slot.data[..bytes.len()].copy_from_slice(bytes);
286 slot.loaned = false;
287 slot.committed_at = Some(std::time::Instant::now());
288 drop(slots);
289 self.notify.bump(); Ok(sn)
291 }
292
293 pub fn slot_data_ptr(&self, handle: SlotHandle) -> Result<(*mut u8, usize), SlotError> {
299 let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
300 let idx = handle.slot_index as usize;
301 let slot = slots.get_mut(idx).ok_or(SlotError::OutOfBounds)?;
302 if !slot.loaned {
303 return Err(SlotError::OutOfBounds);
304 }
305 Ok((slot.data.as_mut_ptr(), self.slot_capacity))
306 }
307
308 pub fn commit_in_place(&self, handle: SlotHandle, len: usize) -> Result<u32, SlotError> {
315 if len > self.slot_capacity {
316 return Err(SlotError::SampleTooLarge {
317 sample: len,
318 slot_capacity: self.slot_capacity,
319 });
320 }
321 let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
322 let idx = handle.slot_index as usize;
323 if idx >= slots.len() {
324 return Err(SlotError::OutOfBounds);
325 }
326 let sn = self
327 .next_sn
328 .fetch_add(1, core::sync::atomic::Ordering::Relaxed);
329 let slot = &mut slots[idx];
330 let sample_size = u32::try_from(len).unwrap_or(u32::MAX);
331 slot.header = SlotHeader::new(sn, sample_size);
332 slot.loaned = false;
333 slot.committed_at = Some(std::time::Instant::now());
334 drop(slots);
335 self.notify.bump();
336 Ok(sn)
337 }
338
339 pub fn slot_read_ptr(&self, handle: SlotHandle) -> Result<(*const u8, usize), SlotError> {
345 let slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
346 let idx = handle.slot_index as usize;
347 let slot = slots.get(idx).ok_or(SlotError::OutOfBounds)?;
348 Ok((slot.data.as_ptr(), slot.header.sample_size as usize))
349 }
350
351 pub fn next_unread_slot(&self, reader_index: u8) -> Result<Option<SlotHandle>, SlotError> {
357 let slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
358 let bit = 1u32 << reader_index;
359 for (idx, slot) in slots.iter().enumerate() {
360 if slot.header.sample_size > 0 && (slot.header.reader_mask & bit) == 0 {
361 return Ok(Some(SlotHandle {
362 segment_id: self.segment_id,
363 slot_index: idx as u32,
364 }));
365 }
366 }
367 Ok(None)
368 }
369
370 pub fn evict_stale(
382 &self,
383 max_age: core::time::Duration,
384 active_readers_mask: ReaderMask,
385 ) -> Result<usize, SlotError> {
386 let now = std::time::Instant::now();
387 let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
388 let mut evicted = 0;
389 for slot in slots.iter_mut() {
390 if slot.loaned || slot.header.sample_size == 0 {
391 continue;
392 }
393 if slot.header.all_read(active_readers_mask) {
394 continue; }
396 let stale = slot
397 .committed_at
398 .is_some_and(|t| now.saturating_duration_since(t) >= max_age);
399 if stale {
400 slot.header.reader_mask |= active_readers_mask;
402 evicted += 1;
403 }
404 }
405 drop(slots);
406 if evicted > 0 {
407 self.notify.bump(); }
409 Ok(evicted)
410 }
411
412 pub fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
417 let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
418 let idx = handle.slot_index as usize;
419 if idx >= slots.len() {
420 return Err(SlotError::OutOfBounds);
421 }
422 slots[idx].loaned = false;
423 drop(slots);
424 self.notify.bump(); Ok(())
426 }
427
428 pub fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
433 let slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
434 let idx = handle.slot_index as usize;
435 if idx >= slots.len() {
436 return Err(SlotError::OutOfBounds);
437 }
438 let slot = &slots[idx];
439 let n = slot.header.sample_size as usize;
440 Ok((slot.header, slot.data[..n.min(slot.data.len())].to_vec()))
441 }
442
443 pub fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
448 let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
449 let idx = handle.slot_index as usize;
450 if idx >= slots.len() {
451 return Err(SlotError::OutOfBounds);
452 }
453 slots[idx].header.mark_read(reader_index);
454 drop(slots);
455 self.notify.bump(); Ok(())
457 }
458
459 pub fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
469 debug_assert!(reader_index < 32);
470 let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
471 for slot in slots.iter_mut() {
472 slot.header.reader_mask |= 1u32 << reader_index;
473 }
474 drop(slots);
475 self.notify.bump(); Ok(())
477 }
478
479 #[must_use]
481 pub fn slot_capacity(&self) -> usize {
482 self.slot_capacity
483 }
484
485 pub fn slot_count(&self) -> Result<usize, SlotError> {
487 Ok(self
488 .slots
489 .lock()
490 .map_err(|_| SlotError::LockPoisoned)?
491 .len())
492 }
493
494 #[must_use]
497 pub fn slot_total_size(&self) -> usize {
498 let raw = SLOT_HEADER_SIZE + self.slot_capacity;
499 (raw + 63) & !63
500 }
501}
502
#[cfg(all(test, feature = "std"))]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Fresh slots are handed out in ascending index order.
    #[test]
    fn reserve_returns_first_free_slot() {
        let pool = InMemorySlotAllocator::new(0, 4, 64);
        let first = pool.reserve_slot(0).expect("reserve 0");
        assert_eq!(first.slot_index, 0);
        let second = pool.reserve_slot(0).expect("reserve 1");
        assert_eq!(second.slot_index, 1);
    }

    /// Once every slot is on loan, a further reservation is refused.
    #[test]
    fn reserve_returns_no_free_slot_when_all_loaned() {
        let pool = InMemorySlotAllocator::new(0, 2, 64);
        let _first = pool.reserve_slot(0).unwrap();
        let _second = pool.reserve_slot(0).unwrap();
        assert_eq!(pool.reserve_slot(0), Err(SlotError::NoFreeSlot));
    }

    /// A commit stores the payload and hands out consecutive sequence numbers.
    #[test]
    fn commit_writes_bytes_and_increments_sn() {
        let pool = InMemorySlotAllocator::new(0, 2, 64);
        let handle = pool.reserve_slot(0).unwrap();
        let first_sn = pool.commit_slot(handle, &[1, 2, 3]).unwrap();
        assert_eq!(first_sn, 0);

        let (header, payload) = pool.read_slot(handle).unwrap();
        assert_eq!(header.sequence_number, 0);
        assert_eq!(header.sample_size, 3);
        assert_eq!(payload, [1u8, 2, 3]);

        let next_handle = pool.reserve_slot(0).unwrap();
        let second_sn = pool.commit_slot(next_handle, &[9]).unwrap();
        assert_eq!(second_sn, 1);
    }

    /// Bytes written through the loaned pointer are visible after an
    /// in-place commit.
    #[test]
    fn in_place_loan_writes_without_staging_copy() {
        let pool = InMemorySlotAllocator::new(0, 2, 64);
        let handle = pool.reserve_slot(0).unwrap();
        let (ptr, capacity) = pool.slot_data_ptr(handle).unwrap();
        assert!(capacity >= 3);
        // SAFETY: `ptr` points at the loaned slot's buffer, which holds at
        // least three bytes (asserted above), and nothing else touches the
        // slot in this single-threaded test.
        unsafe {
            for (offset, byte) in [1u8, 2, 3].iter().enumerate() {
                ptr.add(offset).write(*byte);
            }
        }
        let sn = pool.commit_in_place(handle, 3).unwrap();
        assert_eq!(sn, 0);

        let (header, payload) = pool.read_slot(handle).unwrap();
        assert_eq!(header.sequence_number, 0);
        assert_eq!(header.sample_size, 3);
        assert_eq!(payload, [1u8, 2, 3]);

        // A zero-length in-place commit still consumes a sequence number.
        let next_handle = pool.reserve_slot(0).unwrap();
        assert_eq!(pool.commit_in_place(next_handle, 0).unwrap(), 1);
    }

    /// Asking for the write pointer of a slot that was never reserved fails.
    #[test]
    fn slot_data_ptr_rejects_unreserved_slot() {
        let pool = InMemorySlotAllocator::new(0, 2, 64);
        let unreserved = SlotHandle {
            segment_id: 0,
            slot_index: 0,
        };
        assert_eq!(pool.slot_data_ptr(unreserved), Err(SlotError::OutOfBounds));
    }

    /// An in-place commit longer than the slot capacity is rejected.
    #[test]
    fn commit_in_place_too_large_returns_error() {
        let pool = InMemorySlotAllocator::new(0, 2, 8);
        let handle = pool.reserve_slot(0).unwrap();
        let outcome = pool.commit_in_place(handle, 9);
        assert!(matches!(outcome, Err(SlotError::SampleTooLarge { .. })));
    }

    /// A copying commit larger than the slot reports both sizes.
    #[test]
    fn commit_too_large_returns_error() {
        let pool = InMemorySlotAllocator::new(0, 2, 8);
        let handle = pool.reserve_slot(0).unwrap();
        let err = pool.commit_slot(handle, &[0u8; 16]).unwrap_err();
        assert_eq!(
            err,
            SlotError::SampleTooLarge {
                sample: 16,
                slot_capacity: 8
            }
        );
    }

    /// Discarding a loan makes the slot reservable again.
    #[test]
    fn discard_frees_slot_for_reuse() {
        let pool = InMemorySlotAllocator::new(0, 1, 64);
        let handle = pool.reserve_slot(0).unwrap();
        pool.discard_slot(handle).unwrap();
        let _reused = pool.reserve_slot(0).unwrap();
    }

    /// A committed slot stays blocked until every active reader has read it.
    #[test]
    fn slot_recyclable_after_all_readers_marked() {
        let pool = InMemorySlotAllocator::new(0, 1, 64);
        let active = 0b011;
        let handle = pool.reserve_slot(active).unwrap();
        pool.commit_slot(handle, &[0xAA]).unwrap();

        // Nobody has read yet.
        assert_eq!(pool.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // Reader 0 alone is not enough.
        pool.mark_read(handle, 0).unwrap();
        assert_eq!(pool.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // Reader 1 completes the set.
        pool.mark_read(handle, 1).unwrap();
        let _recycled = pool.reserve_slot(active).unwrap();
    }

    /// The per-slot footprint is rounded up to a multiple of 64 bytes.
    #[test]
    fn slot_total_size_is_cache_line_padded() {
        let pool = InMemorySlotAllocator::new(0, 4, 100);
        assert_eq!(pool.slot_total_size(), 128);
    }

    /// An unread sample older than `max_age` is force-released.
    #[test]
    fn evict_stale_frees_slot_held_by_hung_reader() {
        let pool = InMemorySlotAllocator::new(0, 1, 64);
        let active = 0b1;
        let handle = pool.reserve_slot(active).unwrap();
        pool.commit_slot(handle, &[0xAA]).unwrap();

        assert_eq!(pool.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // A generous age limit evicts nothing.
        let kept = pool
            .evict_stale(core::time::Duration::from_secs(3600), active)
            .unwrap();
        assert_eq!(kept, 0);
        assert_eq!(pool.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // A zero age limit treats the sample as stale immediately.
        let evicted = pool
            .evict_stale(core::time::Duration::ZERO, active)
            .unwrap();
        assert_eq!(evicted, 1);
        let _recycled = pool.reserve_slot(active).unwrap();
    }

    /// Samples that every active reader already consumed are not counted.
    #[test]
    fn evict_stale_leaves_read_slots_untouched() {
        let pool = InMemorySlotAllocator::new(0, 1, 64);
        let active = 0b1;
        let handle = pool.reserve_slot(active).unwrap();
        pool.commit_slot(handle, &[0xAA]).unwrap();
        pool.mark_read(handle, 0).unwrap();

        let evicted = pool
            .evict_stale(core::time::Duration::ZERO, active)
            .unwrap();
        assert_eq!(evicted, 0);
    }

    /// Disconnecting the one reader that never read unblocks the slot.
    #[test]
    fn reader_disconnect_frees_blocked_slots() {
        let pool = InMemorySlotAllocator::new(0, 1, 64);
        let active = 0b011;
        let handle = pool.reserve_slot(active).unwrap();
        pool.commit_slot(handle, &[0xAA]).unwrap();

        // Reader 0 has read; reader 1 still blocks reuse.
        pool.mark_read(handle, 0).unwrap();
        assert_eq!(pool.reserve_slot(active), Err(SlotError::NoFreeSlot));

        pool.mark_reader_disconnected(1).unwrap();
        let _recycled = pool.reserve_slot(active).expect("free after disconnect");
    }
}