arcly-stream 0.1.0

A high-performance live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, pluggable HLS/recording, and trait-driven protocol/storage/auth/observer extension points — runtime-, config-, and metrics-free.
Documentation
//! Compliance-grade audit trail for stream lifecycle.
//!
//! A host supplies an [`AuditSink`] (append-only table, Kafka, WORM bucket) and
//! wraps it in an [`AuditPipeline`]. The pipeline **is an [`Observer`]**, so it
//! installs on the engine builder like any telemetry hook and records every
//! publish/play lifecycle transition.
//!
//! ## Hot-path contract
//!
//! Recording is a non-blocking `try_send` onto a bounded channel; a background
//! worker drains it to the sink. If the channel is full the record is dropped
//! and [`dropped`](AuditPipeline::dropped) is incremented — alert on it rather
//! than letting a slow sink stall the media path. Mirrors `arcly-http`'s
//! `AuditPipeline`.

use crate::bus::StreamEvent;
use crate::observe::Observer;
use crate::{MediaFrame, StreamKey};
use async_trait::async_trait;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;

/// What happened, for the audit log.
///
/// Carried in [`AuditRecord::action`]. The unit variants correspond one-to-one
/// to the dedicated [`Observer`] hooks implemented by [`AuditPipeline`];
/// [`AuditAction::Event`] covers the generic `on_event` hook.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AuditAction {
    /// A publish session started.
    PublishStarted,
    /// A publish session ended.
    PublishEnded,
    /// A subscriber was evicted for chronic lag.
    SubscriberEvicted,
    /// An ingress rate limit was hit.
    RateLimited,
    /// A stream was reaped for inactivity.
    StreamReaped,
    /// A lifecycle event with a free-form label.
    ///
    /// [`AuditPipeline`] fills the label with the `Debug` rendering of the
    /// event's `kind`.
    Event(String),
}

/// One immutable audit record.
///
/// Built by [`AuditPipeline`] when an observer hook fires and handed by value
/// to [`AuditSink::write`].
#[derive(Debug, Clone)]
pub struct AuditRecord {
    /// When it happened (Unix ms).
    pub ts_ms: u64,
    /// What happened.
    pub action: AuditAction,
    /// Application name. Always present: every hook that produces a record
    /// supplies one.
    pub app: String,
    /// Stream id, when known. `None` for the publish started/ended hooks,
    /// which only receive the application name.
    pub stream: Option<String>,
}

/// Append-only audit destination supplied by the host.
///
/// The [`AuditPipeline`] background worker calls [`write`](AuditSink::write)
/// for one record at a time and awaits it before receiving the next, so a slow
/// sink backs up the bounded channel and causes records to be dropped rather
/// than stalling the callers of the observer hooks.
#[async_trait]
pub trait AuditSink: Send + Sync + 'static {
    /// Durably persist one record. Errors are the sink's concern (retry/buffer).
    ///
    /// There is no return value: a failure cannot be reported back to the
    /// pipeline, so the implementation must handle it internally.
    async fn write(&self, record: AuditRecord);
}

/// Bridges engine lifecycle hooks to an [`AuditSink`] over a bounded channel.
///
/// Construct with [`AuditPipeline::new`], which also spawns the worker task
/// that drains the channel into the sink.
pub struct AuditPipeline {
    /// Producer half of the bounded channel feeding the background worker.
    tx: mpsc::Sender<AuditRecord>,
    /// Number of records that could not be enqueued; read via
    /// [`AuditPipeline::dropped`].
    dropped: Arc<AtomicU64>,
}

impl AuditPipeline {
    /// Spawn the background writer draining into `sink`, buffering up to
    /// `capacity` records.
    ///
    /// A `capacity` of `0` is raised to `1`, because a Tokio bounded channel
    /// cannot be created with a zero-sized buffer.
    ///
    /// The worker awaits each [`AuditSink::write`] before receiving the next
    /// record, and exits once the pipeline (the only sender) is dropped and
    /// the remaining buffered records have been written.
    ///
    /// # Panics
    ///
    /// Panics if called outside a Tokio runtime, since the worker is started
    /// with `tokio::spawn`.
    pub fn new(sink: Arc<dyn AuditSink>, capacity: usize) -> Self {
        let (tx, mut rx) = mpsc::channel::<AuditRecord>(capacity.max(1));
        tokio::spawn(async move {
            while let Some(record) = rx.recv().await {
                sink.write(record).await;
            }
        });
        Self {
            tx,
            dropped: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Count of records that could not be enqueued.
    ///
    /// This is normally because the channel was full; it also counts records
    /// emitted after the channel closed (for example if the worker task has
    /// terminated).
    pub fn dropped(&self) -> u64 {
        self.dropped.load(Ordering::Relaxed)
    }

    /// Enqueue one audit record without blocking.
    ///
    /// A channel slot is reserved *before* the record is built, so when the
    /// channel is saturated the caller pays only for the failed reservation
    /// and a counter increment — no string allocations or clock read for a
    /// record that would be thrown away.
    fn emit(&self, action: AuditAction, app: &str, stream: Option<&str>) {
        match self.tx.try_reserve() {
            Ok(permit) => permit.send(AuditRecord {
                ts_ms: crate::bus::now_ms(),
                action,
                app: app.to_string(),
                stream: stream.map(str::to_string),
            }),
            Err(_) => {
                // Full or closed: never wait on the media path, just count it.
                self.dropped.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
}

impl Observer for AuditPipeline {
    fn on_event(&self, event: &StreamEvent) {
        self.emit(
            AuditAction::Event(format!("{:?}", event.kind)),
            event.app.as_str(),
            Some(event.stream_id.as_str()),
        );
    }
    fn on_publish_started(&self, app: &str) {
        self.emit(AuditAction::PublishStarted, app, None);
    }
    fn on_publish_ended(&self, app: &str) {
        self.emit(AuditAction::PublishEnded, app, None);
    }
    fn on_subscriber_evicted(&self, key: &StreamKey) {
        self.emit(
            AuditAction::SubscriberEvicted,
            key.app.as_str(),
            Some(key.stream_id.as_str()),
        );
    }
    fn on_rate_limited(&self, key: &StreamKey) {
        self.emit(
            AuditAction::RateLimited,
            key.app.as_str(),
            Some(key.stream_id.as_str()),
        );
    }
    fn on_stream_reaped(&self, key: &StreamKey) {
        self.emit(
            AuditAction::StreamReaped,
            key.app.as_str(),
            Some(key.stream_id.as_str()),
        );
    }
    // `on_frame` intentionally not audited — far too high-cardinality.
    fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {}
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Mutex;
    use std::time::{Duration, Instant};

    /// In-memory sink that appends every record to a shared vector so the
    /// test can inspect what the worker wrote.
    #[derive(Default)]
    struct MemSink {
        records: Arc<Mutex<Vec<AuditRecord>>>,
    }
    #[async_trait]
    impl AuditSink for MemSink {
        async fn write(&self, record: AuditRecord) {
            self.records.lock().unwrap().push(record);
        }
    }

    /// Yield to the background worker until `records` holds at least `want`
    /// entries, failing the test if that does not happen within two seconds.
    ///
    /// Polling on the actual condition avoids depending on a fixed sleep being
    /// long enough on a loaded machine.
    async fn wait_for_records(records: &Mutex<Vec<AuditRecord>>, want: usize) {
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            // The guard is a temporary, released before the await below.
            let have = records.lock().unwrap().len();
            if have >= want {
                return;
            }
            assert!(
                Instant::now() < deadline,
                "worker wrote {} of {} records before timeout",
                have,
                want
            );
            tokio::task::yield_now().await;
        }
    }

    #[tokio::test]
    async fn lifecycle_hooks_are_audited() {
        let records = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::new(MemSink {
            records: Arc::clone(&records),
        });
        let pipe = AuditPipeline::new(sink, 16);

        pipe.on_publish_started("live");
        pipe.on_publish_ended("live");

        // Let the background worker drain.
        wait_for_records(&records, 2).await;
        let got = records.lock().unwrap();
        assert_eq!(got.len(), 2);
        assert_eq!(got[0].action, AuditAction::PublishStarted);
        assert_eq!(got[1].action, AuditAction::PublishEnded);
        assert_eq!(pipe.dropped(), 0);
    }

    #[tokio::test]
    async fn full_channel_drops_and_counts() {
        let records = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::new(MemSink {
            records: Arc::clone(&records),
        });
        let pipe = AuditPipeline::new(sink, 1);

        // No `.await` between these calls, so on the default single-threaded
        // test runtime the worker cannot drain: one record fits, two are dropped.
        pipe.on_publish_started("live");
        pipe.on_publish_ended("live");
        pipe.on_publish_ended("live");
        assert_eq!(pipe.dropped(), 2);

        wait_for_records(&records, 1).await;
        let got = records.lock().unwrap();
        assert_eq!(got.len(), 1);
        assert_eq!(got[0].action, AuditAction::PublishStarted);
    }
}