adder_codec_rs/framer/
driver.rs

1use crate::framer::scale_intensity::{FrameValue, SaeTime};
2use bincode::config::{BigEndian, FixintEncoding, WithOtherEndian, WithOtherIntEncoding};
3use bincode::{DefaultOptions, Options};
4use rayon::iter::ParallelIterator;
5
6use std::collections::VecDeque;
7use std::error::Error;
8use std::fmt;
9
10use adder_codec_core::{
11    BigT, Coord, DeltaT, Event, PlaneSize, SourceCamera, SourceType, TimeMode, D_EMPTY,
12};
13use std::fs::File;
14use std::io::BufWriter;
15
16// Want one main framer with the same functions
17// Want additional functions
18// Want ability to get instantaneous frames at a fixed interval, or at api-spec'd times
19// Want ability to get full integration frames at a fixed interval, or at api-spec'd times
20
/// The mode for how a `Framer` should handle events which span multiple frames and frames
/// spanning multiple events.
///
/// Chosen through [`FramerBuilder::mode`]; a freshly created [`FramerBuilder`] starts out with
/// [`FramerMode::INSTANTANEOUS`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum FramerMode {
    /// Each frame's pixel values are derived from only the _last_ event which spanned the
    /// frame's integration period.
    INSTANTANEOUS,

    /// The frame is the sum of all events in the integration period.
    INTEGRATION,
}
32
/// Builder for a Framer.
///
/// Create one with [`FramerBuilder::new`], adjust it with the chained setter methods, and turn
/// it into a [`FrameSequence`] with [`FramerBuilder::finish`].
#[derive(Clone)]
#[must_use]
pub struct FramerBuilder {
    // Dimensions of the image plane; per-chunk arrays are sized from its height, width and
    // channel count.
    plane: PlaneSize,
    // Ticks per second.
    tps: DeltaT,
    // Desired output frame rate. When set, ticks-per-frame is `tps / output_fps`; otherwise
    // `ref_interval` is used as ticks-per-frame.
    output_fps: Option<f32>,
    // How events are combined into frames.
    mode: FramerMode,
    // View mode forwarded to the frame sequence state.
    view_mode: FramedViewMode,
    // Source data type forwarded to the frame sequence state.
    source: SourceType,
    // Codec version forwarded to the frame sequence state.
    codec_version: u8,
    // Source camera type forwarded to the frame sequence state.
    source_camera: SourceCamera,
    // Time mode forwarded to the frame sequence state.
    time_mode: TimeMode,
    // Reference interval, in ticks.
    ref_interval: DeltaT,
    // Maximum delta-t, in ticks (stored as `source_dtm` in the frame sequence state).
    delta_t_max: DeltaT,
    // Whether feature detection starts out enabled.
    detect_features: bool,
    // Optional limit on the size of the reconstruction frame buffer.
    buffer_limit: Option<u32>,

    /// The number of rows to process in each chunk (thread).
    pub chunk_rows: usize,
}
54
55impl FramerBuilder {
56    /// Create a new [`FramerBuilder`].
57    pub fn new(plane: PlaneSize, chunk_rows: usize) -> Self {
58        Self {
59            plane,
60            chunk_rows,
61            tps: 150_000,
62            output_fps: None,
63            mode: FramerMode::INSTANTANEOUS,
64            view_mode: FramedViewMode::Intensity,
65            source: SourceType::U8,
66            codec_version: 3,
67            source_camera: SourceCamera::default(),
68            time_mode: TimeMode::default(),
69            ref_interval: 5000,
70            delta_t_max: 5000,
71            detect_features: false,
72            buffer_limit: None,
73        }
74    }
75
76    /// Set the time parameters.
77    pub fn time_parameters(
78        mut self,
79        tps: DeltaT,
80        ref_interval: DeltaT,
81        delta_t_max: DeltaT,
82        output_fps: Option<f32>,
83    ) -> Self {
84        self.tps = tps;
85        self.ref_interval = ref_interval;
86        self.delta_t_max = delta_t_max;
87        self.output_fps = output_fps;
88        self
89    }
90
91    /// Limit the size of the reconstruction frame buffer (for speed/latency)
92    pub fn buffer_limit(mut self, buffer_limit: Option<u32>) -> Self {
93        self.buffer_limit = buffer_limit;
94        self
95    }
96
97    /// Set the framer mode.
98    pub fn mode(mut self, mode: FramerMode) -> Self {
99        self.mode = mode;
100        self
101    }
102
103    /// Set the view mode.
104    pub fn view_mode(mut self, mode: FramedViewMode) -> Self {
105        self.view_mode = mode;
106        self
107    }
108
109    /// Set the source type and camera.
110    pub fn source(mut self, source: SourceType, source_camera: SourceCamera) -> Self {
111        self.source_camera = source_camera;
112        self.source = source;
113        self
114    }
115
116    /// Set the codec version and time mode.
117    pub fn codec_version(mut self, codec_version: u8, time_mode: TimeMode) -> Self {
118        self.codec_version = codec_version;
119        self.time_mode = time_mode;
120        self
121    }
122
123    /// Build a [`Framer`].
124    /// TODO: Make this return a result
125    #[must_use]
126    pub fn finish<T>(self) -> FrameSequence<T>
127    where
128        T: FrameValue<Output = T>
129            + Default
130            + Send
131            + Serialize
132            + Sync
133            + std::marker::Copy
134            + num_traits::Zero
135            + Into<f64>,
136    {
137        FrameSequence::<T>::new(self)
138    }
139
140    /// Set whether to detect features.
141    pub fn detect_features(mut self, detect_features: bool) -> Self {
142        self.detect_features = detect_features;
143        self
144    }
145}
146
/// A trait for accumulating ADΔER events into frames.
pub trait Framer {
    /// The type of the output frame.
    type Output;
    /// Create a new [`Framer`] with the given [`FramerBuilder`].
    fn new(builder: FramerBuilder) -> Self;

    /// Ingest an ADΔER event. Will process differently depending on choice of [`FramerMode`].
    ///
    /// If [INSTANTANEOUS](FramerMode::INSTANTANEOUS), this function will set the corresponding output frame's pixel value to
    /// the value derived from this [`Event`], only if this is the first value ingested for that
    /// pixel and frame. Otherwise, the operation will silently be ignored.
    ///
    /// If [INTEGRATION](FramerMode::INTEGRATION), this function will integrate this [`Event`] value for the corresponding
    /// output frame(s)
    ///
    /// `last_event` is the previously ingested event, if any. The [`FrameSequence`]
    /// implementation returns `true` once every chunk is marked as filled.
    fn ingest_event(&mut self, event: &mut Event, last_event: Option<Event>) -> bool;

    /// Ingest a vector of a vector of ADΔER events.
    ///
    /// The [`FrameSequence`] implementation expects one inner vector per chunk.
    fn ingest_events_events(&mut self, events: Vec<Vec<Event>>) -> bool;
    /// For all frames left that we haven't written out yet, for any None pixels, set them to the
    /// last recorded intensity for that pixel.
    ///
    /// Returns `true` if there are frames now ready to write out
    fn flush_frame_buffer(&mut self) -> bool;

    /// Enable or disable feature detection for subsequently ingested events.
    fn detect_features(&mut self, detect_features: bool);
}
174
/// One frame's worth of pixel data for a single chunk, plus a count of how many of its
/// elements have been filled in so far.
#[derive(Debug, Clone, Default)]
pub(crate) struct Frame<T> {
    /// The pixel values of this frame chunk.
    pub(crate) array: Array3<T>,
    /// Number of elements of `array` that have been filled; the frame chunk counts as complete
    /// when this equals `array.len()`.
    pub(crate) filled_count: usize,
}
180
/// Errors that can occur when working with [`FrameSequence`]
///
/// Implements [`std::error::Error`], so it converts into `Box<dyn Error>` (and
/// `Box<dyn Error + Send + Sync>`) through the standard blanket `From` implementation and can
/// be recovered from such a box by downcasting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FrameSequenceError {
    /// Frame index out of bounds
    InvalidIndex,

    /// Frame not initialized
    UninitializedFrame,

    /// Frame chunk not initialized
    UninitializedFrameChunk,

    /// An impossible "fill count" encountered
    BadFillCount,
}

impl fmt::Display for FrameSequenceError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            FrameSequenceError::InvalidIndex => "Invalid frame index",
            FrameSequenceError::UninitializedFrame => "Uninitialized frame",
            FrameSequenceError::UninitializedFrameChunk => "Uninitialized frame chunk",
            FrameSequenceError::BadFillCount => "Bad fill count",
        };
        f.write_str(message)
    }
}

// Implementing `Error` replaces the former hand-written `From<FrameSequenceError> for
// Box<dyn Error>`: the standard library's blanket impl now provides that conversion while
// preserving the concrete error type inside the box.
impl std::error::Error for FrameSequenceError {}
221
/// The state of a [`FrameSequence`]
///
/// Holds the settings copied from the [`FramerBuilder`] plus the running count of frames
/// written.
pub struct FrameSequenceState {
    /// The number of frames written to the output so far
    frames_written: i64,
    // Dimensions of the image plane.
    plane: PlaneSize,

    /// Ticks per output frame
    pub tpf: DeltaT,

    /// Ticks per second
    pub tps: DeltaT,
    // Source data type, as given to the builder.
    pub(crate) source: SourceType,
    // Codec version, as given to the builder.
    codec_version: u8,
    // Source camera type, as given to the builder.
    source_camera: SourceCamera,
    // Reference interval, in ticks.
    ref_interval: DeltaT,
    // The builder's `delta_t_max`, in ticks.
    source_dtm: DeltaT,
    // Current view mode; can be changed with `view_mode()`.
    view_mode: FramedViewMode,
    // Time mode, as given to the builder.
    time_mode: TimeMode,
}
241
impl FrameSequenceState {
    /// Reset the count of frames written back to zero. No other state is touched.
    pub fn reset(&mut self) {
        self.frames_written = 0;
    }

    /// Replace the current view mode.
    pub fn view_mode(&mut self, view_mode: FramedViewMode) {
        self.view_mode = view_mode;
    }
}
251
/// Associates detected features with the source time in which they were detected (since ADDER
/// events may arrive out of order)
pub struct FeatureInterval {
    // Timestamp (in ticks) at which this interval ends. Consecutive intervals are created one
    // ticks-per-frame apart.
    end_ts: BigT,
    /// Coordinates of the features detected within this interval.
    pub features: Vec<Coord>,
}
258
/// A sequence of frames, each of which is a 3D array of [`FrameValue`]s
///
/// The image plane is split into horizontal chunks of `chunk_rows` rows. The per-chunk
/// vectors below all have one entry per chunk.
#[allow(dead_code)]
pub struct FrameSequence<T> {
    /// The state of the frame sequence
    pub state: FrameSequenceState,
    /// For each chunk, the queue of frames being reconstructed; index 0 is the next frame to be
    /// written out.
    pub(crate) frames: Vec<VecDeque<Frame<Option<T>>>>,
    /// Per-chunk frame index offset, initialized to 0 and maintained by the per-chunk ingest
    /// routine.
    pub(crate) frame_idx_offsets: Vec<i64>,
    /// Per-chunk, per-pixel running timestamp, initialized to 0.
    pub(crate) pixel_ts_tracker: Vec<Array3<BigT>>,
    /// Per-chunk, per-pixel tracker of the last filled frame, initialized to -1.
    pub(crate) last_filled_tracker: Vec<Array3<i64>>,
    /// Per-chunk, per-pixel last recorded intensity; used to fill pixels that are still empty
    /// when the buffer is flushed.
    pub(crate) last_frame_intensity_tracker: Vec<Array3<T>>,
    // Per-chunk flag recording whether the chunk is ready to be written out.
    chunk_filled_tracker: Vec<bool>,
    /// How events are combined into frames.
    pub(crate) mode: FramerMode,
    /// Whether feature detection runs on ingested events.
    pub(crate) detect_features: bool,
    /// Queue of feature intervals, oldest first.
    pub(crate) features: VecDeque<FeatureInterval>,
    /// Optional limit on the number of buffered frames per chunk; once a chunk holds more
    /// than this many, the next frame is reported as ready.
    pub buffer_limit: Option<u32>,

    /// Full-plane 8-bit intensities, updated per event while feature detection is enabled.
    pub(crate) running_intensities: Array3<u8>,

    /// Number of rows per chunk (per thread)
    pub chunk_rows: usize,
    // Bincode options: fixed-width integer encoding, big-endian.
    bincode: WithOtherEndian<WithOtherIntEncoding<DefaultOptions, FixintEncoding>, BigEndian>,
}
281
282use ndarray::{Array, Array3};
283
284use crate::transcoder::source::video::FramedViewMode;
285use crate::utils::cv::is_feature;
286use rayon::prelude::IntoParallelIterator;
287use serde::Serialize;
288
289impl<
290        T: Clone
291            + Default
292            + FrameValue<Output = T>
293            + Copy
294            + Serialize
295            + Send
296            + Sync
297            + num_traits::identities::Zero
298            + Into<f64>,
299    > Framer for FrameSequence<T>
300{
301    type Output = T;
302    fn new(builder: FramerBuilder) -> Self {
303        let plane = &builder.plane;
304
305        let chunk_rows = builder.chunk_rows;
306        assert!(chunk_rows > 0);
307
308        let num_chunks: usize = ((builder.plane.h()) as f64 / chunk_rows as f64).ceil() as usize;
309        let last_chunk_rows = builder.plane.h_usize() - (num_chunks - 1) * chunk_rows;
310
311        assert!(num_chunks > 0);
312        let array: Array3<Option<T>> =
313            Array3::<Option<T>>::default((chunk_rows, plane.w_usize(), plane.c_usize()));
314        let last_array: Array3<Option<T>> =
315            Array3::<Option<T>>::default((last_chunk_rows, plane.w_usize(), plane.c_usize()));
316
317        let mut frames = vec![
318            VecDeque::from(vec![Frame {
319                array,
320                filled_count: 0,
321            }]);
322            num_chunks
323        ];
324
325        // Override the last chunk, in case the chunk size doesn't perfectly divide the number of rows
326        if let Some(last) = frames.last_mut() {
327            *last = VecDeque::from(vec![Frame {
328                array: last_array,
329                filled_count: 0,
330            }]);
331        };
332
333        let mut pixel_ts_tracker: Vec<Array3<BigT>> =
334            vec![Array3::zeros((chunk_rows, plane.w_usize(), plane.c_usize())); num_chunks];
335        if let Some(last) = pixel_ts_tracker.last_mut() {
336            *last = Array3::zeros((last_chunk_rows, plane.w_usize(), plane.c_usize()));
337        };
338
339        let mut last_frame_intensity_tracker: Vec<Array3<T>> =
340            vec![Array3::zeros((chunk_rows, plane.w_usize(), plane.c_usize())); num_chunks];
341        if let Some(last) = last_frame_intensity_tracker.last_mut() {
342            *last = Array3::zeros((last_chunk_rows, plane.w_usize(), plane.c_usize()));
343        };
344
345        let mut last_filled_tracker: Vec<Array3<i64>> =
346            vec![Array3::zeros((chunk_rows, plane.w_usize(), plane.c_usize())); num_chunks];
347        if let Some(last) = last_filled_tracker.last_mut() {
348            *last = Array3::zeros((last_chunk_rows, plane.w_usize(), plane.c_usize()));
349        };
350        for chunk in &mut last_filled_tracker {
351            for mut row in chunk.rows_mut() {
352                row.fill(-1);
353            }
354        }
355
356        let tpf = if let Some(output_fps) = builder.output_fps {
357            (builder.tps as f32 / output_fps) as u32
358        } else {
359            builder.ref_interval
360        };
361
362        // Array3::<Option<T>>::new(num_rows, num_cols, num_channels);
363        FrameSequence {
364            state: FrameSequenceState {
365                plane: *plane,
366                frames_written: 0,
367                view_mode: builder.view_mode,
368                tpf,
369                tps: builder.tps,
370                source: builder.source,
371                codec_version: builder.codec_version,
372                source_camera: builder.source_camera,
373                ref_interval: builder.ref_interval,
374                source_dtm: builder.delta_t_max,
375                time_mode: builder.time_mode,
376            },
377            frames,
378            frame_idx_offsets: vec![0; num_chunks],
379            pixel_ts_tracker,
380            last_filled_tracker,
381            last_frame_intensity_tracker,
382            chunk_filled_tracker: vec![false; num_chunks],
383            mode: builder.mode,
384            running_intensities: Array::zeros((
385                builder.plane.h_usize(),
386                builder.plane.w_usize(),
387                builder.plane.c_usize(),
388            )),
389            detect_features: builder.detect_features,
390            buffer_limit: builder.buffer_limit,
391            features: VecDeque::with_capacity(
392                (builder.delta_t_max / builder.ref_interval) as usize,
393            ),
394            chunk_rows,
395            bincode: DefaultOptions::new()
396                .with_fixint_encoding()
397                .with_big_endian(),
398        }
399    }
400
    /// Turn feature detection on or off. The flag is checked on every call to `ingest_event`.
    fn detect_features(&mut self, detect_features: bool) {
        self.detect_features = detect_features;
    }
404
405    ///
406    ///
407    /// # Examples
408    ///
409    /// ```
410    /// # use adder_codec_core::{Coord, Event, PlaneSize, TimeMode};
411    /// # use adder_codec_core::SourceCamera::FramedU8;
412    /// # use adder_codec_core::SourceType::U8;
413    /// # use adder_codec_rs::framer::driver::FramerMode::INSTANTANEOUS;
414    /// # use adder_codec_rs::framer::driver::{FrameSequence, Framer, FramerBuilder};
415    ///
416    /// let mut frame_sequence: FrameSequence<u8> =
417    /// FramerBuilder::new(
418    ///             PlaneSize::new(10,10,3).unwrap(), 64)
419    ///             .codec_version(1, TimeMode::DeltaT)
420    ///             .time_parameters(50000, 1000, 1000, Some(50.0))
421    ///             .mode(INSTANTANEOUS)
422    ///             .source(U8, FramedU8)
423    ///             .finish();
424    /// let mut event: Event = Event {
425    ///         coord: Coord {
426    ///             x: 5,
427    ///             y: 5,
428    ///             c: Some(1)
429    ///         },
430    ///         d: 5,
431    ///         t: 1000
432    ///     };
433    /// frame_sequence.ingest_event(&mut event, None);
434    /// let elem = frame_sequence.px_at_current(5, 5, 1).unwrap();
435    /// assert_eq!(*elem, Some(32));
436    /// ```
437    fn ingest_event(&mut self, event: &mut Event, last_event: Option<Event>) -> bool {
438        let channel = event.coord.c.unwrap_or(0);
439        let chunk_num = event.coord.y as usize / self.chunk_rows;
440
441        // Silently handle malformed event
442        if chunk_num >= self.frames.len() {
443            return false;
444        }
445
446        let time = event.t;
447        event.coord.y -= (chunk_num * self.chunk_rows) as u16; // Modify the coordinate here, so it gets ingested at the right place
448
449        let frame_chunk = &mut self.frames[chunk_num];
450        let last_filled_frame_ref = &mut self.last_filled_tracker[chunk_num]
451            [[event.coord.y.into(), event.coord.x.into(), channel.into()]];
452        let running_ts_ref = &mut self.pixel_ts_tracker[chunk_num]
453            [[event.coord.y.into(), event.coord.x.into(), channel.into()]];
454        let frame_idx_offset = &mut self.frame_idx_offsets[chunk_num];
455        let last_frame_intensity_ref = &mut self.last_frame_intensity_tracker[chunk_num]
456            [[event.coord.y.into(), event.coord.x.into(), channel.into()]];
457
458        let (filled, grew) = ingest_event_for_chunk(
459            event,
460            frame_chunk,
461            running_ts_ref,
462            frame_idx_offset,
463            last_filled_frame_ref,
464            last_frame_intensity_ref,
465            &self.state,
466            self.buffer_limit,
467        );
468
469        self.chunk_filled_tracker[chunk_num] = filled;
470
471        if grew {
472            // handle_dtm(
473            //     frame_chunk,
474            //     &mut self.chunk_filled_tracker[chunk_num],
475            //     &mut self.last_filled_tracker[chunk_num],
476            //     &mut self.pixel_ts_tracker[chunk_num],
477            //     &mut self.last_frame_intensity_tracker[chunk_num],
478            //     &self.state,
479            // );
480        }
481
482        if self.detect_features {
483            let last_frame_intensity_ref = &mut self.last_frame_intensity_tracker[chunk_num]
484                [[event.coord.y.into(), event.coord.x.into(), channel.into()]];
485            // Revert the y coordinate
486            event.coord.y += (chunk_num * self.chunk_rows) as u16;
487            self.running_intensities
488                [[event.coord.y.into(), event.coord.x.into(), channel.into()]] =
489                <T as Into<f64>>::into(*last_frame_intensity_ref) as u8;
490
491            if let Some(last) = last_event {
492                if time != last.t {
493                    // todo!();
494                    if is_feature(event.coord, self.state.plane, &self.running_intensities).unwrap()
495                    {
496                        debug_assert!(self.state.frames_written >= 0);
497                        let mut idx = if (time / self.state.tpf) as i64 >= self.state.frames_written
498                        {
499                            (time / (self.state.tpf) - self.state.frames_written as u32) as usize
500                        } else {
501                            0
502                        };
503
504                        if time % self.state.tpf == 0 && idx > 0 {
505                            idx -= 1;
506                        }
507                        // dbg!(time);
508                        // dbg!(self.state.frames_written);
509                        // dbg!(idx);
510                        if idx >= self.features.len() {
511                            if self.features.is_empty() {
512                                // Create the first
513                                self.features.push_back(FeatureInterval {
514                                    end_ts: self.state.tpf as BigT,
515                                    features: vec![],
516                                });
517                                self.features.push_back(FeatureInterval {
518                                    end_ts: self.state.tpf as BigT * 2,
519                                    features: vec![],
520                                });
521                            }
522
523                            let new_end_ts = if time % self.state.tpf == 0 {
524                                time
525                            } else {
526                                (time / self.state.tpf + 1) * self.state.tpf
527                            } as BigT;
528
529                            let mut running_end_ts =
530                                self.features.back().unwrap().end_ts + self.state.tpf as BigT;
531                            // dbg!(new_end_ts);
532                            // dbg!(running_end_ts);
533                            while running_end_ts <= new_end_ts {
534                                self.features.push_back(FeatureInterval {
535                                    end_ts: running_end_ts,
536                                    features: vec![],
537                                });
538                                running_end_ts += self.state.tpf as BigT;
539                            }
540                        }
541
542                        // dbg!(self.features.len());
543                        // dbg!(self.features[idx].end_ts);
544                        if self.features[idx].end_ts < time as BigT {
545                            // Allow the player to enable feature detection on the fly
546                            self.features[idx].end_ts = time as BigT;
547                        }
548                        // assert!(self.features[idx].end_ts >= time as BigT);
549                        self.features[idx].features.push(event.coord);
550                    }
551                }
552            }
553        }
554
555        for chunk in &self.chunk_filled_tracker {
556            if !chunk {
557                return false;
558            }
559        }
560        debug_assert!(self.is_frame_0_filled());
561        true
562    }
563
    /// Ingest one batch of events per chunk, processing the chunks in parallel.
    ///
    /// `events[i]` is paired with chunk `i` and must only hold events for that chunk. Each
    /// event's `y` coordinate is rewritten in place to a chunk-local row before it is handed
    /// to the per-chunk ingest routine.
    ///
    /// Returns the result of [`FrameSequence::is_frame_0_filled`] after all batches are ingested.
    ///
    /// # Panics
    /// Panics if `events.len()` differs from the number of chunks.
    fn ingest_events_events(&mut self, mut events: Vec<Vec<Event>>) -> bool {
        // Make sure that the chunk division is aligned between the source and the framer
        assert_eq!(events.len(), self.frames.len());

        // Zip the event batches with every per-chunk tracker so that each parallel task gets
        // exclusive access to exactly one chunk's state.
        (
            &mut events,
            &mut self.frames,
            &mut self.chunk_filled_tracker,
            &mut self.pixel_ts_tracker,
            &mut self.frame_idx_offsets,
            &mut self.last_filled_tracker,
            &mut self.last_frame_intensity_tracker,
        )
            .into_par_iter()
            .for_each(
                |(
                    a,
                    frame_chunk,
                    chunk_filled,
                    chunk_ts_tracker,
                    frame_idx_offset,
                    chunk_last_filled_tracker,
                    last_frame_intensity_tracker,
                )| {
                    for event in a {
                        let channel = event.coord.c.unwrap_or(0);
                        let chunk_num = event.coord.y as usize / self.chunk_rows;
                        event.coord.y -= (chunk_num * self.chunk_rows) as u16; // Modify the coordinate here, so it gets ingested at the right place
                        let last_filled_frame_ref = &mut chunk_last_filled_tracker
                            [[event.coord.y.into(), event.coord.x.into(), channel.into()]];
                        let running_ts_ref = &mut chunk_ts_tracker
                            [[event.coord.y.into(), event.coord.x.into(), channel.into()]];
                        let last_frame_intensity_ref = &mut last_frame_intensity_tracker
                            [[event.coord.y.into(), event.coord.x.into(), channel.into()]];

                        let (filled, _grew) = ingest_event_for_chunk(
                            event,
                            frame_chunk,
                            running_ts_ref,
                            frame_idx_offset,
                            last_filled_frame_ref,
                            last_frame_intensity_ref,
                            &self.state,
                            self.buffer_limit,
                        );
                        // The flag reflects the most recently ingested event of the batch.
                        *chunk_filled = filled;

                        // Disabled: dtm-based fill of stale pixels when the chunk's queue grew.
                        // if grew {
                        //     handle_dtm(
                        //         frame_chunk,
                        //         chunk_filled,
                        //         chunk_last_filled_tracker,
                        //         chunk_ts_tracker,
                        //         last_frame_intensity_tracker,
                        //         &self.state,
                        //     );
                        // }
                    }
                },
            );

        self.is_frame_0_filled()
    }
627
628    /// For all frames left that we haven't written out yet, for any None pixels, set them to the
629    /// last recorded intensity for that pixel.
630    ///
631    /// Returns `true` if there are frames now ready to write out
632    fn flush_frame_buffer(&mut self) -> bool {
633        let mut any_nonempty = false;
634        // Check if ANY of the frame arrays are nonempty
635        for chunk in &self.frames {
636            if chunk.len() > 1 {
637                any_nonempty = true;
638            }
639        }
640        if any_nonempty {
641            for (chunk_num, chunk) in self.frames.iter_mut().enumerate() {
642                let frame_chunk = &mut chunk[0];
643                // for frame_chunk in chunk.iter_mut() {
644                for ((y, x, c), px) in frame_chunk.array.indexed_iter_mut() {
645                    if px.is_none() {
646                        // If the pixel is empty, set its intensity to the previous intensity we recorded for it
647                        *px = Some(self.last_frame_intensity_tracker[chunk_num][[y, x, c]]);
648
649                        // Update the fill tracker
650                        frame_chunk.filled_count += 1;
651
652                        // Update the last filled tracker
653                        self.last_filled_tracker[chunk_num][[y, x, c]] += 1;
654
655                        // Update the timestamp tracker
656                        // chunk_ts_tracker[[y, x, c]] += state.ref_interval as BigT;
657                    }
658                }
659
660                // Mark the chunk as filled (ready to write out)
661                self.chunk_filled_tracker[chunk_num] = true;
662            }
663        } else {
664            eprintln!("marking not filled...");
665            self.chunk_filled_tracker[0] = false;
666        }
667
668        self.is_frame_0_filled()
669
670        // for chunk in &self.chunk_filled_tracker {
671        //     if !chunk {
672        //         all_filled = false;
673        //     }
674        // }
675        //
676        // all_filled
677    }
678}
679
680fn handle_dtm<
681    T: Clone
682        + Default
683        + FrameValue<Output = T>
684        + Copy
685        + Serialize
686        + Send
687        + Sync
688        + num_traits::identities::Zero
689        + Into<f64>,
690>(
691    frame_chunk: &mut VecDeque<Frame<Option<T>>>,
692    chunk_filled: &mut bool,
693    chunk_last_filled_tracker: &mut Array3<i64>,
694    _chunk_ts_tracker: &mut Array3<BigT>,
695    last_frame_intensity_tracker: &Array3<T>,
696    state: &FrameSequenceState,
697) {
698    if frame_chunk.len() > ((state.source_dtm / state.ref_interval) + 1) as usize {
699        /* Check the last timestamp for the other pixels in this chunk. If they were so long
700        ago that dtm time has passed, then we can repeat the last frame's intensity for
701        those pixels.
702        */
703        // Iterate the other pixels in the chunk
704        let frame_chunk = &mut frame_chunk[0];
705
706        for ((y, x, c), px) in frame_chunk.array.indexed_iter_mut() {
707            if px.is_none() {
708                // If the pixel is empty, set its intensity to the previous intensity we recorded for it
709                *px = Some(last_frame_intensity_tracker[[y, x, c]]);
710
711                // Update the fill tracker
712                frame_chunk.filled_count += 1;
713
714                // Update the last filled tracker
715                chunk_last_filled_tracker[[y, x, c]] += 1;
716
717                // Update the timestamp tracker
718                // chunk_ts_tracker[[y, x, c]] += state.ref_interval as BigT;
719            }
720        }
721
722        // Mark the chunk as filled (ready to write out)
723        *chunk_filled = true;
724    }
725}
726
727impl<T: Clone + Default + FrameValue<Output = T> + Serialize> FrameSequence<T> {
    /// Get the length of the outer `frames` vector.
    ///
    /// NOTE(review): `frames` holds one queue per chunk, so this value is the number of
    /// chunks rather than the number of frames queued up to be written — confirm what callers
    /// expect before relying on it as a frame count.
    #[must_use]
    pub fn get_frames_len(&self) -> usize {
        self.frames.len()
    }
733
    /// Get the number of chunks in a frame (the length of the per-chunk timestamp tracker).
    #[must_use]
    pub fn get_frame_chunks_num(&self) -> usize {
        self.pixel_ts_tracker.len()
    }
739
740    /// Get the reference for the pixel at the given coordinates
741    /// # Arguments
742    /// * `y` - The y coordinate of the pixel
743    /// * `x` - The x coordinate of the pixel
744    /// * `c` - The channel of the pixel
745    /// # Returns
746    /// * `Option<&T>` - The reference to the pixel value
747    /// # Errors
748    /// * If the frame has not been initialized
749    pub fn px_at_current(
750        &self,
751        y: usize,
752        x: usize,
753        c: usize,
754    ) -> Result<&Option<T>, FrameSequenceError> {
755        if self.frames.is_empty() {
756            return Err(FrameSequenceError::UninitializedFrame);
757        }
758        let chunk_num = y / self.chunk_rows;
759        let local_row = y - (chunk_num * self.chunk_rows);
760        Ok(&self.frames[chunk_num][0].array[[local_row, x, c]])
761    }
762
763    /// Get the reference for the pixel at the given coordinates and frame index
764    /// # Arguments
765    /// * `y` - The y coordinate of the pixel
766    /// * `x` - The x coordinate of the pixel
767    /// * `c` - The channel of the pixel
768    /// * `frame_idx` - The index of the frame to get the pixel from
769    /// # Returns
770    /// * `Option<&T>` - The reference to the pixel value
771    /// # Errors
772    /// * If the frame at the given index has not been initialized
773    pub fn px_at_frame(
774        &self,
775        y: usize,
776        x: usize,
777        c: usize,
778        frame_idx: usize,
779    ) -> Result<&Option<T>, FrameSequenceError> {
780        let chunk_num = y / self.chunk_rows;
781        let local_row = y - (chunk_num * self.chunk_rows);
782        match self.frames.len() {
783            a if frame_idx < a => Ok(&self.frames[chunk_num][frame_idx].array[[local_row, x, c]]),
784            _ => Err(FrameSequenceError::InvalidIndex),
785        }
786    }
787
    /// Unimplemented placeholder for fetching a whole frame by index.
    ///
    /// # Panics
    /// Always panics via `todo!()`.
    #[allow(clippy::unused_self)]
    fn _get_frame(&self, _frame_idx: usize) -> Result<&Array3<Option<T>>, FrameSequenceError> {
        todo!()
        // match self.frames.len() <= frame_idx {
        //     true => Err(FrameSequenceError::InvalidIndex),
        //     false => Ok(&self.frames[frame_idx].array),
        // }
    }
796
797    /// Get whether or not the frame at the given index is "filled" (i.e., all pixels have been
798    /// written to)
799    /// # Arguments
800    /// * `frame_idx` - The index of the frame to check
801    /// # Returns
802    /// * `bool` - Whether or not the frame is filled
803    /// # Errors
804    /// * If the frame at the given index has not been initialized
805    /// * If the frame index is out of bounds
806    /// * If the frame is not aligned with the chunk division
807    pub fn is_frame_filled(&self, frame_idx: usize) -> Result<bool, FrameSequenceError> {
808        for chunk in &self.frames {
809            if chunk.len() <= frame_idx {
810                return Err(FrameSequenceError::InvalidIndex);
811            }
812
813            match chunk[frame_idx].filled_count {
814                a if a == chunk[0].array.len() => {}
815                a if a > chunk[0].array.len() => {
816                    return Err(FrameSequenceError::BadFillCount);
817                }
818                _ => {
819                    return Ok(false);
820                }
821            }
822        }
823        Ok(true)
824    }
825
826    /// Get whether or not the next frame is "filled" (i.e., all pixels have been written to)
827    #[must_use]
828    pub fn is_frame_0_filled(&self) -> bool {
829        if let Some(buffer_limit) = self.buffer_limit {
830            for chunk in self.frames.iter() {
831                if chunk.len() > buffer_limit as usize {
832                    return true;
833                }
834            }
835        }
836
837        for chunk in self.chunk_filled_tracker.iter() {
838            if !chunk {
839                return false;
840            }
841        }
842        true
843    }
844
    /// Get the instantaneous intensity for each pixel.
    ///
    /// # Returns
    /// * A shared reference to the `running_intensities` array held by this object; no data
    ///   is copied.
    pub fn get_running_intensities(&self) -> &Array3<u8> {
        &self.running_intensities
    }
849
850    /// Get the features detected for the next frame, and pop that off the feature vec
851    pub fn pop_features(&mut self) -> Option<FeatureInterval> {
852        if self.features.is_empty() {
853            // Create the first
854            self.features.push_back(FeatureInterval {
855                end_ts: self.state.tpf as BigT,
856                features: vec![],
857            });
858            // Create the first
859            self.features.push_back(FeatureInterval {
860                end_ts: self.state.tpf as BigT * 2,
861                features: vec![],
862            });
863        } else {
864            self.features.push_back(FeatureInterval {
865                end_ts: self.state.tpf as BigT + self.features.back().unwrap().end_ts,
866                features: vec![],
867            });
868        }
869
870        // dbg!("Popping features");
871        // dbg!(self.features.front().unwrap().end_ts);
872        self.features.pop_front()
873    }
874
875    /// Pop the next frame for all chunks
876    ///
877    /// returns: the frame
878    pub fn pop_next_frame(&mut self) -> Option<Vec<Array3<Option<T>>>> {
879        let mut ret: Vec<Array3<Option<T>>> = Vec::with_capacity(self.frames.len());
880
881        for chunk_num in 0..self.frames.len() {
882            match self.pop_next_frame_for_chunk(chunk_num) {
883                Some(frame) => {
884                    ret.push(frame);
885                }
886                None => {
887                    println!("Couldn't pop chunk {chunk_num}!");
888                }
889            }
890        }
891        self.state.frames_written += 1;
892        // dbg!(self.state.frames_written);
893        Some(ret)
894    }
895
896    /// Pop the next frame from the given chunk
897    ///
898    /// # Arguments
899    ///
900    /// * `chunk_num`: the y-index of the chunk to pop the frame from
901    ///
902    /// returns: the chunk of frame values
903    pub fn pop_next_frame_for_chunk(&mut self, chunk_num: usize) -> Option<Array3<Option<T>>> {
904        self.frames[chunk_num].rotate_left(1);
905        match self.frames[chunk_num].pop_back() {
906            Some(a) => {
907                // If this is the only frame left, then add a new one to prevent invalid accesses later
908                if self.frames[chunk_num].is_empty() {
909                    let array: Array3<Option<T>> = Array3::<Option<T>>::default(a.array.raw_dim());
910                    self.frames[chunk_num].append(&mut VecDeque::from(vec![
911                        Frame {
912                            array,
913                            filled_count: 0
914                        };
915                        1
916                    ]));
917                    self.frame_idx_offsets[chunk_num] += 1;
918                }
919                self.chunk_filled_tracker[chunk_num] =
920                    self.frames[chunk_num][0].filled_count == self.frames[chunk_num][0].array.len();
921                Some(a.array)
922            }
923            None => None,
924        }
925    }
926
927    /// Write out the next frame to the given writer
928    /// # Arguments
929    /// * `writer` - The writer to write the frame to
930    /// # Returns
931    /// * `Result<(), FrameSequenceError>` - Whether or not the write was successful
932    /// # Errors
933    /// * If the frame chunk has not been initialized
934    /// * If the data cannot be written
935    pub fn write_frame_bytes(
936        &mut self,
937        writer: &mut BufWriter<File>,
938    ) -> Result<(), Box<dyn Error>> {
939        let none_val = T::default();
940        for chunk_num in 0..self.frames.len() {
941            match self.pop_next_frame_for_chunk(chunk_num) {
942                Some(arr) => {
943                    for px in arr.iter() {
944                        self.bincode.serialize_into(
945                            &mut *writer,
946                            match px {
947                                Some(event) => event,
948                                None => &none_val,
949                            },
950                        )?;
951                    }
952                }
953                None => {
954                    return Err(FrameSequenceError::UninitializedFrameChunk.into());
955                }
956            }
957        }
958        self.state.frames_written += 1;
959        dbg!(self.state.frames_written);
960        Ok(())
961    }
962
963    /// Write out next frames to the given writer so long as the frame is filled
964    /// # Arguments
965    /// * `writer` - The writer to write the frames to
966    /// # Returns
967    /// * `Result<(), FrameSequenceError>` - Whether or not the write was successful
968    /// # Errors
969    /// * If a frame could not be written
970    pub fn write_multi_frame_bytes(
971        &mut self,
972        writer: &mut BufWriter<File>,
973    ) -> Result<i32, Box<dyn Error>> {
974        let mut frame_count = 0;
975        while self.is_frame_filled(0)? {
976            self.write_frame_bytes(writer)?;
977            frame_count += 1;
978        }
979        Ok(frame_count)
980    }
981}
982
// TODO: refactor this garbage
/// Ingest a single event into one chunk's queue of frames.
///
/// The event advances the running timestamp passed in `running_ts_ref`. If the new timestamp
/// reaches past the frame index stored in `last_filled_frame_ref`, the stored intensity is
/// refreshed (unless the event is empty), the frame queue is grown if needed, and the
/// event's pixel is written into each spanned frame in which it is still unset.
///
/// # Arguments
/// * `event` - The event to ingest. Its `t` may be rewritten in place (see the absolute-time
///   handling below)
/// * `frame_chunk` - The queue of frames for this chunk; indexed with `[0]` unconditionally,
///   so it must not be empty
/// * `running_ts_ref` - Running timestamp tracker, updated in place
///   (presumably tracked per pixel; confirm against the caller)
/// * `frame_idx_offset` - Increased by the number of frames appended to `frame_chunk`
/// * `last_filled_frame_ref` - Index of the last frame reached so far, updated in place
/// * `last_frame_intensity_ref` - Last computed frame value, reused for empty events
/// * `state` - Shared framer state (codec version, time mode, `tpf`, `ref_interval`, ...)
/// * `buffer_limit` - Optional limit on how far ahead of `state.frames_written` the filled
///   frame index may run before frame 0 is forcibly marked as filled
///
/// # Returns
/// A tuple `(filled, grew)`: `filled` is whether the frame at index 0 of `frame_chunk` has
/// all of its elements written (or was forcibly marked so), and `grew` is whether frames
/// were appended to `frame_chunk` during this call.
///
/// NOTE(review): the pixel is addressed with `event.coord.y` directly, so the caller
/// presumably passes a row that is already relative to this chunk; confirm.
fn ingest_event_for_chunk<
    T: Clone + Default + FrameValue<Output = T> + Copy + Serialize + Send + Sync + Into<f64>,
>(
    event: &mut Event,
    frame_chunk: &mut VecDeque<Frame<Option<T>>>,
    running_ts_ref: &mut BigT,
    frame_idx_offset: &mut i64,
    last_filled_frame_ref: &mut i64,
    last_frame_intensity_ref: &mut T,
    state: &FrameSequenceState,
    buffer_limit: Option<u32>,
) -> (bool, bool) {
    // Events without a channel are treated as channel 0
    let channel = event.coord.c.unwrap_or(0);
    let mut grew = false;

    // Snapshot the trackers before they are advanced; both are needed further down
    let prev_last_filled_frame = *last_filled_frame_ref;
    let prev_running_ts = *running_ts_ref;

    if state.codec_version >= 2 && state.time_mode == TimeMode::AbsoluteT {
        // Absolute timestamps: an event that is not strictly later than the running timestamp
        // is ignored, and only the current fill status of frame 0 is reported.
        if prev_running_ts >= event.t as BigT {
            return (
                frame_chunk[0].filled_count == frame_chunk[0].array.len(),
                false,
            );
        }
        *running_ts_ref = event.t as BigT;
    } else {
        // Otherwise `t` is accumulated onto the running timestamp
        *running_ts_ref += u64::from(event.t);
    }

    // Only do frame work if the new timestamp reaches past the last filled frame index
    if ((running_ts_ref.saturating_sub(1)) as i64 / i64::from(state.tpf)) > *last_filled_frame_ref {
        // Set the frame's value from the event

        if event.d != D_EMPTY {
            // If d == 0xFF, then the event was empty, and we simply repeat the last non-empty
            // event's intensity. Else we reset the intensity here.
            let practical_d_max =
                fast_math::log2_raw(T::max_f32() * (state.source_dtm / state.ref_interval) as f32);
            if state.codec_version >= 2
                && state.time_mode == TimeMode::AbsoluteT
                && state.view_mode != FramedViewMode::SAE
            {
                // Rewrite the absolute timestamp as the time elapsed since the previous
                // running timestamp before computing the frame value.
                // event.delta_t -= ((*last_filled_frame_ref + 1) * state.ref_interval as i64) as u32;
                event.t = event.t.saturating_sub(prev_running_ts as u32);
            }

            // TODO: Handle SAE view mode
            *last_frame_intensity_ref = T::get_frame_value(
                event,
                state.source,
                state.ref_interval as f64,
                practical_d_max,
                state.source_dtm,
                state.view_mode,
                Some(SaeTime {
                    running_t: *running_ts_ref as DeltaT,
                    last_fired_t: prev_running_ts as DeltaT,
                }), // TODO
            );
        }

        *last_filled_frame_ref = (running_ts_ref.saturating_sub(1)) as i64 / i64::from(state.tpf);

        // Grow the frames vec if necessary
        match *last_filled_frame_ref - *frame_idx_offset {
            a if a > 0 => {
                // Append `a` empty frames with the same dimensions as the existing ones
                let array: Array3<Option<T>> =
                    Array3::<Option<T>>::default(frame_chunk[0].array.raw_dim());
                frame_chunk.append(&mut VecDeque::from(vec![
                    Frame {
                        array,
                        filled_count: 0
                    };
                    a as usize
                ]));
                *frame_idx_offset += a;
                grew = true;
            }
            a if a < 0 => {
                // We can get here if we've forcibly popped a frame before it's ready.
                // Increment pixel ts trackers as normal, but don't actually do anything
                // with the intensities if they correspond to frames that we've already
                // popped.
                //
                // ALSO can arrive here if the source events are not perfectly
                // temporally interleaved. This may be the case for transcoder
                // performance reasons. The only invariant we hold is that a sequence
                // of events for a given (individual) pixel is in the correct order.
                // There is no invariant for the relative order or interleaving
                // of different pixel event sequences.
            }
            _ => {}
        }

        // Write the intensity into every frame spanned since the previous filled frame.
        // `i - state.frames_written + 1` is the frame's position in `frame_chunk`; a negative
        // position is skipped.
        let mut px: &mut Option<T>;
        for i in prev_last_filled_frame..*last_filled_frame_ref {
            if i - state.frames_written + 1 >= 0 {
                px = &mut frame_chunk[(i - state.frames_written + 1) as usize].array
                    [[event.coord.y.into(), event.coord.x.into(), channel.into()]];
                match px {
                    // Already written: keep the existing value and do not recount it
                    Some(_val) => {}
                    None => {
                        *px = Some(*last_frame_intensity_ref);
                        frame_chunk[(i - state.frames_written + 1) as usize].filled_count += 1;
                    }
                }
            }
        }
    }

    // If framed video source, we can take advantage of scheme that reduces event rate by half
    if state.codec_version >= 1
        // && state.time_mode == TimeMode::DeltaT
        && match state.source_camera {
            SourceCamera::FramedU8
            | SourceCamera::FramedU16
            | SourceCamera::FramedU32
            | SourceCamera::FramedU64
            | SourceCamera::FramedF32
            | SourceCamera::FramedF64 => true,
            SourceCamera::Dvs
            | SourceCamera::DavisU8
            | SourceCamera::Atis
            | SourceCamera::Asint => false,
            // TODO: switch statement on the transcode MODE (frame-perfect or continuous), not just the source
        }
        && *running_ts_ref % u64::from(state.ref_interval) > 0
    {
        // Round the running timestamp up to the next multiple of `ref_interval`
        *running_ts_ref =
            ((*running_ts_ref / u64::from(state.ref_interval)) + 1) * u64::from(state.ref_interval);
    }

    if let Some(buffer_limit) = buffer_limit {
        // Once the filled frame index runs more than `buffer_limit` frames ahead of the frames
        // written so far, force frame 0 to count as filled.
        if *last_filled_frame_ref > state.frames_written + buffer_limit as i64 {
            frame_chunk[0].filled_count = frame_chunk[0].array.len();
        }
    }

    debug_assert!(*last_filled_frame_ref >= 0);
    // Clamp the fill count so it never exceeds the number of elements in the frame
    if frame_chunk[0].filled_count > frame_chunk[0].array.len() {
        frame_chunk[0].filled_count = frame_chunk[0].array.len();
    }

    (
        frame_chunk[0].filled_count == frame_chunk[0].array.len(),
        grew,
    )
}