1use crate::bus::StreamEvent;
17use crate::observe::Observer;
18use crate::{MediaFrame, StreamKey};
19use async_trait::async_trait;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::Arc;
22use tokio::sync::mpsc;
23
24#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum AuditAction {
27 PublishStarted,
29 PublishEnded,
31 SubscriberEvicted,
33 RateLimited,
35 StreamReaped,
37 Event(String),
39}
40
41#[derive(Debug, Clone)]
43pub struct AuditRecord {
44 pub ts_ms: u64,
46 pub action: AuditAction,
48 pub app: String,
50 pub stream: Option<String>,
52}
53
54#[async_trait]
56pub trait AuditSink: Send + Sync + 'static {
57 async fn write(&self, record: AuditRecord);
59}
60
61pub struct AuditPipeline {
63 tx: mpsc::Sender<AuditRecord>,
64 dropped: Arc<AtomicU64>,
65}
66
67impl AuditPipeline {
68 pub fn new(sink: Arc<dyn AuditSink>, capacity: usize) -> Self {
71 let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity.max(1));
72 tokio::spawn(async move {
73 while let Some(record) = rx.recv().await {
74 sink.write(record).await;
75 }
76 });
77 Self {
78 tx,
79 dropped: Arc::new(AtomicU64::new(0)),
80 }
81 }
82
83 pub fn dropped(&self) -> u64 {
85 self.dropped.load(Ordering::Relaxed)
86 }
87
88 fn emit(&self, action: AuditAction, app: &str, stream: Option<&str>) {
89 let record = AuditRecord {
90 ts_ms: crate::bus::now_ms(),
91 action,
92 app: app.to_string(),
93 stream: stream.map(str::to_string),
94 };
95 if self.tx.try_send(record).is_err() {
96 self.dropped.fetch_add(1, Ordering::Relaxed);
97 }
98 }
99}
100
101impl Observer for AuditPipeline {
102 fn on_event(&self, event: &StreamEvent) {
103 self.emit(
104 AuditAction::Event(format!("{:?}", event.kind)),
105 event.app.as_str(),
106 Some(event.stream_id.as_str()),
107 );
108 }
109 fn on_publish_started(&self, app: &str) {
110 self.emit(AuditAction::PublishStarted, app, None);
111 }
112 fn on_publish_ended(&self, app: &str) {
113 self.emit(AuditAction::PublishEnded, app, None);
114 }
115 fn on_subscriber_evicted(&self, key: &StreamKey) {
116 self.emit(
117 AuditAction::SubscriberEvicted,
118 key.app.as_str(),
119 Some(key.stream_id.as_str()),
120 );
121 }
122 fn on_rate_limited(&self, key: &StreamKey) {
123 self.emit(
124 AuditAction::RateLimited,
125 key.app.as_str(),
126 Some(key.stream_id.as_str()),
127 );
128 }
129 fn on_stream_reaped(&self, key: &StreamKey) {
130 self.emit(
131 AuditAction::StreamReaped,
132 key.app.as_str(),
133 Some(key.stream_id.as_str()),
134 );
135 }
136 fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}
138}
139
140#[cfg(test)]
141mod tests {
142 use super::*;
143 use std::sync::Mutex;
144
145 #[derive(Default)]
146 struct MemSink {
147 records: Arc<Mutex<Vec<AuditRecord>>>,
148 }
149 #[async_trait]
150 impl AuditSink for MemSink {
151 async fn write(&self, record: AuditRecord) {
152 self.records.lock().unwrap().push(record);
153 }
154 }
155
156 #[tokio::test]
157 async fn lifecycle_hooks_are_audited() {
158 let records = Arc::new(Mutex::new(Vec::new()));
159 let sink = Arc::new(MemSink {
160 records: Arc::clone(&records),
161 });
162 let pipe = AuditPipeline::new(sink, 16);
163
164 pipe.on_publish_started("live");
165 pipe.on_publish_ended("live");
166
167 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
169 let got = records.lock().unwrap();
170 assert_eq!(got.len(), 2);
171 assert_eq!(got[0].action, AuditAction::PublishStarted);
172 assert_eq!(got[1].action, AuditAction::PublishEnded);
173 assert_eq!(pipe.dropped(), 0);
174 }
175}