airbyte_protocol/
message.rs

1use serde::{Deserialize, Serialize};
2
3use crate::schema::JsonSchema;
4
5#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
6#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
7pub enum AirbyteMessage {
8    Catalog {
9        catalog: AirbyteCatalog,
10    },
11    Log {
12        log: AirbyteLogMessage,
13    },
14    Trace {
15        trace: AirbyteTraceMessage,
16    },
17    Spec {
18        spec: ConnectorSpecification,
19    },
20    Control {
21        control: ConnectorSpecification,
22    },
23    ConnectionStatus {
24        #[serde(rename = "connectionStatus")]
25        connection_status: AirbyteConnectionStatus,
26    },
27    State {
28        state: AirbyteStateMessage,
29    },
30    Record {
31        record: AirbyteRecordMessage,
32    },
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36pub struct AirbyteRecordMessage {
37    /// record data
38    pub data: serde_json::Value,
39    /// when the data was emitted from the source. epoch in millisecond.
40    pub emitted_at: i64,
41    /// Information about this record added mid-sync
42    #[serde(skip_serializing_if = "Option::is_none")]
43    pub meta: Option<AirbyteRecordMessageMeta>,
44    /// namespace the data is associated with
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub namespace: Option<String>,
47    /// stream the data is associated with
48    pub stream: String,
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
52pub struct AirbyteRecordMessageMeta {
53    /// Lists of changes to the content of this record which occurred during syncing
54    #[serde(skip_serializing_if = "Option::is_none")]
55    pub changes: Option<Vec<AirbyteRecordMessageMetaChange>>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
59pub struct AirbyteRecordMessageMetaChange {
60    /// The type of change that occurred
61    pub change: String,
62    /// The field that had the change occur (required)
63    pub field: String,
64    /// The reason that the change occurred
65    pub reason: String,
66}
67
68/// The type of state the other fields represent. Is set to LEGACY, the state data should be read
69/// from the `data` field for backwards compatibility. If not set, assume the state object is type
70/// LEGACY. GLOBAL means that the state should be read from `global` and means that it represents
71/// the state for all the streams. It contains one shared state and individual stream states.
72/// PER_STREAM means that the state should be read from `stream`. The state present in this field
73/// correspond to the isolated state of the associated stream description.
74///
75#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
76#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
77pub enum AirbyteStateMessage {
78    Global {
79        global: AirbyteGlobalState,
80        #[serde(rename = "destinationStats", skip_serializing_if = "Option::is_none")]
81        destination_stats: Option<AirbyteStateStats>,
82        #[serde(rename = "sourceStats", skip_serializing_if = "Option::is_none")]
83        source_stats: Option<AirbyteStateStats>,
84    },
85    Stream {
86        stream: AirbyteStreamState,
87        #[serde(rename = "destinationStats", skip_serializing_if = "Option::is_none")]
88        destination_stats: Option<AirbyteStateStats>,
89        #[serde(rename = "sourceStats", skip_serializing_if = "Option::is_none")]
90        source_stats: Option<AirbyteStateStats>,
91    },
92    Legacy {
93        data: serde_json::Value,
94        #[serde(rename = "destinationStats", skip_serializing_if = "Option::is_none")]
95        destination_stats: Option<AirbyteStateStats>,
96        #[serde(rename = "sourceStats", skip_serializing_if = "Option::is_none")]
97        source_stats: Option<AirbyteStateStats>,
98    },
99    #[serde(untagged)]
100    Empty {
101        data: serde_json::Value,
102        #[serde(rename = "destinationStats", skip_serializing_if = "Option::is_none")]
103        destination_stats: Option<AirbyteStateStats>,
104        #[serde(rename = "sourceStats", skip_serializing_if = "Option::is_none")]
105        source_stats: Option<AirbyteStateStats>,
106    },
107}
108
109#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
110pub struct AirbyteGlobalState {
111    #[serde(skip_serializing_if = "Option::is_none")]
112    pub shared_state: Option<serde_json::Value>,
113    pub stream_states: Vec<AirbyteStreamState>,
114}
115
116#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
117pub struct AirbyteStreamState {
118    pub stream_descriptor: StreamDescriptor,
119    #[serde(skip_serializing_if = "Option::is_none")]
120    pub stream_state: Option<serde_json::Value>,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
124pub struct StreamDescriptor {
125    pub name: String,
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub namespace: Option<String>,
128}
129
130impl StreamDescriptor {
131    pub fn new(name: &str, namespace: Option<&str>) -> Self {
132        Self {
133            name: name.to_owned(),
134            namespace: namespace.map(ToOwned::to_owned),
135        }
136    }
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
140pub struct AirbyteStateStats {
141    /// the number of records which were emitted for this state message, for this stream or global.
142    /// While the value should always be a round number, it is defined as a double to account for
143    /// integer overflows, and the value should always have a decimal point for proper
144    /// serialization.
145    #[serde(skip_serializing_if = "Option::is_none")]
146    #[serde(rename = "recordCount")]
147    pub record_count: Option<f64>,
148}
149
150#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
151/// Airbyte stream schema catalog
152pub struct AirbyteCatalog {
153    pub streams: Vec<AirbyteStream>,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
157pub struct AirbyteStream {
158    /// Stream's name.
159    pub name: String,
160    /// Stream schema using Json Schema specs.
161    pub json_schema: JsonSchema,
162    /// Path to the field that will be used to determine if a record is new or modified since the
163    /// last sync. If not provided by the source, the end user will have to specify the comparable
164    /// themselves.
165    #[serde(skip_serializing_if = "Option::is_none")]
166    pub default_cursor_field: Option<Vec<String>>,
167    /// If the stream is resumable or not. Should be set to true if the stream supports
168    /// incremental. Defaults to false.
169    /// Primarily used by the Platform in Full Refresh to determine if a Full Refresh stream should
170    /// actually be treated as incremental within a job.
171    #[serde(skip_serializing_if = "Option::is_none")]
172    pub is_resumable: Option<bool>,
173    /// Optional Source-defined namespace. Currently only used by JDBC destinations to determine
174    /// what schema to write to. Airbyte streams from the same sources should have the same
175    /// namespace.
176    #[serde(skip_serializing_if = "Option::is_none")]
177    pub namespace: Option<String>,
178    /// If the source defines the cursor field, then any other cursor field inputs will be ignored.
179    /// If it does not,
180    /// either the user_provided one is used, or the default one is used as a backup. This field
181    /// must be set if
182    /// is_resumable is set to true, including resumable full refresh synthetic cursors.
183    #[serde(skip_serializing_if = "Option::is_none")]
184    pub source_defined_cursor: Option<bool>,
185    /// If the source defines the primary key, paths to the fields that will be used as a primary
186    /// key. If not provided by the source, the end user will have to specify the primary key
187    /// themselves.
188    #[serde(skip_serializing_if = "Option::is_none")]
189    pub source_defined_primary_key: Option<Vec<Vec<String>>>,
190    /// List of sync modes supported by this stream.
191    pub supported_sync_modes: Vec<SyncMode>,
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
195/// Airbyte stream schema catalog
196pub struct ConfiguredAirbyteCatalog {
197    pub streams: Vec<ConfiguredAirbyteStream>,
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
201pub struct ConfiguredAirbyteStream {
202    /// Path to the field that will be used to determine if a record is new or modified since the
203    /// last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.
204    #[serde(skip_serializing_if = "Option::is_none")]
205    pub cursor_field: Option<Vec<String>>,
206    pub destination_sync_mode: DestinationSyncMode,
207    /// Monotically increasing numeric id representing the current generation of a stream. This id
208    /// can be shared across syncs.
209    /// If this is null, it means that the platform is not supporting the refresh and it is
210    /// expected that no extra id will be added to the records and no data from previous generation
211    /// will be cleanup.
212    #[serde(skip_serializing_if = "Option::is_none")]
213    pub generation_id: Option<i64>,
214    /// The minimum generation id which is needed in a stream. If it is present, the destination
215    /// will try to delete the data that are part of a generation lower than this property. If the
216    /// minimum generation is equals to 0, no data deletion is expected from the destiantion
217    /// If this is null, it means that the platform is not supporting the refresh and it is
218    /// expected that no extra id will be added to the records and no data from previous generation
219    /// will be cleanup.
220    #[serde(skip_serializing_if = "Option::is_none")]
221    pub minimum_generation_id: Option<i64>,
222    /// Paths to the fields that will be used as primary key. This field is REQUIRED if
223    /// `destination_sync_mode` is `*_dedup`. Otherwise it is ignored.
224    #[serde(skip_serializing_if = "Option::is_none")]
225    pub primary_key: Option<Vec<Vec<String>>>,
226    pub stream: AirbyteStream,
227    /// Monotically increasing numeric id representing the current sync id. This is aimed to be
228    /// unique per sync.
229    /// If this is null, it means that the platform is not supporting the refresh and it is
230    /// expected that no extra id will be added to the records and no data from previous generation
231    /// will be cleanup.
232    #[serde(skip_serializing_if = "Option::is_none")]
233    pub sync_id: Option<i64>,
234    pub sync_mode: SyncMode,
235}
236
237#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
238/// Airbyte connection status
239pub struct AirbyteConnectionStatus {
240    #[serde(skip_serializing_if = "Option::is_none")]
241    pub message: Option<String>,
242    pub status: String,
243}
244
245#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
246#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
247pub enum AirbyteControlMessage {
248    ConnectorConfig {
249        connector_config: AirbyteControlConnectorConfigMessage,
250        emitted_at: f64,
251    },
252}
253
254#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
255pub struct AirbyteControlConnectorConfigMessage {
256    /// the config items from this connector's spec to update
257    pub config: serde_json::Value,
258}
259
260/// Specification of a connector (source/destination)
261#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
262pub struct ConnectorSpecification {
263    /// Additional and optional specification object to describe what an 'advanced' Auth flow would
264    /// need to function.
265    ///   - A connector should be able to fully function with the configuration as described by the
266    /// ConnectorSpecification in a 'basic' mode.
267    ///   - The 'advanced' mode provides easier UX for the user with UI improvements and
268    /// automations. However, this requires further setup on the
269    ///   server side by instance or workspace admins beforehand. The trade-off is that the user
270    /// does not have to provide as many technical
271    ///   inputs anymore and the auth process is faster and easier to complete.
272    #[serde(skip_serializing_if = "Option::is_none")]
273    pub advanced_auth: Option<ConnectorSpecificationAdvancedAuth>,
274    #[serde(skip_serializing_if = "Option::is_none")]
275    #[serde(rename = "changelogUrl")]
276    pub changelog_url: Option<String>,
277    /// ConnectorDefinition specific blob. Must be a valid JSON string.
278    #[serde(rename = "connectionSpecification")]
279    pub connection_specification: serde_json::Value,
280    #[serde(skip_serializing_if = "Option::is_none")]
281    #[serde(rename = "documentationUrl")]
282    pub documentation_url: Option<String>,
283    /// the Airbyte Protocol version supported by the connector. Protocol versioning uses SemVer.
284    #[serde(skip_serializing_if = "Option::is_none")]
285    pub protocol_version: Option<String>,
286    /// List of destination sync modes supported by the connector
287    #[serde(skip_serializing_if = "Option::is_none")]
288    pub supported_destination_sync_modes: Option<Vec<DestinationSyncMode>>,
289    /// If the connector supports DBT or not.
290    #[serde(skip_serializing_if = "Option::is_none")]
291    #[serde(rename = "supportsDBT")]
292    pub supports_dbt: Option<bool>,
293    /// (deprecated) If the connector supports incremental mode or not.
294    #[serde(skip_serializing_if = "Option::is_none")]
295    #[serde(rename = "supportsIncremental")]
296    pub supports_incremental: Option<bool>,
297    /// If the connector supports normalization or not.
298    #[serde(skip_serializing_if = "Option::is_none")]
299    #[serde(rename = "supportsNormalization")]
300    pub supports_normalization: Option<bool>,
301}
302
303#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
304pub struct ConnectorSpecificationAdvancedAuth {
305    #[serde(skip_serializing_if = "Option::is_none")]
306    pub auth_flow_type: Option<String>,
307    #[serde(skip_serializing_if = "Option::is_none")]
308    pub oauth_config_specification: Option<OauthConfigSpecification>,
309    /// Json Path to a field in the connectorSpecification that should exist for the advanced auth
310    /// to be applicable.
311    #[serde(skip_serializing_if = "Option::is_none")]
312    pub predicate_key: Option<Vec<String>>,
313    /// Value of the predicate_key fields for the advanced auth to be applicable.
314    #[serde(skip_serializing_if = "Option::is_none")]
315    pub predicate_value: Option<String>,
316}
317
318#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
319#[serde(rename = "OAuthConfigSpecification")]
320pub struct OauthConfigSpecification {
321    /// OAuth specific blob. This is a Json Schema used to validate Json configurations produced by
322    /// the OAuth flows as they are
323    /// returned by the distant OAuth APIs.
324    /// Must be a valid JSON describing the fields to merge back to
325    /// `ConnectorSpecification.connectionSpecification`.
326    /// For each field, a special annotation `path_in_connector_config` can be specified to
327    /// determine where to merge it,
328    ///
329    #[serde(skip_serializing_if = "Option::is_none")]
330    pub complete_oauth_output_specification: Option<serde_json::Value>,
331    /// OAuth specific blob. This is a Json Schema used to validate Json configurations persisted
332    /// as Airbyte Server configurations.
333    /// Must be a valid non-nested JSON describing additional fields configured by the Airbyte
334    /// Instance or Workspace Admins to be used by the
335    /// server when completing an OAuth flow (typically exchanging an auth code for refresh token).
336    ///
337    #[serde(skip_serializing_if = "Option::is_none")]
338    pub complete_oauth_server_input_specification: Option<serde_json::Value>,
339    /// OAuth specific blob. This is a Json Schema used to validate Json configurations persisted
340    /// as Airbyte Server configurations that
341    /// also need to be merged back into the connector configuration at runtime.
342    /// This is a subset configuration of `complete_oauth_server_input_specification` that filters
343    /// fields out to retain only the ones that
344    /// are necessary for the connector to function with OAuth. (some fields could be used during
345    /// oauth flows but not needed afterwards, therefore
346    /// they would be listed in the `complete_oauth_server_input_specification` but not
347    /// `complete_oauth_server_output_specification`)
348    /// Must be a valid non-nested JSON describing additional fields configured by the Airbyte
349    /// Instance or Workspace Admins to be used by the
350    /// connector when using OAuth flow APIs.
351    /// These fields are to be merged back to `ConnectorSpecification.connectionSpecification`.
352    /// For each field, a special annotation `path_in_connector_config` can be specified to
353    /// determine where to merge it,
354    ///
355    #[serde(skip_serializing_if = "Option::is_none")]
356    pub complete_oauth_server_output_specification: Option<serde_json::Value>,
357    /// OAuth specific blob. This is a Json Schema used to validate Json configurations used as
358    /// input to OAuth.
359    /// Must be a valid non-nested JSON that refers to properties from
360    /// ConnectorSpecification.connectionSpecification
361    /// using special annotation 'path_in_connector_config'.
362    /// These are input values the user is entering through the UI to authenticate to the
363    /// connector, that might also shared
364    /// as inputs for syncing data via the connector.
365    ///
366    #[serde(skip_serializing_if = "Option::is_none")]
367    pub oauth_user_input_from_connector_config_specification: Option<serde_json::Value>,
368}
369
370#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
371pub enum DestinationSyncMode {
372    #[serde(rename = "append")]
373    Append,
374    #[serde(rename = "overwrite")]
375    Overwrite,
376    #[serde(rename = "append_dedup")]
377    AppendDedup,
378}
379
380#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
381pub struct AirbyteLogMessage {
382    /// log level
383    pub level: String,
384    /// log message
385    pub message: String,
386    /// an optional stack trace if the log message corresponds to an exception
387    #[serde(skip_serializing_if = "Option::is_none")]
388    pub stack_trace: Option<String>,
389}
390
391#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
392pub enum SyncMode {
393    #[default]
394    #[serde(rename = "full_refresh")]
395    FullRefresh,
396    #[serde(rename = "incremental")]
397    Incremental,
398}
399
400#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
401#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
402pub enum AirbyteTraceMessage {
403    Analytics {
404        analytics: AirbyteAnalyticsTraceMessage,
405        emited_at: f64,
406    },
407    Error {
408        error: AirbyteErrorTraceMessage,
409        emitted_at: f64,
410    },
411    Estimate {
412        estimate: AirbyteEstimateTraceMessage,
413        emitted_at: f64,
414    },
415    StreamStatus {
416        stream_status: AirbyteStreamStatusTraceMessage,
417        emitted_at: f64,
418    },
419}
420
421#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
422pub struct AirbyteStreamStatusTraceMessage {
423    /// The reasons associated with the status of the stream
424    #[serde(skip_serializing_if = "Option::is_none")]
425    pub reasons: Option<Vec<AirbyteStreamStatusReason>>,
426    /// The current status of the stream
427    pub status: AirbyteStreamStatus,
428    /// The stream associated with the status
429    pub stream_descriptor: StreamDescriptor,
430}
431
432/// The current status of a stream within the context of an executing synchronization job.
433///
434#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
435#[serde(rename_all = "UPPERCASE")]
436pub enum AirbyteStreamStatus {
437    Started,
438    Running,
439    Complete,
440    Incomplete,
441}
442
443/// The reason associated with the status of the stream.
444///
445#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
446pub struct AirbyteStreamStatusReason {
447    #[serde(skip_serializing_if = "Option::is_none")]
448    pub rate_limited: Option<AirbyteStreamStatusRateLimitedReason>,
449    #[serde(rename = "type")]
450    pub r#type: AirbyteStreamStatusReasonType,
451}
452
453/// Rate Limited Information
454#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
455pub struct AirbyteStreamStatusRateLimitedReason {
456    /// Optional time in ms representing when the API quota is going to be reset
457    #[serde(skip_serializing_if = "Option::is_none")]
458    pub quota_reset: Option<i64>,
459}
460
461/// Type of reason
462///
463#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
464pub enum AirbyteStreamStatusReasonType {
465    #[serde(rename = "RATE_LIMITED")]
466    RateLimited,
467}
468
469#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
470pub struct AirbyteEstimateTraceMessage {
471    /// The estimated number of bytes to be emitted by this sync for this stream
472    #[serde(skip_serializing_if = "Option::is_none")]
473    pub byte_estimate: Option<i64>,
474    /// The name of the stream
475    pub name: String,
476    /// The namespace of the stream
477    #[serde(skip_serializing_if = "Option::is_none")]
478    pub namespace: Option<String>,
479    /// The estimated number of rows to be emitted by this sync for this stream
480    #[serde(skip_serializing_if = "Option::is_none")]
481    pub row_estimate: Option<i64>,
482}
483
484/// A message to communicate usage information about the connector which is not captured by regular
485/// sync analytics because it's specific to the connector internals.
486/// This is useful to understand how the connector is used and how to improve it. Each message is
487/// an event with a type and an optional payload value (both of them being strings). The event
488/// types should not be dynamically generated but defined statically. The payload value is optional
489/// and can contain arbitrary strings.
490#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
491pub struct AirbyteAnalyticsTraceMessage {
492    /// The value of the event - can be an arbitrary string. In case the value is numeric, it
493    /// should be converted to a string. Casting for analytics purposes can happen in the
494    /// warehouse.
495    #[serde(skip_serializing_if = "Option::is_none")]
496    pub value: Option<String>,
497}
498
499#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
500pub struct AirbyteErrorTraceMessage {
501    /// The type of error
502    #[serde(skip_serializing_if = "Option::is_none")]
503    pub failure_type: Option<String>,
504    /// The internal error that caused the failure
505    #[serde(skip_serializing_if = "Option::is_none")]
506    pub internal_message: Option<String>,
507    /// A user-friendly message that indicates the cause of the error
508    pub message: String,
509    /// The full stack trace of the error
510    #[serde(skip_serializing_if = "Option::is_none")]
511    pub stack_trace: Option<String>,
512    /// The stream associated with the error, if known (optional)
513    #[serde(skip_serializing_if = "Option::is_none")]
514    pub stream_descriptor: Option<StreamDescriptor>,
515}
516
#[cfg(test)]
pub mod tests {
    use serde_json::json;

    use crate::message::{AirbyteMessage, AirbyteRecordMessage};

    /// A minimal `RECORD` message deserializes into `AirbyteMessage::Record`, with the optional
    /// `meta` and `namespace` fields left as `None`.
    #[test]
    fn test_record() {
        let raw = r#"{"type": "RECORD", "record": { "stream": "users", "data": {"id": 1, "name": "Chris"}, "emitted_at": 1}}"#;

        let want = AirbyteMessage::Record {
            record: AirbyteRecordMessage {
                stream: "users".to_string(),
                data: json!({"id": 1, "name": "Chris"}),
                emitted_at: 1,
                meta: None,
                namespace: None,
            },
        };
        let got = serde_json::from_str::<AirbyteMessage>(raw).unwrap();

        assert_eq!(got, want);
    }
}