use std::fmt::Display;
use serde::{Deserialize, Serialize};
use thiserror::Error as ThisError;
use utoipa::ToSchema;
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub enum SuspendError {
Permanent(
Vec<PermanentSuspendError>,
),
Temporary(
Vec<TemporarySuspendError>,
),
}
impl Display for SuspendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SuspendError::Permanent(reasons) => {
write!(
f,
"The pipeline does not support checkpointing for the following reasons:"
)?;
for (index, reason) in reasons.iter().enumerate() {
if index > 0 {
write!(f, ",")?;
}
write!(f, " {reason}")?;
}
}
SuspendError::Temporary(delays) => {
write!(
f,
"Checkpointing the pipeline will be temporarily delayed for the following reasons:"
)?;
for (index, delay) in delays.iter().enumerate() {
if index > 0 {
write!(f, ",")?;
}
write!(f, " {delay}")?;
}
}
}
Ok(())
}
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, ThisError, ToSchema)]
pub enum PermanentSuspendError {
#[error("Storage must be configured")]
StorageRequired,
#[error("Suspend is an enterprise feature")]
EnterpriseFeature,
#[error("Input endpoint {0:?} does not support suspend")]
UnsupportedInputEndpoint(String),
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ThisError, ToSchema)]
pub enum TemporarySuspendError {
#[error("The pipeline is replaying the journal")]
Replaying,
#[error("The pipeline is bootstrapping")]
Bootstrapping,
#[error("The pipeline is processing a transaction")]
TransactionInProgress,
#[error("Input endpoint {0:?} is blocking suspend")]
InputEndpointBarrier(String),
#[error("Coordinator is blocking checkpoint")]
Coordination,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, ToSchema)]
pub struct SuspendableResponse {
pub suspendable: bool,
pub reasons: Vec<PermanentSuspendError>,
}
impl SuspendableResponse {
pub fn new(suspendable: bool, reasons: Vec<PermanentSuspendError>) -> Self {
Self {
suspendable,
reasons,
}
}
}