// feldera_types/format/avro.rs

use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Display};
use utoipa::ToSchema;

5/// Supported Avro data change event formats.
6#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, ToSchema, Default)]
7pub enum AvroUpdateFormat {
8    /// Raw encoding.
9    ///
10    /// Each message in the stream represents a single-record update: an insert, upsert, or delete.
11    ///
12    /// ### Input Connectors
13    /// Raw Avro encoding can be used for insert and upsert operations, but not deletes.
14    /// - The message value contains the record to be inserted or updated.
15    /// - The message key and headers are ignored.
16    ///
17    /// ### Output Connectors
18    /// The raw format supports inserts, upserts, and deletes.
19    /// - The message value contains the record to be inserted or deleted.
20    /// - The operation type is specified in the `op` message header field, which can be
21    ///   `insert`, `update`, or `delete`.
22    /// - The message key can optionally store the primary key (see the `key_mode` property).
23    #[serde(rename = "raw")]
24    #[default]
25    Raw,
26
27    /// Debezium data change event format.
28    #[serde(rename = "debezium")]
29    Debezium,
30
31    /// Confluent JDBC connector change event format.
32    #[serde(rename = "confluent_jdbc")]
33    ConfluentJdbc,
34}
35
36impl Display for AvroUpdateFormat {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        match self {
39            Self::Raw => f.write_str("raw"),
40            Self::Debezium => f.write_str("debezium"),
41            Self::ConfluentJdbc => f.write_str("confluent_jdbc"),
42        }
43    }
44}
45
46/// Schema registry configuration.
47#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
48pub struct AvroSchemaRegistryConfig {
49    /// List of schema registry URLs.
50    ///
51    /// * **Input connector**: When non-empty, the connector retrieves Avro
52    ///   message schemas from the registry.
53    ///
54    /// * **Output connector**: When non-empty, the connector will
55    ///   post the schema to the registry and embed the schema id returned
56    ///   by the registry in Avro messages.  Otherwise, schema id 0 is used.
57    #[serde(default)]
58    pub registry_urls: Vec<String>,
59
60    /// Custom headers that will be added to every call to the schema registry.
61    ///
62    /// This property is only applicable to output connectors.
63    ///
64    /// Requires `registry_urls` to be set.
65    #[serde(default)]
66    pub registry_headers: HashMap<String, String>,
67
68    /// Proxy that will be used to access the schema registry.
69    ///
70    /// Requires `registry_urls` to be set.
71    pub registry_proxy: Option<String>,
72
73    /// Timeout in seconds used to connect to the registry.
74    ///
75    /// Requires `registry_urls` to be set.
76    pub registry_timeout_secs: Option<u64>,
77
78    /// Username used to authenticate with the registry.
79    ///
80    /// Requires `registry_urls` to be set. This option is mutually exclusive with
81    /// token-based authentication (see `registry_authorization_token`).
82    pub registry_username: Option<String>,
83
84    /// Password used to authenticate with the registry.
85    ///
86    /// Requires `registry_urls` to be set.
87    pub registry_password: Option<String>,
88
89    /// Token used to authenticate with the registry.
90    ///
91    /// Requires `registry_urls` to be set. This option is mutually exclusive with
92    /// password-based authentication (see `registry_username` and `registry_password`).
93    pub registry_authorization_token: Option<String>,
94}
95
96/// Avro output format configuration.
97#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
98#[serde(deny_unknown_fields)]
99pub struct AvroParserConfig {
100    /// Format used to encode data change events in this stream.
101    ///
102    /// The default value is 'raw'.
103    #[serde(default)]
104    pub update_format: AvroUpdateFormat,
105
106    /// Avro schema used to encode all records in this stream, specified as a JSON-encoded string.
107    ///
108    /// When this property is set, the connector uses the provided schema instead of
109    /// retrieving the schema from the schema registry. This setting is mutually exclusive
110    /// with `registry_urls`.
111    pub schema: Option<String>,
112
113    /// `true` if serialized messages only contain raw data without the
114    /// header carrying schema ID.
115    ///
116    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
117    ///
118    /// The default value is `false`.
119    #[serde(default)]
120    pub skip_schema_id: bool,
121
122    /// Schema registry configuration.
123    #[serde(flatten)]
124    pub registry_config: AvroSchemaRegistryConfig,
125}
126
127/// Subject name strategies used in registering key and value schemas
128/// with the schema registry.
129#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)]
130pub enum SubjectNameStrategy {
131    /// The subject name is derived directly from the Kafka topic name.
132    ///
133    /// For update formats with both key and value components, use subject names
134    /// `{topic_name}-key` and `{topic_name}-value` for key and value schemas respectively.
135    /// For update formats without a key (e.g., `raw`), publish value schema
136    /// under the subject name `{topic_name}`.
137    ///
138    /// Only applicable when using Kafka as a transport.
139    #[serde(rename = "topic_name")]
140    TopicName,
141
142    /// The name of the SQL relation that the schema is derived from is used as the subject name:
143    /// * the SQL view name for the message value schema.
144    /// * the SQL index name for the message key schema.
145    #[serde(rename = "record_name")]
146    RecordName,
147
148    /// Combines both the topic name and the record name to form the subject.
149    ///
150    /// For update formats with both key and value components, use subject names
151    /// `{topic_name}-{record_name}-key` and `{topic_name}-{record_name}-value` for
152    /// key and value schemas respectively.
153    /// For update formats without a key (e.g., `raw`), publish value schema
154    /// under the subject name `{topic_name}-{record_name}`.
155    ///
156    /// `{record_name}` is the name of the SQL view or index that this connector
157    /// is attached to.
158    ///
159    /// Only applicable when using Kafka as a transport.
160    #[serde(rename = "topic_record_name")]
161    TopicRecordName,
162}
163
164/// Determines how the message key is generated when the Avro encoder is configured
165/// in the `raw` mode.
166#[derive(Clone, Serialize, Deserialize, Debug, ToSchema, PartialEq, Eq)]
167pub enum AvroEncoderKeyMode {
168    /// Produce messages without a key.
169    #[serde(rename = "none")]
170    None,
171
172    /// Uses the unique key columns of the view as the message key.
173    ///
174    /// This setting is supported when the output connector is configured with the `index` property.
175    /// It utilizes the values of the index columns specified in the associated `CREATE INDEX` statement
176    /// as the Avro message key.
177    ///
178    /// A separate Avro schema will be created and registered in the schema registry
179    /// for the key component of the message.
180    #[serde(rename = "key_fields")]
181    KeyFields,
182}
183
184/// Avro output format configuration.
185#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
186#[serde(deny_unknown_fields)]
187pub struct AvroEncoderConfig {
188    /// Format used to encode data change events in this stream.
189    ///
190    /// The default value is `raw`.
191    #[serde(default)]
192    pub update_format: AvroUpdateFormat,
193
194    /// Determines how the message key is generated when the Avro encoder is configured
195    /// in the `raw` mode.
196    ///
197    /// The default is `key_fields` when the `index` property of the connector is configured and `none` otherwise.
198    pub key_mode: Option<AvroEncoderKeyMode>,
199
200    /// Avro schema used to encode output records.
201    ///
202    /// When specified, the encoder will use this schema; otherwise it will automatically
203    /// generate an Avro schema based on the SQL view definition.
204    ///
205    /// Specified as a string containing schema definition in JSON format.
206    /// This schema must match precisely the SQL view definition, modulo
207    /// nullability of columns.
208    pub schema: Option<String>,
209
210    /// Optional name of the field used for Change Data Capture (CDC) annotations.
211    ///
212    /// Use this setting with data sinks that expect operation type
213    /// (insert, delete, or update) encoded as a column in the Avro record, such
214    /// as the [Iceberg Sink Kafka Connector](https://docs.feldera.com/connectors/sinks/iceberg).
215    ///
216    /// When set (e.g., `"cdc_field": "op"`), the specified field will be added to each record
217    /// to indicate the type of change:
218    /// - `"I"` for insert operations
219    /// - `"U"` for upserts
220    /// - `"D"` for deletions
221    ///
222    /// If not set, CDC metadata will not be included in the records.
223    /// Only works with the `raw` update format.
224    pub cdc_field: Option<String>,
225
226    /// Avro namespace for the generated Avro schemas.
227    pub namespace: Option<String>,
228
229    /// Subject name strategy used to publish Avro schemas used by the connector
230    /// in the schema registry.
231    ///
232    /// When this property is not specified, the connector chooses subject name strategy automatically:
233    /// * `topic_name` for `confluent_jdbc` update format
234    /// * `record_name` for `raw` update format
235    pub subject_name_strategy: Option<SubjectNameStrategy>,
236
237    /// Set to `true` if serialized messages should only contain raw data
238    /// without the header carrying schema ID.
239    /// `False` by default.
240    ///
241    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
242    #[serde(default)]
243    pub skip_schema_id: bool,
244
245    /// Schema registry configuration.
246    ///
247    /// When configured, the connector will push the Avro schema, whether it is specified as part of
248    /// connector configuration or generated automatically, to the schema registry and use the schema id
249    /// assigned by the registry in the
250    #[serde(flatten)]
251    pub registry_config: AvroSchemaRegistryConfig,
252}