bach/
task.rs

1use crate::executor::Handle;
2use core::{
3    future::{poll_fn, Future},
4    task::Poll,
5};
6
7crate::scope::define!(scope, Handle);
8
9mod join;
10pub(crate) mod spawn;
11pub(crate) mod supervisor;
12pub(crate) mod waker;
13
14pub use join::{JoinError, JoinHandle};
15
16pub fn spawn<F, T>(future: F) -> JoinHandle<T>
17where
18    F: 'static + Future<Output = T> + Send,
19    T: 'static + Send,
20{
21    spawn_named(future, "")
22}
23
24pub fn spawn_named<F, N, T>(future: F, name: N) -> JoinHandle<T>
25where
26    F: 'static + Future<Output = T> + Send,
27    N: core::fmt::Display,
28    T: 'static + Send,
29{
30    scope::borrow_with(|handle| {
31        // try to inherit the parent group
32        crate::group::scope::try_borrow_with(|group| {
33            if let Some(group) = group {
34                handle.spawn_named(crate::group::Grouped::new(future, *group), name)
35            } else {
36                handle.spawn_named(future, name)
37            }
38        })
39    })
40}
41
42pub async fn yield_now() {
43    let mut pending = true;
44    poll_fn(|cx| {
45        if core::mem::take(&mut pending) {
46            cx.waker().wake_by_ref();
47            return Poll::Pending;
48        }
49        Poll::Ready(())
50    })
51    .await
52}
53
54pub mod primary {
55    use super::*;
56    use alloc::sync::Arc;
57    use core::sync::atomic::{AtomicU64, Ordering};
58    use pin_project_lite::pin_project;
59
60    pub fn spawn<F, T>(future: F) -> JoinHandle<T>
61    where
62        F: 'static + Future<Output = T> + Send,
63        T: 'static + Send,
64    {
65        super::spawn(create(future))
66    }
67
68    pub fn spawn_named<F, N, T>(future: F, name: N) -> JoinHandle<T>
69    where
70        F: 'static + Future<Output = T> + Send,
71        N: core::fmt::Display,
72        T: 'static + Send,
73    {
74        super::spawn_named(create(future), name)
75    }
76
77    #[derive(Debug)]
78    pub struct Guard(Arc<AtomicU64>);
79
80    impl Guard {
81        pub(crate) fn new(count: Arc<AtomicU64>) -> Self {
82            count.fetch_add(1, Ordering::SeqCst);
83            Self(count)
84        }
85    }
86
87    impl Clone for Guard {
88        fn clone(&self) -> Self {
89            self.0.fetch_add(1, Ordering::SeqCst);
90            Self(self.0.clone())
91        }
92    }
93
94    impl Drop for Guard {
95        fn drop(&mut self) {
96            self.0.fetch_sub(1, Ordering::SeqCst);
97        }
98    }
99
100    pub fn guard() -> Guard {
101        scope::borrow_with(|h| h.primary_guard())
102    }
103
104    pub fn create<F: Future>(future: F) -> Wrapped<F> {
105        let guard = guard();
106        Wrapped {
107            inner: future,
108            guard,
109        }
110    }
111
112    pin_project! {
113        pub struct Wrapped<F> {
114            #[pin]
115            inner: F,
116            guard: Guard,
117        }
118    }
119
120    impl<F: Future> Future for Wrapped<F> {
121        type Output = F::Output;
122
123        fn poll(
124            self: std::pin::Pin<&mut Self>,
125            cx: &mut std::task::Context<'_>,
126        ) -> std::task::Poll<Self::Output> {
127            self.project().inner.poll(cx)
128        }
129    }
130}
131
132pub use info::Info;
133
134pub(crate) mod info {
135    use super::*;
136    use crate::{
137        define,
138        tracing::{info_span, Span},
139    };
140    use pin_project_lite::pin_project;
141    use std::sync::Arc;
142
143    define!(scope, Info);
144
145    #[derive(Clone, Debug)]
146    pub struct Info {
147        id: u64,
148        name: Option<Arc<str>>,
149    }
150
151    impl Info {
152        pub fn current() -> Self {
153            scope::borrow_with(|v| v.clone())
154        }
155
156        pub fn id(&self) -> u64 {
157            self.id
158        }
159
160        pub fn name(&self) -> Option<&str> {
161            self.name.as_deref()
162        }
163    }
164
165    pin_project! {
166        pub struct WithInfo<F> {
167            #[pin]
168            inner: F,
169            info: Info,
170            span: Span,
171        }
172    }
173
174    impl<F> WithInfo<F> {
175        pub fn new(inner: F, id: u64, name: &Arc<str>) -> Self {
176            let name = if name.is_empty() {
177                None
178            } else {
179                Some(name.clone())
180            };
181            let span = if let Some(name) = &name {
182                let _ = name;
183                info_span!("task", task = ?name)
184            } else {
185                info_span!("task", task = id)
186            };
187            let info = Info { id, name };
188            Self { inner, info, span }
189        }
190    }
191
192    impl<F: Future> Future for WithInfo<F> {
193        type Output = F::Output;
194
195        fn poll(
196            self: std::pin::Pin<&mut Self>,
197            cx: &mut std::task::Context<'_>,
198        ) -> std::task::Poll<Self::Output> {
199            let this = self.project();
200            let (_info, res) = scope::with(this.info.clone(), || {
201                this.span.in_scope(|| this.inner.poll(cx))
202            });
203            res
204        }
205    }
206}