// oximedia_transcode/stage_graph.rs

1//! Pipeline stage graph for transcoding workflows.
2//!
3//! Provides a directed graph model of transcode pipeline stages,
4//! supporting stage classification, passthrough detection, and
5//! transcode-stage presence checks.
6
7#![allow(dead_code)]
8
9use serde::{Deserialize, Serialize};
10
11/// Classification of a pipeline stage by its transformation role.
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
13pub enum StageType {
14    /// Decode compressed media into raw frames.
15    Decode,
16    /// Encode raw frames into compressed media.
17    Encode,
18    /// Apply a filter or transformation to raw frames.
19    Filter,
20    /// Scale or resize video frames.
21    Scale,
22    /// Normalize or adjust audio.
23    AudioProcess,
24    /// Demux a container into elementary streams.
25    Demux,
26    /// Mux elementary streams into a container.
27    Mux,
28    /// Pass data through unchanged.
29    Passthrough,
30    /// Analyse content without modifying it.
31    Analyse,
32}
33
34impl StageType {
35    /// Returns `true` if this stage type modifies media data.
36    #[must_use]
37    pub fn is_transform(self) -> bool {
38        matches!(
39            self,
40            Self::Decode | Self::Encode | Self::Filter | Self::Scale | Self::AudioProcess
41        )
42    }
43
44    /// Returns `true` if this stage type is a transcode-class operation
45    /// (i.e. involves both decode and encode).
46    #[must_use]
47    pub fn is_transcode_class(self) -> bool {
48        matches!(self, Self::Decode | Self::Encode)
49    }
50
51    /// Returns a human-readable label for the stage type.
52    #[must_use]
53    pub fn label(self) -> &'static str {
54        match self {
55            Self::Decode => "decode",
56            Self::Encode => "encode",
57            Self::Filter => "filter",
58            Self::Scale => "scale",
59            Self::AudioProcess => "audio_process",
60            Self::Demux => "demux",
61            Self::Mux => "mux",
62            Self::Passthrough => "passthrough",
63            Self::Analyse => "analyse",
64        }
65    }
66}
67
68/// A single stage within a transcode pipeline graph.
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct GraphStage {
71    /// Unique identifier for this stage.
72    pub id: u32,
73    /// Human-readable name.
74    pub name: String,
75    /// Functional classification of this stage.
76    pub stage_type: StageType,
77    /// Whether this stage is currently enabled.
78    pub enabled: bool,
79    /// Estimated processing cost (arbitrary units, higher = slower).
80    pub cost_estimate: u32,
81}
82
83impl GraphStage {
84    /// Create a new enabled stage.
85    pub fn new(id: u32, name: impl Into<String>, stage_type: StageType) -> Self {
86        Self {
87            id,
88            name: name.into(),
89            stage_type,
90            enabled: true,
91            cost_estimate: 1,
92        }
93    }
94
95    /// Returns `true` if this stage passes data through unchanged.
96    #[must_use]
97    pub fn is_passthrough(&self) -> bool {
98        self.stage_type == StageType::Passthrough || !self.enabled
99    }
100
101    /// Returns `true` if this stage transforms the media.
102    #[must_use]
103    pub fn is_transform(&self) -> bool {
104        self.enabled && self.stage_type.is_transform()
105    }
106
107    /// Set the cost estimate for this stage.
108    #[must_use]
109    pub fn with_cost(mut self, cost: u32) -> Self {
110        self.cost_estimate = cost;
111        self
112    }
113
114    /// Disable this stage (makes it behave as passthrough).
115    pub fn disable(&mut self) {
116        self.enabled = false;
117    }
118
119    /// Enable this stage.
120    pub fn enable(&mut self) {
121        self.enabled = true;
122    }
123}
124
125/// A directed acyclic graph of pipeline stages for a transcode job.
126#[derive(Debug, Clone, Default, Serialize, Deserialize)]
127pub struct TranscodeGraph {
128    stages: Vec<GraphStage>,
129    /// Edges stored as (`from_id`, `to_id`).
130    edges: Vec<(u32, u32)>,
131}
132
133impl TranscodeGraph {
134    /// Create a new empty pipeline graph.
135    #[must_use]
136    pub fn new() -> Self {
137        Self::default()
138    }
139
140    /// Add a stage to the graph and return its assigned id.
141    pub fn add_stage(&mut self, stage: GraphStage) -> u32 {
142        let id = stage.id;
143        self.stages.push(stage);
144        id
145    }
146
147    /// Connect two stages by their ids.  Returns `false` if either id is unknown.
148    pub fn connect(&mut self, from_id: u32, to_id: u32) -> bool {
149        let has_from = self.stages.iter().any(|s| s.id == from_id);
150        let has_to = self.stages.iter().any(|s| s.id == to_id);
151        if has_from && has_to {
152            self.edges.push((from_id, to_id));
153            true
154        } else {
155            false
156        }
157    }
158
159    /// Return the total number of stages in the graph.
160    #[must_use]
161    pub fn stage_count(&self) -> usize {
162        self.stages.len()
163    }
164
165    /// Return the number of active (enabled) stages.
166    #[must_use]
167    pub fn active_stage_count(&self) -> usize {
168        self.stages.iter().filter(|s| s.enabled).count()
169    }
170
171    /// Return `true` if the graph contains at least one transcode-class stage.
172    #[must_use]
173    pub fn has_transcode_stage(&self) -> bool {
174        self.stages
175            .iter()
176            .any(|s| s.enabled && s.stage_type.is_transcode_class())
177    }
178
179    /// Return `true` if every enabled stage is a passthrough or analyse stage.
180    #[must_use]
181    pub fn is_fully_passthrough(&self) -> bool {
182        self.stages
183            .iter()
184            .all(|s| s.is_passthrough() || s.stage_type == StageType::Analyse)
185    }
186
187    /// Estimate total processing cost by summing all enabled stage costs.
188    #[must_use]
189    pub fn total_cost(&self) -> u32 {
190        self.stages
191            .iter()
192            .filter(|s| s.enabled)
193            .map(|s| s.cost_estimate)
194            .sum()
195    }
196
197    /// Return a list of stage labels in insertion order.
198    #[must_use]
199    pub fn stage_labels(&self) -> Vec<&str> {
200        self.stages.iter().map(|s| s.stage_type.label()).collect()
201    }
202
203    /// Find a stage by id.
204    #[must_use]
205    pub fn find_stage(&self, id: u32) -> Option<&GraphStage> {
206        self.stages.iter().find(|s| s.id == id)
207    }
208
209    /// Remove a stage by id.  Also removes associated edges.
210    pub fn remove_stage(&mut self, id: u32) -> bool {
211        if let Some(pos) = self.stages.iter().position(|s| s.id == id) {
212            self.stages.remove(pos);
213            self.edges.retain(|(f, t)| *f != id && *t != id);
214            true
215        } else {
216            false
217        }
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an enabled stage named `stage_<id>` of the given type.
    fn make_stage(id: u32, t: StageType) -> GraphStage {
        GraphStage::new(id, format!("stage_{id}"), t)
    }

    #[test]
    fn test_stage_type_is_transform_decode() {
        assert!(StageType::Decode.is_transform());
    }

    #[test]
    fn test_stage_type_is_transform_passthrough() {
        assert!(!StageType::Passthrough.is_transform());
    }

    #[test]
    fn test_stage_type_transcode_class() {
        assert!(StageType::Encode.is_transcode_class());
        assert!(!StageType::Filter.is_transcode_class());
    }

    #[test]
    fn test_stage_type_labels_unique() {
        let all = [
            StageType::Decode,
            StageType::Encode,
            StageType::Filter,
            StageType::Scale,
            StageType::AudioProcess,
            StageType::Demux,
            StageType::Mux,
            StageType::Passthrough,
            StageType::Analyse,
        ];
        let labels: Vec<_> = all.iter().map(|t| t.label()).collect();
        let unique: std::collections::HashSet<_> = labels.iter().collect();
        assert_eq!(labels.len(), unique.len());
    }

    #[test]
    fn test_graph_stage_is_passthrough_when_disabled() {
        let mut s = make_stage(1, StageType::Encode);
        assert!(!s.is_passthrough());
        s.disable();
        assert!(s.is_passthrough());
        s.enable();
        assert!(!s.is_passthrough());
    }

    #[test]
    fn test_graph_stage_passthrough_type() {
        let s = make_stage(2, StageType::Passthrough);
        assert!(s.is_passthrough());
    }

    #[test]
    fn test_graph_add_stage_count() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Decode));
        g.add_stage(make_stage(2, StageType::Encode));
        assert_eq!(g.stage_count(), 2);
    }

    #[test]
    fn test_has_transcode_stage_true() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Decode));
        g.add_stage(make_stage(2, StageType::Encode));
        assert!(g.has_transcode_stage());
    }

    #[test]
    fn test_has_transcode_stage_false() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Passthrough));
        g.add_stage(make_stage(2, StageType::Filter));
        assert!(!g.has_transcode_stage());
    }

    #[test]
    fn test_is_fully_passthrough() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Passthrough));
        g.add_stage(make_stage(2, StageType::Analyse));
        assert!(g.is_fully_passthrough());
    }

    #[test]
    fn test_connect_valid() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Decode));
        g.add_stage(make_stage(2, StageType::Encode));
        assert!(g.connect(1, 2));
        assert_eq!(g.edges.len(), 1);
    }

    #[test]
    fn test_connect_invalid_id() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Decode));
        assert!(!g.connect(1, 99));
    }

    #[test]
    fn test_total_cost() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Decode).with_cost(10));
        g.add_stage(make_stage(2, StageType::Encode).with_cost(20));
        assert_eq!(g.total_cost(), 30);
    }

    #[test]
    fn test_disabled_stage_excluded_from_cost_and_active_count() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Decode).with_cost(10));
        let mut off = make_stage(2, StageType::Encode).with_cost(20);
        off.disable();
        g.add_stage(off);
        assert_eq!(g.stage_count(), 2);
        assert_eq!(g.active_stage_count(), 1);
        assert_eq!(g.total_cost(), 10);
    }

    #[test]
    fn test_stage_labels_insertion_order() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Decode));
        g.add_stage(make_stage(2, StageType::Encode));
        assert_eq!(g.stage_labels(), vec!["decode", "encode"]);
    }

    #[test]
    fn test_remove_stage_removes_edges() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Decode));
        g.add_stage(make_stage(2, StageType::Encode));
        g.connect(1, 2);
        g.remove_stage(1);
        assert_eq!(g.stage_count(), 1);
        assert!(g.edges.is_empty());
    }

    #[test]
    fn test_remove_stage_unknown_id() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(1, StageType::Decode));
        assert!(!g.remove_stage(99));
        assert_eq!(g.stage_count(), 1);
    }

    #[test]
    fn test_find_stage() {
        let mut g = TranscodeGraph::new();
        g.add_stage(make_stage(42, StageType::Filter));
        let s = g.find_stage(42).expect("should succeed in test");
        assert_eq!(s.id, 42);
        assert!(g.find_stage(0).is_none());
    }
}