Skip to main content

hyperstack_sdk/
view.rs

1//! View abstractions for unified access to views.
2//!
3//! All views return collections (Vec<T>). Use `.first()` on the result
4//! if you need a single item.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use hyperstack_sdk::prelude::*;
10//! use my_stack::OreRoundViews;
11//!
12//! let hs = HyperStack::connect("wss://example.com").await?;
13//!
14//! // Access views through the generated views struct
15//! let views = OreRoundViews::new(&hs);
16//!
17//! // Get latest round - use .first() for single item
18//! let latest = views.latest().get().await.first().cloned();
19//!
20//! // List all rounds
21//! let rounds = views.list().get().await;
22//!
23//! // Get specific round by key
24//! let round = views.state().get("round_key").await;
25//!
26//! // Watch for updates
27//! let mut stream = views.latest().watch();
28//! while let Some(update) = stream.next().await {
29//!     println!("Latest round updated: {:?}", update);
30//! }
31//! ```
32
33use crate::connection::ConnectionManager;
34use crate::store::SharedStore;
35use crate::stream::{EntityStream, KeyFilter, RichEntityStream, Update, UseStream};
36use futures_util::Stream;
37use serde::de::DeserializeOwned;
38use serde::Serialize;
39use std::collections::HashMap;
40use std::marker::PhantomData;
41use std::pin::Pin;
42use std::task::{Context, Poll};
43use std::time::Duration;
44
45/// A handle to a view that provides get/watch operations.
46///
47/// All views return collections (Vec<T>). Use `.first()` on the result
48/// if you need a single item from views with a `take` limit.
49pub struct ViewHandle<T> {
50    connection: ConnectionManager,
51    store: SharedStore,
52    view_path: String,
53    initial_data_timeout: Duration,
54    _marker: PhantomData<T>,
55}
56
57impl<T> ViewHandle<T>
58where
59    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
60{
61    /// Get all items from this view.
62    ///
63    /// For views with a `take` limit defined in the stack, this returns
64    /// up to that many items. Use `.first()` on the result if you need
65    /// a single item.
66    pub async fn get(&self) -> Vec<T> {
67        self.connection
68            .ensure_subscription(&self.view_path, None)
69            .await;
70        self.store
71            .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
72            .await;
73        self.store.list::<T>(&self.view_path).await
74    }
75
76    /// Synchronously get all items from cached data.
77    ///
78    /// Returns cached data immediately without waiting for subscription.
79    /// Returns empty vector if data not yet loaded or lock unavailable.
80    pub fn get_sync(&self) -> Vec<T> {
81        self.store.list_sync::<T>(&self.view_path)
82    }
83
84    /// Stream merged entities directly (simplest API - filters out deletes).
85    ///
86    /// Emits `T` after each change. Patches are merged to give full entity state.
87    /// Deletes are filtered out. Use `.watch()` if you need delete notifications.
88    pub fn listen(&self) -> UseBuilder<T>
89    where
90        T: Unpin,
91    {
92        UseBuilder::new(
93            self.connection.clone(),
94            self.store.clone(),
95            self.view_path.clone(),
96            KeyFilter::None,
97        )
98    }
99
100    /// Watch for updates to this view. Chain `.take(n)` to limit results.
101    pub fn watch(&self) -> WatchBuilder<T>
102    where
103        T: Unpin,
104    {
105        WatchBuilder::new(
106            self.connection.clone(),
107            self.store.clone(),
108            self.view_path.clone(),
109            KeyFilter::None,
110        )
111    }
112
113    /// Watch for updates with before/after diffs.
114    pub fn watch_rich(&self) -> RichWatchBuilder<T>
115    where
116        T: Unpin,
117    {
118        RichWatchBuilder::new(
119            self.connection.clone(),
120            self.store.clone(),
121            self.view_path.clone(),
122            KeyFilter::None,
123        )
124    }
125
126    /// Watch for updates filtered to specific keys.
127    pub fn watch_keys(&self, keys: &[&str]) -> WatchBuilder<T>
128    where
129        T: Unpin,
130    {
131        WatchBuilder::new(
132            self.connection.clone(),
133            self.store.clone(),
134            self.view_path.clone(),
135            KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
136        )
137    }
138}
139
140/// Builder for `.use()` subscriptions that emit `T` directly. Implements `Stream`.
141pub struct UseBuilder<T>
142where
143    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
144{
145    connection: ConnectionManager,
146    store: SharedStore,
147    view_path: String,
148    key_filter: KeyFilter,
149    take: Option<u32>,
150    skip: Option<u32>,
151    filters: Option<HashMap<String, String>>,
152    stream: Option<UseStream<T>>,
153}
154
155impl<T> UseBuilder<T>
156where
157    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
158{
159    fn new(
160        connection: ConnectionManager,
161        store: SharedStore,
162        view_path: String,
163        key_filter: KeyFilter,
164    ) -> Self {
165        Self {
166            connection,
167            store,
168            view_path,
169            key_filter,
170            take: None,
171            skip: None,
172            filters: None,
173            stream: None,
174        }
175    }
176
177    /// Limit subscription to the top N items.
178    pub fn take(mut self, n: u32) -> Self {
179        self.take = Some(n);
180        self
181    }
182
183    /// Skip the first N items.
184    pub fn skip(mut self, n: u32) -> Self {
185        self.skip = Some(n);
186        self
187    }
188
189    /// Add a server-side filter.
190    pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
191        self.filters
192            .get_or_insert_with(HashMap::new)
193            .insert(key.into(), value.into());
194        self
195    }
196}
197
198impl<T> Stream for UseBuilder<T>
199where
200    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
201{
202    type Item = T;
203
204    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
205        let this = self.get_mut();
206
207        if this.stream.is_none() {
208            this.stream = Some(UseStream::new_lazy_with_opts(
209                this.connection.clone(),
210                this.store.clone(),
211                this.view_path.clone(),
212                this.view_path.clone(),
213                this.key_filter.clone(),
214                None,
215                this.take,
216                this.skip,
217            ));
218        }
219
220        Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
221    }
222}
223
224/// Builder for configuring watch subscriptions. Implements `Stream` directly.
225pub struct WatchBuilder<T>
226where
227    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
228{
229    connection: ConnectionManager,
230    store: SharedStore,
231    view_path: String,
232    key_filter: KeyFilter,
233    take: Option<u32>,
234    skip: Option<u32>,
235    filters: Option<HashMap<String, String>>,
236    stream: Option<EntityStream<T>>,
237}
238
239impl<T> WatchBuilder<T>
240where
241    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
242{
243    fn new(
244        connection: ConnectionManager,
245        store: SharedStore,
246        view_path: String,
247        key_filter: KeyFilter,
248    ) -> Self {
249        Self {
250            connection,
251            store,
252            view_path,
253            key_filter,
254            take: None,
255            skip: None,
256            filters: None,
257            stream: None,
258        }
259    }
260
261    /// Limit subscription to the top N items.
262    pub fn take(mut self, n: u32) -> Self {
263        self.take = Some(n);
264        self
265    }
266
267    /// Skip the first N items.
268    pub fn skip(mut self, n: u32) -> Self {
269        self.skip = Some(n);
270        self
271    }
272
273    /// Add a server-side filter.
274    pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
275        self.filters
276            .get_or_insert_with(HashMap::new)
277            .insert(key.into(), value.into());
278        self
279    }
280
281    /// Get a rich stream with before/after diffs instead.
282    pub fn rich(self) -> RichEntityStream<T> {
283        RichEntityStream::new_lazy_with_opts(
284            self.connection,
285            self.store,
286            self.view_path.clone(),
287            self.view_path,
288            self.key_filter,
289            None,
290            self.take,
291            self.skip,
292        )
293    }
294}
295
296impl<T> Stream for WatchBuilder<T>
297where
298    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
299{
300    type Item = Update<T>;
301
302    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
303        let this = self.get_mut();
304
305        if this.stream.is_none() {
306            this.stream = Some(EntityStream::new_lazy_with_opts(
307                this.connection.clone(),
308                this.store.clone(),
309                this.view_path.clone(),
310                this.view_path.clone(),
311                this.key_filter.clone(),
312                None,
313                this.take,
314                this.skip,
315            ));
316        }
317
318        Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
319    }
320}
321
322/// Builder for rich watch subscriptions with before/after diffs.
323pub struct RichWatchBuilder<T>
324where
325    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
326{
327    connection: ConnectionManager,
328    store: SharedStore,
329    view_path: String,
330    key_filter: KeyFilter,
331    take: Option<u32>,
332    skip: Option<u32>,
333    filters: Option<HashMap<String, String>>,
334    stream: Option<RichEntityStream<T>>,
335}
336
337impl<T> RichWatchBuilder<T>
338where
339    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
340{
341    fn new(
342        connection: ConnectionManager,
343        store: SharedStore,
344        view_path: String,
345        key_filter: KeyFilter,
346    ) -> Self {
347        Self {
348            connection,
349            store,
350            view_path,
351            key_filter,
352            take: None,
353            skip: None,
354            filters: None,
355            stream: None,
356        }
357    }
358
359    pub fn take(mut self, n: u32) -> Self {
360        self.take = Some(n);
361        self
362    }
363
364    pub fn skip(mut self, n: u32) -> Self {
365        self.skip = Some(n);
366        self
367    }
368
369    pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
370        self.filters
371            .get_or_insert_with(HashMap::new)
372            .insert(key.into(), value.into());
373        self
374    }
375}
376
377impl<T> Stream for RichWatchBuilder<T>
378where
379    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
380{
381    type Item = crate::stream::RichUpdate<T>;
382
383    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
384        let this = self.get_mut();
385
386        if this.stream.is_none() {
387            this.stream = Some(RichEntityStream::new_lazy_with_opts(
388                this.connection.clone(),
389                this.store.clone(),
390                this.view_path.clone(),
391                this.view_path.clone(),
392                this.key_filter.clone(),
393                None,
394                this.take,
395                this.skip,
396            ));
397        }
398
399        Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
400    }
401}
402
403/// Builder for creating view handles.
404///
405/// This is used internally by generated code to create properly configured view handles.
406#[derive(Clone)]
407pub struct ViewBuilder {
408    connection: ConnectionManager,
409    store: SharedStore,
410    initial_data_timeout: Duration,
411}
412
413impl ViewBuilder {
414    pub fn new(
415        connection: ConnectionManager,
416        store: SharedStore,
417        initial_data_timeout: Duration,
418    ) -> Self {
419        Self {
420            connection,
421            store,
422            initial_data_timeout,
423        }
424    }
425
426    pub fn connection(&self) -> &ConnectionManager {
427        &self.connection
428    }
429
430    pub fn store(&self) -> &SharedStore {
431        &self.store
432    }
433
434    pub fn initial_data_timeout(&self) -> Duration {
435        self.initial_data_timeout
436    }
437
438    /// Create a view handle.
439    pub fn view<T>(&self, view_path: &str) -> ViewHandle<T>
440    where
441        T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
442    {
443        ViewHandle {
444            connection: self.connection.clone(),
445            store: self.store.clone(),
446            view_path: view_path.to_string(),
447            initial_data_timeout: self.initial_data_timeout,
448            _marker: PhantomData,
449        }
450    }
451}
452
453/// Trait for generated view accessor structs.
454pub trait Views: Sized + Send + Sync + 'static {
455    fn from_builder(builder: ViewBuilder) -> Self;
456}
457
458/// A state view handle that requires a key for access.
459pub struct StateView<T> {
460    connection: ConnectionManager,
461    store: SharedStore,
462    view_path: String,
463    initial_data_timeout: Duration,
464    _marker: PhantomData<T>,
465}
466
467impl<T> StateView<T>
468where
469    T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
470{
471    pub fn new(
472        connection: ConnectionManager,
473        store: SharedStore,
474        view_path: String,
475        initial_data_timeout: Duration,
476    ) -> Self {
477        Self {
478            connection,
479            store,
480            view_path,
481            initial_data_timeout,
482            _marker: PhantomData,
483        }
484    }
485
486    /// Get an entity by key.
487    pub async fn get(&self, key: &str) -> Option<T> {
488        self.connection
489            .ensure_subscription(&self.view_path, Some(key))
490            .await;
491        self.store
492            .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
493            .await;
494        self.store.get::<T>(&self.view_path, key).await
495    }
496
497    /// Synchronously get an entity from cached data.
498    pub fn get_sync(&self, key: &str) -> Option<T> {
499        self.store.get_sync::<T>(&self.view_path, key)
500    }
501
502    /// Stream merged entity values directly (simplest API - filters out deletes).
503    pub fn listen(&self, key: &str) -> UseStream<T>
504    where
505        T: Unpin,
506    {
507        UseStream::new_lazy(
508            self.connection.clone(),
509            self.store.clone(),
510            self.view_path.clone(),
511            self.view_path.clone(),
512            KeyFilter::Single(key.to_string()),
513            Some(key.to_string()),
514        )
515    }
516
517    /// Watch for updates to a specific key.
518    pub fn watch(&self, key: &str) -> EntityStream<T> {
519        EntityStream::new_lazy(
520            self.connection.clone(),
521            self.store.clone(),
522            self.view_path.clone(),
523            self.view_path.clone(),
524            KeyFilter::Single(key.to_string()),
525            Some(key.to_string()),
526        )
527    }
528
529    /// Watch for updates with before/after diffs.
530    pub fn watch_rich(&self, key: &str) -> RichEntityStream<T> {
531        RichEntityStream::new_lazy(
532            self.connection.clone(),
533            self.store.clone(),
534            self.view_path.clone(),
535            self.view_path.clone(),
536            KeyFilter::Single(key.to_string()),
537            Some(key.to_string()),
538        )
539    }
540}