stream_reconnect/lib.rs
1//! Contains the ingredients needed to create wrappers over [Stream](futures::Stream)/[Sink](futures::Sink) items
2//! to automatically reconnect upon failures. This is done so that a user can use them without worrying
3//! that their application logic will terminate simply due to an event like a temporary network failure.
4//!
5//! To wrap existing streams, you simply need to implement the [UnderlyingStream] trait.
6//! Once implemented, you can construct it easily by creating a [ReconnectStream] type as seen below.
7//!
8//! This crate supports both the `tokio` and `async-std` runtimes.
9//!
10//! *This crate is a fork of [stubborn-io](https://github.com/craftytrickster/stubborn-io).*
11//!
12//! *Minimum supported rust version: 1.43.1*
13//!
14//! ### Runtime Support
15//!
16//! This crate supports both the `tokio` and `async-std` runtimes.
17//!
18//! `tokio` support is enabled by default. To use the crate on an `async-std` runtime instead, change the corresponding dependency in `Cargo.toml` to:
19//!
20//! ``` toml
21//! stream-reconnect = { version = "0.3", default-features = false, features = ["async-std"] }
22//! ```
23//!
24//! ### Motivations (preserved from stubborn-io)
25//! This crate was created because I was working on a service that needed to fetch data from a remote server
26//! via a tokio TcpConnection. It normally worked perfectly (as does all of my code ☺), but every time the
27//! remote server had a restart or turnaround, my application logic would stop working.
28//! **stubborn-io** was born because I did not want to complicate my service's logic with TcpStream
29//! reconnect and disconnect handling code. With stubborn-io, I can keep the service exactly the same,
30//! knowing that the StubbornTcpStream's sensible defaults will perform reconnects in a way to keep my service running.
31//! Once I realized that the implementation could apply to all IO items and not just TcpStream, I made it customizable as
32//! seen below.
33//!
34//! ## Example on how a ReconnectStream item might be created
35//! ```
36//! use stream_reconnect::{UnderlyingStream, ReconnectStream};
37//! use std::future::Future;
38//! use std::io;
39//! use std::pin::Pin;
40//! use tokio::net::TcpStream;
41//! use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
42//! use tokio_tungstenite::tungstenite::{Message, error::Error as WsError};
43//! use futures::{SinkExt, Stream, Sink};
44//! use std::task::{Context, Poll};
45//!
46//! struct MyWs(WebSocketStream<MaybeTlsStream<TcpStream>>);
47//!
48//! // implement Stream & Sink for MyWs
49//! # impl Stream for MyWs {
50//! # type Item = Result<Message, WsError>;
51//! #
52//! # fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>{
53//! # Pin::new(&mut self.0).poll_next(cx)
54//! # }
55//! # }
56//! #
57//! # impl Sink<Message> for MyWs {
58//! # type Error = WsError;
59//! #
60//! # fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>{
61//! # Pin::new(&mut self.0).poll_ready(cx)
62//! # }
63//! #
64//! # fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error>{
65//! # Pin::new(&mut self.0).start_send(item)
66//! # }
67//! #
68//! # fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>{
69//! # Pin::new(&mut self.0).poll_flush(cx)
70//! # }
71//! #
72//! # fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>{
73//! # Pin::new(&mut self.0).poll_close(cx)
74//! # }
75//! # }
76//!
77//! impl UnderlyingStream<String, Result<Message, WsError>, WsError> for MyWs {
78//! // Establishes connection.
79//! // Additionally, this is called again each time a reconnect is attempted.
80//! fn establish(addr: String) -> Pin<Box<dyn Future<Output = Result<Self, WsError>> + Send>> {
81//! Box::pin(async move {
82//! // In this case, we are trying to connect to the WebSocket endpoint
83//! let ws_connection = connect_async(addr).await.unwrap().0;
84//! Ok(MyWs(ws_connection))
85//! })
86//! }
87//!
88//! // The following errors are considered disconnect errors.
89//! fn is_write_disconnect_error(&self, err: &WsError) -> bool {
90//! matches!(
91//! err,
92//! WsError::ConnectionClosed
93//! | WsError::AlreadyClosed
94//! | WsError::Io(_)
95//! | WsError::Tls(_)
96//! | WsError::Protocol(_)
97//! )
98//! }
99//!
100//! // If an `Err` is read, then there might be a disconnection.
101//! fn is_read_disconnect_error(&self, item: &Result<Message, WsError>) -> bool {
102//! if let Err(e) = item {
103//! self.is_write_disconnect_error(e)
104//! } else {
105//! false
106//! }
107//! }
108//!
109//! // Return an "Exhausted" error if all retry attempts have failed.
110//! fn exhaust_err() -> WsError {
111//! WsError::Io(io::Error::new(io::ErrorKind::Other, "Exhausted"))
112//! }
113//! }
114//!
115//! type ReconnectWs = ReconnectStream<MyWs, String, Result<Message, WsError>, WsError>;
116//!
117//! # async fn test() {
118//! let mut ws_stream: ReconnectWs = ReconnectWs::connect(String::from("wss://localhost:8000")).await.unwrap();
119//! ws_stream.send(Message::text(String::from("hello world!"))).await.unwrap();
120//! # }
121//! ```
122
123#[doc(inline)]
124pub use crate::config::ReconnectOptions;
125pub use crate::stream::{ReconnectStream, UnderlyingStream};
126
127pub mod config;
128mod stream;