io-tether 0.3.0

A small library for defining I/O types which reconnect on errors.

Usage

To get started, add io-tether to your list of dependencies:

io-tether = { version = "0.3.0" }

Basics

The primary type exposed by this library is the Tether type. This type is generic over three parameters:

  1. I: The data needed to attempt a connection to the I/O source, such as a socket address for TCP or a file name for files. This type must be Clone, since an owned value is passed to Io::reconnect on each reconnect attempt.

  2. T: The I/O constructor. This is the type which produces the underlying connections. It is needed to support more complicated I/O creation, such as a QUIC connection, but for cases like TCP it can mostly be ignored.

  3. R: The resolver. You will likely write this type yourself to handle the business logic your application requires whenever a disconnect occurs. It drives the reconnect process and lets developers inject arbitrary asynchronous code at various stages of the reconnection process.
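To make the division of labour between the three parameters concrete, here is a minimal synchronous sketch built only on the standard library. It is not io-tether's API: the names Connector, Policy, Reconnecting, FoobarConnector, RetryLimit and demo are all hypothetical, and the real library is asynchronous. The sketch only assumes that a disconnect shows up as an end-of-file or an error on read.

```rust
use std::io::{self, Cursor, Read};

/// Hypothetical stand-in for the I/O constructor (`T`): builds a fresh
/// connection from an owned initializer (`I`).
trait Connector<I> {
    type Conn: Read;
    fn connect(&mut self, init: I) -> io::Result<Self::Conn>;
}

/// Hypothetical stand-in for the resolver (`R`): decides whether to retry.
trait Policy {
    fn disconnected(&mut self, attempts: u32) -> bool;
}

/// Hypothetical wrapper, generic over the same three roles as `Tether`.
struct Reconnecting<I, T: Connector<I>, R> {
    init: I,
    connector: T,
    policy: R,
    conn: T::Conn,
}

impl<I: Clone, T: Connector<I>, R: Policy> Reconnecting<I, T, R> {
    fn connect(mut connector: T, init: I, policy: R) -> io::Result<Self> {
        let conn = connector.connect(init.clone())?;
        Ok(Self { init, connector, policy, conn })
    }
}

impl<I: Clone, T: Connector<I>, R: Policy> Read for Reconnecting<I, T, R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let mut attempts = 0;
        loop {
            // EOF on a non-empty buffer, or any error, counts as a disconnect.
            let failure = match self.conn.read(buf) {
                Ok(n) if n > 0 || buf.is_empty() => return Ok(n),
                other => other,
            };
            attempts += 1;
            if !self.policy.disconnected(attempts) {
                // The policy gave up: surface the original EOF or error.
                return failure;
            }
            // The initializer is cloned because every attempt consumes an owned value.
            if let Ok(conn) = self.connector.connect(self.init.clone()) {
                self.conn = conn;
            }
        }
    }
}

/// Demo connector: every "connection" yields b"foobar" and then ends.
struct FoobarConnector;

impl Connector<String> for FoobarConnector {
    type Conn = Cursor<Vec<u8>>;
    fn connect(&mut self, _init: String) -> io::Result<Self::Conn> {
        Ok(Cursor::new(b"foobar".to_vec()))
    }
}

/// Demo policy: allow up to `limit` disconnects in total, counting each one.
struct RetryLimit {
    limit: u32,
    seen: u32,
}

impl Policy for RetryLimit {
    fn disconnected(&mut self, _attempts: u32) -> bool {
        self.seen += 1;
        self.seen <= self.limit
    }
}

/// Reads 12 bytes across two 6-byte "connections"; returns the bytes and
/// the number of disconnects the policy observed.
fn demo() -> io::Result<([u8; 12], u32)> {
    let policy = RetryLimit { limit: 3, seen: 0 };
    let mut io = Reconnecting::connect(FoobarConnector, String::from("demo"), policy)?;
    let mut buf = [0u8; 12];
    io.read_exact(&mut buf)?;
    Ok((buf, io.policy.seen))
}
```

Calling demo() from a main function reads across the simulated disconnect without the caller noticing it. The Clone bound on I plays the same role here as described above: the wrapper keeps one copy of the initializer and hands a fresh clone to the connector on every attempt.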

Example

Below is a simple example of a resolver implementation that sends a message over a channel whenever it detects a disconnect.

use std::time::Duration;
use io_tether::{Resolver, Context, Reason, Tether, PinFut, tcp::TcpConnector};
use tokio::{io::{AsyncReadExt, AsyncWriteExt}, sync::mpsc};

/// Custom resolver that reports each disconnect over a channel
pub struct CallbackResolver {
    channel: mpsc::Sender<()>,
}

impl Resolver for CallbackResolver {
    fn disconnected(
        &mut self,
        _context: &Context,
        _state: &Reason,
    ) -> PinFut<bool> {
        // Clone the sender so the returned future does not borrow from self
        let sender = self.channel.clone();
        Box::pin(async move {
            // Wait briefly before the reconnect is attempted
            tokio::time::sleep(Duration::from_millis(500)).await;
            sender.send(()).await.unwrap();
            true
        })
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (channel, mut rx) = mpsc::channel(10);

    // Test server: writes "foobar" to every new connection, then shuts it down
    let listener = tokio::net::TcpListener::bind("localhost:8080").await?;
    tokio::spawn(async move {
        loop {
            let (mut stream, _addr) = listener.accept().await.unwrap();
            stream.write_all(b"foobar").await.unwrap();
            stream.shutdown().await.unwrap();
        }
    });

    let resolver = CallbackResolver {
        channel,
    };

    let handle = tokio::spawn(async move {
        let addr = String::from("localhost:8080");
        let mut tether = Tether::connect(TcpConnector, addr, resolver)
            .await
            .unwrap();

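        // Each connection delivers only 6 bytes, so filling all 12
        // requires the tether to reconnect once
        let mut buf = [0; 12];
        tether.read_exact(&mut buf).await.unwrap();
        assert_eq!(&buf, b"foobarfoobar");
    });

    // The resolver sends on the channel when it sees the disconnect
    assert!(rx.recv().await.is_some());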
    handle.await?;

    Ok(())
}

Alternatives

  1. stubborn-io: similar, but uses synchronous callbacks and a duration iterator for retries.

  2. tokio-retry: a more general-purpose future retry library.