1use std::{
16 borrow::Borrow,
17 collections::{BTreeMap, BTreeSet, HashMap},
18 fmt,
19 ops::Deref,
20 sync::Arc,
21};
22
23use as_variant::as_variant;
24use async_trait::async_trait;
25use growable_bloom_filter::GrowableBloom;
26use matrix_sdk_common::{AsyncTraitDeps, ttl::TtlValue};
27use ruma::{
28 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
29 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
30 api::{
31 MatrixVersion, SupportedVersions,
32 client::{
33 discovery::{
34 discover_homeserver::{self, HomeserverInfo, IdentityServerInfo, TileServerInfo},
35 get_capabilities::v3::Capabilities,
36 },
37 rtc::RtcTransport,
38 },
39 },
40 events::{
41 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
42 GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
43 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
44 RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
45 presence::PresenceEvent,
46 receipt::{Receipt, ReceiptThread, ReceiptType},
47 },
48 serde::Raw,
49};
50use serde::{Deserialize, Serialize};
51use thiserror::Error;
52use tokio::sync::{Mutex, MutexGuard};
53
54use super::{
55 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
56 QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
57 send_queue::SentRequestKey,
58};
59use crate::{
60 MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
61 deserialized_responses::{
62 DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
63 },
64 store::StoredThreadSubscription,
65};
66
/// An abstract, asynchronous storage backend for client state.
///
/// The methods cover generic key/value data, sync state changes, presence,
/// room state and profiles, account data, receipts, custom byte values, the
/// send queue (including dependent requests) and thread subscriptions.
///
/// On `wasm` targets the trait is expanded with `async_trait(?Send)`, so the
/// returned futures are not required to be `Send` there; on every other
/// target the plain `async_trait` expansion is used.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait StateStore: AsyncTraitDeps {
    /// The error type returned by every method of this store.
    ///
    /// It must be convertible into [`StoreError`] and constructible from a
    /// [`serde_json::Error`].
    type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;

    /// Look up the value stored under the given key.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to fetch data for.
    ///
    /// NOTE(review): `Ok(None)` presumably means that nothing is stored for
    /// `key` — confirm with the implementations.
    async fn get_kv_data(
        &self,
        key: StateStoreDataKey<'_>,
    ) -> Result<Option<StateStoreDataValue>, Self::Error>;

    /// Store a value under the given key.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to store the data under.
    ///
    /// * `value` - The data to store.
    ///
    /// NOTE(review): presumably replaces any value already stored for `key`
    /// — confirm with the implementations.
    async fn set_kv_data(
        &self,
        key: StateStoreDataKey<'_>,
        value: StateStoreDataValue,
    ) -> Result<(), Self::Error>;

    /// Remove the value stored under the given key.
    ///
    /// # Arguments
    ///
    /// * `key` - The key whose data should be removed.
    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;

    /// Persist the given set of state changes.
    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;

    /// Fetch the stored presence event of a user, if there is one.
    ///
    /// # Arguments
    ///
    /// * `user_id` - The user whose presence event should be fetched.
    async fn get_presence_event(
        &self,
        user_id: &UserId,
    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;

    /// Fetch the stored presence events of several users.
    ///
    /// # Arguments
    ///
    /// * `user_ids` - The users whose presence events should be fetched.
    async fn get_presence_events(
        &self,
        user_ids: &[OwnedUserId],
    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;

    /// Fetch a single state event of a room, if it is stored.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room to look in.
    ///
    /// * `event_type` - The type of the state event.
    ///
    /// * `state_key` - The state key of the state event.
    async fn get_state_event(
        &self,
        room_id: &RoomId,
        event_type: StateEventType,
        state_key: &str,
    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;

    /// Fetch the stored state events of a given type in a room.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room to look in.
    ///
    /// * `event_type` - The type of the state events.
    async fn get_state_events(
        &self,
        room_id: &RoomId,
        event_type: StateEventType,
    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;

    /// Fetch the stored state events of a given type in a room for a set of
    /// state keys.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room to look in.
    ///
    /// * `event_type` - The type of the state events.
    ///
    /// * `state_keys` - The state keys to look up.
    ///
    /// NOTE(review): how state keys without a stored event are handled is
    /// not visible from the signature — presumably they are just absent from
    /// the result; confirm with the implementations.
    async fn get_state_events_for_keys(
        &self,
        room_id: &RoomId,
        event_type: StateEventType,
        state_keys: &[&str],
    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;

    /// Fetch the stored profile of a user in a room, if there is one.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room the profile belongs to.
    ///
    /// * `user_id` - The user whose profile should be fetched.
    async fn get_profile(
        &self,
        room_id: &RoomId,
        user_id: &UserId,
    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;

    /// Fetch the stored profiles of several users in a room.
    ///
    /// The keys of the returned map borrow from `user_ids`.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room the profiles belong to.
    ///
    /// * `user_ids` - The users whose profiles should be fetched.
    async fn get_profiles<'a>(
        &self,
        room_id: &RoomId,
        user_ids: &'a [OwnedUserId],
    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;

    /// Fetch user IDs of a room.
    ///
    /// NOTE(review): `memberships` presumably restricts the result to users
    /// whose membership matches — confirm with the implementations.
    async fn get_user_ids(
        &self,
        room_id: &RoomId,
        memberships: RoomMemberships,
    ) -> Result<Vec<OwnedUserId>, Self::Error>;

    /// Load the stored [`RoomInfo`]s, as directed by `room_load_settings`.
    async fn get_room_infos(
        &self,
        room_load_settings: &RoomLoadSettings,
    ) -> Result<Vec<RoomInfo>, Self::Error>;

    /// Fetch the users associated with a display name in a room.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room to look in.
    ///
    /// * `display_name` - The display name to look up.
    async fn get_users_with_display_name(
        &self,
        room_id: &RoomId,
        display_name: &DisplayName,
    ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;

    /// Fetch the users associated with each of several display names in a
    /// room.
    ///
    /// The keys of the returned map borrow from `display_names`.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room to look in.
    ///
    /// * `display_names` - The display names to look up.
    async fn get_users_with_display_names<'a>(
        &self,
        room_id: &RoomId,
        display_names: &'a [DisplayName],
    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;

    /// Fetch the stored global account data event of a given type, if any.
    ///
    /// # Arguments
    ///
    /// * `event_type` - The type of the account data event.
    async fn get_account_data_event(
        &self,
        event_type: GlobalAccountDataEventType,
    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;

    /// Fetch the stored room account data event of a given type, if any.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room the account data belongs to.
    ///
    /// * `event_type` - The type of the room account data event.
    async fn get_room_account_data_event(
        &self,
        room_id: &RoomId,
        event_type: RoomAccountDataEventType,
    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;

    /// Fetch the stored receipt of a user in a room, together with the ID of
    /// the event it is attached to.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room the receipt belongs to.
    ///
    /// * `receipt_type` - The type of the receipt.
    ///
    /// * `thread` - The thread of the receipt.
    ///
    /// * `user_id` - The user the receipt belongs to.
    async fn get_user_room_receipt_event(
        &self,
        room_id: &RoomId,
        receipt_type: ReceiptType,
        thread: ReceiptThread,
        user_id: &UserId,
    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;

    /// Fetch the stored receipts for an event in a room, each paired with
    /// the user it belongs to.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room the receipts belong to.
    ///
    /// * `receipt_type` - The type of the receipts.
    ///
    /// * `thread` - The thread of the receipts.
    ///
    /// * `event_id` - The event the receipts are attached to.
    async fn get_event_room_receipt_events(
        &self,
        room_id: &RoomId,
        receipt_type: ReceiptType,
        thread: ReceiptThread,
        event_id: &EventId,
    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;

    /// Fetch the custom byte value stored under `key`, if any.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to fetch data for.
    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;

    /// Store a custom byte value under `key`.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to store the data under.
    ///
    /// * `value` - The data to store.
    ///
    /// NOTE(review): the returned `Option<Vec<u8>>` is presumably the value
    /// previously stored under `key` — confirm with the implementations.
    async fn set_custom_value(
        &self,
        key: &[u8],
        value: Vec<u8>,
    ) -> Result<Option<Vec<u8>>, Self::Error>;

    /// Store a custom byte value under `key` without returning anything
    /// about what was stored before.
    ///
    /// The default implementation calls [`StateStore::set_custom_value`] and
    /// discards the value it returns.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to store the data under.
    ///
    /// * `value` - The data to store.
    async fn set_custom_value_no_read(
        &self,
        key: &[u8],
        value: Vec<u8>,
    ) -> Result<(), Self::Error> {
        self.set_custom_value(key, value).await.map(|_| ())
    }

    /// Remove the custom byte value stored under `key`.
    ///
    /// # Arguments
    ///
    /// * `key` - The key whose data should be removed.
    ///
    /// NOTE(review): the returned `Option<Vec<u8>>` is presumably the value
    /// that was removed — confirm with the implementations.
    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;

    /// Remove a room from the store.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room to remove.
    ///
    /// NOTE(review): presumably removes all data associated with the room —
    /// confirm with the implementations.
    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;

    /// Save a request to be sent by the send queue.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room the request belongs to.
    ///
    /// * `transaction_id` - The transaction ID identifying the request.
    ///
    /// * `created_at` - The time at which the request was created.
    ///
    /// * `request` - The content of the request.
    ///
    /// * `priority` - The priority of the request. NOTE(review): the
    ///   ordering semantics of this value are not visible here — confirm.
    async fn save_send_queue_request(
        &self,
        room_id: &RoomId,
        transaction_id: OwnedTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        request: QueuedRequestKind,
        priority: usize,
    ) -> Result<(), Self::Error>;

    /// Update the content of a send queue request.
    ///
    /// # Arguments
    ///
    /// * `room_id` - The room the request belongs to.
    ///
    /// * `transaction_id` - The transaction ID identifying the request.
    ///
    /// * `content` - The new content of the request.
    ///
    /// NOTE(review): the returned `bool` presumably reports whether a
    /// matching request was found and updated — confirm.
    async fn update_send_queue_request(
        &self,
        room_id: &RoomId,
        transaction_id: &TransactionId,
        content: QueuedRequestKind,
    ) -> Result<bool, Self::Error>;

    /// Remove a send queue request.
    ///
    /// NOTE(review): the returned `bool` presumably reports whether a
    /// matching request was found and removed — confirm.
    async fn remove_send_queue_request(
        &self,
        room_id: &RoomId,
        transaction_id: &TransactionId,
    ) -> Result<bool, Self::Error>;

    /// Load the send queue requests stored for a room.
    ///
    /// NOTE(review): the ordering of the returned requests is not visible
    /// from the signature — confirm with the implementations.
    async fn load_send_queue_requests(
        &self,
        room_id: &RoomId,
    ) -> Result<Vec<QueuedRequest>, Self::Error>;

    /// Update the error status of a send queue request; `error` may be
    /// `None`.
    async fn update_send_queue_request_status(
        &self,
        room_id: &RoomId,
        transaction_id: &TransactionId,
        error: Option<QueueWedgeError>,
    ) -> Result<(), Self::Error>;

    /// Load the IDs of the rooms that have unsent send queue requests.
    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;

    /// Save a request that depends on a parent send queue request.
    ///
    /// `parent_txn_id` identifies the parent request and `own_txn_id` the
    /// dependent request itself.
    async fn save_dependent_queued_request(
        &self,
        room_id: &RoomId,
        parent_txn_id: &TransactionId,
        own_txn_id: ChildTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        content: DependentQueuedRequestKind,
    ) -> Result<(), Self::Error>;

    /// Mark the dependent requests of a parent request as ready, recording
    /// the key of the sent parent.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of
    /// dependent requests that were updated — confirm.
    async fn mark_dependent_queued_requests_as_ready(
        &self,
        room_id: &RoomId,
        parent_txn_id: &TransactionId,
        sent_parent_key: SentRequestKey,
    ) -> Result<usize, Self::Error>;

    /// Update the content of a dependent request.
    ///
    /// NOTE(review): the returned `bool` presumably reports whether a
    /// matching dependent request was found and updated — confirm.
    async fn update_dependent_queued_request(
        &self,
        room_id: &RoomId,
        own_transaction_id: &ChildTransactionId,
        new_content: DependentQueuedRequestKind,
    ) -> Result<bool, Self::Error>;

    /// Remove a dependent request, identified by its own transaction ID.
    ///
    /// NOTE(review): the returned `bool` presumably reports whether a
    /// matching dependent request was found and removed — confirm.
    async fn remove_dependent_queued_request(
        &self,
        room: &RoomId,
        own_txn_id: &ChildTransactionId,
    ) -> Result<bool, Self::Error>;

    /// Load the dependent requests stored for a room.
    async fn load_dependent_queued_requests(
        &self,
        room: &RoomId,
    ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;

    /// Insert or update a batch of thread subscriptions.
    ///
    /// Each entry carries a room ID, an event ID and the subscription to
    /// store. NOTE(review): the event ID presumably identifies the thread
    /// (compare `thread_id` in the sibling methods) — confirm.
    async fn upsert_thread_subscriptions(
        &self,
        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
    ) -> Result<(), Self::Error>;

    /// Remove the stored subscription for a thread in a room.
    async fn remove_thread_subscription(
        &self,
        room: &RoomId,
        thread_id: &EventId,
    ) -> Result<(), Self::Error>;

    /// Load the stored subscription for a thread in a room, if there is one.
    async fn load_thread_subscription(
        &self,
        room: &RoomId,
        thread_id: &EventId,
    ) -> Result<Option<StoredThreadSubscription>, Self::Error>;

    /// Close the store.
    ///
    /// NOTE(review): what closing entails is backend-specific and not
    /// visible here — confirm with the implementations.
    async fn close(&self) -> Result<(), Self::Error>;

    /// Reopen the store.
    ///
    /// NOTE(review): presumably the counterpart of [`StateStore::close`] —
    /// confirm with the implementations.
    async fn reopen(&self) -> Result<(), Self::Error>;

    /// Optimize the store. Hidden from the generated documentation.
    ///
    /// NOTE(review): what optimizing entails is backend-specific and not
    /// visible here — confirm with the implementations.
    #[doc(hidden)]
    async fn optimize(&self) -> Result<(), Self::Error>;

    /// Report the size of the store, if available.
    ///
    /// NOTE(review): the unit (presumably bytes) and the meaning of `None`
    /// (presumably "not supported by this backend") are not visible here —
    /// confirm with the implementations.
    async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
}
543
544#[cfg_attr(target_family = "wasm", async_trait(?Send))]
545#[cfg_attr(not(target_family = "wasm"), async_trait)]
546impl<T: StateStore> StateStore for &T {
547 type Error = T::Error;
548
549 async fn get_kv_data(
550 &self,
551 key: StateStoreDataKey<'_>,
552 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
553 (*self).get_kv_data(key).await
554 }
555
556 async fn set_kv_data(
557 &self,
558 key: StateStoreDataKey<'_>,
559 value: StateStoreDataValue,
560 ) -> Result<(), Self::Error> {
561 (*self).set_kv_data(key, value).await
562 }
563
564 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
565 (*self).remove_kv_data(key).await
566 }
567
568 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
569 (*self).save_changes(changes).await
570 }
571
572 async fn get_presence_event(
573 &self,
574 user_id: &UserId,
575 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
576 (*self).get_presence_event(user_id).await
577 }
578
579 async fn get_presence_events(
580 &self,
581 user_ids: &[OwnedUserId],
582 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
583 (*self).get_presence_events(user_ids).await
584 }
585
586 async fn get_state_event(
587 &self,
588 room_id: &RoomId,
589 event_type: StateEventType,
590 state_key: &str,
591 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
592 (*self).get_state_event(room_id, event_type, state_key).await
593 }
594
595 async fn get_state_events(
596 &self,
597 room_id: &RoomId,
598 event_type: StateEventType,
599 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
600 (*self).get_state_events(room_id, event_type).await
601 }
602
603 async fn get_state_events_for_keys(
604 &self,
605 room_id: &RoomId,
606 event_type: StateEventType,
607 state_keys: &[&str],
608 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
609 (*self).get_state_events_for_keys(room_id, event_type, state_keys).await
610 }
611
612 async fn get_profile(
613 &self,
614 room_id: &RoomId,
615 user_id: &UserId,
616 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
617 (*self).get_profile(room_id, user_id).await
618 }
619
620 async fn get_profiles<'a>(
621 &self,
622 room_id: &RoomId,
623 user_ids: &'a [OwnedUserId],
624 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
625 (*self).get_profiles(room_id, user_ids).await
626 }
627
628 async fn get_user_ids(
629 &self,
630 room_id: &RoomId,
631 memberships: RoomMemberships,
632 ) -> Result<Vec<OwnedUserId>, Self::Error> {
633 (*self).get_user_ids(room_id, memberships).await
634 }
635
636 async fn get_room_infos(
637 &self,
638 room_load_settings: &RoomLoadSettings,
639 ) -> Result<Vec<RoomInfo>, Self::Error> {
640 (*self).get_room_infos(room_load_settings).await
641 }
642
643 async fn get_users_with_display_name(
644 &self,
645 room_id: &RoomId,
646 display_name: &DisplayName,
647 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
648 (*self).get_users_with_display_name(room_id, display_name).await
649 }
650
651 async fn get_users_with_display_names<'a>(
652 &self,
653 room_id: &RoomId,
654 display_names: &'a [DisplayName],
655 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
656 (*self).get_users_with_display_names(room_id, display_names).await
657 }
658
659 async fn get_account_data_event(
660 &self,
661 event_type: GlobalAccountDataEventType,
662 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
663 (*self).get_account_data_event(event_type).await
664 }
665
666 async fn get_room_account_data_event(
667 &self,
668 room_id: &RoomId,
669 event_type: RoomAccountDataEventType,
670 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
671 (*self).get_room_account_data_event(room_id, event_type).await
672 }
673
674 async fn get_user_room_receipt_event(
675 &self,
676 room_id: &RoomId,
677 receipt_type: ReceiptType,
678 thread: ReceiptThread,
679 user_id: &UserId,
680 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
681 (*self).get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
682 }
683
684 async fn get_event_room_receipt_events(
685 &self,
686 room_id: &RoomId,
687 receipt_type: ReceiptType,
688 thread: ReceiptThread,
689 event_id: &EventId,
690 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
691 (*self).get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
692 }
693
694 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
695 (*self).get_custom_value(key).await
696 }
697
698 async fn set_custom_value(
699 &self,
700 key: &[u8],
701 value: Vec<u8>,
702 ) -> Result<Option<Vec<u8>>, Self::Error> {
703 (*self).set_custom_value(key, value).await
704 }
705
706 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
707 (*self).remove_custom_value(key).await
708 }
709
710 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
711 (*self).remove_room(room_id).await
712 }
713
714 async fn save_send_queue_request(
715 &self,
716 room_id: &RoomId,
717 transaction_id: OwnedTransactionId,
718 created_at: MilliSecondsSinceUnixEpoch,
719 request: QueuedRequestKind,
720 priority: usize,
721 ) -> Result<(), Self::Error> {
722 (*self)
723 .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
724 .await
725 }
726
727 async fn update_send_queue_request(
728 &self,
729 room_id: &RoomId,
730 transaction_id: &TransactionId,
731 content: QueuedRequestKind,
732 ) -> Result<bool, Self::Error> {
733 (*self).update_send_queue_request(room_id, transaction_id, content).await
734 }
735
736 async fn remove_send_queue_request(
737 &self,
738 room_id: &RoomId,
739 transaction_id: &TransactionId,
740 ) -> Result<bool, Self::Error> {
741 (*self).remove_send_queue_request(room_id, transaction_id).await
742 }
743
744 async fn load_send_queue_requests(
745 &self,
746 room_id: &RoomId,
747 ) -> Result<Vec<QueuedRequest>, Self::Error> {
748 (*self).load_send_queue_requests(room_id).await
749 }
750
751 async fn update_send_queue_request_status(
752 &self,
753 room_id: &RoomId,
754 transaction_id: &TransactionId,
755 error: Option<QueueWedgeError>,
756 ) -> Result<(), Self::Error> {
757 (*self).update_send_queue_request_status(room_id, transaction_id, error).await
758 }
759
760 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
761 (*self).load_rooms_with_unsent_requests().await
762 }
763
764 async fn save_dependent_queued_request(
765 &self,
766 room_id: &RoomId,
767 parent_txn_id: &TransactionId,
768 own_txn_id: ChildTransactionId,
769 created_at: MilliSecondsSinceUnixEpoch,
770 content: DependentQueuedRequestKind,
771 ) -> Result<(), Self::Error> {
772 (*self)
773 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
774 .await
775 }
776
777 async fn mark_dependent_queued_requests_as_ready(
778 &self,
779 room_id: &RoomId,
780 parent_txn_id: &TransactionId,
781 sent_parent_key: SentRequestKey,
782 ) -> Result<usize, Self::Error> {
783 (*self)
784 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
785 .await
786 }
787
788 async fn update_dependent_queued_request(
789 &self,
790 room_id: &RoomId,
791 own_transaction_id: &ChildTransactionId,
792 new_content: DependentQueuedRequestKind,
793 ) -> Result<bool, Self::Error> {
794 (*self).update_dependent_queued_request(room_id, own_transaction_id, new_content).await
795 }
796
797 async fn remove_dependent_queued_request(
798 &self,
799 room: &RoomId,
800 own_txn_id: &ChildTransactionId,
801 ) -> Result<bool, Self::Error> {
802 (*self).remove_dependent_queued_request(room, own_txn_id).await
803 }
804
805 async fn load_dependent_queued_requests(
806 &self,
807 room: &RoomId,
808 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
809 (*self).load_dependent_queued_requests(room).await
810 }
811
812 async fn upsert_thread_subscriptions(
813 &self,
814 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
815 ) -> Result<(), Self::Error> {
816 (*self).upsert_thread_subscriptions(updates).await
817 }
818
819 async fn remove_thread_subscription(
820 &self,
821 room: &RoomId,
822 thread_id: &EventId,
823 ) -> Result<(), Self::Error> {
824 (*self).remove_thread_subscription(room, thread_id).await
825 }
826
827 async fn load_thread_subscription(
828 &self,
829 room: &RoomId,
830 thread_id: &EventId,
831 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
832 (*self).load_thread_subscription(room, thread_id).await
833 }
834
835 async fn close(&self) -> Result<(), Self::Error> {
836 (*self).close().await
837 }
838
839 async fn reopen(&self) -> Result<(), Self::Error> {
840 (*self).reopen().await
841 }
842
843 async fn optimize(&self) -> Result<(), Self::Error> {
844 (*self).optimize().await
845 }
846
847 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
848 (*self).get_size().await
849 }
850}
851
852#[cfg_attr(target_family = "wasm", async_trait(?Send))]
853#[cfg_attr(not(target_family = "wasm"), async_trait)]
854impl<T: StateStore + ?Sized> StateStore for Arc<T> {
855 type Error = T::Error;
856
857 async fn get_kv_data(
858 &self,
859 key: StateStoreDataKey<'_>,
860 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
861 self.deref().get_kv_data(key).await
862 }
863
864 async fn set_kv_data(
865 &self,
866 key: StateStoreDataKey<'_>,
867 value: StateStoreDataValue,
868 ) -> Result<(), Self::Error> {
869 self.deref().set_kv_data(key, value).await
870 }
871
872 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
873 self.deref().remove_kv_data(key).await
874 }
875
876 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
877 self.deref().save_changes(changes).await
878 }
879
880 async fn get_presence_event(
881 &self,
882 user_id: &UserId,
883 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
884 self.deref().get_presence_event(user_id).await
885 }
886
887 async fn get_presence_events(
888 &self,
889 user_ids: &[OwnedUserId],
890 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
891 self.deref().get_presence_events(user_ids).await
892 }
893
894 async fn get_state_event(
895 &self,
896 room_id: &RoomId,
897 event_type: StateEventType,
898 state_key: &str,
899 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
900 self.deref().get_state_event(room_id, event_type, state_key).await
901 }
902
903 async fn get_state_events(
904 &self,
905 room_id: &RoomId,
906 event_type: StateEventType,
907 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
908 self.deref().get_state_events(room_id, event_type).await
909 }
910
911 async fn get_state_events_for_keys(
912 &self,
913 room_id: &RoomId,
914 event_type: StateEventType,
915 state_keys: &[&str],
916 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
917 self.deref().get_state_events_for_keys(room_id, event_type, state_keys).await
918 }
919
920 async fn get_profile(
921 &self,
922 room_id: &RoomId,
923 user_id: &UserId,
924 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
925 self.deref().get_profile(room_id, user_id).await
926 }
927
928 async fn get_profiles<'a>(
929 &self,
930 room_id: &RoomId,
931 user_ids: &'a [OwnedUserId],
932 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
933 self.deref().get_profiles(room_id, user_ids).await
934 }
935
936 async fn get_user_ids(
937 &self,
938 room_id: &RoomId,
939 memberships: RoomMemberships,
940 ) -> Result<Vec<OwnedUserId>, Self::Error> {
941 self.deref().get_user_ids(room_id, memberships).await
942 }
943
944 async fn get_room_infos(
945 &self,
946 room_load_settings: &RoomLoadSettings,
947 ) -> Result<Vec<RoomInfo>, Self::Error> {
948 self.deref().get_room_infos(room_load_settings).await
949 }
950
951 async fn get_users_with_display_name(
952 &self,
953 room_id: &RoomId,
954 display_name: &DisplayName,
955 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
956 self.deref().get_users_with_display_name(room_id, display_name).await
957 }
958
959 async fn get_users_with_display_names<'a>(
960 &self,
961 room_id: &RoomId,
962 display_names: &'a [DisplayName],
963 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
964 self.deref().get_users_with_display_names(room_id, display_names).await
965 }
966
967 async fn get_account_data_event(
968 &self,
969 event_type: GlobalAccountDataEventType,
970 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
971 self.deref().get_account_data_event(event_type).await
972 }
973
974 async fn get_room_account_data_event(
975 &self,
976 room_id: &RoomId,
977 event_type: RoomAccountDataEventType,
978 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
979 self.deref().get_room_account_data_event(room_id, event_type).await
980 }
981
982 async fn get_user_room_receipt_event(
983 &self,
984 room_id: &RoomId,
985 receipt_type: ReceiptType,
986 thread: ReceiptThread,
987 user_id: &UserId,
988 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
989 self.deref().get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
990 }
991
992 async fn get_event_room_receipt_events(
993 &self,
994 room_id: &RoomId,
995 receipt_type: ReceiptType,
996 thread: ReceiptThread,
997 event_id: &EventId,
998 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
999 self.deref().get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
1000 }
1001
1002 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1003 self.deref().get_custom_value(key).await
1004 }
1005
1006 async fn set_custom_value(
1007 &self,
1008 key: &[u8],
1009 value: Vec<u8>,
1010 ) -> Result<Option<Vec<u8>>, Self::Error> {
1011 self.deref().set_custom_value(key, value).await
1012 }
1013
1014 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1015 self.deref().remove_custom_value(key).await
1016 }
1017
1018 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1019 self.deref().remove_room(room_id).await
1020 }
1021
1022 async fn save_send_queue_request(
1023 &self,
1024 room_id: &RoomId,
1025 transaction_id: OwnedTransactionId,
1026 created_at: MilliSecondsSinceUnixEpoch,
1027 request: QueuedRequestKind,
1028 priority: usize,
1029 ) -> Result<(), Self::Error> {
1030 self.deref()
1031 .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1032 .await
1033 }
1034
1035 async fn update_send_queue_request(
1036 &self,
1037 room_id: &RoomId,
1038 transaction_id: &TransactionId,
1039 content: QueuedRequestKind,
1040 ) -> Result<bool, Self::Error> {
1041 self.deref().update_send_queue_request(room_id, transaction_id, content).await
1042 }
1043
1044 async fn remove_send_queue_request(
1045 &self,
1046 room_id: &RoomId,
1047 transaction_id: &TransactionId,
1048 ) -> Result<bool, Self::Error> {
1049 self.deref().remove_send_queue_request(room_id, transaction_id).await
1050 }
1051
1052 async fn load_send_queue_requests(
1053 &self,
1054 room_id: &RoomId,
1055 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1056 self.deref().load_send_queue_requests(room_id).await
1057 }
1058
1059 async fn update_send_queue_request_status(
1060 &self,
1061 room_id: &RoomId,
1062 transaction_id: &TransactionId,
1063 error: Option<QueueWedgeError>,
1064 ) -> Result<(), Self::Error> {
1065 self.deref().update_send_queue_request_status(room_id, transaction_id, error).await
1066 }
1067
1068 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1069 self.deref().load_rooms_with_unsent_requests().await
1070 }
1071
1072 async fn save_dependent_queued_request(
1073 &self,
1074 room_id: &RoomId,
1075 parent_txn_id: &TransactionId,
1076 own_txn_id: ChildTransactionId,
1077 created_at: MilliSecondsSinceUnixEpoch,
1078 content: DependentQueuedRequestKind,
1079 ) -> Result<(), Self::Error> {
1080 self.deref()
1081 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1082 .await
1083 }
1084
1085 async fn mark_dependent_queued_requests_as_ready(
1086 &self,
1087 room_id: &RoomId,
1088 parent_txn_id: &TransactionId,
1089 sent_parent_key: SentRequestKey,
1090 ) -> Result<usize, Self::Error> {
1091 self.deref()
1092 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1093 .await
1094 }
1095
1096 async fn update_dependent_queued_request(
1097 &self,
1098 room_id: &RoomId,
1099 own_transaction_id: &ChildTransactionId,
1100 new_content: DependentQueuedRequestKind,
1101 ) -> Result<bool, Self::Error> {
1102 self.deref().update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1103 }
1104
1105 async fn remove_dependent_queued_request(
1106 &self,
1107 room: &RoomId,
1108 own_txn_id: &ChildTransactionId,
1109 ) -> Result<bool, Self::Error> {
1110 self.deref().remove_dependent_queued_request(room, own_txn_id).await
1111 }
1112
1113 async fn load_dependent_queued_requests(
1114 &self,
1115 room: &RoomId,
1116 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1117 self.deref().load_dependent_queued_requests(room).await
1118 }
1119
1120 async fn upsert_thread_subscriptions(
1121 &self,
1122 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1123 ) -> Result<(), Self::Error> {
1124 self.deref().upsert_thread_subscriptions(updates).await
1125 }
1126
1127 async fn remove_thread_subscription(
1128 &self,
1129 room: &RoomId,
1130 thread_id: &EventId,
1131 ) -> Result<(), Self::Error> {
1132 self.deref().remove_thread_subscription(room, thread_id).await
1133 }
1134
1135 async fn load_thread_subscription(
1136 &self,
1137 room: &RoomId,
1138 thread_id: &EventId,
1139 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1140 self.deref().load_thread_subscription(room, thread_id).await
1141 }
1142
1143 async fn close(&self) -> Result<(), Self::Error> {
1144 self.deref().close().await
1145 }
1146
1147 async fn reopen(&self) -> Result<(), Self::Error> {
1148 self.deref().reopen().await
1149 }
1150
1151 async fn optimize(&self) -> Result<(), Self::Error> {
1152 self.deref().optimize().await
1153 }
1154
1155 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1156 self.deref().get_size().await
1157 }
1158}
1159
/// Newtype around a state store `T`.
///
/// The `StateStore` implementation for this wrapper reports `StoreError` as
/// its error type, converting the inner store's errors with `Into`.
#[repr(transparent)]
struct EraseStateStoreError<T>(T);

// The wrapper is invisible in debug output: formatting is handed straight to
// the wrapped value, with the formatter (and therefore its flags) untouched.
#[cfg(not(tarpaulin_include))]
impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        fmt::Debug::fmt(inner, f)
    }
}
1169
1170#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1171#[cfg_attr(not(target_family = "wasm"), async_trait)]
1172impl<T: StateStore> StateStore for EraseStateStoreError<T> {
1173 type Error = StoreError;
1174
1175 async fn get_kv_data(
1176 &self,
1177 key: StateStoreDataKey<'_>,
1178 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1179 self.0.get_kv_data(key).await.map_err(Into::into)
1180 }
1181
1182 async fn set_kv_data(
1183 &self,
1184 key: StateStoreDataKey<'_>,
1185 value: StateStoreDataValue,
1186 ) -> Result<(), Self::Error> {
1187 self.0.set_kv_data(key, value).await.map_err(Into::into)
1188 }
1189
1190 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1191 self.0.remove_kv_data(key).await.map_err(Into::into)
1192 }
1193
1194 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1195 self.0.save_changes(changes).await.map_err(Into::into)
1196 }
1197
1198 async fn get_presence_event(
1199 &self,
1200 user_id: &UserId,
1201 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1202 self.0.get_presence_event(user_id).await.map_err(Into::into)
1203 }
1204
1205 async fn get_presence_events(
1206 &self,
1207 user_ids: &[OwnedUserId],
1208 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1209 self.0.get_presence_events(user_ids).await.map_err(Into::into)
1210 }
1211
1212 async fn get_state_event(
1213 &self,
1214 room_id: &RoomId,
1215 event_type: StateEventType,
1216 state_key: &str,
1217 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1218 self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
1219 }
1220
1221 async fn get_state_events(
1222 &self,
1223 room_id: &RoomId,
1224 event_type: StateEventType,
1225 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1226 self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
1227 }
1228
1229 async fn get_state_events_for_keys(
1230 &self,
1231 room_id: &RoomId,
1232 event_type: StateEventType,
1233 state_keys: &[&str],
1234 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1235 self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
1236 }
1237
1238 async fn get_profile(
1239 &self,
1240 room_id: &RoomId,
1241 user_id: &UserId,
1242 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1243 self.0.get_profile(room_id, user_id).await.map_err(Into::into)
1244 }
1245
1246 async fn get_profiles<'a>(
1247 &self,
1248 room_id: &RoomId,
1249 user_ids: &'a [OwnedUserId],
1250 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1251 self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
1252 }
1253
1254 async fn get_user_ids(
1255 &self,
1256 room_id: &RoomId,
1257 memberships: RoomMemberships,
1258 ) -> Result<Vec<OwnedUserId>, Self::Error> {
1259 self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
1260 }
1261
1262 async fn get_room_infos(
1263 &self,
1264 room_load_settings: &RoomLoadSettings,
1265 ) -> Result<Vec<RoomInfo>, Self::Error> {
1266 self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
1267 }
1268
1269 async fn get_users_with_display_name(
1270 &self,
1271 room_id: &RoomId,
1272 display_name: &DisplayName,
1273 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1274 self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
1275 }
1276
1277 async fn get_users_with_display_names<'a>(
1278 &self,
1279 room_id: &RoomId,
1280 display_names: &'a [DisplayName],
1281 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1282 self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
1283 }
1284
1285 async fn get_account_data_event(
1286 &self,
1287 event_type: GlobalAccountDataEventType,
1288 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1289 self.0.get_account_data_event(event_type).await.map_err(Into::into)
1290 }
1291
1292 async fn get_room_account_data_event(
1293 &self,
1294 room_id: &RoomId,
1295 event_type: RoomAccountDataEventType,
1296 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1297 self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
1298 }
1299
1300 async fn get_user_room_receipt_event(
1301 &self,
1302 room_id: &RoomId,
1303 receipt_type: ReceiptType,
1304 thread: ReceiptThread,
1305 user_id: &UserId,
1306 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1307 self.0
1308 .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
1309 .await
1310 .map_err(Into::into)
1311 }
1312
1313 async fn get_event_room_receipt_events(
1314 &self,
1315 room_id: &RoomId,
1316 receipt_type: ReceiptType,
1317 thread: ReceiptThread,
1318 event_id: &EventId,
1319 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1320 self.0
1321 .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
1322 .await
1323 .map_err(Into::into)
1324 }
1325
1326 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1327 self.0.get_custom_value(key).await.map_err(Into::into)
1328 }
1329
1330 async fn set_custom_value(
1331 &self,
1332 key: &[u8],
1333 value: Vec<u8>,
1334 ) -> Result<Option<Vec<u8>>, Self::Error> {
1335 self.0.set_custom_value(key, value).await.map_err(Into::into)
1336 }
1337
1338 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1339 self.0.remove_custom_value(key).await.map_err(Into::into)
1340 }
1341
1342 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1343 self.0.remove_room(room_id).await.map_err(Into::into)
1344 }
1345
1346 async fn save_send_queue_request(
1347 &self,
1348 room_id: &RoomId,
1349 transaction_id: OwnedTransactionId,
1350 created_at: MilliSecondsSinceUnixEpoch,
1351 content: QueuedRequestKind,
1352 priority: usize,
1353 ) -> Result<(), Self::Error> {
1354 self.0
1355 .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
1356 .await
1357 .map_err(Into::into)
1358 }
1359
1360 async fn update_send_queue_request(
1361 &self,
1362 room_id: &RoomId,
1363 transaction_id: &TransactionId,
1364 content: QueuedRequestKind,
1365 ) -> Result<bool, Self::Error> {
1366 self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
1367 }
1368
1369 async fn remove_send_queue_request(
1370 &self,
1371 room_id: &RoomId,
1372 transaction_id: &TransactionId,
1373 ) -> Result<bool, Self::Error> {
1374 self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
1375 }
1376
1377 async fn load_send_queue_requests(
1378 &self,
1379 room_id: &RoomId,
1380 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1381 self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
1382 }
1383
1384 async fn update_send_queue_request_status(
1385 &self,
1386 room_id: &RoomId,
1387 transaction_id: &TransactionId,
1388 error: Option<QueueWedgeError>,
1389 ) -> Result<(), Self::Error> {
1390 self.0
1391 .update_send_queue_request_status(room_id, transaction_id, error)
1392 .await
1393 .map_err(Into::into)
1394 }
1395
1396 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1397 self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
1398 }
1399
1400 async fn save_dependent_queued_request(
1401 &self,
1402 room_id: &RoomId,
1403 parent_txn_id: &TransactionId,
1404 own_txn_id: ChildTransactionId,
1405 created_at: MilliSecondsSinceUnixEpoch,
1406 content: DependentQueuedRequestKind,
1407 ) -> Result<(), Self::Error> {
1408 self.0
1409 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1410 .await
1411 .map_err(Into::into)
1412 }
1413
1414 async fn mark_dependent_queued_requests_as_ready(
1415 &self,
1416 room_id: &RoomId,
1417 parent_txn_id: &TransactionId,
1418 sent_parent_key: SentRequestKey,
1419 ) -> Result<usize, Self::Error> {
1420 self.0
1421 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1422 .await
1423 .map_err(Into::into)
1424 }
1425
1426 async fn remove_dependent_queued_request(
1427 &self,
1428 room_id: &RoomId,
1429 own_txn_id: &ChildTransactionId,
1430 ) -> Result<bool, Self::Error> {
1431 self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
1432 }
1433
1434 async fn load_dependent_queued_requests(
1435 &self,
1436 room_id: &RoomId,
1437 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1438 self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
1439 }
1440
1441 async fn update_dependent_queued_request(
1442 &self,
1443 room_id: &RoomId,
1444 own_transaction_id: &ChildTransactionId,
1445 new_content: DependentQueuedRequestKind,
1446 ) -> Result<bool, Self::Error> {
1447 self.0
1448 .update_dependent_queued_request(room_id, own_transaction_id, new_content)
1449 .await
1450 .map_err(Into::into)
1451 }
1452
1453 async fn upsert_thread_subscriptions(
1454 &self,
1455 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1456 ) -> Result<(), Self::Error> {
1457 self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
1458 }
1459
1460 async fn load_thread_subscription(
1461 &self,
1462 room: &RoomId,
1463 thread_id: &EventId,
1464 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1465 self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
1466 }
1467
1468 async fn remove_thread_subscription(
1469 &self,
1470 room: &RoomId,
1471 thread_id: &EventId,
1472 ) -> Result<(), Self::Error> {
1473 self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
1474 }
1475
1476 async fn close(&self) -> Result<(), Self::Error> {
1477 self.0.close().await.map_err(Into::into)
1478 }
1479
1480 async fn reopen(&self) -> Result<(), Self::Error> {
1481 self.0.reopen().await.map_err(Into::into)
1482 }
1483
1484 async fn optimize(&self) -> Result<(), Self::Error> {
1485 self.0.optimize().await.map_err(Into::into)
1486 }
1487
1488 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1489 self.0.get_size().await.map_err(Into::into)
1490 }
1491}
1492
/// A state store bundled with an async mutex.
///
/// The `StateStore` implementation for this type holds the mutex while
/// `save_changes` runs on the wrapped store and forwards every other method
/// without locking. `T` defaults to `Arc<DynStateStore>`.
#[derive(Debug, Clone)]
pub struct SaveLockedStateStore<T = Arc<DynStateStore>> {
    /// The wrapped store that all operations are forwarded to.
    store: T,
    /// Mutex held while saving changes. It sits behind an `Arc`, so clones of
    /// this wrapper share the same mutex.
    lock: Arc<Mutex<()>>,
}
1500
/// Error produced by `SaveLockedStateStore::save_changes_with_guard` when the
/// supplied guard was not obtained from the store's own mutex.
#[derive(Debug, Error)]
#[error("a mutex guard was provided, but it does not reference the correct mutex")]
pub struct IncorrectMutexGuardError;
1507
1508impl From<IncorrectMutexGuardError> for StoreError {
1509 fn from(value: IncorrectMutexGuardError) -> Self {
1510 Self::backend(value)
1511 }
1512}
1513
1514impl<T> SaveLockedStateStore<T> {
1515 pub fn new(store: T) -> Self {
1517 Self { store, lock: Arc::new(Mutex::new(())) }
1518 }
1519
1520 pub fn lock(&self) -> &Mutex<()> {
1523 self.lock.as_ref()
1524 }
1525}
1526
1527impl<T: StateStore> SaveLockedStateStore<T> {
1528 pub async fn save_changes_with_guard(
1533 &self,
1534 guard: &MutexGuard<'_, ()>,
1535 changes: &StateChanges,
1536 ) -> Result<(), StoreError> {
1537 if !std::ptr::eq(MutexGuard::mutex(guard), self.lock()) {
1538 Err(IncorrectMutexGuardError.into())
1539 } else {
1540 self.store.save_changes(changes).await.map_err(Into::into)
1541 }
1542 }
1543}
1544
1545#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1546#[cfg_attr(not(target_family = "wasm"), async_trait)]
1547impl<T: StateStore> StateStore for SaveLockedStateStore<T> {
1548 type Error = T::Error;
1549
1550 async fn get_kv_data(
1551 &self,
1552 key: StateStoreDataKey<'_>,
1553 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1554 self.store.get_kv_data(key).await
1555 }
1556
1557 async fn set_kv_data(
1558 &self,
1559 key: StateStoreDataKey<'_>,
1560 value: StateStoreDataValue,
1561 ) -> Result<(), Self::Error> {
1562 self.store.set_kv_data(key, value).await
1563 }
1564
1565 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1566 self.store.remove_kv_data(key).await
1567 }
1568
1569 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1570 let _guard = self.lock.lock().await;
1571 self.store.save_changes(changes).await
1572 }
1573
1574 async fn get_presence_event(
1575 &self,
1576 user_id: &UserId,
1577 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1578 self.store.get_presence_event(user_id).await
1579 }
1580
1581 async fn get_presence_events(
1582 &self,
1583 user_ids: &[OwnedUserId],
1584 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1585 self.store.get_presence_events(user_ids).await
1586 }
1587
1588 async fn get_state_event(
1589 &self,
1590 room_id: &RoomId,
1591 event_type: StateEventType,
1592 state_key: &str,
1593 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1594 self.store.get_state_event(room_id, event_type, state_key).await
1595 }
1596
1597 async fn get_state_events(
1598 &self,
1599 room_id: &RoomId,
1600 event_type: StateEventType,
1601 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1602 self.store.get_state_events(room_id, event_type).await
1603 }
1604
1605 async fn get_state_events_for_keys(
1606 &self,
1607 room_id: &RoomId,
1608 event_type: StateEventType,
1609 state_keys: &[&str],
1610 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1611 self.store.get_state_events_for_keys(room_id, event_type, state_keys).await
1612 }
1613
1614 async fn get_profile(
1615 &self,
1616 room_id: &RoomId,
1617 user_id: &UserId,
1618 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1619 self.store.get_profile(room_id, user_id).await
1620 }
1621
1622 async fn get_profiles<'a>(
1623 &self,
1624 room_id: &RoomId,
1625 user_ids: &'a [OwnedUserId],
1626 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1627 self.store.get_profiles(room_id, user_ids).await
1628 }
1629
1630 async fn get_user_ids(
1631 &self,
1632 room_id: &RoomId,
1633 memberships: RoomMemberships,
1634 ) -> Result<Vec<OwnedUserId>, Self::Error> {
1635 self.store.get_user_ids(room_id, memberships).await
1636 }
1637
1638 async fn get_room_infos(
1639 &self,
1640 room_load_settings: &RoomLoadSettings,
1641 ) -> Result<Vec<RoomInfo>, Self::Error> {
1642 self.store.get_room_infos(room_load_settings).await
1643 }
1644
1645 async fn get_users_with_display_name(
1646 &self,
1647 room_id: &RoomId,
1648 display_name: &DisplayName,
1649 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1650 self.store.get_users_with_display_name(room_id, display_name).await
1651 }
1652
1653 async fn get_users_with_display_names<'a>(
1654 &self,
1655 room_id: &RoomId,
1656 display_names: &'a [DisplayName],
1657 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1658 self.store.get_users_with_display_names(room_id, display_names).await
1659 }
1660
1661 async fn get_account_data_event(
1662 &self,
1663 event_type: GlobalAccountDataEventType,
1664 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1665 self.store.get_account_data_event(event_type).await
1666 }
1667
1668 async fn get_room_account_data_event(
1669 &self,
1670 room_id: &RoomId,
1671 event_type: RoomAccountDataEventType,
1672 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1673 self.store.get_room_account_data_event(room_id, event_type).await
1674 }
1675
1676 async fn get_user_room_receipt_event(
1677 &self,
1678 room_id: &RoomId,
1679 receipt_type: ReceiptType,
1680 thread: ReceiptThread,
1681 user_id: &UserId,
1682 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1683 self.store.get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
1684 }
1685
1686 async fn get_event_room_receipt_events(
1687 &self,
1688 room_id: &RoomId,
1689 receipt_type: ReceiptType,
1690 thread: ReceiptThread,
1691 event_id: &EventId,
1692 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1693 self.store.get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
1694 }
1695
1696 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1697 self.store.get_custom_value(key).await
1698 }
1699
1700 async fn set_custom_value(
1701 &self,
1702 key: &[u8],
1703 value: Vec<u8>,
1704 ) -> Result<Option<Vec<u8>>, Self::Error> {
1705 self.store.set_custom_value(key, value).await
1706 }
1707
1708 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1709 self.store.remove_custom_value(key).await
1710 }
1711
1712 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1713 self.store.remove_room(room_id).await
1714 }
1715
1716 async fn save_send_queue_request(
1717 &self,
1718 room_id: &RoomId,
1719 transaction_id: OwnedTransactionId,
1720 created_at: MilliSecondsSinceUnixEpoch,
1721 request: QueuedRequestKind,
1722 priority: usize,
1723 ) -> Result<(), Self::Error> {
1724 self.store
1725 .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1726 .await
1727 }
1728
1729 async fn update_send_queue_request(
1730 &self,
1731 room_id: &RoomId,
1732 transaction_id: &TransactionId,
1733 content: QueuedRequestKind,
1734 ) -> Result<bool, Self::Error> {
1735 self.store.update_send_queue_request(room_id, transaction_id, content).await
1736 }
1737
1738 async fn remove_send_queue_request(
1739 &self,
1740 room_id: &RoomId,
1741 transaction_id: &TransactionId,
1742 ) -> Result<bool, Self::Error> {
1743 self.store.remove_send_queue_request(room_id, transaction_id).await
1744 }
1745
1746 async fn load_send_queue_requests(
1747 &self,
1748 room_id: &RoomId,
1749 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1750 self.store.load_send_queue_requests(room_id).await
1751 }
1752
1753 async fn update_send_queue_request_status(
1754 &self,
1755 room_id: &RoomId,
1756 transaction_id: &TransactionId,
1757 error: Option<QueueWedgeError>,
1758 ) -> Result<(), Self::Error> {
1759 self.store.update_send_queue_request_status(room_id, transaction_id, error).await
1760 }
1761
1762 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1763 self.store.load_rooms_with_unsent_requests().await
1764 }
1765
1766 async fn save_dependent_queued_request(
1767 &self,
1768 room_id: &RoomId,
1769 parent_txn_id: &TransactionId,
1770 own_txn_id: ChildTransactionId,
1771 created_at: MilliSecondsSinceUnixEpoch,
1772 content: DependentQueuedRequestKind,
1773 ) -> Result<(), Self::Error> {
1774 self.store
1775 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1776 .await
1777 }
1778
1779 async fn mark_dependent_queued_requests_as_ready(
1780 &self,
1781 room_id: &RoomId,
1782 parent_txn_id: &TransactionId,
1783 sent_parent_key: SentRequestKey,
1784 ) -> Result<usize, Self::Error> {
1785 self.store
1786 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1787 .await
1788 }
1789
1790 async fn update_dependent_queued_request(
1791 &self,
1792 room_id: &RoomId,
1793 own_transaction_id: &ChildTransactionId,
1794 new_content: DependentQueuedRequestKind,
1795 ) -> Result<bool, Self::Error> {
1796 self.store.update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1797 }
1798
1799 async fn remove_dependent_queued_request(
1800 &self,
1801 room: &RoomId,
1802 own_txn_id: &ChildTransactionId,
1803 ) -> Result<bool, Self::Error> {
1804 self.store.remove_dependent_queued_request(room, own_txn_id).await
1805 }
1806
1807 async fn load_dependent_queued_requests(
1808 &self,
1809 room: &RoomId,
1810 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1811 self.store.load_dependent_queued_requests(room).await
1812 }
1813
1814 async fn upsert_thread_subscriptions(
1815 &self,
1816 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1817 ) -> Result<(), Self::Error> {
1818 self.store.upsert_thread_subscriptions(updates).await
1819 }
1820
1821 async fn load_thread_subscription(
1822 &self,
1823 room: &RoomId,
1824 thread_id: &EventId,
1825 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1826 self.store.load_thread_subscription(room, thread_id).await
1827 }
1828
1829 async fn remove_thread_subscription(
1830 &self,
1831 room: &RoomId,
1832 thread_id: &EventId,
1833 ) -> Result<(), Self::Error> {
1834 self.store.remove_thread_subscription(room, thread_id).await
1835 }
1836
1837 async fn close(&self) -> Result<(), Self::Error> {
1838 self.store.close().await
1839 }
1840
1841 async fn reopen(&self) -> Result<(), Self::Error> {
1842 self.store.reopen().await
1843 }
1844
1845 async fn optimize(&self) -> Result<(), Self::Error> {
1846 self.store.optimize().await
1847 }
1848
1849 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1850 self.store.get_size().await
1851 }
1852}
1853
1854#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1856#[cfg_attr(not(target_family = "wasm"), async_trait)]
1857pub trait StateStoreExt: StateStore {
1858 async fn get_state_event_static<C>(
1864 &self,
1865 room_id: &RoomId,
1866 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1867 where
1868 C: StaticEventContent<IsPrefix = ruma::events::False>
1869 + StaticStateEventContent<StateKey = EmptyStateKey>
1870 + RedactContent,
1871 C::Redacted: RedactedStateEventContent,
1872 {
1873 Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
1874 }
1875
1876 async fn get_state_event_static_for_key<C, K>(
1882 &self,
1883 room_id: &RoomId,
1884 state_key: &K,
1885 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1886 where
1887 C: StaticEventContent<IsPrefix = ruma::events::False>
1888 + StaticStateEventContent
1889 + RedactContent,
1890 C::StateKey: Borrow<K>,
1891 C::Redacted: RedactedStateEventContent,
1892 K: AsRef<str> + ?Sized + Sync,
1893 {
1894 Ok(self
1895 .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
1896 .await?
1897 .map(|raw| raw.cast()))
1898 }
1899
1900 async fn get_state_events_static<C>(
1906 &self,
1907 room_id: &RoomId,
1908 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1909 where
1910 C: StaticEventContent<IsPrefix = ruma::events::False>
1911 + StaticStateEventContent
1912 + RedactContent,
1913 C::Redacted: RedactedStateEventContent,
1914 {
1915 Ok(self
1917 .get_state_events(room_id, C::TYPE.into())
1918 .await?
1919 .into_iter()
1920 .map(|raw| raw.cast())
1921 .collect())
1922 }
1923
1924 async fn get_state_events_for_keys_static<'a, C, K, I>(
1933 &self,
1934 room_id: &RoomId,
1935 state_keys: I,
1936 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1937 where
1938 C: StaticEventContent<IsPrefix = ruma::events::False>
1939 + StaticStateEventContent
1940 + RedactContent,
1941 C::StateKey: Borrow<K>,
1942 C::Redacted: RedactedStateEventContent,
1943 K: AsRef<str> + Sized + Sync + 'a,
1944 I: IntoIterator<Item = &'a K> + Send,
1945 I::IntoIter: Send,
1946 {
1947 Ok(self
1948 .get_state_events_for_keys(
1949 room_id,
1950 C::TYPE.into(),
1951 &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
1952 )
1953 .await?
1954 .into_iter()
1955 .map(|raw| raw.cast())
1956 .collect())
1957 }
1958
1959 async fn get_account_data_event_static<C>(
1961 &self,
1962 ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
1963 where
1964 C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
1965 {
1966 Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1967 }
1968
1969 async fn get_room_account_data_event_static<C>(
1977 &self,
1978 room_id: &RoomId,
1979 ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
1980 where
1981 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1982 {
1983 Ok(self
1984 .get_room_account_data_event(room_id, C::TYPE.into())
1985 .await?
1986 .map(Raw::cast_unchecked))
1987 }
1988
1989 async fn get_member_event(
1997 &self,
1998 room_id: &RoomId,
1999 state_key: &UserId,
2000 ) -> Result<Option<RawMemberEvent>, Self::Error> {
2001 self.get_state_event_static_for_key(room_id, state_key).await
2002 }
2003}
2004
// Blanket implementation: every `StateStore` (sized or not) gets the provided
// `StateStoreExt` methods; nothing is overridden here.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl<T: StateStore + ?Sized> StateStoreExt for T {}
2008
/// A type-erased [`StateStore`] whose error type is [`StoreError`].
pub type DynStateStore = dyn StateStore<Error = StoreError>;
2011
/// Conversion of a value into a shared, type-erased state store
/// (`Arc<DynStateStore>`).
pub trait IntoStateStore {
    /// Consumes `self` and returns it as an `Arc<DynStateStore>`.
    #[doc(hidden)]
    fn into_state_store(self) -> Arc<DynStateStore>;
}
2021
2022impl<T> IntoStateStore for T
2023where
2024 T: StateStore + Sized + 'static,
2025{
2026 fn into_state_store(self) -> Arc<DynStateStore> {
2027 Arc::new(EraseStateStoreError(self))
2028 }
2029}
2030
/// Serializable record of version strings and unstable feature flags, from
/// which [`SupportedVersions`] can be built (see `supported_versions`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SupportedVersionsResponse {
    /// Version strings, kept unparsed.
    pub versions: Vec<String>,

    /// Unstable feature names mapped to a boolean flag.
    pub unstable_features: BTreeMap<String, bool>,
}
2040
2041impl SupportedVersionsResponse {
2042 pub fn supported_versions(&self) -> SupportedVersions {
2048 let mut supported_versions =
2049 SupportedVersions::from_parts(&self.versions, &self.unstable_features);
2050
2051 if supported_versions.versions.is_empty() {
2054 supported_versions.versions.insert(MatrixVersion::V1_0);
2055 }
2056
2057 supported_versions
2058 }
2059}
2060
/// Serializable copy of the fields of a `discover_homeserver::Response` (see
/// the `From` implementation for this type).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WellKnownResponse {
    /// The `homeserver` field of the response.
    pub homeserver: HomeserverInfo,

    /// The `identity_server` field of the response, if any.
    pub identity_server: Option<IdentityServerInfo>,

    /// The `tile_server` field of the response, if any.
    pub tile_server: Option<TileServerInfo>,

    /// The `rtc_foci` field of the response.
    pub rtc_foci: Vec<RtcTransport>,
}
2076
2077impl From<discover_homeserver::Response> for WellKnownResponse {
2078 fn from(response: discover_homeserver::Response) -> Self {
2079 Self {
2080 homeserver: response.homeserver,
2081 identity_server: response.identity_server,
2082 tile_server: response.tile_server,
2083 rtc_foci: response.rtc_foci,
2084 }
2085 }
2086}
2087
/// A value handled by the key/value API of a state store: it is accepted by
/// `set_kv_data` and returned by `get_kv_data`.
///
/// Every variant shares its name with a [`StateStoreDataKey`] variant.
// NOTE(review): presumably a value is always stored under the key variant of
// the same name — confirm against the store implementations.
#[derive(Debug, Clone)]
pub enum StateStoreDataValue {
    /// A sync token, as a string.
    SyncToken(String),

    /// A [`SupportedVersionsResponse`] wrapped in a [`TtlValue`].
    SupportedVersions(TtlValue<SupportedVersionsResponse>),

    /// An optional [`WellKnownResponse`] wrapped in a [`TtlValue`].
    WellKnown(TtlValue<Option<WellKnownResponse>>),

    /// A filter, as a string.
    Filter(String),

    /// The MXC URI of a user avatar.
    UserAvatarUrl(OwnedMxcUri),

    /// A list of room IDs of recently visited rooms.
    RecentlyVisitedRooms(Vec<OwnedRoomId>),

    /// A bloom filter holding data of the UTD hook manager.
    UtdHookManagerData(GrowableBloom),

    /// Marker without payload.
    // NOTE(review): presumably records that a one-time key was already
    // uploaded — confirm with the code that writes it.
    OneTimeKeyAlreadyUploaded,

    /// A composer draft.
    ComposerDraft(ComposerDraft),

    /// A map from event IDs to user IDs describing seen knock requests.
    SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),

    /// A list of thread subscription catch-up tokens.
    ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),

    /// Homeserver [`Capabilities`] wrapped in a [`TtlValue`].
    HomeserverCapabilities(TtlValue<Capabilities>),
}
2135
/// A pair of tokens used when catching up on thread subscriptions.
// NOTE(review): the exact pagination semantics of `from`/`to` are not visible
// in this part of the file — confirm with the code consuming these tokens.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadSubscriptionCatchupToken {
    /// The `from` token; always present.
    pub from: String,

    /// The `to` token, if any.
    pub to: Option<String>,
}
2161
/// A message draft held by a composer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComposerDraft {
    /// The draft content as plain text.
    pub plain_text: String,
    /// The draft content as HTML, if any.
    pub html_text: Option<String>,
    /// What kind of message is being drafted (new message, reply or edit).
    pub draft_type: ComposerDraftType,
    /// Attachments that are part of the draft. When the field is missing in
    /// serialized data it deserializes to an empty list (`#[serde(default)]`).
    #[serde(default)]
    pub attachments: Vec<DraftAttachment>,
}
2176
/// A single attachment of a [`ComposerDraft`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftAttachment {
    /// The attachment's file name.
    pub filename: String,
    /// The attachment's data and type-specific metadata.
    pub content: DraftAttachmentContent,
}
2185
/// The content of a [`DraftAttachment`], by media kind.
///
/// Serialized with the variant name stored in a `type` field
/// (`#[serde(tag = "type")]`).
// NOTE(review): units of `size`, `width` and `height` are not stated in this
// file (presumably bytes and pixels) — confirm with the producers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum DraftAttachmentContent {
    /// An image attachment.
    Image {
        /// The raw bytes of the image.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The width, if known.
        width: Option<u64>,
        /// The height, if known.
        height: Option<u64>,
        /// The blurhash string, if any.
        blurhash: Option<String>,
        /// The thumbnail, if any.
        thumbnail: Option<DraftThumbnail>,
    },
    /// A video attachment.
    Video {
        /// The raw bytes of the video.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The width, if known.
        width: Option<u64>,
        /// The height, if known.
        height: Option<u64>,
        /// The duration, if known.
        duration: Option<std::time::Duration>,
        /// The blurhash string, if any.
        blurhash: Option<String>,
        /// The thumbnail, if any.
        thumbnail: Option<DraftThumbnail>,
    },
    /// An audio attachment.
    Audio {
        /// The raw bytes of the audio.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The duration, if known.
        duration: Option<std::time::Duration>,
    },
    /// Any other file attachment.
    File {
        /// The raw bytes of the file.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
    },
}
2247
/// A thumbnail attached to image or video draft content.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftThumbnail {
    /// The thumbnail's file name.
    pub filename: String,
    /// The raw bytes of the thumbnail.
    pub data: Vec<u8>,
    /// The MIME type, if known.
    pub mimetype: Option<String>,
    /// The width, if known.
    pub width: Option<u64>,
    /// The height, if known.
    pub height: Option<u64>,
    /// The size, if known.
    pub size: Option<u64>,
}
2264
/// The kind of message a [`ComposerDraft`] is for.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ComposerDraftType {
    /// The draft is a new message.
    NewMessage,
    /// The draft is a reply.
    Reply {
        /// The ID of the event being replied to.
        event_id: OwnedEventId,
    },
    /// The draft is an edit.
    Edit {
        /// The ID of the event being edited.
        event_id: OwnedEventId,
    },
}
2281
/// Accessors that consume the value and return the payload of one specific
/// variant, or `None` when the value is a different variant.
impl StateStoreDataValue {
    /// Returns the payload if this is a `SyncToken` value.
    pub fn into_sync_token(self) -> Option<String> {
        as_variant!(self, Self::SyncToken)
    }

    /// Returns the payload if this is a `Filter` value.
    pub fn into_filter(self) -> Option<String> {
        as_variant!(self, Self::Filter)
    }

    /// Returns the payload if this is a `UserAvatarUrl` value.
    pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
        as_variant!(self, Self::UserAvatarUrl)
    }

    /// Returns the payload if this is a `RecentlyVisitedRooms` value.
    pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
        as_variant!(self, Self::RecentlyVisitedRooms)
    }

    /// Returns the payload if this is a `UtdHookManagerData` value.
    pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
        as_variant!(self, Self::UtdHookManagerData)
    }

    /// Returns the payload if this is a `ComposerDraft` value.
    pub fn into_composer_draft(self) -> Option<ComposerDraft> {
        as_variant!(self, Self::ComposerDraft)
    }

    /// Returns the payload if this is a `SupportedVersions` value.
    pub fn into_supported_versions(self) -> Option<TtlValue<SupportedVersionsResponse>> {
        as_variant!(self, Self::SupportedVersions)
    }

    /// Returns the payload if this is a `WellKnown` value.
    pub fn into_well_known(self) -> Option<TtlValue<Option<WellKnownResponse>>> {
        as_variant!(self, Self::WellKnown)
    }

    /// Returns the payload if this is a `SeenKnockRequests` value.
    pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
        as_variant!(self, Self::SeenKnockRequests)
    }

    /// Returns the payload if this is a `ThreadSubscriptionsCatchupTokens`
    /// value.
    pub fn into_thread_subscriptions_catchup_tokens(
        self,
    ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
        as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
    }

    /// Returns the payload if this is a `HomeserverCapabilities` value.
    pub fn into_homeserver_capabilities(self) -> Option<TtlValue<Capabilities>> {
        as_variant!(self, Self::HomeserverCapabilities)
    }
}
2342
/// A key for the key/value API of a state store (`get_kv_data`,
/// `set_kv_data`, `remove_kv_data`).
///
/// Some variants borrow an identifier that narrows the key down to a
/// specific filter, user or room.
#[derive(Debug, Clone, Copy)]
pub enum StateStoreDataKey<'a> {
    /// Key for the sync token.
    SyncToken,

    /// Key for the supported versions.
    SupportedVersions,

    /// Key for the well-known data.
    WellKnown,

    /// Key for a filter, identified by the given string.
    Filter(&'a str),

    /// Key for the avatar URL of the given user.
    UserAvatarUrl(&'a UserId),

    /// Key for the recently visited rooms of the given user.
    RecentlyVisitedRooms(&'a UserId),

    /// Key for the UTD hook manager data.
    UtdHookManagerData,

    /// Key for the one-time-key-already-uploaded marker.
    OneTimeKeyAlreadyUploaded,

    /// Key for the composer draft of the given room, optionally narrowed down
    /// by an event ID.
    // NOTE(review): the event ID presumably identifies a thread within the
    // room — confirm with callers.
    ComposerDraft(&'a RoomId, Option<&'a EventId>),

    /// Key for the seen knock requests of the given room.
    SeenKnockRequests(&'a RoomId),

    /// Key for the thread subscription catch-up tokens.
    ThreadSubscriptionsCatchupTokens,

    /// Key for the homeserver capabilities.
    HomeserverCapabilities,
}
2387
/// String constants, one per [`StateStoreDataKey`] variant.
// NOTE(review): these are presumably what store implementations use as the
// persisted key (or key prefix) for each variant, in which case changing a
// value would orphan existing data — confirm before editing any of them.
impl StateStoreDataKey<'_> {
    /// String key for the `SyncToken` variant.
    pub const SYNC_TOKEN: &'static str = "sync_token";

    /// String key for the `SupportedVersions` variant.
    // NOTE(review): the value is "server_capabilities", not
    // "supported_versions"; this looks like a deliberately preserved older
    // name — confirm before "fixing" it.
    pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities";

    /// String key for the `WellKnown` variant.
    pub const WELL_KNOWN: &'static str = "well_known";

    /// String key for the `Filter` variant.
    pub const FILTER: &'static str = "filter";

    /// String key for the `UserAvatarUrl` variant.
    pub const USER_AVATAR_URL: &'static str = "user_avatar_url";

    /// String key for the `RecentlyVisitedRooms` variant.
    pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";

    /// String key for the `UtdHookManagerData` variant.
    pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";

    /// String key for the `OneTimeKeyAlreadyUploaded` variant.
    pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";

    /// String key for the `ComposerDraft` variant.
    pub const COMPOSER_DRAFT: &'static str = "composer_draft";

    /// String key for the `SeenKnockRequests` variant.
    pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";

    /// String key for the `ThreadSubscriptionsCatchupTokens` variant.
    pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
        "thread_subscriptions_catchup_tokens";

    /// String key for the `HomeserverCapabilities` variant.
    pub const HOMESERVER_CAPABILITIES: &'static str = "homeserver_capabilities";
}
2435
/// Compares the bump stamp of a previously stored thread subscription with
/// the bump stamp of a new one, and tells whether the new one should be kept.
///
/// * With no previous bump stamp, the new value is always kept and `new` is
///   left untouched.
/// * With a previous bump stamp but no new one, `new` is overwritten with the
///   previous bump stamp and the value is kept.
/// * With both present, the value is kept only if the new bump stamp is
///   strictly greater than the previous one; `new` is left untouched.
///
/// Returns `true` when the new value should be kept, `false` otherwise.
pub fn compare_thread_subscription_bump_stamps(
    previous: Option<u64>,
    new: &mut Option<u64>,
) -> bool {
    // Nothing to compare against: keep the new value as it is.
    let Some(prev_bump) = previous else {
        return true;
    };

    match *new {
        // No new bump stamp: carry the previous one over.
        None => {
            *new = Some(prev_bump);
            true
        }

        // An equal or older bump stamp must not replace the stored one.
        Some(new_bump) => new_bump > prev_bump,
    }
}
2466
#[cfg(test)]
mod tests {
    // Tests for `SaveLockedStateStore`.
    mod save_locked_state_store {
        use std::time::Duration;

        use assert_matches::assert_matches;
        use futures_util::future::{self, Either};
        #[cfg(all(target_family = "wasm", target_os = "unknown"))]
        use gloo_timers::future::sleep;
        use matrix_sdk_common::executor::spawn;
        use matrix_sdk_test::async_test;
        use tokio::sync::Mutex;
        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
        use tokio::time::sleep;

        use crate::{
            StateChanges, StateStore,
            store::{IntoStateStore, MemoryStore, Result, SaveLockedStateStore},
        };

        // Store factory; presumably the hook the `statestore_integration_tests!`
        // macro below relies on — confirm against the macro definition.
        async fn get_store() -> Result<impl StateStore> {
            Ok(SaveLockedStateStore::new(MemoryStore::new()))
        }

        statestore_integration_tests!();

        #[async_test]
        async fn test_state_store_only_accepts_guard_for_underlying_mutex() {
            let state_store = SaveLockedStateStore::new(MemoryStore::new());
            let state_changes = StateChanges::default();
            // A guard taken from the store's own mutex is accepted.
            state_store
                .save_changes_with_guard(&state_store.lock().lock().await, &state_changes)
                .await
                .expect("state store accepts guard for underlying mutex");

            // A guard taken from an unrelated mutex is rejected.
            let mutex = Mutex::new(());
            state_store
                .save_changes_with_guard(&mutex.lock().await, &state_changes)
                .await
                .expect_err("state store does not accept guard for unknown mutex");
        }

        // Error returned by `timeout` when the duration elapses first.
        #[derive(Debug)]
        struct Elapsed;

        // Waits for `f`, giving up with `Err(Elapsed)` once `duration` has
        // passed. On wasm (unknown OS) this races `f` against a `gloo_timers`
        // sleep; everywhere else it uses `tokio::time::timeout`.
        async fn timeout<F: Future + Unpin>(
            duration: Duration,
            f: F,
        ) -> Result<F::Output, Elapsed> {
            #[cfg(all(target_family = "wasm", target_os = "unknown"))]
            {
                match future::select(sleep(duration), f).await {
                    Either::Left(_) => return Err(Elapsed),
                    Either::Right((output, _)) => Ok(output),
                }
            }
            #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
            {
                tokio::time::timeout(duration, f).await.map_err(|_| Elapsed)
            }
        }

        #[async_test]
        async fn test_state_store_waits_to_acquire_lock_before_saving_changes() {
            let state_store = SaveLockedStateStore::new(MemoryStore::new().into_state_store());

            // Task that takes the store's lock and holds it for five seconds.
            let lock_task = spawn({
                let state_store = state_store.clone();
                async move {
                    let lock = state_store.lock();
                    let _guard = lock.lock().await;
                    sleep(Duration::from_secs(5)).await;
                }
            });

            // Task that saves changes through the `StateStore` implementation,
            // which has to acquire the same lock first.
            let save_task =
                spawn(async move { state_store.save_changes(&StateChanges::default()).await });

            // The lock-holding task is expected to finish first; once it has,
            // the save is expected to complete within 100 ms.
            // NOTE(review): this assumes `lock_task` acquires the lock before
            // `save_task` is first polled — confirm for the executor in use.
            assert_matches!(future::select(lock_task, save_task).await, Either::Left((_, save_task)) => {
                timeout(Duration::from_millis(100), save_task)
                    .await
                    .expect("task completes before timeout")
                    .expect("task completes successfully")
                    .expect("task saves changes");
            });
        }
    }
}