terrazzo-terminal 0.2.8

A simple web-based terminal emulator built on Terrazzo.
use std::collections::VecDeque;
use std::sync::Arc;

use futures::StreamExt as _;
use futures::future::AbortHandle;
use futures::future::Abortable;
use scopeguard::defer;
use server_fn::ServerFnError;
use server_fn::codec::TextStream;
use terrazzo::prelude::XSignal;
use terrazzo::prelude::diagnostics;
use wasm_bindgen_futures::spawn_local;

use self::diagnostics::debug;
use self::diagnostics::warn;
use crate::frontend::remotes::Remote;
use crate::logs::event::LogEvent;
use crate::utils::ndjson::NdjsonBuffer;

const MAX_LOGS: usize = if cfg!(debug_assertions) { 25 } else { 1000 };

pub struct LogsEngine {
    logs: XSignal<Arc<VecDeque<ClientLogEvent>>>,
    abort_handle: AbortHandle,
}

impl LogsEngine {
    pub fn new(remote: Remote) -> Self {
        let logs = XSignal::new("log-events", Arc::default());
        let (abort_handle, abort_registration) = AbortHandle::new_pair();

        let consume_stream = {
            let logs = logs.clone();
            async move {
                let Ok(stream) = crate::logs::stream::stream(remote)
                    .await
                    .inspect_err(|error| warn!("Failed to start log stream: {error}"))
                else {
                    return;
                };
                consume_stream(logs, stream).await;
            }
        };
        spawn_local(async move {
            match Abortable::new(consume_stream, abort_registration).await {
                Ok(()) => debug!("Logs stream finished"),
                Err(_) => debug!("Logs stream aborted"),
            }
        });

        Self { logs, abort_handle }
    }

    pub fn logs(&self) -> XSignal<Arc<VecDeque<ClientLogEvent>>> {
        self.logs.clone()
    }
}

impl Drop for LogsEngine {
    fn drop(&mut self) {
        debug!("Dropping LogsEngine, aborting log stream");
        self.abort_handle.abort();
    }
}

async fn consume_stream(
    logs: XSignal<Arc<VecDeque<ClientLogEvent>>>,
    stream: TextStream<ServerFnError>,
) {
    debug!("Start");
    defer!(debug!("End"));
    let mut parser = NdjsonBuffer::<LogEvent>::default();
    let mut stream = stream.into_inner();
    while let Some(chunk) = stream.next().await {
        match chunk {
            Ok(chunk) => {
                let mut new_logs = vec![];
                for event in parser.push_chunk(&chunk) {
                    match event {
                        Ok(event) => new_logs.push(ClientLogEvent::new(event)),
                        Err(error) => warn!("Failed to parse log stream line: {error}"),
                    }
                }
                if new_logs.is_empty() {
                    continue;
                }
                logs.update(|current| {
                    let mut current = current.as_ref().clone();
                    current.extend(new_logs);
                    if current.len() > MAX_LOGS {
                        let start = current.len().saturating_sub(20);
                        current.drain(..start);
                    }
                    Some(Arc::new(current))
                });
            }
            Err(error) => {
                warn!("Log stream failed: {error}");
                return;
            }
        }
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ClientLogEvent {
    event: LogEvent,
    pub received_at_ms: u64,
}

impl ClientLogEvent {
    pub(super) fn new(event: LogEvent) -> Self {
        Self {
            event,
            received_at_ms: web_sys::js_sys::Date::now() as u64,
        }
    }
}

impl std::ops::Deref for ClientLogEvent {
    type Target = LogEvent;

    fn deref(&self) -> &Self::Target {
        &self.event
    }
}