singer 0.3.0

Rust Singer SDK
Documentation
use serde::{Deserialize, Serialize};
use serde_json::Value;

use crate::schema::JsonSchema;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
pub enum Message {
    Schema(Schema),
    Record(Record),
    State(State),
    ActivateVersion(ActivateVersion),
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Record {
    pub stream: String,
    pub record: Value,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub time_extracted: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub version: Option<i64>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Schema {
    pub stream: String,
    pub schema: JsonSchema,
    pub key_properties: Vec<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub bookmark_properties: Option<Vec<String>>,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct State {
    pub value: Value,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ActivateVersion {
    pub stream: String,
    pub version: i64,
}

#[cfg(test)]
pub mod tests {
    use serde_json::json;

    use crate::messages::{ActivateVersion, Message, Record, Schema, State};

    #[test]
    fn test_record() {
        let input =
            r#"{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"}}"#;

        let record: Message = serde_json::from_str(input).unwrap();

        let expected = Message::Record(Record {
            stream: "users".to_string(),
            record: json!({"id": 1, "name": "Chris"}),
            time_extracted: None,
        });
        assert_eq!(record, expected);
    }

    #[test]
    fn test_schema() {
        let input = r#"{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}"#;

        let schema: Message = serde_json::from_str(input).unwrap();

        let expected = Message::Schema(Schema {
            stream: "users".to_string(),
            key_properties: vec!["id".to_string()],
            schema: serde_json::from_value(
                json!({"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}),
            ).unwrap(),
            bookmark_properties: None,
        });
        assert_eq!(schema, expected);
    }

    #[test]
    fn test_state() {
        let input = r#"{"type": "STATE", "value": {"users": 2, "locations": 1}}"#;

        let schema: Message = serde_json::from_str(input).unwrap();

        let expected = Message::State(State {
            value: json!({"users": 2, "locations": 1}),
        });
        assert_eq!(schema, expected);
    }

    #[test]
    fn test_activate_version() {
        let input = r#"{"type": "ACTIVATE_VERSION", "stream": "users", "version": 1695106400957}"#;

        let schema: Message = serde_json::from_str(input).unwrap();

        let expected = Message::ActivateVersion(ActivateVersion {
            stream: "users".to_string(),
            version: 1695106400957,
        });
        assert_eq!(schema, expected);
    }
}