1use std::fmt::Debug;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::rc::Rc;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::task::{Context, Poll};
9
10use futures::channel::mpsc;
11use futures::task::AtomicWaker;
12use futures::{Sink, SinkExt, Stream, StreamExt};
13use parking_lot::Mutex;
14use parking_lot::RwLock;
15#[cfg(feature = "rate")]
16use rate_counter::LocalCounter as RateCounter;
17
18use crate::local_builder::LocalSender;
19use queue_ext::{Action, QueueExt, Reply};
20
21use super::{
22 assert_future, close::LocalClose, flush::LocalFlush, Counter, Error, ErrorType,
23 GroupTaskExecQueue, IndexSet, LocalSpawner, TryLocalSpawner,
24};
25
/// `dashmap::DashMap` fixed to the `ahash::RandomState` hasher.
type DashMap<K, V> = dashmap::DashMap<K, V, ahash::RandomState>;
/// Reference-counted map from a group key `G` to that group's shared, mutex-guarded
/// `GroupTaskExecQueue` of [`LocalTaskType`] tasks.
type GroupChannels<G> = Rc<DashMap<G, Rc<Mutex<GroupTaskExecQueue<LocalTaskType>>>>>;

/// The task type executed by the local queue: a boxed, type-erased future with unit output.
///
/// The trait object is required to be `'static` and `Unpin`; it carries no `Send` bound.
pub type LocalTaskType = Box<dyn std::future::Future<Output = ()> + 'static + Unpin>;
30
/// Handle to a task execution queue built on `Rc`-shared (non-atomic) state.
///
/// Type parameters:
/// * `Tx` - the sending half used to submit `(D, LocalTaskType)` items.
/// * `G`  - key type used to look up per-group queues in `group_channels`.
/// * `D`  - extra value that travels with each task through the channel; the dispatcher in
///   `run` discards it.
pub struct LocalTaskExecQueue<Tx = LocalSender<(), mpsc::SendError>, G = (), D = ()> {
    /// Sending half of the task channel (cloned and used by `group_send`).
    pub(crate) tx: Tx,
    /// Number of worker loops that `run` creates.
    workers: usize,
    /// Limit that `is_full` compares `waiting_count` against.
    queue_max: isize,
    /// Incremented before a task is awaited and decremented after it finishes.
    active_count: Counter,
    /// Counter read by `waiting_count`, `is_full` and `is_active`; it is not modified in this
    /// file (presumably maintained by the spawner types - TODO confirm).
    pub(crate) waiting_count: Counter,
    /// Created in `with_channel` and carried along by `Clone`; nothing in this file reads or
    /// updates it otherwise.
    completed_count: Counter,
    /// Incremented after each finished task; backs `completed_count()` and `rate()`.
    /// Only present with the `rate` feature.
    #[cfg(feature = "rate")]
    rate_counter: RateCounter,
    /// Woken by a worker once a flush was requested and `is_active` reports no remaining work.
    pub(crate) flush_waker: Rc<AtomicWaker>,
    /// Set to `true` by `flush` and `close`.
    pub(crate) is_flushing: Rc<AtomicBool>,
    /// Set to `true` by `close`.
    is_closed: Rc<AtomicBool>,
    /// Wakers pushed by the dispatcher in `run` while every worker is busy; a worker pops and
    /// wakes one after finishing a task.
    pending_wakers: Wakers,
    /// Waker queue whose length is reported by `waiting_wakers_count`; it is not pushed to or
    /// popped from in this file (presumably used by the spawner types - TODO confirm).
    pub(crate) waiting_wakers: Wakers,
    /// Per-group task queues, keyed by group name.
    group_channels: GroupChannels<G>,
    /// Marker that ties the otherwise unused parameter `D` to the struct.
    _d: std::marker::PhantomData<D>,
}
49
50impl<Tx, G, D> Clone for LocalTaskExecQueue<Tx, G, D>
51where
52 Tx: Clone,
53{
54 #[inline]
55 fn clone(&self) -> Self {
56 Self {
57 tx: self.tx.clone(),
58 workers: self.workers,
59 queue_max: self.queue_max,
60 active_count: self.active_count.clone(),
61 waiting_count: self.waiting_count.clone(),
62 completed_count: self.completed_count.clone(),
63 #[cfg(feature = "rate")]
64 rate_counter: self.rate_counter.clone(),
65 flush_waker: self.flush_waker.clone(),
66 is_flushing: self.is_flushing.clone(),
67 is_closed: self.is_closed.clone(),
68 pending_wakers: self.pending_wakers.clone(),
69 waiting_wakers: self.waiting_wakers.clone(),
70 group_channels: self.group_channels.clone(),
71 _d: std::marker::PhantomData,
72 }
73 }
74}
75
76impl<Tx, G, D> LocalTaskExecQueue<Tx, G, D>
77where
78 Tx: Clone + Sink<(D, LocalTaskType)> + Unpin + 'static,
79 G: Hash + Eq + Clone + Debug + 'static,
80{
81 #[inline]
82 pub(crate) fn with_channel<Rx>(
83 workers: usize,
84 queue_max: usize,
85 tx: Tx,
86 rx: Rx,
87 ) -> (Self, impl Future<Output = ()>)
88 where
89 Rx: Stream<Item = (D, LocalTaskType)> + Unpin,
90 {
91 let exec = Self {
92 tx,
93 workers,
94 queue_max: queue_max as isize,
95 active_count: Counter::new(),
96 waiting_count: Counter::new(),
97 completed_count: Counter::new(),
98 #[cfg(feature = "rate")]
99 rate_counter: RateCounter::new(std::time::Duration::from_secs(3)),
100 flush_waker: Rc::new(AtomicWaker::new()),
101 is_flushing: Rc::new(AtomicBool::new(false)),
102 is_closed: Rc::new(AtomicBool::new(false)),
103 pending_wakers: new_wakers(),
104 waiting_wakers: new_wakers(),
105 group_channels: Rc::new(DashMap::default()),
106 _d: std::marker::PhantomData,
107 };
108 let runner = exec.clone().run(rx);
109 (exec, runner)
110 }
111
112 #[inline]
113 pub fn try_spawn_with<T>(&self, msg: T, name: D) -> TryLocalSpawner<'_, T, Tx, G, D>
114 where
115 D: Clone,
116 T: Future + 'static,
117 T::Output: 'static,
118 {
119 let fut = TryLocalSpawner::new(self, msg, name);
120 assert_future::<Result<(), _>, _>(fut)
121 }
122
123 #[inline]
124 pub fn spawn_with<T>(&self, msg: T, name: D) -> LocalSpawner<'_, T, Tx, G, D>
125 where
126 D: Clone,
127 T: Future + 'static,
128 T::Output: 'static,
129 {
130 let fut = LocalSpawner::new(self, msg, name);
131 assert_future::<Result<(), _>, _>(fut)
132 }
133
134 #[inline]
135 pub fn flush(&self) -> LocalFlush<'_, Tx, G, D> {
136 self.is_flushing.store(true, Ordering::SeqCst);
137 LocalFlush::new(self)
138 }
139
140 #[inline]
141 pub fn close(&self) -> LocalClose<'_, Tx, G, D> {
142 self.is_flushing.store(true, Ordering::SeqCst);
143 self.is_closed.store(true, Ordering::SeqCst);
144 LocalClose::new(self)
145 }
146
147 #[inline]
148 pub fn workers(&self) -> usize {
149 self.workers
150 }
151
152 #[inline]
153 pub fn active_count(&self) -> isize {
154 self.active_count.value()
155 }
156
157 #[inline]
158 pub fn waiting_count(&self) -> isize {
159 self.waiting_count.value()
160 }
161
162 #[inline]
163 #[cfg(feature = "rate")]
164 pub async fn completed_count(&self) -> isize {
165 self.rate_counter.total()
166 }
167
168 #[inline]
169 pub fn pending_wakers_count(&self) -> usize {
170 self.pending_wakers.len()
171 }
172
173 #[inline]
174 pub fn waiting_wakers_count(&self) -> usize {
175 self.waiting_wakers.len()
176 }
177
178 #[inline]
179 #[cfg(feature = "rate")]
180 pub async fn rate(&self) -> f64 {
181 self.rate_counter.rate()
182 }
183
184 #[inline]
185 pub fn is_full(&self) -> bool {
186 self.waiting_count() >= self.queue_max
187 }
188
189 #[inline]
190 pub fn is_active(&self) -> bool {
191 self.waiting_count() > 0
192 || self.active_count() > 0
193 || self.pending_wakers_count() > 0
194 || self.waiting_wakers_count() > 0
195 }
196
197 #[inline]
198 pub fn is_closed(&self) -> bool {
199 self.is_closed.load(Ordering::SeqCst)
200 }
201
202 #[inline]
203 pub fn is_flushing(&self) -> bool {
204 self.is_flushing.load(Ordering::SeqCst)
205 }
206
207 async fn run<Rx>(self, mut task_rx: Rx)
208 where
209 Rx: Stream<Item = (D, LocalTaskType)> + Unpin,
210 {
211 let exec = self;
212 let pending_wakers = exec.pending_wakers.clone();
213
214 let channel = || {
215 let rx = OneValue::new().queue_stream(|s, _| match s.take() {
216 None => Poll::Pending,
217 Some(m) => Poll::Ready(Some(m)),
218 });
219
220 let tx = rx.clone().queue_sender(|s, action| match action {
221 Action::Send(item) => Reply::Send(s.set(item)),
222 Action::IsFull => Reply::IsFull(s.is_full()),
223 Action::IsEmpty => Reply::IsEmpty(s.is_empty()),
224 Action::Len => Reply::Len(s.len()),
225 });
226
227 (tx, rx)
228 };
229
230 let idle_idxs = IndexSet::new();
231 let mut txs = Vec::new();
232 let mut rxs = Vec::new();
233 for i in 0..exec.workers {
234 let (tx, mut rx) = channel();
235 let pending_wakers = pending_wakers.clone();
236 let idle_idxs = idle_idxs.clone();
237 idle_idxs.insert(i);
238 let exec = exec.clone();
239 let rx_fut = async move {
240 loop {
241 match rx.next().await {
242 Some(task) => {
243 exec.active_count.inc();
244 task.await;
245 exec.active_count.dec();
246 #[cfg(feature = "rate")]
247 exec.rate_counter.inc();
248 }
249 None => break,
250 }
251
252 if !rx.is_full() {
253 idle_idxs.insert(i);
254 if let Some(w) = pending_wakers.pop() {
255 w.wake();
256 }
257 }
258
259 if exec.is_flushing() && rx.is_empty() && !exec.is_active() {
260 exec.flush_waker.wake();
261 }
262 }
263 };
264
265 txs.push(tx);
266 rxs.push(rx_fut);
267 }
268
269 let tasks_bus = async move {
270 while let Some((_, task)) = task_rx.next().await {
271 loop {
272 if idle_idxs.is_empty() {
273 let w = Rc::new(AtomicWaker::new());
275 pending_wakers.push(w.clone());
276 LocalPendingOnce::new(w).await;
277 } else if let Some(idx) = idle_idxs.pop() {
278 if let Some(tx) = txs.get_mut(idx) {
280 if let Err(_t) = tx.send(task).await {
281 log::error!("send error ...");
282 }
284 }
285 break;
286 };
287 }
288 }
289 };
290
291 futures::future::join(tasks_bus, futures::future::join_all(rxs)).await;
292 log::info!("exit local task execution queue");
293 }
294}
295
296impl<Tx, G> LocalTaskExecQueue<Tx, G, ()>
297where
298 Tx: Clone + Sink<((), LocalTaskType)> + Unpin + 'static,
299 G: Hash + Eq + Clone + Debug + 'static,
300{
301 #[inline]
302 pub fn try_spawn<T>(&self, msg: T) -> TryLocalSpawner<'_, T, Tx, G, ()>
303 where
304 T: Future + 'static,
305 T::Output: 'static,
306 {
307 let fut = TryLocalSpawner::new(self, msg, ());
308 assert_future::<Result<(), _>, _>(fut)
309 }
310
311 #[inline]
312 pub fn spawn<T>(&self, msg: T) -> LocalSpawner<'_, T, Tx, G, ()>
313 where
314 T: Future + 'static,
315 T::Output: 'static,
316 {
317 let fut = LocalSpawner::new(self, msg, ());
318 assert_future::<Result<(), _>, _>(fut)
319 }
320
321 #[inline]
322 pub(crate) async fn group_send(
323 &self,
324 name: G,
325 task: LocalTaskType,
326 ) -> Result<(), Error<LocalTaskType>> {
327 let gt_queue = self
328 .group_channels
329 .entry(name.clone())
330 .or_insert_with(|| Rc::new(Mutex::new(GroupTaskExecQueue::new())))
331 .value()
332 .clone();
333
334 let exec = self.clone();
335 let group_channels = self.group_channels.clone();
336 let runner_task = {
337 let mut task_tx = gt_queue.lock();
338 if task_tx.is_running() {
339 task_tx.push(task);
340 drop(task_tx);
341 drop(gt_queue);
342 None
343 } else {
344 task_tx.set_running(true);
345 drop(task_tx);
346 let task_rx = gt_queue; let runner_task = async move {
348 exec.active_count.inc();
349 task.await;
350 exec.active_count.dec();
351 loop {
352 let task: Option<LocalTaskType> = task_rx.lock().pop();
353 if let Some(task) = task {
354 exec.active_count.inc();
355 task.await;
356 exec.active_count.dec();
357 #[cfg(feature = "rate")]
358 exec.rate_counter.inc();
359 } else {
360 group_channels.remove(&name);
361 break;
362 }
363 }
364 };
365 Some(runner_task)
366 }
367 };
368
369 if let Some(runner_task) = runner_task {
370 if (self
371 .tx
372 .clone()
373 .send(((), Box::new(Box::pin(runner_task))))
374 .await)
375 .is_err()
376 {
377 Err(Error::SendError(ErrorType::Closed(None)))
378 } else {
379 Ok(())
380 }
381 } else {
382 Ok(())
383 }
384 }
385}
386
387#[derive(Clone)]
388struct OneValue(Rc<RwLock<Option<LocalTaskType>>>);
389
390impl OneValue {
391 #[inline]
392 fn new() -> Self {
393 Self(Rc::new(RwLock::new(None)))
394 }
395
396 #[inline]
397 fn set(&self, val: LocalTaskType) -> Option<LocalTaskType> {
398 self.0.write().replace(val)
399 }
400
401 #[inline]
402 fn take(&self) -> Option<LocalTaskType> {
403 self.0.write().take()
404 }
405
406 #[inline]
407 fn is_full(&self) -> bool {
408 self.0.read().is_some()
409 }
410
411 #[inline]
412 fn len(&self) -> usize {
413 if self.0.read().is_some() {
414 1
415 } else {
416 0
417 }
418 }
419
420 #[inline]
421 fn is_empty(&self) -> bool {
422 self.0.read().is_none()
423 }
424}
425
426pub(crate) struct LocalPendingOnce {
427 w: Rc<AtomicWaker>,
428 is_ready: bool,
429}
430
431impl LocalPendingOnce {
432 #[inline]
433 pub(crate) fn new(w: Rc<AtomicWaker>) -> Self {
434 Self { w, is_ready: false }
435 }
436}
437
438impl Future for LocalPendingOnce {
439 type Output = ();
440 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
441 if self.is_ready {
442 Poll::Ready(())
443 } else {
444 self.w.register(cx.waker());
445 self.is_ready = true;
446 Poll::Pending
447 }
448 }
449}
450
/// Reference-counted `crossbeam_queue::SegQueue` of shared `AtomicWaker` handles.
type Wakers = Rc<crossbeam_queue::SegQueue<Rc<AtomicWaker>>>;
452
453#[inline]
454fn new_wakers() -> Wakers {
455 Rc::new(crossbeam_queue::SegQueue::new())
456}