Skip to main content

feldera_types/
adapter_stats.rs

1use bytemuck::NoUninit;
2use chrono::{DateTime, Utc};
3use serde::{Deserialize, Serialize};
4use serde_json::Value as JsonValue;
5use std::collections::BTreeMap;
6use utoipa::ToSchema;
7use uuid::Uuid;
8
9use crate::{
10    coordination::Step,
11    suspend::SuspendError,
12    transaction::{CommitProgressSummary, TransactionId},
13};
14
15/// Pipeline state.
16#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
17#[serde(rename_all = "PascalCase")]
18pub enum PipelineState {
19    /// All input endpoints are paused (or are in the process of being paused).
20    #[default]
21    Paused,
22    /// Controller is running.
23    Running,
24    /// Controller is being terminated.
25    Terminated,
26}
27
28// Metrics for an input endpoint.
29///
30/// Serializes to match the subset of fields needed for error tracking.
31#[derive(Default, Serialize, Deserialize, Debug, Clone)]
32pub struct InputEndpointErrorMetrics {
33    pub endpoint_name: String,
34    #[serde(default)]
35    pub num_transport_errors: u64,
36    #[serde(default)]
37    pub num_parse_errors: u64,
38}
39
40/// Metrics for an output endpoint.
41///
42/// Serializes to match the subset of fields needed for error tracking.
43#[derive(Default, Serialize, Deserialize, Debug, Clone)]
44pub struct OutputEndpointErrorMetrics {
45    pub endpoint_name: String,
46    #[serde(default)]
47    pub num_encode_errors: u64,
48    #[serde(default)]
49    pub num_transport_errors: u64,
50}
51
52/// Endpoint statistics containing metrics.
53///
54/// Wraps metrics in a structure that matches the full stats API shape.
55#[derive(Serialize, Deserialize, Debug, Clone)]
56pub struct EndpointErrorStats<T> {
57    #[serde(default)]
58    pub metrics: T,
59}
60
61/// Schema definition for endpoint config that only includes the stream field.
62#[derive(Debug, Deserialize, Serialize, ToSchema)]
63pub struct ShortEndpointConfig {
64    /// The name of the stream.
65    pub stream: String,
66}
67
68/// Pipeline error statistics response from the runtime.
69///
70/// Lightweight response containing only error counts from all endpoints.
71#[derive(Serialize, Deserialize, Debug, Clone)]
72pub struct PipelineStatsErrorsResponse {
73    #[serde(default)]
74    pub inputs: Vec<EndpointErrorStats<InputEndpointErrorMetrics>>,
75    #[serde(default)]
76    pub outputs: Vec<EndpointErrorStats<OutputEndpointErrorMetrics>>,
77}
78
79// OpenAPI schema definitions for controller statistics
80// These match the serialized JSON structure from the adapters crate
81
82/// Transaction status summarized as a single value.
83#[derive(
84    Debug, Default, Copy, PartialEq, Eq, Clone, NoUninit, Serialize, Deserialize, ToSchema,
85)]
86#[repr(u8)]
87pub enum TransactionStatus {
88    #[default]
89    NoTransaction,
90    TransactionInProgress,
91    CommitInProgress,
92}
93
94/// Transaction phase.
95#[derive(Clone, Copy, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
96#[serde(rename_all = "PascalCase")]
97#[schema(as = TransactionPhase)]
98pub enum ExternalTransactionPhase {
99    /// Transaction is in progress.
100    Started,
101    /// Transaction has been committed.
102    Committed,
103}
104
105/// Connector transaction phase with debugging label.
106#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize, ToSchema)]
107#[schema(as = ConnectorTransactionPhase)]
108pub struct ExternalConnectorTransactionPhase {
109    /// Current phase of the transaction.
110    #[schema(value_type = TransactionPhase)]
111    pub phase: ExternalTransactionPhase,
112    /// Optional label for debugging.
113    pub label: Option<String>,
114}
115
116/// Information about entities that initiated the current transaction.
117#[derive(Clone, Default, Debug, Deserialize, Serialize, ToSchema)]
118#[schema(as = TransactionInitiators)]
119pub struct ExternalTransactionInitiators {
120    /// ID assigned to the transaction (None if no transaction is in progress).
121    #[schema(value_type = Option<i64>)]
122    pub transaction_id: Option<TransactionId>,
123    /// Transaction phase initiated by the API.
124    #[schema(value_type = Option<TransactionPhase>)]
125    pub initiated_by_api: Option<ExternalTransactionPhase>,
126    /// Transaction phases initiated by connectors, indexed by endpoint name.
127    #[schema(value_type = BTreeMap<String, ConnectorTransactionPhase>)]
128    pub initiated_by_connectors: BTreeMap<String, ExternalConnectorTransactionPhase>,
129}
130
131/// A watermark that has been fully processed by the pipeline.
132#[derive(Clone, Debug, Deserialize, Serialize, ToSchema)]
133#[schema(as = CompletedWatermark)]
134pub struct ExternalCompletedWatermark {
135    /// Metadata that describes the position in the input stream (e.g., Kafka partition/offset pairs).
136    #[schema(value_type = Object)]
137    pub metadata: JsonValue,
138    /// Timestamp when the data was ingested from the wire.
139    pub ingested_at: String,
140    /// Timestamp when the data was processed by the circuit.
141    pub processed_at: String,
142    /// Timestamp when all outputs produced from this input have been pushed to all output endpoints.
143    pub completed_at: String,
144}
145
146/// Performance metrics for an input endpoint.
147#[derive(Debug, Default, Deserialize, Serialize, ToSchema)]
148#[schema(as = InputEndpointMetrics)]
149pub struct ExternalInputEndpointMetrics {
150    /// Total bytes pushed to the endpoint since it was created.
151    pub total_bytes: u64,
152    /// Total records pushed to the endpoint since it was created.
153    pub total_records: u64,
154    /// Number of records currently buffered by the endpoint (not yet consumed by the circuit).
155    pub buffered_records: u64,
156    /// Number of bytes currently buffered by the endpoint (not yet consumed by the circuit).
157    pub buffered_bytes: u64,
158    /// Number of transport errors.
159    pub num_transport_errors: u64,
160    /// Number of parse errors.
161    pub num_parse_errors: u64,
162    /// True if end-of-input has been signaled.
163    pub end_of_input: bool,
164}
165
166/// Input endpoint status information.
167#[derive(Debug, Serialize, Deserialize, ToSchema)]
168#[schema(as = InputEndpointStatus)]
169pub struct ExternalInputEndpointStatus {
170    /// Endpoint name.
171    pub endpoint_name: String,
172    /// Endpoint configuration.
173    pub config: ShortEndpointConfig,
174    /// Performance metrics.
175    #[schema(value_type = InputEndpointMetrics)]
176    pub metrics: ExternalInputEndpointMetrics,
177    /// The first fatal error that occurred at the endpoint.
178    pub fatal_error: Option<String>,
179    /// Endpoint has been paused by the user.
180    pub paused: bool,
181    /// Endpoint is currently a barrier to checkpointing and suspend.
182    pub barrier: bool,
183    /// The latest completed watermark.
184    #[schema(value_type = Option<CompletedWatermark>)]
185    pub completed_frontier: Option<ExternalCompletedWatermark>,
186}
187
188/// Performance metrics for an output endpoint.
189#[derive(Debug, Default, Deserialize, Serialize, ToSchema, PartialEq, Eq, PartialOrd, Ord)]
190#[schema(as = OutputEndpointMetrics)]
191pub struct ExternalOutputEndpointMetrics {
192    /// Records sent on the underlying transport.
193    pub transmitted_records: u64,
194    /// Bytes sent on the underlying transport.
195    pub transmitted_bytes: u64,
196    /// Number of queued records.
197    pub queued_records: u64,
198    /// Number of queued batches.
199    pub queued_batches: u64,
200    /// Number of records pushed to the output buffer.
201    pub buffered_records: u64,
202    /// Number of batches in the buffer.
203    pub buffered_batches: u64,
204    /// Number of encoding errors.
205    pub num_encode_errors: u64,
206    /// Number of transport errors.
207    pub num_transport_errors: u64,
208    /// The number of input records processed by the circuit.
209    pub total_processed_input_records: u64,
210    /// Extra memory in use beyond that used for queuing records.
211    pub memory: u64,
212}
213
214/// Output endpoint status information.
215#[derive(Debug, Deserialize, Serialize, ToSchema)]
216#[schema(as = OutputEndpointStatus)]
217pub struct ExternalOutputEndpointStatus {
218    /// Endpoint name.
219    pub endpoint_name: String,
220    /// Endpoint configuration.
221    pub config: ShortEndpointConfig,
222    /// Performance metrics.
223    #[schema(value_type = OutputEndpointMetrics)]
224    pub metrics: ExternalOutputEndpointMetrics,
225    /// The first fatal error that occurred at the endpoint.
226    pub fatal_error: Option<String>,
227}
228
229/// Global controller metrics.
230#[derive(Debug, Default, Serialize, Deserialize, ToSchema)]
231#[schema(as = GlobalControllerMetrics)]
232pub struct ExternalGlobalControllerMetrics {
233    /// State of the pipeline: running, paused, or terminating.
234    pub state: PipelineState,
235    /// The pipeline has been resumed from a checkpoint and is currently bootstrapping new and modified views.
236    pub bootstrap_in_progress: bool,
237    /// Status of the current transaction.
238    pub transaction_status: TransactionStatus,
239    /// ID of the current transaction or 0 if no transaction is in progress.
240    #[schema(value_type = i64)]
241    pub transaction_id: TransactionId,
242    /// Progress of the current transaction commit, if one is in progress.
243    pub commit_progress: Option<CommitProgressSummary>,
244    /// Entities that initiated the current transaction.
245    #[schema(value_type = TransactionInitiators)]
246    pub transaction_initiators: ExternalTransactionInitiators,
247    /// Resident set size of the pipeline process, in bytes.
248    pub rss_bytes: u64,
249    /// CPU time used by the pipeline across all threads, in milliseconds.
250    pub cpu_msecs: u64,
251    /// Time since the pipeline process started, including time that the
252    /// pipeline was running or paused.
253    ///
254    /// This is the elapsed time since `start_time`.
255    pub uptime_msecs: u64,
256    /// Time at which the pipeline process started, in seconds since the epoch.
257    #[serde(with = "chrono::serde::ts_seconds")]
258    #[schema(value_type = u64)]
259    pub start_time: DateTime<Utc>,
260    /// Uniquely identifies the pipeline process that started at start_time.
261    pub incarnation_uuid: Uuid,
262    /// Time at which the pipeline process from which we resumed started, in seconds since the epoch.
263    #[serde(with = "chrono::serde::ts_seconds")]
264    #[schema(value_type = u64)]
265    pub initial_start_time: DateTime<Utc>,
266    /// Current storage usage in bytes.
267    pub storage_bytes: u64,
268    /// Storage usage integrated over time, in megabytes * seconds.
269    pub storage_mb_secs: u64,
270    /// Time elapsed while the pipeline is executing a step, multiplied by the number of threads, in milliseconds.
271    pub runtime_elapsed_msecs: u64,
272    /// Total number of records currently buffered by all endpoints.
273    pub buffered_input_records: u64,
274    /// Total number of bytes currently buffered by all endpoints.
275    pub buffered_input_bytes: u64,
276    /// Total number of records received from all endpoints.
277    pub total_input_records: u64,
278    /// Total number of bytes received from all endpoints.
279    pub total_input_bytes: u64,
280    /// Total number of input records processed by the DBSP engine.
281    pub total_processed_records: u64,
282    /// Total bytes of input records processed by the DBSP engine.
283    pub total_processed_bytes: u64,
284    /// Total number of input records processed to completion.
285    pub total_completed_records: u64,
286    /// Number of steps that have been initiated.
287    ///
288    /// # Interpretation
289    ///
290    /// This is a count, not a step number.  If `total_initiated_steps` is 0, no
291    /// steps have been initiated.  If `total_initiated_steps > 0`, then step
292    /// `total_initiated_steps - 1` has been started and all steps previous to
293    /// that have been completely processed by the circuit.
294    #[schema(value_type = u64)]
295    pub total_initiated_steps: Step,
296    /// Number of steps whose input records have been processed to completion.
297    ///
298    /// A record is processed to completion if it has been processed by the DBSP engine and
299    /// all outputs derived from it have been processed by all output connectors.
300    ///
301    /// # Interpretation
302    ///
303    /// This is a count, not a step number.  If `total_completed_steps` is 0, no
304    /// steps have been processed to completion.  If `total_completed_steps >
305    /// 0`, then the last step whose input records have been processed to
306    /// completion is `total_completed_steps - 1`. A record that was ingested
307    /// when `total_initiated_steps` was `n` is fully processed when
308    /// `total_completed_steps >= n`.
309    #[schema(value_type = u64)]
310    pub total_completed_steps: Step,
311    /// True if the pipeline has processed all input data to completion.
312    pub pipeline_complete: bool,
313}
314
315/// Complete pipeline statistics returned by the `/stats` endpoint.
316///
317/// This schema definition matches the serialized JSON structure from
318/// `adapters::controller::ControllerStatus`. The actual implementation with
319/// atomics and mutexes lives in the adapters crate, which uses ExternalControllerStatus to
320/// register this OpenAPI schema, making it available to pipeline-manager
321/// without requiring a direct dependency on the adapters crate.
322#[derive(Debug, Deserialize, Serialize, ToSchema, Default)]
323#[schema(as = ControllerStatus)]
324pub struct ExternalControllerStatus {
325    /// Global controller metrics.
326    #[schema(value_type = GlobalControllerMetrics)]
327    pub global_metrics: ExternalGlobalControllerMetrics,
328    /// Reason why the pipeline cannot be suspended or checkpointed (if any).
329    pub suspend_error: Option<SuspendError>,
330    /// Input endpoint configs and metrics.
331    #[schema(value_type = Vec<InputEndpointStatus>)]
332    pub inputs: Vec<ExternalInputEndpointStatus>,
333    /// Output endpoint configs and metrics.
334    #[schema(value_type = Vec<OutputEndpointStatus>)]
335    pub outputs: Vec<ExternalOutputEndpointStatus>,
336}