feldera_types/adapter_stats.rs
1use bytemuck::NoUninit;
2use chrono::{DateTime, SecondsFormat, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use std::collections::BTreeMap;
6use utoipa::ToSchema;
7use uuid::Uuid;
8
9use crate::{
10 coordination::Step,
11 memory_pressure::MemoryPressure,
12 suspend::SuspendError,
13 transaction::{CommitProgressSummary, TransactionId},
14};
15
16/// Pipeline state.
17#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
18#[serde(rename_all = "PascalCase")]
19pub enum PipelineState {
20 /// All input endpoints are paused (or are in the process of being paused).
21 #[default]
22 Paused,
23 /// Controller is running.
24 Running,
25 /// Controller is being terminated.
26 Terminated,
27}
28
29// Metrics for an input endpoint.
30///
31/// Serializes to match the subset of fields needed for error tracking.
32#[derive(Default, Serialize, Deserialize, Debug, Clone)]
33pub struct InputEndpointErrorMetrics {
34 pub endpoint_name: String,
35 #[serde(default)]
36 pub num_transport_errors: u64,
37 #[serde(default)]
38 pub num_parse_errors: u64,
39}
40
41/// Metrics for an output endpoint.
42///
43/// Serializes to match the subset of fields needed for error tracking.
44#[derive(Default, Serialize, Deserialize, Debug, Clone)]
45pub struct OutputEndpointErrorMetrics {
46 pub endpoint_name: String,
47 #[serde(default)]
48 pub num_encode_errors: u64,
49 #[serde(default)]
50 pub num_transport_errors: u64,
51}
52
53/// Endpoint statistics containing metrics.
54///
55/// Wraps metrics in a structure that matches the full stats API shape.
56#[derive(Serialize, Deserialize, Debug, Clone)]
57pub struct EndpointErrorStats<T> {
58 #[serde(default)]
59 pub metrics: T,
60}
61
62/// Schema definition for endpoint config that only includes the stream field.
63#[derive(Debug, Deserialize, Serialize, ToSchema)]
64pub struct ShortEndpointConfig {
65 /// The name of the stream.
66 pub stream: String,
67}
68
69/// Pipeline error statistics response from the runtime.
70///
71/// Lightweight response containing only error counts from all endpoints.
72#[derive(Serialize, Deserialize, Debug, Clone)]
73pub struct PipelineStatsErrorsResponse {
74 #[serde(default)]
75 pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
76 #[serde(default)]
77 pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
78}
79
80// OpenAPI schema definitions for controller statistics
81// These match the serialized JSON structure from the adapters crate
82
83/// Transaction status summarized as a single value.
84#[derive(
85 Debug, Default, Copy, PartialEq, Eq, Clone, NoUninit, Serialize, Deserialize, ToSchema,
86)]
87#[repr(u8)]
88pub enum TransactionStatus {
89 #[default]
90 NoTransaction,
91 TransactionInProgress,
92 CommitInProgress,
93}
94
95/// Transaction phase.
96#[derive(Clone, Copy, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
97#[serde(rename_all = "PascalCase")]
98#[schema(as = TransactionPhase)]
99pub enum ExternalTransactionPhase {
100 /// Transaction is in progress.
101 Started,
102 /// Transaction has been committed.
103 Committed,
104}
105
106/// Connector transaction phase with debugging label.
107#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
108#[schema(as = ConnectorTransactionPhase)]
109pub struct ExternalConnectorTransactionPhase {
110 /// Current phase of the transaction.
111 #[schema(value_type = TransactionPhase)]
112 pub phase: ExternalTransactionPhase,
113 /// Optional label for debugging.
114 pub label: Option<String>,
115}
116
117/// Information about entities that initiated the current transaction.
118#[derive(Clone, Default, Debug, Deserialize, Serialize, ToSchema)]
119#[schema(as = TransactionInitiators)]
120pub struct ExternalTransactionInitiators {
121 /// ID assigned to the transaction (None if no transaction is in progress).
122 #[schema(value_type = Option<i64>)]
123 pub transaction_id: Option<TransactionId>,
124 /// Transaction phase initiated by the API.
125 #[schema(value_type = Option<TransactionPhase>)]
126 pub initiated_by_api: Option<ExternalTransactionPhase>,
127 /// Transaction phases initiated by connectors, indexed by endpoint name.
128 #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
129 pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
130}
131
132/// A watermark that has been fully processed by the pipeline.
133#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
134pub struct CompletedWatermark {
135 /// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs).
136 #[schema(value_type = Object)]
137 pub metadata: JsonValue,
138 /// Timestamp when the data was ingested from the wire.
139 #[serde(serialize_with = "serialize_timestamp_micros")]
140 pub ingested_at: DateTime<Utc>,
141 /// Timestamp when the data was processed by the circuit.
142 #[serde(serialize_with = "serialize_timestamp_micros")]
143 pub processed_at: DateTime<Utc>,
144 /// Timestamp when all outputs produced from this input have been pushed to all output endpoints.
145 #[serde(serialize_with = "serialize_timestamp_micros")]
146 pub completed_at: DateTime<Utc>,
147}
148
149#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
150pub enum ConnectorHealthStatus {
151 #[default]
152 Healthy,
153 Unhealthy,
154}
155
156#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
157pub struct ConnectorHealth {
158 pub status: ConnectorHealthStatus,
159 pub description: Option<String>,
160}
161
162impl ConnectorHealth {
163 pub fn healthy() -> Self {
164 Self {
165 status: ConnectorHealthStatus::Healthy,
166 description: None,
167 }
168 }
169 pub fn unhealthy(description: &str) -> Self {
170 Self {
171 status: ConnectorHealthStatus::Unhealthy,
172 description: Some(description.to_string()),
173 }
174 }
175}
176
177#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
178pub struct ConnectorError {
179 /// Timestamp when the error occurred, serialized as RFC3339 with microseconds.
180 #[serde(serialize_with = "serialize_timestamp_micros")]
181 pub timestamp: DateTime<Utc>,
182
183 /// Sequence number of the error.
184 ///
185 /// The client can use this field to detect gaps in the error list reported
186 /// by the pipeline. When the connector reports a large number of errors, the
187 /// pipeline will only preserve and report the most recent errors of each kind.
188 pub index: u64,
189
190 /// Optional tag for the error.
191 ///
192 /// The tag is used to group errors by their type.
193 pub tag: Option<String>,
194
195 /// Error message.
196 pub message: String,
197}
198
199/// Performance metrics for an input endpoint.
200#[derive(Debug, Default, Deserialize, Serialize, ToSchema)]
201#[schema(as = InputEndpointMetrics)]
202pub struct ExternalInputEndpointMetrics {
203 /// Total bytes pushed to the endpoint since it was created.
204 pub total_bytes: u64,
205 /// Total records pushed to the endpoint since it was created.
206 pub total_records: u64,
207 /// Number of records currently buffered by the endpoint (not yet consumed by the circuit).
208 pub buffered_records: u64,
209 /// Number of bytes currently buffered by the endpoint (not yet consumed by the circuit).
210 pub buffered_bytes: u64,
211 /// Number of transport errors.
212 pub num_transport_errors: u64,
213 /// Number of parse errors.
214 pub num_parse_errors: u64,
215 /// True if end-of-input has been signaled.
216 pub end_of_input: bool,
217}
218
219/// Input endpoint status information.
220#[derive(Debug, Serialize, Deserialize, ToSchema)]
221#[schema(as = InputEndpointStatus)]
222pub struct ExternalInputEndpointStatus {
223 /// Endpoint name.
224 pub endpoint_name: String,
225 /// Endpoint configuration.
226 pub config: ShortEndpointConfig,
227 /// Performance metrics.
228 #[schema(value_type = InputEndpointMetrics)]
229 pub metrics: ExternalInputEndpointMetrics,
230 /// The first fatal error that occurred at the endpoint.
231 pub fatal_error: Option<String>,
232 /// Recent parse errors on this endpoint.
233 #[serde(default, skip_serializing_if = "Option::is_none")]
234 pub parse_errors: Option<Vec<ConnectorError>>,
235 /// Recent transport errors on this endpoint.
236 #[serde(default, skip_serializing_if = "Option::is_none")]
237 pub transport_errors: Option<Vec<ConnectorError>>,
238 /// Health status of the connector.
239 #[serde(default)]
240 pub health: Option<ConnectorHealth>,
241 /// Endpoint has been paused by the user.
242 pub paused: bool,
243 /// Endpoint is currently a barrier to checkpointing and suspend.
244 pub barrier: bool,
245 /// The latest completed watermark.
246 #[schema(value_type = Option<CompletedWatermark>)]
247 pub completed_frontier: Option<CompletedWatermark>,
248}
249
250/// Performance metrics for an output endpoint.
251#[derive(Debug, Default, Deserialize, Serialize, ToSchema, PartialEq, Eq, PartialOrd, Ord)]
252#[schema(as = OutputEndpointMetrics)]
253pub struct ExternalOutputEndpointMetrics {
254 /// Records sent on the underlying transport.
255 pub transmitted_records: u64,
256 /// Bytes sent on the underlying transport.
257 pub transmitted_bytes: u64,
258 /// Number of queued records.
259 pub queued_records: u64,
260 /// Number of queued batches.
261 pub queued_batches: u64,
262 /// Number of records pushed to the output buffer.
263 pub buffered_records: u64,
264 /// Number of batches in the buffer.
265 pub buffered_batches: u64,
266 /// Number of encoding errors.
267 pub num_encode_errors: u64,
268 /// Number of transport errors.
269 pub num_transport_errors: u64,
270 /// The number of input records processed by the circuit.
271 ///
272 /// This metric tracks the end-to-end progress of the pipeline: the output
273 /// of this endpoint is equal to the output of the circuit after
274 /// processing `total_processed_input_records` records.
275 ///
276 /// In a multihost pipeline, this count reflects only the input records
277 /// processed on the same host as the output endpoint, which is not usually
278 /// meaningful.
279 pub total_processed_input_records: u64,
280 /// The number of steps whose input records have been processed by the
281 /// endpoint.
282 ///
283 /// This is meaningful in a multihost pipeline because steps are
284 /// synchronized across all of the hosts.
285 ///
286 /// # Interpretation
287 ///
288 /// This is a count, not a step number. If `total_processed_steps` is 0, no
289 /// steps have been processed to completion. If `total_processed_steps >
290 /// 0`, then the last step whose input records have been processed to
291 /// completion is `total_processed_steps - 1`. A record that was ingested in
292 /// step `n` is fully processed when `total_processed_steps > n`.
293 #[schema(value_type = u64)]
294 pub total_processed_steps: Step,
295 /// Extra memory in use beyond that used for queuing records.
296 pub memory: u64,
297}
298
299/// Output endpoint status information.
300#[derive(Debug, Deserialize, Serialize, ToSchema)]
301#[schema(as = OutputEndpointStatus)]
302pub struct ExternalOutputEndpointStatus {
303 /// Endpoint name.
304 pub endpoint_name: String,
305 /// Endpoint configuration.
306 pub config: ShortEndpointConfig,
307 /// Performance metrics.
308 #[schema(value_type = OutputEndpointMetrics)]
309 pub metrics: ExternalOutputEndpointMetrics,
310 /// The first fatal error that occurred at the endpoint.
311 pub fatal_error: Option<String>,
312 /// Recent encoding errors on this endpoint.
313 #[serde(default, skip_serializing_if = "Option::is_none")]
314 pub encode_errors: Option<Vec<ConnectorError>>,
315 /// Recent transport errors on this endpoint.
316 #[serde(default, skip_serializing_if = "Option::is_none")]
317 pub transport_errors: Option<Vec<ConnectorError>>,
318 /// Health status of the connector.
319 #[serde(default)]
320 pub health: Option<ConnectorHealth>,
321}
322
/// Global controller metrics.
///
/// Mirrors the JSON shape of the runtime's global metrics (see
/// `ExternalControllerStatus` for the enclosing response type).
#[derive(Debug, Default, Serialize, Deserialize, ToSchema)]
#[schema(as = GlobalControllerMetrics)]
pub struct ExternalGlobalControllerMetrics {
    /// State of the pipeline: running, paused, or terminating.
    pub state: PipelineState,
    /// The pipeline has been resumed from a checkpoint and is currently bootstrapping new and modified views.
    pub bootstrap_in_progress: bool,
    /// Status of the current transaction.
    pub transaction_status: TransactionStatus,
    /// ID of the current transaction or 0 if no transaction is in progress.
    #[schema(value_type = i64)]
    pub transaction_id: TransactionId,
    /// Elapsed time in milliseconds, according to `transaction_status`:
    ///
    /// - [TransactionStatus::TransactionInProgress]: Time that this transaction
    ///   has been in progress.
    ///
    /// - [TransactionStatus::CommitInProgress]: Time that this transaction has
    ///   been committing.
    pub transaction_msecs: Option<u64>,
    /// Number of records in this transaction, according to
    /// `transaction_status`:
    ///
    /// - [TransactionStatus::TransactionInProgress]: Number of records added so
    ///   far. More records might be added.
    ///
    /// - [TransactionStatus::CommitInProgress]: Final number of records.
    pub transaction_records: Option<u64>,
    /// Progress of the current transaction commit, if one is in progress.
    pub commit_progress: Option<CommitProgressSummary>,
    /// Entities that initiated the current transaction.
    #[schema(value_type = TransactionInitiators)]
    pub transaction_initiators: ExternalTransactionInitiators,
    /// Resident set size of the pipeline process, in bytes.
    pub rss_bytes: u64,
    /// Memory pressure.
    pub memory_pressure: MemoryPressure,
    /// Memory pressure epoch.
    // NOTE(review): the epoch's increment semantics are defined in the
    // `memory_pressure` module; not visible from this file — confirm there.
    pub memory_pressure_epoch: u64,
    /// CPU time used by the pipeline across all threads, in milliseconds.
    pub cpu_msecs: u64,
    /// Time since the pipeline process started, including time that the
    /// pipeline was running or paused.
    ///
    /// This is the elapsed time since `start_time`.
    pub uptime_msecs: u64,
    /// Time at which the pipeline process started, in seconds since the epoch.
    // Serialized as an integer Unix timestamp (`ts_seconds`), hence the
    // `u64` schema override below.
    #[serde(with = "chrono::serde::ts_seconds")]
    #[schema(value_type = u64)]
    pub start_time: DateTime<Utc>,
    /// Uniquely identifies the pipeline process that started at start_time.
    pub incarnation_uuid: Uuid,
    /// Time at which the pipeline process from which we resumed started, in seconds since the epoch.
    #[serde(with = "chrono::serde::ts_seconds")]
    #[schema(value_type = u64)]
    pub initial_start_time: DateTime<Utc>,
    /// Current storage usage in bytes.
    pub storage_bytes: u64,
    /// Storage usage integrated over time, in megabyte-seconds
    /// (megabytes * seconds).
    pub storage_mb_secs: u64,
    /// Time elapsed while the pipeline is executing a step, multiplied by the number of threads, in milliseconds.
    pub runtime_elapsed_msecs: u64,
    /// Total number of records currently buffered by all endpoints.
    pub buffered_input_records: u64,
    /// Total number of bytes currently buffered by all endpoints.
    pub buffered_input_bytes: u64,
    /// Total number of records received from all endpoints.
    pub total_input_records: u64,
    /// Total number of bytes received from all endpoints.
    pub total_input_bytes: u64,
    /// Total number of input records processed by the DBSP engine.
    pub total_processed_records: u64,
    /// Total bytes of input records processed by the DBSP engine.
    pub total_processed_bytes: u64,
    /// Total number of input records processed to completion.
    pub total_completed_records: u64,
    /// If the pipeline is stalled because one or more output connectors' output
    /// buffers are full, this is the number of milliseconds that the current
    /// stall has lasted.
    ///
    /// If this is nonzero, then the output connectors causing the stall can be
    /// identified by noticing `ExternalOutputEndpointMetrics::queued_records`
    /// is greater than or equal to `ConnectorConfig::max_queued_records`.
    ///
    /// In the ordinary case, the pipeline is not stalled, and this value is 0.
    pub output_stall_msecs: u64,
    /// Number of steps that have been initiated.
    ///
    /// # Interpretation
    ///
    /// This is a count, not a step number. If `total_initiated_steps` is 0, no
    /// steps have been initiated. If `total_initiated_steps > 0`, then step
    /// `total_initiated_steps - 1` has been started and all steps previous to
    /// that have been completely processed by the circuit.
    #[schema(value_type = u64)]
    pub total_initiated_steps: Step,
    /// Number of steps whose input records have been processed to completion.
    ///
    /// A record is processed to completion if it has been processed by the DBSP engine and
    /// all outputs derived from it have been processed by all output connectors.
    ///
    /// # Interpretation
    ///
    /// This is a count, not a step number. If `total_completed_steps` is 0, no
    /// steps have been processed to completion. If `total_completed_steps >
    /// 0`, then the last step whose input records have been processed to
    /// completion is `total_completed_steps - 1`. A record that was ingested
    /// when `total_initiated_steps` was `n` is fully processed when
    /// `total_completed_steps >= n`.
    #[schema(value_type = u64)]
    pub total_completed_steps: Step,
    /// True if the pipeline has processed all input data to completion.
    pub pipeline_complete: bool,
}
438
439/// Complete pipeline statistics returned by the `/stats` endpoint.
440///
441/// This schema definition matches the serialized JSON structure from
442/// `adapters::controller::ControllerStatus`. The actual implementation with
443/// atomics and mutexes lives in the adapters crate, which uses ExternalControllerStatus to
444/// register this OpenAPI schema, making it available to pipeline-manager
445/// without requiring a direct dependency on the adapters crate.
446#[derive(Debug, Deserialize, Serialize, ToSchema, Default)]
447#[schema(as = ControllerStatus)]
448pub struct ExternalControllerStatus {
449 /// Global controller metrics.
450 #[schema(value_type = GlobalControllerMetrics)]
451 pub global_metrics: ExternalGlobalControllerMetrics,
452 /// Reason why the pipeline cannot be suspended or checkpointed (if any).
453 pub suspend_error: Option<SuspendError>,
454 /// Input endpoint configs and metrics.
455 #[schema(value_type = Vec<InputEndpointStatus>)]
456 pub inputs: Vec<ExternalInputEndpointStatus>,
457 /// Output endpoint configs and metrics.
458 #[schema(value_type = Vec<OutputEndpointStatus>)]
459 pub outputs: Vec<ExternalOutputEndpointStatus>,
460}
461
462fn serialize_timestamp_micros<S>(
463 timestamp: &DateTime<Utc>,
464 serializer: S,
465) -> Result<S::Ok, S::Error>
466where
467 S: serde::Serializer,
468{
469 serializer.serialize_str(×tamp.to_rfc3339_opts(SecondsFormat::Micros, true))
470}
471
#[cfg(test)]
mod tests {
    use super::ConnectorError;
    use chrono::{DateTime, Utc};

    /// The custom serializer must emit RFC 3339 timestamps truncated to
    /// microsecond precision.
    #[test]
    fn connector_error_timestamp_serializes_with_microsecond_precision() {
        let timestamp = DateTime::parse_from_rfc3339("2026-03-08T05:26:42.442438448Z")
            .unwrap()
            .with_timezone(&Utc);
        let error = ConnectorError {
            timestamp,
            index: 1,
            tag: None,
            message: String::from("boom"),
        };

        let serialized = serde_json::to_string(&error).unwrap();
        assert!(serialized.contains(r#""timestamp":"2026-03-08T05:26:42.442438Z""#));
    }
}
491}