1use std::fmt::Debug;
2use std::future::Future;
3use std::hash::Hash;
4use std::marker::Unpin;
5use std::pin::Pin;
6use std::sync::atomic::Ordering;
7use std::task::{Context, Poll};
8
9use futures::Sink;
10
11use super::{LocalTaskExecQueue, LocalTaskType, TaskExecQueue, TaskType};
12
13pub struct Close<'a, Tx, G, D> {
14 sink: &'a TaskExecQueue<Tx, G, D>,
15}
16
17impl<'a, Tx, G, D> Unpin for Close<'a, Tx, G, D> {}
18
19impl<'a, Tx, G, D> Close<'a, Tx, G, D> {
20 pub(crate) fn new(sink: &'a TaskExecQueue<Tx, G, D>) -> Self {
21 Self { sink }
22 }
23}
24
25impl<'a, Tx, G, D> Future for Close<'a, Tx, G, D>
26where
27 Tx: Clone + Sink<(D, TaskType)> + Unpin + Send + Sync + 'static,
28 G: Hash + Eq + Clone + Debug + Send + Sync + 'static,
29{
30 type Output = Result<(), Tx::Error>;
31
32 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
33 let this = self.get_mut();
34 futures::ready!(Pin::new(&mut this.sink.tx.clone()).poll_close(cx))?;
35 if this.sink.is_active() {
36 this.sink.flush_waker.register(cx.waker());
37 Poll::Pending
38 } else {
39 this.sink.is_flushing.store(false, Ordering::SeqCst);
40 Poll::Ready(Ok(()))
41 }
42 }
43}
44
45pub struct LocalClose<'a, Tx, G, D> {
46 sink: &'a LocalTaskExecQueue<Tx, G, D>,
47}
48
49impl<'a, Tx, G, D> Unpin for LocalClose<'a, Tx, G, D> {}
50
51impl<'a, Tx, G, D> LocalClose<'a, Tx, G, D> {
52 pub(crate) fn new(sink: &'a LocalTaskExecQueue<Tx, G, D>) -> Self {
53 Self { sink }
54 }
55}
56
57impl<'a, Tx, G, D> Future for LocalClose<'a, Tx, G, D>
58where
59 Tx: Clone + Sink<(D, LocalTaskType)> + Unpin + 'static,
60 G: Hash + Eq + Clone + Debug + 'static,
61{
62 type Output = Result<(), Tx::Error>;
63
64 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
65 let this = self.get_mut();
66 futures::ready!(Pin::new(&mut this.sink.tx.clone()).poll_close(cx))?;
67 if this.sink.is_active() {
68 this.sink.flush_waker.register(cx.waker());
69 Poll::Pending
70 } else {
71 this.sink.is_flushing.store(false, Ordering::SeqCst);
72 Poll::Ready(Ok(()))
73 }
74 }
75}