use std::borrow::Cow;
use std::error::Error;
use std::fmt;
use std::io;
use std::process::ExitStatus;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use crate::ConsumerError;
/// Error describing a failed attempt to bring a process down.
///
/// Both variants carry the same payload (the process name and the list of recorded
/// per-attempt errors); they differ in the headline of the rendered message.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum TerminationError {
    /// Rendered as "Failed to send signal to process '<name>'." followed by the
    /// attempt-error listing.
    #[error(
        "Failed to send signal to process '{process_name}'.{}",
        DisplayAttemptErrors(.attempt_errors.as_slice())
    )]
    SignalFailed {
        /// Process name interpolated into the message.
        process_name: Cow<'static, str>,
        /// Errors recorded for the individual attempts; the message has a dedicated
        /// wording for the empty case.
        attempt_errors: Vec<TerminationAttemptError>,
    },

    /// Rendered as "Failed to terminate process '<name>'." followed by the
    /// attempt-error listing.
    #[error(
        "Failed to terminate process '{process_name}'.{}",
        DisplayAttemptErrors(.attempt_errors.as_slice())
    )]
    TerminationFailed {
        /// Process name interpolated into the message.
        process_name: Cow<'static, str>,
        /// Errors recorded for the individual attempts; the message has a dedicated
        /// wording for the empty case.
        attempt_errors: Vec<TerminationAttemptError>,
    },
}
impl TerminationError {
    /// Returns the process name stored in the error, whichever variant it is.
    #[must_use]
    pub fn process_name(&self) -> &str {
        // Every variant carries this field, so a single irrefutable or-pattern suffices.
        let (Self::SignalFailed { process_name, .. }
        | Self::TerminationFailed { process_name, .. }) = self;
        process_name
    }

    /// Returns the recorded per-attempt errors as a slice (possibly empty).
    #[must_use]
    pub fn attempt_errors(&self) -> &[TerminationAttemptError] {
        // Every variant carries this field, so a single irrefutable or-pattern suffices.
        let (Self::SignalFailed { attempt_errors, .. }
        | Self::TerminationFailed { attempt_errors, .. }) = self;
        attempt_errors
    }
}
/// Display adapter that renders a slice of attempt errors as a message suffix.
struct DisplayAttemptErrors<'a>(&'a [TerminationAttemptError]);

impl fmt::Display for DisplayAttemptErrors<'_> {
    /// Writes either a fixed "nothing recorded" sentence or an " Attempt errors:" header
    /// followed by each error prefixed with its 1-based position in brackets.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let errors = self.0;
        if errors.is_empty() {
            return f.write_str(" No attempt error was recorded.");
        }

        f.write_str(" Attempt errors:")?;
        errors
            .iter()
            .enumerate()
            .try_for_each(|(index, error)| write!(f, " [{}] {error}", index + 1))
    }
}
/// Display adapter producing an optional " for <signal>" message suffix.
struct DisplaySignalNameSuffix(Option<&'static str>);

impl fmt::Display for DisplaySignalNameSuffix {
    /// Writes " for <name>" when a signal name is present and nothing otherwise.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.0
            .map_or(Ok(()), |signal_name| write!(f, " for {signal_name}"))
    }
}
/// A single failed step recorded while trying to terminate a process.
///
/// Rendered as "<phase> <operation> failed[ for <signal>]: <source>".
#[derive(Debug, Error)]
#[error(
    "{phase} {operation} failed{}: {source}",
    DisplaySignalNameSuffix(*.signal_name)
)]
#[non_exhaustive]
pub struct TerminationAttemptError {
    /// Phase in which the step failed.
    pub phase: TerminationAttemptPhase,
    /// Operation that failed within that phase.
    pub operation: TerminationAttemptOperation,
    /// Signal name added to the message when present; omitted from it when `None`.
    pub signal_name: Option<&'static str>,
    /// Underlying cause, exposed through `Error::source` and appended to the message.
    #[source]
    pub source: Box<dyn Error + Send + Sync + 'static>,
}
/// Phase of a termination attempt in which an error was recorded.
///
/// The variants display as the lowercase words `preflight`, `interrupt`, `terminate`
/// and `kill`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum TerminationAttemptPhase {
    /// Displays as "preflight".
    Preflight,
    /// Displays as "interrupt".
    Interrupt,
    /// Displays as "terminate".
    Terminate,
    /// Displays as "kill".
    Kill,
}

impl fmt::Display for TerminationAttemptPhase {
    /// Writes the lowercase label of the phase.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::Preflight => "preflight",
            Self::Interrupt => "interrupt",
            Self::Terminate => "terminate",
            Self::Kill => "kill",
        };
        f.write_str(label)
    }
}
/// Operation that failed during a termination attempt.
///
/// The variants display as `status check`, `signal send` and `exit wait`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum TerminationAttemptOperation {
    /// Displays as "status check".
    CheckStatus,
    /// Displays as "signal send".
    SendSignal,
    /// Displays as "exit wait".
    WaitForExit,
}

impl fmt::Display for TerminationAttemptOperation {
    /// Writes the human-readable label of the operation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            Self::CheckStatus => "status check",
            Self::SendSignal => "signal send",
            Self::WaitForExit => "exit wait",
        };
        f.write_str(label)
    }
}
/// Error raised while waiting for a process.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum WaitError {
    /// An I/O error occurred during the wait.
    #[error("IO error occurred while waiting for process '{process_name}': {source}")]
    IoError {
        /// Process name interpolated into the message.
        process_name: Cow<'static, str>,
        /// Underlying I/O error, exposed through `Error::source` and appended to the message.
        #[source]
        source: io::Error,
    },
}
/// Outcome of a wait that either produced a completion value or timed out.
///
/// `T` is the value carried on completion and defaults to [`ExitStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitForCompletionResult<T = ExitStatus> {
    /// The wait finished and produced a value.
    Completed(T),
    /// The wait did not produce a value.
    Timeout {
        /// Duration associated with the timeout (presumably the limit that elapsed).
        timeout: Duration,
    },
}

impl<T> WaitForCompletionResult<T> {
    /// Converts into `Some(value)` for [`Self::Completed`] and `None` for [`Self::Timeout`].
    #[must_use]
    pub fn into_completed(self) -> Option<T> {
        match self {
            Self::Completed(value) => Some(value),
            Self::Timeout { .. } => None,
        }
    }

    /// Returns the completion value, panicking with `message` on [`Self::Timeout`].
    ///
    /// # Panics
    ///
    /// Panics with `message` when the result is [`Self::Timeout`]. Thanks to
    /// `#[track_caller]`, the reported panic location is the caller's call site rather
    /// than this method's body.
    #[track_caller]
    pub fn expect_completed(self, message: &str) -> T {
        self.into_completed().expect(message)
    }

    /// Applies `f` to the completion value, leaving a timeout untouched.
    pub(crate) fn map<U>(self, f: impl FnOnce(T) -> U) -> WaitForCompletionResult<U> {
        match self {
            Self::Completed(value) => WaitForCompletionResult::Completed(f(value)),
            Self::Timeout { timeout } => WaitForCompletionResult::Timeout { timeout },
        }
    }
}
/// Outcome of a wait that yields a value in both cases: normal completion, or
/// termination after a timeout.
///
/// `T` is the carried value and defaults to [`ExitStatus`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitForCompletionOrTerminateResult<T = ExitStatus> {
    /// The wait finished and produced a value.
    Completed(T),
    /// A value was obtained only after a timeout and subsequent termination.
    TerminatedAfterTimeout {
        /// Value obtained after termination.
        result: T,
        /// Duration associated with the timeout (presumably the limit that elapsed).
        timeout: Duration,
    },
}

impl<T> WaitForCompletionOrTerminateResult<T> {
    /// Returns the carried value regardless of which variant holds it.
    #[must_use]
    pub fn into_result(self) -> T {
        match self {
            Self::Completed(value) | Self::TerminatedAfterTimeout { result: value, .. } => value,
        }
    }

    /// Converts into `Some(value)` for [`Self::Completed`] and `None` for
    /// [`Self::TerminatedAfterTimeout`].
    #[must_use]
    pub fn into_completed(self) -> Option<T> {
        match self {
            Self::Completed(value) => Some(value),
            Self::TerminatedAfterTimeout { .. } => None,
        }
    }

    /// Returns the completion value, panicking with `message` on
    /// [`Self::TerminatedAfterTimeout`].
    ///
    /// # Panics
    ///
    /// Panics with `message` when the result is [`Self::TerminatedAfterTimeout`]. Thanks
    /// to `#[track_caller]`, the reported panic location is the caller's call site rather
    /// than this method's body.
    #[track_caller]
    pub fn expect_completed(self, message: &str) -> T {
        self.into_completed().expect(message)
    }

    /// Applies `f` to the carried value in either variant, preserving the variant and
    /// its `timeout`.
    pub(crate) fn map<U>(self, f: impl FnOnce(T) -> U) -> WaitForCompletionOrTerminateResult<U> {
        match self {
            Self::Completed(value) => WaitForCompletionOrTerminateResult::Completed(f(value)),
            Self::TerminatedAfterTimeout { result, timeout } => {
                WaitForCompletionOrTerminateResult::TerminatedAfterTimeout {
                    result: f(result),
                    timeout,
                }
            }
        }
    }
}
/// Error of a combined wait-then-terminate operation.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum WaitOrTerminateError {
    /// Waiting failed, and the cleanup termination that followed finished with an
    /// exit status.
    #[error(
        "Waiting for process '{process_name}' failed with '{wait_error}', then cleanup termination completed with status {termination_status}"
    )]
    WaitFailed {
        /// Process name interpolated into the message.
        process_name: Cow<'static, str>,
        /// The wait failure; this is the value exposed through `Error::source`.
        #[source]
        wait_error: Box<WaitError>,
        /// Exit status reported by the cleanup termination.
        termination_status: ExitStatus,
    },

    /// Waiting failed, and the cleanup termination that followed failed as well.
    #[error(
        "Waiting for process '{process_name}' failed with '{wait_error}', then cleanup termination also failed: {termination_error}"
    )]
    TerminationFailed {
        /// Process name interpolated into the message.
        process_name: Cow<'static, str>,
        /// The wait failure; this is the value exposed through `Error::source`.
        #[source]
        wait_error: Box<WaitError>,
        /// The termination failure; it appears in the message and as a field but is
        /// not the `Error::source` of this variant.
        termination_error: TerminationError,
    },

    /// The process did not complete within `timeout`, and the cleanup termination
    /// that followed failed.
    #[error(
        "Process '{process_name}' did not complete within {timeout:?}, then cleanup termination failed: {termination_error}"
    )]
    TerminationAfterTimeoutFailed {
        /// Process name interpolated into the message.
        process_name: Cow<'static, str>,
        /// Time limit reported in the message (Debug-formatted).
        timeout: Duration,
        /// The termination failure, exposed through `Error::source`.
        #[source]
        termination_error: TerminationError,
    },
}
/// Error of an operation that waits for a process and collects its output.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum WaitWithOutputError {
    /// Wraps a [`WaitError`]; convertible via `From` thanks to `#[from]`.
    #[error("Waiting for process completion failed: {0}")]
    WaitFailed(#[from] WaitError),

    /// Wraps a [`WaitOrTerminateError`]; convertible via `From` thanks to `#[from]`.
    #[error("Wait-or-terminate operation failed: {0}")]
    WaitOrTerminateFailed(#[from] WaitOrTerminateError),

    /// Output collection did not complete within `timeout`.
    #[error("Output collection for process '{process_name}' did not complete within {timeout:?}")]
    OutputCollectionTimeout {
        /// Process name interpolated into the message.
        process_name: Cow<'static, str>,
        /// Time limit reported in the message (Debug-formatted).
        timeout: Duration,
    },

    /// Output collection failed with a consumer error.
    #[error("Output collection for process '{process_name}' failed: {source}")]
    OutputCollectionFailed {
        /// Process name interpolated into the message.
        process_name: Cow<'static, str>,
        /// Underlying consumer error, exposed through `Error::source`.
        #[source]
        source: ConsumerError,
    },

    /// Output collection could not be started.
    #[error("Output collection for process '{process_name}' could not start: {source}")]
    OutputCollectionStartFailed {
        /// Process name interpolated into the message.
        process_name: Cow<'static, str>,
        /// Underlying stream-consumer error, exposed through `Error::source`.
        #[source]
        source: StreamConsumerError,
    },
}
/// Error raised when a process could not be spawned.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SpawnError {
    /// Spawning failed with an I/O error.
    #[error("Failed to spawn process '{process_name}': {source}")]
    SpawnFailed {
        /// Process name interpolated into the message.
        process_name: Cow<'static, str>,
        /// Underlying I/O error, exposed through `Error::source` and appended to the message.
        #[source]
        source: io::Error,
    },
}
/// Error raised when a consumer cannot be attached to a stream.
#[derive(Debug, Clone, Copy, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum StreamConsumerError {
    /// The named stream already has an active consumer.
    #[error("Stream '{stream_name}' already has an active consumer")]
    ActiveConsumer {
        /// Stream name interpolated into the message.
        stream_name: &'static str,
    },
}

impl StreamConsumerError {
    /// Returns the name of the stream the error refers to.
    #[must_use]
    pub fn stream_name(&self) -> &'static str {
        // The enum currently has a single variant, so the destructuring is irrefutable.
        let Self::ActiveConsumer { stream_name } = *self;
        stream_name
    }
}
/// Error raised when reading from a named stream fails.
///
/// Rendered as "Could not read from stream '<name>': <source>".
#[derive(Debug, Clone, Error)]
#[error("Could not read from stream '{stream_name}': {source}")]
pub struct StreamReadError {
    /// Stream name interpolated into the message.
    stream_name: &'static str,
    /// Underlying I/O error behind an `Arc`; `io::Error` itself is not `Clone`, so the
    /// shared pointer is presumably what makes the `Clone` derive possible.
    #[source]
    source: Arc<io::Error>,
}
impl StreamReadError {
    /// Builds the error from a stream name and the I/O error that occurred, moving the
    /// latter into an `Arc`.
    #[must_use]
    pub fn new(stream_name: &'static str, source: io::Error) -> Self {
        let source = Arc::new(source);
        Self {
            stream_name,
            source,
        }
    }

    /// Returns the name of the stream the read failed on.
    #[must_use]
    pub fn stream_name(&self) -> &'static str {
        self.stream_name
    }

    /// Returns the [`io::ErrorKind`] of the underlying I/O error.
    #[must_use]
    pub fn kind(&self) -> io::ErrorKind {
        self.source_io_error().kind()
    }

    /// Borrows the underlying I/O error.
    #[must_use]
    pub fn source_io_error(&self) -> &io::Error {
        &self.source
    }
}
/// Two read errors are equal when they name the same stream and their underlying I/O
/// errors have the same [`io::ErrorKind`]; other details of the I/O error are not compared.
impl PartialEq for StreamReadError {
    fn eq(&self, other: &Self) -> bool {
        let lhs = (self.stream_name, self.kind());
        let rhs = (other.stream_name, other.kind());
        lhs == rhs
    }
}

impl Eq for StreamReadError {}
/// Outcome of waiting for a line.
///
/// NOTE(review): the exact conditions producing each variant are decided by code
/// outside this definition; the descriptions below follow the variant names only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WaitForLineResult {
    /// A match was found.
    Matched,
    /// The stream was closed (presumably before any match).
    StreamClosed,
    /// The wait timed out (presumably before any match).
    Timeout,
}