arcly-stream 0.1.7

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! In-memory helpers for exercising the engine in downstream tests.
//!
//! None of this is required at runtime; it exists so consumers can verify
//! their `MediaSource`/`MediaSink`/`Observer` implementations against a real
//! engine without standing up a socket.

use crate::bus::StreamEvent;
use crate::observe::Observer;
use crate::traits::{MediaSink, StorageBackend};
use crate::{CodecId, MediaFrame, Result, StreamError, StreamKey};
use async_trait::async_trait;
use bytes::Bytes;
use dashmap::DashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

/// Build a video [`MediaFrame`] at `pts` (ms), keyframe or delta. A terse
/// fixture for tests and examples.
pub fn video_frame(pts: i64, keyframe: bool) -> MediaFrame {
    MediaFrame::new_video(
        pts,
        pts,
        Bytes::from_static(b"vframe"),
        CodecId::H264,
        keyframe,
    )
}

/// Build an audio [`MediaFrame`] at `pts` (ms).
pub fn audio_frame(pts: i64) -> MediaFrame {
    MediaFrame::new_audio(pts, Bytes::from_static(b"aframe"), CodecId::AAC)
}

/// An in-memory [`StorageBackend`] for testing packagers and recorders without
/// touching the filesystem. Cheap to clone (shared object map).
#[derive(Default, Clone)]
pub struct InMemoryStorage {
    objects: Arc<DashMap<String, Bytes>>,
}

impl InMemoryStorage {
    /// A new, empty store.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of stored objects.
    pub fn len(&self) -> usize {
        self.objects.len()
    }

    /// Whether the store is empty.
    pub fn is_empty(&self) -> bool {
        self.objects.is_empty()
    }
}

#[async_trait]
impl StorageBackend for InMemoryStorage {
    async fn put(&self, key: &str, data: Bytes) -> Result<()> {
        self.objects.insert(key.to_string(), data);
        Ok(())
    }
    async fn get(&self, key: &str) -> Result<Bytes> {
        self.objects
            .get(key)
            .map(|v| v.clone())
            .ok_or_else(|| StreamError::StorageNotFound(key.to_string()))
    }
    async fn delete(&self, key: &str) -> Result<()> {
        self.objects.remove(key);
        Ok(())
    }
    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
        Ok(self
            .objects
            .iter()
            .filter(|e| e.key().starts_with(prefix))
            .map(|e| e.key().clone())
            .collect())
    }
}

/// A [`MediaSink`] that records every frame it receives. Useful for asserting
/// what a pipeline produced.
#[derive(Default, Clone)]
pub struct CollectingSink {
    frames: Arc<Mutex<Vec<MediaFrame>>>,
}

impl CollectingSink {
    /// A new, empty collecting sink.
    pub fn new() -> Self {
        Self::default()
    }

    /// Snapshot the frames collected so far.
    pub fn frames(&self) -> Vec<MediaFrame> {
        self.frames
            .lock()
            .expect("collecting sink mutex poisoned")
            .clone()
    }

    /// Number of frames collected so far.
    pub fn len(&self) -> usize {
        self.frames
            .lock()
            .expect("collecting sink mutex poisoned")
            .len()
    }

    /// Whether no frames have been collected yet.
    pub fn is_empty(&self) -> bool {
        self.frames
            .lock()
            .expect("collecting sink mutex poisoned")
            .is_empty()
    }
}

#[async_trait]
impl MediaSink for CollectingSink {
    async fn send_frame(&mut self, frame: MediaFrame) -> Result<()> {
        self.frames
            .lock()
            .expect("collecting sink mutex poisoned")
            .push(frame);
        Ok(())
    }
}

/// A [`MediaSink`] that discards everything — the `/dev/null` of sinks.
#[derive(Debug, Default, Clone, Copy)]
pub struct NullSink;

#[async_trait]
impl MediaSink for NullSink {
    async fn send_frame(&mut self, _frame: MediaFrame) -> Result<()> {
        Ok(())
    }
}

/// An [`Observer`] that counts the events it sees. Cheap to clone (shared
/// counters) so a copy can be handed to the engine and another kept for asserts.
#[derive(Default, Clone)]
pub struct CountingObserver {
    inner: Arc<CountingInner>,
}

#[derive(Default)]
struct CountingInner {
    frames: AtomicU64,
    publish_started: AtomicU64,
    publish_ended: AtomicU64,
    events: AtomicU64,
    lagged: AtomicU64,
}

impl CountingObserver {
    /// A new observer with all counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Total frames observed via [`Observer::on_frame`].
    pub fn frames(&self) -> u64 {
        self.inner.frames.load(Ordering::Relaxed)
    }
    /// Count of [`Observer::on_publish_started`] calls.
    pub fn publish_started(&self) -> u64 {
        self.inner.publish_started.load(Ordering::Relaxed)
    }
    /// Count of [`Observer::on_publish_ended`] calls.
    pub fn publish_ended(&self) -> u64 {
        self.inner.publish_ended.load(Ordering::Relaxed)
    }
    /// Count of [`Observer::on_event`] calls.
    pub fn events(&self) -> u64 {
        self.inner.events.load(Ordering::Relaxed)
    }
    /// Total frames reported dropped via [`Observer::on_subscriber_lagged`].
    pub fn lagged(&self) -> u64 {
        self.inner.lagged.load(Ordering::Relaxed)
    }
}

impl Observer for CountingObserver {
    fn on_event(&self, _event: &StreamEvent) {
        self.inner.events.fetch_add(1, Ordering::Relaxed);
    }
    fn on_publish_started(&self, _app: &str) {
        self.inner.publish_started.fetch_add(1, Ordering::Relaxed);
    }
    fn on_publish_ended(&self, _app: &str) {
        self.inner.publish_ended.fetch_add(1, Ordering::Relaxed);
    }
    fn on_frame(&self, _key: &StreamKey, _frame: &MediaFrame) {
        self.inner.frames.fetch_add(1, Ordering::Relaxed);
    }
    fn on_subscriber_lagged(&self, _key: &StreamKey, skipped: u64) {
        self.inner.lagged.fetch_add(skipped, Ordering::Relaxed);
    }
}