Skip to main content

pitchfork_cli/cli/
logs.rs

1use crate::daemon_id::DaemonId;
2use crate::pitchfork_toml::PitchforkToml;
3use crate::state_file::StateFile;
4use crate::ui::style::{edim, estyle, ndim};
5use crate::{Result, env};
6use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
7use console;
8use itertools::Itertools;
9use miette::IntoDiagnostic;
10use std::cmp::{Ordering, Reverse};
11use std::collections::{BTreeSet, BinaryHeap};
12use std::fs::{self, File};
13use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
14use std::path::{Path, PathBuf};
15use std::process::{Child, Command, Stdio};
16use std::time::Duration;
17use xx::regex;
18
/// Pager configuration for displaying logs.
///
/// Holds the pager executable together with the arguments it is launched with.
struct PagerConfig {
    /// Pager executable: the value of `$PAGER`, or `less` when the variable is unset.
    command: String,
    /// Extra command-line arguments; only populated when the pager is `less`.
    args: Vec<String>,
}

impl PagerConfig {
    /// Select and configure the appropriate pager.
    ///
    /// Uses the `$PAGER` environment variable if set, otherwise defaults to `less`.
    /// `start_at_end` is forwarded to [`Self::build_args`].
    fn new(start_at_end: bool) -> Self {
        let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
        let args = Self::build_args(&command, start_at_end);
        Self { command, args }
    }

    /// Build the argument list for `pager`.
    ///
    /// `less` is recognised by executable name rather than by exact string, so
    /// `less`, `/usr/bin/less` and `less.exe` all receive `-R`, plus `+G` when
    /// `start_at_end` is true. Any other pager gets no arguments.
    fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
        // Compare only the final path component so an absolute `$PAGER` still matches.
        let exe = Path::new(pager)
            .file_name()
            .and_then(|name| name.to_str())
            .unwrap_or(pager);
        let exe = exe.strip_suffix(".exe").unwrap_or(exe);

        let mut args = Vec::new();
        if exe == "less" {
            args.push("-R".to_string());
            if start_at_end {
                args.push("+G".to_string());
            }
        }
        args
    }

    /// Spawn the pager with its stdin connected to a pipe.
    ///
    /// Returns the spawn error unchanged so callers can fall back to direct output.
    fn spawn_piped(&self) -> io::Result<Child> {
        Command::new(&self.command)
            .args(&self.args)
            .stdin(Stdio::piped())
            .spawn()
    }
}
53
54/// Format a single log line for output.
55/// When `single_daemon` is true, omits the daemon ID from the output.
56/// `id_width` is the display width used to pad the daemon name column
57/// so messages line up vertically across different daemon names.
58/// When `strip_ansi` is true, strips ANSI escape codes from the message.
59fn format_log_line(
60    date: &str,
61    id: &str,
62    msg: &str,
63    single_daemon: bool,
64    id_width: usize,
65    strip_ansi: bool,
66) -> String {
67    let msg = if strip_ansi {
68        console::strip_ansi_codes(msg).to_string()
69    } else {
70        msg.to_string()
71    };
72    if single_daemon {
73        format!("{} {}", ndim(date), msg)
74    } else {
75        let colors_on = !strip_ansi && console::colors_enabled();
76        let colored = dimmed_id(id, colors_on);
77        let padded = console::pad_str(&colored, id_width, console::Alignment::Left, None);
78        format!("{}  {} {}", padded, ndim(date), msg)
79    }
80}
81
/// Wrap a daemon ID in a dim 24-bit colour escape sequence for display.
///
/// The colour is picked from a fixed six-entry palette by hashing the bytes of
/// `id`: the accumulator starts at the FNV offset basis and, for every byte, is
/// multiplied by the FNV prime and then has the byte added (wrapping, in `usize`).
/// The same ID therefore always maps to the same colour within a given build.
///
/// Returns `id` unchanged when `colors_enabled` is false.
fn dimmed_id(id: &str, colors_enabled: bool) -> String {
    const PALETTE: [(u8, u8, u8); 6] = [
        (180, 120, 120), // dim red
        (180, 160, 100), // dim yellow
        (120, 180, 120), // dim green
        (120, 180, 180), // dim cyan
        (180, 120, 180), // dim magenta
        (120, 160, 180), // dim blue
    ];
    const OFFSET_BASIS: usize = 0x811C_9DC5;
    const PRIME: usize = 0x0100_0193;

    if !colors_enabled {
        return id.to_string();
    }

    let hash = id.bytes().fold(OFFSET_BASIS, |acc, byte| {
        acc.wrapping_mul(PRIME).wrapping_add(byte as usize)
    });
    let (r, g, b) = PALETTE[hash % PALETTE.len()];
    format!("\x1b[2;38;2;{};{};{}m{}\x1b[0m", r, g, b, id)
}
104
/// Build a `[id]` label, optionally wrapped in a basic ANSI foreground colour.
///
/// With `colors_enabled` false the result is just the bracketed ID. Otherwise one
/// of four SGR colour codes (blue, magenta, cyan, green) is chosen by hashing the
/// bytes of `id`, so a given ID always gets the same colour. Red and yellow are
/// left out of the palette to avoid confusion with errors and warnings.
pub fn colored_id_label(id: &str, colors_enabled: bool) -> String {
    if colors_enabled {
        // ANSI: Blue, Magenta, Cyan, Green (same palette as mise).
        const CODES: [u8; 4] = [34, 35, 36, 32];

        // Seed with the FNV offset basis, then multiply by the FNV prime and add each byte.
        let mut hash: usize = 0x811C_9DC5;
        id.bytes().for_each(|byte| {
            hash = hash.wrapping_mul(0x0100_0193).wrapping_add(usize::from(byte));
        });

        let color = CODES[hash % CODES.len()];
        format!("\x1b[{color}m[{id}]\x1b[0m")
    } else {
        format!("[{}]", id)
    }
}
121
/// A parsed log entry with timestamp, daemon name, and message.
///
/// Entries are ordered by `timestamp` (compared as strings) and, when two
/// timestamps are equal, by `source_idx`. The tie-break makes the merged order of
/// same-timestamp entries from different sources well defined (sources in the
/// order they were registered) instead of depending on heap internals.
/// `daemon` and `message` never take part in comparisons.
#[derive(Debug)]
struct LogEntry {
    /// Timestamp text as it appears in the log file.
    timestamp: String,
    /// Name of the daemon the entry belongs to.
    daemon: String,
    /// Message text.
    message: String,
    /// Index of the source iterator that produced this entry.
    source_idx: usize,
}

impl PartialEq for LogEntry {
    /// Equal exactly when [`Ord::cmp`] reports `Equal`, keeping `Eq` and `Ord` consistent.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for LogEntry {}

impl PartialOrd for LogEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for LogEntry {
    /// Order by timestamp first, then by source index.
    fn cmp(&self, other: &Self) -> Ordering {
        self.timestamp
            .cmp(&other.timestamp)
            .then_with(|| self.source_idx.cmp(&other.source_idx))
    }
}
150
151/// Streaming merger for multiple sorted log files using a min-heap.
152/// This allows merging sorted iterators without loading all data into memory.
153struct StreamingMerger<I>
154where
155    I: Iterator<Item = (String, String)>,
156{
157    sources: Vec<(String, I)>,           // (daemon_name, line_iterator)
158    heap: BinaryHeap<Reverse<LogEntry>>, // Min-heap (using Reverse for ascending order)
159}
160
161impl<I> StreamingMerger<I>
162where
163    I: Iterator<Item = (String, String)>,
164{
165    fn new() -> Self {
166        Self {
167            sources: Vec::new(),
168            heap: BinaryHeap::new(),
169        }
170    }
171
172    fn add_source(&mut self, daemon_name: String, iter: I) {
173        self.sources.push((daemon_name, iter));
174    }
175
176    fn initialize(&mut self) {
177        // Pull the first entry from each source into the heap
178        for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
179            if let Some((timestamp, message)) = iter.next() {
180                self.heap.push(Reverse(LogEntry {
181                    timestamp,
182                    daemon: daemon.clone(),
183                    message,
184                    source_idx: idx,
185                }));
186            }
187        }
188    }
189}
190
191impl<I> Iterator for StreamingMerger<I>
192where
193    I: Iterator<Item = (String, String)>,
194{
195    type Item = (String, String, String); // (timestamp, daemon, message)
196
197    fn next(&mut self) -> Option<Self::Item> {
198        // Pop the smallest entry from the heap
199        let Reverse(entry) = self.heap.pop()?;
200
201        // Pull the next entry from the same source and push to heap
202        let (daemon, iter) = &mut self.sources[entry.source_idx];
203        if let Some((timestamp, message)) = iter.next() {
204            self.heap.push(Reverse(LogEntry {
205                timestamp,
206                daemon: daemon.clone(),
207                message,
208                source_idx: entry.source_idx,
209            }));
210        }
211
212        Some((entry.timestamp, entry.daemon, entry.message))
213    }
214}
215
/// Streaming parser that turns a log file into `(timestamp, message)` entries,
/// folding continuation lines into the entry they follow.
struct StreamingLogParser {
    /// Buffered reader over the log file.
    reader: BufReader<File>,
    /// Entry currently being accumulated; it is emitted once the next entry starts
    /// or the input ends.
    current_entry: Option<(String, String)>,
    /// Set once the input is exhausted or a read failed; iteration then stops.
    finished: bool,
}

impl StreamingLogParser {
    /// Wrap `file` in a buffered reader with no pending entry.
    fn new(file: File) -> Self {
        let reader = BufReader::new(file);
        Self {
            reader,
            finished: false,
            current_entry: None,
        }
    }
}
232
233impl Iterator for StreamingLogParser {
234    type Item = (String, String);
235
236    fn next(&mut self) -> Option<Self::Item> {
237        if self.finished {
238            return None;
239        }
240
241        let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
242
243        loop {
244            let mut line = String::new();
245            match self.reader.read_line(&mut line) {
246                Ok(0) => {
247                    // EOF - return the last entry if any
248                    self.finished = true;
249                    return self.current_entry.take();
250                }
251                Ok(_) => {
252                    // Remove trailing newline
253                    if line.ends_with('\n') {
254                        line.pop();
255                        if line.ends_with('\r') {
256                            line.pop();
257                        }
258                    }
259
260                    if let Some(caps) = re.captures(&line) {
261                        let date = match caps.get(1) {
262                            Some(d) => d.as_str().to_string(),
263                            None => continue,
264                        };
265                        let msg = match caps.get(3) {
266                            Some(m) => m.as_str().to_string(),
267                            None => continue,
268                        };
269
270                        // Return the previous entry and start a new one
271                        let prev = self.current_entry.take();
272                        self.current_entry = Some((date, msg));
273
274                        if prev.is_some() {
275                            return prev;
276                        }
277                        // First entry - continue to read more
278                    } else {
279                        // Continuation line - append to current entry
280                        if let Some((_, ref mut msg)) = self.current_entry {
281                            msg.push('\n');
282                            msg.push_str(&line);
283                        }
284                    }
285                }
286                Err(_) => {
287                    self.finished = true;
288                    return self.current_entry.take();
289                }
290            }
291        }
292    }
293}
294
295/// Displays logs for daemon(s)
296#[derive(Debug, clap::Args)]
297#[clap(
298    visible_alias = "l",
299    verbatim_doc_comment,
300    long_about = "\
301Displays logs for daemon(s)
302
303Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
304and include timestamps for filtering.
305
306Examples:
307  pitchfork logs api              Show all logs for 'api' (paged if needed)
308  pitchfork logs api worker       Show logs for multiple daemons
309  pitchfork logs                  Show logs for all daemons
310  pitchfork logs api -n 50        Show last 50 lines
311  pitchfork logs api --follow     Follow logs in real-time
312  pitchfork logs api --since '2024-01-15 10:00:00'
313                                  Show logs since a specific time (forward)
314  pitchfork logs api --since '10:30:00'
315                                  Show logs since 10:30:00 today
316  pitchfork logs api --since '10:30' --until '12:00'
317                                  Show logs since 10:30:00 until 12:00:00 today
318  pitchfork logs api --since 5min Show logs from last 5 minutes
319  pitchfork logs api --raw        Output raw log lines without formatting
320  pitchfork logs api --raw -n 100 Output last 100 raw log lines
321  pitchfork logs api --clear      Delete logs for 'api'
322  pitchfork logs --clear          Delete logs for all daemons"
323)]
324pub struct Logs {
325    /// Show only logs for the specified daemon(s)
326    id: Vec<String>,
327
328    /// Delete logs
329    #[clap(short, long)]
330    clear: bool,
331
332    /// Show last N lines of logs
333    ///
334    /// Only applies when --since/--until is not used.
335    /// Without this option, all logs are shown.
336    #[clap(short)]
337    n: Option<usize>,
338
339    /// Show logs in real-time
340    #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
341    tail: bool,
342
343    /// Show logs from this time
344    ///
345    /// Supports multiple formats:
346    /// - Full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
347    /// - Time only: "HH:MM:SS" or "HH:MM" (uses today's date)
348    /// - Relative time: "5min", "2h", "1d" (e.g., last 5 minutes)
349    #[clap(short = 's', long)]
350    since: Option<String>,
351
352    /// Show logs until this time
353    ///
354    /// Supports multiple formats:
355    /// - Full datetime: "YYYY-MM-DD HH:MM:SS" or "YYYY-MM-DD HH:MM"
356    /// - Time only: "HH:MM:SS" or "HH:MM" (uses today's date)
357    #[clap(short = 'u', long)]
358    until: Option<String>,
359
360    /// Disable pager even in interactive terminal
361    #[clap(long)]
362    no_pager: bool,
363
364    /// Output raw log lines without color or formatting
365    #[clap(long)]
366    raw: bool,
367}
368
369impl Logs {
370    pub async fn run(&self) -> Result<()> {
371        // Migrate legacy log directories (old format: "api" → new format: "legacy--api").
372        // This is idempotent and silent so it is safe to run on every invocation.
373        migrate_legacy_log_dirs();
374
375        // Resolve user-provided IDs to qualified IDs
376        let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
377            // When no IDs provided, use all daemon IDs
378            get_all_daemon_ids()?
379        } else {
380            PitchforkToml::resolve_ids(&self.id)?
381        };
382
383        if self.clear {
384            for id in &resolved_ids {
385                let path = id.log_path();
386                if path.exists() {
387                    xx::file::create(&path)?;
388                }
389            }
390            return Ok(());
391        }
392
393        let from = if let Some(since) = self.since.as_ref() {
394            Some(parse_time_input(since, true)?)
395        } else {
396            None
397        };
398        let to = if let Some(until) = self.until.as_ref() {
399            Some(parse_time_input(until, false)?)
400        } else {
401            None
402        };
403
404        let single_daemon = resolved_ids.len() == 1;
405        self.print_existing_logs(&resolved_ids, from, to, single_daemon)?;
406        if self.tail {
407            tail_logs(&resolved_ids, single_daemon, true).await?;
408        }
409
410        Ok(())
411    }
412
413    fn print_existing_logs(
414        &self,
415        resolved_ids: &[DaemonId],
416        from: Option<DateTime<Local>>,
417        to: Option<DateTime<Local>>,
418        single_daemon: bool,
419    ) -> Result<()> {
420        let valid_ids: Vec<DaemonId> = resolved_ids
421            .iter()
422            .filter(|id| id.log_path().exists())
423            .cloned()
424            .collect();
425        let id_width = valid_ids
426            .iter()
427            .map(|id| id.qualified().len())
428            .max()
429            .unwrap_or(0);
430        trace!(
431            "log files for: {}",
432            valid_ids.iter().map(|id| id.qualified()).join(", ")
433        );
434        let has_time_filter = from.is_some() || to.is_some();
435
436        if has_time_filter {
437            let mut log_lines = self.collect_log_lines_forward(&valid_ids, from, to)?;
438
439            if let Some(n) = self.n {
440                let len = log_lines.len();
441                if len > n {
442                    log_lines = log_lines.into_iter().skip(len - n).collect_vec();
443                }
444            }
445
446            self.output_logs(
447                log_lines,
448                single_daemon,
449                id_width,
450                has_time_filter,
451                self.raw,
452            )?;
453        } else if let Some(n) = self.n {
454            let log_lines = self.collect_log_lines_reverse(&valid_ids, Some(n))?;
455            self.output_logs(
456                log_lines,
457                single_daemon,
458                id_width,
459                has_time_filter,
460                self.raw,
461            )?;
462        } else {
463            self.stream_logs_to_pager(&valid_ids, single_daemon, id_width, self.raw)?;
464        }
465
466        Ok(())
467    }
468
469    fn collect_log_lines_forward(
470        &self,
471        ids: &[DaemonId],
472        from: Option<DateTime<Local>>,
473        to: Option<DateTime<Local>>,
474    ) -> Result<Vec<(String, String, String)>> {
475        let log_lines: Vec<(String, String, String)> = ids
476            .iter()
477            .flat_map(|id| {
478                let path = id.log_path();
479                match read_lines_in_time_range(&path, from, to) {
480                    Ok((lines, _)) => merge_log_lines(&id.qualified(), lines, false),
481                    Err(e) => {
482                        error!("{}: {}", path.display(), e);
483                        vec![]
484                    }
485                }
486            })
487            .sorted_by_cached_key(|l| l.0.to_string())
488            .collect_vec();
489
490        Ok(log_lines)
491    }
492
493    fn collect_log_lines_reverse(
494        &self,
495        ids: &[DaemonId],
496        limit: Option<usize>,
497    ) -> Result<Vec<(String, String, String)>> {
498        let log_lines: Vec<(String, String, String)> = ids
499            .iter()
500            .flat_map(|id| {
501                let path = id.log_path();
502                let rev = match xx::file::open(&path) {
503                    Ok(f) => rev_lines::RevLines::new(f),
504                    Err(e) => {
505                        error!("{}: {}", path.display(), e);
506                        return vec![];
507                    }
508                };
509                let lines = rev.into_iter().filter_map(Result::ok);
510                let lines = match limit {
511                    Some(n) => lines.take(n).collect_vec(),
512                    None => lines.collect_vec(),
513                };
514                merge_log_lines(&id.qualified(), lines, true)
515            })
516            .sorted_by_cached_key(|l| l.0.to_string())
517            .collect_vec();
518
519        let log_lines = match limit {
520            Some(n) => {
521                let len = log_lines.len();
522                if len > n {
523                    log_lines.into_iter().skip(len - n).collect_vec()
524                } else {
525                    log_lines
526                }
527            }
528            None => log_lines,
529        };
530
531        Ok(log_lines)
532    }
533
534    fn output_logs(
535        &self,
536        log_lines: Vec<(String, String, String)>,
537        single_daemon: bool,
538        id_width: usize,
539        has_time_filter: bool,
540        raw: bool,
541    ) -> Result<()> {
542        if log_lines.is_empty() {
543            return Ok(());
544        }
545
546        let strip_ansi = raw || !console::colors_enabled();
547
548        // Raw mode: output without formatting and without pager
549        if raw {
550            for (date, id, msg) in log_lines {
551                let line = format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi);
552                println!("{line}");
553            }
554            return Ok(());
555        }
556
557        let use_pager = !self.no_pager && should_use_pager(log_lines.len());
558
559        if use_pager {
560            self.output_with_pager(
561                log_lines,
562                single_daemon,
563                id_width,
564                has_time_filter,
565                strip_ansi,
566            )?;
567        } else {
568            for (date, id, msg) in log_lines {
569                println!(
570                    "{}",
571                    format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
572                );
573            }
574        }
575
576        Ok(())
577    }
578
579    fn output_with_pager(
580        &self,
581        log_lines: Vec<(String, String, String)>,
582        single_daemon: bool,
583        id_width: usize,
584        has_time_filter: bool,
585        strip_ansi: bool,
586    ) -> Result<()> {
587        // When time filter is used, start at top; otherwise start at end
588        let pager_config = PagerConfig::new(!has_time_filter);
589
590        match pager_config.spawn_piped() {
591            Ok(mut child) => {
592                if let Some(stdin) = child.stdin.as_mut() {
593                    for (date, id, msg) in log_lines {
594                        let line = format!(
595                            "{}\n",
596                            format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
597                        );
598                        if stdin.write_all(line.as_bytes()).is_err() {
599                            break;
600                        }
601                    }
602                    let _ = child.wait();
603                } else {
604                    debug!("Failed to get pager stdin, falling back to direct output");
605                    for (date, id, msg) in log_lines {
606                        println!(
607                            "{}",
608                            format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
609                        );
610                    }
611                }
612            }
613            Err(e) => {
614                debug!("Failed to spawn pager: {e}, falling back to direct output");
615                for (date, id, msg) in log_lines {
616                    println!(
617                        "{}",
618                        format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
619                    );
620                }
621            }
622        }
623
624        Ok(())
625    }
626
627    fn stream_logs_to_pager(
628        &self,
629        ids: &[DaemonId],
630        single_daemon: bool,
631        id_width: usize,
632        raw: bool,
633    ) -> Result<()> {
634        let strip_ansi = raw || !console::colors_enabled();
635
636        if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
637            return self.stream_logs_direct(ids, single_daemon, id_width, raw, strip_ansi);
638        }
639
640        let pager_config = PagerConfig::new(true); // start_at_end = true
641
642        match pager_config.spawn_piped() {
643            Ok(mut child) => {
644                if let Some(stdin) = child.stdin.take() {
645                    // Collect file info for the streaming thread
646                    let file_infos: Vec<_> = ids
647                        .iter()
648                        .map(|id| (id.qualified(), id.log_path()))
649                        .collect();
650                    let single_daemon_clone = single_daemon;
651                    let strip_ansi_clone = strip_ansi;
652                    let id_width_clone = id_width;
653
654                    // Stream logs using a background thread to avoid blocking
655                    std::thread::spawn(move || {
656                        let mut writer = BufWriter::new(stdin);
657
658                        // Single file: stream directly without merge overhead
659                        if file_infos.len() == 1 {
660                            let (name, path) = &file_infos[0];
661                            let file = match File::open(path) {
662                                Ok(f) => f,
663                                Err(_) => return,
664                            };
665                            let parser = StreamingLogParser::new(file);
666                            for (timestamp, message) in parser {
667                                let output = format!(
668                                    "{}\n",
669                                    format_log_line(
670                                        &timestamp,
671                                        name,
672                                        &message,
673                                        single_daemon_clone,
674                                        id_width_clone,
675                                        strip_ansi_clone
676                                    )
677                                );
678                                if writer.write_all(output.as_bytes()).is_err() {
679                                    return;
680                                }
681                            }
682                            let _ = writer.flush();
683                            return;
684                        }
685
686                        // Multiple files: use streaming merger for sorted/interleaved output
687                        let mut merger: StreamingMerger<StreamingLogParser> =
688                            StreamingMerger::new();
689
690                        for (name, path) in file_infos {
691                            let file = match File::open(&path) {
692                                Ok(f) => f,
693                                Err(_) => continue,
694                            };
695                            let parser = StreamingLogParser::new(file);
696                            merger.add_source(name, parser);
697                        }
698
699                        // Initialize the heap with first entry from each source
700                        merger.initialize();
701
702                        // Stream merged entries to pager
703                        for (timestamp, daemon, message) in merger {
704                            let output = format!(
705                                "{}\n",
706                                format_log_line(
707                                    &timestamp,
708                                    &daemon,
709                                    &message,
710                                    single_daemon_clone,
711                                    id_width_clone,
712                                    strip_ansi_clone
713                                )
714                            );
715                            if writer.write_all(output.as_bytes()).is_err() {
716                                return;
717                            }
718                        }
719
720                        let _ = writer.flush();
721                    });
722
723                    let _ = child.wait();
724                } else {
725                    debug!("Failed to get pager stdin, falling back to direct output");
726                    return self.stream_logs_direct(ids, single_daemon, id_width, raw, strip_ansi);
727                }
728            }
729            Err(e) => {
730                debug!("Failed to spawn pager: {e}, falling back to direct output");
731                return self.stream_logs_direct(ids, single_daemon, id_width, raw, strip_ansi);
732            }
733        }
734
735        Ok(())
736    }
737
738    fn stream_logs_direct(
739        &self,
740        ids: &[DaemonId],
741        single_daemon: bool,
742        id_width: usize,
743        raw: bool,
744        strip_ansi: bool,
745    ) -> Result<()> {
746        // Fast path for single daemon: directly output file content without parsing
747        // This avoids expensive regex parsing for each line in large log files
748        if ids.len() == 1 {
749            let daemon_id = &ids[0];
750            let path = daemon_id.log_path();
751            let file = match File::open(&path) {
752                Ok(f) => f,
753                Err(e) => {
754                    error!("{}: {}", path.display(), e);
755                    return Ok(());
756                }
757            };
758            let reader = BufReader::new(file);
759            if raw {
760                // Raw mode: output lines as-is (but strip ansi if colors disabled)
761                for line in reader.lines() {
762                    match line {
763                        Ok(l) => {
764                            let l = if strip_ansi {
765                                console::strip_ansi_codes(&l).to_string()
766                            } else {
767                                l
768                            };
769                            if io::stdout().write_all(l.as_bytes()).is_err()
770                                || io::stdout().write_all(b"\n").is_err()
771                            {
772                                return Ok(());
773                            }
774                        }
775                        Err(_) => continue,
776                    }
777                }
778            } else {
779                // Formatted mode: parse and format each line
780                let parser = StreamingLogParser::new(File::open(&path).into_diagnostic()?);
781                for (timestamp, message) in parser {
782                    let output = format!(
783                        "{}\n",
784                        format_log_line(
785                            &timestamp,
786                            &daemon_id.qualified(),
787                            &message,
788                            single_daemon,
789                            id_width,
790                            strip_ansi
791                        )
792                    );
793                    if io::stdout().write_all(output.as_bytes()).is_err() {
794                        return Ok(());
795                    }
796                }
797            }
798            return Ok(());
799        }
800
801        // Multiple daemons: use streaming merger for sorted output
802        let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();
803
804        for id in ids {
805            let path = id.log_path();
806            let file = match File::open(&path) {
807                Ok(f) => f,
808                Err(e) => {
809                    error!("{}: {}", path.display(), e);
810                    continue;
811                }
812            };
813            let parser = StreamingLogParser::new(file);
814            merger.add_source(id.qualified(), parser);
815        }
816
817        // Initialize the heap with first entry from each source
818        merger.initialize();
819
820        // Stream merged entries to stdout
821        for (timestamp, daemon, message) in merger {
822            let output = format!(
823                "{}\n",
824                format_log_line(
825                    &timestamp,
826                    &daemon,
827                    &message,
828                    single_daemon,
829                    id_width,
830                    strip_ansi
831                )
832            );
833            if io::stdout().write_all(output.as_bytes()).is_err() {
834                return Ok(());
835            }
836        }
837
838        Ok(())
839    }
840}
841
842fn should_use_pager(line_count: usize) -> bool {
843    if !io::stdout().is_terminal() {
844        return false;
845    }
846
847    let terminal_height = get_terminal_height().unwrap_or(24);
848    line_count > terminal_height
849}
850
851fn get_terminal_height() -> Option<usize> {
852    if let Ok(rows) = std::env::var("LINES")
853        && let Ok(h) = rows.parse::<usize>()
854    {
855        return Some(h);
856    }
857
858    crossterm::terminal::size().ok().map(|(_, h)| h as usize)
859}
860
861fn read_lines_in_time_range(
862    path: &Path,
863    from: Option<DateTime<Local>>,
864    to: Option<DateTime<Local>>,
865) -> Result<(Vec<String>, u64)> {
866    let mut file = File::open(path).into_diagnostic()?;
867    let file_size = file.metadata().into_diagnostic()?.len();
868
869    if file_size == 0 {
870        return Ok((vec![], 0));
871    }
872
873    let start_pos = if let Some(from_time) = from {
874        binary_search_log_position(&mut file, file_size, from_time, true)?
875    } else {
876        0
877    };
878
879    let end_pos = if let Some(to_time) = to {
880        binary_search_log_position(&mut file, file_size, to_time, false)?
881    } else {
882        file_size
883    };
884
885    if start_pos >= end_pos {
886        return Ok((vec![], end_pos));
887    }
888
889    file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
890    let mut reader = BufReader::new(&file);
891    let mut lines = Vec::new();
892    let mut current_pos = start_pos;
893
894    loop {
895        if current_pos >= end_pos {
896            break;
897        }
898
899        let mut line = String::new();
900        match reader.read_line(&mut line) {
901            Ok(0) => break,
902            Ok(bytes_read) => {
903                current_pos += bytes_read as u64;
904                if line.ends_with('\n') {
905                    line.pop();
906                    if line.ends_with('\r') {
907                        line.pop();
908                    }
909                }
910                lines.push(line);
911            }
912            Err(_) => break,
913        }
914    }
915
916    Ok((lines, end_pos))
917}
918
919fn binary_search_log_position(
920    file: &mut File,
921    file_size: u64,
922    target_time: DateTime<Local>,
923    find_start: bool,
924) -> Result<u64> {
925    let mut low: u64 = 0;
926    let mut high: u64 = file_size;
927
928    while low < high {
929        let mid = low + (high - low) / 2;
930
931        let line_start = find_line_start(file, mid)?;
932
933        file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
934        let mut reader = BufReader::new(&*file);
935        let mut line = String::new();
936        let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
937        if bytes_read == 0 {
938            high = mid;
939            continue;
940        }
941
942        let line_time = extract_timestamp(&line);
943
944        match line_time {
945            Some(lt) => {
946                if find_start {
947                    if lt < target_time {
948                        low = line_start + bytes_read as u64;
949                    } else {
950                        high = line_start;
951                    }
952                } else if lt <= target_time {
953                    low = line_start + bytes_read as u64;
954                } else {
955                    high = line_start;
956                }
957            }
958            None => {
959                low = line_start + bytes_read as u64;
960            }
961        }
962    }
963
964    find_line_start(file, low)
965}
966
967fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
968    if pos == 0 {
969        return Ok(0);
970    }
971
972    // Start searching from the byte just before `pos`.
973    let mut search_pos = pos.saturating_sub(1);
974    const CHUNK_SIZE: usize = 8192;
975
976    loop {
977        // Determine the start of the chunk we want to read.
978        let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
979        let len_u64 = search_pos - chunk_start + 1;
980        let len = len_u64 as usize;
981
982        // Seek once to the beginning of this chunk.
983        file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
984        let mut buf = vec![0u8; len];
985        if file.read_exact(&mut buf).is_err() {
986            // Match the original behavior: on read error, fall back to start of file.
987            return Ok(0);
988        }
989
990        // Scan this chunk backwards for a newline.
991        for (i, &b) in buf.iter().enumerate().rev() {
992            if b == b'\n' {
993                return Ok(chunk_start + i as u64 + 1);
994            }
995        }
996
997        // No newline in this chunk; if we've reached the start of the file,
998        // there is no earlier newline.
999        if chunk_start == 0 {
1000            return Ok(0);
1001        }
1002
1003        // Move to the previous chunk (just before this one).
1004        search_pos = chunk_start - 1;
1005    }
1006}
1007
1008fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
1009    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
1010    re.captures(line)
1011        .and_then(|caps| caps.get(1))
1012        .and_then(|m| parse_datetime(m.as_str()).ok())
1013}
1014
1015fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
1016    let lines = if reverse {
1017        lines.into_iter().rev().collect()
1018    } else {
1019        lines
1020    };
1021
1022    let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
1023    lines
1024        .into_iter()
1025        .fold(vec![], |mut acc, line| match re.captures(&line) {
1026            Some(caps) => {
1027                let (date, msg) = match (caps.get(1), caps.get(3)) {
1028                    (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
1029                    _ => return acc,
1030                };
1031                acc.push((date, id.to_string(), msg));
1032                acc
1033            }
1034            None => {
1035                if let Some(l) = acc.last_mut() {
1036                    l.2.push('\n');
1037                    l.2.push_str(&line);
1038                }
1039                acc
1040            }
1041        })
1042}
1043
1044/// Rename legacy log directories that predate namespace-qualified daemon IDs.
1045///
1046/// Old layout: `PITCHFORK_LOGS_DIR/<name>/<name>.log`
1047/// New layout: `PITCHFORK_LOGS_DIR/legacy--<name>/legacy--<name>.log`
1048///
1049/// Only directories that clearly match the old layout are migrated:
1050/// - directory name does not contain `"--"`
1051/// - directory contains `<name>.log`
1052/// - `<name>` is a valid daemon short name under current DaemonId rules
1053fn migrate_legacy_log_dirs() {
1054    let known_safe_paths = known_daemon_safe_paths();
1055    let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
1056        Ok(d) => d,
1057        Err(_) => return,
1058    };
1059    for dir in dirs {
1060        if dir.starts_with(".") || !dir.is_dir() {
1061            continue;
1062        }
1063        let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
1064            Some(n) => n,
1065            None => continue,
1066        };
1067        // New-format directories usually contain "--". For safety, only treat
1068        // them as new-format if they match a known daemon ID safe-path.
1069        if name.contains("--") {
1070            // If it parses as a valid safe-path, treat it as already migrated
1071            // and keep idempotent behavior silent.
1072            if DaemonId::from_safe_path(&name).is_ok() {
1073                continue;
1074            }
1075            // Keep noisy warnings only for invalid/ambiguous names that cannot
1076            // be interpreted as new-format IDs.
1077            if known_safe_paths.contains(&name) {
1078                continue;
1079            }
1080            warn!(
1081                "Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
1082            );
1083            continue;
1084        }
1085
1086        // Migrate only explicit old-layout directories to avoid renaming
1087        // unrelated folders under logs/.
1088        let old_log = dir.join(format!("{name}.log"));
1089        if !old_log.exists() {
1090            continue;
1091        }
1092        if DaemonId::try_new("legacy", &name).is_err() {
1093            warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
1094            continue;
1095        }
1096
1097        let new_name = format!("legacy--{name}");
1098        let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
1099        // Skip if a target directory already exists to avoid clobbering data.
1100        if new_dir.exists() {
1101            continue;
1102        }
1103        if std::fs::rename(&dir, &new_dir).is_err() {
1104            continue;
1105        }
1106        // Also rename the log file inside the directory.
1107        let old_log = new_dir.join(format!("{name}.log"));
1108        let new_log = new_dir.join(format!("{new_name}.log"));
1109        if old_log.exists() {
1110            let _ = std::fs::rename(&old_log, &new_log);
1111        }
1112        debug!("Migrated legacy log dir '{name}' → '{new_name}'");
1113    }
1114}
1115
1116fn known_daemon_safe_paths() -> BTreeSet<String> {
1117    let mut out = BTreeSet::new();
1118
1119    match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1120        Ok(state) => {
1121            for id in state.daemons.keys() {
1122                out.insert(id.safe_path());
1123            }
1124        }
1125        Err(e) => {
1126            warn!("Failed to read state while checking known daemon IDs: {e}");
1127        }
1128    }
1129
1130    match PitchforkToml::all_merged() {
1131        Ok(config) => {
1132            for id in config.daemons.keys() {
1133                out.insert(id.safe_path());
1134            }
1135        }
1136        Err(e) => {
1137            warn!("Failed to read config while checking known daemon IDs: {e}");
1138        }
1139    }
1140
1141    out
1142}
1143
1144fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
1145    let mut ids = BTreeSet::new();
1146
1147    match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1148        Ok(state) => ids.extend(state.daemons.keys().cloned()),
1149        Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
1150    }
1151
1152    match PitchforkToml::all_merged() {
1153        Ok(config) => ids.extend(config.daemons.keys().cloned()),
1154        Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
1155    }
1156
1157    Ok(ids
1158        .into_iter()
1159        .filter(|id| id.log_path().exists())
1160        .collect())
1161}
1162
1163pub async fn tail_logs(
1164    names: &[DaemonId],
1165    single_daemon: bool,
1166    start_from_end: bool,
1167) -> Result<()> {
1168    // Poll each log file in a loop instead of using file-system event watchers.
1169    //
1170    // Why polling:
1171    // - Real-time enough: 200ms interval is imperceptible for human consumption,
1172    //   and comparable to what `tail -f` provides.
1173    // - No long-running overhead: `logs --tail` runs in the foreground with the
1174    //   user watching output; the polling stops when the process exits.
1175    // - Cross-platform reliable: avoids edge cases in notify/FSEvents where events
1176    //   can be missed when the writer uses buffered I/O.
1177    //
1178    // `start_from_end`: when true, skip content already output by a prior
1179    // print_existing_logs call (used by `logs --tail`). When false, start from
1180    // the beginning so no content is missed (used by `wait`).
1181    let id_width = names
1182        .iter()
1183        .map(|id| id.qualified().len())
1184        .max()
1185        .unwrap_or(0);
1186
1187    let mut states: Vec<(DaemonId, PathBuf, u64)> = names
1188        .iter()
1189        .filter_map(|id| {
1190            let path = id.log_path();
1191            if !path.exists() {
1192                return None;
1193            }
1194            let pos = if start_from_end {
1195                fs::metadata(&path).map(|m| m.len()).unwrap_or(0)
1196            } else {
1197                0
1198            };
1199            Some((id.clone(), path, pos))
1200        })
1201        .collect();
1202
1203    let strip_ansi = !console::colors_enabled();
1204
1205    let interval = tokio::time::interval(Duration::from_millis(200));
1206    tokio::pin!(interval);
1207
1208    loop {
1209        interval.tick().await;
1210
1211        // Discover log files that appeared since last iteration.
1212        // Always start from position 0 — content written between ticks
1213        // must not be silently dropped.
1214        for id in names {
1215            let path = id.log_path();
1216            if !path.exists() || states.iter().any(|(s, _, _)| s == id) {
1217                continue;
1218            }
1219            states.push((id.clone(), path, 0));
1220        }
1221
1222        let mut out = vec![];
1223        for (id, path, pos) in &mut states {
1224            let mut file = match fs::File::open(path) {
1225                Ok(f) => f,
1226                Err(_) => continue,
1227            };
1228            let file_size = match file.metadata() {
1229                Ok(m) => m.len(),
1230                Err(_) => continue,
1231            };
1232            let start = if *pos > file_size { 0 } else { *pos };
1233            file.seek(SeekFrom::Start(start)).into_diagnostic()?;
1234
1235            // Track bytes consumed rather than using stream_position(),
1236            // which includes BufReader's read-ahead buffer and would skip
1237            // content written concurrently.
1238            let mut reader = BufReader::new(&file);
1239            let mut bytes_read: u64 = 0;
1240            let mut lines = vec![];
1241            loop {
1242                let mut line = String::new();
1243                let n = reader.read_line(&mut line).into_diagnostic()?;
1244                if n == 0 {
1245                    break;
1246                }
1247                // Only advance position for complete lines (ending with \n).
1248                // Partial lines at the end of file may still be written to;
1249                // leave them for the next tick.
1250                if line.ends_with('\n') {
1251                    bytes_read += n as u64;
1252                    line.pop();
1253                    if line.ends_with('\r') {
1254                        line.pop();
1255                    }
1256                    lines.push(line);
1257                } else {
1258                    // Partial line — don't advance, will retry next tick.
1259                    break;
1260                }
1261            }
1262            *pos = start + bytes_read;
1263            out.extend(merge_log_lines(&id.qualified(), lines, false));
1264        }
1265
1266        if !out.is_empty() {
1267            let out = out
1268                .into_iter()
1269                .sorted_by_cached_key(|l| l.0.to_string())
1270                .collect_vec();
1271            for (date, name, msg) in out {
1272                println!(
1273                    "{}",
1274                    format_log_line(&date, &name, &msg, single_daemon, id_width, strip_ansi)
1275                );
1276            }
1277        }
1278    }
1279}
1280
1281fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
1282    let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
1283    Local
1284        .from_local_datetime(&naive_dt)
1285        .single()
1286        .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
1287}
1288
1289/// Parse time input string into DateTime.
1290///
1291/// `is_since` indicates whether this is for --since (true) or --until (false).
1292/// The "yesterday fallback" only applies to --since: if the time is in the future,
1293/// assume the user meant yesterday. For --until, future times are kept as-is.
1294fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
1295    let s = s.trim();
1296
1297    // Try full datetime first (YYYY-MM-DD HH:MM:SS)
1298    if let Ok(dt) = parse_datetime(s) {
1299        return Ok(dt);
1300    }
1301
1302    // Try datetime without seconds (YYYY-MM-DD HH:MM)
1303    if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
1304        return Local
1305            .from_local_datetime(&naive_dt)
1306            .single()
1307            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
1308    }
1309
1310    // Try time-only format (HH:MM:SS or HH:MM)
1311    // Note: This branch won't be reached for inputs like "10:30" that could match
1312    // parse_datetime, because parse_datetime expects a full date prefix and will fail.
1313    if let Ok(time) = parse_time_only(s) {
1314        let now = Local::now();
1315        let today = now.date_naive();
1316        let mut naive_dt = NaiveDateTime::new(today, time);
1317        let mut dt = Local
1318            .from_local_datetime(&naive_dt)
1319            .single()
1320            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1321
1322        // If the interpreted time for today is in the future, assume the user meant yesterday
1323        // BUT only for --since. For --until, a future time today is valid.
1324        if is_since
1325            && dt > now
1326            && let Some(yesterday) = today.pred_opt()
1327        {
1328            naive_dt = NaiveDateTime::new(yesterday, time);
1329            dt = Local
1330                .from_local_datetime(&naive_dt)
1331                .single()
1332                .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1333        }
1334        return Ok(dt);
1335    }
1336
1337    if let Ok(duration) = humantime::parse_duration(s) {
1338        let now = Local::now();
1339        let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
1340        return Ok(target);
1341    }
1342
1343    Err(miette::miette!(
1344        "Invalid time format: '{}'. Expected formats:\n\
1345         - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
1346         - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
1347         - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
1348        s
1349    ))
1350}
1351
1352fn parse_time_only(s: &str) -> Result<NaiveTime> {
1353    if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1354        return Ok(time);
1355    }
1356
1357    if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1358        return Ok(time);
1359    }
1360
1361    Err(miette::miette!("Invalid time format: '{}'", s))
1362}
1363
1364/// Prints error log lines in a styled block matching the startup logs format.
1365///
1366/// Format:
1367/// ```text
1368///  ERROR LOGS
1369///  12:00:00 error message
1370/// ```
1371///
1372/// Timestamps use dimmed red. The tag uses white text on red background.
1373pub fn print_error_logs_block(log_lines: &[(String, String, String)]) {
1374    if log_lines.is_empty() {
1375        return;
1376    }
1377
1378    let is_tty = std::io::stderr().is_terminal();
1379    let format_msg = |msg: &str| -> String {
1380        let stripped = strip_pty_controls(msg);
1381        if is_tty {
1382            stripped
1383        } else {
1384            console::strip_ansi_codes(&stripped).to_string()
1385        }
1386    };
1387
1388    let tag = estyle(" ERROR LOGS ").white().on_red();
1389    eprintln!("\n{tag}");
1390
1391    // Determine if we need to show daemon IDs (same logic as startup logs)
1392    let unique_ids: BTreeSet<&str> = log_lines.iter().map(|(_, id, _)| id.as_str()).collect();
1393    let show_id = unique_ids.len() > 1;
1394
1395    if show_id {
1396        let id_width = log_lines
1397            .iter()
1398            .map(|(_, id, _)| console::measure_text_width(id))
1399            .max()
1400            .unwrap_or(0);
1401        for (date, id, msg) in log_lines {
1402            let time = date.split(' ').nth(1).unwrap_or(date);
1403            let colored = dimmed_id(id, is_tty && console::colors_enabled_stderr());
1404            let padded = console::pad_str(&colored, id_width, console::Alignment::Left, None);
1405            eprintln!(
1406                "{}  {} {}",
1407                padded,
1408                estyle(time).red().dim(),
1409                format_msg(msg)
1410            );
1411        }
1412    } else {
1413        for (date, _, msg) in log_lines {
1414            let time = date.split(' ').nth(1).unwrap_or(date);
1415            eprintln!("{} {}", estyle(time).red().dim(), format_msg(msg));
1416        }
1417    }
1418}
1419
/// Describes the type of ready check being performed for display purposes.
///
/// The `Display` implementation renders a short human-readable description of
/// the check, suitable for embedding in a "waiting for …" message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReadyCheckType {
    /// Readiness is detected from output matching the given pattern.
    Output(String),
    /// Readiness is detected through the given HTTP URL.
    Http(String),
    /// Readiness is detected through the given TCP port.
    Port(u16),
    /// Readiness is detected by running the given command.
    Cmd(String),
    /// Readiness is assumed after a delay of the given number of seconds.
    Delay(u64),
    /// No specific check is configured; the default readiness check applies.
    Default,
}

impl std::fmt::Display for ReadyCheckType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Output(pattern) => write!(f, "output matching '{pattern}'"),
            Self::Http(url) => write!(f, "HTTP {url}"),
            Self::Port(port) => write!(f, "TCP port {port}"),
            Self::Cmd(cmd) => write!(f, "command '{cmd}'"),
            Self::Delay(secs) => write!(f, "delay ({secs}s)"),
            Self::Default => write!(f, "default readiness check"),
        }
    }
}
1442
1443/// Creates a progress job showing a spinner while waiting for a ready check.
1444///
1445/// Returns a `Arc<ProgressJob>` that the caller should update:
1446/// - Set body to success message and status to `Done` when the daemon is ready
1447/// - Set body to failure message and status to `Failed` when the daemon fails
1448pub fn create_ready_check_job(
1449    daemon_id: &DaemonId,
1450    check_type: &ReadyCheckType,
1451) -> std::sync::Arc<clx::progress::ProgressJob> {
1452    use clx::progress::{ProgressJobBuilder, ProgressJobDoneBehavior, ProgressStatus};
1453
1454    let is_tty = std::io::stderr().is_terminal();
1455    let colors_enabled = is_tty && console::colors_enabled_stderr();
1456    let id_label = colored_id_label(&daemon_id.qualified(), colors_enabled);
1457    let show_ts = crate::settings::settings().general.startup_log_timestamps;
1458
1459    // When timestamps are off, {{spinner()}} renders as an animated spinner
1460    // (1 char wide) matching the "•" prefix used by println.  When on,
1461    // we show a dim timestamp instead.
1462    let prefix = if show_ts {
1463        // The timestamp updates each refresh via the now() tera function.
1464        // We use a fixed-width format (HH:MM:SS = 8 chars) for alignment.
1465        edim(chrono::Local::now().format("%H:%M:%S").to_string()).to_string()
1466    } else {
1467        "{{spinner()}}".to_string()
1468    };
1469
1470    ProgressJobBuilder::new()
1471        .body(format!(
1472            "{} {} waiting for {{{{ check_type }}}}...",
1473            prefix, id_label
1474        ))
1475        .prop("check_type", &check_type.to_string())
1476        .status(ProgressStatus::Running)
1477        .on_done(ProgressJobDoneBehavior::Keep)
1478        .start()
1479}
1480
1481/// Collects startup log lines for a single daemon (does not print).
1482///
1483/// Returns a list of `(time, daemon_id_qualified, message)` tuples for log
1484/// entries written after `from`.
1485pub fn collect_startup_logs(
1486    daemon_id: &DaemonId,
1487    from: DateTime<Local>,
1488) -> Result<Vec<(String, String, String)>> {
1489    let from = from
1490        .with_nanosecond(0)
1491        .expect("0 is always valid for nanoseconds");
1492
1493    let path = daemon_id.log_path();
1494    let log_lines = if path.exists() {
1495        match read_lines_in_time_range(&path, Some(from), None) {
1496            Ok((lines, _)) => merge_log_lines(&daemon_id.qualified(), lines, false),
1497            Err(e) => {
1498                error!("{}: {}", path.display(), e);
1499                vec![]
1500            }
1501        }
1502    } else {
1503        vec![]
1504    };
1505
1506    Ok(log_lines)
1507}
1508
1509/// Reads new lines from a file starting at the given byte offset.
1510/// Only returns complete lines (ending with `\n`). Returns the new offset
1511/// that points past the last complete line, so partial lines at EOF are
1512/// preserved for the next read.
1513fn read_from_offset(path: &Path, offset: u64) -> Result<(Vec<String>, u64)> {
1514    let mut file = File::open(path).into_diagnostic()?;
1515    file.seek(SeekFrom::Start(offset)).into_diagnostic()?;
1516    let mut buf = String::new();
1517    file.read_to_string(&mut buf).into_diagnostic()?;
1518
1519    // Only process up to the last complete line (ending with \n).
1520    // Any trailing partial line is kept for the next poll.
1521    if let Some(last_newline) = buf.rfind('\n') {
1522        let complete = &buf[..=last_newline];
1523        let new_offset = offset + (last_newline + 1) as u64;
1524        let lines = complete.lines().map(String::from).collect();
1525        Ok((lines, new_offset))
1526    } else {
1527        // No complete lines yet — don't advance offset.
1528        Ok((vec![], offset))
1529    }
1530}
1531
1532/// Stream startup logs for a daemon to a progress job in real-time.
1533///
1534/// Spawns a background tokio task that polls the daemon's log file
1535/// and calls `job.println()` for each new line. Returns a watch sender
1536/// that stops the streaming when sent `true`.
1537pub fn stream_startup_logs(
1538    daemon_id: &DaemonId,
1539    from: DateTime<Local>,
1540    job: std::sync::Arc<clx::progress::ProgressJob>,
1541) -> (
1542    tokio::sync::watch::Sender<bool>,
1543    tokio::task::JoinHandle<()>,
1544) {
1545    let (tx, mut rx) = tokio::sync::watch::channel(false);
1546    let id = daemon_id.clone();
1547    let from = from
1548        .with_nanosecond(0)
1549        .expect("0 is always valid for nanoseconds");
1550
1551    let show_ts = crate::settings::settings().general.startup_log_timestamps;
1552
1553    let handle = tokio::spawn(async move {
1554        let path = id.log_path();
1555        let is_tty = std::io::stderr().is_terminal();
1556        let colors_enabled = is_tty && console::colors_enabled_stderr();
1557        let id_label = colored_id_label(&id.qualified(), colors_enabled);
1558        let prefix = if show_ts {
1559            // Will be replaced with actual timestamp per line
1560            String::new()
1561        } else {
1562            edim("•").to_string()
1563        };
1564
1565        // Wait for log file to be created (up to 2s)
1566        for _ in 0..20 {
1567            if path.exists() {
1568                break;
1569            }
1570            tokio::select! {
1571                _ = tokio::time::sleep(Duration::from_millis(100)) => {}
1572                _ = rx.changed() => return,
1573            }
1574        }
1575        if !path.exists() {
1576            return;
1577        }
1578
1579        // Print existing lines from `from` time, then capture the exact
1580        // end-of-read offset so the subsequent polling loop continues from
1581        // exactly where this left off (eliminates a tiny race window).
1582        let mut offset = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
1583        if let Ok((lines, read_end)) = read_lines_in_time_range(&path, Some(from), None) {
1584            let merged = merge_log_lines(&id.qualified(), lines, false);
1585            for (date, _, msg) in &merged {
1586                let time = date.split(' ').nth(1).unwrap_or(date);
1587                let msg = strip_pty_controls(msg);
1588                let msg = if is_tty {
1589                    msg
1590                } else {
1591                    console::strip_ansi_codes(&msg).to_string()
1592                };
1593                let line_prefix = if show_ts {
1594                    edim(time).to_string()
1595                } else {
1596                    prefix.clone()
1597                };
1598                job.println(&format!("{} {} {}", line_prefix, id_label, msg));
1599            }
1600            offset = read_end;
1601        }
1602        let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
1603
1604        // Poll for new lines
1605        loop {
1606            tokio::select! {
1607                _ = tokio::time::sleep(Duration::from_millis(200)) => {
1608                    if let Ok(metadata) = std::fs::metadata(&path) {
1609                        if metadata.len() > offset {
1610                            if let Ok((new_lines, new_offset)) = read_from_offset(&path, offset) {
1611                                for line in &new_lines {
1612                                    if let Some(caps) = re.captures(line) {
1613                                        let time = caps.get(1).map(|m| m.as_str()).unwrap_or("");
1614                                        let time = time.split(' ').nth(1).unwrap_or(time);
1615                                        let msg = caps.get(3).map(|m| m.as_str()).unwrap_or("");
1616                                        let msg = strip_pty_controls(msg);
1617                                        let msg = if is_tty {
1618                                            msg
1619                                        } else {
1620                                            console::strip_ansi_codes(&msg).to_string()
1621                                        };
1622                                        let line_prefix = if show_ts { edim(time).to_string() } else { prefix.clone() };
1623                                        job.println(&format!("{} {} {}", line_prefix, id_label, msg));
1624                                    }
1625                                }
1626                                offset = new_offset;
1627                            }
1628                        }
1629                    }
1630                }
1631                _ = rx.changed() => {
1632                    break;
1633                }
1634            }
1635        }
1636
1637        // Final drain: pick up any lines written after the last timer tick
1638        // but before the stop signal arrived. Since the caller awaits this
1639        // task before touching the job state, this println() is safe.
1640        if let Ok((new_lines, _new_offset)) = read_from_offset(&path, offset) {
1641            for line in &new_lines {
1642                if let Some(caps) = re.captures(line) {
1643                    let time = caps.get(1).map(|m| m.as_str()).unwrap_or("");
1644                    let time = time.split(' ').nth(1).unwrap_or(time);
1645                    let msg = caps.get(3).map(|m| m.as_str()).unwrap_or("");
1646                    let msg = strip_pty_controls(msg);
1647                    let msg = if is_tty {
1648                        msg
1649                    } else {
1650                        console::strip_ansi_codes(&msg).to_string()
1651                    };
1652                    let line_prefix = if show_ts {
1653                        edim(time).to_string()
1654                    } else {
1655                        prefix.clone()
1656                    };
1657                    job.println(&format!("{} {} {}", line_prefix, id_label, msg));
1658                }
1659            }
1660        }
1661    });
1662
1663    (tx, handle)
1664}
1665
1666/// Strips PTY control sequences from a string while preserving SGR (color/style) codes.
1667///
1668/// Removes CSI sequences that control cursor movement, screen clearing, erasing, etc.,
1669/// but keeps `\x1b[...m` (SGR) sequences so colors are retained.
1670fn strip_pty_controls(s: &str) -> String {
1671    struct Stripper {
1672        result: String,
1673    }
1674
1675    impl vte::Perform for Stripper {
1676        fn print(&mut self, c: char) {
1677            self.result.push(c);
1678        }
1679
1680        fn execute(&mut self, byte: u8) {
1681            // Keep \n and \t; drop other control characters (BEL, BS, CR, etc.)
1682            if byte == b'\n' || byte == b'\t' {
1683                self.result.push(byte as char);
1684            }
1685        }
1686
1687        fn csi_dispatch(
1688            &mut self,
1689            params: &vte::Params,
1690            _intermediates: &[u8],
1691            _ignore: bool,
1692            action: char,
1693        ) {
1694            // Keep SGR sequences (final byte 'm')
1695            if action == 'm' {
1696                self.result.push_str("\x1b[");
1697                let mut first = true;
1698                for sub in params.iter() {
1699                    if !first {
1700                        self.result.push(';');
1701                    }
1702                    first = false;
1703                    for (i, &p) in sub.iter().enumerate() {
1704                        if i > 0 {
1705                            self.result.push(':');
1706                        }
1707                        self.result.push_str(&p.to_string());
1708                    }
1709                }
1710                self.result.push('m');
1711            }
1712            // All other CSI sequences (cursor move, clear, erase, etc.) are dropped
1713        }
1714
1715        fn osc_dispatch(&mut self, _params: &[&[u8]], _bell_terminated: bool) {
1716            // Drop OSC sequences (e.g. window title)
1717        }
1718
1719        fn esc_dispatch(&mut self, _intermediates: &[u8], _ignore: bool, _byte: u8) {
1720            // Drop ESC sequences (e.g. ESC c = reset terminal)
1721        }
1722
1723        fn hook(
1724            &mut self,
1725            _params: &vte::Params,
1726            _intermediates: &[u8],
1727            _ignore: bool,
1728            _action: char,
1729        ) {
1730            // Drop DCS hooks
1731        }
1732
1733        fn put(&mut self, _byte: u8) {
1734            // Drop DCS data
1735        }
1736
1737        fn unhook(&mut self) {
1738            // Drop DCS unhook
1739        }
1740    }
1741
1742    let mut parser = vte::Parser::new();
1743    let mut stripper = Stripper {
1744        result: String::with_capacity(s.len()),
1745    };
1746    parser.advance(&mut stripper, s.as_bytes());
1747    stripper.result
1748}