Skip to main content

matrix_sdk_base/store/
traits.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    borrow::Borrow,
17    collections::{BTreeMap, BTreeSet, HashMap},
18    fmt,
19    ops::Deref,
20    sync::Arc,
21};
22
23use as_variant::as_variant;
24use async_trait::async_trait;
25use growable_bloom_filter::GrowableBloom;
26use matrix_sdk_common::{AsyncTraitDeps, ttl::TtlValue};
27use ruma::{
28    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
29    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
30    api::{
31        MatrixVersion, SupportedVersions,
32        client::{
33            discovery::{
34                discover_homeserver::{self, HomeserverInfo, IdentityServerInfo, TileServerInfo},
35                get_capabilities::v3::Capabilities,
36            },
37            rtc::RtcTransport,
38        },
39    },
40    events::{
41        AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
42        GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
43        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
44        RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
45        presence::PresenceEvent,
46        receipt::{Receipt, ReceiptThread, ReceiptType},
47    },
48    serde::Raw,
49};
50use serde::{Deserialize, Serialize};
51use thiserror::Error;
52use tokio::sync::{Mutex, MutexGuard};
53
54use super::{
55    ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
56    QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
57    send_queue::SentRequestKey,
58};
59use crate::{
60    MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
61    deserialized_responses::{
62        DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
63    },
64    store::StoredThreadSubscription,
65};
66
67/// An abstract state store trait that can be used to implement different stores
68/// for the SDK.
69#[cfg_attr(target_family = "wasm", async_trait(?Send))]
70#[cfg_attr(not(target_family = "wasm"), async_trait)]
71pub trait StateStore: AsyncTraitDeps {
72    /// The error type used by this state store.
73    type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;
74
75    /// Get key-value data from the store.
76    ///
77    /// # Arguments
78    ///
79    /// * `key` - The key to fetch data for.
80    async fn get_kv_data(
81        &self,
82        key: StateStoreDataKey<'_>,
83    ) -> Result<Option<StateStoreDataValue>, Self::Error>;
84
85    /// Put key-value data into the store.
86    ///
87    /// # Arguments
88    ///
89    /// * `key` - The key to identify the data in the store.
90    ///
91    /// * `value` - The data to insert.
92    ///
93    /// Panics if the key and value variants do not match.
94    async fn set_kv_data(
95        &self,
96        key: StateStoreDataKey<'_>,
97        value: StateStoreDataValue,
98    ) -> Result<(), Self::Error>;
99
100    /// Remove key-value data from the store.
101    ///
102    /// # Arguments
103    ///
104    /// * `key` - The key to remove the data for.
105    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;
106
107    /// Save the set of state changes in the store.
108    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;
109
110    /// Get the stored presence event for the given user.
111    ///
112    /// # Arguments
113    ///
114    /// * `user_id` - The id of the user for which we wish to fetch the presence
115    /// event for.
116    async fn get_presence_event(
117        &self,
118        user_id: &UserId,
119    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;
120
121    /// Get the stored presence events for the given users.
122    ///
123    /// # Arguments
124    ///
125    /// * `user_ids` - The IDs of the users to fetch the presence events for.
126    async fn get_presence_events(
127        &self,
128        user_ids: &[OwnedUserId],
129    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;
130
131    /// Get a state event out of the state store.
132    ///
133    /// # Arguments
134    ///
135    /// * `room_id` - The id of the room the state event was received for.
136    ///
137    /// * `event_type` - The event type of the state event.
138    async fn get_state_event(
139        &self,
140        room_id: &RoomId,
141        event_type: StateEventType,
142        state_key: &str,
143    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;
144
145    /// Get a list of state events for a given room and `StateEventType`.
146    ///
147    /// # Arguments
148    ///
149    /// * `room_id` - The id of the room to find events for.
150    ///
151    /// * `event_type` - The event type.
152    async fn get_state_events(
153        &self,
154        room_id: &RoomId,
155        event_type: StateEventType,
156    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
157
158    /// Get a list of state events for a given room, `StateEventType`, and the
159    /// given state keys.
160    ///
161    /// # Arguments
162    ///
163    /// * `room_id` - The id of the room to find events for.
164    ///
165    /// * `event_type` - The event type.
166    ///
167    /// * `state_keys` - The list of state keys to find.
168    async fn get_state_events_for_keys(
169        &self,
170        room_id: &RoomId,
171        event_type: StateEventType,
172        state_keys: &[&str],
173    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
174
175    /// Get the current profile for the given user in the given room.
176    ///
177    /// # Arguments
178    ///
179    /// * `room_id` - The room id the profile is used in.
180    ///
181    /// * `user_id` - The id of the user the profile belongs to.
182    async fn get_profile(
183        &self,
184        room_id: &RoomId,
185        user_id: &UserId,
186    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;
187
188    /// Get the current profiles for the given users in the given room.
189    ///
190    /// # Arguments
191    ///
192    /// * `room_id` - The ID of the room the profiles are used in.
193    ///
194    /// * `user_ids` - The IDs of the users the profiles belong to.
195    async fn get_profiles<'a>(
196        &self,
197        room_id: &RoomId,
198        user_ids: &'a [OwnedUserId],
199    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;
200
201    /// Get the user ids of members for a given room with the given memberships,
202    /// for stripped and regular rooms alike.
203    async fn get_user_ids(
204        &self,
205        room_id: &RoomId,
206        memberships: RoomMemberships,
207    ) -> Result<Vec<OwnedUserId>, Self::Error>;
208
209    /// Get a set of pure `RoomInfo`s the store knows about.
210    async fn get_room_infos(
211        &self,
212        room_load_settings: &RoomLoadSettings,
213    ) -> Result<Vec<RoomInfo>, Self::Error>;
214
215    /// Get all the users that use the given display name in the given room.
216    ///
217    /// # Arguments
218    ///
219    /// * `room_id` - The id of the room for which the display name users should
220    /// be fetched for.
221    ///
222    /// * `display_name` - The display name that the users use.
223    async fn get_users_with_display_name(
224        &self,
225        room_id: &RoomId,
226        display_name: &DisplayName,
227    ) -> Result<BTreeSet<OwnedUserId>, Self::Error>;
228
229    /// Get all the users that use the given display names in the given room.
230    ///
231    /// # Arguments
232    ///
233    /// * `room_id` - The ID of the room to fetch the display names for.
234    ///
235    /// * `display_names` - The display names that the users use.
236    async fn get_users_with_display_names<'a>(
237        &self,
238        room_id: &RoomId,
239        display_names: &'a [DisplayName],
240    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;
241
242    /// Get an event out of the account data store.
243    ///
244    /// # Arguments
245    ///
246    /// * `event_type` - The event type of the account data event.
247    async fn get_account_data_event(
248        &self,
249        event_type: GlobalAccountDataEventType,
250    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;
251
252    /// Get an event out of the room account data store.
253    ///
254    /// # Arguments
255    ///
256    /// * `room_id` - The id of the room for which the room account data event
257    ///   should
258    /// be fetched.
259    ///
260    /// * `event_type` - The event type of the room account data event.
261    async fn get_room_account_data_event(
262        &self,
263        room_id: &RoomId,
264        event_type: RoomAccountDataEventType,
265    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;
266
267    /// Get a user's read receipt for a given room and receipt type and thread.
268    ///
269    /// # Arguments
270    ///
271    /// * `room_id` - The id of the room for which the receipt should be
272    ///   fetched.
273    ///
274    /// * `receipt_type` - The type of the receipt.
275    ///
276    /// * `thread` - The thread containing this receipt.
277    ///
278    /// * `user_id` - The id of the user for whom the receipt should be fetched.
279    async fn get_user_room_receipt_event(
280        &self,
281        room_id: &RoomId,
282        receipt_type: ReceiptType,
283        thread: ReceiptThread,
284        user_id: &UserId,
285    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;
286
287    /// Get an event's read receipts for a given room, receipt type, and thread.
288    ///
289    /// # Arguments
290    ///
291    /// * `room_id` - The id of the room for which the receipts should be
292    ///   fetched.
293    ///
294    /// * `receipt_type` - The type of the receipts.
295    ///
296    /// * `thread` - The thread containing this receipt.
297    ///
298    /// * `event_id` - The id of the event for which the receipts should be
299    ///   fetched.
300    async fn get_event_room_receipt_events(
301        &self,
302        room_id: &RoomId,
303        receipt_type: ReceiptType,
304        thread: ReceiptThread,
305        event_id: &EventId,
306    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;
307
308    /// Get arbitrary data from the custom store
309    ///
310    /// # Arguments
311    ///
312    /// * `key` - The key to fetch data for
313    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
314
315    /// Put arbitrary data into the custom store, return the data previously
316    /// stored
317    ///
318    /// # Arguments
319    ///
320    /// * `key` - The key to insert data into
321    ///
322    /// * `value` - The value to insert
323    async fn set_custom_value(
324        &self,
325        key: &[u8],
326        value: Vec<u8>,
327    ) -> Result<Option<Vec<u8>>, Self::Error>;
328
329    /// Put arbitrary data into the custom store, do not attempt to read any
330    /// previous data
331    ///
332    /// Optimization option for set_custom_values for stores that would perform
333    /// better withouts the extra read and the caller not needing that data
334    /// returned. Otherwise this just wraps around `set_custom_data` and
335    /// discards the result.
336    ///
337    /// # Arguments
338    ///
339    /// * `key` - The key to insert data into
340    ///
341    /// * `value` - The value to insert
342    async fn set_custom_value_no_read(
343        &self,
344        key: &[u8],
345        value: Vec<u8>,
346    ) -> Result<(), Self::Error> {
347        self.set_custom_value(key, value).await.map(|_| ())
348    }
349
350    /// Remove arbitrary data from the custom store and return it if existed
351    ///
352    /// # Arguments
353    ///
354    /// * `key` - The key to remove data from
355    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
356
357    /// Remove a room and all elements associated from the state store.
358    ///
359    /// # Arguments
360    ///
361    /// * `room_id` - The `RoomId` of the room to delete.
362    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;
363
364    /// Save a request to be sent by a send queue later (e.g. sending an event).
365    ///
366    /// # Arguments
367    ///
368    /// * `room_id` - The `RoomId` of the send queue's room.
369    /// * `transaction_id` - The unique key identifying the event to be sent
370    ///   (and its transaction). Note: this is expected to be randomly generated
371    ///   and thus unique.
372    /// * `content` - Serializable event content to be sent.
373    async fn save_send_queue_request(
374        &self,
375        room_id: &RoomId,
376        transaction_id: OwnedTransactionId,
377        created_at: MilliSecondsSinceUnixEpoch,
378        request: QueuedRequestKind,
379        priority: usize,
380    ) -> Result<(), Self::Error>;
381
382    /// Updates a send queue request with the given content, and resets its
383    /// error status.
384    ///
385    /// # Arguments
386    ///
387    /// * `room_id` - The `RoomId` of the send queue's room.
388    /// * `transaction_id` - The unique key identifying the request to be sent
389    ///   (and its transaction).
390    /// * `content` - Serializable event content to replace the original one.
391    ///
392    /// Returns true if a request has been updated, or false otherwise.
393    async fn update_send_queue_request(
394        &self,
395        room_id: &RoomId,
396        transaction_id: &TransactionId,
397        content: QueuedRequestKind,
398    ) -> Result<bool, Self::Error>;
399
400    /// Remove a request previously inserted with
401    /// [`Self::save_send_queue_request`] from the database, based on its
402    /// transaction id.
403    ///
404    /// Returns true if something has been removed, or false otherwise.
405    async fn remove_send_queue_request(
406        &self,
407        room_id: &RoomId,
408        transaction_id: &TransactionId,
409    ) -> Result<bool, Self::Error>;
410
411    /// Loads all the send queue requests for the given room.
412    ///
413    /// The resulting vector of queued requests should be ordered from higher
414    /// priority to lower priority, and respect the insertion order when
415    /// priorities are equal.
416    async fn load_send_queue_requests(
417        &self,
418        room_id: &RoomId,
419    ) -> Result<Vec<QueuedRequest>, Self::Error>;
420
421    /// Updates the send queue error status (wedge) for a given send queue
422    /// request.
423    async fn update_send_queue_request_status(
424        &self,
425        room_id: &RoomId,
426        transaction_id: &TransactionId,
427        error: Option<QueueWedgeError>,
428    ) -> Result<(), Self::Error>;
429
430    /// Loads all the rooms which have any pending requests in their send queue.
431    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;
432
433    /// Add a new entry to the list of dependent send queue requests for a
434    /// parent request.
435    async fn save_dependent_queued_request(
436        &self,
437        room_id: &RoomId,
438        parent_txn_id: &TransactionId,
439        own_txn_id: ChildTransactionId,
440        created_at: MilliSecondsSinceUnixEpoch,
441        content: DependentQueuedRequestKind,
442    ) -> Result<(), Self::Error>;
443
444    /// Mark a set of dependent send queue requests as ready, using a key
445    /// identifying the homeserver's response.
446    ///
447    /// âš  Beware! There's no verification applied that the parent key type is
448    /// compatible with the dependent event type. The invalid state may be
449    /// lazily filtered out in `load_dependent_queued_requests`.
450    ///
451    /// Returns the number of updated requests.
452    async fn mark_dependent_queued_requests_as_ready(
453        &self,
454        room_id: &RoomId,
455        parent_txn_id: &TransactionId,
456        sent_parent_key: SentRequestKey,
457    ) -> Result<usize, Self::Error>;
458
459    /// Update a dependent send queue request with the new content.
460    ///
461    /// Returns true if the request was found and could be updated.
462    async fn update_dependent_queued_request(
463        &self,
464        room_id: &RoomId,
465        own_transaction_id: &ChildTransactionId,
466        new_content: DependentQueuedRequestKind,
467    ) -> Result<bool, Self::Error>;
468
469    /// Remove a specific dependent send queue request by id.
470    ///
471    /// Returns true if the dependent send queue request has been indeed
472    /// removed.
473    async fn remove_dependent_queued_request(
474        &self,
475        room: &RoomId,
476        own_txn_id: &ChildTransactionId,
477    ) -> Result<bool, Self::Error>;
478
479    /// List all the dependent send queue requests.
480    ///
481    /// This returns absolutely all the dependent send queue requests, whether
482    /// they have a parent event id or not. As a contract for implementors, they
483    /// must be returned in insertion order.
484    async fn load_dependent_queued_requests(
485        &self,
486        room: &RoomId,
487    ) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
488
489    /// Inserts or updates multiple thread subscriptions.
490    ///
491    /// If the new thread subscription hasn't set a bumpstamp, and there was a
492    /// previous subscription in the database with a bumpstamp, the existing
493    /// bumpstamp is kept.
494    ///
495    /// If the new thread subscription has a bumpstamp that's lower than or
496    /// equal to a previous one, the existing subscription is kept, i.e.
497    /// this method must have no effect.
498    async fn upsert_thread_subscriptions(
499        &self,
500        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
501    ) -> Result<(), Self::Error>;
502
503    /// Remove a previous thread subscription for a given room and thread.
504    ///
505    /// Note: removing an unknown thread subscription is a no-op.
506    async fn remove_thread_subscription(
507        &self,
508        room: &RoomId,
509        thread_id: &EventId,
510    ) -> Result<(), Self::Error>;
511
512    /// Loads the current thread subscription for a given room and thread.
513    ///
514    /// Returns `None` if there was no entry for the given room/thread pair.
515    async fn load_thread_subscription(
516        &self,
517        room: &RoomId,
518        thread_id: &EventId,
519    ) -> Result<Option<StoredThreadSubscription>, Self::Error>;
520
521    /// Close the store, releasing all held resources (database connections,
522    /// file descriptors, file locks).
523    ///
524    /// In-flight operations complete before this method returns. After it
525    /// returns, operations will fail until [`Self::reopen()`] is called.
526    async fn close(&self) -> Result<(), Self::Error>;
527
528    /// Reopen the store after a [`Self::close()`], re-acquiring database
529    /// connections.
530    async fn reopen(&self) -> Result<(), Self::Error>;
531
532    /// Perform database optimizations if any are available, i.e. vacuuming in
533    /// SQLite.
534    ///
535    /// /// **Warning:** this was added to check if SQLite fragmentation was the
536    /// source of performance issues, **DO NOT use in production**.
537    #[doc(hidden)]
538    async fn optimize(&self) -> Result<(), Self::Error>;
539
540    /// Returns the size of the store in bytes, if known.
541    async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
542}
543
544#[cfg_attr(target_family = "wasm", async_trait(?Send))]
545#[cfg_attr(not(target_family = "wasm"), async_trait)]
546impl<T: StateStore> StateStore for &T {
547    type Error = T::Error;
548
549    async fn get_kv_data(
550        &self,
551        key: StateStoreDataKey<'_>,
552    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
553        (*self).get_kv_data(key).await
554    }
555
556    async fn set_kv_data(
557        &self,
558        key: StateStoreDataKey<'_>,
559        value: StateStoreDataValue,
560    ) -> Result<(), Self::Error> {
561        (*self).set_kv_data(key, value).await
562    }
563
564    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
565        (*self).remove_kv_data(key).await
566    }
567
568    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
569        (*self).save_changes(changes).await
570    }
571
572    async fn get_presence_event(
573        &self,
574        user_id: &UserId,
575    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
576        (*self).get_presence_event(user_id).await
577    }
578
579    async fn get_presence_events(
580        &self,
581        user_ids: &[OwnedUserId],
582    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
583        (*self).get_presence_events(user_ids).await
584    }
585
586    async fn get_state_event(
587        &self,
588        room_id: &RoomId,
589        event_type: StateEventType,
590        state_key: &str,
591    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
592        (*self).get_state_event(room_id, event_type, state_key).await
593    }
594
595    async fn get_state_events(
596        &self,
597        room_id: &RoomId,
598        event_type: StateEventType,
599    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
600        (*self).get_state_events(room_id, event_type).await
601    }
602
603    async fn get_state_events_for_keys(
604        &self,
605        room_id: &RoomId,
606        event_type: StateEventType,
607        state_keys: &[&str],
608    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
609        (*self).get_state_events_for_keys(room_id, event_type, state_keys).await
610    }
611
612    async fn get_profile(
613        &self,
614        room_id: &RoomId,
615        user_id: &UserId,
616    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
617        (*self).get_profile(room_id, user_id).await
618    }
619
620    async fn get_profiles<'a>(
621        &self,
622        room_id: &RoomId,
623        user_ids: &'a [OwnedUserId],
624    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
625        (*self).get_profiles(room_id, user_ids).await
626    }
627
628    async fn get_user_ids(
629        &self,
630        room_id: &RoomId,
631        memberships: RoomMemberships,
632    ) -> Result<Vec<OwnedUserId>, Self::Error> {
633        (*self).get_user_ids(room_id, memberships).await
634    }
635
636    async fn get_room_infos(
637        &self,
638        room_load_settings: &RoomLoadSettings,
639    ) -> Result<Vec<RoomInfo>, Self::Error> {
640        (*self).get_room_infos(room_load_settings).await
641    }
642
643    async fn get_users_with_display_name(
644        &self,
645        room_id: &RoomId,
646        display_name: &DisplayName,
647    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
648        (*self).get_users_with_display_name(room_id, display_name).await
649    }
650
651    async fn get_users_with_display_names<'a>(
652        &self,
653        room_id: &RoomId,
654        display_names: &'a [DisplayName],
655    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
656        (*self).get_users_with_display_names(room_id, display_names).await
657    }
658
659    async fn get_account_data_event(
660        &self,
661        event_type: GlobalAccountDataEventType,
662    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
663        (*self).get_account_data_event(event_type).await
664    }
665
666    async fn get_room_account_data_event(
667        &self,
668        room_id: &RoomId,
669        event_type: RoomAccountDataEventType,
670    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
671        (*self).get_room_account_data_event(room_id, event_type).await
672    }
673
674    async fn get_user_room_receipt_event(
675        &self,
676        room_id: &RoomId,
677        receipt_type: ReceiptType,
678        thread: ReceiptThread,
679        user_id: &UserId,
680    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
681        (*self).get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
682    }
683
684    async fn get_event_room_receipt_events(
685        &self,
686        room_id: &RoomId,
687        receipt_type: ReceiptType,
688        thread: ReceiptThread,
689        event_id: &EventId,
690    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
691        (*self).get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
692    }
693
694    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
695        (*self).get_custom_value(key).await
696    }
697
698    async fn set_custom_value(
699        &self,
700        key: &[u8],
701        value: Vec<u8>,
702    ) -> Result<Option<Vec<u8>>, Self::Error> {
703        (*self).set_custom_value(key, value).await
704    }
705
706    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
707        (*self).remove_custom_value(key).await
708    }
709
710    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
711        (*self).remove_room(room_id).await
712    }
713
714    async fn save_send_queue_request(
715        &self,
716        room_id: &RoomId,
717        transaction_id: OwnedTransactionId,
718        created_at: MilliSecondsSinceUnixEpoch,
719        request: QueuedRequestKind,
720        priority: usize,
721    ) -> Result<(), Self::Error> {
722        (*self)
723            .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
724            .await
725    }
726
727    async fn update_send_queue_request(
728        &self,
729        room_id: &RoomId,
730        transaction_id: &TransactionId,
731        content: QueuedRequestKind,
732    ) -> Result<bool, Self::Error> {
733        (*self).update_send_queue_request(room_id, transaction_id, content).await
734    }
735
736    async fn remove_send_queue_request(
737        &self,
738        room_id: &RoomId,
739        transaction_id: &TransactionId,
740    ) -> Result<bool, Self::Error> {
741        (*self).remove_send_queue_request(room_id, transaction_id).await
742    }
743
744    async fn load_send_queue_requests(
745        &self,
746        room_id: &RoomId,
747    ) -> Result<Vec<QueuedRequest>, Self::Error> {
748        (*self).load_send_queue_requests(room_id).await
749    }
750
751    async fn update_send_queue_request_status(
752        &self,
753        room_id: &RoomId,
754        transaction_id: &TransactionId,
755        error: Option<QueueWedgeError>,
756    ) -> Result<(), Self::Error> {
757        (*self).update_send_queue_request_status(room_id, transaction_id, error).await
758    }
759
760    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
761        (*self).load_rooms_with_unsent_requests().await
762    }
763
764    async fn save_dependent_queued_request(
765        &self,
766        room_id: &RoomId,
767        parent_txn_id: &TransactionId,
768        own_txn_id: ChildTransactionId,
769        created_at: MilliSecondsSinceUnixEpoch,
770        content: DependentQueuedRequestKind,
771    ) -> Result<(), Self::Error> {
772        (*self)
773            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
774            .await
775    }
776
777    async fn mark_dependent_queued_requests_as_ready(
778        &self,
779        room_id: &RoomId,
780        parent_txn_id: &TransactionId,
781        sent_parent_key: SentRequestKey,
782    ) -> Result<usize, Self::Error> {
783        (*self)
784            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
785            .await
786    }
787
788    async fn update_dependent_queued_request(
789        &self,
790        room_id: &RoomId,
791        own_transaction_id: &ChildTransactionId,
792        new_content: DependentQueuedRequestKind,
793    ) -> Result<bool, Self::Error> {
794        (*self).update_dependent_queued_request(room_id, own_transaction_id, new_content).await
795    }
796
797    async fn remove_dependent_queued_request(
798        &self,
799        room: &RoomId,
800        own_txn_id: &ChildTransactionId,
801    ) -> Result<bool, Self::Error> {
802        (*self).remove_dependent_queued_request(room, own_txn_id).await
803    }
804
805    async fn load_dependent_queued_requests(
806        &self,
807        room: &RoomId,
808    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
809        (*self).load_dependent_queued_requests(room).await
810    }
811
812    async fn upsert_thread_subscriptions(
813        &self,
814        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
815    ) -> Result<(), Self::Error> {
816        (*self).upsert_thread_subscriptions(updates).await
817    }
818
819    async fn remove_thread_subscription(
820        &self,
821        room: &RoomId,
822        thread_id: &EventId,
823    ) -> Result<(), Self::Error> {
824        (*self).remove_thread_subscription(room, thread_id).await
825    }
826
827    async fn load_thread_subscription(
828        &self,
829        room: &RoomId,
830        thread_id: &EventId,
831    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
832        (*self).load_thread_subscription(room, thread_id).await
833    }
834
835    async fn close(&self) -> Result<(), Self::Error> {
836        (*self).close().await
837    }
838
839    async fn reopen(&self) -> Result<(), Self::Error> {
840        (*self).reopen().await
841    }
842
843    async fn optimize(&self) -> Result<(), Self::Error> {
844        (*self).optimize().await
845    }
846
847    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
848        (*self).get_size().await
849    }
850}
851
852#[cfg_attr(target_family = "wasm", async_trait(?Send))]
853#[cfg_attr(not(target_family = "wasm"), async_trait)]
854impl<T: StateStore + ?Sized> StateStore for Arc<T> {
855    type Error = T::Error;
856
857    async fn get_kv_data(
858        &self,
859        key: StateStoreDataKey<'_>,
860    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
861        self.deref().get_kv_data(key).await
862    }
863
864    async fn set_kv_data(
865        &self,
866        key: StateStoreDataKey<'_>,
867        value: StateStoreDataValue,
868    ) -> Result<(), Self::Error> {
869        self.deref().set_kv_data(key, value).await
870    }
871
872    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
873        self.deref().remove_kv_data(key).await
874    }
875
876    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
877        self.deref().save_changes(changes).await
878    }
879
880    async fn get_presence_event(
881        &self,
882        user_id: &UserId,
883    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
884        self.deref().get_presence_event(user_id).await
885    }
886
887    async fn get_presence_events(
888        &self,
889        user_ids: &[OwnedUserId],
890    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
891        self.deref().get_presence_events(user_ids).await
892    }
893
894    async fn get_state_event(
895        &self,
896        room_id: &RoomId,
897        event_type: StateEventType,
898        state_key: &str,
899    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
900        self.deref().get_state_event(room_id, event_type, state_key).await
901    }
902
903    async fn get_state_events(
904        &self,
905        room_id: &RoomId,
906        event_type: StateEventType,
907    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
908        self.deref().get_state_events(room_id, event_type).await
909    }
910
911    async fn get_state_events_for_keys(
912        &self,
913        room_id: &RoomId,
914        event_type: StateEventType,
915        state_keys: &[&str],
916    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
917        self.deref().get_state_events_for_keys(room_id, event_type, state_keys).await
918    }
919
920    async fn get_profile(
921        &self,
922        room_id: &RoomId,
923        user_id: &UserId,
924    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
925        self.deref().get_profile(room_id, user_id).await
926    }
927
928    async fn get_profiles<'a>(
929        &self,
930        room_id: &RoomId,
931        user_ids: &'a [OwnedUserId],
932    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
933        self.deref().get_profiles(room_id, user_ids).await
934    }
935
936    async fn get_user_ids(
937        &self,
938        room_id: &RoomId,
939        memberships: RoomMemberships,
940    ) -> Result<Vec<OwnedUserId>, Self::Error> {
941        self.deref().get_user_ids(room_id, memberships).await
942    }
943
944    async fn get_room_infos(
945        &self,
946        room_load_settings: &RoomLoadSettings,
947    ) -> Result<Vec<RoomInfo>, Self::Error> {
948        self.deref().get_room_infos(room_load_settings).await
949    }
950
951    async fn get_users_with_display_name(
952        &self,
953        room_id: &RoomId,
954        display_name: &DisplayName,
955    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
956        self.deref().get_users_with_display_name(room_id, display_name).await
957    }
958
959    async fn get_users_with_display_names<'a>(
960        &self,
961        room_id: &RoomId,
962        display_names: &'a [DisplayName],
963    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
964        self.deref().get_users_with_display_names(room_id, display_names).await
965    }
966
967    async fn get_account_data_event(
968        &self,
969        event_type: GlobalAccountDataEventType,
970    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
971        self.deref().get_account_data_event(event_type).await
972    }
973
974    async fn get_room_account_data_event(
975        &self,
976        room_id: &RoomId,
977        event_type: RoomAccountDataEventType,
978    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
979        self.deref().get_room_account_data_event(room_id, event_type).await
980    }
981
982    async fn get_user_room_receipt_event(
983        &self,
984        room_id: &RoomId,
985        receipt_type: ReceiptType,
986        thread: ReceiptThread,
987        user_id: &UserId,
988    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
989        self.deref().get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
990    }
991
992    async fn get_event_room_receipt_events(
993        &self,
994        room_id: &RoomId,
995        receipt_type: ReceiptType,
996        thread: ReceiptThread,
997        event_id: &EventId,
998    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
999        self.deref().get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
1000    }
1001
1002    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1003        self.deref().get_custom_value(key).await
1004    }
1005
1006    async fn set_custom_value(
1007        &self,
1008        key: &[u8],
1009        value: Vec<u8>,
1010    ) -> Result<Option<Vec<u8>>, Self::Error> {
1011        self.deref().set_custom_value(key, value).await
1012    }
1013
1014    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1015        self.deref().remove_custom_value(key).await
1016    }
1017
1018    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1019        self.deref().remove_room(room_id).await
1020    }
1021
1022    async fn save_send_queue_request(
1023        &self,
1024        room_id: &RoomId,
1025        transaction_id: OwnedTransactionId,
1026        created_at: MilliSecondsSinceUnixEpoch,
1027        request: QueuedRequestKind,
1028        priority: usize,
1029    ) -> Result<(), Self::Error> {
1030        self.deref()
1031            .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1032            .await
1033    }
1034
1035    async fn update_send_queue_request(
1036        &self,
1037        room_id: &RoomId,
1038        transaction_id: &TransactionId,
1039        content: QueuedRequestKind,
1040    ) -> Result<bool, Self::Error> {
1041        self.deref().update_send_queue_request(room_id, transaction_id, content).await
1042    }
1043
1044    async fn remove_send_queue_request(
1045        &self,
1046        room_id: &RoomId,
1047        transaction_id: &TransactionId,
1048    ) -> Result<bool, Self::Error> {
1049        self.deref().remove_send_queue_request(room_id, transaction_id).await
1050    }
1051
1052    async fn load_send_queue_requests(
1053        &self,
1054        room_id: &RoomId,
1055    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1056        self.deref().load_send_queue_requests(room_id).await
1057    }
1058
1059    async fn update_send_queue_request_status(
1060        &self,
1061        room_id: &RoomId,
1062        transaction_id: &TransactionId,
1063        error: Option<QueueWedgeError>,
1064    ) -> Result<(), Self::Error> {
1065        self.deref().update_send_queue_request_status(room_id, transaction_id, error).await
1066    }
1067
1068    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1069        self.deref().load_rooms_with_unsent_requests().await
1070    }
1071
1072    async fn save_dependent_queued_request(
1073        &self,
1074        room_id: &RoomId,
1075        parent_txn_id: &TransactionId,
1076        own_txn_id: ChildTransactionId,
1077        created_at: MilliSecondsSinceUnixEpoch,
1078        content: DependentQueuedRequestKind,
1079    ) -> Result<(), Self::Error> {
1080        self.deref()
1081            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1082            .await
1083    }
1084
1085    async fn mark_dependent_queued_requests_as_ready(
1086        &self,
1087        room_id: &RoomId,
1088        parent_txn_id: &TransactionId,
1089        sent_parent_key: SentRequestKey,
1090    ) -> Result<usize, Self::Error> {
1091        self.deref()
1092            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1093            .await
1094    }
1095
1096    async fn update_dependent_queued_request(
1097        &self,
1098        room_id: &RoomId,
1099        own_transaction_id: &ChildTransactionId,
1100        new_content: DependentQueuedRequestKind,
1101    ) -> Result<bool, Self::Error> {
1102        self.deref().update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1103    }
1104
1105    async fn remove_dependent_queued_request(
1106        &self,
1107        room: &RoomId,
1108        own_txn_id: &ChildTransactionId,
1109    ) -> Result<bool, Self::Error> {
1110        self.deref().remove_dependent_queued_request(room, own_txn_id).await
1111    }
1112
1113    async fn load_dependent_queued_requests(
1114        &self,
1115        room: &RoomId,
1116    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1117        self.deref().load_dependent_queued_requests(room).await
1118    }
1119
1120    async fn upsert_thread_subscriptions(
1121        &self,
1122        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1123    ) -> Result<(), Self::Error> {
1124        self.deref().upsert_thread_subscriptions(updates).await
1125    }
1126
1127    async fn remove_thread_subscription(
1128        &self,
1129        room: &RoomId,
1130        thread_id: &EventId,
1131    ) -> Result<(), Self::Error> {
1132        self.deref().remove_thread_subscription(room, thread_id).await
1133    }
1134
1135    async fn load_thread_subscription(
1136        &self,
1137        room: &RoomId,
1138        thread_id: &EventId,
1139    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1140        self.deref().load_thread_subscription(room, thread_id).await
1141    }
1142
1143    async fn close(&self) -> Result<(), Self::Error> {
1144        self.deref().close().await
1145    }
1146
1147    async fn reopen(&self) -> Result<(), Self::Error> {
1148        self.deref().reopen().await
1149    }
1150
1151    async fn optimize(&self) -> Result<(), Self::Error> {
1152        self.deref().optimize().await
1153    }
1154
1155    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1156        self.deref().get_size().await
1157    }
1158}
1159
1160#[repr(transparent)]
1161struct EraseStateStoreError<T>(T);
1162
1163#[cfg(not(tarpaulin_include))]
1164impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
1165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1166        self.0.fmt(f)
1167    }
1168}
1169
1170#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1171#[cfg_attr(not(target_family = "wasm"), async_trait)]
1172impl<T: StateStore> StateStore for EraseStateStoreError<T> {
1173    type Error = StoreError;
1174
1175    async fn get_kv_data(
1176        &self,
1177        key: StateStoreDataKey<'_>,
1178    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1179        self.0.get_kv_data(key).await.map_err(Into::into)
1180    }
1181
1182    async fn set_kv_data(
1183        &self,
1184        key: StateStoreDataKey<'_>,
1185        value: StateStoreDataValue,
1186    ) -> Result<(), Self::Error> {
1187        self.0.set_kv_data(key, value).await.map_err(Into::into)
1188    }
1189
1190    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1191        self.0.remove_kv_data(key).await.map_err(Into::into)
1192    }
1193
1194    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1195        self.0.save_changes(changes).await.map_err(Into::into)
1196    }
1197
1198    async fn get_presence_event(
1199        &self,
1200        user_id: &UserId,
1201    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1202        self.0.get_presence_event(user_id).await.map_err(Into::into)
1203    }
1204
1205    async fn get_presence_events(
1206        &self,
1207        user_ids: &[OwnedUserId],
1208    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1209        self.0.get_presence_events(user_ids).await.map_err(Into::into)
1210    }
1211
1212    async fn get_state_event(
1213        &self,
1214        room_id: &RoomId,
1215        event_type: StateEventType,
1216        state_key: &str,
1217    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1218        self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
1219    }
1220
1221    async fn get_state_events(
1222        &self,
1223        room_id: &RoomId,
1224        event_type: StateEventType,
1225    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1226        self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
1227    }
1228
1229    async fn get_state_events_for_keys(
1230        &self,
1231        room_id: &RoomId,
1232        event_type: StateEventType,
1233        state_keys: &[&str],
1234    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1235        self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
1236    }
1237
1238    async fn get_profile(
1239        &self,
1240        room_id: &RoomId,
1241        user_id: &UserId,
1242    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1243        self.0.get_profile(room_id, user_id).await.map_err(Into::into)
1244    }
1245
1246    async fn get_profiles<'a>(
1247        &self,
1248        room_id: &RoomId,
1249        user_ids: &'a [OwnedUserId],
1250    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1251        self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
1252    }
1253
1254    async fn get_user_ids(
1255        &self,
1256        room_id: &RoomId,
1257        memberships: RoomMemberships,
1258    ) -> Result<Vec<OwnedUserId>, Self::Error> {
1259        self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
1260    }
1261
1262    async fn get_room_infos(
1263        &self,
1264        room_load_settings: &RoomLoadSettings,
1265    ) -> Result<Vec<RoomInfo>, Self::Error> {
1266        self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
1267    }
1268
1269    async fn get_users_with_display_name(
1270        &self,
1271        room_id: &RoomId,
1272        display_name: &DisplayName,
1273    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1274        self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
1275    }
1276
1277    async fn get_users_with_display_names<'a>(
1278        &self,
1279        room_id: &RoomId,
1280        display_names: &'a [DisplayName],
1281    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1282        self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
1283    }
1284
1285    async fn get_account_data_event(
1286        &self,
1287        event_type: GlobalAccountDataEventType,
1288    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1289        self.0.get_account_data_event(event_type).await.map_err(Into::into)
1290    }
1291
1292    async fn get_room_account_data_event(
1293        &self,
1294        room_id: &RoomId,
1295        event_type: RoomAccountDataEventType,
1296    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1297        self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
1298    }
1299
1300    async fn get_user_room_receipt_event(
1301        &self,
1302        room_id: &RoomId,
1303        receipt_type: ReceiptType,
1304        thread: ReceiptThread,
1305        user_id: &UserId,
1306    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1307        self.0
1308            .get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
1309            .await
1310            .map_err(Into::into)
1311    }
1312
1313    async fn get_event_room_receipt_events(
1314        &self,
1315        room_id: &RoomId,
1316        receipt_type: ReceiptType,
1317        thread: ReceiptThread,
1318        event_id: &EventId,
1319    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1320        self.0
1321            .get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
1322            .await
1323            .map_err(Into::into)
1324    }
1325
1326    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1327        self.0.get_custom_value(key).await.map_err(Into::into)
1328    }
1329
1330    async fn set_custom_value(
1331        &self,
1332        key: &[u8],
1333        value: Vec<u8>,
1334    ) -> Result<Option<Vec<u8>>, Self::Error> {
1335        self.0.set_custom_value(key, value).await.map_err(Into::into)
1336    }
1337
1338    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1339        self.0.remove_custom_value(key).await.map_err(Into::into)
1340    }
1341
1342    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1343        self.0.remove_room(room_id).await.map_err(Into::into)
1344    }
1345
1346    async fn save_send_queue_request(
1347        &self,
1348        room_id: &RoomId,
1349        transaction_id: OwnedTransactionId,
1350        created_at: MilliSecondsSinceUnixEpoch,
1351        content: QueuedRequestKind,
1352        priority: usize,
1353    ) -> Result<(), Self::Error> {
1354        self.0
1355            .save_send_queue_request(room_id, transaction_id, created_at, content, priority)
1356            .await
1357            .map_err(Into::into)
1358    }
1359
1360    async fn update_send_queue_request(
1361        &self,
1362        room_id: &RoomId,
1363        transaction_id: &TransactionId,
1364        content: QueuedRequestKind,
1365    ) -> Result<bool, Self::Error> {
1366        self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
1367    }
1368
1369    async fn remove_send_queue_request(
1370        &self,
1371        room_id: &RoomId,
1372        transaction_id: &TransactionId,
1373    ) -> Result<bool, Self::Error> {
1374        self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
1375    }
1376
1377    async fn load_send_queue_requests(
1378        &self,
1379        room_id: &RoomId,
1380    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1381        self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
1382    }
1383
1384    async fn update_send_queue_request_status(
1385        &self,
1386        room_id: &RoomId,
1387        transaction_id: &TransactionId,
1388        error: Option<QueueWedgeError>,
1389    ) -> Result<(), Self::Error> {
1390        self.0
1391            .update_send_queue_request_status(room_id, transaction_id, error)
1392            .await
1393            .map_err(Into::into)
1394    }
1395
1396    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1397        self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
1398    }
1399
1400    async fn save_dependent_queued_request(
1401        &self,
1402        room_id: &RoomId,
1403        parent_txn_id: &TransactionId,
1404        own_txn_id: ChildTransactionId,
1405        created_at: MilliSecondsSinceUnixEpoch,
1406        content: DependentQueuedRequestKind,
1407    ) -> Result<(), Self::Error> {
1408        self.0
1409            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1410            .await
1411            .map_err(Into::into)
1412    }
1413
1414    async fn mark_dependent_queued_requests_as_ready(
1415        &self,
1416        room_id: &RoomId,
1417        parent_txn_id: &TransactionId,
1418        sent_parent_key: SentRequestKey,
1419    ) -> Result<usize, Self::Error> {
1420        self.0
1421            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1422            .await
1423            .map_err(Into::into)
1424    }
1425
1426    async fn remove_dependent_queued_request(
1427        &self,
1428        room_id: &RoomId,
1429        own_txn_id: &ChildTransactionId,
1430    ) -> Result<bool, Self::Error> {
1431        self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
1432    }
1433
1434    async fn load_dependent_queued_requests(
1435        &self,
1436        room_id: &RoomId,
1437    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1438        self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
1439    }
1440
1441    async fn update_dependent_queued_request(
1442        &self,
1443        room_id: &RoomId,
1444        own_transaction_id: &ChildTransactionId,
1445        new_content: DependentQueuedRequestKind,
1446    ) -> Result<bool, Self::Error> {
1447        self.0
1448            .update_dependent_queued_request(room_id, own_transaction_id, new_content)
1449            .await
1450            .map_err(Into::into)
1451    }
1452
1453    async fn upsert_thread_subscriptions(
1454        &self,
1455        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1456    ) -> Result<(), Self::Error> {
1457        self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
1458    }
1459
1460    async fn load_thread_subscription(
1461        &self,
1462        room: &RoomId,
1463        thread_id: &EventId,
1464    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1465        self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
1466    }
1467
1468    async fn remove_thread_subscription(
1469        &self,
1470        room: &RoomId,
1471        thread_id: &EventId,
1472    ) -> Result<(), Self::Error> {
1473        self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
1474    }
1475
1476    async fn close(&self) -> Result<(), Self::Error> {
1477        self.0.close().await.map_err(Into::into)
1478    }
1479
1480    async fn reopen(&self) -> Result<(), Self::Error> {
1481        self.0.reopen().await.map_err(Into::into)
1482    }
1483
1484    async fn optimize(&self) -> Result<(), Self::Error> {
1485        self.0.optimize().await.map_err(Into::into)
1486    }
1487
1488    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1489        self.0.get_size().await.map_err(Into::into)
1490    }
1491}
1492
1493/// A wrapper around a [`StateStore`] that supports synchronizing calls to
1494/// [`StateStore::save_changes`].
1495#[derive(Debug, Clone)]
1496pub struct SaveLockedStateStore<T = Arc<DynStateStore>> {
1497    store: T,
1498    lock: Arc<Mutex<()>>,
1499}
1500
1501/// An error type that represents a scenario where a [`MutexGuard`] provided to
1502/// a function does not reference the underlying [`Mutex`] in the enclosing
1503/// [`SaveLockedStateStore`].
1504#[derive(Debug, Error)]
1505#[error("a mutex guard was provided, but it does not reference the correct mutex")]
1506pub struct IncorrectMutexGuardError;
1507
1508impl From<IncorrectMutexGuardError> for StoreError {
1509    fn from(value: IncorrectMutexGuardError) -> Self {
1510        Self::backend(value)
1511    }
1512}
1513
1514impl<T> SaveLockedStateStore<T> {
1515    /// Creates a new [`SaveLockedStateStore`] with the provided store.
1516    pub fn new(store: T) -> Self {
1517        Self { store, lock: Arc::new(Mutex::new(())) }
1518    }
1519
1520    /// Returns a reference to the underlying [`Mutex`] used to synchronize
1521    /// calls to [`StateStore::save_changes`].
1522    pub fn lock(&self) -> &Mutex<()> {
1523        self.lock.as_ref()
1524    }
1525}
1526
1527impl<T: StateStore> SaveLockedStateStore<T> {
1528    /// Provides a means of calling [`StateStore::save_changes`] when the caller
1529    /// has already acquired the underlying [`Mutex`]. Returns an error if
1530    /// the [`MutexGuard`] provided does not reference the underlying
1531    /// [`Mutex`].
1532    pub async fn save_changes_with_guard(
1533        &self,
1534        guard: &MutexGuard<'_, ()>,
1535        changes: &StateChanges,
1536    ) -> Result<(), StoreError> {
1537        if !std::ptr::eq(MutexGuard::mutex(guard), self.lock()) {
1538            Err(IncorrectMutexGuardError.into())
1539        } else {
1540            self.store.save_changes(changes).await.map_err(Into::into)
1541        }
1542    }
1543}
1544
1545#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1546#[cfg_attr(not(target_family = "wasm"), async_trait)]
1547impl<T: StateStore> StateStore for SaveLockedStateStore<T> {
1548    type Error = T::Error;
1549
1550    async fn get_kv_data(
1551        &self,
1552        key: StateStoreDataKey<'_>,
1553    ) -> Result<Option<StateStoreDataValue>, Self::Error> {
1554        self.store.get_kv_data(key).await
1555    }
1556
1557    async fn set_kv_data(
1558        &self,
1559        key: StateStoreDataKey<'_>,
1560        value: StateStoreDataValue,
1561    ) -> Result<(), Self::Error> {
1562        self.store.set_kv_data(key, value).await
1563    }
1564
1565    async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
1566        self.store.remove_kv_data(key).await
1567    }
1568
1569    async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
1570        let _guard = self.lock.lock().await;
1571        self.store.save_changes(changes).await
1572    }
1573
1574    async fn get_presence_event(
1575        &self,
1576        user_id: &UserId,
1577    ) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
1578        self.store.get_presence_event(user_id).await
1579    }
1580
1581    async fn get_presence_events(
1582        &self,
1583        user_ids: &[OwnedUserId],
1584    ) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
1585        self.store.get_presence_events(user_ids).await
1586    }
1587
1588    async fn get_state_event(
1589        &self,
1590        room_id: &RoomId,
1591        event_type: StateEventType,
1592        state_key: &str,
1593    ) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
1594        self.store.get_state_event(room_id, event_type, state_key).await
1595    }
1596
1597    async fn get_state_events(
1598        &self,
1599        room_id: &RoomId,
1600        event_type: StateEventType,
1601    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1602        self.store.get_state_events(room_id, event_type).await
1603    }
1604
1605    async fn get_state_events_for_keys(
1606        &self,
1607        room_id: &RoomId,
1608        event_type: StateEventType,
1609        state_keys: &[&str],
1610    ) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
1611        self.store.get_state_events_for_keys(room_id, event_type, state_keys).await
1612    }
1613
1614    async fn get_profile(
1615        &self,
1616        room_id: &RoomId,
1617        user_id: &UserId,
1618    ) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
1619        self.store.get_profile(room_id, user_id).await
1620    }
1621
1622    async fn get_profiles<'a>(
1623        &self,
1624        room_id: &RoomId,
1625        user_ids: &'a [OwnedUserId],
1626    ) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
1627        self.store.get_profiles(room_id, user_ids).await
1628    }
1629
1630    async fn get_user_ids(
1631        &self,
1632        room_id: &RoomId,
1633        memberships: RoomMemberships,
1634    ) -> Result<Vec<OwnedUserId>, Self::Error> {
1635        self.store.get_user_ids(room_id, memberships).await
1636    }
1637
1638    async fn get_room_infos(
1639        &self,
1640        room_load_settings: &RoomLoadSettings,
1641    ) -> Result<Vec<RoomInfo>, Self::Error> {
1642        self.store.get_room_infos(room_load_settings).await
1643    }
1644
1645    async fn get_users_with_display_name(
1646        &self,
1647        room_id: &RoomId,
1648        display_name: &DisplayName,
1649    ) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
1650        self.store.get_users_with_display_name(room_id, display_name).await
1651    }
1652
1653    async fn get_users_with_display_names<'a>(
1654        &self,
1655        room_id: &RoomId,
1656        display_names: &'a [DisplayName],
1657    ) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
1658        self.store.get_users_with_display_names(room_id, display_names).await
1659    }
1660
1661    async fn get_account_data_event(
1662        &self,
1663        event_type: GlobalAccountDataEventType,
1664    ) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
1665        self.store.get_account_data_event(event_type).await
1666    }
1667
1668    async fn get_room_account_data_event(
1669        &self,
1670        room_id: &RoomId,
1671        event_type: RoomAccountDataEventType,
1672    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
1673        self.store.get_room_account_data_event(room_id, event_type).await
1674    }
1675
1676    async fn get_user_room_receipt_event(
1677        &self,
1678        room_id: &RoomId,
1679        receipt_type: ReceiptType,
1680        thread: ReceiptThread,
1681        user_id: &UserId,
1682    ) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
1683        self.store.get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
1684    }
1685
1686    async fn get_event_room_receipt_events(
1687        &self,
1688        room_id: &RoomId,
1689        receipt_type: ReceiptType,
1690        thread: ReceiptThread,
1691        event_id: &EventId,
1692    ) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
1693        self.store.get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
1694    }
1695
1696    async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1697        self.store.get_custom_value(key).await
1698    }
1699
1700    async fn set_custom_value(
1701        &self,
1702        key: &[u8],
1703        value: Vec<u8>,
1704    ) -> Result<Option<Vec<u8>>, Self::Error> {
1705        self.store.set_custom_value(key, value).await
1706    }
1707
1708    async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
1709        self.store.remove_custom_value(key).await
1710    }
1711
1712    async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
1713        self.store.remove_room(room_id).await
1714    }
1715
1716    async fn save_send_queue_request(
1717        &self,
1718        room_id: &RoomId,
1719        transaction_id: OwnedTransactionId,
1720        created_at: MilliSecondsSinceUnixEpoch,
1721        request: QueuedRequestKind,
1722        priority: usize,
1723    ) -> Result<(), Self::Error> {
1724        self.store
1725            .save_send_queue_request(room_id, transaction_id, created_at, request, priority)
1726            .await
1727    }
1728
1729    async fn update_send_queue_request(
1730        &self,
1731        room_id: &RoomId,
1732        transaction_id: &TransactionId,
1733        content: QueuedRequestKind,
1734    ) -> Result<bool, Self::Error> {
1735        self.store.update_send_queue_request(room_id, transaction_id, content).await
1736    }
1737
1738    async fn remove_send_queue_request(
1739        &self,
1740        room_id: &RoomId,
1741        transaction_id: &TransactionId,
1742    ) -> Result<bool, Self::Error> {
1743        self.store.remove_send_queue_request(room_id, transaction_id).await
1744    }
1745
1746    async fn load_send_queue_requests(
1747        &self,
1748        room_id: &RoomId,
1749    ) -> Result<Vec<QueuedRequest>, Self::Error> {
1750        self.store.load_send_queue_requests(room_id).await
1751    }
1752
1753    async fn update_send_queue_request_status(
1754        &self,
1755        room_id: &RoomId,
1756        transaction_id: &TransactionId,
1757        error: Option<QueueWedgeError>,
1758    ) -> Result<(), Self::Error> {
1759        self.store.update_send_queue_request_status(room_id, transaction_id, error).await
1760    }
1761
1762    async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
1763        self.store.load_rooms_with_unsent_requests().await
1764    }
1765
1766    async fn save_dependent_queued_request(
1767        &self,
1768        room_id: &RoomId,
1769        parent_txn_id: &TransactionId,
1770        own_txn_id: ChildTransactionId,
1771        created_at: MilliSecondsSinceUnixEpoch,
1772        content: DependentQueuedRequestKind,
1773    ) -> Result<(), Self::Error> {
1774        self.store
1775            .save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
1776            .await
1777    }
1778
1779    async fn mark_dependent_queued_requests_as_ready(
1780        &self,
1781        room_id: &RoomId,
1782        parent_txn_id: &TransactionId,
1783        sent_parent_key: SentRequestKey,
1784    ) -> Result<usize, Self::Error> {
1785        self.store
1786            .mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
1787            .await
1788    }
1789
1790    async fn update_dependent_queued_request(
1791        &self,
1792        room_id: &RoomId,
1793        own_transaction_id: &ChildTransactionId,
1794        new_content: DependentQueuedRequestKind,
1795    ) -> Result<bool, Self::Error> {
1796        self.store.update_dependent_queued_request(room_id, own_transaction_id, new_content).await
1797    }
1798
1799    async fn remove_dependent_queued_request(
1800        &self,
1801        room: &RoomId,
1802        own_txn_id: &ChildTransactionId,
1803    ) -> Result<bool, Self::Error> {
1804        self.store.remove_dependent_queued_request(room, own_txn_id).await
1805    }
1806
1807    async fn load_dependent_queued_requests(
1808        &self,
1809        room: &RoomId,
1810    ) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
1811        self.store.load_dependent_queued_requests(room).await
1812    }
1813
1814    async fn upsert_thread_subscriptions(
1815        &self,
1816        updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
1817    ) -> Result<(), Self::Error> {
1818        self.store.upsert_thread_subscriptions(updates).await
1819    }
1820
1821    async fn load_thread_subscription(
1822        &self,
1823        room: &RoomId,
1824        thread_id: &EventId,
1825    ) -> Result<Option<StoredThreadSubscription>, Self::Error> {
1826        self.store.load_thread_subscription(room, thread_id).await
1827    }
1828
1829    async fn remove_thread_subscription(
1830        &self,
1831        room: &RoomId,
1832        thread_id: &EventId,
1833    ) -> Result<(), Self::Error> {
1834        self.store.remove_thread_subscription(room, thread_id).await
1835    }
1836
1837    async fn close(&self) -> Result<(), Self::Error> {
1838        self.store.close().await
1839    }
1840
1841    async fn reopen(&self) -> Result<(), Self::Error> {
1842        self.store.reopen().await
1843    }
1844
1845    async fn optimize(&self) -> Result<(), Self::Error> {
1846        self.store.optimize().await
1847    }
1848
1849    async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
1850        self.store.get_size().await
1851    }
1852}
1853
1854/// Convenience functionality for state stores.
1855#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1856#[cfg_attr(not(target_family = "wasm"), async_trait)]
1857pub trait StateStoreExt: StateStore {
1858    /// Get a specific state event of statically-known type.
1859    ///
1860    /// # Arguments
1861    ///
1862    /// * `room_id` - The id of the room the state event was received for.
1863    async fn get_state_event_static<C>(
1864        &self,
1865        room_id: &RoomId,
1866    ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1867    where
1868        C: StaticEventContent<IsPrefix = ruma::events::False>
1869            + StaticStateEventContent<StateKey = EmptyStateKey>
1870            + RedactContent,
1871        C::Redacted: RedactedStateEventContent,
1872    {
1873        Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
1874    }
1875
1876    /// Get a specific state event of statically-known type.
1877    ///
1878    /// # Arguments
1879    ///
1880    /// * `room_id` - The id of the room the state event was received for.
1881    async fn get_state_event_static_for_key<C, K>(
1882        &self,
1883        room_id: &RoomId,
1884        state_key: &K,
1885    ) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
1886    where
1887        C: StaticEventContent<IsPrefix = ruma::events::False>
1888            + StaticStateEventContent
1889            + RedactContent,
1890        C::StateKey: Borrow<K>,
1891        C::Redacted: RedactedStateEventContent,
1892        K: AsRef<str> + ?Sized + Sync,
1893    {
1894        Ok(self
1895            .get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
1896            .await?
1897            .map(|raw| raw.cast()))
1898    }
1899
1900    /// Get a list of state events of a statically-known type for a given room.
1901    ///
1902    /// # Arguments
1903    ///
1904    /// * `room_id` - The id of the room to find events for.
1905    async fn get_state_events_static<C>(
1906        &self,
1907        room_id: &RoomId,
1908    ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1909    where
1910        C: StaticEventContent<IsPrefix = ruma::events::False>
1911            + StaticStateEventContent
1912            + RedactContent,
1913        C::Redacted: RedactedStateEventContent,
1914    {
1915        // FIXME: Could be more efficient, if we had streaming store accessor functions
1916        Ok(self
1917            .get_state_events(room_id, C::TYPE.into())
1918            .await?
1919            .into_iter()
1920            .map(|raw| raw.cast())
1921            .collect())
1922    }
1923
1924    /// Get a list of state events of a statically-known type for a given room
1925    /// and given state keys.
1926    ///
1927    /// # Arguments
1928    ///
1929    /// * `room_id` - The id of the room to find events for.
1930    ///
1931    /// * `state_keys` - The list of state keys to find.
1932    async fn get_state_events_for_keys_static<'a, C, K, I>(
1933        &self,
1934        room_id: &RoomId,
1935        state_keys: I,
1936    ) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
1937    where
1938        C: StaticEventContent<IsPrefix = ruma::events::False>
1939            + StaticStateEventContent
1940            + RedactContent,
1941        C::StateKey: Borrow<K>,
1942        C::Redacted: RedactedStateEventContent,
1943        K: AsRef<str> + Sized + Sync + 'a,
1944        I: IntoIterator<Item = &'a K> + Send,
1945        I::IntoIter: Send,
1946    {
1947        Ok(self
1948            .get_state_events_for_keys(
1949                room_id,
1950                C::TYPE.into(),
1951                &state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
1952            )
1953            .await?
1954            .into_iter()
1955            .map(|raw| raw.cast())
1956            .collect())
1957    }
1958
1959    /// Get an event of a statically-known type from the account data store.
1960    async fn get_account_data_event_static<C>(
1961        &self,
1962    ) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
1963    where
1964        C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
1965    {
1966        Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1967    }
1968
1969    /// Get an event of a statically-known type from the room account data
1970    /// store.
1971    ///
1972    /// # Arguments
1973    ///
1974    /// * `room_id` - The id of the room for which the room account data event
1975    ///   should be fetched.
1976    async fn get_room_account_data_event_static<C>(
1977        &self,
1978        room_id: &RoomId,
1979    ) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
1980    where
1981        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1982    {
1983        Ok(self
1984            .get_room_account_data_event(room_id, C::TYPE.into())
1985            .await?
1986            .map(Raw::cast_unchecked))
1987    }
1988
1989    /// Get the `MemberEvent` for the given state key in the given room id.
1990    ///
1991    /// # Arguments
1992    ///
1993    /// * `room_id` - The room id the member event belongs to.
1994    ///
1995    /// * `state_key` - The user id that the member event defines the state for.
1996    async fn get_member_event(
1997        &self,
1998        room_id: &RoomId,
1999        state_key: &UserId,
2000    ) -> Result<Option<RawMemberEvent>, Self::Error> {
2001        self.get_state_event_static_for_key(room_id, state_key).await
2002    }
2003}
2004
2005#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2006#[cfg_attr(not(target_family = "wasm"), async_trait)]
2007impl<T: StateStore + ?Sized> StateStoreExt for T {}
2008
2009/// A type-erased [`StateStore`].
2010pub type DynStateStore = dyn StateStore<Error = StoreError>;
2011
2012/// A type that can be type-erased into `Arc<dyn StateStore>`.
2013///
2014/// This trait is not meant to be implemented directly outside
2015/// `matrix-sdk-crypto`, but it is automatically implemented for everything that
2016/// implements `StateStore`.
2017pub trait IntoStateStore {
2018    #[doc(hidden)]
2019    fn into_state_store(self) -> Arc<DynStateStore>;
2020}
2021
2022impl<T> IntoStateStore for T
2023where
2024    T: StateStore + Sized + 'static,
2025{
2026    fn into_state_store(self) -> Arc<DynStateStore> {
2027        Arc::new(EraseStateStoreError(self))
2028    }
2029}
2030
2031/// Serialisable representation of get_supported_versions::Response.
2032#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
2033pub struct SupportedVersionsResponse {
2034    /// Versions supported by the remote server.
2035    pub versions: Vec<String>,
2036
2037    /// List of unstable features and their enablement status.
2038    pub unstable_features: BTreeMap<String, bool>,
2039}
2040
2041impl SupportedVersionsResponse {
2042    /// Extracts known Matrix versions and features from the un-typed lists of
2043    /// strings.
2044    ///
2045    /// Note: Matrix versions and features that Ruma cannot parse, or does not
2046    /// know about, are discarded.
2047    pub fn supported_versions(&self) -> SupportedVersions {
2048        let mut supported_versions =
2049            SupportedVersions::from_parts(&self.versions, &self.unstable_features);
2050
2051        // We need at least one supported version to be able to make requests, so we
2052        // default to Matrix 1.0.
2053        if supported_versions.versions.is_empty() {
2054            supported_versions.versions.insert(MatrixVersion::V1_0);
2055        }
2056
2057        supported_versions
2058    }
2059}
2060
2061#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
2062/// A serialisable representation of discover_homeserver::Response.
2063pub struct WellKnownResponse {
2064    /// Information about the homeserver to connect to.
2065    pub homeserver: HomeserverInfo,
2066
2067    /// Information about the identity server to connect to.
2068    pub identity_server: Option<IdentityServerInfo>,
2069
2070    /// Information about the tile server to use to display location data.
2071    pub tile_server: Option<TileServerInfo>,
2072
2073    /// A list of the available MatrixRTC foci, ordered by priority.
2074    pub rtc_foci: Vec<RtcTransport>,
2075}
2076
2077impl From<discover_homeserver::Response> for WellKnownResponse {
2078    fn from(response: discover_homeserver::Response) -> Self {
2079        Self {
2080            homeserver: response.homeserver,
2081            identity_server: response.identity_server,
2082            tile_server: response.tile_server,
2083            rtc_foci: response.rtc_foci,
2084        }
2085    }
2086}
2087
2088/// A value for key-value data that should be persisted into the store.
2089#[derive(Debug, Clone)]
2090pub enum StateStoreDataValue {
2091    /// The sync token.
2092    SyncToken(String),
2093
2094    /// The supported versions of the server.
2095    SupportedVersions(TtlValue<SupportedVersionsResponse>),
2096
2097    /// The well-known information of the server.
2098    WellKnown(TtlValue<Option<WellKnownResponse>>),
2099
2100    /// A filter with the given ID.
2101    Filter(String),
2102
2103    /// The user avatar url
2104    UserAvatarUrl(OwnedMxcUri),
2105
2106    /// A list of recently visited room identifiers for the current user
2107    RecentlyVisitedRooms(Vec<OwnedRoomId>),
2108
2109    /// Persistent data for
2110    /// `matrix_sdk_ui::unable_to_decrypt_hook::UtdHookManager`.
2111    UtdHookManagerData(GrowableBloom),
2112
2113    /// A unit value telling us that the client uploaded duplicate one-time
2114    /// keys.
2115    OneTimeKeyAlreadyUploaded,
2116
2117    /// A composer draft for the room.
2118    /// To learn more, see [`ComposerDraft`].
2119    ///
2120    /// [`ComposerDraft`]: Self::ComposerDraft
2121    ComposerDraft(ComposerDraft),
2122
2123    /// A list of knock request ids marked as seen in a room.
2124    SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),
2125
2126    /// A list of tokens to continue thread subscriptions catchup.
2127    ///
2128    /// See documentation of [`ThreadSubscriptionCatchupToken`] for more
2129    /// details.
2130    ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),
2131
2132    /// The capabilities the homeserver supports or disables.
2133    HomeserverCapabilities(TtlValue<Capabilities>),
2134}
2135
2136/// Tokens to use when catching up on thread subscriptions.
2137///
2138/// These tokens are created when the client receives some thread subscriptions
2139/// from sync, but the sync indicates that there are more thread subscriptions
2140/// available on the server. In this case, it's expected that the client will
2141/// call the [MSC4308] companion endpoint to catch up (back-paginate) on
2142/// previous thread subscriptions.
2143///
2144/// [MSC4308]: https://github.com/matrix-org/matrix-spec-proposals/pull/4308
2145#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2146pub struct ThreadSubscriptionCatchupToken {
2147    /// The token to use as the lower bound when fetching new threads
2148    /// subscriptions.
2149    ///
2150    /// In sliding sync, this is the `prev_batch` value of a sliding sync
2151    /// response.
2152    pub from: String,
2153
2154    /// The token to use as the upper bound when fetching new threads
2155    /// subscriptions.
2156    ///
2157    /// In sliding sync, it must be set to the `pos` value of the sliding sync
2158    /// *request*, which response received a `prev_batch` token.
2159    pub to: Option<String>,
2160}
2161
2162/// Current draft of the composer for the room.
2163#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2164pub struct ComposerDraft {
2165    /// The draft content in plain text.
2166    pub plain_text: String,
2167    /// If the message is formatted in HTML, the HTML representation of the
2168    /// message.
2169    pub html_text: Option<String>,
2170    /// The type of draft.
2171    pub draft_type: ComposerDraftType,
2172    /// Attachments associated with this draft.
2173    #[serde(default)]
2174    pub attachments: Vec<DraftAttachment>,
2175}
2176
2177/// An attachment stored with a composer draft.
2178#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2179pub struct DraftAttachment {
2180    /// The filename of the attachment.
2181    pub filename: String,
2182    /// The attachment content with type-specific data.
2183    pub content: DraftAttachmentContent,
2184}
2185
2186/// The content of a draft attachment with type-specific data.
2187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2188#[serde(tag = "type")]
2189pub enum DraftAttachmentContent {
2190    /// Image attachment.
2191    Image {
2192        /// The image file data.
2193        data: Vec<u8>,
2194        /// MIME type.
2195        mimetype: Option<String>,
2196        /// File size in bytes.
2197        size: Option<u64>,
2198        /// Width in pixels.
2199        width: Option<u64>,
2200        /// Height in pixels.
2201        height: Option<u64>,
2202        /// BlurHash string.
2203        blurhash: Option<String>,
2204        /// Optional thumbnail.
2205        thumbnail: Option<DraftThumbnail>,
2206    },
2207    /// Video attachment.
2208    Video {
2209        /// The video file data.
2210        data: Vec<u8>,
2211        /// MIME type.
2212        mimetype: Option<String>,
2213        /// File size in bytes.
2214        size: Option<u64>,
2215        /// Width in pixels.
2216        width: Option<u64>,
2217        /// Height in pixels.
2218        height: Option<u64>,
2219        /// Duration.
2220        duration: Option<std::time::Duration>,
2221        /// BlurHash string.
2222        blurhash: Option<String>,
2223        /// Optional thumbnail.
2224        thumbnail: Option<DraftThumbnail>,
2225    },
2226    /// Audio attachment.
2227    Audio {
2228        /// The audio file data.
2229        data: Vec<u8>,
2230        /// MIME type.
2231        mimetype: Option<String>,
2232        /// File size in bytes.
2233        size: Option<u64>,
2234        /// Duration.
2235        duration: Option<std::time::Duration>,
2236    },
2237    /// Generic file attachment.
2238    File {
2239        /// The file data.
2240        data: Vec<u8>,
2241        /// MIME type.
2242        mimetype: Option<String>,
2243        /// File size in bytes.
2244        size: Option<u64>,
2245    },
2246}
2247
2248/// Thumbnail data for a draft attachment.
2249#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2250pub struct DraftThumbnail {
2251    /// The filename of the thumbnail.
2252    pub filename: String,
2253    /// The thumbnail image data.
2254    pub data: Vec<u8>,
2255    /// MIME type of the thumbnail.
2256    pub mimetype: Option<String>,
2257    /// Width in pixels.
2258    pub width: Option<u64>,
2259    /// Height in pixels.
2260    pub height: Option<u64>,
2261    /// File size in bytes.
2262    pub size: Option<u64>,
2263}
2264
2265/// The type of draft of the composer.
2266#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
2267pub enum ComposerDraftType {
2268    /// The draft is a new message.
2269    NewMessage,
2270    /// The draft is a reply to an event.
2271    Reply {
2272        /// The ID of the event being replied to.
2273        event_id: OwnedEventId,
2274    },
2275    /// The draft is an edit of an event.
2276    Edit {
2277        /// The ID of the event being edited.
2278        event_id: OwnedEventId,
2279    },
2280}
2281
2282impl StateStoreDataValue {
2283    /// Get this value if it is a sync token.
2284    pub fn into_sync_token(self) -> Option<String> {
2285        as_variant!(self, Self::SyncToken)
2286    }
2287
2288    /// Get this value if it is a filter.
2289    pub fn into_filter(self) -> Option<String> {
2290        as_variant!(self, Self::Filter)
2291    }
2292
2293    /// Get this value if it is a user avatar url.
2294    pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
2295        as_variant!(self, Self::UserAvatarUrl)
2296    }
2297
2298    /// Get this value if it is a list of recently visited rooms.
2299    pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
2300        as_variant!(self, Self::RecentlyVisitedRooms)
2301    }
2302
2303    /// Get this value if it is the data for the `UtdHookManager`.
2304    pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
2305        as_variant!(self, Self::UtdHookManagerData)
2306    }
2307
2308    /// Get this value if it is a composer draft.
2309    pub fn into_composer_draft(self) -> Option<ComposerDraft> {
2310        as_variant!(self, Self::ComposerDraft)
2311    }
2312
2313    /// Get this value if it is the supported versions metadata.
2314    pub fn into_supported_versions(self) -> Option<TtlValue<SupportedVersionsResponse>> {
2315        as_variant!(self, Self::SupportedVersions)
2316    }
2317
2318    /// Get this value if it is the well-known metadata.
2319    pub fn into_well_known(self) -> Option<TtlValue<Option<WellKnownResponse>>> {
2320        as_variant!(self, Self::WellKnown)
2321    }
2322
2323    /// Get this value if it is the data for the ignored join requests.
2324    pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
2325        as_variant!(self, Self::SeenKnockRequests)
2326    }
2327
2328    /// Get this value if it is the data for the thread subscriptions catchup
2329    /// tokens.
2330    pub fn into_thread_subscriptions_catchup_tokens(
2331        self,
2332    ) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
2333        as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
2334    }
2335
2336    /// Get this value if it is the data for the capabilities the homeserver
2337    /// supports or disables.
2338    pub fn into_homeserver_capabilities(self) -> Option<TtlValue<Capabilities>> {
2339        as_variant!(self, Self::HomeserverCapabilities)
2340    }
2341}
2342
2343/// A key for key-value data.
2344#[derive(Debug, Clone, Copy)]
2345pub enum StateStoreDataKey<'a> {
2346    /// The sync token.
2347    SyncToken,
2348
2349    /// The supported versions of the server,
2350    SupportedVersions,
2351
2352    /// The well-known information of the server,
2353    WellKnown,
2354
2355    /// A filter with the given name.
2356    Filter(&'a str),
2357
2358    /// Avatar URL
2359    UserAvatarUrl(&'a UserId),
2360
2361    /// Recently visited room identifiers
2362    RecentlyVisitedRooms(&'a UserId),
2363
2364    /// Persistent data for
2365    /// `matrix_sdk_ui::unable_to_decrypt_hook::UtdHookManager`.
2366    UtdHookManagerData,
2367
2368    /// Data remembering if the client already reported that it has uploaded
2369    /// duplicate one-time keys.
2370    OneTimeKeyAlreadyUploaded,
2371
2372    /// A composer draft for the room.
2373    /// To learn more, see [`ComposerDraft`].
2374    ///
2375    /// [`ComposerDraft`]: Self::ComposerDraft
2376    ComposerDraft(&'a RoomId, Option<&'a EventId>),
2377
2378    /// A list of knock request ids marked as seen in a room.
2379    SeenKnockRequests(&'a RoomId),
2380
2381    /// A list of thread subscriptions catchup tokens.
2382    ThreadSubscriptionsCatchupTokens,
2383
2384    /// A list of capabilities that the homeserver supports.
2385    HomeserverCapabilities,
2386}
2387
2388impl StateStoreDataKey<'_> {
2389    /// Key to use for the [`SyncToken`][Self::SyncToken] variant.
2390    pub const SYNC_TOKEN: &'static str = "sync_token";
2391
2392    /// Key to use for the [`SupportedVersions`][Self::SupportedVersions]
2393    /// variant.
2394    pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities"; // Note: this is the old name, kept for backwards compatibility.
2395
2396    /// Key to use for the [`WellKnown`][Self::WellKnown]
2397    /// variant.
2398    pub const WELL_KNOWN: &'static str = "well_known";
2399
2400    /// Key prefix to use for the [`Filter`][Self::Filter] variant.
2401    pub const FILTER: &'static str = "filter";
2402
2403    /// Key prefix to use for the [`UserAvatarUrl`][Self::UserAvatarUrl]
2404    /// variant.
2405    pub const USER_AVATAR_URL: &'static str = "user_avatar_url";
2406
2407    /// Key prefix to use for the
2408    /// [`RecentlyVisitedRooms`][Self::RecentlyVisitedRooms] variant.
2409    pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";
2410
2411    /// Key to use for the [`UtdHookManagerData`][Self::UtdHookManagerData]
2412    /// variant.
2413    pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";
2414
2415    /// Key to use for the flag remembering that we already reported that we
2416    /// uploaded duplicate one-time keys.
2417    pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";
2418
2419    /// Key prefix to use for the [`ComposerDraft`][Self::ComposerDraft]
2420    /// variant.
2421    pub const COMPOSER_DRAFT: &'static str = "composer_draft";
2422
2423    /// Key prefix to use for the
2424    /// [`SeenKnockRequests`][Self::SeenKnockRequests] variant.
2425    pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";
2426
2427    /// Key prefix to use for the
2428    /// [`ThreadSubscriptionsCatchupTokens`][Self::ThreadSubscriptionsCatchupTokens] variant.
2429    pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
2430        "thread_subscriptions_catchup_tokens";
2431
2432    /// Key prefix to use for the homeserver's [`Capabilities`].
2433    pub const HOMESERVER_CAPABILITIES: &'static str = "homeserver_capabilities";
2434}
2435
2436/// Compare two thread subscription changes bump stamps, given a fixed room and
2437/// thread root event id pair.
2438///
2439/// May update the newer one to keep the previous one if needed, under some
2440/// conditions.
2441///
2442/// Returns true if the new subscription should be stored, or false if the new
2443/// subscription should be ignored.
2444pub fn compare_thread_subscription_bump_stamps(
2445    previous: Option<u64>,
2446    new: &mut Option<u64>,
2447) -> bool {
2448    match (previous, &new) {
2449        // If the previous subscription had a bump stamp, and the new one doesn't, keep the
2450        // previous one; it should be updated soon via sync anyways.
2451        (Some(prev_bump), None) => {
2452            *new = Some(prev_bump);
2453        }
2454
2455        // If the previous bump stamp is newer than the new one, don't store the value at all.
2456        (Some(prev_bump), Some(new_bump)) if *new_bump <= prev_bump => {
2457            return false;
2458        }
2459
2460        // In all other cases, keep the new bumpstamp.
2461        _ => {}
2462    }
2463
2464    true
2465}
2466
2467#[cfg(test)]
2468mod tests {
2469    mod save_locked_state_store {
2470        use std::time::Duration;
2471
2472        use assert_matches::assert_matches;
2473        use futures_util::future::{self, Either};
2474        #[cfg(all(target_family = "wasm", target_os = "unknown"))]
2475        use gloo_timers::future::sleep;
2476        use matrix_sdk_common::executor::spawn;
2477        use matrix_sdk_test::async_test;
2478        use tokio::sync::Mutex;
2479        #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
2480        use tokio::time::sleep;
2481
2482        use crate::{
2483            StateChanges, StateStore,
2484            store::{IntoStateStore, MemoryStore, Result, SaveLockedStateStore},
2485        };
2486
2487        async fn get_store() -> Result<impl StateStore> {
2488            Ok(SaveLockedStateStore::new(MemoryStore::new()))
2489        }
2490
2491        statestore_integration_tests!();
2492
2493        #[async_test]
2494        async fn test_state_store_only_accepts_guard_for_underlying_mutex() {
2495            let state_store = SaveLockedStateStore::new(MemoryStore::new());
2496            let state_changes = StateChanges::default();
2497            state_store
2498                .save_changes_with_guard(&state_store.lock().lock().await, &state_changes)
2499                .await
2500                .expect("state store accepts guard for underlying mutex");
2501
2502            let mutex = Mutex::new(());
2503            state_store
2504                .save_changes_with_guard(&mutex.lock().await, &state_changes)
2505                .await
2506                .expect_err("state store does not accept guard for unknown mutex");
2507        }
2508
2509        #[derive(Debug)]
2510        struct Elapsed;
2511
2512        async fn timeout<F: Future + Unpin>(
2513            duration: Duration,
2514            f: F,
2515        ) -> Result<F::Output, Elapsed> {
2516            #[cfg(all(target_family = "wasm", target_os = "unknown"))]
2517            {
2518                match future::select(sleep(duration), f).await {
2519                    Either::Left(_) => return Err(Elapsed),
2520                    Either::Right((output, _)) => Ok(output),
2521                }
2522            }
2523            #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
2524            {
2525                tokio::time::timeout(duration, f).await.map_err(|_| Elapsed)
2526            }
2527        }
2528
2529        #[async_test]
2530        async fn test_state_store_waits_to_acquire_lock_before_saving_changes() {
2531            let state_store = SaveLockedStateStore::new(MemoryStore::new().into_state_store());
2532
2533            // Acquire lock and hold it for 5 seconds
2534            let lock_task = spawn({
2535                let state_store = state_store.clone();
2536                async move {
2537                    let lock = state_store.lock();
2538                    let _guard = lock.lock().await;
2539                    sleep(Duration::from_secs(5)).await;
2540                }
2541            });
2542
2543            // Try to save changes to the state store while the lock is held by another task
2544            let save_task =
2545                spawn(async move { state_store.save_changes(&StateChanges::default()).await });
2546
2547            // Ensure that the second task does not progress until the first task has
2548            // completed and therefore release the save lock
2549            assert_matches!(future::select(lock_task, save_task).await, Either::Left((_, save_task)) => {
2550                timeout(Duration::from_millis(100), save_task)
2551                    .await
2552                    .expect("task completes before timeout")
2553                    .expect("task completes successfully")
2554                    .expect("task saves changes");
2555            });
2556        }
2557    }
2558}