discord_rpc_client/connection/
manager.rs1use std::{
2 thread,
3 sync::{Arc, Mutex},
4 time,
5 io::ErrorKind
6};
7use log::{debug, error};
8use crossbeam_channel::{unbounded, Receiver, Sender};
9use serde_json::Value as JsonValue;
10use super::{
11 Connection,
12 SocketConnection,
13};
14use crate::{
15 models::{Event, Message, payload::Payload},
16 error::{Error, Result},
17 event_handler::HandlerRegistry,
18};
19
20
21type Tx = Sender<Message>;
22type Rx = Receiver<Message>;
23
24#[derive(Clone)]
26pub struct Manager {
27 connection: Arc<Option<Mutex<SocketConnection>>>,
28 client_id: u64,
29 outbound: (Rx, Tx),
30 inbound: (Rx, Tx),
31 handshake_completed: bool,
32 event_handler_registry: HandlerRegistry<'static>,
33}
34
35impl Manager {
36 pub fn new(client_id: u64, event_handler_registry: HandlerRegistry<'static>) -> Self {
37 let connection = Arc::new(None);
38 let (sender_o, receiver_o) = unbounded();
39 let (sender_i, receiver_i) = unbounded();
40
41 Self {
42 connection,
43 client_id,
44 handshake_completed: false,
45 inbound: (receiver_i, sender_i),
46 outbound: (receiver_o, sender_o),
47 event_handler_registry,
48 }
49 }
50
51 pub fn start(&mut self) {
52 let manager_inner = self.clone();
53 thread::spawn(move || {
54 send_and_receive_loop(manager_inner);
55 });
56 }
57
58 pub fn send(&self, message: Message) -> Result<()> {
59 self.outbound.1.send(message).unwrap();
60 Ok(())
61 }
62
63 pub fn recv(&self) -> Result<Message> {
64 let message = self.inbound.0.recv().unwrap();
65 Ok(message)
66 }
67
68 fn connect(&mut self) -> Result<()> {
69 if self.connection.is_some() {
70 return Ok(());
71 }
72
73 debug!("Connecting");
74
75 let mut new_connection = SocketConnection::connect()?;
76
77 debug!("Performing handshake");
78 let msg = new_connection.handshake(self.client_id)?;
79 let payload: Payload<JsonValue> = serde_json::from_str(&msg.payload)?;
80 self.event_handler_registry.handle(Event::Ready, payload.data.unwrap())?;
81 debug!("Handshake completed");
82
83 self.connection = Arc::new(Some(Mutex::new(new_connection)));
84
85 debug!("Connected");
86
87 Ok(())
88 }
89
90 fn disconnect(&mut self) {
91 self.handshake_completed = false;
92 self.connection = Arc::new(None);
93 }
94
95}
96
97
98fn send_and_receive_loop(mut manager: Manager) {
99 debug!("Starting sender loop");
100
101 let mut inbound = manager.inbound.1.clone();
102 let outbound = manager.outbound.0.clone();
103
104 loop {
105 let connection = manager.connection.clone();
106
107 match *connection {
108 Some(ref conn) => {
109 let mut connection = conn.lock().unwrap();
110 match send_and_receive(&mut *connection, &mut manager.event_handler_registry, &mut inbound, &outbound) {
111 Err(Error::IoError(ref err)) if err.kind() == ErrorKind::WouldBlock => (),
112 Err(Error::IoError(_)) | Err(Error::ConnectionClosed) => manager.disconnect(),
113 Err(why) => error!("error: {}", why),
114 _ => (),
115 }
116
117 thread::sleep(time::Duration::from_millis(500));
118 },
119 None => {
120 match manager.connect() {
121 Err(err) => {
122 match err {
123 Error::IoError(ref err) if err.kind() == ErrorKind::ConnectionRefused => (),
124 why => error!("Failed to connect: {:?}", why),
125 }
126 thread::sleep(time::Duration::from_secs(10));
127 },
128 _ => manager.handshake_completed = true,
129 }
130 }
131 }
132 }
133}
134
135fn send_and_receive(connection: &mut SocketConnection, event_handler_registry: &mut HandlerRegistry, inbound: &mut Tx, outbound: &Rx) -> Result<()> {
136 while let Ok(msg) = outbound.try_recv() {
137 connection.send(&msg).expect("Failed to send outgoing data");
138 }
139
140 let msg = connection.recv()?;
141
142 let payload: Payload<JsonValue> = serde_json::from_str(&msg.payload)?;
143
144 match &payload {
145 Payload { evt: Some(event), .. } => {
146 event_handler_registry.handle(event.clone(), payload.data.unwrap())?;
147 },
148 _ => {
149 inbound.send(msg).expect("Failed to send received data");
150 },
151 }
152
153 Ok(())
154}