futures_util/sink/
close.rs

1use futures_core::{Poll, Async, Future};
2use futures_core::task;
3use futures_sink::Sink;
4
/// Future produced by the `close` combinator.
///
/// Polling it drives the wrapped sink's close operation; when closing has
/// finished, the future resolves to the sink itself.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Close<S> {
    /// The sink being closed. Becomes `None` once `poll` has moved the sink
    /// out of this future.
    sink: Option<S>,
}
12
13/// A future that completes when the sink has finished closing.
14///
15/// The sink itself is returned after closeing is complete.
16pub fn new<S: Sink>(sink: S) -> Close<S> {
17    Close { sink: Some(sink) }
18}
19
20impl<S: Sink> Close<S> {
21    /// Get a shared reference to the inner sink.
22    /// Returns `None` if the sink has already been closed.
23    pub fn get_ref(&self) -> Option<&S> {
24        self.sink.as_ref()
25    }
26
27    /// Get a mutable reference to the inner sink.
28    /// Returns `None` if the sink has already been closed.
29    pub fn get_mut(&mut self) -> Option<&mut S> {
30        self.sink.as_mut()
31    }
32
33    /// Consume the `Close` and return the inner sink.
34    /// Returns `None` if the sink has already been closed.
35    pub fn into_inner(self) -> Option<S> {
36        self.sink
37    }
38}
39
40impl<S: Sink> Future for Close<S> {
41    type Item = S;
42    type Error = S::SinkError;
43
44    fn poll(&mut self, cx: &mut task::Context) -> Poll<S, S::SinkError> {
45        let mut sink = self.sink.take().expect("Attempted to poll Close after it completed");
46        if sink.poll_close(cx)?.is_ready() {
47            Ok(Async::Ready(sink))
48        } else {
49            self.sink = Some(sink);
50            Ok(Async::Pending)
51        }
52    }
53}