Skip to main content

feldera_types/
runtime_status.rs

1use crate::checkpoint::CheckpointMetadata;
2use crate::error::ErrorResponse;
3use actix_web::body::BoxBody;
4use actix_web::http::StatusCode;
5use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder, Responder, ResponseError};
6use bytemuck::NoUninit;
7use clap::ValueEnum;
8use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10use std::fmt;
11use std::fmt::Display;
12use utoipa::ToSchema;
13
/// Runtime status of the pipeline.
///
/// Of the statuses, only `Unavailable` is determined by the runner. All other statuses are
/// determined by the pipeline and taken over by the runner.
// The enum is `#[repr(u8)]` with implicit discriminants, so each variant's numeric value is its
// position in the declaration (`Unavailable` = 0, ..., `Suspended` = 9). Inserting or reordering
// variants therefore changes the value seen by any code that handles the status as a raw `u8`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, ToSchema, NoUninit)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
#[repr(u8)]
pub enum RuntimeStatus {
    /// The runner was unable to determine the pipeline runtime status. This status is never
    /// returned by the pipeline endpoint itself, but only determined by the runner.
    ///
    /// It can notably occur in two scenarios:
    /// 1. The runner is unable to (in time) receive a response for its sent request to the
    ///    pipeline `/status` endpoint, or it is unable to parse the response.
    /// 2. The runner received back a `503 Service Unavailable` as a response to the request.
    ///    This can occur for example if the pipeline is unable to acquire a lock necessary to
    ///    determine whether it is in any of the other runtime statuses.
    Unavailable,

    /// The pipeline is waiting for initialization instructions from the
    /// coordinator.
    Coordination,

    /// The pipeline is constantly pulling the latest checkpoint from S3 but not processing any inputs.
    Standby,

    /// The input and output connectors are establishing connections to their data sources and sinks
    /// respectively.
    Initializing,

    /// The pipeline was modified since the last checkpoint. User approval is required before
    /// bootstrapping can proceed.
    AwaitingApproval,

    /// The pipeline was modified since the last checkpoint, and is currently bootstrapping modified
    /// views.
    Bootstrapping,

    /// Input records that were stored in the journal but were not yet processed, are being
    /// processed first.
    Replaying,

    /// The input connectors are paused.
    Paused,

    /// The input connectors are running.
    Running,

    /// The pipeline finished checkpointing and pausing.
    Suspended,
}
65
66impl From<RuntimeDesiredStatus> for RuntimeStatus {
67    fn from(value: RuntimeDesiredStatus) -> Self {
68        match value {
69            RuntimeDesiredStatus::Unavailable => Self::Unavailable,
70            RuntimeDesiredStatus::Coordination => Self::Coordination,
71            RuntimeDesiredStatus::Standby => Self::Standby,
72            RuntimeDesiredStatus::Paused => Self::Paused,
73            RuntimeDesiredStatus::Running => Self::Running,
74            RuntimeDesiredStatus::Suspended => Self::Suspended,
75        }
76    }
77}
78
/// Desired runtime status of the pipeline.
///
/// Every variant has an identically named counterpart in [`RuntimeStatus`], and the
/// `From<RuntimeDesiredStatus> for RuntimeStatus` conversion maps each variant onto it. Which
/// changes from one desired status to another are permitted is decided by
/// [`RuntimeDesiredStatus::may_transition_to`] and
/// [`RuntimeDesiredStatus::may_transition_to_at_startup`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema, ValueEnum)]
#[cfg_attr(feature = "testing", derive(proptest_derive::Arbitrary))]
pub enum RuntimeDesiredStatus {
    Unavailable,
    Coordination,
    Standby,
    Paused,
    Running,
    Suspended,
}
89
90impl RuntimeDesiredStatus {
91    pub fn may_transition_to(&self, target: Self) -> bool {
92        match (*self, target) {
93            (old, new) if old == new => true,
94            (Self::Standby, Self::Paused | Self::Running) => true,
95            (Self::Paused, Self::Running | Self::Suspended) => true,
96            (Self::Running, Self::Paused | Self::Suspended) => true,
97            _ => false,
98        }
99    }
100
101    pub fn may_transition_to_at_startup(&self, target: Self) -> bool {
102        match (*self, target) {
103            (_, Self::Coordination) => true,
104            (Self::Suspended, _) => {
105                // A suspended pipeline must transition to "paused" or
106                // "running".
107                matches!(target, Self::Paused | Self::Running)
108            }
109            (old, new) if old.may_transition_to(new) => true,
110            _ => false,
111        }
112    }
113}
114
115/// Some of our JSON interface uses capitalized status names, like `Paused`, but
116/// other parts use snake-case names, like `paused`.  To support the latter with
117/// `serde`, use this module in the field declaration, e.g.:
118///
119/// ```ignore
120/// #[serde(with = "feldera_types::runtime_status::snake_case_runtime_desired_status")]
121/// ```
122pub mod snake_case_runtime_desired_status {
123    use serde::{Deserialize, Deserializer, Serialize, Serializer};
124
125    use crate::runtime_status::RuntimeDesiredStatus;
126
127    #[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
128    #[serde(rename_all = "snake_case")]
129    enum SnakeRuntimeDesiredStatus {
130        Unavailable,
131        Coordination,
132        Standby,
133        Paused,
134        Running,
135        Suspended,
136    }
137
138    impl From<RuntimeDesiredStatus> for SnakeRuntimeDesiredStatus {
139        fn from(value: RuntimeDesiredStatus) -> Self {
140            match value {
141                RuntimeDesiredStatus::Unavailable => SnakeRuntimeDesiredStatus::Unavailable,
142                RuntimeDesiredStatus::Coordination => SnakeRuntimeDesiredStatus::Coordination,
143                RuntimeDesiredStatus::Standby => SnakeRuntimeDesiredStatus::Standby,
144                RuntimeDesiredStatus::Paused => SnakeRuntimeDesiredStatus::Paused,
145                RuntimeDesiredStatus::Running => SnakeRuntimeDesiredStatus::Running,
146                RuntimeDesiredStatus::Suspended => SnakeRuntimeDesiredStatus::Suspended,
147            }
148        }
149    }
150
151    impl From<SnakeRuntimeDesiredStatus> for RuntimeDesiredStatus {
152        fn from(value: SnakeRuntimeDesiredStatus) -> Self {
153            match value {
154                SnakeRuntimeDesiredStatus::Unavailable => RuntimeDesiredStatus::Unavailable,
155                SnakeRuntimeDesiredStatus::Coordination => RuntimeDesiredStatus::Coordination,
156                SnakeRuntimeDesiredStatus::Standby => RuntimeDesiredStatus::Standby,
157                SnakeRuntimeDesiredStatus::Paused => RuntimeDesiredStatus::Paused,
158                SnakeRuntimeDesiredStatus::Running => RuntimeDesiredStatus::Running,
159                SnakeRuntimeDesiredStatus::Suspended => RuntimeDesiredStatus::Suspended,
160            }
161        }
162    }
163
164    pub fn serialize<S>(value: &RuntimeDesiredStatus, serializer: S) -> Result<S::Ok, S::Error>
165    where
166        S: Serializer,
167    {
168        SnakeRuntimeDesiredStatus::from(*value).serialize(serializer)
169    }
170
171    pub fn deserialize<'de, D>(deserializer: D) -> Result<RuntimeDesiredStatus, D::Error>
172    where
173        D: Deserializer<'de>,
174    {
175        SnakeRuntimeDesiredStatus::deserialize(deserializer).map(|status| status.into())
176    }
177}
178
/// Bootstrap policy, with three possible values.
///
/// Serialized with snake-case names: `allow`, `reject` and `await_approval`. These are the same
/// strings accepted by the `From<String>` and `TryFrom<Option<String>>` conversions and produced
/// by the `Display` implementation. The default is [`BootstrapPolicy::AwaitApproval`].
// NOTE(review): presumably this selects how a pipeline that was modified since its last checkpoint
// proceeds with bootstrapping (compare `RuntimeStatus::AwaitingApproval`) -- confirm with the code
// that consumes the policy.
//
// `#[repr(u8)]` with implicit discriminants: `Allow` = 0, `Reject` = 1, `AwaitApproval` = 2, so
// reordering the variants changes their numeric values.
#[derive(
    Debug, Default, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema, NoUninit,
)]
#[repr(u8)]
#[serde(rename_all = "snake_case")]
pub enum BootstrapPolicy {
    Allow,
    Reject,
    #[default]
    AwaitApproval,
}
190
191impl TryFrom<Option<String>> for BootstrapPolicy {
192    type Error = ();
193
194    fn try_from(value: Option<String>) -> Result<Self, Self::Error> {
195        match value.as_deref() {
196            Some("allow") => Ok(Self::Allow),
197            Some("reject") => Ok(Self::Reject),
198            Some("await_approval") | None => Ok(Self::AwaitApproval),
199            _ => Err(()),
200        }
201    }
202}
203
204impl From<String> for BootstrapPolicy {
205    fn from(value: String) -> Self {
206        match value.as_str() {
207            "allow" => Self::Allow,
208            "reject" => Self::Reject,
209            "await_approval" => Self::AwaitApproval,
210            _ => panic!("Invalid 'bootstrap_policy' value: {value}"),
211        }
212    }
213}
214
215impl Display for BootstrapPolicy {
216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217        let s = match self {
218            BootstrapPolicy::Allow => "allow",
219            BootstrapPolicy::Reject => "reject",
220            BootstrapPolicy::AwaitApproval => "await_approval",
221        };
222        write!(f, "{s}")
223    }
224}
225
/// Details about pipeline storage, which are returned as part of the regular runtime status polling
/// by the runner.
///
/// Carried in [`ExtendedRuntimeStatus::storage_status_details`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StorageStatusDetails {
    /// Present checkpoints.
    // NOTE(review): the ordering of the entries within the deque is not established in this file
    // -- confirm with the producer before relying on front/back meaning oldest/newest.
    pub checkpoints: VecDeque<CheckpointMetadata>,
}
233
/// Runtime status of the pipeline together with its details, the runtime desired status and
/// (optionally) details about the pipeline storage.
///
/// When turned into an HTTP response (through its `Responder` implementation or the
/// `From<ExtendedRuntimeStatus> for HttpResponse` conversion) the value is sent as the JSON body
/// of a `200 OK` response.
// NOTE(review): presumably this is the success payload of the pipeline `/status` endpoint, whose
// error counterpart is `ExtendedRuntimeStatusError` -- confirm against the endpoint handler.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ExtendedRuntimeStatus {
    /// Runtime status of the pipeline.
    pub runtime_status: RuntimeStatus,

    /// Human-readable details about the runtime status. Its content can contain for instance an
    /// explanation why it is in this status and any other additional information about it (e.g.,
    /// progress).
    pub runtime_status_details: serde_json::Value,

    /// Runtime desired status of the pipeline.
    pub runtime_desired_status: RuntimeDesiredStatus,

    /// Details about the pipeline persistent storage.
    ///
    /// `None` indicates that the pipeline in its current runtime status is unable to check the
    /// storage status details. Returning `None` _does not_ override the already existing storage
    /// status details in the database of the runner.
    pub storage_status_details: Option<StorageStatusDetails>,
}
254
255impl Responder for ExtendedRuntimeStatus {
256    type Body = BoxBody;
257
258    fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
259        HttpResponseBuilder::new(StatusCode::OK).json(self)
260    }
261}
262
263impl From<ExtendedRuntimeStatus> for HttpResponse<BoxBody> {
264    fn from(value: ExtendedRuntimeStatus) -> Self {
265        HttpResponseBuilder::new(StatusCode::OK).json(value)
266    }
267}
268
/// Error returned by the pipeline `/status` endpoint.
///
/// As an HTTP error response (see its `ResponseError` implementation) it is sent with
/// `status_code` as the response status and `error` as the JSON body.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ExtendedRuntimeStatusError {
    /// Status code. Returning anything except `503 Service Unavailable` will cause the runner to
    /// forcefully stop the pipeline.
    ///
    /// (De)serialized as its numeric `u16` value through the private `status_code` helper module.
    #[serde(with = "status_code")]
    pub status_code: StatusCode,

    /// Error response.
    pub error: ErrorResponse,
}
280
281mod status_code {
282    use actix_web::http::StatusCode;
283    use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error};
284
285    pub fn serialize<S>(value: &StatusCode, serializer: S) -> Result<S::Ok, S::Error>
286    where
287        S: Serializer,
288    {
289        value.as_u16().serialize(serializer)
290    }
291
292    pub fn deserialize<'de, D>(deserializer: D) -> Result<StatusCode, D::Error>
293    where
294        D: Deserializer<'de>,
295    {
296        let value = u16::deserialize(deserializer)?;
297        StatusCode::from_u16(value).map_err(D::Error::custom)
298    }
299}
300
301impl Display for ExtendedRuntimeStatusError {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        write!(f, "{}: {:?}", self.status_code, self.error)
304    }
305}
306
307impl ResponseError for ExtendedRuntimeStatusError {
308    fn status_code(&self) -> StatusCode {
309        self.status_code
310    }
311
312    fn error_response(&self) -> HttpResponse<BoxBody> {
313        HttpResponseBuilder::new(self.status_code()).json(self.error.clone())
314    }
315}