Skip to main content

to_rgba_pipelined/
to_rgba_pipelined.rs

1// Pipelined .braw → raw RGBA decoder. Keeps `WINDOW` read jobs in flight,
2// chains each one's decode+process from inside `on_read_complete`, and
3// emits images on stdout in frame order. This is the shape of use the
4// low-level API is built for.
5//
6//   BRAW_RUNTIME_DIR=/path/to/BlackmagicRawAPI \
7//   cargo run --release --example to_rgba_pipelined -- clip.braw > out.rgba
8
9use std::{
10    collections::HashMap,
11    env,
12    io::{self, Write},
13    path::PathBuf,
14    sync::mpsc,
15    time::Instant,
16};
17
18use warb::{
19    Callback, Codec, Frame, HResult, Pipeline, ProcessedImage, ResolutionScale, ResourceFormat,
20};
21
22const WINDOW: u64 = 24;
23
24fn parse_scale(s: &str) -> Option<ResolutionScale> {
25    match s.to_ascii_lowercase().as_str() {
26        "full" | "1" => Some(ResolutionScale::Full),
27        "half" | "2" => Some(ResolutionScale::Half),
28        "quarter" | "qrtr" | "4" => Some(ResolutionScale::Quarter),
29        "eighth" | "eith" | "8" => Some(ResolutionScale::Eighth),
30        _ => None,
31    }
32}
33
34enum Event {
35    Image { slot: u64, image: ProcessedImage },
36    Failure { slot: u64, what: String },
37}
38
39struct PipelinedHandler {
40    tx: mpsc::Sender<Event>,
41    format: ResourceFormat,
42    scale: ResolutionScale,
43}
44
45impl PipelinedHandler {
46    fn fail(&self, slot: u64, what: impl Into<String>) {
47        let _ = self.tx.send(Event::Failure {
48            slot,
49            what: what.into(),
50        });
51    }
52}
53
54impl Callback for PipelinedHandler {
55    // Called from an SDK worker thread once the bitstream is on-host. We
56    // chain straight into decode+process so the worker pool stays busy.
57    fn on_read_complete(&self, slot: u64, result: HResult, frame: Option<Frame>) {
58        if result.is_err() {
59            return self.fail(slot, format!("read failed: {result}"));
60        }
61        let Some(mut frame) = frame else {
62            return self.fail(slot, "read returned no frame");
63        };
64        if let Err(e) = frame.set_resource_format(self.format) {
65            return self.fail(slot, format!("SetResourceFormat: {e}"));
66        }
67        if let Err(e) = frame.set_resolution_scale(self.scale) {
68            return self.fail(slot, format!("SetResolutionScale: {e}"));
69        }
70        let mut job = match frame.create_decode_and_process_job() {
71            Ok(j) => j,
72            Err(e) => return self.fail(slot, format!("CreateJobDecodeAndProcessFrame: {e}")),
73        };
74        if let Err(e) = job.set_user_data(slot) {
75            return self.fail(slot, format!("SetUserData: {e}"));
76        }
77        if let Err(e) = job.submit() {
78            return self.fail(slot, format!("decode+process Submit: {e}"));
79        }
80    }
81
82    fn on_process_complete(&self, slot: u64, result: HResult, image: Option<ProcessedImage>) {
83        match (result.is_ok(), image) {
84            (true, Some(image)) => {
85                let _ = self.tx.send(Event::Image { slot, image });
86            }
87            (_, _) => self.fail(slot, format!("process failed: {result}")),
88        }
89    }
90}
91
92fn main() -> Result<(), Box<dyn std::error::Error>> {
93    let input = env::args()
94        .nth(1)
95        .map(PathBuf::from)
96        .unwrap_or_else(|| PathBuf::from("/tmp/braw-sample/A001_09091040_C068.braw"));
97    let limit: Option<u64> = env::args().nth(2).and_then(|s| s.parse().ok());
98    let scale = env::args()
99        .nth(3)
100        .map(|s| parse_scale(&s).expect("scale must be full/half/quarter/eighth"))
101        .unwrap_or(ResolutionScale::Full);
102
103    let (tx, rx) = mpsc::channel();
104    let format = ResourceFormat::RgbaU8;
105    let mut codec = Codec::new(PipelinedHandler { tx, format, scale })?;
106    codec.prepare_pipeline(Pipeline::Cpu)?;
107    codec.flush_jobs();
108
109    let mut clip = codec.open_clip(&input)?;
110    let (fw, fh) = (clip.width(), clip.height());
111    let fps = clip.frame_rate();
112    let total = limit.unwrap_or_else(|| clip.frame_count());
113
114    eprintln!("clip {fw}x{fh} @ {fps} fps, decoding {total} frames at {scale:?}, window={WINDOW}");
115
116    // Frames are multi-MB, already larger than any reasonable BufWriter
117    // buffer, so buffering would be a no-op — write to the raw stdout.
118    let mut out = io::stdout().lock();
119
120    // Prime the window.
121    let mut next_read: u64 = 0;
122    for _ in 0..WINDOW.min(total) {
123        let mut job = clip.create_read_job(next_read)?;
124        job.set_user_data(next_read)?;
125        job.submit()?;
126        next_read += 1;
127    }
128
129    // Drain the channel, emit images in order, top up the window as we go.
130    let mut pending: HashMap<u64, ProcessedImage> = HashMap::new();
131    let mut emit: u64 = 0;
132    let start = Instant::now();
133
134    while emit < total {
135        match rx.recv()? {
136            Event::Image { slot, image } => {
137                pending.insert(slot, image);
138            }
139            Event::Failure { slot, what } => {
140                return Err(format!("slot {slot}: {what}").into());
141            }
142        }
143        while let Some(image) = pending.remove(&emit) {
144            if emit == 0 {
145                let (iw, ih) = (image.width(), image.height());
146                eprintln!(
147                    "first image {iw}x{ih} {:?}, full buffer {} B, pixel bytes {} B",
148                    image.format(),
149                    image.size_bytes(),
150                    image.data().len(),
151                );
152                eprintln!(
153                    "  ffmpeg -f rawvideo -pixel_format rgba -video_size {iw}x{ih} \
154                     -framerate {fps} -i - -c:v libx264 -preset ultrafast -crf 23 \
155                     -pix_fmt yuv420p out.mp4"
156                );
157            }
158            // ProcessedImage derefs to the truncated pixel slice; SDK tail
159            // padding is dropped so the rawvideo stream stays aligned.
160            out.write_all(&image)?;
161            drop(image);
162            emit += 1;
163
164            if next_read < total {
165                let mut job = clip.create_read_job(next_read)?;
166                job.set_user_data(next_read)?;
167                job.submit()?;
168                next_read += 1;
169            }
170            if emit % 24 == 0 || emit == total {
171                let el = start.elapsed().as_secs_f64();
172                eprint!(
173                    "\r  frame {emit}/{total} ({:.2} fps, {:.0}% of realtime)    ",
174                    emit as f64 / el,
175                    (emit as f64 / el) * 100.0 / fps as f64
176                );
177            }
178        }
179    }
180    codec.flush_jobs();
181    out.flush()?;
182    eprintln!();
183    Ok(())
184}