Skip to main content

rustpix_io/
reader.rs

1//! Memory-mapped file readers.
2//!
3
4use crate::{Error, Result};
5use memmap2::Mmap;
6use rustpix_core::soa::HitBatch;
7use rustpix_tpx::ordering::TimeOrderedStream;
8use rustpix_tpx::section::discover_sections;
9use rustpix_tpx::{DetectorConfig, Tpx3Packet};
10use std::fs::File;
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13
14/// A memory-mapped file reader.
15///
16/// Uses memmap2 to efficiently access file contents without
17/// loading the entire file into memory.
18pub struct MappedFileReader {
19    /// Memory-mapped file contents.
20    mmap: Arc<Mmap>,
21    /// Path to the underlying file.
22    path: PathBuf,
23}
24
25impl MappedFileReader {
26    /// Opens a file for memory-mapped reading.
27    ///
28    /// # Errors
29    /// Returns an error if the file cannot be opened or memory-mapped.
30    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
31        let file = File::open(&path)?;
32        // SAFETY: The file is opened read-only and we assume it is not modified concurrently.
33        // This is the standard safety contract for memory mapping.
34        #[allow(unsafe_code)]
35        let mmap = unsafe { Mmap::map(&file)? };
36        Ok(Self {
37            mmap: Arc::new(mmap),
38            path: path.as_ref().to_path_buf(),
39        })
40    }
41
42    /// Returns the file contents as a byte slice.
43    #[must_use]
44    pub fn as_bytes(&self) -> &[u8] {
45        &self.mmap[..]
46    }
47
48    /// Returns the file size in bytes.
49    #[must_use]
50    pub fn len(&self) -> usize {
51        self.mmap.len()
52    }
53
54    /// Returns true if the file is empty.
55    #[must_use]
56    pub fn is_empty(&self) -> bool {
57        self.mmap.is_empty()
58    }
59
60    /// Returns an iterator over 8-byte chunks.
61    ///
62    /// Each chunk corresponds to a raw TPX3 packet.
63    pub fn chunks(&self) -> impl Iterator<Item = &[u8]> {
64        self.mmap.chunks(8)
65    }
66}
67
68#[derive(Clone)]
69struct SharedMmap(Arc<Mmap>);
70
71impl AsRef<[u8]> for SharedMmap {
72    fn as_ref(&self) -> &[u8] {
73        &self.0[..]
74    }
75}
76
77/// Time-ordered stream of hit batches that owns the underlying file mapping.
78pub struct TimeOrderedHitStream {
79    /// Underlying pulse-ordered stream.
80    inner: TimeOrderedStream<SharedMmap>,
81}
82
83impl Iterator for TimeOrderedHitStream {
84    type Item = HitBatch;
85
86    fn next(&mut self) -> Option<Self::Item> {
87        self.inner.next()
88    }
89}
90
91/// A pulse-ordered event batch with its TDC timestamp (25ns ticks).
92pub struct EventBatch {
93    /// Pulse TDC timestamp in 25ns ticks.
94    pub tdc_timestamp_25ns: u64,
95    /// Hit batch for the pulse.
96    pub hits: HitBatch,
97}
98
99/// Time-ordered stream of event batches that owns the underlying file mapping.
100pub struct TimeOrderedEventStream {
101    /// Underlying pulse-ordered stream.
102    inner: TimeOrderedStream<SharedMmap>,
103}
104
105impl Iterator for TimeOrderedEventStream {
106    type Item = EventBatch;
107
108    fn next(&mut self) -> Option<Self::Item> {
109        self.inner.next_pulse_batch().map(|batch| EventBatch {
110            tdc_timestamp_25ns: batch.tdc_timestamp,
111            hits: batch.hits,
112        })
113    }
114}
115
116/// A TPX3 file reader with memory-mapped I/O.
117pub struct Tpx3FileReader {
118    /// Memory-mapped reader.
119    reader: MappedFileReader,
120    /// Detector configuration used for parsing.
121    config: DetectorConfig,
122}
123
124impl Tpx3FileReader {
125    /// Opens a TPX3 file for reading with default configuration.
126    ///
127    /// # Errors
128    /// Returns an error if the file cannot be opened or memory-mapped.
129    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
130        let reader = MappedFileReader::open(path)?;
131        Ok(Self {
132            reader,
133            config: DetectorConfig::default(),
134        })
135    }
136
137    /// Sets the detector configuration.
138    #[must_use]
139    pub fn with_config(mut self, config: DetectorConfig) -> Self {
140        self.config = config;
141        self
142    }
143
144    /// Returns the file size in bytes.
145    #[must_use]
146    pub fn file_size(&self) -> usize {
147        self.reader.len()
148    }
149
150    /// Returns the number of 8-byte packets in the file.
151    #[must_use]
152    pub fn packet_count(&self) -> usize {
153        self.reader.len() / 8
154    }
155
156    /// Reads and parses all hits from the file into a `HitBatch` (`SoA`).
157    ///
158    /// This uses the pulse-based time-ordered stream to ensure correct
159    /// temporal ordering across pulses and chips.
160    ///
161    /// # Errors
162    /// Returns an error if the file size is invalid or the data cannot be parsed.
163    pub fn read_batch(&self) -> Result<HitBatch> {
164        self.read_batch_time_ordered()
165    }
166
167    /// Reads hits using the efficient time-ordered stream.
168    ///
169    /// This uses a pulse-based K-way merge to produce time-ordered hits
170    /// without loading the entire file or performing a global sort.
171    ///
172    /// This is functionally equivalent to `read_batch()` and is retained
173    /// for clarity.
174    ///
175    /// # Errors
176    /// Returns an error if the file size is invalid.
177    pub fn read_batch_time_ordered(&self) -> Result<HitBatch> {
178        if !self.reader.len().is_multiple_of(8) {
179            return Err(Error::InvalidFormat(format!(
180                "file size {} is not a multiple of 8 (file: {})",
181                self.reader.len(),
182                self.reader.path.display()
183            )));
184        }
185
186        let data = self.reader.as_bytes();
187        let sections = discover_sections(data);
188
189        let stream = TimeOrderedStream::new(data, &sections, &self.config);
190        let mut batch = HitBatch::default();
191        for pulse_batch in stream {
192            batch.append(&pulse_batch);
193        }
194        Ok(batch)
195    }
196
197    /// Returns a time-ordered stream of hit batches (pulse-merged).
198    ///
199    /// # Errors
200    /// Returns an error if the file size is invalid.
201    pub fn stream_time_ordered(&self) -> Result<TimeOrderedHitStream> {
202        if !self.reader.len().is_multiple_of(8) {
203            return Err(Error::InvalidFormat(format!(
204                "file size {} is not a multiple of 8 (file: {})",
205                self.reader.len(),
206                self.reader.path.display()
207            )));
208        }
209
210        let sections = discover_sections(self.reader.as_bytes());
211        let stream = TimeOrderedStream::new(
212            SharedMmap(self.reader.mmap.clone()),
213            &sections,
214            &self.config,
215        );
216        Ok(TimeOrderedHitStream { inner: stream })
217    }
218
219    /// Returns a time-ordered stream of event batches (pulse-merged with TDC).
220    ///
221    /// # Errors
222    /// Returns an error if the file size is invalid.
223    pub fn stream_time_ordered_events(&self) -> Result<TimeOrderedEventStream> {
224        if !self.reader.len().is_multiple_of(8) {
225            return Err(Error::InvalidFormat(format!(
226                "file size {} is not a multiple of 8 (file: {})",
227                self.reader.len(),
228                self.reader.path.display()
229            )));
230        }
231
232        let sections = discover_sections(self.reader.as_bytes());
233        let stream = TimeOrderedStream::new(
234            SharedMmap(self.reader.mmap.clone()),
235            &sections,
236            &self.config,
237        );
238        Ok(TimeOrderedEventStream { inner: stream })
239    }
240
241    /// Returns an iterator over raw packets.
242    ///
243    /// # Panics
244    /// Panics if a chunk is not exactly 8 bytes. This should be unreachable because
245    /// `chunks_exact(8)` guarantees each chunk length.
246    pub fn iter_packets(&self) -> impl Iterator<Item = Tpx3Packet> + '_ {
247        self.reader.as_bytes().chunks_exact(8).map(|chunk| {
248            let bytes: [u8; 8] = chunk.try_into().unwrap();
249            Tpx3Packet::new(u64::from_le_bytes(bytes))
250        })
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary file containing exactly `bytes`.
    fn temp_file_with(bytes: &[u8]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(bytes).unwrap();
        file.flush().unwrap();
        file
    }

    #[test]
    fn test_mapped_file_reader() {
        let data: Vec<u8> = (0..64).collect();
        let file = temp_file_with(&data);

        let reader = MappedFileReader::open(file.path()).unwrap();
        assert_eq!(reader.len(), 64);
        assert!(!reader.is_empty());
        assert_eq!(reader.as_bytes(), &data[..]);
    }

    #[test]
    fn test_mapped_file_reader_chunks_keep_partial_tail() {
        let data: Vec<u8> = (0..20).collect();
        let file = temp_file_with(&data);

        let reader = MappedFileReader::open(file.path()).unwrap();
        let lens: Vec<usize> = reader.chunks().map(<[u8]>::len).collect();
        assert_eq!(lens, vec![8, 8, 4]);
    }

    #[test]
    fn test_tpx3_file_reader_empty() {
        let file = NamedTempFile::new().unwrap();

        let reader = Tpx3FileReader::open(file.path()).unwrap();
        assert_eq!(reader.file_size(), 0);
        assert_eq!(reader.packet_count(), 0);

        let batch = reader.read_batch().unwrap();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_tpx3_file_reader_invalid_size() {
        let file = temp_file_with(&[0u8; 7]); // Not a multiple of 8

        let reader = Tpx3FileReader::open(file.path()).unwrap();
        let Err(Error::InvalidFormat(msg)) = reader.read_batch() else {
            panic!("expected InvalidFormat error for a 7-byte file");
        };
        assert!(msg.contains("not a multiple of 8"));

        // Every size-validating entry point must reject the file.
        assert!(reader.read_batch_time_ordered().is_err());
        assert!(reader.stream_time_ordered().is_err());
        assert!(reader.stream_time_ordered_events().is_err());
    }

    #[test]
    fn test_tpx3_packet_count_ignores_trailing_bytes() {
        let file = temp_file_with(&[0u8; 19]);

        let reader = Tpx3FileReader::open(file.path()).unwrap();
        assert_eq!(reader.file_size(), 19);
        assert_eq!(reader.packet_count(), 2);
        assert_eq!(reader.iter_packets().count(), 2);
    }
}
294}