1use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
2use flate2::read::GzDecoder;
3use serde::{Deserialize, Serialize};
4use std::io::Read;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
7#[serde(rename_all = "lowercase")]
8pub enum Mode {
9 State,
10 Append,
11 List,
12}
13
/// Typed form of a frame's `op` string.
///
/// Obtained by parsing the raw string through the `FromStr` implementation
/// for this type.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Operation {
    /// Parsed from `"upsert"`.
    Upsert,
    /// Parsed from `"patch"`.
    Patch,
    /// Parsed from `"delete"`.
    Delete,
    /// Parsed from `"create"`.
    Create,
    /// Parsed from `"snapshot"`.
    Snapshot,
}
22
23impl std::str::FromStr for Operation {
24 type Err = std::convert::Infallible;
25
26 fn from_str(s: &str) -> Result<Self, Self::Err> {
27 Ok(match s {
28 "upsert" => Operation::Upsert,
29 "patch" => Operation::Patch,
30 "delete" => Operation::Delete,
31 "create" => Operation::Create,
32 "snapshot" => Operation::Snapshot,
33 _ => Operation::Upsert,
34 })
35 }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct Frame {
40 pub mode: Mode,
41 #[serde(rename = "entity")]
42 pub entity: String,
43 pub op: String,
44 #[serde(default)]
45 pub key: String,
46 pub data: serde_json::Value,
47 #[serde(default)]
48 pub append: Vec<String>,
49}
50
51impl Frame {
52 pub fn entity_name(&self) -> &str {
53 &self.entity
54 }
55
56 pub fn operation(&self) -> Operation {
57 self.op.parse().unwrap()
58 }
59
60 pub fn is_snapshot(&self) -> bool {
61 self.op == "snapshot"
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct SnapshotEntity {
67 pub key: String,
68 pub data: serde_json::Value,
69}
70
71#[derive(Deserialize)]
72struct CompressedFrame {
73 compressed: String,
74 data: String,
75}
76
77fn decompress_gzip(base64_data: &str) -> Result<String, Box<dyn std::error::Error>> {
78 let compressed = BASE64.decode(base64_data)?;
79 let mut decoder = GzDecoder::new(&compressed[..]);
80 let mut decompressed = String::new();
81 decoder.read_to_string(&mut decompressed)?;
82 Ok(decompressed)
83}
84
85fn parse_and_decompress(text: &str) -> Result<Frame, serde_json::Error> {
86 if let Ok(compressed) = serde_json::from_str::<CompressedFrame>(text) {
87 if compressed.compressed == "gzip" {
88 if let Ok(decompressed) = decompress_gzip(&compressed.data) {
89 return serde_json::from_str(&decompressed);
90 }
91 }
92 }
93 serde_json::from_str(text)
94}
95
96pub fn parse_frame(bytes: &[u8]) -> Result<Frame, serde_json::Error> {
97 let text = String::from_utf8_lossy(bytes);
98 parse_and_decompress(&text)
99}
100
101pub fn parse_snapshot_entities(data: &serde_json::Value) -> Vec<SnapshotEntity> {
102 match data {
103 serde_json::Value::Array(arr) => arr
104 .iter()
105 .filter_map(|v| serde_json::from_value(v.clone()).ok())
106 .collect(),
107 _ => Vec::new(),
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::{write::GzEncoder, Compression};
    use std::io::Write;

    /// Snapshot frame shared by both tests, in its uncompressed JSON form.
    const SNAPSHOT_FRAME_JSON: &str = r#"{"mode":"list","entity":"test/list","op":"snapshot","key":"","data":[{"key":"1","data":{"id":1}}]}"#;

    /// Checks the fields both tests expect from the shared snapshot frame.
    fn assert_is_snapshot_frame(frame: &Frame) {
        assert_eq!(frame.op, "snapshot");
        assert_eq!(frame.entity, "test/list");
    }

    #[test]
    fn test_parse_uncompressed_frame() {
        let frame = parse_frame(SNAPSHOT_FRAME_JSON.as_bytes()).unwrap();
        assert_is_snapshot_frame(&frame);
    }

    #[test]
    fn test_parse_compressed_frame() {
        // Gzip the frame, base64-encode it, and wrap it in the envelope.
        let mut gz = GzEncoder::new(Vec::new(), Compression::fast());
        gz.write_all(SNAPSHOT_FRAME_JSON.as_bytes()).unwrap();
        let payload = BASE64.encode(gz.finish().unwrap());
        let envelope = format!(r#"{{"compressed":"gzip","data":"{}"}}"#, payload);

        let frame = parse_frame(envelope.as_bytes()).unwrap();
        assert_is_snapshot_frame(&frame);
    }
}