1use crate::connection::ConnectionManager;
34use crate::store::SharedStore;
35use crate::stream::{EntityStream, KeyFilter, RichEntityStream, Update, UseStream};
36use futures_util::Stream;
37use serde::de::DeserializeOwned;
38use serde::Serialize;
39use std::collections::HashMap;
40use std::marker::PhantomData;
41use std::pin::Pin;
42use std::task::{Context, Poll};
43use std::time::Duration;
44
/// Typed handle onto the view identified by `view_path`.
///
/// The handle owns a `ConnectionManager` and a `SharedStore` and delegates every
/// operation to them. `T` is the entity type the view's data is read as; it is
/// carried only through `PhantomData`, so no `T` value is stored here.
pub struct ViewHandle<T> {
    /// Connection used to ensure a subscription exists; cloned into each builder
    /// this handle creates.
    connection: ConnectionManager,
    /// Store the view's entities are read from; cloned into each builder.
    store: SharedStore,
    /// Path of the view this handle addresses.
    view_path: String,
    /// Timeout handed to `SharedStore::wait_for_view_ready` by `get`.
    initial_data_timeout: Duration,
    /// Ties the handle to the entity type `T` without storing a value of it.
    _marker: PhantomData<T>,
}
56
57impl<T> ViewHandle<T>
58where
59 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
60{
61 pub async fn get(&self) -> Vec<T> {
67 self.connection
68 .ensure_subscription(&self.view_path, None)
69 .await;
70 self.store
71 .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
72 .await;
73 self.store.list::<T>(&self.view_path).await
74 }
75
76 pub fn get_sync(&self) -> Vec<T> {
81 self.store.list_sync::<T>(&self.view_path)
82 }
83
84 pub fn listen(&self) -> UseBuilder<T>
89 where
90 T: Unpin,
91 {
92 UseBuilder::new(
93 self.connection.clone(),
94 self.store.clone(),
95 self.view_path.clone(),
96 KeyFilter::None,
97 )
98 }
99
100 pub fn watch(&self) -> WatchBuilder<T>
102 where
103 T: Unpin,
104 {
105 WatchBuilder::new(
106 self.connection.clone(),
107 self.store.clone(),
108 self.view_path.clone(),
109 KeyFilter::None,
110 )
111 }
112
113 pub fn watch_rich(&self) -> RichWatchBuilder<T>
115 where
116 T: Unpin,
117 {
118 RichWatchBuilder::new(
119 self.connection.clone(),
120 self.store.clone(),
121 self.view_path.clone(),
122 KeyFilter::None,
123 )
124 }
125
126 pub fn watch_keys(&self, keys: &[&str]) -> WatchBuilder<T>
128 where
129 T: Unpin,
130 {
131 WatchBuilder::new(
132 self.connection.clone(),
133 self.store.clone(),
134 self.view_path.clone(),
135 KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
136 )
137 }
138}
139
/// Builder returned by `ViewHandle::listen` that is itself a `Stream` of `T`.
///
/// Options are set with the chained `take` / `skip` / `filter` methods. The
/// underlying `UseStream` is not created until the builder is first polled.
pub struct UseBuilder<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
{
    /// Connection cloned into the inner stream when it is created.
    connection: ConnectionManager,
    /// Store cloned into the inner stream when it is created.
    store: SharedStore,
    /// View path; passed twice to `UseStream::new_lazy_with_opts`.
    view_path: String,
    /// Key filter forwarded to the inner stream.
    key_filter: KeyFilter,
    /// Value set by `take`; forwarded to the inner stream
    /// (presumably a result limit — TODO confirm against `UseStream`).
    take: Option<u32>,
    /// Value set by `skip`; forwarded to the inner stream
    /// (presumably a result offset — TODO confirm against `UseStream`).
    skip: Option<u32>,
    /// Key/value pairs collected by `filter`.
    /// NOTE(review): no code visible in this file reads this field when the
    /// inner stream is built — confirm whether filters should be forwarded.
    filters: Option<HashMap<String, String>>,
    /// Inner stream; `None` until the first `poll_next` call creates it.
    stream: Option<UseStream<T>>,
}
154
155impl<T> UseBuilder<T>
156where
157 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
158{
159 fn new(
160 connection: ConnectionManager,
161 store: SharedStore,
162 view_path: String,
163 key_filter: KeyFilter,
164 ) -> Self {
165 Self {
166 connection,
167 store,
168 view_path,
169 key_filter,
170 take: None,
171 skip: None,
172 filters: None,
173 stream: None,
174 }
175 }
176
177 pub fn take(mut self, n: u32) -> Self {
179 self.take = Some(n);
180 self
181 }
182
183 pub fn skip(mut self, n: u32) -> Self {
185 self.skip = Some(n);
186 self
187 }
188
189 pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
191 self.filters
192 .get_or_insert_with(HashMap::new)
193 .insert(key.into(), value.into());
194 self
195 }
196}
197
198impl<T> Stream for UseBuilder<T>
199where
200 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
201{
202 type Item = T;
203
204 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
205 let this = self.get_mut();
206
207 if this.stream.is_none() {
208 this.stream = Some(UseStream::new_lazy_with_opts(
209 this.connection.clone(),
210 this.store.clone(),
211 this.view_path.clone(),
212 this.view_path.clone(),
213 this.key_filter.clone(),
214 None,
215 this.take,
216 this.skip,
217 ));
218 }
219
220 Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
221 }
222}
223
/// Builder returned by `ViewHandle::watch` / `watch_keys` that is itself a
/// `Stream` of `Update<T>`.
///
/// Options are set with the chained `take` / `skip` / `filter` methods. The
/// underlying `EntityStream` is not created until the builder is first polled;
/// `rich` converts the builder into a `RichEntityStream` instead.
pub struct WatchBuilder<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
{
    /// Connection handed to the stream that gets created.
    connection: ConnectionManager,
    /// Store handed to the stream that gets created.
    store: SharedStore,
    /// View path; passed twice to the stream constructor.
    view_path: String,
    /// Key filter forwarded to the stream.
    key_filter: KeyFilter,
    /// Value set by `take`; forwarded to the stream
    /// (presumably a result limit — TODO confirm against `EntityStream`).
    take: Option<u32>,
    /// Value set by `skip`; forwarded to the stream
    /// (presumably a result offset — TODO confirm against `EntityStream`).
    skip: Option<u32>,
    /// Key/value pairs collected by `filter`.
    /// NOTE(review): no code visible in this file reads this field when a
    /// stream is built — confirm whether filters should be forwarded.
    filters: Option<HashMap<String, String>>,
    /// Inner stream; `None` until the first `poll_next` call creates it.
    stream: Option<EntityStream<T>>,
}
238
239impl<T> WatchBuilder<T>
240where
241 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
242{
243 fn new(
244 connection: ConnectionManager,
245 store: SharedStore,
246 view_path: String,
247 key_filter: KeyFilter,
248 ) -> Self {
249 Self {
250 connection,
251 store,
252 view_path,
253 key_filter,
254 take: None,
255 skip: None,
256 filters: None,
257 stream: None,
258 }
259 }
260
261 pub fn take(mut self, n: u32) -> Self {
263 self.take = Some(n);
264 self
265 }
266
267 pub fn skip(mut self, n: u32) -> Self {
269 self.skip = Some(n);
270 self
271 }
272
273 pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
275 self.filters
276 .get_or_insert_with(HashMap::new)
277 .insert(key.into(), value.into());
278 self
279 }
280
281 pub fn rich(self) -> RichEntityStream<T> {
283 RichEntityStream::new_lazy_with_opts(
284 self.connection,
285 self.store,
286 self.view_path.clone(),
287 self.view_path,
288 self.key_filter,
289 None,
290 self.take,
291 self.skip,
292 )
293 }
294}
295
296impl<T> Stream for WatchBuilder<T>
297where
298 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
299{
300 type Item = Update<T>;
301
302 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
303 let this = self.get_mut();
304
305 if this.stream.is_none() {
306 this.stream = Some(EntityStream::new_lazy_with_opts(
307 this.connection.clone(),
308 this.store.clone(),
309 this.view_path.clone(),
310 this.view_path.clone(),
311 this.key_filter.clone(),
312 None,
313 this.take,
314 this.skip,
315 ));
316 }
317
318 Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
319 }
320}
321
/// Builder returned by `ViewHandle::watch_rich` that is itself a `Stream` of
/// `RichUpdate<T>`.
///
/// Options are set with the chained `take` / `skip` / `filter` methods. The
/// underlying `RichEntityStream` is not created until the builder is first polled.
pub struct RichWatchBuilder<T>
where
    T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
{
    /// Connection cloned into the inner stream when it is created.
    connection: ConnectionManager,
    /// Store cloned into the inner stream when it is created.
    store: SharedStore,
    /// View path; passed twice to `RichEntityStream::new_lazy_with_opts`.
    view_path: String,
    /// Key filter forwarded to the inner stream.
    key_filter: KeyFilter,
    /// Value set by `take`; forwarded to the inner stream
    /// (presumably a result limit — TODO confirm against `RichEntityStream`).
    take: Option<u32>,
    /// Value set by `skip`; forwarded to the inner stream
    /// (presumably a result offset — TODO confirm against `RichEntityStream`).
    skip: Option<u32>,
    /// Key/value pairs collected by `filter`.
    /// NOTE(review): no code visible in this file reads this field when the
    /// inner stream is built — confirm whether filters should be forwarded.
    filters: Option<HashMap<String, String>>,
    /// Inner stream; `None` until the first `poll_next` call creates it.
    stream: Option<RichEntityStream<T>>,
}
336
337impl<T> RichWatchBuilder<T>
338where
339 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
340{
341 fn new(
342 connection: ConnectionManager,
343 store: SharedStore,
344 view_path: String,
345 key_filter: KeyFilter,
346 ) -> Self {
347 Self {
348 connection,
349 store,
350 view_path,
351 key_filter,
352 take: None,
353 skip: None,
354 filters: None,
355 stream: None,
356 }
357 }
358
359 pub fn take(mut self, n: u32) -> Self {
360 self.take = Some(n);
361 self
362 }
363
364 pub fn skip(mut self, n: u32) -> Self {
365 self.skip = Some(n);
366 self
367 }
368
369 pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
370 self.filters
371 .get_or_insert_with(HashMap::new)
372 .insert(key.into(), value.into());
373 self
374 }
375}
376
377impl<T> Stream for RichWatchBuilder<T>
378where
379 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
380{
381 type Item = crate::stream::RichUpdate<T>;
382
383 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
384 let this = self.get_mut();
385
386 if this.stream.is_none() {
387 this.stream = Some(RichEntityStream::new_lazy_with_opts(
388 this.connection.clone(),
389 this.store.clone(),
390 this.view_path.clone(),
391 this.view_path.clone(),
392 this.key_filter.clone(),
393 None,
394 this.take,
395 this.skip,
396 ));
397 }
398
399 Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
400 }
401}
402
/// Factory for [`ViewHandle`]s that share one connection, store and timeout.
pub struct ViewBuilder {
    /// Connection cloned into every handle produced by `view`.
    connection: ConnectionManager,
    /// Store cloned into every handle produced by `view`.
    store: SharedStore,
    /// Timeout copied into every handle produced by `view`.
    initial_data_timeout: Duration,
}
411
412impl ViewBuilder {
413 pub fn new(
414 connection: ConnectionManager,
415 store: SharedStore,
416 initial_data_timeout: Duration,
417 ) -> Self {
418 Self {
419 connection,
420 store,
421 initial_data_timeout,
422 }
423 }
424
425 pub fn connection(&self) -> &ConnectionManager {
426 &self.connection
427 }
428
429 pub fn store(&self) -> &SharedStore {
430 &self.store
431 }
432
433 pub fn initial_data_timeout(&self) -> Duration {
434 self.initial_data_timeout
435 }
436
437 pub fn view<T>(&self, view_path: &str) -> ViewHandle<T>
439 where
440 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
441 {
442 ViewHandle {
443 connection: self.connection.clone(),
444 store: self.store.clone(),
445 view_path: view_path.to_string(),
446 initial_data_timeout: self.initial_data_timeout,
447 _marker: PhantomData,
448 }
449 }
450}
451
/// Implemented by types that can be constructed from a [`ViewBuilder`].
///
/// Implementors must be `Sized + Send + Sync + 'static`.
pub trait Views: Sized + Send + Sync + 'static {
    /// Builds `Self`, taking ownership of `builder`.
    fn from_builder(builder: ViewBuilder) -> Self;
}
456
/// Typed accessor for a view whose operations each address a single entity key.
///
/// Like `ViewHandle`, it delegates to a `ConnectionManager` and a `SharedStore`;
/// `T` is carried only through `PhantomData`.
pub struct StateView<T> {
    /// Connection used to ensure a subscription exists; cloned into each stream
    /// this view creates.
    connection: ConnectionManager,
    /// Store entities are read from; cloned into each stream this view creates.
    store: SharedStore,
    /// Path of the view this accessor addresses.
    view_path: String,
    /// Timeout handed to `SharedStore::wait_for_view_ready` by `get`.
    initial_data_timeout: Duration,
    /// Ties the accessor to the entity type `T` without storing a value of it.
    _marker: PhantomData<T>,
}
465
466impl<T> StateView<T>
467where
468 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
469{
470 pub fn new(
471 connection: ConnectionManager,
472 store: SharedStore,
473 view_path: String,
474 initial_data_timeout: Duration,
475 ) -> Self {
476 Self {
477 connection,
478 store,
479 view_path,
480 initial_data_timeout,
481 _marker: PhantomData,
482 }
483 }
484
485 pub async fn get(&self, key: &str) -> Option<T> {
487 self.connection
488 .ensure_subscription(&self.view_path, Some(key))
489 .await;
490 self.store
491 .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
492 .await;
493 self.store.get::<T>(&self.view_path, key).await
494 }
495
496 pub fn get_sync(&self, key: &str) -> Option<T> {
498 self.store.get_sync::<T>(&self.view_path, key)
499 }
500
501 pub fn listen(&self, key: &str) -> UseStream<T>
503 where
504 T: Unpin,
505 {
506 UseStream::new_lazy(
507 self.connection.clone(),
508 self.store.clone(),
509 self.view_path.clone(),
510 self.view_path.clone(),
511 KeyFilter::Single(key.to_string()),
512 Some(key.to_string()),
513 )
514 }
515
516 pub fn watch(&self, key: &str) -> EntityStream<T> {
518 EntityStream::new_lazy(
519 self.connection.clone(),
520 self.store.clone(),
521 self.view_path.clone(),
522 self.view_path.clone(),
523 KeyFilter::Single(key.to_string()),
524 Some(key.to_string()),
525 )
526 }
527
528 pub fn watch_rich(&self, key: &str) -> RichEntityStream<T> {
530 RichEntityStream::new_lazy(
531 self.connection.clone(),
532 self.store.clone(),
533 self.view_path.clone(),
534 self.view_path.clone(),
535 KeyFilter::Single(key.to_string()),
536 Some(key.to_string()),
537 )
538 }
539}