feldera_types/format/
avro.rs

1use serde::{Deserialize, Serialize};
2use std::{collections::HashMap, fmt::Display};
3use utoipa::ToSchema;
4
5/// Supported Avro data change event formats.
6#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, ToSchema)]
7pub enum AvroUpdateFormat {
8    /// Raw encoding.
9    ///
10    /// Each message in the stream represents a single-record update: an insert, upsert, or delete.
11    ///
12    /// ### Input Connectors
13    /// Raw Avro encoding can be used for insert and upsert operations, but not deletes.
14    /// - The message value contains the record to be inserted or updated.
15    /// - The message key and headers are ignored.
16    ///
17    /// ### Output Connectors
18    /// The raw format supports inserts, upserts, and deletes.
19    /// - The message value contains the record to be inserted or deleted.
20    /// - The operation type is specified in the `op` message header field, which can be
21    ///   `insert`, `update`, or `delete`.
22    /// - The message key can optionally store the primary key (see the `key_mode` property).
23    #[serde(rename = "raw")]
24    Raw,
25
26    /// Debezium data change event format.
27    #[serde(rename = "debezium")]
28    Debezium,
29
30    /// Confluent JDBC connector change event format.
31    #[serde(rename = "confluent_jdbc")]
32    ConfluentJdbc,
33}
34
35impl Default for AvroUpdateFormat {
36    fn default() -> Self {
37        Self::Raw
38    }
39}
40
41impl Display for AvroUpdateFormat {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        match self {
44            Self::Raw => f.write_str("raw"),
45            Self::Debezium => f.write_str("debezium"),
46            Self::ConfluentJdbc => f.write_str("confluent_jdbc"),
47        }
48    }
49}
50
51/// Schema registry configuration.
52#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
53pub struct AvroSchemaRegistryConfig {
54    /// List of schema registry URLs.
55    ///
56    /// * **Input connector**: When non-empty, the connector retrieves Avro
57    ///   message schemas from the registry.
58    ///
59    /// * **Output connector**: When non-empty, the connector will
60    ///   post the schema to the registry and embed the schema id returned
61    ///   by the registry in Avro messages.  Otherwise, schema id 0 is used.
62    #[serde(default)]
63    pub registry_urls: Vec<String>,
64
65    /// Custom headers that will be added to every call to the schema registry.
66    ///
67    /// This property is only applicable to output connectors.
68    ///
69    /// Requires `registry_urls` to be set.
70    #[serde(default)]
71    pub registry_headers: HashMap<String, String>,
72
73    /// Proxy that will be used to access the schema registry.
74    ///
75    /// Requires `registry_urls` to be set.
76    pub registry_proxy: Option<String>,
77
78    /// Timeout in seconds used to connect to the registry.
79    ///
80    /// Requires `registry_urls` to be set.
81    pub registry_timeout_secs: Option<u64>,
82
83    /// Username used to authenticate with the registry.
84    ///
85    /// Requires `registry_urls` to be set. This option is mutually exclusive with
86    /// token-based authentication (see `registry_authorization_token`).
87    pub registry_username: Option<String>,
88
89    /// Password used to authenticate with the registry.
90    ///
91    /// Requires `registry_urls` to be set.
92    pub registry_password: Option<String>,
93
94    /// Token used to authenticate with the registry.
95    ///
96    /// Requires `registry_urls` to be set. This option is mutually exclusive with
97    /// password-based authentication (see `registry_username` and `registry_password`).
98    pub registry_authorization_token: Option<String>,
99}
100
101/// Avro output format configuration.
102#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
103#[serde(deny_unknown_fields)]
104pub struct AvroParserConfig {
105    /// Format used to encode data change events in this stream.
106    ///
107    /// The default value is 'raw'.
108    #[serde(default)]
109    pub update_format: AvroUpdateFormat,
110
111    /// Avro schema used to encode all records in this stream, specified as a JSON-encoded string.
112    ///
113    /// When this property is set, the connector uses the provided schema instead of
114    /// retrieving the schema from the schema registry. This setting is mutually exclusive
115    /// with `registry_urls`.
116    pub schema: Option<String>,
117
118    /// `true` if serialized messages only contain raw data without the
119    /// header carrying schema ID.
120    ///
121    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
122    ///
123    /// The default value is `false`.
124    #[serde(default)]
125    pub skip_schema_id: bool,
126
127    /// Schema registry configuration.
128    #[serde(flatten)]
129    pub registry_config: AvroSchemaRegistryConfig,
130}
131
132/// Subject name strategies used in registering key and value schemas
133/// with the schema registry.
134#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)]
135pub enum SubjectNameStrategy {
136    /// The subject name is derived directly from the Kafka topic name.
137    ///
138    /// For update formats with both key and value components, use subject names
139    /// `{topic_name}-key` and `{topic_name}-value` for key and value schemas respectively.
140    /// For update formats without a key (e.g., `raw`), publish value schema
141    /// under the subject name `{topic_name}`.
142    ///
143    /// Only applicable when using Kafka as a transport.
144    #[serde(rename = "topic_name")]
145    TopicName,
146
147    /// The name of the SQL relation that the schema is derived from is used as the subject name:
148    /// * the SQL view name for the message value schema.
149    /// * the SQL index name for the message key schema.
150    #[serde(rename = "record_name")]
151    RecordName,
152
153    /// Combines both the topic name and the record name to form the subject.
154    ///
155    /// For update formats with both key and value components, use subject names
156    /// `{topic_name}-{record_name}-key` and `{topic_name}-{record_name}-value` for
157    /// key and value schemas respectively.
158    /// For update formats without a key (e.g., `raw`), publish value schema
159    /// under the subject name `{topic_name}-{record_name}`.
160    ///
161    /// `{record_name}` is the name of the SQL view or index that this connector
162    /// is attached to.
163    ///
164    /// Only applicable when using Kafka as a transport.
165    #[serde(rename = "topic_record_name")]
166    TopicRecordName,
167}
168
169/// Determines how the message key is generated when the Avro encoder is configured
170/// in the `raw` mode.
171#[derive(Clone, Serialize, Deserialize, Debug, ToSchema, PartialEq, Eq)]
172pub enum AvroEncoderKeyMode {
173    /// Produce messages without a key.
174    #[serde(rename = "none")]
175    None,
176
177    /// Uses the unique key columns of the view as the message key.
178    ///
179    /// This setting is supported when the output connector is configured with the `index` property.
180    /// It utilizes the values of the index columns specified in the associated `CREATE INDEX` statement
181    /// as the Avro message key.
182    ///
183    /// A separate Avro schema will be created and registered in the schema registry
184    /// for the key component of the message.
185    #[serde(rename = "key_fields")]
186    KeyFields,
187}
188
189/// Avro output format configuration.
190#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
191#[serde(deny_unknown_fields)]
192pub struct AvroEncoderConfig {
193    /// Format used to encode data change events in this stream.
194    ///
195    /// The default value is `raw`.
196    #[serde(default)]
197    pub update_format: AvroUpdateFormat,
198
199    /// Determines how the message key is generated when the Avro encoder is configured
200    /// in the `raw` mode.
201    ///
202    /// The default is `key_fields` when the `index` property of the connector is configured and `none` otherwise.
203    pub key_mode: Option<AvroEncoderKeyMode>,
204
205    /// Avro schema used to encode output records.
206    ///
207    /// When specified, the encoder will use this schema; otherwise it will automatically
208    /// generate an Avro schema based on the SQL view definition.
209    ///
210    /// Specified as a string containing schema definition in JSON format.
211    /// This schema must match precisely the SQL view definition, modulo
212    /// nullability of columns.
213    pub schema: Option<String>,
214
215    /// Optional name of the field used for Change Data Capture (CDC) annotations.
216    ///
217    /// Use this setting with data sinks that expect operation type
218    /// (insert, delete, or update) encoded as a column in the Avro record, such
219    /// as the [Iceberg Sink Kafka Connector](https://docs.feldera.com/connectors/sinks/iceberg).
220    ///
221    /// When set (e.g., `"cdc_field": "op"`), the specified field will be added to each record
222    /// to indicate the type of change:
223    /// - `"I"` for insert operations
224    /// - `"U"` for upserts
225    /// - `"D"` for deletions
226    ///
227    /// If not set, CDC metadata will not be included in the records.
228    /// Only works with the `raw` update format.
229    pub cdc_field: Option<String>,
230
231    /// Avro namespace for the generated Avro schemas.
232    pub namespace: Option<String>,
233
234    /// Subject name strategy used to publish Avro schemas used by the connector
235    /// in the schema registry.
236    ///
237    /// When this property is not specified, the connector chooses subject name strategy automatically:
238    /// * `topic_name` for `confluent_jdbc` update format
239    /// * `record_name` for `raw` update format
240    pub subject_name_strategy: Option<SubjectNameStrategy>,
241
242    /// Set to `true` if serialized messages should only contain raw data
243    /// without the header carrying schema ID.
244    /// `False` by default.
245    ///
246    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
247    #[serde(default)]
248    pub skip_schema_id: bool,
249
250    /// Schema registry configuration.
251    ///
252    /// When configured, the connector will push the Avro schema, whether it is specified as part of
253    /// connector configuration or generated automatically, to the schema registry and use the schema id
254    /// assigned by the registry in the
255    #[serde(flatten)]
256    pub registry_config: AvroSchemaRegistryConfig,
257}