use alloc::sync::Arc;
use alloc::vec::Vec;

#[cfg(feature = "std")]
use std::sync::Mutex;

use crate::slot::{ReaderMask, SLOT_HEADER_SIZE, SlotHeader};

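/// Identifies a single slot within a specific segment.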
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SlotHandle {
    /// Identifier of the segment that owns the slot.
    pub segment_id: u64,
    /// Zero-based index of the slot within the segment.
    pub slot_index: u32,
}

/// Errors returned by slot reservation, commit, and read operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SlotError {
    /// Every slot is loaned out or still unread by an active reader.
    NoFreeSlot,
    /// The handle's slot index is outside this segment.
    OutOfBounds,
    /// The sample is larger than a slot's payload capacity.
    SampleTooLarge {
        /// Size of the rejected sample in bytes.
        sample: usize,
        /// Maximum payload size of a slot in bytes.
        slot_capacity: usize,
    },
    /// The internal mutex was poisoned by a thread that panicked.
    LockPoisoned,
}

impl core::fmt::Display for SlotError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::NoFreeSlot => f.write_str("no free slot in segment"),
            Self::OutOfBounds => f.write_str("slot index out of bounds"),
            Self::SampleTooLarge {
                sample,
                slot_capacity,
            } => write!(
                f,
                "sample of {sample} bytes does not fit in slot capacity {slot_capacity}"
            ),
            Self::LockPoisoned => f.write_str("slot lock poisoned"),
        }
    }
}

#[cfg(feature = "std")]
impl std::error::Error for SlotError {}

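/// Heap-backed, [`Mutex`]-protected slot pool; the in-process
/// `SlotBackend` implementation available with the `std` feature.
///
/// A minimal write/read sketch (error handling elided, import path
/// illustrative):
///
/// ```ignore
/// let alloc = InMemorySlotAllocator::new(0, 4, 64); // segment 0, 4 slots, 64 B each
/// let handle = alloc.reserve_slot(0).unwrap();      // no active readers yet
/// let sn = alloc.commit_slot(handle, &[1, 2, 3]).unwrap();
/// let (header, bytes) = alloc.read_slot(handle).unwrap();
/// assert_eq!(sn, 0);
/// assert_eq!(bytes, [1, 2, 3]);
/// ```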
#[cfg(feature = "std")]
pub struct InMemorySlotAllocator {
    slots: Arc<Mutex<Vec<Slot>>>,
    segment_id: u64,
    slot_capacity: usize,
    next_sn: Arc<core::sync::atomic::AtomicU32>,
    /// Optional 16-byte hash identifying the payload type stored here.
    type_hash: Option<[u8; 16]>,
}

/// One pooled slot: header metadata plus a fixed-capacity payload buffer.
#[derive(Debug, Clone)]
struct Slot {
    header: SlotHeader,
    data: Vec<u8>,
    /// Set while a writer holds the slot between reserve and commit/discard.
    loaned: bool,
}

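// The trait impl forwards to the inherent methods below so the allocator
// can be used directly or through the `SlotBackend` abstraction.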
#[cfg(feature = "std")]
impl crate::backend::SlotBackend for InMemorySlotAllocator {
    fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
        Self::reserve_slot(self, active_readers_mask)
    }
    fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
        Self::commit_slot(self, handle, bytes)
    }
    fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
        Self::discard_slot(self, handle)
    }
    fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
        Self::read_slot(self, handle)
    }
    fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
        Self::mark_read(self, handle, reader_index)
    }
    fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
        Self::mark_reader_disconnected(self, reader_index)
    }
    fn slot_count(&self) -> Result<usize, SlotError> {
        Self::slot_count(self)
    }
    fn slot_total_size(&self) -> usize {
        Self::slot_total_size(self)
    }
    fn slot_capacity(&self) -> usize {
        Self::slot_capacity(self)
    }
    fn type_hash(&self) -> Option<[u8; 16]> {
        self.type_hash
    }
}

#[cfg(feature = "std")]
impl InMemorySlotAllocator {
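    /// Creates an allocator with `slot_count` zero-filled slots, each
    /// holding up to `slot_capacity` payload bytes.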
    #[must_use]
    pub fn new(segment_id: u64, slot_count: usize, slot_capacity: usize) -> Self {
        let mut slots = Vec::with_capacity(slot_count);
        for _ in 0..slot_count {
            slots.push(Slot {
                header: SlotHeader::new(0, 0),
                data: alloc::vec![0u8; slot_capacity],
                loaned: false,
            });
        }
        Self {
            slots: Arc::new(Mutex::new(slots)),
            segment_id,
            slot_capacity,
            next_sn: Arc::new(core::sync::atomic::AtomicU32::new(0)),
            type_hash: None,
        }
    }

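    /// Sets the optional 16-byte type hash and returns the allocator,
    /// builder-style.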
    #[must_use]
    pub fn with_type_hash(mut self, hash: [u8; 16]) -> Self {
        self.type_hash = Some(hash);
        self
    }

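    /// Reserves the first slot that is free for writing and marks it loaned.
    ///
    /// A slot is reusable when it is not loaned and either was never
    /// committed (`sample_size == 0`) or has been read by every reader in
    /// `active_readers_mask`.
    ///
    /// # Errors
    ///
    /// Returns [`SlotError::NoFreeSlot`] when no slot qualifies and
    /// [`SlotError::LockPoisoned`] when the internal mutex is poisoned.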
    pub fn reserve_slot(&self, active_readers_mask: ReaderMask) -> Result<SlotHandle, SlotError> {
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        for (idx, slot) in slots.iter_mut().enumerate() {
            if slot.loaned {
                continue;
            }
            // Recyclable: never committed, or already read by every active reader.
            if slot.header.sample_size == 0 || slot.header.all_read(active_readers_mask) {
                slot.loaned = true;
                return Ok(SlotHandle {
                    segment_id: self.segment_id,
                    slot_index: idx as u32,
                });
            }
        }
        Err(SlotError::NoFreeSlot)
    }

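    /// Copies `bytes` into the slot, stamps it with the next sequence
    /// number, and releases the writer's loan. Returns the sequence number.
    ///
    /// # Errors
    ///
    /// Returns [`SlotError::SampleTooLarge`] if `bytes` exceeds the slot
    /// capacity, [`SlotError::OutOfBounds`] for an invalid handle, and
    /// [`SlotError::LockPoisoned`] if the mutex is poisoned.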
    pub fn commit_slot(&self, handle: SlotHandle, bytes: &[u8]) -> Result<u32, SlotError> {
        if bytes.len() > self.slot_capacity {
            return Err(SlotError::SampleTooLarge {
                sample: bytes.len(),
                slot_capacity: self.slot_capacity,
            });
        }
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let idx = handle.slot_index as usize;
        if idx >= slots.len() {
            return Err(SlotError::OutOfBounds);
        }
        let sn = self
            .next_sn
            .fetch_add(1, core::sync::atomic::Ordering::Relaxed);
        let slot = &mut slots[idx];
        // The length is already bounded by `slot_capacity`; saturate defensively.
        let sample_size = u32::try_from(bytes.len()).unwrap_or(u32::MAX);
        slot.header = SlotHeader::new(sn, sample_size);
        slot.data[..bytes.len()].copy_from_slice(bytes);
        slot.loaned = false;
        Ok(sn)
    }

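    /// Releases a reserved slot without committing it, making it
    /// immediately reservable again.
    ///
    /// # Errors
    ///
    /// Returns [`SlotError::OutOfBounds`] for an invalid handle and
    /// [`SlotError::LockPoisoned`] if the mutex is poisoned.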
    pub fn discard_slot(&self, handle: SlotHandle) -> Result<(), SlotError> {
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let idx = handle.slot_index as usize;
        if idx >= slots.len() {
            return Err(SlotError::OutOfBounds);
        }
        slots[idx].loaned = false;
        Ok(())
    }

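    /// Returns a copy of the slot's header together with its payload bytes.
    ///
    /// # Errors
    ///
    /// Returns [`SlotError::OutOfBounds`] for an invalid handle and
    /// [`SlotError::LockPoisoned`] if the mutex is poisoned.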
    pub fn read_slot(&self, handle: SlotHandle) -> Result<(SlotHeader, Vec<u8>), SlotError> {
        let slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let idx = handle.slot_index as usize;
        if idx >= slots.len() {
            return Err(SlotError::OutOfBounds);
        }
        let slot = &slots[idx];
        // Clamp to the buffer length in case the header claims a larger size.
        let n = slot.header.sample_size as usize;
        Ok((slot.header, slot.data[..n.min(slot.data.len())].to_vec()))
    }

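    /// Marks the slot as read by `reader_index`; once every active reader
    /// has done so, the slot becomes reusable by `reserve_slot`.
    ///
    /// # Errors
    ///
    /// Returns [`SlotError::OutOfBounds`] for an invalid handle and
    /// [`SlotError::LockPoisoned`] if the mutex is poisoned.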
    pub fn mark_read(&self, handle: SlotHandle, reader_index: u8) -> Result<(), SlotError> {
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        let idx = handle.slot_index as usize;
        if idx >= slots.len() {
            return Err(SlotError::OutOfBounds);
        }
        slots[idx].header.mark_read(reader_index);
        Ok(())
    }

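    /// Marks `reader_index` as having read every slot in the segment.
    ///
    /// Intended for reader disconnects: slots the reader never read would
    /// otherwise stay blocked forever. `reader_index` must be below 32,
    /// the width of the reader mask.
    ///
    /// # Errors
    ///
    /// Returns [`SlotError::LockPoisoned`] if the mutex is poisoned.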
    pub fn mark_reader_disconnected(&self, reader_index: u8) -> Result<(), SlotError> {
        debug_assert!(reader_index < 32);
        let mut slots = self.slots.lock().map_err(|_| SlotError::LockPoisoned)?;
        for slot in slots.iter_mut() {
            slot.header.reader_mask |= 1u32 << reader_index;
        }
        Ok(())
    }

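    /// Maximum payload size in bytes that a single slot can hold.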
    #[must_use]
    pub fn slot_capacity(&self) -> usize {
        self.slot_capacity
    }

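    /// Returns the number of slots in the segment.
    ///
    /// # Errors
    ///
    /// Returns [`SlotError::LockPoisoned`] if the mutex is poisoned.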
    pub fn slot_count(&self) -> Result<usize, SlotError> {
        Ok(self
            .slots
            .lock()
            .map_err(|_| SlotError::LockPoisoned)?
            .len())
    }

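    /// Size of one slot including its header, rounded up to a multiple of
    /// the 64-byte cache line.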
    #[must_use]
    pub fn slot_total_size(&self) -> usize {
        let raw = SLOT_HEADER_SIZE + self.slot_capacity;
        // Round up to a multiple of 64 bytes.
        (raw + 63) & !63
    }
}

#[cfg(all(test, feature = "std"))]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn reserve_returns_first_free_slot() {
        let alloc = InMemorySlotAllocator::new(0, 4, 64);
        let h0 = alloc.reserve_slot(0).expect("reserve 0");
        assert_eq!(h0.slot_index, 0);
        let h1 = alloc.reserve_slot(0).expect("reserve 1");
        assert_eq!(h1.slot_index, 1);
    }

    #[test]
    fn reserve_returns_no_free_slot_when_all_loaned() {
        let alloc = InMemorySlotAllocator::new(0, 2, 64);
        let _h0 = alloc.reserve_slot(0).unwrap();
        let _h1 = alloc.reserve_slot(0).unwrap();
        assert_eq!(alloc.reserve_slot(0), Err(SlotError::NoFreeSlot));
    }

    #[test]
    fn commit_writes_bytes_and_increments_sn() {
        let alloc = InMemorySlotAllocator::new(0, 2, 64);
        let h = alloc.reserve_slot(0).unwrap();
        let sn = alloc.commit_slot(h, &[1, 2, 3]).unwrap();
        assert_eq!(sn, 0);
        let (header, bytes) = alloc.read_slot(h).unwrap();
        assert_eq!(header.sequence_number, 0);
        assert_eq!(header.sample_size, 3);
        assert_eq!(bytes, vec![1, 2, 3]);

        let h2 = alloc.reserve_slot(0).unwrap();
        let sn2 = alloc.commit_slot(h2, &[9]).unwrap();
        assert_eq!(sn2, 1);
    }

    #[test]
    fn commit_too_large_returns_error() {
        let alloc = InMemorySlotAllocator::new(0, 2, 8);
        let h = alloc.reserve_slot(0).unwrap();
        let err = alloc.commit_slot(h, &[0u8; 16]).unwrap_err();
        assert!(matches!(
            err,
            SlotError::SampleTooLarge {
                sample: 16,
                slot_capacity: 8
            }
        ));
    }

    #[test]
    fn discard_frees_slot_for_reuse() {
        let alloc = InMemorySlotAllocator::new(0, 1, 64);
        let h = alloc.reserve_slot(0).unwrap();
        alloc.discard_slot(h).unwrap();
        // The discarded slot is immediately reservable again.
        let _ = alloc.reserve_slot(0).unwrap();
    }

    #[test]
    fn slot_recyclable_after_all_readers_marked() {
        let alloc = InMemorySlotAllocator::new(0, 1, 64);
        // Readers 0 and 1 are active.
        let active = 0b011;
        let h = alloc.reserve_slot(active).unwrap();
        alloc.commit_slot(h, &[0xAA]).unwrap();

        assert_eq!(alloc.reserve_slot(active), Err(SlotError::NoFreeSlot));

        alloc.mark_read(h, 0).unwrap();
        assert_eq!(alloc.reserve_slot(active), Err(SlotError::NoFreeSlot));

        alloc.mark_read(h, 1).unwrap();
        let _ = alloc.reserve_slot(active).unwrap();
    }

    #[test]
    fn slot_total_size_is_cache_line_padded() {
        let alloc = InMemorySlotAllocator::new(0, 4, 100);
        // SLOT_HEADER_SIZE + 100 rounds up to two 64-byte cache lines.
        assert_eq!(alloc.slot_total_size(), 128);
    }

    #[test]
    fn reader_disconnect_frees_blocked_slots() {
        let alloc = InMemorySlotAllocator::new(0, 1, 64);
        // Readers 0 and 1 are active; only reader 0 ever reads.
        let active = 0b011;
        let h = alloc.reserve_slot(active).unwrap();
        alloc.commit_slot(h, &[0xAA]).unwrap();

        alloc.mark_read(h, 0).unwrap();
        assert_eq!(alloc.reserve_slot(active), Err(SlotError::NoFreeSlot));

        // Reader 1 disconnects without reading; its mask bit is force-set.
        alloc.mark_reader_disconnected(1).unwrap();
        let _ = alloc.reserve_slot(active).expect("free after disconnect");
    }
}