//! Statistics types for pipeline adapters (`feldera_types/adapter_stats.rs`).
1use bytemuck::NoUninit;
2use chrono::{DateTime, SecondsFormat, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use std::collections::BTreeMap;
6use utoipa::ToSchema;
7use uuid::Uuid;
8
9use crate::{
10    checkpoint::CheckpointActivity,
11    coordination::Step,
12    memory_pressure::MemoryPressure,
13    suspend::{PermanentSuspendError, SuspendError},
14    transaction::{CommitProgressSummary, TransactionId},
15};
16
/// Pipeline state.
///
/// Serialized in PascalCase (`"Paused"`, `"Running"`, `"Terminated"`).
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "PascalCase")]
pub enum PipelineState {
    /// All input endpoints are paused (or are in the process of being paused).
    #[default]
    Paused,
    /// Controller is running.
    Running,
    /// Controller is being terminated.
    Terminated,
}
29
/// Metrics for an input endpoint.
///
/// Serializes to match the subset of fields needed for error tracking.
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct InputEndpointErrorMetrics {
    /// Endpoint name.
    pub endpoint_name: String,
    /// Number of transport errors; defaults to 0 when absent from the input.
    #[serde(default)]
    pub num_transport_errors: u64,
    /// Number of parse errors; defaults to 0 when absent from the input.
    #[serde(default)]
    pub num_parse_errors: u64,
}
41
/// Metrics for an output endpoint.
///
/// Serializes to match the subset of fields needed for error tracking.
#[derive(Default, Serialize, Deserialize, Debug, Clone)]
pub struct OutputEndpointErrorMetrics {
    /// Endpoint name.
    pub endpoint_name: String,
    /// Number of encoding errors; defaults to 0 when absent from the input.
    #[serde(default)]
    pub num_encode_errors: u64,
    /// Number of transport errors; defaults to 0 when absent from the input.
    #[serde(default)]
    pub num_transport_errors: u64,
}
53
/// Endpoint statistics containing metrics.
///
/// Wraps metrics in a structure that matches the full stats API shape.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EndpointErrorStats<T> {
    /// Endpoint metrics.
    // NOTE(review): `#[serde(default)]` on a generic field requires `T: Default`
    // at deserialization time — confirm all instantiations satisfy this.
    #[serde(default)]
    pub metrics: T,
}
62
/// Schema definition for endpoint config that only includes the stream field.
///
/// Other fields present in a full endpoint config are ignored when
/// deserializing (serde's default behavior for unknown fields).
#[derive(Debug, Deserialize, Serialize, ToSchema)]
pub struct ShortEndpointConfig {
    /// The name of the stream.
    pub stream: String,
}
69
/// Pipeline error statistics response from the runtime.
///
/// Lightweight response containing only error counts from all endpoints.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PipelineStatsErrorsResponse {
    /// Error metrics for each input endpoint; defaults to empty when absent.
    #[serde(default)]
    pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
    /// Error metrics for each output endpoint; defaults to empty when absent.
    #[serde(default)]
    pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
}
80
81// OpenAPI schema definitions for controller statistics
82// These match the serialized JSON structure from the adapters crate
83
/// Transaction status summarized as a single value.
#[derive(
    Debug, Default, Copy, PartialEq, Eq, Clone, NoUninit, Serialize, Deserialize, ToSchema,
)]
#[repr(u8)]
pub enum TransactionStatus {
    /// No transaction is in progress (the default).
    #[default]
    NoTransaction,
    /// A transaction is in progress.
    TransactionInProgress,
    /// A transaction commit is in progress.
    CommitInProgress,
}
95
/// Transaction phase.
///
/// Serialized in PascalCase and exposed in the OpenAPI schema as
/// `TransactionPhase`.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
#[serde(rename_all = "PascalCase")]
#[schema(as = TransactionPhase)]
pub enum ExternalTransactionPhase {
    /// Transaction is in progress.
    Started,
    /// Transaction has been committed.
    Committed,
}
106
/// Connector transaction phase with debugging label.
///
/// Exposed in the OpenAPI schema as `ConnectorTransactionPhase`.
#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
#[schema(as = ConnectorTransactionPhase)]
pub struct ExternalConnectorTransactionPhase {
    /// Current phase of the transaction.
    #[schema(value_type = TransactionPhase)]
    pub phase: ExternalTransactionPhase,
    /// Optional label for debugging.
    pub label: Option<String>,
}
117
/// Information about entities that initiated the current transaction.
///
/// Exposed in the OpenAPI schema as `TransactionInitiators`.
#[derive(Clone, Default, Debug, Deserialize, Serialize, ToSchema)]
#[schema(as = TransactionInitiators)]
pub struct ExternalTransactionInitiators {
    /// ID assigned to the transaction (None if no transaction is in progress).
    #[schema(value_type = Option<i64>)]
    pub transaction_id: Option<TransactionId>,
    /// Transaction phase initiated by the API.
    #[schema(value_type = Option<TransactionPhase>)]
    pub initiated_by_api: Option<ExternalTransactionPhase>,
    /// Transaction phases initiated by connectors, indexed by endpoint name.
    #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
    pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
}
132
/// A watermark that has been fully processed by the pipeline.
///
/// All timestamps serialize as RFC 3339 strings with microsecond precision
/// (see `serialize_timestamp_micros`).
#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
pub struct CompletedWatermark {
    /// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs).
    #[schema(value_type = Object)]
    pub metadata: JsonValue,
    /// Timestamp when the data was ingested from the wire.
    #[serde(serialize_with = "serialize_timestamp_micros")]
    pub ingested_at: DateTime<Utc>,
    /// Timestamp when the data was processed by the circuit.
    #[serde(serialize_with = "serialize_timestamp_micros")]
    pub processed_at: DateTime<Utc>,
    /// Timestamp when all outputs produced from this input have been pushed to all output endpoints.
    #[serde(serialize_with = "serialize_timestamp_micros")]
    pub completed_at: DateTime<Utc>,
}
149
/// Health status of a connector.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
pub enum ConnectorHealthStatus {
    /// The connector is healthy (the default).
    #[default]
    Healthy,
    /// The connector is unhealthy.
    Unhealthy,
}
156
/// Health report for a connector: a status plus an optional human-readable
/// description (populated by the `unhealthy` constructor, `None` for `healthy`).
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
pub struct ConnectorHealth {
    /// Current health status.
    pub status: ConnectorHealthStatus,
    /// Optional description of the problem.
    pub description: Option<String>,
}
162
163impl ConnectorHealth {
164    pub fn healthy() -> Self {
165        Self {
166            status: ConnectorHealthStatus::Healthy,
167            description: None,
168        }
169    }
170    pub fn unhealthy(description: &str) -> Self {
171        Self {
172            status: ConnectorHealthStatus::Unhealthy,
173            description: Some(description.to_string()),
174        }
175    }
176}
177
/// A single error reported by a connector, with a timestamp, sequence number,
/// optional grouping tag, and message.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone, PartialEq, Eq)]
pub struct ConnectorError {
    /// Timestamp when the error occurred, serialized as RFC3339 with microseconds.
    #[serde(serialize_with = "serialize_timestamp_micros")]
    pub timestamp: DateTime<Utc>,

    /// Sequence number of the error.
    ///
    /// The client can use this field to detect gaps in the error list reported
    /// by the pipeline. When the connector reports a large number of errors, the
    /// pipeline will only preserve and report the most recent errors of each kind.
    pub index: u64,

    /// Optional tag for the error.
    ///
    /// The tag is used to group errors by their type.
    pub tag: Option<String>,

    /// Error message.
    pub message: String,
}
199
/// Performance metrics for an input endpoint.
///
/// Exposed in the OpenAPI schema as `InputEndpointMetrics`.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema)]
#[schema(as = InputEndpointMetrics)]
pub struct ExternalInputEndpointMetrics {
    /// Total bytes pushed to the endpoint since it was created.
    pub total_bytes: u64,
    /// Total records pushed to the endpoint since it was created.
    pub total_records: u64,
    /// Number of records currently buffered by the endpoint (not yet consumed by the circuit).
    pub buffered_records: u64,
    /// Number of bytes currently buffered by the endpoint (not yet consumed by the circuit).
    pub buffered_bytes: u64,
    /// Number of transport errors.
    pub num_transport_errors: u64,
    /// Number of parse errors.
    pub num_parse_errors: u64,
    /// True if end-of-input has been signaled.
    pub end_of_input: bool,
}
219
/// Input endpoint status information.
///
/// Exposed in the OpenAPI schema as `InputEndpointStatus`.
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[schema(as = InputEndpointStatus)]
pub struct ExternalInputEndpointStatus {
    /// Endpoint name.
    pub endpoint_name: String,
    /// Endpoint configuration.
    pub config: ShortEndpointConfig,
    /// Performance metrics.
    #[schema(value_type = InputEndpointMetrics)]
    pub metrics: ExternalInputEndpointMetrics,
    /// The first fatal error that occurred at the endpoint.
    pub fatal_error: Option<String>,
    /// Recent parse errors on this endpoint.
    /// Omitted from the serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub parse_errors: Option<Vec<ConnectorError>>,
    /// Recent transport errors on this endpoint.
    /// Omitted from the serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub transport_errors: Option<Vec<ConnectorError>>,
    /// Health status of the connector.
    #[serde(default)]
    pub health: Option<ConnectorHealth>,
    /// Endpoint has been paused by the user.
    pub paused: bool,
    /// Endpoint is currently a barrier to checkpointing and suspend.
    pub barrier: bool,
    /// The latest completed watermark.
    #[schema(value_type = Option<CompletedWatermark>)]
    pub completed_frontier: Option<CompletedWatermark>,
}
250
/// Performance metrics for an output endpoint.
///
/// Exposed in the OpenAPI schema as `OutputEndpointMetrics`.
#[derive(Debug, Default, Deserialize, Serialize, ToSchema, PartialEq, Eq, PartialOrd, Ord)]
#[schema(as = OutputEndpointMetrics)]
pub struct ExternalOutputEndpointMetrics {
    /// Records sent on the underlying transport.
    pub transmitted_records: u64,
    /// Bytes sent on the underlying transport.
    pub transmitted_bytes: u64,
    /// Number of queued records.
    pub queued_records: u64,
    /// Number of queued batches.
    pub queued_batches: u64,
    /// Number of records pushed to the output buffer.
    pub buffered_records: u64,
    /// Number of batches in the buffer.
    pub buffered_batches: u64,
    /// Number of encoding errors.
    pub num_encode_errors: u64,
    /// Number of transport errors.
    pub num_transport_errors: u64,
    /// The number of input records processed by the circuit.
    ///
    /// This metric tracks the end-to-end progress of the pipeline: the output
    /// of this endpoint is equal to the output of the circuit after
    /// processing `total_processed_input_records` records.
    ///
    /// In a multihost pipeline, this count reflects only the input records
    /// processed on the same host as the output endpoint, which is not usually
    /// meaningful.
    pub total_processed_input_records: u64,
    /// The number of steps whose input records have been processed by the
    /// endpoint.
    ///
    /// This is meaningful in a multihost pipeline because steps are
    /// synchronized across all of the hosts.
    ///
    /// # Interpretation
    ///
    /// This is a count, not a step number.  If `total_processed_steps` is 0, no
    /// steps have been processed to completion.  If `total_processed_steps >
    /// 0`, then the last step whose input records have been processed to
    /// completion is `total_processed_steps - 1`. A record that was ingested in
    /// step `n` is fully processed when `total_processed_steps > n`.
    #[schema(value_type = u64)]
    pub total_processed_steps: Step,
    /// Extra memory in use beyond that used for queuing records.
    pub memory: u64,
    /// Number of records written so far while the connector is processing a
    /// batch of updates.  Resets to 0 after the batch is committed.
    ///
    /// `None` when the connector does not support batch-progress reporting.
    /// Omitted from the serialized output when `None`, and defaults to `None`
    /// when absent from the input (preserves compatibility with older JSON).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub batch_records_written: Option<u64>,
}
305
/// Output endpoint status information.
///
/// Exposed in the OpenAPI schema as `OutputEndpointStatus`.
#[derive(Debug, Deserialize, Serialize, ToSchema)]
#[schema(as = OutputEndpointStatus)]
pub struct ExternalOutputEndpointStatus {
    /// Endpoint name.
    pub endpoint_name: String,
    /// Endpoint configuration.
    pub config: ShortEndpointConfig,
    /// Performance metrics.
    #[schema(value_type = OutputEndpointMetrics)]
    pub metrics: ExternalOutputEndpointMetrics,
    /// The first fatal error that occurred at the endpoint.
    pub fatal_error: Option<String>,
    /// Recent encoding errors on this endpoint.
    /// Omitted from the serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub encode_errors: Option<Vec<ConnectorError>>,
    /// Recent transport errors on this endpoint.
    /// Omitted from the serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub transport_errors: Option<Vec<ConnectorError>>,
    /// Health status of the connector.
    #[serde(default)]
    pub health: Option<ConnectorHealth>,
}
329
/// Global controller metrics.
///
/// Exposed in the OpenAPI schema as `GlobalControllerMetrics`.
#[derive(Debug, Default, Serialize, Deserialize, ToSchema)]
#[schema(as = GlobalControllerMetrics)]
pub struct ExternalGlobalControllerMetrics {
    /// State of the pipeline: running, paused, or terminating.
    pub state: PipelineState,
    /// The pipeline has been resumed from a checkpoint and is currently bootstrapping new and modified views.
    pub bootstrap_in_progress: bool,
    /// Status of the current transaction.
    pub transaction_status: TransactionStatus,
    /// ID of the current transaction or 0 if no transaction is in progress.
    #[schema(value_type = i64)]
    pub transaction_id: TransactionId,
    /// Elapsed time in milliseconds, according to `transaction_status`:
    ///
    /// - [TransactionStatus::TransactionInProgress]: Time that this transaction
    ///   has been in progress.
    ///
    /// - [TransactionStatus::CommitInProgress]: Time that this transaction has
    ///   been committing.
    pub transaction_msecs: Option<u64>,
    /// Number of records in this transaction, according to
    /// `transaction_status`:
    ///
    /// - [TransactionStatus::TransactionInProgress]: Number of records added so
    ///   far.  More records might be added.
    ///
    /// - [TransactionStatus::CommitInProgress]: Final number of records.
    pub transaction_records: Option<u64>,
    /// Progress of the current transaction commit, if one is in progress.
    pub commit_progress: Option<CommitProgressSummary>,
    /// Entities that initiated the current transaction.
    #[schema(value_type = TransactionInitiators)]
    pub transaction_initiators: ExternalTransactionInitiators,
    /// Resident set size of the pipeline process, in bytes.
    pub rss_bytes: u64,
    /// Memory pressure.
    pub memory_pressure: MemoryPressure,
    /// Memory pressure epoch.
    pub memory_pressure_epoch: u64,
    /// CPU time used by the pipeline across all threads, in milliseconds.
    pub cpu_msecs: u64,
    /// Time since the pipeline process started, including time that the
    /// pipeline was running or paused.
    ///
    /// This is the elapsed time since `start_time`.
    pub uptime_msecs: u64,
    /// Time at which the pipeline process started, in seconds since the epoch.
    #[serde(with = "chrono::serde::ts_seconds")]
    #[schema(value_type = u64)]
    pub start_time: DateTime<Utc>,
    /// Uniquely identifies the pipeline process that started at start_time.
    pub incarnation_uuid: Uuid,
    /// Time at which the pipeline process from which we resumed started, in seconds since the epoch.
    #[serde(with = "chrono::serde::ts_seconds")]
    #[schema(value_type = u64)]
    pub initial_start_time: DateTime<Utc>,
    /// Current storage usage in bytes.
    pub storage_bytes: u64,
    /// Storage usage integrated over time, in megabytes * seconds.
    pub storage_mb_secs: u64,
    /// Time elapsed while the pipeline is executing a step, multiplied by the number of threads, in milliseconds.
    pub runtime_elapsed_msecs: u64,
    /// Total number of records currently buffered by all endpoints.
    pub buffered_input_records: u64,
    /// Total number of bytes currently buffered by all endpoints.
    pub buffered_input_bytes: u64,
    /// Total number of records received from all endpoints.
    pub total_input_records: u64,
    /// Total number of bytes received from all endpoints.
    pub total_input_bytes: u64,
    /// Total number of input records processed by the DBSP engine.
    pub total_processed_records: u64,
    /// Total bytes of input records processed by the DBSP engine.
    pub total_processed_bytes: u64,
    /// Total number of input records processed to completion.
    pub total_completed_records: u64,
    /// If the pipeline is stalled because one or more output connectors' output
    /// buffers are full, this is the number of milliseconds that the current
    /// stall has lasted.
    ///
    /// If this is nonzero, then the output connectors causing the stall can be
    /// identified by noticing `ExternalOutputEndpointMetrics::queued_records`
    /// is greater than or equal to `ConnectorConfig::max_queued_records`.
    ///
    /// In the ordinary case, the pipeline is not stalled, and this value is 0.
    pub output_stall_msecs: u64,
    /// Number of steps that have been initiated.
    ///
    /// # Interpretation
    ///
    /// This is a count, not a step number.  If `total_initiated_steps` is 0, no
    /// steps have been initiated.  If `total_initiated_steps > 0`, then step
    /// `total_initiated_steps - 1` has been started and all steps previous to
    /// that have been completely processed by the circuit.
    #[schema(value_type = u64)]
    pub total_initiated_steps: Step,
    /// Number of steps whose input records have been processed to completion.
    ///
    /// A record is processed to completion if it has been processed by the DBSP engine and
    /// all outputs derived from it have been processed by all output connectors.
    ///
    /// # Interpretation
    ///
    /// This is a count, not a step number.  If `total_completed_steps` is 0, no
    /// steps have been processed to completion.  If `total_completed_steps >
    /// 0`, then the last step whose input records have been processed to
    /// completion is `total_completed_steps - 1`. A record that was ingested
    /// when `total_initiated_steps` was `n` is fully processed when
    /// `total_completed_steps >= n`.
    #[schema(value_type = u64)]
    pub total_completed_steps: Step,
    /// True if the pipeline has processed all input data to completion.
    pub pipeline_complete: bool,
}
445
/// Complete pipeline statistics returned by the `/stats` endpoint.
///
/// This schema definition matches the serialized JSON structure from
/// `adapters::controller::ControllerStatus`. The actual implementation with
/// atomics and mutexes lives in the adapters crate, which uses ExternalControllerStatus to
/// register this OpenAPI schema, making it available to pipeline-manager
/// without requiring a direct dependency on the adapters crate.
#[derive(Debug, Deserialize, Serialize, ToSchema, Default)]
#[schema(as = ControllerStatus)]
pub struct ExternalControllerStatus {
    /// Global controller metrics.
    #[schema(value_type = GlobalControllerMetrics)]
    pub global_metrics: ExternalGlobalControllerMetrics,
    /// Reason why the pipeline cannot be suspended or checkpointed (if any).
    pub suspend_error: Option<SuspendError>,
    /// Current checkpoint activity (idle, delayed, or in-progress).
    /// `None` when the pipeline binary predates checkpoint activity tracking.
    pub checkpoint_activity: Option<CheckpointActivity>,
    /// If the pipeline fundamentally cannot checkpoint (e.g. storage is not
    /// configured, or an input endpoint does not support suspend), the reasons
    /// are listed here.  Unlike a checkpoint failure, this means *no*
    /// checkpoint can succeed until the pipeline configuration changes.
    pub permanent_checkpoint_errors: Option<Vec<PermanentSuspendError>>,
    /// Input endpoint configs and metrics.
    #[schema(value_type = Vec<InputEndpointStatus>)]
    pub inputs: Vec<ExternalInputEndpointStatus>,
    /// Output endpoint configs and metrics.
    #[schema(value_type = Vec<OutputEndpointStatus>)]
    pub outputs: Vec<ExternalOutputEndpointStatus>,
}
476
477fn serialize_timestamp_micros<S>(
478    timestamp: &DateTime<Utc>,
479    serializer: S,
480) -> Result<S::Ok, S::Error>
481where
482    S: serde::Serializer,
483{
484    serializer.serialize_str(&timestamp.to_rfc3339_opts(SecondsFormat::Micros, true))
485}
486
#[cfg(test)]
mod tests {
    use super::{ConnectorError, ExternalOutputEndpointMetrics};
    use chrono::{DateTime, Utc};

    /// The custom serializer truncates a nanosecond-precision timestamp to
    /// microseconds in the JSON output.
    #[test]
    fn connector_error_timestamp_serializes_with_microsecond_precision() {
        let ts: DateTime<Utc> = DateTime::parse_from_rfc3339("2026-03-08T05:26:42.442438448Z")
            .unwrap()
            .with_timezone(&Utc);
        let error = ConnectorError {
            timestamp: ts,
            index: 1,
            tag: None,
            message: "boom".to_string(),
        };

        let serialized = serde_json::to_string(&error).unwrap();
        assert!(serialized.contains(r#""timestamp":"2026-03-08T05:26:42.442438Z""#));
    }

    /// `batch_records_written` round-trips through JSON when set.
    #[test]
    fn output_metrics_batch_records_written_serializes() {
        let mut metrics = ExternalOutputEndpointMetrics::default();
        metrics.batch_records_written = Some(42);

        let serialized = serde_json::to_string(&metrics).unwrap();
        assert!(serialized.contains(r#""batch_records_written":42"#));

        let roundtrip: ExternalOutputEndpointMetrics =
            serde_json::from_str(&serialized).unwrap();
        assert_eq!(roundtrip.batch_records_written, Some(42));
    }

    /// JSON produced before the field existed still deserializes, with the
    /// new field defaulting to `None`.
    #[test]
    fn output_metrics_batch_records_written_defaults_to_none() {
        let legacy = r#"{"transmitted_records":0,"transmitted_bytes":0,"queued_records":0,"queued_batches":0,"buffered_records":0,"buffered_batches":0,"num_encode_errors":0,"num_transport_errors":0,"total_processed_input_records":0,"total_processed_steps":0,"memory":0}"#;
        let parsed: ExternalOutputEndpointMetrics = serde_json::from_str(legacy).unwrap();
        assert_eq!(parsed.batch_records_written, None);
    }
}