hyperstack_sdk/
client.rs

1use crate::config::{ConnectionConfig, HyperStackConfig};
2use crate::connection::{ConnectionManager, ConnectionState};
3use crate::entity::{Entity, EntityData};
4use crate::error::HyperStackError;
5use crate::frame::Frame;
6use crate::store::{SharedStore, StoreConfig};
7use crate::stream::{EntityStream, KeyFilter, RichEntityStream};
8use std::time::Duration;
9use tokio::sync::mpsc;
10
11pub struct HyperStack {
12    connection: ConnectionManager,
13    store: SharedStore,
14    #[allow(dead_code)]
15    config: HyperStackConfig,
16}
17
18impl HyperStack {
19    pub fn builder() -> HyperStackBuilder {
20        HyperStackBuilder::default()
21    }
22
23    pub async fn connect(url: &str) -> Result<Self, HyperStackError> {
24        Self::builder().url(url).connect().await
25    }
26
27    pub async fn get<E: Entity>(&self, key: &str) -> Option<E::Data> {
28        self.connection
29            .ensure_subscription(E::state_view(), Some(key))
30            .await;
31        self.store
32            .wait_for_view_ready(E::NAME, self.config.initial_data_timeout)
33            .await;
34        self.store.get::<E::Data>(E::NAME, key).await
35    }
36
37    pub async fn list<E: Entity>(&self) -> Vec<E::Data> {
38        self.connection
39            .ensure_subscription(E::list_view(), None)
40            .await;
41        self.store
42            .wait_for_view_ready(E::NAME, self.config.initial_data_timeout)
43            .await;
44        self.store.list::<E::Data>(E::NAME).await
45    }
46
47    pub fn watch<E: Entity>(&self) -> EntityStream<E::Data> {
48        EntityStream::new_lazy(
49            self.connection.clone(),
50            self.store.clone(),
51            E::NAME.to_string(),
52            E::list_view().to_string(),
53            KeyFilter::None,
54            None,
55        )
56    }
57
58    pub fn watch_key<E: Entity>(&self, key: &str) -> EntityStream<E::Data> {
59        EntityStream::new_lazy(
60            self.connection.clone(),
61            self.store.clone(),
62            E::NAME.to_string(),
63            E::list_view().to_string(),
64            KeyFilter::Single(key.to_string()),
65            Some(key.to_string()),
66        )
67    }
68
69    pub fn watch_keys<E: Entity>(&self, keys: &[&str]) -> EntityStream<E::Data> {
70        EntityStream::new_lazy(
71            self.connection.clone(),
72            self.store.clone(),
73            E::NAME.to_string(),
74            E::list_view().to_string(),
75            KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
76            None,
77        )
78    }
79
80    pub fn watch_rich<E: Entity>(&self) -> RichEntityStream<E::Data> {
81        RichEntityStream::new_lazy(
82            self.connection.clone(),
83            self.store.clone(),
84            E::NAME.to_string(),
85            E::list_view().to_string(),
86            KeyFilter::None,
87            None,
88        )
89    }
90
91    pub fn watch_key_rich<E: Entity>(&self, key: &str) -> RichEntityStream<E::Data> {
92        RichEntityStream::new_lazy(
93            self.connection.clone(),
94            self.store.clone(),
95            E::NAME.to_string(),
96            E::list_view().to_string(),
97            KeyFilter::Single(key.to_string()),
98            Some(key.to_string()),
99        )
100    }
101
102    pub async fn get_data<D: EntityData>(&self, key: &str) -> Option<D> {
103        self.get::<D::Entity>(key).await
104    }
105
106    pub async fn list_data<D: EntityData>(&self) -> Vec<D> {
107        self.list::<D::Entity>().await
108    }
109
110    pub fn watch_data<D: EntityData>(&self) -> EntityStream<D> {
111        self.watch::<D::Entity>()
112    }
113
114    pub fn watch_key_data<D: EntityData>(&self, key: &str) -> EntityStream<D> {
115        self.watch_key::<D::Entity>(key)
116    }
117
118    pub async fn connection_state(&self) -> ConnectionState {
119        self.connection.state().await
120    }
121
122    pub async fn disconnect(&self) {
123        self.connection.disconnect().await;
124    }
125
126    pub fn store(&self) -> &SharedStore {
127        &self.store
128    }
129}
130
131#[derive(Default)]
132pub struct HyperStackBuilder {
133    url: Option<String>,
134    config: HyperStackConfig,
135}
136
137impl HyperStackBuilder {
138    pub fn url(mut self, url: &str) -> Self {
139        self.url = Some(url.to_string());
140        self
141    }
142
143    pub fn auto_reconnect(mut self, enabled: bool) -> Self {
144        self.config.auto_reconnect = enabled;
145        self
146    }
147
148    pub fn reconnect_intervals(mut self, intervals: Vec<Duration>) -> Self {
149        self.config.reconnect_intervals = intervals;
150        self
151    }
152
153    pub fn max_reconnect_attempts(mut self, max: u32) -> Self {
154        self.config.max_reconnect_attempts = max;
155        self
156    }
157
158    pub fn ping_interval(mut self, interval: Duration) -> Self {
159        self.config.ping_interval = interval;
160        self
161    }
162
163    pub fn initial_data_timeout(mut self, timeout: Duration) -> Self {
164        self.config.initial_data_timeout = timeout;
165        self
166    }
167
168    pub fn max_entries_per_view(mut self, max: usize) -> Self {
169        self.config.max_entries_per_view = Some(max);
170        self
171    }
172
173    pub fn unlimited_entries(mut self) -> Self {
174        self.config.max_entries_per_view = None;
175        self
176    }
177
178    pub async fn connect(self) -> Result<HyperStack, HyperStackError> {
179        let url = self.url.ok_or(HyperStackError::MissingUrl)?;
180        let store_config = StoreConfig {
181            max_entries_per_view: self.config.max_entries_per_view,
182        };
183        let store = SharedStore::with_config(store_config);
184        let store_clone = store.clone();
185
186        let (frame_tx, mut frame_rx) = mpsc::channel::<Frame>(1000);
187
188        let connection_config: ConnectionConfig = self.config.clone().into();
189        let connection = ConnectionManager::new(url, connection_config, frame_tx).await;
190
191        tokio::spawn(async move {
192            while let Some(frame) = frame_rx.recv().await {
193                store_clone.apply_frame(frame).await;
194            }
195        });
196
197        Ok(HyperStack {
198            connection,
199            store,
200            config: self.config,
201        })
202    }
203}