use serde::{Deserialize, Serialize};
use crate::schema::JsonSchema;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
pub enum AirbyteMessage {
Catalog {
catalog: AirbyteCatalog,
},
Log {
log: AirbyteLogMessage,
},
Trace {
trace: AirbyteTraceMessage,
},
Spec {
spec: ConnectorSpecification,
},
Control {
control: ConnectorSpecification,
},
ConnectionStatus {
#[serde(rename = "connectionStatus")]
connection_status: AirbyteConnectionStatus,
},
State {
state: AirbyteStateMessage,
},
Record {
record: AirbyteRecordMessage,
},
}
/// Payload of a `RECORD` message: one row of `data` belonging to `stream`.
///
/// `meta` and `namespace` are left out of the serialized form when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteRecordMessage {
    /// Arbitrary JSON body of the record.
    pub data: serde_json::Value,
    /// Emission timestamp as an integer.
    // NOTE(review): unit is not visible in this file (presumably epoch millis) — confirm.
    pub emitted_at: i64,
    /// Optional per-record metadata.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub meta: Option<AirbyteRecordMessageMeta>,
    /// Optional namespace of the stream.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub namespace: Option<String>,
    /// Name of the stream this record belongs to.
    pub stream: String,
}
/// Metadata attached to a record; currently only an optional list of changes.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteRecordMessageMeta {
    /// Change entries for the record; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub changes: Option<Vec<AirbyteRecordMessageMetaChange>>,
}
/// One entry of [`AirbyteRecordMessageMeta::changes`].
///
/// All three members are required, unvalidated strings.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteRecordMessageMetaChange {
    /// Kind of change.
    // NOTE(review): presumably a closed set of values in the protocol — confirm.
    pub change: String,
    /// Field the change applies to.
    pub field: String,
    /// Why the change was made.
    pub reason: String,
}
/// Payload of a `STATE` message.
///
/// Internally tagged by `type` (`GLOBAL`, `STREAM`, `LEGACY`). The trailing
/// `Empty` variant is untagged, so it is the fallback tried when the tagged
/// variants do not match. Every variant carries optional `destinationStats`
/// and `sourceStats`, which are omitted from output when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum AirbyteStateMessage {
    /// State shared across streams, under `global`.
    #[serde(rename_all = "camelCase")]
    Global {
        global: AirbyteGlobalState,
        #[serde(skip_serializing_if = "Option::is_none")]
        destination_stats: Option<AirbyteStateStats>,
        #[serde(skip_serializing_if = "Option::is_none")]
        source_stats: Option<AirbyteStateStats>,
    },
    /// State of a single stream, under `stream`.
    #[serde(rename_all = "camelCase")]
    Stream {
        stream: AirbyteStreamState,
        #[serde(skip_serializing_if = "Option::is_none")]
        destination_stats: Option<AirbyteStateStats>,
        #[serde(skip_serializing_if = "Option::is_none")]
        source_stats: Option<AirbyteStateStats>,
    },
    /// Explicitly tagged legacy state: an opaque JSON blob under `data`.
    #[serde(rename_all = "camelCase")]
    Legacy {
        data: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        destination_stats: Option<AirbyteStateStats>,
        #[serde(skip_serializing_if = "Option::is_none")]
        source_stats: Option<AirbyteStateStats>,
    },
    /// Untagged fallback with the same shape as `Legacy` but no `type` key
    /// written on output.
    #[serde(untagged)]
    #[serde(rename_all = "camelCase")]
    Empty {
        data: serde_json::Value,
        #[serde(skip_serializing_if = "Option::is_none")]
        destination_stats: Option<AirbyteStateStats>,
        #[serde(skip_serializing_if = "Option::is_none")]
        source_stats: Option<AirbyteStateStats>,
    },
}
/// Body of the `GLOBAL` state variant: an optional shared blob plus the list
/// of per-stream states.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteGlobalState {
    /// Opaque JSON shared by all streams; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub shared_state: Option<serde_json::Value>,
    /// Per-stream state entries.
    pub stream_states: Vec<AirbyteStreamState>,
}
/// State of one stream, identified by its descriptor.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteStreamState {
    /// Which stream this state belongs to.
    pub stream_descriptor: StreamDescriptor,
    /// Opaque JSON state for the stream; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_state: Option<serde_json::Value>,
}
/// Identifies a stream by name and optional namespace.
///
/// Derives `Eq` and `Hash`, so it can be used as a map or set key.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct StreamDescriptor {
    /// Stream name.
    pub name: String,
    /// Optional namespace; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub namespace: Option<String>,
}
impl StreamDescriptor {
    /// Builds a descriptor by copying `name` and, when present, `namespace`
    /// into owned strings.
    pub fn new(name: &str, namespace: Option<&str>) -> Self {
        let name = String::from(name);
        let namespace = namespace.map(String::from);
        Self { name, namespace }
    }
}
/// Statistics attached to a state message; fields use camelCase keys.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AirbyteStateStats {
    /// Record count, serialized as `recordCount`; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub record_count: Option<f64>,
}
/// Payload of a `CATALOG` message: the list of streams.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteCatalog {
    /// Streams in the catalog.
    pub streams: Vec<AirbyteStream>,
}
/// Description of a single stream.
///
/// `name`, `json_schema` and `supported_sync_modes` are required; every
/// `Option` field is omitted from output when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteStream {
    /// Stream name.
    pub name: String,
    /// Schema of the stream's records.
    pub json_schema: JsonSchema,
    /// Default cursor field, as a list of strings.
    // NOTE(review): presumably a path into the record — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub default_cursor_field: Option<Vec<String>>,
    /// Optional resumability flag.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub is_resumable: Option<bool>,
    /// Optional namespace of the stream.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub namespace: Option<String>,
    /// Optional flag for a source-defined cursor.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_defined_cursor: Option<bool>,
    /// Source-defined primary key, as a list of string lists.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source_defined_primary_key: Option<Vec<Vec<String>>>,
    /// Sync modes the stream supports.
    pub supported_sync_modes: Vec<SyncMode>,
}
/// A catalog whose streams carry sync configuration.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ConfiguredAirbyteCatalog {
    /// Configured streams.
    pub streams: Vec<ConfiguredAirbyteStream>,
}
/// An [`AirbyteStream`] together with the sync settings chosen for it.
///
/// `destination_sync_mode`, `stream` and `sync_mode` are required; every
/// `Option` field is omitted from output when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ConfiguredAirbyteStream {
    /// Cursor field selected for this sync, as a list of strings.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor_field: Option<Vec<String>>,
    /// How data is written at the destination.
    pub destination_sync_mode: DestinationSyncMode,
    /// Optional generation id.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub generation_id: Option<i64>,
    /// Optional minimum generation id.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub minimum_generation_id: Option<i64>,
    /// Primary key selected for this sync, as a list of string lists.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub primary_key: Option<Vec<Vec<String>>>,
    /// The stream being configured.
    pub stream: AirbyteStream,
    /// Optional sync id.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sync_id: Option<i64>,
    /// How data is read from the source.
    pub sync_mode: SyncMode,
}
/// Payload of a `CONNECTION_STATUS` message.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteConnectionStatus {
    /// Optional human-readable detail; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub message: Option<String>,
    /// Status value, kept as an unvalidated string.
    // NOTE(review): presumably a fixed set of values in the protocol — confirm.
    pub status: String,
}
/// Control message, internally tagged by `type` (`CONNECTOR_CONFIG`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
pub enum AirbyteControlMessage {
    /// `CONNECTOR_CONFIG`: carries an updated connector configuration.
    ConnectorConfig {
        /// Written under the camelCase key `connectorConfig`, which is the key
        /// the Airbyte protocol schema defines. The enum-level `rename_all`
        /// only renames variants, so the field needs its own rename. The
        /// snake_case key is still accepted on input for older payloads.
        #[serde(rename = "connectorConfig", alias = "connector_config")]
        connector_config: AirbyteControlConnectorConfigMessage,
        /// Emission timestamp as a number.
        emitted_at: f64,
    },
}
/// Body of the `CONNECTOR_CONFIG` control variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteControlConnectorConfigMessage {
    /// Connector configuration as arbitrary JSON.
    pub config: serde_json::Value,
}
/// Payload of a `SPEC` message.
///
/// Key casing is mixed on the wire: some keys stay snake_case and others are
/// explicitly renamed to camelCase. Only `connectionSpecification` is
/// required; every `Option` field is omitted from output when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ConnectorSpecification {
    /// Optional advanced authentication settings.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub advanced_auth: Option<ConnectorSpecificationAdvancedAuth>,
    /// Serialized as `changelogUrl`.
    #[serde(rename = "changelogUrl", skip_serializing_if = "Option::is_none")]
    pub changelog_url: Option<String>,
    /// Serialized as `connectionSpecification`; arbitrary JSON.
    #[serde(rename = "connectionSpecification")]
    pub connection_specification: serde_json::Value,
    /// Serialized as `documentationUrl`.
    #[serde(rename = "documentationUrl", skip_serializing_if = "Option::is_none")]
    pub documentation_url: Option<String>,
    /// Optional protocol version string.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub protocol_version: Option<String>,
    /// Optional list of destination sync modes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub supported_destination_sync_modes: Option<Vec<DestinationSyncMode>>,
    /// Serialized as `supportsDBT`.
    #[serde(rename = "supportsDBT", skip_serializing_if = "Option::is_none")]
    pub supports_dbt: Option<bool>,
    /// Serialized as `supportsIncremental`.
    #[serde(rename = "supportsIncremental", skip_serializing_if = "Option::is_none")]
    pub supports_incremental: Option<bool>,
    /// Serialized as `supportsNormalization`.
    #[serde(rename = "supportsNormalization", skip_serializing_if = "Option::is_none")]
    pub supports_normalization: Option<bool>,
}
/// Advanced authentication section of a [`ConnectorSpecification`].
///
/// Every field is optional and omitted from output when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ConnectorSpecificationAdvancedAuth {
    /// Authentication flow type, kept as an unvalidated string.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auth_flow_type: Option<String>,
    /// OAuth-specific schemas.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub oauth_config_specification: Option<OauthConfigSpecification>,
    /// Predicate key, as a list of strings.
    // NOTE(review): presumably a path into the connector config — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub predicate_key: Option<Vec<String>>,
    /// Predicate value.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub predicate_value: Option<String>,
}
/// OAuth configuration schemas, each held as arbitrary JSON.
///
/// The serde container name is `OAuthConfigSpecification`, while the Rust type
/// uses `Oauth` casing. Every field is optional and omitted from output when
/// `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename = "OAuthConfigSpecification")]
pub struct OauthConfigSpecification {
    /// Optional JSON document for the complete OAuth output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub complete_oauth_output_specification: Option<serde_json::Value>,
    /// Optional JSON document for the complete OAuth server input.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub complete_oauth_server_input_specification: Option<serde_json::Value>,
    /// Optional JSON document for the complete OAuth server output.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub complete_oauth_server_output_specification: Option<serde_json::Value>,
    /// Optional JSON document for user input taken from the connector config.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub oauth_user_input_from_connector_config_specification: Option<serde_json::Value>,
}
/// Destination write mode, serialized as a snake_case string:
/// `append`, `overwrite` or `append_dedup`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DestinationSyncMode {
    /// Serialized as `append`.
    Append,
    /// Serialized as `overwrite`.
    Overwrite,
    /// Serialized as `append_dedup`.
    AppendDedup,
}
/// Payload of a `LOG` message.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteLogMessage {
    /// Log level, kept as an unvalidated string.
    pub level: String,
    /// Log text.
    pub message: String,
    /// Optional stack trace; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stack_trace: Option<String>,
}
/// Source read mode, serialized as a snake_case string:
/// `full_refresh` or `incremental`. `Default` yields `FullRefresh`.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SyncMode {
    /// Serialized as `full_refresh`.
    #[default]
    FullRefresh,
    /// Serialized as `incremental`.
    Incremental,
}
/// Payload of a `TRACE` message, internally tagged by `type`
/// (`ANALYTICS`, `ERROR`, `ESTIMATE`, `STREAM_STATUS`).
///
/// Every variant carries its body under a key named after the variant, plus
/// an `emitted_at` timestamp.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE", tag = "type")]
pub enum AirbyteTraceMessage {
    /// `ANALYTICS`: body under `analytics`.
    Analytics {
        analytics: AirbyteAnalyticsTraceMessage,
        /// Emission timestamp.
        ///
        /// The Rust identifier keeps its historical misspelling so existing
        /// callers still compile. On the wire the key is `emitted_at`, the
        /// same as in the other variants; without the rename this variant
        /// would reject correctly spelled input and emit a misspelled key.
        /// The misspelled key is still accepted when deserializing.
        #[serde(rename = "emitted_at", alias = "emited_at")]
        emited_at: f64,
    },
    /// `ERROR`: body under `error`.
    Error {
        error: AirbyteErrorTraceMessage,
        emitted_at: f64,
    },
    /// `ESTIMATE`: body under `estimate`.
    Estimate {
        estimate: AirbyteEstimateTraceMessage,
        emitted_at: f64,
    },
    /// `STREAM_STATUS`: body under `stream_status`.
    StreamStatus {
        stream_status: AirbyteStreamStatusTraceMessage,
        emitted_at: f64,
    },
}
/// Body of the `STREAM_STATUS` trace variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteStreamStatusTraceMessage {
    /// Optional reasons for the status; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub reasons: Option<Vec<AirbyteStreamStatusReason>>,
    /// Current status of the stream.
    pub status: AirbyteStreamStatus,
    /// Stream the status refers to.
    pub stream_descriptor: StreamDescriptor,
}
/// Stream status, serialized as an upper-case string:
/// `STARTED`, `RUNNING`, `COMPLETE` or `INCOMPLETE`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
// Every variant is a single word, so this yields the same strings as plain
// upper-casing.
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum AirbyteStreamStatus {
    /// Serialized as `STARTED`.
    Started,
    /// Serialized as `RUNNING`.
    Running,
    /// Serialized as `COMPLETE`.
    Complete,
    /// Serialized as `INCOMPLETE`.
    Incomplete,
}
/// One reason attached to a stream status.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteStreamStatusReason {
    /// Rate-limit details; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub rate_limited: Option<AirbyteStreamStatusRateLimitedReason>,
    /// Kind of reason, serialized under the key `type`.
    #[serde(rename = "type")]
    pub r#type: AirbyteStreamStatusReasonType,
}
/// Details for a rate-limited stream status reason.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteStreamStatusRateLimitedReason {
    /// Optional quota reset value as an integer; omitted when `None`.
    // NOTE(review): presumably a timestamp; unit not visible here — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub quota_reset: Option<i64>,
}
/// Kind of stream status reason; the only variant serializes as `RATE_LIMITED`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum AirbyteStreamStatusReasonType {
    /// Serialized as `RATE_LIMITED`.
    RateLimited,
}
/// Body of the `ESTIMATE` trace variant.
///
/// Only `name` is required; every `Option` field is omitted from output when
/// `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteEstimateTraceMessage {
    /// Optional estimated size in bytes.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub byte_estimate: Option<i64>,
    /// Name the estimate applies to.
    // NOTE(review): presumably the stream name — confirm.
    pub name: String,
    /// Optional namespace.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub namespace: Option<String>,
    /// Optional estimated number of rows.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub row_estimate: Option<i64>,
}
/// Body of the `ANALYTICS` trace variant.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteAnalyticsTraceMessage {
    /// Optional analytics value as a string; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value: Option<String>,
}
/// Body of the `ERROR` trace variant.
///
/// Only `message` is required; every `Option` field is omitted from output
/// when `None`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct AirbyteErrorTraceMessage {
    /// Optional failure classification, kept as an unvalidated string.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub failure_type: Option<String>,
    /// Optional internal message.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub internal_message: Option<String>,
    /// Error message.
    pub message: String,
    /// Optional stack trace.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stack_trace: Option<String>,
    /// Optional stream the error relates to.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream_descriptor: Option<StreamDescriptor>,
}
#[cfg(test)]
pub mod tests {
    use crate::message::{AirbyteMessage, AirbyteRecordMessage};
    use serde_json::json;

    /// A minimal `RECORD` line deserializes into the `Record` variant, and the
    /// absent optional fields (`meta`, `namespace`) come back as `None`.
    #[test]
    fn test_record() {
        let raw = r#"{"type": "RECORD", "record": { "stream": "users", "data": {"id": 1, "name": "Chris"}, "emitted_at": 1}}"#;

        let want = AirbyteMessage::Record {
            record: AirbyteRecordMessage {
                data: json!({"id": 1, "name": "Chris"}),
                emitted_at: 1,
                meta: None,
                namespace: None,
                stream: "users".to_string(),
            },
        };

        let got: AirbyteMessage = serde_json::from_str(raw).unwrap();
        assert_eq!(got, want);
    }
}