vortex_io/runtime/
single.rs1use std::rc::Rc;
5use std::sync::Arc;
6
7use futures::Stream;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use futures::stream::LocalBoxStream;
11use parking_lot::Mutex;
12use smol::LocalExecutor;
13use vortex_error::vortex_panic;
14
15use crate::runtime::AbortHandle;
16use crate::runtime::AbortHandleRef;
17use crate::runtime::BlockingRuntime;
18use crate::runtime::Executor;
19use crate::runtime::Handle;
20use crate::runtime::IoTask;
21use crate::runtime::smol::SmolAbortHandle;
22
23pub struct SingleThreadRuntime {
28 sender: Arc<Sender>,
29 executor: Rc<LocalExecutor<'static>>,
30}
31
32impl Default for SingleThreadRuntime {
33 fn default() -> Self {
34 let executor = Rc::new(LocalExecutor::new());
35 let sender = Arc::new(Sender::new(&executor));
36 Self { sender, executor }
37 }
38}
39
40struct Sender {
41 scheduling: kanal::Sender<SpawnAsync<'static>>,
42 cpu: kanal::Sender<SpawnSync<'static>>,
43 blocking: kanal::Sender<SpawnSync<'static>>,
44 io: kanal::Sender<IoTask>,
45}
46
47impl Sender {
48 fn new(local: &Rc<LocalExecutor<'static>>) -> Self {
49 let (scheduling_send, scheduling_recv) = kanal::unbounded::<SpawnAsync>();
50 let (cpu_send, cpu_recv) = kanal::unbounded::<SpawnSync>();
51 let (blocking_send, blocking_recv) = kanal::unbounded::<SpawnSync>();
52 let (io_send, io_recv) = kanal::unbounded::<IoTask>();
53
54 let weak_local = Rc::downgrade(local);
57
58 let weak_local2 = weak_local.clone();
60 local
61 .spawn(async move {
62 while let Ok(spawn) = scheduling_recv.as_async().recv().await {
63 if let Some(local) = weak_local2.upgrade() {
64 drop(
66 spawn
67 .task_callback
68 .send(SmolAbortHandle::new_handle(local.spawn(spawn.future))),
69 );
70 }
71 }
72 })
73 .detach();
74
75 let weak_local2 = weak_local.clone();
77 local
78 .spawn(async move {
79 while let Ok(spawn) = cpu_recv.as_async().recv().await {
80 if let Some(local) = weak_local2.upgrade() {
81 let work = spawn.sync;
82 drop(spawn.task_callback.send(SmolAbortHandle::new_handle(
84 local.spawn(async move { work() }),
85 )));
86 }
87 }
88 })
89 .detach();
90
91 let weak_local2 = weak_local.clone();
93 local
94 .spawn(async move {
95 while let Ok(spawn) = blocking_recv.as_async().recv().await {
96 if let Some(local) = weak_local2.upgrade() {
97 let work = spawn.sync;
98 drop(spawn.task_callback.send(SmolAbortHandle::new_handle(
100 local.spawn(async move { work() }),
101 )));
102 }
103 }
104 })
105 .detach();
106
107 let weak_local2 = weak_local;
109 local
110 .spawn(async move {
111 while let Ok(task) = io_recv.as_async().recv().await {
112 if let Some(local) = weak_local2.upgrade() {
113 local.spawn(task.source.drive_local(task.stream)).detach();
114 }
115 }
116 })
117 .detach();
118
119 Self {
120 scheduling: scheduling_send,
121 cpu: cpu_send,
122 blocking: blocking_send,
123 io: io_send,
124 }
125 }
126}
127
128impl Executor for Sender {
133 fn spawn(&self, future: BoxFuture<'static, ()>) -> AbortHandleRef {
134 let (send, recv) = oneshot::channel();
135 if let Err(e) = self.scheduling.send(SpawnAsync {
136 future,
137 task_callback: send,
138 }) {
139 vortex_panic!("Executor missing: {}", e);
140 }
141 Box::new(LazyAbortHandle {
142 task: Mutex::new(recv),
143 })
144 }
145
146 fn spawn_cpu(&self, cpu: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
147 let (send, recv) = oneshot::channel();
148 if let Err(e) = self.cpu.send(SpawnSync {
149 sync: cpu,
150 task_callback: send,
151 }) {
152 vortex_panic!("Executor missing: {}", e);
153 }
154 Box::new(LazyAbortHandle {
155 task: Mutex::new(recv),
156 })
157 }
158
159 fn spawn_blocking(&self, work: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
160 let (send, recv) = oneshot::channel();
161 if let Err(e) = self.blocking.send(SpawnSync {
162 sync: work,
163 task_callback: send,
164 }) {
165 vortex_panic!("Executor missing: {}", e);
166 }
167 Box::new(LazyAbortHandle {
168 task: Mutex::new(recv),
169 })
170 }
171
172 fn spawn_io(&self, task: IoTask) {
173 if let Err(e) = self.io.send(task) {
174 vortex_panic!("Executor missing: {}", e);
175 }
176 }
177}
178
179impl BlockingRuntime for SingleThreadRuntime {
180 type BlockingIterator<'a, R: 'a> = SingleThreadIterator<'a, R>;
181
182 fn handle(&self) -> Handle {
183 let executor: Arc<dyn Executor> = self.sender.clone();
184 Handle::new(Arc::downgrade(&executor))
185 }
186
187 fn block_on<Fut, R>(&self, fut: Fut) -> R
188 where
189 Fut: Future<Output = R>,
190 {
191 smol::block_on(self.executor.run(fut))
192 }
193
194 fn block_on_stream<'a, S, R>(&self, stream: S) -> Self::BlockingIterator<'a, R>
195 where
196 S: Stream<Item = R> + Send + 'a,
197 R: Send + 'a,
198 {
199 SingleThreadIterator {
200 executor: self.executor.clone(),
201 stream: stream.boxed_local(),
202 }
203 }
204}
205
206pub fn block_on<F, Fut, R>(f: F) -> R
211where
212 F: FnOnce(Handle) -> Fut,
213 Fut: Future<Output = R>,
214{
215 let runtime = SingleThreadRuntime::default();
216 let handle = runtime.handle();
217 runtime.block_on(f(handle))
218}
219
220pub fn block_on_stream<'a, F, S, R>(f: F) -> SingleThreadIterator<'a, R>
222where
223 F: FnOnce(Handle) -> S,
224 S: Stream<Item = R> + Send + Unpin + 'a,
225 R: Send + 'a,
226{
227 let runtime = SingleThreadRuntime::default();
228 let handle = runtime.handle();
229 runtime.block_on_stream(f(handle))
230}
231
232struct SpawnAsync<'rt> {
243 future: BoxFuture<'rt, ()>,
244 task_callback: oneshot::Sender<AbortHandleRef>,
245}
246
247struct SpawnSync<'rt> {
249 sync: Box<dyn FnOnce() + Send + 'rt>,
250 task_callback: oneshot::Sender<AbortHandleRef>,
251}
252
253struct LazyAbortHandle {
254 task: Mutex<oneshot::Receiver<AbortHandleRef>>,
255}
256
257impl AbortHandle for LazyAbortHandle {
258 fn abort(self: Box<Self>) {
259 if let Ok(task) = self.task.lock().try_recv() {
261 task.abort()
262 }
263 }
264}
265
266pub struct SingleThreadIterator<'a, T> {
268 executor: Rc<LocalExecutor<'static>>,
269 stream: LocalBoxStream<'a, T>,
270}
271
272impl<T> Iterator for SingleThreadIterator<'_, T> {
273 type Item = T;
274
275 fn next(&mut self) -> Option<Self::Item> {
276 let fut = self.stream.next();
277 smol::block_on(self.executor.run(fut))
278 }
279}
280
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;

    use futures::FutureExt;

    use crate::runtime::BlockingRuntime;
    use crate::runtime::single::SingleThreadRuntime;
    use crate::runtime::single::block_on;

    /// A ready future driven through the runtime yields its value.
    #[test]
    fn test_drive_simple_future() {
        let runtime = SingleThreadRuntime::default();
        let answer = runtime.block_on(async { 123 }.boxed_local());
        assert_eq!(answer, 123);
    }

    /// A closure submitted via `spawn_cpu` runs exactly once before
    /// `block_on` returns.
    #[test]
    fn test_spawn_cpu_task() {
        let hits = Arc::new(AtomicUsize::new(0));
        let task_hits = hits.clone();

        block_on(|handle| async move {
            let job = handle.spawn_cpu(move || {
                task_hits.fetch_add(1, Ordering::SeqCst);
            });
            job.await
        });

        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }
}