use crate::bus::StreamEvent;
use crate::observe::Observer;
use crate::{MediaFrame, StreamKey};
use async_trait::async_trait;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
/// The kind of occurrence captured by an [`AuditRecord`].
///
/// Each unit variant corresponds to one lifecycle hook of the `Observer`
/// implementation in this module; [`AuditAction::Event`] carries free-form text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuditAction {
    /// Emitted from `on_publish_started`.
    PublishStarted,
    /// Emitted from `on_publish_ended`.
    PublishEnded,
    /// Emitted from `on_subscriber_evicted`.
    SubscriberEvicted,
    /// Emitted from `on_rate_limited`.
    RateLimited,
    /// Emitted from `on_stream_reaped`.
    StreamReaped,
    /// A generic stream event. When produced by `on_event` in this module the
    /// payload is the `Debug` rendering of the event's `kind`.
    Event(String),
}
/// A single entry handed to an [`AuditSink`].
#[derive(Debug, Clone)]
pub struct AuditRecord {
    /// Timestamp of the record. `AuditPipeline` fills this from
    /// `crate::bus::now_ms()`; presumably milliseconds, as the name suggests —
    /// the reference epoch is not visible from this module.
    pub ts_ms: u64,
    /// What happened.
    pub action: AuditAction,
    /// Name of the application the action belongs to.
    pub app: String,
    /// Stream identifier, when the originating hook supplied one
    /// (the publish started/ended hooks in this module do not).
    pub stream: Option<String>,
}
/// Destination for audit records.
///
/// `AuditPipeline` owns a background task that awaits [`AuditSink::write`]
/// once per record, in the order the records were received from its channel.
#[async_trait]
pub trait AuditSink: Send + Sync + 'static {
    /// Persists (or otherwise consumes) one audit record.
    ///
    /// The method has no return value, so an implementation has no way to
    /// report a failure back to the pipeline.
    async fn write(&self, record: AuditRecord);
}
/// Non-blocking front end that queues audit records for an [`AuditSink`].
///
/// Records are pushed onto a bounded channel with `try_send`; a record that
/// cannot be queued is discarded and counted instead of blocking the caller.
pub struct AuditPipeline {
    /// Sending half of the bounded channel drained by the background worker.
    tx: mpsc::Sender<AuditRecord>,
    /// Number of records that could not be queued.
    dropped: Arc<AtomicU64>,
}
impl AuditPipeline {
    /// Builds a pipeline that forwards audit records to `sink` from a
    /// background task.
    ///
    /// A bounded channel with `capacity` slots (raised to 1 when 0 is passed)
    /// sits between the emitting hooks and the spawned worker. The worker
    /// awaits `sink.write` one record at a time and finishes once `recv`
    /// yields `None`.
    ///
    /// # Panics
    ///
    /// The worker is started with `tokio::spawn`, which panics when called
    /// outside of a Tokio runtime.
    pub fn new(sink: Arc<dyn AuditSink>, capacity: usize) -> Self {
        // Tokio's bounded channel does not accept a zero capacity.
        let slots = std::cmp::max(capacity, 1);
        let (tx, mut rx) = mpsc::channel::<AuditRecord>(slots);

        let worker = async move {
            while let Some(next) = rx.recv().await {
                sink.write(next).await;
            }
        };
        // The join handle is not kept: the worker runs detached.
        tokio::spawn(worker);

        let dropped = Arc::new(AtomicU64::new(0));
        Self { tx, dropped }
    }

    /// Returns how many records could not be queued so far.
    pub fn dropped(&self) -> u64 {
        let counter: &AtomicU64 = &self.dropped;
        counter.load(Ordering::Relaxed)
    }

    /// Stamps a record with the current time and tries to queue it.
    ///
    /// Never blocks: when `try_send` fails the record is discarded and the
    /// drop counter is incremented.
    fn emit(&self, action: AuditAction, app: &str, stream: Option<&str>) {
        let ts_ms = crate::bus::now_ms();
        let record = AuditRecord {
            ts_ms,
            action,
            app: String::from(app),
            stream: stream.map(String::from),
        };

        let queued = self.tx.try_send(record).is_ok();
        if !queued {
            self.dropped.fetch_add(1, Ordering::Relaxed);
        }
    }
}
/// Translates observer callbacks into audit records.
///
/// Every hook except `on_frame` queues exactly one record through `emit`.
impl Observer for AuditPipeline {
    /// Records a generic event, labelled with the `Debug` form of its kind.
    fn on_event(&self, event: &StreamEvent) {
        let label = format!("{:?}", event.kind);
        let app = event.app.as_str();
        let stream = event.stream_id.as_str();
        self.emit(AuditAction::Event(label), app, Some(stream));
    }

    /// Records the start of a publish for `app`; no stream id is attached.
    fn on_publish_started(&self, app: &str) {
        let stream: Option<&str> = None;
        self.emit(AuditAction::PublishStarted, app, stream);
    }

    /// Records the end of a publish for `app`; no stream id is attached.
    fn on_publish_ended(&self, app: &str) {
        let stream: Option<&str> = None;
        self.emit(AuditAction::PublishEnded, app, stream);
    }

    /// Records a subscriber eviction on the stream identified by `key`.
    fn on_subscriber_evicted(&self, key: &StreamKey) {
        let app = key.app.as_str();
        let stream = key.stream_id.as_str();
        self.emit(AuditAction::SubscriberEvicted, app, Some(stream));
    }

    /// Records a rate-limit hit on the stream identified by `key`.
    fn on_rate_limited(&self, key: &StreamKey) {
        let app = key.app.as_str();
        let stream = key.stream_id.as_str();
        self.emit(AuditAction::RateLimited, app, Some(stream));
    }

    /// Records that the stream identified by `key` was reaped.
    fn on_stream_reaped(&self, key: &StreamKey) {
        let app = key.app.as_str();
        let stream = key.stream_id.as_str();
        self.emit(AuditAction::StreamReaped, app, Some(stream));
    }

    /// No-op: frames do not produce audit records.
    fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::Duration;

    /// Sink that appends every record to a shared in-memory vector.
    #[derive(Default)]
    struct MemSink {
        records: Arc<Mutex<Vec<AuditRecord>>>,
    }

    #[async_trait]
    impl AuditSink for MemSink {
        async fn write(&self, record: AuditRecord) {
            self.records.lock().unwrap().push(record);
        }
    }

    /// Yields to the runtime until `records` holds at least `want` entries.
    ///
    /// Waiting on the actual condition (with a generous upper bound) keeps the
    /// tests independent of how quickly the worker task gets scheduled, unlike
    /// a fixed sleep.
    async fn wait_for_records(records: &Mutex<Vec<AuditRecord>>, want: usize) {
        let reached = async {
            // The guard is a temporary of the loop condition, so the lock is
            // released before each yield.
            while records.lock().unwrap().len() < want {
                tokio::task::yield_now().await;
            }
        };
        tokio::time::timeout(Duration::from_secs(5), reached)
            .await
            .expect("audit worker did not deliver the expected records in time");
    }

    #[tokio::test]
    async fn lifecycle_hooks_are_audited() {
        let records = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::new(MemSink {
            records: Arc::clone(&records),
        });
        let pipe = AuditPipeline::new(sink, 16);

        pipe.on_publish_started("live");
        pipe.on_publish_ended("live");
        wait_for_records(&records, 2).await;

        let got = records.lock().unwrap();
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].action, AuditAction::PublishStarted);
        assert_eq!(got[0].app, "live");
        assert_eq!(got[0].stream, None);
        assert_eq!(got[1].action, AuditAction::PublishEnded);
        assert_eq!(got[1].app, "live");
        assert_eq!(got[1].stream, None);
        assert_eq!(pipe.dropped(), 0);
    }

    // On the current-thread runtime the worker cannot run before the test's
    // first `.await`, so the three emits below race against nothing.
    #[tokio::test(flavor = "current_thread")]
    async fn overflow_is_counted_as_dropped() {
        let records = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::new(MemSink {
            records: Arc::clone(&records),
        });
        // A requested capacity of 0 is clamped to a single slot.
        let pipe = AuditPipeline::new(sink, 0);

        pipe.on_publish_started("live");
        pipe.on_publish_ended("live");
        pipe.on_publish_ended("live");
        assert_eq!(pipe.dropped(), 2);

        wait_for_records(&records, 1).await;
        let got = records.lock().unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].action, AuditAction::PublishStarted);
    }
}