1use crate::daemon_id::DaemonId;
2use crate::pitchfork_toml::{PitchforkToml, WatchMode};
3use crate::state_file::StateFile;
4use crate::ui::style::edim;
5use crate::watch_files::WatchFiles;
6use crate::{Result, env};
7use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
8use itertools::Itertools;
9use miette::IntoDiagnostic;
10use notify::RecursiveMode;
11use std::cmp::{Ordering, Reverse};
12use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap};
13use std::fs::{self, File};
14use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
15use std::path::{Path, PathBuf};
16use std::process::{Child, Command, Stdio};
17use std::time::Duration;
18use xx::regex;
19
/// Describes how to launch the external pager used for long log output.
struct PagerConfig {
    /// Program to execute, taken from `$PAGER` (defaults to `less`).
    command: String,
    /// Extra command-line arguments passed to the pager.
    args: Vec<String>,
}

impl PagerConfig {
    /// Builds the pager configuration from the `PAGER` environment variable.
    ///
    /// Falls back to `less` when `PAGER` is unset or cannot be read as Unicode.
    /// `start_at_end` asks the pager to open scrolled to the end of its input
    /// (only honoured for `less`, see [`Self::build_args`]).
    fn new(start_at_end: bool) -> Self {
        let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
        let args = Self::build_args(&command, start_at_end);
        Self { command, args }
    }

    /// Returns the extra arguments for `pager`.
    ///
    /// Only `less` receives flags: `-R` always, plus `+G` when `start_at_end`
    /// is set. The pager is recognised by the file stem of the program path so
    /// that values such as `/usr/bin/less` or `less.exe` are treated like a
    /// bare `less`. Every other pager gets no arguments.
    fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
        let is_less = Path::new(pager)
            .file_stem()
            .is_some_and(|stem| stem == "less");

        let mut args = Vec::new();
        if is_less {
            args.push("-R".to_string());
            if start_at_end {
                args.push("+G".to_string());
            }
        }
        args
    }

    /// Spawns the pager with a piped stdin so log lines can be written to it.
    fn spawn_piped(&self) -> io::Result<Child> {
        Command::new(&self.command)
            .args(&self.args)
            .stdin(Stdio::piped())
            .spawn()
    }
}
54
55fn format_log_line(date: &str, id: &str, msg: &str, single_daemon: bool) -> String {
58 if single_daemon {
59 format!("{} {}", edim(date), msg)
60 } else {
61 format!("{} {} {}", edim(date), id, msg)
62 }
63}
64
/// One pending log entry inside the merge heap.
#[derive(Debug)]
struct LogEntry {
    /// Timestamp text of the entry; compared lexicographically.
    timestamp: String,
    /// Name of the daemon the entry belongs to.
    daemon: String,
    /// Message text (may span several lines).
    message: String,
    /// Index of the source iterator that produced this entry.
    source_idx: usize,
}

impl PartialEq for LogEntry {
    /// Two entries are equal when they compare as `Ordering::Equal`, keeping
    /// `Eq` consistent with `Ord`.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for LogEntry {}

impl PartialOrd for LogEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for LogEntry {
    /// Orders by timestamp, then by source index.
    ///
    /// The timestamps only have second resolution, so ties are common. The
    /// source index tie-break makes the heap order deterministic: entries
    /// sharing a timestamp come out grouped by source, in the order the
    /// sources were registered.
    fn cmp(&self, other: &Self) -> Ordering {
        self.timestamp
            .cmp(&other.timestamp)
            .then_with(|| self.source_idx.cmp(&other.source_idx))
    }
}
93
94struct StreamingMerger<I>
97where
98 I: Iterator<Item = (String, String)>,
99{
100 sources: Vec<(String, I)>, heap: BinaryHeap<Reverse<LogEntry>>, }
103
104impl<I> StreamingMerger<I>
105where
106 I: Iterator<Item = (String, String)>,
107{
108 fn new() -> Self {
109 Self {
110 sources: Vec::new(),
111 heap: BinaryHeap::new(),
112 }
113 }
114
115 fn add_source(&mut self, daemon_name: String, iter: I) {
116 self.sources.push((daemon_name, iter));
117 }
118
119 fn initialize(&mut self) {
120 for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
122 if let Some((timestamp, message)) = iter.next() {
123 self.heap.push(Reverse(LogEntry {
124 timestamp,
125 daemon: daemon.clone(),
126 message,
127 source_idx: idx,
128 }));
129 }
130 }
131 }
132}
133
134impl<I> Iterator for StreamingMerger<I>
135where
136 I: Iterator<Item = (String, String)>,
137{
138 type Item = (String, String, String); fn next(&mut self) -> Option<Self::Item> {
141 let Reverse(entry) = self.heap.pop()?;
143
144 let (daemon, iter) = &mut self.sources[entry.source_idx];
146 if let Some((timestamp, message)) = iter.next() {
147 self.heap.push(Reverse(LogEntry {
148 timestamp,
149 daemon: daemon.clone(),
150 message,
151 source_idx: entry.source_idx,
152 }));
153 }
154
155 Some((entry.timestamp, entry.daemon, entry.message))
156 }
157}
158
159struct StreamingLogParser {
161 reader: BufReader<File>,
162 current_entry: Option<(String, String)>,
163 finished: bool,
164}
165
166impl StreamingLogParser {
167 fn new(file: File) -> Self {
168 Self {
169 reader: BufReader::new(file),
170 current_entry: None,
171 finished: false,
172 }
173 }
174}
175
176impl Iterator for StreamingLogParser {
177 type Item = (String, String);
178
179 fn next(&mut self) -> Option<Self::Item> {
180 if self.finished {
181 return None;
182 }
183
184 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
185
186 loop {
187 let mut line = String::new();
188 match self.reader.read_line(&mut line) {
189 Ok(0) => {
190 self.finished = true;
192 return self.current_entry.take();
193 }
194 Ok(_) => {
195 if line.ends_with('\n') {
197 line.pop();
198 if line.ends_with('\r') {
199 line.pop();
200 }
201 }
202
203 if let Some(caps) = re.captures(&line) {
204 let date = match caps.get(1) {
205 Some(d) => d.as_str().to_string(),
206 None => continue,
207 };
208 let msg = match caps.get(3) {
209 Some(m) => m.as_str().to_string(),
210 None => continue,
211 };
212
213 let prev = self.current_entry.take();
215 self.current_entry = Some((date, msg));
216
217 if prev.is_some() {
218 return prev;
219 }
220 } else {
222 if let Some((_, ref mut msg)) = self.current_entry {
224 msg.push('\n');
225 msg.push_str(&line);
226 }
227 }
228 }
229 Err(_) => {
230 self.finished = true;
231 return self.current_entry.take();
232 }
233 }
234 }
235 }
236}
237
238#[derive(Debug, clap::Args)]
240#[clap(
241 visible_alias = "l",
242 verbatim_doc_comment,
243 long_about = "\
244Displays logs for daemon(s)
245
246Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
247and include timestamps for filtering.
248
249Examples:
250 pitchfork logs api Show all logs for 'api' (paged if needed)
251 pitchfork logs api worker Show logs for multiple daemons
252 pitchfork logs Show logs for all daemons
253 pitchfork logs api -n 50 Show last 50 lines
254 pitchfork logs api --follow Follow logs in real-time
255 pitchfork logs api --since '2024-01-15 10:00:00'
256 Show logs since a specific time (forward)
257 pitchfork logs api --since '10:30:00'
258 Show logs since 10:30:00 today
259 pitchfork logs api --since '10:30' --until '12:00'
260 Show logs since 10:30:00 until 12:00:00 today
261 pitchfork logs api --since 5min Show logs from last 5 minutes
262 pitchfork logs api --raw Output raw log lines without formatting
263 pitchfork logs api --raw -n 100 Output last 100 raw log lines
264 pitchfork logs api --clear Delete logs for 'api'
265 pitchfork logs --clear Delete logs for all daemons"
266)]
267pub struct Logs {
268 id: Vec<String>,
270
271 #[clap(short, long)]
273 clear: bool,
274
275 #[clap(short)]
280 n: Option<usize>,
281
282 #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
284 tail: bool,
285
286 #[clap(short = 's', long)]
293 since: Option<String>,
294
295 #[clap(short = 'u', long)]
301 until: Option<String>,
302
303 #[clap(long)]
305 no_pager: bool,
306
307 #[clap(long)]
309 raw: bool,
310}
311
312impl Logs {
313 pub async fn run(&self) -> Result<()> {
314 migrate_legacy_log_dirs();
317
318 let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
320 get_all_daemon_ids()?
322 } else {
323 PitchforkToml::resolve_ids(&self.id)?
324 };
325
326 if self.clear {
327 for id in &resolved_ids {
328 let path = id.log_path();
329 if path.exists() {
330 xx::file::create(&path)?;
331 }
332 }
333 return Ok(());
334 }
335
336 let from = if let Some(since) = self.since.as_ref() {
337 Some(parse_time_input(since, true)?)
338 } else {
339 None
340 };
341 let to = if let Some(until) = self.until.as_ref() {
342 Some(parse_time_input(until, false)?)
343 } else {
344 None
345 };
346
347 self.print_existing_logs(&resolved_ids, from, to)?;
348 if self.tail {
349 tail_logs(&resolved_ids).await?;
350 }
351
352 Ok(())
353 }
354
355 fn print_existing_logs(
356 &self,
357 resolved_ids: &[DaemonId],
358 from: Option<DateTime<Local>>,
359 to: Option<DateTime<Local>>,
360 ) -> Result<()> {
361 let log_files = get_log_file_infos(resolved_ids)?;
362 trace!("log files for: {}", log_files.keys().join(", "));
363 let single_daemon = resolved_ids.len() == 1;
364 let has_time_filter = from.is_some() || to.is_some();
365
366 if has_time_filter {
367 let mut log_lines = self.collect_log_lines_forward(&log_files, from, to)?;
368
369 if let Some(n) = self.n {
370 let len = log_lines.len();
371 if len > n {
372 log_lines = log_lines.into_iter().skip(len - n).collect_vec();
373 }
374 }
375
376 self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
377 } else if let Some(n) = self.n {
378 let log_lines = self.collect_log_lines_reverse(&log_files, Some(n))?;
379 self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
380 } else {
381 self.stream_logs_to_pager(&log_files, single_daemon, self.raw)?;
382 }
383
384 Ok(())
385 }
386
387 fn collect_log_lines_forward(
388 &self,
389 log_files: &BTreeMap<DaemonId, LogFile>,
390 from: Option<DateTime<Local>>,
391 to: Option<DateTime<Local>>,
392 ) -> Result<Vec<(String, String, String)>> {
393 let log_lines: Vec<(String, String, String)> = log_files
394 .iter()
395 .flat_map(
396 |(name, lf)| match read_lines_in_time_range(&lf.path, from, to) {
397 Ok(lines) => merge_log_lines(&name.qualified(), lines, false),
398 Err(e) => {
399 error!("{}: {}", lf.path.display(), e);
400 vec![]
401 }
402 },
403 )
404 .sorted_by_cached_key(|l| l.0.to_string())
405 .collect_vec();
406
407 Ok(log_lines)
408 }
409
410 fn collect_log_lines_reverse(
411 &self,
412 log_files: &BTreeMap<DaemonId, LogFile>,
413 limit: Option<usize>,
414 ) -> Result<Vec<(String, String, String)>> {
415 let log_lines: Vec<(String, String, String)> = log_files
416 .iter()
417 .flat_map(|(daemon_id, lf)| {
418 let rev = match xx::file::open(&lf.path) {
419 Ok(f) => rev_lines::RevLines::new(f),
420 Err(e) => {
421 error!("{}: {}", lf.path.display(), e);
422 return vec![];
423 }
424 };
425 let lines = rev.into_iter().filter_map(Result::ok);
426 let lines = match limit {
427 Some(n) => lines.take(n).collect_vec(),
428 None => lines.collect_vec(),
429 };
430 merge_log_lines(&daemon_id.qualified(), lines, true)
431 })
432 .sorted_by_cached_key(|l| l.0.to_string())
433 .collect_vec();
434
435 let log_lines = match limit {
436 Some(n) => {
437 let len = log_lines.len();
438 if len > n {
439 log_lines.into_iter().skip(len - n).collect_vec()
440 } else {
441 log_lines
442 }
443 }
444 None => log_lines,
445 };
446
447 Ok(log_lines)
448 }
449
450 fn output_logs(
451 &self,
452 log_lines: Vec<(String, String, String)>,
453 single_daemon: bool,
454 has_time_filter: bool,
455 raw: bool,
456 ) -> Result<()> {
457 if log_lines.is_empty() {
458 return Ok(());
459 }
460
461 if raw {
463 for (date, id, msg) in log_lines {
464 if single_daemon {
465 println!("{date} {msg}");
466 } else {
467 println!("{date} {id} {msg}");
468 }
469 }
470 return Ok(());
471 }
472
473 let use_pager = !self.no_pager && should_use_pager(log_lines.len());
474
475 if use_pager {
476 self.output_with_pager(log_lines, single_daemon, has_time_filter)?;
477 } else {
478 for (date, id, msg) in log_lines {
479 println!("{}", format_log_line(&date, &id, &msg, single_daemon));
480 }
481 }
482
483 Ok(())
484 }
485
486 fn output_with_pager(
487 &self,
488 log_lines: Vec<(String, String, String)>,
489 single_daemon: bool,
490 has_time_filter: bool,
491 ) -> Result<()> {
492 let pager_config = PagerConfig::new(!has_time_filter);
494
495 match pager_config.spawn_piped() {
496 Ok(mut child) => {
497 if let Some(stdin) = child.stdin.as_mut() {
498 for (date, id, msg) in log_lines {
499 let line =
500 format!("{}\n", format_log_line(&date, &id, &msg, single_daemon));
501 if stdin.write_all(line.as_bytes()).is_err() {
502 break;
503 }
504 }
505 let _ = child.wait();
506 } else {
507 debug!("Failed to get pager stdin, falling back to direct output");
508 for (date, id, msg) in log_lines {
509 println!("{}", format_log_line(&date, &id, &msg, single_daemon));
510 }
511 }
512 }
513 Err(e) => {
514 debug!("Failed to spawn pager: {e}, falling back to direct output");
515 for (date, id, msg) in log_lines {
516 println!("{}", format_log_line(&date, &id, &msg, single_daemon));
517 }
518 }
519 }
520
521 Ok(())
522 }
523
524 fn stream_logs_to_pager(
525 &self,
526 log_files: &BTreeMap<DaemonId, LogFile>,
527 single_daemon: bool,
528 raw: bool,
529 ) -> Result<()> {
530 if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
531 return self.stream_logs_direct(log_files, single_daemon, raw);
532 }
533
534 let pager_config = PagerConfig::new(true); match pager_config.spawn_piped() {
537 Ok(mut child) => {
538 if let Some(stdin) = child.stdin.take() {
539 let log_files_clone: Vec<_> = log_files
541 .iter()
542 .map(|(daemon_id, lf)| (daemon_id.qualified(), lf.path.clone()))
543 .collect();
544 let single_daemon_clone = single_daemon;
545
546 std::thread::spawn(move || {
548 let mut writer = BufWriter::new(stdin);
549
550 if log_files_clone.len() == 1 {
552 let (name, path) = &log_files_clone[0];
553 let file = match File::open(path) {
554 Ok(f) => f,
555 Err(_) => return,
556 };
557 let parser = StreamingLogParser::new(file);
558 for (timestamp, message) in parser {
559 let output = format!(
560 "{}\n",
561 format_log_line(
562 ×tamp,
563 name,
564 &message,
565 single_daemon_clone
566 )
567 );
568 if writer.write_all(output.as_bytes()).is_err() {
569 return;
570 }
571 }
572 let _ = writer.flush();
573 return;
574 }
575
576 let mut merger: StreamingMerger<StreamingLogParser> =
578 StreamingMerger::new();
579
580 for (name, path) in log_files_clone {
581 let file = match File::open(&path) {
582 Ok(f) => f,
583 Err(_) => continue,
584 };
585 let parser = StreamingLogParser::new(file);
586 merger.add_source(name, parser);
587 }
588
589 merger.initialize();
591
592 for (timestamp, daemon, message) in merger {
594 let output = format!(
595 "{}\n",
596 format_log_line(×tamp, &daemon, &message, single_daemon_clone)
597 );
598 if writer.write_all(output.as_bytes()).is_err() {
599 return;
600 }
601 }
602
603 let _ = writer.flush();
604 });
605
606 let _ = child.wait();
607 } else {
608 debug!("Failed to get pager stdin, falling back to direct output");
609 return self.stream_logs_direct(log_files, single_daemon, raw);
610 }
611 }
612 Err(e) => {
613 debug!("Failed to spawn pager: {e}, falling back to direct output");
614 return self.stream_logs_direct(log_files, single_daemon, raw);
615 }
616 }
617
618 Ok(())
619 }
620
621 fn stream_logs_direct(
622 &self,
623 log_files: &BTreeMap<DaemonId, LogFile>,
624 single_daemon: bool,
625 raw: bool,
626 ) -> Result<()> {
627 if log_files.len() == 1 {
630 let (daemon_id, lf) = log_files.iter().next().unwrap();
631 let file = match File::open(&lf.path) {
632 Ok(f) => f,
633 Err(e) => {
634 error!("{}: {}", lf.path.display(), e);
635 return Ok(());
636 }
637 };
638 let reader = BufReader::new(file);
639 if raw {
640 for line in reader.lines() {
642 match line {
643 Ok(l) => {
644 if io::stdout().write_all(l.as_bytes()).is_err()
645 || io::stdout().write_all(b"\n").is_err()
646 {
647 return Ok(());
648 }
649 }
650 Err(_) => continue,
651 }
652 }
653 } else {
654 let parser = StreamingLogParser::new(File::open(&lf.path).into_diagnostic()?);
656 for (timestamp, message) in parser {
657 let output = format!(
658 "{}\n",
659 format_log_line(
660 ×tamp,
661 &daemon_id.qualified(),
662 &message,
663 single_daemon
664 )
665 );
666 if io::stdout().write_all(output.as_bytes()).is_err() {
667 return Ok(());
668 }
669 }
670 }
671 return Ok(());
672 }
673
674 let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();
676
677 for (daemon_id, lf) in log_files {
678 let file = match File::open(&lf.path) {
679 Ok(f) => f,
680 Err(e) => {
681 error!("{}: {}", lf.path.display(), e);
682 continue;
683 }
684 };
685 let parser = StreamingLogParser::new(file);
686 merger.add_source(daemon_id.qualified(), parser);
687 }
688
689 merger.initialize();
691
692 for (timestamp, daemon, message) in merger {
694 let output = if raw {
695 if single_daemon {
696 format!("{timestamp} {message}\n")
697 } else {
698 format!("{timestamp} {daemon} {message}\n")
699 }
700 } else {
701 format!(
702 "{}\n",
703 format_log_line(×tamp, &daemon, &message, single_daemon)
704 )
705 };
706 if io::stdout().write_all(output.as_bytes()).is_err() {
707 return Ok(());
708 }
709 }
710
711 Ok(())
712 }
713}
714
715fn should_use_pager(line_count: usize) -> bool {
716 if !io::stdout().is_terminal() {
717 return false;
718 }
719
720 let terminal_height = get_terminal_height().unwrap_or(24);
721 line_count > terminal_height
722}
723
724fn get_terminal_height() -> Option<usize> {
725 if let Ok(rows) = std::env::var("LINES")
726 && let Ok(h) = rows.parse::<usize>()
727 {
728 return Some(h);
729 }
730
731 crossterm::terminal::size().ok().map(|(_, h)| h as usize)
732}
733
734fn read_lines_in_time_range(
735 path: &Path,
736 from: Option<DateTime<Local>>,
737 to: Option<DateTime<Local>>,
738) -> Result<Vec<String>> {
739 let mut file = File::open(path).into_diagnostic()?;
740 let file_size = file.metadata().into_diagnostic()?.len();
741
742 if file_size == 0 {
743 return Ok(vec![]);
744 }
745
746 let start_pos = if let Some(from_time) = from {
747 binary_search_log_position(&mut file, file_size, from_time, true)?
748 } else {
749 0
750 };
751
752 let end_pos = if let Some(to_time) = to {
753 binary_search_log_position(&mut file, file_size, to_time, false)?
754 } else {
755 file_size
756 };
757
758 if start_pos >= end_pos {
759 return Ok(vec![]);
760 }
761
762 file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
763 let mut reader = BufReader::new(&file);
764 let mut lines = Vec::new();
765 let mut current_pos = start_pos;
766
767 loop {
768 if current_pos >= end_pos {
769 break;
770 }
771
772 let mut line = String::new();
773 match reader.read_line(&mut line) {
774 Ok(0) => break,
775 Ok(bytes_read) => {
776 current_pos += bytes_read as u64;
777 if line.ends_with('\n') {
778 line.pop();
779 if line.ends_with('\r') {
780 line.pop();
781 }
782 }
783 lines.push(line);
784 }
785 Err(_) => break,
786 }
787 }
788
789 Ok(lines)
790}
791
792fn binary_search_log_position(
793 file: &mut File,
794 file_size: u64,
795 target_time: DateTime<Local>,
796 find_start: bool,
797) -> Result<u64> {
798 let mut low: u64 = 0;
799 let mut high: u64 = file_size;
800
801 while low < high {
802 let mid = low + (high - low) / 2;
803
804 let line_start = find_line_start(file, mid)?;
805
806 file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
807 let mut reader = BufReader::new(&*file);
808 let mut line = String::new();
809 let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
810 if bytes_read == 0 {
811 high = mid;
812 continue;
813 }
814
815 let line_time = extract_timestamp(&line);
816
817 match line_time {
818 Some(lt) => {
819 if find_start {
820 if lt < target_time {
821 low = line_start + bytes_read as u64;
822 } else {
823 high = line_start;
824 }
825 } else if lt <= target_time {
826 low = line_start + bytes_read as u64;
827 } else {
828 high = line_start;
829 }
830 }
831 None => {
832 low = line_start + bytes_read as u64;
833 }
834 }
835 }
836
837 find_line_start(file, low)
838}
839
840fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
841 if pos == 0 {
842 return Ok(0);
843 }
844
845 let mut search_pos = pos.saturating_sub(1);
847 const CHUNK_SIZE: usize = 8192;
848
849 loop {
850 let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
852 let len_u64 = search_pos - chunk_start + 1;
853 let len = len_u64 as usize;
854
855 file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
857 let mut buf = vec![0u8; len];
858 if file.read_exact(&mut buf).is_err() {
859 return Ok(0);
861 }
862
863 for (i, &b) in buf.iter().enumerate().rev() {
865 if b == b'\n' {
866 return Ok(chunk_start + i as u64 + 1);
867 }
868 }
869
870 if chunk_start == 0 {
873 return Ok(0);
874 }
875
876 search_pos = chunk_start - 1;
878 }
879}
880
881fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
882 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
883 re.captures(line)
884 .and_then(|caps| caps.get(1))
885 .and_then(|m| parse_datetime(m.as_str()).ok())
886}
887
888fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
889 let lines = if reverse {
890 lines.into_iter().rev().collect()
891 } else {
892 lines
893 };
894
895 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
896 lines
897 .into_iter()
898 .fold(vec![], |mut acc, line| match re.captures(&line) {
899 Some(caps) => {
900 let (date, msg) = match (caps.get(1), caps.get(3)) {
901 (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
902 _ => return acc,
903 };
904 acc.push((date, id.to_string(), msg));
905 acc
906 }
907 None => {
908 if let Some(l) = acc.last_mut() {
909 l.2.push('\n');
910 l.2.push_str(&line);
911 }
912 acc
913 }
914 })
915}
916
917fn migrate_legacy_log_dirs() {
927 let known_safe_paths = known_daemon_safe_paths();
928 let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
929 Ok(d) => d,
930 Err(_) => return,
931 };
932 for dir in dirs {
933 if dir.starts_with(".") || !dir.is_dir() {
934 continue;
935 }
936 let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
937 Some(n) => n,
938 None => continue,
939 };
940 if name.contains("--") {
943 if DaemonId::from_safe_path(&name).is_ok() {
946 continue;
947 }
948 if known_safe_paths.contains(&name) {
951 continue;
952 }
953 warn!(
954 "Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
955 );
956 continue;
957 }
958
959 let old_log = dir.join(format!("{name}.log"));
962 if !old_log.exists() {
963 continue;
964 }
965 if DaemonId::try_new("legacy", &name).is_err() {
966 warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
967 continue;
968 }
969
970 let new_name = format!("legacy--{name}");
971 let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
972 if new_dir.exists() {
974 continue;
975 }
976 if std::fs::rename(&dir, &new_dir).is_err() {
977 continue;
978 }
979 let old_log = new_dir.join(format!("{name}.log"));
981 let new_log = new_dir.join(format!("{new_name}.log"));
982 if old_log.exists() {
983 let _ = std::fs::rename(&old_log, &new_log);
984 }
985 debug!("Migrated legacy log dir '{name}' → '{new_name}'");
986 }
987}
988
989fn known_daemon_safe_paths() -> BTreeSet<String> {
990 let mut out = BTreeSet::new();
991
992 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
993 Ok(state) => {
994 for id in state.daemons.keys() {
995 out.insert(id.safe_path());
996 }
997 }
998 Err(e) => {
999 warn!("Failed to read state while checking known daemon IDs: {e}");
1000 }
1001 }
1002
1003 match PitchforkToml::all_merged() {
1004 Ok(config) => {
1005 for id in config.daemons.keys() {
1006 out.insert(id.safe_path());
1007 }
1008 }
1009 Err(e) => {
1010 warn!("Failed to read config while checking known daemon IDs: {e}");
1011 }
1012 }
1013
1014 out
1015}
1016
1017fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
1018 let mut ids = BTreeSet::new();
1019
1020 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1021 Ok(state) => ids.extend(state.daemons.keys().cloned()),
1022 Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
1023 }
1024
1025 match PitchforkToml::all_merged() {
1026 Ok(config) => ids.extend(config.daemons.keys().cloned()),
1027 Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
1028 }
1029
1030 Ok(ids
1031 .into_iter()
1032 .filter(|id| id.log_path().exists())
1033 .collect())
1034}
1035
1036fn get_log_file_infos(names: &[DaemonId]) -> Result<BTreeMap<DaemonId, LogFile>> {
1037 let mut out = BTreeMap::new();
1038
1039 for daemon_id in names {
1040 let path = daemon_id.log_path();
1041
1042 if !path.exists() {
1044 continue;
1045 }
1046
1047 let mut file = xx::file::open(&path)?;
1048 file.seek(SeekFrom::End(0)).into_diagnostic()?;
1051 let cur = file.stream_position().into_diagnostic()?;
1052
1053 out.insert(
1054 daemon_id.clone(),
1055 LogFile {
1056 _name: daemon_id.clone(),
1057 file,
1058 cur,
1059 path,
1060 },
1061 );
1062 }
1063
1064 Ok(out)
1065}
1066
1067pub async fn tail_logs(names: &[DaemonId]) -> Result<()> {
1068 let mut log_files = get_log_file_infos(names)?;
1069 let mut wf = WatchFiles::new(
1070 Duration::from_millis(10),
1071 WatchMode::Native,
1072 Duration::from_millis(500),
1073 )?;
1074
1075 for lf in log_files.values() {
1076 wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
1077 }
1078
1079 let files_to_name: HashMap<PathBuf, DaemonId> = log_files
1080 .iter()
1081 .map(|(n, f)| (f.path.clone(), n.clone()))
1082 .collect();
1083
1084 while let Some(paths) = wf.rx.recv().await {
1085 let mut out = vec![];
1086 for path in paths {
1087 let Some(name) = files_to_name.get(&path) else {
1088 warn!("Unknown log file changed: {}", path.display());
1089 continue;
1090 };
1091 let Some(info) = log_files.get_mut(name) else {
1092 warn!("No log info for: {name}");
1093 continue;
1094 };
1095 info.file
1096 .seek(SeekFrom::Start(info.cur))
1097 .into_diagnostic()?;
1098 let reader = BufReader::new(&info.file);
1099 let lines = reader.lines().map_while(Result::ok).collect_vec();
1100 info.cur = info.file.stream_position().into_diagnostic()?;
1101 out.extend(merge_log_lines(&name.qualified(), lines, false));
1102 }
1103 let out = out
1104 .into_iter()
1105 .sorted_by_cached_key(|l| l.0.to_string())
1106 .collect_vec();
1107 for (date, name, msg) in out {
1108 println!("{} {} {}", edim(&date), name, msg);
1109 }
1110 }
1111 Ok(())
1112}
1113
1114struct LogFile {
1115 _name: DaemonId,
1116 path: PathBuf,
1117 file: fs::File,
1118 cur: u64,
1119}
1120
1121fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
1122 let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
1123 Local
1124 .from_local_datetime(&naive_dt)
1125 .single()
1126 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
1127}
1128
1129fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
1135 let s = s.trim();
1136
1137 if let Ok(dt) = parse_datetime(s) {
1139 return Ok(dt);
1140 }
1141
1142 if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
1144 return Local
1145 .from_local_datetime(&naive_dt)
1146 .single()
1147 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
1148 }
1149
1150 if let Ok(time) = parse_time_only(s) {
1154 let now = Local::now();
1155 let today = now.date_naive();
1156 let mut naive_dt = NaiveDateTime::new(today, time);
1157 let mut dt = Local
1158 .from_local_datetime(&naive_dt)
1159 .single()
1160 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1161
1162 if is_since
1165 && dt > now
1166 && let Some(yesterday) = today.pred_opt()
1167 {
1168 naive_dt = NaiveDateTime::new(yesterday, time);
1169 dt = Local
1170 .from_local_datetime(&naive_dt)
1171 .single()
1172 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1173 }
1174 return Ok(dt);
1175 }
1176
1177 if let Ok(duration) = humantime::parse_duration(s) {
1178 let now = Local::now();
1179 let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
1180 return Ok(target);
1181 }
1182
1183 Err(miette::miette!(
1184 "Invalid time format: '{}'. Expected formats:\n\
1185 - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
1186 - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
1187 - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
1188 s
1189 ))
1190}
1191
1192fn parse_time_only(s: &str) -> Result<NaiveTime> {
1193 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1194 return Ok(time);
1195 }
1196
1197 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1198 return Ok(time);
1199 }
1200
1201 Err(miette::miette!("Invalid time format: '{}'", s))
1202}
1203
1204pub fn print_logs_for_time_range(
1205 daemon_id: &DaemonId,
1206 from: DateTime<Local>,
1207 to: Option<DateTime<Local>>,
1208) -> Result<()> {
1209 let daemon_ids = vec![daemon_id.clone()];
1210 let log_files = get_log_file_infos(&daemon_ids)?;
1211
1212 let from = from
1213 .with_nanosecond(0)
1214 .expect("0 is always valid for nanoseconds");
1215 let to = to.map(|t| {
1216 t.with_nanosecond(0)
1217 .expect("0 is always valid for nanoseconds")
1218 });
1219
1220 let log_lines = log_files
1221 .iter()
1222 .flat_map(
1223 |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), to) {
1224 Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1225 Err(e) => {
1226 error!("{}: {}", lf.path.display(), e);
1227 vec![]
1228 }
1229 },
1230 )
1231 .sorted_by_cached_key(|l| l.0.to_string())
1232 .collect_vec();
1233
1234 if log_lines.is_empty() {
1235 eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
1236 } else {
1237 eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
1238 for (date, _id, msg) in log_lines {
1239 eprintln!("{} {}", edim(&date), msg);
1240 }
1241 eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1242 }
1243
1244 Ok(())
1245}
1246
1247pub fn print_startup_logs(daemon_id: &DaemonId, from: DateTime<Local>) -> Result<()> {
1248 let daemon_ids = vec![daemon_id.clone()];
1249 let log_files = get_log_file_infos(&daemon_ids)?;
1250
1251 let from = from
1252 .with_nanosecond(0)
1253 .expect("0 is always valid for nanoseconds");
1254
1255 let log_lines = log_files
1256 .iter()
1257 .flat_map(
1258 |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), None) {
1259 Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1260 Err(e) => {
1261 error!("{}: {}", lf.path.display(), e);
1262 vec![]
1263 }
1264 },
1265 )
1266 .sorted_by_cached_key(|l| l.0.to_string())
1267 .collect_vec();
1268
1269 if !log_lines.is_empty() {
1270 eprintln!("\n{} {} {}", edim("==="), edim("Startup logs"), edim("==="));
1271 for (date, _id, msg) in log_lines {
1272 eprintln!("{} {}", edim(&date), msg);
1273 }
1274 eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1275 }
1276
1277 Ok(())
1278}