schmoozer/
tcpconn.rs

1//! A simple [`Connector`] implementation that specifically attempts to connect
2//! to a TCP server and run an async function once successful.
3
4use std::{future::Future, io::ErrorKind, time::Duration};
5
6pub use tokio::net::TcpStream;
7
8pub use killswitch::KillSwitch;
9
10use super::{async_trait, ConnResult, Connector, RunResult};
11
12/// Simple TCP (re)connector.
13///
14/// When run, the `Connector` implementation will attempt to establish a
15/// [`TcpStream`] connection and then call an application-specified `Future`
16/// (returned by a closure).
17pub struct SimpleTcpConnector<F>
18where
19  F: Future<Output = RunResult<std::io::Error>>
20{
21  addr: String,
22  delay: usize,
23  ks: KillSwitch,
24  cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send>
25}
26
27impl<F> SimpleTcpConnector<F>
28where
29  F: Future<Output = RunResult<std::io::Error>>
30{
31  #[allow(clippy::needless_pass_by_value)]
32  pub fn new(
33    addr: impl ToString,
34    ks: KillSwitch,
35    cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send>
36  ) -> Self {
37    Self {
38      addr: addr.to_string(),
39      delay: 1,
40      ks,
41      cb
42    }
43  }
44}
45
46#[async_trait]
47impl<F> Connector for SimpleTcpConnector<F>
48where
49  F: Future<Output = RunResult<std::io::Error>> + Send
50{
51  type Error = std::io::Error;
52  type ConnType = TcpStream;
53
54  async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error> {
55    tokio::select! {
56      res = TcpStream::connect(&self.addr) =>  {
57        match res {
58          Ok(conn) => ConnResult::Connected(conn),
59          Err(e) => match e.kind() {
60            ErrorKind::ConnectionRefused | ErrorKind::ConnectionAborted |
61                ErrorKind::NotConnected | ErrorKind::TimedOut => {
62              ConnResult::Reconnect
63            }
64            _ => ConnResult::Exit(Err(e))
65          }
66        }
67      }
68      () = self.ks.wait() => {
69        ConnResult::Exit(Ok(()))
70      }
71    }
72  }
73
74  async fn retry_delay(&mut self) -> RunResult<Self::Error> {
75    let dur = Duration::from_secs(self.delay.try_into().unwrap());
76    tokio::select! {
77      () = self.ks.wait() => {
78        RunResult::Exit(Ok(()))
79      }
80      () = tokio::time::sleep(dur) => {
81        // double sleep duration for each iteration, but cap at 60 seconds
82        self.delay = std::cmp::min(self.delay * 2, 60);
83        RunResult::Reconnect
84      }
85    }
86  }
87
88  async fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error> {
89    // reset delay
90    self.delay = 1;
91
92    let fut = (self.cb)(conn, self.ks.clone());
93
94    fut.await
95  }
96}
97
98// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :