rustpix_io/out_of_core.rs

//! Out-of-core batching utilities for pulse-ordered TPX3 streams.
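//!
//! The pipeline reads pulse-ordered [`EventBatch`]es, splits oversized pulses
//! into overlapping [`PulseSlice`]s, and groups the slices into bounded
//! [`PulseBatchGroup`]s whose estimated footprint stays within a memory budget
//! resolved by [`OutOfCoreConfig`].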

use crate::reader::{EventBatch, TimeOrderedEventStream, Tpx3FileReader};
use crate::{Error, Result};
use rustpix_core::soa::HitBatch;
use std::collections::VecDeque;
use std::mem::size_of;
use sysinfo::System;

const MEMORY_OVERHEAD_FACTOR: f64 = 1.2;

/// Configuration for out-of-core batching.
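///
/// # Examples
///
/// A minimal builder-style sketch; the doctest assumes this module is exported
/// as `rustpix_io::out_of_core`:
///
/// ```
/// use rustpix_io::out_of_core::OutOfCoreConfig;
///
/// let config = OutOfCoreConfig::default()
///     .with_memory_budget_bytes(64 * 1024 * 1024) // explicit 64 MiB budget
///     .with_parallelism(4)
///     .with_queue_depth(3);
/// assert_eq!(config.effective_parallelism(), 4);
/// assert!(config.use_threaded_pipeline());
/// ```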
#[derive(Clone, Debug)]
pub struct OutOfCoreConfig {
    /// Fraction of available system memory to target (0.0 < fraction <= 1.0).
    pub memory_fraction: f64,
    /// Explicit memory budget override (bytes). If set, `memory_fraction` is ignored.
    pub memory_budget_bytes: Option<usize>,
    /// Optional number of worker threads for parallel slice processing.
    pub parallelism: Option<usize>,
    /// Bounded queue depth for pipeline stages.
    pub queue_depth: usize,
    /// Enable async pipeline stage execution.
    pub async_io: bool,
}

impl Default for OutOfCoreConfig {
    fn default() -> Self {
        Self {
            memory_fraction: 0.5,
            memory_budget_bytes: None,
            parallelism: None,
            queue_depth: 2,
            async_io: false,
        }
    }
}

impl OutOfCoreConfig {
    /// Set the fraction of available system memory to target.
    #[must_use]
    pub fn with_memory_fraction(mut self, fraction: f64) -> Self {
        self.memory_fraction = fraction;
        self
    }

    /// Set an explicit memory budget in bytes.
    #[must_use]
    pub fn with_memory_budget_bytes(mut self, bytes: usize) -> Self {
        self.memory_budget_bytes = Some(bytes);
        self
    }

    /// Set the number of worker threads for slice processing.
    ///
    /// A `threads` value of 0 is clamped to 1. Use [`Self::try_with_parallelism`]
    /// to surface invalid values as an error instead.
    #[must_use]
    pub fn with_parallelism(mut self, threads: usize) -> Self {
        self.parallelism = Some(threads.max(1));
        self
    }

    /// Set the bounded queue depth for pipeline stages.
    ///
    /// A `depth` value of 0 is clamped to 1. Use [`Self::try_with_queue_depth`]
    /// to surface invalid values as an error instead.
    #[must_use]
    pub fn with_queue_depth(mut self, depth: usize) -> Self {
        self.queue_depth = depth.max(1);
        self
    }

    /// Enable or disable async pipeline stage execution.
    #[must_use]
    pub fn with_async_io(mut self, enabled: bool) -> Self {
        self.async_io = enabled;
        self
    }

    /// Fallible variant of [`Self::with_parallelism`].
    ///
    /// # Errors
    /// Returns an error if `threads` is 0.
    pub fn try_with_parallelism(mut self, threads: usize) -> Result<Self> {
        if threads == 0 {
            return Err(Error::InvalidFormat(
                "parallelism must be at least 1".to_string(),
            ));
        }
        self.parallelism = Some(threads);
        Ok(self)
    }

    /// Fallible variant of [`Self::with_queue_depth`].
    ///
    /// # Errors
    /// Returns an error if `depth` is 0.
    pub fn try_with_queue_depth(mut self, depth: usize) -> Result<Self> {
        if depth == 0 {
            return Err(Error::InvalidFormat(
                "queue_depth must be at least 1".to_string(),
            ));
        }
        self.queue_depth = depth;
        Ok(self)
    }

    /// Return the configured worker thread count, clamped to at least 1.
    #[must_use]
    pub fn effective_parallelism(&self) -> usize {
        self.parallelism.unwrap_or(1).max(1)
    }

    /// Return the configured queue depth, clamped to at least 1.
    #[must_use]
    pub fn effective_queue_depth(&self) -> usize {
        self.queue_depth.max(1)
    }

    /// Returns true when the threaded pipeline should be used.
    #[must_use]
    pub fn use_threaded_pipeline(&self) -> bool {
        self.async_io || self.parallelism.unwrap_or(1) > 1
    }

    /// Resolve the target memory budget in bytes.
    ///
    /// # Errors
    /// Returns an error if the memory fraction is invalid or system memory cannot be queried.
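    ///
    /// # Examples
    ///
    /// A short sketch of the two resolution paths (module path assumed to be
    /// `rustpix_io::out_of_core`):
    ///
    /// ```
    /// use rustpix_io::out_of_core::OutOfCoreConfig;
    ///
    /// // An explicit budget takes precedence over `memory_fraction`.
    /// let explicit = OutOfCoreConfig::default().with_memory_budget_bytes(1 << 30);
    /// assert_eq!(explicit.resolve_budget_bytes().unwrap(), 1 << 30);
    ///
    /// // A fractional budget queries available system memory at call time.
    /// let fractional = OutOfCoreConfig::default().with_memory_fraction(0.25);
    /// assert!(fractional.resolve_budget_bytes().unwrap() > 0);
    /// ```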
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_precision_loss,
        clippy::cast_sign_loss
    )]
    pub fn resolve_budget_bytes(&self) -> Result<usize> {
        if let Some(bytes) = self.memory_budget_bytes {
            return Ok(bytes);
        }
        if !(0.0 < self.memory_fraction && self.memory_fraction <= 1.0) {
            return Err(Error::InvalidFormat(
                "memory_fraction must be in (0.0, 1.0]".to_string(),
            ));
        }
        let mut system = System::new();
        system.refresh_memory();
        let available = system.available_memory();
        if available == 0 {
            return Err(Error::InvalidFormat(
                "available system memory reported as 0".to_string(),
            ));
        }
        let budget = (available as f64 * self.memory_fraction).floor() as u64;
        Ok(usize::try_from(budget).unwrap_or(usize::MAX))
    }
}

/// A pulse slice with an emission cutoff for overlap handling.
#[derive(Clone, Debug)]
pub struct PulseSlice {
    /// Pulse TDC timestamp (25ns ticks).
    pub tdc_timestamp_25ns: u64,
    /// Hits belonging to this slice.
    pub hits: HitBatch,
    /// Emit only clusters/neutrons whose representative TOF is <= this cutoff;
    /// hits past the cutoff are overlap context owned by the next slice.
    pub emit_cutoff_tof: u32,
}

impl PulseSlice {
    /// Number of hits in the slice.
    #[must_use]
    pub fn len(&self) -> usize {
        self.hits.len()
    }

    /// Returns true when the slice has no hits.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.hits.is_empty()
    }
}

/// A bounded batch of pulse slices.
#[derive(Clone, Debug, Default)]
pub struct PulseBatchGroup {
    /// Pulse slices in this batch.
    pub slices: Vec<PulseSlice>,
    /// Estimated memory footprint (bytes).
    pub estimated_bytes: usize,
}

impl PulseBatchGroup {
    /// Number of slices in the group.
    #[must_use]
    pub fn len(&self) -> usize {
        self.slices.len()
    }

    /// Returns true when the group has no slices.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.slices.is_empty()
    }

    /// Total number of hits across all slices.
    #[must_use]
    pub fn total_hits(&self) -> usize {
        self.slices.iter().map(PulseSlice::len).sum()
    }
}

/// Batcher that groups pulse slices into bounded-memory batches.
pub struct PulseBatcher<I>
where
    I: Iterator<Item = EventBatch>,
{
    source: I,
    queue: VecDeque<PulseSlice>,
    max_hits: usize,
    overlap_tof: u32,
    bytes_per_hit: usize,
}

impl<I> PulseBatcher<I>
where
    I: Iterator<Item = EventBatch>,
{
    /// Create a new batcher from a pulse-ordered event stream.
    ///
    /// `overlap_tof` is in 25ns ticks and is used only when a single pulse must
    /// be split to respect the memory budget.
    ///
    /// # Errors
    /// Returns an error if the memory budget cannot be resolved.
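    ///
    /// # Examples
    ///
    /// A minimal sketch with an empty source; this assumes the `reader` module
    /// and this module are publicly exported from the crate:
    ///
    /// ```
    /// use rustpix_io::out_of_core::{OutOfCoreConfig, PulseBatcher};
    /// use rustpix_io::reader::EventBatch;
    ///
    /// let config = OutOfCoreConfig::default().with_memory_budget_bytes(1 << 20);
    /// let source = std::iter::empty::<EventBatch>();
    /// // overlap_tof = 0: no overlap context between split slices.
    /// let batcher = PulseBatcher::new(source, &config, 0).unwrap();
    /// assert_eq!(batcher.count(), 0);
    /// ```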
    pub fn new(source: I, config: &OutOfCoreConfig, overlap_tof: u32) -> Result<Self> {
        let bytes_per_hit = bytes_per_hit();
        let budget = config.resolve_budget_bytes()?;
        let max_hits = max_hits_for_budget(budget, bytes_per_hit);
        Ok(Self {
            source,
            queue: VecDeque::new(),
            max_hits,
            overlap_tof,
            bytes_per_hit,
        })
    }

    fn next_group(&mut self) -> Option<PulseBatchGroup> {
        let mut group = PulseBatchGroup::default();
        let mut group_hits = 0usize;

        loop {
            // Drain queued slices while they fit the hit budget; an oversized
            // slice is still taken alone so the batcher always makes progress.
            while let Some(slice) = self.queue.front() {
                let slice_hits = slice.len();
                if group_hits == 0 || group_hits.saturating_add(slice_hits) <= self.max_hits {
                    let slice = self.queue.pop_front().expect("queue not empty");
                    group_hits = group_hits.saturating_add(slice_hits);
                    group.slices.push(slice);
                } else {
                    break;
                }
            }

            if !group.is_empty() {
                group.estimated_bytes = estimate_batch_bytes(group_hits, self.bytes_per_hit);
                return Some(group);
            }

            // Queue exhausted with nothing grouped yet: pull the next pulse
            // and split it into budget-sized slices.
            let next = self.source.next()?;
            let slices = split_pulse_with_overlap(next, self.max_hits, self.overlap_tof);
            for slice in slices {
                self.queue.push_back(slice);
            }
        }
    }
}

impl<I> Iterator for PulseBatcher<I>
where
    I: Iterator<Item = EventBatch>,
{
    type Item = PulseBatchGroup;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_group()
    }
}

/// Convenience constructor for a reader-backed batcher.
///
/// # Errors
/// Returns an error if the event stream cannot be opened or the memory budget
/// cannot be resolved.
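///
/// # Examples
///
/// A sketch of end-to-end iteration; the `Tpx3FileReader::open` constructor
/// shown here is hypothetical, so the snippet is not compiled:
///
/// ```ignore
/// let reader = Tpx3FileReader::open("run_0001.tpx3")?; // hypothetical constructor
/// let config = OutOfCoreConfig::default().with_memory_fraction(0.25);
/// for group in pulse_batches(&reader, &config, 0)? {
///     println!("{} slices, ~{} bytes", group.len(), group.estimated_bytes);
/// }
/// ```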
pub fn pulse_batches(
    reader: &Tpx3FileReader,
    config: &OutOfCoreConfig,
    overlap_tof: u32,
) -> Result<PulseBatcher<TimeOrderedEventStream>> {
    let stream = reader.stream_time_ordered_events()?;
    PulseBatcher::new(stream, config, overlap_tof)
}

fn bytes_per_hit() -> usize {
    // Per-hit SoA footprint: x + y (u16 each), tof + timestamp (u32 each),
    // tot (u16), chip_id (u8), and cluster_id (i32) = 19 bytes.
    size_of::<u16>() * 2
        + size_of::<u32>() * 2
        + size_of::<u16>()
        + size_of::<u8>()
        + size_of::<i32>()
}

#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_precision_loss,
    clippy::cast_sign_loss
)]
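// Translate a byte budget into a hit-count cap, padding each hit by
// MEMORY_OVERHEAD_FACTOR to leave room for allocator and bookkeeping overhead.
// Worked example: 19 bytes/hit * 1.2 = 22.8, rounded up to 23 bytes/hit, so a
// 76-byte budget allows 76 / 23 = 3 hits (and never fewer than 1).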
fn max_hits_for_budget(budget_bytes: usize, bytes_per_hit: usize) -> usize {
    let per_hit = (bytes_per_hit as f64 * MEMORY_OVERHEAD_FACTOR).ceil() as usize;
    let per_hit = per_hit.max(1);
    (budget_bytes / per_hit).max(1)
}

#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_precision_loss,
    clippy::cast_sign_loss
)]
fn estimate_batch_bytes(hit_count: usize, bytes_per_hit: usize) -> usize {
    let per_hit = bytes_per_hit as f64 * MEMORY_OVERHEAD_FACTOR;
    (hit_count as f64 * per_hit).ceil() as usize
}

fn split_pulse_with_overlap(
    batch: EventBatch,
    max_hits: usize,
    overlap_tof: u32,
) -> Vec<PulseSlice> {
    let hits = batch.hits;
    let total = hits.len();
    if total == 0 {
        return Vec::new();
    }

    // Fast path: the whole pulse fits in one slice, so every hit is emittable.
    if total <= max_hits {
        let cutoff = *hits.tof.last().unwrap_or(&0);
        return vec![PulseSlice {
            tdc_timestamp_25ns: batch.tdc_timestamp_25ns,
            hits,
            emit_cutoff_tof: cutoff,
        }];
    }

    let mut slices = Vec::new();
    let mut start = 0usize;
    while start < total {
        let mut end = (start + max_hits).min(total);
        if end == start {
            // Defensive: guarantee forward progress even if max_hits is 0.
            end = (start + 1).min(total);
        }

        // Never split a run of identical TOF values across slices; extend the
        // cut so the cutoff TOF is fully contained in this slice.
        let cutoff_tof = hits.tof[end - 1];
        while end < total && hits.tof[end] == cutoff_tof {
            end += 1;
        }

        // Carry overlap hits (TOF within `overlap_tof` past the cutoff) so
        // processing near the slice boundary sees its neighbours; only results
        // with TOF <= emit_cutoff_tof are emitted from this slice.
        let mut overlap_end = end;
        if overlap_tof > 0 {
            let overlap_limit = cutoff_tof.saturating_add(overlap_tof);
            while overlap_end < total && hits.tof[overlap_end] <= overlap_limit {
                overlap_end += 1;
            }
        }

        let slice = slice_hits(&hits, start, overlap_end);
        slices.push(PulseSlice {
            tdc_timestamp_25ns: batch.tdc_timestamp_25ns,
            hits: slice,
            emit_cutoff_tof: cutoff_tof,
        });

        // The next slice starts after the non-overlap region.
        start = end;
    }

    slices
}

fn slice_hits(batch: &HitBatch, start: usize, end: usize) -> HitBatch {
    let len = end.saturating_sub(start);
    let mut sliced = HitBatch::with_capacity(len);
    sliced.x.extend_from_slice(&batch.x[start..end]);
    sliced.y.extend_from_slice(&batch.y[start..end]);
    sliced.tof.extend_from_slice(&batch.tof[start..end]);
    sliced.tot.extend_from_slice(&batch.tot[start..end]);
    sliced
        .timestamp
        .extend_from_slice(&batch.timestamp[start..end]);
    sliced.chip_id.extend_from_slice(&batch.chip_id[start..end]);
    sliced
        .cluster_id
        .extend_from_slice(&batch.cluster_id[start..end]);
    sliced
}

#[cfg(test)]
mod tests {
    use super::*;

    fn make_hit_batch(tofs: &[u32]) -> HitBatch {
        let mut batch = HitBatch::with_capacity(tofs.len());
        for (i, &tof) in tofs.iter().enumerate() {
            let x = u16::try_from(i % 512).unwrap_or(0);
            let y = u16::try_from(i / 512).unwrap_or(0);
            // (x, y, tof, tot, timestamp, chip_id): the TOF doubles as the
            // timestamp; tot and chip_id stay 0.
            batch.push((x, y, tof, 0, tof, 0));
        }
        batch
    }

    #[test]
    fn split_pulse_with_overlap_keeps_order() {
        let tofs: Vec<u32> = (0..10).collect();
        let event = EventBatch {
            tdc_timestamp_25ns: 0,
            hits: make_hit_batch(&tofs),
        };
        // max_hits = 4 with overlap_tof = 1: each full slice carries one extra
        // overlap hit past its emission cutoff.
        let slices = split_pulse_with_overlap(event, 4, 1);
        assert_eq!(slices.len(), 3);
        assert_eq!(slices[0].hits.tof, vec![0, 1, 2, 3, 4]);
        assert_eq!(slices[0].emit_cutoff_tof, 3);
        assert_eq!(slices[1].hits.tof, vec![4, 5, 6, 7, 8]);
        assert_eq!(slices[1].emit_cutoff_tof, 7);
        assert_eq!(slices[2].hits.tof, vec![8, 9]);
        assert_eq!(slices[2].emit_cutoff_tof, 9);
    }

    #[test]
    fn batcher_groups_pulses_under_budget() {
        // Budget for 4 raw hits; the 1.2x overhead factor caps groups at 3 hits.
        let bytes = bytes_per_hit() * 4;
        let config = OutOfCoreConfig::default().with_memory_budget_bytes(bytes);
        let overlap_tof = 0;

        let pulses = vec![
            EventBatch {
                tdc_timestamp_25ns: 0,
                hits: make_hit_batch(&[0, 1]),
            },
            EventBatch {
                tdc_timestamp_25ns: 1,
                hits: make_hit_batch(&[0, 1]),
            },
            EventBatch {
                tdc_timestamp_25ns: 2,
                hits: make_hit_batch(&[0, 1]),
            },
        ];

        let mut batcher = PulseBatcher::new(pulses.into_iter(), &config, overlap_tof).unwrap();
        let max_hits = max_hits_for_budget(bytes, bytes_per_hit());
        let mut total_hits = 0usize;
        let mut batch_count = 0usize;
        for batch in &mut batcher {
            assert!(batch.total_hits() <= max_hits);
            total_hits += batch.total_hits();
            batch_count += 1;
        }

        assert_eq!(total_hits, 6);
        assert!(batch_count >= 2);
    }
}