1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3
4use crate::schema::JsonSchema;
5
6#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
7#[serde(tag = "type", rename_all = "SCREAMING_SNAKE_CASE")]
8pub enum Message {
9 Schema(Schema),
10 Record(Record),
11 State(State),
12 ActivateVersion(ActivateVersion),
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
16pub struct Record {
17 pub stream: String,
18 pub record: Value,
19 #[serde(skip_serializing_if = "Option::is_none")]
20 pub time_extracted: Option<String>,
21 #[serde(skip_serializing_if = "Option::is_none")]
22 pub version: Option<i64>,
23}
24
25#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
26pub struct Schema {
27 pub stream: String,
28 pub schema: JsonSchema,
29 pub key_properties: Vec<String>,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub bookmark_properties: Option<Vec<String>>,
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
35pub struct State {
36 pub value: Value,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
40pub struct ActivateVersion {
41 pub stream: String,
42 pub version: i64,
43}
44
/// Deserialization tests: one per [`Message`] variant, plus coverage of the
/// optional `Record` fields.
#[cfg(test)]
pub mod tests {
    use serde_json::json;

    // `super` is the module that declares these types, so the import resolves
    // regardless of where that module is mounted in the crate.
    use super::{ActivateVersion, Message, Record, Schema, State};

    /// A RECORD without optional fields parses with both options set to `None`.
    #[test]
    fn test_record() {
        let input =
            r#"{"type": "RECORD", "stream": "users", "record": {"id": 1, "name": "Chris"}}"#;

        let message: Message = serde_json::from_str(input).unwrap();

        let expected = Message::Record(Record {
            stream: "users".to_string(),
            record: json!({"id": 1, "name": "Chris"}),
            time_extracted: None,
            // Every field must be listed; leaving `version` out is a compile error.
            version: None,
        });
        assert_eq!(message, expected);
    }

    /// Optional RECORD fields are read when present and omitted from the
    /// serialized form when they are `None`.
    #[test]
    fn test_record_optional_fields() {
        let input = r#"{"type": "RECORD", "stream": "users", "record": {"id": 1}, "time_extracted": "2023-09-19T06:53:20Z", "version": 7}"#;

        let message: Message = serde_json::from_str(input).unwrap();

        let expected = Message::Record(Record {
            stream: "users".to_string(),
            record: json!({"id": 1}),
            time_extracted: Some("2023-09-19T06:53:20Z".to_string()),
            version: Some(7),
        });
        assert_eq!(message, expected);

        let bare = Message::Record(Record {
            stream: "users".to_string(),
            record: json!({"id": 1}),
            time_extracted: None,
            version: None,
        });
        assert_eq!(
            serde_json::to_value(&bare).unwrap(),
            json!({"type": "RECORD", "stream": "users", "record": {"id": 1}}),
        );
    }

    /// A SCHEMA without `bookmark_properties` parses with that field `None`.
    #[test]
    fn test_schema() {
        let input = r#"{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}"#;

        let message: Message = serde_json::from_str(input).unwrap();

        let expected = Message::Schema(Schema {
            stream: "users".to_string(),
            key_properties: vec!["id".to_string()],
            schema: serde_json::from_value(
                json!({"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}),
            )
            .unwrap(),
            bookmark_properties: None,
        });
        assert_eq!(message, expected);
    }

    /// A STATE message keeps its `value` as raw JSON.
    #[test]
    fn test_state() {
        let input = r#"{"type": "STATE", "value": {"users": 2, "locations": 1}}"#;

        let message: Message = serde_json::from_str(input).unwrap();

        let expected = Message::State(State {
            value: json!({"users": 2, "locations": 1}),
        });
        assert_eq!(message, expected);
    }

    /// The multi-word variant is tagged `ACTIVATE_VERSION` on the wire.
    #[test]
    fn test_activate_version() {
        let input = r#"{"type": "ACTIVATE_VERSION", "stream": "users", "version": 1695106400957}"#;

        let message: Message = serde_json::from_str(input).unwrap();

        let expected = Message::ActivateVersion(ActivateVersion {
            stream: "users".to_string(),
            version: 1695106400957,
        });
        assert_eq!(message, expected);
    }
}