1use crate::connection::ConnectionManager;
2use crate::frame::Operation;
3use crate::store::{SharedStore, StoreUpdate};
4use futures_util::Stream;
5use pin_project_lite::pin_project;
6use serde::de::DeserializeOwned;
7use std::collections::HashSet;
8use std::future::Future;
9use std::marker::PhantomData;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use tokio::sync::broadcast;
13use tokio_stream::wrappers::BroadcastStream;
14
/// A typed change notification for a single entity.
///
/// Every variant carries the `key` of the entity it refers to; the variants
/// that describe a write also carry the payload deserialized into `T`.
#[derive(Clone, Debug)]
pub enum Update<T> {
    /// A write that carries the entity as `data`.
    Upsert {
        /// Key of the affected entity.
        key: String,
        /// Payload deserialized into `T`.
        data: T,
    },
    /// A patch whose payload was deserialized into `T`.
    Patch {
        /// Key of the affected entity.
        key: String,
        /// Payload deserialized into `T`.
        data: T,
    },
    /// The entity stored under `key` was removed.
    Delete {
        /// Key of the removed entity.
        key: String,
    },
}
21
22#[derive(Debug, Clone)]
23pub enum RichUpdate<T> {
24 Created {
25 key: String,
26 data: T,
27 },
28 Updated {
29 key: String,
30 before: T,
31 after: T,
32 patch: Option<serde_json::Value>,
33 },
34 Deleted {
35 key: String,
36 last_known: Option<T>,
37 },
38}
39
40impl<T> Update<T> {
41 pub fn key(&self) -> &str {
42 match self {
43 Update::Upsert { key, .. } => key,
44 Update::Patch { key, .. } => key,
45 Update::Delete { key } => key,
46 }
47 }
48
49 pub fn data(&self) -> Option<&T> {
50 match self {
51 Update::Upsert { data, .. } => Some(data),
52 Update::Patch { data, .. } => Some(data),
53 Update::Delete { .. } => None,
54 }
55 }
56
57 pub fn is_delete(&self) -> bool {
58 matches!(self, Update::Delete { .. })
59 }
60
61 pub fn into_data(self) -> Option<T> {
62 match self {
63 Update::Upsert { data, .. } => Some(data),
64 Update::Patch { data, .. } => Some(data),
65 Update::Delete { .. } => None,
66 }
67 }
68
69 pub fn has_data(&self) -> bool {
70 matches!(self, Update::Upsert { .. } | Update::Patch { .. })
71 }
72
73 pub fn into_key(self) -> String {
74 match self {
75 Update::Upsert { key, .. } => key,
76 Update::Patch { key, .. } => key,
77 Update::Delete { key } => key,
78 }
79 }
80
81 pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> Update<U> {
82 match self {
83 Update::Upsert { key, data } => Update::Upsert { key, data: f(data) },
84 Update::Patch { key, data } => Update::Patch { key, data: f(data) },
85 Update::Delete { key } => Update::Delete { key },
86 }
87 }
88}
89
90impl<T> RichUpdate<T> {
91 pub fn key(&self) -> &str {
92 match self {
93 RichUpdate::Created { key, .. } => key,
94 RichUpdate::Updated { key, .. } => key,
95 RichUpdate::Deleted { key, .. } => key,
96 }
97 }
98
99 pub fn data(&self) -> Option<&T> {
100 match self {
101 RichUpdate::Created { data, .. } => Some(data),
102 RichUpdate::Updated { after, .. } => Some(after),
103 RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
104 }
105 }
106
107 pub fn before(&self) -> Option<&T> {
108 match self {
109 RichUpdate::Created { .. } => None,
110 RichUpdate::Updated { before, .. } => Some(before),
111 RichUpdate::Deleted { last_known, .. } => last_known.as_ref(),
112 }
113 }
114
115 pub fn into_data(self) -> Option<T> {
116 match self {
117 RichUpdate::Created { data, .. } => Some(data),
118 RichUpdate::Updated { after, .. } => Some(after),
119 RichUpdate::Deleted { last_known, .. } => last_known,
120 }
121 }
122
123 pub fn is_created(&self) -> bool {
124 matches!(self, RichUpdate::Created { .. })
125 }
126
127 pub fn is_updated(&self) -> bool {
128 matches!(self, RichUpdate::Updated { .. })
129 }
130
131 pub fn is_deleted(&self) -> bool {
132 matches!(self, RichUpdate::Deleted { .. })
133 }
134
135 pub fn patch(&self) -> Option<&serde_json::Value> {
136 match self {
137 RichUpdate::Updated { patch, .. } => patch.as_ref(),
138 _ => None,
139 }
140 }
141
142 pub fn has_patch_field(&self, field: &str) -> bool {
143 self.patch()
144 .and_then(|p| p.as_object())
145 .map(|obj| obj.contains_key(field))
146 .unwrap_or(false)
147 }
148}
149
/// Restricts which entity keys a stream lets through.
#[derive(Clone)]
pub enum KeyFilter {
    /// No restriction: every key matches.
    None,
    /// Only exactly this key matches.
    Single(String),
    /// Any key contained in the set matches.
    Multiple(HashSet<String>),
}

impl KeyFilter {
    /// Reports whether `key` passes this filter.
    fn matches(&self, key: &str) -> bool {
        match self {
            Self::Multiple(allowed) => allowed.contains(key),
            Self::Single(expected) => expected.as_str() == key,
            Self::None => true,
        }
    }
}
166
167pub struct EntityStream<T> {
168 state: EntityStreamState<T>,
169 view: String,
170 key_filter: KeyFilter,
171 _marker: PhantomData<T>,
172}
173
174enum EntityStreamState<T> {
175 Lazy {
176 connection: ConnectionManager,
177 store: SharedStore,
178 subscription_view: String,
179 subscription_key: Option<String>,
180 },
181 Active {
182 inner: BroadcastStream<StoreUpdate>,
183 },
184 Subscribing {
185 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
186 store: SharedStore,
187 },
188 Invalid,
189 _Phantom(PhantomData<T>),
190}
191
192impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
193 pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
194 Self {
195 state: EntityStreamState::Active {
196 inner: BroadcastStream::new(rx),
197 },
198 view,
199 key_filter: KeyFilter::None,
200 _marker: PhantomData,
201 }
202 }
203
204 pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
205 Self {
206 state: EntityStreamState::Active {
207 inner: BroadcastStream::new(rx),
208 },
209 view,
210 key_filter: KeyFilter::Single(key),
211 _marker: PhantomData,
212 }
213 }
214
215 pub fn new_multi_filtered(
216 rx: broadcast::Receiver<StoreUpdate>,
217 view: String,
218 keys: HashSet<String>,
219 ) -> Self {
220 Self {
221 state: EntityStreamState::Active {
222 inner: BroadcastStream::new(rx),
223 },
224 view,
225 key_filter: KeyFilter::Multiple(keys),
226 _marker: PhantomData,
227 }
228 }
229
230 pub fn new_lazy(
231 connection: ConnectionManager,
232 store: SharedStore,
233 entity_name: String,
234 subscription_view: String,
235 key_filter: KeyFilter,
236 subscription_key: Option<String>,
237 ) -> Self {
238 Self {
239 state: EntityStreamState::Lazy {
240 connection,
241 store,
242 subscription_view,
243 subscription_key,
244 },
245 view: entity_name,
246 key_filter,
247 _marker: PhantomData,
248 }
249 }
250
251 pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, Update<T>, F>
252 where
253 F: FnMut(&Update<T>) -> bool,
254 {
255 FilteredStream::new(self, predicate)
256 }
257
258 pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, Update<T>, U, F>
259 where
260 F: FnMut(Update<T>) -> Option<U>,
261 {
262 FilterMapStream::new(self, f)
263 }
264
265 pub fn map<U, F>(self, f: F) -> MapStream<Self, Update<T>, U, F>
266 where
267 F: FnMut(Update<T>) -> U,
268 {
269 MapStream::new(self, f)
270 }
271}
272
273impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for EntityStream<T> {
274 type Item = Update<T>;
275
276 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
277 let this = self.get_mut();
278
279 loop {
280 match &mut this.state {
281 EntityStreamState::Lazy { .. } => {
282 let EntityStreamState::Lazy {
283 connection,
284 store,
285 subscription_view,
286 subscription_key,
287 } = std::mem::replace(&mut this.state, EntityStreamState::Invalid)
288 else {
289 unreachable!()
290 };
291
292 let conn = connection.clone();
293 let view = subscription_view.clone();
294 let key = subscription_key.clone();
295 let fut = Box::pin(async move {
296 conn.ensure_subscription(&view, key.as_deref()).await;
297 });
298
299 this.state = EntityStreamState::Subscribing { fut, store };
300 continue;
301 }
302 EntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
303 Poll::Ready(()) => {
304 let EntityStreamState::Subscribing { store, .. } =
305 std::mem::replace(&mut this.state, EntityStreamState::Invalid)
306 else {
307 unreachable!()
308 };
309 this.state = EntityStreamState::Active {
310 inner: BroadcastStream::new(store.subscribe()),
311 };
312 continue;
313 }
314 Poll::Pending => return Poll::Pending,
315 },
316 EntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
317 Poll::Ready(Some(Ok(update))) => {
318 if update.view != this.view {
319 continue;
320 }
321
322 if !this.key_filter.matches(&update.key) {
323 continue;
324 }
325
326 match update.operation {
327 Operation::Delete => {
328 return Poll::Ready(Some(Update::Delete { key: update.key }));
329 }
330 Operation::Upsert | Operation::Create | Operation::Snapshot => {
331 if let Some(data) = update.data {
332 if let Ok(typed) = serde_json::from_value::<T>(data) {
333 return Poll::Ready(Some(Update::Upsert {
334 key: update.key,
335 data: typed,
336 }));
337 }
338 }
339 }
340 Operation::Patch => {
341 if let Some(data) = update.data {
342 match serde_json::from_value::<T>(data) {
343 Ok(typed) => {
344 return Poll::Ready(Some(Update::Patch {
345 key: update.key,
346 data: typed,
347 }));
348 }
349 Err(e) => {
350 tracing::warn!(
351 key = %update.key,
352 error = %e,
353 "Patch failed to deserialize to full type, skipping"
354 );
355 continue;
356 }
357 }
358 }
359 }
360 }
361 }
362 Poll::Ready(Some(Err(_lagged))) => {
363 tracing::warn!("EntityStream lagged behind, some messages were dropped");
364 continue;
365 }
366 Poll::Ready(None) => {
367 return Poll::Ready(None);
368 }
369 Poll::Pending => {
370 return Poll::Pending;
371 }
372 },
373 EntityStreamState::Invalid => {
374 panic!("EntityStream in invalid state");
375 }
376 EntityStreamState::_Phantom(_) => unreachable!(),
377 }
378 }
379 }
380}
381
382pub struct RichEntityStream<T> {
383 state: RichEntityStreamState<T>,
384 view: String,
385 key_filter: KeyFilter,
386 _marker: PhantomData<T>,
387}
388
389enum RichEntityStreamState<T> {
390 Lazy {
391 connection: ConnectionManager,
392 store: SharedStore,
393 subscription_view: String,
394 subscription_key: Option<String>,
395 },
396 Active {
397 inner: BroadcastStream<StoreUpdate>,
398 },
399 Subscribing {
400 fut: Pin<Box<dyn Future<Output = ()> + Send>>,
401 store: SharedStore,
402 },
403 Invalid,
404 _Phantom(PhantomData<T>),
405}
406
407impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
408 pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
409 Self {
410 state: RichEntityStreamState::Active {
411 inner: BroadcastStream::new(rx),
412 },
413 view,
414 key_filter: KeyFilter::None,
415 _marker: PhantomData,
416 }
417 }
418
419 pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
420 Self {
421 state: RichEntityStreamState::Active {
422 inner: BroadcastStream::new(rx),
423 },
424 view,
425 key_filter: KeyFilter::Single(key),
426 _marker: PhantomData,
427 }
428 }
429
430 pub fn new_lazy(
431 connection: ConnectionManager,
432 store: SharedStore,
433 entity_name: String,
434 subscription_view: String,
435 key_filter: KeyFilter,
436 subscription_key: Option<String>,
437 ) -> Self {
438 Self {
439 state: RichEntityStreamState::Lazy {
440 connection,
441 store,
442 subscription_view,
443 subscription_key,
444 },
445 view: entity_name,
446 key_filter,
447 _marker: PhantomData,
448 }
449 }
450}
451
452impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for RichEntityStream<T> {
453 type Item = RichUpdate<T>;
454
455 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
456 let this = self.get_mut();
457
458 loop {
459 match &mut this.state {
460 RichEntityStreamState::Lazy { .. } => {
461 let RichEntityStreamState::Lazy {
462 connection,
463 store,
464 subscription_view,
465 subscription_key,
466 } = std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
467 else {
468 unreachable!()
469 };
470
471 let conn = connection.clone();
472 let view = subscription_view.clone();
473 let key = subscription_key.clone();
474 let fut = Box::pin(async move {
475 conn.ensure_subscription(&view, key.as_deref()).await;
476 });
477
478 this.state = RichEntityStreamState::Subscribing { fut, store };
479 continue;
480 }
481 RichEntityStreamState::Subscribing { fut, .. } => match fut.as_mut().poll(cx) {
482 Poll::Ready(()) => {
483 let RichEntityStreamState::Subscribing { store, .. } =
484 std::mem::replace(&mut this.state, RichEntityStreamState::Invalid)
485 else {
486 unreachable!()
487 };
488 this.state = RichEntityStreamState::Active {
489 inner: BroadcastStream::new(store.subscribe()),
490 };
491 continue;
492 }
493 Poll::Pending => return Poll::Pending,
494 },
495 RichEntityStreamState::Active { inner } => match Pin::new(inner).poll_next(cx) {
496 Poll::Ready(Some(Ok(update))) => {
497 if update.view != this.view {
498 continue;
499 }
500
501 if !this.key_filter.matches(&update.key) {
502 continue;
503 }
504
505 let previous: Option<T> =
506 update.previous.and_then(|v| serde_json::from_value(v).ok());
507
508 match update.operation {
509 Operation::Delete => {
510 return Poll::Ready(Some(RichUpdate::Deleted {
511 key: update.key,
512 last_known: previous,
513 }));
514 }
515 Operation::Create | Operation::Snapshot => {
516 if let Some(data) = update.data {
517 if let Ok(typed) = serde_json::from_value::<T>(data) {
518 return Poll::Ready(Some(RichUpdate::Created {
519 key: update.key,
520 data: typed,
521 }));
522 }
523 }
524 }
525 Operation::Upsert | Operation::Patch => {
526 if let Some(data) = update.data {
527 match serde_json::from_value::<T>(data.clone()) {
528 Ok(after) => {
529 if let Some(before) = previous {
530 return Poll::Ready(Some(RichUpdate::Updated {
531 key: update.key,
532 before,
533 after,
534 patch: update.patch,
535 }));
536 } else {
537 return Poll::Ready(Some(RichUpdate::Created {
538 key: update.key,
539 data: after,
540 }));
541 }
542 }
543 Err(e) => {
544 tracing::warn!(
545 key = %update.key,
546 error = %e,
547 "Update failed to deserialize, skipping"
548 );
549 continue;
550 }
551 }
552 }
553 }
554 }
555 }
556 Poll::Ready(Some(Err(_lagged))) => {
557 tracing::warn!(
558 "RichEntityStream lagged behind, some messages were dropped"
559 );
560 continue;
561 }
562 Poll::Ready(None) => {
563 return Poll::Ready(None);
564 }
565 Poll::Pending => {
566 return Poll::Pending;
567 }
568 },
569 RichEntityStreamState::Invalid => {
570 panic!("RichEntityStream in invalid state");
571 }
572 RichEntityStreamState::_Phantom(_) => unreachable!(),
573 }
574 }
575 }
576}
577
578impl<T: DeserializeOwned + Clone + Send + 'static> RichEntityStream<T> {
579 pub fn filter<F>(self, predicate: F) -> FilteredStream<Self, RichUpdate<T>, F>
580 where
581 F: FnMut(&RichUpdate<T>) -> bool,
582 {
583 FilteredStream::new(self, predicate)
584 }
585
586 pub fn filter_map<U, F>(self, f: F) -> FilterMapStream<Self, RichUpdate<T>, U, F>
587 where
588 F: FnMut(RichUpdate<T>) -> Option<U>,
589 {
590 FilterMapStream::new(self, f)
591 }
592
593 pub fn map<U, F>(self, f: F) -> MapStream<Self, RichUpdate<T>, U, F>
594 where
595 F: FnMut(RichUpdate<T>) -> U,
596 {
597 MapStream::new(self, f)
598 }
599}
600
601pin_project! {
602 pub struct FilteredStream<S, I, F> {
603 #[pin]
604 inner: S,
605 predicate: F,
606 _item: PhantomData<I>,
607 }
608}
609
610impl<S, I, F> FilteredStream<S, I, F> {
611 pub fn new(inner: S, predicate: F) -> Self {
612 Self {
613 inner,
614 predicate,
615 _item: PhantomData,
616 }
617 }
618}
619
620impl<S, I, F> Stream for FilteredStream<S, I, F>
621where
622 S: Stream<Item = I>,
623 F: FnMut(&I) -> bool,
624{
625 type Item = I;
626
627 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
628 let mut this = self.project();
629 loop {
630 match this.inner.as_mut().poll_next(cx) {
631 Poll::Ready(Some(item)) => {
632 if (this.predicate)(&item) {
633 return Poll::Ready(Some(item));
634 }
635 }
636 Poll::Ready(None) => return Poll::Ready(None),
637 Poll::Pending => return Poll::Pending,
638 }
639 }
640 }
641}
642
643impl<S, I, F> FilteredStream<S, I, F>
644where
645 S: Stream<Item = I>,
646 F: FnMut(&I) -> bool,
647{
648 pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, I, F2>
649 where
650 F2: FnMut(&I) -> bool,
651 {
652 FilteredStream::new(self, predicate)
653 }
654
655 pub fn filter_map<U, F2>(self, f: F2) -> FilterMapStream<Self, I, U, F2>
656 where
657 F2: FnMut(I) -> Option<U>,
658 {
659 FilterMapStream::new(self, f)
660 }
661
662 pub fn map<U, F2>(self, f: F2) -> MapStream<Self, I, U, F2>
663 where
664 F2: FnMut(I) -> U,
665 {
666 MapStream::new(self, f)
667 }
668}
669
670pin_project! {
671 pub struct FilterMapStream<S, I, U, F> {
672 #[pin]
673 inner: S,
674 f: F,
675 _item: PhantomData<(I, U)>,
676 }
677}
678
679impl<S, I, U, F> FilterMapStream<S, I, U, F> {
680 pub fn new(inner: S, f: F) -> Self {
681 Self {
682 inner,
683 f,
684 _item: PhantomData,
685 }
686 }
687}
688
689impl<S, I, U, F> Stream for FilterMapStream<S, I, U, F>
690where
691 S: Stream<Item = I>,
692 F: FnMut(I) -> Option<U>,
693{
694 type Item = U;
695
696 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
697 let mut this = self.project();
698 loop {
699 match this.inner.as_mut().poll_next(cx) {
700 Poll::Ready(Some(item)) => {
701 if let Some(mapped) = (this.f)(item) {
702 return Poll::Ready(Some(mapped));
703 }
704 }
705 Poll::Ready(None) => return Poll::Ready(None),
706 Poll::Pending => return Poll::Pending,
707 }
708 }
709 }
710}
711
712impl<S, I, U, F> FilterMapStream<S, I, U, F>
713where
714 S: Stream<Item = I>,
715 F: FnMut(I) -> Option<U>,
716{
717 pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
718 where
719 F2: FnMut(&U) -> bool,
720 {
721 FilteredStream::new(self, predicate)
722 }
723
724 pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
725 where
726 F2: FnMut(U) -> Option<V>,
727 {
728 FilterMapStream::new(self, f)
729 }
730
731 pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
732 where
733 F2: FnMut(U) -> V,
734 {
735 MapStream::new(self, f)
736 }
737}
738
739pin_project! {
740 pub struct MapStream<S, I, U, F> {
741 #[pin]
742 inner: S,
743 f: F,
744 _item: PhantomData<(I, U)>,
745 }
746}
747
748impl<S, I, U, F> MapStream<S, I, U, F> {
749 pub fn new(inner: S, f: F) -> Self {
750 Self {
751 inner,
752 f,
753 _item: PhantomData,
754 }
755 }
756}
757
758impl<S, I, U, F> Stream for MapStream<S, I, U, F>
759where
760 S: Stream<Item = I>,
761 F: FnMut(I) -> U,
762{
763 type Item = U;
764
765 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
766 let this = self.project();
767 match this.inner.poll_next(cx) {
768 Poll::Ready(Some(item)) => Poll::Ready(Some((this.f)(item))),
769 Poll::Ready(None) => Poll::Ready(None),
770 Poll::Pending => Poll::Pending,
771 }
772 }
773}
774
775impl<S, I, U, F> MapStream<S, I, U, F>
776where
777 S: Stream<Item = I>,
778 F: FnMut(I) -> U,
779{
780 pub fn filter<F2>(self, predicate: F2) -> FilteredStream<Self, U, F2>
781 where
782 F2: FnMut(&U) -> bool,
783 {
784 FilteredStream::new(self, predicate)
785 }
786
787 pub fn filter_map<V, F2>(self, f: F2) -> FilterMapStream<Self, U, V, F2>
788 where
789 F2: FnMut(U) -> Option<V>,
790 {
791 FilterMapStream::new(self, f)
792 }
793
794 pub fn map<V, F2>(self, f: F2) -> MapStream<Self, U, V, F2>
795 where
796 F2: FnMut(U) -> V,
797 {
798 MapStream::new(self, f)
799 }
800}