1use crate::connection::{ConnectionManager, SubscriptionOptions};
2use crate::frame::Operation;
3use crate::store::{SharedStore, StoreUpdate};
4use futures_util::Stream;
5use pin_project_lite::pin_project;
6use serde::de::DeserializeOwned;
7use std::collections::HashSet;
8use std::future::Future;
9use std::marker::PhantomData;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use tokio::sync::broadcast;
13use tokio_stream::wrappers::BroadcastStream;
14
/// A typed change notification yielded by [`EntityStream`].
///
/// `EntityStream` produces `Upsert` for upsert, create and snapshot operations,
/// `Patch` for patch operations and `Delete` for delete operations.
#[derive(Debug, Clone)]
pub enum Update<T> {
    /// The entity stored under `key` now has the value `data`.
    Upsert { key: String, data: T },
    /// The entity stored under `key` was patched; `data` is the payload
    /// deserialized as a full `T`.
    Patch { key: String, data: T },
    /// The entity stored under `key` was deleted; no payload is carried.
    Delete { key: String },
}
21
/// A typed change notification that also carries previous-state information,
/// yielded by [`RichEntityStream`].
#[derive(Debug, Clone)]
pub enum RichUpdate<T> {
    /// An entity appeared under `key`. `RichEntityStream` also reports this
    /// for an upsert/patch when no previous value could be decoded.
    Created {
        key: String,
        data: T,
    },
    /// An entity changed from `before` to `after`.
    Updated {
        key: String,
        /// Value before the change.
        before: T,
        /// Value after the change.
        after: T,
        /// Raw patch attached to the store update, if any.
        patch: Option<serde_json::Value>,
    },
    /// The entity under `key` was deleted.
    Deleted {
        key: String,
        /// Last value known before deletion, when one was available and
        /// could be decoded.
        last_known: Option<T>,
    },
}
39
40impl<T> Update<T> {
41 pub fn key(&self) -> &str {
42 match self {
43 Update::Upsert { key, .. } => key,
44 Update::Patch { key, .. } => key,
45 Update::Delete { key } => key,
46 }
47 }
48
49 pub fn data(&self) -> Option<&T> {
50 match self {
51 Update::Upsert { data, .. } => Some(data),
52 Update::Patch { data, .. } => Some(data),
53 Update::Delete { .. } => None,
54 }
55 }
56
57 pub fn is_delete(&self) -> bool {
58 matches!(self, Update::Delete { .. })
59 }
60
61 pub fn into_data(self) -> Option<T> {
62 match self {
63 Update::Upsert { data, .. } => Some(data),
64 Update::Patch { data, .. } => Some(data),
65 Update::Delete { .. } => None,
66 }
67 }
68
69 pub fn has_data(&self) -> bool {
70 matches!(self, Update::Upsert { .. } | Update::Patch { .. })
71 }
72
73 pub fn into_key(self) -> String {
74 match self {
75 Update::Upsert { key, .. } => key,
76 Update::Patch { key, .. } => key,
77 Update::Delete { key } => key,
78 }
79 }
80
81 pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> Update<U> {
82 match self {
83 Update::Upsert { key, data } => Update::Upsert { key, data: f(data) },
84 Update::Patch { key, data } => Update::Patch { key, data: f(data) },
85 Update::Delete { key } => Update::Delete { key },
86 }
87 }
88}
89
90impl<T> RichUpdate<T> {
91 pub fn key(&self) -> &str {
92 match self {
93 RichUpdate::Created { key, .. } => key,
94 RichUpdate::Updated { key, .. } => key,
95 RichUpdate::Deleted { key, .. } => key,
96 }
97 }
98
99 pub fn data(&self) -> Option<&T> {
100 match self {
101 RichUpdate::Created { data, .. } => Some(data),
102 RichUpdate::Updated { after, .. } => Some(after),
103 RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
104 }
105 }
106
107 pub fn before(&self) -> Option<&T> {
108 match self {
109 RichUpdate::Created { .. } => None,
110 RichUpdate::Updated { before, .. } => Some(before),
111 RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
112 }
113 }
114
115 pub fn into_data(self) -> Option<T> {
116 match self {
117 RichUpdate::Created { data, .. } => Some(data),
118 RichUpdate::Updated { after, .. } => Some(after),
119 RichUpdate::Deleted { last_known, .. } => last_known,
120 }
121 }
122
123 pub fn is_created(&self) -> bool {
124 matches!(self, RichUpdate::Created { .. })
125 }
126
127 pub fn is_updated(&self) -> bool {
128 matches!(self, RichUpdate::Updated { .. })
129 }
130
131 pub fn is_deleted(&self) -> bool {
132 matches!(self, RichUpdate::Deleted { .. })
133 }
134
135 pub fn patch(&self) -> Option<&serde_json::Value> {
136 match self {
137 RichUpdate::Updated { patch, .. } => patch.as_ref(),
138 _ => None,
139 }
140 }
141
142 pub fn has_patch_field(&self, field: &str) -> bool {
143 self.patch()
144 .and_then(|p| p.as_object())
145 .map(|obj| obj.contains_key(field))
146 .unwrap_or(false)
147 }
148}
149
/// Selects which entity keys a stream lets through.
#[derive(Clone)]
pub enum KeyFilter {
    /// No filtering: every key matches.
    None,
    /// Only the given key matches.
    Single(String),
    /// Any key contained in the set matches.
    Multiple(HashSet<String>),
}
156
157impl KeyFilter {
158 fn matches(&self, key: &str) -> bool {
159 match self {
160 KeyFilter::None => true,
161 KeyFilter::Single(k) => k == key,
162 KeyFilter::Multiple(keys) => keys.contains(key),
163 }
164 }
165}
166
/// Stream of typed [`Update`]s for one view, optionally restricted by key.
pub struct EntityStream<T> {
    // Current position in the lazy -> subscribing -> active state machine.
    state: EntityStreamState<T>,
    // Only store updates whose `view` equals this value are yielded.
    view: String,
    // Only store updates whose key passes this filter are yielded.
    key_filter: KeyFilter,
    // Ties the stream to its item type without storing a `T`.
    _marker: PhantomData<T>,
}
173
/// Internal state machine of [`EntityStream`].
enum EntityStreamState<T> {
    /// Nothing has been subscribed yet; holds everything needed to do so on
    /// the first poll.
    Lazy {
        connection: ConnectionManager,
        store: SharedStore,
        // View name and optional key passed to the connection when subscribing.
        subscription_view: String,
        subscription_key: Option<String>,
        // The remaining fields are forwarded unchanged into `SubscriptionOptions`.
        take: Option<u32>,
        skip: Option<u32>,
        with_snapshot: Option<bool>,
        after: Option<String>,
        snapshot_limit: Option<usize>,
    },
    /// Store updates are being consumed from `inner`.
    Active {
        inner: BroadcastStream<StoreUpdate>,
    },
    /// The subscription request `fut` is in flight; `inner` is already
    /// attached to the store and is consumed once `fut` completes.
    Subscribing {
        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
        inner: BroadcastStream<StoreUpdate>,
    },
    /// Transient placeholder used while moving data between states;
    /// polling in this state panics.
    Invalid,
    /// Never constructed in this file; exists so the enum uses `T`.
    _Phantom(PhantomData<T>),
}
196
197impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
198 pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
199 Self {
200 state: EntityStreamState::Active {
201 inner: BroadcastStream::new(rx),
202 },
203 view,
204 key_filter: KeyFilter::None,
205 _marker: PhantomData,
206 }
207 }
208
209 pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
210 Self {
211 state: EntityStreamState::Active {
212 inner: BroadcastStream::new(rx),
213 },
214 view,
215 key_filter: KeyFilter::Single(key),
216 _marker: PhantomData,
217 }
218 }
219
220 pub fn new_multi_filtered(
221 rx: broadcast::Receiver<StoreUpdate>,
222 view: String,
223 keys: HashSet<String>,
224 ) -> Self {
225 Self {
226 state: EntityStreamState::Active {
227 inner: BroadcastStream::new(rx),
228 },
229 view,
230 key_filter: KeyFilter::Multiple(keys),
231 _marker: PhantomData,
232 }
233 }
234
235 pub fn new_lazy(
236 connection: ConnectionManager,
237 store: SharedStore,
238 entity_name: String,
239 subscription_view: String,
240 key_filter: KeyFilter,
241 subscription_key: Option<String>,
242 ) -> Self {
243 Self::new_lazy_with_opts(
244 connection,
245 store,
246 entity_name,
247 subscription_view,
248 key_filter,
249 subscription_key,
250 None,
251 None,
252 None,
253 None,
254 None,
255 )
256 }
257
258 #[allow(clippy::too_many_arguments)]
259 pub fn new_lazy_with_opts(
260 connection: ConnectionManager,
261 store: SharedStore,
262 entity_name: String,
263 subscription_view: String,
264 key_filter: KeyFilter,
265 subscription_key: Option<String>,
266 take: Option<u32>,
267 skip: Option<u32>,
268 with_snapshot: Option<bool>,
269 after: Option<String>,
270 snapshot_limit: Option<usize>,
271 ) -> Self {
272 Self {
273 state: EntityStreamState::Lazy {
274 connection,
275 store,
276 subscription_view,
277 subscription_key,
278 take,
279 skip,
280 with_snapshot,
281 after,
282 snapshot_limit,
283 },
284 view: entity_name,
285 key_filter,
286 _marker: PhantomData,
287 }
288 }
289
290 pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, Update<T>, F>
291 where
292 F: FnMut(&Update<T>) -> bool,
293 {
294 FilteredStream::new(self, predicate)
295 }
296
297 pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, Update<T>, U, F>
298 where
299 F: FnMut(Update<T>) -> Option<U>,
300 {
301 FilterMapStream::new(self, f)
302 }
303
304 pub fn map<U, F>(self, f: F) -> MapStream<Self, Update<T>, U, F>
305 where
306 F: FnMut(Update<T>) -> U,
307 {
308 MapStream::new(self, f)
309 }
310}
311
312impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for EntityStream<T> {
313 type Item = Update<T>;
314
315 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316 let this = self.get_mut();
317
318 loop {
319 match &mut this.state {
320 EntityStreamState::Lazy { .. } => {
321 let EntityStreamState::Lazy {
322 connection,
323 store,
324 subscription_view,
325 subscription_key,
326 take,
327 skip,
328 with_snapshot,
329 after,
330 snapshot_limit,
331 } = std::mem::replace(&mut this.state, EntityStreamState::Invalid)
332 else {
333 unreachable!()
334 };
335
336 let inner = BroadcastStream::new(store.subscribe());
339
340 let conn = connection.clone();
341 let view = subscription_view.clone();
342 let key = subscription_key.clone();
343 let fut = Box::pin(async move {
344 let opts = SubscriptionOptions {
345 take,
346 skip,
347 with_snapshot,
348 after,
349 snapshot_limit,
350 };
351 conn.ensure_subscription_with_opts(&view, key.as_deref(), opts)
352 .await;
353 });
354
355 this.state = EntityStreamState::Subscribing { fut, inner };
356 continue;
357 }
358 EntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
359 Poll::Ready(()) => {
360 let EntityStreamState::Subscribing { inner, .. } =
361 std::mem::replace(&mut this.state, EntityStreamState::Invalid)
362 else {
363 unreachable!()
364 };
365 this.state = EntityStreamState::Active { inner };
366 continue;
367 }
368 Poll::Pending => return Poll::Pending,
369 },
370 EntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
371 Poll::Ready(Some(Ok(update))) => {
372 if update.view != this.view {
373 continue;
374 }
375
376 if !this.key_filter.matches(&update.key) {
377 continue;
378 }
379
380 match update.operation {
381 Operation::Delete => {
382 return Poll::Ready(Some(Update::Delete { key: update.key }));
383 }
384 Operation::Upsert | Operation::Create | Operation::Snapshot => {
385 if let Some(data) = update.data {
386 if let Ok(typed) = serde_json::from_value::<T>(data) {
387 return Poll::Ready(Some(Update::Upsert {
388 key: update.key,
389 data: typed,
390 }));
391 }
392 }
393 }
394 Operation::Patch => {
395 if let Some(data) = update.data {
396 match serde_json::from_value::<T>(data) {
397 Ok(typed) => {
398 return Poll::Ready(Some(Update::Patch {
399 key: update.key,
400 data: typed,
401 }));
402 }
403 Err(e) => {
404 tracing::warn!(
405 key = %update.key,
406 error = %e,
407 "Patch failed to deserialize to full type, skipping"
408 );
409 continue;
410 }
411 }
412 }
413 }
414 Operation::Subscribed => {
415 continue;
416 }
417 }
418 }
419 Poll::Ready(Some(Err(_lagged))) => {
420 tracing::warn!("EntityStream lagged behind, some messages were dropped");
421 continue;
422 }
423 Poll::Ready(None) => {
424 return Poll::Ready(None);
425 }
426 Poll::Pending => {
427 return Poll::Pending;
428 }
429 },
430 EntityStreamState::Invalid => {
431 panic!("EntityStream in invalid state");
432 }
433 EntityStreamState::_Phantom(_) => unreachable!(),
434 }
435 }
436 }
437}
438
/// Stream of typed [`RichUpdate`]s for one view, optionally restricted by key.
pub struct RichEntityStream<T> {
    // Current position in the lazy -> subscribing -> active state machine.
    state: RichEntityStreamState<T>,
    // Only store updates whose `view` equals this value are yielded.
    view: String,
    // Only store updates whose key passes this filter are yielded.
    key_filter: KeyFilter,
    // Ties the stream to its item type without storing a `T`.
    _marker: PhantomData<T>,
}
445
/// Internal state machine of [`RichEntityStream`].
enum RichEntityStreamState<T> {
    /// Nothing has been subscribed yet; holds everything needed to do so on
    /// the first poll.
    Lazy {
        connection: ConnectionManager,
        store: SharedStore,
        // View name and optional key passed to the connection when subscribing.
        subscription_view: String,
        subscription_key: Option<String>,
        // The remaining fields are forwarded unchanged into `SubscriptionOptions`.
        take: Option<u32>,
        skip: Option<u32>,
        with_snapshot: Option<bool>,
        after: Option<String>,
        snapshot_limit: Option<usize>,
    },
    /// Store updates are being consumed from `inner`.
    Active {
        inner: BroadcastStream<StoreUpdate>,
    },
    /// The subscription request `fut` is in flight; `inner` is already
    /// attached to the store and is consumed once `fut` completes.
    Subscribing {
        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
        inner: BroadcastStream<StoreUpdate>,
    },
    /// Transient placeholder used while moving data between states;
    /// polling in this state panics.
    Invalid,
    /// Never constructed in this file; exists so the enum uses `T`.
    _Phantom(PhantomData<T>),
}
468
469impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
470 pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
471 Self {
472 state: RichEntityStreamState::Active {
473 inner: BroadcastStream::new(rx),
474 },
475 view,
476 key_filter: KeyFilter::None,
477 _marker: PhantomData,
478 }
479 }
480
481 pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
482 Self {
483 state: RichEntityStreamState::Active {
484 inner: BroadcastStream::new(rx),
485 },
486 view,
487 key_filter: KeyFilter::Single(key),
488 _marker: PhantomData,
489 }
490 }
491
492 pub fn new_lazy(
493 connection: ConnectionManager,
494 store: SharedStore,
495 entity_name: String,
496 subscription_view: String,
497 key_filter: KeyFilter,
498 subscription_key: Option<String>,
499 ) -> Self {
500 Self::new_lazy_with_opts(
501 connection,
502 store,
503 entity_name,
504 subscription_view,
505 key_filter,
506 subscription_key,
507 None,
508 None,
509 None,
510 None,
511 None,
512 )
513 }
514
515 #[allow(clippy::too_many_arguments)]
516 pub fn new_lazy_with_opts(
517 connection: ConnectionManager,
518 store: SharedStore,
519 entity_name: String,
520 subscription_view: String,
521 key_filter: KeyFilter,
522 subscription_key: Option<String>,
523 take: Option<u32>,
524 skip: Option<u32>,
525 with_snapshot: Option<bool>,
526 after: Option<String>,
527 snapshot_limit: Option<usize>,
528 ) -> Self {
529 Self {
530 state: RichEntityStreamState::Lazy {
531 connection,
532 store,
533 subscription_view,
534 subscription_key,
535 take,
536 skip,
537 with_snapshot,
538 after,
539 snapshot_limit,
540 },
541 view: entity_name,
542 key_filter,
543 _marker: PhantomData,
544 }
545 }
546}
547
548impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for RichEntityStream<T> {
549 type Item = RichUpdate<T>;
550
551 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
552 let this = self.get_mut();
553
554 loop {
555 match &mut this.state {
556 RichEntityStreamState::Lazy { .. } => {
557 let RichEntityStreamState::Lazy {
558 connection,
559 store,
560 subscription_view,
561 subscription_key,
562 take,
563 skip,
564 with_snapshot,
565 after,
566 snapshot_limit,
567 } = std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
568 else {
569 unreachable!()
570 };
571
572 let inner = BroadcastStream::new(store.subscribe());
575
576 let conn = connection.clone();
577 let view = subscription_view.clone();
578 let key = subscription_key.clone();
579 let fut = Box::pin(async move {
580 let opts = SubscriptionOptions {
581 take,
582 skip,
583 with_snapshot,
584 after,
585 snapshot_limit,
586 };
587 conn.ensure_subscription_with_opts(&view, key.as_deref(), opts)
588 .await;
589 });
590
591 this.state = RichEntityStreamState::Subscribing { fut, inner };
592 continue;
593 }
594 RichEntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
595 Poll::Ready(()) => {
596 let RichEntityStreamState::Subscribing { inner, .. } =
597 std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
598 else {
599 unreachable!()
600 };
601 this.state = RichEntityStreamState::Active { inner };
602 continue;
603 }
604 Poll::Pending => return Poll::Pending,
605 },
606 RichEntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
607 Poll::Ready(Some(Ok(update))) => {
608 if update.view != this.view {
609 continue;
610 }
611
612 if !this.key_filter.matches(&update.key) {
613 continue;
614 }
615
616 let previous: Option<T> =
617 update.previous.and_then(|v| serde_json::from_value(v).ok());
618
619 match update.operation {
620 Operation::Delete => {
621 return Poll::Ready(Some(RichUpdate::Deleted {
622 key: update.key,
623 last_known: previous,
624 }));
625 }
626 Operation::Create | Operation::Snapshot => {
627 if let Some(data) = update.data {
628 if let Ok(typed) = serde_json::from_value::<T>(data) {
629 return Poll::Ready(Some(RichUpdate::Created {
630 key: update.key,
631 data: typed,
632 }));
633 }
634 }
635 }
636 Operation::Upsert | Operation::Patch => {
637 if let Some(data) = update.data {
638 match serde_json::from_value::<T>(data.clone()) {
639 Ok(after) => {
640 if let Some(before) = previous {
641 return Poll::Ready(Some(RichUpdate::Updated {
642 key: update.key,
643 before,
644 after,
645 patch: update.patch,
646 }));
647 } else {
648 return Poll::Ready(Some(RichUpdate::Created {
649 key: update.key,
650 data: after,
651 }));
652 }
653 }
654 Err(e) => {
655 tracing::warn!(
656 key = %update.key,
657 error = %e,
658 "Update failed to deserialize, skipping"
659 );
660 continue;
661 }
662 }
663 }
664 }
665 Operation::Subscribed => {
666 continue;
667 }
668 }
669 }
670 Poll::Ready(Some(Err(_lagged))) => {
671 tracing::warn!(
672 "RichEntityStream lagged behind, some messages were dropped"
673 );
674 continue;
675 }
676 Poll::Ready(None) => {
677 return Poll::Ready(None);
678 }
679 Poll::Pending => {
680 return Poll::Pending;
681 }
682 },
683 RichEntityStreamState::Invalid => {
684 panic!("RichEntityStream in invalid state");
685 }
686 RichEntityStreamState::_Phantom(_) => unreachable!(),
687 }
688 }
689 }
690}
691
692impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
693 pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, RichUpdate<T>, F>
694 where
695 F: FnMut(&RichUpdate<T>) -> bool,
696 {
697 FilteredStream::new(self, predicate)
698 }
699
700 pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, RichUpdate<T>, U, F>
701 where
702 F: FnMut(RichUpdate<T>) -> Option<U>,
703 {
704 FilterMapStream::new(self, f)
705 }
706
707 pub fn map<U, F>(self, f: F) -> MapStream<Self, RichUpdate<T>, U, F>
708 where
709 F: FnMut(RichUpdate<T>) -> U,
710 {
711 MapStream::new(self, f)
712 }
713}
714
pin_project! {
    /// Stream adaptor that yields only the items of `inner` accepted by `predicate`.
    pub struct FilteredStream<S, I, F> {
        // Wrapped stream; structurally pinned so it can be polled in place.
        #[pin]
        inner: S,
        // Called with a reference to each item; `true` keeps the item.
        predicate: F,
        // Records the item type `I` without storing a value of it.
        _item: PhantomData<I>,
    }
}
723
724impl<S, I, F> FilteredStream<S, I, F> {
725 pub fn new(inner: S, predicate: F) -> Self {
726 Self {
727 inner,
728 predicate,
729 _item: PhantomData,
730 }
731 }
732}
733
734impl<S, I, F> Stream for FilteredStream<S, I, F>
735where
736 S: Stream<Item = I>,
737 F: FnMut(&I) -> bool,
738{
739 type Item = I;
740
741 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
742 let mut this = self.project();
743 loop {
744 match this.inner.as_mut().poll_next(cx) {
745 Poll::Ready(Some(item)) => {
746 if (this.predicate)(&item) {
747 return Poll::Ready(Some(item));
748 }
749 }
750 Poll::Ready(None) => return Poll::Ready(None),
751 Poll::Pending => return Poll::Pending,
752 }
753 }
754 }
755}
756
757impl<S, I, F> FilteredStream<S, I, F>
758where
759 S: Stream<Item = I>,
760 F: FnMut(&I) -> bool,
761{
762 pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, I, F2>
763 where
764 F2: FnMut(&I) -> bool,
765 {
766 FilteredStream::new(self, predicate)
767 }
768
769 pub fn filter_map<U, F2>(self, f: F2) -> FilterMapStream<Self, I, U, F2>
770 where
771 F2: FnMut(I) -> Option<U>,
772 {
773 FilterMapStream::new(self, f)
774 }
775
776 pub fn map<U, F2>(self, f: F2) -> MapStream<Self, I, U, F2>
777 where
778 F2: FnMut(I) -> U,
779 {
780 MapStream::new(self, f)
781 }
782}
783
pin_project! {
    /// Stream adaptor that maps each item of `inner` through `f` and yields
    /// only the `Some` results.
    pub struct FilterMapStream<S, I, U, F> {
        // Wrapped stream; structurally pinned so it can be polled in place.
        #[pin]
        inner: S,
        // Called with each item by value; `None` drops the item.
        f: F,
        // Records the input type `I` and output type `U` without storing values.
        _item: PhantomData<(I, U)>,
    }
}
792
793impl<S, I, U, F> FilterMapStream<S, I, U, F> {
794 pub fn new(inner: S, f: F) -> Self {
795 Self {
796 inner,
797 f,
798 _item: PhantomData,
799 }
800 }
801}
802
803impl<S, I, U, F> Stream for FilterMapStream<S, I, U, F>
804where
805 S: Stream<Item = I>,
806 F: FnMut(I) -> Option<U>,
807{
808 type Item = U;
809
810 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
811 let mut this = self.project();
812 loop {
813 match this.inner.as_mut().poll_next(cx) {
814 Poll::Ready(Some(item)) => {
815 if let Some(mapped) = (this.f)(item) {
816 return Poll::Ready(Some(mapped));
817 }
818 }
819 Poll::Ready(None) => return Poll::Ready(None),
820 Poll::Pending => return Poll::Pending,
821 }
822 }
823 }
824}
825
826impl<S, I, U, F> FilterMapStream<S, I, U, F>
827where
828 S: Stream<Item = I>,
829 F: FnMut(I) -> Option<U>,
830{
831 pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
832 where
833 F2: FnMut(&U) -> bool,
834 {
835 FilteredStream::new(self, predicate)
836 }
837
838 pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
839 where
840 F2: FnMut(U) -> Option<V>,
841 {
842 FilterMapStream::new(self, f)
843 }
844
845 pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
846 where
847 F2: FnMut(U) -> V,
848 {
849 MapStream::new(self, f)
850 }
851}
852
pin_project! {
    /// Stream adaptor that yields every item of `inner` transformed by `f`.
    pub struct MapStream<S, I, U, F> {
        // Wrapped stream; structurally pinned so it can be polled in place.
        #[pin]
        inner: S,
        // Called with each item by value to produce the output item.
        f: F,
        // Records the input type `I` and output type `U` without storing values.
        _item: PhantomData<(I, U)>,
    }
}
861
862impl<S, I, U, F> MapStream<S, I, U, F> {
863 pub fn new(inner: S, f: F) -> Self {
864 Self {
865 inner,
866 f,
867 _item: PhantomData,
868 }
869 }
870}
871
872impl<S, I, U, F> Stream for MapStream<S, I, U, F>
873where
874 S: Stream<Item = I>,
875 F: FnMut(I) -> U,
876{
877 type Item = U;
878
879 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
880 let this = self.project();
881 match this.inner.poll_next(cx) {
882 Poll::Ready(Some(item)) => Poll::Ready(Some((this.f)(item))),
883 Poll::Ready(None) => Poll::Ready(None),
884 Poll::Pending => Poll::Pending,
885 }
886 }
887}
888
889impl<S, I, U, F> MapStream<S, I, U, F>
890where
891 S: Stream<Item = I>,
892 F: FnMut(I) -> U,
893{
894 pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
895 where
896 F2: FnMut(&U) -> bool,
897 {
898 FilteredStream::new(self, predicate)
899 }
900
901 pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
902 where
903 F2: FnMut(U) -> Option<V>,
904 {
905 FilterMapStream::new(self, f)
906 }
907
908 pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
909 where
910 F2: FnMut(U) -> V,
911 {
912 MapStream::new(self, f)
913 }
914}
915
/// Stream of plain `T` values for one view, optionally restricted by key.
///
/// Unlike [`EntityStream`], items are the deserialized entities themselves;
/// delete operations produce no item.
pub struct UseStream<T> {
    // Current position in the lazy -> subscribing -> active state machine.
    state: UseStreamState<T>,
    // Only store updates whose `view` equals this value are yielded.
    view: String,
    // Only store updates whose key passes this filter are yielded.
    key_filter: KeyFilter,
    // Ties the stream to its item type without storing a `T`.
    _marker: PhantomData<T>,
}
928
/// Internal state machine of [`UseStream`].
enum UseStreamState<T> {
    /// Nothing has been subscribed yet; holds everything needed to do so on
    /// the first poll.
    Lazy {
        connection: ConnectionManager,
        store: SharedStore,
        // View name and optional key passed to the connection when subscribing.
        subscription_view: String,
        subscription_key: Option<String>,
        // The remaining fields are forwarded unchanged into `SubscriptionOptions`.
        take: Option<u32>,
        skip: Option<u32>,
        with_snapshot: Option<bool>,
        after: Option<String>,
        snapshot_limit: Option<usize>,
    },
    /// Store updates are being consumed from `inner`.
    Active {
        inner: BroadcastStream<StoreUpdate>,
    },
    /// The subscription request `fut` is in flight; `inner` is already
    /// attached to the store and is consumed once `fut` completes.
    Subscribing {
        fut: Pin<Box<dyn Future<Output = ()> + Send>>,
        inner: BroadcastStream<StoreUpdate>,
    },
    /// Transient placeholder used while moving data between states;
    /// polling in this state panics.
    Invalid,
    /// Never constructed in this file; exists so the enum uses `T`.
    _Phantom(PhantomData<T>),
}
951
952impl<T: DeserializeOwned + Clone + Send + 'static> UseStream<T> {
953 pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
954 Self {
955 state: UseStreamState::Active {
956 inner: BroadcastStream::new(rx),
957 },
958 view,
959 key_filter: KeyFilter::None,
960 _marker: PhantomData,
961 }
962 }
963
964 pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
965 Self {
966 state: UseStreamState::Active {
967 inner: BroadcastStream::new(rx),
968 },
969 view,
970 key_filter: KeyFilter::Single(key),
971 _marker: PhantomData,
972 }
973 }
974
975 pub fn new_lazy(
976 connection: ConnectionManager,
977 store: SharedStore,
978 entity_name: String,
979 subscription_view: String,
980 key_filter: KeyFilter,
981 subscription_key: Option<String>,
982 ) -> Self {
983 Self::new_lazy_with_opts(
984 connection,
985 store,
986 entity_name,
987 subscription_view,
988 key_filter,
989 subscription_key,
990 None,
991 None,
992 None,
993 None,
994 None,
995 )
996 }
997
998 #[allow(clippy::too_many_arguments)]
999 pub fn new_lazy_with_opts(
1000 connection: ConnectionManager,
1001 store: SharedStore,
1002 entity_name: String,
1003 subscription_view: String,
1004 key_filter: KeyFilter,
1005 subscription_key: Option<String>,
1006 take: Option<u32>,
1007 skip: Option<u32>,
1008 with_snapshot: Option<bool>,
1009 after: Option<String>,
1010 snapshot_limit: Option<usize>,
1011 ) -> Self {
1012 Self {
1013 state: UseStreamState::Lazy {
1014 connection,
1015 store,
1016 subscription_view,
1017 subscription_key,
1018 take,
1019 skip,
1020 with_snapshot,
1021 after,
1022 snapshot_limit,
1023 },
1024 view: entity_name,
1025 key_filter,
1026 _marker: PhantomData,
1027 }
1028 }
1029
1030 pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, T, F>
1032 where
1033 F: FnMut(&T) -> bool,
1034 {
1035 FilteredStream::new(self, predicate)
1036 }
1037
1038 pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, T, U, F>
1040 where
1041 F: FnMut(T) -> Option<U>,
1042 {
1043 FilterMapStream::new(self, f)
1044 }
1045
1046 pub fn map<U, F>(self, f: F) -> MapStream<Self, T, U, F>
1048 where
1049 F: FnMut(T) -> U,
1050 {
1051 MapStream::new(self, f)
1052 }
1053}
1054
1055impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for UseStream<T> {
1056 type Item = T;
1057
1058 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1059 let this = self.get_mut();
1060
1061 loop {
1062 match &mut this.state {
1063 UseStreamState::Lazy { .. } => {
1064 let UseStreamState::Lazy {
1065 connection,
1066 store,
1067 subscription_view,
1068 subscription_key,
1069 take,
1070 skip,
1071 with_snapshot,
1072 after,
1073 snapshot_limit,
1074 } = std::mem::replace(&mut this.state, UseStreamState::Invalid)
1075 else {
1076 unreachable!()
1077 };
1078
1079 let inner = BroadcastStream::new(store.subscribe());
1081
1082 let conn = connection.clone();
1083 let view = subscription_view.clone();
1084 let key = subscription_key.clone();
1085 let fut = Box::pin(async move {
1086 let opts = SubscriptionOptions {
1087 take,
1088 skip,
1089 with_snapshot,
1090 after,
1091 snapshot_limit,
1092 };
1093 conn.ensure_subscription_with_opts(&view, key.as_deref(), opts)
1094 .await;
1095 });
1096
1097 this.state = UseStreamState::Subscribing { fut, inner };
1098 continue;
1099 }
1100 UseStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
1101 Poll::Ready(()) => {
1102 let UseStreamState::Subscribing { inner, .. } =
1103 std::mem::replace(&mut this.state, UseStreamState::Invalid)
1104 else {
1105 unreachable!()
1106 };
1107 this.state = UseStreamState::Active { inner };
1108 continue;
1109 }
1110 Poll::Pending => return Poll::Pending,
1111 },
1112 UseStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
1113 Poll::Ready(Some(Ok(update))) => {
1114 if update.view != this.view {
1115 continue;
1116 }
1117
1118 if !this.key_filter.matches(&update.key) {
1119 continue;
1120 }
1121
1122 match update.operation {
1124 Operation::Delete => {
1125 continue;
1127 }
1128 Operation::Upsert
1129 | Operation::Create
1130 | Operation::Snapshot
1131 | Operation::Patch => {
1132 if let Some(data) = update.data {
1133 match serde_json::from_value::<T>(data) {
1134 Ok(typed) => {
1135 return Poll::Ready(Some(typed));
1136 }
1137 Err(e) => {
1138 tracing::warn!(
1139 key = %update.key,
1140 error = %e,
1141 "UseStream: failed to deserialize entity, skipping"
1142 );
1143 continue;
1144 }
1145 }
1146 }
1147 }
1148 Operation::Subscribed => {
1149 continue;
1150 }
1151 }
1152 }
1153 Poll::Ready(Some(Err(_lagged))) => {
1154 tracing::warn!("UseStream lagged behind, some messages were dropped");
1155 continue;
1156 }
1157 Poll::Ready(None) => {
1158 return Poll::Ready(None);
1159 }
1160 Poll::Pending => {
1161 return Poll::Pending;
1162 }
1163 },
1164 UseStreamState::Invalid => {
1165 panic!("UseStream in invalid state");
1166 }
1167 UseStreamState::_Phantom(_) => unreachable!(),
1168 }
1169 }
1170 }
1171}