feldera_types/format/avro.rs

use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Display};
use utoipa::ToSchema;

/// Supported Avro data change event formats.
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq, Eq, ToSchema)]
pub enum AvroUpdateFormat {
    /// Raw encoding.
    ///
    /// Every event in the stream represents a single record to be stored
    /// in the table that the stream is connected to.  This format can represent
    /// inserts and upserts, but not deletes.
    #[serde(rename = "raw")]
    Raw,

    /// Debezium data change event format.
    #[serde(rename = "debezium")]
    Debezium,

    /// Confluent JDBC connector change event format.
    #[serde(rename = "confluent_jdbc")]
    ConfluentJdbc,
}

impl Default for AvroUpdateFormat {
    fn default() -> Self {
        Self::Raw
    }
}

impl Display for AvroUpdateFormat {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Raw => f.write_str("raw"),
            Self::Debezium => f.write_str("debezium"),
            Self::ConfluentJdbc => f.write_str("confluent_jdbc"),
        }
    }
}

impl AvroUpdateFormat {
    /// `true` - this format has both key and value components.
    /// `false` - the format includes a value component only.
    pub fn has_key(&self) -> bool {
        match self {
            Self::Raw => false,
            Self::Debezium => true,
            Self::ConfluentJdbc => true,
        }
    }

    /// Does the format support deletions?
    pub fn supports_deletes(&self) -> bool {
        match self {
            Self::Raw => false,
            Self::Debezium => true,
            Self::ConfluentJdbc => true,
        }
    }
}
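
// A minimal sketch of how the pieces above fit together: each variant's serde
// rename, its `Display` output, and its key/delete capabilities. Illustrative
// only; assumes `serde_json` is available as a dev-dependency.
#[cfg(test)]
mod update_format_tests {
    use super::*;

    #[test]
    fn serde_and_display_agree() {
        for (fmt, name) in [
            (AvroUpdateFormat::Raw, "raw"),
            (AvroUpdateFormat::Debezium, "debezium"),
            (AvroUpdateFormat::ConfluentJdbc, "confluent_jdbc"),
        ] {
            // Unit variants serialize to the same lowercase strings they display as.
            assert_eq!(serde_json::to_string(&fmt).unwrap(), format!("\"{name}\""));
            assert_eq!(fmt.to_string(), name);
        }
        // Only the key-less `raw` format lacks delete support.
        assert!(!AvroUpdateFormat::Raw.has_key());
        assert!(!AvroUpdateFormat::Raw.supports_deletes());
        assert!(AvroUpdateFormat::Debezium.supports_deletes());
    }
}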

/// Schema registry configuration.
#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
pub struct AvroSchemaRegistryConfig {
    /// List of schema registry URLs.
    ///
    /// * **Input connector**: When non-empty, the connector retrieves Avro
    ///   message schemas from the registry.
    ///
    /// * **Output connector**: When non-empty, the connector will
    ///   post the schema to the registry and embed the schema id returned
    ///   by the registry in Avro messages.  Otherwise, schema id 0 is used.
    #[serde(default)]
    pub registry_urls: Vec<String>,

    /// Custom headers that will be added to every call to the schema registry.
    ///
    /// This property is only applicable to output connectors.
    ///
    /// Requires `registry_urls` to be set.
    #[serde(default)]
    pub registry_headers: HashMap<String, String>,

    /// Proxy that will be used to access the schema registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_proxy: Option<String>,

    /// Timeout in seconds used to connect to the registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_timeout_secs: Option<u64>,

    /// Username used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set. This option is mutually exclusive with
    /// token-based authentication (see `registry_authorization_token`).
    pub registry_username: Option<String>,

    /// Password used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set.
    pub registry_password: Option<String>,

    /// Token used to authenticate with the registry.
    ///
    /// Requires `registry_urls` to be set. This option is mutually exclusive with
    /// password-based authentication (see `registry_username` and `registry_password`).
    pub registry_authorization_token: Option<String>,
}
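
// A hedged sketch of how this struct deserializes when only a subset of fields
// is supplied: the collection fields fall back to empty values via
// `#[serde(default)]` and the `Option` fields become `None`. The URL is a
// placeholder; assumes `serde_json` is available as a dev-dependency.
#[cfg(test)]
mod registry_config_tests {
    use super::*;

    #[test]
    fn partial_registry_config() {
        let config: AvroSchemaRegistryConfig =
            serde_json::from_str(r#"{ "registry_urls": ["http://localhost:8081"] }"#).unwrap();
        assert_eq!(config.registry_urls, vec!["http://localhost:8081"]);
        assert!(config.registry_headers.is_empty());
        assert!(config.registry_username.is_none());
        assert!(config.registry_timeout_secs.is_none());
    }
}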

/// Avro input format configuration.
#[derive(Clone, Serialize, Deserialize, Debug, Default, ToSchema)]
#[serde(deny_unknown_fields)]
pub struct AvroParserConfig {
    /// Format used to encode data change events in this stream.
    ///
    /// The default value is 'raw'.
    #[serde(default)]
    pub update_format: AvroUpdateFormat,

    /// Avro schema used to encode all records in this stream, specified as a JSON-encoded string.
    ///
    /// When this property is set, the connector uses the provided schema instead of
    /// retrieving the schema from the schema registry. This setting is mutually exclusive
    /// with `registry_urls`.
    pub schema: Option<String>,

    /// `true` if serialized messages only contain raw data without the
    /// header carrying schema ID.
    ///
    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
    ///
    /// The default value is `false`.
    #[serde(default)]
    pub skip_schema_id: bool,

    /// Schema registry configuration.
    #[serde(flatten)]
    pub registry_config: AvroSchemaRegistryConfig,
}
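
// A hedged sketch of a parser configuration that supplies the Avro schema
// inline instead of pointing at a schema registry. The schema string is a
// minimal placeholder record, not a real table schema; assumes `serde_json`
// is available as a dev-dependency.
#[cfg(test)]
mod parser_config_tests {
    use super::*;

    #[test]
    fn parser_config_with_inline_schema() {
        let config: AvroParserConfig = serde_json::from_str(
            r#"{
                "update_format": "raw",
                "schema": "{\"type\": \"record\", \"name\": \"Example\", \"fields\": [{\"name\": \"id\", \"type\": \"long\"}]}",
                "skip_schema_id": true
            }"#,
        )
        .unwrap();
        assert_eq!(config.update_format, AvroUpdateFormat::Raw);
        assert!(config.schema.is_some());
        assert!(config.skip_schema_id);
        // No registry settings were provided, so the flattened section stays empty.
        assert!(config.registry_config.registry_urls.is_empty());
    }
}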

/// Subject name strategies used in registering key and value schemas
/// with the schema registry.
#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)]
pub enum SubjectNameStrategy {
    /// The subject name is derived directly from the Kafka topic name.
    ///
    /// For update formats with both key and value components, use subject names
    /// `{topic_name}-key` and `{topic_name}-value` for the key and value schemas respectively.
    /// For update formats without a key (e.g., `raw`), publish the value schema
    /// under the subject name `{topic_name}`.
    ///
    /// Only applicable when using Kafka as a transport.
    #[serde(rename = "topic_name")]
    TopicName,
    /// The subject name is derived from the fully qualified name of the record.
    #[serde(rename = "record_name")]
    RecordName,
    /// Combines both the topic name and the record name to form the subject.
    ///
    /// For update formats with both key and value components, use subject names
    /// `{topic_name}-{record_name}-key` and `{topic_name}-{record_name}-value` for
    /// the key and value schemas respectively.
    /// For update formats without a key (e.g., `raw`), publish the value schema
    /// under the subject name `{topic_name}-{record_name}`.
    ///
    /// Here, `{record_name}` is the name of the SQL view that this connector
    /// is attached to.
    ///
    /// Only applicable when using Kafka as a transport.
    #[serde(rename = "topic_record_name")]
    TopicRecordName,
}
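
// Illustrative sketch of the subject-naming rules documented above. The helper
// below is hypothetical (it is not the connector's actual implementation); it
// merely restates how the value-schema subject is formed for each strategy,
// given a topic name, a record name, and whether the update format has a key.
#[cfg(test)]
mod subject_name_tests {
    use super::*;

    fn value_subject(strategy: &SubjectNameStrategy, topic: &str, record: &str, has_key: bool) -> String {
        match (strategy, has_key) {
            // Key-bearing formats add a `-value` suffix; key-less formats use the bare name.
            (SubjectNameStrategy::TopicName, true) => format!("{topic}-value"),
            (SubjectNameStrategy::TopicName, false) => topic.to_string(),
            // The record name strategy uses the record's fully qualified name directly.
            (SubjectNameStrategy::RecordName, _) => record.to_string(),
            (SubjectNameStrategy::TopicRecordName, true) => format!("{topic}-{record}-value"),
            (SubjectNameStrategy::TopicRecordName, false) => format!("{topic}-{record}"),
        }
    }

    #[test]
    fn documented_subject_names() {
        // `confluent_jdbc` has a key component, `raw` does not.
        assert_eq!(value_subject(&SubjectNameStrategy::TopicName, "orders", "order", true), "orders-value");
        assert_eq!(value_subject(&SubjectNameStrategy::TopicRecordName, "orders", "order", false), "orders-order");
        assert_eq!(value_subject(&SubjectNameStrategy::RecordName, "orders", "order", false), "order");
    }
}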

/// Avro output format configuration.
#[derive(Serialize, Deserialize, Debug, Default, ToSchema)]
#[serde(deny_unknown_fields)]
pub struct AvroEncoderConfig {
    /// Format used to encode data change events in this stream.
    ///
    /// The default value is 'raw'.
    #[serde(default)]
    pub update_format: AvroUpdateFormat,

    /// Avro schema used to encode output records.
    ///
    /// When specified, the encoder will use this schema; otherwise it will automatically
    /// generate an Avro schema based on the SQL view definition.
    ///
    /// Specified as a string containing the schema definition in JSON format.
    /// This schema must precisely match the SQL view definition, modulo
    /// nullability of columns.
    pub schema: Option<String>,

    /// Avro namespace for the generated Avro schemas.
    pub namespace: Option<String>,

    /// Subject name strategy used to publish the Avro schemas used by the connector
    /// to the schema registry.
    ///
    /// When this property is not specified, the connector chooses the subject name strategy automatically:
    /// * `topic_name` for the `confluent_jdbc` update format
    /// * `record_name` for the `raw` update format
    pub subject_name_strategy: Option<SubjectNameStrategy>,

    /// Set to `true` if serialized messages should only contain raw data
    /// without the header carrying schema ID.
    /// `false` by default.
    ///
    /// See <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
    #[serde(default)]
    pub skip_schema_id: bool,

    /// Schema registry configuration.
    ///
    /// When configured, the connector will push the Avro schema, whether it is specified as part of
    /// the connector configuration or generated automatically, to the schema registry and embed the
    /// schema id assigned by the registry in serialized messages.
    #[serde(flatten)]
    pub registry_config: AvroSchemaRegistryConfig,
}
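
// A hedged sketch of an encoder configuration written as JSON. The field names
// follow the struct above; the namespace and values are placeholders. Assumes
// `serde_json` is available as a dev-dependency.
#[cfg(test)]
mod encoder_config_tests {
    use super::*;

    #[test]
    fn encoder_config_from_json() {
        let config: AvroEncoderConfig = serde_json::from_str(
            r#"{
                "update_format": "confluent_jdbc",
                "namespace": "com.example.pipeline",
                "subject_name_strategy": "topic_name"
            }"#,
        )
        .unwrap();
        assert_eq!(config.update_format, AvroUpdateFormat::ConfluentJdbc);
        assert!(matches!(
            config.subject_name_strategy,
            Some(SubjectNameStrategy::TopicName)
        ));
        // Fields left out of the JSON take their defaults.
        assert!(!config.skip_schema_id);
        assert!(config.schema.is_none());
    }
}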