hyperstack_sdk/
stream.rs

1use crate::connection::ConnectionManager;
2use crate::frame::Operation;
3use crate::store::{SharedStore, StoreUpdate};
4use futures_util::Stream;
5use pin_project_lite::pin_project;
6use serde::de::DeserializeOwned;
7use std::collections::HashSet;
8use std::future::Future;
9use std::marker::PhantomData;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use tokio::sync::broadcast;
13use tokio_stream::wrappers::BroadcastStream;
14
15#[derive(Debug, Clone)]
16pub enum Update<T> {
17    Upsert { key: String, data: T },
18    Patch { key: String, data: T },
19    Delete { key: String },
20}
21
22#[derive(Debug, Clone)]
23pub enum RichUpdate<T> {
24    Created {
25        key: String,
26        data: T,
27    },
28    Updated {
29        key: String,
30        before: T,
31        after: T,
32        patch: Option<serde_json::Value>,
33    },
34    Deleted {
35        key: String,
36        last_known: Option<T>,
37    },
38}
39
40impl<T> Update<T> {
41    pub fn key(&self) -> &str {
42        match self {
43            Update::Upsert { key, .. } => key,
44            Update::Patch { key, .. } => key,
45            Update::Delete { key } => key,
46        }
47    }
48
49    pub fn data(&self) -> Option<&T> {
50        match self {
51            Update::Upsert { data, .. } => Some(data),
52            Update::Patch { data, .. } => Some(data),
53            Update::Delete { .. } => None,
54        }
55    }
56
57    pub fn is_delete(&self) -> bool {
58        matches!(self, Update::Delete { .. })
59    }
60
61    pub fn into_data(self) -> Option<T> {
62        match self {
63            Update::Upsert { data, .. } => Some(data),
64            Update::Patch { data, .. } => Some(data),
65            Update::Delete { .. } => None,
66        }
67    }
68
69    pub fn has_data(&self) -> bool {
70        matches!(self, Update::Upsert { .. } | Update::Patch { .. })
71    }
72
73    pub fn into_key(self) -> String {
74        match self {
75            Update::Upsert { key, .. } => key,
76            Update::Patch { key, .. } => key,
77            Update::Delete { key } => key,
78        }
79    }
80
81    pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> Update<U> {
82        match self {
83            Update::Upsert { key, data } => Update::Upsert { key, data: f(data) },
84            Update::Patch { key, data } => Update::Patch { key, data: f(data) },
85            Update::Delete { key } => Update::Delete { key },
86        }
87    }
88}
89
90impl<T> RichUpdate<T> {
91    pub fn key(&self) -> &str {
92        match self {
93            RichUpdate::Created { key, .. } => key,
94            RichUpdate::Updated { key, .. } => key,
95            RichUpdate::Deleted { key, .. } => key,
96        }
97    }
98
99    pub fn data(&self) -> Option<&T> {
100        match self {
101            RichUpdate::Created { data, .. } => Some(data),
102            RichUpdate::Updated { after, .. } => Some(after),
103            RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
104        }
105    }
106
107    pub fn before(&self) -> Option<&T> {
108        match self {
109            RichUpdate::Created { .. } => None,
110            RichUpdate::Updated { before, .. } => Some(before),
111            RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
112        }
113    }
114
115    pub fn into_data(self) -> Option<T> {
116        match self {
117            RichUpdate::Created { data, .. } => Some(data),
118            RichUpdate::Updated { after, .. } => Some(after),
119            RichUpdate::Deleted { last_known, .. } => last_known,
120        }
121    }
122
123    pub fn is_created(&self) -> bool {
124        matches!(self, RichUpdate::Created { .. })
125    }
126
127    pub fn is_updated(&self) -> bool {
128        matches!(self, RichUpdate::Updated { .. })
129    }
130
131    pub fn is_deleted(&self) -> bool {
132        matches!(self, RichUpdate::Deleted { .. })
133    }
134
135    pub fn patch(&self) -> Option<&serde_json::Value> {
136        match self {
137            RichUpdate::Updated { patch, .. } => patch.as_ref(),
138            _ => None,
139        }
140    }
141
142    pub fn has_patch_field(&self, field: &str) -> bool {
143        self.patch()
144            .and_then(|p| p.as_object())
145            .map(|obj| obj.contains_key(field))
146            .unwrap_or(false)
147    }
148}
149
150#[derive(Clone)]
151pub enum KeyFilter {
152    None,
153    Single(String),
154    Multiple(HashSet<String>),
155}
156
157impl KeyFilter {
158    fn matches(&self, key: &str) -> bool {
159        match self {
160            KeyFilter::None => true,
161            KeyFilter::Single(k) => k == key,
162            KeyFilter::Multiple(keys) => keys.contains(key),
163        }
164    }
165}
166
167pub struct EntityStream<T> {
168    state: EntityStreamState<T>,
169    view: String,
170    key_filter: KeyFilter,
171    _marker: PhantomData<T>,
172}
173
174enum EntityStreamState<T> {
175    Lazy {
176        connection: ConnectionManager,
177        store: SharedStore,
178        subscription_view: String,
179        subscription_key: Option<String>,
180    },
181    Active {
182        inner: BroadcastStream<StoreUpdate>,
183    },
184    Subscribing {
185        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
186        store: SharedStore,
187    },
188    Invalid,
189    _Phantom(PhantomData<T>),
190}
191
192impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
193    pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
194        Self {
195            state: EntityStreamState::Active {
196                inner: BroadcastStream::new(rx),
197            },
198            view,
199            key_filter: KeyFilter::None,
200            _marker: PhantomData,
201        }
202    }
203
204    pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
205        Self {
206            state: EntityStreamState::Active {
207                inner: BroadcastStream::new(rx),
208            },
209            view,
210            key_filter: KeyFilter::Single(key),
211            _marker: PhantomData,
212        }
213    }
214
215    pub fn new_multi_filtered(
216        rx: broadcast::Receiver<StoreUpdate>,
217        view: String,
218        keys: HashSet<String>,
219    ) -> Self {
220        Self {
221            state: EntityStreamState::Active {
222                inner: BroadcastStream::new(rx),
223            },
224            view,
225            key_filter: KeyFilter::Multiple(keys),
226            _marker: PhantomData,
227        }
228    }
229
230    pub fn new_lazy(
231        connection: ConnectionManager,
232        store: SharedStore,
233        entity_name: String,
234        subscription_view: String,
235        key_filter: KeyFilter,
236        subscription_key: Option<String>,
237    ) -> Self {
238        Self {
239            state: EntityStreamState::Lazy {
240                connection,
241                store,
242                subscription_view,
243                subscription_key,
244            },
245            view: entity_name,
246            key_filter,
247            _marker: PhantomData,
248        }
249    }
250
251    pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, Update<T>, F>
252    where
253        F: FnMut(&Update<T>) -> bool,
254    {
255        FilteredStream::new(self, predicate)
256    }
257
258    pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, Update<T>, U, F>
259    where
260        F: FnMut(Update<T>) -> Option<U>,
261    {
262        FilterMapStream::new(self, f)
263    }
264
265    pub fn map<U, F>(self, f: F) -> MapStream<Self, Update<T>, U, F>
266    where
267        F: FnMut(Update<T>) -> U,
268    {
269        MapStream::new(self, f)
270    }
271}
272
273impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for EntityStream<T> {
274    type Item = Update<T>;
275
276    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
277        let this = self.get_mut();
278
279        loop {
280            match &mut this.state {
281                EntityStreamState::Lazy { .. } => {
282                    let EntityStreamState::Lazy {
283                        connection,
284                        store,
285                        subscription_view,
286                        subscription_key,
287                    } = std::mem::replace(&mut this.state, EntityStreamState::Invalid)
288                    else {
289                        unreachable!()
290                    };
291
292                    let conn = connection.clone();
293                    let view = subscription_view.clone();
294                    let key = subscription_key.clone();
295                    let fut = Box::pin(async move {
296                        conn.ensure_subscription(&view, key.as_deref()).await;
297                    });
298
299                    this.state = EntityStreamState::Subscribing { fut, store };
300                    continue;
301                }
302                EntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
303                    Poll::Ready(()) => {
304                        let EntityStreamState::Subscribing { store, .. } =
305                            std::mem::replace(&mut this.state, EntityStreamState::Invalid)
306                        else {
307                            unreachable!()
308                        };
309                        this.state = EntityStreamState::Active {
310                            inner: BroadcastStream::new(store.subscribe()),
311                        };
312                        continue;
313                    }
314                    Poll::Pending => return Poll::Pending,
315                },
316                EntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
317                    Poll::Ready(Some(Ok(update))) => {
318                        if update.view != this.view {
319                            continue;
320                        }
321
322                        if !this.key_filter.matches(&update.key) {
323                            continue;
324                        }
325
326                        match update.operation {
327                            Operation::Delete => {
328                                return Poll::Ready(Some(Update::Delete { key: update.key }));
329                            }
330                            Operation::Upsert | Operation::Create | Operation::Snapshot => {
331                                if let Some(data) = update.data {
332                                    if let Ok(typed) = serde_json::from_value::<T>(data) {
333                                        return Poll::Ready(Some(Update::Upsert {
334                                            key: update.key,
335                                            data: typed,
336                                        }));
337                                    }
338                                }
339                            }
340                            Operation::Patch => {
341                                if let Some(data) = update.data {
342                                    match serde_json::from_value::<T>(data) {
343                                        Ok(typed) => {
344                                            return Poll::Ready(Some(Update::Patch {
345                                                key: update.key,
346                                                data: typed,
347                                            }));
348                                        }
349                                        Err(e) => {
350                                            tracing::warn!(
351                                                key = %update.key,
352                                                error = %e,
353                                                "Patch failed to deserialize to full type, skipping"
354                                            );
355                                            continue;
356                                        }
357                                    }
358                                }
359                            }
360                        }
361                    }
362                    Poll::Ready(Some(Err(_lagged))) => {
363                        tracing::warn!("EntityStream lagged behind, some messages were dropped");
364                        continue;
365                    }
366                    Poll::Ready(None) => {
367                        return Poll::Ready(None);
368                    }
369                    Poll::Pending => {
370                        return Poll::Pending;
371                    }
372                },
373                EntityStreamState::Invalid => {
374                    panic!("EntityStream in invalid state");
375                }
376                EntityStreamState::_Phantom(_) => unreachable!(),
377            }
378        }
379    }
380}
381
382pub struct RichEntityStream<T> {
383    state: RichEntityStreamState<T>,
384    view: String,
385    key_filter: KeyFilter,
386    _marker: PhantomData<T>,
387}
388
389enum RichEntityStreamState<T> {
390    Lazy {
391        connection: ConnectionManager,
392        store: SharedStore,
393        subscription_view: String,
394        subscription_key: Option<String>,
395    },
396    Active {
397        inner: BroadcastStream<StoreUpdate>,
398    },
399    Subscribing {
400        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
401        store: SharedStore,
402    },
403    Invalid,
404    _Phantom(PhantomData<T>),
405}
406
407impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
408    pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
409        Self {
410            state: RichEntityStreamState::Active {
411                inner: BroadcastStream::new(rx),
412            },
413            view,
414            key_filter: KeyFilter::None,
415            _marker: PhantomData,
416        }
417    }
418
419    pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
420        Self {
421            state: RichEntityStreamState::Active {
422                inner: BroadcastStream::new(rx),
423            },
424            view,
425            key_filter: KeyFilter::Single(key),
426            _marker: PhantomData,
427        }
428    }
429
430    pub fn new_lazy(
431        connection: ConnectionManager,
432        store: SharedStore,
433        entity_name: String,
434        subscription_view: String,
435        key_filter: KeyFilter,
436        subscription_key: Option<String>,
437    ) -> Self {
438        Self {
439            state: RichEntityStreamState::Lazy {
440                connection,
441                store,
442                subscription_view,
443                subscription_key,
444            },
445            view: entity_name,
446            key_filter,
447            _marker: PhantomData,
448        }
449    }
450}
451
452impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for RichEntityStream<T> {
453    type Item = RichUpdate<T>;
454
455    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
456        let this = self.get_mut();
457
458        loop {
459            match &mut this.state {
460                RichEntityStreamState::Lazy { .. } => {
461                    let RichEntityStreamState::Lazy {
462                        connection,
463                        store,
464                        subscription_view,
465                        subscription_key,
466                    } = std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
467                    else {
468                        unreachable!()
469                    };
470
471                    let conn = connection.clone();
472                    let view = subscription_view.clone();
473                    let key = subscription_key.clone();
474                    let fut = Box::pin(async move {
475                        conn.ensure_subscription(&view, key.as_deref()).await;
476                    });
477
478                    this.state = RichEntityStreamState::Subscribing { fut, store };
479                    continue;
480                }
481                RichEntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
482                    Poll::Ready(()) => {
483                        let RichEntityStreamState::Subscribing { store, .. } =
484                            std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
485                        else {
486                            unreachable!()
487                        };
488                        this.state = RichEntityStreamState::Active {
489                            inner: BroadcastStream::new(store.subscribe()),
490                        };
491                        continue;
492                    }
493                    Poll::Pending => return Poll::Pending,
494                },
495                RichEntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
496                    Poll::Ready(Some(Ok(update))) => {
497                        if update.view != this.view {
498                            continue;
499                        }
500
501                        if !this.key_filter.matches(&update.key) {
502                            continue;
503                        }
504
505                        let previous: Option<T> =
506                            update.previous.and_then(|v| serde_json::from_value(v).ok());
507
508                        match update.operation {
509                            Operation::Delete => {
510                                return Poll::Ready(Some(RichUpdate::Deleted {
511                                    key: update.key,
512                                    last_known: previous,
513                                }));
514                            }
515                            Operation::Create | Operation::Snapshot => {
516                                if let Some(data) = update.data {
517                                    if let Ok(typed) = serde_json::from_value::<T>(data) {
518                                        return Poll::Ready(Some(RichUpdate::Created {
519                                            key: update.key,
520                                            data: typed,
521                                        }));
522                                    }
523                                }
524                            }
525                            Operation::Upsert | Operation::Patch => {
526                                if let Some(data) = update.data {
527                                    match serde_json::from_value::<T>(data.clone()) {
528                                        Ok(after) => {
529                                            if let Some(before) = previous {
530                                                return Poll::Ready(Some(RichUpdate::Updated {
531                                                    key: update.key,
532                                                    before,
533                                                    after,
534                                                    patch: update.patch,
535                                                }));
536                                            } else {
537                                                return Poll::Ready(Some(RichUpdate::Created {
538                                                    key: update.key,
539                                                    data: after,
540                                                }));
541                                            }
542                                        }
543                                        Err(e) => {
544                                            tracing::warn!(
545                                                key = %update.key,
546                                                error = %e,
547                                                "Update failed to deserialize, skipping"
548                                            );
549                                            continue;
550                                        }
551                                    }
552                                }
553                            }
554                        }
555                    }
556                    Poll::Ready(Some(Err(_lagged))) => {
557                        tracing::warn!(
558                            "RichEntityStream lagged behind, some messages were dropped"
559                        );
560                        continue;
561                    }
562                    Poll::Ready(None) => {
563                        return Poll::Ready(None);
564                    }
565                    Poll::Pending => {
566                        return Poll::Pending;
567                    }
568                },
569                RichEntityStreamState::Invalid => {
570                    panic!("RichEntityStream in invalid state");
571                }
572                RichEntityStreamState::_Phantom(_) => unreachable!(),
573            }
574        }
575    }
576}
577
578impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
579    pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, RichUpdate<T>, F>
580    where
581        F: FnMut(&RichUpdate<T>) -> bool,
582    {
583        FilteredStream::new(self, predicate)
584    }
585
586    pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, RichUpdate<T>, U, F>
587    where
588        F: FnMut(RichUpdate<T>) -> Option<U>,
589    {
590        FilterMapStream::new(self, f)
591    }
592
593    pub fn map<U, F>(self, f: F) -> MapStream<Self, RichUpdate<T>, U, F>
594    where
595        F: FnMut(RichUpdate<T>) -> U,
596    {
597        MapStream::new(self, f)
598    }
599}
600
601pin_project! {
602    pub struct FilteredStream<S, I, F> {
603        #[pin]
604        inner: S,
605        predicate: F,
606        _item: PhantomData<I>,
607    }
608}
609
610impl<S, I, F> FilteredStream<S, I, F> {
611    pub fn new(inner: S, predicate: F) -> Self {
612        Self {
613            inner,
614            predicate,
615            _item: PhantomData,
616        }
617    }
618}
619
620impl<S, I, F> Stream for FilteredStream<S, I, F>
621where
622    S: Stream<Item = I>,
623    F: FnMut(&I) -> bool,
624{
625    type Item = I;
626
627    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
628        let mut this = self.project();
629        loop {
630            match this.inner.as_mut().poll_next(cx) {
631                Poll::Ready(Some(item)) => {
632                    if (this.predicate)(&item) {
633                        return Poll::Ready(Some(item));
634                    }
635                }
636                Poll::Ready(None) => return Poll::Ready(None),
637                Poll::Pending => return Poll::Pending,
638            }
639        }
640    }
641}
642
643impl<S, I, F> FilteredStream<S, I, F>
644where
645    S: Stream<Item = I>,
646    F: FnMut(&I) -> bool,
647{
648    pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, I, F2>
649    where
650        F2: FnMut(&I) -> bool,
651    {
652        FilteredStream::new(self, predicate)
653    }
654
655    pub fn filter_map<U, F2>(self, f: F2) -> FilterMapStream<Self, I, U, F2>
656    where
657        F2: FnMut(I) -> Option<U>,
658    {
659        FilterMapStream::new(self, f)
660    }
661
662    pub fn map<U, F2>(self, f: F2) -> MapStream<Self, I, U, F2>
663    where
664        F2: FnMut(I) -> U,
665    {
666        MapStream::new(self, f)
667    }
668}
669
670pin_project! {
671    pub struct FilterMapStream<S, I, U, F> {
672        #[pin]
673        inner: S,
674        f: F,
675        _item: PhantomData<(I, U)>,
676    }
677}
678
679impl<S, I, U, F> FilterMapStream<S, I, U, F> {
680    pub fn new(inner: S, f: F) -> Self {
681        Self {
682            inner,
683            f,
684            _item: PhantomData,
685        }
686    }
687}
688
689impl<S, I, U, F> Stream for FilterMapStream<S, I, U, F>
690where
691    S: Stream<Item = I>,
692    F: FnMut(I) -> Option<U>,
693{
694    type Item = U;
695
696    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
697        let mut this = self.project();
698        loop {
699            match this.inner.as_mut().poll_next(cx) {
700                Poll::Ready(Some(item)) => {
701                    if let Some(mapped) = (this.f)(item) {
702                        return Poll::Ready(Some(mapped));
703                    }
704                }
705                Poll::Ready(None) => return Poll::Ready(None),
706                Poll::Pending => return Poll::Pending,
707            }
708        }
709    }
710}
711
712impl<S, I, U, F> FilterMapStream<S, I, U, F>
713where
714    S: Stream<Item = I>,
715    F: FnMut(I) -> Option<U>,
716{
717    pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
718    where
719        F2: FnMut(&U) -> bool,
720    {
721        FilteredStream::new(self, predicate)
722    }
723
724    pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
725    where
726        F2: FnMut(U) -> Option<V>,
727    {
728        FilterMapStream::new(self, f)
729    }
730
731    pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
732    where
733        F2: FnMut(U) -> V,
734    {
735        MapStream::new(self, f)
736    }
737}
738
739pin_project! {
740    pub struct MapStream<S, I, U, F> {
741        #[pin]
742        inner: S,
743        f: F,
744        _item: PhantomData<(I, U)>,
745    }
746}
747
748impl<S, I, U, F> MapStream<S, I, U, F> {
749    pub fn new(inner: S, f: F) -> Self {
750        Self {
751            inner,
752            f,
753            _item: PhantomData,
754        }
755    }
756}
757
758impl<S, I, U, F> Stream for MapStream<S, I, U, F>
759where
760    S: Stream<Item = I>,
761    F: FnMut(I) -> U,
762{
763    type Item = U;
764
765    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
766        let this = self.project();
767        match this.inner.poll_next(cx) {
768            Poll::Ready(Some(item)) => Poll::Ready(Some((this.f)(item))),
769            Poll::Ready(None) => Poll::Ready(None),
770            Poll::Pending => Poll::Pending,
771        }
772    }
773}
774
775impl<S, I, U, F> MapStream<S, I, U, F>
776where
777    S: Stream<Item = I>,
778    F: FnMut(I) -> U,
779{
780    pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
781    where
782        F2: FnMut(&U) -> bool,
783    {
784        FilteredStream::new(self, predicate)
785    }
786
787    pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
788    where
789        F2: FnMut(U) -> Option<V>,
790    {
791        FilterMapStream::new(self, f)
792    }
793
794    pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
795    where
796        F2: FnMut(U) -> V,
797    {
798        MapStream::new(self, f)
799    }
800}