1use crate::codec::{
2 CodecError, CodecMetadata, EncoderOptions, EventDrop, EventOrder, WriteCompression,
3 WriteCompressionEnum,
4};
5use crate::SourceType::*;
6use crate::{Event, EventSingle, SourceCamera, SourceType, EOF_EVENT};
7use std::collections::BinaryHeap;
8
9use std::io;
10use std::io::{Sink, Write};
11use std::time::Instant;
12
13#[cfg(feature = "compression")]
16use crate::codec::compressed::stream::CompressedOutput;
17
18use crate::codec::empty::stream::EmptyOutput;
19use crate::codec::header::{
20 EventStreamHeader, EventStreamHeaderExtensionV0, EventStreamHeaderExtensionV1,
21 EventStreamHeaderExtensionV2, EventStreamHeaderExtensionV3,
22};
23
24use crate::codec::raw::stream::RawOutput;
25use bincode::config::{FixintEncoding, WithOtherEndian, WithOtherIntEncoding};
26use bincode::{DefaultOptions, Options};
27
28pub struct Encoder<W: Write + std::marker::Send + std::marker::Sync + 'static> {
30 output: WriteCompressionEnum<W>,
31 bincode: WithOtherEndian<
32 WithOtherIntEncoding<DefaultOptions, FixintEncoding>,
33 bincode::config::BigEndian,
34 >,
35 pub options: EncoderOptions,
36 state: EncoderState,
37}
38
39struct EncoderState {
40 current_event_rate: f64,
41 last_event_ts: Instant,
42 queue: BinaryHeap<Event>,
43}
44
45impl Default for EncoderState {
46 fn default() -> Self {
47 EncoderState {
48 current_event_rate: 0.0,
49 last_event_ts: Instant::now(),
50 queue: BinaryHeap::new(),
51 }
52 }
53}
54
55#[allow(dead_code)]
56impl<W: Write + 'static + std::marker::Send + std::marker::Sync> Encoder<W> {
57 pub fn new_empty(compression: EmptyOutput<Sink>, options: EncoderOptions) -> Self
59 where
60 Self: Sized,
61 {
62 let mut encoder = Self {
63 output: WriteCompressionEnum::EmptyOutput(compression),
64 bincode: DefaultOptions::new()
65 .with_fixint_encoding()
66 .with_big_endian(),
67 options,
68 state: EncoderState::default(),
69 };
70 encoder.encode_header().unwrap();
71 encoder
72 }
73
74 #[cfg(feature = "compression")]
76 pub fn new_compressed(mut compression: CompressedOutput<W>, options: EncoderOptions) -> Self
77 where
78 Self: Sized,
79 {
80 compression.with_options(options);
81 let mut encoder = Self {
82 output: WriteCompressionEnum::CompressedOutput(compression),
83 bincode: DefaultOptions::new()
84 .with_fixint_encoding()
85 .with_big_endian(),
86 options,
87 state: Default::default(),
88 };
89 encoder.encode_header().unwrap();
90 encoder
91 }
92
93 pub fn new_raw(compression: RawOutput<W>, options: EncoderOptions) -> Self
95 where
96 Self: Sized,
97 {
98 let mut encoder = Self {
99 output: WriteCompressionEnum::RawOutput(compression),
100 bincode: DefaultOptions::new()
101 .with_fixint_encoding()
102 .with_big_endian(),
103 options,
104 state: Default::default(),
105 };
106 encoder.encode_header().unwrap();
107 encoder
108 }
109
110 #[inline]
112 pub fn meta(&self) -> &CodecMetadata {
113 self.output.meta()
114 }
115
116 #[allow(clippy::match_same_arms)]
117 fn get_source_type(&self) -> SourceType {
118 match self.output.meta().source_camera {
119 SourceCamera::FramedU8 => U8,
120 SourceCamera::FramedU16 => U16,
121 SourceCamera::FramedU32 => U32,
122 SourceCamera::FramedU64 => U64,
123 SourceCamera::FramedF32 => F32,
124 SourceCamera::FramedF64 => F64,
125 SourceCamera::Dvs => U8,
126 SourceCamera::DavisU8 => U8,
127 SourceCamera::Atis => U8,
128 SourceCamera::Asint => F64,
129 }
130 }
131
132 fn write_eof(&mut self) -> Result<(), CodecError> {
134 self.output.byte_align()?;
135 let output_event: EventSingle;
136 let mut buffer = Vec::new();
137 if self.output.meta().plane.channels == 1 {
138 output_event = (&EOF_EVENT).into();
139 self.bincode.serialize_into(&mut buffer, &output_event)?;
140 } else {
141 self.bincode.serialize_into(&mut buffer, &EOF_EVENT)?;
142 }
143 Ok(self.output.write_bytes(&buffer)?)
144 }
145
146 pub fn flush_writer(&mut self) -> io::Result<()> {
148 self.output.flush_writer()
149 }
150
151 pub fn close_writer(mut self) -> Result<Option<W>, CodecError> {
153 Ok(self.output.into_writer())
157 }
168
169 fn encode_header(&mut self) -> Result<(), CodecError> {
171 let mut buffer: Vec<u8> = Vec::new();
172 let meta = self.output.meta();
173 let header = EventStreamHeader::new(
174 self.output.magic(),
175 meta.plane,
176 meta.tps,
177 meta.ref_interval,
178 meta.delta_t_max,
179 meta.codec_version,
180 );
181 self.bincode.serialize_into(&mut buffer, &header)?;
182
183 buffer = self.encode_header_extension(buffer)?;
185
186 self.output.write_bytes(&buffer)?;
187 self.output.meta_mut().header_size = buffer.len();
188 Ok(())
189 }
190
191 fn encode_header_extension(&self, mut buffer: Vec<u8>) -> Result<Vec<u8>, CodecError> {
192 let meta = self.output.meta();
193 self.bincode
194 .serialize_into(&mut buffer, &EventStreamHeaderExtensionV0 {})?;
195 if meta.codec_version == 0 {
196 return Ok(buffer);
197 }
198
199 self.bincode.serialize_into(
200 &mut buffer,
201 &EventStreamHeaderExtensionV1 {
202 source: meta.source_camera,
203 },
204 )?;
205 if meta.codec_version == 1 {
206 return Ok(buffer);
207 }
208
209 self.bincode.serialize_into(
210 &mut buffer,
211 &EventStreamHeaderExtensionV2 {
212 time_mode: meta.time_mode,
213 },
214 )?;
215 if meta.codec_version == 2 {
216 return Ok(buffer);
217 }
218
219 self.bincode.serialize_into(
220 &mut buffer,
221 &EventStreamHeaderExtensionV3 {
222 adu_interval: meta.adu_interval as u32,
223 },
224 )?;
225 if meta.codec_version == 3 {
226 return Ok(buffer);
227 }
228 Err(CodecError::BadFile)
229 }
230
231 #[inline(always)]
233 pub fn ingest_event(&mut self, event: Event) -> Result<(), CodecError> {
234 match self.options.event_drop {
235 EventDrop::None => {}
236 EventDrop::Manual {
237 target_event_rate,
238 alpha,
239 } => {
240 let now = Instant::now();
241 let t_diff = now.duration_since(self.state.last_event_ts).as_secs_f64();
242 let new_event_rate = alpha * self.state.current_event_rate + (1.0 - alpha) / t_diff;
243 if new_event_rate > target_event_rate {
244 self.state.current_event_rate *= alpha;
245 return Ok(()); }
247 self.state.last_event_ts = now; self.state.current_event_rate = new_event_rate;
249 }
250 EventDrop::Auto => {
251 todo!()
252 }
253 }
254
255 match self.options.event_order {
256 EventOrder::Unchanged => self.output.ingest_event(event),
257 EventOrder::Interleaved => {
258 let dt = event.t;
259 self.state.queue.push(event);
261
262 let mut res = Ok(());
263 if let Some(first_item_addr) = self.state.queue.peek() {
264 if first_item_addr.t < dt.saturating_sub(self.meta().delta_t_max) {
265 if let Some(first_item) = self.state.queue.pop() {
266 res = self.output.ingest_event(first_item);
267 }
268 }
269 }
270 res
271 }
272 }
273 }
274 pub fn ingest_events(&mut self, events: &[Event]) -> Result<(), CodecError> {
284 for event in events {
285 self.ingest_event(*event)?;
286 }
287 Ok(())
288 }
289
290 pub fn ingest_events_events(&mut self, events: &[Vec<Event>]) -> Result<(), CodecError> {
292 for v in events {
293 self.ingest_events(v)?;
294 }
295 Ok(())
296 }
297
298 pub fn get_options(&self) -> EncoderOptions {
299 self.options
300 }
301
302 pub fn sync_crf(&mut self) {
305 match &mut self.output {
306 #[cfg(feature = "compression")]
307 WriteCompressionEnum::CompressedOutput(compressed_output) => {
308 compressed_output.options = self.options;
309 }
310 WriteCompressionEnum::RawOutput(_) => {}
311 WriteCompressionEnum::EmptyOutput(_) => {}
312 }
313 }
314}
315
316#[cfg(test)]
317mod tests {
318 use super::*;
319 use crate::codec::raw::stream::RawOutput;
320 use crate::codec::{CodecMetadata, LATEST_CODEC_VERSION};
321 use crate::{Coord, PlaneSize};
322 use bitstream_io::{BigEndian, BitWriter};
323 use std::io::BufWriter;
324 use std::sync::{Arc, RwLock};
325
326 #[test]
327 fn raw() {
328 let output = Vec::new();
329 let bufwriter = BufWriter::new(output);
330 let compression = RawOutput {
331 meta: CodecMetadata {
332 codec_version: 0,
333 header_size: 0,
334 time_mode: Default::default(),
335 plane: Default::default(),
336 tps: 0,
337 ref_interval: 0,
338 delta_t_max: 0,
339 event_size: 0,
340 source_camera: Default::default(),
341 adu_interval: 1,
342 },
343 bincode: DefaultOptions::new()
344 .with_fixint_encoding()
345 .with_big_endian(),
346 stream: Some(bufwriter),
347 };
348 let encoder = Encoder {
349 output: WriteCompressionEnum::RawOutput(compression),
350 bincode: DefaultOptions::new()
351 .with_fixint_encoding()
352 .with_big_endian(),
353 options: EncoderOptions::default(PlaneSize {
354 width: 100,
355 height: 100,
356 channels: 1,
357 }),
358 state: EncoderState::default(),
359 };
360 let mut writer = encoder.close_writer().unwrap().unwrap();
361 writer.flush().unwrap();
362 let _output = writer.into_inner().unwrap();
363 }
364
365 #[test]
366 fn raw2() {
367 let output = Vec::new();
368 let bufwriter = BufWriter::new(output);
369 let compression = RawOutput::new(
370 CodecMetadata {
371 codec_version: 1,
372 header_size: 0,
373 time_mode: Default::default(),
374 plane: Default::default(),
375 tps: 0,
376 ref_interval: 0,
377 delta_t_max: 0,
378 event_size: 0,
379 source_camera: Default::default(),
380 adu_interval: 1,
381 },
382 bufwriter,
383 );
384 let encoder = Encoder {
385 output: WriteCompressionEnum::RawOutput(compression),
386 bincode: DefaultOptions::new()
387 .with_fixint_encoding()
388 .with_big_endian(),
389 options: EncoderOptions::default(PlaneSize {
390 width: 100,
391 height: 100,
392 channels: 1,
393 }),
394 state: EncoderState::default(),
395 };
396 let mut writer = encoder.close_writer().unwrap().unwrap();
397 writer.flush().unwrap();
398 let _output = writer.into_inner().unwrap();
399 }
400
401 #[test]
402 fn raw3() {
403 let output = Vec::new();
404 let bufwriter = BufWriter::new(output);
405 let compression = RawOutput::new(
406 CodecMetadata {
407 codec_version: LATEST_CODEC_VERSION,
408 header_size: 0,
409 time_mode: Default::default(),
410 plane: PlaneSize {
411 width: 1,
412 height: 1,
413 channels: 3,
414 },
415 tps: 0,
416 ref_interval: 255,
417 delta_t_max: 255,
418 event_size: 0,
419 source_camera: Default::default(),
420 adu_interval: 1,
421 },
422 bufwriter,
423 );
424 let mut encoder: Encoder<BufWriter<Vec<u8>>> = Encoder::new_raw(
425 compression,
426 EncoderOptions::default(PlaneSize {
427 width: 1,
428 height: 1,
429 channels: 3,
430 }),
431 );
432
433 let event = Event {
434 coord: Coord {
435 x: 0,
436 y: 0,
437 c: Some(0),
438 },
439 d: 0,
440 t: 0,
441 };
442
443 encoder.ingest_event(event).unwrap();
444 let mut writer = encoder.close_writer().unwrap().unwrap();
445 writer.flush().unwrap();
446 let output = writer.into_inner().unwrap();
447 assert_eq!(output.len(), 37 + 22); }
449
450 #[test]
451 #[cfg(feature = "compression")]
452 fn compressed() {
453 let output = Vec::new();
454 let bufwriter = BufWriter::new(output);
455 let (written_bytes_tx, written_bytes_rx) = std::sync::mpsc::channel();
456
457 let compression = CompressedOutput {
458 meta: CodecMetadata {
459 codec_version: 0,
460 header_size: 0,
461 time_mode: Default::default(),
462 plane: Default::default(),
463 tps: 0,
464 ref_interval: 0,
465 delta_t_max: 0,
466 event_size: 0,
467 source_camera: Default::default(),
468 adu_interval: 1,
469 },
470 adu: Default::default(),
474 stream: Some(Arc::new(RwLock::new(BitWriter::endian(
475 bufwriter, BigEndian,
476 )))),
477 options: EncoderOptions::default(PlaneSize::default()),
478 written_bytes_tx: Some(written_bytes_tx),
479 last_message_sent: 0,
480 last_message_written: Arc::new(RwLock::new(0)),
481 _phantom: Default::default(),
482 };
483 let _encoder = Encoder {
484 output: WriteCompressionEnum::CompressedOutput(compression),
485 bincode: DefaultOptions::new()
486 .with_fixint_encoding()
487 .with_big_endian(),
488 options: EncoderOptions::default(PlaneSize::default()),
489 state: Default::default(),
490 };
491 }
492
493 #[test]
494 #[cfg(feature = "compression")]
495 fn compressed2() {
496 let output = Vec::new();
497 let bufwriter = BufWriter::new(output);
498 let compression = CompressedOutput::new(
499 CodecMetadata {
500 codec_version: 0,
501 header_size: 0,
502 time_mode: Default::default(),
503 plane: Default::default(),
504 tps: 0,
505 ref_interval: 255,
506 delta_t_max: 255,
507 event_size: 0,
508 source_camera: Default::default(),
509 adu_interval: Default::default(),
510 },
511 bufwriter,
512 );
513 let _encoder = Encoder {
514 output: WriteCompressionEnum::CompressedOutput(compression),
515 bincode: DefaultOptions::new()
516 .with_fixint_encoding()
517 .with_big_endian(),
518 options: EncoderOptions::default(PlaneSize::default()),
519 state: Default::default(),
520 };
521 }
522
523 #[test]
524 #[cfg(feature = "compression")]
525 fn compressed3() {
526 let output = Vec::new();
527 let bufwriter = BufWriter::new(output);
528 let compression = CompressedOutput::new(
529 CodecMetadata {
530 codec_version: LATEST_CODEC_VERSION,
531 header_size: 0,
532 time_mode: Default::default(),
533 plane: Default::default(),
534 tps: 0,
535 ref_interval: 255,
536 delta_t_max: 255,
537 event_size: 0,
538 source_camera: Default::default(),
539 adu_interval: Default::default(),
540 },
541 bufwriter,
542 );
543 let _encoder =
544 Encoder::new_compressed(compression, EncoderOptions::default(PlaneSize::default()));
545 }
546}