mssf_core/runtime/
executor.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::future::Future;
7
8use tokio::runtime::Handle;
9
10use crate::error::ErrorCode;
11
12// Executor is used by rs to post jobs to execute in the background
13// Sync is needed due to we use the executor across await boundary.
14pub trait Executor: Clone + Sync + Send + 'static {
15    // Required functions
16
17    /// spawns the task to run in background, and returns a join handle
18    /// where the future's result can be awaited.
19    /// If the future panics, the join handle should return an error code.
20    /// This is primarily used by mssf Bridge to execute user app async callbacks/notifications.
21    /// User app impl future may panic, and mssf propagates panic as an error in JoinHandle
22    /// to SF.
23    fn spawn<F>(&self, future: F) -> impl JoinHandle<F::Output>
24    where
25        F: Future + Send + 'static,
26        F::Output: Send;
27
28    /// run the future on the executor until completion.
29    fn block_on<F: Future>(&self, future: F) -> F::Output;
30}
31
32/// Handle can be awaited to get the success status of the task.
33/// The handle is primarily needed to propagate background task error
34/// back to SF.
35#[trait_variant::make(JoinHandle: Send)]
36pub trait LocalJoinHandle<T> {
37    async fn join(self) -> crate::Result<T>;
38}
39
40#[derive(Clone)]
41pub struct DefaultExecutor {
42    rt: Handle,
43}
44
45/// Default implementation of the JoinHandle using tokio
46pub struct DefaultJoinHandle<T> {
47    inner: tokio::task::JoinHandle<T>,
48}
49
50impl DefaultExecutor {
51    pub fn new(rt: Handle) -> DefaultExecutor {
52        DefaultExecutor { rt }
53    }
54}
55
56impl Executor for DefaultExecutor {
57    fn spawn<F>(&self, future: F) -> impl JoinHandle<F::Output>
58    where
59        F: Future + Send + 'static,
60        F::Output: Send,
61    {
62        let h = self.rt.spawn(future);
63        DefaultJoinHandle::<F::Output> { inner: h }
64    }
65
66    fn block_on<F: Future>(&self, future: F) -> F::Output {
67        self.rt.block_on(future)
68    }
69}
70
71impl<T: Send> JoinHandle<T> for DefaultJoinHandle<T> {
72    async fn join(self) -> crate::Result<T> {
73        match self.inner.await {
74            Ok(x) => Ok(x),
75            Err(e) => {
76                let e = if e.is_cancelled() {
77                    // we never cancel in executor
78                    ErrorCode::E_ABORT
79                } else if e.is_panic() {
80                    ErrorCode::E_UNEXPECTED
81                } else {
82                    ErrorCode::E_FAIL
83                };
84                tracing::error!("DefaultJoinHandle: background task failed: {e}");
85                Err(e.into())
86            }
87        }
88    }
89}
90
#[cfg(test)]
mod test {
    use super::{DefaultExecutor, Executor, JoinHandle};

    /// Exercises the default executor end to end: `block_on` drives a future
    /// to completion, and a spawned task's output comes back through `join`.
    #[test]
    fn test_executor() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let ex = DefaultExecutor::new(rt.handle().clone());

        // NOTE: `Executor` cannot be used as `Box<dyn Executor>`; it requires
        // `Clone` and has generic methods, so it is only usable via generics.

        // block_on returns the future's output.
        assert_eq!(ex.block_on(async { 21 * 2 }), 42);

        // spawn + join round-trips the task's output.
        let handle = ex.spawn(async { 1 + 1 });
        let joined = ex.block_on(handle.join());
        assert!(matches!(joined, Ok(2)));
    }
}