use crate::daemon_id::DaemonId;
use crate::pitchfork_toml::PitchforkToml;
use crate::state_file::StateFile;
use crate::ui::style::edim;
use crate::watch_files::WatchFiles;
use crate::{Result, env};
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
use itertools::Itertools;
use miette::IntoDiagnostic;
use notify::RecursiveMode;
use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap};
use std::fs::{self, File};
use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::time::Duration;
use xx::regex;
/// Resolved pager invocation: executable name plus the arguments to pass it.
struct PagerConfig {
    command: String,
    args: Vec<String>,
}

impl PagerConfig {
    /// Resolve the pager from `$PAGER` (defaulting to `less`) and build its
    /// argument list. `start_at_end` asks `less` to open at the last line.
    fn new(start_at_end: bool) -> Self {
        let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
        let args = Self::build_args(&command, start_at_end);
        Self { command, args }
    }

    /// Flags are only added when the pager is `less`: `-R` preserves ANSI
    /// colors, `+G` jumps to the end. Other pagers get no arguments.
    fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
        if pager != "less" {
            return vec![];
        }
        let mut flags = vec!["-R".to_string()];
        if start_at_end {
            flags.push("+G".to_string());
        }
        flags
    }

    /// Spawn the pager process with a piped stdin for us to write into.
    fn spawn_piped(&self) -> io::Result<Child> {
        Command::new(&self.command)
            .args(&self.args)
            .stdin(Stdio::piped())
            .spawn()
    }
}
fn format_log_line(date: &str, id: &str, msg: &str, single_daemon: bool) -> String {
if single_daemon {
format!("{} {}", edim(date), msg)
} else {
format!("{} {} {}", edim(date), id, msg)
}
}
/// One parsed log record, tagged with the daemon it came from and the index
/// of the source iterator it was pulled out of.
#[derive(Debug)]
struct LogEntry {
    timestamp: String,
    daemon: String,
    message: String,
    source_idx: usize,
}

// Equality and ordering are by timestamp alone: the merge heap only needs
// chronological order, not full record identity.
impl PartialEq for LogEntry {
    fn eq(&self, other: &Self) -> bool {
        self.timestamp == other.timestamp
    }
}

impl Eq for LogEntry {}

impl PartialOrd for LogEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for LogEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.timestamp.cmp(&other.timestamp)
    }
}

/// K-way merge of per-daemon log streams, yielding entries in timestamp
/// order. Each source contributes at most one pending entry to the heap at a
/// time, so memory stays proportional to the number of sources rather than
/// the size of the logs.
struct StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    sources: Vec<(String, I)>,
    // Min-heap by timestamp (BinaryHeap is a max-heap, hence Reverse).
    heap: BinaryHeap<Reverse<LogEntry>>,
}

impl<I> StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    fn new() -> Self {
        Self {
            sources: Vec::new(),
            heap: BinaryHeap::new(),
        }
    }

    /// Register another (daemon name, `(timestamp, message)` iterator) pair.
    fn add_source(&mut self, daemon_name: String, iter: I) {
        self.sources.push((daemon_name, iter));
    }

    /// Prime the heap with the first entry of every source. Must be called
    /// once, after all sources are added and before iterating.
    fn initialize(&mut self) {
        for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
            let Some((timestamp, message)) = iter.next() else {
                continue;
            };
            self.heap.push(Reverse(LogEntry {
                timestamp,
                daemon: daemon.clone(),
                message,
                source_idx: idx,
            }));
        }
    }
}

impl<I> Iterator for StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    /// (timestamp, daemon name, message)
    type Item = (String, String, String);

    fn next(&mut self) -> Option<Self::Item> {
        // Pop the chronologically smallest pending entry, then refill the
        // heap from the same source so every stream stays represented.
        let Reverse(entry) = self.heap.pop()?;
        let (daemon, iter) = &mut self.sources[entry.source_idx];
        if let Some((timestamp, message)) = iter.next() {
            self.heap.push(Reverse(LogEntry {
                timestamp,
                daemon: daemon.clone(),
                message,
                source_idx: entry.source_idx,
            }));
        }
        Some((entry.timestamp, entry.daemon, entry.message))
    }
}
/// Streams a single daemon log file, yielding `(timestamp, message)` pairs.
///
/// A record starts with a `YYYY-MM-DD HH:MM:SS <daemon> ` prefix; subsequent
/// lines without that prefix are folded into the previous record's message
/// (joined with `\n`). Lines appearing before the first timestamped record
/// are discarded.
struct StreamingLogParser {
    reader: BufReader<File>,
    // Record currently being accumulated; emitted once the next record
    // header (or EOF / a read error) is seen.
    current_entry: Option<(String, String)>,
    finished: bool,
}

impl StreamingLogParser {
    fn new(file: File) -> Self {
        Self {
            reader: BufReader::new(file),
            current_entry: None,
            finished: false,
        }
    }
}

impl Iterator for StreamingLogParser {
    type Item = (String, String);

    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }
        let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
        loop {
            let mut line = String::new();
            // EOF and read errors both end the stream, flushing whatever
            // record was accumulated so far.
            let bytes = self.reader.read_line(&mut line).unwrap_or(0);
            if bytes == 0 {
                self.finished = true;
                return self.current_entry.take();
            }
            // Trim the trailing newline (and the CR of CRLF files).
            if line.ends_with('\n') {
                line.pop();
                if line.ends_with('\r') {
                    line.pop();
                }
            }
            match re.captures(&line) {
                Some(caps) => {
                    // Groups 1 (timestamp) and 3 (message) always exist when
                    // the pattern matches; skip the line defensively if not.
                    let (Some(date), Some(msg)) = (caps.get(1), caps.get(3)) else {
                        continue;
                    };
                    // Start the new record; emit the previous one, if any.
                    let done = self
                        .current_entry
                        .replace((date.as_str().to_string(), msg.as_str().to_string()));
                    if done.is_some() {
                        return done;
                    }
                }
                None => {
                    // Continuation line: append to the in-flight record.
                    if let Some((_, msg)) = self.current_entry.as_mut() {
                        msg.push('\n');
                        msg.push_str(&line);
                    }
                }
            }
        }
    }
}
// NOTE: field comments below deliberately use `//` rather than `///` —
// clap's derive turns doc comments into CLI help text, and the help is
// already fully specified via `long_about`.
#[derive(Debug, clap::Args)]
#[clap(
    visible_alias = "l",
    verbatim_doc_comment,
    long_about = "\
Displays logs for daemon(s)
Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
and include timestamps for filtering.
Examples:
pitchfork logs api Show all logs for 'api' (paged if needed)
pitchfork logs api worker Show logs for multiple daemons
pitchfork logs Show logs for all daemons
pitchfork logs api -n 50 Show last 50 lines
pitchfork logs api --follow Follow logs in real-time
pitchfork logs api --since '2024-01-15 10:00:00'
Show logs since a specific time (forward)
pitchfork logs api --since '10:30:00'
Show logs since 10:30:00 today
pitchfork logs api --since '10:30' --until '12:00'
Show logs since 10:30:00 until 12:00:00 today
pitchfork logs api --since 5min Show logs from last 5 minutes
pitchfork logs api --raw Output raw log lines without formatting
pitchfork logs api --raw -n 100 Output last 100 raw log lines
pitchfork logs api --clear Delete logs for 'api'
pitchfork logs --clear Delete logs for all daemons"
)]
pub struct Logs {
    // Daemon id(s) to show logs for; empty means every known daemon that
    // has a log file on disk.
    id: Vec<String>,
    // Truncate the selected daemons' log files instead of displaying them.
    #[clap(short, long)]
    clear: bool,
    // Only show the last N entries.
    #[clap(short)]
    n: Option<usize>,
    // Keep running and stream new lines as they are written (`-f`/--follow).
    #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
    tail: bool,
    // Lower time bound; see parse_time_input for accepted formats.
    #[clap(short = 's', long)]
    since: Option<String>,
    // Upper time bound; see parse_time_input for accepted formats.
    #[clap(short = 'u', long)]
    until: Option<String>,
    // Never pipe output through $PAGER.
    #[clap(long)]
    no_pager: bool,
    // Emit unformatted (uncolored) log lines.
    #[clap(long)]
    raw: bool,
}
impl Logs {
    /// Entry point for `pitchfork logs`: migrates legacy log directories,
    /// resolves the target daemons, then either clears their logs or prints
    /// (and optionally tails) them.
    pub async fn run(&self) -> Result<()> {
        migrate_legacy_log_dirs();
        // No ids on the command line means "every daemon with a log file".
        let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
            get_all_daemon_ids()?
        } else {
            PitchforkToml::resolve_ids(&self.id)?
        };
        if self.clear {
            for id in &resolved_ids {
                let path = id.log_path();
                if path.exists() {
                    // Re-creating the file truncates it in place.
                    xx::file::create(&path)?;
                }
            }
            return Ok(());
        }
        // --since parses in "since" mode (may roll the date back a day).
        let from = if let Some(since) = self.since.as_ref() {
            Some(parse_time_input(since, true)?)
        } else {
            None
        };
        let to = if let Some(until) = self.until.as_ref() {
            Some(parse_time_input(until, false)?)
        } else {
            None
        };
        self.print_existing_logs(&resolved_ids, from, to)?;
        if self.tail {
            // --follow: keep streaming new lines until interrupted.
            tail_logs(&resolved_ids).await?;
        }
        Ok(())
    }

    /// Print the logs already on disk, choosing a strategy by flags:
    /// time filters read forward (binary search bounds), bare `-n` reads the
    /// files backwards, and the unfiltered case streams lazily to a pager.
    fn print_existing_logs(
        &self,
        resolved_ids: &[DaemonId],
        from: Option<DateTime<Local>>,
        to: Option<DateTime<Local>>,
    ) -> Result<()> {
        let log_files = get_log_file_infos(resolved_ids)?;
        trace!("log files for: {}", log_files.keys().join(", "));
        // With a single daemon the id column is omitted from every line.
        let single_daemon = resolved_ids.len() == 1;
        let has_time_filter = from.is_some() || to.is_some();
        if has_time_filter {
            let mut log_lines = self.collect_log_lines_forward(&log_files, from, to)?;
            // -n combined with a time filter keeps only the newest n entries
            // within the window.
            if let Some(n) = self.n {
                let len = log_lines.len();
                if len > n {
                    log_lines = log_lines.into_iter().skip(len - n).collect_vec();
                }
            }
            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
        } else if let Some(n) = self.n {
            let log_lines = self.collect_log_lines_reverse(&log_files, Some(n))?;
            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
        } else {
            // No filters: stream so huge logs never sit in memory.
            self.stream_logs_to_pager(&log_files, single_daemon, self.raw)?;
        }
        Ok(())
    }

    /// Read each file forward, keeping only lines inside the [from, to]
    /// window, then interleave all daemons chronologically.
    fn collect_log_lines_forward(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        from: Option<DateTime<Local>>,
        to: Option<DateTime<Local>>,
    ) -> Result<Vec<(String, String, String)>> {
        let log_lines: Vec<(String, String, String)> = log_files
            .iter()
            .flat_map(
                |(name, lf)| match read_lines_in_time_range(&lf.path, from, to) {
                    Ok(lines) => merge_log_lines(&name.qualified(), lines, false),
                    Err(e) => {
                        // One unreadable file shouldn't abort the others.
                        error!("{}: {}", lf.path.display(), e);
                        vec![]
                    }
                },
            )
            // Timestamps are zero-padded, so lexicographic order is
            // chronological order.
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();
        Ok(log_lines)
    }

    /// Read each file from the end (cheap for `-n` on large files), take up
    /// to `limit` raw lines per daemon, then merge and trim to `limit`
    /// entries overall.
    fn collect_log_lines_reverse(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        limit: Option<usize>,
    ) -> Result<Vec<(String, String, String)>> {
        let log_lines: Vec<(String, String, String)> = log_files
            .iter()
            .flat_map(|(daemon_id, lf)| {
                let rev = match xx::file::open(&lf.path) {
                    Ok(f) => rev_lines::RevLines::new(f),
                    Err(e) => {
                        error!("{}: {}", lf.path.display(), e);
                        return vec![];
                    }
                };
                let lines = rev.into_iter().filter_map(Result::ok);
                // Per-file cap: newest n raw lines (continuation lines count
                // toward the cap as well).
                let lines = match limit {
                    Some(n) => lines.take(n).collect_vec(),
                    None => lines.collect_vec(),
                };
                // reverse=true: lines arrive newest-first from RevLines.
                merge_log_lines(&daemon_id.qualified(), lines, true)
            })
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();
        // Global cap across all daemons: keep only the newest n entries.
        let log_lines = match limit {
            Some(n) => {
                let len = log_lines.len();
                if len > n {
                    log_lines.into_iter().skip(len - n).collect_vec()
                } else {
                    log_lines
                }
            }
            None => log_lines,
        };
        Ok(log_lines)
    }

    /// Print collected entries, choosing raw, paged, or plain output.
    fn output_logs(
        &self,
        log_lines: Vec<(String, String, String)>,
        single_daemon: bool,
        has_time_filter: bool,
        raw: bool,
    ) -> Result<()> {
        if log_lines.is_empty() {
            return Ok(());
        }
        if raw {
            // --raw: no styling and never a pager.
            for (date, id, msg) in log_lines {
                if single_daemon {
                    println!("{date} {msg}");
                } else {
                    println!("{date} {id} {msg}");
                }
            }
            return Ok(());
        }
        // Only page when interactive and the output exceeds one screen.
        let use_pager = !self.no_pager && should_use_pager(log_lines.len());
        if use_pager {
            self.output_with_pager(log_lines, single_daemon, has_time_filter)?;
        } else {
            for (date, id, msg) in log_lines {
                println!("{}", format_log_line(&date, &id, &msg, single_daemon));
            }
        }
        Ok(())
    }

    /// Pipe formatted entries through the user's pager, degrading to plain
    /// stdout if the pager can't be spawned or its stdin is unavailable.
    fn output_with_pager(
        &self,
        log_lines: Vec<(String, String, String)>,
        single_daemon: bool,
        has_time_filter: bool,
    ) -> Result<()> {
        // With a time filter the interesting lines are at the top, so don't
        // ask the pager to start at the end.
        let pager_config = PagerConfig::new(!has_time_filter);
        match pager_config.spawn_piped() {
            Ok(mut child) => {
                if let Some(stdin) = child.stdin.as_mut() {
                    for (date, id, msg) in log_lines {
                        let line =
                            format!("{}\n", format_log_line(&date, &id, &msg, single_daemon));
                        // A write error means the user quit the pager; stop
                        // writing but still wait for it to exit.
                        if stdin.write_all(line.as_bytes()).is_err() {
                            break;
                        }
                    }
                    let _ = child.wait();
                } else {
                    debug!("Failed to get pager stdin, falling back to direct output");
                    for (date, id, msg) in log_lines {
                        println!("{}", format_log_line(&date, &id, &msg, single_daemon));
                    }
                }
            }
            Err(e) => {
                debug!("Failed to spawn pager: {e}, falling back to direct output");
                for (date, id, msg) in log_lines {
                    println!("{}", format_log_line(&date, &id, &msg, single_daemon));
                }
            }
        }
        Ok(())
    }

    /// Stream whole log files through the pager without collecting them in
    /// memory. Falls back to direct stdout streaming when paging is
    /// unsuitable (non-tty, --no-pager, --follow, --raw) or fails.
    fn stream_logs_to_pager(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        single_daemon: bool,
        raw: bool,
    ) -> Result<()> {
        if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
            return self.stream_logs_direct(log_files, single_daemon, raw);
        }
        let pager_config = PagerConfig::new(true);
        match pager_config.spawn_piped() {
            Ok(mut child) => {
                if let Some(stdin) = child.stdin.take() {
                    // Clone the names/paths so the writer thread owns its data.
                    let log_files_clone: Vec<_> = log_files
                        .iter()
                        .map(|(daemon_id, lf)| (daemon_id.qualified(), lf.path.clone()))
                        .collect();
                    let single_daemon_clone = single_daemon;
                    // Feed the pager from a separate thread so this thread
                    // can block on the pager process itself.
                    std::thread::spawn(move || {
                        let mut writer = BufWriter::new(stdin);
                        if log_files_clone.len() == 1 {
                            // Single file: no merging needed.
                            let (name, path) = &log_files_clone[0];
                            let file = match File::open(path) {
                                Ok(f) => f,
                                Err(_) => return,
                            };
                            let parser = StreamingLogParser::new(file);
                            for (timestamp, message) in parser {
                                let output = format!(
                                    "{}\n",
                                    format_log_line(
                                        &timestamp,
                                        name,
                                        &message,
                                        single_daemon_clone
                                    )
                                );
                                // Write errors mean the pager exited.
                                if writer.write_all(output.as_bytes()).is_err() {
                                    return;
                                }
                            }
                            let _ = writer.flush();
                            return;
                        }
                        // Multiple files: k-way merge them chronologically.
                        let mut merger: StreamingMerger<StreamingLogParser> =
                            StreamingMerger::new();
                        for (name, path) in log_files_clone {
                            let file = match File::open(&path) {
                                Ok(f) => f,
                                Err(_) => continue,
                            };
                            let parser = StreamingLogParser::new(file);
                            merger.add_source(name, parser);
                        }
                        merger.initialize();
                        for (timestamp, daemon, message) in merger {
                            let output = format!(
                                "{}\n",
                                format_log_line(&timestamp, &daemon, &message, single_daemon_clone)
                            );
                            if writer.write_all(output.as_bytes()).is_err() {
                                return;
                            }
                        }
                        let _ = writer.flush();
                    });
                    let _ = child.wait();
                } else {
                    debug!("Failed to get pager stdin, falling back to direct output");
                    return self.stream_logs_direct(log_files, single_daemon, raw);
                }
            }
            Err(e) => {
                debug!("Failed to spawn pager: {e}, falling back to direct output");
                return self.stream_logs_direct(log_files, single_daemon, raw);
            }
        }
        Ok(())
    }

    /// Stream logs straight to stdout (no pager). A failed stdout write
    /// (e.g. a closed downstream pipe) is treated as a normal early exit.
    fn stream_logs_direct(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        single_daemon: bool,
        raw: bool,
    ) -> Result<()> {
        if log_files.len() == 1 {
            let (daemon_id, lf) = log_files.iter().next().unwrap();
            let file = match File::open(&lf.path) {
                Ok(f) => f,
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    return Ok(());
                }
            };
            let reader = BufReader::new(file);
            if raw {
                // --raw: dump file lines untouched, unreadable lines skipped.
                for line in reader.lines() {
                    match line {
                        Ok(l) => {
                            if io::stdout().write_all(l.as_bytes()).is_err()
                                || io::stdout().write_all(b"\n").is_err()
                            {
                                return Ok(());
                            }
                        }
                        Err(_) => continue,
                    }
                }
            } else {
                let parser = StreamingLogParser::new(File::open(&lf.path).into_diagnostic()?);
                for (timestamp, message) in parser {
                    let output = format!(
                        "{}\n",
                        format_log_line(
                            &timestamp,
                            &daemon_id.qualified(),
                            &message,
                            single_daemon
                        )
                    );
                    if io::stdout().write_all(output.as_bytes()).is_err() {
                        return Ok(());
                    }
                }
            }
            return Ok(());
        }
        // Multiple files: chronological k-way merge, like the pager path.
        let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();
        for (daemon_id, lf) in log_files {
            let file = match File::open(&lf.path) {
                Ok(f) => f,
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    continue;
                }
            };
            let parser = StreamingLogParser::new(file);
            merger.add_source(daemon_id.qualified(), parser);
        }
        merger.initialize();
        for (timestamp, daemon, message) in merger {
            let output = if raw {
                if single_daemon {
                    format!("{timestamp} {message}\n")
                } else {
                    format!("{timestamp} {daemon} {message}\n")
                }
            } else {
                format!(
                    "{}\n",
                    format_log_line(&timestamp, &daemon, &message, single_daemon)
                )
            };
            if io::stdout().write_all(output.as_bytes()).is_err() {
                return Ok(());
            }
        }
        Ok(())
    }
}
/// Paging is only worthwhile on an interactive terminal, and only when the
/// output would not fit on one screen (default height: 24 rows).
fn should_use_pager(line_count: usize) -> bool {
    io::stdout().is_terminal() && line_count > get_terminal_height().unwrap_or(24)
}
fn get_terminal_height() -> Option<usize> {
if let Ok(rows) = std::env::var("LINES")
&& let Ok(h) = rows.parse::<usize>()
{
return Some(h);
}
crossterm::terminal::size().ok().map(|(_, h)| h as usize)
}
/// Return the lines of a log file whose timestamps fall between `from` and
/// `to` (either bound optional). The bounds are converted to byte offsets by
/// binary search, so large files are never read in full.
fn read_lines_in_time_range(
    path: &Path,
    from: Option<DateTime<Local>>,
    to: Option<DateTime<Local>>,
) -> Result<Vec<String>> {
    let mut file = File::open(path).into_diagnostic()?;
    let file_size = file.metadata().into_diagnostic()?.len();
    if file_size == 0 {
        return Ok(vec![]);
    }
    // Translate each time bound into a byte offset into the file.
    let start_pos = match from {
        Some(t) => binary_search_log_position(&mut file, file_size, t, true)?,
        None => 0,
    };
    let end_pos = match to {
        Some(t) => binary_search_log_position(&mut file, file_size, t, false)?,
        None => file_size,
    };
    if start_pos >= end_pos {
        return Ok(vec![]);
    }
    file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
    let mut reader = BufReader::new(&file);
    let mut lines = Vec::new();
    let mut pos = start_pos;
    // Read line by line until the end offset (or EOF / a read error).
    while pos < end_pos {
        let mut line = String::new();
        let Ok(n) = reader.read_line(&mut line) else {
            break;
        };
        if n == 0 {
            break;
        }
        pos += n as u64;
        // Strip the trailing newline (and the CR of CRLF files).
        if line.ends_with('\n') {
            line.pop();
            if line.ends_with('\r') {
                line.pop();
            }
        }
        lines.push(line);
    }
    Ok(lines)
}
/// Binary-search a log file for the byte offset bounding `target_time`.
///
/// With `find_start == true` the result is the offset of the first line whose
/// timestamp is >= `target_time`; otherwise it is the offset just past the
/// last line whose timestamp is <= `target_time`. Lines without a parseable
/// timestamp (multi-line continuations) are stepped over forward — this
/// assumes timestamps are non-decreasing through the file.
fn binary_search_log_position(
    file: &mut File,
    file_size: u64,
    target_time: DateTime<Local>,
    find_start: bool,
) -> Result<u64> {
    // low/high are byte offsets, not line numbers.
    let mut low: u64 = 0;
    let mut high: u64 = file_size;
    while low < high {
        let mid = low + (high - low) / 2;
        // Snap the probe back to the start of the line containing `mid` so
        // a whole line is always parsed.
        let line_start = find_line_start(file, mid)?;
        file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
        let mut reader = BufReader::new(&*file);
        let mut line = String::new();
        let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
        if bytes_read == 0 {
            // Probe hit EOF; shrink the window from the top.
            high = mid;
            continue;
        }
        let line_time = extract_timestamp(&line);
        match line_time {
            Some(lt) => {
                if find_start {
                    // Searching for the first line >= target.
                    if lt < target_time {
                        low = line_start + bytes_read as u64;
                    } else {
                        high = line_start;
                    }
                } else if lt <= target_time {
                    // Searching just past the last line <= target.
                    low = line_start + bytes_read as u64;
                } else {
                    high = line_start;
                }
            }
            None => {
                // Continuation line with no timestamp: skip past it.
                low = line_start + bytes_read as u64;
            }
        }
    }
    // The window may collapse mid-line; snap the result to a line boundary.
    find_line_start(file, low)
}
/// Find the byte offset of the start of the line containing `pos`, by
/// scanning backwards through the file in 8 KiB chunks for the nearest
/// preceding `\n`.
fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
    if pos == 0 {
        return Ok(0);
    }
    // Start at the byte just before `pos` so that a `pos` sitting right
    // after a newline maps to the start of the following line.
    let mut search_pos = pos.saturating_sub(1);
    const CHUNK_SIZE: usize = 8192;
    loop {
        // Read the inclusive byte range [chunk_start, search_pos].
        let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
        let len_u64 = search_pos - chunk_start + 1;
        let len = len_u64 as usize;
        file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
        let mut buf = vec![0u8; len];
        if file.read_exact(&mut buf).is_err() {
            // Short read (e.g. the file shrank concurrently): fall back to
            // the beginning of the file.
            return Ok(0);
        }
        // Scan the chunk backwards for the newline closest to `pos`.
        for (i, &b) in buf.iter().enumerate().rev() {
            if b == b'\n' {
                return Ok(chunk_start + i as u64 + 1);
            }
        }
        if chunk_start == 0 {
            // Reached the start of the file without a newline: first line.
            return Ok(0);
        }
        search_pos = chunk_start - 1;
    }
}
/// Pull the leading `YYYY-MM-DD HH:MM:SS` timestamp off a log line, if any.
fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
    let m = re.captures(line)?.get(1)?;
    parse_datetime(m.as_str()).ok()
}
/// Parse raw log lines into `(timestamp, daemon id, message)` tuples.
///
/// Lines without a `timestamp daemon ` prefix are treated as continuations of
/// the previous entry (joined with `\n`); continuations that appear before
/// the first real entry are dropped. `reverse` marks input that arrives
/// newest-first (as produced by rev_lines) and must be flipped first.
fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
    let ordered: Vec<String> = if reverse {
        lines.into_iter().rev().collect()
    } else {
        lines
    };
    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
    let mut entries: Vec<(String, String, String)> = vec![];
    for line in ordered {
        match re.captures(&line) {
            Some(caps) => {
                // Groups 1 and 3 always exist when the pattern matches;
                // skip the line defensively otherwise.
                if let (Some(d), Some(m)) = (caps.get(1), caps.get(3)) {
                    entries.push((d.as_str().to_string(), id.to_string(), m.as_str().to_string()));
                }
            }
            None => {
                if let Some(last) = entries.last_mut() {
                    last.2.push('\n');
                    last.2.push_str(&line);
                }
            }
        }
    }
    entries
}
/// Best-effort migration of old-layout log directories.
///
/// Directories named `<name>` (no `--`) containing `<name>.log` are renamed
/// to `legacy--<name>` with the log file renamed to `legacy--<name>.log`.
/// Every failure is skipped silently or with a warning — this never aborts
/// the command.
fn migrate_legacy_log_dirs() {
    let known_safe_paths = known_daemon_safe_paths();
    let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
        Ok(d) => d,
        Err(_) => return,
    };
    for dir in dirs {
        if dir.starts_with(".") || !dir.is_dir() {
            continue;
        }
        let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
            Some(n) => n,
            None => continue,
        };
        // Names containing "--" are already in the new safe-path format (or
        // belong to a known daemon); leave them untouched.
        if name.contains("--") {
            if DaemonId::from_safe_path(&name).is_ok() {
                continue;
            }
            if known_safe_paths.contains(&name) {
                continue;
            }
            warn!(
                "Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
            );
            continue;
        }
        // The legacy layout names the log file after its directory.
        let old_log = dir.join(format!("{name}.log"));
        if !old_log.exists() {
            continue;
        }
        if DaemonId::try_new("legacy", &name).is_err() {
            warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
            continue;
        }
        let new_name = format!("legacy--{name}");
        let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
        if new_dir.exists() {
            // Never clobber an existing migration target.
            continue;
        }
        if std::fs::rename(&dir, &new_dir).is_err() {
            continue;
        }
        // Rename the log file inside the moved directory to match.
        let old_log = new_dir.join(format!("{name}.log"));
        let new_log = new_dir.join(format!("{new_name}.log"));
        if old_log.exists() {
            let _ = std::fs::rename(&old_log, &new_log);
        }
        debug!("Migrated legacy log dir '{name}' → '{new_name}'");
    }
}
/// Collect the safe-path directory names of every daemon we know about, from
/// both the state file and the merged pitchfork.toml configs. Read failures
/// are logged but non-fatal (the result is simply smaller).
fn known_daemon_safe_paths() -> BTreeSet<String> {
    let mut out = BTreeSet::new();
    match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
        Ok(state) => out.extend(state.daemons.keys().map(|id| id.safe_path())),
        Err(e) => warn!("Failed to read state while checking known daemon IDs: {e}"),
    }
    match PitchforkToml::all_merged() {
        Ok(config) => out.extend(config.daemons.keys().map(|id| id.safe_path())),
        Err(e) => warn!("Failed to read config while checking known daemon IDs: {e}"),
    }
    out
}
/// Discover every daemon id that currently has a log file on disk, pulling
/// candidates from both the state file and the merged configs. Read failures
/// are logged but non-fatal.
fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
    let mut candidates: BTreeSet<DaemonId> = BTreeSet::new();
    match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
        Ok(state) => candidates.extend(state.daemons.keys().cloned()),
        Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
    }
    match PitchforkToml::all_merged() {
        Ok(config) => candidates.extend(config.daemons.keys().cloned()),
        Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
    }
    // Only daemons that have actually written a log file are of interest.
    let ids = candidates
        .into_iter()
        .filter(|id| id.log_path().exists())
        .collect();
    Ok(ids)
}
/// Open each daemon's log file (skipping daemons that have no log yet) and
/// record its current size so tailing can later resume from that offset.
fn get_log_file_infos(names: &[DaemonId]) -> Result<BTreeMap<DaemonId, LogFile>> {
    let mut out = BTreeMap::new();
    for daemon_id in names {
        let path = daemon_id.log_path();
        if !path.exists() {
            continue;
        }
        let mut file = xx::file::open(&path)?;
        // Seek::seek returns the new offset from the start of the stream, so
        // seeking to EOF yields the current file size directly — the extra
        // stream_position() call the old code made was a redundant syscall.
        let cur = file.seek(SeekFrom::End(0)).into_diagnostic()?;
        out.insert(
            daemon_id.clone(),
            LogFile {
                _name: daemon_id.clone(),
                file,
                cur,
                path,
            },
        );
    }
    Ok(out)
}
/// Follow the given daemons' log files, printing new lines as they are
/// appended (like `tail -f`). Runs until the file-watch channel closes.
pub async fn tail_logs(names: &[DaemonId]) -> Result<()> {
    let mut log_files = get_log_file_infos(names)?;
    // Watch each log file, debouncing change events by 10ms.
    let mut wf = WatchFiles::new(Duration::from_millis(10))?;
    for lf in log_files.values() {
        wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
    }
    // Map changed paths back to the daemon they belong to.
    let files_to_name: HashMap<PathBuf, DaemonId> = log_files
        .iter()
        .map(|(n, f)| (f.path.clone(), n.clone()))
        .collect();
    while let Some(paths) = wf.rx.recv().await {
        let mut out = vec![];
        for path in paths {
            let Some(name) = files_to_name.get(&path) else {
                warn!("Unknown log file changed: {}", path.display());
                continue;
            };
            let Some(info) = log_files.get_mut(name) else {
                warn!("No log info for: {name}");
                continue;
            };
            // Resume reading from the last offset already printed.
            info.file
                .seek(SeekFrom::Start(info.cur))
                .into_diagnostic()?;
            let reader = BufReader::new(&info.file);
            let lines = reader.lines().map_while(Result::ok).collect_vec();
            // Remember how far we've read for the next wake-up.
            info.cur = info.file.stream_position().into_diagnostic()?;
            out.extend(merge_log_lines(&name.qualified(), lines, false));
        }
        // Interleave all daemons' new entries chronologically before printing.
        let out = out
            .into_iter()
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();
        for (date, name, msg) in out {
            println!("{} {} {}", edim(&date), name, msg);
        }
    }
    Ok(())
}
/// An open handle to a daemon's log file plus the byte offset that has
/// already been consumed (used by `tail_logs` to print only new data).
struct LogFile {
    // Currently unread (hence the leading underscore); kept alongside the
    // map key for convenience.
    _name: DaemonId,
    path: PathBuf,
    file: fs::File,
    // Byte offset up to which the file has already been read/printed.
    cur: u64,
}
/// Parse a `YYYY-MM-DD HH:MM:SS` string as a datetime in the local timezone.
///
/// Fails if the string does not match the format, or if the wall-clock time
/// does not map to exactly one instant in the local zone (e.g. during a DST
/// transition).
fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
    let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
    Local
        .from_local_datetime(&naive_dt)
        .single()
        // Message now matches the identical errors elsewhere in this file
        // (the old text had a stray trailing ". ").
        .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))
}
/// Parse a user-supplied `--since`/`--until` value into a local datetime.
///
/// Accepted forms, tried in order:
/// - full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
/// - time of day: "HH:MM:SS" or "HH:MM" (interpreted as today; for --since a
///   future time rolls back to yesterday)
/// - a humantime duration ("5min", "2h"), meaning "that long before now"
fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
    let s = s.trim();
    if let Ok(dt) = parse_datetime(s) {
        return Ok(dt);
    }
    // Same as above but without seconds.
    if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
        return Local
            .from_local_datetime(&naive_dt)
            .single()
            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
    }
    if let Ok(time) = parse_time_only(s) {
        let now = Local::now();
        let today = now.date_naive();
        let mut naive_dt = NaiveDateTime::new(today, time);
        let mut dt = Local
            .from_local_datetime(&naive_dt)
            .single()
            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
        // "--since 23:00" typed at 01:00 almost certainly means yesterday
        // evening, so shift back one day when the resulting time is in the
        // future. --until keeps today's date.
        if is_since
            && dt > now
            && let Some(yesterday) = today.pred_opt()
        {
            naive_dt = NaiveDateTime::new(yesterday, time);
            dt = Local
                .from_local_datetime(&naive_dt)
                .single()
                .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
        }
        return Ok(dt);
    }
    // Relative durations: "5min" means now minus five minutes.
    if let Ok(duration) = humantime::parse_duration(s) {
        let now = Local::now();
        let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
        return Ok(target);
    }
    Err(miette::miette!(
        "Invalid time format: '{}'. Expected formats:\n\
         - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
         - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
         - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
        s
    ))
}
/// Parse a time-of-day string, accepting `HH:MM:SS` or `HH:MM`.
fn parse_time_only(s: &str) -> Result<NaiveTime> {
    ["%H:%M:%S", "%H:%M"]
        .iter()
        .find_map(|fmt| NaiveTime::parse_from_str(s, fmt).ok())
        .ok_or_else(|| miette::miette!("Invalid time format: '{}'", s))
}
pub fn print_logs_for_time_range(
daemon_id: &DaemonId,
from: DateTime<Local>,
to: Option<DateTime<Local>>,
) -> Result<()> {
let daemon_ids = vec![daemon_id.clone()];
let log_files = get_log_file_infos(&daemon_ids)?;
let from = from
.with_nanosecond(0)
.expect("0 is always valid for nanoseconds");
let to = to.map(|t| {
t.with_nanosecond(0)
.expect("0 is always valid for nanoseconds")
});
let log_lines = log_files
.iter()
.flat_map(
|(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), to) {
Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
Err(e) => {
error!("{}: {}", lf.path.display(), e);
vec![]
}
},
)
.sorted_by_cached_key(|l| l.0.to_string())
.collect_vec();
if log_lines.is_empty() {
eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
} else {
eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
for (date, _id, msg) in log_lines {
eprintln!("{} {}", edim(&date), msg);
}
eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
}
Ok(())
}
pub fn print_startup_logs(daemon_id: &DaemonId, from: DateTime<Local>) -> Result<()> {
let daemon_ids = vec![daemon_id.clone()];
let log_files = get_log_file_infos(&daemon_ids)?;
let from = from
.with_nanosecond(0)
.expect("0 is always valid for nanoseconds");
let log_lines = log_files
.iter()
.flat_map(
|(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), None) {
Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
Err(e) => {
error!("{}: {}", lf.path.display(), e);
vec![]
}
},
)
.sorted_by_cached_key(|l| l.0.to_string())
.collect_vec();
if !log_lines.is_empty() {
eprintln!("\n{} {} {}", edim("==="), edim("Startup logs"), edim("==="));
for (date, _id, msg) in log_lines {
eprintln!("{} {}", edim(&date), msg);
}
eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
}
Ok(())
}