vortex_io/runtime/
single.rs1use std::rc::Rc;
5use std::sync::Arc;
6
7use futures::future::BoxFuture;
8use futures::stream::LocalBoxStream;
9use futures::{Stream, StreamExt};
10use parking_lot::Mutex;
11use smol::LocalExecutor;
12use vortex_error::vortex_panic;
13
14use crate::runtime::smol::SmolAbortHandle;
15use crate::runtime::{AbortHandle, AbortHandleRef, BlockingRuntime, Executor, Handle, IoTask};
16
17pub struct SingleThreadRuntime {
22 sender: Arc<Sender>,
23 executor: Rc<LocalExecutor<'static>>,
24}
25
26impl Default for SingleThreadRuntime {
27 fn default() -> Self {
28 let executor = Rc::new(LocalExecutor::new());
29 let sender = Arc::new(Sender::new(&executor));
30 Self { sender, executor }
31 }
32}
33
34struct Sender {
35 scheduling: kanal::Sender<SpawnAsync<'static>>,
36 cpu: kanal::Sender<SpawnSync<'static>>,
37 blocking: kanal::Sender<SpawnSync<'static>>,
38 io: kanal::Sender<IoTask>,
39}
40
41impl Sender {
42 fn new(local: &Rc<LocalExecutor<'static>>) -> Self {
43 let (scheduling_send, scheduling_recv) = kanal::unbounded::<SpawnAsync>();
44 let (cpu_send, cpu_recv) = kanal::unbounded::<SpawnSync>();
45 let (blocking_send, blocking_recv) = kanal::unbounded::<SpawnSync>();
46 let (io_send, io_recv) = kanal::unbounded::<IoTask>();
47
48 let weak_local = Rc::downgrade(local);
51
52 let weak_local2 = weak_local.clone();
54 local
55 .spawn(async move {
56 while let Ok(spawn) = scheduling_recv.as_async().recv().await {
57 if let Some(local) = weak_local2.upgrade() {
58 drop(
60 spawn
61 .task_callback
62 .send(SmolAbortHandle::new_handle(local.spawn(spawn.future))),
63 );
64 }
65 }
66 })
67 .detach();
68
69 let weak_local2 = weak_local.clone();
71 local
72 .spawn(async move {
73 while let Ok(spawn) = cpu_recv.as_async().recv().await {
74 if let Some(local) = weak_local2.upgrade() {
75 let work = spawn.sync;
76 drop(spawn.task_callback.send(SmolAbortHandle::new_handle(
78 local.spawn(async move { work() }),
79 )));
80 }
81 }
82 })
83 .detach();
84
85 let weak_local2 = weak_local.clone();
87 local
88 .spawn(async move {
89 while let Ok(spawn) = blocking_recv.as_async().recv().await {
90 if let Some(local) = weak_local2.upgrade() {
91 let work = spawn.sync;
92 drop(spawn.task_callback.send(SmolAbortHandle::new_handle(
94 local.spawn(async move { work() }),
95 )));
96 }
97 }
98 })
99 .detach();
100
101 let weak_local2 = weak_local;
103 local
104 .spawn(async move {
105 while let Ok(task) = io_recv.as_async().recv().await {
106 if let Some(local) = weak_local2.upgrade() {
107 local.spawn(task.source.drive_local(task.stream)).detach();
108 }
109 }
110 })
111 .detach();
112
113 Self {
114 scheduling: scheduling_send,
115 cpu: cpu_send,
116 blocking: blocking_send,
117 io: io_send,
118 }
119 }
120}
121
122impl Executor for Sender {
127 fn spawn(&self, future: BoxFuture<'static, ()>) -> AbortHandleRef {
128 let (send, recv) = oneshot::channel();
129 if let Err(e) = self.scheduling.send(SpawnAsync {
130 future,
131 task_callback: send,
132 }) {
133 vortex_panic!("Executor missing: {}", e);
134 }
135 Box::new(LazyAbortHandle {
136 task: Mutex::new(recv),
137 })
138 }
139
140 fn spawn_cpu(&self, cpu: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
141 let (send, recv) = oneshot::channel();
142 if let Err(e) = self.cpu.send(SpawnSync {
143 sync: cpu,
144 task_callback: send,
145 }) {
146 vortex_panic!("Executor missing: {}", e);
147 }
148 Box::new(LazyAbortHandle {
149 task: Mutex::new(recv),
150 })
151 }
152
153 fn spawn_blocking(&self, work: Box<dyn FnOnce() + Send + 'static>) -> AbortHandleRef {
154 let (send, recv) = oneshot::channel();
155 if let Err(e) = self.blocking.send(SpawnSync {
156 sync: work,
157 task_callback: send,
158 }) {
159 vortex_panic!("Executor missing: {}", e);
160 }
161 Box::new(LazyAbortHandle {
162 task: Mutex::new(recv),
163 })
164 }
165
166 fn spawn_io(&self, task: IoTask) {
167 if let Err(e) = self.io.send(task) {
168 vortex_panic!("Executor missing: {}", e);
169 }
170 }
171}
172
173impl BlockingRuntime for SingleThreadRuntime {
174 type BlockingIterator<'a, R: 'a> = SingleThreadIterator<'a, R>;
175
176 fn handle(&self) -> Handle {
177 let executor: Arc<dyn Executor> = self.sender.clone();
178 Handle::new(Arc::downgrade(&executor))
179 }
180
181 fn block_on<Fut, R>(&self, fut: Fut) -> R
182 where
183 Fut: Future<Output = R>,
184 {
185 smol::block_on(self.executor.run(fut))
186 }
187
188 fn block_on_stream<'a, S, R>(&self, stream: S) -> Self::BlockingIterator<'a, R>
189 where
190 S: Stream<Item = R> + Send + 'a,
191 R: Send + 'a,
192 {
193 SingleThreadIterator {
194 executor: self.executor.clone(),
195 stream: stream.boxed_local(),
196 }
197 }
198}
199
200pub fn block_on<F, Fut, R>(f: F) -> R
205where
206 F: FnOnce(Handle) -> Fut,
207 Fut: Future<Output = R>,
208{
209 let runtime = SingleThreadRuntime::default();
210 let handle = runtime.handle();
211 runtime.block_on(f(handle))
212}
213
214pub fn block_on_stream<'a, F, S, R>(f: F) -> SingleThreadIterator<'a, R>
216where
217 F: FnOnce(Handle) -> S,
218 S: Stream<Item = R> + Send + Unpin + 'a,
219 R: Send + 'a,
220{
221 let runtime = SingleThreadRuntime::default();
222 let handle = runtime.handle();
223 runtime.block_on_stream(f(handle))
224}
225
226struct SpawnAsync<'rt> {
237 future: BoxFuture<'rt, ()>,
238 task_callback: oneshot::Sender<AbortHandleRef>,
239}
240
241struct SpawnSync<'rt> {
243 sync: Box<dyn FnOnce() + Send + 'rt>,
244 task_callback: oneshot::Sender<AbortHandleRef>,
245}
246
247struct LazyAbortHandle {
248 task: Mutex<oneshot::Receiver<AbortHandleRef>>,
249}
250
251impl AbortHandle for LazyAbortHandle {
252 fn abort(self: Box<Self>) {
253 if let Ok(task) = self.task.lock().try_recv() {
255 task.abort()
256 }
257 }
258}
259
260pub struct SingleThreadIterator<'a, T> {
262 executor: Rc<LocalExecutor<'static>>,
263 stream: LocalBoxStream<'a, T>,
264}
265
266impl<T> Iterator for SingleThreadIterator<'_, T> {
267 type Item = T;
268
269 fn next(&mut self) -> Option<Self::Item> {
270 let fut = self.stream.next();
271 smol::block_on(self.executor.run(fut))
272 }
273}
274
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use futures::FutureExt;

    use crate::runtime::BlockingRuntime;
    use crate::runtime::single::{SingleThreadRuntime, block_on};

    /// A trivially ready future is driven to completion and its value returned.
    #[test]
    fn test_drive_simple_future() {
        let runtime = SingleThreadRuntime::default();
        let value = runtime.block_on(async { 123 }.boxed_local());
        assert_eq!(value, 123);
    }

    /// A closure submitted via `spawn_cpu` runs exactly once before `block_on` returns.
    #[test]
    fn test_spawn_cpu_task() {
        let hits = Arc::new(AtomicUsize::new(0));
        let worker_hits = hits.clone();

        block_on(|handle| async move {
            let task = handle.spawn_cpu(move || {
                worker_hits.fetch_add(1, Ordering::SeqCst);
            });
            task.await
        });

        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }
}