pitchfork-cli 2.13.1

Daemons with DX
Documentation
use axum::{
    extract::Path,
    response::sse::{Event, KeepAlive, Sse},
};
use std::convert::Infallible;

use crate::daemon::is_valid_daemon_id;
use crate::daemon_id::DaemonId;
use crate::log_store::sqlite::LOG_STORE;
use crate::log_store::{LogQuery, LogStore};
use crate::settings::settings;
use console;

pub async fn stream_sse(
    Path(id): Path<String>,
) -> Sse<impl tokio_stream::Stream<Item = Result<Event, Infallible>>> {
    let sse_poll_interval = settings().web_sse_poll_interval();

    let stream = async_stream::stream! {
        if !is_valid_daemon_id(&id) {
            yield Ok(Event::default().event("error").data("invalid daemon id"));
            return;
        }

        let daemon_id = match DaemonId::parse(&id) {
            Ok(d) => d,
            Err(_) => {
                yield Ok(Event::default().event("error").data("invalid daemon id"));
                return;
            }
        };

        let mut last_id: i64 = match tokio::task::spawn_blocking({
            let d = daemon_id.clone();
            move || LOG_STORE.query(&LogQuery {
                daemon_ids: vec![d.qualified()],
                from: None,
                to: None,
                limit: Some(1),
                order_desc: true,
                after_id: None,
            })
        }).await {
            Ok(Ok(entries)) => entries.first().map(|e| e.id).unwrap_or(0),
            _ => 0,
        };

        let mut last_clear_gen: u64 = match tokio::task::spawn_blocking({
            let d = daemon_id.clone();
            move || LOG_STORE.last_clear_generation(&d)
        }).await {
            Ok(Ok(Some(g))) => g,
            _ => 0,
        };

        loop {
            tokio::time::sleep(sse_poll_interval).await;

            let current_gen: u64 = match tokio::task::spawn_blocking({
                let d = daemon_id.clone();
                move || LOG_STORE.last_clear_generation(&d)
            }).await {
                Ok(Ok(Some(g))) => g,
                _ => 0,
            };

            if current_gen != last_clear_gen {
                last_clear_gen = current_gen;
                last_id = 0;
                yield Ok(Event::default().event("clear").data(""));
                continue;
            }

            const BATCH_SIZE: usize = 500;
            let entries = match tokio::task::spawn_blocking({
                let d = daemon_id.clone();
                move || LOG_STORE.query(&LogQuery {
                    daemon_ids: vec![d.qualified()],
                    from: None,
                    to: None,
                    limit: Some(BATCH_SIZE),
                    order_desc: false,
                    after_id: Some(last_id),
                })
            }).await {
                Ok(Ok(e)) => e,
                _ => continue,
            };

            for entry in entries {
                last_id = entry.id;
                let ts = entry.timestamp.format("%Y-%m-%d %H:%M:%S");
                let stripped = console::strip_ansi_codes(&entry.message);
                yield Ok(Event::default().event("message").data(format!("{ts} {stripped}")));
            }
        }
    };

    Sse::new(stream).keep_alive(KeepAlive::default())
}