feldera_types/
runtime_status.rs

1use crate::error::ErrorResponse;
2use actix_web::body::BoxBody;
3use actix_web::http::StatusCode;
4use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder, Responder, ResponseError};
5use serde::{Deserialize, Serialize};
6use std::fmt;
7use std::fmt::Display;
8use utoipa::ToSchema;
9
10/// Runtime status of the pipeline.
11///
12/// Of the statuses, only `Unavailable` is determined by the runner. All other statuses are
13/// determined by the pipeline and taken over by the runner.
14#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
15pub enum RuntimeStatus {
16    /// The runner was unable to determine the pipeline runtime status. This status is never
17    /// returned by the pipeline endpoint itself, but only determined by the runner.
18    ///
19    /// It can notably occur in two scenarios:
20    /// 1. The runner is unable to (in time) receive a response for its sent request to the
21    ///    pipeline `/status` endpoint, or it is unable to parse the response.
22    /// 2. The runner received back a `503 Service Unavailable` as a response to the request.
23    ///    This can occur for example if the pipeline is unable to acquire a lock necessary to
24    ///    determine whether it is in any of the other runtime statuses.
25    Unavailable,
26
27    /// The pipeline is constantly pulling the latest checkpoint from S3 but not processing any inputs.
28    Standby,
29
30    /// The input and output connectors are establishing connections to their data sources and sinks
31    /// respectively.
32    Initializing,
33
34    /// The pipeline was modified since the last time it was started, and as such it is currently
35    /// computing modified views.
36    Bootstrapping,
37
38    /// Input records that were stored in the journal but were not yet processed, are being
39    /// processed first.
40    Replaying,
41
42    /// The input connectors are paused.
43    Paused,
44
45    /// The input connectors are running.
46    Running,
47
48    /// The pipeline finished checkpointing and pausing.
49    Suspended,
50}
51
52#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, ToSchema)]
53pub enum RuntimeDesiredStatus {
54    Unavailable,
55    Standby,
56    Paused,
57    Running,
58    Suspended,
59}
60
61impl RuntimeDesiredStatus {
62    pub fn may_transition_to(&self, target: Self) -> bool {
63        match (*self, target) {
64            (old, new) if old == new => true,
65            (Self::Standby, Self::Paused | Self::Running) => true,
66            (Self::Paused, Self::Running | Self::Suspended) => true,
67            (Self::Running, Self::Paused | Self::Suspended) => true,
68            _ => false,
69        }
70    }
71
72    pub fn may_transition_to_at_startup(&self, target: Self) -> bool {
73        match (*self, target) {
74            (Self::Suspended, _) => {
75                // A suspended pipeline must transition to "paused" or
76                // "running".
77                matches!(target, Self::Paused | Self::Running)
78            }
79            (old, new) if old.may_transition_to(new) => true,
80            _ => false,
81        }
82    }
83}
84
85impl From<String> for RuntimeDesiredStatus {
86    fn from(value: String) -> Self {
87        match value.as_str() {
88            "unavailable" => Self::Unavailable,
89            "standby" => Self::Standby,
90            "paused" => Self::Paused,
91            "running" => Self::Running,
92            "suspended" => Self::Suspended,
93            _ => panic!("Invalid runtime desired status: {value}"),
94        }
95    }
96}
97
98#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
99pub struct ExtendedRuntimeStatus {
100    /// Runtime status of the pipeline.
101    pub runtime_status: RuntimeStatus,
102
103    /// Human-readable details about the runtime status. Its content can contain for instance an
104    /// explanation why it is in this status and any other additional information about it (e.g.,
105    /// progress).
106    pub runtime_status_details: String,
107
108    /// Runtime desired status of the pipeline.
109    pub runtime_desired_status: RuntimeDesiredStatus,
110}
111
112impl Responder for ExtendedRuntimeStatus {
113    type Body = BoxBody;
114
115    fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
116        HttpResponseBuilder::new(StatusCode::OK).json(self)
117    }
118}
119
120impl From<ExtendedRuntimeStatus> for HttpResponse<BoxBody> {
121    fn from(value: ExtendedRuntimeStatus) -> Self {
122        HttpResponseBuilder::new(StatusCode::OK).json(value)
123    }
124}
125
126/// Error returned by the pipeline `/status` endpoint.
127#[derive(Clone, Debug)]
128pub struct ExtendedRuntimeStatusError {
129    /// Status code. Returning anything except `503 Service Unavailable` will cause the runner to
130    /// forcefully stop the pipeline.
131    pub status_code: StatusCode,
132
133    /// Error response.
134    pub error: ErrorResponse,
135}
136
137impl Display for ExtendedRuntimeStatusError {
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        write!(f, "{}: {:?}", self.status_code, self.error)
140    }
141}
142
143impl ResponseError for ExtendedRuntimeStatusError {
144    fn status_code(&self) -> StatusCode {
145        self.status_code
146    }
147
148    fn error_response(&self) -> HttpResponse<BoxBody> {
149        HttpResponseBuilder::new(self.status_code()).json(self.error.clone())
150    }
151}