1use std::{future::Future, io::ErrorKind, time::Duration};
5
6pub use tokio::net::TcpStream;
7
8pub use killswitch::KillSwitch;
9
10use super::{async_trait, ConnResult, Connector, RunResult};
11
12pub struct SimpleTcpConnector<F>
18where
19 F: Future<Output = RunResult<std::io::Error>>
20{
21 addr: String,
22 delay: usize,
23 ks: KillSwitch,
24 cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send>
25}
26
27impl<F> SimpleTcpConnector<F>
28where
29 F: Future<Output = RunResult<std::io::Error>>
30{
31 #[allow(clippy::needless_pass_by_value)]
32 pub fn new(
33 addr: impl ToString,
34 ks: KillSwitch,
35 cb: Box<dyn Fn(TcpStream, KillSwitch) -> F + Send>
36 ) -> Self {
37 Self {
38 addr: addr.to_string(),
39 delay: 1,
40 ks,
41 cb
42 }
43 }
44}
45
46#[async_trait]
47impl<F> Connector for SimpleTcpConnector<F>
48where
49 F: Future<Output = RunResult<std::io::Error>> + Send
50{
51 type Error = std::io::Error;
52 type ConnType = TcpStream;
53
54 async fn connect(&mut self) -> ConnResult<Self::ConnType, Self::Error> {
55 tokio::select! {
56 res = TcpStream::connect(&self.addr) => {
57 match res {
58 Ok(conn) => ConnResult::Connected(conn),
59 Err(e) => match e.kind() {
60 ErrorKind::ConnectionRefused | ErrorKind::ConnectionAborted |
61 ErrorKind::NotConnected | ErrorKind::TimedOut => {
62 ConnResult::Reconnect
63 }
64 _ => ConnResult::Exit(Err(e))
65 }
66 }
67 }
68 () = self.ks.wait() => {
69 ConnResult::Exit(Ok(()))
70 }
71 }
72 }
73
74 async fn retry_delay(&mut self) -> RunResult<Self::Error> {
75 let dur = Duration::from_secs(self.delay.try_into().unwrap());
76 tokio::select! {
77 () = self.ks.wait() => {
78 RunResult::Exit(Ok(()))
79 }
80 () = tokio::time::sleep(dur) => {
81 self.delay = std::cmp::min(self.delay * 2, 60);
83 RunResult::Reconnect
84 }
85 }
86 }
87
88 async fn run(&mut self, conn: Self::ConnType) -> RunResult<Self::Error> {
89 self.delay = 1;
91
92 let fut = (self.cb)(conn, self.ks.clone());
93
94 fut.await
95 }
96}
97
98