use crate::error::{NetError, NetResult};
use std::collections::VecDeque;
use std::time::{Duration, SystemTime};
/// Tunable settings for the low-latency HLS segmenter.
#[derive(Debug, Clone)]
pub struct LlHlsSegmenterConfig {
    /// Target duration of a single partial segment.
    pub part_duration: Duration,
    /// How many partial segments make up one full segment.
    pub parts_per_segment: u32,
    /// Prefix placed in front of every generated part and segment URI.
    pub uri_prefix: String,
    /// When `true`, parts that contain a keyframe are flagged as independent.
    pub mark_independent: bool,
    /// Upper bound on the number of completed segments the segmenter retains.
    pub window_size: usize,
}

impl Default for LlHlsSegmenterConfig {
    /// 200 ms parts, 30 parts per segment, `"seg"` URI prefix, independent
    /// marking enabled and a window of 5 segments.
    fn default() -> Self {
        Self {
            window_size: 5,
            mark_independent: true,
            uri_prefix: String::from("seg"),
            parts_per_segment: 30,
            part_duration: Duration::from_millis(200),
        }
    }
}

impl LlHlsSegmenterConfig {
    /// Default configuration with the part duration replaced by
    /// `part_duration_ms` milliseconds.
    #[must_use]
    pub fn with_part_duration(part_duration_ms: u64) -> Self {
        let part_duration = Duration::from_millis(part_duration_ms);
        Self {
            part_duration,
            ..Self::default()
        }
    }

    /// Nominal duration of a full segment in seconds
    /// (`part_duration * parts_per_segment`).
    #[must_use]
    pub fn segment_duration_secs(&self) -> f64 {
        let parts = f64::from(self.parts_per_segment);
        parts * self.part_duration_secs()
    }

    /// Target part duration expressed in (fractional) seconds.
    #[must_use]
    pub fn part_duration_secs(&self) -> f64 {
        self.part_duration.as_secs_f64()
    }
}
/// One LL-HLS partial segment ("part") together with its payload.
#[derive(Debug, Clone)]
pub struct HlsPartialSegment {
    /// Sequence number of the segment this part belongs to.
    pub segment_sequence: u64,
    /// Zero-based position of this part inside its segment.
    pub part_index: u32,
    /// URI under which the part is advertised.
    pub uri: String,
    /// Duration of the part in seconds.
    pub duration_secs: f64,
    /// Whether the part is advertised with `INDEPENDENT=YES`.
    pub independent: bool,
    /// Raw bytes of the part.
    pub data: Vec<u8>,
    /// Wall-clock time at which the part was created.
    pub created_at: SystemTime,
    /// `true` when this part closes its parent segment.
    pub is_last_in_segment: bool,
}

impl HlsPartialSegment {
    /// Renders the `#EXT-X-PART` playlist tag for this part.
    ///
    /// The duration is printed with five decimals; `INDEPENDENT=YES` is
    /// appended only when [`independent`](Self::independent) is set.
    #[must_use]
    pub fn to_ext_x_part_tag(&self) -> String {
        let mut tag = format!(
            "#EXT-X-PART:DURATION={:.5},URI=\"{}\"",
            self.duration_secs, self.uri
        );
        if self.independent {
            tag.push_str(",INDEPENDENT=YES");
        }
        tag
    }

    /// URI of the full segment this part belongs to.
    ///
    /// Part URIs produced by the segmenter look like
    /// `{prefix}{seq}_part{index}.mp4` and the matching segment is
    /// `{prefix}{seq}.mp4`. The prefix is recovered from this part's own URI
    /// so that a non-default prefix is honoured. If the URI does not follow
    /// that naming scheme, the historical `seg{seq}.mp4` form is returned.
    #[must_use]
    pub fn parent_segment_uri(&self) -> String {
        let seq = self.segment_sequence;
        let part_suffix = format!("{seq}_part{}.mp4", self.part_index);
        match self.uri.strip_suffix(part_suffix.as_str()) {
            Some(prefix) => format!("{prefix}{seq}.mp4"),
            // Unknown naming scheme: keep the previous hard-coded behaviour.
            None => format!("seg{seq}.mp4"),
        }
    }
}
/// A finished segment together with the parts it was assembled from.
#[derive(Debug, Clone)]
pub struct CompletedSegment {
    /// Sequence number of the segment.
    pub sequence: u64,
    /// URI under which the full segment is advertised.
    pub uri: String,
    /// Total duration of the segment in seconds.
    pub duration_secs: f64,
    /// The partial segments that make up this segment, in order.
    pub parts: Vec<HlsPartialSegment>,
    /// Wall-clock time at which the segment was finalized.
    pub finalized_at: SystemTime,
}

impl CompletedSegment {
    /// Renders the playlist lines for this segment.
    ///
    /// Output is one `#EXT-X-PART` line per part, followed by the `#EXTINF`
    /// line (five-decimal duration) and the segment URI, each terminated by
    /// a newline.
    #[must_use]
    pub fn to_m3u8_lines(&self) -> String {
        let mut playlist: String = self
            .parts
            .iter()
            .map(|part| part.to_ext_x_part_tag() + "\n")
            .collect();
        let segment_entry = format!("#EXTINF:{:.5},\n{}\n", self.duration_secs, self.uri);
        playlist.push_str(&segment_entry);
        playlist
    }
}
/// A single encoded media frame handed to the segmenter.
#[derive(Debug, Clone)]
pub struct MediaFrame {
    /// Encoded payload bytes of the frame.
    pub data: Vec<u8>,
    /// Timestamp of the frame in milliseconds (presumably the presentation
    /// timestamp); the segmenter derives frame durations from successive values.
    pub pts_ms: u64,
    /// Whether the frame is a keyframe.
    pub is_keyframe: bool,
}

impl MediaFrame {
    /// Bundles a payload with its timestamp and keyframe flag.
    #[must_use]
    pub fn new(data: Vec<u8>, pts_ms: u64, is_keyframe: bool) -> Self {
        MediaFrame {
            is_keyframe,
            pts_ms,
            data,
        }
    }
}
/// Scratch state that collects frames for the part currently being built.
#[derive(Debug, Default)]
struct PartAccumulator {
    /// Concatenated payload of every frame pushed so far.
    data: Vec<u8>,
    /// Sum of the per-frame durations pushed so far, in milliseconds.
    duration_ms: u64,
    /// Set once any pushed frame was a keyframe.
    has_keyframe: bool,
    /// Number of frames pushed since the last reset.
    frame_count: u32,
    /// Timestamp of the first frame pushed since the last reset.
    start_pts_ms: Option<u64>,
}

impl PartAccumulator {
    /// Returns the accumulator to its empty state (the buffer is cleared in place).
    fn reset(&mut self) {
        self.start_pts_ms = None;
        self.frame_count = 0;
        self.has_keyframe = false;
        self.duration_ms = 0;
        self.data.clear();
    }

    /// `true` when no frame has been pushed since the last reset.
    fn is_empty(&self) -> bool {
        matches!(self.frame_count, 0)
    }

    /// Appends `frame` to the part under construction, crediting it with
    /// `frame_duration_ms` milliseconds.
    fn push_frame(&mut self, frame: &MediaFrame, frame_duration_ms: u64) {
        // Only the first frame after a reset defines the start timestamp.
        self.start_pts_ms = self.start_pts_ms.or(Some(frame.pts_ms));
        self.has_keyframe |= frame.is_keyframe;
        self.data.extend_from_slice(&frame.data);
        self.duration_ms += frame_duration_ms;
        self.frame_count += 1;
    }
}
pub struct LlHlsSegmenter {
config: LlHlsSegmenterConfig,
segment_seq: u64,
part_index: u32,
accumulator: PartAccumulator,
current_segment_parts: Vec<HlsPartialSegment>,
completed: VecDeque<CompletedSegment>,
ready_parts: VecDeque<HlsPartialSegment>,
force_split_on_keyframe: bool,
last_pts_ms: Option<u64>,
default_frame_duration_ms: u64,
}
impl LlHlsSegmenter {
#[must_use]
pub fn new(config: LlHlsSegmenterConfig) -> Self {
Self {
config,
segment_seq: 0,
part_index: 0,
accumulator: PartAccumulator::default(),
current_segment_parts: Vec::new(),
completed: VecDeque::new(),
ready_parts: VecDeque::new(),
force_split_on_keyframe: true,
default_frame_duration_ms: 33, }
}
#[must_use]
pub fn default_200ms() -> Self {
Self::new(LlHlsSegmenterConfig::default())
}
#[must_use]
pub fn default_100ms() -> Self {
Self::new(LlHlsSegmenterConfig::with_part_duration(100))
}
#[must_use]
pub fn current_segment_seq(&self) -> u64 {
self.segment_seq
}
#[must_use]
pub fn current_part_index(&self) -> u32 {
self.part_index
}
#[must_use]
pub fn completed_segment_count(&self) -> usize {
self.completed.len()
}
#[must_use]
pub fn ready_part_count(&self) -> usize {
self.ready_parts.len()
}
pub fn push_frame(&mut self, frame: MediaFrame) {
let frame_dur = match self.last_pts_ms {
Some(prev) => frame.pts_ms.saturating_sub(prev).max(1),
None => self.default_frame_duration_ms,
};
self.last_pts_ms = Some(frame.pts_ms);
if self.force_split_on_keyframe && frame.is_keyframe && !self.accumulator.is_empty() {
self.flush_partial();
}
self.accumulator.push_frame(&frame, frame_dur);
let target_ms = self.config.part_duration.as_millis() as u64;
if self.accumulator.duration_ms >= target_ms {
self.flush_partial();
}
}
pub fn flush(&mut self) {
if !self.accumulator.is_empty() {
self.flush_partial();
}
}
pub fn drain_parts(&mut self) -> Vec<HlsPartialSegment> {
self.ready_parts.drain(..).collect()
}
pub fn take_completed_segment(&mut self) -> Option<CompletedSegment> {
self.completed.pop_front()
}
#[must_use]
pub fn completed_segments(&self) -> &VecDeque<CompletedSegment> {
&self.completed
}
#[must_use]
pub fn preload_hint_uri(&self) -> String {
format!(
"{}{}_part{}.mp4",
self.config.uri_prefix,
self.segment_seq,
self.part_index
)
}
fn flush_partial(&mut self) {
if self.accumulator.is_empty() {
return;
}
let duration_secs = self.accumulator.duration_ms as f64 / 1000.0;
let independent = self.config.mark_independent && self.accumulator.has_keyframe;
let uri = format!(
"{}{}_part{}.mp4",
self.config.uri_prefix, self.segment_seq, self.part_index
);
let is_last = self.part_index + 1 >= self.config.parts_per_segment;
let part = HlsPartialSegment {
segment_sequence: self.segment_seq,
part_index: self.part_index,
uri,
duration_secs,
independent,
data: std::mem::take(&mut self.accumulator.data),
created_at: SystemTime::now(),
is_last_in_segment: is_last,
};
self.accumulator.reset();
self.part_index += 1;
self.current_segment_parts.push(part.clone());
self.ready_parts.push_back(part);
if is_last {
self.finalize_segment();
}
}
fn finalize_segment(&mut self) {
let seg_uri = format!("{}{}.mp4", self.config.uri_prefix, self.segment_seq);
let total_duration: f64 = self.current_segment_parts.iter().map(|p| p.duration_secs).sum();
let seg = CompletedSegment {
sequence: self.segment_seq,
uri: seg_uri,
duration_secs: total_duration,
parts: std::mem::take(&mut self.current_segment_parts),
finalized_at: SystemTime::now(),
};
self.completed.push_back(seg);
while self.completed.len() > self.config.window_size {
self.completed.pop_front();
}
self.segment_seq += 1;
self.part_index = 0;
}
}
/// Compact debug output: only the current position and the number of queued
/// parts are shown; buffers and payloads are left out.
impl std::fmt::Debug for LlHlsSegmenter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let queued_parts = self.ready_parts.len();
        let mut out = f.debug_struct("LlHlsSegmenter");
        out.field("segment_seq", &self.segment_seq);
        out.field("part_index", &self.part_index);
        out.field("ready_parts", &queued_parts);
        out.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a zero-filled frame of `size` bytes.
    fn make_frame(pts_ms: u64, is_keyframe: bool, size: usize) -> MediaFrame {
        MediaFrame::new(vec![0u8; size], pts_ms, is_keyframe)
    }

    /// Segmenter with the given part duration and parts-per-segment; every
    /// other setting keeps its default.
    fn segmenter(part_ms: u64, parts_per_segment: u32) -> LlHlsSegmenter {
        LlHlsSegmenter::new(LlHlsSegmenterConfig {
            part_duration: Duration::from_millis(part_ms),
            parts_per_segment,
            ..LlHlsSegmenterConfig::default()
        })
    }

    /// Default 200 ms parts, but only three parts per segment.
    fn default_segmenter() -> LlHlsSegmenter {
        segmenter(200, 3)
    }

    /// Hand-built part named `seg{seq}_part{idx}.mp4` with a 0.2 s duration.
    fn sample_part(seq: u64, idx: u32, independent: bool) -> HlsPartialSegment {
        HlsPartialSegment {
            segment_sequence: seq,
            part_index: idx,
            uri: format!("seg{seq}_part{idx}.mp4"),
            duration_secs: 0.2,
            independent,
            data: Vec::new(),
            created_at: SystemTime::now(),
            is_last_in_segment: false,
        }
    }

    /// Pushes 256-byte frames for every index in `indices`, spaced 20 ms
    /// apart; `is_key` decides which indices are keyframes.
    fn push_frames(
        seg: &mut LlHlsSegmenter,
        indices: std::ops::Range<u64>,
        is_key: impl Fn(u64) -> bool,
    ) {
        for i in indices {
            seg.push_frame(make_frame(i * 20, is_key(i), 256));
        }
    }

    #[test]
    fn test_config_segment_duration() {
        let cfg = LlHlsSegmenterConfig::default();
        // 30 parts of 200 ms each.
        assert!((cfg.segment_duration_secs() - 6.0).abs() < 1e-9);
    }

    #[test]
    fn test_config_part_duration_secs() {
        let cfg = LlHlsSegmenterConfig::with_part_duration(100);
        assert!((cfg.part_duration_secs() - 0.1).abs() < 1e-9);
    }

    #[test]
    fn test_initial_state() {
        let seg = default_segmenter();
        assert_eq!(seg.current_segment_seq(), 0);
        assert_eq!(seg.current_part_index(), 0);
        assert_eq!(seg.ready_part_count(), 0);
    }

    #[test]
    fn test_single_frame_no_part() {
        // One 33 ms frame is far below the 200 ms part target.
        let mut seg = default_segmenter();
        seg.push_frame(make_frame(0, true, 1024));
        assert_eq!(seg.ready_part_count(), 0);
    }

    #[test]
    fn test_part_flush_on_duration() {
        let mut seg = segmenter(100, 10);
        for i in 0..4u64 {
            seg.push_frame(make_frame(i * 33, i == 0, 512));
        }
        seg.push_frame(make_frame(4 * 33, false, 512));
        let parts = seg.drain_parts();
        assert!(!parts.is_empty());
    }

    #[test]
    fn test_keyframe_splits_partial() {
        // The part target is never reached; only the keyframe can cause the split.
        let mut seg = segmenter(500, 10);
        seg.push_frame(make_frame(0, false, 512));
        seg.push_frame(make_frame(33, true, 512));
        let parts = seg.drain_parts();
        assert_eq!(parts.len(), 1, "first partial should flush on keyframe");
    }

    #[test]
    fn test_independent_marking() {
        let mut seg = segmenter(500, 5);
        seg.push_frame(make_frame(0, true, 512));
        seg.push_frame(make_frame(33, false, 512));
        seg.push_frame(make_frame(66, true, 512));
        let parts = seg.drain_parts();
        assert!(!parts.is_empty());
        let first = &parts[0];
        assert!(first.independent, "first part has keyframe → INDEPENDENT=YES");
    }

    #[test]
    fn test_ext_x_part_tag() {
        let tag = sample_part(2, 0, true).to_ext_x_part_tag();
        assert!(tag.contains("#EXT-X-PART"));
        assert!(tag.contains("DURATION=0.20000"));
        assert!(tag.contains("INDEPENDENT=YES"));
        assert!(tag.contains("seg2_part0.mp4"));
    }

    #[test]
    fn test_ext_x_part_tag_no_independent() {
        let tag = sample_part(0, 1, false).to_ext_x_part_tag();
        assert!(!tag.contains("INDEPENDENT"));
    }

    #[test]
    fn test_parent_segment_uri() {
        assert_eq!(sample_part(7, 2, false).parent_segment_uri(), "seg7.mp4");
    }

    #[test]
    fn test_full_segment_produced() {
        let mut seg = segmenter(50, 2);
        push_frames(&mut seg, 0..6, |i| i == 0);
        let c = seg.take_completed_segment().expect("should have segment");
        assert_eq!(c.sequence, 0);
        assert_eq!(c.parts.len(), 2);
    }

    #[test]
    fn test_segment_seq_increments() {
        let mut seg = segmenter(50, 2);
        push_frames(&mut seg, 0..12, |i| i % 4 == 0);
        assert_eq!(seg.current_segment_seq(), 2);
    }

    #[test]
    fn test_window_size_limit() {
        let mut seg = LlHlsSegmenter::new(LlHlsSegmenterConfig {
            part_duration: Duration::from_millis(50),
            parts_per_segment: 2,
            window_size: 2,
            ..LlHlsSegmenterConfig::default()
        });
        push_frames(&mut seg, 0..16, |i| i % 4 == 0);
        // Three segments were finalized; the oldest one must have been evicted.
        assert_eq!(seg.completed_segment_count(), 2);
        assert_eq!(
            seg.completed_segments().front().map(|s| s.sequence),
            Some(1)
        );
    }

    #[test]
    fn test_flush_drains_accumulator() {
        let mut seg = default_segmenter();
        seg.push_frame(make_frame(0, true, 512));
        seg.flush();
        assert_eq!(seg.ready_part_count(), 1);
        // Flushing again with nothing pending must not emit an empty part.
        seg.flush();
        assert_eq!(seg.ready_part_count(), 1);
        let parts = seg.drain_parts();
        assert_eq!(parts[0].data.len(), 512);
    }

    #[test]
    fn test_preload_hint_uri() {
        let seg = default_segmenter();
        let hint = seg.preload_hint_uri();
        assert!(hint.contains("seg0_part0.mp4"));
    }

    #[test]
    fn test_preload_hint_advances() {
        let mut seg = segmenter(50, 5);
        push_frames(&mut seg, 0..4, |i| i == 0);
        seg.drain_parts();
        // Exactly one part (index 0) has been emitted, so the hint names part 1.
        assert_eq!(seg.current_part_index(), 1);
        assert_eq!(seg.preload_hint_uri(), "seg0_part1.mp4");
    }

    #[test]
    fn test_media_frame_new() {
        let f = MediaFrame::new(vec![1, 2, 3], 1000, true);
        assert_eq!(f.pts_ms, 1000);
        assert!(f.is_keyframe);
        assert_eq!(f.data.len(), 3);
    }

    #[test]
    fn test_completed_segment_to_m3u8() {
        let seg = CompletedSegment {
            sequence: 0,
            uri: "seg0.mp4".to_owned(),
            duration_secs: 0.2,
            parts: vec![sample_part(0, 0, true)],
            finalized_at: SystemTime::now(),
        };
        let lines = seg.to_m3u8_lines();
        assert!(lines.contains("#EXT-X-PART"));
        assert!(lines.contains("#EXTINF"));
        assert!(lines.contains("seg0.mp4"));
    }

    #[test]
    fn test_segmenter_debug() {
        let seg = default_segmenter();
        let dbg = format!("{seg:?}");
        assert!(dbg.contains("LlHlsSegmenter"));
    }

    #[test]
    fn test_default_100ms_ctor() {
        let seg = LlHlsSegmenter::default_100ms();
        assert!((seg.config.part_duration_secs() - 0.1).abs() < 1e-9);
    }

    #[test]
    fn test_default_200ms_ctor() {
        let seg = LlHlsSegmenter::default_200ms();
        assert!((seg.config.part_duration_secs() - 0.2).abs() < 1e-9);
    }

    #[test]
    fn test_part_index_resets_on_segment() {
        let mut seg = segmenter(50, 2);
        // Frames 0-1 fill part 0; the keyframe at index 3 closes part 1 and
        // with it segment 0, so the index is back at 0 for segment 1.
        push_frames(&mut seg, 0..4, |i| i % 3 == 0);
        assert_eq!(seg.current_segment_seq(), 1);
        assert_eq!(seg.current_part_index(), 0);
        // Two more frames complete part 0 of segment 1.
        push_frames(&mut seg, 4..6, |i| i % 3 == 0);
        assert_eq!(seg.current_segment_seq(), 1);
        assert_eq!(seg.current_part_index(), 1);
    }

    #[test]
    fn test_parts_carry_segment_seq() {
        let mut seg = segmenter(50, 10);
        seg.push_frame(make_frame(0, true, 256));
        seg.push_frame(make_frame(33, false, 256));
        seg.push_frame(make_frame(66, true, 256));
        let parts = seg.drain_parts();
        assert!(!parts.is_empty());
        for p in &parts {
            assert_eq!(p.segment_sequence, 0);
        }
    }

    #[test]
    fn test_part_indices_increasing() {
        let mut seg = segmenter(50, 5);
        push_frames(&mut seg, 0..6, |i| i % 2 == 0);
        let indices: Vec<u32> = seg.drain_parts().iter().map(|p| p.part_index).collect();
        assert_eq!(indices, vec![0, 1], "part indices should be sequential");
    }

    #[test]
    fn test_last_part_in_segment_flag() {
        let mut seg = segmenter(50, 2);
        push_frames(&mut seg, 0..6, |i| i % 3 == 0);
        // Emitted parts: seg0/part0, seg0/part1 (closes segment 0), seg1/part0.
        let flags: Vec<bool> = seg
            .drain_parts()
            .iter()
            .map(|p| p.is_last_in_segment)
            .collect();
        assert_eq!(flags, vec![false, true, false]);
    }
}