spawned_concurrency/threads/
process.rs

//! Defines the `Process` trait and the `ProcessInfo` struct, which together provide a process
//! abstraction similar to Erlang processes.
//! See `examples/ping_pong` for a usage example.
3
4use spawned_rt::threads::{self as rt, mpsc, JoinHandle};
5
/// What a caller holds after starting a process: a way to send it messages and the
/// handle produced when it was spawned.
///
/// `Process::spawn` builds this value from the sender half of the channel it creates and
/// the handle returned by the runtime's `spawn`.
#[derive(Debug)]
pub struct ProcessInfo<T> {
    /// Sender half of the process's message channel.
    pub tx: mpsc::Sender<T>,
    /// Handle returned by the runtime when the process was spawned.
    pub handle: JoinHandle<()>,
}

impl<T> ProcessInfo<T> {
    /// Returns a new sender for the process's channel.
    ///
    /// The stored sender is cloned, so `self` remains usable afterwards.
    pub fn sender(&self) -> mpsc::Sender<T> {
        mpsc::Sender::clone(&self.tx)
    }

    /// Consumes the `ProcessInfo` and returns the spawn handle.
    ///
    /// The stored sender is dropped as part of consuming `self`.
    pub fn handle(self) -> JoinHandle<()> {
        let ProcessInfo { handle, .. } = self;
        handle
    }
}
21
22pub trait Process<T: Send + 'static>
23where
24    Self: Send + Sync + Sized + 'static,
25{
26    fn spawn(mut self) -> ProcessInfo<T> {
27        let (tx, mut rx) = mpsc::channel::<T>();
28        let tx_clone = tx.clone();
29        let handle = rt::spawn(move || self.run(&tx_clone, &mut rx));
30        ProcessInfo { tx, handle }
31    }
32
33    fn run(&mut self, tx: &mpsc::Sender<T>, rx: &mut mpsc::Receiver<T>) {
34        self.init(tx);
35        self.main_loop(tx, rx);
36    }
37
38    fn main_loop(&mut self, tx: &mpsc::Sender<T>, rx: &mut mpsc::Receiver<T>) {
39        loop {
40            if self.should_stop() {
41                break;
42            }
43
44            self.receive(tx, rx);
45        }
46    }
47
48    fn should_stop(&self) -> bool {
49        false
50    }
51
52    fn init(&mut self, _tx: &mpsc::Sender<T>) {
53        {}
54    }
55
56    fn receive(&mut self, tx: &mpsc::Sender<T>, rx: &mut mpsc::Receiver<T>) -> T {
57        match rx.recv().ok() {
58            Some(message) => self.handle(message, tx),
59            None => todo!(),
60        }
61    }
62
63    fn handle(&mut self, message: T, tx: &mpsc::Sender<T>) -> T;
64}
65
/// Sends `message` through `tx` without reporting the outcome.
///
/// Whatever `tx.send` returns is discarded, so the caller is not told when the message
/// could not be delivered, and this function never panics on a failed send.
pub fn send<T: Send>(tx: &mpsc::Sender<T>, message: T) {
    // Fire-and-forget: the send result is dropped on purpose.
    drop(tx.send(message));
}