// hyperstack_sdk/view.rs

//! View abstractions for unified access to views.
//!
//! List-style views ([`ViewHandle`]) return collections (`Vec<T>`). Use `.first()`
//! on the result if you need a single item. Keyed state views ([`StateView`])
//! return an `Option<T>` for the requested key.
//!
//! # Example
//!
//! ```ignore
//! use hyperstack_sdk::prelude::*;
//! use my_stack::OreRoundViews;
//!
//! let hs = HyperStack::connect("wss://example.com").await?;
//!
//! // Access views through the generated views struct
//! let views = OreRoundViews::new(&hs);
//!
//! // Get latest round - use .first() for single item
//! let latest = views.latest().get().await.first().cloned();
//!
//! // List all rounds
//! let rounds = views.list().get().await;
//!
//! // Get specific round by key
//! let round = views.state().get("round_key").await;
//!
//! // Watch for updates
//! let mut stream = views.latest().watch();
//! while let Some(update) = stream.next().await {
//!     println!("Latest round updated: {:?}", update);
//! }
//! ```
32
33use crate::connection::ConnectionManager;
34use crate::store::SharedStore;
35use crate::stream::{EntityStream, KeyFilter, RichEntityStream, Update, UseStream};
36use futures_util::Stream;
37use serde::de::DeserializeOwned;
38use serde::Serialize;
39use std::collections::HashMap;
40use std::marker::PhantomData;
41use std::pin::Pin;
42use std::task::{Context, Poll};
43use std::time::Duration;
44
45/// A handle to a view that provides get/watch operations.
46///
47/// All views return collections (Vec<T>). Use `.first()` on the result
48/// if you need a single item from views with a `take` limit.
49pub struct ViewHandle<T> {
50    connection: ConnectionManager,
51    store: SharedStore,
52    view_path: String,
53    initial_data_timeout: Duration,
54    _marker: PhantomData<T>,
55}
56
57impl<T> ViewHandle<T>
58where
59    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
60{
61    /// Get all items from this view.
62    ///
63    /// For views with a `take` limit defined in the stack, this returns
64    /// up to that many items. Use `.first()` on the result if you need
65    /// a single item.
66    pub async fn get(&self) -> Vec<T> {
67        self.connection
68            .ensure_subscription(&self.view_path, None)
69            .await;
70        self.store
71            .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
72            .await;
73        self.store.list::<T>(&self.view_path).await
74    }
75
76    /// Synchronously get all items from cached data.
77    ///
78    /// Returns cached data immediately without waiting for subscription.
79    /// Returns empty vector if data not yet loaded or lock unavailable.
80    pub fn get_sync(&self) -> Vec<T> {
81        self.store.list_sync::<T>(&self.view_path)
82    }
83
84    /// Stream merged entities directly (simplest API - filters out deletes).
85    ///
86    /// Emits `T` after each change. Patches are merged to give full entity state.
87    /// Deletes are filtered out. Use `.watch()` if you need delete notifications.
88    pub fn listen(&self) -> UseBuilder<T>
89    where
90        T: Unpin,
91    {
92        UseBuilder::new(
93            self.connection.clone(),
94            self.store.clone(),
95            self.view_path.clone(),
96            KeyFilter::None,
97        )
98    }
99
100    /// Watch for updates to this view. Chain `.take(n)` to limit results.
101    pub fn watch(&self) -> WatchBuilder<T>
102    where
103        T: Unpin,
104    {
105        WatchBuilder::new(
106            self.connection.clone(),
107            self.store.clone(),
108            self.view_path.clone(),
109            KeyFilter::None,
110        )
111    }
112
113    /// Watch for updates with before/after diffs.
114    pub fn watch_rich(&self) -> RichWatchBuilder<T>
115    where
116        T: Unpin,
117    {
118        RichWatchBuilder::new(
119            self.connection.clone(),
120            self.store.clone(),
121            self.view_path.clone(),
122            KeyFilter::None,
123        )
124    }
125
126    /// Watch for updates filtered to specific keys.
127    pub fn watch_keys(&self, keys: &[&str]) -> WatchBuilder<T>
128    where
129        T: Unpin,
130    {
131        WatchBuilder::new(
132            self.connection.clone(),
133            self.store.clone(),
134            self.view_path.clone(),
135            KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
136        )
137    }
138}
139
140/// Builder for `.use()` subscriptions that emit `T` directly. Implements `Stream`.
141pub struct UseBuilder<T>
142where
143    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
144{
145    connection: ConnectionManager,
146    store: SharedStore,
147    view_path: String,
148    key_filter: KeyFilter,
149    take: Option<u32>,
150    skip: Option<u32>,
151    filters: Option<HashMap<String, String>>,
152    with_snapshot: Option<bool>,
153    after: Option<String>,
154    snapshot_limit: Option<usize>,
155    stream: Option<UseStream<T>>,
156}
157
158impl<T> UseBuilder<T>
159where
160    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
161{
162    fn new(
163        connection: ConnectionManager,
164        store: SharedStore,
165        view_path: String,
166        key_filter: KeyFilter,
167    ) -> Self {
168        Self {
169            connection,
170            store,
171            view_path,
172            key_filter,
173            take: None,
174            skip: None,
175            filters: None,
176            with_snapshot: None,
177            after: None,
178            snapshot_limit: None,
179            stream: None,
180        }
181    }
182
183    /// Limit subscription to the top N items.
184    pub fn take(mut self, n: u32) -> Self {
185        self.take = Some(n);
186        self
187    }
188
189    /// Skip the first N items.
190    pub fn skip(mut self, n: u32) -> Self {
191        self.skip = Some(n);
192        self
193    }
194
195    /// Add a server-side filter.
196    pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
197        self.filters
198            .get_or_insert_with(HashMap::new)
199            .insert(key.into(), value.into());
200        self
201    }
202
203    /// Set whether to include the initial snapshot (defaults to true).
204    pub fn with_snapshot(mut self, with_snapshot: bool) -> Self {
205        self.with_snapshot = Some(with_snapshot);
206        self
207    }
208
209    /// Set the cursor to resume from (for reconnecting and getting only newer data).
210    pub fn after(mut self, cursor: impl Into<String>) -> Self {
211        self.after = Some(cursor.into());
212        self
213    }
214
215    /// Set the maximum number of entities to include in the snapshot.
216    pub fn with_snapshot_limit(mut self, limit: usize) -> Self {
217        self.snapshot_limit = Some(limit);
218        self
219    }
220}
221
222impl<T> Stream for UseBuilder<T>
223where
224    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
225{
226    type Item = T;
227
228    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
229        let this = self.get_mut();
230
231        if this.stream.is_none() {
232            this.stream = Some(UseStream::new_lazy_with_opts(
233                this.connection.clone(),
234                this.store.clone(),
235                this.view_path.clone(),
236                this.view_path.clone(),
237                this.key_filter.clone(),
238                None,
239                this.take,
240                this.skip,
241                this.with_snapshot,
242                this.after.clone(),
243                this.snapshot_limit,
244            ));
245        }
246
247        Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
248    }
249}
250
251/// Builder for configuring watch subscriptions. Implements `Stream` directly.
252pub struct WatchBuilder<T>
253where
254    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
255{
256    connection: ConnectionManager,
257    store: SharedStore,
258    view_path: String,
259    key_filter: KeyFilter,
260    take: Option<u32>,
261    skip: Option<u32>,
262    filters: Option<HashMap<String, String>>,
263    with_snapshot: Option<bool>,
264    after: Option<String>,
265    snapshot_limit: Option<usize>,
266    stream: Option<EntityStream<T>>,
267}
268
269impl<T> WatchBuilder<T>
270where
271    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
272{
273    fn new(
274        connection: ConnectionManager,
275        store: SharedStore,
276        view_path: String,
277        key_filter: KeyFilter,
278    ) -> Self {
279        Self {
280            connection,
281            store,
282            view_path,
283            key_filter,
284            take: None,
285            skip: None,
286            filters: None,
287            with_snapshot: None,
288            after: None,
289            snapshot_limit: None,
290            stream: None,
291        }
292    }
293
294    /// Limit subscription to the top N items.
295    pub fn take(mut self, n: u32) -> Self {
296        self.take = Some(n);
297        self
298    }
299
300    /// Skip the first N items.
301    pub fn skip(mut self, n: u32) -> Self {
302        self.skip = Some(n);
303        self
304    }
305
306    /// Add a server-side filter.
307    pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
308        self.filters
309            .get_or_insert_with(HashMap::new)
310            .insert(key.into(), value.into());
311        self
312    }
313
314    /// Set whether to include the initial snapshot (defaults to true).
315    pub fn with_snapshot(mut self, with_snapshot: bool) -> Self {
316        self.with_snapshot = Some(with_snapshot);
317        self
318    }
319
320    /// Set the cursor to resume from (for reconnecting and getting only newer data).
321    pub fn after(mut self, cursor: impl Into<String>) -> Self {
322        self.after = Some(cursor.into());
323        self
324    }
325
326    /// Set the maximum number of entities to include in the snapshot.
327    pub fn with_snapshot_limit(mut self, limit: usize) -> Self {
328        self.snapshot_limit = Some(limit);
329        self
330    }
331
332    /// Get a rich stream with before/after diffs instead.
333    pub fn rich(self) -> RichEntityStream<T> {
334        RichEntityStream::new_lazy_with_opts(
335            self.connection,
336            self.store,
337            self.view_path.clone(),
338            self.view_path,
339            self.key_filter,
340            None,
341            self.take,
342            self.skip,
343            self.with_snapshot,
344            self.after,
345            self.snapshot_limit,
346        )
347    }
348}
349
350impl<T> Stream for WatchBuilder<T>
351where
352    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
353{
354    type Item = Update<T>;
355
356    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
357        let this = self.get_mut();
358
359        if this.stream.is_none() {
360            this.stream = Some(EntityStream::new_lazy_with_opts(
361                this.connection.clone(),
362                this.store.clone(),
363                this.view_path.clone(),
364                this.view_path.clone(),
365                this.key_filter.clone(),
366                None,
367                this.take,
368                this.skip,
369                this.with_snapshot,
370                this.after.clone(),
371                this.snapshot_limit,
372            ));
373        }
374
375        Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
376    }
377}
378
379/// Builder for rich watch subscriptions with before/after diffs.
380pub struct RichWatchBuilder<T>
381where
382    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
383{
384    connection: ConnectionManager,
385    store: SharedStore,
386    view_path: String,
387    key_filter: KeyFilter,
388    take: Option<u32>,
389    skip: Option<u32>,
390    filters: Option<HashMap<String, String>>,
391    with_snapshot: Option<bool>,
392    after: Option<String>,
393    snapshot_limit: Option<usize>,
394    stream: Option<RichEntityStream<T>>,
395}
396
397impl<T> RichWatchBuilder<T>
398where
399    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
400{
401    fn new(
402        connection: ConnectionManager,
403        store: SharedStore,
404        view_path: String,
405        key_filter: KeyFilter,
406    ) -> Self {
407        Self {
408            connection,
409            store,
410            view_path,
411            key_filter,
412            take: None,
413            skip: None,
414            filters: None,
415            with_snapshot: None,
416            after: None,
417            snapshot_limit: None,
418            stream: None,
419        }
420    }
421
422    pub fn take(mut self, n: u32) -> Self {
423        self.take = Some(n);
424        self
425    }
426
427    pub fn skip(mut self, n: u32) -> Self {
428        self.skip = Some(n);
429        self
430    }
431
432    pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
433        self.filters
434            .get_or_insert_with(HashMap::new)
435            .insert(key.into(), value.into());
436        self
437    }
438
439    /// Set whether to include the initial snapshot (defaults to true).
440    pub fn with_snapshot(mut self, with_snapshot: bool) -> Self {
441        self.with_snapshot = Some(with_snapshot);
442        self
443    }
444
445    /// Set the cursor to resume from (for reconnecting and getting only newer data).
446    pub fn after(mut self, cursor: impl Into<String>) -> Self {
447        self.after = Some(cursor.into());
448        self
449    }
450
451    /// Set the maximum number of entities to include in the snapshot.
452    pub fn with_snapshot_limit(mut self, limit: usize) -> Self {
453        self.snapshot_limit = Some(limit);
454        self
455    }
456}
457
458impl<T> Stream for RichWatchBuilder<T>
459where
460    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
461{
462    type Item = crate::stream::RichUpdate<T>;
463
464    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
465        let this = self.get_mut();
466
467        if this.stream.is_none() {
468            this.stream = Some(RichEntityStream::new_lazy_with_opts(
469                this.connection.clone(),
470                this.store.clone(),
471                this.view_path.clone(),
472                this.view_path.clone(),
473                this.key_filter.clone(),
474                None,
475                this.take,
476                this.skip,
477                this.with_snapshot,
478                this.after.clone(),
479                this.snapshot_limit,
480            ));
481        }
482
483        Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
484    }
485}
486
487/// Builder for creating view handles.
488///
489/// This is used internally by generated code to create properly configured view handles.
490#[derive(Clone)]
491pub struct ViewBuilder {
492    connection: ConnectionManager,
493    store: SharedStore,
494    initial_data_timeout: Duration,
495}
496
497impl ViewBuilder {
498    pub fn new(
499        connection: ConnectionManager,
500        store: SharedStore,
501        initial_data_timeout: Duration,
502    ) -> Self {
503        Self {
504            connection,
505            store,
506            initial_data_timeout,
507        }
508    }
509
510    pub fn connection(&self) -> &ConnectionManager {
511        &self.connection
512    }
513
514    pub fn store(&self) -> &SharedStore {
515        &self.store
516    }
517
518    pub fn initial_data_timeout(&self) -> Duration {
519        self.initial_data_timeout
520    }
521
522    /// Create a view handle.
523    pub fn view<T>(&self, view_path: &str) -> ViewHandle<T>
524    where
525        T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
526    {
527        ViewHandle {
528            connection: self.connection.clone(),
529            store: self.store.clone(),
530            view_path: view_path.to_string(),
531            initial_data_timeout: self.initial_data_timeout,
532            _marker: PhantomData,
533        }
534    }
535}
536
537/// Trait for generated view accessor structs.
538pub trait Views: Sized + Send + Sync + 'static {
539    fn from_builder(builder: ViewBuilder) -> Self;
540}
541
542/// A state view handle that requires a key for access.
543pub struct StateView<T> {
544    connection: ConnectionManager,
545    store: SharedStore,
546    view_path: String,
547    initial_data_timeout: Duration,
548    _marker: PhantomData<T>,
549}
550
551impl<T> StateView<T>
552where
553    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
554{
555    pub fn new(
556        connection: ConnectionManager,
557        store: SharedStore,
558        view_path: String,
559        initial_data_timeout: Duration,
560    ) -> Self {
561        Self {
562            connection,
563            store,
564            view_path,
565            initial_data_timeout,
566            _marker: PhantomData,
567        }
568    }
569
570    /// Get an entity by key.
571    pub async fn get(&self, key: &str) -> Option<T> {
572        self.connection
573            .ensure_subscription(&self.view_path, Some(key))
574            .await;
575        self.store
576            .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
577            .await;
578        self.store.get::<T>(&self.view_path, key).await
579    }
580
581    /// Synchronously get an entity from cached data.
582    pub fn get_sync(&self, key: &str) -> Option<T> {
583        self.store.get_sync::<T>(&self.view_path, key)
584    }
585
586    /// Stream merged entity values directly (simplest API - filters out deletes).
587    pub fn listen(&self, key: &str) -> UseStream<T>
588    where
589        T: Unpin,
590    {
591        UseStream::new_lazy(
592            self.connection.clone(),
593            self.store.clone(),
594            self.view_path.clone(),
595            self.view_path.clone(),
596            KeyFilter::Single(key.to_string()),
597            Some(key.to_string()),
598        )
599    }
600
601    /// Watch for updates to a specific key.
602    pub fn watch(&self, key: &str) -> EntityStream<T> {
603        EntityStream::new_lazy(
604            self.connection.clone(),
605            self.store.clone(),
606            self.view_path.clone(),
607            self.view_path.clone(),
608            KeyFilter::Single(key.to_string()),
609            Some(key.to_string()),
610        )
611    }
612
613    /// Watch for updates with before/after diffs.
614    pub fn watch_rich(&self, key: &str) -> RichEntityStream<T> {
615        RichEntityStream::new_lazy(
616            self.connection.clone(),
617            self.store.clone(),
618            self.view_path.clone(),
619            self.view_path.clone(),
620            KeyFilter::Single(key.to_string()),
621            Some(key.to_string()),
622        )
623    }
624}