feldera_types/format/avro.rs
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Display};
use utoipa::ToSchema;

/// Supported Avro data change event formats.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, ToSchema)]
pub enum AvroUpdateFormat {
    /// Raw encoding.
    ///
    /// Every event in the stream represents a single record to be stored
    /// in the table that the stream is connected to. This format can represent
    /// inserts and upserts, but not deletes.
    #[serde(rename = "raw")]
    Raw,

    /// Debezium data change event format.
    #[serde(rename = "debezium")]
    Debezium,

    /// Confluent JDBC connector change event format.
    #[serde(rename = "confluent_jdbc")]
    ConfluentJdbc,
}

impl Default for AvroUpdateFormat {
    fn default() -> Self {
        Self::Raw
    }
}

impl Display for AvroUpdateFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Raw => f.write_str("raw"),
            Self::Debezium => f.write_str("debezium"),
            Self::ConfluentJdbc => f.write_str("confluent_jdbc"),
        }
    }
}

impl AvroUpdateFormat {
    /// `true` if this format carries both key and value components;
    /// `false` if it carries a value only.
    pub fn has_key(&self) -> bool {
        match self {
            Self::Raw => false,
            Self::Debezium => true,
            Self::ConfluentJdbc => true,
        }
    }

    /// Does the format support deletions?
    pub fn supports_deletes(&self) -> bool {
        match self {
            Self::Raw => false,
            Self::Debezium => true,
            Self::ConfluentJdbc => true,
        }
    }
}

/// Schema registry configuration.
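///
/// # Example
///
/// A minimal sketch of a registry configuration (illustrative values only; the
/// URL and token below are placeholders, not defaults):
///
/// ```ignore
/// let registry = AvroSchemaRegistryConfig {
///     registry_urls: vec!["http://localhost:8081".to_string()],
///     // Token-based auth; mutually exclusive with
///     // `registry_username`/`registry_password`.
///     registry_authorization_token: Some("my-token".to_string()),
///     ..Default::default()
/// };
/// ```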
#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
pub struct AvroSchemaRegistryConfig {
    /// List of schema registry URLs.
    ///
    /// * **Input connector**: When non-empty, the connector retrieves Avro
    ///   message schemas from the registry.
    ///
    /// * **Output connector**: When non-empty, the connector will
    ///   post the schema to the registry and embed the schema id returned
    ///   by the registry in Avro messages. Otherwise, schema id 0 is used.
    #[serde(default)]
    pub registry_urls: Vec<String>,

    /// Custom headers that will be added to every call to the schema registry.
    ///
    /// This property is only applicable to output connectors.
    ///
    /// Requires `registry_urls` to be set.
    #[serde(default)]
    pub registry_headers: HashMap<String, String>,

    /// Proxy that will be used to access the schema registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_proxy: Option<String>,

    /// Timeout in seconds used to connect to the registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_timeout_secs: Option<u64>,

    /// Username used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set. This option is mutually exclusive with
    /// token-based authentication (see `registry_authorization_token`).
    pub registry_username: Option<String>,

    /// Password used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_password: Option<String>,

    /// Token used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set. This option is mutually exclusive with
    /// password-based authentication (see `registry_username` and `registry_password`).
    pub registry_authorization_token: Option<String>,
}

/// Avro input format configuration.
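///
/// # Example
///
/// A minimal sketch of a parser configuration that reads `confluent_jdbc`
/// change events and resolves schemas from a registry (the URL below is a
/// placeholder):
///
/// ```ignore
/// let config = AvroParserConfig {
///     update_format: AvroUpdateFormat::ConfluentJdbc,
///     registry_config: AvroSchemaRegistryConfig {
///         registry_urls: vec!["http://localhost:8081".to_string()],
///         ..Default::default()
///     },
///     ..Default::default()
/// };
/// ```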
#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[serde(deny_unknown_fields)]
pub struct AvroParserConfig {
    /// Format used to encode data change events in this stream.
    ///
    /// The default value is `raw`.
    #[serde(default)]
    pub update_format: AvroUpdateFormat,

    /// Avro schema used to encode all records in this stream, specified as a JSON-encoded string.
    ///
    /// When this property is set, the connector uses the provided schema instead of
    /// retrieving the schema from the schema registry. This setting is mutually exclusive
    /// with `registry_urls`.
    pub schema: Option<String>,

    /// `true` if serialized messages only contain raw data without the
    /// header carrying the schema ID.
    ///
    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
    ///
    /// The default value is `false`.
    #[serde(default)]
    pub skip_schema_id: bool,

    /// Schema registry configuration.
    #[serde(flatten)]
    pub registry_config: AvroSchemaRegistryConfig,
}

/// Subject name strategies used in registering key and value schemas
/// with the schema registry.
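///
/// # Example
///
/// Illustrative subject names, assuming a hypothetical Kafka topic `transactions`
/// and a SQL view `daily_totals`, for an update format with both key and value
/// components:
///
/// * `topic_name`: `transactions-key` and `transactions-value`
/// * `topic_record_name`: `transactions-daily_totals-key` and
///   `transactions-daily_totals-value`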
#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)]
pub enum SubjectNameStrategy {
    /// The subject name is derived directly from the Kafka topic name.
    ///
    /// For update formats with both key and value components, the key and value
    /// schemas are published under the subject names `{topic_name}-key` and
    /// `{topic_name}-value` respectively.
    /// For update formats without a key (e.g., `raw`), the value schema is
    /// published under the subject name `{topic_name}`.
    ///
    /// Only applicable when using Kafka as a transport.
    #[serde(rename = "topic_name")]
    TopicName,
    /// The subject name is derived from the fully qualified name of the record.
    #[serde(rename = "record_name")]
    RecordName,
    /// Combines both the topic name and the record name to form the subject.
    ///
    /// For update formats with both key and value components, the key and value
    /// schemas are published under the subject names `{topic_name}-{record_name}-key`
    /// and `{topic_name}-{record_name}-value` respectively.
    /// For update formats without a key (e.g., `raw`), the value schema is
    /// published under the subject name `{topic_name}-{record_name}`.
    ///
    /// Here, `{record_name}` is the name of the SQL view that this connector
    /// is attached to.
    ///
    /// Only applicable when using Kafka as a transport.
    #[serde(rename = "topic_record_name")]
    TopicRecordName,
}

/// Avro output format configuration.
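///
/// # Example
///
/// A minimal sketch of an encoder configuration that publishes automatically
/// generated schemas to a registry (the URL and namespace below are
/// placeholders):
///
/// ```ignore
/// let config = AvroEncoderConfig {
///     update_format: AvroUpdateFormat::ConfluentJdbc,
///     namespace: Some("com.example.pipeline".to_string()),
///     registry_config: AvroSchemaRegistryConfig {
///         registry_urls: vec!["http://localhost:8081".to_string()],
///         ..Default::default()
///     },
///     ..Default::default()
/// };
/// ```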
#[derive(Serialize, Deserialize, Debug, Default, ToSchema)]
#[serde(deny_unknown_fields)]
pub struct AvroEncoderConfig {
    /// Format used to encode data change events in this stream.
    ///
    /// The default value is `raw`.
    #[serde(default)]
    pub update_format: AvroUpdateFormat,

    /// Avro schema used to encode output records.
    ///
    /// When specified, the encoder uses this schema; otherwise it automatically
    /// generates an Avro schema based on the SQL view definition.
    ///
    /// Specified as a string containing a schema definition in JSON format.
    /// The schema must precisely match the SQL view definition, modulo
    /// nullability of columns.
    pub schema: Option<String>,

    /// Avro namespace for the generated Avro schemas.
    pub namespace: Option<String>,

    /// Subject name strategy used to publish the Avro schemas used by the connector
    /// to the schema registry.
    ///
    /// When this property is not specified, the connector chooses the subject name
    /// strategy automatically:
    /// * `topic_name` for the `confluent_jdbc` update format
    /// * `record_name` for the `raw` update format
    pub subject_name_strategy: Option<SubjectNameStrategy>,

    /// Set to `true` if serialized messages should only contain raw data
    /// without the header carrying the schema ID.
    /// `false` by default.
    ///
    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
    #[serde(default)]
    pub skip_schema_id: bool,

    /// Schema registry configuration.
    ///
    /// When configured, the connector pushes the Avro schema, whether specified as part
    /// of the connector configuration or generated automatically, to the schema registry
    /// and uses the schema id assigned by the registry in the published messages.
    #[serde(flatten)]
    pub registry_config: AvroSchemaRegistryConfig,
}
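
// Illustrative sanity checks: a minimal sketch covering only the helpers
// defined above in this file.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn update_format_capabilities() {
        // `raw` is the default format; it carries no key and cannot encode deletes.
        assert_eq!(AvroUpdateFormat::default(), AvroUpdateFormat::Raw);
        assert!(!AvroUpdateFormat::Raw.has_key());
        assert!(!AvroUpdateFormat::Raw.supports_deletes());

        // `debezium` and `confluent_jdbc` carry a key component and support deletes.
        assert!(AvroUpdateFormat::Debezium.has_key());
        assert!(AvroUpdateFormat::Debezium.supports_deletes());
        assert!(AvroUpdateFormat::ConfluentJdbc.has_key());
        assert!(AvroUpdateFormat::ConfluentJdbc.supports_deletes());

        // `Display` produces the same names used by the serde `rename` attributes.
        assert_eq!(AvroUpdateFormat::ConfluentJdbc.to_string(), "confluent_jdbc");
    }
}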