1use crate::daemon_id::DaemonId;
2use crate::pitchfork_toml::PitchforkToml;
3use crate::state_file::StateFile;
4use crate::ui::style::{edim, estyle, ndim};
5use crate::{Result, env};
6use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
7use console;
8use itertools::Itertools;
9use miette::IntoDiagnostic;
10use std::cmp::{Ordering, Reverse};
11use std::collections::{BTreeSet, BinaryHeap};
12use std::fs::{self, File};
13use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
14use std::path::{Path, PathBuf};
15use std::process::{Child, Command, Stdio};
16use std::time::Duration;
17use xx::regex;
18
19struct PagerConfig {
21 command: String,
22 args: Vec<String>,
23}
24
25impl PagerConfig {
26 fn new(start_at_end: bool) -> Self {
29 let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
30 let args = Self::build_args(&command, start_at_end);
31 Self { command, args }
32 }
33
34 fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
35 let mut args = vec![];
36 if pager == "less" {
37 args.push("-R".to_string());
38 if start_at_end {
39 args.push("+G".to_string());
40 }
41 }
42 args
43 }
44
45 fn spawn_piped(&self) -> io::Result<Child> {
47 Command::new(&self.command)
48 .args(&self.args)
49 .stdin(Stdio::piped())
50 .spawn()
51 }
52}
53
54fn format_log_line(
60 date: &str,
61 id: &str,
62 msg: &str,
63 single_daemon: bool,
64 id_width: usize,
65 strip_ansi: bool,
66) -> String {
67 let msg = if strip_ansi {
68 console::strip_ansi_codes(msg).to_string()
69 } else {
70 msg.to_string()
71 };
72 if single_daemon {
73 format!("{} {}", ndim(date), msg)
74 } else {
75 let colors_on = !strip_ansi && console::colors_enabled();
76 let colored = dimmed_id(id, colors_on);
77 let padded = console::pad_str(&colored, id_width, console::Alignment::Left, None);
78 format!("{} {} {}", padded, ndim(date), msg)
79 }
80}
81
82fn dimmed_id(id: &str, colors_enabled: bool) -> String {
86 if !colors_enabled {
87 return id.to_string();
88 }
89 let colors = [
90 (180, 120, 120), (180, 160, 100), (120, 180, 120), (120, 180, 180), (180, 120, 180), (120, 160, 180), ];
97 let mut h: usize = 0x811C_9DC5; for b in id.bytes() {
99 h = h.wrapping_mul(0x0100_0193).wrapping_add(b as usize);
100 }
101 let (r, g, b) = colors[h % colors.len()];
102 format!("\x1b[2;38;2;{};{};{}m{}\x1b[0m", r, g, b, id)
103}
104
105pub fn colored_id_label(id: &str, colors_enabled: bool) -> String {
108 if !colors_enabled {
109 return format!("[{}]", id);
110 }
111 let colors: [u8; 4] = [34, 35, 36, 32]; let mut h: usize = 0x811C_9DC5; for b in id.bytes() {
116 h = h.wrapping_mul(0x0100_0193).wrapping_add(b as usize);
117 }
118 let color = colors[h % colors.len()];
119 format!("\x1b[{color}m[{id}]\x1b[0m")
120}
121
122#[derive(Debug)]
124struct LogEntry {
125 timestamp: String,
126 daemon: String,
127 message: String,
128 source_idx: usize, }
130
131impl PartialEq for LogEntry {
132 fn eq(&self, other: &Self) -> bool {
133 self.timestamp == other.timestamp
134 }
135}
136
137impl Eq for LogEntry {}
138
139impl PartialOrd for LogEntry {
140 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
141 Some(self.cmp(other))
142 }
143}
144
145impl Ord for LogEntry {
146 fn cmp(&self, other: &Self) -> Ordering {
147 self.timestamp.cmp(&other.timestamp)
148 }
149}
150
151struct StreamingMerger<I>
154where
155 I: Iterator<Item = (String, String)>,
156{
157 sources: Vec<(String, I)>, heap: BinaryHeap<Reverse<LogEntry>>, }
160
161impl<I> StreamingMerger<I>
162where
163 I: Iterator<Item = (String, String)>,
164{
165 fn new() -> Self {
166 Self {
167 sources: Vec::new(),
168 heap: BinaryHeap::new(),
169 }
170 }
171
172 fn add_source(&mut self, daemon_name: String, iter: I) {
173 self.sources.push((daemon_name, iter));
174 }
175
176 fn initialize(&mut self) {
177 for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
179 if let Some((timestamp, message)) = iter.next() {
180 self.heap.push(Reverse(LogEntry {
181 timestamp,
182 daemon: daemon.clone(),
183 message,
184 source_idx: idx,
185 }));
186 }
187 }
188 }
189}
190
191impl<I> Iterator for StreamingMerger<I>
192where
193 I: Iterator<Item = (String, String)>,
194{
195 type Item = (String, String, String); fn next(&mut self) -> Option<Self::Item> {
198 let Reverse(entry) = self.heap.pop()?;
200
201 let (daemon, iter) = &mut self.sources[entry.source_idx];
203 if let Some((timestamp, message)) = iter.next() {
204 self.heap.push(Reverse(LogEntry {
205 timestamp,
206 daemon: daemon.clone(),
207 message,
208 source_idx: entry.source_idx,
209 }));
210 }
211
212 Some((entry.timestamp, entry.daemon, entry.message))
213 }
214}
215
216struct StreamingLogParser {
218 reader: BufReader<File>,
219 current_entry: Option<(String, String)>,
220 finished: bool,
221}
222
223impl StreamingLogParser {
224 fn new(file: File) -> Self {
225 Self {
226 reader: BufReader::new(file),
227 current_entry: None,
228 finished: false,
229 }
230 }
231}
232
233impl Iterator for StreamingLogParser {
234 type Item = (String, String);
235
236 fn next(&mut self) -> Option<Self::Item> {
237 if self.finished {
238 return None;
239 }
240
241 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
242
243 loop {
244 let mut line = String::new();
245 match self.reader.read_line(&mut line) {
246 Ok(0) => {
247 self.finished = true;
249 return self.current_entry.take();
250 }
251 Ok(_) => {
252 if line.ends_with('\n') {
254 line.pop();
255 if line.ends_with('\r') {
256 line.pop();
257 }
258 }
259
260 if let Some(caps) = re.captures(&line) {
261 let date = match caps.get(1) {
262 Some(d) => d.as_str().to_string(),
263 None => continue,
264 };
265 let msg = match caps.get(3) {
266 Some(m) => m.as_str().to_string(),
267 None => continue,
268 };
269
270 let prev = self.current_entry.take();
272 self.current_entry = Some((date, msg));
273
274 if prev.is_some() {
275 return prev;
276 }
277 } else {
279 if let Some((_, ref mut msg)) = self.current_entry {
281 msg.push('\n');
282 msg.push_str(&line);
283 }
284 }
285 }
286 Err(_) => {
287 self.finished = true;
288 return self.current_entry.take();
289 }
290 }
291 }
292 }
293}
294
295#[derive(Debug, clap::Args)]
297#[clap(
298 visible_alias = "l",
299 verbatim_doc_comment,
300 long_about = "\
301Displays logs for daemon(s)
302
303Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
304and include timestamps for filtering.
305
306Examples:
307 pitchfork logs api Show all logs for 'api' (paged if needed)
308 pitchfork logs api worker Show logs for multiple daemons
309 pitchfork logs Show logs for all daemons
310 pitchfork logs api -n 50 Show last 50 lines
311 pitchfork logs api --follow Follow logs in real-time
312 pitchfork logs api --since '2024-01-15 10:00:00'
313 Show logs since a specific time (forward)
314 pitchfork logs api --since '10:30:00'
315 Show logs since 10:30:00 today
316 pitchfork logs api --since '10:30' --until '12:00'
317 Show logs since 10:30:00 until 12:00:00 today
318 pitchfork logs api --since 5min Show logs from last 5 minutes
319 pitchfork logs api --raw Output raw log lines without formatting
320 pitchfork logs api --raw -n 100 Output last 100 raw log lines
321 pitchfork logs api --clear Delete logs for 'api'
322 pitchfork logs --clear Delete logs for all daemons"
323)]
324pub struct Logs {
325 id: Vec<String>,
327
328 #[clap(short, long)]
330 clear: bool,
331
332 #[clap(short)]
337 n: Option<usize>,
338
339 #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
341 tail: bool,
342
343 #[clap(short = 's', long)]
350 since: Option<String>,
351
352 #[clap(short = 'u', long)]
358 until: Option<String>,
359
360 #[clap(long)]
362 no_pager: bool,
363
364 #[clap(long)]
366 raw: bool,
367}
368
369impl Logs {
370 pub async fn run(&self) -> Result<()> {
371 migrate_legacy_log_dirs();
374
375 let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
377 get_all_daemon_ids()?
379 } else {
380 PitchforkToml::resolve_ids(&self.id)?
381 };
382
383 if self.clear {
384 for id in &resolved_ids {
385 let path = id.log_path();
386 if path.exists() {
387 xx::file::create(&path)?;
388 }
389 }
390 return Ok(());
391 }
392
393 let from = if let Some(since) = self.since.as_ref() {
394 Some(parse_time_input(since, true)?)
395 } else {
396 None
397 };
398 let to = if let Some(until) = self.until.as_ref() {
399 Some(parse_time_input(until, false)?)
400 } else {
401 None
402 };
403
404 let single_daemon = resolved_ids.len() == 1;
405 self.print_existing_logs(&resolved_ids, from, to, single_daemon)?;
406 if self.tail {
407 tail_logs(&resolved_ids, single_daemon, true).await?;
408 }
409
410 Ok(())
411 }
412
413 fn print_existing_logs(
414 &self,
415 resolved_ids: &[DaemonId],
416 from: Option<DateTime<Local>>,
417 to: Option<DateTime<Local>>,
418 single_daemon: bool,
419 ) -> Result<()> {
420 let valid_ids: Vec<DaemonId> = resolved_ids
421 .iter()
422 .filter(|id| id.log_path().exists())
423 .cloned()
424 .collect();
425 let id_width = valid_ids
426 .iter()
427 .map(|id| id.qualified().len())
428 .max()
429 .unwrap_or(0);
430 trace!(
431 "log files for: {}",
432 valid_ids.iter().map(|id| id.qualified()).join(", ")
433 );
434 let has_time_filter = from.is_some() || to.is_some();
435
436 if has_time_filter {
437 let mut log_lines = self.collect_log_lines_forward(&valid_ids, from, to)?;
438
439 if let Some(n) = self.n {
440 let len = log_lines.len();
441 if len > n {
442 log_lines = log_lines.into_iter().skip(len - n).collect_vec();
443 }
444 }
445
446 self.output_logs(
447 log_lines,
448 single_daemon,
449 id_width,
450 has_time_filter,
451 self.raw,
452 )?;
453 } else if let Some(n) = self.n {
454 let log_lines = self.collect_log_lines_reverse(&valid_ids, Some(n))?;
455 self.output_logs(
456 log_lines,
457 single_daemon,
458 id_width,
459 has_time_filter,
460 self.raw,
461 )?;
462 } else {
463 self.stream_logs_to_pager(&valid_ids, single_daemon, id_width, self.raw)?;
464 }
465
466 Ok(())
467 }
468
469 fn collect_log_lines_forward(
470 &self,
471 ids: &[DaemonId],
472 from: Option<DateTime<Local>>,
473 to: Option<DateTime<Local>>,
474 ) -> Result<Vec<(String, String, String)>> {
475 let log_lines: Vec<(String, String, String)> = ids
476 .iter()
477 .flat_map(|id| {
478 let path = id.log_path();
479 match read_lines_in_time_range(&path, from, to) {
480 Ok((lines, _)) => merge_log_lines(&id.qualified(), lines, false),
481 Err(e) => {
482 error!("{}: {}", path.display(), e);
483 vec![]
484 }
485 }
486 })
487 .sorted_by_cached_key(|l| l.0.to_string())
488 .collect_vec();
489
490 Ok(log_lines)
491 }
492
493 fn collect_log_lines_reverse(
494 &self,
495 ids: &[DaemonId],
496 limit: Option<usize>,
497 ) -> Result<Vec<(String, String, String)>> {
498 let log_lines: Vec<(String, String, String)> = ids
499 .iter()
500 .flat_map(|id| {
501 let path = id.log_path();
502 let rev = match xx::file::open(&path) {
503 Ok(f) => rev_lines::RevLines::new(f),
504 Err(e) => {
505 error!("{}: {}", path.display(), e);
506 return vec![];
507 }
508 };
509 let lines = rev.into_iter().filter_map(Result::ok);
510 let lines = match limit {
511 Some(n) => lines.take(n).collect_vec(),
512 None => lines.collect_vec(),
513 };
514 merge_log_lines(&id.qualified(), lines, true)
515 })
516 .sorted_by_cached_key(|l| l.0.to_string())
517 .collect_vec();
518
519 let log_lines = match limit {
520 Some(n) => {
521 let len = log_lines.len();
522 if len > n {
523 log_lines.into_iter().skip(len - n).collect_vec()
524 } else {
525 log_lines
526 }
527 }
528 None => log_lines,
529 };
530
531 Ok(log_lines)
532 }
533
534 fn output_logs(
535 &self,
536 log_lines: Vec<(String, String, String)>,
537 single_daemon: bool,
538 id_width: usize,
539 has_time_filter: bool,
540 raw: bool,
541 ) -> Result<()> {
542 if log_lines.is_empty() {
543 return Ok(());
544 }
545
546 let strip_ansi = raw || !console::colors_enabled();
547
548 if raw {
550 for (date, id, msg) in log_lines {
551 let line = format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi);
552 println!("{line}");
553 }
554 return Ok(());
555 }
556
557 let use_pager = !self.no_pager && should_use_pager(log_lines.len());
558
559 if use_pager {
560 self.output_with_pager(
561 log_lines,
562 single_daemon,
563 id_width,
564 has_time_filter,
565 strip_ansi,
566 )?;
567 } else {
568 for (date, id, msg) in log_lines {
569 println!(
570 "{}",
571 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
572 );
573 }
574 }
575
576 Ok(())
577 }
578
579 fn output_with_pager(
580 &self,
581 log_lines: Vec<(String, String, String)>,
582 single_daemon: bool,
583 id_width: usize,
584 has_time_filter: bool,
585 strip_ansi: bool,
586 ) -> Result<()> {
587 let pager_config = PagerConfig::new(!has_time_filter);
589
590 match pager_config.spawn_piped() {
591 Ok(mut child) => {
592 if let Some(stdin) = child.stdin.as_mut() {
593 for (date, id, msg) in log_lines {
594 let line = format!(
595 "{}\n",
596 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
597 );
598 if stdin.write_all(line.as_bytes()).is_err() {
599 break;
600 }
601 }
602 let _ = child.wait();
603 } else {
604 debug!("Failed to get pager stdin, falling back to direct output");
605 for (date, id, msg) in log_lines {
606 println!(
607 "{}",
608 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
609 );
610 }
611 }
612 }
613 Err(e) => {
614 debug!("Failed to spawn pager: {e}, falling back to direct output");
615 for (date, id, msg) in log_lines {
616 println!(
617 "{}",
618 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
619 );
620 }
621 }
622 }
623
624 Ok(())
625 }
626
627 fn stream_logs_to_pager(
628 &self,
629 ids: &[DaemonId],
630 single_daemon: bool,
631 id_width: usize,
632 raw: bool,
633 ) -> Result<()> {
634 let strip_ansi = raw || !console::colors_enabled();
635
636 if !io::stdout().is_terminal() || self.no_pager || self.tail || raw {
637 return self.stream_logs_direct(ids, single_daemon, id_width, raw, strip_ansi);
638 }
639
640 let pager_config = PagerConfig::new(true); match pager_config.spawn_piped() {
643 Ok(mut child) => {
644 if let Some(stdin) = child.stdin.take() {
645 let file_infos: Vec<_> = ids
647 .iter()
648 .map(|id| (id.qualified(), id.log_path()))
649 .collect();
650 let single_daemon_clone = single_daemon;
651 let strip_ansi_clone = strip_ansi;
652 let id_width_clone = id_width;
653
654 std::thread::spawn(move || {
656 let mut writer = BufWriter::new(stdin);
657
658 if file_infos.len() == 1 {
660 let (name, path) = &file_infos[0];
661 let file = match File::open(path) {
662 Ok(f) => f,
663 Err(_) => return,
664 };
665 let parser = StreamingLogParser::new(file);
666 for (timestamp, message) in parser {
667 let output = format!(
668 "{}\n",
669 format_log_line(
670 ×tamp,
671 name,
672 &message,
673 single_daemon_clone,
674 id_width_clone,
675 strip_ansi_clone
676 )
677 );
678 if writer.write_all(output.as_bytes()).is_err() {
679 return;
680 }
681 }
682 let _ = writer.flush();
683 return;
684 }
685
686 let mut merger: StreamingMerger<StreamingLogParser> =
688 StreamingMerger::new();
689
690 for (name, path) in file_infos {
691 let file = match File::open(&path) {
692 Ok(f) => f,
693 Err(_) => continue,
694 };
695 let parser = StreamingLogParser::new(file);
696 merger.add_source(name, parser);
697 }
698
699 merger.initialize();
701
702 for (timestamp, daemon, message) in merger {
704 let output = format!(
705 "{}\n",
706 format_log_line(
707 ×tamp,
708 &daemon,
709 &message,
710 single_daemon_clone,
711 id_width_clone,
712 strip_ansi_clone
713 )
714 );
715 if writer.write_all(output.as_bytes()).is_err() {
716 return;
717 }
718 }
719
720 let _ = writer.flush();
721 });
722
723 let _ = child.wait();
724 } else {
725 debug!("Failed to get pager stdin, falling back to direct output");
726 return self.stream_logs_direct(ids, single_daemon, id_width, raw, strip_ansi);
727 }
728 }
729 Err(e) => {
730 debug!("Failed to spawn pager: {e}, falling back to direct output");
731 return self.stream_logs_direct(ids, single_daemon, id_width, raw, strip_ansi);
732 }
733 }
734
735 Ok(())
736 }
737
738 fn stream_logs_direct(
739 &self,
740 ids: &[DaemonId],
741 single_daemon: bool,
742 id_width: usize,
743 raw: bool,
744 strip_ansi: bool,
745 ) -> Result<()> {
746 if ids.len() == 1 {
749 let daemon_id = &ids[0];
750 let path = daemon_id.log_path();
751 let file = match File::open(&path) {
752 Ok(f) => f,
753 Err(e) => {
754 error!("{}: {}", path.display(), e);
755 return Ok(());
756 }
757 };
758 let reader = BufReader::new(file);
759 if raw {
760 for line in reader.lines() {
762 match line {
763 Ok(l) => {
764 let l = if strip_ansi {
765 console::strip_ansi_codes(&l).to_string()
766 } else {
767 l
768 };
769 if io::stdout().write_all(l.as_bytes()).is_err()
770 || io::stdout().write_all(b"\n").is_err()
771 {
772 return Ok(());
773 }
774 }
775 Err(_) => continue,
776 }
777 }
778 } else {
779 let parser = StreamingLogParser::new(File::open(&path).into_diagnostic()?);
781 for (timestamp, message) in parser {
782 let output = format!(
783 "{}\n",
784 format_log_line(
785 ×tamp,
786 &daemon_id.qualified(),
787 &message,
788 single_daemon,
789 id_width,
790 strip_ansi
791 )
792 );
793 if io::stdout().write_all(output.as_bytes()).is_err() {
794 return Ok(());
795 }
796 }
797 }
798 return Ok(());
799 }
800
801 let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();
803
804 for id in ids {
805 let path = id.log_path();
806 let file = match File::open(&path) {
807 Ok(f) => f,
808 Err(e) => {
809 error!("{}: {}", path.display(), e);
810 continue;
811 }
812 };
813 let parser = StreamingLogParser::new(file);
814 merger.add_source(id.qualified(), parser);
815 }
816
817 merger.initialize();
819
820 for (timestamp, daemon, message) in merger {
822 let output = format!(
823 "{}\n",
824 format_log_line(
825 ×tamp,
826 &daemon,
827 &message,
828 single_daemon,
829 id_width,
830 strip_ansi
831 )
832 );
833 if io::stdout().write_all(output.as_bytes()).is_err() {
834 return Ok(());
835 }
836 }
837
838 Ok(())
839 }
840}
841
842fn should_use_pager(line_count: usize) -> bool {
843 if !io::stdout().is_terminal() {
844 return false;
845 }
846
847 let terminal_height = get_terminal_height().unwrap_or(24);
848 line_count > terminal_height
849}
850
851fn get_terminal_height() -> Option<usize> {
852 if let Ok(rows) = std::env::var("LINES")
853 && let Ok(h) = rows.parse::<usize>()
854 {
855 return Some(h);
856 }
857
858 crossterm::terminal::size().ok().map(|(_, h)| h as usize)
859}
860
861fn read_lines_in_time_range(
862 path: &Path,
863 from: Option<DateTime<Local>>,
864 to: Option<DateTime<Local>>,
865) -> Result<(Vec<String>, u64)> {
866 let mut file = File::open(path).into_diagnostic()?;
867 let file_size = file.metadata().into_diagnostic()?.len();
868
869 if file_size == 0 {
870 return Ok((vec![], 0));
871 }
872
873 let start_pos = if let Some(from_time) = from {
874 binary_search_log_position(&mut file, file_size, from_time, true)?
875 } else {
876 0
877 };
878
879 let end_pos = if let Some(to_time) = to {
880 binary_search_log_position(&mut file, file_size, to_time, false)?
881 } else {
882 file_size
883 };
884
885 if start_pos >= end_pos {
886 return Ok((vec![], end_pos));
887 }
888
889 file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
890 let mut reader = BufReader::new(&file);
891 let mut lines = Vec::new();
892 let mut current_pos = start_pos;
893
894 loop {
895 if current_pos >= end_pos {
896 break;
897 }
898
899 let mut line = String::new();
900 match reader.read_line(&mut line) {
901 Ok(0) => break,
902 Ok(bytes_read) => {
903 current_pos += bytes_read as u64;
904 if line.ends_with('\n') {
905 line.pop();
906 if line.ends_with('\r') {
907 line.pop();
908 }
909 }
910 lines.push(line);
911 }
912 Err(_) => break,
913 }
914 }
915
916 Ok((lines, end_pos))
917}
918
919fn binary_search_log_position(
920 file: &mut File,
921 file_size: u64,
922 target_time: DateTime<Local>,
923 find_start: bool,
924) -> Result<u64> {
925 let mut low: u64 = 0;
926 let mut high: u64 = file_size;
927
928 while low < high {
929 let mid = low + (high - low) / 2;
930
931 let line_start = find_line_start(file, mid)?;
932
933 file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
934 let mut reader = BufReader::new(&*file);
935 let mut line = String::new();
936 let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
937 if bytes_read == 0 {
938 high = mid;
939 continue;
940 }
941
942 let line_time = extract_timestamp(&line);
943
944 match line_time {
945 Some(lt) => {
946 if find_start {
947 if lt < target_time {
948 low = line_start + bytes_read as u64;
949 } else {
950 high = line_start;
951 }
952 } else if lt <= target_time {
953 low = line_start + bytes_read as u64;
954 } else {
955 high = line_start;
956 }
957 }
958 None => {
959 low = line_start + bytes_read as u64;
960 }
961 }
962 }
963
964 find_line_start(file, low)
965}
966
967fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
968 if pos == 0 {
969 return Ok(0);
970 }
971
972 let mut search_pos = pos.saturating_sub(1);
974 const CHUNK_SIZE: usize = 8192;
975
976 loop {
977 let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
979 let len_u64 = search_pos - chunk_start + 1;
980 let len = len_u64 as usize;
981
982 file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
984 let mut buf = vec![0u8; len];
985 if file.read_exact(&mut buf).is_err() {
986 return Ok(0);
988 }
989
990 for (i, &b) in buf.iter().enumerate().rev() {
992 if b == b'\n' {
993 return Ok(chunk_start + i as u64 + 1);
994 }
995 }
996
997 if chunk_start == 0 {
1000 return Ok(0);
1001 }
1002
1003 search_pos = chunk_start - 1;
1005 }
1006}
1007
1008fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
1009 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
1010 re.captures(line)
1011 .and_then(|caps| caps.get(1))
1012 .and_then(|m| parse_datetime(m.as_str()).ok())
1013}
1014
1015fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
1016 let lines = if reverse {
1017 lines.into_iter().rev().collect()
1018 } else {
1019 lines
1020 };
1021
1022 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
1023 lines
1024 .into_iter()
1025 .fold(vec![], |mut acc, line| match re.captures(&line) {
1026 Some(caps) => {
1027 let (date, msg) = match (caps.get(1), caps.get(3)) {
1028 (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
1029 _ => return acc,
1030 };
1031 acc.push((date, id.to_string(), msg));
1032 acc
1033 }
1034 None => {
1035 if let Some(l) = acc.last_mut() {
1036 l.2.push('\n');
1037 l.2.push_str(&line);
1038 }
1039 acc
1040 }
1041 })
1042}
1043
1044fn migrate_legacy_log_dirs() {
1054 let known_safe_paths = known_daemon_safe_paths();
1055 let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
1056 Ok(d) => d,
1057 Err(_) => return,
1058 };
1059 for dir in dirs {
1060 if dir.starts_with(".") || !dir.is_dir() {
1061 continue;
1062 }
1063 let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
1064 Some(n) => n,
1065 None => continue,
1066 };
1067 if name.contains("--") {
1070 if DaemonId::from_safe_path(&name).is_ok() {
1073 continue;
1074 }
1075 if known_safe_paths.contains(&name) {
1078 continue;
1079 }
1080 warn!(
1081 "Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
1082 );
1083 continue;
1084 }
1085
1086 let old_log = dir.join(format!("{name}.log"));
1089 if !old_log.exists() {
1090 continue;
1091 }
1092 if DaemonId::try_new("legacy", &name).is_err() {
1093 warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
1094 continue;
1095 }
1096
1097 let new_name = format!("legacy--{name}");
1098 let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
1099 if new_dir.exists() {
1101 continue;
1102 }
1103 if std::fs::rename(&dir, &new_dir).is_err() {
1104 continue;
1105 }
1106 let old_log = new_dir.join(format!("{name}.log"));
1108 let new_log = new_dir.join(format!("{new_name}.log"));
1109 if old_log.exists() {
1110 let _ = std::fs::rename(&old_log, &new_log);
1111 }
1112 debug!("Migrated legacy log dir '{name}' → '{new_name}'");
1113 }
1114}
1115
1116fn known_daemon_safe_paths() -> BTreeSet<String> {
1117 let mut out = BTreeSet::new();
1118
1119 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1120 Ok(state) => {
1121 for id in state.daemons.keys() {
1122 out.insert(id.safe_path());
1123 }
1124 }
1125 Err(e) => {
1126 warn!("Failed to read state while checking known daemon IDs: {e}");
1127 }
1128 }
1129
1130 match PitchforkToml::all_merged() {
1131 Ok(config) => {
1132 for id in config.daemons.keys() {
1133 out.insert(id.safe_path());
1134 }
1135 }
1136 Err(e) => {
1137 warn!("Failed to read config while checking known daemon IDs: {e}");
1138 }
1139 }
1140
1141 out
1142}
1143
1144fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
1145 let mut ids = BTreeSet::new();
1146
1147 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
1148 Ok(state) => ids.extend(state.daemons.keys().cloned()),
1149 Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
1150 }
1151
1152 match PitchforkToml::all_merged() {
1153 Ok(config) => ids.extend(config.daemons.keys().cloned()),
1154 Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
1155 }
1156
1157 Ok(ids
1158 .into_iter()
1159 .filter(|id| id.log_path().exists())
1160 .collect())
1161}
1162
1163pub async fn tail_logs(
1164 names: &[DaemonId],
1165 single_daemon: bool,
1166 start_from_end: bool,
1167) -> Result<()> {
1168 let id_width = names
1182 .iter()
1183 .map(|id| id.qualified().len())
1184 .max()
1185 .unwrap_or(0);
1186
1187 let mut states: Vec<(DaemonId, PathBuf, u64)> = names
1188 .iter()
1189 .filter_map(|id| {
1190 let path = id.log_path();
1191 if !path.exists() {
1192 return None;
1193 }
1194 let pos = if start_from_end {
1195 fs::metadata(&path).map(|m| m.len()).unwrap_or(0)
1196 } else {
1197 0
1198 };
1199 Some((id.clone(), path, pos))
1200 })
1201 .collect();
1202
1203 let strip_ansi = !console::colors_enabled();
1204
1205 let interval = tokio::time::interval(Duration::from_millis(200));
1206 tokio::pin!(interval);
1207
1208 loop {
1209 interval.tick().await;
1210
1211 for id in names {
1215 let path = id.log_path();
1216 if !path.exists() || states.iter().any(|(s, _, _)| s == id) {
1217 continue;
1218 }
1219 states.push((id.clone(), path, 0));
1220 }
1221
1222 let mut out = vec![];
1223 for (id, path, pos) in &mut states {
1224 let mut file = match fs::File::open(path) {
1225 Ok(f) => f,
1226 Err(_) => continue,
1227 };
1228 let file_size = match file.metadata() {
1229 Ok(m) => m.len(),
1230 Err(_) => continue,
1231 };
1232 let start = if *pos > file_size { 0 } else { *pos };
1233 file.seek(SeekFrom::Start(start)).into_diagnostic()?;
1234
1235 let mut reader = BufReader::new(&file);
1239 let mut bytes_read: u64 = 0;
1240 let mut lines = vec![];
1241 loop {
1242 let mut line = String::new();
1243 let n = reader.read_line(&mut line).into_diagnostic()?;
1244 if n == 0 {
1245 break;
1246 }
1247 if line.ends_with('\n') {
1251 bytes_read += n as u64;
1252 line.pop();
1253 if line.ends_with('\r') {
1254 line.pop();
1255 }
1256 lines.push(line);
1257 } else {
1258 break;
1260 }
1261 }
1262 *pos = start + bytes_read;
1263 out.extend(merge_log_lines(&id.qualified(), lines, false));
1264 }
1265
1266 if !out.is_empty() {
1267 let out = out
1268 .into_iter()
1269 .sorted_by_cached_key(|l| l.0.to_string())
1270 .collect_vec();
1271 for (date, name, msg) in out {
1272 println!(
1273 "{}",
1274 format_log_line(&date, &name, &msg, single_daemon, id_width, strip_ansi)
1275 );
1276 }
1277 }
1278 }
1279}
1280
1281fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
1282 let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
1283 Local
1284 .from_local_datetime(&naive_dt)
1285 .single()
1286 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
1287}
1288
1289fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
1295 let s = s.trim();
1296
1297 if let Ok(dt) = parse_datetime(s) {
1299 return Ok(dt);
1300 }
1301
1302 if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
1304 return Local
1305 .from_local_datetime(&naive_dt)
1306 .single()
1307 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
1308 }
1309
1310 if let Ok(time) = parse_time_only(s) {
1314 let now = Local::now();
1315 let today = now.date_naive();
1316 let mut naive_dt = NaiveDateTime::new(today, time);
1317 let mut dt = Local
1318 .from_local_datetime(&naive_dt)
1319 .single()
1320 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1321
1322 if is_since
1325 && dt > now
1326 && let Some(yesterday) = today.pred_opt()
1327 {
1328 naive_dt = NaiveDateTime::new(yesterday, time);
1329 dt = Local
1330 .from_local_datetime(&naive_dt)
1331 .single()
1332 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1333 }
1334 return Ok(dt);
1335 }
1336
1337 if let Ok(duration) = humantime::parse_duration(s) {
1338 let now = Local::now();
1339 let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
1340 return Ok(target);
1341 }
1342
1343 Err(miette::miette!(
1344 "Invalid time format: '{}'. Expected formats:\n\
1345 - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
1346 - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
1347 - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
1348 s
1349 ))
1350}
1351
1352fn parse_time_only(s: &str) -> Result<NaiveTime> {
1353 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1354 return Ok(time);
1355 }
1356
1357 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1358 return Ok(time);
1359 }
1360
1361 Err(miette::miette!("Invalid time format: '{}'", s))
1362}
1363
1364pub fn print_error_logs_block(log_lines: &[(String, String, String)]) {
1374 if log_lines.is_empty() {
1375 return;
1376 }
1377
1378 let is_tty = std::io::stderr().is_terminal();
1379 let format_msg = |msg: &str| -> String {
1380 let stripped = strip_pty_controls(msg);
1381 if is_tty {
1382 stripped
1383 } else {
1384 console::strip_ansi_codes(&stripped).to_string()
1385 }
1386 };
1387
1388 let tag = estyle(" ERROR LOGS ").white().on_red();
1389 eprintln!("\n{tag}");
1390
1391 let unique_ids: BTreeSet<&str> = log_lines.iter().map(|(_, id, _)| id.as_str()).collect();
1393 let show_id = unique_ids.len() > 1;
1394
1395 if show_id {
1396 let id_width = log_lines
1397 .iter()
1398 .map(|(_, id, _)| console::measure_text_width(id))
1399 .max()
1400 .unwrap_or(0);
1401 for (date, id, msg) in log_lines {
1402 let time = date.split(' ').nth(1).unwrap_or(date);
1403 let colored = dimmed_id(id, is_tty && console::colors_enabled_stderr());
1404 let padded = console::pad_str(&colored, id_width, console::Alignment::Left, None);
1405 eprintln!(
1406 "{} {} {}",
1407 padded,
1408 estyle(time).red().dim(),
1409 format_msg(msg)
1410 );
1411 }
1412 } else {
1413 for (date, _, msg) in log_lines {
1414 let time = date.split(' ').nth(1).unwrap_or(date);
1415 eprintln!("{} {}", estyle(time).red().dim(), format_msg(msg));
1416 }
1417 }
1418}
1419
1420pub enum ReadyCheckType {
1422 Output(String),
1423 Http(String),
1424 Port(u16),
1425 Cmd(String),
1426 Delay(u64),
1427 Default,
1428}
1429
1430impl std::fmt::Display for ReadyCheckType {
1431 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1432 match self {
1433 ReadyCheckType::Output(pattern) => write!(f, "output matching '{pattern}'"),
1434 ReadyCheckType::Http(url) => write!(f, "HTTP {url}"),
1435 ReadyCheckType::Port(port) => write!(f, "TCP port {port}"),
1436 ReadyCheckType::Cmd(cmd) => write!(f, "command '{cmd}'"),
1437 ReadyCheckType::Delay(secs) => write!(f, "delay ({secs}s)"),
1438 ReadyCheckType::Default => write!(f, "default readiness check"),
1439 }
1440 }
1441}
1442
1443pub fn create_ready_check_job(
1449 daemon_id: &DaemonId,
1450 check_type: &ReadyCheckType,
1451) -> std::sync::Arc<clx::progress::ProgressJob> {
1452 use clx::progress::{ProgressJobBuilder, ProgressJobDoneBehavior, ProgressStatus};
1453
1454 let is_tty = std::io::stderr().is_terminal();
1455 let colors_enabled = is_tty && console::colors_enabled_stderr();
1456 let id_label = colored_id_label(&daemon_id.qualified(), colors_enabled);
1457 let show_ts = crate::settings::settings().general.startup_log_timestamps;
1458
1459 let prefix = if show_ts {
1463 edim(chrono::Local::now().format("%H:%M:%S").to_string()).to_string()
1466 } else {
1467 "{{spinner()}}".to_string()
1468 };
1469
1470 ProgressJobBuilder::new()
1471 .body(format!(
1472 "{} {} waiting for {{{{ check_type }}}}...",
1473 prefix, id_label
1474 ))
1475 .prop("check_type", &check_type.to_string())
1476 .status(ProgressStatus::Running)
1477 .on_done(ProgressJobDoneBehavior::Keep)
1478 .start()
1479}
1480
1481pub fn collect_startup_logs(
1486 daemon_id: &DaemonId,
1487 from: DateTime<Local>,
1488) -> Result<Vec<(String, String, String)>> {
1489 let from = from
1490 .with_nanosecond(0)
1491 .expect("0 is always valid for nanoseconds");
1492
1493 let path = daemon_id.log_path();
1494 let log_lines = if path.exists() {
1495 match read_lines_in_time_range(&path, Some(from), None) {
1496 Ok((lines, _)) => merge_log_lines(&daemon_id.qualified(), lines, false),
1497 Err(e) => {
1498 error!("{}: {}", path.display(), e);
1499 vec![]
1500 }
1501 }
1502 } else {
1503 vec![]
1504 };
1505
1506 Ok(log_lines)
1507}
1508
1509fn read_from_offset(path: &Path, offset: u64) -> Result<(Vec<String>, u64)> {
1514 let mut file = File::open(path).into_diagnostic()?;
1515 file.seek(SeekFrom::Start(offset)).into_diagnostic()?;
1516 let mut buf = String::new();
1517 file.read_to_string(&mut buf).into_diagnostic()?;
1518
1519 if let Some(last_newline) = buf.rfind('\n') {
1522 let complete = &buf[..=last_newline];
1523 let new_offset = offset + (last_newline + 1) as u64;
1524 let lines = complete.lines().map(String::from).collect();
1525 Ok((lines, new_offset))
1526 } else {
1527 Ok((vec![], offset))
1529 }
1530}
1531
1532pub fn stream_startup_logs(
1538 daemon_id: &DaemonId,
1539 from: DateTime<Local>,
1540 job: std::sync::Arc<clx::progress::ProgressJob>,
1541) -> (
1542 tokio::sync::watch::Sender<bool>,
1543 tokio::task::JoinHandle<()>,
1544) {
1545 let (tx, mut rx) = tokio::sync::watch::channel(false);
1546 let id = daemon_id.clone();
1547 let from = from
1548 .with_nanosecond(0)
1549 .expect("0 is always valid for nanoseconds");
1550
1551 let show_ts = crate::settings::settings().general.startup_log_timestamps;
1552
1553 let handle = tokio::spawn(async move {
1554 let path = id.log_path();
1555 let is_tty = std::io::stderr().is_terminal();
1556 let colors_enabled = is_tty && console::colors_enabled_stderr();
1557 let id_label = colored_id_label(&id.qualified(), colors_enabled);
1558 let prefix = if show_ts {
1559 String::new()
1561 } else {
1562 edim("•").to_string()
1563 };
1564
1565 for _ in 0..20 {
1567 if path.exists() {
1568 break;
1569 }
1570 tokio::select! {
1571 _ = tokio::time::sleep(Duration::from_millis(100)) => {}
1572 _ = rx.changed() => return,
1573 }
1574 }
1575 if !path.exists() {
1576 return;
1577 }
1578
1579 let mut offset = std::fs::metadata(&path).map(|m| m.len()).unwrap_or(0);
1583 if let Ok((lines, read_end)) = read_lines_in_time_range(&path, Some(from), None) {
1584 let merged = merge_log_lines(&id.qualified(), lines, false);
1585 for (date, _, msg) in &merged {
1586 let time = date.split(' ').nth(1).unwrap_or(date);
1587 let msg = strip_pty_controls(msg);
1588 let msg = if is_tty {
1589 msg
1590 } else {
1591 console::strip_ansi_codes(&msg).to_string()
1592 };
1593 let line_prefix = if show_ts {
1594 edim(time).to_string()
1595 } else {
1596 prefix.clone()
1597 };
1598 job.println(&format!("{} {} {}", line_prefix, id_label, msg));
1599 }
1600 offset = read_end;
1601 }
1602 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) ([\w./-]+) (.*)$");
1603
1604 loop {
1606 tokio::select! {
1607 _ = tokio::time::sleep(Duration::from_millis(200)) => {
1608 if let Ok(metadata) = std::fs::metadata(&path) {
1609 if metadata.len() > offset {
1610 if let Ok((new_lines, new_offset)) = read_from_offset(&path, offset) {
1611 for line in &new_lines {
1612 if let Some(caps) = re.captures(line) {
1613 let time = caps.get(1).map(|m| m.as_str()).unwrap_or("");
1614 let time = time.split(' ').nth(1).unwrap_or(time);
1615 let msg = caps.get(3).map(|m| m.as_str()).unwrap_or("");
1616 let msg = strip_pty_controls(msg);
1617 let msg = if is_tty {
1618 msg
1619 } else {
1620 console::strip_ansi_codes(&msg).to_string()
1621 };
1622 let line_prefix = if show_ts { edim(time).to_string() } else { prefix.clone() };
1623 job.println(&format!("{} {} {}", line_prefix, id_label, msg));
1624 }
1625 }
1626 offset = new_offset;
1627 }
1628 }
1629 }
1630 }
1631 _ = rx.changed() => {
1632 break;
1633 }
1634 }
1635 }
1636
1637 if let Ok((new_lines, _new_offset)) = read_from_offset(&path, offset) {
1641 for line in &new_lines {
1642 if let Some(caps) = re.captures(line) {
1643 let time = caps.get(1).map(|m| m.as_str()).unwrap_or("");
1644 let time = time.split(' ').nth(1).unwrap_or(time);
1645 let msg = caps.get(3).map(|m| m.as_str()).unwrap_or("");
1646 let msg = strip_pty_controls(msg);
1647 let msg = if is_tty {
1648 msg
1649 } else {
1650 console::strip_ansi_codes(&msg).to_string()
1651 };
1652 let line_prefix = if show_ts {
1653 edim(time).to_string()
1654 } else {
1655 prefix.clone()
1656 };
1657 job.println(&format!("{} {} {}", line_prefix, id_label, msg));
1658 }
1659 }
1660 }
1661 });
1662
1663 (tx, handle)
1664}
1665
1666fn strip_pty_controls(s: &str) -> String {
1671 struct Stripper {
1672 result: String,
1673 }
1674
1675 impl vte::Perform for Stripper {
1676 fn print(&mut self, c: char) {
1677 self.result.push(c);
1678 }
1679
1680 fn execute(&mut self, byte: u8) {
1681 if byte == b'\n' || byte == b'\t' {
1683 self.result.push(byte as char);
1684 }
1685 }
1686
1687 fn csi_dispatch(
1688 &mut self,
1689 params: &vte::Params,
1690 _intermediates: &[u8],
1691 _ignore: bool,
1692 action: char,
1693 ) {
1694 if action == 'm' {
1696 self.result.push_str("\x1b[");
1697 let mut first = true;
1698 for sub in params.iter() {
1699 if !first {
1700 self.result.push(';');
1701 }
1702 first = false;
1703 for (i, &p) in sub.iter().enumerate() {
1704 if i > 0 {
1705 self.result.push(':');
1706 }
1707 self.result.push_str(&p.to_string());
1708 }
1709 }
1710 self.result.push('m');
1711 }
1712 }
1714
1715 fn osc_dispatch(&mut self, _params: &[&[u8]], _bell_terminated: bool) {
1716 }
1718
1719 fn esc_dispatch(&mut self, _intermediates: &[u8], _ignore: bool, _byte: u8) {
1720 }
1722
1723 fn hook(
1724 &mut self,
1725 _params: &vte::Params,
1726 _intermediates: &[u8],
1727 _ignore: bool,
1728 _action: char,
1729 ) {
1730 }
1732
1733 fn put(&mut self, _byte: u8) {
1734 }
1736
1737 fn unhook(&mut self) {
1738 }
1740 }
1741
1742 let mut parser = vte::Parser::new();
1743 let mut stripper = Stripper {
1744 result: String::with_capacity(s.len()),
1745 };
1746 parser.advance(&mut stripper, s.as_bytes());
1747 stripper.result
1748}