hyperstack_sdk/
stream.rs

1use crate::connection::ConnectionManager;
2use crate::frame::Operation;
3use crate::store::{SharedStore, StoreUpdate};
4use futures_util::Stream;
5use pin_project_lite::pin_project;
6use serde::de::DeserializeOwned;
7use std::collections::HashSet;
8use std::future::Future;
9use std::marker::PhantomData;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use tokio::sync::broadcast;
13use tokio_stream::wrappers::BroadcastStream;
14
15#[derive(Debug, Clone)]
16pub enum Update<T> {
17    Upsert { key: String, data: T },
18    Patch { key: String, data: T },
19    Delete { key: String },
20}
21
22#[derive(Debug, Clone)]
23pub enum RichUpdate<T> {
24    Created {
25        key: String,
26        data: T,
27    },
28    Updated {
29        key: String,
30        before: T,
31        after: T,
32        patch: Option<serde_json::Value>,
33    },
34    Deleted {
35        key: String,
36        last_known: Option<T>,
37    },
38}
39
40impl<T> Update<T> {
41    pub fn key(&self) -> &str {
42        match self {
43            Update::Upsert { key, .. } => key,
44            Update::Patch { key, .. } => key,
45            Update::Delete { key } => key,
46        }
47    }
48
49    pub fn data(&self) -> Option<&T> {
50        match self {
51            Update::Upsert { data, .. } => Some(data),
52            Update::Patch { data, .. } => Some(data),
53            Update::Delete { .. } => None,
54        }
55    }
56
57    pub fn is_delete(&self) -> bool {
58        matches!(self, Update::Delete { .. })
59    }
60
61    pub fn into_data(self) -> Option<T> {
62        match self {
63            Update::Upsert { data, .. } => Some(data),
64            Update::Patch { data, .. } => Some(data),
65            Update::Delete { .. } => None,
66        }
67    }
68
69    pub fn has_data(&self) -> bool {
70        matches!(self, Update::Upsert { .. } | Update::Patch { .. })
71    }
72
73    pub fn into_key(self) -> String {
74        match self {
75            Update::Upsert { key, .. } => key,
76            Update::Patch { key, .. } => key,
77            Update::Delete { key } => key,
78        }
79    }
80
81    pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> Update<U> {
82        match self {
83            Update::Upsert { key, data } => Update::Upsert { key, data: f(data) },
84            Update::Patch { key, data } => Update::Patch { key, data: f(data) },
85            Update::Delete { key } => Update::Delete { key },
86        }
87    }
88}
89
90impl<T> RichUpdate<T> {
91    pub fn key(&self) -> &str {
92        match self {
93            RichUpdate::Created { key, .. } => key,
94            RichUpdate::Updated { key, .. } => key,
95            RichUpdate::Deleted { key, .. } => key,
96        }
97    }
98
99    pub fn data(&self) -> Option<&T> {
100        match self {
101            RichUpdate::Created { data, .. } => Some(data),
102            RichUpdate::Updated { after, .. } => Some(after),
103            RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
104        }
105    }
106
107    pub fn before(&self) -> Option<&T> {
108        match self {
109            RichUpdate::Created { .. } => None,
110            RichUpdate::Updated { before, .. } => Some(before),
111            RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
112        }
113    }
114
115    pub fn into_data(self) -> Option<T> {
116        match self {
117            RichUpdate::Created { data, .. } => Some(data),
118            RichUpdate::Updated { after, .. } => Some(after),
119            RichUpdate::Deleted { last_known, .. } => last_known,
120        }
121    }
122
123    pub fn is_created(&self) -> bool {
124        matches!(self, RichUpdate::Created { .. })
125    }
126
127    pub fn is_updated(&self) -> bool {
128        matches!(self, RichUpdate::Updated { .. })
129    }
130
131    pub fn is_deleted(&self) -> bool {
132        matches!(self, RichUpdate::Deleted { .. })
133    }
134
135    pub fn patch(&self) -> Option<&serde_json::Value> {
136        match self {
137            RichUpdate::Updated { patch, .. } => patch.as_ref(),
138            _ => None,
139        }
140    }
141
142    pub fn has_patch_field(&self, field: &str) -> bool {
143        self.patch()
144            .and_then(|p| p.as_object())
145            .map(|obj| obj.contains_key(field))
146            .unwrap_or(false)
147    }
148}
149
150#[derive(Clone)]
151pub enum KeyFilter {
152    None,
153    Single(String),
154    Multiple(HashSet<String>),
155}
156
157impl KeyFilter {
158    fn matches(&self, key: &str) -> bool {
159        match self {
160            KeyFilter::None => true,
161            KeyFilter::Single(k) => k == key,
162            KeyFilter::Multiple(keys) => keys.contains(key),
163        }
164    }
165}
166
167pub struct EntityStream<T> {
168    state: EntityStreamState<T>,
169    view: String,
170    key_filter: KeyFilter,
171    _marker: PhantomData<T>,
172}
173
174enum EntityStreamState<T> {
175    Lazy {
176        connection: ConnectionManager,
177        store: SharedStore,
178        subscription_view: String,
179        subscription_key: Option<String>,
180        take: Option<u32>,
181        skip: Option<u32>,
182    },
183    Active {
184        inner: BroadcastStream<StoreUpdate>,
185    },
186    Subscribing {
187        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
188        inner: BroadcastStream<StoreUpdate>,
189    },
190    Invalid,
191    _Phantom(PhantomData<T>),
192}
193
194impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
195    pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
196        Self {
197            state: EntityStreamState::Active {
198                inner: BroadcastStream::new(rx),
199            },
200            view,
201            key_filter: KeyFilter::None,
202            _marker: PhantomData,
203        }
204    }
205
206    pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
207        Self {
208            state: EntityStreamState::Active {
209                inner: BroadcastStream::new(rx),
210            },
211            view,
212            key_filter: KeyFilter::Single(key),
213            _marker: PhantomData,
214        }
215    }
216
217    pub fn new_multi_filtered(
218        rx: broadcast::Receiver<StoreUpdate>,
219        view: String,
220        keys: HashSet<String>,
221    ) -> Self {
222        Self {
223            state: EntityStreamState::Active {
224                inner: BroadcastStream::new(rx),
225            },
226            view,
227            key_filter: KeyFilter::Multiple(keys),
228            _marker: PhantomData,
229        }
230    }
231
232    pub fn new_lazy(
233        connection: ConnectionManager,
234        store: SharedStore,
235        entity_name: String,
236        subscription_view: String,
237        key_filter: KeyFilter,
238        subscription_key: Option<String>,
239    ) -> Self {
240        Self::new_lazy_with_opts(
241            connection,
242            store,
243            entity_name,
244            subscription_view,
245            key_filter,
246            subscription_key,
247            None,
248            None,
249        )
250    }
251
252    #[allow(clippy::too_many_arguments)]
253    pub fn new_lazy_with_opts(
254        connection: ConnectionManager,
255        store: SharedStore,
256        entity_name: String,
257        subscription_view: String,
258        key_filter: KeyFilter,
259        subscription_key: Option<String>,
260        take: Option<u32>,
261        skip: Option<u32>,
262    ) -> Self {
263        Self {
264            state: EntityStreamState::Lazy {
265                connection,
266                store,
267                subscription_view,
268                subscription_key,
269                take,
270                skip,
271            },
272            view: entity_name,
273            key_filter,
274            _marker: PhantomData,
275        }
276    }
277
278    pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, Update<T>, F>
279    where
280        F: FnMut(&Update<T>) -> bool,
281    {
282        FilteredStream::new(self, predicate)
283    }
284
285    pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, Update<T>, U, F>
286    where
287        F: FnMut(Update<T>) -> Option<U>,
288    {
289        FilterMapStream::new(self, f)
290    }
291
292    pub fn map<U, F>(self, f: F) -> MapStream<Self, Update<T>, U, F>
293    where
294        F: FnMut(Update<T>) -> U,
295    {
296        MapStream::new(self, f)
297    }
298}
299
300impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for EntityStream<T> {
301    type Item = Update<T>;
302
303    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
304        let this = self.get_mut();
305
306        loop {
307            match &mut this.state {
308                EntityStreamState::Lazy { .. } => {
309                    let EntityStreamState::Lazy {
310                        connection,
311                        store,
312                        subscription_view,
313                        subscription_key,
314                        take,
315                        skip,
316                    } = std::mem::replace(&mut this.state, EntityStreamState::Invalid)
317                    else {
318                        unreachable!()
319                    };
320
321                    // Subscribe to broadcast BEFORE sending subscription to server
322                    // This ensures we don't miss any frames that arrive during setup
323                    let inner = BroadcastStream::new(store.subscribe());
324
325                    let conn = connection.clone();
326                    let view = subscription_view.clone();
327                    let key = subscription_key.clone();
328                    let fut = Box::pin(async move {
329                        conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip)
330                            .await;
331                    });
332
333                    this.state = EntityStreamState::Subscribing { fut, inner };
334                    continue;
335                }
336                EntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
337                    Poll::Ready(()) => {
338                        let EntityStreamState::Subscribing { inner, .. } =
339                            std::mem::replace(&mut this.state, EntityStreamState::Invalid)
340                        else {
341                            unreachable!()
342                        };
343                        this.state = EntityStreamState::Active { inner };
344                        continue;
345                    }
346                    Poll::Pending => return Poll::Pending,
347                },
348                EntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
349                    Poll::Ready(Some(Ok(update))) => {
350                        if update.view != this.view {
351                            continue;
352                        }
353
354                        if !this.key_filter.matches(&update.key) {
355                            continue;
356                        }
357
358                        match update.operation {
359                            Operation::Delete => {
360                                return Poll::Ready(Some(Update::Delete { key: update.key }));
361                            }
362                            Operation::Upsert | Operation::Create | Operation::Snapshot => {
363                                if let Some(data) = update.data {
364                                    if let Ok(typed) = serde_json::from_value::<T>(data) {
365                                        return Poll::Ready(Some(Update::Upsert {
366                                            key: update.key,
367                                            data: typed,
368                                        }));
369                                    }
370                                }
371                            }
372                            Operation::Patch => {
373                                if let Some(data) = update.data {
374                                    match serde_json::from_value::<T>(data) {
375                                        Ok(typed) => {
376                                            return Poll::Ready(Some(Update::Patch {
377                                                key: update.key,
378                                                data: typed,
379                                            }));
380                                        }
381                                        Err(e) => {
382                                            tracing::warn!(
383                                                key = %update.key,
384                                                error = %e,
385                                                "Patch failed to deserialize to full type, skipping"
386                                            );
387                                            continue;
388                                        }
389                                    }
390                                }
391                            }
392                            Operation::Subscribed => {
393                                continue;
394                            }
395                        }
396                    }
397                    Poll::Ready(Some(Err(_lagged))) => {
398                        tracing::warn!("EntityStream lagged behind, some messages were dropped");
399                        continue;
400                    }
401                    Poll::Ready(None) => {
402                        return Poll::Ready(None);
403                    }
404                    Poll::Pending => {
405                        return Poll::Pending;
406                    }
407                },
408                EntityStreamState::Invalid => {
409                    panic!("EntityStream in invalid state");
410                }
411                EntityStreamState::_Phantom(_) => unreachable!(),
412            }
413        }
414    }
415}
416
417pub struct RichEntityStream<T> {
418    state: RichEntityStreamState<T>,
419    view: String,
420    key_filter: KeyFilter,
421    _marker: PhantomData<T>,
422}
423
424enum RichEntityStreamState<T> {
425    Lazy {
426        connection: ConnectionManager,
427        store: SharedStore,
428        subscription_view: String,
429        subscription_key: Option<String>,
430        take: Option<u32>,
431        skip: Option<u32>,
432    },
433    Active {
434        inner: BroadcastStream<StoreUpdate>,
435    },
436    Subscribing {
437        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
438        inner: BroadcastStream<StoreUpdate>,
439    },
440    Invalid,
441    _Phantom(PhantomData<T>),
442}
443
444impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
445    pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
446        Self {
447            state: RichEntityStreamState::Active {
448                inner: BroadcastStream::new(rx),
449            },
450            view,
451            key_filter: KeyFilter::None,
452            _marker: PhantomData,
453        }
454    }
455
456    pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
457        Self {
458            state: RichEntityStreamState::Active {
459                inner: BroadcastStream::new(rx),
460            },
461            view,
462            key_filter: KeyFilter::Single(key),
463            _marker: PhantomData,
464        }
465    }
466
467    pub fn new_lazy(
468        connection: ConnectionManager,
469        store: SharedStore,
470        entity_name: String,
471        subscription_view: String,
472        key_filter: KeyFilter,
473        subscription_key: Option<String>,
474    ) -> Self {
475        Self::new_lazy_with_opts(
476            connection,
477            store,
478            entity_name,
479            subscription_view,
480            key_filter,
481            subscription_key,
482            None,
483            None,
484        )
485    }
486
487    #[allow(clippy::too_many_arguments)]
488    pub fn new_lazy_with_opts(
489        connection: ConnectionManager,
490        store: SharedStore,
491        entity_name: String,
492        subscription_view: String,
493        key_filter: KeyFilter,
494        subscription_key: Option<String>,
495        take: Option<u32>,
496        skip: Option<u32>,
497    ) -> Self {
498        Self {
499            state: RichEntityStreamState::Lazy {
500                connection,
501                store,
502                subscription_view,
503                subscription_key,
504                take,
505                skip,
506            },
507            view: entity_name,
508            key_filter,
509            _marker: PhantomData,
510        }
511    }
512}
513
514impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for RichEntityStream<T> {
515    type Item = RichUpdate<T>;
516
517    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
518        let this = self.get_mut();
519
520        loop {
521            match &mut this.state {
522                RichEntityStreamState::Lazy { .. } => {
523                    let RichEntityStreamState::Lazy {
524                        connection,
525                        store,
526                        subscription_view,
527                        subscription_key,
528                        take,
529                        skip,
530                    } = std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
531                    else {
532                        unreachable!()
533                    };
534
535                    // Subscribe to broadcast BEFORE sending subscription to server
536                    // This ensures we don't miss any frames that arrive during setup
537                    let inner = BroadcastStream::new(store.subscribe());
538
539                    let conn = connection.clone();
540                    let view = subscription_view.clone();
541                    let key = subscription_key.clone();
542                    let fut = Box::pin(async move {
543                        conn.ensure_subscription_with_opts(&view, key.as_deref(), take, skip)
544                            .await;
545                    });
546
547                    this.state = RichEntityStreamState::Subscribing { fut, inner };
548                    continue;
549                }
550                RichEntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
551                    Poll::Ready(()) => {
552                        let RichEntityStreamState::Subscribing { inner, .. } =
553                            std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
554                        else {
555                            unreachable!()
556                        };
557                        this.state = RichEntityStreamState::Active { inner };
558                        continue;
559                    }
560                    Poll::Pending => return Poll::Pending,
561                },
562                RichEntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
563                    Poll::Ready(Some(Ok(update))) => {
564                        if update.view != this.view {
565                            continue;
566                        }
567
568                        if !this.key_filter.matches(&update.key) {
569                            continue;
570                        }
571
572                        let previous: Option<T> =
573                            update.previous.and_then(|v| serde_json::from_value(v).ok());
574
575                        match update.operation {
576                            Operation::Delete => {
577                                return Poll::Ready(Some(RichUpdate::Deleted {
578                                    key: update.key,
579                                    last_known: previous,
580                                }));
581                            }
582                            Operation::Create | Operation::Snapshot => {
583                                if let Some(data) = update.data {
584                                    if let Ok(typed) = serde_json::from_value::<T>(data) {
585                                        return Poll::Ready(Some(RichUpdate::Created {
586                                            key: update.key,
587                                            data: typed,
588                                        }));
589                                    }
590                                }
591                            }
592                            Operation::Upsert | Operation::Patch => {
593                                if let Some(data) = update.data {
594                                    match serde_json::from_value::<T>(data.clone()) {
595                                        Ok(after) => {
596                                            if let Some(before) = previous {
597                                                return Poll::Ready(Some(RichUpdate::Updated {
598                                                    key: update.key,
599                                                    before,
600                                                    after,
601                                                    patch: update.patch,
602                                                }));
603                                            } else {
604                                                return Poll::Ready(Some(RichUpdate::Created {
605                                                    key: update.key,
606                                                    data: after,
607                                                }));
608                                            }
609                                        }
610                                        Err(e) => {
611                                            tracing::warn!(
612                                                key = %update.key,
613                                                error = %e,
614                                                "Update failed to deserialize, skipping"
615                                            );
616                                            continue;
617                                        }
618                                    }
619                                }
620                            }
621                            Operation::Subscribed => {
622                                continue;
623                            }
624                        }
625                    }
626                    Poll::Ready(Some(Err(_lagged))) => {
627                        tracing::warn!(
628                            "RichEntityStream lagged behind, some messages were dropped"
629                        );
630                        continue;
631                    }
632                    Poll::Ready(None) => {
633                        return Poll::Ready(None);
634                    }
635                    Poll::Pending => {
636                        return Poll::Pending;
637                    }
638                },
639                RichEntityStreamState::Invalid => {
640                    panic!("RichEntityStream in invalid state");
641                }
642                RichEntityStreamState::_Phantom(_) => unreachable!(),
643            }
644        }
645    }
646}
647
648impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
649    pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, RichUpdate<T>, F>
650    where
651        F: FnMut(&RichUpdate<T>) -> bool,
652    {
653        FilteredStream::new(self, predicate)
654    }
655
656    pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, RichUpdate<T>, U, F>
657    where
658        F: FnMut(RichUpdate<T>) -> Option<U>,
659    {
660        FilterMapStream::new(self, f)
661    }
662
663    pub fn map<U, F>(self, f: F) -> MapStream<Self, RichUpdate<T>, U, F>
664    where
665        F: FnMut(RichUpdate<T>) -> U,
666    {
667        MapStream::new(self, f)
668    }
669}
670
671pin_project! {
672    pub struct FilteredStream<S, I, F> {
673        #[pin]
674        inner: S,
675        predicate: F,
676        _item: PhantomData<I>,
677    }
678}
679
680impl<S, I, F> FilteredStream<S, I, F> {
681    pub fn new(inner: S, predicate: F) -> Self {
682        Self {
683            inner,
684            predicate,
685            _item: PhantomData,
686        }
687    }
688}
689
690impl<S, I, F> Stream for FilteredStream<S, I, F>
691where
692    S: Stream<Item = I>,
693    F: FnMut(&I) -> bool,
694{
695    type Item = I;
696
697    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
698        let mut this = self.project();
699        loop {
700            match this.inner.as_mut().poll_next(cx) {
701                Poll::Ready(Some(item)) => {
702                    if (this.predicate)(&item) {
703                        return Poll::Ready(Some(item));
704                    }
705                }
706                Poll::Ready(None) => return Poll::Ready(None),
707                Poll::Pending => return Poll::Pending,
708            }
709        }
710    }
711}
712
713impl<S, I, F> FilteredStream<S, I, F>
714where
715    S: Stream<Item = I>,
716    F: FnMut(&I) -> bool,
717{
718    pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, I, F2>
719    where
720        F2: FnMut(&I) -> bool,
721    {
722        FilteredStream::new(self, predicate)
723    }
724
725    pub fn filter_map<U, F2>(self, f: F2) -> FilterMapStream<Self, I, U, F2>
726    where
727        F2: FnMut(I) -> Option<U>,
728    {
729        FilterMapStream::new(self, f)
730    }
731
732    pub fn map<U, F2>(self, f: F2) -> MapStream<Self, I, U, F2>
733    where
734        F2: FnMut(I) -> U,
735    {
736        MapStream::new(self, f)
737    }
738}
739
740pin_project! {
741    pub struct FilterMapStream<S, I, U, F> {
742        #[pin]
743        inner: S,
744        f: F,
745        _item: PhantomData<(I, U)>,
746    }
747}
748
749impl<S, I, U, F> FilterMapStream<S, I, U, F> {
750    pub fn new(inner: S, f: F) -> Self {
751        Self {
752            inner,
753            f,
754            _item: PhantomData,
755        }
756    }
757}
758
759impl<S, I, U, F> Stream for FilterMapStream<S, I, U, F>
760where
761    S: Stream<Item = I>,
762    F: FnMut(I) -> Option<U>,
763{
764    type Item = U;
765
766    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
767        let mut this = self.project();
768        loop {
769            match this.inner.as_mut().poll_next(cx) {
770                Poll::Ready(Some(item)) => {
771                    if let Some(mapped) = (this.f)(item) {
772                        return Poll::Ready(Some(mapped));
773                    }
774                }
775                Poll::Ready(None) => return Poll::Ready(None),
776                Poll::Pending => return Poll::Pending,
777            }
778        }
779    }
780}
781
782impl<S, I, U, F> FilterMapStream<S, I, U, F>
783where
784    S: Stream<Item = I>,
785    F: FnMut(I) -> Option<U>,
786{
787    pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
788    where
789        F2: FnMut(&U) -> bool,
790    {
791        FilteredStream::new(self, predicate)
792    }
793
794    pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
795    where
796        F2: FnMut(U) -> Option<V>,
797    {
798        FilterMapStream::new(self, f)
799    }
800
801    pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
802    where
803        F2: FnMut(U) -> V,
804    {
805        MapStream::new(self, f)
806    }
807}
808
809pin_project! {
810    pub struct MapStream<S, I, U, F> {
811        #[pin]
812        inner: S,
813        f: F,
814        _item: PhantomData<(I, U)>,
815    }
816}
817
818impl<S, I, U, F> MapStream<S, I, U, F> {
819    pub fn new(inner: S, f: F) -> Self {
820        Self {
821            inner,
822            f,
823            _item: PhantomData,
824        }
825    }
826}
827
828impl<S, I, U, F> Stream for MapStream<S, I, U, F>
829where
830    S: Stream<Item = I>,
831    F: FnMut(I) -> U,
832{
833    type Item = U;
834
835    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
836        let this = self.project();
837        match this.inner.poll_next(cx) {
838            Poll::Ready(Some(item)) => Poll::Ready(Some((this.f)(item))),
839            Poll::Ready(None) => Poll::Ready(None),
840            Poll::Pending => Poll::Pending,
841        }
842    }
843}
844
845impl<S, I, U, F> MapStream<S, I, U, F>
846where
847    S: Stream<Item = I>,
848    F: FnMut(I) -> U,
849{
850    pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
851    where
852        F2: FnMut(&U) -> bool,
853    {
854        FilteredStream::new(self, predicate)
855    }
856
857    pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
858    where
859        F2: FnMut(U) -> Option<V>,
860    {
861        FilterMapStream::new(self, f)
862    }
863
864    pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
865    where
866        F2: FnMut(U) -> V,
867    {
868        MapStream::new(self, f)
869    }
870}