use crate::config::{SegmentConfig, SegmentFormat};
use crate::error::PackagerResult;
use bytes::{BufMut, BytesMut};
use std::time::Duration;
use tracing::{debug, trace};
/// Metadata describing one finalized segment, as produced by
/// `SegmentGenerator::finalize_segment`.
#[derive(Debug, Clone)]
pub struct SegmentInfo {
/// Zero-based sequence number of the segment.
pub index: u64,
/// Value of the generator's running duration when the segment was closed
/// (the timestamp of the last frame added to it).
pub duration: Duration,
/// Size in bytes of the encoded segment (container overhead included).
pub size: u64,
/// File name generated for the segment, e.g. `segment_0.ts`.
pub path: String,
/// `true` when at least one keyframe was recorded while the segment was open.
pub keyframe: bool,
/// Currently populated with the same value as `duration`.
pub timestamp: Duration,
}
/// Location of a keyframe inside the segment that is currently being built.
#[derive(Debug, Clone)]
pub struct Keyframe {
/// Timestamp that was supplied together with the keyframe.
pub timestamp: Duration,
/// Byte offset of the keyframe within the segment's raw frame buffer.
pub position: u64,
/// Length of the keyframe data in bytes.
pub size: u32,
}
/// Private extension trait adding a 24-bit integer writer to byte buffers.
trait BytesMutExt {
/// Appends the low 24 bits of `n` to the buffer, most significant byte first.
fn put_u24(&mut self, n: u32);
}
impl BytesMutExt for BytesMut {
    /// Appends the low 24 bits of `n` in big-endian order; the top byte of
    /// `n` is discarded.
    fn put_u24(&mut self, n: u32) {
        // Big-endian layout of a u32 is [b3, b2, b1, b0]; dropping the first
        // byte leaves exactly the three low-order bytes, high byte first.
        let be = n.to_be_bytes();
        self.put_slice(&be[1..]);
    }
}
/// Accumulates raw frames and packages them into MPEG-TS, fMP4 or CMAF
/// segments according to a `SegmentConfig`.
pub struct SegmentGenerator {
// Segmenting options; the generator reads `format`, `duration`,
// `keyframe_alignment` and `fast_start` from it.
config: SegmentConfig,
// Index assigned to the next finalized segment; incremented on finalize.
segment_index: u64,
// Timestamp of the most recently added frame; zeroed after each finalize.
current_duration: Duration,
// Raw frame bytes collected for the segment currently being built.
current_data: BytesMut,
// Keyframes recorded for the segment currently being built.
keyframes: Vec<Keyframe>,
}
impl SegmentGenerator {
/// Creates a generator with an empty frame buffer, no recorded keyframes
/// and the segment counter at zero.
#[must_use]
pub fn new(config: SegmentConfig) -> Self {
    let current_data = BytesMut::new();
    let keyframes = Vec::new();
    Self {
        segment_index: 0,
        current_duration: Duration::ZERO,
        current_data,
        keyframes,
        config,
    }
}
/// Appends one frame to the segment under construction.
///
/// The frame bytes are copied into the internal buffer, keyframes are
/// recorded with their byte offset, and the running duration is set to
/// `timestamp`. If the segment is then due to be closed, it is finalized
/// and its metadata is returned; otherwise `Ok(None)` is returned.
///
/// NOTE(review): the frame that triggers finalization is stored in the
/// segment being closed, so with keyframe alignment the *next* segment
/// does not begin with that keyframe — confirm this is intended.
///
/// # Errors
///
/// Propagates any error raised while encoding the finished segment.
pub fn add_frame(
    &mut self,
    data: &[u8],
    is_keyframe: bool,
    timestamp: Duration,
) -> PackagerResult<Option<SegmentInfo>> {
    let frame_len = data.len();
    trace!(
        "Adding frame: size={}, keyframe={}",
        frame_len,
        is_keyframe
    );

    // Offset must be captured before the frame bytes are appended.
    let frame_offset = self.current_data.len();
    if is_keyframe {
        let entry = Keyframe {
            timestamp,
            position: frame_offset as u64,
            size: frame_len as u32,
        };
        self.keyframes.push(entry);
    }

    self.current_data.extend_from_slice(data);
    self.current_duration = timestamp;

    if !self.should_finalize_segment(is_keyframe, timestamp) {
        return Ok(None);
    }
    self.finalize_segment()
}
/// Decides whether the segment should be closed after the latest frame.
///
/// A segment is closed only when it holds data, the frame satisfies the
/// keyframe-alignment rule (when enabled, only keyframes may close a
/// segment) and `timestamp` has reached the configured duration.
///
/// NOTE(review): `timestamp` is compared to `config.duration` directly,
/// with no segment-start offset — presumably callers pass segment-relative
/// timestamps; verify against the caller.
fn should_finalize_segment(&self, is_keyframe: bool, timestamp: Duration) -> bool {
    let has_data = !self.current_data.is_empty();
    let alignment_ok = is_keyframe || !self.config.keyframe_alignment;
    has_data && alignment_ok && timestamp >= self.config.duration
}
/// Encodes the buffered frames in the configured container format,
/// returns the resulting segment's metadata and resets per-segment state.
///
/// Returns `Ok(None)` without touching any state when no data is buffered.
///
/// NOTE(review): the encoded bytes are only measured (`size`) and then
/// dropped; they are not handed back to the caller.
///
/// # Errors
///
/// Propagates any error from the format-specific encoder.
fn finalize_segment(&mut self) -> PackagerResult<Option<SegmentInfo>> {
    if self.current_data.is_empty() {
        return Ok(None);
    }

    let index = self.segment_index;
    let elapsed = self.current_duration;
    debug!(
        "Finalizing segment {}: {} bytes, duration: {:?}",
        index,
        self.current_data.len(),
        elapsed
    );

    let encoded = match self.config.format {
        SegmentFormat::MpegTs => self.create_mpegts_segment()?,
        SegmentFormat::Fmp4 => self.create_fmp4_segment()?,
        SegmentFormat::Cmaf => self.create_cmaf_segment()?,
    };

    // The path is derived from the still-unincremented segment index.
    let info = SegmentInfo {
        index,
        duration: elapsed,
        size: encoded.len() as u64,
        path: self.get_segment_path(),
        keyframe: !self.keyframes.is_empty(),
        timestamp: elapsed,
    };

    // Start the next segment from a clean slate.
    self.keyframes.clear();
    self.current_data.clear();
    self.current_duration = Duration::ZERO;
    self.segment_index = index + 1;

    Ok(Some(info))
}
/// Builds an MPEG-TS segment: one PAT packet, one PMT packet, then the
/// buffered frame bytes split across 188-byte transport packets.
///
/// Payload packets are emitted on PID 0x0101, the elementary-stream PID
/// announced by `write_pmt`, and carry a 4-bit continuity counter that
/// increments per packet (wrapping at 16) as the transport-stream format
/// requires. Previously they used PID 0x0001 with a constant counter of 0,
/// so a demuxer following the PMT would never find the stream.
///
/// NOTE(review): the payload is written as raw bytes (no PES framing) and a
/// short final packet is filled with trailing 0xFF bytes rather than an
/// adaptation field; both are kept from the original implementation.
///
/// # Errors
///
/// Propagates errors from writing the PAT or PMT.
fn create_mpegts_segment(&self) -> PackagerResult<Vec<u8>> {
    const TS_PACKET_SIZE: usize = 188;
    const TS_HEADER_SIZE: usize = 4;
    const TS_PAYLOAD_SIZE: usize = TS_PACKET_SIZE - TS_HEADER_SIZE;
    const SYNC_BYTE: u8 = 0x47;

    debug!("Creating MPEG-TS segment");
    let mut output = BytesMut::new();
    self.write_pat(&mut output)?;
    self.write_pmt(&mut output)?;

    let payload = &self.current_data[..];
    for (packet_no, chunk) in payload.chunks(TS_PAYLOAD_SIZE).enumerate() {
        // Low nibble of the 4th header byte is the per-PID continuity counter.
        let continuity = (packet_no % 16) as u8;
        output.put_u8(SYNC_BYTE);
        // payload_unit_start_indicator set (as before), PID = 0x0101.
        output.put_u8(0x41);
        output.put_u8(0x01);
        // 0x10 = payload only, no adaptation field.
        output.put_u8(0x10 | continuity);
        output.put_slice(chunk);
        // Keep every packet exactly TS_PACKET_SIZE bytes long.
        output.put_bytes(0xFF, TS_PAYLOAD_SIZE - chunk.len());
    }
    Ok(output.to_vec())
}
/// Appends one 188-byte transport packet carrying a Program Association
/// Table that maps program 1 to PMT PID 0x0100.
///
/// Padding is measured from where this packet starts, so the function is
/// correct even when `output` already contains data. The previous
/// `TS_PACKET_SIZE - output.len()` underflowed for any non-empty buffer.
///
/// NOTE(review): the final four section bytes (the CRC_32 slot) are written
/// as zero rather than a computed checksum.
fn write_pat(&self, output: &mut BytesMut) -> PackagerResult<()> {
    const TS_PACKET_SIZE: usize = 188;
    const SYNC_BYTE: u8 = 0x47;

    let packet_start = output.len();

    // Transport header: payload_unit_start set, PID 0x0000, payload only.
    output.put_u8(SYNC_BYTE);
    output.put_u8(0x40);
    output.put_u8(0x00);
    output.put_u8(0x10);

    // pointer_field, table_id 0x00, section_syntax_indicator + length (13).
    output.put_u8(0x00);
    output.put_u8(0x00);
    output.put_u8(0xB0);
    output.put_u8(0x0D);

    // transport_stream_id, version/current_next, section and last section.
    output.put_u16(0x0001);
    output.put_u8(0xC1);
    output.put_u8(0x00);
    output.put_u8(0x00);

    // program_number 1 -> PMT PID 0x0100 (upper three bits are reserved).
    output.put_u16(0x0001);
    output.put_u16(0xE100);

    // CRC_32 slot.
    output.put_u32(0x00000000);

    let written = output.len() - packet_start;
    output.put_bytes(0xFF, TS_PACKET_SIZE - written);
    Ok(())
}
/// Appends one 188-byte transport packet (PID 0x0100) carrying a Program
/// Map Table that declares a single stream of type 0x1B on PID 0x0101,
/// which is also used as the PCR PID.
///
/// Padding is measured from where this packet starts. The previous
/// `TS_PACKET_SIZE - output.len()` always underflowed here, because the PAT
/// packet is already in the buffer when this function runs, which caused a
/// panic (or an enormous allocation in release builds).
///
/// NOTE(review): the final four section bytes (the CRC_32 slot) are written
/// as zero rather than a computed checksum.
fn write_pmt(&self, output: &mut BytesMut) -> PackagerResult<()> {
    const TS_PACKET_SIZE: usize = 188;
    const SYNC_BYTE: u8 = 0x47;

    let packet_start = output.len();

    // Transport header: payload_unit_start set, PID 0x0100, payload only.
    output.put_u8(SYNC_BYTE);
    output.put_u8(0x41);
    output.put_u8(0x00);
    output.put_u8(0x10);

    // pointer_field, table_id 0x02, section_syntax_indicator + length (18).
    output.put_u8(0x00);
    output.put_u8(0x02);
    output.put_u8(0xB0);
    output.put_u8(0x12);

    // program_number, version/current_next, section and last section.
    output.put_u16(0x0001);
    output.put_u8(0xC1);
    output.put_u8(0x00);
    output.put_u8(0x00);

    // PCR PID 0x0101, program_info_length 0.
    output.put_u16(0xE101);
    output.put_u16(0xF000);

    // One stream entry: stream_type 0x1B, PID 0x0101, ES_info_length 0.
    output.put_u8(0x1B);
    output.put_u16(0xE101);
    output.put_u16(0xF000);

    // CRC_32 slot.
    output.put_u32(0x00000000);

    let written = output.len() - packet_start;
    output.put_bytes(0xFF, TS_PACKET_SIZE - written);
    Ok(())
}
/// Builds a fragmented-MP4 segment consisting of a `moof` box and an `mdat`
/// box. With `fast_start` enabled the `moof` comes first; otherwise the
/// `mdat` is written first and the `moof` follows it.
///
/// # Errors
///
/// Propagates errors from the box writers.
fn create_fmp4_segment(&self) -> PackagerResult<Vec<u8>> {
    debug!("Creating fMP4 segment");
    let mut buffer = BytesMut::new();
    let moof_first = self.config.fast_start;

    if moof_first {
        self.write_moof(&mut buffer)?;
    }
    self.write_mdat(&mut buffer)?;
    if !moof_first {
        self.write_moof(&mut buffer)?;
    }

    Ok(buffer.to_vec())
}
/// Appends a `moof` box containing an `mfhd` and a `traf` box.
///
/// The 32-bit size field is written as zero first and patched once the
/// children have been emitted and the total length is known.
fn write_moof(&self, output: &mut BytesMut) -> PackagerResult<()> {
    let box_start = output.len();

    // Placeholder size, then the box type.
    output.put_u32(0);
    output.put_slice(b"moof");

    self.write_mfhd(output)?;
    self.write_traf(output)?;

    // Back-fill the real size (header included) in big-endian order.
    let box_len = (output.len() - box_start) as u32;
    output[box_start..box_start + 4].copy_from_slice(&box_len.to_be_bytes());
    Ok(())
}
/// Appends a 16-byte `mfhd` box whose sequence number is the current
/// segment index truncated to 32 bits.
fn write_mfhd(&self, output: &mut BytesMut) -> PackagerResult<()> {
    let sequence_number = self.segment_index as u32;

    // Box size and type.
    output.put_u32(16);
    output.put_slice(b"mfhd");
    // Version 0, flags 0.
    output.put_u8(0);
    output.put_u24(0);
    output.put_u32(sequence_number);
    Ok(())
}
/// Appends a `traf` box containing `tfhd`, `tfdt` and `trun` children, in
/// that order.
///
/// The 32-bit size field is written as zero first and patched once the
/// children have been emitted.
fn write_traf(&self, output: &mut BytesMut) -> PackagerResult<()> {
    let box_start = output.len();

    // Placeholder size, then the box type.
    output.put_u32(0);
    output.put_slice(b"traf");

    self.write_tfhd(output)?;
    self.write_tfdt(output)?;
    self.write_trun(output)?;

    // Back-fill the real size (header included) in big-endian order.
    let box_len = (output.len() - box_start) as u32;
    output[box_start..box_start + 4].copy_from_slice(&box_len.to_be_bytes());
    Ok(())
}
/// Appends a 16-byte `tfhd` box for track 1 with flags `0x020000`
/// (default-base-is-moof).
fn write_tfhd(&self, output: &mut BytesMut) -> PackagerResult<()> {
    const TRACK_ID: u32 = 1;
    const FLAGS: u32 = 0x020000;

    // Box size and type.
    output.put_u32(16);
    output.put_slice(b"tfhd");
    // Version 0 followed by the 24-bit flags.
    output.put_u8(0);
    output.put_u24(FLAGS);
    output.put_u32(TRACK_ID);
    Ok(())
}
/// Appends a 20-byte version-1 `tfdt` box whose 64-bit decode time is the
/// generator's running duration expressed in milliseconds.
fn write_tfdt(&self, output: &mut BytesMut) -> PackagerResult<()> {
    let decode_time_ms = self.current_duration.as_millis() as u64;

    // Box size and type.
    output.put_u32(20);
    output.put_slice(b"tfdt");
    // Version 1 selects the 64-bit time field; flags are zero.
    output.put_u8(1);
    output.put_u24(0);
    output.put_u64(decode_time_ms);
    Ok(())
}
/// Appends a 20-byte `trun` box with flags `0x000001` (data-offset
/// present), a fixed sample count of 1 and a data offset of 0.
fn write_trun(&self, output: &mut BytesMut) -> PackagerResult<()> {
    const SAMPLE_COUNT: u32 = 1;
    const FLAGS: u32 = 0x000001;

    // Box size and type.
    output.put_u32(20);
    output.put_slice(b"trun");
    // Version 0 followed by the 24-bit flags.
    output.put_u8(0);
    output.put_u24(FLAGS);
    output.put_u32(SAMPLE_COUNT);
    // Data offset.
    output.put_u32(0);
    Ok(())
}
/// Appends an `mdat` box holding all buffered frame bytes.
///
/// When header plus payload fit in 32 bits the compact 8-byte header is
/// used, exactly as before. Larger payloads use the ISO BMFF extended form
/// (size field `1` followed by a 64-bit `largesize`) instead of silently
/// truncating the length with an `as u32` cast, which produced a box whose
/// declared size did not match its contents.
fn write_mdat(&self, output: &mut BytesMut) -> PackagerResult<()> {
    const COMPACT_HEADER: u64 = 8;
    const EXTENDED_HEADER: u64 = 16;

    let payload_len = self.current_data.len() as u64;
    let compact_size = payload_len + COMPACT_HEADER;

    if compact_size <= u64::from(u32::MAX) {
        // Checked above, so the narrowing cast cannot lose bits.
        output.put_u32(compact_size as u32);
        output.put_slice(b"mdat");
    } else {
        // size == 1 signals that the real size follows the type as a u64.
        output.put_u32(1);
        output.put_slice(b"mdat");
        output.put_u64(payload_len + EXTENDED_HEADER);
    }

    output.put_slice(&self.current_data);
    Ok(())
}
/// Builds a CMAF segment: a `styp` box, then a `moof` box, then an `mdat`
/// box.
///
/// # Errors
///
/// Propagates errors from the box writers.
fn create_cmaf_segment(&self) -> PackagerResult<Vec<u8>> {
    debug!("Creating CMAF segment");

    let mut buffer = BytesMut::new();
    let sink = &mut buffer;
    self.write_styp(sink)?;
    self.write_moof(sink)?;
    self.write_mdat(sink)?;

    Ok(buffer.to_vec())
}
/// Appends a 24-byte `styp` box with major brand `cmfc`, minor version 0
/// and compatible brands `iso6` and `cmfc`.
fn write_styp(&self, output: &mut BytesMut) -> PackagerResult<()> {
    const MAJOR_BRAND: &[u8; 4] = b"cmfc";
    const COMPATIBLE_BRANDS: [&[u8; 4]; 2] = [b"iso6", b"cmfc"];

    // Box size and type.
    output.put_u32(24);
    output.put_slice(b"styp");
    output.put_slice(MAJOR_BRAND);
    // Minor version.
    output.put_u32(0);
    for brand in COMPATIBLE_BRANDS.iter() {
        output.put_slice(&brand[..]);
    }
    Ok(())
}
/// Returns the file name for the segment currently being built:
/// `segment_N.ts` for MPEG-TS, `segment_N.m4s` for fMP4 and `chunk_N.m4s`
/// for CMAF, where `N` is the current segment index.
fn get_segment_path(&self) -> String {
    let (stem, extension) = match self.config.format {
        SegmentFormat::MpegTs => ("segment", "ts"),
        SegmentFormat::Fmp4 => ("segment", "m4s"),
        SegmentFormat::Cmaf => ("chunk", "m4s"),
    };
    format!("{}_{}.{}", stem, self.segment_index, extension)
}
/// Returns the index that will be assigned to the next finalized segment
/// (equivalently, the number of segments finalized since creation or the
/// last `reset`).
#[must_use]
pub fn segment_index(&self) -> u64 {
self.segment_index
}
/// Discards all buffered frames and recorded keyframes and rewinds the
/// segment counter and running duration to zero. The configuration is
/// left untouched.
pub fn reset(&mut self) {
    self.keyframes.clear();
    self.current_data.clear();
    self.current_duration = Duration::ZERO;
    self.segment_index = 0;
}
}
/// Writes encoded segments to disk and removes segments that are no longer
/// needed.
pub struct SegmentWriter {
// Directory that segment paths are resolved against.
output_dir: std::path::PathBuf,
}
impl SegmentWriter {
/// Creates a writer that resolves segment paths against `output_dir`.
/// The directory itself is not created or checked here.
#[must_use]
pub fn new(output_dir: std::path::PathBuf) -> Self {
Self { output_dir }
}
/// Writes `data` to `output_dir` joined with `segment.path`, creating any
/// missing parent directories first.
///
/// # Errors
///
/// Returns an error if the directories cannot be created or the file
/// cannot be written.
pub async fn write_segment(&self, segment: &SegmentInfo, data: &[u8]) -> PackagerResult<()> {
    let destination = self.output_dir.join(&segment.path);

    if let Some(directory) = destination.parent() {
        tokio::fs::create_dir_all(directory).await?;
    }
    tokio::fs::write(&destination, data).await?;

    let byte_count = data.len();
    debug!(
        "Wrote segment {} to {} ({} bytes)",
        segment.index,
        destination.display(),
        byte_count
    );
    Ok(())
}
/// Deletes the segment file(s) whose index is exactly `max_segments` behind
/// `current_index`, i.e. the one that has just left the retention window.
///
/// All file names the generator can produce for that index are tried:
/// `segment_N.ts`, `segment_N.m4s` and `chunk_N.m4s`. Files that do not
/// exist are skipped silently.
///
/// Changes from the previous version:
/// * CMAF output (`chunk_N.m4s`) is now cleaned up as well.
/// * Index 0 is now eligible for deletion. The old `<=` guard returned
///   early when `current_index == max_segments`, and the next call already
///   targeted index 1, so the first segment was never removed.
/// * The blocking `Path::exists()` probe is gone; the file is removed
///   directly and a `NotFound` error is ignored, which also avoids a race
///   between the check and the delete.
///
/// # Errors
///
/// Returns an error if a file exists but cannot be removed.
pub async fn cleanup_old_segments(
    &self,
    current_index: u64,
    max_segments: usize,
) -> PackagerResult<()> {
    let window = max_segments as u64;
    // Nothing has aged out until the window has been filled.
    if current_index < window {
        return Ok(());
    }
    let delete_index = current_index - window;

    // Must mirror the names produced by `SegmentGenerator::get_segment_path`.
    let candidates = [
        format!("segment_{delete_index}.ts"),
        format!("segment_{delete_index}.m4s"),
        format!("chunk_{delete_index}.m4s"),
    ];

    for file_name in &candidates {
        let path = self.output_dir.join(file_name);
        match tokio::fs::remove_file(&path).await {
            Ok(()) => debug!("Deleted old segment: {}", path.display()),
            // Only one of the candidate names normally exists.
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => return Err(err.into()),
        }
    }
    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed generator starts counting segments at zero.
    #[test]
    fn test_segment_generator_creation() {
        let generator = SegmentGenerator::new(SegmentConfig::default());
        assert_eq!(generator.segment_index(), 0);
    }

    /// The first MPEG-TS segment is named `segment_0.ts`.
    #[test]
    fn test_segment_path_generation() {
        let mut ts_config = SegmentConfig::default();
        ts_config.format = SegmentFormat::MpegTs;
        let first_path = SegmentGenerator::new(ts_config).get_segment_path();
        assert_eq!(first_path, "segment_0.ts");
    }
}