use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use arc_swap::ArcSwap;
use parking_lot::Mutex;
use tokio::sync::mpsc;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TapStage {
Raw,
Decoded,
}
#[derive(Debug)]
pub enum TapPayload {
Raw(String),
Decoded(Box<serde_json::Value>),
}
pub(crate) struct TapSession {
id: u64,
pub(crate) stage: TapStage,
tx: mpsc::Sender<TapPayload>,
captured: Arc<AtomicU64>,
dropped: Arc<AtomicU64>,
}
impl TapSession {
pub(crate) fn offer(&self, payload: TapPayload) {
match self.tx.try_send(payload) {
Ok(()) => {
self.captured.fetch_add(1, Ordering::Relaxed);
}
Err(_) => {
self.dropped.fetch_add(1, Ordering::Relaxed);
}
}
}
}
pub struct TapSessionHandle {
id: u64,
registry: Arc<TapRegistry>,
pub rx: mpsc::Receiver<TapPayload>,
pub captured: Arc<AtomicU64>,
pub dropped: Arc<AtomicU64>,
}
impl Drop for TapSessionHandle {
fn drop(&mut self) {
self.registry.deregister(self.id);
}
}
pub struct TapRegistry {
buffer_events: usize,
max_sessions: usize,
max_duration: Duration,
next_id: AtomicU64,
sessions: Mutex<Vec<Arc<TapSession>>>,
snapshot: ArcSwap<Vec<Arc<TapSession>>>,
}
impl TapRegistry {
pub fn new(buffer_events: usize, max_sessions: usize, max_duration: Duration) -> Arc<Self> {
Arc::new(Self {
buffer_events: buffer_events.max(1),
max_sessions,
max_duration,
next_id: AtomicU64::new(0),
sessions: Mutex::new(Vec::new()),
snapshot: ArcSwap::from_pointee(Vec::new()),
})
}
pub fn max_duration(&self) -> Duration {
self.max_duration
}
pub fn active_sessions(&self) -> usize {
self.snapshot.load().len()
}
pub fn register(self: &Arc<Self>, stage: TapStage) -> Option<TapSessionHandle> {
let mut sessions = self.sessions.lock();
if sessions.len() >= self.max_sessions {
return None;
}
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let (tx, rx) = mpsc::channel(self.buffer_events);
let captured = Arc::new(AtomicU64::new(0));
let dropped = Arc::new(AtomicU64::new(0));
sessions.push(Arc::new(TapSession {
id,
stage,
tx,
captured: captured.clone(),
dropped: dropped.clone(),
}));
self.publish(&sessions);
Some(TapSessionHandle {
id,
registry: self.clone(),
rx,
captured,
dropped,
})
}
fn deregister(&self, id: u64) {
let mut sessions = self.sessions.lock();
let before = sessions.len();
sessions.retain(|s| s.id != id);
if sessions.len() != before {
self.publish(&sessions);
}
}
fn publish(&self, sessions: &[Arc<TapSession>]) {
self.snapshot.store(Arc::new(sessions.to_vec()));
}
pub(crate) fn sessions_snapshot(&self) -> arc_swap::Guard<Arc<Vec<Arc<TapSession>>>> {
self.snapshot.load()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn register_respects_session_cap() {
let reg = TapRegistry::new(8, 2, Duration::from_secs(30));
let a = reg.register(TapStage::Decoded);
let b = reg.register(TapStage::Raw);
assert!(a.is_some());
assert!(b.is_some());
assert_eq!(reg.active_sessions(), 2);
assert!(reg.register(TapStage::Decoded).is_none(), "third over cap");
}
#[tokio::test]
async fn dropping_handle_deregisters_session() {
let reg = TapRegistry::new(8, 2, Duration::from_secs(30));
let handle = reg.register(TapStage::Decoded).expect("registered");
assert_eq!(reg.active_sessions(), 1);
drop(handle);
assert_eq!(reg.active_sessions(), 0);
assert!(reg.register(TapStage::Raw).is_some());
}
#[tokio::test]
async fn offer_delivers_until_full_then_drops() {
let reg = TapRegistry::new(2, 1, Duration::from_secs(30));
let handle = reg.register(TapStage::Raw).expect("registered");
let snapshot = reg.sessions_snapshot();
let session = &snapshot[0];
session.offer(TapPayload::Raw("a".into()));
session.offer(TapPayload::Raw("b".into()));
session.offer(TapPayload::Raw("c".into()));
assert_eq!(handle.captured.load(Ordering::Relaxed), 2);
assert_eq!(handle.dropped.load(Ordering::Relaxed), 1);
}
}