1use std::borrow::Cow;
4use std::error::Error;
5use std::fmt;
6use std::io;
7use std::process::ExitStatus;
8use std::sync::Arc;
9use std::time::Duration;
10use thiserror::Error;
11
12use crate::ConsumerError;
13
14#[derive(Debug, Error)]
16#[non_exhaustive]
17pub enum TerminationError {
18 #[error(
20 "Failed to send signal to process '{process_name}'.{}",
21 DisplayAttemptErrors(.attempt_errors.as_slice())
22 )]
23 SignalFailed {
24 process_name: Cow<'static, str>,
26 attempt_errors: Vec<TerminationAttemptError>,
28 },
29
30 #[error(
32 "Failed to terminate process '{process_name}'.{}",
33 DisplayAttemptErrors(.attempt_errors.as_slice())
34 )]
35 TerminationFailed {
36 process_name: Cow<'static, str>,
38 attempt_errors: Vec<TerminationAttemptError>,
40 },
41}
42
43impl TerminationError {
44 #[must_use]
46 pub fn process_name(&self) -> &str {
47 match self {
48 Self::SignalFailed { process_name, .. }
49 | Self::TerminationFailed { process_name, .. } => process_name,
50 }
51 }
52
53 #[must_use]
55 pub fn attempt_errors(&self) -> &[TerminationAttemptError] {
56 match self {
57 Self::SignalFailed { attempt_errors, .. }
58 | Self::TerminationFailed { attempt_errors, .. } => attempt_errors,
59 }
60 }
61}
62
63struct DisplayAttemptErrors<'a>(&'a [TerminationAttemptError]);
64
65impl fmt::Display for DisplayAttemptErrors<'_> {
66 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
67 if self.0.is_empty() {
68 return write!(f, " No attempt error was recorded.");
69 }
70
71 write!(f, " Attempt errors:")?;
72 for (index, attempt_error) in self.0.iter().enumerate() {
73 write!(f, " [{}] {attempt_error}", index + 1)?;
74 }
75
76 Ok(())
77 }
78}
79
80struct DisplaySignalNameSuffix(Option<&'static str>);
81
82impl fmt::Display for DisplaySignalNameSuffix {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 let Some(signal_name) = self.0 else {
85 return Ok(());
86 };
87
88 write!(f, " for {signal_name}")
89 }
90}
91
92#[derive(Debug, Error)]
94#[error(
95 "{phase} {operation} failed{}: {source}",
96 DisplaySignalNameSuffix(*.signal_name)
97)]
98#[non_exhaustive]
99pub struct TerminationAttemptError {
100 pub phase: TerminationAttemptPhase,
102 pub operation: TerminationAttemptOperation,
104 pub signal_name: Option<&'static str>,
106 #[source]
108 pub source: Box<dyn Error + Send + Sync + 'static>,
109}
110
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
113#[non_exhaustive]
114pub enum TerminationAttemptPhase {
115 Preflight,
117
118 Interrupt,
125
126 Terminate,
134
135 Kill,
137}
138
139impl fmt::Display for TerminationAttemptPhase {
140 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
141 match self {
142 Self::Preflight => f.write_str("preflight"),
143 Self::Interrupt => f.write_str("interrupt"),
144 Self::Terminate => f.write_str("terminate"),
145 Self::Kill => f.write_str("kill"),
146 }
147 }
148}
149
150#[derive(Debug, Clone, Copy, PartialEq, Eq)]
152#[non_exhaustive]
153pub enum TerminationAttemptOperation {
154 CheckStatus,
156 SendSignal,
158 WaitForExit,
160}
161
162impl fmt::Display for TerminationAttemptOperation {
163 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
164 match self {
165 Self::CheckStatus => f.write_str("status check"),
166 Self::SendSignal => f.write_str("signal send"),
167 Self::WaitForExit => f.write_str("exit wait"),
168 }
169 }
170}
171
172#[derive(Debug, Error)]
174#[non_exhaustive]
175pub enum WaitError {
176 #[error("IO error occurred while waiting for process '{process_name}': {source}")]
178 IoError {
179 process_name: Cow<'static, str>,
181 #[source]
183 source: io::Error,
184 },
185}
186
187#[derive(Debug, Clone, Copy, PartialEq, Eq)]
189pub enum WaitForCompletionResult<T = ExitStatus> {
190 Completed(T),
192
193 Timeout {
195 timeout: Duration,
197 },
198}
199
200impl<T> WaitForCompletionResult<T> {
201 #[must_use]
203 pub fn into_completed(self) -> Option<T> {
204 match self {
205 Self::Completed(value) => Some(value),
206 Self::Timeout { .. } => None,
207 }
208 }
209
210 pub fn expect_completed(self, message: &str) -> T {
216 self.into_completed().expect(message)
217 }
218
219 pub(crate) fn map<U>(self, f: impl FnOnce(T) -> U) -> WaitForCompletionResult<U> {
221 match self {
222 Self::Completed(value) => WaitForCompletionResult::Completed(f(value)),
223 Self::Timeout { timeout } => WaitForCompletionResult::Timeout { timeout },
224 }
225 }
226}
227
228#[derive(Debug, Clone, Copy, PartialEq, Eq)]
230pub enum WaitForCompletionOrTerminateResult<T = ExitStatus> {
231 Completed(T),
233
234 TerminatedAfterTimeout {
236 result: T,
238 timeout: Duration,
240 },
241}
242
243impl<T> WaitForCompletionOrTerminateResult<T> {
244 #[must_use]
246 pub fn into_result(self) -> T {
247 match self {
248 Self::Completed(value) | Self::TerminatedAfterTimeout { result: value, .. } => value,
249 }
250 }
251
252 #[must_use]
254 pub fn into_completed(self) -> Option<T> {
255 match self {
256 Self::Completed(value) => Some(value),
257 Self::TerminatedAfterTimeout { .. } => None,
258 }
259 }
260
261 pub fn expect_completed(self, message: &str) -> T {
268 self.into_completed().expect(message)
269 }
270
271 pub(crate) fn map<U>(self, f: impl FnOnce(T) -> U) -> WaitForCompletionOrTerminateResult<U> {
273 match self {
274 Self::Completed(value) => WaitForCompletionOrTerminateResult::Completed(f(value)),
275 Self::TerminatedAfterTimeout { result, timeout } => {
276 WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
277 result: f(result),
278 timeout,
279 }
280 }
281 }
282 }
283}
284
285#[derive(Debug, Error)]
287#[non_exhaustive]
288pub enum WaitOrTerminateError {
289 #[error(
291 "Waiting for process '{process_name}' failed with '{wait_error}', then cleanup termination completed with status {termination_status}"
292 )]
293 WaitFailed {
294 process_name: Cow<'static, str>,
296 #[source]
298 wait_error: Box<WaitError>,
299 termination_status: ExitStatus,
301 },
302
303 #[error(
305 "Waiting for process '{process_name}' failed with '{wait_error}', then cleanup termination also failed: {termination_error}"
306 )]
307 TerminationFailed {
308 process_name: Cow<'static, str>,
310 #[source]
312 wait_error: Box<WaitError>,
313 termination_error: TerminationError,
315 },
316
317 #[error(
319 "Process '{process_name}' did not complete within {timeout:?}, then cleanup termination failed: {termination_error}"
320 )]
321 TerminationAfterTimeoutFailed {
322 process_name: Cow<'static, str>,
324 timeout: Duration,
326 #[source]
328 termination_error: TerminationError,
329 },
330}
331
332#[derive(Debug, Error)]
340#[non_exhaustive]
341pub enum WaitWithOutputError {
342 #[error("Waiting for process completion failed: {0}")]
344 WaitFailed(#[from] WaitError),
345
346 #[error("Wait-or-terminate operation failed: {0}")]
348 WaitOrTerminateFailed(#[from] WaitOrTerminateError),
349
350 #[error("Output collection for process '{process_name}' did not complete within {timeout:?}")]
352 OutputCollectionTimeout {
353 process_name: Cow<'static, str>,
355 timeout: Duration,
357 },
358
359 #[error("Output collection for process '{process_name}' failed: {source}")]
361 OutputCollectionFailed {
362 process_name: Cow<'static, str>,
364 #[source]
366 source: ConsumerError,
367 },
368
369 #[error("Output collection for process '{process_name}' could not start: {source}")]
371 OutputCollectionStartFailed {
372 process_name: Cow<'static, str>,
374 #[source]
376 source: StreamConsumerError,
377 },
378}
379
380#[derive(Debug, Error)]
382#[non_exhaustive]
383pub enum SpawnError {
384 #[error("Failed to spawn process '{process_name}': {source}")]
386 SpawnFailed {
387 process_name: Cow<'static, str>,
389 #[source]
391 source: io::Error,
392 },
393}
394
395#[derive(Debug, Clone, Copy, Error, PartialEq, Eq)]
397#[non_exhaustive]
398pub enum StreamConsumerError {
399 #[error("Stream '{stream_name}' already has an active consumer")]
401 ActiveConsumer {
402 stream_name: &'static str,
404 },
405}
406
407impl StreamConsumerError {
408 #[must_use]
410 pub fn stream_name(&self) -> &'static str {
411 match self {
412 Self::ActiveConsumer { stream_name } => stream_name,
413 }
414 }
415}
416
417#[derive(Debug, Clone, Error)]
419#[error("Could not read from stream '{stream_name}': {source}")]
420pub struct StreamReadError {
421 stream_name: &'static str,
422 #[source]
423 source: Arc<io::Error>,
424}
425
426impl StreamReadError {
427 #[must_use]
429 pub fn new(stream_name: &'static str, source: io::Error) -> Self {
430 Self {
431 stream_name,
432 source: Arc::new(source),
433 }
434 }
435
436 #[must_use]
438 pub fn stream_name(&self) -> &'static str {
439 self.stream_name
440 }
441
442 #[must_use]
444 pub fn kind(&self) -> io::ErrorKind {
445 self.source.kind()
446 }
447
448 #[must_use]
450 pub fn source_io_error(&self) -> &io::Error {
451 self.source.as_ref()
452 }
453}
454
455impl PartialEq for StreamReadError {
456 fn eq(&self, other: &Self) -> bool {
457 self.stream_name == other.stream_name && self.kind() == other.kind()
458 }
459}
460
461impl Eq for StreamReadError {}
462
463#[derive(Debug, Clone, Copy, PartialEq, Eq)]
468pub enum WaitForLineResult {
469 Matched,
471
472 StreamClosed,
474
475 Timeout,
477}