bronzeflow_core/runtime/
tokio_runtime.rs

1// This is a part of bronze.
2
3//! TokioRuntime, wrap tokio runtime
4//!
5//! TokioRuntime design to use tokio in a synchronous environment
6
7use crate::runtime::{BronzeRuntime, Runnable, RuntimeJoinHandle};
8use bronzeflow_utils::{debug, info, BronzeError};
9use std::sync::Arc;
10use std::thread::Builder as StdThreadBuilder;
11use tokio::runtime::{Builder, Runtime as TokioRawRuntime};
12use tokio::sync::mpsc;
13
14type MessageSender = mpsc::Sender<Message>;
15type MessageReceiver = mpsc::Receiver<Message>;
16
17#[derive(Debug)]
18enum Message {
19    TaskEnd(RuntimeJoinHandle<()>),
20}
21
22pub struct TokioRuntime {
23    pub(crate) runtime: Arc<TokioRawRuntime>,
24    message_tx: MessageSender,
25}
26
27impl Default for TokioRuntime {
28    fn default() -> Self {
29        TokioRuntime::new()
30    }
31}
32
33impl TokioRuntime {
34    pub fn new() -> Self {
35        let runtime = Builder::new_multi_thread()
36            .worker_threads(4)
37            .enable_all()
38            .build()
39            .unwrap();
40
41        let (tx, rx) = mpsc::channel(100);
42        let rt = TokioRuntime {
43            runtime: Arc::new(runtime),
44            message_tx: tx,
45        };
46        let event_handle = EventHandle::new(rx);
47        rt.run_event_handle(event_handle);
48        rt
49    }
50    fn run_event_handle(&self, mut event_handle: EventHandle) {
51        let rt = self.runtime.clone();
52        StdThreadBuilder::new()
53            .name("event_handle".into())
54            .spawn(move || {
55                rt.block_on(async {
56                    info!("Start event loop handle");
57                    event_handle.run_loop().await;
58                })
59            })
60            .expect("event_handle can't start.");
61    }
62}
63
64impl BronzeRuntime for TokioRuntime {
65    fn run(&self, _: impl Runnable, _: bool) {
66        panic!("Not supported in `TokioRuntime`, please use `run_safe`")
67    }
68
69    #[inline(always)]
70    fn run_safe<F>(&self, runnable: F, report_msg: bool)
71    where
72        F: Runnable + Send + Sync + 'static,
73    {
74        let handle = self.runtime.spawn({
75            async move {
76                runnable.run_async();
77            }
78        });
79        if report_msg {
80            self.message_tx
81                .blocking_send(Message::TaskEnd(RuntimeJoinHandle::AsyncTokioJoinHandle(
82                    handle,
83                )))
84                .unwrap();
85        }
86    }
87}
88
89struct EventHandle {
90    message_receiver: MessageReceiver,
91}
92
93impl EventHandle {
94    pub fn new(message_receiver: MessageReceiver) -> Self {
95        EventHandle { message_receiver }
96    }
97
98    pub async fn run_loop(&mut self) {
99        while let Some(event) = self.message_receiver.recv().await {
100            // TODO add message dispatcher
101            match event {
102                Message::TaskEnd(RuntimeJoinHandle::AsyncTokioJoinHandle(jh)) => {
103                    debug!("Receive tokio join handle message");
104                    jh.await.map_err(BronzeError::msg).ok();
105                },
106                Message::TaskEnd(RuntimeJoinHandle::SyncJobHandle) => {},
107                _ => {},
108            }
109        }
110    }
111}
112
113#[cfg(test)]
114mod tests {
115    use super::*;
116
117    use crate::runtime::tokio_runtime::{EventHandle, TokioRuntime};
118    use crate::runtime::{AsyncFn, BronzeRuntime};
119
120    #[tokio::test]
121    async fn test_event_loop() {
122        let (tx, rx) = mpsc::channel(100);
123        let mut event_loop = EventHandle::new(rx);
124        StdThreadBuilder::new()
125            .name("event_handle".into())
126            .spawn(move || {
127                futures::executor::block_on({
128                    async move {
129                        event_loop.run_loop().await;
130                    }
131                });
132            })
133            .expect("event loop can't start.");
134        let f = tokio::spawn(async {});
135        futures::executor::block_on(async {});
136
137        tx.send(Message::TaskEnd(RuntimeJoinHandle::AsyncTokioJoinHandle(f)))
138            .await
139            .expect("Send message error");
140        tx.send(Message::TaskEnd(RuntimeJoinHandle::FutureBlockJoinHandle(
141            (),
142        )))
143        .await
144        .expect("Send message error");
145        tx.send(Message::TaskEnd(RuntimeJoinHandle::SyncJobHandle))
146            .await
147            .expect("Send message error");
148        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
149    }
150
151    #[test]
152    fn tokio_runtime_in_synchronous_env() {
153        let rt = TokioRuntime::new();
154        let f = || async { info!("I am async function in synchronous environment") };
155        rt.run_safe(AsyncFn::from(f), false);
156        std::thread::sleep(std::time::Duration::from_millis(100));
157    }
158
159    #[tokio::test]
160    async fn tokio_runtime_in_asynchronous_env() {
161        let rt = TokioRuntime::new();
162        let f = || async { info!("I am async function asynchronous environment") };
163        rt.run_safe(AsyncFn::from(f), false);
164        tokio::spawn(async { info!("I am common async function") })
165            .await
166            .expect("run common async function failed");
167        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
168    }
169}