Skip to main content

rivet/
progress.rs

1//! Job progress reporting.
2//!
3//! Every transcode job reports progress through a [`ProgressSink`] — a small
4//! trait that receives a **uniform** [`RungProgress`] struct (status,
5//! percentage, frame/segment/byte counters) for each rung as the job runs,
6//! plus coarse [`JobEvent`]s for job-level lifecycle.
7//!
8//! The sink methods are synchronous but are called *as the job progresses*
9//! (i.e. asynchronously with respect to completion). To bridge into async
10//! code — e.g. forward progress to a websocket/SQS reporter — wrap a
11//! `tokio::sync::mpsc::Sender` with [`channel_sink`], or implement
12//! [`ProgressSink`] and `try_send` into your own channel.
13
14use std::sync::Arc;
15
16/// Lifecycle status of a single rung (one rendition of the ABR ladder).
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum RungStatus {
19    /// Queued; no frames processed yet.
20    Pending,
21    /// Actively decoding + scaling + encoding frames.
22    Running,
23    /// Finalizing the container / writing playlists.
24    Finalizing,
25    /// Done — `percent == 100`.
26    Completed,
27    /// Errored out — see [`RungProgress::message`].
28    Failed,
29}
30
31/// A uniform progress update for one rung. Emitted repeatedly over the life
32/// of the job. Consumers can render a per-rung progress bar straight from
33/// these fields without knowing anything about the output mode.
34#[derive(Debug, Clone)]
35pub struct RungProgress {
36    /// Index into the job's `rungs` list.
37    pub rung_index: usize,
38    /// Human label, e.g. `"720p"`.
39    pub label: String,
40    /// Target width in pixels.
41    pub width: u32,
42    /// Target height in pixels.
43    pub height: u32,
44    /// Current status.
45    pub status: RungStatus,
46    /// Completion fraction in `0.0..=100.0`. Derived from
47    /// `frames_done / frames_total` when the total is known, else a coarse
48    /// stage estimate.
49    pub percent: f32,
50    /// Frames encoded so far for this rung.
51    pub frames_done: u64,
52    /// Total frames expected, if known up front (from the container header).
53    pub frames_total: Option<u64>,
54    /// Segments written (HLS/CMAF mode only; `0` for single-file).
55    pub segments_written: u32,
56    /// Output bytes produced so far for this rung.
57    pub bytes_out: u64,
58    /// Optional human message — error text on `Failed`, notes otherwise.
59    pub message: Option<String>,
60}
61
62impl RungProgress {
63    /// Construct a `Pending` update for a rung at job start.
64    pub fn pending(rung_index: usize, label: impl Into<String>, width: u32, height: u32) -> Self {
65        Self {
66            rung_index,
67            label: label.into(),
68            width,
69            height,
70            status: RungStatus::Pending,
71            percent: 0.0,
72            frames_done: 0,
73            frames_total: None,
74            segments_written: 0,
75            bytes_out: 0,
76            message: None,
77        }
78    }
79}
80
81/// Job-level lifecycle events, independent of any single rung.
82#[derive(Debug, Clone)]
83pub enum JobEvent {
84    /// Job accepted; `rungs` renditions will be produced.
85    Started { rungs: usize },
86    /// Source probed.
87    Probed {
88        codec: String,
89        width: u32,
90        height: u32,
91        frame_rate: f64,
92        audio_codec: Option<String>,
93    },
94    /// Job finished.
95    Finished {
96        rungs_completed: usize,
97        rungs_failed: usize,
98    },
99}
100
101/// Receiver for job progress. Implement to consume updates; or use
102/// [`channel_sink`] / [`fn_sink`] for the common cases.
103pub trait ProgressSink: Send + Sync {
104    /// Called with a fresh [`RungProgress`] each time a rung advances.
105    fn on_rung(&self, update: RungProgress);
106    /// Called for job-level lifecycle events. Default: ignore.
107    fn on_event(&self, _event: JobEvent) {}
108}
109
110/// A sink that drops every update. Useful as a default.
111pub struct NullSink;
112
113impl ProgressSink for NullSink {
114    fn on_rung(&self, _update: RungProgress) {}
115}
116
117/// Wraps a closure as a [`ProgressSink`].
118pub struct FnSink<F>(F);
119
120impl<F: Fn(RungProgress) + Send + Sync> ProgressSink for FnSink<F> {
121    fn on_rung(&self, update: RungProgress) {
122        (self.0)(update)
123    }
124}
125
126/// Build a [`ProgressSink`] from a closure: `fn_sink(|p| println!("{}", p.percent))`.
127pub fn fn_sink<F: Fn(RungProgress) + Send + Sync>(f: F) -> FnSink<F> {
128    FnSink(f)
129}
130
131/// A sink that forwards every [`RungProgress`] into a Tokio mpsc channel,
132/// turning the callback into an async stream the caller can `.recv().await`.
133/// Sends are non-blocking (`try_send`); if the channel is full or closed the
134/// update is dropped (progress is advisory, never load-bearing).
135pub struct ChannelSink {
136    tx: tokio::sync::mpsc::Sender<RungProgress>,
137}
138
139impl ChannelSink {
140    pub fn new(tx: tokio::sync::mpsc::Sender<RungProgress>) -> Self {
141        Self { tx }
142    }
143}
144
145impl ProgressSink for ChannelSink {
146    fn on_rung(&self, update: RungProgress) {
147        let _ = self.tx.try_send(update);
148    }
149}
150
151/// Convenience: wrap an mpsc sender as an `Arc<dyn ProgressSink>`.
152pub fn channel_sink(tx: tokio::sync::mpsc::Sender<RungProgress>) -> Arc<dyn ProgressSink> {
153    Arc::new(ChannelSink::new(tx))
154}