1use crate::codec::{CodecError, CodecMetadata, EncoderOptions, ReadCompression, WriteCompression};
2use bitstream_io::{BigEndian, BitRead, BitReader, BitWrite, BitWriter};
3use priority_queue::PriorityQueue;
4use std::cmp::Reverse;
5use std::io::{Cursor, Read, Seek, SeekFrom, Write};
6use std::sync::{Arc, RwLock};
7
8use crate::codec::compressed::source_model::event_structure::event_adu::EventAdu;
9use crate::codec::compressed::source_model::HandleEvent;
10use crate::codec::header::{Magic, MAGIC_COMPRESSED};
11use crate::{DeltaT, Event};
12
/// A compressed payload travelling from a compression thread to the writer thread.
pub(crate) struct BytesMessage {
    /// Sequence number of the payload; the writer thread emits payloads in increasing id order.
    message_id: u32,

    /// The compressed bytes to append to the output stream.
    bytes: Vec<u8>,
}
19
20pub struct CompressedOutput<W: Write> {
22 pub(crate) meta: CodecMetadata,
23 pub(crate) adu: EventAdu,
24 pub(crate) stream: Option<Arc<RwLock<BitWriter<W, BigEndian>>>>,
25 pub(crate) options: EncoderOptions,
26 pub(crate) written_bytes_tx: Option<std::sync::mpsc::Sender<BytesMessage>>,
28 pub(crate) last_message_sent: u32,
31
32 pub(crate) last_message_written: Arc<RwLock<u32>>,
34
35 pub(crate) _phantom: std::marker::PhantomData<W>,
36}
37
38pub struct CompressedInput<R: Read> {
40 pub(crate) meta: CodecMetadata,
41
42 adu: Option<EventAdu>,
43
44 _phantom: std::marker::PhantomData<R>,
45}
46
47fn flush_bytes_queue_worker<W: Write>(
72 stream: Arc<RwLock<BitWriter<W, BigEndian>>>,
73 written_bytes_rx: std::sync::mpsc::Receiver<BytesMessage>,
74 last_message_written: Arc<RwLock<u32>>,
75 mut bytes_writer_queue: PriorityQueue<Vec<u8>, Reverse<u32>>,
76) {
77 while let Ok(bytes_message) = written_bytes_rx.recv() {
78 bytes_writer_queue.push(bytes_message.bytes, Reverse(bytes_message.message_id));
82
83 let mut last_message_written = last_message_written.write().unwrap();
84 while let Some((bytes, message_id)) = bytes_writer_queue.pop() {
85 if message_id == Reverse(*last_message_written + 1) {
86 let mut stream_write = stream.write().unwrap();
87
88 stream_write
90 .write_bytes(&(bytes.len() as u32).to_be_bytes())
91 .unwrap();
92 stream_write.write_bytes(&bytes).unwrap();
93 *last_message_written += 1;
94 } else {
95 bytes_writer_queue.push(bytes, message_id); break;
97 }
98 }
99 }
100 eprintln!("Exiting writer thread...");
101}
102
103impl<W: Write + std::marker::Send + std::marker::Sync + 'static> CompressedOutput<W> {
104 pub fn new(meta: CodecMetadata, writer: W) -> Self {
106 let adu = EventAdu::new(meta.plane, 0, meta.ref_interval, meta.adu_interval);
107 let (written_bytes_tx, written_bytes_rx) = std::sync::mpsc::channel();
108
109 let stream_lock = RwLock::new(BitWriter::endian(writer, BigEndian));
110 let stream_lock_arc = Arc::new(stream_lock);
111 let stream_lock_arc_clone = stream_lock_arc.clone();
112
113 let last_message_written = Arc::new(RwLock::new(0));
114 let last_message_written_clone = last_message_written.clone();
115
116 std::thread::spawn(move || {
117 flush_bytes_queue_worker(
118 stream_lock_arc_clone,
119 written_bytes_rx,
120 last_message_written_clone,
121 PriorityQueue::new(),
122 );
123 eprintln!("Exiting writer thread...");
124 });
125 Self {
126 meta,
127 adu,
128 stream: Some(stream_lock_arc),
131 options: EncoderOptions::default(meta.plane),
132 written_bytes_tx: Some(written_bytes_tx),
134 last_message_sent: 0,
136 last_message_written,
137 _phantom: Default::default(),
138 }
139 }
140
141 pub(crate) fn with_options(&mut self, options: EncoderOptions) {
143 self.options = options;
144 }
145
146 #[inline(always)]
148 pub(crate) fn stream(&mut self) -> &mut Arc<RwLock<BitWriter<W, BigEndian>>> {
149 self.stream.as_mut().unwrap()
150 }
151}
152
153impl<W: Write + std::marker::Send + std::marker::Sync + 'static + 'static + 'static>
154 WriteCompression<W> for CompressedOutput<W>
155{
156 fn magic(&self) -> Magic {
157 MAGIC_COMPRESSED
158 }
159
160 fn meta(&self) -> &CodecMetadata {
161 &self.meta
162 }
163
164 fn meta_mut(&mut self) -> &mut CodecMetadata {
165 &mut self.meta
166 }
167
168 fn write_bytes(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
169 self.stream().write().unwrap().write_bytes(bytes)
170 }
171
172 fn byte_align(&mut self) -> std::io::Result<()> {
173 self.stream().write().unwrap().byte_align()
174 }
175
176 fn into_writer(mut self) -> Option<W> {
177 if !self.adu.skip_adu {
178 dbg!("compressing partial last adu");
202 let mut temp_stream = BitWriter::endian(Vec::new(), BigEndian);
203
204 let parameters = *self.options.crf.get_parameters();
205 let mut adu = self.adu.clone();
206 let tx = self.written_bytes_tx.as_ref().unwrap().clone();
207 let message_id_to_send = self.last_message_sent + 1;
210 self.last_message_sent += 1;
211
212 std::thread::spawn(move || {
213 adu.compress(&mut temp_stream, parameters.c_thresh_max).ok();
214 let written_data = temp_stream.into_writer();
215
216 tx.send(BytesMessage {
217 message_id: message_id_to_send,
218 bytes: written_data,
219 })
220 .unwrap();
221 });
222 }
224
225 while self.last_message_sent != *self.last_message_written.read().unwrap() {
227 eprintln!(
229 "Sleeping. {} messages sent, {} messages written",
230 self.last_message_sent,
231 *self.last_message_written.read().unwrap()
232 );
233 std::thread::sleep(std::time::Duration::from_secs(1));
234 }
235
236 dbg!("All ADUs written.");
237 self.written_bytes_tx = None; std::thread::sleep(std::time::Duration::from_secs(1)); let arc = self.stream.take()?;
243
244 let lock = Arc::into_inner(arc).unwrap();
245 let consumed_data = lock.into_inner().unwrap();
247 Some(consumed_data.into_writer())
250 }
255
256 fn flush_writer(&mut self) -> std::io::Result<()> {
261 self.stream().write().unwrap().flush()
262 }
263
264 fn ingest_event(&mut self, event: Event) -> Result<(), CodecError> {
265 if event.t > self.adu.start_t + (self.adu.dt_ref * self.adu.num_intervals as DeltaT) {
267 if self.stream.is_some() {
272 let mut temp_stream = BitWriter::endian(Vec::new(), BigEndian);
288
289 let parameters = *self.options.crf.get_parameters();
290
291 let mut adu = self.adu.clone();
294 let tx = self.written_bytes_tx.as_ref().unwrap().clone();
295 let message_id_to_send = self.last_message_sent + 1;
298 self.last_message_sent += 1;
299
300 std::thread::spawn(move || {
301 adu.compress(&mut temp_stream, parameters.c_thresh_max).ok();
302 let written_data = temp_stream.into_writer();
303
304 tx.send(BytesMessage {
305 message_id: message_id_to_send,
306 bytes: written_data,
307 })
308 .unwrap();
309 });
310
311 self.adu.clear_compression();
312 }
313 }
314
315 let _ = self.adu.ingest_event(event);
317
318 Ok(())
319 }
320 }
329
330impl<R: Read> CompressedInput<R> {
331 pub fn new(delta_t_max: DeltaT, ref_interval: DeltaT, adu_interval: usize) -> Self
333 where
334 Self: Sized,
335 {
336 Self {
337 meta: CodecMetadata {
338 codec_version: 0,
339 header_size: 0,
340 time_mode: Default::default(),
341 plane: Default::default(),
342 tps: 0,
343 ref_interval,
344 delta_t_max,
345 event_size: 0,
346 source_camera: Default::default(),
347 adu_interval,
348 },
349 adu: None,
350 _phantom: std::marker::PhantomData,
351 }
352 }
353}
354
355impl<R: Read + Seek> ReadCompression<R> for CompressedInput<R> {
356 fn magic(&self) -> Magic {
357 MAGIC_COMPRESSED
358 }
359
360 fn meta(&self) -> &CodecMetadata {
361 &self.meta
362 }
363
364 fn meta_mut(&mut self) -> &mut CodecMetadata {
365 &mut self.meta
366 }
367
368 fn read_bytes(
369 &mut self,
370 bytes: &mut [u8],
371 reader: &mut BitReader<R, BigEndian>,
372 ) -> std::io::Result<()> {
373 reader.read_bytes(bytes)
374 }
375
376 #[allow(unused_variables)]
381 fn digest_event(&mut self, reader: &mut BitReader<R, BigEndian>) -> Result<Event, CodecError> {
382 if self.adu.is_none() {
383 self.adu = Some(EventAdu::new(
384 self.meta.plane,
385 0,
386 self.meta.ref_interval,
387 self.meta.adu_interval,
388 ));
389 }
390
391 if let Some(adu) = &mut self.adu {
392 if adu.decoder_is_empty() {
393 let start = std::time::Instant::now();
394 let mut buffer = [0u8; 4];
396 reader.read_bytes(&mut buffer)?;
397 let num_bytes = u32::from_be_bytes(buffer);
398
399 let adu_bytes = reader.read_to_vec(num_bytes as usize)?;
401
402 let mut adu_stream = BitReader::endian(Cursor::new(adu_bytes), BigEndian);
404
405 adu.decompress(&mut adu_stream);
407
408 let duration = start.elapsed();
409 println!("Decompressed Adu in {:?} ns", duration.as_nanos());
410 }
411 match adu.digest_event() {
413 Ok(event) => Ok(event),
414 Err(CodecError::NoMoreEvents) => {
415 self.digest_event(reader)
417 }
418 Err(e) => Err(e),
419 }
420 } else {
421 unreachable!("Invalid state");
422 }
423 }
424
425 #[allow(unused_variables)]
426 fn set_input_stream_position(
427 &mut self,
428 reader: &mut BitReader<R, BigEndian>,
429 pos: u64,
430 ) -> Result<(), CodecError> {
431 if pos.saturating_sub(self.meta.header_size as u64) % u64::from(self.meta.event_size) != 0 {
432 eprintln!("Attempted to seek to bad position in stream: {pos}");
433 return Err(CodecError::Seek);
434 }
435
436 if reader.seek_bits(SeekFrom::Start(pos * 8)).is_err() {
437 return Err(CodecError::Seek);
438 }
439 Ok(())
440 }
441}
442
#[cfg(test)]
mod tests {
    use crate::codec::compressed::stream::{CompressedInput, CompressedOutput};
    use crate::codec::{CodecError, CodecMetadata, ReadCompression, WriteCompression};
    use crate::{Coord, Event, PlaneSize, SourceCamera, TimeMode};
    use bitstream_io::{BigEndian, BitReader};
    use std::cmp::min;
    use std::error::Error;
    use std::io::{self, Cursor};

    /// In-memory byte buffer used as both the encoder's sink and the decoder's source.
    type ByteCursor = Cursor<Vec<u8>>;

    /// The 16x32 single-channel plane that several tests hand to the encoder.
    fn encoder_plane_16x32() -> PlaneSize {
        PlaneSize {
            width: 16,
            height: 32,
            channels: 1,
        }
    }

    /// Builds an encoder over an empty in-memory buffer with the shared test metadata.
    fn new_encoder(
        plane: PlaneSize,
        dt_ref: u32,
        num_intervals: u32,
    ) -> CompressedOutput<ByteCursor> {
        CompressedOutput::new(
            CodecMetadata {
                codec_version: 0,
                header_size: 0,
                time_mode: TimeMode::AbsoluteT,
                plane,
                tps: 7650,
                ref_interval: dt_ref,
                delta_t_max: dt_ref * num_intervals,
                event_size: 0,
                source_camera: SourceCamera::FramedU8,
                adu_interval: num_intervals as usize,
            },
            Cursor::new(Vec::new()),
        )
    }

    /// Builds a decoder for `plane` together with a bit reader over the encoded bytes.
    fn new_decoder(
        plane: PlaneSize,
        dt_ref: u32,
        num_intervals: u32,
        encoded: Vec<u8>,
    ) -> (
        CompressedInput<ByteCursor>,
        BitReader<ByteCursor, BigEndian>,
    ) {
        let mut compressed_input =
            CompressedInput::new(dt_ref * num_intervals, dt_ref, num_intervals as usize);
        compressed_input.meta.plane = plane;
        let stream = BitReader::endian(Cursor::new(encoded), BigEndian);
        (compressed_input, stream)
    }

    /// Whether `event` belongs to the pixel (row 7, column 12) tracked by the round-trip tests.
    fn is_candidate(event: &Event) -> bool {
        event.coord.y == 7 && event.coord.x == 12
    }

    /// Decodes events until the stream is exhausted or `max_events` decode attempts have been
    /// made, returning the events of the tracked pixel.
    fn decode_candidates(
        compressed_input: &mut CompressedInput<ByteCursor>,
        stream: &mut BitReader<ByteCursor, BigEndian>,
        max_events: Option<u32>,
    ) -> Result<Vec<Event>, Box<dyn Error>> {
        let mut candidates = Vec::new();
        let mut attempts = 0u32;
        while max_events.map_or(true, |limit| attempts < limit) {
            match compressed_input.digest_event(stream) {
                Ok(event) => {
                    if is_candidate(&event) {
                        candidates.push(event);
                    }
                }
                Err(CodecError::IoError(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break,

                Err(e) => return Err(Box::new(e)),
            }
            attempts += 1;
        }
        Ok(candidates)
    }

    /// Asserts that each decoded event lies within the tolerance window of the matching input
    /// event, optionally also comparing the `d` values.
    fn assert_events_close(input: &[Event], output: &[Event], compare_d: bool) {
        assert!(input.len() >= output.len());
        for (original, decoded) in input.iter().zip(output) {
            let window = original.t - 5..original.t + 5;
            assert!(window.contains(&decoded.t));
            if compare_d {
                assert_eq!(original.d, decoded.d);
            }
        }
    }

    #[test]
    fn test_compress_empty() -> Result<(), Box<dyn Error>> {
        let (start_t, dt_ref, num_intervals) = (0, 255, 10);
        let adu_end_t = start_t + dt_ref * num_intervals;

        let mut compressed_output = new_encoder(encoder_plane_16x32(), dt_ref, num_intervals);

        let mut counter = 0;
        for y in 0..30 {
            for x in 0..16 {
                let t = min(280 + counter, adu_end_t);
                compressed_output
                    .ingest_event(Event {
                        coord: Coord { x, y, c: None },
                        t,
                        d: 7,
                    })
                    .unwrap();
                if 280 + counter > adu_end_t {
                    break;
                }
                counter += 1;
            }
        }

        let output = compressed_output.into_writer().unwrap().into_inner();
        assert!(!output.is_empty());
        Ok(())
    }

    #[test]
    fn test_compress_decompress_barely_full() -> Result<(), Box<dyn Error>> {
        let plane = PlaneSize::new(16, 30, 1)?;
        let (start_t, dt_ref, num_intervals) = (0, 255, 10);
        let adu_end_t = start_t + dt_ref * num_intervals;

        let mut input_px_events = Vec::new();
        let mut output_px_events = Vec::new();

        let mut compressed_output = new_encoder(encoder_plane_16x32(), dt_ref, num_intervals);

        let mut counter = 0;
        for y in 0..30 {
            for x in 0..16 {
                let event = Event {
                    coord: Coord { x, y, c: None },
                    t: min(280 + counter, adu_end_t),
                    d: 7,
                };
                if is_candidate(&event) {
                    input_px_events.push(event);
                }
                compressed_output.ingest_event(event).unwrap();
                if 280 + counter > adu_end_t {
                    break;
                }
                counter += 1;
            }
        }

        // One event just past the end of the ADU forces the first ADU to be compressed.
        compressed_output
            .ingest_event(Event {
                coord: Coord {
                    x: 0,
                    y: 0,
                    c: None,
                },
                t: adu_end_t + 1,
                d: 7,
            })
            .unwrap();
        counter += 1;

        std::thread::sleep(std::time::Duration::from_secs(3));

        let output = compressed_output.into_writer().unwrap().into_inner();
        assert!(!output.is_empty());
        dbg!(counter);
        dbg!(output.len());
        assert!((output.len() as u32) < counter * 9);

        let (mut compressed_input, mut stream) =
            new_decoder(plane, dt_ref, num_intervals, output);
        for i in 0..counter - 1 {
            let result = compressed_input.digest_event(&mut stream);
            if result.is_err() {
                dbg!(i);
            }
            let event = result.unwrap();
            if is_candidate(&event) {
                output_px_events.push(event);
            }
        }

        assert_eq!(input_px_events, output_px_events);
        Ok(())
    }

    #[test]
    fn test_compress_decompress_several() -> Result<(), Box<dyn Error>> {
        let plane = PlaneSize::new(16, 30, 1)?;
        let (dt_ref, num_intervals) = (255, 5);

        let mut input_px_events = Vec::new();

        let mut compressed_output = new_encoder(encoder_plane_16x32(), dt_ref, num_intervals);

        let mut counter = 0;
        for _ in 0..10 {
            for y in 0..30 {
                for x in 0..16 {
                    let event = Event {
                        coord: Coord { x, y, c: None },
                        t: 280 + counter,
                        d: 7,
                    };
                    if is_candidate(&event) {
                        input_px_events.push(event);
                    }
                    compressed_output.ingest_event(event)?;

                    counter += 1;
                }
            }
        }

        let output = compressed_output.into_writer().unwrap().into_inner();
        assert!(!output.is_empty());
        assert!((output.len() as u32) < counter * 9);

        let (mut compressed_input, mut stream) =
            new_decoder(plane, dt_ref, num_intervals, output);
        let output_px_events =
            decode_candidates(&mut compressed_input, &mut stream, Some(counter - 1))?;

        assert_events_close(&input_px_events, &output_px_events, true);
        Ok(())
    }

    #[test]
    fn test_compress_decompress_several_single() -> Result<(), Box<dyn Error>> {
        let plane = PlaneSize::new(32, 16, 1)?;
        let (dt_ref, num_intervals) = (255, 5);

        let mut input_px_events = Vec::new();

        let mut compressed_output = new_encoder(plane, dt_ref, num_intervals);

        let mut counter = 0;
        for i in 0..70 {
            // After the first 60 events, slip in one event whose timestamp is far in the past.
            if i == 60 {
                let late_event = Event {
                    coord: Coord {
                        x: 19,
                        y: 14,
                        c: None,
                    },
                    t: 280,
                    d: 7,
                };
                compressed_output.ingest_event(late_event)?;
            }

            let event = Event {
                coord: Coord {
                    x: 12,
                    y: 7,
                    c: None,
                },
                t: 280 + i * 100 + counter,
                d: 7,
            };

            input_px_events.push(event);

            compressed_output.ingest_event(event)?;

            counter += 1;
        }

        std::thread::sleep(std::time::Duration::from_secs(3));

        let output = compressed_output.into_writer().unwrap().into_inner();
        assert!(!output.is_empty());

        let (mut compressed_input, mut stream) =
            new_decoder(plane, dt_ref, num_intervals, output);
        let output_px_events =
            decode_candidates(&mut compressed_input, &mut stream, Some(counter + 1))?;

        assert_events_close(&input_px_events, &output_px_events, false);
        Ok(())
    }

    #[test]
    fn test_compress_decompress_several_with_skip() -> Result<(), Box<dyn Error>> {
        let plane = PlaneSize::new(30, 30, 1)?;
        let (dt_ref, num_intervals) = (255, 10);

        let mut input_px_events = Vec::new();

        let mut compressed_output = new_encoder(plane, dt_ref, num_intervals);

        let mut counter = 0;
        for pass in 0..2 {
            // Between the two passes, ingest one event whose timestamp is far in the past.
            if pass == 1 {
                let late_event = Event {
                    coord: Coord {
                        x: 14,
                        y: 14,
                        c: None,
                    },
                    t: 280,
                    d: 7,
                };
                compressed_output.ingest_event(late_event)?;
            }

            for i in 0..10 {
                for y in 0..30 {
                    for x in 0..30 {
                        let skipped =
                            (y == 14 && x == 14) || (i % 3 == 0 && y >= 16 && x < 16);
                        if skipped {
                            continue;
                        }

                        let event = Event {
                            coord: Coord { x, y, c: None },
                            t: 280 + counter,
                            d: 7,
                        };
                        if is_candidate(&event) {
                            input_px_events.push(event);
                        }
                        compressed_output.ingest_event(event)?;

                        counter += 1;
                    }
                }
            }
        }

        std::thread::sleep(std::time::Duration::from_secs(10));

        let output = compressed_output.into_writer().unwrap().into_inner();
        assert!(!output.is_empty());
        assert!((output.len() as u32) < counter * 9);

        let (mut compressed_input, mut stream) =
            new_decoder(plane, dt_ref, num_intervals, output);
        let output_px_events = decode_candidates(&mut compressed_input, &mut stream, None)?;

        assert_events_close(&input_px_events, &output_px_events, true);
        Ok(())
    }
}