tokio_uring/runtime/mod.rs
1use std::future::Future;
2use std::io;
3use std::mem::ManuallyDrop;
4use tokio::io::unix::AsyncFd;
5use tokio::task::LocalSet;
6
7mod context;
8pub(crate) mod driver;
9
10pub(crate) use context::RuntimeContext;
11
thread_local! {
    /// Per-thread runtime context.
    ///
    /// `Runtime::block_on` installs the driver handle here for the duration of
    /// the call, and the thread-park hook registered in `Runtime::new` reads it
    /// back to flush the driver.
    pub(crate) static CONTEXT: RuntimeContext = RuntimeContext::new();
}
15
16/// The Runtime Executor
17///
18/// This is the Runtime for `tokio-uring`.
19/// It wraps the default [`Runtime`] using the platform-specific Driver.
20///
21/// This executes futures and tasks within the current-thread only.
22///
23/// [`Runtime`]: tokio::runtime::Runtime
24pub struct Runtime {
25 /// Tokio runtime, always current-thread
26 tokio_rt: ManuallyDrop<tokio::runtime::Runtime>,
27
28 /// LocalSet for !Send tasks
29 local: ManuallyDrop<LocalSet>,
30
31 /// Strong reference to the driver.
32 driver: driver::Handle,
33}
34
35/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
36///
37/// Spawning a task enables the task to execute concurrently to other tasks.
38/// There is no guarantee that a spawned task will execute to completion. When a
39/// runtime is shutdown, all outstanding tasks are dropped, regardless of the
40/// lifecycle of that task.
41///
42/// This function must be called from the context of a `tokio-uring` runtime.
43///
44/// [`JoinHandle`]: tokio::task::JoinHandle
45///
46/// # Examples
47///
48/// In this example, a server is started and `spawn` is used to start a new task
49/// that processes each received connection.
50///
51/// ```no_run
52/// tokio_uring::start(async {
53/// let handle = tokio_uring::spawn(async {
54/// println!("hello from a background task");
55/// });
56///
57/// // Let the task complete
58/// handle.await.unwrap();
59/// });
60/// ```
61pub fn spawn<T: Future + 'static>(task: T) -> tokio::task::JoinHandle<T::Output> {
62 tokio::task::spawn_local(task)
63}
64
65impl Runtime {
66 /// Creates a new tokio_uring runtime on the current thread.
67 ///
68 /// This takes the tokio-uring [`Builder`](crate::Builder) as a parameter.
69 pub fn new(b: &crate::Builder) -> io::Result<Runtime> {
70 let rt = tokio::runtime::Builder::new_current_thread()
71 .on_thread_park(|| {
72 CONTEXT.with(|x| {
73 let _ = x
74 .handle()
75 .expect("Internal error, driver context not present when invoking hooks")
76 .flush();
77 });
78 })
79 .enable_all()
80 .build()?;
81
82 let tokio_rt = ManuallyDrop::new(rt);
83 let local = ManuallyDrop::new(LocalSet::new());
84 let driver = driver::Handle::new(b)?;
85
86 start_uring_wakes_task(&tokio_rt, &local, driver.clone());
87
88 Ok(Runtime {
89 local,
90 tokio_rt,
91 driver,
92 })
93 }
94
95 /// Runs a future to completion on the tokio-uring runtime. This is the
96 /// runtime's entry point.
97 ///
98 /// This runs the given future on the current thread, blocking until it is
99 /// complete, and yielding its resolved result. Any tasks, futures, or timers
100 /// which the future spawns internally will be executed on this runtime.
101 ///
102 /// Any spawned tasks will be suspended after `block_on` returns. Calling
103 /// `block_on` again will resume previously spawned tasks.
104 ///
105 /// # Panics
106 ///
107 /// This function panics if the provided future panics, or if called within an
108 /// asynchronous execution context.
109 /// Runs a future to completion on the current runtime.
110 pub fn block_on<F>(&self, future: F) -> F::Output
111 where
112 F: Future,
113 {
114 struct ContextGuard;
115
116 impl Drop for ContextGuard {
117 fn drop(&mut self) {
118 CONTEXT.with(|cx| cx.unset_driver());
119 }
120 }
121
122 CONTEXT.with(|cx| cx.set_handle(self.driver.clone()));
123
124 let _guard = ContextGuard;
125
126 tokio::pin!(future);
127
128 let res = self
129 .tokio_rt
130 .block_on(self.local.run_until(std::future::poll_fn(|cx| {
131 // assert!(drive.as_mut().poll(cx).is_pending());
132 future.as_mut().poll(cx)
133 })));
134
135 res
136 }
137}
138
139impl Drop for Runtime {
140 fn drop(&mut self) {
141 // drop tasks in correct order
142 unsafe {
143 ManuallyDrop::drop(&mut self.local);
144 ManuallyDrop::drop(&mut self.tokio_rt);
145 }
146 }
147}
148
149fn start_uring_wakes_task(
150 tokio_rt: &tokio::runtime::Runtime,
151 local: &LocalSet,
152 driver: driver::Handle,
153) {
154 let _guard = tokio_rt.enter();
155 let async_driver_handle = AsyncFd::new(driver).unwrap();
156
157 local.spawn_local(drive_uring_wakes(async_driver_handle));
158}
159
160async fn drive_uring_wakes(driver: AsyncFd<driver::Handle>) {
161 loop {
162 // Wait for read-readiness
163 let mut guard = driver.readable().await.unwrap();
164
165 guard.get_inner().dispatch_completions();
166
167 guard.clear_ready();
168 }
169}
170
#[cfg(test)]
mod test {
    use super::Runtime;
    use crate::builder;

    /// A freshly built runtime can drive a trivial future to completion.
    #[test]
    fn block_on() {
        let runtime = Runtime::new(&builder()).unwrap();
        runtime.block_on(async {});
    }

    /// The same runtime can be entered more than once.
    #[test]
    fn block_on_twice() {
        let runtime = Runtime::new(&builder()).unwrap();
        for _ in 0..2 {
            runtime.block_on(async {});
        }
    }
}
189}