use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use bytes::Bytes;
use crate::NodeId;
use crate::id::NetId64;
use crate::typed::OrbitTyped;
pub mod cursor;
#[cfg(unix)]
pub mod shm;
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
pub id: NetId64,
pub kind: u8,
pub ver: u64,
pub payload: Bytes,
}
pub struct Ring {
kind: u8,
capacity: usize,
write_pos: AtomicU64,
slots: Vec<RwLock<Option<Frame>>>,
}
impl Ring {
pub fn new<T: OrbitTyped>(capacity: usize) -> Self {
assert!(capacity > 0, "Ring capacity must be > 0");
let mut slots = Vec::with_capacity(capacity);
for _ in 0..capacity {
slots.push(RwLock::new(None));
}
Self {
kind: T::KIND,
capacity,
write_pos: AtomicU64::new(0),
slots,
}
}
pub fn kind(&self) -> u8 {
self.kind
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn head(&self) -> u64 {
self.write_pos.load(Ordering::Acquire)
}
pub fn write(&self, node_id: NodeId, frame_kind: u8, ver: u64, payload: Bytes) -> NetId64 {
let counter = self.write_pos.fetch_add(1, Ordering::AcqRel);
let id = NetId64::make(self.kind, node_id.get(), counter);
let slot_idx = (counter as usize) % self.capacity;
let frame = Frame {
id,
kind: frame_kind,
ver,
payload,
};
let mut guard = self.slots[slot_idx].write().expect("ring slot poisoned");
*guard = Some(frame);
id
}
pub fn read(&self, id: NetId64) -> Option<Frame> {
if id.kind() != self.kind {
return None;
}
let slot_idx = (id.counter() as usize) % self.capacity;
let guard = self.slots[slot_idx].read().expect("ring slot poisoned");
match &*guard {
Some(f) if f.id == id => Some(f.clone()),
_ => None,
}
}
pub fn read_head(&self) -> Option<Frame> {
let head = self.head();
if head == 0 {
return None;
}
let slot_idx = ((head - 1) as usize) % self.capacity;
self.slots[slot_idx]
.read()
.expect("ring slot poisoned")
.clone()
}
pub fn read_at(&self, counter: u64) -> Option<Frame> {
let slot_idx = (counter as usize) % self.capacity;
self.slots[slot_idx]
.read()
.expect("ring slot poisoned")
.clone()
}
pub fn reset(&self) {
for slot in &self.slots {
*slot.write().expect("ring slot poisoned") = None;
}
self.write_pos.store(0, Ordering::Release);
}
}
impl cursor::RingFrameSource for Ring {
fn kind(&self) -> u8 {
Ring::kind(self)
}
fn head(&self) -> u64 {
Ring::head(self)
}
fn capacity(&self) -> usize {
Ring::capacity(self)
}
fn read_at(&self, counter: u64) -> Option<Frame> {
Ring::read_at(self, counter)
}
}
#[cfg(unix)]
impl cursor::RingFrameSource for shm::ShmRing {
fn kind(&self) -> u8 {
shm::ShmRing::kind(self)
}
fn head(&self) -> u64 {
shm::ShmRing::head(self)
}
fn capacity(&self) -> usize {
shm::ShmRing::capacity(self)
}
fn read_at(&self, counter: u64) -> Option<Frame> {
shm::ShmRing::read_at(self, counter)
}
}
impl std::fmt::Debug for Ring {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Ring")
.field("kind", &self.kind)
.field("capacity", &self.capacity)
.field("head", &self.head())
.finish()
}
}
pub(crate) struct RingRegistry {
rings: dashmap::DashMap<u8, Arc<Ring>>,
}
impl RingRegistry {
pub fn new() -> Self {
Self {
rings: dashmap::DashMap::new(),
}
}
pub fn get_or_create<T: OrbitTyped>(&self, capacity: usize) -> Arc<Ring> {
self.rings
.entry(T::KIND)
.or_insert_with(|| Arc::new(Ring::new::<T>(capacity)))
.clone()
}
pub fn lookup(&self, kind: u8) -> Option<Arc<Ring>> {
self.rings.get(&kind).map(|e| e.clone())
}
}