pitchfork_cli/cli/logs.rs

use crate::ui::style::edim;
use crate::watch_files::WatchFiles;
use crate::{Result, env};
use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
use itertools::Itertools;
use miette::IntoDiagnostic;
use notify::RecursiveMode;
use std::cmp::{Ordering, Reverse};
use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
use std::fs::{self, File};
use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::time::Duration;
use xx::regex;

/// Pager configuration for displaying logs
struct PagerConfig {
    command: String,
    args: Vec<String>,
}

impl PagerConfig {
    /// Select and configure the appropriate pager.
    /// Uses the $PAGER environment variable if set, otherwise defaults to less.
    fn new(start_at_end: bool) -> Self {
        let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
        let args = Self::build_args(&command, start_at_end);
        Self { command, args }
    }

    fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
        let mut args = vec![];
        if pager == "less" {
            args.push("-R".to_string());
            if start_at_end {
                args.push("+G".to_string());
            }
        }
        args
    }

    /// Spawn the pager with piped stdin
    fn spawn_piped(&self) -> io::Result<Child> {
        Command::new(&self.command)
            .args(&self.args)
            .stdin(Stdio::piped())
            .spawn()
    }
}

/// Format a single log line for output.
/// When `single_daemon` is true, omits the daemon ID from the output.
fn format_log_line(date: &str, id: &str, msg: &str, single_daemon: bool) -> String {
    if single_daemon {
        format!("{} {}", edim(date), msg)
    } else {
        format!("{} {} {}", edim(date), id, msg)
    }
}

/// A parsed log entry with timestamp, daemon name, and message
#[derive(Debug)]
struct LogEntry {
    timestamp: String,
    daemon: String,
    message: String,
    source_idx: usize, // Index of the source iterator
}

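// Ordering and equality consider only the timestamp: that is all the min-heap
// needs, and entries from different daemons with identical timestamps simply
// compare equal.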
impl PartialEq for LogEntry {
    fn eq(&self, other: &Self) -> bool {
        self.timestamp == other.timestamp
    }
}

impl Eq for LogEntry {}

impl PartialOrd for LogEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for LogEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        self.timestamp.cmp(&other.timestamp)
    }
}

/// Streaming merger for multiple sorted log files using a min-heap.
/// This allows merging sorted iterators without loading all data into memory.
struct StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    sources: Vec<(String, I)>,           // (daemon_name, line_iterator)
    heap: BinaryHeap<Reverse<LogEntry>>, // Min-heap (using Reverse for ascending order)
}

impl<I> StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    fn new() -> Self {
        Self {
            sources: Vec::new(),
            heap: BinaryHeap::new(),
        }
    }

    fn add_source(&mut self, daemon_name: String, iter: I) {
        self.sources.push((daemon_name, iter));
    }

    fn initialize(&mut self) {
        // Pull the first entry from each source into the heap
        for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
            if let Some((timestamp, message)) = iter.next() {
                self.heap.push(Reverse(LogEntry {
                    timestamp,
                    daemon: daemon.clone(),
                    message,
                    source_idx: idx,
                }));
            }
        }
    }
}

impl<I> Iterator for StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    type Item = (String, String, String); // (timestamp, daemon, message)

    fn next(&mut self) -> Option<Self::Item> {
        // Pop the smallest entry from the heap
        let Reverse(entry) = self.heap.pop()?;

        // Pull the next entry from the same source and push to heap
        let (daemon, iter) = &mut self.sources[entry.source_idx];
        if let Some((timestamp, message)) = iter.next() {
            self.heap.push(Reverse(LogEntry {
                timestamp,
                daemon: daemon.clone(),
                message,
                source_idx: entry.source_idx,
            }));
        }

        Some((entry.timestamp, entry.daemon, entry.message))
    }
}

/// A proper streaming log parser that handles multi-line entries
struct StreamingLogParser {
    reader: BufReader<File>,
    current_entry: Option<(String, String)>,
    finished: bool,
}

impl StreamingLogParser {
    fn new(file: File) -> Self {
        Self {
            reader: BufReader::new(file),
            current_entry: None,
            finished: false,
        }
    }
}

impl Iterator for StreamingLogParser {
    type Item = (String, String);

    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (\w+) (.*)$");

        loop {
            let mut line = String::new();
            match self.reader.read_line(&mut line) {
                Ok(0) => {
                    // EOF - return the last entry if any
                    self.finished = true;
                    return self.current_entry.take();
                }
                Ok(_) => {
                    // Remove trailing newline
                    if line.ends_with('\n') {
                        line.pop();
                        if line.ends_with('\r') {
                            line.pop();
                        }
                    }

                    if let Some(caps) = re.captures(&line) {
                        let date = match caps.get(1) {
                            Some(d) => d.as_str().to_string(),
                            None => continue,
                        };
                        let msg = match caps.get(3) {
                            Some(m) => m.as_str().to_string(),
                            None => continue,
                        };

                        // Return the previous entry and start a new one
                        let prev = self.current_entry.take();
                        self.current_entry = Some((date, msg));

                        if prev.is_some() {
                            return prev;
                        }
                        // First entry - continue to read more
                    } else {
                        // Continuation line - append to current entry
                        if let Some((_, ref mut msg)) = self.current_entry {
                            msg.push('\n');
                            msg.push_str(&line);
                        }
                    }
                }
                Err(_) => {
                    self.finished = true;
                    return self.current_entry.take();
                }
            }
        }
    }
}

/// Displays logs for daemon(s)
#[derive(Debug, clap::Args)]
#[clap(
    visible_alias = "l",
    verbatim_doc_comment,
    long_about = "\
Displays logs for daemon(s)

Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
and include timestamps for filtering.

Examples:
  pitchfork logs api              Show all logs for 'api' (paged if needed)
  pitchfork logs api worker       Show logs for multiple daemons
  pitchfork logs                  Show logs for all daemons
  pitchfork logs api -n 50        Show last 50 lines
  pitchfork logs api --follow     Follow logs in real-time
  pitchfork logs api --since '2024-01-15 10:00:00'
                                  Show logs since a specific time (forward)
  pitchfork logs api --since '10:30:00'
                                  Show logs since 10:30:00 today
  pitchfork logs api --since '10:30' --until '12:00'
                                  Show logs since 10:30:00 until 12:00:00 today
  pitchfork logs api --since 5min Show logs from last 5 minutes
  pitchfork logs api --raw        Output raw log lines without formatting
  pitchfork logs api --raw -n 100 Output last 100 raw log lines
  pitchfork logs api --clear      Delete logs for 'api'
  pitchfork logs --clear          Delete logs for all daemons"
)]
pub struct Logs {
    /// Show only logs for the specified daemon(s)
    id: Vec<String>,

    /// Delete logs
    #[clap(short, long)]
    clear: bool,

    /// Show last N lines of logs
    ///
    /// Only applies when --since/--until is not used.
    /// Without this option, all logs are shown.
    #[clap(short)]
    n: Option<usize>,

    /// Show logs in real-time
    #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
    tail: bool,

    /// Show logs from this time
    ///
    /// Supports multiple formats:
    /// - Full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
    /// - Time only: "HH:MM:SS" or "HH:MM" (uses today's date)
    /// - Relative time: "5min", "2h", "1d" (e.g., last 5 minutes)
    #[clap(short = 's', long)]
    since: Option<String>,

    /// Show logs until this time
    ///
    /// Supports multiple formats:
    /// - Full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
    /// - Time only: "HH:MM:SS" or "HH:MM" (uses today's date)
    #[clap(short = 'u', long)]
    until: Option<String>,

    /// Disable pager even in interactive terminal
    #[clap(long)]
    no_pager: bool,

    /// Output raw log lines without color or formatting
    #[clap(long)]
    raw: bool,
}

impl Logs {
    pub async fn run(&self) -> Result<()> {
        if self.clear {
            let ids = if self.id.is_empty() {
                // Clear all logs when no daemon specified
                get_all_daemon_ids()?
            } else {
                self.id.clone()
            };
            for id in &ids {
                let log_dir = env::PITCHFORK_LOGS_DIR.join(id);
                let path = log_dir.join(format!("{id}.log"));
                if path.exists() {
                    xx::file::create(&path)?;
                }
            }
            return Ok(());
        }

        let from = if let Some(since) = self.since.as_ref() {
            Some(parse_time_input(since, true)?)
        } else {
            None
        };
        let to = if let Some(until) = self.until.as_ref() {
            Some(parse_time_input(until, false)?)
        } else {
            None
        };

        self.print_existing_logs(from, to)?;
        if self.tail {
            tail_logs(&self.id).await?;
        }

        Ok(())
    }

    fn print_existing_logs(
        &self,
        from: Option<DateTime<Local>>,
        to: Option<DateTime<Local>>,
    ) -> Result<()> {
        let log_files = get_log_file_infos(&self.id)?;
        trace!("log files for: {}", log_files.keys().join(", "));
        let single_daemon = self.id.len() == 1;
        let has_time_filter = from.is_some() || to.is_some();

        if has_time_filter {
            let mut log_lines = self.collect_log_lines_forward(&log_files, from, to)?;

            if let Some(n) = self.n {
                let len = log_lines.len();
                if len > n {
                    log_lines = log_lines.into_iter().skip(len - n).collect_vec();
                }
            }

            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
        } else if let Some(n) = self.n {
            let log_lines = self.collect_log_lines_reverse(&log_files, Some(n))?;
            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
        } else {
            self.stream_logs_to_pager(&log_files, single_daemon, self.raw)?;
        }

        Ok(())
    }

    fn collect_log_lines_forward(
        &self,
        log_files: &BTreeMap<String, LogFile>,
        from: Option<DateTime<Local>>,
        to: Option<DateTime<Local>>,
    ) -> Result<Vec<(String, String, String)>> {
        let log_lines: Vec<(String, String, String)> = log_files
            .iter()
            .flat_map(
                |(name, lf)| match read_lines_in_time_range(&lf.path, from, to) {
                    Ok(lines) => merge_log_lines(name, lines, false),
                    Err(e) => {
                        error!("{}: {}", lf.path.display(), e);
                        vec![]
                    }
                },
            )
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();

        Ok(log_lines)
    }

    fn collect_log_lines_reverse(
        &self,
        log_files: &BTreeMap<String, LogFile>,
        limit: Option<usize>,
    ) -> Result<Vec<(String, String, String)>> {
        let log_lines: Vec<(String, String, String)> = log_files
            .iter()
            .flat_map(|(name, lf)| {
                let rev = match xx::file::open(&lf.path) {
                    Ok(f) => rev_lines::RevLines::new(f),
                    Err(e) => {
                        error!("{}: {}", lf.path.display(), e);
                        return vec![];
                    }
                };
                let lines = rev.into_iter().filter_map(Result::ok);
                let lines = match limit {
                    Some(n) => lines.take(n).collect_vec(),
                    None => lines.collect_vec(),
                };
                merge_log_lines(name, lines, true)
            })
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();

        let log_lines = match limit {
            Some(n) => {
                let len = log_lines.len();
                if len > n {
                    log_lines.into_iter().skip(len - n).collect_vec()
                } else {
                    log_lines
                }
            }
            None => log_lines,
        };

        Ok(log_lines)
    }

    fn output_logs(
        &self,
        log_lines: Vec<(String, String, String)>,
        single_daemon: bool,
        has_time_filter: bool,
        raw: bool,
    ) -> Result<()> {
        if log_lines.is_empty() {
            return Ok(());
        }

        // Raw mode: output without formatting and without pager
        if raw {
            for (date, _, msg) in log_lines {
                println!("{} {}", date, msg);
            }
            return Ok(());
        }

        let use_pager = !self.no_pager && should_use_pager(log_lines.len());

        if use_pager {
            self.output_with_pager(log_lines, single_daemon, has_time_filter)?;
        } else {
            for (date, id, msg) in log_lines {
                println!("{}", format_log_line(&date, &id, &msg, single_daemon));
            }
        }

        Ok(())
    }

    fn output_with_pager(
        &self,
        log_lines: Vec<(String, String, String)>,
        single_daemon: bool,
        has_time_filter: bool,
    ) -> Result<()> {
        // When time filter is used, start at top; otherwise start at end
        let pager_config = PagerConfig::new(!has_time_filter);

        match pager_config.spawn_piped() {
            Ok(mut child) => {
                if let Some(stdin) = child.stdin.as_mut() {
                    for (date, id, msg) in log_lines {
                        let line =
                            format!("{}\n", format_log_line(&date, &id, &msg, single_daemon));
                        if stdin.write_all(line.as_bytes()).is_err() {
                            break;
                        }
                    }
                    let _ = child.wait();
                } else {
                    debug!("Failed to get pager stdin, falling back to direct output");
                    for (date, id, msg) in log_lines {
                        println!("{}", format_log_line(&date, &id, &msg, single_daemon));
                    }
                }
            }
            Err(e) => {
                debug!(
                    "Failed to spawn pager: {}, falling back to direct output",
                    e
                );
                for (date, id, msg) in log_lines {
                    println!("{}", format_log_line(&date, &id, &msg, single_daemon));
                }
            }
        }

        Ok(())
    }

    fn stream_logs_to_pager(
        &self,
        log_files: &BTreeMap<String, LogFile>,
        single_daemon: bool,
        raw: bool,
    ) -> Result<()> {
        if !io::stdout().is_terminal() || self.no_pager || raw {
            return self.stream_logs_direct(log_files, single_daemon, raw);
        }

        let pager_config = PagerConfig::new(true); // start_at_end = true

        match pager_config.spawn_piped() {
            Ok(mut child) => {
                if let Some(stdin) = child.stdin.take() {
                    // Collect file info for the streaming thread
                    let log_files_clone: Vec<_> = log_files
                        .iter()
                        .map(|(name, lf)| (name.clone(), lf.path.clone()))
                        .collect();
                    let single_daemon_clone = single_daemon;

                    // Stream logs using a background thread to avoid blocking
                    std::thread::spawn(move || {
                        let mut writer = BufWriter::new(stdin);

                        // Single file: stream directly without merge overhead
                        if log_files_clone.len() == 1 {
                            let (name, path) = &log_files_clone[0];
                            let file = match File::open(path) {
                                Ok(f) => f,
                                Err(_) => return,
                            };
                            let parser = StreamingLogParser::new(file);
                            for (timestamp, message) in parser {
                                let output = format!(
                                    "{}\n",
                                    format_log_line(
                                        &timestamp,
                                        name,
                                        &message,
                                        single_daemon_clone
                                    )
                                );
                                if writer.write_all(output.as_bytes()).is_err() {
                                    return;
                                }
                            }
                            let _ = writer.flush();
                            return;
                        }

                        // Multiple files: use streaming merger for sorted/interleaved output
                        let mut merger: StreamingMerger<StreamingLogParser> =
                            StreamingMerger::new();

                        for (name, path) in log_files_clone {
                            let file = match File::open(&path) {
                                Ok(f) => f,
                                Err(_) => continue,
                            };
                            let parser = StreamingLogParser::new(file);
                            merger.add_source(name, parser);
                        }

                        // Initialize the heap with first entry from each source
                        merger.initialize();

                        // Stream merged entries to pager
                        for (timestamp, daemon, message) in merger {
                            let output = format!(
                                "{}\n",
                                format_log_line(&timestamp, &daemon, &message, single_daemon_clone)
                            );
                            if writer.write_all(output.as_bytes()).is_err() {
                                return;
                            }
                        }

                        let _ = writer.flush();
                    });

                    let _ = child.wait();
                } else {
                    debug!("Failed to get pager stdin, falling back to direct output");
                    return self.stream_logs_direct(log_files, single_daemon, raw);
                }
            }
            Err(e) => {
                debug!(
                    "Failed to spawn pager: {}, falling back to direct output",
                    e
                );
                return self.stream_logs_direct(log_files, single_daemon, raw);
            }
        }

        Ok(())
    }

    fn stream_logs_direct(
        &self,
        log_files: &BTreeMap<String, LogFile>,
        single_daemon: bool,
        raw: bool,
    ) -> Result<()> {
        // Fast path for a single daemon: in raw mode, output file content as-is,
        // which avoids expensive regex parsing for each line in large log files
        if log_files.len() == 1 {
            let (name, lf) = log_files.iter().next().unwrap();
            let file = match File::open(&lf.path) {
                Ok(f) => f,
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    return Ok(());
                }
            };
            let reader = BufReader::new(file);
            if raw {
                // Raw mode: output lines as-is
                for line in reader.lines() {
                    match line {
                        Ok(l) => {
                            if io::stdout().write_all(l.as_bytes()).is_err()
                                || io::stdout().write_all(b"\n").is_err()
                            {
                                return Ok(());
                            }
                        }
                        Err(_) => continue,
                    }
                }
            } else {
                // Formatted mode: parse and format each line
                let parser = StreamingLogParser::new(File::open(&lf.path).into_diagnostic()?);
                for (timestamp, message) in parser {
                    let output = format!(
                        "{}\n",
                        format_log_line(&timestamp, name, &message, single_daemon)
                    );
                    if io::stdout().write_all(output.as_bytes()).is_err() {
                        return Ok(());
                    }
                }
            }
            return Ok(());
        }

        // Multiple daemons: use streaming merger for sorted output
        let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();

        for (name, lf) in log_files {
            let file = match File::open(&lf.path) {
                Ok(f) => f,
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    continue;
                }
            };
            let parser = StreamingLogParser::new(file);
            merger.add_source(name.clone(), parser);
        }

        // Initialize the heap with first entry from each source
        merger.initialize();

        // Stream merged entries to stdout
        for (timestamp, daemon, message) in merger {
            let output = if raw {
                format!("{} {}\n", timestamp, message)
            } else {
                format!(
                    "{}\n",
                    format_log_line(&timestamp, &daemon, &message, single_daemon)
                )
            };
            if io::stdout().write_all(output.as_bytes()).is_err() {
                return Ok(());
            }
        }

        Ok(())
    }
}

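/// Page output only when stdout is a terminal and the line count exceeds the
/// terminal height.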
fn should_use_pager(line_count: usize) -> bool {
    if !io::stdout().is_terminal() {
        return false;
    }

    let terminal_height = get_terminal_height().unwrap_or(24);
    line_count > terminal_height
}

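/// Terminal height in rows, preferring $LINES over a crossterm query.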
fn get_terminal_height() -> Option<usize> {
    if let Ok(rows) = std::env::var("LINES")
        && let Ok(h) = rows.parse::<usize>()
    {
        return Some(h);
    }

    crossterm::terminal::size().ok().map(|(_, h)| h as usize)
}

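/// Read all lines whose timestamps fall between `from` and `to` (inclusive),
/// binary-searching for the byte range first so that only the matching region
/// of the file is read sequentially.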
fn read_lines_in_time_range(
    path: &Path,
    from: Option<DateTime<Local>>,
    to: Option<DateTime<Local>>,
) -> Result<Vec<String>> {
    let mut file = File::open(path).into_diagnostic()?;
    let file_size = file.metadata().into_diagnostic()?.len();

    if file_size == 0 {
        return Ok(vec![]);
    }

    let start_pos = if let Some(from_time) = from {
        binary_search_log_position(&mut file, file_size, from_time, true)?
    } else {
        0
    };

    let end_pos = if let Some(to_time) = to {
        binary_search_log_position(&mut file, file_size, to_time, false)?
    } else {
        file_size
    };

    if start_pos >= end_pos {
        return Ok(vec![]);
    }

    file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
    let mut reader = BufReader::new(&file);
    let mut lines = Vec::new();
    let mut current_pos = start_pos;

    loop {
        if current_pos >= end_pos {
            break;
        }

        let mut line = String::new();
        match reader.read_line(&mut line) {
            Ok(0) => break,
            Ok(bytes_read) => {
                current_pos += bytes_read as u64;
                if line.ends_with('\n') {
                    line.pop();
                    if line.ends_with('\r') {
                        line.pop();
                    }
                }
                lines.push(line);
            }
            Err(_) => break,
        }
    }

    Ok(lines)
}

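/// Binary-search the file by byte offset for the boundary of `target_time`,
/// assuming log lines are in ascending timestamp order. When `find_start` is
/// true, returns the offset of the first line at or after the target;
/// otherwise, the offset of the first line strictly after it. Lines without a
/// timestamp are treated as earlier than the target.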
fn binary_search_log_position(
    file: &mut File,
    file_size: u64,
    target_time: DateTime<Local>,
    find_start: bool,
) -> Result<u64> {
    let mut low: u64 = 0;
    let mut high: u64 = file_size;

    while low < high {
        let mid = low + (high - low) / 2;

        let line_start = find_line_start(file, mid)?;

        file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
        let mut reader = BufReader::new(&*file);
        let mut line = String::new();
        let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
        if bytes_read == 0 {
            high = mid;
            continue;
        }

        let line_time = extract_timestamp(&line);

        match line_time {
            Some(lt) => {
                if find_start {
                    if lt < target_time {
                        low = line_start + bytes_read as u64;
                    } else {
                        high = line_start;
                    }
                } else if lt <= target_time {
                    low = line_start + bytes_read as u64;
                } else {
                    high = line_start;
                }
            }
            None => {
                low = line_start + bytes_read as u64;
            }
        }
    }

    find_line_start(file, low)
}

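/// Return the byte offset of the start of the line containing `pos` by
/// scanning backwards, in 8 KiB chunks, for the preceding newline.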
fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
    if pos == 0 {
        return Ok(0);
    }

    // Start searching from the byte just before `pos`.
    let mut search_pos = pos.saturating_sub(1);
    const CHUNK_SIZE: usize = 8192;

    loop {
        // Determine the start of the chunk we want to read.
        let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
        let len_u64 = search_pos - chunk_start + 1;
        let len = len_u64 as usize;

        // Seek once to the beginning of this chunk.
        file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
        let mut buf = vec![0u8; len];
        if file.read_exact(&mut buf).is_err() {
            // Match the original behavior: on read error, fall back to start of file.
            return Ok(0);
        }

        // Scan this chunk backwards for a newline.
        for (i, &b) in buf.iter().enumerate().rev() {
            if b == b'\n' {
                return Ok(chunk_start + i as u64 + 1);
            }
        }

        // No newline in this chunk; if we've reached the start of the file,
        // there is no earlier newline.
        if chunk_start == 0 {
            return Ok(0);
        }

        // Move to the previous chunk (just before this one).
        search_pos = chunk_start - 1;
    }
}

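/// Extract the leading "YYYY-MM-DD HH:MM:SS" timestamp from a log line, if any.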
fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
    re.captures(line)
        .and_then(|caps| caps.get(1))
        .and_then(|m| parse_datetime(m.as_str()).ok())
}

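/// Parse formatted log lines into (timestamp, id, message) tuples. Lines that
/// do not start with a timestamp are continuation lines and are appended to
/// the previous entry's message. `reverse` indicates the input is newest-first
/// and should be flipped before parsing.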
fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
    let lines = if reverse {
        lines.into_iter().rev().collect()
    } else {
        lines
    };

    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (\w+) (.*)$");
    lines
        .into_iter()
        .fold(vec![], |mut acc, line| match re.captures(&line) {
            Some(caps) => {
                let (date, msg) = match (caps.get(1), caps.get(3)) {
                    (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
                    _ => return acc,
                };
                acc.push((date, id.to_string(), msg));
                acc
            }
            None => {
                if let Some(l) = acc.last_mut() {
                    l.2.push('\n');
                    l.2.push_str(&line);
                }
                acc
            }
        })
}

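/// List daemon IDs by enumerating the subdirectories of the logs directory.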
fn get_all_daemon_ids() -> Result<Vec<String>> {
    Ok(xx::file::ls(&*env::PITCHFORK_LOGS_DIR)?
        .into_iter()
        .filter(|d| !d.starts_with("."))
        .filter(|d| d.is_dir())
        .filter_map(|d| d.file_name().map(|f| f.to_string_lossy().to_string()))
        .collect())
}

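/// Open the `<id>.log` file for each requested daemon (or for all daemons when
/// `names` is empty) and record the current end-of-file offset so tailing can
/// resume from there.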
fn get_log_file_infos(names: &[String]) -> Result<BTreeMap<String, LogFile>> {
    let names = names.iter().collect::<HashSet<_>>();
    xx::file::ls(&*env::PITCHFORK_LOGS_DIR)?
        .into_iter()
        .filter(|d| !d.starts_with("."))
        .filter(|d| d.is_dir())
        .filter_map(|d| d.file_name().map(|f| f.to_string_lossy().to_string()))
        .filter(|n| names.is_empty() || names.contains(n))
        .map(|n| {
            let path = env::PITCHFORK_LOGS_DIR
                .join(&n)
                .join(format!("{n}.log"))
                .canonicalize()
                .into_diagnostic()?;
            let mut file = xx::file::open(&path)?;
            // Seek to end and get position atomically to avoid race condition
            // where content is written between metadata check and file open
            file.seek(SeekFrom::End(0)).into_diagnostic()?;
            let cur = file.stream_position().into_diagnostic()?;
            Ok((
                n.clone(),
                LogFile {
                    _name: n,
                    file,
                    cur,
                    path,
                },
            ))
        })
        .filter_ok(|(_, f)| f.path.exists())
        .collect::<Result<BTreeMap<_, _>>>()
}

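/// Watch the daemons' log files and print new lines as they are appended.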
pub async fn tail_logs(names: &[String]) -> Result<()> {
    let mut log_files = get_log_file_infos(names)?;
    let mut wf = WatchFiles::new(Duration::from_millis(10))?;

    for lf in log_files.values() {
        wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
    }

    let files_to_name = log_files
        .iter()
        .map(|(n, f)| (f.path.clone(), n.clone()))
        .collect::<HashMap<_, _>>();

    while let Some(paths) = wf.rx.recv().await {
        let mut out = vec![];
        for path in paths {
            let Some(name) = files_to_name.get(&path) else {
                warn!("Unknown log file changed: {}", path.display());
                continue;
            };
            let Some(info) = log_files.get_mut(name) else {
                warn!("No log info for: {name}");
                continue;
            };
            info.file
                .seek(SeekFrom::Start(info.cur))
                .into_diagnostic()?;
            let reader = BufReader::new(&info.file);
            let lines = reader.lines().map_while(Result::ok).collect_vec();
            info.cur = info.file.stream_position().into_diagnostic()?;
            out.extend(merge_log_lines(name, lines, false));
        }
        let out = out
            .into_iter()
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();
        for (date, name, msg) in out {
            println!("{} {} {}", edim(&date), name, msg);
        }
    }
    Ok(())
}

struct LogFile {
    _name: String,
    path: PathBuf,
    file: fs::File,
    cur: u64,
}

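/// Parse a full "YYYY-MM-DD HH:MM:SS" timestamp in the local timezone.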
fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
    let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
    Local
        .from_local_datetime(&naive_dt)
        .single()
        .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))
}

/// Parse time input string into DateTime.
///
/// `is_since` indicates whether this is for --since (true) or --until (false).
/// The "yesterday fallback" only applies to --since: if the time is in the future,
/// assume the user meant yesterday. For --until, future times are kept as-is.
fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
    let s = s.trim();

    // Try full datetime first (YYYY-MM-DD HH:MM:SS)
    if let Ok(dt) = parse_datetime(s) {
        return Ok(dt);
    }

    // Try datetime without seconds (YYYY-MM-DD HH:MM)
    if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
        return Local
            .from_local_datetime(&naive_dt)
            .single()
            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
    }

    // Try time-only format (HH:MM:SS or HH:MM)
    // Note: time-only inputs like "10:30" always reach this branch, since the
    // datetime parsers above require a full date prefix and fail without one.
    if let Ok(time) = parse_time_only(s) {
        let now = Local::now();
        let today = now.date_naive();
        let mut naive_dt = NaiveDateTime::new(today, time);
        let mut dt = Local
            .from_local_datetime(&naive_dt)
            .single()
            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;

        // If the interpreted time for today is in the future, assume the user meant yesterday
        // BUT only for --since. For --until, a future time today is valid.
        if is_since
            && dt > now
            && let Some(yesterday) = today.pred_opt()
        {
            naive_dt = NaiveDateTime::new(yesterday, time);
            dt = Local
                .from_local_datetime(&naive_dt)
                .single()
                .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
        }
        return Ok(dt);
    }

    if let Ok(duration) = humantime::parse_duration(s) {
        let now = Local::now();
        let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
        return Ok(target);
    }

    Err(miette::miette!(
        "Invalid time format: '{}'. Expected formats:\n\
         - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
         - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
         - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
        s
    ))
}

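/// Parse "HH:MM:SS" or "HH:MM" into a NaiveTime.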
fn parse_time_only(s: &str) -> Result<NaiveTime> {
    if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
        return Ok(time);
    }

    if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
        return Ok(time);
    }

    Err(miette::miette!("Invalid time format: '{}'", s))
}

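/// Print a daemon's log lines between `from` and `to` to stderr, framed with
/// "Error logs" markers.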
pub fn print_logs_for_time_range(
    daemon_id: &str,
    from: DateTime<Local>,
    to: Option<DateTime<Local>>,
) -> Result<()> {
    let daemon_ids = vec![daemon_id.to_string()];
    let log_files = get_log_file_infos(&daemon_ids)?;

    let from = from
        .with_nanosecond(0)
        .expect("0 is always valid for nanoseconds");
    let to = to.map(|t| {
        t.with_nanosecond(0)
            .expect("0 is always valid for nanoseconds")
    });

    let log_lines = log_files
        .iter()
        .flat_map(
            |(name, lf)| match read_lines_in_time_range(&lf.path, Some(from), to) {
                Ok(lines) => merge_log_lines(name, lines, false),
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    vec![]
                }
            },
        )
        .sorted_by_cached_key(|l| l.0.to_string())
        .collect_vec();

    if log_lines.is_empty() {
        eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
    } else {
        eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
        for (date, _id, msg) in log_lines {
            eprintln!("{} {}", edim(&date), msg);
        }
        eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
    }

    Ok(())
}

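/// Print a daemon's log lines since `from` to stderr, framed with
/// "Startup logs" markers. Prints nothing when no lines are found.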
pub fn print_startup_logs(daemon_id: &str, from: DateTime<Local>) -> Result<()> {
    let daemon_ids = vec![daemon_id.to_string()];
    let log_files = get_log_file_infos(&daemon_ids)?;

    let from = from
        .with_nanosecond(0)
        .expect("0 is always valid for nanoseconds");

    let log_lines = log_files
        .iter()
        .flat_map(
            |(name, lf)| match read_lines_in_time_range(&lf.path, Some(from), None) {
                Ok(lines) => merge_log_lines(name, lines, false),
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    vec![]
                }
            },
        )
        .sorted_by_cached_key(|l| l.0.to_string())
        .collect_vec();

    if !log_lines.is_empty() {
        eprintln!("\n{} {} {}", edim("==="), edim("Startup logs"), edim("==="));
        for (date, _id, msg) in log_lines {
            eprintln!("{} {}", edim(&date), msg);
        }
        eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
    }

    Ok(())
}