Skip to main content

feldera_types/
runtime_status.rs

1use crate::error::ErrorResponse;
2use actix_web::body::BoxBody;
3use actix_web::http::StatusCode;
4use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder, Responder, ResponseError};
5use bytemuck::NoUninit;
6use clap::ValueEnum;
7use serde::{Deserialize, Serialize};
8use std::fmt;
9use std::fmt::Display;
10use utoipa::ToSchema;
11
/// Runtime status of the pipeline.
///
/// Of the statuses, only `Unavailable` is determined by the runner. All other statuses are
/// determined by the pipeline and taken over by the runner.
// NOTE: the enum is `#[repr(u8)]` and relies on implicit discriminants, so the declaration order
// below fixes the numeric value of every variant (`Unavailable` = 0 up to `Suspended` = 9).
// Reordering or inserting variants changes those values.
// NOTE(review): presumably the `NoUninit` derive exists so the status can be handled as a plain
// byte somewhere else — confirm with its users before changing the variant order.
//
// The `///` comments are also visible to the derive macros on this type (e.g. `ToSchema`), so
// rewording them can change generated output, not just rustdoc.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, ToSchema, NoUninit)]
#[repr(u8)]
pub enum RuntimeStatus {
    /// The runner was unable to determine the pipeline runtime status. This status is never
    /// returned by the pipeline endpoint itself, but only determined by the runner.
    ///
    /// It can notably occur in two scenarios:
    /// 1. The runner is unable to (in time) receive a response for its sent request to the
    ///    pipeline `/status` endpoint, or it is unable to parse the response.
    /// 2. The runner received back a `503 Service Unavailable` as a response to the request.
    ///    This can occur for example if the pipeline is unable to acquire a lock necessary to
    ///    determine whether it is in any of the other runtime statuses.
    Unavailable,

    /// The pipeline is waiting for initialization instructions from the
    /// coordinator.
    Coordination,

    /// The pipeline is constantly pulling the latest checkpoint from S3 but not processing any inputs.
    Standby,

    /// The input and output connectors are establishing connections to their data sources and sinks
    /// respectively.
    Initializing,

    /// The pipeline was modified since the last checkpoint. User approval is required before
    /// bootstrapping can proceed.
    AwaitingApproval,

    /// The pipeline was modified since the last checkpoint, and is currently bootstrapping modified
    /// views.
    Bootstrapping,

    /// Input records that were stored in the journal but were not yet processed, are being
    /// processed first.
    Replaying,

    /// The input connectors are paused.
    Paused,

    /// The input connectors are running.
    Running,

    /// The pipeline finished checkpointing and pausing.
    Suspended,
}
62
63impl From<RuntimeDesiredStatus> for RuntimeStatus {
64    fn from(value: RuntimeDesiredStatus) -> Self {
65        match value {
66            RuntimeDesiredStatus::Unavailable => Self::Unavailable,
67            RuntimeDesiredStatus::Coordination => Self::Coordination,
68            RuntimeDesiredStatus::Standby => Self::Standby,
69            RuntimeDesiredStatus::Paused => Self::Paused,
70            RuntimeDesiredStatus::Running => Self::Running,
71            RuntimeDesiredStatus::Suspended => Self::Suspended,
72        }
73    }
74}
75
// Runtime desired status of the pipeline (see `ExtendedRuntimeStatus::runtime_desired_status`).
//
// Every variant has a same-named counterpart in `RuntimeStatus`, which is what
// `impl From<RuntimeDesiredStatus> for RuntimeStatus` maps it to. Which changes between desired
// statuses are permitted is defined by `may_transition_to` and `may_transition_to_at_startup`.
//
// With the derived serde impls the variants use their capitalized names (e.g. `Paused`); the
// `snake_case_runtime_desired_status` module provides the snake-case spelling (e.g. `paused`).
//
// NOTE(review): the `ValueEnum` derive suggests this type is also accepted as a command-line
// value — confirm where it is parsed.
//
// Plain `//` comments are used on purpose: `///` doc comments on this type or its variants are
// picked up by the `ToSchema` and `ValueEnum` derives and would alter the generated schema
// description and CLI help text.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema, ValueEnum)]
pub enum RuntimeDesiredStatus {
    Unavailable,
    Coordination,
    Standby,
    Paused,
    Running,
    Suspended,
}
85
86impl RuntimeDesiredStatus {
87    pub fn may_transition_to(&self, target: Self) -> bool {
88        match (*self, target) {
89            (old, new) if old == new => true,
90            (Self::Standby, Self::Paused | Self::Running) => true,
91            (Self::Paused, Self::Running | Self::Suspended) => true,
92            (Self::Running, Self::Paused | Self::Suspended) => true,
93            _ => false,
94        }
95    }
96
97    pub fn may_transition_to_at_startup(&self, target: Self) -> bool {
98        match (*self, target) {
99            (_, Self::Coordination) => true,
100            (Self::Suspended, _) => {
101                // A suspended pipeline must transition to "paused" or
102                // "running".
103                matches!(target, Self::Paused | Self::Running)
104            }
105            (old, new) if old.may_transition_to(new) => true,
106            _ => false,
107        }
108    }
109}
110
111/// Some of our JSON interface uses capitalized status names, like `Paused`, but
112/// other parts use snake-case names, like `paused`.  To support the latter with
113/// `serde`, use this module in the field declaration, e.g.:
114///
115/// ```ignore
116/// #[serde(with = "feldera_types::runtime_status::snake_case_runtime_desired_status")]
117/// ```
118pub mod snake_case_runtime_desired_status {
119    use serde::{Deserialize, Deserializer, Serialize, Serializer};
120
121    use crate::runtime_status::RuntimeDesiredStatus;
122
123    #[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
124    #[serde(rename_all = "snake_case")]
125    enum SnakeRuntimeDesiredStatus {
126        Unavailable,
127        Coordination,
128        Standby,
129        Paused,
130        Running,
131        Suspended,
132    }
133
134    impl From<RuntimeDesiredStatus> for SnakeRuntimeDesiredStatus {
135        fn from(value: RuntimeDesiredStatus) -> Self {
136            match value {
137                RuntimeDesiredStatus::Unavailable => SnakeRuntimeDesiredStatus::Unavailable,
138                RuntimeDesiredStatus::Coordination => SnakeRuntimeDesiredStatus::Coordination,
139                RuntimeDesiredStatus::Standby => SnakeRuntimeDesiredStatus::Standby,
140                RuntimeDesiredStatus::Paused => SnakeRuntimeDesiredStatus::Paused,
141                RuntimeDesiredStatus::Running => SnakeRuntimeDesiredStatus::Running,
142                RuntimeDesiredStatus::Suspended => SnakeRuntimeDesiredStatus::Suspended,
143            }
144        }
145    }
146
147    impl From<SnakeRuntimeDesiredStatus> for RuntimeDesiredStatus {
148        fn from(value: SnakeRuntimeDesiredStatus) -> Self {
149            match value {
150                SnakeRuntimeDesiredStatus::Unavailable => RuntimeDesiredStatus::Unavailable,
151                SnakeRuntimeDesiredStatus::Coordination => RuntimeDesiredStatus::Coordination,
152                SnakeRuntimeDesiredStatus::Standby => RuntimeDesiredStatus::Standby,
153                SnakeRuntimeDesiredStatus::Paused => RuntimeDesiredStatus::Paused,
154                SnakeRuntimeDesiredStatus::Running => RuntimeDesiredStatus::Running,
155                SnakeRuntimeDesiredStatus::Suspended => RuntimeDesiredStatus::Suspended,
156            }
157        }
158    }
159
160    pub fn serialize<S>(value: &RuntimeDesiredStatus, serializer: S) -> Result<S::Ok, S::Error>
161    where
162        S: Serializer,
163    {
164        SnakeRuntimeDesiredStatus::from(*value).serialize(serializer)
165    }
166
167    pub fn deserialize<'de, D>(deserializer: D) -> Result<RuntimeDesiredStatus, D::Error>
168    where
169        D: Deserializer<'de>,
170    {
171        SnakeRuntimeDesiredStatus::deserialize(deserializer).map(|status| status.into())
172    }
173}
174
// Policy with three possible values: `Allow`, `Reject` and `AwaitApproval`, the last being the
// `#[default]`.
//
// NOTE(review): presumably this selects what happens when a pipeline was modified since its last
// checkpoint and would need to bootstrap the modified views (compare
// `RuntimeStatus::AwaitingApproval` and `RuntimeStatus::Bootstrapping`) — confirm against the
// code that consumes the policy.
//
// With `rename_all = "snake_case"` the serde names are `allow`, `reject` and `await_approval`,
// the same strings used by the `Display`, `From<String>` and `TryFrom<Option<String>>` impls of
// this type.
//
// The enum is `#[repr(u8)]` with implicit discriminants, so declaration order fixes the numeric
// values (`Allow` = 0, `Reject` = 1, `AwaitApproval` = 2).
//
// Plain `//` comments are used on purpose: `///` doc comments would be picked up by the
// `ToSchema` derive and alter the generated schema description.
#[derive(
    Debug, Default, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema, NoUninit,
)]
#[repr(u8)]
#[serde(rename_all = "snake_case")]
pub enum BootstrapPolicy {
    Allow,
    Reject,
    #[default]
    AwaitApproval,
}
186
187impl TryFrom<Option<String>> for BootstrapPolicy {
188    type Error = ();
189
190    fn try_from(value: Option<String>) -> Result<Self, Self::Error> {
191        match value.as_deref() {
192            Some("allow") => Ok(Self::Allow),
193            Some("reject") => Ok(Self::Reject),
194            Some("await_approval") | None => Ok(Self::AwaitApproval),
195            _ => Err(()),
196        }
197    }
198}
199
200impl From<String> for BootstrapPolicy {
201    fn from(value: String) -> Self {
202        match value.as_str() {
203            "allow" => Self::Allow,
204            "reject" => Self::Reject,
205            "await_approval" => Self::AwaitApproval,
206            _ => panic!("Invalid 'bootstrap_policy' value: {value}"),
207        }
208    }
209}
210
211impl Display for BootstrapPolicy {
212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213        let s = match self {
214            BootstrapPolicy::Allow => "allow",
215            BootstrapPolicy::Reject => "reject",
216            BootstrapPolicy::AwaitApproval => "await_approval",
217        };
218        write!(f, "{s}")
219    }
220}
221
/// Runtime status of the pipeline together with explanatory details and the runtime desired
/// status.
///
/// The `Responder` and `From<ExtendedRuntimeStatus> for HttpResponse<BoxBody>` impls in this file
/// send it as the JSON body of a `200 OK` response.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExtendedRuntimeStatus {
    /// Runtime status of the pipeline.
    pub runtime_status: RuntimeStatus,

    /// Human-readable details about the runtime status. Its content can contain for instance an
    /// explanation why it is in this status and any other additional information about it (e.g.,
    /// progress).
    // Typed as an arbitrary JSON value, so no particular shape is enforced by this struct.
    pub runtime_status_details: serde_json::Value,

    /// Runtime desired status of the pipeline.
    pub runtime_desired_status: RuntimeDesiredStatus,
}
235
236impl Responder for ExtendedRuntimeStatus {
237    type Body = BoxBody;
238
239    fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
240        HttpResponseBuilder::new(StatusCode::OK).json(self)
241    }
242}
243
244impl From<ExtendedRuntimeStatus> for HttpResponse<BoxBody> {
245    fn from(value: ExtendedRuntimeStatus) -> Self {
246        HttpResponseBuilder::new(StatusCode::OK).json(value)
247    }
248}
249
/// Error returned by the pipeline `/status` endpoint.
///
/// Through its `ResponseError` impl, `status_code` becomes the HTTP status of the response and
/// `error` becomes its JSON body.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExtendedRuntimeStatusError {
    /// Status code. Returning anything except `503 Service Unavailable` will cause the runner to
    /// forcefully stop the pipeline.
    // Serialized as a plain `u16` through the private `status_code` module in this file.
    #[serde(with = "status_code")]
    pub status_code: StatusCode,

    /// Error response.
    pub error: ErrorResponse,
}
261
262mod status_code {
263    use actix_web::http::StatusCode;
264    use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error};
265
266    pub fn serialize<S>(value: &StatusCode, serializer: S) -> Result<S::Ok, S::Error>
267    where
268        S: Serializer,
269    {
270        value.as_u16().serialize(serializer)
271    }
272
273    pub fn deserialize<'de, D>(deserializer: D) -> Result<StatusCode, D::Error>
274    where
275        D: Deserializer<'de>,
276    {
277        let value = u16::deserialize(deserializer)?;
278        StatusCode::from_u16(value).map_err(D::Error::custom)
279    }
280}
281
282impl Display for ExtendedRuntimeStatusError {
283    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284        write!(f, "{}: {:?}", self.status_code, self.error)
285    }
286}
287
288impl ResponseError for ExtendedRuntimeStatusError {
289    fn status_code(&self) -> StatusCode {
290        self.status_code
291    }
292
293    fn error_response(&self) -> HttpResponse<BoxBody> {
294        HttpResponseBuilder::new(self.status_code()).json(self.error.clone())
295    }
296}