Skip to main content

arcly_stream/
record.rs

//! Continuous recording: a [`MediaSink`] that persists frames through a
//! [`StorageBackend`].
//!
//! Gated behind `record`. [`RecordingSink`] buffers frame payloads and flushes
//! them to storage as fixed-duration objects, giving a simple VOD/DVR artifact
//! without coupling to any container format (pair it with a real [`Muxer`] when
//! player-compatible output is required).
//!
//! [`Muxer`]: crate::packager::Muxer
10
11use crate::traits::{MediaSink, StorageBackend};
12use crate::{MediaFrame, Result};
13use async_trait::async_trait;
14use bytes::{BufMut, BytesMut};
15
16/// Records a stream to sequential objects under a storage prefix.
17///
18/// Each object spans up to `chunk_duration` seconds (cut on the first keyframe
19/// at or past the boundary, so chunks remain independently decodable).
20pub struct RecordingSink<S: StorageBackend> {
21    storage: S,
22    prefix: String,
23    clock: crate::segment::SegmentClock,
24    buf: BytesMut,
25    seq: u64,
26}
27
28impl<S: StorageBackend> RecordingSink<S> {
29    /// New recorder writing `chunk_duration`-second objects under `prefix`.
30    pub fn new(storage: S, prefix: impl Into<String>, chunk_duration_secs: u64) -> Self {
31        Self {
32            storage,
33            prefix: prefix.into(),
34            clock: crate::segment::SegmentClock::new(chunk_duration_secs),
35            buf: BytesMut::new(),
36            seq: 0,
37        }
38    }
39
40    fn chunk_key(&self, seq: u64) -> String {
41        format!("{}/chunk{}.bin", self.prefix, seq)
42    }
43
44    async fn flush_chunk(&mut self) -> Result<()> {
45        if self.buf.is_empty() {
46            return Ok(());
47        }
48        let key = self.chunk_key(self.seq);
49        let bytes = std::mem::take(&mut self.buf).freeze();
50        self.storage.put(&key, bytes).await?;
51        self.seq += 1;
52        Ok(())
53    }
54}
55
56#[async_trait]
57impl<S: StorageBackend> MediaSink for RecordingSink<S> {
58    async fn send_frame(&mut self, frame: MediaFrame) -> Result<()> {
59        let decision = self.clock.observe(&frame);
60        if decision.skip {
61            return Ok(());
62        }
63        // A new keyframe past the duration boundary flushes the prior chunk; the
64        // chunk's byte length is what matters here, not its measured duration.
65        if decision.cut_previous.is_some() {
66            self.flush_chunk().await?;
67        }
68        self.buf.put_slice(&frame.data);
69        Ok(())
70    }
71
72    async fn flush(&mut self) -> Result<()> {
73        self.clock.flush();
74        self.flush_chunk().await
75    }
76}
77
#[cfg(test)]
mod tests {
    use super::*;
    use crate::testing::{video_frame, InMemoryStorage};

    /// Feeds five keyframe/non-keyframe pairs through a recorder configured
    /// for 2-second chunks and checks that every expected chunk object,
    /// including the one produced by the final `flush`, reached storage.
    #[tokio::test]
    async fn records_chunks_on_keyframe_boundaries() {
        let store = InMemoryStorage::new();
        let mut rec = RecordingSink::new(store.clone(), "rec/cam", 2);

        // One keyframe at each multiple of 1000, followed by a non-keyframe
        // 500 later.
        for i in 0..5 {
            rec.send_frame(video_frame(i * 1000, true)).await.unwrap();
            rec.send_frame(video_frame(i * 1000 + 500, false))
                .await
                .unwrap();
        }
        rec.flush().await.unwrap();

        // 2s chunks cut at keyframes 2000 and 4000, plus the flushed tail:
        // three objects in total, numbered from 0.
        assert!(store.get("rec/cam/chunk0.bin").await.is_ok());
        assert!(store.get("rec/cam/chunk1.bin").await.is_ok());
        // The tail only exists if `flush` wrote the leftover buffer.
        assert!(store.get("rec/cam/chunk2.bin").await.is_ok());
    }
}
100}