use std::borrow::Cow;
use std::error::Error;
use std::fmt;
use std::io;
use std::process::ExitStatus;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use crate::ConsumerError;
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum TerminationError {
#[error(
"Failed to send signal to process '{process_name}'.{}",
DisplayAttemptErrors(.attempt_errors.as_slice())
)]
SignalFailed {
process_name: Cow<'static, str>,
attempt_errors: Vec<TerminationAttemptError>,
},
#[error(
"Failed to terminate process '{process_name}'.{}",
DisplayAttemptErrors(.attempt_errors.as_slice())
)]
TerminationFailed {
process_name: Cow<'static, str>,
attempt_errors: Vec<TerminationAttemptError>,
},
}
impl TerminationError {
#[must_use]
pub fn process_name(&self) -> &str {
match self {
Self::SignalFailed { process_name, .. }
| Self::TerminationFailed { process_name, .. } => process_name,
}
}
#[must_use]
pub fn attempt_errors(&self) -> &[TerminationAttemptError] {
match self {
Self::SignalFailed { attempt_errors, .. }
| Self::TerminationFailed { attempt_errors, .. } => attempt_errors,
}
}
}
struct DisplayAttemptErrors<'a>(&'a [TerminationAttemptError]);
impl fmt::Display for DisplayAttemptErrors<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.0.is_empty() {
return write!(f, " No attempt error was recorded.");
}
write!(f, " Attempt errors:")?;
for (index, attempt_error) in self.0.iter().enumerate() {
write!(f, " [{}] {attempt_error}", index + 1)?;
}
Ok(())
}
}
#[derive(Debug, Error)]
#[error("{action} failed")]
#[non_exhaustive]
pub struct TerminationAttemptError {
pub action: TerminationAction,
#[source]
pub source: Box<dyn Error + Send + Sync + 'static>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum TerminationAction {
CheckStatus,
SendSignal {
signal_name: &'static str,
},
WaitForExit,
}
impl fmt::Display for TerminationAction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::CheckStatus => f.write_str("status check"),
Self::SendSignal { signal_name } => write!(f, "send signal {signal_name}"),
Self::WaitForExit => f.write_str("exit wait"),
}
}
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum WaitError {
#[error("IO error occurred while waiting for process '{process_name}'")]
IoError {
process_name: Cow<'static, str>,
#[source]
source: io::Error,
},
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "Discarding the result hides whether the process completed or the wait timed out; \
match on the variants or call `into_completed`/`expect_completed`."]
pub enum WaitForCompletionResult<T = ExitStatus> {
Completed(T),
Timeout {
timeout: Duration,
},
}
impl<T> WaitForCompletionResult<T> {
#[must_use]
pub fn into_completed(self) -> Option<T> {
match self {
Self::Completed(value) => Some(value),
Self::Timeout { .. } => None,
}
}
pub fn expect_completed(self, message: &str) -> T {
self.into_completed().expect(message)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[must_use = "Discarding the result hides whether the process completed naturally or was \
terminated after the wait timeout; match on the variants or call `into_result`."]
pub enum WaitForCompletionOrTerminateResult<T = ExitStatus> {
Completed(T),
TerminatedAfterTimeout {
result: T,
timeout: Duration,
},
}
impl<T> WaitForCompletionOrTerminateResult<T> {
#[must_use]
pub fn into_result(self) -> T {
match self {
Self::Completed(value) | Self::TerminatedAfterTimeout { result: value, .. } => value,
}
}
#[must_use]
pub fn into_completed(self) -> Option<T> {
match self {
Self::Completed(value) => Some(value),
Self::TerminatedAfterTimeout { .. } => None,
}
}
pub fn expect_completed(self, message: &str) -> T {
self.into_completed().expect(message)
}
pub(crate) fn map<U>(self, f: impl FnOnce(T) -> U) -> WaitForCompletionOrTerminateResult<U> {
match self {
Self::Completed(value) => WaitForCompletionOrTerminateResult::Completed(f(value)),
Self::TerminatedAfterTimeout { result, timeout } => {
WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
result: f(result),
timeout,
}
}
}
}
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum WaitOrTerminateError {
#[error(
"Waiting for process '{process_name}' failed; cleanup termination completed with status {termination_status}"
)]
WaitFailed {
process_name: Cow<'static, str>,
#[source]
wait_error: Box<WaitError>,
termination_status: ExitStatus,
},
#[error("Termination of process '{process_name}' failed after '{wait_error}'.")]
TerminationFailed {
process_name: Cow<'static, str>,
wait_error: Box<WaitError>,
#[source]
termination_error: TerminationError,
},
#[error(
"Process '{process_name}' did not complete within {timeout:?}; cleanup termination failed"
)]
TerminationAfterTimeoutFailed {
process_name: Cow<'static, str>,
timeout: Duration,
#[source]
termination_error: TerminationError,
},
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum WaitWithOutputError {
#[error("Waiting for process completion failed")]
WaitFailed(#[from] WaitError),
#[error("Wait-or-terminate operation failed")]
WaitOrTerminateFailed(#[from] WaitOrTerminateError),
#[error("Output collection for process '{process_name}' did not complete within {timeout:?}")]
OutputCollectionTimeout {
process_name: Cow<'static, str>,
timeout: Duration,
},
#[error("Output collection for process '{process_name}' failed")]
OutputCollectionFailed {
process_name: Cow<'static, str>,
#[source]
source: ConsumerError,
},
#[error("Output collection for process '{process_name}' could not start")]
OutputCollectionStartFailed {
process_name: Cow<'static, str>,
#[source]
source: Box<dyn Error + Send + Sync + 'static>,
},
}
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SpawnError {
#[error("Failed to spawn process '{process_name}'")]
SpawnFailed {
process_name: Cow<'static, str>,
#[source]
source: io::Error,
},
#[cfg(windows)]
#[error("Failed to attach spawned process '{process_name}' to a Windows Job Object")]
JobAttachmentFailed {
process_name: Cow<'static, str>,
#[source]
source: io::Error,
},
}
#[derive(Debug, Clone, Copy, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamConsumerError {
#[error("Stream '{stream_name}' already has an active consumer")]
ActiveConsumer {
stream_name: &'static str,
},
}
impl StreamConsumerError {
#[must_use]
pub fn stream_name(&self) -> &'static str {
match self {
Self::ActiveConsumer { stream_name } => stream_name,
}
}
}
#[derive(Debug, Clone, Error)]
#[error("Could not read from stream '{stream_name}'")]
pub struct StreamReadError {
stream_name: &'static str,
#[source]
source: Arc<io::Error>,
}
impl StreamReadError {
#[must_use]
pub fn new(stream_name: &'static str, source: io::Error) -> Self {
Self {
stream_name,
source: Arc::new(source),
}
}
#[must_use]
pub fn stream_name(&self) -> &'static str {
self.stream_name
}
#[must_use]
pub fn kind(&self) -> io::ErrorKind {
self.source.kind()
}
#[must_use]
pub fn source_io_error(&self) -> &io::Error {
self.source.as_ref()
}
}
impl PartialEq for StreamReadError {
fn eq(&self, other: &Self) -> bool {
self.stream_name == other.stream_name && self.kind() == other.kind()
}
}
impl Eq for StreamReadError {}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitForLineResult {
Matched,
StreamClosed,
Timeout,
}
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use assertr::prelude::*;
pub(crate) fn assert_attempt_error(
attempt_error: &TerminationAttemptError,
expected_action: TerminationAction,
expected_kind: io::ErrorKind,
expected_message: &str,
) {
assert_that!(attempt_error.action).is_equal_to(expected_action);
let io_error = attempt_error
.source
.downcast_ref::<io::Error>()
.expect("diagnostic should preserve the original io::Error");
assert_that!(io_error.kind()).is_equal_to(expected_kind);
assert_that!(io_error.to_string().as_str()).contains(expected_message);
}
}