1use crate::daemon_id::DaemonId;
2use crate::pitchfork_toml::PitchforkToml;
3use crate::state_file::StateFile;
4use crate::ui::style::edim;
5use crate::watch_files::WatchFiles;
6use crate::{Result, env};
7use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
8use itertools::Itertools;
9use miette::IntoDiagnostic;
10use notify::RecursiveMode;
11use std::cmp::{Ordering, Reverse};
12use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap};
13use std::fs::{self, File};
14use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
15use std::path::{Path, PathBuf};
16use std::process::{Child, Command, Stdio};
17use std::time::Duration;
18use xx::regex;
19
20struct PagerConfig {
22 command: String,
23 args: Vec<String>,
24}
25
26impl PagerConfig {
27 fn new(start_at_end: bool) -> Self {
30 let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
31 let args = Self::build_args(&command, start_at_end);
32 Self { command, args }
33 }
34
35 fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
36 let mut args = vec![];
37 if pager == "less" {
38 args.push("-R".to_string());
39 if start_at_end {
40 args.push("+G".to_string());
41 }
42 }
43 args
44 }
45
46 fn spawn_piped(&self) -> io::Result<Child> {
48 Command::new(&self.command)
49 .args(&self.args)
50 .stdin(Stdio::piped())
51 .spawn()
52 }
53}
54
55fn format_log_line(date: &str, id: &str, msg: &str, single_daemon: bool) -> String {
58 if single_daemon {
59 format!("{} {}", edim(date), msg)
60 } else {
61 format!("{} {} {}", edim(date), id, msg)
62 }
63}
64
65#[derive(Debug)]
67struct LogEntry {
68 timestamp: String,
69 daemon: String,
70 message: String,
71 source_idx: usize, }
73
74impl PartialEq for LogEntry {
75 fn eq(&self, other: &Self) -> bool {
76 self.timestamp == other.timestamp
77 }
78}
79
80impl Eq for LogEntry {}
81
82impl PartialOrd for LogEntry {
83 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
84 Some(self.cmp(other))
85 }
86}
87
88impl Ord for LogEntry {
89 fn cmp(&self, other: &Self) -> Ordering {
90 self.timestamp.cmp(&other.timestamp)
91 }
92}
93
94struct StreamingMerger<I>
97where
98 I: Iterator<Item = (String, String)>,
99{
100 sources: Vec<(String, I)>, heap: BinaryHeap<Reverse<LogEntry>>, }
103
104impl<I> StreamingMerger<I>
105where
106 I: Iterator<Item = (String, String)>,
107{
108 fn new() -> Self {
109 Self {
110 sources: Vec::new(),
111 heap: BinaryHeap::new(),
112 }
113 }
114
115 fn add_source(&mut self, daemon_name: String, iter: I) {
116 self.sources.push((daemon_name, iter));
117 }
118
119 fn initialize(&mut self) {
120 for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
122 if let Some((timestamp, message)) = iter.next() {
123 self.heap.push(Reverse(LogEntry {
124 timestamp,
125 daemon: daemon.clone(),
126 message,
127 source_idx: idx,
128 }));
129 }
130 }
131 }
132}
133
134impl<I> Iterator for StreamingMerger<I>
135where
136 I: Iterator<Item = (String, String)>,
137{
138 type Item = (String, String, String); fn next(&mut self) -> Option<Self::Item> {
141 let Reverse(entry) = self.heap.pop()?;
143
144 let (daemon, iter) = &mut self.sources[entry.source_idx];
146 if let Some((timestamp, message)) = iter.next() {
147 self.heap.push(Reverse(LogEntry {
148 timestamp,
149 daemon: daemon.clone(),
150 message,
151 source_idx: entry.source_idx,
152 }));
153 }
154
155 Some((entry.timestamp, entry.daemon, entry.message))
156 }
157}
158
159struct StreamingLogParser {
161 reader: BufReader<File>,
162 current_entry: Option<(String, String)>,
163 finished: bool,
164}
165
166impl StreamingLogParser {
167 fn new(file: File) -> Self {
168 Self {
169 reader: BufReader::new(file),
170 current_entry: None,
171 finished: false,
172 }
173 }
174}
175
176impl Iterator for StreamingLogParser {
177 type Item = (String, String);
178
179 fn next(&mut self) -> Option<Self::Item> {
180 if self.finished {
181 return None;
182 }
183
184 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
185
186 loop {
187 let mut line = String::new();
188 match self.reader.read_line(&mut line) {
189 Ok(0) => {
190 self.finished = true;
192 return self.current_entry.take();
193 }
194 Ok(_) => {
195 if line.ends_with('\n') {
197 line.pop();
198 if line.ends_with('\r') {
199 line.pop();
200 }
201 }
202
203 if let Some(caps) = re.captures(&line) {
204 let date = match caps.get(1) {
205 Some(d) => d.as_str().to_string(),
206 None => continue,
207 };
208 let msg = match caps.get(3) {
209 Some(m) => m.as_str().to_string(),
210 None => continue,
211 };
212
213 let prev = self.current_entry.take();
215 self.current_entry = Some((date, msg));
216
217 if prev.is_some() {
218 return prev;
219 }
220 } else {
222 if let Some((_, ref mut msg)) = self.current_entry {
224 msg.push('\n');
225 msg.push_str(&line);
226 }
227 }
228 }
229 Err(_) => {
230 self.finished = true;
231 return self.current_entry.take();
232 }
233 }
234 }
235 }
236}
237
238#[derive(Debug, clap::Args)]
240#[clap(
241 visible_alias = "l",
242 verbatim_doc_comment,
243 long_about = "\
244Displays logs for daemon(s)
245
246Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
247and include timestamps for filtering.
248
249Examples:
250 pitchfork logs api Show all logs for 'api' (paged if needed)
251 pitchfork logs api worker Show logs for multiple daemons
252 pitchfork logs Show logs for all daemons
253 pitchfork logs api -n 50 Show last 50 lines
254 pitchfork logs api --follow Follow logs in real-time
255 pitchfork logs api --since '2024-01-15 10:00:00'
256 Show logs since a specific time (forward)
257 pitchfork logs api --since '10:30:00'
258 Show logs since 10:30:00 today
259 pitchfork logs api --since '10:30' --until '12:00'
260 Show logs since 10:30:00 until 12:00:00 today
261 pitchfork logs api --since 5min Show logs from last 5 minutes
262 pitchfork logs api --raw Output raw log lines without formatting
263 pitchfork logs api --raw -n 100 Output last 100 raw log lines
264 pitchfork logs api --clear Delete logs for 'api'
265 pitchfork logs --clear Delete logs for all daemons"
266)]
267pub struct Logs {
268 id: Vec<String>,
270
271 #[clap(short, long)]
273 clear: bool,
274
275 #[clap(short)]
280 n: Option<usize>,
281
282 #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
284 tail: bool,
285
286 #[clap(short = 's', long)]
293 since: Option<String>,
294
295 #[clap(short = 'u', long)]
301 until: Option<String>,
302
303 #[clap(long)]
305 no_pager: bool,
306
307 #[clap(long)]
309 raw: bool,
310}
311
312impl Logs {
313 pub async fn run(&self) -> Result<()> {
314 migrate_legacy_log_dirs();
317
318 let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
320 get_all_daemon_ids()?
322 } else {
323 PitchforkToml::resolve_ids(&self.id)?
324 };
325
326 if self.clear {
327 for id in &resolved_ids {
328 let path = id.log_path();
329 if path.exists() {
330 xx::file::create(&path)?;
331 }
332 }
333 return Ok(());
334 }
335
336 let from = if let Some(since) = self.since.as_ref() {
337 Some(parse_time_input(since, true)?)
338 } else {
339 None
340 };
341 let to = if let Some(until) = self.until.as_ref() {
342 Some(parse_time_input(until, false)?)
343 } else {
344 None
345 };
346
347 self.print_existing_logs(&resolved_ids, from, to)?;
348 if self.tail {
349 tail_logs(&resolved_ids).await?;
350 }
351
352 Ok(())
353 }
354
355 fn print_existing_logs(
356 &self,
357 resolved_ids: &[DaemonId],
358 from: Option<DateTime<Local>>,
359 to: Option<DateTime<Local>>,
360 ) -> Result<()> {
361 let log_files = get_log_file_infos(resolved_ids)?;
362 trace!("log files for: {}", log_files.keys().join(", "));
363 let single_daemon = resolved_ids.len() == 1;
364 let has_time_filter = from.is_some() || to.is_some();
365
366 if has_time_filter {
367 let mut log_lines = self.collect_log_lines_forward(&log_files, from, to)?;
368
369 if let Some(n) = self.n {
370 let len = log_lines.len();
371 if len > n {
372 log_lines = log_lines.into_iter().skip(len - n).collect_vec();
373 }
374 }
375
376 self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
377 } else if let Some(n) = self.n {
378 let log_lines = self.collect_log_lines_reverse(&log_files, Some(n))?;
379 self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
380 } else {
381 self.stream_logs_to_pager(&log_files, single_daemon, self.raw)?;
382 }
383
384 Ok(())
385 }
386
387 fn collect_log_lines_forward(
388 &self,
389 log_files: &BTreeMap<DaemonId, LogFile>,
390 from: Option<DateTime<Local>>,
391 to: Option<DateTime<Local>>,
392 ) -> Result<Vec<(String, String, String)>> {
393 let log_lines: Vec<(String, String, String)> = log_files
394 .iter()
395 .flat_map(
396 |(name, lf)| match read_lines_in_time_range(&lf.path, from, to) {
397 Ok(lines) => merge_log_lines(&name.qualified(), lines, false),
398 Err(e) => {
399 error!("{}: {}", lf.path.display(), e);
400 vec![]
401 }
402 },
403 )
404 .sorted_by_cached_key(|l| l.0.to_string())
405 .collect_vec();
406
407 Ok(log_lines)
408 }
409
410 fn collect_log_lines_reverse(
411 &self,
412 log_files: &BTreeMap<DaemonId, LogFile>,
413 limit: Option<usize>,
414 ) -> Result<Vec<(String, String, String)>> {
415 let log_lines: Vec<(String, String, String)> = log_files
416 .iter()
417 .flat_map(|(daemon_id, lf)| {
418 let rev = match xx::file::open(&lf.path) {
419 Ok(f) => rev_lines::RevLines::new(f),
420 Err(e) => {
421 error!("{}: {}", lf.path.display(), e);
422 return vec![];
423 }
424 };
425 let lines = rev.into_iter().filter_map(Result::ok);
426 let lines = match limit {
427 Some(n) => lines.take(n).collect_vec(),
428 None => lines.collect_vec(),
429 };
430 merge_log_lines(&daemon_id.qualified(), lines, true)
431 })
432 .sorted_by_cached_key(|l| l.0.to_string())
433 .collect_vec();
434
435 let log_lines = match limit {
436 Some(n) => {
437 let len = log_lines.len();
438 if len > n {
439 log_lines.into_iter().skip(len - n).collect_vec()
440 } else {
441 log_lines
442 }
443 }
444 None => log_lines,
445 };
446
447 Ok(log_lines)
448 }
449
450 fn output_logs(
451 &self,
452 log_lines: Vec<(String, String, String)>,
453 single_daemon: bool,
454 has_time_filter: bool,
455 raw: bool,
456 ) -> Result<()> {
457 if log_lines.is_empty() {
458 return Ok(());
459 }
460
461 if raw {
463 for (date, id, msg) in log_lines {
464 if single_daemon {
465 println!("{date} {msg}");
466 } else {
467 println!("{date} {id} {msg}");
468 }
469 }
470 return Ok(());
471 }
472
473 let use_pager = !self.no_pager && should_use_pager(log_lines.len());
474
475 if use_pager {
476 self.output_with_pager(log_lines, single_daemon, has_time_filter)?;
477 } else {
478 for (date, id, msg) in log_lines {
479 println!("{}", format_log_line(&date, &id, &msg, single_daemon));
480 }
481 }
482
483 Ok(())
484 }
485
486 fn output_with_pager(
487 &self,
488 log_lines: Vec<(String, String, String)>,
489 single_daemon: bool,
490 has_time_filter: bool,
491 ) -> Result<()> {
492 let pager_config = PagerConfig::new(!has_time_filter);
494
495 match pager_config.spawn_piped() {
496 Ok(mut child) => {
497 if let Some(stdin) = child.stdin.as_mut() {
498 for (date, id, msg) in log_lines {
499 let line =
500 format!("{}\n", format_log_line(&date, &id, &msg, single_daemon));
501 if stdin.write_all(line.as_bytes()).is_err() {
502 break;
503 }
504 }
505 let _ = child.wait();
506 } else {
507 debug!("Failed to get pager stdin, falling back to direct output");
508 for (date, id, msg) in log_lines {
509 println!("{}", format_log_line(&date, &id, &msg, single_daemon));
510 }
511 }
512 }
513 Err(e) => {
514 debug!("Failed to spawn pager: {e}, falling back to direct output");
515 for (date, id, msg) in log_lines {
516 println!("{}", format_log_line(&date, &id, &msg, single_daemon));
517 }
518 }
519 }
520
521 Ok(())
522 }
523
524 fn stream_logs_to_pager(
525 &self,
526 log_files: &BTreeMap<DaemonId, LogFile>,
527 single_daemon: bool,
528 raw: bool,
529 ) -> Result<()> {
530 if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
531 return self.stream_logs_direct(log_files, single_daemon, raw);
532 }
533
534 let pager_config = PagerConfig::new(true); match pager_config.spawn_piped() {
537 Ok(mut child) => {
538 if let Some(stdin) = child.stdin.take() {
539 let log_files_clone: Vec<_> = log_files
541 .iter()
542 .map(|(daemon_id, lf)| (daemon_id.qualified(), lf.path.clone()))
543 .collect();
544 let single_daemon_clone = single_daemon;
545
546 std::thread::spawn(move || {
548 let mut writer = BufWriter::new(stdin);
549
550 if log_files_clone.len() == 1 {
552 let (name, path) = &log_files_clone[0];
553 let file = match File::open(path) {
554 Ok(f) => f,
555 Err(_) => return,
556 };
557 let parser = StreamingLogParser::new(file);
558 for (timestamp, message) in parser {
559 let output = format!(
560 "{}\n",
561 format_log_line(
562 ×tamp,
563 name,
564 &message,
565 single_daemon_clone
566 )
567 );
568 if writer.write_all(output.as_bytes()).is_err() {
569 return;
570 }
571 }
572 let _ = writer.flush();
573 return;
574 }
575
576 let mut merger: StreamingMerger<StreamingLogParser> =
578 StreamingMerger::new();
579
580 for (name, path) in log_files_clone {
581 let file = match File::open(&path) {
582 Ok(f) => f,
583 Err(_) => continue,
584 };
585 let parser = StreamingLogParser::new(file);
586 merger.add_source(name, parser);
587 }
588
589 merger.initialize();
591
592 for (timestamp, daemon, message) in merger {
594 let output = format!(
595 "{}\n",
596 format_log_line(×tamp, &daemon, &message, single_daemon_clone)
597 );
598 if writer.write_all(output.as_bytes()).is_err() {
599 return;
600 }
601 }
602
603 let _ = writer.flush();
604 });
605
606 let _ = child.wait();
607 } else {
608 debug!("Failed to get pager stdin, falling back to direct output");
609 return self.stream_logs_direct(log_files, single_daemon, raw);
610 }
611 }
612 Err(e) => {
613 debug!("Failed to spawn pager: {e}, falling back to direct output");
614 return self.stream_logs_direct(log_files, single_daemon, raw);
615 }
616 }
617
618 Ok(())
619 }
620
621 fn stream_logs_direct(
622 &self,
623 log_files: &BTreeMap<DaemonId, LogFile>,
624 single_daemon: bool,
625 raw: bool,
626 ) -> Result<()> {
627 if log_files.len() == 1 {
630 let (daemon_id, lf) = log_files.iter().next().unwrap();
631 let file = match File::open(&lf.path) {
632 Ok(f) => f,
633 Err(e) => {
634 error!("{}: {}", lf.path.display(), e);
635 return Ok(());
636 }
637 };
638 let reader = BufReader::new(file);
639 if raw {
640 for line in reader.lines() {
642 match line {
643 Ok(l) => {
644 if io::stdout().write_all(l.as_bytes()).is_err()
645 || io::stdout().write_all(b"\n").is_err()
646 {
647 return Ok(());
648 }
649 }
650 Err(_) => continue,
651 }
652 }
653 } else {
654 let parser = StreamingLogParser::new(File::open(&lf.path).into_diagnostic()?);
656 for (timestamp, message) in parser {
657 let output = format!(
658 "{}\n",
659 format_log_line(
660 ×tamp,
661 &daemon_id.qualified(),
662 &message,
663 single_daemon
664 )
665 );
666 if io::stdout().write_all(output.as_bytes()).is_err() {
667 return Ok(());
668 }
669 }
670 }
671 return Ok(());
672 }
673
674 let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();
676
677 for (daemon_id, lf) in log_files {
678 let file = match File::open(&lf.path) {
679 Ok(f) => f,
680 Err(e) => {
681 error!("{}: {}", lf.path.display(), e);
682 continue;
683 }
684 };
685 let parser = StreamingLogParser::new(file);
686 merger.add_source(daemon_id.qualified(), parser);
687 }
688
689 merger.initialize();
691
692 for (timestamp, daemon, message) in merger {
694 let output = if raw {
695 if single_daemon {
696 format!("{timestamp} {message}\n")
697 } else {
698 format!("{timestamp} {daemon} {message}\n")
699 }
700 } else {
701 format!(
702 "{}\n",
703 format_log_line(×tamp, &daemon, &message, single_daemon)
704 )
705 };
706 if io::stdout().write_all(output.as_bytes()).is_err() {
707 return Ok(());
708 }
709 }
710
711 Ok(())
712 }
713}
714
715fn should_use_pager(line_count: usize) -> bool {
716 if !io::stdout().is_terminal() {
717 return false;
718 }
719
720 let terminal_height = get_terminal_height().unwrap_or(24);
721 line_count > terminal_height
722}
723
724fn get_terminal_height() -> Option<usize> {
725 if let Ok(rows) = std::env::var("LINES")
726 && let Ok(h) = rows.parse::<usize>()
727 {
728 return Some(h);
729 }
730
731 crossterm::terminal::size().ok().map(|(_, h)| h as usize)
732}
733
734fn read_lines_in_time_range(
735 path: &Path,
736 from: Option<DateTime<Local>>,
737 to: Option<DateTime<Local>>,
738) -> Result<Vec<String>> {
739 let mut file = File::open(path).into_diagnostic()?;
740 let file_size = file.metadata().into_diagnostic()?.len();
741
742 if file_size == 0 {
743 return Ok(vec![]);
744 }
745
746 let start_pos = if let Some(from_time) = from {
747 binary_search_log_position(&mut file, file_size, from_time, true)?
748 } else {
749 0
750 };
751
752 let end_pos = if let Some(to_time) = to {
753 binary_search_log_position(&mut file, file_size, to_time, false)?
754 } else {
755 file_size
756 };
757
758 if start_pos >= end_pos {
759 return Ok(vec![]);
760 }
761
762 file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
763 let mut reader = BufReader::new(&file);
764 let mut lines = Vec::new();
765 let mut current_pos = start_pos;
766
767 loop {
768 if current_pos >= end_pos {
769 break;
770 }
771
772 let mut line = String::new();
773 match reader.read_line(&mut line) {
774 Ok(0) => break,
775 Ok(bytes_read) => {
776 current_pos += bytes_read as u64;
777 if line.ends_with('\n') {
778 line.pop();
779 if line.ends_with('\r') {
780 line.pop();
781 }
782 }
783 lines.push(line);
784 }
785 Err(_) => break,
786 }
787 }
788
789 Ok(lines)
790}
791
792fn binary_search_log_position(
793 file: &mut File,
794 file_size: u64,
795 target_time: DateTime<Local>,
796 find_start: bool,
797) -> Result<u64> {
798 let mut low: u64 = 0;
799 let mut high: u64 = file_size;
800
801 while low < high {
802 let mid = low + (high - low) / 2;
803
804 let line_start = find_line_start(file, mid)?;
805
806 file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
807 let mut reader = BufReader::new(&*file);
808 let mut line = String::new();
809 let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
810 if bytes_read == 0 {
811 high = mid;
812 continue;
813 }
814
815 let line_time = extract_timestamp(&line);
816
817 match line_time {
818 Some(lt) => {
819 if find_start {
820 if lt < target_time {
821 low = line_start + bytes_read as u64;
822 } else {
823 high = line_start;
824 }
825 } else if lt <= target_time {
826 low = line_start + bytes_read as u64;
827 } else {
828 high = line_start;
829 }
830 }
831 None => {
832 low = line_start + bytes_read as u64;
833 }
834 }
835 }
836
837 find_line_start(file, low)
838}
839
840fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
841 if pos == 0 {
842 return Ok(0);
843 }
844
845 let mut search_pos = pos.saturating_sub(1);
847 const CHUNK_SIZE: usize = 8192;
848
849 loop {
850 let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
852 let len_u64 = search_pos - chunk_start + 1;
853 let len = len_u64 as usize;
854
855 file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
857 let mut buf = vec![0u8; len];
858 if file.read_exact(&mut buf).is_err() {
859 return Ok(0);
861 }
862
863 for (i, &b) in buf.iter().enumerate().rev() {
865 if b == b'\n' {
866 return Ok(chunk_start + i as u64 + 1);
867 }
868 }
869
870 if chunk_start == 0 {
873 return Ok(0);
874 }
875
876 search_pos = chunk_start - 1;
878 }
879}
880
881fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
882 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
883 re.captures(line)
884 .and_then(|caps| caps.get(1))
885 .and_then(|m| parse_datetime(m.as_str()).ok())
886}
887
888fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
889 let lines = if reverse {
890 lines.into_iter().rev().collect()
891 } else {
892 lines
893 };
894
895 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
896 lines
897 .into_iter()
898 .fold(vec![], |mut acc, line| match re.captures(&line) {
899 Some(caps) => {
900 let (date, msg) = match (caps.get(1), caps.get(3)) {
901 (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
902 _ => return acc,
903 };
904 acc.push((date, id.to_string(), msg));
905 acc
906 }
907 None => {
908 if let Some(l) = acc.last_mut() {
909 l.2.push('\n');
910 l.2.push_str(&line);
911 }
912 acc
913 }
914 })
915}
916
917fn migrate_legacy_log_dirs() {
927 let known_safe_paths = known_daemon_safe_paths();
928 let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
929 Ok(d) => d,
930 Err(_) => return,
931 };
932 for dir in dirs {
933 if dir.starts_with(".") || !dir.is_dir() {
934 continue;
935 }
936 let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
937 Some(n) => n,
938 None => continue,
939 };
940 if name.contains("--") {
943 if DaemonId::from_safe_path(&name).is_ok() {
946 continue;
947 }
948 if known_safe_paths.contains(&name) {
951 continue;
952 }
953 warn!(
954 "Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
955 );
956 continue;
957 }
958
959 let old_log = dir.join(format!("{name}.log"));
962 if !old_log.exists() {
963 continue;
964 }
965 if DaemonId::try_new("legacy", &name).is_err() {
966 warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
967 continue;
968 }
969
970 let new_name = format!("legacy--{name}");
971 let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
972 if new_dir.exists() {
974 continue;
975 }
976 if std::fs::rename(&dir, &new_dir).is_err() {
977 continue;
978 }
979 let old_log = new_dir.join(format!("{name}.log"));
981 let new_log = new_dir.join(format!("{new_name}.log"));
982 if old_log.exists() {
983 let _ = std::fs::rename(&old_log, &new_log);
984 }
985 debug!("Migrated legacy log dir '{name}' → '{new_name}'");
986 }
987}
988
989fn known_daemon_safe_paths() -> BTreeSet<String> {
990 let mut out = BTreeSet::new();
991
992 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
993 Ok(state) => {
994 for id in state.daemons.keys() {
995 out.insert(id.safe_path());
996 }
997 }
998 Err(e) => {
999 warn!("Failed to read state while checking known daemon IDs: {e}");
1000 }
1001 }
1002
1003 match PitchforkToml::all_merged() {
1004 Ok(config) => {
1005 for id in config.daemons.keys() {
1006 out.insert(id.safe_path());
1007 }
1008 }
1009 Err(e) => {
1010 warn!("Failed to read config while checking known daemon IDs: {e}");
1011 }
1012 }
1013
1014 out
1015}
1016
1017fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
1018 let mut ids = BTreeSet::new();
1019
1020 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1021 Ok(state) => ids.extend(state.daemons.keys().cloned()),
1022 Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
1023 }
1024
1025 match PitchforkToml::all_merged() {
1026 Ok(config) => ids.extend(config.daemons.keys().cloned()),
1027 Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
1028 }
1029
1030 Ok(ids
1031 .into_iter()
1032 .filter(|id| id.log_path().exists())
1033 .collect())
1034}
1035
1036fn get_log_file_infos(names: &[DaemonId]) -> Result<BTreeMap<DaemonId, LogFile>> {
1037 let mut out = BTreeMap::new();
1038
1039 for daemon_id in names {
1040 let path = daemon_id.log_path();
1041
1042 if !path.exists() {
1044 continue;
1045 }
1046
1047 let mut file = xx::file::open(&path)?;
1048 file.seek(SeekFrom::End(0)).into_diagnostic()?;
1051 let cur = file.stream_position().into_diagnostic()?;
1052
1053 out.insert(
1054 daemon_id.clone(),
1055 LogFile {
1056 _name: daemon_id.clone(),
1057 file,
1058 cur,
1059 path,
1060 },
1061 );
1062 }
1063
1064 Ok(out)
1065}
1066
1067pub async fn tail_logs(names: &[DaemonId]) -> Result<()> {
1068 let mut log_files = get_log_file_infos(names)?;
1069 let mut wf = WatchFiles::new(Duration::from_millis(10))?;
1070
1071 for lf in log_files.values() {
1072 wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
1073 }
1074
1075 let files_to_name: HashMap<PathBuf, DaemonId> = log_files
1076 .iter()
1077 .map(|(n, f)| (f.path.clone(), n.clone()))
1078 .collect();
1079
1080 while let Some(paths) = wf.rx.recv().await {
1081 let mut out = vec![];
1082 for path in paths {
1083 let Some(name) = files_to_name.get(&path) else {
1084 warn!("Unknown log file changed: {}", path.display());
1085 continue;
1086 };
1087 let Some(info) = log_files.get_mut(name) else {
1088 warn!("No log info for: {name}");
1089 continue;
1090 };
1091 info.file
1092 .seek(SeekFrom::Start(info.cur))
1093 .into_diagnostic()?;
1094 let reader = BufReader::new(&info.file);
1095 let lines = reader.lines().map_while(Result::ok).collect_vec();
1096 info.cur = info.file.stream_position().into_diagnostic()?;
1097 out.extend(merge_log_lines(&name.qualified(), lines, false));
1098 }
1099 let out = out
1100 .into_iter()
1101 .sorted_by_cached_key(|l| l.0.to_string())
1102 .collect_vec();
1103 for (date, name, msg) in out {
1104 println!("{} {} {}", edim(&date), name, msg);
1105 }
1106 }
1107 Ok(())
1108}
1109
1110struct LogFile {
1111 _name: DaemonId,
1112 path: PathBuf,
1113 file: fs::File,
1114 cur: u64,
1115}
1116
1117fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
1118 let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
1119 Local
1120 .from_local_datetime(&naive_dt)
1121 .single()
1122 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
1123}
1124
1125fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
1131 let s = s.trim();
1132
1133 if let Ok(dt) = parse_datetime(s) {
1135 return Ok(dt);
1136 }
1137
1138 if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
1140 return Local
1141 .from_local_datetime(&naive_dt)
1142 .single()
1143 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
1144 }
1145
1146 if let Ok(time) = parse_time_only(s) {
1150 let now = Local::now();
1151 let today = now.date_naive();
1152 let mut naive_dt = NaiveDateTime::new(today, time);
1153 let mut dt = Local
1154 .from_local_datetime(&naive_dt)
1155 .single()
1156 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1157
1158 if is_since
1161 && dt > now
1162 && let Some(yesterday) = today.pred_opt()
1163 {
1164 naive_dt = NaiveDateTime::new(yesterday, time);
1165 dt = Local
1166 .from_local_datetime(&naive_dt)
1167 .single()
1168 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1169 }
1170 return Ok(dt);
1171 }
1172
1173 if let Ok(duration) = humantime::parse_duration(s) {
1174 let now = Local::now();
1175 let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
1176 return Ok(target);
1177 }
1178
1179 Err(miette::miette!(
1180 "Invalid time format: '{}'. Expected formats:\n\
1181 - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
1182 - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
1183 - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
1184 s
1185 ))
1186}
1187
1188fn parse_time_only(s: &str) -> Result<NaiveTime> {
1189 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1190 return Ok(time);
1191 }
1192
1193 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1194 return Ok(time);
1195 }
1196
1197 Err(miette::miette!("Invalid time format: '{}'", s))
1198}
1199
1200pub fn print_logs_for_time_range(
1201 daemon_id: &DaemonId,
1202 from: DateTime<Local>,
1203 to: Option<DateTime<Local>>,
1204) -> Result<()> {
1205 let daemon_ids = vec![daemon_id.clone()];
1206 let log_files = get_log_file_infos(&daemon_ids)?;
1207
1208 let from = from
1209 .with_nanosecond(0)
1210 .expect("0 is always valid for nanoseconds");
1211 let to = to.map(|t| {
1212 t.with_nanosecond(0)
1213 .expect("0 is always valid for nanoseconds")
1214 });
1215
1216 let log_lines = log_files
1217 .iter()
1218 .flat_map(
1219 |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), to) {
1220 Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1221 Err(e) => {
1222 error!("{}: {}", lf.path.display(), e);
1223 vec![]
1224 }
1225 },
1226 )
1227 .sorted_by_cached_key(|l| l.0.to_string())
1228 .collect_vec();
1229
1230 if log_lines.is_empty() {
1231 eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
1232 } else {
1233 eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
1234 for (date, _id, msg) in log_lines {
1235 eprintln!("{} {}", edim(&date), msg);
1236 }
1237 eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1238 }
1239
1240 Ok(())
1241}
1242
1243pub fn print_startup_logs(daemon_id: &DaemonId, from: DateTime<Local>) -> Result<()> {
1244 let daemon_ids = vec![daemon_id.clone()];
1245 let log_files = get_log_file_infos(&daemon_ids)?;
1246
1247 let from = from
1248 .with_nanosecond(0)
1249 .expect("0 is always valid for nanoseconds");
1250
1251 let log_lines = log_files
1252 .iter()
1253 .flat_map(
1254 |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), None) {
1255 Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1256 Err(e) => {
1257 error!("{}: {}", lf.path.display(), e);
1258 vec![]
1259 }
1260 },
1261 )
1262 .sorted_by_cached_key(|l| l.0.to_string())
1263 .collect_vec();
1264
1265 if !log_lines.is_empty() {
1266 eprintln!("\n{} {} {}", edim("==="), edim("Startup logs"), edim("==="));
1267 for (date, _id, msg) in log_lines {
1268 eprintln!("{} {}", edim(&date), msg);
1269 }
1270 eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1271 }
1272
1273 Ok(())
1274}