#![allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_precision_loss
)]
use crate::hdf5::{
append_slice, create_extendable_dataset, set_attr_str_group, set_dataset_units,
to_var_len_unicode, NeutronEventBatch,
};
use crate::reader::EventBatch;
use crate::{Error, Result};
use hdf5::types::VarLenUnicode;
use hdf5::{Dataset, File, Group};
use std::collections::HashSet;
use std::path::Path;
/// One TDC tick is 25 ns; pulse timestamps arrive in units of these ticks.
const NS_PER_TICK: u64 = 25;
/// Microseconds per TOF tick (25 ns = 0.025 us); used for event_time_offset.
const US_PER_TICK: f64 = 25.0 / 1000.0;
/// Map a raw detector coordinate onto its gap-compressed position.
///
/// `gaps` lists the raw rows/columns that are inter-chip gaps and is
/// expected sorted ascending (a binary partition is used). Returns `None`
/// when `coord` lands exactly on a gap line; otherwise the coordinate
/// shifted down by the number of gap lines below it.
fn remap_gap(coord: u32, gaps: &[u32]) -> Option<u32> {
    // Count of gap lines strictly below this coordinate.
    let below = gaps.partition_point(|&g| g < coord);
    match gaps.get(below) {
        // Sitting exactly on a gap line: drop the coordinate.
        Some(&g) if g == coord => None,
        // Compress out the gap lines beneath it.
        _ => Some(coord - below as u32),
    }
}
/// Static description of one detector bank: where its pixels live in the
/// global pixel-id space and which raw rows/columns are chip gaps.
#[derive(Clone, Debug)]
pub struct SnsBankConfig {
    /// Bank name; its events are written to the "<name>_events" group.
    pub name: String,
    /// Base of this bank's pixel-id range: id = offset + y * width + x.
    pub pixel_id_offset: u32,
    /// Bank width in pixels, after gap columns are removed.
    pub width: u32,
    /// Bank height in pixels, after gap rows are removed.
    pub height: u32,
    /// Raw x coordinates that are gap columns; expected sorted ascending.
    pub gap_columns: Vec<u32>,
    /// Raw y coordinates that are gap rows; expected sorted ascending.
    pub gap_rows: Vec<u32>,
}
/// Run-level metadata written into the file's `entry` group.
#[derive(Clone, Debug)]
pub struct SnsRunMetadata {
    /// Run number (stored as a string dataset).
    pub run_number: u32,
    /// Experiment/proposal identifier, e.g. an IPTS number.
    pub experiment_identifier: String,
    /// ISO-8601 run start time; also used to derive end_time at finalize.
    pub start_time: String,
    /// Optional end time; an empty string is written when absent and may
    /// be overwritten during finalization.
    pub end_time: Option<String>,
    /// Optional duration in seconds; recomputed at finalize if pulses
    /// were written.
    pub duration: Option<f64>,
    /// Optional accumulated proton charge, in picoCoulomb.
    pub proton_charge: Option<f64>,
    /// Optional human-readable run title.
    pub title: Option<String>,
}
/// Instrument identity written under `entry/instrument` (NXinstrument).
#[derive(Clone, Debug)]
pub struct SnsInstrumentConfig {
    /// Instrument name, e.g. "VENUS".
    pub name: String,
    /// Beamline identifier, e.g. "BL10".
    pub beamline: String,
    /// Optional instrument definition XML; stored as an NXnote when set.
    pub instrument_xml: Option<String>,
}
/// Everything needed to create and populate one SNS event file.
#[derive(Clone, Debug)]
pub struct SnsWriteOptions {
    /// Detector banks; one NXevent_data group is created per bank.
    pub banks: Vec<SnsBankConfig>,
    /// Run metadata written under `entry`.
    pub run: SnsRunMetadata,
    /// Instrument identity written under `entry/instrument`.
    pub instrument: SnsInstrumentConfig,
    /// Scale of neutron centroid coordinates: positions are divided by
    /// this before pixel mapping. Must be finite and > 0.
    pub super_resolution_factor: f64,
    /// HDF5 chunk size (in events) for the extendable event datasets.
    pub chunk_events: usize,
    /// Optional compression level forwarded to dataset creation.
    pub compression: Option<u8>,
    /// Shuffle-filter flag forwarded to dataset creation.
    pub shuffle: bool,
}
impl SnsWriteOptions {
    /// Default configuration for VENUS (BL10): one 512x512 bank named
    /// "bank100" with the two-line cross gap at rows/columns 256-257,
    /// unit super-resolution, 100k-event chunks, compression level 1
    /// with shuffle enabled.
    #[must_use]
    pub fn venus_defaults(run: SnsRunMetadata) -> Self {
        let bank = SnsBankConfig {
            name: "bank100".to_string(),
            pixel_id_offset: 1_000_000,
            width: 512,
            height: 512,
            gap_columns: vec![256, 257],
            gap_rows: vec![256, 257],
        };
        let instrument = SnsInstrumentConfig {
            name: "VENUS".to_string(),
            beamline: "BL10".to_string(),
            instrument_xml: None,
        };
        Self {
            banks: vec![bank],
            run,
            instrument,
            super_resolution_factor: 1.0,
            chunk_events: 100_000,
            compression: Some(1),
            shuffle: true,
        }
    }
}
/// One process-variable log destined for `entry/DASlogs/<name>` (NXlog).
#[derive(Clone, Debug)]
pub struct DasLogEntry {
    /// Log (PV) name; becomes the NXlog group name.
    pub name: String,
    /// Sample times in seconds; must match `value` in length.
    pub time: Vec<f64>,
    /// Sampled values, parallel to `time`.
    pub value: Vec<f64>,
    /// Optional engineering units attached to the value dataset.
    pub units: Option<String>,
}
/// Streaming writer for one bank's NXevent_data datasets.
struct SnsBankEventWriter {
    /// Per-event pixel ids (u32, extendable).
    event_id: Dataset,
    /// Per-event time-of-flight in microseconds (f32, extendable).
    event_time_offset: Dataset,
    /// Per-pulse start times in seconds relative to the run start;
    /// filled at finalization from `pulse_timestamps_ns`.
    event_time_zero: Dataset,
    /// Per-pulse offset of that pulse's first event in the event streams.
    event_index: Dataset,
    /// One-element dataset holding the bank's final event tally.
    total_counts_ds: Dataset,
    /// Events written so far.
    event_count: u64,
    /// Pulses with at least one surviving event written so far.
    pulse_count: u64,
    /// Absolute pulse timestamps in ns, one per written pulse; converted
    /// to relative seconds at finalization.
    pulse_timestamps_ns: Vec<u64>,
}
impl SnsBankEventWriter {
    /// Create the per-bank event datasets inside `group`.
    ///
    /// Lays out the NXevent_data members: `event_id` (u32),
    /// `event_time_offset` (f32, microsecond), `event_time_zero`
    /// (f64, second), `event_index` (u64), plus a one-element
    /// `total_counts` dataset initialised to zero.
    fn new(group: &Group, options: &SnsWriteOptions) -> Result<Self> {
        let event_id = create_extendable_dataset::<u32>(
            group,
            "event_id",
            options.chunk_events,
            options.compression,
            options.shuffle,
        )?;
        let event_time_offset = create_extendable_dataset::<f32>(
            group,
            "event_time_offset",
            options.chunk_events,
            options.compression,
            options.shuffle,
        )?;
        set_dataset_units(&event_time_offset, "microsecond")?;
        let event_time_zero = create_extendable_dataset::<f64>(
            group,
            "event_time_zero",
            options.chunk_events,
            options.compression,
            options.shuffle,
        )?;
        set_dataset_units(&event_time_zero, "second")?;
        let event_index = create_extendable_dataset::<u64>(
            group,
            "event_index",
            options.chunk_events,
            options.compression,
            options.shuffle,
        )?;
        let total_counts_ds = group
            .new_dataset::<u64>()
            .shape((1,))
            .create("total_counts")?;
        total_counts_ds.write_raw(&[0u64])?;
        Ok(Self {
            event_id,
            event_time_offset,
            event_time_zero,
            event_index,
            total_counts_ds,
            event_count: 0,
            pulse_count: 0,
            pulse_timestamps_ns: Vec::new(),
        })
    }

    /// Append one pulse worth of already-filtered events.
    ///
    /// Records the pulse boundary in `event_index` (offset of this pulse's
    /// first event), remembers the pulse timestamp for the deferred
    /// `event_time_zero` emission, then appends the pixel ids and TOF
    /// values. Returns the number of events written.
    ///
    /// Shared tail of `append_hits` / `append_neutrons` (previously
    /// duplicated in both).
    fn commit_pulse(&mut self, pulse_ns: u64, pixel_ids: &[u32], tof_us: &[f32]) -> Result<usize> {
        append_slice(
            &self.event_index,
            self.pulse_count as usize,
            &[self.event_count],
        )?;
        self.pulse_timestamps_ns.push(pulse_ns);
        self.pulse_count += 1;
        append_slice(&self.event_id, self.event_count as usize, pixel_ids)?;
        append_slice(&self.event_time_offset, self.event_count as usize, tof_us)?;
        let written = pixel_ids.len();
        self.event_count += written as u64;
        Ok(written)
    }

    /// Convert raw hits to pixel ids / TOF microseconds and append them.
    ///
    /// Hits on gap rows/columns or outside the bank bounds are dropped;
    /// when the whole batch is dropped no pulse entry is recorded.
    /// Returns the number of events actually written.
    fn append_hits(
        &mut self,
        bank: &SnsBankConfig,
        batch: &EventBatch,
        pulse_ns: u64,
    ) -> Result<usize> {
        let n = batch.hits.x.len();
        if n == 0 {
            return Ok(0);
        }
        let mut pixel_ids = Vec::with_capacity(n);
        let mut tof_us = Vec::with_capacity(n);
        for i in 0..n {
            let raw_x = u32::from(batch.hits.x[i]);
            let raw_y = u32::from(batch.hits.y[i]);
            let Some(px) = remap_gap(raw_x, &bank.gap_columns) else {
                continue;
            };
            let Some(py) = remap_gap(raw_y, &bank.gap_rows) else {
                continue;
            };
            // Bounds are checked on the gap-compressed coordinates.
            if px >= bank.width || py >= bank.height {
                continue;
            }
            pixel_ids.push(bank.pixel_id_offset + py * bank.width + px);
            tof_us.push((f64::from(batch.hits.tof[i]) * US_PER_TICK) as f32);
        }
        if pixel_ids.is_empty() {
            return Ok(0);
        }
        self.commit_pulse(pulse_ns, &pixel_ids, &tof_us)
    }

    /// Like `append_hits`, but for centroided neutrons: positions are in
    /// super-resolution units and are scaled down by
    /// `1 / super_resolution_factor` (then rounded) before gap remapping.
    fn append_neutrons(
        &mut self,
        bank: &SnsBankConfig,
        batch: &NeutronEventBatch,
        pulse_ns: u64,
        super_resolution_factor: f64,
    ) -> Result<usize> {
        let n = batch.neutrons.x.len();
        if n == 0 {
            return Ok(0);
        }
        let inv = 1.0 / super_resolution_factor;
        let mut pixel_ids = Vec::with_capacity(n);
        let mut tof_us = Vec::with_capacity(n);
        for i in 0..n {
            let fx = (batch.neutrons.x[i] * inv).round();
            let fy = (batch.neutrons.y[i] * inv).round();
            // Drop NaN/inf and negative positions before casting to u32.
            if !fx.is_finite() || !fy.is_finite() || fx < 0.0 || fy < 0.0 {
                continue;
            }
            let raw_x = fx as u32;
            let raw_y = fy as u32;
            let Some(px) = remap_gap(raw_x, &bank.gap_columns) else {
                continue;
            };
            let Some(py) = remap_gap(raw_y, &bank.gap_rows) else {
                continue;
            };
            if px >= bank.width || py >= bank.height {
                continue;
            }
            pixel_ids.push(bank.pixel_id_offset + py * bank.width + px);
            tof_us.push((f64::from(batch.neutrons.tof[i]) * US_PER_TICK) as f32);
        }
        if pixel_ids.is_empty() {
            return Ok(0);
        }
        self.commit_pulse(pulse_ns, &pixel_ids, &tof_us)
    }

    /// Emit `event_time_zero`: pulse times in seconds relative to
    /// `min_start_ns` (the run start). `saturating_sub` is defensive —
    /// by construction every recorded timestamp is >= the global minimum.
    fn write_pulse_times(&self, min_start_ns: u64) -> Result<()> {
        let times_s: Vec<f64> = self
            .pulse_timestamps_ns
            .iter()
            .map(|&ns| ns.saturating_sub(min_start_ns) as f64 / 1_000_000_000.0)
            .collect();
        if !times_s.is_empty() {
            append_slice(&self.event_time_zero, 0, &times_s)?;
        }
        Ok(())
    }

    /// Patch the bank-level `total_counts` dataset with the final tally.
    fn write_total_counts(&self) -> Result<()> {
        self.total_counts_ds.write_raw(&[self.event_count])?;
        Ok(())
    }
}
/// Top-level sink that owns the HDF5 file and the per-bank event writers.
pub struct SnsEventSink {
    /// Keeps the file handle alive for the sink's lifetime.
    _file: File,
    /// The `entry` group; run-level datasets are patched here at finalize.
    entry: Group,
    /// (bank config, bank writer) pairs, addressed by `bank_index`.
    writers: Vec<(SnsBankConfig, SnsBankEventWriter)>,
    /// Options the sink was created with.
    options: SnsWriteOptions,
    /// Smallest pulse timestamp (ns) seen across all banks, if any;
    /// serves as the run start for relative pulse times.
    min_pulse_ns: Option<u64>,
    /// Last accepted pulse timestamp per bank, for monotonicity checks.
    last_pulse_ns_per_bank: Vec<u64>,
    /// Largest pulse timestamp (ns) seen across all banks.
    max_pulse_ns: u64,
    /// Distinct pulse timestamps that produced events (across banks),
    /// so shared pulses are counted once in `total_pulses`.
    counted_pulse_ns: HashSet<u64>,
    /// Total events written across all banks.
    total_counts: u64,
    /// Total distinct pulses that produced events.
    total_pulses: u64,
    /// Set by `finalize`; further writes are rejected once true.
    finalized: bool,
}
impl SnsEventSink {
    /// Create the output file and the full `entry` skeleton.
    ///
    /// Writes the run metadata datasets, one NXevent_data group per
    /// configured bank, an NXinstrument group with hard links back to
    /// each bank group, an empty DASlogs collection and an NXsample
    /// group. `total_counts` / `total_pulses` start at zero and are
    /// patched in `finalize`.
    ///
    /// # Errors
    /// Returns `Error::InvalidFormat` when `super_resolution_factor` is
    /// non-finite or non-positive; propagates any HDF5 failure.
    pub fn create<P: AsRef<Path>>(path: P, options: SnsWriteOptions) -> Result<Self> {
        let srf = options.super_resolution_factor;
        if !srf.is_finite() || srf <= 0.0 {
            return Err(crate::Error::InvalidFormat(format!(
                "super_resolution_factor must be finite and positive, got {srf}"
            )));
        }
        let file = File::create(path)?;
        let entry = file.create_group("entry")?;
        set_attr_str_group(&entry, "NX_class", "NXentry")?;
        set_attr_str_group(&entry, "definition", "NXsnsevent")?;
        write_str_dataset(&entry, "run_number", &options.run.run_number.to_string())?;
        write_str_dataset(
            &entry,
            "experiment_identifier",
            &options.run.experiment_identifier,
        )?;
        write_str_dataset(&entry, "start_time", &options.run.start_time)?;
        write_str_dataset(
            &entry,
            "end_time",
            options.run.end_time.as_deref().unwrap_or(""),
        )?;
        if let Some(title) = &options.run.title {
            write_str_dataset(&entry, "title", title)?;
        }
        write_f64_dataset(
            &entry,
            "duration",
            options.run.duration.unwrap_or(0.0),
            "second",
        )?;
        write_f64_dataset(
            &entry,
            "proton_charge",
            options.run.proton_charge.unwrap_or(0.0),
            "picoCoulomb",
        )?;
        write_u64_dataset(&entry, "total_counts", 0)?;
        write_u64_dataset(&entry, "total_pulses", 0)?;
        let mut writers = Vec::with_capacity(options.banks.len());
        for bank in &options.banks {
            let group_name = format!("{}_events", bank.name);
            let group = entry.create_group(&group_name)?;
            set_attr_str_group(&group, "NX_class", "NXevent_data")?;
            let writer = SnsBankEventWriter::new(&group, &options)?;
            writers.push((bank.clone(), writer));
        }
        let instrument = entry.create_group("instrument")?;
        set_attr_str_group(&instrument, "NX_class", "NXinstrument")?;
        write_str_dataset(&instrument, "name", &options.instrument.name)?;
        write_str_dataset(&instrument, "beamline", &options.instrument.beamline)?;
        // Expose each bank under instrument/<bank> as a hard link to the
        // corresponding <bank>_events group.
        for bank in &options.banks {
            let src = format!("{}_events", bank.name);
            let dst = format!("instrument/{}", bank.name);
            entry.link_hard(&src, &dst)?;
        }
        if let Some(ref xml) = options.instrument.instrument_xml {
            let xml_group = instrument.create_group("instrument_xml")?;
            set_attr_str_group(&xml_group, "NX_class", "NXnote")?;
            write_str_dataset(&xml_group, "data", xml)?;
            write_str_dataset(&xml_group, "type", "text/xml")?;
        }
        let daslogs = entry.create_group("DASlogs")?;
        set_attr_str_group(&daslogs, "NX_class", "NXcollection")?;
        let sample = entry.create_group("sample")?;
        set_attr_str_group(&sample, "NX_class", "NXsample")?;
        write_str_dataset(&sample, "name", "")?;
        let num_banks = writers.len();
        Ok(Self {
            _file: file,
            entry,
            writers,
            options,
            min_pulse_ns: None,
            last_pulse_ns_per_bank: vec![0u64; num_banks],
            max_pulse_ns: 0,
            counted_pulse_ns: HashSet::new(),
            total_counts: 0,
            total_pulses: 0,
            finalized: false,
        })
    }

    /// Write one pulse of raw hits into bank `bank_index`.
    ///
    /// # Errors
    /// Rejects writes after finalization, out-of-range bank indices, and
    /// pulse timestamps that go backwards for this bank.
    pub fn write_hits(&mut self, bank_index: usize, batch: &EventBatch) -> Result<()> {
        self.ensure_writable(bank_index)?;
        let pulse_ns = batch.tdc_timestamp_25ns * NS_PER_TICK;
        self.validate_pulse_monotonicity(pulse_ns, bank_index)?;
        let (ref bank, ref mut writer) = self.writers[bank_index];
        let written = writer.append_hits(bank, batch, pulse_ns)?;
        self.record_written(pulse_ns, written);
        Ok(())
    }

    /// Write one pulse of centroided neutrons into bank `bank_index`.
    ///
    /// # Errors
    /// Same conditions as [`Self::write_hits`].
    pub fn write_neutrons(&mut self, bank_index: usize, batch: &NeutronEventBatch) -> Result<()> {
        self.ensure_writable(bank_index)?;
        let pulse_ns = batch.tdc_timestamp_25ns * NS_PER_TICK;
        self.validate_pulse_monotonicity(pulse_ns, bank_index)?;
        let (ref bank, ref mut writer) = self.writers[bank_index];
        let written =
            writer.append_neutrons(bank, batch, pulse_ns, self.options.super_resolution_factor)?;
        self.record_written(pulse_ns, written);
        Ok(())
    }

    /// Write sample-environment logs under `entry/DASlogs`, one NXlog
    /// group per entry with `time` (second) and `value` datasets.
    ///
    /// # Errors
    /// Rejects writes after finalization and entries whose `time` and
    /// `value` lengths differ.
    pub fn write_daslogs(&self, logs: &[DasLogEntry]) -> Result<()> {
        if self.finalized {
            return Err(Error::InvalidFormat(
                "cannot write after finalization".into(),
            ));
        }
        let daslogs = self.entry.group("DASlogs")?;
        for log in logs {
            if log.time.len() != log.value.len() {
                return Err(Error::InvalidFormat(format!(
                    "DASlog '{}': time length ({}) != value length ({})",
                    log.name,
                    log.time.len(),
                    log.value.len(),
                )));
            }
            let group = daslogs.create_group(&log.name)?;
            set_attr_str_group(&group, "NX_class", "NXlog")?;
            let time_ds = group
                .new_dataset::<f64>()
                .shape((log.time.len(),))
                .create("time")?;
            time_ds.write_raw(&log.time)?;
            set_dataset_units(&time_ds, "second")?;
            let value_ds = group
                .new_dataset::<f64>()
                .shape((log.value.len(),))
                .create("value")?;
            value_ds.write_raw(&log.value)?;
            if let Some(ref units) = log.units {
                set_dataset_units(&value_ds, units)?;
            }
        }
        Ok(())
    }

    /// Flush deferred per-bank data and patch run-level summary datasets.
    ///
    /// Idempotent: the first call wins; later calls are no-ops. Writes
    /// each bank's `event_time_zero` and `total_counts`, the run totals,
    /// and — when at least one pulse was written — the measured duration
    /// plus a derived `end_time` (if `start_time` parses as ISO-8601).
    #[allow(clippy::cast_precision_loss)]
    pub fn finalize(&mut self) -> Result<()> {
        if self.finalized {
            return Ok(());
        }
        self.finalized = true;
        let min_start_ns = self.min_pulse_ns.unwrap_or(0);
        for (_bank, writer) in &self.writers {
            writer.write_pulse_times(min_start_ns)?;
            writer.write_total_counts()?;
        }
        overwrite_u64_dataset(&self.entry, "total_counts", self.total_counts)?;
        overwrite_u64_dataset(&self.entry, "total_pulses", self.total_pulses)?;
        if self.min_pulse_ns.is_some() {
            let duration_s =
                (self.max_pulse_ns.saturating_sub(min_start_ns)) as f64 / 1_000_000_000.0;
            overwrite_f64_dataset(&self.entry, "duration", duration_s)?;
            if let Some(start_epoch) = iso8601_to_epoch_secs(&self.options.run.start_time) {
                // Whole-second precision is sufficient for end_time.
                let end_epoch = start_epoch + duration_s as u64;
                let end_time = epoch_secs_to_iso8601(end_epoch);
                overwrite_str_dataset(&self.entry, "end_time", &end_time)?;
            }
        }
        Ok(())
    }

    /// Guard shared by the event-writing entry points: rejects writes
    /// after finalization and out-of-range bank indices.
    fn ensure_writable(&self, bank_index: usize) -> Result<()> {
        if self.finalized {
            return Err(Error::InvalidFormat(
                "cannot write after finalization".into(),
            ));
        }
        if bank_index >= self.writers.len() {
            return Err(Error::InvalidFormat(format!(
                "bank index {bank_index} out of range (have {} banks)",
                self.writers.len()
            )));
        }
        Ok(())
    }

    /// Update run-level statistics after a per-bank write; a pulse is
    /// counted once across all banks (deduplicated by its timestamp).
    fn record_written(&mut self, pulse_ns: u64, written: usize) {
        if written == 0 {
            return;
        }
        self.track_written_pulse(pulse_ns);
        self.total_counts += written as u64;
        if self.counted_pulse_ns.insert(pulse_ns) {
            self.total_pulses += 1;
        }
    }

    /// Reject pulse timestamps that decrease within a bank (equal
    /// timestamps are allowed).
    fn validate_pulse_monotonicity(&mut self, pulse_ns: u64, bank_index: usize) -> Result<()> {
        let bank_last = self.last_pulse_ns_per_bank[bank_index];
        if pulse_ns < bank_last {
            return Err(Error::InvalidFormat(format!(
                "non-monotonic pulse timestamp for bank {bank_index}: \
                 {pulse_ns} ns < previous {bank_last} ns",
            )));
        }
        self.last_pulse_ns_per_bank[bank_index] = pulse_ns;
        Ok(())
    }

    /// Track the run-wide min/max pulse timestamps for duration and
    /// relative event_time_zero computation.
    fn track_written_pulse(&mut self, pulse_ns: u64) {
        self.min_pulse_ns = Some(self.min_pulse_ns.map_or(pulse_ns, |m| m.min(pulse_ns)));
        self.max_pulse_ns = self.max_pulse_ns.max(pulse_ns);
    }
}
impl Drop for SnsEventSink {
    /// Best-effort finalization on drop; `finalize` is idempotent so an
    /// explicit earlier call makes this a no-op.
    fn drop(&mut self) {
        if let Err(_e) = self.finalize() {
            // Errors cannot propagate out of Drop; intentionally ignored.
        }
    }
}
/// Convenience wrapper: stream every hit batch into bank 0 of a freshly
/// created SNS file at `path`, then finalize it.
///
/// # Errors
/// Propagates sink-creation, per-batch write, and finalization failures.
pub fn write_hits_sns<P, I>(path: P, batches: I, options: &SnsWriteOptions) -> Result<()>
where
    P: AsRef<Path>,
    I: IntoIterator<Item = EventBatch>,
{
    let mut sink = SnsEventSink::create(path, options.clone())?;
    batches
        .into_iter()
        .try_for_each(|batch| sink.write_hits(0, &batch))?;
    sink.finalize()
}
/// Convenience wrapper: stream every neutron batch into bank 0 of a
/// freshly created SNS file at `path`, then finalize it.
///
/// # Errors
/// Propagates sink-creation, per-batch write, and finalization failures.
pub fn write_neutrons_sns<P, I>(path: P, batches: I, options: &SnsWriteOptions) -> Result<()>
where
    P: AsRef<Path>,
    I: IntoIterator<Item = NeutronEventBatch>,
{
    let mut sink = SnsEventSink::create(path, options.clone())?;
    batches
        .into_iter()
        .try_for_each(|batch| sink.write_neutrons(0, &batch))?;
    sink.finalize()
}
/// Create a scalar variable-length-unicode dataset `name` in `group`.
fn write_str_dataset(group: &Group, name: &str, value: &str) -> Result<()> {
    let encoded = to_var_len_unicode(value)?;
    let dataset = group.new_dataset::<VarLenUnicode>().shape(()).create(name)?;
    dataset.write_scalar(&encoded)?;
    Ok(())
}
/// Create a scalar f64 dataset `name` in `group` with a units attribute.
fn write_f64_dataset(group: &Group, name: &str, value: f64, units: &str) -> Result<()> {
    let dataset = group.new_dataset::<f64>().shape(()).create(name)?;
    dataset.write_scalar(&value)?;
    set_dataset_units(&dataset, units)?;
    Ok(())
}
/// Create a scalar u64 dataset `name` in `group`.
fn write_u64_dataset(group: &Group, name: &str, value: u64) -> Result<()> {
    let dataset = group.new_dataset::<u64>().shape(()).create(name)?;
    dataset.write_scalar(&value)?;
    Ok(())
}
/// Rewrite an existing scalar u64 dataset `name` in `group` in place.
fn overwrite_u64_dataset(group: &Group, name: &str, value: u64) -> Result<()> {
    group.dataset(name)?.write_scalar(&value)?;
    Ok(())
}
/// Rewrite an existing scalar f64 dataset `name` in `group` in place.
fn overwrite_f64_dataset(group: &Group, name: &str, value: f64) -> Result<()> {
    group.dataset(name)?.write_scalar(&value)?;
    Ok(())
}
/// Replace the string dataset `name` in `group`: the old dataset (if any)
/// is unlinked and a fresh one is created with `value`.
fn overwrite_str_dataset(group: &Group, name: &str, value: &str) -> Result<()> {
    let exists = group.dataset(name).is_ok();
    if exists {
        group.unlink(name)?;
    }
    write_str_dataset(group, name, value)
}
/// Format seconds-since-Unix-epoch as "YYYY-MM-DDTHH:MM:SSZ".
///
/// Uses a simple forward walk from 1970 with the Gregorian leap-year
/// rule; no external time library required.
fn epoch_secs_to_iso8601(secs: u64) -> String {
    // Gregorian leap-year rule.
    fn is_leap(y: i64) -> bool {
        y % 4 == 0 && (y % 100 != 0 || y % 400 == 0)
    }
    fn year_len(y: i64) -> i64 {
        if is_leap(y) { 366 } else { 365 }
    }
    let tod = secs % 86400;
    let (hh, mm, ss) = (tod / 3600, (tod % 3600) / 60, tod % 60);
    // Walk forward from the epoch year, consuming whole years.
    let mut day_of_year = i64::try_from(secs / 86400).unwrap_or(0);
    let mut year = 1970i64;
    while day_of_year >= year_len(year) {
        day_of_year -= year_len(year);
        year += 1;
    }
    let lengths: [i64; 12] = [
        31,
        if is_leap(year) { 29 } else { 28 },
        31,
        30,
        31,
        30,
        31,
        31,
        30,
        31,
        30,
        31,
    ];
    // Consume whole months to find the month index and day-of-month.
    let mut month = 0usize;
    for (idx, &len) in lengths.iter().enumerate() {
        if day_of_year < len {
            month = idx;
            break;
        }
        day_of_year -= len;
    }
    format!(
        "{year:04}-{:02}-{:02}T{hh:02}:{mm:02}:{ss:02}Z",
        month + 1,
        day_of_year + 1,
    )
}
fn iso8601_to_epoch_secs(s: &str) -> Option<u64> {
if s.len() < 19 {
return None;
}
let year: i64 = s.get(0..4)?.parse().ok()?;
let month: u32 = s.get(5..7)?.parse().ok()?;
let day: u32 = s.get(8..10)?.parse().ok()?;
let hour: i64 = s.get(11..13)?.parse().ok()?;
let min: i64 = s.get(14..16)?.parse().ok()?;
let sec: i64 = s.get(17..19)?.parse().ok()?;
let tz_offset_s: i64 = {
let tz = s.get(19..).unwrap_or("");
if tz == "Z" || tz.is_empty() {
0
} else if tz.starts_with('+') || tz.starts_with('-') {
let sign: i64 = if tz.starts_with('+') { 1 } else { -1 };
let tz = &tz[1..];
let oh: i64 = tz.get(0..2).and_then(|v| v.parse().ok())?;
let om: i64 = tz.get(3..5).and_then(|v| v.parse().ok()).unwrap_or(0);
sign * (oh * 3600 + om * 60)
} else {
return None; }
};
let mut days: i64 = 0;
for y in 1970..year {
days += if y % 4 == 0 && (y % 100 != 0 || y % 400 == 0) {
366
} else {
365
};
}
let leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
let month_days: [u32; 12] = [
31,
if leap { 29 } else { 28 },
31,
30,
31,
30,
31,
31,
30,
31,
30,
31,
];
if month == 0 || month > 12 || day == 0 {
return None;
}
for &md in &month_days[..(month as usize - 1)] {
days += i64::from(md);
}
days += i64::from(day.saturating_sub(1));
let utc_secs = days * 86400 + hour * 3600 + min * 60 + sec - tz_offset_s;
u64::try_from(utc_secs).ok()
}
#[cfg(test)]
mod tests {
use super::*;
use rustpix_core::neutron::NeutronBatch;
use rustpix_core::soa::HitBatch;
use tempfile::NamedTempFile;
    /// Run-metadata fixture; start_time carries a -05:00 offset so the
    /// timezone branch of iso8601_to_epoch_secs is exercised.
    fn make_test_run() -> SnsRunMetadata {
        SnsRunMetadata {
            run_number: 99999,
            experiment_identifier: "IPTS-00001".to_string(),
            start_time: "2025-01-01T00:00:00-05:00".to_string(),
            end_time: None,
            duration: None,
            proton_charge: Some(100.0),
            title: Some("test run".to_string()),
        }
    }
    /// Options fixture: VENUS single-bank defaults with the test run.
    fn make_test_options() -> SnsWriteOptions {
        SnsWriteOptions::venus_defaults(make_test_run())
    }
    /// Build a one-pulse EventBatch from parallel x/y/tof slices; the
    /// remaining hit columns are filled with constants since the writer
    /// does not read them.
    fn make_hit_batch(tdc: u64, xs: &[u16], ys: &[u16], tofs: &[u32]) -> EventBatch {
        let n = xs.len();
        let mut hits = HitBatch::with_capacity(n);
        hits.x.extend_from_slice(xs);
        hits.y.extend_from_slice(ys);
        hits.tof.extend_from_slice(tofs);
        // Filler columns, irrelevant to the SNS writer under test.
        hits.tot.extend_from_slice(&vec![10u16; n]);
        hits.timestamp.extend_from_slice(&vec![0u32; n]);
        hits.chip_id.extend_from_slice(&vec![0u8; n]);
        hits.cluster_id.extend_from_slice(&vec![-1i32; n]);
        EventBatch {
            tdc_timestamp_25ns: tdc,
            hits,
        }
    }
    /// Build a one-pulse NeutronEventBatch from parallel x/y/tof slices;
    /// tot/n_hits/chip_id are constants the writer does not read.
    fn make_neutron_batch(tdc: u64, xs: &[f64], ys: &[f64], tofs: &[u32]) -> NeutronEventBatch {
        let n = xs.len();
        NeutronEventBatch {
            tdc_timestamp_25ns: tdc,
            neutrons: NeutronBatch {
                x: xs.to_vec(),
                y: ys.to_vec(),
                tof: tofs.to_vec(),
                tot: vec![10u16; n],
                n_hits: vec![3u16; n],
                chip_id: vec![0u8; n],
            },
        }
    }
#[test]
fn test_pixel_id_origin() {
let opts = make_test_options();
let batch = make_hit_batch(1000, &[0], &[0], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let ids: Vec<u32> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_id")
.unwrap()
.read_raw()
.unwrap();
assert_eq!(ids, vec![1_000_000u32]);
}
#[test]
fn test_pixel_id_corner() {
let opts = make_test_options();
let batch = make_hit_batch(1000, &[511], &[511], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let ids: Vec<u32> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_id")
.unwrap()
.read_raw()
.unwrap();
assert_eq!(ids, vec![1_000_000 + 509 * 512 + 509]);
}
#[test]
fn test_tof_conversion() {
let opts = make_test_options();
let batch = make_hit_batch(1000, &[0], &[0], &[400]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let tof: Vec<f32> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_time_offset")
.unwrap()
.read_raw()
.unwrap();
assert!((tof[0] - 10.0f32).abs() < 1e-5);
}
#[test]
fn test_pulse_time_relative() {
let opts = make_test_options();
let batch1 = make_hit_batch(40_000_000, &[0], &[0], &[100]);
let batch2 = make_hit_batch(40_666_667, &[1], &[1], &[200]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch1).unwrap();
sink.write_hits(0, &batch2).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let etz: Vec<f64> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_time_zero")
.unwrap()
.read_raw()
.unwrap();
assert_eq!(etz.len(), 2);
assert!((etz[0] - 0.0).abs() < 1e-12); let expected = 666_667.0 * 25.0 / 1_000_000_000.0; assert!((etz[1] - expected).abs() < 1e-9);
}
#[test]
fn test_event_index_tracking() {
let opts = make_test_options();
let batch1 = make_hit_batch(1000, &[0, 1, 2], &[0, 0, 0], &[100, 200, 300]);
let batch2 = make_hit_batch(2000, &[10, 11], &[10, 10], &[400, 500]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch1).unwrap();
sink.write_hits(0, &batch2).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let idx: Vec<u64> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_index")
.unwrap()
.read_raw()
.unwrap();
assert_eq!(idx, vec![0u64, 3u64]); }
#[test]
fn test_hdf5_structure() {
let opts = make_test_options();
let batch = make_hit_batch(1000, &[5], &[10], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let entry = f.group("entry").unwrap();
let nx_class: VarLenUnicode = entry.attr("NX_class").unwrap().read_scalar().unwrap();
assert_eq!(nx_class.as_str(), "NXentry");
let definition: VarLenUnicode = entry.attr("definition").unwrap().read_scalar().unwrap();
assert_eq!(definition.as_str(), "NXsnsevent");
let bank = f.group("entry/bank100_events").unwrap();
let nx_class: VarLenUnicode = bank.attr("NX_class").unwrap().read_scalar().unwrap();
assert_eq!(nx_class.as_str(), "NXevent_data");
let inst = f.group("entry/instrument").unwrap();
let nx_class: VarLenUnicode = inst.attr("NX_class").unwrap().read_scalar().unwrap();
assert_eq!(nx_class.as_str(), "NXinstrument");
let link = f.group("entry/instrument/bank100").unwrap();
let link_ids: Vec<u32> = link.dataset("event_id").unwrap().read_raw().unwrap();
let direct_ids: Vec<u32> = bank.dataset("event_id").unwrap().read_raw().unwrap();
assert_eq!(link_ids, direct_ids);
let daslogs = f.group("entry/DASlogs").unwrap();
let nx_class: VarLenUnicode = daslogs.attr("NX_class").unwrap().read_scalar().unwrap();
assert_eq!(nx_class.as_str(), "NXcollection");
let run_num: VarLenUnicode = entry.dataset("run_number").unwrap().read_scalar().unwrap();
assert_eq!(run_num.as_str(), "99999");
}
#[test]
fn test_finalization_totals() {
let opts = make_test_options();
let batch1 = make_hit_batch(1000, &[0, 1], &[0, 0], &[100, 200]);
let batch2 = make_hit_batch(2000, &[2, 3, 4], &[0, 0, 0], &[300, 400, 500]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch1).unwrap();
sink.write_hits(0, &batch2).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let entry = f.group("entry").unwrap();
let tc: u64 = entry
.dataset("total_counts")
.unwrap()
.read_scalar()
.unwrap();
assert_eq!(tc, 5);
let tp: u64 = entry
.dataset("total_pulses")
.unwrap()
.read_scalar()
.unwrap();
assert_eq!(tp, 2);
let bank_tc: Vec<u64> = entry
.group("bank100_events")
.unwrap()
.dataset("total_counts")
.unwrap()
.read_raw()
.unwrap();
assert_eq!(bank_tc, vec![5u64]);
}
#[test]
fn test_pulse_count_deduplication_across_banks() {
let mut opts = make_test_options();
opts.banks.push(SnsBankConfig {
name: "bank200".to_string(),
pixel_id_offset: 2_000_000,
width: 512,
height: 512,
gap_columns: vec![256, 257],
gap_rows: vec![256, 257],
});
let batch = make_hit_batch(1000, &[0], &[0], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.write_hits(1, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let entry = f.group("entry").unwrap();
let tc: u64 = entry
.dataset("total_counts")
.unwrap()
.read_scalar()
.unwrap();
assert_eq!(tc, 2);
let tp: u64 = entry
.dataset("total_pulses")
.unwrap()
.read_scalar()
.unwrap();
assert_eq!(tp, 1); }
#[test]
fn test_empty_batch() {
let opts = make_test_options();
let batch = make_hit_batch(1000, &[], &[], &[]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let ids: Vec<u32> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_id")
.unwrap()
.read_raw()
.unwrap();
assert!(ids.is_empty());
}
#[test]
fn test_neutron_pixel_ids() {
let mut opts = make_test_options();
opts.super_resolution_factor = 8.0;
let batch = make_neutron_batch(1000, &[80.0], &[160.0], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_neutrons(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let ids: Vec<u32> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_id")
.unwrap()
.read_raw()
.unwrap();
assert_eq!(ids, vec![1_000_000 + 20 * 512 + 10]);
}
#[test]
fn test_daslogs_write() {
let opts = make_test_options();
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_daslogs(&[DasLogEntry {
name: "BL10:Mot:S1:X".to_string(),
time: vec![0.0, 1.0, 2.0],
value: vec![-212.0, -212.0, -212.0],
units: Some("mm".to_string()),
}])
.unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let log = f.group("entry/DASlogs/BL10:Mot:S1:X").unwrap();
let nx_class: VarLenUnicode = log.attr("NX_class").unwrap().read_scalar().unwrap();
assert_eq!(nx_class.as_str(), "NXlog");
let times: Vec<f64> = log.dataset("time").unwrap().read_raw().unwrap();
assert_eq!(times, vec![0.0, 1.0, 2.0]);
let values: Vec<f64> = log.dataset("value").unwrap().read_raw().unwrap();
assert_eq!(values, vec![-212.0, -212.0, -212.0]);
}
#[test]
fn test_daslogs_mismatched_lengths_rejected() {
let opts = make_test_options();
let file = NamedTempFile::new().unwrap();
let path = file.path();
let sink = SnsEventSink::create(path, opts).unwrap();
let err = sink
.write_daslogs(&[DasLogEntry {
name: "BL10:Mot:S1:X".to_string(),
time: vec![0.0, 1.0],
value: vec![-212.0],
units: Some("mm".to_string()),
}])
.unwrap_err();
assert!(
err.to_string().contains("time length"),
"Expected length mismatch error, got: {err}"
);
}
#[test]
fn test_venus_defaults() {
let opts = SnsWriteOptions::venus_defaults(make_test_run());
assert_eq!(opts.banks.len(), 1);
assert_eq!(opts.banks[0].name, "bank100");
assert_eq!(opts.banks[0].pixel_id_offset, 1_000_000);
assert_eq!(opts.banks[0].width, 512);
assert_eq!(opts.banks[0].height, 512);
assert_eq!(opts.instrument.name, "VENUS");
assert_eq!(opts.instrument.beamline, "BL10");
}
#[test]
fn test_dataset_dtypes() {
let opts = make_test_options();
let batch = make_hit_batch(1000, &[0], &[0], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let bank = f.group("entry/bank100_events").unwrap();
let eid = bank.dataset("event_id").unwrap();
assert!(eid.read_raw::<u32>().is_ok());
let eto = bank.dataset("event_time_offset").unwrap();
assert!(eto.read_raw::<f32>().is_ok());
let etz = bank.dataset("event_time_zero").unwrap();
assert!(etz.read_raw::<f64>().is_ok());
let idx = bank.dataset("event_index").unwrap();
assert!(idx.read_raw::<u64>().is_ok());
}
#[test]
fn test_hit_pixel_id_remapped_through_gap() {
let opts = make_test_options();
let batch = make_hit_batch(1000, &[513], &[513], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let ids: Vec<u32> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_id")
.unwrap()
.read_raw()
.unwrap();
assert_eq!(ids, vec![1_000_000 + 511 * 512 + 511]);
}
#[test]
fn test_neutron_pixel_id_remapped_through_gap() {
let mut opts = make_test_options();
opts.super_resolution_factor = 8.0;
let batch = make_neutron_batch(1000, &[4104.0], &[4104.0], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_neutrons(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let ids: Vec<u32> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_id")
.unwrap()
.read_raw()
.unwrap();
assert_eq!(ids, vec![1_000_000 + 511 * 512 + 511]);
}
#[test]
fn test_hit_gap_pixel_dropped() {
let opts = make_test_options();
let batch = make_hit_batch(1000, &[256], &[100], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let ids: Vec<u32> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_id")
.unwrap()
.read_raw()
.unwrap();
assert!(ids.is_empty(), "Gap pixel should have been dropped");
}
#[test]
fn test_hit_edge_column_preserved() {
let opts = make_test_options();
let batch = make_hit_batch(1000, &[258], &[0], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let ids: Vec<u32> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_id")
.unwrap()
.read_raw()
.unwrap();
assert_eq!(ids, vec![1_000_256]);
}
#[test]
fn test_hit_no_gap_bank_passes_through() {
let mut opts = make_test_options();
opts.banks[0].gap_columns = vec![];
opts.banks[0].gap_rows = vec![];
let batch = make_hit_batch(1000, &[256], &[257], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
sink.write_hits(0, &batch).unwrap();
sink.finalize().unwrap();
let f = File::open(path).unwrap();
let ids: Vec<u32> = f
.group("entry/bank100_events")
.unwrap()
.dataset("event_id")
.unwrap()
.read_raw()
.unwrap();
assert_eq!(ids, vec![1_000_000 + 257 * 512 + 256]);
}
#[test]
fn test_invalid_bank_index() {
let opts = make_test_options();
let batch = make_hit_batch(1000, &[0], &[0], &[100]);
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
let result = sink.write_hits(99, &batch);
assert!(result.is_err());
}
#[test]
fn test_non_monotonic_pulse_timestamp_rejected() {
let opts = make_test_options();
let file = NamedTempFile::new().unwrap();
let path = file.path();
let mut sink = SnsEventSink::create(path, opts).unwrap();
let batch1 = make_hit_batch(2000, &[0], &[0], &[100]);
sink.write_hits(0, &batch1).unwrap();
let batch2 = make_hit_batch(1000, &[1], &[1], &[200]);
let result = sink.write_hits(0, &batch2);
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("non-monotonic"),
"Expected non-monotonic error, got: {err_msg}"
);
}
#[test]
fn test_non_monotonic_regression_intermediate_decrease() {
    // Monotonicity must be checked against the most recently accepted pulse,
    // not just the first one: 1000 -> 2000 -> 1500 must fail on the third write.
    let opts = make_test_options();
    let file = NamedTempFile::new().unwrap();
    let path = file.path();
    let mut sink = SnsEventSink::create(path, opts).unwrap();
    sink.write_hits(0, &make_hit_batch(1000, &[0], &[0], &[100]))
        .unwrap();
    sink.write_hits(0, &make_hit_batch(2000, &[1], &[1], &[200]))
        .unwrap();
    let result = sink.write_hits(0, &make_hit_batch(1500, &[2], &[2], &[300]));
    assert!(result.is_err());
    // The stray trailing comma here previously suggested a dropped panic
    // message; restore a diagnostic consistent with the sibling test.
    let err_msg = result.unwrap_err().to_string();
    assert!(
        err_msg.contains("non-monotonic"),
        "Expected non-monotonic error, got: {err_msg}"
    );
}
#[test]
fn test_hit_out_of_bounds_dropped() {
    // x = 520 exceeds the 512-wide detector, so that hit is dropped while
    // the in-bounds hit at (5, 3) is written.
    let mut options = make_test_options();
    options.banks[0].gap_columns = Vec::new();
    options.banks[0].gap_rows = Vec::new();
    let tmp = NamedTempFile::new().unwrap();
    let out_path = tmp.path();
    let mut sink = SnsEventSink::create(out_path, options).unwrap();
    sink.write_hits(0, &make_hit_batch(1000, &[5, 520], &[3, 3], &[100, 200]))
        .unwrap();
    sink.finalize().unwrap();
    let written = File::open(out_path).unwrap();
    let bank = written.group("entry/bank100_events").unwrap();
    let event_ids: Vec<u32> = bank.dataset("event_id").unwrap().read_raw().unwrap();
    assert_eq!(event_ids, vec![1_000_000 + 3 * 512 + 5]);
}
#[test]
fn test_neutron_out_of_bounds_dropped() {
    // x = 520.0 lies outside the 512-wide detector; only the neutron at
    // (5.0, 3.0) should survive into the event list.
    let mut options = make_test_options();
    options.banks[0].gap_columns = Vec::new();
    options.banks[0].gap_rows = Vec::new();
    options.super_resolution_factor = 1.0;
    let tmp = NamedTempFile::new().unwrap();
    let out_path = tmp.path();
    let mut sink = SnsEventSink::create(out_path, options).unwrap();
    let neutrons = make_neutron_batch(1000, &[5.0, 520.0], &[3.0, 3.0], &[100, 200]);
    sink.write_neutrons(0, &neutrons).unwrap();
    sink.finalize().unwrap();
    let written = File::open(out_path).unwrap();
    let bank = written.group("entry/bank100_events").unwrap();
    let event_ids: Vec<u32> = bank.dataset("event_id").unwrap().read_raw().unwrap();
    assert_eq!(event_ids, vec![1_000_000 + 3 * 512 + 5]);
}
#[test]
fn test_all_gap_pulse_no_pulse_entry() {
    // Both hits land on gap columns (256, 257), so every event of the pulse
    // is dropped; a pulse with no surviving events must leave no trace in
    // the totals or the per-bank pulse datasets.
    let tmp = NamedTempFile::new().unwrap();
    let out_path = tmp.path();
    let mut sink = SnsEventSink::create(out_path, make_test_options()).unwrap();
    sink.write_hits(0, &make_hit_batch(1000, &[256, 257], &[0, 0], &[100, 200]))
        .unwrap();
    sink.finalize().unwrap();
    let written = File::open(out_path).unwrap();
    let entry = written.group("entry").unwrap();
    let tp: u64 = entry
        .dataset("total_pulses")
        .unwrap()
        .read_scalar()
        .unwrap();
    assert_eq!(tp, 0, "All-gap pulse should not increment total_pulses");
    let tc: u64 = entry
        .dataset("total_counts")
        .unwrap()
        .read_scalar()
        .unwrap();
    assert_eq!(tc, 0);
    let bank = written.group("entry/bank100_events").unwrap();
    let etz: Vec<f64> = bank.dataset("event_time_zero").unwrap().read_raw().unwrap();
    assert!(
        etz.is_empty(),
        "No event_time_zero entries for all-gap pulse"
    );
    let idx: Vec<u64> = bank.dataset("event_index").unwrap().read_raw().unwrap();
    assert!(idx.is_empty(), "No event_index entries for all-gap pulse");
}
#[test]
fn test_mixed_gap_and_valid_pulses() {
    // First pulse is fully absorbed by gap columns; second pulse carries one
    // real event. Only the second should show up in the pulse/event totals.
    let tmp = NamedTempFile::new().unwrap();
    let out_path = tmp.path();
    let mut sink = SnsEventSink::create(out_path, make_test_options()).unwrap();
    sink.write_hits(0, &make_hit_batch(1000, &[256, 257], &[0, 0], &[100, 200]))
        .unwrap();
    sink.write_hits(0, &make_hit_batch(2000, &[5], &[3], &[300]))
        .unwrap();
    sink.finalize().unwrap();
    let written = File::open(out_path).unwrap();
    let entry = written.group("entry").unwrap();
    let tp: u64 = entry
        .dataset("total_pulses")
        .unwrap()
        .read_scalar()
        .unwrap();
    assert_eq!(tp, 1, "Only the valid pulse should be counted");
    let tc: u64 = entry
        .dataset("total_counts")
        .unwrap()
        .read_scalar()
        .unwrap();
    assert_eq!(tc, 1);
    let bank = written.group("entry/bank100_events").unwrap();
    let etz: Vec<f64> = bank.dataset("event_time_zero").unwrap().read_raw().unwrap();
    assert_eq!(etz.len(), 1, "Only one pulse entry");
    let idx: Vec<u64> = bank.dataset("event_index").unwrap().read_raw().unwrap();
    assert_eq!(idx, vec![0u64], "Single pulse starts at event 0");
}
#[test]
fn test_empty_pulse_does_not_shift_time_origin() {
    // The all-gap first pulse writes nothing, so the first *written* pulse
    // (raw timestamp 2000) must become the time origin at t = 0.
    let tmp = NamedTempFile::new().unwrap();
    let out_path = tmp.path();
    let mut sink = SnsEventSink::create(out_path, make_test_options()).unwrap();
    sink.write_hits(0, &make_hit_batch(1000, &[256, 257], &[0, 0], &[100, 200]))
        .unwrap();
    sink.write_hits(0, &make_hit_batch(2000, &[5], &[3], &[300]))
        .unwrap();
    sink.finalize().unwrap();
    let written = File::open(out_path).unwrap();
    let bank = written.group("entry/bank100_events").unwrap();
    let etz: Vec<f64> = bank.dataset("event_time_zero").unwrap().read_raw().unwrap();
    assert_eq!(etz.len(), 1, "Only the non-empty pulse gets an entry");
    assert!(
        etz[0].abs() < 1e-12,
        "First written pulse must be time-origin zero, got {}",
        etz[0]
    );
}
#[test]
fn test_empty_pulse_does_not_shift_duration() {
    // The third pulse hits only a gap column and is dropped, so the recorded
    // duration must span only the two written pulses (ticks 1000..2000).
    let tmp = NamedTempFile::new().unwrap();
    let out_path = tmp.path();
    let mut sink = SnsEventSink::create(out_path, make_test_options()).unwrap();
    sink.write_hits(0, &make_hit_batch(1000, &[5], &[3], &[100]))
        .unwrap();
    sink.write_hits(0, &make_hit_batch(2000, &[5], &[3], &[200]))
        .unwrap();
    sink.write_hits(0, &make_hit_batch(5000, &[256], &[0], &[300]))
        .unwrap();
    sink.finalize().unwrap();
    let written = File::open(out_path).unwrap();
    let duration: f64 = written
        .group("entry")
        .unwrap()
        .dataset("duration")
        .unwrap()
        .read_scalar()
        .unwrap();
    let expected = f64::from(2000 - 1000) * 25e-9;
    assert!(
        (duration - expected).abs() < 1e-12,
        "Duration must reflect only written pulses: expected {expected}, got {duration}"
    );
}
#[test]
fn test_monotonic_rebased_timestamps_accepted() {
    // Raw timestamps restart low after the second pulse, but the caller has
    // already rebased them by an offset that keeps the stream monotonic.
    // Such pre-rebased streams must be accepted and all four pulses written
    // with strictly increasing event_time_zero values.
    let opts = make_test_options();
    let file = NamedTempFile::new().unwrap();
    let path = file.path();
    let mut sink = SnsEventSink::create(path, opts).unwrap();
    sink.write_hits(0, &make_hit_batch(1000, &[0], &[0], &[100]))
        .unwrap();
    sink.write_hits(0, &make_hit_batch(2000, &[1], &[1], &[200]))
        .unwrap();
    // Offset chosen to push the restarted timestamps (500, 1500) just past
    // the last accepted pulse (2000), preserving strict monotonicity.
    let tdc_offset: u64 = 2001;
    sink.write_hits(0, &make_hit_batch(500 + tdc_offset, &[2], &[2], &[300]))
        .unwrap();
    sink.write_hits(0, &make_hit_batch(1500 + tdc_offset, &[3], &[3], &[400]))
        .unwrap();
    sink.finalize().unwrap();
    let f = File::open(path).unwrap();
    let entry = f.group("entry").unwrap();
    let tp: u64 = entry
        .dataset("total_pulses")
        .unwrap()
        .read_scalar()
        .unwrap();
    assert_eq!(tp, 4);
    let tc: u64 = entry
        .dataset("total_counts")
        .unwrap()
        .read_scalar()
        .unwrap();
    assert_eq!(tc, 4);
    let etz: Vec<f64> = f
        .group("entry/bank100_events")
        .unwrap()
        .dataset("event_time_zero")
        .unwrap()
        .read_raw()
        .unwrap();
    assert_eq!(etz.len(), 4);
    for i in 1..etz.len() {
        assert!(
            etz[i] > etz[i - 1],
            "Pulse times must be monotonically increasing: etz[{}]={} <= etz[{}]={}",
            i,
            etz[i],
            i - 1,
            etz[i - 1]
        );
    }
}
#[test]
fn test_sequential_multi_bank_writes_accepted() {
    // Two banks each receive the same three-pulse sequence, one bank at a
    // time; global totals aggregate both banks while each bank records its
    // own three events and three pulse entries.
    let mut options = make_test_options();
    options.banks.push(SnsBankConfig {
        name: "bank200".to_string(),
        pixel_id_offset: 2_000_000,
        width: 512,
        height: 512,
        gap_columns: vec![256, 257],
        gap_rows: vec![256, 257],
    });
    let tmp = NamedTempFile::new().unwrap();
    let out_path = tmp.path();
    let mut sink = SnsEventSink::create(out_path, options).unwrap();
    for bank_index in 0..2 {
        sink.write_hits(bank_index, &make_hit_batch(1000, &[0], &[0], &[100]))
            .unwrap();
        sink.write_hits(bank_index, &make_hit_batch(2000, &[1], &[1], &[200]))
            .unwrap();
        sink.write_hits(bank_index, &make_hit_batch(3000, &[2], &[2], &[300]))
            .unwrap();
    }
    sink.finalize().unwrap();
    let written = File::open(out_path).unwrap();
    let entry = written.group("entry").unwrap();
    let tc: u64 = entry
        .dataset("total_counts")
        .unwrap()
        .read_scalar()
        .unwrap();
    assert_eq!(tc, 6);
    let tp: u64 = entry
        .dataset("total_pulses")
        .unwrap()
        .read_scalar()
        .unwrap();
    assert_eq!(tp, 3);
    for bank_name in ["bank100_events", "bank200_events"] {
        let bank = written.group(&format!("entry/{bank_name}")).unwrap();
        let ids: Vec<u32> = bank.dataset("event_id").unwrap().read_raw().unwrap();
        assert_eq!(ids.len(), 3);
        let etz: Vec<f64> = bank.dataset("event_time_zero").unwrap().read_raw().unwrap();
        assert_eq!(etz.len(), 3);
    }
}
#[test]
fn test_per_bank_non_monotonic_rejected() {
    // Monotonicity is enforced per bank: bank 1 going backwards must fail
    // even though bank 0 was never written to.
    let mut options = make_test_options();
    options.banks.push(SnsBankConfig {
        name: "bank200".to_string(),
        pixel_id_offset: 2_000_000,
        width: 512,
        height: 512,
        gap_columns: vec![256, 257],
        gap_rows: vec![256, 257],
    });
    let tmp = NamedTempFile::new().unwrap();
    let mut sink = SnsEventSink::create(tmp.path(), options).unwrap();
    sink.write_hits(1, &make_hit_batch(2000, &[0], &[0], &[100]))
        .unwrap();
    let outcome = sink.write_hits(1, &make_hit_batch(1000, &[1], &[1], &[200]));
    assert!(outcome.is_err());
    assert!(outcome.unwrap_err().to_string().contains("non-monotonic"));
}
#[test]
fn test_later_bank_with_earlier_timestamps() {
    // Bank 0 is written with raw timestamps 5000/6000 and bank 1 with
    // 1000/2000 afterwards. The assertions pin event_time_zero for both
    // banks against an origin at raw tick 1000, and a run duration that
    // spans ticks 1000..6000.
    let mut options = make_test_options();
    options.banks.push(SnsBankConfig {
        name: "bank200".to_string(),
        pixel_id_offset: 2_000_000,
        width: 512,
        height: 512,
        gap_columns: vec![256, 257],
        gap_rows: vec![256, 257],
    });
    let tmp = NamedTempFile::new().unwrap();
    let out_path = tmp.path();
    let mut sink = SnsEventSink::create(out_path, options).unwrap();
    sink.write_hits(0, &make_hit_batch(5000, &[0], &[0], &[100]))
        .unwrap();
    sink.write_hits(0, &make_hit_batch(6000, &[1], &[1], &[200]))
        .unwrap();
    sink.write_hits(1, &make_hit_batch(1000, &[0], &[0], &[100]))
        .unwrap();
    sink.write_hits(1, &make_hit_batch(2000, &[1], &[1], &[200]))
        .unwrap();
    sink.finalize().unwrap();
    let written = File::open(out_path).unwrap();
    let bank200 = written.group("entry/bank200_events").unwrap();
    let etz1: Vec<f64> = bank200
        .dataset("event_time_zero")
        .unwrap()
        .read_raw()
        .unwrap();
    assert_eq!(etz1.len(), 2);
    assert!(
        etz1[0].abs() < 1e-12,
        "Bank 1's first pulse should be at t=0, got {}",
        etz1[0]
    );
    let expected_1 = (2000.0 - 1000.0) * 25.0 / 1_000_000_000.0;
    assert!(
        (etz1[1] - expected_1).abs() < 1e-12,
        "Expected {expected_1}, got {}",
        etz1[1]
    );
    let bank100 = written.group("entry/bank100_events").unwrap();
    let etz0: Vec<f64> = bank100
        .dataset("event_time_zero")
        .unwrap()
        .read_raw()
        .unwrap();
    assert_eq!(etz0.len(), 2);
    let expected_0_first = (5000.0 - 1000.0) * 25.0 / 1_000_000_000.0;
    assert!(
        (etz0[0] - expected_0_first).abs() < 1e-12,
        "Expected {expected_0_first}, got {}",
        etz0[0]
    );
    let duration: f64 = written
        .group("entry")
        .unwrap()
        .dataset("duration")
        .unwrap()
        .read_scalar()
        .unwrap();
    let expected_dur = 5000.0 * 25.0 / 1_000_000_000.0;
    assert!(
        (duration - expected_dur).abs() < 1e-12,
        "Expected duration {expected_dur}, got {duration}"
    );
}
#[test]
fn test_write_hits_after_finalize_rejected() {
    // Once finalize() has run, further hit writes must be refused with an
    // error mentioning "finalization".
    let tmp = NamedTempFile::new().unwrap();
    let mut sink = SnsEventSink::create(tmp.path(), make_test_options()).unwrap();
    sink.write_hits(0, &make_hit_batch(1000, &[0], &[0], &[100]))
        .unwrap();
    sink.finalize().unwrap();
    let err = sink
        .write_hits(0, &make_hit_batch(2000, &[1], &[1], &[200]))
        .unwrap_err();
    assert!(
        err.to_string().contains("finalization"),
        "Expected finalization error, got: {err}"
    );
}
#[test]
fn test_write_neutrons_after_finalize_rejected() {
    // Neutron writes, like hit writes, must be refused after finalize()
    // with an error mentioning "finalization".
    let tmp = NamedTempFile::new().unwrap();
    let mut sink = SnsEventSink::create(tmp.path(), make_test_options()).unwrap();
    sink.write_neutrons(0, &make_neutron_batch(1000, &[0.5], &[0.5], &[100]))
        .unwrap();
    sink.finalize().unwrap();
    let err = sink
        .write_neutrons(0, &make_neutron_batch(2000, &[1.5], &[1.5], &[200]))
        .unwrap_err();
    assert!(
        err.to_string().contains("finalization"),
        "Expected finalization error, got: {err}"
    );
}
}