feldera_types/format/avro.rs
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Display};
use utoipa::ToSchema;

/// Supported Avro data change event formats.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, ToSchema, Default)]
pub enum AvroUpdateFormat {
    /// Raw encoding.
    ///
    /// Each message in the stream represents a single-record update: an insert, upsert, or delete.
    ///
    /// ### Input Connectors
    /// Raw Avro encoding can be used for insert and upsert operations, but not deletes.
    /// - The message value contains the record to be inserted or updated.
    /// - The message key and headers are ignored.
    ///
    /// ### Output Connectors
    /// The raw format supports inserts, upserts, and deletes.
    /// - The message value contains the record to be inserted or deleted.
    /// - The operation type is specified in the `op` message header field, which can be
    ///   `insert`, `update`, or `delete`.
    /// - The message key can optionally store the primary key (see the `key_mode` property).
    #[serde(rename = "raw")]
    #[default]
    Raw,

    /// Debezium data change event format.
    #[serde(rename = "debezium")]
    Debezium,

    /// Confluent JDBC connector change event format.
    #[serde(rename = "confluent_jdbc")]
    ConfluentJdbc,
}

impl Display for AvroUpdateFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Raw => f.write_str("raw"),
            Self::Debezium => f.write_str("debezium"),
            Self::ConfluentJdbc => f.write_str("confluent_jdbc"),
        }
    }
}
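
// A minimal sanity check (illustrative; assumes `serde_json` is available as a
// dev-dependency) showing that the serde renames and the `Display` impl agree
// on the wire name of each format.
#[cfg(test)]
mod update_format_tests {
    use super::AvroUpdateFormat;

    #[test]
    fn wire_names_match_display() {
        for fmt in [
            AvroUpdateFormat::Raw,
            AvroUpdateFormat::Debezium,
            AvroUpdateFormat::ConfluentJdbc,
        ] {
            // Serializing a unit variant yields its rename as a quoted JSON string.
            assert_eq!(serde_json::to_string(&fmt).unwrap(), format!("\"{fmt}\""));
        }
    }
}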

/// Schema registry configuration.
#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
pub struct AvroSchemaRegistryConfig {
    /// List of schema registry URLs.
    ///
    /// * **Input connector**: When non-empty, the connector retrieves Avro
    ///   message schemas from the registry.
    ///
    /// * **Output connector**: When non-empty, the connector posts the schema
    ///   to the registry and embeds the schema id returned by the registry in
    ///   Avro messages. Otherwise, schema id 0 is used.
    #[serde(default)]
    pub registry_urls: Vec<String>,

    /// Custom headers that will be added to every call to the schema registry.
    ///
    /// This property is only applicable to output connectors.
    ///
    /// Requires `registry_urls` to be set.
    #[serde(default)]
    pub registry_headers: HashMap<String, String>,

    /// Proxy that will be used to access the schema registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_proxy: Option<String>,

    /// Timeout in seconds used to connect to the registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_timeout_secs: Option<u64>,

    /// Username used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set. This option is mutually exclusive with
    /// token-based authentication (see `registry_authorization_token`).
    pub registry_username: Option<String>,

    /// Password used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_password: Option<String>,

    /// Token used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set. This option is mutually exclusive with
    /// password-based authentication (see `registry_username` and `registry_password`).
    pub registry_authorization_token: Option<String>,
}
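
// An illustrative round-trip (assumes `serde_json` as a dev-dependency) showing
// how a registry section looks in a connector config; the URL and header are
// placeholders. All fields other than `registry_urls` may be omitted.
#[cfg(test)]
mod registry_config_tests {
    use super::AvroSchemaRegistryConfig;

    #[test]
    fn parses_minimal_registry_section() {
        let config: AvroSchemaRegistryConfig = serde_json::from_str(
            r#"{
                "registry_urls": ["http://localhost:8081"],
                "registry_headers": {"X-Custom-Header": "value"}
            }"#,
        )
        .unwrap();
        assert_eq!(config.registry_urls, vec!["http://localhost:8081"]);
        assert_eq!(config.registry_headers["X-Custom-Header"], "value");
        // Optional authentication settings default to `None` when absent.
        assert!(config.registry_username.is_none());
        assert!(config.registry_authorization_token.is_none());
    }
}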

/// Avro input format configuration.
#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[serde(deny_unknown_fields)]
pub struct AvroParserConfig {
    /// Format used to encode data change events in this stream.
    ///
    /// The default value is `raw`.
    #[serde(default)]
    pub update_format: AvroUpdateFormat,

    /// Avro schema used to encode all records in this stream, specified as a JSON-encoded string.
    ///
    /// When this property is set, the connector uses the provided schema instead of
    /// retrieving the schema from the schema registry. This setting is mutually exclusive
    /// with `registry_urls`.
    pub schema: Option<String>,

    /// `true` if serialized messages only contain raw data without the
    /// header carrying the schema ID.
    ///
    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
    ///
    /// The default value is `false`.
    #[serde(default)]
    pub skip_schema_id: bool,

    /// Schema registry configuration.
    #[serde(flatten)]
    pub registry_config: AvroSchemaRegistryConfig,
}
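
// A sketch (assumes `serde_json` as a dev-dependency) of how a parser config is
// deserialized. Because of `#[serde(flatten)]`, the registry fields appear at the
// top level of the JSON object rather than under a nested key; the URL below is a
// placeholder.
#[cfg(test)]
mod parser_config_tests {
    use super::{AvroParserConfig, AvroUpdateFormat};

    #[test]
    fn registry_fields_are_flattened() {
        let config: AvroParserConfig = serde_json::from_str(
            r#"{
                "update_format": "debezium",
                "registry_urls": ["http://localhost:8081"]
            }"#,
        )
        .unwrap();
        assert_eq!(config.update_format, AvroUpdateFormat::Debezium);
        // `skip_schema_id` defaults to `false`.
        assert!(!config.skip_schema_id);
        // The flattened registry settings land in `registry_config`.
        assert_eq!(config.registry_config.registry_urls.len(), 1);
    }
}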

/// Subject name strategies used in registering key and value schemas
/// with the schema registry.
#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)]
pub enum SubjectNameStrategy {
    /// The subject name is derived directly from the Kafka topic name.
    ///
    /// For update formats with both key and value components, the key and value schemas
    /// are published under the subject names `{topic_name}-key` and `{topic_name}-value`
    /// respectively. For update formats without a key (e.g., `raw`), the value schema is
    /// published under the subject name `{topic_name}`.
    ///
    /// Only applicable when using Kafka as a transport.
    #[serde(rename = "topic_name")]
    TopicName,

    /// The name of the SQL relation that the schema is derived from is used as the subject name:
    /// * the SQL view name for the message value schema;
    /// * the SQL index name for the message key schema.
    #[serde(rename = "record_name")]
    RecordName,

    /// Combines the topic name and the record name to form the subject.
    ///
    /// For update formats with both key and value components, the key and value schemas
    /// are published under the subject names `{topic_name}-{record_name}-key` and
    /// `{topic_name}-{record_name}-value` respectively. For update formats without a key
    /// (e.g., `raw`), the value schema is published under the subject name
    /// `{topic_name}-{record_name}`.
    ///
    /// `{record_name}` is the name of the SQL view or index that this connector
    /// is attached to.
    ///
    /// Only applicable when using Kafka as a transport.
    #[serde(rename = "topic_record_name")]
    TopicRecordName,
}
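
// A test-only sketch (not part of the crate's API) of how the three strategies
// map a Kafka topic and a SQL record name to the subject under which a value
// schema would be published, following the doc comments above; the helper and
// its names are hypothetical.
#[cfg(test)]
mod subject_name_tests {
    use super::SubjectNameStrategy;

    /// Hypothetical helper: value-schema subject for a key-less (`raw`) format.
    fn value_subject(strategy: &SubjectNameStrategy, topic: &str, record: &str) -> String {
        match strategy {
            SubjectNameStrategy::TopicName => topic.to_string(),
            SubjectNameStrategy::RecordName => record.to_string(),
            SubjectNameStrategy::TopicRecordName => format!("{topic}-{record}"),
        }
    }

    #[test]
    fn subjects_follow_strategy() {
        assert_eq!(
            value_subject(&SubjectNameStrategy::TopicName, "orders", "v_orders"),
            "orders"
        );
        assert_eq!(
            value_subject(&SubjectNameStrategy::RecordName, "orders", "v_orders"),
            "v_orders"
        );
        assert_eq!(
            value_subject(&SubjectNameStrategy::TopicRecordName, "orders", "v_orders"),
            "orders-v_orders"
        );
    }
}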

/// Determines how the message key is generated when the Avro encoder is configured
/// in the `raw` mode.
#[derive(Clone, Serialize, Deserialize, Debug, ToSchema, PartialEq, Eq)]
pub enum AvroEncoderKeyMode {
    /// Produce messages without a key.
    #[serde(rename = "none")]
    None,

    /// Use the unique key columns of the view as the message key.
    ///
    /// This setting is supported when the output connector is configured with the `index` property.
    /// It uses the values of the index columns specified in the associated `CREATE INDEX` statement
    /// as the Avro message key.
    ///
    /// A separate Avro schema is created and registered in the schema registry
    /// for the key component of the message.
    #[serde(rename = "key_fields")]
    KeyFields,
}

/// Avro output format configuration.
#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[serde(deny_unknown_fields)]
pub struct AvroEncoderConfig {
    /// Format used to encode data change events in this stream.
    ///
    /// The default value is `raw`.
    #[serde(default)]
    pub update_format: AvroUpdateFormat,

    /// Determines how the message key is generated when the Avro encoder is configured
    /// in the `raw` mode.
    ///
    /// The default is `key_fields` when the `index` property of the connector is configured
    /// and `none` otherwise.
    pub key_mode: Option<AvroEncoderKeyMode>,

    /// Avro schema used to encode output records.
    ///
    /// When specified, the encoder uses this schema; otherwise it automatically
    /// generates an Avro schema based on the SQL view definition.
    ///
    /// Specified as a string containing a schema definition in JSON format.
    /// The schema must precisely match the SQL view definition, modulo
    /// nullability of columns.
    pub schema: Option<String>,

    /// Optional name of the field used for Change Data Capture (CDC) annotations.
    ///
    /// Use this setting with data sinks that expect the operation type
    /// (insert, delete, or update) encoded as a column in the Avro record, such
    /// as the [Iceberg Sink Kafka Connector](https://docs.feldera.com/connectors/sinks/iceberg).
    ///
    /// When set (e.g., `"cdc_field": "op"`), the specified field is added to each record
    /// to indicate the type of change:
    /// - `"I"` for inserts
    /// - `"U"` for upserts
    /// - `"D"` for deletes
    ///
    /// If not set, CDC metadata is not included in the records.
    /// Only works with the `raw` update format.
    pub cdc_field: Option<String>,

    /// Avro namespace for the generated Avro schemas.
    pub namespace: Option<String>,

    /// Subject name strategy used to publish Avro schemas used by the connector
    /// in the schema registry.
    ///
    /// When this property is not specified, the connector chooses the subject name strategy
    /// automatically:
    /// * `topic_name` for the `confluent_jdbc` update format
    /// * `record_name` for the `raw` update format
    pub subject_name_strategy: Option<SubjectNameStrategy>,

    /// Set to `true` if serialized messages should only contain raw data
    /// without the header carrying the schema ID.
    ///
    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
    ///
    /// The default value is `false`.
    #[serde(default)]
    pub skip_schema_id: bool,

    /// Schema registry configuration.
    ///
    /// When configured, the connector pushes the Avro schema, whether specified as part of
    /// the connector configuration or generated automatically, to the schema registry and
    /// embeds the schema id assigned by the registry in output messages.
    #[serde(flatten)]
    pub registry_config: AvroSchemaRegistryConfig,
}
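
// A sketch (assumes `serde_json` as a dev-dependency) of a complete encoder
// configuration using the `raw` update format with a CDC column and an explicit
// key mode. Field names follow the struct definitions above; the registry URL is
// a placeholder.
#[cfg(test)]
mod encoder_config_tests {
    use super::{AvroEncoderConfig, AvroEncoderKeyMode, AvroUpdateFormat};

    #[test]
    fn parses_raw_encoder_config() {
        let config: AvroEncoderConfig = serde_json::from_str(
            r#"{
                "update_format": "raw",
                "key_mode": "key_fields",
                "cdc_field": "op",
                "subject_name_strategy": "record_name",
                "registry_urls": ["http://localhost:8081"]
            }"#,
        )
        .unwrap();
        assert_eq!(config.update_format, AvroUpdateFormat::Raw);
        assert_eq!(config.key_mode, Some(AvroEncoderKeyMode::KeyFields));
        assert_eq!(config.cdc_field.as_deref(), Some("op"));
        assert_eq!(config.registry_config.registry_urls.len(), 1);
    }
}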