discord_rpc_client/connection/
manager.rs

1use std::{
2    thread,
3    sync::{Arc, Mutex},
4    time,
5    io::ErrorKind
6};
7use log::{debug, error};
8use crossbeam_channel::{unbounded, Receiver, Sender};
9use serde_json::Value as JsonValue;
10use super::{
11    Connection,
12    SocketConnection,
13};
14use crate::{
15    models::{Event, Message, payload::Payload},
16    error::{Error, Result},
17    event_handler::HandlerRegistry,
18};
19
20
21type Tx = Sender<Message>;
22type Rx = Receiver<Message>;
23
24// TODO: Refactor connection manager
25#[derive(Clone)]
26pub struct Manager {
27    connection: Arc<Option<Mutex<SocketConnection>>>,
28    client_id: u64,
29    outbound: (Rx, Tx),
30    inbound: (Rx, Tx),
31    handshake_completed: bool,
32    event_handler_registry: HandlerRegistry<'static>,
33}
34
35impl Manager {
36    pub fn new(client_id: u64, event_handler_registry: HandlerRegistry<'static>) -> Self {
37        let connection = Arc::new(None);
38        let (sender_o, receiver_o) = unbounded();
39        let (sender_i, receiver_i) = unbounded();
40
41        Self {
42            connection,
43            client_id,
44            handshake_completed: false,
45            inbound: (receiver_i, sender_i),
46            outbound: (receiver_o, sender_o),
47            event_handler_registry,
48        }
49    }
50
51    pub fn start(&mut self) {
52        let manager_inner = self.clone();
53        thread::spawn(move || {
54            send_and_receive_loop(manager_inner);
55        });
56    }
57
58    pub fn send(&self, message: Message) -> Result<()> {
59        self.outbound.1.send(message).unwrap();
60        Ok(())
61    }
62
63    pub fn recv(&self) -> Result<Message> {
64        let message = self.inbound.0.recv().unwrap();
65        Ok(message)
66    }
67
68    fn connect(&mut self) -> Result<()> {
69        if self.connection.is_some() {
70            return Ok(());
71        }
72
73        debug!("Connecting");
74
75        let mut new_connection = SocketConnection::connect()?;
76
77        debug!("Performing handshake");
78        let msg = new_connection.handshake(self.client_id)?;
79        let payload: Payload<JsonValue> = serde_json::from_str(&msg.payload)?;
80        self.event_handler_registry.handle(Event::Ready, payload.data.unwrap())?;
81        debug!("Handshake completed");
82
83        self.connection = Arc::new(Some(Mutex::new(new_connection)));
84
85        debug!("Connected");
86
87        Ok(())
88    }
89
90    fn disconnect(&mut self) {
91        self.handshake_completed = false;
92        self.connection = Arc::new(None);
93    }
94
95}
96
97
98fn send_and_receive_loop(mut manager: Manager) {
99    debug!("Starting sender loop");
100
101    let mut inbound = manager.inbound.1.clone();
102    let outbound = manager.outbound.0.clone();
103
104    loop {
105        let connection = manager.connection.clone();
106
107        match *connection {
108            Some(ref conn) => {
109                let mut connection = conn.lock().unwrap();
110                match send_and_receive(&mut *connection, &mut manager.event_handler_registry, &mut inbound, &outbound) {
111                    Err(Error::IoError(ref err)) if err.kind() == ErrorKind::WouldBlock => (),
112                    Err(Error::IoError(_)) | Err(Error::ConnectionClosed) => manager.disconnect(),
113                    Err(why) => error!("error: {}", why),
114                    _ => (),
115                }
116
117                thread::sleep(time::Duration::from_millis(500));
118            },
119            None => {
120                match manager.connect() {
121                    Err(err) => {
122                        match err {
123                            Error::IoError(ref err) if err.kind() == ErrorKind::ConnectionRefused => (),
124                            why => error!("Failed to connect: {:?}", why),
125                        }
126                        thread::sleep(time::Duration::from_secs(10));
127                    },
128                    _ => manager.handshake_completed = true,
129                }
130            }
131        }
132    }
133}
134
135fn send_and_receive(connection: &mut SocketConnection, event_handler_registry: &mut HandlerRegistry, inbound: &mut Tx, outbound: &Rx) -> Result<()> {
136    while let Ok(msg) = outbound.try_recv() {
137        connection.send(&msg).expect("Failed to send outgoing data");
138    }
139
140    let msg = connection.recv()?;
141
142    let payload: Payload<JsonValue> = serde_json::from_str(&msg.payload)?;
143
144    match &payload {
145        Payload { evt: Some(event), .. } => {
146            event_handler_registry.handle(event.clone(), payload.data.unwrap())?;
147        },
148        _ => {
149            inbound.send(msg).expect("Failed to send received data");
150        },
151    }
152
153    Ok(())
154}