use bytemuck::NoUninit;
use chrono::{DateTime, SecondsFormat, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::BTreeMap;
use utoipa::ToSchema;
use uuid::Uuid;
use crate::{
coordination::Step,
memory_pressure::MemoryPressure,
suspend::SuspendError,
transaction::{CommitProgressSummary, TransactionId},
};
// Lifecycle state of the pipeline, reported in `GlobalControllerMetrics::state`.
// `rename_all = "PascalCase"` leaves the (already PascalCase) variant names
// unchanged on the wire: "Paused", "Running", "Terminated". `Default` is `Paused`.
// Plain `//` comments are used on `ToSchema` types here because utoipa turns
// `///` doc comments into OpenAPI descriptions.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "PascalCase")]
pub enum PipelineState {
#[default]
Paused,
Running,
Terminated,
}
/// Error counters for a single input endpoint.
///
/// The counter fields are `#[serde(default)]`: a payload that omits one
/// deserializes with that count set to `0`. `endpoint_name` is required.
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct InputEndpointErrorMetrics {
/// Name of the input endpoint these counters describe.
pub endpoint_name: String,
/// Number of transport errors recorded for the endpoint.
#[serde(default)]
pub num_transport_errors: u64,
/// Number of parse errors recorded for the endpoint.
#[serde(default)]
pub num_parse_errors: u64,
}
/// Error counters for a single output endpoint.
///
/// The counter fields are `#[serde(default)]`: a payload that omits one
/// deserializes with that count set to `0`. `endpoint_name` is required.
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct OutputEndpointErrorMetrics {
/// Name of the output endpoint these counters describe.
pub endpoint_name: String,
/// Number of encode errors recorded for the endpoint.
#[serde(default)]
pub num_encode_errors: u64,
/// Number of transport errors recorded for the endpoint.
#[serde(default)]
pub num_transport_errors: u64,
}
/// Wrapper around one endpoint's error metrics `T`.
///
/// In this module `T` is [`InputEndpointErrorMetrics`] or
/// [`OutputEndpointErrorMetrics`]. `metrics` is `#[serde(default)]`, so
/// deserialization requires `T: Default` and yields `T::default()` when the
/// field is absent.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EndpointErrorStats<T> {
#[serde(default)]
pub metrics: T,
}
// Abbreviated endpoint configuration embedded in input/output endpoint status
// reports (the `config` field of `InputEndpointStatus` / `OutputEndpointStatus`).
#[derive(Debug, Deserialize, Serialize, ToSchema)]
pub struct ShortEndpointConfig {
// NOTE(review): presumably the name of the table/view the endpoint is attached
// to — confirm against the code that builds this value.
pub stream: String,
}
/// Error statistics for every input and output endpoint of a pipeline.
///
/// Both lists are `#[serde(default)]` and deserialize as empty when absent.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PipelineStatsErrorsResponse {
/// One entry per input endpoint.
#[serde(default)]
pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
/// One entry per output endpoint.
#[serde(default)]
pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
}
// Coarse transaction state, reported in `GlobalControllerMetrics::transaction_status`.
//
// `#[repr(u8)]` together with `bytemuck::NoUninit` makes each value a single
// plain byte, so it can be viewed as a `u8` (presumably for lock-free/atomic
// storage — confirm at the use sites). Discriminants are implicit (0, 1, 2 in
// declaration order): do not reorder variants. There is no `rename_all`, so
// variants serialize by their Rust names. `Default` is `NoTransaction`.
#[derive(
Debug, Default, Copy, PartialEq, Eq, Clone, NoUninit, Serialize, Deserialize, ToSchema,
)]
#[repr(u8)]
pub enum TransactionStatus {
#[default]
NoTransaction,
TransactionInProgress,
CommitInProgress,
}
// Phase of a transaction as recorded for one initiator (API or connector).
// Published in the OpenAPI schema as `TransactionPhase`; serialized as
// "Started" / "Committed".
#[derive(Clone, Copy, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
#[serde(rename_all = "PascalCase")]
#[schema(as = TransactionPhase)]
pub enum ExternalTransactionPhase {
Started,
Committed,
}
// Transaction phase attributed to a connector, plus an optional label.
// Published in the OpenAPI schema as `ConnectorTransactionPhase`.
#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
#[schema(as = ConnectorTransactionPhase)]
pub struct ExternalConnectorTransactionPhase {
#[schema(value_type = TransactionPhase)]
pub phase: ExternalTransactionPhase,
// Optional free-form label supplied with the phase.
pub label: Option<String>,
}
// Records who initiated a transaction: the API and/or individual connectors.
// Published in the OpenAPI schema as `TransactionInitiators`.
// `Default` has no transaction id, no API phase and no connector entries.
#[derive(Clone, Default, Debug, Deserialize, Serialize, ToSchema)]
#[schema(as = TransactionInitiators)]
pub struct ExternalTransactionInitiators {
// Id of the transaction, if any; exposed as `i64` in the schema.
#[schema(value_type = Option<i64>)]
pub transaction_id: Option<TransactionId>,
// Phase contributed by the API, if the API participated.
#[schema(value_type = Option<TransactionPhase>)]
pub initiated_by_api: Option<ExternalTransactionPhase>,
// Phase contributed by each connector. NOTE(review): keys are presumably
// endpoint names — confirm. `BTreeMap` keeps the JSON key order sorted.
#[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
}
// Watermark reported as an input endpoint's `completed_frontier`.
//
// The three timestamps are serialized by `serialize_timestamp_micros`
// (RFC 3339, microsecond precision, `Z` suffix). They are deserialized with
// `DateTime<Utc>`'s default `Deserialize` impl, since no `deserialize_with`
// is given.
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
pub struct CompletedWatermark {
// Arbitrary JSON; documented as a generic object in the schema.
// NOTE(review): contents are presumably connector-specific — confirm.
#[schema(value_type = Object)]
pub metadata: JsonValue,
#[serde(serialize_with = "serialize_timestamp_micros")]
pub ingested_at: DateTime<Utc>,
#[serde(serialize_with = "serialize_timestamp_micros")]
pub processed_at: DateTime<Utc>,
#[serde(serialize_with = "serialize_timestamp_micros")]
pub completed_at: DateTime<Utc>,
}
// Binary health state of a connector. There is no `rename_all`, so variants
// serialize as "Healthy" / "Unhealthy". `Default` is `Healthy`.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
pub enum ConnectorHealthStatus {
#[default]
Healthy,
Unhealthy,
}
// Health report for a connector: a status plus an optional human-readable
// description. The derived `Default` is `Healthy` with no description.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
pub struct ConnectorHealth {
pub status: ConnectorHealthStatus,
// Explanation of the status; the `unhealthy` constructor always sets it.
pub description: Option<String>,
}
impl ConnectorHealth {
    /// Builds a report with [`ConnectorHealthStatus::Healthy`] status and no
    /// description.
    pub fn healthy() -> Self {
        let status = ConnectorHealthStatus::Healthy;
        Self {
            status,
            description: None,
        }
    }

    /// Builds a report with [`ConnectorHealthStatus::Unhealthy`] status,
    /// storing an owned copy of `description`.
    pub fn unhealthy(description: &str) -> Self {
        let description = Some(String::from(description));
        Self {
            status: ConnectorHealthStatus::Unhealthy,
            description,
        }
    }
}
// One recorded connector error (listed in `parse_errors`, `transport_errors`
// or `encode_errors` of an endpoint status).
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
pub struct ConnectorError {
// Serialized as RFC 3339 with microsecond precision and a `Z` suffix (see the
// unit test at the bottom of this file).
#[serde(serialize_with = "serialize_timestamp_micros")]
pub timestamp: DateTime<Utc>,
// NOTE(review): presumably a per-endpoint sequence number for the error — confirm.
pub index: u64,
// Optional classification tag for the error.
pub tag: Option<String>,
// Human-readable error message.
pub message: String,
}
// Counters for one input endpoint; published in the OpenAPI schema as
// `InputEndpointMetrics`. No field has `#[serde(default)]`, so every field is
// required when deserializing.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema)]
#[schema(as = InputEndpointMetrics)]
pub struct ExternalInputEndpointMetrics {
pub total_bytes: u64,
pub total_records: u64,
pub buffered_records: u64,
pub buffered_bytes: u64,
pub num_transport_errors: u64,
pub num_parse_errors: u64,
// Whether the endpoint has reached the end of its input.
pub end_of_input: bool,
}
// Status report for one input endpoint; published in the OpenAPI schema as
// `InputEndpointStatus`.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[schema(as = InputEndpointStatus)]
pub struct ExternalInputEndpointStatus {
pub endpoint_name: String,
pub config: ShortEndpointConfig,
#[schema(value_type = InputEndpointMetrics)]
pub metrics: ExternalInputEndpointMetrics,
// Error message if the endpoint has failed fatally; `None` otherwise.
pub fatal_error: Option<String>,
// Omitted from the JSON when `None`, and `None` when absent on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub parse_errors: Option<Vec<ConnectorError>>,
// Omitted from the JSON when `None`, and `None` when absent on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub transport_errors: Option<Vec<ConnectorError>>,
// `None` when absent on input; unlike the error lists it has no
// `skip_serializing_if`, so `None` is serialized as `null`.
#[serde(default)]
pub health: Option<ConnectorHealth>,
// NOTE(review): presumably whether the endpoint is paused — confirm.
pub paused: bool,
// NOTE(review): presumably whether the endpoint is holding a barrier — confirm.
pub barrier: bool,
#[schema(value_type = Option<CompletedWatermark>)]
pub completed_frontier: Option<CompletedWatermark>,
}
// Counters for one output endpoint; published in the OpenAPI schema as
// `OutputEndpointMetrics`.
// The derived `PartialOrd`/`Ord` compare fields lexicographically in declaration
// order. Reordering fields changes that ordering as well as the JSON field order.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, PartialEq, Eq, PartialOrd, Ord)]
#[schema(as = OutputEndpointMetrics)]
pub struct ExternalOutputEndpointMetrics {
pub transmitted_records: u64,
pub transmitted_bytes: u64,
pub queued_records: u64,
pub queued_batches: u64,
pub buffered_records: u64,
pub buffered_batches: u64,
pub num_encode_errors: u64,
pub num_transport_errors: u64,
pub total_processed_input_records: u64,
// Exposed as `u64` in the schema.
#[schema(value_type = u64)]
pub total_processed_steps: Step,
// NOTE(review): units not visible here (presumably bytes) — confirm.
pub memory: u64,
}
// Status report for one output endpoint; published in the OpenAPI schema as
// `OutputEndpointStatus`.
#[derive(Debug, Deserialize, Serialize, ToSchema)]
#[schema(as = OutputEndpointStatus)]
pub struct ExternalOutputEndpointStatus {
pub endpoint_name: String,
pub config: ShortEndpointConfig,
#[schema(value_type = OutputEndpointMetrics)]
pub metrics: ExternalOutputEndpointMetrics,
// Error message if the endpoint has failed fatally; `None` otherwise.
pub fatal_error: Option<String>,
// Omitted from the JSON when `None`, and `None` when absent on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub encode_errors: Option<Vec<ConnectorError>>,
// Omitted from the JSON when `None`, and `None` when absent on input.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub transport_errors: Option<Vec<ConnectorError>>,
// `None` when absent on input; serialized as `null` when `None`.
#[serde(default)]
pub health: Option<ConnectorHealth>,
}
// Pipeline-wide controller metrics; published in the OpenAPI schema as
// `GlobalControllerMetrics`.
// Units are encoded only in field-name suffixes (`_msecs`, `_bytes`, `_mb_secs`).
// The types themselves do not enforce them.
#[derive(Debug, Default, Serialize, Deserialize, ToSchema)]
#[schema(as = GlobalControllerMetrics)]
pub struct ExternalGlobalControllerMetrics {
pub state: PipelineState,
pub bootstrap_in_progress: bool,
pub transaction_status: TransactionStatus,
// Exposed as `i64` in the schema.
#[schema(value_type = i64)]
pub transaction_id: TransactionId,
pub transaction_msecs: Option<u64>,
pub transaction_records: Option<u64>,
pub commit_progress: Option<CommitProgressSummary>,
#[schema(value_type = TransactionInitiators)]
pub transaction_initiators: ExternalTransactionInitiators,
pub rss_bytes: u64,
pub memory_pressure: MemoryPressure,
pub memory_pressure_epoch: u64,
pub cpu_msecs: u64,
pub uptime_msecs: u64,
// Serialized as an integer Unix timestamp in whole seconds (`ts_seconds`), so
// sub-second precision is not transmitted; the schema declares it as `u64`.
#[serde(with = "chrono::serde::ts_seconds")]
#[schema(value_type = u64)]
pub start_time: DateTime<Utc>,
pub incarnation_uuid: Uuid,
// Same wire format as `start_time`: whole Unix seconds.
#[serde(with = "chrono::serde::ts_seconds")]
#[schema(value_type = u64)]
pub initial_start_time: DateTime<Utc>,
pub storage_bytes: u64,
pub storage_mb_secs: u64,
pub runtime_elapsed_msecs: u64,
pub buffered_input_records: u64,
pub buffered_input_bytes: u64,
pub total_input_records: u64,
pub total_input_bytes: u64,
pub total_processed_records: u64,
pub total_processed_bytes: u64,
pub total_completed_records: u64,
pub output_stall_msecs: u64,
// Step counters are exposed as `u64` in the schema.
#[schema(value_type = u64)]
pub total_initiated_steps: Step,
#[schema(value_type = u64)]
pub total_completed_steps: Step,
pub pipeline_complete: bool,
}
// Top-level controller status: global metrics, an optional suspend error, and
// a status entry for every input and output endpoint. Published in the
// OpenAPI schema as `ControllerStatus`.
#[derive(Debug, Deserialize, Serialize, ToSchema, Default)]
#[schema(as = ControllerStatus)]
pub struct ExternalControllerStatus {
#[schema(value_type = GlobalControllerMetrics)]
pub global_metrics: ExternalGlobalControllerMetrics,
// Present when a suspend is not currently possible.
// NOTE(review): exact semantics defined by `SuspendError` — confirm.
pub suspend_error: Option<SuspendError>,
#[schema(value_type = Vec<InputEndpointStatus>)]
pub inputs: Vec<ExternalInputEndpointStatus>,
#[schema(value_type = Vec<OutputEndpointStatus>)]
pub outputs: Vec<ExternalOutputEndpointStatus>,
}
/// Serializes a UTC timestamp as an RFC 3339 string with exactly six
/// fractional-second digits and a `Z` suffix, e.g. `2026-03-08T05:26:42.442438Z`.
///
/// Used as the `serialize_with` hook for timestamp fields in this module.
/// Digits beyond microsecond precision are not emitted.
///
/// # Errors
///
/// Propagates any error returned by `serializer`.
fn serialize_timestamp_micros<S>(
    timestamp: &DateTime<Utc>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    // `use_z = true` renders the UTC offset as `Z` instead of `+00:00`.
    serializer.serialize_str(&timestamp.to_rfc3339_opts(SecondsFormat::Micros, true))
}
#[cfg(test)]
mod tests {
    use super::ConnectorError;
    use chrono::{DateTime, Utc};

    /// A nanosecond-precision timestamp on a `ConnectorError` must be rendered
    /// as RFC 3339 with exactly six fractional digits and a `Z` suffix.
    #[test]
    fn connector_error_timestamp_serializes_with_microsecond_precision() {
        let nanos_precision = DateTime::parse_from_rfc3339("2026-03-08T05:26:42.442438448Z")
            .unwrap()
            .with_timezone(&Utc);
        let error = ConnectorError {
            timestamp: nanos_precision,
            index: 1,
            tag: None,
            message: "boom".to_string(),
        };

        let serialized = serde_json::to_string(&error).unwrap();
        let expected_fragment = r#""timestamp":"2026-03-08T05:26:42.442438Z""#;
        assert!(
            serialized.contains(expected_fragment),
            "unexpected serialization: {}",
            serialized
        );
    }
}