io-tether

A small library for defining I/O types which reconnect on errors.
Usage
To get started, add io-tether to your list of dependencies:
io-tether = { version = "0.4.3" }
Basics
The primary type exposed by this library is the Tether type. This
type is generic over two parameters:
- C: The I/O connector. This is the type which produces the
  underlying connections. For some I/O types, like QUIC, this may
  need to be fairly involved, while for others, like TCP, it may just
  be a wrapper around a socket address.
- R: The resolver. You will typically write this type yourself to
  handle the business logic your application requires whenever a
  disconnect occurs. It drives the reconnect process and allows
  developers to inject arbitrary asynchronous code at various
  stages of the reconnection process.
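The split between the two parameters can be pictured with a simplified, synchronous model. This is only a sketch of the idea, not io-tether's API: the `Connector` and `Resolver` traits, `FlakyConnector`, `LimitedRetries`, and `connect_with` below are all hypothetical names invented for illustration, and the real library is asynchronous and also covers reads and writes on an established connection.

```rust
use std::io;

/// Hypothetical stand-in for a connector: it knows how to produce a fresh connection.
trait Connector {
    type Conn;
    fn connect(&mut self) -> io::Result<Self::Conn>;
}

/// Hypothetical stand-in for a resolver: it decides what happens after a failure.
trait Resolver<C: Connector> {
    /// Return `true` to attempt a reconnect, `false` to give up.
    fn disconnected(&mut self, error: &io::Error, connector: &mut C) -> bool;
}

/// Connector whose first `failures` connection attempts fail.
struct FlakyConnector {
    failures: u32,
    attempts: u32,
}

impl Connector for FlakyConnector {
    type Conn = String;

    fn connect(&mut self) -> io::Result<String> {
        self.attempts += 1;
        if self.attempts <= self.failures {
            Err(io::Error::new(io::ErrorKind::ConnectionRefused, "refused"))
        } else {
            Ok(format!("connection #{}", self.attempts))
        }
    }
}

/// Resolver that allows at most `max_retries` reconnect attempts.
struct LimitedRetries {
    max_retries: u32,
    seen: u32,
}

impl<C: Connector> Resolver<C> for LimitedRetries {
    fn disconnected(&mut self, _error: &io::Error, _connector: &mut C) -> bool {
        self.seen += 1;
        self.seen <= self.max_retries
    }
}

/// Ask the connector for a connection until it succeeds or the resolver gives up.
fn connect_with<C: Connector, R: Resolver<C>>(
    connector: &mut C,
    resolver: &mut R,
) -> io::Result<C::Conn> {
    loop {
        match connector.connect() {
            Ok(conn) => return Ok(conn),
            Err(err) => {
                // The resolver may inspect the error and mutate the connector
                // (for example, to point it at a different address).
                if !resolver.disconnected(&err, connector) {
                    return Err(err);
                }
            }
        }
    }
}

fn main() {
    let mut connector = FlakyConnector { failures: 2, attempts: 0 };
    let mut resolver = LimitedRetries { max_retries: 3, seen: 0 };
    let conn = connect_with(&mut connector, &mut resolver).expect("connects on the third attempt");
    println!("{conn}");
}
```

In this model the connector owns everything needed to open a connection, while the resolver owns the policy. Swapping `LimitedRetries` for another policy requires no change to the connector.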
Example
Below is a simple example of a resolver implementation that sends a
message over a channel whenever it detects a disconnect.
use std::{time::Duration, net::{SocketAddrV4, Ipv4Addr}};
use io_tether::{Resolver, Context, Reason, Tether, PinFut, tcp::TcpConnector};
use tokio::{net::TcpStream, io::{AsyncReadExt, AsyncWriteExt}, sync::mpsc};

/// Resolver that reports each disconnect reason over a channel.
pub struct ChannelResolver(mpsc::Sender<String>);

type Connector = TcpConnector<SocketAddrV4>;

impl Resolver<Connector> for ChannelResolver {
    fn disconnected(&mut self, context: &Context, conn: &mut Connector) -> PinFut<bool> {
        let sender = self.0.clone();
        let reason = context.reason().to_string();

        // Point the connector at the second listener before reconnecting.
        conn.get_addr_mut().set_port(8081);

        Box::pin(async move {
            sender.send(reason).await.unwrap();
            tokio::time::sleep(Duration::from_millis(500)).await;
            // Ask the tether to attempt a reconnect.
            true
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, mut rx) = mpsc::channel(1);
    let resolver = ChannelResolver(tx);

    let listener_1 = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
    let listener_2 = tokio::net::TcpListener::bind("0.0.0.0:8081").await?;

    // Each listener writes three bytes, then drops its stream.
    tokio::spawn(async move {
        let (mut stream, _addr) = listener_1.accept().await.unwrap();
        stream.write_all(b"foo").await.unwrap();
    });

    tokio::spawn(async move {
        let (mut stream, _addr) = listener_2.accept().await.unwrap();
        stream.write_all(b"bar").await.unwrap();
    });

    let handle = tokio::spawn(async move {
        let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080);
        let mut tether = Tether::connect_tcp(addr, resolver)
            .await
            .unwrap();

        // A single read spans both connections: "foo" from port 8080,
        // then "bar" from port 8081 after the reconnect.
        let mut buf = [0; 6];
        tether.read_exact(&mut buf).await.unwrap();
        assert_eq!(&buf, b"foobar");
    });

    // The resolver reported the disconnect over the channel.
    assert!(rx.recv().await.is_some());
    handle.await?;

    Ok(())
}
Alternatives
- stubborn-io: similar, but uses synchronous callbacks and a
  duration iterator for retries
- tokio-retry: a more general-purpose future retry library
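For readers unfamiliar with the "duration iterator" approach to retries mentioned above, the idea is that an iterator yields the delay to wait before each attempt, and retries stop when it is exhausted. The sketch below uses only the standard library; `backoff` is a hypothetical helper written for illustration and is not taken from stubborn-io or io-tether.

```rust
use std::time::Duration;

/// Hypothetical helper: yields `base`, then doubles the previous delay on
/// each step, capping doubled values at `max`, for `attempts` items in total.
fn backoff(base: Duration, max: Duration, attempts: usize) -> impl Iterator<Item = Duration> {
    std::iter::successors(Some(base), move |prev| Some((*prev * 2).min(max)))
        .take(attempts)
}

fn main() {
    let delays = backoff(Duration::from_millis(100), Duration::from_secs(1), 6);
    for (attempt, delay) in delays.enumerate() {
        // A real retry loop would sleep for `delay` before reconnecting.
        println!("attempt {}: wait {:?}", attempt + 1, delay);
    }
}
```

A resolver, by contrast, can make the same decision with arbitrary asynchronous code, as the sleep inside `disconnected` in the example shows.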