#![allow(clippy::result_large_err)]
use crate::policy::Policy;
use indexmap::IndexMap;
use lex_bytecode::vm::Vm;
use lex_bytecode::{Program, Value};
use std::net::TcpListener;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
struct Conn {
room: String,
outbound: mpsc::Sender<String>,
}
#[derive(Default)]
pub struct ChatRegistry {
conns: Mutex<IndexMap<u64, Conn>>,
}
impl ChatRegistry {
fn register(&self, room: String, outbound: mpsc::Sender<String>) -> u64 {
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
let id = NEXT_ID.fetch_add(1, Ordering::SeqCst);
self.conns.lock().unwrap().insert(id, Conn { room, outbound });
id
}
fn unregister(&self, id: u64) {
self.conns.lock().unwrap().shift_remove(&id);
}
fn broadcast(&self, room: &str, body: &str) {
let conns = self.conns.lock().unwrap();
for c in conns.values() {
if c.room == room {
let _ = c.outbound.send(body.to_string());
}
}
}
fn send_to(&self, id: u64, body: &str) -> bool {
if let Some(c) = self.conns.lock().unwrap().get(&id) {
let _ = c.outbound.send(body.to_string());
true
} else {
false
}
}
}
pub fn chat_broadcast(reg: &Arc<ChatRegistry>, room: &str, body: &str) {
reg.broadcast(room, body);
}
pub fn chat_send(reg: &Arc<ChatRegistry>, conn_id: u64, body: &str) -> bool {
reg.send_to(conn_id, body)
}
pub fn serve_ws(
port: u16,
handler_name: String,
program: Arc<Program>,
policy: Policy,
registry: Arc<ChatRegistry>,
) -> Result<Value, String> {
let listener = TcpListener::bind(("127.0.0.1", port))
.map_err(|e| format!("net.serve_ws bind {port}: {e}"))?;
eprintln!("net.serve_ws: listening on ws://127.0.0.1:{port}");
for stream in listener.incoming() {
let stream = match stream {
Ok(s) => s,
Err(e) => { eprintln!("net.serve_ws accept: {e}"); continue; }
};
let program = Arc::clone(&program);
let policy = policy.clone();
let handler_name = handler_name.clone();
let registry = Arc::clone(®istry);
thread::spawn(move || {
if let Err(e) = handle_connection(stream, program, policy, handler_name, registry) {
eprintln!("net.serve_ws connection error: {e}");
}
});
}
Ok(Value::Unit)
}
fn handle_connection(
stream: std::net::TcpStream,
program: Arc<Program>,
policy: Policy,
handler_name: String,
registry: Arc<ChatRegistry>,
) -> Result<(), String> {
use tungstenite::{accept_hdr, handshake::server::{Request, Response}};
let mut path = String::new();
let path_ref = &mut path;
let mut ws = accept_hdr(stream, |req: &Request, resp: Response| {
*path_ref = req.uri().path().to_string();
Ok(resp)
}).map_err(|e| format!("ws handshake: {e}"))?;
let room = path.trim_start_matches('/').to_string();
let (tx, rx) = mpsc::channel::<String>();
let conn_id = registry.register(room.clone(), tx);
let _ = ws.get_mut().set_read_timeout(Some(Duration::from_millis(50)));
let result = run_loop(&mut ws, &rx, conn_id, &room, &program, &policy, &handler_name, ®istry);
registry.unregister(conn_id);
let _ = ws.close(None);
result
}
#[allow(clippy::too_many_arguments)]
fn run_loop(
ws: &mut tungstenite::WebSocket<std::net::TcpStream>,
rx: &mpsc::Receiver<String>,
conn_id: u64,
room: &str,
program: &Arc<Program>,
policy: &Policy,
handler_name: &str,
registry: &Arc<ChatRegistry>,
) -> Result<(), String> {
use tungstenite::Message;
use std::io::ErrorKind;
loop {
match ws.read() {
Ok(Message::Text(body)) => {
let ev = build_ws_event(conn_id, room, &body);
let handler = crate::handler::DefaultHandler::new(policy.clone())
.with_program(Arc::clone(program))
.with_chat_registry(Arc::clone(registry));
let mut vm = Vm::with_handler(program, Box::new(handler));
if let Err(e) = vm.call(handler_name, vec![ev]) {
eprintln!("on_message {conn_id}: {e}");
}
}
Ok(Message::Binary(_)) => { }
Ok(Message::Close(_)) | Err(tungstenite::Error::ConnectionClosed) => break,
Ok(_) => {} Err(tungstenite::Error::Io(ref e)) if e.kind() == ErrorKind::WouldBlock
|| e.kind() == ErrorKind::TimedOut => {}
Err(e) => return Err(format!("ws read: {e}")),
}
loop {
match rx.try_recv() {
Ok(msg) => {
if let Err(e) = ws.send(Message::Text(msg.into())) {
return Err(format!("ws send: {e}"));
}
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => return Ok(()),
}
}
}
Ok(())
}
fn build_ws_event(conn_id: u64, room: &str, body: &str) -> Value {
let mut rec = IndexMap::new();
rec.insert("body".into(), Value::Str(body.into()));
rec.insert("conn_id".into(), Value::Int(conn_id as i64));
rec.insert("room".into(), Value::Str(room.into()));
Value::Record(rec)
}
fn build_ws_conn(conn_id: u64, path: &str, subprotocol: &str) -> Value {
let mut rec = IndexMap::new();
rec.insert("id".into(), Value::Str(conn_id.to_string().into()));
rec.insert("path".into(), Value::Str(path.into()));
rec.insert("subprotocol".into(), Value::Str(subprotocol.into()));
Value::Record(rec)
}
fn build_ws_message_text(body: &str) -> Value {
Value::Variant { name: "WsText".into(), args: vec![Value::Str(body.into())] }
}
fn build_ws_message_close() -> Value {
Value::Variant { name: "WsClose".into(), args: vec![] }
}
fn build_ws_message_ping() -> Value {
Value::Variant { name: "WsPing".into(), args: vec![] }
}
fn build_ws_message_binary(payload: &[u8]) -> Value {
let bytes = payload.iter().map(|b| Value::Int(*b as i64)).collect();
Value::Variant { name: "WsBinary".into(), args: vec![Value::List(bytes)] }
}
fn apply_ws_action<S: std::io::Read + std::io::Write>(
action: &Value,
ws: &mut tungstenite::WebSocket<S>,
) -> Result<(), String> {
use tungstenite::Message;
match action {
Value::Variant { name, args } if name == "WsSend" => {
let text = match args.first() {
Some(Value::Str(s)) => s.clone(),
_ => return Err("WsSend payload must be Str".into()),
};
ws.send(Message::Text(text.to_string().into()))
.map_err(|e| format!("ws send: {e}"))
}
Value::Variant { name, args } if name == "WsSendBinary" => {
let bytes: Vec<u8> = match args.first() {
Some(Value::List(elems)) => elems
.iter()
.map(|v| match v {
Value::Int(n) => Ok(*n as u8),
_ => Err("WsSendBinary payload must be List[Int]".into()),
})
.collect::<Result<Vec<_>, String>>()?,
_ => return Err("WsSendBinary payload must be List[Int]".into()),
};
ws.send(Message::Binary(bytes.into()))
.map_err(|e| format!("ws send binary: {e}"))
}
Value::Variant { name, .. } if name == "WsNoOp" => Ok(()),
other => Err(format!("unexpected WsAction: {other:?}")),
}
}
pub fn serve_ws_fn(
port: u16,
subprotocol: String,
closure: Value,
program: Arc<Program>,
policy: Policy,
registry: Arc<ChatRegistry>,
) -> Result<Value, String> {
if !subprotocol.is_empty() {
if let Err(e) =
tungstenite::http::HeaderValue::from_str(&subprotocol)
{
return Err(format!(
"net.serve_ws_fn: subprotocol {subprotocol:?} is not a valid \
HTTP header value: {e}"
));
}
}
let listener = TcpListener::bind(("127.0.0.1", port))
.map_err(|e| format!("net.serve_ws_fn bind {port}: {e}"))?;
eprintln!("net.serve_ws_fn: listening on ws://127.0.0.1:{port}");
for stream in listener.incoming() {
let stream = match stream {
Ok(s) => s,
Err(e) => { eprintln!("net.serve_ws_fn accept: {e}"); continue; }
};
let program = Arc::clone(&program);
let policy = policy.clone();
let closure = closure.clone();
let subprotocol = subprotocol.clone();
let registry = Arc::clone(®istry);
thread::spawn(move || {
if let Err(e) = handle_connection_fn(
stream, program, policy, closure, subprotocol, registry,
) {
eprintln!("net.serve_ws_fn connection error: {e}");
}
});
}
Ok(Value::Unit)
}
fn handle_connection_fn(
stream: std::net::TcpStream,
program: Arc<Program>,
policy: Policy,
closure: Value,
subprotocol: String,
registry: Arc<ChatRegistry>,
) -> Result<(), String> {
use tungstenite::{accept_hdr, handshake::server::{Request, Response}};
let mut path = String::new();
let path_ref = &mut path;
let subproto_for_handshake = subprotocol.clone();
let mut ws = accept_hdr(stream, |req: &Request, mut resp: Response| {
*path_ref = req.uri().path().to_string();
maybe_echo_subprotocol(req, &mut resp, &subproto_for_handshake);
Ok(resp)
}).map_err(|e| format!("ws handshake: {e}"))?;
let (tx, rx) = mpsc::channel::<String>();
let conn_id = registry.register(path.trim_start_matches('/').to_string(), tx);
let _ = ws.get_mut().set_read_timeout(Some(Duration::from_millis(50)));
let result = run_loop_fn(
&mut ws, &rx, conn_id, &path, &subprotocol,
&program, &policy, &closure, ®istry,
);
registry.unregister(conn_id);
let _ = ws.close(None);
result
}
fn maybe_echo_subprotocol(
req: &tungstenite::handshake::server::Request,
resp: &mut tungstenite::handshake::server::Response,
subprotocol: &str,
) {
use tungstenite::http::HeaderValue;
if subprotocol.is_empty() {
return;
}
let offered = match req.headers().get("Sec-WebSocket-Protocol") {
Some(v) => v,
None => return,
};
let offered_str = match offered.to_str() {
Ok(s) => s,
Err(_) => return,
};
let matches = offered_str
.split(',')
.map(|p| p.trim())
.any(|p| p == subprotocol);
if !matches {
return;
}
if let Ok(h) = HeaderValue::from_str(subprotocol) {
resp.headers_mut().insert("Sec-WebSocket-Protocol", h);
}
}
pub fn serve_ws_fn_auth(
port: u16,
subprotocol: String,
auth_closure: Value,
handler_closure: Value,
program: Arc<Program>,
policy: Policy,
registry: Arc<ChatRegistry>,
) -> Result<Value, String> {
if !subprotocol.is_empty() {
if let Err(e) =
tungstenite::http::HeaderValue::from_str(&subprotocol)
{
return Err(format!(
"net.serve_ws_fn_auth: subprotocol {subprotocol:?} is not a \
valid HTTP header value: {e}"
));
}
}
let listener = TcpListener::bind(("127.0.0.1", port))
.map_err(|e| format!("net.serve_ws_fn_auth bind {port}: {e}"))?;
eprintln!("net.serve_ws_fn_auth: listening on ws://127.0.0.1:{port}");
for stream in listener.incoming() {
let stream = match stream {
Ok(s) => s,
Err(e) => {
eprintln!("net.serve_ws_fn_auth accept: {e}");
continue;
}
};
let program = Arc::clone(&program);
let policy = policy.clone();
let auth_closure = auth_closure.clone();
let handler_closure = handler_closure.clone();
let subprotocol = subprotocol.clone();
let registry = Arc::clone(®istry);
thread::spawn(move || {
if let Err(e) = handle_connection_fn_auth(
stream, program, policy, auth_closure, handler_closure,
subprotocol, registry,
) {
eprintln!("net.serve_ws_fn_auth connection error: {e}");
}
});
}
Ok(Value::Unit)
}
#[allow(clippy::too_many_arguments)]
fn handle_connection_fn_auth(
stream: std::net::TcpStream,
program: Arc<Program>,
policy: Policy,
auth_closure: Value,
handler_closure: Value,
subprotocol: String,
registry: Arc<ChatRegistry>,
) -> Result<(), String> {
use tungstenite::{accept_hdr, handshake::server::{Request, Response}};
use tungstenite::http::StatusCode;
let mut path = String::new();
let path_ref = &mut path;
let subproto_for_handshake = subprotocol.clone();
let auth_program = Arc::clone(&program);
let auth_policy = policy.clone();
let auth_registry = Arc::clone(®istry);
let auth_closure_for_cb = auth_closure;
let mut ws = accept_hdr(stream, move |req: &Request, mut resp: Response| {
*path_ref = req.uri().path().to_string();
let headers_value = build_headers_value(req);
let path_arg = Value::Str(path_ref.clone().into());
let dh = crate::handler::DefaultHandler::new(auth_policy.clone())
.with_program(Arc::clone(&auth_program))
.with_chat_registry(Arc::clone(&auth_registry));
let mut vm = Vm::with_handler(&auth_program, Box::new(dh));
let auth_result = vm.invoke_closure_value(
auth_closure_for_cb,
vec![path_arg, headers_value],
);
match auth_result {
Ok(Value::Variant { name, .. }) if name == "Ok" => {
maybe_echo_subprotocol(req, &mut resp, &subproto_for_handshake);
Ok(resp)
}
Ok(Value::Variant { name, args }) if name == "Err" => {
let msg = match args.first() {
Some(Value::Str(s)) => s.to_string(),
_ => "unauthorized".to_string(),
};
let err = build_unauthorized_response(StatusCode::UNAUTHORIZED, msg);
Err(err)
}
Ok(other) => {
let err = build_unauthorized_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!(
"net.serve_ws_fn_auth: auth callback returned \
non-Result value: {other:?}"
),
);
Err(err)
}
Err(e) => {
let err = build_unauthorized_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("net.serve_ws_fn_auth: auth callback error: {e:?}"),
);
Err(err)
}
}
}).map_err(|e| format!("ws handshake: {e}"))?;
let (tx, rx) = mpsc::channel::<String>();
let conn_id = registry.register(path.trim_start_matches('/').to_string(), tx);
let _ = ws.get_mut().set_read_timeout(Some(Duration::from_millis(50)));
let result = run_loop_fn(
&mut ws, &rx, conn_id, &path, &subprotocol,
&program, &policy, &handler_closure, ®istry,
);
registry.unregister(conn_id);
let _ = ws.close(None);
result
}
fn build_headers_value(req: &tungstenite::handshake::server::Request) -> Value {
let mut items: std::collections::VecDeque<Value> = std::collections::VecDeque::new();
for (name, val) in req.headers().iter() {
let v = match val.to_str() {
Ok(s) => s.to_string(),
Err(_) => continue,
};
let mut rec = IndexMap::new();
rec.insert("name".into(), Value::Str(name.as_str().into()));
rec.insert("value".into(), Value::Str(v.into()));
items.push_back(Value::Record(rec));
}
Value::List(items)
}
fn build_unauthorized_response(
status: tungstenite::http::StatusCode,
msg: String,
) -> tungstenite::handshake::server::ErrorResponse {
tungstenite::http::Response::builder()
.status(status)
.header("Content-Type", "text/plain; charset=utf-8")
.body(Some(msg))
.expect("ErrorResponse builder")
}
#[allow(clippy::too_many_arguments)]
fn run_loop_fn(
ws: &mut tungstenite::WebSocket<std::net::TcpStream>,
rx: &mpsc::Receiver<String>,
conn_id: u64,
path: &str,
subprotocol: &str,
program: &Arc<Program>,
policy: &Policy,
closure: &Value,
registry: &Arc<ChatRegistry>,
) -> Result<(), String> {
use tungstenite::Message;
use std::io::ErrorKind;
let ws_conn = build_ws_conn(conn_id, path, subprotocol);
loop {
let ws_msg = match ws.read() {
Ok(Message::Text(body)) => Some(build_ws_message_text(&body)),
Ok(Message::Binary(_)) => None,
Ok(Message::Ping(_)) => Some(build_ws_message_ping()),
Ok(Message::Close(_)) | Err(tungstenite::Error::ConnectionClosed) => {
let handler = crate::handler::DefaultHandler::new(policy.clone())
.with_program(Arc::clone(program))
.with_chat_registry(Arc::clone(registry));
let mut vm = Vm::with_handler(program, Box::new(handler));
let _ = vm.invoke_closure_value(
closure.clone(),
vec![ws_conn.clone(), build_ws_message_close()],
);
break;
}
Ok(_) => None, Err(tungstenite::Error::Io(ref e))
if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut => None,
Err(e) => return Err(format!("ws read: {e}")),
};
if let Some(msg) = ws_msg {
let handler = crate::handler::DefaultHandler::new(policy.clone())
.with_program(Arc::clone(program))
.with_chat_registry(Arc::clone(registry));
let mut vm = Vm::with_handler(program, Box::new(handler));
match vm.invoke_closure_value(closure.clone(), vec![ws_conn.clone(), msg]) {
Ok(action) => {
if let Err(e) = apply_ws_action(&action, ws) {
eprintln!("ws action {conn_id}: {e}");
}
}
Err(e) => eprintln!("ws handler {conn_id}: {e}"),
}
}
loop {
match rx.try_recv() {
Ok(msg) => {
if let Err(e) = ws.send(Message::Text(msg.into())) {
return Err(format!("ws send: {e}"));
}
}
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => return Ok(()),
}
}
}
Ok(())
}
pub fn serve_ws_fn_actor(
port: u16,
subprotocol: String,
name_of_closure: Value,
on_message_closure: Value,
program: Arc<Program>,
policy: Policy,
registry: Arc<ChatRegistry>,
) -> Result<Value, String> {
if !subprotocol.is_empty() {
if let Err(e) =
tungstenite::http::HeaderValue::from_str(&subprotocol)
{
return Err(format!(
"net.serve_ws_fn_actor: subprotocol {subprotocol:?} is not a valid \
HTTP header value: {e}"
));
}
}
let listener = TcpListener::bind(("127.0.0.1", port))
.map_err(|e| format!("net.serve_ws_fn_actor bind {port}: {e}"))?;
eprintln!("net.serve_ws_fn_actor: listening on ws://127.0.0.1:{port}");
for stream in listener.incoming() {
let stream = match stream {
Ok(s) => s,
Err(e) => { eprintln!("net.serve_ws_fn_actor accept: {e}"); continue; }
};
let program = Arc::clone(&program);
let policy = policy.clone();
let name_of_closure = name_of_closure.clone();
let on_message_closure = on_message_closure.clone();
let subprotocol = subprotocol.clone();
let registry = Arc::clone(®istry);
thread::spawn(move || {
if let Err(e) = handle_connection_fn_actor(
stream, program, policy, name_of_closure, on_message_closure,
subprotocol, registry,
) {
eprintln!("net.serve_ws_fn_actor connection error: {e}");
}
});
}
Ok(Value::Unit)
}
#[allow(clippy::too_many_arguments)]
fn handle_connection_fn_actor(
stream: std::net::TcpStream,
program: Arc<Program>,
policy: Policy,
name_of_closure: Value,
on_message_closure: Value,
subprotocol: String,
registry: Arc<ChatRegistry>,
) -> Result<(), String> {
use tungstenite::{accept_hdr, handshake::server::{Request, Response}};
let mut path = String::new();
let path_ref = &mut path;
let subproto_for_handshake = subprotocol.clone();
let mut ws = accept_hdr(stream, |req: &Request, mut resp: Response| {
*path_ref = req.uri().path().to_string();
maybe_echo_subprotocol(req, &mut resp, &subproto_for_handshake);
Ok(resp)
}).map_err(|e| format!("ws handshake: {e}"))?;
let (tx, rx) = mpsc::channel::<String>();
let conn_id = registry.register(path.trim_start_matches('/').to_string(), tx.clone());
let _ = ws.get_mut().set_read_timeout(Some(Duration::from_millis(50)));
let ws_conn = build_ws_conn(conn_id, &path, &subprotocol);
let registered_name: Option<String> = {
let handler = crate::handler::DefaultHandler::new(policy.clone())
.with_program(Arc::clone(&program))
.with_chat_registry(Arc::clone(®istry));
let mut vm = Vm::with_handler(&program, Box::new(handler));
match vm.invoke_closure_value(name_of_closure.clone(), vec![ws_conn.clone()]) {
Ok(Value::Str(s)) if !s.is_empty() => Some(s.to_string()),
Ok(Value::Str(_)) => None,
Ok(other) => {
registry.unregister(conn_id);
let _ = ws.close(None);
return Err(format!(
"net.serve_ws_fn_actor: name_of must return Str, got {other:?}"
));
}
Err(e) => {
registry.unregister(conn_id);
let _ = ws.close(None);
return Err(format!(
"net.serve_ws_fn_actor: name_of error: {e:?}"
));
}
}
};
if let Some(ref name) = registered_name {
let tx_for_bridge = tx.clone();
let bridge = lex_bytecode::value::NativeActorHandler {
send: Box::new(move |msg: Value| -> Result<Value, String> {
match msg {
Value::Str(s) => {
tx_for_bridge.send(s.to_string()).map_err(|e| {
format!("net.serve_ws_fn_actor: outbound channel closed: {e}")
})?;
Ok(Value::Unit)
}
other => Err(format!(
"net.serve_ws_fn_actor: native bridge accepts Str messages only, got {other:?}"
)),
}
}),
};
let cell = Value::Actor(Arc::new(Mutex::new(lex_bytecode::value::ActorCell {
state: Value::Unit,
handler: lex_bytecode::value::ActorHandler::Native(Arc::new(bridge)),
})));
if let Err(e) = lex_bytecode::conc_registry::register(name, cell) {
registry.unregister(conn_id);
let _ = ws.close(None);
return Err(format!(
"net.serve_ws_fn_actor: conc.register({name:?}) failed: {e:?}"
));
}
}
let result = run_loop_fn(
&mut ws, &rx, conn_id, &path, &subprotocol,
&program, &policy, &on_message_closure, ®istry,
);
if let Some(ref name) = registered_name {
let _ = lex_bytecode::conc_registry::unregister(name);
}
registry.unregister(conn_id);
let _ = ws.close(None);
result
}
fn build_dial_result(ok: Result<(), String>) -> Value {
match ok {
Ok(()) => Value::Variant {
name: "Ok".into(),
args: vec![Value::Unit],
},
Err(msg) => Value::Variant {
name: "Err".into(),
args: vec![Value::Str(msg.into())],
},
}
}
pub fn dial_ws(
url: String,
subprotocol: String,
on_open: Value,
on_message: Value,
program: Arc<Program>,
policy: Policy,
) -> Result<Value, String> {
use tungstenite::client::IntoClientRequest;
use tungstenite::http::HeaderValue;
let mut req = match url.as_str().into_client_request() {
Ok(r) => r,
Err(e) => {
return Ok(build_dial_result(Err(format!(
"net.dial_ws: bad URL `{url}`: {e}"
))));
}
};
if !subprotocol.is_empty() {
let header = match HeaderValue::from_str(&subprotocol) {
Ok(h) => h,
Err(e) => {
return Ok(build_dial_result(Err(format!(
"net.dial_ws: invalid subprotocol `{subprotocol}`: {e}"
))));
}
};
req.headers_mut().insert("Sec-WebSocket-Protocol", header);
}
let (mut ws, _resp) = match tungstenite::connect(req) {
Ok(pair) => pair,
Err(e) => {
return Ok(build_dial_result(Err(format!(
"net.dial_ws: connect to `{url}`: {e}"
))));
}
};
if let Some(stream) = stream_for(&mut ws) {
let _ = stream.set_read_timeout(Some(Duration::from_millis(50)));
}
{
let handler = crate::handler::DefaultHandler::new(policy.clone())
.with_program(Arc::clone(&program));
let mut vm = Vm::with_handler(&program, Box::new(handler));
match vm.invoke_closure_value(on_open.clone(), vec![]) {
Ok(action) => {
if let Err(e) = apply_ws_action(&action, &mut ws) {
return Ok(build_dial_result(Err(format!(
"net.dial_ws: on_open action: {e}"
))));
}
}
Err(e) => {
return Ok(build_dial_result(Err(format!(
"net.dial_ws: on_open: {e}"
))));
}
}
}
let loop_result = dial_run_loop(&mut ws, &on_message, &program, &policy);
let _ = ws.close(None);
Ok(build_dial_result(loop_result))
}
fn stream_for(
ws: &mut tungstenite::WebSocket<tungstenite::stream::MaybeTlsStream<std::net::TcpStream>>,
) -> Option<&mut std::net::TcpStream> {
use tungstenite::stream::MaybeTlsStream;
match ws.get_mut() {
MaybeTlsStream::Plain(s) => Some(s),
MaybeTlsStream::Rustls(s) => Some(s.get_mut()),
_ => None,
}
}
fn dial_run_loop(
ws: &mut tungstenite::WebSocket<tungstenite::stream::MaybeTlsStream<std::net::TcpStream>>,
on_message: &Value,
program: &Arc<Program>,
policy: &Policy,
) -> Result<(), String> {
use std::io::ErrorKind;
use tungstenite::Message;
loop {
let ws_msg = match ws.read() {
Ok(Message::Text(body)) => Some(build_ws_message_text(&body)),
Ok(Message::Binary(payload)) => Some(build_ws_message_binary(&payload)),
Ok(Message::Ping(_)) => Some(build_ws_message_ping()),
Ok(Message::Close(_)) | Err(tungstenite::Error::ConnectionClosed) => {
let handler = crate::handler::DefaultHandler::new(policy.clone())
.with_program(Arc::clone(program));
let mut vm = Vm::with_handler(program, Box::new(handler));
let _ = vm.invoke_closure_value(
on_message.clone(),
vec![build_ws_message_close()],
);
return Ok(());
}
Ok(_) => None, Err(tungstenite::Error::Io(ref e))
if e.kind() == ErrorKind::WouldBlock || e.kind() == ErrorKind::TimedOut =>
{
None
}
Err(e) => return Err(format!("net.dial_ws: read: {e}")),
};
if let Some(msg) = ws_msg {
let handler = crate::handler::DefaultHandler::new(policy.clone())
.with_program(Arc::clone(program));
let mut vm = Vm::with_handler(program, Box::new(handler));
match vm.invoke_closure_value(on_message.clone(), vec![msg]) {
Ok(action) => {
if let Err(e) = apply_ws_action(&action, ws) {
return Err(format!("net.dial_ws: action: {e}"));
}
}
Err(e) => return Err(format!("net.dial_ws: on_message: {e}")),
}
}
}
}