// Crate-wide lint configuration.
//
// Every lint listed below is enabled at `warn` level for the whole crate. The entries without a
// `clippy::` prefix are rustc lints; the `clippy::`-prefixed ones are only evaluated when the
// crate is checked with Clippy.

// rustc lints: dependency hygiene, documentation coverage and `Copy` derivations.
#![warn(unused_extern_crates)]
#![warn(missing_docs)]
#![warn(missing_copy_implementations)]
// Clippy lints (first group).
#![warn(
clippy::option_filter_map,
clippy::manual_filter_map,
clippy::if_not_else,
clippy::nonminimal_bool,
clippy::single_match_else,
clippy::range_plus_one,
clippy::int_plus_one,
clippy::needless_range_loop,
clippy::needless_continue,
clippy::shadow_same,
clippy::shadow_unrelated
)]
// rustc lint on enums whose variants differ greatly in size.
#![warn(variant_size_differences)]
// Clippy lints (second group).
#![warn(
clippy::needless_pass_by_value,
clippy::unnecessary_wraps,
clippy::mutex_integer,
clippy::mem_forget,
clippy::maybe_infinite_iter
)]
// Remaining rustc lints: ignored results, import braces and trivial casts.
#![warn(unused_results)]
#![warn(unused_import_braces)]
#![warn(trivial_casts, trivial_numeric_casts)]
// One more Clippy lint, kept in its own attribute.
#![warn(clippy::map_unwrap_or)]
use crate::util::*;
use analyze::validators::rdh::Rdh0Validator;
#[inline]
pub fn display_error(err_msg: &str) {
log::error!("{}", owo_colors::OwoColorize::red(&err_msg));
}
// Public submodules of the crate.
//
// Notes on how this file uses them (only what is visible here):
// - `analyze`: provides `analyze::lib::spawn_analysis`, `analyze::validators::rdh::Rdh0Validator`
//   and `analyze::view::lib::trigger_type_string_from_int`.
// - `stats`: provides `stats::SystemId`.
// - `util`: glob-imported at the top of this file (`use crate::util::*`).
// - `write`: provides `write::lib::spawn_writer`.
// The remaining modules are not referenced by path in this file.
pub mod analyze;
pub mod config;
pub mod controller;
pub mod init;
pub mod stats;
pub mod util;
pub mod words;
pub mod write;
/// Entry point for processing an input stream.
///
/// Reads the first `Rdh0` from `reader`, sanity-checks it with `Rdh0Validator`, reports the
/// detected RDH version on `stat_send`, and then hands the reader (together with the already
/// loaded `Rdh0`) to an `InputScanner` that feeds [`process`].
///
/// # Arguments
/// * `config` - Program configuration; must live for `'static` because it is passed on to
///   [`process`].
/// * `reader` - Input source; the first `Rdh0` is consumed from it before it is moved into the
///   `InputScanner`.
/// * `stat_send` - Channel on which the RDH version, and a `StatType::Fatal` if [`process`]
///   fails, are reported.
/// * `stop_flag` - Shared flag forwarded to [`process`].
///
/// # Errors
/// * The error from `Rdh0::load` (same `ErrorKind`, with added context) if the first `Rdh0`
///   cannot be read.
/// * `io::ErrorKind::InvalidData` if the first `Rdh0` fails the sanity check.
/// * `io::ErrorKind::InvalidData` if the RDH version is outside `3..=100`.
/// * Any error returned by [`process`]; in that case a `StatType::Fatal` with the error text is
///   sent on `stat_send` first.
///
/// # Panics
/// Panics if sending on `stat_send` fails.
// NOTE(review): `stat_send` and `stop_flag` are taken by value although `stat_send` is only
// borrowed here; presumably kept for API stability, hence the lint allowance.
#[allow(clippy::needless_pass_by_value)]
pub fn init_processing(
    config: &'static impl Config,
    mut reader: Box<dyn BufferedReaderWrapper>,
    stat_send: flume::Sender<StatType>,
    stop_flag: Arc<atomic::AtomicBool>,
) -> io::Result<()> {
    // A failed first read (e.g. empty or truncated input) is an input problem, not a bug:
    // report it through the `io::Result` instead of panicking.
    let rdh0 = Rdh0::load(&mut reader)
        .map_err(|e| io::Error::new(e.kind(), format!("Failed to read first RDH0: {e}")))?;

    // Bail out early if the very first header already looks wrong.
    if let Err(e) = Rdh0Validator::default().sanity_check(&rdh0) {
        return Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Initial RDH0 deserialization failed sanity check: {e}"),
        ));
    }
    let rdh_version = rdh0.header_id;

    // Report the detected RDH version to the stats receiver.
    stat_send.send(StatType::RdhVersion(rdh_version)).unwrap();

    // Channel on which the input scanner reports its own stats; `process` forwards them.
    let (input_stats_send, input_stats_recv): (
        flume::Sender<InputStatType>,
        flume::Receiver<InputStatType>,
    ) = flume::unbounded();

    // The scanner is built from the `Rdh0` that was already consumed from `reader`.
    let loader = InputScanner::new_from_rdh0(config, reader, Some(input_stats_send), rdh0);

    match rdh_version {
        // NOTE(review): the upper bound of 100 presumably exists to reject values that indicate
        // the input is not valid data at all — confirm.
        3..=100 => process::<RdhCru, 100>(
            config,
            loader,
            Some(&input_stats_recv),
            &stat_send,
            stop_flag,
        )
        .map_err(|e| {
            // Let the stats receiver know processing ended fatally before returning the error.
            stat_send
                .send(StatType::Fatal(e.to_string().into()))
                .unwrap();
            e
        }),
        _ => Err(io::Error::new(
            io::ErrorKind::InvalidData,
            format!("Unknown RDH version: {rdh_version}"),
        )),
    }
}
/// Runs the reader, optional analysis and optional writer threads for RDH type `T` and waits
/// for all of them to finish.
///
/// Steps, in order:
/// 1. Spawn the reader via `alice_protocol_reader::spawn_reader`, which yields a join handle and
///    a receiver of `CdpArray<T, CAP>` batches.
/// 2. If `config.check()` or `config.view()` is set, spawn the analysis thread on a clone of
///    that receiver.
/// 3. If neither check nor view is set, filtering is enabled and an output mode is configured,
///    spawn the writer thread on the receiver; otherwise drop the receiver (with a warning when
///    an output was requested together with exactly one of check/view).
/// 4. Forward input stats to `stats_send` until the input-stats channel closes.
/// 5. Join reader, analysis and writer threads.
///
/// # Errors
/// Returns the error from `analyze::lib::spawn_analysis` if the analysis thread cannot be
/// spawned.
///
/// # Panics
/// Panics if the reader or writer thread cannot be joined, or (debug builds only) if analysis is
/// requested while an output mode is set and filtering is disabled.
pub fn process<T: RDH + 'static, const CAP: usize>(
    config: &'static impl Config,
    loader: InputScanner<impl BufferedReaderWrapper + ?Sized + 'static>,
    input_stats_recv: Option<&flume::Receiver<InputStatType>>,
    stats_send: &flume::Sender<StatType>,
    stop_flag: Arc<atomic::AtomicBool>,
) -> io::Result<()> {
    // The annotation pins `T` and `CAP` for the reader.
    let (reader_handle, reader_data_recv): (
        thread::JoinHandle<()>,
        crossbeam_channel::Receiver<CdpArray<T, CAP>>,
    ) = alice_protocol_reader::spawn_reader(Arc::clone(&stop_flag), loader);

    // Analysis runs whenever a check or a view was requested.
    let analysis_enabled = config.check().is_some() || config.view().is_some();
    let analysis_handle = if analysis_enabled {
        debug_assert!(config.output_mode() == DataOutputMode::None || config.filter_enabled());
        Some(analyze::lib::spawn_analysis(
            config,
            Arc::clone(&stop_flag),
            stats_send.clone(),
            reader_data_recv.clone(),
        )?)
    } else {
        None
    };

    // Snapshot the settings that decide whether a writer is started.
    let check = config.check();
    let view = config.view();
    let filter_enabled = config.filter_enabled();
    let output_requested = config.output_mode() != DataOutputMode::None;
    let analysis_requested = check.is_some() || view.is_some();

    let output_handle: Option<thread::JoinHandle<()>> =
        if output_requested && !analysis_requested && filter_enabled {
            // The writer takes over the stop flag and the data receiver.
            Some(write::lib::spawn_writer(config, stop_flag, reader_data_recv))
        } else {
            // Warn only when output was requested alongside exactly one of check/view.
            if output_requested && (check.is_some() != view.is_some()) {
                log::warn!(
                    "Config: Output destination set when checks or views are also set -> output will be ignored!"
                );
            }
            // No writer: release this end of the data channel.
            drop(reader_data_recv);
            None
        };

    // Blocks until the input-stats channel is closed.
    if let Some(input_stats) = input_stats_recv {
        forward_input_stats_to_stats_collector(input_stats, stats_send);
    }

    reader_handle.join().expect("Error joining reader thread");

    // A failed analysis thread is logged rather than propagated.
    if let Some(Err(e)) = analysis_handle.map(|analysis| analysis.join()) {
        log::error!("Analysis thread terminated early: {:#?}\n", e);
    }

    if let Some(writer) = output_handle {
        writer.join().expect("Could not join writer thread");
    }
    Ok(())
}
fn forward_input_stats_to_stats_collector(
input_stats_recv: &flume::Receiver<InputStatType>,
stats_send: &flume::Sender<StatType>,
) {
while let Ok(input_stat) = input_stats_recv.recv() {
match input_stat {
InputStatType::LinksObserved(val) => {
stats_send.send(StatType::LinksObserved(val)).unwrap()
}
InputStatType::FeeId(val) => stats_send.send(StatType::FeeId(val)).unwrap(),
InputStatType::RDHSeen(val) => stats_send.send(StatType::RDHSeen(val)).unwrap(),
InputStatType::PayloadSize(val) => stats_send.send(StatType::PayloadSize(val)).unwrap(),
InputStatType::RDHFiltered(val) => stats_send.send(StatType::RDHFiltered(val)).unwrap(),
InputStatType::RunTriggerType(val) => stats_send
.send(StatType::RunTriggerType((
val,
crate::analyze::view::lib::trigger_type_string_from_int(val),
)))
.unwrap(),
InputStatType::DataFormat(val) => stats_send.send(StatType::DataFormat(val)).unwrap(),
InputStatType::SystemId(sys_id) => {
match stats::SystemId::from_system_id(sys_id) {
Ok(id) => {
log::info!("{id} detected");
stats_send.send(StatType::SystemId(id)).unwrap()
}
Err(e) => {
log::error!("Failed to parse system ID: {e}");
stats_send
.send(StatType::Fatal("Failed to parse system ID".into()))
.unwrap();
}
};
}
InputStatType::Error(e) => stats_send.send(StatType::Error(e)).unwrap(),
InputStatType::Fatal(e) => stats_send.send(StatType::Fatal(e)).unwrap(),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use alice_protocol_reader::init_reader;
    use alice_protocol_reader::prelude::test_data::CORRECT_RDH_CRU_V7;
    use pretty_assertions::{assert_eq, assert_ne};

    // The functions under test take a `&'static` config, so each test stores its config in its
    // own `OnceLock`.
    static CFG_TEST_INIT_PROCESSING: OnceLock<MockConfig> = OnceLock::new();

    // Runs `init_processing` on the `10_rdh.raw` test file and checks that RDH version 7 is
    // reported, that the `RDHSeen` stats add up to 10, that no error stats were sent, and that
    // the stop flag is still unset.
    #[test]
    fn test_init_processing() {
        let mut mock_config = MockConfig::new();
        mock_config.input_file = Some(PathBuf::from("../tests/test-data/10_rdh.raw"));
        CFG_TEST_INIT_PROCESSING.set(mock_config).unwrap();
        let cfg = CFG_TEST_INIT_PROCESSING.get().unwrap();

        let reader = init_reader(cfg.input_file()).unwrap();
        let (sender, receiver) = flume::unbounded::<StatType>();
        let stop_flag = Arc::new(AtomicBool::new(false));

        init_processing(cfg, reader, sender, Arc::clone(&stop_flag)).unwrap();

        // Drain everything that was reported; ends once all senders are gone.
        let stats: Vec<StatType> = receiver.iter().collect();

        let mut saw_rdh_version_7 = false;
        let mut total_rdh_seen = 0;
        for stat in stats {
            match stat {
                StatType::Error(e) | StatType::Fatal(e) => {
                    panic!("Error or Fatal: {}", e)
                }
                StatType::RdhVersion(7) => saw_rdh_version_7 = true,
                StatType::RDHSeen(val) => total_rdh_seen += val,
                _ => (),
            }
        }

        assert!(saw_rdh_version_7);
        assert_eq!(total_rdh_seen, 10);
        assert!(!stop_flag.load(Ordering::SeqCst));
    }

    static CFG_TEST_SPAWN_ANALYSIS: OnceLock<MockConfig> = OnceLock::new();

    // Feeds a single-RDH batch to the analysis thread and checks that at least one stat comes
    // back before the thread is joined.
    #[test]
    fn test_spawn_analysis() {
        CFG_TEST_SPAWN_ANALYSIS.set(MockConfig::default()).unwrap();

        let (stat_sender, stat_receiver) = flume::unbounded::<StatType>();
        let (data_sender, data_receiver) = crossbeam_channel::unbounded();
        let stop_flag = Arc::new(AtomicBool::new(false));

        let mut cdp_batch = CdpArray::<RdhCru, 1>::new();
        cdp_batch.push(CORRECT_RDH_CRU_V7, Vec::new(), 0);

        let handle = analyze::lib::spawn_analysis(
            CFG_TEST_SPAWN_ANALYSIS.get().unwrap(),
            Arc::clone(&stop_flag),
            stat_sender,
            data_receiver,
        )
        .unwrap();

        // Hand over the batch, close the data channel, then give the thread a moment before
        // raising the stop flag.
        data_sender.send(cdp_batch).unwrap();
        drop(data_sender);
        thread::sleep(Duration::from_millis(100));
        stop_flag.store(true, Ordering::SeqCst);

        // Ends once the analysis thread has dropped its stat sender.
        let stats: Vec<StatType> = stat_receiver.iter().collect();

        assert_ne!(
            stats.len(),
            0,
            "Expected some stats received, got: {stats:?}"
        );
        handle.join().unwrap();
    }
}