hyperstack_sdk/
client.rs

1use crate::mutation::{Frame, Mode};
2use crate::state::EntityStore;
3use anyhow::Result;
4use futures_util::{SinkExt, StreamExt};
5use serde::{de::DeserializeOwned, Deserialize, Serialize};
6use tokio_tungstenite::{connect_async, tungstenite::Message};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct Subscription {
10    pub view: String,
11    #[serde(skip_serializing_if = "Option::is_none")]
12    pub key: Option<String>,
13    #[serde(skip_serializing_if = "Option::is_none")]
14    pub partition: Option<String>,
15}
16
17pub struct HyperStackClient<T> {
18    url: String,
19    view: String,
20    key: Option<String>,
21    _phantom: std::marker::PhantomData<T>,
22}
23
24impl<T> HyperStackClient<T>
25where
26    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
27{
28    pub fn new(url: impl Into<String>, view: impl Into<String>) -> Self {
29        Self {
30            url: url.into(),
31            view: view.into(),
32            key: None,
33            _phantom: std::marker::PhantomData,
34        }
35    }
36
37    pub fn with_key(mut self, key: impl Into<String>) -> Self {
38        self.key = Some(key.into());
39        self
40    }
41
42    pub async fn connect(self) -> Result<EntityStore<T>> {
43        let (ws, _) = connect_async(&self.url).await?;
44        let (mut tx, mut rx) = ws.split();
45
46        let subscription = Subscription {
47            view: self.view.clone(),
48            key: self.key,
49            partition: None,
50        };
51
52        tx.send(Message::Text(serde_json::to_string(&subscription)?))
53            .await?;
54
55        let mode = infer_mode_from_view(&self.view);
56        let store = EntityStore::new(mode);
57        let store_ref = store.clone();
58
59        let mut tx_clone = tx;
60        tokio::spawn(async move {
61            while let Some(Ok(msg)) = rx.next().await {
62                match msg {
63                    Message::Binary(bytes) => {
64                        let text = String::from_utf8_lossy(&bytes);
65                        if let Ok(frame) = serde_json::from_str::<Frame>(&text) {
66                            match frame.op.as_str() {
67                                "patch" => {
68                                    store_ref.apply_patch(frame.key, frame.data).await;
69                                }
70                                "upsert" => {
71                                    store_ref.apply_upsert(frame.key, frame.data).await;
72                                }
73                                "delete" => {
74                                    store_ref.apply_delete(frame.key).await;
75                                }
76                                _ => {
77                                    store_ref.apply_upsert(frame.key, frame.data).await;
78                                }
79                            }
80                        }
81                    }
82                    Message::Ping(payload) => {
83                        let _ = tx_clone.send(Message::Pong(payload)).await;
84                    }
85                    Message::Close(_) => break,
86                    _ => {}
87                }
88            }
89        });
90
91        Ok(store)
92    }
93}
94
95fn infer_mode_from_view(view: &str) -> Mode {
96    if view.ends_with("/state") {
97        Mode::State
98    } else if view.ends_with("/list") {
99        Mode::List
100    } else if view.ends_with("/append") {
101        Mode::Append
102    } else {
103        Mode::Kv
104    }
105}