Skip to main content

chrome_for_testing_manager/
output.rs

1use std::fmt;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use tokio_process_tools::{
6    BroadcastOutputStream, Consumable, Consumer, Delivery, LineParsingOptions, Next, ParseLines,
7    ProcessHandle, ReliableWithBackpressure, Replay, ReplayEnabled,
8};
9use unwrap_infallible::UnwrapInfallible;
10
11/// The browser-driver output stream source.
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
13pub enum DriverOutputSource {
14    /// The browser-driver process stdout stream.
15    Stdout,
16
17    /// The browser-driver process stderr stream.
18    Stderr,
19}
20
21/// One parsed line from the browser-driver process output.
22#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23pub struct DriverOutputLine {
24    /// The output stream this line came from.
25    pub source: DriverOutputSource,
26
27    /// Monotonic callback-order sequence number across stdout and stderr.
28    ///
29    /// This is useful for rendering one combined output tail. It does not guarantee the original
30    /// operating-system write order across separate stdout and stderr pipes.
31    pub sequence: u64,
32
33    /// The parsed output line without its trailing newline character.
34    pub line: String,
35}
36
37/// Callback invoked for each parsed browser-driver output line.
38#[derive(Clone)]
39pub struct DriverOutputListener {
40    on_line: Arc<dyn Fn(DriverOutputLine) + Send + Sync + 'static>,
41}
42
43impl fmt::Debug for DriverOutputListener {
44    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45        f.debug_struct("DriverOutputListener")
46            .field("on_line", &"<callback>")
47            .finish()
48    }
49}
50
51impl DriverOutputListener {
52    /// Create a new browser-driver output listener from a callback.
53    ///
54    /// The callback runs synchronously on the line-consumption task and applies backpressure to
55    /// chromedriver's stdout / stderr. Keep it non-blocking: avoid blocking I/O, lock contention,
56    /// or unbounded work. Hand off to a channel or background task if you need to do real work
57    /// per line. Otherwise, a slow callback can stall chromedriver itself.
58    #[must_use]
59    pub fn new(on_line: impl Fn(DriverOutputLine) + Send + Sync + 'static) -> Self {
60        Self {
61            on_line: Arc::new(on_line),
62        }
63    }
64
65    pub(crate) fn emit(&self, line: DriverOutputLine) {
66        (self.on_line)(line);
67    }
68}
69
70/// Long-lived line-inspecting [`Consumer`] handles for the chromedriver process's stdout and
71/// stderr streams.
72///
73/// Returned from [`crate::ChromeForTestingManager::launch_chromedriver`] alongside the process
74/// handle and the bound port. Keep this value alive for as long as you want the configured
75/// [`DriverOutputListener`] to receive lines; dropping it stops the listeners. When using the
76/// high-level [`crate::Chromedriver`] entry point the inspectors are owned for you and dropped
77/// when the chromedriver process is terminated.
78pub struct DriverOutputInspectors {
79    stdout: Consumer<()>,
80    stderr: Consumer<()>,
81}
82
83impl fmt::Debug for DriverOutputInspectors {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        f.debug_struct("DriverOutputInspectors")
86            .field("stdout_finished", &self.stdout.is_finished())
87            .field("stderr_finished", &self.stderr.is_finished())
88            .finish()
89    }
90}
91
92impl DriverOutputInspectors {
93    pub(crate) fn start(
94        process: &ProcessHandle<BroadcastOutputStream<ReliableWithBackpressure, ReplayEnabled>>,
95        listener: Option<DriverOutputListener>,
96    ) -> Self {
97        let sequence = Arc::new(AtomicU64::new(0));
98        Self {
99            stdout: inspect_output(
100                process.stdout(),
101                DriverOutputSource::Stdout,
102                Arc::clone(&sequence),
103                listener.clone(),
104            ),
105            stderr: inspect_output(
106                process.stderr(),
107                DriverOutputSource::Stderr,
108                sequence,
109                listener,
110            ),
111        }
112    }
113}
114
115fn inspect_output<D, R>(
116    stream: &BroadcastOutputStream<D, R>,
117    source: DriverOutputSource,
118    sequence: Arc<AtomicU64>,
119    listener: Option<DriverOutputListener>,
120) -> Consumer<()>
121where
122    D: Delivery,
123    R: Replay,
124{
125    stream
126        .consume(ParseLines::inspect(
127            LineParsingOptions::default(),
128            move |line| {
129                let line_ref: &str = &line;
130                tracing::debug!(source = ?source, driver_output = line_ref, "driver log");
131
132                if let Some(listener) = &listener {
133                    listener.emit(DriverOutputLine {
134                        source,
135                        sequence: sequence.fetch_add(1, Ordering::SeqCst),
136                        line: line.into_owned(),
137                    });
138                }
139
140                Next::Continue
141            },
142        ))
143        .unwrap_infallible()
144}
145
146#[cfg(test)]
147mod tests {
148    use super::*;
149    use assertr::prelude::*;
150    use std::sync::Mutex;
151
152    #[test]
153    fn driver_output_listener_invokes_callback() {
154        let lines = Arc::new(Mutex::new(Vec::new()));
155        let listener = {
156            let lines = Arc::clone(&lines);
157            DriverOutputListener::new(move |line| {
158                lines
159                    .lock()
160                    .expect("lines mutex should not be poisoned")
161                    .push(line);
162            })
163        };
164
165        listener.emit(DriverOutputLine {
166            source: DriverOutputSource::Stdout,
167            sequence: 0,
168            line: "ready".to_owned(),
169        });
170
171        let lines = lines.lock().expect("lines mutex should not be poisoned");
172        assert_that!(lines.as_slice()).contains_exactly([DriverOutputLine {
173            source: DriverOutputSource::Stdout,
174            sequence: 0,
175            line: "ready".to_owned(),
176        }]);
177    }
178}