gun 0.1.2

The Gun database, in Rust
Documentation
use std::{
	net::{TcpListener, TcpStream},
	process,
	str::from_utf8,
	sync::{Arc, Mutex},
	thread,
};

use anyhow::Result;
use serde_json::{json, Value as JSON};
use tungstenite::{accept, handshake::HandshakeRole, Error, HandshakeError, Message, WebSocket};

use crate::{
	dedup::{Dedup, random_soul},
	GunBuilder,
	GunFunctions,
	ham::mix_ham,
	util::{lex_from_graph, parse_json, SOUL},
};

type PeerList = Vec<Arc<Mutex<WebSocket<TcpStream>>>>;

struct Store {
	peers: PeerList,
	dedup: Dedup,
    graph: JSON,
}

impl Store {
	pub fn new() -> Self {
		Self {
			peers: Vec::new(),
			dedup: Dedup::new(),
            graph: json!({}),
		}
	}
}

fn emit(peers: &PeerList, msg: String) {
    for socket in peers {
        match socket.try_lock() {
        	Ok(mut socket) => socket
        		.write_message(Message::Text(msg.clone()))
        		.unwrap_or_else(|err| log::error!("{}", err)),
        	Err(_) => (),
        }
    }
}

fn handle_message(store: &Arc<Mutex<Store>>, msg_str: &str) {
    match parse_json(msg_str) {
        Some(msg) => {
            let soul = msg[SOUL]
                .as_str()
                .expect("Soul must be a string")
                .to_owned();

            if store.lock().unwrap().dedup.check(soul.clone()).is_none() {
                store.lock().unwrap().dedup.track(soul);

                if !msg["put"].is_null() {
                    mix_ham(msg["put"].clone(), &mut store.lock().unwrap().graph);
                    log::info!("PUT {}", store.lock().unwrap().graph);
                }

                if !msg["get"].is_null() {
                    let ack = lex_from_graph(msg["get"].clone(), &store.lock().unwrap().graph);

                    match ack {
                        Ok(ack) => {
                            let data = json!({
                                SOUL: store.lock().unwrap().dedup.track(random_soul()),
                                "@": msg[SOUL],
                                "put": ack,
                            }).to_string();

                            emit(
                                &store.lock().unwrap().peers,
                                data.into(),
                            );

                            log::info!("GET {}", ack);
                        },
                        Err(err) => log::error!("{}", err),
                    }
                }
                
                emit(&store.lock().unwrap().peers, msg_str.into());
            }
        },
        None => (),
    }
}

fn must_not_block<Role: HandshakeRole>(err: HandshakeError<Role>) -> Error {
    match err {
        HandshakeError::Interrupted(_) => {
        	log::error!("Blocking socket would block");
        	process::exit(0)
        },
        HandshakeError::Failure(f) => f,
    }
}

fn handle_client(store: Arc<Mutex<Store>>, stream: TcpStream) -> tungstenite::Result<()> {
    let socket = accept(stream).map_err(must_not_block)?;
    let socket = Arc::new(Mutex::new(socket));
	store.lock().unwrap().peers.push(socket.clone());

    loop {
    	let mut socket = socket.lock().unwrap();
        match socket.read_message()? {
            Message::Text(msg) => {
            	drop(socket);
            	handle_message(&store, &msg)
            },
            Message::Binary(msg) => {
            	drop(socket);
            	handle_message(&store, from_utf8(msg.as_slice()).unwrap())
            },
            Message::Ping(_) | Message::Pong(_) | Message::Close(_) => (),
        }
    }
}

pub fn start(peers: &[&str]) -> Result<()> {
	let server = TcpListener::bind("127.0.0.1:8080").unwrap();
	let store = Arc::new(Mutex::new(Store::new()));

	log::info!("Running on 127.0.0.1:8080");

    for stream in server.incoming() {
    	let store = store.clone();
        thread::spawn(move || match stream {
            Ok(stream) => {
                if let Err(err) = handle_client(store, stream) {
                    match err {
                        Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
                        e => log::error!("{}", e),
                    }
                }
            },
            Err(e) => log::error!("Error accepting stream: {}", e),
        });
    }

	Ok(())
}

pub fn plug_into(gun: &mut GunBuilder) {
	gun.functions = GunFunctions {
		start,
	}
}