use std::collections::HashMap;
use std::sync::RwLock;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc;
use tracing::{debug, info, warn};
const CHANNEL_CAPACITY: usize = 1_024;
pub type ArrayFrame = Vec<u8>;
pub struct ArrayDeliveryRegistry {
sessions: RwLock<HashMap<String, mpsc::Sender<ArrayFrame>>>,
pub sessions_registered: AtomicU64,
pub frames_dropped: AtomicU64,
}
impl Default for ArrayDeliveryRegistry {
fn default() -> Self {
Self::new()
}
}
impl ArrayDeliveryRegistry {
pub fn new() -> Self {
Self {
sessions: RwLock::new(HashMap::new()),
sessions_registered: AtomicU64::new(0),
frames_dropped: AtomicU64::new(0),
}
}
pub fn register(&self, session_id: String) -> mpsc::Receiver<ArrayFrame> {
let (tx, rx) = mpsc::channel(CHANNEL_CAPACITY);
let mut sessions = self.sessions.write().unwrap_or_else(|p| p.into_inner());
sessions.insert(session_id.clone(), tx);
self.sessions_registered.fetch_add(1, Ordering::Relaxed);
info!(session = %session_id, "array_delivery: session registered");
rx
}
pub fn unregister(&self, session_id: &str) {
let mut sessions = self.sessions.write().unwrap_or_else(|p| p.into_inner());
if sessions.remove(session_id).is_some() {
debug!(session = %session_id, "array_delivery: session unregistered");
}
}
pub fn enqueue(&self, session_id: &str, frame: ArrayFrame) {
let sessions = self.sessions.read().unwrap_or_else(|p| p.into_inner());
if let Some(tx) = sessions.get(session_id) {
match tx.try_send(frame) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
self.frames_dropped.fetch_add(1, Ordering::Relaxed);
warn!(
session = %session_id,
"array_delivery: channel full — frame dropped; Lite will catch up via snapshot"
);
}
Err(mpsc::error::TrySendError::Closed(_)) => {
debug!(session = %session_id, "array_delivery: session channel closed (disconnected)");
}
}
}
}
pub fn active_sessions(&self) -> usize {
let sessions = self.sessions.read().unwrap_or_else(|p| p.into_inner());
sessions.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn register_and_receive() {
let reg = ArrayDeliveryRegistry::new();
let mut rx = reg.register("s1".into());
reg.enqueue("s1", vec![1, 2, 3]);
let frame = rx.recv().await.expect("should receive frame");
assert_eq!(frame, vec![1, 2, 3]);
}
#[tokio::test]
async fn unregister_drops_sender() {
let reg = ArrayDeliveryRegistry::new();
let mut rx = reg.register("s1".into());
reg.unregister("s1");
reg.enqueue("s1", vec![9]); assert_eq!(rx.recv().await, None);
}
#[test]
fn enqueue_unknown_session_is_noop() {
let reg = ArrayDeliveryRegistry::new();
reg.enqueue("ghost", vec![0]); assert_eq!(reg.frames_dropped.load(Ordering::Relaxed), 0);
}
}