feldera_types/adapter_stats.rs
1use bytemuck::NoUninit;
2use chrono::{DateTime, SecondsFormat, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use std::collections::BTreeMap;
6use utoipa::ToSchema;
7use uuid::Uuid;
8
9use crate::{
10 coordination::Step,
11 suspend::SuspendError,
12 transaction::{CommitProgressSummary, TransactionId},
13};
14
15/// Pipeline state.
16#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
17#[serde(rename_all = "PascalCase")]
18pub enum PipelineState {
19 /// All input endpoints are paused (or are in the process of being paused).
20 #[default]
21 Paused,
22 /// Controller is running.
23 Running,
24 /// Controller is being terminated.
25 Terminated,
26}
27
28// Metrics for an input endpoint.
29///
30/// Serializes to match the subset of fields needed for error tracking.
31#[derive(Default, Serialize, Deserialize, Debug, Clone)]
32pub struct InputEndpointErrorMetrics {
33 pub endpoint_name: String,
34 #[serde(default)]
35 pub num_transport_errors: u64,
36 #[serde(default)]
37 pub num_parse_errors: u64,
38}
39
40/// Metrics for an output endpoint.
41///
42/// Serializes to match the subset of fields needed for error tracking.
43#[derive(Default, Serialize, Deserialize, Debug, Clone)]
44pub struct OutputEndpointErrorMetrics {
45 pub endpoint_name: String,
46 #[serde(default)]
47 pub num_encode_errors: u64,
48 #[serde(default)]
49 pub num_transport_errors: u64,
50}
51
52/// Endpoint statistics containing metrics.
53///
54/// Wraps metrics in a structure that matches the full stats API shape.
55#[derive(Serialize, Deserialize, Debug, Clone)]
56pub struct EndpointErrorStats<T> {
57 #[serde(default)]
58 pub metrics: T,
59}
60
61/// Schema definition for endpoint config that only includes the stream field.
62#[derive(Debug, Deserialize, Serialize, ToSchema)]
63pub struct ShortEndpointConfig {
64 /// The name of the stream.
65 pub stream: String,
66}
67
68/// Pipeline error statistics response from the runtime.
69///
70/// Lightweight response containing only error counts from all endpoints.
71#[derive(Serialize, Deserialize, Debug, Clone)]
72pub struct PipelineStatsErrorsResponse {
73 #[serde(default)]
74 pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
75 #[serde(default)]
76 pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
77}
78
// OpenAPI schema definitions for controller statistics.
// These match the serialized JSON structure from the adapters crate.
81
82/// Transaction status summarized as a single value.
83#[derive(
84 Debug, Default, Copy, PartialEq, Eq, Clone, NoUninit, Serialize, Deserialize, ToSchema,
85)]
86#[repr(u8)]
87pub enum TransactionStatus {
88 #[default]
89 NoTransaction,
90 TransactionInProgress,
91 CommitInProgress,
92}
93
94/// Transaction phase.
95#[derive(Clone, Copy, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
96#[serde(rename_all = "PascalCase")]
97#[schema(as = TransactionPhase)]
98pub enum ExternalTransactionPhase {
99 /// Transaction is in progress.
100 Started,
101 /// Transaction has been committed.
102 Committed,
103}
104
105/// Connector transaction phase with debugging label.
106#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
107#[schema(as = ConnectorTransactionPhase)]
108pub struct ExternalConnectorTransactionPhase {
109 /// Current phase of the transaction.
110 #[schema(value_type = TransactionPhase)]
111 pub phase: ExternalTransactionPhase,
112 /// Optional label for debugging.
113 pub label: Option<String>,
114}
115
116/// Information about entities that initiated the current transaction.
117#[derive(Clone, Default, Debug, Deserialize, Serialize, ToSchema)]
118#[schema(as = TransactionInitiators)]
119pub struct ExternalTransactionInitiators {
120 /// ID assigned to the transaction (None if no transaction is in progress).
121 #[schema(value_type = Option<i64>)]
122 pub transaction_id: Option<TransactionId>,
123 /// Transaction phase initiated by the API.
124 #[schema(value_type = Option<TransactionPhase>)]
125 pub initiated_by_api: Option<ExternalTransactionPhase>,
126 /// Transaction phases initiated by connectors, indexed by endpoint name.
127 #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
128 pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
129}
130
131/// A watermark that has been fully processed by the pipeline.
132#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
133pub struct CompletedWatermark {
134 /// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs).
135 #[schema(value_type = Object)]
136 pub metadata: JsonValue,
137 /// Timestamp when the data was ingested from the wire.
138 #[serde(serialize_with = "serialize_timestamp_micros")]
139 pub ingested_at: DateTime<Utc>,
140 /// Timestamp when the data was processed by the circuit.
141 #[serde(serialize_with = "serialize_timestamp_micros")]
142 pub processed_at: DateTime<Utc>,
143 /// Timestamp when all outputs produced from this input have been pushed to all output endpoints.
144 #[serde(serialize_with = "serialize_timestamp_micros")]
145 pub completed_at: DateTime<Utc>,
146}
147
148#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
149pub struct ConnectorError {
150 /// Timestamp when the error occurred, serialized as RFC3339 with microseconds.
151 #[serde(serialize_with = "serialize_timestamp_micros")]
152 pub timestamp: DateTime<Utc>,
153
154 /// Sequence number of the error.
155 ///
156 /// The client can use this field to detect gaps in the error list reported
157 /// by the pipeline. When the connector reports a large number of errors, the
158 /// pipeline will only preserve and report the most recent errors of each kind.
159 pub index: u64,
160
161 /// Optional tag for the error.
162 ///
163 /// The tag is used to group errors by their type.
164 pub tag: Option<String>,
165
166 /// Error message.
167 pub message: String,
168}
169
170/// Performance metrics for an input endpoint.
171#[derive(Debug, Default, Deserialize, Serialize, ToSchema)]
172#[schema(as = InputEndpointMetrics)]
173pub struct ExternalInputEndpointMetrics {
174 /// Total bytes pushed to the endpoint since it was created.
175 pub total_bytes: u64,
176 /// Total records pushed to the endpoint since it was created.
177 pub total_records: u64,
178 /// Number of records currently buffered by the endpoint (not yet consumed by the circuit).
179 pub buffered_records: u64,
180 /// Number of bytes currently buffered by the endpoint (not yet consumed by the circuit).
181 pub buffered_bytes: u64,
182 /// Number of transport errors.
183 pub num_transport_errors: u64,
184 /// Number of parse errors.
185 pub num_parse_errors: u64,
186 /// True if end-of-input has been signaled.
187 pub end_of_input: bool,
188}
189
190/// Input endpoint status information.
191#[derive(Debug, Serialize, Deserialize, ToSchema)]
192#[schema(as = InputEndpointStatus)]
193pub struct ExternalInputEndpointStatus {
194 /// Endpoint name.
195 pub endpoint_name: String,
196 /// Endpoint configuration.
197 pub config: ShortEndpointConfig,
198 /// Performance metrics.
199 #[schema(value_type = InputEndpointMetrics)]
200 pub metrics: ExternalInputEndpointMetrics,
201 /// The first fatal error that occurred at the endpoint.
202 pub fatal_error: Option<String>,
203 /// Recent parse errors on this endpoint.
204 #[serde(default, skip_serializing_if = "Option::is_none")]
205 pub parse_errors: Option<Vec<ConnectorError>>,
206 /// Recent transport errors on this endpoint.
207 #[serde(default, skip_serializing_if = "Option::is_none")]
208 pub transport_errors: Option<Vec<ConnectorError>>,
209 /// Endpoint has been paused by the user.
210 pub paused: bool,
211 /// Endpoint is currently a barrier to checkpointing and suspend.
212 pub barrier: bool,
213 /// The latest completed watermark.
214 #[schema(value_type = Option<CompletedWatermark>)]
215 pub completed_frontier: Option<CompletedWatermark>,
216}
217
218/// Performance metrics for an output endpoint.
219#[derive(Debug, Default, Deserialize, Serialize, ToSchema, PartialEq, Eq, PartialOrd, Ord)]
220#[schema(as = OutputEndpointMetrics)]
221pub struct ExternalOutputEndpointMetrics {
222 /// Records sent on the underlying transport.
223 pub transmitted_records: u64,
224 /// Bytes sent on the underlying transport.
225 pub transmitted_bytes: u64,
226 /// Number of queued records.
227 pub queued_records: u64,
228 /// Number of queued batches.
229 pub queued_batches: u64,
230 /// Number of records pushed to the output buffer.
231 pub buffered_records: u64,
232 /// Number of batches in the buffer.
233 pub buffered_batches: u64,
234 /// Number of encoding errors.
235 pub num_encode_errors: u64,
236 /// Number of transport errors.
237 pub num_transport_errors: u64,
238 /// The number of input records processed by the circuit.
239 ///
240 /// This metric tracks the end-to-end progress of the pipeline: the output
241 /// of this endpoint is equal to the output of the circuit after
242 /// processing `total_processed_input_records` records.
243 ///
244 /// In a multihost pipeline, this count reflects only the input records
245 /// processed on the same host as the output endpoint, which is not usually
246 /// meaningful.
247 pub total_processed_input_records: u64,
248 /// The number of steps whose input records have been processed by the
249 /// endpoint.
250 ///
251 /// This is meaningful in a multihost pipeline because steps are
252 /// synchronized across all of the hosts.
253 ///
254 /// # Interpretation
255 ///
256 /// This is a count, not a step number. If `total_processed_steps` is 0, no
257 /// steps have been processed to completion. If `total_processed_steps >
258 /// 0`, then the last step whose input records have been processed to
259 /// completion is `total_processed_steps - 1`. A record that was ingested in
260 /// step `n` is fully processed when `total_processed_steps > n`.
261 #[schema(value_type = u64)]
262 pub total_processed_steps: Step,
263 /// Extra memory in use beyond that used for queuing records.
264 pub memory: u64,
265}
266
267/// Output endpoint status information.
268#[derive(Debug, Deserialize, Serialize, ToSchema)]
269#[schema(as = OutputEndpointStatus)]
270pub struct ExternalOutputEndpointStatus {
271 /// Endpoint name.
272 pub endpoint_name: String,
273 /// Endpoint configuration.
274 pub config: ShortEndpointConfig,
275 /// Performance metrics.
276 #[schema(value_type = OutputEndpointMetrics)]
277 pub metrics: ExternalOutputEndpointMetrics,
278 /// The first fatal error that occurred at the endpoint.
279 pub fatal_error: Option<String>,
280 /// Recent encoding errors on this endpoint.
281 #[serde(default, skip_serializing_if = "Option::is_none")]
282 pub encode_errors: Option<Vec<ConnectorError>>,
283 /// Recent transport errors on this endpoint.
284 #[serde(default, skip_serializing_if = "Option::is_none")]
285 pub transport_errors: Option<Vec<ConnectorError>>,
286}
287
288/// Global controller metrics.
289#[derive(Debug, Default, Serialize, Deserialize, ToSchema)]
290#[schema(as = GlobalControllerMetrics)]
291pub struct ExternalGlobalControllerMetrics {
292 /// State of the pipeline: running, paused, or terminating.
293 pub state: PipelineState,
294 /// The pipeline has been resumed from a checkpoint and is currently bootstrapping new and modified views.
295 pub bootstrap_in_progress: bool,
296 /// Status of the current transaction.
297 pub transaction_status: TransactionStatus,
298 /// ID of the current transaction or 0 if no transaction is in progress.
299 #[schema(value_type = i64)]
300 pub transaction_id: TransactionId,
301 /// Progress of the current transaction commit, if one is in progress.
302 pub commit_progress: Option<CommitProgressSummary>,
303 /// Entities that initiated the current transaction.
304 #[schema(value_type = TransactionInitiators)]
305 pub transaction_initiators: ExternalTransactionInitiators,
306 /// Resident set size of the pipeline process, in bytes.
307 pub rss_bytes: u64,
308 /// CPU time used by the pipeline across all threads, in milliseconds.
309 pub cpu_msecs: u64,
310 /// Time since the pipeline process started, including time that the
311 /// pipeline was running or paused.
312 ///
313 /// This is the elapsed time since `start_time`.
314 pub uptime_msecs: u64,
315 /// Time at which the pipeline process started, in seconds since the epoch.
316 #[serde(with = "chrono::serde::ts_seconds")]
317 #[schema(value_type = u64)]
318 pub start_time: DateTime<Utc>,
319 /// Uniquely identifies the pipeline process that started at start_time.
320 pub incarnation_uuid: Uuid,
321 /// Time at which the pipeline process from which we resumed started, in seconds since the epoch.
322 #[serde(with = "chrono::serde::ts_seconds")]
323 #[schema(value_type = u64)]
324 pub initial_start_time: DateTime<Utc>,
325 /// Current storage usage in bytes.
326 pub storage_bytes: u64,
327 /// Storage usage integrated over time, in megabytes * seconds.
328 pub storage_mb_secs: u64,
329 /// Time elapsed while the pipeline is executing a step, multiplied by the number of threads, in milliseconds.
330 pub runtime_elapsed_msecs: u64,
331 /// Total number of records currently buffered by all endpoints.
332 pub buffered_input_records: u64,
333 /// Total number of bytes currently buffered by all endpoints.
334 pub buffered_input_bytes: u64,
335 /// Total number of records received from all endpoints.
336 pub total_input_records: u64,
337 /// Total number of bytes received from all endpoints.
338 pub total_input_bytes: u64,
339 /// Total number of input records processed by the DBSP engine.
340 pub total_processed_records: u64,
341 /// Total bytes of input records processed by the DBSP engine.
342 pub total_processed_bytes: u64,
343 /// Total number of input records processed to completion.
344 pub total_completed_records: u64,
345 /// If the pipeline is stalled because one or more output connectors' output
346 /// buffers are full, this is the number of milliseconds that the current
347 /// stall has lasted.
348 ///
349 /// If this is nonzero, then the output connectors causing the stall can be
350 /// identified by noticing `ExternalOutputEndpointMetrics::queued_records`
351 /// is greater than or equal to `ConnectorConfig::max_queued_records`.
352 ///
353 /// In the ordinary case, the pipeline is not stalled, and this value is 0.
354 pub output_stall_msecs: u64,
355 /// Number of steps that have been initiated.
356 ///
357 /// # Interpretation
358 ///
359 /// This is a count, not a step number. If `total_initiated_steps` is 0, no
360 /// steps have been initiated. If `total_initiated_steps > 0`, then step
361 /// `total_initiated_steps - 1` has been started and all steps previous to
362 /// that have been completely processed by the circuit.
363 #[schema(value_type = u64)]
364 pub total_initiated_steps: Step,
365 /// Number of steps whose input records have been processed to completion.
366 ///
367 /// A record is processed to completion if it has been processed by the DBSP engine and
368 /// all outputs derived from it have been processed by all output connectors.
369 ///
370 /// # Interpretation
371 ///
372 /// This is a count, not a step number. If `total_completed_steps` is 0, no
373 /// steps have been processed to completion. If `total_completed_steps >
374 /// 0`, then the last step whose input records have been processed to
375 /// completion is `total_completed_steps - 1`. A record that was ingested
376 /// when `total_initiated_steps` was `n` is fully processed when
377 /// `total_completed_steps >= n`.
378 #[schema(value_type = u64)]
379 pub total_completed_steps: Step,
380 /// True if the pipeline has processed all input data to completion.
381 pub pipeline_complete: bool,
382}
383
384/// Complete pipeline statistics returned by the `/stats` endpoint.
385///
386/// This schema definition matches the serialized JSON structure from
387/// `adapters::controller::ControllerStatus`. The actual implementation with
388/// atomics and mutexes lives in the adapters crate, which uses ExternalControllerStatus to
389/// register this OpenAPI schema, making it available to pipeline-manager
390/// without requiring a direct dependency on the adapters crate.
391#[derive(Debug, Deserialize, Serialize, ToSchema, Default)]
392#[schema(as = ControllerStatus)]
393pub struct ExternalControllerStatus {
394 /// Global controller metrics.
395 #[schema(value_type = GlobalControllerMetrics)]
396 pub global_metrics: ExternalGlobalControllerMetrics,
397 /// Reason why the pipeline cannot be suspended or checkpointed (if any).
398 pub suspend_error: Option<SuspendError>,
399 /// Input endpoint configs and metrics.
400 #[schema(value_type = Vec<InputEndpointStatus>)]
401 pub inputs: Vec<ExternalInputEndpointStatus>,
402 /// Output endpoint configs and metrics.
403 #[schema(value_type = Vec<OutputEndpointStatus>)]
404 pub outputs: Vec<ExternalOutputEndpointStatus>,
405}
406
407fn serialize_timestamp_micros<S>(
408 timestamp: &DateTime<Utc>,
409 serializer: S,
410) -> Result<S::Ok, S::Error>
411where
412 S: serde::Serializer,
413{
414 serializer.serialize_str(×tamp.to_rfc3339_opts(SecondsFormat::Micros, true))
415}
416
#[cfg(test)]
mod tests {
    use super::ConnectorError;
    use chrono::{DateTime, Utc};

    /// A nanosecond-precision timestamp must be rendered with exactly six
    /// fractional digits and a `Z` suffix.
    #[test]
    fn connector_error_timestamp_serializes_with_microsecond_precision() {
        let occurred_at = DateTime::parse_from_rfc3339("2026-03-08T05:26:42.442438448Z")
            .unwrap()
            .with_timezone(&Utc);
        let connector_error = ConnectorError {
            timestamp: occurred_at,
            index: 1,
            tag: None,
            message: "boom".to_string(),
        };

        let rendered = serde_json::to_string(&connector_error).unwrap();
        let expected_field = r#""timestamp":"2026-03-08T05:26:42.442438Z""#;
        assert!(rendered.contains(expected_field));
    }
}
436}