fastpasta/
lib.rs

#![warn(unused_extern_crates)]
#![warn(missing_docs)]
#![warn(missing_copy_implementations)]
// Readability lints
#![warn(
    clippy::option_filter_map,
    clippy::manual_filter_map,
    clippy::if_not_else,
    clippy::nonminimal_bool,
    clippy::single_match_else,
    clippy::range_plus_one,
    clippy::int_plus_one,
    clippy::needless_range_loop,
    clippy::needless_continue,
    clippy::shadow_same,
    clippy::shadow_unrelated
)]
// Performance lints
#![warn(variant_size_differences)]
#![warn(
    clippy::needless_pass_by_value,
    clippy::unnecessary_wraps,
    clippy::mutex_integer,
    clippy::mem_forget,
    clippy::maybe_infinite_iter
)]
// Safety lints
#![warn(unused_results)]
#![warn(unused_import_braces)]
#![warn(trivial_casts, trivial_numeric_casts)]
// Unhandled results (allow unwrap and expect as there are many cases where the unwrap is totally safe)
#![warn(clippy::map_unwrap_or)]

//! fast Protocol Analysis Scanner Tool for ALICE (fastPASTA), for reading and checking raw binary data from ALICE detectors
//!
//! # Usage
//!
//! ## Reading data from file and performing checks
//! ```shell
//! # Enable all generic checks: `sanity` (stateless) AND `running` (stateful)
//! $ fastpasta <input_file> check all
//!
//! # Same as above but only enable `sanity` checks, and only check data from link 0
//! $ fastpasta <input_file> check sanity -f 0
//! ```
//! ## Enable all `sanity` and `running` checks and include checks applicable to `ITS` only
//! ```shell
//! $ fastpasta <input_file> check all ITS
//! ```
//! ## Filter link 3, run `sanity` checks, and include sanity checks specific to ITS
//! ```shell
//! # target `its` is case-insensitive
//! $ fastpasta <input_file> -f 3 check sanity its
//! ```
//!
//! ## Reading data from stdin and performing all checks that apply to ITS
//!
//! ```shell
//! $ cat <input_file> | fastpasta check all ITS
//! ```
//!
//! ## Reading data from one file, filtering by link 3, and writing to another
//!
//! ```bash
//! $ fastpasta <input_file> --filter-link 3 -o <output_file>
//! ```
//!
//! ## Reading from file, filtering by link ID, and writing to stdout
//! Writing to stdout is implicit when no checks or views are specified
//! ```bash
//! $ fastpasta <input_file> --filter-link 3
//! ```
//!
//! ## Reading from file and printing a view of RDHs
//!
//! ```bash
//! $ fastpasta <input_file> view rdh
//! ```

use crate::util::*;
use analyze::validators::rdh::Rdh0Validator;

/// Write an error message to stderr.
/// All error messages should be written through this function to ensure consistency.
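///
/// # Example
/// A minimal sketch (the message text is a hypothetical example):
/// ```
/// fastpasta::display_error("RDH sanity check failed");
/// ```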
#[inline]
pub fn display_error(err_msg: &str) {
    log::error!("{}", owo_colors::OwoColorize::red(&err_msg));
}

pub mod analyze;
pub mod config;
pub mod controller;
pub mod init;
pub mod stats;
pub mod util;
pub mod words;
pub mod write;

/// Does the initial setup for input data processing
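///
/// # Example
/// A sketch of driving `init_processing` from library code, mirroring the test below
/// (assumes `cfg: &'static impl Config` that points at an existing input file):
/// ```ignore
/// let reader = alice_protocol_reader::init_reader(cfg.input_file()).unwrap();
/// let (stat_send, _stat_recv) = flume::unbounded();
/// let stop_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
/// init_processing(cfg, reader, stat_send, stop_flag).unwrap();
/// ```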
#[allow(clippy::needless_pass_by_value)] // The reader is passed by value to avoid lifetime issues (otherwise the reader thread just spins unless the user drops the sender after the call, which is not intuitive)
pub fn init_processing(
    config: &'static impl Config,
    mut reader: Box<dyn BufferedReaderWrapper>,
    stat_send: flume::Sender<StatType>,
    stop_flag: Arc<atomic::AtomicBool>,
) -> io::Result<()> {
    // Load the first few bytes that should contain RDH0 and do a basic sanity check before continuing.
    // Early exit if the check fails.
    let rdh0 = Rdh0::load(&mut reader).expect("Failed to read first RDH0");
    if let Err(e) = Rdh0Validator::default().sanity_check(&rdh0) {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Initial RDH0 deserialization failed sanity check: {e}"),
        ));
    }
    // Determine RDH version
    let rdh_version = rdh0.header_id;

    // Send RDH version to stats thread
    stat_send.send(StatType::RdhVersion(rdh_version)).unwrap();

    // Create a receiver/sender channel for the stats that the InputScanner sends.
    let (input_stats_send, input_stats_recv): (
        flume::Sender<InputStatType>,
        flume::Receiver<InputStatType>,
    ) = flume::unbounded();
    // Create input scanner from the already read RDH0 (to avoid seeking back and reading it twice, which would also break with stdin piping)
    let loader = InputScanner::new_from_rdh0(config, reader, Some(input_stats_send), rdh0);

    // Choose the rest of the execution based on the RDH version
    // Matching on the version is necessary to allow static dispatch and avoid heap allocation, as the RDH version is only known at runtime
    match rdh_version {
        // Attempt to parse RDHs with a version field in the range 3-100
        // The upper limit is 100 rather than the max of u8 (255) because:
        //      1. It is unlikely there will ever be an RDH version higher than that
        //      2. High values decoded from this field (especially 255) are typically a sign that the data is not actually ALICE data, so an early exit is preferred
        3..=100 => {
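            // 100 is the `CAP` const generic: the capacity of the CdpArray batches exchanged with the reader thread (see the channel type in `process`).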
            match process::<RdhCru, 100>(
                config,
                loader,
                Some(&input_stats_recv),
                &stat_send,
                stop_flag,
            ) {
                Ok(_) => Ok(()),
                Err(e) => {
                    stat_send
                        .send(StatType::Fatal(e.to_string().into()))
                        .unwrap();
                    Err(e)
                }
            }
        }
        _ => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Unknown RDH version: {rdh_version}"),
        )),
    }
}

/// Entry point for scanning the input and delegating to checkers, view generators and/or writers depending on [Config]
///
/// Follows these steps:
/// 1. Set up reading (`file` or `stdin`) using [alice_protocol_reader::spawn_reader].
/// 2. Depending on [Config] do one of:
///     - Validate data by dispatching it to validators with [ValidatorDispatcher][crate::analyze::validators::validator_dispatcher::ValidatorDispatcher].
///     - Generate views of data with [analyze::view::lib::generate_view].
///     - Write data to `file` or `stdout` with [write::lib::spawn_writer].
pub fn process<T: RDH + 'static, const CAP: usize>(
    config: &'static impl Config,
    loader: InputScanner<impl BufferedReaderWrapper + ?Sized + 'static>,
    input_stats_recv: Option<&flume::Receiver<InputStatType>>,
    stats_send: &flume::Sender<StatType>,
    stop_flag: Arc<atomic::AtomicBool>,
) -> io::Result<()> {
    // 1. Launch reader thread to read data from file or stdin
    let (reader_handle, reader_data_recv): (
        thread::JoinHandle<()>,
        crossbeam_channel::Receiver<CdpArray<T, CAP>>,
    ) = alice_protocol_reader::spawn_reader(stop_flag.clone(), loader);

    // 2. Launch analysis thread if an analysis action is set (view or check)
    let analysis_handle = if config.check().is_some() || config.view().is_some() {
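        // Invariant checked below: when an analysis action is set, either output is disabled or filtering is enabled.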
        debug_assert!(config.output_mode() == DataOutputMode::None || config.filter_enabled());
        let handle = analyze::lib::spawn_analysis(
            config,
            stop_flag.clone(),
            stats_send.clone(),
            reader_data_recv.clone(),
        )?;
        Some(handle)
    } else {
        None
    };

    // 3. Write data out only in the case where no analysis is performed and a filter link is set
    let output_handle: Option<thread::JoinHandle<()>> = match (
        config.check(),
        config.view(),
        config.filter_enabled(),
        config.output_mode(),
    ) {
        (None, None, true, output_mode) if output_mode != DataOutputMode::None => Some(
            write::lib::spawn_writer(config, stop_flag, reader_data_recv),
        ),

        (Some(_), None, _, output_mode) | (None, Some(_), _, output_mode)
            if output_mode != DataOutputMode::None =>
        {
            log::warn!(
                "Config: Output destination set when checks or views are also set -> output will be ignored!"
            );
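            // The analysis thread already holds a clone of the receiver, so this handle can be dropped.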
            drop(reader_data_recv);
            None
        }
        _ => {
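            // No writer is needed in the remaining cases; drop this thread's handle to the data channel.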
            drop(reader_data_recv);
            None
        }
    };

    // The forwarding loop below returns once receiving from the channel fails, i.e. when the channel is disconnected
    if let Some(input_stats_recv_chan) = input_stats_recv.as_ref() {
        forward_input_stats_to_stats_collector(input_stats_recv_chan, stats_send);
    }
    reader_handle.join().expect("Error joining reader thread");

    if let Some(handle) = analysis_handle {
        if let Err(e) = handle.join() {
            log::error!("Analysis thread terminated early: {:#?}\n", e);
        }
    }
    if let Some(output) = output_handle {
        output.join().expect("Could not join writer thread");
    }
    Ok(())
}

// A "glue" function that takes the stat types the reader sends, transforms them into the
// format the stats collector expects, and forwards them.
fn forward_input_stats_to_stats_collector(
    input_stats_recv: &flume::Receiver<InputStatType>,
    stats_send: &flume::Sender<StatType>,
) {
    while let Ok(input_stat) = input_stats_recv.recv() {
        match input_stat {
            InputStatType::LinksObserved(val) => {
                stats_send.send(StatType::LinksObserved(val)).unwrap()
            }
            InputStatType::FeeId(val) => stats_send.send(StatType::FeeId(val)).unwrap(),
            InputStatType::RDHSeen(val) => stats_send.send(StatType::RDHSeen(val)).unwrap(),
            InputStatType::PayloadSize(val) => stats_send.send(StatType::PayloadSize(val)).unwrap(),
            InputStatType::RDHFiltered(val) => stats_send.send(StatType::RDHFiltered(val)).unwrap(),
            InputStatType::RunTriggerType(val) => stats_send
                .send(StatType::RunTriggerType((
                    val,
                    crate::analyze::view::lib::trigger_type_string_from_int(val),
                )))
                .unwrap(),
            InputStatType::DataFormat(val) => stats_send.send(StatType::DataFormat(val)).unwrap(),
            InputStatType::SystemId(sys_id) => {
                match stats::SystemId::from_system_id(sys_id) {
                    Ok(id) => {
                        log::info!("{id} detected");
                        stats_send.send(StatType::SystemId(id)).unwrap()
                    }
                    Err(e) => {
                        log::error!("Failed to parse system ID: {e}");
                        stats_send
                            .send(StatType::Fatal("Failed to parse system ID".into()))
                            .unwrap();
                    }
                };
            }
            InputStatType::Error(e) => stats_send.send(StatType::Error(e)).unwrap(),
            InputStatType::Fatal(e) => stats_send.send(StatType::Fatal(e)).unwrap(),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use alice_protocol_reader::init_reader;
    use alice_protocol_reader::prelude::test_data::CORRECT_RDH_CRU_V7;
    use pretty_assertions::{assert_eq, assert_ne};

    static CFG_TEST_INIT_PROCESSING: OnceLock<MockConfig> = OnceLock::new();

    #[test]
    fn test_init_processing() {
        // Setup Mock Config
        let mut mock_config = MockConfig::new();
        // Set input file from one of the files used for regression testing
        mock_config.input_file = Some(PathBuf::from("../tests/test-data/10_rdh.raw"));

        CFG_TEST_INIT_PROCESSING.set(mock_config).unwrap();

        // Setup a reader
        let reader = init_reader(CFG_TEST_INIT_PROCESSING.get().unwrap().input_file()).unwrap();

        let (sender, receiver): (flume::Sender<StatType>, flume::Receiver<StatType>) =
            flume::unbounded();

        let stop_flag = Arc::new(AtomicBool::new(false));

        // Act
        init_processing(
            CFG_TEST_INIT_PROCESSING.get().unwrap(),
            reader,
            sender,
            stop_flag.clone(),
        )
        .unwrap();

        // Receive all messages
        let mut stats: Vec<StatType> = Vec::new();

        while let Ok(stat) = receiver.recv() {
            stats.push(stat);
        }

        // Assert
        let mut is_rdh_version_detected_7 = false;
        let mut how_many_rdh_seen = 0;

        // Inspect the received stats
        for stat in stats {
            match stat {
                StatType::RdhVersion(7) => is_rdh_version_detected_7 = true,
                StatType::RDHSeen(val) => how_many_rdh_seen += val,
                StatType::Error(e) | StatType::Fatal(e) => {
                    panic!("Error or Fatal: {}", e)
                }
                _ => (),
            }
        }

        assert!(is_rdh_version_detected_7);
        assert_eq!(how_many_rdh_seen, 10);
        assert!(!stop_flag.load(Ordering::SeqCst));
    }

    static CFG_TEST_SPAWN_ANALYSIS: OnceLock<MockConfig> = OnceLock::new();

    #[test]
    fn test_spawn_analysis() {
        // Setup Mock Config, no checks or views to be done
        let mock_config = MockConfig::default();
        CFG_TEST_SPAWN_ANALYSIS.set(mock_config).unwrap();
        let (stat_sender, stat_receiver): (flume::Sender<StatType>, flume::Receiver<StatType>) =
            flume::unbounded();
        let (data_sender, data_receiver) = crossbeam_channel::unbounded();
        let stop_flag = Arc::new(AtomicBool::new(false));
        let mut cdp_batch: CdpArray<RdhCru, 1> = CdpArray::new();
        cdp_batch.push(CORRECT_RDH_CRU_V7, Vec::new(), 0);

        // Act
        let handle = analyze::lib::spawn_analysis(
            CFG_TEST_SPAWN_ANALYSIS.get().unwrap(),
            stop_flag.clone(),
            stat_sender,
            data_receiver,
        )
        .unwrap();
        data_sender.send(cdp_batch).unwrap();
        drop(data_sender);
        // Sleep to give the thread time to process the data
        thread::sleep(Duration::from_millis(100));
        stop_flag.store(true, Ordering::SeqCst);

        // Receive all messages
        let mut stats: Vec<StatType> = Vec::new();
        while let Ok(stat) = stat_receiver.recv() {
            stats.push(stat);
        }

        // Some stats should have been sent
        assert_ne!(
            stats.len(),
            0,
            "Expected some stats received, got: {stats:?}"
        );
        handle.join().unwrap();
    }
}