Skip to main content

hyperstack_sdk/
stream.rs

1use crate::connection::{ConnectionManager, SubscriptionOptions};
2use crate::frame::Operation;
3use crate::store::{SharedStore, StoreUpdate};
4use futures_util::Stream;
5use pin_project_lite::pin_project;
6use serde::de::DeserializeOwned;
7use std::collections::HashSet;
8use std::future::Future;
9use std::marker::PhantomData;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use tokio::sync::broadcast;
13use tokio_stream::wrappers::BroadcastStream;
14
/// A single typed change to an entity, as yielded by [`EntityStream`].
#[derive(Debug, Clone)]
pub enum Update<T> {
    /// The entity's value was delivered in full.
    ///
    /// `EntityStream` emits this for upsert, create and snapshot operations.
    Upsert {
        /// Key of the affected entity.
        key: String,
        /// Deserialized entity value.
        data: T,
    },
    /// The entity changed through a patch operation.
    Patch {
        /// Key of the affected entity.
        key: String,
        /// Deserialized entity value carried by the patch update.
        data: T,
    },
    /// The entity was removed.
    Delete {
        /// Key of the removed entity.
        key: String,
    },
}
21
22#[derive(Debug, Clone)]
23pub enum RichUpdate<T> {
24    Created {
25        key: String,
26        data: T,
27    },
28    Updated {
29        key: String,
30        before: T,
31        after: T,
32        patch: Option<serde_json::Value>,
33    },
34    Deleted {
35        key: String,
36        last_known: Option<T>,
37    },
38}
39
40impl<T> Update<T> {
41    pub fn key(&self) -> &str {
42        match self {
43            Update::Upsert { key, .. } => key,
44            Update::Patch { key, .. } => key,
45            Update::Delete { key } => key,
46        }
47    }
48
49    pub fn data(&self) -> Option<&T> {
50        match self {
51            Update::Upsert { data, .. } => Some(data),
52            Update::Patch { data, .. } => Some(data),
53            Update::Delete { .. } => None,
54        }
55    }
56
57    pub fn is_delete(&self) -> bool {
58        matches!(self, Update::Delete { .. })
59    }
60
61    pub fn into_data(self) -> Option<T> {
62        match self {
63            Update::Upsert { data, .. } => Some(data),
64            Update::Patch { data, .. } => Some(data),
65            Update::Delete { .. } => None,
66        }
67    }
68
69    pub fn has_data(&self) -> bool {
70        matches!(self, Update::Upsert { .. } | Update::Patch { .. })
71    }
72
73    pub fn into_key(self) -> String {
74        match self {
75            Update::Upsert { key, .. } => key,
76            Update::Patch { key, .. } => key,
77            Update::Delete { key } => key,
78        }
79    }
80
81    pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> Update<U> {
82        match self {
83            Update::Upsert { key, data } => Update::Upsert { key, data: f(data) },
84            Update::Patch { key, data } => Update::Patch { key, data: f(data) },
85            Update::Delete { key } => Update::Delete { key },
86        }
87    }
88}
89
90impl<T> RichUpdate<T> {
91    pub fn key(&self) -> &str {
92        match self {
93            RichUpdate::Created { key, .. } => key,
94            RichUpdate::Updated { key, .. } => key,
95            RichUpdate::Deleted { key, .. } => key,
96        }
97    }
98
99    pub fn data(&self) -> Option<&T> {
100        match self {
101            RichUpdate::Created { data, .. } => Some(data),
102            RichUpdate::Updated { after, .. } => Some(after),
103            RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
104        }
105    }
106
107    pub fn before(&self) -> Option<&T> {
108        match self {
109            RichUpdate::Created { .. } => None,
110            RichUpdate::Updated { before, .. } => Some(before),
111            RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
112        }
113    }
114
115    pub fn into_data(self) -> Option<T> {
116        match self {
117            RichUpdate::Created { data, .. } => Some(data),
118            RichUpdate::Updated { after, .. } => Some(after),
119            RichUpdate::Deleted { last_known, .. } => last_known,
120        }
121    }
122
123    pub fn is_created(&self) -> bool {
124        matches!(self, RichUpdate::Created { .. })
125    }
126
127    pub fn is_updated(&self) -> bool {
128        matches!(self, RichUpdate::Updated { .. })
129    }
130
131    pub fn is_deleted(&self) -> bool {
132        matches!(self, RichUpdate::Deleted { .. })
133    }
134
135    pub fn patch(&self) -> Option<&serde_json::Value> {
136        match self {
137            RichUpdate::Updated { patch, .. } => patch.as_ref(),
138            _ => None,
139        }
140    }
141
142    pub fn has_patch_field(&self, field: &str) -> bool {
143        self.patch()
144            .and_then(|p| p.as_object())
145            .map(|obj| obj.contains_key(field))
146            .unwrap_or(false)
147    }
148}
149
/// Client-side restriction on which entity keys a stream yields.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KeyFilter {
    /// No restriction: every key passes.
    None,
    /// Only the given key passes.
    Single(String),
    /// Only keys contained in the set pass.
    Multiple(HashSet<String>),
}

impl KeyFilter {
    /// Returns `true` when `key` passes this filter.
    fn matches(&self, key: &str) -> bool {
        match self {
            KeyFilter::None => true,
            KeyFilter::Single(k) => k == key,
            KeyFilter::Multiple(keys) => keys.contains(key),
        }
    }
}
166
167pub struct EntityStream<T> {
168    state: EntityStreamState<T>,
169    view: String,
170    key_filter: KeyFilter,
171    _marker: PhantomData<T>,
172}
173
174enum EntityStreamState<T> {
175    Lazy {
176        connection: ConnectionManager,
177        store: SharedStore,
178        subscription_view: String,
179        subscription_key: Option<String>,
180        take: Option<u32>,
181        skip: Option<u32>,
182        with_snapshot: Option<bool>,
183        after: Option<String>,
184        snapshot_limit: Option<usize>,
185    },
186    Active {
187        inner: BroadcastStream<StoreUpdate>,
188    },
189    Subscribing {
190        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
191        inner: BroadcastStream<StoreUpdate>,
192    },
193    Invalid,
194    _Phantom(PhantomData<T>),
195}
196
197impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
198    pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
199        Self {
200            state: EntityStreamState::Active {
201                inner: BroadcastStream::new(rx),
202            },
203            view,
204            key_filter: KeyFilter::None,
205            _marker: PhantomData,
206        }
207    }
208
209    pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
210        Self {
211            state: EntityStreamState::Active {
212                inner: BroadcastStream::new(rx),
213            },
214            view,
215            key_filter: KeyFilter::Single(key),
216            _marker: PhantomData,
217        }
218    }
219
220    pub fn new_multi_filtered(
221        rx: broadcast::Receiver<StoreUpdate>,
222        view: String,
223        keys: HashSet<String>,
224    ) -> Self {
225        Self {
226            state: EntityStreamState::Active {
227                inner: BroadcastStream::new(rx),
228            },
229            view,
230            key_filter: KeyFilter::Multiple(keys),
231            _marker: PhantomData,
232        }
233    }
234
235    pub fn new_lazy(
236        connection: ConnectionManager,
237        store: SharedStore,
238        entity_name: String,
239        subscription_view: String,
240        key_filter: KeyFilter,
241        subscription_key: Option<String>,
242    ) -> Self {
243        Self::new_lazy_with_opts(
244            connection,
245            store,
246            entity_name,
247            subscription_view,
248            key_filter,
249            subscription_key,
250            None,
251            None,
252            None,
253            None,
254            None,
255        )
256    }
257
258    #[allow(clippy::too_many_arguments)]
259    pub fn new_lazy_with_opts(
260        connection: ConnectionManager,
261        store: SharedStore,
262        entity_name: String,
263        subscription_view: String,
264        key_filter: KeyFilter,
265        subscription_key: Option<String>,
266        take: Option<u32>,
267        skip: Option<u32>,
268        with_snapshot: Option<bool>,
269        after: Option<String>,
270        snapshot_limit: Option<usize>,
271    ) -> Self {
272        Self {
273            state: EntityStreamState::Lazy {
274                connection,
275                store,
276                subscription_view,
277                subscription_key,
278                take,
279                skip,
280                with_snapshot,
281                after,
282                snapshot_limit,
283            },
284            view: entity_name,
285            key_filter,
286            _marker: PhantomData,
287        }
288    }
289
290    pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, Update<T>, F>
291    where
292        F: FnMut(&Update<T>) -> bool,
293    {
294        FilteredStream::new(self, predicate)
295    }
296
297    pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, Update<T>, U, F>
298    where
299        F: FnMut(Update<T>) -> Option<U>,
300    {
301        FilterMapStream::new(self, f)
302    }
303
304    pub fn map<U, F>(self, f: F) -> MapStream<Self, Update<T>, U, F>
305    where
306        F: FnMut(Update<T>) -> U,
307    {
308        MapStream::new(self, f)
309    }
310}
311
312impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for EntityStream<T> {
313    type Item = Update<T>;
314
315    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316        let this = self.get_mut();
317
318        loop {
319            match &mut this.state {
320                EntityStreamState::Lazy { .. } => {
321                    let EntityStreamState::Lazy {
322                        connection,
323                        store,
324                        subscription_view,
325                        subscription_key,
326                        take,
327                        skip,
328                        with_snapshot,
329                        after,
330                        snapshot_limit,
331                    } = std::mem::replace(&mut this.state, EntityStreamState::Invalid)
332                    else {
333                        unreachable!()
334                    };
335
336                    // Subscribe to broadcast BEFORE sending subscription to server
337                    // This ensures we don't miss any frames that arrive during setup
338                    let inner = BroadcastStream::new(store.subscribe());
339
340                    let conn = connection.clone();
341                    let view = subscription_view.clone();
342                    let key = subscription_key.clone();
343                    let fut = Box::pin(async move {
344                        let opts = SubscriptionOptions {
345                            take,
346                            skip,
347                            with_snapshot,
348                            after,
349                            snapshot_limit,
350                        };
351                        conn.ensure_subscription_with_opts(&view, key.as_deref(), opts)
352                            .await;
353                    });
354
355                    this.state = EntityStreamState::Subscribing { fut, inner };
356                    continue;
357                }
358                EntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
359                    Poll::Ready(()) => {
360                        let EntityStreamState::Subscribing { inner, .. } =
361                            std::mem::replace(&mut this.state, EntityStreamState::Invalid)
362                        else {
363                            unreachable!()
364                        };
365                        this.state = EntityStreamState::Active { inner };
366                        continue;
367                    }
368                    Poll::Pending => return Poll::Pending,
369                },
370                EntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
371                    Poll::Ready(Some(Ok(update))) => {
372                        if update.view != this.view {
373                            continue;
374                        }
375
376                        if !this.key_filter.matches(&update.key) {
377                            continue;
378                        }
379
380                        match update.operation {
381                            Operation::Delete => {
382                                return Poll::Ready(Some(Update::Delete { key: update.key }));
383                            }
384                            Operation::Upsert | Operation::Create | Operation::Snapshot => {
385                                if let Some(data) = update.data {
386                                    if let Ok(typed) = serde_json::from_value::<T>(data) {
387                                        return Poll::Ready(Some(Update::Upsert {
388                                            key: update.key,
389                                            data: typed,
390                                        }));
391                                    }
392                                }
393                            }
394                            Operation::Patch => {
395                                if let Some(data) = update.data {
396                                    match serde_json::from_value::<T>(data) {
397                                        Ok(typed) => {
398                                            return Poll::Ready(Some(Update::Patch {
399                                                key: update.key,
400                                                data: typed,
401                                            }));
402                                        }
403                                        Err(e) => {
404                                            tracing::warn!(
405                                                key = %update.key,
406                                                error = %e,
407                                                "Patch failed to deserialize to full type, skipping"
408                                            );
409                                            continue;
410                                        }
411                                    }
412                                }
413                            }
414                            Operation::Subscribed => {
415                                continue;
416                            }
417                        }
418                    }
419                    Poll::Ready(Some(Err(_lagged))) => {
420                        tracing::warn!("EntityStream lagged behind, some messages were dropped");
421                        continue;
422                    }
423                    Poll::Ready(None) => {
424                        return Poll::Ready(None);
425                    }
426                    Poll::Pending => {
427                        return Poll::Pending;
428                    }
429                },
430                EntityStreamState::Invalid => {
431                    panic!("EntityStream in invalid state");
432                }
433                EntityStreamState::_Phantom(_) => unreachable!(),
434            }
435        }
436    }
437}
438
439pub struct RichEntityStream<T> {
440    state: RichEntityStreamState<T>,
441    view: String,
442    key_filter: KeyFilter,
443    _marker: PhantomData<T>,
444}
445
446enum RichEntityStreamState<T> {
447    Lazy {
448        connection: ConnectionManager,
449        store: SharedStore,
450        subscription_view: String,
451        subscription_key: Option<String>,
452        take: Option<u32>,
453        skip: Option<u32>,
454        with_snapshot: Option<bool>,
455        after: Option<String>,
456        snapshot_limit: Option<usize>,
457    },
458    Active {
459        inner: BroadcastStream<StoreUpdate>,
460    },
461    Subscribing {
462        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
463        inner: BroadcastStream<StoreUpdate>,
464    },
465    Invalid,
466    _Phantom(PhantomData<T>),
467}
468
469impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
470    pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
471        Self {
472            state: RichEntityStreamState::Active {
473                inner: BroadcastStream::new(rx),
474            },
475            view,
476            key_filter: KeyFilter::None,
477            _marker: PhantomData,
478        }
479    }
480
481    pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
482        Self {
483            state: RichEntityStreamState::Active {
484                inner: BroadcastStream::new(rx),
485            },
486            view,
487            key_filter: KeyFilter::Single(key),
488            _marker: PhantomData,
489        }
490    }
491
492    pub fn new_lazy(
493        connection: ConnectionManager,
494        store: SharedStore,
495        entity_name: String,
496        subscription_view: String,
497        key_filter: KeyFilter,
498        subscription_key: Option<String>,
499    ) -> Self {
500        Self::new_lazy_with_opts(
501            connection,
502            store,
503            entity_name,
504            subscription_view,
505            key_filter,
506            subscription_key,
507            None,
508            None,
509            None,
510            None,
511            None,
512        )
513    }
514
515    #[allow(clippy::too_many_arguments)]
516    pub fn new_lazy_with_opts(
517        connection: ConnectionManager,
518        store: SharedStore,
519        entity_name: String,
520        subscription_view: String,
521        key_filter: KeyFilter,
522        subscription_key: Option<String>,
523        take: Option<u32>,
524        skip: Option<u32>,
525        with_snapshot: Option<bool>,
526        after: Option<String>,
527        snapshot_limit: Option<usize>,
528    ) -> Self {
529        Self {
530            state: RichEntityStreamState::Lazy {
531                connection,
532                store,
533                subscription_view,
534                subscription_key,
535                take,
536                skip,
537                with_snapshot,
538                after,
539                snapshot_limit,
540            },
541            view: entity_name,
542            key_filter,
543            _marker: PhantomData,
544        }
545    }
546}
547
548impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for RichEntityStream<T> {
549    type Item = RichUpdate<T>;
550
551    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
552        let this = self.get_mut();
553
554        loop {
555            match &mut this.state {
556                RichEntityStreamState::Lazy { .. } => {
557                    let RichEntityStreamState::Lazy {
558                        connection,
559                        store,
560                        subscription_view,
561                        subscription_key,
562                        take,
563                        skip,
564                        with_snapshot,
565                        after,
566                        snapshot_limit,
567                    } = std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
568                    else {
569                        unreachable!()
570                    };
571
572                    // Subscribe to broadcast BEFORE sending subscription to server
573                    // This ensures we don't miss any frames that arrive during setup
574                    let inner = BroadcastStream::new(store.subscribe());
575
576                    let conn = connection.clone();
577                    let view = subscription_view.clone();
578                    let key = subscription_key.clone();
579                    let fut = Box::pin(async move {
580                        let opts = SubscriptionOptions {
581                            take,
582                            skip,
583                            with_snapshot,
584                            after,
585                            snapshot_limit,
586                        };
587                        conn.ensure_subscription_with_opts(&view, key.as_deref(), opts)
588                            .await;
589                    });
590
591                    this.state = RichEntityStreamState::Subscribing { fut, inner };
592                    continue;
593                }
594                RichEntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
595                    Poll::Ready(()) => {
596                        let RichEntityStreamState::Subscribing { inner, .. } =
597                            std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
598                        else {
599                            unreachable!()
600                        };
601                        this.state = RichEntityStreamState::Active { inner };
602                        continue;
603                    }
604                    Poll::Pending => return Poll::Pending,
605                },
606                RichEntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
607                    Poll::Ready(Some(Ok(update))) => {
608                        if update.view != this.view {
609                            continue;
610                        }
611
612                        if !this.key_filter.matches(&update.key) {
613                            continue;
614                        }
615
616                        let previous: Option<T> =
617                            update.previous.and_then(|v| serde_json::from_value(v).ok());
618
619                        match update.operation {
620                            Operation::Delete => {
621                                return Poll::Ready(Some(RichUpdate::Deleted {
622                                    key: update.key,
623                                    last_known: previous,
624                                }));
625                            }
626                            Operation::Create | Operation::Snapshot => {
627                                if let Some(data) = update.data {
628                                    if let Ok(typed) = serde_json::from_value::<T>(data) {
629                                        return Poll::Ready(Some(RichUpdate::Created {
630                                            key: update.key,
631                                            data: typed,
632                                        }));
633                                    }
634                                }
635                            }
636                            Operation::Upsert | Operation::Patch => {
637                                if let Some(data) = update.data {
638                                    match serde_json::from_value::<T>(data.clone()) {
639                                        Ok(after) => {
640                                            if let Some(before) = previous {
641                                                return Poll::Ready(Some(RichUpdate::Updated {
642                                                    key: update.key,
643                                                    before,
644                                                    after,
645                                                    patch: update.patch,
646                                                }));
647                                            } else {
648                                                return Poll::Ready(Some(RichUpdate::Created {
649                                                    key: update.key,
650                                                    data: after,
651                                                }));
652                                            }
653                                        }
654                                        Err(e) => {
655                                            tracing::warn!(
656                                                key = %update.key,
657                                                error = %e,
658                                                "Update failed to deserialize, skipping"
659                                            );
660                                            continue;
661                                        }
662                                    }
663                                }
664                            }
665                            Operation::Subscribed => {
666                                continue;
667                            }
668                        }
669                    }
670                    Poll::Ready(Some(Err(_lagged))) => {
671                        tracing::warn!(
672                            "RichEntityStream lagged behind, some messages were dropped"
673                        );
674                        continue;
675                    }
676                    Poll::Ready(None) => {
677                        return Poll::Ready(None);
678                    }
679                    Poll::Pending => {
680                        return Poll::Pending;
681                    }
682                },
683                RichEntityStreamState::Invalid => {
684                    panic!("RichEntityStream in invalid state");
685                }
686                RichEntityStreamState::_Phantom(_) => unreachable!(),
687            }
688        }
689    }
690}
691
692impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
693    pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, RichUpdate<T>, F>
694    where
695        F: FnMut(&RichUpdate<T>) -> bool,
696    {
697        FilteredStream::new(self, predicate)
698    }
699
700    pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, RichUpdate<T>, U, F>
701    where
702        F: FnMut(RichUpdate<T>) -> Option<U>,
703    {
704        FilterMapStream::new(self, f)
705    }
706
707    pub fn map<U, F>(self, f: F) -> MapStream<Self, RichUpdate<T>, U, F>
708    where
709        F: FnMut(RichUpdate<T>) -> U,
710    {
711        MapStream::new(self, f)
712    }
713}
714
715pin_project! {
716    pub struct FilteredStream<S, I, F> {
717        #[pin]
718        inner: S,
719        predicate: F,
720        _item: PhantomData<I>,
721    }
722}
723
724impl<S, I, F> FilteredStream<S, I, F> {
725    pub fn new(inner: S, predicate: F) -> Self {
726        Self {
727            inner,
728            predicate,
729            _item: PhantomData,
730        }
731    }
732}
733
734impl<S, I, F> Stream for FilteredStream<S, I, F>
735where
736    S: Stream<Item = I>,
737    F: FnMut(&I) -> bool,
738{
739    type Item = I;
740
741    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
742        let mut this = self.project();
743        loop {
744            match this.inner.as_mut().poll_next(cx) {
745                Poll::Ready(Some(item)) => {
746                    if (this.predicate)(&item) {
747                        return Poll::Ready(Some(item));
748                    }
749                }
750                Poll::Ready(None) => return Poll::Ready(None),
751                Poll::Pending => return Poll::Pending,
752            }
753        }
754    }
755}
756
757impl<S, I, F> FilteredStream<S, I, F>
758where
759    S: Stream<Item = I>,
760    F: FnMut(&I) -> bool,
761{
762    pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, I, F2>
763    where
764        F2: FnMut(&I) -> bool,
765    {
766        FilteredStream::new(self, predicate)
767    }
768
769    pub fn filter_map<U, F2>(self, f: F2) -> FilterMapStream<Self, I, U, F2>
770    where
771        F2: FnMut(I) -> Option<U>,
772    {
773        FilterMapStream::new(self, f)
774    }
775
776    pub fn map<U, F2>(self, f: F2) -> MapStream<Self, I, U, F2>
777    where
778        F2: FnMut(I) -> U,
779    {
780        MapStream::new(self, f)
781    }
782}
783
784pin_project! {
785    pub struct FilterMapStream<S, I, U, F> {
786        #[pin]
787        inner: S,
788        f: F,
789        _item: PhantomData<(I, U)>,
790    }
791}
792
793impl<S, I, U, F> FilterMapStream<S, I, U, F> {
794    pub fn new(inner: S, f: F) -> Self {
795        Self {
796            inner,
797            f,
798            _item: PhantomData,
799        }
800    }
801}
802
803impl<S, I, U, F> Stream for FilterMapStream<S, I, U, F>
804where
805    S: Stream<Item = I>,
806    F: FnMut(I) -> Option<U>,
807{
808    type Item = U;
809
810    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
811        let mut this = self.project();
812        loop {
813            match this.inner.as_mut().poll_next(cx) {
814                Poll::Ready(Some(item)) => {
815                    if let Some(mapped) = (this.f)(item) {
816                        return Poll::Ready(Some(mapped));
817                    }
818                }
819                Poll::Ready(None) => return Poll::Ready(None),
820                Poll::Pending => return Poll::Pending,
821            }
822        }
823    }
824}
825
826impl<S, I, U, F> FilterMapStream<S, I, U, F>
827where
828    S: Stream<Item = I>,
829    F: FnMut(I) -> Option<U>,
830{
831    pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
832    where
833        F2: FnMut(&U) -> bool,
834    {
835        FilteredStream::new(self, predicate)
836    }
837
838    pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
839    where
840        F2: FnMut(U) -> Option<V>,
841    {
842        FilterMapStream::new(self, f)
843    }
844
845    pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
846    where
847        F2: FnMut(U) -> V,
848    {
849        MapStream::new(self, f)
850    }
851}
852
853pin_project! {
854    pub struct MapStream<S, I, U, F> {
855        #[pin]
856        inner: S,
857        f: F,
858        _item: PhantomData<(I, U)>,
859    }
860}
861
862impl<S, I, U, F> MapStream<S, I, U, F> {
863    pub fn new(inner: S, f: F) -> Self {
864        Self {
865            inner,
866            f,
867            _item: PhantomData,
868        }
869    }
870}
871
872impl<S, I, U, F> Stream for MapStream<S, I, U, F>
873where
874    S: Stream<Item = I>,
875    F: FnMut(I) -> U,
876{
877    type Item = U;
878
879    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
880        let this = self.project();
881        match this.inner.poll_next(cx) {
882            Poll::Ready(Some(item)) => Poll::Ready(Some((this.f)(item))),
883            Poll::Ready(None) => Poll::Ready(None),
884            Poll::Pending => Poll::Pending,
885        }
886    }
887}
888
889impl<S, I, U, F> MapStream<S, I, U, F>
890where
891    S: Stream<Item = I>,
892    F: FnMut(I) -> U,
893{
894    pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
895    where
896        F2: FnMut(&U) -> bool,
897    {
898        FilteredStream::new(self, predicate)
899    }
900
901    pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
902    where
903        F2: FnMut(U) -> Option<V>,
904    {
905        FilterMapStream::new(self, f)
906    }
907
908    pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
909    where
910        F2: FnMut(U) -> V,
911    {
912        MapStream::new(self, f)
913    }
914}
915
916/// A stream that emits merged entity values directly (filtering out deletes).
917///
918/// This is the simplest streaming interface - it just yields `T` after each change,
919/// applying patches to give you the full merged entity state. Deletes are filtered out.
920///
921/// Corresponds to TypeScript SDK's `.use()` method.
922pub struct UseStream<T> {
923    state: UseStreamState<T>,
924    view: String,
925    key_filter: KeyFilter,
926    _marker: PhantomData<T>,
927}
928
929enum UseStreamState<T> {
930    Lazy {
931        connection: ConnectionManager,
932        store: SharedStore,
933        subscription_view: String,
934        subscription_key: Option<String>,
935        take: Option<u32>,
936        skip: Option<u32>,
937        with_snapshot: Option<bool>,
938        after: Option<String>,
939        snapshot_limit: Option<usize>,
940    },
941    Active {
942        inner: BroadcastStream<StoreUpdate>,
943    },
944    Subscribing {
945        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
946        inner: BroadcastStream<StoreUpdate>,
947    },
948    Invalid,
949    _Phantom(PhantomData<T>),
950}
951
952impl<T: DeserializeOwned + Clone + Send + 'static> UseStream<T> {
953    pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
954        Self {
955            state: UseStreamState::Active {
956                inner: BroadcastStream::new(rx),
957            },
958            view,
959            key_filter: KeyFilter::None,
960            _marker: PhantomData,
961        }
962    }
963
964    pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
965        Self {
966            state: UseStreamState::Active {
967                inner: BroadcastStream::new(rx),
968            },
969            view,
970            key_filter: KeyFilter::Single(key),
971            _marker: PhantomData,
972        }
973    }
974
975    pub fn new_lazy(
976        connection: ConnectionManager,
977        store: SharedStore,
978        entity_name: String,
979        subscription_view: String,
980        key_filter: KeyFilter,
981        subscription_key: Option<String>,
982    ) -> Self {
983        Self::new_lazy_with_opts(
984            connection,
985            store,
986            entity_name,
987            subscription_view,
988            key_filter,
989            subscription_key,
990            None,
991            None,
992            None,
993            None,
994            None,
995        )
996    }
997
998    #[allow(clippy::too_many_arguments)]
999    pub fn new_lazy_with_opts(
1000        connection: ConnectionManager,
1001        store: SharedStore,
1002        entity_name: String,
1003        subscription_view: String,
1004        key_filter: KeyFilter,
1005        subscription_key: Option<String>,
1006        take: Option<u32>,
1007        skip: Option<u32>,
1008        with_snapshot: Option<bool>,
1009        after: Option<String>,
1010        snapshot_limit: Option<usize>,
1011    ) -> Self {
1012        Self {
1013            state: UseStreamState::Lazy {
1014                connection,
1015                store,
1016                subscription_view,
1017                subscription_key,
1018                take,
1019                skip,
1020                with_snapshot,
1021                after,
1022                snapshot_limit,
1023            },
1024            view: entity_name,
1025            key_filter,
1026            _marker: PhantomData,
1027        }
1028    }
1029
1030    /// Filter the stream to only emit items matching the predicate.
1031    pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, T, F>
1032    where
1033        F: FnMut(&T) -> bool,
1034    {
1035        FilteredStream::new(self, predicate)
1036    }
1037
1038    /// Filter and transform items in one step.
1039    pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, T, U, F>
1040    where
1041        F: FnMut(T) -> Option<U>,
1042    {
1043        FilterMapStream::new(self, f)
1044    }
1045
1046    /// Transform each item.
1047    pub fn map<U, F>(self, f: F) -> MapStream<Self, T, U, F>
1048    where
1049        F: FnMut(T) -> U,
1050    {
1051        MapStream::new(self, f)
1052    }
1053}
1054
1055impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for UseStream<T> {
1056    type Item = T;
1057
1058    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1059        let this = self.get_mut();
1060
1061        loop {
1062            match &mut this.state {
1063                UseStreamState::Lazy { .. } => {
1064                    let UseStreamState::Lazy {
1065                        connection,
1066                        store,
1067                        subscription_view,
1068                        subscription_key,
1069                        take,
1070                        skip,
1071                        with_snapshot,
1072                        after,
1073                        snapshot_limit,
1074                    } = std::mem::replace(&mut this.state, UseStreamState::Invalid)
1075                    else {
1076                        unreachable!()
1077                    };
1078
1079                    // Subscribe to broadcast BEFORE sending subscription to server
1080                    let inner = BroadcastStream::new(store.subscribe());
1081
1082                    let conn = connection.clone();
1083                    let view = subscription_view.clone();
1084                    let key = subscription_key.clone();
1085                    let fut = Box::pin(async move {
1086                        let opts = SubscriptionOptions {
1087                            take,
1088                            skip,
1089                            with_snapshot,
1090                            after,
1091                            snapshot_limit,
1092                        };
1093                        conn.ensure_subscription_with_opts(&view, key.as_deref(), opts)
1094                            .await;
1095                    });
1096
1097                    this.state = UseStreamState::Subscribing { fut, inner };
1098                    continue;
1099                }
1100                UseStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
1101                    Poll::Ready(()) => {
1102                        let UseStreamState::Subscribing { inner, .. } =
1103                            std::mem::replace(&mut this.state, UseStreamState::Invalid)
1104                        else {
1105                            unreachable!()
1106                        };
1107                        this.state = UseStreamState::Active { inner };
1108                        continue;
1109                    }
1110                    Poll::Pending => return Poll::Pending,
1111                },
1112                UseStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
1113                    Poll::Ready(Some(Ok(update))) => {
1114                        if update.view != this.view {
1115                            continue;
1116                        }
1117
1118                        if !this.key_filter.matches(&update.key) {
1119                            continue;
1120                        }
1121
1122                        // Filter out deletes - UseStream only emits actual entity data
1123                        match update.operation {
1124                            Operation::Delete => {
1125                                // Skip deletes entirely
1126                                continue;
1127                            }
1128                            Operation::Upsert
1129                            | Operation::Create
1130                            | Operation::Snapshot
1131                            | Operation::Patch => {
1132                                if let Some(data) = update.data {
1133                                    match serde_json::from_value::<T>(data) {
1134                                        Ok(typed) => {
1135                                            return Poll::Ready(Some(typed));
1136                                        }
1137                                        Err(e) => {
1138                                            tracing::warn!(
1139                                                key = %update.key,
1140                                                error = %e,
1141                                                "UseStream: failed to deserialize entity, skipping"
1142                                            );
1143                                            continue;
1144                                        }
1145                                    }
1146                                }
1147                            }
1148                            Operation::Subscribed => {
1149                                continue;
1150                            }
1151                        }
1152                    }
1153                    Poll::Ready(Some(Err(_lagged))) => {
1154                        tracing::warn!("UseStream lagged behind, some messages were dropped");
1155                        continue;
1156                    }
1157                    Poll::Ready(None) => {
1158                        return Poll::Ready(None);
1159                    }
1160                    Poll::Pending => {
1161                        return Poll::Pending;
1162                    }
1163                },
1164                UseStreamState::Invalid => {
1165                    panic!("UseStream in invalid state");
1166                }
1167                UseStreamState::_Phantom(_) => unreachable!(),
1168            }
1169        }
1170    }
1171}