1use crate::daemon_id::DaemonId;
2use crate::pitchfork_toml::PitchforkToml;
3use crate::state_file::StateFile;
4use crate::ui::style::{edim, estyle, ndim};
5use crate::{Result, env};
6use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
7use console;
8use itertools::Itertools;
9use miette::IntoDiagnostic;
10use std::cmp::{Ordering, Reverse};
11use std::collections::{BTreeSet, BinaryHeap};
12use std::fs::{self, File};
13use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
14use std::path::{Path, PathBuf};
15use std::process::{Child, Command, Stdio};
16use std::time::Duration;
17use xx::regex;
18
/// Resolved pager invocation: the program to run plus its arguments.
struct PagerConfig {
    /// Program name or path handed to [`Command::new`].
    command: String,
    /// Arguments passed to the pager, in order.
    args: Vec<String>,
}

impl PagerConfig {
    /// Builds the configuration from the `PAGER` environment variable,
    /// falling back to `less` when the variable is unset (or not Unicode).
    ///
    /// `start_at_end` asks a `less` pager to open at the end of the input.
    fn new(start_at_end: bool) -> Self {
        let raw = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
        Self::from_pager_value(&raw, start_at_end)
    }

    /// Splits a `PAGER`-style value into program and arguments.
    ///
    /// The value is split on whitespace so that e.g. `less -S` runs `less`
    /// with `-S` instead of looking for a program literally named `less -S`.
    /// An empty value yields an empty command, which fails to spawn and makes
    /// callers fall back to direct output.
    fn from_pager_value(raw: &str, start_at_end: bool) -> Self {
        let mut words = raw.split_whitespace().map(str::to_string);
        let command = words.next().unwrap_or_default();
        // User-supplied arguments come first; ours are appended after them.
        let mut args: Vec<String> = words.collect();
        args.extend(Self::build_args(&command, start_at_end));
        Self { command, args }
    }

    /// Returns the extra arguments pitchfork adds for the given pager.
    ///
    /// Only `less` (by bare name or by path, e.g. `/usr/bin/less`) receives
    /// flags: `-R` always, and `+G` when `start_at_end` is set.
    fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
        let is_less = Path::new(pager)
            .file_stem()
            .and_then(|stem| stem.to_str())
            == Some("less");

        let mut args = vec![];
        if is_less {
            args.push("-R".to_string());
            if start_at_end {
                args.push("+G".to_string());
            }
        }
        args
    }

    /// Spawns the pager with a piped stdin so log lines can be streamed to it.
    fn spawn_piped(&self) -> io::Result<Child> {
        Command::new(&self.command)
            .args(&self.args)
            .stdin(Stdio::piped())
            .spawn()
    }
}
53
54fn format_log_line(
60 date: &str,
61 id: &str,
62 msg: &str,
63 single_daemon: bool,
64 id_width: usize,
65 strip_ansi: bool,
66) -> String {
67 let msg = if strip_ansi {
68 console::strip_ansi_codes(msg).to_string()
69 } else {
70 msg.to_string()
71 };
72 if single_daemon {
73 format!("{} {}", ndim(date), msg)
74 } else {
75 let colors_on = !strip_ansi && console::colors_enabled();
76 let colored = dimmed_id(id, colors_on);
77 let padded = console::pad_str(&colored, id_width, console::Alignment::Left, None);
78 format!("{} {} {}", padded, ndim(date), msg)
79 }
80}
81
/// Wraps `id` in a dim 24-bit color escape chosen deterministically from its bytes.
///
/// Returns `id` unchanged when `colors_enabled` is false. The same id always
/// maps to the same palette entry, so a daemon keeps its color across lines.
fn dimmed_id(id: &str, colors_enabled: bool) -> String {
    const PALETTE: [(u8, u8, u8); 6] = [
        (180, 120, 120),
        (180, 160, 100),
        (120, 180, 120),
        (120, 180, 180),
        (180, 120, 180),
        (120, 160, 180),
    ];

    if !colors_enabled {
        return id.to_string();
    }

    // Multiply-then-add hash over the id bytes, seeded with 0x811C_9DC5.
    let hash = id.bytes().fold(0x811C_9DC5usize, |acc, byte| {
        acc.wrapping_mul(0x0100_0193).wrapping_add(byte as usize)
    });

    let (red, green, blue) = PALETTE[hash % PALETTE.len()];
    format!("\x1b[2;38;2;{};{};{}m{}\x1b[0m", red, green, blue, id)
}
104
/// One parsed log entry waiting in the merge heap.
#[derive(Debug)]
struct LogEntry {
    /// Timestamp text as it appears in the log; compared lexicographically.
    timestamp: String,
    /// Name of the daemon the entry belongs to.
    daemon: String,
    /// Message text (may span several physical lines).
    message: String,
    /// Index of the source iterator that produced this entry.
    source_idx: usize,
}

impl PartialEq for LogEntry {
    fn eq(&self, other: &Self) -> bool {
        // Kept in lock-step with `Ord` so equality and ordering never disagree.
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for LogEntry {}

impl PartialOrd for LogEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for LogEntry {
    /// Orders by timestamp, breaking ties by source index.
    ///
    /// The tie-break makes the merged order of same-second entries from
    /// different sources deterministic (sources in the order they were
    /// added) instead of depending on heap internals.
    fn cmp(&self, other: &Self) -> Ordering {
        self.timestamp
            .cmp(&other.timestamp)
            .then_with(|| self.source_idx.cmp(&other.source_idx))
    }
}
133
134struct StreamingMerger<I>
137where
138 I: Iterator<Item = (String, String)>,
139{
140 sources: Vec<(String, I)>, heap: BinaryHeap<Reverse<LogEntry>>, }
143
144impl<I> StreamingMerger<I>
145where
146 I: Iterator<Item = (String, String)>,
147{
148 fn new() -> Self {
149 Self {
150 sources: Vec::new(),
151 heap: BinaryHeap::new(),
152 }
153 }
154
155 fn add_source(&mut self, daemon_name: String, iter: I) {
156 self.sources.push((daemon_name, iter));
157 }
158
159 fn initialize(&mut self) {
160 for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
162 if let Some((timestamp, message)) = iter.next() {
163 self.heap.push(Reverse(LogEntry {
164 timestamp,
165 daemon: daemon.clone(),
166 message,
167 source_idx: idx,
168 }));
169 }
170 }
171 }
172}
173
174impl<I> Iterator for StreamingMerger<I>
175where
176 I: Iterator<Item = (String, String)>,
177{
178 type Item = (String, String, String); fn next(&mut self) -> Option<Self::Item> {
181 let Reverse(entry) = self.heap.pop()?;
183
184 let (daemon, iter) = &mut self.sources[entry.source_idx];
186 if let Some((timestamp, message)) = iter.next() {
187 self.heap.push(Reverse(LogEntry {
188 timestamp,
189 daemon: daemon.clone(),
190 message,
191 source_idx: entry.source_idx,
192 }));
193 }
194
195 Some((entry.timestamp, entry.daemon, entry.message))
196 }
197}
198
199struct StreamingLogParser {
201 reader: BufReader<File>,
202 current_entry: Option<(String, String)>,
203 finished: bool,
204}
205
206impl StreamingLogParser {
207 fn new(file: File) -> Self {
208 Self {
209 reader: BufReader::new(file),
210 current_entry: None,
211 finished: false,
212 }
213 }
214}
215
216impl Iterator for StreamingLogParser {
217 type Item = (String, String);
218
219 fn next(&mut self) -> Option<Self::Item> {
220 if self.finished {
221 return None;
222 }
223
224 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
225
226 loop {
227 let mut line = String::new();
228 match self.reader.read_line(&mut line) {
229 Ok(0) => {
230 self.finished = true;
232 return self.current_entry.take();
233 }
234 Ok(_) => {
235 if line.ends_with('\n') {
237 line.pop();
238 if line.ends_with('\r') {
239 line.pop();
240 }
241 }
242
243 if let Some(caps) = re.captures(&line) {
244 let date = match caps.get(1) {
245 Some(d) => d.as_str().to_string(),
246 None => continue,
247 };
248 let msg = match caps.get(3) {
249 Some(m) => m.as_str().to_string(),
250 None => continue,
251 };
252
253 let prev = self.current_entry.take();
255 self.current_entry = Some((date, msg));
256
257 if prev.is_some() {
258 return prev;
259 }
260 } else {
262 if let Some((_, ref mut msg)) = self.current_entry {
264 msg.push('\n');
265 msg.push_str(&line);
266 }
267 }
268 }
269 Err(_) => {
270 self.finished = true;
271 return self.current_entry.take();
272 }
273 }
274 }
275 }
276}
277
// Command-line arguments for the `logs` subcommand (visible alias `l`).
//
// NOTE: plain `//` comments are used on purpose throughout this struct:
// clap's derive turns `///` doc comments into help text, so adding them here
// would change the program's `--help` output.
#[derive(Debug, clap::Args)]
#[clap(
    visible_alias = "l",
    verbatim_doc_comment,
    long_about = "\
Displays logs for daemon(s)

Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
and include timestamps for filtering.

Examples:
 pitchfork logs api Show all logs for 'api' (paged if needed)
 pitchfork logs api worker Show logs for multiple daemons
 pitchfork logs Show logs for all daemons
 pitchfork logs api -n 50 Show last 50 lines
 pitchfork logs api --follow Follow logs in real-time
 pitchfork logs api --since '2024-01-15 10:00:00'
 Show logs since a specific time (forward)
 pitchfork logs api --since '10:30:00'
 Show logs since 10:30:00 today
 pitchfork logs api --since '10:30' --until '12:00'
 Show logs since 10:30:00 until 12:00:00 today
 pitchfork logs api --since 5min Show logs from last 5 minutes
 pitchfork logs api --raw Output raw log lines without formatting
 pitchfork logs api --raw -n 100 Output last 100 raw log lines
 pitchfork logs api --clear Delete logs for 'api'
 pitchfork logs --clear Delete logs for all daemons"
)]
pub struct Logs {
    // Daemon ids to show (positional). When empty, `run` uses every known
    // daemon that has a log file.
    id: Vec<String>,

    // `-c` / `--clear`: recreate (empty) the selected log files instead of
    // printing them.
    #[clap(short, long)]
    clear: bool,

    // `-n`: limit output to the last N entries.
    #[clap(short)]
    n: Option<usize>,

    // `-t` / `--tail` (also `--follow`, hidden short alias `-f`): keep
    // printing new lines after the existing logs were shown.
    #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
    tail: bool,

    // `-s` / `--since`: lower time bound; parsed by `parse_time_input`.
    #[clap(short = 's', long)]
    since: Option<String>,

    // `-u` / `--until`: upper time bound; parsed by `parse_time_input`.
    #[clap(short = 'u', long)]
    until: Option<String>,

    // `--no-pager`: never pipe output through a pager.
    #[clap(long)]
    no_pager: bool,

    // `--raw`: print without a pager and with ANSI codes stripped.
    #[clap(long)]
    raw: bool,
}
351
352impl Logs {
353 pub async fn run(&self) -> Result<()> {
354 migrate_legacy_log_dirs();
357
358 let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
360 get_all_daemon_ids()?
362 } else {
363 PitchforkToml::resolve_ids(&self.id)?
364 };
365
366 if self.clear {
367 for id in &resolved_ids {
368 let path = id.log_path();
369 if path.exists() {
370 xx::file::create(&path)?;
371 }
372 }
373 return Ok(());
374 }
375
376 let from = if let Some(since) = self.since.as_ref() {
377 Some(parse_time_input(since, true)?)
378 } else {
379 None
380 };
381 let to = if let Some(until) = self.until.as_ref() {
382 Some(parse_time_input(until, false)?)
383 } else {
384 None
385 };
386
387 let single_daemon = resolved_ids.len() == 1;
388 self.print_existing_logs(&resolved_ids, from, to, single_daemon)?;
389 if self.tail {
390 tail_logs(&resolved_ids, single_daemon, true).await?;
391 }
392
393 Ok(())
394 }
395
396 fn print_existing_logs(
397 &self,
398 resolved_ids: &[DaemonId],
399 from: Option<DateTime<Local>>,
400 to: Option<DateTime<Local>>,
401 single_daemon: bool,
402 ) -> Result<()> {
403 let valid_ids: Vec<DaemonId> = resolved_ids
404 .iter()
405 .filter(|id| id.log_path().exists())
406 .cloned()
407 .collect();
408 let id_width = valid_ids
409 .iter()
410 .map(|id| id.qualified().len())
411 .max()
412 .unwrap_or(0);
413 trace!(
414 "log files for: {}",
415 valid_ids.iter().map(|id| id.qualified()).join(", ")
416 );
417 let has_time_filter = from.is_some() || to.is_some();
418
419 if has_time_filter {
420 let mut log_lines = self.collect_log_lines_forward(&valid_ids, from, to)?;
421
422 if let Some(n) = self.n {
423 let len = log_lines.len();
424 if len > n {
425 log_lines = log_lines.into_iter().skip(len - n).collect_vec();
426 }
427 }
428
429 self.output_logs(
430 log_lines,
431 single_daemon,
432 id_width,
433 has_time_filter,
434 self.raw,
435 )?;
436 } else if let Some(n) = self.n {
437 let log_lines = self.collect_log_lines_reverse(&valid_ids, Some(n))?;
438 self.output_logs(
439 log_lines,
440 single_daemon,
441 id_width,
442 has_time_filter,
443 self.raw,
444 )?;
445 } else {
446 self.stream_logs_to_pager(&valid_ids, single_daemon, id_width, self.raw)?;
447 }
448
449 Ok(())
450 }
451
452 fn collect_log_lines_forward(
453 &self,
454 ids: &[DaemonId],
455 from: Option<DateTime<Local>>,
456 to: Option<DateTime<Local>>,
457 ) -> Result<Vec<(String, String, String)>> {
458 let log_lines: Vec<(String, String, String)> = ids
459 .iter()
460 .flat_map(|id| {
461 let path = id.log_path();
462 match read_lines_in_time_range(&path, from, to) {
463 Ok(lines) => merge_log_lines(&id.qualified(), lines, false),
464 Err(e) => {
465 error!("{}: {}", path.display(), e);
466 vec![]
467 }
468 }
469 })
470 .sorted_by_cached_key(|l| l.0.to_string())
471 .collect_vec();
472
473 Ok(log_lines)
474 }
475
476 fn collect_log_lines_reverse(
477 &self,
478 ids: &[DaemonId],
479 limit: Option<usize>,
480 ) -> Result<Vec<(String, String, String)>> {
481 let log_lines: Vec<(String, String, String)> = ids
482 .iter()
483 .flat_map(|id| {
484 let path = id.log_path();
485 let rev = match xx::file::open(&path) {
486 Ok(f) => rev_lines::RevLines::new(f),
487 Err(e) => {
488 error!("{}: {}", path.display(), e);
489 return vec![];
490 }
491 };
492 let lines = rev.into_iter().filter_map(Result::ok);
493 let lines = match limit {
494 Some(n) => lines.take(n).collect_vec(),
495 None => lines.collect_vec(),
496 };
497 merge_log_lines(&id.qualified(), lines, true)
498 })
499 .sorted_by_cached_key(|l| l.0.to_string())
500 .collect_vec();
501
502 let log_lines = match limit {
503 Some(n) => {
504 let len = log_lines.len();
505 if len > n {
506 log_lines.into_iter().skip(len - n).collect_vec()
507 } else {
508 log_lines
509 }
510 }
511 None => log_lines,
512 };
513
514 Ok(log_lines)
515 }
516
517 fn output_logs(
518 &self,
519 log_lines: Vec<(String, String, String)>,
520 single_daemon: bool,
521 id_width: usize,
522 has_time_filter: bool,
523 raw: bool,
524 ) -> Result<()> {
525 if log_lines.is_empty() {
526 return Ok(());
527 }
528
529 let strip_ansi = raw || !console::colors_enabled();
530
531 if raw {
533 for (date, id, msg) in log_lines {
534 let line = format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi);
535 println!("{line}");
536 }
537 return Ok(());
538 }
539
540 let use_pager = !self.no_pager && should_use_pager(log_lines.len());
541
542 if use_pager {
543 self.output_with_pager(
544 log_lines,
545 single_daemon,
546 id_width,
547 has_time_filter,
548 strip_ansi,
549 )?;
550 } else {
551 for (date, id, msg) in log_lines {
552 println!(
553 "{}",
554 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
555 );
556 }
557 }
558
559 Ok(())
560 }
561
562 fn output_with_pager(
563 &self,
564 log_lines: Vec<(String, String, String)>,
565 single_daemon: bool,
566 id_width: usize,
567 has_time_filter: bool,
568 strip_ansi: bool,
569 ) -> Result<()> {
570 let pager_config = PagerConfig::new(!has_time_filter);
572
573 match pager_config.spawn_piped() {
574 Ok(mut child) => {
575 if let Some(stdin) = child.stdin.as_mut() {
576 for (date, id, msg) in log_lines {
577 let line = format!(
578 "{}\n",
579 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
580 );
581 if stdin.write_all(line.as_bytes()).is_err() {
582 break;
583 }
584 }
585 let _ = child.wait();
586 } else {
587 debug!("Failed to get pager stdin, falling back to direct output");
588 for (date, id, msg) in log_lines {
589 println!(
590 "{}",
591 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
592 );
593 }
594 }
595 }
596 Err(e) => {
597 debug!("Failed to spawn pager: {e}, falling back to direct output");
598 for (date, id, msg) in log_lines {
599 println!(
600 "{}",
601 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
602 );
603 }
604 }
605 }
606
607 Ok(())
608 }
609
610 fn stream_logs_to_pager(
611 &self,
612 ids: &[DaemonId],
613 single_daemon: bool,
614 id_width: usize,
615 raw: bool,
616 ) -> Result<()> {
617 let strip_ansi = raw || !console::colors_enabled();
618
619 if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
620 return self.stream_logs_direct(ids, single_daemon, id_width, raw, strip_ansi);
621 }
622
623 let pager_config = PagerConfig::new(true); match pager_config.spawn_piped() {
626 Ok(mut child) => {
627 if let Some(stdin) = child.stdin.take() {
628 let file_infos: Vec<_> = ids
630 .iter()
631 .map(|id| (id.qualified(), id.log_path()))
632 .collect();
633 let single_daemon_clone = single_daemon;
634 let strip_ansi_clone = strip_ansi;
635 let id_width_clone = id_width;
636
637 std::thread::spawn(move || {
639 let mut writer = BufWriter::new(stdin);
640
641 if file_infos.len() == 1 {
643 let (name, path) = &file_infos[0];
644 let file = match File::open(path) {
645 Ok(f) => f,
646 Err(_) => return,
647 };
648 let parser = StreamingLogParser::new(file);
649 for (timestamp, message) in parser {
650 let output = format!(
651 "{}\n",
652 format_log_line(
653 ×tamp,
654 name,
655 &message,
656 single_daemon_clone,
657 id_width_clone,
658 strip_ansi_clone
659 )
660 );
661 if writer.write_all(output.as_bytes()).is_err() {
662 return;
663 }
664 }
665 let _ = writer.flush();
666 return;
667 }
668
669 let mut merger: StreamingMerger<StreamingLogParser> =
671 StreamingMerger::new();
672
673 for (name, path) in file_infos {
674 let file = match File::open(&path) {
675 Ok(f) => f,
676 Err(_) => continue,
677 };
678 let parser = StreamingLogParser::new(file);
679 merger.add_source(name, parser);
680 }
681
682 merger.initialize();
684
685 for (timestamp, daemon, message) in merger {
687 let output = format!(
688 "{}\n",
689 format_log_line(
690 ×tamp,
691 &daemon,
692 &message,
693 single_daemon_clone,
694 id_width_clone,
695 strip_ansi_clone
696 )
697 );
698 if writer.write_all(output.as_bytes()).is_err() {
699 return;
700 }
701 }
702
703 let _ = writer.flush();
704 });
705
706 let _ = child.wait();
707 } else {
708 debug!("Failed to get pager stdin, falling back to direct output");
709 return self.stream_logs_direct(ids, single_daemon, id_width, raw, strip_ansi);
710 }
711 }
712 Err(e) => {
713 debug!("Failed to spawn pager: {e}, falling back to direct output");
714 return self.stream_logs_direct(ids, single_daemon, id_width, raw, strip_ansi);
715 }
716 }
717
718 Ok(())
719 }
720
721 fn stream_logs_direct(
722 &self,
723 ids: &[DaemonId],
724 single_daemon: bool,
725 id_width: usize,
726 raw: bool,
727 strip_ansi: bool,
728 ) -> Result<()> {
729 if ids.len() == 1 {
732 let daemon_id = &ids[0];
733 let path = daemon_id.log_path();
734 let file = match File::open(&path) {
735 Ok(f) => f,
736 Err(e) => {
737 error!("{}: {}", path.display(), e);
738 return Ok(());
739 }
740 };
741 let reader = BufReader::new(file);
742 if raw {
743 for line in reader.lines() {
745 match line {
746 Ok(l) => {
747 let l = if strip_ansi {
748 console::strip_ansi_codes(&l).to_string()
749 } else {
750 l
751 };
752 if io::stdout().write_all(l.as_bytes()).is_err()
753 || io::stdout().write_all(b"\n").is_err()
754 {
755 return Ok(());
756 }
757 }
758 Err(_) => continue,
759 }
760 }
761 } else {
762 let parser = StreamingLogParser::new(File::open(&path).into_diagnostic()?);
764 for (timestamp, message) in parser {
765 let output = format!(
766 "{}\n",
767 format_log_line(
768 ×tamp,
769 &daemon_id.qualified(),
770 &message,
771 single_daemon,
772 id_width,
773 strip_ansi
774 )
775 );
776 if io::stdout().write_all(output.as_bytes()).is_err() {
777 return Ok(());
778 }
779 }
780 }
781 return Ok(());
782 }
783
784 let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();
786
787 for id in ids {
788 let path = id.log_path();
789 let file = match File::open(&path) {
790 Ok(f) => f,
791 Err(e) => {
792 error!("{}: {}", path.display(), e);
793 continue;
794 }
795 };
796 let parser = StreamingLogParser::new(file);
797 merger.add_source(id.qualified(), parser);
798 }
799
800 merger.initialize();
802
803 for (timestamp, daemon, message) in merger {
805 let output = format!(
806 "{}\n",
807 format_log_line(
808 ×tamp,
809 &daemon,
810 &message,
811 single_daemon,
812 id_width,
813 strip_ansi
814 )
815 );
816 if io::stdout().write_all(output.as_bytes()).is_err() {
817 return Ok(());
818 }
819 }
820
821 Ok(())
822 }
823}
824
825fn should_use_pager(line_count: usize) -> bool {
826 if !io::stdout().is_terminal() {
827 return false;
828 }
829
830 let terminal_height = get_terminal_height().unwrap_or(24);
831 line_count > terminal_height
832}
833
834fn get_terminal_height() -> Option<usize> {
835 if let Ok(rows) = std::env::var("LINES")
836 && let Ok(h) = rows.parse::<usize>()
837 {
838 return Some(h);
839 }
840
841 crossterm::terminal::size().ok().map(|(_, h)| h as usize)
842}
843
844fn read_lines_in_time_range(
845 path: &Path,
846 from: Option<DateTime<Local>>,
847 to: Option<DateTime<Local>>,
848) -> Result<Vec<String>> {
849 let mut file = File::open(path).into_diagnostic()?;
850 let file_size = file.metadata().into_diagnostic()?.len();
851
852 if file_size == 0 {
853 return Ok(vec![]);
854 }
855
856 let start_pos = if let Some(from_time) = from {
857 binary_search_log_position(&mut file, file_size, from_time, true)?
858 } else {
859 0
860 };
861
862 let end_pos = if let Some(to_time) = to {
863 binary_search_log_position(&mut file, file_size, to_time, false)?
864 } else {
865 file_size
866 };
867
868 if start_pos >= end_pos {
869 return Ok(vec![]);
870 }
871
872 file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
873 let mut reader = BufReader::new(&file);
874 let mut lines = Vec::new();
875 let mut current_pos = start_pos;
876
877 loop {
878 if current_pos >= end_pos {
879 break;
880 }
881
882 let mut line = String::new();
883 match reader.read_line(&mut line) {
884 Ok(0) => break,
885 Ok(bytes_read) => {
886 current_pos += bytes_read as u64;
887 if line.ends_with('\n') {
888 line.pop();
889 if line.ends_with('\r') {
890 line.pop();
891 }
892 }
893 lines.push(line);
894 }
895 Err(_) => break,
896 }
897 }
898
899 Ok(lines)
900}
901
/// Binary-searches `file` (of length `file_size` bytes) for a byte offset
/// that lies on a line boundary relative to `target_time`.
///
/// With `find_start == true`, lines whose timestamp is strictly earlier than
/// `target_time` are skipped. With `find_start == false`, lines whose
/// timestamp is earlier than or equal to `target_time` are skipped, so the
/// result is an exclusive end offset.
///
/// Lines without a parsable timestamp are treated as belonging to the lower
/// half (the search continues after them).
///
/// NOTE(review): the search assumes timestamps are non-decreasing through
/// the file; nothing here verifies that.
///
/// # Errors
/// Propagates seek failures and `read_line` failures. `read_line` also fails
/// on invalid UTF-8, so a probed line containing such bytes aborts the search.
fn binary_search_log_position(
    file: &mut File,
    file_size: u64,
    target_time: DateTime<Local>,
    find_start: bool,
) -> Result<u64> {
    // Search window in byte offsets: [low, high).
    let mut low: u64 = 0;
    let mut high: u64 = file_size;

    while low < high {
        let mid = low + (high - low) / 2;

        // Snap the probe back to the start of the line containing `mid`.
        let line_start = find_line_start(file, mid)?;

        file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
        let mut reader = BufReader::new(&*file);
        let mut line = String::new();
        let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
        if bytes_read == 0 {
            // Nothing readable at this position: shrink the window from above.
            high = mid;
            continue;
        }

        let line_time = extract_timestamp(&line);

        match line_time {
            Some(lt) => {
                if find_start {
                    if lt < target_time {
                        // Line is before the target: continue after it.
                        low = line_start + bytes_read as u64;
                    } else {
                        high = line_start;
                    }
                } else if lt <= target_time {
                    // End search: lines equal to the target are still included.
                    low = line_start + bytes_read as u64;
                } else {
                    high = line_start;
                }
            }
            None => {
                // No timestamp on this line: move past it.
                low = line_start + bytes_read as u64;
            }
        }
    }

    // Make sure the returned offset sits on a line boundary.
    find_line_start(file, low)
}
949
950fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
951 if pos == 0 {
952 return Ok(0);
953 }
954
955 let mut search_pos = pos.saturating_sub(1);
957 const CHUNK_SIZE: usize = 8192;
958
959 loop {
960 let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
962 let len_u64 = search_pos - chunk_start + 1;
963 let len = len_u64 as usize;
964
965 file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
967 let mut buf = vec![0u8; len];
968 if file.read_exact(&mut buf).is_err() {
969 return Ok(0);
971 }
972
973 for (i, &b) in buf.iter().enumerate().rev() {
975 if b == b'\n' {
976 return Ok(chunk_start + i as u64 + 1);
977 }
978 }
979
980 if chunk_start == 0 {
983 return Ok(0);
984 }
985
986 search_pos = chunk_start - 1;
988 }
989}
990
991fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
992 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
993 re.captures(line)
994 .and_then(|caps| caps.get(1))
995 .and_then(|m| parse_datetime(m.as_str()).ok())
996}
997
998fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
999 let lines = if reverse {
1000 lines.into_iter().rev().collect()
1001 } else {
1002 lines
1003 };
1004
1005 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
1006 lines
1007 .into_iter()
1008 .fold(vec![], |mut acc, line| match re.captures(&line) {
1009 Some(caps) => {
1010 let (date, msg) = match (caps.get(1), caps.get(3)) {
1011 (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
1012 _ => return acc,
1013 };
1014 acc.push((date, id.to_string(), msg));
1015 acc
1016 }
1017 None => {
1018 if let Some(l) = acc.last_mut() {
1019 l.2.push('\n');
1020 l.2.push_str(&line);
1021 }
1022 acc
1023 }
1024 })
1025}
1026
1027fn migrate_legacy_log_dirs() {
1037 let known_safe_paths = known_daemon_safe_paths();
1038 let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
1039 Ok(d) => d,
1040 Err(_) => return,
1041 };
1042 for dir in dirs {
1043 if dir.starts_with(".") || !dir.is_dir() {
1044 continue;
1045 }
1046 let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
1047 Some(n) => n,
1048 None => continue,
1049 };
1050 if name.contains("--") {
1053 if DaemonId::from_safe_path(&name).is_ok() {
1056 continue;
1057 }
1058 if known_safe_paths.contains(&name) {
1061 continue;
1062 }
1063 warn!(
1064 "Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
1065 );
1066 continue;
1067 }
1068
1069 let old_log = dir.join(format!("{name}.log"));
1072 if !old_log.exists() {
1073 continue;
1074 }
1075 if DaemonId::try_new("legacy", &name).is_err() {
1076 warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
1077 continue;
1078 }
1079
1080 let new_name = format!("legacy--{name}");
1081 let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
1082 if new_dir.exists() {
1084 continue;
1085 }
1086 if std::fs::rename(&dir, &new_dir).is_err() {
1087 continue;
1088 }
1089 let old_log = new_dir.join(format!("{name}.log"));
1091 let new_log = new_dir.join(format!("{new_name}.log"));
1092 if old_log.exists() {
1093 let _ = std::fs::rename(&old_log, &new_log);
1094 }
1095 debug!("Migrated legacy log dir '{name}' → '{new_name}'");
1096 }
1097}
1098
1099fn known_daemon_safe_paths() -> BTreeSet<String> {
1100 let mut out = BTreeSet::new();
1101
1102 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1103 Ok(state) => {
1104 for id in state.daemons.keys() {
1105 out.insert(id.safe_path());
1106 }
1107 }
1108 Err(e) => {
1109 warn!("Failed to read state while checking known daemon IDs: {e}");
1110 }
1111 }
1112
1113 match PitchforkToml::all_merged() {
1114 Ok(config) => {
1115 for id in config.daemons.keys() {
1116 out.insert(id.safe_path());
1117 }
1118 }
1119 Err(e) => {
1120 warn!("Failed to read config while checking known daemon IDs: {e}");
1121 }
1122 }
1123
1124 out
1125}
1126
1127fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
1128 let mut ids = BTreeSet::new();
1129
1130 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1131 Ok(state) => ids.extend(state.daemons.keys().cloned()),
1132 Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
1133 }
1134
1135 match PitchforkToml::all_merged() {
1136 Ok(config) => ids.extend(config.daemons.keys().cloned()),
1137 Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
1138 }
1139
1140 Ok(ids
1141 .into_iter()
1142 .filter(|id| id.log_path().exists())
1143 .collect())
1144}
1145
1146pub async fn tail_logs(
1147 names: &[DaemonId],
1148 single_daemon: bool,
1149 start_from_end: bool,
1150) -> Result<()> {
1151 let id_width = names
1165 .iter()
1166 .map(|id| id.qualified().len())
1167 .max()
1168 .unwrap_or(0);
1169
1170 let mut states: Vec<(DaemonId, PathBuf, u64)> = names
1171 .iter()
1172 .filter_map(|id| {
1173 let path = id.log_path();
1174 if !path.exists() {
1175 return None;
1176 }
1177 let pos = if start_from_end {
1178 fs::metadata(&path).map(|m| m.len()).unwrap_or(0)
1179 } else {
1180 0
1181 };
1182 Some((id.clone(), path, pos))
1183 })
1184 .collect();
1185
1186 let strip_ansi = !console::colors_enabled();
1187
1188 let interval = tokio::time::interval(Duration::from_millis(200));
1189 tokio::pin!(interval);
1190
1191 loop {
1192 interval.tick().await;
1193
1194 for id in names {
1198 let path = id.log_path();
1199 if !path.exists() || states.iter().any(|(s, _, _)| s == id) {
1200 continue;
1201 }
1202 states.push((id.clone(), path, 0));
1203 }
1204
1205 let mut out = vec![];
1206 for (id, path, pos) in &mut states {
1207 let mut file = match fs::File::open(path) {
1208 Ok(f) => f,
1209 Err(_) => continue,
1210 };
1211 let file_size = match file.metadata() {
1212 Ok(m) => m.len(),
1213 Err(_) => continue,
1214 };
1215 let start = if *pos > file_size { 0 } else { *pos };
1216 file.seek(SeekFrom::Start(start)).into_diagnostic()?;
1217
1218 let mut reader = BufReader::new(&file);
1222 let mut bytes_read: u64 = 0;
1223 let mut lines = vec![];
1224 loop {
1225 let mut line = String::new();
1226 let n = reader.read_line(&mut line).into_diagnostic()?;
1227 if n == 0 {
1228 break;
1229 }
1230 if line.ends_with('\n') {
1234 bytes_read += n as u64;
1235 line.pop();
1236 if line.ends_with('\r') {
1237 line.pop();
1238 }
1239 lines.push(line);
1240 } else {
1241 break;
1243 }
1244 }
1245 *pos = start + bytes_read;
1246 out.extend(merge_log_lines(&id.qualified(), lines, false));
1247 }
1248
1249 if !out.is_empty() {
1250 let out = out
1251 .into_iter()
1252 .sorted_by_cached_key(|l| l.0.to_string())
1253 .collect_vec();
1254 for (date, name, msg) in out {
1255 println!(
1256 "{}",
1257 format_log_line(&date, &name, &msg, single_daemon, id_width, strip_ansi)
1258 );
1259 }
1260 }
1261 }
1262}
1263
1264fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
1265 let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
1266 Local
1267 .from_local_datetime(&naive_dt)
1268 .single()
1269 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
1270}
1271
1272fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
1278 let s = s.trim();
1279
1280 if let Ok(dt) = parse_datetime(s) {
1282 return Ok(dt);
1283 }
1284
1285 if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
1287 return Local
1288 .from_local_datetime(&naive_dt)
1289 .single()
1290 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
1291 }
1292
1293 if let Ok(time) = parse_time_only(s) {
1297 let now = Local::now();
1298 let today = now.date_naive();
1299 let mut naive_dt = NaiveDateTime::new(today, time);
1300 let mut dt = Local
1301 .from_local_datetime(&naive_dt)
1302 .single()
1303 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1304
1305 if is_since
1308 && dt > now
1309 && let Some(yesterday) = today.pred_opt()
1310 {
1311 naive_dt = NaiveDateTime::new(yesterday, time);
1312 dt = Local
1313 .from_local_datetime(&naive_dt)
1314 .single()
1315 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1316 }
1317 return Ok(dt);
1318 }
1319
1320 if let Ok(duration) = humantime::parse_duration(s) {
1321 let now = Local::now();
1322 let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
1323 return Ok(target);
1324 }
1325
1326 Err(miette::miette!(
1327 "Invalid time format: '{}'. Expected formats:\n\
1328 - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
1329 - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
1330 - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
1331 s
1332 ))
1333}
1334
1335fn parse_time_only(s: &str) -> Result<NaiveTime> {
1336 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1337 return Ok(time);
1338 }
1339
1340 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1341 return Ok(time);
1342 }
1343
1344 Err(miette::miette!("Invalid time format: '{}'", s))
1345}
1346
1347pub fn print_logs_for_time_range(
1348 daemon_id: &DaemonId,
1349 from: DateTime<Local>,
1350 to: Option<DateTime<Local>>,
1351) -> Result<()> {
1352 let from = from
1353 .with_nanosecond(0)
1354 .expect("0 is always valid for nanoseconds");
1355 let to = to.map(|t| {
1356 t.with_nanosecond(0)
1357 .expect("0 is always valid for nanoseconds")
1358 });
1359
1360 let path = daemon_id.log_path();
1361 let log_lines = if path.exists() {
1362 match read_lines_in_time_range(&path, Some(from), to) {
1363 Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1364 Err(e) => {
1365 error!("{}: {}", path.display(), e);
1366 vec![]
1367 }
1368 }
1369 } else {
1370 vec![]
1371 };
1372
1373 if log_lines.is_empty() {
1374 eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
1375 } else {
1376 eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
1377 for (date, _id, msg) in log_lines {
1378 eprintln!("{} {}", edim(&date), msg);
1379 }
1380 eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1381 }
1382
1383 Ok(())
1384}
1385
1386pub fn collect_startup_logs(
1391 daemon_id: &DaemonId,
1392 from: DateTime<Local>,
1393) -> Result<Vec<(String, String, String)>> {
1394 let from = from
1395 .with_nanosecond(0)
1396 .expect("0 is always valid for nanoseconds");
1397
1398 let path = daemon_id.log_path();
1399 let log_lines = if path.exists() {
1400 match read_lines_in_time_range(&path, Some(from), None) {
1401 Ok(lines) => merge_log_lines(&daemon_id.qualified(), lines, false),
1402 Err(e) => {
1403 error!("{}: {}", path.display(), e);
1404 vec![]
1405 }
1406 }
1407 } else {
1408 vec![]
1409 };
1410
1411 Ok(log_lines)
1412}
1413
1414pub fn print_startup_logs_block(log_lines: &[(String, String, String)]) {
1432 if log_lines.is_empty() {
1433 return;
1434 }
1435
1436 let log_lines = log_lines
1439 .iter()
1440 .sorted_by_cached_key(|(ts, _, _)| ts.clone())
1441 .collect_vec();
1442
1443 let unique_ids: BTreeSet<&str> = log_lines.iter().map(|(_, id, _)| id.as_str()).collect();
1448 let show_id = unique_ids.len() > 1;
1449
1450 let is_tty = std::io::stderr().is_terminal();
1453 let format_msg = |msg: &str| -> String {
1454 let stripped = strip_pty_controls(msg);
1455 if is_tty {
1456 stripped
1457 } else {
1458 console::strip_ansi_codes(&stripped).to_string()
1459 }
1460 };
1461
1462 let tag = estyle(" STARTUP LOGS ").black().on_color256(8); eprintln!("\n{tag}");
1465
1466 if show_id {
1467 let id_width = log_lines
1468 .iter()
1469 .map(|(_, id, _)| console::measure_text_width(id))
1470 .max()
1471 .unwrap_or(0);
1472 for (date, id, msg) in log_lines {
1473 let time = date.split(' ').nth(1).unwrap_or(date);
1474 let colored = dimmed_id(id, is_tty && console::colors_enabled_stderr());
1475 let padded = console::pad_str(&colored, id_width, console::Alignment::Left, None);
1476 eprintln!("{} {} {}", padded, edim(time), format_msg(msg));
1477 }
1478 } else {
1479 for (date, _, msg) in log_lines {
1480 let time = date.split(' ').nth(1).unwrap_or(date);
1481 eprintln!("{} {}", edim(time), format_msg(msg));
1482 }
1483 }
1484}
1485
1486fn strip_pty_controls(s: &str) -> String {
1491 struct Stripper {
1492 result: String,
1493 }
1494
1495 impl vte::Perform for Stripper {
1496 fn print(&mut self, c: char) {
1497 self.result.push(c);
1498 }
1499
1500 fn execute(&mut self, byte: u8) {
1501 if byte == b'\n' || byte == b'\t' {
1503 self.result.push(byte as char);
1504 }
1505 }
1506
1507 fn csi_dispatch(
1508 &mut self,
1509 params: &vte::Params,
1510 _intermediates: &[u8],
1511 _ignore: bool,
1512 action: char,
1513 ) {
1514 if action == 'm' {
1516 self.result.push_str("\x1b[");
1517 let mut first = true;
1518 for sub in params.iter() {
1519 if !first {
1520 self.result.push(';');
1521 }
1522 first = false;
1523 for (i, &p) in sub.iter().enumerate() {
1524 if i > 0 {
1525 self.result.push(':');
1526 }
1527 self.result.push_str(&p.to_string());
1528 }
1529 }
1530 self.result.push('m');
1531 }
1532 }
1534
1535 fn osc_dispatch(&mut self, _params: &[&[u8]], _bell_terminated: bool) {
1536 }
1538
1539 fn esc_dispatch(&mut self, _intermediates: &[u8], _ignore: bool, _byte: u8) {
1540 }
1542
1543 fn hook(
1544 &mut self,
1545 _params: &vte::Params,
1546 _intermediates: &[u8],
1547 _ignore: bool,
1548 _action: char,
1549 ) {
1550 }
1552
1553 fn put(&mut self, _byte: u8) {
1554 }
1556
1557 fn unhook(&mut self) {
1558 }
1560 }
1561
1562 let mut parser = vte::Parser::new();
1563 let mut stripper = Stripper {
1564 result: String::with_capacity(s.len()),
1565 };
1566 parser.advance(&mut stripper, s.as_bytes());
1567 stripper.result
1568}