mssf_core/runtime/
executor.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::future::Future;
7
8use tokio::{runtime::Handle, sync::mpsc::channel};
9use tracing::info;
10
11use crate::error::ErrorCode;
12
13// Executor is used by rs to post jobs to execute in the background
14// Sync is needed due to we use the executor across await boundary.
15pub trait Executor: Clone + Sync + Send + 'static {
16    // Required functions
17
18    /// spawns the task to run in background, and returns a join handle
19    /// where the future's result can be awaited.
20    /// If the future panics, the join handle should return an error code.
21    /// This is primarily used by mssf Bridge to execute user app async callbacks/notifications.
22    /// User app impl future may panic, and mssf propagates panic as an error in JoinHandle
23    /// to SF.
24    fn spawn<F>(&self, future: F) -> impl JoinHandle<F::Output>
25    where
26        F: Future + Send + 'static,
27        F::Output: Send;
28
29    /// run the future on the executor until completion.
30    fn block_on<F: Future>(&self, future: F) -> F::Output;
31
32    // provided functions
33
34    /// Run the executor and block the current thread until ctrl-c event is
35    /// Received.
36    fn run_until_ctrl_c(&self) {
37        info!("DefaultExecutor: setting up ctrl-c event.");
38        // set ctrc event
39        let (tx, mut rx) = channel(1);
40        let handler = move || {
41            tx.blocking_send(())
42                .expect("Could not send signal on channel.")
43        };
44        ctrlc::set_handler(handler).expect("Error setting Ctrl-C handler");
45
46        // wait for ctrl-c signal.
47        self.block_on(async move {
48            info!("DefaultExecutor: Waiting for Ctrl-C...");
49            rx.recv().await.expect("Could not receive from channel.");
50            info!("DefaultExecutor: Got Ctrl-C! Exiting...");
51        });
52    }
53}
54
55/// Handle can be awaited to get the success status of the task.
56/// The handle is primarily needed to propagate background task error
57/// back to SF.
58#[trait_variant::make(JoinHandle: Send)]
59pub trait LocalJoinHandle<T> {
60    async fn join(self) -> crate::Result<T>;
61}
62
63#[derive(Clone)]
64pub struct DefaultExecutor {
65    rt: Handle,
66}
67
68/// Default implementation of the JoinHandle using tokio
69pub struct DefaultJoinHandle<T> {
70    inner: tokio::task::JoinHandle<T>,
71}
72
73impl DefaultExecutor {
74    pub fn new(rt: Handle) -> DefaultExecutor {
75        DefaultExecutor { rt }
76    }
77}
78
79impl Executor for DefaultExecutor {
80    fn spawn<F>(&self, future: F) -> impl JoinHandle<F::Output>
81    where
82        F: Future + Send + 'static,
83        F::Output: Send,
84    {
85        let h = self.rt.spawn(future);
86        DefaultJoinHandle::<F::Output> { inner: h }
87    }
88
89    fn block_on<F: Future>(&self, future: F) -> F::Output {
90        self.rt.block_on(future)
91    }
92}
93
94impl<T: Send> JoinHandle<T> for DefaultJoinHandle<T> {
95    async fn join(self) -> crate::Result<T> {
96        match self.inner.await {
97            Ok(x) => Ok(x),
98            Err(e) => {
99                let e = if e.is_cancelled() {
100                    // we never cancel in executor
101                    ErrorCode::E_ABORT
102                } else if e.is_panic() {
103                    ErrorCode::E_UNEXPECTED
104                } else {
105                    ErrorCode::E_FAIL
106                };
107                tracing::error!("DefaultJoinHandle: background task failed: {e}");
108                Err(e.into())
109            }
110        }
111    }
112}
113
#[cfg(test)]
mod test {
    use super::DefaultExecutor;

    /// Smoke test: a `DefaultExecutor` can be built from a runtime handle.
    #[test]
    fn test_executor() {
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let handle = runtime.handle().clone();

        // Construction only; boxing the executor as `dyn Executor` is
        // intentionally not exercised here.
        let _executor = DefaultExecutor::new(handle);
    }
}