1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
//! Contains the ingredients needed to create wrappers over [Stream](futures::Stream)/[Sink](futures::Sink) items
//! to automatically reconnect upon failures. This is done so that a user can use them without worrying
//! that their application logic will terminate simply due to an event like a temporary network failure.
//!
//! To wrap existing streams, you simply need to implement the [UnderlyingStream] trait.
//! Once implemented, you can construct it easily by creating a [ReconnectStream] type as seen below.
//!
//! This crate supports both `tokio` and `async-std` runtime.
//!
//! *This crate is a fork of [stubborn-io](https://github.com/craftytrickster/stubborn-io).*
//!
//! *Minimum supported rust version: 1.43.1*
//!
//! ### Runtime Support
//!
//! This crate supports both `tokio` and `async-std` runtime.
//!
//! `tokio` support is enabled by default. While used on an `async-std` runtime, change the corresponding dependency in `Cargo.toml` to
//!
//! ``` toml
//! stream-reconnect = { version = "0.3", default-features = false, features = ["async-std"] }
//! ```
//!
//! ### Motivations (preserved from stubborn-io)
//! This crate was created because I was working on a service that needed to fetch data from a remote server
//! via a tokio TcpConnection. It normally worked perfectly (as does all of my code ☺), but every time the
//! remote server had a restart or turnaround, my application logic would stop working.
//! **stubborn-io** was born because I did not want to complicate my service's logic with TcpStream
//! reconnect and disconnect handling code. With stubborn-io, I can keep the service exactly the same,
//! knowing that the StubbornTcpStream's sensible defaults will perform reconnects in a way to keep my service running.
//! Once I realized that the implementation could apply to all IO items and not just TcpStream, I made it customizable as
//! seen below.
//!
//! ## Example on how a ReconnectStream item might be created
//! ```
//! use stream_reconnect::{UnderlyingStream, ReconnectStream};
//! use std::future::Future;
//! use std::io;
//! use std::pin::Pin;
//! use tokio::net::TcpStream;
//! use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
//! use tokio_tungstenite::tungstenite::{Message, error::Error as WsError};
//! use futures::{SinkExt, Stream, Sink};
//! use std::task::{Context, Poll};
//!
//! struct MyWs(WebSocketStream<MaybeTlsStream<TcpStream>>);
//!
//! // implement Stream & Sink for MyWs
//! # impl Stream for MyWs {
//! #     type Item = Result<Message, WsError>;
//! #
//! #     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>{
//! #         Pin::new(&mut self.0).poll_next(cx)
//! #     }
//! # }
//! #
//! # impl Sink<Message> for MyWs {
//! #     type Error = WsError;
//! #
//! #     fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>{
//! #         Pin::new(&mut self.0).poll_ready(cx)
//! #     }
//! #
//! #     fn start_send(mut self: Pin<&mut Self>, item: Message) -> Result<(), Self::Error>{
//! #         Pin::new(&mut self.0).start_send(item)
//! #     }
//! #
//! #     fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>{
//! #         Pin::new(&mut self.0).poll_flush(cx)
//! #     }
//! #
//! #     fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>{
//! #         Pin::new(&mut self.0).poll_close(cx)
//! #     }
//! # }
//!
//! impl UnderlyingStream<String, Result<Message, WsError>, WsError> for MyWs {
//!     // Establishes connection.
//!     // Additionally, this will be used when reconnect tries are attempted.
//!     fn establish(addr: String) -> Pin<Box<dyn Future<Output = Result<Self, WsError>> + Send>> {
//!         Box::pin(async move {
//!             // In this case, we are trying to connect to the WebSocket endpoint
//!             let ws_connection = connect_async(addr).await.unwrap().0;
//!             Ok(MyWs(ws_connection))
//!         })
//!     }
//!
//!     // The following errors are considered disconnect errors.
//!     fn is_write_disconnect_error(&self, err: &WsError) -> bool {
//!         matches!(
//!                 err,
//!                 WsError::ConnectionClosed
//!                     | WsError::AlreadyClosed
//!                     | WsError::Io(_)
//!                     | WsError::Tls(_)
//!                     | WsError::Protocol(_)
//!             )
//!     }
//!
//!     // If an `Err` is read, then there might be an disconnection.
//!     fn is_read_disconnect_error(&self, item: &Result<Message, WsError>) -> bool {
//!         if let Err(e) = item {
//!             self.is_write_disconnect_error(e)
//!         } else {
//!             false
//!         }
//!     }
//!
//!     // Return "Exhausted" if all retry attempts are failed.
//!     fn exhaust_err() -> WsError {
//!         WsError::Io(io::Error::new(io::ErrorKind::Other, "Exhausted"))
//!     }
//! }
//!
//! type ReconnectWs = ReconnectStream<MyWs, String, Result<Message, WsError>, WsError>;
//!
//! # async fn test() {
//! let mut ws_stream: ReconnectWs = ReconnectWs::connect(String::from("wss://localhost:8000")).await.unwrap();
//! ws_stream.send(Message::text(String::from("hello world!"))).await.unwrap();
//! # }
//! ```

#[doc(inline)]
pub use crate::config::ReconnectOptions;
pub use crate::stream::{ReconnectStream, UnderlyingStream};

pub mod config;
mod stream;