aedat/
base.rs

1use std::fs::File;
2use std::io::{Read};
3use std::net::{TcpStream, ToSocketAddrs};
4#[cfg(target_family = "unix")]
5use std::os::unix::net::UnixStream;
6use num_derive::FromPrimitive;
7use thiserror::Error;
8
9#[allow(dead_code, unused_imports)]
10#[path = "./ioheader_generated.rs"]
11pub mod ioheader_generated;
12
13#[allow(dead_code, unused_imports)]
14#[path = "./events_generated.rs"]
15mod events_generated;
16
17#[allow(dead_code, unused_imports)]
18#[path = "./frame_generated.rs"]
19mod frame_generated;
20
21#[allow(dead_code, unused_imports)]
22#[path = "./imus_generated.rs"]
23mod imus_generated;
24
25#[allow(dead_code, unused_imports)]
26#[path = "./triggers_generated.rs"]
27mod triggers_generated;
28
/// Byte sequence expected at the very start of an AEDAT4 file.
///
/// `Decoder::new_from_file` reads exactly this many bytes and rejects the file when they differ.
const MAGIC_NUMBER: &str = "#!AER-DAT4.0\r\n";
30
31
/// Errors reported while opening a source, parsing its header or reading packets.
#[allow(missing_docs)]
#[derive(Error, Debug)]
pub enum ParseError {
    /// Format violation described by a free-form message (wrong magic number, missing XML
    /// node, unknown stream id, ...).
    #[error("Parse error: `{0}`")]
    General(String),

    /// The stream type identifier found in the header is not one of the supported ones.
    #[error("Unsupported stream type: `{0}`")]
    UnsupportedStreamType(String),

    /// Converted from a FlatBuffers verification error.
    #[error("FlatBuffer error")]
    FlatBuffer(#[from] flatbuffers::InvalidFlatbuffer),

    /// Converted from a failed UTF-8 decoding.
    #[error("Utf8 error")]
    Utf8(#[from] std::str::Utf8Error),

    /// Converted from an XML parsing error raised on the header description.
    #[error("RoxmlTree error")]
    RoxmlTree(#[from] roxmltree::Error),

    /// Converted from a failed integer parse (stream id, sizeX or sizeY).
    #[error("ParseIntError error")]
    ParseInt(#[from] std::num::ParseIntError),

    /// Converted from an I/O error raised by the underlying file or socket.
    #[error("IO error")]
    Io(#[from] std::io::Error),
}
56
/// Byte source a `Decoder` can read from.
///
/// The trait only exists so that files and sockets can be stored behind the same
/// `Box<dyn Source>`. `Send` is a supertrait so that the boxed source is guaranteed to be
/// movable across threads, which the `Send` implementation of `Decoder` relies on; without
/// it, a future non-`Send` implementor would silently make that implementation unsound.
trait Source: std::io::Read + Send {}

/// Regular files (used by `Decoder::new_from_file`).
impl Source for File {}

/// Unix domain sockets (used by `Decoder::new_from_unix_stream`).
#[cfg(target_family = "unix")]
impl Source for UnixStream {}

/// TCP sockets (used by `Decoder::new_from_tcp_stream`).
impl Source for TcpStream {}
62
/// Kind of data carried by a stream, as declared by its type identifier in the header.
#[derive(FromPrimitive, Copy, Clone)]
pub enum StreamContent {
    /// Stream whose type identifier is `EVTS`.
    Events,
    /// Stream whose type identifier is `FRME`.
    Frame,
    /// Stream whose type identifier is `IMUS`.
    Imus,
    /// Stream whose type identifier is `TRIG`.
    Triggers,
}
70
71impl StreamContent {
72    fn from(identifier: &str) -> Result<Self, ParseError> {
73        match identifier {
74            "EVTS" => Ok(StreamContent::Events),
75            "FRME" => Ok(StreamContent::Frame),
76            "IMUS" => Ok(StreamContent::Imus),
77            "TRIG" => Ok(StreamContent::Triggers),
78            _ => Err(ParseError::UnsupportedStreamType("unsupported stream type".to_string())),
79        }
80    }
81}
82
83impl std::fmt::Display for StreamContent {
84    fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
85        write!(
86            formatter,
87            "{}",
88            match self {
89                StreamContent::Events => "EVTS",
90                StreamContent::Frame => "FRME",
91                StreamContent::Imus => "IMUS",
92                StreamContent::Triggers => "TRIG",
93            }
94        )
95    }
96}
97
/// Description of one stream declared in the header.
pub struct Stream {
    /// Kind of packets carried by the stream.
    pub content: StreamContent,
    /// Value of the `sizeX` attribute for `EVTS` and `FRME` streams, 0 for other streams.
    pub width: u16,
    /// Value of the `sizeY` attribute for `EVTS` and `FRME` streams, 0 for other streams.
    pub height: u16,
}
103
/// Packet reader over a file or a socket.
///
/// The constructors parse the header; packets are then obtained through the `Iterator`
/// implementation.
pub struct Decoder {
    /// Streams declared in the header, keyed by stream id.
    pub id_to_stream: std::collections::HashMap<u32, Stream>,
    /// Underlying byte source (file, Unix socket or TCP socket).
    file: Box<dyn Source>,
    /// Number of bytes accounted for so far (magic number, header and packets).
    position: i64,
    /// Compression scheme announced by the header, applied to every packet payload.
    compression: ioheader_generated::Compression,
    /// Value of the header's `file_data_position` field; iteration stops when `position`
    /// reaches it, unless it is negative.
    file_data_position: i64,
}
111
// SAFETY: the only field that is not automatically `Send` is `file`, a `Box<dyn Source>`
// owned exclusively by the decoder. This implementation is sound as long as every `Source`
// implementor is `Send`, which holds for the implementors declared in this file (`File`,
// `UnixStream` and `TcpStream`).
unsafe impl Send for Decoder {}
113
114impl Decoder {
115    pub fn new_from_file<P: std::convert::AsRef<std::path::Path>>(path: P) -> Result<Self, ParseError> {
116        let mut decoder = Decoder {
117            id_to_stream: std::collections::HashMap::new(),
118            file: Box::new(File::open(path)?),
119            position: 0i64,
120            file_data_position: 0,
121            compression: ioheader_generated::Compression::None,
122        };
123        {
124            let mut magic_number_buffer = [0; MAGIC_NUMBER.len()];
125            decoder.file.read_exact(&mut magic_number_buffer)?;
126            if std::str::from_utf8(&magic_number_buffer)? != MAGIC_NUMBER {
127                return Err(ParseError::General(
128                    "the file does not contain AEDAT4 data (wrong magic number)".to_string(),
129                ));
130            }
131            decoder.position += MAGIC_NUMBER.len() as i64;
132        }
133        decoder = read_io_header(decoder)?;
134
135        Ok(decoder)
136    }
137
138
139    #[cfg(target_family = "unix")]
140    pub fn new_from_unix_stream<P: std::convert::AsRef<std::path::Path> + Clone>(
141        path: P) -> Result<Self, ParseError> {
142        let mut decoder = Decoder {
143            id_to_stream: std::collections::HashMap::new(),
144            file: Box::new(UnixStream::connect(path)?),
145            position: 0i64,
146            file_data_position: -1,
147            compression: ioheader_generated::Compression::None,
148        };
149        decoder = read_io_header(decoder)?;
150        Ok(decoder)
151    }
152
153    pub fn new_from_tcp_stream<P: ToSocketAddrs + Clone>(
154        path: P,
155    ) -> Result<Self, ParseError> {
156        let mut decoder = Decoder {
157            id_to_stream: std::collections::HashMap::new(),
158            file: Box::new(TcpStream::connect(path)?),
159            position: 0i64,
160            file_data_position: -1,
161            compression: ioheader_generated::Compression::None,
162        };
163        decoder = read_io_header(decoder)?;
164        Ok(decoder)
165    }
166}
167
168fn read_io_header(mut decoder: Decoder) -> Result<Decoder, ParseError> {
169    let length = {
170        let mut bytes = [0; 4];
171        decoder.file.read_exact(&mut bytes)?;
172        u32::from_le_bytes(bytes)
173    };
174    decoder.position += 4i64 + length as i64;
175    {
176        let mut buffer = std::vec![0; length as usize];
177        decoder.file.read_exact(&mut buffer)?;
178        let ioheader = unsafe { ioheader_generated::root_as_ioheader_unchecked(&buffer) };
179        decoder.compression = ioheader.compression();
180        decoder.file_data_position = ioheader.file_data_position();
181        let description = match ioheader.description() {
182            Some(content) => content,
183            None => return Err(ParseError::General("the description is empty".to_string())),
184        };
185        let document = roxmltree::Document::parse(description)?;
186        let dv_node = match document.root().first_child() {
187            Some(content) => content,
188            None => return Err(ParseError::General("the description has no dv node".to_string())),
189        };
190        if !dv_node.has_tag_name("dv") {
191            return Err(ParseError::General("unexpected dv node tag".to_string()));
192        }
193        let output_node = match dv_node.children().find(|node| {
194            node.is_element()
195                && node.has_tag_name("node")
196                && node.attribute("name") == Some("outInfo")
197        }) {
198            Some(content) => content,
199            None => return Err(ParseError::General("the description has no output node".to_string())),
200        };
201        for stream_node in output_node.children() {
202            if stream_node.is_element() && stream_node.has_tag_name("node") {
203                if !stream_node.has_tag_name("node") {
204                    return Err(ParseError::General("unexpected stream node tag".to_string()));
205                }
206                let stream_id = match stream_node.attribute("name") {
207                    Some(content) => content,
208                    None => return Err(ParseError::General("missing stream node id".to_string())),
209                }
210                    .parse::<u32>()?;
211                let identifier = match stream_node.children().find(|node| {
212                    node.is_element()
213                        && node.has_tag_name("attr")
214                        && node.attribute("key") == Some("typeIdentifier")
215                }) {
216                    Some(content) => match content.text() {
217                        Some(content) => content,
218                        None => {
219                            return Err(ParseError::General("empty stream node type identifier".to_string()))
220                        }
221                    },
222                    None => return Err(ParseError::General("missing stream node type identifier".to_string())),
223                }
224                    .to_string();
225                let mut width = 0u16;
226                let mut height = 0u16;
227                if identifier == "EVTS" || identifier == "FRME" {
228                    let info_node = match stream_node.children().find(|node| {
229                        node.is_element()
230                            && node.has_tag_name("node")
231                            && node.attribute("name") == Some("info")
232                    }) {
233                        Some(content) => content,
234                        None => return Err(ParseError::General("missing info node".to_string())),
235                    };
236                    width = match info_node.children().find(|node| {
237                        node.is_element()
238                            && node.has_tag_name("attr")
239                            && node.attribute("key") == Some("sizeX")
240                    }) {
241                        Some(content) => match content.text() {
242                            Some(content) => content,
243                            None => return Err(ParseError::General("empty sizeX attribute".to_string())),
244                        },
245                        None => return Err(ParseError::General("missing sizeX attribute".to_string())),
246                    }
247                        .parse::<u16>()?;
248                    height = match info_node.children().find(|node| {
249                        node.is_element()
250                            && node.has_tag_name("attr")
251                            && node.attribute("key") == Some("sizeY")
252                    }) {
253                        Some(content) => match content.text() {
254                            Some(content) => content,
255                            None => return Err(ParseError::General("empty sizeX attribute".to_string())),
256                        },
257                        None => return Err(ParseError::General("missing sizeX attribute".to_string())),
258                    }
259                        .parse::<u16>()?;
260                }
261                if decoder
262                    .id_to_stream
263                    .insert(
264                        stream_id,
265                        Stream {
266                            content: StreamContent::from(&identifier)?,
267                            width,
268                            height,
269                        },
270                    )
271                    .is_some()
272                {
273                    return Err(ParseError::General("duplicated stream id".to_string()));
274                }
275            }
276        }
277    }
278    if decoder.id_to_stream.is_empty() {
279        return Err(ParseError::General("no stream found in the description".to_string()));
280    }
281    Ok(decoder)
282
283}
284
/// One packet yielded by the `Decoder` iterator.
#[derive(Debug, Clone)]
pub struct Packet {
    /// Packet payload after decompression; the iterator has checked that its FlatBuffers file
    /// identifier (read as a size-prefixed buffer) matches the content type of the stream.
    pub buffer: Vec<u8>,
    /// Id of the stream the packet belongs to; a key of `Decoder::id_to_stream`.
    pub stream_id: u32,
}
290
291impl Iterator for Decoder {
292    type Item = Result<Packet, ParseError>;
293
294    fn next(&mut self) -> Option<Self::Item> {
295        if self.file_data_position > -1 && self.position == self.file_data_position {
296            return None;
297        }
298        let mut packet = Packet {
299            buffer: Vec::new(),
300            stream_id: {
301                let mut bytes = [0; 4];
302                match self.file.read_exact(&mut bytes) {
303                    Ok(()) => (),
304                    Err(_) => return None,
305                }
306                u32::from_le_bytes(bytes)
307            },
308        };
309        let length = {
310            let mut bytes = [0; 4];
311            if let Err(error) = self.file.read_exact(&mut bytes) {
312                return Some(Err(ParseError::from(error)));
313            }
314            u32::from_le_bytes(bytes)
315        };
316        self.position += 8i64 + length as i64;
317        let mut raw_buffer = std::vec![0; length as usize];
318        if let Err(error) = self.file.read_exact(&mut raw_buffer) {
319            return Some(Err(ParseError::from(error)));
320        }
321        match self.compression {
322            ioheader_generated::Compression::None => {
323                std::mem::swap(&mut raw_buffer, &mut packet.buffer)
324            }
325            ioheader_generated::Compression::Lz4 | ioheader_generated::Compression::Lz4High => {
326                match lz4::Decoder::new(&raw_buffer[..]) {
327                    Ok(mut result) => {
328                        if let Err(error) = result.read_to_end(&mut packet.buffer) {
329                            return Some(Err(ParseError::from(error)));
330                        }
331                    }
332                    Err(error) => return Some(Err(ParseError::from(error))),
333                }
334            }
335            ioheader_generated::Compression::Zstd | ioheader_generated::Compression::ZstdHigh => {
336                match zstd::stream::Decoder::new(&raw_buffer[..]) {
337                    Ok(mut result) => {
338                        if let Err(error) = result.read_to_end(&mut packet.buffer) {
339                            return Some(Err(ParseError::from(error)));
340                        }
341                    }
342                    Err(error) => return Some(Err(ParseError::from(error))),
343                }
344            }
345            _ => return Some(Err(ParseError::General("unknown compression algorithm".to_string()))),
346        }
347        let expected_content = &(match self.id_to_stream.get(&packet.stream_id) {
348            Some(content) => content,
349            None => return Some(Err(ParseError::General("unknown stream id".to_string()))),
350        }
351            .content);
352        if !flatbuffers::buffer_has_identifier(&packet.buffer, &expected_content.to_string(), true)
353        {
354            return Some(Err(ParseError::General(
355                "the stream id and the identifier do not match".to_string(),
356            )));
357        }
358        Some(Ok(packet))
359    }
360}