feldera_types/format/
json.rs

1use serde::{Deserialize, Serialize};
2use std::fmt::Display;
3use utoipa::ToSchema;
4
/// JSON parser configuration.
///
/// Describes the shape of an input JSON stream.
///
/// Every field is optional when deserializing: a missing field takes the
/// `Default` value of its type, i.e. `update_format` is `insert_delete`,
/// `json_flavor` is `default`, `array` is `false` and `lines` is `multiple`.
///
/// # Examples
///
/// A configuration with `update_format="raw"` and `array=false`
/// is used to parse a stream of JSON objects without any envelope
/// that get inserted into the input table.
///
/// ```json
/// {"b": false, "i": 100, "s": "foo"}
/// {"b": true, "i": 5, "s": "bar"}
/// ```
///
/// A configuration with `update_format="insert_delete"` and
/// `array=false` is used to parse a stream of JSON data change events
/// in the insert/delete format:
///
/// ```json
/// {"delete": {"b": false, "i": 15, "s": ""}}
/// {"insert": {"b": false, "i": 100, "s": "foo"}}
/// ```
///
/// A configuration with `update_format="insert_delete"` and
/// `array=true` is used to parse a stream of JSON arrays
/// where each array contains multiple data change events in
/// the insert/delete format.
///
/// ```json
/// [{"insert": {"b": true, "i": 0}}, {"delete": {"b": false, "i": 100, "s": "foo"}}]
/// ```
#[derive(Clone, Debug, Default, Deserialize, Serialize, ToSchema)]
#[serde(default)]
pub struct JsonParserConfig {
    /// JSON update format.
    ///
    /// Defaults to `insert_delete`.
    pub update_format: JsonUpdateFormat,

    /// Specifies JSON encoding used for individual table records.
    ///
    /// Defaults to `default`.
    pub json_flavor: JsonFlavor,

    /// Set to `true` if updates in this stream are packaged into JSON arrays.
    ///
    /// Defaults to `false`.
    ///
    /// # Example
    ///
    /// ```json
    /// [{"b": true, "i": 0},{"b": false, "i": 100, "s": "foo"}]
    /// ```
    pub array: bool,

    /// Whether JSON elements can span multiple lines.
    ///
    /// This only affects JSON input.  Defaults to `multiple`.
    pub lines: JsonLines,
}
60
/// Whether JSON values can span multiple lines.
///
/// Serialized as the string `"multiple"` (the default) or `"single"`.
#[derive(Deserialize, Serialize, Clone, Debug, Default, PartialEq, Eq, ToSchema)]
pub enum JsonLines {
    /// JSON values may span multiple lines.
    ///
    /// This supports general-purpose JSON input.  This is the default.
    #[default]
    #[serde(rename = "multiple")]
    Multiple,

    /// A given JSON value never contains a new-line.
    ///
    /// Suitable for parsing [NDJSON](https://github.com/ndjson/ndjson-spec).
    /// Lines are allowed to contain multiple JSON values.  The parser ignores
    /// empty lines.
    #[serde(rename = "single")]
    Single,
}
79
/// Supported JSON data change event formats.
///
/// Each element in a JSON-formatted input stream specifies
/// an update to one or more records in an input table.  We support
/// several different ways to represent such updates.
///
/// ### `InsertDelete`
///
/// Each element in the input stream consists of an "insert" or "delete"
/// command and a record to be inserted to or deleted from the input table.
///
/// ```json
/// {"insert": {"column1": "hello, world!", "column2": 100}}
/// ```
///
/// ### `Weighted`
///
/// Each element in the input stream consists of a record and a weight
/// which indicates how many times the row appears.
///
/// ```json
/// {"weight": 2, "data": {"column1": "hello, world!", "column2": 100}}
/// ```
///
/// Note that the line above would be equivalent to the following input in the `InsertDelete` format:
///
/// ```json
/// {"insert": {"column1": "hello, world!", "column2": 100}}
/// {"insert": {"column1": "hello, world!", "column2": 100}}
/// ```
///
/// Similarly, negative weights are equivalent to deletions:
///
/// ```json
/// {"weight": -1, "data": {"column1": "hello, world!", "column2": 100}}
/// ```
///
/// is equivalent to the following in the `InsertDelete` format:
///
/// ```json
/// {"delete": {"column1": "hello, world!", "column2": 100}}
/// ```
///
/// ### `Debezium`
///
/// Debezium CDC format.  Refer to [Debezium input connector documentation](https://docs.feldera.com/connectors/sources/debezium) for details.
///
/// ### `Snowflake`
///
/// Uses flat structure so that fields can get parsed directly into SQL
/// columns.  Defines three metadata fields:
///
/// * `__action` - "insert" or "delete"
/// * `__stream_id` - unique 64-bit ID of the output stream (records within
///   a stream are totally ordered)
/// * `__seq_number` - monotonically increasing sequence number relative to
///   the start of the stream.
///
/// ```json
/// {"PART":1,"VENDOR":2,"EFFECTIVE_SINCE":"2019-05-21","PRICE":"10000","__action":"insert","__stream_id":4523666124030717756,"__seq_number":1}
/// ```
///
/// ### `Raw`
///
/// This format is suitable for insert-only streams (no deletions).
/// Each element in the input stream contains a record without any
/// additional envelope that gets inserted in the input table.
#[derive(Deserialize, Serialize, Clone, Debug, Default, PartialEq, Eq, ToSchema)]
pub enum JsonUpdateFormat {
    /// Insert/delete envelope around each record.  This is the default format.
    #[default]
    #[serde(rename = "insert_delete")]
    InsertDelete,

    /// Record accompanied by a weight.
    #[serde(rename = "weighted")]
    Weighted,

    /// Debezium CDC format.
    #[serde(rename = "debezium")]
    Debezium,

    /// Flat records carrying the `__action`, `__stream_id` and
    /// `__seq_number` metadata fields.
    #[serde(rename = "snowflake")]
    Snowflake,

    /// Bare records without any envelope; insert-only.
    #[serde(rename = "raw")]
    Raw,

    // NOTE(review): unlike the other variants, `Redis` has no section in the
    // type-level documentation describing its shape -- add one.
    #[serde(rename = "redis")]
    Redis,
}
168
169impl Display for JsonUpdateFormat {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        match self {
172            JsonUpdateFormat::InsertDelete => write!(f, "insert_delete"),
173            JsonUpdateFormat::Weighted => write!(f, "weighted"),
174            JsonUpdateFormat::Debezium => write!(f, "debezium"),
175            JsonUpdateFormat::Snowflake => write!(f, "snowflake"),
176            JsonUpdateFormat::Raw => write!(f, "raw"),
177            JsonUpdateFormat::Redis => write!(f, "redis"),
178        }
179    }
180}
181
/// Specifies JSON encoding used for table records.
#[derive(Clone, Default, Deserialize, Serialize, Debug, PartialEq, Eq, ToSchema)]
pub enum JsonFlavor {
    /// Default encoding used by Feldera, documented
    /// [here](https://docs.feldera.com/formats/json#types).
    #[default]
    #[serde(rename = "default")]
    Default,
    /// Debezium MySQL JSON produced by the default configuration of the
    /// Debezium [Kafka Connect connector](https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-data-types)
    /// with `decimal.handling.mode` set to "string".
    #[serde(rename = "debezium_mysql")]
    DebeziumMySql,
    /// Debezium Postgres JSON produced by the default configuration of the
    /// Debezium [Kafka Connect connector](https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-data-types)
    /// with `decimal.handling.mode` set to "string".
    #[serde(rename = "debezium_postgres")]
    DebeziumPostgres,
    /// JSON format accepted by Snowflake using default settings.
    #[serde(rename = "snowflake")]
    Snowflake,
    /// JSON format accepted by the Kafka Connect `JsonConverter` class.
    #[serde(rename = "kafka_connect_json_converter")]
    KafkaConnectJsonConverter,
    // NOTE(review): this variant is undocumented; presumably the JSON
    // encoding exchanged with the Python pandas library -- confirm and
    // add a doc comment.
    #[serde(rename = "pandas")]
    Pandas,
    /// JSON format used to represent blockchain data.
    /// Uses Base58 encoding for binary data such as Bitcoin addresses.
    #[serde(rename = "blockchain")]
    Blockchain,
    /// JSON format with "0x"-prefixed hexadecimal encoding for binary data.
    #[serde(rename = "c_hex")]
    CHex,
    /// Parquet to-json format.
    /// (For internal use only)
    // `#[serde(skip)]`: serde never serializes or deserializes this variant.
    #[serde(skip)]
    ParquetConverter,
    /// Used by the clock input connector.
    // NOTE(review): this is the only serializable variant without a
    // `#[serde(rename = "...")]`, so serde names it `ClockInput` rather than
    // a snake_case string like its siblings -- confirm this is intended
    // before relying on (or changing) the wire name.
    ClockInput,
    /// Datagen format.
    /// (For internal use only)
    #[serde(rename = "datagen")]
    Datagen,
    /// Postgres format.
    /// (For internal use only)
    #[serde(rename = "postgres")]
    Postgres,
}
230
231// TODO: support multiple update formats, e.g., `WeightedUpdate`
232// supports arbitrary weights beyond `MAX_DUPLICATES`.
233#[derive(Deserialize, Serialize, ToSchema)]
234#[serde(default)]
235pub struct JsonEncoderConfig {
236    pub update_format: JsonUpdateFormat,
237    pub json_flavor: Option<JsonFlavor>,
238    pub buffer_size_records: usize,
239    pub array: bool,
240
241    /// When this option is set, only the listed fields appear in the Debezium message key.
242    ///
243    /// This option is useful when writing to a table with primary keys.
244    /// For such tables, Debezium expects the message key to contain only
245    /// the primary key columns.
246    ///
247    /// This option is only valid with the `debezium` update format.
248    pub key_fields: Option<Vec<String>>,
249}
250
251impl Default for JsonEncoderConfig {
252    fn default() -> Self {
253        Self {
254            update_format: JsonUpdateFormat::default(),
255            json_flavor: None,
256            buffer_size_records: 10_000,
257            array: false,
258            key_fields: None,
259        }
260    }
261}