adder_codec_rs/utils/
simulproc.rs1use crate::framer::driver::FramerMode::INSTANTANEOUS;
2use crate::framer::driver::{Framer, FramerBuilder};
3use crate::framer::scale_intensity;
4use crate::framer::scale_intensity::FrameValue;
5use crate::transcoder::source::framed::Framed;
6use crate::transcoder::source::video::Source;
7use adder_codec_core::DeltaT;
8use clap::Parser;
9use rayon::ThreadPool;
10use serde::Serialize;
11use std::cmp::max;
12use std::error::Error;
13use std::fs::File;
14use std::io;
15use std::io::{BufWriter, Write};
16
17use adder_codec_core::SourceCamera::FramedU8;
18use adder_codec_core::SourceType::U8;
19use adder_codec_core::{Event, TimeMode};
20use std::sync::mpsc::{channel, Receiver, Sender};
21use std::time::Instant;
22
// Command-line arguments for the simultaneous transcode + reconstruction
// utility, parsed with clap's derive API. The struct also derives
// `serde::Deserialize`, so the same set of fields can be deserialized from
// another source (presumably the file named by `args_filename` — TODO confirm
// against the binary that consumes this struct).
//
// NOTE: field notes below are plain `//` comments on purpose. clap's derive
// turns `///` doc comments into help text, so using them here would change
// the program's `--help` output.
#[derive(Parser, Debug, Default, serde::Deserialize)]
#[clap(author, version, about, long_about = None)]
pub struct SimulProcArgs {
    // Short and long flag; defaults to the empty string.
    // Presumably a path to a file holding these arguments — verify with caller.
    #[clap(short, long, default_value = "")]
    pub args_filename: String,

    // Long-only boolean switch; false unless the flag is passed.
    // Presumably selects color (vs. grayscale) input — TODO confirm.
    #[clap(long, action)]
    pub color_input: bool,

    // Short and long flag; defaults to 255.
    // NOTE(review): looks like the per-frame reference time interval in ticks
    // (cf. the `ref_time` parameter of `SimulProcessor::new`) — confirm.
    #[clap(short, long, default_value_t = 255)]
    pub ref_time: u32,

    // Short and long flag; defaults to 15300.
    // Presumably the maximum delta-t, in ticks, allowed for an event — confirm.
    #[clap(short, long, default_value_t = 15300)]
    pub delta_t_max: u32,

    // Short and long flag; defaults to 0.
    // NOTE(review): the processor in this file treats a frame limit of 0 as
    // "no limit"; presumably this field feeds that limit — verify with caller.
    #[clap(short, long, default_value_t = 0)]
    pub frame_count_max: u32,

    // Long-only flag; defaults to 0.
    // Presumably the index of the first input frame to process — confirm.
    #[clap(long, default_value_t = 0)]
    pub frame_idx_start: u32,

    // Short and long boolean switch; false unless the flag is passed.
    // Presumably enables a live preview window — TODO confirm.
    #[clap(short, long, action)]
    pub show_display: bool,

    // Short and long flag; defaults to "./in.mp4".
    #[clap(short, long, default_value = "./in.mp4")]
    pub input_filename: String,

    // Long-only flag; defaults to the empty string.
    // Presumably an empty value means "do not write an events file" — confirm.
    #[clap(long, default_value = "")]
    pub output_events_filename: String,

    // Short and long flag; defaults to "./out".
    #[clap(short, long, default_value = "./out")]
    pub output_raw_video_filename: String,

    // Explicit short flag `-z` plus the long flag; defaults to 1.0.
    // Presumably a resize factor applied to the input video — confirm.
    #[clap(short('z'), long, default_value_t = 1.0)]
    pub scale: f64,

    // Long-only flag; defaults to 3.
    // NOTE(review): name suggests a constant-rate-factor quality level;
    // the valid range is not visible here — confirm.
    #[clap(long, default_value_t = 3)]
    pub crf: u8,

    // Long-only flag; defaults to 4.
    // Presumably forwarded as `num_threads` to `SimulProcessor::new` — confirm.
    #[clap(long, default_value_t = 4)]
    pub thread_count: u8,

    // Long-only flag; defaults to the empty string.
    // Presumably parsed into a `TimeMode` by the caller — TODO confirm.
    #[clap(long, default_value = "")]
    pub time_mode: String,

    // Long-only flag; defaults to the empty string.
    // Accepted values are not visible in this file — TODO confirm.
    #[clap(long, default_value = "")]
    pub integration_mode: String,
}
87
88pub struct SimulProcessor<W: Write + std::marker::Send + std::marker::Sync + 'static> {
91 pub source: Framed<W>,
93 thread_pool: tokio::runtime::Runtime,
94 events_tx: Sender<Vec<Vec<Event>>>,
95}
96
97impl<W: Write + std::marker::Send + std::marker::Sync + 'static> SimulProcessor<W> {
98 pub fn new<T>(
115 source: Framed<W>,
116 ref_time: DeltaT,
117 output_path: &str,
118 frame_max: i32,
119 num_threads: usize,
120 codec_version: u8,
121 time_mode: TimeMode,
122 ) -> Result<SimulProcessor<W>, Box<dyn Error>>
123 where
124 T: Clone
125 + std::marker::Sync
126 + std::marker::Send
127 + 'static
128 + scale_intensity::FrameValue
129 + std::default::Default
130 + std::marker::Copy
131 + FrameValue<Output = T>
132 + Serialize
133 + num_traits::Zero
134 + Into<f64>,
135 {
136 let thread_pool_framer = rayon::ThreadPoolBuilder::new()
137 .num_threads(max(num_threads / 2, 1))
138 .build()?;
139 let thread_pool_transcoder = tokio::runtime::Builder::new_multi_thread()
140 .worker_threads(max(num_threads, 1))
141 .build()?;
142 let reconstructed_frame_rate = source.source_fps;
143 assert_eq!(
145 source.video.state.tps / ref_time,
146 reconstructed_frame_rate as u32
147 );
148
149 let plane = source.get_video_ref().state.plane;
150
151 let mut framer = thread_pool_framer.install(|| {
152 FramerBuilder::new(plane, source.video.state.chunk_rows)
153 .codec_version(codec_version, time_mode)
154 .time_parameters(
155 source.video.state.tps,
156 ref_time,
157 source.video.state.params.delta_t_max,
158 Some(reconstructed_frame_rate),
159 )
160 .mode(INSTANTANEOUS)
161 .source(U8, FramedU8)
162 .finish::<T>()
163 });
164
165 let mut output_stream = BufWriter::new(File::create(output_path)?);
166
167 let (events_tx, events_rx): (Sender<Vec<Vec<Event>>>, Receiver<Vec<Vec<Event>>>) =
168 channel();
169 let mut now = Instant::now();
170
171 rayon::spawn(move || {
174 let mut frame_count = 1;
175 loop {
176 if let Ok(events) = events_rx.recv() {
177 if framer.ingest_events_events(events) {
181 match framer.write_multi_frame_bytes(&mut output_stream) {
182 Ok(0) => {
183 eprintln!("Should have frame, but didn't");
184 break;
185 }
186 Ok(frames_returned) => {
187 frame_count += frames_returned;
188 print!(
189 "\rOutput frame {}. Got {} frames in {} ms/frame\t",
190 frame_count,
191 frames_returned,
192 now.elapsed().as_millis() / frames_returned as u128
193 );
194 if io::stdout().flush().is_err() {
195 eprintln!("Error flushing stdout");
196 break;
197 };
198 now = Instant::now();
199 }
200 Err(e) => {
201 eprintln!("Error writing frame: {e}");
202 break;
203 }
204 }
205 }
206 if output_stream.flush().is_err() {
207 eprintln!("Error flushing output stream");
208 break;
209 }
210 if frame_count >= frame_max && frame_max > 0 {
211 eprintln!("Wrote max frames. Exiting channel.");
212 break;
213 }
214 } else {
215 eprintln!("Event receiver is closed. Exiting channel.");
216 break;
217 };
218 }
219 });
220
221 Ok(SimulProcessor {
222 source,
223 thread_pool: thread_pool_transcoder,
224 events_tx,
225 })
226 }
227
228 pub fn run(&mut self, frame_max: u32) -> Result<(), Box<dyn Error>> {
231 let mut now = Instant::now();
232
233 loop {
234 match self.source.consume() {
235 Ok(events) => {
236 match self.events_tx.send(events) {
237 Ok(_) => {}
238 Err(_) => {
239 break;
240 }
241 };
242 }
243 Err(e) => {
244 println!("Err: {e:?}");
245 break;
246 }
247 };
248
249 let video = self.source.get_video_ref();
250
251 if video.state.in_interval_count % 30 == 0 {
252 print!(
253 "\rFrame {} in {}ms",
254 video.state.in_interval_count,
255 now.elapsed().as_millis()
256 );
257 if io::stdout().flush().is_err() {
258 eprintln!("Error flushing stdout");
259 break;
260 };
261 now = Instant::now();
262 }
263 if video.state.in_interval_count >= frame_max && frame_max > 0 {
264 eprintln!("Wrote max frames. Exiting channel.");
265 break;
266 }
267 }
272
273 println!("Closing stream...");
274 self.source.get_video_mut().end_write_stream()?;
275 println!("FINISHED");
276
277 Ok(())
278 }
279}