feldera_types/format/avro.rs
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Display};
use utoipa::ToSchema;

/// Supported Avro data change event formats.
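///
/// ### Example
///
/// An illustrative sketch (not part of the original docs) of how the wire
/// names below map to variants; it assumes `serde_json` is available:
///
/// ```ignore
/// let fmt: AvroUpdateFormat = serde_json::from_str("\"confluent_jdbc\"").unwrap();
/// assert_eq!(fmt, AvroUpdateFormat::ConfluentJdbc);
/// assert_eq!(fmt.to_string(), "confluent_jdbc");
/// ```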
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, ToSchema)]
pub enum AvroUpdateFormat {
    /// Raw encoding.
    ///
    /// Each message in the stream represents a single-record update: an insert, upsert, or delete.
    ///
    /// ### Input Connectors
    /// Raw Avro encoding can be used for insert and upsert operations, but not deletes.
    /// - The message value contains the record to be inserted or updated.
    /// - The message key and headers are ignored.
    ///
    /// ### Output Connectors
    /// The raw format supports inserts, upserts, and deletes.
    /// - The message value contains the record to be inserted or deleted.
    /// - The operation type is specified in the `op` message header field, which can be
    ///   `insert`, `update`, or `delete`.
    /// - The message key can optionally store the primary key (see the `key_mode` property).
    #[serde(rename = "raw")]
    Raw,

    /// Debezium data change event format.
    #[serde(rename = "debezium")]
    Debezium,

    /// Confluent JDBC connector change event format.
    #[serde(rename = "confluent_jdbc")]
    ConfluentJdbc,
}

impl Default for AvroUpdateFormat {
    fn default() -> Self {
        Self::Raw
    }
}

impl Display for AvroUpdateFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Raw => f.write_str("raw"),
            Self::Debezium => f.write_str("debezium"),
            Self::ConfluentJdbc => f.write_str("confluent_jdbc"),
        }
    }
}

/// Schema registry configuration.
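///
/// ### Example
///
/// A minimal sketch of the JSON this struct deserializes from; the URL and
/// credentials are hypothetical, and `serde_json` is assumed to be available:
///
/// ```ignore
/// let registry: AvroSchemaRegistryConfig = serde_json::from_str(
///     r#"{
///         "registry_urls": ["http://localhost:8081"],
///         "registry_timeout_secs": 10,
///         "registry_username": "avro-user",
///         "registry_password": "secret"
///     }"#,
/// )
/// .unwrap();
/// assert_eq!(registry.registry_timeout_secs, Some(10));
/// ```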
#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
pub struct AvroSchemaRegistryConfig {
    /// List of schema registry URLs.
    ///
    /// * **Input connector**: When non-empty, the connector retrieves Avro
    ///   message schemas from the registry.
    ///
    /// * **Output connector**: When non-empty, the connector will
    ///   post the schema to the registry and embed the schema ID returned
    ///   by the registry in Avro messages. Otherwise, schema ID 0 is used.
    #[serde(default)]
    pub registry_urls: Vec<String>,

    /// Custom headers that will be added to every call to the schema registry.
    ///
    /// This property is only applicable to output connectors.
    ///
    /// Requires `registry_urls` to be set.
    #[serde(default)]
    pub registry_headers: HashMap<String, String>,

    /// Proxy that will be used to access the schema registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_proxy: Option<String>,

    /// Timeout in seconds used to connect to the registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_timeout_secs: Option<u64>,

    /// Username used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set. This option is mutually exclusive with
    /// token-based authentication (see `registry_authorization_token`).
    pub registry_username: Option<String>,

    /// Password used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_password: Option<String>,

    /// Token used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set. This option is mutually exclusive with
    /// password-based authentication (see `registry_username` and `registry_password`).
    pub registry_authorization_token: Option<String>,
}

/// Avro input format configuration.
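///
/// ### Example
///
/// A minimal sketch of a parser configuration; the registry URL is
/// hypothetical, and `serde_json` is assumed to be available:
///
/// ```ignore
/// let config: AvroParserConfig = serde_json::from_str(
///     r#"{
///         "update_format": "raw",
///         "registry_urls": ["http://localhost:8081"]
///     }"#,
/// )
/// .unwrap();
/// assert_eq!(config.update_format, AvroUpdateFormat::Raw);
/// ```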
#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[serde(deny_unknown_fields)]
pub struct AvroParserConfig {
    /// Format used to encode data change events in this stream.
    ///
    /// The default value is `raw`.
    #[serde(default)]
    pub update_format: AvroUpdateFormat,

    /// Avro schema used to encode all records in this stream, specified as a JSON-encoded string.
    ///
    /// When this property is set, the connector uses the provided schema instead of
    /// retrieving the schema from the schema registry. This setting is mutually exclusive
    /// with `registry_urls`.
    pub schema: Option<String>,

    /// `true` if serialized messages only contain raw data without the
    /// header carrying the schema ID.
    ///
    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
    ///
    /// The default value is `false`.
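    ///
    /// When the header is present, each message uses the Confluent wire format:
    /// a zero magic byte, followed by a 4-byte big-endian schema ID, followed by
    /// the Avro-encoded payload.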
    #[serde(default)]
    pub skip_schema_id: bool,

    /// Schema registry configuration.
    #[serde(flatten)]
    pub registry_config: AvroSchemaRegistryConfig,
}

/// Subject name strategies used in registering key and value schemas
/// with the schema registry.
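///
/// For example, with topic `orders` and a view named `order_stats` (both
/// names illustrative), the `raw` value schema is published under the subject
/// `orders`, `order_stats`, or `orders-order_stats`, depending on the
/// strategy chosen below.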
#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)]
pub enum SubjectNameStrategy {
    /// The subject name is derived directly from the Kafka topic name.
    ///
    /// For update formats with both key and value components, the key and value
    /// schemas are published under the subject names `{topic_name}-key` and
    /// `{topic_name}-value` respectively.
    /// For update formats without a key (e.g., `raw`), the value schema is
    /// published under the subject name `{topic_name}`.
    ///
    /// Only applicable when using Kafka as a transport.
    #[serde(rename = "topic_name")]
    TopicName,

    /// The name of the SQL relation that the schema is derived from is used as the subject name:
    /// * the SQL view name for the message value schema.
    /// * the SQL index name for the message key schema.
    #[serde(rename = "record_name")]
    RecordName,

    /// Combines both the topic name and the record name to form the subject.
    ///
    /// For update formats with both key and value components, the key and value
    /// schemas are published under the subject names `{topic_name}-{record_name}-key`
    /// and `{topic_name}-{record_name}-value` respectively.
    /// For update formats without a key (e.g., `raw`), the value schema is
    /// published under the subject name `{topic_name}-{record_name}`.
    ///
    /// `{record_name}` is the name of the SQL view or index that this connector
    /// is attached to.
    ///
    /// Only applicable when using Kafka as a transport.
    #[serde(rename = "topic_record_name")]
    TopicRecordName,
}
168
169/// Determines how the message key is generated when the Avro encoder is configured
170/// in the `raw` mode.
171#[derive(Clone, Serialize, Deserialize, Debug, ToSchema, PartialEq, Eq)]
172pub enum AvroEncoderKeyMode {
173 /// Produce messages without a key.
174 #[serde(rename = "none")]
175 None,
176
177 /// Uses the unique key columns of the view as the message key.
178 ///
179 /// This setting is supported when the output connector is configured with the `index` property.
180 /// It utilizes the values of the index columns specified in the associated `CREATE INDEX` statement
181 /// as the Avro message key.
182 ///
183 /// A separate Avro schema will be created and registered in the schema registry
184 /// for the key component of the message.
185 #[serde(rename = "key_fields")]
186 KeyFields,
187}

/// Avro output format configuration.
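///
/// ### Example
///
/// A minimal sketch of an encoder configuration; all values are illustrative,
/// and `serde_json` is assumed to be available:
///
/// ```ignore
/// let config: AvroEncoderConfig = serde_json::from_str(
///     r#"{
///         "update_format": "raw",
///         "cdc_field": "op",
///         "namespace": "com.example.pipeline",
///         "registry_urls": ["http://localhost:8081"]
///     }"#,
/// )
/// .unwrap();
/// assert_eq!(config.cdc_field.as_deref(), Some("op"));
/// ```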
#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[serde(deny_unknown_fields)]
pub struct AvroEncoderConfig {
    /// Format used to encode data change events in this stream.
    ///
    /// The default value is `raw`.
    #[serde(default)]
    pub update_format: AvroUpdateFormat,

    /// Determines how the message key is generated when the Avro encoder is configured
    /// in the `raw` mode.
    ///
    /// The default is `key_fields` when the `index` property of the connector is configured and `none` otherwise.
    pub key_mode: Option<AvroEncoderKeyMode>,

    /// Avro schema used to encode output records.
    ///
    /// When specified, the encoder will use this schema; otherwise it will automatically
    /// generate an Avro schema based on the SQL view definition.
    ///
    /// Specified as a string containing a JSON-encoded schema definition.
    /// The schema must precisely match the SQL view definition, modulo
    /// column nullability.
    pub schema: Option<String>,

    /// Optional name of the field used for Change Data Capture (CDC) annotations.
    ///
    /// Use this setting with data sinks that expect the operation type
    /// (insert, delete, or update) encoded as a column in the Avro record, such
    /// as the [Iceberg Sink Kafka Connector](https://docs.feldera.com/connectors/sinks/iceberg).
    ///
    /// When set (e.g., `"cdc_field": "op"`), the specified field will be added to each record
    /// to indicate the type of change:
    /// - `"I"` for insert operations
    /// - `"U"` for upserts
    /// - `"D"` for deletions
    ///
    /// If not set, CDC metadata will not be included in the records.
    /// Only works with the `raw` update format.
    pub cdc_field: Option<String>,

    /// Avro namespace for the generated Avro schemas.
    pub namespace: Option<String>,

    /// Subject name strategy used when publishing the connector's Avro schemas
    /// to the schema registry.
    ///
    /// When this property is not specified, the connector chooses the subject name strategy
    /// automatically:
    /// * `topic_name` for the `confluent_jdbc` update format
    /// * `record_name` for the `raw` update format
    pub subject_name_strategy: Option<SubjectNameStrategy>,

    /// Set to `true` if serialized messages should only contain raw data
    /// without the header carrying the schema ID.
    /// `false` by default.
    ///
    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
    #[serde(default)]
    pub skip_schema_id: bool,

    /// Schema registry configuration.
    ///
    /// When configured, the connector will push the Avro schema, whether it is specified as part of
    /// the connector configuration or generated automatically, to the schema registry and use the
    /// schema ID assigned by the registry in the header of output messages.
    #[serde(flatten)]
    pub registry_config: AvroSchemaRegistryConfig,
}