spawned_concurrency/tasks/
process.rs1use spawned_rt::tasks::{self as rt, mpsc, JoinHandle};
5use std::future::Future;
6
7#[derive(Debug)]
8pub struct ProcessInfo<T> {
9 pub tx: mpsc::Sender<T>,
10 pub handle: JoinHandle<()>,
11}
12
13impl<T> ProcessInfo<T> {
14 pub fn sender(&self) -> mpsc::Sender<T> {
15 self.tx.clone()
16 }
17
18 pub fn handle(self) -> JoinHandle<()> {
19 self.handle
20 }
21}
22
23pub trait Process<T: Send + 'static>
24where
25 Self: Send + Sync + Sized + 'static,
26{
27 fn spawn(mut self) -> impl Future<Output = ProcessInfo<T>> + Send {
28 async {
29 let (tx, mut rx) = mpsc::channel::<T>();
30 let tx_clone = tx.clone();
31 let handle = rt::spawn(async move {
32 self.run(&tx_clone, &mut rx).await;
33 });
34 ProcessInfo { tx, handle }
35 }
36 }
37
38 fn run(
39 &mut self,
40 tx: &mpsc::Sender<T>,
41 rx: &mut mpsc::Receiver<T>,
42 ) -> impl Future<Output = ()> + Send {
43 async {
44 self.init(tx).await;
45 self.main_loop(tx, rx).await;
46 }
47 }
48
49 fn main_loop(
50 &mut self,
51 tx: &mpsc::Sender<T>,
52 rx: &mut mpsc::Receiver<T>,
53 ) -> impl Future<Output = ()> + Send {
54 async {
55 loop {
56 if self.should_stop() {
57 break;
58 }
59
60 self.receive(tx, rx).await;
61 }
62 }
63 }
64
65 fn should_stop(&self) -> bool {
66 false
67 }
68
69 fn init(&mut self, _tx: &mpsc::Sender<T>) -> impl Future<Output = ()> + Send {
70 async {}
71 }
72
73 fn receive(
74 &mut self,
75 tx: &mpsc::Sender<T>,
76 rx: &mut mpsc::Receiver<T>,
77 ) -> impl std::future::Future<Output = T> + Send {
78 async {
79 match rx.recv().await {
80 Some(message) => self.handle(message, tx).await,
81 None => todo!(),
82 }
83 }
84 }
85
86 fn handle(&mut self, message: T, tx: &mpsc::Sender<T>) -> impl Future<Output = T> + Send;
87}
88
89pub fn send<T>(tx: &mpsc::Sender<T>, message: T)
90where
91 T: Send,
92{
93 let _ = tx.send(message);
94}