arcly-stream 0.1.7

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! Compliance-grade audit trail for stream lifecycle.
//!
//! A host supplies an [`AuditSink`] (append-only table, Kafka, WORM bucket) and
//! wraps it in an [`AuditPipeline`]. The pipeline **is an [`Observer`]**, so it
//! installs on the engine builder like any telemetry hook and records every
//! publish/play lifecycle transition.
//!
//! ## Hot-path contract
//!
//! Recording is a non-blocking `try_send` onto a bounded channel; a background
//! worker drains it to the sink. If the channel is full the record is dropped
//! and [`dropped`](AuditPipeline::dropped) is incremented — alert on it rather
//! than letting a slow sink stall the media path. Mirrors `arcly-http`'s
//! `AuditPipeline`.

use crate::bus::StreamEvent;
use crate::observe::Observer;
use crate::{MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

/// What happened, for the audit log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuditAction {
    /// A publish session started.
    PublishStarted,
    /// A publish session ended.
    PublishEnded,
    /// A subscriber was evicted for chronic lag.
    SubscriberEvicted,
    /// An ingress rate limit was hit.
    RateLimited,
    /// A stream was reaped for inactivity.
    StreamReaped,
    /// A lifecycle event with a free-form label.
    Event(String),
}

/// One immutable audit record.
#[derive(Debug, Clone)]
pub struct AuditRecord {
    /// When it happened (Unix ms).
    pub ts_ms: u64,
    /// What happened.
    pub action: AuditAction,
    /// Application name, when known.
    pub app: String,
    /// Stream id, when known.
    pub stream: Option<String>,
}

impl AuditAction {
    /// A stable, log-friendly label for this action.
    pub fn label(&self) -> &str {
        match self {
            AuditAction::PublishStarted => "publish_started",
            AuditAction::PublishEnded => "publish_ended",
            AuditAction::SubscriberEvicted => "subscriber_evicted",
            AuditAction::RateLimited => "rate_limited",
            AuditAction::StreamReaped => "stream_reaped",
            AuditAction::Event(_) => "event",
        }
    }
}

/// Append-only audit destination supplied by the host.
#[async_trait]
pub trait AuditSink: Send + Sync + 'static {
    /// Durably persist one record. Errors are the sink's concern (retry/buffer).
    async fn write(&self, record: AuditRecord);
}

/// A ready-to-use [`AuditSink`] that appends one tab-separated line per record
/// to a file — a turnkey compliance trail with no external dependency.
///
/// Line format: `ts_ms \t action \t app \t stream \t detail`, newline-terminated.
/// Writes happen on a blocking thread-pool task so the async audit worker never
/// stalls on disk I/O. Pair it with an [`AuditPipeline`] (which is the
/// [`Observer`]) on the engine builder:
///
/// ```no_run
/// # async fn demo() -> arcly_stream::Result<()> {
/// use arcly_stream::audit::{AuditPipeline, FileAuditSink};
/// use std::sync::Arc;
///
/// let sink = Arc::new(FileAuditSink::open("/var/log/arcly/audit.log")?);
/// let pipeline = AuditPipeline::new(sink, 1024);
/// // engine = Engine::builder().observer(pipeline).build();
/// # let _ = pipeline; Ok(())
/// # }
/// ```
pub struct FileAuditSink {
    file: Arc<Mutex<std::fs::File>>,
}

impl FileAuditSink {
    /// Open (creating if absent, appending if present) the audit log at `path`.
    pub fn open(path: impl AsRef<std::path::Path>) -> Result<Self> {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        Ok(Self {
            file: Arc::new(Mutex::new(file)),
        })
    }

    fn format(record: &AuditRecord) -> String {
        // Identifiers can't contain tabs/newlines; sanitize the free-form detail.
        let detail = match &record.action {
            AuditAction::Event(label) => label.replace(['\t', '\n'], " "),
            _ => String::new(),
        };
        format!(
            "{}\t{}\t{}\t{}\t{}\n",
            record.ts_ms,
            record.action.label(),
            record.app,
            record.stream.as_deref().unwrap_or("-"),
            detail,
        )
    }
}

#[async_trait]
impl AuditSink for FileAuditSink {
    async fn write(&self, record: AuditRecord) {
        use std::io::Write;
        let line = Self::format(&record);
        let file = Arc::clone(&self.file);
        // Disk I/O on the blocking pool; a slow disk can't block the runtime.
        let _ = tokio::task::spawn_blocking(move || {
            if let Ok(mut f) = file.lock() {
                let _ = f.write_all(line.as_bytes());
            }
        })
        .await;
    }
}

/// Bridges engine lifecycle hooks to an [`AuditSink`] over a bounded channel.
pub struct AuditPipeline {
    tx: mpsc::Sender<AuditRecord>,
    dropped: Arc<AtomicU64>,
}

impl AuditPipeline {
    /// Spawn the background writer draining into `sink`, buffering up to
    /// `capacity` records.
    pub fn new(sink: Arc<dyn AuditSink>, capacity: usize) -> Self {
        let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity.max(1));
        tokio::spawn(async move {
            while let Some(record) = rx.recv().await {
                sink.write(record).await;
            }
        });
        Self {
            tx,
            dropped: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Count of records dropped because the channel was full.
    pub fn dropped(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }

    fn emit(&self, action: AuditAction, app: &str, stream: Option<&str>) {
        let record = AuditRecord {
            ts_ms: crate::bus::now_ms(),
            action,
            app: app.to_string(),
            stream: stream.map(str::to_string),
        };
        if self.tx.try_send(record).is_err() {
            self.dropped.fetch_add(1, Ordering::Relaxed);
        }
    }
}

impl Observer for AuditPipeline {
    fn on_event(&self, event: &StreamEvent) {
        self.emit(
            AuditAction::Event(format!("{:?}", event.kind)),
            event.app.as_str(),
            Some(event.stream_id.as_str()),
        );
    }
    fn on_publish_started(&self, app: &str) {
        self.emit(AuditAction::PublishStarted, app, None);
    }
    fn on_publish_ended(&self, app: &str) {
        self.emit(AuditAction::PublishEnded, app, None);
    }
    fn on_subscriber_evicted(&self, key: &StreamKey) {
        self.emit(
            AuditAction::SubscriberEvicted,
            key.app.as_str(),
            Some(key.stream_id.as_str()),
        );
    }
    fn on_rate_limited(&self, key: &StreamKey) {
        self.emit(
            AuditAction::RateLimited,
            key.app.as_str(),
            Some(key.stream_id.as_str()),
        );
    }
    fn on_stream_reaped(&self, key: &StreamKey) {
        self.emit(
            AuditAction::StreamReaped,
            key.app.as_str(),
            Some(key.stream_id.as_str()),
        );
    }
    // `on_frame` intentionally not audited — far too high-cardinality.
    fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;

    #[derive(Default)]
    struct MemSink {
        records: Arc<Mutex<Vec<AuditRecord>>>,
    }
    #[async_trait]
    impl AuditSink for MemSink {
        async fn write(&self, record: AuditRecord) {
            self.records.lock().unwrap().push(record);
        }
    }

    #[tokio::test]
    async fn lifecycle_hooks_are_audited() {
        let records = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::new(MemSink {
            records: Arc::clone(&records),
        });
        let pipe = AuditPipeline::new(sink, 16);

        pipe.on_publish_started("live");
        pipe.on_publish_ended("live");

        // Let the background worker drain.
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        let got = records.lock().unwrap();
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].action, AuditAction::PublishStarted);
        assert_eq!(got[1].action, AuditAction::PublishEnded);
        assert_eq!(pipe.dropped(), 0);
    }

    #[tokio::test]
    async fn file_audit_sink_appends_lines() {
        let dir = std::env::temp_dir();
        let path = dir.join(format!("arcly-audit-{}.log", crate::bus::now_ms()));
        let sink = FileAuditSink::open(&path).expect("open audit log");

        sink.write(AuditRecord {
            ts_ms: 1,
            action: AuditAction::PublishStarted,
            app: "live".into(),
            stream: Some("cam".into()),
        })
        .await;
        sink.write(AuditRecord {
            ts_ms: 2,
            action: AuditAction::Event("custom\tlabel".into()),
            app: "live".into(),
            stream: None,
        })
        .await;

        let contents = std::fs::read_to_string(&path).expect("read back");
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), 2);
        assert_eq!(lines[0], "1\tpublish_started\tlive\tcam\t");
        // Tab in the free-form label is sanitized to a space; missing stream → "-".
        assert_eq!(lines[1], "2\tevent\tlive\t-\tcustom label");

        let _ = std::fs::remove_file(&path);
    }
}