// feldera_types/suspend.rs

1use std::fmt::Display;
2
3use serde::{Deserialize, Serialize};
4use thiserror::Error as ThisError;
5use utoipa::ToSchema;
6
// Outcome of asking whether a pipeline can be suspended: either it cannot be
// suspended at all with its current configuration (`Permanent`), or it can
// but a suspend requested now would be delayed (`Temporary`). Each variant
// carries the complete list of individual reasons.
//
// No `#[serde(...)]` attributes are present, so serde's default (externally
// tagged) enum representation applies to the serialized form.
//
// NOTE(review): utoipa's `ToSchema` derive is documented to reuse `///` doc
// comments as schema descriptions, so the `///` text below has been left
// untouched; confirm the effect on the generated OpenAPI document before
// rewording it.
/// Whether a pipeline supports checkpointing and suspend-and-resume.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub enum SuspendError {
    /// Pipeline does not support suspend-and-resume.
    ///
    /// These reasons only change if the pipeline's configuration changes, e.g.
    /// if a pipeline has an input connector that does not support
    /// suspend-and-resume, and then that input connector is removed.
    Permanent(
        /// Reasons why the pipeline does not support suspend-and-resume.
        Vec<PermanentSuspendError>,
    ),

    /// Pipeline supports suspend-and-resume, but a suspend requested now will
    /// be delayed.
    Temporary(
        /// Reasons that the suspend will be delayed.
        Vec<TemporarySuspendError>,
    ),
}
27
28impl Display for SuspendError {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        match self {
31            SuspendError::Permanent(reasons) => {
32                write!(
33                    f,
34                    "The pipeline does not support checkpointing for the following reasons:"
35                )?;
36                for (index, reason) in reasons.iter().enumerate() {
37                    if index > 0 {
38                        write!(f, ",")?;
39                    }
40                    write!(f, " {reason}")?;
41                }
42            }
43            SuspendError::Temporary(delays) => {
44                write!(
45                    f,
46                    "Checkpointing the pipeline will be temporarily delayed for the following reasons:"
47                )?;
48                for (index, delay) in delays.iter().enumerate() {
49                    if index > 0 {
50                        write!(f, ",")?;
51                    }
52                    write!(f, " {delay}")?;
53                }
54            }
55        }
56        Ok(())
57    }
58}
59
// Individual obstacles that rule out suspend-and-resume for a pipeline.
//
// The `Display` text of each variant comes from its `#[error(...)]`
// attribute (thiserror derive). `{0:?}` formats the carried string with
// `Debug`, so it appears inside double quotes in the message.
//
// NOTE(review): `///` comments here may be exported as schema descriptions
// by the `ToSchema` derive; the existing `///` text is left untouched and
// the per-variant notes below are plain `//` comments on purpose.
/// Reasons why a pipeline does not support suspend and resume operations.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ThisError, ToSchema)]
pub enum PermanentSuspendError {
    // Storage has not been configured.
    #[error("Storage must be configured")]
    StorageRequired,

    // Suspend is only available as an enterprise feature.
    #[error("Suspend is an enterprise feature")]
    EnterpriseFeature,

    // An input endpoint lacks suspend support; the payload is presumably the
    // endpoint's name (verify against the code that constructs it).
    #[error("Input endpoint {0:?} does not support suspend")]
    UnsupportedInputEndpoint(String),
}
72
// Individual conditions that delay a suspend requested right now.
//
// The `Display` text of each variant comes from its `#[error(...)]`
// attribute (thiserror derive). `{0:?}` formats the carried string with
// `Debug`, so it appears inside double quotes in the message.
//
// NOTE(review): `///` comments here may be exported as schema descriptions
// by the `ToSchema` derive; the existing `///` text is left untouched and
// the per-variant notes below are plain `//` comments on purpose.
/// Reasons why a pipeline cannot be suspended at this time.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ThisError, ToSchema)]
pub enum TemporarySuspendError {
    // The pipeline is currently replaying the journal.
    #[error("The pipeline is replaying the journal")]
    Replaying,

    // The pipeline is currently bootstrapping.
    #[error("The pipeline is bootstrapping")]
    Bootstrapping,

    // An input endpoint is blocking the suspend; the payload is presumably
    // the endpoint's name (verify against the code that constructs it).
    #[error("Input endpoint {0:?} is blocking suspend")]
    InputEndpointBarrier(String),
}
85
// Payload returned for a `/suspendable` request: a yes/no flag plus the list
// of permanent obstacles.
//
// Nothing in this type ties `suspendable` to `reasons` being empty; both
// fields are public and are set independently by whoever builds the value.
//
// NOTE(review): the `///` text below may be exported as schema descriptions
// by the `ToSchema` derive, so it is left untouched.
/// Response to a `/suspendable` request.
///
/// Reports whether the pipeline supports suspend and resume operations.
/// If not, provides the reasons why suspending is not supported.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
pub struct SuspendableResponse {
    /// Is the pipeline suspendable?
    pub suspendable: bool,

    /// If the pipeline is not suspendable, why not?
    pub reasons: Vec<PermanentSuspendError>,
}
98
99impl SuspendableResponse {
100    /// Create a new suspendable response.
101    pub fn new(suspendable: bool, reasons: Vec<PermanentSuspendError>) -> Self {
102        Self {
103            suspendable,
104            reasons,
105        }
106    }
107}