vortex_io/runtime/
single.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::rc::Rc;
5use std::sync::Arc;
6
7use futures::future::BoxFuture;
8use futures::stream::LocalBoxStream;
9use futures::{Stream, StreamExt};
10use parking_lot::Mutex;
11use smol::LocalExecutor;
12use vortex_error::vortex_panic;
13
14use crate::runtime::smol::SmolAbortHandle;
15use crate::runtime::{AbortHandle, AbortHandleRef, BlockingRuntime, Executor, Handle, IoTask};
16
17/// A runtime that drives all work on the current thread.
18///
19/// This is subtly different from using a current-thread runtime to drive a future since it is
20/// capable of running `!Send` I/O futures.
21pub struct SingleThreadRuntime {
22    sender: Arc<Sender>,
23    executor: Rc<LocalExecutor<'static>>,
24}
25
26impl Default for SingleThreadRuntime {
27    fn default() -> Self {
28        let executor = Rc::new(LocalExecutor::new());
29        let sender = Arc::new(Sender::new(&executor));
30        Self { sender, executor }
31    }
32}
33
34struct Sender {
35    scheduling: kanal::Sender<SpawnAsync<'static>>,
36    cpu: kanal::Sender<SpawnSync<'static>>,
37    blocking: kanal::Sender<SpawnSync<'static>>,
38    io: kanal::Sender<IoTask>,
39}
40
41impl Sender {
42    fn new(local: &Rc<LocalExecutor<'static>>) -> Self {
43        let (scheduling_send, scheduling_recv) = kanal::unbounded::<SpawnAsync>();
44        let (cpu_send, cpu_recv) = kanal::unbounded::<SpawnSync>();
45        let (blocking_send, blocking_recv) = kanal::unbounded::<SpawnSync>();
46        let (io_send, io_recv) = kanal::unbounded::<IoTask>();
47
48        // We pass weak references to the local execution into the async tasks such that the task's
49        // reference doesn't keep the execution alive after the runtime is dropped.
50        let weak_local = Rc::downgrade(local);
51
52        // Drive scheduling tasks.
53        let weak_local2 = weak_local.clone();
54        local
55            .spawn(async move {
56                while let Ok(spawn) = scheduling_recv.as_async().recv().await {
57                    if let Some(local) = weak_local2.upgrade() {
58                        // Ignore send errors since it means the caller immediately detached.
59                        drop(
60                            spawn
61                                .task_callback
62                                .send(SmolAbortHandle::new_handle(local.spawn(spawn.future))),
63                        );
64                    }
65                }
66            })
67            .detach();
68
69        // Drive CPU tasks.
70        let weak_local2 = weak_local.clone();
71        local
72            .spawn(async move {
73                while let Ok(spawn) = cpu_recv.as_async().recv().await {
74                    if let Some(local) = weak_local2.upgrade() {
75                        let work = spawn.sync;
76                        // Ignore send errors since it means the caller immediately detached.
77                        drop(spawn.task_callback.send(SmolAbortHandle::new_handle(
78                            local.spawn(async move { work() }),
79                        )));
80                    }
81                }
82            })
83            .detach();
84
85        // Drive blocking tasks.
86        let weak_local2 = weak_local.clone();
87        local
88            .spawn(async move {
89                while let Ok(spawn) = blocking_recv.as_async().recv().await {
90                    if let Some(local) = weak_local2.upgrade() {
91                        let work = spawn.sync;
92                        // Ignore send errors since it means the caller immediately detached.
93                        drop(spawn.task_callback.send(SmolAbortHandle::new_handle(
94                            local.spawn(async move { work() }),
95                        )));
96                    }
97                }
98            })
99            .detach();
100
101        // Drive I/O tasks.
102        let weak_local2 = weak_local;
103        local
104            .spawn(async move {
105                while let Ok(task) = io_recv.as_async().recv().await {
106                    if let Some(local) = weak_local2.upgrade() {
107                        local.spawn(task.source.drive_local(task.stream)).detach();
108                    }
109                }
110            })
111            .detach();
112
113        Self {
114            scheduling: scheduling_send,
115            cpu: cpu_send,
116            blocking: blocking_send,
117            io: io_send,
118        }
119    }
120}
121
122/// Since the [`Handle`], and therefore runtime implementation needs to be `Send` and `Sync`,
123/// we cannot just `impl Runtime for LocalExecutor`. Instead, we create channels that the handle
124/// can forward its work into, and we drive the resulting tasks on a [`LocalExecutor`] on the
125/// calling thread.
126impl Executor for Sender {
127    fn spawn(&self, future: BoxFuture<'static, ()>) -> AbortHandleRef {
128        let (send, recv) = oneshot::channel();
129        if let Err(e) = self.scheduling.send(SpawnAsync {
130            future,
131            task_callback: send,
132        }) {
133            vortex_panic!("Executor missing: {}", e);
134        }
135        Box::new(LazyAbortHandle {
136            task: Mutex::new(recv),
137        })
138    }
139
140    fn spawn_cpu(&self, cpu: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
141        let (send, recv) = oneshot::channel();
142        if let Err(e) = self.cpu.send(SpawnSync {
143            sync: cpu,
144            task_callback: send,
145        }) {
146            vortex_panic!("Executor missing: {}", e);
147        }
148        Box::new(LazyAbortHandle {
149            task: Mutex::new(recv),
150        })
151    }
152
153    fn spawn_blocking(&self, work: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
154        let (send, recv) = oneshot::channel();
155        if let Err(e) = self.blocking.send(SpawnSync {
156            sync: work,
157            task_callback: send,
158        }) {
159            vortex_panic!("Executor missing: {}", e);
160        }
161        Box::new(LazyAbortHandle {
162            task: Mutex::new(recv),
163        })
164    }
165
166    fn spawn_io(&self, task: IoTask) {
167        if let Err(e) = self.io.send(task) {
168            vortex_panic!("Executor missing: {}", e);
169        }
170    }
171}
172
173impl BlockingRuntime for SingleThreadRuntime {
174    type BlockingIterator<'a, R: 'a> = SingleThreadIterator<'a, R>;
175
176    fn handle(&self) -> Handle {
177        let executor: Arc<dyn Executor> = self.sender.clone();
178        Handle::new(Arc::downgrade(&executor))
179    }
180
181    fn block_on<Fut, R>(&self, fut: Fut) -> R
182    where
183        Fut: Future<Output = R>,
184    {
185        smol::block_on(self.executor.run(fut))
186    }
187
188    fn block_on_stream<'a, S, R>(&self, stream: S) -> Self::BlockingIterator<'a, R>
189    where
190        S: Stream<Item = R> + Send + 'a,
191        R: Send + 'a,
192    {
193        SingleThreadIterator {
194            executor: self.executor.clone(),
195            stream: stream.boxed_local(),
196        }
197    }
198}
199
200/// Runs a future to completion on the current thread until it completes.
201///
202/// The future is provided a [`Handle`] to the runtime so that it may spawn additional tasks
203/// to be executed concurrently.
204pub fn block_on<F, Fut, R>(f: F) -> R
205where
206    F: FnOnce(Handle) -> Fut,
207    Fut: Future<Output = R>,
208{
209    let runtime = SingleThreadRuntime::default();
210    let handle = runtime.handle();
211    runtime.block_on(f(handle))
212}
213
214/// Returns an iterator wrapper around a stream, blocking the current thread for each item.
215pub fn block_on_stream<'a, F, S, R>(f: F) -> SingleThreadIterator<'a, R>
216where
217    F: FnOnce(Handle) -> S,
218    S: Stream<Item = R> + Send + Unpin + 'a,
219    R: Send + 'a,
220{
221    let runtime = SingleThreadRuntime::default();
222    let handle = runtime.handle();
223    runtime.block_on_stream(f(handle))
224}
225
/// A spawn request for a future.
///
/// We pass back the abort handle via oneshot channel because this is a single-threaded runtime,
/// meaning we need the spawning channel consumer to do some work before the caller can actually
/// get hold of their task handle.
///
/// The reason we don't pass back a smol::Task, and instead pass back a SmolAbortHandle, is because
/// we invert the behaviour of abort and drop. Dropping the abort handle results in the task being
/// detached, whereas dropping the smol::Task results in the task being canceled. This helps avoid
/// a race where the caller detaches the LazyAbortHandle before the smol::Task has been launched.
struct SpawnAsync<'rt> {
    /// The future to spawn on the local executor.
    future: BoxFuture<'rt, ()>,
    /// Where the driver task sends the abort handle once the future has been spawned.
    task_callback: oneshot::Sender<AbortHandleRef>,
}
240
/// A spawn request for a synchronous job.
///
/// The driver task wraps `sync` in a future, spawns it on the local executor, and reports the
/// resulting abort handle through `task_callback`.
struct SpawnSync<'rt> {
    /// The closure to run.
    sync: Box<dyn FnOnce() + Send + 'rt>,
    /// Where the driver task sends the abort handle once the job has been spawned.
    task_callback: oneshot::Sender<AbortHandleRef>,
}
246
247struct LazyAbortHandle {
248    task: Mutex<oneshot::Receiver<AbortHandleRef>>,
249}
250
251impl AbortHandle for LazyAbortHandle {
252    fn abort(self: Box<Self>) {
253        // Aborting a smol::Task is done by dropping it.
254        if let Ok(task) = self.task.lock().try_recv() {
255            task.abort()
256        }
257    }
258}
259
260/// A stream that wraps up the stream with the execution that drives it.
261pub struct SingleThreadIterator<'a, T> {
262    executor: Rc<LocalExecutor<'static>>,
263    stream: LocalBoxStream<'a, T>,
264}
265
266impl<T> Iterator for SingleThreadIterator<'_, T> {
267    type Item = T;
268
269    fn next(&mut self) -> Option<Self::Item> {
270        let fut = self.stream.next();
271        smol::block_on(self.executor.run(fut))
272    }
273}
274
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use futures::FutureExt;

    use crate::runtime::BlockingRuntime;
    use crate::runtime::single::{SingleThreadRuntime, block_on};

    /// A trivially ready, `!Send`-boxed future resolves to its value.
    #[test]
    fn test_drive_simple_future() {
        let runtime = SingleThreadRuntime::default();
        let future = async { 123 }.boxed_local();
        assert_eq!(runtime.block_on(future), 123);
    }

    /// A CPU task spawned through the handle runs exactly once before `block_on` returns.
    #[test]
    fn test_spawn_cpu_task() {
        let hits = Arc::new(AtomicUsize::new(0));
        let task_hits = Arc::clone(&hits);

        block_on(|handle| async move {
            let job = handle.spawn_cpu(move || {
                task_hits.fetch_add(1, Ordering::SeqCst);
            });
            job.await
        });

        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }
}