feldera_types/
runtime_status.rs

1use crate::error::ErrorResponse;
2use actix_web::body::BoxBody;
3use actix_web::http::StatusCode;
4use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder, Responder, ResponseError};
5use serde::{Deserialize, Serialize};
6use std::fmt;
7use std::fmt::Display;
8use utoipa::ToSchema;
9
10/// Runtime status of the pipeline.
11///
12/// Of the statuses, only `Unavailable` is determined by the runner. All other statuses are
13/// determined by the pipeline and taken over by the runner.
14#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
15pub enum RuntimeStatus {
16    /// The runner was unable to determine the pipeline runtime status. This status is never
17    /// returned by the pipeline endpoint itself, but only determined by the runner.
18    ///
19    /// It can notably occur in two scenarios:
20    /// 1. The runner is unable to (in time) receive a response for its sent request to the
21    ///    pipeline `/status` endpoint, or it is unable to parse the response.
22    /// 2. The runner received back a `503 Service Unavailable` as a response to the request.
23    ///    This can occur for example if the pipeline is unable to acquire a lock necessary to
24    ///    determine whether it is in any of the other runtime statuses.
25    Unavailable,
26
27    /// The pipeline is constantly pulling the latest checkpoint from S3 but not processing any inputs.
28    Standby,
29
30    /// The input and output connectors are establishing connections to their data sources and sinks
31    /// respectively.
32    Initializing,
33
34    /// The pipeline was modified since the last checkpoint. User approval is required before
35    /// bootstrapping can proceed.
36    AwaitingApproval,
37
38    /// The pipeline was modified since the last checkpoint, and is currently bootstrapping modified
39    /// views.
40    Bootstrapping,
41
42    /// Input records that were stored in the journal but were not yet processed, are being
43    /// processed first.
44    Replaying,
45
46    /// The input connectors are paused.
47    Paused,
48
49    /// The input connectors are running.
50    Running,
51
52    /// The pipeline finished checkpointing and pausing.
53    Suspended,
54}
55
56#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
57pub enum RuntimeDesiredStatus {
58    Unavailable,
59    Standby,
60    Paused,
61    Running,
62    Suspended,
63}
64
65impl RuntimeDesiredStatus {
66    pub fn may_transition_to(&self, target: Self) -> bool {
67        match (*self, target) {
68            (old, new) if old == new => true,
69            (Self::Standby, Self::Paused | Self::Running) => true,
70            (Self::Paused, Self::Running | Self::Suspended) => true,
71            (Self::Running, Self::Paused | Self::Suspended) => true,
72            _ => false,
73        }
74    }
75
76    pub fn may_transition_to_at_startup(&self, target: Self) -> bool {
77        match (*self, target) {
78            (Self::Suspended, _) => {
79                // A suspended pipeline must transition to "paused" or
80                // "running".
81                matches!(target, Self::Paused | Self::Running)
82            }
83            (old, new) if old.may_transition_to(new) => true,
84            _ => false,
85        }
86    }
87}
88
89impl From<String> for RuntimeDesiredStatus {
90    fn from(value: String) -> Self {
91        match value.as_str() {
92            "unavailable" => Self::Unavailable,
93            "standby" => Self::Standby,
94            "paused" => Self::Paused,
95            "running" => Self::Running,
96            "suspended" => Self::Suspended,
97            _ => panic!("Invalid runtime desired status: {value}"),
98        }
99    }
100}
101
102#[derive(Debug, Default, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
103#[serde(rename_all = "snake_case")]
104pub enum BootstrapPolicy {
105    Allow,
106    Reject,
107    #[default]
108    AwaitApproval,
109}
110
111impl TryFrom<Option<String>> for BootstrapPolicy {
112    type Error = ();
113
114    fn try_from(value: Option<String>) -> Result<Self, Self::Error> {
115        match value.as_deref() {
116            Some("allow") => Ok(Self::Allow),
117            Some("reject") => Ok(Self::Reject),
118            Some("await_approval") | None => Ok(Self::AwaitApproval),
119            _ => Err(()),
120        }
121    }
122}
123
124impl From<String> for BootstrapPolicy {
125    fn from(value: String) -> Self {
126        match value.as_str() {
127            "allow" => Self::Allow,
128            "reject" => Self::Reject,
129            "await_approval" => Self::AwaitApproval,
130            _ => panic!("Invalid 'bootstrap_policy' value: {value}"),
131        }
132    }
133}
134
135impl Display for BootstrapPolicy {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        let s = match self {
138            BootstrapPolicy::Allow => "allow",
139            BootstrapPolicy::Reject => "reject",
140            BootstrapPolicy::AwaitApproval => "await_approval",
141        };
142        write!(f, "{s}")
143    }
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
147pub struct ExtendedRuntimeStatus {
148    /// Runtime status of the pipeline.
149    pub runtime_status: RuntimeStatus,
150
151    /// Human-readable details about the runtime status. Its content can contain for instance an
152    /// explanation why it is in this status and any other additional information about it (e.g.,
153    /// progress).
154    pub runtime_status_details: serde_json::Value,
155
156    /// Runtime desired status of the pipeline.
157    pub runtime_desired_status: RuntimeDesiredStatus,
158}
159
160impl Responder for ExtendedRuntimeStatus {
161    type Body = BoxBody;
162
163    fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
164        HttpResponseBuilder::new(StatusCode::OK).json(self)
165    }
166}
167
168impl From<ExtendedRuntimeStatus> for HttpResponse<BoxBody> {
169    fn from(value: ExtendedRuntimeStatus) -> Self {
170        HttpResponseBuilder::new(StatusCode::OK).json(value)
171    }
172}
173
174/// Error returned by the pipeline `/status` endpoint.
175#[derive(Clone, Debug)]
176pub struct ExtendedRuntimeStatusError {
177    /// Status code. Returning anything except `503 Service Unavailable` will cause the runner to
178    /// forcefully stop the pipeline.
179    pub status_code: StatusCode,
180
181    /// Error response.
182    pub error: ErrorResponse,
183}
184
185impl Display for ExtendedRuntimeStatusError {
186    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187        write!(f, "{}: {:?}", self.status_code, self.error)
188    }
189}
190
191impl ResponseError for ExtendedRuntimeStatusError {
192    fn status_code(&self) -> StatusCode {
193        self.status_code
194    }
195
196    fn error_response(&self) -> HttpResponse<BoxBody> {
197        HttpResponseBuilder::new(self.status_code()).json(self.error.clone())
198    }
199}