# io-tether
<p align="center">
<img src="https://cdn.akamai.steamstatic.com/apps/dota2/images/dota_react/abilities/wisp_tether.png" />
</p>
[](https://crates.io/crates/io-tether)
[](https://docs.rs/io-tether/)
A small library for defining I/O types which reconnect on errors.
## Usage
To get started, add `io-tether` to your list of dependencies
```toml
io-tether = { version = "0.6.1" }
```
### Basics
The primary type exposed by this library is the `Tether` type. This
type is generic over two parameters:
1. `C`: The I/O connector. This is the type which produces the
underlying connections. For some io types like QUIC this may
need to be fairly involved, while for io like TCP, it may just
be a wrapper around a socket address
1. `R`: The resolver. This type will likely be generated by you in
order to handle the buisness logic required for your application
whenever a disconnect occurs. It drives the reconnect process and
allows developers to inject arbirtary asynchronous code at various
stages of the reconnection process
### Example
Below is a simple example of a resolver implmentation that calls back
to a channel whenever it detects a disconnect.
```rust
use std::{time::Duration, net::{SocketAddrV4, Ipv4Addr}};
use io_tether::{Action, Resolver, Context, Reason, Tether, PinFut, tcp::TcpConnector};
use tokio::{net::TcpStream, io::{AsyncReadExt, AsyncWriteExt}, sync::mpsc};
pub struct ChannelResolver(mpsc::Sender<String>);
type Connector = TcpConnector<SocketAddrV4>;
// NOTE: If you don't need to act on the connector, this can be implemented for generic `C`
impl Resolver<Connector> for ChannelResolver {
fn disconnected(&mut self, context: &Context, conn: &mut Connector) -> PinFut<Action> {
let sender = self.0.clone();
let reason = context.reason().to_string();
// Try port 8081 when retrying
conn.get_addr_mut().set_port(8081);
Box::pin(async move {
// Send the disconnect reason over the channel
sender.send(reason).await.unwrap();
// We can call arbirtary async code here
tokio::time::sleep(Duration::from_millis(500)).await;
Action::AttemptReconnect
})
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, mut rx) = mpsc::channel(1);
let resolver = ChannelResolver(tx);
let listener_1 = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;
let listener_2 = tokio::net::TcpListener::bind("0.0.0.0:8081").await?;
// Each listener, only accepts 1 connection, writing half of "foobar"
tokio::spawn(async move {
let (mut stream, _addr) = listener_1.accept().await.unwrap();
stream.write_all(b"foo").await.unwrap();
});
tokio::spawn(async move {
let (mut stream, _addr) = listener_2.accept().await.unwrap();
stream.write_all(b"bar").await.unwrap();
});
let handle = tokio::spawn(async move {
// Start by connecting to port 8080
let addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 8080);
let mut tether = Tether::connect_tcp(addr, resolver)
.await
.unwrap();
let mut buf = [0; 6];
// A disconnect occurs here after the server writes
// "foo" then drops the client, triggering a disconnect.
// The disconnect is detected and forwarded to the resolver,
// which adjusts the port, sleeps and attempts a reconnect.
//
// The resolver then connects to the new remote socket and we
// pull the next 3 bytes. This all happens under the hood
// without any extra work at each read callsite.
tether.read_exact(&mut buf).await.unwrap();
assert_eq!(&buf, b"foobar");
});
// Since a disconnect occurred during the call to read_exact,
// the channel will contain the disconnect reason
assert!(rx.recv().await.is_some());
handle.await?;
Ok(())
}
```
## Alternatives
1. [stubborn-io](https://crates.io/crates/stubborn-io) similar, but
uses synchronous callbacks and a duration iterator for retries
2. [tokio-retry](https://crates.io/crates/tokio-retry) a more general
purpose future retry library