use super::validators::validator_dispatcher::ValidatorDispatcher;
use crate::util::*;
pub fn spawn_analysis<T: RDH + 'static, const CAP: usize>(
config: &'static impl Config,
stop_flag: Arc<AtomicBool>,
stats_send: flume::Sender<StatType>,
data_recv: crossbeam_channel::Receiver<CdpArray<T, CAP>>,
) -> Result<JoinHandle<()>, io::Error> {
let analysis_thread = thread::Builder::new().name("Analysis".to_string());
let mut system_id: Option<SystemId> = None; analysis_thread.spawn({
move || {
let mut validator_dispatcher = ValidatorDispatcher::new(config, stats_send.clone());
while !stop_flag.load(Ordering::SeqCst) {
let cdp_batch = match data_recv.recv() {
Ok(cdp) => cdp,
Err(e) => {
debug_assert_eq!(e, crossbeam_channel::RecvError);
break;
}
};
let mut hbfs_seen: u32 = 0;
for rdh in cdp_batch.rdh_slice().iter() {
hbfs_seen += (rdh.stop_bit() == 1) as u32;
stats_send
.send(StatType::TriggerType(rdh.trigger_type()))
.unwrap();
if let Err(e) =
stats::collect_system_specific_stats(rdh, &mut system_id, &stats_send)
{
stats_send.send(StatType::Fatal(e.into())).unwrap();
break; }
}
stats_send.send(StatType::HBFsSeen(hbfs_seen)).unwrap();
if config.check().is_some() {
validator_dispatcher.dispatch_cdp_batch(cdp_batch);
} else if let Some(view) = config.view() {
if let Err(e) = view::lib::generate_view(view, &cdp_batch) {
stats_send
.send(StatType::Fatal(e.to_string().into()))
.expect("Couldn't send to Controller");
}
}
}
validator_dispatcher.join();
}
})
}