Skip to main content

zng_task/process/
tap.rs

1//! Helper types for recording stdout/err and parsing the error output.
2//!
3//! Both [`StdoutTap`] and [`StderrTap`] record a child process stream while still propagating to
4//! the parent stream. After the child closes the stream the recording can be converted to string
5//! and parsed to retrieve data such as a panic printout.
6//!
7//! # ANSI Escape Sequences
8//!
9//! Use [`contains_ansi_csi`] and [`remove_ansi_csi`] to convert styled output to plain text.
10//!
11//! # Panic
12//!
//! Use [`PanicInfo::find`] to find and parse the last panic printout from stderr. Call [`PanicInfo::set_hook`]
//! in the child process to ensure the panic message is formatted in a compatible way.
15
16use std::{
17    collections::VecDeque,
18    fmt,
19    io::{self, BufRead as _, Read, Write as _},
20    process::{ChildStderr, ChildStdout},
21};
22
23use futures_lite::{AsyncRead, AsyncReadExt};
24use zng_txt::{ToTxt as _, Txt, formatx};
25
26/// Record stdout of a child process while also passing though the output to the running process output.
27///
28/// Both blocking and async APIs are provided, the blocking API is slightly more efficient.
29pub struct StdoutTap(StdTap<false>);
30impl fmt::Debug for StdoutTap {
31    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32        f.debug_tuple("StdoutTap").finish_non_exhaustive()
33    }
34}
35impl StdoutTap {
36    /// Start recording and passing.
37    pub fn new_blocking(stream: ChildStdout) -> Self {
38        Self(StdTap::new_blocking(stream))
39    }
40
41    /// Start recording and passing.
42    pub fn new(stream: super::ChildStdout) -> Self {
43        Self(StdTap::new(stream))
44    }
45}
46
47/// Record stderr of a child process while also passing though the output to the running process output.
48///
49/// Both blocking and async APIs are provided, the blocking API is slightly more efficient.
50pub struct StderrTap(StdTap<true>);
51impl fmt::Debug for StderrTap {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        f.debug_tuple("StderrTap").finish_non_exhaustive()
54    }
55}
56impl StderrTap {
57    /// Start recording and passing.
58    pub fn new_blocking(stream: ChildStderr) -> Self {
59        Self(StdTap::new_blocking(stream))
60    }
61
62    /// Start recording and passing.
63    pub fn new(stream: super::ChildStderr) -> Self {
64        Self(StdTap::new(stream))
65    }
66
67    /// Block until the child process closes stderr and attempts to parse the last panic info from it.
68    ///
69    /// If cannot find a panic returns `Err` with the captured stderr converted to [`Txt`].
70    ///
71    /// Note that the exit code for a fatal panic is `101`, checking the exit code is the reliable
72    /// way to verify the child process exited due to panic.
73    pub fn into_panic_blocking(self) -> Result<PanicInfo, Txt> {
74        let s = self.into_string_blocking(false);
75        match PanicInfo::find(&s) {
76            Some(p) => Ok(p),
77            None => Err(s.into()),
78        }
79    }
80
81    /// Await until the child process closes stderr and attempts to parse the last panic info from it.
82    ///
83    /// If cannot find a panic returns `Err` with the captured stderr converted to [`Txt`].
84    ///
85    /// Note that the exit code for a fatal panic is `101`, checking the exit code is the reliable
86    /// way to verify the child process exited due to panic.
87    pub async fn into_panic(self) -> Result<PanicInfo, Txt> {
88        blocking::unblock(move || self.into_panic_blocking()).await
89    }
90}
91
92macro_rules! impl_common {
93    ($($StreamTap:ident;)+) => {
94        $(
95impl $StreamTap {
96    /// Placeholder tap that records nothing.
97    pub fn dummy() -> Self {
98        Self(StdTap::dummy())
99    }
100
101    /// Block until the child process closes the stream and converts the capture to [`String`].
102    pub fn into_string_blocking(self, remove_ansi_csi: bool) -> String {
103        let s = deque_to_string(self.0.capture());
104        if remove_ansi_csi && contains_ansi_csi(&s) {
105            self::remove_ansi_csi_str(&s)
106        } else {
107            s
108        }
109    }
110
111    /// Await until the child process closes the stream and converts the capture to [`String`].
112    pub async fn into_string(self, remove_ansi_csi: bool) -> String {
113        blocking::unblock(move || self.into_string_blocking(remove_ansi_csi)).await
114    }
115
116    /// Block until the child process closes the stream and converts the capture to [`Txt`].
117    pub fn into_txt_blocking(self, remove_ansi_csi: bool) -> Txt {
118        self.into_string_blocking(remove_ansi_csi).into()
119    }
120
121    /// Await until the child process closes the stream and converts the capture to [`Txt`].
122    pub async fn into_txt(self, remove_ansi_csi: bool) -> Txt {
123        blocking::unblock(move || self.into_txt_blocking(remove_ansi_csi)).await
124    }
125}
126        )+
127    };
128}
129impl_common! {
130    StdoutTap;
131    StderrTap;
132}
133
134struct StdTap<const E: bool>(Option<std::thread::JoinHandle<VecDeque<u8>>>);
135
136impl<const E: bool> StdTap<E> {
137    fn new_blocking(std_stream: impl Read + Send + 'static) -> Self {
138        Self(Some(tap(std_stream, E)))
139    }
140
141    fn new(stream: impl AsyncRead + Send + Unpin + 'static) -> Self {
142        Self(Some(tap_async(stream, E)))
143    }
144
145    fn dummy() -> Self {
146        Self(None)
147    }
148
149    fn capture(self) -> VecDeque<u8> {
150        match self.0 {
151            Some(j) => match j.join() {
152                Ok(d) => d,
153                Err(p) => std::panic::resume_unwind(p),
154            },
155            None => VecDeque::new(),
156        }
157    }
158}
159
160fn tap(mut stream: impl Read + Send + 'static, is_err: bool) -> std::thread::JoinHandle<VecDeque<u8>> {
161    tap_thread(is_err)
162        .spawn(move || tap_read_loop(&mut stream, is_err))
163        .expect("failed to spawn thread")
164}
/// Thread builder for a tap reader, named `stderr-reader` or `stdout-reader`, with a 256 KiB stack.
fn tap_thread(is_err: bool) -> std::thread::Builder {
    let stream_name = if is_err { "stderr" } else { "stdout" };
    std::thread::Builder::new()
        .stack_size(256 * 1024)
        .name(format!("{stream_name}-reader"))
}
170fn tap_read_loop(stream: &mut dyn Read, is_err: bool) -> VecDeque<u8> {
171    let mut tap = Tap::new();
172    loop {
173        let r = stream.read(&mut tap.buffer);
174        if tap.push(r, is_err) {
175            break;
176        }
177    }
178    tap.rec
179}
180
181fn tap_async(mut stream: impl AsyncRead + Send + Unpin + 'static, is_err: bool) -> std::thread::JoinHandle<VecDeque<u8>> {
182    tap_thread(is_err)
183        .spawn(move || tap_async_read_loop(&mut stream, is_err))
184        .expect("failed to spawn thread")
185}
186
187fn tap_async_read_loop(stream: &mut (dyn AsyncRead + Unpin), is_err: bool) -> VecDeque<u8> {
188    let mut tap = Tap::new();
189    loop {
190        let r = crate::block_on(stream.read(&mut tap.buffer));
191        if tap.push(r, is_err) {
192            break;
193        }
194    }
195    tap.rec
196}
/// Reader state shared by the blocking and async tap loops.
struct Tap {
    /// Recorded output, only the latest `MAX_CAPTURE` bytes (see [`Tap::push`]) are kept.
    rec: VecDeque<u8>,
    /// Buffer each read fills before [`Tap::push`] is called.
    buffer: [u8; 16_384],
}
impl Tap {
    /// New empty recording.
    fn new() -> Self {
        Self {
            rec: VecDeque::with_capacity(16_384),
            buffer: [0; 16_384],
        }
    }

    /// Handle the result of one read into `self.buffer`.
    ///
    /// Records the `n` bytes read and forwards them to this process stderr (`is_err`) or stdout.
    /// Returns `true` when the read returned `0` (EOF) and the read loop must stop. An
    /// `Interrupted` read error is non-fatal and returns `false` so the loop retries the read.
    ///
    /// # Panics
    ///
    /// Panics on any other read error and on errors writing to the parent stream.
    fn push(&mut self, read_r: io::Result<usize>, is_err: bool) -> bool {
        /// Maximum recorded bytes (8 MiB), the oldest bytes are discarded first.
        const MAX_CAPTURE: usize = 8_388_608;

        let stream_name = if is_err { "stderr" } else { "stdout" };
        let n = match read_r {
            // EOF
            Ok(0) => return true,
            Ok(n) => n,
            // interrupted before any data was transferred, the read loop retries
            Err(e) if e.kind() == io::ErrorKind::Interrupted => return false,
            Err(e) => panic!("{stream_name} read error, {e}"),
        };

        let new = &self.buffer[..n];
        let overflow = (self.rec.len() + n).saturating_sub(MAX_CAPTURE);
        if overflow > 0 {
            self.rec.drain(..overflow.min(self.rec.len()));
        }
        self.rec.extend(new);

        // hold the lock for both write and flush so the chunk is not interleaved
        let forwarded = if is_err {
            let mut out = io::stderr().lock();
            out.write_all(new).and_then(|()| out.flush())
        } else {
            let mut out = io::stdout().lock();
            out.write_all(new).and_then(|()| out.flush())
        };
        if let Err(e) = forwarded {
            panic!("{stream_name} write error, {e}")
        }

        false
    }
}
243
/// Convert the recorded bytes to a [`String`], replacing invalid UTF-8 with `U+FFFD`.
///
/// Valid UTF-8 (the common case) is converted without copying the bytes again.
fn deque_to_string(deq: VecDeque<u8>) -> String {
    let bytes = Vec::from(deq);
    String::from_utf8(bytes).unwrap_or_else(|e| String::from_utf8_lossy(e.as_bytes()).into_owned())
}
254
255/// Panic parsed from a `stderr` dump.
256///
257/// # Compatibility
258///
259/// The parser can seek only the latest Rust stable panic format, to ensure compatibility call
260/// [`PanicInfo::set_hook`] on the child process is possible.
261#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
262#[non_exhaustive]
263pub struct PanicInfo {
264    /// Name of thread that panicked.
265    pub thread: Txt,
266    /// Panic message.
267    pub message: Txt,
268    /// Path to file that defines the panic.
269    pub file: Txt,
270    /// Line of code that defines the panic.
271    pub line: u32,
272    /// Column in the line of code that defines the panic.
273    pub column: u32,
274    /// Widget where the panic happened.
275    ///
276    /// Only available in processes that use [`PanicInfo::set_hook`].
277    pub widget_path: Txt,
278    /// Stack backtrace.
279    pub backtrace: Txt,
280}
281
282/// Alternate mode `{:#}` writes raw backtrace without cleanup and code snippets.
283///
284/// See also [`PanicInfo::display_no_backtrace`]
285impl fmt::Display for PanicInfo {
286    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
287        fmt::Display::fmt(&self.display_no_backtrace(), f)?;
288        if f.alternate() {
289            writeln!(f, "stack backtrace:\n{}", self.backtrace)
290        } else {
291            writeln!(f, "stack backtrace:")?;
292            let mut snippet = 9;
293            for frame in self.backtrace_frames().skip_while(|f| f.is_after_panic) {
294                write!(f, "{frame}")?;
295                if snippet > 0 {
296                    let code = frame.code_snippet();
297                    if !code.is_empty() {
298                        snippet -= 1;
299                        writeln!(f, "{code}")?;
300                    }
301                }
302            }
303            Ok(())
304        }
305    }
306}
307impl PanicInfo {
308    /// Returns an object that implements [`fmt::Display`] to write only the thread name, location, message and widget path.
309    pub fn display_no_backtrace(&self) -> impl fmt::Display {
310        struct D<'a>(&'a PanicInfo);
311        impl<'a> fmt::Display for D<'a> {
312            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
313                let p = &self.0;
314                writeln!(f, "thread '{}' panicked at {}:{}:{}:", p.thread, p.file, p.line, p.column)?;
315                for line in p.message.lines() {
316                    writeln!(f, "   {line}")?;
317                }
318                if !p.widget_path.is_empty() {
319                    writeln!(f, "widget path:\n   {}", p.widget_path)?;
320                }
321                Ok(())
322            }
323        }
324        D(self)
325    }
326}
327impl PanicInfo {
328    /// Gets if `stderr` contains a panic that can be parsed by [`find`].
329    ///
330    /// [`find`]: Self::find
331    pub fn contains(stderr: &str) -> bool {
332        Self::find_impl(stderr, false).is_some()
333    }
334
335    /// Gets if `stderr` contains a panic that can be parsed by [`find`] and traced a widget/window path.
336    ///
337    /// [`find`]: Self::find
338    pub fn contains_widget(stderr: &str) -> bool {
339        match Self::find_impl(stderr, false) {
340            Some(p) => !p.widget_path.is_empty(),
341            None => false,
342        }
343    }
344
345    /// Try parse `stderr` for the last panic printout.
346    ///
347    /// Only reliably works if the panic fully printed correctly and was formatted by
348    /// [`PanicInfo::set_hook`].
349    pub fn find(stderr: &str) -> Option<Self> {
350        Self::find_impl(stderr, true)
351    }
352
353    fn find_impl(stderr: &str, parse: bool) -> Option<Self> {
354        let mut panic_at = usize::MAX;
355        let mut widget_path = usize::MAX;
356        let mut stack_backtrace = usize::MAX;
357        let mut i = 0;
358        for line in stderr.lines() {
359            if line.starts_with("thread '") && line.contains(" panicked at ") && line.ends_with(':') {
360                panic_at = i;
361                widget_path = usize::MAX;
362                stack_backtrace = usize::MAX;
363            } else if line == "widget path:" {
364                widget_path = i + "widget path:\n".len();
365            } else if line == "stack backtrace:" {
366                stack_backtrace = i + "stack backtrace:\n".len();
367            }
368            i += line.len() + "\n".len();
369        }
370
371        if panic_at == usize::MAX {
372            return None;
373        }
374
375        if !parse {
376            return Some(Self {
377                thread: Txt::from(""),
378                message: Txt::from(""),
379                file: Txt::from(""),
380                line: 0,
381                column: 0,
382                widget_path: if widget_path < stderr.len() {
383                    Txt::from("true")
384                } else {
385                    Txt::from("")
386                },
387                backtrace: Txt::from(""),
388            });
389        }
390
391        let panic_str = stderr[panic_at..].lines().next().unwrap();
392        let (thread, location) = panic_str.strip_prefix("thread '").unwrap().split_once(" panicked at ").unwrap();
393        let mut location = location.split(':');
394        let file = location.next().unwrap_or("");
395        let line: u32 = location.next().unwrap_or("0").parse().unwrap_or(0);
396        let column: u32 = location.next().unwrap_or("0").parse().unwrap_or(0);
397        let mut thread = thread.split('\'');
398        let mut thread_name = thread.next().unwrap_or("<unnamed>");
399        let thread_id = thread.next().unwrap_or("");
400        if thread_name == "<unnamed>"
401            && let Some(id) = thread_id.strip_prefix('(')
402            && let Some(id) = id.strip_suffix(')')
403        {
404            thread_name = id;
405        }
406
407        let mut message = String::new();
408        let mut sep = "";
409        for line in stderr[panic_at + panic_str.len() + "\n".len()..].lines() {
410            if let Some(line) = line.strip_prefix("   ") {
411                message.push_str(sep);
412                message.push_str(line);
413                sep = "\n";
414            } else {
415                if message.is_empty() && line != "widget path:" && line != "stack backtrace:" {
416                    // not formatted by us, probably by Rust
417                    line.clone_into(&mut message);
418                }
419                break;
420            }
421        }
422
423        let widget_path = if widget_path < stderr.len() {
424            stderr[widget_path..].lines().next().unwrap().trim()
425        } else {
426            ""
427        };
428
429        let backtrace = if stack_backtrace < stderr.len() {
430            let mut i = stack_backtrace;
431            'backtrace_seek: for line in stderr[stack_backtrace..].lines() {
432                let s = line.trim_start();
433                if s.is_empty() {
434                    break;
435                } else if !s.starts_with("at ") {
436                    for c in s.chars() {
437                        if !c.is_ascii_digit() {
438                            if c != ':' {
439                                break 'backtrace_seek;
440                            }
441                            break;
442                        }
443                    }
444                }
445
446                // matches "\s*\d+:" OR "\s*at "
447                i += line.len() + "\n".len();
448            }
449            &stderr[stack_backtrace..i]
450        } else {
451            ""
452        };
453
454        Some(Self {
455            thread: thread_name.to_txt(),
456            message: message.into(),
457            file: file.to_txt(),
458            line,
459            column,
460            widget_path: widget_path.to_txt(),
461            backtrace: backtrace.to_txt(),
462        })
463    }
464
465    /// Iterate over frames parsed from the `backtrace`.
466    pub fn backtrace_frames(&self) -> impl Iterator<Item = BacktraceFrame> + '_ {
467        BacktraceFrame::parse(&self.backtrace)
468    }
469}
470
/// Represents a frame parsed from a stack backtrace.
///
/// Produced by [`BacktraceFrame::parse`] and [`PanicInfo::backtrace_frames`].
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct BacktraceFrame {
    /// Position on the backtrace, the number before `:` in the frame header line.
    pub n: usize,

    /// Function name, the trimmed text after the first `:` in the frame header line.
    pub name: Txt,
    /// Source code file, from the `at ...` location line that follows the header, if any.
    ///
    /// Empty if the frame has no location line.
    pub file: Txt,
    /// Source code line.
    ///
    /// `0` if the frame has no location line.
    pub line: u32,

    /// If this frame is inside the Rust panic code.
    ///
    /// `true` for the frames up to and including the one whose name ends with
    /// `core::panicking::panic_fmt`, `false` for all frames if the backtrace has no such frame.
    pub is_after_panic: bool,
}
488impl fmt::Display for BacktraceFrame {
489    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
490        writeln!(f, "{:>4}: {}", self.n, self.name)?;
491        if !self.file.is_empty() {
492            writeln!(f, "      at {}:{}", self.file, self.line)?;
493        }
494        Ok(())
495    }
496}
497impl BacktraceFrame {
498    /// Iterate over frames parsed from the `backtrace`.
499    pub fn parse(mut backtrace: &str) -> impl Iterator<Item = BacktraceFrame> + '_ {
500        let mut is_after_panic = backtrace.lines().any(|l| l.ends_with("core::panicking::panic_fmt"));
501        std::iter::from_fn(move || {
502            if backtrace.is_empty() {
503                None
504            } else {
505                let n_name = backtrace.lines().next().unwrap();
506                let (n, name) = if let Some((n, name)) = n_name.split_once(':') {
507                    let n = match n.trim_start().parse() {
508                        Ok(n) => n,
509                        Err(_) => {
510                            backtrace = "";
511                            return None;
512                        }
513                    };
514                    let name = name.trim();
515                    if name.is_empty() {
516                        backtrace = "";
517                        return None;
518                    }
519                    (n, name)
520                } else {
521                    backtrace = "";
522                    return None;
523                };
524
525                backtrace = &backtrace[n_name.len() + 1..];
526                let r = if backtrace.trim_start().starts_with("at ") {
527                    let file_line = backtrace.lines().next().unwrap();
528                    let (file, line) = if let Some((file, line)) = file_line.rsplit_once(':') {
529                        let file = file.trim_start().strip_prefix("at ").unwrap();
530                        let line = match line.trim_end().parse() {
531                            Ok(l) => l,
532                            Err(_) => {
533                                backtrace = "";
534                                return None;
535                            }
536                        };
537                        (file, line)
538                    } else {
539                        backtrace = "";
540                        return None;
541                    };
542
543                    backtrace = &backtrace[file_line.len() + 1..];
544
545                    BacktraceFrame {
546                        n,
547                        name: name.to_txt(),
548                        file: file.to_txt(),
549                        line,
550                        is_after_panic,
551                    }
552                } else {
553                    BacktraceFrame {
554                        n,
555                        name: name.to_txt(),
556                        file: Txt::from(""),
557                        line: 0,
558                        is_after_panic,
559                    }
560                };
561
562                if is_after_panic && name.ends_with("core::panicking::panic_fmt") {
563                    is_after_panic = false;
564                }
565
566                Some(r)
567            }
568        })
569    }
570
571    /// Reads the code line + four surrounding lines if the code file can be found.
572    pub fn code_snippet(&self) -> Txt {
573        if !self.file.is_empty()
574            && self.line > 0
575            && let Ok(file) = std::fs::File::open(&self.file)
576        {
577            use std::fmt::Write as _;
578            let mut r = String::new();
579
580            let reader = std::io::BufReader::new(file);
581
582            let line_s = self.line - 2.min(self.line - 1);
583            let lines = reader.lines().skip(line_s as usize - 1).take(5);
584            for (line, line_n) in lines.zip(line_s..) {
585                let line = match line {
586                    Ok(l) => l,
587                    Err(_) => return Txt::from(""),
588                };
589
590                if line_n == self.line {
591                    writeln!(&mut r, "      {line_n:>4} > {line}").unwrap();
592                } else {
593                    writeln!(&mut r, "      {line_n:>4} │ {line}").unwrap();
594                }
595            }
596
597            return r.into();
598        }
599        Txt::from("")
600    }
601}
602impl PanicInfo {
603    /// Set a panic hook that will print panics to stderr in a format compatible with [`PanicInfo`] parsing.
604    ///
605    /// The `widget_trace_path` should be a closure that return `WIDGET.trace_path()` if the process can run
606    /// an `APP`, otherwise it must be `Txt::default`.
607    ///
608    /// The panic hook calls simply [`eprint_panic`].
609    ///
610    /// [`eprint_panic`]: PanicInfo::eprint_panic
611    pub fn set_hook(widget_trace_path: impl Fn() -> Txt + Send + Sync + 'static) {
612        std::panic::set_hook(Box::new(move |a| {
613            let path = widget_trace_path();
614            Self::eprint_panic(a, &path);
615        }));
616    }
617
618    /// Print panic to stderr in a format compatible with [`PanicInfo`] parsing.
619    ///
620    /// This function is called by the hook set by [`set_hook`].
621    ///
622    /// [`set_hook`]: PanicInfo::set_hook
623    pub fn eprint_panic(info: &std::panic::PanicHookInfo, widget_trace_path: &str) {
624        let backtrace = std::backtrace::Backtrace::capture();
625        let panic = PanicFromHook::from_hook(info);
626        if widget_trace_path.is_empty() {
627            eprintln!("{panic}\nstack backtrace:\n{backtrace}");
628        } else {
629            eprintln!("{panic}widget path:\n   {widget_trace_path}\nstack backtrace:\n{backtrace}");
630        }
631    }
632}
633
634#[derive(Debug)]
635pub(crate) struct PanicFromHook {
636    pub thread: Txt,
637    pub msg: Txt,
638    pub file: Txt,
639    pub line: u32,
640    pub column: u32,
641}
642impl PanicFromHook {
643    pub fn from_hook(info: &std::panic::PanicHookInfo) -> Self {
644        let current_thread = std::thread::current();
645        let thread = match current_thread.name() {
646            Some(n) => n.to_txt(),
647            None => formatx!("{:?}", std::thread::current().id()),
648        };
649        let msg = crate::extract_panic_message(info.payload()).unwrap_or("Box<dyn  Any>").to_txt();
650
651        let (file, line, column) = if let Some(l) = info.location() {
652            (l.file(), l.line(), l.column())
653        } else {
654            ("<unknown>", 0, 0)
655        };
656        Self {
657            thread: thread.to_txt(),
658            msg,
659            file: file.to_txt(),
660            line,
661            column,
662        }
663    }
664}
665impl std::error::Error for PanicFromHook {}
666impl fmt::Display for PanicFromHook {
667    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668        writeln!(
669            f,
670            "thread '{}' panicked at {}:{}:{}:",
671            self.thread, self.file, self.line, self.column
672        )?;
673        for line in self.msg.lines() {
674            writeln!(f, "   {line}")?;
675        }
676        Ok(())
677    }
678}
679
680fn remove_ansi_csi_str(mut s: &str) -> String {
681    fn is_esc_end(byte: u8) -> bool {
682        (0x40..=0x7e).contains(&byte)
683    }
684
685    let mut r = String::new();
686    while let Some(i) = s.find(CSI) {
687        r.push_str(&s[..i]);
688        s = &s[i + CSI.len()..];
689        let mut esc_end = 0;
690        while esc_end < s.len() && !is_esc_end(s.as_bytes()[esc_end]) {
691            esc_end += 1;
692        }
693        esc_end += 1;
694        s = &s[esc_end..];
695    }
696    r.push_str(s);
697    r
698}
699
700/// Remove ANSI escape sequences (CSI) from `s`.
701pub fn remove_ansi_csi(s: &str) -> Txt {
702    remove_ansi_csi_str(s).into()
703}
704
/// If `s` contains ANSI escape sequences (CSI).
pub fn contains_ansi_csi(s: &str) -> bool {
    s.match_indices(CSI).next().is_some()
}

/// Control Sequence Introducer, `ESC` followed by `[`, the prefix of ANSI CSI escape sequences.
const CSI: &str = "\u{1b}[";