1#![warn(unused_extern_crates)]
2#![warn(missing_docs)]
3#![warn(missing_copy_implementations)]
4#![warn(
6 clippy::option_filter_map,
7 clippy::manual_filter_map,
8 clippy::if_not_else,
9 clippy::nonminimal_bool,
10 clippy::single_match_else,
11 clippy::range_plus_one,
12 clippy::int_plus_one,
13 clippy::needless_range_loop,
14 clippy::needless_continue,
15 clippy::shadow_same,
16 clippy::shadow_unrelated
17)]
18#![warn(variant_size_differences)]
20#![warn(
21 clippy::needless_pass_by_value,
22 clippy::unnecessary_wraps,
23 clippy::mutex_integer,
24 clippy::mem_forget,
25 clippy::maybe_infinite_iter
26)]
27#![warn(unused_results)]
29#![warn(unused_import_braces)]
30#![warn(trivial_casts, trivial_numeric_casts)]
31#![warn(clippy::map_unwrap_or)]
33
34use crate::util::*;
81use analyze::validators::rdh::Rdh0Validator;
82
83#[inline]
86pub fn display_error(err_msg: &str) {
87 log::error!("{}", owo_colors::OwoColorize::red(&err_msg));
88}
89
90pub mod analyze;
91pub mod config;
92pub mod controller;
93pub mod init;
94pub mod stats;
95pub mod util;
96pub mod words;
97pub mod write;
98
99#[allow(clippy::needless_pass_by_value)] pub fn init_processing(
102 config: &'static impl Config,
103 mut reader: Box<dyn BufferedReaderWrapper>,
104 stat_send: flume::Sender<StatType>,
105 stop_flag: Arc<atomic::AtomicBool>,
106) -> io::Result<()> {
107 let rdh0 = Rdh0::load(&mut reader).expect("Failed to read first RDH0");
110 if let Err(e) = Rdh0Validator::default().sanity_check(&rdh0) {
111 return Err(io::Error::new(
112 io::ErrorKind::InvalidData,
113 format!("Initial RDH0 deserialization failed sanity check: {e}"),
114 ));
115 }
116 let rdh_version = rdh0.header_id;
118
119 stat_send.send(StatType::RdhVersion(rdh_version)).unwrap();
121
122 let (input_stats_send, input_stats_recv): (
124 flume::Sender<InputStatType>,
125 flume::Receiver<InputStatType>,
126 ) = flume::unbounded();
127 let loader = InputScanner::new_from_rdh0(config, reader, Some(input_stats_send), rdh0);
129
130 match rdh_version {
133 3..=100 => {
138 match process::<RdhCru, 100>(
139 config,
140 loader,
141 Some(&input_stats_recv),
142 &stat_send,
143 stop_flag,
144 ) {
145 Ok(_) => Ok(()),
146 Err(e) => {
147 stat_send
148 .send(StatType::Fatal(e.to_string().into()))
149 .unwrap();
150 Err(e)
151 }
152 }
153 }
154 _ => Err(io::Error::new(
155 io::ErrorKind::InvalidData,
156 format!("Unknown RDH version: {rdh_version}"),
157 )),
158 }
159}
160
161pub fn process<T: RDH + 'static, const CAP: usize>(
170 config: &'static impl Config,
171 loader: InputScanner<impl BufferedReaderWrapper + ?Sized + 'static>,
172 input_stats_recv: Option<&flume::Receiver<InputStatType>>,
173 stats_send: &flume::Sender<StatType>,
174 stop_flag: Arc<atomic::AtomicBool>,
175) -> io::Result<()> {
176 let (reader_handle, reader_data_recv): (
178 thread::JoinHandle<()>,
179 crossbeam_channel::Receiver<CdpArray<T, CAP>>,
180 ) = alice_protocol_reader::spawn_reader(stop_flag.clone(), loader);
181
182 let analysis_handle = if config.check().is_some() || config.view().is_some() {
184 debug_assert!(config.output_mode() == DataOutputMode::None || config.filter_enabled(),);
185 let handle = analyze::lib::spawn_analysis(
186 config,
187 stop_flag.clone(),
188 stats_send.clone(),
189 reader_data_recv.clone(),
190 )?;
191 Some(handle)
192 } else {
193 None
194 };
195
196 let output_handle: Option<thread::JoinHandle<()>> = match (
198 config.check(),
199 config.view(),
200 config.filter_enabled(),
201 config.output_mode(),
202 ) {
203 (None, None, true, output_mode) if output_mode != DataOutputMode::None => Some(
204 write::lib::spawn_writer(config, stop_flag, reader_data_recv),
205 ),
206
207 (Some(_), None, _, output_mode) | (None, Some(_), _, output_mode)
208 if output_mode != DataOutputMode::None =>
209 {
210 log::warn!(
211 "Config: Output destination set when checks or views are also set -> output will be ignored!"
212 );
213 drop(reader_data_recv);
214 None
215 }
216 _ => {
217 drop(reader_data_recv);
218 None
219 }
220 };
221
222 if let Some(input_stats_recv_chan) = input_stats_recv.as_ref() {
224 forward_input_stats_to_stats_collector(input_stats_recv_chan, stats_send);
225 }
226 reader_handle.join().expect("Error joining reader thread");
227
228 if let Some(handle) = analysis_handle {
229 if let Err(e) = handle.join() {
230 log::error!("Analysis thread terminated early: {:#?}\n", e);
231 }
232 }
233 if let Some(output) = output_handle {
234 output.join().expect("Could not join writer thread");
235 }
236 Ok(())
237}
238
239fn forward_input_stats_to_stats_collector(
243 input_stats_recv: &flume::Receiver<InputStatType>,
244 stats_send: &flume::Sender<StatType>,
245) {
246 while let Ok(input_stat) = input_stats_recv.recv() {
247 match input_stat {
248 InputStatType::LinksObserved(val) => {
249 stats_send.send(StatType::LinksObserved(val)).unwrap()
250 }
251 InputStatType::FeeId(val) => stats_send.send(StatType::FeeId(val)).unwrap(),
252 InputStatType::RDHSeen(val) => stats_send.send(StatType::RDHSeen(val)).unwrap(),
253 InputStatType::PayloadSize(val) => stats_send.send(StatType::PayloadSize(val)).unwrap(),
254 InputStatType::RDHFiltered(val) => stats_send.send(StatType::RDHFiltered(val)).unwrap(),
255 InputStatType::RunTriggerType(val) => stats_send
256 .send(StatType::RunTriggerType((
257 val,
258 crate::analyze::view::lib::trigger_type_string_from_int(val),
259 )))
260 .unwrap(),
261 InputStatType::DataFormat(val) => stats_send.send(StatType::DataFormat(val)).unwrap(),
262 InputStatType::SystemId(sys_id) => {
263 match stats::SystemId::from_system_id(sys_id) {
264 Ok(id) => {
265 log::info!("{id} detected");
266 stats_send.send(StatType::SystemId(id)).unwrap()
267 }
268 Err(e) => {
269 log::error!("Failed to parse system ID: {e}");
270 stats_send
271 .send(StatType::Fatal("Failed to parse system ID".into()))
272 .unwrap();
273 }
274 };
275 }
276 InputStatType::Error(e) => stats_send.send(StatType::Error(e)).unwrap(),
277 InputStatType::Fatal(e) => stats_send.send(StatType::Fatal(e)).unwrap(),
278 }
279 }
280}
281
282#[cfg(test)]
283mod tests {
284 use super::*;
285 use alice_protocol_reader::init_reader;
286 use alice_protocol_reader::prelude::test_data::CORRECT_RDH_CRU_V7;
287 use pretty_assertions::{assert_eq, assert_ne};
288
289 static CFG_TEST_INIT_PROCESSING: OnceLock<MockConfig> = OnceLock::new();
290
291 #[test]
292 fn test_init_processing() {
293 let mut mock_config = MockConfig::new();
295 mock_config.input_file = Some(PathBuf::from("../tests/test-data/10_rdh.raw"));
297
298 CFG_TEST_INIT_PROCESSING.set(mock_config).unwrap();
299
300 let reader = init_reader(CFG_TEST_INIT_PROCESSING.get().unwrap().input_file()).unwrap();
302
303 let (sender, receiver): (flume::Sender<StatType>, flume::Receiver<StatType>) =
304 flume::unbounded();
305
306 let stop_flag = Arc::new(AtomicBool::new(false));
307
308 init_processing(
310 CFG_TEST_INIT_PROCESSING.get().unwrap(),
311 reader,
312 sender,
313 stop_flag.clone(),
314 )
315 .unwrap();
316
317 let mut stats: Vec<StatType> = Vec::new();
319
320 while let Ok(stat) = receiver.recv() {
321 stats.push(stat);
322 }
323
324 let mut is_rdh_version_detected_7 = false;
326 let mut how_many_rdh_seen = 0;
327
328 for stat in stats {
330 match stat {
331 StatType::RdhVersion(7) => is_rdh_version_detected_7 = true,
332 StatType::RDHSeen(val) => how_many_rdh_seen += val,
333 StatType::Error(e) | StatType::Fatal(e) => {
334 panic!("Error or Fatal: {}", e)
335 }
336 _ => (),
337 }
338 }
339
340 assert!(is_rdh_version_detected_7);
341 assert_eq!(how_many_rdh_seen, 10);
342 assert!(!stop_flag.load(Ordering::SeqCst));
343 }
344
345 static CFG_TEST_SPAWN_ANALYSIS: OnceLock<MockConfig> = OnceLock::new();
346
347 #[test]
348 fn test_spawn_analysis() {
349 let mock_config = MockConfig::default();
351 CFG_TEST_SPAWN_ANALYSIS.set(mock_config).unwrap();
352 let (stat_sender, stat_receiver): (flume::Sender<StatType>, flume::Receiver<StatType>) =
353 flume::unbounded();
354 let (data_sender, data_receiver) = crossbeam_channel::unbounded();
355 let stop_flag = Arc::new(AtomicBool::new(false));
356 let mut cdp_batch: CdpArray<RdhCru, 1> = CdpArray::new();
357 cdp_batch.push(CORRECT_RDH_CRU_V7, Vec::new(), 0);
358
359 let handle = analyze::lib::spawn_analysis(
361 CFG_TEST_SPAWN_ANALYSIS.get().unwrap(),
362 stop_flag.clone(),
363 stat_sender,
364 data_receiver,
365 )
366 .unwrap();
367 data_sender.send(cdp_batch).unwrap();
368 drop(data_sender);
369 thread::sleep(Duration::from_millis(100));
371 stop_flag.store(true, Ordering::SeqCst);
372
373 let mut stats: Vec<StatType> = Vec::new();
375 while let Ok(stat) = stat_receiver.recv() {
376 stats.push(stat);
377 }
378
379 assert_ne!(
381 stats.len(),
382 0,
383 "Expected some stats received, got: {stats:?}"
384 );
385 handle.join().unwrap();
386 }
387}