1use std::{collections::BTreeMap, fmt, sync::Arc};
18
19use events::Gap;
20use matrix_sdk_base::{
21 deserialized_responses::{AmbiguityChange, TimelineEvent},
22 linked_chunk::ChunkContent,
23 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
24};
25use ruma::{
26 events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
27 serde::Raw,
28 EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
29};
30use tokio::sync::{
31 broadcast::{Receiver, Sender},
32 Notify, RwLock,
33};
34use tracing::{debug, trace, warn};
35
36use super::{
37 paginator::{Paginator, PaginatorState},
38 AllEventsCache, EventsOrigin, Result, RoomEventCacheUpdate, RoomPagination,
39};
40use crate::{client::WeakClient, room::WeakRoom};
41
42pub(super) mod events;
43
/// Handle to the event cache of a single room.
///
/// All the state lives behind an [`Arc`], so cloning this handle only clones
/// the pointer: every clone refers to the same [`RoomEventCacheInner`].
#[derive(Clone)]
pub struct RoomEventCache {
    /// Shared inner state backing every clone of this handle.
    pub(super) inner: Arc<RoomEventCacheInner>,
}
51
52impl fmt::Debug for RoomEventCache {
53 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54 f.debug_struct("RoomEventCache").finish_non_exhaustive()
55 }
56}
57
58impl RoomEventCache {
59 pub(super) fn new(
61 client: WeakClient,
62 state: RoomEventCacheState,
63 room_id: OwnedRoomId,
64 room_version: RoomVersionId,
65 all_events_cache: Arc<RwLock<AllEventsCache>>,
66 ) -> Self {
67 Self {
68 inner: Arc::new(RoomEventCacheInner::new(
69 client,
70 state,
71 room_id,
72 room_version,
73 all_events_cache,
74 )),
75 }
76 }
77
78 pub async fn subscribe(&self) -> Result<(Vec<TimelineEvent>, Receiver<RoomEventCacheUpdate>)> {
81 let state = self.inner.state.read().await;
82 let events = state.events().events().map(|(_position, item)| item.clone()).collect();
83
84 Ok((events, self.inner.sender.subscribe()))
85 }
86
87 pub fn pagination(&self) -> RoomPagination {
90 RoomPagination { inner: self.inner.clone() }
91 }
92
93 pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
95 if let Some((room_id, event)) =
96 self.inner.all_events.read().await.events.get(event_id).cloned()
97 {
98 if room_id == self.inner.room_id {
99 return Some(event);
100 }
101 }
102
103 let state = self.inner.state.read().await;
104 for (_pos, event) in state.events().revents() {
105 if event.event_id().as_deref() == Some(event_id) {
106 return Some(event.clone());
107 }
108 }
109 None
110 }
111
112 pub async fn event_with_relations(
117 &self,
118 event_id: &EventId,
119 filter: Option<Vec<RelationType>>,
120 ) -> Option<(TimelineEvent, Vec<TimelineEvent>)> {
121 let cache = self.inner.all_events.read().await;
122 if let Some((_, event)) = cache.events.get(event_id) {
123 let related_events = cache.collect_related_events(event_id, filter.as_deref());
124 Some((event.clone(), related_events))
125 } else {
126 None
127 }
128 }
129
130 pub async fn clear(&self) -> Result<()> {
135 self.inner.state.write().await.reset().await?;
137
138 self.inner.all_events.write().await.clear();
140
141 let _ = self.inner.paginator.set_idle_state(PaginatorState::Initial, None, None);
144
145 let _ = self.inner.sender.send(RoomEventCacheUpdate::Clear);
147
148 Ok(())
149 }
150
151 pub(crate) async fn save_event(&self, event: TimelineEvent) {
157 if let Some(event_id) = event.event_id() {
158 let mut cache = self.inner.all_events.write().await;
159
160 cache.append_related_event(&event);
161 cache.events.insert(event_id, (self.inner.room_id.clone(), event));
162 } else {
163 warn!("couldn't save event without event id in the event cache");
164 }
165 }
166
167 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = TimelineEvent>) {
174 let mut cache = self.inner.all_events.write().await;
175 for event in events {
176 if let Some(event_id) = event.event_id() {
177 cache.append_related_event(&event);
178 cache.events.insert(event_id, (self.inner.room_id.clone(), event));
179 } else {
180 warn!("couldn't save event without event id in the event cache");
181 }
182 }
183 }
184
185 pub async fn debug_string(&self) -> Vec<String> {
188 self.inner.state.read().await.events().debug_string()
189 }
190}
191
/// The (non-cloneable) state shared by every [`RoomEventCache`] handle of a
/// room.
pub(super) struct RoomEventCacheInner {
    /// Identifier of the room this cache is for.
    room_id: OwnedRoomId,

    /// Version of the room; handed to `on_new_events` when events coming from
    /// sync are appended.
    pub(crate) room_version: RoomVersionId,

    /// Broadcast sender used to publish [`RoomEventCacheUpdate`]s to
    /// subscribers.
    pub sender: Sender<RoomEventCacheUpdate>,

    /// The room's cache state, behind an async read/write lock.
    pub state: RwLock<RoomEventCacheState>,

    /// Cache of events indexed by event id; each entry also records the room
    /// the event was saved for. It is received as an `Arc` at construction
    /// time, so it may be shared with other owners.
    all_events: Arc<RwLock<AllEventsCache>>,

    /// Notified (one waiter) each time events are appended together with a
    /// previous-batch token.
    pub pagination_batch_token_notifier: Notify,

    /// Paginator built over a weak reference to the room. It is put back in
    /// its initial idle state when the cache is cleared or replaced.
    pub paginator: Paginator<WeakRoom>,
}
225
226impl RoomEventCacheInner {
227 fn new(
230 client: WeakClient,
231 state: RoomEventCacheState,
232 room_id: OwnedRoomId,
233 room_version: RoomVersionId,
234 all_events_cache: Arc<RwLock<AllEventsCache>>,
235 ) -> Self {
236 let sender = Sender::new(32);
237 let weak_room = WeakRoom::new(client, room_id);
238 Self {
239 room_id: weak_room.room_id().to_owned(),
240 room_version,
241 state: RwLock::new(state),
242 all_events: all_events_cache,
243 sender,
244 pagination_batch_token_notifier: Default::default(),
245 paginator: Paginator::new(weak_room),
246 }
247 }
248
249 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
250 if account_data.is_empty() {
251 return;
252 }
253
254 let mut handled_read_marker = false;
255
256 trace!("Handling account data");
257
258 for raw_event in account_data {
259 match raw_event.deserialize() {
260 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
261 if handled_read_marker {
267 continue;
268 }
269
270 handled_read_marker = true;
271
272 let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
274 event_id: ev.content.event_id,
275 });
276 }
277
278 Ok(_) => {
279 }
282
283 Err(e) => {
284 let event_type = raw_event.get_field::<String>("type").ok().flatten();
285 warn!(event_type, "Failed to deserialize account data: {e}");
286 }
287 }
288 }
289 }
290
291 pub(super) async fn handle_joined_room_update(
292 &self,
293 has_storage: bool,
294 updates: JoinedRoomUpdate,
295 ) -> Result<()> {
296 self.handle_timeline(
297 has_storage,
298 updates.timeline,
299 updates.ephemeral.clone(),
300 updates.ambiguity_changes,
301 )
302 .await?;
303
304 self.handle_account_data(updates.account_data);
305
306 Ok(())
307 }
308
309 async fn handle_timeline(
310 &self,
311 has_storage: bool,
312 timeline: Timeline,
313 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
314 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
315 ) -> Result<()> {
316 if !has_storage && timeline.limited {
317 trace!("limited timeline, clearing all previous events and pushing new events");
321
322 self.replace_all_events_by(
323 timeline.events,
324 timeline.prev_batch,
325 ephemeral_events,
326 ambiguity_changes,
327 )
328 .await?;
329 } else {
330 trace!("adding new events");
332
333 let prev_batch =
341 if has_storage && !timeline.limited { None } else { timeline.prev_batch };
342
343 let mut state = self.state.write().await;
344 self.append_events_locked(
345 &mut state,
346 timeline.events,
347 prev_batch,
348 ephemeral_events,
349 ambiguity_changes,
350 )
351 .await?;
352 }
353
354 Ok(())
355 }
356
357 pub(super) async fn handle_left_room_update(
358 &self,
359 has_storage: bool,
360 updates: LeftRoomUpdate,
361 ) -> Result<()> {
362 self.handle_timeline(has_storage, updates.timeline, Vec::new(), updates.ambiguity_changes)
363 .await?;
364 Ok(())
365 }
366
367 pub(super) async fn replace_all_events_by(
370 &self,
371 sync_timeline_events: Vec<TimelineEvent>,
372 prev_batch: Option<String>,
373 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
374 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
375 ) -> Result<()> {
376 let mut state = self.state.write().await;
378
379 state.reset().await?;
381
382 let _ = self.sender.send(RoomEventCacheUpdate::Clear);
384
385 self.append_events_locked(
387 &mut state,
388 sync_timeline_events,
389 prev_batch.clone(),
390 ephemeral_events,
391 ambiguity_changes,
392 )
393 .await?;
394
395 self.paginator.set_idle_state(PaginatorState::Initial, prev_batch, None)?;
397
398 Ok(())
399 }
400
401 async fn append_events_locked(
406 &self,
407 state: &mut RoomEventCacheState,
408 sync_timeline_events: Vec<TimelineEvent>,
409 prev_batch: Option<String>,
410 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
411 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
412 ) -> Result<()> {
413 if sync_timeline_events.is_empty()
414 && prev_batch.is_none()
415 && ephemeral_events.is_empty()
416 && ambiguity_changes.is_empty()
417 {
418 return Ok(());
419 }
420
421 let sync_timeline_events_diffs = {
424 let (_, sync_timeline_events_diffs) = state
425 .with_events_mut(|room_events| {
426 if let Some(prev_token) = &prev_batch {
427 room_events.push_gap(Gap { prev_token: prev_token.clone() });
428 }
429
430 let added_unique_events = room_events.push_events(sync_timeline_events.clone());
431
432 if !added_unique_events {
433 debug!(
434 "not storing previous batch token, because we deduplicated all new sync events"
435 );
436
437 if let Some(prev_token) = &prev_batch {
438 trace!("removing gap we just inserted");
442
443 let prev_gap_id = room_events
445 .rchunks()
446 .find_map(|c| {
447 let gap = as_variant::as_variant!(c.content(), ChunkContent::Gap)?;
448 (gap.prev_token == *prev_token).then_some(c.identifier())
449 })
450 .expect("we just inserted the gap beforehand");
451
452 room_events
453 .replace_gap_at([], prev_gap_id)
454 .expect("we obtained the valid position beforehand");
455 }
456 }
457
458 room_events.on_new_events(&self.room_version, sync_timeline_events.iter());
459 })
460 .await?;
461
462 let mut all_events = self.all_events.write().await;
463
464 for sync_timeline_event in sync_timeline_events {
465 if let Some(event_id) = sync_timeline_event.event_id() {
466 all_events.append_related_event(&sync_timeline_event);
467 all_events
468 .events
469 .insert(event_id.to_owned(), (self.room_id.clone(), sync_timeline_event));
470 }
471 }
472
473 sync_timeline_events_diffs
474 };
475
476 if prev_batch.is_some() {
479 self.pagination_batch_token_notifier.notify_one();
480 }
481
482 {
484 if !sync_timeline_events_diffs.is_empty() {
485 let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
486 diffs: sync_timeline_events_diffs,
487 origin: EventsOrigin::Sync,
488 });
489 }
490
491 if !ephemeral_events.is_empty() {
492 let _ = self
493 .sender
494 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
495 }
496
497 if !ambiguity_changes.is_empty() {
498 let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
499 }
500 }
501
502 Ok(())
503 }
504}
505
506fn chunk_debug_string(content: &ChunkContent<TimelineEvent, Gap>) -> String {
508 match content {
509 ChunkContent::Gap(Gap { prev_token }) => {
510 format!("gap['{prev_token}']")
511 }
512 ChunkContent::Items(vec) => {
513 let items = vec
514 .iter()
515 .map(|event| {
516 event.event_id().map_or_else(
518 || "<no event id>".to_owned(),
519 |id| id.as_str().chars().take(1 + 8).collect(),
520 )
521 })
522 .collect::<Vec<_>>()
523 .join(", ");
524 format!("events[{items}]")
525 }
526 }
527}
528
529mod private {
531 use std::sync::Arc;
532
533 use eyeball_im::VectorDiff;
534 use matrix_sdk_base::{
535 deserialized_responses::{TimelineEvent, TimelineEventKind},
536 event_cache::{
537 store::{
538 EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockGuard,
539 DEFAULT_CHUNK_CAPACITY,
540 },
541 Event, Gap,
542 },
543 linked_chunk::{LinkedChunk, LinkedChunkBuilder, RawChunk, Update},
544 };
545 use once_cell::sync::OnceCell;
546 use ruma::{serde::Raw, OwnedRoomId, RoomId};
547 use tracing::{error, instrument, trace};
548
549 use super::{chunk_debug_string, events::RoomEvents};
550 use crate::event_cache::EventCacheError;
551
    /// State of a room's event cache: its events, plus an optional handle to
    /// the store they are persisted to.
    pub struct RoomEventCacheState {
        /// Identifier of the room; passed to the store when updates are
        /// persisted.
        room: OwnedRoomId,

        /// Handle to the event cache store. While the cell is unset, the
        /// events are built empty and updates are not persisted.
        store: Arc<OnceCell<EventCacheStoreLock>>,

        /// The room's events.
        events: RoomEvents,

        /// Starts out `false` and is set back to `false` by `reset`.
        ///
        /// NOTE(review): presumably records whether a paginator already waited
        /// for an initial previous-batch token — confirm against the users of
        /// this field.
        pub waited_for_initial_prev_token: bool,
    }
575
576 impl RoomEventCacheState {
577 async fn try_reload_linked_chunk(
578 room: &RoomId,
579 locked: &EventCacheStoreLockGuard<'_>,
580 ) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, EventCacheError>
581 {
582 let raw_chunks = locked.reload_linked_chunk(room).await?;
583
584 let mut builder = LinkedChunkBuilder::from_raw_parts(raw_chunks.clone());
585
586 builder.with_update_history();
587
588 Ok(builder.build().map_err(|err| {
589 if tracing::enabled!(tracing::Level::TRACE) {
591 trace!("couldn't build a linked chunk from the raw parts. Raw chunks:");
592 for line in raw_chunks_debug_string(raw_chunks) {
593 trace!("{line}");
594 }
595 }
596
597 EventCacheStoreError::InvalidData { details: err.to_string() }
598 })?)
599 }
600
601 pub async fn new(
603 room: OwnedRoomId,
604 store: Arc<OnceCell<EventCacheStoreLock>>,
605 ) -> Result<Self, EventCacheError> {
606 let events = if let Some(store) = store.get() {
607 let locked = store.lock().await?;
608
609 let linked_chunk = match Self::try_reload_linked_chunk(&room, &locked).await {
612 Ok(linked_chunk) => linked_chunk,
613 Err(err) => {
614 error!("error when reloading a linked chunk from memory: {err}");
615
616 locked.handle_linked_chunk_updates(&room, vec![Update::Clear]).await?;
618
619 None
621 }
622 };
623
624 RoomEvents::with_initial_chunks(linked_chunk)
625 } else {
626 RoomEvents::default()
627 };
628
629 Ok(Self { room, store, events, waited_for_initial_prev_token: false })
630 }
631
632 fn strip_relations_if_present<T>(event: &mut Raw<T>) {
636 let mut closure = || -> Option<()> {
640 let mut val: serde_json::Value = event.deserialize_as().ok()?;
641 let unsigned = val.get_mut("unsigned")?;
642 let unsigned_obj = unsigned.as_object_mut()?;
643 if unsigned_obj.remove("m.relations").is_some() {
644 *event = Raw::new(&val).ok()?.cast();
645 }
646 None
647 };
648 let _ = closure();
649 }
650
651 fn strip_relations_from_event(ev: &mut TimelineEvent) {
652 match &mut ev.kind {
653 TimelineEventKind::Decrypted(decrypted) => {
654 decrypted.unsigned_encryption_info = None;
657
658 Self::strip_relations_if_present(&mut decrypted.event);
660 }
661
662 TimelineEventKind::UnableToDecrypt { event, .. }
663 | TimelineEventKind::PlainText { event } => {
664 Self::strip_relations_if_present(event);
665 }
666 }
667 }
668
669 fn strip_relations_from_events(items: &mut [TimelineEvent]) {
671 for ev in items.iter_mut() {
672 Self::strip_relations_from_event(ev);
673 }
674 }
675
676 #[instrument(skip_all)]
678 async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
679 let mut updates = self.events.updates().take();
680
681 if updates.is_empty() {
682 return Ok(());
683 }
684
685 let Some(store) = self.store.get() else {
686 return Ok(());
687 };
688
689 trace!("propagating {} updates", updates.len());
690
691 for up in updates.iter_mut() {
693 match up {
694 Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
695 Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
696 Update::NewItemsChunk { .. }
698 | Update::NewGapChunk { .. }
699 | Update::RemoveChunk(_)
700 | Update::RemoveItem { .. }
701 | Update::DetachLastItems { .. }
702 | Update::StartReattachItems
703 | Update::EndReattachItems
704 | Update::Clear => {}
705 }
706 }
707
708 let store = store.clone();
715 let room_id = self.room.clone();
716
717 matrix_sdk_common::executor::spawn(async move {
718 let locked = store.lock().await?;
719
720 if let Err(err) = locked.handle_linked_chunk_updates(&room_id, updates).await {
721 error!("unable to handle linked chunk updates: {err}");
722 }
723
724 super::Result::Ok(())
725 })
726 .await
727 .expect("joining failed")?;
728
729 trace!("done propagating store changes");
730
731 Ok(())
732 }
733
734 pub async fn reset(&mut self) -> Result<(), EventCacheError> {
736 self.events.reset();
737 self.propagate_changes().await?;
738 self.waited_for_initial_prev_token = false;
739 Ok(())
740 }
741
742 pub fn events(&self) -> &RoomEvents {
744 &self.events
745 }
746
747 pub async fn with_events_mut<O, F: FnOnce(&mut RoomEvents) -> O>(
754 &mut self,
755 func: F,
756 ) -> Result<(O, Vec<VectorDiff<TimelineEvent>>), EventCacheError> {
757 let output = func(&mut self.events);
758 self.propagate_changes().await?;
759 let updates_as_vector_diffs = self.events.updates_as_vector_diffs();
760 Ok((output, updates_as_vector_diffs))
761 }
762 }
763
764 fn raw_chunks_debug_string(mut raw_chunks: Vec<RawChunk<Event, Gap>>) -> Vec<String> {
767 let mut result = Vec::new();
768
769 raw_chunks.sort_by_key(|c| c.identifier.index());
771
772 for c in raw_chunks {
773 let content = chunk_debug_string(&c.content);
774
775 let prev =
776 c.previous.map_or_else(|| "<none>".to_owned(), |prev| prev.index().to_string());
777 let next = c.next.map_or_else(|| "<none>".to_owned(), |prev| prev.index().to_string());
778
779 let line =
780 format!("chunk #{} (prev={prev}, next={next}): {content}", c.identifier.index());
781
782 result.push(line);
783 }
784
785 result
786 }
787
788 #[cfg(test)]
789 mod tests {
790 use matrix_sdk_base::{
791 event_cache::Gap,
792 linked_chunk::{ChunkContent, ChunkIdentifier as CId, RawChunk},
793 };
794 use matrix_sdk_test::{event_factory::EventFactory, ALICE, DEFAULT_TEST_ROOM_ID};
795 use ruma::event_id;
796
797 use super::raw_chunks_debug_string;
798
799 #[test]
800 fn test_raw_chunks_debug_string() {
801 let mut raws = Vec::new();
802 let f = EventFactory::new().room(&DEFAULT_TEST_ROOM_ID).sender(*ALICE);
803
804 raws.push(RawChunk {
805 content: ChunkContent::Items(vec![
806 f.text_msg("hey")
807 .event_id(event_id!("$123456789101112131415617181920"))
808 .into_event(),
809 f.text_msg("you").event_id(event_id!("$2")).into_event(),
810 ]),
811 identifier: CId::new(1),
812 previous: Some(CId::new(0)),
813 next: None,
814 });
815
816 raws.push(RawChunk {
817 content: ChunkContent::Gap(Gap { prev_token: "prev-token".to_owned() }),
818 identifier: CId::new(0),
819 previous: None,
820 next: Some(CId::new(1)),
821 });
822
823 let output = raw_chunks_debug_string(raws);
824 assert_eq!(output.len(), 2);
825 assert_eq!(&output[0], "chunk #0 (prev=<none>, next=1): gap['prev-token']");
826 assert_eq!(&output[1], "chunk #1 (prev=0, next=<none>): events[$12345678, $2]");
827 }
828 }
829}
830
831pub(super) use private::RoomEventCacheState;
832
833#[cfg(test)]
834mod tests {
835 use std::sync::Arc;
836
837 use assert_matches::assert_matches;
838 use assert_matches2::assert_let;
839 use matrix_sdk_base::{
840 event_cache::{
841 store::{EventCacheStore as _, MemoryStore},
842 Gap,
843 },
844 linked_chunk::{ChunkContent, ChunkIdentifier, Position, Update},
845 store::StoreConfig,
846 sync::{JoinedRoomUpdate, Timeline},
847 };
848 use matrix_sdk_common::deserialized_responses::TimelineEvent;
849 use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
850 use ruma::{
851 event_id,
852 events::{
853 relation::RelationType, room::message::RoomMessageEventContentWithoutRelation,
854 AnySyncMessageLikeEvent, AnySyncTimelineEvent,
855 },
856 room_id, user_id, RoomId,
857 };
858
859 use crate::test_utils::{client::MockClientBuilder, logged_in_client};
860
861 #[async_test]
862 async fn test_event_with_redaction_relation() {
863 let original_id = event_id!("$original");
864 let related_id = event_id!("$related");
865 let room_id = room_id!("!galette:saucisse.bzh");
866 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
867
868 assert_relations(
869 room_id,
870 f.text_msg("Original event").event_id(original_id).into(),
871 f.redaction(original_id).event_id(related_id).into(),
872 f,
873 )
874 .await;
875 }
876
877 #[async_test]
878 async fn test_event_with_edit_relation() {
879 let original_id = event_id!("$original");
880 let related_id = event_id!("$related");
881 let room_id = room_id!("!galette:saucisse.bzh");
882 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
883
884 assert_relations(
885 room_id,
886 f.text_msg("Original event").event_id(original_id).into(),
887 f.text_msg("* An edited event")
888 .edit(
889 original_id,
890 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
891 )
892 .event_id(related_id)
893 .into(),
894 f,
895 )
896 .await;
897 }
898
899 #[async_test]
900 async fn test_event_with_reply_relation() {
901 let original_id = event_id!("$original");
902 let related_id = event_id!("$related");
903 let room_id = room_id!("!galette:saucisse.bzh");
904 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
905
906 assert_relations(
907 room_id,
908 f.text_msg("Original event").event_id(original_id).into(),
909 f.text_msg("A reply").reply_to(original_id).event_id(related_id).into(),
910 f,
911 )
912 .await;
913 }
914
915 #[async_test]
916 async fn test_event_with_thread_reply_relation() {
917 let original_id = event_id!("$original");
918 let related_id = event_id!("$related");
919 let room_id = room_id!("!galette:saucisse.bzh");
920 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
921
922 assert_relations(
923 room_id,
924 f.text_msg("Original event").event_id(original_id).into(),
925 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
926 f,
927 )
928 .await;
929 }
930
931 #[async_test]
932 async fn test_event_with_reaction_relation() {
933 let original_id = event_id!("$original");
934 let related_id = event_id!("$related");
935 let room_id = room_id!("!galette:saucisse.bzh");
936 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
937
938 assert_relations(
939 room_id,
940 f.text_msg("Original event").event_id(original_id).into(),
941 f.reaction(original_id, ":D").event_id(related_id).into(),
942 f,
943 )
944 .await;
945 }
946
947 #[async_test]
948 async fn test_event_with_poll_response_relation() {
949 let original_id = event_id!("$original");
950 let related_id = event_id!("$related");
951 let room_id = room_id!("!galette:saucisse.bzh");
952 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
953
954 assert_relations(
955 room_id,
956 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
957 .event_id(original_id)
958 .into(),
959 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
960 f,
961 )
962 .await;
963 }
964
965 #[async_test]
966 async fn test_event_with_poll_end_relation() {
967 let original_id = event_id!("$original");
968 let related_id = event_id!("$related");
969 let room_id = room_id!("!galette:saucisse.bzh");
970 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
971
972 assert_relations(
973 room_id,
974 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
975 .event_id(original_id)
976 .into(),
977 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
978 f,
979 )
980 .await;
981 }
982
983 #[async_test]
984 async fn test_event_with_filtered_relationships() {
985 let original_id = event_id!("$original");
986 let related_id = event_id!("$related");
987 let associated_related_id = event_id!("$recursive_related");
988 let room_id = room_id!("!galette:saucisse.bzh");
989 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
990
991 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
992 let related_event = event_factory
993 .text_msg("* Edited event")
994 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
995 .event_id(related_id)
996 .into();
997 let associated_related_event =
998 event_factory.redaction(related_id).event_id(associated_related_id).into();
999
1000 let client = logged_in_client(None).await;
1001
1002 let event_cache = client.event_cache();
1003 event_cache.subscribe().unwrap();
1004
1005 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1006 let room = client.get_room(room_id).unwrap();
1007
1008 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1009
1010 room_event_cache.save_event(original_event).await;
1012
1013 room_event_cache.save_event(related_event).await;
1015
1016 room_event_cache.save_event(associated_related_event).await;
1018
1019 let filter = Some(vec![RelationType::Replacement]);
1020 let (event, related_events) =
1021 room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1022 let cached_event_id = event.event_id().unwrap();
1024 assert_eq!(cached_event_id, original_id);
1025
1026 assert_eq!(related_events.len(), 2);
1028
1029 let related_event_id = related_events[0].event_id().unwrap();
1030 assert_eq!(related_event_id, related_id);
1031 let related_event_id = related_events[1].event_id().unwrap();
1032 assert_eq!(related_event_id, associated_related_id);
1033
1034 let filter = Some(vec![RelationType::Thread]);
1036 let (event, related_events) =
1037 room_event_cache.event_with_relations(original_id, filter).await.unwrap();
1038 let cached_event_id = event.event_id().unwrap();
1040 assert_eq!(cached_event_id, original_id);
1041 assert!(related_events.is_empty());
1043 }
1044
1045 #[async_test]
1046 async fn test_event_with_recursive_relation() {
1047 let original_id = event_id!("$original");
1048 let related_id = event_id!("$related");
1049 let associated_related_id = event_id!("$recursive_related");
1050 let room_id = room_id!("!galette:saucisse.bzh");
1051 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1052
1053 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
1054 let related_event = event_factory
1055 .text_msg("* Edited event")
1056 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
1057 .event_id(related_id)
1058 .into();
1059 let associated_related_event =
1060 event_factory.redaction(related_id).event_id(associated_related_id).into();
1061
1062 let client = logged_in_client(None).await;
1063
1064 let event_cache = client.event_cache();
1065 event_cache.subscribe().unwrap();
1066
1067 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1068 let room = client.get_room(room_id).unwrap();
1069
1070 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1071
1072 room_event_cache.save_event(original_event).await;
1074
1075 room_event_cache.save_event(related_event).await;
1077
1078 room_event_cache.save_event(associated_related_event).await;
1080
1081 let (event, related_events) =
1082 room_event_cache.event_with_relations(original_id, None).await.unwrap();
1083 let cached_event_id = event.event_id().unwrap();
1085 assert_eq!(cached_event_id, original_id);
1086
1087 assert_eq!(related_events.len(), 2);
1089
1090 let related_event_id = related_events[0].event_id().unwrap();
1091 assert_eq!(related_event_id, related_id);
1092 let related_event_id = related_events[1].event_id().unwrap();
1093 assert_eq!(related_event_id, associated_related_id);
1094 }
1095
1096 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1098 async fn test_write_to_storage() {
1099 use matrix_sdk_base::linked_chunk::LinkedChunkBuilder;
1100
1101 let room_id = room_id!("!galette:saucisse.bzh");
1102 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1103
1104 let event_cache_store = Arc::new(MemoryStore::new());
1105
1106 let client = MockClientBuilder::new("http://localhost".to_owned())
1107 .store_config(
1108 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1109 )
1110 .build()
1111 .await;
1112
1113 let event_cache = client.event_cache();
1114
1115 event_cache.subscribe().unwrap();
1117 event_cache.enable_storage().unwrap();
1118
1119 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1120 let room = client.get_room(room_id).unwrap();
1121
1122 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1123
1124 let timeline = Timeline {
1126 limited: true,
1127 prev_batch: Some("raclette".to_owned()),
1128 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
1129 };
1130
1131 room_event_cache
1132 .inner
1133 .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1134 .await
1135 .unwrap();
1136
1137 let raws = event_cache_store.reload_linked_chunk(room_id).await.unwrap();
1138 let linked_chunk =
1139 LinkedChunkBuilder::<3, _, _>::from_raw_parts(raws).build().unwrap().unwrap();
1140
1141 assert_eq!(linked_chunk.chunks().count(), 3);
1142
1143 let mut chunks = linked_chunk.chunks();
1144
1145 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1147 assert_eq!(events.len(), 0)
1148 });
1149
1150 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
1152 assert_eq!(gap.prev_token, "raclette");
1153 });
1154
1155 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1157 assert_eq!(events.len(), 1);
1158 let deserialized = events[0].raw().deserialize().unwrap();
1159 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
1160 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
1161 });
1162
1163 assert!(chunks.next().is_none());
1165 }
1166
1167 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1169 async fn test_write_to_storage_strips_bundled_relations() {
1170 use matrix_sdk_base::linked_chunk::LinkedChunkBuilder;
1171 use ruma::events::BundledMessageLikeRelations;
1172
1173 let room_id = room_id!("!galette:saucisse.bzh");
1174 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1175
1176 let event_cache_store = Arc::new(MemoryStore::new());
1177
1178 let client = MockClientBuilder::new("http://localhost".to_owned())
1179 .store_config(
1180 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1181 )
1182 .build()
1183 .await;
1184
1185 let event_cache = client.event_cache();
1186
1187 event_cache.subscribe().unwrap();
1189 event_cache.enable_storage().unwrap();
1190
1191 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1192 let room = client.get_room(room_id).unwrap();
1193
1194 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1195
1196 let mut relations = BundledMessageLikeRelations::new();
1198 relations.replace =
1199 Some(Box::new(f.text_msg("Hello, Kind Sir").sender(*ALICE).into_raw_sync()));
1200 let ev = f.text_msg("hey yo").sender(*ALICE).bundled_relations(relations).into_event();
1201
1202 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
1203
1204 room_event_cache
1205 .inner
1206 .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1207 .await
1208 .unwrap();
1209
1210 {
1212 let (events, _) = room_event_cache.subscribe().await.unwrap();
1213
1214 assert_eq!(events.len(), 1);
1215
1216 let ev = events[0].raw().deserialize().unwrap();
1217 assert_let!(
1218 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
1219 );
1220
1221 let original = msg.as_original().unwrap();
1222 assert_eq!(original.content.body(), "hey yo");
1223 assert!(original.unsigned.relations.replace.is_some());
1224 }
1225
1226 let raws = event_cache_store.reload_linked_chunk(room_id).await.unwrap();
1228 let linked_chunk =
1229 LinkedChunkBuilder::<3, _, _>::from_raw_parts(raws).build().unwrap().unwrap();
1230
1231 assert_eq!(linked_chunk.chunks().count(), 1);
1232
1233 let mut chunks = linked_chunk.chunks();
1234 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
1235 assert_eq!(events.len(), 1);
1236
1237 let ev = events[0].raw().deserialize().unwrap();
1238 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
1239
1240 let original = msg.as_original().unwrap();
1241 assert_eq!(original.content.body(), "hey yo");
1242 assert!(original.unsigned.relations.replace.is_none());
1243 });
1244
1245 assert!(chunks.next().is_none());
1247 }
1248
1249 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1251 async fn test_clear() {
1252 use matrix_sdk_base::linked_chunk::LinkedChunkBuilder;
1253
1254 use crate::{assert_let_timeout, event_cache::RoomEventCacheUpdate};
1255
1256 let room_id = room_id!("!galette:saucisse.bzh");
1257 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1258
1259 let event_cache_store = Arc::new(MemoryStore::new());
1260
1261 let event_id1 = event_id!("$1");
1262 let event_id2 = event_id!("$2");
1263
1264 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1265 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1266
1267 event_cache_store
1269 .handle_linked_chunk_updates(
1270 room_id,
1271 vec![
1272 Update::NewItemsChunk {
1274 previous: None,
1275 new: ChunkIdentifier::new(0),
1276 next: None,
1277 },
1278 Update::NewGapChunk {
1280 previous: Some(ChunkIdentifier::new(0)),
1281 new: ChunkIdentifier::new(42),
1283 next: None,
1284 gap: Gap { prev_token: "cheddar".to_owned() },
1285 },
1286 Update::NewItemsChunk {
1288 previous: Some(ChunkIdentifier::new(42)),
1289 new: ChunkIdentifier::new(1),
1290 next: None,
1291 },
1292 Update::PushItems {
1293 at: Position::new(ChunkIdentifier::new(1), 0),
1294 items: vec![ev1.clone()],
1295 },
1296 Update::NewItemsChunk {
1298 previous: Some(ChunkIdentifier::new(1)),
1299 new: ChunkIdentifier::new(2),
1300 next: None,
1301 },
1302 Update::PushItems {
1303 at: Position::new(ChunkIdentifier::new(2), 0),
1304 items: vec![ev2.clone()],
1305 },
1306 ],
1307 )
1308 .await
1309 .unwrap();
1310
1311 let client = MockClientBuilder::new("http://localhost".to_owned())
1312 .store_config(
1313 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1314 )
1315 .build()
1316 .await;
1317
1318 let event_cache = client.event_cache();
1319
1320 event_cache.subscribe().unwrap();
1322 event_cache.enable_storage().unwrap();
1323
1324 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1325 let room = client.get_room(room_id).unwrap();
1326
1327 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1328
1329 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
1330
1331 assert!(room_event_cache.event(event_id1).await.is_some());
1333
1334 assert_eq!(items.len(), 2);
1336 assert_eq!(items[0].event_id().unwrap(), event_id1);
1337 assert_eq!(items[1].event_id().unwrap(), event_id2);
1338
1339 assert!(stream.is_empty());
1340
1341 room_event_cache.clear().await.unwrap();
1343
1344 assert_let_timeout!(Ok(RoomEventCacheUpdate::Clear) = stream.recv());
1346
1347 assert!(room_event_cache.event(event_id1).await.is_none());
1349
1350 let (items, _) = room_event_cache.subscribe().await.unwrap();
1351 assert!(items.is_empty());
1352
1353 let raws = event_cache_store.reload_linked_chunk(room_id).await.unwrap();
1355 let linked_chunk = LinkedChunkBuilder::<3, _, _>::from_raw_parts(raws).build().unwrap();
1356
1357 let linked_chunk = linked_chunk.unwrap();
1361 assert_eq!(linked_chunk.num_items(), 0);
1362 }
1363
1364 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1366 async fn test_load_from_storage() {
1367 let room_id = room_id!("!galette:saucisse.bzh");
1368 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
1369
1370 let event_cache_store = Arc::new(MemoryStore::new());
1371
1372 let event_id1 = event_id!("$1");
1373 let event_id2 = event_id!("$2");
1374
1375 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
1376 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
1377
1378 event_cache_store
1380 .handle_linked_chunk_updates(
1381 room_id,
1382 vec![
1383 Update::NewItemsChunk {
1385 previous: None,
1386 new: ChunkIdentifier::new(0),
1387 next: None,
1388 },
1389 Update::NewGapChunk {
1391 previous: Some(ChunkIdentifier::new(0)),
1392 new: ChunkIdentifier::new(42),
1394 next: None,
1395 gap: Gap { prev_token: "cheddar".to_owned() },
1396 },
1397 Update::NewItemsChunk {
1399 previous: Some(ChunkIdentifier::new(42)),
1400 new: ChunkIdentifier::new(1),
1401 next: None,
1402 },
1403 Update::PushItems {
1404 at: Position::new(ChunkIdentifier::new(1), 0),
1405 items: vec![ev1.clone()],
1406 },
1407 Update::NewItemsChunk {
1409 previous: Some(ChunkIdentifier::new(1)),
1410 new: ChunkIdentifier::new(2),
1411 next: None,
1412 },
1413 Update::PushItems {
1414 at: Position::new(ChunkIdentifier::new(2), 0),
1415 items: vec![ev2.clone()],
1416 },
1417 ],
1418 )
1419 .await
1420 .unwrap();
1421
1422 let client = MockClientBuilder::new("http://localhost".to_owned())
1423 .store_config(
1424 StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
1425 )
1426 .build()
1427 .await;
1428
1429 let event_cache = client.event_cache();
1430
1431 event_cache.subscribe().unwrap();
1433 event_cache.enable_storage().unwrap();
1434
1435 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1436 let room = client.get_room(room_id).unwrap();
1437
1438 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1439
1440 let (items, _stream) = room_event_cache.subscribe().await.unwrap();
1441
1442 assert_eq!(items.len(), 2);
1444 assert_eq!(items[0].event_id().unwrap(), event_id1);
1445 assert_eq!(items[1].event_id().unwrap(), event_id2);
1446
1447 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
1449 room_event_cache
1450 .inner
1451 .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() })
1452 .await
1453 .unwrap();
1454
1455 let (items, _stream) = room_event_cache.subscribe().await.unwrap();
1460 assert_eq!(items.len(), 2);
1461 assert_eq!(items[0].event_id().unwrap(), event_id1);
1462 assert_eq!(items[1].event_id().unwrap(), event_id2);
1463 }
1464
1465 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1467 async fn test_load_from_storage_resilient_to_failure() {
1468 let room_id = room_id!("!galette:saucisse.bzh");
1469 let event_cache_store = Arc::new(MemoryStore::new());
1470
1471 event_cache_store
1473 .handle_linked_chunk_updates(
1474 room_id,
1475 vec![
1476 Update::NewItemsChunk {
1477 previous: None,
1478 new: ChunkIdentifier::new(0),
1479 next: None,
1480 },
1481 Update::NewItemsChunk {
1482 previous: Some(ChunkIdentifier::new(0)),
1483 new: ChunkIdentifier::new(1),
1484 next: Some(ChunkIdentifier::new(0)),
1485 },
1486 ],
1487 )
1488 .await
1489 .unwrap();
1490
1491 let client = MockClientBuilder::new("http://localhost".to_owned())
1492 .store_config(
1493 StoreConfig::new("holder".to_owned()).event_cache_store(event_cache_store.clone()),
1494 )
1495 .build()
1496 .await;
1497
1498 let event_cache = client.event_cache();
1499
1500 event_cache.subscribe().unwrap();
1502 event_cache.enable_storage().unwrap();
1503
1504 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1505 let room = client.get_room(room_id).unwrap();
1506
1507 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1508
1509 let (items, _stream) = room_event_cache.subscribe().await.unwrap();
1510
1511 assert!(items.is_empty());
1514
1515 let raw_chunks = event_cache_store.reload_linked_chunk(room_id).await.unwrap();
1518 assert!(raw_chunks.is_empty());
1519 }
1520
1521 #[cfg(not(target_arch = "wasm32"))] #[async_test]
1523 async fn test_no_useless_gaps() {
1524 let room_id = room_id!("!galette:saucisse.bzh");
1525
1526 let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;
1527
1528 let event_cache = client.event_cache();
1529 event_cache.subscribe().unwrap();
1530
1531 let has_storage = true; event_cache.enable_storage().unwrap();
1533
1534 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1535 let room = client.get_room(room_id).unwrap();
1536 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1537
1538 let f = EventFactory::new().room(room_id).sender(*ALICE);
1539
1540 room_event_cache
1543 .inner
1544 .handle_joined_room_update(
1545 has_storage,
1546 JoinedRoomUpdate {
1547 timeline: Timeline {
1548 limited: true,
1549 prev_batch: Some("raclette".to_owned()),
1550 events: vec![f.text_msg("hey yo").into_event()],
1551 },
1552 ..Default::default()
1553 },
1554 )
1555 .await
1556 .unwrap();
1557
1558 {
1559 let state = room_event_cache.inner.state.read().await;
1560
1561 let mut num_gaps = 0;
1562 let mut num_events = 0;
1563
1564 for c in state.events().chunks() {
1565 match c.content() {
1566 ChunkContent::Items(items) => num_events += items.len(),
1567 ChunkContent::Gap(_) => num_gaps += 1,
1568 }
1569 }
1570
1571 assert_eq!(num_gaps, 1);
1573 assert_eq!(num_events, 1);
1574 }
1575
1576 room_event_cache
1579 .inner
1580 .handle_joined_room_update(
1581 has_storage,
1582 JoinedRoomUpdate {
1583 timeline: Timeline {
1584 limited: false,
1585 prev_batch: Some("fondue".to_owned()),
1586 events: vec![f.text_msg("sup").into_event()],
1587 },
1588 ..Default::default()
1589 },
1590 )
1591 .await
1592 .unwrap();
1593
1594 {
1595 let state = room_event_cache.inner.state.read().await;
1596
1597 let mut num_gaps = 0;
1598 let mut num_events = 0;
1599
1600 for c in state.events().chunks() {
1601 match c.content() {
1602 ChunkContent::Items(items) => num_events += items.len(),
1603 ChunkContent::Gap(gap) => {
1604 assert_eq!(gap.prev_token, "raclette");
1605 num_gaps += 1;
1606 }
1607 }
1608 }
1609
1610 assert_eq!(num_gaps, 1);
1612 assert_eq!(num_events, 2);
1613 }
1614 }
1615
1616 async fn assert_relations(
1617 room_id: &RoomId,
1618 original_event: TimelineEvent,
1619 related_event: TimelineEvent,
1620 event_factory: EventFactory,
1621 ) {
1622 let client = logged_in_client(None).await;
1623
1624 let event_cache = client.event_cache();
1625 event_cache.subscribe().unwrap();
1626
1627 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
1628 let room = client.get_room(room_id).unwrap();
1629
1630 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
1631
1632 let original_event_id = original_event.event_id().unwrap();
1634 room_event_cache.save_event(original_event).await;
1635
1636 let unrelated_id = event_id!("$2");
1638 room_event_cache
1639 .save_event(event_factory.text_msg("An unrelated event").event_id(unrelated_id).into())
1640 .await;
1641
1642 let related_id = related_event.event_id().unwrap();
1644 room_event_cache.save_event(related_event).await;
1645
1646 let (event, related_events) =
1647 room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
1648 let cached_event_id = event.event_id().unwrap();
1650 assert_eq!(cached_event_id, original_event_id);
1651
1652 assert_eq!(related_events.len(), 1);
1654 let related_event_id = related_events[0].event_id().unwrap();
1655 assert_eq!(related_event_id, related_id);
1656 }
1657}