1use bytemuck::NoUninit;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use std::collections::BTreeMap;
6use utoipa::ToSchema;
7use uuid::Uuid;
8
9use crate::{suspend::SuspendError, transaction::TransactionId};
10
11#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
13#[serde(rename_all = "PascalCase")]
14pub enum PipelineState {
15 #[default]
17 Paused,
18 Running,
20 Terminated,
22}
23
24#[derive(Default, Serialize, Deserialize, Debug, Clone)]
28pub struct InputEndpointErrorMetrics {
29 pub endpoint_name: String,
30 #[serde(default)]
31 pub num_transport_errors: u64,
32 #[serde(default)]
33 pub num_parse_errors: u64,
34}
35
36#[derive(Default, Serialize, Deserialize, Debug, Clone)]
40pub struct OutputEndpointErrorMetrics {
41 pub endpoint_name: String,
42 #[serde(default)]
43 pub num_encode_errors: u64,
44 #[serde(default)]
45 pub num_transport_errors: u64,
46}
47
48#[derive(Serialize, Deserialize, Debug, Clone)]
52pub struct EndpointErrorStats<T> {
53 #[serde(default)]
54 pub metrics: T,
55}
56
57#[derive(Deserialize, Serialize, ToSchema)]
59pub struct ShortEndpointConfig {
60 pub stream: String,
62}
63
64#[derive(Serialize, Deserialize, Debug, Clone)]
68pub struct PipelineStatsErrorsResponse {
69 #[serde(default)]
70 pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
71 #[serde(default)]
72 pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
73}
74
75#[derive(
80 Debug, Default, Copy, PartialEq, Eq, Clone, NoUninit, Serialize, Deserialize, ToSchema,
81)]
82#[repr(u8)]
83pub enum TransactionStatus {
84 #[default]
85 NoTransaction,
86 TransactionInProgress,
87 CommitInProgress,
88}
89
90#[derive(Clone, Copy, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
92#[serde(rename_all = "PascalCase")]
93#[schema(as = TransactionPhase)]
94pub enum ExternalTransactionPhase {
95 Started,
97 Committed,
99}
100
101#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
103#[schema(as = ConnectorTransactionPhase)]
104pub struct ExternalConnectorTransactionPhase {
105 #[schema(value_type = TransactionPhase)]
107 pub phase: ExternalTransactionPhase,
108 pub label: Option<String>,
110}
111
112#[derive(Clone, Default, Debug, Deserialize, Serialize, ToSchema)]
114#[schema(as = TransactionInitiators)]
115pub struct ExternalTransactionInitiators {
116 #[schema(value_type = Option<i64>)]
118 pub transaction_id: Option<TransactionId>,
119 #[schema(value_type = Option<TransactionPhase>)]
121 pub initiated_by_api: Option<ExternalTransactionPhase>,
122 #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
124 pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
125}
126
127#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
129#[schema(as = CompletedWatermark)]
130pub struct ExternalCompletedWatermark {
131 #[schema(value_type = Object)]
133 pub metadata: JsonValue,
134 pub ingested_at: String,
136 pub processed_at: String,
138 pub completed_at: String,
140}
141
142#[derive(Default, Deserialize, Serialize, ToSchema)]
144#[schema(as = InputEndpointMetrics)]
145pub struct ExternalInputEndpointMetrics {
146 pub total_bytes: u64,
148 pub total_records: u64,
150 pub buffered_records: u64,
152 pub buffered_bytes: u64,
154 pub num_transport_errors: u64,
156 pub num_parse_errors: u64,
158 pub end_of_input: bool,
160}
161
162#[derive(Serialize, Deserialize, ToSchema)]
164#[schema(as = InputEndpointStatus)]
165pub struct ExternalInputEndpointStatus {
166 pub endpoint_name: String,
168 pub config: ShortEndpointConfig,
170 #[schema(value_type = InputEndpointMetrics)]
172 pub metrics: ExternalInputEndpointMetrics,
173 pub fatal_error: Option<String>,
175 pub paused: bool,
177 pub barrier: bool,
179 #[schema(value_type = Option<CompletedWatermark>)]
181 pub completed_frontier: Option<ExternalCompletedWatermark>,
182}
183
184#[derive(Default, Deserialize, Serialize, ToSchema, PartialEq, Eq, PartialOrd, Ord)]
186#[schema(as = OutputEndpointMetrics)]
187pub struct ExternalOutputEndpointMetrics {
188 pub transmitted_records: u64,
190 pub transmitted_bytes: u64,
192 pub queued_records: u64,
194 pub queued_batches: u64,
196 pub buffered_records: u64,
198 pub buffered_batches: u64,
200 pub num_encode_errors: u64,
202 pub num_transport_errors: u64,
204 pub total_processed_input_records: u64,
206 pub memory: u64,
208}
209
210#[derive(Deserialize, Serialize, ToSchema)]
212#[schema(as = OutputEndpointStatus)]
213pub struct ExternalOutputEndpointStatus {
214 pub endpoint_name: String,
216 pub config: ShortEndpointConfig,
218 #[schema(value_type = OutputEndpointMetrics)]
220 pub metrics: ExternalOutputEndpointMetrics,
221 pub fatal_error: Option<String>,
223}
224
225#[derive(Default, Serialize, Deserialize, ToSchema)]
227#[schema(as = GlobalControllerMetrics)]
228pub struct ExternalGlobalControllerMetrics {
229 pub state: PipelineState,
231 pub bootstrap_in_progress: bool,
233 pub transaction_status: TransactionStatus,
235 #[schema(value_type = i64)]
237 pub transaction_id: TransactionId,
238 #[schema(value_type = TransactionInitiators)]
240 pub transaction_initiators: ExternalTransactionInitiators,
241 pub rss_bytes: u64,
243 pub cpu_msecs: u64,
245 pub uptime_msecs: u64,
247 #[serde(with = "chrono::serde::ts_seconds")]
249 #[schema(value_type = u64)]
250 pub start_time: DateTime<Utc>,
251 pub incarnation_uuid: Uuid,
253 #[serde(with = "chrono::serde::ts_seconds")]
255 #[schema(value_type = u64)]
256 pub initial_start_time: DateTime<Utc>,
257 pub storage_bytes: u64,
259 pub storage_mb_secs: u64,
261 pub runtime_elapsed_msecs: u64,
263 pub buffered_input_records: u64,
265 pub buffered_input_bytes: u64,
267 pub total_input_records: u64,
269 pub total_input_bytes: u64,
271 pub total_processed_records: u64,
273 pub total_processed_bytes: u64,
275 pub total_completed_records: u64,
277 pub pipeline_complete: bool,
279}
280
281#[derive(Deserialize, Serialize, ToSchema, Default)]
289#[schema(as = ControllerStatus)]
290pub struct ExternalControllerStatus {
291 #[schema(value_type = GlobalControllerMetrics)]
293 pub global_metrics: ExternalGlobalControllerMetrics,
294 pub suspend_error: Option<SuspendError>,
296 #[schema(value_type = Vec<InputEndpointStatus>)]
298 pub inputs: Vec<ExternalInputEndpointStatus>,
299 #[schema(value_type = Vec<OutputEndpointStatus>)]
301 pub outputs: Vec<ExternalOutputEndpointStatus>,
302}