use std::{
borrow::Borrow,
collections::{BTreeMap, BTreeSet, HashMap},
fmt,
ops::Deref,
sync::Arc,
};
use as_variant::as_variant;
use async_trait::async_trait;
use growable_bloom_filter::GrowableBloom;
use matrix_sdk_common::{AsyncTraitDeps, ttl::TtlValue};
use ruma::{
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedMxcUri, OwnedRoomId,
OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UserId,
api::{
MatrixVersion, SupportedVersions,
client::discovery::{
discover_homeserver::{
self, HomeserverInfo, IdentityServerInfo, RtcFocusInfo, TileServerInfo,
},
get_capabilities::v3::Capabilities,
},
},
events::{
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, EmptyStateKey, GlobalAccountDataEvent,
GlobalAccountDataEventContent, GlobalAccountDataEventType, RedactContent,
RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
RoomAccountDataEventType, StateEventType, StaticEventContent, StaticStateEventContent,
presence::PresenceEvent,
receipt::{Receipt, ReceiptThread, ReceiptType},
},
serde::Raw,
};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::{Mutex, MutexGuard};
use super::{
ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError,
QueuedRequest, QueuedRequestKind, RoomLoadSettings, StateChanges, StoreError,
send_queue::SentRequestKey,
};
use crate::{
MinimalRoomMemberEvent, RoomInfo, RoomMemberships,
deserialized_responses::{
DisplayName, RawAnySyncOrStrippedState, RawMemberEvent, RawSyncOrStrippedState,
},
store::StoredThreadSubscription,
};
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait StateStore: AsyncTraitDeps {
type Error: fmt::Debug + Into<StoreError> + From<serde_json::Error>;
async fn get_kv_data(
&self,
key: StateStoreDataKey<'_>,
) -> Result<Option<StateStoreDataValue>, Self::Error>;
async fn set_kv_data(
&self,
key: StateStoreDataKey<'_>,
value: StateStoreDataValue,
) -> Result<(), Self::Error>;
async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error>;
async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error>;
async fn get_presence_event(
&self,
user_id: &UserId,
) -> Result<Option<Raw<PresenceEvent>>, Self::Error>;
async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>, Self::Error>;
async fn get_state_event(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_key: &str,
) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error>;
async fn get_state_events(
&self,
room_id: &RoomId,
event_type: StateEventType,
) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
async fn get_state_events_for_keys(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_keys: &[&str],
) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error>;
async fn get_profile(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<MinimalRoomMemberEvent>, Self::Error>;
async fn get_profiles<'a>(
&self,
room_id: &RoomId,
user_ids: &'a [OwnedUserId],
) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error>;
async fn get_user_ids(
&self,
room_id: &RoomId,
memberships: RoomMemberships,
) -> Result<Vec<OwnedUserId>, Self::Error>;
async fn get_room_infos(
&self,
room_load_settings: &RoomLoadSettings,
) -> Result<Vec<RoomInfo>, Self::Error>;
async fn get_users_with_display_name(
&self,
room_id: &RoomId,
display_name: &DisplayName,
) -> Result<BTreeSet<OwnedUserId>, Self::Error>;
async fn get_users_with_display_names<'a>(
&self,
room_id: &RoomId,
display_names: &'a [DisplayName],
) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error>;
async fn get_account_data_event(
&self,
event_type: GlobalAccountDataEventType,
) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error>;
async fn get_room_account_data_event(
&self,
room_id: &RoomId,
event_type: RoomAccountDataEventType,
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error>;
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
user_id: &UserId,
) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error>;
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
event_id: &EventId,
) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error>;
async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
async fn set_custom_value(
&self,
key: &[u8],
value: Vec<u8>,
) -> Result<Option<Vec<u8>>, Self::Error>;
async fn set_custom_value_no_read(
&self,
key: &[u8],
value: Vec<u8>,
) -> Result<(), Self::Error> {
self.set_custom_value(key, value).await.map(|_| ())
}
async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error>;
async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error>;
async fn save_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
request: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error>;
async fn update_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
content: QueuedRequestKind,
) -> Result<bool, Self::Error>;
async fn remove_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
) -> Result<bool, Self::Error>;
async fn load_send_queue_requests(
&self,
room_id: &RoomId,
) -> Result<Vec<QueuedRequest>, Self::Error>;
async fn update_send_queue_request_status(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error>;
async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error>;
async fn save_dependent_queued_request(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error>;
async fn mark_dependent_queued_requests_as_ready(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
sent_parent_key: SentRequestKey,
) -> Result<usize, Self::Error>;
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error>;
async fn remove_dependent_queued_request(
&self,
room: &RoomId,
own_txn_id: &ChildTransactionId,
) -> Result<bool, Self::Error>;
async fn load_dependent_queued_requests(
&self,
room: &RoomId,
) -> Result<Vec<DependentQueuedRequest>, Self::Error>;
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error>;
async fn remove_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<(), Self::Error>;
async fn load_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<Option<StoredThreadSubscription>, Self::Error>;
#[doc(hidden)]
async fn optimize(&self) -> Result<(), Self::Error>;
async fn get_size(&self) -> Result<Option<usize>, Self::Error>;
}
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl<T: StateStore> StateStore for &T {
type Error = T::Error;
async fn get_kv_data(
&self,
key: StateStoreDataKey<'_>,
) -> Result<Option<StateStoreDataValue>, Self::Error> {
(*self).get_kv_data(key).await
}
async fn set_kv_data(
&self,
key: StateStoreDataKey<'_>,
value: StateStoreDataValue,
) -> Result<(), Self::Error> {
(*self).set_kv_data(key, value).await
}
async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
(*self).remove_kv_data(key).await
}
async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
(*self).save_changes(changes).await
}
async fn get_presence_event(
&self,
user_id: &UserId,
) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
(*self).get_presence_event(user_id).await
}
async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
(*self).get_presence_events(user_ids).await
}
async fn get_state_event(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_key: &str,
) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
(*self).get_state_event(room_id, event_type, state_key).await
}
async fn get_state_events(
&self,
room_id: &RoomId,
event_type: StateEventType,
) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
(*self).get_state_events(room_id, event_type).await
}
async fn get_state_events_for_keys(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_keys: &[&str],
) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
(*self).get_state_events_for_keys(room_id, event_type, state_keys).await
}
async fn get_profile(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
(*self).get_profile(room_id, user_id).await
}
async fn get_profiles<'a>(
&self,
room_id: &RoomId,
user_ids: &'a [OwnedUserId],
) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
(*self).get_profiles(room_id, user_ids).await
}
async fn get_user_ids(
&self,
room_id: &RoomId,
memberships: RoomMemberships,
) -> Result<Vec<OwnedUserId>, Self::Error> {
(*self).get_user_ids(room_id, memberships).await
}
async fn get_room_infos(
&self,
room_load_settings: &RoomLoadSettings,
) -> Result<Vec<RoomInfo>, Self::Error> {
(*self).get_room_infos(room_load_settings).await
}
async fn get_users_with_display_name(
&self,
room_id: &RoomId,
display_name: &DisplayName,
) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
(*self).get_users_with_display_name(room_id, display_name).await
}
async fn get_users_with_display_names<'a>(
&self,
room_id: &RoomId,
display_names: &'a [DisplayName],
) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
(*self).get_users_with_display_names(room_id, display_names).await
}
async fn get_account_data_event(
&self,
event_type: GlobalAccountDataEventType,
) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
(*self).get_account_data_event(event_type).await
}
async fn get_room_account_data_event(
&self,
room_id: &RoomId,
event_type: RoomAccountDataEventType,
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
(*self).get_room_account_data_event(room_id, event_type).await
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
user_id: &UserId,
) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
(*self).get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
event_id: &EventId,
) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
(*self).get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
}
async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
(*self).get_custom_value(key).await
}
async fn set_custom_value(
&self,
key: &[u8],
value: Vec<u8>,
) -> Result<Option<Vec<u8>>, Self::Error> {
(*self).set_custom_value(key, value).await
}
async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
(*self).remove_custom_value(key).await
}
async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
(*self).remove_room(room_id).await
}
async fn save_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
request: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
(*self)
.save_send_queue_request(room_id, transaction_id, created_at, request, priority)
.await
}
async fn update_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
content: QueuedRequestKind,
) -> Result<bool, Self::Error> {
(*self).update_send_queue_request(room_id, transaction_id, content).await
}
async fn remove_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
) -> Result<bool, Self::Error> {
(*self).remove_send_queue_request(room_id, transaction_id).await
}
async fn load_send_queue_requests(
&self,
room_id: &RoomId,
) -> Result<Vec<QueuedRequest>, Self::Error> {
(*self).load_send_queue_requests(room_id).await
}
async fn update_send_queue_request_status(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error> {
(*self).update_send_queue_request_status(room_id, transaction_id, error).await
}
async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
(*self).load_rooms_with_unsent_requests().await
}
async fn save_dependent_queued_request(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
(*self)
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
.await
}
async fn mark_dependent_queued_requests_as_ready(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
sent_parent_key: SentRequestKey,
) -> Result<usize, Self::Error> {
(*self)
.mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
.await
}
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error> {
(*self).update_dependent_queued_request(room_id, own_transaction_id, new_content).await
}
async fn remove_dependent_queued_request(
&self,
room: &RoomId,
own_txn_id: &ChildTransactionId,
) -> Result<bool, Self::Error> {
(*self).remove_dependent_queued_request(room, own_txn_id).await
}
async fn load_dependent_queued_requests(
&self,
room: &RoomId,
) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
(*self).load_dependent_queued_requests(room).await
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error> {
(*self).upsert_thread_subscriptions(updates).await
}
async fn remove_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<(), Self::Error> {
(*self).remove_thread_subscription(room, thread_id).await
}
async fn load_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<Option<StoredThreadSubscription>, Self::Error> {
(*self).load_thread_subscription(room, thread_id).await
}
async fn optimize(&self) -> Result<(), Self::Error> {
(*self).optimize().await
}
async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
(*self).get_size().await
}
}
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl<T: StateStore + ?Sized> StateStore for Arc<T> {
type Error = T::Error;
async fn get_kv_data(
&self,
key: StateStoreDataKey<'_>,
) -> Result<Option<StateStoreDataValue>, Self::Error> {
self.deref().get_kv_data(key).await
}
async fn set_kv_data(
&self,
key: StateStoreDataKey<'_>,
value: StateStoreDataValue,
) -> Result<(), Self::Error> {
self.deref().set_kv_data(key, value).await
}
async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
self.deref().remove_kv_data(key).await
}
async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
self.deref().save_changes(changes).await
}
async fn get_presence_event(
&self,
user_id: &UserId,
) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
self.deref().get_presence_event(user_id).await
}
async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
self.deref().get_presence_events(user_ids).await
}
async fn get_state_event(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_key: &str,
) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
self.deref().get_state_event(room_id, event_type, state_key).await
}
async fn get_state_events(
&self,
room_id: &RoomId,
event_type: StateEventType,
) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
self.deref().get_state_events(room_id, event_type).await
}
async fn get_state_events_for_keys(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_keys: &[&str],
) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
self.deref().get_state_events_for_keys(room_id, event_type, state_keys).await
}
async fn get_profile(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
self.deref().get_profile(room_id, user_id).await
}
async fn get_profiles<'a>(
&self,
room_id: &RoomId,
user_ids: &'a [OwnedUserId],
) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
self.deref().get_profiles(room_id, user_ids).await
}
async fn get_user_ids(
&self,
room_id: &RoomId,
memberships: RoomMemberships,
) -> Result<Vec<OwnedUserId>, Self::Error> {
self.deref().get_user_ids(room_id, memberships).await
}
async fn get_room_infos(
&self,
room_load_settings: &RoomLoadSettings,
) -> Result<Vec<RoomInfo>, Self::Error> {
self.deref().get_room_infos(room_load_settings).await
}
async fn get_users_with_display_name(
&self,
room_id: &RoomId,
display_name: &DisplayName,
) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
self.deref().get_users_with_display_name(room_id, display_name).await
}
async fn get_users_with_display_names<'a>(
&self,
room_id: &RoomId,
display_names: &'a [DisplayName],
) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
self.deref().get_users_with_display_names(room_id, display_names).await
}
async fn get_account_data_event(
&self,
event_type: GlobalAccountDataEventType,
) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
self.deref().get_account_data_event(event_type).await
}
async fn get_room_account_data_event(
&self,
room_id: &RoomId,
event_type: RoomAccountDataEventType,
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
self.deref().get_room_account_data_event(room_id, event_type).await
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
user_id: &UserId,
) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
self.deref().get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
event_id: &EventId,
) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
self.deref().get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
}
async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.deref().get_custom_value(key).await
}
async fn set_custom_value(
&self,
key: &[u8],
value: Vec<u8>,
) -> Result<Option<Vec<u8>>, Self::Error> {
self.deref().set_custom_value(key, value).await
}
async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.deref().remove_custom_value(key).await
}
async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
self.deref().remove_room(room_id).await
}
async fn save_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
request: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
self.deref()
.save_send_queue_request(room_id, transaction_id, created_at, request, priority)
.await
}
async fn update_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
content: QueuedRequestKind,
) -> Result<bool, Self::Error> {
self.deref().update_send_queue_request(room_id, transaction_id, content).await
}
async fn remove_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
) -> Result<bool, Self::Error> {
self.deref().remove_send_queue_request(room_id, transaction_id).await
}
async fn load_send_queue_requests(
&self,
room_id: &RoomId,
) -> Result<Vec<QueuedRequest>, Self::Error> {
self.deref().load_send_queue_requests(room_id).await
}
async fn update_send_queue_request_status(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error> {
self.deref().update_send_queue_request_status(room_id, transaction_id, error).await
}
async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
self.deref().load_rooms_with_unsent_requests().await
}
async fn save_dependent_queued_request(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
self.deref()
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
.await
}
async fn mark_dependent_queued_requests_as_ready(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
sent_parent_key: SentRequestKey,
) -> Result<usize, Self::Error> {
self.deref()
.mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
.await
}
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error> {
self.deref().update_dependent_queued_request(room_id, own_transaction_id, new_content).await
}
async fn remove_dependent_queued_request(
&self,
room: &RoomId,
own_txn_id: &ChildTransactionId,
) -> Result<bool, Self::Error> {
self.deref().remove_dependent_queued_request(room, own_txn_id).await
}
async fn load_dependent_queued_requests(
&self,
room: &RoomId,
) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
self.deref().load_dependent_queued_requests(room).await
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error> {
self.deref().upsert_thread_subscriptions(updates).await
}
async fn remove_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<(), Self::Error> {
self.deref().remove_thread_subscription(room, thread_id).await
}
async fn load_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<Option<StoredThreadSubscription>, Self::Error> {
self.deref().load_thread_subscription(room, thread_id).await
}
async fn optimize(&self) -> Result<(), Self::Error> {
self.deref().optimize().await
}
async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
self.deref().get_size().await
}
}
#[repr(transparent)]
struct EraseStateStoreError<T>(T);
#[cfg(not(tarpaulin_include))]
impl<T: fmt::Debug> fmt::Debug for EraseStateStoreError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl<T: StateStore> StateStore for EraseStateStoreError<T> {
type Error = StoreError;
async fn get_kv_data(
&self,
key: StateStoreDataKey<'_>,
) -> Result<Option<StateStoreDataValue>, Self::Error> {
self.0.get_kv_data(key).await.map_err(Into::into)
}
async fn set_kv_data(
&self,
key: StateStoreDataKey<'_>,
value: StateStoreDataValue,
) -> Result<(), Self::Error> {
self.0.set_kv_data(key, value).await.map_err(Into::into)
}
async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
self.0.remove_kv_data(key).await.map_err(Into::into)
}
async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
self.0.save_changes(changes).await.map_err(Into::into)
}
async fn get_presence_event(
&self,
user_id: &UserId,
) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
self.0.get_presence_event(user_id).await.map_err(Into::into)
}
async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
self.0.get_presence_events(user_ids).await.map_err(Into::into)
}
async fn get_state_event(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_key: &str,
) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
self.0.get_state_event(room_id, event_type, state_key).await.map_err(Into::into)
}
async fn get_state_events(
&self,
room_id: &RoomId,
event_type: StateEventType,
) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
self.0.get_state_events(room_id, event_type).await.map_err(Into::into)
}
async fn get_state_events_for_keys(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_keys: &[&str],
) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
self.0.get_state_events_for_keys(room_id, event_type, state_keys).await.map_err(Into::into)
}
async fn get_profile(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
self.0.get_profile(room_id, user_id).await.map_err(Into::into)
}
async fn get_profiles<'a>(
&self,
room_id: &RoomId,
user_ids: &'a [OwnedUserId],
) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
self.0.get_profiles(room_id, user_ids).await.map_err(Into::into)
}
async fn get_user_ids(
&self,
room_id: &RoomId,
memberships: RoomMemberships,
) -> Result<Vec<OwnedUserId>, Self::Error> {
self.0.get_user_ids(room_id, memberships).await.map_err(Into::into)
}
async fn get_room_infos(
&self,
room_load_settings: &RoomLoadSettings,
) -> Result<Vec<RoomInfo>, Self::Error> {
self.0.get_room_infos(room_load_settings).await.map_err(Into::into)
}
async fn get_users_with_display_name(
&self,
room_id: &RoomId,
display_name: &DisplayName,
) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
self.0.get_users_with_display_name(room_id, display_name).await.map_err(Into::into)
}
async fn get_users_with_display_names<'a>(
&self,
room_id: &RoomId,
display_names: &'a [DisplayName],
) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
self.0.get_users_with_display_names(room_id, display_names).await.map_err(Into::into)
}
async fn get_account_data_event(
&self,
event_type: GlobalAccountDataEventType,
) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
self.0.get_account_data_event(event_type).await.map_err(Into::into)
}
async fn get_room_account_data_event(
&self,
room_id: &RoomId,
event_type: RoomAccountDataEventType,
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
self.0.get_room_account_data_event(room_id, event_type).await.map_err(Into::into)
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
user_id: &UserId,
) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
self.0
.get_user_room_receipt_event(room_id, receipt_type, thread, user_id)
.await
.map_err(Into::into)
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
event_id: &EventId,
) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
self.0
.get_event_room_receipt_events(room_id, receipt_type, thread, event_id)
.await
.map_err(Into::into)
}
async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.0.get_custom_value(key).await.map_err(Into::into)
}
async fn set_custom_value(
&self,
key: &[u8],
value: Vec<u8>,
) -> Result<Option<Vec<u8>>, Self::Error> {
self.0.set_custom_value(key, value).await.map_err(Into::into)
}
async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.0.remove_custom_value(key).await.map_err(Into::into)
}
async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
self.0.remove_room(room_id).await.map_err(Into::into)
}
async fn save_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
self.0
.save_send_queue_request(room_id, transaction_id, created_at, content, priority)
.await
.map_err(Into::into)
}
async fn update_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
content: QueuedRequestKind,
) -> Result<bool, Self::Error> {
self.0.update_send_queue_request(room_id, transaction_id, content).await.map_err(Into::into)
}
async fn remove_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
) -> Result<bool, Self::Error> {
self.0.remove_send_queue_request(room_id, transaction_id).await.map_err(Into::into)
}
async fn load_send_queue_requests(
&self,
room_id: &RoomId,
) -> Result<Vec<QueuedRequest>, Self::Error> {
self.0.load_send_queue_requests(room_id).await.map_err(Into::into)
}
async fn update_send_queue_request_status(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error> {
self.0
.update_send_queue_request_status(room_id, transaction_id, error)
.await
.map_err(Into::into)
}
async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
self.0.load_rooms_with_unsent_requests().await.map_err(Into::into)
}
async fn save_dependent_queued_request(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
self.0
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
.await
.map_err(Into::into)
}
async fn mark_dependent_queued_requests_as_ready(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
sent_parent_key: SentRequestKey,
) -> Result<usize, Self::Error> {
self.0
.mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
.await
.map_err(Into::into)
}
async fn remove_dependent_queued_request(
&self,
room_id: &RoomId,
own_txn_id: &ChildTransactionId,
) -> Result<bool, Self::Error> {
self.0.remove_dependent_queued_request(room_id, own_txn_id).await.map_err(Into::into)
}
async fn load_dependent_queued_requests(
&self,
room_id: &RoomId,
) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
self.0.load_dependent_queued_requests(room_id).await.map_err(Into::into)
}
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error> {
self.0
.update_dependent_queued_request(room_id, own_transaction_id, new_content)
.await
.map_err(Into::into)
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error> {
self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
}
async fn load_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<Option<StoredThreadSubscription>, Self::Error> {
self.0.load_thread_subscription(room, thread_id).await.map_err(Into::into)
}
async fn remove_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<(), Self::Error> {
self.0.remove_thread_subscription(room, thread_id).await.map_err(Into::into)
}
async fn optimize(&self) -> Result<(), Self::Error> {
self.0.optimize().await.map_err(Into::into)
}
async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
self.0.get_size().await.map_err(Into::into)
}
}
#[derive(Debug, Clone)]
pub struct SaveLockedStateStore<T = Arc<DynStateStore>> {
store: T,
lock: Arc<Mutex<()>>,
}
#[derive(Debug, Error)]
#[error("a mutex guard was provided, but it does not reference the correct mutex")]
pub struct IncorrectMutexGuardError;
impl From<IncorrectMutexGuardError> for StoreError {
fn from(value: IncorrectMutexGuardError) -> Self {
Self::backend(value)
}
}
impl<T> SaveLockedStateStore<T> {
pub fn new(store: T) -> Self {
Self { store, lock: Arc::new(Mutex::new(())) }
}
pub fn lock(&self) -> &Mutex<()> {
self.lock.as_ref()
}
}
impl<T: StateStore> SaveLockedStateStore<T> {
pub async fn save_changes_with_guard(
&self,
guard: &MutexGuard<'_, ()>,
changes: &StateChanges,
) -> Result<(), StoreError> {
if !std::ptr::eq(MutexGuard::mutex(guard), self.lock()) {
Err(IncorrectMutexGuardError.into())
} else {
self.store.save_changes(changes).await.map_err(Into::into)
}
}
}
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl<T: StateStore> StateStore for SaveLockedStateStore<T> {
type Error = T::Error;
async fn get_kv_data(
&self,
key: StateStoreDataKey<'_>,
) -> Result<Option<StateStoreDataValue>, Self::Error> {
self.store.get_kv_data(key).await
}
async fn set_kv_data(
&self,
key: StateStoreDataKey<'_>,
value: StateStoreDataValue,
) -> Result<(), Self::Error> {
self.store.set_kv_data(key, value).await
}
async fn remove_kv_data(&self, key: StateStoreDataKey<'_>) -> Result<(), Self::Error> {
self.store.remove_kv_data(key).await
}
async fn save_changes(&self, changes: &StateChanges) -> Result<(), Self::Error> {
let _guard = self.lock.lock().await;
self.store.save_changes(changes).await
}
async fn get_presence_event(
&self,
user_id: &UserId,
) -> Result<Option<Raw<PresenceEvent>>, Self::Error> {
self.store.get_presence_event(user_id).await
}
async fn get_presence_events(
&self,
user_ids: &[OwnedUserId],
) -> Result<Vec<Raw<PresenceEvent>>, Self::Error> {
self.store.get_presence_events(user_ids).await
}
async fn get_state_event(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_key: &str,
) -> Result<Option<RawAnySyncOrStrippedState>, Self::Error> {
self.store.get_state_event(room_id, event_type, state_key).await
}
async fn get_state_events(
&self,
room_id: &RoomId,
event_type: StateEventType,
) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
self.store.get_state_events(room_id, event_type).await
}
async fn get_state_events_for_keys(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_keys: &[&str],
) -> Result<Vec<RawAnySyncOrStrippedState>, Self::Error> {
self.store.get_state_events_for_keys(room_id, event_type, state_keys).await
}
async fn get_profile(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<MinimalRoomMemberEvent>, Self::Error> {
self.store.get_profile(room_id, user_id).await
}
async fn get_profiles<'a>(
&self,
room_id: &RoomId,
user_ids: &'a [OwnedUserId],
) -> Result<BTreeMap<&'a UserId, MinimalRoomMemberEvent>, Self::Error> {
self.store.get_profiles(room_id, user_ids).await
}
async fn get_user_ids(
&self,
room_id: &RoomId,
memberships: RoomMemberships,
) -> Result<Vec<OwnedUserId>, Self::Error> {
self.store.get_user_ids(room_id, memberships).await
}
async fn get_room_infos(
&self,
room_load_settings: &RoomLoadSettings,
) -> Result<Vec<RoomInfo>, Self::Error> {
self.store.get_room_infos(room_load_settings).await
}
async fn get_users_with_display_name(
&self,
room_id: &RoomId,
display_name: &DisplayName,
) -> Result<BTreeSet<OwnedUserId>, Self::Error> {
self.store.get_users_with_display_name(room_id, display_name).await
}
async fn get_users_with_display_names<'a>(
&self,
room_id: &RoomId,
display_names: &'a [DisplayName],
) -> Result<HashMap<&'a DisplayName, BTreeSet<OwnedUserId>>, Self::Error> {
self.store.get_users_with_display_names(room_id, display_names).await
}
async fn get_account_data_event(
&self,
event_type: GlobalAccountDataEventType,
) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>, Self::Error> {
self.store.get_account_data_event(event_type).await
}
async fn get_room_account_data_event(
&self,
room_id: &RoomId,
event_type: RoomAccountDataEventType,
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>, Self::Error> {
self.store.get_room_account_data_event(room_id, event_type).await
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
user_id: &UserId,
) -> Result<Option<(OwnedEventId, Receipt)>, Self::Error> {
self.store.get_user_room_receipt_event(room_id, receipt_type, thread, user_id).await
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
thread: ReceiptThread,
event_id: &EventId,
) -> Result<Vec<(OwnedUserId, Receipt)>, Self::Error> {
self.store.get_event_room_receipt_events(room_id, receipt_type, thread, event_id).await
}
async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.store.get_custom_value(key).await
}
async fn set_custom_value(
&self,
key: &[u8],
value: Vec<u8>,
) -> Result<Option<Vec<u8>>, Self::Error> {
self.store.set_custom_value(key, value).await
}
async fn remove_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
self.store.remove_custom_value(key).await
}
async fn remove_room(&self, room_id: &RoomId) -> Result<(), Self::Error> {
self.store.remove_room(room_id).await
}
async fn save_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
request: QueuedRequestKind,
priority: usize,
) -> Result<(), Self::Error> {
self.store
.save_send_queue_request(room_id, transaction_id, created_at, request, priority)
.await
}
async fn update_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
content: QueuedRequestKind,
) -> Result<bool, Self::Error> {
self.store.update_send_queue_request(room_id, transaction_id, content).await
}
async fn remove_send_queue_request(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
) -> Result<bool, Self::Error> {
self.store.remove_send_queue_request(room_id, transaction_id).await
}
async fn load_send_queue_requests(
&self,
room_id: &RoomId,
) -> Result<Vec<QueuedRequest>, Self::Error> {
self.store.load_send_queue_requests(room_id).await
}
async fn update_send_queue_request_status(
&self,
room_id: &RoomId,
transaction_id: &TransactionId,
error: Option<QueueWedgeError>,
) -> Result<(), Self::Error> {
self.store.update_send_queue_request_status(room_id, transaction_id, error).await
}
async fn load_rooms_with_unsent_requests(&self) -> Result<Vec<OwnedRoomId>, Self::Error> {
self.store.load_rooms_with_unsent_requests().await
}
async fn save_dependent_queued_request(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
own_txn_id: ChildTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
content: DependentQueuedRequestKind,
) -> Result<(), Self::Error> {
self.store
.save_dependent_queued_request(room_id, parent_txn_id, own_txn_id, created_at, content)
.await
}
async fn mark_dependent_queued_requests_as_ready(
&self,
room_id: &RoomId,
parent_txn_id: &TransactionId,
sent_parent_key: SentRequestKey,
) -> Result<usize, Self::Error> {
self.store
.mark_dependent_queued_requests_as_ready(room_id, parent_txn_id, sent_parent_key)
.await
}
async fn update_dependent_queued_request(
&self,
room_id: &RoomId,
own_transaction_id: &ChildTransactionId,
new_content: DependentQueuedRequestKind,
) -> Result<bool, Self::Error> {
self.store.update_dependent_queued_request(room_id, own_transaction_id, new_content).await
}
async fn remove_dependent_queued_request(
&self,
room: &RoomId,
own_txn_id: &ChildTransactionId,
) -> Result<bool, Self::Error> {
self.store.remove_dependent_queued_request(room, own_txn_id).await
}
async fn load_dependent_queued_requests(
&self,
room: &RoomId,
) -> Result<Vec<DependentQueuedRequest>, Self::Error> {
self.store.load_dependent_queued_requests(room).await
}
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error> {
self.store.upsert_thread_subscriptions(updates).await
}
async fn remove_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<(), Self::Error> {
self.store.remove_thread_subscription(room, thread_id).await
}
async fn load_thread_subscription(
&self,
room: &RoomId,
thread_id: &EventId,
) -> Result<Option<StoredThreadSubscription>, Self::Error> {
self.store.load_thread_subscription(room, thread_id).await
}
async fn optimize(&self) -> Result<(), Self::Error> {
self.store.optimize().await
}
async fn get_size(&self) -> Result<Option<usize>, Self::Error> {
self.store.get_size().await
}
}
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait StateStoreExt: StateStore {
async fn get_state_event_static<C>(
&self,
room_id: &RoomId,
) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
where
C: StaticEventContent<IsPrefix = ruma::events::False>
+ StaticStateEventContent<StateKey = EmptyStateKey>
+ RedactContent,
C::Redacted: RedactedStateEventContent,
{
Ok(self.get_state_event(room_id, C::TYPE.into(), "").await?.map(|raw| raw.cast()))
}
async fn get_state_event_static_for_key<C, K>(
&self,
room_id: &RoomId,
state_key: &K,
) -> Result<Option<RawSyncOrStrippedState<C>>, Self::Error>
where
C: StaticEventContent<IsPrefix = ruma::events::False>
+ StaticStateEventContent
+ RedactContent,
C::StateKey: Borrow<K>,
C::Redacted: RedactedStateEventContent,
K: AsRef<str> + ?Sized + Sync,
{
Ok(self
.get_state_event(room_id, C::TYPE.into(), state_key.as_ref())
.await?
.map(|raw| raw.cast()))
}
async fn get_state_events_static<C>(
&self,
room_id: &RoomId,
) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
where
C: StaticEventContent<IsPrefix = ruma::events::False>
+ StaticStateEventContent
+ RedactContent,
C::Redacted: RedactedStateEventContent,
{
Ok(self
.get_state_events(room_id, C::TYPE.into())
.await?
.into_iter()
.map(|raw| raw.cast())
.collect())
}
async fn get_state_events_for_keys_static<'a, C, K, I>(
&self,
room_id: &RoomId,
state_keys: I,
) -> Result<Vec<RawSyncOrStrippedState<C>>, Self::Error>
where
C: StaticEventContent<IsPrefix = ruma::events::False>
+ StaticStateEventContent
+ RedactContent,
C::StateKey: Borrow<K>,
C::Redacted: RedactedStateEventContent,
K: AsRef<str> + Sized + Sync + 'a,
I: IntoIterator<Item = &'a K> + Send,
I::IntoIter: Send,
{
Ok(self
.get_state_events_for_keys(
room_id,
C::TYPE.into(),
&state_keys.into_iter().map(|k| k.as_ref()).collect::<Vec<_>>(),
)
.await?
.into_iter()
.map(|raw| raw.cast())
.collect())
}
async fn get_account_data_event_static<C>(
&self,
) -> Result<Option<Raw<GlobalAccountDataEvent<C>>>, Self::Error>
where
C: StaticEventContent<IsPrefix = ruma::events::False> + GlobalAccountDataEventContent,
{
Ok(self.get_account_data_event(C::TYPE.into()).await?.map(Raw::cast_unchecked))
}
async fn get_room_account_data_event_static<C>(
&self,
room_id: &RoomId,
) -> Result<Option<Raw<RoomAccountDataEvent<C>>>, Self::Error>
where
C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
{
Ok(self
.get_room_account_data_event(room_id, C::TYPE.into())
.await?
.map(Raw::cast_unchecked))
}
async fn get_member_event(
&self,
room_id: &RoomId,
state_key: &UserId,
) -> Result<Option<RawMemberEvent>, Self::Error> {
self.get_state_event_static_for_key(room_id, state_key).await
}
}
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
impl<T: StateStore + ?Sized> StateStoreExt for T {}
pub type DynStateStore = dyn StateStore<Error = StoreError>;
pub trait IntoStateStore {
#[doc(hidden)]
fn into_state_store(self) -> Arc<DynStateStore>;
}
impl<T> IntoStateStore for T
where
T: StateStore + Sized + 'static,
{
fn into_state_store(self) -> Arc<DynStateStore> {
Arc::new(EraseStateStoreError(self))
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SupportedVersionsResponse {
pub versions: Vec<String>,
pub unstable_features: BTreeMap<String, bool>,
}
impl SupportedVersionsResponse {
pub fn supported_versions(&self) -> SupportedVersions {
let mut supported_versions =
SupportedVersions::from_parts(&self.versions, &self.unstable_features);
if supported_versions.versions.is_empty() {
supported_versions.versions.insert(MatrixVersion::V1_0);
}
supported_versions
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WellKnownResponse {
pub homeserver: HomeserverInfo,
pub identity_server: Option<IdentityServerInfo>,
pub tile_server: Option<TileServerInfo>,
pub rtc_foci: Vec<RtcFocusInfo>,
}
impl From<discover_homeserver::Response> for WellKnownResponse {
fn from(response: discover_homeserver::Response) -> Self {
Self {
homeserver: response.homeserver,
identity_server: response.identity_server,
tile_server: response.tile_server,
rtc_foci: response.rtc_foci,
}
}
}
#[derive(Debug, Clone)]
pub enum StateStoreDataValue {
SyncToken(String),
SupportedVersions(TtlValue<SupportedVersionsResponse>),
WellKnown(TtlValue<Option<WellKnownResponse>>),
Filter(String),
UserAvatarUrl(OwnedMxcUri),
RecentlyVisitedRooms(Vec<OwnedRoomId>),
UtdHookManagerData(GrowableBloom),
OneTimeKeyAlreadyUploaded,
ComposerDraft(ComposerDraft),
SeenKnockRequests(BTreeMap<OwnedEventId, OwnedUserId>),
ThreadSubscriptionsCatchupTokens(Vec<ThreadSubscriptionCatchupToken>),
HomeserverCapabilities(TtlValue<Capabilities>),
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ThreadSubscriptionCatchupToken {
pub from: String,
pub to: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ComposerDraft {
pub plain_text: String,
pub html_text: Option<String>,
pub draft_type: ComposerDraftType,
#[serde(default)]
pub attachments: Vec<DraftAttachment>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftAttachment {
pub filename: String,
pub content: DraftAttachmentContent,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum DraftAttachmentContent {
Image {
data: Vec<u8>,
mimetype: Option<String>,
size: Option<u64>,
width: Option<u64>,
height: Option<u64>,
blurhash: Option<String>,
thumbnail: Option<DraftThumbnail>,
},
Video {
data: Vec<u8>,
mimetype: Option<String>,
size: Option<u64>,
width: Option<u64>,
height: Option<u64>,
duration: Option<std::time::Duration>,
blurhash: Option<String>,
thumbnail: Option<DraftThumbnail>,
},
Audio {
data: Vec<u8>,
mimetype: Option<String>,
size: Option<u64>,
duration: Option<std::time::Duration>,
},
File {
data: Vec<u8>,
mimetype: Option<String>,
size: Option<u64>,
},
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DraftThumbnail {
pub filename: String,
pub data: Vec<u8>,
pub mimetype: Option<String>,
pub width: Option<u64>,
pub height: Option<u64>,
pub size: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ComposerDraftType {
NewMessage,
Reply {
event_id: OwnedEventId,
},
Edit {
event_id: OwnedEventId,
},
}
impl StateStoreDataValue {
pub fn into_sync_token(self) -> Option<String> {
as_variant!(self, Self::SyncToken)
}
pub fn into_filter(self) -> Option<String> {
as_variant!(self, Self::Filter)
}
pub fn into_user_avatar_url(self) -> Option<OwnedMxcUri> {
as_variant!(self, Self::UserAvatarUrl)
}
pub fn into_recently_visited_rooms(self) -> Option<Vec<OwnedRoomId>> {
as_variant!(self, Self::RecentlyVisitedRooms)
}
pub fn into_utd_hook_manager_data(self) -> Option<GrowableBloom> {
as_variant!(self, Self::UtdHookManagerData)
}
pub fn into_composer_draft(self) -> Option<ComposerDraft> {
as_variant!(self, Self::ComposerDraft)
}
pub fn into_supported_versions(self) -> Option<TtlValue<SupportedVersionsResponse>> {
as_variant!(self, Self::SupportedVersions)
}
pub fn into_well_known(self) -> Option<TtlValue<Option<WellKnownResponse>>> {
as_variant!(self, Self::WellKnown)
}
pub fn into_seen_knock_requests(self) -> Option<BTreeMap<OwnedEventId, OwnedUserId>> {
as_variant!(self, Self::SeenKnockRequests)
}
pub fn into_thread_subscriptions_catchup_tokens(
self,
) -> Option<Vec<ThreadSubscriptionCatchupToken>> {
as_variant!(self, Self::ThreadSubscriptionsCatchupTokens)
}
pub fn into_homeserver_capabilities(self) -> Option<TtlValue<Capabilities>> {
as_variant!(self, Self::HomeserverCapabilities)
}
}
#[derive(Debug, Clone, Copy)]
pub enum StateStoreDataKey<'a> {
SyncToken,
SupportedVersions,
WellKnown,
Filter(&'a str),
UserAvatarUrl(&'a UserId),
RecentlyVisitedRooms(&'a UserId),
UtdHookManagerData,
OneTimeKeyAlreadyUploaded,
ComposerDraft(&'a RoomId, Option<&'a EventId>),
SeenKnockRequests(&'a RoomId),
ThreadSubscriptionsCatchupTokens,
HomeserverCapabilities,
}
impl StateStoreDataKey<'_> {
pub const SYNC_TOKEN: &'static str = "sync_token";
pub const SUPPORTED_VERSIONS: &'static str = "server_capabilities";
pub const WELL_KNOWN: &'static str = "well_known";
pub const FILTER: &'static str = "filter";
pub const USER_AVATAR_URL: &'static str = "user_avatar_url";
pub const RECENTLY_VISITED_ROOMS: &'static str = "recently_visited_rooms";
pub const UTD_HOOK_MANAGER_DATA: &'static str = "utd_hook_manager_data";
pub const ONE_TIME_KEY_ALREADY_UPLOADED: &'static str = "one_time_key_already_uploaded";
pub const COMPOSER_DRAFT: &'static str = "composer_draft";
pub const SEEN_KNOCK_REQUESTS: &'static str = "seen_knock_requests";
pub const THREAD_SUBSCRIPTIONS_CATCHUP_TOKENS: &'static str =
"thread_subscriptions_catchup_tokens";
pub const HOMESERVER_CAPABILITIES: &'static str = "homeserver_capabilities";
}
pub fn compare_thread_subscription_bump_stamps(
previous: Option<u64>,
new: &mut Option<u64>,
) -> bool {
match (previous, &new) {
(Some(prev_bump), None) => {
*new = Some(prev_bump);
}
(Some(prev_bump), Some(new_bump)) if *new_bump <= prev_bump => {
return false;
}
_ => {}
}
true
}
#[cfg(test)]
mod tests {
mod save_locked_state_store {
use std::time::Duration;
use assert_matches::assert_matches;
use futures_util::future::{self, Either};
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
use gloo_timers::future::sleep;
use matrix_sdk_common::executor::spawn;
use matrix_sdk_test::async_test;
use tokio::sync::Mutex;
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
use tokio::time::sleep;
use crate::{
StateChanges, StateStore,
store::{IntoStateStore, MemoryStore, Result, SaveLockedStateStore},
};
async fn get_store() -> Result<impl StateStore> {
Ok(SaveLockedStateStore::new(MemoryStore::new()))
}
statestore_integration_tests!();
#[async_test]
async fn test_state_store_only_accepts_guard_for_underlying_mutex() {
let state_store = SaveLockedStateStore::new(MemoryStore::new());
let state_changes = StateChanges::default();
state_store
.save_changes_with_guard(&state_store.lock().lock().await, &state_changes)
.await
.expect("state store accepts guard for underlying mutex");
let mutex = Mutex::new(());
state_store
.save_changes_with_guard(&mutex.lock().await, &state_changes)
.await
.expect_err("state store does not accept guard for unknown mutex");
}
#[derive(Debug)]
struct Elapsed;
async fn timeout<F: Future + Unpin>(
duration: Duration,
f: F,
) -> Result<F::Output, Elapsed> {
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
{
match future::select(sleep(duration), f).await {
Either::Left(_) => return Err(Elapsed),
Either::Right((output, _)) => Ok(output),
}
}
#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
{
tokio::time::timeout(duration, f).await.map_err(|_| Elapsed)
}
}
#[async_test]
async fn test_state_store_waits_to_acquire_lock_before_saving_changes() {
let state_store = SaveLockedStateStore::new(MemoryStore::new().into_state_store());
let lock_task = spawn({
let state_store = state_store.clone();
async move {
let lock = state_store.lock();
let _guard = lock.lock().await;
sleep(Duration::from_secs(5)).await;
}
});
let save_task =
spawn(async move { state_store.save_changes(&StateChanges::default()).await });
assert_matches!(future::select(lock_task, save_task).await, Either::Left((_, save_task)) => {
timeout(Duration::from_millis(100), save_task)
.await
.expect("task completes before timeout")
.expect("task completes successfully")
.expect("task saves changes");
});
}
}
}