to_rgba_pipelined/
to_rgba_pipelined.rs1use std::{
10 collections::HashMap,
11 env,
12 io::{self, Write},
13 path::PathBuf,
14 sync::mpsc,
15 time::Instant,
16};
17
18use warb::{
19 Callback, Codec, Frame, HResult, Pipeline, ProcessedImage, ResolutionScale, ResourceFormat,
20};
21
/// Maximum number of frames kept in flight at once: `main` submits this many
/// read jobs up front and queues one more each time a frame is written out.
const WINDOW: u64 = 24;
23
24fn parse_scale(s: &str) -> Option<ResolutionScale> {
25 match s.to_ascii_lowercase().as_str() {
26 "full" | "1" => Some(ResolutionScale::Full),
27 "half" | "2" => Some(ResolutionScale::Half),
28 "quarter" | "qrtr" | "4" => Some(ResolutionScale::Quarter),
29 "eighth" | "eith" | "8" => Some(ResolutionScale::Eighth),
30 _ => None,
31 }
32}
33
34enum Event {
35 Image { slot: u64, image: ProcessedImage },
36 Failure { slot: u64, what: String },
37}
38
39struct PipelinedHandler {
40 tx: mpsc::Sender<Event>,
41 format: ResourceFormat,
42 scale: ResolutionScale,
43}
44
45impl PipelinedHandler {
46 fn fail(&self, slot: u64, what: impl Into<String>) {
47 let _ = self.tx.send(Event::Failure {
48 slot,
49 what: what.into(),
50 });
51 }
52}
53
54impl Callback for PipelinedHandler {
55 fn on_read_complete(&self, slot: u64, result: HResult, frame: Option<Frame>) {
58 if result.is_err() {
59 return self.fail(slot, format!("read failed: {result}"));
60 }
61 let Some(mut frame) = frame else {
62 return self.fail(slot, "read returned no frame");
63 };
64 if let Err(e) = frame.set_resource_format(self.format) {
65 return self.fail(slot, format!("SetResourceFormat: {e}"));
66 }
67 if let Err(e) = frame.set_resolution_scale(self.scale) {
68 return self.fail(slot, format!("SetResolutionScale: {e}"));
69 }
70 let mut job = match frame.create_decode_and_process_job() {
71 Ok(j) => j,
72 Err(e) => return self.fail(slot, format!("CreateJobDecodeAndProcessFrame: {e}")),
73 };
74 if let Err(e) = job.set_user_data(slot) {
75 return self.fail(slot, format!("SetUserData: {e}"));
76 }
77 if let Err(e) = job.submit() {
78 return self.fail(slot, format!("decode+process Submit: {e}"));
79 }
80 }
81
82 fn on_process_complete(&self, slot: u64, result: HResult, image: Option<ProcessedImage>) {
83 match (result.is_ok(), image) {
84 (true, Some(image)) => {
85 let _ = self.tx.send(Event::Image { slot, image });
86 }
87 (_, _) => self.fail(slot, format!("process failed: {result}")),
88 }
89 }
90}
91
92fn main() -> Result<(), Box<dyn std::error::Error>> {
93 let input = env::args()
94 .nth(1)
95 .map(PathBuf::from)
96 .unwrap_or_else(|| PathBuf::from("/tmp/braw-sample/A001_09091040_C068.braw"));
97 let limit: Option<u64> = env::args().nth(2).and_then(|s| s.parse().ok());
98 let scale = env::args()
99 .nth(3)
100 .map(|s| parse_scale(&s).expect("scale must be full/half/quarter/eighth"))
101 .unwrap_or(ResolutionScale::Full);
102
103 let (tx, rx) = mpsc::channel();
104 let format = ResourceFormat::RgbaU8;
105 let mut codec = Codec::new(PipelinedHandler { tx, format, scale })?;
106 codec.prepare_pipeline(Pipeline::Cpu)?;
107 codec.flush_jobs();
108
109 let mut clip = codec.open_clip(&input)?;
110 let (fw, fh) = (clip.width(), clip.height());
111 let fps = clip.frame_rate();
112 let total = limit.unwrap_or_else(|| clip.frame_count());
113
114 eprintln!("clip {fw}x{fh} @ {fps} fps, decoding {total} frames at {scale:?}, window={WINDOW}");
115
116 let mut out = io::stdout().lock();
119
120 let mut next_read: u64 = 0;
122 for _ in 0..WINDOW.min(total) {
123 let mut job = clip.create_read_job(next_read)?;
124 job.set_user_data(next_read)?;
125 job.submit()?;
126 next_read += 1;
127 }
128
129 let mut pending: HashMap<u64, ProcessedImage> = HashMap::new();
131 let mut emit: u64 = 0;
132 let start = Instant::now();
133
134 while emit < total {
135 match rx.recv()? {
136 Event::Image { slot, image } => {
137 pending.insert(slot, image);
138 }
139 Event::Failure { slot, what } => {
140 return Err(format!("slot {slot}: {what}").into());
141 }
142 }
143 while let Some(image) = pending.remove(&emit) {
144 if emit == 0 {
145 let (iw, ih) = (image.width(), image.height());
146 eprintln!(
147 "first image {iw}x{ih} {:?}, full buffer {} B, pixel bytes {} B",
148 image.format(),
149 image.size_bytes(),
150 image.data().len(),
151 );
152 eprintln!(
153 " ffmpeg -f rawvideo -pixel_format rgba -video_size {iw}x{ih} \
154 -framerate {fps} -i - -c:v libx264 -preset ultrafast -crf 23 \
155 -pix_fmt yuv420p out.mp4"
156 );
157 }
158 out.write_all(&image)?;
161 drop(image);
162 emit += 1;
163
164 if next_read < total {
165 let mut job = clip.create_read_job(next_read)?;
166 job.set_user_data(next_read)?;
167 job.submit()?;
168 next_read += 1;
169 }
170 if emit % 24 == 0 || emit == total {
171 let el = start.elapsed().as_secs_f64();
172 eprint!(
173 "\r frame {emit}/{total} ({:.2} fps, {:.0}% of realtime) ",
174 emit as f64 / el,
175 (emit as f64 / el) * 100.0 / fps as f64
176 );
177 }
178 }
179 }
180 codec.flush_jobs();
181 out.flush()?;
182 eprintln!();
183 Ok(())
184}