1use std::marker::PhantomData;
2
3use artwrap::spawn_local;
4use futures_signals::{
5 map_ref,
6 signal::{Mutable, Signal, SignalExt},
7 signal_vec::{
8 MutableSignalVec, MutableVec, MutableVecLockMut, MutableVecLockRef, SignalVec, SignalVecExt,
9 },
10};
11use futures_signals_ext::{MutableExt, MutableVecExt};
12use log::{debug, error, trace, warn};
13use serde::{Serialize, de::DeserializeOwned};
14
15#[cfg(feature = "json")]
16use crate::JSONSerialize;
17#[cfg(any(feature = "json", feature = "postcard"))]
18use crate::MediaType;
19#[cfg(feature = "postcard")]
20use crate::PostcardSerialize;
21use crate::{
22 CollectionResponse, HEADER_SIGNATURE, MacSign, MacVerify, Messages, NoMac, Paging, StatusCode,
23};
24
25use super::{
26 CollectionState,
27 common::{PendingFetch, execute_fetch},
28 request::Request,
29 transferstate::TransferState,
30};
31
32pub struct CollectionStore<E, MV = NoMac> {
33 transfer_state: Mutable<TransferState>,
34 messages: Messages,
35 paging: Mutable<Paging>,
36 collection: MutableVec<E>,
37 pmv: PhantomData<MV>,
38}
39
40impl<E, MV> CollectionStore<E, MV> {
41 pub fn new() -> Self {
42 Self::new_value(vec![])
43 }
44
45 pub fn new_value(collection: Vec<E>) -> Self {
46 Self {
47 transfer_state: Mutable::new(TransferState::Empty),
48 messages: Messages::new(),
49 paging: Mutable::new(Paging::default()),
50 collection: MutableVec::new_with_values(collection),
51 pmv: PhantomData,
52 }
53 }
54
55 pub fn reset(&self) {
56 self.transfer_state.set_neq(TransferState::Empty);
57 self.messages.clear_all();
58 self.paging.set(Paging::default());
59 self.collection.lock_mut().clear();
60 }
61
62 pub fn invalidate(&self) {
63 self.transfer_state.set_neq(TransferState::Empty);
64 }
65
66 pub fn transfer_state(&self) -> TransferState {
67 self.transfer_state.get()
68 }
69
70 pub fn set_transfer_state(&self, transfer_state: TransferState) {
71 self.transfer_state.set_neq(transfer_state);
72 }
73
74 pub fn reset_transfer_error(&self) {
75 self.transfer_state.lock_mut().reset_error();
76 }
77
78 pub fn loaded(&self) -> bool {
79 self.transfer_state.map(TransferState::loaded)
80 }
81
82 pub fn loaded_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
83 self.transfer_state
84 .signal_ref(TransferState::loaded)
85 .dedupe()
86 }
87
88 pub fn loaded_status(&self) -> Option<StatusCode> {
89 self.transfer_state.map(TransferState::loaded_status)
90 }
91
92 pub fn loaded_status_signal(&self) -> impl Signal<Item = Option<StatusCode>> + use<E, MV> {
93 self.transfer_state
94 .signal_ref(TransferState::loaded_status)
95 .dedupe()
96 }
97
98 pub fn stored(&self) -> bool {
99 self.transfer_state.map(TransferState::stored)
100 }
101
102 pub fn stored_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
103 self.transfer_state
104 .signal_ref(TransferState::stored)
105 .dedupe()
106 }
107
108 pub fn stored_status(&self) -> Option<StatusCode> {
109 self.transfer_state.map(TransferState::stored_status)
110 }
111
112 pub fn stored_status_signal(&self) -> impl Signal<Item = Option<StatusCode>> + use<E, MV> {
113 self.transfer_state
114 .signal_ref(TransferState::stored_status)
115 .dedupe()
116 }
117
118 pub fn pending(&self) -> bool {
119 self.transfer_state.map(TransferState::pending)
120 }
121
122 pub fn pending_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
123 self.transfer_state
124 .signal_ref(TransferState::pending)
125 .dedupe()
126 }
127
128 pub fn collection(&self) -> &MutableVec<E> {
129 &self.collection
130 }
131
132 pub fn messages(&self) -> &Messages {
133 &self.messages
134 }
135
136 pub fn paging(&self) -> &Mutable<Paging> {
137 &self.paging
138 }
139
140 pub fn is_empty(&self) -> bool {
141 self.collection.lock_ref().is_empty()
142 }
143
144 pub fn any<F>(&self, f: F) -> bool
145 where
146 F: Fn(&E) -> bool,
147 {
148 self.collection.lock_ref().iter().any(f)
149 }
150
151 pub fn all<F>(&self, f: F) -> bool
152 where
153 F: Fn(&E) -> bool,
154 {
155 self.collection.lock_ref().iter().all(f)
156 }
157
158 pub fn lock_ref(&self) -> MutableVecLockRef<'_, E> {
159 self.collection.lock_ref()
160 }
161
162 pub fn lock_mut(&self) -> MutableVecLockMut<'_, E> {
163 self.collection.lock_mut()
164 }
165
166 pub fn inspect<F>(&self, f: F)
167 where
168 F: FnOnce(&[E]),
169 {
170 self.collection.inspect(f)
171 }
172
173 pub fn inspect_mut<F>(&self, f: F)
174 where
175 F: FnOnce(&mut MutableVecLockMut<E>),
176 {
177 self.collection.inspect_mut(f)
178 }
179
180 pub fn find_map<F, U>(&self, f: F) -> Option<U>
181 where
182 F: Fn(&E) -> Option<U>,
183 {
184 self.collection.lock_ref().iter().find_map(f)
185 }
186
187 pub fn map_vec<F, U>(&self, f: F) -> U
188 where
189 F: FnOnce(&[E]) -> U,
190 {
191 self.collection.map_vec(f)
192 }
193
194 pub fn map_vec_mut<F, U>(&self, f: F) -> U
195 where
196 F: FnOnce(&mut MutableVecLockMut<E>) -> U,
197 {
198 self.collection.map_vec_mut(f)
199 }
200}
201
202impl<E, MV> CollectionStore<E, MV>
203where
204 E: Copy,
205{
206 pub fn empty_signal(&self) -> impl Signal<Item = bool> + use<E, MV> {
207 self.collection.signal_vec().is_empty().dedupe()
208 }
209
210 pub fn collection_state_signal(&self) -> impl Signal<Item = CollectionState> + use<E, MV> {
211 collection_state_signal(self.pending_signal(), self.empty_signal())
212 }
213
214 pub fn find<F>(&self, f: F) -> Option<E>
215 where
216 F: Fn(&E) -> bool,
217 {
218 self.find_map(|e| f(e).then_some(*e))
219 }
220
221 pub fn find_inspect_mut<P, F>(&self, predicate: P, f: F) -> Option<bool>
222 where
223 P: FnMut(&E) -> bool,
224 F: FnMut(&mut E) -> bool,
225 {
226 self.collection.find_inspect_mut(predicate, f)
227 }
228
229 pub fn find_set<P>(&self, predicate: P, item: E) -> bool
230 where
231 P: FnMut(&E) -> bool,
232 {
233 self.collection.find_set(predicate, item)
234 }
235
236 pub fn find_set_or_add<P>(&self, predicate: P, item: E)
237 where
238 P: FnMut(&E) -> bool,
239 {
240 self.collection.find_set_or_add(predicate, item);
241 }
242
243 pub fn replace(&self, values: Vec<E>) -> Vec<E> {
244 self.messages.clear_all();
245 let mut collection = self.collection.lock_mut();
246 let current = collection.drain(..).collect();
247 collection.replace(values);
248 current
249 }
250
251 pub fn remove<P>(&self, predicate: P) -> bool
252 where
253 P: FnMut(&E) -> bool,
254 {
255 self.collection.find_remove(predicate)
256 }
257
258 pub fn set_externally_loaded(&self, values: Vec<E>) {
259 self.collection.lock_mut().replace(values);
260 self.transfer_state
261 .set_neq(TransferState::Loaded(StatusCode::Ok));
262 }
263
264 pub fn signal_map<F, U>(&self, f: F) -> impl Signal<Item = U> + use<E, MV, F, U>
265 where
266 F: FnMut(&[E]) -> U,
267 {
268 self.collection.signal_vec().to_signal_map(f)
269 }
270
271 pub fn signal_vec(&self) -> MutableSignalVec<E> {
272 self.collection.signal_vec()
273 }
274
275 pub fn signal_vec_filter<F>(&self, f: F) -> impl SignalVec<Item = E> + use<E, MV, F>
276 where
277 F: FnMut(&E) -> bool,
278 {
279 self.collection.signal_vec_filter(f)
280 }
281
282 pub fn signal_vec_filter_signal<F, U>(
283 &self,
284 f: F,
285 ) -> impl SignalVec<Item = E> + use<E, MV, F, U>
286 where
287 F: FnMut(&E) -> U,
288 U: Signal<Item = bool>,
289 {
290 self.collection.signal_vec_filter_signal(f)
291 }
292
293 pub fn signal_vec_map<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
294 where
295 F: FnMut(E) -> U,
296 {
297 self.collection.signal_vec().map(f)
298 }
299
300 pub fn signal_vec_map_signal<F, U>(
301 &self,
302 f: F,
303 ) -> impl SignalVec<Item = U::Item> + use<E, MV, F, U>
304 where
305 F: FnMut(E) -> U,
306 U: Signal,
307 {
308 self.collection.signal_vec().map_signal(f)
309 }
310
311 pub fn signal_vec_filter_map<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
312 where
313 F: FnMut(E) -> Option<U>,
314 {
315 self.collection.signal_vec().filter_map(f)
316 }
317}
318
319impl<E, MV> CollectionStore<E, MV>
320where
321 E: Clone,
322{
323 pub fn empty_signal_cloned(&self) -> impl Signal<Item = bool> + use<E, MV> {
324 self.collection.signal_vec_cloned().is_empty().dedupe()
325 }
326
327 pub fn collection_state_signal_cloned(
328 &self,
329 ) -> impl Signal<Item = CollectionState> + use<E, MV> {
330 collection_state_signal(self.pending_signal(), self.empty_signal_cloned())
331 }
332
333 pub fn find_cloned<F>(&self, f: F) -> Option<E>
334 where
335 F: Fn(&E) -> bool,
336 {
337 self.find_map(|e| f(e).then(|| e.clone()))
338 }
339
340 pub fn find_inspect_mut_cloned<P, F>(&self, predicate: P, f: F) -> Option<bool>
341 where
342 P: FnMut(&E) -> bool,
343 F: FnMut(&mut E) -> bool,
344 {
345 self.collection.find_inspect_mut_cloned(predicate, f)
346 }
347
348 pub fn get_cloned(&self) -> Vec<E> {
349 self.collection.lock_ref().to_vec()
350 }
351
352 pub fn find_set_cloned<P>(&self, predicate: P, item: E) -> bool
353 where
354 P: FnMut(&E) -> bool,
355 {
356 self.collection.find_set_cloned(predicate, item)
357 }
358
359 pub fn find_set_or_add_cloned<P>(&self, predicate: P, item: E)
360 where
361 P: FnMut(&E) -> bool,
362 {
363 self.collection.find_set_or_add_cloned(predicate, item);
364 }
365
366 pub fn replace_cloned(&self, values: Vec<E>) -> Vec<E> {
367 self.messages.clear_all();
368 let mut collection = self.collection.lock_mut();
369 let current = collection.drain(..).collect();
370 collection.replace_cloned(values);
371 current
372 }
373
374 pub fn remove_cloned<P>(&self, predicate: P) -> bool
375 where
376 P: FnMut(&E) -> bool,
377 {
378 self.collection.find_remove_cloned(predicate)
379 }
380
381 pub fn set_externally_loaded_cloned(&self, values: Vec<E>) {
382 self.collection.lock_mut().replace_cloned(values);
383 self.transfer_state
384 .set_neq(TransferState::Loaded(StatusCode::Ok));
385 }
386
387 pub fn signal_map_cloned<F, U>(&self, f: F) -> impl Signal<Item = U> + use<E, MV, F, U>
388 where
389 F: FnMut(&[E]) -> U,
390 {
391 self.collection.signal_vec_cloned().to_signal_map(f)
392 }
393
394 pub fn signal_vec_cloned(&self) -> MutableSignalVec<E> {
395 self.collection.signal_vec_cloned()
396 }
397
398 pub fn signal_vec_filter_cloned<F>(&self, f: F) -> impl SignalVec<Item = E> + use<E, MV, F>
399 where
400 F: FnMut(&E) -> bool,
401 {
402 self.collection.signal_vec_filter_cloned(f)
403 }
404
405 pub fn signal_vec_filter_signal_cloned<F, U>(
406 &self,
407 f: F,
408 ) -> impl SignalVec<Item = E> + use<E, MV, F, U>
409 where
410 F: FnMut(&E) -> U,
411 U: Signal<Item = bool>,
412 {
413 self.collection.signal_vec_filter_signal_cloned(f)
414 }
415
416 pub fn signal_vec_map_cloned<F, U>(&self, f: F) -> impl SignalVec<Item = U> + use<E, MV, F, U>
417 where
418 F: FnMut(E) -> U,
419 {
420 self.collection.signal_vec_cloned().map(f)
421 }
422
423 pub fn signal_vec_map_signal_cloned<F, U>(
424 &self,
425 f: F,
426 ) -> impl SignalVec<Item = U::Item> + use<E, MV, F, U>
427 where
428 F: FnMut(E) -> U,
429 U: Signal,
430 {
431 self.collection.signal_vec_cloned().map_signal(f)
432 }
433
434 pub fn signal_vec_filter_map_cloned<F, U>(
435 &self,
436 f: F,
437 ) -> impl SignalVec<Item = U> + use<E, MV, F, U>
438 where
439 F: FnMut(E) -> Option<U>,
440 {
441 self.collection.signal_vec_cloned().filter_map(f)
442 }
443}
444
445impl<E, MV> CollectionStore<E, MV>
446where
447 E: Clone,
448 MV: MacVerify,
449{
450 pub fn load<C>(&self, request: Request<'_>, result_callback: C)
451 where
452 E: DeserializeOwned + 'static,
453 C: FnOnce(StatusCode) + 'static,
454 {
455 if self.transfer_state.map(TransferState::loaded) {
456 if request.logging() {
457 debug!("Request to load {} skipped, using cache", request.url());
458
459 if !request.method().is_load() {
460 warn!(
461 "Load request unexpectedly uses store verb {:?}",
462 request.method().as_str()
463 );
464 }
465 }
466 } else {
467 self.load_skip_cache(request, result_callback);
468 }
469 }
470
471 pub fn load_skip_cache<C>(&self, request: Request<'_>, result_callback: C)
472 where
473 E: DeserializeOwned + 'static,
474 C: FnOnce(StatusCode) + 'static,
475 {
476 if request.logging() {
477 debug!("Request to load {}", request.url());
478
479 if !request.method().is_load() {
480 warn!(
481 "Load request unexpectedly uses store verb {:?}",
482 request.method().as_str()
483 );
484 }
485 }
486
487 let collection = self.collection.clone();
488 fetch::<_, _, _, MV>(
489 request.with_is_load(true),
490 self.transfer_state.clone(),
491 self.messages.clone(),
492 self.paging.clone(),
493 move |new| {
494 collection.lock_mut().replace_cloned(new);
495 },
496 result_callback,
497 );
498 }
499
500 pub fn load_merge<F, C>(&self, request: Request<'_>, merge_fn: F, result_callback: C)
501 where
502 E: DeserializeOwned + 'static,
503 F: FnMut(Vec<E>) + 'static,
504 C: FnOnce(StatusCode) + 'static,
505 {
506 if request.logging() {
507 debug!("Request to load/merge {}", request.url());
508
509 if !request.method().is_load() {
510 warn!(
511 "Load/merge request unexpectedly uses store verb {:?}",
512 request.method().as_str()
513 );
514 }
515 }
516 fetch::<_, _, _, MV>(
517 request.with_is_load(true),
518 self.transfer_state.clone(),
519 self.messages.clone(),
520 self.paging.clone(),
521 merge_fn,
522 result_callback,
523 );
524 }
525
526 pub fn store<MS, C>(&self, request: Request<'_>, result_callback: C)
527 where
528 E: Serialize + DeserializeOwned + 'static,
529 MS: MacSign,
530 C: FnOnce(StatusCode) + 'static,
531 {
532 let mut request = request.with_is_load(false);
533 if request.logging() {
534 debug!("Request to update {}", request.url());
535
536 if request.method().is_load() {
537 warn!(
538 "Store request unexpectedly uses load verb {:?}",
539 request.method().as_str()
540 );
541 }
542 }
543
544 {
545 let collection = self.lock_ref();
547 if !collection.is_empty() {
548 let media_type = match request.media_type() {
549 #[cfg(feature = "json")]
550 Some(media_type @ MediaType::Json) => media_type,
551 #[cfg(feature = "postcard")]
552 Some(media_type @ MediaType::Postcard) => media_type,
553 _ => {
554 if request.logging() {
555 warn!("Request failed as unsupported media type is requested");
556 }
557 self.messages.replace(Messages::from_service_error(
558 "Request failed as unsupported media type is requested",
559 ));
560 self.transfer_state
561 .lock_mut()
562 .stop(StatusCode::UnsupportedMediaType);
563 return;
564 }
565 };
566
567 let content = collection.to_vec();
568 let bytes = match media_type {
569 #[cfg(feature = "json")]
570 MediaType::Json => content.to_json(),
571 #[cfg(feature = "postcard")]
572 MediaType::Postcard => content.to_postcard(),
573 _ => {
574 if request.logging() {
575 error!("Unsupported media type requested, unexpected code flow");
576 }
577 return;
578 }
579 };
580 let bytes = match bytes {
581 Ok(bytes) => bytes,
582 Err(error) => {
583 if request.logging() {
584 error!("Cannot serialize collection: {error}");
585 }
586 return;
587 }
588 };
589
590 if let Some(signature) = MS::sign(bytes.as_ref()) {
591 request = request.with_header(HEADER_SIGNATURE, signature);
592 }
593
594 request = request.with_body(bytes);
595 }
596 }
597
598 let collection = self.collection.clone();
599 fetch::<_, _, _, MV>(
600 request,
601 self.transfer_state.clone(),
602 self.messages.clone(),
603 self.paging.clone(),
604 move |new| collection.lock_mut().replace_cloned(new),
605 result_callback,
606 );
607 }
608}
609
610fn fetch<E, F, C, MV>(
611 request: Request<'_>,
612 transfer_state: Mutable<TransferState>,
613 messages: Messages,
614 paging: Mutable<Paging>,
615 store_fn: F,
616 result_callback: C,
617) where
618 E: Clone + DeserializeOwned + 'static,
619 F: FnMut(Vec<E>) + 'static,
620 C: FnOnce(StatusCode) + 'static,
621 MV: MacVerify,
622{
623 let logging = request.logging();
624
625 let pending_fetch = match request.start() {
626 Ok(future) => future,
627 Err(error) => {
628 if logging {
629 debug!("Request failed at init, error: {error}");
630 }
631 result_callback(StatusCode::BadRequest);
632 transfer_state.lock_mut().stop(StatusCode::FetchFailed);
633 return;
634 }
635 };
636 if request.is_load() {
637 transfer_state.lock_mut().start_load();
638 } else {
639 transfer_state.lock_mut().start_store();
640 }
641
642 let context = CollectionFetchContext::<F> {
643 logging,
644 messages,
645 paging,
646 store_fn,
647 };
648
649 spawn_local(async move {
650 let status = execute_collection_fetch::<_, _, MV>(pending_fetch, context).await;
651 result_callback(status);
652 transfer_state.lock_mut().stop(status);
653 });
654}
655
656async fn execute_collection_fetch<E, F, MV>(
657 pending_fetch: PendingFetch,
658 CollectionFetchContext {
659 logging,
660 messages,
661 paging,
662 mut store_fn,
663 }: CollectionFetchContext<F>,
664) -> StatusCode
665where
666 E: Clone + DeserializeOwned,
667 F: FnMut(Vec<E>) + 'static,
668 MV: MacVerify,
669{
670 let mut result = execute_fetch::<CollectionResponse<E>, MV>(pending_fetch).await;
671 match (result.status(), result.take_response()) {
672 (status @ StatusCode::FetchTimeout, _) => {
673 if logging {
674 debug!(
676 "Timeout accessing {}.",
677 result.hint().unwrap_or("?unknown url")
678 );
679 }
680 status
681 }
682 (status @ StatusCode::FetchFailed, _) => {
683 if logging {
684 debug!(
686 "Request failed in execution, error: {}",
687 result.hint().unwrap_or("?unknown")
688 );
689 }
690 status
691 }
692 (status @ StatusCode::DecodeFailed, _) => {
693 if logging {
694 warn!(
696 "Response decoding failed, error: {}",
697 result.hint().unwrap_or("?unknown")
698 );
699 }
700 status
701 }
702 (status, None) => status,
703 (status, Some(response)) => {
704 let (response_entities, response_messages, response_paging) = response.take();
705 messages.replace(response_messages);
706 if status.is_success()
707 && let Some(response_entities) = response_entities
708 {
709 if logging {
710 trace!("Request successfully fetched collection.");
711 }
712 store_fn(response_entities);
713 }
714 *paging.lock_mut() = response_paging;
715 status
716 }
717 }
718}
719
720impl<E, MV> Default for CollectionStore<E, MV> {
721 fn default() -> Self {
722 Self::new()
723 }
724}
725
726struct CollectionFetchContext<F> {
727 logging: bool,
728 messages: Messages,
729 paging: Mutable<Paging>,
730 store_fn: F,
731}
732
733pub fn collection_state_signal<P, E>(pending: P, empty: E) -> impl Signal<Item = CollectionState>
734where
735 P: Signal<Item = bool>,
736 E: Signal<Item = bool>,
737{
738 map_ref! {
739 pending, empty => {
740 match (pending, empty) {
741 (true, _) => CollectionState::Pending,
742 (false, true) => CollectionState::Empty,
743 (false, false) => CollectionState::NotEmpty,
744 }
745 }
746 }
747 .dedupe()
748}