mssf_core/runtime/
executor.rs1use std::future::Future;
7
8use tokio::{runtime::Handle, sync::mpsc::channel};
9use tracing::info;
10
11use crate::error::ErrorCode;
12
13pub trait Executor: Clone + Sync + Send + 'static {
16 fn spawn<F>(&self, future: F) -> impl JoinHandle<F::Output>
25 where
26 F: Future + Send + 'static,
27 F::Output: Send;
28
29 fn block_on<F: Future>(&self, future: F) -> F::Output;
31
32 fn run_until_ctrl_c(&self) {
37 info!("DefaultExecutor: setting up ctrl-c event.");
38 let (tx, mut rx) = channel(1);
40 let handler = move || {
41 tx.blocking_send(())
42 .expect("Could not send signal on channel.")
43 };
44 ctrlc::set_handler(handler).expect("Error setting Ctrl-C handler");
45
46 self.block_on(async move {
48 info!("DefaultExecutor: Waiting for Ctrl-C...");
49 rx.recv().await.expect("Could not receive from channel.");
50 info!("DefaultExecutor: Got Ctrl-C! Exiting...");
51 });
52 }
53}
54
55#[trait_variant::make(JoinHandle: Send)]
59pub trait LocalJoinHandle<T> {
60 async fn join(self) -> crate::Result<T>;
61}
62
63#[derive(Clone)]
64pub struct DefaultExecutor {
65 rt: Handle,
66}
67
68pub struct DefaultJoinHandle<T> {
70 inner: tokio::task::JoinHandle<T>,
71}
72
73impl DefaultExecutor {
74 pub fn new(rt: Handle) -> DefaultExecutor {
75 DefaultExecutor { rt }
76 }
77}
78
79impl Executor for DefaultExecutor {
80 fn spawn<F>(&self, future: F) -> impl JoinHandle<F::Output>
81 where
82 F: Future + Send + 'static,
83 F::Output: Send,
84 {
85 let h = self.rt.spawn(future);
86 DefaultJoinHandle::<F::Output> { inner: h }
87 }
88
89 fn block_on<F: Future>(&self, future: F) -> F::Output {
90 self.rt.block_on(future)
91 }
92}
93
94impl<T: Send> JoinHandle<T> for DefaultJoinHandle<T> {
95 async fn join(self) -> crate::Result<T> {
96 match self.inner.await {
97 Ok(x) => Ok(x),
98 Err(e) => {
99 let e = if e.is_cancelled() {
100 ErrorCode::E_ABORT
102 } else if e.is_panic() {
103 ErrorCode::E_UNEXPECTED
104 } else {
105 ErrorCode::E_FAIL
106 };
107 tracing::error!("DefaultJoinHandle: background task failed: {e}");
108 Err(e.into())
109 }
110 }
111 }
112}
113
#[cfg(test)]
mod test {
    use super::DefaultExecutor;

    /// Smoke test: an executor can be constructed from a fresh tokio
    /// runtime's handle.
    #[test]
    fn test_executor() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let handle = runtime.handle().clone();

        let _executor = DefaultExecutor::new(handle);
    }
}