// pitchfork_cli/cli/logs.rs
1use crate::daemon_id::DaemonId;
2use crate::pitchfork_toml::PitchforkToml;
3use crate::state_file::StateFile;
4use crate::ui::style::edim;
5use crate::watch_files::WatchFiles;
6use crate::{Result, env};
7use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
8use itertools::Itertools;
9use miette::IntoDiagnostic;
10use notify::RecursiveMode;
11use std::cmp::{Ordering, Reverse};
12use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap};
13use std::fs::{self, File};
14use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
15use std::path::{Path, PathBuf};
16use std::process::{Child, Command, Stdio};
17use std::time::Duration;
18use xx::regex;
19
/// Pager configuration for displaying logs
struct PagerConfig {
    command: String,
    args: Vec<String>,
}

impl PagerConfig {
    /// Select and configure the appropriate pager.
    /// Uses $PAGER environment variable if set, otherwise defaults to less.
    fn new(start_at_end: bool) -> Self {
        let command = match std::env::var("PAGER") {
            Ok(pager) => pager,
            Err(_) => "less".to_string(),
        };
        let args = Self::build_args(&command, start_at_end);
        Self { command, args }
    }

    /// Build pager-specific arguments. Only `less` gets flags:
    /// `-R` passes ANSI color codes through; `+G` starts at the end of input.
    /// Any other pager is invoked with no extra arguments.
    fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
        if pager != "less" {
            return Vec::new();
        }
        let mut args = vec!["-R".to_string()];
        if start_at_end {
            args.push("+G".to_string());
        }
        args
    }

    /// Spawn the pager process with piped stdin so log lines can be streamed into it.
    fn spawn_piped(&self) -> io::Result<Child> {
        let mut cmd = Command::new(&self.command);
        cmd.args(&self.args).stdin(Stdio::piped());
        cmd.spawn()
    }
}
54
55/// Format a single log line for output.
56/// When `single_daemon` is true, omits the daemon ID from the output.
57fn format_log_line(date: &str, id: &str, msg: &str, single_daemon: bool) -> String {
58    if single_daemon {
59        format!("{} {}", edim(date), msg)
60    } else {
61        format!("{} {} {}", edim(date), id, msg)
62    }
63}
64
/// A parsed log entry with timestamp, daemon name, and message
#[derive(Debug)]
struct LogEntry {
    timestamp: String, // "YYYY-MM-DD HH:MM:SS" — lexicographic order equals chronological order
    daemon: String,
    message: String,
    source_idx: usize, // Index of the source iterator
}

// NOTE: equality and ordering deliberately consider ONLY the timestamp.
// These impls exist so LogEntry can sit in the BinaryHeap used by
// StreamingMerger; daemon/message/source_idx are intentionally ignored.
impl PartialEq for LogEntry {
    fn eq(&self, other: &Self) -> bool {
        self.timestamp == other.timestamp
    }
}

impl Eq for LogEntry {}

impl PartialOrd for LogEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to Ord (standard pattern keeping the two impls consistent).
        Some(self.cmp(other))
    }
}

impl Ord for LogEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // String comparison is sufficient because the timestamp format is
        // fixed-width and zero-padded.
        self.timestamp.cmp(&other.timestamp)
    }
}
93
94/// Streaming merger for multiple sorted log files using a min-heap.
95/// This allows merging sorted iterators without loading all data into memory.
96struct StreamingMerger<I>
97where
98    I: Iterator<Item = (String, String)>,
99{
100    sources: Vec<(String, I)>,           // (daemon_name, line_iterator)
101    heap: BinaryHeap<Reverse<LogEntry>>, // Min-heap (using Reverse for ascending order)
102}
103
104impl<I> StreamingMerger<I>
105where
106    I: Iterator<Item = (String, String)>,
107{
108    fn new() -> Self {
109        Self {
110            sources: Vec::new(),
111            heap: BinaryHeap::new(),
112        }
113    }
114
115    fn add_source(&mut self, daemon_name: String, iter: I) {
116        self.sources.push((daemon_name, iter));
117    }
118
119    fn initialize(&mut self) {
120        // Pull the first entry from each source into the heap
121        for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
122            if let Some((timestamp, message)) = iter.next() {
123                self.heap.push(Reverse(LogEntry {
124                    timestamp,
125                    daemon: daemon.clone(),
126                    message,
127                    source_idx: idx,
128                }));
129            }
130        }
131    }
132}
133
134impl<I> Iterator for StreamingMerger<I>
135where
136    I: Iterator<Item = (String, String)>,
137{
138    type Item = (String, String, String); // (timestamp, daemon, message)
139
140    fn next(&mut self) -> Option<Self::Item> {
141        // Pop the smallest entry from the heap
142        let Reverse(entry) = self.heap.pop()?;
143
144        // Pull the next entry from the same source and push to heap
145        let (daemon, iter) = &mut self.sources[entry.source_idx];
146        if let Some((timestamp, message)) = iter.next() {
147            self.heap.push(Reverse(LogEntry {
148                timestamp,
149                daemon: daemon.clone(),
150                message,
151                source_idx: entry.source_idx,
152            }));
153        }
154
155        Some((entry.timestamp, entry.daemon, entry.message))
156    }
157}
158
/// A proper streaming log parser that handles multi-line entries
struct StreamingLogParser {
    reader: BufReader<File>,
    current_entry: Option<(String, String)>, // (timestamp, accumulated message)
    finished: bool,                          // set at EOF or on read error
}

impl StreamingLogParser {
    /// Wrap `file` in a buffered reader. No parsing happens here; entries are
    /// produced lazily through the `Iterator` implementation.
    fn new(file: File) -> Self {
        let reader = BufReader::new(file);
        Self {
            reader,
            current_entry: None,
            finished: false,
        }
    }
}
175
impl Iterator for StreamingLogParser {
    /// (timestamp, message) pairs; `message` may span multiple lines.
    type Item = (String, String);

    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        // Matches "YYYY-MM-DD HH:MM:SS <daemon> <message>". Group 2 (daemon)
        // is captured but unused here — the caller already knows which file
        // (and therefore which daemon) is being parsed.
        let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");

        loop {
            let mut line = String::new();
            match self.reader.read_line(&mut line) {
                Ok(0) => {
                    // EOF - return the last entry if any
                    self.finished = true;
                    return self.current_entry.take();
                }
                Ok(_) => {
                    // Remove trailing newline (and the CR of a CRLF pair)
                    if line.ends_with('\n') {
                        line.pop();
                        if line.ends_with('\r') {
                            line.pop();
                        }
                    }

                    if let Some(caps) = re.captures(&line) {
                        // Groups 1 and 3 always exist when the regex matches;
                        // the `continue` arms are defensive.
                        let date = match caps.get(1) {
                            Some(d) => d.as_str().to_string(),
                            None => continue,
                        };
                        let msg = match caps.get(3) {
                            Some(m) => m.as_str().to_string(),
                            None => continue,
                        };

                        // Return the previous entry and start a new one.
                        // Buffering one entry is what lets continuation lines
                        // be appended before the entry is emitted.
                        let prev = self.current_entry.take();
                        self.current_entry = Some((date, msg));

                        if prev.is_some() {
                            return prev;
                        }
                        // First entry - continue to read more
                    } else {
                        // Continuation line - append to current entry.
                        // Lines appearing before the first timestamped line
                        // have no entry to attach to and are dropped.
                        if let Some((_, ref mut msg)) = self.current_entry {
                            msg.push('\n');
                            msg.push_str(&line);
                        }
                    }
                }
                Err(_) => {
                    // Read error (e.g. invalid UTF-8): stop iterating but
                    // still flush the buffered entry, if any.
                    self.finished = true;
                    return self.current_entry.take();
                }
            }
        }
    }
}
237
// NOTE: the `///` doc comments below double as clap-generated CLI help text,
// so their wording is user-facing; use plain `//` comments for internal notes.

/// Displays logs for daemon(s)
#[derive(Debug, clap::Args)]
#[clap(
    visible_alias = "l",
    verbatim_doc_comment,
    long_about = "\
Displays logs for daemon(s)

Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
and include timestamps for filtering.

Examples:
  pitchfork logs api              Show all logs for 'api' (paged if needed)
  pitchfork logs api worker       Show logs for multiple daemons
  pitchfork logs                  Show logs for all daemons
  pitchfork logs api -n 50        Show last 50 lines
  pitchfork logs api --follow     Follow logs in real-time
  pitchfork logs api --since '2024-01-15 10:00:00'
                                  Show logs since a specific time (forward)
  pitchfork logs api --since '10:30:00'
                                  Show logs since 10:30:00 today
  pitchfork logs api --since '10:30' --until '12:00'
                                  Show logs since 10:30:00 until 12:00:00 today
  pitchfork logs api --since 5min Show logs from last 5 minutes
  pitchfork logs api --raw        Output raw log lines without formatting
  pitchfork logs api --raw -n 100 Output last 100 raw log lines
  pitchfork logs api --clear      Delete logs for 'api'
  pitchfork logs --clear          Delete logs for all daemons"
)]
pub struct Logs {
    // Positional arguments; empty means "all daemons" (see Logs::run).
    /// Show only logs for the specified daemon(s)
    id: Vec<String>,

    /// Delete logs
    #[clap(short, long)]
    clear: bool,

    /// Show last N lines of logs
    ///
    /// Only applies when --since/--until is not used.
    /// Without this option, all logs are shown.
    #[clap(short)]
    n: Option<usize>,

    /// Show logs in real-time
    #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
    tail: bool,

    // Parsed by parse_time_input (defined elsewhere in this file).
    /// Show logs from this time
    ///
    /// Supports multiple formats:
    /// - Full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
    /// - Time only: "HH:MM:SS" or "HH:MM" (uses today's date)
    /// - Relative time: "5min", "2h", "1d" (e.g., last 5 minutes)
    #[clap(short = 's', long)]
    since: Option<String>,

    /// Show logs until this time
    ///
    /// Supports multiple formats:
    /// - Full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
    /// - Time only: "HH:MM:SS" or "HH:MM" (uses today's date)
    #[clap(short = 'u', long)]
    until: Option<String>,

    /// Disable pager even in interactive terminal
    #[clap(long)]
    no_pager: bool,

    /// Output raw log lines without color or formatting
    #[clap(long)]
    raw: bool,
}
311
impl Logs {
    pub async fn run(&self) -> Result<()> {
        // Migrate legacy log directories (old format: "api" → new format: "legacy--api").
        // This is idempotent and silent so it is safe to run on every invocation.
        migrate_legacy_log_dirs();

        // Resolve user-provided IDs to qualified IDs
        let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
            // When no IDs provided, use all daemon IDs
            get_all_daemon_ids()?
        } else {
            PitchforkToml::resolve_ids(&self.id)?
        };

        if self.clear {
            // --clear truncates each existing log file rather than deleting it
            // (xx::file::create on an existing path recreates it empty), so a
            // running daemon can keep appending to the same path.
            for id in &resolved_ids {
                let path = id.log_path();
                if path.exists() {
                    xx::file::create(&path)?;
                }
            }
            return Ok(());
        }

        // Parse --since/--until up front so invalid time input fails before
        // any log output is produced.
        let from = if let Some(since) = self.since.as_ref() {
            Some(parse_time_input(since, true)?)
        } else {
            None
        };
        let to = if let Some(until) = self.until.as_ref() {
            Some(parse_time_input(until, false)?)
        } else {
            None
        };

        self.print_existing_logs(&resolved_ids, from, to)?;
        if self.tail {
            // --tail/--follow: after printing the backlog, keep streaming new lines.
            tail_logs(&resolved_ids).await?;
        }

        Ok(())
    }

    /// Print the logs already on disk, picking a strategy from the flags:
    /// - time filter (--since/--until): binary-search the range, then sort
    /// - -n alone: read each file backwards to grab just the last N lines
    /// - neither: stream whole files through a pager (or straight to stdout)
    fn print_existing_logs(
        &self,
        resolved_ids: &[DaemonId],
        from: Option<DateTime<Local>>,
        to: Option<DateTime<Local>>,
    ) -> Result<()> {
        let log_files = get_log_file_infos(resolved_ids)?;
        trace!("log files for: {}", log_files.keys().join(", "));
        // With a single daemon the daemon ID column is omitted from output.
        let single_daemon = resolved_ids.len() == 1;
        let has_time_filter = from.is_some() || to.is_some();

        if has_time_filter {
            let mut log_lines = self.collect_log_lines_forward(&log_files, from, to)?;

            // -n combined with a time filter: keep only the newest N entries.
            if let Some(n) = self.n {
                let len = log_lines.len();
                if len > n {
                    log_lines = log_lines.into_iter().skip(len - n).collect_vec();
                }
            }

            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
        } else if let Some(n) = self.n {
            let log_lines = self.collect_log_lines_reverse(&log_files, Some(n))?;
            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
        } else {
            self.stream_logs_to_pager(&log_files, single_daemon, self.raw)?;
        }

        Ok(())
    }

    /// Collect entries from all files within [from, to], then sort the merged
    /// result by timestamp string (the fixed "YYYY-MM-DD HH:MM:SS" format
    /// sorts chronologically). A file that fails to read is logged and
    /// skipped rather than aborting the whole command.
    fn collect_log_lines_forward(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        from: Option<DateTime<Local>>,
        to: Option<DateTime<Local>>,
    ) -> Result<Vec<(String, String, String)>> {
        let log_lines: Vec<(String, String, String)> = log_files
            .iter()
            .flat_map(
                |(name, lf)| match read_lines_in_time_range(&lf.path, from, to) {
                    Ok(lines) => merge_log_lines(&name.qualified(), lines, false),
                    Err(e) => {
                        error!("{}: {}", lf.path.display(), e);
                        vec![]
                    }
                },
            )
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();

        Ok(log_lines)
    }

    /// Collect the last `limit` entries per file by reading each file from the
    /// end (rev_lines), then merge, sort, and trim the combined list down to
    /// `limit`. Reading backwards avoids scanning huge files just for the tail.
    fn collect_log_lines_reverse(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        limit: Option<usize>,
    ) -> Result<Vec<(String, String, String)>> {
        let log_lines: Vec<(String, String, String)> = log_files
            .iter()
            .flat_map(|(daemon_id, lf)| {
                let rev = match xx::file::open(&lf.path) {
                    Ok(f) => rev_lines::RevLines::new(f),
                    Err(e) => {
                        error!("{}: {}", lf.path.display(), e);
                        return vec![];
                    }
                };
                let lines = rev.into_iter().filter_map(Result::ok);
                // Take at most `limit` raw lines per file; a multi-line entry
                // straddling the cutoff may be truncated, which is acceptable
                // for a tail view.
                let lines = match limit {
                    Some(n) => lines.take(n).collect_vec(),
                    None => lines.collect_vec(),
                };
                // reverse=true: rev_lines yields newest-first, so flip back to
                // chronological order before grouping continuation lines.
                merge_log_lines(&daemon_id.qualified(), lines, true)
            })
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();

        // The per-file take(n) can still leave up to n * file_count entries;
        // trim the merged list to the final, newest n.
        let log_lines = match limit {
            Some(n) => {
                let len = log_lines.len();
                if len > n {
                    log_lines.into_iter().skip(len - n).collect_vec()
                } else {
                    log_lines
                }
            }
            None => log_lines,
        };

        Ok(log_lines)
    }

    /// Write collected entries to stdout, optionally through a pager.
    /// Raw mode bypasses both colorized formatting and the pager entirely.
    fn output_logs(
        &self,
        log_lines: Vec<(String, String, String)>,
        single_daemon: bool,
        has_time_filter: bool,
        raw: bool,
    ) -> Result<()> {
        if log_lines.is_empty() {
            return Ok(());
        }

        // Raw mode: output without formatting and without pager
        if raw {
            for (date, id, msg) in log_lines {
                if single_daemon {
                    println!("{} {}", date, msg);
                } else {
                    println!("{} {} {}", date, id, msg);
                }
            }
            return Ok(());
        }

        // Page only when interactive and the output won't fit on one screen.
        let use_pager = !self.no_pager && should_use_pager(log_lines.len());

        if use_pager {
            self.output_with_pager(log_lines, single_daemon, has_time_filter)?;
        } else {
            for (date, id, msg) in log_lines {
                println!("{}", format_log_line(&date, &id, &msg, single_daemon));
            }
        }

        Ok(())
    }

    /// Pipe formatted log lines into a pager process. Falls back to plain
    /// stdout if the pager cannot be spawned or its stdin is unavailable.
    fn output_with_pager(
        &self,
        log_lines: Vec<(String, String, String)>,
        single_daemon: bool,
        has_time_filter: bool,
    ) -> Result<()> {
        // When time filter is used, start at top; otherwise start at end
        let pager_config = PagerConfig::new(!has_time_filter);

        match pager_config.spawn_piped() {
            Ok(mut child) => {
                if let Some(stdin) = child.stdin.as_mut() {
                    for (date, id, msg) in log_lines {
                        let line =
                            format!("{}\n", format_log_line(&date, &id, &msg, single_daemon));
                        // A write error (typically a broken pipe after the user
                        // quits the pager) just ends output early — not a failure.
                        if stdin.write_all(line.as_bytes()).is_err() {
                            break;
                        }
                    }
                    // Child::wait closes our stdin handle first, so the pager
                    // sees EOF and can exit normally.
                    let _ = child.wait();
                } else {
                    debug!("Failed to get pager stdin, falling back to direct output");
                    for (date, id, msg) in log_lines {
                        println!("{}", format_log_line(&date, &id, &msg, single_daemon));
                    }
                }
            }
            Err(e) => {
                debug!(
                    "Failed to spawn pager: {}, falling back to direct output",
                    e
                );
                for (date, id, msg) in log_lines {
                    println!("{}", format_log_line(&date, &id, &msg, single_daemon));
                }
            }
        }

        Ok(())
    }

    /// Stream entire log files through a pager without collecting them in
    /// memory. A background thread feeds the pager while the main thread
    /// waits on it. Non-interactive stdout, --no-pager, --tail, or --raw
    /// bypass the pager and stream directly.
    fn stream_logs_to_pager(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        single_daemon: bool,
        raw: bool,
    ) -> Result<()> {
        if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
            return self.stream_logs_direct(log_files, single_daemon, raw);
        }

        let pager_config = PagerConfig::new(true); // start_at_end = true

        match pager_config.spawn_piped() {
            Ok(mut child) => {
                if let Some(stdin) = child.stdin.take() {
                    // Collect file info for the streaming thread
                    // (owned clones — the thread is 'static and can't borrow).
                    let log_files_clone: Vec<_> = log_files
                        .iter()
                        .map(|(daemon_id, lf)| (daemon_id.qualified(), lf.path.clone()))
                        .collect();
                    let single_daemon_clone = single_daemon;

                    // Stream logs using a background thread to avoid blocking
                    std::thread::spawn(move || {
                        let mut writer = BufWriter::new(stdin);

                        // Single file: stream directly without merge overhead
                        if log_files_clone.len() == 1 {
                            let (name, path) = &log_files_clone[0];
                            let file = match File::open(path) {
                                Ok(f) => f,
                                Err(_) => return,
                            };
                            let parser = StreamingLogParser::new(file);
                            for (timestamp, message) in parser {
                                let output = format!(
                                    "{}\n",
                                    format_log_line(
                                        &timestamp,
                                        name,
                                        &message,
                                        single_daemon_clone
                                    )
                                );
                                // Broken pipe (pager quit): stop streaming silently.
                                if writer.write_all(output.as_bytes()).is_err() {
                                    return;
                                }
                            }
                            let _ = writer.flush();
                            return;
                        }

                        // Multiple files: use streaming merger for sorted/interleaved output
                        let mut merger: StreamingMerger<StreamingLogParser> =
                            StreamingMerger::new();

                        for (name, path) in log_files_clone {
                            let file = match File::open(&path) {
                                Ok(f) => f,
                                Err(_) => continue,
                            };
                            let parser = StreamingLogParser::new(file);
                            merger.add_source(name, parser);
                        }

                        // Initialize the heap with first entry from each source
                        merger.initialize();

                        // Stream merged entries to pager
                        for (timestamp, daemon, message) in merger {
                            let output = format!(
                                "{}\n",
                                format_log_line(&timestamp, &daemon, &message, single_daemon_clone)
                            );
                            if writer.write_all(output.as_bytes()).is_err() {
                                return;
                            }
                        }

                        let _ = writer.flush();
                    });

                    // Wait for the user to exit the pager; the writer thread
                    // ends on its own via EOF or broken pipe.
                    let _ = child.wait();
                } else {
                    debug!("Failed to get pager stdin, falling back to direct output");
                    return self.stream_logs_direct(log_files, single_daemon, raw);
                }
            }
            Err(e) => {
                debug!(
                    "Failed to spawn pager: {}, falling back to direct output",
                    e
                );
                return self.stream_logs_direct(log_files, single_daemon, raw);
            }
        }

        Ok(())
    }

    /// Stream entire log files straight to stdout without a pager and without
    /// buffering whole files in memory. Write errors (e.g. a closed pipe
    /// downstream) silently end the output.
    fn stream_logs_direct(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        single_daemon: bool,
        raw: bool,
    ) -> Result<()> {
        // Fast path for single daemon: directly output file content without parsing
        // This avoids expensive regex parsing for each line in large log files
        if log_files.len() == 1 {
            let (daemon_id, lf) = log_files.iter().next().unwrap();
            let file = match File::open(&lf.path) {
                Ok(f) => f,
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    return Ok(());
                }
            };
            let reader = BufReader::new(file);
            if raw {
                // Raw mode: output lines as-is
                for line in reader.lines() {
                    match line {
                        Ok(l) => {
                            if io::stdout().write_all(l.as_bytes()).is_err()
                                || io::stdout().write_all(b"\n").is_err()
                            {
                                return Ok(());
                            }
                        }
                        // Skip unreadable lines (e.g. invalid UTF-8).
                        Err(_) => continue,
                    }
                }
            } else {
                // Formatted mode: parse and format each line
                let parser = StreamingLogParser::new(File::open(&lf.path).into_diagnostic()?);
                for (timestamp, message) in parser {
                    let output = format!(
                        "{}\n",
                        format_log_line(
                            &timestamp,
                            &daemon_id.qualified(),
                            &message,
                            single_daemon
                        )
                    );
                    if io::stdout().write_all(output.as_bytes()).is_err() {
                        return Ok(());
                    }
                }
            }
            return Ok(());
        }

        // Multiple daemons: use streaming merger for sorted output
        let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();

        for (daemon_id, lf) in log_files {
            let file = match File::open(&lf.path) {
                Ok(f) => f,
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    continue;
                }
            };
            let parser = StreamingLogParser::new(file);
            merger.add_source(daemon_id.qualified(), parser);
        }

        // Initialize the heap with first entry from each source
        merger.initialize();

        // Stream merged entries to stdout
        for (timestamp, daemon, message) in merger {
            let output = if raw {
                if single_daemon {
                    format!("{} {}\n", timestamp, message)
                } else {
                    format!("{} {} {}\n", timestamp, daemon, message)
                }
            } else {
                format!(
                    "{}\n",
                    format_log_line(&timestamp, &daemon, &message, single_daemon)
                )
            };
            if io::stdout().write_all(output.as_bytes()).is_err() {
                return Ok(());
            }
        }

        Ok(())
    }
}
720
721fn should_use_pager(line_count: usize) -> bool {
722    if !io::stdout().is_terminal() {
723        return false;
724    }
725
726    let terminal_height = get_terminal_height().unwrap_or(24);
727    line_count > terminal_height
728}
729
730fn get_terminal_height() -> Option<usize> {
731    if let Ok(rows) = std::env::var("LINES")
732        && let Ok(h) = rows.parse::<usize>()
733    {
734        return Some(h);
735    }
736
737    crossterm::terminal::size().ok().map(|(_, h)| h as usize)
738}
739
/// Read whole lines from `path` whose timestamps fall within [from, to].
///
/// Binary-searches the (timestamp-sorted) log file for the byte offsets of
/// the range boundaries, then reads only that byte span — avoiding a full
/// scan of large files for narrow time windows. Returned lines have their
/// trailing newline (and CR) stripped.
fn read_lines_in_time_range(
    path: &Path,
    from: Option<DateTime<Local>>,
    to: Option<DateTime<Local>>,
) -> Result<Vec<String>> {
    let mut file = File::open(path).into_diagnostic()?;
    let file_size = file.metadata().into_diagnostic()?.len();

    if file_size == 0 {
        return Ok(vec![]);
    }

    // Byte offset of the first line at/after `from` (or start of file).
    let start_pos = if let Some(from_time) = from {
        binary_search_log_position(&mut file, file_size, from_time, true)?
    } else {
        0
    };

    // Byte offset just past the last line at/before `to` (or end of file).
    let end_pos = if let Some(to_time) = to {
        binary_search_log_position(&mut file, file_size, to_time, false)?
    } else {
        file_size
    };

    if start_pos >= end_pos {
        return Ok(vec![]);
    }

    file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
    let mut reader = BufReader::new(&file);
    let mut lines = Vec::new();
    // Track our byte position manually — BufReader buffering means the
    // underlying file cursor is ahead of what we've consumed.
    let mut current_pos = start_pos;

    loop {
        // Stop once we've consumed the byte span [start_pos, end_pos).
        if current_pos >= end_pos {
            break;
        }

        let mut line = String::new();
        match reader.read_line(&mut line) {
            Ok(0) => break, // EOF
            Ok(bytes_read) => {
                current_pos += bytes_read as u64;
                // Strip trailing LF (and the CR of a CRLF pair).
                if line.ends_with('\n') {
                    line.pop();
                    if line.ends_with('\r') {
                        line.pop();
                    }
                }
                lines.push(line);
            }
            // Read error: return what was collected so far.
            Err(_) => break,
        }
    }

    Ok(lines)
}
797
/// Binary-search a timestamp-sorted log file for a byte offset.
///
/// When `find_start` is true, returns the offset of the first line whose
/// timestamp is >= `target_time`; otherwise, the offset just past the last
/// line whose timestamp is <= `target_time`. Lines without a parseable
/// timestamp (continuation lines of multi-line entries) are treated as
/// "before" the target so the search advances past them.
fn binary_search_log_position(
    file: &mut File,
    file_size: u64,
    target_time: DateTime<Local>,
    find_start: bool,
) -> Result<u64> {
    let mut low: u64 = 0;
    let mut high: u64 = file_size;

    while low < high {
        let mid = low + (high - low) / 2;

        // `mid` may land in the middle of a line; snap to the line's start.
        let line_start = find_line_start(file, mid)?;

        file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
        // Fresh BufReader per probe: cheap relative to the seek, and avoids
        // stale buffered data from the previous position.
        let mut reader = BufReader::new(&*file);
        let mut line = String::new();
        let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
        if bytes_read == 0 {
            // EOF at this probe: shrink the upper bound.
            high = mid;
            continue;
        }

        let line_time = extract_timestamp(&line);

        match line_time {
            Some(lt) => {
                if find_start {
                    // Looking for the first line >= target.
                    if lt < target_time {
                        low = line_start + bytes_read as u64;
                    } else {
                        high = line_start;
                    }
                } else if lt <= target_time {
                    // Looking for the position just past the last line <= target.
                    low = line_start + bytes_read as u64;
                } else {
                    high = line_start;
                }
            }
            None => {
                // No timestamp (continuation line): skip past it.
                low = line_start + bytes_read as u64;
            }
        }
    }

    // `low` may sit mid-line; return the start of the containing line.
    find_line_start(file, low)
}
845
/// Find the byte offset of the start of the line containing `pos`.
///
/// Scans backwards from `pos` in fixed-size chunks looking for the previous
/// `\n`, returning the offset just after it — or 0 when no earlier newline
/// exists. Leaves the file cursor at an unspecified position.
fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
    if pos == 0 {
        return Ok(0);
    }

    // Start searching from the byte just before `pos`.
    let mut search_pos = pos.saturating_sub(1);
    const CHUNK_SIZE: usize = 8192;

    loop {
        // Determine the start of the chunk we want to read.
        // (Chunk covers [chunk_start, search_pos], at most CHUNK_SIZE bytes.)
        let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
        let len_u64 = search_pos - chunk_start + 1;
        let len = len_u64 as usize;

        // Seek once to the beginning of this chunk.
        file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
        let mut buf = vec![0u8; len];
        if file.read_exact(&mut buf).is_err() {
            // Match the original behavior: on read error, fall back to start of file.
            return Ok(0);
        }

        // Scan this chunk backwards for a newline.
        for (i, &b) in buf.iter().enumerate().rev() {
            if b == b'\n' {
                // Line begins immediately after the newline.
                return Ok(chunk_start + i as u64 + 1);
            }
        }

        // No newline in this chunk; if we've reached the start of the file,
        // there is no earlier newline.
        if chunk_start == 0 {
            return Ok(0);
        }

        // Move to the previous chunk (just before this one).
        search_pos = chunk_start - 1;
    }
}
886
887fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
888    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
889    re.captures(line)
890        .and_then(|caps| caps.get(1))
891        .and_then(|m| parse_datetime(m.as_str()).ok())
892}
893
894fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
895    let lines = if reverse {
896        lines.into_iter().rev().collect()
897    } else {
898        lines
899    };
900
901    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
902    lines
903        .into_iter()
904        .fold(vec![], |mut acc, line| match re.captures(&line) {
905            Some(caps) => {
906                let (date, msg) = match (caps.get(1), caps.get(3)) {
907                    (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
908                    _ => return acc,
909                };
910                acc.push((date, id.to_string(), msg));
911                acc
912            }
913            None => {
914                if let Some(l) = acc.last_mut() {
915                    l.2.push('\n');
916                    l.2.push_str(&line);
917                }
918                acc
919            }
920        })
921}
922
923/// Rename legacy log directories that predate namespace-qualified daemon IDs.
924///
925/// Old layout: `PITCHFORK_LOGS_DIR/<name>/<name>.log`
926/// New layout: `PITCHFORK_LOGS_DIR/legacy--<name>/legacy--<name>.log`
927///
928/// Only directories that clearly match the old layout are migrated:
929/// - directory name does not contain `"--"`
930/// - directory contains `<name>.log`
931/// - `<name>` is a valid daemon short name under current DaemonId rules
932fn migrate_legacy_log_dirs() {
933    let known_safe_paths = known_daemon_safe_paths();
934    let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
935        Ok(d) => d,
936        Err(_) => return,
937    };
938    for dir in dirs {
939        if dir.starts_with(".") || !dir.is_dir() {
940            continue;
941        }
942        let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
943            Some(n) => n,
944            None => continue,
945        };
946        // New-format directories usually contain "--". For safety, only treat
947        // them as new-format if they match a known daemon ID safe-path.
948        if name.contains("--") {
949            // If it parses as a valid safe-path, treat it as already migrated
950            // and keep idempotent behavior silent.
951            if DaemonId::from_safe_path(&name).is_ok() {
952                continue;
953            }
954            // Keep noisy warnings only for invalid/ambiguous names that cannot
955            // be interpreted as new-format IDs.
956            if known_safe_paths.contains(&name) {
957                continue;
958            }
959            warn!(
960                "Skipping invalid legacy log directory '{}': contains '--' but is not a valid daemon safe-path",
961                name
962            );
963            continue;
964        }
965
966        // Migrate only explicit old-layout directories to avoid renaming
967        // unrelated folders under logs/.
968        let old_log = dir.join(format!("{name}.log"));
969        if !old_log.exists() {
970            continue;
971        }
972        if DaemonId::try_new("legacy", &name).is_err() {
973            warn!(
974                "Skipping invalid legacy log directory '{}': not a valid daemon ID",
975                name
976            );
977            continue;
978        }
979
980        let new_name = format!("legacy--{name}");
981        let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
982        // Skip if a target directory already exists to avoid clobbering data.
983        if new_dir.exists() {
984            continue;
985        }
986        if std::fs::rename(&dir, &new_dir).is_err() {
987            continue;
988        }
989        // Also rename the log file inside the directory.
990        let old_log = new_dir.join(format!("{name}.log"));
991        let new_log = new_dir.join(format!("{new_name}.log"));
992        if old_log.exists() {
993            let _ = std::fs::rename(&old_log, &new_log);
994        }
995        debug!("Migrated legacy log dir '{}' → '{}'", name, new_name);
996    }
997}
998
999fn known_daemon_safe_paths() -> BTreeSet<String> {
1000    let mut out = BTreeSet::new();
1001
1002    match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1003        Ok(state) => {
1004            for id in state.daemons.keys() {
1005                out.insert(id.safe_path());
1006            }
1007        }
1008        Err(e) => {
1009            warn!("Failed to read state while checking known daemon IDs: {e}");
1010        }
1011    }
1012
1013    match PitchforkToml::all_merged() {
1014        Ok(config) => {
1015            for id in config.daemons.keys() {
1016                out.insert(id.safe_path());
1017            }
1018        }
1019        Err(e) => {
1020            warn!("Failed to read config while checking known daemon IDs: {e}");
1021        }
1022    }
1023
1024    out
1025}
1026
1027fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
1028    let mut ids = BTreeSet::new();
1029
1030    match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1031        Ok(state) => ids.extend(state.daemons.keys().cloned()),
1032        Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
1033    }
1034
1035    match PitchforkToml::all_merged() {
1036        Ok(config) => ids.extend(config.daemons.keys().cloned()),
1037        Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
1038    }
1039
1040    Ok(ids
1041        .into_iter()
1042        .filter(|id| id.log_path().exists())
1043        .collect())
1044}
1045
1046fn get_log_file_infos(names: &[DaemonId]) -> Result<BTreeMap<DaemonId, LogFile>> {
1047    let mut out = BTreeMap::new();
1048
1049    for daemon_id in names {
1050        let path = daemon_id.log_path();
1051
1052        // Directory may exist before the first log line is written.
1053        if !path.exists() {
1054            continue;
1055        }
1056
1057        let mut file = xx::file::open(&path)?;
1058        // Seek to end and get position atomically to avoid race condition
1059        // where content is written between metadata check and file open
1060        file.seek(SeekFrom::End(0)).into_diagnostic()?;
1061        let cur = file.stream_position().into_diagnostic()?;
1062
1063        out.insert(
1064            daemon_id.clone(),
1065            LogFile {
1066                _name: daemon_id.clone(),
1067                file,
1068                cur,
1069                path,
1070            },
1071        );
1072    }
1073
1074    Ok(out)
1075}
1076
/// Stream newly appended log lines for the given daemons to stdout until the
/// watch channel closes.
///
/// Each daemon's log file is watched for changes; when data is appended past
/// the last read offset, the new lines are merged into `(date, id, msg)`
/// entries, interleaved across daemons in timestamp order, and printed with a
/// dimmed date prefix.
pub async fn tail_logs(names: &[DaemonId]) -> Result<()> {
    let mut log_files = get_log_file_infos(names)?;
    let mut wf = WatchFiles::new(Duration::from_millis(10))?;

    for lf in log_files.values() {
        wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
    }

    // Reverse map so a change notification (a path) can be resolved back to
    // the daemon it belongs to.
    let files_to_name: HashMap<PathBuf, DaemonId> = log_files
        .iter()
        .map(|(n, f)| (f.path.clone(), n.clone()))
        .collect();

    while let Some(paths) = wf.rx.recv().await {
        let mut out = vec![];
        for path in paths {
            let Some(name) = files_to_name.get(&path) else {
                warn!("Unknown log file changed: {}", path.display());
                continue;
            };
            let Some(info) = log_files.get_mut(name) else {
                warn!("No log info for: {name}");
                continue;
            };
            // Resume reading from where the previous event left off.
            info.file
                .seek(SeekFrom::Start(info.cur))
                .into_diagnostic()?;
            let reader = BufReader::new(&info.file);
            let lines = reader.lines().map_while(Result::ok).collect_vec();
            // Remember the new end position so the next event only reads fresh data.
            info.cur = info.file.stream_position().into_diagnostic()?;
            out.extend(merge_log_lines(&name.qualified(), lines, false));
        }
        // Interleave this batch across all daemons by timestamp before printing.
        let out = out
            .into_iter()
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();
        for (date, name, msg) in out {
            println!("{} {} {}", edim(&date), name, msg);
        }
    }
    Ok(())
}
1119
/// An open daemon log file plus the offset up to which it has been read.
struct LogFile {
    // Owning daemon; underscore-prefixed as it is not read in this file.
    _name: DaemonId,
    // Path of the log file on disk (also used as the watch key).
    path: PathBuf,
    // Open handle used for incremental reads while tailing.
    file: fs::File,
    // Byte offset of the first unread position in the file.
    cur: u64,
}
1126
1127fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
1128    let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
1129    Local
1130        .from_local_datetime(&naive_dt)
1131        .single()
1132        .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
1133}
1134
1135/// Parse time input string into DateTime.
1136///
1137/// `is_since` indicates whether this is for --since (true) or --until (false).
1138/// The "yesterday fallback" only applies to --since: if the time is in the future,
1139/// assume the user meant yesterday. For --until, future times are kept as-is.
1140fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
1141    let s = s.trim();
1142
1143    // Try full datetime first (YYYY-MM-DD HH:MM:SS)
1144    if let Ok(dt) = parse_datetime(s) {
1145        return Ok(dt);
1146    }
1147
1148    // Try datetime without seconds (YYYY-MM-DD HH:MM)
1149    if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
1150        return Local
1151            .from_local_datetime(&naive_dt)
1152            .single()
1153            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
1154    }
1155
1156    // Try time-only format (HH:MM:SS or HH:MM)
1157    // Note: This branch won't be reached for inputs like "10:30" that could match
1158    // parse_datetime, because parse_datetime expects a full date prefix and will fail.
1159    if let Ok(time) = parse_time_only(s) {
1160        let now = Local::now();
1161        let today = now.date_naive();
1162        let mut naive_dt = NaiveDateTime::new(today, time);
1163        let mut dt = Local
1164            .from_local_datetime(&naive_dt)
1165            .single()
1166            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1167
1168        // If the interpreted time for today is in the future, assume the user meant yesterday
1169        // BUT only for --since. For --until, a future time today is valid.
1170        if is_since
1171            && dt > now
1172            && let Some(yesterday) = today.pred_opt()
1173        {
1174            naive_dt = NaiveDateTime::new(yesterday, time);
1175            dt = Local
1176                .from_local_datetime(&naive_dt)
1177                .single()
1178                .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1179        }
1180        return Ok(dt);
1181    }
1182
1183    if let Ok(duration) = humantime::parse_duration(s) {
1184        let now = Local::now();
1185        let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
1186        return Ok(target);
1187    }
1188
1189    Err(miette::miette!(
1190        "Invalid time format: '{}'. Expected formats:\n\
1191         - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
1192         - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
1193         - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
1194        s
1195    ))
1196}
1197
1198fn parse_time_only(s: &str) -> Result<NaiveTime> {
1199    if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1200        return Ok(time);
1201    }
1202
1203    if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1204        return Ok(time);
1205    }
1206
1207    Err(miette::miette!("Invalid time format: '{}'", s))
1208}
1209
1210pub fn print_logs_for_time_range(
1211    daemon_id: &DaemonId,
1212    from: DateTime<Local>,
1213    to: Option<DateTime<Local>>,
1214) -> Result<()> {
1215    let daemon_ids = vec![daemon_id.clone()];
1216    let log_files = get_log_file_infos(&daemon_ids)?;
1217
1218    let from = from
1219        .with_nanosecond(0)
1220        .expect("0 is always valid for nanoseconds");
1221    let to = to.map(|t| {
1222        t.with_nanosecond(0)
1223            .expect("0 is always valid for nanoseconds")
1224    });
1225
1226    let log_lines = log_files
1227        .iter()
1228        .flat_map(
1229            |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), to) {
1230                Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1231                Err(e) => {
1232                    error!("{}: {}", lf.path.display(), e);
1233                    vec![]
1234                }
1235            },
1236        )
1237        .sorted_by_cached_key(|l| l.0.to_string())
1238        .collect_vec();
1239
1240    if log_lines.is_empty() {
1241        eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
1242    } else {
1243        eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
1244        for (date, _id, msg) in log_lines {
1245            eprintln!("{} {}", edim(&date), msg);
1246        }
1247        eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1248    }
1249
1250    Ok(())
1251}
1252
1253pub fn print_startup_logs(daemon_id: &DaemonId, from: DateTime<Local>) -> Result<()> {
1254    let daemon_ids = vec![daemon_id.clone()];
1255    let log_files = get_log_file_infos(&daemon_ids)?;
1256
1257    let from = from
1258        .with_nanosecond(0)
1259        .expect("0 is always valid for nanoseconds");
1260
1261    let log_lines = log_files
1262        .iter()
1263        .flat_map(
1264            |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), None) {
1265                Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1266                Err(e) => {
1267                    error!("{}: {}", lf.path.display(), e);
1268                    vec![]
1269                }
1270            },
1271        )
1272        .sorted_by_cached_key(|l| l.0.to_string())
1273        .collect_vec();
1274
1275    if !log_lines.is_empty() {
1276        eprintln!("\n{} {} {}", edim("==="), edim("Startup logs"), edim("==="));
1277        for (date, _id, msg) in log_lines {
1278            eprintln!("{} {}", edim(&date), msg);
1279        }
1280        eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1281    }
1282
1283    Ok(())
1284}