#![allow(clippy::result_large_err)]
use crate::policy::Policy;
use indexmap::IndexMap;
use lex_bytecode::vm::Vm;
use lex_bytecode::{Program, Value};
use std::net::TcpListener;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
/// One registered connection as tracked by `ChatRegistry`.
struct Conn {
/// Room name the connection was registered under; `broadcast` compares
/// against this to pick its recipients.
room: String,
/// Sending half of the connection's outbound message channel. Messages
/// pushed here are picked up by whoever holds the matching receiver.
outbound: mpsc::Sender<String>,
}
/// Table of registered connections, keyed by connection id.
///
/// The table sits behind a `Mutex`, so all methods take `&self`.
/// The derived `Default` yields an empty registry.
#[derive(Default)]
pub struct ChatRegistry {
/// Connection id -> connection state (room name and outbound sender).
conns: Mutex<IndexMap<u64, Conn>>,
}
impl ChatRegistry {
    /// Adds a connection for `room` and returns the id assigned to it.
    ///
    /// Ids are drawn from a single process-wide counter that starts at 1, so
    /// they are unique across every registry in the process.
    ///
    /// # Panics
    /// Panics if the registry mutex is poisoned.
    fn register(&self, room: String, outbound: mpsc::Sender<String>) -> u64 {
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
        let id = NEXT_ID.fetch_add(1, Ordering::SeqCst);
        let entry = Conn { room, outbound };
        let mut table = self.conns.lock().unwrap();
        table.insert(id, entry);
        id
    }

    /// Removes the connection with the given `id`; unknown ids are ignored.
    ///
    /// # Panics
    /// Panics if the registry mutex is poisoned.
    fn unregister(&self, id: u64) {
        let mut table = self.conns.lock().unwrap();
        table.shift_remove(&id);
    }

    /// Queues a copy of `body` on every connection registered under `room`.
    ///
    /// The registry lock is held for the whole pass. A failed send (the
    /// receiving half has been dropped) is ignored.
    ///
    /// # Panics
    /// Panics if the registry mutex is poisoned.
    fn broadcast(&self, room: &str, body: &str) {
        let table = self.conns.lock().unwrap();
        table
            .values()
            .filter(|conn| conn.room == room)
            .for_each(|conn| {
                // Best effort: a dropped receiver is not an error here.
                let _ = conn.outbound.send(body.to_string());
            });
    }

    /// Queues `body` on the connection with the given `id`.
    ///
    /// Returns `true` when `id` is registered and `false` otherwise. The
    /// result of the channel send itself is not reflected in the return value.
    ///
    /// # Panics
    /// Panics if the registry mutex is poisoned.
    fn send_to(&self, id: u64, body: &str) -> bool {
        let table = self.conns.lock().unwrap();
        match table.get(&id) {
            Some(conn) => {
                let _ = conn.outbound.send(body.to_string());
                true
            }
            None => false,
        }
    }
}
pub fn chat_broadcast(reg: &Arc<ChatRegistry>, room: &str, body: &str) {
reg.broadcast(room, body);
}
pub fn chat_send(reg: &Arc<ChatRegistry>, conn_id: u64, body: &str) -> bool {
reg.send_to(conn_id, body)
}
/// Runs a blocking WebSocket server on `127.0.0.1:port`.
///
/// Every accepted TCP connection is handed to `handle_connection` on its own
/// thread, together with clones of `program`, `policy`, `handler_name` and
/// the shared `registry`.
///
/// The call blocks in the accept loop. `TcpListener::incoming` does not end
/// on its own, so the trailing `Ok(Value::Unit)` is not expected to be
/// reached in practice.
///
/// # Errors
/// Returns an error string if the listener cannot be bound. Failures to
/// accept a connection and errors returned by a connection thread are logged
/// to stderr and do not stop the server.
pub fn serve_ws(
    port: u16,
    handler_name: String,
    program: Arc<Program>,
    policy: Policy,
    registry: Arc<ChatRegistry>,
) -> Result<Value, String> {
    let listener = TcpListener::bind(("127.0.0.1", port))
        .map_err(|e| format!("net.serve_ws bind {port}: {e}"))?;
    eprintln!("net.serve_ws: listening on ws://127.0.0.1:{port}");
    for stream in listener.incoming() {
        let stream = match stream {
            Ok(s) => s,
            Err(e) => {
                eprintln!("net.serve_ws accept: {e}");
                continue;
            }
        };
        // Each connection thread needs its own owned copies / handles.
        let program = Arc::clone(&program);
        let policy = policy.clone();
        let handler_name = handler_name.clone();
        let registry = Arc::clone(&registry);
        thread::spawn(move || {
            if let Err(e) = handle_connection(stream, program, policy, handler_name, registry) {
                eprintln!("net.serve_ws connection error: {e}");
            }
        });
    }
    Ok(Value::Unit)
}
/// Serves a single accepted TCP connection as a WebSocket chat client.
///
/// Steps:
/// 1. Perform the WebSocket handshake, capturing the request path.
/// 2. Use the path with its leading `/` characters stripped as the room name
///    and register the connection in `registry` with a fresh outbound channel.
/// 3. Run `run_loop` until the connection ends.
/// 4. Unregister the connection and attempt to close the socket, whatever the
///    loop returned.
///
/// # Errors
/// Returns an error string if the handshake fails, or whatever error
/// `run_loop` returned.
fn handle_connection(
    stream: std::net::TcpStream,
    program: Arc<Program>,
    policy: Policy,
    handler_name: String,
    registry: Arc<ChatRegistry>,
) -> Result<(), String> {
    use tungstenite::{
        accept_hdr,
        handshake::server::{Request, Response},
    };

    // The handshake callback is the only place the request URI is visible,
    // so the path is copied out through a mutable borrow.
    let mut path = String::new();
    let path_ref = &mut path;
    let mut ws = accept_hdr(stream, |req: &Request, resp: Response| {
        *path_ref = req.uri().path().to_string();
        Ok(resp)
    })
    .map_err(|e| format!("ws handshake: {e}"))?;

    let room = path.trim_start_matches('/').to_string();
    let (tx, rx) = mpsc::channel::<String>();
    let conn_id = registry.register(room.clone(), tx);

    // A short read timeout lets `run_loop` come back from `read` regularly
    // and flush queued outbound messages. Failure to set it is ignored.
    let _ = ws.get_mut().set_read_timeout(Some(Duration::from_millis(50)));

    let result = run_loop(
        &mut ws,
        &rx,
        conn_id,
        &room,
        &program,
        &policy,
        &handler_name,
        &registry,
    );

    registry.unregister(conn_id);
    // Best-effort close; the peer may already be gone.
    let _ = ws.close(None);
    result
}
#[allow(clippy::too_many_arguments)]
fn run_loop(
ws: &mut tungstenite::WebSocket<std::net::TcpStream>,
rx: &mpsc::Receiver<String>,
conn_id: u64,
room: &str,
program: &Arc<Program>,
policy: &Policy,
handler_name: &str,
registry: &Arc<ChatRegistry>,
) -> Result<(), String> {
use tungstenite::Message;
use std::io::ErrorKind;
loop {
match ws.read() {
Ok(Message::Text(body)) => {
let ev = build_ws_event(conn_id, room, &body);
let handler = crate::handler::DefaultHandler::new(policy.clone())
.with_program(Arc::clone(program))
.with_chat_registry(Arc::clone(registry));
let mut vm = Vm::with_handler(program, Box::new(handler));
if let Err(e) = vm.call(handler_name, vec![ev]) {
eprintln!("on_message {conn_id}: {e}");
}
}
Ok(Message::Binary(_)) => { }
Ok(Message::Close(_)) | Err(tungstenite::Error::ConnectionClosed) => break,
Ok(_) => {} Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock
|| e.kind() == ErrorKind::TimedOut => {}
Err(e) => return Err(format!("ws read: {e}")),
}
loop {
match rx.try_recv() {
Ok(msg) => {
if let Err(e) = ws.send(Message::Text(msg.into())) {
return Err(format!("ws send: {e}"));
}
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => return Ok(()),
}
}
}
Ok(())
}
/// Builds the record value passed to the message handler.
///
/// The record has three fields, inserted in this order: `body` (string),
/// `conn_id` (integer, the id cast to `i64`) and `room` (string).
fn build_ws_event(conn_id: u64, room: &str, body: &str) -> Value {
    let fields = [
        ("body", Value::Str(body.to_string())),
        ("conn_id", Value::Int(conn_id as i64)),
        ("room", Value::Str(room.to_string())),
    ];
    let mut record = IndexMap::new();
    for (key, value) in fields {
        record.insert(key.into(), value);
    }
    Value::Record(record)
}