adder_codec_core/codec/raw/
stream.rs

1// #[cfg(feature = "compression")]
2// use crate::codec::compressed::adu::frame::Adu;
3use crate::codec::header::{Magic, MAGIC_RAW};
4use crate::codec::{CodecError, CodecMetadata, ReadCompression, WriteCompression};
5use crate::{Coord, Event, EventSingle, EOF_PX_ADDRESS};
6use bincode::config::{FixintEncoding, WithOtherEndian, WithOtherIntEncoding};
7use bincode::{DefaultOptions, Options};
8use bitstream_io::{BigEndian, BitRead, BitReader};
9use std::io::{Read, Seek, SeekFrom, Write};
10
11/// Write uncompressed (raw) ADΔER data to a stream.
12pub struct RawOutput<W> {
13    pub(crate) meta: CodecMetadata,
14    pub(crate) bincode: WithOtherEndian<
15        WithOtherIntEncoding<DefaultOptions, FixintEncoding>,
16        bincode::config::BigEndian,
17    >,
18    pub(crate) stream: Option<W>,
19}
20
21/// Read uncompressed (raw) ADΔER data from a stream.
22pub struct RawInput<R: Read + Seek> {
23    pub(crate) meta: CodecMetadata,
24    pub(crate) bincode: WithOtherEndian<
25        WithOtherIntEncoding<DefaultOptions, FixintEncoding>,
26        bincode::config::BigEndian,
27    >,
28    _phantom: std::marker::PhantomData<R>,
29}
30
31impl<W: Write> RawOutput<W> {
32    /// Create a new raw output stream.
33    pub fn new(mut meta: CodecMetadata, writer: W) -> Self {
34        let bincode = DefaultOptions::new()
35            .with_fixint_encoding()
36            .with_big_endian();
37        meta.event_size = match meta.plane.c() {
38            1 => bincode.serialized_size(&EventSingle::default()).unwrap() as u8,
39            _ => bincode.serialized_size(&Event::default()).unwrap() as u8,
40        };
41        Self {
42            meta,
43            bincode,
44            stream: Some(writer),
45        }
46    }
47
48    fn stream(&mut self) -> &mut W {
49        self.stream.as_mut().unwrap()
50    }
51}
52
53impl<W: Write + std::marker::Send + std::marker::Sync + 'static> WriteCompression<W>
54    for RawOutput<W>
55{
56    fn magic(&self) -> Magic {
57        MAGIC_RAW
58    }
59
60    fn meta(&self) -> &CodecMetadata {
61        &self.meta
62    }
63
64    fn meta_mut(&mut self) -> &mut CodecMetadata {
65        &mut self.meta
66    }
67
68    fn write_bytes(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
69        // Silently ignore the returned usize because we don't care about the number of bytes
70        self.stream().write(bytes).map(|_| ())
71    }
72
73    // Will always be byte-aligned. Do nothing.
74    fn byte_align(&mut self) -> std::io::Result<()> {
75        Ok(())
76    }
77
78    // If `self.writer` is a `BufWriter`, you'll need to flush it yourself after this.
79    fn into_writer(mut self) -> Option<W> {
80        let eof = Event {
81            coord: Coord {
82                x: EOF_PX_ADDRESS,
83                y: EOF_PX_ADDRESS,
84                c: Some(0),
85            },
86            d: 0,
87            t: 0,
88        };
89        self.bincode.serialize_into(self.stream(), &eof).unwrap();
90        self.flush_writer().unwrap();
91        self.stream.take()
92    }
93
94    fn flush_writer(&mut self) -> std::io::Result<()> {
95        self.stream().flush()
96    }
97
98    /// Ingest an event into the codec.
99    ///
100    /// This will always write the event immediately to the underlying writer.
101    fn ingest_event(&mut self, event: Event) -> Result<(), CodecError> {
102        // NOTE: for speed, the following checks only run in debug builds. It's entirely
103        // possibly to encode nonsensical events if you want to.
104        debug_assert!(event.coord.x < self.meta.plane.width || event.coord.x == EOF_PX_ADDRESS);
105        debug_assert!(event.coord.y < self.meta.plane.height || event.coord.y == EOF_PX_ADDRESS);
106
107        // TODO: Switch functionality based on what the deltat mode is!
108
109        let output_event: EventSingle;
110        if self.meta.plane.channels == 1 {
111            // let event_to_write = self.queue.pop()
112            output_event = (&event).into();
113            self.bincode.serialize_into(self.stream(), &output_event)?;
114            // bincode::serialize_into(&mut *stream, &output_event, my_options).unwrap();
115        } else {
116            self.bincode.serialize_into(self.stream(), &event)?;
117        }
118
119        Ok(())
120    }
121
122    // #[cfg(feature = "compression")]
123    // fn ingest_event_debug(&mut self, event: Event) -> Result<Option<Adu>, CodecError> {
124    //     todo!()
125    // }
126}
127
128impl<R: Read + Seek> Default for RawInput<R> {
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134impl<R: Read + Seek> RawInput<R> {
135    /// Create a new raw input stream.
136    pub fn new() -> Self
137    where
138        Self: Sized,
139    {
140        Self {
141            meta: CodecMetadata::default(),
142            bincode: DefaultOptions::new()
143                .with_fixint_encoding()
144                .with_big_endian(),
145            // stream: reader,
146            _phantom: std::marker::PhantomData,
147        }
148    }
149}
150
151impl<R: Read + Seek> ReadCompression<R> for RawInput<R> {
152    fn magic(&self) -> Magic {
153        MAGIC_RAW
154    }
155
156    fn meta(&self) -> &CodecMetadata {
157        &self.meta
158    }
159
160    fn meta_mut(&mut self) -> &mut CodecMetadata {
161        &mut self.meta
162    }
163
164    fn read_bytes(
165        &mut self,
166        bytes: &mut [u8],
167        reader: &mut BitReader<R, BigEndian>,
168    ) -> std::io::Result<()> {
169        reader.read_bytes(bytes)
170    }
171
172    // fn into_reader(self: Box<Self>, reader: &mut BitReader<R, BigEndian>) -> R {
173    //     reader.into_reader()
174    // }
175
176    #[inline]
177    fn digest_event(&mut self, reader: &mut BitReader<R, BigEndian>) -> Result<Event, CodecError> {
178        // TODO: Why is the encoded event size wrong?
179        let mut buffer: Vec<u8> = vec![0; self.meta.event_size as usize];
180        reader.read_bytes(&mut buffer)?;
181        let event: Event = if self.meta.plane.channels == 1 {
182            match self.bincode.deserialize_from::<_, EventSingle>(&*buffer) {
183                Ok(ev) => ev.into(),
184                Err(_e) => return Err(CodecError::Deserialize),
185            }
186        } else {
187            match self.bincode.deserialize_from::<_, Event>(&*buffer) {
188                Ok(ev) => ev,
189                Err(e) => {
190                    dbg!(self.meta.event_size);
191                    eprintln!("Error deserializing event: {e}");
192                    return Err(CodecError::Deserialize);
193                }
194            }
195        };
196
197        if event.coord.is_eof() {
198            return Err(CodecError::Eof);
199        }
200        Ok(event)
201    }
202
203    // #[cfg(feature = "compression")]
204    // fn digest_event_debug(
205    //     &mut self,
206    //     reader: &mut BitReader<R, BigEndian>,
207    // ) -> Result<(Option<Adu>, Event), CodecError> {
208    //     todo!()
209    // }
210
211    fn set_input_stream_position(
212        &mut self,
213        reader: &mut BitReader<R, BigEndian>,
214        pos: u64,
215    ) -> Result<(), CodecError> {
216        if (pos - self.meta.header_size as u64) % u64::from(self.meta.event_size) != 0 {
217            eprintln!("Attempted to seek to bad position in stream: {pos}");
218            return Err(CodecError::Seek);
219        }
220
221        if reader.seek_bits(SeekFrom::Start(pos * 8)).is_err() {
222            return Err(CodecError::Seek);
223        }
224
225        Ok(())
226    }
227}