socketboard/
lib.rs

1pub mod types;
2
3use std::collections::HashMap;
4use std::fmt::Display;
5use std::io::{Error, Read, Write};
6use std::net::{IpAddr, SocketAddr, TcpStream};
7use std::str::FromStr;
8use std::sync::{Arc, Mutex};
9use serde_json::{json, to_string_pretty, Value};
10use crate::types::{ConnectionStatus, Data, JSON};
11
12// create a Socketboard class to connect to the websocket server
13pub struct Socketboard {
14    address: SocketAddr,
15    stream: Option<TcpStream>,
16    local_table: Arc<Mutex<HashMap<String, Data>>>,
17    message_buffer: Arc<Mutex<Vec<Value>>>,
18    pub name: String,
19    // status of connection shared between threads
20    status: Arc<Mutex<ConnectionStatus>>,
21    thread: std::thread::JoinHandle<()>,
22}
23
24impl Socketboard {
25    pub fn new(name: &str) -> Self {
26        Self {
27            address: SocketAddr::from(([127, 0, 0, 1], 8080)),
28            stream: None,
29            local_table: Arc::new(Mutex::new(HashMap::new())),
30            name: name.to_string(),
31            message_buffer: Arc::new(Mutex::new(Vec::new())),
32            status: Arc::new(Mutex::new(ConnectionStatus::None)),
33            thread: std::thread::spawn(|| {}),
34        }
35    }
36
37    pub fn with_host(name: &str, host: &str, port: u16) -> Self {
38        let address = match IpAddr::from_str(host) {
39            Ok(ip) => ip,
40            Err(_) => {
41                panic!("Invalid IP address");
42            }
43        };
44
45        Self {
46            address: SocketAddr::new(address, port),
47            stream: None,
48            local_table: Arc::new(Mutex::new(HashMap::new())),
49            name: name.to_string(),
50            message_buffer: Arc::new(Mutex::new(Vec::new())),
51            status: Arc::new(Mutex::new(ConnectionStatus::None)),
52            thread: std::thread::spawn(|| {}),
53        }
54    }
55
56    pub fn start(&mut self) -> Result<(), Error> {
57        // spawn a new thread to connect to the server
58        let mut address = self.address.clone();
59        let mut status = self.status.clone();
60        let mut message_buffer = self.message_buffer.clone();
61        let mut local_table = self.local_table.clone();
62        let mut name = self.name.clone();
63
64        std::thread::spawn(move || loop {
65                const TIMEOUT_SEC: u64 = 1;
66
67                let mut handshake = false;
68
69                let mut stream = match TcpStream::connect_timeout(&address, std::time::Duration::from_secs(TIMEOUT_SEC)) {
70                    Ok(stream) => {
71                        stream
72                    }
73                    Err(_) => {
74                        if let ConnectionStatus::Stopped = *status.lock().unwrap() {
75                            break;
76                        }
77
78                        *status.lock().unwrap() = ConnectionStatus::Failed(address.to_string());
79                        continue;
80                    }
81                };
82
83                // change the status
84                *status.lock().unwrap() = ConnectionStatus::Connected(address.to_string());
85
86                println!("Socketboard connected to {}", address.to_string());
87
88                match stream.set_nonblocking(true) {
89                    Ok(_) => {}
90                    Err(e) => {
91                        println!("Failed to set non-blocking: {}", e);
92                        break;
93                    }
94                }
95
96                // send the handshake message
97
98                message_buffer.lock().unwrap().insert(0, serde_json::json!({
99                    "type": "handshake",
100                    "name": name,
101                }));
102
103                loop {
104                    // read from the stream
105                    let mut buffer = [0; 2048];
106                    match stream.read(&mut buffer) {
107                        Ok(bytes_read) => {
108                            // println!("Received {} bytes", bytes_read);
109                            let json_string = String::from_utf8_lossy(&buffer[..bytes_read]);
110                            let json: Value = match serde_json::from_str(&json_string) {
111                                Ok(json) => json,
112                                Err(e) => {
113                                    println!("Failed to parse JSON: {}", e);
114                                    println!("Received: {}", json_string);
115                                    continue;
116                                }
117                            };
118
119                            // println!("{}", to_string_pretty(&json).unwrap());
120
121                            // handle the message
122                            match Socketboard::handle(&json, &mut handshake, &mut local_table, &mut status) {
123                                Ok(_) => {}
124                                // if permission denied, end the connection
125                                Err(ref e) if e.kind() == std::io::ErrorKind::ConnectionRefused => {
126                                    *status.lock().unwrap() = ConnectionStatus::Stopped;
127                                    break;
128                                }
129                                Err(e) => {
130                                    println!("Failed to handle message: {}", e);
131
132                                    break;
133                                }
134                            }
135                        }
136                        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
137                            // no data to read
138                        }
139                        Err(_) => {
140                            break;
141                        }
142                    }
143
144
145                    // write to the stream
146                    let mut message_buffer = message_buffer.lock().unwrap();
147                    while !message_buffer.is_empty() {
148                        let message = message_buffer.remove(0);
149                        let json_string = &message.to_string();
150
151                        // if json_string is blank then skip
152                        if json_string == "" {
153                            continue;
154                        }
155
156                        match stream.write_all(json_string.as_bytes()) {
157                            Ok(_) => {
158                                // println!("Sent: {}", json_string);
159                            }
160                            Err(e) => {
161                                // println!("Failed to send: {}", e);
162                                break;
163                            }
164                        }
165                    }
166
167                    if let ConnectionStatus::Stopped = *status.lock().unwrap() {
168                        break;
169                    }
170                }
171
172                if let ConnectionStatus::Stopped = *status.lock().unwrap() {
173                    break;
174                }
175
176                // wait 1 sec before trying again
177                std::thread::sleep(std::time::Duration::from_secs(1));
178        });
179
180        // wait for the connection to be established
181        for _ in 0..100 {
182            std::thread::sleep(std::time::Duration::from_millis(10));
183            let status = self.status.lock().unwrap();
184            if let ConnectionStatus::Connecting = *status {
185                continue;
186            } else {
187                break;
188            }
189        }
190
191        if let ConnectionStatus::Connected(_) = self.get_status() {
192            return Ok(())
193        }
194
195        Err(Error::new(std::io::ErrorKind::Other, "Failed to connect"))
196    }
197
198    pub fn stop(&mut self) {
199        *self.status.lock().unwrap() = ConnectionStatus::Stopped;
200    }
201
202    pub fn get_status(&self) -> ConnectionStatus {
203        let status = self.status.lock().unwrap();
204        status.clone()
205    }
206
207    pub fn send(&mut self, message: Value) {
208        // if not connected then ignore
209        match self.get_status() {
210            ConnectionStatus::Connected(_) => {}
211            _ => {
212                println!("Not connected");
213                return;
214            }
215        }
216
217        // push the message to the update buffer
218        let mut update_buffer = self.message_buffer.lock().unwrap();
219        update_buffer.push(message);
220    }
221
222    fn handle(
223        json: &Value,
224        handshake: &mut bool,
225        local_table: &mut Arc<Mutex<HashMap<String, Data>>>,
226        status: &mut Arc<Mutex<ConnectionStatus>>,
227    ) -> Result<(), Error> {
228        match json["type"].as_str() {
229            Some("handshake") => {
230                if *handshake {
231                    return Err(Error::new(std::io::ErrorKind::Other, "Handshake already completed"))
232                }
233
234                // check if status is ok, otherwise return an error
235                let response_status = json["status"].as_str().unwrap();
236                if response_status != "ok" {
237                    // check if there is a message provided
238                    let message = json["message"].as_str().unwrap_or("No message");
239                    println!("Handshake failed: {}", message);
240                    return Err(Error::new(std::io::ErrorKind::ConnectionRefused, message))
241                }
242
243                // check if there is a terminate: true
244                let terminate = json["terminate"].as_bool().unwrap_or(false);
245                if terminate {
246                    return Err(Error::new(std::io::ErrorKind::ConnectionRefused, "Connection terminated"));
247                }
248
249                // see if there is a table object and update the local table
250                let table_json = &json["table"];
251                let mut table = local_table.lock().unwrap();
252
253                for (key, value) in table_json.as_object().unwrap() {
254                    table.insert(key.to_string(), Data::from_json(value));
255                }
256
257                // change the status
258                *status.lock().unwrap() = ConnectionStatus::Connected("socketboard".to_string());
259                *handshake = true;
260                Ok(())
261            }
262            Some("update") => {
263                // check if there is a terminate: true
264                let terminate = json["terminate"].as_bool().unwrap_or(false);
265                if terminate {
266                    return Err(Error::new(std::io::ErrorKind::ConnectionRefused, "Connection terminated"));
267                }
268
269                // get "table" from the json
270                let table_json = &json["table"];
271                // if there's no table, return an error
272                if table_json.is_null() {
273                    return Err(Error::new(std::io::ErrorKind::Other, "No table provided"));
274                }
275
276                let mut table = local_table.lock().unwrap();
277
278                // update the local table
279                for (key, value) in table_json.as_object().unwrap() {
280                    table.insert(key.to_string(), Data::from_json(value));
281                }
282
283                Ok(())
284            }
285            _ => {
286                println!("Unknown message: {}", json);
287                Err(Error::new(std::io::ErrorKind::Other, "Unknown message"))
288            }
289        }
290    }
291
292    pub fn put_value(&mut self, key: &str, value: Data) -> Result<(), Error> {
293        self.local_table.lock().unwrap().insert(key.to_string(), value.clone());
294
295        match self.get_status() {
296            ConnectionStatus::Connected(_) => {}
297            _ => {
298                return Err(Error::new(std::io::ErrorKind::Other, "Not connected"));
299            }
300        }
301
302        // create a JSON object with the key and value
303        let updated_table = json!({
304            key: value.to_json(),
305        });
306        // send the updated table to the server
307        self.send(json!({
308            "type": "update",
309            "table": updated_table,
310        }));
311
312        Ok(())
313    }
314
315    pub fn get_value(&self, key: &str, default: Data) -> Data {
316        let table = self.local_table.lock().unwrap();
317        match table.get(key).cloned() {
318            Some(value) => value,
319            _ => default,
320        }
321    }
322
323    pub fn print_table(&self) {
324        println!("-- SERVER DATA TABLE --");
325        for (key, value) in self.local_table.lock().unwrap().iter() {
326            println!("{}          {}", key, value);
327        }
328        println!("-----------------------");
329    }
330}