use std::path::PathBuf;
use tokio::sync::mpsc;
/// A single segment payload handed from a producer to a [`SegmentStream`].
///
/// Values are normally built with [`ProducedSegment::media`] or
/// [`ProducedSegment::init`] and optionally tagged with a destination via
/// [`ProducedSegment::with_path`].
#[derive(Debug, Clone)]
pub struct ProducedSegment {
    /// Sequence number of the segment; [`ProducedSegment::init`] uses `0`.
    pub sequence: u64,
    /// Raw payload bytes of the segment.
    pub data: Vec<u8>,
    /// Duration value carried with the segment (seconds, going by the field
    /// name); [`ProducedSegment::init`] uses `0.0`.
    pub duration_secs: f64,
    /// `true` for segments built by [`ProducedSegment::init`], `false` for
    /// those built by [`ProducedSegment::media`].
    pub is_init: bool,
    /// Optional destination path. When present and the receiving stream has
    /// `auto_write` enabled, the payload is written to this path.
    pub path_hint: Option<PathBuf>,
}

impl ProducedSegment {
    /// Builds a media (non-init) segment with no path hint.
    #[must_use]
    pub fn media(sequence: u64, data: Vec<u8>, duration_secs: f64) -> Self {
        let is_init = false;
        let path_hint = None;
        Self {
            sequence,
            data,
            duration_secs,
            is_init,
            path_hint,
        }
    }

    /// Builds an init segment: sequence `0`, duration `0.0`, `is_init` set,
    /// and no path hint.
    #[must_use]
    pub fn init(data: Vec<u8>) -> Self {
        // Same shape as a media segment at sequence 0 / duration 0.0, with
        // only the init flag flipped.
        Self {
            is_init: true,
            ..Self::media(0, data, 0.0)
        }
    }

    /// Returns the segment with `path_hint` set to `path`, replacing any
    /// previous hint.
    #[must_use]
    pub fn with_path(self, path: PathBuf) -> Self {
        Self {
            path_hint: Some(path),
            ..self
        }
    }

    /// Number of payload bytes held in `data`.
    #[must_use]
    pub fn byte_len(&self) -> usize {
        self.data.len()
    }
}
/// Tunables for a [`SegmentStream`].
#[derive(Debug, Clone)]
pub struct SegmentStreamConfig {
    /// Capacity passed to the bounded channel that links the sender and the
    /// stream.
    pub channel_depth: usize,
    /// When `true`, segments that carry a `path_hint` are written to that
    /// path as they are received.
    pub auto_write: bool,
    /// When `true`, the output file length is set to the payload size before
    /// the payload is written.
    pub pre_allocate: bool,
}

impl Default for SegmentStreamConfig {
    /// Defaults: channel depth of 8, auto-write on, pre-allocation off.
    fn default() -> Self {
        const DEFAULT_CHANNEL_DEPTH: usize = 8;
        Self {
            channel_depth: DEFAULT_CHANNEL_DEPTH,
            auto_write: true,
            pre_allocate: false,
        }
    }
}
/// Producer-side handle for pushing segments into a [`SegmentStream`].
///
/// Cloning yields another handle onto the same underlying channel.
#[derive(Clone, Debug)]
pub struct SegmentSender {
    tx: mpsc::Sender<ProducedSegment>,
}

impl SegmentSender {
    /// Sends `segment` to the stream, awaiting the channel send.
    ///
    /// # Errors
    ///
    /// Returns a descriptive `String` when the underlying channel send fails
    /// (the receiving side is gone).
    pub async fn send(&self, segment: ProducedSegment) -> Result<(), String> {
        let outcome = self.tx.send(segment).await;
        outcome.map_err(|e| format!("SegmentSender: channel closed: {e}"))
    }

    /// Attempts to send `segment` without awaiting.
    ///
    /// # Errors
    ///
    /// Returns a descriptive `String` when the segment cannot be queued right
    /// now, e.g. because the channel is already at capacity.
    pub fn try_send(&self, segment: ProducedSegment) -> Result<(), String> {
        let outcome = self.tx.try_send(segment);
        outcome.map_err(|e| format!("SegmentSender::try_send: {e}"))
    }
}
/// Consumer side of the segment channel.
///
/// Receives [`ProducedSegment`]s sent through the paired [`SegmentSender`],
/// keeps running counters, and (when configured) writes payloads to the path
/// carried by each segment.
pub struct SegmentStream {
    rx: mpsc::Receiver<ProducedSegment>,
    config: SegmentStreamConfig,
    /// Total payload bytes of segments returned by `next`.
    bytes_written: u64,
    /// Number of segments returned by `next`.
    segments_received: u64,
}

impl SegmentStream {
    /// Creates a stream and its paired sender, linked by a bounded channel of
    /// `config.channel_depth` slots.
    ///
    /// A `channel_depth` of `0` is raised to `1`: the underlying bounded
    /// channel rejects a zero capacity by panicking, and a misconfigured
    /// depth should not take the process down.
    #[must_use]
    pub fn new(config: SegmentStreamConfig) -> (Self, SegmentSender) {
        let capacity = config.channel_depth.max(1);
        let (tx, rx) = mpsc::channel(capacity);
        let stream = Self {
            rx,
            config,
            bytes_written: 0,
            segments_received: 0,
        };
        (stream, SegmentSender { tx })
    }

    /// Receives the next segment, or `None` once the channel yields no more.
    ///
    /// Each received segment bumps `segments_received` and adds its payload
    /// size to `bytes_written` (regardless of whether anything is written to
    /// disk). When `auto_write` is enabled and the segment carries a
    /// `path_hint`, the payload is written to that path; a write failure is
    /// logged as a warning and the segment is still returned.
    pub async fn next(&mut self) -> Option<ProducedSegment> {
        let segment = self.rx.recv().await?;
        self.segments_received += 1;
        self.bytes_written += segment.byte_len() as u64;

        if self.config.auto_write {
            // Borrow the path and payload straight from the owned segment;
            // copying the whole payload just to write it out is unnecessary.
            if let Some(path) = &segment.path_hint {
                let pre_alloc = self.config.pre_allocate;
                if let Err(e) = write_segment_to_disk(path, &segment.data, pre_alloc).await {
                    // Best effort: the caller still gets the segment.
                    tracing::warn!(
                        "SegmentStream: failed to write segment to {:?}: {}",
                        path,
                        e
                    );
                }
            }
        }

        Some(segment)
    }

    /// Receives and discards segments until the channel yields `None`.
    ///
    /// Discarded segments are neither counted nor written to disk.
    pub async fn drain(&mut self) {
        while self.rx.recv().await.is_some() {}
    }

    /// Total payload bytes of all segments returned by [`SegmentStream::next`].
    #[must_use]
    pub fn bytes_written(&self) -> u64 {
        self.bytes_written
    }

    /// Number of segments returned by [`SegmentStream::next`].
    #[must_use]
    pub fn segments_received(&self) -> u64 {
        self.segments_received
    }
}
/// Creates (or truncates) the file at `path` and sets its length to
/// `expected_bytes`.
///
/// Missing parent directories are created first. Because the file is
/// truncated before being resized, any previous contents are discarded.
///
/// # Errors
///
/// Returns the underlying I/O error if the directories cannot be created, the
/// file cannot be opened, or its length cannot be set.
pub fn pre_allocate_file(path: &std::path::Path, expected_bytes: u64) -> std::io::Result<()> {
    if let Some(dir) = path.parent() {
        std::fs::create_dir_all(dir)?;
    }

    let mut options = std::fs::OpenOptions::new();
    options.write(true).create(true).truncate(true);

    // The handle is only needed long enough to resize the file.
    options.open(path)?.set_len(expected_bytes)
}
/// Writes `data` to `path`, creating parent directories and replacing any
/// existing file.
///
/// When `pre_allocate` is set and `data` is non-empty, the file length is set
/// to `data.len()` before the payload is written.
///
/// # Errors
///
/// Returns the first I/O error hit while creating directories, opening,
/// resizing, writing, or flushing.
async fn write_segment_to_disk(
    path: &std::path::Path,
    data: &[u8],
    pre_allocate: bool,
) -> std::io::Result<()> {
    use tokio::io::AsyncWriteExt;

    if let Some(dir) = path.parent() {
        tokio::fs::create_dir_all(dir).await?;
    }

    let mut options = tokio::fs::OpenOptions::new();
    options.create(true).write(true).truncate(true);
    let handle = options.open(path).await?;

    let reserve_up_front = pre_allocate && !data.is_empty();
    if reserve_up_front {
        handle.set_len(data.len() as u64).await?;
    }

    let mut sink = tokio::io::BufWriter::new(handle);
    sink.write_all(data).await?;
    // Flush explicitly so a failed write is reported to the caller.
    sink.flush().await
}
#[cfg(test)]
mod tests {
    //! Unit tests for segment construction, channel behaviour, auto-write,
    //! and file pre-allocation.
    //!
    //! Tests that touch the filesystem use process-id-qualified paths under
    //! the system temp directory so concurrent test runs do not collide.
    use super::*;

    #[test]
    fn test_produced_segment_media() {
        let seg = ProducedSegment::media(1, vec![0xAB; 100], 6.0);
        assert_eq!(seg.sequence, 1);
        assert_eq!(seg.byte_len(), 100);
        assert!(!seg.is_init);
        assert_eq!(seg.duration_secs, 6.0);
    }

    #[test]
    fn test_produced_segment_init() {
        let init = ProducedSegment::init(vec![0xFF; 256]);
        assert!(init.is_init);
        assert_eq!(init.sequence, 0);
        assert_eq!(init.byte_len(), 256);
    }

    #[test]
    fn test_produced_segment_with_path() {
        let seg = ProducedSegment::media(0, vec![], 6.0)
            .with_path(std::env::temp_dir().join("oximedia-packager-stream-seg0.ts"));
        assert!(seg.path_hint.is_some());
    }

    #[test]
    fn test_segment_stream_config_default() {
        let cfg = SegmentStreamConfig::default();
        assert_eq!(cfg.channel_depth, 8);
        assert!(cfg.auto_write);
        assert!(!cfg.pre_allocate);
    }

    #[tokio::test]
    async fn test_segment_stream_send_recv() {
        let config = SegmentStreamConfig {
            auto_write: false,
            ..Default::default()
        };
        let (mut stream, tx) = SegmentStream::new(config);
        let seg = ProducedSegment::media(0, vec![1, 2, 3], 6.0);
        tx.send(seg).await.expect("send ok");
        drop(tx);
        let received = stream.next().await.expect("should receive segment");
        assert_eq!(received.sequence, 0);
        assert_eq!(received.data, vec![1, 2, 3]);
        assert!(stream.next().await.is_none());
        assert_eq!(stream.segments_received(), 1);
        assert_eq!(stream.bytes_written(), 3);
    }

    #[tokio::test]
    async fn test_segment_stream_multiple_segments() {
        let config = SegmentStreamConfig {
            auto_write: false,
            ..Default::default()
        };
        let (mut stream, tx) = SegmentStream::new(config);
        for i in 0..5u64 {
            let seg = ProducedSegment::media(i, vec![i as u8; 100], 6.0);
            tx.send(seg).await.expect("send ok");
        }
        drop(tx);
        let mut count = 0u64;
        while let Some(seg) = stream.next().await {
            assert_eq!(seg.sequence, count);
            count += 1;
        }
        assert_eq!(count, 5);
        assert_eq!(stream.segments_received(), 5);
    }

    #[tokio::test]
    async fn test_segment_stream_drain() {
        let config = SegmentStreamConfig {
            auto_write: false,
            ..Default::default()
        };
        let (mut stream, tx) = SegmentStream::new(config);
        for i in 0..3u64 {
            tx.send(ProducedSegment::media(i, vec![0; 10], 6.0))
                .await
                .expect("send ok");
        }
        drop(tx);
        stream.drain().await;
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_sender_try_send() {
        let config = SegmentStreamConfig {
            channel_depth: 2,
            auto_write: false,
            ..Default::default()
        };
        // Keep the stream alive so the channel stays open while it fills up.
        let (_stream, tx) = SegmentStream::new(config);
        assert!(tx.try_send(ProducedSegment::media(0, vec![], 0.0)).is_ok());
        assert!(tx.try_send(ProducedSegment::media(1, vec![], 0.0)).is_ok());
        assert!(tx.try_send(ProducedSegment::media(2, vec![], 0.0)).is_err());
    }

    #[tokio::test]
    async fn test_auto_write_with_path() {
        // Process-unique directory, matching the other filesystem tests.
        let tmp_dir =
            std::env::temp_dir().join(format!("oximedia_stream_test_{}", std::process::id()));
        tokio::fs::create_dir_all(&tmp_dir)
            .await
            .expect("create dir");
        let path = tmp_dir.join("seg0.ts");
        let data = vec![0xDE, 0xAD, 0xBE, 0xEF];
        let config = SegmentStreamConfig {
            auto_write: true,
            pre_allocate: false,
            ..Default::default()
        };
        let (mut stream, tx) = SegmentStream::new(config);
        let seg = ProducedSegment::media(0, data.clone(), 6.0).with_path(path.clone());
        tx.send(seg).await.expect("send ok");
        drop(tx);
        stream.next().await.expect("segment");
        let written = tokio::fs::read(&path).await.expect("read written file");
        assert_eq!(written, data);
        let _ = tokio::fs::remove_file(&path).await;
        let _ = tokio::fs::remove_dir(&tmp_dir).await;
    }

    #[tokio::test]
    async fn test_auto_write_with_pre_allocate() {
        // The parent directory is intentionally not created up front: the
        // write path is expected to create it.
        let tmp_dir = std::env::temp_dir().join(format!(
            "oximedia_stream_prealloc_test_{}",
            std::process::id()
        ));
        let path = tmp_dir.join("seg0.ts");
        let data = vec![0xCA, 0xFE, 0xBA, 0xBE];
        let config = SegmentStreamConfig {
            auto_write: true,
            pre_allocate: true,
            ..Default::default()
        };
        let (mut stream, tx) = SegmentStream::new(config);
        let seg = ProducedSegment::media(0, data.clone(), 6.0).with_path(path.clone());
        tx.send(seg).await.expect("send ok");
        drop(tx);
        stream.next().await.expect("segment");
        let written = tokio::fs::read(&path).await.expect("read written file");
        assert_eq!(written, data);
        let _ = tokio::fs::remove_file(&path).await;
        let _ = tokio::fs::remove_dir(&tmp_dir).await;
    }

    #[test]
    fn test_pre_allocate_file_creates_correct_size() {
        let path =
            std::env::temp_dir().join(format!("oximedia_prealloc_{}.bin", std::process::id()));
        let expected = 4096u64;
        pre_allocate_file(&path, expected).expect("pre_allocate_file must succeed");
        let meta = std::fs::metadata(&path).expect("file must exist after pre-allocation");
        assert_eq!(
            meta.len(),
            expected,
            "pre-allocated file must have the requested size"
        );
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_pre_allocate_file_zero_size() {
        let path =
            std::env::temp_dir().join(format!("oximedia_prealloc_zero_{}.bin", std::process::id()));
        pre_allocate_file(&path, 0).expect("pre_allocate_file with size 0 must succeed");
        let meta = std::fs::metadata(&path).expect("file must exist");
        assert_eq!(meta.len(), 0);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_pre_allocate_file_truncates_existing() {
        let path = std::env::temp_dir().join(format!(
            "oximedia_prealloc_trunc_{}.bin",
            std::process::id()
        ));
        std::fs::write(&path, vec![0xABu8; 8192]).expect("write initial data");
        pre_allocate_file(&path, 1024).expect("pre_allocate_file truncate must succeed");
        let meta = std::fs::metadata(&path).expect("file must exist");
        assert_eq!(meta.len(), 1024, "file must be truncated to 1024 bytes");
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_pre_allocate_file_creates_parent_dir() {
        let dir =
            std::env::temp_dir().join(format!("oximedia_prealloc_dir_{}", std::process::id()));
        let path = dir.join("segment.ts");
        pre_allocate_file(&path, 512).expect("pre_allocate_file must create parent dir");
        assert!(dir.exists(), "parent directory must be created");
        let meta = std::fs::metadata(&path).expect("file must exist");
        assert_eq!(meta.len(), 512);
        let _ = std::fs::remove_file(&path);
        let _ = std::fs::remove_dir(&dir);
    }
}