1use std::marker::PhantomData;
2
3use artwrap::spawn_local;
4use futures_signals::{
5 map_ref,
6 signal::{Mutable, Signal, SignalExt},
7 signal_vec::{
8 MutableSignalVec, MutableVec, MutableVecLockMut, MutableVecLockRef, SignalVec, SignalVecExt,
9 },
10};
11use futures_signals_ext::{MutableExt, MutableVecExt};
12use log::{debug, error, trace, warn};
13use serde::{Serialize, de::DeserializeOwned};
14
15#[cfg(feature = "json")]
16use crate::JSONSerialize;
17#[cfg(any(feature = "json", feature = "postcard"))]
18use crate::MediaType;
19#[cfg(feature = "postcard")]
20use crate::PostcardSerialize;
21use crate::{
22 CollectionResponse, HEADER_SIGNATURE, MacSign, MacVerify, Messages, NoMac, Paging, StatusCode,
23};
24
25use super::{
26 CollectionState,
27 common::{PendingFetch, execute_fetch},
28 request::Request,
29 transferstate::TransferState,
30};
31
32#[derive(Debug)]
33pub struct CollectionStore<E, MV = NoMac> {
34 transfer_state: Mutable<TransferState>,
35 messages: Messages,
36 paging: Mutable<Paging>,
37 collection: MutableVec<E>,
38 pmv: PhantomData<MV>,
39}
40
41impl<E, MV> CollectionStore<E, MV> {
42 pub fn new() -> Self {
43 Self::new_value(vec![])
44 }
45
46 pub fn new_value(collection: Vec<E>) -> Self {
47 Self {
48 transfer_state: Mutable::new(TransferState::Empty),
49 messages: Messages::new(),
50 paging: Mutable::new(Paging::default()),
51 collection: MutableVec::new_with_values(collection),
52 pmv: PhantomData,
53 }
54 }
55
56 pub fn reset(&self) {
57 self.transfer_state.set_neq(TransferState::Empty);
58 self.messages.clear_all();
59 self.paging.set(Paging::default());
60 self.collection.lock_mut().clear();
61 }
62
63 pub fn invalidate(&self) {
64 self.transfer_state.set_neq(TransferState::Empty);
65 }
66
67 pub fn transfer_state(&self) -> TransferState {
68 self.transfer_state.get()
69 }
70
71 pub fn set_transfer_state(&self, transfer_state: TransferState) {
72 self.transfer_state.set_neq(transfer_state);
73 }
74
75 pub fn reset_transfer_error(&self) {
76 self.transfer_state.lock_mut().reset_error();
77 }
78
79 pub fn loaded(&self) -> bool {
80 self.transfer_state.map(TransferState::loaded)
81 }
82
83 pub fn loaded_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
84 self.transfer_state
85 .signal_ref(TransferState::loaded)
86 .dedupe()
87 }
88
89 pub fn loaded_status(&self) -> Option<StatusCode> {
90 self.transfer_state.map(TransferState::loaded_status)
91 }
92
93 pub fn loaded_status_signal(&self) -> impl Signal<Item = Option<StatusCode>> + use<E, MV> {
94 self.transfer_state
95 .signal_ref(TransferState::loaded_status)
96 .dedupe()
97 }
98
99 pub fn stored(&self) -> bool {
100 self.transfer_state.map(TransferState::stored)
101 }
102
103 pub fn stored_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
104 self.transfer_state
105 .signal_ref(TransferState::stored)
106 .dedupe()
107 }
108
109 pub fn stored_status(&self) -> Option<StatusCode> {
110 self.transfer_state.map(TransferState::stored_status)
111 }
112
113 pub fn stored_status_signal(&self) -> impl Signal<Item = Option<StatusCode>> + use<E, MV> {
114 self.transfer_state
115 .signal_ref(TransferState::stored_status)
116 .dedupe()
117 }
118
119 pub fn pending(&self) -> bool {
120 self.transfer_state.map(TransferState::pending)
121 }
122
123 pub fn pending_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
124 self.transfer_state
125 .signal_ref(TransferState::pending)
126 .dedupe()
127 }
128
129 pub fn collection(&self) -> &MutableVec<E> {
130 &self.collection
131 }
132
133 pub fn messages(&self) -> &Messages {
134 &self.messages
135 }
136
137 pub fn paging(&self) -> &Mutable<Paging> {
138 &self.paging
139 }
140
141 pub fn is_empty(&self) -> bool {
142 self.collection.lock_ref().is_empty()
143 }
144
145 pub fn any<F>(&self, f: F) -> bool
146 where
147 F: Fn(&E) -> bool,
148 {
149 self.collection.lock_ref().iter().any(f)
150 }
151
152 pub fn all<F>(&self, f: F) -> bool
153 where
154 F: Fn(&E) -> bool,
155 {
156 self.collection.lock_ref().iter().all(f)
157 }
158
159 pub fn lock_ref(&self) -> MutableVecLockRef<E> {
160 self.collection.lock_ref()
161 }
162
163 pub fn lock_mut(&self) -> MutableVecLockMut<E> {
164 self.collection.lock_mut()
165 }
166
167 pub fn inspect<F>(&self, f: F)
168 where
169 F: FnOnce(&[E]),
170 {
171 self.collection.inspect(f)
172 }
173
174 pub fn inspect_mut<F>(&self, f: F)
175 where
176 F: FnOnce(&mut MutableVecLockMut<E>),
177 {
178 self.collection.inspect_mut(f)
179 }
180
181 pub fn find_map<F, U>(&self, f: F) -> Option<U>
182 where
183 F: Fn(&E) -> Option<U>,
184 {
185 self.collection.lock_ref().iter().find_map(f)
186 }
187
188 pub fn map_vec<F, U>(&self, f: F) -> U
189 where
190 F: FnOnce(&[E]) -> U,
191 {
192 self.collection.map_vec(f)
193 }
194
195 pub fn map_vec_mut<F, U>(&self, f: F) -> U
196 where
197 F: FnOnce(&mut MutableVecLockMut<E>) -> U,
198 {
199 self.collection.map_vec_mut(f)
200 }
201}
202
203impl<E, MV> CollectionStore<E, MV>
204where
205 E: Copy,
206{
207 pub fn empty_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
208 self.collection.signal_vec().is_empty().dedupe()
209 }
210
211 pub fn collection_state_signal(&self) -> impl Signal<Item = CollectionState> + use<E, MV> {
212 collection_state_signal(self.pending_signal(), self.empty_signal())
213 }
214
215 pub fn find<F>(&self, f: F) -> Option<E>
216 where
217 F: Fn(&E) -> bool,
218 {
219 self.find_map(|e| f(e).then_some(*e))
220 }
221
222 pub fn find_inspect_mut<P, F>(&self, predicate: P, f: F) -> Option<bool>
223 where
224 P: FnMut(&E) -> bool,
225 F: FnMut(&mut E) -> bool,
226 {
227 self.collection.find_inspect_mut(predicate, f)
228 }
229
230 pub fn find_set<P>(&self, predicate: P, item: E) -> bool
231 where
232 P: FnMut(&E) -> bool,
233 {
234 self.collection.find_set(predicate, item)
235 }
236
237 pub fn find_set_or_add<P>(&self, predicate: P, item: E)
238 where
239 P: FnMut(&E) -> bool,
240 {
241 self.collection.find_set_or_add(predicate, item);
242 }
243
244 pub fn replace(&self, values: Vec<E>) -> Vec<E> {
245 self.messages.clear_all();
246 let mut collection = self.collection.lock_mut();
247 let current = collection.drain(..).collect();
248 collection.replace(values);
249 current
250 }
251
252 pub fn remove<P>(&self, predicate: P) -> bool
253 where
254 P: FnMut(&E) -> bool,
255 {
256 self.collection.find_remove(predicate)
257 }
258
259 pub fn set_externally_loaded(&self, values: Vec<E>) {
260 self.collection.lock_mut().replace(values);
261 self.transfer_state
262 .set_neq(TransferState::Loaded(StatusCode::Ok));
263 }
264
265 pub fn signal_map<F, U>(&self, f: F) -> impl Signal<Item = U> + use<E, MV, F, U>
266 where
267 F: FnMut(&[E]) -> U,
268 {
269 self.collection.signal_vec().to_signal_map(f)
270 }
271
272 pub fn signal_vec(&self) -> MutableSignalVec<E> {
273 self.collection.signal_vec()
274 }
275
276 pub fn signal_vec_filter<F>(&self, f: F) -> impl SignalVec<Item = E> + use<E, MV, F>
277 where
278 F: FnMut(&E) -> bool,
279 {
280 self.collection.signal_vec_filter(f)
281 }
282
283 pub fn signal_vec_filter_signal<F, U>(
284 &self,
285 f: F,
286 ) -> impl SignalVec<Item = E> + use<E, MV, F, U>
287 where
288 F: FnMut(&E) -> U,
289 U: Signal<Item = bool>,
290 {
291 self.collection.signal_vec_filter_signal(f)
292 }
293
294 pub fn signal_vec_map<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
295 where
296 F: FnMut(E) -> U,
297 {
298 self.collection.signal_vec().map(f)
299 }
300
301 pub fn signal_vec_map_signal<F, U>(
302 &self,
303 f: F,
304 ) -> impl SignalVec<Item = U::Item> + use<E, MV, F, U>
305 where
306 F: FnMut(E) -> U,
307 U: Signal,
308 {
309 self.collection.signal_vec().map_signal(f)
310 }
311
312 pub fn signal_vec_filter_map<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
313 where
314 F: FnMut(E) -> Option<U>,
315 {
316 self.collection.signal_vec().filter_map(f)
317 }
318}
319
320impl<E, MV> CollectionStore<E, MV>
321where
322 E: Clone,
323{
324 pub fn empty_signal_cloned(&self) -> impl Signal<Item = bool> + use<E, MV> {
325 self.collection.signal_vec_cloned().is_empty().dedupe()
326 }
327
328 pub fn collection_state_signal_cloned(
329 &self,
330 ) -> impl Signal<Item = CollectionState> + use<E, MV> {
331 collection_state_signal(self.pending_signal(), self.empty_signal_cloned())
332 }
333
334 pub fn find_cloned<F>(&self, f: F) -> Option<E>
335 where
336 F: Fn(&E) -> bool,
337 {
338 self.find_map(|e| f(e).then(|| e.clone()))
339 }
340
341 pub fn find_inspect_mut_cloned<P, F>(&self, predicate: P, f: F) -> Option<bool>
342 where
343 P: FnMut(&E) -> bool,
344 F: FnMut(&mut E) -> bool,
345 {
346 self.collection.find_inspect_mut_cloned(predicate, f)
347 }
348
349 pub fn get_cloned(&self) -> Vec<E> {
350 self.collection.lock_ref().to_vec()
351 }
352
353 pub fn find_set_cloned<P>(&self, predicate: P, item: E) -> bool
354 where
355 P: FnMut(&E) -> bool,
356 {
357 self.collection.find_set_cloned(predicate, item)
358 }
359
360 pub fn find_set_or_add_cloned<P>(&self, predicate: P, item: E)
361 where
362 P: FnMut(&E) -> bool,
363 {
364 self.collection.find_set_or_add_cloned(predicate, item);
365 }
366
367 pub fn replace_cloned(&self, values: Vec<E>) -> Vec<E> {
368 self.messages.clear_all();
369 let mut collection = self.collection.lock_mut();
370 let current = collection.drain(..).collect();
371 collection.replace_cloned(values);
372 current
373 }
374
375 pub fn remove_cloned<P>(&self, predicate: P) -> bool
376 where
377 P: FnMut(&E) -> bool,
378 {
379 self.collection.find_remove_cloned(predicate)
380 }
381
382 pub fn set_externally_loaded_cloned(&self, values: Vec<E>) {
383 self.collection.lock_mut().replace_cloned(values);
384 self.transfer_state
385 .set_neq(TransferState::Loaded(StatusCode::Ok));
386 }
387
388 pub fn signal_map_cloned<F, U>(&self, f: F) -> impl Signal<Item = U> + use<E, MV, F, U>
389 where
390 F: FnMut(&[E]) -> U,
391 {
392 self.collection.signal_vec_cloned().to_signal_map(f)
393 }
394
395 pub fn signal_vec_cloned(&self) -> MutableSignalVec<E> {
396 self.collection.signal_vec_cloned()
397 }
398
399 pub fn signal_vec_filter_cloned<F>(&self, f: F) -> impl SignalVec<Item = E> + use<E, MV, F>
400 where
401 F: FnMut(&E) -> bool,
402 {
403 self.collection.signal_vec_filter_cloned(f)
404 }
405
406 pub fn signal_vec_filter_signal_cloned<F, U>(
407 &self,
408 f: F,
409 ) -> impl SignalVec<Item = E> + use<E, MV, F, U>
410 where
411 F: FnMut(&E) -> U,
412 U: Signal<Item = bool>,
413 {
414 self.collection.signal_vec_filter_signal_cloned(f)
415 }
416
417 pub fn signal_vec_map_cloned<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
418 where
419 F: FnMut(E) -> U,
420 {
421 self.collection.signal_vec_cloned().map(f)
422 }
423
424 pub fn signal_vec_map_signal_cloned<F, U>(
425 &self,
426 f: F,
427 ) -> impl SignalVec<Item = U::Item> + use<E, MV, F, U>
428 where
429 F: FnMut(E) -> U,
430 U: Signal,
431 {
432 self.collection.signal_vec_cloned().map_signal(f)
433 }
434
435 pub fn signal_vec_filter_map_cloned<F, U>(
436 &self,
437 f: F,
438 ) -> impl SignalVec<Item = U> + use<E, MV, F, U>
439 where
440 F: FnMut(E) -> Option<U>,
441 {
442 self.collection.signal_vec_cloned().filter_map(f)
443 }
444}
445
446impl<E, MV> CollectionStore<E, MV>
447where
448 E: Clone,
449 MV: MacVerify,
450{
451 pub fn load<C>(&self, request: Request<'_>, result_callback: C)
452 where
453 E: DeserializeOwned + 'static,
454 C: FnOnce(StatusCode) + 'static,
455 {
456 if self.transfer_state.map(TransferState::loaded) {
457 if request.logging() {
458 debug!("Request to load {} skipped, using cache", request.url());
459
460 if !request.method().is_load() {
461 warn!(
462 "Load request unexpectedly uses store verb {:?}",
463 request.method().as_str()
464 );
465 }
466 }
467 } else {
468 self.load_skip_cache(request, result_callback);
469 }
470 }
471
472 pub fn load_skip_cache<C>(&self, request: Request<'_>, result_callback: C)
473 where
474 E: DeserializeOwned + 'static,
475 C: FnOnce(StatusCode) + 'static,
476 {
477 if request.logging() {
478 debug!("Request to load {}", request.url());
479
480 if !request.method().is_load() {
481 warn!(
482 "Load request unexpectedly uses store verb {:?}",
483 request.method().as_str()
484 );
485 }
486 }
487
488 let collection = self.collection.clone();
489 fetch::<_, _, _, MV>(
490 request.with_is_load(true),
491 self.transfer_state.clone(),
492 self.messages.clone(),
493 self.paging.clone(),
494 move |new| {
495 collection.lock_mut().replace_cloned(new);
496 },
497 result_callback,
498 );
499 }
500
501 pub fn load_merge<F, C>(&self, request: Request<'_>, merge_fn: F, result_callback: C)
502 where
503 E: DeserializeOwned + 'static,
504 F: FnMut(Vec<E>) + 'static,
505 C: FnOnce(StatusCode) + 'static,
506 {
507 if request.logging() {
508 debug!("Request to load/merge {}", request.url());
509
510 if !request.method().is_load() {
511 warn!(
512 "Load/merge request unexpectedly uses store verb {:?}",
513 request.method().as_str()
514 );
515 }
516 }
517 fetch::<_, _, _, MV>(
518 request.with_is_load(true),
519 self.transfer_state.clone(),
520 self.messages.clone(),
521 self.paging.clone(),
522 merge_fn,
523 result_callback,
524 );
525 }
526
527 pub fn store<MS, C>(&self, request: Request<'_>, result_callback: C)
528 where
529 E: Serialize + DeserializeOwned + 'static,
530 MS: MacSign,
531 C: FnOnce(StatusCode) + 'static,
532 {
533 let mut request = request.with_is_load(false);
534 if request.logging() {
535 debug!("Request to update {}", request.url());
536
537 if request.method().is_load() {
538 warn!(
539 "Store request unexpectedly uses load verb {:?}",
540 request.method().as_str()
541 );
542 }
543 }
544
545 {
546 let collection = self.lock_ref();
548 if !collection.is_empty() {
549 let media_type = match request.media_type() {
550 #[cfg(feature = "json")]
551 Some(media_type @ MediaType::Json) => media_type,
552 #[cfg(feature = "postcard")]
553 Some(media_type @ MediaType::Postcard) => media_type,
554 _ => {
555 if request.logging() {
556 warn!("Request failed as unsupported media type is requested");
557 }
558 self.messages.replace(Messages::from_service_error(
559 "Request failed as unsupported media type is requested",
560 ));
561 self.transfer_state
562 .lock_mut()
563 .stop(StatusCode::UnsupportedMediaType);
564 return;
565 }
566 };
567
568 let content = collection.to_vec();
569 let bytes = match media_type {
570 #[cfg(feature = "json")]
571 MediaType::Json => content.to_json(),
572 #[cfg(feature = "postcard")]
573 MediaType::Postcard => content.to_postcard(),
574 _ => {
575 if request.logging() {
576 error!("Unsupported media type requested, unexpected code flow");
577 }
578 return;
579 }
580 };
581
582 if let Some(signature) = MS::sign(bytes.as_ref()) {
583 request = request.with_header(HEADER_SIGNATURE, signature);
584 }
585
586 request = request.with_body(bytes);
587 }
588 }
589
590 let collection = self.collection.clone();
591 fetch::<_, _, _, MV>(
592 request,
593 self.transfer_state.clone(),
594 self.messages.clone(),
595 self.paging.clone(),
596 move |new| collection.lock_mut().replace_cloned(new),
597 result_callback,
598 );
599 }
600}
601
602fn fetch<E, F, C, MV>(
603 request: Request<'_>,
604 transfer_state: Mutable<TransferState>,
605 messages: Messages,
606 paging: Mutable<Paging>,
607 store_fn: F,
608 result_callback: C,
609) where
610 E: Clone + DeserializeOwned + 'static,
611 F: FnMut(Vec<E>) + 'static,
612 C: FnOnce(StatusCode) + 'static,
613 MV: MacVerify,
614{
615 let logging = request.logging();
616
617 let pending_fetch = match request.start() {
618 Ok(future) => future,
619 Err(error) => {
620 if logging {
621 debug!("Request failed at init, error: {}", error);
622 }
623 result_callback(StatusCode::BadRequest);
624 transfer_state.lock_mut().stop(StatusCode::FetchFailed);
625 return;
626 }
627 };
628 if request.is_load() {
629 transfer_state.lock_mut().start_load();
630 } else {
631 transfer_state.lock_mut().start_store();
632 }
633
634 let context = CollectionFetchContext::<F> {
635 logging,
636 messages,
637 paging,
638 store_fn,
639 };
640
641 spawn_local(async move {
642 let status = execute_collection_fetch::<_, _, MV>(pending_fetch, context).await;
643 result_callback(status);
644 transfer_state.lock_mut().stop(status);
645 });
646}
647
648async fn execute_collection_fetch<E, F, MV>(
649 pending_fetch: PendingFetch,
650 CollectionFetchContext {
651 logging,
652 messages,
653 paging,
654 mut store_fn,
655 }: CollectionFetchContext<F>,
656) -> StatusCode
657where
658 E: Clone + DeserializeOwned,
659 F: FnMut(Vec<E>) + 'static,
660 MV: MacVerify,
661{
662 let mut result = execute_fetch::<CollectionResponse<E>, MV>(pending_fetch).await;
663 match (result.status(), result.take_response()) {
664 (status @ StatusCode::FetchTimeout, _) => {
665 if logging {
666 debug!(
668 "Timeout accessing {}.",
669 result.hint().unwrap_or("?unknown url")
670 );
671 }
672 status
673 }
674 (status @ StatusCode::FetchFailed, _) => {
675 if logging {
676 debug!(
678 "Request failed in execution, error: {}",
679 result.hint().unwrap_or("?unknown")
680 );
681 }
682 status
683 }
684 (status @ StatusCode::DecodeFailed, _) => {
685 if logging {
686 warn!(
688 "Response decoding failed, error: {}",
689 result.hint().unwrap_or("?unknown")
690 );
691 }
692 status
693 }
694 (status, None) => status,
695 (status, Some(response)) => {
696 let (response_entities, response_messages, response_paging) = response.take();
697 messages.replace(response_messages);
698 if status.is_success() {
699 if let Some(response_entities) = response_entities {
700 if logging {
701 trace!("Request successfully fetched collection.");
702 }
703 store_fn(response_entities);
704 }
705 }
706 *paging.lock_mut() = response_paging;
707 status
708 }
709 }
710}
711
712impl<E, MV> Default for CollectionStore<E, MV> {
713 fn default() -> Self {
714 Self::new()
715 }
716}
717
718struct CollectionFetchContext<F> {
719 logging: bool,
720 messages: Messages,
721 paging: Mutable<Paging>,
722 store_fn: F,
723}
724
725pub fn collection_state_signal<P, E>(pending: P, empty: E) -> impl Signal<Item = CollectionState>
726where
727 P: Signal<Item = bool>,
728 E: Signal<Item = bool>,
729{
730 map_ref! {
731 pending, empty => {
732 match (pending, empty) {
733 (true, _) => CollectionState::Pending,
734 (false, true) => CollectionState::Empty,
735 (false, false) => CollectionState::NotEmpty,
736 }
737 }
738 }
739 .dedupe()
740}