Skip to main content

streamling_plugin/
async.rs

1#![allow(clippy::missing_const_for_thread_local)]
2
3use std::cell::RefCell;
4use std::sync::Arc;
5
6use abi_stable::derive_macro_reexports::{RErr, ROk, RResult, TD_Opaque};
7use abi_stable::sabi_trait;
8use abi_stable::std_types::{RBox, RDuration};
9use abi_stable::traits::IntoReprRust;
10use async_ffi::{FfiFuture, FutureExt as AsyncFfiFutureExt};
11use tokio::runtime::{EnterGuard, Handle, Runtime};
12use tracing::info;
13
14thread_local! {
15    // The runtime will be unique for each plugin library
16    static RUNTIME: RefCell<Option<Arc<Runtime>>> = RefCell::new(None);
17    static RUNTIME_ENTER_GUARD: RefCell<Option<EnterGuard<'static>>> = RefCell::new(None);
18}
19
/// Async runtime for performing asynchronous operations in the plugin.
/// See [`PluginTokioWrapper`] on the host side for FFI-safe implementation.
///
/// All futures cross the interface as [`FfiFuture`] values and durations as [`RDuration`].
/// The `#[sabi_trait]` attribute generates the `PluginAsyncRuntime_TO` trait object that
/// [`PluginAsyncRuntimeObj`] aliases.
#[sabi_trait]
pub trait PluginAsyncRuntime: Send + Sync + Clone {
    /// Runs `fut` as a task on the runtime.
    ///
    /// The returned future is the means of waiting for that task; in `DirectTokioProxy` it
    /// resolves once the task has finished.
    fn spawn(&self, fut: FfiFuture<()>) -> FfiFuture<()>;
    /// Returns a future that completes once `dur` has elapsed.
    fn sleep(&self, dur: RDuration) -> FfiFuture<()>;
    /// Runs `fut` with a time limit of `dur`.
    ///
    /// In `DirectTokioProxy` the result is `ROk(())` when `fut` completes within the limit and
    /// `RErr(())` when the limit expires first.
    fn timeout(&self, dur: RDuration, fut: FfiFuture<()>) -> FfiFuture<RResult<(), ()>>;
    /// Drives `fut` to completion synchronously; does not return a future.
    fn block_on(&self, fut: FfiFuture<()>);
    /// Returns a future that yields control back to the runtime's scheduler.
    fn yield_now(&self) -> FfiFuture<()>;
}
30
/// Trait object for [`PluginAsyncRuntime`] generated by `#[sabi_trait]`, stored behind an `RBox`.
/// `DirectTokioProxy::into_async_runtime_obj` produces a value of this type.
pub type PluginAsyncRuntimeObj = PluginAsyncRuntime_TO<'static, RBox<()>>;
32
/// A special implementation of `PluginAsyncRuntime` that uses Tokio's runtime directly.
///
/// [`DirectTokioProxy::new`] reuses the Tokio runtime that is current on the calling thread when
/// there is one; otherwise a complete Tokio runtime is created and used by the plugin.
#[derive(Clone)]
pub struct DirectTokioProxy {
    /// Handle to the Tokio runtime that `spawn` and `block_on` run futures on.
    handle: Handle,
}
39
40impl DirectTokioProxy {
41    pub fn new() -> Self {
42        match Handle::try_current() {
43            Ok(handle) => {
44                info!("Using existing Tokio runtime");
45                Self { handle }
46            }
47            Err(_) => {
48                let existing_runtime = RUNTIME.with(|r| r.borrow().clone());
49
50                if let Some(runtime) = existing_runtime {
51                    info!("Using existing thread-local Tokio runtime");
52                    let handle = runtime.handle().clone();
53                    Self { handle }
54                } else {
55                    info!("No existing Tokio runtime found, creating new runtime");
56                    let runtime = tokio::runtime::Builder::new_multi_thread()
57                        // TODO: should this be configurable?
58                        // .worker_threads(num_workers)
59                        .enable_all()
60                        .build()
61                        .expect("Failed to create Tokio runtime");
62                    let handle = runtime.handle().clone();
63
64                    let runtime_arc = Arc::new(runtime);
65
66                    let guard = unsafe {
67                        // SAFETY: We're storing the guard in thread-local storage with the same lifetime as the runtime
68                        std::mem::transmute::<EnterGuard<'_>, EnterGuard<'static>>(
69                            runtime_arc.enter(),
70                        )
71                    };
72
73                    RUNTIME.with(|r| {
74                        *r.borrow_mut() = Some(runtime_arc);
75                    });
76                    RUNTIME_ENTER_GUARD.with(|g| {
77                        *g.borrow_mut() = Some(guard);
78                    });
79
80                    Self { handle }
81                }
82            }
83        }
84    }
85
86    pub fn into_async_runtime_obj(self) -> PluginAsyncRuntimeObj {
87        PluginAsyncRuntime_TO::from_value(self, TD_Opaque)
88    }
89}
90
91impl Default for DirectTokioProxy {
92    fn default() -> Self {
93        Self::new()
94    }
95}
96
97impl PluginAsyncRuntime for DirectTokioProxy {
98    fn spawn(&self, fut: FfiFuture<()>) -> FfiFuture<()> {
99        let handle = self.handle.clone();
100        async move {
101            if let Err(e) = handle.spawn(fut).await {
102                panic!("Spawned task panicked: {e:?}");
103            }
104        }
105        .into_ffi()
106    }
107
108    fn sleep(&self, dur: RDuration) -> FfiFuture<()> {
109        async move { tokio::time::sleep(dur.into_rust()).await }.into_ffi()
110    }
111
112    fn timeout(&self, dur: RDuration, fut: FfiFuture<()>) -> FfiFuture<RResult<(), ()>> {
113        async move {
114            match tokio::time::timeout(dur.into_rust(), fut).await {
115                Ok(_) => ROk(()),
116                Err(_) => RErr(()),
117            }
118        }
119        .into_ffi()
120    }
121
122    fn block_on(&self, fut: FfiFuture<()>) {
123        self.handle.block_on(fut);
124    }
125
126    fn yield_now(&self) -> FfiFuture<()> {
127        tokio::task::yield_now().into_ffi()
128    }
129}