1use crate::config::{ConnectionConfig, HyperStackConfig};
2use crate::connection::{ConnectionManager, ConnectionState};
3use crate::entity::Entity;
4use crate::error::HyperStackError;
5use crate::frame::Frame;
6use crate::store::SharedStore;
7use crate::stream::EntityStream;
8use std::time::Duration;
9use tokio::sync::mpsc;
10
11pub struct HyperStack {
12 connection: ConnectionManager,
13 store: SharedStore,
14 #[allow(dead_code)]
15 config: HyperStackConfig,
16}
17
18impl HyperStack {
19 pub fn builder() -> HyperStackBuilder {
20 HyperStackBuilder::default()
21 }
22
23 pub async fn connect(url: &str) -> Result<Self, HyperStackError> {
24 Self::builder().url(url).connect().await
25 }
26
27 pub async fn get<E: Entity>(&self, key: &str) -> Option<E::Data> {
28 self.connection
29 .ensure_subscription(E::state_view(), Some(key))
30 .await;
31 tokio::time::sleep(Duration::from_millis(100)).await;
32 self.store.get::<E::Data>(E::NAME, key).await
33 }
34
35 pub async fn list<E: Entity>(&self) -> Vec<E::Data> {
36 self.connection
37 .ensure_subscription(E::list_view(), None)
38 .await;
39 tokio::time::sleep(Duration::from_millis(100)).await;
40 self.store.list::<E::Data>(E::NAME).await
41 }
42
43 pub async fn watch<E: Entity>(&self) -> EntityStream<E::Data> {
44 self.connection
45 .ensure_subscription(E::list_view(), None)
46 .await;
47 EntityStream::new(self.store.subscribe(), E::NAME.to_string())
48 }
49
50 pub async fn watch_key<E: Entity>(&self, key: &str) -> EntityStream<E::Data> {
51 self.connection
52 .ensure_subscription(E::kv_view(), Some(key))
53 .await;
54 EntityStream::new_filtered(self.store.subscribe(), E::NAME.to_string(), key.to_string())
55 }
56
57 pub async fn connection_state(&self) -> ConnectionState {
58 self.connection.state().await
59 }
60
61 pub async fn disconnect(&self) {
62 self.connection.disconnect().await;
63 }
64
65 pub fn store(&self) -> &SharedStore {
66 &self.store
67 }
68}
69
70#[derive(Default)]
71pub struct HyperStackBuilder {
72 url: Option<String>,
73 config: HyperStackConfig,
74}
75
76impl HyperStackBuilder {
77 pub fn url(mut self, url: &str) -> Self {
78 self.url = Some(url.to_string());
79 self
80 }
81
82 pub fn auto_reconnect(mut self, enabled: bool) -> Self {
83 self.config.auto_reconnect = enabled;
84 self
85 }
86
87 pub fn reconnect_intervals(mut self, intervals: Vec<Duration>) -> Self {
88 self.config.reconnect_intervals = intervals;
89 self
90 }
91
92 pub fn max_reconnect_attempts(mut self, max: u32) -> Self {
93 self.config.max_reconnect_attempts = max;
94 self
95 }
96
97 pub fn ping_interval(mut self, interval: Duration) -> Self {
98 self.config.ping_interval = interval;
99 self
100 }
101
102 pub async fn connect(self) -> Result<HyperStack, HyperStackError> {
103 let url = self.url.ok_or(HyperStackError::MissingUrl)?;
104 let store = SharedStore::new();
105 let store_clone = store.clone();
106
107 let (frame_tx, mut frame_rx) = mpsc::channel::<Frame>(1000);
108
109 let connection_config: ConnectionConfig = self.config.clone().into();
110 let connection = ConnectionManager::new(url, connection_config, frame_tx).await;
111
112 tokio::spawn(async move {
113 while let Some(frame) = frame_rx.recv().await {
114 store_clone.apply_frame(frame).await;
115 }
116 });
117
118 Ok(HyperStack {
119 connection,
120 store,
121 config: self.config,
122 })
123 }
124}