Trait futures_retry::SinkRetryExt

pub trait SinkRetryExt: Sink {
    fn retry<F>(self, error_action: F) -> SinkRetry<F, Self>
    where
        Self: Sized,
        F: ErrorHandler<Self::SinkError>,
        Self::SinkItem: Clone,
        F: ErrorHandler<Self::SinkError>,
        F::OutError: From<Self::SinkError>,
    { ... }
}

An extension trait for Sink which allows using SinkRetry in a chain-like manner.

Example

This magic trait allows you to handle errors on a sink in a very neat manner:


// Example: a line-based TCP server on 127.0.0.1:12345 that copies every
// received line back to the client, with retry policies attached to the
// listener, the per-connection stream, and the per-connection sink.
fn main() {
    // Maps an I/O error seen while sending or receiving data to a `RetryPolicy`.
    // Any kind not listed explicitly is forwarded as an error.
    let on_io_error = |e: io::Error| match e.kind() {
        io::ErrorKind::TimedOut | io::ErrorKind::InvalidInput | io::ErrorKind::InvalidData => {
            RetryPolicy::WaitRetry(Duration::from_millis(5))
        }
        io::ErrorKind::Interrupted => RetryPolicy::Repeat,
        _ => RetryPolicy::ForwardError(e),
    };

    // Maps an I/O error seen while accepting connections to a `RetryPolicy`.
    // Any kind not listed explicitly results in a 5 ms `WaitRetry`.
    let on_accept_error = |e: io::Error| match e.kind() {
        io::ErrorKind::PermissionDenied => RetryPolicy::ForwardError(e),
        io::ErrorKind::Interrupted
        | io::ErrorKind::ConnectionRefused
        | io::ErrorKind::ConnectionReset
        | io::ErrorKind::ConnectionAborted
        | io::ErrorKind::NotConnected
        | io::ErrorKind::BrokenPipe => RetryPolicy::Repeat,
        _ => RetryPolicy::WaitRetry(Duration::from_millis(5)),
    };

    let addr = "127.0.0.1:12345".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    let server = listener
        .incoming()
        .retry(on_accept_error)
        .for_each(move |stream| {
            let (rx_half, tx_half) = stream.split();

            // Both halves speak the same newline-delimited text framing.
            let lines_out = tokio::codec::FramedWrite::new(tx_half, tokio::codec::LinesCodec::new());
            let lines_in = tokio::codec::FramedRead::new(rx_half, tokio::codec::LinesCodec::new());

            // Feed everything read from the client straight back into the
            // retrying sink.
            let echo = lines_out
                .retry(on_io_error)
                .send_all(lines_in.retry(on_io_error))
                // The resolved future hands back the sink and the stream;
                // neither is needed any more, so both are dropped.
                .map(drop)
                // Report a failed copy on stderr.
                .map_err(|err| eprintln!("Can't copy data: IO error {:?}", err));

            // Each connection is served by its own concurrently running task.
            tokio::spawn(echo);
            Ok(())
        })
        .map_err(|err| {
            eprintln!("server error {:?}", err);
        });

    // The server future is selected against an already-resolved `Ok(())`,
    // and both outcomes of the select are collapsed to `()`.
    let raced = server.select(Ok(())).map(|(_, _)| ()).map_err(|(_, _)| ());
    tokio::run(raced);
}

Provided methods

fn retry<F>(self, error_action: F) -> SinkRetry<F, Self> where
    Self: Sized,
    F: ErrorHandler<Self::SinkError>,
    Self::SinkItem: Clone,
    F: ErrorHandler<Self::SinkError>,
    F::OutError: From<Self::SinkError>, 

Converts the sink into a retry sink. See SinkRetry::new for details.

Warning (Implementation details)

What happens when the inner sink's start_send() resolves to an error depends on the inner sink. This function assumes that such an error has the same meaning as AsyncSink::NotReady. If the inner sink had already buffered your item before returning the error, SinkRetry will insert the item again anyway.

Loading content...

Implementors

impl<S: ?Sized> SinkRetryExt for S where
    S: Sink
[src]

fn retry<F>(self, error_action: F) -> SinkRetry<F, Self> where
    Self: Sized,
    F: ErrorHandler<Self::SinkError>,
    Self::SinkItem: Clone,
    F: ErrorHandler<Self::SinkError>,
    F::OutError: From<Self::SinkError>, 
[src]

Loading content...