use base64::{prelude::BASE64_STANDARD, Engine};
use sha1::Digest;
use std::{
fmt::{self, Write as _},
io::{self, BufReader, BufWriter, prelude::*},
net::*,
sync::{
Condvar, Mutex, atomic::AtomicBool, mpsc::{Receiver, RecvTimeoutError, Sender, channel}
},
time::Duration,
};
use v_log::{Color, Record, SetVLoggerError, VLog, Visual};
static WAIT: (Mutex<bool>, Condvar) = (Mutex::new(false), Condvar::new());
static INIT: AtomicBool = AtomicBool::new(false);
pub struct Builder {
port: u16,
targets: Vec<String>,
}
pub struct WebVLogger {
sender: Sender<String>,
targets: Vec<String>,
}
#[allow(missing_copy_implementations)]
#[derive(Debug)]
pub enum InitError {
SetVLoggerError(SetVLoggerError),
TcpError(io::Error),
}
impl fmt::Display for InitError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::SetVLoggerError(e) => e.fmt(f),
Self::TcpError(e) => e.fmt(f),
}
}
}
impl std::error::Error for InitError {}
impl From<SetVLoggerError> for InitError {
fn from(value: SetVLoggerError) -> Self {
Self::SetVLoggerError(value)
}
}
impl From<io::Error> for InitError {
fn from(value: io::Error) -> Self {
Self::TcpError(value)
}
}
impl Builder {
pub fn new() -> Self {
Self {
port: 0,
targets: vec![],
}
}
pub fn port(&mut self, port: u16) -> &mut Self {
self.port = port;
self
}
pub fn add_target(&mut self, target: &str) -> &mut Self {
self.targets.push(target.to_owned());
self
}
pub fn targets_from_env(&mut self) -> &mut Self {
if let Ok(var) = std::env::var("RUST_VLOG") {
for target in var.split(",") {
let target = target.trim();
if !target.is_empty() {
self.add_target(target);
}
}
}
self
}
pub fn init(&self) -> Result<u16, InitError> {
let port = self.port;
let (sender, rx) = channel();
let mut vlogger = WebVLogger {
sender,
targets: self.targets.clone(),
};
vlogger.targets.sort();
vlogger.targets.dedup();
v_log::set_boxed_vlogger(Box::new(vlogger))?;
INIT.store(true, std::sync::atomic::Ordering::SeqCst);
let listener = TcpListener::bind(("localhost", port))?;
let addr = listener.local_addr()?;
log::info!("web-vlog server started on {addr}");
std::thread::spawn(move || {
server_loop(listener, rx);
});
if port != 0 {
assert_eq!(port, addr.port());
}
Ok(addr.port())
}
}
impl VLog for WebVLogger {
fn enabled(&self, metadata: &v_log::Metadata) -> bool {
self.targets.is_empty()
|| self
.targets
.iter()
.any(|target| metadata.target().starts_with(target))
}
fn vlog(&self, record: &Record) {
if !self.enabled(record.metadata()) {
return;
}
let surface = record.surface().escape_default();
let size = record.size();
let color_meta = |start| {
let mut msg = format!(
"{start},\"surf\":\"{surface}\",\"meta\":{{\"target\":\"{}\",\"file\":\"{}\",\"line\":{}}},\"col\":\"",
record.target().escape_default(),
record
.file()
.unwrap_or("")
.trim_start_matches('.')
.escape_default(),
record.line().unwrap_or(0),
);
match *record.color() {
Color::Base => msg.push_str("var(--base)\"}"),
Color::Healthy => msg.push_str("var(--healthy)\"}"),
Color::Error => msg.push_str("var(--error)\"}"),
Color::Warn => msg.push_str("var(--warn)\"}"),
Color::Info => msg.push_str("var(--info)\"}"),
Color::X => msg.push_str("var(--x)\"}"),
Color::Y => msg.push_str("var(--y)\"}"),
Color::Z => msg.push_str("var(--z)\"}"),
Color::Missing => msg.push_str("var(--mis)\"}"),
Color::Hex(hexcode) => write!(&mut msg, "#{hexcode:08X}\"}}").unwrap(),
_ => msg.push_str("#000\"}"), }
msg
};
let mut tmp = String::new();
let label = record.args().as_str().map_or_else(
|| {
tmp = record.args().to_string();
tmp.escape_default()
},
|s| s.escape_default(),
);
let msg = match record.visual() {
Visual::Message => {
color_meta(format_args!("{{\"msg\":\"{label}\""))
}
Visual::Label { x, y, z, alignment } => {
if record.args().as_str().map_or(false, |s| s.is_empty()) {
return; }
color_meta(format_args!("{{\"lbl\":\"{label}\",\"pos\":[{x},{y},{z}],\"align\":{},\"size\":{size}", *alignment as u8))
}
Visual::Point { x, y, z, style } => {
color_meta(format_args!("{{\"lbl\":\"{label}\",\"pos\":[{x},{y},{z}],\"style\":\"{style:?}\",\"size\":{size}"))
}
Visual::Line { x1, y1, z1, x2, y2, z2, style } => {
color_meta(format_args!("{{\"lbl\":\"{label}\",\"pos\":[{x1},{y1},{z1}],\"pos2\":[{x2},{y2},{z2}],\"style\":\"{style:?}\",\"size\":{size}"))
}
};
let _ = self.sender.send(msg);
}
fn clear(&self, surface: &str) {
let _ = self.sender.send(format!(
"{{\"clear\":1,\"surf\":\"{}\"}}",
surface.escape_default()
));
}
fn flush(&self) {
let lock = WAIT.0.lock().unwrap();
if *lock {
if let Ok(_) = self.sender.send(String::new()) {
let _lock = WAIT.1.wait(lock).unwrap();
}
}
}
}
pub fn init_port(port: u16) -> Result<u16, InitError> {
Builder::new().port(port).init()
}
pub fn init() -> u16 {
Builder::new().targets_from_env().init().unwrap()
}
pub fn wait_for_connection() {
if INIT.load(std::sync::atomic::Ordering::SeqCst) {
let lock = WAIT.0.lock().unwrap();
let _lock = WAIT.1.wait_while(lock, |v| !*v).unwrap();
}
}
pub fn wait_for_disconnect() {
let lock = WAIT.0.lock().unwrap();
let _lock = WAIT.1.wait_while(lock, |v| *v).unwrap();
}
pub fn wait_for_disconnect_timeout(dur: Duration) -> bool {
let lock = WAIT.0.lock().unwrap();
let lock = WAIT.1.wait_timeout_while(lock, dur, |v| *v).unwrap();
!lock.1.timed_out()
}
fn server_loop(listener: TcpListener, rx: Receiver<String>) {
while let Ok((mut stream, addr)) = listener.accept() {
log::info!("vlogger connection from {addr}");
if let Err(err) = handle_connection(&stream, &rx) {
if let Err(err) = stream
.write_all(format!("HTTP/1.1 500 INTERNAL SERVER ERROR\r\n\r\n{err}").as_bytes())
{
log::error!("an error occurred: {err:?}");
}
}
}
}
fn handle_connection(stream: &TcpStream, rx: &Receiver<String>) -> std::io::Result<()> {
let mut buf_reader = BufReader::new(stream);
let mut buf_writer = BufWriter::new(stream);
let mut buf = String::new();
let mut http_request = String::new();
let mut key_back = String::new();
while let Ok(bytes) = buf_reader.read_line(&mut buf) {
let l = buf.trim_end();
log::debug!("{l}");
if bytes == 0 || l.is_empty() {
break;
}
if http_request.is_empty() {
http_request.push_str(l);
}
else if let Some(key) = l.strip_prefix("Sec-WebSocket-Key: ") {
let key = key.to_owned() + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
let digest = sha1::Sha1::digest(key);
key_back = BASE64_STANDARD.encode(digest);
}
buf.clear();
}
let (get, rest) = http_request.split_once(' ').unwrap_or(("", ""));
let (path, http) = rest.split_once(' ').unwrap_or(("", ""));
if get == "GET" && http == "HTTP/1.1" {
if !key_back.is_empty() {
log::debug!("vlogging client connected");
{
let mut guard = WAIT.0.lock().unwrap();
*guard = true;
WAIT.1.notify_all();
}
buf_writer.write_all(format!("HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: {key_back}\r\n\r\n").as_bytes())?;
buf_writer.flush()?;
stream.set_nonblocking(true)?;
let close = |buf_writer: &mut BufWriter<&TcpStream>| {
let _ = stream.set_nonblocking(false);
let _ = buf_writer.write_all(&[0x88, 0x80]);
let _ = buf_writer.flush();
log::info!("vlogger connection closed");
let mut guard = WAIT.0.lock().unwrap();
*guard = false;
WAIT.1.notify_all();
Ok(())
};
let mut byte_buf = [0u8; 64];
while let Ok(msg) = {
if let Ok(msg) = rx.try_recv() {
Ok(msg)
}
else {
buf_writer.flush()?;
loop {
match rx.recv_timeout(Duration::from_millis(1000)) {
Ok(msg) => break Ok(msg),
Err(RecvTimeoutError::Timeout) => {
while let Ok(bytes) = buf_reader.read(&mut byte_buf) {
if bytes == 0 || byte_buf[..bytes].iter().any(|b| *b == 0x88) {
return close(&mut buf_writer);
}
}
}
Err(err) => break Err(err),
}
}
}
} {
if msg.is_empty() {
let _ = stream.set_nonblocking(false);
buf_writer.flush()?;
let _ = stream.set_nonblocking(true);
let guard = WAIT.0.lock().unwrap();
WAIT.1.notify_all();
drop(guard);
continue;
}
while let Ok(bytes) = buf_reader.read(&mut byte_buf) {
if bytes == 0 || byte_buf[..bytes].iter().any(|b| *b == 0x88) {
return close(&mut buf_writer);
}
}
if msg.len() < 126 {
buf_writer.write_all(&[0x81, msg.len() as u8])?;
buf_writer.write_all(msg.as_bytes())?;
} else if msg.len() <= u16::MAX as usize {
buf_writer.write_all(&[0x81, 126])?;
buf_writer.write_all(&(msg.len() as u16).to_be_bytes())?;
buf_writer.write_all(msg.as_bytes())?;
} else {
buf_writer.write_all(&[0x81, 127])?;
buf_writer.write_all(&(msg.len() as u64).to_be_bytes())?;
buf_writer.write_all(msg.as_bytes())?;
}
}
} else if path == "/" {
buf_writer.write_all("HTTP/1.1 200 OK\r\n\r\n".as_bytes())?;
buf_writer.write_all(include_bytes!("site.html"))?;
} else {
buf_writer.write_all(
"HTTP/1.1 404 NOT FOUND\r\n\r\n<html><body>Path not found</body></html>".as_bytes(),
)?;
}
} else {
buf_writer.write_all("HTTP/1.1 400 BAD REQUEST\r\n\r\n".as_bytes())?;
}
stream.set_nonblocking(false)?;
buf_writer.flush()?;
Ok(())
}