Skip to main content

feldera_types/
adapter_stats.rs

1use bytemuck::NoUninit;
2use chrono::{DateTime, SecondsFormat, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use std::collections::BTreeMap;
6use utoipa::ToSchema;
7use uuid::Uuid;
8
9use crate::{
10    coordination::Step,
11    memory_pressure::MemoryPressure,
12    suspend::SuspendError,
13    transaction::{CommitProgressSummary, TransactionId},
14};
15
16/// Pipeline state.
17#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
18#[serde(rename_all = "PascalCase")]
19pub enum PipelineState {
20    /// All input endpoints are paused (or are in the process of being paused).
21    #[default]
22    Paused,
23    /// Controller is running.
24    Running,
25    /// Controller is being terminated.
26    Terminated,
27}
28
29// Metrics for an input endpoint.
30///
31/// Serializes to match the subset of fields needed for error tracking.
32#[derive(Default, Serialize, Deserialize, Debug, Clone)]
33pub struct InputEndpointErrorMetrics {
34    pub endpoint_name: String,
35    #[serde(default)]
36    pub num_transport_errors: u64,
37    #[serde(default)]
38    pub num_parse_errors: u64,
39}
40
41/// Metrics for an output endpoint.
42///
43/// Serializes to match the subset of fields needed for error tracking.
44#[derive(Default, Serialize, Deserialize, Debug, Clone)]
45pub struct OutputEndpointErrorMetrics {
46    pub endpoint_name: String,
47    #[serde(default)]
48    pub num_encode_errors: u64,
49    #[serde(default)]
50    pub num_transport_errors: u64,
51}
52
53/// Endpoint statistics containing metrics.
54///
55/// Wraps metrics in a structure that matches the full stats API shape.
56#[derive(Serialize, Deserialize, Debug, Clone)]
57pub struct EndpointErrorStats<T> {
58    #[serde(default)]
59    pub metrics: T,
60}
61
62/// Schema definition for endpoint config that only includes the stream field.
63#[derive(Debug, Deserialize, Serialize, ToSchema)]
64pub struct ShortEndpointConfig {
65    /// The name of the stream.
66    pub stream: String,
67}
68
69/// Pipeline error statistics response from the runtime.
70///
71/// Lightweight response containing only error counts from all endpoints.
72#[derive(Serialize, Deserialize, Debug, Clone)]
73pub struct PipelineStatsErrorsResponse {
74    #[serde(default)]
75    pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
76    #[serde(default)]
77    pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
78}
79
80// OpenAPI schema definitions for controller statistics
81// These match the serialized JSON structure from the adapters crate
82
83/// Transaction status summarized as a single value.
84#[derive(
85    Debug, Default, Copy, PartialEq, Eq, Clone, NoUninit, Serialize, Deserialize, ToSchema,
86)]
87#[repr(u8)]
88pub enum TransactionStatus {
89    #[default]
90    NoTransaction,
91    TransactionInProgress,
92    CommitInProgress,
93}
94
95/// Transaction phase.
96#[derive(Clone, Copy, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
97#[serde(rename_all = "PascalCase")]
98#[schema(as = TransactionPhase)]
99pub enum ExternalTransactionPhase {
100    /// Transaction is in progress.
101    Started,
102    /// Transaction has been committed.
103    Committed,
104}
105
106/// Connector transaction phase with debugging label.
107#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
108#[schema(as = ConnectorTransactionPhase)]
109pub struct ExternalConnectorTransactionPhase {
110    /// Current phase of the transaction.
111    #[schema(value_type = TransactionPhase)]
112    pub phase: ExternalTransactionPhase,
113    /// Optional label for debugging.
114    pub label: Option<String>,
115}
116
117/// Information about entities that initiated the current transaction.
118#[derive(Clone, Default, Debug, Deserialize, Serialize, ToSchema)]
119#[schema(as = TransactionInitiators)]
120pub struct ExternalTransactionInitiators {
121    /// ID assigned to the transaction (None if no transaction is in progress).
122    #[schema(value_type = Option<i64>)]
123    pub transaction_id: Option<TransactionId>,
124    /// Transaction phase initiated by the API.
125    #[schema(value_type = Option<TransactionPhase>)]
126    pub initiated_by_api: Option<ExternalTransactionPhase>,
127    /// Transaction phases initiated by connectors, indexed by endpoint name.
128    #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
129    pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
130}
131
132/// A watermark that has been fully processed by the pipeline.
133#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
134pub struct CompletedWatermark {
135    /// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs).
136    #[schema(value_type = Object)]
137    pub metadata: JsonValue,
138    /// Timestamp when the data was ingested from the wire.
139    #[serde(serialize_with = "serialize_timestamp_micros")]
140    pub ingested_at: DateTime<Utc>,
141    /// Timestamp when the data was processed by the circuit.
142    #[serde(serialize_with = "serialize_timestamp_micros")]
143    pub processed_at: DateTime<Utc>,
144    /// Timestamp when all outputs produced from this input have been pushed to all output endpoints.
145    #[serde(serialize_with = "serialize_timestamp_micros")]
146    pub completed_at: DateTime<Utc>,
147}
148
149#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
150pub enum ConnectorHealthStatus {
151    #[default]
152    Healthy,
153    Unhealthy,
154}
155
156#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
157pub struct ConnectorHealth {
158    pub status: ConnectorHealthStatus,
159    pub description: Option<String>,
160}
161
162impl ConnectorHealth {
163    pub fn healthy() -> Self {
164        Self {
165            status: ConnectorHealthStatus::Healthy,
166            description: None,
167        }
168    }
169    pub fn unhealthy(description: &str) -> Self {
170        Self {
171            status: ConnectorHealthStatus::Unhealthy,
172            description: Some(description.to_string()),
173        }
174    }
175}
176
177#[derive(Debug, Default, Deserialize, Serialize, ToSchema, Clone)]
178pub struct ConnectorError {
179    /// Timestamp when the error occurred, serialized as RFC3339 with microseconds.
180    #[serde(serialize_with = "serialize_timestamp_micros")]
181    pub timestamp: DateTime<Utc>,
182
183    /// Sequence number of the error.
184    ///
185    /// The client can use this field to detect gaps in the error list reported
186    /// by the pipeline. When the connector reports a large number of errors, the
187    /// pipeline will only preserve and report the most recent errors of each kind.
188    pub index: u64,
189
190    /// Optional tag for the error.
191    ///
192    /// The tag is used to group errors by their type.
193    pub tag: Option<String>,
194
195    /// Error message.
196    pub message: String,
197}
198
199/// Performance metrics for an input endpoint.
200#[derive(Debug, Default, Deserialize, Serialize, ToSchema)]
201#[schema(as = InputEndpointMetrics)]
202pub struct ExternalInputEndpointMetrics {
203    /// Total bytes pushed to the endpoint since it was created.
204    pub total_bytes: u64,
205    /// Total records pushed to the endpoint since it was created.
206    pub total_records: u64,
207    /// Number of records currently buffered by the endpoint (not yet consumed by the circuit).
208    pub buffered_records: u64,
209    /// Number of bytes currently buffered by the endpoint (not yet consumed by the circuit).
210    pub buffered_bytes: u64,
211    /// Number of transport errors.
212    pub num_transport_errors: u64,
213    /// Number of parse errors.
214    pub num_parse_errors: u64,
215    /// True if end-of-input has been signaled.
216    pub end_of_input: bool,
217}
218
219/// Input endpoint status information.
220#[derive(Debug, Serialize, Deserialize, ToSchema)]
221#[schema(as = InputEndpointStatus)]
222pub struct ExternalInputEndpointStatus {
223    /// Endpoint name.
224    pub endpoint_name: String,
225    /// Endpoint configuration.
226    pub config: ShortEndpointConfig,
227    /// Performance metrics.
228    #[schema(value_type = InputEndpointMetrics)]
229    pub metrics: ExternalInputEndpointMetrics,
230    /// The first fatal error that occurred at the endpoint.
231    pub fatal_error: Option<String>,
232    /// Recent parse errors on this endpoint.
233    #[serde(default, skip_serializing_if = "Option::is_none")]
234    pub parse_errors: Option<Vec<ConnectorError>>,
235    /// Recent transport errors on this endpoint.
236    #[serde(default, skip_serializing_if = "Option::is_none")]
237    pub transport_errors: Option<Vec<ConnectorError>>,
238    /// Health status of the connector.
239    #[serde(default)]
240    pub health: Option<ConnectorHealth>,
241    /// Endpoint has been paused by the user.
242    pub paused: bool,
243    /// Endpoint is currently a barrier to checkpointing and suspend.
244    pub barrier: bool,
245    /// The latest completed watermark.
246    #[schema(value_type = Option<CompletedWatermark>)]
247    pub completed_frontier: Option<CompletedWatermark>,
248}
249
250/// Performance metrics for an output endpoint.
251#[derive(Debug, Default, Deserialize, Serialize, ToSchema, PartialEq, Eq, PartialOrd, Ord)]
252#[schema(as = OutputEndpointMetrics)]
253pub struct ExternalOutputEndpointMetrics {
254    /// Records sent on the underlying transport.
255    pub transmitted_records: u64,
256    /// Bytes sent on the underlying transport.
257    pub transmitted_bytes: u64,
258    /// Number of queued records.
259    pub queued_records: u64,
260    /// Number of queued batches.
261    pub queued_batches: u64,
262    /// Number of records pushed to the output buffer.
263    pub buffered_records: u64,
264    /// Number of batches in the buffer.
265    pub buffered_batches: u64,
266    /// Number of encoding errors.
267    pub num_encode_errors: u64,
268    /// Number of transport errors.
269    pub num_transport_errors: u64,
270    /// The number of input records processed by the circuit.
271    ///
272    /// This metric tracks the end-to-end progress of the pipeline: the output
273    /// of this endpoint is equal to the output of the circuit after
274    /// processing `total_processed_input_records` records.
275    ///
276    /// In a multihost pipeline, this count reflects only the input records
277    /// processed on the same host as the output endpoint, which is not usually
278    /// meaningful.
279    pub total_processed_input_records: u64,
280    /// The number of steps whose input records have been processed by the
281    /// endpoint.
282    ///
283    /// This is meaningful in a multihost pipeline because steps are
284    /// synchronized across all of the hosts.
285    ///
286    /// # Interpretation
287    ///
288    /// This is a count, not a step number.  If `total_processed_steps` is 0, no
289    /// steps have been processed to completion.  If `total_processed_steps >
290    /// 0`, then the last step whose input records have been processed to
291    /// completion is `total_processed_steps - 1`. A record that was ingested in
292    /// step `n` is fully processed when `total_processed_steps > n`.
293    #[schema(value_type = u64)]
294    pub total_processed_steps: Step,
295    /// Extra memory in use beyond that used for queuing records.
296    pub memory: u64,
297}
298
299/// Output endpoint status information.
300#[derive(Debug, Deserialize, Serialize, ToSchema)]
301#[schema(as = OutputEndpointStatus)]
302pub struct ExternalOutputEndpointStatus {
303    /// Endpoint name.
304    pub endpoint_name: String,
305    /// Endpoint configuration.
306    pub config: ShortEndpointConfig,
307    /// Performance metrics.
308    #[schema(value_type = OutputEndpointMetrics)]
309    pub metrics: ExternalOutputEndpointMetrics,
310    /// The first fatal error that occurred at the endpoint.
311    pub fatal_error: Option<String>,
312    /// Recent encoding errors on this endpoint.
313    #[serde(default, skip_serializing_if = "Option::is_none")]
314    pub encode_errors: Option<Vec<ConnectorError>>,
315    /// Recent transport errors on this endpoint.
316    #[serde(default, skip_serializing_if = "Option::is_none")]
317    pub transport_errors: Option<Vec<ConnectorError>>,
318    /// Health status of the connector.
319    #[serde(default)]
320    pub health: Option<ConnectorHealth>,
321}
322
323/// Global controller metrics.
324#[derive(Debug, Default, Serialize, Deserialize, ToSchema)]
325#[schema(as = GlobalControllerMetrics)]
326pub struct ExternalGlobalControllerMetrics {
327    /// State of the pipeline: running, paused, or terminating.
328    pub state: PipelineState,
329    /// The pipeline has been resumed from a checkpoint and is currently bootstrapping new and modified views.
330    pub bootstrap_in_progress: bool,
331    /// Status of the current transaction.
332    pub transaction_status: TransactionStatus,
333    /// ID of the current transaction or 0 if no transaction is in progress.
334    #[schema(value_type = i64)]
335    pub transaction_id: TransactionId,
336    /// Elapsed time in milliseconds, according to `transaction_status`:
337    ///
338    /// - [TransactionStatus::TransactionInProgress]: Time that this transaction
339    ///   has been in progress.
340    ///
341    /// - [TransactionStatus::CommitInProgress]: Time that this transaction has
342    ///   been committing.
343    pub transaction_msecs: Option<u64>,
344    /// Number of records in this transaction, according to
345    /// `transaction_status`:
346    ///
347    /// - [TransactionStatus::TransactionInProgress]: Number of records added so
348    ///   far.  More records might be added.
349    ///
350    /// - [TransactionStatus::CommitInProgress]: Final number of records.
351    pub transaction_records: Option<u64>,
352    /// Progress of the current transaction commit, if one is in progress.
353    pub commit_progress: Option<CommitProgressSummary>,
354    /// Entities that initiated the current transaction.
355    #[schema(value_type = TransactionInitiators)]
356    pub transaction_initiators: ExternalTransactionInitiators,
357    /// Resident set size of the pipeline process, in bytes.
358    pub rss_bytes: u64,
359    /// Memory pressure.
360    pub memory_pressure: MemoryPressure,
361    /// Memory pressure epoch.
362    pub memory_pressure_epoch: u64,
363    /// CPU time used by the pipeline across all threads, in milliseconds.
364    pub cpu_msecs: u64,
365    /// Time since the pipeline process started, including time that the
366    /// pipeline was running or paused.
367    ///
368    /// This is the elapsed time since `start_time`.
369    pub uptime_msecs: u64,
370    /// Time at which the pipeline process started, in seconds since the epoch.
371    #[serde(with = "chrono::serde::ts_seconds")]
372    #[schema(value_type = u64)]
373    pub start_time: DateTime<Utc>,
374    /// Uniquely identifies the pipeline process that started at start_time.
375    pub incarnation_uuid: Uuid,
376    /// Time at which the pipeline process from which we resumed started, in seconds since the epoch.
377    #[serde(with = "chrono::serde::ts_seconds")]
378    #[schema(value_type = u64)]
379    pub initial_start_time: DateTime<Utc>,
380    /// Current storage usage in bytes.
381    pub storage_bytes: u64,
382    /// Storage usage integrated over time, in megabytes * seconds.
383    pub storage_mb_secs: u64,
384    /// Time elapsed while the pipeline is executing a step, multiplied by the number of threads, in milliseconds.
385    pub runtime_elapsed_msecs: u64,
386    /// Total number of records currently buffered by all endpoints.
387    pub buffered_input_records: u64,
388    /// Total number of bytes currently buffered by all endpoints.
389    pub buffered_input_bytes: u64,
390    /// Total number of records received from all endpoints.
391    pub total_input_records: u64,
392    /// Total number of bytes received from all endpoints.
393    pub total_input_bytes: u64,
394    /// Total number of input records processed by the DBSP engine.
395    pub total_processed_records: u64,
396    /// Total bytes of input records processed by the DBSP engine.
397    pub total_processed_bytes: u64,
398    /// Total number of input records processed to completion.
399    pub total_completed_records: u64,
400    /// If the pipeline is stalled because one or more output connectors' output
401    /// buffers are full, this is the number of milliseconds that the current
402    /// stall has lasted.
403    ///
404    /// If this is nonzero, then the output connectors causing the stall can be
405    /// identified by noticing `ExternalOutputEndpointMetrics::queued_records`
406    /// is greater than or equal to `ConnectorConfig::max_queued_records`.
407    ///
408    /// In the ordinary case, the pipeline is not stalled, and this value is 0.
409    pub output_stall_msecs: u64,
410    /// Number of steps that have been initiated.
411    ///
412    /// # Interpretation
413    ///
414    /// This is a count, not a step number.  If `total_initiated_steps` is 0, no
415    /// steps have been initiated.  If `total_initiated_steps > 0`, then step
416    /// `total_initiated_steps - 1` has been started and all steps previous to
417    /// that have been completely processed by the circuit.
418    #[schema(value_type = u64)]
419    pub total_initiated_steps: Step,
420    /// Number of steps whose input records have been processed to completion.
421    ///
422    /// A record is processed to completion if it has been processed by the DBSP engine and
423    /// all outputs derived from it have been processed by all output connectors.
424    ///
425    /// # Interpretation
426    ///
427    /// This is a count, not a step number.  If `total_completed_steps` is 0, no
428    /// steps have been processed to completion.  If `total_completed_steps >
429    /// 0`, then the last step whose input records have been processed to
430    /// completion is `total_completed_steps - 1`. A record that was ingested
431    /// when `total_initiated_steps` was `n` is fully processed when
432    /// `total_completed_steps >= n`.
433    #[schema(value_type = u64)]
434    pub total_completed_steps: Step,
435    /// True if the pipeline has processed all input data to completion.
436    pub pipeline_complete: bool,
437}
438
439/// Complete pipeline statistics returned by the `/stats` endpoint.
440///
441/// This schema definition matches the serialized JSON structure from
442/// `adapters::controller::ControllerStatus`. The actual implementation with
443/// atomics and mutexes lives in the adapters crate, which uses ExternalControllerStatus to
444/// register this OpenAPI schema, making it available to pipeline-manager
445/// without requiring a direct dependency on the adapters crate.
446#[derive(Debug, Deserialize, Serialize, ToSchema, Default)]
447#[schema(as = ControllerStatus)]
448pub struct ExternalControllerStatus {
449    /// Global controller metrics.
450    #[schema(value_type = GlobalControllerMetrics)]
451    pub global_metrics: ExternalGlobalControllerMetrics,
452    /// Reason why the pipeline cannot be suspended or checkpointed (if any).
453    pub suspend_error: Option<SuspendError>,
454    /// Input endpoint configs and metrics.
455    #[schema(value_type = Vec<InputEndpointStatus>)]
456    pub inputs: Vec<ExternalInputEndpointStatus>,
457    /// Output endpoint configs and metrics.
458    #[schema(value_type = Vec<OutputEndpointStatus>)]
459    pub outputs: Vec<ExternalOutputEndpointStatus>,
460}
461
462fn serialize_timestamp_micros<S>(
463    timestamp: &DateTime<Utc>,
464    serializer: S,
465) -> Result<S::Ok, S::Error>
466where
467    S: serde::Serializer,
468{
469    serializer.serialize_str(&timestamp.to_rfc3339_opts(SecondsFormat::Micros, true))
470}
471
#[cfg(test)]
mod tests {
    use super::{CompletedWatermark, ConnectorError};
    use chrono::{DateTime, Utc};
    use serde_json::{json, Value};

    /// Parses an RFC 3339 string into a UTC timestamp (test inputs are known valid).
    fn utc(s: &str) -> DateTime<Utc> {
        DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc)
    }

    #[test]
    fn connector_error_timestamp_serializes_with_microsecond_precision() {
        let error = ConnectorError {
            timestamp: utc("2026-03-08T05:26:42.442438448Z"),
            index: 1,
            tag: None,
            message: "boom".to_string(),
        };

        // Compare the parsed field exactly rather than searching the raw string.
        let json = serde_json::to_value(&error).unwrap();
        assert_eq!(json["timestamp"], "2026-03-08T05:26:42.442438Z");
    }

    #[test]
    fn whole_second_timestamp_keeps_six_fractional_digits() {
        let error = ConnectorError {
            timestamp: utc("2026-03-08T05:26:42Z"),
            index: 0,
            tag: None,
            message: String::new(),
        };

        let json = serde_json::to_value(&error).unwrap();
        assert_eq!(json["timestamp"], "2026-03-08T05:26:42.000000Z");
    }

    #[test]
    fn connector_error_round_trips_at_microsecond_precision() {
        let error = ConnectorError {
            timestamp: utc("2026-03-08T05:26:42.442438448Z"),
            index: 7,
            tag: Some("parse".to_string()),
            message: "boom".to_string(),
        };

        let json = serde_json::to_string(&error).unwrap();
        let decoded: ConnectorError = serde_json::from_str(&json).unwrap();

        // Sub-microsecond digits are lost on serialization, so the decoded
        // timestamp is the truncated one.
        assert_eq!(decoded.timestamp, utc("2026-03-08T05:26:42.442438Z"));
        assert_eq!(decoded.index, 7);
        assert_eq!(decoded.tag.as_deref(), Some("parse"));
        assert_eq!(decoded.message, "boom");
    }

    #[test]
    fn completed_watermark_timestamps_serialize_with_microsecond_precision() {
        let watermark = CompletedWatermark {
            metadata: json!({"partition": 0, "offset": 42}),
            ingested_at: utc("2026-03-08T05:26:42.000000999Z"),
            processed_at: utc("2026-03-08T05:26:43.123456789Z"),
            completed_at: utc("2026-03-08T05:26:44.5Z"),
        };

        let value: Value = serde_json::to_value(&watermark).unwrap();
        assert_eq!(value["metadata"], json!({"partition": 0, "offset": 42}));
        assert_eq!(value["ingested_at"], "2026-03-08T05:26:42.000000Z");
        assert_eq!(value["processed_at"], "2026-03-08T05:26:43.123456Z");
        assert_eq!(value["completed_at"], "2026-03-08T05:26:44.500000Z");
    }
}