task_exec_queue/
local.rs

1use std::fmt::Debug;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::task::{Context, Poll};
9
10use futures::channel::mpsc;
11use futures::task::AtomicWaker;
12use futures::{Sink, SinkExt, Stream, StreamExt};
13use parking_lot::Mutex;
14use parking_lot::RwLock;
15#[cfg(feature = "rate")]
16use rate_counter::LocalCounter as RateCounter;
17
18use crate::local_builder::LocalSender;
19use queue_ext::{Action, QueueExt, Reply};
20
21use super::{
22    assert_future, close::LocalClose, flush::LocalFlush, Counter, Error, ErrorType,
23    GroupTaskExecQueue, IndexSet, LocalSpawner, TryLocalSpawner,
24};
25
26type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;
27type GroupChannels<G> = Rc<DashMap<G, Rc<Mutex<GroupTaskExecQueue<LocalTaskType>>>>>;
28
29pub type LocalTaskType = Box<dyn std::future::Future<Output = ()> + 'static + Unpin>;
30
31pub struct LocalTaskExecQueue<Tx = LocalSender<(), mpsc::SendError>, G = (), D = ()> {
32    pub(crate) tx: Tx,
33    workers: usize,
34    queue_max: isize,
35    active_count: Counter,
36    pub(crate) waiting_count: Counter,
37    completed_count: Counter,
38    #[cfg(feature = "rate")]
39    rate_counter: RateCounter,
40    pub(crate) flush_waker: Rc<AtomicWaker>,
41    pub(crate) is_flushing: Rc<AtomicBool>,
42    is_closed: Rc<AtomicBool>,
43    pending_wakers: Wakers,
44    pub(crate) waiting_wakers: Wakers,
45    //group
46    group_channels: GroupChannels<G>,
47    _d: std::marker::PhantomData<D>,
48}
49
50impl<Tx, G, D> Clone for LocalTaskExecQueue<Tx, G, D>
51where
52    Tx: Clone,
53{
54    #[inline]
55    fn clone(&self) -> Self {
56        Self {
57            tx: self.tx.clone(),
58            workers: self.workers,
59            queue_max: self.queue_max,
60            active_count: self.active_count.clone(),
61            waiting_count: self.waiting_count.clone(),
62            completed_count: self.completed_count.clone(),
63            #[cfg(feature = "rate")]
64            rate_counter: self.rate_counter.clone(),
65            flush_waker: self.flush_waker.clone(),
66            is_flushing: self.is_flushing.clone(),
67            is_closed: self.is_closed.clone(),
68            pending_wakers: self.pending_wakers.clone(),
69            waiting_wakers: self.waiting_wakers.clone(),
70            group_channels: self.group_channels.clone(),
71            _d: std::marker::PhantomData,
72        }
73    }
74}
75
76impl<Tx, G, D> LocalTaskExecQueue<Tx, G, D>
77where
78    Tx: Clone + Sink<(D, LocalTaskType)> + Unpin + 'static,
79    G: Hash + Eq + Clone + Debug + 'static,
80{
81    #[inline]
82    pub(crate) fn with_channel<Rx>(
83        workers: usize,
84        queue_max: usize,
85        tx: Tx,
86        rx: Rx,
87    ) -> (Self, impl Future<Output = ()>)
88    where
89        Rx: Stream<Item = (D, LocalTaskType)> + Unpin,
90    {
91        let exec = Self {
92            tx,
93            workers,
94            queue_max: queue_max as isize,
95            active_count: Counter::new(),
96            waiting_count: Counter::new(),
97            completed_count: Counter::new(),
98            #[cfg(feature = "rate")]
99            rate_counter: RateCounter::new(std::time::Duration::from_secs(3)),
100            flush_waker: Rc::new(AtomicWaker::new()),
101            is_flushing: Rc::new(AtomicBool::new(false)),
102            is_closed: Rc::new(AtomicBool::new(false)),
103            pending_wakers: new_wakers(),
104            waiting_wakers: new_wakers(),
105            group_channels: Rc::new(DashMap::default()),
106            _d: std::marker::PhantomData,
107        };
108        let runner = exec.clone().run(rx);
109        (exec, runner)
110    }
111
112    #[inline]
113    pub fn try_spawn_with<T>(&self, msg: T, name: D) -> TryLocalSpawner<'_, T, Tx, G, D>
114    where
115        D: Clone,
116        T: Future + 'static,
117        T::Output: 'static,
118    {
119        let fut = TryLocalSpawner::new(self, msg, name);
120        assert_future::<Result<(), _>, _>(fut)
121    }
122
123    #[inline]
124    pub fn spawn_with<T>(&self, msg: T, name: D) -> LocalSpawner<'_, T, Tx, G, D>
125    where
126        D: Clone,
127        T: Future + 'static,
128        T::Output: 'static,
129    {
130        let fut = LocalSpawner::new(self, msg, name);
131        assert_future::<Result<(), _>, _>(fut)
132    }
133
134    #[inline]
135    pub fn flush(&self) -> LocalFlush<'_, Tx, G, D> {
136        self.is_flushing.store(true, Ordering::SeqCst);
137        LocalFlush::new(self)
138    }
139
140    #[inline]
141    pub fn close(&self) -> LocalClose<'_, Tx, G, D> {
142        self.is_flushing.store(true, Ordering::SeqCst);
143        self.is_closed.store(true, Ordering::SeqCst);
144        LocalClose::new(self)
145    }
146
147    #[inline]
148    pub fn workers(&self) -> usize {
149        self.workers
150    }
151
152    #[inline]
153    pub fn active_count(&self) -> isize {
154        self.active_count.value()
155    }
156
157    #[inline]
158    pub fn waiting_count(&self) -> isize {
159        self.waiting_count.value()
160    }
161
162    #[inline]
163    #[cfg(feature = "rate")]
164    pub async fn completed_count(&self) -> isize {
165        self.rate_counter.total()
166    }
167
168    #[inline]
169    pub fn pending_wakers_count(&self) -> usize {
170        self.pending_wakers.len()
171    }
172
173    #[inline]
174    pub fn waiting_wakers_count(&self) -> usize {
175        self.waiting_wakers.len()
176    }
177
178    #[inline]
179    #[cfg(feature = "rate")]
180    pub async fn rate(&self) -> f64 {
181        self.rate_counter.rate()
182    }
183
184    #[inline]
185    pub fn is_full(&self) -> bool {
186        self.waiting_count() >= self.queue_max
187    }
188
189    #[inline]
190    pub fn is_active(&self) -> bool {
191        self.waiting_count() > 0
192            || self.active_count() > 0
193            || self.pending_wakers_count() > 0
194            || self.waiting_wakers_count() > 0
195    }
196
197    #[inline]
198    pub fn is_closed(&self) -> bool {
199        self.is_closed.load(Ordering::SeqCst)
200    }
201
202    #[inline]
203    pub fn is_flushing(&self) -> bool {
204        self.is_flushing.load(Ordering::SeqCst)
205    }
206
207    async fn run<Rx>(self, mut task_rx: Rx)
208    where
209        Rx: Stream<Item = (D, LocalTaskType)> + Unpin,
210    {
211        let exec = self;
212        let pending_wakers = exec.pending_wakers.clone();
213
214        let channel = || {
215            let rx = OneValue::new().queue_stream(|s, _| match s.take() {
216                None => Poll::Pending,
217                Some(m) => Poll::Ready(Some(m)),
218            });
219
220            let tx = rx.clone().queue_sender(|s, action| match action {
221                Action::Send(item) => Reply::Send(s.set(item)),
222                Action::IsFull => Reply::IsFull(s.is_full()),
223                Action::IsEmpty => Reply::IsEmpty(s.is_empty()),
224                Action::Len => Reply::Len(s.len()),
225            });
226
227            (tx, rx)
228        };
229
230        let idle_idxs = IndexSet::new();
231        let mut txs = Vec::new();
232        let mut rxs = Vec::new();
233        for i in 0..exec.workers {
234            let (tx, mut rx) = channel();
235            let pending_wakers = pending_wakers.clone();
236            let idle_idxs = idle_idxs.clone();
237            idle_idxs.insert(i);
238            let exec = exec.clone();
239            let rx_fut = async move {
240                loop {
241                    match rx.next().await {
242                        Some(task) => {
243                            exec.active_count.inc();
244                            task.await;
245                            exec.active_count.dec();
246                            #[cfg(feature = "rate")]
247                            exec.rate_counter.inc();
248                        }
249                        None => break,
250                    }
251
252                    if !rx.is_full() {
253                        idle_idxs.insert(i);
254                        if let Some(w) = pending_wakers.pop() {
255                            w.wake();
256                        }
257                    }
258
259                    if exec.is_flushing() && rx.is_empty() && !exec.is_active() {
260                        exec.flush_waker.wake();
261                    }
262                }
263            };
264
265            txs.push(tx);
266            rxs.push(rx_fut);
267        }
268
269        let tasks_bus = async move {
270            while let Some((_, task)) = task_rx.next().await {
271                loop {
272                    if idle_idxs.is_empty() {
273                        //sleep ...
274                        let w = Rc::new(AtomicWaker::new());
275                        pending_wakers.push(w.clone());
276                        LocalPendingOnce::new(w).await;
277                    } else if let Some(idx) = idle_idxs.pop() {
278                        //select ...
279                        if let Some(tx) = txs.get_mut(idx) {
280                            if let Err(_t) = tx.send(task).await {
281                                log::error!("send error ...");
282                                // task = t.into_inner();
283                            }
284                        }
285                        break;
286                    };
287                }
288            }
289        };
290
291        futures::future::join(tasks_bus, futures::future::join_all(rxs)).await;
292        log::info!("exit local task execution queue");
293    }
294}
295
296impl<Tx, G> LocalTaskExecQueue<Tx, G, ()>
297where
298    Tx: Clone + Sink<((), LocalTaskType)> + Unpin + 'static,
299    G: Hash + Eq + Clone + Debug + 'static,
300{
301    #[inline]
302    pub fn try_spawn<T>(&self, msg: T) -> TryLocalSpawner<'_, T, Tx, G, ()>
303    where
304        T: Future + 'static,
305        T::Output: 'static,
306    {
307        let fut = TryLocalSpawner::new(self, msg, ());
308        assert_future::<Result<(), _>, _>(fut)
309    }
310
311    #[inline]
312    pub fn spawn<T>(&self, msg: T) -> LocalSpawner<'_, T, Tx, G, ()>
313    where
314        T: Future + 'static,
315        T::Output: 'static,
316    {
317        let fut = LocalSpawner::new(self, msg, ());
318        assert_future::<Result<(), _>, _>(fut)
319    }
320
321    #[inline]
322    pub(crate) async fn group_send(
323        &self,
324        name: G,
325        task: LocalTaskType,
326    ) -> Result<(), Error<LocalTaskType>> {
327        let gt_queue = self
328            .group_channels
329            .entry(name.clone())
330            .or_insert_with(|| Rc::new(Mutex::new(GroupTaskExecQueue::new())))
331            .value()
332            .clone();
333
334        let exec = self.clone();
335        let group_channels = self.group_channels.clone();
336        let runner_task = {
337            let mut task_tx = gt_queue.lock();
338            if task_tx.is_running() {
339                task_tx.push(task);
340                drop(task_tx);
341                drop(gt_queue);
342                None
343            } else {
344                task_tx.set_running(true);
345                drop(task_tx);
346                let task_rx = gt_queue; //.clone();
347                let runner_task = async move {
348                    exec.active_count.inc();
349                    task.await;
350                    exec.active_count.dec();
351                    loop {
352                        let task: Option<LocalTaskType> = task_rx.lock().pop();
353                        if let Some(task) = task {
354                            exec.active_count.inc();
355                            task.await;
356                            exec.active_count.dec();
357                            #[cfg(feature = "rate")]
358                            exec.rate_counter.inc();
359                        } else {
360                            group_channels.remove(&name);
361                            break;
362                        }
363                    }
364                };
365                Some(runner_task)
366            }
367        };
368
369        if let Some(runner_task) = runner_task {
370            if (self
371                .tx
372                .clone()
373                .send(((), Box::new(Box::pin(runner_task))))
374                .await)
375                .is_err()
376            {
377                Err(Error::SendError(ErrorType::Closed(None)))
378            } else {
379                Ok(())
380            }
381        } else {
382            Ok(())
383        }
384    }
385}
386
387#[derive(Clone)]
388struct OneValue(Rc<RwLock<Option<LocalTaskType>>>);
389
390impl OneValue {
391    #[inline]
392    fn new() -> Self {
393        Self(Rc::new(RwLock::new(None)))
394    }
395
396    #[inline]
397    fn set(&self, val: LocalTaskType) -> Option<LocalTaskType> {
398        self.0.write().replace(val)
399    }
400
401    #[inline]
402    fn take(&self) -> Option<LocalTaskType> {
403        self.0.write().take()
404    }
405
406    #[inline]
407    fn is_full(&self) -> bool {
408        self.0.read().is_some()
409    }
410
411    #[inline]
412    fn len(&self) -> usize {
413        if self.0.read().is_some() {
414            1
415        } else {
416            0
417        }
418    }
419
420    #[inline]
421    fn is_empty(&self) -> bool {
422        self.0.read().is_none()
423    }
424}
425
426pub(crate) struct LocalPendingOnce {
427    w: Rc<AtomicWaker>,
428    is_ready: bool,
429}
430
431impl LocalPendingOnce {
432    #[inline]
433    pub(crate) fn new(w: Rc<AtomicWaker>) -> Self {
434        Self { w, is_ready: false }
435    }
436}
437
438impl Future for LocalPendingOnce {
439    type Output = ();
440    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
441        if self.is_ready {
442            Poll::Ready(())
443        } else {
444            self.w.register(cx.waker());
445            self.is_ready = true;
446            Poll::Pending
447        }
448    }
449}
450
451type Wakers = Rc<crossbeam_queue::SegQueue<Rc<AtomicWaker>>>;
452
453#[inline]
454fn new_wakers() -> Wakers {
455    Rc::new(crossbeam_queue::SegQueue::new())
456}