adder_codec_rs/utils/simulproc.rs

use crate::framer::driver::FramerMode::INSTANTANEOUS;
use crate::framer::driver::{Framer, FramerBuilder};
use crate::framer::scale_intensity;
use crate::framer::scale_intensity::FrameValue;
use crate::transcoder::source::framed::Framed;
use crate::transcoder::source::video::Source;
use adder_codec_core::DeltaT;
use clap::Parser;
use serde::Serialize;
use std::cmp::max;
use std::error::Error;
use std::fs::File;
use std::io;
use std::io::{BufWriter, Write};

use adder_codec_core::SourceCamera::FramedU8;
use adder_codec_core::SourceType::U8;
use adder_codec_core::{Event, TimeMode};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::Instant;

/// Command line argument parser
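///
/// An illustrative sketch (not from the crate's docs) of one way these arguments might be
/// loaded: from the command line via `clap`, or from the optional `.toml` file named by
/// `args_filename`. The use of the `toml` crate and this precedence are assumptions:
///
/// ```ignore
/// use adder_codec_rs::utils::simulproc::SimulProcArgs;
/// use clap::Parser;
///
/// let mut args = SimulProcArgs::parse();
/// if !args.args_filename.is_empty() {
///     // Arguments from the .toml file replace the command-line values entirely.
///     let contents = std::fs::read_to_string(&args.args_filename)?;
///     args = toml::from_str(&contents)?;
/// }
/// ```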
#[derive(Parser, Debug, Default, serde::Deserialize)]
#[clap(author, version, about, long_about = None)]
pub struct SimulProcArgs {
    /// Filename for args (optional; must be in .toml format)
    #[clap(short, long, default_value = "")]
    pub args_filename: String,

    /// Use color? (For framed input, most likely)
    #[clap(long, action)]
    pub color_input: bool,

    /// Number of ticks per input frame // TODO: modularize for different sources
    #[clap(short, long, default_value_t = 255)]
    pub ref_time: u32,

    /// Max number of ticks for any event
    #[clap(short, long, default_value_t = 15300)]
    pub delta_t_max: u32,

    /// Max number of input frames to transcode (0 = no limit)
    #[clap(short, long, default_value_t = 0)]
    pub frame_count_max: u32,

    /// Index of first input frame to transcode
    #[clap(long, default_value_t = 0)]
    pub frame_idx_start: u32,

    /// Show live view displays?
    #[clap(short, long, action)]
    pub show_display: bool,

    /// Path to input file
    #[clap(short, long, default_value = "./in.mp4")]
    pub input_filename: String,

    /// Path to output events file
    #[clap(long, default_value = "")]
    pub output_events_filename: String,

    /// Path to output raw video file
    #[clap(short, long, default_value = "./out")]
    pub output_raw_video_filename: String,

    /// Resize scale
    #[clap(short('z'), long, default_value_t = 1.0)]
    pub scale: f64,

    /// CRF quality level
    #[clap(long, default_value_t = 3)]
    pub crf: u8,

    /// Number of threads to use (defaults to 4)
    #[clap(long, default_value_t = 4)]
    pub thread_count: u8,

    /// Time mode for the v2 file
    #[clap(long, default_value = "")]
    pub time_mode: String,

    /// Integration mode (optional)
    #[clap(long, default_value = "")]
    pub integration_mode: String,
}

/// A struct for simultaneously transcoding a video source to ADΔER and reconstructing a framed
/// video from ADΔER
pub struct SimulProcessor<W: Write + std::marker::Send + std::marker::Sync + 'static> {
    /// Framed transcoder hook
    pub source: Framed<W>,
    thread_pool: tokio::runtime::Runtime,
    events_tx: Sender<Vec<Vec<Event>>>,
}

impl<W: Write + std::marker::Send + std::marker::Sync + 'static> SimulProcessor<W> {
    /// Create a new SimulProcessor
    ///
    /// # Arguments
    ///
    /// * `source`: [`Framed<W>`] source
    /// * `ref_time`: ticks per source frame
    /// * `output_path`: path to output file
    /// * `frame_max`: max number of frames to transcode
    /// * `num_threads`: number of threads to use
    /// * `codec_version`: codec version
    /// * `time_mode`: time mode
    ///
    /// returns: `Result<SimulProcessor<W>, Box<dyn Error>>`
    ///
    /// # Examples
    /// TODO: add examples
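    ///
    /// An illustrative sketch in the meantime (not compiled; the pre-built `source`, the
    /// codec version, and the time mode shown here are assumptions for the example):
    ///
    /// ```ignore
    /// use adder_codec_core::TimeMode;
    ///
    /// // `source` is a configured `Framed<BufWriter<File>>`; its construction is omitted here.
    /// let mut processor = SimulProcessor::new::<u8>(
    ///     source,
    ///     255,      // ref_time: ticks per source frame
    ///     "./out",  // output_path for the reconstructed raw video
    ///     500,      // frame_max (0 or a negative value disables the cap)
    ///     4,        // num_threads
    ///     2,        // codec_version (assumed)
    ///     TimeMode::DeltaT,
    /// )?;
    /// processor.run(500)?;
    /// ```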
    pub fn new<T>(
        source: Framed<W>,
        ref_time: DeltaT,
        output_path: &str,
        frame_max: i32,
        num_threads: usize,
        codec_version: u8,
        time_mode: TimeMode,
    ) -> Result<Self, Box<dyn Error>>
    where
        T: Clone
            + std::marker::Sync
            + std::marker::Send
            + 'static
            + scale_intensity::FrameValue
            + std::default::Default
            + std::marker::Copy
            + FrameValue<Output = T>
            + Serialize
            + num_traits::Zero
            + Into<f64>,
    {
        let thread_pool_framer = rayon::ThreadPoolBuilder::new()
            .num_threads(max(num_threads / 2, 1))
            .build()?;
        let thread_pool_transcoder = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(max(num_threads, 1))
            .build()?;
        let reconstructed_frame_rate = source.source_fps;
        // For instantaneous reconstruction, make sure the frame rate matches the source video rate
        assert_eq!(
            source.video.state.tps / ref_time,
            reconstructed_frame_rate as u32
        );

        let plane = source.get_video_ref().state.plane;

        let mut framer = thread_pool_framer.install(|| {
            FramerBuilder::new(plane, source.video.state.chunk_rows)
                .codec_version(codec_version, time_mode)
                .time_parameters(
                    source.video.state.tps,
                    ref_time,
                    source.video.state.params.delta_t_max,
                    Some(reconstructed_frame_rate),
                )
                .mode(INSTANTANEOUS)
                .source(U8, FramedU8)
                .finish::<T>()
        });

        let mut output_stream = BufWriter::new(File::create(output_path)?);

        let (events_tx, events_rx): (Sender<Vec<Vec<Event>>>, Receiver<Vec<Vec<Event>>>) =
            channel();
        let mut now = Instant::now();

        // Spin off a thread for framing the events as they arrive. It ingests each batch of
        // events, reconstructs frames from them, and writes the framed output to the output stream.
        rayon::spawn(move || {
            let mut frame_count = 1;
            loop {
                if let Ok(events) = events_rx.recv() {
                    // assert_eq!(events.len(), (self.source.get_video().height as f64 / self.framer.chunk_rows as f64).ceil() as usize);

                    // Frame the events
                    if framer.ingest_events_events(events) {
                        match framer.write_multi_frame_bytes(&mut output_stream) {
                            Ok(0) => {
                                eprintln!("Should have frame, but didn't");
                                break;
                            }
                            Ok(frames_returned) => {
                                frame_count += frames_returned;
                                print!(
                                    "\rOutput frame {}. Got {} frames in  {} ms/frame\t",
                                    frame_count,
                                    frames_returned,
                                    now.elapsed().as_millis() / frames_returned as u128
                                );
                                if io::stdout().flush().is_err() {
                                    eprintln!("Error flushing stdout");
                                    break;
                                };
                                now = Instant::now();
                            }
                            Err(e) => {
                                eprintln!("Error writing frame: {e}");
                                break;
                            }
                        }
                    }
                    if output_stream.flush().is_err() {
                        eprintln!("Error flushing output stream");
                        break;
                    }
                    if frame_count >= frame_max && frame_max > 0 {
                        eprintln!("Wrote max frames. Exiting channel.");
                        break;
                    }
                } else {
                    eprintln!("Event receiver is closed. Exiting channel.");
                    break;
                };
            }
        });

        Ok(Self {
            source,
            thread_pool: thread_pool_transcoder,
            events_tx,
        })
    }

    /// Run the processor
    ///
    /// This will run until the source is exhausted, or until `frame_max` input frames have
    /// been processed (a `frame_max` of 0 disables the limit)
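    ///
    /// A minimal sketch (assumes `processor` was built with [`SimulProcessor::new`]):
    ///
    /// ```ignore
    /// // A frame_max of 0 disables the cap; otherwise processing stops after that many
    /// // input frames.
    /// processor.run(0)?;
    /// ```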
    pub fn run(&mut self, frame_max: u32) -> Result<(), Box<dyn Error>> {
        let mut now = Instant::now();

        loop {
            match self.source.consume() {
                Ok(events) => {
                    match self.events_tx.send(events) {
                        Ok(_) => {}
                        Err(_) => {
                            break;
                        }
                    };
                }
                Err(e) => {
                    println!("Err: {e:?}");
                    break;
                }
            };

            let video = self.source.get_video_ref();

            if video.state.in_interval_count % 30 == 0 {
                print!(
                    "\rFrame {} in  {}ms",
                    video.state.in_interval_count,
                    now.elapsed().as_millis()
                );
                if io::stdout().flush().is_err() {
                    eprintln!("Error flushing stdout");
                    break;
                };
                now = Instant::now();
            }
            if video.state.in_interval_count >= frame_max && frame_max > 0 {
                eprintln!("Wrote max frames. Exiting channel.");
                break;
            }
            // // TODO: temp
            // if video.state.in_interval_count == 30 {
            //     break;
            // }
        }

        println!("Closing stream...");
        self.source.get_video_mut().end_write_stream()?;
        println!("FINISHED");

        Ok(())
    }
}