1use crate::codec::{CodecError, CodecMetadata, EncoderOptions, ReadCompression, WriteCompression};
2use bitstream_io::{BigEndian, BitRead, BitReader, BitWrite, BitWriter};
3use priority_queue::PriorityQueue;
4use std::cmp::Reverse;
5use std::io::{Cursor, Read, Seek, SeekFrom, Write};
6use std::ops::{Add, AddAssign};
7use std::sync::{Arc, RwLock};
8
9use crate::codec::compressed::source_model::event_structure::event_adu::EventAdu;
10use crate::codec::compressed::source_model::HandleEvent;
11use crate::codec::header::{Magic, MAGIC_COMPRESSED};
12use crate::codec::rate_controller::CrfParameters;
13use crate::{DeltaT, Event};
14
15pub(crate) struct BytesMessage {
18 message_id: u32,
19 bytes: Vec<u8>,
20}
21
22pub struct CompressedOutput<W: Write> {
24 pub(crate) meta: CodecMetadata,
25 pub(crate) adu: EventAdu,
26 pub(crate) stream: Option<Arc<RwLock<BitWriter<W, BigEndian>>>>,
27 pub(crate) options: EncoderOptions,
28 pub(crate) written_bytes_tx: Option<std::sync::mpsc::Sender<BytesMessage>>,
30 pub(crate) last_message_sent: u32,
33
34 pub(crate) last_message_written: Arc<RwLock<u32>>,
36
37 pub(crate) _phantom: std::marker::PhantomData<W>,
38}
39
40pub struct CompressedInput<R: Read> {
42 pub(crate) meta: CodecMetadata,
43
44 adu: Option<EventAdu>,
45
46 _phantom: std::marker::PhantomData<R>,
47}
48
49fn flush_bytes_queue_worker<W: Write>(
74 mut stream: Arc<RwLock<BitWriter<W, BigEndian>>>,
75 written_bytes_rx: std::sync::mpsc::Receiver<BytesMessage>,
76 last_message_written: Arc<RwLock<u32>>,
77 mut bytes_writer_queue: PriorityQueue<Vec<u8>, Reverse<u32>>,
78) {
79 while let Ok(bytes_message) = written_bytes_rx.recv() {
80 bytes_writer_queue.push(bytes_message.bytes, Reverse(bytes_message.message_id));
84
85 let mut last_message_written = last_message_written.write().unwrap();
86 while let Some((bytes, message_id)) = bytes_writer_queue.pop() {
87 if message_id == Reverse(*last_message_written + 1) {
88 let mut stream_write = stream.write().unwrap();
89
90 stream_write
92 .write_bytes(&(bytes.len() as u32).to_be_bytes())
93 .unwrap();
94 stream_write.write_bytes(&bytes).unwrap();
95 *last_message_written += 1;
96 } else {
97 bytes_writer_queue.push(bytes, message_id); break;
99 }
100 }
101 }
102 eprintln!("Exiting writer thread...");
103}
104
105impl<W: Write + std::marker::Send + std::marker::Sync + 'static> CompressedOutput<W> {
106 pub fn new(meta: CodecMetadata, writer: W) -> Self {
108 let adu = EventAdu::new(meta.plane, 0, meta.ref_interval, meta.adu_interval as usize);
109 let (written_bytes_tx, written_bytes_rx) = std::sync::mpsc::channel();
110
111 let stream_lock = RwLock::new(BitWriter::endian(writer, BigEndian));
112 let stream_lock_arc = Arc::new(stream_lock);
113 let stream_lock_arc_clone = stream_lock_arc.clone();
114
115 let last_message_written = Arc::new(RwLock::new(0));
116 let last_message_written_clone = last_message_written.clone();
117
118 std::thread::spawn(move || {
119 flush_bytes_queue_worker(
120 stream_lock_arc_clone,
121 written_bytes_rx,
122 last_message_written_clone,
123 PriorityQueue::new(),
124 );
125 eprintln!("Exiting writer thread...");
126 });
127 Self {
128 meta,
129 adu,
130 stream: Some(stream_lock_arc),
133 options: EncoderOptions::default(meta.plane),
134 written_bytes_tx: Some(written_bytes_tx),
136 last_message_sent: 0,
138 last_message_written,
139 _phantom: Default::default(),
140 }
141 }
142
143 pub(crate) fn with_options(&mut self, options: EncoderOptions) {
145 self.options = options;
146 }
147
148 #[inline(always)]
150 pub(crate) fn stream(&mut self) -> &mut Arc<RwLock<BitWriter<W, BigEndian>>> {
151 self.stream.as_mut().unwrap()
152 }
153}
154
155impl<W: Write + std::marker::Send + std::marker::Sync + 'static + 'static + 'static>
156 WriteCompression<W> for CompressedOutput<W>
157{
158 fn magic(&self) -> Magic {
159 MAGIC_COMPRESSED
160 }
161
162 fn meta(&self) -> &CodecMetadata {
163 &self.meta
164 }
165
166 fn meta_mut(&mut self) -> &mut CodecMetadata {
167 &mut self.meta
168 }
169
170 fn write_bytes(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
171 self.stream().write().unwrap().write_bytes(bytes)
172 }
173
174 fn byte_align(&mut self) -> std::io::Result<()> {
175 self.stream().write().unwrap().byte_align()
176 }
177
178 fn into_writer(&mut self) -> Option<W> {
179 if !self.adu.skip_adu {
180 dbg!("compressing partial last adu");
204 let mut temp_stream = BitWriter::endian(Vec::new(), BigEndian);
205
206 let parameters = self.options.crf.get_parameters().clone();
207 let mut adu = self.adu.clone();
208 let tx = self.written_bytes_tx.as_ref().unwrap().clone();
209 let message_id_to_send = self.last_message_sent + 1;
212 self.last_message_sent += 1;
213
214 std::thread::spawn(move || {
215 adu.compress(&mut temp_stream, parameters.c_thresh_max).ok();
216 let written_data = temp_stream.into_writer();
217
218 tx.send(BytesMessage {
219 message_id: message_id_to_send,
220 bytes: written_data,
221 })
222 .unwrap();
223 });
224 }
226
227 while self.last_message_sent != *self.last_message_written.read().unwrap() {
229 eprintln!(
231 "Sleeping. {} messages sent, {} messages written",
232 self.last_message_sent,
233 *self.last_message_written.read().unwrap()
234 );
235 std::thread::sleep(std::time::Duration::from_secs(1));
236 }
237
238 dbg!("All ADUs written.");
239 self.written_bytes_tx = None; std::thread::sleep(std::time::Duration::from_secs(1)); let arc = self.stream.take()?;
245
246 let lock = Arc::into_inner(arc).unwrap();
247 let consumed_data = lock.into_inner().unwrap();
249 Some(consumed_data.into_writer())
252 }
257
258 fn flush_writer(&mut self) -> std::io::Result<()> {
263 self.stream().write().unwrap().flush()
264 }
265
266 fn ingest_event(&mut self, event: Event) -> Result<(), CodecError> {
267 if event.t > self.adu.start_t + (self.adu.dt_ref * self.adu.num_intervals as DeltaT) {
269 if self.stream.is_some() {
274 let mut temp_stream = BitWriter::endian(Vec::new(), BigEndian);
290
291 let parameters = self.options.crf.get_parameters().clone();
292
293 let mut adu = self.adu.clone();
296 let tx = self.written_bytes_tx.as_ref().unwrap().clone();
297 let message_id_to_send = self.last_message_sent + 1;
300 self.last_message_sent += 1;
301
302 std::thread::spawn(move || {
303 adu.compress(&mut temp_stream, parameters.c_thresh_max).ok();
304 let written_data = temp_stream.into_writer();
305
306 tx.send(BytesMessage {
307 message_id: message_id_to_send,
308 bytes: written_data,
309 })
310 .unwrap();
311 });
312
313 self.adu.clear_compression();
314 }
315 }
316
317 let _ = self.adu.ingest_event(event);
319
320 Ok(())
321 }
322 }
331
332impl<R: Read> CompressedInput<R> {
333 pub fn new(delta_t_max: DeltaT, ref_interval: DeltaT, adu_interval: usize) -> Self
335 where
336 Self: Sized,
337 {
338 Self {
339 meta: CodecMetadata {
340 codec_version: 0,
341 header_size: 0,
342 time_mode: Default::default(),
343 plane: Default::default(),
344 tps: 0,
345 ref_interval,
346 delta_t_max,
347 event_size: 0,
348 source_camera: Default::default(),
349 adu_interval,
350 },
351 adu: None,
352 _phantom: std::marker::PhantomData,
353 }
354 }
355}
356
357impl<R: Read + Seek> ReadCompression<R> for CompressedInput<R> {
358 fn magic(&self) -> Magic {
359 MAGIC_COMPRESSED
360 }
361
362 fn meta(&self) -> &CodecMetadata {
363 &self.meta
364 }
365
366 fn meta_mut(&mut self) -> &mut CodecMetadata {
367 &mut self.meta
368 }
369
370 fn read_bytes(
371 &mut self,
372 bytes: &mut [u8],
373 reader: &mut BitReader<R, BigEndian>,
374 ) -> std::io::Result<()> {
375 reader.read_bytes(bytes)
376 }
377
378 #[allow(unused_variables)]
383 fn digest_event(&mut self, reader: &mut BitReader<R, BigEndian>) -> Result<Event, CodecError> {
384 if self.adu.is_none() {
385 self.adu = Some(EventAdu::new(
386 self.meta.plane,
387 0,
388 self.meta.ref_interval,
389 self.meta.adu_interval,
390 ));
391 }
392
393 if let Some(adu) = &mut self.adu {
394 if adu.decoder_is_empty() {
395 let start = std::time::Instant::now();
396 let mut buffer = [0u8; 4];
398 reader.read_bytes(&mut buffer)?;
399 let num_bytes = u32::from_be_bytes(buffer);
400
401 let adu_bytes = reader.read_to_vec(num_bytes as usize)?;
403
404 let mut adu_stream = BitReader::endian(Cursor::new(adu_bytes), BigEndian);
406
407 adu.decompress(&mut adu_stream);
409
410 let duration = start.elapsed();
411 println!("Decompressed Adu in {:?} ns", duration.as_nanos());
412 }
413 match adu.digest_event() {
415 Ok(event) => Ok(event),
416 Err(CodecError::NoMoreEvents) => {
417 self.digest_event(reader)
419 }
420 Err(e) => Err(e),
421 }
422 } else {
423 unreachable!("Invalid state");
424 }
425 }
426
427 #[allow(unused_variables)]
428 fn set_input_stream_position(
429 &mut self,
430 reader: &mut BitReader<R, BigEndian>,
431 pos: u64,
432 ) -> Result<(), CodecError> {
433 if pos.saturating_sub(self.meta.header_size as u64) % u64::from(self.meta.event_size) != 0 {
434 eprintln!("Attempted to seek to bad position in stream: {pos}");
435 return Err(CodecError::Seek);
436 }
437
438 if reader.seek_bits(SeekFrom::Start(pos * 8)).is_err() {
439 return Err(CodecError::Seek);
440 }
441 Ok(())
442 }
443}
444
445#[cfg(test)]
446mod tests {
447 use crate::codec::compressed::stream::CompressedInput;
448 use crate::codec::{CodecError, ReadCompression};
449 use crate::PlaneSize;
450 use bitstream_io::{BigEndian, BitReader};
451 use std::cmp::min;
452 use std::error::Error;
453 use std::io;
454
455 #[test]
458 fn test_compress_empty() -> Result<(), Box<dyn Error>> {
459 use crate::codec::compressed::stream::CompressedOutput;
460 use crate::codec::WriteCompression;
461 use crate::Coord;
462 use crate::{Event, SourceCamera, TimeMode};
463 use std::io::Cursor;
464
465 let start_t = 0;
466 let dt_ref = 255;
467 let num_intervals = 10;
468
469 let mut compressed_output = CompressedOutput::new(
470 crate::codec::CodecMetadata {
471 codec_version: 0,
472 header_size: 0,
473 time_mode: TimeMode::AbsoluteT,
474 plane: PlaneSize {
475 width: 16,
476 height: 32,
477 channels: 1,
478 },
479 tps: 7650,
480 ref_interval: dt_ref,
481 delta_t_max: dt_ref * num_intervals,
482 event_size: 0,
483 source_camera: SourceCamera::FramedU8,
484 adu_interval: num_intervals as usize,
485 },
486 Cursor::new(Vec::new()),
487 );
488
489 let mut counter = 0;
490 for y in 0..30 {
491 for x in 0..16 {
492 compressed_output
493 .ingest_event(Event {
494 coord: Coord { x, y, c: None },
495 t: min(280 + counter, start_t + dt_ref * num_intervals as u32),
496 d: 7,
497 })
498 .unwrap();
499 if 280 + counter > start_t + dt_ref * num_intervals as u32 {
500 break;
501 } else {
502 counter += 1;
503 }
504 }
505 }
506
507 let output = compressed_output.into_writer().unwrap().into_inner();
508 assert!(!output.is_empty());
509 Ok(())
510 }
511
512 #[test]
513 fn test_compress_decompress_barely_full() -> Result<(), Box<dyn Error>> {
514 use crate::codec::compressed::stream::CompressedOutput;
515 use crate::codec::WriteCompression;
516 use crate::Coord;
517 use crate::{Event, SourceCamera, TimeMode};
518 use std::io::Cursor;
519
520 let plane = PlaneSize::new(16, 30, 1)?;
521 let start_t = 0;
522 let dt_ref = 255;
523 let num_intervals = 10;
524
525 let candidate_px_idx = (7, 12);
527 let mut input_px_events = Vec::new();
528 let mut output_px_events = Vec::new();
529
530 let mut compressed_output = CompressedOutput::new(
531 crate::codec::CodecMetadata {
532 codec_version: 0,
533 header_size: 0,
534 time_mode: TimeMode::AbsoluteT,
535 plane: PlaneSize {
536 width: 16,
537 height: 32,
538 channels: 1,
539 },
540 tps: 7650,
541 ref_interval: dt_ref,
542 delta_t_max: dt_ref * num_intervals,
543 event_size: 0,
544 source_camera: SourceCamera::FramedU8,
545 adu_interval: num_intervals as usize,
546 },
547 Cursor::new(Vec::new()),
548 );
549
550 let mut counter = 0;
551 for y in 0..30 {
552 for x in 0..16 {
553 let event = Event {
554 coord: Coord { x, y, c: None },
555 t: min(280 + counter, start_t + dt_ref * num_intervals as u32),
556 d: 7,
557 };
558 if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
559 input_px_events.push(event);
560 }
561 compressed_output.ingest_event(event).unwrap();
562 if 280 + counter > start_t + dt_ref * num_intervals as u32 {
563 break;
564 } else {
565 counter += 1;
566 }
567 }
568 }
569
570 compressed_output
572 .ingest_event(Event {
573 coord: Coord {
574 x: 0,
575 y: 0,
576 c: None,
577 },
578 t: start_t + dt_ref * num_intervals as u32 + 1,
579 d: 7,
580 })
581 .unwrap();
582 counter += 1;
583
584 std::thread::sleep(std::time::Duration::from_secs(3));
586
587 let output = compressed_output.into_writer().unwrap().into_inner();
588 assert!(!output.is_empty());
589 dbg!(counter);
590 dbg!(output.len());
591 assert!((output.len() as u32) < counter * 9);
593
594 let mut compressed_input = CompressedInput::new(
595 dt_ref * num_intervals as u32,
596 dt_ref,
597 num_intervals as usize,
598 );
599 compressed_input.meta.plane = plane;
600 let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
601 for i in 0..counter - 1 {
602 let event = compressed_input.digest_event(&mut stream);
603 if event.is_err() {
604 dbg!(i);
605 }
606 let event = event.unwrap();
607 if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
608 output_px_events.push(event);
609 }
610 }
611
612 assert_eq!(input_px_events, output_px_events);
613 Ok(())
614 }
615
616 #[test]
617 fn test_compress_decompress_several() -> Result<(), Box<dyn Error>> {
618 use crate::codec::compressed::stream::CompressedOutput;
619 use crate::codec::WriteCompression;
620 use crate::Coord;
621 use crate::{Event, SourceCamera, TimeMode};
622 use std::io::Cursor;
623
624 let plane = PlaneSize::new(16, 30, 1)?;
625 let dt_ref = 255;
626 let num_intervals = 5;
627
628 let candidate_px_idx = (7, 12);
630 let mut input_px_events = Vec::new();
631 let mut output_px_events = Vec::new();
632
633 let mut compressed_output = CompressedOutput::new(
634 crate::codec::CodecMetadata {
635 codec_version: 0,
636 header_size: 0,
637 time_mode: TimeMode::AbsoluteT,
638 plane: PlaneSize {
639 width: 16,
640 height: 32,
641 channels: 1,
642 },
643 tps: 7650,
644 ref_interval: dt_ref,
645 delta_t_max: dt_ref * num_intervals as u32,
646 event_size: 0,
647 source_camera: SourceCamera::FramedU8,
648 adu_interval: num_intervals as usize,
649 },
650 Cursor::new(Vec::new()),
651 );
652
653 let mut counter = 0;
654 for _ in 0..10 {
655 for y in 0..30 {
656 for x in 0..16 {
657 let event = Event {
658 coord: Coord { x, y, c: None },
659 t: 280 + counter,
660 d: 7,
661 };
662 if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
663 input_px_events.push(event);
664 }
665 compressed_output.ingest_event(event)?;
666
667 counter += 1;
668 }
669 }
670 }
671
672 let output = compressed_output.into_writer().unwrap().into_inner();
673 assert!(!output.is_empty());
674 assert!((output.len() as u32) < counter * 9);
676
677 let mut compressed_input = CompressedInput::new(
678 dt_ref * num_intervals as u32,
679 dt_ref,
680 num_intervals as usize,
681 );
682 compressed_input.meta.plane = plane;
683 let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
684 for _ in 0..counter - 1 {
685 match compressed_input.digest_event(&mut stream) {
686 Ok(event) => {
687 if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
688 output_px_events.push(event);
689 }
690 }
691 Err(CodecError::IoError(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
692
693 Err(e) => return Err(Box::new(e)),
694 }
695 }
696
697 assert!(input_px_events.len() >= output_px_events.len());
698 for i in 0..output_px_events.len() {
699 let a = input_px_events[i].t - 5..input_px_events[i].t + 5;
701 let comp_t = output_px_events[i].t;
702 assert!(a.contains(&comp_t));
703 assert_eq!(input_px_events[i].d, output_px_events[i].d);
704 }
705 Ok(())
706 }
707
708 #[test]
709 fn test_compress_decompress_several_single() -> Result<(), Box<dyn Error>> {
710 use crate::codec::compressed::stream::CompressedOutput;
711 use crate::codec::WriteCompression;
712 use crate::Coord;
713 use crate::{Event, SourceCamera, TimeMode};
714 use std::io::Cursor;
715
716 let plane = PlaneSize::new(32, 16, 1)?;
717 let dt_ref = 255;
718 let num_intervals = 5;
719
720 let candidate_px_idx = (7, 12);
722 let mut input_px_events = Vec::new();
723 let mut output_px_events = Vec::new();
724
725 let mut compressed_output = CompressedOutput::new(
726 crate::codec::CodecMetadata {
727 codec_version: 0,
728 header_size: 0,
729 time_mode: TimeMode::AbsoluteT,
730 plane,
731 tps: 7650,
732 ref_interval: dt_ref,
733 delta_t_max: dt_ref * num_intervals as u32,
734 event_size: 0,
735 source_camera: SourceCamera::FramedU8,
736 adu_interval: num_intervals as usize,
737 },
738 Cursor::new(Vec::new()),
739 );
740
741 let mut counter = 0;
742 for i in 0..60 {
743 let event = Event {
744 coord: Coord {
745 x: 12,
746 y: 7,
747 c: None,
748 },
749 t: 280 + i * 100 + counter,
750 d: 7,
751 };
752
753 input_px_events.push(event);
754
755 compressed_output.ingest_event(event)?;
756
757 counter += 1;
758 }
759
760 let late_event = Event {
762 coord: Coord {
763 x: 19,
764 y: 14,
765 c: None,
766 },
767 t: 280,
768 d: 7,
769 };
770 compressed_output.ingest_event(late_event)?;
771
772 for i in 60..70 {
773 let event = Event {
774 coord: Coord {
775 x: 12,
776 y: 7,
777 c: None,
778 },
779 t: 280 + i * 100 + counter,
780 d: 7,
781 };
782
783 input_px_events.push(event);
784
785 compressed_output.ingest_event(event)?;
786
787 counter += 1;
788 }
789
790 std::thread::sleep(std::time::Duration::from_secs(3));
792
793 let output = compressed_output.into_writer().unwrap().into_inner();
794 assert!(!output.is_empty());
795 let mut compressed_input = CompressedInput::new(
798 dt_ref * num_intervals as u32,
799 dt_ref,
800 num_intervals as usize,
801 );
802 compressed_input.meta.plane = plane;
803 let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
804 for _ in 0..counter + 1 {
805 match compressed_input.digest_event(&mut stream) {
806 Ok(event) => {
807 if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
808 output_px_events.push(event);
809 }
810 }
811 Err(CodecError::IoError(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
812
813 Err(e) => return Err(Box::new(e)),
814 }
815 }
816
817 assert!(input_px_events.len() >= output_px_events.len());
818 for i in 0..output_px_events.len() {
819 let span = input_px_events[i].t - 5..input_px_events[i].t + 5;
820 let t = output_px_events[i].t;
821 assert!(span.contains(&t));
822 }
823 Ok(())
824 }
825
826 #[test]
827 fn test_compress_decompress_several_with_skip() -> Result<(), Box<dyn Error>> {
828 use crate::codec::compressed::stream::CompressedOutput;
829 use crate::codec::WriteCompression;
830 use crate::Coord;
831 use crate::{Event, SourceCamera, TimeMode};
832 use std::io::Cursor;
833
834 let plane = PlaneSize::new(30, 30, 1)?;
835 let dt_ref = 255;
836 let num_intervals = 10;
837
838 let candidate_px_idx = (7, 12);
840 let mut input_px_events = Vec::new();
841 let mut output_px_events = Vec::new();
842
843 let mut compressed_output = CompressedOutput::new(
844 crate::codec::CodecMetadata {
845 codec_version: 0,
846 header_size: 0,
847 time_mode: TimeMode::AbsoluteT,
848 plane,
849 tps: 7650,
850 ref_interval: dt_ref,
851 delta_t_max: dt_ref * num_intervals as u32,
852 event_size: 0,
853 source_camera: SourceCamera::FramedU8,
854 adu_interval: num_intervals as usize,
855 },
856 Cursor::new(Vec::new()),
857 );
858
859 let mut counter = 0;
860 for i in 0..10 {
861 for y in 0..30 {
862 for x in 0..30 {
863 if !(y == 14 && x == 14 || i % 3 == 0 && y >= 16 && x < 16) {
865 let event = Event {
866 coord: Coord { x, y, c: None },
867 t: 280 + counter,
868 d: 7,
869 };
870 if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
871 input_px_events.push(event);
872 }
873 compressed_output.ingest_event(event)?;
874
875 counter += 1;
876 }
877 }
878 }
879 }
880
881 let late_event = Event {
883 coord: Coord {
884 x: 14,
885 y: 14,
886 c: None,
887 },
888 t: 280,
889 d: 7,
890 };
891 compressed_output.ingest_event(late_event)?;
892
893 for i in 0..10 {
894 for y in 0..30 {
895 for x in 0..30 {
896 if !(y == 14 && x == 14 || i % 3 == 0 && y >= 16 && x < 16) {
898 let event = Event {
899 coord: Coord { x, y, c: None },
900 t: 280 + counter,
901 d: 7,
902 };
903 if y == candidate_px_idx.0 && x == candidate_px_idx.1 {
904 input_px_events.push(event);
905 }
906 compressed_output.ingest_event(event)?;
907
908 counter += 1;
909 }
910 }
911 }
912 }
913
914 std::thread::sleep(std::time::Duration::from_secs(10));
916
917 let output = compressed_output.into_writer().unwrap().into_inner();
918 assert!(!output.is_empty());
919 assert!((output.len() as u32) < counter * 9);
921
922 let mut compressed_input = CompressedInput::new(
923 dt_ref * num_intervals as u32,
924 dt_ref,
925 num_intervals as usize,
926 );
927 compressed_input.meta.plane = plane;
928 let mut stream = BitReader::endian(Cursor::new(output), BigEndian);
929 loop {
930 match compressed_input.digest_event(&mut stream) {
931 Ok(event) => {
932 if event.coord.y == candidate_px_idx.0 && event.coord.x == candidate_px_idx.1 {
933 output_px_events.push(event);
934 }
935 }
936 Err(CodecError::IoError(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,
937
938 Err(e) => return Err(Box::new(e)),
939 }
940 }
941
942 assert!(input_px_events.len() >= output_px_events.len());
943 for i in 0..output_px_events.len() {
944 let a = input_px_events[i].t - 5..input_px_events[i].t + 5;
946 let comp_t = output_px_events[i].t;
947 assert!(a.contains(&comp_t));
948 assert_eq!(input_px_events[i].d, output_px_events[i].d);
949 }
950 Ok(())
951 }
952}