1use crate::config::{ConnectionConfig, HyperStackConfig};
2use crate::connection::{ConnectionManager, ConnectionState};
3use crate::entity::{Entity, EntityData};
4use crate::error::HyperStackError;
5use crate::frame::Frame;
6use crate::store::{SharedStore, StoreConfig};
7use crate::stream::{EntityStream, KeyFilter, RichEntityStream};
8use std::time::Duration;
9use tokio::sync::mpsc;
10
11pub struct HyperStack {
12 connection: ConnectionManager,
13 store: SharedStore,
14 #[allow(dead_code)]
15 config: HyperStackConfig,
16}
17
18impl HyperStack {
19 pub fn builder() -> HyperStackBuilder {
20 HyperStackBuilder::default()
21 }
22
23 pub async fn connect(url: &str) -> Result<Self, HyperStackError> {
24 Self::builder().url(url).connect().await
25 }
26
27 pub async fn get<E: Entity>(&self, key: &str) -> Option<E::Data> {
28 self.connection
29 .ensure_subscription(E::state_view(), Some(key))
30 .await;
31 self.store
32 .wait_for_view_ready(E::NAME, self.config.initial_data_timeout)
33 .await;
34 self.store.get::<E::Data>(E::NAME, key).await
35 }
36
37 pub async fn list<E: Entity>(&self) -> Vec<E::Data> {
38 self.connection
39 .ensure_subscription(E::list_view(), None)
40 .await;
41 self.store
42 .wait_for_view_ready(E::NAME, self.config.initial_data_timeout)
43 .await;
44 self.store.list::<E::Data>(E::NAME).await
45 }
46
47 pub fn watch<E: Entity>(&self) -> EntityStream<E::Data> {
48 EntityStream::new_lazy(
49 self.connection.clone(),
50 self.store.clone(),
51 E::NAME.to_string(),
52 E::list_view().to_string(),
53 KeyFilter::None,
54 None,
55 )
56 }
57
58 pub fn watch_key<E: Entity>(&self, key: &str) -> EntityStream<E::Data> {
59 EntityStream::new_lazy(
60 self.connection.clone(),
61 self.store.clone(),
62 E::NAME.to_string(),
63 E::list_view().to_string(),
64 KeyFilter::Single(key.to_string()),
65 Some(key.to_string()),
66 )
67 }
68
69 pub fn watch_keys<E: Entity>(&self, keys: &[&str]) -> EntityStream<E::Data> {
70 EntityStream::new_lazy(
71 self.connection.clone(),
72 self.store.clone(),
73 E::NAME.to_string(),
74 E::list_view().to_string(),
75 KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
76 None,
77 )
78 }
79
80 pub fn watch_rich<E: Entity>(&self) -> RichEntityStream<E::Data> {
81 RichEntityStream::new_lazy(
82 self.connection.clone(),
83 self.store.clone(),
84 E::NAME.to_string(),
85 E::list_view().to_string(),
86 KeyFilter::None,
87 None,
88 )
89 }
90
91 pub fn watch_key_rich<E: Entity>(&self, key: &str) -> RichEntityStream<E::Data> {
92 RichEntityStream::new_lazy(
93 self.connection.clone(),
94 self.store.clone(),
95 E::NAME.to_string(),
96 E::list_view().to_string(),
97 KeyFilter::Single(key.to_string()),
98 Some(key.to_string()),
99 )
100 }
101
102 pub async fn get_data<D: EntityData>(&self, key: &str) -> Option<D> {
103 self.get::<D::Entity>(key).await
104 }
105
106 pub async fn list_data<D: EntityData>(&self) -> Vec<D> {
107 self.list::<D::Entity>().await
108 }
109
110 pub fn watch_data<D: EntityData>(&self) -> EntityStream<D> {
111 self.watch::<D::Entity>()
112 }
113
114 pub fn watch_key_data<D: EntityData>(&self, key: &str) -> EntityStream<D> {
115 self.watch_key::<D::Entity>(key)
116 }
117
118 pub async fn connection_state(&self) -> ConnectionState {
119 self.connection.state().await
120 }
121
122 pub async fn disconnect(&self) {
123 self.connection.disconnect().await;
124 }
125
126 pub fn store(&self) -> &SharedStore {
127 &self.store
128 }
129}
130
131#[derive(Default)]
132pub struct HyperStackBuilder {
133 url: Option<String>,
134 config: HyperStackConfig,
135}
136
137impl HyperStackBuilder {
138 pub fn url(mut self, url: &str) -> Self {
139 self.url = Some(url.to_string());
140 self
141 }
142
143 pub fn auto_reconnect(mut self, enabled: bool) -> Self {
144 self.config.auto_reconnect = enabled;
145 self
146 }
147
148 pub fn reconnect_intervals(mut self, intervals: Vec<Duration>) -> Self {
149 self.config.reconnect_intervals = intervals;
150 self
151 }
152
153 pub fn max_reconnect_attempts(mut self, max: u32) -> Self {
154 self.config.max_reconnect_attempts = max;
155 self
156 }
157
158 pub fn ping_interval(mut self, interval: Duration) -> Self {
159 self.config.ping_interval = interval;
160 self
161 }
162
163 pub fn initial_data_timeout(mut self, timeout: Duration) -> Self {
164 self.config.initial_data_timeout = timeout;
165 self
166 }
167
168 pub fn max_entries_per_view(mut self, max: usize) -> Self {
169 self.config.max_entries_per_view = Some(max);
170 self
171 }
172
173 pub fn unlimited_entries(mut self) -> Self {
174 self.config.max_entries_per_view = None;
175 self
176 }
177
178 pub async fn connect(self) -> Result<HyperStack, HyperStackError> {
179 let url = self.url.ok_or(HyperStackError::MissingUrl)?;
180 let store_config = StoreConfig {
181 max_entries_per_view: self.config.max_entries_per_view,
182 };
183 let store = SharedStore::with_config(store_config);
184 let store_clone = store.clone();
185
186 let (frame_tx, mut frame_rx) = mpsc::channel::<Frame>(1000);
187
188 let connection_config: ConnectionConfig = self.config.clone().into();
189 let connection = ConnectionManager::new(url, connection_config, frame_tx).await;
190
191 tokio::spawn(async move {
192 while let Some(frame) = frame_rx.recv().await {
193 store_clone.apply_frame(frame).await;
194 }
195 });
196
197 Ok(HyperStack {
198 connection,
199 store,
200 config: self.config,
201 })
202 }
203}