use crate::reader::{EventBatch, TimeOrderedEventStream, Tpx3FileReader};
use crate::{Error, Result};
use rustpix_core::soa::HitBatch;
use std::collections::VecDeque;
use std::mem::size_of;
use sysinfo::System;

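/// Safety margin applied on top of the raw per-hit size when translating a
/// memory budget into a maximum hit count.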
const MEMORY_OVERHEAD_FACTOR: f64 = 1.2;

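/// Configuration for out-of-core batching: how much memory a batch of pulses
/// may occupy and how the reader pipeline is parallelised.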
#[derive(Clone, Debug)]
pub struct OutOfCoreConfig {
    /// Fraction of currently available system memory to use when no explicit
    /// budget is set; must lie in `(0.0, 1.0]`.
    pub memory_fraction: f64,
    /// Explicit memory budget in bytes; overrides `memory_fraction` when set.
    pub memory_budget_bytes: Option<usize>,
    /// Worker thread count; `None` or `Some(1)` selects the single-threaded path.
    pub parallelism: Option<usize>,
    /// Queue depth used by the threaded pipeline; values below 1 are treated as 1.
    pub queue_depth: usize,
    /// Request asynchronous I/O; also forces the threaded pipeline on.
    pub async_io: bool,
}

impl Default for OutOfCoreConfig {
    fn default() -> Self {
        Self {
            memory_fraction: 0.5,
            memory_budget_bytes: None,
            parallelism: None,
            queue_depth: 2,
            async_io: false,
        }
    }
}

impl OutOfCoreConfig {
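    /// Sets the fraction of available system memory to use when no explicit budget is given.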
    #[must_use]
    pub fn with_memory_fraction(mut self, fraction: f64) -> Self {
        self.memory_fraction = fraction;
        self
    }

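    /// Sets an explicit memory budget in bytes, taking precedence over `memory_fraction`.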
    #[must_use]
    pub fn with_memory_budget_bytes(mut self, bytes: usize) -> Self {
        self.memory_budget_bytes = Some(bytes);
        self
    }

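    /// Sets the worker thread count; values below 1 are clamped to 1.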
    #[must_use]
    pub fn with_parallelism(mut self, threads: usize) -> Self {
        self.parallelism = Some(threads.max(1));
        self
    }

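    /// Sets the queue depth for the threaded pipeline; values below 1 are clamped to 1.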
    #[must_use]
    pub fn with_queue_depth(mut self, depth: usize) -> Self {
        self.queue_depth = depth.max(1);
        self
    }

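    /// Sets the `async_io` flag; enabling it forces the threaded pipeline on.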
    #[must_use]
    pub fn with_async_io(mut self, enabled: bool) -> Self {
        self.async_io = enabled;
        self
    }

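    /// Sets the worker thread count, rejecting zero instead of clamping.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidFormat` if `threads` is 0.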
    pub fn try_with_parallelism(mut self, threads: usize) -> Result<Self> {
        if threads == 0 {
            return Err(Error::InvalidFormat(
                "parallelism must be at least 1".to_string(),
            ));
        }
        self.parallelism = Some(threads);
        Ok(self)
    }

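    /// Sets the queue depth, rejecting zero instead of clamping.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidFormat` if `depth` is 0.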
    pub fn try_with_queue_depth(mut self, depth: usize) -> Result<Self> {
        if depth == 0 {
            return Err(Error::InvalidFormat(
                "queue_depth must be at least 1".to_string(),
            ));
        }
        self.queue_depth = depth;
        Ok(self)
    }

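    /// Returns the configured worker thread count, defaulting to 1.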
    #[must_use]
    pub fn effective_parallelism(&self) -> usize {
        self.parallelism.unwrap_or(1).max(1)
    }

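    /// Returns the configured queue depth, never less than 1.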
    #[must_use]
    pub fn effective_queue_depth(&self) -> usize {
        self.queue_depth.max(1)
    }

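    /// Returns `true` when async I/O is requested or more than one worker thread is configured.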
    #[must_use]
    pub fn use_threaded_pipeline(&self) -> bool {
        self.async_io || self.parallelism.unwrap_or(1) > 1
    }

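    /// Resolves the memory budget in bytes: an explicit budget wins, otherwise
    /// `memory_fraction` of the currently available system memory is used.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidFormat` if `memory_fraction` is outside `(0.0, 1.0]`
    /// or the system reports no available memory.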
    #[allow(
        clippy::cast_possible_truncation,
        clippy::cast_precision_loss,
        clippy::cast_sign_loss
    )]
    pub fn resolve_budget_bytes(&self) -> Result<usize> {
        if let Some(bytes) = self.memory_budget_bytes {
            return Ok(bytes);
        }
        if !(0.0 < self.memory_fraction && self.memory_fraction <= 1.0) {
            return Err(Error::InvalidFormat(
                "memory_fraction must be in (0.0, 1.0]".to_string(),
            ));
        }
        let mut system = System::new();
        system.refresh_memory();
        let available = system.available_memory();
        if available == 0 {
            return Err(Error::InvalidFormat(
                "available system memory reported as 0".to_string(),
            ));
        }
        let budget = (available as f64 * self.memory_fraction).floor() as u64;
        Ok(usize::try_from(budget).unwrap_or(usize::MAX))
    }
}

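/// A contiguous, ToF-ordered run of hits from a single TDC pulse; large pulses
/// are split into several overlapping slices.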
#[derive(Clone, Debug)]
pub struct PulseSlice {
    /// TDC timestamp of the originating pulse, in 25 ns units.
    pub tdc_timestamp_25ns: u64,
    /// Hits in this slice, ordered by ToF.
    pub hits: HitBatch,
    /// Largest ToF this slice is responsible for emitting; any later hits it
    /// carries are overlap that the next slice also contains.
    pub emit_cutoff_tof: u32,
}

impl PulseSlice {
    #[must_use]
    pub fn len(&self) -> usize {
        self.hits.len()
    }

    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.hits.is_empty()
    }
}

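/// A group of pulse slices whose combined hit count fits the memory budget.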
#[derive(Clone, Debug, Default)]
pub struct PulseBatchGroup {
    /// Slices in the group, in the order they were read.
    pub slices: Vec<PulseSlice>,
    /// Estimated memory footprint of the group in bytes, including overhead.
    pub estimated_bytes: usize,
}

impl PulseBatchGroup {
    #[must_use]
    pub fn len(&self) -> usize {
        self.slices.len()
    }

    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.slices.is_empty()
    }

    #[must_use]
    pub fn total_hits(&self) -> usize {
        self.slices.iter().map(PulseSlice::len).sum()
    }
}

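/// Packs time-ordered pulses into memory-bounded groups, splitting pulses that
/// exceed the per-group hit limit into overlapping slices.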
pub struct PulseBatcher<I>
where
    I: Iterator<Item = EventBatch>,
{
    /// Upstream iterator of pulses (event batches).
    source: I,
    /// Slices waiting to be packed into the next group.
    queue: VecDeque<PulseSlice>,
    /// Maximum number of hits allowed in one group, derived from the budget.
    max_hits: usize,
    /// ToF window duplicated across slice boundaries.
    overlap_tof: u32,
    /// Estimated size of a single hit in bytes.
    bytes_per_hit: usize,
}

impl<I> PulseBatcher<I>
where
    I: Iterator<Item = EventBatch>,
{
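    /// Creates a batcher whose group size is derived from `config`'s resolved
    /// memory budget.
    ///
    /// # Errors
    ///
    /// Returns an error if the memory budget cannot be resolved
    /// (see `OutOfCoreConfig::resolve_budget_bytes`).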
    pub fn new(source: I, config: &OutOfCoreConfig, overlap_tof: u32) -> Result<Self> {
        let bytes_per_hit = bytes_per_hit();
        let budget = config.resolve_budget_bytes()?;
        let max_hits = max_hits_for_budget(budget, bytes_per_hit);
        Ok(Self {
            source,
            queue: VecDeque::new(),
            max_hits,
            overlap_tof,
            bytes_per_hit,
        })
    }

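    /// Greedily fills a group from queued slices without exceeding `max_hits`,
    /// pulling and splitting further pulses from the source as needed.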
    fn next_group(&mut self) -> Option<PulseBatchGroup> {
        let mut group = PulseBatchGroup::default();
        let mut group_hits = 0usize;

        loop {
            // Take whole slices from the front of the queue while they fit.
            // The first slice is always accepted so oversized slices still make progress.
            while let Some(slice) = self.queue.front() {
                let slice_hits = slice.len();
                if group_hits == 0 || group_hits.saturating_add(slice_hits) <= self.max_hits {
                    let slice = self.queue.pop_front().expect("queue not empty");
                    group_hits = group_hits.saturating_add(slice_hits);
                    group.slices.push(slice);
                } else {
                    break;
                }
            }

            if !group.is_empty() {
                group.estimated_bytes = estimate_batch_bytes(group_hits, self.bytes_per_hit);
                return Some(group);
            }

            // Queue exhausted without filling a group: fetch the next pulse,
            // split it to respect `max_hits`, and try again.
            let next = self.source.next()?;
            let slices = split_pulse_with_overlap(next, self.max_hits, self.overlap_tof);
            for slice in slices {
                self.queue.push_back(slice);
            }
        }
    }
}

impl<I> Iterator for PulseBatcher<I>
where
    I: Iterator<Item = EventBatch>,
{
    type Item = PulseBatchGroup;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_group()
    }
}

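/// Streams time-ordered events from `reader` and wraps them in a `PulseBatcher`
/// sized according to `config`.
///
/// # Errors
///
/// Returns an error if the event stream cannot be opened or the memory budget
/// cannot be resolved.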
pub fn pulse_batches(
    reader: &Tpx3FileReader,
    config: &OutOfCoreConfig,
    overlap_tof: u32,
) -> Result<PulseBatcher<TimeOrderedEventStream>> {
    let stream = reader.stream_time_ordered_events()?;
    PulseBatcher::new(stream, config, overlap_tof)
}

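/// Size in bytes of a single hit, summed over the element sizes of the
/// `HitBatch` columns.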
fn bytes_per_hit() -> usize {
    size_of::<u16>() * 2
        + size_of::<u32>() * 2
        + size_of::<u16>()
        + size_of::<u8>()
        + size_of::<i32>()
}

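/// Converts a byte budget into a maximum hit count, padding each hit by
/// `MEMORY_OVERHEAD_FACTOR` and never returning less than 1.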
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_precision_loss,
    clippy::cast_sign_loss
)]
fn max_hits_for_budget(budget_bytes: usize, bytes_per_hit: usize) -> usize {
    let per_hit = (bytes_per_hit as f64 * MEMORY_OVERHEAD_FACTOR).ceil() as usize;
    let per_hit = per_hit.max(1);
    (budget_bytes / per_hit).max(1)
}

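/// Estimates the in-memory size of `hit_count` hits, including the overhead factor.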
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_precision_loss,
    clippy::cast_sign_loss
)]
fn estimate_batch_bytes(hit_count: usize, bytes_per_hit: usize) -> usize {
    let per_hit = bytes_per_hit as f64 * MEMORY_OVERHEAD_FACTOR;
    (hit_count as f64 * per_hit).ceil() as usize
}

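/// Splits one pulse into slices of at most `max_hits` hits; hits sharing the
/// boundary ToF stay in the same slice, and each slice additionally carries the
/// hits whose ToF falls within `overlap_tof` of its cutoff. `emit_cutoff_tof`
/// records the last ToF each slice is responsible for emitting.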
fn split_pulse_with_overlap(
    batch: EventBatch,
    max_hits: usize,
    overlap_tof: u32,
) -> Vec<PulseSlice> {
    let hits = batch.hits;
    let total = hits.len();
    if total == 0 {
        return Vec::new();
    }

    // Small pulses pass through as a single slice.
    if total <= max_hits {
        let cutoff = *hits.tof.last().unwrap_or(&0);
        return vec![PulseSlice {
            tdc_timestamp_25ns: batch.tdc_timestamp_25ns,
            hits,
            emit_cutoff_tof: cutoff,
        }];
    }

    let mut slices = Vec::new();
    let mut start = 0usize;
    while start < total {
        let mut end = (start + max_hits).min(total);
        if end == start {
            end = (start + 1).min(total);
        }

        // Never split hits that share the boundary ToF across two slices.
        let cutoff_tof = hits.tof[end - 1];
        while end < total && hits.tof[end] == cutoff_tof {
            end += 1;
        }

        // Extend the slice with overlap hits so the next slice's leading
        // neighbourhood is also present here; the next slice still starts at `end`.
        let mut overlap_end = end;
        if overlap_tof > 0 {
            let overlap_limit = cutoff_tof.saturating_add(overlap_tof);
            while overlap_end < total && hits.tof[overlap_end] <= overlap_limit {
                overlap_end += 1;
            }
        }

        let slice = slice_hits(&hits, start, overlap_end);
        slices.push(PulseSlice {
            tdc_timestamp_25ns: batch.tdc_timestamp_25ns,
            hits: slice,
            emit_cutoff_tof: cutoff_tof,
        });

        start = end;
    }

    slices
}

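/// Copies the `[start, end)` range of every `HitBatch` column into a new batch.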
fn slice_hits(batch: &HitBatch, start: usize, end: usize) -> HitBatch {
    let len = end.saturating_sub(start);
    let mut sliced = HitBatch::with_capacity(len);
    sliced.x.extend_from_slice(&batch.x[start..end]);
    sliced.y.extend_from_slice(&batch.y[start..end]);
    sliced.tof.extend_from_slice(&batch.tof[start..end]);
    sliced.tot.extend_from_slice(&batch.tot[start..end]);
    sliced
        .timestamp
        .extend_from_slice(&batch.timestamp[start..end]);
    sliced.chip_id.extend_from_slice(&batch.chip_id[start..end]);
    sliced
        .cluster_id
        .extend_from_slice(&batch.cluster_id[start..end]);
    sliced
}

#[cfg(test)]
mod tests {
    use super::*;

    fn make_hit_batch(tofs: &[u32]) -> HitBatch {
        let mut batch = HitBatch::with_capacity(tofs.len());
        for (i, &tof) in tofs.iter().enumerate() {
            let x = u16::try_from(i % 512).unwrap_or(0);
            let y = u16::try_from(i / 512).unwrap_or(0);
            batch.push((x, y, tof, 0, tof, 0));
        }
        batch
    }

    #[test]
    fn split_pulse_with_overlap_keeps_order() {
        let tofs: Vec<u32> = (0..10).collect();
        let event = EventBatch {
            tdc_timestamp_25ns: 0,
            hits: make_hit_batch(&tofs),
        };
        let slices = split_pulse_with_overlap(event, 4, 1);
        assert_eq!(slices.len(), 3);
        assert_eq!(slices[0].hits.tof, vec![0, 1, 2, 3, 4]);
        assert_eq!(slices[0].emit_cutoff_tof, 3);
        assert_eq!(slices[1].hits.tof, vec![4, 5, 6, 7, 8]);
        assert_eq!(slices[1].emit_cutoff_tof, 7);
        assert_eq!(slices[2].hits.tof, vec![8, 9]);
        assert_eq!(slices[2].emit_cutoff_tof, 9);
    }

    #[test]
    fn batcher_groups_pulses_under_budget() {
        let bytes = bytes_per_hit() * 4;
        let config = OutOfCoreConfig::default().with_memory_budget_bytes(bytes);
        let overlap_tof = 0;

        let pulses = vec![
            EventBatch {
                tdc_timestamp_25ns: 0,
                hits: make_hit_batch(&[0, 1]),
            },
            EventBatch {
                tdc_timestamp_25ns: 1,
                hits: make_hit_batch(&[0, 1]),
            },
            EventBatch {
                tdc_timestamp_25ns: 2,
                hits: make_hit_batch(&[0, 1]),
            },
        ];

        let mut batcher = PulseBatcher::new(pulses.into_iter(), &config, overlap_tof).unwrap();
        let max_hits = max_hits_for_budget(bytes, bytes_per_hit());
        let mut total_hits = 0usize;
        let mut batch_count = 0usize;
        for batch in &mut batcher {
            assert!(batch.total_hits() <= max_hits);
            total_hits += batch.total_hits();
            batch_count += 1;
        }

        assert_eq!(total_hits, 6);
        assert!(batch_count >= 2);
    }
}