Skip to main content

rustpix_io/
out_of_core_pipeline.rs

1//! Out-of-core processing pipeline for pulse-bounded streams.
2
3use crate::out_of_core::{
4    pulse_batches, OutOfCoreConfig, PulseBatchGroup, PulseBatcher, PulseSlice,
5};
6use crate::reader::{TimeOrderedEventStream, Tpx3FileReader};
7use crate::{Error, Result};
8use rayon::prelude::*;
9use rustpix_algorithms::{cluster_and_extract_batch, AlgorithmParams, ClusteringAlgorithm};
10use rustpix_core::clustering::ClusteringConfig;
11use rustpix_core::extraction::ExtractionConfig;
12use rustpix_core::neutron::NeutronBatch;
13use rustpix_core::soa::HitBatch;
14use std::collections::VecDeque;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::{mpsc, Arc};
17use std::thread;
18use std::time::Duration;
19
20/// Neutron output for a single pulse.
21#[derive(Clone, Debug)]
22pub struct PulseNeutronBatch {
23    /// Pulse TDC timestamp (25ns ticks).
24    pub tdc_timestamp_25ns: u64,
25    /// Number of hits processed for this pulse.
26    pub hits_processed: usize,
27    /// Neutrons extracted from this pulse.
28    pub neutrons: NeutronBatch,
29}
30
31struct SliceOutput {
32    tdc_timestamp_25ns: u64,
33    hits_processed: usize,
34    neutrons: NeutronBatch,
35}
36
37/// Stream handle that can be single-threaded or threaded.
38pub enum OutOfCoreNeutronStreamHandle {
39    /// Single-threaded out-of-core stream.
40    Single(Box<OutOfCoreNeutronStream<PulseBatcher<TimeOrderedEventStream>>>),
41    /// Threaded out-of-core stream.
42    Threaded(ThreadedOutOfCoreNeutronStream),
43}
44
45impl Iterator for OutOfCoreNeutronStreamHandle {
46    type Item = Result<PulseNeutronBatch>;
47
48    fn next(&mut self) -> Option<Self::Item> {
49        match self {
50            Self::Single(stream) => stream.next(),
51            Self::Threaded(stream) => stream.next(),
52        }
53    }
54}
55
56/// Threaded out-of-core stream with bounded queues.
57///
58/// Dropping the stream signals cancellation and joins worker threads; if a
59/// batch is already being processed, shutdown waits for that batch to finish.
60pub struct ThreadedOutOfCoreNeutronStream {
61    /// Receives pulse outputs from the worker thread.
62    receiver: mpsc::Receiver<Result<PulseNeutronBatch>>,
63    /// Join handles for reader/worker threads.
64    handles: Vec<thread::JoinHandle<()>>,
65    /// Cancellation flag used to stop worker threads.
66    cancel: Arc<AtomicBool>,
67}
68
69impl Iterator for ThreadedOutOfCoreNeutronStream {
70    type Item = Result<PulseNeutronBatch>;
71
72    fn next(&mut self) -> Option<Self::Item> {
73        self.receiver.recv().ok()
74    }
75}
76
77impl Drop for ThreadedOutOfCoreNeutronStream {
78    fn drop(&mut self) {
79        self.cancel.store(true, Ordering::Relaxed);
80        for handle in self.handles.drain(..) {
81            let _ = handle.join();
82        }
83    }
84}
85
86/// Iterator over out-of-core pulse batches.
87pub struct OutOfCoreNeutronStream<I>
88where
89    I: Iterator<Item = PulseBatchGroup>,
90{
91    /// Pulse batch source.
92    batches: I,
93    /// Pending slices from the current batch group.
94    slices: VecDeque<PulseSlice>,
95    /// Selected clustering algorithm.
96    algorithm: ClusteringAlgorithm,
97    /// Clustering configuration.
98    clustering: ClusteringConfig,
99    /// Extraction configuration.
100    extraction: ExtractionConfig,
101    /// Algorithm tuning parameters.
102    params: AlgorithmParams,
103    /// Current pulse TDC timestamp.
104    current_tdc: Option<u64>,
105    /// Accumulated neutrons for the current pulse.
106    current_neutrons: NeutronBatch,
107    /// Count of emitted hits for the current pulse.
108    current_hits: usize,
109    /// Completed pulse outputs waiting to be returned.
110    pending: VecDeque<PulseNeutronBatch>,
111    /// Whether the stream has been fully drained.
112    finished: bool,
113}
114
115impl<I> OutOfCoreNeutronStream<I>
116where
117    I: Iterator<Item = PulseBatchGroup>,
118{
119    /// Create a new out-of-core stream from pulse batch groups.
120    #[must_use]
121    pub fn new(
122        batches: I,
123        algorithm: ClusteringAlgorithm,
124        clustering: ClusteringConfig,
125        extraction: ExtractionConfig,
126        params: AlgorithmParams,
127    ) -> Self {
128        Self {
129            batches,
130            slices: VecDeque::new(),
131            algorithm,
132            clustering,
133            extraction,
134            params,
135            current_tdc: None,
136            current_neutrons: NeutronBatch::default(),
137            current_hits: 0,
138            pending: VecDeque::new(),
139            finished: false,
140        }
141    }
142
143    fn next_slice(&mut self) -> Option<PulseSlice> {
144        if let Some(slice) = self.slices.pop_front() {
145            return Some(slice);
146        }
147
148        let group = self.batches.next()?;
149        self.slices.extend(group.slices);
150        self.slices.pop_front()
151    }
152
153    fn process_slice(&mut self, slice: PulseSlice) -> Result<()> {
154        let mut hits = slice.hits;
155        let mut neutrons = cluster_and_extract_batch(
156            &mut hits,
157            self.algorithm,
158            &self.clustering,
159            &self.extraction,
160            &self.params,
161        )
162        .map_err(Error::CoreError)?;
163
164        let emitted_hits = count_emitted_hits(&hits, slice.emit_cutoff_tof);
165        if slice.emit_cutoff_tof != u32::MAX {
166            neutrons = filter_neutrons_by_tof(&neutrons, slice.emit_cutoff_tof);
167        }
168
169        self.append_neutrons(slice.tdc_timestamp_25ns, &neutrons, emitted_hits);
170        Ok(())
171    }
172
173    fn append_neutrons(
174        &mut self,
175        tdc_timestamp_25ns: u64,
176        neutrons: &NeutronBatch,
177        emitted_hits: usize,
178    ) {
179        let current = self.current_tdc.unwrap_or(tdc_timestamp_25ns);
180        if current != tdc_timestamp_25ns {
181            if self.current_hits > 0 || !self.current_neutrons.is_empty() {
182                self.pending.push_back(PulseNeutronBatch {
183                    tdc_timestamp_25ns: current,
184                    hits_processed: self.current_hits,
185                    neutrons: std::mem::take(&mut self.current_neutrons),
186                });
187            }
188            self.current_tdc = Some(tdc_timestamp_25ns);
189            self.current_hits = 0;
190        } else if self.current_tdc.is_none() {
191            self.current_tdc = Some(tdc_timestamp_25ns);
192        }
193
194        self.current_neutrons.append(neutrons);
195        self.current_hits = self.current_hits.saturating_add(emitted_hits);
196    }
197
198    fn flush_current(&mut self) {
199        if let Some(tdc) = self.current_tdc.take() {
200            if self.current_hits > 0 || !self.current_neutrons.is_empty() {
201                self.pending.push_back(PulseNeutronBatch {
202                    tdc_timestamp_25ns: tdc,
203                    hits_processed: self.current_hits,
204                    neutrons: std::mem::take(&mut self.current_neutrons),
205                });
206            }
207        }
208        self.current_hits = 0;
209    }
210}
211
212impl<I> Iterator for OutOfCoreNeutronStream<I>
213where
214    I: Iterator<Item = PulseBatchGroup>,
215{
216    type Item = Result<PulseNeutronBatch>;
217
218    fn next(&mut self) -> Option<Self::Item> {
219        loop {
220            if let Some(batch) = self.pending.pop_front() {
221                return Some(Ok(batch));
222            }
223
224            if self.finished {
225                return None;
226            }
227
228            if let Some(slice) = self.next_slice() {
229                if let Err(err) = self.process_slice(slice) {
230                    return Some(Err(err));
231                }
232                continue;
233            }
234
235            self.flush_current();
236            if self.pending.is_empty() {
237                self.finished = true;
238                return None;
239            }
240        }
241    }
242}
243
244/// Build an out-of-core neutron stream from a TPX3 reader.
245///
246/// # Errors
247/// Returns an error if the reader fails or the memory budget is invalid.
248pub fn out_of_core_neutron_stream(
249    reader: &Tpx3FileReader,
250    algorithm: ClusteringAlgorithm,
251    clustering: &ClusteringConfig,
252    extraction: &ExtractionConfig,
253    params: &AlgorithmParams,
254    memory: &OutOfCoreConfig,
255) -> Result<Box<dyn Iterator<Item = Result<PulseNeutronBatch>>>> {
256    let handle = out_of_core_neutron_stream_handle(
257        reader, algorithm, clustering, extraction, params, memory,
258    )?;
259
260    Ok(Box::new(handle))
261}
262
263/// Build an out-of-core neutron stream handle from a TPX3 reader.
264///
265/// This exposes the underlying handle type, while [`out_of_core_neutron_stream`]
266/// returns a boxed iterator for compatibility.
267///
268/// # Errors
269/// Returns an error if the reader fails or the memory budget is invalid.
270pub fn out_of_core_neutron_stream_handle(
271    reader: &Tpx3FileReader,
272    algorithm: ClusteringAlgorithm,
273    clustering: &ClusteringConfig,
274    extraction: &ExtractionConfig,
275    params: &AlgorithmParams,
276    memory: &OutOfCoreConfig,
277) -> Result<OutOfCoreNeutronStreamHandle> {
278    let overlap_tof = clustering.window_tof();
279    let batcher = pulse_batches(reader, memory, overlap_tof)?;
280    if memory.use_threaded_pipeline() {
281        Ok(OutOfCoreNeutronStreamHandle::Threaded(
282            build_threaded_stream(
283                batcher,
284                algorithm,
285                clustering.clone(),
286                extraction.clone(),
287                params.clone(),
288                memory.effective_parallelism(),
289                memory.effective_queue_depth(),
290            ),
291        ))
292    } else {
293        Ok(OutOfCoreNeutronStreamHandle::Single(Box::new(
294            OutOfCoreNeutronStream::new(
295                batcher,
296                algorithm,
297                clustering.clone(),
298                extraction.clone(),
299                params.clone(),
300            ),
301        )))
302    }
303}
304
305fn build_threaded_stream<I>(
306    batcher: PulseBatcher<I>,
307    algorithm: ClusteringAlgorithm,
308    clustering: ClusteringConfig,
309    extraction: ExtractionConfig,
310    params: AlgorithmParams,
311    parallelism: usize,
312    queue_depth: usize,
313) -> ThreadedOutOfCoreNeutronStream
314where
315    I: Iterator<Item = crate::reader::EventBatch> + Send + 'static,
316{
317    let (group_tx, group_rx) = mpsc::sync_channel::<PulseBatchGroup>(queue_depth);
318    let (out_tx, out_rx) = mpsc::sync_channel::<Result<PulseNeutronBatch>>(queue_depth);
319    let cancel = Arc::new(AtomicBool::new(false));
320    let cancel_reader = Arc::clone(&cancel);
321    let cancel_worker = Arc::clone(&cancel);
322
323    let reader_handle = thread::spawn(move || {
324        for group in batcher {
325            if cancel_reader.load(Ordering::Relaxed) {
326                break;
327            }
328            let mut pending = group;
329            loop {
330                if cancel_reader.load(Ordering::Relaxed) {
331                    return;
332                }
333                match group_tx.try_send(pending) {
334                    Ok(()) => break,
335                    Err(mpsc::TrySendError::Disconnected(_)) => return,
336                    Err(mpsc::TrySendError::Full(group)) => {
337                        pending = group;
338                        thread::sleep(Duration::from_millis(1));
339                    }
340                }
341            }
342        }
343    });
344
345    let worker_handle = thread::spawn(move || {
346        let pool = if parallelism > 1 {
347            rayon::ThreadPoolBuilder::new()
348                .num_threads(parallelism)
349                .build()
350                .ok()
351        } else {
352            None
353        };
354
355        loop {
356            if cancel_worker.load(Ordering::Relaxed) {
357                break;
358            }
359            let group = match group_rx.recv_timeout(Duration::from_millis(50)) {
360                Ok(group) => group,
361                Err(mpsc::RecvTimeoutError::Timeout) => continue,
362                Err(mpsc::RecvTimeoutError::Disconnected) => break,
363            };
364
365            if cancel_worker.load(Ordering::Relaxed) {
366                break;
367            }
368
369            let result = if let Some(pool) = &pool {
370                pool.install(|| {
371                    process_group(group, algorithm, &clustering, &extraction, &params, true)
372                })
373            } else {
374                process_group(group, algorithm, &clustering, &extraction, &params, false)
375            };
376
377            match result {
378                Ok(group_batches) => {
379                    for batch in group_batches {
380                        if cancel_worker.load(Ordering::Relaxed) {
381                            return;
382                        }
383                        if out_tx.send(Ok(batch)).is_err() {
384                            return;
385                        }
386                    }
387                }
388                Err(err) => {
389                    let _ = out_tx.send(Err(err));
390                    return;
391                }
392            }
393        }
394    });
395
396    ThreadedOutOfCoreNeutronStream {
397        receiver: out_rx,
398        handles: vec![reader_handle, worker_handle],
399        cancel,
400    }
401}
402
403fn process_group(
404    group: PulseBatchGroup,
405    algorithm: ClusteringAlgorithm,
406    clustering: &ClusteringConfig,
407    extraction: &ExtractionConfig,
408    params: &AlgorithmParams,
409    parallel: bool,
410) -> Result<Vec<PulseNeutronBatch>> {
411    let slice_results: Vec<Result<SliceOutput>> = if parallel {
412        group
413            .slices
414            .into_par_iter()
415            .map(|slice| process_slice_output(slice, algorithm, clustering, extraction, params))
416            .collect()
417    } else {
418        group
419            .slices
420            .into_iter()
421            .map(|slice| process_slice_output(slice, algorithm, clustering, extraction, params))
422            .collect()
423    };
424
425    let mut outputs = Vec::with_capacity(slice_results.len());
426    for result in slice_results {
427        outputs.push(result?);
428    }
429
430    let mut batches = Vec::new();
431    let mut current_tdc: Option<u64> = None;
432    let mut current_batch = PulseNeutronBatch {
433        tdc_timestamp_25ns: 0,
434        hits_processed: 0,
435        neutrons: NeutronBatch::default(),
436    };
437
438    for output in outputs {
439        if current_tdc != Some(output.tdc_timestamp_25ns) {
440            if current_tdc.is_some()
441                && (current_batch.hits_processed > 0 || !current_batch.neutrons.is_empty())
442            {
443                batches.push(current_batch);
444            }
445            current_batch = PulseNeutronBatch {
446                tdc_timestamp_25ns: output.tdc_timestamp_25ns,
447                hits_processed: 0,
448                neutrons: NeutronBatch::default(),
449            };
450            current_tdc = Some(output.tdc_timestamp_25ns);
451        }
452
453        current_batch.neutrons.append(&output.neutrons);
454        current_batch.hits_processed = current_batch
455            .hits_processed
456            .saturating_add(output.hits_processed);
457    }
458
459    if current_tdc.is_some()
460        && (current_batch.hits_processed > 0 || !current_batch.neutrons.is_empty())
461    {
462        batches.push(current_batch);
463    }
464
465    Ok(batches)
466}
467
468fn process_slice_output(
469    slice: PulseSlice,
470    algorithm: ClusteringAlgorithm,
471    clustering: &ClusteringConfig,
472    extraction: &ExtractionConfig,
473    params: &AlgorithmParams,
474) -> Result<SliceOutput> {
475    let mut hits = slice.hits;
476    let mut neutrons =
477        cluster_and_extract_batch(&mut hits, algorithm, clustering, extraction, params)
478            .map_err(Error::CoreError)?;
479
480    let emitted_hits = count_emitted_hits(&hits, slice.emit_cutoff_tof);
481    if slice.emit_cutoff_tof != u32::MAX {
482        neutrons = filter_neutrons_by_tof(&neutrons, slice.emit_cutoff_tof);
483    }
484
485    Ok(SliceOutput {
486        tdc_timestamp_25ns: slice.tdc_timestamp_25ns,
487        hits_processed: emitted_hits,
488        neutrons,
489    })
490}
491
492fn filter_neutrons_by_tof(neutrons: &NeutronBatch, cutoff_tof: u32) -> NeutronBatch {
493    let mut filtered = NeutronBatch::with_capacity(neutrons.len());
494    for i in 0..neutrons.len() {
495        if neutrons.tof[i] <= cutoff_tof {
496            push_neutron(&mut filtered, neutrons, i);
497        }
498    }
499    filtered
500}
501
502fn push_neutron(dest: &mut NeutronBatch, src: &NeutronBatch, idx: usize) {
503    dest.x.push(src.x[idx]);
504    dest.y.push(src.y[idx]);
505    dest.tof.push(src.tof[idx]);
506    dest.tot.push(src.tot[idx]);
507    dest.n_hits.push(src.n_hits[idx]);
508    dest.chip_id.push(src.chip_id[idx]);
509}
510
511fn count_emitted_hits(hits: &HitBatch, cutoff: u32) -> usize {
512    if hits.is_empty() {
513        return 0;
514    }
515    hits.tof.partition_point(|&tof| tof <= cutoff)
516}
517
518#[cfg(test)]
519mod tests {
520    use super::*;
521    use crate::reader::EventBatch;
522    use rustpix_core::soa::HitRecord;
523
524    fn make_event_batch(tdc: u64, hits: &[HitRecord]) -> EventBatch {
525        let mut batch = HitBatch::with_capacity(hits.len());
526        for &hit in hits {
527            batch.push(hit);
528        }
529        batch.sort_by_tof();
530        EventBatch {
531            tdc_timestamp_25ns: tdc,
532            hits: batch,
533        }
534    }
535
536    fn collect_neutrons<I>(iter: I) -> Vec<u32>
537    where
538        I: Iterator<Item = Result<PulseNeutronBatch>>,
539    {
540        let mut tofs = Vec::new();
541        for batch in iter {
542            let batch = batch.unwrap();
543            for tof in batch.neutrons.tof {
544                tofs.push(tof);
545            }
546        }
547        tofs.sort_unstable();
548        tofs
549    }
550
551    #[test]
552    fn out_of_core_matches_pulse_processing() {
553        let pulses_for_stream = vec![
554            make_event_batch(1, &[(1, 1, 10, 1, 10, 0), (100, 100, 20, 1, 20, 0)]),
555            make_event_batch(2, &[(50, 50, 30, 1, 30, 0)]),
556        ];
557        let pulses_for_expected = vec![
558            make_event_batch(1, &[(1, 1, 10, 1, 10, 0), (100, 100, 20, 1, 20, 0)]),
559            make_event_batch(2, &[(50, 50, 30, 1, 30, 0)]),
560        ];
561
562        let config = OutOfCoreConfig::default().with_memory_budget_bytes(10_000);
563        let batcher =
564            crate::out_of_core::PulseBatcher::new(pulses_for_stream.into_iter(), &config, 0)
565                .unwrap();
566
567        let clustering = ClusteringConfig {
568            radius: 1.0,
569            temporal_window_ns: 25.0,
570            min_cluster_size: 1,
571            max_cluster_size: None,
572        };
573        let extraction = ExtractionConfig::default();
574        let params = AlgorithmParams::default();
575
576        let stream = OutOfCoreNeutronStream::new(
577            batcher,
578            ClusteringAlgorithm::Grid,
579            clustering.clone(),
580            extraction.clone(),
581            params.clone(),
582        );
583        let ooc_tofs = collect_neutrons(stream);
584
585        let mut expected = Vec::new();
586        for mut pulse in pulses_for_expected {
587            let neutrons = cluster_and_extract_batch(
588                &mut pulse.hits,
589                ClusteringAlgorithm::Grid,
590                &clustering,
591                &extraction,
592                &params,
593            )
594            .unwrap();
595            expected.extend(neutrons.tof);
596        }
597        expected.sort_unstable();
598
599        assert_eq!(ooc_tofs, expected);
600    }
601
602    #[test]
603    fn out_of_core_split_pulse_preserves_hits() {
604        let hits = [
605            (1, 1, 1, 1, 1, 0),
606            (50, 50, 2, 1, 2, 0),
607            (100, 100, 3, 1, 3, 0),
608            (150, 150, 4, 1, 4, 0),
609            (200, 200, 5, 1, 5, 0),
610        ];
611        let pulse_for_stream = make_event_batch(7, &hits);
612        let mut pulse_for_expected = make_event_batch(7, &hits);
613
614        let config = OutOfCoreConfig::default().with_memory_budget_bytes(32);
615        let batcher =
616            crate::out_of_core::PulseBatcher::new(vec![pulse_for_stream].into_iter(), &config, 1)
617                .unwrap();
618
619        let clustering = ClusteringConfig {
620            radius: 1.0,
621            temporal_window_ns: 25.0,
622            min_cluster_size: 1,
623            max_cluster_size: None,
624        };
625        let extraction = ExtractionConfig::default();
626        let params = AlgorithmParams::default();
627
628        let stream = OutOfCoreNeutronStream::new(
629            batcher,
630            ClusteringAlgorithm::Grid,
631            clustering.clone(),
632            extraction.clone(),
633            params.clone(),
634        );
635        let ooc_tofs = collect_neutrons(stream);
636
637        let mut expected = Vec::new();
638        let neutrons = cluster_and_extract_batch(
639            &mut pulse_for_expected.hits,
640            ClusteringAlgorithm::Grid,
641            &clustering,
642            &extraction,
643            &params,
644        )
645        .unwrap();
646        expected.extend(neutrons.tof);
647        expected.sort_unstable();
648
649        assert_eq!(ooc_tofs, expected);
650    }
651
652    #[test]
653    fn out_of_core_counts_hits_without_double_counting() {
654        let hits = [
655            (1, 1, 1, 1, 1, 0),
656            (50, 50, 2, 1, 2, 0),
657            (100, 100, 3, 1, 3, 0),
658            (150, 150, 4, 1, 4, 0),
659            (200, 200, 5, 1, 5, 0),
660        ];
661        let pulse = make_event_batch(7, &hits);
662
663        let config = OutOfCoreConfig::default().with_memory_budget_bytes(32);
664        let batcher =
665            crate::out_of_core::PulseBatcher::new(vec![pulse].into_iter(), &config, 1).unwrap();
666
667        let clustering = ClusteringConfig {
668            radius: 1.0,
669            temporal_window_ns: 25.0,
670            min_cluster_size: 1,
671            max_cluster_size: None,
672        };
673        let extraction = ExtractionConfig::default();
674        let params = AlgorithmParams::default();
675
676        let stream = OutOfCoreNeutronStream::new(
677            batcher,
678            ClusteringAlgorithm::Grid,
679            clustering,
680            extraction,
681            params,
682        );
683
684        let total_hits: usize = stream.map(|batch| batch.unwrap().hits_processed).sum();
685        assert_eq!(total_hits, hits.len());
686    }
687
688    #[test]
689    fn out_of_core_threaded_matches_single() {
690        let config = OutOfCoreConfig::default().with_memory_budget_bytes(10_000);
691        let batcher = crate::out_of_core::PulseBatcher::new(
692            vec![
693                make_event_batch(1, &[(1, 1, 10, 1, 10, 0), (100, 100, 20, 1, 20, 0)]),
694                make_event_batch(2, &[(50, 50, 30, 1, 30, 0)]),
695            ]
696            .into_iter(),
697            &config,
698            0,
699        )
700        .unwrap();
701
702        let clustering = ClusteringConfig {
703            radius: 1.0,
704            temporal_window_ns: 25.0,
705            min_cluster_size: 1,
706            max_cluster_size: None,
707        };
708        let extraction = ExtractionConfig::default();
709        let params = AlgorithmParams::default();
710
711        let threaded = build_threaded_stream(
712            batcher,
713            ClusteringAlgorithm::Grid,
714            clustering.clone(),
715            extraction.clone(),
716            params.clone(),
717            2,
718            1,
719        );
720        let threaded_tofs = collect_neutrons(threaded);
721
722        let mut expected = Vec::new();
723        for mut pulse in [
724            make_event_batch(1, &[(1, 1, 10, 1, 10, 0), (100, 100, 20, 1, 20, 0)]),
725            make_event_batch(2, &[(50, 50, 30, 1, 30, 0)]),
726        ] {
727            let neutrons = cluster_and_extract_batch(
728                &mut pulse.hits,
729                ClusteringAlgorithm::Grid,
730                &clustering,
731                &extraction,
732                &params,
733            )
734            .unwrap();
735            expected.extend(neutrons.tof);
736        }
737        expected.sort_unstable();
738
739        assert_eq!(threaded_tofs, expected);
740    }
741}