1use crate::connection::ConnectionManager;
34use crate::store::SharedStore;
35use crate::stream::{EntityStream, KeyFilter, RichEntityStream, Update, UseStream};
36use futures_util::Stream;
37use serde::de::DeserializeOwned;
38use serde::Serialize;
39use std::collections::HashMap;
40use std::marker::PhantomData;
41use std::pin::Pin;
42use std::task::{Context, Poll};
43use std::time::Duration;
44
45pub struct ViewHandle<T> {
50 connection: ConnectionManager,
51 store: SharedStore,
52 view_path: String,
53 initial_data_timeout: Duration,
54 _marker: PhantomData<T>,
55}
56
57impl<T> ViewHandle<T>
58where
59 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
60{
61 pub async fn get(&self) -> Vec<T> {
67 self.connection
68 .ensure_subscription(&self.view_path, None)
69 .await;
70 self.store
71 .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
72 .await;
73 self.store.list::<T>(&self.view_path).await
74 }
75
76 pub fn get_sync(&self) -> Vec<T> {
81 self.store.list_sync::<T>(&self.view_path)
82 }
83
84 pub fn listen(&self) -> UseBuilder<T>
89 where
90 T: Unpin,
91 {
92 UseBuilder::new(
93 self.connection.clone(),
94 self.store.clone(),
95 self.view_path.clone(),
96 KeyFilter::None,
97 )
98 }
99
100 pub fn watch(&self) -> WatchBuilder<T>
102 where
103 T: Unpin,
104 {
105 WatchBuilder::new(
106 self.connection.clone(),
107 self.store.clone(),
108 self.view_path.clone(),
109 KeyFilter::None,
110 )
111 }
112
113 pub fn watch_rich(&self) -> RichWatchBuilder<T>
115 where
116 T: Unpin,
117 {
118 RichWatchBuilder::new(
119 self.connection.clone(),
120 self.store.clone(),
121 self.view_path.clone(),
122 KeyFilter::None,
123 )
124 }
125
126 pub fn watch_keys(&self, keys: &[&str]) -> WatchBuilder<T>
128 where
129 T: Unpin,
130 {
131 WatchBuilder::new(
132 self.connection.clone(),
133 self.store.clone(),
134 self.view_path.clone(),
135 KeyFilter::Multiple(keys.iter().map(|s| s.to_string()).collect()),
136 )
137 }
138}
139
140pub struct UseBuilder<T>
142where
143 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
144{
145 connection: ConnectionManager,
146 store: SharedStore,
147 view_path: String,
148 key_filter: KeyFilter,
149 take: Option<u32>,
150 skip: Option<u32>,
151 filters: Option<HashMap<String, String>>,
152 with_snapshot: Option<bool>,
153 after: Option<String>,
154 snapshot_limit: Option<usize>,
155 stream: Option<UseStream<T>>,
156}
157
158impl<T> UseBuilder<T>
159where
160 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
161{
162 fn new(
163 connection: ConnectionManager,
164 store: SharedStore,
165 view_path: String,
166 key_filter: KeyFilter,
167 ) -> Self {
168 Self {
169 connection,
170 store,
171 view_path,
172 key_filter,
173 take: None,
174 skip: None,
175 filters: None,
176 with_snapshot: None,
177 after: None,
178 snapshot_limit: None,
179 stream: None,
180 }
181 }
182
183 pub fn take(mut self, n: u32) -> Self {
185 self.take = Some(n);
186 self
187 }
188
189 pub fn skip(mut self, n: u32) -> Self {
191 self.skip = Some(n);
192 self
193 }
194
195 pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
197 self.filters
198 .get_or_insert_with(HashMap::new)
199 .insert(key.into(), value.into());
200 self
201 }
202
203 pub fn with_snapshot(mut self, with_snapshot: bool) -> Self {
205 self.with_snapshot = Some(with_snapshot);
206 self
207 }
208
209 pub fn after(mut self, cursor: impl Into<String>) -> Self {
211 self.after = Some(cursor.into());
212 self
213 }
214
215 pub fn with_snapshot_limit(mut self, limit: usize) -> Self {
217 self.snapshot_limit = Some(limit);
218 self
219 }
220}
221
222impl<T> Stream for UseBuilder<T>
223where
224 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
225{
226 type Item = T;
227
228 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
229 let this = self.get_mut();
230
231 if this.stream.is_none() {
232 this.stream = Some(UseStream::new_lazy_with_opts(
233 this.connection.clone(),
234 this.store.clone(),
235 this.view_path.clone(),
236 this.view_path.clone(),
237 this.key_filter.clone(),
238 None,
239 this.take,
240 this.skip,
241 this.with_snapshot,
242 this.after.clone(),
243 this.snapshot_limit,
244 ));
245 }
246
247 Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
248 }
249}
250
251pub struct WatchBuilder<T>
253where
254 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
255{
256 connection: ConnectionManager,
257 store: SharedStore,
258 view_path: String,
259 key_filter: KeyFilter,
260 take: Option<u32>,
261 skip: Option<u32>,
262 filters: Option<HashMap<String, String>>,
263 with_snapshot: Option<bool>,
264 after: Option<String>,
265 snapshot_limit: Option<usize>,
266 stream: Option<EntityStream<T>>,
267}
268
269impl<T> WatchBuilder<T>
270where
271 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
272{
273 fn new(
274 connection: ConnectionManager,
275 store: SharedStore,
276 view_path: String,
277 key_filter: KeyFilter,
278 ) -> Self {
279 Self {
280 connection,
281 store,
282 view_path,
283 key_filter,
284 take: None,
285 skip: None,
286 filters: None,
287 with_snapshot: None,
288 after: None,
289 snapshot_limit: None,
290 stream: None,
291 }
292 }
293
294 pub fn take(mut self, n: u32) -> Self {
296 self.take = Some(n);
297 self
298 }
299
300 pub fn skip(mut self, n: u32) -> Self {
302 self.skip = Some(n);
303 self
304 }
305
306 pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
308 self.filters
309 .get_or_insert_with(HashMap::new)
310 .insert(key.into(), value.into());
311 self
312 }
313
314 pub fn with_snapshot(mut self, with_snapshot: bool) -> Self {
316 self.with_snapshot = Some(with_snapshot);
317 self
318 }
319
320 pub fn after(mut self, cursor: impl Into<String>) -> Self {
322 self.after = Some(cursor.into());
323 self
324 }
325
326 pub fn with_snapshot_limit(mut self, limit: usize) -> Self {
328 self.snapshot_limit = Some(limit);
329 self
330 }
331
332 pub fn rich(self) -> RichEntityStream<T> {
334 RichEntityStream::new_lazy_with_opts(
335 self.connection,
336 self.store,
337 self.view_path.clone(),
338 self.view_path,
339 self.key_filter,
340 None,
341 self.take,
342 self.skip,
343 self.with_snapshot,
344 self.after,
345 self.snapshot_limit,
346 )
347 }
348}
349
350impl<T> Stream for WatchBuilder<T>
351where
352 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
353{
354 type Item = Update<T>;
355
356 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
357 let this = self.get_mut();
358
359 if this.stream.is_none() {
360 this.stream = Some(EntityStream::new_lazy_with_opts(
361 this.connection.clone(),
362 this.store.clone(),
363 this.view_path.clone(),
364 this.view_path.clone(),
365 this.key_filter.clone(),
366 None,
367 this.take,
368 this.skip,
369 this.with_snapshot,
370 this.after.clone(),
371 this.snapshot_limit,
372 ));
373 }
374
375 Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
376 }
377}
378
379pub struct RichWatchBuilder<T>
381where
382 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
383{
384 connection: ConnectionManager,
385 store: SharedStore,
386 view_path: String,
387 key_filter: KeyFilter,
388 take: Option<u32>,
389 skip: Option<u32>,
390 filters: Option<HashMap<String, String>>,
391 with_snapshot: Option<bool>,
392 after: Option<String>,
393 snapshot_limit: Option<usize>,
394 stream: Option<RichEntityStream<T>>,
395}
396
397impl<T> RichWatchBuilder<T>
398where
399 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
400{
401 fn new(
402 connection: ConnectionManager,
403 store: SharedStore,
404 view_path: String,
405 key_filter: KeyFilter,
406 ) -> Self {
407 Self {
408 connection,
409 store,
410 view_path,
411 key_filter,
412 take: None,
413 skip: None,
414 filters: None,
415 with_snapshot: None,
416 after: None,
417 snapshot_limit: None,
418 stream: None,
419 }
420 }
421
422 pub fn take(mut self, n: u32) -> Self {
423 self.take = Some(n);
424 self
425 }
426
427 pub fn skip(mut self, n: u32) -> Self {
428 self.skip = Some(n);
429 self
430 }
431
432 pub fn filter(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
433 self.filters
434 .get_or_insert_with(HashMap::new)
435 .insert(key.into(), value.into());
436 self
437 }
438
439 pub fn with_snapshot(mut self, with_snapshot: bool) -> Self {
441 self.with_snapshot = Some(with_snapshot);
442 self
443 }
444
445 pub fn after(mut self, cursor: impl Into<String>) -> Self {
447 self.after = Some(cursor.into());
448 self
449 }
450
451 pub fn with_snapshot_limit(mut self, limit: usize) -> Self {
453 self.snapshot_limit = Some(limit);
454 self
455 }
456}
457
458impl<T> Stream for RichWatchBuilder<T>
459where
460 T: Serialize + DeserializeOwned + Clone + Send + Sync + Unpin + 'static,
461{
462 type Item = crate::stream::RichUpdate<T>;
463
464 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
465 let this = self.get_mut();
466
467 if this.stream.is_none() {
468 this.stream = Some(RichEntityStream::new_lazy_with_opts(
469 this.connection.clone(),
470 this.store.clone(),
471 this.view_path.clone(),
472 this.view_path.clone(),
473 this.key_filter.clone(),
474 None,
475 this.take,
476 this.skip,
477 this.with_snapshot,
478 this.after.clone(),
479 this.snapshot_limit,
480 ));
481 }
482
483 Pin::new(this.stream.as_mut().unwrap()).poll_next(cx)
484 }
485}
486
487#[derive(Clone)]
491pub struct ViewBuilder {
492 connection: ConnectionManager,
493 store: SharedStore,
494 initial_data_timeout: Duration,
495}
496
497impl ViewBuilder {
498 pub fn new(
499 connection: ConnectionManager,
500 store: SharedStore,
501 initial_data_timeout: Duration,
502 ) -> Self {
503 Self {
504 connection,
505 store,
506 initial_data_timeout,
507 }
508 }
509
510 pub fn connection(&self) -> &ConnectionManager {
511 &self.connection
512 }
513
514 pub fn store(&self) -> &SharedStore {
515 &self.store
516 }
517
518 pub fn initial_data_timeout(&self) -> Duration {
519 self.initial_data_timeout
520 }
521
522 pub fn view<T>(&self, view_path: &str) -> ViewHandle<T>
524 where
525 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
526 {
527 ViewHandle {
528 connection: self.connection.clone(),
529 store: self.store.clone(),
530 view_path: view_path.to_string(),
531 initial_data_timeout: self.initial_data_timeout,
532 _marker: PhantomData,
533 }
534 }
535}
536
537pub trait Views: Sized + Send + Sync + 'static {
539 fn from_builder(builder: ViewBuilder) -> Self;
540}
541
542pub struct StateView<T> {
544 connection: ConnectionManager,
545 store: SharedStore,
546 view_path: String,
547 initial_data_timeout: Duration,
548 _marker: PhantomData<T>,
549}
550
551impl<T> StateView<T>
552where
553 T: Serialize + DeserializeOwned + Clone + Send + Sync + 'static,
554{
555 pub fn new(
556 connection: ConnectionManager,
557 store: SharedStore,
558 view_path: String,
559 initial_data_timeout: Duration,
560 ) -> Self {
561 Self {
562 connection,
563 store,
564 view_path,
565 initial_data_timeout,
566 _marker: PhantomData,
567 }
568 }
569
570 pub async fn get(&self, key: &str) -> Option<T> {
572 self.connection
573 .ensure_subscription(&self.view_path, Some(key))
574 .await;
575 self.store
576 .wait_for_view_ready(&self.view_path, self.initial_data_timeout)
577 .await;
578 self.store.get::<T>(&self.view_path, key).await
579 }
580
581 pub fn get_sync(&self, key: &str) -> Option<T> {
583 self.store.get_sync::<T>(&self.view_path, key)
584 }
585
586 pub fn listen(&self, key: &str) -> UseStream<T>
588 where
589 T: Unpin,
590 {
591 UseStream::new_lazy(
592 self.connection.clone(),
593 self.store.clone(),
594 self.view_path.clone(),
595 self.view_path.clone(),
596 KeyFilter::Single(key.to_string()),
597 Some(key.to_string()),
598 )
599 }
600
601 pub fn watch(&self, key: &str) -> EntityStream<T> {
603 EntityStream::new_lazy(
604 self.connection.clone(),
605 self.store.clone(),
606 self.view_path.clone(),
607 self.view_path.clone(),
608 KeyFilter::Single(key.to_string()),
609 Some(key.to_string()),
610 )
611 }
612
613 pub fn watch_rich(&self, key: &str) -> RichEntityStream<T> {
615 RichEntityStream::new_lazy(
616 self.connection.clone(),
617 self.store.clone(),
618 self.view_path.clone(),
619 self.view_path.clone(),
620 KeyFilter::Single(key.to_string()),
621 Some(key.to_string()),
622 )
623 }
624}