1use crate::mutation::{Frame, Mode};
2use crate::state::EntityStore;
3use anyhow::Result;
4use futures_util::{SinkExt, StreamExt};
5use serde::{de::DeserializeOwned, Deserialize, Serialize};
6use tokio_tungstenite::{connect_async, tungstenite::Message};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct Subscription {
10 pub view: String,
11 #[serde(skip_serializing_if = "Option::is_none")]
12 pub key: Option<String>,
13 #[serde(skip_serializing_if = "Option::is_none")]
14 pub partition: Option<String>,
15}
16
17pub struct HyperStackClient<T> {
18 url: String,
19 view: String,
20 key: Option<String>,
21 _phantom: std::marker::PhantomData<T>,
22}
23
24impl<T> HyperStackClient<T>
25where
26 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
27{
28 pub fn new(url: impl Into<String>, view: impl Into<String>) -> Self {
29 Self {
30 url: url.into(),
31 view: view.into(),
32 key: None,
33 _phantom: std::marker::PhantomData,
34 }
35 }
36
37 pub fn with_key(mut self, key: impl Into<String>) -> Self {
38 self.key = Some(key.into());
39 self
40 }
41
42 pub async fn connect(self) -> Result<EntityStore<T>> {
43 let (ws, _) = connect_async(&self.url).await?;
44 let (mut tx, mut rx) = ws.split();
45
46 let subscription = Subscription {
47 view: self.view.clone(),
48 key: self.key,
49 partition: None,
50 };
51
52 tx.send(Message::Text(serde_json::to_string(&subscription)?))
53 .await?;
54
55 let mode = infer_mode_from_view(&self.view);
56 let store = EntityStore::new(mode);
57 let store_ref = store.clone();
58
59 let mut tx_clone = tx;
60 tokio::spawn(async move {
61 while let Some(Ok(msg)) = rx.next().await {
62 match msg {
63 Message::Binary(bytes) => {
64 let text = String::from_utf8_lossy(&bytes);
65 if let Ok(frame) = serde_json::from_str::<Frame>(&text) {
66 match frame.op.as_str() {
67 "patch" => {
68 store_ref.apply_patch(frame.key, frame.data).await;
69 }
70 "upsert" => {
71 store_ref.apply_upsert(frame.key, frame.data).await;
72 }
73 "delete" => {
74 store_ref.apply_delete(frame.key).await;
75 }
76 _ => {
77 store_ref.apply_upsert(frame.key, frame.data).await;
78 }
79 }
80 }
81 }
82 Message::Ping(payload) => {
83 let _ = tx_clone.send(Message::Pong(payload)).await;
84 }
85 Message::Close(_) => break,
86 _ => {}
87 }
88 }
89 });
90
91 Ok(store)
92 }
93}
94
95fn infer_mode_from_view(view: &str) -> Mode {
96 if view.ends_with("/state") {
97 Mode::State
98 } else if view.ends_with("/list") {
99 Mode::List
100 } else if view.ends_with("/append") {
101 Mode::Append
102 } else {
103 Mode::Kv
104 }
105}