1use crate::{Error, Result};
5use memmap2::Mmap;
6use rustpix_core::soa::HitBatch;
7use rustpix_tpx::ordering::TimeOrderedStream;
8use rustpix_tpx::section::discover_sections;
9use rustpix_tpx::{DetectorConfig, Tpx3Packet};
10use std::fs::File;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14pub struct MappedFileReader {
19 mmap: Arc<Mmap>,
21 path: PathBuf,
23}
24
25impl MappedFileReader {
26 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
31 let file = File::open(&path)?;
32 #[allow(unsafe_code)]
35 let mmap = unsafe { Mmap::map(&file)? };
36 Ok(Self {
37 mmap: Arc::new(mmap),
38 path: path.as_ref().to_path_buf(),
39 })
40 }
41
42 #[must_use]
44 pub fn as_bytes(&self) -> &[u8] {
45 &self.mmap[..]
46 }
47
48 #[must_use]
50 pub fn len(&self) -> usize {
51 self.mmap.len()
52 }
53
54 #[must_use]
56 pub fn is_empty(&self) -> bool {
57 self.mmap.is_empty()
58 }
59
60 pub fn chunks(&self) -> impl Iterator<Item = &[u8]> {
64 self.mmap.chunks(8)
65 }
66}
67
68#[derive(Clone)]
69struct SharedMmap(Arc<Mmap>);
70
71impl AsRef<[u8]> for SharedMmap {
72 fn as_ref(&self) -> &[u8] {
73 &self.0[..]
74 }
75}
76
77pub struct TimeOrderedHitStream {
79 inner: TimeOrderedStream<SharedMmap>,
81}
82
83impl Iterator for TimeOrderedHitStream {
84 type Item = HitBatch;
85
86 fn next(&mut self) -> Option<Self::Item> {
87 self.inner.next()
88 }
89}
90
91pub struct EventBatch {
93 pub tdc_timestamp_25ns: u64,
95 pub hits: HitBatch,
97}
98
99pub struct TimeOrderedEventStream {
101 inner: TimeOrderedStream<SharedMmap>,
103}
104
105impl Iterator for TimeOrderedEventStream {
106 type Item = EventBatch;
107
108 fn next(&mut self) -> Option<Self::Item> {
109 self.inner.next_pulse_batch().map(|batch| EventBatch {
110 tdc_timestamp_25ns: batch.tdc_timestamp,
111 hits: batch.hits,
112 })
113 }
114}
115
116pub struct Tpx3FileReader {
118 reader: MappedFileReader,
120 config: DetectorConfig,
122}
123
124impl Tpx3FileReader {
125 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
130 let reader = MappedFileReader::open(path)?;
131 Ok(Self {
132 reader,
133 config: DetectorConfig::default(),
134 })
135 }
136
137 #[must_use]
139 pub fn with_config(mut self, config: DetectorConfig) -> Self {
140 self.config = config;
141 self
142 }
143
144 #[must_use]
146 pub fn file_size(&self) -> usize {
147 self.reader.len()
148 }
149
150 #[must_use]
152 pub fn packet_count(&self) -> usize {
153 self.reader.len() / 8
154 }
155
156 pub fn read_batch(&self) -> Result<HitBatch> {
164 self.read_batch_time_ordered()
165 }
166
167 pub fn read_batch_time_ordered(&self) -> Result<HitBatch> {
178 if !self.reader.len().is_multiple_of(8) {
179 return Err(Error::InvalidFormat(format!(
180 "file size {} is not a multiple of 8 (file: {})",
181 self.reader.len(),
182 self.reader.path.display()
183 )));
184 }
185
186 let data = self.reader.as_bytes();
187 let sections = discover_sections(data);
188
189 let stream = TimeOrderedStream::new(data, §ions, &self.config);
190 let mut batch = HitBatch::default();
191 for pulse_batch in stream {
192 batch.append(&pulse_batch);
193 }
194 Ok(batch)
195 }
196
197 pub fn stream_time_ordered(&self) -> Result<TimeOrderedHitStream> {
202 if !self.reader.len().is_multiple_of(8) {
203 return Err(Error::InvalidFormat(format!(
204 "file size {} is not a multiple of 8 (file: {})",
205 self.reader.len(),
206 self.reader.path.display()
207 )));
208 }
209
210 let sections = discover_sections(self.reader.as_bytes());
211 let stream = TimeOrderedStream::new(
212 SharedMmap(self.reader.mmap.clone()),
213 §ions,
214 &self.config,
215 );
216 Ok(TimeOrderedHitStream { inner: stream })
217 }
218
219 pub fn stream_time_ordered_events(&self) -> Result<TimeOrderedEventStream> {
224 if !self.reader.len().is_multiple_of(8) {
225 return Err(Error::InvalidFormat(format!(
226 "file size {} is not a multiple of 8 (file: {})",
227 self.reader.len(),
228 self.reader.path.display()
229 )));
230 }
231
232 let sections = discover_sections(self.reader.as_bytes());
233 let stream = TimeOrderedStream::new(
234 SharedMmap(self.reader.mmap.clone()),
235 §ions,
236 &self.config,
237 );
238 Ok(TimeOrderedEventStream { inner: stream })
239 }
240
241 pub fn iter_packets(&self) -> impl Iterator<Item = Tpx3Packet> + '_ {
247 self.reader.as_bytes().chunks_exact(8).map(|chunk| {
248 let bytes: [u8; 8] = chunk.try_into().unwrap();
249 Tpx3Packet::new(u64::from_le_bytes(bytes))
250 })
251 }
252}
253
254#[cfg(test)]
255mod tests {
256 use super::*;
257 use std::io::Write;
258 use tempfile::NamedTempFile;
259
260 #[test]
261 fn test_mapped_file_reader() {
262 let mut file = NamedTempFile::new().unwrap();
263 let data: Vec<u8> = (0..64).collect();
264 file.write_all(&data).unwrap();
265 file.flush().unwrap();
266
267 let reader = MappedFileReader::open(file.path()).unwrap();
268 assert_eq!(reader.len(), 64);
269 assert!(!reader.is_empty());
270 assert_eq!(reader.as_bytes(), &data[..]);
271 }
272
273 #[test]
274 fn test_tpx3_file_reader_empty() {
275 let file = NamedTempFile::new().unwrap();
276
277 let reader = Tpx3FileReader::open(file.path()).unwrap();
278 assert_eq!(reader.file_size(), 0);
279 assert_eq!(reader.packet_count(), 0);
280
281 let batch = reader.read_batch().unwrap();
282 assert!(batch.is_empty());
283 }
284
285 #[test]
286 fn test_tpx3_file_reader_invalid_size() {
287 let mut file = NamedTempFile::new().unwrap();
288 file.write_all(&[0u8; 7]).unwrap(); file.flush().unwrap();
290
291 let reader = Tpx3FileReader::open(file.path()).unwrap();
292 assert!(reader.read_batch().is_err());
293 }
294}