hyperstack_sdk/
client.rs

1use crate::config::{ConnectionConfig, HyperStackConfig};
2use crate::connection::{ConnectionManager, ConnectionState};
3use crate::entity::{Entity, EntityData};
4use crate::error::HyperStackError;
5use crate::frame::Frame;
6use crate::store::{SharedStore, StoreConfig};
7use crate::stream::{EntityStream, KeyFilter, RichEntityStream};
8use crate::view::{ViewBuilder, Views};
9use std::time::Duration;
10use tokio::sync::mpsc;
11
12pub struct HyperStack {
13    connection: ConnectionManager,
14    store: SharedStore,
15    #[allow(dead_code)]
16    config: HyperStackConfig,
17}
18
19impl HyperStack {
20    pub fn builder() -> HyperStackBuilder {
21        HyperStackBuilder::default()
22    }
23
24    pub async fn connect(url: &str) -> Result<Self, HyperStackError> {
25        Self::builder().url(url).connect().await
26    }
27
28    pub async fn get<E: Entity>(&self, key: &str) -> Option<E::Data> {
29        self.connection
30            .ensure_subscription(E::state_view(), Some(key))
31            .await;
32        self.store
33            .wait_for_view_ready(E::NAME, self.config.initial_data_timeout)
34            .await;
35        self.store.get::<E::Data>(E::NAME, key).await
36    }
37
38    pub async fn list<E: Entity>(&self) -> Vec<E::Data> {
39        self.connection
40            .ensure_subscription(E::list_view(), None)
41            .await;
42        self.store
43            .wait_for_view_ready(E::NAME, self.config.initial_data_timeout)
44            .await;
45        self.store.list::<E::Data>(E::NAME).await
46    }
47
48    pub fn watch<E: Entity>(&self) -> EntityStream<E::Data> {
49        EntityStream::new_lazy(
50            self.connection.clone(),
51            self.store.clone(),
52            E::NAME.to_string(),
53            E::list_view().to_string(),
54            KeyFilter::None,
55            None,
56        )
57    }
58
59    pub fn watch_key<E: Entity>(&self, key: &str) -> EntityStream<E::Data> {
60        EntityStream::new_lazy(
61            self.connection.clone(),
62            self.store.clone(),
63            E::NAME.to_string(),
64            E::list_view().to_string(),
65            KeyFilter::Single(key.to_string()),
66            Some(key.to_string()),
67        )
68    }
69
70    pub fn watch_keys<E: Entity>(&self, keys: &[&str]) -> EntityStream<E::Data> {
71        EntityStream::new_lazy(
72            self.connection.clone(),
73            self.store.clone(),
74            E::NAME.to_string(),
75            E::list_view().to_string(),
76            KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
77            None,
78        )
79    }
80
81    pub fn watch_rich<E: Entity>(&self) -> RichEntityStream<E::Data> {
82        RichEntityStream::new_lazy(
83            self.connection.clone(),
84            self.store.clone(),
85            E::NAME.to_string(),
86            E::list_view().to_string(),
87            KeyFilter::None,
88            None,
89        )
90    }
91
92    pub fn watch_key_rich<E: Entity>(&self, key: &str) -> RichEntityStream<E::Data> {
93        RichEntityStream::new_lazy(
94            self.connection.clone(),
95            self.store.clone(),
96            E::NAME.to_string(),
97            E::list_view().to_string(),
98            KeyFilter::Single(key.to_string()),
99            Some(key.to_string()),
100        )
101    }
102
103    pub async fn get_data<D: EntityData>(&self, key: &str) -> Option<D> {
104        self.get::<D::Entity>(key).await
105    }
106
107    pub async fn list_data<D: EntityData>(&self) -> Vec<D> {
108        self.list::<D::Entity>().await
109    }
110
111    pub fn watch_data<D: EntityData>(&self) -> EntityStream<D> {
112        self.watch::<D::Entity>()
113    }
114
115    pub fn watch_key_data<D: EntityData>(&self, key: &str) -> EntityStream<D> {
116        self.watch_key::<D::Entity>(key)
117    }
118
119    pub async fn connection_state(&self) -> ConnectionState {
120        self.connection.state().await
121    }
122
123    pub async fn disconnect(&self) {
124        self.connection.disconnect().await;
125    }
126
127    pub fn store(&self) -> &SharedStore {
128        &self.store
129    }
130
131    /// Create a view builder for constructing typed view accessors.
132    ///
133    /// This is used by generated code to create view accessor structs.
134    pub fn view_builder(&self) -> ViewBuilder {
135        ViewBuilder::new(
136            self.connection.clone(),
137            self.store.clone(),
138            self.config.initial_data_timeout,
139        )
140    }
141
142    /// Get a views accessor for the specified entity type.
143    ///
144    /// This provides a fluent API for accessing all views (state, list, derived)
145    /// for an entity.
146    ///
147    /// # Example
148    ///
149    /// ```ignore
150    /// use my_stack::OreRoundViews;
151    ///
152    /// let views = hs.views::<OreRoundViews>();
153    /// let latest = views.latest().get().await;
154    /// let all_rounds = views.list().get().await;
155    /// ```
156    pub fn views<V: Views>(&self) -> V {
157        V::from_builder(self.view_builder())
158    }
159}
160
161#[derive(Default)]
162pub struct HyperStackBuilder {
163    url: Option<String>,
164    config: HyperStackConfig,
165}
166
167impl HyperStackBuilder {
168    pub fn url(mut self, url: &str) -> Self {
169        self.url = Some(url.to_string());
170        self
171    }
172
173    pub fn auto_reconnect(mut self, enabled: bool) -> Self {
174        self.config.auto_reconnect = enabled;
175        self
176    }
177
178    pub fn reconnect_intervals(mut self, intervals: Vec<Duration>) -> Self {
179        self.config.reconnect_intervals = intervals;
180        self
181    }
182
183    pub fn max_reconnect_attempts(mut self, max: u32) -> Self {
184        self.config.max_reconnect_attempts = max;
185        self
186    }
187
188    pub fn ping_interval(mut self, interval: Duration) -> Self {
189        self.config.ping_interval = interval;
190        self
191    }
192
193    pub fn initial_data_timeout(mut self, timeout: Duration) -> Self {
194        self.config.initial_data_timeout = timeout;
195        self
196    }
197
198    pub fn max_entries_per_view(mut self, max: usize) -> Self {
199        self.config.max_entries_per_view = Some(max);
200        self
201    }
202
203    pub fn unlimited_entries(mut self) -> Self {
204        self.config.max_entries_per_view = None;
205        self
206    }
207
208    pub async fn connect(self) -> Result<HyperStack, HyperStackError> {
209        let url = self.url.ok_or(HyperStackError::MissingUrl)?;
210        let store_config = StoreConfig {
211            max_entries_per_view: self.config.max_entries_per_view,
212        };
213        let store = SharedStore::with_config(store_config);
214        let store_clone = store.clone();
215
216        let (frame_tx, mut frame_rx) = mpsc::channel::<Frame>(1000);
217
218        let connection_config: ConnectionConfig = self.config.clone().into();
219        let connection = ConnectionManager::new(url, connection_config, frame_tx).await;
220
221        tokio::spawn(async move {
222            while let Some(frame) = frame_rx.recv().await {
223                store_clone.apply_frame(frame).await;
224            }
225        });
226
227        Ok(HyperStack {
228            connection,
229            store,
230            config: self.config,
231        })
232    }
233}