1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
//! Futures for working with Sinks.

use futures::{Sink, Poll, AsyncSink, Async, Future, Stream};
use futures::stream::Fuse;

/// A future that drives a sink's `close` operation to completion.
///
/// On success the future resolves to the sink itself; if closing fails it resolves to the
/// sink's error instead.
pub struct Close<S>
where
    S: Sink,
{
    /// The sink being closed. This is `None` once the future has resolved or failed.
    sink: Option<S>,
}

impl<S: Sink> Close<S> {
    /// Create a new `Close` future wrapping the sink `s`.
    pub fn new(s: S) -> Close<S> {
        Close { sink: Some(s) }
    }

    /// Get a shared reference to the inner sink.
    ///
    /// # Panics
    ///
    /// Panics if the inner sink is no longer held, i.e. after the future has completed.
    pub fn get_ref(&self) -> &S {
        // `as_ref` already produces a fresh `Option<&S>`; calling `take` on that temporary
        // would be a no-op that misleadingly suggests the stored sink gets cleared.
        self.sink
            .as_ref()
            .expect("Attempted Close::get_ref after completion")
    }

    /// Get a mutable reference to the inner sink.
    ///
    /// # Panics
    ///
    /// Panics if the inner sink is no longer held, i.e. after the future has completed.
    pub fn get_mut(&mut self) -> &mut S {
        // Same as in `get_ref`: the `Option<&mut S>` from `as_mut` can be unwrapped directly.
        self.sink
            .as_mut()
            .expect("Attempted Close::get_mut after completion")
    }
}

impl<S: Sink> Future for Close<S> {
    type Item = S;
    type Error = S::SinkError;

    /// Attempt to close the inner sink.
    ///
    /// Resolves to the sink once `close` reports readiness. While `close` is not ready, the
    /// sink is stored back so that a later poll can retry. If `close` fails, the error is
    /// returned and the sink is dropped.
    ///
    /// # Panics
    ///
    /// Panics if polled again after having resolved or failed.
    fn poll(&mut self) -> Poll<S, S::SinkError> {
        // Move the sink out so it can be handed to the caller by value on completion.
        let taken = self.sink.take();
        let mut inner = taken.expect("Attempted to poll Close after completion");

        // An `Err` from `close` propagates right here, leaving `self.sink` empty.
        match inner.close()? {
            Async::Ready(_) => Ok(Async::Ready(inner)),
            Async::NotReady => {
                // Not done yet: put the sink back for the next poll.
                self.sink = Some(inner);
                Ok(Async::NotReady)
            }
        }
    }
}

// FIXME: `SendAll` does not resolve after the end of the stream has been reached.
// /// Future which sends all items from a Stream into a Sink. Unlike the tokio SendAll Future, this
// /// does not close the sink (it does flush though).
// pub struct SendAll<T, U: Stream> {
//     sink: Option<T>,
//     stream: Option<Fuse<U>>,
//     buffered: Option<U::Item>,
// }
//
// impl<T, U> SendAll<T, U>
//     where T: Sink,
//           U: Stream<Item = T::SinkItem>,
//           T::SinkError: From<U::Error>
// {
//     /// Create a new SendAll Future. Unlike the tokio SendAll Future, this does not close the sink
//     /// (it does flush though).
//     pub fn new(sink: T, stream: U) -> SendAll<T, U> {
//         SendAll {
//             sink: Some(sink),
//             stream: Some(stream.fuse()),
//             buffered: None,
//         }
//     }
//
//     fn sink_mut(&mut self) -> &mut T {
//         self.sink
//             .as_mut()
//             .take()
//             .expect("Attempted to poll SendAll after completion")
//     }
//
//     fn stream_mut(&mut self) -> &mut Fuse<U> {
//         self.stream
//             .as_mut()
//             .take()
//             .expect("Attempted to poll SendAll after completion")
//     }
//
//     fn take_result(&mut self) -> (T, U) {
//         let sink = self.sink
//             .take()
//             .expect("Attempted to poll SendAll after completion");
//         let fuse = self.stream
//             .take()
//             .expect("Attempted to poll SendAll after completion");
//         (sink, fuse.into_inner())
//     }
//
//     fn try_start_send(&mut self, item: U::Item) -> Poll<(), T::SinkError> {
//         debug_assert!(self.buffered.is_none());
//         if let AsyncSink::NotReady(item) = self.sink_mut().start_send(item)? {
//             self.buffered = Some(item);
//             return Ok(Async::NotReady);
//         }
//         Ok(Async::Ready(()))
//     }
// }
//
// impl<T, U> Future for SendAll<T, U>
//     where T: Sink,
//           U: Stream<Item = T::SinkItem>,
//           T::SinkError: From<U::Error>
// {
//     type Item = (T, U);
//     type Error = T::SinkError;
//
//     fn poll(&mut self) -> Poll<(T, U), T::SinkError> {
//         // If we've got an item buffered already, we need to write it to the
//         // sink before we can do anything else
//         if let Some(item) = self.buffered.take() {
//             try_ready!(self.try_start_send(item))
//         }
//
//         loop {
//             match self.stream_mut().poll()? {
//                 Async::Ready(Some(item)) => try_ready!(self.try_start_send(item)),
//                 Async::Ready(None) => {
//                     return Ok(Async::Ready(self.take_result()));
//                 }
//                 Async::NotReady => {
//                     try_ready!(self.sink_mut().poll_complete());
//                     return Ok(Async::NotReady);
//                 }
//             }
//         }
//     }
// }