crusty_core/
parser_processor.rs1use crate::{_prelude::*, config, types::*};
2
3#[derive(Clone)]
4pub struct ParserProcessor {
5 profile: config::ParserProfile,
6 rx: Receiver<ParserTask>,
7}
8
9impl ParserProcessor {
10 pub fn spawn(
11 concurrency_profile: config::ConcurrencyProfile,
12 parser_profile: config::ParserProfile,
13 ) -> Arc<Sender<ParserTask>> {
14 let (tx, rx) = bounded_ch::<ParserTask>(concurrency_profile.transit_buffer_size());
15
16 let s = Self { profile: parser_profile, rx };
17 let _ = tokio::spawn(s.go());
18 Arc::new(tx)
19 }
20
21 fn process(&self, n: usize) -> TaskFut {
22 TracingTask::new(span!(n = n), async move {
23 while let Ok(task) = self.rx.recv() {
24 if task.res_tx.is_disconnected() {
25 continue
26 }
27
28 let wait_time = task.time.elapsed();
29 let t = Instant::now();
30 let res = (task.payload)();
31 let work_time = t.elapsed();
32
33 let _ = task.res_tx.send(ParserResponse {
34 payload: res,
35 wait_duration: wait_time,
36 work_duration: work_time,
37 });
38 }
39
40 Ok(())
41 })
42 .instrument()
43 }
44
45 pub async fn go(self) -> Result<()> {
46 let mut core_ids = core_affinity::get_core_ids().unwrap().into_iter();
47
48 let mut pin = self.profile.pin;
49 let handles: Vec<Result<std::thread::JoinHandle<()>>> = (0..self.profile.concurrency)
50 .into_iter()
51 .map(|n| {
52 let p = self.clone();
53 let mut thread_builder = std::thread::Builder::new().name(format!("parser processor {}", n));
54 if let Some(stack_size) = &self.profile.stack_size {
55 thread_builder = thread_builder.stack_size(stack_size.0);
56 }
57
58 let id = if pin > 0 {
59 pin -= 1;
60 core_ids.next()
61 } else {
62 None
63 };
64 let h = thread_builder
65 .spawn(move || {
66 if let Some(id) = id {
67 core_affinity::set_for_current(id);
68 }
69 let _ = futures_lite::future::block_on(p.process(n));
70 })
71 .context("cannot spawn parser processor thread")?;
72 Ok::<_, Error>(h)
73 })
74 .collect();
75 for h in handles {
76 let _ = h?.join();
77 }
78 Ok(())
79 }
80}