adder_codec_core/codec/compressed/
stream.rs

1use crate::codec::{CodecError, CodecMetadata, EncoderOptions, ReadCompression, WriteCompression};
2use bitstream_io::{BigEndian, BitRead, BitReader, BitWrite, BitWriter};
3use priority_queue::PriorityQueue;
4use std::cmp::Reverse;
5use std::io::{Cursor, Read, Seek, SeekFrom, Write};
6use std::ops::{Add, AddAssign};
7use std::sync::{Arc, RwLock};
8
9use crate::codec::compressed::source_model::event_structure::event_adu::EventAdu;
10use crate::codec::compressed::source_model::HandleEvent;
11use crate::codec::header::{Magic, MAGIC_COMPRESSED};
12use crate::codec::rate_controller::CrfParameters;
13use crate::{DeltaT, Event};
14
15/// A message to send to the writer thread (that is, the main thread) to write out the compressed
16/// ADΔER data to the stream
17pub(crate) struct BytesMessage {
18    message_id: u32,
19    bytes: Vec<u8>,
20}
21
22/// Write compressed ADΔER data to a stream.
23pub struct CompressedOutput<W: Write> {
24    pub(crate) meta: CodecMetadata,
25    pub(crate) adu: EventAdu,
26    pub(crate) stream: Option<Arc<RwLock<BitWriter<W, BigEndian>>>>,
27    pub(crate) options: EncoderOptions,
28    // pub(crate) written_bytes_rx: std::sync::mpsc::Receiver<BytesMessage>,
29    pub(crate) written_bytes_tx: Option<std::sync::mpsc::Sender<BytesMessage>>,
30    // pub(crate) bytes_writer_queue: PriorityQueue<Vec<u8>, Reverse<u32>>,
31    /// The ID of the last message sent from a spawned compressor thread
32    pub(crate) last_message_sent: u32,
33
34    /// The ID of the last message received in the writer thread and actually written out the stream
35    pub(crate) last_message_written: Arc<RwLock<u32>>,
36
37    pub(crate) _phantom: std::marker::PhantomData<W>,
38}
39
40/// Read compressed ADΔER data from a stream.
41pub struct CompressedInput<R: Read> {
42    pub(crate) meta: CodecMetadata,
43
44    adu: Option<EventAdu>,
45
46    _phantom: std::marker::PhantomData<R>,
47}
48
49// fn compressor_worker<W: Write>(
50//     mut stream: Arc<RwLock<BitWriter<W, BigEndian>>>,
51//     rx: std::sync::mpsc::Receiver<(EventAdu, CrfParameters)>,
52// ) -> Option<W> {
53//     while let Ok((mut adu, parameters)) = rx.recv() {
54//         let mut temp_stream = BitWriter::endian(Vec::new(), BigEndian);
55//
56//         // Compress the Adu. This also writes the EOF symbol and flushes the encoder
57//         adu.compress(&mut temp_stream, parameters.c_thresh_max).ok();
58//         let written_data = temp_stream.into_writer();
59//
60//         let mut stream_write = stream.write().unwrap();
61//         // Write the number of bytes in the compressed Adu as the 32-bit header for this Adu
62//         stream_write
63//             .write_bytes(&(written_data.len() as u32).to_be_bytes())
64//             .ok()?;
65//
66//         // Write the temporary stream to the actual stream
67//         stream_write.write_bytes(&written_data).ok()?;
68//     }
69//
70//     None
71// }
72
73fn flush_bytes_queue_worker<W: Write>(
74    mut stream: Arc<RwLock<BitWriter<W, BigEndian>>>,
75    written_bytes_rx: std::sync::mpsc::Receiver<BytesMessage>,
76    last_message_written: Arc<RwLock<u32>>,
77    mut bytes_writer_queue: PriorityQueue<Vec<u8>, Reverse<u32>>,
78) {
79    while let Ok(bytes_message) = written_bytes_rx.recv() {
80        // Blocking recv
81        // eprintln!("received message");
82
83        bytes_writer_queue.push(bytes_message.bytes, Reverse(bytes_message.message_id));
84
85        let mut last_message_written = last_message_written.write().unwrap();
86        while let Some((bytes, message_id)) = bytes_writer_queue.pop() {
87            if message_id == Reverse(*last_message_written + 1) {
88                let mut stream_write = stream.write().unwrap();
89
90                // Write the number of bytes in the compressed Adu as the 32-bit header for this Adu
91                stream_write
92                    .write_bytes(&(bytes.len() as u32).to_be_bytes())
93                    .unwrap();
94                stream_write.write_bytes(&bytes).unwrap();
95                *last_message_written += 1;
96            } else {
97                bytes_writer_queue.push(bytes, message_id); // message_id here is already Reversed
98                break;
99            }
100        }
101    }
102    eprintln!("Exiting writer thread...");
103}
104
105impl<W: Write + std::marker::Send + std::marker::Sync + 'static> CompressedOutput<W> {
106    /// Create a new compressed output stream.
107    pub fn new(meta: CodecMetadata, writer: W) -> Self {
108        let adu = EventAdu::new(meta.plane, 0, meta.ref_interval, meta.adu_interval as usize);
109        let (written_bytes_tx, written_bytes_rx) = std::sync::mpsc::channel();
110
111        let stream_lock = RwLock::new(BitWriter::endian(writer, BigEndian));
112        let stream_lock_arc = Arc::new(stream_lock);
113        let stream_lock_arc_clone = stream_lock_arc.clone();
114
115        let last_message_written = Arc::new(RwLock::new(0));
116        let last_message_written_clone = last_message_written.clone();
117
118        std::thread::spawn(move || {
119            flush_bytes_queue_worker(
120                stream_lock_arc_clone,
121                written_bytes_rx,
122                last_message_written_clone,
123                PriorityQueue::new(),
124            );
125            eprintln!("Exiting writer thread...");
126        });
127        Self {
128            meta,
129            adu,
130            // arithmetic_coder: Some(arithmetic_coder),
131            // contexts: Some(contexts),
132            stream: Some(stream_lock_arc),
133            options: EncoderOptions::default(meta.plane),
134            // written_bytes_rx,
135            written_bytes_tx: Some(written_bytes_tx),
136            // bytes_writer_queue: PriorityQueue::new(),
137            last_message_sent: 0,
138            last_message_written,
139            _phantom: Default::default(),
140        }
141    }
142
143    /// Keep the compressed encoder's option state synchronized with the high-level encoder container
144    pub(crate) fn with_options(&mut self, options: EncoderOptions) {
145        self.options = options;
146    }
147
148    /// Convenience function to get a mutable reference to the underlying stream.
149    #[inline(always)]
150    pub(crate) fn stream(&mut self) -> &mut Arc<RwLock<BitWriter<W, BigEndian>>> {
151        self.stream.as_mut().unwrap()
152    }
153}
154
155impl<W: Write + std::marker::Send + std::marker::Sync + 'static + 'static + 'static>
156    WriteCompression<W> for CompressedOutput<W>
157{
158    fn magic(&self) -> Magic {
159        MAGIC_COMPRESSED
160    }
161
162    fn meta(&self) -> &CodecMetadata {
163        &self.meta
164    }
165
166    fn meta_mut(&mut self) -> &mut CodecMetadata {
167        &mut self.meta
168    }
169
170    fn write_bytes(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
171        self.stream().write().unwrap().write_bytes(bytes)
172    }
173
174    fn byte_align(&mut self) -> std::io::Result<()> {
175        self.stream().write().unwrap().byte_align()
176    }
177
178    fn into_writer(&mut self) -> Option<W> {
179        if !self.adu.skip_adu {
180            // while self.last_message_sent
181            //     != self.last_message_written + self.bytes_writer_queue.len() as u32
182            // {
183            //     self.flush_bytes_queue();
184            //
185            //     // Sleep 1 second
186            //     std::thread::sleep(std::time::Duration::from_secs(1));
187            // }
188            //
189            // if let Some(stream) = &mut self.stream {
190            //     while let Some((bytes, message_id)) = self.bytes_writer_queue.pop() {
191            //         if message_id == Reverse(self.last_message_written + 1) {
192            //             // Write the number of bytes in the compressed Adu as the 32-bit header for this Adu
193            //             stream
194            //                 .write_bytes(&(bytes.len() as u32).to_be_bytes())
195            //                 .unwrap();
196            //             stream.write_bytes(&bytes).unwrap();
197            //             self.last_message_written += 1;
198            //         } else {
199            //             break;
200            //         }
201            //     }
202
203            dbg!("compressing partial last adu");
204            let mut temp_stream = BitWriter::endian(Vec::new(), BigEndian);
205
206            let parameters = self.options.crf.get_parameters().clone();
207            let mut adu = self.adu.clone();
208            let tx = self.written_bytes_tx.as_ref().unwrap().clone();
209            // Spawn a thread to compress the ADU and write out the data
210
211            let message_id_to_send = self.last_message_sent + 1;
212            self.last_message_sent += 1;
213
214            std::thread::spawn(move || {
215                adu.compress(&mut temp_stream, parameters.c_thresh_max).ok();
216                let written_data = temp_stream.into_writer();
217
218                tx.send(BytesMessage {
219                    message_id: message_id_to_send,
220                    bytes: written_data,
221                })
222                .unwrap();
223            });
224            // }
225        }
226
227        // Wait for the partial ADU to be written...
228        while self.last_message_sent != *self.last_message_written.read().unwrap() {
229            // Sleep 1 second
230            eprintln!(
231                "Sleeping. {} messages sent, {} messages written",
232                self.last_message_sent,
233                *self.last_message_written.read().unwrap()
234            );
235            std::thread::sleep(std::time::Duration::from_secs(1));
236        }
237
238        dbg!("All ADUs written.");
239        // Kill the written_bytes_tx, so that the Arc only has one reference
240        self.written_bytes_tx = None; // This will cause the flush_bytes_queue_worker() thread to
241                                      // error out from the receiver, because the communication channel is severed
242
243        std::thread::sleep(std::time::Duration::from_secs(1)); // Wait for the thread to exit. TODO: Make this deterministic, wait on the thread handle
244        let arc = self.stream.take()?;
245
246        let lock = Arc::into_inner(arc).unwrap();
247        // let mut guard = tmp.write().unwrap();
248        let consumed_data = lock.into_inner().unwrap();
249        // let new_writer = BitWriter::endian(Default::default(), BigEndian);
250        // let old_writer = std::mem::replace(&mut *guard, new_writer);
251        Some(consumed_data.into_writer())
252        //     let temp_writer = BitWriter::endian(W, BigEndian);
253        //     let aa = std::mem::replace(&mut tmpp, temp_writer);
254        //     tmpp.into_writer()
255        // })
256    }
257
258    // fn into_writer(self: Self) -> Option<Box<W>> {
259    //     Some(Box::new(self.stream.into_writer()))
260    // }
261
262    fn flush_writer(&mut self) -> std::io::Result<()> {
263        self.stream().write().unwrap().flush()
264    }
265
266    fn ingest_event(&mut self, event: Event) -> Result<(), CodecError> {
267        // Check that the event fits within the Adu's time range
268        if event.t > self.adu.start_t + (self.adu.dt_ref * self.adu.num_intervals as DeltaT) {
269            // dbg!("compressing adu");
270            // If it doesn't, compress the events and reset the Adu
271
272            // self.flush_bytes_queue();
273            if self.stream.is_some() {
274                // if let Some((bytes, message_id)) = self.bytes_writer_queue.pop() {
275                //     if message_id == Reverse(self.last_message_written + 1) {
276                //         // Write the number of bytes in the compressed Adu as the 32-bit header for this Adu
277                //         stream
278                //             .write_bytes(&(bytes.len() as u32).to_be_bytes())
279                //             .unwrap();
280                //         stream.write_bytes(&bytes).unwrap();
281                //         self.last_message_written += 1;
282                //     } else {
283                //         self.bytes_writer_queue.push(bytes, message_id); // message_id here is already Reversed
284                //                                                          // break;
285                //     }
286                // }
287
288                // Create a temporary u8 stream to write the arithmetic-coded data to
289                let mut temp_stream = BitWriter::endian(Vec::new(), BigEndian);
290
291                let parameters = self.options.crf.get_parameters().clone();
292
293                // Compress the Adu. This also writes the EOF symbol and flushes the encoder
294                // First, clone the ADU
295                let mut adu = self.adu.clone();
296                let tx = self.written_bytes_tx.as_ref().unwrap().clone();
297                // Spawn a thread to compress the ADU and write out the data
298
299                let message_id_to_send = self.last_message_sent + 1;
300                self.last_message_sent += 1;
301
302                std::thread::spawn(move || {
303                    adu.compress(&mut temp_stream, parameters.c_thresh_max).ok();
304                    let written_data = temp_stream.into_writer();
305
306                    tx.send(BytesMessage {
307                        message_id: message_id_to_send,
308                        bytes: written_data,
309                    })
310                    .unwrap();
311                });
312
313                self.adu.clear_compression();
314            }
315        }
316
317        // Ingest the event in the Adu
318        let _ = self.adu.ingest_event(event);
319
320        Ok(())
321    }
322    // fn ingest_event_debug(&mut self, event: Event) -> Result<Option<Adu>, CodecError> {
323    //     if let (true, _) = self.frame.add_event(event, self.meta.delta_t_max)? {
324    //         let adu = self.compress_events()?;
325    //         self.frame.add_event(event, self.meta.delta_t_max)?;
326    //         return Ok(Some(adu));
327    //     };
328    //     Ok(None)
329    // }
330}
331
332impl<R: Read> CompressedInput<R> {
333    /// Create a new compressed input stream.
334    pub fn new(delta_t_max: DeltaT, ref_interval: DeltaT, adu_interval: usize) -> Self
335    where
336        Self: Sized,
337    {
338        Self {
339            meta: CodecMetadata {
340                codec_version: 0,
341                header_size: 0,
342                time_mode: Default::default(),
343                plane: Default::default(),
344                tps: 0,
345                ref_interval,
346                delta_t_max,
347                event_size: 0,
348                source_camera: Default::default(),
349                adu_interval,
350            },
351            adu: None,
352            _phantom: std::marker::PhantomData,
353        }
354    }
355}
356
357impl<R: Read + Seek> ReadCompression<R> for CompressedInput<R> {
358    fn magic(&self) -> Magic {
359        MAGIC_COMPRESSED
360    }
361
362    fn meta(&self) -> &CodecMetadata {
363        &self.meta
364    }
365
366    fn meta_mut(&mut self) -> &mut CodecMetadata {
367        &mut self.meta
368    }
369
370    fn read_bytes(
371        &mut self,
372        bytes: &mut [u8],
373        reader: &mut BitReader<R, BigEndian>,
374    ) -> std::io::Result<()> {
375        reader.read_bytes(bytes)
376    }
377
378    // fn into_reader(self: Box<Self>, reader: &mut BitReader<R, BigEndian>) -> R {
379    //     reader.into_reader()
380    // }
381
382    #[allow(unused_variables)]
383    fn digest_event(&mut self, reader: &mut BitReader<R, BigEndian>) -> Result<Event, CodecError> {
384        if self.adu.is_none() {
385            self.adu = Some(EventAdu::new(
386                self.meta.plane,
387                0,
388                self.meta.ref_interval,
389                self.meta.adu_interval,
390            ));
391        }
392
393        if let Some(adu) = &mut self.adu {
394            if adu.decoder_is_empty() {
395                let start = std::time::Instant::now();
396                // Read the size of the Adu in bytes
397                let mut buffer = [0u8; 4];
398                reader.read_bytes(&mut buffer)?;
399                let num_bytes = u32::from_be_bytes(buffer);
400
401                // Read the compressed Adu from the stream
402                let adu_bytes = reader.read_to_vec(num_bytes as usize)?;
403
404                // Create a temporary u8 stream to read the arithmetic-coded data from
405                let mut adu_stream = BitReader::endian(Cursor::new(adu_bytes), BigEndian);
406
407                // Decompress the Adu
408                adu.decompress(&mut adu_stream);
409
410                let duration = start.elapsed();
411                println!("Decompressed Adu in {:?} ns", duration.as_nanos());
412            }
413            // Then return the next event from the queue
414            match adu.digest_event() {
415                Ok(event) => Ok(event),
416                Err(CodecError::NoMoreEvents) => {
417                    // If there are no more events in the Adu, try decompressing the next Adu
418                    self.digest_event(reader)
419                }
420                Err(e) => Err(e),
421            }
422        } else {
423            unreachable!("Invalid state");
424        }
425    }
426
427    #[allow(unused_variables)]
428    fn set_input_stream_position(
429        &mut self,
430        reader: &mut BitReader<R, BigEndian>,
431        pos: u64,
432    ) -> Result<(), CodecError> {
433        if pos.saturating_sub(self.meta.header_size as u64) % u64::from(self.meta.event_size) != 0 {
434            eprintln!("Attempted to seek to bad position in stream: {pos}");
435            return Err(CodecError::Seek);
436        }
437
438        if reader.seek_bits(SeekFrom::Start(pos * 8)).is_err() {
439            return Err(CodecError::Seek);
440        }
441        Ok(())
442    }
443}
444
445#[cfg(test)]
446mod tests {
447    use crate::codec::compressed::stream::CompressedInput;
448    use crate::codec::{CodecError, ReadCompression};
449    use crate::PlaneSize;
450    use bitstream_io::{BigEndian, BitReader};
451    use std::cmp::min;
452    use std::error::Error;
453    use std::io;
454
455    /// Test the creation a CompressedOutput and writing a bunch of events to it but NOT getting
456    /// to the time where we have a full Adu. It will compress the last partial ADU.
457    #[test]
458    fn test_compress_empty() -> Result<(), Box<dyn Error>> {
459        use crate::codec::compressed::stream::CompressedOutput;
460        use crate::codec::WriteCompression;
461        use crate::Coord;
462        use crate::{Event, SourceCamera, TimeMode};
463        use std::io::Cursor;
464
465        let start_t = 0;
466        let dt_ref = 255;
467        let num_intervals = 10;
468
469        let mut compressed_output = CompressedOutput::new(
470            crate::codec::CodecMetadata {
471                codec_version: 0,
472                header_size: 0,
473                time_mode: TimeMode::AbsoluteT,
474                plane: PlaneSize {
475                    width: 16,
476                    height: 32,
477                    channels: 1,
478                },
479                tps: 7650,
480                ref_interval: dt_ref,
481                delta_t_max: dt_ref * num_intervals,
482                event_size: 0,
483                source_camera: SourceCamera::FramedU8,
484                adu_interval: num_intervals as usize,
485            },
486            Cursor::new(Vec::new()),
487        );
488
489        let mut counter = 0;
490        for y in 0..30 {
491            for x in 0..16 {
492                compressed_output
493                    .ingest_event(Event {
494                        coord: Coord { x, y, c: None },
495                        t: min(280 + counter, start_t + dt_ref * num_intervals as u32),
496                        d: 7,
497                    })
498                    .unwrap();
499                if 280 + counter > start_t + dt_ref * num_intervals as u32 {
500                    break;
501                } else {
502                    counter += 1;
503                }
504            }
505        }
506
507        let output = compressed_output.into_writer().unwrap().into_inner();
508        assert!(!output.is_empty());
509        Ok(())
510    }
511
512    #[test]
513    fn test_compress_decompress_barely_full() -> Result<(), Box<dyn Error>> {
514        use crate::codec::compressed::stream::CompressedOutput;
515        use crate::codec::WriteCompression;
516        use crate::Coord;
517        use crate::{Event, SourceCamera, TimeMode};
518        use std::io::Cursor;
519
520        let plane = PlaneSize::new(16, 30, 1)?;
521        let start_t = 0;
522        let dt_ref = 255;
523        let num_intervals = 10;
524
525        // A random candidate pixel to check that its events match
526        let candidate_px_idx = (7, 12);
527        let mut input_px_events = Vec::new();
528        let mut output_px_events = Vec::new();
529
530        let mut compressed_output = CompressedOutput::new(
531            crate::codec::CodecMetadata {
532                codec_version: 0,
533                header_size: 0,
534                time_mode: TimeMode::AbsoluteT,
535                plane: PlaneSize {
536                    width: 16,
537                    height: 32,
538                    channels: 1,
539                },
540                tps: 7650,
541                ref_interval: dt_ref,
542                delta_t_max: dt_ref * num_intervals,
543                event_size: 0,
544                source_camera: SourceCamera::FramedU8,
545                adu_interval: num_intervals as usize,
546            },
547            Cursor::new(Vec::new()),
548        );
549
550        let mut counter = 0;
551        for y in 0..30 {
552            for x in 0..16 {
553                let event = Event {
554                    coord: Coord { x, y, c: None },
555                    t: min(280 + counter, start_t + dt_ref * num_intervals as u32),
556                    d: 7,
557                };
558                if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
559                    input_px_events.push(event);
560                }
561                compressed_output.ingest_event(event).unwrap();
562                if 280 + counter > start_t + dt_ref * num_intervals as u32 {
563                    break;
564                } else {
565                    counter += 1;
566                }
567            }
568        }
569
570        // Ingest one more event which is in the next Adu time span
571        compressed_output
572            .ingest_event(Event {
573                coord: Coord {
574                    x: 0,
575                    y: 0,
576                    c: None,
577                },
578                t: start_t + dt_ref * num_intervals as u32 + 1,
579                d: 7,
580            })
581            .unwrap();
582        counter += 1;
583
584        // Sleep for 3 seconds to give the writer thread time to catch up
585        std::thread::sleep(std::time::Duration::from_secs(3));
586
587        let output = compressed_output.into_writer().unwrap().into_inner();
588        assert!(!output.is_empty());
589        dbg!(counter);
590        dbg!(output.len());
591        // Check that the size is less than the raw events
592        assert!((output.len() as u32) < counter * 9);
593
594        let mut compressed_input = CompressedInput::new(
595            dt_ref * num_intervals as u32,
596            dt_ref,
597            num_intervals as usize,
598        );
599        compressed_input.meta.plane = plane;
600        let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
601        for i in 0..counter - 1 {
602            let event = compressed_input.digest_event(&mut stream);
603            if event.is_err() {
604                dbg!(i);
605            }
606            let event = event.unwrap();
607            if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
608                output_px_events.push(event);
609            }
610        }
611
612        assert_eq!(input_px_events, output_px_events);
613        Ok(())
614    }
615
616    #[test]
617    fn test_compress_decompress_several() -> Result<(), Box<dyn Error>> {
618        use crate::codec::compressed::stream::CompressedOutput;
619        use crate::codec::WriteCompression;
620        use crate::Coord;
621        use crate::{Event, SourceCamera, TimeMode};
622        use std::io::Cursor;
623
624        let plane = PlaneSize::new(16, 30, 1)?;
625        let dt_ref = 255;
626        let num_intervals = 5;
627
628        // A random candidate pixel to check that its events match
629        let candidate_px_idx = (7, 12);
630        let mut input_px_events = Vec::new();
631        let mut output_px_events = Vec::new();
632
633        let mut compressed_output = CompressedOutput::new(
634            crate::codec::CodecMetadata {
635                codec_version: 0,
636                header_size: 0,
637                time_mode: TimeMode::AbsoluteT,
638                plane: PlaneSize {
639                    width: 16,
640                    height: 32,
641                    channels: 1,
642                },
643                tps: 7650,
644                ref_interval: dt_ref,
645                delta_t_max: dt_ref * num_intervals as u32,
646                event_size: 0,
647                source_camera: SourceCamera::FramedU8,
648                adu_interval: num_intervals as usize,
649            },
650            Cursor::new(Vec::new()),
651        );
652
653        let mut counter = 0;
654        for _ in 0..10 {
655            for y in 0..30 {
656                for x in 0..16 {
657                    let event = Event {
658                        coord: Coord { x, y, c: None },
659                        t: 280 + counter,
660                        d: 7,
661                    };
662                    if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
663                        input_px_events.push(event);
664                    }
665                    compressed_output.ingest_event(event)?;
666
667                    counter += 1;
668                }
669            }
670        }
671
672        let output = compressed_output.into_writer().unwrap().into_inner();
673        assert!(!output.is_empty());
674        // Check that the size is less than the raw events
675        assert!((output.len() as u32) < counter * 9);
676
677        let mut compressed_input = CompressedInput::new(
678            dt_ref * num_intervals as u32,
679            dt_ref,
680            num_intervals as usize,
681        );
682        compressed_input.meta.plane = plane;
683        let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
684        for _ in 0..counter - 1 {
685            match compressed_input.digest_event(&mut stream) {
686                Ok(event) => {
687                    if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
688                        output_px_events.push(event);
689                    }
690                }
691                Err(CodecError::IoError(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
692
693                Err(e) => return Err(Box::new(e)),
694            }
695        }
696
697        assert!(input_px_events.len() >= output_px_events.len());
698        for i in 0..output_px_events.len() {
699            // Have some slack in the comparison of the T component, since there could be some slight loss here
700            let a = input_px_events[i].t - 5..input_px_events[i].t + 5;
701            let comp_t = output_px_events[i].t;
702            assert!(a.contains(&comp_t));
703            assert_eq!(input_px_events[i].d, output_px_events[i].d);
704        }
705        Ok(())
706    }
707
708    #[test]
709    fn test_compress_decompress_several_single() -> Result<(), Box<dyn Error>> {
710        use crate::codec::compressed::stream::CompressedOutput;
711        use crate::codec::WriteCompression;
712        use crate::Coord;
713        use crate::{Event, SourceCamera, TimeMode};
714        use std::io::Cursor;
715
716        let plane = PlaneSize::new(32, 16, 1)?;
717        let dt_ref = 255;
718        let num_intervals = 5;
719
720        // A random candidate pixel to check that its events match
721        let candidate_px_idx = (7, 12);
722        let mut input_px_events = Vec::new();
723        let mut output_px_events = Vec::new();
724
725        let mut compressed_output = CompressedOutput::new(
726            crate::codec::CodecMetadata {
727                codec_version: 0,
728                header_size: 0,
729                time_mode: TimeMode::AbsoluteT,
730                plane,
731                tps: 7650,
732                ref_interval: dt_ref,
733                delta_t_max: dt_ref * num_intervals as u32,
734                event_size: 0,
735                source_camera: SourceCamera::FramedU8,
736                adu_interval: num_intervals as usize,
737            },
738            Cursor::new(Vec::new()),
739        );
740
741        let mut counter = 0;
742        for i in 0..60 {
743            let event = Event {
744                coord: Coord {
745                    x: 12,
746                    y: 7,
747                    c: None,
748                },
749                t: 280 + i * 100 + counter,
750                d: 7,
751            };
752
753            input_px_events.push(event);
754
755            compressed_output.ingest_event(event)?;
756
757            counter += 1;
758        }
759
760        // MUCH LATER, integrate an event that with a timestamp far in the past:
761        let late_event = Event {
762            coord: Coord {
763                x: 19,
764                y: 14,
765                c: None,
766            },
767            t: 280,
768            d: 7,
769        };
770        compressed_output.ingest_event(late_event)?;
771
772        for i in 60..70 {
773            let event = Event {
774                coord: Coord {
775                    x: 12,
776                    y: 7,
777                    c: None,
778                },
779                t: 280 + i * 100 + counter,
780                d: 7,
781            };
782
783            input_px_events.push(event);
784
785            compressed_output.ingest_event(event)?;
786
787            counter += 1;
788        }
789
790        // Sleep for 3 seconds to give the writer thread time to catch up
791        std::thread::sleep(std::time::Duration::from_secs(3));
792
793        let output = compressed_output.into_writer().unwrap().into_inner();
794        assert!(!output.is_empty());
795        // Check that the size is less than the raw events
796
797        let mut compressed_input = CompressedInput::new(
798            dt_ref * num_intervals as u32,
799            dt_ref,
800            num_intervals as usize,
801        );
802        compressed_input.meta.plane = plane;
803        let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
804        for _ in 0..counter + 1 {
805            match compressed_input.digest_event(&mut stream) {
806                Ok(event) => {
807                    if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
808                        output_px_events.push(event);
809                    }
810                }
811                Err(CodecError::IoError(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
812
813                Err(e) => return Err(Box::new(e)),
814            }
815        }
816
817        assert!(input_px_events.len() >= output_px_events.len());
818        for i in 0..output_px_events.len() {
819            let span = input_px_events[i].t - 5..input_px_events[i].t + 5;
820            let t = output_px_events[i].t;
821            assert!(span.contains(&t));
822        }
823        Ok(())
824    }
825
826    #[test]
827    fn test_compress_decompress_several_with_skip() -> Result<(), Box<dyn Error>> {
828        use crate::codec::compressed::stream::CompressedOutput;
829        use crate::codec::WriteCompression;
830        use crate::Coord;
831        use crate::{Event, SourceCamera, TimeMode};
832        use std::io::Cursor;
833
834        let plane = PlaneSize::new(30, 30, 1)?;
835        let dt_ref = 255;
836        let num_intervals = 10;
837
838        // A random candidate pixel to check that its events match
839        let candidate_px_idx = (7, 12);
840        let mut input_px_events = Vec::new();
841        let mut output_px_events = Vec::new();
842
843        let mut compressed_output = CompressedOutput::new(
844            crate::codec::CodecMetadata {
845                codec_version: 0,
846                header_size: 0,
847                time_mode: TimeMode::AbsoluteT,
848                plane,
849                tps: 7650,
850                ref_interval: dt_ref,
851                delta_t_max: dt_ref * num_intervals as u32,
852                event_size: 0,
853                source_camera: SourceCamera::FramedU8,
854                adu_interval: num_intervals as usize,
855            },
856            Cursor::new(Vec::new()),
857        );
858
859        let mut counter = 0;
860        for i in 0..10 {
861            for y in 0..30 {
862                for x in 0..30 {
863                    // Make the top left cube a skip cube half the time, and skip pixel (14, 14)
864                    if !(y == 14 && x == 14 || i % 3 == 0 && y >= 16 && x < 16) {
865                        let event = Event {
866                            coord: Coord { x, y, c: None },
867                            t: 280 + counter,
868                            d: 7,
869                        };
870                        if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
871                            input_px_events.push(event);
872                        }
873                        compressed_output.ingest_event(event)?;
874
875                        counter += 1;
876                    }
877                }
878            }
879        }
880
881        // MUCH LATER, integrate an event that with a timestamp far in the past:
882        let late_event = Event {
883            coord: Coord {
884                x: 14,
885                y: 14,
886                c: None,
887            },
888            t: 280,
889            d: 7,
890        };
891        compressed_output.ingest_event(late_event)?;
892
893        for i in 0..10 {
894            for y in 0..30 {
895                for x in 0..30 {
896                    // Make the top left cube a skip cube half the time, and skip pixel (14, 14)
897                    if !(y == 14 && x == 14 || i % 3 == 0 && y >= 16 && x < 16) {
898                        let event = Event {
899                            coord: Coord { x, y, c: None },
900                            t: 280 + counter,
901                            d: 7,
902                        };
903                        if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
904                            input_px_events.push(event);
905                        }
906                        compressed_output.ingest_event(event)?;
907
908                        counter += 1;
909                    }
910                }
911            }
912        }
913
914        // Sleep for 3 seconds to give the writer thread time to catch up
915        std::thread::sleep(std::time::Duration::from_secs(10));
916
917        let output = compressed_output.into_writer().unwrap().into_inner();
918        assert!(!output.is_empty());
919        // Check that the size is less than the raw events
920        assert!((output.len() as u32) < counter * 9);
921
922        let mut compressed_input = CompressedInput::new(
923            dt_ref * num_intervals as u32,
924            dt_ref,
925            num_intervals as usize,
926        );
927        compressed_input.meta.plane = plane;
928        let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
929        loop {
930            match compressed_input.digest_event(&mut stream) {
931                Ok(event) => {
932                    if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
933                        output_px_events.push(event);
934                    }
935                }
936                Err(CodecError::IoError(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
937
938                Err(e) => return Err(Box::new(e)),
939            }
940        }
941
942        assert!(input_px_events.len() >= output_px_events.len());
943        for i in 0..output_px_events.len() {
944            // Have some slack in the comparison of the T component, since there could be some slight loss here
945            let a = input_px_events[i].t - 5..input_px_events[i].t + 5;
946            let comp_t = output_px_events[i].t;
947            assert!(a.contains(&comp_t));
948            assert_eq!(input_px_events[i].d, output_px_events[i].d);
949        }
950        Ok(())
951    }
952}