Skip to main content

arcly_stream/
audit.rs

//! Compliance-grade audit trail for stream lifecycle.
//!
//! A host supplies an [`AuditSink`] (append-only table, Kafka, WORM bucket) and
//! wraps it in an [`AuditPipeline`]. The pipeline **is an [`Observer`]**, so it
//! installs on the engine builder like any telemetry hook and records every
//! publish/play lifecycle transition.
//!
//! ## Hot-path contract
//!
//! Recording is a non-blocking send onto a bounded channel; a background
//! worker drains it to the sink. If the channel is full the record is dropped
//! and [`dropped`](AuditPipeline::dropped) is incremented — alert on it rather
//! than letting a slow sink stall the media path. Mirrors `arcly-http`'s
//! `AuditPipeline`.
15
16use crate::bus::StreamEvent;
17use crate::observe::Observer;
18use crate::{MediaFrame, StreamKey};
19use async_trait::async_trait;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::Arc;
22use tokio::sync::mpsc;
23
/// The kind of lifecycle transition captured by an audit record.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AuditAction {
    /// Start of a publish session.
    PublishStarted,
    /// End of a publish session.
    PublishEnded,
    /// Eviction of a subscriber for chronic lag.
    SubscriberEvicted,
    /// An ingress rate limit was tripped.
    RateLimited,
    /// Removal of a stream after inactivity.
    StreamReaped,
    /// Any other lifecycle event, identified by a free-form label.
    Event(String),
}
40
41/// One immutable audit record.
42#[derive(Debug, Clone)]
43pub struct AuditRecord {
44    /// When it happened (Unix ms).
45    pub ts_ms: u64,
46    /// What happened.
47    pub action: AuditAction,
48    /// Application name, when known.
49    pub app: String,
50    /// Stream id, when known.
51    pub stream: Option<String>,
52}
53
54/// Append-only audit destination supplied by the host.
55#[async_trait]
56pub trait AuditSink: Send + Sync + 'static {
57    /// Durably persist one record. Errors are the sink's concern (retry/buffer).
58    async fn write(&self, record: AuditRecord);
59}
60
61/// Bridges engine lifecycle hooks to an [`AuditSink`] over a bounded channel.
62pub struct AuditPipeline {
63    tx: mpsc::Sender<AuditRecord>,
64    dropped: Arc<AtomicU64>,
65}
66
67impl AuditPipeline {
68    /// Spawn the background writer draining into `sink`, buffering up to
69    /// `capacity` records.
70    pub fn new(sink: Arc<dyn AuditSink>, capacity: usize) -> Self {
71        let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity.max(1));
72        tokio::spawn(async move {
73            while let Some(record) = rx.recv().await {
74                sink.write(record).await;
75            }
76        });
77        Self {
78            tx,
79            dropped: Arc::new(AtomicU64::new(0)),
80        }
81    }
82
83    /// Count of records dropped because the channel was full.
84    pub fn dropped(&self) -> u64 {
85        self.dropped.load(Ordering::Relaxed)
86    }
87
88    fn emit(&self, action: AuditAction, app: &str, stream: Option<&str>) {
89        let record = AuditRecord {
90            ts_ms: crate::bus::now_ms(),
91            action,
92            app: app.to_string(),
93            stream: stream.map(str::to_string),
94        };
95        if self.tx.try_send(record).is_err() {
96            self.dropped.fetch_add(1, Ordering::Relaxed);
97        }
98    }
99}
100
101impl Observer for AuditPipeline {
102    fn on_event(&self, event: &StreamEvent) {
103        self.emit(
104            AuditAction::Event(format!("{:?}", event.kind)),
105            event.app.as_str(),
106            Some(event.stream_id.as_str()),
107        );
108    }
109    fn on_publish_started(&self, app: &str) {
110        self.emit(AuditAction::PublishStarted, app, None);
111    }
112    fn on_publish_ended(&self, app: &str) {
113        self.emit(AuditAction::PublishEnded, app, None);
114    }
115    fn on_subscriber_evicted(&self, key: &StreamKey) {
116        self.emit(
117            AuditAction::SubscriberEvicted,
118            key.app.as_str(),
119            Some(key.stream_id.as_str()),
120        );
121    }
122    fn on_rate_limited(&self, key: &StreamKey) {
123        self.emit(
124            AuditAction::RateLimited,
125            key.app.as_str(),
126            Some(key.stream_id.as_str()),
127        );
128    }
129    fn on_stream_reaped(&self, key: &StreamKey) {
130        self.emit(
131            AuditAction::StreamReaped,
132            key.app.as_str(),
133            Some(key.stream_id.as_str()),
134        );
135    }
136    // `on_frame` intentionally not audited — far too high-cardinality.
137    fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::Duration;

    /// In-memory sink that appends every record to a shared vector.
    #[derive(Default)]
    struct MemSink {
        records: Arc<Mutex<Vec<AuditRecord>>>,
    }

    #[async_trait]
    impl AuditSink for MemSink {
        async fn write(&self, record: AuditRecord) {
            self.records.lock().unwrap().push(record);
        }
    }

    /// Builds a pipeline of the given capacity plus a handle on what its sink stored.
    fn pipeline(capacity: usize) -> (AuditPipeline, Arc<Mutex<Vec<AuditRecord>>>) {
        let records = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::new(MemSink {
            records: Arc::clone(&records),
        });
        (AuditPipeline::new(sink, capacity), records)
    }

    /// Waits until the sink holds at least `want` records.
    ///
    /// Polling under a generous timeout replaces a fixed sleep, so the test
    /// neither flakes on a slow machine nor waits longer than necessary.
    async fn wait_for_records(records: &Mutex<Vec<AuditRecord>>, want: usize) {
        // The guard lives only for the duration of this call, never across an await.
        let stored = || records.lock().unwrap().len();
        let drained = async {
            while stored() < want {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
        };
        tokio::time::timeout(Duration::from_secs(5), drained)
            .await
            .expect("audit worker did not drain the queue in time");
    }

    #[tokio::test]
    async fn lifecycle_hooks_are_audited() {
        let (pipe, records) = pipeline(16);

        pipe.on_publish_started("live");
        pipe.on_publish_ended("live");

        wait_for_records(&records, 2).await;

        let got = records.lock().unwrap();
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].action, AuditAction::PublishStarted);
        assert_eq!(got[1].action, AuditAction::PublishEnded);
        assert_eq!(pipe.dropped(), 0);
    }

    #[tokio::test]
    async fn full_channel_drops_and_counts() {
        // `#[tokio::test]` defaults to a current-thread runtime, so the worker
        // cannot run (and free a slot) until this task first awaits.
        let (pipe, records) = pipeline(1);

        pipe.on_publish_started("live"); // takes the only slot
        pipe.on_publish_ended("live"); // no room left: dropped
        assert_eq!(pipe.dropped(), 1);

        wait_for_records(&records, 1).await;

        let got = records.lock().unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].action, AuditAction::PublishStarted);
    }
}