use crate::bus::StreamEvent;
use crate::observe::Observer;
use crate::{MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
/// Kind of occurrence captured by an [`AuditRecord`].
///
/// Each unit variant corresponds to one lifecycle hook of the `Observer`
/// implementation on `AuditPipeline`; `Event` is the catch-all used by
/// `on_event`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuditAction {
/// Recorded by the `on_publish_started` hook.
PublishStarted,
/// Recorded by the `on_publish_ended` hook.
PublishEnded,
/// Recorded by the `on_subscriber_evicted` hook.
SubscriberEvicted,
/// Recorded by the `on_rate_limited` hook.
RateLimited,
/// Recorded by the `on_stream_reaped` hook.
StreamReaped,
/// Generic stream event carrying free-form detail text; `AuditPipeline`
/// stores the `Debug` rendering of the event kind here.
Event(String),
}
/// One audit entry, as handed to an [`AuditSink`].
#[derive(Debug, Clone)]
pub struct AuditRecord {
/// Timestamp of the record. `AuditPipeline` fills it from
/// `crate::bus::now_ms()` (presumably milliseconds; confirm against that helper).
pub ts_ms: u64,
/// What happened.
pub action: AuditAction,
/// Name of the application the action relates to.
pub app: String,
/// Stream identifier when the originating hook supplies one; the publish
/// started/ended hooks leave it as `None`.
pub stream: Option<String>,
}
impl AuditAction {
    /// Returns the fixed, snake_case name of this action.
    ///
    /// Every label is a string literal, so the result is `'static` and can
    /// outlive the action it was obtained from. For [`AuditAction::Event`] the
    /// label is always `"event"`; the payload is not part of it.
    pub fn label(&self) -> &'static str {
        match self {
            AuditAction::PublishStarted => "publish_started",
            AuditAction::PublishEnded => "publish_ended",
            AuditAction::SubscriberEvicted => "subscriber_evicted",
            AuditAction::RateLimited => "rate_limited",
            AuditAction::StreamReaped => "stream_reaped",
            AuditAction::Event(_) => "event",
        }
    }
}
/// Destination for audit records.
///
/// `write` returns nothing, so an implementation cannot report a failure to
/// its caller through this interface.
#[async_trait]
pub trait AuditSink: Send + Sync + 'static {
/// Consumes one record and stores it.
async fn write(&self, record: AuditRecord);
}
/// Audit sink that appends one tab-separated line per record to a file.
pub struct FileAuditSink {
    /// Shared handle so each blocking write job can own a clone of it.
    file: Arc<Mutex<std::fs::File>>,
}

impl FileAuditSink {
    /// Opens the audit log at `path` in append mode, creating it if missing.
    ///
    /// # Errors
    ///
    /// Propagates the failure from opening the file, converted by `?` into
    /// the crate's error type.
    pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self> {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        Ok(Self {
            file: Arc::new(Mutex::new(file)),
        })
    }

    /// Replaces the characters that would break the one-record-per-line,
    /// tab-separated layout (tab, line feed, carriage return) with spaces.
    fn sanitize(field: &str) -> String {
        field.replace(['\t', '\n', '\r'], " ")
    }

    /// Renders `record` as `ts \t label \t app \t stream \t detail \n`.
    ///
    /// A missing stream is written as `-`; the detail column is empty for
    /// every action except [`AuditAction::Event`]. All free-text columns
    /// (app, stream, detail) are sanitized: app and stream names may come
    /// from outside the process, and an embedded tab or newline would
    /// otherwise shift columns or forge additional log lines.
    fn format(record: &AuditRecord) -> String {
        let detail = if let AuditAction::Event(label) = &record.action {
            Self::sanitize(label)
        } else {
            String::new()
        };
        let app = Self::sanitize(&record.app);
        let stream = match record.stream.as_deref() {
            Some(id) => Self::sanitize(id),
            None => "-".to_string(),
        };
        format!(
            "{}\t{}\t{}\t{}\t{}\n",
            record.ts_ms,
            record.action.label(),
            app,
            stream,
            detail,
        )
    }
}
#[async_trait]
impl AuditSink for FileAuditSink {
    /// Appends the formatted record to the log file.
    ///
    /// The write runs on tokio's blocking pool and is awaited before
    /// returning. It is best-effort: a poisoned lock, a failed write, or a
    /// failed blocking job is ignored.
    async fn write(&self, record: AuditRecord) {
        let payload = Self::format(&record).into_bytes();
        let shared = Arc::clone(&self.file);
        let job = tokio::task::spawn_blocking(move || {
            use std::io::Write;
            match shared.lock() {
                Ok(mut guard) => {
                    let _ = guard.write_all(&payload);
                }
                // Poisoned lock: skip this record.
                Err(_) => {}
            };
        });
        let _ = job.await;
    }
}
/// Non-blocking front end that queues audit records for an [`AuditSink`].
///
/// Records are pushed onto a bounded channel and written by a background
/// task; when the record cannot be queued it is counted as dropped instead
/// of blocking the caller.
pub struct AuditPipeline {
    /// Producer side of the bounded record queue.
    tx: mpsc::Sender<AuditRecord>,
    /// Number of records that could not be queued.
    dropped: Arc<AtomicU64>,
}

impl AuditPipeline {
    /// Creates the pipeline and spawns the task that forwards queued records
    /// to `sink`, one at a time, until every sender is gone.
    ///
    /// A `capacity` of zero is raised to one (tokio's bounded channel does
    /// not accept a zero capacity). The consumer is started with
    /// `tokio::spawn`, so this is meant to be called from within a tokio
    /// runtime.
    pub fn new(sink: Arc<dyn AuditSink>, capacity: usize) -> Self {
        let bound = capacity.max(1);
        let (tx, mut inbox) = mpsc::channel::<AuditRecord>(bound);
        tokio::spawn(async move {
            loop {
                match inbox.recv().await {
                    Some(record) => sink.write(record).await,
                    None => break,
                }
            }
        });
        let dropped = Arc::new(AtomicU64::new(0));
        Self { tx, dropped }
    }

    /// Returns how many records have been discarded so far because they
    /// could not be queued.
    pub fn dropped(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }

    /// Stamps a record with the current time and tries to queue it without
    /// waiting; any `try_send` failure bumps the drop counter.
    fn emit(&self, action: AuditAction, app: &str, stream: Option<&str>) {
        let ts_ms = crate::bus::now_ms();
        let record = AuditRecord {
            ts_ms,
            action,
            app: app.to_owned(),
            stream: stream.map(String::from),
        };
        match self.tx.try_send(record) {
            Ok(()) => {}
            Err(_) => {
                self.dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
}
impl Observer for AuditPipeline {
fn on_event(&self, event: &StreamEvent) {
self.emit(
AuditAction::Event(format!("{:?}", event.kind)),
event.app.as_str(),
Some(event.stream_id.as_str()),
);
}
fn on_publish_started(&self, app: &str) {
self.emit(AuditAction::PublishStarted, app, None);
}
fn on_publish_ended(&self, app: &str) {
self.emit(AuditAction::PublishEnded, app, None);
}
fn on_subscriber_evicted(&self, key: &StreamKey) {
self.emit(
AuditAction::SubscriberEvicted,
key.app.as_str(),
Some(key.stream_id.as_str()),
);
}
fn on_rate_limited(&self, key: &StreamKey) {
self.emit(
AuditAction::RateLimited,
key.app.as_str(),
Some(key.stream_id.as_str()),
);
}
fn on_stream_reaped(&self, key: &StreamKey) {
self.emit(
AuditAction::StreamReaped,
key.app.as_str(),
Some(key.stream_id.as_str()),
);
}
fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    /// Sink that keeps every record in memory so tests can inspect them.
    #[derive(Default)]
    struct MemSink {
        records: Arc<Mutex<Vec<AuditRecord>>>,
    }

    #[async_trait]
    impl AuditSink for MemSink {
        async fn write(&self, record: AuditRecord) {
            self.records.lock().unwrap().push(record);
        }
    }

    /// Publish start/end hooks reach the sink in order and nothing is dropped.
    #[tokio::test]
    async fn lifecycle_hooks_are_audited() {
        let captured = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::new(MemSink {
            records: Arc::clone(&captured),
        });
        let pipeline = AuditPipeline::new(sink, 16);

        pipeline.on_publish_started("live");
        pipeline.on_publish_ended("live");
        // Yield so the background consumer task can drain the queue.
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;

        let seen = captured.lock().unwrap();
        let actions: Vec<&AuditAction> = seen.iter().map(|entry| &entry.action).collect();
        assert_eq!(
            actions,
            [&AuditAction::PublishStarted, &AuditAction::PublishEnded]
        );
        assert_eq!(pipeline.dropped(), 0);
    }

    /// Two writes produce two tab-separated lines, with tabs in the event
    /// detail replaced by spaces and a missing stream shown as `-`.
    #[tokio::test]
    async fn file_audit_sink_appends_lines() {
        let file_name = format!("arcly-audit-{}.log", crate::bus::now_ms());
        let path = std::env::temp_dir().join(file_name);
        let sink = FileAuditSink::open(&path).expect("open audit log");

        let started = AuditRecord {
            ts_ms: 1,
            action: AuditAction::PublishStarted,
            app: "live".into(),
            stream: Some("cam".into()),
        };
        let custom = AuditRecord {
            ts_ms: 2,
            action: AuditAction::Event("custom\tlabel".into()),
            app: "live".into(),
            stream: None,
        };
        sink.write(started).await;
        sink.write(custom).await;

        let contents = std::fs::read_to_string(&path).expect("read back");
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(
            lines,
            [
                "1\tpublish_started\tlive\tcam\t",
                "2\tevent\tlive\t-\tcustom label",
            ]
        );
        let _ = std::fs::remove_file(&path);
    }
}