use crate::daemon_id::DaemonId;
use crate::log_store::sqlite::LOG_STORE;
use crate::log_store::{LogQuery, LogStore};
use crate::pitchfork_toml::PitchforkToml;
use crate::state_file::StateFile;
use crate::ui::style::{edim, estyle, ndim};
use crate::{Result, env};
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone};
use console;
use itertools::Itertools;
use miette::IntoDiagnostic;
use std::collections::BTreeSet;
use std::io::{self, IsTerminal, Write};
use std::process::{Child, Command, Stdio};
use std::time::Duration;
struct PagerConfig {
command: String,
args: Vec<String>,
}
impl PagerConfig {
fn new(start_at_end: bool) -> Self {
let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
let args = Self::build_args(&command, start_at_end);
Self { command, args }
}
fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
let mut args = vec![];
if pager == "less" {
args.push("-R".to_string());
if start_at_end {
args.push("+G".to_string());
}
}
args
}
fn spawn_piped(&self) -> io::Result<Child> {
Command::new(&self.command)
.args(&self.args)
.stdin(Stdio::piped())
.spawn()
}
}
fn format_log_line(
date: &str,
id: &str,
msg: &str,
single_daemon: bool,
id_width: usize,
strip_ansi: bool,
) -> String {
let msg = if strip_ansi {
console::strip_ansi_codes(msg).to_string()
} else {
msg.to_string()
};
if single_daemon {
format!("{} {}", ndim(date), msg)
} else {
let colors_on = !strip_ansi && console::colors_enabled();
let colored = dimmed_id(id, colors_on);
let padded = console::pad_str(&colored, id_width, console::Alignment::Left, None);
format!("{} {} {}", padded, ndim(date), msg)
}
}
fn dimmed_id(id: &str, colors_enabled: bool) -> String {
if !colors_enabled {
return id.to_string();
}
let colors = [
(180, 120, 120), (180, 160, 100), (120, 180, 120), (120, 180, 180), (180, 120, 180), (120, 160, 180), ];
let mut h: usize = 0x811C_9DC5; for b in id.bytes() {
h = h.wrapping_mul(0x0100_0193).wrapping_add(b as usize);
}
let (r, g, b) = colors[h % colors.len()];
format!("\x1b[2;38;2;{};{};{}m{}\x1b[0m", r, g, b, id)
}
pub fn colored_id_label(id: &str, colors_enabled: bool) -> String {
if !colors_enabled {
return format!("[{}]", id);
}
let colors: [u8; 4] = [34, 35, 36, 32]; let mut h: usize = 0x811C_9DC5; for b in id.bytes() {
h = h.wrapping_mul(0x0100_0193).wrapping_add(b as usize);
}
let color = colors[h % colors.len()];
format!("\x1b[{color}m[{id}]\x1b[0m")
}
#[derive(Debug, clap::Args)]
#[clap(
visible_alias = "l",
verbatim_doc_comment,
long_about = "\
Displays logs for daemon(s)
Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
and include timestamps for filtering.
Examples:
pitchfork logs api Show all logs for 'api' (paged if needed)
pitchfork logs api worker Show logs for multiple daemons
pitchfork logs Show logs for all daemons
pitchfork logs api -n 50 Show last 50 lines
pitchfork logs api --follow Follow logs in real-time
pitchfork logs api --since '2024-01-15 10:00:00'
Show logs since a specific time (forward)
pitchfork logs api --since '10:30:00'
Show logs since 10:30:00 today
pitchfork logs api --since '10:30' --until '12:00'
Show logs since 10:30:00 until 12:00:00 today
pitchfork logs api --since 5min Show logs from last 5 minutes
pitchfork logs api --raw Output raw log lines without formatting
pitchfork logs api --raw -n 100 Output last 100 raw log lines
pitchfork logs api --clear Delete logs for 'api'
pitchfork logs --clear Delete logs for all daemons"
)]
pub struct Logs {
id: Vec<String>,
#[clap(short, long)]
clear: bool,
#[clap(short)]
n: Option<usize>,
#[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
tail: bool,
#[clap(short = 's', long)]
since: Option<String>,
#[clap(short = 'u', long)]
until: Option<String>,
#[clap(long)]
no_pager: bool,
#[clap(long)]
raw: bool,
}
impl Logs {
pub async fn run(&self) -> Result<()> {
migrate_legacy_log_dirs();
let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
get_all_daemon_ids()?
} else {
PitchforkToml::resolve_ids(&self.id)?
};
if self.clear {
LOG_STORE.clear(&resolved_ids)?;
return Ok(());
}
let from = if let Some(since) = self.since.as_ref() {
Some(parse_time_input(since, true)?)
} else {
None
};
let to = if let Some(until) = self.until.as_ref() {
Some(parse_time_input(until, false)?)
} else {
None
};
let single_daemon = resolved_ids.len() == 1;
self.print_existing_logs(&resolved_ids, from, to, single_daemon, self.tail)?;
if self.tail {
tail_logs(&resolved_ids, single_daemon, true).await?;
}
Ok(())
}
fn print_existing_logs(
&self,
resolved_ids: &[DaemonId],
from: Option<DateTime<Local>>,
to: Option<DateTime<Local>>,
single_daemon: bool,
force_no_pager: bool,
) -> Result<()> {
let daemon_ids: Vec<String> = resolved_ids.iter().map(|id| id.qualified()).collect();
let id_width = daemon_ids.iter().map(|id| id.len()).max().unwrap_or(0);
trace!("log query for: {}", daemon_ids.iter().join(", "));
let has_time_filter = from.is_some() || to.is_some();
let opts = LogQuery {
daemon_ids: daemon_ids.clone(),
from,
to,
limit: if !has_time_filter { self.n } else { None },
order_desc: !has_time_filter,
after_id: None,
};
let entries = LOG_STORE.query(&opts)?;
let log_lines: Vec<(String, String, String)> = entries
.into_iter()
.map(|e| {
let ts = e.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
(ts, e.daemon_id, e.message)
})
.collect();
let log_lines = if has_time_filter {
if let Some(n) = self.n {
let len = log_lines.len();
if len > n {
log_lines.into_iter().skip(len - n).collect_vec()
} else {
log_lines
}
} else {
log_lines
}
} else if let Some(n) = self.n {
let len = log_lines.len();
if len > n {
log_lines.into_iter().skip(len - n).rev().collect_vec()
} else {
log_lines.into_iter().rev().collect_vec()
}
} else {
log_lines.into_iter().rev().collect_vec()
};
self.output_logs(
log_lines,
single_daemon,
id_width,
has_time_filter,
self.raw,
force_no_pager,
)?;
Ok(())
}
fn output_logs(
&self,
log_lines: Vec<(String, String, String)>,
single_daemon: bool,
id_width: usize,
has_time_filter: bool,
raw: bool,
force_no_pager: bool,
) -> Result<()> {
if log_lines.is_empty() {
return Ok(());
}
let strip_ansi = raw || !console::colors_enabled();
if raw {
for (date, id, msg) in log_lines {
let line = format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi);
println!("{line}");
}
return Ok(());
}
let use_pager = !force_no_pager && !self.no_pager && should_use_pager(log_lines.len());
if use_pager {
self.output_with_pager(
log_lines,
single_daemon,
id_width,
has_time_filter,
strip_ansi,
)?;
} else {
for (date, id, msg) in log_lines {
println!(
"{}",
format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
);
}
}
Ok(())
}
fn output_with_pager(
&self,
log_lines: Vec<(String, String, String)>,
single_daemon: bool,
id_width: usize,
has_time_filter: bool,
strip_ansi: bool,
) -> Result<()> {
let pager_config = PagerConfig::new(!has_time_filter);
match pager_config.spawn_piped() {
Ok(mut child) => {
if let Some(stdin) = child.stdin.as_mut() {
for (date, id, msg) in log_lines {
let line = format!(
"{}\n",
format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
);
if stdin.write_all(line.as_bytes()).is_err() {
break;
}
}
let _ = child.wait();
} else {
debug!("Failed to get pager stdin, falling back to direct output");
for (date, id, msg) in log_lines {
println!(
"{}",
format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
);
}
}
}
Err(e) => {
debug!("Failed to spawn pager: {e}, falling back to direct output");
for (date, id, msg) in log_lines {
println!(
"{}",
format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
);
}
}
}
Ok(())
}
}
fn should_use_pager(line_count: usize) -> bool {
if !io::stdout().is_terminal() {
return false;
}
let terminal_height = get_terminal_height().unwrap_or(24);
line_count > terminal_height
}
fn get_terminal_height() -> Option<usize> {
if let Ok(rows) = std::env::var("LINES")
&& let Ok(h) = rows.parse::<usize>()
{
return Some(h);
}
crossterm::terminal::size().ok().map(|(_, h)| h as usize)
}
fn migrate_legacy_log_dirs() {
let known_safe_paths = known_daemon_safe_paths();
let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
Ok(d) => d,
Err(_) => return,
};
for dir in dirs {
if dir.starts_with(".") || !dir.is_dir() {
continue;
}
let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
Some(n) => n,
None => continue,
};
if name == "pitchfork" {
continue;
}
if name.contains("--") {
if DaemonId::from_safe_path(&name).is_ok() {
continue;
}
if known_safe_paths.contains(&name) {
continue;
}
warn!(
"Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
);
continue;
}
let old_log = dir.join(format!("{name}.log"));
if !old_log.exists() {
continue;
}
if DaemonId::try_new("legacy", &name).is_err() {
warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
continue;
}
let new_name = format!("legacy--{name}");
let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
if new_dir.exists() {
continue;
}
if std::fs::rename(&dir, &new_dir).is_err() {
continue;
}
let old_log = new_dir.join(format!("{name}.log"));
let new_log = new_dir.join(format!("{new_name}.log"));
if old_log.exists() {
let _ = std::fs::rename(&old_log, &new_log);
}
debug!("Migrated legacy log dir '{name}' → '{new_name}'");
}
}
fn known_daemon_safe_paths() -> BTreeSet<String> {
let mut out = BTreeSet::new();
match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
Ok(state) => {
for id in state.daemons.keys() {
out.insert(id.safe_path());
}
}
Err(e) => {
warn!("Failed to read state while checking known daemon IDs: {e}");
}
}
match PitchforkToml::all_merged() {
Ok(config) => {
for id in config.daemons.keys() {
out.insert(id.safe_path());
}
}
Err(e) => {
warn!("Failed to read config while checking known daemon IDs: {e}");
}
}
out
}
fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
let mut ids = BTreeSet::new();
match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
Ok(state) => ids.extend(state.daemons.keys().cloned()),
Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
}
match PitchforkToml::all_merged() {
Ok(config) => ids.extend(config.daemons.keys().cloned()),
Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
}
let logged_ids: std::collections::HashSet<String> =
LOG_STORE.list_daemon_ids()?.into_iter().collect();
Ok(ids
.into_iter()
.filter(|id| logged_ids.contains(&id.qualified()))
.collect())
}
pub async fn tail_logs(
names: &[DaemonId],
single_daemon: bool,
start_from_end: bool,
) -> Result<()> {
let id_width = names
.iter()
.map(|id| id.qualified().len())
.max()
.unwrap_or(0);
let strip_ansi = !console::colors_enabled();
let mut states: std::collections::HashMap<String, i64> = names
.iter()
.map(|id| {
let since = if start_from_end {
match LOG_STORE.query(&LogQuery {
daemon_ids: vec![id.qualified()],
from: None,
to: None,
limit: Some(1),
order_desc: true,
after_id: None,
}) {
Ok(entries) => entries.first().map(|e| e.id).unwrap_or(0),
Err(_) => 0,
}
} else {
0
};
(id.qualified(), since)
})
.collect();
let interval = tokio::time::interval(Duration::from_millis(200));
tokio::pin!(interval);
loop {
interval.tick().await;
let mut out = vec![];
for id in names {
let after_id = states.get(&id.qualified()).copied();
match LOG_STORE.tail(id, after_id) {
Ok(entries) => {
for entry in &entries {
let ts = entry.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
out.push((ts, entry.daemon_id.clone(), entry.message.clone()));
}
if let Some(last) = entries.last() {
states.insert(id.qualified(), last.id);
}
}
Err(e) => {
error!("Failed to tail logs for {}: {e}", id.qualified());
}
}
}
if !out.is_empty() {
let out = out
.into_iter()
.sorted_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)))
.collect_vec();
for (date, name, msg) in out {
println!(
"{}",
format_log_line(&date, &name, &msg, single_daemon, id_width, strip_ansi)
);
}
}
}
}
fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
Local
.from_local_datetime(&naive_dt)
.single()
.ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
}
fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
let s = s.trim();
if let Ok(dt) = parse_datetime(s) {
return Ok(dt);
}
if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
return Local
.from_local_datetime(&naive_dt)
.single()
.ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
}
if let Ok(time) = parse_time_only(s) {
let now = Local::now();
let today = now.date_naive();
let mut naive_dt = NaiveDateTime::new(today, time);
let mut dt = Local
.from_local_datetime(&naive_dt)
.single()
.ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
if is_since
&& dt > now
&& let Some(yesterday) = today.pred_opt()
{
naive_dt = NaiveDateTime::new(yesterday, time);
dt = Local
.from_local_datetime(&naive_dt)
.single()
.ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
}
return Ok(dt);
}
if let Ok(duration) = humantime::parse_duration(s) {
let now = Local::now();
let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
return Ok(target);
}
Err(miette::miette!(
"Invalid time format: '{}'. Expected formats:\n\
- Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
- Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
- Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
s
))
}
fn parse_time_only(s: &str) -> Result<NaiveTime> {
if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
return Ok(time);
}
if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
return Ok(time);
}
Err(miette::miette!("Invalid time format: '{}'", s))
}
pub fn print_error_logs_block(log_lines: &[(String, String, String)]) {
if log_lines.is_empty() {
return;
}
let is_tty = std::io::stderr().is_terminal();
let format_msg = |msg: &str| -> String {
let stripped = strip_pty_controls(msg);
if is_tty {
stripped
} else {
console::strip_ansi_codes(&stripped).to_string()
}
};
let tag = estyle(" ERROR LOGS ").white().on_red();
eprintln!("\n{tag}");
let unique_ids: BTreeSet<&str> = log_lines.iter().map(|(_, id, _)| id.as_str()).collect();
let show_id = unique_ids.len() > 1;
if show_id {
let id_width = log_lines
.iter()
.map(|(_, id, _)| console::measure_text_width(id))
.max()
.unwrap_or(0);
for (date, id, msg) in log_lines {
let time = date.split(' ').nth(1).unwrap_or(date);
let colored = dimmed_id(id, is_tty && console::colors_enabled_stderr());
let padded = console::pad_str(&colored, id_width, console::Alignment::Left, None);
eprintln!(
"{} {} {}",
padded,
estyle(time).red().dim(),
format_msg(msg)
);
}
} else {
for (date, _, msg) in log_lines {
let time = date.split(' ').nth(1).unwrap_or(date);
eprintln!("{} {}", estyle(time).red().dim(), format_msg(msg));
}
}
}
pub enum ReadyCheckType {
Output(String),
Http(String),
Port(u16),
Cmd(String),
Delay(u64),
Default,
}
impl std::fmt::Display for ReadyCheckType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ReadyCheckType::Output(pattern) => write!(f, "output matching '{pattern}'"),
ReadyCheckType::Http(url) => write!(f, "HTTP {url}"),
ReadyCheckType::Port(port) => write!(f, "TCP port {port}"),
ReadyCheckType::Cmd(cmd) => write!(f, "command '{cmd}'"),
ReadyCheckType::Delay(secs) => write!(f, "delay ({secs}s)"),
ReadyCheckType::Default => write!(f, "default readiness check"),
}
}
}
pub fn create_ready_check_job(
daemon_id: &DaemonId,
check_type: &ReadyCheckType,
) -> std::sync::Arc<clx::progress::ProgressJob> {
use clx::progress::{ProgressJobBuilder, ProgressJobDoneBehavior, ProgressStatus};
let is_tty = std::io::stderr().is_terminal();
let colors_enabled = is_tty && console::colors_enabled_stderr();
let id_label = colored_id_label(&daemon_id.qualified(), colors_enabled);
let show_ts = crate::settings::settings().general.startup_log_timestamps;
let prefix = if show_ts {
edim(chrono::Local::now().format("%H:%M:%S").to_string()).to_string()
} else {
"{{spinner()}}".to_string()
};
ProgressJobBuilder::new()
.body(format!(
"{} {} waiting for {{{{ check_type }}}}...",
prefix, id_label
))
.prop("check_type", &check_type.to_string())
.status(ProgressStatus::Running)
.on_done(ProgressJobDoneBehavior::Keep)
.start()
}
pub fn collect_startup_logs(
daemon_id: &DaemonId,
from: DateTime<Local>,
) -> Result<Vec<(String, String, String)>> {
let entries = LOG_STORE.query(&LogQuery {
daemon_ids: vec![daemon_id.qualified()],
from: Some(from),
to: None,
limit: None,
order_desc: false,
after_id: None,
})?;
let log_lines = entries
.into_iter()
.map(|e| {
let ts = e.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
(ts, e.daemon_id, e.message)
})
.collect();
Ok(log_lines)
}
pub fn stream_startup_logs(
daemon_id: &DaemonId,
from: DateTime<Local>,
job: std::sync::Arc<clx::progress::ProgressJob>,
) -> (
tokio::sync::watch::Sender<bool>,
tokio::task::JoinHandle<()>,
) {
let (tx, mut rx) = tokio::sync::watch::channel(false);
let id = daemon_id.clone();
let show_ts = crate::settings::settings().general.startup_log_timestamps;
let handle = tokio::spawn(async move {
let is_tty = std::io::stderr().is_terminal();
let colors_enabled = is_tty && console::colors_enabled_stderr();
let id_label = colored_id_label(&id.qualified(), colors_enabled);
let prefix = if show_ts {
String::new()
} else {
edim("•").to_string()
};
let mut last_id: i64 = 0;
let initial_entries = LOG_STORE.query(&LogQuery {
daemon_ids: vec![id.qualified()],
from: Some(from),
to: None,
limit: None,
order_desc: false,
after_id: None,
});
if let Ok(entries) = initial_entries {
for entry in &entries {
let time = entry.timestamp.format("%H:%M:%S").to_string();
let msg = strip_pty_controls(&entry.message);
let msg = if is_tty {
msg
} else {
console::strip_ansi_codes(&msg).to_string()
};
let line_prefix = if show_ts {
edim(time).to_string()
} else {
prefix.clone()
};
job.println(&format!("{} {} {}", line_prefix, id_label, msg));
}
if let Some(last) = entries.last() {
last_id = last.id;
}
}
loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(200)) => {
if let Ok(entries) = LOG_STORE.tail(&id, Some(last_id)) {
for entry in &entries {
let time = entry.timestamp.format("%H:%M:%S").to_string();
let msg = strip_pty_controls(&entry.message);
let msg = if is_tty {
msg
} else {
console::strip_ansi_codes(&msg).to_string()
};
let line_prefix = if show_ts {
edim(time).to_string()
} else {
prefix.clone()
};
job.println(&format!("{} {} {}", line_prefix, id_label, msg));
}
if let Some(last) = entries.last() {
last_id = last.id;
}
}
}
_ = rx.changed() => {
break;
}
}
}
if let Ok(entries) = LOG_STORE.tail(&id, Some(last_id)) {
for entry in &entries {
let time = entry.timestamp.format("%H:%M:%S").to_string();
let msg = strip_pty_controls(&entry.message);
let msg = if is_tty {
msg
} else {
console::strip_ansi_codes(&msg).to_string()
};
let line_prefix = if show_ts {
edim(time).to_string()
} else {
prefix.clone()
};
job.println(&format!("{} {} {}", line_prefix, id_label, msg));
}
}
});
(tx, handle)
}
fn strip_pty_controls(s: &str) -> String {
struct Stripper {
result: String,
}
impl vte::Perform for Stripper {
fn print(&mut self, c: char) {
self.result.push(c);
}
fn execute(&mut self, byte: u8) {
if byte == b'\n' || byte == b'\t' {
self.result.push(byte as char);
}
}
fn csi_dispatch(
&mut self,
params: &vte::Params,
_intermediates: &[u8],
_ignore: bool,
action: char,
) {
if action == 'm' {
self.result.push_str("\x1b[");
let mut first = true;
for sub in params.iter() {
if !first {
self.result.push(';');
}
first = false;
for (i, &p) in sub.iter().enumerate() {
if i > 0 {
self.result.push(':');
}
self.result.push_str(&p.to_string());
}
}
self.result.push('m');
}
}
fn osc_dispatch(&mut self, _params: &[&[u8]], _bell_terminated: bool) {
}
fn esc_dispatch(&mut self, _intermediates: &[u8], _ignore: bool, _byte: u8) {
}
fn hook(
&mut self,
_params: &vte::Params,
_intermediates: &[u8],
_ignore: bool,
_action: char,
) {
}
fn put(&mut self, _byte: u8) {
}
fn unhook(&mut self) {
}
}
let mut parser = vte::Parser::new();
let mut stripper = Stripper {
result: String::with_capacity(s.len()),
};
parser.advance(&mut stripper, s.as_bytes());
stripper.result
}