#![doc = include_str!("../README.md")]
use std::{future::Future, io::ErrorKind, pin::Pin};
pub mod config;
#[cfg(feature = "fs")]
pub mod fs;
mod implementations;
#[cfg(feature = "net")]
pub mod tcp;
#[cfg(all(feature = "net", target_family = "unix"))]
pub mod unix;
#[cfg(test)]
mod tests;
use config::Config;
pub type PinFut<O> = Pin<Box<dyn Future<Output = O> + 'static + Send>>;
pub trait Resolver<C> {
fn disconnected(&mut self, context: &Context, connector: &mut C) -> PinFut<Action>;
fn unreachable(&mut self, context: &Context, connector: &mut C) -> PinFut<bool> {
let fut = self.disconnected(context, connector);
Box::pin(async move {
match fut.await {
Action::AttemptReconnect => true,
Action::Exhaust | Action::Ignore => false,
}
})
}
fn established(&mut self, context: &Context) -> PinFut<()> {
self.reconnected(context)
}
fn reconnected(&mut self, _context: &Context) -> PinFut<()> {
Box::pin(std::future::ready(()))
}
}
pub trait Connector {
type Output;
fn connect(&mut self) -> PinFut<Result<Self::Output, std::io::Error>>;
fn reconnect(&mut self) -> PinFut<Result<Self::Output, std::io::Error>> {
self.connect()
}
}
#[derive(Debug)]
#[non_exhaustive]
pub enum Reason {
Eof,
Err(std::io::Error),
}
impl std::fmt::Display for Reason {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Reason::Eof => f.write_str("End of file detected"),
Reason::Err(error) => error.fmt(f),
}
}
}
impl std::error::Error for Reason {}
impl Reason {
pub(crate) fn clone_private(&self) -> Self {
match self {
Reason::Eof => Self::Eof,
Reason::Err(error) => {
let kind = error.kind();
let error = std::io::Error::new(kind, error.to_string());
Self::Err(error)
}
}
}
pub fn retryable(&self) -> bool {
use std::io::ErrorKind as Kind;
match self {
Reason::Eof => true,
Reason::Err(error) => matches!(
error.kind(),
Kind::NotFound
| Kind::PermissionDenied
| Kind::ConnectionRefused
| Kind::ConnectionAborted
| Kind::ConnectionReset
| Kind::NotConnected
| Kind::AlreadyExists
| Kind::HostUnreachable
| Kind::AddrNotAvailable
| Kind::NetworkDown
| Kind::BrokenPipe
| Kind::TimedOut
| Kind::UnexpectedEof
| Kind::NetworkUnreachable
| Kind::AddrInUse
),
}
}
}
impl From<Reason> for std::io::Error {
fn from(value: Reason) -> Self {
match value {
Reason::Eof => std::io::Error::new(ErrorKind::UnexpectedEof, "Eof error"),
Reason::Err(error) => error,
}
}
}
pub struct Tether<C: Connector, R> {
state: State<C::Output>,
inner: TetherInner<C, R>,
}
struct TetherInner<C: Connector, R> {
config: Config,
connector: C,
context: Context,
io: C::Output,
resolver: R,
last_write: Option<Reason>,
}
impl<C: Connector, R: Resolver<C>> TetherInner<C, R> {
fn set_connected(&mut self, state: &mut State<C::Output>) {
*state = State::Connected;
self.context.reset();
}
fn set_reconnected(&mut self, state: &mut State<C::Output>, new_io: <C as Connector>::Output) {
self.io = new_io;
let fut = self.resolver.reconnected(&self.context);
*state = State::Reconnected(fut);
}
fn set_reconnecting(&mut self, state: &mut State<C::Output>) {
let fut = self.connector.reconnect();
*state = State::Reconnecting(fut);
}
fn set_disconnected(&mut self, state: &mut State<C::Output>, reason: Reason, source: Source) {
self.context.reason = Some((reason, source));
let fut = self
.resolver
.disconnected(&self.context, &mut self.connector);
*state = State::Disconnected(fut);
}
}
impl<C: Connector, R> Tether<C, R> {
pub fn resolver(&self) -> &R {
&self.inner.resolver
}
pub fn connector(&self) -> &C {
&self.inner.connector
}
pub fn context(&self) -> &Context {
&self.inner.context
}
}
impl<C, R> Tether<C, R>
where
C: Connector,
R: Resolver<C>,
{
pub fn new(connector: C, io: C::Output, resolver: R) -> Self {
Self::new_with_config(connector, io, resolver, Config::default())
}
pub fn new_with_config(connector: C, io: C::Output, resolver: R, config: Config) -> Self {
Self::new_with_context(connector, io, resolver, Context::default(), config)
}
fn new_with_context(
connector: C,
io: C::Output,
resolver: R,
context: Context,
config: Config,
) -> Self {
Self {
state: Default::default(),
inner: TetherInner {
config,
connector,
context,
io,
resolver,
last_write: None,
},
}
}
pub fn set_config(&mut self, config: Config) {
self.inner.config = config;
}
#[inline]
pub fn into_inner(self) -> C::Output {
self.inner.io
}
pub async fn connect(mut connector: C, mut resolver: R) -> Result<Self, std::io::Error> {
let mut context = Context::default();
loop {
let state = match connector.connect().await {
Ok(io) => {
resolver.established(&context).await;
context.reset();
return Ok(Self::new_with_context(
connector,
io,
resolver,
context,
Config::default(),
));
}
Err(error) => error,
};
context.reason = Some((Reason::Err(state), Source::Reconnect));
context.increment_attempts();
if !resolver.unreachable(&context, &mut connector).await {
let Some((Reason::Err(error), _)) = context.reason else {
unreachable!("state is immutable and established as Err above");
};
return Err(error);
}
}
}
pub async fn connect_without_retry(
mut connector: C,
mut resolver: R,
) -> Result<Self, std::io::Error> {
let context = Context::default();
let io = connector.connect().await?;
resolver.established(&context).await;
Ok(Self::new_with_context(
connector,
io,
resolver,
context,
Config::default(),
))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Action {
AttemptReconnect,
Exhaust,
Ignore,
}
#[derive(Default)]
enum State<T> {
#[default]
Connected,
Disconnected(PinFut<Action>),
Reconnecting(PinFut<Result<T, std::io::Error>>),
Reconnected(PinFut<()>),
Exhausted(Reason, Source),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Source {
Io,
Reconnect,
}
#[derive(Default, Debug)]
pub struct Context {
total_attempts: usize,
current_attempts: usize,
reason: Option<(Reason, Source)>,
}
impl Context {
#[inline]
pub fn current_reconnect_attempts(&self) -> usize {
self.current_attempts
}
#[inline]
pub fn total_reconnect_attempts(&self) -> usize {
self.total_attempts
}
fn increment_attempts(&mut self) {
self.current_attempts += 1;
self.total_attempts += 1;
}
#[inline]
pub fn reason(&self) -> &Reason {
self.try_reason().unwrap()
}
#[inline]
pub fn try_reason(&self) -> Option<&Reason> {
self.reason.as_ref().map(|val| &val.0)
}
#[inline]
fn reset(&mut self) {
self.current_attempts = 0;
}
}
pub(crate) mod ready {
macro_rules! ready {
($e:expr $(,)?) => {
match $e {
std::task::Poll::Ready(t) => t,
std::task::Poll::Pending => return std::task::Poll::Pending,
}
};
}
pub(crate) use ready;
}