tokio_uring/runtime/
mod.rs

1use std::future::Future;
2use std::io;
3use std::mem::ManuallyDrop;
4use tokio::io::unix::AsyncFd;
5use tokio::task::LocalSet;
6
7mod context;
8pub(crate) mod driver;
9
10pub(crate) use context::RuntimeContext;
11
12thread_local! {
13    pub(crate) static CONTEXT: RuntimeContext = RuntimeContext::new();
14}
15
16/// The Runtime Executor
17///
18/// This is the Runtime for `tokio-uring`.
19/// It wraps the default [`Runtime`] using the platform-specific Driver.
20///
21/// This executes futures and tasks within the current-thread only.
22///
23/// [`Runtime`]: tokio::runtime::Runtime
24pub struct Runtime {
25    /// Tokio runtime, always current-thread
26    tokio_rt: ManuallyDrop<tokio::runtime::Runtime>,
27
28    /// LocalSet for !Send tasks
29    local: ManuallyDrop<LocalSet>,
30
31    /// Strong reference to the driver.
32    driver: driver::Handle,
33}
34
35/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
36///
37/// Spawning a task enables the task to execute concurrently to other tasks.
38/// There is no guarantee that a spawned task will execute to completion. When a
39/// runtime is shutdown, all outstanding tasks are dropped, regardless of the
40/// lifecycle of that task.
41///
42/// This function must be called from the context of a `tokio-uring` runtime.
43///
44/// [`JoinHandle`]: tokio::task::JoinHandle
45///
46/// # Examples
47///
48/// In this example, a server is started and `spawn` is used to start a new task
49/// that processes each received connection.
50///
51/// ```no_run
52/// tokio_uring::start(async {
53///     let handle = tokio_uring::spawn(async {
54///         println!("hello from a background task");
55///     });
56///
57///     // Let the task complete
58///     handle.await.unwrap();
59/// });
60/// ```
61pub fn spawn<T: Future + 'static>(task: T) -> tokio::task::JoinHandle<T::Output> {
62    tokio::task::spawn_local(task)
63}
64
65impl Runtime {
66    /// Creates a new tokio_uring runtime on the current thread.
67    ///
68    /// This takes the tokio-uring [`Builder`](crate::Builder) as a parameter.
69    pub fn new(b: &crate::Builder) -> io::Result<Runtime> {
70        let rt = tokio::runtime::Builder::new_current_thread()
71            .on_thread_park(|| {
72                CONTEXT.with(|x| {
73                    let _ = x
74                        .handle()
75                        .expect("Internal error, driver context not present when invoking hooks")
76                        .flush();
77                });
78            })
79            .enable_all()
80            .build()?;
81
82        let tokio_rt = ManuallyDrop::new(rt);
83        let local = ManuallyDrop::new(LocalSet::new());
84        let driver = driver::Handle::new(b)?;
85
86        start_uring_wakes_task(&tokio_rt, &local, driver.clone());
87
88        Ok(Runtime {
89            local,
90            tokio_rt,
91            driver,
92        })
93    }
94
95    /// Runs a future to completion on the tokio-uring runtime. This is the
96    /// runtime's entry point.
97    ///
98    /// This runs the given future on the current thread, blocking until it is
99    /// complete, and yielding its resolved result. Any tasks, futures, or timers
100    /// which the future spawns internally will be executed on this runtime.
101    ///
102    /// Any spawned tasks will be suspended after `block_on` returns. Calling
103    /// `block_on` again will resume previously spawned tasks.
104    ///
105    /// # Panics
106    ///
107    /// This function panics if the provided future panics, or if called within an
108    /// asynchronous execution context.
109    /// Runs a future to completion on the current runtime.
110    pub fn block_on<F>(&self, future: F) -> F::Output
111    where
112        F: Future,
113    {
114        struct ContextGuard;
115
116        impl Drop for ContextGuard {
117            fn drop(&mut self) {
118                CONTEXT.with(|cx| cx.unset_driver());
119            }
120        }
121
122        CONTEXT.with(|cx| cx.set_handle(self.driver.clone()));
123
124        let _guard = ContextGuard;
125
126        tokio::pin!(future);
127
128        let res = self
129            .tokio_rt
130            .block_on(self.local.run_until(std::future::poll_fn(|cx| {
131                // assert!(drive.as_mut().poll(cx).is_pending());
132                future.as_mut().poll(cx)
133            })));
134
135        res
136    }
137}
138
139impl Drop for Runtime {
140    fn drop(&mut self) {
141        // drop tasks in correct order
142        unsafe {
143            ManuallyDrop::drop(&mut self.local);
144            ManuallyDrop::drop(&mut self.tokio_rt);
145        }
146    }
147}
148
149fn start_uring_wakes_task(
150    tokio_rt: &tokio::runtime::Runtime,
151    local: &LocalSet,
152    driver: driver::Handle,
153) {
154    let _guard = tokio_rt.enter();
155    let async_driver_handle = AsyncFd::new(driver).unwrap();
156
157    local.spawn_local(drive_uring_wakes(async_driver_handle));
158}
159
160async fn drive_uring_wakes(driver: AsyncFd<driver::Handle>) {
161    loop {
162        // Wait for read-readiness
163        let mut guard = driver.readable().await.unwrap();
164
165        guard.get_inner().dispatch_completions();
166
167        guard.clear_ready();
168    }
169}
170
#[cfg(test)]
mod test {
    use super::Runtime;
    use crate::builder;

    /// A trivial future can be driven to completion.
    #[test]
    fn block_on() {
        let runtime = Runtime::new(&builder()).unwrap();
        runtime.block_on(async {});
    }

    /// The same runtime can be entered more than once.
    #[test]
    fn block_on_twice() {
        let runtime = Runtime::new(&builder()).unwrap();
        for _ in 0..2 {
            runtime.block_on(async {});
        }
    }
}