Skip to main content

oximedia_transcode/
segment_transcoder.rs

1//! Segment-based transcoding for parallel and distributed workloads.
2//!
3//! Provides time-segment decomposition, status tracking, and a multi-worker
4//! transcoding queue for processing large media files in parallel chunks.
5
/// Describes one time slice of the source media and the profile used to encode it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SegmentSpec {
    /// Start of the segment, in milliseconds.
    pub start_ms: u64,
    /// End of the segment, in milliseconds.
    pub end_ms: u64,
    /// Name of the encoding profile applied to this segment.
    pub profile_name: String,
}

impl SegmentSpec {
    /// Builds a specification from a start time, an end time, and a profile name.
    ///
    /// The bounds are stored exactly as given; their ordering is not checked here.
    #[must_use]
    pub fn new(start_ms: u64, end_ms: u64, profile_name: impl Into<String>) -> Self {
        let profile_name = profile_name.into();
        Self {
            start_ms,
            end_ms,
            profile_name,
        }
    }

    /// Length of the segment in milliseconds.
    ///
    /// Yields 0 when `end_ms` does not exceed `start_ms`.
    #[must_use]
    pub fn duration_ms(&self) -> u64 {
        let Self {
            start_ms, end_ms, ..
        } = self;
        // Saturating subtraction keeps reversed bounds from underflowing.
        end_ms.saturating_sub(*start_ms)
    }
}
36
/// Status of a single transcode segment.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SegmentStatus {
    /// Waiting to be processed.
    Pending,
    /// Currently encoding; value is progress percentage (0–100).
    ///
    /// The payload is a plain `u8`, so values above 100 can be constructed;
    /// [`SegmentStatus::progress_pct`] caps them at 100 when reporting.
    Encoding(u8),
    /// Successfully completed.
    Done,
    /// Failed with an error message.
    Failed(String),
}

impl SegmentStatus {
    /// Returns `true` if the segment has finished (either done or failed).
    #[must_use]
    pub fn is_complete(&self) -> bool {
        matches!(self, Self::Done | Self::Failed(_))
    }

    /// Returns the progress percentage, always within `0..=100`.
    ///
    /// - `Pending` → 0
    /// - `Encoding(p)` → `p`, capped at 100
    /// - `Done` → 100
    /// - `Failed(_)` → 0
    #[must_use]
    pub fn progress_pct(&self) -> u8 {
        match self {
            Self::Pending | Self::Failed(_) => 0,
            // An out-of-range payload must not report more than 100%.
            Self::Encoding(pct) => (*pct).min(100),
            Self::Done => 100,
        }
    }
}
73
74/// A single segment job: specification, file paths, and current status.
75#[derive(Debug, Clone)]
76pub struct TranscodeSegment {
77    /// Specification describing the time range and profile.
78    pub spec: SegmentSpec,
79    /// Absolute path of the input file.
80    pub input_path: String,
81    /// Absolute path of the output file.
82    pub output_path: String,
83    /// Current processing status.
84    pub status: SegmentStatus,
85}
86
87impl TranscodeSegment {
88    /// Creates a new segment in `Pending` state.
89    #[must_use]
90    pub fn new(
91        spec: SegmentSpec,
92        input_path: impl Into<String>,
93        output_path: impl Into<String>,
94    ) -> Self {
95        Self {
96            spec,
97            input_path: input_path.into(),
98            output_path: output_path.into(),
99            status: SegmentStatus::Pending,
100        }
101    }
102}
103
104/// Orchestrates multi-segment transcoding with configurable worker concurrency.
105#[derive(Debug, Clone)]
106pub struct SegmentTranscoder {
107    /// All queued segments.
108    pub segments: Vec<TranscodeSegment>,
109    /// Number of parallel worker threads to use.
110    pub workers: u32,
111}
112
113impl SegmentTranscoder {
114    /// Creates a new transcoder with a specified worker count.
115    ///
116    /// # Panics
117    ///
118    /// Does not panic; `workers` must be at least 1 (caller responsibility).
119    #[must_use]
120    pub fn new(workers: u32) -> Self {
121        Self {
122            segments: Vec::new(),
123            workers,
124        }
125    }
126
127    /// Queues a new segment for transcoding.
128    pub fn queue_segment(
129        &mut self,
130        spec: SegmentSpec,
131        input: impl Into<String>,
132        output: impl Into<String>,
133    ) {
134        self.segments
135            .push(TranscodeSegment::new(spec, input, output));
136    }
137
138    /// Returns the number of segments still waiting to be processed.
139    #[must_use]
140    pub fn pending_count(&self) -> usize {
141        self.segments
142            .iter()
143            .filter(|s| s.status == SegmentStatus::Pending)
144            .count()
145    }
146
147    /// Returns the number of successfully completed segments.
148    #[must_use]
149    pub fn complete_count(&self) -> usize {
150        self.segments
151            .iter()
152            .filter(|s| s.status == SegmentStatus::Done)
153            .count()
154    }
155
156    /// Returns references to all segments that have failed.
157    #[must_use]
158    pub fn failed_segments(&self) -> Vec<&TranscodeSegment> {
159        self.segments
160            .iter()
161            .filter(|s| matches!(s.status, SegmentStatus::Failed(_)))
162            .collect()
163    }
164
165    /// Returns the sum of all segment durations in milliseconds.
166    #[must_use]
167    pub fn total_duration_ms(&self) -> u64 {
168        self.segments.iter().map(|s| s.spec.duration_ms()).sum()
169    }
170
171    /// Returns the total number of queued segments.
172    #[must_use]
173    pub fn segment_count(&self) -> usize {
174        self.segments.len()
175    }
176
177    /// Returns the number of segments currently encoding.
178    #[must_use]
179    pub fn encoding_count(&self) -> usize {
180        self.segments
181            .iter()
182            .filter(|s| matches!(s.status, SegmentStatus::Encoding(_)))
183            .count()
184    }
185
186    /// Returns the overall progress across all segments as a percentage (0–100).
187    #[must_use]
188    pub fn overall_progress_pct(&self) -> u8 {
189        if self.segments.is_empty() {
190            return 0;
191        }
192        let total: u32 = self
193            .segments
194            .iter()
195            .map(|s| u32::from(s.status.progress_pct()))
196            .sum();
197        #[allow(clippy::cast_possible_truncation)]
198        let avg = (total / self.segments.len() as u32) as u8;
199        avg
200    }
201}
202
203impl Default for SegmentTranscoder {
204    fn default() -> Self {
205        Self::new(4)
206    }
207}
208
#[cfg(test)]
mod tests {
    use super::*;

    // ── SegmentSpec ──────────────────────────────────────────────────────────

    #[test]
    fn test_spec_duration_basic() {
        let duration = SegmentSpec::new(1000, 5000, "720p").duration_ms();
        assert_eq!(duration, 4000);
    }

    #[test]
    fn test_spec_duration_zero_when_equal() {
        let duration = SegmentSpec::new(3000, 3000, "720p").duration_ms();
        assert_eq!(duration, 0);
    }

    #[test]
    fn test_spec_duration_saturating_when_reversed() {
        // A segment whose end precedes its start reports a zero duration.
        let duration = SegmentSpec::new(5000, 3000, "720p").duration_ms();
        assert_eq!(duration, 0);
    }

    #[test]
    fn test_spec_profile_name() {
        let SegmentSpec { profile_name, .. } = SegmentSpec::new(0, 10_000, "4k-hevc");
        assert_eq!(profile_name, "4k-hevc");
    }

    // ── SegmentStatus ────────────────────────────────────────────────────────

    #[test]
    fn test_status_pending_not_complete() {
        let status = SegmentStatus::Pending;
        assert!(!status.is_complete());
    }

    #[test]
    fn test_status_encoding_not_complete() {
        let status = SegmentStatus::Encoding(50);
        assert!(!status.is_complete());
    }

    #[test]
    fn test_status_done_is_complete() {
        let status = SegmentStatus::Done;
        assert!(status.is_complete());
    }

    #[test]
    fn test_status_failed_is_complete() {
        let status = SegmentStatus::Failed("oom".to_string());
        assert!(status.is_complete());
    }

    #[test]
    fn test_status_progress_pending() {
        let status = SegmentStatus::Pending;
        assert_eq!(status.progress_pct(), 0);
    }

    #[test]
    fn test_status_progress_encoding() {
        let status = SegmentStatus::Encoding(73);
        assert_eq!(status.progress_pct(), 73);
    }

    #[test]
    fn test_status_progress_done() {
        let status = SegmentStatus::Done;
        assert_eq!(status.progress_pct(), 100);
    }

    #[test]
    fn test_status_progress_failed() {
        let status = SegmentStatus::Failed("err".to_string());
        assert_eq!(status.progress_pct(), 0);
    }

    // ── SegmentTranscoder ────────────────────────────────────────────────────

    #[test]
    fn test_transcoder_initial_counts() {
        let transcoder = SegmentTranscoder::new(2);
        assert_eq!(transcoder.segment_count(), 0);
        assert_eq!(transcoder.pending_count(), 0);
        assert_eq!(transcoder.complete_count(), 0);
        assert!(transcoder.failed_segments().is_empty());
        assert_eq!(transcoder.total_duration_ms(), 0);
    }

    #[test]
    fn test_queue_segment_increments_count() {
        let mut transcoder = SegmentTranscoder::new(2);
        transcoder.queue_segment(SegmentSpec::new(0, 30_000, "1080p"), "/in/a.mp4", "/out/a.mp4");
        assert_eq!(transcoder.segment_count(), 1);
        assert_eq!(transcoder.pending_count(), 1);
    }

    #[test]
    fn test_complete_count_after_marking_done() {
        let mut transcoder = SegmentTranscoder::new(1);
        transcoder.queue_segment(SegmentSpec::new(0, 10_000, "720p"), "/in/b.mp4", "/out/b.mp4");
        transcoder.segments[0].status = SegmentStatus::Done;
        assert_eq!(transcoder.complete_count(), 1);
        assert_eq!(transcoder.pending_count(), 0);
    }

    #[test]
    fn test_failed_segments_returns_correct_refs() {
        let mut transcoder = SegmentTranscoder::new(2);
        let jobs = [
            (0_u64, 5000_u64, "/out/c1.mp4"),
            (5000, 10_000, "/out/c2.mp4"),
        ];
        for &(start, end, output) in &jobs {
            transcoder.queue_segment(SegmentSpec::new(start, end, "360p"), "/in/c.mp4", output);
        }
        // Only the first job is marked as failed.
        transcoder.segments[0].status = SegmentStatus::Failed("codec error".to_string());
        let failed = transcoder.failed_segments();
        assert_eq!(failed.len(), 1);
        assert_eq!(failed[0].output_path, "/out/c1.mp4");
    }

    #[test]
    fn test_total_duration_ms_sums_all_segments() {
        let mut transcoder = SegmentTranscoder::new(4);
        let jobs = [
            (0_u64, 10_000_u64, "/o1"),
            (10_000, 25_000, "/o2"),
            (25_000, 30_000, "/o3"),
        ];
        for &(start, end, output) in &jobs {
            transcoder.queue_segment(SegmentSpec::new(start, end, "p"), "/i", output);
        }
        assert_eq!(transcoder.total_duration_ms(), 30_000);
    }

    #[test]
    fn test_overall_progress_empty() {
        let transcoder = SegmentTranscoder::new(2);
        assert_eq!(transcoder.overall_progress_pct(), 0);
    }

    #[test]
    fn test_workers_stored() {
        let SegmentTranscoder { workers, .. } = SegmentTranscoder::new(8);
        assert_eq!(workers, 8);
    }

    #[test]
    fn test_encoding_count() {
        let mut transcoder = SegmentTranscoder::new(2);
        let jobs = [(0_u64, 5000_u64, "/o1"), (5000, 10_000, "/o2")];
        for &(start, end, output) in &jobs {
            transcoder.queue_segment(SegmentSpec::new(start, end, "p"), "/i", output);
        }
        // The first job is mid-encode; the second is left pending.
        transcoder.segments[0].status = SegmentStatus::Encoding(42);
        assert_eq!(transcoder.encoding_count(), 1);
        assert_eq!(transcoder.pending_count(), 1);
    }
}