Skip to main content

arcly_stream/
audit.rs

1//! Compliance-grade audit trail for stream lifecycle.
2//!
3//! A host supplies an [`AuditSink`] (append-only table, Kafka, WORM bucket) and
4//! wraps it in an [`AuditPipeline`]. The pipeline **is an [`Observer`]**, so it
5//! installs on the engine builder like any telemetry hook and records every
6//! publish/play lifecycle transition.
7//!
8//! ## Hot-path contract
9//!
10//! Recording is a non-blocking `try_send` onto a bounded channel; a background
11//! worker drains it to the sink. If the channel is full the record is dropped
12//! and [`dropped`](AuditPipeline::dropped) is incremented — alert on it rather
13//! than letting a slow sink stall the media path. Mirrors `arcly-http`'s
14//! `AuditPipeline`.
15
16use crate::bus::StreamEvent;
17use crate::observe::Observer;
18use crate::{MediaFrame, Result, StreamKey};
19use async_trait::async_trait;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::{Arc, Mutex};
22use tokio::sync::mpsc;
23
/// The kind of lifecycle transition captured by an audit record.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum AuditAction {
    /// A publisher began a session.
    PublishStarted,
    /// A publisher's session finished.
    PublishEnded,
    /// A subscriber was removed because it lagged behind persistently.
    SubscriberEvicted,
    /// Ingress for a stream hit its rate limit.
    RateLimited,
    /// An inactive stream was cleaned up.
    StreamReaped,
    /// Any other lifecycle event, carrying a free-form descriptive label.
    Event(String),
}
40
41/// One immutable audit record.
42#[derive(Debug, Clone)]
43pub struct AuditRecord {
44    /// When it happened (Unix ms).
45    pub ts_ms: u64,
46    /// What happened.
47    pub action: AuditAction,
48    /// Application name, when known.
49    pub app: String,
50    /// Stream id, when known.
51    pub stream: Option<String>,
52}
53
54impl AuditAction {
55    /// A stable, log-friendly label for this action.
56    pub fn label(&self) -> &str {
57        match self {
58            AuditAction::PublishStarted => "publish_started",
59            AuditAction::PublishEnded => "publish_ended",
60            AuditAction::SubscriberEvicted => "subscriber_evicted",
61            AuditAction::RateLimited => "rate_limited",
62            AuditAction::StreamReaped => "stream_reaped",
63            AuditAction::Event(_) => "event",
64        }
65    }
66}
67
68/// Append-only audit destination supplied by the host.
69#[async_trait]
70pub trait AuditSink: Send + Sync + 'static {
71    /// Durably persist one record. Errors are the sink's concern (retry/buffer).
72    async fn write(&self, record: AuditRecord);
73}
74
75/// A ready-to-use [`AuditSink`] that appends one tab-separated line per record
76/// to a file — a turnkey compliance trail with no external dependency.
77///
78/// Line format: `ts_ms \t action \t app \t stream \t detail`, newline-terminated.
79/// Writes happen on a blocking thread-pool task so the async audit worker never
80/// stalls on disk I/O. Pair it with an [`AuditPipeline`] (which is the
81/// [`Observer`]) on the engine builder:
82///
83/// ```no_run
84/// # async fn demo() -> arcly_stream::Result<()> {
85/// use arcly_stream::audit::{AuditPipeline, FileAuditSink};
86/// use std::sync::Arc;
87///
88/// let sink = Arc::new(FileAuditSink::open("/var/log/arcly/audit.log")?);
89/// let pipeline = AuditPipeline::new(sink, 1024);
90/// // engine = Engine::builder().observer(pipeline).build();
91/// # let _ = pipeline; Ok(())
92/// # }
93/// ```
94pub struct FileAuditSink {
95    file: Arc<Mutex<std::fs::File>>,
96}
97
98impl FileAuditSink {
99    /// Open (creating if absent, appending if present) the audit log at `path`.
100    pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self> {
101        let file = std::fs::OpenOptions::new()
102            .create(true)
103            .append(true)
104            .open(path)?;
105        Ok(Self {
106            file: Arc::new(Mutex::new(file)),
107        })
108    }
109
110    fn format(record: &AuditRecord) -> String {
111        // Identifiers can't contain tabs/newlines; sanitize the free-form detail.
112        let detail = match &record.action {
113            AuditAction::Event(label) => label.replace(['\t', '\n'], " "),
114            _ => String::new(),
115        };
116        format!(
117            "{}\t{}\t{}\t{}\t{}\n",
118            record.ts_ms,
119            record.action.label(),
120            record.app,
121            record.stream.as_deref().unwrap_or("-"),
122            detail,
123        )
124    }
125}
126
127#[async_trait]
128impl AuditSink for FileAuditSink {
129    async fn write(&self, record: AuditRecord) {
130        use std::io::Write;
131        let line = Self::format(&record);
132        let file = Arc::clone(&self.file);
133        // Disk I/O on the blocking pool; a slow disk can't block the runtime.
134        let _ = tokio::task::spawn_blocking(move || {
135            if let Ok(mut f) = file.lock() {
136                let _ = f.write_all(line.as_bytes());
137            }
138        })
139        .await;
140    }
141}
142
143/// Bridges engine lifecycle hooks to an [`AuditSink`] over a bounded channel.
144pub struct AuditPipeline {
145    tx: mpsc::Sender<AuditRecord>,
146    dropped: Arc<AtomicU64>,
147}
148
149impl AuditPipeline {
150    /// Spawn the background writer draining into `sink`, buffering up to
151    /// `capacity` records.
152    pub fn new(sink: Arc<dyn AuditSink>, capacity: usize) -> Self {
153        let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity.max(1));
154        tokio::spawn(async move {
155            while let Some(record) = rx.recv().await {
156                sink.write(record).await;
157            }
158        });
159        Self {
160            tx,
161            dropped: Arc::new(AtomicU64::new(0)),
162        }
163    }
164
165    /// Count of records dropped because the channel was full.
166    pub fn dropped(&self) -> u64 {
167        self.dropped.load(Ordering::Relaxed)
168    }
169
170    fn emit(&self, action: AuditAction, app: &str, stream: Option<&str>) {
171        let record = AuditRecord {
172            ts_ms: crate::bus::now_ms(),
173            action,
174            app: app.to_string(),
175            stream: stream.map(str::to_string),
176        };
177        if self.tx.try_send(record).is_err() {
178            self.dropped.fetch_add(1, Ordering::Relaxed);
179        }
180    }
181}
182
183impl Observer for AuditPipeline {
184    fn on_event(&self, event: &StreamEvent) {
185        self.emit(
186            AuditAction::Event(format!("{:?}", event.kind)),
187            event.app.as_str(),
188            Some(event.stream_id.as_str()),
189        );
190    }
191    fn on_publish_started(&self, app: &str) {
192        self.emit(AuditAction::PublishStarted, app, None);
193    }
194    fn on_publish_ended(&self, app: &str) {
195        self.emit(AuditAction::PublishEnded, app, None);
196    }
197    fn on_subscriber_evicted(&self, key: &StreamKey) {
198        self.emit(
199            AuditAction::SubscriberEvicted,
200            key.app.as_str(),
201            Some(key.stream_id.as_str()),
202        );
203    }
204    fn on_rate_limited(&self, key: &StreamKey) {
205        self.emit(
206            AuditAction::RateLimited,
207            key.app.as_str(),
208            Some(key.stream_id.as_str()),
209        );
210    }
211    fn on_stream_reaped(&self, key: &StreamKey) {
212        self.emit(
213            AuditAction::StreamReaped,
214            key.app.as_str(),
215            Some(key.stream_id.as_str()),
216        );
217    }
218    // `on_frame` intentionally not audited — far too high-cardinality.
219    fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}
220}
221
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// In-memory sink whose record buffer is shared with the test body.
    struct MemSink {
        records: Arc<Mutex<Vec<AuditRecord>>>,
    }

    #[async_trait]
    impl AuditSink for MemSink {
        async fn write(&self, record: AuditRecord) {
            self.records.lock().unwrap().push(record);
        }
    }

    /// Poll until `records` holds at least `want` entries (or ~1 s passes).
    ///
    /// Replaces a single fixed sleep, which flakes when the background worker
    /// is scheduled late on a loaded machine.
    async fn wait_for(records: &Mutex<Vec<AuditRecord>>, want: usize) {
        for _ in 0..200 {
            if records.lock().unwrap().len() >= want {
                return;
            }
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    }

    #[tokio::test]
    async fn lifecycle_hooks_are_audited() {
        let records = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::new(MemSink {
            records: Arc::clone(&records),
        });
        let pipe = AuditPipeline::new(sink, 16);

        pipe.on_publish_started("live");
        pipe.on_publish_ended("live");

        wait_for(&records, 2).await;
        let got = records.lock().unwrap();
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].action, AuditAction::PublishStarted);
        assert_eq!(got[1].action, AuditAction::PublishEnded);
        assert_eq!(pipe.dropped(), 0);
    }

    #[tokio::test]
    async fn file_audit_sink_appends_lines() {
        // Include the pid so concurrent test processes never share a file.
        let path = std::env::temp_dir().join(format!(
            "arcly-audit-{}-{}.log",
            std::process::id(),
            crate::bus::now_ms()
        ));
        let sink = FileAuditSink::open(&path).expect("open audit log");

        sink.write(AuditRecord {
            ts_ms: 1,
            action: AuditAction::PublishStarted,
            app: "live".into(),
            stream: Some("cam".into()),
        })
        .await;
        sink.write(AuditRecord {
            ts_ms: 2,
            action: AuditAction::Event("custom\tlabel".into()),
            app: "live".into(),
            stream: None,
        })
        .await;

        let contents = std::fs::read_to_string(&path).expect("read back");
        // Clean up before asserting so a failure doesn't leak the temp file.
        let _ = std::fs::remove_file(&path);

        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), 2);
        assert_eq!(lines[0], "1\tpublish_started\tlive\tcam\t");
        // Tab in the free-form label is sanitized to a space; missing stream → "-".
        assert_eq!(lines[1], "2\tevent\tlive\t-\tcustom label");
    }
}