1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2use std::{cell::RefCell, ptr};
4
5mod arbiter;
6mod builder;
7mod system;
8
9pub use self::arbiter::Arbiter;
10pub use self::builder::{Builder, SystemRunner};
11pub use self::system::System;
12
/// Hook run synchronously by `spawn`, before the task is handed to the runtime.
///
/// Returning `None` makes `spawn` await the future directly and skip the
/// remaining hooks for that task.
type TBefore = Box<dyn Fn() -> Option<*const ()>>;
/// Hook run right before every poll of a hooked task. It receives the pointer
/// produced by the `before` hook; its return value is handed to the `exit` hook.
type TEnter = Box<dyn Fn(*const ()) -> *const ()>;
/// Hook run right after every poll of a hooked task, with the `enter` result.
type TExit = Box<dyn Fn(*const ())>;
/// Hook run once after a hooked task's future has completed, with the pointer
/// produced by the `before` hook.
type TAfter = Box<dyn Fn(*const ())>;

thread_local! {
    /// Per-thread task hooks, stored as `(before, enter, exit, after)`.
    ///
    /// The defaults are inert: `before` yields `None`, `enter` yields a null
    /// pointer, and `exit`/`after` do nothing.
    static CB: RefCell<(TBefore, TEnter, TExit, TAfter)> = {
        let before: TBefore = Box::new(|| None);
        let enter: TEnter = Box::new(|_: *const ()| ptr::null::<()>());
        let exit: TExit = Box::new(|_: *const ()| ());
        let after: TAfter = Box::new(|_: *const ()| ());
        RefCell::new((before, enter, exit, after))
    };
}
23
24pub unsafe fn spawn_cbs<FBefore, FEnter, FExit, FAfter>(
31 before: FBefore,
32 enter: FEnter,
33 exit: FExit,
34 after: FAfter,
35) where
36 FBefore: Fn() -> Option<*const ()> + 'static,
37 FEnter: Fn(*const ()) -> *const () + 'static,
38 FExit: Fn(*const ()) + 'static,
39 FAfter: Fn(*const ()) + 'static,
40{
41 CB.with(|cb| {
42 *cb.borrow_mut() = (
43 Box::new(before),
44 Box::new(enter),
45 Box::new(exit),
46 Box::new(after),
47 );
48 });
49}
50
51#[allow(dead_code)]
52#[cfg(feature = "tokio")]
53mod tokio {
54 use std::future::{poll_fn, Future};
55 use tok_io::runtime::Handle;
56 pub use tok_io::task::{spawn_blocking, JoinError, JoinHandle};
57
58 pub fn block_on<F: Future<Output = ()>>(fut: F) {
61 if let Ok(hnd) = Handle::try_current() {
62 log::debug!("Use existing tokio runtime and block on future");
63 hnd.block_on(tok_io::task::LocalSet::new().run_until(fut));
64 } else {
65 log::debug!("Create tokio runtime and block on future");
66
67 let rt = tok_io::runtime::Builder::new_current_thread()
68 .enable_all()
69 .build()
71 .unwrap();
72 tok_io::task::LocalSet::new().block_on(&rt, fut);
73 }
74 }
75
76 #[inline]
84 pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
85 where
86 F: Future + 'static,
87 {
88 let ptr = crate::CB.with(|cb| (cb.borrow().0)());
89 tok_io::task::spawn_local(async move {
90 if let Some(ptr) = ptr {
91 tok_io::pin!(f);
92 let result = poll_fn(|ctx| {
93 let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
94 let result = f.as_mut().poll(ctx);
95 crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
96 result
97 })
98 .await;
99 crate::CB.with(|cb| (cb.borrow().3)(ptr));
100 result
101 } else {
102 f.await
103 }
104 })
105 }
106
107 #[inline]
115 pub fn spawn_fn<F, R>(f: F) -> tok_io::task::JoinHandle<R::Output>
116 where
117 F: FnOnce() -> R + 'static,
118 R: Future + 'static,
119 {
120 spawn(async move { f().await })
121 }
122}
123
124#[allow(dead_code)]
125#[cfg(feature = "compio")]
126mod compio {
127 use std::task::{ready, Context, Poll};
128 use std::{fmt, future::poll_fn, future::Future, pin::Pin};
129
130 use compio_runtime::Runtime;
131
132 pub fn block_on<F: Future<Output = ()>>(fut: F) {
135 log::info!(
136 "Starting compio runtime, driver {:?}",
137 compio_driver::DriverType::current()
138 );
139 let rt = Runtime::new().unwrap();
140 rt.block_on(fut);
141 }
142
143 pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
149 where
150 F: FnOnce() -> T + Send + Sync + 'static,
151 T: Send + 'static,
152 {
153 JoinHandle {
154 fut: Some(compio_runtime::spawn_blocking(f)),
155 }
156 }
157
158 #[inline]
166 pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
167 where
168 F: Future + 'static,
169 {
170 let ptr = crate::CB.with(|cb| (cb.borrow().0)());
171 let fut = compio_runtime::spawn(async move {
172 if let Some(ptr) = ptr {
173 let mut f = std::pin::pin!(f);
174 let result = poll_fn(|ctx| {
175 let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
176 let result = f.as_mut().poll(ctx);
177 crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
178 result
179 })
180 .await;
181 crate::CB.with(|cb| (cb.borrow().3)(ptr));
182 result
183 } else {
184 f.await
185 }
186 });
187
188 JoinHandle { fut: Some(fut) }
189 }
190
191 #[inline]
199 pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
200 where
201 F: FnOnce() -> R + 'static,
202 R: Future + 'static,
203 {
204 spawn(async move { f().await })
205 }
206
207 #[derive(Debug, Copy, Clone)]
208 pub struct JoinError;
209
210 impl fmt::Display for JoinError {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 write!(f, "JoinError")
213 }
214 }
215
216 impl std::error::Error for JoinError {}
217
218 pub struct JoinHandle<T> {
219 fut: Option<compio_runtime::JoinHandle<T>>,
220 }
221
222 impl<T> JoinHandle<T> {
223 pub fn is_finished(&self) -> bool {
224 if let Some(hnd) = &self.fut {
225 hnd.is_finished()
226 } else {
227 true
228 }
229 }
230 }
231
232 impl<T> Drop for JoinHandle<T> {
233 fn drop(&mut self) {
234 self.fut.take().unwrap().detach();
235 }
236 }
237
238 impl<T> Future for JoinHandle<T> {
239 type Output = Result<T, JoinError>;
240
241 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
242 Poll::Ready(
243 ready!(Pin::new(self.fut.as_mut().unwrap()).poll(cx))
244 .map_err(|_| JoinError),
245 )
246 }
247 }
248}
249
250#[allow(dead_code)]
251#[cfg(feature = "async-std")]
252mod asyncstd {
253 use std::future::{poll_fn, Future};
254 use std::{fmt, pin::Pin, task::ready, task::Context, task::Poll};
255
256 pub fn block_on<F: Future<Output = ()>>(fut: F) {
259 async_std::task::block_on(fut);
260 }
261
262 #[inline]
270 pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
271 where
272 F: Future + 'static,
273 {
274 let ptr = crate::CB.with(|cb| (cb.borrow().0)());
275 JoinHandle {
276 fut: async_std::task::spawn_local(async move {
277 if let Some(ptr) = ptr {
278 let mut f = unsafe { Pin::new_unchecked(&mut f) };
279 let result = poll_fn(|ctx| {
280 let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
281 let result = f.as_mut().poll(ctx);
282 crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
283 result
284 })
285 .await;
286 crate::CB.with(|cb| (cb.borrow().3)(ptr));
287 result
288 } else {
289 f.await
290 }
291 }),
292 }
293 }
294
295 #[inline]
303 pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
304 where
305 F: FnOnce() -> R + 'static,
306 R: Future + 'static,
307 {
308 spawn(async move { f().await })
309 }
310
311 pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
317 where
318 F: FnOnce() -> T + Send + 'static,
319 T: Send + 'static,
320 {
321 JoinHandle {
322 fut: async_std::task::spawn_blocking(f),
323 }
324 }
325
326 #[derive(Debug, Copy, Clone)]
327 pub struct JoinError;
328
329 impl fmt::Display for JoinError {
330 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
331 write!(f, "JoinError")
332 }
333 }
334
335 impl std::error::Error for JoinError {}
336
337 pub struct JoinHandle<T> {
338 fut: async_std::task::JoinHandle<T>,
339 }
340
341 impl<T> Future for JoinHandle<T> {
342 type Output = Result<T, JoinError>;
343
344 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
345 Poll::Ready(Ok(ready!(Pin::new(&mut self.fut).poll(cx))))
346 }
347 }
348}
349
350#[allow(dead_code)]
351#[cfg(all(feature = "glommio", target_os = "linux"))]
352mod glommio {
353 use std::future::{poll_fn, Future};
354 use std::{pin::Pin, task::Context, task::Poll};
355
356 use futures_channel::oneshot::Canceled;
357 use glomm_io::task;
358
359 pub type JoinError = Canceled;
360
361 pub fn block_on<F: Future<Output = ()>>(fut: F) {
364 let ex = glomm_io::LocalExecutor::default();
365 ex.run(async move {
366 let _ = fut.await;
367 })
368 }
369
370 #[inline]
378 pub fn spawn<F>(mut f: F) -> JoinHandle<F::Output>
379 where
380 F: Future + 'static,
381 F::Output: 'static,
382 {
383 let ptr = crate::CB.with(|cb| (cb.borrow().0)());
384 JoinHandle {
385 fut: Either::Left(
386 glomm_io::spawn_local(async move {
387 if let Some(ptr) = ptr {
388 glomm_io::executor().yield_now().await;
389 let mut f = unsafe { Pin::new_unchecked(&mut f) };
390 let result = poll_fn(|ctx| {
391 let new_ptr = crate::CB.with(|cb| (cb.borrow().1)(ptr));
392 let result = f.as_mut().poll(ctx);
393 crate::CB.with(|cb| (cb.borrow().2)(new_ptr));
394 result
395 })
396 .await;
397 crate::CB.with(|cb| (cb.borrow().3)(ptr));
398 result
399 } else {
400 glomm_io::executor().yield_now().await;
401 f.await
402 }
403 })
404 .detach(),
405 ),
406 }
407 }
408
409 #[inline]
417 pub fn spawn_fn<F, R>(f: F) -> JoinHandle<R::Output>
418 where
419 F: FnOnce() -> R + 'static,
420 R: Future + 'static,
421 {
422 spawn(async move { f().await })
423 }
424
425 pub fn spawn_blocking<F, R>(f: F) -> JoinHandle<R>
426 where
427 F: FnOnce() -> R + Send + 'static,
428 R: Send + 'static,
429 {
430 let fut = glomm_io::executor().spawn_blocking(f);
431 JoinHandle {
432 fut: Either::Right(Box::pin(async move { Ok(fut.await) })),
433 }
434 }
435
436 enum Either<T1, T2> {
437 Left(T1),
438 Right(T2),
439 }
440
441 #[allow(clippy::type_complexity)]
444 pub struct JoinHandle<T> {
445 fut:
446 Either<task::JoinHandle<T>, Pin<Box<dyn Future<Output = Result<T, Canceled>>>>>,
447 }
448
449 impl<T> Future for JoinHandle<T> {
450 type Output = Result<T, Canceled>;
451
452 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
453 match self.fut {
454 Either::Left(ref mut f) => match Pin::new(f).poll(cx) {
455 Poll::Pending => Poll::Pending,
456 Poll::Ready(res) => Poll::Ready(res.ok_or(Canceled)),
457 },
458 Either::Right(ref mut f) => Pin::new(f).poll(cx),
459 }
460 }
461 }
462}
463
464#[cfg(feature = "tokio")]
465pub use self::tokio::*;
466
467#[cfg(feature = "async-std")]
468pub use self::asyncstd::*;
469
470#[cfg(feature = "glommio")]
471pub use self::glommio::*;
472
473#[cfg(feature = "compio")]
474pub use self::compio::*;
475
#[allow(dead_code)]
#[cfg(all(
    not(feature = "tokio"),
    not(feature = "async-std"),
    not(feature = "compio"),
    not(feature = "glommio")
))]
mod no_rt {
    //! Placeholder backend compiled when no runtime feature is enabled.
    //!
    //! It exposes the same function names as the real backends so the crate
    //! still builds, but every entry point panics with one shared message.

    use std::task::{Context, Poll};
    use std::{fmt, future::Future, marker::PhantomData, pin::Pin};

    /// Single panic site shared by every entry point of this module.
    #[cold]
    fn no_runtime() -> ! {
        panic!("async runtime is not configured")
    }

    /// Always panics: there is no runtime to run the future on.
    ///
    /// # Panics
    ///
    /// Always, with the message `async runtime is not configured`.
    pub fn block_on<F: Future<Output = ()>>(_: F) {
        no_runtime()
    }

    /// Always panics: there is no runtime to spawn the future on.
    ///
    /// # Panics
    ///
    /// Always, with the message `async runtime is not configured`.
    pub fn spawn<F>(_: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        no_runtime()
    }

    /// Always panics; provided so the API matches the other backends.
    ///
    /// The closure is never called.
    ///
    /// # Panics
    ///
    /// Always, with the message `async runtime is not configured`.
    pub fn spawn_fn<F, R>(_: F) -> JoinHandle<R::Output>
    where
        F: FnOnce() -> R + 'static,
        R: Future + 'static,
    {
        no_runtime()
    }

    /// Always panics: there is no runtime to run the closure on.
    ///
    /// # Panics
    ///
    /// Always, with the message `async runtime is not configured`.
    pub fn spawn_blocking<F, T>(_: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T + Send + Sync + 'static,
        T: Send + 'static,
    {
        no_runtime()
    }

    /// Join handle placeholder.
    ///
    /// No function in this module ever returns one, because they all panic
    /// first; the type exists so signatures line up with the real backends.
    #[allow(clippy::type_complexity)]
    pub struct JoinHandle<T> {
        t: PhantomData<T>,
    }

    impl<T> JoinHandle<T> {
        /// Always reports `true`; there is never a running task behind it.
        pub fn is_finished(&self) -> bool {
            true
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            // Not reachable through this module's API; if it ever is, fail
            // with the same explanation as the other entry points.
            no_runtime()
        }
    }

    /// Error type of the placeholder `JoinHandle` output.
    ///
    /// Carries no details; it always displays as the fixed text `JoinError`.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}
}
540
541#[cfg(all(
542 not(feature = "tokio"),
543 not(feature = "async-std"),
544 not(feature = "compio"),
545 not(feature = "glommio")
546))]
547pub use self::no_rt::*;