flutter_rust_bridge/handler/implementation/
executor.rs

1use crate::codec::BaseCodec;
2use crate::codec::Rust2DartMessageTrait;
3use crate::generalized_isolate::Channel;
4use crate::handler::error::Error;
5use crate::handler::error_listener::ErrorListener;
6use crate::handler::executor::Executor;
7use crate::handler::handler::{TaskContext, TaskInfo, TaskRetFutTrait};
8use crate::handler::implementation::error_listener::handle_non_sync_panic_error;
9use crate::misc::panic_backtrace::{CatchUnwindWithBacktrace, PanicBacktrace};
10use crate::platform_types::MessagePort;
11use crate::rust2dart::sender::Rust2DartSender;
12use crate::rust_async::BaseAsyncRuntime;
13use crate::thread_pool::BaseThreadPool;
14use crate::transfer;
15#[cfg(feature = "rust-async")]
16use futures::FutureExt;
17use std::future::Future;
18use std::panic::AssertUnwindSafe;
19
20/// The default executor used.
21/// It creates an internal thread pool, and each call to a Rust function is
22/// handled by a different thread.
23pub struct SimpleExecutor<EL: ErrorListener, TP: BaseThreadPool, AR: BaseAsyncRuntime> {
24    error_listener: EL,
25    thread_pool: TP,
26    async_runtime: AR,
27}
28
29impl<EL: ErrorListener, TP: BaseThreadPool, AR: BaseAsyncRuntime> SimpleExecutor<EL, TP, AR> {
30    /// Create a new executor backed by a thread pool.
31    pub fn new(error_listener: EL, thread_pool: TP, async_runtime: AR) -> Self {
32        SimpleExecutor {
33            error_listener,
34            thread_pool,
35            async_runtime,
36        }
37    }
38
39    pub fn thread_pool(&self) -> &TP {
40        &self.thread_pool
41    }
42
43    pub fn async_runtime(&self) -> &AR {
44        &self.async_runtime
45    }
46}
47
48impl<EL: ErrorListener + Sync, TP: BaseThreadPool, AR: BaseAsyncRuntime> Executor
49    for SimpleExecutor<EL, TP, AR>
50{
51    #[cfg(feature = "thread-pool")]
52    fn execute_normal<Rust2DartCodec, TaskFn>(&self, task_info: TaskInfo, task: TaskFn)
53    where
54        TaskFn: FnOnce(TaskContext) -> Result<Rust2DartCodec::Message, Rust2DartCodec::Message>
55            + Send
56            + 'static,
57        Rust2DartCodec: BaseCodec,
58    {
59        let el = self.error_listener;
60        let el2 = self.error_listener;
61
62        let TaskInfo { port, .. } = task_info;
63        let port: MessagePort = port.unwrap();
64
65        self.thread_pool
66            .execute(transfer!(|port: crate::platform_types::MessagePort| {
67                #[allow(clippy::clone_on_copy)]
68                let port2 = port.clone();
69                let thread_result = PanicBacktrace::catch_unwind(AssertUnwindSafe(|| {
70                    #[allow(clippy::clone_on_copy)]
71                    let sender = Rust2DartSender::new(Channel::new(port2.clone()));
72                    let task_context = TaskContext::new();
73
74                    let ret = task(task_context);
75
76                    ExecuteNormalOrAsyncUtils::handle_result::<Rust2DartCodec, _>(ret, sender, el2);
77                }));
78
79                if let Err(error) = thread_result {
80                    handle_non_sync_panic_error::<Rust2DartCodec>(el, port, error);
81                }
82            }));
83    }
84
85    fn execute_sync<Rust2DartCodec, SyncTaskFn>(
86        &self,
87        _task_info: TaskInfo,
88        sync_task: SyncTaskFn,
89    ) -> Rust2DartCodec::Message
90    where
91        SyncTaskFn: FnOnce() -> Result<Rust2DartCodec::Message, Rust2DartCodec::Message>,
92        Rust2DartCodec: BaseCodec,
93    {
94        match sync_task() {
95            Ok(data) => data,
96            Err(err) => {
97                self.error_listener.on_error(Error::CustomError);
98                err
99            }
100        }
101    }
102
103    #[cfg(feature = "rust-async")]
104    fn execute_async<Rust2DartCodec, TaskFn, TaskRetFut>(&self, task_info: TaskInfo, task: TaskFn)
105    where
106        TaskFn: FnOnce(TaskContext) -> TaskRetFut + Send + 'static,
107        TaskRetFut: Future<Output = Result<Rust2DartCodec::Message, Rust2DartCodec::Message>>
108            + TaskRetFutTrait,
109        Rust2DartCodec: BaseCodec,
110    {
111        let el = self.error_listener;
112        let el2 = self.error_listener;
113
114        self.async_runtime.spawn(async move {
115            let TaskInfo { port, .. } = task_info;
116            let port = port.unwrap();
117            #[allow(clippy::clone_on_copy)]
118            let port2 = port.clone();
119
120            let async_result = AssertUnwindSafe(async {
121                #[allow(clippy::clone_on_copy)]
122                let sender = Rust2DartSender::new(Channel::new(port2.clone()));
123                let task_context = TaskContext::new();
124
125                let ret = task(task_context).await;
126
127                ExecuteNormalOrAsyncUtils::handle_result::<Rust2DartCodec, _>(ret, sender, el2);
128            })
129            .catch_unwind()
130            .await;
131
132            if let Err(err) = async_result {
133                let err = CatchUnwindWithBacktrace::new(err, PanicBacktrace::take_last());
134                handle_non_sync_panic_error::<Rust2DartCodec>(el, port, err);
135            }
136        });
137    }
138}
139
140struct ExecuteNormalOrAsyncUtils;
141
142impl ExecuteNormalOrAsyncUtils {
143    fn handle_result<Rust2DartCodec, EL>(
144        ret: Result<Rust2DartCodec::Message, Rust2DartCodec::Message>,
145        sender: Rust2DartSender,
146        el: EL,
147    ) where
148        EL: ErrorListener + Sync,
149        Rust2DartCodec: BaseCodec,
150    {
151        match ret {
152            Ok(result) => {
153                sender.send_or_warn(result.into_dart_abi());
154            }
155            Err(error) => {
156                el.on_error(Error::CustomError);
157                sender.send_or_warn(error.into_dart_abi());
158            }
159        };
160    }
161}