fetsig/browser/
collection.rs

1use std::marker::PhantomData;
2
3use artwrap::spawn_local;
4use futures_signals::{
5    map_ref,
6    signal::{Mutable, Signal, SignalExt},
7    signal_vec::{
8        MutableSignalVec, MutableVec, MutableVecLockMut, MutableVecLockRef, SignalVec, SignalVecExt,
9    },
10};
11use futures_signals_ext::{MutableExt, MutableVecExt};
12use log::{debug, error, trace, warn};
13use serde::{Serialize, de::DeserializeOwned};
14
15#[cfg(feature = "json")]
16use crate::JSONSerialize;
17#[cfg(any(feature = "json", feature = "postcard"))]
18use crate::MediaType;
19#[cfg(feature = "postcard")]
20use crate::PostcardSerialize;
21use crate::{
22    CollectionResponse, HEADER_SIGNATURE, MacSign, MacVerify, Messages, NoMac, Paging, StatusCode,
23};
24
25use super::{
26    CollectionState,
27    common::{PendingFetch, execute_fetch},
28    request::Request,
29    transferstate::TransferState,
30};
31
32#[derive(Debug)]
33pub struct CollectionStore<E, MV = NoMac> {
34    transfer_state: Mutable<TransferState>,
35    messages: Messages,
36    paging: Mutable<Paging>,
37    collection: MutableVec<E>,
38    pmv: PhantomData<MV>,
39}
40
41impl<E, MV> CollectionStore<E, MV> {
42    pub fn new() -> Self {
43        Self::new_value(vec![])
44    }
45
46    pub fn new_value(collection: Vec<E>) -> Self {
47        Self {
48            transfer_state: Mutable::new(TransferState::Empty),
49            messages: Messages::new(),
50            paging: Mutable::new(Paging::default()),
51            collection: MutableVec::new_with_values(collection),
52            pmv: PhantomData,
53        }
54    }
55
56    pub fn reset(&self) {
57        self.transfer_state.set_neq(TransferState::Empty);
58        self.messages.clear_all();
59        self.paging.set(Paging::default());
60        self.collection.lock_mut().clear();
61    }
62
63    pub fn invalidate(&self) {
64        self.transfer_state.set_neq(TransferState::Empty);
65    }
66
67    pub fn transfer_state(&self) -> TransferState {
68        self.transfer_state.get()
69    }
70
71    pub fn set_transfer_state(&self, transfer_state: TransferState) {
72        self.transfer_state.set_neq(transfer_state);
73    }
74
75    pub fn reset_transfer_error(&self) {
76        self.transfer_state.lock_mut().reset_error();
77    }
78
79    pub fn loaded(&self) -> bool {
80        self.transfer_state.map(TransferState::loaded)
81    }
82
83    pub fn loaded_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
84        self.transfer_state
85            .signal_ref(TransferState::loaded)
86            .dedupe()
87    }
88
89    pub fn loaded_status(&self) -> Option<StatusCode> {
90        self.transfer_state.map(TransferState::loaded_status)
91    }
92
93    pub fn loaded_status_signal(&self) -> impl Signal<Item = Option<StatusCode>> + use<E, MV> {
94        self.transfer_state
95            .signal_ref(TransferState::loaded_status)
96            .dedupe()
97    }
98
99    pub fn stored(&self) -> bool {
100        self.transfer_state.map(TransferState::stored)
101    }
102
103    pub fn stored_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
104        self.transfer_state
105            .signal_ref(TransferState::stored)
106            .dedupe()
107    }
108
109    pub fn stored_status(&self) -> Option<StatusCode> {
110        self.transfer_state.map(TransferState::stored_status)
111    }
112
113    pub fn stored_status_signal(&self) -> impl Signal<Item = Option<StatusCode>> + use<E, MV> {
114        self.transfer_state
115            .signal_ref(TransferState::stored_status)
116            .dedupe()
117    }
118
119    pub fn pending(&self) -> bool {
120        self.transfer_state.map(TransferState::pending)
121    }
122
123    pub fn pending_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
124        self.transfer_state
125            .signal_ref(TransferState::pending)
126            .dedupe()
127    }
128
129    pub fn collection(&self) -> &MutableVec<E> {
130        &self.collection
131    }
132
133    pub fn messages(&self) -> &Messages {
134        &self.messages
135    }
136
137    pub fn paging(&self) -> &Mutable<Paging> {
138        &self.paging
139    }
140
141    pub fn is_empty(&self) -> bool {
142        self.collection.lock_ref().is_empty()
143    }
144
145    pub fn any<F>(&self, f: F) -> bool
146    where
147        F: Fn(&E) -> bool,
148    {
149        self.collection.lock_ref().iter().any(f)
150    }
151
152    pub fn all<F>(&self, f: F) -> bool
153    where
154        F: Fn(&E) -> bool,
155    {
156        self.collection.lock_ref().iter().all(f)
157    }
158
159    pub fn lock_ref(&self) -> MutableVecLockRef<E> {
160        self.collection.lock_ref()
161    }
162
163    pub fn lock_mut(&self) -> MutableVecLockMut<E> {
164        self.collection.lock_mut()
165    }
166
167    pub fn inspect<F>(&self, f: F)
168    where
169        F: FnOnce(&[E]),
170    {
171        self.collection.inspect(f)
172    }
173
174    pub fn inspect_mut<F>(&self, f: F)
175    where
176        F: FnOnce(&mut MutableVecLockMut<E>),
177    {
178        self.collection.inspect_mut(f)
179    }
180
181    pub fn find_map<F, U>(&self, f: F) -> Option<U>
182    where
183        F: Fn(&E) -> Option<U>,
184    {
185        self.collection.lock_ref().iter().find_map(f)
186    }
187
188    pub fn map_vec<F, U>(&self, f: F) -> U
189    where
190        F: FnOnce(&[E]) -> U,
191    {
192        self.collection.map_vec(f)
193    }
194
195    pub fn map_vec_mut<F, U>(&self, f: F) -> U
196    where
197        F: FnOnce(&mut MutableVecLockMut<E>) -> U,
198    {
199        self.collection.map_vec_mut(f)
200    }
201}
202
203impl<E, MV> CollectionStore<E, MV>
204where
205    E: Copy,
206{
207    pub fn empty_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
208        self.collection.signal_vec().is_empty().dedupe()
209    }
210
211    pub fn collection_state_signal(&self) -> impl Signal<Item = CollectionState> + use<E, MV> {
212        collection_state_signal(self.pending_signal(), self.empty_signal())
213    }
214
215    pub fn find<F>(&self, f: F) -> Option<E>
216    where
217        F: Fn(&E) -> bool,
218    {
219        self.find_map(|e| f(e).then_some(*e))
220    }
221
222    pub fn find_inspect_mut<P, F>(&self, predicate: P, f: F) -> Option<bool>
223    where
224        P: FnMut(&E) -> bool,
225        F: FnMut(&mut E) -> bool,
226    {
227        self.collection.find_inspect_mut(predicate, f)
228    }
229
230    pub fn find_set<P>(&self, predicate: P, item: E) -> bool
231    where
232        P: FnMut(&E) -> bool,
233    {
234        self.collection.find_set(predicate, item)
235    }
236
237    pub fn find_set_or_add<P>(&self, predicate: P, item: E)
238    where
239        P: FnMut(&E) -> bool,
240    {
241        self.collection.find_set_or_add(predicate, item);
242    }
243
244    pub fn replace(&self, values: Vec<E>) -> Vec<E> {
245        self.messages.clear_all();
246        let mut collection = self.collection.lock_mut();
247        let current = collection.drain(..).collect();
248        collection.replace(values);
249        current
250    }
251
252    pub fn remove<P>(&self, predicate: P) -> bool
253    where
254        P: FnMut(&E) -> bool,
255    {
256        self.collection.find_remove(predicate)
257    }
258
259    pub fn set_externally_loaded(&self, values: Vec<E>) {
260        self.collection.lock_mut().replace(values);
261        self.transfer_state
262            .set_neq(TransferState::Loaded(StatusCode::Ok));
263    }
264
265    pub fn signal_map<F, U>(&self, f: F) -> impl Signal<Item = U> + use<E, MV, F, U>
266    where
267        F: FnMut(&[E]) -> U,
268    {
269        self.collection.signal_vec().to_signal_map(f)
270    }
271
272    pub fn signal_vec(&self) -> MutableSignalVec<E> {
273        self.collection.signal_vec()
274    }
275
276    pub fn signal_vec_filter<F>(&self, f: F) -> impl SignalVec<Item = E> + use<E, MV, F>
277    where
278        F: FnMut(&E) -> bool,
279    {
280        self.collection.signal_vec_filter(f)
281    }
282
283    pub fn signal_vec_filter_signal<F, U>(
284        &self,
285        f: F,
286    ) -> impl SignalVec<Item = E> + use<E, MV, F, U>
287    where
288        F: FnMut(&E) -> U,
289        U: Signal<Item = bool>,
290    {
291        self.collection.signal_vec_filter_signal(f)
292    }
293
294    pub fn signal_vec_map<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
295    where
296        F: FnMut(E) -> U,
297    {
298        self.collection.signal_vec().map(f)
299    }
300
301    pub fn signal_vec_map_signal<F, U>(
302        &self,
303        f: F,
304    ) -> impl SignalVec<Item = U::Item> + use<E, MV, F, U>
305    where
306        F: FnMut(E) -> U,
307        U: Signal,
308    {
309        self.collection.signal_vec().map_signal(f)
310    }
311
312    pub fn signal_vec_filter_map<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
313    where
314        F: FnMut(E) -> Option<U>,
315    {
316        self.collection.signal_vec().filter_map(f)
317    }
318}
319
320impl<E, MV> CollectionStore<E, MV>
321where
322    E: Clone,
323{
324    pub fn empty_signal_cloned(&self) -> impl Signal<Item = bool> + use<E, MV> {
325        self.collection.signal_vec_cloned().is_empty().dedupe()
326    }
327
328    pub fn collection_state_signal_cloned(
329        &self,
330    ) -> impl Signal<Item = CollectionState> + use<E, MV> {
331        collection_state_signal(self.pending_signal(), self.empty_signal_cloned())
332    }
333
334    pub fn find_cloned<F>(&self, f: F) -> Option<E>
335    where
336        F: Fn(&E) -> bool,
337    {
338        self.find_map(|e| f(e).then(|| e.clone()))
339    }
340
341    pub fn find_inspect_mut_cloned<P, F>(&self, predicate: P, f: F) -> Option<bool>
342    where
343        P: FnMut(&E) -> bool,
344        F: FnMut(&mut E) -> bool,
345    {
346        self.collection.find_inspect_mut_cloned(predicate, f)
347    }
348
349    pub fn get_cloned(&self) -> Vec<E> {
350        self.collection.lock_ref().to_vec()
351    }
352
353    pub fn find_set_cloned<P>(&self, predicate: P, item: E) -> bool
354    where
355        P: FnMut(&E) -> bool,
356    {
357        self.collection.find_set_cloned(predicate, item)
358    }
359
360    pub fn find_set_or_add_cloned<P>(&self, predicate: P, item: E)
361    where
362        P: FnMut(&E) -> bool,
363    {
364        self.collection.find_set_or_add_cloned(predicate, item);
365    }
366
367    pub fn replace_cloned(&self, values: Vec<E>) -> Vec<E> {
368        self.messages.clear_all();
369        let mut collection = self.collection.lock_mut();
370        let current = collection.drain(..).collect();
371        collection.replace_cloned(values);
372        current
373    }
374
375    pub fn remove_cloned<P>(&self, predicate: P) -> bool
376    where
377        P: FnMut(&E) -> bool,
378    {
379        self.collection.find_remove_cloned(predicate)
380    }
381
382    pub fn set_externally_loaded_cloned(&self, values: Vec<E>) {
383        self.collection.lock_mut().replace_cloned(values);
384        self.transfer_state
385            .set_neq(TransferState::Loaded(StatusCode::Ok));
386    }
387
388    pub fn signal_map_cloned<F, U>(&self, f: F) -> impl Signal<Item = U> + use<E, MV, F, U>
389    where
390        F: FnMut(&[E]) -> U,
391    {
392        self.collection.signal_vec_cloned().to_signal_map(f)
393    }
394
395    pub fn signal_vec_cloned(&self) -> MutableSignalVec<E> {
396        self.collection.signal_vec_cloned()
397    }
398
399    pub fn signal_vec_filter_cloned<F>(&self, f: F) -> impl SignalVec<Item = E> + use<E, MV, F>
400    where
401        F: FnMut(&E) -> bool,
402    {
403        self.collection.signal_vec_filter_cloned(f)
404    }
405
406    pub fn signal_vec_filter_signal_cloned<F, U>(
407        &self,
408        f: F,
409    ) -> impl SignalVec<Item = E> + use<E, MV, F, U>
410    where
411        F: FnMut(&E) -> U,
412        U: Signal<Item = bool>,
413    {
414        self.collection.signal_vec_filter_signal_cloned(f)
415    }
416
417    pub fn signal_vec_map_cloned<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
418    where
419        F: FnMut(E) -> U,
420    {
421        self.collection.signal_vec_cloned().map(f)
422    }
423
424    pub fn signal_vec_map_signal_cloned<F, U>(
425        &self,
426        f: F,
427    ) -> impl SignalVec<Item = U::Item> + use<E, MV, F, U>
428    where
429        F: FnMut(E) -> U,
430        U: Signal,
431    {
432        self.collection.signal_vec_cloned().map_signal(f)
433    }
434
435    pub fn signal_vec_filter_map_cloned<F, U>(
436        &self,
437        f: F,
438    ) -> impl SignalVec<Item = U> + use<E, MV, F, U>
439    where
440        F: FnMut(E) -> Option<U>,
441    {
442        self.collection.signal_vec_cloned().filter_map(f)
443    }
444}
445
446impl<E, MV> CollectionStore<E, MV>
447where
448    E: Clone,
449    MV: MacVerify,
450{
451    pub fn load<C>(&self, request: Request<'_>, result_callback: C)
452    where
453        E: DeserializeOwned + 'static,
454        C: FnOnce(StatusCode) + 'static,
455    {
456        if self.transfer_state.map(TransferState::loaded) {
457            if request.logging() {
458                debug!("Request to load {} skipped, using cache", request.url());
459
460                if !request.method().is_load() {
461                    warn!(
462                        "Load request unexpectedly uses store verb {:?}",
463                        request.method().as_str()
464                    );
465                }
466            }
467        } else {
468            self.load_skip_cache(request, result_callback);
469        }
470    }
471
472    pub fn load_skip_cache<C>(&self, request: Request<'_>, result_callback: C)
473    where
474        E: DeserializeOwned + 'static,
475        C: FnOnce(StatusCode) + 'static,
476    {
477        if request.logging() {
478            debug!("Request to load {}", request.url());
479
480            if !request.method().is_load() {
481                warn!(
482                    "Load request unexpectedly uses store verb {:?}",
483                    request.method().as_str()
484                );
485            }
486        }
487
488        let collection = self.collection.clone();
489        fetch::<_, _, _, MV>(
490            request.with_is_load(true),
491            self.transfer_state.clone(),
492            self.messages.clone(),
493            self.paging.clone(),
494            move |new| {
495                collection.lock_mut().replace_cloned(new);
496            },
497            result_callback,
498        );
499    }
500
501    pub fn load_merge<F, C>(&self, request: Request<'_>, merge_fn: F, result_callback: C)
502    where
503        E: DeserializeOwned + 'static,
504        F: FnMut(Vec<E>) + 'static,
505        C: FnOnce(StatusCode) + 'static,
506    {
507        if request.logging() {
508            debug!("Request to load/merge {}", request.url());
509
510            if !request.method().is_load() {
511                warn!(
512                    "Load/merge request unexpectedly uses store verb {:?}",
513                    request.method().as_str()
514                );
515            }
516        }
517        fetch::<_, _, _, MV>(
518            request.with_is_load(true),
519            self.transfer_state.clone(),
520            self.messages.clone(),
521            self.paging.clone(),
522            merge_fn,
523            result_callback,
524        );
525    }
526
527    pub fn store<MS, C>(&self, request: Request<'_>, result_callback: C)
528    where
529        E: Serialize + DeserializeOwned + 'static,
530        MS: MacSign,
531        C: FnOnce(StatusCode) + 'static,
532    {
533        let mut request = request.with_is_load(false);
534        if request.logging() {
535            debug!("Request to update {}", request.url());
536
537            if request.method().is_load() {
538                warn!(
539                    "Store request unexpectedly uses load verb {:?}",
540                    request.method().as_str()
541                );
542            }
543        }
544
545        {
546            // scope around vector and collection borrow
547            let collection = self.lock_ref();
548            if !collection.is_empty() {
549                let media_type = match request.media_type() {
550                    #[cfg(feature = "json")]
551                    Some(media_type @ MediaType::Json) => media_type,
552                    #[cfg(feature = "postcard")]
553                    Some(media_type @ MediaType::Postcard) => media_type,
554                    _ => {
555                        if request.logging() {
556                            warn!("Request failed as unsupported media type is requested");
557                        }
558                        self.messages.replace(Messages::from_service_error(
559                            "Request failed as unsupported media type is requested",
560                        ));
561                        self.transfer_state
562                            .lock_mut()
563                            .stop(StatusCode::UnsupportedMediaType);
564                        return;
565                    }
566                };
567
568                let content = collection.to_vec();
569                let bytes = match media_type {
570                    #[cfg(feature = "json")]
571                    MediaType::Json => content.to_json(),
572                    #[cfg(feature = "postcard")]
573                    MediaType::Postcard => content.to_postcard(),
574                    _ => {
575                        if request.logging() {
576                            error!("Unsupported media type requested, unexpected code flow");
577                        }
578                        return;
579                    }
580                };
581
582                if let Some(signature) = MS::sign(bytes.as_ref()) {
583                    request = request.with_header(HEADER_SIGNATURE, signature);
584                }
585
586                request = request.with_body(bytes);
587            }
588        }
589
590        let collection = self.collection.clone();
591        fetch::<_, _, _, MV>(
592            request,
593            self.transfer_state.clone(),
594            self.messages.clone(),
595            self.paging.clone(),
596            move |new| collection.lock_mut().replace_cloned(new),
597            result_callback,
598        );
599    }
600}
601
602fn fetch<E, F, C, MV>(
603    request: Request<'_>,
604    transfer_state: Mutable<TransferState>,
605    messages: Messages,
606    paging: Mutable<Paging>,
607    store_fn: F,
608    result_callback: C,
609) where
610    E: Clone + DeserializeOwned + 'static,
611    F: FnMut(Vec<E>) + 'static,
612    C: FnOnce(StatusCode) + 'static,
613    MV: MacVerify,
614{
615    let logging = request.logging();
616
617    let pending_fetch = match request.start() {
618        Ok(future) => future,
619        Err(error) => {
620            if logging {
621                debug!("Request failed at init, error: {}", error);
622            }
623            result_callback(StatusCode::BadRequest);
624            transfer_state.lock_mut().stop(StatusCode::FetchFailed);
625            return;
626        }
627    };
628    if request.is_load() {
629        transfer_state.lock_mut().start_load();
630    } else {
631        transfer_state.lock_mut().start_store();
632    }
633
634    let context = CollectionFetchContext::<F> {
635        logging,
636        messages,
637        paging,
638        store_fn,
639    };
640
641    spawn_local(async move {
642        let status = execute_collection_fetch::<_, _, MV>(pending_fetch, context).await;
643        result_callback(status);
644        transfer_state.lock_mut().stop(status);
645    });
646}
647
648async fn execute_collection_fetch<E, F, MV>(
649    pending_fetch: PendingFetch,
650    CollectionFetchContext {
651        logging,
652        messages,
653        paging,
654        mut store_fn,
655    }: CollectionFetchContext<F>,
656) -> StatusCode
657where
658    E: Clone + DeserializeOwned,
659    F: FnMut(Vec<E>) + 'static,
660    MV: MacVerify,
661{
662    let mut result = execute_fetch::<CollectionResponse<E>, MV>(pending_fetch).await;
663    match (result.status(), result.take_response()) {
664        (status @ StatusCode::FetchTimeout, _) => {
665            if logging {
666                // TODO: should this warning go also to Messages???
667                debug!(
668                    "Timeout accessing {}.",
669                    result.hint().unwrap_or("?unknown url")
670                );
671            }
672            status
673        }
674        (status @ StatusCode::FetchFailed, _) => {
675            if logging {
676                // TODO: should this warning go also to Messages???
677                debug!(
678                    "Request failed in execution, error: {}",
679                    result.hint().unwrap_or("?unknown")
680                );
681            }
682            status
683        }
684        (status @ StatusCode::DecodeFailed, _) => {
685            if logging {
686                // TODO: should this warning go also to Messages???
687                warn!(
688                    "Response decoding failed, error: {}",
689                    result.hint().unwrap_or("?unknown")
690                );
691            }
692            status
693        }
694        (status, None) => status,
695        (status, Some(response)) => {
696            let (response_entities, response_messages, response_paging) = response.take();
697            messages.replace(response_messages);
698            if status.is_success() {
699                if let Some(response_entities) = response_entities {
700                    if logging {
701                        trace!("Request successfully fetched collection.");
702                    }
703                    store_fn(response_entities);
704                }
705            }
706            *paging.lock_mut() = response_paging;
707            status
708        }
709    }
710}
711
712impl<E, MV> Default for CollectionStore<E, MV> {
713    fn default() -> Self {
714        Self::new()
715    }
716}
717
718struct CollectionFetchContext<F> {
719    logging: bool,
720    messages: Messages,
721    paging: Mutable<Paging>,
722    store_fn: F,
723}
724
725pub fn collection_state_signal<P, E>(pending: P, empty: E) -> impl Signal<Item = CollectionState>
726where
727    P: Signal<Item = bool>,
728    E: Signal<Item = bool>,
729{
730    map_ref! {
731        pending, empty => {
732            match (pending, empty) {
733                (true, _) => CollectionState::Pending,
734                (false, true) => CollectionState::Empty,
735                (false, false) => CollectionState::NotEmpty,
736            }
737        }
738    }
739    .dedupe()
740}