1use std::fmt::Debug;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicBool, Ordering};
7use std::sync::Arc;
8use std::task::{Context, Poll};
9
10use futures::channel::mpsc;
11use futures::task::AtomicWaker;
12use futures::{Sink, SinkExt, Stream, StreamExt};
13use parking_lot::Mutex;
14use parking_lot::RwLock;
15#[cfg(feature = "rate")]
16use rate_counter::Counter as RateCounter;
17
18use queue_ext::{Action, QueueExt, Reply};
19
20use super::{
21 assert_future, close::Close, flush::Flush, Counter, Error, ErrorType, GroupTaskExecQueue,
22 IndexSet, Spawner, TrySpawner,
23};
24
25type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;
26type GroupChannels<G> = Arc<DashMap<G, Arc<Mutex<GroupTaskExecQueue<TaskType>>>>>;
27
28pub type TaskType = Box<dyn std::future::Future<Output = ()> + Send + 'static + Unpin>;
29
30pub struct TaskExecQueue<Tx = mpsc::Sender<((), TaskType)>, G = (), D = ()> {
31 pub(crate) tx: Tx,
32 workers: usize,
33 queue_max: isize,
34 active_count: Counter,
35 pub(crate) waiting_count: Counter,
36 #[cfg(feature = "rate")]
37 rate_counter: RateCounter,
38 pub(crate) flush_waker: Arc<AtomicWaker>,
39 pub(crate) is_flushing: Arc<AtomicBool>,
40 is_closed: Arc<AtomicBool>,
41 pending_wakers: Wakers,
42 pub(crate) waiting_wakers: Wakers,
43 group_channels: GroupChannels<G>,
45 _d: std::marker::PhantomData<D>,
46}
47
48impl<Tx, G, D> Clone for TaskExecQueue<Tx, G, D>
49where
50 Tx: Clone,
51{
52 #[inline]
53 fn clone(&self) -> Self {
54 Self {
55 tx: self.tx.clone(),
56 workers: self.workers,
57 queue_max: self.queue_max,
58 active_count: self.active_count.clone(),
59 waiting_count: self.waiting_count.clone(),
60 #[cfg(feature = "rate")]
61 rate_counter: self.rate_counter.clone(),
62 flush_waker: self.flush_waker.clone(),
63 is_flushing: self.is_flushing.clone(),
64 is_closed: self.is_closed.clone(),
65 pending_wakers: self.pending_wakers.clone(),
66 waiting_wakers: self.waiting_wakers.clone(),
67 group_channels: self.group_channels.clone(),
68 _d: std::marker::PhantomData,
69 }
70 }
71}
72
73impl<Tx, G, D> TaskExecQueue<Tx, G, D>
74where
75 Tx: Clone + Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
76 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
77{
78 #[inline]
79 pub(crate) fn with_channel<Rx>(
80 workers: usize,
81 queue_max: usize,
82 tx: Tx,
83 rx: Rx,
84 ) -> (Self, impl Future<Output = ()>)
85 where
86 Rx: Stream<Item = (D, TaskType)> + Unpin,
87 {
88 let exec = Self {
89 tx,
90 workers,
91 queue_max: queue_max as isize,
92 active_count: Counter::new(),
93 waiting_count: Counter::new(),
94 #[cfg(feature = "rate")]
95 rate_counter: RateCounter::new(std::time::Duration::from_secs(3)),
96 flush_waker: Arc::new(AtomicWaker::new()),
97 is_flushing: Arc::new(AtomicBool::new(false)),
98 is_closed: Arc::new(AtomicBool::new(false)),
99 pending_wakers: new_wakers(),
100 waiting_wakers: new_wakers(),
101 group_channels: Arc::new(DashMap::default()),
102 _d: std::marker::PhantomData,
103 };
104 let runner = exec.clone().run(rx);
105 (exec, runner)
106 }
107
108 #[inline]
109 pub fn try_spawn_with<T>(&self, msg: T, name: D) -> TrySpawner<'_, T, Tx, G, D>
110 where
111 D: Clone,
112 T: Future + Send + 'static,
113 T::Output: Send + 'static,
114 {
115 let fut = TrySpawner::new(self, msg, name);
116 assert_future::<Result<(), _>, _>(fut)
117 }
118
119 #[inline]
120 pub fn spawn_with<T>(&self, msg: T, name: D) -> Spawner<'_, T, Tx, G, D>
121 where
122 D: Clone,
123 T: Future + Send + 'static,
124 T::Output: Send + 'static,
125 {
126 let fut = Spawner::new(self, msg, name);
127 assert_future::<Result<(), _>, _>(fut)
128 }
129
130 #[inline]
131 pub fn flush(&self) -> Flush<'_, Tx, G, D> {
132 self.is_flushing.store(true, Ordering::SeqCst);
133 Flush::new(self)
134 }
135
136 #[inline]
137 pub fn close(&self) -> Close<'_, Tx, G, D> {
138 self.is_flushing.store(true, Ordering::SeqCst);
139 self.is_closed.store(true, Ordering::SeqCst);
140 Close::new(self)
141 }
142
143 #[inline]
144 pub fn workers(&self) -> usize {
145 self.workers
146 }
147
148 #[inline]
149 pub fn active_count(&self) -> isize {
150 self.active_count.value()
151 }
152
153 #[inline]
154 pub fn waiting_count(&self) -> isize {
155 self.waiting_count.value()
156 }
157
158 #[inline]
159 #[cfg(feature = "rate")]
160 pub async fn completed_count(&self) -> isize {
161 self.rate_counter.total()
162 }
163
164 #[inline]
165 pub fn pending_wakers_count(&self) -> usize {
166 self.pending_wakers.len()
167 }
168
169 #[inline]
170 pub fn waiting_wakers_count(&self) -> usize {
171 self.waiting_wakers.len()
172 }
173
174 #[inline]
175 #[cfg(feature = "rate")]
176 pub async fn rate(&self) -> f64 {
177 self.rate_counter.rate()
178 }
179
180 #[inline]
181 pub fn is_full(&self) -> bool {
182 self.waiting_count() >= self.queue_max
183 }
184
185 #[inline]
186 pub fn is_active(&self) -> bool {
187 self.waiting_count() > 0
188 || self.active_count() > 0
189 || self.pending_wakers_count() > 0
190 || self.waiting_wakers_count() > 0
191 }
192
193 #[inline]
194 pub fn is_closed(&self) -> bool {
195 self.is_closed.load(Ordering::SeqCst)
196 }
197
198 #[inline]
199 pub fn is_flushing(&self) -> bool {
200 self.is_flushing.load(Ordering::SeqCst)
201 }
202
203 async fn run<Rx>(self, mut task_rx: Rx)
204 where
205 Rx: Stream<Item = (D, TaskType)> + Unpin,
206 {
207 let exec = self;
208 let pending_wakers = exec.pending_wakers.clone();
209
210 let channel = || {
211 let rx = OneValue::new().queue_stream(|s, _| match s.take() {
212 None => Poll::Pending,
213 Some(m) => Poll::Ready(Some(m)),
214 });
215
216 let tx = rx.clone().queue_sender(|s, action| match action {
217 Action::Send(item) => Reply::Send(s.set(item)),
218 Action::IsFull => Reply::IsFull(s.is_full()),
219 Action::IsEmpty => Reply::IsEmpty(s.is_empty()),
220 Action::Len => Reply::Len(s.len()),
221 });
222
223 (tx, rx)
224 };
225
226 let idle_idxs = IndexSet::new();
227 let mut txs = Vec::new();
228 let mut rxs = Vec::new();
229 for i in 0..exec.workers {
230 let (tx, mut rx) = channel();
231 let pending_wakers = pending_wakers.clone();
232 let idle_idxs = idle_idxs.clone();
233 idle_idxs.insert(i);
234 let exec = exec.clone();
235 let rx_fut = async move {
236 loop {
237 match rx.next().await {
238 Some(task) => {
239 exec.active_count.inc();
240 task.await;
241 exec.active_count.dec();
242 #[cfg(feature = "rate")]
243 exec.rate_counter.inc();
244 }
245 None => break,
246 }
247
248 if !rx.is_full() {
249 idle_idxs.insert(i);
250 if let Some(w) = pending_wakers.pop() {
251 w.wake();
252 }
253 }
254
255 if exec.is_flushing() && rx.is_empty() && !exec.is_active() {
256 exec.flush_waker.wake();
257 }
258 }
259 };
260
261 txs.push(tx);
262 rxs.push(rx_fut);
263 }
264
265 let tasks_bus = async move {
266 while let Some((_, task)) = task_rx.next().await {
267 loop {
268 if idle_idxs.is_empty() {
269 let w = Arc::new(AtomicWaker::new());
271 pending_wakers.push(w.clone());
272 PendingOnce::new(w).await;
273 } else if let Some(idx) = idle_idxs.pop() {
274 if let Some(tx) = txs.get_mut(idx) {
276 if let Err(_t) = tx.send(task).await {
277 log::error!("send error ...");
278 }
280 }
281 break;
282 };
283 }
284 }
285 };
286
287 futures::future::join(tasks_bus, futures::future::join_all(rxs)).await;
288 log::info!("exit task execution queue");
289 }
290}
291
292impl<Tx, G> TaskExecQueue<Tx, G, ()>
293where
294 Tx: Clone + Sink<((), TaskType)> + Unpin + Send + Sync + 'static,
295 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
296{
297 #[inline]
298 pub fn try_spawn<T>(&self, task: T) -> TrySpawner<'_, T, Tx, G, ()>
299 where
300 T: Future + Send + 'static,
301 T::Output: Send + 'static,
302 {
303 let fut = TrySpawner::new(self, task, ());
304 assert_future::<Result<(), _>, _>(fut)
305 }
306
307 #[inline]
308 pub fn spawn<T>(&self, task: T) -> Spawner<'_, T, Tx, G, ()>
309 where
310 T: Future + Send + 'static,
311 T::Output: Send + 'static,
312 {
313 let fut = Spawner::new(self, task, ());
314 assert_future::<Result<(), _>, _>(fut)
315 }
316
317 #[inline]
318 pub(crate) async fn group_send(&self, name: G, task: TaskType) -> Result<(), Error<TaskType>> {
319 let gt_queue = self
320 .group_channels
321 .entry(name.clone())
322 .or_insert_with(|| Arc::new(Mutex::new(GroupTaskExecQueue::new())))
323 .value()
324 .clone();
325
326 let exec = self.clone();
327 let group_channels = self.group_channels.clone();
328 let runner_task = {
329 let mut task_tx = gt_queue.lock();
330 if task_tx.is_running() {
331 task_tx.push(task);
332 drop(task_tx);
333 drop(gt_queue);
334 None
335 } else {
336 task_tx.set_running(true);
337 drop(task_tx);
338 let task_rx = gt_queue; let runner_task = async move {
340 exec.active_count.inc();
341 task.await;
342 exec.active_count.dec();
343 loop {
344 let task: Option<TaskType> = task_rx.lock().pop();
345 if let Some(task) = task {
346 exec.active_count.inc();
347 task.await;
348 exec.active_count.dec();
349 #[cfg(feature = "rate")]
350 exec.rate_counter.inc();
351 } else {
352 group_channels.remove(&name);
353 break;
354 }
355 }
356 };
357 Some(runner_task)
358 }
359 };
360
361 if let Some(runner_task) = runner_task {
362 if (self
363 .tx
364 .clone()
365 .send(((), Box::new(Box::pin(runner_task))))
366 .await)
367 .is_err()
368 {
369 Err(Error::SendError(ErrorType::Closed(None)))
370 } else {
371 Ok(())
372 }
373 } else {
374 Ok(())
375 }
376 }
377}
378
379#[derive(Clone)]
380struct OneValue(Arc<RwLock<Option<TaskType>>>);
381
382impl OneValue {
383 #[inline]
384 fn new() -> Self {
385 #[allow(clippy::arc_with_non_send_sync)]
386 Self(Arc::new(RwLock::new(None::<TaskType>)))
387 }
388
389 #[inline]
390 fn set(&self, val: TaskType) -> Option<TaskType> {
391 self.0.write().replace(val)
392 }
393
394 #[inline]
395 fn take(&self) -> Option<TaskType> {
396 self.0.write().take()
397 }
398
399 #[inline]
400 fn is_full(&self) -> bool {
401 self.0.read().is_some()
402 }
403
404 #[inline]
405 fn len(&self) -> usize {
406 if self.0.read().is_some() {
407 1
408 } else {
409 0
410 }
411 }
412
413 #[inline]
414 fn is_empty(&self) -> bool {
415 self.0.read().is_none()
416 }
417}
418
419pub(crate) struct PendingOnce {
420 w: Arc<AtomicWaker>,
421 is_ready: bool,
422}
423
424impl PendingOnce {
425 #[inline]
426 pub(crate) fn new(w: Arc<AtomicWaker>) -> Self {
427 Self { w, is_ready: false }
428 }
429}
430
431impl Future for PendingOnce {
432 type Output = ();
433 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
434 if self.is_ready {
435 Poll::Ready(())
436 } else {
437 self.w.register(cx.waker());
438 self.is_ready = true;
439 Poll::Pending
440 }
441 }
442}
443
444type Wakers = Arc<crossbeam_queue::SegQueue<Arc<AtomicWaker>>>;
445
446#[inline]
447fn new_wakers() -> Wakers {
448 Arc::new(crossbeam_queue::SegQueue::new())
449}