use crossbeam_channel::{Receiver, Sender, bounded};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Copy, Clone)]
struct SlotPtr(*mut u8);
unsafe impl Send for SlotPtr {}
unsafe impl Sync for SlotPtr {}
pub struct Round {
pub slot_id: u32,
ptr: *const u8,
pub len: usize,
pub skip: bool,
pub file_index: u64,
pub fdata_offset: u64,
pub chunk_seq: u32,
}
unsafe impl Send for Round {}
impl Round {
pub unsafe fn as_slice<'a>(&self) -> &'a [u8] {
unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
}
}
#[derive(Clone)]
pub struct Ejector {
inner: Arc<EjectorInner>,
}
struct EjectorInner {
free_tx: Sender<u32>,
outstanding: Vec<AtomicUsize>,
}
impl Ejector {
pub fn release_one(&self, slot_id: u32) {
let prev = self.inner.outstanding[slot_id as usize].fetch_sub(1, Ordering::AcqRel);
debug_assert!(prev >= 1, "release_one underflow on slot {slot_id}");
if prev == 1 {
self.inner.free_tx.send(slot_id).ok();
}
}
}
pub struct Magazine {
slot_size: usize,
slice_size: usize,
base: Vec<SlotPtr>,
_mem: Vec<Box<[u8]>>, free_rx: Receiver<u32>,
ret: Ejector,
}
impl Magazine {
pub fn new(num_slots: usize, slot_size: usize, num_workers: usize) -> Self {
assert!(num_slots > 0 && slot_size > 0 && num_workers > 0);
let slice_size = (slot_size / num_workers).max(1);
let mut mem: Vec<Box<[u8]>> = (0..num_slots)
.map(|_| vec![0u8; slot_size].into_boxed_slice())
.collect();
let base: Vec<SlotPtr> = mem.iter_mut().map(|b| SlotPtr(b.as_mut_ptr())).collect();
let (free_tx, free_rx) = bounded(num_slots);
for id in 0..num_slots as u32 {
free_tx.send(id).expect("free channel send during init");
}
let outstanding = (0..num_slots).map(|_| AtomicUsize::new(0)).collect();
Magazine {
slot_size,
slice_size,
base,
_mem: mem,
free_rx,
ret: Ejector { inner: Arc::new(EjectorInner { free_tx, outstanding }) },
}
}
pub fn slot_size(&self) -> usize {
self.slot_size
}
pub fn slice_size(&self) -> usize {
self.slice_size
}
pub fn num_slots(&self) -> usize {
self.base.len()
}
pub fn returner(&self) -> Ejector {
self.ret.clone()
}
pub fn claim(&self) -> Option<Clip<'_>> {
let slot_id = self.free_rx.recv().ok()?;
Some(Clip { pool: self, slot_id, cursor: 0, slices: Vec::new() })
}
}
pub struct Clip<'a> {
pool: &'a Magazine,
slot_id: u32,
cursor: usize,
slices: Vec<Round>,
}
impl<'a> Clip<'a> {
pub fn slot_id(&self) -> u32 {
self.slot_id
}
pub fn remaining(&self) -> usize {
self.pool.slot_size - self.cursor
}
pub fn writable(&mut self, max: usize) -> &mut [u8] {
let n = max.min(self.remaining());
let base = self.pool.base[self.slot_id as usize].0;
unsafe { std::slice::from_raw_parts_mut(base.add(self.cursor), n) }
}
pub fn commit_slice(
&mut self,
len: usize,
skip: bool,
file_index: u64,
fdata_offset: u64,
chunk_seq: u32,
) {
debug_assert!(len <= self.remaining());
let base = self.pool.base[self.slot_id as usize].0 as *const u8;
let ptr = unsafe { base.add(self.cursor) };
self.slices.push(Round {
slot_id: self.slot_id,
ptr,
len,
skip,
file_index,
fdata_offset,
chunk_seq,
});
self.cursor += len;
}
pub fn slice_count(&self) -> usize {
self.slices.len()
}
#[must_use]
pub fn publish(self) -> Vec<Round> {
let n = self.slices.len();
if n == 0 {
self.pool.ret.inner.free_tx.send(self.slot_id).ok();
return Vec::new();
}
self.pool.ret.inner.outstanding[self.slot_id as usize].store(n, Ordering::Release);
self.slices
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn slot_returns_only_after_last_slice() {
let pool = Magazine::new(2, 64, 4);
assert_eq!(pool.slice_size(), 16);
let ret = pool.returner();
let mut a = pool.claim().unwrap();
let _b = pool.claim().unwrap();
let n = { a.writable(10).len().min(10) };
a.commit_slice(n, false, 0, 0, 0);
let n2 = { a.writable(20).len().min(20) };
a.commit_slice(n2, false, 1, 0, 0);
let slices = a.publish();
assert_eq!(slices.len(), 2);
let slot_a = slices[0].slot_id;
ret.release_one(slot_a);
assert!(pool.claim_now().is_none(), "slot freed too early");
ret.release_one(slot_a);
assert!(pool.claim_now().is_some(), "slot not freed after last slice");
}
#[test]
fn writable_clamps_to_remaining() {
let pool = Magazine::new(1, 32, 4);
let mut f = pool.claim().unwrap();
assert_eq!(f.writable(1000).len(), 32);
f.commit_slice(30, false, 0, 0, 0);
assert_eq!(f.remaining(), 2);
assert_eq!(f.writable(1000).len(), 2);
}
impl Magazine {
fn claim_now(&self) -> Option<u32> {
self.free_rx.try_recv().ok()
}
}
}