1#![allow(clippy::type_complexity, clippy::let_underscore_future)]
2mod arbiter;
4mod builder;
5mod driver;
6mod handle;
7mod pool;
8mod rt;
9mod system;
10mod task;
11
12pub use self::arbiter::Arbiter;
13pub use self::builder::{Builder, SystemRunner};
14pub use self::driver::{BlockFuture, Driver, DriverType, Notify, PollResult, Runner};
15pub use self::pool::{BlockingError, BlockingResult};
16pub use self::rt::{Runtime, RuntimeBuilder};
17pub use self::system::{Id, PingRecord, System};
18pub use self::task::{task_callbacks, task_opt_callbacks};
19
20pub fn spawn_blocking<F, R>(f: F) -> BlockingResult<R>
24where
25 F: FnOnce() -> R + Send + 'static,
26 R: Send + 'static,
27{
28 System::current().spawn_blocking(f)
29}
30
#[cfg(feature = "tokio")]
mod tokio {
    use std::future::{Future, poll_fn};
    pub use tok_io::task::{JoinError, JoinHandle};

    /// Spawns `f` through `tok_io::task::spawn_local` and returns its join handle.
    ///
    /// When `crate::task::Data::load()` yields a value, the future is wrapped so
    /// that every poll is routed through that value's `run` method; otherwise
    /// the future is spawned as-is.
    #[inline]
    pub fn spawn<F>(f: F) -> tok_io::task::JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        match crate::task::Data::load() {
            Some(mut hooks) => tok_io::task::spawn_local(async move {
                // Pin the future locally so it can be polled repeatedly.
                let mut task = std::pin::pin!(f);
                poll_fn(|cx| hooks.run(|| task.as_mut().poll(cx))).await
            }),
            None => tok_io::task::spawn_local(f),
        }
    }

    /// Cloneable wrapper around a `tok_io::runtime::Handle`.
    #[derive(Clone, Debug)]
    pub struct Handle(tok_io::runtime::Handle);

    impl Handle {
        /// Wraps the value returned by `tok_io::runtime::Handle::current()`.
        #[inline]
        pub fn current() -> Self {
            let inner = tok_io::runtime::Handle::current();
            Handle(inner)
        }

        /// Intentionally empty for this backend.
        #[inline]
        pub fn notify(&self) {}

        /// Forwards `future` to the wrapped handle's `spawn` and returns the
        /// join handle it produces.
        #[inline]
        pub fn spawn<F>(&self, future: F) -> tok_io::task::JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let Handle(runtime) = self;
            runtime.spawn(future)
        }
    }
}
86
#[cfg(feature = "compio")]
mod compio {
    use std::task::{Context, Poll, ready};
    use std::{fmt, future::Future, future::poll_fn, pin::Pin};

    /// Spawns `f` through `compio_runtime::spawn` and wraps the result in a
    /// [`JoinHandle`].
    ///
    /// When `crate::task::Data::load()` yields a value, the future is wrapped so
    /// that every poll is routed through that value's `run` method; otherwise
    /// the future is spawned as-is.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        let fut = if let Some(mut data) = crate::task::Data::load() {
            compio_runtime::spawn(async move {
                let mut f = std::pin::pin!(f);
                poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
            })
        } else {
            compio_runtime::spawn(f)
        };

        JoinHandle {
            fut: Some(Either::Compio(fut)),
        }
    }

    /// Error yielded by [`JoinHandle`] when the task's output could not be
    /// obtained.
    #[derive(Debug, Copy, Clone)]
    pub struct JoinError;

    impl fmt::Display for JoinError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "JoinError")
        }
    }

    impl std::error::Error for JoinError {}

    /// The two ways a task result can be delivered to a [`JoinHandle`].
    enum Either<T> {
        /// Handle returned directly by `compio_runtime::spawn`.
        Compio(compio_runtime::JoinHandle<T>),
        /// Receiving half of a oneshot channel fed by [`Handle::spawn`].
        Spawn(oneshot::Receiver<T>),
    }

    /// Future resolving to the output of a spawned task, or [`JoinError`].
    pub struct JoinHandle<T> {
        // `Option` so that `Drop` can move the inner handle out.
        fut: Option<Either<T>>,
    }

    impl<T> JoinHandle<T> {
        /// Reports whether the task behind this handle has finished.
        pub fn is_finished(&self) -> bool {
            match &self.fut {
                Some(Either::Compio(fut)) => fut.is_finished(),
                // `is_closed()` alone is not enough: per the `oneshot` docs it
                // is only true when the sender was dropped without sending or
                // the message was already received. A completed task whose
                // result is still waiting in the channel is reported by
                // `has_message()`.
                Some(Either::Spawn(fut)) => fut.has_message() || fut.is_closed(),
                None => true,
            }
        }
    }

    impl<T> Drop for JoinHandle<T> {
        fn drop(&mut self) {
            // Explicitly detach the compio handle instead of just dropping it.
            // NOTE(review): presumably this keeps the task running after the
            // handle is gone — confirm against compio-runtime docs.
            if let Some(Either::Compio(fut)) = self.fut.take() {
                fut.detach();
            }
        }
    }

    impl<T> Future for JoinHandle<T> {
        type Output = Result<T, JoinError>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Any failure reported by the underlying handle/receiver is
            // collapsed into the unit `JoinError`.
            Poll::Ready(match self.fut.as_mut() {
                Some(Either::Compio(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                Some(Either::Spawn(fut)) => {
                    ready!(Pin::new(fut).poll(cx)).map_err(|_| JoinError)
                }
                None => Err(JoinError),
            })
        }
    }

    /// Cloneable handle that spawns futures through a `crate::Arbiter`.
    #[derive(Clone, Debug)]
    pub struct Handle(crate::Arbiter);

    impl Handle {
        /// Wraps the value returned by `crate::Arbiter::current()`.
        pub fn current() -> Self {
            Self(crate::Arbiter::current())
        }

        /// Intentionally empty for this backend.
        pub fn notify(&self) {}

        /// Spawns `future` on the wrapped arbiter and returns a [`JoinHandle`]
        /// that receives the future's output through a oneshot channel.
        pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
        where
            F: Future + Send + 'static,
            F::Output: Send + 'static,
        {
            let (tx, rx) = oneshot::channel();
            self.0.spawn(async move {
                let result = future.await;
                // The send result is ignored: if the `JoinHandle` was already
                // dropped there is nobody left to receive the output.
                let _ = tx.send(result);
            });
            JoinHandle {
                fut: Some(Either::Spawn(rx)),
            }
        }
    }
}
198
199#[cfg(feature = "tokio")]
200pub use self::tokio::*;
201
202#[cfg(all(feature = "compio", not(feature = "tokio")))]
203pub use self::compio::*;
204
205pub mod default_runtime {
206 use std::fmt;
207 use std::future::{Future, poll_fn};
208
209 pub use crate::rt::{Handle, Runtime};
210
211 #[derive(Debug, Copy, Clone)]
212 pub struct JoinError;
213
214 impl fmt::Display for JoinError {
215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216 write!(f, "JoinError")
217 }
218 }
219
220 impl std::error::Error for JoinError {}
221
222 pub fn spawn<F>(fut: F) -> crate::handle::JoinHandle<F::Output>
223 where
224 F: Future + 'static,
225 {
226 if let Some(mut data) = crate::task::Data::load() {
227 Runtime::with_current(|rt| {
228 rt.spawn(async move {
229 let mut f = std::pin::pin!(fut);
230 poll_fn(|cx| data.run(|| f.as_mut().poll(cx))).await
231 })
232 })
233 } else {
234 Runtime::with_current(|rt| rt.spawn(fut))
235 }
236 }
237}
238
239#[cfg(all(not(feature = "tokio"), not(feature = "compio")))]
240pub use self::default_runtime::*;