1use std::{
16 borrow::Borrow,
17 collections::{BTreeMap, BTreeSet, HashMap},
18 fmt,
19 sync::Arc,
20};
21
22use as_variant::as_variant;
23use async_trait::async_trait;
24use growable_bloom_filter::GrowableBloom;
25use matrix_sdk_common::AsyncTraitDeps;
26use ruma::{
27 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
28 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
29 api::{
30 SupportedVersions,
31 client::discovery::discover_homeserver::{
32 self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo,
33 },
34 },
35 events::{
36 AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
37 GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
38 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
39 RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
40 presence::PresenceEvent,
41 receipt::{Receipt, ReceiptThread, ReceiptType},
42 },
43 serde::Raw,
44 time::SystemTime,
45};
46use serde::{Deserialize, Serialize};
47
48use super::{
49 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
50 QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
51 send_queue::SentRequestKey,
52};
53use crate::{
54 MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
55 deserialized_responses::{
56 DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
57 },
58 store::StoredThreadSubscription,
59};
60
61#[cfg_attr(target_family = "wasm", async_trait(?Send))]
64#[cfg_attr(not(target_family = "wasm"), async_trait)]
65pub trait StateStore: AsyncTraitDeps {
66 type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;
68
69 async fn get_kv_data(
75 &self,
76 key: StateStoreDataKey<'_>,
77 ) -> Result<Option<StateStoreDataValue>, Self::Error>;
78
79 async fn set_kv_data(
89 &self,
90 key: StateStoreDataKey<'_>,
91 value: StateStoreDataValue,
92 ) -> Result<(), Self::Error>;
93
94 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;
100
101 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;
103
104 async fn get_presence_event(
111 &self,
112 user_id: &UserId,
113 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;
114
115 async fn get_presence_events(
121 &self,
122 user_ids: &[OwnedUserId],
123 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;
124
125 async fn get_state_event(
133 &self,
134 room_id: &RoomId,
135 event_type: StateEventType,
136 state_key: &str,
137 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;
138
139 async fn get_state_events(
147 &self,
148 room_id: &RoomId,
149 event_type: StateEventType,
150 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
151
152 async fn get_state_events_for_keys(
163 &self,
164 room_id: &RoomId,
165 event_type: StateEventType,
166 state_keys: &[&str],
167 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
168
169 async fn get_profile(
177 &self,
178 room_id: &RoomId,
179 user_id: &UserId,
180 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;
181
182 async fn get_profiles<'a>(
190 &self,
191 room_id: &RoomId,
192 user_ids: &'a [OwnedUserId],
193 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;
194
195 async fn get_user_ids(
198 &self,
199 room_id: &RoomId,
200 memberships: RoomMemberships,
201 ) -> Result<Vec<OwnedUserId>, Self::Error>;
202
203 async fn get_room_infos(
205 &self,
206 room_load_settings: &RoomLoadSettings,
207 ) -> Result<Vec<RoomInfo>, Self::Error>;
208
209 async fn get_users_with_display_name(
218 &self,
219 room_id: &RoomId,
220 display_name: &DisplayName,
221 ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;
222
223 async fn get_users_with_display_names<'a>(
231 &self,
232 room_id: &RoomId,
233 display_names: &'a [DisplayName],
234 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;
235
236 async fn get_account_data_event(
242 &self,
243 event_type: GlobalAccountDataEventType,
244 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;
245
246 async fn get_room_account_data_event(
256 &self,
257 room_id: &RoomId,
258 event_type: RoomAccountDataEventType,
259 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;
260
261 async fn get_user_room_receipt_event(
274 &self,
275 room_id: &RoomId,
276 receipt_type: ReceiptType,
277 thread: ReceiptThread,
278 user_id: &UserId,
279 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;
280
281 async fn get_event_room_receipt_events(
295 &self,
296 room_id: &RoomId,
297 receipt_type: ReceiptType,
298 thread: ReceiptThread,
299 event_id: &EventId,
300 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;
301
302 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
308
309 async fn set_custom_value(
318 &self,
319 key: &[u8],
320 value: Vec<u8>,
321 ) -> Result<Option<Vec<u8>>, Self::Error>;
322
323 async fn set_custom_value_no_read(
337 &self,
338 key: &[u8],
339 value: Vec<u8>,
340 ) -> Result<(), Self::Error> {
341 self.set_custom_value(key, value).await.map(|_| ())
342 }
343
344 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
350
351 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;
357
358 async fn save_send_queue_request(
368 &self,
369 room_id: &RoomId,
370 transaction_id: OwnedTransactionId,
371 created_at: MilliSecondsSinceUnixEpoch,
372 request: QueuedRequestKind,
373 priority: usize,
374 ) -> Result<(), Self::Error>;
375
376 async fn update_send_queue_request(
388 &self,
389 room_id: &RoomId,
390 transaction_id: &TransactionId,
391 content: QueuedRequestKind,
392 ) -> Result<bool, Self::Error>;
393
394 async fn remove_send_queue_request(
400 &self,
401 room_id: &RoomId,
402 transaction_id: &TransactionId,
403 ) -> Result<bool, Self::Error>;
404
405 async fn load_send_queue_requests(
411 &self,
412 room_id: &RoomId,
413 ) -> Result<Vec<QueuedRequest>, Self::Error>;
414
415 async fn update_send_queue_request_status(
418 &self,
419 room_id: &RoomId,
420 transaction_id: &TransactionId,
421 error: Option<QueueWedgeError>,
422 ) -> Result<(), Self::Error>;
423
424 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;
426
427 async fn save_dependent_queued_request(
430 &self,
431 room_id: &RoomId,
432 parent_txn_id: &TransactionId,
433 own_txn_id: ChildTransactionId,
434 created_at: MilliSecondsSinceUnixEpoch,
435 content: DependentQueuedRequestKind,
436 ) -> Result<(), Self::Error>;
437
438 async fn mark_dependent_queued_requests_as_ready(
447 &self,
448 room_id: &RoomId,
449 parent_txn_id: &TransactionId,
450 sent_parent_key: SentRequestKey,
451 ) -> Result<usize, Self::Error>;
452
453 async fn update_dependent_queued_request(
457 &self,
458 room_id: &RoomId,
459 own_transaction_id: &ChildTransactionId,
460 new_content: DependentQueuedRequestKind,
461 ) -> Result<bool, Self::Error>;
462
463 async fn remove_dependent_queued_request(
468 &self,
469 room: &RoomId,
470 own_txn_id: &ChildTransactionId,
471 ) -> Result<bool, Self::Error>;
472
473 async fn load_dependent_queued_requests(
479 &self,
480 room: &RoomId,
481 ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
482
483 async fn upsert_thread_subscription(
493 &self,
494 room: &RoomId,
495 thread_id: &EventId,
496 subscription: StoredThreadSubscription,
497 ) -> Result<(), Self::Error>;
498
499 async fn remove_thread_subscription(
503 &self,
504 room: &RoomId,
505 thread_id: &EventId,
506 ) -> Result<(), Self::Error>;
507
508 async fn load_thread_subscription(
512 &self,
513 room: &RoomId,
514 thread_id: &EventId,
515 ) -> Result<Option<StoredThreadSubscription>, Self::Error>;
516
517 #[doc(hidden)]
523 async fn optimize(&self) -> Result<(), Self::Error>;
524
525 async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
527}
528
529#[repr(transparent)]
530struct EraseStateStoreError<T>(T);
531
532#[cfg(not(tarpaulin_include))]
533impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
534 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
535 self.0.fmt(f)
536 }
537}
538
539#[cfg_attr(target_family = "wasm", async_trait(?Send))]
540#[cfg_attr(not(target_family = "wasm"), async_trait)]
541impl<T: StateStore> StateStore for EraseStateStoreError<T> {
542 type Error = StoreError;
543
544 async fn get_kv_data(
545 &self,
546 key: StateStoreDataKey<'_>,
547 ) -> Result<Option<StateStoreDataValue>, Self::Error> {
548 self.0.get_kv_data(key).await.map_err(Into::into)
549 }
550
551 async fn set_kv_data(
552 &self,
553 key: StateStoreDataKey<'_>,
554 value: StateStoreDataValue,
555 ) -> Result<(), Self::Error> {
556 self.0.set_kv_data(key, value).await.map_err(Into::into)
557 }
558
559 async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
560 self.0.remove_kv_data(key).await.map_err(Into::into)
561 }
562
563 async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
564 self.0.save_changes(changes).await.map_err(Into::into)
565 }
566
567 async fn get_presence_event(
568 &self,
569 user_id: &UserId,
570 ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
571 self.0.get_presence_event(user_id).await.map_err(Into::into)
572 }
573
574 async fn get_presence_events(
575 &self,
576 user_ids: &[OwnedUserId],
577 ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
578 self.0.get_presence_events(user_ids).await.map_err(Into::into)
579 }
580
581 async fn get_state_event(
582 &self,
583 room_id: &RoomId,
584 event_type: StateEventType,
585 state_key: &str,
586 ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
587 self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
588 }
589
590 async fn get_state_events(
591 &self,
592 room_id: &RoomId,
593 event_type: StateEventType,
594 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
595 self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
596 }
597
598 async fn get_state_events_for_keys(
599 &self,
600 room_id: &RoomId,
601 event_type: StateEventType,
602 state_keys: &[&str],
603 ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
604 self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
605 }
606
607 async fn get_profile(
608 &self,
609 room_id: &RoomId,
610 user_id: &UserId,
611 ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
612 self.0.get_profile(room_id, user_id).await.map_err(Into::into)
613 }
614
615 async fn get_profiles<'a>(
616 &self,
617 room_id: &RoomId,
618 user_ids: &'a [OwnedUserId],
619 ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
620 self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
621 }
622
623 async fn get_user_ids(
624 &self,
625 room_id: &RoomId,
626 memberships: RoomMemberships,
627 ) -> Result<Vec<OwnedUserId>, Self::Error> {
628 self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
629 }
630
631 async fn get_room_infos(
632 &self,
633 room_load_settings: &RoomLoadSettings,
634 ) -> Result<Vec<RoomInfo>, Self::Error> {
635 self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
636 }
637
638 async fn get_users_with_display_name(
639 &self,
640 room_id: &RoomId,
641 display_name: &DisplayName,
642 ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
643 self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
644 }
645
646 async fn get_users_with_display_names<'a>(
647 &self,
648 room_id: &RoomId,
649 display_names: &'a [DisplayName],
650 ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
651 self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
652 }
653
654 async fn get_account_data_event(
655 &self,
656 event_type: GlobalAccountDataEventType,
657 ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
658 self.0.get_account_data_event(event_type).await.map_err(Into::into)
659 }
660
661 async fn get_room_account_data_event(
662 &self,
663 room_id: &RoomId,
664 event_type: RoomAccountDataEventType,
665 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
666 self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
667 }
668
669 async fn get_user_room_receipt_event(
670 &self,
671 room_id: &RoomId,
672 receipt_type: ReceiptType,
673 thread: ReceiptThread,
674 user_id: &UserId,
675 ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
676 self.0
677 .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
678 .await
679 .map_err(Into::into)
680 }
681
682 async fn get_event_room_receipt_events(
683 &self,
684 room_id: &RoomId,
685 receipt_type: ReceiptType,
686 thread: ReceiptThread,
687 event_id: &EventId,
688 ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
689 self.0
690 .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
691 .await
692 .map_err(Into::into)
693 }
694
695 async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
696 self.0.get_custom_value(key).await.map_err(Into::into)
697 }
698
699 async fn set_custom_value(
700 &self,
701 key: &[u8],
702 value: Vec<u8>,
703 ) -> Result<Option<Vec<u8>>, Self::Error> {
704 self.0.set_custom_value(key, value).await.map_err(Into::into)
705 }
706
707 async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
708 self.0.remove_custom_value(key).await.map_err(Into::into)
709 }
710
711 async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
712 self.0.remove_room(room_id).await.map_err(Into::into)
713 }
714
715 async fn save_send_queue_request(
716 &self,
717 room_id: &RoomId,
718 transaction_id: OwnedTransactionId,
719 created_at: MilliSecondsSinceUnixEpoch,
720 content: QueuedRequestKind,
721 priority: usize,
722 ) -> Result<(), Self::Error> {
723 self.0
724 .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
725 .await
726 .map_err(Into::into)
727 }
728
729 async fn update_send_queue_request(
730 &self,
731 room_id: &RoomId,
732 transaction_id: &TransactionId,
733 content: QueuedRequestKind,
734 ) -> Result<bool, Self::Error> {
735 self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
736 }
737
738 async fn remove_send_queue_request(
739 &self,
740 room_id: &RoomId,
741 transaction_id: &TransactionId,
742 ) -> Result<bool, Self::Error> {
743 self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
744 }
745
746 async fn load_send_queue_requests(
747 &self,
748 room_id: &RoomId,
749 ) -> Result<Vec<QueuedRequest>, Self::Error> {
750 self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
751 }
752
753 async fn update_send_queue_request_status(
754 &self,
755 room_id: &RoomId,
756 transaction_id: &TransactionId,
757 error: Option<QueueWedgeError>,
758 ) -> Result<(), Self::Error> {
759 self.0
760 .update_send_queue_request_status(room_id, transaction_id, error)
761 .await
762 .map_err(Into::into)
763 }
764
765 async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
766 self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
767 }
768
769 async fn save_dependent_queued_request(
770 &self,
771 room_id: &RoomId,
772 parent_txn_id: &TransactionId,
773 own_txn_id: ChildTransactionId,
774 created_at: MilliSecondsSinceUnixEpoch,
775 content: DependentQueuedRequestKind,
776 ) -> Result<(), Self::Error> {
777 self.0
778 .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
779 .await
780 .map_err(Into::into)
781 }
782
783 async fn mark_dependent_queued_requests_as_ready(
784 &self,
785 room_id: &RoomId,
786 parent_txn_id: &TransactionId,
787 sent_parent_key: SentRequestKey,
788 ) -> Result<usize, Self::Error> {
789 self.0
790 .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
791 .await
792 .map_err(Into::into)
793 }
794
795 async fn remove_dependent_queued_request(
796 &self,
797 room_id: &RoomId,
798 own_txn_id: &ChildTransactionId,
799 ) -> Result<bool, Self::Error> {
800 self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
801 }
802
803 async fn load_dependent_queued_requests(
804 &self,
805 room_id: &RoomId,
806 ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
807 self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
808 }
809
810 async fn update_dependent_queued_request(
811 &self,
812 room_id: &RoomId,
813 own_transaction_id: &ChildTransactionId,
814 new_content: DependentQueuedRequestKind,
815 ) -> Result<bool, Self::Error> {
816 self.0
817 .update_dependent_queued_request(room_id, own_transaction_id, new_content)
818 .await
819 .map_err(Into::into)
820 }
821
822 async fn upsert_thread_subscription(
823 &self,
824 room: &RoomId,
825 thread_id: &EventId,
826 subscription: StoredThreadSubscription,
827 ) -> Result<(), Self::Error> {
828 self.0.upsert_thread_subscription(room, thread_id, subscription).await.map_err(Into::into)
829 }
830
831 async fn load_thread_subscription(
832 &self,
833 room: &RoomId,
834 thread_id: &EventId,
835 ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
836 self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
837 }
838
839 async fn remove_thread_subscription(
840 &self,
841 room: &RoomId,
842 thread_id: &EventId,
843 ) -> Result<(), Self::Error> {
844 self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
845 }
846
847 async fn optimize(&self) -> Result<(), Self::Error> {
848 self.0.optimize().await.map_err(Into::into)
849 }
850
851 async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
852 self.0.get_size().await.map_err(Into::into)
853 }
854}
855
856#[cfg_attr(target_family = "wasm", async_trait(?Send))]
858#[cfg_attr(not(target_family = "wasm"), async_trait)]
859pub trait StateStoreExt: StateStore {
860 async fn get_state_event_static<C>(
866 &self,
867 room_id: &RoomId,
868 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
869 where
870 C: StaticEventContent<IsPrefix = ruma::events::False>
871 + StaticStateEventContent<StateKey = EmptyStateKey>
872 + RedactContent,
873 C::Redacted: RedactedStateEventContent,
874 {
875 Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
876 }
877
878 async fn get_state_event_static_for_key<C, K>(
884 &self,
885 room_id: &RoomId,
886 state_key: &K,
887 ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
888 where
889 C: StaticEventContent<IsPrefix = ruma::events::False>
890 + StaticStateEventContent
891 + RedactContent,
892 C::StateKey: Borrow<K>,
893 C::Redacted: RedactedStateEventContent,
894 K: AsRef<str> + ?Sized + Sync,
895 {
896 Ok(self
897 .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
898 .await?
899 .map(|raw| raw.cast()))
900 }
901
902 async fn get_state_events_static<C>(
908 &self,
909 room_id: &RoomId,
910 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
911 where
912 C: StaticEventContent<IsPrefix = ruma::events::False>
913 + StaticStateEventContent
914 + RedactContent,
915 C::Redacted: RedactedStateEventContent,
916 {
917 Ok(self
919 .get_state_events(room_id, C::TYPE.into())
920 .await?
921 .into_iter()
922 .map(|raw| raw.cast())
923 .collect())
924 }
925
926 async fn get_state_events_for_keys_static<'a, C, K, I>(
935 &self,
936 room_id: &RoomId,
937 state_keys: I,
938 ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
939 where
940 C: StaticEventContent<IsPrefix = ruma::events::False>
941 + StaticStateEventContent
942 + RedactContent,
943 C::StateKey: Borrow<K>,
944 C::Redacted: RedactedStateEventContent,
945 K: AsRef<str> + Sized + Sync + 'a,
946 I: IntoIterator<Item = &'a K> + Send,
947 I::IntoIter: Send,
948 {
949 Ok(self
950 .get_state_events_for_keys(
951 room_id,
952 C::TYPE.into(),
953 &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
954 )
955 .await?
956 .into_iter()
957 .map(|raw| raw.cast())
958 .collect())
959 }
960
961 async fn get_account_data_event_static<C>(
963 &self,
964 ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
965 where
966 C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
967 {
968 Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
969 }
970
971 async fn get_room_account_data_event_static<C>(
979 &self,
980 room_id: &RoomId,
981 ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
982 where
983 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
984 {
985 Ok(self
986 .get_room_account_data_event(room_id, C::TYPE.into())
987 .await?
988 .map(Raw::cast_unchecked))
989 }
990
991 async fn get_member_event(
999 &self,
1000 room_id: &RoomId,
1001 state_key: &UserId,
1002 ) -> Result<Option<RawMemberEvent>, Self::Error> {
1003 self.get_state_event_static_for_key(room_id, state_key).await
1004 }
1005}
1006
1007#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1008#[cfg_attr(not(target_family = "wasm"), async_trait)]
1009impl<T: StateStore + ?Sized> StateStoreExt for T {}
1010
1011pub type DynStateStore = dyn StateStore<Error = StoreError>;
1013
1014pub trait IntoStateStore {
1020 #[doc(hidden)]
1021 fn into_state_store(self) -> Arc<DynStateStore>;
1022}
1023
1024impl<T> IntoStateStore for T
1025where
1026 T: StateStore + Sized + 'static,
1027{
1028 fn into_state_store(self) -> Arc<DynStateStore> {
1029 Arc::new(EraseStateStoreError(self))
1030 }
1031}
1032
1033impl<T> IntoStateStore for Arc<T>
1036where
1037 T: StateStore + 'static,
1038{
1039 fn into_state_store(self) -> Arc<DynStateStore> {
1040 let ptr: *const T = Arc::into_raw(self);
1041 let ptr_erased = ptr as *const EraseStateStoreError<T>;
1042 unsafe { Arc::from_raw(ptr_erased) }
1045 }
1046}
1047
1048#[derive(Debug, Clone, Serialize, Deserialize)]
1050pub struct TtlStoreValue<T> {
1051 #[serde(flatten)]
1053 data: T,
1054
1055 last_fetch_ts: f64,
1058}
1059
1060impl<T> TtlStoreValue<T> {
1061 pub const STALE_THRESHOLD: f64 = (1000 * 60 * 60 * 24 * 7) as _; pub fn new(data: T) -> Self {
1066 Self { data, last_fetch_ts: now_timestamp_ms() }
1067 }
1068
1069 pub fn into_data(self) -> Option<T> {
1071 if now_timestamp_ms() - self.last_fetch_ts >= Self::STALE_THRESHOLD {
1072 None
1073 } else {
1074 Some(self.data)
1075 }
1076 }
1077}
1078
1079#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1081pub struct SupportedVersionsResponse {
1082 pub versions: Vec<String>,
1084
1085 pub unstable_features: BTreeMap<String, bool>,
1087}
1088
1089impl SupportedVersionsResponse {
1090 pub fn supported_versions(&self) -> SupportedVersions {
1096 SupportedVersions::from_parts(&self.versions, &self.unstable_features)
1097 }
1098}
1099
1100#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1101pub struct WellKnownResponse {
1103 pub homeserver: HomeserverInfo,
1105
1106 pub identity_server: Option<IdentityServerInfo>,
1108
1109 pub tile_server: Option<TileServerInfo>,
1111
1112 pub rtc_foci: Vec<RtcFocusInfo>,
1114}
1115
1116impl From<discover_homeserver::Response> for WellKnownResponse {
1117 fn from(response: discover_homeserver::Response) -> Self {
1118 Self {
1119 homeserver: response.homeserver,
1120 identity_server: response.identity_server,
1121 tile_server: response.tile_server,
1122 rtc_foci: response.rtc_foci,
1123 }
1124 }
1125}
1126
1127fn now_timestamp_ms() -> f64 {
1129 SystemTime::now()
1130 .duration_since(SystemTime::UNIX_EPOCH)
1131 .expect("System clock was before 1970.")
1132 .as_secs_f64()
1133 * 1000.0
1134}
1135
1136#[derive(Debug, Clone)]
1138pub enum StateStoreDataValue {
1139 SyncToken(String),
1141
1142 SupportedVersions(TtlStoreValue<SupportedVersionsResponse>),
1144
1145 WellKnown(TtlStoreValue<Option<WellKnownResponse>>),
1147
1148 Filter(String),
1150
1151 UserAvatarUrl(OwnedMxcUri),
1153
1154 RecentlyVisitedRooms(Vec<OwnedRoomId>),
1156
1157 UtdHookManagerData(GrowableBloom),
1160
1161 OneTimeKeyAlreadyUploaded,
1164
1165 ComposerDraft(ComposerDraft),
1170
1171 SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),
1173
1174 ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),
1179}
1180
1181#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1191pub struct ThreadSubscriptionCatchupToken {
1192 pub from: String,
1198
1199 pub to: Option<String>,
1205}
1206
1207#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1209pub struct ComposerDraft {
1210 pub plain_text: String,
1212 pub html_text: Option<String>,
1215 pub draft_type: ComposerDraftType,
1217 #[serde(default)]
1219 pub attachments: Vec<DraftAttachment>,
1220}
1221
1222#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1224pub struct DraftAttachment {
1225 pub filename: String,
1227 pub content: DraftAttachmentContent,
1229}
1230
1231#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1233#[serde(tag = "type")]
1234pub enum DraftAttachmentContent {
1235 Image {
1237 data: Vec<u8>,
1239 mimetype: Option<String>,
1241 size: Option<u64>,
1243 width: Option<u64>,
1245 height: Option<u64>,
1247 blurhash: Option<String>,
1249 thumbnail: Option<DraftThumbnail>,
1251 },
1252 Video {
1254 data: Vec<u8>,
1256 mimetype: Option<String>,
1258 size: Option<u64>,
1260 width: Option<u64>,
1262 height: Option<u64>,
1264 duration: Option<std::time::Duration>,
1266 blurhash: Option<String>,
1268 thumbnail: Option<DraftThumbnail>,
1270 },
1271 Audio {
1273 data: Vec<u8>,
1275 mimetype: Option<String>,
1277 size: Option<u64>,
1279 duration: Option<std::time::Duration>,
1281 },
1282 File {
1284 data: Vec<u8>,
1286 mimetype: Option<String>,
1288 size: Option<u64>,
1290 },
1291}
1292
1293#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1295pub struct DraftThumbnail {
1296 pub filename: String,
1298 pub data: Vec<u8>,
1300 pub mimetype: Option<String>,
1302 pub width: Option<u64>,
1304 pub height: Option<u64>,
1306 pub size: Option<u64>,
1308}
1309
1310#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1312pub enum ComposerDraftType {
1313 NewMessage,
1315 Reply {
1317 event_id: OwnedEventId,
1319 },
1320 Edit {
1322 event_id: OwnedEventId,
1324 },
1325}
1326
1327impl StateStoreDataValue {
1328 pub fn into_sync_token(self) -> Option<String> {
1330 as_variant!(self, Self::SyncToken)
1331 }
1332
1333 pub fn into_filter(self) -> Option<String> {
1335 as_variant!(self, Self::Filter)
1336 }
1337
1338 pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
1340 as_variant!(self, Self::UserAvatarUrl)
1341 }
1342
1343 pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
1345 as_variant!(self, Self::RecentlyVisitedRooms)
1346 }
1347
1348 pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
1350 as_variant!(self, Self::UtdHookManagerData)
1351 }
1352
1353 pub fn into_composer_draft(self) -> Option<ComposerDraft> {
1355 as_variant!(self, Self::ComposerDraft)
1356 }
1357
1358 pub fn into_supported_versions(self) -> Option<TtlStoreValue<SupportedVersionsResponse>> {
1360 as_variant!(self, Self::SupportedVersions)
1361 }
1362
1363 pub fn into_well_known(self) -> Option<TtlStoreValue<Option<WellKnownResponse>>> {
1365 as_variant!(self, Self::WellKnown)
1366 }
1367
1368 pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
1370 as_variant!(self, Self::SeenKnockRequests)
1371 }
1372
1373 pub fn into_thread_subscriptions_catchup_tokens(
1376 self,
1377 ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
1378 as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
1379 }
1380}
1381
1382#[derive(Debug, Clone, Copy)]
1384pub enum StateStoreDataKey<'a> {
1385 SyncToken,
1387
1388 SupportedVersions,
1390
1391 WellKnown,
1393
1394 Filter(&'a str),
1396
1397 UserAvatarUrl(&'a UserId),
1399
1400 RecentlyVisitedRooms(&'a UserId),
1402
1403 UtdHookManagerData,
1406
1407 OneTimeKeyAlreadyUploaded,
1410
1411 ComposerDraft(&'a RoomId, Option<&'a EventId>),
1416
1417 SeenKnockRequests(&'a RoomId),
1419
1420 ThreadSubscriptionsCatchupTokens,
1422}
1423
1424impl StateStoreDataKey<'_> {
1425 pub const SYNC_TOKEN: &'static str = "sync_token";
1427
1428 pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities"; pub const WELL_KNOWN: &'static str = "well_known";
1435
1436 pub const FILTER: &'static str = "filter";
1438
1439 pub const USER_AVATAR_URL: &'static str = "user_avatar_url";
1442
1443 pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";
1446
1447 pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";
1450
1451 pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";
1454
1455 pub const COMPOSER_DRAFT: &'static str = "composer_draft";
1458
1459 pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";
1462
1463 pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
1466 "thread_subscriptions_catchup_tokens";
1467}
1468
1469pub fn compare_thread_subscription_bump_stamps(
1478 previous: Option<u64>,
1479 new: &mut Option<u64>,
1480) -> bool {
1481 match (previous, &new) {
1482 (Some(prev_bump), None) => {
1485 *new = Some(prev_bump);
1486 }
1487
1488 (Some(prev_bump), Some(new_bump)) if *new_bump <= prev_bump => {
1490 return false;
1491 }
1492
1493 _ => {}
1495 }
1496
1497 true
1498}
1499
1500#[cfg(test)]
1501mod tests {
1502 use serde_json::json;
1503
1504 use super::{SupportedVersionsResponse, TtlStoreValue, now_timestamp_ms};
1505
1506 #[test]
1507 fn test_stale_ttl_store_value() {
1508 let ttl_value = TtlStoreValue {
1510 data: (),
1511 last_fetch_ts: now_timestamp_ms() - TtlStoreValue::<()>::STALE_THRESHOLD - 1.0,
1512 };
1513 assert!(ttl_value.into_data().is_none());
1514
1515 let ttl_value = TtlStoreValue::new(());
1517 assert!(ttl_value.into_data().is_some());
1518 }
1519
1520 #[test]
1521 fn test_stale_ttl_store_value_serialize_roundtrip() {
1522 let server_info = SupportedVersionsResponse {
1523 versions: vec!["1.2".to_owned(), "1.3".to_owned(), "1.4".to_owned()],
1524 unstable_features: [("org.matrix.msc3916.stable".to_owned(), true)].into(),
1525 };
1526 let ttl_value = TtlStoreValue { data: server_info.clone(), last_fetch_ts: 1000.0 };
1527 let json = json!({
1528 "versions": ["1.2", "1.3", "1.4"],
1529 "unstable_features": {
1530 "org.matrix.msc3916.stable": true,
1531 },
1532 "last_fetch_ts": 1000.0,
1533 });
1534
1535 assert_eq!(serde_json::to_value(&ttl_value).unwrap(), json);
1536
1537 let deserialized =
1538 serde_json::from_value::<TtlStoreValue<SupportedVersionsResponse>>(json).unwrap();
1539 assert_eq!(deserialized.data, server_info);
1540 assert!(deserialized.last_fetch_ts - ttl_value.last_fetch_ts < 0.0001);
1541 }
1542}