feldera_types/adapter_stats.rs
1use bytemuck::NoUninit;
2use chrono::{DateTime, SecondsFormat, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use std::collections::BTreeMap;
6use utoipa::ToSchema;
7use uuid::Uuid;
8
9use crate::{
10 checkpoint::CheckpointActivity,
11 coordination::Step,
12 memory_pressure::MemoryPressure,
13 suspend::{PermanentSuspendError, SuspendError},
14 transaction::{CommitProgressSummary, TransactionId},
15};
16
/// Pipeline state.
///
/// Variants are serialized under their PascalCase names (`"Paused"`,
/// `"Running"`, `"Terminated"`). [`PipelineState::Paused`] is the `Default`
/// value.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "PascalCase")]
pub enum PipelineState {
    /// All input endpoints are paused (or are in the process of being paused).
    #[default]
    Paused,
    /// Controller is running.
    Running,
    /// Controller is being terminated.
    Terminated,
}
29
/// Metrics for an input endpoint.
///
/// Serializes to match the subset of fields needed for error tracking.
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct InputEndpointErrorMetrics {
    /// Endpoint name.
    pub endpoint_name: String,
    /// Number of transport errors; defaults to 0 when the field is absent
    /// during deserialization.
    #[serde(default)]
    pub num_transport_errors: u64,
    /// Number of parse errors; defaults to 0 when the field is absent during
    /// deserialization.
    #[serde(default)]
    pub num_parse_errors: u64,
}
41
/// Metrics for an output endpoint.
///
/// Serializes to match the subset of fields needed for error tracking.
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct OutputEndpointErrorMetrics {
    /// Endpoint name.
    pub endpoint_name: String,
    /// Number of encoding errors; defaults to 0 when the field is absent
    /// during deserialization.
    #[serde(default)]
    pub num_encode_errors: u64,
    /// Number of transport errors; defaults to 0 when the field is absent
    /// during deserialization.
    #[serde(default)]
    pub num_transport_errors: u64,
}
53
/// Endpoint statistics containing metrics.
///
/// Wraps metrics in a structure that matches the full stats API shape.
///
/// `T` is the metrics payload type; in this file it is instantiated with
/// [`InputEndpointErrorMetrics`] and [`OutputEndpointErrorMetrics`].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EndpointErrorStats<T> {
    /// Metrics payload. Falls back to `T`'s `Default` value when the field is
    /// missing during deserialization.
    #[serde(default)]
    pub metrics: T,
}
62
/// Schema definition for endpoint config that only includes the stream field.
///
/// Used as the `config` field of [`ExternalInputEndpointStatus`] and
/// [`ExternalOutputEndpointStatus`].
#[derive(Debug, Deserialize, Serialize, ToSchema)]
pub struct ShortEndpointConfig {
    /// The name of the stream.
    pub stream: String,
}
69
/// Pipeline error statistics response from the runtime.
///
/// Lightweight response containing only error counts from all endpoints.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PipelineStatsErrorsResponse {
    /// Error metrics for input endpoints; an empty list when the field is
    /// absent during deserialization.
    #[serde(default)]
    pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
    /// Error metrics for output endpoints; an empty list when the field is
    /// absent during deserialization.
    #[serde(default)]
    pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
}
80
81// OpenAPI schema definitions for controller statistics
82// These match the serialized JSON structure from the adapters crate
83
/// Transaction status summarized as a single value.
///
/// The enum is `#[repr(u8)]` and derives `NoUninit`.
#[derive(
    Debug, Default, Copy, PartialEq, Eq, Clone, NoUninit, Serialize, Deserialize, ToSchema,
)]
#[repr(u8)]
pub enum TransactionStatus {
    /// No transaction is in progress (the `Default` value).
    #[default]
    NoTransaction,
    /// A transaction is in progress.
    TransactionInProgress,
    /// A transaction commit is in progress.
    CommitInProgress,
}
95
/// Transaction phase.
///
/// Variants are serialized under their PascalCase names. The
/// `#[schema(as = ...)]` attribute publishes this type in the OpenAPI schema
/// as `TransactionPhase`.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
#[serde(rename_all = "PascalCase")]
#[schema(as = TransactionPhase)]
pub enum ExternalTransactionPhase {
    /// Transaction is in progress.
    Started,
    /// Transaction has been committed.
    Committed,
}
106
/// Connector transaction phase with debugging label.
///
/// The `#[schema(as = ...)]` attribute publishes this type in the OpenAPI
/// schema as `ConnectorTransactionPhase`.
#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
#[schema(as = ConnectorTransactionPhase)]
pub struct ExternalConnectorTransactionPhase {
    /// Current phase of the transaction.
    #[schema(value_type = TransactionPhase)]
    pub phase: ExternalTransactionPhase,
    /// Optional label for debugging.
    pub label: Option<String>,
}
117
/// Information about entities that initiated the current transaction.
///
/// The `#[schema(as = ...)]` attribute publishes this type in the OpenAPI
/// schema as `TransactionInitiators`. The derived `Default` value has no
/// transaction ID, no API-initiated phase, and an empty connector map.
#[derive(Clone, Default, Debug, Deserialize, Serialize, ToSchema)]
#[schema(as = TransactionInitiators)]
pub struct ExternalTransactionInitiators {
    /// ID assigned to the transaction (None if no transaction is in progress).
    #[schema(value_type = Option<i64>)]
    pub transaction_id: Option<TransactionId>,
    /// Transaction phase initiated by the API.
    #[schema(value_type = Option<TransactionPhase>)]
    pub initiated_by_api: Option<ExternalTransactionPhase>,
    /// Transaction phases initiated by connectors, indexed by endpoint name.
    #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
    pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
}
132
/// A watermark that has been fully processed by the pipeline.
///
/// All three timestamps are serialized through `serialize_timestamp_micros`,
/// i.e. as RFC 3339 strings with microsecond precision.
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
pub struct CompletedWatermark {
    /// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs).
    #[schema(value_type = Object)]
    pub metadata: JsonValue,
    /// Timestamp when the data was ingested from the wire.
    #[serde(serialize_with = "serialize_timestamp_micros")]
    pub ingested_at: DateTime<Utc>,
    /// Timestamp when the data was processed by the circuit.
    #[serde(serialize_with = "serialize_timestamp_micros")]
    pub processed_at: DateTime<Utc>,
    /// Timestamp when all outputs produced from this input have been pushed to all output endpoints.
    #[serde(serialize_with = "serialize_timestamp_micros")]
    pub completed_at: DateTime<Utc>,
}
149
/// Health status value carried by [`ConnectorHealth`].
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
pub enum ConnectorHealthStatus {
    /// The connector is healthy (the `Default` value).
    #[default]
    Healthy,
    /// The connector is unhealthy.
    Unhealthy,
}
156
/// Health of a connector: a status plus an optional description.
///
/// Reported through the `health` field of the endpoint status structs.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
pub struct ConnectorHealth {
    /// Health status.
    pub status: ConnectorHealthStatus,
    /// Optional description accompanying the status; `ConnectorHealth::healthy`
    /// leaves it `None` and `ConnectorHealth::unhealthy` sets it.
    pub description: Option<String>,
}
162
163impl ConnectorHealth {
164 pub fn healthy() -> Self {
165 Self {
166 status: ConnectorHealthStatus::Healthy,
167 description: None,
168 }
169 }
170 pub fn unhealthy(description: &str) -> Self {
171 Self {
172 status: ConnectorHealthStatus::Unhealthy,
173 description: Some(description.to_string()),
174 }
175 }
176}
177
/// A single error entry recorded for an endpoint.
///
/// Used for the lists of recent parse, transport, and encoding errors in the
/// endpoint status structs.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone, PartialEq, Eq)]
pub struct ConnectorError {
    /// Timestamp when the error occurred, serialized as RFC3339 with microseconds.
    #[serde(serialize_with = "serialize_timestamp_micros")]
    pub timestamp: DateTime<Utc>,

    /// Sequence number of the error.
    ///
    /// The client can use this field to detect gaps in the error list reported
    /// by the pipeline. When the connector reports a large number of errors, the
    /// pipeline will only preserve and report the most recent errors of each kind.
    pub index: u64,

    /// Optional tag for the error.
    ///
    /// The tag is used to group errors by their type.
    pub tag: Option<String>,

    /// Error message.
    pub message: String,
}
199
/// Performance metrics for an input endpoint.
///
/// The `#[schema(as = ...)]` attribute publishes this type in the OpenAPI
/// schema as `InputEndpointMetrics`.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema)]
#[schema(as = InputEndpointMetrics)]
pub struct ExternalInputEndpointMetrics {
    /// Total bytes pushed to the endpoint since it was created.
    pub total_bytes: u64,
    /// Total records pushed to the endpoint since it was created.
    pub total_records: u64,
    /// Number of records currently buffered by the endpoint (not yet consumed by the circuit).
    pub buffered_records: u64,
    /// Number of bytes currently buffered by the endpoint (not yet consumed by the circuit).
    pub buffered_bytes: u64,
    /// Number of transport errors.
    pub num_transport_errors: u64,
    /// Number of parse errors.
    pub num_parse_errors: u64,
    /// True if end-of-input has been signaled.
    pub end_of_input: bool,
}
219
/// Input endpoint status information.
///
/// The `#[schema(as = ...)]` attribute publishes this type in the OpenAPI
/// schema as `InputEndpointStatus`.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[schema(as = InputEndpointStatus)]
pub struct ExternalInputEndpointStatus {
    /// Endpoint name.
    pub endpoint_name: String,
    /// Endpoint configuration.
    pub config: ShortEndpointConfig,
    /// Performance metrics.
    #[schema(value_type = InputEndpointMetrics)]
    pub metrics: ExternalInputEndpointMetrics,
    /// The first fatal error that occurred at the endpoint.
    pub fatal_error: Option<String>,
    /// Recent parse errors on this endpoint.
    ///
    /// Omitted from the serialized output when `None`; defaults to `None`
    /// when absent during deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parse_errors: Option<Vec<ConnectorError>>,
    /// Recent transport errors on this endpoint.
    ///
    /// Omitted from the serialized output when `None`; defaults to `None`
    /// when absent during deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub transport_errors: Option<Vec<ConnectorError>>,
    /// Health status of the connector.
    ///
    /// Defaults to `None` when absent during deserialization.
    #[serde(default)]
    pub health: Option<ConnectorHealth>,
    /// Endpoint has been paused by the user.
    pub paused: bool,
    /// Endpoint is currently a barrier to checkpointing and suspend.
    pub barrier: bool,
    /// The latest completed watermark.
    #[schema(value_type = Option<CompletedWatermark>)]
    pub completed_frontier: Option<CompletedWatermark>,
}
250
/// Performance metrics for an output endpoint.
///
/// The `#[schema(as = ...)]` attribute publishes this type in the OpenAPI
/// schema as `OutputEndpointMetrics`.
///
/// `PartialOrd`/`Ord` are derived, so comparisons are lexicographic over the
/// fields in declaration order; reordering fields changes that ordering.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, PartialEq, Eq, PartialOrd, Ord)]
#[schema(as = OutputEndpointMetrics)]
pub struct ExternalOutputEndpointMetrics {
    /// Records sent on the underlying transport.
    pub transmitted_records: u64,
    /// Bytes sent on the underlying transport.
    pub transmitted_bytes: u64,
    /// Number of queued records.
    pub queued_records: u64,
    /// Number of queued batches.
    pub queued_batches: u64,
    /// Number of records pushed to the output buffer.
    pub buffered_records: u64,
    /// Number of batches in the buffer.
    pub buffered_batches: u64,
    /// Number of encoding errors.
    pub num_encode_errors: u64,
    /// Number of transport errors.
    pub num_transport_errors: u64,
    /// The number of input records processed by the circuit.
    ///
    /// This metric tracks the end-to-end progress of the pipeline: the output
    /// of this endpoint is equal to the output of the circuit after
    /// processing `total_processed_input_records` records.
    ///
    /// In a multihost pipeline, this count reflects only the input records
    /// processed on the same host as the output endpoint, which is not usually
    /// meaningful.
    pub total_processed_input_records: u64,
    /// The number of steps whose input records have been processed by the
    /// endpoint.
    ///
    /// This is meaningful in a multihost pipeline because steps are
    /// synchronized across all of the hosts.
    ///
    /// # Interpretation
    ///
    /// This is a count, not a step number. If `total_processed_steps` is 0, no
    /// steps have been processed to completion. If `total_processed_steps >
    /// 0`, then the last step whose input records have been processed to
    /// completion is `total_processed_steps - 1`. A record that was ingested in
    /// step `n` is fully processed when `total_processed_steps > n`.
    #[schema(value_type = u64)]
    pub total_processed_steps: Step,
    /// Extra memory in use beyond that used for queuing records.
    pub memory: u64,
    /// Number of records written so far while the connector is processing a
    /// batch of updates. Resets to 0 after the batch is committed.
    ///
    /// `None` when the connector does not support batch-progress reporting.
    /// Omitted from the serialized output when `None`, and defaults to `None`
    /// when absent during deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_records_written: Option<u64>,
}
305
/// Output endpoint status information.
///
/// The `#[schema(as = ...)]` attribute publishes this type in the OpenAPI
/// schema as `OutputEndpointStatus`.
#[derive(Debug, Deserialize, Serialize, ToSchema)]
#[schema(as = OutputEndpointStatus)]
pub struct ExternalOutputEndpointStatus {
    /// Endpoint name.
    pub endpoint_name: String,
    /// Endpoint configuration.
    pub config: ShortEndpointConfig,
    /// Performance metrics.
    #[schema(value_type = OutputEndpointMetrics)]
    pub metrics: ExternalOutputEndpointMetrics,
    /// The first fatal error that occurred at the endpoint.
    pub fatal_error: Option<String>,
    /// Recent encoding errors on this endpoint.
    ///
    /// Omitted from the serialized output when `None`; defaults to `None`
    /// when absent during deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub encode_errors: Option<Vec<ConnectorError>>,
    /// Recent transport errors on this endpoint.
    ///
    /// Omitted from the serialized output when `None`; defaults to `None`
    /// when absent during deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub transport_errors: Option<Vec<ConnectorError>>,
    /// Health status of the connector.
    ///
    /// Defaults to `None` when absent during deserialization.
    #[serde(default)]
    pub health: Option<ConnectorHealth>,
}
329
/// Global controller metrics.
///
/// The `#[schema(as = ...)]` attribute publishes this type in the OpenAPI
/// schema as `GlobalControllerMetrics`.
#[derive(Debug, Default, Serialize, Deserialize, ToSchema)]
#[schema(as = GlobalControllerMetrics)]
pub struct ExternalGlobalControllerMetrics {
    /// State of the pipeline: running, paused, or terminating.
    pub state: PipelineState,
    /// The pipeline has been resumed from a checkpoint and is currently bootstrapping new and modified views.
    pub bootstrap_in_progress: bool,
    /// Status of the current transaction.
    pub transaction_status: TransactionStatus,
    /// ID of the current transaction or 0 if no transaction is in progress.
    #[schema(value_type = i64)]
    pub transaction_id: TransactionId,
    /// Elapsed time in milliseconds, according to `transaction_status`:
    ///
    /// - [TransactionStatus::TransactionInProgress]: Time that this transaction
    ///   has been in progress.
    ///
    /// - [TransactionStatus::CommitInProgress]: Time that this transaction has
    ///   been committing.
    pub transaction_msecs: Option<u64>,
    /// Number of records in this transaction, according to
    /// `transaction_status`:
    ///
    /// - [TransactionStatus::TransactionInProgress]: Number of records added so
    ///   far. More records might be added.
    ///
    /// - [TransactionStatus::CommitInProgress]: Final number of records.
    pub transaction_records: Option<u64>,
    /// Progress of the current transaction commit, if one is in progress.
    pub commit_progress: Option<CommitProgressSummary>,
    /// Entities that initiated the current transaction.
    #[schema(value_type = TransactionInitiators)]
    pub transaction_initiators: ExternalTransactionInitiators,
    /// Resident set size of the pipeline process, in bytes.
    pub rss_bytes: u64,
    /// Memory pressure.
    pub memory_pressure: MemoryPressure,
    /// Memory pressure epoch.
    pub memory_pressure_epoch: u64,
    /// CPU time used by the pipeline across all threads, in milliseconds.
    pub cpu_msecs: u64,
    /// Time since the pipeline process started, including time that the
    /// pipeline was running or paused.
    ///
    /// This is the elapsed time since `start_time`.
    pub uptime_msecs: u64,
    /// Time at which the pipeline process started, in seconds since the epoch.
    #[serde(with = "chrono::serde::ts_seconds")]
    #[schema(value_type = u64)]
    pub start_time: DateTime<Utc>,
    /// Uniquely identifies the pipeline process that started at start_time.
    pub incarnation_uuid: Uuid,
    /// Time at which the pipeline process from which we resumed started, in seconds since the epoch.
    #[serde(with = "chrono::serde::ts_seconds")]
    #[schema(value_type = u64)]
    pub initial_start_time: DateTime<Utc>,
    /// Current storage usage in bytes.
    pub storage_bytes: u64,
    /// Storage usage integrated over time, in megabytes * seconds.
    pub storage_mb_secs: u64,
    /// Time elapsed while the pipeline is executing a step, multiplied by the number of threads, in milliseconds.
    pub runtime_elapsed_msecs: u64,
    /// Total number of records currently buffered by all endpoints.
    pub buffered_input_records: u64,
    /// Total number of bytes currently buffered by all endpoints.
    pub buffered_input_bytes: u64,
    /// Total number of records received from all endpoints.
    pub total_input_records: u64,
    /// Total number of bytes received from all endpoints.
    pub total_input_bytes: u64,
    /// Total number of input records processed by the DBSP engine.
    pub total_processed_records: u64,
    /// Total bytes of input records processed by the DBSP engine.
    pub total_processed_bytes: u64,
    /// Total number of input records processed to completion.
    pub total_completed_records: u64,
    /// If the pipeline is stalled because one or more output connectors' output
    /// buffers are full, this is the number of milliseconds that the current
    /// stall has lasted.
    ///
    /// If this is nonzero, then the output connectors causing the stall can be
    /// identified by noticing `ExternalOutputEndpointMetrics::queued_records`
    /// is greater than or equal to `ConnectorConfig::max_queued_records`.
    ///
    /// In the ordinary case, the pipeline is not stalled, and this value is 0.
    pub output_stall_msecs: u64,
    /// Number of steps that have been initiated.
    ///
    /// # Interpretation
    ///
    /// This is a count, not a step number. If `total_initiated_steps` is 0, no
    /// steps have been initiated. If `total_initiated_steps > 0`, then step
    /// `total_initiated_steps - 1` has been started and all steps previous to
    /// that have been completely processed by the circuit.
    #[schema(value_type = u64)]
    pub total_initiated_steps: Step,
    /// Number of steps whose input records have been processed to completion.
    ///
    /// A record is processed to completion if it has been processed by the DBSP engine and
    /// all outputs derived from it have been processed by all output connectors.
    ///
    /// # Interpretation
    ///
    /// This is a count, not a step number. If `total_completed_steps` is 0, no
    /// steps have been processed to completion. If `total_completed_steps >
    /// 0`, then the last step whose input records have been processed to
    /// completion is `total_completed_steps - 1`. A record that was ingested
    /// when `total_initiated_steps` was `n` is fully processed when
    /// `total_completed_steps >= n`.
    #[schema(value_type = u64)]
    pub total_completed_steps: Step,
    /// True if the pipeline has processed all input data to completion.
    pub pipeline_complete: bool,
}
445
/// Complete pipeline statistics returned by the `/stats` endpoint.
///
/// This schema definition matches the serialized JSON structure from
/// `adapters::controller::ControllerStatus`. The actual implementation with
/// atomics and mutexes lives in the adapters crate, which uses ExternalControllerStatus to
/// register this OpenAPI schema, making it available to pipeline-manager
/// without requiring a direct dependency on the adapters crate.
///
/// The `#[schema(as = ...)]` attribute publishes this type in the OpenAPI
/// schema as `ControllerStatus`.
#[derive(Debug, Deserialize, Serialize, ToSchema, Default)]
#[schema(as = ControllerStatus)]
pub struct ExternalControllerStatus {
    /// Global controller metrics.
    #[schema(value_type = GlobalControllerMetrics)]
    pub global_metrics: ExternalGlobalControllerMetrics,
    /// Reason why the pipeline cannot be suspended or checkpointed (if any).
    pub suspend_error: Option<SuspendError>,
    /// Current checkpoint activity (idle, delayed, or in-progress).
    /// `None` when the pipeline binary predates checkpoint activity tracking.
    pub checkpoint_activity: Option<CheckpointActivity>,
    /// If the pipeline fundamentally cannot checkpoint (e.g. storage is not
    /// configured, or an input endpoint does not support suspend), the reasons
    /// are listed here. Unlike a checkpoint failure, this means *no*
    /// checkpoint can succeed until the pipeline configuration changes.
    pub permanent_checkpoint_errors: Option<Vec<PermanentSuspendError>>,
    /// Input endpoint configs and metrics.
    #[schema(value_type = Vec<InputEndpointStatus>)]
    pub inputs: Vec<ExternalInputEndpointStatus>,
    /// Output endpoint configs and metrics.
    #[schema(value_type = Vec<OutputEndpointStatus>)]
    pub outputs: Vec<ExternalOutputEndpointStatus>,
}
476
477fn serialize_timestamp_micros<S>(
478 timestamp: &DateTime<Utc>,
479 serializer: S,
480) -> Result<S::Ok, S::Error>
481where
482 S: serde::Serializer,
483{
484 serializer.serialize_str(×tamp.to_rfc3339_opts(SecondsFormat::Micros, true))
485}
486
#[cfg(test)]
mod tests {
    use super::{ConnectorError, ExternalOutputEndpointMetrics};
    use chrono::{DateTime, Utc};

    /// A nanosecond-precision timestamp must be written with exactly six
    /// fractional digits.
    #[test]
    fn connector_error_timestamp_serializes_with_microsecond_precision() {
        let timestamp = DateTime::parse_from_rfc3339("2026-03-08T05:26:42.442438448Z")
            .unwrap()
            .with_timezone(&Utc);
        let connector_error = ConnectorError {
            timestamp,
            index: 1,
            tag: None,
            message: String::from("boom"),
        };

        let serialized = serde_json::to_string(&connector_error).unwrap();
        let expected_fragment = r#""timestamp":"2026-03-08T05:26:42.442438Z""#;
        assert!(serialized.contains(expected_fragment));
    }

    /// A populated `batch_records_written` survives a serialize/deserialize
    /// round trip.
    #[test]
    fn output_metrics_batch_records_written_serializes() {
        let original = ExternalOutputEndpointMetrics {
            batch_records_written: Some(42),
            ..Default::default()
        };

        let serialized = serde_json::to_string(&original).unwrap();
        assert!(serialized.contains(r#""batch_records_written":42"#));

        let round_tripped: ExternalOutputEndpointMetrics =
            serde_json::from_str(&serialized).unwrap();
        assert_eq!(round_tripped.batch_records_written, Some(42));
    }

    /// JSON produced before the field existed must still deserialize, with
    /// the field defaulting to `None`.
    #[test]
    fn output_metrics_batch_records_written_defaults_to_none() {
        // Old JSON without batch_records_written defaults to None.
        let legacy_json = r#"{"transmitted_records":0,"transmitted_bytes":0,"queued_records":0,"queued_batches":0,"buffered_records":0,"buffered_batches":0,"num_encode_errors":0,"num_transport_errors":0,"total_processed_input_records":0,"total_processed_steps":0,"memory":0}"#;

        let parsed: ExternalOutputEndpointMetrics = serde_json::from_str(legacy_json).unwrap();
        assert_eq!(parsed.batch_records_written, None);
    }
}