flutter_rust_bridge/handler/implementation/
executor.rs1use crate::codec::BaseCodec;
2use crate::codec::Rust2DartMessageTrait;
3use crate::generalized_isolate::Channel;
4use crate::handler::error::Error;
5use crate::handler::error_listener::ErrorListener;
6use crate::handler::executor::Executor;
7use crate::handler::handler::{TaskContext, TaskInfo, TaskRetFutTrait};
8use crate::handler::implementation::error_listener::handle_non_sync_panic_error;
9use crate::misc::panic_backtrace::{CatchUnwindWithBacktrace, PanicBacktrace};
10use crate::platform_types::MessagePort;
11use crate::rust2dart::sender::Rust2DartSender;
12use crate::rust_async::BaseAsyncRuntime;
13use crate::thread_pool::BaseThreadPool;
14use crate::transfer;
15#[cfg(feature = "rust-async")]
16use futures::FutureExt;
17use std::future::Future;
18use std::panic::AssertUnwindSafe;
19
20pub struct SimpleExecutor<EL: ErrorListener, TP: BaseThreadPool, AR: BaseAsyncRuntime> {
24 error_listener: EL,
25 thread_pool: TP,
26 async_runtime: AR,
27}
28
29impl<EL: ErrorListener, TP: BaseThreadPool, AR: BaseAsyncRuntime> SimpleExecutor<EL, TP, AR> {
30 pub fn new(error_listener: EL, thread_pool: TP, async_runtime: AR) -> Self {
32 SimpleExecutor {
33 error_listener,
34 thread_pool,
35 async_runtime,
36 }
37 }
38
39 pub fn thread_pool(&self) -> &TP {
40 &self.thread_pool
41 }
42
43 pub fn async_runtime(&self) -> &AR {
44 &self.async_runtime
45 }
46}
47
48impl<EL: ErrorListener + Sync, TP: BaseThreadPool, AR: BaseAsyncRuntime> Executor
49 for SimpleExecutor<EL, TP, AR>
50{
51 #[cfg(feature = "thread-pool")]
52 fn execute_normal<Rust2DartCodec, TaskFn>(&self, task_info: TaskInfo, task: TaskFn)
53 where
54 TaskFn: FnOnce(TaskContext) -> Result<Rust2DartCodec::Message, Rust2DartCodec::Message>
55 + Send
56 + 'static,
57 Rust2DartCodec: BaseCodec,
58 {
59 let el = self.error_listener;
60 let el2 = self.error_listener;
61
62 let TaskInfo { port, .. } = task_info;
63 let port: MessagePort = port.unwrap();
64
65 self.thread_pool
66 .execute(transfer!(|port: crate::platform_types::MessagePort| {
67 #[allow(clippy::clone_on_copy)]
68 let port2 = port.clone();
69 let thread_result = PanicBacktrace::catch_unwind(AssertUnwindSafe(|| {
70 #[allow(clippy::clone_on_copy)]
71 let sender = Rust2DartSender::new(Channel::new(port2.clone()));
72 let task_context = TaskContext::new();
73
74 let ret = task(task_context);
75
76 ExecuteNormalOrAsyncUtils::handle_result::<Rust2DartCodec, _>(ret, sender, el2);
77 }));
78
79 if let Err(error) = thread_result {
80 handle_non_sync_panic_error::<Rust2DartCodec>(el, port, error);
81 }
82 }));
83 }
84
85 fn execute_sync<Rust2DartCodec, SyncTaskFn>(
86 &self,
87 _task_info: TaskInfo,
88 sync_task: SyncTaskFn,
89 ) -> Rust2DartCodec::Message
90 where
91 SyncTaskFn: FnOnce() -> Result<Rust2DartCodec::Message, Rust2DartCodec::Message>,
92 Rust2DartCodec: BaseCodec,
93 {
94 match sync_task() {
95 Ok(data) => data,
96 Err(err) => {
97 self.error_listener.on_error(Error::CustomError);
98 err
99 }
100 }
101 }
102
103 #[cfg(feature = "rust-async")]
104 fn execute_async<Rust2DartCodec, TaskFn, TaskRetFut>(&self, task_info: TaskInfo, task: TaskFn)
105 where
106 TaskFn: FnOnce(TaskContext) -> TaskRetFut + Send + 'static,
107 TaskRetFut: Future<Output = Result<Rust2DartCodec::Message, Rust2DartCodec::Message>>
108 + TaskRetFutTrait,
109 Rust2DartCodec: BaseCodec,
110 {
111 let el = self.error_listener;
112 let el2 = self.error_listener;
113
114 self.async_runtime.spawn(async move {
115 let TaskInfo { port, .. } = task_info;
116 let port = port.unwrap();
117 #[allow(clippy::clone_on_copy)]
118 let port2 = port.clone();
119
120 let async_result = AssertUnwindSafe(async {
121 #[allow(clippy::clone_on_copy)]
122 let sender = Rust2DartSender::new(Channel::new(port2.clone()));
123 let task_context = TaskContext::new();
124
125 let ret = task(task_context).await;
126
127 ExecuteNormalOrAsyncUtils::handle_result::<Rust2DartCodec, _>(ret, sender, el2);
128 })
129 .catch_unwind()
130 .await;
131
132 if let Err(err) = async_result {
133 let err = CatchUnwindWithBacktrace::new(err, PanicBacktrace::take_last());
134 handle_non_sync_panic_error::<Rust2DartCodec>(el, port, err);
135 }
136 });
137 }
138}
139
140struct ExecuteNormalOrAsyncUtils;
141
142impl ExecuteNormalOrAsyncUtils {
143 fn handle_result<Rust2DartCodec, EL>(
144 ret: Result<Rust2DartCodec::Message, Rust2DartCodec::Message>,
145 sender: Rust2DartSender,
146 el: EL,
147 ) where
148 EL: ErrorListener + Sync,
149 Rust2DartCodec: BaseCodec,
150 {
151 match ret {
152 Ok(result) => {
153 sender.send_or_warn(result.into_dart_abi());
154 }
155 Err(error) => {
156 el.on_error(Error::CustomError);
157 sender.send_or_warn(error.into_dart_abi());
158 }
159 };
160 }
161}