streamling_plugin/
async.rs1#![allow(clippy::missing_const_for_thread_local)]
2
3use std::cell::RefCell;
4use std::sync::Arc;
5
6use abi_stable::derive_macro_reexports::{RErr, ROk, RResult, TD_Opaque};
7use abi_stable::sabi_trait;
8use abi_stable::std_types::{RBox, RDuration};
9use abi_stable::traits::IntoReprRust;
10use async_ffi::{FfiFuture, FutureExt as AsyncFfiFutureExt};
11use tokio::runtime::{EnterGuard, Handle, Runtime};
12use tracing::info;
13
thread_local! {
    /// Per-thread slot for the Tokio runtime that `DirectTokioProxy::new` builds
    /// when it finds no runtime to reuse. `None` until that happens on this thread.
    static RUNTIME: RefCell<Option<Arc<Runtime>>> = RefCell::new(None);
    /// Per-thread slot for the enter guard obtained from the runtime stored in
    /// `RUNTIME`. Keeping the guard here keeps the runtime context entered on this
    /// thread after `DirectTokioProxy::new` returns.
    ///
    /// The `'static` lifetime is produced by a transmute in `DirectTokioProxy::new`.
    // NOTE(review): the guard is meant to be dropped before the runtime in `RUNTIME`;
    // nothing in this file enforces the relative destruction order of the two slots.
    static RUNTIME_ENTER_GUARD: RefCell<Option<EnterGuard<'static>>> = RefCell::new(None);
}
19
/// FFI-safe abstraction over an async runtime, expressed with `abi_stable` and
/// `async_ffi` types so it can be passed as a trait object
/// (see `PluginAsyncRuntimeObj`).
#[sabi_trait]
pub trait PluginAsyncRuntime: Send + Sync + Clone {
    /// Runs `fut` as a task on the runtime; the returned future is the caller's
    /// handle for waiting on that task.
    fn spawn(&self, fut: FfiFuture<()>) -> FfiFuture<()>;
    /// Returns a future that completes once `dur` has elapsed.
    fn sleep(&self, dur: RDuration) -> FfiFuture<()>;
    /// Runs `fut` with a time limit of `dur`.
    ///
    /// In the `DirectTokioProxy` implementation the result is `ROk(())` when `fut`
    /// finished in time and `RErr(())` when the limit was hit first.
    fn timeout(&self, dur: RDuration, fut: FfiFuture<()>) -> FfiFuture<RResult<(), ()>>;
    /// Drives `fut` to completion, blocking the calling thread until it is done.
    fn block_on(&self, fut: FfiFuture<()>);
    /// Returns a future that yields control back to the scheduler once.
    fn yield_now(&self) -> FfiFuture<()>;
}
30
/// Owned, `'static` trait object for [`PluginAsyncRuntime`], boxed in an
/// `abi_stable` `RBox`. Produced by `DirectTokioProxy::into_async_runtime_obj`.
pub type PluginAsyncRuntimeObj = PluginAsyncRuntime_TO<'static, RBox<()>>;
32
/// [`PluginAsyncRuntime`] implementation that forwards to a Tokio runtime through
/// a cloned [`Handle`].
#[derive(Clone)]
pub struct DirectTokioProxy {
    /// Handle of the Tokio runtime selected by `DirectTokioProxy::new`.
    handle: Handle,
}
39
40impl DirectTokioProxy {
41 pub fn new() -> Self {
42 match Handle::try_current() {
43 Ok(handle) => {
44 info!("Using existing Tokio runtime");
45 Self { handle }
46 }
47 Err(_) => {
48 let existing_runtime = RUNTIME.with(|r| r.borrow().clone());
49
50 if let Some(runtime) = existing_runtime {
51 info!("Using existing thread-local Tokio runtime");
52 let handle = runtime.handle().clone();
53 Self { handle }
54 } else {
55 info!("No existing Tokio runtime found, creating new runtime");
56 let runtime = tokio::runtime::Builder::new_multi_thread()
57 .enable_all()
60 .build()
61 .expect("Failed to create Tokio runtime");
62 let handle = runtime.handle().clone();
63
64 let runtime_arc = Arc::new(runtime);
65
66 let guard = unsafe {
67 std::mem::transmute::<EnterGuard<'_>, EnterGuard<'static>>(
69 runtime_arc.enter(),
70 )
71 };
72
73 RUNTIME.with(|r| {
74 *r.borrow_mut() = Some(runtime_arc);
75 });
76 RUNTIME_ENTER_GUARD.with(|g| {
77 *g.borrow_mut() = Some(guard);
78 });
79
80 Self { handle }
81 }
82 }
83 }
84 }
85
86 pub fn into_async_runtime_obj(self) -> PluginAsyncRuntimeObj {
87 PluginAsyncRuntime_TO::from_value(self, TD_Opaque)
88 }
89}
90
91impl Default for DirectTokioProxy {
92 fn default() -> Self {
93 Self::new()
94 }
95}
96
97impl PluginAsyncRuntime for DirectTokioProxy {
98 fn spawn(&self, fut: FfiFuture<()>) -> FfiFuture<()> {
99 let handle = self.handle.clone();
100 async move {
101 if let Err(e) = handle.spawn(fut).await {
102 panic!("Spawned task panicked: {e:?}");
103 }
104 }
105 .into_ffi()
106 }
107
108 fn sleep(&self, dur: RDuration) -> FfiFuture<()> {
109 async move { tokio::time::sleep(dur.into_rust()).await }.into_ffi()
110 }
111
112 fn timeout(&self, dur: RDuration, fut: FfiFuture<()>) -> FfiFuture<RResult<(), ()>> {
113 async move {
114 match tokio::time::timeout(dur.into_rust(), fut).await {
115 Ok(_) => ROk(()),
116 Err(_) => RErr(()),
117 }
118 }
119 .into_ffi()
120 }
121
122 fn block_on(&self, fut: FfiFuture<()>) {
123 self.handle.block_on(fut);
124 }
125
126 fn yield_now(&self) -> FfiFuture<()> {
127 tokio::task::yield_now().into_ffi()
128 }
129}