#[cfg(feature = "experimental-timeline")]
use std::collections::{BTreeMap, HashMap};
use std::{
collections::BTreeSet,
sync::{Arc, RwLock},
};
#[cfg(feature = "experimental-timeline")]
use async_stream::stream;
use async_trait::async_trait;
use dashmap::{DashMap, DashSet};
use lru::LruCache;
#[allow(unused_imports)]
use matrix_sdk_common::{instant::Instant, locks::Mutex};
use ruma::{
canonical_json::redact,
events::{
presence::PresenceEvent,
receipt::{Receipt, ReceiptType},
room::member::{MembershipState, StrippedRoomMemberEvent, SyncRoomMemberEvent},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncStateEvent, GlobalAccountDataEventType, RoomAccountDataEventType, StateEventType,
},
serde::Raw,
CanonicalJsonObject, EventId, MxcUri, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId,
RoomVersionId, UserId,
};
#[cfg(feature = "experimental-timeline")]
use ruma::{
canonical_json::redact_in_place,
events::{
room::redaction::SyncRoomRedactionEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
},
};
use tracing::{info, warn};
#[cfg(feature = "experimental-timeline")]
use super::BoxStream;
use super::{Result, RoomInfo, StateChanges, StateStore, StoreError};
#[cfg(feature = "experimental-timeline")]
use crate::deserialized_responses::SyncTimelineEvent;
use crate::{
deserialized_responses::MemberEvent,
media::{MediaRequest, UniqueKey},
MinimalRoomMemberEvent,
};
#[allow(clippy::type_complexity)]
#[derive(Debug, Clone)]
pub struct MemoryStore {
sync_token: Arc<RwLock<Option<String>>>,
filters: Arc<DashMap<String, String>>,
account_data: Arc<DashMap<GlobalAccountDataEventType, Raw<AnyGlobalAccountDataEvent>>>,
members: Arc<DashMap<OwnedRoomId, DashMap<OwnedUserId, SyncRoomMemberEvent>>>,
profiles: Arc<DashMap<OwnedRoomId, DashMap<OwnedUserId, MinimalRoomMemberEvent>>>,
display_names: Arc<DashMap<OwnedRoomId, DashMap<String, BTreeSet<OwnedUserId>>>>,
joined_user_ids: Arc<DashMap<OwnedRoomId, DashSet<OwnedUserId>>>,
invited_user_ids: Arc<DashMap<OwnedRoomId, DashSet<OwnedUserId>>>,
room_info: Arc<DashMap<OwnedRoomId, RoomInfo>>,
room_state:
Arc<DashMap<OwnedRoomId, DashMap<StateEventType, DashMap<String, Raw<AnySyncStateEvent>>>>>,
room_account_data:
Arc<DashMap<OwnedRoomId, DashMap<RoomAccountDataEventType, Raw<AnyRoomAccountDataEvent>>>>,
stripped_room_infos: Arc<DashMap<OwnedRoomId, RoomInfo>>,
stripped_room_state: Arc<
DashMap<OwnedRoomId, DashMap<StateEventType, DashMap<String, Raw<AnyStrippedStateEvent>>>>,
>,
stripped_members: Arc<DashMap<OwnedRoomId, DashMap<OwnedUserId, StrippedRoomMemberEvent>>>,
stripped_joined_user_ids: Arc<DashMap<OwnedRoomId, DashSet<OwnedUserId>>>,
stripped_invited_user_ids: Arc<DashMap<OwnedRoomId, DashSet<OwnedUserId>>>,
presence: Arc<DashMap<OwnedUserId, Raw<PresenceEvent>>>,
room_user_receipts:
Arc<DashMap<OwnedRoomId, DashMap<String, DashMap<OwnedUserId, (OwnedEventId, Receipt)>>>>,
room_event_receipts: Arc<
DashMap<OwnedRoomId, DashMap<String, DashMap<OwnedEventId, DashMap<OwnedUserId, Receipt>>>>,
>,
media: Arc<Mutex<LruCache<String, Vec<u8>>>>,
custom: Arc<DashMap<Vec<u8>, Vec<u8>>>,
#[cfg(feature = "experimental-timeline")]
room_timeline: Arc<DashMap<OwnedRoomId, TimelineData>>,
}
impl Default for MemoryStore {
fn default() -> Self {
Self::new()
}
}
impl MemoryStore {
#[allow(dead_code)]
pub fn new() -> Self {
Self {
sync_token: Default::default(),
filters: Default::default(),
account_data: Default::default(),
members: Default::default(),
profiles: Default::default(),
display_names: Default::default(),
joined_user_ids: Default::default(),
invited_user_ids: Default::default(),
room_info: Default::default(),
room_state: Default::default(),
room_account_data: Default::default(),
stripped_room_infos: Default::default(),
stripped_room_state: Default::default(),
stripped_members: Default::default(),
stripped_joined_user_ids: Default::default(),
stripped_invited_user_ids: Default::default(),
presence: Default::default(),
room_user_receipts: Default::default(),
room_event_receipts: Default::default(),
media: Arc::new(Mutex::new(LruCache::new(
100.try_into().expect("100 is a non-zero usize"),
))),
custom: DashMap::new().into(),
#[cfg(feature = "experimental-timeline")]
room_timeline: Default::default(),
}
}
async fn save_filter(&self, filter_name: &str, filter_id: &str) -> Result<()> {
self.filters.insert(filter_name.to_owned(), filter_id.to_owned());
Ok(())
}
async fn get_filter(&self, filter_name: &str) -> Result<Option<String>> {
Ok(self.filters.get(filter_name).map(|f| f.to_string()))
}
async fn get_sync_token(&self) -> Result<Option<String>> {
Ok(self.sync_token.read().unwrap().clone())
}
async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
let now = Instant::now();
if let Some(s) = &changes.sync_token {
*self.sync_token.write().unwrap() = Some(s.to_owned());
}
for (room, events) in &changes.members {
for event in events.values() {
self.stripped_joined_user_ids.remove(room);
self.stripped_invited_user_ids.remove(room);
match event.membership() {
MembershipState::Join => {
self.joined_user_ids
.entry(room.clone())
.or_default()
.insert(event.state_key().to_owned());
self.invited_user_ids
.entry(room.clone())
.or_default()
.remove(event.state_key());
}
MembershipState::Invite => {
self.invited_user_ids
.entry(room.clone())
.or_default()
.insert(event.state_key().to_owned());
self.joined_user_ids
.entry(room.clone())
.or_default()
.remove(event.state_key());
}
_ => {
self.joined_user_ids
.entry(room.clone())
.or_default()
.remove(event.state_key());
self.invited_user_ids
.entry(room.clone())
.or_default()
.remove(event.state_key());
}
}
self.members
.entry(room.clone())
.or_default()
.insert(event.state_key().to_owned(), event.clone());
self.stripped_members.remove(room);
}
}
for (room, users) in &changes.profiles {
for (user_id, profile) in users {
self.profiles
.entry(room.clone())
.or_default()
.insert(user_id.clone(), profile.clone());
}
}
for (room, map) in &changes.ambiguity_maps {
for (display_name, display_names) in map {
self.display_names
.entry(room.clone())
.or_default()
.insert(display_name.clone(), display_names.clone());
}
}
for (event_type, event) in &changes.account_data {
self.account_data.insert(event_type.clone(), event.clone());
}
for (room, events) in &changes.room_account_data {
for (event_type, event) in events {
self.room_account_data
.entry(room.clone())
.or_default()
.insert(event_type.clone(), event.clone());
}
}
for (room, event_types) in &changes.state {
for (event_type, events) in event_types {
for (state_key, event) in events {
self.room_state
.entry(room.clone())
.or_default()
.entry(event_type.clone())
.or_default()
.insert(state_key.to_owned(), event.clone());
self.stripped_room_state.remove(room);
}
}
}
for (room_id, room_info) in &changes.room_infos {
self.room_info.insert(room_id.clone(), room_info.clone());
self.stripped_room_infos.remove(room_id);
}
for (sender, event) in &changes.presence {
self.presence.insert(sender.clone(), event.clone());
}
for (room_id, info) in &changes.stripped_room_infos {
self.stripped_room_infos.insert(room_id.clone(), info.clone());
self.room_info.remove(room_id);
}
for (room, events) in &changes.stripped_members {
for event in events.values() {
match event.content.membership {
MembershipState::Join => {
self.stripped_joined_user_ids
.entry(room.clone())
.or_default()
.insert(event.state_key.clone());
self.stripped_invited_user_ids
.entry(room.clone())
.or_default()
.remove(&event.state_key);
}
MembershipState::Invite => {
self.stripped_invited_user_ids
.entry(room.clone())
.or_default()
.insert(event.state_key.clone());
self.stripped_joined_user_ids
.entry(room.clone())
.or_default()
.remove(&event.state_key);
}
_ => {
self.stripped_joined_user_ids
.entry(room.clone())
.or_default()
.remove(&event.state_key);
self.stripped_invited_user_ids
.entry(room.clone())
.or_default()
.remove(&event.state_key);
}
}
self.stripped_members
.entry(room.clone())
.or_default()
.insert(event.state_key.clone(), event.clone());
}
}
for (room, event_types) in &changes.stripped_state {
for (event_type, events) in event_types {
for (state_key, event) in events {
self.stripped_room_state
.entry(room.clone())
.or_default()
.entry(event_type.clone())
.or_default()
.insert(state_key.to_owned(), event.clone());
}
}
}
for (room, content) in &changes.receipts {
for (event_id, receipts) in &content.0 {
for (receipt_type, receipts) in receipts {
for (user_id, receipt) in receipts {
if let Some((old_event, _)) = self
.room_user_receipts
.entry(room.clone())
.or_default()
.entry(receipt_type.to_string())
.or_default()
.insert(user_id.clone(), (event_id.clone(), receipt.clone()))
{
if let Some(receipt_map) = self.room_event_receipts.get(room) {
if let Some(event_map) = receipt_map.get(receipt_type.as_ref()) {
if let Some(user_map) = event_map.get_mut(&old_event) {
user_map.remove(user_id);
}
}
}
}
self.room_event_receipts
.entry(room.clone())
.or_default()
.entry(receipt_type.to_string())
.or_default()
.entry(event_id.clone())
.or_default()
.insert(user_id.clone(), receipt.clone());
}
}
}
}
let make_room_version = |room_id| {
self.room_info
.get(room_id)
.and_then(|info| info.room_version().cloned())
.unwrap_or_else(|| {
warn!(%room_id, "Unable to find the room version, assuming version 9");
RoomVersionId::V9
})
};
#[cfg(feature = "experimental-timeline")]
for (room_id, timeline) in &changes.timeline {
if timeline.sync {
info!(%room_id, "Saving new timeline batch from sync response");
} else {
info!(%room_id, "Saving new timeline batch from messages response");
}
let mut delete_timeline = false;
if timeline.limited {
info!(%room_id, "Deleting stored timeline because the sync response was limited");
delete_timeline = true;
} else if let Some(mut data) = self.room_timeline.get_mut(room_id) {
if !timeline.sync && Some(&timeline.start) != data.end.as_ref() {
warn!(%room_id, "Dropping unexpected timeline batch");
return Ok(());
}
for event in &timeline.events {
if let Some(event_id) = event.event_id() {
if data.event_id_to_position.contains_key(&event_id) {
delete_timeline = true;
break;
}
}
}
if !delete_timeline {
if timeline.sync {
data.start = timeline.start.clone();
} else {
data.end = timeline.end.clone();
}
}
}
if delete_timeline {
info!(%room_id, "Deleting stored timeline because of duplicated events");
self.room_timeline.remove(room_id);
}
let mut data =
self.room_timeline.entry(room_id.to_owned()).or_insert_with(|| TimelineData {
start: timeline.start.clone(),
end: timeline.end.clone(),
..Default::default()
});
if timeline.sync {
let mut room_version = None;
for event in &timeline.events {
if let Ok(AnySyncTimelineEvent::MessageLike(
AnySyncMessageLikeEvent::RoomRedaction(SyncRoomRedactionEvent::Original(
redaction,
)),
)) = event.event.deserialize()
{
let pos = data.event_id_to_position.get(&redaction.redacts).copied();
if let Some(position) = pos {
if let Some(mut full_event) = data.events.get_mut(&position.clone()) {
let mut event_json: CanonicalJsonObject =
full_event.event.deserialize_as()?;
let v =
room_version.get_or_insert_with(|| make_room_version(room_id));
redact_in_place(&mut event_json, v)
.map_err(StoreError::Redaction)?;
full_event.event = Raw::new(&event_json)?.cast();
}
}
}
data.start_position -= 1;
let start_position = data.start_position;
if let Some(event_id) = event.event_id() {
data.event_id_to_position.insert(event_id, start_position);
}
data.events.insert(start_position, event.clone());
}
} else {
for event in &timeline.events {
data.end_position += 1;
let end_position = data.end_position;
if let Some(event_id) = event.event_id() {
data.event_id_to_position.insert(event_id, end_position);
}
data.events.insert(end_position, event.clone());
}
}
}
for (room_id, redactions) in &changes.redactions {
let mut room_version = None;
if let Some(room) = self.room_state.get_mut(room_id) {
for mut ref_room_mu in room.iter_mut() {
for mut ref_evt_mu in ref_room_mu.value_mut().iter_mut() {
let raw_evt = ref_evt_mu.value_mut();
if let Ok(Some(event_id)) = raw_evt.get_field::<OwnedEventId>("event_id") {
if redactions.get(&event_id).is_some() {
let redacted = redact(
&raw_evt.deserialize_as::<CanonicalJsonObject>()?,
room_version.get_or_insert_with(|| make_room_version(room_id)),
)
.map_err(StoreError::Redaction)?;
*raw_evt = Raw::new(&redacted)?.cast();
}
}
}
}
}
}
info!("Saved changes in {:?}", now.elapsed());
Ok(())
}
async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
Ok(self.presence.get(user_id).map(|p| p.clone()))
}
async fn get_state_event(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_key: &str,
) -> Result<Option<Raw<AnySyncStateEvent>>> {
Ok(self
.room_state
.get(room_id)
.and_then(|e| e.get(&event_type).and_then(|s| s.get(state_key).map(|e| e.clone()))))
}
async fn get_state_events(
&self,
room_id: &RoomId,
event_type: StateEventType,
) -> Result<Vec<Raw<AnySyncStateEvent>>> {
Ok(self
.room_state
.get(room_id)
.and_then(|e| {
e.get(&event_type).map(|s| s.iter().map(|e| e.clone()).collect::<Vec<_>>())
})
.unwrap_or_default())
}
async fn get_profile(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<MinimalRoomMemberEvent>> {
Ok(self.profiles.get(room_id).and_then(|p| p.get(user_id).map(|p| p.clone())))
}
async fn get_member_event(
&self,
room_id: &RoomId,
state_key: &UserId,
) -> Result<Option<MemberEvent>> {
if let Some(e) =
self.stripped_members.get(room_id).and_then(|m| m.get(state_key).map(|m| m.clone()))
{
Ok(Some(MemberEvent::Stripped(e)))
} else if let Some(e) =
self.members.get(room_id).and_then(|m| m.get(state_key).map(|m| m.clone()))
{
Ok(Some(MemberEvent::Sync(e)))
} else {
Ok(None)
}
}
fn get_user_ids(&self, room_id: &RoomId) -> Vec<OwnedUserId> {
if let Some(u) = self.stripped_members.get(room_id) {
u.iter().map(|u| u.key().clone()).collect()
} else {
self.members
.get(room_id)
.map(|u| u.iter().map(|u| u.key().clone()).collect())
.unwrap_or_default()
}
}
fn get_invited_user_ids(&self, room_id: &RoomId) -> Vec<OwnedUserId> {
self.invited_user_ids
.get(room_id)
.map(|u| u.iter().map(|u| u.clone()).collect())
.unwrap_or_default()
}
fn get_joined_user_ids(&self, room_id: &RoomId) -> Vec<OwnedUserId> {
self.joined_user_ids
.get(room_id)
.map(|u| u.iter().map(|u| u.clone()).collect())
.unwrap_or_default()
}
fn get_stripped_invited_user_ids(&self, room_id: &RoomId) -> Vec<OwnedUserId> {
self.stripped_invited_user_ids
.get(room_id)
.map(|u| u.iter().map(|u| u.clone()).collect())
.unwrap_or_default()
}
fn get_stripped_joined_user_ids(&self, room_id: &RoomId) -> Vec<OwnedUserId> {
self.stripped_joined_user_ids
.get(room_id)
.map(|u| u.iter().map(|u| u.clone()).collect())
.unwrap_or_default()
}
fn get_room_infos(&self) -> Vec<RoomInfo> {
self.room_info.iter().map(|r| r.clone()).collect()
}
fn get_stripped_room_infos(&self) -> Vec<RoomInfo> {
self.stripped_room_infos.iter().map(|r| r.clone()).collect()
}
async fn get_account_data_event(
&self,
event_type: GlobalAccountDataEventType,
) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
Ok(self.account_data.get(&event_type).map(|e| e.clone()))
}
async fn get_room_account_data_event(
&self,
room_id: &RoomId,
event_type: RoomAccountDataEventType,
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
Ok(self.room_account_data.get(room_id).and_then(|m| m.get(&event_type).map(|e| e.clone())))
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
user_id: &UserId,
) -> Result<Option<(OwnedEventId, Receipt)>> {
Ok(self.room_user_receipts.get(room_id).and_then(|m| {
m.get(receipt_type.as_ref()).and_then(|m| m.get(user_id).map(|r| r.clone()))
}))
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
event_id: &EventId,
) -> Result<Vec<(OwnedUserId, Receipt)>> {
Ok(self
.room_event_receipts
.get(room_id)
.and_then(|m| {
m.get(receipt_type.as_ref()).and_then(|m| {
m.get(event_id)
.map(|m| m.iter().map(|r| (r.key().clone(), r.value().clone())).collect())
})
})
.unwrap_or_default())
}
async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.custom.get(key).map(|e| e.value().clone()))
}
async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
Ok(self.custom.insert(key.to_vec(), value))
}
async fn add_media_content(&self, request: &MediaRequest, data: Vec<u8>) -> Result<()> {
self.media.lock().await.put(request.unique_key(), data);
Ok(())
}
async fn get_media_content(&self, request: &MediaRequest) -> Result<Option<Vec<u8>>> {
Ok(self.media.lock().await.get(&request.unique_key()).cloned())
}
async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
self.media.lock().await.pop(&request.unique_key());
Ok(())
}
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
let mut media_store = self.media.lock().await;
let keys: Vec<String> = media_store
.iter()
.filter_map(
|(key, _)| if key.starts_with(&uri.to_string()) { Some(key.clone()) } else { None },
)
.collect();
for key in keys {
media_store.pop(&key);
}
Ok(())
}
async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
self.members.remove(room_id);
self.profiles.remove(room_id);
self.display_names.remove(room_id);
self.joined_user_ids.remove(room_id);
self.invited_user_ids.remove(room_id);
self.room_info.remove(room_id);
self.room_state.remove(room_id);
self.room_account_data.remove(room_id);
self.stripped_room_infos.remove(room_id);
self.stripped_room_state.remove(room_id);
self.stripped_members.remove(room_id);
self.room_user_receipts.remove(room_id);
self.room_event_receipts.remove(room_id);
#[cfg(feature = "experimental-timeline")]
self.room_timeline.remove(room_id);
Ok(())
}
#[cfg(feature = "experimental-timeline")]
async fn room_timeline(
&self,
room_id: &RoomId,
) -> Result<Option<(BoxStream<Result<SyncTimelineEvent>>, Option<String>)>> {
let (events, end_token) = if let Some(data) = self.room_timeline.get(room_id) {
(data.events.clone(), data.end.clone())
} else {
info!(%room_id, "Couldn't find a previously stored timeline");
return Ok(None);
};
let stream = stream! {
for (_, item) in events {
yield Ok(item);
}
};
info!(%room_id, ?end_token, "Found previously stored timeline");
Ok(Some((Box::pin(stream), end_token)))
}
}
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
impl StateStore for MemoryStore {
async fn save_filter(&self, filter_name: &str, filter_id: &str) -> Result<()> {
self.save_filter(filter_name, filter_id).await
}
async fn save_changes(&self, changes: &StateChanges) -> Result<()> {
self.save_changes(changes).await
}
async fn get_filter(&self, filter_id: &str) -> Result<Option<String>> {
self.get_filter(filter_id).await
}
async fn get_sync_token(&self) -> Result<Option<String>> {
self.get_sync_token().await
}
async fn get_presence_event(&self, user_id: &UserId) -> Result<Option<Raw<PresenceEvent>>> {
self.get_presence_event(user_id).await
}
async fn get_state_event(
&self,
room_id: &RoomId,
event_type: StateEventType,
state_key: &str,
) -> Result<Option<Raw<AnySyncStateEvent>>> {
self.get_state_event(room_id, event_type, state_key).await
}
async fn get_state_events(
&self,
room_id: &RoomId,
event_type: StateEventType,
) -> Result<Vec<Raw<AnySyncStateEvent>>> {
self.get_state_events(room_id, event_type).await
}
async fn get_profile(
&self,
room_id: &RoomId,
user_id: &UserId,
) -> Result<Option<MinimalRoomMemberEvent>> {
self.get_profile(room_id, user_id).await
}
async fn get_member_event(
&self,
room_id: &RoomId,
state_key: &UserId,
) -> Result<Option<MemberEvent>> {
self.get_member_event(room_id, state_key).await
}
async fn get_user_ids(&self, room_id: &RoomId) -> Result<Vec<OwnedUserId>> {
Ok(self.get_user_ids(room_id))
}
async fn get_invited_user_ids(&self, room_id: &RoomId) -> Result<Vec<OwnedUserId>> {
let v = self.get_stripped_invited_user_ids(room_id);
if !v.is_empty() {
return Ok(v);
}
Ok(self.get_invited_user_ids(room_id))
}
async fn get_joined_user_ids(&self, room_id: &RoomId) -> Result<Vec<OwnedUserId>> {
let v = self.get_stripped_joined_user_ids(room_id);
if !v.is_empty() {
return Ok(v);
}
Ok(self.get_joined_user_ids(room_id))
}
async fn get_room_infos(&self) -> Result<Vec<RoomInfo>> {
Ok(self.get_room_infos())
}
async fn get_stripped_room_infos(&self) -> Result<Vec<RoomInfo>> {
Ok(self.get_stripped_room_infos())
}
async fn get_users_with_display_name(
&self,
room_id: &RoomId,
display_name: &str,
) -> Result<BTreeSet<OwnedUserId>> {
Ok(self
.display_names
.get(room_id)
.and_then(|d| d.get(display_name).map(|d| d.clone()))
.unwrap_or_default())
}
async fn get_account_data_event(
&self,
event_type: GlobalAccountDataEventType,
) -> Result<Option<Raw<AnyGlobalAccountDataEvent>>> {
self.get_account_data_event(event_type).await
}
async fn get_room_account_data_event(
&self,
room_id: &RoomId,
event_type: RoomAccountDataEventType,
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
self.get_room_account_data_event(room_id, event_type).await
}
async fn get_user_room_receipt_event(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
user_id: &UserId,
) -> Result<Option<(OwnedEventId, Receipt)>> {
self.get_user_room_receipt_event(room_id, receipt_type, user_id).await
}
async fn get_event_room_receipt_events(
&self,
room_id: &RoomId,
receipt_type: ReceiptType,
event_id: &EventId,
) -> Result<Vec<(OwnedUserId, Receipt)>> {
self.get_event_room_receipt_events(room_id, receipt_type, event_id).await
}
async fn get_custom_value(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
self.get_custom_value(key).await
}
async fn set_custom_value(&self, key: &[u8], value: Vec<u8>) -> Result<Option<Vec<u8>>> {
self.set_custom_value(key, value).await
}
async fn add_media_content(&self, request: &MediaRequest, data: Vec<u8>) -> Result<()> {
self.add_media_content(request, data).await
}
async fn get_media_content(&self, request: &MediaRequest) -> Result<Option<Vec<u8>>> {
self.get_media_content(request).await
}
async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
self.remove_media_content(request).await
}
async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
self.remove_media_content_for_uri(uri).await
}
async fn remove_room(&self, room_id: &RoomId) -> Result<()> {
self.remove_room(room_id).await
}
#[cfg(feature = "experimental-timeline")]
async fn room_timeline(
&self,
room_id: &RoomId,
) -> Result<Option<(BoxStream<Result<SyncTimelineEvent>>, Option<String>)>> {
self.room_timeline(room_id).await
}
}
#[derive(Debug, Default)]
#[cfg(feature = "experimental-timeline")]
struct TimelineData {
pub start: String,
pub start_position: isize,
pub end: Option<String>,
pub end_position: isize,
pub events: BTreeMap<isize, SyncTimelineEvent>,
pub event_id_to_position: HashMap<OwnedEventId, isize>,
}
#[cfg(test)]
mod tests {
use super::{MemoryStore, Result, StateStore};
async fn get_store() -> Result<impl StateStore> {
Ok(MemoryStore::new())
}
statestore_integration_tests!();
}