1use flate2::read::GzDecoder;
2use serde::{Deserialize, Serialize};
3use std::io::Read;
4
5const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];
6
7fn is_gzip(data: &[u8]) -> bool {
8 data.len() >= 2 && data[0] == GZIP_MAGIC[0] && data[1] == GZIP_MAGIC[1]
9}
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
12#[serde(rename_all = "lowercase")]
13pub enum Mode {
14 State,
15 Append,
16 List,
17}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "lowercase")]
21pub enum SortOrder {
22 Asc,
23 Desc,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub struct SortConfig {
28 pub field: Vec<String>,
29 pub order: SortOrder,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct SubscribedFrame {
34 pub op: String,
35 pub view: String,
36 pub mode: Mode,
37 #[serde(default)]
38 pub sort: Option<SortConfig>,
39}
40
41impl SubscribedFrame {
42 pub fn is_subscribed_frame(op: &str) -> bool {
43 op == "subscribed"
44 }
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub enum Operation {
49 Upsert,
50 Patch,
51 Delete,
52 Create,
53 Snapshot,
54 Subscribed,
55}
56
57impl std::str::FromStr for Operation {
58 type Err = std::convert::Infallible;
59
60 fn from_str(s: &str) -> Result<Self, Self::Err> {
61 Ok(match s {
62 "upsert" => Operation::Upsert,
63 "patch" => Operation::Patch,
64 "delete" => Operation::Delete,
65 "create" => Operation::Create,
66 "snapshot" => Operation::Snapshot,
67 "subscribed" => Operation::Subscribed,
68 _ => Operation::Upsert,
69 })
70 }
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct Frame {
75 pub mode: Mode,
76 #[serde(rename = "entity")]
77 pub entity: String,
78 pub op: String,
79 #[serde(default)]
80 pub key: String,
81 pub data: serde_json::Value,
82 #[serde(default)]
83 pub append: Vec<String>,
84}
85
86impl Frame {
87 pub fn entity_name(&self) -> &str {
88 &self.entity
89 }
90
91 pub fn operation(&self) -> Operation {
92 self.op.parse().unwrap()
93 }
94
95 pub fn is_snapshot(&self) -> bool {
96 self.op == "snapshot"
97 }
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct SnapshotEntity {
102 pub key: String,
103 pub data: serde_json::Value,
104}
105
106fn decompress_gzip(data: &[u8]) -> Result<String, Box<dyn std::error::Error>> {
107 let mut decoder = GzDecoder::new(data);
108 let mut decompressed = String::new();
109 decoder.read_to_string(&mut decompressed)?;
110 Ok(decompressed)
111}
112
113pub fn parse_frame(bytes: &[u8]) -> Result<Frame, serde_json::Error> {
114 if is_gzip(bytes) {
115 if let Ok(decompressed) = decompress_gzip(bytes) {
116 return serde_json::from_str(&decompressed);
117 }
118 }
119
120 let text = String::from_utf8_lossy(bytes);
121 serde_json::from_str(&text)
122}
123
124pub fn parse_snapshot_entities(data: &serde_json::Value) -> Vec<SnapshotEntity> {
125 match data {
126 serde_json::Value::Array(arr) => arr
127 .iter()
128 .filter_map(|v| serde_json::from_value(v.clone()).ok())
129 .collect(),
130 _ => Vec::new(),
131 }
132}
133
134#[allow(dead_code)]
135pub fn parse_subscribed_frame(bytes: &[u8]) -> Result<SubscribedFrame, serde_json::Error> {
136 if is_gzip(bytes) {
137 if let Ok(decompressed) = decompress_gzip(bytes) {
138 return serde_json::from_str(&decompressed);
139 }
140 }
141
142 let text = String::from_utf8_lossy(bytes);
143 serde_json::from_str(&text)
144}
145
146#[allow(dead_code)]
147pub fn try_parse_subscribed_frame(bytes: &[u8]) -> Option<SubscribedFrame> {
148 let frame: serde_json::Value = if is_gzip(bytes) {
149 decompress_gzip(bytes)
150 .ok()
151 .and_then(|s| serde_json::from_str(&s).ok())?
152 } else {
153 serde_json::from_slice(bytes).ok()?
154 };
155
156 if frame.get("op")?.as_str()? == "subscribed" {
157 serde_json::from_value(frame).ok()
158 } else {
159 None
160 }
161}
162
163#[cfg(test)]
164mod tests {
165 use super::*;
166 use flate2::{write::GzEncoder, Compression};
167 use std::io::Write;
168
169 #[test]
170 fn test_parse_uncompressed_frame() {
171 let frame_json = r#"{"mode":"list","entity":"test/list","op":"snapshot","key":"","data":[{"key":"1","data":{"id":1}}]}"#;
172 let frame = parse_frame(frame_json.as_bytes()).unwrap();
173 assert_eq!(frame.op, "snapshot");
174 assert_eq!(frame.entity, "test/list");
175 }
176
177 #[test]
178 fn test_parse_raw_gzip_frame() {
179 let original = r#"{"mode":"list","entity":"test/list","op":"snapshot","key":"","data":[{"key":"1","data":{"id":1}}]}"#;
180
181 let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
182 encoder.write_all(original.as_bytes()).unwrap();
183 let compressed = encoder.finish().unwrap();
184
185 assert!(is_gzip(&compressed));
186
187 let frame = parse_frame(&compressed).unwrap();
188 assert_eq!(frame.op, "snapshot");
189 assert_eq!(frame.entity, "test/list");
190 }
191
192 #[test]
193 fn test_gzip_magic_detection() {
194 assert!(is_gzip(&[0x1f, 0x8b, 0x08]));
195 assert!(!is_gzip(&[0x7b, 0x22]));
196 assert!(!is_gzip(&[0x1f]));
197 assert!(!is_gzip(&[]));
198 }
199}