arcly-stream 0.1.3

An open-extensible live-media streaming kernel: lock-free zero-copy frame fan-out, instant-start GOP cache, a pluggable multi-protocol ingestion layer (RTMP, RTSP, SRT, WHIP/WHEP shipped), and a feature-gated pure-Rust media plane (MPEG-TS/HLS/fMP4) — runtime, config, and metrics free.
Documentation
//! Continuous recording: a [`MediaSink`] that persists frames through a
//! [`StorageBackend`].
//!
//! Gated behind `record`. [`RecordingSink`] buffers frame payloads and flushes
//! them to storage as fixed-duration objects, giving a simple VOD/DVR artifact
//! without coupling to any container format (pair it with a real [`Muxer`] when
//! player-compatible output is required).
//!
//! [`Muxer`]: crate::packager::Muxer

use crate::traits::{MediaSink, StorageBackend};
use crate::{MediaFrame, Result};
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};

/// Records a stream to sequential objects under a storage prefix.
///
/// Each object spans up to `chunk_duration` seconds (cut on the first keyframe
/// at or past the boundary, so chunks remain independently decodable).
pub struct RecordingSink<S: StorageBackend> {
    storage: S,
    prefix: String,
    clock: crate::segment::SegmentClock,
    buf: BytesMut,
    seq: u64,
}

impl<S: StorageBackend> RecordingSink<S> {
    /// New recorder writing `chunk_duration`-second objects under `prefix`.
    pub fn new(storage: S, prefix: impl Into<String>, chunk_duration_secs: u64) -> Self {
        Self {
            storage,
            prefix: prefix.into(),
            clock: crate::segment::SegmentClock::new(chunk_duration_secs),
            buf: BytesMut::new(),
            seq: 0,
        }
    }

    fn chunk_key(&self, seq: u64) -> String {
        format!("{}/chunk{}.bin", self.prefix, seq)
    }

    async fn flush_chunk(&mut self) -> Result<()> {
        if self.buf.is_empty() {
            return Ok(());
        }
        let key = self.chunk_key(self.seq);
        let bytes = std::mem::take(&mut self.buf).freeze();
        self.storage.put(&key, bytes).await?;
        self.seq += 1;
        Ok(())
    }
}

#[async_trait]
impl<S: StorageBackend> MediaSink for RecordingSink<S> {
    async fn send_frame(&mut self, frame: MediaFrame) -> Result<()> {
        let decision = self.clock.observe(&frame);
        if decision.skip {
            return Ok(());
        }
        // A new keyframe past the duration boundary flushes the prior chunk; the
        // chunk's byte length is what matters here, not its measured duration.
        if decision.cut_previous.is_some() {
            self.flush_chunk().await?;
        }
        self.buf.put_slice(&frame.data);
        Ok(())
    }

    async fn flush(&mut self) -> Result<()> {
        self.clock.flush();
        self.flush_chunk().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::{video_frame, InMemoryStorage};

    #[tokio::test]
    async fn records_chunks_on_keyframe_boundaries() {
        let store = InMemoryStorage::new();
        let mut rec = RecordingSink::new(store.clone(), "rec/cam", 2);

        for i in 0..5 {
            rec.send_frame(video_frame(i * 1000, true)).await.unwrap();
            rec.send_frame(video_frame(i * 1000 + 500, false))
                .await
                .unwrap();
        }
        rec.flush().await.unwrap();

        // 2s chunks cut at keyframes 2000 and 4000, plus the flushed tail.
        assert!(store.get("rec/cam/chunk0.bin").await.is_ok());
        assert!(store.get("rec/cam/chunk1.bin").await.is_ok());
    }
}