1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
#[allow(unused_imports)] use crate::internal_prelude::*; use crate::{config, types::*}; #[derive(Clone)] pub struct ParserProcessor { concurrency: usize, stack_size_bytes: usize, rx: Receiver<ParserTask>, } pub struct Handle { pub(crate) tx: Sender<ParserTask>, h: tokio::task::JoinHandle<Result<()>>, } impl Handle { pub fn join(self) -> tokio::task::JoinHandle<Result<()>> { drop(self.tx); self.h } } impl ParserProcessor { pub fn spawn(concurrency_profile: config::ConcurrencyProfile, stack_size_bytes: usize) -> Handle { let (tx, rx) = bounded_ch::<ParserTask>(concurrency_profile.transit_buffer_size()); let s = Self { concurrency: concurrency_profile.parser_concurrency, stack_size_bytes, rx }; let h = tokio::spawn(s.go()); Handle { tx, h } } fn process(&self, n: usize) -> PinnedTask { TracingTask::new(span!(n = n), async move { while let Ok(task) = self.rx.recv() { if task.res_tx.is_disconnected() { continue } let wait_time = task.time.elapsed(); let t = Instant::now(); let res = (task.payload)(); let work_time = t.elapsed(); let _ = task.res_tx.send(ParserResponse { payload: res, wait_duration: wait_time, work_duration: work_time, }); } Ok(()) }) .instrument() } pub async fn go(self) -> Result<()> { let handles: Vec<Result<std::thread::JoinHandle<()>>> = (0..self.concurrency) .into_iter() .map(|n| { let p = self.clone(); let thread_builder = std::thread::Builder::new() .name(format!("parser processor {}", n)) .stack_size(self.stack_size_bytes); let h = thread_builder .spawn(move || { let _ = futures_lite::future::block_on(p.process(n)); }) .context("cannot spawn parser processor thread")?; Ok::<_, Error>(h) }) .collect(); for h in handles { let _ = h?.join(); } Ok(()) } }