1use crate::daemon_id::DaemonId;
2use crate::pitchfork_toml::PitchforkToml;
3use crate::state_file::StateFile;
4use crate::ui::style::edim;
5use crate::watch_files::WatchFiles;
6use crate::{Result, env};
7use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
8use itertools::Itertools;
9use miette::IntoDiagnostic;
10use notify::RecursiveMode;
11use std::cmp::{Ordering, Reverse};
12use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap};
13use std::fs::{self, File};
14use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
15use std::path::{Path, PathBuf};
16use std::process::{Child, Command, Stdio};
17use std::time::Duration;
18use xx::regex;
19
/// Configuration for the external pager process used to display logs.
struct PagerConfig {
    // Pager executable (from $PAGER, defaulting to "less").
    command: String,
    // Arguments passed to the pager (e.g. -R / +G for less).
    args: Vec<String>,
}
25
26impl PagerConfig {
27 fn new(start_at_end: bool) -> Self {
30 let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
31 let args = Self::build_args(&command, start_at_end);
32 Self { command, args }
33 }
34
35 fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
36 let mut args = vec![];
37 if pager == "less" {
38 args.push("-R".to_string());
39 if start_at_end {
40 args.push("+G".to_string());
41 }
42 }
43 args
44 }
45
46 fn spawn_piped(&self) -> io::Result<Child> {
48 Command::new(&self.command)
49 .args(&self.args)
50 .stdin(Stdio::piped())
51 .spawn()
52 }
53}
54
55fn format_log_line(date: &str, id: &str, msg: &str, single_daemon: bool) -> String {
58 if single_daemon {
59 format!("{} {}", edim(date), msg)
60 } else {
61 format!("{} {} {}", edim(date), id, msg)
62 }
63}
64
/// One parsed log record used while k-way merging multiple daemons' log
/// files. Ordering is by `timestamp` only (see the trait impls below).
#[derive(Debug)]
struct LogEntry {
    // "YYYY-mm-dd HH:MM:SS" prefix of the line; lexicographic order of this
    // fixed-width format matches chronological order.
    timestamp: String,
    // Qualified daemon id the line came from.
    daemon: String,
    // Message body (may contain embedded newlines for continuation lines).
    message: String,
    // Index into StreamingMerger::sources identifying the source iterator.
    source_idx: usize,
}
73
// Equality/ordering consider only the timestamp: entries from different
// daemons with identical timestamps compare equal, which is acceptable for
// the min-heap merge (ties may come out in either order).
impl PartialEq for LogEntry {
    fn eq(&self, other: &Self) -> bool {
        self.timestamp == other.timestamp
    }
}

impl Eq for LogEntry {}

impl PartialOrd for LogEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for LogEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // Lexicographic compare of the fixed-width timestamp string is
        // equivalent to chronological comparison.
        self.timestamp.cmp(&other.timestamp)
    }
}
93
/// K-way merge over several per-daemon log iterators, yielding entries in
/// global timestamp order without loading whole files into memory.
struct StreamingMerger<I>
where
    I: Iterator<Item = (String, String)>,
{
    // (daemon name, its (timestamp, message) iterator) pairs.
    sources: Vec<(String, I)>,
    // Min-heap (via Reverse) holding at most one pending entry per source.
    heap: BinaryHeap<Reverse<LogEntry>>,
}
103
104impl<I> StreamingMerger<I>
105where
106 I: Iterator<Item = (String, String)>,
107{
108 fn new() -> Self {
109 Self {
110 sources: Vec::new(),
111 heap: BinaryHeap::new(),
112 }
113 }
114
115 fn add_source(&mut self, daemon_name: String, iter: I) {
116 self.sources.push((daemon_name, iter));
117 }
118
119 fn initialize(&mut self) {
120 for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
122 if let Some((timestamp, message)) = iter.next() {
123 self.heap.push(Reverse(LogEntry {
124 timestamp,
125 daemon: daemon.clone(),
126 message,
127 source_idx: idx,
128 }));
129 }
130 }
131 }
132}
133
134impl<I> Iterator for StreamingMerger<I>
135where
136 I: Iterator<Item = (String, String)>,
137{
138 type Item = (String, String, String); fn next(&mut self) -> Option<Self::Item> {
141 let Reverse(entry) = self.heap.pop()?;
143
144 let (daemon, iter) = &mut self.sources[entry.source_idx];
146 if let Some((timestamp, message)) = iter.next() {
147 self.heap.push(Reverse(LogEntry {
148 timestamp,
149 daemon: daemon.clone(),
150 message,
151 source_idx: entry.source_idx,
152 }));
153 }
154
155 Some((entry.timestamp, entry.daemon, entry.message))
156 }
157}
158
/// Incremental parser over a single daemon's log file: groups raw lines
/// into (timestamp, message) entries, folding continuation lines (lines
/// without a timestamp prefix) into the preceding entry's message.
struct StreamingLogParser {
    reader: BufReader<File>,
    // Entry being accumulated; only emitted once the next timestamped line
    // (or EOF) is seen, so continuation lines can still be appended to it.
    current_entry: Option<(String, String)>,
    // Set on EOF or read error so subsequent next() calls return None.
    finished: bool,
}
165
166impl StreamingLogParser {
167 fn new(file: File) -> Self {
168 Self {
169 reader: BufReader::new(file),
170 current_entry: None,
171 finished: false,
172 }
173 }
174}
175
impl Iterator for StreamingLogParser {
    type Item = (String, String);

    /// Yield the next complete (timestamp, message) entry.
    ///
    /// A new entry starts at a line matching "<timestamp> <id> <message>";
    /// non-matching lines are appended to the current entry as continuation
    /// text. The in-progress entry is only returned once the following
    /// timestamped line (or EOF / a read error) is reached.
    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        // Log line shape: "YYYY-mm-dd HH:MM:SS <daemon-id> <message>".
        let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");

        loop {
            let mut line = String::new();
            match self.reader.read_line(&mut line) {
                Ok(0) => {
                    // EOF: flush whatever entry was being accumulated.
                    self.finished = true;
                    return self.current_entry.take();
                }
                Ok(_) => {
                    // Strip the trailing \n (and \r for CRLF files).
                    if line.ends_with('\n') {
                        line.pop();
                        if line.ends_with('\r') {
                            line.pop();
                        }
                    }

                    if let Some(caps) = re.captures(&line) {
                        let date = match caps.get(1) {
                            Some(d) => d.as_str().to_string(),
                            None => continue,
                        };
                        let msg = match caps.get(3) {
                            Some(m) => m.as_str().to_string(),
                            None => continue,
                        };

                        // Start a new entry; emit the previous one, if any.
                        let prev = self.current_entry.take();
                        self.current_entry = Some((date, msg));

                        if prev.is_some() {
                            return prev;
                        }
                    } else {
                        // Continuation line: append to the entry in
                        // progress. Lines appearing before any timestamped
                        // line are dropped (nothing to attach them to).
                        if let Some((_, ref mut msg)) = self.current_entry {
                            msg.push('\n');
                            msg.push_str(&line);
                        }
                    }
                }
                Err(_) => {
                    // Read error: stop and flush the pending entry.
                    self.finished = true;
                    return self.current_entry.take();
                }
            }
        }
    }
}
237
// CLI arguments for `pitchfork logs`. Field notes below deliberately use
// plain `//` comments: `///` doc comments on clap fields become --help text
// and would change the CLI's visible output.
#[derive(Debug, clap::Args)]
#[clap(
    visible_alias = "l",
    verbatim_doc_comment,
    long_about = "\
Displays logs for daemon(s)

Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
and include timestamps for filtering.

Examples:
  pitchfork logs api Show all logs for 'api' (paged if needed)
  pitchfork logs api worker Show logs for multiple daemons
  pitchfork logs Show logs for all daemons
  pitchfork logs api -n 50 Show last 50 lines
  pitchfork logs api --follow Follow logs in real-time
  pitchfork logs api --since '2024-01-15 10:00:00'
    Show logs since a specific time (forward)
  pitchfork logs api --since '10:30:00'
    Show logs since 10:30:00 today
  pitchfork logs api --since '10:30' --until '12:00'
    Show logs since 10:30:00 until 12:00:00 today
  pitchfork logs api --since 5min Show logs from last 5 minutes
  pitchfork logs api --raw Output raw log lines without formatting
  pitchfork logs api --raw -n 100 Output last 100 raw log lines
  pitchfork logs api --clear Delete logs for 'api'
  pitchfork logs --clear Delete logs for all daemons"
)]
pub struct Logs {
    // Daemon IDs to show logs for; empty means every known daemon.
    id: Vec<String>,

    // Truncate (clear) the log files instead of displaying them.
    #[clap(short, long)]
    clear: bool,

    // Show only the last N lines.
    #[clap(short)]
    n: Option<usize>,

    // Follow logs in real time after printing existing ones.
    #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
    tail: bool,

    // Lower time bound: full datetime, time-of-day, or relative duration.
    #[clap(short = 's', long)]
    since: Option<String>,

    // Upper time bound; same accepted formats as --since.
    #[clap(short = 'u', long)]
    until: Option<String>,

    // Print directly to stdout even when output exceeds the terminal.
    #[clap(long)]
    no_pager: bool,

    // Emit raw, unstyled lines (still merged/sorted across daemons).
    #[clap(long)]
    raw: bool,
}
311
impl Logs {
    /// Entry point for `pitchfork logs`.
    pub async fn run(&self) -> Result<()> {
        // Rename any pre-namespacing log directories before touching logs.
        migrate_legacy_log_dirs();

        // No IDs on the command line → operate on every known daemon that
        // has a log file; otherwise resolve the user-supplied names.
        let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
            get_all_daemon_ids()?
        } else {
            PitchforkToml::resolve_ids(&self.id)?
        };

        if self.clear {
            // --clear: truncate each existing log file, then exit.
            for id in &resolved_ids {
                let path = id.log_path();
                if path.exists() {
                    xx::file::create(&path)?;
                }
            }
            return Ok(());
        }

        // Validate --since/--until up front so format errors surface
        // before any output is produced.
        let from = if let Some(since) = self.since.as_ref() {
            Some(parse_time_input(since, true)?)
        } else {
            None
        };
        let to = if let Some(until) = self.until.as_ref() {
            Some(parse_time_input(until, false)?)
        } else {
            None
        };

        self.print_existing_logs(&resolved_ids, from, to)?;
        if self.tail {
            // --follow/--tail: keep streaming new lines after the dump.
            tail_logs(&resolved_ids).await?;
        }

        Ok(())
    }

    /// Print the logs currently on disk, picking a strategy by filters:
    /// - time filter → range-read each file (binary search), merge, sort;
    /// - only -n     → read files backwards, keep the last n entries;
    /// - neither     → stream everything through the pager.
    fn print_existing_logs(
        &self,
        resolved_ids: &[DaemonId],
        from: Option<DateTime<Local>>,
        to: Option<DateTime<Local>>,
    ) -> Result<()> {
        let log_files = get_log_file_infos(resolved_ids)?;
        trace!("log files for: {}", log_files.keys().join(", "));
        // With a single daemon the id column is omitted from output.
        let single_daemon = resolved_ids.len() == 1;
        let has_time_filter = from.is_some() || to.is_some();

        if has_time_filter {
            let mut log_lines = self.collect_log_lines_forward(&log_files, from, to)?;

            // -n combined with a time filter: keep only the last n entries.
            if let Some(n) = self.n {
                let len = log_lines.len();
                if len > n {
                    log_lines = log_lines.into_iter().skip(len - n).collect_vec();
                }
            }

            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
        } else if let Some(n) = self.n {
            let log_lines = self.collect_log_lines_reverse(&log_files, Some(n))?;
            self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
        } else {
            self.stream_logs_to_pager(&log_files, single_daemon, self.raw)?;
        }

        Ok(())
    }

    /// Range-read each file inside [from, to] (forward order) and sort the
    /// merged entries by timestamp.
    fn collect_log_lines_forward(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        from: Option<DateTime<Local>>,
        to: Option<DateTime<Local>>,
    ) -> Result<Vec<(String, String, String)>> {
        let log_lines: Vec<(String, String, String)> = log_files
            .iter()
            .flat_map(
                |(name, lf)| match read_lines_in_time_range(&lf.path, from, to) {
                    Ok(lines) => merge_log_lines(&name.qualified(), lines, false),
                    Err(e) => {
                        // One broken file must not hide the others' logs.
                        error!("{}: {}", lf.path.display(), e);
                        vec![]
                    }
                },
            )
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();

        Ok(log_lines)
    }

    /// Read each file backwards (rev_lines), taking at most `limit` lines
    /// per daemon, then merge, sort by timestamp, and trim to the last
    /// `limit` entries overall.
    fn collect_log_lines_reverse(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        limit: Option<usize>,
    ) -> Result<Vec<(String, String, String)>> {
        let log_lines: Vec<(String, String, String)> = log_files
            .iter()
            .flat_map(|(daemon_id, lf)| {
                let rev = match xx::file::open(&lf.path) {
                    Ok(f) => rev_lines::RevLines::new(f),
                    Err(e) => {
                        error!("{}: {}", lf.path.display(), e);
                        return vec![];
                    }
                };
                let lines = rev.into_iter().filter_map(Result::ok);
                let lines = match limit {
                    Some(n) => lines.take(n).collect_vec(),
                    None => lines.collect_vec(),
                };
                // reverse=true: lines are in reverse file order here.
                merge_log_lines(&daemon_id.qualified(), lines, true)
            })
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();

        // After merging across daemons, keep only the newest `limit`.
        let log_lines = match limit {
            Some(n) => {
                let len = log_lines.len();
                if len > n {
                    log_lines.into_iter().skip(len - n).collect_vec()
                } else {
                    log_lines
                }
            }
            None => log_lines,
        };

        Ok(log_lines)
    }

    /// Print pre-collected entries: raw, direct, or through a pager when
    /// the output wouldn't fit on screen.
    fn output_logs(
        &self,
        log_lines: Vec<(String, String, String)>,
        single_daemon: bool,
        has_time_filter: bool,
        raw: bool,
    ) -> Result<()> {
        if log_lines.is_empty() {
            return Ok(());
        }

        if raw {
            // Raw mode: plain (undimmed) text and never a pager.
            for (date, id, msg) in log_lines {
                if single_daemon {
                    println!("{} {}", date, msg);
                } else {
                    println!("{} {} {}", date, id, msg);
                }
            }
            return Ok(());
        }

        let use_pager = !self.no_pager && should_use_pager(log_lines.len());

        if use_pager {
            self.output_with_pager(log_lines, single_daemon, has_time_filter)?;
        } else {
            for (date, id, msg) in log_lines {
                println!("{}", format_log_line(&date, &id, &msg, single_daemon));
            }
        }

        Ok(())
    }

    /// Pipe formatted log lines into the pager. Any failure to spawn the
    /// pager or obtain its stdin degrades to plain stdout output.
    fn output_with_pager(
        &self,
        log_lines: Vec<(String, String, String)>,
        single_daemon: bool,
        has_time_filter: bool,
    ) -> Result<()> {
        // Without a time filter, open at the end (newest logs); with one,
        // the top of the selected range is the interesting part.
        let pager_config = PagerConfig::new(!has_time_filter);

        match pager_config.spawn_piped() {
            Ok(mut child) => {
                if let Some(stdin) = child.stdin.as_mut() {
                    for (date, id, msg) in log_lines {
                        let line =
                            format!("{}\n", format_log_line(&date, &id, &msg, single_daemon));
                        // A write error means the user quit the pager early.
                        if stdin.write_all(line.as_bytes()).is_err() {
                            break;
                        }
                    }
                    let _ = child.wait();
                } else {
                    debug!("Failed to get pager stdin, falling back to direct output");
                    for (date, id, msg) in log_lines {
                        println!("{}", format_log_line(&date, &id, &msg, single_daemon));
                    }
                }
            }
            Err(e) => {
                debug!(
                    "Failed to spawn pager: {}, falling back to direct output",
                    e
                );
                for (date, id, msg) in log_lines {
                    println!("{}", format_log_line(&date, &id, &msg, single_daemon));
                }
            }
        }

        Ok(())
    }

    /// Stream logs through the pager without materializing them in memory.
    /// Falls back to direct streaming when stdout isn't a TTY, paging is
    /// disabled, we're tailing, or raw output was requested.
    fn stream_logs_to_pager(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        single_daemon: bool,
        raw: bool,
    ) -> Result<()> {
        if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
            return self.stream_logs_direct(log_files, single_daemon, raw);
        }

        // Unfiltered view: start the pager at the end (newest entries).
        let pager_config = PagerConfig::new(true);
        match pager_config.spawn_piped() {
            Ok(mut child) => {
                if let Some(stdin) = child.stdin.take() {
                    // Own copies of (name, path) so the writer thread is 'static.
                    let log_files_clone: Vec<_> = log_files
                        .iter()
                        .map(|(daemon_id, lf)| (daemon_id.qualified(), lf.path.clone()))
                        .collect();
                    let single_daemon_clone = single_daemon;

                    // Feed the pager from a background thread so the main
                    // thread can block on the pager process itself.
                    std::thread::spawn(move || {
                        let mut writer = BufWriter::new(stdin);

                        // Fast path: a single file needs no merging.
                        if log_files_clone.len() == 1 {
                            let (name, path) = &log_files_clone[0];
                            let file = match File::open(path) {
                                Ok(f) => f,
                                Err(_) => return,
                            };
                            let parser = StreamingLogParser::new(file);
                            for (timestamp, message) in parser {
                                let output = format!(
                                    "{}\n",
                                    format_log_line(
                                        &timestamp,
                                        name,
                                        &message,
                                        single_daemon_clone
                                    )
                                );
                                // Write errors mean the pager exited; stop quietly.
                                if writer.write_all(output.as_bytes()).is_err() {
                                    return;
                                }
                            }
                            let _ = writer.flush();
                            return;
                        }

                        // Multiple files: k-way merge by timestamp.
                        let mut merger: StreamingMerger<StreamingLogParser> =
                            StreamingMerger::new();

                        for (name, path) in log_files_clone {
                            let file = match File::open(&path) {
                                Ok(f) => f,
                                Err(_) => continue,
                            };
                            let parser = StreamingLogParser::new(file);
                            merger.add_source(name, parser);
                        }

                        merger.initialize();

                        for (timestamp, daemon, message) in merger {
                            let output = format!(
                                "{}\n",
                                format_log_line(&timestamp, &daemon, &message, single_daemon_clone)
                            );
                            if writer.write_all(output.as_bytes()).is_err() {
                                return;
                            }
                        }

                        let _ = writer.flush();
                    });

                    let _ = child.wait();
                } else {
                    debug!("Failed to get pager stdin, falling back to direct output");
                    return self.stream_logs_direct(log_files, single_daemon, raw);
                }
            }
            Err(e) => {
                debug!(
                    "Failed to spawn pager: {}, falling back to direct output",
                    e
                );
                return self.stream_logs_direct(log_files, single_daemon, raw);
            }
        }

        Ok(())
    }

    /// Stream logs straight to stdout (no pager): single-file fast path,
    /// otherwise a k-way merge across all files.
    fn stream_logs_direct(
        &self,
        log_files: &BTreeMap<DaemonId, LogFile>,
        single_daemon: bool,
        raw: bool,
    ) -> Result<()> {
        if log_files.len() == 1 {
            let (daemon_id, lf) = log_files.iter().next().unwrap();
            let file = match File::open(&lf.path) {
                Ok(f) => f,
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    return Ok(());
                }
            };
            let reader = BufReader::new(file);
            if raw {
                // Raw single-file output: copy lines through untouched.
                for line in reader.lines() {
                    match line {
                        Ok(l) => {
                            // Write errors (e.g. closed pipe) end the stream.
                            if io::stdout().write_all(l.as_bytes()).is_err()
                                || io::stdout().write_all(b"\n").is_err()
                            {
                                return Ok(());
                            }
                        }
                        Err(_) => continue,
                    }
                }
            } else {
                // Re-open: `reader` above already consumed the handle.
                let parser = StreamingLogParser::new(File::open(&lf.path).into_diagnostic()?);
                for (timestamp, message) in parser {
                    let output = format!(
                        "{}\n",
                        format_log_line(
                            &timestamp,
                            &daemon_id.qualified(),
                            &message,
                            single_daemon
                        )
                    );
                    if io::stdout().write_all(output.as_bytes()).is_err() {
                        return Ok(());
                    }
                }
            }
            return Ok(());
        }

        // Multiple daemons: merge entries by timestamp as we stream.
        let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();

        for (daemon_id, lf) in log_files {
            let file = match File::open(&lf.path) {
                Ok(f) => f,
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    continue;
                }
            };
            let parser = StreamingLogParser::new(file);
            merger.add_source(daemon_id.qualified(), parser);
        }

        merger.initialize();

        for (timestamp, daemon, message) in merger {
            let output = if raw {
                if single_daemon {
                    format!("{} {}\n", timestamp, message)
                } else {
                    format!("{} {} {}\n", timestamp, daemon, message)
                }
            } else {
                format!(
                    "{}\n",
                    format_log_line(&timestamp, &daemon, &message, single_daemon)
                )
            };
            if io::stdout().write_all(output.as_bytes()).is_err() {
                return Ok(());
            }
        }

        Ok(())
    }
}
720
721fn should_use_pager(line_count: usize) -> bool {
722 if !io::stdout().is_terminal() {
723 return false;
724 }
725
726 let terminal_height = get_terminal_height().unwrap_or(24);
727 line_count > terminal_height
728}
729
730fn get_terminal_height() -> Option<usize> {
731 if let Ok(rows) = std::env::var("LINES")
732 && let Ok(h) = rows.parse::<usize>()
733 {
734 return Some(h);
735 }
736
737 crossterm::terminal::size().ok().map(|(_, h)| h as usize)
738}
739
/// Read the lines of `path` whose timestamps fall within [from, to], using
/// binary search over byte offsets so the whole file never has to be
/// scanned. `None` bounds mean start/end of file. Continuation lines that
/// lie inside the selected byte range are returned as-is.
fn read_lines_in_time_range(
    path: &Path,
    from: Option<DateTime<Local>>,
    to: Option<DateTime<Local>>,
) -> Result<Vec<String>> {
    let mut file = File::open(path).into_diagnostic()?;
    let file_size = file.metadata().into_diagnostic()?.len();

    if file_size == 0 {
        return Ok(vec![]);
    }

    // Byte offset of the first line with timestamp >= `from`.
    let start_pos = if let Some(from_time) = from {
        binary_search_log_position(&mut file, file_size, from_time, true)?
    } else {
        0
    };

    // Byte offset just past the last line with timestamp <= `to`.
    let end_pos = if let Some(to_time) = to {
        binary_search_log_position(&mut file, file_size, to_time, false)?
    } else {
        file_size
    };

    if start_pos >= end_pos {
        return Ok(vec![]);
    }

    file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
    let mut reader = BufReader::new(&file);
    let mut lines = Vec::new();
    let mut current_pos = start_pos;

    loop {
        if current_pos >= end_pos {
            break;
        }

        let mut line = String::new();
        match reader.read_line(&mut line) {
            Ok(0) => break,
            Ok(bytes_read) => {
                // Track the raw byte offset (before trimming line endings)
                // so the end_pos comparison stays accurate.
                current_pos += bytes_read as u64;
                if line.ends_with('\n') {
                    line.pop();
                    if line.ends_with('\r') {
                        line.pop();
                    }
                }
                lines.push(line);
            }
            Err(_) => break,
        }
    }

    Ok(lines)
}
797
/// Binary-search a log file for a byte offset based on line timestamps.
///
/// With `find_start` set, returns the offset of the first line whose
/// timestamp is >= `target_time`; otherwise the offset just past the last
/// line whose timestamp is <= `target_time`. Lines without a parseable
/// timestamp (continuation lines) are treated as "before the target",
/// which keeps the search moving forward past them.
///
/// Assumes timestamps are non-decreasing through the file, which holds for
/// append-only daemon logs.
fn binary_search_log_position(
    file: &mut File,
    file_size: u64,
    target_time: DateTime<Local>,
    find_start: bool,
) -> Result<u64> {
    let mut low: u64 = 0;
    let mut high: u64 = file_size;

    while low < high {
        let mid = low + (high - low) / 2;

        // Snap the probe point back to the beginning of its line so a
        // complete timestamp is always parsed.
        let line_start = find_line_start(file, mid)?;

        file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
        let mut reader = BufReader::new(&*file);
        let mut line = String::new();
        let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
        if bytes_read == 0 {
            // Probe landed at EOF: shrink the upper bound.
            high = mid;
            continue;
        }

        let line_time = extract_timestamp(&line);

        match line_time {
            Some(lt) => {
                if find_start {
                    if lt < target_time {
                        low = line_start + bytes_read as u64;
                    } else {
                        high = line_start;
                    }
                } else if lt <= target_time {
                    low = line_start + bytes_read as u64;
                } else {
                    high = line_start;
                }
            }
            None => {
                // Continuation line: no timestamp to compare, skip past it.
                low = line_start + bytes_read as u64;
            }
        }
    }

    // `low` may sit mid-line after the final narrowing; align to its line.
    find_line_start(file, low)
}
845
/// Return the byte offset of the start of the line containing `pos`, by
/// scanning backwards in fixed-size chunks for the previous '\n'.
/// Returns 0 at the start of the file or if a chunk read fails.
fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
    if pos == 0 {
        return Ok(0);
    }

    // Start at the byte just before `pos` so that a `pos` sitting right
    // after a '\n' is reported as its own line start.
    let mut search_pos = pos.saturating_sub(1);
    const CHUNK_SIZE: usize = 8192;

    loop {
        // Read the CHUNK_SIZE bytes ending at `search_pos` (inclusive),
        // clamped to the beginning of the file.
        let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
        let len_u64 = search_pos - chunk_start + 1;
        let len = len_u64 as usize;

        file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
        let mut buf = vec![0u8; len];
        if file.read_exact(&mut buf).is_err() {
            // Short read (e.g. file truncated concurrently): fall back to 0.
            return Ok(0);
        }

        // Scan the chunk from the end; the line begins after the newline.
        for (i, &b) in buf.iter().enumerate().rev() {
            if b == b'\n' {
                return Ok(chunk_start + i as u64 + 1);
            }
        }

        if chunk_start == 0 {
            // Reached the beginning without finding a newline.
            return Ok(0);
        }

        search_pos = chunk_start - 1;
    }
}
886
887fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
888 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
889 re.captures(line)
890 .and_then(|caps| caps.get(1))
891 .and_then(|m| parse_datetime(m.as_str()).ok())
892}
893
894fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
895 let lines = if reverse {
896 lines.into_iter().rev().collect()
897 } else {
898 lines
899 };
900
901 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
902 lines
903 .into_iter()
904 .fold(vec![], |mut acc, line| match re.captures(&line) {
905 Some(caps) => {
906 let (date, msg) = match (caps.get(1), caps.get(3)) {
907 (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
908 _ => return acc,
909 };
910 acc.push((date, id.to_string(), msg));
911 acc
912 }
913 None => {
914 if let Some(l) = acc.last_mut() {
915 l.2.push('\n');
916 l.2.push_str(&line);
917 }
918 acc
919 }
920 })
921}
922
/// One-time migration: move pre-namespacing log directories
/// (`$PITCHFORK_LOGS_DIR/<name>/<name>.log`) into the namespaced layout
/// (`legacy--<name>/legacy--<name>.log`). Best-effort: every failure is
/// skipped (optionally with a warning) rather than aborting the command.
fn migrate_legacy_log_dirs() {
    let known_safe_paths = known_daemon_safe_paths();
    let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
        Ok(d) => d,
        Err(_) => return,
    };
    for dir in dirs {
        // Skip hidden entries and plain files.
        // NOTE(review): `Path::starts_with(".")` compares whole path
        // components, so for absolute paths from ls this likely never
        // matches; a `file_name()`-based dot check may have been intended —
        // confirm before relying on it.
        if dir.starts_with(".") || !dir.is_dir() {
            continue;
        }
        let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
            Some(n) => n,
            None => continue,
        };
        // "--" marks already-namespaced directories.
        if name.contains("--") {
            // Valid safe-path: already migrated, nothing to do.
            if DaemonId::from_safe_path(&name).is_ok() {
                continue;
            }
            // Belongs to a currently-known daemon: leave it alone too.
            if known_safe_paths.contains(&name) {
                continue;
            }
            warn!(
                "Skipping invalid legacy log directory '{}': contains '--' but is not a valid daemon safe-path",
                name
            );
            continue;
        }

        // Legacy layout keeps the log at <dir>/<name>.log.
        let old_log = dir.join(format!("{name}.log"));
        if !old_log.exists() {
            continue;
        }
        if DaemonId::try_new("legacy", &name).is_err() {
            warn!(
                "Skipping invalid legacy log directory '{}': not a valid daemon ID",
                name
            );
            continue;
        }

        let new_name = format!("legacy--{name}");
        let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
        // Never clobber an existing migrated directory.
        if new_dir.exists() {
            continue;
        }
        if std::fs::rename(&dir, &new_dir).is_err() {
            continue;
        }
        // Rename the log file inside the moved directory to match.
        let old_log = new_dir.join(format!("{name}.log"));
        let new_log = new_dir.join(format!("{new_name}.log"));
        if old_log.exists() {
            let _ = std::fs::rename(&old_log, &new_log);
        }
        debug!("Migrated legacy log dir '{}' → '{}'", name, new_name);
    }
}
998
999fn known_daemon_safe_paths() -> BTreeSet<String> {
1000 let mut out = BTreeSet::new();
1001
1002 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1003 Ok(state) => {
1004 for id in state.daemons.keys() {
1005 out.insert(id.safe_path());
1006 }
1007 }
1008 Err(e) => {
1009 warn!("Failed to read state while checking known daemon IDs: {e}");
1010 }
1011 }
1012
1013 match PitchforkToml::all_merged() {
1014 Ok(config) => {
1015 for id in config.daemons.keys() {
1016 out.insert(id.safe_path());
1017 }
1018 }
1019 Err(e) => {
1020 warn!("Failed to read config while checking known daemon IDs: {e}");
1021 }
1022 }
1023
1024 out
1025}
1026
1027fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
1028 let mut ids = BTreeSet::new();
1029
1030 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1031 Ok(state) => ids.extend(state.daemons.keys().cloned()),
1032 Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
1033 }
1034
1035 match PitchforkToml::all_merged() {
1036 Ok(config) => ids.extend(config.daemons.keys().cloned()),
1037 Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
1038 }
1039
1040 Ok(ids
1041 .into_iter()
1042 .filter(|id| id.log_path().exists())
1043 .collect())
1044}
1045
1046fn get_log_file_infos(names: &[DaemonId]) -> Result<BTreeMap<DaemonId, LogFile>> {
1047 let mut out = BTreeMap::new();
1048
1049 for daemon_id in names {
1050 let path = daemon_id.log_path();
1051
1052 if !path.exists() {
1054 continue;
1055 }
1056
1057 let mut file = xx::file::open(&path)?;
1058 file.seek(SeekFrom::End(0)).into_diagnostic()?;
1061 let cur = file.stream_position().into_diagnostic()?;
1062
1063 out.insert(
1064 daemon_id.clone(),
1065 LogFile {
1066 _name: daemon_id.clone(),
1067 file,
1068 cur,
1069 path,
1070 },
1071 );
1072 }
1073
1074 Ok(out)
1075}
1076
/// Follow the given daemons' log files, printing new lines as they are
/// appended (used by `pitchfork logs --tail`). Runs until the watch
/// channel closes.
pub async fn tail_logs(names: &[DaemonId]) -> Result<()> {
    let mut log_files = get_log_file_infos(names)?;
    // Debounced (10ms) file watcher over every log file.
    let mut wf = WatchFiles::new(Duration::from_millis(10))?;

    for lf in log_files.values() {
        wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
    }

    // Reverse lookup: changed path → daemon id.
    let files_to_name: HashMap<PathBuf, DaemonId> = log_files
        .iter()
        .map(|(n, f)| (f.path.clone(), n.clone()))
        .collect();

    while let Some(paths) = wf.rx.recv().await {
        let mut out = vec![];
        for path in paths {
            let Some(name) = files_to_name.get(&path) else {
                warn!("Unknown log file changed: {}", path.display());
                continue;
            };
            let Some(info) = log_files.get_mut(name) else {
                warn!("No log info for: {name}");
                continue;
            };
            // Resume reading where the previous batch stopped, then record
            // the new end-of-file offset for the next wakeup.
            info.file
                .seek(SeekFrom::Start(info.cur))
                .into_diagnostic()?;
            let reader = BufReader::new(&info.file);
            let lines = reader.lines().map_while(Result::ok).collect_vec();
            info.cur = info.file.stream_position().into_diagnostic()?;
            out.extend(merge_log_lines(&name.qualified(), lines, false));
        }
        // Interleave this batch across daemons by timestamp.
        let out = out
            .into_iter()
            .sorted_by_cached_key(|l| l.0.to_string())
            .collect_vec();
        for (date, name, msg) in out {
            println!("{} {} {}", edim(&date), name, msg);
        }
    }
    Ok(())
}
1119
/// An open daemon log file plus the read cursor used while tailing.
struct LogFile {
    // Retained for context/debugging; not read anywhere in this module
    // (hence the leading underscore).
    _name: DaemonId,
    path: PathBuf,
    file: fs::File,
    // Byte offset up to which the file has already been printed.
    cur: u64,
}
1126
1127fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
1128 let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
1129 Local
1130 .from_local_datetime(&naive_dt)
1131 .single()
1132 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
1133}
1134
/// Parse a --since/--until value into a local timestamp. Formats are tried
/// in order:
/// 1. "YYYY-mm-dd HH:MM:SS", then "YYYY-mm-dd HH:MM" (full datetime);
/// 2. "HH:MM:SS" / "HH:MM" (today's date; for --since a time later than
///    now rolls back to yesterday);
/// 3. humantime durations such as "5min"/"2h" (that long before now).
fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
    let s = s.trim();

    if let Ok(dt) = parse_datetime(s) {
        return Ok(dt);
    }

    if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
        // single() rejects times that are invalid or ambiguous locally
        // (DST gaps/overlaps).
        return Local
            .from_local_datetime(&naive_dt)
            .single()
            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
    }

    if let Ok(time) = parse_time_only(s) {
        let now = Local::now();
        let today = now.date_naive();
        let mut naive_dt = NaiveDateTime::new(today, time);
        let mut dt = Local
            .from_local_datetime(&naive_dt)
            .single()
            .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;

        // A --since time later than now almost certainly means "yesterday
        // at that time", not a point in the future.
        if is_since
            && dt > now
            && let Some(yesterday) = today.pred_opt()
        {
            naive_dt = NaiveDateTime::new(yesterday, time);
            dt = Local
                .from_local_datetime(&naive_dt)
                .single()
                .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
        }
        return Ok(dt);
    }

    if let Ok(duration) = humantime::parse_duration(s) {
        let now = Local::now();
        let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
        return Ok(target);
    }

    Err(miette::miette!(
        "Invalid time format: '{}'. Expected formats:\n\
         - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
         - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
         - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
        s
    ))
}
1197
1198fn parse_time_only(s: &str) -> Result<NaiveTime> {
1199 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1200 return Ok(time);
1201 }
1202
1203 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1204 return Ok(time);
1205 }
1206
1207 Err(miette::miette!("Invalid time format: '{}'", s))
1208}
1209
/// Print one daemon's log entries between `from` and `to` to stderr,
/// framed by "=== Error logs ===" markers. Sub-second precision is dropped
/// from the bounds to match the log files' second-resolution timestamps.
pub fn print_logs_for_time_range(
    daemon_id: &DaemonId,
    from: DateTime<Local>,
    to: Option<DateTime<Local>>,
) -> Result<()> {
    let daemon_ids = vec![daemon_id.clone()];
    let log_files = get_log_file_infos(&daemon_ids)?;

    let from = from
        .with_nanosecond(0)
        .expect("0 is always valid for nanoseconds");
    let to = to.map(|t| {
        t.with_nanosecond(0)
            .expect("0 is always valid for nanoseconds")
    });

    let log_lines = log_files
        .iter()
        .flat_map(
            |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), to) {
                Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    vec![]
                }
            },
        )
        .sorted_by_cached_key(|l| l.0.to_string())
        .collect_vec();

    if log_lines.is_empty() {
        eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
    } else {
        // Everything goes to stderr so stdout stays clean for callers.
        eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
        for (date, _id, msg) in log_lines {
            eprintln!("{} {}", edim(&date), msg);
        }
        eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
    }

    Ok(())
}
1252
/// Print one daemon's log entries from `from` onward to stderr, framed by
/// "=== Startup logs ===" markers. Unlike `print_logs_for_time_range`,
/// prints nothing at all when there are no entries. Sub-second precision
/// is dropped to match the logs' second-resolution timestamps.
pub fn print_startup_logs(daemon_id: &DaemonId, from: DateTime<Local>) -> Result<()> {
    let daemon_ids = vec![daemon_id.clone()];
    let log_files = get_log_file_infos(&daemon_ids)?;

    let from = from
        .with_nanosecond(0)
        .expect("0 is always valid for nanoseconds");

    let log_lines = log_files
        .iter()
        .flat_map(
            |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), None) {
                Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
                Err(e) => {
                    error!("{}: {}", lf.path.display(), e);
                    vec![]
                }
            },
        )
        .sorted_by_cached_key(|l| l.0.to_string())
        .collect_vec();

    if !log_lines.is_empty() {
        // Everything goes to stderr so stdout stays clean for callers.
        eprintln!("\n{} {} {}", edim("==="), edim("Startup logs"), edim("==="));
        for (date, _id, msg) in log_lines {
            eprintln!("{} {}", edim(&date), msg);
        }
        eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
    }

    Ok(())
}