1use crate::mutation::{Frame, Mode};
2use crate::state::EntityStore;
3use anyhow::Result;
4use futures_util::{SinkExt, StreamExt};
5use serde::{de::DeserializeOwned, Deserialize, Serialize};
6use tokio_tungstenite::{connect_async, tungstenite::Message};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct Subscription {
10 pub view: String,
11 #[serde(skip_serializing_if = "Option::is_none")]
12 pub key: Option<String>,
13 #[serde(skip_serializing_if = "Option::is_none")]
14 pub partition: Option<String>,
15}
16
17pub struct HyperStackClient<T> {
18 url: String,
19 view: String,
20 key: Option<String>,
21 _phantom: std::marker::PhantomData<T>,
22}
23
24impl<T> HyperStackClient<T>
25where
26 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
27{
28 pub fn new(url: impl Into<String>, view: impl Into<String>) -> Self {
29 Self {
30 url: url.into(),
31 view: view.into(),
32 key: None,
33 _phantom: std::marker::PhantomData,
34 }
35 }
36
37 pub fn with_key(mut self, key: impl Into<String>) -> Self {
38 self.key = Some(key.into());
39 self
40 }
41
42 pub async fn connect(self) -> Result<EntityStore<T>> {
43 let (ws, _) = connect_async(&self.url).await?;
44 let (mut tx, mut rx) = ws.split();
45
46 let subscription = Subscription {
47 view: self.view.clone(),
48 key: self.key,
49 partition: None,
50 };
51
52 tx.send(Message::Text(serde_json::to_string(&subscription)?)).await?;
53
54 let mode = infer_mode_from_view(&self.view);
55 let store = EntityStore::new(mode);
56 let store_ref = store.clone();
57
58 let mut tx_clone = tx;
59 tokio::spawn(async move {
60 while let Some(Ok(msg)) = rx.next().await {
61 match msg {
62 Message::Binary(bytes) => {
63 let text = String::from_utf8_lossy(&bytes);
64 if let Ok(frame) = serde_json::from_str::<Frame>(&text) {
65 match frame.op.as_str() {
66 "patch" => {
67 store_ref.apply_patch(frame.key, frame.data).await;
68 }
69 "upsert" => {
70 store_ref.apply_upsert(frame.key, frame.data).await;
71 }
72 "delete" => {
73 store_ref.apply_delete(frame.key).await;
74 }
75 _ => {
76 store_ref.apply_upsert(frame.key, frame.data).await;
77 }
78 }
79 }
80 }
81 Message::Ping(payload) => {
82 let _ = tx_clone.send(Message::Pong(payload)).await;
83 }
84 Message::Close(_) => break,
85 _ => {}
86 }
87 }
88 });
89
90 Ok(store)
91 }
92}
93
94fn infer_mode_from_view(view: &str) -> Mode {
95 if view.ends_with("/state") {
96 Mode::State
97 } else if view.ends_with("/list") {
98 Mode::List
99 } else if view.ends_with("/append") {
100 Mode::Append
101 } else {
102 Mode::Kv
103 }
104}