// vibe_ready/api/engine_executor.rs

1use crate::api::engine_error::{VibeEngineError, VibeEngineErrorCode};
2use crate::log::log_def::DESC;
3use crate::log_e;
4use std::any::Any;
5use std::future::Future;
6use std::panic::AssertUnwindSafe;
7use std::pin::Pin;
8use std::sync::mpsc::Receiver;
9use std::sync::{Arc, Mutex};
10use std::time::Duration;
11use threadpool::ThreadPool;
12use tokio::runtime::{Handle, Runtime};
13use tokio::sync::mpsc::Sender;
14use tokio::sync::oneshot;
15
/// Boxed, pinned unit future that can be sent across threads; this is the
/// message type carried by the executor's async and sync task senders.
pub(crate) type VibeEngineTask = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
17
/// Extracts a printable message from a caught panic payload.
///
/// Payloads holding a `&str` or a `String` yield that text; any other payload
/// type yields the fixed text `"unknown panic payload"`.
fn panic_payload_message(payload: &Box<dyn Any + Send + 'static>) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_owned())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic payload".to_owned())
}
27
28#[derive(Clone)]
29/// Executes user callbacks on the engine callback thread pool.
30///
31/// Use this when an integration receives events on an async/runtime thread but
32/// wants user callbacks to run on the configured callback pool.
33///
34/// # Examples
35///
36/// ```no_run
37/// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
38///
39/// # fn demo() -> VibeResult<()> {
40/// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
41/// let callbacks = engine.executor().callback();
42/// callbacks.execute(|| println!("callback ran"));
43/// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
44/// # Ok(())
45/// # }
46/// ```
47pub struct VibeCallbackExecutor {
48    cb_pool: ThreadPool,
49}
50
51impl VibeCallbackExecutor {
52    pub(crate) fn new(cb_pool: ThreadPool) -> Self {
53        Self { cb_pool }
54    }
55
56    /// Executes a zero-argument callback on the callback pool.
57    ///
58    /// # Returns
59    ///
60    /// This method returns `()` after queuing the callback.
61    pub fn execute<F>(&self, cb: F)
62    where
63        F: FnOnce() + Send + 'static,
64    {
65        self.cb_pool.execute(cb);
66    }
67
68    /// Converts a one-shot single-argument callback into a pool-dispatched callback.
69    ///
70    /// # Returns
71    ///
72    /// A `FnOnce(R)` wrapper that moves the argument into the callback pool.
73    pub fn once<F, R>(&self, cb: F) -> impl FnOnce(R) + Send + 'static
74    where
75        F: FnOnce(R) + Send + 'static,
76        R: Send + 'static,
77    {
78        let executor = self.clone();
79        move |value| executor.execute(move || cb(value))
80    }
81
82    /// Converts a one-shot two-argument callback into a pool-dispatched callback.
83    ///
84    /// # Returns
85    ///
86    /// A `FnOnce(R1, R2)` wrapper that moves both arguments into the callback pool.
87    pub fn once2<F, R1, R2>(&self, cb: F) -> impl FnOnce(R1, R2) + Send + 'static
88    where
89        F: FnOnce(R1, R2) + Send + 'static,
90        R1: Send + 'static,
91        R2: Send + 'static,
92    {
93        let executor = self.clone();
94        move |value1, value2| executor.execute(move || cb(value1, value2))
95    }
96
97    /// Converts a one-shot three-argument callback into a pool-dispatched callback.
98    ///
99    /// # Returns
100    ///
101    /// A `FnOnce(R1, R2, R3)` wrapper that moves all arguments into the callback pool.
102    pub fn once3<F, R1, R2, R3>(&self, cb: F) -> impl FnOnce(R1, R2, R3) + Send + 'static
103    where
104        F: FnOnce(R1, R2, R3) + Send + 'static,
105        R1: Send + 'static,
106        R2: Send + 'static,
107        R3: Send + 'static,
108    {
109        let executor = self.clone();
110        move |value1, value2, value3| executor.execute(move || cb(value1, value2, value3))
111    }
112
113    /// Returns a boxed one-shot three-argument callback wrapper.
114    ///
115    /// # Returns
116    ///
117    /// A boxed `FnOnce` that dispatches the original callback on the pool.
118    pub fn once3_boxed<F, R1, R2, R3>(
119        &self,
120        cb: F,
121    ) -> Box<dyn FnOnce(R1, R2, R3) + Send + Sync + 'static>
122    where
123        F: FnOnce(R1, R2, R3) + Send + Sync + 'static,
124        R1: Send + 'static,
125        R2: Send + 'static,
126        R3: Send + 'static,
127    {
128        let executor = self.clone();
129        Box::new(move |value1, value2, value3| {
130            executor.execute(move || cb(value1, value2, value3));
131        })
132    }
133
134    /// Returns a boxed zero-argument callback wrapper.
135    ///
136    /// # Returns
137    ///
138    /// A boxed `Fn` that can be cloned or stored by APIs expecting trait objects.
139    pub fn boxed_fn0<F>(&self, cb: F) -> Box<dyn Fn() + Send + Sync + 'static>
140    where
141        F: Fn() + Send + Sync + 'static,
142    {
143        let executor = self.clone();
144        let cb = Arc::new(cb);
145        Box::new(move || {
146            let cb_clone = Arc::clone(&cb);
147            executor.execute(move || cb_clone());
148        })
149    }
150
151    /// Returns a boxed one-argument callback wrapper.
152    ///
153    /// # Returns
154    ///
155    /// A boxed `Fn(R)` that dispatches each invocation on the callback pool.
156    pub fn boxed_fn<F, R>(&self, cb: F) -> Box<dyn Fn(R) + Send + Sync + 'static>
157    where
158        F: Fn(R) + Send + Sync + 'static,
159        R: Send + 'static,
160    {
161        let executor = self.clone();
162        let cb = Arc::new(cb);
163        Box::new(move |value| {
164            let cb_clone = Arc::clone(&cb);
165            executor.execute(move || cb_clone(value));
166        })
167    }
168
169    /// Returns a boxed two-argument callback wrapper.
170    ///
171    /// # Returns
172    ///
173    /// A boxed `Fn(R1, R2)` that dispatches each invocation on the callback pool.
174    pub fn boxed_fn2<F, R1, R2>(&self, cb: F) -> Box<dyn Fn(R1, R2) + Send + Sync + 'static>
175    where
176        F: Fn(R1, R2) + Send + Sync + 'static,
177        R1: Send + 'static,
178        R2: Send + 'static,
179    {
180        let executor = self.clone();
181        let cb = Arc::new(cb);
182        Box::new(move |value1, value2| {
183            let cb_clone = Arc::clone(&cb);
184            executor.execute(move || cb_clone(value1, value2));
185        })
186    }
187
188    /// Returns an unboxed two-argument callback wrapper.
189    ///
190    /// # Returns
191    ///
192    /// An `impl Fn(R1, R2)` wrapper that dispatches each invocation on the pool.
193    pub fn fn2<F, R1, R2>(&self, cb: F) -> impl Fn(R1, R2) + Send + Sync + 'static
194    where
195        F: Fn(R1, R2) + Send + Sync + 'static,
196        R1: Send + 'static,
197        R2: Send + 'static,
198    {
199        let executor = self.clone();
200        let cb = Arc::new(cb);
201        move |value1, value2| {
202            let cb_clone = Arc::clone(&cb);
203            executor.execute(move || cb_clone(value1, value2));
204        }
205    }
206
207    /// Returns a cloneable unboxed two-argument callback wrapper.
208    ///
209    /// # Returns
210    ///
211    /// A cloneable `impl Fn(R1, R2)` wrapper for APIs that duplicate callbacks.
212    pub fn fn2_cloneable<F, R1, R2>(&self, cb: F) -> impl Fn(R1, R2) + Clone + Send + Sync + 'static
213    where
214        F: Fn(R1, R2) + Clone + Send + Sync + 'static,
215        R1: Send + 'static,
216        R2: Send + 'static,
217    {
218        let executor = self.clone();
219        let cb = Arc::new(cb);
220        move |value1, value2| {
221            let cb_clone = Arc::clone(&cb);
222            executor.execute(move || cb_clone(value1, value2));
223        }
224    }
225
226    /// Returns a boxed three-argument callback wrapper.
227    ///
228    /// # Returns
229    ///
230    /// A boxed `Fn(R1, R2, R3)` that dispatches each invocation on the pool.
231    pub fn boxed_fn3<F, R1, R2, R3>(&self, cb: F) -> Box<dyn Fn(R1, R2, R3) + Send + Sync + 'static>
232    where
233        F: Fn(R1, R2, R3) + Send + Sync + 'static,
234        R1: Send + 'static,
235        R2: Send + 'static,
236        R3: Send + 'static,
237    {
238        let executor = self.clone();
239        let cb = Arc::new(cb);
240        Box::new(move |value1, value2, value3| {
241            let cb_clone = Arc::clone(&cb);
242            executor.execute(move || cb_clone(value1, value2, value3));
243        })
244    }
245
246    /// Returns a boxed four-argument callback wrapper.
247    ///
248    /// # Returns
249    ///
250    /// A boxed `Fn(R1, R2, R3, R4)` dispatched on the callback pool.
251    pub fn boxed_fn4<F, R1, R2, R3, R4>(
252        &self,
253        cb: F,
254    ) -> Box<dyn Fn(R1, R2, R3, R4) + Send + Sync + 'static>
255    where
256        F: Fn(R1, R2, R3, R4) + Send + Sync + 'static,
257        R1: Send + 'static,
258        R2: Send + 'static,
259        R3: Send + 'static,
260        R4: Send + 'static,
261    {
262        let executor = self.clone();
263        let cb = Arc::new(cb);
264        Box::new(move |value1, value2, value3, value4| {
265            let cb_clone = Arc::clone(&cb);
266            executor.execute(move || cb_clone(value1, value2, value3, value4));
267        })
268    }
269
270    /// Returns a boxed five-argument callback wrapper.
271    ///
272    /// # Returns
273    ///
274    /// A boxed `Fn(R1, R2, R3, R4, R5)` dispatched on the callback pool.
275    pub fn boxed_fn5<F, R1, R2, R3, R4, R5>(
276        &self,
277        cb: F,
278    ) -> Box<dyn Fn(R1, R2, R3, R4, R5) + Send + Sync + 'static>
279    where
280        F: Fn(R1, R2, R3, R4, R5) + Send + Sync + 'static,
281        R1: Send + 'static,
282        R2: Send + 'static,
283        R3: Send + 'static,
284        R4: Send + 'static,
285        R5: Send + 'static,
286    {
287        let executor = self.clone();
288        let cb = Arc::new(cb);
289        Box::new(move |value1, value2, value3, value4, value5| {
290            let cb_clone = Arc::clone(&cb);
291            executor.execute(move || cb_clone(value1, value2, value3, value4, value5));
292        })
293    }
294
295    /// Returns a boxed callback wrapper that accepts a borrowed first argument.
296    ///
297    /// # Returns
298    ///
299    /// A boxed `Fn(&R1, R2)` wrapper. The borrowed value is cloned before the
300    /// callback is moved to the pool.
301    pub fn boxed_ref_fn2<F, R1, R2>(&self, cb: F) -> Box<dyn Fn(&R1, R2) + Send + Sync + 'static>
302    where
303        F: Fn(&R1, R2) + Send + Sync + 'static,
304        R1: Clone + Send + 'static,
305        R2: Send + 'static,
306    {
307        let executor = self.clone();
308        let cb = Arc::new(cb);
309        Box::new(move |value1, value2| {
310            let owned_value1 = value1.clone();
311            let cb_clone = Arc::clone(&cb);
312            executor.execute(move || cb_clone(&owned_value1, value2));
313        })
314    }
315
316    /// Returns a boxed string-slice callback wrapper.
317    ///
318    /// # Returns
319    ///
320    /// A boxed `Fn(&str)` wrapper that owns the string before dispatching.
321    pub fn boxed_str_fn<F>(&self, cb: F) -> Box<dyn Fn(&str) + Send + Sync + 'static>
322    where
323        F: Fn(&str) + Send + Sync + 'static,
324    {
325        let executor = self.clone();
326        let cb = Arc::new(cb);
327        Box::new(move |value| {
328            let owned = value.to_string();
329            let cb_clone = Arc::clone(&cb);
330            executor.execute(move || cb_clone(&owned));
331        })
332    }
333
334    /// Returns a boxed `&str` plus one-argument callback wrapper.
335    ///
336    /// # Returns
337    ///
338    /// A boxed `Fn(&str, R2)` wrapper that owns the string before dispatching.
339    pub fn boxed_str_fn2<F, R2>(&self, cb: F) -> Box<dyn Fn(&str, R2) + Send + Sync + 'static>
340    where
341        F: Fn(&str, R2) + Send + Sync + 'static,
342        R2: Send + 'static,
343    {
344        let executor = self.clone();
345        let cb = Arc::new(cb);
346        Box::new(move |value, value2| {
347            let owned = value.to_string();
348            let cb_clone = Arc::clone(&cb);
349            executor.execute(move || cb_clone(&owned, value2));
350        })
351    }
352
353    /// Returns a boxed `&str` plus two-argument callback wrapper.
354    ///
355    /// # Returns
356    ///
357    /// A boxed `Fn(&str, R2, R3)` wrapper that owns the string before dispatching.
358    pub fn boxed_str_fn3<F, R2, R3>(
359        &self,
360        cb: F,
361    ) -> Box<dyn Fn(&str, R2, R3) + Send + Sync + 'static>
362    where
363        F: Fn(&str, R2, R3) + Send + Sync + 'static,
364        R2: Send + 'static,
365        R3: Send + 'static,
366    {
367        let executor = self.clone();
368        let cb = Arc::new(cb);
369        Box::new(move |value, value2, value3| {
370            let owned = value.to_string();
371            let cb_clone = Arc::clone(&cb);
372            executor.execute(move || cb_clone(&owned, value2, value3));
373        })
374    }
375
376    /// Returns a boxed `&str` plus three-argument callback wrapper.
377    ///
378    /// # Returns
379    ///
380    /// A boxed `Fn(&str, R2, R3, R4)` wrapper that owns the string before dispatching.
381    pub fn boxed_str_fn4<F, R2, R3, R4>(
382        &self,
383        cb: F,
384    ) -> Box<dyn Fn(&str, R2, R3, R4) + Send + Sync + 'static>
385    where
386        F: Fn(&str, R2, R3, R4) + Send + Sync + 'static,
387        R2: Send + 'static,
388        R3: Send + 'static,
389        R4: Send + 'static,
390    {
391        let executor = self.clone();
392        let cb = Arc::new(cb);
393        Box::new(move |value, value2, value3, value4| {
394            let owned = value.to_string();
395            let cb_clone = Arc::clone(&cb);
396            executor.execute(move || cb_clone(&owned, value2, value3, value4));
397        })
398    }
399
400    /// Returns a boxed callback wrapper with first and fourth string arguments.
401    ///
402    /// # Returns
403    ///
404    /// A boxed `Fn(&str, R2, R3, &str)` wrapper that owns both string slices
405    /// before dispatching.
406    pub fn boxed_str_str4_fn<F, R2, R3>(
407        &self,
408        cb: F,
409    ) -> Box<dyn Fn(&str, R2, R3, &str) + Send + Sync + 'static>
410    where
411        F: Fn(&str, R2, R3, &str) + Send + Sync + 'static,
412        R2: Send + 'static,
413        R3: Send + 'static,
414    {
415        let executor = self.clone();
416        let cb = Arc::new(cb);
417        Box::new(move |value1, value2, value3, value4| {
418            let owned_value1 = value1.to_string();
419            let owned_value4 = value4.to_string();
420            let cb_clone = Arc::clone(&cb);
421            executor.execute(move || cb_clone(&owned_value1, value2, value3, &owned_value4));
422        })
423    }
424}
425
426#[derive(Clone)]
427/// Executes futures and callbacks on the engine-owned runtime infrastructure.
428///
429/// # Examples
430///
431/// ```no_run
432/// use vibe_ready::{VibeEngine, VibeEngineConfig, VibeResult};
433///
434/// # fn demo() -> VibeResult<()> {
435/// let engine = VibeEngine::create(VibeEngineConfig::builder().build())?;
436/// let executor = engine.executor();
437/// let value = executor.invoke(async { "ready" })?;
438/// assert_eq!(value, "ready");
439/// # engine.destroy_with_timeout(std::time::Duration::from_secs(1))?;
440/// # Ok(())
441/// # }
442/// ```
443pub struct VibeEngineExecutor {
444    inner: Arc<VibeEngineExecutorInner>,
445}
446
/// Tokio runtime access used by the executor.
///
/// Built either from a shared runtime (`owned`) or from a caller-supplied
/// handle (`external`).
pub(crate) struct VibeRuntimeHandle {
    /// Handle used to block on futures.
    handle: Handle,
    /// `Some` when built through `owned`, `None` when built through `external`.
    /// Never read; presumably held only so the runtime stays alive as long as
    /// this value does — confirm against the engine's teardown code.
    _runtime: Option<Arc<Runtime>>,
}
451
452impl VibeRuntimeHandle {
453    pub(crate) fn owned(runtime: Arc<Runtime>) -> Self {
454        Self {
455            handle: runtime.handle().clone(),
456            _runtime: Some(runtime),
457        }
458    }
459
460    pub(crate) fn external(handle: Handle) -> Self {
461        Self {
462            handle,
463            _runtime: None,
464        }
465    }
466
467    fn handle(&self) -> &Handle {
468        &self.handle
469    }
470}
471
/// State shared by every clone of `VibeEngineExecutor`.
struct VibeEngineExecutorInner {
    /// Executor handed out (as a clone) by `VibeEngineExecutor::callback`.
    callback_executor: VibeCallbackExecutor,
    /// Sender used by `post`; becomes `None` once `shutdown_queues` takes it.
    async_tx: Mutex<Option<Sender<VibeEngineTask>>>,
    /// Sender used by `invoke`; becomes `None` once `shutdown_queues` takes it.
    sync_tx: Mutex<Option<Sender<VibeEngineTask>>>,
    /// Runtime handle used to block on queue sends and timed futures.
    rt: VibeRuntimeHandle,
    /// Receiver that `shutdown_queues` waits on after dropping both senders.
    shutdown_rx: Mutex<Receiver<()>>,
}
479
480impl VibeEngineExecutor {
481    pub(crate) fn new(
482        cb_pool: ThreadPool,
483        async_tx: Sender<VibeEngineTask>,
484        sync_tx: Sender<VibeEngineTask>,
485        rt: VibeRuntimeHandle,
486        shutdown_rx: Receiver<()>,
487    ) -> Self {
488        Self {
489            inner: Arc::new(VibeEngineExecutorInner {
490                callback_executor: VibeCallbackExecutor::new(cb_pool),
491                async_tx: Mutex::new(Some(async_tx)),
492                sync_tx: Mutex::new(Some(sync_tx)),
493                rt,
494                shutdown_rx: Mutex::new(shutdown_rx),
495            }),
496        }
497    }
498
499    /// Returns the callback executor associated with this engine.
500    ///
501    /// # Returns
502    ///
503    /// A cheap clone of [`VibeCallbackExecutor`].
504    pub fn callback(&self) -> VibeCallbackExecutor {
505        self.inner.callback_executor.clone()
506    }
507
508    pub(crate) fn block_on_with_timeout<T, Fut>(
509        &self,
510        future: Fut,
511        timeout: Duration,
512    ) -> Result<T, VibeEngineError>
513    where
514        T: Send + 'static,
515        Fut: Future<Output = Result<T, VibeEngineError>> + Send + 'static,
516    {
517        let handle = self.inner.rt.handle().clone();
518        self.run_blocking_on_engine_rt(move || {
519            handle.block_on(async move {
520                tokio::time::timeout(timeout, future).await.map_err(|_| {
521                    VibeEngineError::from_error_code(VibeEngineErrorCode::TimeoutError)
522                })?
523            })
524        })?
525    }
526
527    pub(crate) fn shutdown_queues(&self, timeout: Duration) -> Result<(), VibeEngineError> {
528        let async_tx = self
529            .inner
530            .async_tx
531            .lock()
532            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?
533            .take();
534        let sync_tx = self
535            .inner
536            .sync_tx
537            .lock()
538            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?
539            .take();
540
541        if async_tx.is_none() && sync_tx.is_none() {
542            return Ok(());
543        }
544
545        drop(async_tx);
546        drop(sync_tx);
547
548        let shutdown_rx = self
549            .inner
550            .shutdown_rx
551            .lock()
552            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?;
553        shutdown_rx
554            .recv_timeout(timeout)
555            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::TimeoutError))
556    }
557
558    fn run_blocking_on_engine_rt<R>(
559        &self,
560        f: impl FnOnce() -> R + Send + 'static,
561    ) -> Result<R, VibeEngineError>
562    where
563        R: Send + 'static,
564    {
565        let engine_rt_id = self.inner.rt.handle().id();
566        match Handle::try_current() {
567            Ok(current) if current.id() == engine_rt_id => {
568                std::panic::catch_unwind(AssertUnwindSafe(|| tokio::task::block_in_place(f)))
569                    .map_err(|payload| {
570                        log_e!(
571                            "run_blocking_on_engine_rt",
572                            DESC,
573                            format!(
574                                "engine runtime task panicked: {}",
575                                panic_payload_message(&payload)
576                            )
577                        );
578                        VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError)
579                    })
580            }
581            Ok(_) => {
582                let handle =
583                    std::thread::spawn(move || std::panic::catch_unwind(AssertUnwindSafe(f)));
584                match handle.join() {
585                    Ok(Ok(v)) => Ok(v),
586                    Ok(Err(payload)) => {
587                        log_e!(
588                            "run_blocking_on_engine_rt",
589                            DESC,
590                            format!(
591                                "helper thread task panicked: {}",
592                                panic_payload_message(&payload)
593                            )
594                        );
595                        Err(VibeEngineError::from_error_code(
596                            VibeEngineErrorCode::RuntimeError,
597                        ))
598                    }
599                    Err(payload) => {
600                        log_e!(
601                            "run_blocking_on_engine_rt",
602                            DESC,
603                            format!(
604                                "helper thread panicked: {}",
605                                panic_payload_message(&payload)
606                            )
607                        );
608                        Err(VibeEngineError::from_error_code(
609                            VibeEngineErrorCode::RuntimeError,
610                        ))
611                    }
612                }
613            }
614            Err(_) => std::panic::catch_unwind(AssertUnwindSafe(f)).map_err(|payload| {
615                log_e!(
616                    "run_blocking_on_engine_rt",
617                    DESC,
618                    format!(
619                        "non-runtime thread task panicked: {}",
620                        panic_payload_message(&payload)
621                    )
622                );
623                VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError)
624            }),
625        }
626    }
627
628    /// Runs a future on the engine synchronous queue and waits for its output.
629    ///
630    /// # Returns
631    ///
632    /// `Ok(F)` with the future output, or [`VibeEngineError`] if the task could
633    /// not be queued or the runtime failed while waiting.
634    pub fn invoke<T, F>(&self, future: T) -> Result<F, VibeEngineError>
635    where
636        T: Future<Output = F> + Send + 'static,
637        F: Send + 'static,
638    {
639        let (result_tx, result_rx) = oneshot::channel();
640        let invoke_future = async move {
641            let result = future.await;
642            let _ = result_tx.send(result);
643        };
644
645        let handle = self.inner.rt.handle().clone();
646        let sync_tx = self
647            .inner
648            .sync_tx
649            .lock()
650            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?
651            .clone()
652            .ok_or_else(|| VibeEngineError::from_code(VibeEngineErrorCode::PostError))?;
653        self.run_blocking_on_engine_rt(move || {
654            let task: VibeEngineTask = Box::pin(invoke_future);
655            if handle.block_on(sync_tx.send(task)).is_err() {
656                log_e!("invoke", DESC, "runtime handle block_on error");
657                return Err(VibeEngineError::from_code(VibeEngineErrorCode::PostError));
658            }
659            result_rx.blocking_recv().map_err(|e| {
660                log_e!("invoke", DESC, format!("blocking_recv error {}", e));
661                VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError)
662            })
663        })?
664    }
665
666    /// Posts a fire-and-forget future to the engine asynchronous queue.
667    ///
668    /// # Returns
669    ///
670    /// `Ok(())` after the future is queued, or [`VibeEngineError`] if queuing fails.
671    pub fn post<T>(&self, future: T) -> Result<(), VibeEngineError>
672    where
673        T: Future<Output = ()> + Send + 'static,
674    {
675        let handle = self.inner.rt.handle().clone();
676        let async_tx = self
677            .inner
678            .async_tx
679            .lock()
680            .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?
681            .clone()
682            .ok_or_else(|| VibeEngineError::from_code(VibeEngineErrorCode::PostError))?;
683        self.run_blocking_on_engine_rt(move || {
684            let task: VibeEngineTask = Box::pin(future);
685            handle.block_on(async_tx.send(task)).map_err(|error| {
686                log_e!("post", DESC, format!("send async task error: {}", error));
687                VibeEngineError::from_code(VibeEngineErrorCode::PostError)
688            })
689        })?
690    }
691}
692
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;

    /// `once3` must forward all three arguments to the wrapped callback.
    #[test]
    fn callback_executor_supports_multi_argument_wrappers() {
        let (result_tx, result_rx) = mpsc::channel();
        let callbacks = VibeCallbackExecutor::new(ThreadPool::new(1));
        let summing = callbacks.once3(move |a: i32, b: i32, c: i32| {
            result_tx.send(a + b + c).expect("send callback result");
        });

        summing(1, 2, 3);

        let received = result_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        assert_eq!(received, 6);
    }

    /// `boxed_str_fn4` must pass the string slice and the three owned values through.
    #[test]
    fn callback_executor_supports_boxed_string_wrappers() {
        let (result_tx, result_rx) = mpsc::channel();
        let callbacks = VibeCallbackExecutor::new(ThreadPool::new(1));
        let formatting = callbacks.boxed_str_fn4(move |name, a: i32, b: i32, c: i32| {
            result_tx
                .send(format!("{name}:{a}:{b}:{c}"))
                .expect("send callback result");
        });

        formatting("demo", 1, 2, 3);

        let received = result_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        assert_eq!(received, "demo:1:2:3");
    }
}
728
// Test-only module whose body is textually included from
// `<CARGO_MANIFEST_DIR>/test/unit/api/engine_executor_tests.rs`; the
// `use super::*` gives the included code access to this file's private items.
#[cfg(test)]
mod strict_tests {
    use super::*;
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/test/unit/api/engine_executor_tests.rs"
    ));
}
736}