//! Pipeline and connector statistics types (`feldera_types/adapter_stats.rs`).
//!
//! These types mirror the JSON emitted by the adapters crate's controller so
//! that the pipeline manager can consume `/stats` output without a direct
//! dependency on the adapters crate.

1use bytemuck::NoUninit;
2use chrono::{DateTime, SecondsFormat, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use std::collections::BTreeMap;
6use utoipa::ToSchema;
7use uuid::Uuid;
8
9use crate::{
10    coordination::Step,
11    suspend::SuspendError,
12    transaction::{CommitProgressSummary, TransactionId},
13};
14
15/// Pipeline state.
16#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
17#[serde(rename_all = "PascalCase")]
18pub enum PipelineState {
19    /// All input endpoints are paused (or are in the process of being paused).
20    #[default]
21    Paused,
22    /// Controller is running.
23    Running,
24    /// Controller is being terminated.
25    Terminated,
26}
27
28// Metrics for an input endpoint.
29///
30/// Serializes to match the subset of fields needed for error tracking.
31#[derive(Default, Serialize, Deserialize, Debug, Clone)]
32pub struct InputEndpointErrorMetrics {
33    pub endpoint_name: String,
34    #[serde(default)]
35    pub num_transport_errors: u64,
36    #[serde(default)]
37    pub num_parse_errors: u64,
38}
39
40/// Metrics for an output endpoint.
41///
42/// Serializes to match the subset of fields needed for error tracking.
43#[derive(Default, Serialize, Deserialize, Debug, Clone)]
44pub struct OutputEndpointErrorMetrics {
45    pub endpoint_name: String,
46    #[serde(default)]
47    pub num_encode_errors: u64,
48    #[serde(default)]
49    pub num_transport_errors: u64,
50}
51
52/// Endpoint statistics containing metrics.
53///
54/// Wraps metrics in a structure that matches the full stats API shape.
55#[derive(Serialize, Deserialize, Debug, Clone)]
56pub struct EndpointErrorStats<T> {
57    #[serde(default)]
58    pub metrics: T,
59}
60
61/// Schema definition for endpoint config that only includes the stream field.
62#[derive(Debug, Deserialize, Serialize, ToSchema)]
63pub struct ShortEndpointConfig {
64    /// The name of the stream.
65    pub stream: String,
66}
67
68/// Pipeline error statistics response from the runtime.
69///
70/// Lightweight response containing only error counts from all endpoints.
71#[derive(Serialize, Deserialize, Debug, Clone)]
72pub struct PipelineStatsErrorsResponse {
73    #[serde(default)]
74    pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
75    #[serde(default)]
76    pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
77}
78
79// OpenAPI schema definitions for controller statistics
80// These match the serialized JSON structure from the adapters crate
81
82/// Transaction status summarized as a single value.
83#[derive(
84    Debug, Default, Copy, PartialEq, Eq, Clone, NoUninit, Serialize, Deserialize, ToSchema,
85)]
86#[repr(u8)]
87pub enum TransactionStatus {
88    #[default]
89    NoTransaction,
90    TransactionInProgress,
91    CommitInProgress,
92}
93
94/// Transaction phase.
95#[derive(Clone, Copy, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
96#[serde(rename_all = "PascalCase")]
97#[schema(as = TransactionPhase)]
98pub enum ExternalTransactionPhase {
99    /// Transaction is in progress.
100    Started,
101    /// Transaction has been committed.
102    Committed,
103}
104
105/// Connector transaction phase with debugging label.
106#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
107#[schema(as = ConnectorTransactionPhase)]
108pub struct ExternalConnectorTransactionPhase {
109    /// Current phase of the transaction.
110    #[schema(value_type = TransactionPhase)]
111    pub phase: ExternalTransactionPhase,
112    /// Optional label for debugging.
113    pub label: Option<String>,
114}
115
116/// Information about entities that initiated the current transaction.
117#[derive(Clone, Default, Debug, Deserialize, Serialize, ToSchema)]
118#[schema(as = TransactionInitiators)]
119pub struct ExternalTransactionInitiators {
120    /// ID assigned to the transaction (None if no transaction is in progress).
121    #[schema(value_type = Option<i64>)]
122    pub transaction_id: Option<TransactionId>,
123    /// Transaction phase initiated by the API.
124    #[schema(value_type = Option<TransactionPhase>)]
125    pub initiated_by_api: Option<ExternalTransactionPhase>,
126    /// Transaction phases initiated by connectors, indexed by endpoint name.
127    #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
128    pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
129}
130
131/// A watermark that has been fully processed by the pipeline.
132#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
133pub struct CompletedWatermark {
134    /// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs).
135    #[schema(value_type = Object)]
136    pub metadata: JsonValue,
137    /// Timestamp when the data was ingested from the wire.
138    #[serde(serialize_with = "serialize_timestamp_micros")]
139    pub ingested_at: DateTime<Utc>,
140    /// Timestamp when the data was processed by the circuit.
141    #[serde(serialize_with = "serialize_timestamp_micros")]
142    pub processed_at: DateTime<Utc>,
143    /// Timestamp when all outputs produced from this input have been pushed to all output endpoints.
144    #[serde(serialize_with = "serialize_timestamp_micros")]
145    pub completed_at: DateTime<Utc>,
146}
147
148#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
149pub enum ConnectorHealthStatus {
150    #[default]
151    Healthy,
152    Unhealthy,
153}
154
155#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
156pub struct ConnectorHealth {
157    pub status: ConnectorHealthStatus,
158    pub description: Option<String>,
159}
160
161impl ConnectorHealth {
162    pub fn healthy() -> Self {
163        Self {
164            status: ConnectorHealthStatus::Healthy,
165            description: None,
166        }
167    }
168    pub fn unhealthy(description: &str) -> Self {
169        Self {
170            status: ConnectorHealthStatus::Unhealthy,
171            description: Some(description.to_string()),
172        }
173    }
174}
175
176#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
177pub struct ConnectorError {
178    /// Timestamp when the error occurred, serialized as RFC3339 with microseconds.
179    #[serde(serialize_with = "serialize_timestamp_micros")]
180    pub timestamp: DateTime<Utc>,
181
182    /// Sequence number of the error.
183    ///
184    /// The client can use this field to detect gaps in the error list reported
185    /// by the pipeline. When the connector reports a large number of errors, the
186    /// pipeline will only preserve and report the most recent errors of each kind.
187    pub index: u64,
188
189    /// Optional tag for the error.
190    ///
191    /// The tag is used to group errors by their type.
192    pub tag: Option<String>,
193
194    /// Error message.
195    pub message: String,
196}
197
198/// Performance metrics for an input endpoint.
199#[derive(Debug, Default, Deserialize, Serialize, ToSchema)]
200#[schema(as = InputEndpointMetrics)]
201pub struct ExternalInputEndpointMetrics {
202    /// Total bytes pushed to the endpoint since it was created.
203    pub total_bytes: u64,
204    /// Total records pushed to the endpoint since it was created.
205    pub total_records: u64,
206    /// Number of records currently buffered by the endpoint (not yet consumed by the circuit).
207    pub buffered_records: u64,
208    /// Number of bytes currently buffered by the endpoint (not yet consumed by the circuit).
209    pub buffered_bytes: u64,
210    /// Number of transport errors.
211    pub num_transport_errors: u64,
212    /// Number of parse errors.
213    pub num_parse_errors: u64,
214    /// True if end-of-input has been signaled.
215    pub end_of_input: bool,
216}
217
218/// Input endpoint status information.
219#[derive(Debug, Serialize, Deserialize, ToSchema)]
220#[schema(as = InputEndpointStatus)]
221pub struct ExternalInputEndpointStatus {
222    /// Endpoint name.
223    pub endpoint_name: String,
224    /// Endpoint configuration.
225    pub config: ShortEndpointConfig,
226    /// Performance metrics.
227    #[schema(value_type = InputEndpointMetrics)]
228    pub metrics: ExternalInputEndpointMetrics,
229    /// The first fatal error that occurred at the endpoint.
230    pub fatal_error: Option<String>,
231    /// Recent parse errors on this endpoint.
232    #[serde(default, skip_serializing_if = "Option::is_none")]
233    pub parse_errors: Option<Vec<ConnectorError>>,
234    /// Recent transport errors on this endpoint.
235    #[serde(default, skip_serializing_if = "Option::is_none")]
236    pub transport_errors: Option<Vec<ConnectorError>>,
237    /// Health status of the connector.
238    #[serde(default)]
239    pub health: Option<ConnectorHealth>,
240    /// Endpoint has been paused by the user.
241    pub paused: bool,
242    /// Endpoint is currently a barrier to checkpointing and suspend.
243    pub barrier: bool,
244    /// The latest completed watermark.
245    #[schema(value_type = Option<CompletedWatermark>)]
246    pub completed_frontier: Option<CompletedWatermark>,
247}
248
249/// Performance metrics for an output endpoint.
250#[derive(Debug, Default, Deserialize, Serialize, ToSchema, PartialEq, Eq, PartialOrd, Ord)]
251#[schema(as = OutputEndpointMetrics)]
252pub struct ExternalOutputEndpointMetrics {
253    /// Records sent on the underlying transport.
254    pub transmitted_records: u64,
255    /// Bytes sent on the underlying transport.
256    pub transmitted_bytes: u64,
257    /// Number of queued records.
258    pub queued_records: u64,
259    /// Number of queued batches.
260    pub queued_batches: u64,
261    /// Number of records pushed to the output buffer.
262    pub buffered_records: u64,
263    /// Number of batches in the buffer.
264    pub buffered_batches: u64,
265    /// Number of encoding errors.
266    pub num_encode_errors: u64,
267    /// Number of transport errors.
268    pub num_transport_errors: u64,
269    /// The number of input records processed by the circuit.
270    ///
271    /// This metric tracks the end-to-end progress of the pipeline: the output
272    /// of this endpoint is equal to the output of the circuit after
273    /// processing `total_processed_input_records` records.
274    ///
275    /// In a multihost pipeline, this count reflects only the input records
276    /// processed on the same host as the output endpoint, which is not usually
277    /// meaningful.
278    pub total_processed_input_records: u64,
279    /// The number of steps whose input records have been processed by the
280    /// endpoint.
281    ///
282    /// This is meaningful in a multihost pipeline because steps are
283    /// synchronized across all of the hosts.
284    ///
285    /// # Interpretation
286    ///
287    /// This is a count, not a step number.  If `total_processed_steps` is 0, no
288    /// steps have been processed to completion.  If `total_processed_steps >
289    /// 0`, then the last step whose input records have been processed to
290    /// completion is `total_processed_steps - 1`. A record that was ingested in
291    /// step `n` is fully processed when `total_processed_steps > n`.
292    #[schema(value_type = u64)]
293    pub total_processed_steps: Step,
294    /// Extra memory in use beyond that used for queuing records.
295    pub memory: u64,
296}
297
298/// Output endpoint status information.
299#[derive(Debug, Deserialize, Serialize, ToSchema)]
300#[schema(as = OutputEndpointStatus)]
301pub struct ExternalOutputEndpointStatus {
302    /// Endpoint name.
303    pub endpoint_name: String,
304    /// Endpoint configuration.
305    pub config: ShortEndpointConfig,
306    /// Performance metrics.
307    #[schema(value_type = OutputEndpointMetrics)]
308    pub metrics: ExternalOutputEndpointMetrics,
309    /// The first fatal error that occurred at the endpoint.
310    pub fatal_error: Option<String>,
311    /// Recent encoding errors on this endpoint.
312    #[serde(default, skip_serializing_if = "Option::is_none")]
313    pub encode_errors: Option<Vec<ConnectorError>>,
314    /// Recent transport errors on this endpoint.
315    #[serde(default, skip_serializing_if = "Option::is_none")]
316    pub transport_errors: Option<Vec<ConnectorError>>,
317    /// Health status of the connector.
318    #[serde(default)]
319    pub health: Option<ConnectorHealth>,
320}
321
322/// Global controller metrics.
323#[derive(Debug, Default, Serialize, Deserialize, ToSchema)]
324#[schema(as = GlobalControllerMetrics)]
325pub struct ExternalGlobalControllerMetrics {
326    /// State of the pipeline: running, paused, or terminating.
327    pub state: PipelineState,
328    /// The pipeline has been resumed from a checkpoint and is currently bootstrapping new and modified views.
329    pub bootstrap_in_progress: bool,
330    /// Status of the current transaction.
331    pub transaction_status: TransactionStatus,
332    /// ID of the current transaction or 0 if no transaction is in progress.
333    #[schema(value_type = i64)]
334    pub transaction_id: TransactionId,
335    /// Elapsed time in milliseconds, according to `transaction_status`:
336    ///
337    /// - [TransactionStatus::TransactionInProgress]: Time that this transaction
338    ///   has been in progress.
339    ///
340    /// - [TransactionStatus::CommitInProgress]: Time that this transaction has
341    ///   been committing.
342    pub transaction_msecs: Option<u64>,
343    /// Number of records in this transaction, according to
344    /// `transaction_status`:
345    ///
346    /// - [TransactionStatus::TransactionInProgress]: Number of records added so
347    ///   far.  More records might be added.
348    ///
349    /// - [TransactionStatus::CommitInProgress]: Final number of records.
350    pub transaction_records: Option<u64>,
351    /// Progress of the current transaction commit, if one is in progress.
352    pub commit_progress: Option<CommitProgressSummary>,
353    /// Entities that initiated the current transaction.
354    #[schema(value_type = TransactionInitiators)]
355    pub transaction_initiators: ExternalTransactionInitiators,
356    /// Resident set size of the pipeline process, in bytes.
357    pub rss_bytes: u64,
358    /// CPU time used by the pipeline across all threads, in milliseconds.
359    pub cpu_msecs: u64,
360    /// Time since the pipeline process started, including time that the
361    /// pipeline was running or paused.
362    ///
363    /// This is the elapsed time since `start_time`.
364    pub uptime_msecs: u64,
365    /// Time at which the pipeline process started, in seconds since the epoch.
366    #[serde(with = "chrono::serde::ts_seconds")]
367    #[schema(value_type = u64)]
368    pub start_time: DateTime<Utc>,
369    /// Uniquely identifies the pipeline process that started at start_time.
370    pub incarnation_uuid: Uuid,
371    /// Time at which the pipeline process from which we resumed started, in seconds since the epoch.
372    #[serde(with = "chrono::serde::ts_seconds")]
373    #[schema(value_type = u64)]
374    pub initial_start_time: DateTime<Utc>,
375    /// Current storage usage in bytes.
376    pub storage_bytes: u64,
377    /// Storage usage integrated over time, in megabytes * seconds.
378    pub storage_mb_secs: u64,
379    /// Time elapsed while the pipeline is executing a step, multiplied by the number of threads, in milliseconds.
380    pub runtime_elapsed_msecs: u64,
381    /// Total number of records currently buffered by all endpoints.
382    pub buffered_input_records: u64,
383    /// Total number of bytes currently buffered by all endpoints.
384    pub buffered_input_bytes: u64,
385    /// Total number of records received from all endpoints.
386    pub total_input_records: u64,
387    /// Total number of bytes received from all endpoints.
388    pub total_input_bytes: u64,
389    /// Total number of input records processed by the DBSP engine.
390    pub total_processed_records: u64,
391    /// Total bytes of input records processed by the DBSP engine.
392    pub total_processed_bytes: u64,
393    /// Total number of input records processed to completion.
394    pub total_completed_records: u64,
395    /// If the pipeline is stalled because one or more output connectors' output
396    /// buffers are full, this is the number of milliseconds that the current
397    /// stall has lasted.
398    ///
399    /// If this is nonzero, then the output connectors causing the stall can be
400    /// identified by noticing `ExternalOutputEndpointMetrics::queued_records`
401    /// is greater than or equal to `ConnectorConfig::max_queued_records`.
402    ///
403    /// In the ordinary case, the pipeline is not stalled, and this value is 0.
404    pub output_stall_msecs: u64,
405    /// Number of steps that have been initiated.
406    ///
407    /// # Interpretation
408    ///
409    /// This is a count, not a step number.  If `total_initiated_steps` is 0, no
410    /// steps have been initiated.  If `total_initiated_steps > 0`, then step
411    /// `total_initiated_steps - 1` has been started and all steps previous to
412    /// that have been completely processed by the circuit.
413    #[schema(value_type = u64)]
414    pub total_initiated_steps: Step,
415    /// Number of steps whose input records have been processed to completion.
416    ///
417    /// A record is processed to completion if it has been processed by the DBSP engine and
418    /// all outputs derived from it have been processed by all output connectors.
419    ///
420    /// # Interpretation
421    ///
422    /// This is a count, not a step number.  If `total_completed_steps` is 0, no
423    /// steps have been processed to completion.  If `total_completed_steps >
424    /// 0`, then the last step whose input records have been processed to
425    /// completion is `total_completed_steps - 1`. A record that was ingested
426    /// when `total_initiated_steps` was `n` is fully processed when
427    /// `total_completed_steps >= n`.
428    #[schema(value_type = u64)]
429    pub total_completed_steps: Step,
430    /// True if the pipeline has processed all input data to completion.
431    pub pipeline_complete: bool,
432}
433
434/// Complete pipeline statistics returned by the `/stats` endpoint.
435///
436/// This schema definition matches the serialized JSON structure from
437/// `adapters::controller::ControllerStatus`. The actual implementation with
438/// atomics and mutexes lives in the adapters crate, which uses ExternalControllerStatus to
439/// register this OpenAPI schema, making it available to pipeline-manager
440/// without requiring a direct dependency on the adapters crate.
441#[derive(Debug, Deserialize, Serialize, ToSchema, Default)]
442#[schema(as = ControllerStatus)]
443pub struct ExternalControllerStatus {
444    /// Global controller metrics.
445    #[schema(value_type = GlobalControllerMetrics)]
446    pub global_metrics: ExternalGlobalControllerMetrics,
447    /// Reason why the pipeline cannot be suspended or checkpointed (if any).
448    pub suspend_error: Option<SuspendError>,
449    /// Input endpoint configs and metrics.
450    #[schema(value_type = Vec<InputEndpointStatus>)]
451    pub inputs: Vec<ExternalInputEndpointStatus>,
452    /// Output endpoint configs and metrics.
453    #[schema(value_type = Vec<OutputEndpointStatus>)]
454    pub outputs: Vec<ExternalOutputEndpointStatus>,
455}
456
457fn serialize_timestamp_micros<S>(
458    timestamp: &DateTime<Utc>,
459    serializer: S,
460) -> Result<S::Ok, S::Error>
461where
462    S: serde::Serializer,
463{
464    serializer.serialize_str(&timestamp.to_rfc3339_opts(SecondsFormat::Micros, true))
465}
466
#[cfg(test)]
mod tests {
    use super::ConnectorError;
    use chrono::{DateTime, Utc};

    /// The custom serializer must truncate timestamps to microseconds.
    #[test]
    fn connector_error_timestamp_serializes_with_microsecond_precision() {
        let ts = DateTime::parse_from_rfc3339("2026-03-08T05:26:42.442438448Z")
            .unwrap()
            .with_timezone(&Utc);
        let err = ConnectorError {
            timestamp: ts,
            index: 1,
            tag: None,
            message: String::from("boom"),
        };

        let serialized = serde_json::to_string(&err).unwrap();
        assert!(serialized.contains(r#""timestamp":"2026-03-08T05:26:42.442438Z""#));
    }
}
486}