cufflink-cli 0.11.2

CLI for the Cufflink CRUD microservice platform — deploy, init, and manage services
use crate::config::CliConfig;
use crate::project_config::ProjectConfig;

pub struct LogParams<'a> {
    pub follow: bool,
    pub since: Option<&'a str>,
    pub from: Option<&'a str>,
    pub to: Option<&'a str>,
    pub search: Option<&'a str>,
    pub level: Option<&'a str>,
    pub limit: Option<usize>,
    pub output: Option<&'a str>,
}

fn resolve_service_name() -> eyre::Result<String> {
    if let Some(project) = ProjectConfig::find_and_load()? {
        if let Some(name) = project.service.name.as_deref() {
            return Ok(name.to_string());
        }
    }

    let output = std::process::Command::new("cargo")
        .args(["run", "--", "--emit-manifest"])
        .output()?;

    if !output.status.success() {
        eyre::bail!("Cannot determine service name. Run from a cufflink service directory.");
    }

    let stdout = String::from_utf8(output.stdout)?;
    let manifest: serde_json::Value = serde_json::from_str(stdout.trim())?;
    manifest["name"]
        .as_str()
        .map(String::from)
        .ok_or_else(|| eyre::eyre!("No service name in manifest"))
}

pub async fn run(params: LogParams<'_>, env: Option<&str>) -> eyre::Result<()> {
    let config = CliConfig::load_with_env(env)?;
    if let Some(ref name) = config.env_name {
        println!("Environment: {}", name);
    }

    let service_name = resolve_service_name()?;
    let service_id = config.find_service_id(&service_name).await?;

    if params.follow && params.since.is_none() && params.from.is_none() && params.to.is_none() {
        return tail_logs(&config, &service_id, params.search, params.level).await;
    }

    query_logs(&config, &service_id, &params).await
}

async fn query_logs(
    config: &CliConfig,
    service_id: &str,
    params: &LogParams<'_>,
) -> eyre::Result<()> {
    let client = config.http_client();
    let mut url = format!("{}/api/services/{}/logs", config.api_url, service_id);
    let mut query_params = Vec::new();

    if let Some(s) = params.since {
        query_params.push(format!("since={}", s));
    }
    if let Some(f) = params.from {
        query_params.push(format!("from={}", f));
    }
    if let Some(t) = params.to {
        query_params.push(format!("to={}", t));
    }
    if let Some(s) = params.search {
        query_params.push(format!("search={}", urlencoding::encode(s)));
    }
    if let Some(l) = params.level {
        query_params.push(format!("level={}", l));
    }
    if let Some(n) = params.limit {
        query_params.push(format!("limit={}", n));
    }

    if !query_params.is_empty() {
        url.push('?');
        url.push_str(&query_params.join("&"));
    }

    let resp = config
        .auth_request(&client, reqwest::Method::GET, &url)
        .send()
        .await?;

    if !resp.status().is_success() {
        let status = resp.status();
        let body = resp.text().await.unwrap_or_default();
        eyre::bail!("Failed to fetch logs ({}): {}", status, body);
    }

    let body: serde_json::Value = resp.json().await?;
    let logs = body["logs"].as_array();

    let mut file_output = params
        .output
        .map(|p| std::fs::File::create(p).expect("Failed to create output file"));

    let mut count = 0;
    if let Some(logs) = logs {
        for entry in logs {
            let line = entry["line"].as_str().unwrap_or("");
            let ts_nanos = entry["timestamp"].as_str().unwrap_or("0");
            let formatted = format_log_line(line, ts_nanos);
            println!("{}", formatted);

            if let Some(ref mut f) = file_output {
                use std::io::Write;
                writeln!(f, "{}", line).ok();
            }

            count += 1;
        }
    }

    if count == 0 {
        println!("No logs found.");
    } else if let Some(path) = params.output {
        println!("\n{} log lines saved to {}", count, path);
    }

    Ok(())
}

async fn tail_logs(
    config: &CliConfig,
    service_id: &str,
    search: Option<&str>,
    level: Option<&str>,
) -> eyre::Result<()> {
    let client = config.http_client();
    let mut url = format!("{}/api/services/{}/logs/tail", config.api_url, service_id);
    let mut params = Vec::new();

    if let Some(s) = search {
        params.push(format!("search={}", urlencoding::encode(s)));
    }
    if let Some(l) = level {
        params.push(format!("level={}", l));
    }

    if !params.is_empty() {
        url.push('?');
        url.push_str(&params.join("&"));
    }

    println!("Streaming logs (Ctrl+C to stop)...\n");

    let resp = config
        .auth_request(&client, reqwest::Method::GET, &url)
        .header("Accept", "text/event-stream")
        .send()
        .await?;

    if !resp.status().is_success() {
        let status = resp.status();
        let body = resp.text().await.unwrap_or_default();
        eyre::bail!("Failed to stream logs ({}): {}", status, body);
    }

    use futures_util::StreamExt;
    let mut stream = resp.bytes_stream();

    let mut buffer = String::new();
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        buffer.push_str(&String::from_utf8_lossy(&chunk));

        while let Some(pos) = buffer.find("\n\n") {
            let event = buffer[..pos].to_string();
            buffer = buffer[pos + 2..].to_string();

            if let Some(data) = event.strip_prefix("data:") {
                let data = data.trim();
                if let Ok(entry) = serde_json::from_str::<serde_json::Value>(data) {
                    let line = entry["line"].as_str().unwrap_or("");
                    let ts = entry["timestamp"].as_str().unwrap_or("0");
                    if !line.is_empty() {
                        println!("{}", format_log_line(line, ts));
                    }
                }
            }
        }
    }

    Ok(())
}

fn format_timestamp(ts_nanos: &str) -> String {
    ts_nanos
        .parse::<i64>()
        .ok()
        .and_then(|ns| {
            chrono::DateTime::from_timestamp(ns / 1_000_000_000, (ns % 1_000_000_000) as u32)
        })
        .map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
        .unwrap_or_default()
}

fn format_log_line(line: &str, ts_nanos: &str) -> String {
    if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(line) {
        let message = parsed["body"]
            .as_str()
            .or_else(|| parsed["message"].as_str())
            .or_else(|| parsed["msg"].as_str())
            .unwrap_or(line);

        let level = parsed["severity"]
            .as_str()
            .or_else(|| parsed["level"].as_str())
            .unwrap_or("INFO")
            .to_uppercase();

        let level_colored = match level.as_str() {
            "ERROR" => format!("\x1b[31m{}\x1b[0m", level),
            "WARN" | "WARNING" => format!("\x1b[33m{}\x1b[0m", level),
            "DEBUG" | "TRACE" => format!("\x1b[36m{}\x1b[0m", level),
            _ => format!("\x1b[32m{}\x1b[0m", level),
        };

        let attrs = &parsed["attributes"];
        let mut extra = String::new();
        for key in &["tenant", "service", "table", "operation"] {
            if let Some(val) = attrs[key].as_str() {
                if !extra.is_empty() {
                    extra.push(' ');
                }
                extra.push_str(&format!("{}={}", key, val));
            }
        }

        let scope = parsed["instrumentation_scope"]["name"]
            .as_str()
            .unwrap_or("");
        let target = if !scope.is_empty() {
            format!(" \x1b[90m{}\x1b[0m", scope)
        } else {
            String::new()
        };

        let ts = format_timestamp(ts_nanos);
        let ts_prefix = if ts.is_empty() {
            String::new()
        } else {
            format!("\x1b[90m{}\x1b[0m ", ts)
        };

        if extra.is_empty() {
            format!("{}[{}]{} {}", ts_prefix, level_colored, target, message)
        } else {
            format!(
                "{}[{}]{} {} \x1b[90m{}\x1b[0m",
                ts_prefix, level_colored, target, message, extra
            )
        }
    } else {
        line.to_string()
    }
}