task_exec_queue/
flush.rs

1use std::fmt::Debug;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::sync::atomic::Ordering;
7use std::task::{Context, Poll};
8
9use futures::Sink;
10
11use super::{LocalTaskExecQueue, LocalTaskType, TaskExecQueue, TaskType};
12
13pub struct Flush<'a, Tx, G, D> {
14    sink: &'a TaskExecQueue<Tx, G, D>,
15}
16
17impl<'a, Tx, G, D> Unpin for Flush<'a, Tx, G, D> {}
18
19impl<'a, Tx, G, D> Flush<'a, Tx, G, D> {
20    pub(crate) fn new(sink: &'a TaskExecQueue<Tx, G, D>) -> Self {
21        Self { sink }
22    }
23}
24
25impl<'a, Tx, G, D> Future for Flush<'a, Tx, G, D>
26where
27    Tx: Clone + Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
28    G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
29{
30    type Output = Result<(), Tx::Error>;
31
32    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
33        let this = self.get_mut();
34        futures::ready!(Pin::new(&mut this.sink.tx.clone()).poll_flush(cx))?;
35        if this.sink.is_active() {
36            this.sink.flush_waker.register(cx.waker());
37            Poll::Pending
38        } else {
39            this.sink.is_flushing.store(false, Ordering::SeqCst);
40            Poll::Ready(Ok(()))
41        }
42    }
43}
44
45pub struct LocalFlush<'a, Tx, G, D> {
46    sink: &'a LocalTaskExecQueue<Tx, G, D>,
47}
48
49impl<'a, Tx, G, D> Unpin for LocalFlush<'a, Tx, G, D> {}
50
51impl<'a, Tx, G, D> LocalFlush<'a, Tx, G, D> {
52    pub(crate) fn new(sink: &'a LocalTaskExecQueue<Tx, G, D>) -> Self {
53        Self { sink }
54    }
55}
56
57impl<'a, Tx, G, D> Future for LocalFlush<'a, Tx, G, D>
58where
59    Tx: Clone + Sink<(D, LocalTaskType)> + Unpin + 'static,
60    G: Hash + Eq + Clone + Debug + 'static,
61{
62    type Output = Result<(), Tx::Error>;
63
64    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
65        let this = self.get_mut();
66        futures::ready!(Pin::new(&mut this.sink.tx.clone()).poll_flush(cx))?;
67        if this.sink.is_active() {
68            this.sink.flush_waker.register(cx.waker());
69            Poll::Pending
70        } else {
71            this.sink.is_flushing.store(false, Ordering::SeqCst);
72            Poll::Ready(Ok(()))
73        }
74    }
75}