use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use alice_protocol_reader::cdp_wrapper::cdp_array::CdpArray;
use crossbeam_channel::Receiver;
use super::writer::BufferedWriter;
use super::writer::Writer;
use crate::config::inputoutput::InputOutputOpt;
use alice_protocol_reader::prelude::RDH;
/// Size argument handed to `BufferedWriter::new` when the writer is created.
// NOTE(review): presumably a byte count (1 MiB) — confirm against `BufferedWriter`.
const BUFFER_SIZE: usize = 1024 * 1024;

/// Spawns a thread named `"Writer"` that forwards every `CdpArray` received on
/// `data_recv` to a `BufferedWriter` via `push_cdp_arr`.
///
/// The thread's loop ends in one of two ways:
/// * `data_recv.recv()` returns an error, or
/// * `stop_flag` is observed as `true` right after a successful receive, in
///   which case the array received in that iteration is dropped without being
///   pushed to the writer.
///
/// # Arguments
/// * `config` - input/output options used to construct the `BufferedWriter`.
/// * `stop_flag` - shared flag checked after each receive to request shutdown.
/// * `data_recv` - channel end from which `CdpArray`s are received.
///
/// # Returns
/// The [`thread::JoinHandle`] of the spawned writer thread.
///
/// # Panics
/// Panics with "Failed to spawn writer thread" if the thread cannot be spawned.
pub fn spawn_writer<T: RDH + 'static, const CAP: usize>(
    config: &'static impl InputOutputOpt,
    stop_flag: Arc<AtomicBool>,
    data_recv: Receiver<CdpArray<T, CAP>>,
) -> thread::JoinHandle<()> {
    let builder = thread::Builder::new().name(String::from("Writer"));

    // The writer is constructed on the calling thread and then moved into the
    // closure that runs on the new thread.
    let mut writer = BufferedWriter::<T>::new(config, BUFFER_SIZE);

    let thread_body = move || {
        loop {
            match data_recv.recv() {
                Ok(batch) => {
                    // The stop request is only checked after a receive, so the
                    // batch just obtained is discarded when stopping.
                    if stop_flag.load(Ordering::SeqCst) {
                        log::trace!("Stopping writer thread");
                        break;
                    }
                    writer.push_cdp_arr(batch);
                }
                Err(recv_err) => {
                    debug_assert_eq!(recv_err, crossbeam_channel::RecvError);
                    break;
                }
            }
        }
    };

    builder
        .spawn(thread_body)
        .expect("Failed to spawn writer thread")
}