1use std::fs::File;
2use std::io::{Read};
3use std::net::{TcpStream, ToSocketAddrs};
4#[cfg(target_family = "unix")]
5use std::os::unix::net::UnixStream;
6use num_derive::FromPrimitive;
7use thiserror::Error;
8
9#[allow(dead_code, unused_imports)]
10#[path = "./ioheader_generated.rs"]
11pub mod ioheader_generated;
12
13#[allow(dead_code, unused_imports)]
14#[path = "./events_generated.rs"]
15mod events_generated;
16
17#[allow(dead_code, unused_imports)]
18#[path = "./frame_generated.rs"]
19mod frame_generated;
20
21#[allow(dead_code, unused_imports)]
22#[path = "./imus_generated.rs"]
23mod imus_generated;
24
25#[allow(dead_code, unused_imports)]
26#[path = "./triggers_generated.rs"]
27mod triggers_generated;
28
29const MAGIC_NUMBER: &str = "#!AER-DAT4.0\r\n";
30
31
32#[allow(missing_docs)]
33#[derive(Error, Debug)]
34pub enum ParseError {
35 #[error("Parse error: `{0}`")]
36 General(String),
37
38 #[error("Unsupported stream type: `{0}`")]
39 UnsupportedStreamType(String),
40
41 #[error("FlatBuffer error")]
42 FlatBuffer(#[from] flatbuffers::InvalidFlatbuffer),
43
44 #[error("Utf8 error")]
45 Utf8(#[from] std::str::Utf8Error),
46
47 #[error("RoxmlTree error")]
48 RoxmlTree(#[from] roxmltree::Error),
49
50 #[error("ParseIntError error")]
51 ParseInt(#[from] std::num::ParseIntError),
52
53 #[error("IO error")]
54 Io(#[from] std::io::Error),
55}
56
57trait Source:std::io::Read {}
58impl Source for File {}
59#[cfg(target_family = "unix")]
60impl Source for UnixStream {}
61impl Source for TcpStream {}
62
63#[derive(FromPrimitive, Copy, Clone)]
64pub enum StreamContent {
65 Events,
66 Frame,
67 Imus,
68 Triggers,
69}
70
71impl StreamContent {
72 fn from(identifier: &str) -> Result<Self, ParseError> {
73 match identifier {
74 "EVTS" => Ok(StreamContent::Events),
75 "FRME" => Ok(StreamContent::Frame),
76 "IMUS" => Ok(StreamContent::Imus),
77 "TRIG" => Ok(StreamContent::Triggers),
78 _ => Err(ParseError::UnsupportedStreamType("unsupported stream type".to_string())),
79 }
80 }
81}
82
83impl std::fmt::Display for StreamContent {
84 fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
85 write!(
86 formatter,
87 "{}",
88 match self {
89 StreamContent::Events => "EVTS",
90 StreamContent::Frame => "FRME",
91 StreamContent::Imus => "IMUS",
92 StreamContent::Triggers => "TRIG",
93 }
94 )
95 }
96}
97
98pub struct Stream {
99 pub content: StreamContent,
100 pub width: u16,
101 pub height: u16,
102}
103
104pub struct Decoder {
105 pub id_to_stream: std::collections::HashMap<u32, Stream>,
106 file: Box<dyn Source>,
107 position: i64,
108 compression: ioheader_generated::Compression,
109 file_data_position: i64,
110}
111
112unsafe impl Send for Decoder {}
113
114impl Decoder {
115 pub fn new_from_file<P: std::convert::AsRef<std::path::Path>>(path: P) -> Result<Self, ParseError> {
116 let mut decoder = Decoder {
117 id_to_stream: std::collections::HashMap::new(),
118 file: Box::new(File::open(path)?),
119 position: 0i64,
120 file_data_position: 0,
121 compression: ioheader_generated::Compression::None,
122 };
123 {
124 let mut magic_number_buffer = [0; MAGIC_NUMBER.len()];
125 decoder.file.read_exact(&mut magic_number_buffer)?;
126 if std::str::from_utf8(&magic_number_buffer)? != MAGIC_NUMBER {
127 return Err(ParseError::General(
128 "the file does not contain AEDAT4 data (wrong magic number)".to_string(),
129 ));
130 }
131 decoder.position += MAGIC_NUMBER.len() as i64;
132 }
133 decoder = read_io_header(decoder)?;
134
135 Ok(decoder)
136 }
137
138
139 #[cfg(target_family = "unix")]
140 pub fn new_from_unix_stream<P: std::convert::AsRef<std::path::Path> + Clone>(
141 path: P) -> Result<Self, ParseError> {
142 let mut decoder = Decoder {
143 id_to_stream: std::collections::HashMap::new(),
144 file: Box::new(UnixStream::connect(path)?),
145 position: 0i64,
146 file_data_position: -1,
147 compression: ioheader_generated::Compression::None,
148 };
149 decoder = read_io_header(decoder)?;
150 Ok(decoder)
151 }
152
153 pub fn new_from_tcp_stream<P: ToSocketAddrs + Clone>(
154 path: P,
155 ) -> Result<Self, ParseError> {
156 let mut decoder = Decoder {
157 id_to_stream: std::collections::HashMap::new(),
158 file: Box::new(TcpStream::connect(path)?),
159 position: 0i64,
160 file_data_position: -1,
161 compression: ioheader_generated::Compression::None,
162 };
163 decoder = read_io_header(decoder)?;
164 Ok(decoder)
165 }
166}
167
168fn read_io_header(mut decoder: Decoder) -> Result<Decoder, ParseError> {
169 let length = {
170 let mut bytes = [0; 4];
171 decoder.file.read_exact(&mut bytes)?;
172 u32::from_le_bytes(bytes)
173 };
174 decoder.position += 4i64 + length as i64;
175 {
176 let mut buffer = std::vec![0; length as usize];
177 decoder.file.read_exact(&mut buffer)?;
178 let ioheader = unsafe { ioheader_generated::root_as_ioheader_unchecked(&buffer) };
179 decoder.compression = ioheader.compression();
180 decoder.file_data_position = ioheader.file_data_position();
181 let description = match ioheader.description() {
182 Some(content) => content,
183 None => return Err(ParseError::General("the description is empty".to_string())),
184 };
185 let document = roxmltree::Document::parse(description)?;
186 let dv_node = match document.root().first_child() {
187 Some(content) => content,
188 None => return Err(ParseError::General("the description has no dv node".to_string())),
189 };
190 if !dv_node.has_tag_name("dv") {
191 return Err(ParseError::General("unexpected dv node tag".to_string()));
192 }
193 let output_node = match dv_node.children().find(|node| {
194 node.is_element()
195 && node.has_tag_name("node")
196 && node.attribute("name") == Some("outInfo")
197 }) {
198 Some(content) => content,
199 None => return Err(ParseError::General("the description has no output node".to_string())),
200 };
201 for stream_node in output_node.children() {
202 if stream_node.is_element() && stream_node.has_tag_name("node") {
203 if !stream_node.has_tag_name("node") {
204 return Err(ParseError::General("unexpected stream node tag".to_string()));
205 }
206 let stream_id = match stream_node.attribute("name") {
207 Some(content) => content,
208 None => return Err(ParseError::General("missing stream node id".to_string())),
209 }
210 .parse::<u32>()?;
211 let identifier = match stream_node.children().find(|node| {
212 node.is_element()
213 && node.has_tag_name("attr")
214 && node.attribute("key") == Some("typeIdentifier")
215 }) {
216 Some(content) => match content.text() {
217 Some(content) => content,
218 None => {
219 return Err(ParseError::General("empty stream node type identifier".to_string()))
220 }
221 },
222 None => return Err(ParseError::General("missing stream node type identifier".to_string())),
223 }
224 .to_string();
225 let mut width = 0u16;
226 let mut height = 0u16;
227 if identifier == "EVTS" || identifier == "FRME" {
228 let info_node = match stream_node.children().find(|node| {
229 node.is_element()
230 && node.has_tag_name("node")
231 && node.attribute("name") == Some("info")
232 }) {
233 Some(content) => content,
234 None => return Err(ParseError::General("missing info node".to_string())),
235 };
236 width = match info_node.children().find(|node| {
237 node.is_element()
238 && node.has_tag_name("attr")
239 && node.attribute("key") == Some("sizeX")
240 }) {
241 Some(content) => match content.text() {
242 Some(content) => content,
243 None => return Err(ParseError::General("empty sizeX attribute".to_string())),
244 },
245 None => return Err(ParseError::General("missing sizeX attribute".to_string())),
246 }
247 .parse::<u16>()?;
248 height = match info_node.children().find(|node| {
249 node.is_element()
250 && node.has_tag_name("attr")
251 && node.attribute("key") == Some("sizeY")
252 }) {
253 Some(content) => match content.text() {
254 Some(content) => content,
255 None => return Err(ParseError::General("empty sizeX attribute".to_string())),
256 },
257 None => return Err(ParseError::General("missing sizeX attribute".to_string())),
258 }
259 .parse::<u16>()?;
260 }
261 if decoder
262 .id_to_stream
263 .insert(
264 stream_id,
265 Stream {
266 content: StreamContent::from(&identifier)?,
267 width,
268 height,
269 },
270 )
271 .is_some()
272 {
273 return Err(ParseError::General("duplicated stream id".to_string()));
274 }
275 }
276 }
277 }
278 if decoder.id_to_stream.is_empty() {
279 return Err(ParseError::General("no stream found in the description".to_string()));
280 }
281 Ok(decoder)
282
283}
284
285#[derive(Debug, Clone)]
286pub struct Packet {
287 pub buffer: Vec<u8>,
288 pub stream_id: u32,
289}
290
291impl Iterator for Decoder {
292 type Item = Result<Packet, ParseError>;
293
294 fn next(&mut self) -> Option<Self::Item> {
295 if self.file_data_position > -1 && self.position == self.file_data_position {
296 return None;
297 }
298 let mut packet = Packet {
299 buffer: Vec::new(),
300 stream_id: {
301 let mut bytes = [0; 4];
302 match self.file.read_exact(&mut bytes) {
303 Ok(()) => (),
304 Err(_) => return None,
305 }
306 u32::from_le_bytes(bytes)
307 },
308 };
309 let length = {
310 let mut bytes = [0; 4];
311 if let Err(error) = self.file.read_exact(&mut bytes) {
312 return Some(Err(ParseError::from(error)));
313 }
314 u32::from_le_bytes(bytes)
315 };
316 self.position += 8i64 + length as i64;
317 let mut raw_buffer = std::vec![0; length as usize];
318 if let Err(error) = self.file.read_exact(&mut raw_buffer) {
319 return Some(Err(ParseError::from(error)));
320 }
321 match self.compression {
322 ioheader_generated::Compression::None => {
323 std::mem::swap(&mut raw_buffer, &mut packet.buffer)
324 }
325 ioheader_generated::Compression::Lz4 | ioheader_generated::Compression::Lz4High => {
326 match lz4::Decoder::new(&raw_buffer[..]) {
327 Ok(mut result) => {
328 if let Err(error) = result.read_to_end(&mut packet.buffer) {
329 return Some(Err(ParseError::from(error)));
330 }
331 }
332 Err(error) => return Some(Err(ParseError::from(error))),
333 }
334 }
335 ioheader_generated::Compression::Zstd | ioheader_generated::Compression::ZstdHigh => {
336 match zstd::stream::Decoder::new(&raw_buffer[..]) {
337 Ok(mut result) => {
338 if let Err(error) = result.read_to_end(&mut packet.buffer) {
339 return Some(Err(ParseError::from(error)));
340 }
341 }
342 Err(error) => return Some(Err(ParseError::from(error))),
343 }
344 }
345 _ => return Some(Err(ParseError::General("unknown compression algorithm".to_string()))),
346 }
347 let expected_content = &(match self.id_to_stream.get(&packet.stream_id) {
348 Some(content) => content,
349 None => return Some(Err(ParseError::General("unknown stream id".to_string()))),
350 }
351 .content);
352 if !flatbuffers::buffer_has_identifier(&packet.buffer, &expected_content.to_string(), true)
353 {
354 return Some(Err(ParseError::General(
355 "the stream id and the identifier do not match".to_string(),
356 )));
357 }
358 Some(Ok(packet))
359 }
360}