schmoozer 0.4.1

A simple abstraction over a retryable async operation, such as establishing a connection.
Documentation
//! A simple [`Connector`] implementation that attempts to connect to a TCP
//! server and, once the connection is established, runs an async function.

use std::{future::Future, io::ErrorKind, time::Duration};

pub use tokio::net::TcpStream;

pub use killswitch::KillSwitch;

use super::{async_trait, ConnResult, Connector, RunResult};

/// Simple TCP (re)connector.
///
/// When run, the `Connector` implementation will attempt to establish a
/// [`TcpStream`] connection and then call an application-specified `Future`
/// (returned by a closure).
///
/// The type parameter `F` is the future type returned by the callback; it
/// must resolve to a `RunResult<std::io::Error>`.
pub struct SimpleTcpConnector<F>
where
  F: Future<Output = RunResult<std::io::Error>>
{
  // Address handed to `TcpStream::connect()` on each connection attempt.
  addr: String,
  // Current retry delay, in seconds.  Starts at 1, is doubled (capped at 60)
  // each time a retry sleep completes, and is reset to 1 when `run()` is
  // entered.
  delay: usize,
  // Kill switch awaited alongside the connect attempt and the retry sleep; a
  // clone of it is passed to the callback.
  ks: KillSwitch,
  // Callback invoked with the connected stream and a kill switch clone.  The
  // future it returns is awaited by `run()`.
  cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send>
}

impl<F> SimpleTcpConnector<F>
where
  F: Future<Output = RunResult<std::io::Error>>
{
  /// Create a new TCP (re)connector.
  ///
  /// - `addr` is the address to connect to; it is converted to an owned
  ///   `String` and reused for every connection attempt.
  /// - `ks` is the kill switch that is awaited while connecting and while
  ///   waiting to retry.
  /// - `cb` is called with the connected [`TcpStream`] and a clone of the kill
  ///   switch; the future it returns is awaited while the connection is in
  ///   use.
  ///
  /// The retry delay starts out at one second.
  #[allow(clippy::needless_pass_by_value)]
  pub fn new(
    addr: impl ToString,
    ks: KillSwitch,
    cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send>
  ) -> Self {
    let addr = addr.to_string();
    let delay = 1;

    Self {
      addr,
      delay,
      ks,
      cb
    }
  }
}

#[async_trait]
impl<F> Connector for SimpleTcpConnector<F>
where
  F: Future<Output = RunResult<std::io::Error>> + Send
{
  type Error = std::io::Error;
  type ConnType = TcpStream;

  /// Attempt to establish a TCP connection to the configured address.
  ///
  /// The connection attempt is raced against the kill switch:
  /// - On success, `ConnResult::Connected` is returned with the stream.
  /// - If the attempt fails with a "connection refused", "connection
  ///   aborted", "not connected" or "timed out" error,
  ///   `ConnResult::Reconnect` is returned.
  /// - Any other error is returned through `ConnResult::Exit(Err(_))`.
  /// - If the kill switch completes first, `ConnResult::Exit(Ok(()))` is
  ///   returned.
  async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error> {
    tokio::select! {
      attempt = TcpStream::connect(&self.addr) => {
        match attempt {
          Ok(stream) => ConnResult::Connected(stream),
          // These error kinds are treated as retryable.
          Err(err)
            if matches!(
              err.kind(),
              ErrorKind::ConnectionRefused
                | ErrorKind::ConnectionAborted
                | ErrorKind::NotConnected
                | ErrorKind::TimedOut
            ) =>
          {
            ConnResult::Reconnect
          }
          // Everything else terminates the connector with the error.
          Err(err) => ConnResult::Exit(Err(err))
        }
      }
      () = self.ks.wait() => {
        ConnResult::Exit(Ok(()))
      }
    }
  }

  /// Sleep for the current retry delay before the next connection attempt.
  ///
  /// The sleep is raced against the kill switch.  If the kill switch
  /// completes first, `RunResult::Exit(Ok(()))` is returned and the delay is
  /// left unchanged.  If the sleep completes, the delay is doubled (up to a
  /// maximum of 60 seconds) and `RunResult::Reconnect` is returned.
  async fn retry_delay(&mut self) -> RunResult<Self::Error> {
    let secs: u64 = self.delay.try_into().unwrap();
    let pause = Duration::from_secs(secs);

    tokio::select! {
      () = self.ks.wait() => {
        RunResult::Exit(Ok(()))
      }
      () = tokio::time::sleep(pause) => {
        // exponential backoff: double the delay, but never exceed 60 seconds
        self.delay = (self.delay * 2).min(60);
        RunResult::Reconnect
      }
    }
  }

  /// Run the application callback on an established connection.
  ///
  /// The retry delay is reset to one second, then the callback is called
  /// with `conn` and a clone of the kill switch.  The result of the future it
  /// returns is passed back to the caller.
  async fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error> {
    // a connection was established, so start over with the shortest delay
    self.delay = 1;

    let ks = self.ks.clone();
    let session = (self.cb)(conn, ks);

    session.await
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :