vortex_io/runtime/
single.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::rc::Rc;
5use std::sync::Arc;
6
7use futures::Stream;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use futures::stream::LocalBoxStream;
11use parking_lot::Mutex;
12use smol::LocalExecutor;
13use vortex_error::vortex_panic;
14
15use crate::runtime::AbortHandle;
16use crate::runtime::AbortHandleRef;
17use crate::runtime::BlockingRuntime;
18use crate::runtime::Executor;
19use crate::runtime::Handle;
20use crate::runtime::IoTask;
21use crate::runtime::smol::SmolAbortHandle;
22
/// A runtime that drives all work on the current thread.
///
/// This is subtly different from using a current-thread runtime to drive a future since it is
/// capable of running `!Send` I/O futures.
pub struct SingleThreadRuntime {
    /// Sending halves of the work channels. This is the object exposed (weakly) through the
    /// runtime's [`Handle`].
    sender: Arc<Sender>,
    /// The local executor on which the channel driver tasks, and all work they spawn, are run.
    executor: Rc<LocalExecutor<'static>>,
}
31
32impl Default for SingleThreadRuntime {
33    fn default() -> Self {
34        let executor = Rc::new(LocalExecutor::new());
35        let sender = Arc::new(Sender::new(&executor));
36        Self { sender, executor }
37    }
38}
39
/// The sending halves of the channels through which work is forwarded to the driver tasks
/// running on the runtime's local executor.
struct Sender {
    /// Requests to spawn a future.
    scheduling: kanal::Sender<SpawnAsync<'static>>,
    /// Requests to run a synchronous CPU job.
    cpu: kanal::Sender<SpawnSync<'static>>,
    /// Requests to run a synchronous blocking job.
    blocking: kanal::Sender<SpawnSync<'static>>,
    /// I/O tasks to be driven on the local executor.
    io: kanal::Sender<IoTask>,
}
46
47impl Sender {
48    fn new(local: &Rc<LocalExecutor<'static>>) -> Self {
49        let (scheduling_send, scheduling_recv) = kanal::unbounded::<SpawnAsync>();
50        let (cpu_send, cpu_recv) = kanal::unbounded::<SpawnSync>();
51        let (blocking_send, blocking_recv) = kanal::unbounded::<SpawnSync>();
52        let (io_send, io_recv) = kanal::unbounded::<IoTask>();
53
54        // We pass weak references to the local execution into the async tasks such that the task's
55        // reference doesn't keep the execution alive after the runtime is dropped.
56        let weak_local = Rc::downgrade(local);
57
58        // Drive scheduling tasks.
59        let weak_local2 = weak_local.clone();
60        local
61            .spawn(async move {
62                while let Ok(spawn) = scheduling_recv.as_async().recv().await {
63                    if let Some(local) = weak_local2.upgrade() {
64                        // Ignore send errors since it means the caller immediately detached.
65                        drop(
66                            spawn
67                                .task_callback
68                                .send(SmolAbortHandle::new_handle(local.spawn(spawn.future))),
69                        );
70                    }
71                }
72            })
73            .detach();
74
75        // Drive CPU tasks.
76        let weak_local2 = weak_local.clone();
77        local
78            .spawn(async move {
79                while let Ok(spawn) = cpu_recv.as_async().recv().await {
80                    if let Some(local) = weak_local2.upgrade() {
81                        let work = spawn.sync;
82                        // Ignore send errors since it means the caller immediately detached.
83                        drop(spawn.task_callback.send(SmolAbortHandle::new_handle(
84                            local.spawn(async move { work() }),
85                        )));
86                    }
87                }
88            })
89            .detach();
90
91        // Drive blocking tasks.
92        let weak_local2 = weak_local.clone();
93        local
94            .spawn(async move {
95                while let Ok(spawn) = blocking_recv.as_async().recv().await {
96                    if let Some(local) = weak_local2.upgrade() {
97                        let work = spawn.sync;
98                        // Ignore send errors since it means the caller immediately detached.
99                        drop(spawn.task_callback.send(SmolAbortHandle::new_handle(
100                            local.spawn(async move { work() }),
101                        )));
102                    }
103                }
104            })
105            .detach();
106
107        // Drive I/O tasks.
108        let weak_local2 = weak_local;
109        local
110            .spawn(async move {
111                while let Ok(task) = io_recv.as_async().recv().await {
112                    if let Some(local) = weak_local2.upgrade() {
113                        local.spawn(task.source.drive_local(task.stream)).detach();
114                    }
115                }
116            })
117            .detach();
118
119        Self {
120            scheduling: scheduling_send,
121            cpu: cpu_send,
122            blocking: blocking_send,
123            io: io_send,
124        }
125    }
126}
127
128/// Since the [`Handle`], and therefore runtime implementation needs to be `Send` and `Sync`,
129/// we cannot just `impl Runtime for LocalExecutor`. Instead, we create channels that the handle
130/// can forward its work into, and we drive the resulting tasks on a [`LocalExecutor`] on the
131/// calling thread.
132impl Executor for Sender {
133    fn spawn(&self, future: BoxFuture<'static, ()>) -> AbortHandleRef {
134        let (send, recv) = oneshot::channel();
135        if let Err(e) = self.scheduling.send(SpawnAsync {
136            future,
137            task_callback: send,
138        }) {
139            vortex_panic!("Executor missing: {}", e);
140        }
141        Box::new(LazyAbortHandle {
142            task: Mutex::new(recv),
143        })
144    }
145
146    fn spawn_cpu(&self, cpu: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
147        let (send, recv) = oneshot::channel();
148        if let Err(e) = self.cpu.send(SpawnSync {
149            sync: cpu,
150            task_callback: send,
151        }) {
152            vortex_panic!("Executor missing: {}", e);
153        }
154        Box::new(LazyAbortHandle {
155            task: Mutex::new(recv),
156        })
157    }
158
159    fn spawn_blocking(&self, work: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
160        let (send, recv) = oneshot::channel();
161        if let Err(e) = self.blocking.send(SpawnSync {
162            sync: work,
163            task_callback: send,
164        }) {
165            vortex_panic!("Executor missing: {}", e);
166        }
167        Box::new(LazyAbortHandle {
168            task: Mutex::new(recv),
169        })
170    }
171
172    fn spawn_io(&self, task: IoTask) {
173        if let Err(e) = self.io.send(task) {
174            vortex_panic!("Executor missing: {}", e);
175        }
176    }
177}
178
179impl BlockingRuntime for SingleThreadRuntime {
180    type BlockingIterator<'a, R: 'a> = SingleThreadIterator<'a, R>;
181
182    fn handle(&self) -> Handle {
183        let executor: Arc<dyn Executor> = self.sender.clone();
184        Handle::new(Arc::downgrade(&executor))
185    }
186
187    fn block_on<Fut, R>(&self, fut: Fut) -> R
188    where
189        Fut: Future<Output = R>,
190    {
191        smol::block_on(self.executor.run(fut))
192    }
193
194    fn block_on_stream<'a, S, R>(&self, stream: S) -> Self::BlockingIterator<'a, R>
195    where
196        S: Stream<Item = R> + Send + 'a,
197        R: Send + 'a,
198    {
199        SingleThreadIterator {
200            executor: self.executor.clone(),
201            stream: stream.boxed_local(),
202        }
203    }
204}
205
206/// Runs a future to completion on the current thread until it completes.
207///
208/// The future is provided a [`Handle`] to the runtime so that it may spawn additional tasks
209/// to be executed concurrently.
210pub fn block_on<F, Fut, R>(f: F) -> R
211where
212    F: FnOnce(Handle) -> Fut,
213    Fut: Future<Output = R>,
214{
215    let runtime = SingleThreadRuntime::default();
216    let handle = runtime.handle();
217    runtime.block_on(f(handle))
218}
219
220/// Returns an iterator wrapper around a stream, blocking the current thread for each item.
221pub fn block_on_stream<'a, F, S, R>(f: F) -> SingleThreadIterator<'a, R>
222where
223    F: FnOnce(Handle) -> S,
224    S: Stream<Item = R> + Send + Unpin + 'a,
225    R: Send + 'a,
226{
227    let runtime = SingleThreadRuntime::default();
228    let handle = runtime.handle();
229    runtime.block_on_stream(f(handle))
230}
231
/// A spawn request for a future.
///
/// We pass back the abort handle via oneshot channel because this is a single-threaded runtime,
/// meaning we need the spawning channel consumer to do some work before the caller can actually
/// get ahold of their task handle.
///
/// The reason we don't pass back a smol::Task, and instead pass back a SmolAbortHandle, is because
/// we invert the behaviour of abort and drop. Dropping the abort handle results in the task being
/// detached, whereas dropping the smol::Task results in the task being canceled. This helps avoid
/// a race where the caller detaches the LazyAbortHandle before the smol::Task has been launched.
struct SpawnAsync<'rt> {
    /// The future to spawn on the local executor.
    future: BoxFuture<'rt, ()>,
    /// Channel on which the abort handle of the launched task is sent back to the requester.
    task_callback: oneshot::Sender<AbortHandleRef>,
}
246
/// A spawn request for a synchronous job.
///
/// As with [`SpawnAsync`], the abort handle of the launched task is passed back via a oneshot
/// channel.
struct SpawnSync<'rt> {
    /// The closure to run on the local executor.
    sync: Box<dyn FnOnce() + Send + 'rt>,
    /// Channel on which the abort handle of the launched task is sent back to the requester.
    task_callback: oneshot::Sender<AbortHandleRef>,
}
252
/// An abort handle for a task that may not have been launched yet.
///
/// The real abort handle arrives over a oneshot channel once the spawn request has been
/// processed by the runtime.
struct LazyAbortHandle {
    /// Receiving end for the launched task's abort handle.
    // NOTE(review): the `Mutex` is presumably here to satisfy a `Sync` bound on `AbortHandle`
    // — confirm against the trait definition.
    task: Mutex<oneshot::Receiver<AbortHandleRef>>,
}
256
257impl AbortHandle for LazyAbortHandle {
258    fn abort(self: Box<Self>) {
259        // Aborting a smol::Task is done by dropping it.
260        if let Ok(task) = self.task.lock().try_recv() {
261            task.abort()
262        }
263    }
264}
265
/// An iterator that bundles a stream together with the executor that drives it.
pub struct SingleThreadIterator<'a, T> {
    /// The local executor that is run while waiting for the next item.
    executor: Rc<LocalExecutor<'static>>,
    /// The wrapped stream whose items are yielded by the iterator.
    stream: LocalBoxStream<'a, T>,
}
271
272impl<T> Iterator for SingleThreadIterator<'_, T> {
273    type Item = T;
274
275    fn next(&mut self) -> Option<Self::Item> {
276        let fut = self.stream.next();
277        smol::block_on(self.executor.run(fut))
278    }
279}
280
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    use futures::FutureExt;

    use crate::runtime::BlockingRuntime;
    use crate::runtime::single::SingleThreadRuntime;
    use crate::runtime::single::block_on;

    /// A trivially ready future is driven to completion and its output returned.
    #[test]
    fn test_drive_simple_future() {
        let runtime = SingleThreadRuntime::default();
        let result = runtime.block_on(async { 123 }.boxed_local());
        assert_eq!(result, 123);
    }

    /// A CPU job spawned through the handle runs exactly once before `block_on` returns.
    #[test]
    fn test_spawn_cpu_task() {
        let counter = Arc::new(AtomicUsize::new(0));
        let task_counter = counter.clone();

        block_on(|handle| async move {
            let job = handle.spawn_cpu(move || {
                task_counter.fetch_add(1, Ordering::SeqCst);
            });
            job.await
        });

        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }
}