1use crate::frame::Operation;
2use crate::store::StoreUpdate;
3use futures_util::Stream;
4use serde::de::DeserializeOwned;
5use std::marker::PhantomData;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tokio::sync::broadcast;
9use tokio_stream::wrappers::BroadcastStream;
10
/// A typed change notification for one keyed entity.
///
/// `Upsert` and `Patch` carry a payload of type `T`; `Delete` carries only
/// the key of the entity that went away.
#[derive(Debug, Clone)]
pub enum Update<T> {
    /// The entity identified by `key` was created or replaced with `data`.
    Upsert { key: String, data: T },
    /// The entity identified by `key` was patched; `data` is the payload
    /// that accompanied the patch.
    Patch { key: String, data: T },
    /// The entity identified by `key` was removed.
    Delete { key: String },
}

impl<T> Update<T> {
    /// Returns the key of the entity this update refers to.
    ///
    /// Every variant carries a key, so this never fails.
    pub fn key(&self) -> &str {
        match self {
            Update::Upsert { key, .. } | Update::Patch { key, .. } | Update::Delete { key } => key,
        }
    }

    /// Returns the payload carried by this update, or `None` for a delete.
    pub fn data(&self) -> Option<&T> {
        match self {
            Update::Delete { .. } => None,
            Update::Upsert { data, .. } | Update::Patch { data, .. } => Some(data),
        }
    }

    /// Returns `true` when this update is the `Delete` variant.
    pub fn is_delete(&self) -> bool {
        // Spelled out exhaustively so adding a variant forces a decision here.
        match self {
            Update::Delete { .. } => true,
            Update::Upsert { .. } | Update::Patch { .. } => false,
        }
    }
}
39
40pub struct EntityStream<T> {
41 inner: BroadcastStream<StoreUpdate>,
42 view: String,
43 key_filter: Option<String>,
44 _marker: PhantomData<T>,
45}
46
47impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
48 pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
49 Self {
50 inner: BroadcastStream::new(rx),
51 view,
52 key_filter: None,
53 _marker: PhantomData,
54 }
55 }
56
57 pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
58 Self {
59 inner: BroadcastStream::new(rx),
60 view,
61 key_filter: Some(key),
62 _marker: PhantomData,
63 }
64 }
65}
66
67impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for EntityStream<T> {
68 type Item = Update<T>;
69
70 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71 let this = self.get_mut();
72 loop {
73 match Pin::new(&mut this.inner).poll_next(cx) {
74 Poll::Ready(Some(Ok(update))) => {
75 if update.view != this.view {
76 continue;
77 }
78
79 if let Some(ref key_filter) = this.key_filter {
80 if &update.key != key_filter {
81 continue;
82 }
83 }
84
85 match update.operation {
86 Operation::Delete => {
87 return Poll::Ready(Some(Update::Delete { key: update.key }));
88 }
89 Operation::Upsert | Operation::Create => {
90 if let Some(data) = update.data {
91 if let Ok(typed) = serde_json::from_value::<T>(data) {
92 return Poll::Ready(Some(Update::Upsert {
93 key: update.key,
94 data: typed,
95 }));
96 }
97 }
98 }
99 Operation::Patch => {
100 if let Some(data) = update.data {
101 match serde_json::from_value::<T>(data) {
102 Ok(typed) => {
103 return Poll::Ready(Some(Update::Patch {
104 key: update.key,
105 data: typed,
106 }));
107 }
108 Err(_) => {
109 continue;
111 }
112 }
113 }
114 }
115 }
116 }
117 Poll::Ready(Some(Err(_lagged))) => {
118 tracing::warn!("EntityStream lagged behind, some messages were dropped");
120 continue;
121 }
122 Poll::Ready(None) => {
123 return Poll::Ready(None);
124 }
125 Poll::Pending => {
126 return Poll::Pending;
127 }
128 }
129 }
130 }
131}