Skip to main content

feldera_types/
adapter_stats.rs

1use bytemuck::NoUninit;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use std::collections::BTreeMap;
6use utoipa::ToSchema;
7use uuid::Uuid;
8
9use crate::{suspend::SuspendError, transaction::TransactionId};
10
11/// Pipeline state.
12#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
13#[serde(rename_all = "PascalCase")]
14pub enum PipelineState {
15    /// All input endpoints are paused (or are in the process of being paused).
16    #[default]
17    Paused,
18    /// Controller is running.
19    Running,
20    /// Controller is being terminated.
21    Terminated,
22}
23
24// Metrics for an input endpoint.
25///
26/// Serializes to match the subset of fields needed for error tracking.
27#[derive(Default, Serialize, Deserialize, Debug, Clone)]
28pub struct InputEndpointErrorMetrics {
29    pub endpoint_name: String,
30    #[serde(default)]
31    pub num_transport_errors: u64,
32    #[serde(default)]
33    pub num_parse_errors: u64,
34}
35
36/// Metrics for an output endpoint.
37///
38/// Serializes to match the subset of fields needed for error tracking.
39#[derive(Default, Serialize, Deserialize, Debug, Clone)]
40pub struct OutputEndpointErrorMetrics {
41    pub endpoint_name: String,
42    #[serde(default)]
43    pub num_encode_errors: u64,
44    #[serde(default)]
45    pub num_transport_errors: u64,
46}
47
48/// Endpoint statistics containing metrics.
49///
50/// Wraps metrics in a structure that matches the full stats API shape.
51#[derive(Serialize, Deserialize, Debug, Clone)]
52pub struct EndpointErrorStats<T> {
53    #[serde(default)]
54    pub metrics: T,
55}
56
57/// Schema definition for endpoint config that only includes the stream field.
58#[derive(Deserialize, Serialize, ToSchema)]
59pub struct ShortEndpointConfig {
60    /// The name of the stream.
61    pub stream: String,
62}
63
64/// Pipeline error statistics response from the runtime.
65///
66/// Lightweight response containing only error counts from all endpoints.
67#[derive(Serialize, Deserialize, Debug, Clone)]
68pub struct PipelineStatsErrorsResponse {
69    #[serde(default)]
70    pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
71    #[serde(default)]
72    pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
73}
74
75// OpenAPI schema definitions for controller statistics
76// These match the serialized JSON structure from the adapters crate
77
78/// Transaction status summarized as a single value.
79#[derive(
80    Debug, Default, Copy, PartialEq, Eq, Clone, NoUninit, Serialize, Deserialize, ToSchema,
81)]
82#[repr(u8)]
83pub enum TransactionStatus {
84    #[default]
85    NoTransaction,
86    TransactionInProgress,
87    CommitInProgress,
88}
89
90/// Transaction phase.
91#[derive(Clone, Copy, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
92#[serde(rename_all = "PascalCase")]
93#[schema(as = TransactionPhase)]
94pub enum ExternalTransactionPhase {
95    /// Transaction is in progress.
96    Started,
97    /// Transaction has been committed.
98    Committed,
99}
100
101/// Connector transaction phase with debugging label.
102#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
103#[schema(as = ConnectorTransactionPhase)]
104pub struct ExternalConnectorTransactionPhase {
105    /// Current phase of the transaction.
106    #[schema(value_type = TransactionPhase)]
107    pub phase: ExternalTransactionPhase,
108    /// Optional label for debugging.
109    pub label: Option<String>,
110}
111
112/// Information about entities that initiated the current transaction.
113#[derive(Clone, Default, Debug, Deserialize, Serialize, ToSchema)]
114#[schema(as = TransactionInitiators)]
115pub struct ExternalTransactionInitiators {
116    /// ID assigned to the transaction (None if no transaction is in progress).
117    #[schema(value_type = Option<i64>)]
118    pub transaction_id: Option<TransactionId>,
119    /// Transaction phase initiated by the API.
120    #[schema(value_type = Option<TransactionPhase>)]
121    pub initiated_by_api: Option<ExternalTransactionPhase>,
122    /// Transaction phases initiated by connectors, indexed by endpoint name.
123    #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
124    pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
125}
126
127/// A watermark that has been fully processed by the pipeline.
128#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
129#[schema(as = CompletedWatermark)]
130pub struct ExternalCompletedWatermark {
131    /// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs).
132    #[schema(value_type = Object)]
133    pub metadata: JsonValue,
134    /// Timestamp when the data was ingested from the wire.
135    pub ingested_at: String,
136    /// Timestamp when the data was processed by the circuit.
137    pub processed_at: String,
138    /// Timestamp when all outputs produced from this input have been pushed to all output endpoints.
139    pub completed_at: String,
140}
141
142/// Performance metrics for an input endpoint.
143#[derive(Default, Deserialize, Serialize, ToSchema)]
144#[schema(as = InputEndpointMetrics)]
145pub struct ExternalInputEndpointMetrics {
146    /// Total bytes pushed to the endpoint since it was created.
147    pub total_bytes: u64,
148    /// Total records pushed to the endpoint since it was created.
149    pub total_records: u64,
150    /// Number of records currently buffered by the endpoint (not yet consumed by the circuit).
151    pub buffered_records: u64,
152    /// Number of bytes currently buffered by the endpoint (not yet consumed by the circuit).
153    pub buffered_bytes: u64,
154    /// Number of transport errors.
155    pub num_transport_errors: u64,
156    /// Number of parse errors.
157    pub num_parse_errors: u64,
158    /// True if end-of-input has been signaled.
159    pub end_of_input: bool,
160}
161
162/// Input endpoint status information.
163#[derive(Serialize, Deserialize, ToSchema)]
164#[schema(as = InputEndpointStatus)]
165pub struct ExternalInputEndpointStatus {
166    /// Endpoint name.
167    pub endpoint_name: String,
168    /// Endpoint configuration.
169    pub config: ShortEndpointConfig,
170    /// Performance metrics.
171    #[schema(value_type = InputEndpointMetrics)]
172    pub metrics: ExternalInputEndpointMetrics,
173    /// The first fatal error that occurred at the endpoint.
174    pub fatal_error: Option<String>,
175    /// Endpoint has been paused by the user.
176    pub paused: bool,
177    /// Endpoint is currently a barrier to checkpointing and suspend.
178    pub barrier: bool,
179    /// The latest completed watermark.
180    #[schema(value_type = Option<CompletedWatermark>)]
181    pub completed_frontier: Option<ExternalCompletedWatermark>,
182}
183
184/// Performance metrics for an output endpoint.
185#[derive(Default, Deserialize, Serialize, ToSchema, PartialEq, Eq, PartialOrd, Ord)]
186#[schema(as = OutputEndpointMetrics)]
187pub struct ExternalOutputEndpointMetrics {
188    /// Records sent on the underlying transport.
189    pub transmitted_records: u64,
190    /// Bytes sent on the underlying transport.
191    pub transmitted_bytes: u64,
192    /// Number of queued records.
193    pub queued_records: u64,
194    /// Number of queued batches.
195    pub queued_batches: u64,
196    /// Number of records pushed to the output buffer.
197    pub buffered_records: u64,
198    /// Number of batches in the buffer.
199    pub buffered_batches: u64,
200    /// Number of encoding errors.
201    pub num_encode_errors: u64,
202    /// Number of transport errors.
203    pub num_transport_errors: u64,
204    /// The number of input records processed by the circuit.
205    pub total_processed_input_records: u64,
206    /// Extra memory in use beyond that used for queuing records.
207    pub memory: u64,
208}
209
210/// Output endpoint status information.
211#[derive(Deserialize, Serialize, ToSchema)]
212#[schema(as = OutputEndpointStatus)]
213pub struct ExternalOutputEndpointStatus {
214    /// Endpoint name.
215    pub endpoint_name: String,
216    /// Endpoint configuration.
217    pub config: ShortEndpointConfig,
218    /// Performance metrics.
219    #[schema(value_type = OutputEndpointMetrics)]
220    pub metrics: ExternalOutputEndpointMetrics,
221    /// The first fatal error that occurred at the endpoint.
222    pub fatal_error: Option<String>,
223}
224
225/// Global controller metrics.
226#[derive(Default, Serialize, Deserialize, ToSchema)]
227#[schema(as = GlobalControllerMetrics)]
228pub struct ExternalGlobalControllerMetrics {
229    /// State of the pipeline: running, paused, or terminating.
230    pub state: PipelineState,
231    /// The pipeline has been resumed from a checkpoint and is currently bootstrapping new and modified views.
232    pub bootstrap_in_progress: bool,
233    /// Status of the current transaction.
234    pub transaction_status: TransactionStatus,
235    /// ID of the current transaction or 0 if no transaction is in progress.
236    #[schema(value_type = i64)]
237    pub transaction_id: TransactionId,
238    /// Entities that initiated the current transaction.
239    #[schema(value_type = TransactionInitiators)]
240    pub transaction_initiators: ExternalTransactionInitiators,
241    /// Resident set size of the pipeline process, in bytes.
242    pub rss_bytes: u64,
243    /// CPU time used by the pipeline across all threads, in milliseconds.
244    pub cpu_msecs: u64,
245    /// Time since the pipeline process started, in milliseconds.
246    pub uptime_msecs: u64,
247    /// Time at which the pipeline process started, in seconds since the epoch.
248    #[serde(with = "chrono::serde::ts_seconds")]
249    #[schema(value_type = u64)]
250    pub start_time: DateTime<Utc>,
251    /// Uniquely identifies the pipeline process that started at start_time.
252    pub incarnation_uuid: Uuid,
253    /// Time at which the pipeline process from which we resumed started, in seconds since the epoch.
254    #[serde(with = "chrono::serde::ts_seconds")]
255    #[schema(value_type = u64)]
256    pub initial_start_time: DateTime<Utc>,
257    /// Current storage usage in bytes.
258    pub storage_bytes: u64,
259    /// Storage usage integrated over time, in megabytes * seconds.
260    pub storage_mb_secs: u64,
261    /// Time elapsed while the pipeline is executing a step, multiplied by the number of threads, in milliseconds.
262    pub runtime_elapsed_msecs: u64,
263    /// Total number of records currently buffered by all endpoints.
264    pub buffered_input_records: u64,
265    /// Total number of bytes currently buffered by all endpoints.
266    pub buffered_input_bytes: u64,
267    /// Total number of records received from all endpoints.
268    pub total_input_records: u64,
269    /// Total number of bytes received from all endpoints.
270    pub total_input_bytes: u64,
271    /// Total number of input records processed by the DBSP engine.
272    pub total_processed_records: u64,
273    /// Total bytes of input records processed by the DBSP engine.
274    pub total_processed_bytes: u64,
275    /// Total number of input records processed to completion.
276    pub total_completed_records: u64,
277    /// True if the pipeline has processed all input data to completion.
278    pub pipeline_complete: bool,
279}
280
281/// Complete pipeline statistics returned by the `/stats` endpoint.
282///
283/// This schema definition matches the serialized JSON structure from
284/// `adapters::controller::ControllerStatus`. The actual implementation with
285/// atomics and mutexes lives in the adapters crate, which uses ExternalControllerStatus to
286/// register this OpenAPI schema, making it available to pipeline-manager
287/// without requiring a direct dependency on the adapters crate.
288#[derive(Deserialize, Serialize, ToSchema, Default)]
289#[schema(as = ControllerStatus)]
290pub struct ExternalControllerStatus {
291    /// Global controller metrics.
292    #[schema(value_type = GlobalControllerMetrics)]
293    pub global_metrics: ExternalGlobalControllerMetrics,
294    /// Reason why the pipeline cannot be suspended or checkpointed (if any).
295    pub suspend_error: Option<SuspendError>,
296    /// Input endpoint configs and metrics.
297    #[schema(value_type = Vec<InputEndpointStatus>)]
298    pub inputs: Vec<ExternalInputEndpointStatus>,
299    /// Output endpoint configs and metrics.
300    #[schema(value_type = Vec<OutputEndpointStatus>)]
301    pub outputs: Vec<ExternalOutputEndpointStatus>,
302}