1use std::borrow::Cow;
4use std::error::Error;
5use std::fmt;
6use std::io;
7use std::process::ExitStatus;
8use std::sync::Arc;
9use std::time::Duration;
10use thiserror::Error;
11
12use crate::ConsumerError;
13
/// Error describing a failed attempt to signal or terminate a process.
///
/// Every variant carries the process name and the attempt errors that were recorded.
/// When displayed, the attempt errors are appended to the headline message via
/// `DisplayAttemptErrors`.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum TerminationError {
    /// Sending a signal to the process failed.
    #[error(
        "Failed to send signal to process '{process_name}'.{}",
        DisplayAttemptErrors(.attempt_errors.as_slice())
    )]
    SignalFailed {
        /// Name of the process, as interpolated into the error message.
        process_name: Cow<'static, str>,
        /// Errors recorded for the individual attempts; may be empty.
        attempt_errors: Vec<TerminationAttemptError>,
    },

    /// Terminating the process failed.
    #[error(
        "Failed to terminate process '{process_name}'.{}",
        DisplayAttemptErrors(.attempt_errors.as_slice())
    )]
    TerminationFailed {
        /// Name of the process, as interpolated into the error message.
        process_name: Cow<'static, str>,
        /// Errors recorded for the individual attempts; may be empty.
        attempt_errors: Vec<TerminationAttemptError>,
    },
}
42
43impl TerminationError {
44 #[must_use]
46 pub fn process_name(&self) -> &str {
47 match self {
48 Self::SignalFailed { process_name, .. }
49 | Self::TerminationFailed { process_name, .. } => process_name,
50 }
51 }
52
53 #[must_use]
55 pub fn attempt_errors(&self) -> &[TerminationAttemptError] {
56 match self {
57 Self::SignalFailed { attempt_errors, .. }
58 | Self::TerminationFailed { attempt_errors, .. } => attempt_errors,
59 }
60 }
61}
62
63struct DisplayAttemptErrors<'a>(&'a [TerminationAttemptError]);
64
65impl fmt::Display for DisplayAttemptErrors<'_> {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 if self.0.is_empty() {
68 return write!(f, " No attempt error was recorded.");
69 }
70
71 write!(f, " Attempt errors:")?;
72 for (index, attempt_error) in self.0.iter().enumerate() {
73 write!(f, " [{}] {attempt_error}", index + 1)?;
74 }
75
76 Ok(())
77 }
78}
79
/// Display adapter that renders ` for <signal name>` when a signal name is present and
/// renders nothing otherwise.
struct DisplaySignalNameSuffix(Option<&'static str>);

impl fmt::Display for DisplaySignalNameSuffix {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.0 {
            Some(signal_name) => write!(f, " for {signal_name}"),
            // No signal name: contribute nothing to the surrounding message.
            None => Ok(()),
        }
    }
}
91
/// A single failed step, as collected in [`TerminationError`].
///
/// Displayed as `<phase> <operation> failed[ for <signal name>]: <source>`.
#[derive(Debug, Error)]
#[error(
    "{phase} {operation} failed{}: {source}",
    DisplaySignalNameSuffix(*.signal_name)
)]
#[non_exhaustive]
pub struct TerminationAttemptError {
    /// Phase in which the attempt failed.
    pub phase: TerminationAttemptPhase,
    /// Operation that failed.
    pub operation: TerminationAttemptOperation,
    /// Name of the signal involved, if any; appended to the message as ` for <name>`.
    pub signal_name: Option<&'static str>,
    /// Underlying error, exposed as the error source.
    #[source]
    pub source: Box<dyn Error + Send + Sync + 'static>,
}
110
/// Phase of a termination attempt, as reported in [`TerminationAttemptError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum TerminationAttemptPhase {
    /// Displayed as `preflight`.
    Preflight,
    /// Displayed as `interrupt`.
    Interrupt,
    /// Displayed as `terminate`.
    Terminate,
    /// Displayed as `kill`.
    Kill,
}

impl fmt::Display for TerminationAttemptPhase {
    /// Writes the lowercase label of the phase.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Preflight => "preflight",
            Self::Interrupt => "interrupt",
            Self::Terminate => "terminate",
            Self::Kill => "kill",
        };
        // `write_str` deliberately bypasses width/precision flags, like the per-arm calls did.
        f.write_str(label)
    }
}
135
/// Operation performed during a termination attempt, as reported in
/// [`TerminationAttemptError`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum TerminationAttemptOperation {
    /// Displayed as `status check`.
    CheckStatus,
    /// Displayed as `signal send`.
    SendSignal,
    /// Displayed as `exit wait`.
    WaitForExit,
}

impl fmt::Display for TerminationAttemptOperation {
    /// Writes the lowercase, human-readable label of the operation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::CheckStatus => "status check",
            Self::SendSignal => "signal send",
            Self::WaitForExit => "exit wait",
        };
        // `write_str` deliberately bypasses width/precision flags, like the per-arm calls did.
        f.write_str(label)
    }
}
157
/// Error raised while waiting for a process.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum WaitError {
    /// An I/O error occurred while waiting for the process.
    #[error("IO error occurred while waiting for process '{process_name}': {source}")]
    IoError {
        /// Name of the process, as interpolated into the error message.
        process_name: Cow<'static, str>,
        /// Underlying I/O error, exposed as the error source.
        #[source]
        source: io::Error,
    },
}
172
/// Outcome of a wait-for-completion operation that is bounded by a timeout.
///
/// `T` is the value produced on completion and defaults to [`ExitStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitForCompletionResult<T = ExitStatus> {
    /// The wait completed and produced a value.
    Completed(T),

    /// The wait did not complete.
    Timeout {
        /// Timeout associated with the wait that did not complete.
        timeout: Duration,
    },
}

impl<T> WaitForCompletionResult<T> {
    /// Converts into `Some(value)` for [`Self::Completed`] and `None` for [`Self::Timeout`],
    /// discarding the timeout duration.
    #[must_use]
    pub fn into_completed(self) -> Option<T> {
        match self {
            Self::Completed(value) => Some(value),
            Self::Timeout { .. } => None,
        }
    }

    /// Returns the completed value.
    ///
    /// # Panics
    ///
    /// Panics with `message` if this is [`Self::Timeout`]. The reported panic location is the
    /// caller of this method.
    // `#[track_caller]` forwards the caller's location to `Option::expect`, so the panic
    // message points at the offending call site instead of at this helper.
    #[track_caller]
    pub fn expect_completed(self, message: &str) -> T {
        self.into_completed().expect(message)
    }

    /// Applies `f` to the completed value, passing a timeout through unchanged.
    pub(crate) fn map<U>(self, f: impl FnOnce(T) -> U) -> WaitForCompletionResult<U> {
        match self {
            Self::Completed(value) => WaitForCompletionResult::Completed(f(value)),
            Self::Timeout { timeout } => WaitForCompletionResult::Timeout { timeout },
        }
    }
}
213
/// Outcome of a wait-for-completion operation that falls back to termination on timeout.
///
/// `T` is the value produced in either case and defaults to [`ExitStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitForCompletionOrTerminateResult<T = ExitStatus> {
    /// The wait completed and produced a value.
    Completed(T),

    /// A value was obtained only after a timeout led to termination.
    TerminatedAfterTimeout {
        /// Value obtained after termination.
        result: T,
        /// Timeout associated with the wait that preceded termination.
        timeout: Duration,
    },
}

impl<T> WaitForCompletionOrTerminateResult<T> {
    /// Returns the carried value regardless of which variant holds it.
    #[must_use]
    pub fn into_result(self) -> T {
        match self {
            Self::Completed(value) | Self::TerminatedAfterTimeout { result: value, .. } => value,
        }
    }

    /// Converts into `Some(value)` for [`Self::Completed`] and `None` for
    /// [`Self::TerminatedAfterTimeout`], discarding the post-termination value.
    #[must_use]
    pub fn into_completed(self) -> Option<T> {
        match self {
            Self::Completed(value) => Some(value),
            Self::TerminatedAfterTimeout { .. } => None,
        }
    }

    /// Returns the completed value.
    ///
    /// # Panics
    ///
    /// Panics with `message` if this is [`Self::TerminatedAfterTimeout`]. The reported panic
    /// location is the caller of this method.
    // `#[track_caller]` forwards the caller's location to `Option::expect`, so the panic
    // message points at the offending call site instead of at this helper.
    #[track_caller]
    pub fn expect_completed(self, message: &str) -> T {
        self.into_completed().expect(message)
    }

    /// Applies `f` to the carried value in either variant, preserving the variant and timeout.
    pub(crate) fn map<U>(self, f: impl FnOnce(T) -> U) -> WaitForCompletionOrTerminateResult<U> {
        match self {
            Self::Completed(value) => WaitForCompletionOrTerminateResult::Completed(f(value)),
            Self::TerminatedAfterTimeout { result, timeout } => {
                WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
                    result: f(result),
                    timeout,
                }
            }
        }
    }
}
270
/// Error of a wait-or-terminate operation, covering both the failed wait and the outcome of
/// the cleanup termination that followed.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum WaitOrTerminateError {
    /// Waiting failed, and the subsequent cleanup termination completed with an exit status.
    #[error(
        "Waiting for process '{process_name}' failed with '{wait_error}', then cleanup termination completed with status {termination_status}"
    )]
    WaitFailed {
        /// Name of the process, as interpolated into the error message.
        process_name: Cow<'static, str>,
        /// The wait failure, exposed as the error source.
        #[source]
        wait_error: Box<WaitError>,
        /// Exit status reported once cleanup termination completed.
        termination_status: ExitStatus,
    },

    /// Waiting failed, and the subsequent cleanup termination failed as well.
    #[error(
        "Waiting for process '{process_name}' failed with '{wait_error}', then cleanup termination also failed: {termination_error}"
    )]
    TerminationFailed {
        /// Name of the process, as interpolated into the error message.
        process_name: Cow<'static, str>,
        /// The wait failure, exposed as the error source.
        #[source]
        wait_error: Box<WaitError>,
        /// The termination failure; included in the message but not the error source.
        termination_error: TerminationError,
    },

    /// The process did not complete within the timeout, and cleanup termination then failed.
    #[error(
        "Process '{process_name}' did not complete within {timeout:?}, then cleanup termination failed: {termination_error}"
    )]
    TerminationAfterTimeoutFailed {
        /// Name of the process, as interpolated into the error message.
        process_name: Cow<'static, str>,
        /// Timeout within which the process did not complete.
        timeout: Duration,
        /// The termination failure, exposed as the error source.
        #[source]
        termination_error: TerminationError,
    },
}
317
/// Error of an operation that waits for a process and collects its output.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum WaitWithOutputError {
    /// Waiting for process completion failed; convertible from [`WaitError`] via `From`.
    #[error("Waiting for process completion failed: {0}")]
    WaitFailed(#[from] WaitError),

    /// The wait-or-terminate operation failed; convertible from [`WaitOrTerminateError`]
    /// via `From`.
    #[error("Wait-or-terminate operation failed: {0}")]
    WaitOrTerminateFailed(#[from] WaitOrTerminateError),

    /// Output collection did not complete within the timeout.
    #[error("Output collection for process '{process_name}' did not complete within {timeout:?}")]
    OutputCollectionTimeout {
        /// Name of the process, as interpolated into the error message.
        process_name: Cow<'static, str>,
        /// Timeout within which output collection did not complete.
        timeout: Duration,
    },

    /// Output collection failed.
    #[error("Output collection for process '{process_name}' failed: {source}")]
    OutputCollectionFailed {
        /// Name of the process, as interpolated into the error message.
        process_name: Cow<'static, str>,
        /// Underlying consumer error, exposed as the error source.
        #[source]
        source: ConsumerError,
    },

    /// Output collection could not be started.
    #[error("Output collection for process '{process_name}' could not start: {source}")]
    OutputCollectionStartFailed {
        /// Name of the process, as interpolated into the error message.
        process_name: Cow<'static, str>,
        /// Underlying stream-consumer error, exposed as the error source.
        #[source]
        source: StreamConsumerError,
    },
}
365
/// Error raised when a process could not be spawned.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SpawnError {
    /// Spawning the process failed with an I/O error.
    #[error("Failed to spawn process '{process_name}': {source}")]
    SpawnFailed {
        /// Name of the process, as interpolated into the error message.
        process_name: Cow<'static, str>,
        /// Underlying I/O error, exposed as the error source.
        #[source]
        source: io::Error,
    },
}
380
/// Error raised when a consumer cannot be attached to a stream.
#[derive(Debug, Clone, Copy, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamConsumerError {
    /// The stream already has an active consumer.
    #[error("Stream '{stream_name}' already has an active consumer")]
    ActiveConsumer {
        /// Name of the stream, as interpolated into the error message.
        stream_name: &'static str,
    },
}
392
393impl StreamConsumerError {
394 #[must_use]
396 pub fn stream_name(&self) -> &'static str {
397 match self {
398 Self::ActiveConsumer { stream_name } => stream_name,
399 }
400 }
401}
402
/// Error raised when reading from a stream fails.
///
/// The underlying [`io::Error`] is held in an [`Arc`], which lets this type derive `Clone`.
#[derive(Debug, Clone, Error)]
#[error("Could not read from stream '{stream_name}': {source}")]
pub struct StreamReadError {
    /// Name of the stream, as interpolated into the error message.
    stream_name: &'static str,
    /// Underlying I/O error, exposed as the error source.
    #[source]
    source: Arc<io::Error>,
}
411
412impl StreamReadError {
413 #[must_use]
415 pub fn new(stream_name: &'static str, source: io::Error) -> Self {
416 Self {
417 stream_name,
418 source: Arc::new(source),
419 }
420 }
421
422 #[must_use]
424 pub fn stream_name(&self) -> &'static str {
425 self.stream_name
426 }
427
428 #[must_use]
430 pub fn kind(&self) -> io::ErrorKind {
431 self.source.kind()
432 }
433
434 #[must_use]
436 pub fn source_io_error(&self) -> &io::Error {
437 self.source.as_ref()
438 }
439}
440
441impl PartialEq for StreamReadError {
442 fn eq(&self, other: &Self) -> bool {
443 self.stream_name == other.stream_name && self.kind() == other.kind()
444 }
445}
446
447impl Eq for StreamReadError {}
448
/// Outcome of waiting for a line.
// NOTE(review): variant semantics below are inferred from the names only; confirm against
// the code that produces these values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitForLineResult {
    /// A line matched.
    Matched,

    /// The stream was closed (presumably before a matching line was seen).
    StreamClosed,

    /// The wait timed out (presumably before a matching line was seen).
    Timeout,
}