1use std::{
16 borrow::Borrow,
17 collections::{BTreeMap, BTreeSet, HashMap},
18 fmt,
19 ops::Deref,
20 sync::Arc,
21};
22
23use as_variant::as_variant;
24use async_trait::async_trait;
25use growable_bloom_filter::GrowableBloom;
26use matrix_sdk_common::{AsyncTraitDeps, ttl::TtlValue};
27use ruma::{
28 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
29 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
30 api::{
31 MatrixVersion, SupportedVersions,
32 client::discovery::{
33 discover_homeserver::{
34 self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo,
35 },
36 get_capabilities::v3::Capabilities,
37 },
38 },
39 events::{
40 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
41 GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
42 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
43 RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
44 presence::PresenceEvent,
45 receipt::{Receipt, ReceiptThread, ReceiptType},
46 },
47 serde::Raw,
48};
49use serde::{Deserialize, Serialize};
50use thiserror::Error;
51use tokio::sync::{Mutex, MutexGuard};
52
53use super::{
54 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
55 QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
56 send_queue::SentRequestKey,
57};
58use crate::{
59 MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
60 deserialized_responses::{
61 DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
62 },
63 store::StoredThreadSubscription,
64};
65
66#[cfg_attr(target_family = "wasm", async_trait(?Send))]
69#[cfg_attr(not(target_family = "wasm"), async_trait)]
70pub trait StateStore: AsyncTraitDeps {
71 type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;
73
74 async fn get_kv_data(
80 &self,
81 key: StateStoreDataKey<'_>,
82 ) -> Result<Option<StateStoreDataValue>, Self::Error>;
83
84 async fn set_kv_data(
94 &self,
95 key: StateStoreDataKey<'_>,
96 value: StateStoreDataValue,
97 ) -> Result<(), Self::Error>;
98
99 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;
105
106 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;
108
109 async fn get_presence_event(
116 &self,
117 user_id: &UserId,
118 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;
119
120 async fn get_presence_events(
126 &self,
127 user_ids: &[OwnedUserId],
128 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;
129
130 async fn get_state_event(
138 &self,
139 room_id: &RoomId,
140 event_type: StateEventType,
141 state_key: &str,
142 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;
143
144 async fn get_state_events(
152 &self,
153 room_id: &RoomId,
154 event_type: StateEventType,
155 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
156
157 async fn get_state_events_for_keys(
168 &self,
169 room_id: &RoomId,
170 event_type: StateEventType,
171 state_keys: &[&str],
172 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
173
174 async fn get_profile(
182 &self,
183 room_id: &RoomId,
184 user_id: &UserId,
185 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;
186
187 async fn get_profiles<'a>(
195 &self,
196 room_id: &RoomId,
197 user_ids: &'a [OwnedUserId],
198 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;
199
200 async fn get_user_ids(
203 &self,
204 room_id: &RoomId,
205 memberships: RoomMemberships,
206 ) -> Result<Vec<OwnedUserId>, Self::Error>;
207
208 async fn get_room_infos(
210 &self,
211 room_load_settings: &RoomLoadSettings,
212 ) -> Result<Vec<RoomInfo>, Self::Error>;
213
214 async fn get_users_with_display_name(
223 &self,
224 room_id: &RoomId,
225 display_name: &DisplayName,
226 ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;
227
228 async fn get_users_with_display_names<'a>(
236 &self,
237 room_id: &RoomId,
238 display_names: &'a [DisplayName],
239 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;
240
241 async fn get_account_data_event(
247 &self,
248 event_type: GlobalAccountDataEventType,
249 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;
250
251 async fn get_room_account_data_event(
261 &self,
262 room_id: &RoomId,
263 event_type: RoomAccountDataEventType,
264 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;
265
266 async fn get_user_room_receipt_event(
279 &self,
280 room_id: &RoomId,
281 receipt_type: ReceiptType,
282 thread: ReceiptThread,
283 user_id: &UserId,
284 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;
285
286 async fn get_event_room_receipt_events(
300 &self,
301 room_id: &RoomId,
302 receipt_type: ReceiptType,
303 thread: ReceiptThread,
304 event_id: &EventId,
305 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;
306
307 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
313
314 async fn set_custom_value(
323 &self,
324 key: &[u8],
325 value: Vec<u8>,
326 ) -> Result<Option<Vec<u8>>, Self::Error>;
327
328 async fn set_custom_value_no_read(
342 &self,
343 key: &[u8],
344 value: Vec<u8>,
345 ) -> Result<(), Self::Error> {
346 self.set_custom_value(key, value).await.map(|_| ())
347 }
348
349 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
355
356 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;
362
363 async fn save_send_queue_request(
373 &self,
374 room_id: &RoomId,
375 transaction_id: OwnedTransactionId,
376 created_at: MilliSecondsSinceUnixEpoch,
377 request: QueuedRequestKind,
378 priority: usize,
379 ) -> Result<(), Self::Error>;
380
381 async fn update_send_queue_request(
393 &self,
394 room_id: &RoomId,
395 transaction_id: &TransactionId,
396 content: QueuedRequestKind,
397 ) -> Result<bool, Self::Error>;
398
399 async fn remove_send_queue_request(
405 &self,
406 room_id: &RoomId,
407 transaction_id: &TransactionId,
408 ) -> Result<bool, Self::Error>;
409
410 async fn load_send_queue_requests(
416 &self,
417 room_id: &RoomId,
418 ) -> Result<Vec<QueuedRequest>, Self::Error>;
419
420 async fn update_send_queue_request_status(
423 &self,
424 room_id: &RoomId,
425 transaction_id: &TransactionId,
426 error: Option<QueueWedgeError>,
427 ) -> Result<(), Self::Error>;
428
429 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;
431
432 async fn save_dependent_queued_request(
435 &self,
436 room_id: &RoomId,
437 parent_txn_id: &TransactionId,
438 own_txn_id: ChildTransactionId,
439 created_at: MilliSecondsSinceUnixEpoch,
440 content: DependentQueuedRequestKind,
441 ) -> Result<(), Self::Error>;
442
443 async fn mark_dependent_queued_requests_as_ready(
452 &self,
453 room_id: &RoomId,
454 parent_txn_id: &TransactionId,
455 sent_parent_key: SentRequestKey,
456 ) -> Result<usize, Self::Error>;
457
458 async fn update_dependent_queued_request(
462 &self,
463 room_id: &RoomId,
464 own_transaction_id: &ChildTransactionId,
465 new_content: DependentQueuedRequestKind,
466 ) -> Result<bool, Self::Error>;
467
468 async fn remove_dependent_queued_request(
473 &self,
474 room: &RoomId,
475 own_txn_id: &ChildTransactionId,
476 ) -> Result<bool, Self::Error>;
477
478 async fn load_dependent_queued_requests(
484 &self,
485 room: &RoomId,
486 ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
487
488 async fn upsert_thread_subscriptions(
498 &self,
499 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
500 ) -> Result<(), Self::Error>;
501
502 async fn remove_thread_subscription(
506 &self,
507 room: &RoomId,
508 thread_id: &EventId,
509 ) -> Result<(), Self::Error>;
510
511 async fn load_thread_subscription(
515 &self,
516 room: &RoomId,
517 thread_id: &EventId,
518 ) -> Result<Option<StoredThreadSubscription>, Self::Error>;
519
520 #[doc(hidden)]
526 async fn optimize(&self) -> Result<(), Self::Error>;
527
528 async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
530}
531
532#[cfg_attr(target_family = "wasm", async_trait(?Send))]
533#[cfg_attr(not(target_family = "wasm"), async_trait)]
534impl<T: StateStore> StateStore for &T {
535 type Error = T::Error;
536
537 async fn get_kv_data(
538 &self,
539 key: StateStoreDataKey<'_>,
540 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
541 (*self).get_kv_data(key).await
542 }
543
544 async fn set_kv_data(
545 &self,
546 key: StateStoreDataKey<'_>,
547 value: StateStoreDataValue,
548 ) -> Result<(), Self::Error> {
549 (*self).set_kv_data(key, value).await
550 }
551
552 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
553 (*self).remove_kv_data(key).await
554 }
555
556 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
557 (*self).save_changes(changes).await
558 }
559
560 async fn get_presence_event(
561 &self,
562 user_id: &UserId,
563 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
564 (*self).get_presence_event(user_id).await
565 }
566
567 async fn get_presence_events(
568 &self,
569 user_ids: &[OwnedUserId],
570 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
571 (*self).get_presence_events(user_ids).await
572 }
573
574 async fn get_state_event(
575 &self,
576 room_id: &RoomId,
577 event_type: StateEventType,
578 state_key: &str,
579 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
580 (*self).get_state_event(room_id, event_type, state_key).await
581 }
582
583 async fn get_state_events(
584 &self,
585 room_id: &RoomId,
586 event_type: StateEventType,
587 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
588 (*self).get_state_events(room_id, event_type).await
589 }
590
591 async fn get_state_events_for_keys(
592 &self,
593 room_id: &RoomId,
594 event_type: StateEventType,
595 state_keys: &[&str],
596 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
597 (*self).get_state_events_for_keys(room_id, event_type, state_keys).await
598 }
599
600 async fn get_profile(
601 &self,
602 room_id: &RoomId,
603 user_id: &UserId,
604 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
605 (*self).get_profile(room_id, user_id).await
606 }
607
608 async fn get_profiles<'a>(
609 &self,
610 room_id: &RoomId,
611 user_ids: &'a [OwnedUserId],
612 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
613 (*self).get_profiles(room_id, user_ids).await
614 }
615
616 async fn get_user_ids(
617 &self,
618 room_id: &RoomId,
619 memberships: RoomMemberships,
620 ) -> Result<Vec<OwnedUserId>, Self::Error> {
621 (*self).get_user_ids(room_id, memberships).await
622 }
623
624 async fn get_room_infos(
625 &self,
626 room_load_settings: &RoomLoadSettings,
627 ) -> Result<Vec<RoomInfo>, Self::Error> {
628 (*self).get_room_infos(room_load_settings).await
629 }
630
631 async fn get_users_with_display_name(
632 &self,
633 room_id: &RoomId,
634 display_name: &DisplayName,
635 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
636 (*self).get_users_with_display_name(room_id, display_name).await
637 }
638
639 async fn get_users_with_display_names<'a>(
640 &self,
641 room_id: &RoomId,
642 display_names: &'a [DisplayName],
643 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
644 (*self).get_users_with_display_names(room_id, display_names).await
645 }
646
647 async fn get_account_data_event(
648 &self,
649 event_type: GlobalAccountDataEventType,
650 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
651 (*self).get_account_data_event(event_type).await
652 }
653
654 async fn get_room_account_data_event(
655 &self,
656 room_id: &RoomId,
657 event_type: RoomAccountDataEventType,
658 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
659 (*self).get_room_account_data_event(room_id, event_type).await
660 }
661
662 async fn get_user_room_receipt_event(
663 &self,
664 room_id: &RoomId,
665 receipt_type: ReceiptType,
666 thread: ReceiptThread,
667 user_id: &UserId,
668 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
669 (*self).get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
670 }
671
672 async fn get_event_room_receipt_events(
673 &self,
674 room_id: &RoomId,
675 receipt_type: ReceiptType,
676 thread: ReceiptThread,
677 event_id: &EventId,
678 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
679 (*self).get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
680 }
681
682 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
683 (*self).get_custom_value(key).await
684 }
685
686 async fn set_custom_value(
687 &self,
688 key: &[u8],
689 value: Vec<u8>,
690 ) -> Result<Option<Vec<u8>>, Self::Error> {
691 (*self).set_custom_value(key, value).await
692 }
693
694 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
695 (*self).remove_custom_value(key).await
696 }
697
698 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
699 (*self).remove_room(room_id).await
700 }
701
702 async fn save_send_queue_request(
703 &self,
704 room_id: &RoomId,
705 transaction_id: OwnedTransactionId,
706 created_at: MilliSecondsSinceUnixEpoch,
707 request: QueuedRequestKind,
708 priority: usize,
709 ) -> Result<(), Self::Error> {
710 (*self)
711 .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
712 .await
713 }
714
715 async fn update_send_queue_request(
716 &self,
717 room_id: &RoomId,
718 transaction_id: &TransactionId,
719 content: QueuedRequestKind,
720 ) -> Result<bool, Self::Error> {
721 (*self).update_send_queue_request(room_id, transaction_id, content).await
722 }
723
724 async fn remove_send_queue_request(
725 &self,
726 room_id: &RoomId,
727 transaction_id: &TransactionId,
728 ) -> Result<bool, Self::Error> {
729 (*self).remove_send_queue_request(room_id, transaction_id).await
730 }
731
732 async fn load_send_queue_requests(
733 &self,
734 room_id: &RoomId,
735 ) -> Result<Vec<QueuedRequest>, Self::Error> {
736 (*self).load_send_queue_requests(room_id).await
737 }
738
739 async fn update_send_queue_request_status(
740 &self,
741 room_id: &RoomId,
742 transaction_id: &TransactionId,
743 error: Option<QueueWedgeError>,
744 ) -> Result<(), Self::Error> {
745 (*self).update_send_queue_request_status(room_id, transaction_id, error).await
746 }
747
748 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
749 (*self).load_rooms_with_unsent_requests().await
750 }
751
752 async fn save_dependent_queued_request(
753 &self,
754 room_id: &RoomId,
755 parent_txn_id: &TransactionId,
756 own_txn_id: ChildTransactionId,
757 created_at: MilliSecondsSinceUnixEpoch,
758 content: DependentQueuedRequestKind,
759 ) -> Result<(), Self::Error> {
760 (*self)
761 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
762 .await
763 }
764
765 async fn mark_dependent_queued_requests_as_ready(
766 &self,
767 room_id: &RoomId,
768 parent_txn_id: &TransactionId,
769 sent_parent_key: SentRequestKey,
770 ) -> Result<usize, Self::Error> {
771 (*self)
772 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
773 .await
774 }
775
776 async fn update_dependent_queued_request(
777 &self,
778 room_id: &RoomId,
779 own_transaction_id: &ChildTransactionId,
780 new_content: DependentQueuedRequestKind,
781 ) -> Result<bool, Self::Error> {
782 (*self).update_dependent_queued_request(room_id, own_transaction_id, new_content).await
783 }
784
785 async fn remove_dependent_queued_request(
786 &self,
787 room: &RoomId,
788 own_txn_id: &ChildTransactionId,
789 ) -> Result<bool, Self::Error> {
790 (*self).remove_dependent_queued_request(room, own_txn_id).await
791 }
792
793 async fn load_dependent_queued_requests(
794 &self,
795 room: &RoomId,
796 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
797 (*self).load_dependent_queued_requests(room).await
798 }
799
800 async fn upsert_thread_subscriptions(
801 &self,
802 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
803 ) -> Result<(), Self::Error> {
804 (*self).upsert_thread_subscriptions(updates).await
805 }
806
807 async fn remove_thread_subscription(
808 &self,
809 room: &RoomId,
810 thread_id: &EventId,
811 ) -> Result<(), Self::Error> {
812 (*self).remove_thread_subscription(room, thread_id).await
813 }
814
815 async fn load_thread_subscription(
816 &self,
817 room: &RoomId,
818 thread_id: &EventId,
819 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
820 (*self).load_thread_subscription(room, thread_id).await
821 }
822
823 async fn optimize(&self) -> Result<(), Self::Error> {
824 (*self).optimize().await
825 }
826
827 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
828 (*self).get_size().await
829 }
830}
831
832#[cfg_attr(target_family = "wasm", async_trait(?Send))]
833#[cfg_attr(not(target_family = "wasm"), async_trait)]
834impl<T: StateStore + ?Sized> StateStore for Arc<T> {
835 type Error = T::Error;
836
837 async fn get_kv_data(
838 &self,
839 key: StateStoreDataKey<'_>,
840 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
841 self.deref().get_kv_data(key).await
842 }
843
844 async fn set_kv_data(
845 &self,
846 key: StateStoreDataKey<'_>,
847 value: StateStoreDataValue,
848 ) -> Result<(), Self::Error> {
849 self.deref().set_kv_data(key, value).await
850 }
851
852 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
853 self.deref().remove_kv_data(key).await
854 }
855
856 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
857 self.deref().save_changes(changes).await
858 }
859
860 async fn get_presence_event(
861 &self,
862 user_id: &UserId,
863 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
864 self.deref().get_presence_event(user_id).await
865 }
866
867 async fn get_presence_events(
868 &self,
869 user_ids: &[OwnedUserId],
870 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
871 self.deref().get_presence_events(user_ids).await
872 }
873
874 async fn get_state_event(
875 &self,
876 room_id: &RoomId,
877 event_type: StateEventType,
878 state_key: &str,
879 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
880 self.deref().get_state_event(room_id, event_type, state_key).await
881 }
882
883 async fn get_state_events(
884 &self,
885 room_id: &RoomId,
886 event_type: StateEventType,
887 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
888 self.deref().get_state_events(room_id, event_type).await
889 }
890
891 async fn get_state_events_for_keys(
892 &self,
893 room_id: &RoomId,
894 event_type: StateEventType,
895 state_keys: &[&str],
896 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
897 self.deref().get_state_events_for_keys(room_id, event_type, state_keys).await
898 }
899
900 async fn get_profile(
901 &self,
902 room_id: &RoomId,
903 user_id: &UserId,
904 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
905 self.deref().get_profile(room_id, user_id).await
906 }
907
908 async fn get_profiles<'a>(
909 &self,
910 room_id: &RoomId,
911 user_ids: &'a [OwnedUserId],
912 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
913 self.deref().get_profiles(room_id, user_ids).await
914 }
915
916 async fn get_user_ids(
917 &self,
918 room_id: &RoomId,
919 memberships: RoomMemberships,
920 ) -> Result<Vec<OwnedUserId>, Self::Error> {
921 self.deref().get_user_ids(room_id, memberships).await
922 }
923
924 async fn get_room_infos(
925 &self,
926 room_load_settings: &RoomLoadSettings,
927 ) -> Result<Vec<RoomInfo>, Self::Error> {
928 self.deref().get_room_infos(room_load_settings).await
929 }
930
931 async fn get_users_with_display_name(
932 &self,
933 room_id: &RoomId,
934 display_name: &DisplayName,
935 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
936 self.deref().get_users_with_display_name(room_id, display_name).await
937 }
938
939 async fn get_users_with_display_names<'a>(
940 &self,
941 room_id: &RoomId,
942 display_names: &'a [DisplayName],
943 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
944 self.deref().get_users_with_display_names(room_id, display_names).await
945 }
946
947 async fn get_account_data_event(
948 &self,
949 event_type: GlobalAccountDataEventType,
950 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
951 self.deref().get_account_data_event(event_type).await
952 }
953
954 async fn get_room_account_data_event(
955 &self,
956 room_id: &RoomId,
957 event_type: RoomAccountDataEventType,
958 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
959 self.deref().get_room_account_data_event(room_id, event_type).await
960 }
961
962 async fn get_user_room_receipt_event(
963 &self,
964 room_id: &RoomId,
965 receipt_type: ReceiptType,
966 thread: ReceiptThread,
967 user_id: &UserId,
968 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
969 self.deref().get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
970 }
971
972 async fn get_event_room_receipt_events(
973 &self,
974 room_id: &RoomId,
975 receipt_type: ReceiptType,
976 thread: ReceiptThread,
977 event_id: &EventId,
978 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
979 self.deref().get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
980 }
981
982 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
983 self.deref().get_custom_value(key).await
984 }
985
986 async fn set_custom_value(
987 &self,
988 key: &[u8],
989 value: Vec<u8>,
990 ) -> Result<Option<Vec<u8>>, Self::Error> {
991 self.deref().set_custom_value(key, value).await
992 }
993
994 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
995 self.deref().remove_custom_value(key).await
996 }
997
998 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
999 self.deref().remove_room(room_id).await
1000 }
1001
1002 async fn save_send_queue_request(
1003 &self,
1004 room_id: &RoomId,
1005 transaction_id: OwnedTransactionId,
1006 created_at: MilliSecondsSinceUnixEpoch,
1007 request: QueuedRequestKind,
1008 priority: usize,
1009 ) -> Result<(), Self::Error> {
1010 self.deref()
1011 .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1012 .await
1013 }
1014
1015 async fn update_send_queue_request(
1016 &self,
1017 room_id: &RoomId,
1018 transaction_id: &TransactionId,
1019 content: QueuedRequestKind,
1020 ) -> Result<bool, Self::Error> {
1021 self.deref().update_send_queue_request(room_id, transaction_id, content).await
1022 }
1023
1024 async fn remove_send_queue_request(
1025 &self,
1026 room_id: &RoomId,
1027 transaction_id: &TransactionId,
1028 ) -> Result<bool, Self::Error> {
1029 self.deref().remove_send_queue_request(room_id, transaction_id).await
1030 }
1031
1032 async fn load_send_queue_requests(
1033 &self,
1034 room_id: &RoomId,
1035 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1036 self.deref().load_send_queue_requests(room_id).await
1037 }
1038
1039 async fn update_send_queue_request_status(
1040 &self,
1041 room_id: &RoomId,
1042 transaction_id: &TransactionId,
1043 error: Option<QueueWedgeError>,
1044 ) -> Result<(), Self::Error> {
1045 self.deref().update_send_queue_request_status(room_id, transaction_id, error).await
1046 }
1047
1048 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1049 self.deref().load_rooms_with_unsent_requests().await
1050 }
1051
1052 async fn save_dependent_queued_request(
1053 &self,
1054 room_id: &RoomId,
1055 parent_txn_id: &TransactionId,
1056 own_txn_id: ChildTransactionId,
1057 created_at: MilliSecondsSinceUnixEpoch,
1058 content: DependentQueuedRequestKind,
1059 ) -> Result<(), Self::Error> {
1060 self.deref()
1061 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1062 .await
1063 }
1064
1065 async fn mark_dependent_queued_requests_as_ready(
1066 &self,
1067 room_id: &RoomId,
1068 parent_txn_id: &TransactionId,
1069 sent_parent_key: SentRequestKey,
1070 ) -> Result<usize, Self::Error> {
1071 self.deref()
1072 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1073 .await
1074 }
1075
1076 async fn update_dependent_queued_request(
1077 &self,
1078 room_id: &RoomId,
1079 own_transaction_id: &ChildTransactionId,
1080 new_content: DependentQueuedRequestKind,
1081 ) -> Result<bool, Self::Error> {
1082 self.deref().update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1083 }
1084
1085 async fn remove_dependent_queued_request(
1086 &self,
1087 room: &RoomId,
1088 own_txn_id: &ChildTransactionId,
1089 ) -> Result<bool, Self::Error> {
1090 self.deref().remove_dependent_queued_request(room, own_txn_id).await
1091 }
1092
1093 async fn load_dependent_queued_requests(
1094 &self,
1095 room: &RoomId,
1096 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1097 self.deref().load_dependent_queued_requests(room).await
1098 }
1099
1100 async fn upsert_thread_subscriptions(
1101 &self,
1102 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1103 ) -> Result<(), Self::Error> {
1104 self.deref().upsert_thread_subscriptions(updates).await
1105 }
1106
1107 async fn remove_thread_subscription(
1108 &self,
1109 room: &RoomId,
1110 thread_id: &EventId,
1111 ) -> Result<(), Self::Error> {
1112 self.deref().remove_thread_subscription(room, thread_id).await
1113 }
1114
1115 async fn load_thread_subscription(
1116 &self,
1117 room: &RoomId,
1118 thread_id: &EventId,
1119 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1120 self.deref().load_thread_subscription(room, thread_id).await
1121 }
1122
1123 async fn optimize(&self) -> Result<(), Self::Error> {
1124 self.deref().optimize().await
1125 }
1126
1127 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1128 self.deref().get_size().await
1129 }
1130}
1131
/// Newtype wrapper around a value of type `T`.
///
/// `#[repr(transparent)]` gives the wrapper the same layout as the wrapped
/// value. The `StateStore` implementation for this wrapper reports
/// `StoreError` as its error type, converting the wrapped store's errors with
/// `Into::into`.
#[repr(transparent)]
struct EraseStateStoreError<T>(T);

// Debug output is delegated to the wrapped value, so the wrapper itself never
// shows up in formatted output.
#[cfg(not(tarpaulin_include))]
impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        fmt::Debug::fmt(inner, f)
    }
}
1141
1142#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1143#[cfg_attr(not(target_family = "wasm"), async_trait)]
1144impl<T: StateStore> StateStore for EraseStateStoreError<T> {
1145 type Error = StoreError;
1146
1147 async fn get_kv_data(
1148 &self,
1149 key: StateStoreDataKey<'_>,
1150 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1151 self.0.get_kv_data(key).await.map_err(Into::into)
1152 }
1153
1154 async fn set_kv_data(
1155 &self,
1156 key: StateStoreDataKey<'_>,
1157 value: StateStoreDataValue,
1158 ) -> Result<(), Self::Error> {
1159 self.0.set_kv_data(key, value).await.map_err(Into::into)
1160 }
1161
1162 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1163 self.0.remove_kv_data(key).await.map_err(Into::into)
1164 }
1165
1166 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1167 self.0.save_changes(changes).await.map_err(Into::into)
1168 }
1169
1170 async fn get_presence_event(
1171 &self,
1172 user_id: &UserId,
1173 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1174 self.0.get_presence_event(user_id).await.map_err(Into::into)
1175 }
1176
1177 async fn get_presence_events(
1178 &self,
1179 user_ids: &[OwnedUserId],
1180 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1181 self.0.get_presence_events(user_ids).await.map_err(Into::into)
1182 }
1183
1184 async fn get_state_event(
1185 &self,
1186 room_id: &RoomId,
1187 event_type: StateEventType,
1188 state_key: &str,
1189 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1190 self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
1191 }
1192
1193 async fn get_state_events(
1194 &self,
1195 room_id: &RoomId,
1196 event_type: StateEventType,
1197 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1198 self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
1199 }
1200
1201 async fn get_state_events_for_keys(
1202 &self,
1203 room_id: &RoomId,
1204 event_type: StateEventType,
1205 state_keys: &[&str],
1206 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1207 self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
1208 }
1209
1210 async fn get_profile(
1211 &self,
1212 room_id: &RoomId,
1213 user_id: &UserId,
1214 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1215 self.0.get_profile(room_id, user_id).await.map_err(Into::into)
1216 }
1217
1218 async fn get_profiles<'a>(
1219 &self,
1220 room_id: &RoomId,
1221 user_ids: &'a [OwnedUserId],
1222 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1223 self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
1224 }
1225
1226 async fn get_user_ids(
1227 &self,
1228 room_id: &RoomId,
1229 memberships: RoomMemberships,
1230 ) -> Result<Vec<OwnedUserId>, Self::Error> {
1231 self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
1232 }
1233
1234 async fn get_room_infos(
1235 &self,
1236 room_load_settings: &RoomLoadSettings,
1237 ) -> Result<Vec<RoomInfo>, Self::Error> {
1238 self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
1239 }
1240
1241 async fn get_users_with_display_name(
1242 &self,
1243 room_id: &RoomId,
1244 display_name: &DisplayName,
1245 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1246 self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
1247 }
1248
1249 async fn get_users_with_display_names<'a>(
1250 &self,
1251 room_id: &RoomId,
1252 display_names: &'a [DisplayName],
1253 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1254 self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
1255 }
1256
1257 async fn get_account_data_event(
1258 &self,
1259 event_type: GlobalAccountDataEventType,
1260 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1261 self.0.get_account_data_event(event_type).await.map_err(Into::into)
1262 }
1263
1264 async fn get_room_account_data_event(
1265 &self,
1266 room_id: &RoomId,
1267 event_type: RoomAccountDataEventType,
1268 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1269 self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
1270 }
1271
1272 async fn get_user_room_receipt_event(
1273 &self,
1274 room_id: &RoomId,
1275 receipt_type: ReceiptType,
1276 thread: ReceiptThread,
1277 user_id: &UserId,
1278 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1279 self.0
1280 .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
1281 .await
1282 .map_err(Into::into)
1283 }
1284
1285 async fn get_event_room_receipt_events(
1286 &self,
1287 room_id: &RoomId,
1288 receipt_type: ReceiptType,
1289 thread: ReceiptThread,
1290 event_id: &EventId,
1291 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1292 self.0
1293 .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
1294 .await
1295 .map_err(Into::into)
1296 }
1297
1298 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1299 self.0.get_custom_value(key).await.map_err(Into::into)
1300 }
1301
1302 async fn set_custom_value(
1303 &self,
1304 key: &[u8],
1305 value: Vec<u8>,
1306 ) -> Result<Option<Vec<u8>>, Self::Error> {
1307 self.0.set_custom_value(key, value).await.map_err(Into::into)
1308 }
1309
1310 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1311 self.0.remove_custom_value(key).await.map_err(Into::into)
1312 }
1313
1314 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1315 self.0.remove_room(room_id).await.map_err(Into::into)
1316 }
1317
1318 async fn save_send_queue_request(
1319 &self,
1320 room_id: &RoomId,
1321 transaction_id: OwnedTransactionId,
1322 created_at: MilliSecondsSinceUnixEpoch,
1323 content: QueuedRequestKind,
1324 priority: usize,
1325 ) -> Result<(), Self::Error> {
1326 self.0
1327 .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
1328 .await
1329 .map_err(Into::into)
1330 }
1331
1332 async fn update_send_queue_request(
1333 &self,
1334 room_id: &RoomId,
1335 transaction_id: &TransactionId,
1336 content: QueuedRequestKind,
1337 ) -> Result<bool, Self::Error> {
1338 self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
1339 }
1340
1341 async fn remove_send_queue_request(
1342 &self,
1343 room_id: &RoomId,
1344 transaction_id: &TransactionId,
1345 ) -> Result<bool, Self::Error> {
1346 self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
1347 }
1348
1349 async fn load_send_queue_requests(
1350 &self,
1351 room_id: &RoomId,
1352 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1353 self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
1354 }
1355
1356 async fn update_send_queue_request_status(
1357 &self,
1358 room_id: &RoomId,
1359 transaction_id: &TransactionId,
1360 error: Option<QueueWedgeError>,
1361 ) -> Result<(), Self::Error> {
1362 self.0
1363 .update_send_queue_request_status(room_id, transaction_id, error)
1364 .await
1365 .map_err(Into::into)
1366 }
1367
1368 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1369 self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
1370 }
1371
1372 async fn save_dependent_queued_request(
1373 &self,
1374 room_id: &RoomId,
1375 parent_txn_id: &TransactionId,
1376 own_txn_id: ChildTransactionId,
1377 created_at: MilliSecondsSinceUnixEpoch,
1378 content: DependentQueuedRequestKind,
1379 ) -> Result<(), Self::Error> {
1380 self.0
1381 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1382 .await
1383 .map_err(Into::into)
1384 }
1385
1386 async fn mark_dependent_queued_requests_as_ready(
1387 &self,
1388 room_id: &RoomId,
1389 parent_txn_id: &TransactionId,
1390 sent_parent_key: SentRequestKey,
1391 ) -> Result<usize, Self::Error> {
1392 self.0
1393 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1394 .await
1395 .map_err(Into::into)
1396 }
1397
1398 async fn remove_dependent_queued_request(
1399 &self,
1400 room_id: &RoomId,
1401 own_txn_id: &ChildTransactionId,
1402 ) -> Result<bool, Self::Error> {
1403 self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
1404 }
1405
1406 async fn load_dependent_queued_requests(
1407 &self,
1408 room_id: &RoomId,
1409 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1410 self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
1411 }
1412
1413 async fn update_dependent_queued_request(
1414 &self,
1415 room_id: &RoomId,
1416 own_transaction_id: &ChildTransactionId,
1417 new_content: DependentQueuedRequestKind,
1418 ) -> Result<bool, Self::Error> {
1419 self.0
1420 .update_dependent_queued_request(room_id, own_transaction_id, new_content)
1421 .await
1422 .map_err(Into::into)
1423 }
1424
1425 async fn upsert_thread_subscriptions(
1426 &self,
1427 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1428 ) -> Result<(), Self::Error> {
1429 self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
1430 }
1431
1432 async fn load_thread_subscription(
1433 &self,
1434 room: &RoomId,
1435 thread_id: &EventId,
1436 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1437 self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
1438 }
1439
1440 async fn remove_thread_subscription(
1441 &self,
1442 room: &RoomId,
1443 thread_id: &EventId,
1444 ) -> Result<(), Self::Error> {
1445 self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
1446 }
1447
1448 async fn optimize(&self) -> Result<(), Self::Error> {
1449 self.0.optimize().await.map_err(Into::into)
1450 }
1451
1452 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1453 self.0.get_size().await.map_err(Into::into)
1454 }
1455}
1456
/// A state store paired with an async mutex.
///
/// The [`StateStore`] implementation for this type acquires the mutex before
/// forwarding `save_changes` to the wrapped store; all other methods are
/// forwarded directly. Because the mutex lives behind an [`Arc`], clones of
/// this wrapper share the same mutex.
#[derive(Debug, Clone)]
pub struct SaveLockedStateStore<T = Arc<DynStateStore>> {
    /// The wrapped store that every operation is forwarded to.
    store: T,
    /// Mutex held while `save_changes` runs; it guards no data of its own.
    lock: Arc<Mutex<()>>,
}
1464
/// Error produced by [`SaveLockedStateStore::save_changes_with_guard`] when
/// the supplied guard was not obtained from that store's own mutex.
#[derive(Debug, Error)]
#[error("a mutex guard was provided, but it does not reference the correct mutex")]
pub struct IncorrectMutexGuardError;
1471
1472impl From<IncorrectMutexGuardError> for StoreError {
1473 fn from(value: IncorrectMutexGuardError) -> Self {
1474 Self::backend(value)
1475 }
1476}
1477
1478impl<T> SaveLockedStateStore<T> {
1479 pub fn new(store: T) -> Self {
1481 Self { store, lock: Arc::new(Mutex::new(())) }
1482 }
1483
1484 pub fn lock(&self) -> &Mutex<()> {
1487 self.lock.as_ref()
1488 }
1489}
1490
1491impl<T: StateStore> SaveLockedStateStore<T> {
1492 pub async fn save_changes_with_guard(
1497 &self,
1498 guard: &MutexGuard<'_, ()>,
1499 changes: &StateChanges,
1500 ) -> Result<(), StoreError> {
1501 if !std::ptr::eq(MutexGuard::mutex(guard), self.lock()) {
1502 Err(IncorrectMutexGuardError.into())
1503 } else {
1504 self.store.save_changes(changes).await.map_err(Into::into)
1505 }
1506 }
1507}
1508
1509#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1510#[cfg_attr(not(target_family = "wasm"), async_trait)]
1511impl<T: StateStore> StateStore for SaveLockedStateStore<T> {
1512 type Error = T::Error;
1513
1514 async fn get_kv_data(
1515 &self,
1516 key: StateStoreDataKey<'_>,
1517 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1518 self.store.get_kv_data(key).await
1519 }
1520
1521 async fn set_kv_data(
1522 &self,
1523 key: StateStoreDataKey<'_>,
1524 value: StateStoreDataValue,
1525 ) -> Result<(), Self::Error> {
1526 self.store.set_kv_data(key, value).await
1527 }
1528
1529 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1530 self.store.remove_kv_data(key).await
1531 }
1532
1533 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1534 let _guard = self.lock.lock().await;
1535 self.store.save_changes(changes).await
1536 }
1537
1538 async fn get_presence_event(
1539 &self,
1540 user_id: &UserId,
1541 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1542 self.store.get_presence_event(user_id).await
1543 }
1544
1545 async fn get_presence_events(
1546 &self,
1547 user_ids: &[OwnedUserId],
1548 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1549 self.store.get_presence_events(user_ids).await
1550 }
1551
1552 async fn get_state_event(
1553 &self,
1554 room_id: &RoomId,
1555 event_type: StateEventType,
1556 state_key: &str,
1557 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1558 self.store.get_state_event(room_id, event_type, state_key).await
1559 }
1560
1561 async fn get_state_events(
1562 &self,
1563 room_id: &RoomId,
1564 event_type: StateEventType,
1565 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1566 self.store.get_state_events(room_id, event_type).await
1567 }
1568
1569 async fn get_state_events_for_keys(
1570 &self,
1571 room_id: &RoomId,
1572 event_type: StateEventType,
1573 state_keys: &[&str],
1574 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1575 self.store.get_state_events_for_keys(room_id, event_type, state_keys).await
1576 }
1577
1578 async fn get_profile(
1579 &self,
1580 room_id: &RoomId,
1581 user_id: &UserId,
1582 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1583 self.store.get_profile(room_id, user_id).await
1584 }
1585
1586 async fn get_profiles<'a>(
1587 &self,
1588 room_id: &RoomId,
1589 user_ids: &'a [OwnedUserId],
1590 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1591 self.store.get_profiles(room_id, user_ids).await
1592 }
1593
1594 async fn get_user_ids(
1595 &self,
1596 room_id: &RoomId,
1597 memberships: RoomMemberships,
1598 ) -> Result<Vec<OwnedUserId>, Self::Error> {
1599 self.store.get_user_ids(room_id, memberships).await
1600 }
1601
1602 async fn get_room_infos(
1603 &self,
1604 room_load_settings: &RoomLoadSettings,
1605 ) -> Result<Vec<RoomInfo>, Self::Error> {
1606 self.store.get_room_infos(room_load_settings).await
1607 }
1608
1609 async fn get_users_with_display_name(
1610 &self,
1611 room_id: &RoomId,
1612 display_name: &DisplayName,
1613 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1614 self.store.get_users_with_display_name(room_id, display_name).await
1615 }
1616
1617 async fn get_users_with_display_names<'a>(
1618 &self,
1619 room_id: &RoomId,
1620 display_names: &'a [DisplayName],
1621 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1622 self.store.get_users_with_display_names(room_id, display_names).await
1623 }
1624
1625 async fn get_account_data_event(
1626 &self,
1627 event_type: GlobalAccountDataEventType,
1628 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1629 self.store.get_account_data_event(event_type).await
1630 }
1631
1632 async fn get_room_account_data_event(
1633 &self,
1634 room_id: &RoomId,
1635 event_type: RoomAccountDataEventType,
1636 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1637 self.store.get_room_account_data_event(room_id, event_type).await
1638 }
1639
1640 async fn get_user_room_receipt_event(
1641 &self,
1642 room_id: &RoomId,
1643 receipt_type: ReceiptType,
1644 thread: ReceiptThread,
1645 user_id: &UserId,
1646 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1647 self.store.get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
1648 }
1649
1650 async fn get_event_room_receipt_events(
1651 &self,
1652 room_id: &RoomId,
1653 receipt_type: ReceiptType,
1654 thread: ReceiptThread,
1655 event_id: &EventId,
1656 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1657 self.store.get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
1658 }
1659
1660 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1661 self.store.get_custom_value(key).await
1662 }
1663
1664 async fn set_custom_value(
1665 &self,
1666 key: &[u8],
1667 value: Vec<u8>,
1668 ) -> Result<Option<Vec<u8>>, Self::Error> {
1669 self.store.set_custom_value(key, value).await
1670 }
1671
1672 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1673 self.store.remove_custom_value(key).await
1674 }
1675
1676 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1677 self.store.remove_room(room_id).await
1678 }
1679
1680 async fn save_send_queue_request(
1681 &self,
1682 room_id: &RoomId,
1683 transaction_id: OwnedTransactionId,
1684 created_at: MilliSecondsSinceUnixEpoch,
1685 request: QueuedRequestKind,
1686 priority: usize,
1687 ) -> Result<(), Self::Error> {
1688 self.store
1689 .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1690 .await
1691 }
1692
1693 async fn update_send_queue_request(
1694 &self,
1695 room_id: &RoomId,
1696 transaction_id: &TransactionId,
1697 content: QueuedRequestKind,
1698 ) -> Result<bool, Self::Error> {
1699 self.store.update_send_queue_request(room_id, transaction_id, content).await
1700 }
1701
1702 async fn remove_send_queue_request(
1703 &self,
1704 room_id: &RoomId,
1705 transaction_id: &TransactionId,
1706 ) -> Result<bool, Self::Error> {
1707 self.store.remove_send_queue_request(room_id, transaction_id).await
1708 }
1709
1710 async fn load_send_queue_requests(
1711 &self,
1712 room_id: &RoomId,
1713 ) -> Result<Vec<QueuedRequest>, Self::Error> {
1714 self.store.load_send_queue_requests(room_id).await
1715 }
1716
1717 async fn update_send_queue_request_status(
1718 &self,
1719 room_id: &RoomId,
1720 transaction_id: &TransactionId,
1721 error: Option<QueueWedgeError>,
1722 ) -> Result<(), Self::Error> {
1723 self.store.update_send_queue_request_status(room_id, transaction_id, error).await
1724 }
1725
1726 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1727 self.store.load_rooms_with_unsent_requests().await
1728 }
1729
1730 async fn save_dependent_queued_request(
1731 &self,
1732 room_id: &RoomId,
1733 parent_txn_id: &TransactionId,
1734 own_txn_id: ChildTransactionId,
1735 created_at: MilliSecondsSinceUnixEpoch,
1736 content: DependentQueuedRequestKind,
1737 ) -> Result<(), Self::Error> {
1738 self.store
1739 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1740 .await
1741 }
1742
1743 async fn mark_dependent_queued_requests_as_ready(
1744 &self,
1745 room_id: &RoomId,
1746 parent_txn_id: &TransactionId,
1747 sent_parent_key: SentRequestKey,
1748 ) -> Result<usize, Self::Error> {
1749 self.store
1750 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1751 .await
1752 }
1753
1754 async fn update_dependent_queued_request(
1755 &self,
1756 room_id: &RoomId,
1757 own_transaction_id: &ChildTransactionId,
1758 new_content: DependentQueuedRequestKind,
1759 ) -> Result<bool, Self::Error> {
1760 self.store.update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1761 }
1762
1763 async fn remove_dependent_queued_request(
1764 &self,
1765 room: &RoomId,
1766 own_txn_id: &ChildTransactionId,
1767 ) -> Result<bool, Self::Error> {
1768 self.store.remove_dependent_queued_request(room, own_txn_id).await
1769 }
1770
1771 async fn load_dependent_queued_requests(
1772 &self,
1773 room: &RoomId,
1774 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1775 self.store.load_dependent_queued_requests(room).await
1776 }
1777
1778 async fn upsert_thread_subscriptions(
1779 &self,
1780 updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1781 ) -> Result<(), Self::Error> {
1782 self.store.upsert_thread_subscriptions(updates).await
1783 }
1784
1785 async fn remove_thread_subscription(
1786 &self,
1787 room: &RoomId,
1788 thread_id: &EventId,
1789 ) -> Result<(), Self::Error> {
1790 self.store.remove_thread_subscription(room, thread_id).await
1791 }
1792
1793 async fn load_thread_subscription(
1794 &self,
1795 room: &RoomId,
1796 thread_id: &EventId,
1797 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1798 self.store.load_thread_subscription(room, thread_id).await
1799 }
1800
1801 async fn optimize(&self) -> Result<(), Self::Error> {
1802 self.store.optimize().await
1803 }
1804
1805 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1806 self.store.get_size().await
1807 }
1808}
1809
1810#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1812#[cfg_attr(not(target_family = "wasm"), async_trait)]
1813pub trait StateStoreExt: StateStore {
1814 async fn get_state_event_static<C>(
1820 &self,
1821 room_id: &RoomId,
1822 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1823 where
1824 C: StaticEventContent<IsPrefix = ruma::events::False>
1825 + StaticStateEventContent<StateKey = EmptyStateKey>
1826 + RedactContent,
1827 C::Redacted: RedactedStateEventContent,
1828 {
1829 Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
1830 }
1831
1832 async fn get_state_event_static_for_key<C, K>(
1838 &self,
1839 room_id: &RoomId,
1840 state_key: &K,
1841 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1842 where
1843 C: StaticEventContent<IsPrefix = ruma::events::False>
1844 + StaticStateEventContent
1845 + RedactContent,
1846 C::StateKey: Borrow<K>,
1847 C::Redacted: RedactedStateEventContent,
1848 K: AsRef<str> + ?Sized + Sync,
1849 {
1850 Ok(self
1851 .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
1852 .await?
1853 .map(|raw| raw.cast()))
1854 }
1855
1856 async fn get_state_events_static<C>(
1862 &self,
1863 room_id: &RoomId,
1864 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1865 where
1866 C: StaticEventContent<IsPrefix = ruma::events::False>
1867 + StaticStateEventContent
1868 + RedactContent,
1869 C::Redacted: RedactedStateEventContent,
1870 {
1871 Ok(self
1873 .get_state_events(room_id, C::TYPE.into())
1874 .await?
1875 .into_iter()
1876 .map(|raw| raw.cast())
1877 .collect())
1878 }
1879
1880 async fn get_state_events_for_keys_static<'a, C, K, I>(
1889 &self,
1890 room_id: &RoomId,
1891 state_keys: I,
1892 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1893 where
1894 C: StaticEventContent<IsPrefix = ruma::events::False>
1895 + StaticStateEventContent
1896 + RedactContent,
1897 C::StateKey: Borrow<K>,
1898 C::Redacted: RedactedStateEventContent,
1899 K: AsRef<str> + Sized + Sync + 'a,
1900 I: IntoIterator<Item = &'a K> + Send,
1901 I::IntoIter: Send,
1902 {
1903 Ok(self
1904 .get_state_events_for_keys(
1905 room_id,
1906 C::TYPE.into(),
1907 &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
1908 )
1909 .await?
1910 .into_iter()
1911 .map(|raw| raw.cast())
1912 .collect())
1913 }
1914
1915 async fn get_account_data_event_static<C>(
1917 &self,
1918 ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
1919 where
1920 C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
1921 {
1922 Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1923 }
1924
1925 async fn get_room_account_data_event_static<C>(
1933 &self,
1934 room_id: &RoomId,
1935 ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
1936 where
1937 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1938 {
1939 Ok(self
1940 .get_room_account_data_event(room_id, C::TYPE.into())
1941 .await?
1942 .map(Raw::cast_unchecked))
1943 }
1944
1945 async fn get_member_event(
1953 &self,
1954 room_id: &RoomId,
1955 state_key: &UserId,
1956 ) -> Result<Option<RawMemberEvent>, Self::Error> {
1957 self.get_state_event_static_for_key(room_id, state_key).await
1958 }
1959}
1960
// Blanket implementation: every `StateStore` (including unsized ones, hence
// `?Sized`) gets the `StateStoreExt` default methods.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl<T: StateStore + ?Sized> StateStoreExt for T {}
1964
/// A [`StateStore`] trait object whose error type is fixed to [`StoreError`].
pub type DynStateStore = dyn StateStore<Error = StoreError>;
1967
/// Conversion of a value into a shared, type-erased state store
/// (`Arc<DynStateStore>`).
pub trait IntoStateStore {
    /// Consumes `self` and returns it as an `Arc<DynStateStore>`.
    #[doc(hidden)]
    fn into_state_store(self) -> Arc<DynStateStore>;
}
1977
1978impl<T> IntoStateStore for T
1979where
1980 T: StateStore + Sized + 'static,
1981{
1982 fn into_state_store(self) -> Arc<DynStateStore> {
1983 Arc::new(EraseStateStoreError(self))
1984 }
1985}
1986
/// Serializable record of a server's supported versions and unstable
/// features, kept as plain strings.
///
/// Use [`SupportedVersionsResponse::supported_versions`] to turn it into a
/// [`SupportedVersions`] value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SupportedVersionsResponse {
    /// Version identifiers, as strings.
    pub versions: Vec<String>,

    /// Unstable feature names mapped to a boolean flag.
    pub unstable_features: BTreeMap<String, bool>,
}
1996
1997impl SupportedVersionsResponse {
1998 pub fn supported_versions(&self) -> SupportedVersions {
2004 let mut supported_versions =
2005 SupportedVersions::from_parts(&self.versions, &self.unstable_features);
2006
2007 if supported_versions.versions.is_empty() {
2010 supported_versions.versions.insert(MatrixVersion::V1_0);
2011 }
2012
2013 supported_versions
2014 }
2015}
2016
/// Serializable copy of the fields of a `discover_homeserver::Response`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WellKnownResponse {
    /// Homeserver information.
    pub homeserver: HomeserverInfo,

    /// Identity server information, if any.
    pub identity_server: Option<IdentityServerInfo>,

    /// Tile server information, if any.
    pub tile_server: Option<TileServerInfo>,

    /// List of RTC focus descriptions.
    pub rtc_foci: Vec<RtcFocusInfo>,
}
2032
2033impl From<discover_homeserver::Response> for WellKnownResponse {
2034 fn from(response: discover_homeserver::Response) -> Self {
2035 Self {
2036 homeserver: response.homeserver,
2037 identity_server: response.identity_server,
2038 tile_server: response.tile_server,
2039 rtc_foci: response.rtc_foci,
2040 }
2041 }
2042}
2043
/// A value for the key-value data of a [`StateStore`] (see
/// `StateStore::get_kv_data` / `set_kv_data`).
///
/// The variant names mirror those of [`StateStoreDataKey`].
#[derive(Debug, Clone)]
pub enum StateStoreDataValue {
    /// A sync token, as a string.
    SyncToken(String),

    /// A [`SupportedVersionsResponse`] wrapped in a [`TtlValue`].
    SupportedVersions(TtlValue<SupportedVersionsResponse>),

    /// An optional [`WellKnownResponse`] wrapped in a [`TtlValue`].
    WellKnown(TtlValue<Option<WellKnownResponse>>),

    /// A filter, as a string.
    Filter(String),

    /// A user's avatar URL.
    UserAvatarUrl(OwnedMxcUri),

    /// A list of room IDs for recently visited rooms.
    RecentlyVisitedRooms(Vec<OwnedRoomId>),

    /// Bloom filter data for the UTD hook manager.
    UtdHookManagerData(GrowableBloom),

    /// Marker variant without a payload.
    OneTimeKeyAlreadyUploaded,

    /// A composer draft.
    ComposerDraft(ComposerDraft),

    /// A map from event ID to user ID for seen knock requests.
    SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),

    /// A list of thread subscription catch-up tokens.
    ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),

    /// Homeserver [`Capabilities`] wrapped in a [`TtlValue`].
    HomeserverCapabilities(TtlValue<Capabilities>),
}
2091
/// A pair of string tokens used for catching up on thread subscriptions.
///
/// NOTE(review): presumably these are pagination tokens delimiting a range
/// that still has to be fetched — confirm against the code that uses them.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadSubscriptionCatchupToken {
    /// The `from` token; always present.
    pub from: String,

    /// The `to` token, if any.
    pub to: Option<String>,
}
2117
/// A draft of a message being composed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComposerDraft {
    /// The draft content as plain text.
    pub plain_text: String,
    /// The draft content as HTML, if any.
    pub html_text: Option<String>,
    /// What kind of message the draft is (new message, reply or edit).
    pub draft_type: ComposerDraftType,
    /// Attachments stored with the draft. When the field is missing during
    /// deserialization it defaults to an empty list (`#[serde(default)]`).
    #[serde(default)]
    pub attachments: Vec<DraftAttachment>,
}
2132
/// An attachment stored as part of a [`ComposerDraft`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftAttachment {
    /// The attachment's file name.
    pub filename: String,
    /// The attachment's data and type-specific metadata.
    pub content: DraftAttachmentContent,
}
2141
/// The content of a [`DraftAttachment`], by media kind.
///
/// Serialized in serde's internally tagged form, with the variant name held
/// in a `type` field.
///
/// NOTE(review): `size` is presumably in bytes and `width`/`height` in pixels;
/// the units are not visible here — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum DraftAttachmentContent {
    /// An image attachment.
    Image {
        /// The raw bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The width, if known.
        width: Option<u64>,
        /// The height, if known.
        height: Option<u64>,
        /// The blurhash, if any.
        blurhash: Option<String>,
        /// An optional thumbnail.
        thumbnail: Option<DraftThumbnail>,
    },
    /// A video attachment.
    Video {
        /// The raw bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The width, if known.
        width: Option<u64>,
        /// The height, if known.
        height: Option<u64>,
        /// The duration, if known.
        duration: Option<std::time::Duration>,
        /// The blurhash, if any.
        blurhash: Option<String>,
        /// An optional thumbnail.
        thumbnail: Option<DraftThumbnail>,
    },
    /// An audio attachment.
    Audio {
        /// The raw bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
        /// The duration, if known.
        duration: Option<std::time::Duration>,
    },
    /// A generic file attachment.
    File {
        /// The raw bytes.
        data: Vec<u8>,
        /// The MIME type, if known.
        mimetype: Option<String>,
        /// The size, if known.
        size: Option<u64>,
    },
}
2203
/// A thumbnail stored with an image or video [`DraftAttachmentContent`].
///
/// NOTE(review): `size` is presumably in bytes and `width`/`height` in pixels
/// — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftThumbnail {
    /// The thumbnail's file name.
    pub filename: String,
    /// The raw bytes.
    pub data: Vec<u8>,
    /// The MIME type, if known.
    pub mimetype: Option<String>,
    /// The width, if known.
    pub width: Option<u64>,
    /// The height, if known.
    pub height: Option<u64>,
    /// The size, if known.
    pub size: Option<u64>,
}
2220
/// The kind of message a [`ComposerDraft`] represents.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ComposerDraftType {
    /// A new message that refers to no other event.
    NewMessage,
    /// A reply.
    Reply {
        /// The ID of the event associated with the reply.
        event_id: OwnedEventId,
    },
    /// An edit.
    Edit {
        /// The ID of the event associated with the edit.
        event_id: OwnedEventId,
    },
}
2237
// Consuming accessors, one per payload-carrying variant. Each returns
// `Some(payload)` when `self` is the named variant and `None` otherwise.
impl StateStoreDataValue {
    /// Returns the payload if this is the `SyncToken` variant.
    pub fn into_sync_token(self) -> Option<String> {
        as_variant!(self, Self::SyncToken)
    }

    /// Returns the payload if this is the `Filter` variant.
    pub fn into_filter(self) -> Option<String> {
        as_variant!(self, Self::Filter)
    }

    /// Returns the payload if this is the `UserAvatarUrl` variant.
    pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
        as_variant!(self, Self::UserAvatarUrl)
    }

    /// Returns the payload if this is the `RecentlyVisitedRooms` variant.
    pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
        as_variant!(self, Self::RecentlyVisitedRooms)
    }

    /// Returns the payload if this is the `UtdHookManagerData` variant.
    pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
        as_variant!(self, Self::UtdHookManagerData)
    }

    /// Returns the payload if this is the `ComposerDraft` variant.
    pub fn into_composer_draft(self) -> Option<ComposerDraft> {
        as_variant!(self, Self::ComposerDraft)
    }

    /// Returns the payload if this is the `SupportedVersions` variant.
    pub fn into_supported_versions(self) -> Option<TtlValue<SupportedVersionsResponse>> {
        as_variant!(self, Self::SupportedVersions)
    }

    /// Returns the payload if this is the `WellKnown` variant.
    pub fn into_well_known(self) -> Option<TtlValue<Option<WellKnownResponse>>> {
        as_variant!(self, Self::WellKnown)
    }

    /// Returns the payload if this is the `SeenKnockRequests` variant.
    pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
        as_variant!(self, Self::SeenKnockRequests)
    }

    /// Returns the payload if this is the `ThreadSubscriptionsCatchupTokens`
    /// variant.
    pub fn into_thread_subscriptions_catchup_tokens(
        self,
    ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
        as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
    }

    /// Returns the payload if this is the `HomeserverCapabilities` variant.
    pub fn into_homeserver_capabilities(self) -> Option<TtlValue<Capabilities>> {
        as_variant!(self, Self::HomeserverCapabilities)
    }
}
2298
/// A key for the key-value data of a [`StateStore`] (see
/// `StateStore::get_kv_data` / `set_kv_data` / `remove_kv_data`).
///
/// The variant names mirror those of [`StateStoreDataValue`]. Some variants
/// carry borrowed identifiers that further qualify the key.
#[derive(Debug, Clone, Copy)]
pub enum StateStoreDataKey<'a> {
    /// Key for the sync token.
    SyncToken,

    /// Key for the supported versions.
    SupportedVersions,

    /// Key for the well-known data.
    WellKnown,

    /// Key for a filter, qualified by a string.
    Filter(&'a str),

    /// Key for an avatar URL, qualified by a user ID.
    UserAvatarUrl(&'a UserId),

    /// Key for the recently visited rooms, qualified by a user ID.
    RecentlyVisitedRooms(&'a UserId),

    /// Key for the UTD hook manager data.
    UtdHookManagerData,

    /// Key for the "one-time key already uploaded" marker.
    OneTimeKeyAlreadyUploaded,

    /// Key for a composer draft, qualified by a room ID and an optional event
    /// ID.
    ComposerDraft(&'a RoomId, Option<&'a EventId>),

    /// Key for the seen knock requests, qualified by a room ID.
    SeenKnockRequests(&'a RoomId),

    /// Key for the thread subscription catch-up tokens.
    ThreadSubscriptionsCatchupTokens,

    /// Key for the homeserver capabilities.
    HomeserverCapabilities,
}
2343
// String constants, one per `StateStoreDataKey` variant. How store
// implementations use them is not visible in this part of the file.
impl StateStoreDataKey<'_> {
    /// Key string for the [`SyncToken`](Self::SyncToken) variant.
    pub const SYNC_TOKEN: &'static str = "sync_token";

    /// Key string for the [`SupportedVersions`](Self::SupportedVersions)
    /// variant.
    ///
    /// Note that the value is `"server_capabilities"`, which does not match
    /// the constant's name. NOTE(review): presumably kept for compatibility
    /// with already persisted data — confirm before changing it.
    pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities";

    /// Key string for the [`WellKnown`](Self::WellKnown) variant.
    pub const WELL_KNOWN: &'static str = "well_known";

    /// Key string for the [`Filter`](Self::Filter) variant.
    pub const FILTER: &'static str = "filter";

    /// Key string for the [`UserAvatarUrl`](Self::UserAvatarUrl) variant.
    pub const USER_AVATAR_URL: &'static str = "user_avatar_url";

    /// Key string for the [`RecentlyVisitedRooms`](Self::RecentlyVisitedRooms)
    /// variant.
    pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";

    /// Key string for the [`UtdHookManagerData`](Self::UtdHookManagerData)
    /// variant.
    pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";

    /// Key string for the
    /// [`OneTimeKeyAlreadyUploaded`](Self::OneTimeKeyAlreadyUploaded) variant.
    pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";

    /// Key string for the [`ComposerDraft`](Self::ComposerDraft) variant.
    pub const COMPOSER_DRAFT: &'static str = "composer_draft";

    /// Key string for the [`SeenKnockRequests`](Self::SeenKnockRequests)
    /// variant.
    pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";

    /// Key string for the
    /// [`ThreadSubscriptionsCatchupTokens`](Self::ThreadSubscriptionsCatchupTokens)
    /// variant.
    pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
        "thread_subscriptions_catchup_tokens";

    /// Key string for the
    /// [`HomeserverCapabilities`](Self::HomeserverCapabilities) variant.
    pub const HOMESERVER_CAPABILITIES: &'static str = "homeserver_capabilities";
}
2391
/// Decides whether a thread subscription carrying the bump stamp `new` should
/// replace one carrying the bump stamp `previous`.
///
/// - With no previous stamp, the answer is always `true` and `new` is left
///   untouched.
/// - With a previous stamp but no new one, `new` is overwritten with the
///   previous stamp (so it is carried over) and the answer is `true`.
/// - With both stamps present, the answer is `true` only when the new stamp
///   is strictly greater than the previous one; `new` is left untouched.
pub fn compare_thread_subscription_bump_stamps(
    previous: Option<u64>,
    new: &mut Option<u64>,
) -> bool {
    let Some(prev_bump) = previous else {
        // Nothing to compare against.
        return true;
    };

    match *new {
        Some(new_bump) => new_bump > prev_bump,
        None => {
            // Keep the previously known stamp instead of dropping it.
            *new = Some(prev_bump);
            true
        }
    }
}
2422
#[cfg(test)]
mod tests {
    // Tests for `SaveLockedStateStore`.
    mod save_locked_state_store {
        use std::time::Duration;

        use assert_matches::assert_matches;
        use futures_util::future::{self, Either};
        #[cfg(all(target_family = "wasm", target_os = "unknown"))]
        use gloo_timers::future::sleep;
        use matrix_sdk_common::executor::spawn;
        use matrix_sdk_test::async_test;
        use tokio::sync::Mutex;
        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
        use tokio::time::sleep;

        use crate::{
            StateChanges, StateStore,
            store::{IntoStateStore, MemoryStore, Result, SaveLockedStateStore},
        };

        // Builds the store under test: a `MemoryStore` wrapped in a
        // `SaveLockedStateStore`. Presumably picked up by the
        // `statestore_integration_tests!` macro below — confirm.
        async fn get_store() -> Result<impl StateStore> {
            Ok(SaveLockedStateStore::new(MemoryStore::new()))
        }

        statestore_integration_tests!();

        // A guard taken from the store's own mutex is accepted; a guard taken
        // from an unrelated mutex is rejected.
        #[async_test]
        async fn test_state_store_only_accepts_guard_for_underlying_mutex() {
            let state_store = SaveLockedStateStore::new(MemoryStore::new());
            let state_changes = StateChanges::default();
            state_store
                .save_changes_with_guard(&state_store.lock().lock().await, &state_changes)
                .await
                .expect("state store accepts guard for underlying mutex");

            let mutex = Mutex::new(());
            state_store
                .save_changes_with_guard(&mutex.lock().await, &state_changes)
                .await
                .expect_err("state store does not accept guard for unknown mutex");
        }

        // Marker error returned by `timeout` when the duration elapses first.
        #[derive(Debug)]
        struct Elapsed;

        // Awaits `f` for at most `duration`. On wasm (`target_os = "unknown"`)
        // this races `f` against a `gloo_timers` sleep; elsewhere it defers to
        // `tokio::time::timeout`.
        async fn timeout<F: Future + Unpin>(
            duration: Duration,
            f: F,
        ) -> Result<F::Output, Elapsed> {
            #[cfg(all(target_family = "wasm", target_os = "unknown"))]
            {
                match future::select(sleep(duration), f).await {
                    Either::Left(_) => return Err(Elapsed),
                    Either::Right((output, _)) => Ok(output),
                }
            }
            #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
            {
                tokio::time::timeout(duration, f).await.map_err(|_| Elapsed)
            }
        }

        // `save_changes` must not complete while another task holds the lock.
        #[async_test]
        async fn test_state_store_waits_to_acquire_lock_before_saving_changes() {
            let state_store = SaveLockedStateStore::new(MemoryStore::new().into_state_store());

            // Task that takes the lock and keeps it for five seconds.
            let lock_task = spawn({
                let state_store = state_store.clone();
                async move {
                    let lock = state_store.lock();
                    let _guard = lock.lock().await;
                    sleep(Duration::from_secs(5)).await;
                }
            });

            // Task that tries to save changes through the same (cloned) store.
            let save_task =
                spawn(async move { state_store.save_changes(&StateChanges::default()).await });

            // The lock-holding task is expected to finish first
            // (`Either::Left`); once it has, the pending save must complete
            // within 100 ms.
            assert_matches!(future::select(lock_task, save_task).await, Either::Left((_, save_task)) => {
                timeout(Duration::from_millis(100), save_task)
                    .await
                    .expect("task completes before timeout")
                    .expect("task completes successfully")
                    .expect("task saves changes");
            });
        }
    }
}