Skip to main content

web_vlog/
lib.rs

1//! `web-vlog` implements `v-log` with the goal of being feature complete but minimal in size.
2//! This goal is achieved by offloading the drawing to a webbrowser. The webpage is served
3//! exactly once before changing to a websocket connection, which handles the potentially
4//! high datarates. This setup doesn't have the performance of a direct GPU renderer, but
5//! it has decent performance at very little compiletime and runtime cost for the vlogging
6//! process itself.
7//!
8//! The webpage uses SVG to render the vlogging surfaces and provides clickable links
9//! to open the relevant lines in VSCode. For VSCodium on Linux [there is a workaround](https://github.com/VSCodium/vscodium/issues/933#issuecomment-1135229015).
10//!
11//! This crate depends on `sha1` and `base64` due to the websocket handshake, which requires both.
12//! **Nothing is encrypted, as this is a debug utility, which should not be shipped in production code.**
13//!
14//! # Usage
15//!
16//! ```
17//! use v_log::message;
18//!
19//! // Initialize the vlogger on any free port.
20//! // This should be done as early as possible in the binary.
21//! let port = web_vlog::init();
22//! println!("Listening on port {port}");
23//!
24//! // Now we need a webbrowser to connect to the port.
25//! // This can be accelerated using the `open` crate.
26//! let _ = open::that(format!("http://localhost:{port}/"));
27//!
28//! // wait for a webbrowser to connect to the port.
29//! web_vlog::wait_for_connection();
30//!
31//! message!(target: "custom_target_1", "surface", "First message");
32//! message!(target: "custom_target_2", "surface", "Second message");
33//! message!(target: "custom_target_2::submodule", "surface", "Third message");
34//! # std::thread::sleep(std::time::Duration::from_millis(100));
35//! ```
36//!
37//! When called without environment variables, all 3 messages will be logged.
38//! Using the environment variable `RUST_VLOG` it is possible to filter by target prefixes.
39//! The environment variable is interpreted as a comma separated list of target prefix filters.
40//! Each filter, allows all targets which start with it to be vlogged. In our example
41//! above, running it with
42//! ```cmd
43//! $ RUST_VLOG=custom_target_1 ./main
44//! ```
45//! would only produce the message "First message". When instead the second target is specified
46//! ```cmd
47//! $ RUST_VLOG=custom_target_2 cargo run
48//! ```
49//! the output is "Second message" and "Third message". This is due to the filter being a prefix filter.
50//! Executing the executable directly with an environment variable, and executing using
51//! `cargo run` both work. This way it is also possible to use filtering in tests using `RUST_VLOG=... cargo test`.
52//! Tests in a library should only use a vlogger implementation as dev-dependency.
53//!
54//! The target filters can also be chosen in the programm using the [`Builder`] to initialize the [`WebVLogger`].
55//! That would be done using the following code:
56//! ```
57//! // Init a vlogger on port 1234, ignoring the environment variable and
58//! // choosing "custom_target_1" as an allowed prefix for the vlogger.
59//! web_vlog::Builder::new().port(1234).add_target("custom_target_1").init().unwrap();
60//! ```
61
62use base64::{prelude::BASE64_STANDARD, Engine};
63use sha1::Digest;
64use std::{
65    fmt::{self, Write as _},
66    io::{self, BufReader, BufWriter, prelude::*},
67    net::*,
68    sync::{
69        Condvar, Mutex, atomic::AtomicBool, mpsc::{Receiver, RecvTimeoutError, Sender, channel}
70    },
71    time::Duration,
72};
73use v_log::{Color, Record, SetVLoggerError, VLog, Visual};
74
75static WAIT: (Mutex<bool>, Condvar) = (Mutex::new(false), Condvar::new());
76static INIT: AtomicBool = AtomicBool::new(false);
77
78/// A builder for [`WebVLogger`].
79pub struct Builder {
80    port: u16,
81    targets: Vec<String>,
82}
83/// A Vlogger implementation, which hosts a webpage for the visualisation.
84pub struct WebVLogger {
85    sender: Sender<String>,
86    targets: Vec<String>,
87}
88
89/// The error type returned by [`init`].
90///
91/// [`init`]: fn.init.html
92#[allow(missing_copy_implementations)]
93#[derive(Debug)]
94pub enum InitError {
95    SetVLoggerError(SetVLoggerError),
96    TcpError(io::Error),
97}
98
99impl fmt::Display for InitError {
100    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
101        match self {
102            Self::SetVLoggerError(e) => e.fmt(f),
103            Self::TcpError(e) => e.fmt(f),
104        }
105    }
106}
107
108impl std::error::Error for InitError {}
109
110impl From<SetVLoggerError> for InitError {
111    fn from(value: SetVLoggerError) -> Self {
112        Self::SetVLoggerError(value)
113    }
114}
115impl From<io::Error> for InitError {
116    fn from(value: io::Error) -> Self {
117        Self::TcpError(value)
118    }
119}
120
121impl Builder {
122    /// Create a new [`Builder`] for [`WebVLogger`] with
123    /// the default port `0`, which means the OS will choose the port.
124    pub fn new() -> Self {
125        Self {
126            port: 0,
127            targets: vec![],
128        }
129    }
130    /// Set the port on which the server will be made available.
131    ///
132    /// If set to 0, an available port will be choosen by the OS.
133    pub fn port(&mut self, port: u16) -> &mut Self {
134        self.port = port;
135        self
136    }
137    /// Add a target to the target whitelist.
138    /// If the whitelist is left empty, all targets are allowed.
139    pub fn add_target(&mut self, target: &str) -> &mut Self {
140        self.targets.push(target.to_owned());
141        self
142    }
143    /// Read the targets from the
144    pub fn targets_from_env(&mut self) -> &mut Self {
145        if let Ok(var) = std::env::var("RUST_VLOG") {
146            for target in var.split(",") {
147                let target = target.trim();
148                if !target.is_empty() {
149                    self.add_target(target);
150                }
151            }
152        }
153        self
154    }
155    /// Initialize the [`WebVLogger`] and set it as the global vlogger for [`v_log`].
156    ///
157    /// Returns the actual port, which the server runs on.
158    /// This is only relevant if the port was set to 0.
159    ///
160    /// # Errors
161    ///
162    /// If the global vlogger has already been set an [`InitError::SetVLoggerError`] is returned.
163    /// If the server could not be started on the chosen port, the [`std::io::Error`] is returned inside [`InitError::TcpError`].
164    pub fn init(&self) -> Result<u16, InitError> {
165        let port = self.port;
166        let (sender, rx) = channel();
167        let mut vlogger = WebVLogger {
168            sender,
169            targets: self.targets.clone(),
170        };
171        vlogger.targets.sort();
172        vlogger.targets.dedup();
173        // first try to set the vlogger.
174        v_log::set_boxed_vlogger(Box::new(vlogger))?;
175        INIT.store(true, std::sync::atomic::Ordering::SeqCst);
176        // then try to open the port on localhost
177        // If this fails, the `rx` will be dropped.
178        // The vlogger will therefore stop.
179        let listener = TcpListener::bind(("localhost", port))?;
180        let addr = listener.local_addr()?;
181        log::info!("web-vlog server started on {addr}");
182        // If the vlogger is successfully set, start the webserver.
183        std::thread::spawn(move || {
184            server_loop(listener, rx);
185        });
186        if port != 0 {
187            assert_eq!(port, addr.port());
188        }
189        Ok(addr.port())
190    }
191}
192
193impl VLog for WebVLogger {
194    fn enabled(&self, metadata: &v_log::Metadata) -> bool {
195        self.targets.is_empty()
196            || self
197                .targets
198                .iter()
199                .any(|target| metadata.target().starts_with(target))
200    }
201    fn vlog(&self, record: &Record) {
202        if !self.enabled(record.metadata()) {
203            return;
204        }
205        // convert the record into a message to be send to the frontend.
206        let surface = record.surface().escape_default();
207        let size = record.size();
208        let color_meta = |start| {
209            let mut msg = format!(
210                "{start},\"surf\":\"{surface}\",\"meta\":{{\"target\":\"{}\",\"file\":\"{}\",\"line\":{}}},\"col\":\"",
211                record.target().escape_default(),
212                record
213                    .file()
214                    .unwrap_or("")
215                    .trim_start_matches('.')
216                    .escape_default(),
217                record.line().unwrap_or(0),
218            );
219            match *record.color() {
220                Color::Base => msg.push_str("var(--base)\"}"),
221                Color::Healthy => msg.push_str("var(--healthy)\"}"),
222                Color::Error => msg.push_str("var(--error)\"}"),
223                Color::Warn => msg.push_str("var(--warn)\"}"),
224                Color::Info => msg.push_str("var(--info)\"}"),
225                Color::X => msg.push_str("var(--x)\"}"),
226                Color::Y => msg.push_str("var(--y)\"}"),
227                Color::Z => msg.push_str("var(--z)\"}"),
228                Color::Missing => msg.push_str("var(--mis)\"}"),
229                Color::Hex(hexcode) => write!(&mut msg, "#{hexcode:08X}\"}}").unwrap(),
230                _ => msg.push_str("#000\"}"), // unknown -> black, as Missing is already pink
231            }
232            msg
233        };
234        let mut tmp = String::new();
235        let label = record.args().as_str().map_or_else(
236            || {
237                tmp = record.args().to_string();
238                tmp.escape_default()
239            },
240            |s| s.escape_default(),
241        );
242        let msg = match record.visual() {
243            Visual::Message => {
244                color_meta(format_args!("{{\"msg\":\"{label}\""))
245            }
246            Visual::Label { x, y, z, alignment } => {
247                if record.args().as_str().map_or(false, |s| s.is_empty()) {
248                    return; // ignore empty labels
249                }
250                color_meta(format_args!("{{\"lbl\":\"{label}\",\"pos\":[{x},{y},{z}],\"align\":{},\"size\":{size}", *alignment as u8))
251            }
252            Visual::Point { x, y, z, style } => {
253                color_meta(format_args!("{{\"lbl\":\"{label}\",\"pos\":[{x},{y},{z}],\"style\":\"{style:?}\",\"size\":{size}"))
254            }
255            Visual::Line { x1, y1, z1, x2, y2, z2, style } => {
256                color_meta(format_args!("{{\"lbl\":\"{label}\",\"pos\":[{x1},{y1},{z1}],\"pos2\":[{x2},{y2},{z2}],\"style\":\"{style:?}\",\"size\":{size}"))
257            }
258        };
259        // If the receiver is dropped, the messages will still be constructed, but no longer sent.
260        // This case doesn't have to be optimized with an early return, as it's the error state.
261        let _ = self.sender.send(msg);
262    }
263    fn clear(&self, surface: &str) {
264        let _ = self.sender.send(format!(
265            "{{\"clear\":1,\"surf\":\"{}\"}}",
266            surface.escape_default()
267        ));
268    }
269    fn flush(&self) {
270        let lock = WAIT.0.lock().unwrap();
271        if *lock {
272            if let Ok(_) = self.sender.send(String::new()) {
273                let _lock = WAIT.1.wait(lock).unwrap();
274            }
275        }
276    }
277}
278
279/// Initialise the vlogger with a custom port and otherwise default configuation.
280/// If the custom port is set to 0, a free port will be choosen by the OS and
281/// returned by this function. This function never panics.
282///
283/// Vlog messages will not be filtered.
284/// The `RUST_VLOG` environment variable is not used.
285pub fn init_port(port: u16) -> Result<u16, InitError> {
286    Builder::new().port(port).init()
287}
288
289/// Initialise the vlogger with the default configuation.
290/// The target whitelist gets loaded from the environment variable
291/// `RUST_VLOG`. If it is not set, all targets are whitelisted.
292///
293/// Returns the port at which the server is made available.
294///
295/// # Panics
296///
297/// This function will panic if the vlogger has already been
298/// set or the server could not be started. For a non panicking
299/// version see [`init_port`].
300pub fn init() -> u16 {
301    Builder::new().targets_from_env().init().unwrap()
302}
303
304/// Wait for a client to connect to the vlogging server.
305/// This blocks indefinitely if no server has been started.
306pub fn wait_for_connection() {
307    if INIT.load(std::sync::atomic::Ordering::SeqCst) {
308        let lock = WAIT.0.lock().unwrap();
309        let _lock = WAIT.1.wait_while(lock, |v| !*v).unwrap();
310    }
311}
312/// Wait for the client to disconnect from the vlogging server.
313/// This can be used to ensure all messages have been received.
314pub fn wait_for_disconnect() {
315    let lock = WAIT.0.lock().unwrap();
316    let _lock = WAIT.1.wait_while(lock, |v| *v).unwrap();
317}
318/// Wait for the client to disconnect from the vlogging server.
319///
320/// Returns true on success and false if it timed out.
321pub fn wait_for_disconnect_timeout(dur: Duration) -> bool {
322    let lock = WAIT.0.lock().unwrap();
323    let lock = WAIT.1.wait_timeout_while(lock, dur, |v| *v).unwrap();
324    !lock.1.timed_out()
325}
326
327fn server_loop(listener: TcpListener, rx: Receiver<String>) {
328    // It's ok to panic in this thread to notify the user that something went wrong.
329    while let Ok((mut stream, addr)) = listener.accept() {
330        log::info!("vlogger connection from {addr}");
331        if let Err(err) = handle_connection(&stream, &rx) {
332            if let Err(err) = stream
333                .write_all(format!("HTTP/1.1 500 INTERNAL SERVER ERROR\r\n\r\n{err}").as_bytes())
334            {
335                log::error!("an error occurred: {err:?}");
336            }
337        }
338    }
339}
340
341fn handle_connection(stream: &TcpStream, rx: &Receiver<String>) -> std::io::Result<()> {
342    let mut buf_reader = BufReader::new(stream);
343    let mut buf_writer = BufWriter::new(stream);
344    // only use the first line
345    let mut buf = String::new();
346    let mut http_request = String::new();
347    let mut key_back = String::new();
348    while let Ok(bytes) = buf_reader.read_line(&mut buf) {
349        let l = buf.trim_end();
350        log::debug!("{l}");
351        if bytes == 0 || l.is_empty() {
352            break;
353        }
354        if http_request.is_empty() {
355            http_request.push_str(l);
356        }
357        // see https://datatracker.ietf.org/doc/html/rfc6455
358        else if let Some(key) = l.strip_prefix("Sec-WebSocket-Key: ") {
359            let key = key.to_owned() + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
360            let digest = sha1::Sha1::digest(key);
361            key_back = BASE64_STANDARD.encode(digest);
362        }
363        buf.clear();
364    }
365    let (get, rest) = http_request.split_once(' ').unwrap_or(("", ""));
366    let (path, http) = rest.split_once(' ').unwrap_or(("", ""));
367    if get == "GET" && http == "HTTP/1.1" {
368        if !key_back.is_empty() {
369            log::debug!("vlogging client connected");
370            {
371                let mut guard = WAIT.0.lock().unwrap();
372                *guard = true;
373                WAIT.1.notify_all();
374            }
375            buf_writer.write_all(format!("HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: {key_back}\r\n\r\n").as_bytes())?;
376            buf_writer.flush()?;
377            stream.set_nonblocking(true)?;
378            let close = |buf_writer: &mut BufWriter<&TcpStream>| {
379                // ignore IO errors here, as the condvar needs to be notified.
380                let _ = stream.set_nonblocking(false);
381                let _ = buf_writer.write_all(&[0x88, 0x80]);
382                let _ = buf_writer.flush();
383                log::info!("vlogger connection closed");
384                let mut guard = WAIT.0.lock().unwrap();
385                *guard = false;
386                WAIT.1.notify_all();
387                Ok(())
388            };
389            let mut byte_buf = [0u8; 64];
390            while let Ok(msg) = {
391                if let Ok(msg) = rx.try_recv() {
392                    Ok(msg)
393                }
394                else {
395                    buf_writer.flush()?;
396                    loop {
397                        match rx.recv_timeout(Duration::from_millis(1000)) {
398                            Ok(msg) => break Ok(msg),
399                            Err(RecvTimeoutError::Timeout) => {
400                                while let Ok(bytes) = buf_reader.read(&mut byte_buf) {
401                                    // don't parse it properly. Only ever expect close events to happen.
402                                    // if bytes = 0, the connection has ended already without the closing message.
403                                    if bytes == 0 || byte_buf[..bytes].iter().any(|b| *b == 0x88) {
404                                        // close the connection correctly so the server can listen for a new connection.
405                                        // Note, the current message is lost, just like all previously sent messages.
406                                        return close(&mut buf_writer);
407                                    }
408                                }
409                            }
410                            Err(err) => break Err(err),
411                        }
412                    }
413                }
414             } {
415                if msg.is_empty() {
416                    // this is a message to this thread, that the main wants to flush.
417                    // Since it has now been processed, notify the main thread (it's waiting)
418                    let _ = stream.set_nonblocking(false);
419                    buf_writer.flush()?;
420                    let _ = stream.set_nonblocking(true);
421                    let guard = WAIT.0.lock().unwrap();
422                    WAIT.1.notify_all();
423                    drop(guard);
424                    continue;
425                }
426                // first check if a socket close is received
427                while let Ok(bytes) = buf_reader.read(&mut byte_buf) {
428                    // don't parse it properly. Only ever expect close events to happen.
429                    // if bytes = 0, the connection has ended already without the closing message.
430                    if bytes == 0 || byte_buf[..bytes].iter().any(|b| *b == 0x88) {
431                        // close the connection correctly so the server can listen for a new connection.
432                        // Note, the current message is lost, just like all previously sent messages.
433                        return close(&mut buf_writer);
434                    }
435                }
436                // send message
437                if msg.len() < 126 {
438                    buf_writer.write_all(&[0x81, msg.len() as u8])?;
439                    buf_writer.write_all(msg.as_bytes())?;
440                } else if msg.len() <= u16::MAX as usize {
441                    buf_writer.write_all(&[0x81, 126])?;
442                    buf_writer.write_all(&(msg.len() as u16).to_be_bytes())?;
443                    buf_writer.write_all(msg.as_bytes())?;
444                } else {
445                    buf_writer.write_all(&[0x81, 127])?;
446                    buf_writer.write_all(&(msg.len() as u64).to_be_bytes())?;
447                    buf_writer.write_all(msg.as_bytes())?;
448                }
449            }
450        } else if path == "/" {
451            buf_writer.write_all("HTTP/1.1 200 OK\r\n\r\n".as_bytes())?;
452            buf_writer.write_all(include_bytes!("site.html"))?;
453        } else {
454            buf_writer.write_all(
455                "HTTP/1.1 404 NOT FOUND\r\n\r\n<html><body>Path not found</body></html>".as_bytes(),
456            )?;
457        }
458    } else {
459        buf_writer.write_all("HTTP/1.1 400 BAD REQUEST\r\n\r\n".as_bytes())?;
460    }
461    stream.set_nonblocking(false)?;
462    buf_writer.flush()?;
463    Ok(())
464}