fetsig/browser/
collection.rs

1use std::marker::PhantomData;
2
3use artwrap::spawn_local;
4use futures_signals::{
5    map_ref,
6    signal::{Mutable, Signal, SignalExt},
7    signal_vec::{
8        MutableSignalVec, MutableVec, MutableVecLockMut, MutableVecLockRef, SignalVec, SignalVecExt,
9    },
10};
11use futures_signals_ext::{MutableExt, MutableVecExt};
12use log::{debug, error, trace, warn};
13use serde::{Serialize, de::DeserializeOwned};
14
15#[cfg(feature = "json")]
16use crate::JSONSerialize;
17#[cfg(any(feature = "json", feature = "postcard"))]
18use crate::MediaType;
19#[cfg(feature = "postcard")]
20use crate::PostcardSerialize;
21use crate::{
22    CollectionResponse, HEADER_SIGNATURE, MacSign, MacVerify, Messages, NoMac, Paging, StatusCode,
23};
24
25use super::{
26    CollectionState,
27    common::{PendingFetch, execute_fetch},
28    request::Request,
29    transferstate::TransferState,
30};
31
/// Reactive store holding a collection of entities `E` together with the state of the
/// request that loads or stores them.
///
/// `MV` is only carried as a type marker; the `load*`/`store` methods require it to implement
/// `MacVerify` and forward it to the fetch routine. It defaults to `NoMac`.
pub struct CollectionStore<E, MV = NoMac> {
    /// Current state of the load/store transfer.
    transfer_state: Mutable<TransferState>,
    /// Messages associated with the store; replaced by the messages of each decoded response.
    messages: Messages,
    /// Paging information; overwritten with the paging of each decoded response.
    paging: Mutable<Paging>,
    /// The entities themselves, observable through signal vectors.
    collection: MutableVec<E>,
    /// Zero-sized marker tying the store to its `MV` type parameter.
    pmv: PhantomData<MV>,
}
39
40impl<E, MV> CollectionStore<E, MV> {
41    pub fn new() -> Self {
42        Self::new_value(vec![])
43    }
44
45    pub fn new_value(collection: Vec<E>) -> Self {
46        Self {
47            transfer_state: Mutable::new(TransferState::Empty),
48            messages: Messages::new(),
49            paging: Mutable::new(Paging::default()),
50            collection: MutableVec::new_with_values(collection),
51            pmv: PhantomData,
52        }
53    }
54
55    pub fn reset(&self) {
56        self.transfer_state.set_neq(TransferState::Empty);
57        self.messages.clear_all();
58        self.paging.set(Paging::default());
59        self.collection.lock_mut().clear();
60    }
61
62    pub fn invalidate(&self) {
63        self.transfer_state.set_neq(TransferState::Empty);
64    }
65
66    pub fn transfer_state(&self) -> TransferState {
67        self.transfer_state.get()
68    }
69
70    pub fn set_transfer_state(&self, transfer_state: TransferState) {
71        self.transfer_state.set_neq(transfer_state);
72    }
73
74    pub fn reset_transfer_error(&self) {
75        self.transfer_state.lock_mut().reset_error();
76    }
77
78    pub fn loaded(&self) -> bool {
79        self.transfer_state.map(TransferState::loaded)
80    }
81
82    pub fn loaded_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
83        self.transfer_state
84            .signal_ref(TransferState::loaded)
85            .dedupe()
86    }
87
88    pub fn loaded_status(&self) -> Option<StatusCode> {
89        self.transfer_state.map(TransferState::loaded_status)
90    }
91
92    pub fn loaded_status_signal(&self) -> impl Signal<Item = Option<StatusCode>> + use<E, MV> {
93        self.transfer_state
94            .signal_ref(TransferState::loaded_status)
95            .dedupe()
96    }
97
98    pub fn stored(&self) -> bool {
99        self.transfer_state.map(TransferState::stored)
100    }
101
102    pub fn stored_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
103        self.transfer_state
104            .signal_ref(TransferState::stored)
105            .dedupe()
106    }
107
108    pub fn stored_status(&self) -> Option<StatusCode> {
109        self.transfer_state.map(TransferState::stored_status)
110    }
111
112    pub fn stored_status_signal(&self) -> impl Signal<Item = Option<StatusCode>> + use<E, MV> {
113        self.transfer_state
114            .signal_ref(TransferState::stored_status)
115            .dedupe()
116    }
117
118    pub fn pending(&self) -> bool {
119        self.transfer_state.map(TransferState::pending)
120    }
121
122    pub fn pending_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
123        self.transfer_state
124            .signal_ref(TransferState::pending)
125            .dedupe()
126    }
127
128    pub fn collection(&self) -> &MutableVec<E> {
129        &self.collection
130    }
131
132    pub fn messages(&self) -> &Messages {
133        &self.messages
134    }
135
136    pub fn paging(&self) -> &Mutable<Paging> {
137        &self.paging
138    }
139
140    pub fn is_empty(&self) -> bool {
141        self.collection.lock_ref().is_empty()
142    }
143
144    pub fn any<F>(&self, f: F) -> bool
145    where
146        F: Fn(&E) -> bool,
147    {
148        self.collection.lock_ref().iter().any(f)
149    }
150
151    pub fn all<F>(&self, f: F) -> bool
152    where
153        F: Fn(&E) -> bool,
154    {
155        self.collection.lock_ref().iter().all(f)
156    }
157
158    pub fn lock_ref(&self) -> MutableVecLockRef<'_, E> {
159        self.collection.lock_ref()
160    }
161
162    pub fn lock_mut(&self) -> MutableVecLockMut<'_, E> {
163        self.collection.lock_mut()
164    }
165
166    pub fn inspect<F>(&self, f: F)
167    where
168        F: FnOnce(&[E]),
169    {
170        self.collection.inspect(f)
171    }
172
173    pub fn inspect_mut<F>(&self, f: F)
174    where
175        F: FnOnce(&mut MutableVecLockMut<E>),
176    {
177        self.collection.inspect_mut(f)
178    }
179
180    pub fn find_map<F, U>(&self, f: F) -> Option<U>
181    where
182        F: Fn(&E) -> Option<U>,
183    {
184        self.collection.lock_ref().iter().find_map(f)
185    }
186
187    pub fn map_vec<F, U>(&self, f: F) -> U
188    where
189        F: FnOnce(&[E]) -> U,
190    {
191        self.collection.map_vec(f)
192    }
193
194    pub fn map_vec_mut<F, U>(&self, f: F) -> U
195    where
196        F: FnOnce(&mut MutableVecLockMut<E>) -> U,
197    {
198        self.collection.map_vec_mut(f)
199    }
200}
201
202impl<E, MV> CollectionStore<E, MV>
203where
204    E: Copy,
205{
206    pub fn empty_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
207        self.collection.signal_vec().is_empty().dedupe()
208    }
209
210    pub fn collection_state_signal(&self) -> impl Signal<Item = CollectionState> + use<E, MV> {
211        collection_state_signal(self.pending_signal(), self.empty_signal())
212    }
213
214    pub fn find<F>(&self, f: F) -> Option<E>
215    where
216        F: Fn(&E) -> bool,
217    {
218        self.find_map(|e| f(e).then_some(*e))
219    }
220
221    pub fn find_inspect_mut<P, F>(&self, predicate: P, f: F) -> Option<bool>
222    where
223        P: FnMut(&E) -> bool,
224        F: FnMut(&mut E) -> bool,
225    {
226        self.collection.find_inspect_mut(predicate, f)
227    }
228
229    pub fn find_set<P>(&self, predicate: P, item: E) -> bool
230    where
231        P: FnMut(&E) -> bool,
232    {
233        self.collection.find_set(predicate, item)
234    }
235
236    pub fn find_set_or_add<P>(&self, predicate: P, item: E)
237    where
238        P: FnMut(&E) -> bool,
239    {
240        self.collection.find_set_or_add(predicate, item);
241    }
242
243    pub fn replace(&self, values: Vec<E>) -> Vec<E> {
244        self.messages.clear_all();
245        let mut collection = self.collection.lock_mut();
246        let current = collection.drain(..).collect();
247        collection.replace(values);
248        current
249    }
250
251    pub fn remove<P>(&self, predicate: P) -> bool
252    where
253        P: FnMut(&E) -> bool,
254    {
255        self.collection.find_remove(predicate)
256    }
257
258    pub fn set_externally_loaded(&self, values: Vec<E>) {
259        self.collection.lock_mut().replace(values);
260        self.transfer_state
261            .set_neq(TransferState::Loaded(StatusCode::Ok));
262    }
263
264    pub fn signal_map<F, U>(&self, f: F) -> impl Signal<Item = U> + use<E, MV, F, U>
265    where
266        F: FnMut(&[E]) -> U,
267    {
268        self.collection.signal_vec().to_signal_map(f)
269    }
270
271    pub fn signal_vec(&self) -> MutableSignalVec<E> {
272        self.collection.signal_vec()
273    }
274
275    pub fn signal_vec_filter<F>(&self, f: F) -> impl SignalVec<Item = E> + use<E, MV, F>
276    where
277        F: FnMut(&E) -> bool,
278    {
279        self.collection.signal_vec_filter(f)
280    }
281
282    pub fn signal_vec_filter_signal<F, U>(
283        &self,
284        f: F,
285    ) -> impl SignalVec<Item = E> + use<E, MV, F, U>
286    where
287        F: FnMut(&E) -> U,
288        U: Signal<Item = bool>,
289    {
290        self.collection.signal_vec_filter_signal(f)
291    }
292
293    pub fn signal_vec_map<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
294    where
295        F: FnMut(E) -> U,
296    {
297        self.collection.signal_vec().map(f)
298    }
299
300    pub fn signal_vec_map_signal<F, U>(
301        &self,
302        f: F,
303    ) -> impl SignalVec<Item = U::Item> + use<E, MV, F, U>
304    where
305        F: FnMut(E) -> U,
306        U: Signal,
307    {
308        self.collection.signal_vec().map_signal(f)
309    }
310
311    pub fn signal_vec_filter_map<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
312    where
313        F: FnMut(E) -> Option<U>,
314    {
315        self.collection.signal_vec().filter_map(f)
316    }
317}
318
319impl<E, MV> CollectionStore<E, MV>
320where
321    E: Clone,
322{
323    pub fn empty_signal_cloned(&self) -> impl Signal<Item = bool> + use<E, MV> {
324        self.collection.signal_vec_cloned().is_empty().dedupe()
325    }
326
327    pub fn collection_state_signal_cloned(
328        &self,
329    ) -> impl Signal<Item = CollectionState> + use<E, MV> {
330        collection_state_signal(self.pending_signal(), self.empty_signal_cloned())
331    }
332
333    pub fn find_cloned<F>(&self, f: F) -> Option<E>
334    where
335        F: Fn(&E) -> bool,
336    {
337        self.find_map(|e| f(e).then(|| e.clone()))
338    }
339
340    pub fn find_inspect_mut_cloned<P, F>(&self, predicate: P, f: F) -> Option<bool>
341    where
342        P: FnMut(&E) -> bool,
343        F: FnMut(&mut E) -> bool,
344    {
345        self.collection.find_inspect_mut_cloned(predicate, f)
346    }
347
348    pub fn get_cloned(&self) -> Vec<E> {
349        self.collection.lock_ref().to_vec()
350    }
351
352    pub fn find_set_cloned<P>(&self, predicate: P, item: E) -> bool
353    where
354        P: FnMut(&E) -> bool,
355    {
356        self.collection.find_set_cloned(predicate, item)
357    }
358
359    pub fn find_set_or_add_cloned<P>(&self, predicate: P, item: E)
360    where
361        P: FnMut(&E) -> bool,
362    {
363        self.collection.find_set_or_add_cloned(predicate, item);
364    }
365
366    pub fn replace_cloned(&self, values: Vec<E>) -> Vec<E> {
367        self.messages.clear_all();
368        let mut collection = self.collection.lock_mut();
369        let current = collection.drain(..).collect();
370        collection.replace_cloned(values);
371        current
372    }
373
374    pub fn remove_cloned<P>(&self, predicate: P) -> bool
375    where
376        P: FnMut(&E) -> bool,
377    {
378        self.collection.find_remove_cloned(predicate)
379    }
380
381    pub fn set_externally_loaded_cloned(&self, values: Vec<E>) {
382        self.collection.lock_mut().replace_cloned(values);
383        self.transfer_state
384            .set_neq(TransferState::Loaded(StatusCode::Ok));
385    }
386
387    pub fn signal_map_cloned<F, U>(&self, f: F) -> impl Signal<Item = U> + use<E, MV, F, U>
388    where
389        F: FnMut(&[E]) -> U,
390    {
391        self.collection.signal_vec_cloned().to_signal_map(f)
392    }
393
394    pub fn signal_vec_cloned(&self) -> MutableSignalVec<E> {
395        self.collection.signal_vec_cloned()
396    }
397
398    pub fn signal_vec_filter_cloned<F>(&self, f: F) -> impl SignalVec<Item = E> + use<E, MV, F>
399    where
400        F: FnMut(&E) -> bool,
401    {
402        self.collection.signal_vec_filter_cloned(f)
403    }
404
405    pub fn signal_vec_filter_signal_cloned<F, U>(
406        &self,
407        f: F,
408    ) -> impl SignalVec<Item = E> + use<E, MV, F, U>
409    where
410        F: FnMut(&E) -> U,
411        U: Signal<Item = bool>,
412    {
413        self.collection.signal_vec_filter_signal_cloned(f)
414    }
415
416    pub fn signal_vec_map_cloned<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
417    where
418        F: FnMut(E) -> U,
419    {
420        self.collection.signal_vec_cloned().map(f)
421    }
422
423    pub fn signal_vec_map_signal_cloned<F, U>(
424        &self,
425        f: F,
426    ) -> impl SignalVec<Item = U::Item> + use<E, MV, F, U>
427    where
428        F: FnMut(E) -> U,
429        U: Signal,
430    {
431        self.collection.signal_vec_cloned().map_signal(f)
432    }
433
434    pub fn signal_vec_filter_map_cloned<F, U>(
435        &self,
436        f: F,
437    ) -> impl SignalVec<Item = U> + use<E, MV, F, U>
438    where
439        F: FnMut(E) -> Option<U>,
440    {
441        self.collection.signal_vec_cloned().filter_map(f)
442    }
443}
444
445impl<E, MV> CollectionStore<E, MV>
446where
447    E: Clone,
448    MV: MacVerify,
449{
450    pub fn load<C>(&self, request: Request<'_>, result_callback: C)
451    where
452        E: DeserializeOwned + 'static,
453        C: FnOnce(StatusCode) + 'static,
454    {
455        if self.transfer_state.map(TransferState::loaded) {
456            if request.logging() {
457                debug!("Request to load {} skipped, using cache", request.url());
458
459                if !request.method().is_load() {
460                    warn!(
461                        "Load request unexpectedly uses store verb {:?}",
462                        request.method().as_str()
463                    );
464                }
465            }
466        } else {
467            self.load_skip_cache(request, result_callback);
468        }
469    }
470
471    pub fn load_skip_cache<C>(&self, request: Request<'_>, result_callback: C)
472    where
473        E: DeserializeOwned + 'static,
474        C: FnOnce(StatusCode) + 'static,
475    {
476        if request.logging() {
477            debug!("Request to load {}", request.url());
478
479            if !request.method().is_load() {
480                warn!(
481                    "Load request unexpectedly uses store verb {:?}",
482                    request.method().as_str()
483                );
484            }
485        }
486
487        let collection = self.collection.clone();
488        fetch::<_, _, _, MV>(
489            request.with_is_load(true),
490            self.transfer_state.clone(),
491            self.messages.clone(),
492            self.paging.clone(),
493            move |new| {
494                collection.lock_mut().replace_cloned(new);
495            },
496            result_callback,
497        );
498    }
499
500    pub fn load_merge<F, C>(&self, request: Request<'_>, merge_fn: F, result_callback: C)
501    where
502        E: DeserializeOwned + 'static,
503        F: FnMut(Vec<E>) + 'static,
504        C: FnOnce(StatusCode) + 'static,
505    {
506        if request.logging() {
507            debug!("Request to load/merge {}", request.url());
508
509            if !request.method().is_load() {
510                warn!(
511                    "Load/merge request unexpectedly uses store verb {:?}",
512                    request.method().as_str()
513                );
514            }
515        }
516        fetch::<_, _, _, MV>(
517            request.with_is_load(true),
518            self.transfer_state.clone(),
519            self.messages.clone(),
520            self.paging.clone(),
521            merge_fn,
522            result_callback,
523        );
524    }
525
526    pub fn store<MS, C>(&self, request: Request<'_>, result_callback: C)
527    where
528        E: Serialize + DeserializeOwned + 'static,
529        MS: MacSign,
530        C: FnOnce(StatusCode) + 'static,
531    {
532        let mut request = request.with_is_load(false);
533        if request.logging() {
534            debug!("Request to update {}", request.url());
535
536            if request.method().is_load() {
537                warn!(
538                    "Store request unexpectedly uses load verb {:?}",
539                    request.method().as_str()
540                );
541            }
542        }
543
544        {
545            // scope around vector and collection borrow
546            let collection = self.lock_ref();
547            if !collection.is_empty() {
548                let media_type = match request.media_type() {
549                    #[cfg(feature = "json")]
550                    Some(media_type @ MediaType::Json) => media_type,
551                    #[cfg(feature = "postcard")]
552                    Some(media_type @ MediaType::Postcard) => media_type,
553                    _ => {
554                        if request.logging() {
555                            warn!("Request failed as unsupported media type is requested");
556                        }
557                        self.messages.replace(Messages::from_service_error(
558                            "Request failed as unsupported media type is requested",
559                        ));
560                        self.transfer_state
561                            .lock_mut()
562                            .stop(StatusCode::UnsupportedMediaType);
563                        return;
564                    }
565                };
566
567                let content = collection.to_vec();
568                let bytes = match media_type {
569                    #[cfg(feature = "json")]
570                    MediaType::Json => content.to_json(),
571                    #[cfg(feature = "postcard")]
572                    MediaType::Postcard => content.to_postcard(),
573                    _ => {
574                        if request.logging() {
575                            error!("Unsupported media type requested, unexpected code flow");
576                        }
577                        return;
578                    }
579                };
580                let bytes = match bytes {
581                    Ok(bytes) => bytes,
582                    Err(error) => {
583                        if request.logging() {
584                            error!("Cannot serialize collection: {error}");
585                        }
586                        return;
587                    }
588                };
589
590                if let Some(signature) = MS::sign(bytes.as_ref()) {
591                    request = request.with_header(HEADER_SIGNATURE, signature);
592                }
593
594                request = request.with_body(bytes);
595            }
596        }
597
598        let collection = self.collection.clone();
599        fetch::<_, _, _, MV>(
600            request,
601            self.transfer_state.clone(),
602            self.messages.clone(),
603            self.paging.clone(),
604            move |new| collection.lock_mut().replace_cloned(new),
605            result_callback,
606        );
607    }
608}
609
610fn fetch<E, F, C, MV>(
611    request: Request<'_>,
612    transfer_state: Mutable<TransferState>,
613    messages: Messages,
614    paging: Mutable<Paging>,
615    store_fn: F,
616    result_callback: C,
617) where
618    E: Clone + DeserializeOwned + 'static,
619    F: FnMut(Vec<E>) + 'static,
620    C: FnOnce(StatusCode) + 'static,
621    MV: MacVerify,
622{
623    let logging = request.logging();
624
625    let pending_fetch = match request.start() {
626        Ok(future) => future,
627        Err(error) => {
628            if logging {
629                debug!("Request failed at init, error: {error}");
630            }
631            result_callback(StatusCode::BadRequest);
632            transfer_state.lock_mut().stop(StatusCode::FetchFailed);
633            return;
634        }
635    };
636    if request.is_load() {
637        transfer_state.lock_mut().start_load();
638    } else {
639        transfer_state.lock_mut().start_store();
640    }
641
642    let context = CollectionFetchContext::<F> {
643        logging,
644        messages,
645        paging,
646        store_fn,
647    };
648
649    spawn_local(async move {
650        let status = execute_collection_fetch::<_, _, MV>(pending_fetch, context).await;
651        result_callback(status);
652        transfer_state.lock_mut().stop(status);
653    });
654}
655
656async fn execute_collection_fetch<E, F, MV>(
657    pending_fetch: PendingFetch,
658    CollectionFetchContext {
659        logging,
660        messages,
661        paging,
662        mut store_fn,
663    }: CollectionFetchContext<F>,
664) -> StatusCode
665where
666    E: Clone + DeserializeOwned,
667    F: FnMut(Vec<E>) + 'static,
668    MV: MacVerify,
669{
670    let mut result = execute_fetch::<CollectionResponse<E>, MV>(pending_fetch).await;
671    match (result.status(), result.take_response()) {
672        (status @ StatusCode::FetchTimeout, _) => {
673            if logging {
674                // TODO: should this warning go also to Messages???
675                debug!(
676                    "Timeout accessing {}.",
677                    result.hint().unwrap_or("?unknown url")
678                );
679            }
680            status
681        }
682        (status @ StatusCode::FetchFailed, _) => {
683            if logging {
684                // TODO: should this warning go also to Messages???
685                debug!(
686                    "Request failed in execution, error: {}",
687                    result.hint().unwrap_or("?unknown")
688                );
689            }
690            status
691        }
692        (status @ StatusCode::DecodeFailed, _) => {
693            if logging {
694                // TODO: should this warning go also to Messages???
695                warn!(
696                    "Response decoding failed, error: {}",
697                    result.hint().unwrap_or("?unknown")
698                );
699            }
700            status
701        }
702        (status, None) => status,
703        (status, Some(response)) => {
704            let (response_entities, response_messages, response_paging) = response.take();
705            messages.replace(response_messages);
706            if status.is_success()
707                && let Some(response_entities) = response_entities
708            {
709                if logging {
710                    trace!("Request successfully fetched collection.");
711                }
712                store_fn(response_entities);
713            }
714            *paging.lock_mut() = response_paging;
715            status
716        }
717    }
718}
719
720impl<E, MV> Default for CollectionStore<E, MV> {
721    fn default() -> Self {
722        Self::new()
723    }
724}
725
/// Everything `execute_collection_fetch` needs besides the pending fetch itself, bundled
/// into one value that is passed to it as a single argument.
struct CollectionFetchContext<F> {
    /// Whether diagnostic log output is enabled for this request.
    logging: bool,
    /// Messages handle that receives the messages of the decoded response.
    messages: Messages,
    /// Paging cell that receives the paging of the decoded response.
    paging: Mutable<Paging>,
    /// Callback receiving the fetched entities of a successful response.
    store_fn: F,
}
732
733pub fn collection_state_signal<P, E>(pending: P, empty: E) -> impl Signal<Item = CollectionState>
734where
735    P: Signal<Item = bool>,
736    E: Signal<Item = bool>,
737{
738    map_ref! {
739        pending, empty => {
740            match (pending, empty) {
741                (true, _) => CollectionState::Pending,
742                (false, true) => CollectionState::Empty,
743                (false, false) => CollectionState::NotEmpty,
744            }
745        }
746    }
747    .dedupe()
748}