use crate::traits::{MediaSink, StorageBackend};
use crate::{MediaFrame, Result};
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
pub struct RecordingSink<S: StorageBackend> {
storage: S,
prefix: String,
clock: crate::segment::SegmentClock,
buf: BytesMut,
seq: u64,
}
impl<S: StorageBackend> RecordingSink<S> {
pub fn new(storage: S, prefix: impl Into<String>, chunk_duration_secs: u64) -> Self {
Self {
storage,
prefix: prefix.into(),
clock: crate::segment::SegmentClock::new(chunk_duration_secs),
buf: BytesMut::new(),
seq: 0,
}
}
fn chunk_key(&self, seq: u64) -> String {
format!("{}/chunk{}.bin", self.prefix, seq)
}
async fn flush_chunk(&mut self) -> Result<()> {
if self.buf.is_empty() {
return Ok(());
}
let key = self.chunk_key(self.seq);
let bytes = std::mem::take(&mut self.buf).freeze();
self.storage.put(&key, bytes).await?;
self.seq += 1;
Ok(())
}
}
#[async_trait]
impl<S: StorageBackend> MediaSink for RecordingSink<S> {
async fn send_frame(&mut self, frame: MediaFrame) -> Result<()> {
let decision = self.clock.observe(&frame);
if decision.skip {
return Ok(());
}
if decision.cut_previous.is_some() {
self.flush_chunk().await?;
}
self.buf.put_slice(&frame.data);
Ok(())
}
async fn flush(&mut self) -> Result<()> {
self.clock.flush();
self.flush_chunk().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::testing::{video_frame, InMemoryStorage};
#[tokio::test]
async fn records_chunks_on_keyframe_boundaries() {
let store = InMemoryStorage::new();
let mut rec = RecordingSink::new(store.clone(), "rec/cam", 2);
for i in 0..5 {
rec.send_frame(video_frame(i * 1000, true)).await.unwrap();
rec.send_frame(video_frame(i * 1000 + 500, false))
.await
.unwrap();
}
rec.flush().await.unwrap();
assert!(store.get("rec/cam/chunk0.bin").await.is_ok());
assert!(store.get("rec/cam/chunk1.bin").await.is_ok());
}
}