kanbus 0.8.2

High-performance CLI and web console for the Kanbus issue tracker. Includes kanbus (CLI) and kanbus-console (web UI server).
Documentation
//! Console telemetry streaming helpers.

use std::fs::{create_dir_all, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};

use reqwest::blocking::Client;
use serde::Deserialize;

use crate::config_loader::load_project_configuration;
use crate::error::KanbusError;
use crate::file_io::get_configuration_path;

#[derive(Debug, Deserialize)]
struct ConsoleTelemetryEvent {
    level: Option<String>,
    message: Option<String>,
    args: Option<Vec<serde_json::Value>>,
    timestamp: Option<String>,
    received_at: Option<String>,
    url: Option<String>,
    session_id: Option<String>,
}

pub fn stream_console_telemetry(
    root: &Path,
    output_override: Option<String>,
    url_override: Option<String>,
) -> Result<(), KanbusError> {
    let output_path = resolve_output_path(root, output_override)?;
    let url = resolve_telemetry_url(root, url_override)?;
    let file = open_output_file(&output_path)?;
    let client = Client::new();
    let response = client
        .get(&url)
        .header("Accept", "text/event-stream")
        .send()
        .map_err(|error| KanbusError::Io(format!("telemetry connection failed: {error}")))?;

    let reader = BufReader::new(response);
    let mut writer = std::io::BufWriter::new(file);

    for line in reader.lines() {
        let line = line.map_err(|error| KanbusError::Io(error.to_string()))?;
        if !line.starts_with("data: ") {
            continue;
        }
        let payload = line.trim_start_matches("data: ").trim();
        let formatted = format_telemetry_line(payload);
        writeln!(writer, "{formatted}").map_err(|error| KanbusError::Io(error.to_string()))?;
        writer
            .flush()
            .map_err(|error| KanbusError::Io(error.to_string()))?;
    }

    Ok(())
}

fn resolve_output_path(
    root: &Path,
    output_override: Option<String>,
) -> Result<PathBuf, KanbusError> {
    let path = if let Some(output) = output_override {
        PathBuf::from(output)
    } else {
        root.join(".kanbus").join("telemetry").join("console.log")
    };
    if let Some(parent) = path.parent() {
        create_dir_all(parent).map_err(|error| KanbusError::Io(error.to_string()))?;
    }
    Ok(path)
}

fn open_output_file(path: &Path) -> Result<std::fs::File, KanbusError> {
    OpenOptions::new()
        .create(true)
        .append(true)
        .open(path)
        .map_err(|error| KanbusError::Io(error.to_string()))
}

fn resolve_telemetry_url(root: &Path, url_override: Option<String>) -> Result<String, KanbusError> {
    if let Some(url) = url_override {
        return Ok(url);
    }
    let config_path = get_configuration_path(root)?;
    let config = load_project_configuration(&config_path)?;
    let port = config.console_port.unwrap_or(5174);
    Ok(format!(
        "http://127.0.0.1:{port}/api/telemetry/console/events"
    ))
}

fn format_telemetry_line(payload: &str) -> String {
    let parsed = serde_json::from_str::<ConsoleTelemetryEvent>(payload);
    match parsed {
        Ok(event) => {
            let timestamp = event
                .timestamp
                .or(event.received_at)
                .unwrap_or_else(|| "unknown-time".to_string());
            let level = event.level.unwrap_or_else(|| "log".to_string());
            let message = event.message.or_else(|| flatten_args(event.args));
            let source = event.url.unwrap_or_else(|| "unknown-source".to_string());
            let session = event
                .session_id
                .unwrap_or_else(|| "unknown-session".to_string());
            let body = message.unwrap_or_else(|| payload.to_string());
            format!("[{timestamp}] [{level}] [{session}] {body} ({source})")
        }
        Err(_) => format!("[unparsed] {payload}"),
    }
}

fn flatten_args(args: Option<Vec<serde_json::Value>>) -> Option<String> {
    let values = args?;
    let parts: Vec<String> = values
        .into_iter()
        .map(|value| match value {
            serde_json::Value::String(text) => text,
            other => other.to_string(),
        })
        .collect();
    if parts.is_empty() {
        None
    } else {
        Some(parts.join(" "))
    }
}