1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
//! Sinks
//!
//! This module contains a number of functions for working with `Sink`s,
//! including the `SinkExt` trait which adds methods to `Sink` types.

use futures_core::{Stream, IntoFuture};
use futures_sink::Sink;
use super::future::Either;

mod close;
mod fanout;
mod flush;
mod err_into;
mod map_err;
mod send;
mod send_all;
mod with;
mod with_flat_map;

if_std! {
    mod buffer;
    pub use self::buffer::Buffer;
}

pub use self::close::Close;
pub use self::fanout::Fanout;
pub use self::flush::Flush;
pub use self::err_into::SinkErrInto;
pub use self::map_err::SinkMapErr;
pub use self::send::Send;
pub use self::send_all::SendAll;
pub use self::with::With;
pub use self::with_flat_map::WithFlatMap;

impl<T: ?Sized> SinkExt for T where T: Sink {}

/// An extension trait for `Sink`s that provides a variety of convenient
/// combinator functions.
pub trait SinkExt: Sink {
    /// Composes a function *in front of* the sink.
    ///
    /// This adapter produces a new sink that passes each value through the
    /// given function `f` before sending it to `self`.
    ///
    /// To process each value, `f` produces a *future*, which is then polled to
    /// completion before passing its result down to the underlying sink. If the
    /// future produces an error, that error is returned by the new sink.
    ///
    /// Note that this function consumes the given sink, returning a wrapped
    /// version, much like `Iterator::map`.
    fn with<U, Fut, F>(self, f: F) -> With<Self, U, Fut, F>
        where F: FnMut(U) -> Fut,
              Fut: IntoFuture<Item = Self::SinkItem>,
              Fut::Error: From<Self::SinkError>,
              Self: Sized
    {
        with::new(self, f)
    }

    /// Composes a function *in front of* the sink.
    ///
    /// This adapter produces a new sink that passes each value through the
    /// given function `f` before sending it to `self`.
    ///
    /// To process each value, `f` produces a *stream*, of which each value
    /// is passed to the underlying sink. A new value will not be accepted until
    /// the stream has been drained
    ///
    /// Note that this function consumes the given sink, returning a wrapped
    /// version, much like `Iterator::flat_map`.
    ///
    /// # Examples
    /// ---
    /// Using this function with an iterator through use of the `stream::iter_ok()`
    /// function
    ///
    /// ```
    /// # extern crate futures;
    /// # extern crate futures_channel;
    /// # extern crate futures_executor;
    /// use futures::prelude::*;
    /// use futures::stream;
    /// use futures_channel::mpsc;
    /// use futures_executor::block_on;
    ///
    /// # fn main() {
    /// let (tx, rx) = mpsc::channel::<i32>(5);
    ///
    /// let tx = tx.with_flat_map(|x| {
    ///     stream::iter_ok(vec![42; x].into_iter().map(|y| y))
    /// });
    ///
    /// block_on(tx.send(5)).unwrap();
    /// assert_eq!(block_on(rx.collect()), Ok(vec![42, 42, 42, 42, 42]));
    /// # }
    /// ```
    fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, U, St, F>
        where F: FnMut(U) -> St,
              St: Stream<Item = Self::SinkItem, Error=Self::SinkError>,
              Self: Sized
        {
            with_flat_map::new(self, f)
        }

    /*
    fn with_map<U, F>(self, f: F) -> WithMap<Self, U, F>
        where F: FnMut(U) -> Self::SinkItem,
              Self: Sized;

    fn with_filter<F>(self, f: F) -> WithFilter<Self, F>
        where F: FnMut(Self::SinkItem) -> bool,
              Self: Sized;

    fn with_filter_map<U, F>(self, f: F) -> WithFilterMap<Self, U, F>
        where F: FnMut(U) -> Option<Self::SinkItem>,
              Self: Sized;
     */

    /// Transforms the error returned by the sink.
    fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
        where F: FnOnce(Self::SinkError) -> E,
              Self: Sized,
    {
        map_err::new(self, f)
    }

    /// Map this sink's error to a different error type using the `Into` trait.
    ///
    /// If wanting to map errors of a `Sink + Stream`, use `.sink_err_into().err_into()`.
    fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, E>
        where Self: Sized,
              Self::SinkError: Into<E>,
    {
        err_into::new(self)
    }


    /// Adds a fixed-size buffer to the current sink.
    ///
    /// The resulting sink will buffer up to `amt` items when the underlying
    /// sink is unwilling to accept additional items. Calling `flush` on
    /// the buffered sink will attempt to both empty the buffer and complete
    /// processing on the underlying sink.
    ///
    /// Note that this function consumes the given sink, returning a wrapped
    /// version, much like `Iterator::map`.
    ///
    /// This method is only available when the `std` feature of this
    /// library is activated, and it is activated by default.
    #[cfg(feature = "std")]
    fn buffer(self, amt: usize) -> Buffer<Self>
        where Self: Sized
    {
        buffer::new(self, amt)
    }

    /// Close the sink.
    ///
    /// The sink itself is returned after closeing is complete.
    fn close(self) -> Close<Self>
        where Self: Sized
    {
        close::new(self)
    }

    /// Fanout items to multiple sinks.
    ///
    /// This adapter clones each incoming item and forwards it to both this as well as
    /// the other sink at the same time.
    fn fanout<S>(self, other: S) -> Fanout<Self, S>
        where Self: Sized,
              Self::SinkItem: Clone,
              S: Sink<SinkItem=Self::SinkItem, SinkError=Self::SinkError>
    {
        fanout::new(self, other)
    }

    /// Flush the sync, processing all pending items.
    ///
    /// The sink itself is returned after flushing is complete; this adapter is
    /// intended to be used when you want to stop sending to the sink until
    /// all current requests are processed.
    fn flush(self) -> Flush<Self>
        where Self: Sized
    {
        flush::new(self)
    }

    /// A future that completes after the given item has been fully processed
    /// into the sink, including flushing.
    ///
    /// Note that, **because of the flushing requirement, it is usually better
    /// to batch together items to send via `send_all`, rather than flushing
    /// between each item.**
    ///
    /// On completion, the sink is returned.
    fn send(self, item: Self::SinkItem) -> Send<Self>
        where Self: Sized
    {
        send::new(self, item)
    }

    /// A future that completes after the given stream has been fully processed
    /// into the sink, including flushing.
    ///
    /// This future will drive the stream to keep producing items until it is
    /// exhausted, sending each item to the sink. It will complete once both the
    /// stream is exhausted, the sink has received all items, and the sink has
    /// been flushed. Note that the sink is **not** closed.
    ///
    /// Doing `sink.send_all(stream)` is roughly equivalent to
    /// `stream.forward(sink)`. The returned future will exhaust all items from
    /// `stream` and send them to `self`.
    ///
    /// On completion, the pair `(sink, source)` is returned.
    fn send_all<S>(self, stream: S) -> SendAll<Self, S>
        where S: Stream<Item = Self::SinkItem>,
              Self::SinkError: From<S::Error>,
              Self: Sized
    {
        send_all::new(self, stream)
    }

    /// Wrap this sink in an `Either` sink, making it the left-hand variant
    /// of that `Either`.
    ///
    /// This can be used in combination with the `right_sink` method to write `if`
    /// statements that evaluate to different streams in different branches.
    fn left_sink<B>(self) -> Either<Self, B>
        where B: Sink<SinkItem = Self::SinkItem, SinkError = Self::SinkError>,
              Self: Sized
    {
        Either::Left(self)
    }

    /// Wrap this stream in an `Either` stream, making it the right-hand variant
    /// of that `Either`.
    ///
    /// This can be used in combination with the `left_sink` method to write `if`
    /// statements that evaluate to different streams in different branches.
    fn right_sink<B>(self) -> Either<B, Self>
        where B: Sink<SinkItem = Self::SinkItem, SinkError = Self::SinkError>,
              Self: Sized
    {
        Either::Right(self)
    }
}