use crate::{Error, Result};
use memmap2::Mmap;
use rustpix_core::soa::HitBatch;
use rustpix_tpx::ordering::TimeOrderedStream;
use rustpix_tpx::section::discover_sections;
use rustpix_tpx::{DetectorConfig, Tpx3Packet};
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
/// Read-only view of a file's contents backed by a memory mapping.
///
/// The mapping is kept behind an `Arc`, so it can be handed to the streaming
/// types in this module without copying the file contents.
pub struct MappedFileReader {
// Shared handle to the memory mapping of the opened file.
mmap: Arc<Mmap>,
// Path the file was opened from; used when formatting error messages.
path: PathBuf,
}
impl MappedFileReader {
    /// Opens the file at `path` and maps its contents into memory.
    ///
    /// The path is remembered so that later error messages can name the file.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or if creating the
    /// memory mapping fails.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        let source = path.as_ref();
        let handle = File::open(source)?;

        #[allow(unsafe_code)]
        // SAFETY: `Mmap::map` is unsafe because the mapped bytes are only valid
        // while the underlying file is not truncated or rewritten by someone
        // else. Nothing in this function enforces that.
        // NOTE(review): assumes input files are left untouched for the lifetime
        // of the reader — confirm this holds for all callers.
        let mapping = unsafe { Mmap::map(&handle) }?;

        Ok(Self {
            mmap: Arc::new(mapping),
            path: source.to_path_buf(),
        })
    }

    /// Returns the whole mapped file as a byte slice.
    #[must_use]
    pub fn as_bytes(&self) -> &[u8] {
        // Deref coercion: `&Arc<Mmap>` -> `&Mmap` -> `&[u8]`.
        &self.mmap
    }

    /// Returns the length of the mapped file in bytes.
    #[must_use]
    pub fn len(&self) -> usize {
        self.as_bytes().len()
    }

    /// Returns `true` when the mapped file contains no bytes.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.as_bytes().is_empty()
    }

    /// Iterates over the mapped bytes in slices of up to 8 bytes.
    ///
    /// When the file length is not a multiple of 8, the final slice is
    /// shorter than 8 bytes.
    pub fn chunks(&self) -> impl Iterator<Item = &[u8]> {
        self.as_bytes().chunks(8)
    }
}
/// Cloneable newtype around the shared memory mapping.
///
/// It exposes the mapped bytes through `AsRef<[u8]>` and is the data handle
/// passed to `TimeOrderedStream::new` by the streaming constructors, so the
/// resulting streams own a reference to the mapping instead of borrowing it.
#[derive(Clone)]
struct SharedMmap(Arc<Mmap>);
impl AsRef<[u8]> for SharedMmap {
fn as_ref(&self) -> &[u8] {
&self.0[..]
}
}
/// Owning iterator over the hit batches produced by a `TimeOrderedStream`.
///
/// The wrapped stream is backed by a shared handle to the memory mapping, so
/// this type carries no borrow of the reader that created it.
pub struct TimeOrderedHitStream {
// Underlying stream; iteration is forwarded to it.
inner: TimeOrderedStream<SharedMmap>,
}
/// Yields the `HitBatch` values of the wrapped stream unchanged.
impl Iterator for TimeOrderedHitStream {
type Item = HitBatch;
/// Forwards to the wrapped stream's `next`; its result is returned as-is.
fn next(&mut self) -> Option<Self::Item> {
self.inner.next()
}
}
/// A batch of hits paired with the TDC timestamp of the pulse batch it came from.
pub struct EventBatch {
/// Timestamp copied from the pulse batch's `tdc_timestamp` field.
///
/// NOTE(review): the field name suggests units of 25 ns ticks — confirm
/// against the `rustpix_tpx` documentation.
pub tdc_timestamp_25ns: u64,
/// Hits carried by the pulse batch.
pub hits: HitBatch,
}
/// Owning iterator that yields one `EventBatch` per pulse batch of the
/// wrapped `TimeOrderedStream`.
pub struct TimeOrderedEventStream {
// Underlying stream; polled through `next_pulse_batch`.
inner: TimeOrderedStream<SharedMmap>,
}
impl Iterator for TimeOrderedEventStream {
    type Item = EventBatch;

    /// Pulls the next pulse batch from the wrapped stream and repackages it
    /// as an [`EventBatch`].
    ///
    /// Returns `None` as soon as the wrapped stream reports no further pulse
    /// batch.
    fn next(&mut self) -> Option<Self::Item> {
        let pulse = self.inner.next_pulse_batch()?;
        Some(EventBatch {
            tdc_timestamp_25ns: pulse.tdc_timestamp,
            hits: pulse.hits,
        })
    }
}
/// Reader for a TPX3 data file, combining the memory-mapped file contents
/// with the detector configuration used when decoding them.
pub struct Tpx3FileReader {
// Memory-mapped file contents.
reader: MappedFileReader,
// Detector configuration handed to `TimeOrderedStream::new`.
config: DetectorConfig,
}
impl Tpx3FileReader {
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
let reader = MappedFileReader::open(path)?;
Ok(Self {
reader,
config: DetectorConfig::default(),
})
}
#[must_use]
pub fn with_config(mut self, config: DetectorConfig) -> Self {
self.config = config;
self
}
#[must_use]
pub fn file_size(&self) -> usize {
self.reader.len()
}
#[must_use]
pub fn packet_count(&self) -> usize {
self.reader.len() / 8
}
pub fn read_batch(&self) -> Result<HitBatch> {
self.read_batch_time_ordered()
}
pub fn read_batch_time_ordered(&self) -> Result<HitBatch> {
if !self.reader.len().is_multiple_of(8) {
return Err(Error::InvalidFormat(format!(
"file size {} is not a multiple of 8 (file: {})",
self.reader.len(),
self.reader.path.display()
)));
}
let data = self.reader.as_bytes();
let sections = discover_sections(data);
let stream = TimeOrderedStream::new(data, §ions, &self.config);
let mut batch = HitBatch::default();
for pulse_batch in stream {
batch.append(&pulse_batch);
}
Ok(batch)
}
pub fn stream_time_ordered(&self) -> Result<TimeOrderedHitStream> {
if !self.reader.len().is_multiple_of(8) {
return Err(Error::InvalidFormat(format!(
"file size {} is not a multiple of 8 (file: {})",
self.reader.len(),
self.reader.path.display()
)));
}
let sections = discover_sections(self.reader.as_bytes());
let stream = TimeOrderedStream::new(
SharedMmap(self.reader.mmap.clone()),
§ions,
&self.config,
);
Ok(TimeOrderedHitStream { inner: stream })
}
pub fn stream_time_ordered_events(&self) -> Result<TimeOrderedEventStream> {
if !self.reader.len().is_multiple_of(8) {
return Err(Error::InvalidFormat(format!(
"file size {} is not a multiple of 8 (file: {})",
self.reader.len(),
self.reader.path.display()
)));
}
let sections = discover_sections(self.reader.as_bytes());
let stream = TimeOrderedStream::new(
SharedMmap(self.reader.mmap.clone()),
§ions,
&self.config,
);
Ok(TimeOrderedEventStream { inner: stream })
}
pub fn iter_packets(&self) -> impl Iterator<Item = Tpx3Packet> + '_ {
self.reader.as_bytes().chunks_exact(8).map(|chunk| {
let bytes: [u8; 8] = chunk.try_into().unwrap();
Tpx3Packet::new(u64::from_le_bytes(bytes))
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary file holding `contents`, flushed to disk.
    fn temp_file_with(contents: &[u8]) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(contents).unwrap();
        tmp.flush().unwrap();
        tmp
    }

    /// The mapped view must expose exactly the bytes written to the file.
    #[test]
    fn test_mapped_file_reader() {
        let expected = (0..64).collect::<Vec<u8>>();
        let tmp = temp_file_with(&expected);

        let mapped = MappedFileReader::open(tmp.path()).unwrap();

        assert_eq!(mapped.len(), 64);
        assert!(!mapped.is_empty());
        assert_eq!(mapped.as_bytes(), expected.as_slice());
    }

    /// An empty file reports zero size, zero packets and an empty batch.
    #[test]
    fn test_tpx3_file_reader_empty() {
        let tmp = NamedTempFile::new().unwrap();

        let tpx3 = Tpx3FileReader::open(tmp.path()).unwrap();

        assert_eq!(tpx3.file_size(), 0);
        assert_eq!(tpx3.packet_count(), 0);
        let hits = tpx3.read_batch().unwrap();
        assert!(hits.is_empty());
    }

    /// A file whose size is not a multiple of 8 bytes is rejected on read.
    #[test]
    fn test_tpx3_file_reader_invalid_size() {
        let tmp = temp_file_with(&[0u8; 7]);

        let tpx3 = Tpx3FileReader::open(tmp.path()).unwrap();

        assert!(tpx3.read_batch().is_err());
    }
}