1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use std::{ops::DerefMut,
          sync::{Arc, Mutex},
          thread,
          time::Duration};

use crate::{connection::SingleConnection,
            error::{RconError,
                    RconError::{BusyReconnecting, IO}},
            reconnect::Status::{Connected, Disconnected}};

/// Connection state stored behind the mutex of a `ReconnectingConnection`.
enum Status {
    /// A connection is available; commands are executed on it.
    Connected(SingleConnection),
    /// No connection is available. Holds the text of the error that caused the disconnect,
    /// which is handed back to callers inside `BusyReconnecting`.
    Disconnected(String),
}

/// Drop-in replacement wrapper of [`Connection`](struct.Connection.html) which intercepts all [`IO errors`](enum.Error.html#variant.IO)
/// returned by [`Connection::exec`](struct.Connection.html#method.exec), starts a background reconnection thread when one occurs,
/// and returns [`BusyReconnecting`](enum.Error.html#variant.BusyReconnecting) instead until the connection has been re-established.
///
/// Cloning is supported: every clone shares the same underlying connection state.
///
/// For further docs, refer to [`Connection`](struct.Connection.html), as it shares the same API.
#[derive(Clone)]
pub struct ReconnectingConnection {
    /// Server address, kept so the connection can be re-opened.
    address: String,
    /// Password, kept so the connection can be re-opened.
    pass: String,
    /// Connect timeout passed along on every (re)connection attempt.
    connect_timeout: Option<Duration>,

    /// Current connection state, shared by all clones and the reconnection thread.
    internal: Arc<Mutex<Status>>,
}

impl ReconnectingConnection {
    /// This function behaves identical to [`Connection::open`](struct.Connection.html#method.open).
    pub fn open<S: Into<String>>(address: S, pass: S, connect_timeout: Option<Duration>) -> Result<Self, RconError> {
        let address = address.into();
        let pass = pass.into();
        let internal = Arc::new(Mutex::new(Connected(SingleConnection::open(address.clone(), pass.clone(), connect_timeout)?)));
        Ok(ReconnectingConnection {
            address,
            pass,
            connect_timeout,
            internal,
        })
    }

    /// This function behaves identical to [`Connection::exec`](struct.Connection.html#method.exec) unless `Err([IO](enum.Error.html#variant.IO))` is returned,
    /// in which case it will start reconnecting and return [`BusyReconnecting`](enum.Error.html#variant.BusyReconnecting) until the connection has been re-established.
    pub fn exec<S: Into<String>>(&mut self, cmd: S) -> Result<String, RconError> {
        // First, we check if we are actively reconnecting, this must be done within a Mutex
        let result = {
            let mut lock = self.internal.lock().unwrap();
            let connection = match lock.deref_mut() {
                Connected(ref mut c) => c,
                Disconnected(msg) => return Err(BusyReconnecting(msg.clone())),
            };

            connection.exec(cmd)
        };

        // If we are connected, send the request
        match result {
            Err(e) => match &e {
                IO(_) => Err(self.start_reconnect(e)),
                _ => Err(e)
            },
            Ok(result) => Ok(result)
        }
    }

    fn start_reconnect(&self, e: RconError) -> RconError {
        // First, we change the status, which automatically disconnects the old connection
        {
            let mut lock = self.internal.lock().unwrap();
            *lock = Disconnected(e.to_string());
        }

        let clone = self.clone();
        thread::Builder::new()
            .name(format!("RCON-{}-reconnect-thread", &self.address))
            .spawn(move || clone.reconnect_loop()).unwrap();

        BusyReconnecting(e.to_string())
    }

    fn reconnect_loop(&self) {
        loop {
            match SingleConnection::open(self.address.clone(), self.pass.clone(), self.connect_timeout.clone()) {
                Err(_) => {
                    thread::sleep(Duration::from_secs(1));
                }
                Ok(c) => {
                    let mut lock = self.internal.lock().unwrap();
                    *lock = Connected(c);
                    return;
                }
            }
        }
    }
}