1use base64::{prelude::BASE64_STANDARD, Engine};
63use sha1::Digest;
64use std::{
65 fmt::{self, Write as _},
66 io::{self, prelude::*, BufReader, BufWriter},
67 net::*,
68 sync::{
69 atomic::AtomicBool,
70 mpsc::{channel, Receiver, Sender},
71 Condvar, Mutex,
72 },
73 time::Duration,
74};
75use v_log::{Color, Record, SetVLoggerError, VLog, Visual};
76
/// Shared connection state: the `bool` is `true` while a WebSocket client is attached
/// (set in `handle_connection` after the upgrade request is accepted, cleared when the
/// session is closed). The `Condvar` is notified on every change of the flag so that
/// `flush` and the `wait_for_*` helpers can block on it.
static WAIT: (Mutex<bool>, Condvar) = (Mutex::new(false), Condvar::new());
/// Becomes `true` once `Builder::init` has installed the vlogger; `wait_for_connection`
/// returns immediately while this is still `false`.
static INIT: AtomicBool = AtomicBool::new(false);
79
/// Collects the settings for the web vlogger before it is started with `Builder::init`.
///
/// The derived `Default` yields port `0` and an empty target list, the same values
/// `Builder::new` uses.
#[derive(Debug, Clone, Default)]
pub struct Builder {
    /// TCP port to listen on; `0` asks the operating system to pick a free port.
    port: u16,
    /// Target prefixes to accept; an empty list accepts every target.
    targets: Vec<String>,
}
/// The vlogger that serialises records and hands them to the server thread.
#[derive(Debug)]
pub struct WebVLogger {
    /// Channel to the server thread; every message is one JSON text frame, and an empty
    /// string asks the server to close the current client connection.
    sender: Sender<String>,
    /// Target prefixes to accept; an empty list accepts every target.
    targets: Vec<String>,
}
90
/// Error returned when starting the web vlogger fails.
// NOTE(review): the `allow` below looks unnecessary because `io::Error` is not `Copy`;
// presumably kept for a crate-level lint configuration — confirm before removing.
#[allow(missing_copy_implementations)]
#[derive(Debug)]
pub enum InitError {
    /// Installing the global vlogger failed (converted from `SetVLoggerError`).
    SetVLoggerError(SetVLoggerError),
    /// An I/O operation failed (converted from `io::Error`), e.g. binding the listener.
    TcpError(io::Error),
}
100
101impl fmt::Display for InitError {
102 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
103 match self {
104 Self::SetVLoggerError(e) => e.fmt(f),
105 Self::TcpError(e) => e.fmt(f),
106 }
107 }
108}
109
// Marker impl: relies on the provided `Error` methods; `Display` above already forwards
// the wrapped error's message.
impl std::error::Error for InitError {}
111
112impl From<SetVLoggerError> for InitError {
113 fn from(value: SetVLoggerError) -> Self {
114 Self::SetVLoggerError(value)
115 }
116}
117impl From<io::Error> for InitError {
118 fn from(value: io::Error) -> Self {
119 Self::TcpError(value)
120 }
121}
122
123impl Builder {
124 pub fn new() -> Self {
127 Self {
128 port: 0,
129 targets: vec![],
130 }
131 }
132 pub fn port(&mut self, port: u16) -> &mut Self {
136 self.port = port;
137 self
138 }
139 pub fn add_target(&mut self, target: &str) -> &mut Self {
142 self.targets.push(target.to_owned());
143 self
144 }
145 pub fn targets_from_env(&mut self) -> &mut Self {
147 if let Ok(var) = std::env::var("RUST_VLOG") {
148 for target in var.split(",") {
149 let target = target.trim();
150 if !target.is_empty() {
151 self.add_target(target);
152 }
153 }
154 }
155 self
156 }
157 pub fn init(&self) -> Result<u16, InitError> {
167 let port = self.port;
168 let (sender, rx) = channel();
169 let mut vlogger = WebVLogger {
170 sender,
171 targets: self.targets.clone(),
172 };
173 vlogger.targets.sort();
174 vlogger.targets.dedup();
175 v_log::set_boxed_vlogger(Box::new(vlogger))?;
177 INIT.store(true, std::sync::atomic::Ordering::SeqCst);
178 let listener = TcpListener::bind(("localhost", port))?;
182 let addr = listener.local_addr()?;
183 log::info!("web-vlog server started on {addr}");
184 std::thread::spawn(move || {
186 server_loop(listener, rx);
187 });
188 if port != 0 {
189 assert_eq!(port, addr.port());
190 }
191 Ok(addr.port())
192 }
193}
194
195impl VLog for WebVLogger {
196 fn enabled(&self, metadata: &v_log::Metadata) -> bool {
197 self.targets.is_empty()
198 || self
199 .targets
200 .iter()
201 .any(|target| metadata.target().starts_with(target))
202 }
203 fn vlog(&self, record: &Record) {
204 if !self.enabled(record.metadata()) {
205 return;
206 }
207 let surface = record.surface().escape_default();
209 let size = record.size();
210 let color_meta = |start| {
211 let mut msg = format!(
212 "{start},\"surf\":\"{surface}\",\"meta\":{{\"target\":\"{}\",\"file\":\"{}\",\"line\":{}}},\"col\":\"",
213 record.target().escape_default(),
214 record
215 .file()
216 .unwrap_or("")
217 .trim_start_matches('.')
218 .escape_default(),
219 record.line().unwrap_or(0),
220 );
221 match *record.color() {
222 Color::Base => msg.push_str("var(--base)\"}"),
223 Color::Healthy => msg.push_str("var(--healthy)\"}"),
224 Color::Error => msg.push_str("var(--error)\"}"),
225 Color::Warn => msg.push_str("var(--warn)\"}"),
226 Color::Info => msg.push_str("var(--info)\"}"),
227 Color::X => msg.push_str("var(--x)\"}"),
228 Color::Y => msg.push_str("var(--y)\"}"),
229 Color::Z => msg.push_str("var(--z)\"}"),
230 Color::Missing => msg.push_str("var(--mis)\"}"),
231 Color::Hex(hexcode) => write!(&mut msg, "#{hexcode:08X}\"}}").unwrap(),
232 _ => msg.push_str("#000\"}"), }
234 msg
235 };
236 let mut tmp = String::new();
237 let label = record.args().as_str().map_or_else(
238 || {
239 tmp = record.args().to_string();
240 tmp.escape_default()
241 },
242 |s| s.escape_default(),
243 );
244 let msg = match record.visual() {
245 Visual::Message => {
246 color_meta(format_args!("{{\"msg\":\"{label}\""))
247 }
248 Visual::Label { x, y, z, alignment } => {
249 color_meta(format_args!("{{\"lbl\":\"{label}\",\"pos\":[{x},{y},{z}],\"align\":{},\"size\":{size}", *alignment as u8))
250 }
251 Visual::Point { x, y, z, style } => {
252 color_meta(format_args!("{{\"lbl\":\"{label}\",\"pos\":[{x},{y},{z}],\"style\":\"{style:?}\",\"size\":{size}"))
253 }
254 Visual::Line { x1, y1, z1, x2, y2, z2, style } => {
255 color_meta(format_args!("{{\"lbl\":\"{label}\",\"pos\":[{x1},{y1},{z1}],\"pos2\":[{x2},{y2},{z2}],\"style\":\"{style:?}\",\"size\":{size}"))
256 }
257 };
258 let _ = self.sender.send(msg);
261 }
262 fn clear(&self, surface: &str) {
263 let _ = self.sender.send(format!(
264 "{{\"clear\":1,\"surf\":\"{}\"}}",
265 surface.escape_default()
266 ));
267 }
268 fn flush(&self) {
269 let lock = WAIT.0.lock().unwrap();
270 if let Ok(_) = self.sender.send(String::new()) {
271 let _lock = WAIT.1.wait_while(lock, |v| *v).unwrap();
272 }
273 }
274}
275
276pub fn init_port(port: u16) -> Result<u16, InitError> {
283 Builder::new().port(port).init()
284}
285
286pub fn init() -> u16 {
298 Builder::new().targets_from_env().init().unwrap()
299}
300
301pub fn wait_for_connection() {
304 if INIT.load(std::sync::atomic::Ordering::SeqCst) {
305 let lock = WAIT.0.lock().unwrap();
306 let _lock = WAIT.1.wait_while(lock, |v| !*v).unwrap();
307 }
308}
309pub fn wait_for_disconnect() {
312 let lock = WAIT.0.lock().unwrap();
313 let _lock = WAIT.1.wait_while(lock, |v| *v).unwrap();
314}
315pub fn wait_for_disconnect_timeout(dur: Duration) -> bool {
319 let lock = WAIT.0.lock().unwrap();
320 let lock = WAIT.1.wait_timeout_while(lock, dur, |v| *v).unwrap();
321 !lock.1.timed_out()
322}
323
324fn server_loop(listener: TcpListener, rx: Receiver<String>) {
325 while let Ok((mut stream, addr)) = listener.accept() {
327 log::info!("vlogger connection from {addr}");
328 if let Err(err) = handle_connection(&stream, &rx) {
329 if let Err(err) = stream
330 .write_all(format!("HTTP/1.1 500 INTERNAL SERVER ERROR\r\n\r\n{err}").as_bytes())
331 {
332 log::error!("an error occurred: {err:?}");
333 }
334 }
335 }
336}
337
338fn handle_connection(stream: &TcpStream, rx: &Receiver<String>) -> std::io::Result<()> {
339 let mut buf_reader = BufReader::new(stream);
340 let mut buf_writer = BufWriter::new(stream);
341 let mut buf = String::new();
343 let mut http_request = String::new();
344 let mut key_back = String::new();
345 while let Ok(bytes) = buf_reader.read_line(&mut buf) {
346 let l = buf.trim_end();
347 log::debug!("{l}");
348 if bytes == 0 || l.is_empty() {
349 break;
350 }
351 if http_request.is_empty() {
352 http_request.push_str(l);
353 }
354 else if let Some(key) = l.strip_prefix("Sec-WebSocket-Key: ") {
356 let key = key.to_owned() + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
357 let digest = sha1::Sha1::digest(key);
358 key_back = BASE64_STANDARD.encode(digest);
359 }
360 buf.clear();
361 }
362 let (get, rest) = http_request.split_once(' ').unwrap_or(("", ""));
363 let (path, http) = rest.split_once(' ').unwrap_or(("", ""));
364 if get == "GET" && http == "HTTP/1.1" {
365 if !key_back.is_empty() {
366 log::debug!("vlogging client connected");
367 {
368 let mut guard = WAIT.0.lock().unwrap();
369 *guard = true;
370 WAIT.1.notify_all();
371 }
372 buf_writer.write_all(format!("HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: {key_back}\r\n\r\n").as_bytes())?;
373 buf_writer.flush()?;
374 stream.set_nonblocking(true)?;
375 let close = |buf_writer: &mut BufWriter<&TcpStream>| {
376 let _ = stream.set_nonblocking(false);
378 let _ = buf_writer.write_all(&[0x88, 0x80]);
379 let _ = buf_writer.flush();
380 log::info!("vlogger connection closed");
381 let mut guard = WAIT.0.lock().unwrap();
382 *guard = false;
383 WAIT.1.notify_all();
384 Ok(())
385 };
386 let mut byte_buf = [0u8; 64];
387 while let Ok(msg) = rx.recv() {
388 if msg.is_empty() {
389 return close(&mut buf_writer);
392 }
393 while let Ok(bytes) = buf_reader.read(&mut byte_buf) {
395 if bytes == 0 || byte_buf[..bytes].iter().any(|b| *b == 0x88) {
398 return close(&mut buf_writer);
400 }
401 }
402 if msg.len() < 126 {
404 buf_writer.write_all(&[0x81, msg.len() as u8])?;
405 buf_writer.write_all(msg.as_bytes())?;
406 } else if msg.len() <= u16::MAX as usize {
407 buf_writer.write_all(&[0x81, 126])?;
408 buf_writer.write_all(&(msg.len() as u16).to_be_bytes())?;
409 buf_writer.write_all(msg.as_bytes())?;
410 } else {
411 buf_writer.write_all(&[0x81, 127])?;
412 buf_writer.write_all(&(msg.len() as u64).to_be_bytes())?;
413 buf_writer.write_all(msg.as_bytes())?;
414 }
415 buf_writer.flush()?;
416 }
417 } else if path == "/" {
418 buf_writer.write_all("HTTP/1.1 200 OK\r\n\r\n".as_bytes())?;
419 buf_writer.write_all(include_bytes!("site.html"))?;
420 } else {
421 buf_writer.write_all(
422 "HTTP/1.1 404 NOT FOUND\r\n\r\n<html><body>Path not found</body></html>".as_bytes(),
423 )?;
424 }
425 } else {
426 buf_writer.write_all("HTTP/1.1 400 BAD REQUEST\r\n\r\n".as_bytes())?;
427 }
428 stream.set_nonblocking(false)?;
429 buf_writer.flush()?;
430 Ok(())
431}