1use crate::config::{ConnectionConfig, HyperStackConfig};
2use crate::connection::{ConnectionManager, ConnectionState};
3use crate::entity::{Entity, EntityData};
4use crate::error::HyperStackError;
5use crate::frame::Frame;
6use crate::store::{SharedStore, StoreConfig};
7use crate::stream::{EntityStream, KeyFilter, RichEntityStream};
8use crate::view::{ViewBuilder, Views};
9use std::time::Duration;
10use tokio::sync::mpsc;
11
12pub struct HyperStack {
13 connection: ConnectionManager,
14 store: SharedStore,
15 #[allow(dead_code)]
16 config: HyperStackConfig,
17}
18
19impl HyperStack {
20 pub fn builder() -> HyperStackBuilder {
21 HyperStackBuilder::default()
22 }
23
24 pub async fn connect(url: &str) -> Result<Self, HyperStackError> {
25 Self::builder().url(url).connect().await
26 }
27
28 pub async fn get<E: Entity>(&self, key: &str) -> Option<E::Data> {
29 self.connection
30 .ensure_subscription(E::state_view(), Some(key))
31 .await;
32 self.store
33 .wait_for_view_ready(E::NAME, self.config.initial_data_timeout)
34 .await;
35 self.store.get::<E::Data>(E::NAME, key).await
36 }
37
38 pub async fn list<E: Entity>(&self) -> Vec<E::Data> {
39 self.connection
40 .ensure_subscription(E::list_view(), None)
41 .await;
42 self.store
43 .wait_for_view_ready(E::NAME, self.config.initial_data_timeout)
44 .await;
45 self.store.list::<E::Data>(E::NAME).await
46 }
47
48 pub fn watch<E: Entity>(&self) -> EntityStream<E::Data> {
49 EntityStream::new_lazy(
50 self.connection.clone(),
51 self.store.clone(),
52 E::NAME.to_string(),
53 E::list_view().to_string(),
54 KeyFilter::None,
55 None,
56 )
57 }
58
59 pub fn watch_key<E: Entity>(&self, key: &str) -> EntityStream<E::Data> {
60 EntityStream::new_lazy(
61 self.connection.clone(),
62 self.store.clone(),
63 E::NAME.to_string(),
64 E::list_view().to_string(),
65 KeyFilter::Single(key.to_string()),
66 Some(key.to_string()),
67 )
68 }
69
70 pub fn watch_keys<E: Entity>(&self, keys: &[&str]) -> EntityStream<E::Data> {
71 EntityStream::new_lazy(
72 self.connection.clone(),
73 self.store.clone(),
74 E::NAME.to_string(),
75 E::list_view().to_string(),
76 KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
77 None,
78 )
79 }
80
81 pub fn watch_rich<E: Entity>(&self) -> RichEntityStream<E::Data> {
82 RichEntityStream::new_lazy(
83 self.connection.clone(),
84 self.store.clone(),
85 E::NAME.to_string(),
86 E::list_view().to_string(),
87 KeyFilter::None,
88 None,
89 )
90 }
91
92 pub fn watch_key_rich<E: Entity>(&self, key: &str) -> RichEntityStream<E::Data> {
93 RichEntityStream::new_lazy(
94 self.connection.clone(),
95 self.store.clone(),
96 E::NAME.to_string(),
97 E::list_view().to_string(),
98 KeyFilter::Single(key.to_string()),
99 Some(key.to_string()),
100 )
101 }
102
103 pub async fn get_data<D: EntityData>(&self, key: &str) -> Option<D> {
104 self.get::<D::Entity>(key).await
105 }
106
107 pub async fn list_data<D: EntityData>(&self) -> Vec<D> {
108 self.list::<D::Entity>().await
109 }
110
111 pub fn watch_data<D: EntityData>(&self) -> EntityStream<D> {
112 self.watch::<D::Entity>()
113 }
114
115 pub fn watch_key_data<D: EntityData>(&self, key: &str) -> EntityStream<D> {
116 self.watch_key::<D::Entity>(key)
117 }
118
119 pub async fn connection_state(&self) -> ConnectionState {
120 self.connection.state().await
121 }
122
123 pub async fn disconnect(&self) {
124 self.connection.disconnect().await;
125 }
126
127 pub fn store(&self) -> &SharedStore {
128 &self.store
129 }
130
131 pub fn view_builder(&self) -> ViewBuilder {
135 ViewBuilder::new(
136 self.connection.clone(),
137 self.store.clone(),
138 self.config.initial_data_timeout,
139 )
140 }
141
142 pub fn views<V: Views>(&self) -> V {
157 V::from_builder(self.view_builder())
158 }
159}
160
161#[derive(Default)]
162pub struct HyperStackBuilder {
163 url: Option<String>,
164 config: HyperStackConfig,
165}
166
167impl HyperStackBuilder {
168 pub fn url(mut self, url: &str) -> Self {
169 self.url = Some(url.to_string());
170 self
171 }
172
173 pub fn auto_reconnect(mut self, enabled: bool) -> Self {
174 self.config.auto_reconnect = enabled;
175 self
176 }
177
178 pub fn reconnect_intervals(mut self, intervals: Vec<Duration>) -> Self {
179 self.config.reconnect_intervals = intervals;
180 self
181 }
182
183 pub fn max_reconnect_attempts(mut self, max: u32) -> Self {
184 self.config.max_reconnect_attempts = max;
185 self
186 }
187
188 pub fn ping_interval(mut self, interval: Duration) -> Self {
189 self.config.ping_interval = interval;
190 self
191 }
192
193 pub fn initial_data_timeout(mut self, timeout: Duration) -> Self {
194 self.config.initial_data_timeout = timeout;
195 self
196 }
197
198 pub fn max_entries_per_view(mut self, max: usize) -> Self {
199 self.config.max_entries_per_view = Some(max);
200 self
201 }
202
203 pub fn unlimited_entries(mut self) -> Self {
204 self.config.max_entries_per_view = None;
205 self
206 }
207
208 pub async fn connect(self) -> Result<HyperStack, HyperStackError> {
209 let url = self.url.ok_or(HyperStackError::MissingUrl)?;
210 let store_config = StoreConfig {
211 max_entries_per_view: self.config.max_entries_per_view,
212 };
213 let store = SharedStore::with_config(store_config);
214 let store_clone = store.clone();
215
216 let (frame_tx, mut frame_rx) = mpsc::channel::<Frame>(1000);
217
218 let connection_config: ConnectionConfig = self.config.clone().into();
219 let connection = ConnectionManager::new(url, connection_config, frame_tx).await;
220
221 tokio::spawn(async move {
222 while let Some(frame) = frame_rx.recv().await {
223 store_clone.apply_frame(frame).await;
224 }
225 });
226
227 Ok(HyperStack {
228 connection,
229 store,
230 config: self.config,
231 })
232 }
233}