feldera_types/format/json.rs
1use serde::{Deserialize, Serialize};
2use std::fmt::Display;
3use utoipa::ToSchema;
4
/// JSON parser configuration.
///
/// Describes the shape of an input JSON stream.
///
/// The struct is annotated with `#[serde(default)]`, so any field omitted
/// from a serialized configuration takes its `Default` value.
///
/// # Examples
///
/// A configuration with `update_format="raw"` and `array=false`
/// is used to parse a stream of JSON objects without any envelope
/// that are inserted into the input table.
///
/// ```json
/// {"b": false, "i": 100, "s": "foo"}
/// {"b": true, "i": 5, "s": "bar"}
/// ```
///
/// A configuration with `update_format="insert_delete"` and
/// `array=false` is used to parse a stream of JSON data change events
/// in the insert/delete format:
///
/// ```json
/// {"delete": {"b": false, "i": 15, "s": ""}}
/// {"insert": {"b": false, "i": 100, "s": "foo"}}
/// ```
///
/// A configuration with `update_format="insert_delete"` and
/// `array=true` is used to parse a stream of JSON arrays
/// where each array contains multiple data change events in
/// the insert/delete format.
///
/// ```json
/// [{"insert": {"b": true, "i": 0}}, {"delete": {"b": false, "i": 100, "s": "foo"}}]
/// ```
#[derive(Clone, Debug, Default, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct JsonParserConfig {
    /// JSON update format.
    ///
    /// Defaults to `insert_delete`.
    pub update_format: JsonUpdateFormat,

    /// Specifies JSON encoding used for individual table records.
    ///
    /// Defaults to `default`.
    pub json_flavor: JsonFlavor,

    /// Set to `true` if updates in this stream are packaged into JSON arrays.
    ///
    /// Defaults to `false`.
    ///
    /// # Example
    ///
    /// ```json
    /// [{"b": true, "i": 0},{"b": false, "i": 100, "s": "foo"}]
    /// ```
    pub array: bool,

    /// Whether JSON elements can span multiple lines.
    ///
    /// This only affects JSON input. Defaults to `multiple`.
    pub lines: JsonLines,
}
60
/// Whether JSON values can span multiple lines.
///
/// Serialized as the string `"multiple"` or `"single"`; the default is
/// `"multiple"`.
#[derive(Deserialize, Serialize, Clone, Debug, Default, PartialEq, Eq, ToSchema)]
pub enum JsonLines {
    /// JSON values may span multiple lines.
    ///
    /// This supports general-purpose JSON input.
    #[default]
    #[serde(rename = "multiple")]
    Multiple,

    /// A given JSON value never contains a new-line.
    ///
    /// Suitable for parsing [NDJSON](https://github.com/ndjson/ndjson-spec).
    /// Lines are allowed to contain multiple JSON values. The parser ignores
    /// empty lines.
    #[serde(rename = "single")]
    Single,
}
79
/// Supported JSON data change event formats.
///
/// Each element in a JSON-formatted input stream specifies
/// an update to one or more records in an input table. We support
/// several different ways to represent such updates.
///
/// ### `InsertDelete`
///
/// Each element in the input stream consists of an "insert" or "delete"
/// command and a record to be inserted to or deleted from the input table.
///
/// ```json
/// {"insert": {"column1": "hello, world!", "column2": 100}}
/// ```
///
/// ### `Weighted`
///
/// Each element in the input stream consists of a record and a weight
/// which indicates how many times the row appears.
///
/// ```json
/// {"weight": 2, "data": {"column1": "hello, world!", "column2": 100}}
/// ```
///
/// Note that the line above would be equivalent to the following input in the `InsertDelete` format:
///
/// ```json
/// {"insert": {"column1": "hello, world!", "column2": 100}}
/// {"insert": {"column1": "hello, world!", "column2": 100}}
/// ```
///
/// Similarly, negative weights are equivalent to deletions:
///
/// ```json
/// {"weight": -1, "data": {"column1": "hello, world!", "column2": 100}}
/// ```
///
/// is equivalent to the following in the `InsertDelete` format:
///
/// ```json
/// {"delete": {"column1": "hello, world!", "column2": 100}}
/// ```
///
/// ### `Debezium`
///
/// Debezium CDC format. Refer to [Debezium input connector documentation](https://docs.feldera.com/connectors/sources/debezium) for details.
///
/// ### `Snowflake`
///
/// Uses flat structure so that fields can get parsed directly into SQL
/// columns. Defines three metadata fields:
///
/// * `__action` - "insert" or "delete"
/// * `__stream_id` - unique 64-bit ID of the output stream (records within
///   a stream are totally ordered)
/// * `__seq_number` - monotonically increasing sequence number relative to
///   the start of the stream.
///
/// ```json
/// {"PART":1,"VENDOR":2,"EFFECTIVE_SINCE":"2019-05-21","PRICE":"10000","__action":"insert","__stream_id":4523666124030717756,"__seq_number":1}
/// ```
///
/// ### `Raw`
///
/// This format is suitable for insert-only streams (no deletions).
/// Each element in the input stream contains a record without any
/// additional envelope that gets inserted in the input table.
#[derive(Deserialize, Serialize, Clone, Debug, Default, PartialEq, Eq, ToSchema)]
pub enum JsonUpdateFormat {
    /// Insert/delete envelope around each record (the default).
    /// Serialized as `"insert_delete"`.
    #[default]
    #[serde(rename = "insert_delete")]
    InsertDelete,

    /// Record paired with an integer weight. Serialized as `"weighted"`.
    #[serde(rename = "weighted")]
    Weighted,

    /// Debezium CDC format. Serialized as `"debezium"`.
    #[serde(rename = "debezium")]
    Debezium,

    /// Flat records carrying `__action`, `__stream_id` and `__seq_number`
    /// metadata fields. Serialized as `"snowflake"`.
    #[serde(rename = "snowflake")]
    Snowflake,

    /// Bare records without any envelope; insert-only.
    /// Serialized as `"raw"`.
    #[serde(rename = "raw")]
    Raw,

    /// Serialized as `"redis"`.
    ///
    /// NOTE(review): this variant is not described in the type-level
    /// documentation above; presumably it is the format used with Redis.
    /// Confirm against the connector that uses it.
    #[serde(rename = "redis")]
    Redis,
}
168
169impl Display for JsonUpdateFormat {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 match self {
172 JsonUpdateFormat::InsertDelete => write!(f, "insert_delete"),
173 JsonUpdateFormat::Weighted => write!(f, "weighted"),
174 JsonUpdateFormat::Debezium => write!(f, "debezium"),
175 JsonUpdateFormat::Snowflake => write!(f, "snowflake"),
176 JsonUpdateFormat::Raw => write!(f, "raw"),
177 JsonUpdateFormat::Redis => write!(f, "redis"),
178 }
179 }
180}
181
/// Specifies the JSON encoding used for table records.
///
/// The default flavor is `Default`.
#[derive(Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
pub enum JsonFlavor {
    /// Default encoding used by Feldera, documented
    /// [here](https://docs.feldera.com/formats/json#types).
    #[default]
    #[serde(rename = "default")]
    Default,
    /// Debezium MySQL JSON produced by the default configuration of the
    /// Debezium [Kafka Connect connector](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types)
    /// with `decimal.handling.mode` set to "string".
    #[serde(rename = "debezium_mysql")]
    DebeziumMySql,
    /// Debezium Postgres JSON produced by the default configuration of the
    /// Debezium [Kafka Connect connector](https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types)
    /// with `decimal.handling.mode` set to "string".
    #[serde(rename = "debezium_postgres")]
    DebeziumPostgres,
    /// JSON format accepted by Snowflake using default settings.
    #[serde(rename = "snowflake")]
    Snowflake,
    /// JSON format accepted by the Kafka Connect `JsonConverter` class.
    #[serde(rename = "kafka_connect_json_converter")]
    KafkaConnectJsonConverter,
    /// Serialized as `"pandas"`.
    ///
    /// NOTE(review): undocumented in the original; presumably JSON that is
    /// compatible with the pandas library. Confirm with the encoder/decoder
    /// that handles this flavor.
    #[serde(rename = "pandas")]
    Pandas,
    /// JSON format used to represent blockchain data.
    /// Uses Base58 encoding for binary data such as Bitcoin addresses.
    #[serde(rename = "blockchain")]
    Blockchain,
    /// Parquet to-json format.
    /// (For internal use only)
    ///
    /// Marked `#[serde(skip)]`, so this variant is never serialized or
    /// deserialized.
    #[serde(skip)]
    ParquetConverter,
    /// Used by the clock input connector.
    ///
    /// NOTE(review): unlike the other variants, this one has no
    /// `#[serde(rename = ...)]`, so serde uses the variant name
    /// `ClockInput` verbatim. Confirm whether that is intentional before
    /// changing it, since a rename would alter the wire format.
    ClockInput,
    /// Datagen format.
    /// (For internal use only)
    #[serde(rename = "datagen")]
    Datagen,
    /// Postgres format.
    /// (For internal use only)
    #[serde(rename = "postgres")]
    Postgres,
}
227
228// TODO: support multiple update formats, e.g., `WeightedUpdate`
229// supports arbitrary weights beyond `MAX_DUPLICATES`.
230#[derive(Deserialize, Serialize, ToSchema)]
231#[serde(default)]
232pub struct JsonEncoderConfig {
233 pub update_format: JsonUpdateFormat,
234 pub json_flavor: Option<JsonFlavor>,
235 pub buffer_size_records: usize,
236 pub array: bool,
237
238 /// When this option is set, only the listed fields appear in the Debezium message key.
239 ///
240 /// This option is useful when writing to a table with primary keys.
241 /// For such tables, Debezium expects the message key to contain only
242 /// the primary key columns.
243 ///
244 /// This option is only valid with the `debezium` update format.
245 pub key_fields: Option<Vec<String>>,
246}
247
248impl Default for JsonEncoderConfig {
249 fn default() -> Self {
250 Self {
251 update_format: JsonUpdateFormat::default(),
252 json_flavor: None,
253 buffer_size_records: 10_000,
254 array: false,
255 key_fields: None,
256 }
257 }
258}