hyperstack_sdk/
view.rs

1//! View abstractions for unified access to views.
2//!
3//! All views return collections (Vec<T>). Use `.first()` on the result
4//! if you need a single item.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use hyperstack_sdk::prelude::*;
10//! use my_stack::OreRoundViews;
11//!
12//! let hs = HyperStack::connect("wss://example.com").await?;
13//!
14//! // Access views through the generated views struct
15//! let views = OreRoundViews::new(&hs);
16//!
17//! // Get latest round - use .first() for single item
18//! let latest = views.latest().get().await.first().cloned();
19//!
20//! // List all rounds
21//! let rounds = views.list().get().await;
22//!
23//! // Get specific round by key
24//! let round = views.state().get("round_key").await;
25//!
26//! // Watch for updates
27//! let mut stream = views.latest().watch();
28//! while let Some(update) = stream.next().await {
29//!     println!("Latest round updated: {:?}", update);
30//! }
31//! ```
32
33use crate::connection::ConnectionManager;
34use crate::entity::Entity;
35use crate::store::SharedStore;
36use crate::stream::{EntityStream, KeyFilter, RichEntityStream, Update};
37use futures_util::Stream;
38use serde::de::DeserializeOwned;
39use serde::Serialize;
40use std::marker::PhantomData;
41use std::pin::Pin;
42use std::task::{Context, Poll};
43use std::time::Duration;
44
45/// A handle to a view that provides get/watch operations.
46///
47/// All views return collections (Vec<T>). Use `.first()` on the result
48/// if you need a single item from views with a `take` limit.
49pub struct ViewHandle<T> {
50    connection: ConnectionManager,
51    store: SharedStore,
52    view_path: String,
53    initial_data_timeout: Duration,
54    _marker: PhantomData<T>,
55}
56
57impl<T> ViewHandle<T>
58where
59    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
60{
61    /// Get all items from this view.
62    ///
63    /// For views with a `take` limit defined in the stack, this returns
64    /// up to that many items. Use `.first()` on the result if you need
65    /// a single item.
66    pub async fn get(&self) -> Vec<T> {
67        self.connection
68            .ensure_subscription(&self.view_path, None)
69            .await;
70        self.store
71            .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
72            .await;
73        self.store.list::<T>(&self.view_path).await
74    }
75
76    /// Watch for updates to this view. Chain `.take(n)` to limit results.
77    pub fn watch(&self) -> WatchBuilder<T>
78    where
79        T: Unpin,
80    {
81        WatchBuilder::new(
82            self.connection.clone(),
83            self.store.clone(),
84            self.view_path.clone(),
85            KeyFilter::None,
86        )
87    }
88
89    /// Watch for updates filtered to specific keys.
90    pub fn watch_keys(&self, keys: &[&str]) -> WatchBuilder<T>
91    where
92        T: Unpin,
93    {
94        WatchBuilder::new(
95            self.connection.clone(),
96            self.store.clone(),
97            self.view_path.clone(),
98            KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
99        )
100    }
101}
102
103/// Builder for configuring watch subscriptions. Implements `Stream` directly.
104pub struct WatchBuilder<T>
105where
106    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
107{
108    connection: ConnectionManager,
109    store: SharedStore,
110    view_path: String,
111    key_filter: KeyFilter,
112    take: Option<u32>,
113    skip: Option<u32>,
114    stream: Option<EntityStream<T>>,
115}
116
117impl<T> WatchBuilder<T>
118where
119    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
120{
121    fn new(
122        connection: ConnectionManager,
123        store: SharedStore,
124        view_path: String,
125        key_filter: KeyFilter,
126    ) -> Self {
127        Self {
128            connection,
129            store,
130            view_path,
131            key_filter,
132            take: None,
133            skip: None,
134            stream: None,
135        }
136    }
137
138    /// Limit subscription to the top N items.
139    pub fn take(mut self, n: u32) -> Self {
140        self.take = Some(n);
141        self
142    }
143
144    /// Skip the first N items.
145    pub fn skip(mut self, n: u32) -> Self {
146        self.skip = Some(n);
147        self
148    }
149
150    /// Get a rich stream with before/after diffs instead.
151    pub fn rich(self) -> RichEntityStream<T> {
152        RichEntityStream::new_lazy_with_opts(
153            self.connection,
154            self.store,
155            self.view_path.clone(),
156            self.view_path,
157            self.key_filter,
158            None,
159            self.take,
160            self.skip,
161        )
162    }
163}
164
165impl<T> Stream for WatchBuilder<T>
166where
167    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
168{
169    type Item = Update<T>;
170
171    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172        let this = self.get_mut();
173
174        if this.stream.is_none() {
175            this.stream = Some(EntityStream::new_lazy_with_opts(
176                this.connection.clone(),
177                this.store.clone(),
178                this.view_path.clone(),
179                this.view_path.clone(),
180                this.key_filter.clone(),
181                None,
182                this.take,
183                this.skip,
184            ));
185        }
186
187        Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
188    }
189}
190
191/// Builder for creating view handles.
192///
193/// This is used internally by generated code to create properly configured view handles.
194pub struct ViewBuilder {
195    connection: ConnectionManager,
196    store: SharedStore,
197    initial_data_timeout: Duration,
198}
199
200impl ViewBuilder {
201    pub fn new(
202        connection: ConnectionManager,
203        store: SharedStore,
204        initial_data_timeout: Duration,
205    ) -> Self {
206        Self {
207            connection,
208            store,
209            initial_data_timeout,
210        }
211    }
212
213    pub fn connection(&self) -> &ConnectionManager {
214        &self.connection
215    }
216
217    pub fn store(&self) -> &SharedStore {
218        &self.store
219    }
220
221    pub fn initial_data_timeout(&self) -> Duration {
222        self.initial_data_timeout
223    }
224
225    /// Create a view handle.
226    pub fn view<T>(&self, view_path: &str) -> ViewHandle<T>
227    where
228        T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
229    {
230        ViewHandle {
231            connection: self.connection.clone(),
232            store: self.store.clone(),
233            view_path: view_path.to_string(),
234            initial_data_timeout: self.initial_data_timeout,
235            _marker: PhantomData,
236        }
237    }
238}
239
240/// Trait for generated view accessor structs.
241///
242/// This trait is implemented by generated code (e.g., `OreRoundViews`) to provide
243/// type-safe access to all views for an entity.
244pub trait Views: Sized {
245    type Entity: Entity;
246
247    fn from_builder(builder: ViewBuilder) -> Self;
248}
249
250/// A state view handle that requires a key for access.
251pub struct StateView<T> {
252    connection: ConnectionManager,
253    store: SharedStore,
254    view_path: String,
255    initial_data_timeout: Duration,
256    _marker: PhantomData<T>,
257}
258
259impl<T> StateView<T>
260where
261    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
262{
263    pub fn new(
264        connection: ConnectionManager,
265        store: SharedStore,
266        view_path: String,
267        initial_data_timeout: Duration,
268    ) -> Self {
269        Self {
270            connection,
271            store,
272            view_path,
273            initial_data_timeout,
274            _marker: PhantomData,
275        }
276    }
277
278    /// Get an entity by key.
279    pub async fn get(&self, key: &str) -> Option<T> {
280        self.connection
281            .ensure_subscription(&self.view_path, Some(key))
282            .await;
283        self.store
284            .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
285            .await;
286        self.store.get::<T>(&self.view_path, key).await
287    }
288
289    /// Watch for updates to a specific key.
290    pub fn watch(&self, key: &str) -> EntityStream<T> {
291        EntityStream::new_lazy(
292            self.connection.clone(),
293            self.store.clone(),
294            self.view_path.clone(),
295            self.view_path.clone(),
296            KeyFilter::Single(key.to_string()),
297            Some(key.to_string()),
298        )
299    }
300
301    /// Watch for updates with before/after diffs.
302    pub fn watch_rich(&self, key: &str) -> RichEntityStream<T> {
303        RichEntityStream::new_lazy(
304            self.connection.clone(),
305            self.store.clone(),
306            self.view_path.clone(),
307            self.view_path.clone(),
308            KeyFilter::Single(key.to_string()),
309            Some(key.to_string()),
310        )
311    }
312}