Trait futures_retry::SinkRetryExt
An extension trait for Sink
that allows SinkRetry
to be used in a chain-like manner.
Example
This trait lets you handle errors on a sink in a very neat manner:
fn main() { let addr = "127.0.0.1:12345".parse().unwrap(); let tcp = TcpListener::bind(&addr).unwrap(); let conn_error_handler = |e: io::Error| match e.kind() { io::ErrorKind::Interrupted | io::ErrorKind::ConnectionRefused | io::ErrorKind::ConnectionReset | io::ErrorKind::ConnectionAborted | io::ErrorKind::NotConnected | io::ErrorKind::BrokenPipe => RetryPolicy::Repeat, io::ErrorKind::PermissionDenied => RetryPolicy::ForwardError(e), _ => RetryPolicy::WaitRetry(Duration::from_millis(5)), }; let data_sending_error_handler = |e: io::Error| match e.kind() { io::ErrorKind::Interrupted => RetryPolicy::Repeat, io::ErrorKind::TimedOut | io::ErrorKind::InvalidInput | io::ErrorKind::InvalidData => { RetryPolicy::WaitRetry(Duration::from_millis(5)) } _ => RetryPolicy::ForwardError(e), }; let server = tcp .incoming() .retry(conn_error_handler) .for_each(move |tcp| { let (reader, writer) = tcp.split(); let reader = tokio::codec::FramedRead::new(reader, tokio::codec::LinesCodec::new()); let writer = tokio::codec::FramedWrite::new(writer, tokio::codec::LinesCodec::new()); // Copy the data back to the client let conn = writer .retry(data_sending_error_handler) // retry .send_all(reader.retry(data_sending_error_handler)) // when future is resolved sink and stream is returned we just drop them. .map(drop) // Handle any errors .map_err(|err| eprintln!("Can't copy data: IO error {:?}", err)); // Spawn the future as a concurrent task tokio::spawn(conn); Ok(()) }) .map_err(|err| { eprintln!("server error {:?}", err); }); tokio::run(server.select(Ok(())).map(|(_, _)| ()).map_err(|(_, _)| ())); }
Provided methods
fn retry<F>(self, error_action: F) -> SinkRetry<F, Self> where
Self: Sized,
F: ErrorHandler<Self::SinkError>,
Self::SinkItem: Clone,
F::OutError: From<Self::SinkError>,
Converts the sink into a retry sink. See SinkRetry::new
for details.
Warning (Implementation details)
What happens when the inner sink's start_send()
resolves to an error depends on the inner sink. This
function assumes that an error has the same meaning as AsyncSink::NotReady
. If your item was
buffered before the error was returned, SinkRetry will insert this item again anyway.