// hyperstack_sdk/frame.rs

1use flate2::read::GzDecoder;
2use serde::{Deserialize, Serialize};
3use std::io::Read;
4
/// The two leading bytes that identify a gzip stream.
const GZIP_MAGIC: [u8; 2] = [0x1f, 0x8b];

/// Returns `true` when `data` is at least two bytes long and starts with
/// [`GZIP_MAGIC`].
///
/// Only the two-byte signature is inspected; nothing after it is validated.
fn is_gzip(data: &[u8]) -> bool {
    data.starts_with(&GZIP_MAGIC)
}
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
12#[serde(rename_all = "lowercase")]
13pub enum Mode {
14    State,
15    Append,
16    List,
17}
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
20#[serde(rename_all = "lowercase")]
21pub enum SortOrder {
22    Asc,
23    Desc,
24}
25
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub struct SortConfig {
28    pub field: Vec<String>,
29    pub order: SortOrder,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct SubscribedFrame {
34    pub op: String,
35    pub view: String,
36    pub mode: Mode,
37    #[serde(default)]
38    pub sort: Option<SortConfig>,
39}
40
41impl SubscribedFrame {
42    pub fn is_subscribed_frame(op: &str) -> bool {
43        op == "subscribed"
44    }
45}
46
/// Typed form of a frame's `op` string.
///
/// Obtain one with `str::parse::<Operation>()`; see the [`std::str::FromStr`]
/// implementation below for the exact mapping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operation {
    Upsert,
    Patch,
    Delete,
    Create,
    Snapshot,
    Subscribed,
}

/// Parsing never fails: the error type is [`std::convert::Infallible`].
impl std::str::FromStr for Operation {
    type Err = std::convert::Infallible;

    /// Maps an operation name to its variant.
    ///
    /// Matching is exact and case-sensitive. Any string that is not one of the
    /// known lowercase names — including the empty string — yields
    /// [`Operation::Upsert`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let operation = match s {
            "patch" => Self::Patch,
            "delete" => Self::Delete,
            "create" => Self::Create,
            "snapshot" => Self::Snapshot,
            "subscribed" => Self::Subscribed,
            // Covers the literal "upsert" as well as every unrecognised name.
            _ => Self::Upsert,
        };
        Ok(operation)
    }
}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct Frame {
75    pub mode: Mode,
76    #[serde(rename = "entity")]
77    pub entity: String,
78    pub op: String,
79    #[serde(default)]
80    pub key: String,
81    pub data: serde_json::Value,
82    #[serde(default)]
83    pub append: Vec<String>,
84    /// Sequence cursor for ordering and resume capability
85    #[serde(skip_serializing_if = "Option::is_none")]
86    pub seq: Option<String>,
87}
88
89impl Frame {
90    pub fn entity_name(&self) -> &str {
91        &self.entity
92    }
93
94    pub fn operation(&self) -> Operation {
95        self.op.parse().unwrap()
96    }
97
98    pub fn is_snapshot(&self) -> bool {
99        self.op == "snapshot"
100    }
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct SnapshotEntity {
105    pub key: String,
106    pub data: serde_json::Value,
107}
108
109fn decompress_gzip(data: &[u8]) -> Result<String, Box<dyn std::error::Error>> {
110    let mut decoder = GzDecoder::new(data);
111    let mut decompressed = String::new();
112    decoder.read_to_string(&mut decompressed)?;
113    Ok(decompressed)
114}
115
116pub fn parse_frame(bytes: &[u8]) -> Result<Frame, serde_json::Error> {
117    if is_gzip(bytes) {
118        if let Ok(decompressed) = decompress_gzip(bytes) {
119            return serde_json::from_str(&decompressed);
120        }
121    }
122
123    let text = String::from_utf8_lossy(bytes);
124    serde_json::from_str(&text)
125}
126
127pub fn parse_snapshot_entities(data: &serde_json::Value) -> Vec<SnapshotEntity> {
128    match data {
129        serde_json::Value::Array(arr) => arr
130            .iter()
131            .filter_map(|v| serde_json::from_value(v.clone()).ok())
132            .collect(),
133        _ => Vec::new(),
134    }
135}
136
137#[allow(dead_code)]
138pub fn parse_subscribed_frame(bytes: &[u8]) -> Result<SubscribedFrame, serde_json::Error> {
139    if is_gzip(bytes) {
140        if let Ok(decompressed) = decompress_gzip(bytes) {
141            return serde_json::from_str(&decompressed);
142        }
143    }
144
145    let text = String::from_utf8_lossy(bytes);
146    serde_json::from_str(&text)
147}
148
149#[allow(dead_code)]
150pub fn try_parse_subscribed_frame(bytes: &[u8]) -> Option<SubscribedFrame> {
151    let frame: serde_json::Value = if is_gzip(bytes) {
152        decompress_gzip(bytes)
153            .ok()
154            .and_then(|s| serde_json::from_str(&s).ok())?
155    } else {
156        serde_json::from_slice(bytes).ok()?
157    };
158
159    if frame.get("op")?.as_str()? == "subscribed" {
160        serde_json::from_value(frame).ok()
161    } else {
162        None
163    }
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::{write::GzEncoder, Compression};
    use std::io::Write;

    /// Snapshot frame fixture shared by the parsing tests.
    const SNAPSHOT_FRAME_JSON: &str = r#"{"mode":"list","entity":"test/list","op":"snapshot","key":"","data":[{"key":"1","data":{"id":1}}]}"#;

    /// Gzip-compresses `payload` at the fastest compression level.
    fn gzip(payload: &[u8]) -> Vec<u8> {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::fast());
        encoder.write_all(payload).unwrap();
        encoder.finish().unwrap()
    }

    /// Asserts that `frame` carries the fields of [`SNAPSHOT_FRAME_JSON`].
    fn assert_is_snapshot_fixture(frame: &Frame) {
        assert_eq!(frame.op, "snapshot");
        assert_eq!(frame.entity, "test/list");
    }

    #[test]
    fn test_parse_uncompressed_frame() {
        let frame = parse_frame(SNAPSHOT_FRAME_JSON.as_bytes()).unwrap();
        assert_is_snapshot_fixture(&frame);
    }

    #[test]
    fn test_parse_raw_gzip_frame() {
        let compressed = gzip(SNAPSHOT_FRAME_JSON.as_bytes());
        assert!(is_gzip(&compressed));

        let frame = parse_frame(&compressed).unwrap();
        assert_is_snapshot_fixture(&frame);
    }

    #[test]
    fn test_gzip_magic_detection() {
        // Signature followed by further bytes is detected.
        assert!(is_gzip(&[0x1f, 0x8b, 0x08]));
        // `{"` — the start of a plain JSON object.
        assert!(!is_gzip(&[0x7b, 0x22]));
        // Too short to contain the full signature.
        assert!(!is_gzip(&[0x1f]));
        assert!(!is_gzip(&[]));
    }
}