singer/
messages.rs

1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3
4use crate::schema::JsonSchema;
5
6#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
7#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
8pub enum Message {
9    Schema(Schema),
10    Record(Record),
11    State(State),
12    ActivateVersion(ActivateVersion),
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
16pub struct Record {
17    pub stream: String,
18    pub record: Value,
19    #[serde(skip_serializing_if = "Option::is_none")]
20    pub time_extracted: Option<String>,
21    #[serde(skip_serializing_if = "Option::is_none")]
22    pub version: Option<i64>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
26pub struct Schema {
27    pub stream: String,
28    pub schema: JsonSchema,
29    pub key_properties: Vec<String>,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub bookmark_properties: Option<Vec<String>>,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
35pub struct State {
36    pub value: Value,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
40pub struct ActivateVersion {
41    pub stream: String,
42    pub version: i64,
43}
44
/// Deserialization tests: each one parses a JSON line into a [`Message`] and
/// compares it with the hand-built expected value.
#[cfg(test)]
pub mod tests {
    use serde_json::json;

    // `super` is the module that defines these types, so the import does not
    // depend on where the module is mounted in the crate.
    use super::{ActivateVersion, Message, Record, Schema, State};

    /// A RECORD without optional fields parses with both of them as `None`.
    #[test]
    fn test_record() {
        let input =
            r#"{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"}}"#;

        let message: Message = serde_json::from_str(input).unwrap();

        let expected = Message::Record(Record {
            stream: "users".to_string(),
            record: json!({"id": 1, "name": "Chris"}),
            time_extracted: None,
            // `version` is a field of `Record`; leaving it out is a compile error.
            version: None,
        });
        assert_eq!(message, expected);
    }

    /// A RECORD carrying `time_extracted` and `version` keeps both values.
    #[test]
    fn test_record_with_optional_fields() {
        let input = r#"{"type": "RECORD", "stream": "users", "record": {"id": 1}, "time_extracted": "2023-09-19T06:53:20Z", "version": 1695106400957}"#;

        let message: Message = serde_json::from_str(input).unwrap();

        let expected = Message::Record(Record {
            stream: "users".to_string(),
            record: json!({"id": 1}),
            time_extracted: Some("2023-09-19T06:53:20Z".to_string()),
            version: Some(1695106400957),
        });
        assert_eq!(message, expected);
    }

    /// A SCHEMA without `bookmark_properties` parses with that field as `None`.
    #[test]
    fn test_schema() {
        let input = r#"{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}"#;

        let message: Message = serde_json::from_str(input).unwrap();

        let expected = Message::Schema(Schema {
            stream: "users".to_string(),
            key_properties: vec!["id".to_string()],
            schema: serde_json::from_value(
                json!({"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}),
            ).unwrap(),
            bookmark_properties: None,
        });
        assert_eq!(message, expected);
    }

    /// A STATE message keeps its `value` as raw JSON.
    #[test]
    fn test_state() {
        let input = r#"{"type": "STATE", "value": {"users": 2, "locations": 1}}"#;

        let message: Message = serde_json::from_str(input).unwrap();

        let expected = Message::State(State {
            value: json!({"users": 2, "locations": 1}),
        });
        assert_eq!(message, expected);
    }

    /// The `ACTIVATE_VERSION` tag maps onto the `ActivateVersion` variant.
    #[test]
    fn test_activate_version() {
        let input = r#"{"type": "ACTIVATE_VERSION", "stream": "users", "version": 1695106400957}"#;

        let message: Message = serde_json::from_str(input).unwrap();

        let expected = Message::ActivateVersion(ActivateVersion {
            stream: "users".to_string(),
            version: 1695106400957,
        });
        assert_eq!(message, expected);
    }
}