socketboard 0.1.0

The official Rust SDK for Socketboard, a lightweight websocket variable table.
Documentation
pub mod types;

use std::collections::HashMap;
use std::fmt::Display;
use std::io::{Error, Read, Write};
use std::net::{IpAddr, SocketAddr, TcpStream};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use serde_json::{json, to_string_pretty, Value};
use crate::types::{ConnectionStatus, Data, JSON};

// create a Socketboard class to connect to the websocket server
pub struct Socketboard {
    address: SocketAddr,
    stream: Option<TcpStream>,
    local_table: Arc<Mutex<HashMap<String, Data>>>,
    message_buffer: Arc<Mutex<Vec<Value>>>,
    pub name: String,
    // status of connection shared between threads
    status: Arc<Mutex<ConnectionStatus>>,
    thread: std::thread::JoinHandle<()>,
}

impl Socketboard {
    pub fn new(name: &str) -> Self {
        Self {
            address: SocketAddr::from(([127, 0, 0, 1], 8080)),
            stream: None,
            local_table: Arc::new(Mutex::new(HashMap::new())),
            name: name.to_string(),
            message_buffer: Arc::new(Mutex::new(Vec::new())),
            status: Arc::new(Mutex::new(ConnectionStatus::None)),
            thread: std::thread::spawn(|| {}),
        }
    }

    pub fn with_host(name: &str, host: &str, port: u16) -> Self {
        let address = match IpAddr::from_str(host) {
            Ok(ip) => ip,
            Err(_) => {
                panic!("Invalid IP address");
            }
        };

        Self {
            address: SocketAddr::new(address, port),
            stream: None,
            local_table: Arc::new(Mutex::new(HashMap::new())),
            name: name.to_string(),
            message_buffer: Arc::new(Mutex::new(Vec::new())),
            status: Arc::new(Mutex::new(ConnectionStatus::None)),
            thread: std::thread::spawn(|| {}),
        }
    }

    pub fn start(&mut self) -> Result<(), Error> {
        // spawn a new thread to connect to the server
        let mut address = self.address.clone();
        let mut status = self.status.clone();
        let mut message_buffer = self.message_buffer.clone();
        let mut local_table = self.local_table.clone();
        let mut name = self.name.clone();

        std::thread::spawn(move || loop {
                const TIMEOUT_SEC: u64 = 1;

                let mut handshake = false;

                let mut stream = match TcpStream::connect_timeout(&address, std::time::Duration::from_secs(TIMEOUT_SEC)) {
                    Ok(stream) => {
                        stream
                    }
                    Err(_) => {
                        if let ConnectionStatus::Stopped = *status.lock().unwrap() {
                            break;
                        }

                        *status.lock().unwrap() = ConnectionStatus::Failed(address.to_string());
                        continue;
                    }
                };

                // change the status
                *status.lock().unwrap() = ConnectionStatus::Connected(address.to_string());

                println!("Socketboard connected to {}", address.to_string());

                match stream.set_nonblocking(true) {
                    Ok(_) => {}
                    Err(e) => {
                        println!("Failed to set non-blocking: {}", e);
                        break;
                    }
                }

                // send the handshake message

                message_buffer.lock().unwrap().insert(0, serde_json::json!({
                    "type": "handshake",
                    "name": name,
                }));

                loop {
                    // read from the stream
                    let mut buffer = [0; 2048];
                    match stream.read(&mut buffer) {
                        Ok(bytes_read) => {
                            // println!("Received {} bytes", bytes_read);
                            let json_string = String::from_utf8_lossy(&buffer[..bytes_read]);
                            let json: Value = match serde_json::from_str(&json_string) {
                                Ok(json) => json,
                                Err(e) => {
                                    println!("Failed to parse JSON: {}", e);
                                    println!("Received: {}", json_string);
                                    continue;
                                }
                            };

                            // println!("{}", to_string_pretty(&json).unwrap());

                            // handle the message
                            match Socketboard::handle(&json, &mut handshake, &mut local_table, &mut status) {
                                Ok(_) => {}
                                // if permission denied, end the connection
                                Err(ref e) if e.kind() == std::io::ErrorKind::ConnectionRefused => {
                                    *status.lock().unwrap() = ConnectionStatus::Stopped;
                                    break;
                                }
                                Err(e) => {
                                    println!("Failed to handle message: {}", e);

                                    break;
                                }
                            }
                        }
                        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                            // no data to read
                        }
                        Err(_) => {
                            break;
                        }
                    }


                    // write to the stream
                    let mut message_buffer = message_buffer.lock().unwrap();
                    while !message_buffer.is_empty() {
                        let message = message_buffer.remove(0);
                        let json_string = &message.to_string();

                        // if json_string is blank then skip
                        if json_string == "" {
                            continue;
                        }

                        match stream.write_all(json_string.as_bytes()) {
                            Ok(_) => {
                                // println!("Sent: {}", json_string);
                            }
                            Err(e) => {
                                // println!("Failed to send: {}", e);
                                break;
                            }
                        }
                    }

                    if let ConnectionStatus::Stopped = *status.lock().unwrap() {
                        break;
                    }
                }

                if let ConnectionStatus::Stopped = *status.lock().unwrap() {
                    break;
                }

                // wait 1 sec before trying again
                std::thread::sleep(std::time::Duration::from_secs(1));
        });

        // wait for the connection to be established
        for _ in 0..100 {
            std::thread::sleep(std::time::Duration::from_millis(10));
            let status = self.status.lock().unwrap();
            if let ConnectionStatus::Connecting = *status {
                continue;
            } else {
                break;
            }
        }

        if let ConnectionStatus::Connected(_) = self.get_status() {
            return Ok(())
        }

        Err(Error::new(std::io::ErrorKind::Other, "Failed to connect"))
    }

    pub fn stop(&mut self) {
        *self.status.lock().unwrap() = ConnectionStatus::Stopped;
    }

    pub fn get_status(&self) -> ConnectionStatus {
        let status = self.status.lock().unwrap();
        status.clone()
    }

    pub fn send(&mut self, message: Value) {
        // if not connected then ignore
        match self.get_status() {
            ConnectionStatus::Connected(_) => {}
            _ => {
                println!("Not connected");
                return;
            }
        }

        // push the message to the update buffer
        let mut update_buffer = self.message_buffer.lock().unwrap();
        update_buffer.push(message);
    }

    fn handle(
        json: &Value,
        handshake: &mut bool,
        local_table: &mut Arc<Mutex<HashMap<String, Data>>>,
        status: &mut Arc<Mutex<ConnectionStatus>>,
    ) -> Result<(), Error> {
        match json["type"].as_str() {
            Some("handshake") => {
                if *handshake {
                    return Err(Error::new(std::io::ErrorKind::Other, "Handshake already completed"))
                }

                // check if status is ok, otherwise return an error
                let response_status = json["status"].as_str().unwrap();
                if response_status != "ok" {
                    // check if there is a message provided
                    let message = json["message"].as_str().unwrap_or("No message");
                    println!("Handshake failed: {}", message);
                    return Err(Error::new(std::io::ErrorKind::ConnectionRefused, message))
                }

                // check if there is a terminate: true
                let terminate = json["terminate"].as_bool().unwrap_or(false);
                if terminate {
                    return Err(Error::new(std::io::ErrorKind::ConnectionRefused, "Connection terminated"));
                }

                // see if there is a table object and update the local table
                let table_json = &json["table"];
                let mut table = local_table.lock().unwrap();

                for (key, value) in table_json.as_object().unwrap() {
                    table.insert(key.to_string(), Data::from_json(value));
                }

                // change the status
                *status.lock().unwrap() = ConnectionStatus::Connected("socketboard".to_string());
                *handshake = true;
                Ok(())
            }
            Some("update") => {
                // check if there is a terminate: true
                let terminate = json["terminate"].as_bool().unwrap_or(false);
                if terminate {
                    return Err(Error::new(std::io::ErrorKind::ConnectionRefused, "Connection terminated"));
                }

                // get "table" from the json
                let table_json = &json["table"];
                // if there's no table, return an error
                if table_json.is_null() {
                    return Err(Error::new(std::io::ErrorKind::Other, "No table provided"));
                }

                let mut table = local_table.lock().unwrap();

                // update the local table
                for (key, value) in table_json.as_object().unwrap() {
                    table.insert(key.to_string(), Data::from_json(value));
                }

                Ok(())
            }
            _ => {
                println!("Unknown message: {}", json);
                Err(Error::new(std::io::ErrorKind::Other, "Unknown message"))
            }
        }
    }

    pub fn put_value(&mut self, key: &str, value: Data) -> Result<(), Error> {
        self.local_table.lock().unwrap().insert(key.to_string(), value.clone());

        match self.get_status() {
            ConnectionStatus::Connected(_) => {}
            _ => {
                return Err(Error::new(std::io::ErrorKind::Other, "Not connected"));
            }
        }

        // create a JSON object with the key and value
        let updated_table = json!({
            key: value.to_json(),
        });
        // send the updated table to the server
        self.send(json!({
            "type": "update",
            "table": updated_table,
        }));

        Ok(())
    }

    pub fn get_value(&self, key: &str, default: Data) -> Data {
        let table = self.local_table.lock().unwrap();
        match table.get(key).cloned() {
            Some(value) => value,
            _ => default,
        }
    }

    pub fn print_table(&self) {
        println!("-- SERVER DATA TABLE --");
        for (key, value) in self.local_table.lock().unwrap().iter() {
            println!("{}          {}", key, value);
        }
        println!("-----------------------");
    }
}