1use crate::out_of_core::{
4 pulse_batches, OutOfCoreConfig, PulseBatchGroup, PulseBatcher, PulseSlice,
5};
6use crate::reader::{TimeOrderedEventStream, Tpx3FileReader};
7use crate::{Error, Result};
8use rayon::prelude::*;
9use rustpix_algorithms::{cluster_and_extract_batch, AlgorithmParams, ClusteringAlgorithm};
10use rustpix_core::clustering::ClusteringConfig;
11use rustpix_core::extraction::ExtractionConfig;
12use rustpix_core::neutron::NeutronBatch;
13use rustpix_core::soa::HitBatch;
14use std::collections::VecDeque;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{mpsc, Arc};
17use std::thread;
18use std::time::Duration;
19
20#[derive(Clone, Debug)]
22pub struct PulseNeutronBatch {
23 pub tdc_timestamp_25ns: u64,
25 pub hits_processed: usize,
27 pub neutrons: NeutronBatch,
29}
30
31struct SliceOutput {
32 tdc_timestamp_25ns: u64,
33 hits_processed: usize,
34 neutrons: NeutronBatch,
35}
36
37pub enum OutOfCoreNeutronStreamHandle {
39 Single(Box<OutOfCoreNeutronStream<PulseBatcher<TimeOrderedEventStream>>>),
41 Threaded(ThreadedOutOfCoreNeutronStream),
43}
44
45impl Iterator for OutOfCoreNeutronStreamHandle {
46 type Item = Result<PulseNeutronBatch>;
47
48 fn next(&mut self) -> Option<Self::Item> {
49 match self {
50 Self::Single(stream) => stream.next(),
51 Self::Threaded(stream) => stream.next(),
52 }
53 }
54}
55
56pub struct ThreadedOutOfCoreNeutronStream {
61 receiver: mpsc::Receiver<Result<PulseNeutronBatch>>,
63 handles: Vec<thread::JoinHandle<()>>,
65 cancel: Arc<AtomicBool>,
67}
68
69impl Iterator for ThreadedOutOfCoreNeutronStream {
70 type Item = Result<PulseNeutronBatch>;
71
72 fn next(&mut self) -> Option<Self::Item> {
73 self.receiver.recv().ok()
74 }
75}
76
77impl Drop for ThreadedOutOfCoreNeutronStream {
78 fn drop(&mut self) {
79 self.cancel.store(true, Ordering::Relaxed);
80 for handle in self.handles.drain(..) {
81 let _ = handle.join();
82 }
83 }
84}
85
86pub struct OutOfCoreNeutronStream<I>
88where
89 I: Iterator<Item = PulseBatchGroup>,
90{
91 batches: I,
93 slices: VecDeque<PulseSlice>,
95 algorithm: ClusteringAlgorithm,
97 clustering: ClusteringConfig,
99 extraction: ExtractionConfig,
101 params: AlgorithmParams,
103 current_tdc: Option<u64>,
105 current_neutrons: NeutronBatch,
107 current_hits: usize,
109 pending: VecDeque<PulseNeutronBatch>,
111 finished: bool,
113}
114
115impl<I> OutOfCoreNeutronStream<I>
116where
117 I: Iterator<Item = PulseBatchGroup>,
118{
119 #[must_use]
121 pub fn new(
122 batches: I,
123 algorithm: ClusteringAlgorithm,
124 clustering: ClusteringConfig,
125 extraction: ExtractionConfig,
126 params: AlgorithmParams,
127 ) -> Self {
128 Self {
129 batches,
130 slices: VecDeque::new(),
131 algorithm,
132 clustering,
133 extraction,
134 params,
135 current_tdc: None,
136 current_neutrons: NeutronBatch::default(),
137 current_hits: 0,
138 pending: VecDeque::new(),
139 finished: false,
140 }
141 }
142
143 fn next_slice(&mut self) -> Option<PulseSlice> {
144 if let Some(slice) = self.slices.pop_front() {
145 return Some(slice);
146 }
147
148 let group = self.batches.next()?;
149 self.slices.extend(group.slices);
150 self.slices.pop_front()
151 }
152
153 fn process_slice(&mut self, slice: PulseSlice) -> Result<()> {
154 let mut hits = slice.hits;
155 let mut neutrons = cluster_and_extract_batch(
156 &mut hits,
157 self.algorithm,
158 &self.clustering,
159 &self.extraction,
160 &self.params,
161 )
162 .map_err(Error::CoreError)?;
163
164 let emitted_hits = count_emitted_hits(&hits, slice.emit_cutoff_tof);
165 if slice.emit_cutoff_tof != u32::MAX {
166 neutrons = filter_neutrons_by_tof(&neutrons, slice.emit_cutoff_tof);
167 }
168
169 self.append_neutrons(slice.tdc_timestamp_25ns, &neutrons, emitted_hits);
170 Ok(())
171 }
172
173 fn append_neutrons(
174 &mut self,
175 tdc_timestamp_25ns: u64,
176 neutrons: &NeutronBatch,
177 emitted_hits: usize,
178 ) {
179 let current = self.current_tdc.unwrap_or(tdc_timestamp_25ns);
180 if current != tdc_timestamp_25ns {
181 if self.current_hits > 0 || !self.current_neutrons.is_empty() {
182 self.pending.push_back(PulseNeutronBatch {
183 tdc_timestamp_25ns: current,
184 hits_processed: self.current_hits,
185 neutrons: std::mem::take(&mut self.current_neutrons),
186 });
187 }
188 self.current_tdc = Some(tdc_timestamp_25ns);
189 self.current_hits = 0;
190 } else if self.current_tdc.is_none() {
191 self.current_tdc = Some(tdc_timestamp_25ns);
192 }
193
194 self.current_neutrons.append(neutrons);
195 self.current_hits = self.current_hits.saturating_add(emitted_hits);
196 }
197
198 fn flush_current(&mut self) {
199 if let Some(tdc) = self.current_tdc.take() {
200 if self.current_hits > 0 || !self.current_neutrons.is_empty() {
201 self.pending.push_back(PulseNeutronBatch {
202 tdc_timestamp_25ns: tdc,
203 hits_processed: self.current_hits,
204 neutrons: std::mem::take(&mut self.current_neutrons),
205 });
206 }
207 }
208 self.current_hits = 0;
209 }
210}
211
212impl<I> Iterator for OutOfCoreNeutronStream<I>
213where
214 I: Iterator<Item = PulseBatchGroup>,
215{
216 type Item = Result<PulseNeutronBatch>;
217
218 fn next(&mut self) -> Option<Self::Item> {
219 loop {
220 if let Some(batch) = self.pending.pop_front() {
221 return Some(Ok(batch));
222 }
223
224 if self.finished {
225 return None;
226 }
227
228 if let Some(slice) = self.next_slice() {
229 if let Err(err) = self.process_slice(slice) {
230 return Some(Err(err));
231 }
232 continue;
233 }
234
235 self.flush_current();
236 if self.pending.is_empty() {
237 self.finished = true;
238 return None;
239 }
240 }
241 }
242}
243
244pub fn out_of_core_neutron_stream(
249 reader: &Tpx3FileReader,
250 algorithm: ClusteringAlgorithm,
251 clustering: &ClusteringConfig,
252 extraction: &ExtractionConfig,
253 params: &AlgorithmParams,
254 memory: &OutOfCoreConfig,
255) -> Result<Box<dyn Iterator<Item = Result<PulseNeutronBatch>>>> {
256 let handle = out_of_core_neutron_stream_handle(
257 reader, algorithm, clustering, extraction, params, memory,
258 )?;
259
260 Ok(Box::new(handle))
261}
262
263pub fn out_of_core_neutron_stream_handle(
271 reader: &Tpx3FileReader,
272 algorithm: ClusteringAlgorithm,
273 clustering: &ClusteringConfig,
274 extraction: &ExtractionConfig,
275 params: &AlgorithmParams,
276 memory: &OutOfCoreConfig,
277) -> Result<OutOfCoreNeutronStreamHandle> {
278 let overlap_tof = clustering.window_tof();
279 let batcher = pulse_batches(reader, memory, overlap_tof)?;
280 if memory.use_threaded_pipeline() {
281 Ok(OutOfCoreNeutronStreamHandle::Threaded(
282 build_threaded_stream(
283 batcher,
284 algorithm,
285 clustering.clone(),
286 extraction.clone(),
287 params.clone(),
288 memory.effective_parallelism(),
289 memory.effective_queue_depth(),
290 ),
291 ))
292 } else {
293 Ok(OutOfCoreNeutronStreamHandle::Single(Box::new(
294 OutOfCoreNeutronStream::new(
295 batcher,
296 algorithm,
297 clustering.clone(),
298 extraction.clone(),
299 params.clone(),
300 ),
301 )))
302 }
303}
304
305fn build_threaded_stream<I>(
306 batcher: PulseBatcher<I>,
307 algorithm: ClusteringAlgorithm,
308 clustering: ClusteringConfig,
309 extraction: ExtractionConfig,
310 params: AlgorithmParams,
311 parallelism: usize,
312 queue_depth: usize,
313) -> ThreadedOutOfCoreNeutronStream
314where
315 I: Iterator<Item = crate::reader::EventBatch> + Send + 'static,
316{
317 let (group_tx, group_rx) = mpsc::sync_channel::<PulseBatchGroup>(queue_depth);
318 let (out_tx, out_rx) = mpsc::sync_channel::<Result<PulseNeutronBatch>>(queue_depth);
319 let cancel = Arc::new(AtomicBool::new(false));
320 let cancel_reader = Arc::clone(&cancel);
321 let cancel_worker = Arc::clone(&cancel);
322
323 let reader_handle = thread::spawn(move || {
324 for group in batcher {
325 if cancel_reader.load(Ordering::Relaxed) {
326 break;
327 }
328 let mut pending = group;
329 loop {
330 if cancel_reader.load(Ordering::Relaxed) {
331 return;
332 }
333 match group_tx.try_send(pending) {
334 Ok(()) => break,
335 Err(mpsc::TrySendError::Disconnected(_)) => return,
336 Err(mpsc::TrySendError::Full(group)) => {
337 pending = group;
338 thread::sleep(Duration::from_millis(1));
339 }
340 }
341 }
342 }
343 });
344
345 let worker_handle = thread::spawn(move || {
346 let pool = if parallelism > 1 {
347 rayon::ThreadPoolBuilder::new()
348 .num_threads(parallelism)
349 .build()
350 .ok()
351 } else {
352 None
353 };
354
355 loop {
356 if cancel_worker.load(Ordering::Relaxed) {
357 break;
358 }
359 let group = match group_rx.recv_timeout(Duration::from_millis(50)) {
360 Ok(group) => group,
361 Err(mpsc::RecvTimeoutError::Timeout) => continue,
362 Err(mpsc::RecvTimeoutError::Disconnected) => break,
363 };
364
365 if cancel_worker.load(Ordering::Relaxed) {
366 break;
367 }
368
369 let result = if let Some(pool) = &pool {
370 pool.install(|| {
371 process_group(group, algorithm, &clustering, &extraction, ¶ms, true)
372 })
373 } else {
374 process_group(group, algorithm, &clustering, &extraction, ¶ms, false)
375 };
376
377 match result {
378 Ok(group_batches) => {
379 for batch in group_batches {
380 if cancel_worker.load(Ordering::Relaxed) {
381 return;
382 }
383 if out_tx.send(Ok(batch)).is_err() {
384 return;
385 }
386 }
387 }
388 Err(err) => {
389 let _ = out_tx.send(Err(err));
390 return;
391 }
392 }
393 }
394 });
395
396 ThreadedOutOfCoreNeutronStream {
397 receiver: out_rx,
398 handles: vec![reader_handle, worker_handle],
399 cancel,
400 }
401}
402
403fn process_group(
404 group: PulseBatchGroup,
405 algorithm: ClusteringAlgorithm,
406 clustering: &ClusteringConfig,
407 extraction: &ExtractionConfig,
408 params: &AlgorithmParams,
409 parallel: bool,
410) -> Result<Vec<PulseNeutronBatch>> {
411 let slice_results: Vec<Result<SliceOutput>> = if parallel {
412 group
413 .slices
414 .into_par_iter()
415 .map(|slice| process_slice_output(slice, algorithm, clustering, extraction, params))
416 .collect()
417 } else {
418 group
419 .slices
420 .into_iter()
421 .map(|slice| process_slice_output(slice, algorithm, clustering, extraction, params))
422 .collect()
423 };
424
425 let mut outputs = Vec::with_capacity(slice_results.len());
426 for result in slice_results {
427 outputs.push(result?);
428 }
429
430 let mut batches = Vec::new();
431 let mut current_tdc: Option<u64> = None;
432 let mut current_batch = PulseNeutronBatch {
433 tdc_timestamp_25ns: 0,
434 hits_processed: 0,
435 neutrons: NeutronBatch::default(),
436 };
437
438 for output in outputs {
439 if current_tdc != Some(output.tdc_timestamp_25ns) {
440 if current_tdc.is_some()
441 && (current_batch.hits_processed > 0 || !current_batch.neutrons.is_empty())
442 {
443 batches.push(current_batch);
444 }
445 current_batch = PulseNeutronBatch {
446 tdc_timestamp_25ns: output.tdc_timestamp_25ns,
447 hits_processed: 0,
448 neutrons: NeutronBatch::default(),
449 };
450 current_tdc = Some(output.tdc_timestamp_25ns);
451 }
452
453 current_batch.neutrons.append(&output.neutrons);
454 current_batch.hits_processed = current_batch
455 .hits_processed
456 .saturating_add(output.hits_processed);
457 }
458
459 if current_tdc.is_some()
460 && (current_batch.hits_processed > 0 || !current_batch.neutrons.is_empty())
461 {
462 batches.push(current_batch);
463 }
464
465 Ok(batches)
466}
467
468fn process_slice_output(
469 slice: PulseSlice,
470 algorithm: ClusteringAlgorithm,
471 clustering: &ClusteringConfig,
472 extraction: &ExtractionConfig,
473 params: &AlgorithmParams,
474) -> Result<SliceOutput> {
475 let mut hits = slice.hits;
476 let mut neutrons =
477 cluster_and_extract_batch(&mut hits, algorithm, clustering, extraction, params)
478 .map_err(Error::CoreError)?;
479
480 let emitted_hits = count_emitted_hits(&hits, slice.emit_cutoff_tof);
481 if slice.emit_cutoff_tof != u32::MAX {
482 neutrons = filter_neutrons_by_tof(&neutrons, slice.emit_cutoff_tof);
483 }
484
485 Ok(SliceOutput {
486 tdc_timestamp_25ns: slice.tdc_timestamp_25ns,
487 hits_processed: emitted_hits,
488 neutrons,
489 })
490}
491
492fn filter_neutrons_by_tof(neutrons: &NeutronBatch, cutoff_tof: u32) -> NeutronBatch {
493 let mut filtered = NeutronBatch::with_capacity(neutrons.len());
494 for i in 0..neutrons.len() {
495 if neutrons.tof[i] <= cutoff_tof {
496 push_neutron(&mut filtered, neutrons, i);
497 }
498 }
499 filtered
500}
501
502fn push_neutron(dest: &mut NeutronBatch, src: &NeutronBatch, idx: usize) {
503 dest.x.push(src.x[idx]);
504 dest.y.push(src.y[idx]);
505 dest.tof.push(src.tof[idx]);
506 dest.tot.push(src.tot[idx]);
507 dest.n_hits.push(src.n_hits[idx]);
508 dest.chip_id.push(src.chip_id[idx]);
509}
510
511fn count_emitted_hits(hits: &HitBatch, cutoff: u32) -> usize {
512 if hits.is_empty() {
513 return 0;
514 }
515 hits.tof.partition_point(|&tof| tof <= cutoff)
516}
517
518#[cfg(test)]
519mod tests {
520 use super::*;
521 use crate::reader::EventBatch;
522 use rustpix_core::soa::HitRecord;
523
524 fn make_event_batch(tdc: u64, hits: &[HitRecord]) -> EventBatch {
525 let mut batch = HitBatch::with_capacity(hits.len());
526 for &hit in hits {
527 batch.push(hit);
528 }
529 batch.sort_by_tof();
530 EventBatch {
531 tdc_timestamp_25ns: tdc,
532 hits: batch,
533 }
534 }
535
536 fn collect_neutrons<I>(iter: I) -> Vec<u32>
537 where
538 I: Iterator<Item = Result<PulseNeutronBatch>>,
539 {
540 let mut tofs = Vec::new();
541 for batch in iter {
542 let batch = batch.unwrap();
543 for tof in batch.neutrons.tof {
544 tofs.push(tof);
545 }
546 }
547 tofs.sort_unstable();
548 tofs
549 }
550
551 #[test]
552 fn out_of_core_matches_pulse_processing() {
553 let pulses_for_stream = vec![
554 make_event_batch(1, &[(1, 1, 10, 1, 10, 0), (100, 100, 20, 1, 20, 0)]),
555 make_event_batch(2, &[(50, 50, 30, 1, 30, 0)]),
556 ];
557 let pulses_for_expected = vec![
558 make_event_batch(1, &[(1, 1, 10, 1, 10, 0), (100, 100, 20, 1, 20, 0)]),
559 make_event_batch(2, &[(50, 50, 30, 1, 30, 0)]),
560 ];
561
562 let config = OutOfCoreConfig::default().with_memory_budget_bytes(10_000);
563 let batcher =
564 crate::out_of_core::PulseBatcher::new(pulses_for_stream.into_iter(), &config, 0)
565 .unwrap();
566
567 let clustering = ClusteringConfig {
568 radius: 1.0,
569 temporal_window_ns: 25.0,
570 min_cluster_size: 1,
571 max_cluster_size: None,
572 };
573 let extraction = ExtractionConfig::default();
574 let params = AlgorithmParams::default();
575
576 let stream = OutOfCoreNeutronStream::new(
577 batcher,
578 ClusteringAlgorithm::Grid,
579 clustering.clone(),
580 extraction.clone(),
581 params.clone(),
582 );
583 let ooc_tofs = collect_neutrons(stream);
584
585 let mut expected = Vec::new();
586 for mut pulse in pulses_for_expected {
587 let neutrons = cluster_and_extract_batch(
588 &mut pulse.hits,
589 ClusteringAlgorithm::Grid,
590 &clustering,
591 &extraction,
592 ¶ms,
593 )
594 .unwrap();
595 expected.extend(neutrons.tof);
596 }
597 expected.sort_unstable();
598
599 assert_eq!(ooc_tofs, expected);
600 }
601
602 #[test]
603 fn out_of_core_split_pulse_preserves_hits() {
604 let hits = [
605 (1, 1, 1, 1, 1, 0),
606 (50, 50, 2, 1, 2, 0),
607 (100, 100, 3, 1, 3, 0),
608 (150, 150, 4, 1, 4, 0),
609 (200, 200, 5, 1, 5, 0),
610 ];
611 let pulse_for_stream = make_event_batch(7, &hits);
612 let mut pulse_for_expected = make_event_batch(7, &hits);
613
614 let config = OutOfCoreConfig::default().with_memory_budget_bytes(32);
615 let batcher =
616 crate::out_of_core::PulseBatcher::new(vec![pulse_for_stream].into_iter(), &config, 1)
617 .unwrap();
618
619 let clustering = ClusteringConfig {
620 radius: 1.0,
621 temporal_window_ns: 25.0,
622 min_cluster_size: 1,
623 max_cluster_size: None,
624 };
625 let extraction = ExtractionConfig::default();
626 let params = AlgorithmParams::default();
627
628 let stream = OutOfCoreNeutronStream::new(
629 batcher,
630 ClusteringAlgorithm::Grid,
631 clustering.clone(),
632 extraction.clone(),
633 params.clone(),
634 );
635 let ooc_tofs = collect_neutrons(stream);
636
637 let mut expected = Vec::new();
638 let neutrons = cluster_and_extract_batch(
639 &mut pulse_for_expected.hits,
640 ClusteringAlgorithm::Grid,
641 &clustering,
642 &extraction,
643 ¶ms,
644 )
645 .unwrap();
646 expected.extend(neutrons.tof);
647 expected.sort_unstable();
648
649 assert_eq!(ooc_tofs, expected);
650 }
651
652 #[test]
653 fn out_of_core_counts_hits_without_double_counting() {
654 let hits = [
655 (1, 1, 1, 1, 1, 0),
656 (50, 50, 2, 1, 2, 0),
657 (100, 100, 3, 1, 3, 0),
658 (150, 150, 4, 1, 4, 0),
659 (200, 200, 5, 1, 5, 0),
660 ];
661 let pulse = make_event_batch(7, &hits);
662
663 let config = OutOfCoreConfig::default().with_memory_budget_bytes(32);
664 let batcher =
665 crate::out_of_core::PulseBatcher::new(vec![pulse].into_iter(), &config, 1).unwrap();
666
667 let clustering = ClusteringConfig {
668 radius: 1.0,
669 temporal_window_ns: 25.0,
670 min_cluster_size: 1,
671 max_cluster_size: None,
672 };
673 let extraction = ExtractionConfig::default();
674 let params = AlgorithmParams::default();
675
676 let stream = OutOfCoreNeutronStream::new(
677 batcher,
678 ClusteringAlgorithm::Grid,
679 clustering,
680 extraction,
681 params,
682 );
683
684 let total_hits: usize = stream.map(|batch| batch.unwrap().hits_processed).sum();
685 assert_eq!(total_hits, hits.len());
686 }
687
688 #[test]
689 fn out_of_core_threaded_matches_single() {
690 let config = OutOfCoreConfig::default().with_memory_budget_bytes(10_000);
691 let batcher = crate::out_of_core::PulseBatcher::new(
692 vec![
693 make_event_batch(1, &[(1, 1, 10, 1, 10, 0), (100, 100, 20, 1, 20, 0)]),
694 make_event_batch(2, &[(50, 50, 30, 1, 30, 0)]),
695 ]
696 .into_iter(),
697 &config,
698 0,
699 )
700 .unwrap();
701
702 let clustering = ClusteringConfig {
703 radius: 1.0,
704 temporal_window_ns: 25.0,
705 min_cluster_size: 1,
706 max_cluster_size: None,
707 };
708 let extraction = ExtractionConfig::default();
709 let params = AlgorithmParams::default();
710
711 let threaded = build_threaded_stream(
712 batcher,
713 ClusteringAlgorithm::Grid,
714 clustering.clone(),
715 extraction.clone(),
716 params.clone(),
717 2,
718 1,
719 );
720 let threaded_tofs = collect_neutrons(threaded);
721
722 let mut expected = Vec::new();
723 for mut pulse in [
724 make_event_batch(1, &[(1, 1, 10, 1, 10, 0), (100, 100, 20, 1, 20, 0)]),
725 make_event_batch(2, &[(50, 50, 30, 1, 30, 0)]),
726 ] {
727 let neutrons = cluster_and_extract_batch(
728 &mut pulse.hits,
729 ClusteringAlgorithm::Grid,
730 &clustering,
731 &extraction,
732 ¶ms,
733 )
734 .unwrap();
735 expected.extend(neutrons.tof);
736 }
737 expected.sort_unstable();
738
739 assert_eq!(threaded_tofs, expected);
740 }
741}