Skip to main content

radicle_tui/
task.rs

1use std::marker::PhantomData;
2use std::{fmt::Debug, future::Future};
3
4use tokio::sync::{broadcast, mpsc::UnboundedSender};
5
6use super::{Interrupted, Share};
7
/// A list of [`EmptyProcessor`]s.
pub type EmptyProcessors = Vec<EmptyProcessor>;
9
10/// A task that can be run.
11pub trait Task: Debug + Send + Sync + 'static {
12    type Return;
13
14    fn run(&self) -> anyhow::Result<Vec<Self::Return>>;
15}
16
17/// A processor that can be added to the application environment.
18/// Processors will receive application messages and can produce new ones.
19pub trait Process<M: Share> {
20    fn process(&mut self, _message: M) -> impl Future<Output = anyhow::Result<Vec<M>>> + Send;
21}
22
/// A no-op processor: it carries no state and produces no messages.
#[derive(Clone, Debug)]
pub struct EmptyProcessor;
26
27impl<M: Share> Process<M> for EmptyProcessor {
28    async fn process(&mut self, _message: M) -> anyhow::Result<Vec<M>> {
29        Ok(vec![])
30    }
31}
32
/// A worker that is spawned by the application. Invokes
/// all processors and sends received application messages.
pub struct Worker<P, M, R> {
    /// Channel on which the messages returned by processors are sent.
    work_tx: UnboundedSender<M>,
    /// Marker that ties the type parameters to the worker; `P` and `R` are
    /// not stored in any other field.
    _phantom: PhantomData<(P, M, R)>,
}
39
40impl<P, M, R> Worker<P, M, R>
41where
42    P: Process<M> + Share,
43    M: Share,
44    R: Share,
45{
46    pub fn new(tx: UnboundedSender<M>) -> Self {
47        Self {
48            work_tx: tx,
49            _phantom: PhantomData,
50        }
51    }
52
53    pub async fn run(
54        &self,
55        processors: Vec<P>,
56        mut message_rx: broadcast::Receiver<M>,
57        mut interrupt_rx: broadcast::Receiver<Interrupted<R>>,
58    ) -> anyhow::Result<Interrupted<R>> {
59        let result = loop {
60            tokio::select! {
61                Ok(message) = message_rx.recv() => {
62                    for mut p in processors.clone() {
63                        for m in p.process(message.clone()).await? {
64                            if let Err(err) = self.work_tx.send(m) {
65                                log::error!("Unable to send message: {err}")
66                            }
67                        }
68                    }
69                },
70                // Catch and handle interrupt signal to gracefully shutdown
71                Ok(interrupted) = interrupt_rx.recv() => {
72                    break interrupted;
73                }
74            }
75        };
76
77        Ok(result)
78    }
79}