1use crate::api::engine_error::{VibeEngineError, VibeEngineErrorCode};
2use crate::log::log_def::DESC;
3use crate::log_e;
4use std::any::Any;
5use std::future::Future;
6use std::panic::AssertUnwindSafe;
7use std::pin::Pin;
8use std::sync::mpsc::Receiver;
9use std::sync::{Arc, Mutex};
10use std::time::Duration;
11use threadpool::ThreadPool;
12use tokio::runtime::{Handle, Runtime};
13use tokio::sync::mpsc::Sender;
14use tokio::sync::oneshot;
15
16pub(crate) type VibeEngineTask = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
17
/// Turns a caught panic payload into a printable message.
///
/// A payload holding a `&str` or a `String` is returned as an owned `String`;
/// every other payload type produces the literal `"unknown panic payload"`.
///
/// The parameter is deliberately `&Box<dyn Any + Send>` rather than
/// `&(dyn Any + Send)`: with the latter, a call such as `f(&payload)` could
/// unsize the `Box` itself into the trait object and the downcasts below would
/// then never match.
fn panic_payload_message(payload: &Box<dyn Any + Send + 'static>) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "unknown panic payload".to_string())
}
27
28#[derive(Clone)]
29pub struct VibeCallbackExecutor {
48 cb_pool: ThreadPool,
49}
50
51impl VibeCallbackExecutor {
52 pub(crate) fn new(cb_pool: ThreadPool) -> Self {
53 Self { cb_pool }
54 }
55
56 pub fn execute<F>(&self, cb: F)
62 where
63 F: FnOnce() + Send + 'static,
64 {
65 self.cb_pool.execute(cb);
66 }
67
68 pub fn once<F, R>(&self, cb: F) -> impl FnOnce(R) + Send + 'static
74 where
75 F: FnOnce(R) + Send + 'static,
76 R: Send + 'static,
77 {
78 let executor = self.clone();
79 move |value| executor.execute(move || cb(value))
80 }
81
82 pub fn once2<F, R1, R2>(&self, cb: F) -> impl FnOnce(R1, R2) + Send + 'static
88 where
89 F: FnOnce(R1, R2) + Send + 'static,
90 R1: Send + 'static,
91 R2: Send + 'static,
92 {
93 let executor = self.clone();
94 move |value1, value2| executor.execute(move || cb(value1, value2))
95 }
96
97 pub fn once3<F, R1, R2, R3>(&self, cb: F) -> impl FnOnce(R1, R2, R3) + Send + 'static
103 where
104 F: FnOnce(R1, R2, R3) + Send + 'static,
105 R1: Send + 'static,
106 R2: Send + 'static,
107 R3: Send + 'static,
108 {
109 let executor = self.clone();
110 move |value1, value2, value3| executor.execute(move || cb(value1, value2, value3))
111 }
112
113 pub fn once3_boxed<F, R1, R2, R3>(
119 &self,
120 cb: F,
121 ) -> Box<dyn FnOnce(R1, R2, R3) + Send + Sync + 'static>
122 where
123 F: FnOnce(R1, R2, R3) + Send + Sync + 'static,
124 R1: Send + 'static,
125 R2: Send + 'static,
126 R3: Send + 'static,
127 {
128 let executor = self.clone();
129 Box::new(move |value1, value2, value3| {
130 executor.execute(move || cb(value1, value2, value3));
131 })
132 }
133
134 pub fn boxed_fn0<F>(&self, cb: F) -> Box<dyn Fn() + Send + Sync + 'static>
140 where
141 F: Fn() + Send + Sync + 'static,
142 {
143 let executor = self.clone();
144 let cb = Arc::new(cb);
145 Box::new(move || {
146 let cb_clone = Arc::clone(&cb);
147 executor.execute(move || cb_clone());
148 })
149 }
150
151 pub fn boxed_fn<F, R>(&self, cb: F) -> Box<dyn Fn(R) + Send + Sync + 'static>
157 where
158 F: Fn(R) + Send + Sync + 'static,
159 R: Send + 'static,
160 {
161 let executor = self.clone();
162 let cb = Arc::new(cb);
163 Box::new(move |value| {
164 let cb_clone = Arc::clone(&cb);
165 executor.execute(move || cb_clone(value));
166 })
167 }
168
169 pub fn boxed_fn2<F, R1, R2>(&self, cb: F) -> Box<dyn Fn(R1, R2) + Send + Sync + 'static>
175 where
176 F: Fn(R1, R2) + Send + Sync + 'static,
177 R1: Send + 'static,
178 R2: Send + 'static,
179 {
180 let executor = self.clone();
181 let cb = Arc::new(cb);
182 Box::new(move |value1, value2| {
183 let cb_clone = Arc::clone(&cb);
184 executor.execute(move || cb_clone(value1, value2));
185 })
186 }
187
188 pub fn fn2<F, R1, R2>(&self, cb: F) -> impl Fn(R1, R2) + Send + Sync + 'static
194 where
195 F: Fn(R1, R2) + Send + Sync + 'static,
196 R1: Send + 'static,
197 R2: Send + 'static,
198 {
199 let executor = self.clone();
200 let cb = Arc::new(cb);
201 move |value1, value2| {
202 let cb_clone = Arc::clone(&cb);
203 executor.execute(move || cb_clone(value1, value2));
204 }
205 }
206
207 pub fn fn2_cloneable<F, R1, R2>(&self, cb: F) -> impl Fn(R1, R2) + Clone + Send + Sync + 'static
213 where
214 F: Fn(R1, R2) + Clone + Send + Sync + 'static,
215 R1: Send + 'static,
216 R2: Send + 'static,
217 {
218 let executor = self.clone();
219 let cb = Arc::new(cb);
220 move |value1, value2| {
221 let cb_clone = Arc::clone(&cb);
222 executor.execute(move || cb_clone(value1, value2));
223 }
224 }
225
226 pub fn boxed_fn3<F, R1, R2, R3>(&self, cb: F) -> Box<dyn Fn(R1, R2, R3) + Send + Sync + 'static>
232 where
233 F: Fn(R1, R2, R3) + Send + Sync + 'static,
234 R1: Send + 'static,
235 R2: Send + 'static,
236 R3: Send + 'static,
237 {
238 let executor = self.clone();
239 let cb = Arc::new(cb);
240 Box::new(move |value1, value2, value3| {
241 let cb_clone = Arc::clone(&cb);
242 executor.execute(move || cb_clone(value1, value2, value3));
243 })
244 }
245
246 pub fn boxed_fn4<F, R1, R2, R3, R4>(
252 &self,
253 cb: F,
254 ) -> Box<dyn Fn(R1, R2, R3, R4) + Send + Sync + 'static>
255 where
256 F: Fn(R1, R2, R3, R4) + Send + Sync + 'static,
257 R1: Send + 'static,
258 R2: Send + 'static,
259 R3: Send + 'static,
260 R4: Send + 'static,
261 {
262 let executor = self.clone();
263 let cb = Arc::new(cb);
264 Box::new(move |value1, value2, value3, value4| {
265 let cb_clone = Arc::clone(&cb);
266 executor.execute(move || cb_clone(value1, value2, value3, value4));
267 })
268 }
269
270 pub fn boxed_fn5<F, R1, R2, R3, R4, R5>(
276 &self,
277 cb: F,
278 ) -> Box<dyn Fn(R1, R2, R3, R4, R5) + Send + Sync + 'static>
279 where
280 F: Fn(R1, R2, R3, R4, R5) + Send + Sync + 'static,
281 R1: Send + 'static,
282 R2: Send + 'static,
283 R3: Send + 'static,
284 R4: Send + 'static,
285 R5: Send + 'static,
286 {
287 let executor = self.clone();
288 let cb = Arc::new(cb);
289 Box::new(move |value1, value2, value3, value4, value5| {
290 let cb_clone = Arc::clone(&cb);
291 executor.execute(move || cb_clone(value1, value2, value3, value4, value5));
292 })
293 }
294
295 pub fn boxed_ref_fn2<F, R1, R2>(&self, cb: F) -> Box<dyn Fn(&R1, R2) + Send + Sync + 'static>
302 where
303 F: Fn(&R1, R2) + Send + Sync + 'static,
304 R1: Clone + Send + 'static,
305 R2: Send + 'static,
306 {
307 let executor = self.clone();
308 let cb = Arc::new(cb);
309 Box::new(move |value1, value2| {
310 let owned_value1 = value1.clone();
311 let cb_clone = Arc::clone(&cb);
312 executor.execute(move || cb_clone(&owned_value1, value2));
313 })
314 }
315
316 pub fn boxed_str_fn<F>(&self, cb: F) -> Box<dyn Fn(&str) + Send + Sync + 'static>
322 where
323 F: Fn(&str) + Send + Sync + 'static,
324 {
325 let executor = self.clone();
326 let cb = Arc::new(cb);
327 Box::new(move |value| {
328 let owned = value.to_string();
329 let cb_clone = Arc::clone(&cb);
330 executor.execute(move || cb_clone(&owned));
331 })
332 }
333
334 pub fn boxed_str_fn2<F, R2>(&self, cb: F) -> Box<dyn Fn(&str, R2) + Send + Sync + 'static>
340 where
341 F: Fn(&str, R2) + Send + Sync + 'static,
342 R2: Send + 'static,
343 {
344 let executor = self.clone();
345 let cb = Arc::new(cb);
346 Box::new(move |value, value2| {
347 let owned = value.to_string();
348 let cb_clone = Arc::clone(&cb);
349 executor.execute(move || cb_clone(&owned, value2));
350 })
351 }
352
353 pub fn boxed_str_fn3<F, R2, R3>(
359 &self,
360 cb: F,
361 ) -> Box<dyn Fn(&str, R2, R3) + Send + Sync + 'static>
362 where
363 F: Fn(&str, R2, R3) + Send + Sync + 'static,
364 R2: Send + 'static,
365 R3: Send + 'static,
366 {
367 let executor = self.clone();
368 let cb = Arc::new(cb);
369 Box::new(move |value, value2, value3| {
370 let owned = value.to_string();
371 let cb_clone = Arc::clone(&cb);
372 executor.execute(move || cb_clone(&owned, value2, value3));
373 })
374 }
375
376 pub fn boxed_str_fn4<F, R2, R3, R4>(
382 &self,
383 cb: F,
384 ) -> Box<dyn Fn(&str, R2, R3, R4) + Send + Sync + 'static>
385 where
386 F: Fn(&str, R2, R3, R4) + Send + Sync + 'static,
387 R2: Send + 'static,
388 R3: Send + 'static,
389 R4: Send + 'static,
390 {
391 let executor = self.clone();
392 let cb = Arc::new(cb);
393 Box::new(move |value, value2, value3, value4| {
394 let owned = value.to_string();
395 let cb_clone = Arc::clone(&cb);
396 executor.execute(move || cb_clone(&owned, value2, value3, value4));
397 })
398 }
399
400 pub fn boxed_str_str4_fn<F, R2, R3>(
407 &self,
408 cb: F,
409 ) -> Box<dyn Fn(&str, R2, R3, &str) + Send + Sync + 'static>
410 where
411 F: Fn(&str, R2, R3, &str) + Send + Sync + 'static,
412 R2: Send + 'static,
413 R3: Send + 'static,
414 {
415 let executor = self.clone();
416 let cb = Arc::new(cb);
417 Box::new(move |value1, value2, value3, value4| {
418 let owned_value1 = value1.to_string();
419 let owned_value4 = value4.to_string();
420 let cb_clone = Arc::clone(&cb);
421 executor.execute(move || cb_clone(&owned_value1, value2, value3, &owned_value4));
422 })
423 }
424}
425
426#[derive(Clone)]
427pub struct VibeEngineExecutor {
444 inner: Arc<VibeEngineExecutorInner>,
445}
446
447pub(crate) struct VibeRuntimeHandle {
448 handle: Handle,
449 _runtime: Option<Arc<Runtime>>,
450}
451
452impl VibeRuntimeHandle {
453 pub(crate) fn owned(runtime: Arc<Runtime>) -> Self {
454 Self {
455 handle: runtime.handle().clone(),
456 _runtime: Some(runtime),
457 }
458 }
459
460 pub(crate) fn external(handle: Handle) -> Self {
461 Self {
462 handle,
463 _runtime: None,
464 }
465 }
466
467 fn handle(&self) -> &Handle {
468 &self.handle
469 }
470}
471
472struct VibeEngineExecutorInner {
473 callback_executor: VibeCallbackExecutor,
474 async_tx: Mutex<Option<Sender<VibeEngineTask>>>,
475 sync_tx: Mutex<Option<Sender<VibeEngineTask>>>,
476 rt: VibeRuntimeHandle,
477 shutdown_rx: Mutex<Receiver<()>>,
478}
479
480impl VibeEngineExecutor {
481 pub(crate) fn new(
482 cb_pool: ThreadPool,
483 async_tx: Sender<VibeEngineTask>,
484 sync_tx: Sender<VibeEngineTask>,
485 rt: VibeRuntimeHandle,
486 shutdown_rx: Receiver<()>,
487 ) -> Self {
488 Self {
489 inner: Arc::new(VibeEngineExecutorInner {
490 callback_executor: VibeCallbackExecutor::new(cb_pool),
491 async_tx: Mutex::new(Some(async_tx)),
492 sync_tx: Mutex::new(Some(sync_tx)),
493 rt,
494 shutdown_rx: Mutex::new(shutdown_rx),
495 }),
496 }
497 }
498
499 pub fn callback(&self) -> VibeCallbackExecutor {
505 self.inner.callback_executor.clone()
506 }
507
508 pub(crate) fn block_on_with_timeout<T, Fut>(
509 &self,
510 future: Fut,
511 timeout: Duration,
512 ) -> Result<T, VibeEngineError>
513 where
514 T: Send + 'static,
515 Fut: Future<Output = Result<T, VibeEngineError>> + Send + 'static,
516 {
517 let handle = self.inner.rt.handle().clone();
518 self.run_blocking_on_engine_rt(move || {
519 handle.block_on(async move {
520 tokio::time::timeout(timeout, future).await.map_err(|_| {
521 VibeEngineError::from_error_code(VibeEngineErrorCode::TimeoutError)
522 })?
523 })
524 })?
525 }
526
527 pub(crate) fn shutdown_queues(&self, timeout: Duration) -> Result<(), VibeEngineError> {
528 let async_tx = self
529 .inner
530 .async_tx
531 .lock()
532 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?
533 .take();
534 let sync_tx = self
535 .inner
536 .sync_tx
537 .lock()
538 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?
539 .take();
540
541 if async_tx.is_none() && sync_tx.is_none() {
542 return Ok(());
543 }
544
545 drop(async_tx);
546 drop(sync_tx);
547
548 let shutdown_rx = self
549 .inner
550 .shutdown_rx
551 .lock()
552 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?;
553 shutdown_rx
554 .recv_timeout(timeout)
555 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::TimeoutError))
556 }
557
558 fn run_blocking_on_engine_rt<R>(
559 &self,
560 f: impl FnOnce() -> R + Send + 'static,
561 ) -> Result<R, VibeEngineError>
562 where
563 R: Send + 'static,
564 {
565 let engine_rt_id = self.inner.rt.handle().id();
566 match Handle::try_current() {
567 Ok(current) if current.id() == engine_rt_id => {
568 std::panic::catch_unwind(AssertUnwindSafe(|| tokio::task::block_in_place(f)))
569 .map_err(|payload| {
570 log_e!(
571 "run_blocking_on_engine_rt",
572 DESC,
573 format!(
574 "engine runtime task panicked: {}",
575 panic_payload_message(&payload)
576 )
577 );
578 VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError)
579 })
580 }
581 Ok(_) => {
582 let handle =
583 std::thread::spawn(move || std::panic::catch_unwind(AssertUnwindSafe(f)));
584 match handle.join() {
585 Ok(Ok(v)) => Ok(v),
586 Ok(Err(payload)) => {
587 log_e!(
588 "run_blocking_on_engine_rt",
589 DESC,
590 format!(
591 "helper thread task panicked: {}",
592 panic_payload_message(&payload)
593 )
594 );
595 Err(VibeEngineError::from_error_code(
596 VibeEngineErrorCode::RuntimeError,
597 ))
598 }
599 Err(payload) => {
600 log_e!(
601 "run_blocking_on_engine_rt",
602 DESC,
603 format!(
604 "helper thread panicked: {}",
605 panic_payload_message(&payload)
606 )
607 );
608 Err(VibeEngineError::from_error_code(
609 VibeEngineErrorCode::RuntimeError,
610 ))
611 }
612 }
613 }
614 Err(_) => std::panic::catch_unwind(AssertUnwindSafe(f)).map_err(|payload| {
615 log_e!(
616 "run_blocking_on_engine_rt",
617 DESC,
618 format!(
619 "non-runtime thread task panicked: {}",
620 panic_payload_message(&payload)
621 )
622 );
623 VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError)
624 }),
625 }
626 }
627
628 pub fn invoke<T, F>(&self, future: T) -> Result<F, VibeEngineError>
635 where
636 T: Future<Output = F> + Send + 'static,
637 F: Send + 'static,
638 {
639 let (result_tx, result_rx) = oneshot::channel();
640 let invoke_future = async move {
641 let result = future.await;
642 let _ = result_tx.send(result);
643 };
644
645 let handle = self.inner.rt.handle().clone();
646 let sync_tx = self
647 .inner
648 .sync_tx
649 .lock()
650 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?
651 .clone()
652 .ok_or_else(|| VibeEngineError::from_code(VibeEngineErrorCode::PostError))?;
653 self.run_blocking_on_engine_rt(move || {
654 let task: VibeEngineTask = Box::pin(invoke_future);
655 if handle.block_on(sync_tx.send(task)).is_err() {
656 log_e!("invoke", DESC, "runtime handle block_on error");
657 return Err(VibeEngineError::from_code(VibeEngineErrorCode::PostError));
658 }
659 result_rx.blocking_recv().map_err(|e| {
660 log_e!("invoke", DESC, format!("blocking_recv error {}", e));
661 VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError)
662 })
663 })?
664 }
665
666 pub fn post<T>(&self, future: T) -> Result<(), VibeEngineError>
672 where
673 T: Future<Output = ()> + Send + 'static,
674 {
675 let handle = self.inner.rt.handle().clone();
676 let async_tx = self
677 .inner
678 .async_tx
679 .lock()
680 .map_err(|_| VibeEngineError::from_error_code(VibeEngineErrorCode::RuntimeError))?
681 .clone()
682 .ok_or_else(|| VibeEngineError::from_code(VibeEngineErrorCode::PostError))?;
683 self.run_blocking_on_engine_rt(move || {
684 let task: VibeEngineTask = Box::pin(future);
685 handle.block_on(async_tx.send(task)).map_err(|error| {
686 log_e!("post", DESC, format!("send async task error: {}", error));
687 VibeEngineError::from_code(VibeEngineErrorCode::PostError)
688 })
689 })?
690 }
691}
692
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;

    /// `once3` must forward all three arguments to the wrapped callback.
    #[test]
    fn callback_executor_supports_multi_argument_wrappers() {
        let (result_tx, result_rx) = mpsc::channel();
        let executor = VibeCallbackExecutor::new(ThreadPool::new(1));
        let wrapped = executor.once3(move |a: i32, b: i32, c: i32| {
            result_tx.send(a + b + c).expect("send callback result");
        });

        wrapped(1, 2, 3);

        let sum = result_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        assert_eq!(sum, 6);
    }

    /// `boxed_str_fn4` must hand the string and the three values to the callback.
    #[test]
    fn callback_executor_supports_boxed_string_wrappers() {
        let (result_tx, result_rx) = mpsc::channel();
        let executor = VibeCallbackExecutor::new(ThreadPool::new(1));
        let wrapped = executor.boxed_str_fn4(move |name, a: i32, b: i32, c: i32| {
            result_tx
                .send(format!("{name}:{a}:{b}:{c}"))
                .expect("send callback result");
        });

        wrapped("demo", 1, 2, 3);

        let rendered = result_rx.recv_timeout(Duration::from_secs(1)).unwrap();
        assert_eq!(rendered, "demo:1:2:3");
    }
}
728
// Pulls the external unit-test file into this module so its tests can reach
// this file's private items through `super::*`.
#[cfg(test)]
mod strict_tests {
    use super::*;

    // The path is resolved relative to the crate's manifest directory at compile time.
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/test/unit/api/engine_executor_tests.rs"
    ));
}
736}