terrazzo-terminal 0.2.8

A simple web-based terminal emulator built on Terrazzo.
use server_fn::Http;
use server_fn::ServerFnError;
use server_fn::codec::Json;
use server_fn::codec::StreamingText;
use server_fn::codec::TextStream;
use terrazzo::server;

use crate::api::client_address::ClientAddress;

#[server(protocol = Http<Json, StreamingText>)]
pub async fn stream(remote: ClientAddress) -> Result<TextStream<ServerFnError>, ServerFnError> {
    imp::stream_logs(remote).await
}

#[cfg(feature = "server")]
mod imp {
    use futures::Stream;
    use futures::StreamExt as _;
    use futures::TryStreamExt as _;
    use nameth::nameth;
    use scopeguard::guard;
    use server_fn::ServerFnError;
    use server_fn::codec::TextStream;
    use tracing::info;

    use crate::api::client_address::ClientAddress;
    use crate::backend::client_service::remote_fn_service;
    use crate::logs::event::LogEvent;
    use crate::logs::state::LogState;
    use crate::utils::ndjson_utils::serialize_line;

    #[nameth]
    pub(super) async fn stream_logs(
        remote: ClientAddress,
    ) -> Result<TextStream<ServerFnError>, ServerFnError> {
        let stream = STREAM_LOGS_FN.call(remote, ()).await?;
        let stream = stream.map_ok(|event| serialize_log_event(&event));
        Ok(TextStream::new(stream.map_err(|error| error.into())))
    }

    remote_fn_service::streaming::declare_remote_fn!(
        STREAM_LOGS_FN,
        STREAM_LOGS,
        (),
        LogEvent,
        |_server, ()| { local_logs_stream().map(Ok::<LogEvent, tonic::Status>) }
    );

    fn local_logs_stream() -> impl Stream<Item = LogEvent> {
        info!("Log stream start");
        let end = guard((), |_| info!("Log stream end"));
        let subscription = LogState::get().subscribe();
        let stream = futures::stream::unfold(subscription, |mut subscription| async move {
            let next = if let Some(event) = subscription.backlog.pop_front() {
                Some(event)
            } else {
                subscription.receiver.recv().await
            }?;
            Some(((*next).clone(), subscription))
        });
        stream.inspect(move |_log_event: &LogEvent| {
            let _ = &end;
        })
    }

    fn serialize_log_event(event: &LogEvent) -> String {
        serialize_line(event).unwrap_or_else(|error| {
            serialize_line(&LogEvent {
                id: event.id,
                level: event.level,
                message: format!("Failed to serialize log event: {error}"),
                timestamp_ms: event.timestamp_ms,
                file: None,
            })
            .expect("serialize fallback log event")
        })
    }
}

#[cfg(test)]
#[cfg(feature = "server")]
mod tests {
    use futures::StreamExt as _;
    use tracing::info;
    use tracing::warn;

    use crate::api::client_address::ClientAddress;
    use crate::logs::stream::stream;
    use crate::logs::tests::TestGuard;

    #[tokio::test]
    async fn stream_logs_replays_backlog_and_then_live_events() {
        let guard = TestGuard::get();
        guard.with_test_subscriber(|| {
            info!("backlog");
        });

        let mut stream = stream(ClientAddress::default())
            .await
            .expect("stream")
            .into_inner();
        let backlog = stream.next().await.expect("item").expect("data");
        assert!(
            backlog.contains("backlog"),
            "Expected {backlog} contains backlog"
        );

        guard.with_test_subscriber(|| {
            warn!("live");
        });

        let live = stream.next().await.expect("item").expect("data");
        assert!(live.contains("live"), "Expected {live} contains live");
    }
}