1use std::collections::{BTreeMap, BTreeSet, HashMap};
4
5use assert_matches::assert_matches;
6use assert_matches2::assert_let;
7use growable_bloom_filter::GrowableBloomBuilder;
8use matrix_sdk_test::{event_factory::EventFactory, test_json};
9use ruma::{
10 api::{
11 client::discovery::discover_homeserver::{HomeserverInfo, RtcFocusInfo},
12 FeatureFlag, MatrixVersion,
13 },
14 event_id,
15 events::{
16 presence::PresenceEvent,
17 receipt::{ReceiptThread, ReceiptType},
18 room::{
19 member::{
20 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
21 SyncRoomMemberEvent,
22 },
23 message::RoomMessageEventContent,
24 power_levels::RoomPowerLevelsEventContent,
25 topic::RoomTopicEventContent,
26 },
27 AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
28 AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
29 RoomAccountDataEventType, StateEventType, SyncStateEvent,
30 },
31 owned_event_id, owned_mxc_uri, room_id,
32 serde::Raw,
33 uint, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId,
34 TransactionId, UserId,
35};
36use serde_json::{json, value::Value as JsonValue};
37
38use super::{
39 send_queue::SentRequestKey, DependentQueuedRequestKind, DisplayName, DynStateStore,
40 RoomLoadSettings, ServerInfo, WellKnownResponse,
41};
42use crate::{
43 deserialized_responses::MemberEvent,
44 store::{ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt},
45 RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
46};
47
/// Integration test suite for state store implementations.
///
/// Every `test_*` method exercises one area of the store API and panics (or
/// returns an error) when the store misbehaves. In this file the trait is
/// implemented for `DynStateStore`.
// NOTE(review): the lint is silenced because the trait declares `async fn`s;
// presumably this is acceptable since the trait is test-only — confirm.
#[allow(async_fn_in_trait)]
pub trait StateStoreIntegrationTests {
    /// Populate the store with the fixture data shared by several tests
    /// (sync token, presence, account data, two rooms, members, receipts).
    async fn populate(&self) -> Result<()>;
    /// Test that redacting the room topic replaces the stored topic event by
    /// its redacted form.
    async fn test_topic_redaction(&self) -> Result<()>;
    /// Test that everything written by `populate` can be read back.
    async fn test_populate_store(&self) -> Result<()>;
    /// Test saving and loading of room member events and profiles.
    async fn test_member_saving(&self);
    /// Test saving, loading and removing a filter id.
    async fn test_filter_saving(&self);
    /// Test saving, loading and removing a user avatar URL.
    async fn test_user_avatar_url_saving(&self);
    /// Test saving, overwriting and removing the sync token.
    async fn test_sync_token_saving(&self);
    /// Test saving and loading the UTD hook manager data.
    async fn test_utd_hook_manager_data_saving(&self);
    /// Test saving and loading of stripped room member events.
    async fn test_stripped_member_saving(&self);
    /// Test saving and loading of a room power levels event.
    async fn test_power_level_saving(&self);
    /// Test saving and loading of unthreaded and main-thread read receipts.
    async fn test_receipts_saving(&self);
    /// Test writing and reading a custom key/value pair.
    async fn test_custom_storage(&self) -> Result<()>;
    /// Test that a member stored as a sync event can be replaced by a
    /// stripped event for the same room.
    async fn test_stripped_non_stripped(&self) -> Result<()>;
    /// Test removing a room from a populated store.
    async fn test_room_removal(&self) -> Result<()>;
    /// Test profile removal — going by the name; the implementation is outside
    /// the reviewed section.
    async fn test_profile_removal(&self) -> Result<()>;
    /// Test presence saving — going by the name; the implementation is outside
    /// the reviewed section.
    async fn test_presence_saving(&self);
    /// Test display names saving — going by the name; the implementation is
    /// outside the reviewed section.
    async fn test_display_names_saving(&self);
    /// Test the send queue — going by the name; the implementation is outside
    /// the reviewed section.
    async fn test_send_queue(&self);
    /// Test send queue priorities — going by the name; the implementation is
    /// outside the reviewed section.
    async fn test_send_queue_priority(&self);
    /// Test send queue dependent requests — going by the name; the
    /// implementation is outside the reviewed section.
    async fn test_send_queue_dependents(&self);
    /// Test updating a send queue dependent request — going by the name; the
    /// implementation is outside the reviewed section.
    async fn test_update_send_queue_dependent(&self);
    /// Test saving, loading and removing the cached server info.
    async fn test_server_info_saving(&self);
    /// Test `get_room_infos` — going by the name; the implementation is
    /// outside the reviewed section.
    async fn test_get_room_infos(&self);
}
101
102impl StateStoreIntegrationTests for DynStateStore {
103 async fn populate(&self) -> Result<()> {
104 let mut changes = StateChanges::default();
105
106 let user_id = user_id();
107 let invited_user_id = invited_user_id();
108 let room_id = room_id();
109 let stripped_room_id = stripped_room_id();
110
111 changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
112
113 let presence_json: &JsonValue = &test_json::PRESENCE;
114 let presence_raw =
115 serde_json::from_value::<Raw<PresenceEvent>>(presence_json.clone()).unwrap();
116 let presence_event = presence_raw.deserialize().unwrap();
117 changes.add_presence_event(presence_event, presence_raw);
118
119 let pushrules_json: &JsonValue = &test_json::PUSH_RULES;
120 let pushrules_raw =
121 serde_json::from_value::<Raw<AnyGlobalAccountDataEvent>>(pushrules_json.clone())
122 .unwrap();
123 let pushrules_event = pushrules_raw.deserialize().unwrap();
124 changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
125
126 let mut room = RoomInfo::new(room_id, RoomState::Joined);
127 room.mark_as_left();
128
129 let tag_json: &JsonValue = &test_json::TAG;
130 let tag_raw =
131 serde_json::from_value::<Raw<AnyRoomAccountDataEvent>>(tag_json.clone()).unwrap();
132 let tag_event = tag_raw.deserialize().unwrap();
133 changes.add_room_account_data(room_id, tag_event, tag_raw);
134
135 let name_json: &JsonValue = &test_json::NAME;
136 let name_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(name_json.clone()).unwrap();
137 let name_event = name_raw.deserialize().unwrap();
138 room.handle_state_event(&name_event);
139 changes.add_state_event(room_id, name_event, name_raw);
140
141 let topic_json: &JsonValue = &test_json::TOPIC;
142 let topic_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(topic_json.clone())
143 .expect("can create sync-state-event for topic");
144 let topic_event = topic_raw.deserialize().expect("can deserialize raw topic");
145 room.handle_state_event(&topic_event);
146 changes.add_state_event(room_id, topic_event, topic_raw);
147
148 let mut room_ambiguity_map = HashMap::new();
149 let mut room_profiles = BTreeMap::new();
150
151 let member_json: &JsonValue = &test_json::MEMBER;
152 let member_event: SyncRoomMemberEvent =
153 serde_json::from_value(member_json.clone()).unwrap();
154 let displayname = DisplayName::new(
155 member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
156 );
157 room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
158 room_profiles.insert(user_id.to_owned(), (&member_event).into());
159
160 let member_state_raw =
161 serde_json::from_value::<Raw<AnySyncStateEvent>>(member_json.clone()).unwrap();
162 let member_state_event = member_state_raw.deserialize().unwrap();
163 changes.add_state_event(room_id, member_state_event, member_state_raw);
164
165 let invited_member_json: &JsonValue = &test_json::MEMBER_INVITE;
166 let invited_member_event: SyncRoomMemberEvent =
168 serde_json::from_value(invited_member_json.clone()).unwrap();
169 room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
170 room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
171
172 let invited_member_state_raw =
173 serde_json::from_value::<Raw<AnySyncStateEvent>>(invited_member_json.clone()).unwrap();
174 let invited_member_state_event = invited_member_state_raw.deserialize().unwrap();
175 changes.add_state_event(room_id, invited_member_state_event, invited_member_state_raw);
176
177 let f = EventFactory::new().room(room_id);
178 let receipt_content = f
179 .read_receipts()
180 .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
181 .into_content();
182 changes.add_receipts(room_id, receipt_content);
183
184 changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
185 changes.profiles.insert(room_id.to_owned(), room_profiles);
186 changes.add_room(room);
187
188 let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
189
190 let stripped_name_json: &JsonValue = &test_json::NAME_STRIPPED;
191 let stripped_name_raw =
192 serde_json::from_value::<Raw<AnyStrippedStateEvent>>(stripped_name_json.clone())
193 .unwrap();
194 let stripped_name_event = stripped_name_raw.deserialize().unwrap();
195 stripped_room.handle_stripped_state_event(&stripped_name_event);
196 changes.stripped_state.insert(
197 stripped_room_id.to_owned(),
198 BTreeMap::from([(
199 stripped_name_event.event_type(),
200 BTreeMap::from([(
201 stripped_name_event.state_key().to_owned(),
202 stripped_name_raw.clone(),
203 )]),
204 )]),
205 );
206
207 changes.add_room(stripped_room);
208
209 let stripped_member_json: &JsonValue = &test_json::MEMBER_STRIPPED;
210 let stripped_member_event = Raw::new(&stripped_member_json.clone()).unwrap().cast();
211 changes.add_stripped_member(stripped_room_id, user_id, stripped_member_event);
212
213 self.save_changes(&changes).await?;
214
215 Ok(())
216 }
217
218 async fn test_topic_redaction(&self) -> Result<()> {
219 let room_id = room_id();
220 self.populate().await?;
221
222 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
223 assert_eq!(
224 self.get_state_event_static::<RoomTopicEventContent>(room_id)
225 .await?
226 .expect("room topic found before redaction")
227 .deserialize()
228 .expect("can deserialize room topic before redaction")
229 .as_sync()
230 .expect("room topic is a sync state event")
231 .as_original()
232 .expect("room topic is not redacted yet")
233 .content
234 .topic,
235 "😀"
236 );
237
238 let mut changes = StateChanges::default();
239
240 let redaction_json: &JsonValue = &test_json::TOPIC_REDACTION;
241 let redaction_evt: Raw<_> = serde_json::from_value(redaction_json.clone())
242 .expect("topic redaction event making works");
243 let redacted_event_id: OwnedEventId = redaction_evt.get_field("redacts").unwrap().unwrap();
244
245 changes.add_redaction(room_id, &redacted_event_id, redaction_evt);
246 self.save_changes(&changes).await?;
247
248 let redacted_event = self
249 .get_state_event_static::<RoomTopicEventContent>(room_id)
250 .await?
251 .expect("room topic found after redaction")
252 .deserialize()
253 .expect("can deserialize room topic after redaction");
254
255 assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
256
257 Ok(())
258 }
259
260 async fn test_populate_store(&self) -> Result<()> {
261 let room_id = room_id();
262 let user_id = user_id();
263 let display_name = DisplayName::new("example");
264
265 self.populate().await?;
266
267 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
268 assert!(self.get_presence_event(user_id).await?.is_some());
269 assert_eq!(
270 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
271 2,
272 "Expected to find 2 room infos"
273 );
274 assert!(self
275 .get_account_data_event(GlobalAccountDataEventType::PushRules)
276 .await?
277 .is_some());
278
279 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
280 assert_eq!(
281 self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
282 1,
283 "Expected to find 1 room topic"
284 );
285 assert!(self.get_profile(room_id, user_id).await?.is_some());
286 assert!(self.get_member_event(room_id, user_id).await?.is_some());
287 assert_eq!(
288 self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
289 2,
290 "Expected to find 2 members for room"
291 );
292 assert_eq!(
293 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
294 1,
295 "Expected to find 1 invited user ids"
296 );
297 assert_eq!(
298 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
299 1,
300 "Expected to find 1 joined user ids"
301 );
302 assert_eq!(
303 self.get_users_with_display_name(room_id, &display_name).await?.len(),
304 2,
305 "Expected to find 2 display names for room"
306 );
307 assert!(self
308 .get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
309 .await?
310 .is_some());
311 assert!(self
312 .get_user_room_receipt_event(
313 room_id,
314 ReceiptType::Read,
315 ReceiptThread::Unthreaded,
316 user_id
317 )
318 .await?
319 .is_some());
320 assert_eq!(
321 self.get_event_room_receipt_events(
322 room_id,
323 ReceiptType::Read,
324 ReceiptThread::Unthreaded,
325 first_receipt_event_id()
326 )
327 .await?
328 .len(),
329 1,
330 "Expected to find 1 read receipt"
331 );
332 Ok(())
333 }
334
335 async fn test_member_saving(&self) {
336 let room_id = room_id!("!test_member_saving:localhost");
337 let user_id = user_id();
338 let second_user_id = user_id!("@second:localhost");
339 let third_user_id = user_id!("@third:localhost");
340 let unknown_user_id = user_id!("@unknown:localhost");
341
342 let mut user_ids = vec![user_id.to_owned()];
344 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_none());
345 let member_events = self
346 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
347 .await;
348 assert!(member_events.unwrap().is_empty());
349 assert!(self.get_profile(room_id, user_id).await.unwrap().is_none());
350 let profiles = self.get_profiles(room_id, &user_ids).await;
351 assert!(profiles.unwrap().is_empty());
352
353 let mut changes = StateChanges::default();
355 let raw_member_event = membership_event();
356 let profile = raw_member_event.deserialize().unwrap().into();
357 changes
358 .state
359 .entry(room_id.to_owned())
360 .or_default()
361 .entry(StateEventType::RoomMember)
362 .or_default()
363 .insert(user_id.into(), raw_member_event.cast());
364 changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
365 self.save_changes(&changes).await.unwrap();
366
367 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_some());
368 let member_events = self
369 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
370 .await;
371 assert_eq!(member_events.unwrap().len(), 1);
372 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
373 assert_eq!(members.len(), 1, "We expected to find members for the room");
374 assert!(self.get_profile(room_id, user_id).await.unwrap().is_some());
375 let profiles = self.get_profiles(room_id, &user_ids).await;
376 assert_eq!(profiles.unwrap().len(), 1);
377
378 let mut changes = StateChanges::default();
380 let changes_members = changes
381 .state
382 .entry(room_id.to_owned())
383 .or_default()
384 .entry(StateEventType::RoomMember)
385 .or_default();
386 let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
387 let raw_second_member_event =
388 custom_membership_event(second_user_id, event_id!("$second_member_event"));
389 let second_profile = raw_second_member_event.deserialize().unwrap().into();
390 changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
391 changes_profiles.insert(second_user_id.to_owned(), second_profile);
392 let raw_third_member_event =
393 custom_membership_event(third_user_id, event_id!("$third_member_event"));
394 let third_profile = raw_third_member_event.deserialize().unwrap().into();
395 changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
396 changes_profiles.insert(third_user_id.to_owned(), third_profile);
397 self.save_changes(&changes).await.unwrap();
398
399 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
400 assert!(self.get_member_event(room_id, second_user_id).await.unwrap().is_some());
401 assert!(self.get_member_event(room_id, third_user_id).await.unwrap().is_some());
402 let member_events = self
403 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
404 .await;
405 assert_eq!(member_events.unwrap().len(), 3);
406 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
407 assert_eq!(members.len(), 3, "We expected to find members for the room");
408 assert!(self.get_profile(room_id, second_user_id).await.unwrap().is_some());
409 assert!(self.get_profile(room_id, third_user_id).await.unwrap().is_some());
410 let profiles = self.get_profiles(room_id, &user_ids).await;
411 assert_eq!(profiles.unwrap().len(), 3);
412
413 user_ids.push(unknown_user_id.to_owned());
415 let member_events = self
416 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
417 .await;
418 assert_eq!(member_events.unwrap().len(), 3);
419 let profiles = self.get_profiles(room_id, &user_ids).await;
420 assert_eq!(profiles.unwrap().len(), 3);
421
422 let member_events = self
424 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
425 room_id,
426 &[],
427 )
428 .await;
429 assert!(member_events.unwrap().is_empty());
430 let profiles = self.get_profiles(room_id, &[]).await;
431 assert!(profiles.unwrap().is_empty());
432 }
433
434 async fn test_filter_saving(&self) {
435 let filter_name = "filter_name";
436 let filter_id = "filter_id_1234";
437
438 self.set_kv_data(
439 StateStoreDataKey::Filter(filter_name),
440 StateStoreDataValue::Filter(filter_id.to_owned()),
441 )
442 .await
443 .unwrap();
444 assert_let!(
445 Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
446 self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
447 );
448 assert_eq!(stored_filter_id, filter_id);
449
450 self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await.unwrap();
451 assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
452 }
453
454 async fn test_user_avatar_url_saving(&self) {
455 let user_id = user_id!("@alice:example.org");
456 let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
457
458 self.set_kv_data(
459 StateStoreDataKey::UserAvatarUrl(user_id),
460 StateStoreDataValue::UserAvatarUrl(url.clone()),
461 )
462 .await
463 .unwrap();
464
465 assert_let!(
466 Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
467 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
468 );
469 assert_eq!(stored_url, url);
470
471 self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await.unwrap();
472 assert_matches!(
473 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
474 Ok(None)
475 );
476 }
477
478 async fn test_server_info_saving(&self) {
479 let versions = &[MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11];
480 let server_info = ServerInfo::new(
481 versions.iter().map(|version| version.to_string()).collect(),
482 [("org.matrix.experimental".to_owned(), true)].into(),
483 Some(WellKnownResponse {
484 homeserver: HomeserverInfo::new("matrix.example.com".to_owned()),
485 identity_server: None,
486 tile_server: None,
487 rtc_foci: vec![RtcFocusInfo::livekit("livekit.example.com".to_owned())],
488 }),
489 );
490
491 self.set_kv_data(
492 StateStoreDataKey::ServerInfo,
493 StateStoreDataValue::ServerInfo(server_info.clone()),
494 )
495 .await
496 .unwrap();
497
498 assert_let!(
499 Ok(Some(StateStoreDataValue::ServerInfo(stored_info))) =
500 self.get_kv_data(StateStoreDataKey::ServerInfo).await
501 );
502 assert_eq!(stored_info, server_info);
503
504 let decoded_server_info = stored_info.maybe_decode().unwrap();
505 let stored_supported = decoded_server_info.supported_versions();
506
507 assert_eq!(stored_supported.versions.as_ref(), versions);
508 assert_eq!(stored_supported.features.len(), 1);
509 assert!(stored_supported.features.contains(&FeatureFlag::from("org.matrix.experimental")));
510
511 self.remove_kv_data(StateStoreDataKey::ServerInfo).await.unwrap();
512 assert_matches!(self.get_kv_data(StateStoreDataKey::ServerInfo).await, Ok(None));
513 }
514
515 async fn test_sync_token_saving(&self) {
516 let sync_token_1 = "t392-516_47314_0_7_1";
517 let sync_token_2 = "t392-516_47314_0_7_2";
518
519 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
520
521 let changes =
522 StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
523 self.save_changes(&changes).await.unwrap();
524 assert_let!(
525 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
526 self.get_kv_data(StateStoreDataKey::SyncToken).await
527 );
528 assert_eq!(stored_sync_token, sync_token_1);
529
530 self.set_kv_data(
531 StateStoreDataKey::SyncToken,
532 StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
533 )
534 .await
535 .unwrap();
536 assert_let!(
537 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
538 self.get_kv_data(StateStoreDataKey::SyncToken).await
539 );
540 assert_eq!(stored_sync_token, sync_token_2);
541
542 self.remove_kv_data(StateStoreDataKey::SyncToken).await.unwrap();
543 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
544 }
545
546 async fn test_utd_hook_manager_data_saving(&self) {
547 assert!(
549 self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
550 .await
551 .expect("Could not read data")
552 .is_none(),
553 "Store was not empty at start"
554 );
555
556 let data = GrowableBloomBuilder::new().build();
558 self.set_kv_data(
559 StateStoreDataKey::UtdHookManagerData,
560 StateStoreDataValue::UtdHookManagerData(data.clone()),
561 )
562 .await
563 .expect("Could not save data");
564
565 let read_data = self
567 .get_kv_data(StateStoreDataKey::UtdHookManagerData)
568 .await
569 .expect("Could not read data")
570 .expect("no data found")
571 .into_utd_hook_manager_data()
572 .expect("not UtdHookManagerData");
573
574 assert_eq!(read_data, data);
575 }
576
577 async fn test_stripped_member_saving(&self) {
578 let room_id = room_id!("!test_stripped_member_saving:localhost");
579 let user_id = user_id();
580 let second_user_id = user_id!("@second:localhost");
581 let third_user_id = user_id!("@third:localhost");
582 let unknown_user_id = user_id!("@unknown:localhost");
583
584 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_none());
586 let member_events = self
587 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
588 room_id,
589 &[user_id.to_owned()],
590 )
591 .await;
592 assert!(member_events.unwrap().is_empty());
593
594 let mut changes = StateChanges::default();
596 changes
597 .stripped_state
598 .entry(room_id.to_owned())
599 .or_default()
600 .entry(StateEventType::RoomMember)
601 .or_default()
602 .insert(user_id.into(), stripped_membership_event().cast());
603 self.save_changes(&changes).await.unwrap();
604
605 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_some());
606 let member_events = self
607 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
608 room_id,
609 &[user_id.to_owned()],
610 )
611 .await;
612 assert_eq!(member_events.unwrap().len(), 1);
613 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
614 assert_eq!(members.len(), 1, "We expected to find members for the room");
615
616 let mut changes = StateChanges::default();
618 let changes_members = changes
619 .stripped_state
620 .entry(room_id.to_owned())
621 .or_default()
622 .entry(StateEventType::RoomMember)
623 .or_default();
624 changes_members
625 .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
626 changes_members
627 .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
628 self.save_changes(&changes).await.unwrap();
629
630 assert!(self.get_member_event(room_id, second_user_id).await.unwrap().is_some());
631 assert!(self.get_member_event(room_id, third_user_id).await.unwrap().is_some());
632 let member_events = self
633 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
634 room_id,
635 &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
636 )
637 .await;
638 assert_eq!(member_events.unwrap().len(), 3);
639 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
640 assert_eq!(members.len(), 3, "We expected to find members for the room");
641
642 let member_events = self
644 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
645 room_id,
646 &[
647 user_id.to_owned(),
648 second_user_id.to_owned(),
649 third_user_id.to_owned(),
650 unknown_user_id.to_owned(),
651 ],
652 )
653 .await;
654 assert_eq!(member_events.unwrap().len(), 3);
655
656 let member_events = self
658 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
659 room_id,
660 &[],
661 )
662 .await;
663 assert!(member_events.unwrap().is_empty());
664 }
665
666 async fn test_power_level_saving(&self) {
667 let room_id = room_id!("!test_power_level_saving:localhost");
668
669 let raw_event = power_level_event();
670 let event = raw_event.deserialize().unwrap();
671
672 assert!(self
673 .get_state_event(room_id, StateEventType::RoomPowerLevels, "")
674 .await
675 .unwrap()
676 .is_none());
677 let mut changes = StateChanges::default();
678 changes.add_state_event(room_id, event, raw_event);
679
680 self.save_changes(&changes).await.unwrap();
681 assert!(self
682 .get_state_event(room_id, StateEventType::RoomPowerLevels, "")
683 .await
684 .unwrap()
685 .is_some());
686 }
687
688 async fn test_receipts_saving(&self) {
689 let room_id = room_id!("!test_receipts_saving:localhost");
690
691 let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
692 let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
693
694 let first_receipt_ts = uint!(1436451550);
695 let second_receipt_ts = uint!(1436451653);
696 let third_receipt_ts = uint!(1436474532);
697
698 let first_receipt_event = serde_json::from_value(json!({
699 first_event_id: {
700 "m.read": {
701 user_id(): {
702 "ts": first_receipt_ts,
703 }
704 }
705 }
706 }))
707 .expect("json creation failed");
708
709 let second_receipt_event = serde_json::from_value(json!({
710 second_event_id: {
711 "m.read": {
712 user_id(): {
713 "ts": second_receipt_ts,
714 }
715 }
716 }
717 }))
718 .expect("json creation failed");
719
720 let third_receipt_event = serde_json::from_value(json!({
721 second_event_id: {
722 "m.read": {
723 user_id(): {
724 "ts": third_receipt_ts,
725 "thread_id": "main",
726 }
727 }
728 }
729 }))
730 .expect("json creation failed");
731
732 assert!(self
733 .get_user_room_receipt_event(
734 room_id,
735 ReceiptType::Read,
736 ReceiptThread::Unthreaded,
737 user_id()
738 )
739 .await
740 .expect("failed to read unthreaded user room receipt")
741 .is_none());
742 assert!(self
743 .get_event_room_receipt_events(
744 room_id,
745 ReceiptType::Read,
746 ReceiptThread::Unthreaded,
747 first_event_id
748 )
749 .await
750 .expect("failed to read unthreaded event room receipt for 1")
751 .is_empty());
752 assert!(self
753 .get_event_room_receipt_events(
754 room_id,
755 ReceiptType::Read,
756 ReceiptThread::Unthreaded,
757 second_event_id
758 )
759 .await
760 .expect("failed to read unthreaded event room receipt for 2")
761 .is_empty());
762
763 let mut changes = StateChanges::default();
764 changes.add_receipts(room_id, first_receipt_event);
765
766 self.save_changes(&changes).await.expect("writing changes fauked");
767 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
768 .get_user_room_receipt_event(
769 room_id,
770 ReceiptType::Read,
771 ReceiptThread::Unthreaded,
772 user_id(),
773 )
774 .await
775 .expect("failed to read unthreaded user room receipt after save")
776 .unwrap();
777 assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
778 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
779 let first_event_unthreaded_receipts = self
780 .get_event_room_receipt_events(
781 room_id,
782 ReceiptType::Read,
783 ReceiptThread::Unthreaded,
784 first_event_id,
785 )
786 .await
787 .expect("failed to read unthreaded event room receipt for 1 after save");
788 assert_eq!(
789 first_event_unthreaded_receipts.len(),
790 1,
791 "Found a wrong number of unthreaded receipts for 1 after save"
792 );
793 assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
794 assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
795 assert!(self
796 .get_event_room_receipt_events(
797 room_id,
798 ReceiptType::Read,
799 ReceiptThread::Unthreaded,
800 second_event_id
801 )
802 .await
803 .expect("failed to read unthreaded event room receipt for 2 after save")
804 .is_empty());
805
806 let mut changes = StateChanges::default();
807 changes.add_receipts(room_id, second_receipt_event);
808
809 self.save_changes(&changes).await.expect("Saving works");
810 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
811 .get_user_room_receipt_event(
812 room_id,
813 ReceiptType::Read,
814 ReceiptThread::Unthreaded,
815 user_id(),
816 )
817 .await
818 .expect("Getting unthreaded user room receipt after save failed")
819 .unwrap();
820 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
821 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
822 assert!(self
823 .get_event_room_receipt_events(
824 room_id,
825 ReceiptType::Read,
826 ReceiptThread::Unthreaded,
827 first_event_id
828 )
829 .await
830 .expect("Getting unthreaded event room receipt events for first event failed")
831 .is_empty());
832 let second_event_unthreaded_receipts = self
833 .get_event_room_receipt_events(
834 room_id,
835 ReceiptType::Read,
836 ReceiptThread::Unthreaded,
837 second_event_id,
838 )
839 .await
840 .expect("Getting unthreaded event room receipt events for second event failed");
841 assert_eq!(
842 second_event_unthreaded_receipts.len(),
843 1,
844 "Found a wrong number of unthreaded receipts for second event after save"
845 );
846 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
847 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
848
849 assert!(self
850 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
851 .await
852 .expect("failed to read threaded user room receipt")
853 .is_none());
854 assert!(self
855 .get_event_room_receipt_events(
856 room_id,
857 ReceiptType::Read,
858 ReceiptThread::Main,
859 second_event_id
860 )
861 .await
862 .expect("Getting threaded event room receipts for 2 failed")
863 .is_empty());
864
865 let mut changes = StateChanges::default();
866 changes.add_receipts(room_id, third_receipt_event);
867
868 self.save_changes(&changes).await.expect("Saving works");
869 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
871 .get_user_room_receipt_event(
872 room_id,
873 ReceiptType::Read,
874 ReceiptThread::Unthreaded,
875 user_id(),
876 )
877 .await
878 .expect("Getting unthreaded user room receipt after save failed")
879 .unwrap();
880 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
881 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
882 let second_event_unthreaded_receipts = self
883 .get_event_room_receipt_events(
884 room_id,
885 ReceiptType::Read,
886 ReceiptThread::Unthreaded,
887 second_event_id,
888 )
889 .await
890 .expect("Getting unthreaded event room receipt events for second event failed");
891 assert_eq!(
892 second_event_unthreaded_receipts.len(),
893 1,
894 "Found a wrong number of unthreaded receipts for second event after save"
895 );
896 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
897 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
898 let (threaded_user_receipt_event_id, threaded_user_receipt) = self
900 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
901 .await
902 .expect("Getting threaded user room receipt after save failed")
903 .unwrap();
904 assert_eq!(threaded_user_receipt_event_id, second_event_id);
905 assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
906 let second_event_threaded_receipts = self
907 .get_event_room_receipt_events(
908 room_id,
909 ReceiptType::Read,
910 ReceiptThread::Main,
911 second_event_id,
912 )
913 .await
914 .expect("Getting threaded event room receipt events for second event failed");
915 assert_eq!(
916 second_event_threaded_receipts.len(),
917 1,
918 "Found a wrong number of threaded receipts for second event after save"
919 );
920 assert_eq!(second_event_threaded_receipts[0].0, user_id());
921 assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
922 }
923
924 async fn test_custom_storage(&self) -> Result<()> {
925 let key = "my_key";
926 let value = &[0, 1, 2, 3];
927
928 self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
929
930 let read = self.get_custom_value(key.as_bytes()).await?;
931
932 assert_eq!(Some(value.as_ref()), read.as_deref());
933
934 Ok(())
935 }
936
937 async fn test_stripped_non_stripped(&self) -> Result<()> {
938 let room_id = room_id!("!test_stripped_non_stripped:localhost");
939 let user_id = user_id();
940
941 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_none());
942 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 0);
943
944 let mut changes = StateChanges::default();
945 changes
946 .state
947 .entry(room_id.to_owned())
948 .or_default()
949 .entry(StateEventType::RoomMember)
950 .or_default()
951 .insert(user_id.into(), membership_event().cast());
952 changes.add_room(RoomInfo::new(room_id, RoomState::Left));
953 self.save_changes(&changes).await.unwrap();
954
955 let member_event =
956 self.get_member_event(room_id, user_id).await.unwrap().unwrap().deserialize().unwrap();
957 assert!(matches!(member_event, MemberEvent::Sync(_)));
958 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 1);
959
960 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
961 assert_eq!(members, vec![user_id.to_owned()]);
962
963 let mut changes = StateChanges::default();
964 changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
965 changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
966 self.save_changes(&changes).await.unwrap();
967
968 let member_event =
969 self.get_member_event(room_id, user_id).await.unwrap().unwrap().deserialize().unwrap();
970 assert!(matches!(member_event, MemberEvent::Stripped(_)));
971 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 1);
972
973 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
974 assert_eq!(members, vec![user_id.to_owned()]);
975
976 Ok(())
977 }
978
979 async fn test_room_removal(&self) -> Result<()> {
980 let room_id = room_id();
981 let user_id = user_id();
982 let display_name = DisplayName::new("example");
983 let stripped_room_id = stripped_room_id();
984
985 self.populate().await?;
986
987 {
988 let txn = TransactionId::new();
990 let ev =
991 SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())
992 .unwrap();
993 self.save_send_queue_request(
994 room_id,
995 txn.clone(),
996 MilliSecondsSinceUnixEpoch::now(),
997 ev.into(),
998 0,
999 )
1000 .await?;
1001
1002 self.save_dependent_queued_request(
1004 room_id,
1005 &txn,
1006 ChildTransactionId::new(),
1007 MilliSecondsSinceUnixEpoch::now(),
1008 DependentQueuedRequestKind::RedactEvent,
1009 )
1010 .await?;
1011 }
1012
1013 self.remove_room(room_id).await?;
1014
1015 assert_eq!(
1016 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1017 1,
1018 "room is still there"
1019 );
1020
1021 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1022 assert!(
1023 self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1024 "still state events found"
1025 );
1026 assert!(self.get_profile(room_id, user_id).await?.is_none());
1027 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1028 assert!(
1029 self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1030 "still user ids found"
1031 );
1032 assert!(
1033 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1034 "still invited user ids found"
1035 );
1036 assert!(
1037 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1038 "still joined users found"
1039 );
1040 assert!(
1041 self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1042 "still display names found"
1043 );
1044 assert!(self
1045 .get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1046 .await?
1047 .is_none());
1048 assert!(self
1049 .get_user_room_receipt_event(
1050 room_id,
1051 ReceiptType::Read,
1052 ReceiptThread::Unthreaded,
1053 user_id
1054 )
1055 .await?
1056 .is_none());
1057 assert!(
1058 self.get_event_room_receipt_events(
1059 room_id,
1060 ReceiptType::Read,
1061 ReceiptThread::Unthreaded,
1062 first_receipt_event_id()
1063 )
1064 .await?
1065 .is_empty(),
1066 "still event recepts in the store"
1067 );
1068 assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1069 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1070
1071 self.remove_room(stripped_room_id).await?;
1072
1073 assert!(
1074 self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1075 "still room info found"
1076 );
1077 Ok(())
1078 }
1079
1080 async fn test_profile_removal(&self) -> Result<()> {
1081 let room_id = room_id();
1082
1083 let user_id = user_id();
1085 let invited_user_id = invited_user_id();
1086
1087 self.populate().await?;
1088
1089 let new_invite_member_json = json!({
1090 "content": {
1091 "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1092 "displayname": "example after update",
1093 "membership": "invite",
1094 "reason": "Looking for support"
1095 },
1096 "event_id": "$143273582443PhrSm:localhost",
1097 "origin_server_ts": 1432735824,
1098 "room_id": room_id,
1099 "sender": user_id,
1100 "state_key": invited_user_id,
1101 "type": "m.room.member",
1102 });
1103 let new_invite_member_event: SyncRoomMemberEvent =
1104 serde_json::from_value(new_invite_member_json.clone()).unwrap();
1105
1106 let mut changes = StateChanges {
1107 profiles_to_delete: [(
1109 room_id.to_owned(),
1110 vec![user_id.to_owned(), invited_user_id.to_owned()],
1111 )]
1112 .into(),
1113
1114 profiles: {
1116 let mut map = BTreeMap::default();
1117 map.insert(
1118 room_id.to_owned(),
1119 [(invited_user_id.to_owned(), new_invite_member_event.into())]
1120 .into_iter()
1121 .collect(),
1122 );
1123 map
1124 },
1125
1126 ..StateChanges::default()
1127 };
1128
1129 let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1130 .expect("can create sync-state-event for topic");
1131 let event = raw.deserialize().unwrap();
1132 changes.add_state_event(room_id, event, raw);
1133
1134 self.save_changes(&changes).await.unwrap();
1135
1136 assert!(self.get_profile(room_id, user_id).await?.is_none());
1138 assert!(self.get_member_event(room_id, user_id).await?.is_some());
1139
1140 let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1142 assert_eq!(
1143 invited_member_event.as_original().unwrap().content.displayname.as_deref(),
1144 Some("example after update")
1145 );
1146 assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1147
1148 Ok(())
1149 }
1150
1151 async fn test_presence_saving(&self) {
1152 let user_id = user_id();
1153 let second_user_id = user_id!("@second:localhost");
1154 let third_user_id = user_id!("@third:localhost");
1155 let unknown_user_id = user_id!("@unknown:localhost");
1156
1157 let mut user_ids = vec![user_id.to_owned()];
1159 let presence_event = self.get_presence_event(user_id).await;
1160 assert!(presence_event.unwrap().is_none());
1161 let presence_events = self.get_presence_events(&user_ids).await;
1162 assert!(presence_events.unwrap().is_empty());
1163
1164 let mut changes = StateChanges::default();
1166 changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1167 self.save_changes(&changes).await.unwrap();
1168
1169 let presence_event = self.get_presence_event(user_id).await;
1170 assert!(presence_event.unwrap().is_some());
1171 let presence_events = self.get_presence_events(&user_ids).await;
1172 assert_eq!(presence_events.unwrap().len(), 1);
1173
1174 let mut changes = StateChanges::default();
1176 changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1177 changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1178 self.save_changes(&changes).await.unwrap();
1179
1180 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1181 let presence_event = self.get_presence_event(second_user_id).await;
1182 assert!(presence_event.unwrap().is_some());
1183 let presence_event = self.get_presence_event(third_user_id).await;
1184 assert!(presence_event.unwrap().is_some());
1185 let presence_events = self.get_presence_events(&user_ids).await;
1186 assert_eq!(presence_events.unwrap().len(), 3);
1187
1188 user_ids.push(unknown_user_id.to_owned());
1190 let member_events = self.get_presence_events(&user_ids).await;
1191 assert_eq!(member_events.unwrap().len(), 3);
1192
1193 let presence_events = self.get_presence_events(&[]).await;
1195 assert!(presence_events.unwrap().is_empty());
1196 }
1197
1198 async fn test_display_names_saving(&self) {
1199 let room_id = room_id!("!test_display_names_saving:localhost");
1200 let user_id = user_id();
1201 let user_display_name = DisplayName::new("User");
1202 let second_user_id = user_id!("@second:localhost");
1203 let third_user_id = user_id!("@third:localhost");
1204 let other_display_name = DisplayName::new("Raoul");
1205 let unknown_display_name = DisplayName::new("Unknown");
1206
1207 let mut display_names = vec![user_display_name.to_owned()];
1209 let users = self.get_users_with_display_name(room_id, &user_display_name).await.unwrap();
1210 assert!(users.is_empty());
1211 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1212 assert!(names.is_empty());
1213
1214 let mut changes = StateChanges::default();
1216 changes
1217 .ambiguity_maps
1218 .entry(room_id.to_owned())
1219 .or_default()
1220 .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1221 self.save_changes(&changes).await.unwrap();
1222
1223 let users = self.get_users_with_display_name(room_id, &user_display_name).await.unwrap();
1224 assert_eq!(users.len(), 1);
1225 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1226 assert_eq!(names.len(), 1);
1227 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1228
1229 let mut changes = StateChanges::default();
1231 changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1232 other_display_name.to_owned(),
1233 [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1234 );
1235 self.save_changes(&changes).await.unwrap();
1236
1237 display_names.push(other_display_name.to_owned());
1238 let users = self.get_users_with_display_name(room_id, &user_display_name).await.unwrap();
1239 assert_eq!(users.len(), 1);
1240 let users = self.get_users_with_display_name(room_id, &other_display_name).await.unwrap();
1241 assert_eq!(users.len(), 2);
1242 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1243 assert_eq!(names.len(), 2);
1244 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1245 assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1246
1247 display_names.push(unknown_display_name.to_owned());
1249 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1250 assert_eq!(names.len(), 2);
1251
1252 let names = self.get_users_with_display_names(room_id, &[]).await;
1254 assert!(names.unwrap().is_empty());
1255 }
1256
1257 #[allow(clippy::needless_range_loop)]
1258 async fn test_send_queue(&self) {
1259 let room_id = room_id!("!test_send_queue:localhost");
1260
1261 let events = self.load_send_queue_requests(room_id).await.unwrap();
1263 assert!(events.is_empty());
1264
1265 let txn0 = TransactionId::new();
1267 let event0 =
1268 SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())
1269 .unwrap();
1270 self.save_send_queue_request(
1271 room_id,
1272 txn0.clone(),
1273 MilliSecondsSinceUnixEpoch::now(),
1274 event0.into(),
1275 0,
1276 )
1277 .await
1278 .unwrap();
1279
1280 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1282
1283 assert_eq!(pending.len(), 1);
1284 {
1285 assert_eq!(pending[0].transaction_id, txn0);
1286
1287 let deserialized = pending[0].as_event().unwrap().deserialize().unwrap();
1288 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1289 assert_eq!(content.body(), "msg0");
1290
1291 assert!(!pending[0].is_wedged());
1292 }
1293
1294 for i in 1..=3 {
1296 let txn = TransactionId::new();
1297 let event = SerializableEventContent::new(
1298 &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1299 )
1300 .unwrap();
1301
1302 self.save_send_queue_request(
1303 room_id,
1304 txn,
1305 MilliSecondsSinceUnixEpoch::now(),
1306 event.into(),
1307 0,
1308 )
1309 .await
1310 .unwrap();
1311 }
1312
1313 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1315
1316 assert_eq!(pending.len(), 4);
1318
1319 assert_eq!(pending[0].transaction_id, txn0);
1320
1321 for i in 0..4 {
1322 let deserialized = pending[i].as_event().unwrap().deserialize().unwrap();
1323 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1324 assert_eq!(content.body(), format!("msg{i}"));
1325 assert!(!pending[i].is_wedged());
1326 }
1327
1328 let txn2 = &pending[2].transaction_id;
1330 self.update_send_queue_request_status(
1331 room_id,
1332 txn2,
1333 Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1334 )
1335 .await
1336 .unwrap();
1337
1338 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1340
1341 assert_eq!(pending.len(), 4);
1343 assert_eq!(pending[0].transaction_id, txn0);
1344 assert_eq!(pending[2].transaction_id, *txn2);
1345 assert!(pending[2].is_wedged());
1346 let error = pending[2].clone().error.unwrap();
1347 let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1348 assert_eq!(generic_error, "Oops");
1349 for i in 0..4 {
1350 if i != 2 {
1351 assert!(!pending[i].is_wedged());
1352 }
1353 }
1354
1355 let event0 = SerializableEventContent::new(
1357 &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1358 )
1359 .unwrap();
1360 self.update_send_queue_request(room_id, txn2, event0.into()).await.unwrap();
1361
1362 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1364
1365 assert_eq!(pending.len(), 4);
1366 {
1367 assert_eq!(pending[2].transaction_id, *txn2);
1368
1369 let deserialized = pending[2].as_event().unwrap().deserialize().unwrap();
1370 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1371 assert_eq!(content.body(), "wow that's a cool test");
1372
1373 assert!(!pending[2].is_wedged());
1374
1375 for i in 0..4 {
1376 if i != 2 {
1377 let deserialized = pending[i].as_event().unwrap().deserialize().unwrap();
1378 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1379 assert_eq!(content.body(), format!("msg{i}"));
1380
1381 assert!(!pending[i].is_wedged());
1382 }
1383 }
1384 }
1385
1386 self.remove_send_queue_request(room_id, &txn0).await.unwrap();
1388
1389 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1391
1392 assert_eq!(pending.len(), 3);
1393 assert_eq!(pending[1].transaction_id, *txn2);
1394 for i in 0..3 {
1395 assert_ne!(pending[i].transaction_id, txn0);
1396 }
1397
1398 let room_id2 = room_id!("!test_send_queue_two:localhost");
1403 {
1404 let txn = TransactionId::new();
1405 let event =
1406 SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into())
1407 .unwrap();
1408 self.save_send_queue_request(
1409 room_id2,
1410 txn.clone(),
1411 MilliSecondsSinceUnixEpoch::now(),
1412 event.into(),
1413 0,
1414 )
1415 .await
1416 .unwrap();
1417 }
1418
1419 {
1421 let room_id3 = room_id!("!test_send_queue_three:localhost");
1422 let txn = TransactionId::new();
1423 let event =
1424 SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into())
1425 .unwrap();
1426 self.save_send_queue_request(
1427 room_id3,
1428 txn.clone(),
1429 MilliSecondsSinceUnixEpoch::now(),
1430 event.into(),
1431 0,
1432 )
1433 .await
1434 .unwrap();
1435
1436 self.remove_send_queue_request(room_id3, &txn).await.unwrap();
1437 }
1438
1439 let outstanding_rooms = self.load_rooms_with_unsent_requests().await.unwrap();
1442 assert_eq!(outstanding_rooms.len(), 2);
1443 assert!(outstanding_rooms.iter().any(|room| room == room_id));
1444 assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1445 }
1446
1447 async fn test_send_queue_priority(&self) {
1448 let room_id = room_id!("!test_send_queue:localhost");
1449
1450 let events = self.load_send_queue_requests(room_id).await.unwrap();
1452 assert!(events.is_empty());
1453
1454 let low0_txn = TransactionId::new();
1456 let ev0 =
1457 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())
1458 .unwrap();
1459 self.save_send_queue_request(
1460 room_id,
1461 low0_txn.clone(),
1462 MilliSecondsSinceUnixEpoch::now(),
1463 ev0.into(),
1464 2,
1465 )
1466 .await
1467 .unwrap();
1468
1469 let high_txn = TransactionId::new();
1471 let ev1 =
1472 SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())
1473 .unwrap();
1474 self.save_send_queue_request(
1475 room_id,
1476 high_txn.clone(),
1477 MilliSecondsSinceUnixEpoch::now(),
1478 ev1.into(),
1479 10,
1480 )
1481 .await
1482 .unwrap();
1483
1484 let low1_txn = TransactionId::new();
1486 let ev2 =
1487 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())
1488 .unwrap();
1489 self.save_send_queue_request(
1490 room_id,
1491 low1_txn.clone(),
1492 MilliSecondsSinceUnixEpoch::now(),
1493 ev2.into(),
1494 2,
1495 )
1496 .await
1497 .unwrap();
1498
1499 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1502
1503 assert_eq!(pending.len(), 3);
1504 {
1505 assert_eq!(pending[0].transaction_id, high_txn);
1506
1507 let deserialized = pending[0].as_event().unwrap().deserialize().unwrap();
1508 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1509 assert_eq!(content.body(), "high");
1510 }
1511
1512 {
1513 assert_eq!(pending[1].transaction_id, low0_txn);
1514
1515 let deserialized = pending[1].as_event().unwrap().deserialize().unwrap();
1516 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1517 assert_eq!(content.body(), "low0");
1518 }
1519
1520 {
1521 assert_eq!(pending[2].transaction_id, low1_txn);
1522
1523 let deserialized = pending[2].as_event().unwrap().deserialize().unwrap();
1524 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1525 assert_eq!(content.body(), "low1");
1526 }
1527 }
1528
1529 async fn test_send_queue_dependents(&self) {
1530 let room_id = room_id!("!test_send_queue_dependents:localhost");
1531
1532 let txn0 = TransactionId::new();
1534 let event0 =
1535 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
1536 .unwrap();
1537 self.save_send_queue_request(
1538 room_id,
1539 txn0.clone(),
1540 MilliSecondsSinceUnixEpoch::now(),
1541 event0.into(),
1542 0,
1543 )
1544 .await
1545 .unwrap();
1546
1547 assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
1549
1550 let child_txn = ChildTransactionId::new();
1552 self.save_dependent_queued_request(
1553 room_id,
1554 &txn0,
1555 child_txn.clone(),
1556 MilliSecondsSinceUnixEpoch::now(),
1557 DependentQueuedRequestKind::RedactEvent,
1558 )
1559 .await
1560 .unwrap();
1561
1562 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1564 assert_eq!(dependents.len(), 1);
1565 assert_eq!(dependents[0].parent_transaction_id, txn0);
1566 assert_eq!(dependents[0].own_transaction_id, child_txn);
1567 assert!(dependents[0].parent_key.is_none());
1568 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1569
1570 let event_id = owned_event_id!("$1");
1572 let num_updated = self
1573 .mark_dependent_queued_requests_as_ready(
1574 room_id,
1575 &txn0,
1576 SentRequestKey::Event(event_id.clone()),
1577 )
1578 .await
1579 .unwrap();
1580 assert_eq!(num_updated, 1);
1581
1582 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1584 assert_eq!(dependents.len(), 1);
1585 assert_eq!(dependents[0].parent_transaction_id, txn0);
1586 assert_eq!(dependents[0].own_transaction_id, child_txn);
1587 assert_matches!(dependents[0].parent_key.as_ref(), Some(SentRequestKey::Event(eid)) => {
1588 assert_eq!(*eid, event_id);
1589 });
1590 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1591
1592 let removed = self
1594 .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1595 .await
1596 .unwrap();
1597 assert!(removed);
1598
1599 assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
1601
1602 let txn1 = TransactionId::new();
1605 let event1 =
1606 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
1607 .unwrap();
1608 self.save_send_queue_request(
1609 room_id,
1610 txn1.clone(),
1611 MilliSecondsSinceUnixEpoch::now(),
1612 event1.into(),
1613 0,
1614 )
1615 .await
1616 .unwrap();
1617
1618 self.save_dependent_queued_request(
1619 room_id,
1620 &txn0,
1621 ChildTransactionId::new(),
1622 MilliSecondsSinceUnixEpoch::now(),
1623 DependentQueuedRequestKind::RedactEvent,
1624 )
1625 .await
1626 .unwrap();
1627 assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 1);
1628
1629 self.save_dependent_queued_request(
1630 room_id,
1631 &txn1,
1632 ChildTransactionId::new(),
1633 MilliSecondsSinceUnixEpoch::now(),
1634 DependentQueuedRequestKind::EditEvent {
1635 new_content: SerializableEventContent::new(
1636 &RoomMessageEventContent::text_plain("edit").into(),
1637 )
1638 .unwrap(),
1639 },
1640 )
1641 .await
1642 .unwrap();
1643 assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 2);
1644
1645 let removed = self.remove_send_queue_request(room_id, &txn0).await.unwrap();
1647 assert!(removed);
1648
1649 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1651 assert_eq!(dependents.len(), 2);
1652 }
1653
1654 async fn test_update_send_queue_dependent(&self) {
1655 let room_id = room_id!("!test_send_queue_dependents:localhost");
1656
1657 let txn = TransactionId::new();
1658
1659 let child_txn = ChildTransactionId::new();
1661
1662 self.save_dependent_queued_request(
1663 room_id,
1664 &txn,
1665 child_txn.clone(),
1666 MilliSecondsSinceUnixEpoch::now(),
1667 DependentQueuedRequestKind::RedactEvent,
1668 )
1669 .await
1670 .unwrap();
1671
1672 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1674 assert_eq!(dependents.len(), 1);
1675 assert_eq!(dependents[0].parent_transaction_id, txn);
1676 assert_eq!(dependents[0].own_transaction_id, child_txn);
1677 assert!(dependents[0].parent_key.is_none());
1678 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1679
1680 self.update_dependent_queued_request(
1682 room_id,
1683 &child_txn,
1684 DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1685 )
1686 .await
1687 .unwrap();
1688
1689 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1691 assert_eq!(dependents.len(), 1);
1692 assert_eq!(dependents[0].parent_transaction_id, txn);
1693 assert_eq!(dependents[0].own_transaction_id, child_txn);
1694 assert!(dependents[0].parent_key.is_none());
1695 assert_matches!(
1696 &dependents[0].kind,
1697 DependentQueuedRequestKind::ReactEvent { key } => {
1698 assert_eq!(key, "👍");
1699 }
1700 );
1701 }
1702
1703 async fn test_get_room_infos(&self) {
1704 let room_id_0 = room_id!("!r0");
1705 let room_id_1 = room_id!("!r1");
1706 let room_id_2 = room_id!("!r2");
1707
1708 {
1710 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 0);
1711 }
1712
1713 let mut changes = StateChanges::default();
1715 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1716 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1717 self.save_changes(&changes).await.unwrap();
1718
1719 {
1721 let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await.unwrap();
1722
1723 all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1726
1727 assert_eq!(all_rooms.len(), 2);
1728 assert_eq!(all_rooms[0].room_id, room_id_0);
1729 assert_eq!(all_rooms[1].room_id, room_id_1);
1730 }
1731
1732 {
1734 let all_rooms =
1735 self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await.unwrap();
1736
1737 assert_eq!(all_rooms.len(), 1);
1738 assert_eq!(all_rooms[0].room_id, room_id_1);
1739 }
1740
1741 {
1744 let all_rooms =
1745 self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await.unwrap();
1746
1747 assert_eq!(all_rooms.len(), 0);
1748 }
1749 }
1750}
1751
/// Generates a `statestore_integration_tests` module containing one
/// `#[async_test]` function per integration test of
/// `StateStoreIntegrationTests`.
///
/// Each generated test calls `get_store()`, converts the result with
/// `into_state_store()` and runs the trait method of the same name on it.
///
/// The module invoking this macro must define `get_store` itself, since the
/// generated module imports it with `use super::get_store;`. It is awaited
/// and its result is either unwrapped or propagated with `?` from tests
/// returning `StoreResult<()>`, so it has to be an async function returning a
/// `Result` whose error converts into the store error type and whose value
/// implements `IntoStateStore`.
///
/// Usage sketch (`MyStore` is a placeholder for the store under test):
///
/// ```ignore
/// async fn get_store() -> StoreResult<MyStore> {
///     Ok(MyStore::new())
/// }
///
/// statestore_integration_tests!();
/// ```
#[allow(unused_macros, unused_extern_crates)]
#[macro_export]
macro_rules! statestore_integration_tests {
    () => {
        mod statestore_integration_tests {
            use matrix_sdk_test::async_test;
            use $crate::store::{
                IntoStateStore, Result as StoreResult, StateStoreIntegrationTests,
            };

            use super::get_store;

            #[async_test]
            async fn test_topic_redaction() -> StoreResult<()> {
                let store = get_store().await?.into_state_store();
                store.test_topic_redaction().await
            }

            #[async_test]
            async fn test_populate_store() -> StoreResult<()> {
                let store = get_store().await?.into_state_store();
                store.test_populate_store().await
            }

            #[async_test]
            async fn test_member_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_member_saving().await
            }

            #[async_test]
            async fn test_filter_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_filter_saving().await
            }

            #[async_test]
            async fn test_user_avatar_url_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_user_avatar_url_saving().await
            }

            #[async_test]
            async fn test_server_info_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_server_info_saving().await
            }

            #[async_test]
            async fn test_sync_token_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_sync_token_saving().await
            }

            #[async_test]
            async fn test_utd_hook_manager_data_saving() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_utd_hook_manager_data_saving().await;
            }

            #[async_test]
            async fn test_stripped_member_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_stripped_member_saving().await
            }

            #[async_test]
            async fn test_power_level_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_power_level_saving().await
            }

            #[async_test]
            async fn test_receipts_saving() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_receipts_saving().await;
            }

            #[async_test]
            async fn test_custom_storage() -> StoreResult<()> {
                let store = get_store().await?.into_state_store();
                store.test_custom_storage().await
            }

            #[async_test]
            async fn test_stripped_non_stripped() -> StoreResult<()> {
                // Propagate store creation errors like the other tests
                // returning `StoreResult<()>` do, instead of unwrapping.
                let store = get_store().await?.into_state_store();
                store.test_stripped_non_stripped().await
            }

            #[async_test]
            async fn test_room_removal() -> StoreResult<()> {
                let store = get_store().await?.into_state_store();
                store.test_room_removal().await
            }

            #[async_test]
            async fn test_profile_removal() -> StoreResult<()> {
                let store = get_store().await?.into_state_store();
                store.test_profile_removal().await
            }

            #[async_test]
            async fn test_presence_saving() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_presence_saving().await;
            }

            #[async_test]
            async fn test_display_names_saving() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_display_names_saving().await;
            }

            #[async_test]
            async fn test_send_queue() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_send_queue().await;
            }

            #[async_test]
            async fn test_send_queue_priority() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_send_queue_priority().await;
            }

            #[async_test]
            async fn test_send_queue_dependents() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_send_queue_dependents().await;
            }

            #[async_test]
            async fn test_update_send_queue_dependent() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_update_send_queue_dependent().await;
            }

            #[async_test]
            async fn test_get_room_infos() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_get_room_infos().await;
            }
        }
    };
}
1923
/// Returns the user ID `@example:localhost`, the main test user of the
/// integration tests in this file.
fn user_id() -> &'static UserId {
    user_id!("@example:localhost")
}
1927
/// Returns the user ID `@invited:localhost`, used by the tests as the user
/// that gets invited.
fn invited_user_id() -> &'static UserId {
    user_id!("@invited:localhost")
}
1931
/// Returns the room ID `!test:localhost`, the default room of the
/// integration tests in this file.
fn room_id() -> &'static RoomId {
    room_id!("!test:localhost")
}
1935
/// Returns the room ID `!stripped:localhost`.
// NOTE(review): presumably the room populated with stripped state only; confirm against `populate`.
fn stripped_room_id() -> &'static RoomId {
    room_id!("!stripped:localhost")
}
1939
/// Returns the event ID `$example`, which the tests use when querying the
/// receipts attached to an event.
fn first_receipt_event_id() -> &'static EventId {
    event_id!("$example")
}
1943
1944fn power_level_event() -> Raw<AnySyncStateEvent> {
1945 let content = RoomPowerLevelsEventContent::default();
1946
1947 let event = json!({
1948 "event_id": "$h29iv0s8:example.com",
1949 "content": content,
1950 "sender": user_id(),
1951 "type": "m.room.power_levels",
1952 "origin_server_ts": 0u64,
1953 "state_key": "",
1954 });
1955
1956 serde_json::from_value(event).unwrap()
1957}
1958
1959fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
1960 custom_stripped_membership_event(user_id())
1961}
1962
1963fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
1964 let ev_json = json!({
1965 "type": "m.room.member",
1966 "content": RoomMemberEventContent::new(MembershipState::Join),
1967 "sender": user_id,
1968 "state_key": user_id,
1969 });
1970
1971 Raw::new(&ev_json).unwrap().cast()
1972}
1973
1974fn membership_event() -> Raw<SyncRoomMemberEvent> {
1975 custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
1976}
1977
1978fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
1979 let ev_json = json!({
1980 "type": "m.room.member",
1981 "content": RoomMemberEventContent::new(MembershipState::Join),
1982 "event_id": event_id,
1983 "origin_server_ts": 198,
1984 "sender": user_id,
1985 "state_key": user_id,
1986 });
1987
1988 Raw::new(&ev_json).unwrap().cast()
1989}
1990
1991fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
1992 let ev_json = json!({
1993 "content": {
1994 "presence": "online"
1995 },
1996 "sender": user_id,
1997 });
1998
1999 Raw::new(&ev_json).unwrap().cast()
2000}