hyperstack_sdk/
client.rs

1use crate::mutation::{Frame, Mode};
2use crate::state::EntityStore;
3use anyhow::Result;
4use futures_util::{SinkExt, StreamExt};
5use serde::{de::DeserializeOwned, Deserialize, Serialize};
6use tokio_tungstenite::{connect_async, tungstenite::Message};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct Subscription {
10    pub view: String,
11    #[serde(skip_serializing_if = "Option::is_none")]
12    pub key: Option<String>,
13    #[serde(skip_serializing_if = "Option::is_none")]
14    pub partition: Option<String>,
15}
16
/// Connection settings for a single view subscription.
///
/// `T` is the entity type the resulting store is parameterized over. No value
/// of `T` is stored here; the marker field only ties the type parameter to
/// the struct.
pub struct HyperStackClient<T> {
    /// Address of the WebSocket endpoint to connect to.
    url: String,
    /// Name of the view to subscribe to.
    view: String,
    /// Optional key restricting the subscription; `None` means no key is sent.
    key: Option<String>,
    /// Zero-sized marker carrying the entity type `T`.
    _phantom: std::marker::PhantomData<T>,
}
23
24impl<T> HyperStackClient<T>
25where
26    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
27{
28    pub fn new(url: impl Into<String>, view: impl Into<String>) -> Self {
29        Self {
30            url: url.into(),
31            view: view.into(),
32            key: None,
33            _phantom: std::marker::PhantomData,
34        }
35    }
36
37    pub fn with_key(mut self, key: impl Into<String>) -> Self {
38        self.key = Some(key.into());
39        self
40    }
41
42    pub async fn connect(self) -> Result<EntityStore<T>> {
43        let (ws, _) = connect_async(&self.url).await?;
44        let (mut tx, mut rx) = ws.split();
45
46        let subscription = Subscription {
47            view: self.view.clone(),
48            key: self.key,
49            partition: None,
50        };
51
52        tx.send(Message::Text(serde_json::to_string(&subscription)?)).await?;
53
54        let mode = infer_mode_from_view(&self.view);
55        let store = EntityStore::new(mode);
56        let store_ref = store.clone();
57
58        let mut tx_clone = tx;
59        tokio::spawn(async move {
60            while let Some(Ok(msg)) = rx.next().await {
61                match msg {
62                    Message::Binary(bytes) => {
63                        let text = String::from_utf8_lossy(&bytes);
64                        if let Ok(frame) = serde_json::from_str::<Frame>(&text) {
65                            match frame.op.as_str() {
66                                "patch" => {
67                                    store_ref.apply_patch(frame.key, frame.data).await;
68                                }
69                                "upsert" => {
70                                    store_ref.apply_upsert(frame.key, frame.data).await;
71                                }
72                                "delete" => {
73                                    store_ref.apply_delete(frame.key).await;
74                                }
75                                _ => {
76                                    store_ref.apply_upsert(frame.key, frame.data).await;
77                                }
78                            }
79                        }
80                    }
81                    Message::Ping(payload) => {
82                        let _ = tx_clone.send(Message::Pong(payload)).await;
83                    }
84                    Message::Close(_) => break,
85                    _ => {}
86                }
87            }
88        });
89
90        Ok(store)
91    }
92}
93
94fn infer_mode_from_view(view: &str) -> Mode {
95    if view.ends_with("/state") {
96        Mode::State
97    } else if view.ends_with("/list") {
98        Mode::List
99    } else if view.ends_with("/append") {
100        Mode::Append
101    } else {
102        Mode::Kv
103    }
104}