Skip to main content

matrix_sdk_base/store/
traits.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    borrow::Borrow,
17    collections::{BTreeMap, BTreeSet, HashMap},
18    fmt,
19    ops::Deref,
20    sync::Arc,
21};
22
23use as_variant::as_variant;
24use async_trait::async_trait;
25use growable_bloom_filter::GrowableBloom;
26use matrix_sdk_common::{AsyncTraitDeps, ttl::TtlValue};
27use ruma::{
28    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
29    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
30    api::{
31        MatrixVersion, SupportedVersions,
32        client::discovery::{
33            discover_homeserver::{
34                self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo,
35            },
36            get_capabilities::v3::Capabilities,
37        },
38    },
39    events::{
40        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
41        GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
42        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
43        RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
44        presence::PresenceEvent,
45        receipt::{Receipt, ReceiptThread, ReceiptType},
46    },
47    serde::Raw,
48};
49use serde::{Deserialize, Serialize};
50use thiserror::Error;
51use tokio::sync::{Mutex, MutexGuard};
52
53use super::{
54    ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
55    QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
56    send_queue::SentRequestKey,
57};
58use crate::{
59    MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
60    deserialized_responses::{
61        DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
62    },
63    store::StoredThreadSubscription,
64};
65
66/// An abstract state store trait that can be used to implement different stores
67/// for the SDK.
68#[cfg_attr(target_family = "wasm", async_trait(?Send))]
69#[cfg_attr(not(target_family = "wasm"), async_trait)]
70pub trait StateStore: AsyncTraitDeps {
71    /// The error type used by this state store.
72    type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;
73
74    /// Get key-value data from the store.
75    ///
76    /// # Arguments
77    ///
78    /// * `key` - The key to fetch data for.
79    async fn get_kv_data(
80        &self,
81        key: StateStoreDataKey<'_>,
82    ) -> Result<Option<StateStoreDataValue>, Self::Error>;
83
84    /// Put key-value data into the store.
85    ///
86    /// # Arguments
87    ///
88    /// * `key` - The key to identify the data in the store.
89    ///
90    /// * `value` - The data to insert.
91    ///
92    /// Panics if the key and value variants do not match.
93    async fn set_kv_data(
94        &self,
95        key: StateStoreDataKey<'_>,
96        value: StateStoreDataValue,
97    ) -> Result<(), Self::Error>;
98
99    /// Remove key-value data from the store.
100    ///
101    /// # Arguments
102    ///
103    /// * `key` - The key to remove the data for.
104    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;
105
106    /// Save the set of state changes in the store.
107    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;
108
109    /// Get the stored presence event for the given user.
110    ///
111    /// # Arguments
112    ///
113    /// * `user_id` - The id of the user for which we wish to fetch the presence
114    /// event for.
115    async fn get_presence_event(
116        &self,
117        user_id: &UserId,
118    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;
119
120    /// Get the stored presence events for the given users.
121    ///
122    /// # Arguments
123    ///
124    /// * `user_ids` - The IDs of the users to fetch the presence events for.
125    async fn get_presence_events(
126        &self,
127        user_ids: &[OwnedUserId],
128    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;
129
130    /// Get a state event out of the state store.
131    ///
132    /// # Arguments
133    ///
134    /// * `room_id` - The id of the room the state event was received for.
135    ///
136    /// * `event_type` - The event type of the state event.
137    async fn get_state_event(
138        &self,
139        room_id: &RoomId,
140        event_type: StateEventType,
141        state_key: &str,
142    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;
143
144    /// Get a list of state events for a given room and `StateEventType`.
145    ///
146    /// # Arguments
147    ///
148    /// * `room_id` - The id of the room to find events for.
149    ///
150    /// * `event_type` - The event type.
151    async fn get_state_events(
152        &self,
153        room_id: &RoomId,
154        event_type: StateEventType,
155    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
156
157    /// Get a list of state events for a given room, `StateEventType`, and the
158    /// given state keys.
159    ///
160    /// # Arguments
161    ///
162    /// * `room_id` - The id of the room to find events for.
163    ///
164    /// * `event_type` - The event type.
165    ///
166    /// * `state_keys` - The list of state keys to find.
167    async fn get_state_events_for_keys(
168        &self,
169        room_id: &RoomId,
170        event_type: StateEventType,
171        state_keys: &[&str],
172    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
173
174    /// Get the current profile for the given user in the given room.
175    ///
176    /// # Arguments
177    ///
178    /// * `room_id` - The room id the profile is used in.
179    ///
180    /// * `user_id` - The id of the user the profile belongs to.
181    async fn get_profile(
182        &self,
183        room_id: &RoomId,
184        user_id: &UserId,
185    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;
186
187    /// Get the current profiles for the given users in the given room.
188    ///
189    /// # Arguments
190    ///
191    /// * `room_id` - The ID of the room the profiles are used in.
192    ///
193    /// * `user_ids` - The IDs of the users the profiles belong to.
194    async fn get_profiles<'a>(
195        &self,
196        room_id: &RoomId,
197        user_ids: &'a [OwnedUserId],
198    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;
199
200    /// Get the user ids of members for a given room with the given memberships,
201    /// for stripped and regular rooms alike.
202    async fn get_user_ids(
203        &self,
204        room_id: &RoomId,
205        memberships: RoomMemberships,
206    ) -> Result<Vec<OwnedUserId>, Self::Error>;
207
208    /// Get a set of pure `RoomInfo`s the store knows about.
209    async fn get_room_infos(
210        &self,
211        room_load_settings: &RoomLoadSettings,
212    ) -> Result<Vec<RoomInfo>, Self::Error>;
213
214    /// Get all the users that use the given display name in the given room.
215    ///
216    /// # Arguments
217    ///
218    /// * `room_id` - The id of the room for which the display name users should
219    /// be fetched for.
220    ///
221    /// * `display_name` - The display name that the users use.
222    async fn get_users_with_display_name(
223        &self,
224        room_id: &RoomId,
225        display_name: &DisplayName,
226    ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;
227
228    /// Get all the users that use the given display names in the given room.
229    ///
230    /// # Arguments
231    ///
232    /// * `room_id` - The ID of the room to fetch the display names for.
233    ///
234    /// * `display_names` - The display names that the users use.
235    async fn get_users_with_display_names<'a>(
236        &self,
237        room_id: &RoomId,
238        display_names: &'a [DisplayName],
239    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;
240
241    /// Get an event out of the account data store.
242    ///
243    /// # Arguments
244    ///
245    /// * `event_type` - The event type of the account data event.
246    async fn get_account_data_event(
247        &self,
248        event_type: GlobalAccountDataEventType,
249    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;
250
251    /// Get an event out of the room account data store.
252    ///
253    /// # Arguments
254    ///
255    /// * `room_id` - The id of the room for which the room account data event
256    ///   should
257    /// be fetched.
258    ///
259    /// * `event_type` - The event type of the room account data event.
260    async fn get_room_account_data_event(
261        &self,
262        room_id: &RoomId,
263        event_type: RoomAccountDataEventType,
264    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;
265
266    /// Get a user's read receipt for a given room and receipt type and thread.
267    ///
268    /// # Arguments
269    ///
270    /// * `room_id` - The id of the room for which the receipt should be
271    ///   fetched.
272    ///
273    /// * `receipt_type` - The type of the receipt.
274    ///
275    /// * `thread` - The thread containing this receipt.
276    ///
277    /// * `user_id` - The id of the user for whom the receipt should be fetched.
278    async fn get_user_room_receipt_event(
279        &self,
280        room_id: &RoomId,
281        receipt_type: ReceiptType,
282        thread: ReceiptThread,
283        user_id: &UserId,
284    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;
285
286    /// Get an event's read receipts for a given room, receipt type, and thread.
287    ///
288    /// # Arguments
289    ///
290    /// * `room_id` - The id of the room for which the receipts should be
291    ///   fetched.
292    ///
293    /// * `receipt_type` - The type of the receipts.
294    ///
295    /// * `thread` - The thread containing this receipt.
296    ///
297    /// * `event_id` - The id of the event for which the receipts should be
298    ///   fetched.
299    async fn get_event_room_receipt_events(
300        &self,
301        room_id: &RoomId,
302        receipt_type: ReceiptType,
303        thread: ReceiptThread,
304        event_id: &EventId,
305    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;
306
307    /// Get arbitrary data from the custom store
308    ///
309    /// # Arguments
310    ///
311    /// * `key` - The key to fetch data for
312    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
313
314    /// Put arbitrary data into the custom store, return the data previously
315    /// stored
316    ///
317    /// # Arguments
318    ///
319    /// * `key` - The key to insert data into
320    ///
321    /// * `value` - The value to insert
322    async fn set_custom_value(
323        &self,
324        key: &[u8],
325        value: Vec<u8>,
326    ) -> Result<Option<Vec<u8>>, Self::Error>;
327
328    /// Put arbitrary data into the custom store, do not attempt to read any
329    /// previous data
330    ///
331    /// Optimization option for set_custom_values for stores that would perform
332    /// better withouts the extra read and the caller not needing that data
333    /// returned. Otherwise this just wraps around `set_custom_data` and
334    /// discards the result.
335    ///
336    /// # Arguments
337    ///
338    /// * `key` - The key to insert data into
339    ///
340    /// * `value` - The value to insert
341    async fn set_custom_value_no_read(
342        &self,
343        key: &[u8],
344        value: Vec<u8>,
345    ) -> Result<(), Self::Error> {
346        self.set_custom_value(key, value).await.map(|_| ())
347    }
348
349    /// Remove arbitrary data from the custom store and return it if existed
350    ///
351    /// # Arguments
352    ///
353    /// * `key` - The key to remove data from
354    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
355
356    /// Remove a room and all elements associated from the state store.
357    ///
358    /// # Arguments
359    ///
360    /// * `room_id` - The `RoomId` of the room to delete.
361    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;
362
363    /// Save a request to be sent by a send queue later (e.g. sending an event).
364    ///
365    /// # Arguments
366    ///
367    /// * `room_id` - The `RoomId` of the send queue's room.
368    /// * `transaction_id` - The unique key identifying the event to be sent
369    ///   (and its transaction). Note: this is expected to be randomly generated
370    ///   and thus unique.
371    /// * `content` - Serializable event content to be sent.
372    async fn save_send_queue_request(
373        &self,
374        room_id: &RoomId,
375        transaction_id: OwnedTransactionId,
376        created_at: MilliSecondsSinceUnixEpoch,
377        request: QueuedRequestKind,
378        priority: usize,
379    ) -> Result<(), Self::Error>;
380
381    /// Updates a send queue request with the given content, and resets its
382    /// error status.
383    ///
384    /// # Arguments
385    ///
386    /// * `room_id` - The `RoomId` of the send queue's room.
387    /// * `transaction_id` - The unique key identifying the request to be sent
388    ///   (and its transaction).
389    /// * `content` - Serializable event content to replace the original one.
390    ///
391    /// Returns true if a request has been updated, or false otherwise.
392    async fn update_send_queue_request(
393        &self,
394        room_id: &RoomId,
395        transaction_id: &TransactionId,
396        content: QueuedRequestKind,
397    ) -> Result<bool, Self::Error>;
398
399    /// Remove a request previously inserted with
400    /// [`Self::save_send_queue_request`] from the database, based on its
401    /// transaction id.
402    ///
403    /// Returns true if something has been removed, or false otherwise.
404    async fn remove_send_queue_request(
405        &self,
406        room_id: &RoomId,
407        transaction_id: &TransactionId,
408    ) -> Result<bool, Self::Error>;
409
410    /// Loads all the send queue requests for the given room.
411    ///
412    /// The resulting vector of queued requests should be ordered from higher
413    /// priority to lower priority, and respect the insertion order when
414    /// priorities are equal.
415    async fn load_send_queue_requests(
416        &self,
417        room_id: &RoomId,
418    ) -> Result<Vec<QueuedRequest>, Self::Error>;
419
420    /// Updates the send queue error status (wedge) for a given send queue
421    /// request.
422    async fn update_send_queue_request_status(
423        &self,
424        room_id: &RoomId,
425        transaction_id: &TransactionId,
426        error: Option<QueueWedgeError>,
427    ) -> Result<(), Self::Error>;
428
429    /// Loads all the rooms which have any pending requests in their send queue.
430    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;
431
432    /// Add a new entry to the list of dependent send queue requests for a
433    /// parent request.
434    async fn save_dependent_queued_request(
435        &self,
436        room_id: &RoomId,
437        parent_txn_id: &TransactionId,
438        own_txn_id: ChildTransactionId,
439        created_at: MilliSecondsSinceUnixEpoch,
440        content: DependentQueuedRequestKind,
441    ) -> Result<(), Self::Error>;
442
443    /// Mark a set of dependent send queue requests as ready, using a key
444    /// identifying the homeserver's response.
445    ///
446    /// âš  Beware! There's no verification applied that the parent key type is
447    /// compatible with the dependent event type. The invalid state may be
448    /// lazily filtered out in `load_dependent_queued_requests`.
449    ///
450    /// Returns the number of updated requests.
451    async fn mark_dependent_queued_requests_as_ready(
452        &self,
453        room_id: &RoomId,
454        parent_txn_id: &TransactionId,
455        sent_parent_key: SentRequestKey,
456    ) -> Result<usize, Self::Error>;
457
458    /// Update a dependent send queue request with the new content.
459    ///
460    /// Returns true if the request was found and could be updated.
461    async fn update_dependent_queued_request(
462        &self,
463        room_id: &RoomId,
464        own_transaction_id: &ChildTransactionId,
465        new_content: DependentQueuedRequestKind,
466    ) -> Result<bool, Self::Error>;
467
468    /// Remove a specific dependent send queue request by id.
469    ///
470    /// Returns true if the dependent send queue request has been indeed
471    /// removed.
472    async fn remove_dependent_queued_request(
473        &self,
474        room: &RoomId,
475        own_txn_id: &ChildTransactionId,
476    ) -> Result<bool, Self::Error>;
477
478    /// List all the dependent send queue requests.
479    ///
480    /// This returns absolutely all the dependent send queue requests, whether
481    /// they have a parent event id or not. As a contract for implementors, they
482    /// must be returned in insertion order.
483    async fn load_dependent_queued_requests(
484        &self,
485        room: &RoomId,
486    ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
487
488    /// Inserts or updates multiple thread subscriptions.
489    ///
490    /// If the new thread subscription hasn't set a bumpstamp, and there was a
491    /// previous subscription in the database with a bumpstamp, the existing
492    /// bumpstamp is kept.
493    ///
494    /// If the new thread subscription has a bumpstamp that's lower than or
495    /// equal to a previous one, the existing subscription is kept, i.e.
496    /// this method must have no effect.
497    async fn upsert_thread_subscriptions(
498        &self,
499        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
500    ) -> Result<(), Self::Error>;
501
502    /// Remove a previous thread subscription for a given room and thread.
503    ///
504    /// Note: removing an unknown thread subscription is a no-op.
505    async fn remove_thread_subscription(
506        &self,
507        room: &RoomId,
508        thread_id: &EventId,
509    ) -> Result<(), Self::Error>;
510
511    /// Loads the current thread subscription for a given room and thread.
512    ///
513    /// Returns `None` if there was no entry for the given room/thread pair.
514    async fn load_thread_subscription(
515        &self,
516        room: &RoomId,
517        thread_id: &EventId,
518    ) -> Result<Option<StoredThreadSubscription>, Self::Error>;
519
520    /// Perform database optimizations if any are available, i.e. vacuuming in
521    /// SQLite.
522    ///
523    /// /// **Warning:** this was added to check if SQLite fragmentation was the
524    /// source of performance issues, **DO NOT use in production**.
525    #[doc(hidden)]
526    async fn optimize(&self) -> Result<(), Self::Error>;
527
528    /// Returns the size of the store in bytes, if known.
529    async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
530}
531
532#[cfg_attr(target_family = "wasm", async_trait(?Send))]
533#[cfg_attr(not(target_family = "wasm"), async_trait)]
534impl<T: StateStore> StateStore for &T {
535    type Error = T::Error;
536
537    async fn get_kv_data(
538        &self,
539        key: StateStoreDataKey<'_>,
540    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
541        (*self).get_kv_data(key).await
542    }
543
544    async fn set_kv_data(
545        &self,
546        key: StateStoreDataKey<'_>,
547        value: StateStoreDataValue,
548    ) -> Result<(), Self::Error> {
549        (*self).set_kv_data(key, value).await
550    }
551
552    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
553        (*self).remove_kv_data(key).await
554    }
555
556    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
557        (*self).save_changes(changes).await
558    }
559
560    async fn get_presence_event(
561        &self,
562        user_id: &UserId,
563    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
564        (*self).get_presence_event(user_id).await
565    }
566
567    async fn get_presence_events(
568        &self,
569        user_ids: &[OwnedUserId],
570    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
571        (*self).get_presence_events(user_ids).await
572    }
573
574    async fn get_state_event(
575        &self,
576        room_id: &RoomId,
577        event_type: StateEventType,
578        state_key: &str,
579    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
580        (*self).get_state_event(room_id, event_type, state_key).await
581    }
582
583    async fn get_state_events(
584        &self,
585        room_id: &RoomId,
586        event_type: StateEventType,
587    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
588        (*self).get_state_events(room_id, event_type).await
589    }
590
591    async fn get_state_events_for_keys(
592        &self,
593        room_id: &RoomId,
594        event_type: StateEventType,
595        state_keys: &[&str],
596    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
597        (*self).get_state_events_for_keys(room_id, event_type, state_keys).await
598    }
599
600    async fn get_profile(
601        &self,
602        room_id: &RoomId,
603        user_id: &UserId,
604    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
605        (*self).get_profile(room_id, user_id).await
606    }
607
608    async fn get_profiles<'a>(
609        &self,
610        room_id: &RoomId,
611        user_ids: &'a [OwnedUserId],
612    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
613        (*self).get_profiles(room_id, user_ids).await
614    }
615
616    async fn get_user_ids(
617        &self,
618        room_id: &RoomId,
619        memberships: RoomMemberships,
620    ) -> Result<Vec<OwnedUserId>, Self::Error> {
621        (*self).get_user_ids(room_id, memberships).await
622    }
623
624    async fn get_room_infos(
625        &self,
626        room_load_settings: &RoomLoadSettings,
627    ) -> Result<Vec<RoomInfo>, Self::Error> {
628        (*self).get_room_infos(room_load_settings).await
629    }
630
631    async fn get_users_with_display_name(
632        &self,
633        room_id: &RoomId,
634        display_name: &DisplayName,
635    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
636        (*self).get_users_with_display_name(room_id, display_name).await
637    }
638
639    async fn get_users_with_display_names<'a>(
640        &self,
641        room_id: &RoomId,
642        display_names: &'a [DisplayName],
643    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
644        (*self).get_users_with_display_names(room_id, display_names).await
645    }
646
647    async fn get_account_data_event(
648        &self,
649        event_type: GlobalAccountDataEventType,
650    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
651        (*self).get_account_data_event(event_type).await
652    }
653
654    async fn get_room_account_data_event(
655        &self,
656        room_id: &RoomId,
657        event_type: RoomAccountDataEventType,
658    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
659        (*self).get_room_account_data_event(room_id, event_type).await
660    }
661
662    async fn get_user_room_receipt_event(
663        &self,
664        room_id: &RoomId,
665        receipt_type: ReceiptType,
666        thread: ReceiptThread,
667        user_id: &UserId,
668    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
669        (*self).get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
670    }
671
672    async fn get_event_room_receipt_events(
673        &self,
674        room_id: &RoomId,
675        receipt_type: ReceiptType,
676        thread: ReceiptThread,
677        event_id: &EventId,
678    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
679        (*self).get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
680    }
681
682    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
683        (*self).get_custom_value(key).await
684    }
685
686    async fn set_custom_value(
687        &self,
688        key: &[u8],
689        value: Vec<u8>,
690    ) -> Result<Option<Vec<u8>>, Self::Error> {
691        (*self).set_custom_value(key, value).await
692    }
693
694    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
695        (*self).remove_custom_value(key).await
696    }
697
698    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
699        (*self).remove_room(room_id).await
700    }
701
702    async fn save_send_queue_request(
703        &self,
704        room_id: &RoomId,
705        transaction_id: OwnedTransactionId,
706        created_at: MilliSecondsSinceUnixEpoch,
707        request: QueuedRequestKind,
708        priority: usize,
709    ) -> Result<(), Self::Error> {
710        (*self)
711            .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
712            .await
713    }
714
715    async fn update_send_queue_request(
716        &self,
717        room_id: &RoomId,
718        transaction_id: &TransactionId,
719        content: QueuedRequestKind,
720    ) -> Result<bool, Self::Error> {
721        (*self).update_send_queue_request(room_id, transaction_id, content).await
722    }
723
724    async fn remove_send_queue_request(
725        &self,
726        room_id: &RoomId,
727        transaction_id: &TransactionId,
728    ) -> Result<bool, Self::Error> {
729        (*self).remove_send_queue_request(room_id, transaction_id).await
730    }
731
732    async fn load_send_queue_requests(
733        &self,
734        room_id: &RoomId,
735    ) -> Result<Vec<QueuedRequest>, Self::Error> {
736        (*self).load_send_queue_requests(room_id).await
737    }
738
739    async fn update_send_queue_request_status(
740        &self,
741        room_id: &RoomId,
742        transaction_id: &TransactionId,
743        error: Option<QueueWedgeError>,
744    ) -> Result<(), Self::Error> {
745        (*self).update_send_queue_request_status(room_id, transaction_id, error).await
746    }
747
748    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
749        (*self).load_rooms_with_unsent_requests().await
750    }
751
752    async fn save_dependent_queued_request(
753        &self,
754        room_id: &RoomId,
755        parent_txn_id: &TransactionId,
756        own_txn_id: ChildTransactionId,
757        created_at: MilliSecondsSinceUnixEpoch,
758        content: DependentQueuedRequestKind,
759    ) -> Result<(), Self::Error> {
760        (*self)
761            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
762            .await
763    }
764
765    async fn mark_dependent_queued_requests_as_ready(
766        &self,
767        room_id: &RoomId,
768        parent_txn_id: &TransactionId,
769        sent_parent_key: SentRequestKey,
770    ) -> Result<usize, Self::Error> {
771        (*self)
772            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
773            .await
774    }
775
776    async fn update_dependent_queued_request(
777        &self,
778        room_id: &RoomId,
779        own_transaction_id: &ChildTransactionId,
780        new_content: DependentQueuedRequestKind,
781    ) -> Result<bool, Self::Error> {
782        (*self).update_dependent_queued_request(room_id, own_transaction_id, new_content).await
783    }
784
785    async fn remove_dependent_queued_request(
786        &self,
787        room: &RoomId,
788        own_txn_id: &ChildTransactionId,
789    ) -> Result<bool, Self::Error> {
790        (*self).remove_dependent_queued_request(room, own_txn_id).await
791    }
792
793    async fn load_dependent_queued_requests(
794        &self,
795        room: &RoomId,
796    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
797        (*self).load_dependent_queued_requests(room).await
798    }
799
800    async fn upsert_thread_subscriptions(
801        &self,
802        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
803    ) -> Result<(), Self::Error> {
804        (*self).upsert_thread_subscriptions(updates).await
805    }
806
807    async fn remove_thread_subscription(
808        &self,
809        room: &RoomId,
810        thread_id: &EventId,
811    ) -> Result<(), Self::Error> {
812        (*self).remove_thread_subscription(room, thread_id).await
813    }
814
815    async fn load_thread_subscription(
816        &self,
817        room: &RoomId,
818        thread_id: &EventId,
819    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
820        (*self).load_thread_subscription(room, thread_id).await
821    }
822
823    async fn optimize(&self) -> Result<(), Self::Error> {
824        (*self).optimize().await
825    }
826
827    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
828        (*self).get_size().await
829    }
830}
831
832#[cfg_attr(target_family = "wasm", async_trait(?Send))]
833#[cfg_attr(not(target_family = "wasm"), async_trait)]
834impl<T: StateStore + ?Sized> StateStore for Arc<T> {
835    type Error = T::Error;
836
837    async fn get_kv_data(
838        &self,
839        key: StateStoreDataKey<'_>,
840    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
841        self.deref().get_kv_data(key).await
842    }
843
844    async fn set_kv_data(
845        &self,
846        key: StateStoreDataKey<'_>,
847        value: StateStoreDataValue,
848    ) -> Result<(), Self::Error> {
849        self.deref().set_kv_data(key, value).await
850    }
851
852    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
853        self.deref().remove_kv_data(key).await
854    }
855
856    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
857        self.deref().save_changes(changes).await
858    }
859
860    async fn get_presence_event(
861        &self,
862        user_id: &UserId,
863    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
864        self.deref().get_presence_event(user_id).await
865    }
866
867    async fn get_presence_events(
868        &self,
869        user_ids: &[OwnedUserId],
870    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
871        self.deref().get_presence_events(user_ids).await
872    }
873
874    async fn get_state_event(
875        &self,
876        room_id: &RoomId,
877        event_type: StateEventType,
878        state_key: &str,
879    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
880        self.deref().get_state_event(room_id, event_type, state_key).await
881    }
882
883    async fn get_state_events(
884        &self,
885        room_id: &RoomId,
886        event_type: StateEventType,
887    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
888        self.deref().get_state_events(room_id, event_type).await
889    }
890
891    async fn get_state_events_for_keys(
892        &self,
893        room_id: &RoomId,
894        event_type: StateEventType,
895        state_keys: &[&str],
896    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
897        self.deref().get_state_events_for_keys(room_id, event_type, state_keys).await
898    }
899
900    async fn get_profile(
901        &self,
902        room_id: &RoomId,
903        user_id: &UserId,
904    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
905        self.deref().get_profile(room_id, user_id).await
906    }
907
908    async fn get_profiles<'a>(
909        &self,
910        room_id: &RoomId,
911        user_ids: &'a [OwnedUserId],
912    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
913        self.deref().get_profiles(room_id, user_ids).await
914    }
915
916    async fn get_user_ids(
917        &self,
918        room_id: &RoomId,
919        memberships: RoomMemberships,
920    ) -> Result<Vec<OwnedUserId>, Self::Error> {
921        self.deref().get_user_ids(room_id, memberships).await
922    }
923
924    async fn get_room_infos(
925        &self,
926        room_load_settings: &RoomLoadSettings,
927    ) -> Result<Vec<RoomInfo>, Self::Error> {
928        self.deref().get_room_infos(room_load_settings).await
929    }
930
931    async fn get_users_with_display_name(
932        &self,
933        room_id: &RoomId,
934        display_name: &DisplayName,
935    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
936        self.deref().get_users_with_display_name(room_id, display_name).await
937    }
938
939    async fn get_users_with_display_names<'a>(
940        &self,
941        room_id: &RoomId,
942        display_names: &'a [DisplayName],
943    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
944        self.deref().get_users_with_display_names(room_id, display_names).await
945    }
946
947    async fn get_account_data_event(
948        &self,
949        event_type: GlobalAccountDataEventType,
950    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
951        self.deref().get_account_data_event(event_type).await
952    }
953
954    async fn get_room_account_data_event(
955        &self,
956        room_id: &RoomId,
957        event_type: RoomAccountDataEventType,
958    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
959        self.deref().get_room_account_data_event(room_id, event_type).await
960    }
961
962    async fn get_user_room_receipt_event(
963        &self,
964        room_id: &RoomId,
965        receipt_type: ReceiptType,
966        thread: ReceiptThread,
967        user_id: &UserId,
968    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
969        self.deref().get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
970    }
971
972    async fn get_event_room_receipt_events(
973        &self,
974        room_id: &RoomId,
975        receipt_type: ReceiptType,
976        thread: ReceiptThread,
977        event_id: &EventId,
978    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
979        self.deref().get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
980    }
981
982    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
983        self.deref().get_custom_value(key).await
984    }
985
986    async fn set_custom_value(
987        &self,
988        key: &[u8],
989        value: Vec<u8>,
990    ) -> Result<Option<Vec<u8>>, Self::Error> {
991        self.deref().set_custom_value(key, value).await
992    }
993
994    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
995        self.deref().remove_custom_value(key).await
996    }
997
998    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
999        self.deref().remove_room(room_id).await
1000    }
1001
1002    async fn save_send_queue_request(
1003        &self,
1004        room_id: &RoomId,
1005        transaction_id: OwnedTransactionId,
1006        created_at: MilliSecondsSinceUnixEpoch,
1007        request: QueuedRequestKind,
1008        priority: usize,
1009    ) -> Result<(), Self::Error> {
1010        self.deref()
1011            .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1012            .await
1013    }
1014
1015    async fn update_send_queue_request(
1016        &self,
1017        room_id: &RoomId,
1018        transaction_id: &TransactionId,
1019        content: QueuedRequestKind,
1020    ) -> Result<bool, Self::Error> {
1021        self.deref().update_send_queue_request(room_id, transaction_id, content).await
1022    }
1023
1024    async fn remove_send_queue_request(
1025        &self,
1026        room_id: &RoomId,
1027        transaction_id: &TransactionId,
1028    ) -> Result<bool, Self::Error> {
1029        self.deref().remove_send_queue_request(room_id, transaction_id).await
1030    }
1031
1032    async fn load_send_queue_requests(
1033        &self,
1034        room_id: &RoomId,
1035    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1036        self.deref().load_send_queue_requests(room_id).await
1037    }
1038
1039    async fn update_send_queue_request_status(
1040        &self,
1041        room_id: &RoomId,
1042        transaction_id: &TransactionId,
1043        error: Option<QueueWedgeError>,
1044    ) -> Result<(), Self::Error> {
1045        self.deref().update_send_queue_request_status(room_id, transaction_id, error).await
1046    }
1047
1048    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1049        self.deref().load_rooms_with_unsent_requests().await
1050    }
1051
1052    async fn save_dependent_queued_request(
1053        &self,
1054        room_id: &RoomId,
1055        parent_txn_id: &TransactionId,
1056        own_txn_id: ChildTransactionId,
1057        created_at: MilliSecondsSinceUnixEpoch,
1058        content: DependentQueuedRequestKind,
1059    ) -> Result<(), Self::Error> {
1060        self.deref()
1061            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1062            .await
1063    }
1064
1065    async fn mark_dependent_queued_requests_as_ready(
1066        &self,
1067        room_id: &RoomId,
1068        parent_txn_id: &TransactionId,
1069        sent_parent_key: SentRequestKey,
1070    ) -> Result<usize, Self::Error> {
1071        self.deref()
1072            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1073            .await
1074    }
1075
1076    async fn update_dependent_queued_request(
1077        &self,
1078        room_id: &RoomId,
1079        own_transaction_id: &ChildTransactionId,
1080        new_content: DependentQueuedRequestKind,
1081    ) -> Result<bool, Self::Error> {
1082        self.deref().update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1083    }
1084
1085    async fn remove_dependent_queued_request(
1086        &self,
1087        room: &RoomId,
1088        own_txn_id: &ChildTransactionId,
1089    ) -> Result<bool, Self::Error> {
1090        self.deref().remove_dependent_queued_request(room, own_txn_id).await
1091    }
1092
1093    async fn load_dependent_queued_requests(
1094        &self,
1095        room: &RoomId,
1096    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1097        self.deref().load_dependent_queued_requests(room).await
1098    }
1099
1100    async fn upsert_thread_subscriptions(
1101        &self,
1102        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1103    ) -> Result<(), Self::Error> {
1104        self.deref().upsert_thread_subscriptions(updates).await
1105    }
1106
1107    async fn remove_thread_subscription(
1108        &self,
1109        room: &RoomId,
1110        thread_id: &EventId,
1111    ) -> Result<(), Self::Error> {
1112        self.deref().remove_thread_subscription(room, thread_id).await
1113    }
1114
1115    async fn load_thread_subscription(
1116        &self,
1117        room: &RoomId,
1118        thread_id: &EventId,
1119    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1120        self.deref().load_thread_subscription(room, thread_id).await
1121    }
1122
1123    async fn optimize(&self) -> Result<(), Self::Error> {
1124        self.deref().optimize().await
1125    }
1126
1127    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1128        self.deref().get_size().await
1129    }
1130}
1131
/// Thin single-field wrapper around a value of type `T`.
///
/// Its [`StateStore`] implementation reports every failure as a
/// [`StoreError`] instead of the wrapped store's own error type.
#[repr(transparent)]
struct EraseStateStoreError<T>(T);

#[cfg(not(tarpaulin_include))]
impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
    /// Formats exactly like the wrapped value; the wrapper contributes no
    /// output of its own.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(inner) = self;
        fmt::Debug::fmt(inner, f)
    }
}
1141
1142#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1143#[cfg_attr(not(target_family = "wasm"), async_trait)]
1144impl<T: StateStore> StateStore for EraseStateStoreError<T> {
1145    type Error = StoreError;
1146
1147    async fn get_kv_data(
1148        &self,
1149        key: StateStoreDataKey<'_>,
1150    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1151        self.0.get_kv_data(key).await.map_err(Into::into)
1152    }
1153
1154    async fn set_kv_data(
1155        &self,
1156        key: StateStoreDataKey<'_>,
1157        value: StateStoreDataValue,
1158    ) -> Result<(), Self::Error> {
1159        self.0.set_kv_data(key, value).await.map_err(Into::into)
1160    }
1161
1162    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1163        self.0.remove_kv_data(key).await.map_err(Into::into)
1164    }
1165
1166    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1167        self.0.save_changes(changes).await.map_err(Into::into)
1168    }
1169
1170    async fn get_presence_event(
1171        &self,
1172        user_id: &UserId,
1173    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1174        self.0.get_presence_event(user_id).await.map_err(Into::into)
1175    }
1176
1177    async fn get_presence_events(
1178        &self,
1179        user_ids: &[OwnedUserId],
1180    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1181        self.0.get_presence_events(user_ids).await.map_err(Into::into)
1182    }
1183
1184    async fn get_state_event(
1185        &self,
1186        room_id: &RoomId,
1187        event_type: StateEventType,
1188        state_key: &str,
1189    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1190        self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
1191    }
1192
1193    async fn get_state_events(
1194        &self,
1195        room_id: &RoomId,
1196        event_type: StateEventType,
1197    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1198        self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
1199    }
1200
1201    async fn get_state_events_for_keys(
1202        &self,
1203        room_id: &RoomId,
1204        event_type: StateEventType,
1205        state_keys: &[&str],
1206    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1207        self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
1208    }
1209
1210    async fn get_profile(
1211        &self,
1212        room_id: &RoomId,
1213        user_id: &UserId,
1214    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1215        self.0.get_profile(room_id, user_id).await.map_err(Into::into)
1216    }
1217
1218    async fn get_profiles<'a>(
1219        &self,
1220        room_id: &RoomId,
1221        user_ids: &'a [OwnedUserId],
1222    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1223        self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
1224    }
1225
1226    async fn get_user_ids(
1227        &self,
1228        room_id: &RoomId,
1229        memberships: RoomMemberships,
1230    ) -> Result<Vec<OwnedUserId>, Self::Error> {
1231        self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
1232    }
1233
1234    async fn get_room_infos(
1235        &self,
1236        room_load_settings: &RoomLoadSettings,
1237    ) -> Result<Vec<RoomInfo>, Self::Error> {
1238        self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
1239    }
1240
1241    async fn get_users_with_display_name(
1242        &self,
1243        room_id: &RoomId,
1244        display_name: &DisplayName,
1245    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1246        self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
1247    }
1248
1249    async fn get_users_with_display_names<'a>(
1250        &self,
1251        room_id: &RoomId,
1252        display_names: &'a [DisplayName],
1253    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1254        self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
1255    }
1256
1257    async fn get_account_data_event(
1258        &self,
1259        event_type: GlobalAccountDataEventType,
1260    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1261        self.0.get_account_data_event(event_type).await.map_err(Into::into)
1262    }
1263
1264    async fn get_room_account_data_event(
1265        &self,
1266        room_id: &RoomId,
1267        event_type: RoomAccountDataEventType,
1268    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1269        self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
1270    }
1271
1272    async fn get_user_room_receipt_event(
1273        &self,
1274        room_id: &RoomId,
1275        receipt_type: ReceiptType,
1276        thread: ReceiptThread,
1277        user_id: &UserId,
1278    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1279        self.0
1280            .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
1281            .await
1282            .map_err(Into::into)
1283    }
1284
1285    async fn get_event_room_receipt_events(
1286        &self,
1287        room_id: &RoomId,
1288        receipt_type: ReceiptType,
1289        thread: ReceiptThread,
1290        event_id: &EventId,
1291    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1292        self.0
1293            .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
1294            .await
1295            .map_err(Into::into)
1296    }
1297
1298    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1299        self.0.get_custom_value(key).await.map_err(Into::into)
1300    }
1301
1302    async fn set_custom_value(
1303        &self,
1304        key: &[u8],
1305        value: Vec<u8>,
1306    ) -> Result<Option<Vec<u8>>, Self::Error> {
1307        self.0.set_custom_value(key, value).await.map_err(Into::into)
1308    }
1309
1310    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1311        self.0.remove_custom_value(key).await.map_err(Into::into)
1312    }
1313
1314    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1315        self.0.remove_room(room_id).await.map_err(Into::into)
1316    }
1317
1318    async fn save_send_queue_request(
1319        &self,
1320        room_id: &RoomId,
1321        transaction_id: OwnedTransactionId,
1322        created_at: MilliSecondsSinceUnixEpoch,
1323        content: QueuedRequestKind,
1324        priority: usize,
1325    ) -> Result<(), Self::Error> {
1326        self.0
1327            .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
1328            .await
1329            .map_err(Into::into)
1330    }
1331
1332    async fn update_send_queue_request(
1333        &self,
1334        room_id: &RoomId,
1335        transaction_id: &TransactionId,
1336        content: QueuedRequestKind,
1337    ) -> Result<bool, Self::Error> {
1338        self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
1339    }
1340
1341    async fn remove_send_queue_request(
1342        &self,
1343        room_id: &RoomId,
1344        transaction_id: &TransactionId,
1345    ) -> Result<bool, Self::Error> {
1346        self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
1347    }
1348
1349    async fn load_send_queue_requests(
1350        &self,
1351        room_id: &RoomId,
1352    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1353        self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
1354    }
1355
1356    async fn update_send_queue_request_status(
1357        &self,
1358        room_id: &RoomId,
1359        transaction_id: &TransactionId,
1360        error: Option<QueueWedgeError>,
1361    ) -> Result<(), Self::Error> {
1362        self.0
1363            .update_send_queue_request_status(room_id, transaction_id, error)
1364            .await
1365            .map_err(Into::into)
1366    }
1367
1368    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1369        self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
1370    }
1371
1372    async fn save_dependent_queued_request(
1373        &self,
1374        room_id: &RoomId,
1375        parent_txn_id: &TransactionId,
1376        own_txn_id: ChildTransactionId,
1377        created_at: MilliSecondsSinceUnixEpoch,
1378        content: DependentQueuedRequestKind,
1379    ) -> Result<(), Self::Error> {
1380        self.0
1381            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1382            .await
1383            .map_err(Into::into)
1384    }
1385
1386    async fn mark_dependent_queued_requests_as_ready(
1387        &self,
1388        room_id: &RoomId,
1389        parent_txn_id: &TransactionId,
1390        sent_parent_key: SentRequestKey,
1391    ) -> Result<usize, Self::Error> {
1392        self.0
1393            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1394            .await
1395            .map_err(Into::into)
1396    }
1397
1398    async fn remove_dependent_queued_request(
1399        &self,
1400        room_id: &RoomId,
1401        own_txn_id: &ChildTransactionId,
1402    ) -> Result<bool, Self::Error> {
1403        self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
1404    }
1405
1406    async fn load_dependent_queued_requests(
1407        &self,
1408        room_id: &RoomId,
1409    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1410        self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
1411    }
1412
1413    async fn update_dependent_queued_request(
1414        &self,
1415        room_id: &RoomId,
1416        own_transaction_id: &ChildTransactionId,
1417        new_content: DependentQueuedRequestKind,
1418    ) -> Result<bool, Self::Error> {
1419        self.0
1420            .update_dependent_queued_request(room_id, own_transaction_id, new_content)
1421            .await
1422            .map_err(Into::into)
1423    }
1424
1425    async fn upsert_thread_subscriptions(
1426        &self,
1427        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1428    ) -> Result<(), Self::Error> {
1429        self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
1430    }
1431
1432    async fn load_thread_subscription(
1433        &self,
1434        room: &RoomId,
1435        thread_id: &EventId,
1436    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1437        self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
1438    }
1439
1440    async fn remove_thread_subscription(
1441        &self,
1442        room: &RoomId,
1443        thread_id: &EventId,
1444    ) -> Result<(), Self::Error> {
1445        self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
1446    }
1447
1448    async fn optimize(&self) -> Result<(), Self::Error> {
1449        self.0.optimize().await.map_err(Into::into)
1450    }
1451
1452    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1453        self.0.get_size().await.map_err(Into::into)
1454    }
1455}
1456
/// A wrapper around a [`StateStore`] that supports synchronizing calls to
/// [`StateStore::save_changes`].
///
/// The wrapper pairs the store with a mutex. Because the mutex is kept behind
/// an [`Arc`], cloning the wrapper yields a handle that shares the same mutex
/// rather than creating a new one.
#[derive(Debug, Clone)]
pub struct SaveLockedStateStore<T = Arc<DynStateStore>> {
    // The store that every operation is ultimately carried out on.
    store: T,
    // Guards calls to `save_changes`; carries no data of its own (`()`).
    lock: Arc<Mutex<()>>,
}
1464
1465/// An error type that represents a scenario where a [`MutexGuard`] provided to
1466/// a function does not reference the underlying [`Mutex`] in the enclosing
1467/// [`SaveLockedStateStore`].
1468#[derive(Debug, Error)]
1469#[error("a mutex guard was provided, but it does not reference the correct mutex")]
1470pub struct IncorrectMutexGuardError;
1471
1472impl From<IncorrectMutexGuardError> for StoreError {
1473    fn from(value: IncorrectMutexGuardError) -> Self {
1474        Self::backend(value)
1475    }
1476}
1477
1478impl<T> SaveLockedStateStore<T> {
1479    /// Creates a new [`SaveLockedStateStore`] with the provided store.
1480    pub fn new(store: T) -> Self {
1481        Self { store, lock: Arc::new(Mutex::new(())) }
1482    }
1483
1484    /// Returns a reference to the underlying [`Mutex`] used to synchronize
1485    /// calls to [`StateStore::save_changes`].
1486    pub fn lock(&self) -> &Mutex<()> {
1487        self.lock.as_ref()
1488    }
1489}
1490
1491impl<T: StateStore> SaveLockedStateStore<T> {
1492    /// Provides a means of calling [`StateStore::save_changes`] when the caller
1493    /// has already acquired the underlying [`Mutex`]. Returns an error if
1494    /// the [`MutexGuard`] provided does not reference the underlying
1495    /// [`Mutex`].
1496    pub async fn save_changes_with_guard(
1497        &self,
1498        guard: &MutexGuard<'_, ()>,
1499        changes: &StateChanges,
1500    ) -> Result<(), StoreError> {
1501        if !std::ptr::eq(MutexGuard::mutex(guard), self.lock()) {
1502            Err(IncorrectMutexGuardError.into())
1503        } else {
1504            self.store.save_changes(changes).await.map_err(Into::into)
1505        }
1506    }
1507}
1508
1509#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1510#[cfg_attr(not(target_family = "wasm"), async_trait)]
1511impl<T: StateStore> StateStore for SaveLockedStateStore<T> {
1512    type Error = T::Error;
1513
1514    async fn get_kv_data(
1515        &self,
1516        key: StateStoreDataKey<'_>,
1517    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1518        self.store.get_kv_data(key).await
1519    }
1520
1521    async fn set_kv_data(
1522        &self,
1523        key: StateStoreDataKey<'_>,
1524        value: StateStoreDataValue,
1525    ) -> Result<(), Self::Error> {
1526        self.store.set_kv_data(key, value).await
1527    }
1528
1529    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1530        self.store.remove_kv_data(key).await
1531    }
1532
1533    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1534        let _guard = self.lock.lock().await;
1535        self.store.save_changes(changes).await
1536    }
1537
1538    async fn get_presence_event(
1539        &self,
1540        user_id: &UserId,
1541    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1542        self.store.get_presence_event(user_id).await
1543    }
1544
1545    async fn get_presence_events(
1546        &self,
1547        user_ids: &[OwnedUserId],
1548    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1549        self.store.get_presence_events(user_ids).await
1550    }
1551
1552    async fn get_state_event(
1553        &self,
1554        room_id: &RoomId,
1555        event_type: StateEventType,
1556        state_key: &str,
1557    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1558        self.store.get_state_event(room_id, event_type, state_key).await
1559    }
1560
1561    async fn get_state_events(
1562        &self,
1563        room_id: &RoomId,
1564        event_type: StateEventType,
1565    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1566        self.store.get_state_events(room_id, event_type).await
1567    }
1568
1569    async fn get_state_events_for_keys(
1570        &self,
1571        room_id: &RoomId,
1572        event_type: StateEventType,
1573        state_keys: &[&str],
1574    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1575        self.store.get_state_events_for_keys(room_id, event_type, state_keys).await
1576    }
1577
1578    async fn get_profile(
1579        &self,
1580        room_id: &RoomId,
1581        user_id: &UserId,
1582    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1583        self.store.get_profile(room_id, user_id).await
1584    }
1585
1586    async fn get_profiles<'a>(
1587        &self,
1588        room_id: &RoomId,
1589        user_ids: &'a [OwnedUserId],
1590    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1591        self.store.get_profiles(room_id, user_ids).await
1592    }
1593
1594    async fn get_user_ids(
1595        &self,
1596        room_id: &RoomId,
1597        memberships: RoomMemberships,
1598    ) -> Result<Vec<OwnedUserId>, Self::Error> {
1599        self.store.get_user_ids(room_id, memberships).await
1600    }
1601
1602    async fn get_room_infos(
1603        &self,
1604        room_load_settings: &RoomLoadSettings,
1605    ) -> Result<Vec<RoomInfo>, Self::Error> {
1606        self.store.get_room_infos(room_load_settings).await
1607    }
1608
1609    async fn get_users_with_display_name(
1610        &self,
1611        room_id: &RoomId,
1612        display_name: &DisplayName,
1613    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1614        self.store.get_users_with_display_name(room_id, display_name).await
1615    }
1616
1617    async fn get_users_with_display_names<'a>(
1618        &self,
1619        room_id: &RoomId,
1620        display_names: &'a [DisplayName],
1621    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1622        self.store.get_users_with_display_names(room_id, display_names).await
1623    }
1624
1625    async fn get_account_data_event(
1626        &self,
1627        event_type: GlobalAccountDataEventType,
1628    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1629        self.store.get_account_data_event(event_type).await
1630    }
1631
1632    async fn get_room_account_data_event(
1633        &self,
1634        room_id: &RoomId,
1635        event_type: RoomAccountDataEventType,
1636    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1637        self.store.get_room_account_data_event(room_id, event_type).await
1638    }
1639
1640    async fn get_user_room_receipt_event(
1641        &self,
1642        room_id: &RoomId,
1643        receipt_type: ReceiptType,
1644        thread: ReceiptThread,
1645        user_id: &UserId,
1646    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1647        self.store.get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
1648    }
1649
1650    async fn get_event_room_receipt_events(
1651        &self,
1652        room_id: &RoomId,
1653        receipt_type: ReceiptType,
1654        thread: ReceiptThread,
1655        event_id: &EventId,
1656    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1657        self.store.get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
1658    }
1659
1660    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1661        self.store.get_custom_value(key).await
1662    }
1663
1664    async fn set_custom_value(
1665        &self,
1666        key: &[u8],
1667        value: Vec<u8>,
1668    ) -> Result<Option<Vec<u8>>, Self::Error> {
1669        self.store.set_custom_value(key, value).await
1670    }
1671
1672    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1673        self.store.remove_custom_value(key).await
1674    }
1675
1676    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1677        self.store.remove_room(room_id).await
1678    }
1679
1680    async fn save_send_queue_request(
1681        &self,
1682        room_id: &RoomId,
1683        transaction_id: OwnedTransactionId,
1684        created_at: MilliSecondsSinceUnixEpoch,
1685        request: QueuedRequestKind,
1686        priority: usize,
1687    ) -> Result<(), Self::Error> {
1688        self.store
1689            .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1690            .await
1691    }
1692
1693    async fn update_send_queue_request(
1694        &self,
1695        room_id: &RoomId,
1696        transaction_id: &TransactionId,
1697        content: QueuedRequestKind,
1698    ) -> Result<bool, Self::Error> {
1699        self.store.update_send_queue_request(room_id, transaction_id, content).await
1700    }
1701
1702    async fn remove_send_queue_request(
1703        &self,
1704        room_id: &RoomId,
1705        transaction_id: &TransactionId,
1706    ) -> Result<bool, Self::Error> {
1707        self.store.remove_send_queue_request(room_id, transaction_id).await
1708    }
1709
1710    async fn load_send_queue_requests(
1711        &self,
1712        room_id: &RoomId,
1713    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1714        self.store.load_send_queue_requests(room_id).await
1715    }
1716
1717    async fn update_send_queue_request_status(
1718        &self,
1719        room_id: &RoomId,
1720        transaction_id: &TransactionId,
1721        error: Option<QueueWedgeError>,
1722    ) -> Result<(), Self::Error> {
1723        self.store.update_send_queue_request_status(room_id, transaction_id, error).await
1724    }
1725
1726    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1727        self.store.load_rooms_with_unsent_requests().await
1728    }
1729
1730    async fn save_dependent_queued_request(
1731        &self,
1732        room_id: &RoomId,
1733        parent_txn_id: &TransactionId,
1734        own_txn_id: ChildTransactionId,
1735        created_at: MilliSecondsSinceUnixEpoch,
1736        content: DependentQueuedRequestKind,
1737    ) -> Result<(), Self::Error> {
1738        self.store
1739            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1740            .await
1741    }
1742
1743    async fn mark_dependent_queued_requests_as_ready(
1744        &self,
1745        room_id: &RoomId,
1746        parent_txn_id: &TransactionId,
1747        sent_parent_key: SentRequestKey,
1748    ) -> Result<usize, Self::Error> {
1749        self.store
1750            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1751            .await
1752    }
1753
1754    async fn update_dependent_queued_request(
1755        &self,
1756        room_id: &RoomId,
1757        own_transaction_id: &ChildTransactionId,
1758        new_content: DependentQueuedRequestKind,
1759    ) -> Result<bool, Self::Error> {
1760        self.store.update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1761    }
1762
1763    async fn remove_dependent_queued_request(
1764        &self,
1765        room: &RoomId,
1766        own_txn_id: &ChildTransactionId,
1767    ) -> Result<bool, Self::Error> {
1768        self.store.remove_dependent_queued_request(room, own_txn_id).await
1769    }
1770
1771    async fn load_dependent_queued_requests(
1772        &self,
1773        room: &RoomId,
1774    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1775        self.store.load_dependent_queued_requests(room).await
1776    }
1777
1778    async fn upsert_thread_subscriptions(
1779        &self,
1780        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1781    ) -> Result<(), Self::Error> {
1782        self.store.upsert_thread_subscriptions(updates).await
1783    }
1784
1785    async fn remove_thread_subscription(
1786        &self,
1787        room: &RoomId,
1788        thread_id: &EventId,
1789    ) -> Result<(), Self::Error> {
1790        self.store.remove_thread_subscription(room, thread_id).await
1791    }
1792
1793    async fn load_thread_subscription(
1794        &self,
1795        room: &RoomId,
1796        thread_id: &EventId,
1797    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1798        self.store.load_thread_subscription(room, thread_id).await
1799    }
1800
1801    async fn optimize(&self) -> Result<(), Self::Error> {
1802        self.store.optimize().await
1803    }
1804
1805    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1806        self.store.get_size().await
1807    }
1808}
1809
1810/// Convenience functionality for state stores.
1811#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1812#[cfg_attr(not(target_family = "wasm"), async_trait)]
1813pub trait StateStoreExt: StateStore {
1814    /// Get a specific state event of statically-known type.
1815    ///
1816    /// # Arguments
1817    ///
1818    /// * `room_id` - The id of the room the state event was received for.
1819    async fn get_state_event_static<C>(
1820        &self,
1821        room_id: &RoomId,
1822    ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1823    where
1824        C: StaticEventContent<IsPrefix = ruma::events::False>
1825            + StaticStateEventContent<StateKey = EmptyStateKey>
1826            + RedactContent,
1827        C::Redacted: RedactedStateEventContent,
1828    {
1829        Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
1830    }
1831
1832    /// Get a specific state event of statically-known type.
1833    ///
1834    /// # Arguments
1835    ///
1836    /// * `room_id` - The id of the room the state event was received for.
1837    async fn get_state_event_static_for_key<C, K>(
1838        &self,
1839        room_id: &RoomId,
1840        state_key: &K,
1841    ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1842    where
1843        C: StaticEventContent<IsPrefix = ruma::events::False>
1844            + StaticStateEventContent
1845            + RedactContent,
1846        C::StateKey: Borrow<K>,
1847        C::Redacted: RedactedStateEventContent,
1848        K: AsRef<str> + ?Sized + Sync,
1849    {
1850        Ok(self
1851            .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
1852            .await?
1853            .map(|raw| raw.cast()))
1854    }
1855
1856    /// Get a list of state events of a statically-known type for a given room.
1857    ///
1858    /// # Arguments
1859    ///
1860    /// * `room_id` - The id of the room to find events for.
1861    async fn get_state_events_static<C>(
1862        &self,
1863        room_id: &RoomId,
1864    ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1865    where
1866        C: StaticEventContent<IsPrefix = ruma::events::False>
1867            + StaticStateEventContent
1868            + RedactContent,
1869        C::Redacted: RedactedStateEventContent,
1870    {
1871        // FIXME: Could be more efficient, if we had streaming store accessor functions
1872        Ok(self
1873            .get_state_events(room_id, C::TYPE.into())
1874            .await?
1875            .into_iter()
1876            .map(|raw| raw.cast())
1877            .collect())
1878    }
1879
1880    /// Get a list of state events of a statically-known type for a given room
1881    /// and given state keys.
1882    ///
1883    /// # Arguments
1884    ///
1885    /// * `room_id` - The id of the room to find events for.
1886    ///
1887    /// * `state_keys` - The list of state keys to find.
1888    async fn get_state_events_for_keys_static<'a, C, K, I>(
1889        &self,
1890        room_id: &RoomId,
1891        state_keys: I,
1892    ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1893    where
1894        C: StaticEventContent<IsPrefix = ruma::events::False>
1895            + StaticStateEventContent
1896            + RedactContent,
1897        C::StateKey: Borrow<K>,
1898        C::Redacted: RedactedStateEventContent,
1899        K: AsRef<str> + Sized + Sync + 'a,
1900        I: IntoIterator<Item = &'a K> + Send,
1901        I::IntoIter: Send,
1902    {
1903        Ok(self
1904            .get_state_events_for_keys(
1905                room_id,
1906                C::TYPE.into(),
1907                &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
1908            )
1909            .await?
1910            .into_iter()
1911            .map(|raw| raw.cast())
1912            .collect())
1913    }
1914
1915    /// Get an event of a statically-known type from the account data store.
1916    async fn get_account_data_event_static<C>(
1917        &self,
1918    ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
1919    where
1920        C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
1921    {
1922        Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1923    }
1924
1925    /// Get an event of a statically-known type from the room account data
1926    /// store.
1927    ///
1928    /// # Arguments
1929    ///
1930    /// * `room_id` - The id of the room for which the room account data event
1931    ///   should be fetched.
1932    async fn get_room_account_data_event_static<C>(
1933        &self,
1934        room_id: &RoomId,
1935    ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
1936    where
1937        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1938    {
1939        Ok(self
1940            .get_room_account_data_event(room_id, C::TYPE.into())
1941            .await?
1942            .map(Raw::cast_unchecked))
1943    }
1944
1945    /// Get the `MemberEvent` for the given state key in the given room id.
1946    ///
1947    /// # Arguments
1948    ///
1949    /// * `room_id` - The room id the member event belongs to.
1950    ///
1951    /// * `state_key` - The user id that the member event defines the state for.
1952    async fn get_member_event(
1953        &self,
1954        room_id: &RoomId,
1955        state_key: &UserId,
1956    ) -> Result<Option<RawMemberEvent>, Self::Error> {
1957        self.get_state_event_static_for_key(room_id, state_key).await
1958    }
1959}
1960
1961#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1962#[cfg_attr(not(target_family = "wasm"), async_trait)]
1963impl<T: StateStore + ?Sized> StateStoreExt for T {}
1964
1965/// A type-erased [`StateStore`].
1966pub type DynStateStore = dyn StateStore<Error = StoreError>;
1967
1968/// A type that can be type-erased into `Arc<dyn StateStore>`.
1969///
1970/// This trait is not meant to be implemented directly outside
1971/// `matrix-sdk-crypto`, but it is automatically implemented for everything that
1972/// implements `StateStore`.
1973pub trait IntoStateStore {
1974    #[doc(hidden)]
1975    fn into_state_store(self) -> Arc<DynStateStore>;
1976}
1977
1978impl<T> IntoStateStore for T
1979where
1980    T: StateStore + Sized + 'static,
1981{
1982    fn into_state_store(self) -> Arc<DynStateStore> {
1983        Arc::new(EraseStateStoreError(self))
1984    }
1985}
1986
1987/// Serialisable representation of get_supported_versions::Response.
1988#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
1989pub struct SupportedVersionsResponse {
1990    /// Versions supported by the remote server.
1991    pub versions: Vec<String>,
1992
1993    /// List of unstable features and their enablement status.
1994    pub unstable_features: BTreeMap<String, bool>,
1995}
1996
1997impl SupportedVersionsResponse {
1998    /// Extracts known Matrix versions and features from the un-typed lists of
1999    /// strings.
2000    ///
2001    /// Note: Matrix versions and features that Ruma cannot parse, or does not
2002    /// know about, are discarded.
2003    pub fn supported_versions(&self) -> SupportedVersions {
2004        let mut supported_versions =
2005            SupportedVersions::from_parts(&self.versions, &self.unstable_features);
2006
2007        // We need at least one supported version to be able to make requests, so we
2008        // default to Matrix 1.0.
2009        if supported_versions.versions.is_empty() {
2010            supported_versions.versions.insert(MatrixVersion::V1_0);
2011        }
2012
2013        supported_versions
2014    }
2015}
2016
2017#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
2018/// A serialisable representation of discover_homeserver::Response.
2019pub struct WellKnownResponse {
2020    /// Information about the homeserver to connect to.
2021    pub homeserver: HomeserverInfo,
2022
2023    /// Information about the identity server to connect to.
2024    pub identity_server: Option<IdentityServerInfo>,
2025
2026    /// Information about the tile server to use to display location data.
2027    pub tile_server: Option<TileServerInfo>,
2028
2029    /// A list of the available MatrixRTC foci, ordered by priority.
2030    pub rtc_foci: Vec<RtcFocusInfo>,
2031}
2032
2033impl From<discover_homeserver::Response> for WellKnownResponse {
2034    fn from(response: discover_homeserver::Response) -> Self {
2035        Self {
2036            homeserver: response.homeserver,
2037            identity_server: response.identity_server,
2038            tile_server: response.tile_server,
2039            rtc_foci: response.rtc_foci,
2040        }
2041    }
2042}
2043
2044/// A value for key-value data that should be persisted into the store.
2045#[derive(Debug, Clone)]
2046pub enum StateStoreDataValue {
2047    /// The sync token.
2048    SyncToken(String),
2049
2050    /// The supported versions of the server.
2051    SupportedVersions(TtlValue<SupportedVersionsResponse>),
2052
2053    /// The well-known information of the server.
2054    WellKnown(TtlValue<Option<WellKnownResponse>>),
2055
2056    /// A filter with the given ID.
2057    Filter(String),
2058
2059    /// The user avatar url
2060    UserAvatarUrl(OwnedMxcUri),
2061
2062    /// A list of recently visited room identifiers for the current user
2063    RecentlyVisitedRooms(Vec<OwnedRoomId>),
2064
2065    /// Persistent data for
2066    /// `matrix_sdk_ui::unable_to_decrypt_hook::UtdHookManager`.
2067    UtdHookManagerData(GrowableBloom),
2068
2069    /// A unit value telling us that the client uploaded duplicate one-time
2070    /// keys.
2071    OneTimeKeyAlreadyUploaded,
2072
2073    /// A composer draft for the room.
2074    /// To learn more, see [`ComposerDraft`].
2075    ///
2076    /// [`ComposerDraft`]: Self::ComposerDraft
2077    ComposerDraft(ComposerDraft),
2078
2079    /// A list of knock request ids marked as seen in a room.
2080    SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),
2081
2082    /// A list of tokens to continue thread subscriptions catchup.
2083    ///
2084    /// See documentation of [`ThreadSubscriptionCatchupToken`] for more
2085    /// details.
2086    ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),
2087
2088    /// The capabilities the homeserver supports or disables.
2089    HomeserverCapabilities(TtlValue<Capabilities>),
2090}
2091
2092/// Tokens to use when catching up on thread subscriptions.
2093///
2094/// These tokens are created when the client receives some thread subscriptions
2095/// from sync, but the sync indicates that there are more thread subscriptions
2096/// available on the server. In this case, it's expected that the client will
2097/// call the [MSC4308] companion endpoint to catch up (back-paginate) on
2098/// previous thread subscriptions.
2099///
2100/// [MSC4308]: https://github.com/matrix-org/matrix-spec-proposals/pull/4308
2101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2102pub struct ThreadSubscriptionCatchupToken {
2103    /// The token to use as the lower bound when fetching new threads
2104    /// subscriptions.
2105    ///
2106    /// In sliding sync, this is the `prev_batch` value of a sliding sync
2107    /// response.
2108    pub from: String,
2109
2110    /// The token to use as the upper bound when fetching new threads
2111    /// subscriptions.
2112    ///
2113    /// In sliding sync, it must be set to the `pos` value of the sliding sync
2114    /// *request*, which response received a `prev_batch` token.
2115    pub to: Option<String>,
2116}
2117
2118/// Current draft of the composer for the room.
2119#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2120pub struct ComposerDraft {
2121    /// The draft content in plain text.
2122    pub plain_text: String,
2123    /// If the message is formatted in HTML, the HTML representation of the
2124    /// message.
2125    pub html_text: Option<String>,
2126    /// The type of draft.
2127    pub draft_type: ComposerDraftType,
2128    /// Attachments associated with this draft.
2129    #[serde(default)]
2130    pub attachments: Vec<DraftAttachment>,
2131}
2132
2133/// An attachment stored with a composer draft.
2134#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2135pub struct DraftAttachment {
2136    /// The filename of the attachment.
2137    pub filename: String,
2138    /// The attachment content with type-specific data.
2139    pub content: DraftAttachmentContent,
2140}
2141
2142/// The content of a draft attachment with type-specific data.
2143#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2144#[serde(tag = "type")]
2145pub enum DraftAttachmentContent {
2146    /// Image attachment.
2147    Image {
2148        /// The image file data.
2149        data: Vec<u8>,
2150        /// MIME type.
2151        mimetype: Option<String>,
2152        /// File size in bytes.
2153        size: Option<u64>,
2154        /// Width in pixels.
2155        width: Option<u64>,
2156        /// Height in pixels.
2157        height: Option<u64>,
2158        /// BlurHash string.
2159        blurhash: Option<String>,
2160        /// Optional thumbnail.
2161        thumbnail: Option<DraftThumbnail>,
2162    },
2163    /// Video attachment.
2164    Video {
2165        /// The video file data.
2166        data: Vec<u8>,
2167        /// MIME type.
2168        mimetype: Option<String>,
2169        /// File size in bytes.
2170        size: Option<u64>,
2171        /// Width in pixels.
2172        width: Option<u64>,
2173        /// Height in pixels.
2174        height: Option<u64>,
2175        /// Duration.
2176        duration: Option<std::time::Duration>,
2177        /// BlurHash string.
2178        blurhash: Option<String>,
2179        /// Optional thumbnail.
2180        thumbnail: Option<DraftThumbnail>,
2181    },
2182    /// Audio attachment.
2183    Audio {
2184        /// The audio file data.
2185        data: Vec<u8>,
2186        /// MIME type.
2187        mimetype: Option<String>,
2188        /// File size in bytes.
2189        size: Option<u64>,
2190        /// Duration.
2191        duration: Option<std::time::Duration>,
2192    },
2193    /// Generic file attachment.
2194    File {
2195        /// The file data.
2196        data: Vec<u8>,
2197        /// MIME type.
2198        mimetype: Option<String>,
2199        /// File size in bytes.
2200        size: Option<u64>,
2201    },
2202}
2203
2204/// Thumbnail data for a draft attachment.
2205#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2206pub struct DraftThumbnail {
2207    /// The filename of the thumbnail.
2208    pub filename: String,
2209    /// The thumbnail image data.
2210    pub data: Vec<u8>,
2211    /// MIME type of the thumbnail.
2212    pub mimetype: Option<String>,
2213    /// Width in pixels.
2214    pub width: Option<u64>,
2215    /// Height in pixels.
2216    pub height: Option<u64>,
2217    /// File size in bytes.
2218    pub size: Option<u64>,
2219}
2220
2221/// The type of draft of the composer.
2222#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2223pub enum ComposerDraftType {
2224    /// The draft is a new message.
2225    NewMessage,
2226    /// The draft is a reply to an event.
2227    Reply {
2228        /// The ID of the event being replied to.
2229        event_id: OwnedEventId,
2230    },
2231    /// The draft is an edit of an event.
2232    Edit {
2233        /// The ID of the event being edited.
2234        event_id: OwnedEventId,
2235    },
2236}
2237
2238impl StateStoreDataValue {
2239    /// Get this value if it is a sync token.
2240    pub fn into_sync_token(self) -> Option<String> {
2241        as_variant!(self, Self::SyncToken)
2242    }
2243
2244    /// Get this value if it is a filter.
2245    pub fn into_filter(self) -> Option<String> {
2246        as_variant!(self, Self::Filter)
2247    }
2248
2249    /// Get this value if it is a user avatar url.
2250    pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
2251        as_variant!(self, Self::UserAvatarUrl)
2252    }
2253
2254    /// Get this value if it is a list of recently visited rooms.
2255    pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
2256        as_variant!(self, Self::RecentlyVisitedRooms)
2257    }
2258
2259    /// Get this value if it is the data for the `UtdHookManager`.
2260    pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
2261        as_variant!(self, Self::UtdHookManagerData)
2262    }
2263
2264    /// Get this value if it is a composer draft.
2265    pub fn into_composer_draft(self) -> Option<ComposerDraft> {
2266        as_variant!(self, Self::ComposerDraft)
2267    }
2268
2269    /// Get this value if it is the supported versions metadata.
2270    pub fn into_supported_versions(self) -> Option<TtlValue<SupportedVersionsResponse>> {
2271        as_variant!(self, Self::SupportedVersions)
2272    }
2273
2274    /// Get this value if it is the well-known metadata.
2275    pub fn into_well_known(self) -> Option<TtlValue<Option<WellKnownResponse>>> {
2276        as_variant!(self, Self::WellKnown)
2277    }
2278
2279    /// Get this value if it is the data for the ignored join requests.
2280    pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
2281        as_variant!(self, Self::SeenKnockRequests)
2282    }
2283
2284    /// Get this value if it is the data for the thread subscriptions catchup
2285    /// tokens.
2286    pub fn into_thread_subscriptions_catchup_tokens(
2287        self,
2288    ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
2289        as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
2290    }
2291
2292    /// Get this value if it is the data for the capabilities the homeserver
2293    /// supports or disables.
2294    pub fn into_homeserver_capabilities(self) -> Option<TtlValue<Capabilities>> {
2295        as_variant!(self, Self::HomeserverCapabilities)
2296    }
2297}
2298
2299/// A key for key-value data.
2300#[derive(Debug, Clone, Copy)]
2301pub enum StateStoreDataKey<'a> {
2302    /// The sync token.
2303    SyncToken,
2304
2305    /// The supported versions of the server,
2306    SupportedVersions,
2307
2308    /// The well-known information of the server,
2309    WellKnown,
2310
2311    /// A filter with the given name.
2312    Filter(&'a str),
2313
2314    /// Avatar URL
2315    UserAvatarUrl(&'a UserId),
2316
2317    /// Recently visited room identifiers
2318    RecentlyVisitedRooms(&'a UserId),
2319
2320    /// Persistent data for
2321    /// `matrix_sdk_ui::unable_to_decrypt_hook::UtdHookManager`.
2322    UtdHookManagerData,
2323
2324    /// Data remembering if the client already reported that it has uploaded
2325    /// duplicate one-time keys.
2326    OneTimeKeyAlreadyUploaded,
2327
2328    /// A composer draft for the room.
2329    /// To learn more, see [`ComposerDraft`].
2330    ///
2331    /// [`ComposerDraft`]: Self::ComposerDraft
2332    ComposerDraft(&'a RoomId, Option<&'a EventId>),
2333
2334    /// A list of knock request ids marked as seen in a room.
2335    SeenKnockRequests(&'a RoomId),
2336
2337    /// A list of thread subscriptions catchup tokens.
2338    ThreadSubscriptionsCatchupTokens,
2339
2340    /// A list of capabilities that the homeserver supports.
2341    HomeserverCapabilities,
2342}
2343
2344impl StateStoreDataKey<'_> {
2345    /// Key to use for the [`SyncToken`][Self::SyncToken] variant.
2346    pub const SYNC_TOKEN: &'static str = "sync_token";
2347
2348    /// Key to use for the [`SupportedVersions`][Self::SupportedVersions]
2349    /// variant.
2350    pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities"; // Note: this is the old name, kept for backwards compatibility.
2351
2352    /// Key to use for the [`WellKnown`][Self::WellKnown]
2353    /// variant.
2354    pub const WELL_KNOWN: &'static str = "well_known";
2355
2356    /// Key prefix to use for the [`Filter`][Self::Filter] variant.
2357    pub const FILTER: &'static str = "filter";
2358
2359    /// Key prefix to use for the [`UserAvatarUrl`][Self::UserAvatarUrl]
2360    /// variant.
2361    pub const USER_AVATAR_URL: &'static str = "user_avatar_url";
2362
2363    /// Key prefix to use for the
2364    /// [`RecentlyVisitedRooms`][Self::RecentlyVisitedRooms] variant.
2365    pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";
2366
2367    /// Key to use for the [`UtdHookManagerData`][Self::UtdHookManagerData]
2368    /// variant.
2369    pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";
2370
2371    /// Key to use for the flag remembering that we already reported that we
2372    /// uploaded duplicate one-time keys.
2373    pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";
2374
2375    /// Key prefix to use for the [`ComposerDraft`][Self::ComposerDraft]
2376    /// variant.
2377    pub const COMPOSER_DRAFT: &'static str = "composer_draft";
2378
2379    /// Key prefix to use for the
2380    /// [`SeenKnockRequests`][Self::SeenKnockRequests] variant.
2381    pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";
2382
2383    /// Key prefix to use for the
2384    /// [`ThreadSubscriptionsCatchupTokens`][Self::ThreadSubscriptionsCatchupTokens] variant.
2385    pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
2386        "thread_subscriptions_catchup_tokens";
2387
2388    /// Key prefix to use for the homeserver's [`Capabilities`].
2389    pub const HOMESERVER_CAPABILITIES: &'static str = "homeserver_capabilities";
2390}
2391
/// Compare two thread subscription changes bump stamps, given a fixed room and
/// thread root event id pair.
///
/// When the previous subscription has a bump stamp and the new one has none,
/// `new` is updated in place to carry the previous bump stamp.
///
/// Returns `true` if the new subscription should be stored, or `false` if it
/// should be ignored because its bump stamp is not strictly greater than the
/// previous one.
pub fn compare_thread_subscription_bump_stamps(
    previous: Option<u64>,
    new: &mut Option<u64>,
) -> bool {
    // Without a previous bump stamp there is nothing to reconcile: the new
    // subscription is stored as it is.
    let Some(prev_bump) = previous else {
        return true;
    };

    match *new {
        // The new subscription has no bump stamp: keep the previous one; it
        // should be updated soon via sync anyways.
        None => {
            *new = Some(prev_bump);
            true
        }

        // A new bump stamp that is older than, or equal to, the previous one
        // must not be stored at all.
        Some(new_bump) => new_bump > prev_bump,
    }
}
2422
#[cfg(test)]
mod tests {
    mod save_locked_state_store {
        use std::time::Duration;

        use assert_matches::assert_matches;
        use futures_util::future::{self, Either};
        #[cfg(all(target_family = "wasm", target_os = "unknown"))]
        use gloo_timers::future::sleep;
        use matrix_sdk_common::executor::spawn;
        use matrix_sdk_test::async_test;
        use tokio::sync::Mutex;
        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
        use tokio::time::sleep;

        use crate::{
            StateChanges, StateStore,
            store::{IntoStateStore, MemoryStore, Result, SaveLockedStateStore},
        };

        /// Store under test for the shared integration test suite: a
        /// `MemoryStore` wrapped in a `SaveLockedStateStore`.
        async fn get_store() -> Result<impl StateStore> {
            Ok(SaveLockedStateStore::new(MemoryStore::new()))
        }

        statestore_integration_tests!();

        #[async_test]
        async fn test_state_store_only_accepts_guard_for_underlying_mutex() {
            let state_store = SaveLockedStateStore::new(MemoryStore::new());
            let state_changes = StateChanges::default();

            // A guard obtained from the store's own lock is accepted. The
            // guard is scoped so that it is released before the next check.
            {
                let lock = state_store.lock();
                let guard = lock.lock().await;
                state_store
                    .save_changes_with_guard(&guard, &state_changes)
                    .await
                    .expect("state store accepts guard for underlying mutex");
            }

            // A guard coming from an unrelated mutex is rejected.
            let mutex = Mutex::new(());
            state_store
                .save_changes_with_guard(&mutex.lock().await, &state_changes)
                .await
                .expect_err("state store does not accept guard for unknown mutex");
        }

        /// Error returned by [`timeout`] when the duration elapsed first.
        #[derive(Debug)]
        struct Elapsed;

        /// Waits for `f` to complete, giving up with [`Elapsed`] once
        /// `duration` has passed.
        async fn timeout<F: Future + Unpin>(
            duration: Duration,
            f: F,
        ) -> Result<F::Output, Elapsed> {
            #[cfg(all(target_family = "wasm", target_os = "unknown"))]
            {
                // Race the future against a timer.
                match future::select(sleep(duration), f).await {
                    Either::Left(_) => Err(Elapsed),
                    Either::Right((output, _)) => Ok(output),
                }
            }
            #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
            {
                tokio::time::timeout(duration, f).await.map_err(|_| Elapsed)
            }
        }

        #[async_test]
        async fn test_state_store_waits_to_acquire_lock_before_saving_changes() {
            let state_store = SaveLockedStateStore::new(MemoryStore::new().into_state_store());

            // Acquire the lock and hold it for 5 seconds.
            let holder_task = spawn({
                let state_store = state_store.clone();
                async move {
                    let lock = state_store.lock();
                    let _guard = lock.lock().await;
                    sleep(Duration::from_secs(5)).await;
                }
            });

            // Try to save changes to the state store while the lock is held by
            // the other task.
            let saver_task =
                spawn(async move { state_store.save_changes(&StateChanges::default()).await });

            // Ensure that the second task does not progress until the first task has
            // completed and therefore released the save lock.
            assert_matches!(future::select(holder_task, saver_task).await, Either::Left((_, pending_save)) => {
                timeout(Duration::from_millis(100), pending_save)
                    .await
                    .expect("task completes before timeout")
                    .expect("task completes successfully")
                    .expect("task saves changes");
            });
        }
    }
}