crusty_core/
parser_processor.rs

1use crate::{_prelude::*, config, types::*};
2
3#[derive(Clone)]
4pub struct ParserProcessor {
5	profile: config::ParserProfile,
6	rx:      Receiver<ParserTask>,
7}
8
9impl ParserProcessor {
10	pub fn spawn(
11		concurrency_profile: config::ConcurrencyProfile,
12		parser_profile: config::ParserProfile,
13	) -> Arc<Sender<ParserTask>> {
14		let (tx, rx) = bounded_ch::<ParserTask>(concurrency_profile.transit_buffer_size());
15
16		let s = Self { profile: parser_profile, rx };
17		let _ = tokio::spawn(s.go());
18		Arc::new(tx)
19	}
20
21	fn process(&self, n: usize) -> TaskFut {
22		TracingTask::new(span!(n = n), async move {
23			while let Ok(task) = self.rx.recv() {
24				if task.res_tx.is_disconnected() {
25					continue
26				}
27
28				let wait_time = task.time.elapsed();
29				let t = Instant::now();
30				let res = (task.payload)();
31				let work_time = t.elapsed();
32
33				let _ = task.res_tx.send(ParserResponse {
34					payload:       res,
35					wait_duration: wait_time,
36					work_duration: work_time,
37				});
38			}
39
40			Ok(())
41		})
42		.instrument()
43	}
44
45	pub async fn go(self) -> Result<()> {
46		let mut core_ids = core_affinity::get_core_ids().unwrap().into_iter();
47
48		let mut pin = self.profile.pin;
49		let handles: Vec<Result<std::thread::JoinHandle<()>>> = (0..self.profile.concurrency)
50			.into_iter()
51			.map(|n| {
52				let p = self.clone();
53				let mut thread_builder = std::thread::Builder::new().name(format!("parser processor {}", n));
54				if let Some(stack_size) = &self.profile.stack_size {
55					thread_builder = thread_builder.stack_size(stack_size.0);
56				}
57
58				let id = if pin > 0 {
59					pin -= 1;
60					core_ids.next()
61				} else {
62					None
63				};
64				let h = thread_builder
65					.spawn(move || {
66						if let Some(id) = id {
67							core_affinity::set_for_current(id);
68						}
69						let _ = futures_lite::future::block_on(p.process(n));
70					})
71					.context("cannot spawn parser processor thread")?;
72				Ok::<_, Error>(h)
73			})
74			.collect();
75		for h in handles {
76			let _ = h?.join();
77		}
78		Ok(())
79	}
80}