1use crate::ui::style::edim;
2use crate::watch_files::WatchFiles;
3use crate::{Result, env};
4use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone, Timelike};
5use itertools::Itertools;
6use miette::IntoDiagnostic;
7use notify::RecursiveMode;
8use std::cmp::{Ordering, Reverse};
9use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet};
10use std::fs::{self, File};
11use std::io::{self, BufRead, BufReader, BufWriter, IsTerminal, Read, Seek, SeekFrom, Write};
12use std::path::{Path, PathBuf};
13use std::process::{Child, Command, Stdio};
14use std::time::Duration;
15use xx::regex;
16
/// Command line used to launch the external pager.
struct PagerConfig {
    /// Program to execute; taken from `$PAGER`, falling back to `less`.
    command: String,
    /// Extra arguments passed to `command`.
    args: Vec<String>,
}

impl PagerConfig {
    /// Builds the pager configuration from the environment.
    ///
    /// `start_at_end` asks the pager to open positioned at the end of the
    /// input (only honoured for `less`).
    fn new(start_at_end: bool) -> Self {
        let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
        let args = Self::build_args(&command, start_at_end);
        Self { command, args }
    }

    /// Returns the arguments to pass to `pager`.
    ///
    /// `less` is recognised by the file name of the executable, so both
    /// `less` and a path such as `/usr/bin/less` get `-R` (render ANSI colour
    /// escapes) and, when `start_at_end` is set, `+G` (jump to the end).
    /// Any other pager gets no extra arguments.
    fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
        let is_less = Path::new(pager).file_name().and_then(|name| name.to_str()) == Some("less");

        let mut args = vec![];
        if is_less {
            args.push("-R".to_string());
            if start_at_end {
                args.push("+G".to_string());
            }
        }
        args
    }

    /// Spawns the pager with a piped stdin so log lines can be written to it.
    fn spawn_piped(&self) -> io::Result<Child> {
        Command::new(&self.command)
            .args(&self.args)
            .stdin(Stdio::piped())
            .spawn()
    }
}
51
52fn format_log_line(date: &str, id: &str, msg: &str, single_daemon: bool) -> String {
55 if single_daemon {
56 format!("{} {}", edim(date), msg)
57 } else {
58 format!("{} {} {}", edim(date), id, msg)
59 }
60}
61
/// One parsed log entry waiting in the merge heap.
#[derive(Debug)]
struct LogEntry {
    /// Timestamp text as it appears in the log file; compared lexically.
    timestamp: String,
    /// Name of the daemon the entry belongs to.
    daemon: String,
    /// Message text (may contain embedded newlines for multi-line entries).
    message: String,
    /// Index of the source iterator that produced this entry.
    source_idx: usize,
}

impl PartialEq for LogEntry {
    fn eq(&self, other: &Self) -> bool {
        // Defined through `cmp` so equality can never disagree with the ordering.
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for LogEntry {}

impl PartialOrd for LogEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for LogEntry {
    /// Orders by timestamp, then by source index.
    ///
    /// The source-index tie-break makes the pop order of same-timestamp
    /// entries deterministic: lower-numbered sources come first, which is the
    /// same order a stable sort over the concatenated sources would produce.
    fn cmp(&self, other: &Self) -> Ordering {
        self.timestamp
            .cmp(&other.timestamp)
            .then_with(|| self.source_idx.cmp(&other.source_idx))
    }
}
90
91struct StreamingMerger<I>
94where
95 I: Iterator<Item = (String, String)>,
96{
97 sources: Vec<(String, I)>, heap: BinaryHeap<Reverse<LogEntry>>, }
100
101impl<I> StreamingMerger<I>
102where
103 I: Iterator<Item = (String, String)>,
104{
105 fn new() -> Self {
106 Self {
107 sources: Vec::new(),
108 heap: BinaryHeap::new(),
109 }
110 }
111
112 fn add_source(&mut self, daemon_name: String, iter: I) {
113 self.sources.push((daemon_name, iter));
114 }
115
116 fn initialize(&mut self) {
117 for (idx, (daemon, iter)) in self.sources.iter_mut().enumerate() {
119 if let Some((timestamp, message)) = iter.next() {
120 self.heap.push(Reverse(LogEntry {
121 timestamp,
122 daemon: daemon.clone(),
123 message,
124 source_idx: idx,
125 }));
126 }
127 }
128 }
129}
130
131impl<I> Iterator for StreamingMerger<I>
132where
133 I: Iterator<Item = (String, String)>,
134{
135 type Item = (String, String, String); fn next(&mut self) -> Option<Self::Item> {
138 let Reverse(entry) = self.heap.pop()?;
140
141 let (daemon, iter) = &mut self.sources[entry.source_idx];
143 if let Some((timestamp, message)) = iter.next() {
144 self.heap.push(Reverse(LogEntry {
145 timestamp,
146 daemon: daemon.clone(),
147 message,
148 source_idx: entry.source_idx,
149 }));
150 }
151
152 Some((entry.timestamp, entry.daemon, entry.message))
153 }
154}
155
/// Reads a log file line by line and yields `(timestamp, message)` entries,
/// folding lines without a timestamp header into the preceding entry.
struct StreamingLogParser {
    /// Buffered handle on the log file being parsed.
    reader: BufReader<File>,
    /// Entry whose header has been read but which has not been yielded yet;
    /// continuation lines are appended to its message.
    current_entry: Option<(String, String)>,
    /// Set once end of file or a read error was hit.
    finished: bool,
}
162
163impl StreamingLogParser {
164 fn new(file: File) -> Self {
165 Self {
166 reader: BufReader::new(file),
167 current_entry: None,
168 finished: false,
169 }
170 }
171}
172
173impl Iterator for StreamingLogParser {
174 type Item = (String, String);
175
176 fn next(&mut self) -> Option<Self::Item> {
177 if self.finished {
178 return None;
179 }
180
181 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (\w+) (.*)$");
182
183 loop {
184 let mut line = String::new();
185 match self.reader.read_line(&mut line) {
186 Ok(0) => {
187 self.finished = true;
189 return self.current_entry.take();
190 }
191 Ok(_) => {
192 if line.ends_with('\n') {
194 line.pop();
195 if line.ends_with('\r') {
196 line.pop();
197 }
198 }
199
200 if let Some(caps) = re.captures(&line) {
201 let date = match caps.get(1) {
202 Some(d) => d.as_str().to_string(),
203 None => continue,
204 };
205 let msg = match caps.get(3) {
206 Some(m) => m.as_str().to_string(),
207 None => continue,
208 };
209
210 let prev = self.current_entry.take();
212 self.current_entry = Some((date, msg));
213
214 if prev.is_some() {
215 return prev;
216 }
217 } else {
219 if let Some((_, ref mut msg)) = self.current_entry {
221 msg.push('\n');
222 msg.push_str(&line);
223 }
224 }
225 }
226 Err(_) => {
227 self.finished = true;
228 return self.current_entry.take();
229 }
230 }
231 }
232 }
233}
234
235#[derive(Debug, clap::Args)]
237#[clap(
238 visible_alias = "l",
239 verbatim_doc_comment,
240 long_about = "\
241Displays logs for daemon(s)
242
243Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
244and include timestamps for filtering.
245
246Examples:
247 pitchfork logs api Show all logs for 'api' (paged if needed)
248 pitchfork logs api worker Show logs for multiple daemons
249 pitchfork logs Show logs for all daemons
250 pitchfork logs api -n 50 Show last 50 lines
251 pitchfork logs api --follow Follow logs in real-time
252 pitchfork logs api --since '2024-01-15 10:00:00'
253 Show logs since a specific time (forward)
254 pitchfork logs api --since '10:30:00'
255 Show logs since 10:30:00 today
256 pitchfork logs api --since '10:30' --until '12:00'
257 Show logs since 10:30:00 until 12:00:00 today
258 pitchfork logs api --since 5min Show logs from last 5 minutes
259 pitchfork logs api --raw Output raw log lines without formatting
260 pitchfork logs api --raw -n 100 Output last 100 raw log lines
261 pitchfork logs api --clear Delete logs for 'api'
262 pitchfork logs --clear Delete logs for all daemons"
263)]
264pub struct Logs {
265 id: Vec<String>,
267
268 #[clap(short, long)]
270 clear: bool,
271
272 #[clap(short)]
277 n: Option<usize>,
278
279 #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
281 tail: bool,
282
283 #[clap(short = 's', long)]
290 since: Option<String>,
291
292 #[clap(short = 'u', long)]
298 until: Option<String>,
299
300 #[clap(long)]
302 no_pager: bool,
303
304 #[clap(long)]
306 raw: bool,
307}
308
309impl Logs {
310 pub async fn run(&self) -> Result<()> {
311 if self.clear {
312 let ids = if self.id.is_empty() {
313 get_all_daemon_ids()?
315 } else {
316 self.id.clone()
317 };
318 for id in &ids {
319 let log_dir = env::PITCHFORK_LOGS_DIR.join(id);
320 let path = log_dir.join(format!("{id}.log"));
321 if path.exists() {
322 xx::file::create(&path)?;
323 }
324 }
325 return Ok(());
326 }
327
328 let from = if let Some(since) = self.since.as_ref() {
329 Some(parse_time_input(since, true)?)
330 } else {
331 None
332 };
333 let to = if let Some(until) = self.until.as_ref() {
334 Some(parse_time_input(until, false)?)
335 } else {
336 None
337 };
338
339 self.print_existing_logs(from, to)?;
340 if self.tail {
341 tail_logs(&self.id).await?;
342 }
343
344 Ok(())
345 }
346
347 fn print_existing_logs(
348 &self,
349 from: Option<DateTime<Local>>,
350 to: Option<DateTime<Local>>,
351 ) -> Result<()> {
352 let log_files = get_log_file_infos(&self.id)?;
353 trace!("log files for: {}", log_files.keys().join(", "));
354 let single_daemon = self.id.len() == 1;
355 let has_time_filter = from.is_some() || to.is_some();
356
357 if has_time_filter {
358 let mut log_lines = self.collect_log_lines_forward(&log_files, from, to)?;
359
360 if let Some(n) = self.n {
361 let len = log_lines.len();
362 if len > n {
363 log_lines = log_lines.into_iter().skip(len - n).collect_vec();
364 }
365 }
366
367 self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
368 } else if let Some(n) = self.n {
369 let log_lines = self.collect_log_lines_reverse(&log_files, Some(n))?;
370 self.output_logs(log_lines, single_daemon, has_time_filter, self.raw)?;
371 } else {
372 self.stream_logs_to_pager(&log_files, single_daemon, self.raw)?;
373 }
374
375 Ok(())
376 }
377
378 fn collect_log_lines_forward(
379 &self,
380 log_files: &BTreeMap<String, LogFile>,
381 from: Option<DateTime<Local>>,
382 to: Option<DateTime<Local>>,
383 ) -> Result<Vec<(String, String, String)>> {
384 let log_lines: Vec<(String, String, String)> = log_files
385 .iter()
386 .flat_map(
387 |(name, lf)| match read_lines_in_time_range(&lf.path, from, to) {
388 Ok(lines) => merge_log_lines(name, lines, false),
389 Err(e) => {
390 error!("{}: {}", lf.path.display(), e);
391 vec![]
392 }
393 },
394 )
395 .sorted_by_cached_key(|l| l.0.to_string())
396 .collect_vec();
397
398 Ok(log_lines)
399 }
400
401 fn collect_log_lines_reverse(
402 &self,
403 log_files: &BTreeMap<String, LogFile>,
404 limit: Option<usize>,
405 ) -> Result<Vec<(String, String, String)>> {
406 let log_lines: Vec<(String, String, String)> = log_files
407 .iter()
408 .flat_map(|(name, lf)| {
409 let rev = match xx::file::open(&lf.path) {
410 Ok(f) => rev_lines::RevLines::new(f),
411 Err(e) => {
412 error!("{}: {}", lf.path.display(), e);
413 return vec![];
414 }
415 };
416 let lines = rev.into_iter().filter_map(Result::ok);
417 let lines = match limit {
418 Some(n) => lines.take(n).collect_vec(),
419 None => lines.collect_vec(),
420 };
421 merge_log_lines(name, lines, true)
422 })
423 .sorted_by_cached_key(|l| l.0.to_string())
424 .collect_vec();
425
426 let log_lines = match limit {
427 Some(n) => {
428 let len = log_lines.len();
429 if len > n {
430 log_lines.into_iter().skip(len - n).collect_vec()
431 } else {
432 log_lines
433 }
434 }
435 None => log_lines,
436 };
437
438 Ok(log_lines)
439 }
440
441 fn output_logs(
442 &self,
443 log_lines: Vec<(String, String, String)>,
444 single_daemon: bool,
445 has_time_filter: bool,
446 raw: bool,
447 ) -> Result<()> {
448 if log_lines.is_empty() {
449 return Ok(());
450 }
451
452 if raw {
454 for (date, _, msg) in log_lines {
455 println!("{} {}", date, msg);
456 }
457 return Ok(());
458 }
459
460 let use_pager = !self.no_pager && should_use_pager(log_lines.len());
461
462 if use_pager {
463 self.output_with_pager(log_lines, single_daemon, has_time_filter)?;
464 } else {
465 for (date, id, msg) in log_lines {
466 println!("{}", format_log_line(&date, &id, &msg, single_daemon));
467 }
468 }
469
470 Ok(())
471 }
472
473 fn output_with_pager(
474 &self,
475 log_lines: Vec<(String, String, String)>,
476 single_daemon: bool,
477 has_time_filter: bool,
478 ) -> Result<()> {
479 let pager_config = PagerConfig::new(!has_time_filter);
481
482 match pager_config.spawn_piped() {
483 Ok(mut child) => {
484 if let Some(stdin) = child.stdin.as_mut() {
485 for (date, id, msg) in log_lines {
486 let line =
487 format!("{}\n", format_log_line(&date, &id, &msg, single_daemon));
488 if stdin.write_all(line.as_bytes()).is_err() {
489 break;
490 }
491 }
492 let _ = child.wait();
493 } else {
494 debug!("Failed to get pager stdin, falling back to direct output");
495 for (date, id, msg) in log_lines {
496 println!("{}", format_log_line(&date, &id, &msg, single_daemon));
497 }
498 }
499 }
500 Err(e) => {
501 debug!(
502 "Failed to spawn pager: {}, falling back to direct output",
503 e
504 );
505 for (date, id, msg) in log_lines {
506 println!("{}", format_log_line(&date, &id, &msg, single_daemon));
507 }
508 }
509 }
510
511 Ok(())
512 }
513
514 fn stream_logs_to_pager(
515 &self,
516 log_files: &BTreeMap<String, LogFile>,
517 single_daemon: bool,
518 raw: bool,
519 ) -> Result<()> {
520 if !io::stdout().is_terminal() || self.no_pager || raw {
521 return self.stream_logs_direct(log_files, single_daemon, raw);
522 }
523
524 let pager_config = PagerConfig::new(true); match pager_config.spawn_piped() {
527 Ok(mut child) => {
528 if let Some(stdin) = child.stdin.take() {
529 let log_files_clone: Vec<_> = log_files
531 .iter()
532 .map(|(name, lf)| (name.clone(), lf.path.clone()))
533 .collect();
534 let single_daemon_clone = single_daemon;
535
536 std::thread::spawn(move || {
538 let mut writer = BufWriter::new(stdin);
539
540 if log_files_clone.len() == 1 {
542 let (name, path) = &log_files_clone[0];
543 let file = match File::open(path) {
544 Ok(f) => f,
545 Err(_) => return,
546 };
547 let parser = StreamingLogParser::new(file);
548 for (timestamp, message) in parser {
549 let output = format!(
550 "{}\n",
551 format_log_line(
552 ×tamp,
553 name,
554 &message,
555 single_daemon_clone
556 )
557 );
558 if writer.write_all(output.as_bytes()).is_err() {
559 return;
560 }
561 }
562 let _ = writer.flush();
563 return;
564 }
565
566 let mut merger: StreamingMerger<StreamingLogParser> =
568 StreamingMerger::new();
569
570 for (name, path) in log_files_clone {
571 let file = match File::open(&path) {
572 Ok(f) => f,
573 Err(_) => continue,
574 };
575 let parser = StreamingLogParser::new(file);
576 merger.add_source(name, parser);
577 }
578
579 merger.initialize();
581
582 for (timestamp, daemon, message) in merger {
584 let output = format!(
585 "{}\n",
586 format_log_line(×tamp, &daemon, &message, single_daemon_clone)
587 );
588 if writer.write_all(output.as_bytes()).is_err() {
589 return;
590 }
591 }
592
593 let _ = writer.flush();
594 });
595
596 let _ = child.wait();
597 } else {
598 debug!("Failed to get pager stdin, falling back to direct output");
599 return self.stream_logs_direct(log_files, single_daemon, raw);
600 }
601 }
602 Err(e) => {
603 debug!(
604 "Failed to spawn pager: {}, falling back to direct output",
605 e
606 );
607 return self.stream_logs_direct(log_files, single_daemon, raw);
608 }
609 }
610
611 Ok(())
612 }
613
614 fn stream_logs_direct(
615 &self,
616 log_files: &BTreeMap<String, LogFile>,
617 single_daemon: bool,
618 raw: bool,
619 ) -> Result<()> {
620 if log_files.len() == 1 {
623 let (name, lf) = log_files.iter().next().unwrap();
624 let file = match File::open(&lf.path) {
625 Ok(f) => f,
626 Err(e) => {
627 error!("{}: {}", lf.path.display(), e);
628 return Ok(());
629 }
630 };
631 let reader = BufReader::new(file);
632 if raw {
633 for line in reader.lines() {
635 match line {
636 Ok(l) => {
637 if io::stdout().write_all(l.as_bytes()).is_err()
638 || io::stdout().write_all(b"\n").is_err()
639 {
640 return Ok(());
641 }
642 }
643 Err(_) => continue,
644 }
645 }
646 } else {
647 let parser = StreamingLogParser::new(File::open(&lf.path).into_diagnostic()?);
649 for (timestamp, message) in parser {
650 let output = format!(
651 "{}\n",
652 format_log_line(×tamp, name, &message, single_daemon)
653 );
654 if io::stdout().write_all(output.as_bytes()).is_err() {
655 return Ok(());
656 }
657 }
658 }
659 return Ok(());
660 }
661
662 let mut merger: StreamingMerger<StreamingLogParser> = StreamingMerger::new();
664
665 for (name, lf) in log_files {
666 let file = match File::open(&lf.path) {
667 Ok(f) => f,
668 Err(e) => {
669 error!("{}: {}", lf.path.display(), e);
670 continue;
671 }
672 };
673 let parser = StreamingLogParser::new(file);
674 merger.add_source(name.clone(), parser);
675 }
676
677 merger.initialize();
679
680 for (timestamp, daemon, message) in merger {
682 let output = if raw {
683 format!("{} {}\n", timestamp, message)
684 } else {
685 format!(
686 "{}\n",
687 format_log_line(×tamp, &daemon, &message, single_daemon)
688 )
689 };
690 if io::stdout().write_all(output.as_bytes()).is_err() {
691 return Ok(());
692 }
693 }
694
695 Ok(())
696 }
697}
698
699fn should_use_pager(line_count: usize) -> bool {
700 if !io::stdout().is_terminal() {
701 return false;
702 }
703
704 let terminal_height = get_terminal_height().unwrap_or(24);
705 line_count > terminal_height
706}
707
708fn get_terminal_height() -> Option<usize> {
709 if let Ok(rows) = std::env::var("LINES")
710 && let Ok(h) = rows.parse::<usize>()
711 {
712 return Some(h);
713 }
714
715 crossterm::terminal::size().ok().map(|(_, h)| h as usize)
716}
717
718fn read_lines_in_time_range(
719 path: &Path,
720 from: Option<DateTime<Local>>,
721 to: Option<DateTime<Local>>,
722) -> Result<Vec<String>> {
723 let mut file = File::open(path).into_diagnostic()?;
724 let file_size = file.metadata().into_diagnostic()?.len();
725
726 if file_size == 0 {
727 return Ok(vec![]);
728 }
729
730 let start_pos = if let Some(from_time) = from {
731 binary_search_log_position(&mut file, file_size, from_time, true)?
732 } else {
733 0
734 };
735
736 let end_pos = if let Some(to_time) = to {
737 binary_search_log_position(&mut file, file_size, to_time, false)?
738 } else {
739 file_size
740 };
741
742 if start_pos >= end_pos {
743 return Ok(vec![]);
744 }
745
746 file.seek(SeekFrom::Start(start_pos)).into_diagnostic()?;
747 let mut reader = BufReader::new(&file);
748 let mut lines = Vec::new();
749 let mut current_pos = start_pos;
750
751 loop {
752 if current_pos >= end_pos {
753 break;
754 }
755
756 let mut line = String::new();
757 match reader.read_line(&mut line) {
758 Ok(0) => break,
759 Ok(bytes_read) => {
760 current_pos += bytes_read as u64;
761 if line.ends_with('\n') {
762 line.pop();
763 if line.ends_with('\r') {
764 line.pop();
765 }
766 }
767 lines.push(line);
768 }
769 Err(_) => break,
770 }
771 }
772
773 Ok(lines)
774}
775
776fn binary_search_log_position(
777 file: &mut File,
778 file_size: u64,
779 target_time: DateTime<Local>,
780 find_start: bool,
781) -> Result<u64> {
782 let mut low: u64 = 0;
783 let mut high: u64 = file_size;
784
785 while low < high {
786 let mid = low + (high - low) / 2;
787
788 let line_start = find_line_start(file, mid)?;
789
790 file.seek(SeekFrom::Start(line_start)).into_diagnostic()?;
791 let mut reader = BufReader::new(&*file);
792 let mut line = String::new();
793 let bytes_read = reader.read_line(&mut line).into_diagnostic()?;
794 if bytes_read == 0 {
795 high = mid;
796 continue;
797 }
798
799 let line_time = extract_timestamp(&line);
800
801 match line_time {
802 Some(lt) => {
803 if find_start {
804 if lt < target_time {
805 low = line_start + bytes_read as u64;
806 } else {
807 high = line_start;
808 }
809 } else if lt <= target_time {
810 low = line_start + bytes_read as u64;
811 } else {
812 high = line_start;
813 }
814 }
815 None => {
816 low = line_start + bytes_read as u64;
817 }
818 }
819 }
820
821 find_line_start(file, low)
822}
823
824fn find_line_start(file: &mut File, pos: u64) -> Result<u64> {
825 if pos == 0 {
826 return Ok(0);
827 }
828
829 let mut search_pos = pos.saturating_sub(1);
831 const CHUNK_SIZE: usize = 8192;
832
833 loop {
834 let chunk_start = search_pos.saturating_sub(CHUNK_SIZE as u64 - 1);
836 let len_u64 = search_pos - chunk_start + 1;
837 let len = len_u64 as usize;
838
839 file.seek(SeekFrom::Start(chunk_start)).into_diagnostic()?;
841 let mut buf = vec![0u8; len];
842 if file.read_exact(&mut buf).is_err() {
843 return Ok(0);
845 }
846
847 for (i, &b) in buf.iter().enumerate().rev() {
849 if b == b'\n' {
850 return Ok(chunk_start + i as u64 + 1);
851 }
852 }
853
854 if chunk_start == 0 {
857 return Ok(0);
858 }
859
860 search_pos = chunk_start - 1;
862 }
863}
864
865fn extract_timestamp(line: &str) -> Option<DateTime<Local>> {
866 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})");
867 re.captures(line)
868 .and_then(|caps| caps.get(1))
869 .and_then(|m| parse_datetime(m.as_str()).ok())
870}
871
872fn merge_log_lines(id: &str, lines: Vec<String>, reverse: bool) -> Vec<(String, String, String)> {
873 let lines = if reverse {
874 lines.into_iter().rev().collect()
875 } else {
876 lines
877 };
878
879 let re = regex!(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) (\w+) (.*)$");
880 lines
881 .into_iter()
882 .fold(vec![], |mut acc, line| match re.captures(&line) {
883 Some(caps) => {
884 let (date, msg) = match (caps.get(1), caps.get(3)) {
885 (Some(d), Some(m)) => (d.as_str().to_string(), m.as_str().to_string()),
886 _ => return acc,
887 };
888 acc.push((date, id.to_string(), msg));
889 acc
890 }
891 None => {
892 if let Some(l) = acc.last_mut() {
893 l.2.push('\n');
894 l.2.push_str(&line);
895 }
896 acc
897 }
898 })
899}
900
901fn get_all_daemon_ids() -> Result<Vec<String>> {
902 Ok(xx::file::ls(&*env::PITCHFORK_LOGS_DIR)?
903 .into_iter()
904 .filter(|d| !d.starts_with("."))
905 .filter(|d| d.is_dir())
906 .filter_map(|d| d.file_name().map(|f| f.to_string_lossy().to_string()))
907 .collect())
908}
909
910fn get_log_file_infos(names: &[String]) -> Result<BTreeMap<String, LogFile>> {
911 let names = names.iter().collect::<HashSet<_>>();
912 xx::file::ls(&*env::PITCHFORK_LOGS_DIR)?
913 .into_iter()
914 .filter(|d| !d.starts_with("."))
915 .filter(|d| d.is_dir())
916 .filter_map(|d| d.file_name().map(|f| f.to_string_lossy().to_string()))
917 .filter(|n| names.is_empty() || names.contains(n))
918 .map(|n| {
919 let path = env::PITCHFORK_LOGS_DIR
920 .join(&n)
921 .join(format!("{n}.log"))
922 .canonicalize()
923 .into_diagnostic()?;
924 let mut file = xx::file::open(&path)?;
925 file.seek(SeekFrom::End(0)).into_diagnostic()?;
928 let cur = file.stream_position().into_diagnostic()?;
929 Ok((
930 n.clone(),
931 LogFile {
932 _name: n,
933 file,
934 cur,
935 path,
936 },
937 ))
938 })
939 .filter_ok(|(_, f)| f.path.exists())
940 .collect::<Result<BTreeMap<_, _>>>()
941}
942
943pub async fn tail_logs(names: &[String]) -> Result<()> {
944 let mut log_files = get_log_file_infos(names)?;
945 let mut wf = WatchFiles::new(Duration::from_millis(10))?;
946
947 for lf in log_files.values() {
948 wf.watch(&lf.path, RecursiveMode::NonRecursive)?;
949 }
950
951 let files_to_name = log_files
952 .iter()
953 .map(|(n, f)| (f.path.clone(), n.clone()))
954 .collect::<HashMap<_, _>>();
955
956 while let Some(paths) = wf.rx.recv().await {
957 let mut out = vec![];
958 for path in paths {
959 let Some(name) = files_to_name.get(&path) else {
960 warn!("Unknown log file changed: {}", path.display());
961 continue;
962 };
963 let Some(info) = log_files.get_mut(name) else {
964 warn!("No log info for: {name}");
965 continue;
966 };
967 info.file
968 .seek(SeekFrom::Start(info.cur))
969 .into_diagnostic()?;
970 let reader = BufReader::new(&info.file);
971 let lines = reader.lines().map_while(Result::ok).collect_vec();
972 info.cur = info.file.stream_position().into_diagnostic()?;
973 out.extend(merge_log_lines(name, lines, false));
974 }
975 let out = out
976 .into_iter()
977 .sorted_by_cached_key(|l| l.0.to_string())
978 .collect_vec();
979 for (date, name, msg) in out {
980 println!("{} {} {}", edim(&date), name, msg);
981 }
982 }
983 Ok(())
984}
985
/// An opened daemon log file plus the bookkeeping needed to follow it.
struct LogFile {
    /// Daemon id the file belongs to (currently unused, hence the underscore).
    _name: String,
    /// Canonicalized path of the log file.
    path: PathBuf,
    /// Open handle used for reading newly appended content.
    file: fs::File,
    /// Byte offset up to which the file has been consumed.
    cur: u64,
}
992
993fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
994 let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
995 Local
996 .from_local_datetime(&naive_dt)
997 .single()
998 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
999}
1000
1001fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
1007 let s = s.trim();
1008
1009 if let Ok(dt) = parse_datetime(s) {
1011 return Ok(dt);
1012 }
1013
1014 if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
1016 return Local
1017 .from_local_datetime(&naive_dt)
1018 .single()
1019 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
1020 }
1021
1022 if let Ok(time) = parse_time_only(s) {
1026 let now = Local::now();
1027 let today = now.date_naive();
1028 let mut naive_dt = NaiveDateTime::new(today, time);
1029 let mut dt = Local
1030 .from_local_datetime(&naive_dt)
1031 .single()
1032 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1033
1034 if is_since
1037 && dt > now
1038 && let Some(yesterday) = today.pred_opt()
1039 {
1040 naive_dt = NaiveDateTime::new(yesterday, time);
1041 dt = Local
1042 .from_local_datetime(&naive_dt)
1043 .single()
1044 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
1045 }
1046 return Ok(dt);
1047 }
1048
1049 if let Ok(duration) = humantime::parse_duration(s) {
1050 let now = Local::now();
1051 let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
1052 return Ok(target);
1053 }
1054
1055 Err(miette::miette!(
1056 "Invalid time format: '{}'. Expected formats:\n\
1057 - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
1058 - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
1059 - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
1060 s
1061 ))
1062}
1063
1064fn parse_time_only(s: &str) -> Result<NaiveTime> {
1065 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
1066 return Ok(time);
1067 }
1068
1069 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
1070 return Ok(time);
1071 }
1072
1073 Err(miette::miette!("Invalid time format: '{}'", s))
1074}
1075
1076pub fn print_logs_for_time_range(
1077 daemon_id: &str,
1078 from: DateTime<Local>,
1079 to: Option<DateTime<Local>>,
1080) -> Result<()> {
1081 let daemon_ids = vec![daemon_id.to_string()];
1082 let log_files = get_log_file_infos(&daemon_ids)?;
1083
1084 let from = from
1085 .with_nanosecond(0)
1086 .expect("0 is always valid for nanoseconds");
1087 let to = to.map(|t| {
1088 t.with_nanosecond(0)
1089 .expect("0 is always valid for nanoseconds")
1090 });
1091
1092 let log_lines = log_files
1093 .iter()
1094 .flat_map(
1095 |(name, lf)| match read_lines_in_time_range(&lf.path, Some(from), to) {
1096 Ok(lines) => merge_log_lines(name, lines, false),
1097 Err(e) => {
1098 error!("{}: {}", lf.path.display(), e);
1099 vec![]
1100 }
1101 },
1102 )
1103 .sorted_by_cached_key(|l| l.0.to_string())
1104 .collect_vec();
1105
1106 if log_lines.is_empty() {
1107 eprintln!("No logs found for daemon '{daemon_id}' in the specified time range");
1108 } else {
1109 eprintln!("\n{} {} {}", edim("==="), edim("Error logs"), edim("==="));
1110 for (date, _id, msg) in log_lines {
1111 eprintln!("{} {}", edim(&date), msg);
1112 }
1113 eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1114 }
1115
1116 Ok(())
1117}
1118
1119pub fn print_startup_logs(daemon_id: &str, from: DateTime<Local>) -> Result<()> {
1120 let daemon_ids = vec![daemon_id.to_string()];
1121 let log_files = get_log_file_infos(&daemon_ids)?;
1122
1123 let from = from
1124 .with_nanosecond(0)
1125 .expect("0 is always valid for nanoseconds");
1126
1127 let log_lines = log_files
1128 .iter()
1129 .flat_map(
1130 |(name, lf)| match read_lines_in_time_range(&lf.path, Some(from), None) {
1131 Ok(lines) => merge_log_lines(name, lines, false),
1132 Err(e) => {
1133 error!("{}: {}", lf.path.display(), e);
1134 vec![]
1135 }
1136 },
1137 )
1138 .sorted_by_cached_key(|l| l.0.to_string())
1139 .collect_vec();
1140
1141 if !log_lines.is_empty() {
1142 eprintln!("\n{} {} {}", edim("==="), edim("Startup logs"), edim("==="));
1143 for (date, _id, msg) in log_lines {
1144 eprintln!("{} {}", edim(&date), msg);
1145 }
1146 eprintln!("{} {} {}\n", edim("==="), edim("End of logs"), edim("==="));
1147 }
1148
1149 Ok(())
1150}