feldera_types/
adapter_stats.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize, Serializer};
3use serde_json::Value as JsonValue;
4use std::collections::BTreeMap;
5use utoipa::ToSchema;
6use uuid::Uuid;
7
8use crate::{
9    config::InputEndpointConfig, config::OutputEndpointConfig, suspend::SuspendError,
10    transaction::TransactionId,
11};
12
/// Pipeline state.
// Serialized by variant name in PascalCase ("Paused", "Running", "Terminated").
// Only `Serialize` is derived here; no `Deserialize` impl is derived.
// The `///` comments in this enum are emitted by `utoipa` as schema descriptions,
// so editing them changes the generated OpenAPI document.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, ToSchema)]
#[serde(rename_all = "PascalCase")]
pub enum PipelineState {
    /// All input endpoints are paused (or are in the process of being paused).
    // `#[default]` makes `PipelineState::default()` return `Paused`.
    #[default]
    Paused,
    /// Controller is running.
    Running,
    /// Controller is being terminated.
    Terminated,
}
25
26// Metrics for an input endpoint.
27///
28/// Serializes to match the subset of fields needed for error tracking.
29#[derive(Default, Serialize, Deserialize, Debug, Clone)]
30pub struct InputEndpointErrorMetrics {
31    pub endpoint_name: String,
32    #[serde(default)]
33    pub num_transport_errors: u64,
34    #[serde(default)]
35    pub num_parse_errors: u64,
36}
37
38/// Metrics for an output endpoint.
39///
40/// Serializes to match the subset of fields needed for error tracking.
41#[derive(Default, Serialize, Deserialize, Debug, Clone)]
42pub struct OutputEndpointErrorMetrics {
43    pub endpoint_name: String,
44    #[serde(default)]
45    pub num_encode_errors: u64,
46    #[serde(default)]
47    pub num_transport_errors: u64,
48}
49
50/// Endpoint statistics containing metrics.
51///
52/// Wraps metrics in a structure that matches the full stats API shape.
53#[derive(Serialize, Deserialize, Debug, Clone)]
54pub struct EndpointErrorStats<T> {
55    #[serde(default)]
56    pub metrics: T,
57}
58
59/// Helper struct for serializing only the stream name from endpoint configs.
60#[derive(Serialize)]
61struct StreamOnly<'a> {
62    stream: &'a str,
63}
64
/// Schema definition for endpoint config that only includes the stream field.
// This type appears to exist only so `utoipa` can describe the reduced
// `stream`-only object written by `serialize_input_endpoint_config` and
// `serialize_output_endpoint_config`. The status structs reference it through
// `#[schema(value_type = ShortEndpointConfig)]`.
// The `///` text here becomes the schema description, so it is left unchanged.
#[derive(ToSchema)]
#[schema(as = ShortEndpointConfig)]
pub struct ShortEndpointConfig {
    /// The name of the stream.
    // Only the schema of this field matters and Rust code never reads it,
    // so the `dead_code` lint is silenced.
    #[allow(dead_code)]
    stream: String,
}
73
74/// Serialize only `config.stream`, omitting other fields.
75pub fn serialize_input_endpoint_config<S>(
76    config: &InputEndpointConfig,
77    serializer: S,
78) -> Result<S::Ok, S::Error>
79where
80    S: Serializer,
81{
82    StreamOnly {
83        stream: &config.stream,
84    }
85    .serialize(serializer)
86}
87
88/// Serialize only `config.stream`, omitting other fields.
89pub fn serialize_output_endpoint_config<S>(
90    config: &OutputEndpointConfig,
91    serializer: S,
92) -> Result<S::Ok, S::Error>
93where
94    S: Serializer,
95{
96    StreamOnly {
97        stream: &config.stream,
98    }
99    .serialize(serializer)
100}
101
102/// Pipeline error statistics response from the runtime.
103///
104/// Lightweight response containing only error counts from all endpoints.
105#[derive(Serialize, Deserialize, Debug, Clone)]
106pub struct PipelineStatsErrorsResponse {
107    #[serde(default)]
108    pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
109    #[serde(default)]
110    pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
111}
112
113// OpenAPI schema definitions for controller statistics
114// These match the serialized JSON structure from the adapters crate
115
/// Transaction phase.
// Published in the OpenAPI schema as `TransactionPhase` (via `#[schema(as)]`).
// Values serialize as the PascalCase variant names "Started" / "Committed".
#[derive(Clone, Copy, PartialEq, Eq, Debug, Serialize, ToSchema)]
#[serde(rename_all = "PascalCase")]
#[schema(as = TransactionPhase)]
pub enum ExternalTransactionPhase {
    /// Transaction is in progress.
    Started,
    /// Transaction has been committed.
    Committed,
}
126
/// Connector transaction phase with debugging label.
// Published in the OpenAPI schema as `ConnectorTransactionPhase`.
#[derive(Clone, PartialEq, Eq, Debug, Serialize, ToSchema)]
#[schema(as = ConnectorTransactionPhase)]
pub struct ExternalConnectorTransactionPhase {
    /// Current phase of the transaction.
    // `value_type` points the schema at the renamed `TransactionPhase`
    // component instead of the Rust type name.
    #[schema(value_type = TransactionPhase)]
    pub phase: ExternalTransactionPhase,
    /// Optional label for debugging.
    // No `skip_serializing_if`, so `None` is emitted as `null`.
    pub label: Option<String>,
}
137
/// Information about entities that initiated the current transaction.
// Published in the OpenAPI schema as `TransactionInitiators`.
#[derive(Clone, Default, Debug, Serialize, ToSchema)]
#[schema(as = TransactionInitiators)]
pub struct ExternalTransactionInitiators {
    /// ID assigned to the transaction (None if no transaction is in progress).
    // The schema advertises this as an optional `i64` via `value_type`,
    // independent of how `TransactionId` itself is defined.
    #[schema(value_type = Option<i64>)]
    pub transaction_id: Option<TransactionId>,
    /// Transaction phase initiated by the API.
    #[schema(value_type = Option<TransactionPhase>)]
    pub initiated_by_api: Option<ExternalTransactionPhase>,
    /// Transaction phases initiated by connectors, indexed by endpoint name.
    // `BTreeMap` serializes entries in sorted key order, so the output
    // order is deterministic.
    #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
    pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
}
152
/// A watermark that has been fully processed by the pipeline.
// Published in the OpenAPI schema as `CompletedWatermark`.
#[derive(Clone, Debug, Serialize, ToSchema)]
#[schema(as = CompletedWatermark)]
pub struct ExternalCompletedWatermark {
    /// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs).
    // The Rust type accepts any JSON value, but the schema declares `Object`.
    // NOTE(review): presumably producers always supply a JSON object here — confirm.
    #[schema(value_type = Object)]
    pub metadata: JsonValue,
    /// Timestamp when the data was ingested from the wire.
    // The three timestamps are pre-formatted strings. This type does not
    // enforce their format; it is chosen by whoever builds this struct.
    pub ingested_at: String,
    /// Timestamp when the data was processed by the circuit.
    pub processed_at: String,
    /// Timestamp when all outputs produced from this input have been pushed to all output endpoints.
    pub completed_at: String,
}
167
/// Performance metrics for an input endpoint.
// Published in the OpenAPI schema as `InputEndpointMetrics`.
// `Default` yields every counter at 0 and `end_of_input == false`.
// Fields serialize in declaration order. This struct is meant to match the
// JSON produced by the adapters crate, so keep field names and order in sync.
#[derive(Default, Serialize, ToSchema)]
#[schema(as = InputEndpointMetrics)]
pub struct ExternalInputEndpointMetrics {
    /// Total bytes pushed to the endpoint since it was created.
    pub total_bytes: u64,
    /// Total records pushed to the endpoint since it was created.
    pub total_records: u64,
    /// Number of records currently buffered by the endpoint (not yet consumed by the circuit).
    pub buffered_records: u64,
    /// Number of bytes currently buffered by the endpoint (not yet consumed by the circuit).
    pub buffered_bytes: u64,
    /// Number of transport errors.
    pub num_transport_errors: u64,
    /// Number of parse errors.
    pub num_parse_errors: u64,
    /// True if end-of-input has been signaled.
    pub end_of_input: bool,
}
187
/// Input endpoint status information.
// Published in the OpenAPI schema as `InputEndpointStatus`.
#[derive(Serialize, ToSchema)]
#[schema(as = InputEndpointStatus)]
pub struct ExternalInputEndpointStatus {
    /// Endpoint name.
    pub endpoint_name: String,
    /// Endpoint configuration.
    // Only `config.stream` is serialized (see `serialize_input_endpoint_config`).
    // The schema uses `ShortEndpointConfig` so it matches that reduced shape
    // rather than the full `InputEndpointConfig`.
    #[serde(serialize_with = "serialize_input_endpoint_config")]
    #[schema(value_type = ShortEndpointConfig)]
    pub config: InputEndpointConfig,
    /// Performance metrics.
    #[schema(value_type = InputEndpointMetrics)]
    pub metrics: ExternalInputEndpointMetrics,
    /// The first fatal error that occurred at the endpoint.
    // `None` serializes as `null`.
    pub fatal_error: Option<String>,
    /// Endpoint has been paused by the user.
    pub paused: bool,
    /// Endpoint is currently a barrier to checkpointing and suspend.
    pub barrier: bool,
    /// The latest completed watermark.
    #[schema(value_type = Option<CompletedWatermark>)]
    pub completed_frontier: Option<ExternalCompletedWatermark>,
}
211
/// Performance metrics for an output endpoint.
// Published in the OpenAPI schema as `OutputEndpointMetrics`.
// `Default` yields every counter at 0. Fields serialize in declaration order.
// This struct is meant to match the JSON produced by the adapters crate,
// so keep field names and order in sync.
#[derive(Default, Serialize, ToSchema)]
#[schema(as = OutputEndpointMetrics)]
pub struct ExternalOutputEndpointMetrics {
    /// Records sent on the underlying transport.
    pub transmitted_records: u64,
    /// Bytes sent on the underlying transport.
    pub transmitted_bytes: u64,
    /// Number of queued records.
    pub queued_records: u64,
    /// Number of queued batches.
    pub queued_batches: u64,
    /// Number of records pushed to the output buffer.
    pub buffered_records: u64,
    /// Number of batches in the buffer.
    pub buffered_batches: u64,
    /// Number of encoding errors.
    pub num_encode_errors: u64,
    /// Number of transport errors.
    pub num_transport_errors: u64,
    /// The number of input records processed by the circuit.
    pub total_processed_input_records: u64,
    /// Extra memory in use beyond that used for queuing records.
    // NOTE(review): unit is not stated here; presumably bytes — confirm.
    pub memory: u64,
}
237
/// Output endpoint status information.
// Published in the OpenAPI schema as `OutputEndpointStatus`.
#[derive(Serialize, ToSchema)]
#[schema(as = OutputEndpointStatus)]
pub struct ExternalOutputEndpointStatus {
    /// Endpoint name.
    pub endpoint_name: String,
    /// Endpoint configuration.
    // Only `config.stream` is serialized (see `serialize_output_endpoint_config`).
    // The schema uses `ShortEndpointConfig` so it matches that reduced shape.
    #[serde(serialize_with = "serialize_output_endpoint_config")]
    #[schema(value_type = ShortEndpointConfig)]
    pub config: OutputEndpointConfig,
    /// Performance metrics.
    #[schema(value_type = OutputEndpointMetrics)]
    pub metrics: ExternalOutputEndpointMetrics,
    /// The first fatal error that occurred at the endpoint.
    // `None` serializes as `null`.
    pub fatal_error: Option<String>,
}
254
/// Global controller metrics.
// Published in the OpenAPI schema as `GlobalControllerMetrics`.
// Fields serialize in declaration order. The `///` text is used as schema
// descriptions, so editing it changes the generated OpenAPI document.
#[derive(Default, Serialize, ToSchema)]
#[schema(as = GlobalControllerMetrics)]
pub struct ExternalGlobalControllerMetrics {
    /// State of the pipeline: running, paused, or terminating.
    // Serialized as the PascalCase variant name of `PipelineState`
    // ("Paused", "Running" or "Terminated").
    pub state: PipelineState,
    /// The pipeline has been resumed from a checkpoint and is currently bootstrapping new and modified views.
    pub bootstrap_in_progress: bool,
    /// Status of the current transaction.
    pub transaction_status: String,
    /// ID of the current transaction or 0 if no transaction is in progress.
    // Advertised as `i64` in the schema via `value_type`.
    #[schema(value_type = i64)]
    pub transaction_id: TransactionId,
    /// Entities that initiated the current transaction.
    #[schema(value_type = TransactionInitiators)]
    pub transaction_initiators: ExternalTransactionInitiators,
    /// Resident set size of the pipeline process, in bytes.
    pub rss_bytes: u64,
    /// CPU time used by the pipeline across all threads, in milliseconds.
    pub cpu_msecs: u64,
    /// Time since the pipeline process started, in milliseconds.
    pub uptime_msecs: u64,
    /// Time at which the pipeline process started, in seconds since the epoch.
    // `chrono::serde::ts_seconds` writes whole Unix seconds as an integer,
    // so sub-second precision is not serialized. The schema advertises `u64`.
    #[serde(with = "chrono::serde::ts_seconds")]
    #[schema(value_type = u64)]
    pub start_time: DateTime<Utc>,
    /// Uniquely identifies the pipeline process that started at start_time.
    pub incarnation_uuid: Uuid,
    /// Time at which the pipeline process from which we resumed started, in seconds since the epoch.
    // Same integer-seconds encoding as `start_time`.
    #[serde(with = "chrono::serde::ts_seconds")]
    #[schema(value_type = u64)]
    pub initial_start_time: DateTime<Utc>,
    /// Current storage usage in bytes.
    pub storage_bytes: u64,
    /// Storage usage integrated over time, in megabytes * seconds.
    pub storage_mb_secs: u64,
    /// Time elapsed while the pipeline is executing a step, multiplied by the number of threads, in milliseconds.
    pub runtime_elapsed_msecs: u64,
    /// Total number of records currently buffered by all endpoints.
    pub buffered_input_records: u64,
    /// Total number of bytes currently buffered by all endpoints.
    pub buffered_input_bytes: u64,
    /// Total number of records received from all endpoints.
    pub total_input_records: u64,
    /// Total number of bytes received from all endpoints.
    pub total_input_bytes: u64,
    /// Total number of input records processed by the DBSP engine.
    pub total_processed_records: u64,
    /// Total bytes of input records processed by the DBSP engine.
    pub total_processed_bytes: u64,
    /// Total number of input records processed to completion.
    pub total_completed_records: u64,
    /// True if the pipeline has processed all input data to completion.
    pub pipeline_complete: bool,
}
310
/// Complete pipeline statistics returned by the `/stats` endpoint.
///
/// This schema definition matches the serialized JSON structure from
/// `adapters::controller::ControllerStatus`. The actual implementation with
/// atomics and mutexes lives in the adapters crate, which uses ExternalControllerStatus to
/// register this OpenAPI schema, making it available to pipeline-manager
/// without requiring a direct dependency on the adapters crate.
// Published in the OpenAPI schema as `ControllerStatus`. The `value_type`
// attributes below point each field at the renamed schema components.
#[derive(Serialize, ToSchema)]
#[schema(as = ControllerStatus)]
pub struct ExternalControllerStatus {
    /// Global controller metrics.
    #[schema(value_type = GlobalControllerMetrics)]
    pub global_metrics: ExternalGlobalControllerMetrics,
    /// Reason why the pipeline cannot be suspended or checkpointed (if any).
    // `None` serializes as `null`.
    pub suspend_error: Option<SuspendError>,
    /// Input endpoint configs and metrics.
    // Entries are serialized in `Vec` order.
    #[schema(value_type = Vec<InputEndpointStatus>)]
    pub inputs: Vec<ExternalInputEndpointStatus>,
    /// Output endpoint configs and metrics.
    #[schema(value_type = Vec<OutputEndpointStatus>)]
    pub outputs: Vec<ExternalOutputEndpointStatus>,
}