1use std::marker::PhantomData;
2use std::{fmt::Debug, future::Future};
3
4use tokio::sync::{broadcast, mpsc::UnboundedSender};
5
6use super::{Interrupted, Share};
7
8pub type EmptyProcessors = Vec<EmptyProcessor>;
9
10pub trait Task: Debug + Send + Sync + 'static {
12 type Return;
13
14 fn run(&self) -> anyhow::Result<Vec<Self::Return>>;
15}
16
17pub trait Process<M: Share> {
20 fn process(&mut self, _message: M) -> impl Future<Output = anyhow::Result<Vec<M>>> + Send;
21}
22
23#[derive(Debug, Clone)]
25pub struct EmptyProcessor;
26
27impl<M: Share> Process<M> for EmptyProcessor {
28 async fn process(&mut self, _message: M) -> anyhow::Result<Vec<M>> {
29 Ok(vec![])
30 }
31}
32
33pub struct Worker<P, M, R> {
36 work_tx: UnboundedSender<M>,
37 _phantom: PhantomData<(P, M, R)>,
38}
39
40impl<P, M, R> Worker<P, M, R>
41where
42 P: Process<M> + Share,
43 M: Share,
44 R: Share,
45{
46 pub fn new(tx: UnboundedSender<M>) -> Self {
47 Self {
48 work_tx: tx,
49 _phantom: PhantomData,
50 }
51 }
52
53 pub async fn run(
54 &self,
55 processors: Vec<P>,
56 mut message_rx: broadcast::Receiver<M>,
57 mut interrupt_rx: broadcast::Receiver<Interrupted<R>>,
58 ) -> anyhow::Result<Interrupted<R>> {
59 let result = loop {
60 tokio::select! {
61 Ok(message) = message_rx.recv() => {
62 for mut p in processors.clone() {
63 for m in p.process(message.clone()).await? {
64 if let Err(err) = self.work_tx.send(m) {
65 log::error!("Unable to send message: {err}")
66 }
67 }
68 }
69 },
70 Ok(interrupted) = interrupt_rx.recv() => {
72 break interrupted;
73 }
74 }
75 };
76
77 Ok(result)
78 }
79}