1use crate::daemon_id::DaemonId;
2use crate::log_store::sqlite::LOG_STORE;
3use crate::log_store::{LogQuery, LogStore};
4use crate::pitchfork_toml::PitchforkToml;
5use crate::state_file::StateFile;
6use crate::ui::style::{edim, estyle, ndim};
7use crate::{Result, env};
8use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, TimeZone};
9use console;
10use itertools::Itertools;
11use miette::IntoDiagnostic;
12use std::collections::BTreeSet;
13use std::io::{self, IsTerminal, Write};
14use std::process::{Child, Command, Stdio};
15use std::time::Duration;
16
17struct PagerConfig {
19 command: String,
20 args: Vec<String>,
21}
22
23impl PagerConfig {
24 fn new(start_at_end: bool) -> Self {
27 let command = std::env::var("PAGER").unwrap_or_else(|_| "less".to_string());
28 let args = Self::build_args(&command, start_at_end);
29 Self { command, args }
30 }
31
32 fn build_args(pager: &str, start_at_end: bool) -> Vec<String> {
33 let mut args = vec![];
34 if pager == "less" {
35 args.push("-R".to_string());
36 if start_at_end {
37 args.push("+G".to_string());
38 }
39 }
40 args
41 }
42
43 fn spawn_piped(&self) -> io::Result<Child> {
45 Command::new(&self.command)
46 .args(&self.args)
47 .stdin(Stdio::piped())
48 .spawn()
49 }
50}
51
52fn format_log_line(
58 date: &str,
59 id: &str,
60 msg: &str,
61 single_daemon: bool,
62 id_width: usize,
63 strip_ansi: bool,
64) -> String {
65 let msg = if strip_ansi {
66 console::strip_ansi_codes(msg).to_string()
67 } else {
68 msg.to_string()
69 };
70 if single_daemon {
71 format!("{} {}", ndim(date), msg)
72 } else {
73 let colors_on = !strip_ansi && console::colors_enabled();
74 let colored = dimmed_id(id, colors_on);
75 let padded = console::pad_str(&colored, id_width, console::Alignment::Left, None);
76 format!("{} {} {}", padded, ndim(date), msg)
77 }
78}
79
80fn dimmed_id(id: &str, colors_enabled: bool) -> String {
84 if !colors_enabled {
85 return id.to_string();
86 }
87 let colors = [
88 (180, 120, 120), (180, 160, 100), (120, 180, 120), (120, 180, 180), (180, 120, 180), (120, 160, 180), ];
95 let mut h: usize = 0x811C_9DC5; for b in id.bytes() {
97 h = h.wrapping_mul(0x0100_0193).wrapping_add(b as usize);
98 }
99 let (r, g, b) = colors[h % colors.len()];
100 format!("\x1b[2;38;2;{};{};{}m{}\x1b[0m", r, g, b, id)
101}
102
103pub fn colored_id_label(id: &str, colors_enabled: bool) -> String {
106 if !colors_enabled {
107 return format!("[{}]", id);
108 }
109 let colors: [u8; 4] = [34, 35, 36, 32]; let mut h: usize = 0x811C_9DC5; for b in id.bytes() {
114 h = h.wrapping_mul(0x0100_0193).wrapping_add(b as usize);
115 }
116 let color = colors[h % colors.len()];
117 format!("\x1b[{color}m[{id}]\x1b[0m")
118}
119
120#[derive(Debug, clap::Args)]
122#[clap(
123 visible_alias = "l",
124 verbatim_doc_comment,
125 long_about = "\
126Displays logs for daemon(s)
127
128Shows logs from managed daemons. Logs are stored in the pitchfork logs directory
129and include timestamps for filtering.
130
131Examples:
132 pitchfork logs api Show all logs for 'api' (paged if needed)
133 pitchfork logs api worker Show logs for multiple daemons
134 pitchfork logs Show logs for all daemons
135 pitchfork logs api -n 50 Show last 50 lines
136 pitchfork logs api --follow Follow logs in real-time
137 pitchfork logs api --since '2024-01-15 10:00:00'
138 Show logs since a specific time (forward)
139 pitchfork logs api --since '10:30:00'
140 Show logs since 10:30:00 today
141 pitchfork logs api --since '10:30' --until '12:00'
142 Show logs since 10:30:00 until 12:00:00 today
143 pitchfork logs api --since 5min Show logs from last 5 minutes
144 pitchfork logs api --raw Output raw log lines without formatting
145 pitchfork logs api --raw -n 100 Output last 100 raw log lines
146 pitchfork logs api --clear Delete logs for 'api'
147 pitchfork logs --clear Delete logs for all daemons"
148)]
149pub struct Logs {
150 id: Vec<String>,
152
153 #[clap(short, long)]
155 clear: bool,
156
157 #[clap(short)]
162 n: Option<usize>,
163
164 #[clap(short = 't', short_alias = 'f', long, visible_alias = "follow")]
166 tail: bool,
167
168 #[clap(short = 's', long)]
175 since: Option<String>,
176
177 #[clap(short = 'u', long)]
183 until: Option<String>,
184
185 #[clap(long)]
187 no_pager: bool,
188
189 #[clap(long)]
191 raw: bool,
192}
193
194impl Logs {
195 pub async fn run(&self) -> Result<()> {
196 migrate_legacy_log_dirs();
199
200 let resolved_ids: Vec<DaemonId> = if self.id.is_empty() {
202 get_all_daemon_ids()?
204 } else {
205 PitchforkToml::resolve_ids(&self.id)?
206 };
207
208 if self.clear {
209 LOG_STORE.clear(&resolved_ids)?;
210 return Ok(());
211 }
212
213 let from = if let Some(since) = self.since.as_ref() {
214 Some(parse_time_input(since, true)?)
215 } else {
216 None
217 };
218 let to = if let Some(until) = self.until.as_ref() {
219 Some(parse_time_input(until, false)?)
220 } else {
221 None
222 };
223
224 let single_daemon = resolved_ids.len() == 1;
225 self.print_existing_logs(&resolved_ids, from, to, single_daemon, self.tail)?;
226 if self.tail {
227 tail_logs(&resolved_ids, single_daemon, true).await?;
228 }
229
230 Ok(())
231 }
232
233 fn print_existing_logs(
234 &self,
235 resolved_ids: &[DaemonId],
236 from: Option<DateTime<Local>>,
237 to: Option<DateTime<Local>>,
238 single_daemon: bool,
239 force_no_pager: bool,
240 ) -> Result<()> {
241 let daemon_ids: Vec<String> = resolved_ids.iter().map(|id| id.qualified()).collect();
242 let id_width = daemon_ids.iter().map(|id| id.len()).max().unwrap_or(0);
243 trace!("log query for: {}", daemon_ids.iter().join(", "));
244 let has_time_filter = from.is_some() || to.is_some();
245
246 let opts = LogQuery {
247 daemon_ids: daemon_ids.clone(),
248 from,
249 to,
250 limit: if !has_time_filter { self.n } else { None },
251 order_desc: !has_time_filter,
252 after_id: None,
253 };
254 let entries = LOG_STORE.query(&opts)?;
255 let log_lines: Vec<(String, String, String)> = entries
256 .into_iter()
257 .map(|e| {
258 let ts = e.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
259 (ts, e.daemon_id, e.message)
260 })
261 .collect();
262
263 let log_lines = if has_time_filter {
264 if let Some(n) = self.n {
265 let len = log_lines.len();
266 if len > n {
267 log_lines.into_iter().skip(len - n).collect_vec()
268 } else {
269 log_lines
270 }
271 } else {
272 log_lines
273 }
274 } else if let Some(n) = self.n {
275 let len = log_lines.len();
276 if len > n {
277 log_lines.into_iter().skip(len - n).rev().collect_vec()
278 } else {
279 log_lines.into_iter().rev().collect_vec()
280 }
281 } else {
282 log_lines.into_iter().rev().collect_vec()
283 };
284
285 self.output_logs(
286 log_lines,
287 single_daemon,
288 id_width,
289 has_time_filter,
290 self.raw,
291 force_no_pager,
292 )?;
293 Ok(())
294 }
295
296 fn output_logs(
297 &self,
298 log_lines: Vec<(String, String, String)>,
299 single_daemon: bool,
300 id_width: usize,
301 has_time_filter: bool,
302 raw: bool,
303 force_no_pager: bool,
304 ) -> Result<()> {
305 if log_lines.is_empty() {
306 return Ok(());
307 }
308
309 let strip_ansi = raw || !console::colors_enabled();
310
311 if raw {
313 for (date, id, msg) in log_lines {
314 let line = format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi);
315 println!("{line}");
316 }
317 return Ok(());
318 }
319
320 let use_pager = !force_no_pager && !self.no_pager && should_use_pager(log_lines.len());
321
322 if use_pager {
323 self.output_with_pager(
324 log_lines,
325 single_daemon,
326 id_width,
327 has_time_filter,
328 strip_ansi,
329 )?;
330 } else {
331 for (date, id, msg) in log_lines {
332 println!(
333 "{}",
334 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
335 );
336 }
337 }
338
339 Ok(())
340 }
341
342 fn output_with_pager(
343 &self,
344 log_lines: Vec<(String, String, String)>,
345 single_daemon: bool,
346 id_width: usize,
347 has_time_filter: bool,
348 strip_ansi: bool,
349 ) -> Result<()> {
350 let pager_config = PagerConfig::new(!has_time_filter);
352
353 match pager_config.spawn_piped() {
354 Ok(mut child) => {
355 if let Some(stdin) = child.stdin.as_mut() {
356 for (date, id, msg) in log_lines {
357 let line = format!(
358 "{}\n",
359 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
360 );
361 if stdin.write_all(line.as_bytes()).is_err() {
362 break;
363 }
364 }
365 let _ = child.wait();
366 } else {
367 debug!("Failed to get pager stdin, falling back to direct output");
368 for (date, id, msg) in log_lines {
369 println!(
370 "{}",
371 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
372 );
373 }
374 }
375 }
376 Err(e) => {
377 debug!("Failed to spawn pager: {e}, falling back to direct output");
378 for (date, id, msg) in log_lines {
379 println!(
380 "{}",
381 format_log_line(&date, &id, &msg, single_daemon, id_width, strip_ansi)
382 );
383 }
384 }
385 }
386
387 Ok(())
388 }
389}
390
391fn should_use_pager(line_count: usize) -> bool {
392 if !io::stdout().is_terminal() {
393 return false;
394 }
395
396 let terminal_height = get_terminal_height().unwrap_or(24);
397 line_count > terminal_height
398}
399
400fn get_terminal_height() -> Option<usize> {
401 if let Ok(rows) = std::env::var("LINES")
402 && let Ok(h) = rows.parse::<usize>()
403 {
404 return Some(h);
405 }
406
407 crossterm::terminal::size().ok().map(|(_, h)| h as usize)
408}
409
410fn migrate_legacy_log_dirs() {
420 let known_safe_paths = known_daemon_safe_paths();
421 let dirs = match xx::file::ls(&*env::PITCHFORK_LOGS_DIR) {
422 Ok(d) => d,
423 Err(_) => return,
424 };
425 for dir in dirs {
426 if dir.starts_with(".") || !dir.is_dir() {
427 continue;
428 }
429 let name = match dir.file_name().map(|f| f.to_string_lossy().to_string()) {
430 Some(n) => n,
431 None => continue,
432 };
433 if name == "pitchfork" {
435 continue;
436 }
437 if name.contains("--") {
440 if DaemonId::from_safe_path(&name).is_ok() {
443 continue;
444 }
445 if known_safe_paths.contains(&name) {
448 continue;
449 }
450 warn!(
451 "Skipping invalid legacy log directory '{name}': contains '--' but is not a valid daemon safe-path"
452 );
453 continue;
454 }
455
456 let old_log = dir.join(format!("{name}.log"));
459 if !old_log.exists() {
460 continue;
461 }
462 if DaemonId::try_new("legacy", &name).is_err() {
463 warn!("Skipping invalid legacy log directory '{name}': not a valid daemon ID");
464 continue;
465 }
466
467 let new_name = format!("legacy--{name}");
468 let new_dir = env::PITCHFORK_LOGS_DIR.join(&new_name);
469 if new_dir.exists() {
471 continue;
472 }
473 if std::fs::rename(&dir, &new_dir).is_err() {
474 continue;
475 }
476 let old_log = new_dir.join(format!("{name}.log"));
478 let new_log = new_dir.join(format!("{new_name}.log"));
479 if old_log.exists() {
480 let _ = std::fs::rename(&old_log, &new_log);
481 }
482 debug!("Migrated legacy log dir '{name}' → '{new_name}'");
483 }
484}
485
486fn known_daemon_safe_paths() -> BTreeSet<String> {
487 let mut out = BTreeSet::new();
488
489 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
490 Ok(state) => {
491 for id in state.daemons.keys() {
492 out.insert(id.safe_path());
493 }
494 }
495 Err(e) => {
496 warn!("Failed to read state while checking known daemon IDs: {e}");
497 }
498 }
499
500 match PitchforkToml::all_merged() {
501 Ok(config) => {
502 for id in config.daemons.keys() {
503 out.insert(id.safe_path());
504 }
505 }
506 Err(e) => {
507 warn!("Failed to read config while checking known daemon IDs: {e}");
508 }
509 }
510
511 out
512}
513
514fn get_all_daemon_ids() -> Result<Vec<DaemonId>> {
515 let mut ids = BTreeSet::new();
516
517 match StateFile::read(&*env::PITCHFORK_STATE_FILE) {
518 Ok(state) => ids.extend(state.daemons.keys().cloned()),
519 Err(e) => warn!("Failed to read state for log daemon discovery: {e}"),
520 }
521
522 match PitchforkToml::all_merged() {
523 Ok(config) => ids.extend(config.daemons.keys().cloned()),
524 Err(e) => warn!("Failed to read config for log daemon discovery: {e}"),
525 }
526
527 let logged_ids: std::collections::HashSet<String> =
528 LOG_STORE.list_daemon_ids()?.into_iter().collect();
529 Ok(ids
530 .into_iter()
531 .filter(|id| logged_ids.contains(&id.qualified()))
532 .collect())
533}
534
535pub async fn tail_logs(
536 names: &[DaemonId],
537 single_daemon: bool,
538 start_from_end: bool,
539) -> Result<()> {
540 let id_width = names
542 .iter()
543 .map(|id| id.qualified().len())
544 .max()
545 .unwrap_or(0);
546
547 let strip_ansi = !console::colors_enabled();
548
549 let mut states: std::collections::HashMap<String, i64> = names
550 .iter()
551 .map(|id| {
552 let since = if start_from_end {
553 match LOG_STORE.query(&LogQuery {
554 daemon_ids: vec![id.qualified()],
555 from: None,
556 to: None,
557 limit: Some(1),
558 order_desc: true,
559 after_id: None,
560 }) {
561 Ok(entries) => entries.first().map(|e| e.id).unwrap_or(0),
562 Err(_) => 0,
563 }
564 } else {
565 0
566 };
567 (id.qualified(), since)
568 })
569 .collect();
570
571 let interval = tokio::time::interval(Duration::from_millis(200));
572 tokio::pin!(interval);
573
574 loop {
575 interval.tick().await;
576
577 let mut out = vec![];
578 for id in names {
579 let after_id = states.get(&id.qualified()).copied();
580 match LOG_STORE.tail(id, after_id) {
581 Ok(entries) => {
582 for entry in &entries {
583 let ts = entry.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
584 out.push((ts, entry.daemon_id.clone(), entry.message.clone()));
585 }
586 if let Some(last) = entries.last() {
587 states.insert(id.qualified(), last.id);
588 }
589 }
590 Err(e) => {
591 error!("Failed to tail logs for {}: {e}", id.qualified());
592 }
593 }
594 }
595
596 if !out.is_empty() {
597 let out = out
598 .into_iter()
599 .sorted_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)))
600 .collect_vec();
601 for (date, name, msg) in out {
602 println!(
603 "{}",
604 format_log_line(&date, &name, &msg, single_daemon, id_width, strip_ansi)
605 );
606 }
607 }
608 }
609}
610
611fn parse_datetime(s: &str) -> Result<DateTime<Local>> {
612 let naive_dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").into_diagnostic()?;
613 Local
614 .from_local_datetime(&naive_dt)
615 .single()
616 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'. ", s))
617}
618
619fn parse_time_input(s: &str, is_since: bool) -> Result<DateTime<Local>> {
625 let s = s.trim();
626
627 if let Ok(dt) = parse_datetime(s) {
629 return Ok(dt);
630 }
631
632 if let Ok(naive_dt) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M") {
634 return Local
635 .from_local_datetime(&naive_dt)
636 .single()
637 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s));
638 }
639
640 if let Ok(time) = parse_time_only(s) {
644 let now = Local::now();
645 let today = now.date_naive();
646 let mut naive_dt = NaiveDateTime::new(today, time);
647 let mut dt = Local
648 .from_local_datetime(&naive_dt)
649 .single()
650 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
651
652 if is_since
655 && dt > now
656 && let Some(yesterday) = today.pred_opt()
657 {
658 naive_dt = NaiveDateTime::new(yesterday, time);
659 dt = Local
660 .from_local_datetime(&naive_dt)
661 .single()
662 .ok_or_else(|| miette::miette!("Invalid or ambiguous datetime: '{}'", s))?;
663 }
664 return Ok(dt);
665 }
666
667 if let Ok(duration) = humantime::parse_duration(s) {
668 let now = Local::now();
669 let target = now - chrono::Duration::from_std(duration).into_diagnostic()?;
670 return Ok(target);
671 }
672
673 Err(miette::miette!(
674 "Invalid time format: '{}'. Expected formats:\n\
675 - Full datetime: \"YYYY-MM-DD HH:MM:SS\" or \"YYYY-MM-DD HH:MM\"\n\
676 - Time only: \"HH:MM:SS\" or \"HH:MM\" (uses today's date)\n\
677 - Relative time: \"5min\", \"2h\", \"1d\" (e.g., last 5 minutes)",
678 s
679 ))
680}
681
682fn parse_time_only(s: &str) -> Result<NaiveTime> {
683 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M:%S") {
684 return Ok(time);
685 }
686
687 if let Ok(time) = NaiveTime::parse_from_str(s, "%H:%M") {
688 return Ok(time);
689 }
690
691 Err(miette::miette!("Invalid time format: '{}'", s))
692}
693
694pub fn print_error_logs_block(log_lines: &[(String, String, String)]) {
704 if log_lines.is_empty() {
705 return;
706 }
707
708 let is_tty = std::io::stderr().is_terminal();
709 let format_msg = |msg: &str| -> String {
710 let stripped = strip_pty_controls(msg);
711 if is_tty {
712 stripped
713 } else {
714 console::strip_ansi_codes(&stripped).to_string()
715 }
716 };
717
718 let tag = estyle(" ERROR LOGS ").white().on_red();
719 eprintln!("\n{tag}");
720
721 let unique_ids: BTreeSet<&str> = log_lines.iter().map(|(_, id, _)| id.as_str()).collect();
723 let show_id = unique_ids.len() > 1;
724
725 if show_id {
726 let id_width = log_lines
727 .iter()
728 .map(|(_, id, _)| console::measure_text_width(id))
729 .max()
730 .unwrap_or(0);
731 for (date, id, msg) in log_lines {
732 let time = date.split(' ').nth(1).unwrap_or(date);
733 let colored = dimmed_id(id, is_tty && console::colors_enabled_stderr());
734 let padded = console::pad_str(&colored, id_width, console::Alignment::Left, None);
735 eprintln!(
736 "{} {} {}",
737 padded,
738 estyle(time).red().dim(),
739 format_msg(msg)
740 );
741 }
742 } else {
743 for (date, _, msg) in log_lines {
744 let time = date.split(' ').nth(1).unwrap_or(date);
745 eprintln!("{} {}", estyle(time).red().dim(), format_msg(msg));
746 }
747 }
748}
749
750pub enum ReadyCheckType {
752 Output(String),
753 Http(String),
754 Port(u16),
755 Cmd(String),
756 Delay(u64),
757 Default,
758}
759
760impl std::fmt::Display for ReadyCheckType {
761 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
762 match self {
763 ReadyCheckType::Output(pattern) => write!(f, "output matching '{pattern}'"),
764 ReadyCheckType::Http(url) => write!(f, "HTTP {url}"),
765 ReadyCheckType::Port(port) => write!(f, "TCP port {port}"),
766 ReadyCheckType::Cmd(cmd) => write!(f, "command '{cmd}'"),
767 ReadyCheckType::Delay(secs) => write!(f, "delay ({secs}s)"),
768 ReadyCheckType::Default => write!(f, "default readiness check"),
769 }
770 }
771}
772
773pub fn create_ready_check_job(
779 daemon_id: &DaemonId,
780 check_type: &ReadyCheckType,
781) -> std::sync::Arc<clx::progress::ProgressJob> {
782 use clx::progress::{ProgressJobBuilder, ProgressJobDoneBehavior, ProgressStatus};
783
784 let is_tty = std::io::stderr().is_terminal();
785 let colors_enabled = is_tty && console::colors_enabled_stderr();
786 let id_label = colored_id_label(&daemon_id.qualified(), colors_enabled);
787 let show_ts = crate::settings::settings().general.startup_log_timestamps;
788
789 let prefix = if show_ts {
793 edim(chrono::Local::now().format("%H:%M:%S").to_string()).to_string()
796 } else {
797 "{{spinner()}}".to_string()
798 };
799
800 ProgressJobBuilder::new()
801 .body(format!(
802 "{} {} waiting for {{{{ check_type }}}}...",
803 prefix, id_label
804 ))
805 .prop("check_type", &check_type.to_string())
806 .status(ProgressStatus::Running)
807 .on_done(ProgressJobDoneBehavior::Keep)
808 .start()
809}
810
811pub fn collect_startup_logs(
816 daemon_id: &DaemonId,
817 from: DateTime<Local>,
818) -> Result<Vec<(String, String, String)>> {
819 let entries = LOG_STORE.query(&LogQuery {
820 daemon_ids: vec![daemon_id.qualified()],
821 from: Some(from),
822 to: None,
823 limit: None,
824 order_desc: false,
825 after_id: None,
826 })?;
827 let log_lines = entries
828 .into_iter()
829 .map(|e| {
830 let ts = e.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
831 (ts, e.daemon_id, e.message)
832 })
833 .collect();
834
835 Ok(log_lines)
836}
837
838pub fn stream_startup_logs(
844 daemon_id: &DaemonId,
845 from: DateTime<Local>,
846 job: std::sync::Arc<clx::progress::ProgressJob>,
847) -> (
848 tokio::sync::watch::Sender<bool>,
849 tokio::task::JoinHandle<()>,
850) {
851 let (tx, mut rx) = tokio::sync::watch::channel(false);
852 let id = daemon_id.clone();
853
854 let show_ts = crate::settings::settings().general.startup_log_timestamps;
855
856 let handle = tokio::spawn(async move {
857 let is_tty = std::io::stderr().is_terminal();
858 let colors_enabled = is_tty && console::colors_enabled_stderr();
859 let id_label = colored_id_label(&id.qualified(), colors_enabled);
860 let prefix = if show_ts {
861 String::new()
862 } else {
863 edim("•").to_string()
864 };
865
866 let mut last_id: i64 = 0;
867
868 let initial_entries = LOG_STORE.query(&LogQuery {
870 daemon_ids: vec![id.qualified()],
871 from: Some(from),
872 to: None,
873 limit: None,
874 order_desc: false,
875 after_id: None,
876 });
877
878 if let Ok(entries) = initial_entries {
879 for entry in &entries {
880 let time = entry.timestamp.format("%H:%M:%S").to_string();
881 let msg = strip_pty_controls(&entry.message);
882 let msg = if is_tty {
883 msg
884 } else {
885 console::strip_ansi_codes(&msg).to_string()
886 };
887 let line_prefix = if show_ts {
888 edim(time).to_string()
889 } else {
890 prefix.clone()
891 };
892 job.println(&format!("{} {} {}", line_prefix, id_label, msg));
893 }
894 if let Some(last) = entries.last() {
895 last_id = last.id;
896 }
897 }
898
899 loop {
900 tokio::select! {
901 _ = tokio::time::sleep(Duration::from_millis(200)) => {
902 if let Ok(entries) = LOG_STORE.tail(&id, Some(last_id)) {
903 for entry in &entries {
904 let time = entry.timestamp.format("%H:%M:%S").to_string();
905 let msg = strip_pty_controls(&entry.message);
906 let msg = if is_tty {
907 msg
908 } else {
909 console::strip_ansi_codes(&msg).to_string()
910 };
911 let line_prefix = if show_ts {
912 edim(time).to_string()
913 } else {
914 prefix.clone()
915 };
916 job.println(&format!("{} {} {}", line_prefix, id_label, msg));
917 }
918 if let Some(last) = entries.last() {
919 last_id = last.id;
920 }
921 }
922 }
923 _ = rx.changed() => {
924 break;
925 }
926 }
927 }
928
929 if let Ok(entries) = LOG_STORE.tail(&id, Some(last_id)) {
931 for entry in &entries {
932 let time = entry.timestamp.format("%H:%M:%S").to_string();
933 let msg = strip_pty_controls(&entry.message);
934 let msg = if is_tty {
935 msg
936 } else {
937 console::strip_ansi_codes(&msg).to_string()
938 };
939 let line_prefix = if show_ts {
940 edim(time).to_string()
941 } else {
942 prefix.clone()
943 };
944 job.println(&format!("{} {} {}", line_prefix, id_label, msg));
945 }
946 }
947 });
948
949 (tx, handle)
950}
951
952fn strip_pty_controls(s: &str) -> String {
957 struct Stripper {
958 result: String,
959 }
960
961 impl vte::Perform for Stripper {
962 fn print(&mut self, c: char) {
963 self.result.push(c);
964 }
965
966 fn execute(&mut self, byte: u8) {
967 if byte == b'\n' || byte == b'\t' {
969 self.result.push(byte as char);
970 }
971 }
972
973 fn csi_dispatch(
974 &mut self,
975 params: &vte::Params,
976 _intermediates: &[u8],
977 _ignore: bool,
978 action: char,
979 ) {
980 if action == 'm' {
982 self.result.push_str("\x1b[");
983 let mut first = true;
984 for sub in params.iter() {
985 if !first {
986 self.result.push(';');
987 }
988 first = false;
989 for (i, &p) in sub.iter().enumerate() {
990 if i > 0 {
991 self.result.push(':');
992 }
993 self.result.push_str(&p.to_string());
994 }
995 }
996 self.result.push('m');
997 }
998 }
1000
1001 fn osc_dispatch(&mut self, _params: &[&[u8]], _bell_terminated: bool) {
1002 }
1004
1005 fn esc_dispatch(&mut self, _intermediates: &[u8], _ignore: bool, _byte: u8) {
1006 }
1008
1009 fn hook(
1010 &mut self,
1011 _params: &vte::Params,
1012 _intermediates: &[u8],
1013 _ignore: bool,
1014 _action: char,
1015 ) {
1016 }
1018
1019 fn put(&mut self, _byte: u8) {
1020 }
1022
1023 fn unhook(&mut self) {
1024 }
1026 }
1027
1028 let mut parser = vte::Parser::new();
1029 let mut stripper = Stripper {
1030 result: String::with_capacity(s.len()),
1031 };
1032 parser.advance(&mut stripper, s.as_bytes());
1033 stripper.result
1034}