use crate::config::CliConfig;
use crate::project_config::ProjectConfig;
pub struct LogParams<'a> {
pub follow: bool,
pub since: Option<&'a str>,
pub from: Option<&'a str>,
pub to: Option<&'a str>,
pub search: Option<&'a str>,
pub level: Option<&'a str>,
pub limit: Option<usize>,
pub output: Option<&'a str>,
}
fn resolve_service_name() -> eyre::Result<String> {
if let Some(project) = ProjectConfig::find_and_load()? {
if let Some(name) = project.service.name.as_deref() {
return Ok(name.to_string());
}
}
let output = std::process::Command::new("cargo")
.args(["run", "--", "--emit-manifest"])
.output()?;
if !output.status.success() {
eyre::bail!("Cannot determine service name. Run from a cufflink service directory.");
}
let stdout = String::from_utf8(output.stdout)?;
let manifest: serde_json::Value = serde_json::from_str(stdout.trim())?;
manifest["name"]
.as_str()
.map(String::from)
.ok_or_else(|| eyre::eyre!("No service name in manifest"))
}
pub async fn run(params: LogParams<'_>, env: Option<&str>) -> eyre::Result<()> {
let config = CliConfig::load_with_env(env)?;
if let Some(ref name) = config.env_name {
println!("Environment: {}", name);
}
let service_name = resolve_service_name()?;
let service_id = config.find_service_id(&service_name).await?;
if params.follow && params.since.is_none() && params.from.is_none() && params.to.is_none() {
return tail_logs(&config, &service_id, params.search, params.level).await;
}
query_logs(&config, &service_id, ¶ms).await
}
async fn query_logs(
config: &CliConfig,
service_id: &str,
params: &LogParams<'_>,
) -> eyre::Result<()> {
let client = config.http_client();
let mut url = format!("{}/api/services/{}/logs", config.api_url, service_id);
let mut query_params = Vec::new();
if let Some(s) = params.since {
query_params.push(format!("since={}", s));
}
if let Some(f) = params.from {
query_params.push(format!("from={}", f));
}
if let Some(t) = params.to {
query_params.push(format!("to={}", t));
}
if let Some(s) = params.search {
query_params.push(format!("search={}", urlencoding::encode(s)));
}
if let Some(l) = params.level {
query_params.push(format!("level={}", l));
}
if let Some(n) = params.limit {
query_params.push(format!("limit={}", n));
}
if !query_params.is_empty() {
url.push('?');
url.push_str(&query_params.join("&"));
}
let resp = config
.auth_request(&client, reqwest::Method::GET, &url)
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
eyre::bail!("Failed to fetch logs ({}): {}", status, body);
}
let body: serde_json::Value = resp.json().await?;
let logs = body["logs"].as_array();
let mut file_output = params
.output
.map(|p| std::fs::File::create(p).expect("Failed to create output file"));
let mut count = 0;
if let Some(logs) = logs {
for entry in logs {
let line = entry["line"].as_str().unwrap_or("");
let ts_nanos = entry["timestamp"].as_str().unwrap_or("0");
let formatted = format_log_line(line, ts_nanos);
println!("{}", formatted);
if let Some(ref mut f) = file_output {
use std::io::Write;
writeln!(f, "{}", line).ok();
}
count += 1;
}
}
if count == 0 {
println!("No logs found.");
} else if let Some(path) = params.output {
println!("\n{} log lines saved to {}", count, path);
}
Ok(())
}
async fn tail_logs(
config: &CliConfig,
service_id: &str,
search: Option<&str>,
level: Option<&str>,
) -> eyre::Result<()> {
let client = config.http_client();
let mut url = format!("{}/api/services/{}/logs/tail", config.api_url, service_id);
let mut params = Vec::new();
if let Some(s) = search {
params.push(format!("search={}", urlencoding::encode(s)));
}
if let Some(l) = level {
params.push(format!("level={}", l));
}
if !params.is_empty() {
url.push('?');
url.push_str(¶ms.join("&"));
}
println!("Streaming logs (Ctrl+C to stop)...\n");
let resp = config
.auth_request(&client, reqwest::Method::GET, &url)
.header("Accept", "text/event-stream")
.send()
.await?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
eyre::bail!("Failed to stream logs ({}): {}", status, body);
}
use futures_util::StreamExt;
let mut stream = resp.bytes_stream();
let mut buffer = String::new();
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
while let Some(pos) = buffer.find("\n\n") {
let event = buffer[..pos].to_string();
buffer = buffer[pos + 2..].to_string();
if let Some(data) = event.strip_prefix("data:") {
let data = data.trim();
if let Ok(entry) = serde_json::from_str::<serde_json::Value>(data) {
let line = entry["line"].as_str().unwrap_or("");
let ts = entry["timestamp"].as_str().unwrap_or("0");
if !line.is_empty() {
println!("{}", format_log_line(line, ts));
}
}
}
}
}
Ok(())
}
fn format_timestamp(ts_nanos: &str) -> String {
ts_nanos
.parse::<i64>()
.ok()
.and_then(|ns| {
chrono::DateTime::from_timestamp(ns / 1_000_000_000, (ns % 1_000_000_000) as u32)
})
.map(|dt| dt.format("%Y-%m-%d %H:%M:%S").to_string())
.unwrap_or_default()
}
fn format_log_line(line: &str, ts_nanos: &str) -> String {
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(line) {
let message = parsed["body"]
.as_str()
.or_else(|| parsed["message"].as_str())
.or_else(|| parsed["msg"].as_str())
.unwrap_or(line);
let level = parsed["severity"]
.as_str()
.or_else(|| parsed["level"].as_str())
.unwrap_or("INFO")
.to_uppercase();
let level_colored = match level.as_str() {
"ERROR" => format!("\x1b[31m{}\x1b[0m", level),
"WARN" | "WARNING" => format!("\x1b[33m{}\x1b[0m", level),
"DEBUG" | "TRACE" => format!("\x1b[36m{}\x1b[0m", level),
_ => format!("\x1b[32m{}\x1b[0m", level),
};
let attrs = &parsed["attributes"];
let mut extra = String::new();
for key in &["tenant", "service", "table", "operation"] {
if let Some(val) = attrs[key].as_str() {
if !extra.is_empty() {
extra.push(' ');
}
extra.push_str(&format!("{}={}", key, val));
}
}
let scope = parsed["instrumentation_scope"]["name"]
.as_str()
.unwrap_or("");
let target = if !scope.is_empty() {
format!(" \x1b[90m{}\x1b[0m", scope)
} else {
String::new()
};
let ts = format_timestamp(ts_nanos);
let ts_prefix = if ts.is_empty() {
String::new()
} else {
format!("\x1b[90m{}\x1b[0m ", ts)
};
if extra.is_empty() {
format!("{}[{}]{} {}", ts_prefix, level_colored, target, message)
} else {
format!(
"{}[{}]{} {} \x1b[90m{}\x1b[0m",
ts_prefix, level_colored, target, message, extra
)
}
} else {
line.to_string()
}
}