rivet/progress.rs
1//! Job progress reporting.
2//!
3//! Every transcode job reports progress through a [`ProgressSink`] — a small
4//! trait that receives a **uniform** [`RungProgress`] struct (status,
5//! percentage, frame/segment/byte counters) for each rung as the job runs,
6//! plus coarse [`JobEvent`]s for job-level lifecycle.
7//!
8//! The sink methods are synchronous but are called *as the job progresses*
9//! (i.e. asynchronously with respect to completion). To bridge into async
10//! code — e.g. forward progress to a websocket/SQS reporter — wrap a
11//! `tokio::sync::mpsc::Sender` with [`channel_sink`], or implement
12//! [`ProgressSink`] and `try_send` into your own channel.
13
14use std::sync::Arc;
15
16/// Lifecycle status of a single rung (one rendition of the ABR ladder).
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub enum RungStatus {
19 /// Queued; no frames processed yet.
20 Pending,
21 /// Actively decoding + scaling + encoding frames.
22 Running,
23 /// Finalizing the container / writing playlists.
24 Finalizing,
25 /// Done — `percent == 100`.
26 Completed,
27 /// Errored out — see [`RungProgress::message`].
28 Failed,
29}
30
31/// A uniform progress update for one rung. Emitted repeatedly over the life
32/// of the job. Consumers can render a per-rung progress bar straight from
33/// these fields without knowing anything about the output mode.
34#[derive(Debug, Clone)]
35pub struct RungProgress {
36 /// Index into the job's `rungs` list.
37 pub rung_index: usize,
38 /// Human label, e.g. `"720p"`.
39 pub label: String,
40 /// Target width in pixels.
41 pub width: u32,
42 /// Target height in pixels.
43 pub height: u32,
44 /// Current status.
45 pub status: RungStatus,
46 /// Completion fraction in `0.0..=100.0`. Derived from
47 /// `frames_done / frames_total` when the total is known, else a coarse
48 /// stage estimate.
49 pub percent: f32,
50 /// Frames encoded so far for this rung.
51 pub frames_done: u64,
52 /// Total frames expected, if known up front (from the container header).
53 pub frames_total: Option<u64>,
54 /// Segments written (HLS/CMAF mode only; `0` for single-file).
55 pub segments_written: u32,
56 /// Output bytes produced so far for this rung.
57 pub bytes_out: u64,
58 /// Optional human message — error text on `Failed`, notes otherwise.
59 pub message: Option<String>,
60}
61
62impl RungProgress {
63 /// Construct a `Pending` update for a rung at job start.
64 pub fn pending(rung_index: usize, label: impl Into<String>, width: u32, height: u32) -> Self {
65 Self {
66 rung_index,
67 label: label.into(),
68 width,
69 height,
70 status: RungStatus::Pending,
71 percent: 0.0,
72 frames_done: 0,
73 frames_total: None,
74 segments_written: 0,
75 bytes_out: 0,
76 message: None,
77 }
78 }
79}
80
81/// Job-level lifecycle events, independent of any single rung.
82#[derive(Debug, Clone)]
83pub enum JobEvent {
84 /// Job accepted; `rungs` renditions will be produced.
85 Started { rungs: usize },
86 /// Source probed.
87 Probed {
88 codec: String,
89 width: u32,
90 height: u32,
91 frame_rate: f64,
92 audio_codec: Option<String>,
93 },
94 /// Job finished.
95 Finished {
96 rungs_completed: usize,
97 rungs_failed: usize,
98 },
99}
100
101/// Receiver for job progress. Implement to consume updates; or use
102/// [`channel_sink`] / [`fn_sink`] for the common cases.
103pub trait ProgressSink: Send + Sync {
104 /// Called with a fresh [`RungProgress`] each time a rung advances.
105 fn on_rung(&self, update: RungProgress);
106 /// Called for job-level lifecycle events. Default: ignore.
107 fn on_event(&self, _event: JobEvent) {}
108}
109
110/// A sink that drops every update. Useful as a default.
111pub struct NullSink;
112
113impl ProgressSink for NullSink {
114 fn on_rung(&self, _update: RungProgress) {}
115}
116
117/// Wraps a closure as a [`ProgressSink`].
118pub struct FnSink<F>(F);
119
120impl<F: Fn(RungProgress) + Send + Sync> ProgressSink for FnSink<F> {
121 fn on_rung(&self, update: RungProgress) {
122 (self.0)(update)
123 }
124}
125
126/// Build a [`ProgressSink`] from a closure: `fn_sink(|p| println!("{}", p.percent))`.
127pub fn fn_sink<F: Fn(RungProgress) + Send + Sync>(f: F) -> FnSink<F> {
128 FnSink(f)
129}
130
131/// A sink that forwards every [`RungProgress`] into a Tokio mpsc channel,
132/// turning the callback into an async stream the caller can `.recv().await`.
133/// Sends are non-blocking (`try_send`); if the channel is full or closed the
134/// update is dropped (progress is advisory, never load-bearing).
135pub struct ChannelSink {
136 tx: tokio::sync::mpsc::Sender<RungProgress>,
137}
138
139impl ChannelSink {
140 pub fn new(tx: tokio::sync::mpsc::Sender<RungProgress>) -> Self {
141 Self { tx }
142 }
143}
144
145impl ProgressSink for ChannelSink {
146 fn on_rung(&self, update: RungProgress) {
147 let _ = self.tx.try_send(update);
148 }
149}
150
151/// Convenience: wrap an mpsc sender as an `Arc<dyn ProgressSink>`.
152pub fn channel_sink(tx: tokio::sync::mpsc::Sender<RungProgress>) -> Arc<dyn ProgressSink> {
153 Arc::new(ChannelSink::new(tx))
154}