1use crate::connection::ConnectionManager;
34use crate::entity::Entity;
35use crate::store::SharedStore;
36use crate::stream::{EntityStream, KeyFilter, RichEntityStream, Update};
37use futures_util::Stream;
38use serde::de::DeserializeOwned;
39use serde::Serialize;
40use std::marker::PhantomData;
41use std::pin::Pin;
42use std::task::{Context, Poll};
43use std::time::Duration;
44
45pub struct ViewHandle<T> {
50 connection: ConnectionManager,
51 store: SharedStore,
52 view_path: String,
53 initial_data_timeout: Duration,
54 _marker: PhantomData<T>,
55}
56
57impl<T> ViewHandle<T>
58where
59 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
60{
61 pub async fn get(&self) -> Vec<T> {
67 self.connection
68 .ensure_subscription(&self.view_path, None)
69 .await;
70 self.store
71 .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
72 .await;
73 self.store.list::<T>(&self.view_path).await
74 }
75
76 pub fn watch(&self) -> WatchBuilder<T>
78 where
79 T: Unpin,
80 {
81 WatchBuilder::new(
82 self.connection.clone(),
83 self.store.clone(),
84 self.view_path.clone(),
85 KeyFilter::None,
86 )
87 }
88
89 pub fn watch_keys(&self, keys: &[&str]) -> WatchBuilder<T>
91 where
92 T: Unpin,
93 {
94 WatchBuilder::new(
95 self.connection.clone(),
96 self.store.clone(),
97 self.view_path.clone(),
98 KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
99 )
100 }
101}
102
103pub struct WatchBuilder<T>
105where
106 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
107{
108 connection: ConnectionManager,
109 store: SharedStore,
110 view_path: String,
111 key_filter: KeyFilter,
112 take: Option<u32>,
113 skip: Option<u32>,
114 stream: Option<EntityStream<T>>,
115}
116
117impl<T> WatchBuilder<T>
118where
119 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
120{
121 fn new(
122 connection: ConnectionManager,
123 store: SharedStore,
124 view_path: String,
125 key_filter: KeyFilter,
126 ) -> Self {
127 Self {
128 connection,
129 store,
130 view_path,
131 key_filter,
132 take: None,
133 skip: None,
134 stream: None,
135 }
136 }
137
138 pub fn take(mut self, n: u32) -> Self {
140 self.take = Some(n);
141 self
142 }
143
144 pub fn skip(mut self, n: u32) -> Self {
146 self.skip = Some(n);
147 self
148 }
149
150 pub fn rich(self) -> RichEntityStream<T> {
152 RichEntityStream::new_lazy_with_opts(
153 self.connection,
154 self.store,
155 self.view_path.clone(),
156 self.view_path,
157 self.key_filter,
158 None,
159 self.take,
160 self.skip,
161 )
162 }
163}
164
165impl<T> Stream for WatchBuilder<T>
166where
167 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
168{
169 type Item = Update<T>;
170
171 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
172 let this = self.get_mut();
173
174 if this.stream.is_none() {
175 this.stream = Some(EntityStream::new_lazy_with_opts(
176 this.connection.clone(),
177 this.store.clone(),
178 this.view_path.clone(),
179 this.view_path.clone(),
180 this.key_filter.clone(),
181 None,
182 this.take,
183 this.skip,
184 ));
185 }
186
187 Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
188 }
189}
190
191pub struct ViewBuilder {
195 connection: ConnectionManager,
196 store: SharedStore,
197 initial_data_timeout: Duration,
198}
199
200impl ViewBuilder {
201 pub fn new(
202 connection: ConnectionManager,
203 store: SharedStore,
204 initial_data_timeout: Duration,
205 ) -> Self {
206 Self {
207 connection,
208 store,
209 initial_data_timeout,
210 }
211 }
212
213 pub fn connection(&self) -> &ConnectionManager {
214 &self.connection
215 }
216
217 pub fn store(&self) -> &SharedStore {
218 &self.store
219 }
220
221 pub fn initial_data_timeout(&self) -> Duration {
222 self.initial_data_timeout
223 }
224
225 pub fn view<T>(&self, view_path: &str) -> ViewHandle<T>
227 where
228 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
229 {
230 ViewHandle {
231 connection: self.connection.clone(),
232 store: self.store.clone(),
233 view_path: view_path.to_string(),
234 initial_data_timeout: self.initial_data_timeout,
235 _marker: PhantomData,
236 }
237 }
238}
239
240pub trait Views: Sized {
245 type Entity: Entity;
246
247 fn from_builder(builder: ViewBuilder) -> Self;
248}
249
250pub struct StateView<T> {
252 connection: ConnectionManager,
253 store: SharedStore,
254 view_path: String,
255 initial_data_timeout: Duration,
256 _marker: PhantomData<T>,
257}
258
259impl<T> StateView<T>
260where
261 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
262{
263 pub fn new(
264 connection: ConnectionManager,
265 store: SharedStore,
266 view_path: String,
267 initial_data_timeout: Duration,
268 ) -> Self {
269 Self {
270 connection,
271 store,
272 view_path,
273 initial_data_timeout,
274 _marker: PhantomData,
275 }
276 }
277
278 pub async fn get(&self, key: &str) -> Option<T> {
280 self.connection
281 .ensure_subscription(&self.view_path, Some(key))
282 .await;
283 self.store
284 .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
285 .await;
286 self.store.get::<T>(&self.view_path, key).await
287 }
288
289 pub fn watch(&self, key: &str) -> EntityStream<T> {
291 EntityStream::new_lazy(
292 self.connection.clone(),
293 self.store.clone(),
294 self.view_path.clone(),
295 self.view_path.clone(),
296 KeyFilter::Single(key.to_string()),
297 Some(key.to_string()),
298 )
299 }
300
301 pub fn watch_rich(&self, key: &str) -> RichEntityStream<T> {
303 RichEntityStream::new_lazy(
304 self.connection.clone(),
305 self.store.clone(),
306 self.view_path.clone(),
307 self.view_path.clone(),
308 KeyFilter::Single(key.to_string()),
309 Some(key.to_string()),
310 )
311 }
312}