1use crate::daemon_id::DaemonId;
2use crate::pitchfork_toml::{PitchforkToml, WatchMode};
3use crate::state_file::StateFile;
4use crate::ui::style::edim;
5use crate::watch_files::WatchFiles;
6use crate::{Result, env};
7use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
8use console;
9use itertools::Itertools;
10use miette::IntoDiagnostic;
11use notify::RecursiveMode;
12use std::cmp::{Ordering, Reverse};
13use std::collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap};
14use std::fs::{self, File};
15use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
16use std::path::{Path, PathBuf};
17use std::process::{Child, Command, Stdio};
18use std::time::Duration;
19use xx::regex;
20
21struct PagerConfig {
23 command: String,
24 args: Vec<String>,
25}
26
27impl PagerConfig {
28 fn new(start_at_end: bool) -> Self {
31 let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
32 let args = Self::build_args(&command, start_at_end);
33 Self { command, args }
34 }
35
36 fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
37 let mut args = vec![];
38 if pager == "less" {
39 args.push("-R".to_string());
40 if start_at_end {
41 args.push("+G".to_string());
42 }
43 }
44 args
45 }
46
47 fn spawn_piped(&self) -> io::Result<Child> {
49 Command::new(&self.command)
50 .args(&self.args)
51 .stdin(Stdio::piped())
52 .spawn()
53 }
54}
55
56fn format_log_line(
60 date: &str,
61 id: &str,
62 msg: &str,
63 single_daemon: bool,
64 strip_ansi: bool,
65) -> String {
66 let msg = if strip_ansi {
67 console::strip_ansi_codes(msg).to_string()
68 } else {
69 msg.to_string()
70 };
71 if single_daemon {
72 format!("{} {}", edim(date), msg)
73 } else {
74 format!("{} {} {}", edim(date), id, msg)
75 }
76}
77
78#[derive(Debug)]
80struct LogEntry {
81 timestamp: String,
82 daemon: String,
83 message: String,
84 source_idx: usize, }
86
87impl PartialEq for LogEntry {
88 fn eq(&self, other: &Self) -> bool {
89 self.timestamp == other.timestamp
90 }
91}
92
93impl Eq for LogEntry {}
94
95impl PartialOrd for LogEntry {
96 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
97 Some(self.cmp(other))
98 }
99}
100
101impl Ord for LogEntry {
102 fn cmp(&self, other: &Self) -> Ordering {
103 self.timestamp.cmp(&other.timestamp)
104 }
105}
106
107struct StreamingMerger<I>
110where
111 I: Iterator<Item = (String, String)>,
112{
113 sources: Vec<(String, I)>, heap: BinaryHeap<Reverse<LogEntry>>, }
116
117impl<I> StreamingMerger<I>
118where
119 I: Iterator<Item = (String, String)>,
120{
121 fn new() -> Self {
122 Self {
123 sources: Vec::new(),
124 heap: BinaryHeap::new(),
125 }
126 }
127
128 fn add_source(&mut self, daemon_name: String, iter: I) {
129 self.sources.push((daemon_name, iter));
130 }
131
132 fn initialize(&mut self) {
133 for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
135 if let Some((timestamp, message)) = iter.next() {
136 self.heap.push(Reverse(LogEntry {
137 timestamp,
138 daemon: daemon.clone(),
139 message,
140 source_idx: idx,
141 }));
142 }
143 }
144 }
145}
146
147impl<I> Iterator for StreamingMerger<I>
148where
149 I: Iterator<Item = (String, String)>,
150{
151 type Item = (String, String, String); fn next(&mut self) -> Option<Self::Item> {
154 let Reverse(entry) = self.heap.pop()?;
156
157 let (daemon, iter) = &mut self.sources[entry.source_idx];
159 if let Some((timestamp, message)) = iter.next() {
160 self.heap.push(Reverse(LogEntry {
161 timestamp,
162 daemon: daemon.clone(),
163 message,
164 source_idx: entry.source_idx,
165 }));
166 }
167
168 Some((entry.timestamp, entry.daemon, entry.message))
169 }
170}
171
172struct StreamingLogParser {
174 reader: BufReader<File>,
175 current_entry: Option<(String, String)>,
176 finished: bool,
177}
178
179impl StreamingLogParser {
180 fn new(file: File) -> Self {
181 Self {
182 reader: BufReader::new(file),
183 current_entry: None,
184 finished: false,
185 }
186 }
187}
188
189impl Iterator for StreamingLogParser {
190 type Item = (String, String);
191
192 fn next(&mut self) -> Option<Self::Item> {
193 if self.finished {
194 return None;
195 }
196
197 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
198
199 loop {
200 let mut line = String::new();
201 match self.reader.read_line(&mut line) {
202 Ok(0) => {
203 self.finished = true;
205 return self.current_entry.take();
206 }
207 Ok(_) => {
208 if line.ends_with('\n') {
210 line.pop();
211 if line.ends_with('\r') {
212 line.pop();
213 }
214 }
215
216 if let Some(caps) = re.captures(&line) {
217 let date = match caps.get(1) {
218 Some(d) => d.as_str().to_string(),
219 None => continue,
220 };
221 let msg = match caps.get(3) {
222 Some(m) => m.as_str().to_string(),
223 None => continue,
224 };
225
226 let prev = self.current_entry.take();
228 self.current_entry = Some((date, msg));
229
230 if prev.is_some() {
231 return prev;
232 }
233 } else {
235 if let Some((_, ref mut msg)) = self.current_entry {
237 msg.push('\n');
238 msg.push_str(&line);
239 }
240 }
241 }
242 Err(_) => {
243 self.finished = true;
244 return self.current_entry.take();
245 }
246 }
247 }
248 }
249}
250
251#[derive(Debug, clap::Args)]
253#[clap(
254 visible_alias = "l",
255 verbatim_doc_comment,
256 long_about = "\
257Displays logs for daemon(s)
258
259Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
260and include timestamps for filtering.
261
262Examples:
263 pitchfork logs api Show all logs for 'api' (paged if needed)
264 pitchfork logs api worker Show logs for multiple daemons
265 pitchfork logs Show logs for all daemons
266 pitchfork logs api -n 50 Show last 50 lines
267 pitchfork logs api --follow Follow logs in real-time
268 pitchfork logs api --since '2024-01-15 10:00:00'
269 Show logs since a specific time (forward)
270 pitchfork logs api --since '10:30:00'
271 Show logs since 10:30:00 today
272 pitchfork logs api --since '10:30' --until '12:00'
273 Show logs since 10:30:00 until 12:00:00 today
274 pitchfork logs api --since 5min Show logs from last 5 minutes
275 pitchfork logs api --raw Output raw log lines without formatting
276 pitchfork logs api --raw -n 100 Output last 100 raw log lines
277 pitchfork logs api --clear Delete logs for 'api'
278 pitchfork logs --clear Delete logs for all daemons"
279)]
280pub struct Logs {
281 id: Vec<String>,
283
284 #[clap(short, long)]
286 clear: bool,
287
288 #[clap(short)]
293 n: Option<usize>,
294
295 #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
297 tail: bool,
298
299 #[clap(short = 's', long)]
306 since: Option<String>,
307
308 #[clap(short = 'u', long)]
314 until: Option<String>,
315
316 #[clap(long)]
318 no_pager: bool,
319
320 #[clap(long)]
322 raw: bool,
323}
324
325impl Logs {
326 pub async fn run(&self) -> Result<()> {
327 migrate_legacy_log_dirs();
330
331 let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
333 get_all_daemon_ids()?
335 } else {
336 PitchforkToml::resolve_ids(&self.id)?
337 };
338
339 if self.clear {
340 for id in &resolved_ids {
341 let path = id.log_path();
342 if path.exists() {
343 xx::file::create(&path)?;
344 }
345 }
346 return Ok(());
347 }
348
349 let from = if let Some(since) = self.since.as_ref() {
350 Some(parse_time_input(since, true)?)
351 } else {
352 None
353 };
354 let to = if let Some(until) = self.until.as_ref() {
355 Some(parse_time_input(until, false)?)
356 } else {
357 None
358 };
359
360 self.print_existing_logs(&resolved_ids, from, to)?;
361 if self.tail {
362 tail_logs(&resolved_ids).await?;
363 }
364
365 Ok(())
366 }
367
368 fn print_existing_logs(
369 &self,
370 resolved_ids: &[DaemonId],
371 from: Option<DateTime<Local>>,
372 to: Option<DateTime<Local>>,
373 ) -> Result<()> {
374 let log_files = get_log_file_infos(resolved_ids)?;
375 trace!("log files for: {}", log_files.keys().join(", "));
376 let single_daemon = resolved_ids.len() == 1;
377 let has_time_filter = from.is_some() || to.is_some();
378
379 if has_time_filter {
380 let mut log_lines = self.collect_log_lines_forward(&log_files, from, to)?;
381
382 if let Some(n) = self.n {
383 let len = log_lines.len();
384 if len > n {
385 log_lines = log_lines.into_iter().skip(len - n).collect_vec();
386 }
387 }
388
389 self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
390 } else if let Some(n) = self.n {
391 let log_lines = self.collect_log_lines_reverse(&log_files, Some(n))?;
392 self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
393 } else {
394 self.stream_logs_to_pager(&log_files, single_daemon, self.raw)?;
395 }
396
397 Ok(())
398 }
399
400 fn collect_log_lines_forward(
401 &self,
402 log_files: &BTreeMap<DaemonId, LogFile>,
403 from: Option<DateTime<Local>>,
404 to: Option<DateTime<Local>>,
405 ) -> Result<Vec<(String, String, String)>> {
406 let log_lines: Vec<(String, String, String)> = log_files
407 .iter()
408 .flat_map(
409 |(name, lf)| match read_lines_in_time_range(&lf.path, from, to) {
410 Ok(lines) => merge_log_lines(&name.qualified(), lines, false),
411 Err(e) => {
412 error!("{}: {}", lf.path.display(), e);
413 vec![]
414 }
415 },
416 )
417 .sorted_by_cached_key(|l| l.0.to_string())
418 .collect_vec();
419
420 Ok(log_lines)
421 }
422
423 fn collect_log_lines_reverse(
424 &self,
425 log_files: &BTreeMap<DaemonId, LogFile>,
426 limit: Option<usize>,
427 ) -> Result<Vec<(String, String, String)>> {
428 let log_lines: Vec<(String, String, String)> = log_files
429 .iter()
430 .flat_map(|(daemon_id, lf)| {
431 let rev = match xx::file::open(&lf.path) {
432 Ok(f) => rev_lines::RevLines::new(f),
433 Err(e) => {
434 error!("{}: {}", lf.path.display(), e);
435 return vec![];
436 }
437 };
438 let lines = rev.into_iter().filter_map(Result::ok);
439 let lines = match limit {
440 Some(n) => lines.take(n).collect_vec(),
441 None => lines.collect_vec(),
442 };
443 merge_log_lines(&daemon_id.qualified(), lines, true)
444 })
445 .sorted_by_cached_key(|l| l.0.to_string())
446 .collect_vec();
447
448 let log_lines = match limit {
449 Some(n) => {
450 let len = log_lines.len();
451 if len > n {
452 log_lines.into_iter().skip(len - n).collect_vec()
453 } else {
454 log_lines
455 }
456 }
457 None => log_lines,
458 };
459
460 Ok(log_lines)
461 }
462
463 fn output_logs(
464 &self,
465 log_lines: Vec<(String, String, String)>,
466 single_daemon: bool,
467 has_time_filter: bool,
468 raw: bool,
469 ) -> Result<()> {
470 if log_lines.is_empty() {
471 return Ok(());
472 }
473
474 let strip_ansi = raw || !console::colors_enabled();
475
476 if raw {
478 for (date, id, msg) in log_lines {
479 let msg = if strip_ansi {
480 console::strip_ansi_codes(&msg).to_string()
481 } else {
482 msg
483 };
484 if single_daemon {
485 println!("{date} {msg}");
486 } else {
487 println!("{date} {id} {msg}");
488 }
489 }
490 return Ok(());
491 }
492
493 let use_pager = !self.no_pager && should_use_pager(log_lines.len());
494
495 if use_pager {
496 self.output_with_pager(log_lines, single_daemon, has_time_filter, strip_ansi)?;
497 } else {
498 for (date, id, msg) in log_lines {
499 println!(
500 "{}",
501 format_log_line(&date, &id, &msg, single_daemon, strip_ansi)
502 );
503 }
504 }
505
506 Ok(())
507 }
508
509 fn output_with_pager(
510 &self,
511 log_lines: Vec<(String, String, String)>,
512 single_daemon: bool,
513 has_time_filter: bool,
514 strip_ansi: bool,
515 ) -> Result<()> {
516 let pager_config = PagerConfig::new(!has_time_filter);
518
519 match pager_config.spawn_piped() {
520 Ok(mut child) => {
521 if let Some(stdin) = child.stdin.as_mut() {
522 for (date, id, msg) in log_lines {
523 let line = format!(
524 "{}\n",
525 format_log_line(&date, &id, &msg, single_daemon, strip_ansi)
526 );
527 if stdin.write_all(line.as_bytes()).is_err() {
528 break;
529 }
530 }
531 let _ = child.wait();
532 } else {
533 debug!("Failed to get pager stdin, falling back to direct output");
534 for (date, id, msg) in log_lines {
535 println!(
536 "{}",
537 format_log_line(&date, &id, &msg, single_daemon, strip_ansi)
538 );
539 }
540 }
541 }
542 Err(e) => {
543 debug!("Failed to spawn pager: {e}, falling back to direct output");
544 for (date, id, msg) in log_lines {
545 println!(
546 "{}",
547 format_log_line(&date, &id, &msg, single_daemon, strip_ansi)
548 );
549 }
550 }
551 }
552
553 Ok(())
554 }
555
556 fn stream_logs_to_pager(
557 &self,
558 log_files: &BTreeMap<DaemonId, LogFile>,
559 single_daemon: bool,
560 raw: bool,
561 ) -> Result<()> {
562 let strip_ansi = raw || !console::colors_enabled();
563
564 if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
565 return self.stream_logs_direct(log_files, single_daemon, raw, strip_ansi);
566 }
567
568 let pager_config = PagerConfig::new(true); match pager_config.spawn_piped() {
571 Ok(mut child) => {
572 if let Some(stdin) = child.stdin.take() {
573 let log_files_clone: Vec<_> = log_files
575 .iter()
576 .map(|(daemon_id, lf)| (daemon_id.qualified(), lf.path.clone()))
577 .collect();
578 let single_daemon_clone = single_daemon;
579 let strip_ansi_clone = strip_ansi;
580
581 std::thread::spawn(move || {
583 let mut writer = BufWriter::new(stdin);
584
585 if log_files_clone.len() == 1 {
587 let (name, path) = &log_files_clone[0];
588 let file = match File::open(path) {
589 Ok(f) => f,
590 Err(_) => return,
591 };
592 let parser = StreamingLogParser::new(file);
593 for (timestamp, message) in parser {
594 let output = format!(
595 "{}\n",
596 format_log_line(
597 ×tamp,
598 name,
599 &message,
600 single_daemon_clone,
601 strip_ansi_clone
602 )
603 );
604 if writer.write_all(output.as_bytes()).is_err() {
605 return;
606 }
607 }
608 let _ = writer.flush();
609 return;
610 }
611
612 let mut merger: StreamingMerger<StreamingLogParser> =
614 StreamingMerger::new();
615
616 for (name, path) in log_files_clone {
617 let file = match File::open(&path) {
618 Ok(f) => f,
619 Err(_) => continue,
620 };
621 let parser = StreamingLogParser::new(file);
622 merger.add_source(name, parser);
623 }
624
625 merger.initialize();
627
628 for (timestamp, daemon, message) in merger {
630 let output = format!(
631 "{}\n",
632 format_log_line(
633 ×tamp,
634 &daemon,
635 &message,
636 single_daemon_clone,
637 strip_ansi_clone
638 )
639 );
640 if writer.write_all(output.as_bytes()).is_err() {
641 return;
642 }
643 }
644
645 let _ = writer.flush();
646 });
647
648 let _ = child.wait();
649 } else {
650 debug!("Failed to get pager stdin, falling back to direct output");
651 return self.stream_logs_direct(log_files, single_daemon, raw, strip_ansi);
652 }
653 }
654 Err(e) => {
655 debug!("Failed to spawn pager: {e}, falling back to direct output");
656 return self.stream_logs_direct(log_files, single_daemon, raw, strip_ansi);
657 }
658 }
659
660 Ok(())
661 }
662
663 fn stream_logs_direct(
664 &self,
665 log_files: &BTreeMap<DaemonId, LogFile>,
666 single_daemon: bool,
667 raw: bool,
668 strip_ansi: bool,
669 ) -> Result<()> {
670 if log_files.len() == 1 {
673 let (daemon_id, lf) = log_files.iter().next().unwrap();
674 let file = match File::open(&lf.path) {
675 Ok(f) => f,
676 Err(e) => {
677 error!("{}: {}", lf.path.display(), e);
678 return Ok(());
679 }
680 };
681 let reader = BufReader::new(file);
682 if raw {
683 for line in reader.lines() {
685 match line {
686 Ok(l) => {
687 let l = if strip_ansi {
688 console::strip_ansi_codes(&l).to_string()
689 } else {
690 l
691 };
692 if io::stdout().write_all(l.as_bytes()).is_err()
693 || io::stdout().write_all(b"\n").is_err()
694 {
695 return Ok(());
696 }
697 }
698 Err(_) => continue,
699 }
700 }
701 } else {
702 let parser = StreamingLogParser::new(File::open(&lf.path).into_diagnostic()?);
704 for (timestamp, message) in parser {
705 let output = format!(
706 "{}\n",
707 format_log_line(
708 ×tamp,
709 &daemon_id.qualified(),
710 &message,
711 single_daemon,
712 strip_ansi
713 )
714 );
715 if io::stdout().write_all(output.as_bytes()).is_err() {
716 return Ok(());
717 }
718 }
719 }
720 return Ok(());
721 }
722
723 let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();
725
726 for (daemon_id, lf) in log_files {
727 let file = match File::open(&lf.path) {
728 Ok(f) => f,
729 Err(e) => {
730 error!("{}: {}", lf.path.display(), e);
731 continue;
732 }
733 };
734 let parser = StreamingLogParser::new(file);
735 merger.add_source(daemon_id.qualified(), parser);
736 }
737
738 merger.initialize();
740
741 for (timestamp, daemon, message) in merger {
743 let output = if raw {
744 let message = if strip_ansi {
745 console::strip_ansi_codes(&message).to_string()
746 } else {
747 message
748 };
749 if single_daemon {
750 format!("{timestamp} {message}\n")
751 } else {
752 format!("{timestamp} {daemon} {message}\n")
753 }
754 } else {
755 format!(
756 "{}\n",
757 format_log_line(×tamp, &daemon, &message, single_daemon, strip_ansi)
758 )
759 };
760 if io::stdout().write_all(output.as_bytes()).is_err() {
761 return Ok(());
762 }
763 }
764
765 Ok(())
766 }
767}
768
769fn should_use_pager(line_count: usize) -> bool {
770 if !io::stdout().is_terminal() {
771 return false;
772 }
773
774 let terminal_height = get_terminal_height().unwrap_or(24);
775 line_count > terminal_height
776}
777
778fn get_terminal_height() -> Option<usize> {
779 if let Ok(rows) = std::env::var("LINES")
780 && let Ok(h) = rows.parse::<usize>()
781 {
782 return Some(h);
783 }
784
785 crossterm::terminal::size().ok().map(|(_, h)| h as usize)
786}
787
788fn read_lines_in_time_range(
789 path: &Path,
790 from: Option<DateTime<Local>>,
791 to: Option<DateTime<Local>>,
792) -> Result<Vec<String>> {
793 let mut file = File::open(path).into_diagnostic()?;
794 let file_size = file.metadata().into_diagnostic()?.len();
795
796 if file_size == 0 {
797 return Ok(vec![]);
798 }
799
800 let start_pos = if let Some(from_time) = from {
801 binary_search_log_position(&mut file, file_size, from_time, true)?
802 } else {
803 0
804 };
805
806 let end_pos = if let Some(to_time) = to {
807 binary_search_log_position(&mut file, file_size, to_time, false)?
808 } else {
809 file_size
810 };
811
812 if start_pos >= end_pos {
813 return Ok(vec![]);
814 }
815
816 file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
817 let mut reader = BufReader::new(&file);
818 let mut lines = Vec::new();
819 let mut current_pos = start_pos;
820
821 loop {
822 if current_pos >= end_pos {
823 break;
824 }
825
826 let mut line = String::new();
827 match reader.read_line(&mut line) {
828 Ok(0) => break,
829 Ok(bytes_read) => {
830 current_pos += bytes_read as u64;
831 if line.ends_with('\n') {
832 line.pop();
833 if line.ends_with('\r') {
834 line.pop();
835 }
836 }
837 lines.push(line);
838 }
839 Err(_) => break,
840 }
841 }
842
843 Ok(lines)
844}
845
846fn binary_search_log_position(
847 file: &mut File,
848 file_size: u64,
849 target_time: DateTime<Local>,
850 find_start: bool,
851) -> Result<u64> {
852 let mut low: u64 = 0;
853 let mut high: u64 = file_size;
854
855 while low < high {
856 let mid = low + (high - low) / 2;
857
858 let line_start = find_line_start(file, mid)?;
859
860 file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
861 let mut reader = BufReader::new(&*file);
862 let mut line = String::new();
863 let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
864 if bytes_read == 0 {
865 high = mid;
866 continue;
867 }
868
869 let line_time = extract_timestamp(&line);
870
871 match line_time {
872 Some(lt) => {
873 if find_start {
874 if lt < target_time {
875 low = line_start + bytes_read as u64;
876 } else {
877 high = line_start;
878 }
879 } else if lt <= target_time {
880 low = line_start + bytes_read as u64;
881 } else {
882 high = line_start;
883 }
884 }
885 None => {
886 low = line_start + bytes_read as u64;
887 }
888 }
889 }
890
891 find_line_start(file, low)
892}
893
894fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
895 if pos == 0 {
896 return Ok(0);
897 }
898
899 let mut search_pos = pos.saturating_sub(1);
901 const CHUNK_SIZE: usize = 8192;
902
903 loop {
904 let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
906 let len_u64 = search_pos - chunk_start + 1;
907 let len = len_u64 as usize;
908
909 file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
911 let mut buf = vec![0u8; len];
912 if file.read_exact(&mut buf).is_err() {
913 return Ok(0);
915 }
916
917 for (i, &b) in buf.iter().enumerate().rev() {
919 if b == b'\n' {
920 return Ok(chunk_start + i as u64 + 1);
921 }
922 }
923
924 if chunk_start == 0 {
927 return Ok(0);
928 }
929
930 search_pos = chunk_start - 1;
932 }
933}
934
935fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
936 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
937 re.captures(line)
938 .and_then(|caps| caps.get(1))
939 .and_then(|m| parse_datetime(m.as_str()).ok())
940}
941
942fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
943 let lines = if reverse {
944 lines.into_iter().rev().collect()
945 } else {
946 lines
947 };
948
949 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
950 lines
951 .into_iter()
952 .fold(vec![], |mut acc, line| match re.captures(&line) {
953 Some(caps) => {
954 let (date, msg) = match (caps.get(1), caps.get(3)) {
955 (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
956 _ => return acc,
957 };
958 acc.push((date, id.to_string(), msg));
959 acc
960 }
961 None => {
962 if let Some(l) = acc.last_mut() {
963 l.2.push('\n');
964 l.2.push_str(&line);
965 }
966 acc
967 }
968 })
969}
970
971fn migrate_legacy_log_dirs() {
981 let known_safe_paths = known_daemon_safe_paths();
982 let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
983 Ok(d) => d,
984 Err(_) => return,
985 };
986 for dir in dirs {
987 if dir.starts_with(".") || !dir.is_dir() {
988 continue;
989 }
990 let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
991 Some(n) => n,
992 None => continue,
993 };
994 if name.contains("--") {
997 if DaemonId::from_safe_path(&name).is_ok() {
1000 continue;
1001 }
1002 if known_safe_paths.contains(&name) {
1005 continue;
1006 }
1007 warn!(
1008 "Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
1009 );
1010 continue;
1011 }
1012
1013 let old_log = dir.join(format!("{name}.log"));
1016 if !old_log.exists() {
1017 continue;
1018 }
1019 if DaemonId::try_new("legacy", &name).is_err() {
1020 warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
1021 continue;
1022 }
1023
1024 let new_name = format!("legacy--{name}");
1025 let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
1026 if new_dir.exists() {
1028 continue;
1029 }
1030 if std::fs::rename(&dir, &new_dir).is_err() {
1031 continue;
1032 }
1033 let old_log = new_dir.join(format!("{name}.log"));
1035 let new_log = new_dir.join(format!("{new_name}.log"));
1036 if old_log.exists() {
1037 let _ = std::fs::rename(&old_log, &new_log);
1038 }
1039 debug!("Migrated legacy log dir '{name}' → '{new_name}'");
1040 }
1041}
1042
1043fn known_daemon_safe_paths() -> BTreeSet<String> {
1044 let mut out = BTreeSet::new();
1045
1046 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1047 Ok(state) => {
1048 for id in state.daemons.keys() {
1049 out.insert(id.safe_path());
1050 }
1051 }
1052 Err(e) => {
1053 warn!("Failed to read state while checking known daemon IDs: {e}");
1054 }
1055 }
1056
1057 match PitchforkToml::all_merged() {
1058 Ok(config) => {
1059 for id in config.daemons.keys() {
1060 out.insert(id.safe_path());
1061 }
1062 }
1063 Err(e) => {
1064 warn!("Failed to read config while checking known daemon IDs: {e}");
1065 }
1066 }
1067
1068 out
1069}
1070
1071fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
1072 let mut ids = BTreeSet::new();
1073
1074 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1075 Ok(state) => ids.extend(state.daemons.keys().cloned()),
1076 Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
1077 }
1078
1079 match PitchforkToml::all_merged() {
1080 Ok(config) => ids.extend(config.daemons.keys().cloned()),
1081 Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
1082 }
1083
1084 Ok(ids
1085 .into_iter()
1086 .filter(|id| id.log_path().exists())
1087 .collect())
1088}
1089
1090fn get_log_file_infos(names: &[DaemonId]) -> Result<BTreeMap<DaemonId, LogFile>> {
1091 let mut out = BTreeMap::new();
1092
1093 for daemon_id in names {
1094 let path = daemon_id.log_path();
1095
1096 if !path.exists() {
1098 continue;
1099 }
1100
1101 let mut file = xx::file::open(&path)?;
1102 file.seek(SeekFrom::End(0)).into_diagnostic()?;
1105 let cur = file.stream_position().into_diagnostic()?;
1106
1107 out.insert(
1108 daemon_id.clone(),
1109 LogFile {
1110 _name: daemon_id.clone(),
1111 file,
1112 cur,
1113 path,
1114 },
1115 );
1116 }
1117
1118 Ok(out)
1119}
1120
1121pub async fn tail_logs(names: &[DaemonId]) -> Result<()> {
1122 let mut log_files = get_log_file_infos(names)?;
1123 let mut wf = WatchFiles::new(
1124 Duration::from_millis(10),
1125 WatchMode::Native,
1126 Duration::from_millis(500),
1127 )?;
1128
1129 for lf in log_files.values() {
1130 wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
1131 }
1132
1133 let files_to_name: HashMap<PathBuf, DaemonId> = log_files
1134 .iter()
1135 .map(|(n, f)| (f.path.clone(), n.clone()))
1136 .collect();
1137
1138 while let Some(paths) = wf.rx.recv().await {
1139 let mut out = vec![];
1140 for path in paths {
1141 let Some(name) = files_to_name.get(&path) else {
1142 warn!("Unknown log file changed: {}", path.display());
1143 continue;
1144 };
1145 let Some(info) = log_files.get_mut(name) else {
1146 warn!("No log info for: {name}");
1147 continue;
1148 };
1149 info.file
1150 .seek(SeekFrom::Start(info.cur))
1151 .into_diagnostic()?;
1152 let reader = BufReader::new(&info.file);
1153 let lines = reader.lines().map_while(Result::ok).collect_vec();
1154 info.cur = info.file.stream_position().into_diagnostic()?;
1155 out.extend(merge_log_lines(&name.qualified(), lines, false));
1156 }
1157 let out = out
1158 .into_iter()
1159 .sorted_by_cached_key(|l| l.0.to_string())
1160 .collect_vec();
1161 for (date, name, msg) in out {
1162 println!("{} {} {}", edim(&date), name, msg);
1163 }
1164 }
1165 Ok(())
1166}
1167
1168struct LogFile {
1169 _name: DaemonId,
1170 path: PathBuf,
1171 file: fs::File,
1172 cur: u64,
1173}
1174
1175fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
1176 let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
1177 Local
1178 .from_local_datetime(&naive_dt)
1179 .single()
1180 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
1181}
1182
1183fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
1189 let s = s.trim();
1190
1191 if let Ok(dt) = parse_datetime(s) {
1193 return Ok(dt);
1194 }
1195
1196 if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
1198 return Local
1199 .from_local_datetime(&naive_dt)
1200 .single()
1201 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
1202 }
1203
1204 if let Ok(time) = parse_time_only(s) {
1208 let now = Local::now();
1209 let today = now.date_naive();
1210 let mut naive_dt = NaiveDateTime::new(today, time);
1211 let mut dt = Local
1212 .from_local_datetime(&naive_dt)
1213 .single()
1214 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1215
1216 if is_since
1219 && dt > now
1220 && let Some(yesterday) = today.pred_opt()
1221 {
1222 naive_dt = NaiveDateTime::new(yesterday, time);
1223 dt = Local
1224 .from_local_datetime(&naive_dt)
1225 .single()
1226 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1227 }
1228 return Ok(dt);
1229 }
1230
1231 if let Ok(duration) = humantime::parse_duration(s) {
1232 let now = Local::now();
1233 let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
1234 return Ok(target);
1235 }
1236
1237 Err(miette::miette!(
1238 "Invalid time format: '{}'. Expected formats:\n\
1239 - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
1240 - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
1241 - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
1242 s
1243 ))
1244}
1245
1246fn parse_time_only(s: &str) -> Result<NaiveTime> {
1247 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1248 return Ok(time);
1249 }
1250
1251 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1252 return Ok(time);
1253 }
1254
1255 Err(miette::miette!("Invalid time format: '{}'", s))
1256}
1257
1258pub fn print_logs_for_time_range(
1259 daemon_id: &DaemonId,
1260 from: DateTime<Local>,
1261 to: Option<DateTime<Local>>,
1262) -> Result<()> {
1263 let daemon_ids = vec![daemon_id.clone()];
1264 let log_files = get_log_file_infos(&daemon_ids)?;
1265
1266 let from = from
1267 .with_nanosecond(0)
1268 .expect("0 is always valid for nanoseconds");
1269 let to = to.map(|t| {
1270 t.with_nanosecond(0)
1271 .expect("0 is always valid for nanoseconds")
1272 });
1273
1274 let log_lines = log_files
1275 .iter()
1276 .flat_map(
1277 |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), to) {
1278 Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1279 Err(e) => {
1280 error!("{}: {}", lf.path.display(), e);
1281 vec![]
1282 }
1283 },
1284 )
1285 .sorted_by_cached_key(|l| l.0.to_string())
1286 .collect_vec();
1287
1288 if log_lines.is_empty() {
1289 eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
1290 } else {
1291 eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
1292 for (date, _id, msg) in log_lines {
1293 eprintln!("{} {}", edim(&date), msg);
1294 }
1295 eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1296 }
1297
1298 Ok(())
1299}
1300
1301pub fn print_startup_logs(daemon_id: &DaemonId, from: DateTime<Local>) -> Result<()> {
1302 let daemon_ids = vec![daemon_id.clone()];
1303 let log_files = get_log_file_infos(&daemon_ids)?;
1304
1305 let from = from
1306 .with_nanosecond(0)
1307 .expect("0 is always valid for nanoseconds");
1308
1309 let log_lines = log_files
1310 .iter()
1311 .flat_map(
1312 |(daemon_id, lf)| match read_lines_in_time_range(&lf.path, Some(from), None) {
1313 Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1314 Err(e) => {
1315 error!("{}: {}", lf.path.display(), e);
1316 vec![]
1317 }
1318 },
1319 )
1320 .sorted_by_cached_key(|l| l.0.to_string())
1321 .collect_vec();
1322
1323 if !log_lines.is_empty() {
1324 eprintln!("\n{} {} {}", edim("==="), edim("Startup logs"), edim("==="));
1325 for (date, _id, msg) in log_lines {
1326 eprintln!("{} {}", edim(&date), msg);
1327 }
1328 eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1329 }
1330
1331 Ok(())
1332}