use std::{future::Future, io::ErrorKind, time::Duration};
pub use tokio::net::TcpStream;
pub use killswitch::KillSwitch;
use super::{async_trait, ConnResult, Connector, RunResult};
pub struct SimpleTcpConnector<F>
where
F: Future<Output = RunResult<std::io::Error>>
{
addr: String,
delay: usize,
ks: KillSwitch,
cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send>
}
impl<F> SimpleTcpConnector<F>
where
    F: Future<Output = RunResult<std::io::Error>>
{
    /// Creates a connector for `addr`.
    ///
    /// * `addr` - converted to an owned `String` with `to_string()` and used
    ///   as the target of every connection attempt.
    /// * `ks` - kill switch stored in the connector.
    /// * `cb` - boxed callback stored in the connector; it receives the
    ///   connected stream and a kill switch and returns the future `F`.
    ///
    /// The retry delay of the new connector starts at 1 second.
    // `addr` is taken by value but only used through `to_string()`, which is
    // what the lint reports; the by-value signature is kept deliberately.
    #[allow(clippy::needless_pass_by_value)]
    pub fn new(
        addr: impl ToString,
        ks: KillSwitch,
        cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send>
    ) -> Self {
        let addr = addr.to_string();
        Self {
            addr,
            delay: 1,
            ks,
            cb
        }
    }
}
#[async_trait]
impl<F> Connector for SimpleTcpConnector<F>
where
F: Future<Output = RunResult<std::io::Error>> + Send
{
type Error = std::io::Error;
type ConnType = TcpStream;
async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error> {
tokio::select! {
res = TcpStream::connect(&self.addr) => {
match res {
Ok(conn) => ConnResult::Connected(conn),
Err(e) => match e.kind() {
ErrorKind::ConnectionRefused | ErrorKind::ConnectionAborted |
ErrorKind::NotConnected | ErrorKind::TimedOut => {
ConnResult::Reconnect
}
_ => ConnResult::Exit(Err(e))
}
}
}
() = self.ks.wait() => {
ConnResult::Exit(Ok(()))
}
}
}
async fn retry_delay(&mut self) -> RunResult<Self::Error> {
let dur = Duration::from_secs(self.delay.try_into().unwrap());
tokio::select! {
() = self.ks.wait() => {
RunResult::Exit(Ok(()))
}
() = tokio::time::sleep(dur) => {
self.delay = std::cmp::min(self.delay * 2, 60);
RunResult::Reconnect
}
}
}
async fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error> {
self.delay = 1;
let fut = (self.cb)(conn, self.ks.clone());
fut.await
}
}