adder_codec_rs/utils/
simulproc.rs

1use crate::framer::driver::FramerMode::INSTANTANEOUS;
2use crate::framer::driver::{Framer, FramerBuilder};
3use crate::framer::scale_intensity;
4use crate::framer::scale_intensity::FrameValue;
5use crate::transcoder::source::framed::Framed;
6use crate::transcoder::source::video::Source;
7use adder_codec_core::DeltaT;
8use clap::Parser;
9use rayon::ThreadPool;
10use serde::Serialize;
11use std::cmp::max;
12use std::error::Error;
13use std::fs::File;
14use std::io;
15use std::io::{BufWriter, Write};
16
17use adder_codec_core::SourceCamera::FramedU8;
18use adder_codec_core::SourceType::U8;
19use adder_codec_core::{Event, TimeMode};
20use std::sync::mpsc::{channel, Receiver, Sender};
21use std::time::Instant;
22
23/// Command line argument parser
24#[derive(Parser, Debug, Default, serde::Deserialize)]
25#[clap(author, version, about, long_about = None)]
26pub struct SimulProcArgs {
27    /// Filename for args (optional; must be in .toml format)
28    #[clap(short, long, default_value = "")]
29    pub args_filename: String,
30
31    /// Use color? (For framed input, most likely)
32    #[clap(long, action)]
33    pub color_input: bool,
34
35    /// Number of ticks per input frame // TODO: modularize for different sources
36    #[clap(short, long, default_value_t = 255)]
37    pub ref_time: u32,
38
39    /// Max number of ticks for any event
40    #[clap(short, long, default_value_t = 15300)]
41    pub delta_t_max: u32,
42
43    /// Max number of input frames to transcode (0 = no limit)
44    #[clap(short, long, default_value_t = 0)]
45    pub frame_count_max: u32,
46
47    /// Index of first input frame to transcode
48    #[clap(long, default_value_t = 0)]
49    pub frame_idx_start: u32,
50
51    /// Show live view displays?
52    #[clap(short, long, action)]
53    pub show_display: bool,
54
55    /// Path to input file
56    #[clap(short, long, default_value = "./in.mp4")]
57    pub input_filename: String,
58
59    /// Path to output events file
60    #[clap(long, default_value = "")]
61    pub output_events_filename: String,
62
63    /// Path to output raw video file
64    #[clap(short, long, default_value = "./out")]
65    pub output_raw_video_filename: String,
66
67    /// Resize scale
68    #[clap(short('z'), long, default_value_t = 1.0)]
69    pub scale: f64,
70
71    /// CRF quality level
72    #[clap(long, default_value_t = 3)]
73    pub crf: u8,
74
75    /// Number of threads to use. If not provided, will default to the number of cores on the
76    /// system.
77    #[clap(long, default_value_t = 4)]
78    pub thread_count: u8,
79
80    /// Time mode for the v2 file
81    #[clap(long, default_value = "")]
82    pub time_mode: String,
83
84    #[clap(long, default_value = "")]
85    pub integration_mode: String,
86}
87
88/// A struct for simultaneously transcoding a video source to ADΔER and reconstructing a framed
89/// video from ADΔER
90pub struct SimulProcessor<W: Write + std::marker::Send + std::marker::Sync + 'static> {
91    /// Framed transcoder hook
92    pub source: Framed<W>,
93    thread_pool: tokio::runtime::Runtime,
94    events_tx: Sender<Vec<Vec<Event>>>,
95}
96
97impl<W: Write + std::marker::Send + std::marker::Sync + 'static> SimulProcessor<W> {
98    /// Create a new SimulProcessor
99    ///
100    /// # Arguments
101    ///
102    /// * `source`: [`Framed<W>`] source
103    /// * `ref_time`: ticks per source frame
104    /// * `output_path`: path to output file
105    /// * `frame_max`: max number of frames to transcode
106    /// * `num_threads`: number of threads to use
107    /// * `codec_version`: codec version
108    /// * `time_mode`: time mode
109    ///
110    /// returns: `Result<SimulProcessor<W>, Box<dyn Error, Global>>`
111    ///
112    /// # Examples
113    /// TODO: add examples
114    pub fn new<T>(
115        source: Framed<W>,
116        ref_time: DeltaT,
117        output_path: &str,
118        frame_max: i32,
119        num_threads: usize,
120        codec_version: u8,
121        time_mode: TimeMode,
122    ) -> Result<SimulProcessor<W>, Box<dyn Error>>
123    where
124        T: Clone
125            + std::marker::Sync
126            + std::marker::Send
127            + 'static
128            + scale_intensity::FrameValue
129            + std::default::Default
130            + std::marker::Copy
131            + FrameValue<Output = T>
132            + Serialize
133            + num_traits::Zero
134            + Into<f64>,
135    {
136        let thread_pool_framer = rayon::ThreadPoolBuilder::new()
137            .num_threads(max(num_threads / 2, 1))
138            .build()?;
139        let thread_pool_transcoder = tokio::runtime::Builder::new_multi_thread()
140            .worker_threads(max(num_threads, 1))
141            .build()?;
142        let reconstructed_frame_rate = source.source_fps;
143        // For instantaneous reconstruction, make sure the frame rate matches the source video rate
144        assert_eq!(
145            source.video.state.tps / ref_time,
146            reconstructed_frame_rate as u32
147        );
148
149        let plane = source.get_video_ref().state.plane;
150
151        let mut framer = thread_pool_framer.install(|| {
152            FramerBuilder::new(plane, source.video.state.chunk_rows)
153                .codec_version(codec_version, time_mode)
154                .time_parameters(
155                    source.video.state.tps,
156                    ref_time,
157                    source.video.state.params.delta_t_max,
158                    Some(reconstructed_frame_rate),
159                )
160                .mode(INSTANTANEOUS)
161                .source(U8, FramedU8)
162                .finish::<T>()
163        });
164
165        let mut output_stream = BufWriter::new(File::create(output_path)?);
166
167        let (events_tx, events_rx): (Sender<Vec<Vec<Event>>>, Receiver<Vec<Vec<Event>>>) =
168            channel();
169        let mut now = Instant::now();
170
171        // Spin off a thread for managing the input frame buffer. It will keep the buffer filled,
172        // and pre-process the next input frame (grayscale conversion and rescaling)
173        rayon::spawn(move || {
174            let mut frame_count = 1;
175            loop {
176                if let Ok(events) = events_rx.recv() {
177                    // assert_eq!(events.len(), (self.source.get_video().height as f64 / self.framer.chunk_rows as f64).ceil() as usize);
178
179                    // Frame the events
180                    if framer.ingest_events_events(events) {
181                        match framer.write_multi_frame_bytes(&mut output_stream) {
182                            Ok(0) => {
183                                eprintln!("Should have frame, but didn't");
184                                break;
185                            }
186                            Ok(frames_returned) => {
187                                frame_count += frames_returned;
188                                print!(
189                                    "\rOutput frame {}. Got {} frames in  {} ms/frame\t",
190                                    frame_count,
191                                    frames_returned,
192                                    now.elapsed().as_millis() / frames_returned as u128
193                                );
194                                if io::stdout().flush().is_err() {
195                                    eprintln!("Error flushing stdout");
196                                    break;
197                                };
198                                now = Instant::now();
199                            }
200                            Err(e) => {
201                                eprintln!("Error writing frame: {e}");
202                                break;
203                            }
204                        }
205                    }
206                    if output_stream.flush().is_err() {
207                        eprintln!("Error flushing output stream");
208                        break;
209                    }
210                    if frame_count >= frame_max && frame_max > 0 {
211                        eprintln!("Wrote max frames. Exiting channel.");
212                        break;
213                    }
214                } else {
215                    eprintln!("Event receiver is closed. Exiting channel.");
216                    break;
217                };
218            }
219        });
220
221        Ok(SimulProcessor {
222            source,
223            thread_pool: thread_pool_transcoder,
224            events_tx,
225        })
226    }
227
228    /// Run the processor
229    /// This will run until the source is exhausted
230    pub fn run(&mut self, frame_max: u32) -> Result<(), Box<dyn Error>> {
231        let mut now = Instant::now();
232
233        loop {
234            match self.source.consume() {
235                Ok(events) => {
236                    match self.events_tx.send(events) {
237                        Ok(_) => {}
238                        Err(_) => {
239                            break;
240                        }
241                    };
242                }
243                Err(e) => {
244                    println!("Err: {e:?}");
245                    break;
246                }
247            };
248
249            let video = self.source.get_video_ref();
250
251            if video.state.in_interval_count % 30 == 0 {
252                print!(
253                    "\rFrame {} in  {}ms",
254                    video.state.in_interval_count,
255                    now.elapsed().as_millis()
256                );
257                if io::stdout().flush().is_err() {
258                    eprintln!("Error flushing stdout");
259                    break;
260                };
261                now = Instant::now();
262            }
263            if video.state.in_interval_count >= frame_max && frame_max > 0 {
264                eprintln!("Wrote max frames. Exiting channel.");
265                break;
266            }
267            // // TODO: temp
268            // if video.state.in_interval_count == 30 {
269            //     break;
270            // }
271        }
272
273        println!("Closing stream...");
274        self.source.get_video_mut().end_write_stream()?;
275        println!("FINISHED");
276
277        Ok(())
278    }
279}