fastpasta 1.22.0

CLI for verifying or examining readout data from the ALICE detector.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
//! Contains the [Controller] that collects stats and reports errors.
//! It also controls the stop flag, which can be used to stop the program if a fatal error occurs, or if the config contains a max number of errors to tolerate.
//! Finally when the event loop breaks (at the end of execution), it will print a summary of the stats collected, using the Report struct.
//!
//! Also contains the convenience [init_controller] function, which spawns a thread with the [Controller] running, and returns the thread handle, the channel to send stats to, the stop flag, and the any-errors flag.

use super::*;
use crate::stats::err_printer::ErrPrinter;
use std::io::Write;

/// Spawns a thread named `stats_thread` running a [Controller], and returns the handles needed to interact with it.
///
/// The returned tuple contains, in order:
/// 1. the [JoinHandle] of the stats thread,
/// 2. a [flume::Sender] for sending [StatType] updates to the controller,
/// 3. the end-processing (stop) flag, set by the controller when a fatal error is received or the
///    configured maximum number of tolerated errors is reached,
/// 4. the any-errors flag, set by the controller at the end of processing if errors were found
///    (or if a supplied input stats file did not match the collected stats).
///
/// The controller's event loop only ends once every clone of the returned sender has been dropped,
/// so drop the sender (and any clones of it) before joining the handle.
///
/// # Panics
/// Panics if the stats thread cannot be spawned.
pub fn init_controller<C: Config + 'static>(
    config: &'static C,
) -> (
    JoinHandle<()>,
    flume::Sender<StatType>,
    Arc<AtomicBool>,
    Arc<AtomicBool>,
) {
    log::trace!("Initializing stats controller");
    let mut controller = Controller::new(config);
    // Grab everything the caller needs before the controller is moved into its thread;
    // once `run` starts, the controller no longer hands out senders.
    let stats_send_chan = controller.send_channel();
    let end_processing_flag = controller.end_processing_flag();
    let any_errors_flag = controller.any_errors_flag();

    let stats_thread = Builder::new()
        .name("stats_thread".to_string())
        .spawn(move || controller.run())
        .expect("Failed to spawn stats thread");

    (
        stats_thread,
        stats_send_chan,
        end_processing_flag,
        any_errors_flag,
    )
}

/// The Controller receives stats and builds a summary report that is printed at the end of execution.
///
/// It also owns the flags used to signal other threads: the end-processing flag (raised on a fatal
/// error or when the maximum number of tolerated errors is reached) and the any-errors flag
/// (raised at the end of processing if errors were found).
pub struct Controller<C: Config + 'static> {
    /// Aggregates every stat received from the producers.
    stats_collector: StatsCollector,
    /// Instant at which the [Controller] was created. Its elapsed time, measured when the report is built
    /// (after all producers have disconnected their [StatType] channel), is reported as the processing time.
    pub processing_time: Instant,
    /// The global configuration.
    config: &'static C,
    /// Number of errors after which the end-processing flag is raised; `0` means no limit.
    max_tolerate_errors: u32,
    /// The channel where stats are received from other threads.
    stats_recv_chan: flume::Receiver<StatType>,
    /// The channel stats are sent through, stored so that a clone of the channel can be returned easily.
    /// It is an `Option` so that it can be set to `None` when the event loop starts.
    /// Once `run` is called, producers that don't already have a sender can no longer get one.
    /// This matters because the event loop ends when all senders are dropped; if the Controller kept its own
    /// sender alive, the loop would never end (deadlock).
    stats_send_chan: Option<flume::Sender<StatType>>,
    /// Raised to tell the processing threads to stop (fatal error, or max tolerated errors reached).
    end_processing_flag: Arc<AtomicBool>,
    /// Raised at the end of processing if any errors were found, or if the input stats file did not match.
    any_errors_flag: Arc<AtomicBool>,
    /// Progress spinner; `None` when a view is active.
    spinner: Option<ProgressBar>,
    /// The message currently shown by the spinner, kept so that text can be appended to it.
    spinner_message: String,
}
impl<C: Config + 'static> Controller<C> {
    /// Creates a new [Controller] from a [Config], a [flume::Receiver] for [StatType], and a [Arc] of an [AtomicBool] that is used to signal to other threads to exit if a fatal error occurs.
    pub fn new(global_config: &'static C) -> Self {
        let (stats_send_chan, stats_recv_chan): (
            flume::Sender<StatType>,
            flume::Receiver<StatType>,
        ) = flume::unbounded();
        Controller {
            // Only collect alpide stats if alpide checks are enabled
            stats_collector: if global_config.alpide_checks_enabled() {
                StatsCollector::with_alpide_stats()
            } else {
                StatsCollector::default()
            },
            config: global_config,
            processing_time: Instant::now(),
            max_tolerate_errors: global_config.max_tolerate_errors(),
            stats_recv_chan,
            stats_send_chan: Some(stats_send_chan),
            end_processing_flag: Arc::new(AtomicBool::new(false)),
            any_errors_flag: Arc::new(AtomicBool::new(false)),
            spinner: if global_config.view().is_some() {
                None
            } else {
                Some(new_styled_spinner())
            },
            spinner_message: String::new(),
        }
    }

    /// Returns a clone of the channel that is used to send stats to the Controller.
    pub fn send_channel(&self) -> flume::Sender<StatType> {
        if self.stats_send_chan.is_none() {
            log::error!("Controller send channel is none, most likely it is already running and does not accept new producers");
            panic!("Controller send channel is none, most likely it is already running and does not accept new producers");
        }
        self.stats_send_chan.as_ref().unwrap().clone()
    }

    /// Returns a cloned reference to the end processing flag.
    pub fn end_processing_flag(&self) -> Arc<AtomicBool> {
        self.end_processing_flag.clone()
    }

    /// Returns a cloned reference to the any errors flag
    ///
    /// The flag is set if there's any errors in the input data at end of processing.
    pub fn any_errors_flag(&self) -> Arc<AtomicBool> {
        self.any_errors_flag.clone()
    }

    /// Starts the event loop for the Controller
    /// This function will block until the channel is closed
    pub fn run(&mut self) {
        // Set the send stats channel to none so that no new producers can be added, and so the loop breaks when all producers have dropped their channel.
        self.stats_send_chan = None;

        // While loop breaks when an error is received from the channel, which means the channel is disconnected
        while let Ok(stats_update) = self.stats_recv_chan.recv() {
            self.update(stats_update);
        }

        if self.config.custom_checks_enabled() {
            self.stats_collector.validate_custom_stats(self.config);
        }

        // After processing all stats, print the summary report or don't if in view mode
        if self.config.view().is_some() || self.config.output_mode() == DataOutputMode::Stdout {
            // Avoid printing the report in the middle of a view, or if output is being redirected
            log::info!("View active or output is being piped, skipping report summary printout.")
        } else {
            self.process_stats();

            // Print the summary report if any RDHs were seen. If not, it's likely that an early error occurred and no data was processed.
            if self.stats_collector.any_rdhs_seen() {
                // New spinner/progress bar
                self.new_spinner_with_prefix("Generating report".to_string());
                self.print();
            }
        }
        if self.stats_collector.any_errors() {
            self.any_errors_flag.store(true, Ordering::SeqCst);
        }

        // Stats collector will serialize and write out stats if the config specifies it
        if self.config.stats_output_mode() != DataOutputMode::None {
            self.stats_collector.write_stats(
                &self.config.stats_output_mode(),
                self.config.stats_output_format().unwrap(),
            );
        }

        // User supplied a stats file to compare against, validate the match
        if let Some(input_stats) = self.config.input_stats_file() {
            log::info!("Validating input stats file against collected stats");
            let input_stats_str =
                fs::read_to_string(input_stats).expect("Failed to read input stats file");

            let input_stats_collector: StatsCollector = if input_stats.extension().unwrap()
                == "json"
            {
                serde_json::from_str(&input_stats_str)
                    .expect("Failed to deserialize input stats file")
            } else if input_stats.extension().unwrap() == "toml" {
                toml::from_str(&input_stats_str).expect("Failed to deserialize input stats file")
            } else {
                // Should've already been validated when parsing the command-line arguments
                panic!("Invalid input stats file extension, must be .json or .toml")
            };

            if self
                .stats_collector
                .validate_other_stats(&input_stats_collector, self.config.mute_errors())
                .is_err()
            {
                self.any_errors_flag.store(true, Ordering::SeqCst);
                log::warn!("Input stats did not match collected stats");
            } else {
                log::info!("Input stats matched collected stats");
            }
        }
    }

    fn update(&mut self, stat: StatType) {
        match stat {
            StatType::RDHSeen(_)
            | StatType::RDHFiltered(_)
            | StatType::PayloadSize(_)
            | StatType::LinksObserved(_)
            | StatType::RdhVersion(_)
            | StatType::DataFormat(_)
            | StatType::LayerStaveSeen { .. }
            | StatType::SystemId(_)
            | StatType::FeeId(_)
            | StatType::TriggerType(_)
            | StatType::AlpideStats(_) => {
                self.stats_collector.collect(stat);
            }
            StatType::HBFsSeen(_) => {
                self.stats_collector.collect(stat);
                if self.spinner.is_some() {
                    self.spinner.as_mut().unwrap().set_prefix(format!(
                        "Analyzing {hbfs} HBFs",
                        hbfs = self.stats_collector.hbfs_seen()
                    ))
                };
            }
            StatType::RunTriggerType((raw_tt, tt_str)) => {
                log::debug!("Run trigger type determined to be {raw_tt:#0x}: {tt_str}");
                self.stats_collector
                    .collect(StatType::RunTriggerType((raw_tt, tt_str)));
            }
            StatType::Error(msg) => {
                // Stop processing any error messages
                if self.stats_collector.any_fatal_err() {
                    log::trace!("Fatal error already seen, ignoring error: {msg}");
                    return;
                }

                self.stats_collector.collect(StatType::Error(msg));

                self.set_spinner_msg(
                    format!(
                        "{err_cnt} Errors in data!",
                        err_cnt = self.stats_collector.err_count()
                    )
                    .red()
                    .to_string(),
                );

                if self.max_tolerate_errors > 0 {
                    log::trace!("Error count: {}", self.stats_collector.err_count());
                    if self.stats_collector.err_count() == self.max_tolerate_errors as u64 {
                        log::trace!("Errors reached maximum tolerated errors, exiting...");
                        self.end_processing_flag.store(true, Ordering::SeqCst);
                    }
                }
            }
            StatType::Fatal(err) => {
                // Stop processing any error messages
                if self.stats_collector.any_fatal_err() {
                    log::trace!("Fatal error already seen, ignoring error: {err}");
                    return;
                }
                self.end_processing_flag.store(true, Ordering::SeqCst);
                log::error!("FATAL: {err}\nShutting down...");
                self.stats_collector.collect(StatType::Fatal(err));
            }
        }
    }

    fn process_stats(&mut self) {
        // New spinner/progress bar if there's any errors
        if self.stats_collector.err_count() > 0 {
            self.new_spinner_with_prefix(
                format!(
                    "Processing {err_count} error messages",
                    err_count = self.stats_collector.err_count()
                )
                .yellow()
                .to_string(),
            );
            self.stats_collector.finalize(self.config.mute_errors());
            self.spinner.as_mut().unwrap().abandon();
        } else {
            self.stats_collector.finalize(self.config.mute_errors());
        }

        if self.stats_collector.any_errors() && !self.config.mute_errors() {
            // Print the errors, limited if there's a max error limit set
            ErrPrinter::new(
                if self.config.max_tolerate_errors() > 0 {
                    Some(self.config.max_tolerate_errors())
                } else {
                    None
                },
                self.config.error_code_filter(),
            )
            .print(
                self.stats_collector.error_stats().errors_as_slice_iter(),
                self.stats_collector.unique_error_codes_as_slice(),
            );
        }
    }

    /// Builds and prints the report
    fn print(&mut self) {
        let mut report = stats::stats_report::make_report(
            self.processing_time.elapsed(),
            &mut self.stats_collector,
            self.config.filter_target(),
        );
        self.append_spinner_msg("... completed");
        if self.spinner.is_some() {
            self.spinner.as_mut().unwrap().abandon();
        }

        let mut lock = io::stdout().lock();
        if let Err(e) = writeln!(lock, "{}", report.format()) {
            if e.kind() == io::ErrorKind::BrokenPipe {
                log::warn!("Broken pipe, stdout was closed before report could be written");
            } else {
                log::error!("Failed to write report to stdout: {e}");
            }
        }
    }

    /// Add completed message to current spinner and abandon it
    /// Replace it with new spinner with an empty message
    /// Set the new spinners prefix message
    fn new_spinner_with_prefix(&mut self, prefix: String) {
        if self.spinner.is_some() {
            self.append_spinner_msg("... completed");
            self.spinner.as_mut().unwrap().abandon();
            self.spinner = Some(new_styled_spinner());
            self.spinner_message = "".to_string();
            self.spinner.as_mut().unwrap().set_prefix(prefix);
        } else {
            self.spinner = Some(new_styled_spinner());
            self.spinner_message = "".to_string();
            self.spinner.as_mut().unwrap().set_prefix(prefix);
        }
    }

    fn set_spinner_msg(&mut self, new_msg: String) {
        if self.spinner.is_some() {
            self.spinner_message = new_msg;
            self.spinner
                .as_mut()
                .unwrap()
                .set_message(self.spinner_message.clone());
        }
    }

    fn append_spinner_msg(&mut self, to_append: &str) {
        if self.spinner.is_some() {
            self.spinner_message = self.spinner_message.clone() + to_append + " ";
            self.spinner
                .as_mut()
                .unwrap()
                .set_message(self.spinner_message.clone());
        }
    }
}

/// Creates the spinner used for progress feedback.
///
/// Layout: the animated spinner, the prefix in bold blue between brackets, then a message that fills
/// the rest of the line. The spinner ticks on its own every 120 ms.
fn new_styled_spinner() -> ProgressBar {
    // Arrow sweeping across five cells; indicatif uses the last frame for the finished state.
    const TICK_FRAMES: [&str; 7] = [
        "▹▹▹▹▹",
        "▸▹▹▹▹",
        "▹▸▹▹▹",
        "▹▹▸▹▹",
        "▹▹▹▸▹",
        "▹▹▹▹▸",
        "▪▪▪▪▪",
    ];
    const TEMPLATE: &str = "{spinner} [ {prefix:.bold.blue} ] {wide_msg}";

    let style = ProgressStyle::with_template(TEMPLATE)
        .unwrap()
        .tick_strings(&TICK_FRAMES);

    let spinner = ProgressBar::new_spinner();
    spinner.set_style(style);
    spinner.enable_steady_tick(Duration::from_millis(120));
    spinner
}

#[cfg(test)]
mod tests {
    use super::*;

    static CONFIG_TEST_INIT_CONTROLLER: OnceLock<MockConfig> = OnceLock::new();

    /// Feeds a few stats followed by a fatal error through a controller started with
    /// [init_controller], and checks that the stop flag goes from clear to set.
    #[test]
    fn test_init_controller() {
        CONFIG_TEST_INIT_CONTROLLER
            .set(MockConfig::default())
            .unwrap();
        let config = CONFIG_TEST_INIT_CONTROLLER.get().unwrap();

        let (stats_thread, stats_sender, end_processing, _any_errors) = init_controller(config);

        // Nothing has been sent yet, so the stop flag must still be clear
        assert!(!end_processing.load(Ordering::SeqCst));

        let stat_updates = vec![
            StatType::RdhVersion(7),
            StatType::DataFormat(99),
            StatType::RunTriggerType((0xBEEF, "BEEF".to_owned().into())),
            StatType::RDHSeen(1),
            // The fatal error is what should raise the stop flag
            StatType::Fatal("Test fatal error".to_string().into()),
        ];
        for stat in stat_updates {
            stats_sender.send(stat).unwrap();
        }

        // Dropping the sender lets the controller's event loop finish, so the thread can be joined
        drop(stats_sender);
        stats_thread.join().unwrap();

        // The fatal error must have raised the stop flag
        assert!(end_processing.load(Ordering::SeqCst));
    }
}