adder_codec_core/codec/compressed/
stream.rs

1use crate::codec::{CodecError, CodecMetadata, EncoderOptions, ReadCompression, WriteCompression};
2use bitstream_io::{BigEndian, BitRead, BitReader, BitWrite, BitWriter};
3use priority_queue::PriorityQueue;
4use std::cmp::Reverse;
5use std::io::{Cursor, Read, Seek, SeekFrom, Write};
6use std::sync::{Arc, RwLock};
7
8use crate::codec::compressed::source_model::event_structure::event_adu::EventAdu;
9use crate::codec::compressed::source_model::HandleEvent;
10use crate::codec::header::{Magic, MAGIC_COMPRESSED};
11use crate::{DeltaT, Event};
12
13/// A message to send to the writer thread (that is, the main thread) to write out the compressed
14/// ADΔER data to the stream
15pub(crate) struct BytesMessage {
16    message_id: u32,
17    bytes: Vec<u8>,
18}
19
20/// Write compressed ADΔER data to a stream.
21pub struct CompressedOutput<W: Write> {
22    pub(crate) meta: CodecMetadata,
23    pub(crate) adu: EventAdu,
24    pub(crate) stream: Option<Arc<RwLock<BitWriter<W, BigEndian>>>>,
25    pub(crate) options: EncoderOptions,
26    // pub(crate) written_bytes_rx: std::sync::mpsc::Receiver<BytesMessage>,
27    pub(crate) written_bytes_tx: Option<std::sync::mpsc::Sender<BytesMessage>>,
28    // pub(crate) bytes_writer_queue: PriorityQueue<Vec<u8>, Reverse<u32>>,
29    /// The ID of the last message sent from a spawned compressor thread
30    pub(crate) last_message_sent: u32,
31
32    /// The ID of the last message received in the writer thread and actually written out the stream
33    pub(crate) last_message_written: Arc<RwLock<u32>>,
34
35    pub(crate) _phantom: std::marker::PhantomData<W>,
36}
37
38/// Read compressed ADΔER data from a stream.
39pub struct CompressedInput<R: Read> {
40    pub(crate) meta: CodecMetadata,
41
42    adu: Option<EventAdu>,
43
44    _phantom: std::marker::PhantomData<R>,
45}
46
47// fn compressor_worker<W: Write>(
48//     mut stream: Arc<RwLock<BitWriter<W, BigEndian>>>,
49//     rx: std::sync::mpsc::Receiver<(EventAdu, CrfParameters)>,
50// ) -> Option<W> {
51//     while let Ok((mut adu, parameters)) = rx.recv() {
52//         let mut temp_stream = BitWriter::endian(Vec::new(), BigEndian);
53//
54//         // Compress the Adu. This also writes the EOF symbol and flushes the encoder
55//         adu.compress(&mut temp_stream, parameters.c_thresh_max).ok();
56//         let written_data = temp_stream.into_writer();
57//
58//         let mut stream_write = stream.write().unwrap();
59//         // Write the number of bytes in the compressed Adu as the 32-bit header for this Adu
60//         stream_write
61//             .write_bytes(&(written_data.len() as u32).to_be_bytes())
62//             .ok()?;
63//
64//         // Write the temporary stream to the actual stream
65//         stream_write.write_bytes(&written_data).ok()?;
66//     }
67//
68//     None
69// }
70
71fn flush_bytes_queue_worker<W: Write>(
72    stream: Arc<RwLock<BitWriter<W, BigEndian>>>,
73    written_bytes_rx: std::sync::mpsc::Receiver<BytesMessage>,
74    last_message_written: Arc<RwLock<u32>>,
75    mut bytes_writer_queue: PriorityQueue<Vec<u8>, Reverse<u32>>,
76) {
77    while let Ok(bytes_message) = written_bytes_rx.recv() {
78        // Blocking recv
79        // eprintln!("received message");
80
81        bytes_writer_queue.push(bytes_message.bytes, Reverse(bytes_message.message_id));
82
83        let mut last_message_written = last_message_written.write().unwrap();
84        while let Some((bytes, message_id)) = bytes_writer_queue.pop() {
85            if message_id == Reverse(*last_message_written + 1) {
86                let mut stream_write = stream.write().unwrap();
87
88                // Write the number of bytes in the compressed Adu as the 32-bit header for this Adu
89                stream_write
90                    .write_bytes(&(bytes.len() as u32).to_be_bytes())
91                    .unwrap();
92                stream_write.write_bytes(&bytes).unwrap();
93                *last_message_written += 1;
94            } else {
95                bytes_writer_queue.push(bytes, message_id); // message_id here is already Reversed
96                break;
97            }
98        }
99    }
100    eprintln!("Exiting writer thread...");
101}
102
103impl<W: Write + std::marker::Send + std::marker::Sync + 'static> CompressedOutput<W> {
104    /// Create a new compressed output stream.
105    pub fn new(meta: CodecMetadata, writer: W) -> Self {
106        let adu = EventAdu::new(meta.plane, 0, meta.ref_interval, meta.adu_interval);
107        let (written_bytes_tx, written_bytes_rx) = std::sync::mpsc::channel();
108
109        let stream_lock = RwLock::new(BitWriter::endian(writer, BigEndian));
110        let stream_lock_arc = Arc::new(stream_lock);
111        let stream_lock_arc_clone = stream_lock_arc.clone();
112
113        let last_message_written = Arc::new(RwLock::new(0));
114        let last_message_written_clone = last_message_written.clone();
115
116        std::thread::spawn(move || {
117            flush_bytes_queue_worker(
118                stream_lock_arc_clone,
119                written_bytes_rx,
120                last_message_written_clone,
121                PriorityQueue::new(),
122            );
123            eprintln!("Exiting writer thread...");
124        });
125        Self {
126            meta,
127            adu,
128            // arithmetic_coder: Some(arithmetic_coder),
129            // contexts: Some(contexts),
130            stream: Some(stream_lock_arc),
131            options: EncoderOptions::default(meta.plane),
132            // written_bytes_rx,
133            written_bytes_tx: Some(written_bytes_tx),
134            // bytes_writer_queue: PriorityQueue::new(),
135            last_message_sent: 0,
136            last_message_written,
137            _phantom: Default::default(),
138        }
139    }
140
141    /// Keep the compressed encoder's option state synchronized with the high-level encoder container
142    pub(crate) fn with_options(&mut self, options: EncoderOptions) {
143        self.options = options;
144    }
145
146    /// Convenience function to get a mutable reference to the underlying stream.
147    #[inline(always)]
148    pub(crate) fn stream(&mut self) -> &mut Arc<RwLock<BitWriter<W, BigEndian>>> {
149        self.stream.as_mut().unwrap()
150    }
151}
152
153impl<W: Write + std::marker::Send + std::marker::Sync + 'static + 'static + 'static>
154    WriteCompression<W> for CompressedOutput<W>
155{
156    fn magic(&self) -> Magic {
157        MAGIC_COMPRESSED
158    }
159
160    fn meta(&self) -> &CodecMetadata {
161        &self.meta
162    }
163
164    fn meta_mut(&mut self) -> &mut CodecMetadata {
165        &mut self.meta
166    }
167
168    fn write_bytes(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
169        self.stream().write().unwrap().write_bytes(bytes)
170    }
171
172    fn byte_align(&mut self) -> std::io::Result<()> {
173        self.stream().write().unwrap().byte_align()
174    }
175
176    fn into_writer(mut self) -> Option<W> {
177        if !self.adu.skip_adu {
178            // while self.last_message_sent
179            //     != self.last_message_written + self.bytes_writer_queue.len() as u32
180            // {
181            //     self.flush_bytes_queue();
182            //
183            //     // Sleep 1 second
184            //     std::thread::sleep(std::time::Duration::from_secs(1));
185            // }
186            //
187            // if let Some(stream) = &mut self.stream {
188            //     while let Some((bytes, message_id)) = self.bytes_writer_queue.pop() {
189            //         if message_id == Reverse(self.last_message_written + 1) {
190            //             // Write the number of bytes in the compressed Adu as the 32-bit header for this Adu
191            //             stream
192            //                 .write_bytes(&(bytes.len() as u32).to_be_bytes())
193            //                 .unwrap();
194            //             stream.write_bytes(&bytes).unwrap();
195            //             self.last_message_written += 1;
196            //         } else {
197            //             break;
198            //         }
199            //     }
200
201            dbg!("compressing partial last adu");
202            let mut temp_stream = BitWriter::endian(Vec::new(), BigEndian);
203
204            let parameters = *self.options.crf.get_parameters();
205            let mut adu = self.adu.clone();
206            let tx = self.written_bytes_tx.as_ref().unwrap().clone();
207            // Spawn a thread to compress the ADU and write out the data
208
209            let message_id_to_send = self.last_message_sent + 1;
210            self.last_message_sent += 1;
211
212            std::thread::spawn(move || {
213                adu.compress(&mut temp_stream, parameters.c_thresh_max).ok();
214                let written_data = temp_stream.into_writer();
215
216                tx.send(BytesMessage {
217                    message_id: message_id_to_send,
218                    bytes: written_data,
219                })
220                .unwrap();
221            });
222            // }
223        }
224
225        // Wait for the partial ADU to be written...
226        while self.last_message_sent != *self.last_message_written.read().unwrap() {
227            // Sleep 1 second
228            eprintln!(
229                "Sleeping. {} messages sent, {} messages written",
230                self.last_message_sent,
231                *self.last_message_written.read().unwrap()
232            );
233            std::thread::sleep(std::time::Duration::from_secs(1));
234        }
235
236        dbg!("All ADUs written.");
237        // Kill the written_bytes_tx, so that the Arc only has one reference
238        self.written_bytes_tx = None; // This will cause the flush_bytes_queue_worker() thread to
239                                      // error out from the receiver, because the communication channel is severed
240
241        std::thread::sleep(std::time::Duration::from_secs(1)); // Wait for the thread to exit. TODO: Make this deterministic, wait on the thread handle
242        let arc = self.stream.take()?;
243
244        let lock = Arc::into_inner(arc).unwrap();
245        // let mut guard = tmp.write().unwrap();
246        let consumed_data = lock.into_inner().unwrap();
247        // let new_writer = BitWriter::endian(Default::default(), BigEndian);
248        // let old_writer = std::mem::replace(&mut *guard, new_writer);
249        Some(consumed_data.into_writer())
250        //     let temp_writer = BitWriter::endian(W, BigEndian);
251        //     let aa = std::mem::replace(&mut tmpp, temp_writer);
252        //     tmpp.into_writer()
253        // })
254    }
255
256    // fn into_writer(self: Self) -> Option<Box<W>> {
257    //     Some(Box::new(self.stream.into_writer()))
258    // }
259
260    fn flush_writer(&mut self) -> std::io::Result<()> {
261        self.stream().write().unwrap().flush()
262    }
263
264    fn ingest_event(&mut self, event: Event) -> Result<(), CodecError> {
265        // Check that the event fits within the Adu's time range
266        if event.t > self.adu.start_t + (self.adu.dt_ref * self.adu.num_intervals as DeltaT) {
267            // dbg!("compressing adu");
268            // If it doesn't, compress the events and reset the Adu
269
270            // self.flush_bytes_queue();
271            if self.stream.is_some() {
272                // if let Some((bytes, message_id)) = self.bytes_writer_queue.pop() {
273                //     if message_id == Reverse(self.last_message_written + 1) {
274                //         // Write the number of bytes in the compressed Adu as the 32-bit header for this Adu
275                //         stream
276                //             .write_bytes(&(bytes.len() as u32).to_be_bytes())
277                //             .unwrap();
278                //         stream.write_bytes(&bytes).unwrap();
279                //         self.last_message_written += 1;
280                //     } else {
281                //         self.bytes_writer_queue.push(bytes, message_id); // message_id here is already Reversed
282                //                                                          // break;
283                //     }
284                // }
285
286                // Create a temporary u8 stream to write the arithmetic-coded data to
287                let mut temp_stream = BitWriter::endian(Vec::new(), BigEndian);
288
289                let parameters = *self.options.crf.get_parameters();
290
291                // Compress the Adu. This also writes the EOF symbol and flushes the encoder
292                // First, clone the ADU
293                let mut adu = self.adu.clone();
294                let tx = self.written_bytes_tx.as_ref().unwrap().clone();
295                // Spawn a thread to compress the ADU and write out the data
296
297                let message_id_to_send = self.last_message_sent + 1;
298                self.last_message_sent += 1;
299
300                std::thread::spawn(move || {
301                    adu.compress(&mut temp_stream, parameters.c_thresh_max).ok();
302                    let written_data = temp_stream.into_writer();
303
304                    tx.send(BytesMessage {
305                        message_id: message_id_to_send,
306                        bytes: written_data,
307                    })
308                    .unwrap();
309                });
310
311                self.adu.clear_compression();
312            }
313        }
314
315        // Ingest the event in the Adu
316        let _ = self.adu.ingest_event(event);
317
318        Ok(())
319    }
320    // fn ingest_event_debug(&mut self, event: Event) -> Result<Option<Adu>, CodecError> {
321    //     if let (true, _) = self.frame.add_event(event, self.meta.delta_t_max)? {
322    //         let adu = self.compress_events()?;
323    //         self.frame.add_event(event, self.meta.delta_t_max)?;
324    //         return Ok(Some(adu));
325    //     };
326    //     Ok(None)
327    // }
328}
329
330impl<R: Read> CompressedInput<R> {
331    /// Create a new compressed input stream.
332    pub fn new(delta_t_max: DeltaT, ref_interval: DeltaT, adu_interval: usize) -> Self
333    where
334        Self: Sized,
335    {
336        Self {
337            meta: CodecMetadata {
338                codec_version: 0,
339                header_size: 0,
340                time_mode: Default::default(),
341                plane: Default::default(),
342                tps: 0,
343                ref_interval,
344                delta_t_max,
345                event_size: 0,
346                source_camera: Default::default(),
347                adu_interval,
348            },
349            adu: None,
350            _phantom: std::marker::PhantomData,
351        }
352    }
353}
354
355impl<R: Read + Seek> ReadCompression<R> for CompressedInput<R> {
356    fn magic(&self) -> Magic {
357        MAGIC_COMPRESSED
358    }
359
360    fn meta(&self) -> &CodecMetadata {
361        &self.meta
362    }
363
364    fn meta_mut(&mut self) -> &mut CodecMetadata {
365        &mut self.meta
366    }
367
368    fn read_bytes(
369        &mut self,
370        bytes: &mut [u8],
371        reader: &mut BitReader<R, BigEndian>,
372    ) -> std::io::Result<()> {
373        reader.read_bytes(bytes)
374    }
375
376    // fn into_reader(self: Box<Self>, reader: &mut BitReader<R, BigEndian>) -> R {
377    //     reader.into_reader()
378    // }
379
380    #[allow(unused_variables)]
381    fn digest_event(&mut self, reader: &mut BitReader<R, BigEndian>) -> Result<Event, CodecError> {
382        if self.adu.is_none() {
383            self.adu = Some(EventAdu::new(
384                self.meta.plane,
385                0,
386                self.meta.ref_interval,
387                self.meta.adu_interval,
388            ));
389        }
390
391        if let Some(adu) = &mut self.adu {
392            if adu.decoder_is_empty() {
393                let start = std::time::Instant::now();
394                // Read the size of the Adu in bytes
395                let mut buffer = [0u8; 4];
396                reader.read_bytes(&mut buffer)?;
397                let num_bytes = u32::from_be_bytes(buffer);
398
399                // Read the compressed Adu from the stream
400                let adu_bytes = reader.read_to_vec(num_bytes as usize)?;
401
402                // Create a temporary u8 stream to read the arithmetic-coded data from
403                let mut adu_stream = BitReader::endian(Cursor::new(adu_bytes), BigEndian);
404
405                // Decompress the Adu
406                adu.decompress(&mut adu_stream);
407
408                let duration = start.elapsed();
409                println!("Decompressed Adu in {:?} ns", duration.as_nanos());
410            }
411            // Then return the next event from the queue
412            match adu.digest_event() {
413                Ok(event) => Ok(event),
414                Err(CodecError::NoMoreEvents) => {
415                    // If there are no more events in the Adu, try decompressing the next Adu
416                    self.digest_event(reader)
417                }
418                Err(e) => Err(e),
419            }
420        } else {
421            unreachable!("Invalid state");
422        }
423    }
424
425    #[allow(unused_variables)]
426    fn set_input_stream_position(
427        &mut self,
428        reader: &mut BitReader<R, BigEndian>,
429        pos: u64,
430    ) -> Result<(), CodecError> {
431        if pos.saturating_sub(self.meta.header_size as u64) % u64::from(self.meta.event_size) != 0 {
432            eprintln!("Attempted to seek to bad position in stream: {pos}");
433            return Err(CodecError::Seek);
434        }
435
436        if reader.seek_bits(SeekFrom::Start(pos * 8)).is_err() {
437            return Err(CodecError::Seek);
438        }
439        Ok(())
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use crate::codec::compressed::stream::CompressedInput;
446    use crate::codec::{CodecError, ReadCompression};
447    use crate::PlaneSize;
448    use bitstream_io::{BigEndian, BitReader};
449    use std::cmp::min;
450    use std::error::Error;
451    use std::io;
452
453    /// Test the creation a CompressedOutput and writing a bunch of events to it but NOT getting
454    /// to the time where we have a full Adu. It will compress the last partial ADU.
455    #[test]
456    fn test_compress_empty() -> Result<(), Box<dyn Error>> {
457        use crate::codec::compressed::stream::CompressedOutput;
458        use crate::codec::WriteCompression;
459        use crate::Coord;
460        use crate::{Event, SourceCamera, TimeMode};
461        use std::io::Cursor;
462
463        let start_t = 0;
464        let dt_ref = 255;
465        let num_intervals = 10;
466
467        let mut compressed_output = CompressedOutput::new(
468            crate::codec::CodecMetadata {
469                codec_version: 0,
470                header_size: 0,
471                time_mode: TimeMode::AbsoluteT,
472                plane: PlaneSize {
473                    width: 16,
474                    height: 32,
475                    channels: 1,
476                },
477                tps: 7650,
478                ref_interval: dt_ref,
479                delta_t_max: dt_ref * num_intervals,
480                event_size: 0,
481                source_camera: SourceCamera::FramedU8,
482                adu_interval: num_intervals as usize,
483            },
484            Cursor::new(Vec::new()),
485        );
486
487        let mut counter = 0;
488        for y in 0..30 {
489            for x in 0..16 {
490                compressed_output
491                    .ingest_event(Event {
492                        coord: Coord { x, y, c: None },
493                        t: min(280 + counter, start_t + dt_ref * num_intervals),
494                        d: 7,
495                    })
496                    .unwrap();
497                if 280 + counter > start_t + dt_ref * num_intervals {
498                    break;
499                } else {
500                    counter += 1;
501                }
502            }
503        }
504
505        let output = compressed_output.into_writer().unwrap().into_inner();
506        assert!(!output.is_empty());
507        Ok(())
508    }
509
510    #[test]
511    fn test_compress_decompress_barely_full() -> Result<(), Box<dyn Error>> {
512        use crate::codec::compressed::stream::CompressedOutput;
513        use crate::codec::WriteCompression;
514        use crate::Coord;
515        use crate::{Event, SourceCamera, TimeMode};
516        use std::io::Cursor;
517
518        let plane = PlaneSize::new(16, 30, 1)?;
519        let start_t = 0;
520        let dt_ref = 255;
521        let num_intervals = 10;
522
523        // A random candidate pixel to check that its events match
524        let candidate_px_idx = (7, 12);
525        let mut input_px_events = Vec::new();
526        let mut output_px_events = Vec::new();
527
528        let mut compressed_output = CompressedOutput::new(
529            crate::codec::CodecMetadata {
530                codec_version: 0,
531                header_size: 0,
532                time_mode: TimeMode::AbsoluteT,
533                plane: PlaneSize {
534                    width: 16,
535                    height: 32,
536                    channels: 1,
537                },
538                tps: 7650,
539                ref_interval: dt_ref,
540                delta_t_max: dt_ref * num_intervals,
541                event_size: 0,
542                source_camera: SourceCamera::FramedU8,
543                adu_interval: num_intervals as usize,
544            },
545            Cursor::new(Vec::new()),
546        );
547
548        let mut counter = 0;
549        for y in 0..30 {
550            for x in 0..16 {
551                let event = Event {
552                    coord: Coord { x, y, c: None },
553                    t: min(280 + counter, start_t + dt_ref * num_intervals),
554                    d: 7,
555                };
556                if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
557                    input_px_events.push(event);
558                }
559                compressed_output.ingest_event(event).unwrap();
560                if 280 + counter > start_t + dt_ref * num_intervals {
561                    break;
562                } else {
563                    counter += 1;
564                }
565            }
566        }
567
568        // Ingest one more event which is in the next Adu time span
569        compressed_output
570            .ingest_event(Event {
571                coord: Coord {
572                    x: 0,
573                    y: 0,
574                    c: None,
575                },
576                t: start_t + dt_ref * num_intervals + 1,
577                d: 7,
578            })
579            .unwrap();
580        counter += 1;
581
582        // Sleep for 3 seconds to give the writer thread time to catch up
583        std::thread::sleep(std::time::Duration::from_secs(3));
584
585        let output = compressed_output.into_writer().unwrap().into_inner();
586        assert!(!output.is_empty());
587        dbg!(counter);
588        dbg!(output.len());
589        // Check that the size is less than the raw events
590        assert!((output.len() as u32) < counter * 9);
591
592        let mut compressed_input =
593            CompressedInput::new(dt_ref * num_intervals, dt_ref, num_intervals as usize);
594        compressed_input.meta.plane = plane;
595        let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
596        for i in 0..counter - 1 {
597            let event = compressed_input.digest_event(&mut stream);
598            if event.is_err() {
599                dbg!(i);
600            }
601            let event = event.unwrap();
602            if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
603                output_px_events.push(event);
604            }
605        }
606
607        assert_eq!(input_px_events, output_px_events);
608        Ok(())
609    }
610
611    #[test]
612    fn test_compress_decompress_several() -> Result<(), Box<dyn Error>> {
613        use crate::codec::compressed::stream::CompressedOutput;
614        use crate::codec::WriteCompression;
615        use crate::Coord;
616        use crate::{Event, SourceCamera, TimeMode};
617        use std::io::Cursor;
618
619        let plane = PlaneSize::new(16, 30, 1)?;
620        let dt_ref = 255;
621        let num_intervals = 5;
622
623        // A random candidate pixel to check that its events match
624        let candidate_px_idx = (7, 12);
625        let mut input_px_events = Vec::new();
626        let mut output_px_events = Vec::new();
627
628        let mut compressed_output = CompressedOutput::new(
629            crate::codec::CodecMetadata {
630                codec_version: 0,
631                header_size: 0,
632                time_mode: TimeMode::AbsoluteT,
633                plane: PlaneSize {
634                    width: 16,
635                    height: 32,
636                    channels: 1,
637                },
638                tps: 7650,
639                ref_interval: dt_ref,
640                delta_t_max: dt_ref * num_intervals as u32,
641                event_size: 0,
642                source_camera: SourceCamera::FramedU8,
643                adu_interval: num_intervals as usize,
644            },
645            Cursor::new(Vec::new()),
646        );
647
648        let mut counter = 0;
649        for _ in 0..10 {
650            for y in 0..30 {
651                for x in 0..16 {
652                    let event = Event {
653                        coord: Coord { x, y, c: None },
654                        t: 280 + counter,
655                        d: 7,
656                    };
657                    if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
658                        input_px_events.push(event);
659                    }
660                    compressed_output.ingest_event(event)?;
661
662                    counter += 1;
663                }
664            }
665        }
666
667        let output = compressed_output.into_writer().unwrap().into_inner();
668        assert!(!output.is_empty());
669        // Check that the size is less than the raw events
670        assert!((output.len() as u32) < counter * 9);
671
672        let mut compressed_input = CompressedInput::new(
673            dt_ref * num_intervals as u32,
674            dt_ref,
675            num_intervals as usize,
676        );
677        compressed_input.meta.plane = plane;
678        let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
679        for _ in 0..counter - 1 {
680            match compressed_input.digest_event(&mut stream) {
681                Ok(event) => {
682                    if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
683                        output_px_events.push(event);
684                    }
685                }
686                Err(CodecError::IoError(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
687
688                Err(e) => return Err(Box::new(e)),
689            }
690        }
691
692        assert!(input_px_events.len() >= output_px_events.len());
693        for i in 0..output_px_events.len() {
694            // Have some slack in the comparison of the T component, since there could be some slight loss here
695            let a = input_px_events[i].t - 5..input_px_events[i].t + 5;
696            let comp_t = output_px_events[i].t;
697            assert!(a.contains(&comp_t));
698            assert_eq!(input_px_events[i].d, output_px_events[i].d);
699        }
700        Ok(())
701    }
702
703    #[test]
704    fn test_compress_decompress_several_single() -> Result<(), Box<dyn Error>> {
705        use crate::codec::compressed::stream::CompressedOutput;
706        use crate::codec::WriteCompression;
707        use crate::Coord;
708        use crate::{Event, SourceCamera, TimeMode};
709        use std::io::Cursor;
710
711        let plane = PlaneSize::new(32, 16, 1)?;
712        let dt_ref = 255;
713        let num_intervals = 5;
714
715        // A random candidate pixel to check that its events match
716        let candidate_px_idx = (7, 12);
717        let mut input_px_events = Vec::new();
718        let mut output_px_events = Vec::new();
719
720        let mut compressed_output = CompressedOutput::new(
721            crate::codec::CodecMetadata {
722                codec_version: 0,
723                header_size: 0,
724                time_mode: TimeMode::AbsoluteT,
725                plane,
726                tps: 7650,
727                ref_interval: dt_ref,
728                delta_t_max: dt_ref * num_intervals as u32,
729                event_size: 0,
730                source_camera: SourceCamera::FramedU8,
731                adu_interval: num_intervals as usize,
732            },
733            Cursor::new(Vec::new()),
734        );
735
736        let mut counter = 0;
737        for i in 0..60 {
738            let event = Event {
739                coord: Coord {
740                    x: 12,
741                    y: 7,
742                    c: None,
743                },
744                t: 280 + i * 100 + counter,
745                d: 7,
746            };
747
748            input_px_events.push(event);
749
750            compressed_output.ingest_event(event)?;
751
752            counter += 1;
753        }
754
755        // MUCH LATER, integrate an event that with a timestamp far in the past:
756        let late_event = Event {
757            coord: Coord {
758                x: 19,
759                y: 14,
760                c: None,
761            },
762            t: 280,
763            d: 7,
764        };
765        compressed_output.ingest_event(late_event)?;
766
767        for i in 60..70 {
768            let event = Event {
769                coord: Coord {
770                    x: 12,
771                    y: 7,
772                    c: None,
773                },
774                t: 280 + i * 100 + counter,
775                d: 7,
776            };
777
778            input_px_events.push(event);
779
780            compressed_output.ingest_event(event)?;
781
782            counter += 1;
783        }
784
785        // Sleep for 3 seconds to give the writer thread time to catch up
786        std::thread::sleep(std::time::Duration::from_secs(3));
787
788        let output = compressed_output.into_writer().unwrap().into_inner();
789        assert!(!output.is_empty());
790        // Check that the size is less than the raw events
791
792        let mut compressed_input = CompressedInput::new(
793            dt_ref * num_intervals as u32,
794            dt_ref,
795            num_intervals as usize,
796        );
797        compressed_input.meta.plane = plane;
798        let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
799        for _ in 0..=counter {
800            match compressed_input.digest_event(&mut stream) {
801                Ok(event) => {
802                    if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
803                        output_px_events.push(event);
804                    }
805                }
806                Err(CodecError::IoError(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
807
808                Err(e) => return Err(Box::new(e)),
809            }
810        }
811
812        assert!(input_px_events.len() >= output_px_events.len());
813        for i in 0..output_px_events.len() {
814            let span = input_px_events[i].t - 5..input_px_events[i].t + 5;
815            let t = output_px_events[i].t;
816            assert!(span.contains(&t));
817        }
818        Ok(())
819    }
820
821    #[test]
822    fn test_compress_decompress_several_with_skip() -> Result<(), Box<dyn Error>> {
823        use crate::codec::compressed::stream::CompressedOutput;
824        use crate::codec::WriteCompression;
825        use crate::Coord;
826        use crate::{Event, SourceCamera, TimeMode};
827        use std::io::Cursor;
828
829        let plane = PlaneSize::new(30, 30, 1)?;
830        let dt_ref = 255;
831        let num_intervals = 10;
832
833        // A random candidate pixel to check that its events match
834        let candidate_px_idx = (7, 12);
835        let mut input_px_events = Vec::new();
836        let mut output_px_events = Vec::new();
837
838        let mut compressed_output = CompressedOutput::new(
839            crate::codec::CodecMetadata {
840                codec_version: 0,
841                header_size: 0,
842                time_mode: TimeMode::AbsoluteT,
843                plane,
844                tps: 7650,
845                ref_interval: dt_ref,
846                delta_t_max: dt_ref * num_intervals as u32,
847                event_size: 0,
848                source_camera: SourceCamera::FramedU8,
849                adu_interval: num_intervals as usize,
850            },
851            Cursor::new(Vec::new()),
852        );
853
854        let mut counter = 0;
855        for i in 0..10 {
856            for y in 0..30 {
857                for x in 0..30 {
858                    // Make the top left cube a skip cube half the time, and skip pixel (14, 14)
859                    if !(y == 14 && x == 14 || i % 3 == 0 && y >= 16 && x < 16) {
860                        let event = Event {
861                            coord: Coord { x, y, c: None },
862                            t: 280 + counter,
863                            d: 7,
864                        };
865                        if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
866                            input_px_events.push(event);
867                        }
868                        compressed_output.ingest_event(event)?;
869
870                        counter += 1;
871                    }
872                }
873            }
874        }
875
876        // MUCH LATER, integrate an event that with a timestamp far in the past:
877        let late_event = Event {
878            coord: Coord {
879                x: 14,
880                y: 14,
881                c: None,
882            },
883            t: 280,
884            d: 7,
885        };
886        compressed_output.ingest_event(late_event)?;
887
888        for i in 0..10 {
889            for y in 0..30 {
890                for x in 0..30 {
891                    // Make the top left cube a skip cube half the time, and skip pixel (14, 14)
892                    if !(y == 14 && x == 14 || i % 3 == 0 && y >= 16 && x < 16) {
893                        let event = Event {
894                            coord: Coord { x, y, c: None },
895                            t: 280 + counter,
896                            d: 7,
897                        };
898                        if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
899                            input_px_events.push(event);
900                        }
901                        compressed_output.ingest_event(event)?;
902
903                        counter += 1;
904                    }
905                }
906            }
907        }
908
909        // Sleep for 3 seconds to give the writer thread time to catch up
910        std::thread::sleep(std::time::Duration::from_secs(10));
911
912        let output = compressed_output.into_writer().unwrap().into_inner();
913        assert!(!output.is_empty());
914        // Check that the size is less than the raw events
915        assert!((output.len() as u32) < counter * 9);
916
917        let mut compressed_input = CompressedInput::new(
918            dt_ref * num_intervals as u32,
919            dt_ref,
920            num_intervals as usize,
921        );
922        compressed_input.meta.plane = plane;
923        let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
924        loop {
925            match compressed_input.digest_event(&mut stream) {
926                Ok(event) => {
927                    if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
928                        output_px_events.push(event);
929                    }
930                }
931                Err(CodecError::IoError(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
932
933                Err(e) => return Err(Box::new(e)),
934            }
935        }
936
937        assert!(input_px_events.len() >= output_px_events.len());
938        for i in 0..output_px_events.len() {
939            // Have some slack in the comparison of the T component, since there could be some slight loss here
940            let a = input_px_events[i].t - 5..input_px_events[i].t + 5;
941            let comp_t = output_px_events[i].t;
942            assert!(a.contains(&comp_t));
943            assert_eq!(input_px_events[i].d, output_px_events[i].d);
944        }
945        Ok(())
946    }
947}