1use serde::{Deserialize, Serialize};
2
3use crate::schema::JsonSchema;
4
5#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
6#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
7pub enum AirbyteMessage {
8 Catalog {
9 catalog: AirbyteCatalog,
10 },
11 Log {
12 log: AirbyteLogMessage,
13 },
14 Trace {
15 trace: AirbyteTraceMessage,
16 },
17 Spec {
18 spec: ConnectorSpecification,
19 },
20 Control {
21 control: ConnectorSpecification,
22 },
23 ConnectionStatus {
24 #[serde(rename = "connectionStatus")]
25 connection_status: AirbyteConnectionStatus,
26 },
27 State {
28 state: AirbyteStateMessage,
29 },
30 Record {
31 record: AirbyteRecordMessage,
32 },
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36pub struct AirbyteRecordMessage {
37 pub data: serde_json::Value,
39 pub emitted_at: i64,
41 #[serde(skip_serializing_if = "Option::is_none")]
43 pub meta: Option<AirbyteRecordMessageMeta>,
44 #[serde(skip_serializing_if = "Option::is_none")]
46 pub namespace: Option<String>,
47 pub stream: String,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
52pub struct AirbyteRecordMessageMeta {
53 #[serde(skip_serializing_if = "Option::is_none")]
55 pub changes: Option<Vec<AirbyteRecordMessageMetaChange>>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
59pub struct AirbyteRecordMessageMetaChange {
60 pub change: String,
62 pub field: String,
64 pub reason: String,
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
76#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
77pub enum AirbyteStateMessage {
78 Global {
79 global: AirbyteGlobalState,
80 #[serde(rename = "destinationStats", skip_serializing_if = "Option::is_none")]
81 destination_stats: Option<AirbyteStateStats>,
82 #[serde(rename = "sourceStats", skip_serializing_if = "Option::is_none")]
83 source_stats: Option<AirbyteStateStats>,
84 },
85 Stream {
86 stream: AirbyteStreamState,
87 #[serde(rename = "destinationStats", skip_serializing_if = "Option::is_none")]
88 destination_stats: Option<AirbyteStateStats>,
89 #[serde(rename = "sourceStats", skip_serializing_if = "Option::is_none")]
90 source_stats: Option<AirbyteStateStats>,
91 },
92 Legacy {
93 data: serde_json::Value,
94 #[serde(rename = "destinationStats", skip_serializing_if = "Option::is_none")]
95 destination_stats: Option<AirbyteStateStats>,
96 #[serde(rename = "sourceStats", skip_serializing_if = "Option::is_none")]
97 source_stats: Option<AirbyteStateStats>,
98 },
99 #[serde(untagged)]
100 Empty {
101 data: serde_json::Value,
102 #[serde(rename = "destinationStats", skip_serializing_if = "Option::is_none")]
103 destination_stats: Option<AirbyteStateStats>,
104 #[serde(rename = "sourceStats", skip_serializing_if = "Option::is_none")]
105 source_stats: Option<AirbyteStateStats>,
106 },
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
110pub struct AirbyteGlobalState {
111 #[serde(skip_serializing_if = "Option::is_none")]
112 pub shared_state: Option<serde_json::Value>,
113 pub stream_states: Vec<AirbyteStreamState>,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
117pub struct AirbyteStreamState {
118 pub stream_descriptor: StreamDescriptor,
119 #[serde(skip_serializing_if = "Option::is_none")]
120 pub stream_state: Option<serde_json::Value>,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
124pub struct StreamDescriptor {
125 pub name: String,
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub namespace: Option<String>,
128}
129
130impl StreamDescriptor {
131 pub fn new(name: &str, namespace: Option<&str>) -> Self {
132 Self {
133 name: name.to_owned(),
134 namespace: namespace.map(ToOwned::to_owned),
135 }
136 }
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
140pub struct AirbyteStateStats {
141 #[serde(skip_serializing_if = "Option::is_none")]
146 #[serde(rename = "recordCount")]
147 pub record_count: Option<f64>,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
151pub struct AirbyteCatalog {
153 pub streams: Vec<AirbyteStream>,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
157pub struct AirbyteStream {
158 pub name: String,
160 pub json_schema: JsonSchema,
162 #[serde(skip_serializing_if = "Option::is_none")]
166 pub default_cursor_field: Option<Vec<String>>,
167 #[serde(skip_serializing_if = "Option::is_none")]
172 pub is_resumable: Option<bool>,
173 #[serde(skip_serializing_if = "Option::is_none")]
177 pub namespace: Option<String>,
178 #[serde(skip_serializing_if = "Option::is_none")]
184 pub source_defined_cursor: Option<bool>,
185 #[serde(skip_serializing_if = "Option::is_none")]
189 pub source_defined_primary_key: Option<Vec<Vec<String>>>,
190 pub supported_sync_modes: Vec<SyncMode>,
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
195pub struct ConfiguredAirbyteCatalog {
197 pub streams: Vec<ConfiguredAirbyteStream>,
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
201pub struct ConfiguredAirbyteStream {
202 #[serde(skip_serializing_if = "Option::is_none")]
205 pub cursor_field: Option<Vec<String>>,
206 pub destination_sync_mode: DestinationSyncMode,
207 #[serde(skip_serializing_if = "Option::is_none")]
213 pub generation_id: Option<i64>,
214 #[serde(skip_serializing_if = "Option::is_none")]
221 pub minimum_generation_id: Option<i64>,
222 #[serde(skip_serializing_if = "Option::is_none")]
225 pub primary_key: Option<Vec<Vec<String>>>,
226 pub stream: AirbyteStream,
227 #[serde(skip_serializing_if = "Option::is_none")]
233 pub sync_id: Option<i64>,
234 pub sync_mode: SyncMode,
235}
236
237#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
238pub struct AirbyteConnectionStatus {
240 #[serde(skip_serializing_if = "Option::is_none")]
241 pub message: Option<String>,
242 pub status: String,
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
246#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
247pub enum AirbyteControlMessage {
248 ConnectorConfig {
249 connector_config: AirbyteControlConnectorConfigMessage,
250 emitted_at: f64,
251 },
252}
253
254#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
255pub struct AirbyteControlConnectorConfigMessage {
256 pub config: serde_json::Value,
258}
259
260#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
262pub struct ConnectorSpecification {
263 #[serde(skip_serializing_if = "Option::is_none")]
273 pub advanced_auth: Option<ConnectorSpecificationAdvancedAuth>,
274 #[serde(skip_serializing_if = "Option::is_none")]
275 #[serde(rename = "changelogUrl")]
276 pub changelog_url: Option<String>,
277 #[serde(rename = "connectionSpecification")]
279 pub connection_specification: serde_json::Value,
280 #[serde(skip_serializing_if = "Option::is_none")]
281 #[serde(rename = "documentationUrl")]
282 pub documentation_url: Option<String>,
283 #[serde(skip_serializing_if = "Option::is_none")]
285 pub protocol_version: Option<String>,
286 #[serde(skip_serializing_if = "Option::is_none")]
288 pub supported_destination_sync_modes: Option<Vec<DestinationSyncMode>>,
289 #[serde(skip_serializing_if = "Option::is_none")]
291 #[serde(rename = "supportsDBT")]
292 pub supports_dbt: Option<bool>,
293 #[serde(skip_serializing_if = "Option::is_none")]
295 #[serde(rename = "supportsIncremental")]
296 pub supports_incremental: Option<bool>,
297 #[serde(skip_serializing_if = "Option::is_none")]
299 #[serde(rename = "supportsNormalization")]
300 pub supports_normalization: Option<bool>,
301}
302
303#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
304pub struct ConnectorSpecificationAdvancedAuth {
305 #[serde(skip_serializing_if = "Option::is_none")]
306 pub auth_flow_type: Option<String>,
307 #[serde(skip_serializing_if = "Option::is_none")]
308 pub oauth_config_specification: Option<OauthConfigSpecification>,
309 #[serde(skip_serializing_if = "Option::is_none")]
312 pub predicate_key: Option<Vec<String>>,
313 #[serde(skip_serializing_if = "Option::is_none")]
315 pub predicate_value: Option<String>,
316}
317
318#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
319#[serde(rename = "OAuthConfigSpecification")]
320pub struct OauthConfigSpecification {
321 #[serde(skip_serializing_if = "Option::is_none")]
330 pub complete_oauth_output_specification: Option<serde_json::Value>,
331 #[serde(skip_serializing_if = "Option::is_none")]
338 pub complete_oauth_server_input_specification: Option<serde_json::Value>,
339 #[serde(skip_serializing_if = "Option::is_none")]
356 pub complete_oauth_server_output_specification: Option<serde_json::Value>,
357 #[serde(skip_serializing_if = "Option::is_none")]
367 pub oauth_user_input_from_connector_config_specification: Option<serde_json::Value>,
368}
369
370#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
371pub enum DestinationSyncMode {
372 #[serde(rename = "append")]
373 Append,
374 #[serde(rename = "overwrite")]
375 Overwrite,
376 #[serde(rename = "append_dedup")]
377 AppendDedup,
378}
379
380#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
381pub struct AirbyteLogMessage {
382 pub level: String,
384 pub message: String,
386 #[serde(skip_serializing_if = "Option::is_none")]
388 pub stack_trace: Option<String>,
389}
390
391#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
392pub enum SyncMode {
393 #[default]
394 #[serde(rename = "full_refresh")]
395 FullRefresh,
396 #[serde(rename = "incremental")]
397 Incremental,
398}
399
400#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
401#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
402pub enum AirbyteTraceMessage {
403 Analytics {
404 analytics: AirbyteAnalyticsTraceMessage,
405 emited_at: f64,
406 },
407 Error {
408 error: AirbyteErrorTraceMessage,
409 emitted_at: f64,
410 },
411 Estimate {
412 estimate: AirbyteEstimateTraceMessage,
413 emitted_at: f64,
414 },
415 StreamStatus {
416 stream_status: AirbyteStreamStatusTraceMessage,
417 emitted_at: f64,
418 },
419}
420
421#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
422pub struct AirbyteStreamStatusTraceMessage {
423 #[serde(skip_serializing_if = "Option::is_none")]
425 pub reasons: Option<Vec<AirbyteStreamStatusReason>>,
426 pub status: AirbyteStreamStatus,
428 pub stream_descriptor: StreamDescriptor,
430}
431
432#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
435#[serde(rename_all = "UPPERCASE")]
436pub enum AirbyteStreamStatus {
437 Started,
438 Running,
439 Complete,
440 Incomplete,
441}
442
443#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
446pub struct AirbyteStreamStatusReason {
447 #[serde(skip_serializing_if = "Option::is_none")]
448 pub rate_limited: Option<AirbyteStreamStatusRateLimitedReason>,
449 #[serde(rename = "type")]
450 pub r#type: AirbyteStreamStatusReasonType,
451}
452
453#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
455pub struct AirbyteStreamStatusRateLimitedReason {
456 #[serde(skip_serializing_if = "Option::is_none")]
458 pub quota_reset: Option<i64>,
459}
460
461#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
464pub enum AirbyteStreamStatusReasonType {
465 #[serde(rename = "RATE_LIMITED")]
466 RateLimited,
467}
468
469#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
470pub struct AirbyteEstimateTraceMessage {
471 #[serde(skip_serializing_if = "Option::is_none")]
473 pub byte_estimate: Option<i64>,
474 pub name: String,
476 #[serde(skip_serializing_if = "Option::is_none")]
478 pub namespace: Option<String>,
479 #[serde(skip_serializing_if = "Option::is_none")]
481 pub row_estimate: Option<i64>,
482}
483
484#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
491pub struct AirbyteAnalyticsTraceMessage {
492 #[serde(skip_serializing_if = "Option::is_none")]
496 pub value: Option<String>,
497}
498
499#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
500pub struct AirbyteErrorTraceMessage {
501 #[serde(skip_serializing_if = "Option::is_none")]
503 pub failure_type: Option<String>,
504 #[serde(skip_serializing_if = "Option::is_none")]
506 pub internal_message: Option<String>,
507 pub message: String,
509 #[serde(skip_serializing_if = "Option::is_none")]
511 pub stack_trace: Option<String>,
512 #[serde(skip_serializing_if = "Option::is_none")]
514 pub stream_descriptor: Option<StreamDescriptor>,
515}
516
#[cfg(test)]
pub mod tests {

    use serde_json::json;

    use crate::message::{AirbyteMessage, AirbyteRecordMessage};

    /// A `RECORD` envelope that carries neither `meta` nor `namespace` must
    /// deserialize into `AirbyteMessage::Record` with both fields `None`.
    #[test]
    fn test_record() {
        let raw = r#"{"type": "RECORD", "record": { "stream": "users", "data": {"id": 1, "name": "Chris"}, "emitted_at": 1}}"#;

        let want = AirbyteMessage::Record {
            record: AirbyteRecordMessage {
                data: json!({"id": 1, "name": "Chris"}),
                emitted_at: 1,
                meta: None,
                namespace: None,
                stream: String::from("users"),
            },
        };

        let got = serde_json::from_str::<AirbyteMessage>(raw).unwrap();

        assert_eq!(got, want);
    }
}