// chrome_for_testing_manager/output.rs
use std::fmt;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use tokio_process_tools::{
6 BroadcastOutputStream, Consumable, Consumer, Delivery, LineParsingOptions, Next, ParseLines,
7 ProcessHandle, ReliableWithBackpressure, Replay, ReplayEnabled,
8};
9use unwrap_infallible::UnwrapInfallible;
10
/// Identifies which output stream of the driver process a line was read from.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum DriverOutputSource {
    /// The line was read from the process's standard output.
    Stdout,

    /// The line was read from the process's standard error.
    Stderr,
}
20
21#[derive(Debug, Clone, PartialEq, Eq, Hash)]
23pub struct DriverOutputLine {
24 pub source: DriverOutputSource,
26
27 pub sequence: u64,
32
33 pub line: String,
35}
36
37#[derive(Clone)]
39pub struct DriverOutputListener {
40 on_line: Arc<dyn Fn(DriverOutputLine) + Send + Sync + 'static>,
41}
42
43impl fmt::Debug for DriverOutputListener {
44 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45 f.debug_struct("DriverOutputListener")
46 .field("on_line", &"<callback>")
47 .finish()
48 }
49}
50
51impl DriverOutputListener {
52 #[must_use]
59 pub fn new(on_line: impl Fn(DriverOutputLine) + Send + Sync + 'static) -> Self {
60 Self {
61 on_line: Arc::new(on_line),
62 }
63 }
64
65 pub(crate) fn emit(&self, line: DriverOutputLine) {
66 (self.on_line)(line);
67 }
68}
69
70pub struct DriverOutputInspectors {
79 stdout: Consumer<()>,
80 stderr: Consumer<()>,
81}
82
83impl fmt::Debug for DriverOutputInspectors {
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85 f.debug_struct("DriverOutputInspectors")
86 .field("stdout_finished", &self.stdout.is_finished())
87 .field("stderr_finished", &self.stderr.is_finished())
88 .finish()
89 }
90}
91
92impl DriverOutputInspectors {
93 pub(crate) fn start(
94 process: &ProcessHandle<BroadcastOutputStream<ReliableWithBackpressure, ReplayEnabled>>,
95 listener: Option<DriverOutputListener>,
96 ) -> Self {
97 let sequence = Arc::new(AtomicU64::new(0));
98 Self {
99 stdout: inspect_output(
100 process.stdout(),
101 DriverOutputSource::Stdout,
102 Arc::clone(&sequence),
103 listener.clone(),
104 ),
105 stderr: inspect_output(
106 process.stderr(),
107 DriverOutputSource::Stderr,
108 sequence,
109 listener,
110 ),
111 }
112 }
113}
114
115fn inspect_output<D, R>(
116 stream: &BroadcastOutputStream<D, R>,
117 source: DriverOutputSource,
118 sequence: Arc<AtomicU64>,
119 listener: Option<DriverOutputListener>,
120) -> Consumer<()>
121where
122 D: Delivery,
123 R: Replay,
124{
125 stream
126 .consume(ParseLines::inspect(
127 LineParsingOptions::default(),
128 move |line| {
129 let line_ref: &str = &line;
130 tracing::debug!(source = ?source, driver_output = line_ref, "driver log");
131
132 if let Some(listener) = &listener {
133 listener.emit(DriverOutputLine {
134 source,
135 sequence: sequence.fetch_add(1, Ordering::SeqCst),
136 line: line.into_owned(),
137 });
138 }
139
140 Next::Continue
141 },
142 ))
143 .unwrap_infallible()
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use assertr::prelude::*;
    use std::sync::Mutex;

    /// A line emitted through the listener must reach the wrapped callback
    /// unchanged.
    #[test]
    fn driver_output_listener_invokes_callback() {
        let expected = DriverOutputLine {
            source: DriverOutputSource::Stdout,
            sequence: 0,
            line: "ready".to_owned(),
        };

        // The callback records everything it receives into a shared vector.
        let received = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&received);
        let listener = DriverOutputListener::new(move |line| {
            sink.lock()
                .expect("lines mutex should not be poisoned")
                .push(line);
        });

        listener.emit(expected.clone());

        let received = received
            .lock()
            .expect("lines mutex should not be poisoned");
        assert_that!(received.as_slice()).contains_exactly([expected]);
    }
}