1use crate::traits::{MediaSink, StorageBackend};
12use crate::{MediaFrame, Result};
13use async_trait::async_trait;
14use bytes::{BufMut, BytesMut};
15
16pub struct RecordingSink<S: StorageBackend> {
21 storage: S,
22 prefix: String,
23 clock: crate::segment::SegmentClock,
24 buf: BytesMut,
25 seq: u64,
26}
27
28impl<S: StorageBackend> RecordingSink<S> {
29 pub fn new(storage: S, prefix: impl Into<String>, chunk_duration_secs: u64) -> Self {
31 Self {
32 storage,
33 prefix: prefix.into(),
34 clock: crate::segment::SegmentClock::new(chunk_duration_secs),
35 buf: BytesMut::new(),
36 seq: 0,
37 }
38 }
39
40 fn chunk_key(&self, seq: u64) -> String {
41 format!("{}/chunk{}.bin", self.prefix, seq)
42 }
43
44 async fn flush_chunk(&mut self) -> Result<()> {
45 if self.buf.is_empty() {
46 return Ok(());
47 }
48 let key = self.chunk_key(self.seq);
49 let bytes = std::mem::take(&mut self.buf).freeze();
50 self.storage.put(&key, bytes).await?;
51 self.seq += 1;
52 Ok(())
53 }
54}
55
56#[async_trait]
57impl<S: StorageBackend> MediaSink for RecordingSink<S> {
58 async fn send_frame(&mut self, frame: MediaFrame) -> Result<()> {
59 let decision = self.clock.observe(&frame);
60 if decision.skip {
61 return Ok(());
62 }
63 if decision.cut_previous.is_some() {
66 self.flush_chunk().await?;
67 }
68 self.buf.put_slice(&frame.data);
69 Ok(())
70 }
71
72 async fn flush(&mut self) -> Result<()> {
73 self.clock.flush();
74 self.flush_chunk().await
75 }
76}
77
78#[cfg(test)]
79mod tests {
80 use super::*;
81 use crate::testing::{video_frame, InMemoryStorage};
82
83 #[tokio::test]
84 async fn records_chunks_on_keyframe_boundaries() {
85 let store = InMemoryStorage::new();
86 let mut rec = RecordingSink::new(store.clone(), "rec/cam", 2);
87
88 for i in 0..5 {
89 rec.send_frame(video_frame(i * 1000, true)).await.unwrap();
90 rec.send_frame(video_frame(i * 1000 + 500, false))
91 .await
92 .unwrap();
93 }
94 rec.flush().await.unwrap();
95
96 assert!(store.get("rec/cam/chunk0.bin").await.is_ok());
98 assert!(store.get("rec/cam/chunk1.bin").await.is_ok());
99 }
100}