task_exec_queue/
exec.rs

1use std::fmt::Debug;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8use std::task::{Context, Poll};
9
10use futures::channel::mpsc;
11use futures::task::AtomicWaker;
12use futures::{Sink, SinkExt, Stream, StreamExt};
13use parking_lot::Mutex;
14use parking_lot::RwLock;
15#[cfg(feature = "rate")]
16use rate_counter::Counter as RateCounter;
17
18use queue_ext::{Action, QueueExt, Reply};
19
20use super::{
21    assert_future, close::Close, flush::Flush, Counter, Error, ErrorType, GroupTaskExecQueue,
22    IndexSet, Spawner, TrySpawner,
23};
24
25type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;
26type GroupChannels<G> = Arc<DashMap<G, Arc<Mutex<GroupTaskExecQueue<TaskType>>>>>;
27
28pub type TaskType = Box<dyn std::future::Future<Output = ()> + Send + 'static + Unpin>;
29
30pub struct TaskExecQueue<Tx = mpsc::Sender<((), TaskType)>, G = (), D = ()> {
31    pub(crate) tx: Tx,
32    workers: usize,
33    queue_max: isize,
34    active_count: Counter,
35    pub(crate) waiting_count: Counter,
36    #[cfg(feature = "rate")]
37    rate_counter: RateCounter,
38    pub(crate) flush_waker: Arc<AtomicWaker>,
39    pub(crate) is_flushing: Arc<AtomicBool>,
40    is_closed: Arc<AtomicBool>,
41    pending_wakers: Wakers,
42    pub(crate) waiting_wakers: Wakers,
43    //group
44    group_channels: GroupChannels<G>,
45    _d: std::marker::PhantomData<D>,
46}
47
48impl<Tx, G, D> Clone for TaskExecQueue<Tx, G, D>
49where
50    Tx: Clone,
51{
52    #[inline]
53    fn clone(&self) -> Self {
54        Self {
55            tx: self.tx.clone(),
56            workers: self.workers,
57            queue_max: self.queue_max,
58            active_count: self.active_count.clone(),
59            waiting_count: self.waiting_count.clone(),
60            #[cfg(feature = "rate")]
61            rate_counter: self.rate_counter.clone(),
62            flush_waker: self.flush_waker.clone(),
63            is_flushing: self.is_flushing.clone(),
64            is_closed: self.is_closed.clone(),
65            pending_wakers: self.pending_wakers.clone(),
66            waiting_wakers: self.waiting_wakers.clone(),
67            group_channels: self.group_channels.clone(),
68            _d: std::marker::PhantomData,
69        }
70    }
71}
72
73impl<Tx, G, D> TaskExecQueue<Tx, G, D>
74where
75    Tx: Clone + Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
76    G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
77{
78    #[inline]
79    pub(crate) fn with_channel<Rx>(
80        workers: usize,
81        queue_max: usize,
82        tx: Tx,
83        rx: Rx,
84    ) -> (Self, impl Future<Output = ()>)
85    where
86        Rx: Stream<Item = (D, TaskType)> + Unpin,
87    {
88        let exec = Self {
89            tx,
90            workers,
91            queue_max: queue_max as isize,
92            active_count: Counter::new(),
93            waiting_count: Counter::new(),
94            #[cfg(feature = "rate")]
95            rate_counter: RateCounter::new(std::time::Duration::from_secs(3)),
96            flush_waker: Arc::new(AtomicWaker::new()),
97            is_flushing: Arc::new(AtomicBool::new(false)),
98            is_closed: Arc::new(AtomicBool::new(false)),
99            pending_wakers: new_wakers(),
100            waiting_wakers: new_wakers(),
101            group_channels: Arc::new(DashMap::default()),
102            _d: std::marker::PhantomData,
103        };
104        let runner = exec.clone().run(rx);
105        (exec, runner)
106    }
107
108    #[inline]
109    pub fn try_spawn_with<T>(&self, msg: T, name: D) -> TrySpawner<'_, T, Tx, G, D>
110    where
111        D: Clone,
112        T: Future + Send + 'static,
113        T::Output: Send + 'static,
114    {
115        let fut = TrySpawner::new(self, msg, name);
116        assert_future::<Result<(), _>, _>(fut)
117    }
118
119    #[inline]
120    pub fn spawn_with<T>(&self, msg: T, name: D) -> Spawner<'_, T, Tx, G, D>
121    where
122        D: Clone,
123        T: Future + Send + 'static,
124        T::Output: Send + 'static,
125    {
126        let fut = Spawner::new(self, msg, name);
127        assert_future::<Result<(), _>, _>(fut)
128    }
129
130    #[inline]
131    pub fn flush(&self) -> Flush<'_, Tx, G, D> {
132        self.is_flushing.store(true, Ordering::SeqCst);
133        Flush::new(self)
134    }
135
136    #[inline]
137    pub fn close(&self) -> Close<'_, Tx, G, D> {
138        self.is_flushing.store(true, Ordering::SeqCst);
139        self.is_closed.store(true, Ordering::SeqCst);
140        Close::new(self)
141    }
142
143    #[inline]
144    pub fn workers(&self) -> usize {
145        self.workers
146    }
147
148    #[inline]
149    pub fn active_count(&self) -> isize {
150        self.active_count.value()
151    }
152
153    #[inline]
154    pub fn waiting_count(&self) -> isize {
155        self.waiting_count.value()
156    }
157
158    #[inline]
159    #[cfg(feature = "rate")]
160    pub async fn completed_count(&self) -> isize {
161        self.rate_counter.total()
162    }
163
164    #[inline]
165    pub fn pending_wakers_count(&self) -> usize {
166        self.pending_wakers.len()
167    }
168
169    #[inline]
170    pub fn waiting_wakers_count(&self) -> usize {
171        self.waiting_wakers.len()
172    }
173
174    #[inline]
175    #[cfg(feature = "rate")]
176    pub async fn rate(&self) -> f64 {
177        self.rate_counter.rate()
178    }
179
180    #[inline]
181    pub fn is_full(&self) -> bool {
182        self.waiting_count() >= self.queue_max
183    }
184
185    #[inline]
186    pub fn is_active(&self) -> bool {
187        self.waiting_count() > 0
188            || self.active_count() > 0
189            || self.pending_wakers_count() > 0
190            || self.waiting_wakers_count() > 0
191    }
192
193    #[inline]
194    pub fn is_closed(&self) -> bool {
195        self.is_closed.load(Ordering::SeqCst)
196    }
197
198    #[inline]
199    pub fn is_flushing(&self) -> bool {
200        self.is_flushing.load(Ordering::SeqCst)
201    }
202
203    async fn run<Rx>(self, mut task_rx: Rx)
204    where
205        Rx: Stream<Item = (D, TaskType)> + Unpin,
206    {
207        let exec = self;
208        let pending_wakers = exec.pending_wakers.clone();
209
210        let channel = || {
211            let rx = OneValue::new().queue_stream(|s, _| match s.take() {
212                None => Poll::Pending,
213                Some(m) => Poll::Ready(Some(m)),
214            });
215
216            let tx = rx.clone().queue_sender(|s, action| match action {
217                Action::Send(item) => Reply::Send(s.set(item)),
218                Action::IsFull => Reply::IsFull(s.is_full()),
219                Action::IsEmpty => Reply::IsEmpty(s.is_empty()),
220                Action::Len => Reply::Len(s.len()),
221            });
222
223            (tx, rx)
224        };
225
226        let idle_idxs = IndexSet::new();
227        let mut txs = Vec::new();
228        let mut rxs = Vec::new();
229        for i in 0..exec.workers {
230            let (tx, mut rx) = channel();
231            let pending_wakers = pending_wakers.clone();
232            let idle_idxs = idle_idxs.clone();
233            idle_idxs.insert(i);
234            let exec = exec.clone();
235            let rx_fut = async move {
236                loop {
237                    match rx.next().await {
238                        Some(task) => {
239                            exec.active_count.inc();
240                            task.await;
241                            exec.active_count.dec();
242                            #[cfg(feature = "rate")]
243                            exec.rate_counter.inc();
244                        }
245                        None => break,
246                    }
247
248                    if !rx.is_full() {
249                        idle_idxs.insert(i);
250                        if let Some(w) = pending_wakers.pop() {
251                            w.wake();
252                        }
253                    }
254
255                    if exec.is_flushing() && rx.is_empty() && !exec.is_active() {
256                        exec.flush_waker.wake();
257                    }
258                }
259            };
260
261            txs.push(tx);
262            rxs.push(rx_fut);
263        }
264
265        let tasks_bus = async move {
266            while let Some((_, task)) = task_rx.next().await {
267                loop {
268                    if idle_idxs.is_empty() {
269                        //sleep ...
270                        let w = Arc::new(AtomicWaker::new());
271                        pending_wakers.push(w.clone());
272                        PendingOnce::new(w).await;
273                    } else if let Some(idx) = idle_idxs.pop() {
274                        //select ...
275                        if let Some(tx) = txs.get_mut(idx) {
276                            if let Err(_t) = tx.send(task).await {
277                                log::error!("send error ...");
278                                // task = t.into_inner();
279                            }
280                        }
281                        break;
282                    };
283                }
284            }
285        };
286
287        futures::future::join(tasks_bus, futures::future::join_all(rxs)).await;
288        log::info!("exit task execution queue");
289    }
290}
291
292impl<Tx, G> TaskExecQueue<Tx, G, ()>
293where
294    Tx: Clone + Sink<((), TaskType)> + Unpin + Send + Sync + 'static,
295    G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
296{
297    #[inline]
298    pub fn try_spawn<T>(&self, task: T) -> TrySpawner<'_, T, Tx, G, ()>
299    where
300        T: Future + Send + 'static,
301        T::Output: Send + 'static,
302    {
303        let fut = TrySpawner::new(self, task, ());
304        assert_future::<Result<(), _>, _>(fut)
305    }
306
307    #[inline]
308    pub fn spawn<T>(&self, task: T) -> Spawner<'_, T, Tx, G, ()>
309    where
310        T: Future + Send + 'static,
311        T::Output: Send + 'static,
312    {
313        let fut = Spawner::new(self, task, ());
314        assert_future::<Result<(), _>, _>(fut)
315    }
316
317    #[inline]
318    pub(crate) async fn group_send(&self, name: G, task: TaskType) -> Result<(), Error<TaskType>> {
319        let gt_queue = self
320            .group_channels
321            .entry(name.clone())
322            .or_insert_with(|| Arc::new(Mutex::new(GroupTaskExecQueue::new())))
323            .value()
324            .clone();
325
326        let exec = self.clone();
327        let group_channels = self.group_channels.clone();
328        let runner_task = {
329            let mut task_tx = gt_queue.lock();
330            if task_tx.is_running() {
331                task_tx.push(task);
332                drop(task_tx);
333                drop(gt_queue);
334                None
335            } else {
336                task_tx.set_running(true);
337                drop(task_tx);
338                let task_rx = gt_queue; //.clone();
339                let runner_task = async move {
340                    exec.active_count.inc();
341                    task.await;
342                    exec.active_count.dec();
343                    loop {
344                        let task: Option<TaskType> = task_rx.lock().pop();
345                        if let Some(task) = task {
346                            exec.active_count.inc();
347                            task.await;
348                            exec.active_count.dec();
349                            #[cfg(feature = "rate")]
350                            exec.rate_counter.inc();
351                        } else {
352                            group_channels.remove(&name);
353                            break;
354                        }
355                    }
356                };
357                Some(runner_task)
358            }
359        };
360
361        if let Some(runner_task) = runner_task {
362            if (self
363                .tx
364                .clone()
365                .send(((), Box::new(Box::pin(runner_task))))
366                .await)
367                .is_err()
368            {
369                Err(Error::SendError(ErrorType::Closed(None)))
370            } else {
371                Ok(())
372            }
373        } else {
374            Ok(())
375        }
376    }
377}
378
379#[derive(Clone)]
380struct OneValue(Arc<RwLock<Option<TaskType>>>);
381
382impl OneValue {
383    #[inline]
384    fn new() -> Self {
385        #[allow(clippy::arc_with_non_send_sync)]
386        Self(Arc::new(RwLock::new(None::<TaskType>)))
387    }
388
389    #[inline]
390    fn set(&self, val: TaskType) -> Option<TaskType> {
391        self.0.write().replace(val)
392    }
393
394    #[inline]
395    fn take(&self) -> Option<TaskType> {
396        self.0.write().take()
397    }
398
399    #[inline]
400    fn is_full(&self) -> bool {
401        self.0.read().is_some()
402    }
403
404    #[inline]
405    fn len(&self) -> usize {
406        if self.0.read().is_some() {
407            1
408        } else {
409            0
410        }
411    }
412
413    #[inline]
414    fn is_empty(&self) -> bool {
415        self.0.read().is_none()
416    }
417}
418
419pub(crate) struct PendingOnce {
420    w: Arc<AtomicWaker>,
421    is_ready: bool,
422}
423
424impl PendingOnce {
425    #[inline]
426    pub(crate) fn new(w: Arc<AtomicWaker>) -> Self {
427        Self { w, is_ready: false }
428    }
429}
430
431impl Future for PendingOnce {
432    type Output = ();
433    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
434        if self.is_ready {
435            Poll::Ready(())
436        } else {
437            self.w.register(cx.waker());
438            self.is_ready = true;
439            Poll::Pending
440        }
441    }
442}
443
444type Wakers = Arc<crossbeam_queue::SegQueue<Arc<AtomicWaker>>>;
445
446#[inline]
447fn new_wakers() -> Wakers {
448    Arc::new(crossbeam_queue::SegQueue::new())
449}