pub mod types;
use std::collections::HashMap;
use std::fmt::Display;
use std::io::{Error, Read, Write};
use std::net::{IpAddr, SocketAddr, TcpStream};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use serde_json::{json, to_string_pretty, Value};
use crate::types::{ConnectionStatus, Data, JSON};
pub struct Socketboard {
address: SocketAddr,
stream: Option<TcpStream>,
local_table: Arc<Mutex<HashMap<String, Data>>>,
message_buffer: Arc<Mutex<Vec<Value>>>,
pub name: String,
status: Arc<Mutex<ConnectionStatus>>,
thread: std::thread::JoinHandle<()>,
}
impl Socketboard {
pub fn new(name: &str) -> Self {
Self {
address: SocketAddr::from(([127, 0, 0, 1], 8080)),
stream: None,
local_table: Arc::new(Mutex::new(HashMap::new())),
name: name.to_string(),
message_buffer: Arc::new(Mutex::new(Vec::new())),
status: Arc::new(Mutex::new(ConnectionStatus::None)),
thread: std::thread::spawn(|| {}),
}
}
pub fn with_host(name: &str, host: &str, port: u16) -> Self {
let address = match IpAddr::from_str(host) {
Ok(ip) => ip,
Err(_) => {
panic!("Invalid IP address");
}
};
Self {
address: SocketAddr::new(address, port),
stream: None,
local_table: Arc::new(Mutex::new(HashMap::new())),
name: name.to_string(),
message_buffer: Arc::new(Mutex::new(Vec::new())),
status: Arc::new(Mutex::new(ConnectionStatus::None)),
thread: std::thread::spawn(|| {}),
}
}
pub fn start(&mut self) -> Result<(), Error> {
let mut address = self.address.clone();
let mut status = self.status.clone();
let mut message_buffer = self.message_buffer.clone();
let mut local_table = self.local_table.clone();
let mut name = self.name.clone();
std::thread::spawn(move || loop {
const TIMEOUT_SEC: u64 = 1;
let mut handshake = false;
let mut stream = match TcpStream::connect_timeout(&address, std::time::Duration::from_secs(TIMEOUT_SEC)) {
Ok(stream) => {
stream
}
Err(_) => {
if let ConnectionStatus::Stopped = *status.lock().unwrap() {
break;
}
*status.lock().unwrap() = ConnectionStatus::Failed(address.to_string());
continue;
}
};
*status.lock().unwrap() = ConnectionStatus::Connected(address.to_string());
println!("Socketboard connected to {}", address.to_string());
match stream.set_nonblocking(true) {
Ok(_) => {}
Err(e) => {
println!("Failed to set non-blocking: {}", e);
break;
}
}
message_buffer.lock().unwrap().insert(0, serde_json::json!({
"type": "handshake",
"name": name,
}));
loop {
let mut buffer = [0; 2048];
match stream.read(&mut buffer) {
Ok(bytes_read) => {
let json_string = String::from_utf8_lossy(&buffer[..bytes_read]);
let json: Value = match serde_json::from_str(&json_string) {
Ok(json) => json,
Err(e) => {
println!("Failed to parse JSON: {}", e);
println!("Received: {}", json_string);
continue;
}
};
match Socketboard::handle(&json, &mut handshake, &mut local_table, &mut status) {
Ok(_) => {}
Err(ref e) if e.kind() == std::io::ErrorKind::ConnectionRefused => {
*status.lock().unwrap() = ConnectionStatus::Stopped;
break;
}
Err(e) => {
println!("Failed to handle message: {}", e);
break;
}
}
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
}
Err(_) => {
break;
}
}
let mut message_buffer = message_buffer.lock().unwrap();
while !message_buffer.is_empty() {
let message = message_buffer.remove(0);
let json_string = &message.to_string();
if json_string == "" {
continue;
}
match stream.write_all(json_string.as_bytes()) {
Ok(_) => {
}
Err(e) => {
break;
}
}
}
if let ConnectionStatus::Stopped = *status.lock().unwrap() {
break;
}
}
if let ConnectionStatus::Stopped = *status.lock().unwrap() {
break;
}
std::thread::sleep(std::time::Duration::from_secs(1));
});
for _ in 0..100 {
std::thread::sleep(std::time::Duration::from_millis(10));
let status = self.status.lock().unwrap();
if let ConnectionStatus::Connecting = *status {
continue;
} else {
break;
}
}
if let ConnectionStatus::Connected(_) = self.get_status() {
return Ok(())
}
Err(Error::new(std::io::ErrorKind::Other, "Failed to connect"))
}
pub fn stop(&mut self) {
*self.status.lock().unwrap() = ConnectionStatus::Stopped;
}
pub fn get_status(&self) -> ConnectionStatus {
let status = self.status.lock().unwrap();
status.clone()
}
pub fn send(&mut self, message: Value) {
match self.get_status() {
ConnectionStatus::Connected(_) => {}
_ => {
println!("Not connected");
return;
}
}
let mut update_buffer = self.message_buffer.lock().unwrap();
update_buffer.push(message);
}
fn handle(
json: &Value,
handshake: &mut bool,
local_table: &mut Arc<Mutex<HashMap<String, Data>>>,
status: &mut Arc<Mutex<ConnectionStatus>>,
) -> Result<(), Error> {
match json["type"].as_str() {
Some("handshake") => {
if *handshake {
return Err(Error::new(std::io::ErrorKind::Other, "Handshake already completed"))
}
let response_status = json["status"].as_str().unwrap();
if response_status != "ok" {
let message = json["message"].as_str().unwrap_or("No message");
println!("Handshake failed: {}", message);
return Err(Error::new(std::io::ErrorKind::ConnectionRefused, message))
}
let terminate = json["terminate"].as_bool().unwrap_or(false);
if terminate {
return Err(Error::new(std::io::ErrorKind::ConnectionRefused, "Connection terminated"));
}
let table_json = &json["table"];
let mut table = local_table.lock().unwrap();
for (key, value) in table_json.as_object().unwrap() {
table.insert(key.to_string(), Data::from_json(value));
}
*status.lock().unwrap() = ConnectionStatus::Connected("socketboard".to_string());
*handshake = true;
Ok(())
}
Some("update") => {
let terminate = json["terminate"].as_bool().unwrap_or(false);
if terminate {
return Err(Error::new(std::io::ErrorKind::ConnectionRefused, "Connection terminated"));
}
let table_json = &json["table"];
if table_json.is_null() {
return Err(Error::new(std::io::ErrorKind::Other, "No table provided"));
}
let mut table = local_table.lock().unwrap();
for (key, value) in table_json.as_object().unwrap() {
table.insert(key.to_string(), Data::from_json(value));
}
Ok(())
}
_ => {
println!("Unknown message: {}", json);
Err(Error::new(std::io::ErrorKind::Other, "Unknown message"))
}
}
}
pub fn put_value(&mut self, key: &str, value: Data) -> Result<(), Error> {
self.local_table.lock().unwrap().insert(key.to_string(), value.clone());
match self.get_status() {
ConnectionStatus::Connected(_) => {}
_ => {
return Err(Error::new(std::io::ErrorKind::Other, "Not connected"));
}
}
let updated_table = json!({
key: value.to_json(),
});
self.send(json!({
"type": "update",
"table": updated_table,
}));
Ok(())
}
pub fn get_value(&self, key: &str, default: Data) -> Data {
let table = self.local_table.lock().unwrap();
match table.get(key).cloned() {
Some(value) => value,
_ => default,
}
}
pub fn print_table(&self) {
println!("-- SERVER DATA TABLE --");
for (key, value) in self.local_table.lock().unwrap().iter() {
println!("{} {}", key, value);
}
println!("-----------------------");
}
}