Skip to main content

oximedia_proxy/
proxy_pipeline.rs

1//! Proxy production pipeline with ordered steps and phase tracking.
2//!
3//! A [`ProxyPipeline`] models the sequence of operations required to move
4//! a media asset from ingest through proxy creation, review, and online
5//! finishing.  Each operation is a [`PipelineStep`] that belongs to one of
6//! the five [`PipelinePhase`]s.
7
8#![allow(dead_code)]
9
/// Coarse-grained stage of the proxy production workflow.
///
/// Variants are declared in workflow order, so the derived `Ord` compares
/// phases chronologically (`Ingest` is the smallest, `OnlineFinish` the
/// largest).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum PipelinePhase {
    /// Acquisition of the raw media and verification of its integrity.
    Ingest,
    /// Transcoding to proxies, including quality checks.
    ProxyCreation,
    /// Offline editorial carried out on the low-resolution proxies.
    OfflineEdit,
    /// Screening by the client or other stakeholders before lock.
    Review,
    /// Online conform, colour grade, and final delivery.
    OnlineFinish,
}

impl PipelinePhase {
    /// Every phase, listed in workflow order; backs [`PipelinePhase::all`].
    const ORDERED: [PipelinePhase; 5] = [
        PipelinePhase::Ingest,
        PipelinePhase::ProxyCreation,
        PipelinePhase::OfflineEdit,
        PipelinePhase::Review,
        PipelinePhase::OnlineFinish,
    ];

    /// Short display label for this phase, e.g. `"Proxy Creation"`.
    pub fn name(self) -> &'static str {
        match self {
            PipelinePhase::Ingest => "Ingest",
            PipelinePhase::ProxyCreation => "Proxy Creation",
            PipelinePhase::OfflineEdit => "Offline Edit",
            PipelinePhase::Review => "Review",
            PipelinePhase::OnlineFinish => "Online Finish",
        }
    }

    /// Phase that follows this one in the workflow.
    ///
    /// Returns `None` for [`PipelinePhase::OnlineFinish`], which has no
    /// successor.
    pub fn next(self) -> Option<Self> {
        // Exhaustive on purpose: adding a variant must force a decision here.
        let successor = match self {
            PipelinePhase::OnlineFinish => return None,
            PipelinePhase::Ingest => PipelinePhase::ProxyCreation,
            PipelinePhase::ProxyCreation => PipelinePhase::OfflineEdit,
            PipelinePhase::OfflineEdit => PipelinePhase::Review,
            PipelinePhase::Review => PipelinePhase::OnlineFinish,
        };
        Some(successor)
    }

    /// All five phases as a slice, in workflow order.
    pub fn all() -> &'static [PipelinePhase] {
        &Self::ORDERED
    }
}

impl std::fmt::Display for PipelinePhase {
    /// Writes the same label as [`PipelinePhase::name`]; width and padding
    /// flags on the formatter are not applied.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = self.name();
        formatter.write_str(label)
    }
}
65
/// Lifecycle state of a single [`PipelineStep`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StepStatus {
    /// Work on the step has not begun.
    Pending,
    /// The step is running right now.
    InProgress,
    /// The step finished successfully.
    Complete,
    /// The step failed; the pipeline may be blocked as a result.
    Failed,
    /// The step was deliberately skipped (e.g. an optional step).
    Skipped,
}

impl StepStatus {
    /// Report whether the step has reached an end state and no longer needs
    /// to run.
    ///
    /// `Complete`, `Failed`, and `Skipped` are terminal; `Pending` and
    /// `InProgress` are not.
    pub fn is_terminal(self) -> bool {
        // Exhaustive so that a newly added status must be classified here.
        match self {
            StepStatus::Pending | StepStatus::InProgress => false,
            StepStatus::Complete | StepStatus::Failed | StepStatus::Skipped => true,
        }
    }
}
87
88/// A single named operation within a [`PipelinePhase`].
89#[derive(Debug, Clone)]
90pub struct PipelineStep {
91    /// Human-readable name of this step.
92    pub name: String,
93    /// The phase this step belongs to.
94    pub phase: PipelinePhase,
95    /// Whether this step must succeed for the pipeline to advance.
96    pub required: bool,
97    /// Current completion status.
98    pub status: StepStatus,
99    /// Optional error message set when `status` is [`StepStatus::Failed`].
100    pub error: Option<String>,
101}
102
103impl PipelineStep {
104    /// Create a new required step in `Pending` state.
105    pub fn new(name: impl Into<String>, phase: PipelinePhase) -> Self {
106        Self {
107            name: name.into(),
108            phase,
109            required: true,
110            status: StepStatus::Pending,
111            error: None,
112        }
113    }
114
115    /// Create an optional step (failure will not block the pipeline).
116    pub fn optional(name: impl Into<String>, phase: PipelinePhase) -> Self {
117        Self {
118            required: false,
119            ..Self::new(name, phase)
120        }
121    }
122
123    /// Mark this step as complete.
124    pub fn complete(&mut self) {
125        self.status = StepStatus::Complete;
126        self.error = None;
127    }
128
129    /// Mark this step as failed with a reason.
130    pub fn fail(&mut self, reason: impl Into<String>) {
131        self.status = StepStatus::Failed;
132        self.error = Some(reason.into());
133    }
134
135    /// Mark this step as skipped.
136    pub fn skip(&mut self) {
137        self.status = StepStatus::Skipped;
138    }
139
140    /// Mark this step as in progress.
141    pub fn start(&mut self) {
142        self.status = StepStatus::InProgress;
143    }
144
145    /// Return `true` if this step blocks pipeline advancement when failed.
146    pub fn is_blocking(&self) -> bool {
147        self.required && self.status == StepStatus::Failed
148    }
149}
150
151/// Ordered pipeline of [`PipelineStep`]s tracking proxy workflow progress.
152#[derive(Debug, Default)]
153pub struct ProxyPipeline {
154    steps: Vec<PipelineStep>,
155}
156
157impl ProxyPipeline {
158    /// Create an empty pipeline.
159    pub fn new() -> Self {
160        Self::default()
161    }
162
163    /// Create a pipeline pre-populated with a standard set of steps.
164    pub fn standard() -> Self {
165        let mut p = Self::new();
166        p.add(PipelineStep::new("Media ingest", PipelinePhase::Ingest));
167        p.add(PipelineStep::new(
168            "Checksum verification",
169            PipelinePhase::Ingest,
170        ));
171        p.add(PipelineStep::new(
172            "Proxy transcode",
173            PipelinePhase::ProxyCreation,
174        ));
175        p.add(PipelineStep::optional(
176            "QC check",
177            PipelinePhase::ProxyCreation,
178        ));
179        p.add(PipelineStep::new("Load in NLE", PipelinePhase::OfflineEdit));
180        p.add(PipelineStep::new(
181            "Picture lock",
182            PipelinePhase::OfflineEdit,
183        ));
184        p.add(PipelineStep::new("Client screening", PipelinePhase::Review));
185        p.add(PipelineStep::optional(
186            "Revision notes",
187            PipelinePhase::Review,
188        ));
189        p.add(PipelineStep::new(
190            "Online conform",
191            PipelinePhase::OnlineFinish,
192        ));
193        p.add(PipelineStep::new(
194            "Final delivery",
195            PipelinePhase::OnlineFinish,
196        ));
197        p
198    }
199
200    /// Append a step to the pipeline.
201    pub fn add(&mut self, step: PipelineStep) {
202        self.steps.push(step);
203    }
204
205    /// Return a slice of all steps.
206    pub fn steps(&self) -> &[PipelineStep] {
207        &self.steps
208    }
209
210    /// Return all steps belonging to `phase`.
211    pub fn steps_for_phase(&self, phase: PipelinePhase) -> Vec<&PipelineStep> {
212        self.steps.iter().filter(|s| s.phase == phase).collect()
213    }
214
215    /// Return the current active phase (the phase of the first non-terminal step).
216    ///
217    /// Returns `None` if all steps are terminal.
218    pub fn current_phase(&self) -> Option<PipelinePhase> {
219        self.steps
220            .iter()
221            .find(|s| !s.status.is_terminal())
222            .map(|s| s.phase)
223    }
224
225    /// Return `true` if any required step has failed.
226    pub fn is_blocked(&self) -> bool {
227        self.steps.iter().any(|s| s.is_blocking())
228    }
229
230    /// Return `true` if all steps are terminal (complete, skipped, or failed).
231    pub fn is_finished(&self) -> bool {
232        self.steps.iter().all(|s| s.status.is_terminal())
233    }
234
235    /// Return `true` if all required steps have completed or been skipped.
236    pub fn is_successful(&self) -> bool {
237        self.steps
238            .iter()
239            .filter(|s| s.required)
240            .all(|s| matches!(s.status, StepStatus::Complete | StepStatus::Skipped))
241    }
242
243    /// Count steps by status.
244    pub fn count_by_status(&self, status: StepStatus) -> usize {
245        self.steps.iter().filter(|s| s.status == status).count()
246    }
247
248    /// Mutable access to a step by index.
249    pub fn step_mut(&mut self, idx: usize) -> Option<&mut PipelineStep> {
250        self.steps.get_mut(idx)
251    }
252
253    /// Total number of steps.
254    pub fn len(&self) -> usize {
255        self.steps.len()
256    }
257
258    /// Return `true` if the pipeline contains no steps.
259    pub fn is_empty(&self) -> bool {
260        self.steps.is_empty()
261    }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Borrow the step at `idx` mutably, panicking when the index is out of
    /// range.
    fn step_at(pipeline: &mut ProxyPipeline, idx: usize) -> &mut PipelineStep {
        pipeline.step_mut(idx).expect("should succeed in test")
    }

    // --- PipelinePhase ---

    #[test]
    fn pipeline_phase_name() {
        let expected = [
            (PipelinePhase::Ingest, "Ingest"),
            (PipelinePhase::OnlineFinish, "Online Finish"),
        ];
        for &(phase, label) in expected.iter() {
            assert_eq!(phase.name(), label);
        }
    }

    #[test]
    fn pipeline_phase_display() {
        let rendered = PipelinePhase::Review.to_string();
        assert_eq!(rendered, "Review");
    }

    #[test]
    fn pipeline_phase_next_chain() {
        let after_ingest = PipelinePhase::Ingest.next();
        assert_eq!(after_ingest, Some(PipelinePhase::ProxyCreation));

        // The final phase has no successor.
        let after_finish = PipelinePhase::OnlineFinish.next();
        assert_eq!(after_finish, None);
    }

    #[test]
    fn pipeline_phase_all_count() {
        let phases = PipelinePhase::all();
        assert_eq!(phases.len(), 5);
    }

    // --- StepStatus ---

    #[test]
    fn step_status_terminal() {
        let terminal = [
            StepStatus::Complete,
            StepStatus::Failed,
            StepStatus::Skipped,
        ];
        for status in terminal.iter() {
            assert!(status.is_terminal());
        }

        let open = [StepStatus::Pending, StepStatus::InProgress];
        for status in open.iter() {
            assert!(!status.is_terminal());
        }
    }

    // --- PipelineStep ---

    #[test]
    fn pipeline_step_complete() {
        let mut step = PipelineStep::new("Test", PipelinePhase::Ingest);
        step.complete();
        assert_eq!(step.status, StepStatus::Complete);
        assert_eq!(step.error, None);
    }

    #[test]
    fn pipeline_step_fail() {
        let mut step = PipelineStep::new("Test", PipelinePhase::Ingest);
        step.fail("disk full");
        assert_eq!(step.status, StepStatus::Failed);
        assert_eq!(step.error, Some(String::from("disk full")));
    }

    #[test]
    fn pipeline_step_is_blocking_required_failed() {
        let mut required = PipelineStep::new("Req", PipelinePhase::ProxyCreation);
        required.fail("err");
        assert!(required.is_blocking());
    }

    #[test]
    fn pipeline_step_optional_not_blocking_on_fail() {
        let mut optional = PipelineStep::optional("Opt", PipelinePhase::Review);
        optional.fail("warn");
        assert!(!optional.is_blocking());
    }

    // --- ProxyPipeline ---

    #[test]
    fn proxy_pipeline_standard_length() {
        assert_eq!(ProxyPipeline::standard().len(), 10);
    }

    #[test]
    fn proxy_pipeline_current_phase_initial() {
        let phase = ProxyPipeline::standard().current_phase();
        assert_eq!(phase, Some(PipelinePhase::Ingest));
    }

    #[test]
    fn proxy_pipeline_is_not_finished_initially() {
        assert!(!ProxyPipeline::standard().is_finished());
    }

    #[test]
    fn proxy_pipeline_is_blocked_after_required_failure() {
        let mut pipeline = ProxyPipeline::standard();
        // Step 0 of the standard pipeline is a required step.
        step_at(&mut pipeline, 0).fail("checksum mismatch");
        assert!(pipeline.is_blocked());
    }

    #[test]
    fn proxy_pipeline_count_by_status() {
        let mut pipeline = ProxyPipeline::standard();
        step_at(&mut pipeline, 0).complete();

        let total = pipeline.len();
        assert_eq!(pipeline.count_by_status(StepStatus::Complete), 1);
        assert_eq!(pipeline.count_by_status(StepStatus::Pending), total - 1);
    }

    #[test]
    fn proxy_pipeline_steps_for_phase() {
        let pipeline = ProxyPipeline::standard();
        // "Media ingest" + "Checksum verification"
        let ingest_count = pipeline.steps_for_phase(PipelinePhase::Ingest).len();
        assert_eq!(ingest_count, 2);
    }

    #[test]
    fn proxy_pipeline_is_successful_when_all_complete() {
        let mut pipeline = ProxyPipeline::new();
        pipeline.add(PipelineStep::new("A", PipelinePhase::Ingest));
        pipeline.add(PipelineStep::new("B", PipelinePhase::ProxyCreation));
        for idx in 0..2 {
            step_at(&mut pipeline, idx).complete();
        }
        assert!(pipeline.is_successful());
    }

    #[test]
    fn proxy_pipeline_empty() {
        let pipeline = ProxyPipeline::new();
        assert!(pipeline.is_empty());
        assert_eq!(pipeline.current_phase(), None);
        assert!(pipeline.is_finished());
    }
}