hyperstack_sdk/
stream.rs

1use crate::frame::Operation;
2use crate::store::StoreUpdate;
3use futures_util::Stream;
4use serde::de::DeserializeOwned;
5use std::marker::PhantomData;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use tokio::sync::broadcast;
9use tokio_stream::wrappers::BroadcastStream;
10
11#[derive(Debug, Clone)]
12pub enum Update<T> {
13    Upsert { key: String, data: T },
14    Patch { key: String, data: T },
15    Delete { key: String },
16}
17
18impl<T> Update<T> {
19    pub fn key(&self) -> &str {
20        match self {
21            Update::Upsert { key, .. } => key,
22            Update::Patch { key, .. } => key,
23            Update::Delete { key } => key,
24        }
25    }
26
27    pub fn data(&self) -> Option<&T> {
28        match self {
29            Update::Upsert { data, .. } => Some(data),
30            Update::Patch { data, .. } => Some(data),
31            Update::Delete { .. } => None,
32        }
33    }
34
35    pub fn is_delete(&self) -> bool {
36        matches!(self, Update::Delete { .. })
37    }
38}
39
40pub struct EntityStream<T> {
41    inner: BroadcastStream<StoreUpdate>,
42    view: String,
43    key_filter: Option<String>,
44    _marker: PhantomData<T>,
45}
46
47impl<T: DeserializeOwned + Clone + Send + 'static> EntityStream<T> {
48    pub fn new(rx: broadcast::Receiver<StoreUpdate>, view: String) -> Self {
49        Self {
50            inner: BroadcastStream::new(rx),
51            view,
52            key_filter: None,
53            _marker: PhantomData,
54        }
55    }
56
57    pub fn new_filtered(rx: broadcast::Receiver<StoreUpdate>, view: String, key: String) -> Self {
58        Self {
59            inner: BroadcastStream::new(rx),
60            view,
61            key_filter: Some(key),
62            _marker: PhantomData,
63        }
64    }
65}
66
67impl<T: DeserializeOwned + Clone + Send + Unpin + 'static> Stream for EntityStream<T> {
68    type Item = Update<T>;
69
70    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71        let this = self.get_mut();
72        loop {
73            match Pin::new(&mut this.inner).poll_next(cx) {
74                Poll::Ready(Some(Ok(update))) => {
75                    if update.view != this.view {
76                        continue;
77                    }
78
79                    if let Some(ref key_filter) = this.key_filter {
80                        if &update.key != key_filter {
81                            continue;
82                        }
83                    }
84
85                    match update.operation {
86                        Operation::Delete => {
87                            return Poll::Ready(Some(Update::Delete { key: update.key }));
88                        }
89                        Operation::Upsert | Operation::Create => {
90                            if let Some(data) = update.data {
91                                if let Ok(typed) = serde_json::from_value::<T>(data) {
92                                    return Poll::Ready(Some(Update::Upsert {
93                                        key: update.key,
94                                        data: typed,
95                                    }));
96                                }
97                            }
98                        }
99                        Operation::Patch => {
100                            if let Some(data) = update.data {
101                                match serde_json::from_value::<T>(data) {
102                                    Ok(typed) => {
103                                        return Poll::Ready(Some(Update::Patch {
104                                            key: update.key,
105                                            data: typed,
106                                        }));
107                                    }
108                                    Err(_) => {
109                                        // Partial patches can't deserialize to full type - skip
110                                        continue;
111                                    }
112                                }
113                            }
114                        }
115                    }
116                }
117                Poll::Ready(Some(Err(_lagged))) => {
118                    // Receiver lagged behind - messages were dropped. Continue to next.
119                    tracing::warn!("EntityStream lagged behind, some messages were dropped");
120                    continue;
121                }
122                Poll::Ready(None) => {
123                    return Poll::Ready(None);
124                }
125                Poll::Pending => {
126                    return Poll::Pending;
127                }
128            }
129        }
130    }
131}