hyperstack_sdk/
frame.rs

1use base64::{engine::general_purpose::STANDARD as BASE64, Engine};
2use flate2::read::GzDecoder;
3use serde::{Deserialize, Serialize};
4use std::io::Read;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
7#[serde(rename_all = "lowercase")]
8pub enum Mode {
9    State,
10    Append,
11    List,
12}
13
/// Typed form of the operation string carried by a frame.
///
/// Values are obtained by parsing the wire string through the
/// `std::str::FromStr` implementation for this type.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Operation {
    /// Parsed from `"upsert"`; also the result for any unrecognised string.
    Upsert,
    /// Parsed from `"patch"`.
    Patch,
    /// Parsed from `"delete"`.
    Delete,
    /// Parsed from `"create"`.
    Create,
    /// Parsed from `"snapshot"`.
    Snapshot,
}
22
23impl std::str::FromStr for Operation {
24    type Err = std::convert::Infallible;
25
26    fn from_str(s: &str) -> Result<Self, Self::Err> {
27        Ok(match s {
28            "upsert" => Operation::Upsert,
29            "patch" => Operation::Patch,
30            "delete" => Operation::Delete,
31            "create" => Operation::Create,
32            "snapshot" => Operation::Snapshot,
33            _ => Operation::Upsert,
34        })
35    }
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct Frame {
40    pub mode: Mode,
41    #[serde(rename = "entity")]
42    pub entity: String,
43    pub op: String,
44    #[serde(default)]
45    pub key: String,
46    pub data: serde_json::Value,
47    #[serde(default)]
48    pub append: Vec<String>,
49}
50
51impl Frame {
52    pub fn entity_name(&self) -> &str {
53        &self.entity
54    }
55
56    pub fn operation(&self) -> Operation {
57        self.op.parse().unwrap()
58    }
59
60    pub fn is_snapshot(&self) -> bool {
61        self.op == "snapshot"
62    }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct SnapshotEntity {
67    pub key: String,
68    pub data: serde_json::Value,
69}
70
71#[derive(Deserialize)]
72struct CompressedFrame {
73    compressed: String,
74    data: String,
75}
76
77fn decompress_gzip(base64_data: &str) -> Result<String, Box<dyn std::error::Error>> {
78    let compressed = BASE64.decode(base64_data)?;
79    let mut decoder = GzDecoder::new(&compressed[..]);
80    let mut decompressed = String::new();
81    decoder.read_to_string(&mut decompressed)?;
82    Ok(decompressed)
83}
84
85fn parse_and_decompress(text: &str) -> Result<Frame, serde_json::Error> {
86    if let Ok(compressed) = serde_json::from_str::<CompressedFrame>(text) {
87        if compressed.compressed == "gzip" {
88            if let Ok(decompressed) = decompress_gzip(&compressed.data) {
89                return serde_json::from_str(&decompressed);
90            }
91        }
92    }
93    serde_json::from_str(text)
94}
95
96pub fn parse_frame(bytes: &[u8]) -> Result<Frame, serde_json::Error> {
97    let text = String::from_utf8_lossy(bytes);
98    parse_and_decompress(&text)
99}
100
101pub fn parse_snapshot_entities(data: &serde_json::Value) -> Vec<SnapshotEntity> {
102    match data {
103        serde_json::Value::Array(arr) => arr
104            .iter()
105            .filter_map(|v| serde_json::from_value(v.clone()).ok())
106            .collect(),
107        _ => Vec::new(),
108    }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::{write::GzEncoder, Compression};
    use std::io::Write;

    /// Plain snapshot frame shared by both tests.
    const SNAPSHOT_FRAME_JSON: &str = r#"{"mode":"list","entity":"test/list","op":"snapshot","key":"","data":[{"key":"1","data":{"id":1}}]}"#;

    /// Checks the fields both tests care about.
    fn assert_is_snapshot_frame(frame: &Frame) {
        assert_eq!(frame.op, "snapshot");
        assert_eq!(frame.entity, "test/list");
    }

    #[test]
    fn test_parse_uncompressed_frame() {
        let frame = parse_frame(SNAPSHOT_FRAME_JSON.as_bytes()).unwrap();
        assert_is_snapshot_frame(&frame);
    }

    #[test]
    fn test_parse_compressed_frame() {
        // Gzip the plain frame and wrap it in the compressed envelope.
        let mut gz = GzEncoder::new(Vec::new(), Compression::fast());
        gz.write_all(SNAPSHOT_FRAME_JSON.as_bytes()).unwrap();
        let gz_bytes = gz.finish().unwrap();
        let payload = BASE64.encode(&gz_bytes);
        let envelope = format!(r#"{{"compressed":"gzip","data":"{}"}}"#, payload);

        let frame = parse_frame(envelope.as_bytes()).unwrap();
        assert_is_snapshot_frame(&frame);
    }
}