Skip to main content

oximedia_transcode/
segment_encoder.rs

1//! Segment-based encoding for HLS and DASH streaming.
2//!
3//! This module provides tools for planning segment boundaries,
4//! tracking encoded segments, and generating HLS/DASH manifests.
5
6/// Configuration for segment-based encoding.
7#[derive(Debug, Clone)]
8pub struct SegmentConfig {
9    /// Target segment duration in seconds.
10    pub duration_secs: f32,
11    /// Keyframe interval in frames.
12    pub keyframe_interval: u32,
13    /// Force a keyframe at each segment boundary.
14    pub force_key_at_segment: bool,
15}
16
17impl Default for SegmentConfig {
18    fn default() -> Self {
19        Self {
20            duration_secs: 6.0,
21            keyframe_interval: 60,
22            force_key_at_segment: true,
23        }
24    }
25}
26
27impl SegmentConfig {
28    /// Creates a new segment configuration.
29    #[must_use]
30    pub fn new(duration_secs: f32, keyframe_interval: u32, force_key_at_segment: bool) -> Self {
31        Self {
32            duration_secs,
33            keyframe_interval,
34            force_key_at_segment,
35        }
36    }
37}
38
39/// A boundary point between two segments.
40#[derive(Debug, Clone, PartialEq)]
41pub struct SegmentBoundary {
42    /// Frame index where the segment starts.
43    pub frame_idx: u64,
44    /// Whether this frame is a keyframe.
45    pub is_keyframe: bool,
46    /// Timestamp in seconds.
47    pub timestamp_secs: f64,
48}
49
50/// A complete segment plan for encoding.
51#[derive(Debug, Clone)]
52pub struct SegmentPlan {
53    /// All segment boundaries.
54    pub boundaries: Vec<SegmentBoundary>,
55    /// Total number of frames.
56    pub total_frames: u64,
57    /// Total number of segments.
58    pub segment_count: u32,
59}
60
61/// Plans segment boundaries for a given video.
62#[derive(Debug, Clone, Default)]
63pub struct SegmentPlanner;
64
65impl SegmentPlanner {
66    /// Creates a new segment planner.
67    #[must_use]
68    pub fn new() -> Self {
69        Self
70    }
71
72    /// Plans segment boundaries for `total_frames` frames at `fps` with the given config.
73    #[must_use]
74    pub fn plan(total_frames: u64, fps: f32, config: &SegmentConfig) -> SegmentPlan {
75        let frames_per_segment = (config.duration_secs * fps).round() as u64;
76        let frames_per_segment = frames_per_segment.max(1);
77
78        let mut boundaries = Vec::new();
79        let mut frame_idx = 0u64;
80        let mut seg_count = 0u32;
81
82        while frame_idx < total_frames {
83            let is_keyframe = if config.force_key_at_segment {
84                true
85            } else {
86                // Keyframe at regular intervals
87                frame_idx % u64::from(config.keyframe_interval) == 0
88            };
89
90            let timestamp_secs = frame_idx as f64 / f64::from(fps);
91
92            boundaries.push(SegmentBoundary {
93                frame_idx,
94                is_keyframe,
95                timestamp_secs,
96            });
97
98            frame_idx += frames_per_segment;
99            seg_count += 1;
100        }
101
102        SegmentPlan {
103            boundaries,
104            total_frames,
105            segment_count: seg_count,
106        }
107    }
108}
109
110/// An encoded segment of media.
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct EncodedSegment {
113    /// Segment index (0-based).
114    pub index: u32,
115    /// Start time in milliseconds.
116    pub start_ms: u64,
117    /// Duration in milliseconds.
118    pub duration_ms: u64,
119    /// File size in bytes.
120    pub size_bytes: u64,
121    /// Actual bitrate in kilobits per second.
122    pub bitrate_kbps: u32,
123    /// Codec used for this segment.
124    pub codec: String,
125}
126
127impl EncodedSegment {
128    /// Creates a new encoded segment.
129    #[must_use]
130    pub fn new(
131        index: u32,
132        start_ms: u64,
133        duration_ms: u64,
134        size_bytes: u64,
135        bitrate_kbps: u32,
136        codec: impl Into<String>,
137    ) -> Self {
138        Self {
139            index,
140            start_ms,
141            duration_ms,
142            size_bytes,
143            bitrate_kbps,
144            codec: codec.into(),
145        }
146    }
147
148    /// Returns the end time in milliseconds.
149    #[must_use]
150    pub fn end_ms(&self) -> u64 {
151        self.start_ms + self.duration_ms
152    }
153
154    /// Returns the duration in seconds.
155    #[must_use]
156    pub fn duration_secs(&self) -> f64 {
157        self.duration_ms as f64 / 1000.0
158    }
159}
160
161/// Encoder that tracks encoded segments.
162#[derive(Debug, Clone, Default)]
163pub struct SegmentEncoder {
164    /// All encoded segments.
165    pub encoded_segments: Vec<EncodedSegment>,
166}
167
168impl SegmentEncoder {
169    /// Creates a new segment encoder.
170    #[must_use]
171    pub fn new() -> Self {
172        Self::default()
173    }
174
175    /// Adds a segment to the encoder's list.
176    pub fn add_segment(&mut self, segment: EncodedSegment) {
177        self.encoded_segments.push(segment);
178    }
179
180    /// Returns the total number of encoded segments.
181    #[must_use]
182    pub fn segment_count(&self) -> usize {
183        self.encoded_segments.len()
184    }
185
186    /// Returns the total encoded size in bytes.
187    #[must_use]
188    pub fn total_bytes(&self) -> u64 {
189        self.encoded_segments.iter().map(|s| s.size_bytes).sum()
190    }
191
192    /// Returns the average bitrate across all segments.
193    #[must_use]
194    pub fn average_bitrate_kbps(&self) -> Option<u32> {
195        if self.encoded_segments.is_empty() {
196            return None;
197        }
198        let sum: u64 = self
199            .encoded_segments
200            .iter()
201            .map(|s| u64::from(s.bitrate_kbps))
202            .sum();
203        Some((sum / self.encoded_segments.len() as u64) as u32)
204    }
205}
206
207/// Generates HLS and DASH manifests from encoded segments.
208#[derive(Debug, Clone, Default)]
209pub struct SegmentManifest;
210
211impl SegmentManifest {
212    /// Generates an HLS `.m3u8` manifest.
213    #[must_use]
214    pub fn generate_hls(segments: &[EncodedSegment], base_url: &str) -> String {
215        let max_duration = segments
216            .iter()
217            .map(EncodedSegment::duration_secs)
218            .fold(0.0_f64, f64::max);
219
220        let mut manifest = format!(
221            "#EXTM3U\n#EXT-X-VERSION:3\n#EXT-X-TARGETDURATION:{}\n#EXT-X-MEDIA-SEQUENCE:0\n",
222            max_duration.ceil() as u64
223        );
224
225        for seg in segments {
226            let duration = seg.duration_secs();
227            manifest.push_str(&format!(
228                "#EXTINF:{:.3},\n{}/segment_{:05}.ts\n",
229                duration, base_url, seg.index
230            ));
231        }
232
233        manifest.push_str("#EXT-X-ENDLIST\n");
234        manifest
235    }
236
237    /// Generates a MPEG-DASH `manifest.mpd` manifest.
238    #[must_use]
239    pub fn generate_dash(segments: &[EncodedSegment], base_url: &str) -> String {
240        let total_ms: u64 = segments.iter().map(|s| s.duration_ms).sum();
241        let total_secs = total_ms as f64 / 1000.0;
242
243        let avg_bitrate = if segments.is_empty() {
244            0u32
245        } else {
246            let sum: u64 = segments.iter().map(|s| u64::from(s.bitrate_kbps)).sum();
247            (sum / segments.len() as u64) as u32
248        };
249
250        let codec = segments.first().map_or("avc1", |s| s.codec.as_str());
251
252        let mut mpd = format!(
253            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n\
254             <MPD xmlns=\"urn:mpeg:dash:schema:mpd:2011\" \
255             mediaPresentationDuration=\"PT{total_secs:.3}S\" \
256             type=\"static\">\n  \
257             <Period>\n    \
258             <AdaptationSet mimeType=\"video/mp4\">\n      \
259             <Representation id=\"0\" codecs=\"{codec}\" bandwidth=\"{}\">\n",
260            avg_bitrate * 1000
261        );
262
263        mpd.push_str("        <SegmentList>\n");
264        for seg in segments {
265            mpd.push_str(&format!(
266                "          <SegmentURL media=\"{}/segment_{:05}.mp4\"/>\n",
267                base_url, seg.index
268            ));
269        }
270        mpd.push_str(
271            "        </SegmentList>\n      \
272             </Representation>\n    \
273             </AdaptationSet>\n  \
274             </Period>\n\
275             </MPD>\n",
276        );
277
278        mpd
279    }
280}
281
282// ─── Parallel segment encoding ────────────────────────────────────────────────
283
284/// The result of encoding a single independent segment in parallel.
285#[derive(Debug, Clone)]
286pub struct ParallelSegmentResult {
287    /// Zero-based segment index.
288    pub index: u32,
289    /// Whether the encode succeeded.
290    pub success: bool,
291    /// Error message if `!success`.
292    pub error: Option<String>,
293    /// Produced encoded segment (on success).
294    pub segment: Option<EncodedSegment>,
295    /// Wall-clock time for this segment in seconds.
296    pub wall_time_secs: f64,
297}
298
299impl ParallelSegmentResult {
300    /// Creates a successful result.
301    #[must_use]
302    pub fn ok(index: u32, segment: EncodedSegment, wall_time_secs: f64) -> Self {
303        Self {
304            index,
305            success: true,
306            error: None,
307            segment: Some(segment),
308            wall_time_secs,
309        }
310    }
311
312    /// Creates a failed result.
313    #[must_use]
314    pub fn err(index: u32, error: impl Into<String>, wall_time_secs: f64) -> Self {
315        Self {
316            index,
317            success: false,
318            error: Some(error.into()),
319            segment: None,
320            wall_time_secs,
321        }
322    }
323}
324
325/// Summary statistics for a parallel segment encode batch.
326#[derive(Debug, Clone, Default)]
327pub struct ParallelSegmentStats {
328    /// Total segments submitted.
329    pub total_segments: u32,
330    /// Number of segments encoded successfully.
331    pub succeeded: u32,
332    /// Number of segments that failed.
333    pub failed: u32,
334    /// Total compressed bytes from all successful segments.
335    pub total_bytes: u64,
336    /// Total wall-clock time in seconds.
337    pub wall_time_secs: f64,
338}
339
340impl ParallelSegmentStats {
341    /// Throughput in segments per second.
342    #[must_use]
343    pub fn segments_per_second(&self) -> f64 {
344        if self.wall_time_secs > 0.0 {
345            f64::from(self.succeeded) / self.wall_time_secs
346        } else {
347            0.0
348        }
349    }
350
351    /// Failure rate as a fraction in [0.0, 1.0].
352    #[must_use]
353    pub fn failure_rate(&self) -> f64 {
354        if self.total_segments == 0 {
355            return 0.0;
356        }
357        f64::from(self.failed) / f64::from(self.total_segments)
358    }
359}
360
361/// A specification for one segment in a parallel encode batch.
362///
363/// The caller describes the segment (start/duration/codec) and provides raw
364/// pixel/sample data.  The encoder executes each spec independently, which
365/// allows rayon to encode all segments concurrently.
366#[derive(Debug, Clone)]
367pub struct SegmentSpec {
368    /// Zero-based segment index.
369    pub index: u32,
370    /// Start time in milliseconds.
371    pub start_ms: u64,
372    /// Duration in milliseconds.
373    pub duration_ms: u64,
374    /// Codec identifier string (e.g. `"av1"`, `"vp9"`).
375    pub codec: String,
376    /// Raw frame data for this segment (RGBA bytes, row-major).
377    pub frame_data: Vec<u8>,
378    /// Frame width in pixels.
379    pub width: u32,
380    /// Frame height in pixels.
381    pub height: u32,
382}
383
384impl SegmentSpec {
385    /// Creates a new segment specification.
386    #[must_use]
387    pub fn new(
388        index: u32,
389        start_ms: u64,
390        duration_ms: u64,
391        codec: impl Into<String>,
392        frame_data: Vec<u8>,
393        width: u32,
394        height: u32,
395    ) -> Self {
396        Self {
397            index,
398            start_ms,
399            duration_ms,
400            codec: codec.into(),
401            frame_data,
402            width,
403            height,
404        }
405    }
406
407    /// Expected byte length for the RGBA frame data.
408    #[must_use]
409    pub fn expected_frame_bytes(&self) -> usize {
410        (self.width * self.height * 4) as usize
411    }
412
413    /// Returns `true` if the frame data matches the expected dimensions.
414    #[must_use]
415    pub fn frame_data_valid(&self) -> bool {
416        self.frame_data.len() >= self.expected_frame_bytes() && self.width > 0 && self.height > 0
417    }
418}
419
420/// Encodes a batch of independent GOPs (segments) in parallel using rayon.
421///
422/// Each [`SegmentSpec`] is a self-contained unit: it carries its own raw
423/// frame data and codec hint.  The encoder compresses each spec on a rayon
424/// thread and collects the results, maintaining the original ordering.
425///
426/// # Thread safety
427///
428/// All state is local to each rayon task.  No shared mutable state is used.
429pub struct ParallelSegmentEncoder {
430    /// Maximum number of rayon threads (0 = use global pool).
431    max_threads: usize,
432    /// Accumulated stats from previous encode calls.
433    stats: ParallelSegmentStats,
434}
435
436impl ParallelSegmentEncoder {
437    /// Creates a new parallel segment encoder.
438    ///
439    /// `max_threads` controls the rayon thread pool size; pass `0` to use
440    /// rayon's default (number of logical CPUs).
441    #[must_use]
442    pub fn new(max_threads: usize) -> Self {
443        Self {
444            max_threads,
445            stats: ParallelSegmentStats::default(),
446        }
447    }
448
449    /// Returns the current accumulated statistics.
450    #[must_use]
451    pub fn stats(&self) -> &ParallelSegmentStats {
452        &self.stats
453    }
454
455    /// Resets the accumulated statistics.
456    pub fn reset_stats(&mut self) {
457        self.stats = ParallelSegmentStats::default();
458    }
459
460    /// Encodes all specs in parallel and returns a result per spec.
461    ///
462    /// Results are returned in the same order as `specs`.
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if the rayon thread pool cannot be created.
467    pub fn encode_batch(
468        &mut self,
469        specs: Vec<SegmentSpec>,
470    ) -> crate::Result<Vec<ParallelSegmentResult>> {
471        use rayon::prelude::*;
472
473        let total = specs.len() as u32;
474        let wall_start = std::time::Instant::now();
475
476        // Build a custom thread pool if a limit was requested.
477        let results: Vec<ParallelSegmentResult> = if self.max_threads > 0 {
478            let pool = rayon::ThreadPoolBuilder::new()
479                .num_threads(self.max_threads)
480                .build()
481                .map_err(|e| {
482                    crate::TranscodeError::PipelineError(format!(
483                        "Failed to create segment thread pool: {e}"
484                    ))
485                })?;
486
487            pool.install(|| specs.into_par_iter().map(encode_single_segment).collect())
488        } else {
489            specs.into_par_iter().map(encode_single_segment).collect()
490        };
491
492        let wall_secs = wall_start.elapsed().as_secs_f64();
493
494        // Accumulate stats.
495        let succeeded = results.iter().filter(|r| r.success).count() as u32;
496        let failed = total - succeeded;
497        let total_bytes: u64 = results
498            .iter()
499            .filter_map(|r| r.segment.as_ref().map(|s| s.size_bytes))
500            .sum();
501
502        self.stats.total_segments += total;
503        self.stats.succeeded += succeeded;
504        self.stats.failed += failed;
505        self.stats.total_bytes += total_bytes;
506        self.stats.wall_time_secs += wall_secs;
507
508        Ok(results)
509    }
510}
511
512/// Encode a single segment spec on whichever rayon thread picks it up.
513fn encode_single_segment(spec: SegmentSpec) -> ParallelSegmentResult {
514    let t0 = std::time::Instant::now();
515
516    if !spec.frame_data_valid() {
517        return ParallelSegmentResult::err(
518            spec.index,
519            format!(
520                "Segment {}: invalid frame data ({}×{}, {} bytes)",
521                spec.index,
522                spec.width,
523                spec.height,
524                spec.frame_data.len()
525            ),
526            t0.elapsed().as_secs_f64(),
527        );
528    }
529
530    // Compress the segment frame data using a simple luma RLE placeholder.
531    // In production this would call the appropriate oximedia-codec encoder.
532    let compressed = compress_segment_placeholder(&spec.frame_data);
533
534    let segment = EncodedSegment::new(
535        spec.index,
536        spec.start_ms,
537        spec.duration_ms,
538        compressed.len() as u64,
539        estimate_bitrate_kbps(&compressed, spec.duration_ms),
540        spec.codec.clone(),
541    );
542
543    ParallelSegmentResult::ok(spec.index, segment, t0.elapsed().as_secs_f64())
544}
545
546/// Estimates bitrate in kbps from compressed size and duration.
547fn estimate_bitrate_kbps(data: &[u8], duration_ms: u64) -> u32 {
548    if duration_ms == 0 {
549        return 0;
550    }
551    // bits / duration_secs / 1000
552    let bits = data.len() as u64 * 8;
553    let duration_secs = duration_ms as f64 / 1_000.0;
554    ((bits as f64 / duration_secs) / 1_000.0) as u32
555}
556
557/// Placeholder compressor: simple byte-value RLE on the raw data.
558fn compress_segment_placeholder(data: &[u8]) -> Vec<u8> {
559    if data.is_empty() {
560        return Vec::new();
561    }
562    let mut out = Vec::with_capacity(data.len() / 2 + 2);
563    let mut i = 0;
564    while i < data.len() {
565        let val = data[i];
566        let mut run: u8 = 1;
567        while i + usize::from(run) < data.len() && data[i + usize::from(run)] == val && run < 255 {
568            run += 1;
569        }
570        out.push(val);
571        out.push(run);
572        i += usize::from(run);
573    }
574    out
575}
576
577#[cfg(test)]
578mod tests {
579    use super::*;
580
581    #[test]
582    fn test_segment_config_default() {
583        let cfg = SegmentConfig::default();
584        assert_eq!(cfg.duration_secs, 6.0);
585        assert!(cfg.force_key_at_segment);
586    }
587
588    #[test]
589    fn test_segment_config_new() {
590        let cfg = SegmentConfig::new(4.0, 120, false);
591        assert_eq!(cfg.duration_secs, 4.0);
592        assert_eq!(cfg.keyframe_interval, 120);
593        assert!(!cfg.force_key_at_segment);
594    }
595
596    #[test]
597    fn test_segment_planner_basic() {
598        let cfg = SegmentConfig::default(); // 6s segments
599                                            // 180 frames at 30fps = 6 seconds total → 1 segment
600        let plan = SegmentPlanner::plan(180, 30.0, &cfg);
601        assert_eq!(plan.total_frames, 180);
602        assert!(plan.segment_count >= 1);
603    }
604
605    #[test]
606    fn test_segment_planner_multiple_segments() {
607        let cfg = SegmentConfig::new(2.0, 30, true);
608        // 60 frames at 30fps = 2s per segment → expect 1 segment start at 0
609        let plan = SegmentPlanner::plan(60, 30.0, &cfg);
610        assert!(!plan.boundaries.is_empty());
611    }
612
613    #[test]
614    fn test_segment_planner_keyframe_at_boundary() {
615        let cfg = SegmentConfig::new(2.0, 60, true);
616        let plan = SegmentPlanner::plan(120, 30.0, &cfg);
617        for b in &plan.boundaries {
618            assert!(b.is_keyframe, "All boundaries should be keyframes");
619        }
620    }
621
622    #[test]
623    fn test_segment_boundary_timestamp() {
624        let cfg = SegmentConfig::new(2.0, 60, true);
625        let plan = SegmentPlanner::plan(120, 30.0, &cfg);
626        assert!((plan.boundaries[0].timestamp_secs - 0.0).abs() < 1e-9);
627    }
628
629    #[test]
630    fn test_encoded_segment_end_ms() {
631        let seg = EncodedSegment::new(0, 0, 2000, 512_000, 2048, "h264");
632        assert_eq!(seg.end_ms(), 2000);
633        assert!((seg.duration_secs() - 2.0).abs() < 1e-9);
634    }
635
636    #[test]
637    fn test_segment_encoder_add_and_count() {
638        let mut enc = SegmentEncoder::new();
639        assert_eq!(enc.segment_count(), 0);
640        enc.add_segment(EncodedSegment::new(0, 0, 2000, 1024, 4000, "h264"));
641        enc.add_segment(EncodedSegment::new(1, 2000, 2000, 2048, 8000, "h264"));
642        assert_eq!(enc.segment_count(), 2);
643    }
644
645    #[test]
646    fn test_segment_encoder_total_bytes() {
647        let mut enc = SegmentEncoder::new();
648        enc.add_segment(EncodedSegment::new(0, 0, 2000, 1000, 4000, "h264"));
649        enc.add_segment(EncodedSegment::new(1, 2000, 2000, 2000, 8000, "h264"));
650        assert_eq!(enc.total_bytes(), 3000);
651    }
652
653    #[test]
654    fn test_segment_encoder_average_bitrate() {
655        let mut enc = SegmentEncoder::new();
656        assert!(enc.average_bitrate_kbps().is_none());
657        enc.add_segment(EncodedSegment::new(0, 0, 2000, 1000, 4000, "h264"));
658        enc.add_segment(EncodedSegment::new(1, 2000, 2000, 2000, 6000, "h264"));
659        assert_eq!(enc.average_bitrate_kbps(), Some(5000));
660    }
661
662    #[test]
663    fn test_generate_hls_contains_extm3u() {
664        let segments = vec![
665            EncodedSegment::new(0, 0, 6000, 1000, 4000, "h264"),
666            EncodedSegment::new(1, 6000, 6000, 1000, 4000, "h264"),
667        ];
668        let manifest = SegmentManifest::generate_hls(&segments, "https://cdn.example.com");
669        assert!(manifest.contains("#EXTM3U"));
670        assert!(manifest.contains("#EXT-X-ENDLIST"));
671        assert!(manifest.contains("segment_00000.ts"));
672        assert!(manifest.contains("segment_00001.ts"));
673    }
674
675    #[test]
676    fn test_generate_dash_contains_mpd() {
677        let segments = vec![EncodedSegment::new(0, 0, 6000, 1000, 4000, "avc1")];
678        let manifest = SegmentManifest::generate_dash(&segments, "https://cdn.example.com");
679        assert!(manifest.contains("<?xml"));
680        assert!(manifest.contains("<MPD"));
681        assert!(manifest.contains("segment_00000.mp4"));
682    }
683
684    #[test]
685    fn test_generate_hls_empty() {
686        let manifest = SegmentManifest::generate_hls(&[], "https://cdn.example.com");
687        assert!(manifest.contains("#EXTM3U"));
688        assert!(manifest.contains("#EXT-X-ENDLIST"));
689    }
690
691    #[test]
692    fn test_generate_dash_empty() {
693        let manifest = SegmentManifest::generate_dash(&[], "https://cdn.example.com");
694        assert!(manifest.contains("<?xml"));
695    }
696
697    // ── ParallelSegmentEncoder tests ──────────────────────────────────────────
698
699    #[test]
700    fn test_segment_spec_new() {
701        let spec = SegmentSpec::new(0, 0, 2000, "av1", vec![0u8; 8 * 8 * 4], 8, 8);
702        assert_eq!(spec.index, 0);
703        assert_eq!(spec.duration_ms, 2000);
704        assert_eq!(spec.codec, "av1");
705        assert!(spec.frame_data_valid());
706    }
707
708    #[test]
709    fn test_segment_spec_invalid_frame_data() {
710        let spec = SegmentSpec::new(0, 0, 2000, "av1", vec![0u8; 4], 64, 64);
711        assert!(
712            !spec.frame_data_valid(),
713            "undersized frame data should be invalid"
714        );
715    }
716
717    #[test]
718    fn test_segment_spec_expected_bytes() {
719        let spec = SegmentSpec::new(0, 0, 1000, "vp9", vec![], 16, 16);
720        assert_eq!(spec.expected_frame_bytes(), 16 * 16 * 4);
721    }
722
723    #[test]
724    fn test_parallel_segment_encoder_single() {
725        let mut encoder = ParallelSegmentEncoder::new(2);
726        let frame_data = vec![128u8; 64 * 64 * 4]; // grey 64×64
727        let spec = SegmentSpec::new(0, 0, 2000, "av1", frame_data, 64, 64);
728
729        let results = encoder.encode_batch(vec![spec]).expect("encode ok");
730        assert_eq!(results.len(), 1);
731        assert!(results[0].success, "single segment should succeed");
732        assert!(results[0].segment.is_some());
733        assert_eq!(results[0].index, 0);
734    }
735
736    #[test]
737    fn test_parallel_segment_encoder_multiple_preserves_order() {
738        let mut encoder = ParallelSegmentEncoder::new(4);
739        let specs: Vec<SegmentSpec> = (0..8)
740            .map(|i| {
741                let frame_data = vec![(i * 30) as u8; 64 * 64 * 4];
742                SegmentSpec::new(i as u32, i as u64 * 2000, 2000, "av1", frame_data, 64, 64)
743            })
744            .collect();
745
746        let results = encoder.encode_batch(specs).expect("encode ok");
747        assert_eq!(results.len(), 8);
748
749        // Results must be in the same order as input specs.
750        for (i, result) in results.iter().enumerate() {
751            assert_eq!(result.index, i as u32, "result order mismatch at index {i}");
752            assert!(result.success, "all segments should succeed");
753        }
754    }
755
756    #[test]
757    fn test_parallel_segment_encoder_invalid_spec_fails_gracefully() {
758        let mut encoder = ParallelSegmentEncoder::new(2);
759        // Invalid: frame_data too small
760        let bad_spec = SegmentSpec::new(0, 0, 2000, "av1", vec![0u8; 10], 64, 64);
761
762        let results = encoder
763            .encode_batch(vec![bad_spec])
764            .expect("encode batch ok");
765        assert_eq!(results.len(), 1);
766        assert!(!results[0].success, "invalid spec should fail");
767        assert!(results[0].error.is_some());
768    }
769
770    #[test]
771    fn test_parallel_segment_encoder_stats() {
772        let mut encoder = ParallelSegmentEncoder::new(2);
773        let specs: Vec<SegmentSpec> = (0..4)
774            .map(|i| {
775                let frame_data = vec![64u8; 64 * 64 * 4];
776                SegmentSpec::new(i as u32, i as u64 * 1000, 1000, "vp9", frame_data, 64, 64)
777            })
778            .collect();
779
780        encoder.encode_batch(specs).expect("encode ok");
781
782        let stats = encoder.stats();
783        assert_eq!(stats.total_segments, 4);
784        assert_eq!(stats.succeeded, 4);
785        assert_eq!(stats.failed, 0);
786        assert!(stats.total_bytes > 0);
787    }
788
789    #[test]
790    fn test_parallel_segment_encoder_stats_reset() {
791        let mut encoder = ParallelSegmentEncoder::new(2);
792        let spec = SegmentSpec::new(0, 0, 1000, "av1", vec![0u8; 64 * 64 * 4], 64, 64);
793        encoder.encode_batch(vec![spec]).expect("encode ok");
794        assert!(encoder.stats().total_segments > 0);
795
796        encoder.reset_stats();
797        assert_eq!(encoder.stats().total_segments, 0);
798        assert_eq!(encoder.stats().succeeded, 0);
799    }
800
801    #[test]
802    fn test_parallel_segment_stats_failure_rate() {
803        let stats = ParallelSegmentStats {
804            total_segments: 10,
805            succeeded: 8,
806            failed: 2,
807            total_bytes: 1_000,
808            wall_time_secs: 1.0,
809        };
810        assert!((stats.failure_rate() - 0.2).abs() < 1e-9);
811        assert!((stats.segments_per_second() - 8.0).abs() < 1e-9);
812    }
813
814    #[test]
815    fn test_parallel_segment_result_ok() {
816        let seg = EncodedSegment::new(0, 0, 1000, 512, 4096, "av1");
817        let result = ParallelSegmentResult::ok(0, seg, 0.5);
818        assert!(result.success);
819        assert!(result.error.is_none());
820        assert!(result.segment.is_some());
821        assert!((result.wall_time_secs - 0.5).abs() < 1e-9);
822    }
823
824    #[test]
825    fn test_parallel_segment_result_err() {
826        let result = ParallelSegmentResult::err(1, "codec unavailable", 0.1);
827        assert!(!result.success);
828        assert_eq!(result.error.as_deref(), Some("codec unavailable"));
829        assert!(result.segment.is_none());
830    }
831
832    #[test]
833    fn test_compress_segment_placeholder_empty() {
834        assert!(compress_segment_placeholder(&[]).is_empty());
835    }
836
837    #[test]
838    fn test_compress_segment_placeholder_rle() {
839        let data = vec![42u8; 8];
840        let compressed = compress_segment_placeholder(&data);
841        // One RLE pair: value=42, run=8
842        assert_eq!(compressed, vec![42, 8]);
843    }
844
845    #[test]
846    fn test_estimate_bitrate_kbps_zero_duration() {
847        assert_eq!(estimate_bitrate_kbps(&[0u8; 100], 0), 0);
848    }
849
850    #[test]
851    fn test_estimate_bitrate_kbps_nonzero() {
852        // 125 bytes × 8 bits = 1000 bits over 1 second = 1 kbps
853        let bps = estimate_bitrate_kbps(&[0u8; 125], 1_000);
854        assert_eq!(bps, 1);
855    }
856
857    #[test]
858    fn test_parallel_segment_encoder_zero_threads() {
859        // 0 threads = use rayon global pool
860        let mut encoder = ParallelSegmentEncoder::new(0);
861        let spec = SegmentSpec::new(0, 0, 1000, "av1", vec![0u8; 8 * 8 * 4], 8, 8);
862        let results = encoder.encode_batch(vec![spec]).expect("encode ok");
863        assert!(results[0].success);
864    }
865}