spawned_concurrency/tasks/
process.rs

//! `Process` trait and `ProcessInfo` struct: a process abstraction similar to Erlang processes.
//! See `examples/ping_pong` for a usage example.
3
4use spawned_rt::tasks::{self as rt, mpsc, JoinHandle};
5use std::future::Future;
6
7#[derive(Debug)]
8pub struct ProcessInfo<T> {
9    pub tx: mpsc::Sender<T>,
10    pub handle: JoinHandle<()>,
11}
12
13impl<T> ProcessInfo<T> {
14    pub fn sender(&self) -> mpsc::Sender<T> {
15        self.tx.clone()
16    }
17
18    pub fn handle(self) -> JoinHandle<()> {
19        self.handle
20    }
21}
22
23pub trait Process<T: Send + 'static>
24where
25    Self: Send + Sync + Sized + 'static,
26{
27    fn spawn(mut self) -> impl Future<Output = ProcessInfo<T>> + Send {
28        async {
29            let (tx, mut rx) = mpsc::channel::<T>();
30            let tx_clone = tx.clone();
31            let handle = rt::spawn(async move {
32                self.run(&tx_clone, &mut rx).await;
33            });
34            ProcessInfo { tx, handle }
35        }
36    }
37
38    fn run(
39        &mut self,
40        tx: &mpsc::Sender<T>,
41        rx: &mut mpsc::Receiver<T>,
42    ) -> impl Future<Output = ()> + Send {
43        async {
44            self.init(tx).await;
45            self.main_loop(tx, rx).await;
46        }
47    }
48
49    fn main_loop(
50        &mut self,
51        tx: &mpsc::Sender<T>,
52        rx: &mut mpsc::Receiver<T>,
53    ) -> impl Future<Output = ()> + Send {
54        async {
55            loop {
56                if self.should_stop() {
57                    break;
58                }
59
60                self.receive(tx, rx).await;
61            }
62        }
63    }
64
65    fn should_stop(&self) -> bool {
66        false
67    }
68
69    fn init(&mut self, _tx: &mpsc::Sender<T>) -> impl Future<Output = ()> + Send {
70        async {}
71    }
72
73    fn receive(
74        &mut self,
75        tx: &mpsc::Sender<T>,
76        rx: &mut mpsc::Receiver<T>,
77    ) -> impl std::future::Future<Output = T> + Send {
78        async {
79            match rx.recv().await {
80                Some(message) => self.handle(message, tx).await,
81                None => todo!(),
82            }
83        }
84    }
85
86    fn handle(&mut self, message: T, tx: &mpsc::Sender<T>) -> impl Future<Output = T> + Send;
87}
88
89pub fn send<T>(tx: &mpsc::Sender<T>, message: T)
90where
91    T: Send,
92{
93    let _ = tx.send(message);
94}