1use crate::bus::StreamEvent;
17use crate::observe::Observer;
18use crate::{MediaFrame, Result, StreamKey};
19use async_trait::async_trait;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::{Arc, Mutex};
22use tokio::sync::mpsc;
23
24#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum AuditAction {
27 PublishStarted,
29 PublishEnded,
31 SubscriberEvicted,
33 RateLimited,
35 StreamReaped,
37 Event(String),
39}
40
41#[derive(Debug, Clone)]
43pub struct AuditRecord {
44 pub ts_ms: u64,
46 pub action: AuditAction,
48 pub app: String,
50 pub stream: Option<String>,
52}
53
54impl AuditAction {
55 pub fn label(&self) -> &str {
57 match self {
58 AuditAction::PublishStarted => "publish_started",
59 AuditAction::PublishEnded => "publish_ended",
60 AuditAction::SubscriberEvicted => "subscriber_evicted",
61 AuditAction::RateLimited => "rate_limited",
62 AuditAction::StreamReaped => "stream_reaped",
63 AuditAction::Event(_) => "event",
64 }
65 }
66}
67
68#[async_trait]
70pub trait AuditSink: Send + Sync + 'static {
71 async fn write(&self, record: AuditRecord);
73}
74
75pub struct FileAuditSink {
95 file: Arc<Mutex<std::fs::File>>,
96}
97
98impl FileAuditSink {
99 pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self> {
101 let file = std::fs::OpenOptions::new()
102 .create(true)
103 .append(true)
104 .open(path)?;
105 Ok(Self {
106 file: Arc::new(Mutex::new(file)),
107 })
108 }
109
110 fn format(record: &AuditRecord) -> String {
111 let detail = match &record.action {
113 AuditAction::Event(label) => label.replace(['\t', '\n'], " "),
114 _ => String::new(),
115 };
116 format!(
117 "{}\t{}\t{}\t{}\t{}\n",
118 record.ts_ms,
119 record.action.label(),
120 record.app,
121 record.stream.as_deref().unwrap_or("-"),
122 detail,
123 )
124 }
125}
126
127#[async_trait]
128impl AuditSink for FileAuditSink {
129 async fn write(&self, record: AuditRecord) {
130 use std::io::Write;
131 let line = Self::format(&record);
132 let file = Arc::clone(&self.file);
133 let _ = tokio::task::spawn_blocking(move || {
135 if let Ok(mut f) = file.lock() {
136 let _ = f.write_all(line.as_bytes());
137 }
138 })
139 .await;
140 }
141}
142
143pub struct AuditPipeline {
145 tx: mpsc::Sender<AuditRecord>,
146 dropped: Arc<AtomicU64>,
147}
148
149impl AuditPipeline {
150 pub fn new(sink: Arc<dyn AuditSink>, capacity: usize) -> Self {
153 let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity.max(1));
154 tokio::spawn(async move {
155 while let Some(record) = rx.recv().await {
156 sink.write(record).await;
157 }
158 });
159 Self {
160 tx,
161 dropped: Arc::new(AtomicU64::new(0)),
162 }
163 }
164
165 pub fn dropped(&self) -> u64 {
167 self.dropped.load(Ordering::Relaxed)
168 }
169
170 fn emit(&self, action: AuditAction, app: &str, stream: Option<&str>) {
171 let record = AuditRecord {
172 ts_ms: crate::bus::now_ms(),
173 action,
174 app: app.to_string(),
175 stream: stream.map(str::to_string),
176 };
177 if self.tx.try_send(record).is_err() {
178 self.dropped.fetch_add(1, Ordering::Relaxed);
179 }
180 }
181}
182
183impl Observer for AuditPipeline {
184 fn on_event(&self, event: &StreamEvent) {
185 self.emit(
186 AuditAction::Event(format!("{:?}", event.kind)),
187 event.app.as_str(),
188 Some(event.stream_id.as_str()),
189 );
190 }
191 fn on_publish_started(&self, app: &str) {
192 self.emit(AuditAction::PublishStarted, app, None);
193 }
194 fn on_publish_ended(&self, app: &str) {
195 self.emit(AuditAction::PublishEnded, app, None);
196 }
197 fn on_subscriber_evicted(&self, key: &StreamKey) {
198 self.emit(
199 AuditAction::SubscriberEvicted,
200 key.app.as_str(),
201 Some(key.stream_id.as_str()),
202 );
203 }
204 fn on_rate_limited(&self, key: &StreamKey) {
205 self.emit(
206 AuditAction::RateLimited,
207 key.app.as_str(),
208 Some(key.stream_id.as_str()),
209 );
210 }
211 fn on_stream_reaped(&self, key: &StreamKey) {
212 self.emit(
213 AuditAction::StreamReaped,
214 key.app.as_str(),
215 Some(key.stream_id.as_str()),
216 );
217 }
218 fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225 use std::sync::Mutex;
226
227 #[derive(Default)]
228 struct MemSink {
229 records: Arc<Mutex<Vec<AuditRecord>>>,
230 }
231 #[async_trait]
232 impl AuditSink for MemSink {
233 async fn write(&self, record: AuditRecord) {
234 self.records.lock().unwrap().push(record);
235 }
236 }
237
238 #[tokio::test]
239 async fn lifecycle_hooks_are_audited() {
240 let records = Arc::new(Mutex::new(Vec::new()));
241 let sink = Arc::new(MemSink {
242 records: Arc::clone(&records),
243 });
244 let pipe = AuditPipeline::new(sink, 16);
245
246 pipe.on_publish_started("live");
247 pipe.on_publish_ended("live");
248
249 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
251 let got = records.lock().unwrap();
252 assert_eq!(got.len(), 2);
253 assert_eq!(got[0].action, AuditAction::PublishStarted);
254 assert_eq!(got[1].action, AuditAction::PublishEnded);
255 assert_eq!(pipe.dropped(), 0);
256 }
257
258 #[tokio::test]
259 async fn file_audit_sink_appends_lines() {
260 let dir = std::env::temp_dir();
261 let path = dir.join(format!("arcly-audit-{}.log", crate::bus::now_ms()));
262 let sink = FileAuditSink::open(&path).expect("open audit log");
263
264 sink.write(AuditRecord {
265 ts_ms: 1,
266 action: AuditAction::PublishStarted,
267 app: "live".into(),
268 stream: Some("cam".into()),
269 })
270 .await;
271 sink.write(AuditRecord {
272 ts_ms: 2,
273 action: AuditAction::Event("custom\tlabel".into()),
274 app: "live".into(),
275 stream: None,
276 })
277 .await;
278
279 let contents = std::fs::read_to_string(&path).expect("read back");
280 let lines: Vec<&str> = contents.lines().collect();
281 assert_eq!(lines.len(), 2);
282 assert_eq!(lines[0], "1\tpublish_started\tlive\tcam\t");
283 assert_eq!(lines[1], "2\tevent\tlive\t-\tcustom label");
285
286 let _ = std::fs::remove_file(&path);
287 }
288}