1use std::collections::{BTreeMap, BTreeSet, HashMap};
4
5use assert_matches::assert_matches;
6use assert_matches2::assert_let;
7use growable_bloom_filter::GrowableBloomBuilder;
8use matrix_sdk_test::{event_factory::EventFactory, test_json};
9use ruma::{
10 api::MatrixVersion,
11 event_id,
12 events::{
13 presence::PresenceEvent,
14 receipt::{ReceiptThread, ReceiptType},
15 room::{
16 member::{
17 MembershipState, RoomMemberEventContent, StrippedRoomMemberEvent,
18 SyncRoomMemberEvent,
19 },
20 message::RoomMessageEventContent,
21 power_levels::RoomPowerLevelsEventContent,
22 topic::RoomTopicEventContent,
23 },
24 AnyGlobalAccountDataEvent, AnyMessageLikeEventContent, AnyRoomAccountDataEvent,
25 AnyStrippedStateEvent, AnySyncStateEvent, GlobalAccountDataEventType,
26 RoomAccountDataEventType, StateEventType, SyncStateEvent,
27 },
28 owned_event_id, owned_mxc_uri, room_id,
29 serde::Raw,
30 uint, user_id, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomId,
31 TransactionId, UserId,
32};
33use serde_json::{json, value::Value as JsonValue};
34
35use super::{
36 send_queue::SentRequestKey, DependentQueuedRequestKind, DisplayName, DynStateStore,
37 RoomLoadSettings, ServerCapabilities,
38};
39use crate::{
40 deserialized_responses::MemberEvent,
41 store::{ChildTransactionId, QueueWedgeError, Result, SerializableEventContent, StateStoreExt},
42 RoomInfo, RoomMemberships, RoomState, StateChanges, StateStoreDataKey, StateStoreDataValue,
43};
44
45#[allow(async_fn_in_trait)]
50pub trait StateStoreIntegrationTests {
51 async fn populate(&self) -> Result<()>;
53 async fn test_topic_redaction(&self) -> Result<()>;
55 async fn test_populate_store(&self) -> Result<()>;
57 async fn test_member_saving(&self);
59 async fn test_filter_saving(&self);
61 async fn test_user_avatar_url_saving(&self);
63 async fn test_sync_token_saving(&self);
65 async fn test_utd_hook_manager_data_saving(&self);
67 async fn test_stripped_member_saving(&self);
69 async fn test_power_level_saving(&self);
71 async fn test_receipts_saving(&self);
73 async fn test_custom_storage(&self) -> Result<()>;
75 async fn test_stripped_non_stripped(&self) -> Result<()>;
77 async fn test_room_removal(&self) -> Result<()>;
79 async fn test_profile_removal(&self) -> Result<()>;
81 async fn test_presence_saving(&self);
83 async fn test_display_names_saving(&self);
85 async fn test_send_queue(&self);
87 async fn test_send_queue_priority(&self);
89 async fn test_send_queue_dependents(&self);
91 async fn test_update_send_queue_dependent(&self);
93 async fn test_server_capabilities_saving(&self);
95 async fn test_get_room_infos(&self);
97}
98
99impl StateStoreIntegrationTests for DynStateStore {
100 async fn populate(&self) -> Result<()> {
101 let mut changes = StateChanges::default();
102
103 let user_id = user_id();
104 let invited_user_id = invited_user_id();
105 let room_id = room_id();
106 let stripped_room_id = stripped_room_id();
107
108 changes.sync_token = Some("t392-516_47314_0_7_1_1_1_11444_1".to_owned());
109
110 let presence_json: &JsonValue = &test_json::PRESENCE;
111 let presence_raw =
112 serde_json::from_value::<Raw<PresenceEvent>>(presence_json.clone()).unwrap();
113 let presence_event = presence_raw.deserialize().unwrap();
114 changes.add_presence_event(presence_event, presence_raw);
115
116 let pushrules_json: &JsonValue = &test_json::PUSH_RULES;
117 let pushrules_raw =
118 serde_json::from_value::<Raw<AnyGlobalAccountDataEvent>>(pushrules_json.clone())
119 .unwrap();
120 let pushrules_event = pushrules_raw.deserialize().unwrap();
121 changes.account_data.insert(pushrules_event.event_type(), pushrules_raw);
122
123 let mut room = RoomInfo::new(room_id, RoomState::Joined);
124 room.mark_as_left();
125
126 let tag_json: &JsonValue = &test_json::TAG;
127 let tag_raw =
128 serde_json::from_value::<Raw<AnyRoomAccountDataEvent>>(tag_json.clone()).unwrap();
129 let tag_event = tag_raw.deserialize().unwrap();
130 changes.add_room_account_data(room_id, tag_event, tag_raw);
131
132 let name_json: &JsonValue = &test_json::NAME;
133 let name_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(name_json.clone()).unwrap();
134 let name_event = name_raw.deserialize().unwrap();
135 room.handle_state_event(&name_event);
136 changes.add_state_event(room_id, name_event, name_raw);
137
138 let topic_json: &JsonValue = &test_json::TOPIC;
139 let topic_raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(topic_json.clone())
140 .expect("can create sync-state-event for topic");
141 let topic_event = topic_raw.deserialize().expect("can deserialize raw topic");
142 room.handle_state_event(&topic_event);
143 changes.add_state_event(room_id, topic_event, topic_raw);
144
145 let mut room_ambiguity_map = HashMap::new();
146 let mut room_profiles = BTreeMap::new();
147
148 let member_json: &JsonValue = &test_json::MEMBER;
149 let member_event: SyncRoomMemberEvent =
150 serde_json::from_value(member_json.clone()).unwrap();
151 let displayname = DisplayName::new(
152 member_event.as_original().unwrap().content.displayname.as_ref().unwrap(),
153 );
154 room_ambiguity_map.insert(displayname.clone(), BTreeSet::from([user_id.to_owned()]));
155 room_profiles.insert(user_id.to_owned(), (&member_event).into());
156
157 let member_state_raw =
158 serde_json::from_value::<Raw<AnySyncStateEvent>>(member_json.clone()).unwrap();
159 let member_state_event = member_state_raw.deserialize().unwrap();
160 changes.add_state_event(room_id, member_state_event, member_state_raw);
161
162 let invited_member_json: &JsonValue = &test_json::MEMBER_INVITE;
163 let invited_member_event: SyncRoomMemberEvent =
165 serde_json::from_value(invited_member_json.clone()).unwrap();
166 room_ambiguity_map.entry(displayname).or_default().insert(invited_user_id.to_owned());
167 room_profiles.insert(invited_user_id.to_owned(), (&invited_member_event).into());
168
169 let invited_member_state_raw =
170 serde_json::from_value::<Raw<AnySyncStateEvent>>(invited_member_json.clone()).unwrap();
171 let invited_member_state_event = invited_member_state_raw.deserialize().unwrap();
172 changes.add_state_event(room_id, invited_member_state_event, invited_member_state_raw);
173
174 let f = EventFactory::new().room(room_id);
175 let receipt_content = f
176 .read_receipts()
177 .add(event_id!("$example"), user_id, ReceiptType::Read, ReceiptThread::Unthreaded)
178 .into_content();
179 changes.add_receipts(room_id, receipt_content);
180
181 changes.ambiguity_maps.insert(room_id.to_owned(), room_ambiguity_map);
182 changes.profiles.insert(room_id.to_owned(), room_profiles);
183 changes.add_room(room);
184
185 let mut stripped_room = RoomInfo::new(stripped_room_id, RoomState::Invited);
186
187 let stripped_name_json: &JsonValue = &test_json::NAME_STRIPPED;
188 let stripped_name_raw =
189 serde_json::from_value::<Raw<AnyStrippedStateEvent>>(stripped_name_json.clone())
190 .unwrap();
191 let stripped_name_event = stripped_name_raw.deserialize().unwrap();
192 stripped_room.handle_stripped_state_event(&stripped_name_event);
193 changes.stripped_state.insert(
194 stripped_room_id.to_owned(),
195 BTreeMap::from([(
196 stripped_name_event.event_type(),
197 BTreeMap::from([(
198 stripped_name_event.state_key().to_owned(),
199 stripped_name_raw.clone(),
200 )]),
201 )]),
202 );
203
204 changes.add_room(stripped_room);
205
206 let stripped_member_json: &JsonValue = &test_json::MEMBER_STRIPPED;
207 let stripped_member_event = Raw::new(&stripped_member_json.clone()).unwrap().cast();
208 changes.add_stripped_member(stripped_room_id, user_id, stripped_member_event);
209
210 self.save_changes(&changes).await?;
211
212 Ok(())
213 }
214
215 async fn test_topic_redaction(&self) -> Result<()> {
216 let room_id = room_id();
217 self.populate().await?;
218
219 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
220 assert_eq!(
221 self.get_state_event_static::<RoomTopicEventContent>(room_id)
222 .await?
223 .expect("room topic found before redaction")
224 .deserialize()
225 .expect("can deserialize room topic before redaction")
226 .as_sync()
227 .expect("room topic is a sync state event")
228 .as_original()
229 .expect("room topic is not redacted yet")
230 .content
231 .topic,
232 "😀"
233 );
234
235 let mut changes = StateChanges::default();
236
237 let redaction_json: &JsonValue = &test_json::TOPIC_REDACTION;
238 let redaction_evt: Raw<_> = serde_json::from_value(redaction_json.clone())
239 .expect("topic redaction event making works");
240 let redacted_event_id: OwnedEventId = redaction_evt.get_field("redacts").unwrap().unwrap();
241
242 changes.add_redaction(room_id, &redacted_event_id, redaction_evt);
243 self.save_changes(&changes).await?;
244
245 let redacted_event = self
246 .get_state_event_static::<RoomTopicEventContent>(room_id)
247 .await?
248 .expect("room topic found after redaction")
249 .deserialize()
250 .expect("can deserialize room topic after redaction");
251
252 assert_matches!(redacted_event.as_sync(), Some(SyncStateEvent::Redacted(_)));
253
254 Ok(())
255 }
256
257 async fn test_populate_store(&self) -> Result<()> {
258 let room_id = room_id();
259 let user_id = user_id();
260 let display_name = DisplayName::new("example");
261
262 self.populate().await?;
263
264 assert!(self.get_kv_data(StateStoreDataKey::SyncToken).await?.is_some());
265 assert!(self.get_presence_event(user_id).await?.is_some());
266 assert_eq!(
267 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
268 2,
269 "Expected to find 2 room infos"
270 );
271 assert!(self
272 .get_account_data_event(GlobalAccountDataEventType::PushRules)
273 .await?
274 .is_some());
275
276 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_some());
277 assert_eq!(
278 self.get_state_events(room_id, StateEventType::RoomTopic).await?.len(),
279 1,
280 "Expected to find 1 room topic"
281 );
282 assert!(self.get_profile(room_id, user_id).await?.is_some());
283 assert!(self.get_member_event(room_id, user_id).await?.is_some());
284 assert_eq!(
285 self.get_user_ids(room_id, RoomMemberships::empty()).await?.len(),
286 2,
287 "Expected to find 2 members for room"
288 );
289 assert_eq!(
290 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.len(),
291 1,
292 "Expected to find 1 invited user ids"
293 );
294 assert_eq!(
295 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.len(),
296 1,
297 "Expected to find 1 joined user ids"
298 );
299 assert_eq!(
300 self.get_users_with_display_name(room_id, &display_name).await?.len(),
301 2,
302 "Expected to find 2 display names for room"
303 );
304 assert!(self
305 .get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
306 .await?
307 .is_some());
308 assert!(self
309 .get_user_room_receipt_event(
310 room_id,
311 ReceiptType::Read,
312 ReceiptThread::Unthreaded,
313 user_id
314 )
315 .await?
316 .is_some());
317 assert_eq!(
318 self.get_event_room_receipt_events(
319 room_id,
320 ReceiptType::Read,
321 ReceiptThread::Unthreaded,
322 first_receipt_event_id()
323 )
324 .await?
325 .len(),
326 1,
327 "Expected to find 1 read receipt"
328 );
329 Ok(())
330 }
331
332 async fn test_member_saving(&self) {
333 let room_id = room_id!("!test_member_saving:localhost");
334 let user_id = user_id();
335 let second_user_id = user_id!("@second:localhost");
336 let third_user_id = user_id!("@third:localhost");
337 let unknown_user_id = user_id!("@unknown:localhost");
338
339 let mut user_ids = vec![user_id.to_owned()];
341 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_none());
342 let member_events = self
343 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
344 .await;
345 assert!(member_events.unwrap().is_empty());
346 assert!(self.get_profile(room_id, user_id).await.unwrap().is_none());
347 let profiles = self.get_profiles(room_id, &user_ids).await;
348 assert!(profiles.unwrap().is_empty());
349
350 let mut changes = StateChanges::default();
352 let raw_member_event = membership_event();
353 let profile = raw_member_event.deserialize().unwrap().into();
354 changes
355 .state
356 .entry(room_id.to_owned())
357 .or_default()
358 .entry(StateEventType::RoomMember)
359 .or_default()
360 .insert(user_id.into(), raw_member_event.cast());
361 changes.profiles.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), profile);
362 self.save_changes(&changes).await.unwrap();
363
364 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_some());
365 let member_events = self
366 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
367 .await;
368 assert_eq!(member_events.unwrap().len(), 1);
369 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
370 assert_eq!(members.len(), 1, "We expected to find members for the room");
371 assert!(self.get_profile(room_id, user_id).await.unwrap().is_some());
372 let profiles = self.get_profiles(room_id, &user_ids).await;
373 assert_eq!(profiles.unwrap().len(), 1);
374
375 let mut changes = StateChanges::default();
377 let changes_members = changes
378 .state
379 .entry(room_id.to_owned())
380 .or_default()
381 .entry(StateEventType::RoomMember)
382 .or_default();
383 let changes_profiles = changes.profiles.entry(room_id.to_owned()).or_default();
384 let raw_second_member_event =
385 custom_membership_event(second_user_id, event_id!("$second_member_event"));
386 let second_profile = raw_second_member_event.deserialize().unwrap().into();
387 changes_members.insert(second_user_id.into(), raw_second_member_event.cast());
388 changes_profiles.insert(second_user_id.to_owned(), second_profile);
389 let raw_third_member_event =
390 custom_membership_event(third_user_id, event_id!("$third_member_event"));
391 let third_profile = raw_third_member_event.deserialize().unwrap().into();
392 changes_members.insert(third_user_id.into(), raw_third_member_event.cast());
393 changes_profiles.insert(third_user_id.to_owned(), third_profile);
394 self.save_changes(&changes).await.unwrap();
395
396 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
397 assert!(self.get_member_event(room_id, second_user_id).await.unwrap().is_some());
398 assert!(self.get_member_event(room_id, third_user_id).await.unwrap().is_some());
399 let member_events = self
400 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
401 .await;
402 assert_eq!(member_events.unwrap().len(), 3);
403 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
404 assert_eq!(members.len(), 3, "We expected to find members for the room");
405 assert!(self.get_profile(room_id, second_user_id).await.unwrap().is_some());
406 assert!(self.get_profile(room_id, third_user_id).await.unwrap().is_some());
407 let profiles = self.get_profiles(room_id, &user_ids).await;
408 assert_eq!(profiles.unwrap().len(), 3);
409
410 user_ids.push(unknown_user_id.to_owned());
412 let member_events = self
413 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(room_id, &user_ids)
414 .await;
415 assert_eq!(member_events.unwrap().len(), 3);
416 let profiles = self.get_profiles(room_id, &user_ids).await;
417 assert_eq!(profiles.unwrap().len(), 3);
418
419 let member_events = self
421 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
422 room_id,
423 &[],
424 )
425 .await;
426 assert!(member_events.unwrap().is_empty());
427 let profiles = self.get_profiles(room_id, &[]).await;
428 assert!(profiles.unwrap().is_empty());
429 }
430
431 async fn test_filter_saving(&self) {
432 let filter_name = "filter_name";
433 let filter_id = "filter_id_1234";
434
435 self.set_kv_data(
436 StateStoreDataKey::Filter(filter_name),
437 StateStoreDataValue::Filter(filter_id.to_owned()),
438 )
439 .await
440 .unwrap();
441 assert_let!(
442 Ok(Some(StateStoreDataValue::Filter(stored_filter_id))) =
443 self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await
444 );
445 assert_eq!(stored_filter_id, filter_id);
446
447 self.remove_kv_data(StateStoreDataKey::Filter(filter_name)).await.unwrap();
448 assert_matches!(self.get_kv_data(StateStoreDataKey::Filter(filter_name)).await, Ok(None));
449 }
450
451 async fn test_user_avatar_url_saving(&self) {
452 let user_id = user_id!("@alice:example.org");
453 let url = owned_mxc_uri!("mxc://example.org/poiuyt098");
454
455 self.set_kv_data(
456 StateStoreDataKey::UserAvatarUrl(user_id),
457 StateStoreDataValue::UserAvatarUrl(url.clone()),
458 )
459 .await
460 .unwrap();
461
462 assert_let!(
463 Ok(Some(StateStoreDataValue::UserAvatarUrl(stored_url))) =
464 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await
465 );
466 assert_eq!(stored_url, url);
467
468 self.remove_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await.unwrap();
469 assert_matches!(
470 self.get_kv_data(StateStoreDataKey::UserAvatarUrl(user_id)).await,
471 Ok(None)
472 );
473 }
474
475 async fn test_server_capabilities_saving(&self) {
476 let versions = &[MatrixVersion::V1_1, MatrixVersion::V1_2, MatrixVersion::V1_11];
477 let server_caps = ServerCapabilities::new(
478 versions,
479 [("org.matrix.experimental".to_owned(), true)].into(),
480 );
481
482 self.set_kv_data(
483 StateStoreDataKey::ServerCapabilities,
484 StateStoreDataValue::ServerCapabilities(server_caps.clone()),
485 )
486 .await
487 .unwrap();
488
489 assert_let!(
490 Ok(Some(StateStoreDataValue::ServerCapabilities(stored_caps))) =
491 self.get_kv_data(StateStoreDataKey::ServerCapabilities).await
492 );
493 assert_eq!(stored_caps, server_caps);
494
495 let (stored_versions, stored_features) = stored_caps.maybe_decode().unwrap();
496
497 assert_eq!(stored_versions, versions);
498 assert_eq!(stored_features.len(), 1);
499 assert_eq!(stored_features.get("org.matrix.experimental"), Some(&true));
500
501 self.remove_kv_data(StateStoreDataKey::ServerCapabilities).await.unwrap();
502 assert_matches!(self.get_kv_data(StateStoreDataKey::ServerCapabilities).await, Ok(None));
503 }
504
505 async fn test_sync_token_saving(&self) {
506 let sync_token_1 = "t392-516_47314_0_7_1";
507 let sync_token_2 = "t392-516_47314_0_7_2";
508
509 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
510
511 let changes =
512 StateChanges { sync_token: Some(sync_token_1.to_owned()), ..Default::default() };
513 self.save_changes(&changes).await.unwrap();
514 assert_let!(
515 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
516 self.get_kv_data(StateStoreDataKey::SyncToken).await
517 );
518 assert_eq!(stored_sync_token, sync_token_1);
519
520 self.set_kv_data(
521 StateStoreDataKey::SyncToken,
522 StateStoreDataValue::SyncToken(sync_token_2.to_owned()),
523 )
524 .await
525 .unwrap();
526 assert_let!(
527 Ok(Some(StateStoreDataValue::SyncToken(stored_sync_token))) =
528 self.get_kv_data(StateStoreDataKey::SyncToken).await
529 );
530 assert_eq!(stored_sync_token, sync_token_2);
531
532 self.remove_kv_data(StateStoreDataKey::SyncToken).await.unwrap();
533 assert_matches!(self.get_kv_data(StateStoreDataKey::SyncToken).await, Ok(None));
534 }
535
536 async fn test_utd_hook_manager_data_saving(&self) {
537 assert!(
539 self.get_kv_data(StateStoreDataKey::UtdHookManagerData)
540 .await
541 .expect("Could not read data")
542 .is_none(),
543 "Store was not empty at start"
544 );
545
546 let data = GrowableBloomBuilder::new().build();
548 self.set_kv_data(
549 StateStoreDataKey::UtdHookManagerData,
550 StateStoreDataValue::UtdHookManagerData(data.clone()),
551 )
552 .await
553 .expect("Could not save data");
554
555 let read_data = self
557 .get_kv_data(StateStoreDataKey::UtdHookManagerData)
558 .await
559 .expect("Could not read data")
560 .expect("no data found")
561 .into_utd_hook_manager_data()
562 .expect("not UtdHookManagerData");
563
564 assert_eq!(read_data, data);
565 }
566
567 async fn test_stripped_member_saving(&self) {
568 let room_id = room_id!("!test_stripped_member_saving:localhost");
569 let user_id = user_id();
570 let second_user_id = user_id!("@second:localhost");
571 let third_user_id = user_id!("@third:localhost");
572 let unknown_user_id = user_id!("@unknown:localhost");
573
574 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_none());
576 let member_events = self
577 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
578 room_id,
579 &[user_id.to_owned()],
580 )
581 .await;
582 assert!(member_events.unwrap().is_empty());
583
584 let mut changes = StateChanges::default();
586 changes
587 .stripped_state
588 .entry(room_id.to_owned())
589 .or_default()
590 .entry(StateEventType::RoomMember)
591 .or_default()
592 .insert(user_id.into(), stripped_membership_event().cast());
593 self.save_changes(&changes).await.unwrap();
594
595 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_some());
596 let member_events = self
597 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
598 room_id,
599 &[user_id.to_owned()],
600 )
601 .await;
602 assert_eq!(member_events.unwrap().len(), 1);
603 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
604 assert_eq!(members.len(), 1, "We expected to find members for the room");
605
606 let mut changes = StateChanges::default();
608 let changes_members = changes
609 .stripped_state
610 .entry(room_id.to_owned())
611 .or_default()
612 .entry(StateEventType::RoomMember)
613 .or_default();
614 changes_members
615 .insert(second_user_id.into(), custom_stripped_membership_event(second_user_id).cast());
616 changes_members
617 .insert(third_user_id.into(), custom_stripped_membership_event(third_user_id).cast());
618 self.save_changes(&changes).await.unwrap();
619
620 assert!(self.get_member_event(room_id, second_user_id).await.unwrap().is_some());
621 assert!(self.get_member_event(room_id, third_user_id).await.unwrap().is_some());
622 let member_events = self
623 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
624 room_id,
625 &[user_id.to_owned(), second_user_id.to_owned(), third_user_id.to_owned()],
626 )
627 .await;
628 assert_eq!(member_events.unwrap().len(), 3);
629 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
630 assert_eq!(members.len(), 3, "We expected to find members for the room");
631
632 let member_events = self
634 .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
635 room_id,
636 &[
637 user_id.to_owned(),
638 second_user_id.to_owned(),
639 third_user_id.to_owned(),
640 unknown_user_id.to_owned(),
641 ],
642 )
643 .await;
644 assert_eq!(member_events.unwrap().len(), 3);
645
646 let member_events = self
648 .get_state_events_for_keys_static::<RoomMemberEventContent, OwnedUserId, _>(
649 room_id,
650 &[],
651 )
652 .await;
653 assert!(member_events.unwrap().is_empty());
654 }
655
656 async fn test_power_level_saving(&self) {
657 let room_id = room_id!("!test_power_level_saving:localhost");
658
659 let raw_event = power_level_event();
660 let event = raw_event.deserialize().unwrap();
661
662 assert!(self
663 .get_state_event(room_id, StateEventType::RoomPowerLevels, "")
664 .await
665 .unwrap()
666 .is_none());
667 let mut changes = StateChanges::default();
668 changes.add_state_event(room_id, event, raw_event);
669
670 self.save_changes(&changes).await.unwrap();
671 assert!(self
672 .get_state_event(room_id, StateEventType::RoomPowerLevels, "")
673 .await
674 .unwrap()
675 .is_some());
676 }
677
678 async fn test_receipts_saving(&self) {
679 let room_id = room_id!("!test_receipts_saving:localhost");
680
681 let first_event_id = event_id!("$1435641916114394fHBLK:matrix.org");
682 let second_event_id = event_id!("$fHBLK1435641916114394:matrix.org");
683
684 let first_receipt_ts = uint!(1436451550);
685 let second_receipt_ts = uint!(1436451653);
686 let third_receipt_ts = uint!(1436474532);
687
688 let first_receipt_event = serde_json::from_value(json!({
689 first_event_id: {
690 "m.read": {
691 user_id(): {
692 "ts": first_receipt_ts,
693 }
694 }
695 }
696 }))
697 .expect("json creation failed");
698
699 let second_receipt_event = serde_json::from_value(json!({
700 second_event_id: {
701 "m.read": {
702 user_id(): {
703 "ts": second_receipt_ts,
704 }
705 }
706 }
707 }))
708 .expect("json creation failed");
709
710 let third_receipt_event = serde_json::from_value(json!({
711 second_event_id: {
712 "m.read": {
713 user_id(): {
714 "ts": third_receipt_ts,
715 "thread_id": "main",
716 }
717 }
718 }
719 }))
720 .expect("json creation failed");
721
722 assert!(self
723 .get_user_room_receipt_event(
724 room_id,
725 ReceiptType::Read,
726 ReceiptThread::Unthreaded,
727 user_id()
728 )
729 .await
730 .expect("failed to read unthreaded user room receipt")
731 .is_none());
732 assert!(self
733 .get_event_room_receipt_events(
734 room_id,
735 ReceiptType::Read,
736 ReceiptThread::Unthreaded,
737 first_event_id
738 )
739 .await
740 .expect("failed to read unthreaded event room receipt for 1")
741 .is_empty());
742 assert!(self
743 .get_event_room_receipt_events(
744 room_id,
745 ReceiptType::Read,
746 ReceiptThread::Unthreaded,
747 second_event_id
748 )
749 .await
750 .expect("failed to read unthreaded event room receipt for 2")
751 .is_empty());
752
753 let mut changes = StateChanges::default();
754 changes.add_receipts(room_id, first_receipt_event);
755
756 self.save_changes(&changes).await.expect("writing changes fauked");
757 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
758 .get_user_room_receipt_event(
759 room_id,
760 ReceiptType::Read,
761 ReceiptThread::Unthreaded,
762 user_id(),
763 )
764 .await
765 .expect("failed to read unthreaded user room receipt after save")
766 .unwrap();
767 assert_eq!(unthreaded_user_receipt_event_id, first_event_id);
768 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, first_receipt_ts);
769 let first_event_unthreaded_receipts = self
770 .get_event_room_receipt_events(
771 room_id,
772 ReceiptType::Read,
773 ReceiptThread::Unthreaded,
774 first_event_id,
775 )
776 .await
777 .expect("failed to read unthreaded event room receipt for 1 after save");
778 assert_eq!(
779 first_event_unthreaded_receipts.len(),
780 1,
781 "Found a wrong number of unthreaded receipts for 1 after save"
782 );
783 assert_eq!(first_event_unthreaded_receipts[0].0, user_id());
784 assert_eq!(first_event_unthreaded_receipts[0].1.ts.unwrap().0, first_receipt_ts);
785 assert!(self
786 .get_event_room_receipt_events(
787 room_id,
788 ReceiptType::Read,
789 ReceiptThread::Unthreaded,
790 second_event_id
791 )
792 .await
793 .expect("failed to read unthreaded event room receipt for 2 after save")
794 .is_empty());
795
796 let mut changes = StateChanges::default();
797 changes.add_receipts(room_id, second_receipt_event);
798
799 self.save_changes(&changes).await.expect("Saving works");
800 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
801 .get_user_room_receipt_event(
802 room_id,
803 ReceiptType::Read,
804 ReceiptThread::Unthreaded,
805 user_id(),
806 )
807 .await
808 .expect("Getting unthreaded user room receipt after save failed")
809 .unwrap();
810 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
811 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
812 assert!(self
813 .get_event_room_receipt_events(
814 room_id,
815 ReceiptType::Read,
816 ReceiptThread::Unthreaded,
817 first_event_id
818 )
819 .await
820 .expect("Getting unthreaded event room receipt events for first event failed")
821 .is_empty());
822 let second_event_unthreaded_receipts = self
823 .get_event_room_receipt_events(
824 room_id,
825 ReceiptType::Read,
826 ReceiptThread::Unthreaded,
827 second_event_id,
828 )
829 .await
830 .expect("Getting unthreaded event room receipt events for second event failed");
831 assert_eq!(
832 second_event_unthreaded_receipts.len(),
833 1,
834 "Found a wrong number of unthreaded receipts for second event after save"
835 );
836 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
837 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
838
839 assert!(self
840 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
841 .await
842 .expect("failed to read threaded user room receipt")
843 .is_none());
844 assert!(self
845 .get_event_room_receipt_events(
846 room_id,
847 ReceiptType::Read,
848 ReceiptThread::Main,
849 second_event_id
850 )
851 .await
852 .expect("Getting threaded event room receipts for 2 failed")
853 .is_empty());
854
855 let mut changes = StateChanges::default();
856 changes.add_receipts(room_id, third_receipt_event);
857
858 self.save_changes(&changes).await.expect("Saving works");
859 let (unthreaded_user_receipt_event_id, unthreaded_user_receipt) = self
861 .get_user_room_receipt_event(
862 room_id,
863 ReceiptType::Read,
864 ReceiptThread::Unthreaded,
865 user_id(),
866 )
867 .await
868 .expect("Getting unthreaded user room receipt after save failed")
869 .unwrap();
870 assert_eq!(unthreaded_user_receipt_event_id, second_event_id);
871 assert_eq!(unthreaded_user_receipt.ts.unwrap().0, second_receipt_ts);
872 let second_event_unthreaded_receipts = self
873 .get_event_room_receipt_events(
874 room_id,
875 ReceiptType::Read,
876 ReceiptThread::Unthreaded,
877 second_event_id,
878 )
879 .await
880 .expect("Getting unthreaded event room receipt events for second event failed");
881 assert_eq!(
882 second_event_unthreaded_receipts.len(),
883 1,
884 "Found a wrong number of unthreaded receipts for second event after save"
885 );
886 assert_eq!(second_event_unthreaded_receipts[0].0, user_id());
887 assert_eq!(second_event_unthreaded_receipts[0].1.ts.unwrap().0, second_receipt_ts);
888 let (threaded_user_receipt_event_id, threaded_user_receipt) = self
890 .get_user_room_receipt_event(room_id, ReceiptType::Read, ReceiptThread::Main, user_id())
891 .await
892 .expect("Getting threaded user room receipt after save failed")
893 .unwrap();
894 assert_eq!(threaded_user_receipt_event_id, second_event_id);
895 assert_eq!(threaded_user_receipt.ts.unwrap().0, third_receipt_ts);
896 let second_event_threaded_receipts = self
897 .get_event_room_receipt_events(
898 room_id,
899 ReceiptType::Read,
900 ReceiptThread::Main,
901 second_event_id,
902 )
903 .await
904 .expect("Getting threaded event room receipt events for second event failed");
905 assert_eq!(
906 second_event_threaded_receipts.len(),
907 1,
908 "Found a wrong number of threaded receipts for second event after save"
909 );
910 assert_eq!(second_event_threaded_receipts[0].0, user_id());
911 assert_eq!(second_event_threaded_receipts[0].1.ts.unwrap().0, third_receipt_ts);
912 }
913
914 async fn test_custom_storage(&self) -> Result<()> {
915 let key = "my_key";
916 let value = &[0, 1, 2, 3];
917
918 self.set_custom_value(key.as_bytes(), value.to_vec()).await?;
919
920 let read = self.get_custom_value(key.as_bytes()).await?;
921
922 assert_eq!(Some(value.as_ref()), read.as_deref());
923
924 Ok(())
925 }
926
927 async fn test_stripped_non_stripped(&self) -> Result<()> {
928 let room_id = room_id!("!test_stripped_non_stripped:localhost");
929 let user_id = user_id();
930
931 assert!(self.get_member_event(room_id, user_id).await.unwrap().is_none());
932 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 0);
933
934 let mut changes = StateChanges::default();
935 changes
936 .state
937 .entry(room_id.to_owned())
938 .or_default()
939 .entry(StateEventType::RoomMember)
940 .or_default()
941 .insert(user_id.into(), membership_event().cast());
942 changes.add_room(RoomInfo::new(room_id, RoomState::Left));
943 self.save_changes(&changes).await.unwrap();
944
945 let member_event =
946 self.get_member_event(room_id, user_id).await.unwrap().unwrap().deserialize().unwrap();
947 assert!(matches!(member_event, MemberEvent::Sync(_)));
948 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 1);
949
950 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
951 assert_eq!(members, vec![user_id.to_owned()]);
952
953 let mut changes = StateChanges::default();
954 changes.add_stripped_member(room_id, user_id, custom_stripped_membership_event(user_id));
955 changes.add_room(RoomInfo::new(room_id, RoomState::Invited));
956 self.save_changes(&changes).await.unwrap();
957
958 let member_event =
959 self.get_member_event(room_id, user_id).await.unwrap().unwrap().deserialize().unwrap();
960 assert!(matches!(member_event, MemberEvent::Stripped(_)));
961 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 1);
962
963 let members = self.get_user_ids(room_id, RoomMemberships::empty()).await.unwrap();
964 assert_eq!(members, vec![user_id.to_owned()]);
965
966 Ok(())
967 }
968
969 async fn test_room_removal(&self) -> Result<()> {
970 let room_id = room_id();
971 let user_id = user_id();
972 let display_name = DisplayName::new("example");
973 let stripped_room_id = stripped_room_id();
974
975 self.populate().await?;
976
977 {
978 let txn = TransactionId::new();
980 let ev =
981 SerializableEventContent::new(&RoomMessageEventContent::text_plain("sup").into())
982 .unwrap();
983 self.save_send_queue_request(
984 room_id,
985 txn.clone(),
986 MilliSecondsSinceUnixEpoch::now(),
987 ev.into(),
988 0,
989 )
990 .await?;
991
992 self.save_dependent_queued_request(
994 room_id,
995 &txn,
996 ChildTransactionId::new(),
997 MilliSecondsSinceUnixEpoch::now(),
998 DependentQueuedRequestKind::RedactEvent,
999 )
1000 .await?;
1001 }
1002
1003 self.remove_room(room_id).await?;
1004
1005 assert_eq!(
1006 self.get_room_infos(&RoomLoadSettings::default()).await?.len(),
1007 1,
1008 "room is still there"
1009 );
1010
1011 assert!(self.get_state_event(room_id, StateEventType::RoomName, "").await?.is_none());
1012 assert!(
1013 self.get_state_events(room_id, StateEventType::RoomTopic).await?.is_empty(),
1014 "still state events found"
1015 );
1016 assert!(self.get_profile(room_id, user_id).await?.is_none());
1017 assert!(self.get_member_event(room_id, user_id).await?.is_none());
1018 assert!(
1019 self.get_user_ids(room_id, RoomMemberships::empty()).await?.is_empty(),
1020 "still user ids found"
1021 );
1022 assert!(
1023 self.get_user_ids(room_id, RoomMemberships::INVITE).await?.is_empty(),
1024 "still invited user ids found"
1025 );
1026 assert!(
1027 self.get_user_ids(room_id, RoomMemberships::JOIN).await?.is_empty(),
1028 "still joined users found"
1029 );
1030 assert!(
1031 self.get_users_with_display_name(room_id, &display_name).await?.is_empty(),
1032 "still display names found"
1033 );
1034 assert!(self
1035 .get_room_account_data_event(room_id, RoomAccountDataEventType::Tag)
1036 .await?
1037 .is_none());
1038 assert!(self
1039 .get_user_room_receipt_event(
1040 room_id,
1041 ReceiptType::Read,
1042 ReceiptThread::Unthreaded,
1043 user_id
1044 )
1045 .await?
1046 .is_none());
1047 assert!(
1048 self.get_event_room_receipt_events(
1049 room_id,
1050 ReceiptType::Read,
1051 ReceiptThread::Unthreaded,
1052 first_receipt_event_id()
1053 )
1054 .await?
1055 .is_empty(),
1056 "still event recepts in the store"
1057 );
1058 assert!(self.load_send_queue_requests(room_id).await?.is_empty());
1059 assert!(self.load_dependent_queued_requests(room_id).await?.is_empty());
1060
1061 self.remove_room(stripped_room_id).await?;
1062
1063 assert!(
1064 self.get_room_infos(&RoomLoadSettings::default()).await?.is_empty(),
1065 "still room info found"
1066 );
1067 Ok(())
1068 }
1069
1070 async fn test_profile_removal(&self) -> Result<()> {
1071 let room_id = room_id();
1072
1073 let user_id = user_id();
1075 let invited_user_id = invited_user_id();
1076
1077 self.populate().await?;
1078
1079 let new_invite_member_json = json!({
1080 "content": {
1081 "avatar_url": "mxc://localhost/SEsfnsuifSDFSSEG",
1082 "displayname": "example after update",
1083 "membership": "invite",
1084 "reason": "Looking for support"
1085 },
1086 "event_id": "$143273582443PhrSm:localhost",
1087 "origin_server_ts": 1432735824,
1088 "room_id": room_id,
1089 "sender": user_id,
1090 "state_key": invited_user_id,
1091 "type": "m.room.member",
1092 });
1093 let new_invite_member_event: SyncRoomMemberEvent =
1094 serde_json::from_value(new_invite_member_json.clone()).unwrap();
1095
1096 let mut changes = StateChanges {
1097 profiles_to_delete: [(
1099 room_id.to_owned(),
1100 vec![user_id.to_owned(), invited_user_id.to_owned()],
1101 )]
1102 .into(),
1103
1104 profiles: {
1106 let mut map = BTreeMap::default();
1107 map.insert(
1108 room_id.to_owned(),
1109 [(invited_user_id.to_owned(), new_invite_member_event.into())]
1110 .into_iter()
1111 .collect(),
1112 );
1113 map
1114 },
1115
1116 ..StateChanges::default()
1117 };
1118
1119 let raw = serde_json::from_value::<Raw<AnySyncStateEvent>>(new_invite_member_json)
1120 .expect("can create sync-state-event for topic");
1121 let event = raw.deserialize().unwrap();
1122 changes.add_state_event(room_id, event, raw);
1123
1124 self.save_changes(&changes).await.unwrap();
1125
1126 assert!(self.get_profile(room_id, user_id).await?.is_none());
1128 assert!(self.get_member_event(room_id, user_id).await?.is_some());
1129
1130 let invited_member_event = self.get_profile(room_id, invited_user_id).await?.unwrap();
1132 assert_eq!(
1133 invited_member_event.as_original().unwrap().content.displayname.as_deref(),
1134 Some("example after update")
1135 );
1136 assert!(self.get_member_event(room_id, invited_user_id).await?.is_some());
1137
1138 Ok(())
1139 }
1140
1141 async fn test_presence_saving(&self) {
1142 let user_id = user_id();
1143 let second_user_id = user_id!("@second:localhost");
1144 let third_user_id = user_id!("@third:localhost");
1145 let unknown_user_id = user_id!("@unknown:localhost");
1146
1147 let mut user_ids = vec![user_id.to_owned()];
1149 let presence_event = self.get_presence_event(user_id).await;
1150 assert!(presence_event.unwrap().is_none());
1151 let presence_events = self.get_presence_events(&user_ids).await;
1152 assert!(presence_events.unwrap().is_empty());
1153
1154 let mut changes = StateChanges::default();
1156 changes.presence.insert(user_id.to_owned(), custom_presence_event(user_id));
1157 self.save_changes(&changes).await.unwrap();
1158
1159 let presence_event = self.get_presence_event(user_id).await;
1160 assert!(presence_event.unwrap().is_some());
1161 let presence_events = self.get_presence_events(&user_ids).await;
1162 assert_eq!(presence_events.unwrap().len(), 1);
1163
1164 let mut changes = StateChanges::default();
1166 changes.presence.insert(second_user_id.to_owned(), custom_presence_event(second_user_id));
1167 changes.presence.insert(third_user_id.to_owned(), custom_presence_event(third_user_id));
1168 self.save_changes(&changes).await.unwrap();
1169
1170 user_ids.extend([second_user_id.to_owned(), third_user_id.to_owned()]);
1171 let presence_event = self.get_presence_event(second_user_id).await;
1172 assert!(presence_event.unwrap().is_some());
1173 let presence_event = self.get_presence_event(third_user_id).await;
1174 assert!(presence_event.unwrap().is_some());
1175 let presence_events = self.get_presence_events(&user_ids).await;
1176 assert_eq!(presence_events.unwrap().len(), 3);
1177
1178 user_ids.push(unknown_user_id.to_owned());
1180 let member_events = self.get_presence_events(&user_ids).await;
1181 assert_eq!(member_events.unwrap().len(), 3);
1182
1183 let presence_events = self.get_presence_events(&[]).await;
1185 assert!(presence_events.unwrap().is_empty());
1186 }
1187
1188 async fn test_display_names_saving(&self) {
1189 let room_id = room_id!("!test_display_names_saving:localhost");
1190 let user_id = user_id();
1191 let user_display_name = DisplayName::new("User");
1192 let second_user_id = user_id!("@second:localhost");
1193 let third_user_id = user_id!("@third:localhost");
1194 let other_display_name = DisplayName::new("Raoul");
1195 let unknown_display_name = DisplayName::new("Unknown");
1196
1197 let mut display_names = vec![user_display_name.to_owned()];
1199 let users = self.get_users_with_display_name(room_id, &user_display_name).await.unwrap();
1200 assert!(users.is_empty());
1201 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1202 assert!(names.is_empty());
1203
1204 let mut changes = StateChanges::default();
1206 changes
1207 .ambiguity_maps
1208 .entry(room_id.to_owned())
1209 .or_default()
1210 .insert(user_display_name.to_owned(), [user_id.to_owned()].into());
1211 self.save_changes(&changes).await.unwrap();
1212
1213 let users = self.get_users_with_display_name(room_id, &user_display_name).await.unwrap();
1214 assert_eq!(users.len(), 1);
1215 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1216 assert_eq!(names.len(), 1);
1217 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1218
1219 let mut changes = StateChanges::default();
1221 changes.ambiguity_maps.entry(room_id.to_owned()).or_default().insert(
1222 other_display_name.to_owned(),
1223 [second_user_id.to_owned(), third_user_id.to_owned()].into(),
1224 );
1225 self.save_changes(&changes).await.unwrap();
1226
1227 display_names.push(other_display_name.to_owned());
1228 let users = self.get_users_with_display_name(room_id, &user_display_name).await.unwrap();
1229 assert_eq!(users.len(), 1);
1230 let users = self.get_users_with_display_name(room_id, &other_display_name).await.unwrap();
1231 assert_eq!(users.len(), 2);
1232 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1233 assert_eq!(names.len(), 2);
1234 assert_eq!(names.get(&user_display_name).unwrap().len(), 1);
1235 assert_eq!(names.get(&other_display_name).unwrap().len(), 2);
1236
1237 display_names.push(unknown_display_name.to_owned());
1239 let names = self.get_users_with_display_names(room_id, &display_names).await.unwrap();
1240 assert_eq!(names.len(), 2);
1241
1242 let names = self.get_users_with_display_names(room_id, &[]).await;
1244 assert!(names.unwrap().is_empty());
1245 }
1246
1247 #[allow(clippy::needless_range_loop)]
1248 async fn test_send_queue(&self) {
1249 let room_id = room_id!("!test_send_queue:localhost");
1250
1251 let events = self.load_send_queue_requests(room_id).await.unwrap();
1253 assert!(events.is_empty());
1254
1255 let txn0 = TransactionId::new();
1257 let event0 =
1258 SerializableEventContent::new(&RoomMessageEventContent::text_plain("msg0").into())
1259 .unwrap();
1260 self.save_send_queue_request(
1261 room_id,
1262 txn0.clone(),
1263 MilliSecondsSinceUnixEpoch::now(),
1264 event0.into(),
1265 0,
1266 )
1267 .await
1268 .unwrap();
1269
1270 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1272
1273 assert_eq!(pending.len(), 1);
1274 {
1275 assert_eq!(pending[0].transaction_id, txn0);
1276
1277 let deserialized = pending[0].as_event().unwrap().deserialize().unwrap();
1278 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1279 assert_eq!(content.body(), "msg0");
1280
1281 assert!(!pending[0].is_wedged());
1282 }
1283
1284 for i in 1..=3 {
1286 let txn = TransactionId::new();
1287 let event = SerializableEventContent::new(
1288 &RoomMessageEventContent::text_plain(format!("msg{i}")).into(),
1289 )
1290 .unwrap();
1291
1292 self.save_send_queue_request(
1293 room_id,
1294 txn,
1295 MilliSecondsSinceUnixEpoch::now(),
1296 event.into(),
1297 0,
1298 )
1299 .await
1300 .unwrap();
1301 }
1302
1303 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1305
1306 assert_eq!(pending.len(), 4);
1308
1309 assert_eq!(pending[0].transaction_id, txn0);
1310
1311 for i in 0..4 {
1312 let deserialized = pending[i].as_event().unwrap().deserialize().unwrap();
1313 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1314 assert_eq!(content.body(), format!("msg{i}"));
1315 assert!(!pending[i].is_wedged());
1316 }
1317
1318 let txn2 = &pending[2].transaction_id;
1320 self.update_send_queue_request_status(
1321 room_id,
1322 txn2,
1323 Some(QueueWedgeError::GenericApiError { msg: "Oops".to_owned() }),
1324 )
1325 .await
1326 .unwrap();
1327
1328 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1330
1331 assert_eq!(pending.len(), 4);
1333 assert_eq!(pending[0].transaction_id, txn0);
1334 assert_eq!(pending[2].transaction_id, *txn2);
1335 assert!(pending[2].is_wedged());
1336 let error = pending[2].clone().error.unwrap();
1337 let generic_error = assert_matches!(error, QueueWedgeError::GenericApiError { msg } => msg);
1338 assert_eq!(generic_error, "Oops");
1339 for i in 0..4 {
1340 if i != 2 {
1341 assert!(!pending[i].is_wedged());
1342 }
1343 }
1344
1345 let event0 = SerializableEventContent::new(
1347 &RoomMessageEventContent::text_plain("wow that's a cool test").into(),
1348 )
1349 .unwrap();
1350 self.update_send_queue_request(room_id, txn2, event0.into()).await.unwrap();
1351
1352 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1354
1355 assert_eq!(pending.len(), 4);
1356 {
1357 assert_eq!(pending[2].transaction_id, *txn2);
1358
1359 let deserialized = pending[2].as_event().unwrap().deserialize().unwrap();
1360 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1361 assert_eq!(content.body(), "wow that's a cool test");
1362
1363 assert!(!pending[2].is_wedged());
1364
1365 for i in 0..4 {
1366 if i != 2 {
1367 let deserialized = pending[i].as_event().unwrap().deserialize().unwrap();
1368 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1369 assert_eq!(content.body(), format!("msg{i}"));
1370
1371 assert!(!pending[i].is_wedged());
1372 }
1373 }
1374 }
1375
1376 self.remove_send_queue_request(room_id, &txn0).await.unwrap();
1378
1379 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1381
1382 assert_eq!(pending.len(), 3);
1383 assert_eq!(pending[1].transaction_id, *txn2);
1384 for i in 0..3 {
1385 assert_ne!(pending[i].transaction_id, txn0);
1386 }
1387
1388 let room_id2 = room_id!("!test_send_queue_two:localhost");
1393 {
1394 let txn = TransactionId::new();
1395 let event =
1396 SerializableEventContent::new(&RoomMessageEventContent::text_plain("room2").into())
1397 .unwrap();
1398 self.save_send_queue_request(
1399 room_id2,
1400 txn.clone(),
1401 MilliSecondsSinceUnixEpoch::now(),
1402 event.into(),
1403 0,
1404 )
1405 .await
1406 .unwrap();
1407 }
1408
1409 {
1411 let room_id3 = room_id!("!test_send_queue_three:localhost");
1412 let txn = TransactionId::new();
1413 let event =
1414 SerializableEventContent::new(&RoomMessageEventContent::text_plain("room3").into())
1415 .unwrap();
1416 self.save_send_queue_request(
1417 room_id3,
1418 txn.clone(),
1419 MilliSecondsSinceUnixEpoch::now(),
1420 event.into(),
1421 0,
1422 )
1423 .await
1424 .unwrap();
1425
1426 self.remove_send_queue_request(room_id3, &txn).await.unwrap();
1427 }
1428
1429 let outstanding_rooms = self.load_rooms_with_unsent_requests().await.unwrap();
1432 assert_eq!(outstanding_rooms.len(), 2);
1433 assert!(outstanding_rooms.iter().any(|room| room == room_id));
1434 assert!(outstanding_rooms.iter().any(|room| room == room_id2));
1435 }
1436
1437 async fn test_send_queue_priority(&self) {
1438 let room_id = room_id!("!test_send_queue:localhost");
1439
1440 let events = self.load_send_queue_requests(room_id).await.unwrap();
1442 assert!(events.is_empty());
1443
1444 let low0_txn = TransactionId::new();
1446 let ev0 =
1447 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low0").into())
1448 .unwrap();
1449 self.save_send_queue_request(
1450 room_id,
1451 low0_txn.clone(),
1452 MilliSecondsSinceUnixEpoch::now(),
1453 ev0.into(),
1454 2,
1455 )
1456 .await
1457 .unwrap();
1458
1459 let high_txn = TransactionId::new();
1461 let ev1 =
1462 SerializableEventContent::new(&RoomMessageEventContent::text_plain("high").into())
1463 .unwrap();
1464 self.save_send_queue_request(
1465 room_id,
1466 high_txn.clone(),
1467 MilliSecondsSinceUnixEpoch::now(),
1468 ev1.into(),
1469 10,
1470 )
1471 .await
1472 .unwrap();
1473
1474 let low1_txn = TransactionId::new();
1476 let ev2 =
1477 SerializableEventContent::new(&RoomMessageEventContent::text_plain("low1").into())
1478 .unwrap();
1479 self.save_send_queue_request(
1480 room_id,
1481 low1_txn.clone(),
1482 MilliSecondsSinceUnixEpoch::now(),
1483 ev2.into(),
1484 2,
1485 )
1486 .await
1487 .unwrap();
1488
1489 let pending = self.load_send_queue_requests(room_id).await.unwrap();
1492
1493 assert_eq!(pending.len(), 3);
1494 {
1495 assert_eq!(pending[0].transaction_id, high_txn);
1496
1497 let deserialized = pending[0].as_event().unwrap().deserialize().unwrap();
1498 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1499 assert_eq!(content.body(), "high");
1500 }
1501
1502 {
1503 assert_eq!(pending[1].transaction_id, low0_txn);
1504
1505 let deserialized = pending[1].as_event().unwrap().deserialize().unwrap();
1506 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1507 assert_eq!(content.body(), "low0");
1508 }
1509
1510 {
1511 assert_eq!(pending[2].transaction_id, low1_txn);
1512
1513 let deserialized = pending[2].as_event().unwrap().deserialize().unwrap();
1514 assert_let!(AnyMessageLikeEventContent::RoomMessage(content) = deserialized);
1515 assert_eq!(content.body(), "low1");
1516 }
1517 }
1518
1519 async fn test_send_queue_dependents(&self) {
1520 let room_id = room_id!("!test_send_queue_dependents:localhost");
1521
1522 let txn0 = TransactionId::new();
1524 let event0 =
1525 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey").into())
1526 .unwrap();
1527 self.save_send_queue_request(
1528 room_id,
1529 txn0.clone(),
1530 MilliSecondsSinceUnixEpoch::now(),
1531 event0.into(),
1532 0,
1533 )
1534 .await
1535 .unwrap();
1536
1537 assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
1539
1540 let child_txn = ChildTransactionId::new();
1542 self.save_dependent_queued_request(
1543 room_id,
1544 &txn0,
1545 child_txn.clone(),
1546 MilliSecondsSinceUnixEpoch::now(),
1547 DependentQueuedRequestKind::RedactEvent,
1548 )
1549 .await
1550 .unwrap();
1551
1552 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1554 assert_eq!(dependents.len(), 1);
1555 assert_eq!(dependents[0].parent_transaction_id, txn0);
1556 assert_eq!(dependents[0].own_transaction_id, child_txn);
1557 assert!(dependents[0].parent_key.is_none());
1558 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1559
1560 let event_id = owned_event_id!("$1");
1562 let num_updated = self
1563 .mark_dependent_queued_requests_as_ready(
1564 room_id,
1565 &txn0,
1566 SentRequestKey::Event(event_id.clone()),
1567 )
1568 .await
1569 .unwrap();
1570 assert_eq!(num_updated, 1);
1571
1572 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1574 assert_eq!(dependents.len(), 1);
1575 assert_eq!(dependents[0].parent_transaction_id, txn0);
1576 assert_eq!(dependents[0].own_transaction_id, child_txn);
1577 assert_matches!(dependents[0].parent_key.as_ref(), Some(SentRequestKey::Event(eid)) => {
1578 assert_eq!(*eid, event_id);
1579 });
1580 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1581
1582 let removed = self
1584 .remove_dependent_queued_request(room_id, &dependents[0].own_transaction_id)
1585 .await
1586 .unwrap();
1587 assert!(removed);
1588
1589 assert!(self.load_dependent_queued_requests(room_id).await.unwrap().is_empty());
1591
1592 let txn1 = TransactionId::new();
1595 let event1 =
1596 SerializableEventContent::new(&RoomMessageEventContent::text_plain("hey2").into())
1597 .unwrap();
1598 self.save_send_queue_request(
1599 room_id,
1600 txn1.clone(),
1601 MilliSecondsSinceUnixEpoch::now(),
1602 event1.into(),
1603 0,
1604 )
1605 .await
1606 .unwrap();
1607
1608 self.save_dependent_queued_request(
1609 room_id,
1610 &txn0,
1611 ChildTransactionId::new(),
1612 MilliSecondsSinceUnixEpoch::now(),
1613 DependentQueuedRequestKind::RedactEvent,
1614 )
1615 .await
1616 .unwrap();
1617 assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 1);
1618
1619 self.save_dependent_queued_request(
1620 room_id,
1621 &txn1,
1622 ChildTransactionId::new(),
1623 MilliSecondsSinceUnixEpoch::now(),
1624 DependentQueuedRequestKind::EditEvent {
1625 new_content: SerializableEventContent::new(
1626 &RoomMessageEventContent::text_plain("edit").into(),
1627 )
1628 .unwrap(),
1629 },
1630 )
1631 .await
1632 .unwrap();
1633 assert_eq!(self.load_dependent_queued_requests(room_id).await.unwrap().len(), 2);
1634
1635 let removed = self.remove_send_queue_request(room_id, &txn0).await.unwrap();
1637 assert!(removed);
1638
1639 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1641 assert_eq!(dependents.len(), 2);
1642 }
1643
1644 async fn test_update_send_queue_dependent(&self) {
1645 let room_id = room_id!("!test_send_queue_dependents:localhost");
1646
1647 let txn = TransactionId::new();
1648
1649 let child_txn = ChildTransactionId::new();
1651
1652 self.save_dependent_queued_request(
1653 room_id,
1654 &txn,
1655 child_txn.clone(),
1656 MilliSecondsSinceUnixEpoch::now(),
1657 DependentQueuedRequestKind::RedactEvent,
1658 )
1659 .await
1660 .unwrap();
1661
1662 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1664 assert_eq!(dependents.len(), 1);
1665 assert_eq!(dependents[0].parent_transaction_id, txn);
1666 assert_eq!(dependents[0].own_transaction_id, child_txn);
1667 assert!(dependents[0].parent_key.is_none());
1668 assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent);
1669
1670 self.update_dependent_queued_request(
1672 room_id,
1673 &child_txn,
1674 DependentQueuedRequestKind::ReactEvent { key: "👍".to_owned() },
1675 )
1676 .await
1677 .unwrap();
1678
1679 let dependents = self.load_dependent_queued_requests(room_id).await.unwrap();
1681 assert_eq!(dependents.len(), 1);
1682 assert_eq!(dependents[0].parent_transaction_id, txn);
1683 assert_eq!(dependents[0].own_transaction_id, child_txn);
1684 assert!(dependents[0].parent_key.is_none());
1685 assert_matches!(
1686 &dependents[0].kind,
1687 DependentQueuedRequestKind::ReactEvent { key } => {
1688 assert_eq!(key, "👍");
1689 }
1690 );
1691 }
1692
1693 async fn test_get_room_infos(&self) {
1694 let room_id_0 = room_id!("!r0");
1695 let room_id_1 = room_id!("!r1");
1696 let room_id_2 = room_id!("!r2");
1697
1698 {
1700 assert_eq!(self.get_room_infos(&RoomLoadSettings::default()).await.unwrap().len(), 0);
1701 }
1702
1703 let mut changes = StateChanges::default();
1705 changes.add_room(RoomInfo::new(room_id_0, RoomState::Joined));
1706 changes.add_room(RoomInfo::new(room_id_1, RoomState::Joined));
1707 self.save_changes(&changes).await.unwrap();
1708
1709 {
1711 let mut all_rooms = self.get_room_infos(&RoomLoadSettings::All).await.unwrap();
1712
1713 all_rooms.sort_by(|a, b| a.room_id.cmp(&b.room_id));
1716
1717 assert_eq!(all_rooms.len(), 2);
1718 assert_eq!(all_rooms[0].room_id, room_id_0);
1719 assert_eq!(all_rooms[1].room_id, room_id_1);
1720 }
1721
1722 {
1724 let all_rooms =
1725 self.get_room_infos(&RoomLoadSettings::One(room_id_1.to_owned())).await.unwrap();
1726
1727 assert_eq!(all_rooms.len(), 1);
1728 assert_eq!(all_rooms[0].room_id, room_id_1);
1729 }
1730
1731 {
1734 let all_rooms =
1735 self.get_room_infos(&RoomLoadSettings::One(room_id_2.to_owned())).await.unwrap();
1736
1737 assert_eq!(all_rooms.len(), 0);
1738 }
1739 }
1740}
1741
/// Generates a `statestore_integration_tests` module containing one
/// `#[async_test]` per method of `StateStoreIntegrationTests` listed below.
///
/// The invoking module must provide an async `get_store()` function: the
/// generated module imports it via `super::get_store`, awaits it, and calls
/// `into_state_store()` on the successful result. Each generated test obtains
/// its store through its own `get_store()` call.
///
/// Tests whose trait method returns a `Result` propagate store creation
/// errors with `?`; the remaining tests panic if the store cannot be created.
#[allow(unused_macros, unused_extern_crates)]
#[macro_export]
macro_rules! statestore_integration_tests {
    () => {
        mod statestore_integration_tests {
            use matrix_sdk_test::async_test;
            use $crate::store::{
                IntoStateStore, Result as StoreResult, StateStoreIntegrationTests,
            };

            use super::get_store;

            #[async_test]
            async fn test_topic_redaction() -> StoreResult<()> {
                let store = get_store().await?.into_state_store();
                store.test_topic_redaction().await
            }

            #[async_test]
            async fn test_populate_store() -> StoreResult<()> {
                let store = get_store().await?.into_state_store();
                store.test_populate_store().await
            }

            #[async_test]
            async fn test_member_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_member_saving().await
            }

            #[async_test]
            async fn test_filter_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_filter_saving().await
            }

            #[async_test]
            async fn test_user_avatar_url_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_user_avatar_url_saving().await
            }

            #[async_test]
            async fn test_server_capabilities_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_server_capabilities_saving().await
            }

            #[async_test]
            async fn test_sync_token_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_sync_token_saving().await
            }

            #[async_test]
            async fn test_utd_hook_manager_data_saving() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_utd_hook_manager_data_saving().await;
            }

            #[async_test]
            async fn test_stripped_member_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_stripped_member_saving().await
            }

            #[async_test]
            async fn test_power_level_saving() {
                let store = get_store().await.unwrap().into_state_store();
                store.test_power_level_saving().await
            }

            #[async_test]
            async fn test_receipts_saving() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_receipts_saving().await;
            }

            #[async_test]
            async fn test_custom_storage() -> StoreResult<()> {
                let store = get_store().await?.into_state_store();
                store.test_custom_storage().await
            }

            #[async_test]
            async fn test_stripped_non_stripped() -> StoreResult<()> {
                // Propagate store creation errors like the other
                // `StoreResult`-returning tests instead of panicking.
                let store = get_store().await?.into_state_store();
                store.test_stripped_non_stripped().await
            }

            #[async_test]
            async fn test_room_removal() -> StoreResult<()> {
                let store = get_store().await?.into_state_store();
                store.test_room_removal().await
            }

            #[async_test]
            async fn test_profile_removal() -> StoreResult<()> {
                let store = get_store().await?.into_state_store();
                store.test_profile_removal().await
            }

            #[async_test]
            async fn test_presence_saving() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_presence_saving().await;
            }

            #[async_test]
            async fn test_display_names_saving() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_display_names_saving().await;
            }

            #[async_test]
            async fn test_send_queue() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_send_queue().await;
            }

            #[async_test]
            async fn test_send_queue_priority() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_send_queue_priority().await;
            }

            #[async_test]
            async fn test_send_queue_dependents() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_send_queue_dependents().await;
            }

            #[async_test]
            async fn test_update_send_queue_dependent() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_update_send_queue_dependent().await;
            }

            #[async_test]
            async fn test_get_room_infos() {
                let store = get_store().await.expect("creating store failed").into_state_store();
                store.test_get_room_infos().await;
            }
        }
    };
}
1913
1914fn user_id() -> &'static UserId {
1915 user_id!("@example:localhost")
1916}
1917
1918fn invited_user_id() -> &'static UserId {
1919 user_id!("@invited:localhost")
1920}
1921
1922fn room_id() -> &'static RoomId {
1923 room_id!("!test:localhost")
1924}
1925
1926fn stripped_room_id() -> &'static RoomId {
1927 room_id!("!stripped:localhost")
1928}
1929
1930fn first_receipt_event_id() -> &'static EventId {
1931 event_id!("$example")
1932}
1933
1934fn power_level_event() -> Raw<AnySyncStateEvent> {
1935 let content = RoomPowerLevelsEventContent::default();
1936
1937 let event = json!({
1938 "event_id": "$h29iv0s8:example.com",
1939 "content": content,
1940 "sender": user_id(),
1941 "type": "m.room.power_levels",
1942 "origin_server_ts": 0u64,
1943 "state_key": "",
1944 });
1945
1946 serde_json::from_value(event).unwrap()
1947}
1948
1949fn stripped_membership_event() -> Raw<StrippedRoomMemberEvent> {
1950 custom_stripped_membership_event(user_id())
1951}
1952
1953fn custom_stripped_membership_event(user_id: &UserId) -> Raw<StrippedRoomMemberEvent> {
1954 let ev_json = json!({
1955 "type": "m.room.member",
1956 "content": RoomMemberEventContent::new(MembershipState::Join),
1957 "sender": user_id,
1958 "state_key": user_id,
1959 });
1960
1961 Raw::new(&ev_json).unwrap().cast()
1962}
1963
1964fn membership_event() -> Raw<SyncRoomMemberEvent> {
1965 custom_membership_event(user_id(), event_id!("$h29iv0s8:example.com"))
1966}
1967
1968fn custom_membership_event(user_id: &UserId, event_id: &EventId) -> Raw<SyncRoomMemberEvent> {
1969 let ev_json = json!({
1970 "type": "m.room.member",
1971 "content": RoomMemberEventContent::new(MembershipState::Join),
1972 "event_id": event_id,
1973 "origin_server_ts": 198,
1974 "sender": user_id,
1975 "state_key": user_id,
1976 });
1977
1978 Raw::new(&ev_json).unwrap().cast()
1979}
1980
1981fn custom_presence_event(user_id: &UserId) -> Raw<PresenceEvent> {
1982 let ev_json = json!({
1983 "content": {
1984 "presence": "online"
1985 },
1986 "sender": user_id,
1987 });
1988
1989 Raw::new(&ev_json).unwrap().cast()
1990}