1pub mod types;
2
3use std::collections::HashMap;
4use std::fmt::Display;
5use std::io::{Error, Read, Write};
6use std::net::{IpAddr, SocketAddr, TcpStream};
7use std::str::FromStr;
8use std::sync::{Arc, Mutex};
9use serde_json::{json, to_string_pretty, Value};
10use crate::types::{ConnectionStatus, Data, JSON};
11
12pub struct Socketboard {
14 address: SocketAddr,
15 stream: Option<TcpStream>,
16 local_table: Arc<Mutex<HashMap<String, Data>>>,
17 message_buffer: Arc<Mutex<Vec<Value>>>,
18 pub name: String,
19 status: Arc<Mutex<ConnectionStatus>>,
21 thread: std::thread::JoinHandle<()>,
22}
23
24impl Socketboard {
25 pub fn new(name: &str) -> Self {
26 Self {
27 address: SocketAddr::from(([127, 0, 0, 1], 8080)),
28 stream: None,
29 local_table: Arc::new(Mutex::new(HashMap::new())),
30 name: name.to_string(),
31 message_buffer: Arc::new(Mutex::new(Vec::new())),
32 status: Arc::new(Mutex::new(ConnectionStatus::None)),
33 thread: std::thread::spawn(|| {}),
34 }
35 }
36
37 pub fn with_host(name: &str, host: &str, port: u16) -> Self {
38 let address = match IpAddr::from_str(host) {
39 Ok(ip) => ip,
40 Err(_) => {
41 panic!("Invalid IP address");
42 }
43 };
44
45 Self {
46 address: SocketAddr::new(address, port),
47 stream: None,
48 local_table: Arc::new(Mutex::new(HashMap::new())),
49 name: name.to_string(),
50 message_buffer: Arc::new(Mutex::new(Vec::new())),
51 status: Arc::new(Mutex::new(ConnectionStatus::None)),
52 thread: std::thread::spawn(|| {}),
53 }
54 }
55
56 pub fn start(&mut self) -> Result<(), Error> {
57 let mut address = self.address.clone();
59 let mut status = self.status.clone();
60 let mut message_buffer = self.message_buffer.clone();
61 let mut local_table = self.local_table.clone();
62 let mut name = self.name.clone();
63
64 std::thread::spawn(move || loop {
65 const TIMEOUT_SEC: u64 = 1;
66
67 let mut handshake = false;
68
69 let mut stream = match TcpStream::connect_timeout(&address, std::time::Duration::from_secs(TIMEOUT_SEC)) {
70 Ok(stream) => {
71 stream
72 }
73 Err(_) => {
74 if let ConnectionStatus::Stopped = *status.lock().unwrap() {
75 break;
76 }
77
78 *status.lock().unwrap() = ConnectionStatus::Failed(address.to_string());
79 continue;
80 }
81 };
82
83 *status.lock().unwrap() = ConnectionStatus::Connected(address.to_string());
85
86 println!("Socketboard connected to {}", address.to_string());
87
88 match stream.set_nonblocking(true) {
89 Ok(_) => {}
90 Err(e) => {
91 println!("Failed to set non-blocking: {}", e);
92 break;
93 }
94 }
95
96 message_buffer.lock().unwrap().insert(0, serde_json::json!({
99 "type": "handshake",
100 "name": name,
101 }));
102
103 loop {
104 let mut buffer = [0; 2048];
106 match stream.read(&mut buffer) {
107 Ok(bytes_read) => {
108 let json_string = String::from_utf8_lossy(&buffer[..bytes_read]);
110 let json: Value = match serde_json::from_str(&json_string) {
111 Ok(json) => json,
112 Err(e) => {
113 println!("Failed to parse JSON: {}", e);
114 println!("Received: {}", json_string);
115 continue;
116 }
117 };
118
119 match Socketboard::handle(&json, &mut handshake, &mut local_table, &mut status) {
123 Ok(_) => {}
124 Err(ref e) if e.kind() == std::io::ErrorKind::ConnectionRefused => {
126 *status.lock().unwrap() = ConnectionStatus::Stopped;
127 break;
128 }
129 Err(e) => {
130 println!("Failed to handle message: {}", e);
131
132 break;
133 }
134 }
135 }
136 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
137 }
139 Err(_) => {
140 break;
141 }
142 }
143
144
145 let mut message_buffer = message_buffer.lock().unwrap();
147 while !message_buffer.is_empty() {
148 let message = message_buffer.remove(0);
149 let json_string = &message.to_string();
150
151 if json_string == "" {
153 continue;
154 }
155
156 match stream.write_all(json_string.as_bytes()) {
157 Ok(_) => {
158 }
160 Err(e) => {
161 break;
163 }
164 }
165 }
166
167 if let ConnectionStatus::Stopped = *status.lock().unwrap() {
168 break;
169 }
170 }
171
172 if let ConnectionStatus::Stopped = *status.lock().unwrap() {
173 break;
174 }
175
176 std::thread::sleep(std::time::Duration::from_secs(1));
178 });
179
180 for _ in 0..100 {
182 std::thread::sleep(std::time::Duration::from_millis(10));
183 let status = self.status.lock().unwrap();
184 if let ConnectionStatus::Connecting = *status {
185 continue;
186 } else {
187 break;
188 }
189 }
190
191 if let ConnectionStatus::Connected(_) = self.get_status() {
192 return Ok(())
193 }
194
195 Err(Error::new(std::io::ErrorKind::Other, "Failed to connect"))
196 }
197
198 pub fn stop(&mut self) {
199 *self.status.lock().unwrap() = ConnectionStatus::Stopped;
200 }
201
202 pub fn get_status(&self) -> ConnectionStatus {
203 let status = self.status.lock().unwrap();
204 status.clone()
205 }
206
207 pub fn send(&mut self, message: Value) {
208 match self.get_status() {
210 ConnectionStatus::Connected(_) => {}
211 _ => {
212 println!("Not connected");
213 return;
214 }
215 }
216
217 let mut update_buffer = self.message_buffer.lock().unwrap();
219 update_buffer.push(message);
220 }
221
222 fn handle(
223 json: &Value,
224 handshake: &mut bool,
225 local_table: &mut Arc<Mutex<HashMap<String, Data>>>,
226 status: &mut Arc<Mutex<ConnectionStatus>>,
227 ) -> Result<(), Error> {
228 match json["type"].as_str() {
229 Some("handshake") => {
230 if *handshake {
231 return Err(Error::new(std::io::ErrorKind::Other, "Handshake already completed"))
232 }
233
234 let response_status = json["status"].as_str().unwrap();
236 if response_status != "ok" {
237 let message = json["message"].as_str().unwrap_or("No message");
239 println!("Handshake failed: {}", message);
240 return Err(Error::new(std::io::ErrorKind::ConnectionRefused, message))
241 }
242
243 let terminate = json["terminate"].as_bool().unwrap_or(false);
245 if terminate {
246 return Err(Error::new(std::io::ErrorKind::ConnectionRefused, "Connection terminated"));
247 }
248
249 let table_json = &json["table"];
251 let mut table = local_table.lock().unwrap();
252
253 for (key, value) in table_json.as_object().unwrap() {
254 table.insert(key.to_string(), Data::from_json(value));
255 }
256
257 *status.lock().unwrap() = ConnectionStatus::Connected("socketboard".to_string());
259 *handshake = true;
260 Ok(())
261 }
262 Some("update") => {
263 let terminate = json["terminate"].as_bool().unwrap_or(false);
265 if terminate {
266 return Err(Error::new(std::io::ErrorKind::ConnectionRefused, "Connection terminated"));
267 }
268
269 let table_json = &json["table"];
271 if table_json.is_null() {
273 return Err(Error::new(std::io::ErrorKind::Other, "No table provided"));
274 }
275
276 let mut table = local_table.lock().unwrap();
277
278 for (key, value) in table_json.as_object().unwrap() {
280 table.insert(key.to_string(), Data::from_json(value));
281 }
282
283 Ok(())
284 }
285 _ => {
286 println!("Unknown message: {}", json);
287 Err(Error::new(std::io::ErrorKind::Other, "Unknown message"))
288 }
289 }
290 }
291
292 pub fn put_value(&mut self, key: &str, value: Data) -> Result<(), Error> {
293 self.local_table.lock().unwrap().insert(key.to_string(), value.clone());
294
295 match self.get_status() {
296 ConnectionStatus::Connected(_) => {}
297 _ => {
298 return Err(Error::new(std::io::ErrorKind::Other, "Not connected"));
299 }
300 }
301
302 let updated_table = json!({
304 key: value.to_json(),
305 });
306 self.send(json!({
308 "type": "update",
309 "table": updated_table,
310 }));
311
312 Ok(())
313 }
314
315 pub fn get_value(&self, key: &str, default: Data) -> Data {
316 let table = self.local_table.lock().unwrap();
317 match table.get(key).cloned() {
318 Some(value) => value,
319 _ => default,
320 }
321 }
322
323 pub fn print_table(&self) {
324 println!("-- SERVER DATA TABLE --");
325 for (key, value) in self.local_table.lock().unwrap().iter() {
326 println!("{} {}", key, value);
327 }
328 println!("-----------------------");
329 }
330}