adder_codec_core/codec/
encoder.rs

1use crate::codec::{
2    CodecError, CodecMetadata, EncoderOptions, EventDrop, EventOrder, WriteCompression,
3    WriteCompressionEnum,
4};
5use crate::SourceType::*;
6use crate::{Event, EventSingle, SourceCamera, SourceType, EOF_EVENT};
7use std::collections::BinaryHeap;
8
9use std::io;
10use std::io::{Sink, Write};
11use std::time::Instant;
12
13// #[cfg(feature = "compression")]
14// use crate::codec::compressed::adu::frame::Adu;
15#[cfg(feature = "compression")]
16use crate::codec::compressed::stream::CompressedOutput;
17
18use crate::codec::empty::stream::EmptyOutput;
19use crate::codec::header::{
20    EventStreamHeader, EventStreamHeaderExtensionV0, EventStreamHeaderExtensionV1,
21    EventStreamHeaderExtensionV2, EventStreamHeaderExtensionV3,
22};
23
24use crate::codec::raw::stream::RawOutput;
25use bincode::config::{FixintEncoding, WithOtherEndian, WithOtherIntEncoding};
26use bincode::{DefaultOptions, Options};
27
28/// Struct for encoding [`Event`]s to a stream
29pub struct Encoder<W: Write + std::marker::Send + std::marker::Sync + 'static> {
30    output: WriteCompressionEnum<W>,
31    bincode: WithOtherEndian<
32        WithOtherIntEncoding<DefaultOptions, FixintEncoding>,
33        bincode::config::BigEndian,
34    >,
35    pub options: EncoderOptions,
36    state: EncoderState,
37}
38
/// Mutable bookkeeping that the [`Encoder`] carries from one ingested event to the next.
struct EncoderState {
    /// Smoothed estimate of the accepted event rate (events per second), maintained by the
    /// `EventDrop::Manual` policy.
    current_event_rate: f64,
    /// Wall-clock instant at which the `EventDrop::Manual` policy last accepted an event.
    last_event_ts: Instant,
    /// Events held back while the `EventOrder::Interleaved` policy reorders them.
    queue: BinaryHeap<Event>,
}
44
45impl Default for EncoderState {
46    fn default() -> Self {
47        EncoderState {
48            current_event_rate: 0.0,
49            last_event_ts: Instant::now(),
50            queue: BinaryHeap::new(),
51        }
52    }
53}
54
55#[allow(dead_code)]
56impl<W: Write + 'static + std::marker::Send + std::marker::Sync> Encoder<W> {
57    /// Create a new [`Encoder`] with an empty compression scheme
58    pub fn new_empty(compression: EmptyOutput<Sink>, options: EncoderOptions) -> Self
59    where
60        Self: Sized,
61    {
62        let mut encoder = Self {
63            output: WriteCompressionEnum::EmptyOutput(compression),
64            bincode: DefaultOptions::new()
65                .with_fixint_encoding()
66                .with_big_endian(),
67            options,
68            state: EncoderState::default(),
69        };
70        encoder.encode_header().unwrap();
71        encoder
72    }
73
74    /// Create a new [`Encoder`] with the given compression scheme
75    #[cfg(feature = "compression")]
76    pub fn new_compressed(mut compression: CompressedOutput<W>, options: EncoderOptions) -> Self
77    where
78        Self: Sized,
79    {
80        compression.with_options(options);
81        let mut encoder = Self {
82            output: WriteCompressionEnum::CompressedOutput(compression),
83            bincode: DefaultOptions::new()
84                .with_fixint_encoding()
85                .with_big_endian(),
86            options,
87            state: Default::default(),
88        };
89        encoder.encode_header().unwrap();
90        encoder
91    }
92
93    /// Create a new [`Encoder`] with the given raw compression scheme
94    pub fn new_raw(compression: RawOutput<W>, options: EncoderOptions) -> Self
95    where
96        Self: Sized,
97    {
98        let mut encoder = Self {
99            output: WriteCompressionEnum::RawOutput(compression),
100            bincode: DefaultOptions::new()
101                .with_fixint_encoding()
102                .with_big_endian(),
103            options,
104            state: Default::default(),
105        };
106        encoder.encode_header().unwrap();
107        encoder
108    }
109
    /// Returns a reference to the metadata of the underlying compression scheme.
    ///
    /// This forwards directly to the `meta()` accessor of the wrapped output.
    #[inline]
    pub fn meta(&self) -> &CodecMetadata {
        self.output.meta()
    }
115
116    #[allow(clippy::match_same_arms)]
117    fn get_source_type(&self) -> SourceType {
118        match self.output.meta().source_camera {
119            SourceCamera::FramedU8 => U8,
120            SourceCamera::FramedU16 => U16,
121            SourceCamera::FramedU32 => U32,
122            SourceCamera::FramedU64 => U64,
123            SourceCamera::FramedF32 => F32,
124            SourceCamera::FramedF64 => F64,
125            SourceCamera::Dvs => U8,
126            SourceCamera::DavisU8 => U8,
127            SourceCamera::Atis => U8,
128            SourceCamera::Asint => F64,
129        }
130    }
131
132    /// Signify the end of the file in a unified way
133    fn write_eof(&mut self) -> Result<(), CodecError> {
134        self.output.byte_align()?;
135        let output_event: EventSingle;
136        let mut buffer = Vec::new();
137        if self.output.meta().plane.channels == 1 {
138            output_event = (&EOF_EVENT).into();
139            self.bincode.serialize_into(&mut buffer, &output_event)?;
140        } else {
141            self.bincode.serialize_into(&mut buffer, &EOF_EVENT)?;
142        }
143        Ok(self.output.write_bytes(&buffer)?)
144    }
145
    /// Flush the `BitWriter`. Does not flush the internal `BufWriter`.
    ///
    /// This forwards directly to the wrapped output's `flush_writer()` and returns its
    /// result unchanged.
    pub fn flush_writer(&mut self) -> io::Result<()> {
        self.output.flush_writer()
    }
150
151    /// Close the encoder's writer and return it, consuming the encoder in the process.
152    pub fn close_writer(mut self) -> Result<Option<W>, CodecError> {
153        // self.output.byte_align()?;
154        // self.write_eof()?;
155        // self.flush_writer()?;
156        Ok(self.output.into_writer())
157        // let compressed_output = self.compressed_output.take();
158        // let raw_output = self.raw_output.take();
159        //
160        // if compressed_output.is_some() {
161        //     return Ok(compressed_output.unwrap().into_writer());
162        // } else if raw_output.is_some() {
163        //     return Ok(raw_output.unwrap().into_writer());
164        // } else {
165        //     unreachable!()
166        // }
167    }
168
169    /// Encode the header and its extensions.
170    fn encode_header(&mut self) -> Result<(), CodecError> {
171        let mut buffer: Vec<u8> = Vec::new();
172        let meta = self.output.meta();
173        let header = EventStreamHeader::new(
174            self.output.magic(),
175            meta.plane,
176            meta.tps,
177            meta.ref_interval,
178            meta.delta_t_max,
179            meta.codec_version,
180        );
181        self.bincode.serialize_into(&mut buffer, &header)?;
182
183        // Encode the header extensions (for newer versions of the codec)
184        buffer = self.encode_header_extension(buffer)?;
185
186        self.output.write_bytes(&buffer)?;
187        self.output.meta_mut().header_size = buffer.len();
188        Ok(())
189    }
190
191    fn encode_header_extension(&self, mut buffer: Vec<u8>) -> Result<Vec<u8>, CodecError> {
192        let meta = self.output.meta();
193        self.bincode
194            .serialize_into(&mut buffer, &EventStreamHeaderExtensionV0 {})?;
195        if meta.codec_version == 0 {
196            return Ok(buffer);
197        }
198
199        self.bincode.serialize_into(
200            &mut buffer,
201            &EventStreamHeaderExtensionV1 {
202                source: meta.source_camera,
203            },
204        )?;
205        if meta.codec_version == 1 {
206            return Ok(buffer);
207        }
208
209        self.bincode.serialize_into(
210            &mut buffer,
211            &EventStreamHeaderExtensionV2 {
212                time_mode: meta.time_mode,
213            },
214        )?;
215        if meta.codec_version == 2 {
216            return Ok(buffer);
217        }
218
219        self.bincode.serialize_into(
220            &mut buffer,
221            &EventStreamHeaderExtensionV3 {
222                adu_interval: meta.adu_interval as u32,
223            },
224        )?;
225        if meta.codec_version == 3 {
226            return Ok(buffer);
227        }
228        Err(CodecError::BadFile)
229    }
230
231    /// Ingest an event
232    #[inline(always)]
233    pub fn ingest_event(&mut self, event: Event) -> Result<(), CodecError> {
234        match self.options.event_drop {
235            EventDrop::None => {}
236            EventDrop::Manual {
237                target_event_rate,
238                alpha,
239            } => {
240                let now = Instant::now();
241                let t_diff = now.duration_since(self.state.last_event_ts).as_secs_f64();
242                let new_event_rate = alpha * self.state.current_event_rate + (1.0 - alpha) / t_diff;
243                if new_event_rate > target_event_rate {
244                    self.state.current_event_rate *= alpha;
245                    return Ok(()); // skip this event
246                }
247                self.state.last_event_ts = now; // update time
248                self.state.current_event_rate = new_event_rate;
249            }
250            EventDrop::Auto => {
251                todo!()
252            }
253        }
254
255        match self.options.event_order {
256            EventOrder::Unchanged => self.output.ingest_event(event),
257            EventOrder::Interleaved => {
258                let dt = event.t;
259                // First, push the event to the queue
260                self.state.queue.push(event);
261
262                let mut res = Ok(());
263                if let Some(first_item_addr) = self.state.queue.peek() {
264                    if first_item_addr.t < dt.saturating_sub(self.meta().delta_t_max) {
265                        if let Some(first_item) = self.state.queue.pop() {
266                            res = self.output.ingest_event(first_item);
267                        }
268                    }
269                }
270                res
271            }
272        }
273    }
274    // /// Ingest an event
275    // #[cfg(feature = "compression")]
276    // pub fn ingest_event_debug(&mut self, event: Event) -> Result<Option<Adu>, CodecError> {
277    //     self.output.ingest_event_debug(event)
278    // }
279
280    /// Ingest an array of events
281    ///
282    /// TODO: Make this move events, not by reference
283    pub fn ingest_events(&mut self, events: &[Event]) -> Result<(), CodecError> {
284        for event in events {
285            self.ingest_event(*event)?;
286        }
287        Ok(())
288    }
289
290    /// Ingest a vector of an array of events
291    pub fn ingest_events_events(&mut self, events: &[Vec<Event>]) -> Result<(), CodecError> {
292        for v in events {
293            self.ingest_events(v)?;
294        }
295        Ok(())
296    }
297
    /// Returns a copy of the encoder's current options.
    pub fn get_options(&self) -> EncoderOptions {
        self.options
    }
301
302    /// Keeps the compressed output options in sync with the encoder options. This prevents us
303    /// from constantly having to look up a reference-counted variable, which is costly at this scale.
304    pub fn sync_crf(&mut self) {
305        match &mut self.output {
306            #[cfg(feature = "compression")]
307            WriteCompressionEnum::CompressedOutput(compressed_output) => {
308                compressed_output.options = self.options;
309            }
310            WriteCompressionEnum::RawOutput(_) => {}
311            WriteCompressionEnum::EmptyOutput(_) => {}
312        }
313    }
314}
315
316#[cfg(test)]
317mod tests {
318    use super::*;
319    use crate::codec::raw::stream::RawOutput;
320    use crate::codec::{CodecMetadata, LATEST_CODEC_VERSION};
321    use crate::{Coord, PlaneSize};
322    use bitstream_io::{BigEndian, BitWriter};
323    use std::io::BufWriter;
324    use std::sync::{Arc, RwLock};
325
326    #[test]
327    fn raw() {
328        let output = Vec::new();
329        let bufwriter = BufWriter::new(output);
330        let compression = RawOutput {
331            meta: CodecMetadata {
332                codec_version: 0,
333                header_size: 0,
334                time_mode: Default::default(),
335                plane: Default::default(),
336                tps: 0,
337                ref_interval: 0,
338                delta_t_max: 0,
339                event_size: 0,
340                source_camera: Default::default(),
341                adu_interval: 1,
342            },
343            bincode: DefaultOptions::new()
344                .with_fixint_encoding()
345                .with_big_endian(),
346            stream: Some(bufwriter),
347        };
348        let encoder = Encoder {
349            output: WriteCompressionEnum::RawOutput(compression),
350            bincode: DefaultOptions::new()
351                .with_fixint_encoding()
352                .with_big_endian(),
353            options: EncoderOptions::default(PlaneSize {
354                width: 100,
355                height: 100,
356                channels: 1,
357            }),
358            state: EncoderState::default(),
359        };
360        let mut writer = encoder.close_writer().unwrap().unwrap();
361        writer.flush().unwrap();
362        let _output = writer.into_inner().unwrap();
363    }
364
365    #[test]
366    fn raw2() {
367        let output = Vec::new();
368        let bufwriter = BufWriter::new(output);
369        let compression = RawOutput::new(
370            CodecMetadata {
371                codec_version: 1,
372                header_size: 0,
373                time_mode: Default::default(),
374                plane: Default::default(),
375                tps: 0,
376                ref_interval: 0,
377                delta_t_max: 0,
378                event_size: 0,
379                source_camera: Default::default(),
380                adu_interval: 1,
381            },
382            bufwriter,
383        );
384        let encoder = Encoder {
385            output: WriteCompressionEnum::RawOutput(compression),
386            bincode: DefaultOptions::new()
387                .with_fixint_encoding()
388                .with_big_endian(),
389            options: EncoderOptions::default(PlaneSize {
390                width: 100,
391                height: 100,
392                channels: 1,
393            }),
394            state: EncoderState::default(),
395        };
396        let mut writer = encoder.close_writer().unwrap().unwrap();
397        writer.flush().unwrap();
398        let _output = writer.into_inner().unwrap();
399    }
400
401    #[test]
402    fn raw3() {
403        let output = Vec::new();
404        let bufwriter = BufWriter::new(output);
405        let compression = RawOutput::new(
406            CodecMetadata {
407                codec_version: LATEST_CODEC_VERSION,
408                header_size: 0,
409                time_mode: Default::default(),
410                plane: PlaneSize {
411                    width: 1,
412                    height: 1,
413                    channels: 3,
414                },
415                tps: 0,
416                ref_interval: 255,
417                delta_t_max: 255,
418                event_size: 0,
419                source_camera: Default::default(),
420                adu_interval: 1,
421            },
422            bufwriter,
423        );
424        let mut encoder: Encoder<BufWriter<Vec<u8>>> = Encoder::new_raw(
425            compression,
426            EncoderOptions::default(PlaneSize {
427                width: 1,
428                height: 1,
429                channels: 3,
430            }),
431        );
432
433        let event = Event {
434            coord: Coord {
435                x: 0,
436                y: 0,
437                c: Some(0),
438            },
439            d: 0,
440            t: 0,
441        };
442
443        encoder.ingest_event(event).unwrap();
444        let mut writer = encoder.close_writer().unwrap().unwrap();
445        writer.flush().unwrap();
446        let output = writer.into_inner().unwrap();
447        assert_eq!(output.len(), 37 + 22); // 37 bytes for the header, 22 bytes for the 2 events
448    }
449
450    #[test]
451    #[cfg(feature = "compression")]
452    fn compressed() {
453        let output = Vec::new();
454        let bufwriter = BufWriter::new(output);
455        let (written_bytes_tx, written_bytes_rx) = std::sync::mpsc::channel();
456
457        let compression = CompressedOutput {
458            meta: CodecMetadata {
459                codec_version: 0,
460                header_size: 0,
461                time_mode: Default::default(),
462                plane: Default::default(),
463                tps: 0,
464                ref_interval: 0,
465                delta_t_max: 0,
466                event_size: 0,
467                source_camera: Default::default(),
468                adu_interval: 1,
469            },
470            // frame: Default::default(),
471            // adu: Adu::new(),
472            // contexts: None,
473            adu: Default::default(),
474            stream: Some(Arc::new(RwLock::new(BitWriter::endian(
475                bufwriter, BigEndian,
476            )))),
477            options: EncoderOptions::default(PlaneSize::default()),
478            written_bytes_tx: Some(written_bytes_tx),
479            last_message_sent: 0,
480            last_message_written: Arc::new(RwLock::new(0)),
481            _phantom: Default::default(),
482        };
483        let _encoder = Encoder {
484            output: WriteCompressionEnum::CompressedOutput(compression),
485            bincode: DefaultOptions::new()
486                .with_fixint_encoding()
487                .with_big_endian(),
488            options: EncoderOptions::default(PlaneSize::default()),
489            state: Default::default(),
490        };
491    }
492
493    #[test]
494    #[cfg(feature = "compression")]
495    fn compressed2() {
496        let output = Vec::new();
497        let bufwriter = BufWriter::new(output);
498        let compression = CompressedOutput::new(
499            CodecMetadata {
500                codec_version: 0,
501                header_size: 0,
502                time_mode: Default::default(),
503                plane: Default::default(),
504                tps: 0,
505                ref_interval: 255,
506                delta_t_max: 255,
507                event_size: 0,
508                source_camera: Default::default(),
509                adu_interval: Default::default(),
510            },
511            bufwriter,
512        );
513        let _encoder = Encoder {
514            output: WriteCompressionEnum::CompressedOutput(compression),
515            bincode: DefaultOptions::new()
516                .with_fixint_encoding()
517                .with_big_endian(),
518            options: EncoderOptions::default(PlaneSize::default()),
519            state: Default::default(),
520        };
521    }
522
523    #[test]
524    #[cfg(feature = "compression")]
525    fn compressed3() {
526        let output = Vec::new();
527        let bufwriter = BufWriter::new(output);
528        let compression = CompressedOutput::new(
529            CodecMetadata {
530                codec_version: LATEST_CODEC_VERSION,
531                header_size: 0,
532                time_mode: Default::default(),
533                plane: Default::default(),
534                tps: 0,
535                ref_interval: 255,
536                delta_t_max: 255,
537                event_size: 0,
538                source_camera: Default::default(),
539                adu_interval: Default::default(),
540            },
541            bufwriter,
542        );
543        let _encoder =
544            Encoder::new_compressed(compression, EncoderOptions::default(PlaneSize::default()));
545    }
546}