bronzeflow_core/runtime/
tokio_runtime.rs1use crate::runtime::{BronzeRuntime, Runnable, RuntimeJoinHandle};
8use bronzeflow_utils::{debug, info, BronzeError};
9use std::sync::Arc;
10use std::thread::Builder as StdThreadBuilder;
11use tokio::runtime::{Builder, Runtime as TokioRawRuntime};
12use tokio::sync::mpsc;
13
14type MessageSender = mpsc::Sender<Message>;
15type MessageReceiver = mpsc::Receiver<Message>;
16
17#[derive(Debug)]
18enum Message {
19 TaskEnd(RuntimeJoinHandle<()>),
20}
21
22pub struct TokioRuntime {
23 pub(crate) runtime: Arc<TokioRawRuntime>,
24 message_tx: MessageSender,
25}
26
27impl Default for TokioRuntime {
28 fn default() -> Self {
29 TokioRuntime::new()
30 }
31}
32
33impl TokioRuntime {
34 pub fn new() -> Self {
35 let runtime = Builder::new_multi_thread()
36 .worker_threads(4)
37 .enable_all()
38 .build()
39 .unwrap();
40
41 let (tx, rx) = mpsc::channel(100);
42 let rt = TokioRuntime {
43 runtime: Arc::new(runtime),
44 message_tx: tx,
45 };
46 let event_handle = EventHandle::new(rx);
47 rt.run_event_handle(event_handle);
48 rt
49 }
50 fn run_event_handle(&self, mut event_handle: EventHandle) {
51 let rt = self.runtime.clone();
52 StdThreadBuilder::new()
53 .name("event_handle".into())
54 .spawn(move || {
55 rt.block_on(async {
56 info!("Start event loop handle");
57 event_handle.run_loop().await;
58 })
59 })
60 .expect("event_handle can't start.");
61 }
62}
63
64impl BronzeRuntime for TokioRuntime {
65 fn run(&self, _: impl Runnable, _: bool) {
66 panic!("Not supported in `TokioRuntime`, please use `run_safe`")
67 }
68
69 #[inline(always)]
70 fn run_safe<F>(&self, runnable: F, report_msg: bool)
71 where
72 F: Runnable + Send + Sync + 'static,
73 {
74 let handle = self.runtime.spawn({
75 async move {
76 runnable.run_async();
77 }
78 });
79 if report_msg {
80 self.message_tx
81 .blocking_send(Message::TaskEnd(RuntimeJoinHandle::AsyncTokioJoinHandle(
82 handle,
83 )))
84 .unwrap();
85 }
86 }
87}
88
89struct EventHandle {
90 message_receiver: MessageReceiver,
91}
92
93impl EventHandle {
94 pub fn new(message_receiver: MessageReceiver) -> Self {
95 EventHandle { message_receiver }
96 }
97
98 pub async fn run_loop(&mut self) {
99 while let Some(event) = self.message_receiver.recv().await {
100 match event {
102 Message::TaskEnd(RuntimeJoinHandle::AsyncTokioJoinHandle(jh)) => {
103 debug!("Receive tokio join handle message");
104 jh.await.map_err(BronzeError::msg).ok();
105 },
106 Message::TaskEnd(RuntimeJoinHandle::SyncJobHandle) => {},
107 _ => {},
108 }
109 }
110 }
111}
112
#[cfg(test)]
mod tests {
    use super::*;

    use crate::runtime::tokio_runtime::{EventHandle, TokioRuntime};
    use crate::runtime::{AsyncFn, BronzeRuntime};

    /// Sends one message per join-handle kind to an event loop that runs on a
    /// helper thread driven by the `futures` executor.
    #[tokio::test]
    async fn test_event_loop() {
        let (sender, receiver) = mpsc::channel(100);
        let mut handler = EventHandle::new(receiver);
        StdThreadBuilder::new()
            .name("event_handle".into())
            .spawn(move || futures::executor::block_on(handler.run_loop()))
            .expect("event loop can't start.");
        let task = tokio::spawn(async {});
        futures::executor::block_on(async {});

        let messages = vec![
            Message::TaskEnd(RuntimeJoinHandle::AsyncTokioJoinHandle(task)),
            Message::TaskEnd(RuntimeJoinHandle::FutureBlockJoinHandle(())),
            Message::TaskEnd(RuntimeJoinHandle::SyncJobHandle),
        ];
        for message in messages {
            sender.send(message).await.expect("Send message error");
        }
        // Leave the helper thread some time to drain the channel.
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
    }

    /// Submits an async job to the runtime from a plain synchronous test.
    #[test]
    fn tokio_runtime_in_synchronous_env() {
        let runtime = TokioRuntime::new();
        let job = || async { info!("I am async function in synchronous environment") };
        runtime.run_safe(AsyncFn::from(job), false);
        std::thread::sleep(std::time::Duration::from_millis(100));
    }

    /// Submits an async job to the runtime from inside a Tokio test.
    #[tokio::test]
    async fn tokio_runtime_in_asynchronous_env() {
        let runtime = TokioRuntime::new();
        let job = || async { info!("I am async function asynchronous environment") };
        runtime.run_safe(AsyncFn::from(job), false);
        let common_task = tokio::spawn(async { info!("I am common async function") });
        common_task.await.expect("run common async function failed");
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
}