lelet_task_queue/
lib.rs

1//! Task queue
2//!
3//! A global task queue for buildnig task executor
4//!
5//! any task in the queue can be polled from multiple thread
6
7mod deque;
8mod local;
9mod poll_fn;
10mod shared;
11
12use std::future::Future;
13use std::marker::PhantomData;
14use std::pin::Pin;
15use std::task::Poll;
16
17use crate::poll_fn::poll_fn;
18
19/// Push a task
20pub fn push(task: impl Future<Output = ()> + Send + 'static) {
21    shared::push(task);
22}
23
24/// Push a local task
25pub fn push_local(task: impl Future<Output = ()> + 'static) {
26    local::push(task);
27}
28
29/// Poller
30pub struct Poller<'a> {
31    local: local::Poller<'a>,
32    shared: shared::Poller<'a>,
33
34    // !Send + !Sync
35    _marker: PhantomData<*mut ()>,
36}
37
38/// Get [`Poller`] for polling task
39///
40/// [`Poller`]: struct.Poller.html
41pub fn poller<'a>() -> Poller<'a> {
42    Poller {
43        local: local::poller(),
44        shared: shared::poller(),
45        _marker: PhantomData,
46    }
47}
48
49impl<'a> Poller<'a> {
50    /// Poll one task
51    ///
52    /// return false if no more task to be polled
53    ///
54    /// note that, when this function return false, it doesn't mean that is the queue is empty,
55    /// some task maybe is in pending state
56    ///
57    /// use [`wait`] to check if the queue is already empty
58    ///
59    /// [`wait`]: struct.Poller.html#method.wait
60    #[inline(always)]
61    pub fn poll_one(&self) -> bool {
62        if self.local.poll_one() {
63            return true;
64        }
65
66        if self.shared.poll_one() {
67            return true;
68        }
69
70        false
71    }
72
73    /// Wait
74    ///
75    /// return true if there is a task to be polled, or false if the queue becomes empty
76    #[inline(always)]
77    pub async fn wait(&self) -> bool {
78        let mut local_wait = Some(self.local.wait());
79        let mut shared_wait = Some(self.shared.wait());
80        poll_fn(move |cx| {
81            assert!(
82                local_wait.is_some() || shared_wait.is_some(),
83                "calling poll when future is already done"
84            );
85
86            if local_wait.is_some() {
87                let f = local_wait.as_mut().unwrap();
88                match unsafe { Pin::new_unchecked(f) }.poll(cx) {
89                    Poll::Ready(ok) => {
90                        local_wait.take();
91                        if ok {
92                            shared_wait.take();
93                            return Poll::Ready(true);
94                        }
95                    }
96                    Poll::Pending => {}
97                }
98            }
99
100            if shared_wait.is_some() {
101                let f = shared_wait.as_mut().unwrap();
102                match unsafe { Pin::new_unchecked(f) }.poll(cx) {
103                    Poll::Ready(ok) => {
104                        shared_wait.take();
105                        if ok {
106                            local_wait.take();
107                            return Poll::Ready(true);
108                        }
109                    }
110                    Poll::Pending => {}
111                }
112            }
113
114            if local_wait.is_some() || shared_wait.is_some() {
115                Poll::Pending
116            } else {
117                Poll::Ready(false)
118            }
119        })
120        .await
121    }
122}