Skip to main content

hyperstack_sdk/
view.rs

1//! View abstractions for unified access to views.
2//!
3//! All views return collections (Vec<T>). Use `.first()` on the result
4//! if you need a single item.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use hyperstack_sdk::prelude::*;
10//! use my_stack::OreRoundViews;
11//!
12//! let hs = HyperStack::connect("wss://example.com").await?;
13//!
14//! // Access views through the generated views struct
15//! let views = OreRoundViews::new(&hs);
16//!
17//! // Get latest round - use .first() for single item
18//! let latest = views.latest().get().await.first().cloned();
19//!
20//! // List all rounds
21//! let rounds = views.list().get().await;
22//!
23//! // Get specific round by key
24//! let round = views.state().get("round_key").await;
25//!
26//! // Watch for updates
27//! let mut stream = views.latest().watch();
28//! while let Some(update) = stream.next().await {
29//!     println!("Latest round updated: {:?}", update);
30//! }
31//! ```
32
33use crate::connection::ConnectionManager;
34use crate::store::SharedStore;
35use crate::stream::{EntityStream, KeyFilter, RichEntityStream, Update, UseStream};
36use futures_util::Stream;
37use serde::de::DeserializeOwned;
38use serde::Serialize;
39use std::collections::HashMap;
40use std::marker::PhantomData;
41use std::pin::Pin;
42use std::task::{Context, Poll};
43use std::time::Duration;
44
45/// A handle to a view that provides get/watch operations.
46///
47/// All views return collections (Vec<T>). Use `.first()` on the result
48/// if you need a single item from views with a `take` limit.
49pub struct ViewHandle<T> {
50    connection: ConnectionManager,
51    store: SharedStore,
52    view_path: String,
53    initial_data_timeout: Duration,
54    _marker: PhantomData<T>,
55}
56
57impl<T> ViewHandle<T>
58where
59    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
60{
61    /// Get all items from this view.
62    ///
63    /// For views with a `take` limit defined in the stack, this returns
64    /// up to that many items. Use `.first()` on the result if you need
65    /// a single item.
66    pub async fn get(&self) -> Vec<T> {
67        self.connection
68            .ensure_subscription(&self.view_path, None)
69            .await;
70        self.store
71            .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
72            .await;
73        self.store.list::<T>(&self.view_path).await
74    }
75
76    /// Synchronously get all items from cached data.
77    ///
78    /// Returns cached data immediately without waiting for subscription.
79    /// Returns empty vector if data not yet loaded or lock unavailable.
80    pub fn get_sync(&self) -> Vec<T> {
81        self.store.list_sync::<T>(&self.view_path)
82    }
83
84    /// Stream merged entities directly (simplest API - filters out deletes).
85    ///
86    /// Emits `T` after each change. Patches are merged to give full entity state.
87    /// Deletes are filtered out. Use `.watch()` if you need delete notifications.
88    pub fn listen(&self) -> UseBuilder<T>
89    where
90        T: Unpin,
91    {
92        UseBuilder::new(
93            self.connection.clone(),
94            self.store.clone(),
95            self.view_path.clone(),
96            KeyFilter::None,
97        )
98    }
99
100    /// Watch for updates to this view. Chain `.take(n)` to limit results.
101    pub fn watch(&self) -> WatchBuilder<T>
102    where
103        T: Unpin,
104    {
105        WatchBuilder::new(
106            self.connection.clone(),
107            self.store.clone(),
108            self.view_path.clone(),
109            KeyFilter::None,
110        )
111    }
112
113    /// Watch for updates with before/after diffs.
114    pub fn watch_rich(&self) -> RichWatchBuilder<T>
115    where
116        T: Unpin,
117    {
118        RichWatchBuilder::new(
119            self.connection.clone(),
120            self.store.clone(),
121            self.view_path.clone(),
122            KeyFilter::None,
123        )
124    }
125
126    /// Watch for updates filtered to specific keys.
127    pub fn watch_keys(&self, keys: &[&str]) -> WatchBuilder<T>
128    where
129        T: Unpin,
130    {
131        WatchBuilder::new(
132            self.connection.clone(),
133            self.store.clone(),
134            self.view_path.clone(),
135            KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
136        )
137    }
138}
139
140/// Builder for `.use()` subscriptions that emit `T` directly. Implements `Stream`.
141pub struct UseBuilder<T>
142where
143    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
144{
145    connection: ConnectionManager,
146    store: SharedStore,
147    view_path: String,
148    key_filter: KeyFilter,
149    take: Option<u32>,
150    skip: Option<u32>,
151    filters: Option<HashMap<String, String>>,
152    stream: Option<UseStream<T>>,
153}
154
155impl<T> UseBuilder<T>
156where
157    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
158{
159    fn new(
160        connection: ConnectionManager,
161        store: SharedStore,
162        view_path: String,
163        key_filter: KeyFilter,
164    ) -> Self {
165        Self {
166            connection,
167            store,
168            view_path,
169            key_filter,
170            take: None,
171            skip: None,
172            filters: None,
173            stream: None,
174        }
175    }
176
177    /// Limit subscription to the top N items.
178    pub fn take(mut self, n: u32) -> Self {
179        self.take = Some(n);
180        self
181    }
182
183    /// Skip the first N items.
184    pub fn skip(mut self, n: u32) -> Self {
185        self.skip = Some(n);
186        self
187    }
188
189    /// Add a server-side filter.
190    pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
191        self.filters
192            .get_or_insert_with(HashMap::new)
193            .insert(key.into(), value.into());
194        self
195    }
196}
197
198impl<T> Stream for UseBuilder<T>
199where
200    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
201{
202    type Item = T;
203
204    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
205        let this = self.get_mut();
206
207        if this.stream.is_none() {
208            this.stream = Some(UseStream::new_lazy_with_opts(
209                this.connection.clone(),
210                this.store.clone(),
211                this.view_path.clone(),
212                this.view_path.clone(),
213                this.key_filter.clone(),
214                None,
215                this.take,
216                this.skip,
217            ));
218        }
219
220        Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
221    }
222}
223
224/// Builder for configuring watch subscriptions. Implements `Stream` directly.
225pub struct WatchBuilder<T>
226where
227    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
228{
229    connection: ConnectionManager,
230    store: SharedStore,
231    view_path: String,
232    key_filter: KeyFilter,
233    take: Option<u32>,
234    skip: Option<u32>,
235    filters: Option<HashMap<String, String>>,
236    stream: Option<EntityStream<T>>,
237}
238
239impl<T> WatchBuilder<T>
240where
241    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
242{
243    fn new(
244        connection: ConnectionManager,
245        store: SharedStore,
246        view_path: String,
247        key_filter: KeyFilter,
248    ) -> Self {
249        Self {
250            connection,
251            store,
252            view_path,
253            key_filter,
254            take: None,
255            skip: None,
256            filters: None,
257            stream: None,
258        }
259    }
260
261    /// Limit subscription to the top N items.
262    pub fn take(mut self, n: u32) -> Self {
263        self.take = Some(n);
264        self
265    }
266
267    /// Skip the first N items.
268    pub fn skip(mut self, n: u32) -> Self {
269        self.skip = Some(n);
270        self
271    }
272
273    /// Add a server-side filter.
274    pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
275        self.filters
276            .get_or_insert_with(HashMap::new)
277            .insert(key.into(), value.into());
278        self
279    }
280
281    /// Get a rich stream with before/after diffs instead.
282    pub fn rich(self) -> RichEntityStream<T> {
283        RichEntityStream::new_lazy_with_opts(
284            self.connection,
285            self.store,
286            self.view_path.clone(),
287            self.view_path,
288            self.key_filter,
289            None,
290            self.take,
291            self.skip,
292        )
293    }
294}
295
296impl<T> Stream for WatchBuilder<T>
297where
298    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
299{
300    type Item = Update<T>;
301
302    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
303        let this = self.get_mut();
304
305        if this.stream.is_none() {
306            this.stream = Some(EntityStream::new_lazy_with_opts(
307                this.connection.clone(),
308                this.store.clone(),
309                this.view_path.clone(),
310                this.view_path.clone(),
311                this.key_filter.clone(),
312                None,
313                this.take,
314                this.skip,
315            ));
316        }
317
318        Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
319    }
320}
321
322/// Builder for rich watch subscriptions with before/after diffs.
323pub struct RichWatchBuilder<T>
324where
325    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
326{
327    connection: ConnectionManager,
328    store: SharedStore,
329    view_path: String,
330    key_filter: KeyFilter,
331    take: Option<u32>,
332    skip: Option<u32>,
333    filters: Option<HashMap<String, String>>,
334    stream: Option<RichEntityStream<T>>,
335}
336
337impl<T> RichWatchBuilder<T>
338where
339    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
340{
341    fn new(
342        connection: ConnectionManager,
343        store: SharedStore,
344        view_path: String,
345        key_filter: KeyFilter,
346    ) -> Self {
347        Self {
348            connection,
349            store,
350            view_path,
351            key_filter,
352            take: None,
353            skip: None,
354            filters: None,
355            stream: None,
356        }
357    }
358
359    pub fn take(mut self, n: u32) -> Self {
360        self.take = Some(n);
361        self
362    }
363
364    pub fn skip(mut self, n: u32) -> Self {
365        self.skip = Some(n);
366        self
367    }
368
369    pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
370        self.filters
371            .get_or_insert_with(HashMap::new)
372            .insert(key.into(), value.into());
373        self
374    }
375}
376
377impl<T> Stream for RichWatchBuilder<T>
378where
379    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
380{
381    type Item = crate::stream::RichUpdate<T>;
382
383    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
384        let this = self.get_mut();
385
386        if this.stream.is_none() {
387            this.stream = Some(RichEntityStream::new_lazy_with_opts(
388                this.connection.clone(),
389                this.store.clone(),
390                this.view_path.clone(),
391                this.view_path.clone(),
392                this.key_filter.clone(),
393                None,
394                this.take,
395                this.skip,
396            ));
397        }
398
399        Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
400    }
401}
402
403/// Builder for creating view handles.
404///
405/// This is used internally by generated code to create properly configured view handles.
406pub struct ViewBuilder {
407    connection: ConnectionManager,
408    store: SharedStore,
409    initial_data_timeout: Duration,
410}
411
412impl ViewBuilder {
413    pub fn new(
414        connection: ConnectionManager,
415        store: SharedStore,
416        initial_data_timeout: Duration,
417    ) -> Self {
418        Self {
419            connection,
420            store,
421            initial_data_timeout,
422        }
423    }
424
425    pub fn connection(&self) -> &ConnectionManager {
426        &self.connection
427    }
428
429    pub fn store(&self) -> &SharedStore {
430        &self.store
431    }
432
433    pub fn initial_data_timeout(&self) -> Duration {
434        self.initial_data_timeout
435    }
436
437    /// Create a view handle.
438    pub fn view<T>(&self, view_path: &str) -> ViewHandle<T>
439    where
440        T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
441    {
442        ViewHandle {
443            connection: self.connection.clone(),
444            store: self.store.clone(),
445            view_path: view_path.to_string(),
446            initial_data_timeout: self.initial_data_timeout,
447            _marker: PhantomData,
448        }
449    }
450}
451
452/// Trait for generated view accessor structs.
453pub trait Views: Sized + Send + Sync + 'static {
454    fn from_builder(builder: ViewBuilder) -> Self;
455}
456
457/// A state view handle that requires a key for access.
458pub struct StateView<T> {
459    connection: ConnectionManager,
460    store: SharedStore,
461    view_path: String,
462    initial_data_timeout: Duration,
463    _marker: PhantomData<T>,
464}
465
466impl<T> StateView<T>
467where
468    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
469{
470    pub fn new(
471        connection: ConnectionManager,
472        store: SharedStore,
473        view_path: String,
474        initial_data_timeout: Duration,
475    ) -> Self {
476        Self {
477            connection,
478            store,
479            view_path,
480            initial_data_timeout,
481            _marker: PhantomData,
482        }
483    }
484
485    /// Get an entity by key.
486    pub async fn get(&self, key: &str) -> Option<T> {
487        self.connection
488            .ensure_subscription(&self.view_path, Some(key))
489            .await;
490        self.store
491            .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
492            .await;
493        self.store.get::<T>(&self.view_path, key).await
494    }
495
496    /// Synchronously get an entity from cached data.
497    pub fn get_sync(&self, key: &str) -> Option<T> {
498        self.store.get_sync::<T>(&self.view_path, key)
499    }
500
501    /// Stream merged entity values directly (simplest API - filters out deletes).
502    pub fn listen(&self, key: &str) -> UseStream<T>
503    where
504        T: Unpin,
505    {
506        UseStream::new_lazy(
507            self.connection.clone(),
508            self.store.clone(),
509            self.view_path.clone(),
510            self.view_path.clone(),
511            KeyFilter::Single(key.to_string()),
512            Some(key.to_string()),
513        )
514    }
515
516    /// Watch for updates to a specific key.
517    pub fn watch(&self, key: &str) -> EntityStream<T> {
518        EntityStream::new_lazy(
519            self.connection.clone(),
520            self.store.clone(),
521            self.view_path.clone(),
522            self.view_path.clone(),
523            KeyFilter::Single(key.to_string()),
524            Some(key.to_string()),
525        )
526    }
527
528    /// Watch for updates with before/after diffs.
529    pub fn watch_rich(&self, key: &str) -> RichEntityStream<T> {
530        RichEntityStream::new_lazy(
531            self.connection.clone(),
532            self.store.clone(),
533            self.view_path.clone(),
534            self.view_path.clone(),
535            KeyFilter::Single(key.to_string()),
536            Some(key.to_string()),
537        )
538    }
539}