1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize, Serializer};
3use serde_json::Value as JsonValue;
4use std::collections::BTreeMap;
5use utoipa::ToSchema;
6use uuid::Uuid;
7
8use crate::{
9 config::InputEndpointConfig, config::OutputEndpointConfig, suspend::SuspendError,
10 transaction::TransactionId,
11};
12
13#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, ToSchema)]
15#[serde(rename_all = "PascalCase")]
16pub enum PipelineState {
17 #[default]
19 Paused,
20 Running,
22 Terminated,
24}
25
26#[derive(Default, Serialize, Deserialize, Debug, Clone)]
30pub struct InputEndpointErrorMetrics {
31 pub endpoint_name: String,
32 #[serde(default)]
33 pub num_transport_errors: u64,
34 #[serde(default)]
35 pub num_parse_errors: u64,
36}
37
38#[derive(Default, Serialize, Deserialize, Debug, Clone)]
42pub struct OutputEndpointErrorMetrics {
43 pub endpoint_name: String,
44 #[serde(default)]
45 pub num_encode_errors: u64,
46 #[serde(default)]
47 pub num_transport_errors: u64,
48}
49
50#[derive(Serialize, Deserialize, Debug, Clone)]
54pub struct EndpointErrorStats<T> {
55 #[serde(default)]
56 pub metrics: T,
57}
58
59#[derive(Serialize)]
61struct StreamOnly<'a> {
62 stream: &'a str,
63}
64
65#[derive(ToSchema)]
67#[schema(as = ShortEndpointConfig)]
68pub struct ShortEndpointConfig {
69 #[allow(dead_code)]
71 stream: String,
72}
73
74pub fn serialize_input_endpoint_config<S>(
76 config: &InputEndpointConfig,
77 serializer: S,
78) -> Result<S::Ok, S::Error>
79where
80 S: Serializer,
81{
82 StreamOnly {
83 stream: &config.stream,
84 }
85 .serialize(serializer)
86}
87
88pub fn serialize_output_endpoint_config<S>(
90 config: &OutputEndpointConfig,
91 serializer: S,
92) -> Result<S::Ok, S::Error>
93where
94 S: Serializer,
95{
96 StreamOnly {
97 stream: &config.stream,
98 }
99 .serialize(serializer)
100}
101
102#[derive(Serialize, Deserialize, Debug, Clone)]
106pub struct PipelineStatsErrorsResponse {
107 #[serde(default)]
108 pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
109 #[serde(default)]
110 pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
111}
112
113#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, ToSchema)]
118#[serde(rename_all = "PascalCase")]
119#[schema(as = TransactionPhase)]
120pub enum ExternalTransactionPhase {
121 Started,
123 Committed,
125}
126
127#[derive(Clone, PartialEq, Eq, Debug, Serialize, ToSchema)]
129#[schema(as = ConnectorTransactionPhase)]
130pub struct ExternalConnectorTransactionPhase {
131 #[schema(value_type = TransactionPhase)]
133 pub phase: ExternalTransactionPhase,
134 pub label: Option<String>,
136}
137
138#[derive(Clone, Default, Debug, Serialize, ToSchema)]
140#[schema(as = TransactionInitiators)]
141pub struct ExternalTransactionInitiators {
142 #[schema(value_type = Option<i64>)]
144 pub transaction_id: Option<TransactionId>,
145 #[schema(value_type = Option<TransactionPhase>)]
147 pub initiated_by_api: Option<ExternalTransactionPhase>,
148 #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
150 pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
151}
152
153#[derive(Clone, Debug, Serialize, ToSchema)]
155#[schema(as = CompletedWatermark)]
156pub struct ExternalCompletedWatermark {
157 #[schema(value_type = Object)]
159 pub metadata: JsonValue,
160 pub ingested_at: String,
162 pub processed_at: String,
164 pub completed_at: String,
166}
167
168#[derive(Default, Serialize, ToSchema)]
170#[schema(as = InputEndpointMetrics)]
171pub struct ExternalInputEndpointMetrics {
172 pub total_bytes: u64,
174 pub total_records: u64,
176 pub buffered_records: u64,
178 pub buffered_bytes: u64,
180 pub num_transport_errors: u64,
182 pub num_parse_errors: u64,
184 pub end_of_input: bool,
186}
187
188#[derive(Serialize, ToSchema)]
190#[schema(as = InputEndpointStatus)]
191pub struct ExternalInputEndpointStatus {
192 pub endpoint_name: String,
194 #[serde(serialize_with = "serialize_input_endpoint_config")]
196 #[schema(value_type = ShortEndpointConfig)]
197 pub config: InputEndpointConfig,
198 #[schema(value_type = InputEndpointMetrics)]
200 pub metrics: ExternalInputEndpointMetrics,
201 pub fatal_error: Option<String>,
203 pub paused: bool,
205 pub barrier: bool,
207 #[schema(value_type = Option<CompletedWatermark>)]
209 pub completed_frontier: Option<ExternalCompletedWatermark>,
210}
211
212#[derive(Default, Serialize, ToSchema)]
214#[schema(as = OutputEndpointMetrics)]
215pub struct ExternalOutputEndpointMetrics {
216 pub transmitted_records: u64,
218 pub transmitted_bytes: u64,
220 pub queued_records: u64,
222 pub queued_batches: u64,
224 pub buffered_records: u64,
226 pub buffered_batches: u64,
228 pub num_encode_errors: u64,
230 pub num_transport_errors: u64,
232 pub total_processed_input_records: u64,
234 pub memory: u64,
236}
237
238#[derive(Serialize, ToSchema)]
240#[schema(as = OutputEndpointStatus)]
241pub struct ExternalOutputEndpointStatus {
242 pub endpoint_name: String,
244 #[serde(serialize_with = "serialize_output_endpoint_config")]
246 #[schema(value_type = ShortEndpointConfig)]
247 pub config: OutputEndpointConfig,
248 #[schema(value_type = OutputEndpointMetrics)]
250 pub metrics: ExternalOutputEndpointMetrics,
251 pub fatal_error: Option<String>,
253}
254
255#[derive(Default, Serialize, ToSchema)]
257#[schema(as = GlobalControllerMetrics)]
258pub struct ExternalGlobalControllerMetrics {
259 pub state: PipelineState,
261 pub bootstrap_in_progress: bool,
263 pub transaction_status: String,
265 #[schema(value_type = i64)]
267 pub transaction_id: TransactionId,
268 #[schema(value_type = TransactionInitiators)]
270 pub transaction_initiators: ExternalTransactionInitiators,
271 pub rss_bytes: u64,
273 pub cpu_msecs: u64,
275 pub uptime_msecs: u64,
277 #[serde(with = "chrono::serde::ts_seconds")]
279 #[schema(value_type = u64)]
280 pub start_time: DateTime<Utc>,
281 pub incarnation_uuid: Uuid,
283 #[serde(with = "chrono::serde::ts_seconds")]
285 #[schema(value_type = u64)]
286 pub initial_start_time: DateTime<Utc>,
287 pub storage_bytes: u64,
289 pub storage_mb_secs: u64,
291 pub runtime_elapsed_msecs: u64,
293 pub buffered_input_records: u64,
295 pub buffered_input_bytes: u64,
297 pub total_input_records: u64,
299 pub total_input_bytes: u64,
301 pub total_processed_records: u64,
303 pub total_processed_bytes: u64,
305 pub total_completed_records: u64,
307 pub pipeline_complete: bool,
309}
310
311#[derive(Serialize, ToSchema)]
319#[schema(as = ControllerStatus)]
320pub struct ExternalControllerStatus {
321 #[schema(value_type = GlobalControllerMetrics)]
323 pub global_metrics: ExternalGlobalControllerMetrics,
324 pub suspend_error: Option<SuspendError>,
326 #[schema(value_type = Vec<InputEndpointStatus>)]
328 pub inputs: Vec<ExternalInputEndpointStatus>,
329 #[schema(value_type = Vec<OutputEndpointStatus>)]
331 pub outputs: Vec<ExternalOutputEndpointStatus>,
332}