Skip to main content

pitchfork_cli/cli/
logs.rs

1use crate::daemon_id::DaemonId;
2use crate::pitchfork_toml::{PitchforkToml, WatchMode};
3use crate::state_file::StateFile;
4use crate::ui::style::edim;
5use crate::watch_files::WatchFiles;
6use crate::{Result, env};
7use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
8use console;
9use itertools::Itertools;
10use miette::IntoDiagnostic;
11use notify::RecursiveMode;
12use std::cmp::{Ordering, Reverse};
13use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap};
14use std::fs::{self, File};
15use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
16use std::path::{Path, PathBuf};
17use std::process::{Child, Command, Stdio};
18use std::time::Duration;
19use xx::regex;
20
/// Pager configuration for displaying logs.
///
/// `command` is the pager program to execute and `args` are the arguments
/// passed to it: any arguments embedded in `$PAGER`, followed by the defaults
/// pitchfork adds for `less`.
struct PagerConfig {
    command: String,
    args: Vec<String>,
}

impl PagerConfig {
    /// Select and configure the appropriate pager.
    ///
    /// Uses the `$PAGER` environment variable if it is set to a non-blank
    /// value, otherwise defaults to `less`. `$PAGER` commonly carries
    /// arguments (e.g. `less -S`), so it is split on whitespace into a program
    /// name plus arguments rather than being executed as one program path.
    fn new(start_at_end: bool) -> Self {
        let pager = std::env::var("PAGER")
            .ok()
            .filter(|p| !p.trim().is_empty())
            .unwrap_or_else(|| "less".to_string());
        Self::from_command_line(&pager, start_at_end)
    }

    /// Build a config from a whitespace-separated pager command line.
    ///
    /// NOTE: shell quoting is not interpreted, so individual arguments that
    /// contain spaces are not supported.
    fn from_command_line(command_line: &str, start_at_end: bool) -> Self {
        let mut parts = command_line.split_whitespace();
        let command = parts.next().unwrap_or("less").to_string();
        let mut args: Vec<String> = parts.map(str::to_string).collect();
        args.extend(Self::build_args(&command, start_at_end));
        Self { command, args }
    }

    /// Default arguments pitchfork adds for the given pager program.
    ///
    /// Only `less` is recognized, matched by file stem so `/usr/bin/less` and
    /// `less.exe` qualify: `-R` passes ANSI color sequences through, and `+G`
    /// jumps to the end of the input when `start_at_end` is set. Any other
    /// pager gets no extra arguments.
    fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
        let is_less = Path::new(pager)
            .file_stem()
            .is_some_and(|stem| stem == "less");
        if !is_less {
            return vec![];
        }
        let mut args = vec!["-R".to_string()];
        if start_at_end {
            args.push("+G".to_string());
        }
        args
    }

    /// Spawn the pager with piped stdin (stdout/stderr are inherited).
    fn spawn_piped(&self) -> io::Result<Child> {
        Command::new(&self.command)
            .args(&self.args)
            .stdin(Stdio::piped())
            .spawn()
    }
}
55
56/// Format a single log line for output.
57/// When `single_daemon` is true, omits the daemon ID from the output.
58/// When `strip_ansi` is true, strips ANSI escape codes from the message.
59fn format_log_line(
60    date: &str,
61    id: &str,
62    msg: &str,
63    single_daemon: bool,
64    strip_ansi: bool,
65) -> String {
66    let msg = if strip_ansi {
67        console::strip_ansi_codes(msg).to_string()
68    } else {
69        msg.to_string()
70    };
71    if single_daemon {
72        format!("{} {}", edim(date), msg)
73    } else {
74        format!("{} {} {}", edim(date), id, msg)
75    }
76}
77
/// A parsed log entry with timestamp, daemon name, and message.
///
/// Entries are ordered by `timestamp` (compared as a string, which is
/// chronological for the fixed-width `YYYY-MM-DD HH:MM:SS` prefix) and then by
/// `source_idx`. Entries that share a timestamp therefore come out in the
/// order their sources were added, which keeps merged output deterministic.
#[derive(Debug)]
struct LogEntry {
    timestamp: String,
    daemon: String,
    message: String,
    source_idx: usize, // Index of the source iterator
}

impl LogEntry {
    /// The key that defines both equality and ordering.
    fn sort_key(&self) -> (&str, usize) {
        (self.timestamp.as_str(), self.source_idx)
    }
}

impl PartialEq for LogEntry {
    fn eq(&self, other: &Self) -> bool {
        self.sort_key() == other.sort_key()
    }
}

impl Eq for LogEntry {}

impl PartialOrd for LogEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for LogEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.sort_key().cmp(&other.sort_key())
    }
}

/// Streaming merger for multiple sorted log sources using a min-heap.
///
/// Each source must yield `(timestamp, message)` pairs in ascending timestamp
/// order. The merger yields `(timestamp, daemon, message)` triples in global
/// ascending order while holding at most one pending entry per source, so
/// the sources are never loaded fully into memory.
struct StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    sources: Vec<(String, I)>,           // (daemon_name, line_iterator)
    heap: BinaryHeap<Reverse<LogEntry>>, // Min-heap (using Reverse for ascending order)
    initialized: bool,                   // Whether each source has been primed
}

impl<I> StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    fn new() -> Self {
        Self {
            sources: Vec::new(),
            heap: BinaryHeap::new(),
            initialized: false,
        }
    }

    /// Register a source. A source added after iteration has started is
    /// primed immediately so its entries are not lost.
    fn add_source(&mut self, daemon_name: String, iter: I) {
        let idx = self.sources.len();
        self.sources.push((daemon_name, iter));
        if self.initialized {
            self.pull_from(idx);
        }
    }

    /// Pull the first entry from each source into the heap.
    ///
    /// Idempotent, and called automatically by `next`. Callers may still
    /// invoke it explicitly before iterating.
    fn initialize(&mut self) {
        if self.initialized {
            return;
        }
        self.initialized = true;
        for idx in 0..self.sources.len() {
            self.pull_from(idx);
        }
    }

    /// Move the next entry of source `idx` (if it has one) onto the heap.
    fn pull_from(&mut self, idx: usize) {
        let (daemon, iter) = &mut self.sources[idx];
        if let Some((timestamp, message)) = iter.next() {
            self.heap.push(Reverse(LogEntry {
                timestamp,
                daemon: daemon.clone(),
                message,
                source_idx: idx,
            }));
        }
    }
}

impl<I> Iterator for StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    type Item = (String, String, String); // (timestamp, daemon, message)

    fn next(&mut self) -> Option<Self::Item> {
        // No-op after the first call; guards against a forgotten initialize().
        self.initialize();

        // Pop the smallest entry, then refill from the same source so the heap
        // always holds the head of every non-exhausted source.
        let Reverse(entry) = self.heap.pop()?;
        self.pull_from(entry.source_idx);

        Some((entry.timestamp, entry.daemon, entry.message))
    }
}
171
172/// A proper streaming log parser that handles multi-line entries
173struct StreamingLogParser {
174    reader: BufReader<File>,
175    current_entry: Option<(String, String)>,
176    finished: bool,
177}
178
179impl StreamingLogParser {
180    fn new(file: File) -> Self {
181        Self {
182            reader: BufReader::new(file),
183            current_entry: None,
184            finished: false,
185        }
186    }
187}
188
189impl Iterator for StreamingLogParser {
190    type Item = (String, String);
191
192    fn next(&mut self) -> Option<Self::Item> {
193        if self.finished {
194            return None;
195        }
196
197        let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
198
199        loop {
200            let mut line = String::new();
201            match self.reader.read_line(&mut line) {
202                Ok(0) => {
203                    // EOF - return the last entry if any
204                    self.finished = true;
205                    return self.current_entry.take();
206                }
207                Ok(_) => {
208                    // Remove trailing newline
209                    if line.ends_with('\n') {
210                        line.pop();
211                        if line.ends_with('\r') {
212                            line.pop();
213                        }
214                    }
215
216                    if let Some(caps) = re.captures(&line) {
217                        let date = match caps.get(1) {
218                            Some(d) => d.as_str().to_string(),
219                            None => continue,
220                        };
221                        let msg = match caps.get(3) {
222                            Some(m) => m.as_str().to_string(),
223                            None => continue,
224                        };
225
226                        // Return the previous entry and start a new one
227                        let prev = self.current_entry.take();
228                        self.current_entry = Some((date, msg));
229
230                        if prev.is_some() {
231                            return prev;
232                        }
233                        // First entry - continue to read more
234                    } else {
235                        // Continuation line - append to current entry
236                        if let Some((_, ref mut msg)) = self.current_entry {
237                            msg.push('\n');
238                            msg.push_str(&line);
239                        }
240                    }
241                }
242                Err(_) => {
243                    self.finished = true;
244                    return self.current_entry.take();
245                }
246            }
247        }
248    }
249}
250
251/// Displays logs for daemon(s)
252#[derive(Debug, clap::Args)]
253#[clap(
254    visible_alias = "l",
255    verbatim_doc_comment,
256    long_about = "\
257Displays logs for daemon(s)
258
259Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
260and include timestamps for filtering.
261
262Examples:
263  pitchfork logs api              Show all logs for 'api' (paged if needed)
264  pitchfork logs api worker       Show logs for multiple daemons
265  pitchfork logs                  Show logs for all daemons
266  pitchfork logs api -n 50        Show last 50 lines
267  pitchfork logs api --follow     Follow logs in real-time
268  pitchfork logs api --since '2024-01-15 10:00:00'
269                                  Show logs since a specific time (forward)
270  pitchfork logs api --since '10:30:00'
271                                  Show logs since 10:30:00 today
272  pitchfork logs api --since '10:30' --until '12:00'
273                                  Show logs since 10:30:00 until 12:00:00 today
274  pitchfork logs api --since 5min Show logs from last 5 minutes
275  pitchfork logs api --raw        Output raw log lines without formatting
276  pitchfork logs api --raw -n 100 Output last 100 raw log lines
277  pitchfork logs api --clear      Delete logs for 'api'
278  pitchfork logs --clear          Delete logs for all daemons"
279)]
280pub struct Logs {
281    /// Show only logs for the specified daemon(s)
282    id: Vec<String>,
283
284    /// Delete logs
285    #[clap(short, long)]
286    clear: bool,
287
288    /// Show last N lines of logs
289    ///
290    /// Only applies when --since/--until is not used.
291    /// Without this option, all logs are shown.
292    #[clap(short)]
293    n: Option<usize>,
294
295    /// Show logs in real-time
296    #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
297    tail: bool,
298
299    /// Show logs from this time
300    ///
301    /// Supports multiple formats:
302    /// - Full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
303    /// - Time only: "HH:MM:SS" or "HH:MM" (uses today's date)
304    /// - Relative time: "5min", "2h", "1d" (e.g., last 5 minutes)
305    #[clap(short = 's', long)]
306    since: Option<String>,
307
308    /// Show logs until this time
309    ///
310    /// Supports multiple formats:
311    /// - Full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
312    /// - Time only: "HH:MM:SS" or "HH:MM" (uses today's date)
313    #[clap(short = 'u', long)]
314    until: Option<String>,
315
316    /// Disable pager even in interactive terminal
317    #[clap(long)]
318    no_pager: bool,
319
320    /// Output raw log lines without color or formatting
321    #[clap(long)]
322    raw: bool,
323}
324
325impl Logs {
326    pub async fn run(&self) -> Result<()> {
327        // Migrate legacy log directories (old format: "api" → new format: "legacy--api").
328        // This is idempotent and silent so it is safe to run on every invocation.
329        migrate_legacy_log_dirs();
330
331        // Resolve user-provided IDs to qualified IDs
332        let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
333            // When no IDs provided, use all daemon IDs
334            get_all_daemon_ids()?
335        } else {
336            PitchforkToml::resolve_ids(&self.id)?
337        };
338
339        if self.clear {
340            for id in &resolved_ids {
341                let path = id.log_path();
342                if path.exists() {
343                    xx::file::create(&path)?;
344                }
345            }
346            return Ok(());
347        }
348
349        let from = if let Some(since) = self.since.as_ref() {
350            Some(parse_time_input(since, true)?)
351        } else {
352            None
353        };
354        let to = if let Some(until) = self.until.as_ref() {
355            Some(parse_time_input(until, false)?)
356        } else {
357            None
358        };
359
360        self.print_existing_logs(&resolved_ids, from, to)?;
361        if self.tail {
362            tail_logs(&resolved_ids).await?;
363        }
364
365        Ok(())
366    }
367
368    fn print_existing_logs(
369        &self,
370        resolved_ids: &[DaemonId],
371        from: Option<DateTime<Local>>,
372        to: Option<DateTime<Local>>,
373    ) -> Result<()> {
374        let log_files = get_log_file_infos(resolved_ids)?;
375        trace!("log files for: {}", log_files.keys().join(", "));
376        let single_daemon = resolved_ids.len() == 1;
377        let has_time_filter = from.is_some() || to.is_some();
378
379        if has_time_filter {
380            let mut log_lines = self.collect_log_lines_forward(&log_files, from, to)?;
381
382            if let Some(n) = self.n {
383                let len = log_lines.len();
384                if len > n {
385                    log_lines = log_lines.into_iter().skip(len - n).collect_vec();
386                }
387            }
388
389            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
390        } else if let Some(n) = self.n {
391            let log_lines = self.collect_log_lines_reverse(&log_files, Some(n))?;
392            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
393        } else {
394            self.stream_logs_to_pager(&log_files, single_daemon, self.raw)?;
395        }
396
397        Ok(())
398    }
399
400    fn collect_log_lines_forward(
401        &self,
402        log_files: &BTreeMap<DaemonId, LogFile>,
403        from: Option<DateTime<Local>>,
404        to: Option<DateTime<Local>>,
405    ) -> Result<Vec<(String, String, String)>> {
406        let log_lines: Vec<(String, String, String)> = log_files
407            .iter()
408            .flat_map(
409                |(name, lf)| match read_lines_in_time_range(&lf.path, from, to) {
410                    Ok(lines) => merge_log_lines(&name.qualified(), lines, false),
411                    Err(e) => {
412                        error!("{}: {}", lf.path.display(), e);
413                        vec![]
414                    }
415                },
416            )
417            .sorted_by_cached_key(|l| l.0.to_string())
418            .collect_vec();
419
420        Ok(log_lines)
421    }
422
423    fn collect_log_lines_reverse(
424        &self,
425        log_files: &BTreeMap<DaemonId, LogFile>,
426        limit: Option<usize>,
427    ) -> Result<Vec<(String, String, String)>> {
428        let log_lines: Vec<(String, String, String)> = log_files
429            .iter()
430            .flat_map(|(daemon_id, lf)| {
431                let rev = match xx::file::open(&lf.path) {
432                    Ok(f) => rev_lines::RevLines::new(f),
433                    Err(e) => {
434                        error!("{}: {}", lf.path.display(), e);
435                        return vec![];
436                    }
437                };
438                let lines = rev.into_iter().filter_map(Result::ok);
439                let lines = match limit {
440                    Some(n) => lines.take(n).collect_vec(),
441                    None => lines.collect_vec(),
442                };
443                merge_log_lines(&daemon_id.qualified(), lines, true)
444            })
445            .sorted_by_cached_key(|l| l.0.to_string())
446            .collect_vec();
447
448        let log_lines = match limit {
449            Some(n) => {
450                let len = log_lines.len();
451                if len > n {
452                    log_lines.into_iter().skip(len - n).collect_vec()
453                } else {
454                    log_lines
455                }
456            }
457            None => log_lines,
458        };
459
460        Ok(log_lines)
461    }
462
463    fn output_logs(
464        &self,
465        log_lines: Vec<(String, String, String)>,
466        single_daemon: bool,
467        has_time_filter: bool,
468        raw: bool,
469    ) -> Result<()> {
470        if log_lines.is_empty() {
471            return Ok(());
472        }
473
474        let strip_ansi = raw || !console::colors_enabled();
475
476        // Raw mode: output without formatting and without pager
477        if raw {
478            for (date, id, msg) in log_lines {
479                let msg = if strip_ansi {
480                    console::strip_ansi_codes(&msg).to_string()
481                } else {
482                    msg
483                };
484                if single_daemon {
485                    println!("{date} {msg}");
486                } else {
487                    println!("{date} {id} {msg}");
488                }
489            }
490            return Ok(());
491        }
492
493        let use_pager = !self.no_pager && should_use_pager(log_lines.len());
494
495        if use_pager {
496            self.output_with_pager(log_lines, single_daemon, has_time_filter, strip_ansi)?;
497        } else {
498            for (date, id, msg) in log_lines {
499                println!(
500                    "{}",
501                    format_log_line(&date, &id, &msg, single_daemon, strip_ansi)
502                );
503            }
504        }
505
506        Ok(())
507    }
508
509    fn output_with_pager(
510        &self,
511        log_lines: Vec<(String, String, String)>,
512        single_daemon: bool,
513        has_time_filter: bool,
514        strip_ansi: bool,
515    ) -> Result<()> {
516        // When time filter is used, start at top; otherwise start at end
517        let pager_config = PagerConfig::new(!has_time_filter);
518
519        match pager_config.spawn_piped() {
520            Ok(mut child) => {
521                if let Some(stdin) = child.stdin.as_mut() {
522                    for (date, id, msg) in log_lines {
523                        let line = format!(
524                            "{}\n",
525                            format_log_line(&date, &id, &msg, single_daemon, strip_ansi)
526                        );
527                        if stdin.write_all(line.as_bytes()).is_err() {
528                            break;
529                        }
530                    }
531                    let _ = child.wait();
532                } else {
533                    debug!("Failed to get pager stdin, falling back to direct output");
534                    for (date, id, msg) in log_lines {
535                        println!(
536                            "{}",
537                            format_log_line(&date, &id, &msg, single_daemon, strip_ansi)
538                        );
539                    }
540                }
541            }
542            Err(e) => {
543                debug!("Failed to spawn pager: {e}, falling back to direct output");
544                for (date, id, msg) in log_lines {
545                    println!(
546                        "{}",
547                        format_log_line(&date, &id, &msg, single_daemon, strip_ansi)
548                    );
549                }
550            }
551        }
552
553        Ok(())
554    }
555
556    fn stream_logs_to_pager(
557        &self,
558        log_files: &BTreeMap<DaemonId, LogFile>,
559        single_daemon: bool,
560        raw: bool,
561    ) -> Result<()> {
562        let strip_ansi = raw || !console::colors_enabled();
563
564        if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
565            return self.stream_logs_direct(log_files, single_daemon, raw, strip_ansi);
566        }
567
568        let pager_config = PagerConfig::new(true); // start_at_end = true
569
570        match pager_config.spawn_piped() {
571            Ok(mut child) => {
572                if let Some(stdin) = child.stdin.take() {
573                    // Collect file info for the streaming thread
574                    let log_files_clone: Vec<_> = log_files
575                        .iter()
576                        .map(|(daemon_id, lf)| (daemon_id.qualified(), lf.path.clone()))
577                        .collect();
578                    let single_daemon_clone = single_daemon;
579                    let strip_ansi_clone = strip_ansi;
580
581                    // Stream logs using a background thread to avoid blocking
582                    std::thread::spawn(move || {
583                        let mut writer = BufWriter::new(stdin);
584
585                        // Single file: stream directly without merge overhead
586                        if log_files_clone.len() == 1 {
587                            let (name, path) = &log_files_clone[0];
588                            let file = match File::open(path) {
589                                Ok(f) => f,
590                                Err(_) => return,
591                            };
592                            let parser = StreamingLogParser::new(file);
593                            for (timestamp, message) in parser {
594                                let output = format!(
595                                    "{}\n",
596                                    format_log_line(
597                                        &timestamp,
598                                        name,
599                                        &message,
600                                        single_daemon_clone,
601                                        strip_ansi_clone
602                                    )
603                                );
604                                if writer.write_all(output.as_bytes()).is_err() {
605                                    return;
606                                }
607                            }
608                            let _ = writer.flush();
609                            return;
610                        }
611
612                        // Multiple files: use streaming merger for sorted/interleaved output
613                        let mut merger: StreamingMerger<StreamingLogParser> =
614                            StreamingMerger::new();
615
616                        for (name, path) in log_files_clone {
617                            let file = match File::open(&path) {
618                                Ok(f) => f,
619                                Err(_) => continue,
620                            };
621                            let parser = StreamingLogParser::new(file);
622                            merger.add_source(name, parser);
623                        }
624
625                        // Initialize the heap with first entry from each source
626                        merger.initialize();
627
628                        // Stream merged entries to pager
629                        for (timestamp, daemon, message) in merger {
630                            let output = format!(
631                                "{}\n",
632                                format_log_line(
633                                    &timestamp,
634                                    &daemon,
635                                    &message,
636                                    single_daemon_clone,
637                                    strip_ansi_clone
638                                )
639                            );
640                            if writer.write_all(output.as_bytes()).is_err() {
641                                return;
642                            }
643                        }
644
645                        let _ = writer.flush();
646                    });
647
648                    let _ = child.wait();
649                } else {
650                    debug!("Failed to get pager stdin, falling back to direct output");
651                    return self.stream_logs_direct(log_files, single_daemon, raw, strip_ansi);
652                }
653            }
654            Err(e) => {
655                debug!("Failed to spawn pager: {e}, falling back to direct output");
656                return self.stream_logs_direct(log_files, single_daemon, raw, strip_ansi);
657            }
658        }
659
660        Ok(())
661    }
662
663    fn stream_logs_direct(
664        &self,
665        log_files: &BTreeMap<DaemonId, LogFile>,
666        single_daemon: bool,
667        raw: bool,
668        strip_ansi: bool,
669    ) -> Result<()> {
670        // Fast path for single daemon: directly output file content without parsing
671        // This avoids expensive regex parsing for each line in large log files
672        if log_files.len() == 1 {
673            let (daemon_id, lf) = log_files.iter().next().unwrap();
674            let file = match File::open(&lf.path) {
675                Ok(f) => f,
676                Err(e) => {
677                    error!("{}: {}", lf.path.display(), e);
678                    return Ok(());
679                }
680            };
681            let reader = BufReader::new(file);
682            if raw {
683                // Raw mode: output lines as-is (but strip ansi if colors disabled)
684                for line in reader.lines() {
685                    match line {
686                        Ok(l) => {
687                            let l = if strip_ansi {
688                                console::strip_ansi_codes(&l).to_string()
689                            } else {
690                                l
691                            };
692                            if io::stdout().write_all(l.as_bytes()).is_err()
693                                || io::stdout().write_all(b"\n").is_err()
694                            {
695                                return Ok(());
696                            }
697                        }
698                        Err(_) => continue,
699                    }
700                }
701            } else {
702                // Formatted mode: parse and format each line
703                let parser = StreamingLogParser::new(File::open(&lf.path).into_diagnostic()?);
704                for (timestamp, message) in parser {
705                    let output = format!(
706                        "{}\n",
707                        format_log_line(
708                            &timestamp,
709                            &daemon_id.qualified(),
710                            &message,
711                            single_daemon,
712                            strip_ansi
713                        )
714                    );
715                    if io::stdout().write_all(output.as_bytes()).is_err() {
716                        return Ok(());
717                    }
718                }
719            }
720            return Ok(());
721        }
722
723        // Multiple daemons: use streaming merger for sorted output
724        let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();
725
726        for (daemon_id, lf) in log_files {
727            let file = match File::open(&lf.path) {
728                Ok(f) => f,
729                Err(e) => {
730                    error!("{}: {}", lf.path.display(), e);
731                    continue;
732                }
733            };
734            let parser = StreamingLogParser::new(file);
735            merger.add_source(daemon_id.qualified(), parser);
736        }
737
738        // Initialize the heap with first entry from each source
739        merger.initialize();
740
741        // Stream merged entries to stdout
742        for (timestamp, daemon, message) in merger {
743            let output = if raw {
744                let message = if strip_ansi {
745                    console::strip_ansi_codes(&message).to_string()
746                } else {
747                    message
748                };
749                if single_daemon {
750                    format!("{timestamp} {message}\n")
751                } else {
752                    format!("{timestamp} {daemon} {message}\n")
753                }
754            } else {
755                format!(
756                    "{}\n",
757                    format_log_line(&timestamp, &daemon, &message, single_daemon, strip_ansi)
758                )
759            };
760            if io::stdout().write_all(output.as_bytes()).is_err() {
761                return Ok(());
762            }
763        }
764
765        Ok(())
766    }
767}
768
769fn should_use_pager(line_count: usize) -> bool {
770    if !io::stdout().is_terminal() {
771        return false;
772    }
773
774    let terminal_height = get_terminal_height().unwrap_or(24);
775    line_count > terminal_height
776}
777
778fn get_terminal_height() -> Option<usize> {
779    if let Ok(rows) = std::env::var("LINES")
780        && let Ok(h) = rows.parse::<usize>()
781    {
782        return Some(h);
783    }
784
785    crossterm::terminal::size().ok().map(|(_, h)| h as usize)
786}
787
788fn read_lines_in_time_range(
789    path: &Path,
790    from: Option<DateTime<Local>>,
791    to: Option<DateTime<Local>>,
792) -> Result<Vec<String>> {
793    let mut file = File::open(path).into_diagnostic()?;
794    let file_size = file.metadata().into_diagnostic()?.len();
795
796    if file_size == 0 {
797        return Ok(vec![]);
798    }
799
800    let start_pos = if let Some(from_time) = from {
801        binary_search_log_position(&mut file, file_size, from_time, true)?
802    } else {
803        0
804    };
805
806    let end_pos = if let Some(to_time) = to {
807        binary_search_log_position(&mut file, file_size, to_time, false)?
808    } else {
809        file_size
810    };
811
812    if start_pos >= end_pos {
813        return Ok(vec![]);
814    }
815
816    file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
817    let mut reader = BufReader::new(&file);
818    let mut lines = Vec::new();
819    let mut current_pos = start_pos;
820
821    loop {
822        if current_pos >= end_pos {
823            break;
824        }
825
826        let mut line = String::new();
827        match reader.read_line(&mut line) {
828            Ok(0) => break,
829            Ok(bytes_read) => {
830                current_pos += bytes_read as u64;
831                if line.ends_with('\n') {
832                    line.pop();
833                    if line.ends_with('\r') {
834                        line.pop();
835                    }
836                }
837                lines.push(line);
838            }
839            Err(_) => break,
840        }
841    }
842
843    Ok(lines)
844}
845
846fn binary_search_log_position(
847    file: &mut File,
848    file_size: u64,
849    target_time: DateTime<Local>,
850    find_start: bool,
851) -> Result<u64> {
852    let mut low: u64 = 0;
853    let mut high: u64 = file_size;
854
855    while low < high {
856        let mid = low + (high - low) / 2;
857
858        let line_start = find_line_start(file, mid)?;
859
860        file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
861        let mut reader = BufReader::new(&*file);
862        let mut line = String::new();
863        let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
864        if bytes_read == 0 {
865            high = mid;
866            continue;
867        }
868
869        let line_time = extract_timestamp(&line);
870
871        match line_time {
872            Some(lt) => {
873                if find_start {
874                    if lt < target_time {
875                        low = line_start + bytes_read as u64;
876                    } else {
877                        high = line_start;
878                    }
879                } else if lt <= target_time {
880                    low = line_start + bytes_read as u64;
881                } else {
882                    high = line_start;
883                }
884            }
885            None => {
886                low = line_start + bytes_read as u64;
887            }
888        }
889    }
890
891    find_line_start(file, low)
892}
893
894fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
895    if pos == 0 {
896        return Ok(0);
897    }
898
899    // Start searching from the byte just before `pos`.
900    let mut search_pos = pos.saturating_sub(1);
901    const CHUNK_SIZE: usize = 8192;
902
903    loop {
904        // Determine the start of the chunk we want to read.
905        let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
906        let len_u64 = search_pos - chunk_start + 1;
907        let len = len_u64 as usize;
908
909        // Seek once to the beginning of this chunk.
910        file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
911        let mut buf = vec![0u8; len];
912        if file.read_exact(&mut buf).is_err() {
913            // Match the original behavior: on read error, fall back to start of file.
914            return Ok(0);
915        }
916
917        // Scan this chunk backwards for a newline.
918        for (i, &b) in buf.iter().enumerate().rev() {
919            if b == b'\n' {
920                return Ok(chunk_start + i as u64 + 1);
921            }
922        }
923
924        // No newline in this chunk; if we've reached the start of the file,
925        // there is no earlier newline.
926        if chunk_start == 0 {
927            return Ok(0);
928        }
929
930        // Move to the previous chunk (just before this one).
931        search_pos = chunk_start - 1;
932    }
933}
934
935fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
936    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
937    re.captures(line)
938        .and_then(|caps| caps.get(1))
939        .and_then(|m| parse_datetime(m.as_str()).ok())
940}
941
942fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
943    let lines = if reverse {
944        lines.into_iter().rev().collect()
945    } else {
946        lines
947    };
948
949    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
950    lines
951        .into_iter()
952        .fold(vec![], |mut acc, line| match re.captures(&line) {
953            Some(caps) => {
954                let (date, msg) = match (caps.get(1), caps.get(3)) {
955                    (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
956                    _ => return acc,
957                };
958                acc.push((date, id.to_string(), msg));
959                acc
960            }
961            None => {
962                if let Some(l) = acc.last_mut() {
963                    l.2.push('\n');
964                    l.2.push_str(&line);
965                }
966                acc
967            }
968        })
969}
970
971/// Rename legacy log directories that predate namespace-qualified daemon IDs.
972///
973/// Old layout: `PITCHFORK_LOGS_DIR/<name>/<name>.log`
974/// New layout: `PITCHFORK_LOGS_DIR/legacy--<name>/legacy--<name>.log`
975///
976/// Only directories that clearly match the old layout are migrated:
977/// - directory name does not contain `"--"`
978/// - directory contains `<name>.log`
979/// - `<name>` is a valid daemon short name under current DaemonId rules
980fn migrate_legacy_log_dirs() {
981    let known_safe_paths = known_daemon_safe_paths();
982    let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
983        Ok(d) => d,
984        Err(_) => return,
985    };
986    for dir in dirs {
987        if dir.starts_with(".") || !dir.is_dir() {
988            continue;
989        }
990        let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
991            Some(n) => n,
992            None => continue,
993        };
994        // New-format directories usually contain "--". For safety, only treat
995        // them as new-format if they match a known daemon ID safe-path.
996        if name.contains("--") {
997            // If it parses as a valid safe-path, treat it as already migrated
998            // and keep idempotent behavior silent.
999            if DaemonId::from_safe_path(&name).is_ok() {
1000                continue;
1001            }
1002            // Keep noisy warnings only for invalid/ambiguous names that cannot
1003            // be interpreted as new-format IDs.
1004            if known_safe_paths.contains(&name) {
1005                continue;
1006            }
1007            warn!(
1008                "Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
1009            );
1010            continue;
1011        }
1012
1013        // Migrate only explicit old-layout directories to avoid renaming
1014        // unrelated folders under logs/.
1015        let old_log = dir.join(format!("{name}.log"));
1016        if !old_log.exists() {
1017            continue;
1018        }
1019        if DaemonId::try_new("legacy", &name).is_err() {
1020            warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
1021            continue;
1022        }
1023
1024        let new_name = format!("legacy--{name}");
1025        let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
1026        // Skip if a target directory already exists to avoid clobbering data.
1027        if new_dir.exists() {
1028            continue;
1029        }
1030        if std::fs::rename(&dir, &new_dir).is_err() {
1031            continue;
1032        }
1033        // Also rename the log file inside the directory.
1034        let old_log = new_dir.join(format!("{name}.log"));
1035        let new_log = new_dir.join(format!("{new_name}.log"));
1036        if old_log.exists() {
1037            let _ = std::fs::rename(&old_log, &new_log);
1038        }
1039        debug!("Migrated legacy log dir '{name}' → '{new_name}'");
1040    }
1041}
1042
1043fn known_daemon_safe_paths() -> BTreeSet<String> {
1044    let mut out = BTreeSet::new();
1045
1046    match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1047        Ok(state) => {
1048            for id in state.daemons.keys() {
1049                out.insert(id.safe_path());
1050            }
1051        }
1052        Err(e) => {
1053            warn!("Failed to read state while checking known daemon IDs: {e}");
1054        }
1055    }
1056
1057    match PitchforkToml::all_merged() {
1058        Ok(config) => {
1059            for id in config.daemons.keys() {
1060                out.insert(id.safe_path());
1061            }
1062        }
1063        Err(e) => {
1064            warn!("Failed to read config while checking known daemon IDs: {e}");
1065        }
1066    }
1067
1068    out
1069}
1070
1071fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
1072    let mut ids = BTreeSet::new();
1073
1074    match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1075        Ok(state) => ids.extend(state.daemons.keys().cloned()),
1076        Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
1077    }
1078
1079    match PitchforkToml::all_merged() {
1080        Ok(config) => ids.extend(config.daemons.keys().cloned()),
1081        Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
1082    }
1083
1084    Ok(ids
1085        .into_iter()
1086        .filter(|id| id.log_path().exists())
1087        .collect())
1088}
1089
1090fn get_log_file_infos(names: &[DaemonId]) -> Result<BTreeMap<DaemonId, LogFile>> {
1091    let mut out = BTreeMap::new();
1092
1093    for daemon_id in names {
1094        let path = daemon_id.log_path();
1095
1096        // Directory may exist before the first log line is written.
1097        if !path.exists() {
1098            continue;
1099        }
1100
1101        let mut file = xx::file::open(&path)?;
1102        // Seek to end and get position atomically to avoid race condition
1103        // where content is written between metadata check and file open
1104        file.seek(SeekFrom::End(0)).into_diagnostic()?;
1105        let cur = file.stream_position().into_diagnostic()?;
1106
1107        out.insert(
1108            daemon_id.clone(),
1109            LogFile {
1110                _name: daemon_id.clone(),
1111                file,
1112                cur,
1113                path,
1114            },
1115        );
1116    }
1117
1118    Ok(out)
1119}
1120
1121pub async fn tail_logs(names: &[DaemonId]) -> Result<()> {
1122    let mut log_files = get_log_file_infos(names)?;
1123    let mut wf = WatchFiles::new(
1124        Duration::from_millis(10),
1125        WatchMode::Native,
1126        Duration::from_millis(500),
1127    )?;
1128
1129    for lf in log_files.values() {
1130        wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
1131    }
1132
1133    let files_to_name: HashMap<PathBuf, DaemonId> = log_files
1134        .iter()
1135        .map(|(n, f)| (f.path.clone(), n.clone()))
1136        .collect();
1137
1138    while let Some(paths) = wf.rx.recv().await {
1139        let mut out = vec![];
1140        for path in paths {
1141            let Some(name) = files_to_name.get(&path) else {
1142                warn!("Unknown log file changed: {}", path.display());
1143                continue;
1144            };
1145            let Some(info) = log_files.get_mut(name) else {
1146                warn!("No log info for: {name}");
1147                continue;
1148            };
1149            info.file
1150                .seek(SeekFrom::Start(info.cur))
1151                .into_diagnostic()?;
1152            let reader = BufReader::new(&info.file);
1153            let lines = reader.lines().map_while(Result::ok).collect_vec();
1154            info.cur = info.file.stream_position().into_diagnostic()?;
1155            out.extend(merge_log_lines(&name.qualified(), lines, false));
1156        }
1157        let out = out
1158            .into_iter()
1159            .sorted_by_cached_key(|l| l.0.to_string())
1160            .collect_vec();
1161        for (date, name, msg) in out {
1162            println!("{} {} {}", edim(&date), name, msg);
1163        }
1164    }
1165    Ok(())
1166}
1167
1168struct LogFile {
1169    _name: DaemonId,
1170    path: PathBuf,
1171    file: fs::File,
1172    cur: u64,
1173}
1174
1175fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
1176    let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
1177    Local
1178        .from_local_datetime(&naive_dt)
1179        .single()
1180        .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
1181}
1182
1183/// Parse time input string into DateTime.
1184///
1185/// `is_since` indicates whether this is for --since (true) or --until (false).
1186/// The "yesterday fallback" only applies to --since: if the time is in the future,
1187/// assume the user meant yesterday. For --until, future times are kept as-is.
1188fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
1189    let s = s.trim();
1190
1191    // Try full datetime first (YYYY-MM-DD HH:MM:SS)
1192    if let Ok(dt) = parse_datetime(s) {
1193        return Ok(dt);
1194    }
1195
1196    // Try datetime without seconds (YYYY-MM-DD HH:MM)
1197    if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
1198        return Local
1199            .from_local_datetime(&naive_dt)
1200            .single()
1201            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
1202    }
1203
1204    // Try time-only format (HH:MM:SS or HH:MM)
1205    // Note: This branch won't be reached for inputs like "10:30" that could match
1206    // parse_datetime, because parse_datetime expects a full date prefix and will fail.
1207    if let Ok(time) = parse_time_only(s) {
1208        let now = Local::now();
1209        let today = now.date_naive();
1210        let mut naive_dt = NaiveDateTime::new(today, time);
1211        let mut dt = Local
1212            .from_local_datetime(&naive_dt)
1213            .single()
1214            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1215
1216        // If the interpreted time for today is in the future, assume the user meant yesterday
1217        // BUT only for --since. For --until, a future time today is valid.
1218        if is_since
1219            && dt > now
1220            && let Some(yesterday) = today.pred_opt()
1221        {
1222            naive_dt = NaiveDateTime::new(yesterday, time);
1223            dt = Local
1224                .from_local_datetime(&naive_dt)
1225                .single()
1226                .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1227        }
1228        return Ok(dt);
1229    }
1230
1231    if let Ok(duration) = humantime::parse_duration(s) {
1232        let now = Local::now();
1233        let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
1234        return Ok(target);
1235    }
1236
1237    Err(miette::miette!(
1238        "Invalid time format: '{}'. Expected formats:\n\
1239         - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
1240         - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
1241         - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
1242        s
1243    ))
1244}
1245
1246fn parse_time_only(s: &str) -> Result<NaiveTime> {
1247    if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1248        return Ok(time);
1249    }
1250
1251    if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1252        return Ok(time);
1253    }
1254
1255    Err(miette::miette!("Invalid time format: '{}'", s))
1256}
1257
1258pub fn print_logs_for_time_range(
1259    daemon_id: &DaemonId,
1260    from: DateTime<Local>,
1261    to: Option<DateTime<Local>>,
1262) -> Result<()> {
1263    let daemon_ids = vec![daemon_id.clone()];
1264    let log_files = get_log_file_infos(&daemon_ids)?;
1265
1266    let from = from
1267        .with_nanosecond(0)
1268        .expect("0 is always valid for nanoseconds");
1269    let to = to.map(|t| {
1270        t.with_nanosecond(0)
1271            .expect("0 is always valid for nanoseconds")
1272    });
1273
1274    let log_lines = log_files
1275        .iter()
1276        .flat_map(
1277            |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), to) {
1278                Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1279                Err(e) => {
1280                    error!("{}: {}", lf.path.display(), e);
1281                    vec![]
1282                }
1283            },
1284        )
1285        .sorted_by_cached_key(|l| l.0.to_string())
1286        .collect_vec();
1287
1288    if log_lines.is_empty() {
1289        eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
1290    } else {
1291        eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
1292        for (date, _id, msg) in log_lines {
1293            eprintln!("{} {}", edim(&date), msg);
1294        }
1295        eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1296    }
1297
1298    Ok(())
1299}
1300
1301pub fn print_startup_logs(daemon_id: &DaemonId, from: DateTime<Local>) -> Result<()> {
1302    let daemon_ids = vec![daemon_id.clone()];
1303    let log_files = get_log_file_infos(&daemon_ids)?;
1304
1305    let from = from
1306        .with_nanosecond(0)
1307        .expect("0 is always valid for nanoseconds");
1308
1309    let log_lines = log_files
1310        .iter()
1311        .flat_map(
1312            |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), None) {
1313                Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1314                Err(e) => {
1315                    error!("{}: {}", lf.path.display(), e);
1316                    vec![]
1317                }
1318            },
1319        )
1320        .sorted_by_cached_key(|l| l.0.to_string())
1321        .collect_vec();
1322
1323    if !log_lines.is_empty() {
1324        eprintln!("\n{} {} {}", edim("==="), edim("Startup logs"), edim("==="));
1325        for (date, _id, msg) in log_lines {
1326            eprintln!("{} {}", edim(&date), msg);
1327        }
1328        eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1329    }
1330
1331    Ok(())
1332}