use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
/// Slot count used by `strand_registry()` when the `SEQ_STRAND_REGISTRY_SIZE`
/// environment variable is unset or does not parse as a `usize`.
const DEFAULT_REGISTRY_SIZE: usize = 1024;
/// One entry of the strand registry.
///
/// Both fields start at 0 (see `StrandSlot::new`). `StrandRegistry` claims a
/// slot by compare-and-swapping `strand_id` away from 0, so 0 doubles as the
/// "slot is free" marker.
pub struct StrandSlot {
/// Id of the strand occupying this slot, or 0 when the slot is free.
pub strand_id: AtomicU64,
/// Timestamp written by `StrandRegistry::register`, in whole seconds since
/// the Unix epoch; reset to 0 by `StrandRegistry::unregister`.
pub spawn_time: AtomicU64,
}
impl StrandSlot {
    /// Creates a free slot: both the strand id and the spawn time are 0.
    ///
    /// `const` so that it can also be evaluated at compile time.
    const fn new() -> Self {
        let strand_id = AtomicU64::new(0);
        let spawn_time = AtomicU64::new(0);
        Self {
            strand_id,
            spawn_time,
        }
    }
}
/// Fixed-capacity table of `StrandSlot`s plus a counter of registrations
/// that could not be stored because every slot was taken.
pub struct StrandRegistry {
// Slot storage; its length is fixed when the registry is constructed.
slots: Box<[StrandSlot]>,
/// Number of `register` calls that found no free slot.
pub overflow_count: AtomicU64,
}
impl StrandRegistry {
pub(super) fn new(capacity: usize) -> Self {
let mut slots = Vec::with_capacity(capacity);
for _ in 0..capacity {
slots.push(StrandSlot::new());
}
Self {
slots: slots.into_boxed_slice(),
overflow_count: AtomicU64::new(0),
}
}
pub fn register(&self, strand_id: u64) -> Option<usize> {
let spawn_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
for (idx, slot) in self.slots.iter().enumerate() {
slot.spawn_time.store(spawn_time, Ordering::Relaxed);
if slot
.strand_id
.compare_exchange(0, strand_id, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
return Some(idx);
}
}
self.overflow_count.fetch_add(1, Ordering::Relaxed);
None
}
pub fn unregister(&self, strand_id: u64) -> bool {
for slot in self.slots.iter() {
if slot
.strand_id
.compare_exchange(strand_id, 0, Ordering::AcqRel, Ordering::Relaxed)
.is_ok()
{
slot.spawn_time.store(0, Ordering::Release);
return true;
}
}
false
}
pub fn active_strands(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
self.slots.iter().filter_map(|slot| {
let id = slot.strand_id.load(Ordering::Acquire);
if id > 0 {
let time = slot.spawn_time.load(Ordering::Relaxed);
Some((id, time))
} else {
None
}
})
}
pub fn capacity(&self) -> usize {
self.slots.len()
}
}
/// Process-wide registry instance, created on the first call to
/// `strand_registry()` and reused by every later call.
static STRAND_REGISTRY: OnceLock<StrandRegistry> = OnceLock::new();
/// Returns the process-wide strand registry, creating it on first use.
///
/// The capacity is read once, at initialisation, from the
/// `SEQ_STRAND_REGISTRY_SIZE` environment variable. If the variable is
/// missing, is not valid Unicode, or does not parse as a `usize`,
/// `DEFAULT_REGISTRY_SIZE` is used instead.
pub fn strand_registry() -> &'static StrandRegistry {
    STRAND_REGISTRY.get_or_init(|| {
        let capacity = match std::env::var("SEQ_STRAND_REGISTRY_SIZE") {
            Ok(raw) => raw.parse::<usize>().unwrap_or(DEFAULT_REGISTRY_SIZE),
            Err(_) => DEFAULT_REGISTRY_SIZE,
        };
        StrandRegistry::new(capacity)
    })
}