Skip to main content

feldera_types/
runtime_status.rs

1use crate::checkpoint::CheckpointMetadata;
2use crate::error::ErrorResponse;
3use actix_web::body::BoxBody;
4use actix_web::http::StatusCode;
5use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder, Responder, ResponseError};
6use bytemuck::NoUninit;
7use clap::ValueEnum;
8use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10use std::fmt;
11use std::fmt::Display;
12use utoipa::ToSchema;
13
// NOTE(review): this enum derives `ToSchema`, which presumably surfaces the `///` text below in
// the generated API schema — confirm before rewording it. Review notes are therefore written as
// plain `//` comments.
//
// Every `RuntimeDesiredStatus` variant has a same-named counterpart here (see the `From` impl
// below); `Initializing`, `AwaitingApproval`, `Bootstrapping` and `Replaying` have no desired
// counterpart.
//
// `#[repr(u8)]` gives each variant a one-byte discriminant that follows declaration order, so
// reordering or inserting variants changes those values.
/// Runtime status of the pipeline.
///
/// Of the statuses, only `Unavailable` is determined by the runner. All other statuses are
/// determined by the pipeline and taken over by the runner.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, ToSchema, NoUninit)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
#[repr(u8)]
pub enum RuntimeStatus {
    /// The runner was unable to determine the pipeline runtime status. This status is never
    /// returned by the pipeline endpoint itself, but only determined by the runner.
    ///
    /// It can notably occur in two scenarios:
    /// 1. The runner is unable to (in time) receive a response for its sent request to the
    ///    pipeline `/status` endpoint, or it is unable to parse the response.
    /// 2. The runner received back a `503 Service Unavailable` as a response to the request.
    ///    This can occur for example if the pipeline is unable to acquire a lock necessary to
    ///    determine whether it is in any of the other runtime statuses.
    Unavailable,

    /// The pipeline is waiting for initialization instructions from the
    /// coordinator.
    Coordination,

    /// The pipeline is constantly pulling the latest checkpoint from S3 but not processing any inputs.
    Standby,

    /// The input and output connectors are establishing connections to their data sources and sinks
    /// respectively.
    Initializing,

    /// The pipeline was modified since the last checkpoint. User approval is required before
    /// bootstrapping can proceed.
    AwaitingApproval,

    /// The pipeline was modified since the last checkpoint, and is currently bootstrapping modified
    /// views.
    Bootstrapping,

    /// Input records that were stored in the journal but were not yet processed, are being
    /// processed first.
    Replaying,

    /// The input connectors are paused.
    Paused,

    /// The input connectors are running.
    Running,

    /// The pipeline finished checkpointing and pausing.
    Suspended,
}
65
66impl From<RuntimeDesiredStatus> for RuntimeStatus {
67    fn from(value: RuntimeDesiredStatus) -> Self {
68        match value {
69            RuntimeDesiredStatus::Unavailable => Self::Unavailable,
70            RuntimeDesiredStatus::Coordination => Self::Coordination,
71            RuntimeDesiredStatus::Standby => Self::Standby,
72            RuntimeDesiredStatus::Paused => Self::Paused,
73            RuntimeDesiredStatus::Running => Self::Running,
74            RuntimeDesiredStatus::Suspended => Self::Suspended,
75        }
76    }
77}
78
// Desired (target) runtime status of the pipeline; `ExtendedRuntimeStatus` reports it next to the
// actual `RuntimeStatus`.
//
// Each variant converts to the `RuntimeStatus` variant of the same name via `From`. Which changes
// between desired statuses are accepted is decided by `may_transition_to` and
// `may_transition_to_at_startup`.
//
// NOTE(review): written as `//` rather than `///` because `ToSchema` / `ValueEnum` presumably
// expose doc comments in the generated schema and CLI help — confirm before converting.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema, ValueEnum)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
pub enum RuntimeDesiredStatus {
    Unavailable,
    Coordination,
    Standby,
    Paused,
    Running,
    Suspended,
}
89
90impl RuntimeDesiredStatus {
91    pub fn may_transition_to(&self, target: Self) -> bool {
92        match (*self, target) {
93            (old, new) if old == new => true,
94            (Self::Standby, Self::Paused | Self::Running) => true,
95            (Self::Paused, Self::Running | Self::Suspended) => true,
96            (Self::Running, Self::Paused | Self::Suspended) => true,
97            _ => false,
98        }
99    }
100
101    pub fn may_transition_to_at_startup(&self, target: Self) -> bool {
102        match (*self, target) {
103            (_, Self::Coordination) => true,
104            (Self::Suspended, _) => {
105                // A suspended pipeline must transition to "paused" or
106                // "running".
107                matches!(target, Self::Paused | Self::Running)
108            }
109            (old, new) if old.may_transition_to(new) => true,
110            _ => false,
111        }
112    }
113}
114
115/// Some of our JSON interface uses capitalized status names, like `Paused`, but
116/// other parts use snake-case names, like `paused`.  To support the latter with
117/// `serde`, use this module in the field declaration, e.g.:
118///
119/// ```ignore
120/// #[serde(with = "feldera_types::runtime_status::snake_case_runtime_desired_status")]
121/// ```
122pub mod snake_case_runtime_desired_status {
123    use serde::{Deserialize, Deserializer, Serialize, Serializer};
124
125    use crate::runtime_status::RuntimeDesiredStatus;
126
127    #[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
128    #[serde(rename_all = "snake_case")]
129    enum SnakeRuntimeDesiredStatus {
130        Unavailable,
131        Coordination,
132        Standby,
133        Paused,
134        Running,
135        Suspended,
136    }
137
138    impl From<RuntimeDesiredStatus> for SnakeRuntimeDesiredStatus {
139        fn from(value: RuntimeDesiredStatus) -> Self {
140            match value {
141                RuntimeDesiredStatus::Unavailable => SnakeRuntimeDesiredStatus::Unavailable,
142                RuntimeDesiredStatus::Coordination => SnakeRuntimeDesiredStatus::Coordination,
143                RuntimeDesiredStatus::Standby => SnakeRuntimeDesiredStatus::Standby,
144                RuntimeDesiredStatus::Paused => SnakeRuntimeDesiredStatus::Paused,
145                RuntimeDesiredStatus::Running => SnakeRuntimeDesiredStatus::Running,
146                RuntimeDesiredStatus::Suspended => SnakeRuntimeDesiredStatus::Suspended,
147            }
148        }
149    }
150
151    impl From<SnakeRuntimeDesiredStatus> for RuntimeDesiredStatus {
152        fn from(value: SnakeRuntimeDesiredStatus) -> Self {
153            match value {
154                SnakeRuntimeDesiredStatus::Unavailable => RuntimeDesiredStatus::Unavailable,
155                SnakeRuntimeDesiredStatus::Coordination => RuntimeDesiredStatus::Coordination,
156                SnakeRuntimeDesiredStatus::Standby => RuntimeDesiredStatus::Standby,
157                SnakeRuntimeDesiredStatus::Paused => RuntimeDesiredStatus::Paused,
158                SnakeRuntimeDesiredStatus::Running => RuntimeDesiredStatus::Running,
159                SnakeRuntimeDesiredStatus::Suspended => RuntimeDesiredStatus::Suspended,
160            }
161        }
162    }
163
164    pub fn serialize<S>(value: &RuntimeDesiredStatus, serializer: S) -> Result<S::Ok, S::Error>
165    where
166        S: Serializer,
167    {
168        SnakeRuntimeDesiredStatus::from(*value).serialize(serializer)
169    }
170
171    pub fn deserialize<'de, D>(deserializer: D) -> Result<RuntimeDesiredStatus, D::Error>
172    where
173        D: Deserializer<'de>,
174    {
175        SnakeRuntimeDesiredStatus::deserialize(deserializer).map(|status| status.into())
176    }
177}
178
// Bootstrap policy carried by `BootstrapConfig`.
//
// Serialized in snake case (`allow`, `reject`, `await_approval`) — the same spellings that the
// `Display`, `TryFrom<Option<String>>` and `From<String>` impls below use. `AwaitApproval` is the
// `Default`.
//
// NOTE(review): presumably `Allow` lets bootstrapping proceed, `Reject` refuses it and
// `AwaitApproval` waits for user approval (cf. `RuntimeStatus::AwaitingApproval`) — confirm
// against the code that consumes the policy.
//
// NOTE(review): written as `//` rather than `///` because `ToSchema` presumably copies doc
// comments into the generated schema — confirm before converting.
#[derive(
    Debug, Default, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema, NoUninit,
)]
#[repr(u8)]
#[serde(rename_all = "snake_case")]
pub enum BootstrapPolicy {
    Allow,
    Reject,
    #[default]
    AwaitApproval,
}
190
191impl TryFrom<Option<String>> for BootstrapPolicy {
192    type Error = ();
193
194    fn try_from(value: Option<String>) -> Result<Self, Self::Error> {
195        match value.as_deref() {
196            Some("allow") => Ok(Self::Allow),
197            Some("reject") => Ok(Self::Reject),
198            Some("await_approval") | None => Ok(Self::AwaitApproval),
199            _ => Err(()),
200        }
201    }
202}
203
204impl From<String> for BootstrapPolicy {
205    fn from(value: String) -> Self {
206        match value.as_str() {
207            "allow" => Self::Allow,
208            "reject" => Self::Reject,
209            "await_approval" => Self::AwaitApproval,
210            _ => panic!("Invalid 'bootstrap_policy' value: {value}"),
211        }
212    }
213}
214
215impl Display for BootstrapPolicy {
216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217        let s = match self {
218            BootstrapPolicy::Allow => "allow",
219            BootstrapPolicy::Reject => "reject",
220            BootstrapPolicy::AwaitApproval => "await_approval",
221        };
222        write!(f, "{s}")
223    }
224}
225
// Both fields are `#[serde(default)]`, so a payload may omit them: `bootstrap_policy` then
// becomes `None` and `silent_bootstrap` becomes `false` (the same values `Default` produces).
// `active_bootstrap_policy` panics while `bootstrap_policy` is `None`.
//
// NOTE(review): the `///` text is left untouched because `ToSchema` presumably copies it into
// the generated schema — confirm before rewording.
/// Bootstrap-related configuration for a deployment start request.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, ToSchema, Default)]
pub struct BootstrapConfig {
    /// Bootstrap policy.
    #[serde(default)]
    pub bootstrap_policy: Option<BootstrapPolicy>,
    /// Bootstrap the pipeline with output connectors disabled.
    #[serde(default)]
    pub silent_bootstrap: bool,
}
236
237impl From<BootstrapPolicy> for BootstrapConfig {
238    fn from(bootstrap_policy: BootstrapPolicy) -> Self {
239        Self {
240            bootstrap_policy: Some(bootstrap_policy),
241            silent_bootstrap: false,
242        }
243    }
244}
245
246impl BootstrapConfig {
247    pub fn with_silent_bootstrap(self, silent_bootstrap: bool) -> Self {
248        Self {
249            silent_bootstrap,
250            ..self
251        }
252    }
253
254    /// Returns the bootstrap policy for an active deployment.
255    pub fn active_bootstrap_policy(&self) -> BootstrapPolicy {
256        self.bootstrap_policy
257            .expect("bootstrap policy must be set for an active deployment")
258    }
259}
260
/// Details about pipeline storage, which are returned as part of the regular runtime status polling
/// by the runner.
///
/// Carried in the `storage_status_details` field of `ExtendedRuntimeStatus`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StorageStatusDetails {
    /// Present checkpoints.
    // NOTE(review): the ordering of the deque is not established here — confirm with the producer.
    pub checkpoints: VecDeque<CheckpointMetadata>,
}
268
/// Runtime status report of a pipeline: the current status with its details, the desired status
/// and, when available, the storage status details.
///
/// The `Responder` and `From<ExtendedRuntimeStatus> for HttpResponse` impls below send it as a
/// JSON body with status `200 OK`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExtendedRuntimeStatus {
    /// Runtime status of the pipeline.
    pub runtime_status: RuntimeStatus,

    /// Human-readable details about the runtime status. Its content can contain for instance an
    /// explanation why it is in this status and any other additional information about it (e.g.,
    /// progress).
    pub runtime_status_details: serde_json::Value,

    /// Runtime desired status of the pipeline.
    pub runtime_desired_status: RuntimeDesiredStatus,

    /// Details about the pipeline persistent storage.
    ///
    /// `None` indicates that the pipeline in its current runtime status is unable to check the
    /// storage status details. Returning `None` _does not_ override the already existing storage
    /// status details in the database of the runner.
    pub storage_status_details: Option<StorageStatusDetails>,
}
289
290impl Responder for ExtendedRuntimeStatus {
291    type Body = BoxBody;
292
293    fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
294        HttpResponseBuilder::new(StatusCode::OK).json(self)
295    }
296}
297
298impl From<ExtendedRuntimeStatus> for HttpResponse<BoxBody> {
299    fn from(value: ExtendedRuntimeStatus) -> Self {
300        HttpResponseBuilder::new(StatusCode::OK).json(value)
301    }
302}
303
/// Error returned by the pipeline `/status` endpoint.
///
/// When used as a `ResponseError` (impl below), `status_code` becomes the HTTP status and only
/// `error` is sent as the JSON body. When the struct itself is (de)serialized, `status_code` is
/// represented as its numeric `u16` value via the private `status_code` module.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExtendedRuntimeStatusError {
    /// Status code. Returning anything except `503 Service Unavailable` will cause the runner to
    /// forcefully stop the pipeline.
    #[serde(with = "status_code")]
    pub status_code: StatusCode,

    /// Error response.
    pub error: ErrorResponse,
}
315
316mod status_code {
317    use actix_web::http::StatusCode;
318    use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error};
319
320    pub fn serialize<S>(value: &StatusCode, serializer: S) -> Result<S::Ok, S::Error>
321    where
322        S: Serializer,
323    {
324        value.as_u16().serialize(serializer)
325    }
326
327    pub fn deserialize<'de, D>(deserializer: D) -> Result<StatusCode, D::Error>
328    where
329        D: Deserializer<'de>,
330    {
331        let value = u16::deserialize(deserializer)?;
332        StatusCode::from_u16(value).map_err(D::Error::custom)
333    }
334}
335
336impl Display for ExtendedRuntimeStatusError {
337    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338        write!(f, "{}: {:?}", self.status_code, self.error)
339    }
340}
341
342impl ResponseError for ExtendedRuntimeStatusError {
343    fn status_code(&self) -> StatusCode {
344        self.status_code
345    }
346
347    fn error_response(&self) -> HttpResponse<BoxBody> {
348        HttpResponseBuilder::new(self.status_code()).json(self.error.clone())
349    }
350}