//! `pitchfork logs` subcommand: display, filter, follow, and clear daemon
//! log files.
1use crate::daemon_id::DaemonId;
2use crate::pitchfork_toml::PitchforkToml;
3use crate::state_file::StateFile;
4use crate::ui::style::edim;
5use crate::watch_files::WatchFiles;
6use crate::{Result, env};
7use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
8use itertools::Itertools;
9use miette::IntoDiagnostic;
10use notify::RecursiveMode;
11use std::cmp::{Ordering, Reverse};
12use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap};
13use std::fs::{self, File};
14use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
15use std::path::{Path, PathBuf};
16use std::process::{Child, Command, Stdio};
17use std::time::Duration;
18use xx::regex;
19
/// Pager configuration for displaying logs
struct PagerConfig {
    command: String,
    args: Vec<String>,
}

impl PagerConfig {
    /// Select and configure the appropriate pager.
    /// Uses $PAGER environment variable if set, otherwise defaults to less.
    fn new(start_at_end: bool) -> Self {
        let command = match std::env::var("PAGER") {
            Ok(pager) => pager,
            Err(_) => "less".to_string(),
        };
        Self {
            args: Self::build_args(&command, start_at_end),
            command,
        }
    }

    /// Compute pager arguments. Only `less` gets flags: `-R` to pass ANSI
    /// colors through, plus `+G` to start at the end when requested.
    fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
        if pager != "less" {
            return vec![];
        }
        let mut flags = vec!["-R".to_string()];
        if start_at_end {
            flags.push("+G".to_string());
        }
        flags
    }

    /// Spawn the pager with piped stdin
    fn spawn_piped(&self) -> io::Result<Child> {
        let mut cmd = Command::new(&self.command);
        cmd.args(&self.args).stdin(Stdio::piped());
        cmd.spawn()
    }
}
54
55/// Format a single log line for output.
56/// When `single_daemon` is true, omits the daemon ID from the output.
57fn format_log_line(date: &str, id: &str, msg: &str, single_daemon: bool) -> String {
58    if single_daemon {
59        format!("{} {}", edim(date), msg)
60    } else {
61        format!("{} {} {}", edim(date), id, msg)
62    }
63}
64
/// A parsed log entry with timestamp, daemon name, and message.
///
/// Ordering and equality are defined on the timestamp string alone so that
/// entries interleave chronologically in a heap; `daemon`, `message`, and
/// `source_idx` are deliberately ignored by comparisons.
#[derive(Debug)]
struct LogEntry {
    timestamp: String,
    daemon: String,
    message: String,
    source_idx: usize, // Index of the source iterator
}

impl LogEntry {
    /// The single key all comparisons are based on.
    fn sort_key(&self) -> &str {
        &self.timestamp
    }
}

impl PartialEq for LogEntry {
    fn eq(&self, other: &Self) -> bool {
        self.sort_key() == other.sort_key()
    }
}

impl Eq for LogEntry {}

impl PartialOrd for LogEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for LogEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.sort_key().cmp(other.sort_key())
    }
}
93
/// Streaming merger for multiple sorted log files using a min-heap.
/// This allows merging sorted iterators without loading all data into memory.
///
/// Each source yields `(timestamp, message)` pairs assumed to already be in
/// ascending timestamp order; the heap holds at most one pending entry per
/// source at any time.
struct StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    sources: Vec<(String, I)>,           // (daemon_name, line_iterator)
    heap: BinaryHeap<Reverse<LogEntry>>, // Min-heap (using Reverse for ascending order)
}
103
104impl<I> StreamingMerger<I>
105where
106    I: Iterator<Item = (String, String)>,
107{
108    fn new() -> Self {
109        Self {
110            sources: Vec::new(),
111            heap: BinaryHeap::new(),
112        }
113    }
114
115    fn add_source(&mut self, daemon_name: String, iter: I) {
116        self.sources.push((daemon_name, iter));
117    }
118
119    fn initialize(&mut self) {
120        // Pull the first entry from each source into the heap
121        for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
122            if let Some((timestamp, message)) = iter.next() {
123                self.heap.push(Reverse(LogEntry {
124                    timestamp,
125                    daemon: daemon.clone(),
126                    message,
127                    source_idx: idx,
128                }));
129            }
130        }
131    }
132}
133
134impl<I> Iterator for StreamingMerger<I>
135where
136    I: Iterator<Item = (String, String)>,
137{
138    type Item = (String, String, String); // (timestamp, daemon, message)
139
140    fn next(&mut self) -> Option<Self::Item> {
141        // Pop the smallest entry from the heap
142        let Reverse(entry) = self.heap.pop()?;
143
144        // Pull the next entry from the same source and push to heap
145        let (daemon, iter) = &mut self.sources[entry.source_idx];
146        if let Some((timestamp, message)) = iter.next() {
147            self.heap.push(Reverse(LogEntry {
148                timestamp,
149                daemon: daemon.clone(),
150                message,
151                source_idx: entry.source_idx,
152            }));
153        }
154
155        Some((entry.timestamp, entry.daemon, entry.message))
156    }
157}
158
/// A proper streaming log parser that handles multi-line entries.
///
/// Produces `(timestamp, message)` pairs one at a time; an entry is only
/// emitted once the next timestamped line (or EOF) proves it is complete.
struct StreamingLogParser {
    reader: BufReader<File>, // buffered source of raw log lines
    current_entry: Option<(String, String)>, // entry being accumulated, pending continuation lines
    finished: bool, // set once EOF or a read error has been reached
}
165
166impl StreamingLogParser {
167    fn new(file: File) -> Self {
168        Self {
169            reader: BufReader::new(file),
170            current_entry: None,
171            finished: false,
172        }
173    }
174}
175
impl Iterator for StreamingLogParser {
    // (timestamp, message) — continuation lines are folded into the message
    // joined with '\n'.
    type Item = (String, String);

    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        // A new entry starts with "YYYY-MM-DD HH:MM:SS <daemon> <message>".
        // Note: the daemon name (group 2) is matched but ignored here.
        let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");

        loop {
            let mut line = String::new();
            match self.reader.read_line(&mut line) {
                Ok(0) => {
                    // EOF - return the last entry if any
                    self.finished = true;
                    return self.current_entry.take();
                }
                Ok(_) => {
                    // Remove trailing newline (and a preceding '\r' for CRLF files)
                    if line.ends_with('\n') {
                        line.pop();
                        if line.ends_with('\r') {
                            line.pop();
                        }
                    }

                    if let Some(caps) = re.captures(&line) {
                        // Groups 1 and 3 are unconditional in the pattern, so
                        // these `None => continue` arms are defensive only.
                        let date = match caps.get(1) {
                            Some(d) => d.as_str().to_string(),
                            None => continue,
                        };
                        let msg = match caps.get(3) {
                            Some(m) => m.as_str().to_string(),
                            None => continue,
                        };

                        // Return the previous entry and start a new one
                        let prev = self.current_entry.take();
                        self.current_entry = Some((date, msg));

                        if prev.is_some() {
                            return prev;
                        }
                        // First entry - continue to read more
                    } else {
                        // Continuation line - append to current entry.
                        // Continuation lines seen before any timestamped line
                        // are dropped (no entry to attach them to).
                        if let Some((_, ref mut msg)) = self.current_entry {
                            msg.push('\n');
                            msg.push_str(&line);
                        }
                    }
                }
                Err(_) => {
                    // Read error: stop iterating but still flush the pending entry.
                    self.finished = true;
                    return self.current_entry.take();
                }
            }
        }
    }
}
237
/// Displays logs for daemon(s)
#[derive(Debug, clap::Args)]
#[clap(
    visible_alias = "l",
    verbatim_doc_comment,
    long_about = "\
Displays logs for daemon(s)

Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
and include timestamps for filtering.

Examples:
  pitchfork logs api              Show all logs for 'api' (paged if needed)
  pitchfork logs api worker       Show logs for multiple daemons
  pitchfork logs                  Show logs for all daemons
  pitchfork logs api -n 50        Show last 50 lines
  pitchfork logs api --follow     Follow logs in real-time
  pitchfork logs api --since '2024-01-15 10:00:00'
                                  Show logs since a specific time (forward)
  pitchfork logs api --since '10:30:00'
                                  Show logs since 10:30:00 today
  pitchfork logs api --since '10:30' --until '12:00'
                                  Show logs since 10:30:00 until 12:00:00 today
  pitchfork logs api --since 5min Show logs from last 5 minutes
  pitchfork logs api --raw        Output raw log lines without formatting
  pitchfork logs api --raw -n 100 Output last 100 raw log lines
  pitchfork logs api --clear      Delete logs for 'api'
  pitchfork logs --clear          Delete logs for all daemons"
)]
// NOTE: the `///` comments on fields below double as clap help text
// (`verbatim_doc_comment` preserves their formatting), so any wording change
// here changes the CLI's --help output.
pub struct Logs {
    /// Show only logs for the specified daemon(s)
    id: Vec<String>,

    /// Delete logs
    #[clap(short, long)]
    clear: bool,

    /// Show last N lines of logs
    ///
    /// Only applies when --since/--until is not used.
    /// Without this option, all logs are shown.
    #[clap(short)]
    n: Option<usize>,

    // `-t` is the primary short flag; `-f` and `--follow` are aliases for
    // users expecting `tail -f` semantics.
    /// Show logs in real-time
    #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
    tail: bool,

    /// Show logs from this time
    ///
    /// Supports multiple formats:
    /// - Full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
    /// - Time only: "HH:MM:SS" or "HH:MM" (uses today's date)
    /// - Relative time: "5min", "2h", "1d" (e.g., last 5 minutes)
    #[clap(short = 's', long)]
    since: Option<String>,

    /// Show logs until this time
    ///
    /// Supports multiple formats:
    /// - Full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
    /// - Time only: "HH:MM:SS" or "HH:MM" (uses today's date)
    #[clap(short = 'u', long)]
    until: Option<String>,

    /// Disable pager even in interactive terminal
    #[clap(long)]
    no_pager: bool,

    /// Output raw log lines without color or formatting
    #[clap(long)]
    raw: bool,
}
311
312impl Logs {
313    pub async fn run(&self) -> Result<()> {
314        // Migrate legacy log directories (old format: "api" → new format: "legacy--api").
315        // This is idempotent and silent so it is safe to run on every invocation.
316        migrate_legacy_log_dirs();
317
318        // Resolve user-provided IDs to qualified IDs
319        let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
320            // When no IDs provided, use all daemon IDs
321            get_all_daemon_ids()?
322        } else {
323            PitchforkToml::resolve_ids(&self.id)?
324        };
325
326        if self.clear {
327            for id in &resolved_ids {
328                let path = id.log_path();
329                if path.exists() {
330                    xx::file::create(&path)?;
331                }
332            }
333            return Ok(());
334        }
335
336        let from = if let Some(since) = self.since.as_ref() {
337            Some(parse_time_input(since, true)?)
338        } else {
339            None
340        };
341        let to = if let Some(until) = self.until.as_ref() {
342            Some(parse_time_input(until, false)?)
343        } else {
344            None
345        };
346
347        self.print_existing_logs(&resolved_ids, from, to)?;
348        if self.tail {
349            tail_logs(&resolved_ids).await?;
350        }
351
352        Ok(())
353    }
354
355    fn print_existing_logs(
356        &self,
357        resolved_ids: &[DaemonId],
358        from: Option<DateTime<Local>>,
359        to: Option<DateTime<Local>>,
360    ) -> Result<()> {
361        let log_files = get_log_file_infos(resolved_ids)?;
362        trace!("log files for: {}", log_files.keys().join(", "));
363        let single_daemon = resolved_ids.len() == 1;
364        let has_time_filter = from.is_some() || to.is_some();
365
366        if has_time_filter {
367            let mut log_lines = self.collect_log_lines_forward(&log_files, from, to)?;
368
369            if let Some(n) = self.n {
370                let len = log_lines.len();
371                if len > n {
372                    log_lines = log_lines.into_iter().skip(len - n).collect_vec();
373                }
374            }
375
376            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
377        } else if let Some(n) = self.n {
378            let log_lines = self.collect_log_lines_reverse(&log_files, Some(n))?;
379            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
380        } else {
381            self.stream_logs_to_pager(&log_files, single_daemon, self.raw)?;
382        }
383
384        Ok(())
385    }
386
387    fn collect_log_lines_forward(
388        &self,
389        log_files: &BTreeMap<DaemonId, LogFile>,
390        from: Option<DateTime<Local>>,
391        to: Option<DateTime<Local>>,
392    ) -> Result<Vec<(String, String, String)>> {
393        let log_lines: Vec<(String, String, String)> = log_files
394            .iter()
395            .flat_map(
396                |(name, lf)| match read_lines_in_time_range(&lf.path, from, to) {
397                    Ok(lines) => merge_log_lines(&name.qualified(), lines, false),
398                    Err(e) => {
399                        error!("{}: {}", lf.path.display(), e);
400                        vec![]
401                    }
402                },
403            )
404            .sorted_by_cached_key(|l| l.0.to_string())
405            .collect_vec();
406
407        Ok(log_lines)
408    }
409
410    fn collect_log_lines_reverse(
411        &self,
412        log_files: &BTreeMap<DaemonId, LogFile>,
413        limit: Option<usize>,
414    ) -> Result<Vec<(String, String, String)>> {
415        let log_lines: Vec<(String, String, String)> = log_files
416            .iter()
417            .flat_map(|(daemon_id, lf)| {
418                let rev = match xx::file::open(&lf.path) {
419                    Ok(f) => rev_lines::RevLines::new(f),
420                    Err(e) => {
421                        error!("{}: {}", lf.path.display(), e);
422                        return vec![];
423                    }
424                };
425                let lines = rev.into_iter().filter_map(Result::ok);
426                let lines = match limit {
427                    Some(n) => lines.take(n).collect_vec(),
428                    None => lines.collect_vec(),
429                };
430                merge_log_lines(&daemon_id.qualified(), lines, true)
431            })
432            .sorted_by_cached_key(|l| l.0.to_string())
433            .collect_vec();
434
435        let log_lines = match limit {
436            Some(n) => {
437                let len = log_lines.len();
438                if len > n {
439                    log_lines.into_iter().skip(len - n).collect_vec()
440                } else {
441                    log_lines
442                }
443            }
444            None => log_lines,
445        };
446
447        Ok(log_lines)
448    }
449
450    fn output_logs(
451        &self,
452        log_lines: Vec<(String, String, String)>,
453        single_daemon: bool,
454        has_time_filter: bool,
455        raw: bool,
456    ) -> Result<()> {
457        if log_lines.is_empty() {
458            return Ok(());
459        }
460
461        // Raw mode: output without formatting and without pager
462        if raw {
463            for (date, id, msg) in log_lines {
464                if single_daemon {
465                    println!("{date} {msg}");
466                } else {
467                    println!("{date} {id} {msg}");
468                }
469            }
470            return Ok(());
471        }
472
473        let use_pager = !self.no_pager && should_use_pager(log_lines.len());
474
475        if use_pager {
476            self.output_with_pager(log_lines, single_daemon, has_time_filter)?;
477        } else {
478            for (date, id, msg) in log_lines {
479                println!("{}", format_log_line(&date, &id, &msg, single_daemon));
480            }
481        }
482
483        Ok(())
484    }
485
486    fn output_with_pager(
487        &self,
488        log_lines: Vec<(String, String, String)>,
489        single_daemon: bool,
490        has_time_filter: bool,
491    ) -> Result<()> {
492        // When time filter is used, start at top; otherwise start at end
493        let pager_config = PagerConfig::new(!has_time_filter);
494
495        match pager_config.spawn_piped() {
496            Ok(mut child) => {
497                if let Some(stdin) = child.stdin.as_mut() {
498                    for (date, id, msg) in log_lines {
499                        let line =
500                            format!("{}\n", format_log_line(&date, &id, &msg, single_daemon));
501                        if stdin.write_all(line.as_bytes()).is_err() {
502                            break;
503                        }
504                    }
505                    let _ = child.wait();
506                } else {
507                    debug!("Failed to get pager stdin, falling back to direct output");
508                    for (date, id, msg) in log_lines {
509                        println!("{}", format_log_line(&date, &id, &msg, single_daemon));
510                    }
511                }
512            }
513            Err(e) => {
514                debug!("Failed to spawn pager: {e}, falling back to direct output");
515                for (date, id, msg) in log_lines {
516                    println!("{}", format_log_line(&date, &id, &msg, single_daemon));
517                }
518            }
519        }
520
521        Ok(())
522    }
523
    /// Stream full log files through a pager without buffering them all.
    ///
    /// Falls back to `stream_logs_direct` when stdout is not a terminal, the
    /// pager is disabled, `--tail` is active, or raw output was requested.
    /// A background thread feeds the pager's stdin so the main thread can
    /// block on `child.wait()` (i.e. on the user quitting the pager).
    fn stream_logs_to_pager(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        single_daemon: bool,
        raw: bool,
    ) -> Result<()> {
        if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
            return self.stream_logs_direct(log_files, single_daemon, raw);
        }

        let pager_config = PagerConfig::new(true); // start_at_end = true

        match pager_config.spawn_piped() {
            Ok(mut child) => {
                if let Some(stdin) = child.stdin.take() {
                    // Collect owned (name, path) pairs so the thread closure
                    // can be 'static (no borrows of `log_files`).
                    let log_files_clone: Vec<_> = log_files
                        .iter()
                        .map(|(daemon_id, lf)| (daemon_id.qualified(), lf.path.clone()))
                        .collect();
                    let single_daemon_clone = single_daemon;

                    // Stream logs using a background thread to avoid blocking
                    std::thread::spawn(move || {
                        let mut writer = BufWriter::new(stdin);

                        // Single file: stream directly without merge overhead
                        if log_files_clone.len() == 1 {
                            let (name, path) = &log_files_clone[0];
                            let file = match File::open(path) {
                                Ok(f) => f,
                                Err(_) => return,
                            };
                            let parser = StreamingLogParser::new(file);
                            for (timestamp, message) in parser {
                                let output = format!(
                                    "{}\n",
                                    format_log_line(
                                        &timestamp,
                                        name,
                                        &message,
                                        single_daemon_clone
                                    )
                                );
                                // A write error means the pager went away; stop.
                                if writer.write_all(output.as_bytes()).is_err() {
                                    return;
                                }
                            }
                            let _ = writer.flush();
                            return;
                        }

                        // Multiple files: use streaming merger for sorted/interleaved output
                        let mut merger: StreamingMerger<StreamingLogParser> =
                            StreamingMerger::new();

                        for (name, path) in log_files_clone {
                            let file = match File::open(&path) {
                                Ok(f) => f,
                                Err(_) => continue,
                            };
                            let parser = StreamingLogParser::new(file);
                            merger.add_source(name, parser);
                        }

                        // Initialize the heap with first entry from each source
                        merger.initialize();

                        // Stream merged entries to pager
                        for (timestamp, daemon, message) in merger {
                            let output = format!(
                                "{}\n",
                                format_log_line(&timestamp, &daemon, &message, single_daemon_clone)
                            );
                            if writer.write_all(output.as_bytes()).is_err() {
                                return;
                            }
                        }

                        let _ = writer.flush();
                    });

                    // Block until the pager exits (user quits).
                    let _ = child.wait();
                } else {
                    debug!("Failed to get pager stdin, falling back to direct output");
                    return self.stream_logs_direct(log_files, single_daemon, raw);
                }
            }
            Err(e) => {
                debug!("Failed to spawn pager: {e}, falling back to direct output");
                return self.stream_logs_direct(log_files, single_daemon, raw);
            }
        }

        Ok(())
    }
620
    /// Stream logs straight to stdout, without a pager.
    ///
    /// Single-daemon raw mode copies lines verbatim; otherwise entries are
    /// parsed so multi-line messages stay grouped, and multiple files are
    /// merged in timestamp order. Write failures (e.g. a closed pipe from
    /// `| head`) end the stream silently with `Ok(())`.
    fn stream_logs_direct(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        single_daemon: bool,
        raw: bool,
    ) -> Result<()> {
        // Fast path for single daemon: directly output file content without parsing
        // This avoids expensive regex parsing for each line in large log files
        if log_files.len() == 1 {
            let (daemon_id, lf) = log_files.iter().next().unwrap();
            let file = match File::open(&lf.path) {
                Ok(f) => f,
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    return Ok(());
                }
            };
            let reader = BufReader::new(file);
            if raw {
                // Raw mode: output lines as-is
                for line in reader.lines() {
                    match line {
                        Ok(l) => {
                            if io::stdout().write_all(l.as_bytes()).is_err()
                                || io::stdout().write_all(b"\n").is_err()
                            {
                                return Ok(());
                            }
                        }
                        // Skip lines that fail to decode/read.
                        Err(_) => continue,
                    }
                }
            } else {
                // Formatted mode: parse and format each line
                // NOTE(review): `file`/`reader` above go unused on this branch;
                // the file is opened a second time here — could reuse `file`.
                let parser = StreamingLogParser::new(File::open(&lf.path).into_diagnostic()?);
                for (timestamp, message) in parser {
                    let output = format!(
                        "{}\n",
                        format_log_line(
                            &timestamp,
                            &daemon_id.qualified(),
                            &message,
                            single_daemon
                        )
                    );
                    if io::stdout().write_all(output.as_bytes()).is_err() {
                        return Ok(());
                    }
                }
            }
            return Ok(());
        }

        // Multiple daemons: use streaming merger for sorted output
        let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();

        for (daemon_id, lf) in log_files {
            let file = match File::open(&lf.path) {
                Ok(f) => f,
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    continue;
                }
            };
            let parser = StreamingLogParser::new(file);
            merger.add_source(daemon_id.qualified(), parser);
        }

        // Initialize the heap with first entry from each source
        merger.initialize();

        // Stream merged entries to stdout
        for (timestamp, daemon, message) in merger {
            let output = if raw {
                if single_daemon {
                    format!("{timestamp} {message}\n")
                } else {
                    format!("{timestamp} {daemon} {message}\n")
                }
            } else {
                format!(
                    "{}\n",
                    format_log_line(&timestamp, &daemon, &message, single_daemon)
                )
            };
            if io::stdout().write_all(output.as_bytes()).is_err() {
                return Ok(());
            }
        }

        Ok(())
    }
713}
714
715fn should_use_pager(line_count: usize) -> bool {
716    if !io::stdout().is_terminal() {
717        return false;
718    }
719
720    let terminal_height = get_terminal_height().unwrap_or(24);
721    line_count > terminal_height
722}
723
724fn get_terminal_height() -> Option<usize> {
725    if let Ok(rows) = std::env::var("LINES")
726        && let Ok(h) = rows.parse::<usize>()
727    {
728        return Some(h);
729    }
730
731    crossterm::terminal::size().ok().map(|(_, h)| h as usize)
732}
733
/// Read the raw lines of `path` whose timestamps fall within `[from, to]`.
///
/// Relies on the log file being in chronological order: binary-searches for
/// the byte offsets bounding the range, then reads that slice sequentially.
/// Returned lines have trailing `\n` / `\r\n` stripped. An empty file or an
/// empty range yields an empty vec.
fn read_lines_in_time_range(
    path: &Path,
    from: Option<DateTime<Local>>,
    to: Option<DateTime<Local>>,
) -> Result<Vec<String>> {
    let mut file = File::open(path).into_diagnostic()?;
    let file_size = file.metadata().into_diagnostic()?.len();

    if file_size == 0 {
        return Ok(vec![]);
    }

    // Byte offset of the first line at/after `from` (0 = start of file).
    let start_pos = if let Some(from_time) = from {
        binary_search_log_position(&mut file, file_size, from_time, true)?
    } else {
        0
    };

    // Byte offset just past the last line at/before `to` (EOF by default).
    let end_pos = if let Some(to_time) = to {
        binary_search_log_position(&mut file, file_size, to_time, false)?
    } else {
        file_size
    };

    if start_pos >= end_pos {
        return Ok(vec![]);
    }

    file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
    let mut reader = BufReader::new(&file);
    let mut lines = Vec::new();
    // Track bytes consumed so we stop once the range end is crossed.
    let mut current_pos = start_pos;

    loop {
        if current_pos >= end_pos {
            break;
        }

        let mut line = String::new();
        match reader.read_line(&mut line) {
            Ok(0) => break,
            Ok(bytes_read) => {
                current_pos += bytes_read as u64;
                // Strip the trailing newline (and '\r' for CRLF files).
                if line.ends_with('\n') {
                    line.pop();
                    if line.ends_with('\r') {
                        line.pop();
                    }
                }
                lines.push(line);
            }
            Err(_) => break,
        }
    }

    Ok(lines)
}
791
/// Binary-search a chronologically-ordered log file for a byte offset.
///
/// With `find_start = true`, returns the offset of the first line whose
/// timestamp is >= `target_time`; with `find_start = false`, the offset just
/// past the last line whose timestamp is <= `target_time`. Lines without a
/// parseable timestamp (e.g. continuation lines) are treated as "before" the
/// target, pushing the search forward. The result is aligned to a line start.
fn binary_search_log_position(
    file: &mut File,
    file_size: u64,
    target_time: DateTime<Local>,
    find_start: bool,
) -> Result<u64> {
    let mut low: u64 = 0;
    let mut high: u64 = file_size;

    while low < high {
        let mid = low + (high - low) / 2;

        // `mid` may land mid-line; snap back to the start of that line so we
        // read a complete timestamp.
        let line_start = find_line_start(file, mid)?;

        file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
        let mut reader = BufReader::new(&*file);
        let mut line = String::new();
        let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
        if bytes_read == 0 {
            // At EOF: narrow the upper bound.
            high = mid;
            continue;
        }

        let line_time = extract_timestamp(&line);

        match line_time {
            Some(lt) => {
                if find_start {
                    if lt < target_time {
                        low = line_start + bytes_read as u64;
                    } else {
                        high = line_start;
                    }
                } else if lt <= target_time {
                    low = line_start + bytes_read as u64;
                } else {
                    high = line_start;
                }
            }
            None => {
                // No timestamp on this line: skip past it.
                low = line_start + bytes_read as u64;
            }
        }
    }

    // `low` may sit mid-line after the loop; align it to a line boundary.
    find_line_start(file, low)
}
839
840fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
841    if pos == 0 {
842        return Ok(0);
843    }
844
845    // Start searching from the byte just before `pos`.
846    let mut search_pos = pos.saturating_sub(1);
847    const CHUNK_SIZE: usize = 8192;
848
849    loop {
850        // Determine the start of the chunk we want to read.
851        let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
852        let len_u64 = search_pos - chunk_start + 1;
853        let len = len_u64 as usize;
854
855        // Seek once to the beginning of this chunk.
856        file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
857        let mut buf = vec![0u8; len];
858        if file.read_exact(&mut buf).is_err() {
859            // Match the original behavior: on read error, fall back to start of file.
860            return Ok(0);
861        }
862
863        // Scan this chunk backwards for a newline.
864        for (i, &b) in buf.iter().enumerate().rev() {
865            if b == b'\n' {
866                return Ok(chunk_start + i as u64 + 1);
867            }
868        }
869
870        // No newline in this chunk; if we've reached the start of the file,
871        // there is no earlier newline.
872        if chunk_start == 0 {
873            return Ok(0);
874        }
875
876        // Move to the previous chunk (just before this one).
877        search_pos = chunk_start - 1;
878    }
879}
880
881fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
882    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
883    re.captures(line)
884        .and_then(|caps| caps.get(1))
885        .and_then(|m| parse_datetime(m.as_str()).ok())
886}
887
888fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
889    let lines = if reverse {
890        lines.into_iter().rev().collect()
891    } else {
892        lines
893    };
894
895    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
896    lines
897        .into_iter()
898        .fold(vec![], |mut acc, line| match re.captures(&line) {
899            Some(caps) => {
900                let (date, msg) = match (caps.get(1), caps.get(3)) {
901                    (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
902                    _ => return acc,
903                };
904                acc.push((date, id.to_string(), msg));
905                acc
906            }
907            None => {
908                if let Some(l) = acc.last_mut() {
909                    l.2.push('\n');
910                    l.2.push_str(&line);
911                }
912                acc
913            }
914        })
915}
916
917/// Rename legacy log directories that predate namespace-qualified daemon IDs.
918///
919/// Old layout: `PITCHFORK_LOGS_DIR/<name>/<name>.log`
920/// New layout: `PITCHFORK_LOGS_DIR/legacy--<name>/legacy--<name>.log`
921///
922/// Only directories that clearly match the old layout are migrated:
923/// - directory name does not contain `"--"`
924/// - directory contains `<name>.log`
925/// - `<name>` is a valid daemon short name under current DaemonId rules
926fn migrate_legacy_log_dirs() {
927    let known_safe_paths = known_daemon_safe_paths();
928    let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
929        Ok(d) => d,
930        Err(_) => return,
931    };
932    for dir in dirs {
933        if dir.starts_with(".") || !dir.is_dir() {
934            continue;
935        }
936        let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
937            Some(n) => n,
938            None => continue,
939        };
940        // New-format directories usually contain "--". For safety, only treat
941        // them as new-format if they match a known daemon ID safe-path.
942        if name.contains("--") {
943            // If it parses as a valid safe-path, treat it as already migrated
944            // and keep idempotent behavior silent.
945            if DaemonId::from_safe_path(&name).is_ok() {
946                continue;
947            }
948            // Keep noisy warnings only for invalid/ambiguous names that cannot
949            // be interpreted as new-format IDs.
950            if known_safe_paths.contains(&name) {
951                continue;
952            }
953            warn!(
954                "Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
955            );
956            continue;
957        }
958
959        // Migrate only explicit old-layout directories to avoid renaming
960        // unrelated folders under logs/.
961        let old_log = dir.join(format!("{name}.log"));
962        if !old_log.exists() {
963            continue;
964        }
965        if DaemonId::try_new("legacy", &name).is_err() {
966            warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
967            continue;
968        }
969
970        let new_name = format!("legacy--{name}");
971        let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
972        // Skip if a target directory already exists to avoid clobbering data.
973        if new_dir.exists() {
974            continue;
975        }
976        if std::fs::rename(&dir, &new_dir).is_err() {
977            continue;
978        }
979        // Also rename the log file inside the directory.
980        let old_log = new_dir.join(format!("{name}.log"));
981        let new_log = new_dir.join(format!("{new_name}.log"));
982        if old_log.exists() {
983            let _ = std::fs::rename(&old_log, &new_log);
984        }
985        debug!("Migrated legacy log dir '{name}' → '{new_name}'");
986    }
987}
988
989fn known_daemon_safe_paths() -> BTreeSet<String> {
990    let mut out = BTreeSet::new();
991
992    match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
993        Ok(state) => {
994            for id in state.daemons.keys() {
995                out.insert(id.safe_path());
996            }
997        }
998        Err(e) => {
999            warn!("Failed to read state while checking known daemon IDs: {e}");
1000        }
1001    }
1002
1003    match PitchforkToml::all_merged() {
1004        Ok(config) => {
1005            for id in config.daemons.keys() {
1006                out.insert(id.safe_path());
1007            }
1008        }
1009        Err(e) => {
1010            warn!("Failed to read config while checking known daemon IDs: {e}");
1011        }
1012    }
1013
1014    out
1015}
1016
1017fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
1018    let mut ids = BTreeSet::new();
1019
1020    match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1021        Ok(state) => ids.extend(state.daemons.keys().cloned()),
1022        Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
1023    }
1024
1025    match PitchforkToml::all_merged() {
1026        Ok(config) => ids.extend(config.daemons.keys().cloned()),
1027        Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
1028    }
1029
1030    Ok(ids
1031        .into_iter()
1032        .filter(|id| id.log_path().exists())
1033        .collect())
1034}
1035
1036fn get_log_file_infos(names: &[DaemonId]) -> Result<BTreeMap<DaemonId, LogFile>> {
1037    let mut out = BTreeMap::new();
1038
1039    for daemon_id in names {
1040        let path = daemon_id.log_path();
1041
1042        // Directory may exist before the first log line is written.
1043        if !path.exists() {
1044            continue;
1045        }
1046
1047        let mut file = xx::file::open(&path)?;
1048        // Seek to end and get position atomically to avoid race condition
1049        // where content is written between metadata check and file open
1050        file.seek(SeekFrom::End(0)).into_diagnostic()?;
1051        let cur = file.stream_position().into_diagnostic()?;
1052
1053        out.insert(
1054            daemon_id.clone(),
1055            LogFile {
1056                _name: daemon_id.clone(),
1057                file,
1058                cur,
1059                path,
1060            },
1061        );
1062    }
1063
1064    Ok(out)
1065}
1066
1067pub async fn tail_logs(names: &[DaemonId]) -> Result<()> {
1068    let mut log_files = get_log_file_infos(names)?;
1069    let mut wf = WatchFiles::new(Duration::from_millis(10))?;
1070
1071    for lf in log_files.values() {
1072        wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
1073    }
1074
1075    let files_to_name: HashMap<PathBuf, DaemonId> = log_files
1076        .iter()
1077        .map(|(n, f)| (f.path.clone(), n.clone()))
1078        .collect();
1079
1080    while let Some(paths) = wf.rx.recv().await {
1081        let mut out = vec![];
1082        for path in paths {
1083            let Some(name) = files_to_name.get(&path) else {
1084                warn!("Unknown log file changed: {}", path.display());
1085                continue;
1086            };
1087            let Some(info) = log_files.get_mut(name) else {
1088                warn!("No log info for: {name}");
1089                continue;
1090            };
1091            info.file
1092                .seek(SeekFrom::Start(info.cur))
1093                .into_diagnostic()?;
1094            let reader = BufReader::new(&info.file);
1095            let lines = reader.lines().map_while(Result::ok).collect_vec();
1096            info.cur = info.file.stream_position().into_diagnostic()?;
1097            out.extend(merge_log_lines(&name.qualified(), lines, false));
1098        }
1099        let out = out
1100            .into_iter()
1101            .sorted_by_cached_key(|l| l.0.to_string())
1102            .collect_vec();
1103        for (date, name, msg) in out {
1104            println!("{} {} {}", edim(&date), name, msg);
1105        }
1106    }
1107    Ok(())
1108}
1109
/// Bookkeeping for one daemon's open log file while reading or tailing it.
struct LogFile {
    // Owning daemon ID; retained for context but not read anywhere in view.
    _name: DaemonId,
    // Filesystem path of the daemon's log file.
    path: PathBuf,
    // Open handle used for incremental reads.
    file: fs::File,
    // Byte offset where the next read should resume.
    cur: u64,
}
1116
1117fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
1118    let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
1119    Local
1120        .from_local_datetime(&naive_dt)
1121        .single()
1122        .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
1123}
1124
1125/// Parse time input string into DateTime.
1126///
1127/// `is_since` indicates whether this is for --since (true) or --until (false).
1128/// The "yesterday fallback" only applies to --since: if the time is in the future,
1129/// assume the user meant yesterday. For --until, future times are kept as-is.
1130fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
1131    let s = s.trim();
1132
1133    // Try full datetime first (YYYY-MM-DD HH:MM:SS)
1134    if let Ok(dt) = parse_datetime(s) {
1135        return Ok(dt);
1136    }
1137
1138    // Try datetime without seconds (YYYY-MM-DD HH:MM)
1139    if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
1140        return Local
1141            .from_local_datetime(&naive_dt)
1142            .single()
1143            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
1144    }
1145
1146    // Try time-only format (HH:MM:SS or HH:MM)
1147    // Note: This branch won't be reached for inputs like "10:30" that could match
1148    // parse_datetime, because parse_datetime expects a full date prefix and will fail.
1149    if let Ok(time) = parse_time_only(s) {
1150        let now = Local::now();
1151        let today = now.date_naive();
1152        let mut naive_dt = NaiveDateTime::new(today, time);
1153        let mut dt = Local
1154            .from_local_datetime(&naive_dt)
1155            .single()
1156            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1157
1158        // If the interpreted time for today is in the future, assume the user meant yesterday
1159        // BUT only for --since. For --until, a future time today is valid.
1160        if is_since
1161            && dt > now
1162            && let Some(yesterday) = today.pred_opt()
1163        {
1164            naive_dt = NaiveDateTime::new(yesterday, time);
1165            dt = Local
1166                .from_local_datetime(&naive_dt)
1167                .single()
1168                .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1169        }
1170        return Ok(dt);
1171    }
1172
1173    if let Ok(duration) = humantime::parse_duration(s) {
1174        let now = Local::now();
1175        let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
1176        return Ok(target);
1177    }
1178
1179    Err(miette::miette!(
1180        "Invalid time format: '{}'. Expected formats:\n\
1181         - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
1182         - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
1183         - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
1184        s
1185    ))
1186}
1187
1188fn parse_time_only(s: &str) -> Result<NaiveTime> {
1189    if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1190        return Ok(time);
1191    }
1192
1193    if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1194        return Ok(time);
1195    }
1196
1197    Err(miette::miette!("Invalid time format: '{}'", s))
1198}
1199
1200pub fn print_logs_for_time_range(
1201    daemon_id: &DaemonId,
1202    from: DateTime<Local>,
1203    to: Option<DateTime<Local>>,
1204) -> Result<()> {
1205    let daemon_ids = vec![daemon_id.clone()];
1206    let log_files = get_log_file_infos(&daemon_ids)?;
1207
1208    let from = from
1209        .with_nanosecond(0)
1210        .expect("0 is always valid for nanoseconds");
1211    let to = to.map(|t| {
1212        t.with_nanosecond(0)
1213            .expect("0 is always valid for nanoseconds")
1214    });
1215
1216    let log_lines = log_files
1217        .iter()
1218        .flat_map(
1219            |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), to) {
1220                Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1221                Err(e) => {
1222                    error!("{}: {}", lf.path.display(), e);
1223                    vec![]
1224                }
1225            },
1226        )
1227        .sorted_by_cached_key(|l| l.0.to_string())
1228        .collect_vec();
1229
1230    if log_lines.is_empty() {
1231        eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
1232    } else {
1233        eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
1234        for (date, _id, msg) in log_lines {
1235            eprintln!("{} {}", edim(&date), msg);
1236        }
1237        eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1238    }
1239
1240    Ok(())
1241}
1242
1243pub fn print_startup_logs(daemon_id: &DaemonId, from: DateTime<Local>) -> Result<()> {
1244    let daemon_ids = vec![daemon_id.clone()];
1245    let log_files = get_log_file_infos(&daemon_ids)?;
1246
1247    let from = from
1248        .with_nanosecond(0)
1249        .expect("0 is always valid for nanoseconds");
1250
1251    let log_lines = log_files
1252        .iter()
1253        .flat_map(
1254            |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), None) {
1255                Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1256                Err(e) => {
1257                    error!("{}: {}", lf.path.display(), e);
1258                    vec![]
1259                }
1260            },
1261        )
1262        .sorted_by_cached_key(|l| l.0.to_string())
1263        .collect_vec();
1264
1265    if !log_lines.is_empty() {
1266        eprintln!("\n{} {} {}", edim("==="), edim("Startup logs"), edim("==="));
1267        for (date, _id, msg) in log_lines {
1268            eprintln!("{} {}", edim(&date), msg);
1269        }
1270        eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1271    }
1272
1273    Ok(())
1274}