1use std::fmt::Display;
2
3use serde::{Deserialize, Serialize};
4use thiserror::Error as ThisError;
5use utoipa::ToSchema;
6
/// Reasons why a pipeline cannot be checkpointed right now.
///
/// The two variants separate blockers reported as "does not support
/// checkpointing" from blockers reported as a temporary delay; the exact
/// wording comes from the `Display` implementation of this type.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub enum SuspendError {
    /// The pipeline does not support checkpointing.
    Permanent(
        /// The individual reasons, rendered comma-separated by `Display`.
        Vec<PermanentSuspendError>,
    ),

    /// Checkpointing the pipeline is temporarily delayed.
    Temporary(
        /// The individual causes of the delay, rendered comma-separated by
        /// `Display`.
        Vec<TemporarySuspendError>,
    ),
}
27
28impl Display for SuspendError {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 match self {
31 SuspendError::Permanent(reasons) => {
32 write!(
33 f,
34 "The pipeline does not support checkpointing for the following reasons:"
35 )?;
36 for (index, reason) in reasons.iter().enumerate() {
37 if index > 0 {
38 write!(f, ",")?;
39 }
40 write!(f, " {reason}")?;
41 }
42 }
43 SuspendError::Temporary(delays) => {
44 write!(
45 f,
46 "Checkpointing the pipeline will be temporarily delayed for the following reasons:"
47 )?;
48 for (index, delay) in delays.iter().enumerate() {
49 if index > 0 {
50 write!(f, ",")?;
51 }
52 write!(f, " {delay}")?;
53 }
54 }
55 }
56 Ok(())
57 }
58}
59
/// A reason why a pipeline does not support suspend.
///
/// Carried by `SuspendError::Permanent` and by `SuspendableResponse::reasons`.
/// The `Display` text of each variant is its `#[error]` string.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, ThisError, ToSchema)]
pub enum PermanentSuspendError {
    /// Storage is required but has not been configured.
    #[error("Storage must be configured")]
    StorageRequired,

    /// Suspend is only available as an enterprise feature.
    #[error("Suspend is an enterprise feature")]
    EnterpriseFeature,

    /// An input endpoint does not support suspend.
    ///
    /// The payload identifies the endpoint and is shown with `Debug`
    /// formatting (quoted) in the message.
    // NOTE(review): the payload is presumably the endpoint name — confirm
    // against the code that constructs this variant.
    #[error("Input endpoint {0:?} does not support suspend")]
    UnsupportedInputEndpoint(String),
}
72
/// A condition that is currently delaying suspend or checkpoint.
///
/// Carried by `SuspendError::Temporary`. The `Display` text of each variant
/// is its `#[error]` string.
// NOTE(review): unlike `PermanentSuspendError`, this type does not derive
// `Hash` — confirm whether that difference is intentional.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ThisError, ToSchema)]
pub enum TemporarySuspendError {
    /// The pipeline is replaying the journal.
    #[error("The pipeline is replaying the journal")]
    Replaying,

    /// The pipeline is bootstrapping.
    #[error("The pipeline is bootstrapping")]
    Bootstrapping,

    /// The pipeline is processing a transaction.
    #[error("The pipeline is processing a transaction")]
    TransactionInProgress,

    /// An input endpoint is blocking suspend.
    ///
    /// The payload identifies the endpoint and is shown with `Debug`
    /// formatting (quoted) in the message.
    // NOTE(review): the payload is presumably the endpoint name — confirm
    // against the code that constructs this variant.
    #[error("Input endpoint {0:?} is blocking suspend")]
    InputEndpointBarrier(String),

    /// A coordinator is blocking the checkpoint.
    #[error("Coordinator is blocking checkpoint")]
    Coordination,
}
91
/// Pairs a "suspendable" flag with a list of permanent suspend errors.
// NOTE(review): presumably the body of an API response reporting whether a
// pipeline can be suspended (the type derives `ToSchema`) — confirm against
// the endpoint that returns it.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, ToSchema)]
pub struct SuspendableResponse {
    /// Whether the pipeline is suspendable.
    pub suspendable: bool,

    /// Permanent suspend errors associated with the pipeline.
    // NOTE(review): presumably empty exactly when `suspendable` is `true`;
    // this type does not enforce that relationship — verify at the call sites.
    pub reasons: Vec<PermanentSuspendError>,
}
104
105impl SuspendableResponse {
106 pub fn new(suspendable: bool, reasons: Vec<PermanentSuspendError>) -> Self {
108 Self {
109 suspendable,
110 reasons,
111 }
112 }
113}