adder_codec_rs/utils/
simulproc.rs1use crate::framer::driver::FramerMode::INSTANTANEOUS;
2use crate::framer::driver::{Framer, FramerBuilder};
3use crate::framer::scale_intensity;
4use crate::framer::scale_intensity::FrameValue;
5use crate::transcoder::source::framed::Framed;
6use crate::transcoder::source::video::Source;
7use adder_codec_core::DeltaT;
8use clap::Parser;
9use serde::Serialize;
10use std::cmp::max;
11use std::error::Error;
12use std::fs::File;
13use std::io;
14use std::io::{BufWriter, Write};
15
16use adder_codec_core::SourceCamera::FramedU8;
17use adder_codec_core::SourceType::U8;
18use adder_codec_core::{Event, TimeMode};
19use std::sync::mpsc::{channel, Receiver, Sender};
20use std::time::Instant;
21
22#[derive(Parser, Debug, Default, serde::Deserialize)]
24#[clap(author, version, about, long_about = None)]
25pub struct SimulProcArgs {
26 #[clap(short, long, default_value = "")]
28 pub args_filename: String,
29
30 #[clap(long, action)]
32 pub color_input: bool,
33
34 #[clap(short, long, default_value_t = 255)]
36 pub ref_time: u32,
37
38 #[clap(short, long, default_value_t = 15300)]
40 pub delta_t_max: u32,
41
42 #[clap(short, long, default_value_t = 0)]
44 pub frame_count_max: u32,
45
46 #[clap(long, default_value_t = 0)]
48 pub frame_idx_start: u32,
49
50 #[clap(short, long, action)]
52 pub show_display: bool,
53
54 #[clap(short, long, default_value = "./in.mp4")]
56 pub input_filename: String,
57
58 #[clap(long, default_value = "")]
60 pub output_events_filename: String,
61
62 #[clap(short, long, default_value = "./out")]
64 pub output_raw_video_filename: String,
65
66 #[clap(short('z'), long, default_value_t = 1.0)]
68 pub scale: f64,
69
70 #[clap(long, default_value_t = 3)]
72 pub crf: u8,
73
74 #[clap(long, default_value_t = 4)]
77 pub thread_count: u8,
78
79 #[clap(long, default_value = "")]
81 pub time_mode: String,
82
83 #[clap(long, default_value = "")]
84 pub integration_mode: String,
85}
86
87pub struct SimulProcessor<W: Write + std::marker::Send + std::marker::Sync + 'static> {
90 pub source: Framed<W>,
92 thread_pool: tokio::runtime::Runtime,
93 events_tx: Sender<Vec<Vec<Event>>>,
94}
95
96impl<W: Write + std::marker::Send + std::marker::Sync + 'static> SimulProcessor<W> {
97 pub fn new<T>(
114 source: Framed<W>,
115 ref_time: DeltaT,
116 output_path: &str,
117 frame_max: i32,
118 num_threads: usize,
119 codec_version: u8,
120 time_mode: TimeMode,
121 ) -> Result<Self, Box<dyn Error>>
122 where
123 T: Clone
124 + std::marker::Sync
125 + std::marker::Send
126 + 'static
127 + scale_intensity::FrameValue
128 + std::default::Default
129 + std::marker::Copy
130 + FrameValue<Output = T>
131 + Serialize
132 + num_traits::Zero
133 + Into<f64>,
134 {
135 let thread_pool_framer = rayon::ThreadPoolBuilder::new()
136 .num_threads(max(num_threads / 2, 1))
137 .build()?;
138 let thread_pool_transcoder = tokio::runtime::Builder::new_multi_thread()
139 .worker_threads(max(num_threads, 1))
140 .build()?;
141 let reconstructed_frame_rate = source.source_fps;
142 assert_eq!(
144 source.video.state.tps / ref_time,
145 reconstructed_frame_rate as u32
146 );
147
148 let plane = source.get_video_ref().state.plane;
149
150 let mut framer = thread_pool_framer.install(|| {
151 FramerBuilder::new(plane, source.video.state.chunk_rows)
152 .codec_version(codec_version, time_mode)
153 .time_parameters(
154 source.video.state.tps,
155 ref_time,
156 source.video.state.params.delta_t_max,
157 Some(reconstructed_frame_rate),
158 )
159 .mode(INSTANTANEOUS)
160 .source(U8, FramedU8)
161 .finish::<T>()
162 });
163
164 let mut output_stream = BufWriter::new(File::create(output_path)?);
165
166 let (events_tx, events_rx): (Sender<Vec<Vec<Event>>>, Receiver<Vec<Vec<Event>>>) =
167 channel();
168 let mut now = Instant::now();
169
170 rayon::spawn(move || {
173 let mut frame_count = 1;
174 loop {
175 if let Ok(events) = events_rx.recv() {
176 if framer.ingest_events_events(events) {
180 match framer.write_multi_frame_bytes(&mut output_stream) {
181 Ok(0) => {
182 eprintln!("Should have frame, but didn't");
183 break;
184 }
185 Ok(frames_returned) => {
186 frame_count += frames_returned;
187 print!(
188 "\rOutput frame {}. Got {} frames in {} ms/frame\t",
189 frame_count,
190 frames_returned,
191 now.elapsed().as_millis() / frames_returned as u128
192 );
193 if io::stdout().flush().is_err() {
194 eprintln!("Error flushing stdout");
195 break;
196 };
197 now = Instant::now();
198 }
199 Err(e) => {
200 eprintln!("Error writing frame: {e}");
201 break;
202 }
203 }
204 }
205 if output_stream.flush().is_err() {
206 eprintln!("Error flushing output stream");
207 break;
208 }
209 if frame_count >= frame_max && frame_max > 0 {
210 eprintln!("Wrote max frames. Exiting channel.");
211 break;
212 }
213 } else {
214 eprintln!("Event receiver is closed. Exiting channel.");
215 break;
216 };
217 }
218 });
219
220 Ok(Self {
221 source,
222 thread_pool: thread_pool_transcoder,
223 events_tx,
224 })
225 }
226
227 pub fn run(&mut self, frame_max: u32) -> Result<(), Box<dyn Error>> {
230 let mut now = Instant::now();
231
232 loop {
233 match self.source.consume() {
234 Ok(events) => {
235 match self.events_tx.send(events) {
236 Ok(_) => {}
237 Err(_) => {
238 break;
239 }
240 };
241 }
242 Err(e) => {
243 println!("Err: {e:?}");
244 break;
245 }
246 };
247
248 let video = self.source.get_video_ref();
249
250 if video.state.in_interval_count % 30 == 0 {
251 print!(
252 "\rFrame {} in {}ms",
253 video.state.in_interval_count,
254 now.elapsed().as_millis()
255 );
256 if io::stdout().flush().is_err() {
257 eprintln!("Error flushing stdout");
258 break;
259 };
260 now = Instant::now();
261 }
262 if video.state.in_interval_count >= frame_max && frame_max > 0 {
263 eprintln!("Wrote max frames. Exiting channel.");
264 break;
265 }
266 }
271
272 println!("Closing stream...");
273 self.source.get_video_mut().end_write_stream()?;
274 println!("FINISHED");
275
276 Ok(())
277 }
278}