use std::{
    collections::BTreeMap,
    fmt,
    ops::{Deref, DerefMut},
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

use events::{sort_positions_descending, Gap};
use eyeball::SharedObservable;
use eyeball_im::VectorDiff;
use matrix_sdk_base::{
    deserialized_responses::{AmbiguityChange, TimelineEvent},
    linked_chunk::Position,
    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
};
use ruma::{
    events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
    serde::Raw,
    EventId, OwnedEventId, OwnedRoomId,
};
use tokio::sync::{
    broadcast::{Receiver, Sender},
    mpsc, Notify, RwLock,
};
use tracing::{instrument, trace, warn};

use super::{
    deduplicator::DeduplicationOutcome, AutoShrinkChannelPayload, EventsOrigin, Result,
    RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
};
use crate::{client::WeakClient, room::WeakRoom};

pub(super) mod events;

/// A room-scoped view of the event cache. Cheap to clone; all clones share the
/// same inner state.
#[derive(Clone)]
pub struct RoomEventCache {
    pub(super) inner: Arc<RoomEventCacheInner>,
}

impl fmt::Debug for RoomEventCache {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("RoomEventCache").finish_non_exhaustive()
    }
}

/// A subscriber to updates of a [`RoomEventCache`], obtained from
/// [`RoomEventCache::subscribe`]. When the last listener of a room is dropped,
/// the auto-shrink task is notified.
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheListener {
    /// The underlying receiver of room event cache updates.
    recv: Receiver<RoomEventCacheUpdate>,

    /// The room this listener is attached to.
    room_id: OwnedRoomId,

    /// Sender used to notify the auto-shrink task when the last listener goes
    /// away.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Number of listeners currently subscribed to this room's event cache.
    listener_count: Arc<AtomicUsize>,
}

impl Drop for RoomEventCacheListener {
    fn drop(&mut self) {
        let previous_listener_count = self.listener_count.fetch_sub(1, Ordering::SeqCst);

        trace!("dropping a room event cache listener; previous count: {previous_listener_count}");

        if previous_listener_count == 1 {
            // We were the last listener: notify the auto-shrink task with our room id,
            // retrying if the channel happens to be full.
            let mut room_id = self.room_id.clone();

            let mut num_attempts = 0;

            while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
                num_attempts += 1;

                if num_attempts > 1024 {
                    warn!("couldn't send notification to the auto-shrink channel after 1024 attempts; giving up");
                    return;
                }

                match err {
                    mpsc::error::TrySendError::Full(stolen_room_id) => {
                        room_id = stolen_room_id;
                    }
                    mpsc::error::TrySendError::Closed(_) => return,
                }
            }

            trace!("sent notification to the parent channel that we were the last listener");
        }
    }
}

impl Deref for RoomEventCacheListener {
    type Target = Receiver<RoomEventCacheUpdate>;

    fn deref(&self) -> &Self::Target {
        &self.recv
    }
}

impl DerefMut for RoomEventCacheListener {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.recv
    }
}

impl RoomEventCache {
    pub(super) fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
    ) -> Self {
        Self {
            inner: Arc::new(RoomEventCacheInner::new(
                client,
                state,
                pagination_status,
                room_id,
                auto_shrink_sender,
            )),
        }
    }

    /// Returns the events currently loaded in memory for this room.
    pub async fn events(&self) -> Vec<TimelineEvent> {
        let state = self.inner.state.read().await;

        state.events().events().map(|(_position, item)| item.clone()).collect()
    }

    /// Returns the events currently loaded in memory, plus a listener that
    /// receives subsequent [`RoomEventCacheUpdate`]s for this room.
    pub async fn subscribe(&self) -> (Vec<TimelineEvent>, RoomEventCacheListener) {
        let state = self.inner.state.read().await;
        let events = state.events().events().map(|(_position, item)| item.clone()).collect();

        let previous_listener_count = state.listener_count.fetch_add(1, Ordering::SeqCst);
        trace!("added a room event cache listener; new count: {}", previous_listener_count + 1);

        let recv = self.inner.sender.subscribe();
        let listener = RoomEventCacheListener {
            recv,
            room_id: self.inner.room_id.clone(),
            auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
            listener_count: state.listener_count.clone(),
        };

        (events, listener)
    }

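    // Minimal usage sketch (not part of the public docs; assumes a
    // `room_event_cache: RoomEventCache` obtained from `Room::event_cache()`). The
    // listener derefs to a `tokio` broadcast receiver, so updates are consumed
    // with `recv()`:
    //
    //     let (initial_events, mut listener) = room_event_cache.subscribe().await;
    //     while let Ok(update) = listener.recv().await {
    //         if let RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } = update {
    //             // Apply the `VectorDiff`s to a local copy of the timeline.
    //         }
    //     }
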
    /// Returns a [`RoomPagination`] handle to run back-paginations for this
    /// room.
    pub fn pagination(&self) -> RoomPagination {
        RoomPagination { inner: self.inner.clone() }
    }

    /// Looks up a single event, first in memory, then in the store.
    pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
        self.inner
            .state
            .read()
            .await
            .find_event(event_id)
            .await
            .ok()
            .flatten()
            .map(|(_loc, event)| event)
    }

    /// Looks up an event and the events related to it, optionally filtered by
    /// relation type.
    pub async fn event_with_relations(
        &self,
        event_id: &EventId,
        filter: Option<Vec<RelationType>>,
    ) -> Option<(TimelineEvent, Vec<TimelineEvent>)> {
        self.inner
            .state
            .read()
            .await
            .find_event_with_relations(event_id, filter.clone())
            .await
            .ok()
            .flatten()
    }

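    // Sketch (assuming `event_id` refers to an event already saved in this room's
    // cache): fetch an event plus only its edits, ignoring other relation types.
    //
    //     let filter = Some(vec![RelationType::Replacement]);
    //     if let Some((event, edits)) =
    //         room_event_cache.event_with_relations(event_id, filter).await
    //     {
    //         // `edits` contains the events related to `event` via an `m.replace`
    //         // relation, gathered transitively.
    //     }
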
    /// Clears this room's in-memory and stored timeline, and notifies
    /// listeners with the resulting diffs (a `Clear`).
    pub async fn clear(&self) -> Result<()> {
        let updates_as_vector_diffs = self.inner.state.write().await.reset().await?;

        let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
            diffs: updates_as_vector_diffs,
            origin: EventsOrigin::Cache,
        });

        Ok(())
    }

    /// Saves some events in the event cache store, without adding them to the
    /// room's timeline.
    pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = TimelineEvent>) {
        if let Err(err) = self.inner.state.write().await.save_event(events).await {
            warn!("couldn't save event in the event cache: {err}");
        }
    }

    /// Returns a debug representation of the in-memory linked chunk.
    pub async fn debug_string(&self) -> Vec<String> {
        self.inner.state.read().await.events().debug_string()
    }
}

pub(super) struct RoomEventCacheInner {
    /// The id of the room this event cache applies to.
    room_id: OwnedRoomId,

    /// A weak handle to the `Room` object.
    pub weak_room: WeakRoom,

    /// Broadcast sender used to publish [`RoomEventCacheUpdate`]s to listeners.
    pub sender: Sender<RoomEventCacheUpdate>,

    /// The state of this room's event cache.
    pub state: RwLock<RoomEventCacheState>,

    /// Notifier triggered whenever a new previous-batch token is received from
    /// sync.
    pub pagination_batch_token_notifier: Notify,

    /// Observable back-pagination status for this room.
    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    /// Sender to the auto-shrink channel, handed to each listener.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
}

impl RoomEventCacheInner {
    fn new(
        client: WeakClient,
        state: RoomEventCacheState,
        pagination_status: SharedObservable<RoomPaginationStatus>,
        room_id: OwnedRoomId,
        auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
    ) -> Self {
        let sender = Sender::new(32);
        let weak_room = WeakRoom::new(client, room_id);
        Self {
            room_id: weak_room.room_id().to_owned(),
            weak_room,
            state: RwLock::new(state),
            sender,
            pagination_batch_token_notifier: Default::default(),
            auto_shrink_sender,
            pagination_status,
        }
    }

    fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
        if account_data.is_empty() {
            return;
        }

        let mut handled_read_marker = false;

        trace!("Handling account data");

        for raw_event in account_data {
            match raw_event.deserialize() {
                Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
                    // Only forward the first read-marker update, to avoid flooding listeners
                    // with duplicates.
                    if handled_read_marker {
                        continue;
                    }

                    handled_read_marker = true;

                    let _ = self.sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
                        event_id: ev.content.event_id,
                    });
                }

                Ok(_) => {
                    // Nothing to do for other account data events.
                }

                Err(e) => {
                    let event_type = raw_event.get_field::<String>("type").ok().flatten();
                    warn!(event_type, "Failed to deserialize account data: {e}");
                }
            }
        }
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
        self.handle_timeline(
            updates.timeline,
            updates.ephemeral.clone(),
            updates.ambiguity_changes,
        )
        .await?;

        self.handle_account_data(updates.account_data);

        Ok(())
    }

    #[instrument(skip_all, fields(room_id = %self.room_id))]
    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
        self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
        Ok(())
    }

    async fn handle_timeline(
        &self,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        let mut prev_batch = timeline.prev_batch;
        if timeline.events.is_empty()
            && prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        trace!("adding new events");

        let mut state = self.state.write().await;

        // If the sync wasn't limited and we already have events in the linked chunk,
        // there's no hole between the known events and the new ones, so the
        // previous-batch token is useless: drop it to avoid inserting a gap.
        if !timeline.limited && state.events().events().next().is_some() {
            prev_batch = None;
        }
        let (
            DeduplicationOutcome {
                all_events: events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
            },
            all_duplicates,
        ) = state.collect_valid_and_duplicated_events(timeline.events).await?;

        // If every new event is already known, don't touch the linked chunk at all.
        let timeline_event_diffs = if all_duplicates {
            vec![]
        } else {
            let mut timeline_event_diffs = state
                .remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;

            let new_timeline_event_diffs = state
                .with_events_mut(true, |room_events| {
                    if !all_duplicates {
                        if let Some(prev_token) = &prev_batch {
                            // If the current last chunk is an empty items chunk, remember it, so
                            // it can be removed once the gap has been inserted.
                            let prev_chunk_to_remove =
                                room_events.rchunks().next().and_then(|chunk| {
                                    (chunk.is_items() && chunk.num_items() == 0)
                                        .then_some(chunk.identifier())
                                });

                            room_events.push_gap(Gap { prev_token: prev_token.clone() });

                            if let Some(prev_chunk_to_remove) = prev_chunk_to_remove {
                                room_events.remove_empty_chunk_at(prev_chunk_to_remove).expect(
                                    "we just checked the chunk is there, and it's an empty item chunk",
                                );
                            }
                        }
                    }

                    room_events.push_events(events.clone());

                    events.clone()
                })
                .await?;

            timeline_event_diffs.extend(new_timeline_event_diffs);

            if timeline.limited && prev_batch.is_some() && !all_duplicates {
                // The sync was limited and brought a gap: shrink the in-memory linked chunk
                // to its last chunk. The diffs from the shrink replace the ones accumulated
                // so far.
                if let Some(diffs) = state.shrink_to_last_chunk().await? {
                    timeline_event_diffs = diffs;
                }
            }

            timeline_event_diffs
        };

        // Now that the events have been handled, a pending pagination may resume with
        // the new previous-batch token.
        if prev_batch.is_some() {
            self.pagination_batch_token_notifier.notify_one();
        }

        {
            if !timeline_event_diffs.is_empty() {
                let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                    diffs: timeline_event_diffs,
                    origin: EventsOrigin::Sync,
                });
            }

            if !ephemeral_events.is_empty() {
                let _ = self
                    .sender
                    .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
            }

            if !ambiguity_changes.is_empty() {
                let _ = self.sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
            }
        }

        Ok(())
    }
}

#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    /// A gap has been hit: back-paginate with the given previous-batch token,
    /// if any.
    Gap {
        prev_token: Option<String>,
    },

    /// The start of the timeline has been reached: there is nothing left to
    /// load.
    StartOfTimeline,

    /// More events have been loaded from storage.
    Events {
        events: Vec<TimelineEvent>,
        timeline_event_diffs: Vec<VectorDiff<TimelineEvent>>,
        reached_start: bool,
    },

    /// Wait for a first previous-batch token from sync before paginating.
    WaitForInitialPrevToken,
}

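// Rough sketch of how a back-pagination driver might interpret this outcome
// (the actual driver lives in `RoomPagination`; the shape below is only
// illustrative):
//
//     match state.load_more_events_backwards().await? {
//         LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
//             // Hit the /messages endpoint with `prev_token` to back-paginate.
//         }
//         LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
//             // Nothing left to load.
//         }
//         LoadMoreEventsBackwardsOutcome::Events { timeline_event_diffs, .. } => {
//             // Propagate `timeline_event_diffs` to listeners.
//         }
//         LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken => {
//             // Wait for sync to provide a first previous-batch token.
//         }
//     }
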
mod private {
    use std::{
        collections::HashSet,
        sync::{atomic::AtomicUsize, Arc},
    };

    use eyeball::SharedObservable;
    use eyeball_im::VectorDiff;
    use matrix_sdk_base::{
        apply_redaction,
        deserialized_responses::{
            ThreadSummary, ThreadSummaryStatus, TimelineEvent, TimelineEventKind,
        },
        event_cache::{store::EventCacheStoreLock, Event, Gap},
        linked_chunk::{
            lazy_loader, ChunkContent, ChunkIdentifierGenerator, LinkedChunkId, Position, Update,
        },
        serde_helpers::extract_thread_root,
    };
    use matrix_sdk_common::executor::spawn;
    use ruma::{
        events::{
            relation::RelationType, room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent,
            MessageLikeEventType,
        },
        serde::Raw,
        EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
    };
    use tracing::{debug, error, instrument, trace, warn};

    use super::{
        super::{
            deduplicator::{DeduplicationOutcome, Deduplicator},
            EventCacheError,
        },
        events::RoomEvents,
        sort_positions_descending, EventLocation, LoadMoreEventsBackwardsOutcome,
    };
    use crate::event_cache::RoomPaginationStatus;

    pub struct RoomEventCacheState {
        room: OwnedRoomId,

        room_version: RoomVersionId,

        store: EventCacheStoreLock,

        events: RoomEvents,

        deduplicator: Deduplicator,

        pub waited_for_initial_prev_token: bool,

        pagination_status: SharedObservable<RoomPaginationStatus>,

        pub(super) listener_count: Arc<AtomicUsize>,
    }

    impl RoomEventCacheState {
        pub async fn new(
            room_id: OwnedRoomId,
            room_version: RoomVersionId,
            store: EventCacheStoreLock,
            pagination_status: SharedObservable<RoomPaginationStatus>,
        ) -> Result<Self, EventCacheError> {
            let store_lock = store.lock().await?;

            let linked_chunk_id = LinkedChunkId::Room(&room_id);
            let linked_chunk = match store_lock
                .load_last_chunk(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)
                .and_then(|(last_chunk, chunk_identifier_generator)| {
                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                        .map_err(EventCacheError::from)
                }) {
                Ok(linked_chunk) => linked_chunk,

                Err(err) => {
                    error!("error when reloading a linked chunk from memory: {err}");

                    store_lock
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    None
                }
            };

            let events = RoomEvents::with_initial_linked_chunk(linked_chunk);
            let deduplicator = Deduplicator::new(room_id.clone(), store.clone());

            Ok(Self {
                room: room_id,
                room_version,
                store,
                events,
                deduplicator,
                waited_for_initial_prev_token: false,
                listener_count: Default::default(),
                pagination_status,
            })
        }

        pub async fn collect_valid_and_duplicated_events(
            &mut self,
            events: Vec<Event>,
        ) -> Result<(DeduplicationOutcome, bool), EventCacheError> {
            let deduplication_outcome =
                self.deduplicator.filter_duplicate_events(events, &self.events).await?;

            let number_of_events = deduplication_outcome.all_events.len();
            let number_of_deduplicated_events =
                deduplication_outcome.in_memory_duplicated_event_ids.len()
                    + deduplication_outcome.in_store_duplicated_event_ids.len();

            let all_duplicates =
                number_of_events > 0 && number_of_events == number_of_deduplicated_events;

            Ok((deduplication_outcome, all_duplicates))
        }

        fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
            if self.events.events().next().is_some() {
                trace!("chunk is fully loaded and non-empty: reached_start=true");
                LoadMoreEventsBackwardsOutcome::StartOfTimeline
            } else if !self.waited_for_initial_prev_token {
                LoadMoreEventsBackwardsOutcome::WaitForInitialPrevToken
            } else {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
            }
        }

        pub(in super::super) async fn load_more_events_backwards(
            &mut self,
        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
            if let Some(prev_token) = self.events.rgap().map(|gap| gap.prev_token) {
                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
            }

            let first_chunk_identifier =
                self.events.chunks().next().expect("a linked chunk is never empty").identifier();

            let store = self.store.lock().await?;

            let linked_chunk_id = LinkedChunkId::Room(&self.room);
            let new_first_chunk = match store
                .load_previous_chunk(linked_chunk_id, first_chunk_identifier)
                .await
            {
                Ok(Some(new_first_chunk)) => {
                    new_first_chunk
                }

                Ok(None) => {
                    return Ok(self.conclude_load_more_for_fully_loaded_chunk());
                }

                Err(err) => {
                    error!("error when loading the previous chunk of a linked chunk: {err}");

                    store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;

                    return Err(err.into());
                }
            };

            let chunk_content = new_first_chunk.content.clone();

            let reached_start = new_first_chunk.previous.is_none();

            if let Err(err) = self.events.insert_new_chunk_as_first(new_first_chunk) {
                error!("error when inserting the previous chunk into its linked chunk: {err}");

                store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?;

                return Err(err.into());
            };

            let _ = self.events.store_updates().take();

            let timeline_event_diffs = self.events.updates_as_vector_diffs();

            Ok(match chunk_content {
                ChunkContent::Gap(gap) => {
                    trace!("reloaded chunk from disk (gap)");
                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
                }

                ChunkContent::Items(events) => {
                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
                    LoadMoreEventsBackwardsOutcome::Events {
                        events,
                        timeline_event_diffs,
                        reached_start,
                    }
                }
            })
        }

        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
        pub(super) async fn shrink_to_last_chunk(
            &mut self,
        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
            let store_lock = self.store.lock().await?;

            let linked_chunk_id = LinkedChunkId::Room(&self.room);
            let (last_chunk, chunk_identifier_generator) =
                match store_lock.load_last_chunk(linked_chunk_id).await {
                    Ok(pair) => pair,

                    Err(err) => {
                        error!("error when reloading a linked chunk from memory: {err}");

                        store_lock
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        (None, ChunkIdentifierGenerator::new_from_scratch())
                    }
                };

            debug!("unloading the linked chunk, and resetting it to its last chunk");

            if let Err(err) = self.events.replace_with(last_chunk, chunk_identifier_generator) {
                error!("error when replacing the linked chunk: {err}");
                return self.reset().await.map(Some);
            }

            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            let _ = self.events.store_updates().take();

            let diffs = self.events.updates_as_vector_diffs();
            assert!(matches!(diffs[0], VectorDiff::Clear));

            Ok(Some(diffs))
        }

        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
        pub(crate) async fn auto_shrink_if_no_listeners(
            &mut self,
        ) -> Result<Option<Vec<VectorDiff<TimelineEvent>>>, EventCacheError> {
            let listener_count = self.listener_count.load(std::sync::atomic::Ordering::SeqCst);

            trace!(listener_count, "received request to auto-shrink");

            if listener_count == 0 {
                self.shrink_to_last_chunk().await
            } else {
                Ok(None)
            }
        }

        fn strip_relations_if_present<T>(event: &mut Raw<T>) {
            let mut closure = || -> Option<()> {
                let mut val: serde_json::Value = event.deserialize_as().ok()?;
                let unsigned = val.get_mut("unsigned")?;
                let unsigned_obj = unsigned.as_object_mut()?;
                if unsigned_obj.remove("m.relations").is_some() {
                    *event = Raw::new(&val).ok()?.cast();
                }
                None
            };
            let _ = closure();
        }

        fn strip_relations_from_event(ev: &mut TimelineEvent) {
            match &mut ev.kind {
                TimelineEventKind::Decrypted(decrypted) => {
                    decrypted.unsigned_encryption_info = None;

                    Self::strip_relations_if_present(&mut decrypted.event);
                }

                TimelineEventKind::UnableToDecrypt { event, .. }
                | TimelineEventKind::PlainText { event } => {
                    Self::strip_relations_if_present(event);
                }
            }
        }

        fn strip_relations_from_events(items: &mut [TimelineEvent]) {
            for ev in items.iter_mut() {
                Self::strip_relations_from_event(ev);
            }
        }

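        // Illustration of what the stripping above does to an event's JSON before it
        // is persisted (only the `m.relations` entry of `unsigned` is dropped; the
        // rest of the event is left untouched):
        //
        //     before: { "unsigned": { "age": 42, "m.relations": { "m.replace": { … } } } }
        //     after:  { "unsigned": { "age": 42 } }
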
        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
        #[instrument(skip_all)]
        pub(crate) async fn remove_events(
            &mut self,
            in_memory_events: Vec<(OwnedEventId, Position)>,
            in_store_events: Vec<(OwnedEventId, Position)>,
        ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
            if !in_store_events.is_empty() {
                let mut positions = in_store_events
                    .into_iter()
                    .map(|(_event_id, position)| position)
                    .collect::<Vec<_>>();

                sort_positions_descending(&mut positions);

                self.send_updates_to_store(
                    positions
                        .into_iter()
                        .map(|position| Update::RemoveItem { at: position })
                        .collect(),
                )
                .await?;
            }

            if in_memory_events.is_empty() {
                return Ok(Vec::new());
            }

            self.events
                .remove_events_by_position(
                    in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
                )
                .expect("failed to remove an event");

            self.propagate_changes().await?;

            Ok(self.events.updates_as_vector_diffs())
        }

        async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
            let updates = self.events.store_updates().take();
            self.send_updates_to_store(updates).await
        }

        pub async fn send_updates_to_store(
            &mut self,
            mut updates: Vec<Update<TimelineEvent, Gap>>,
        ) -> Result<(), EventCacheError> {
            if updates.is_empty() {
                return Ok(());
            }

            for update in updates.iter_mut() {
                match update {
                    Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
                    Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item),
                    Update::NewItemsChunk { .. }
                    | Update::NewGapChunk { .. }
                    | Update::RemoveChunk(_)
                    | Update::RemoveItem { .. }
                    | Update::DetachLastItems { .. }
                    | Update::StartReattachItems
                    | Update::EndReattachItems
                    | Update::Clear => {}
                }
            }

            let store = self.store.clone();
            let room_id = self.room.clone();

            spawn(async move {
                let store = store.lock().await?;

                trace!(?updates, "sending linked chunk updates to the store");
                let linked_chunk_id = LinkedChunkId::Room(&room_id);
                store.handle_linked_chunk_updates(linked_chunk_id, updates).await?;
                trace!("linked chunk updates applied");

                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            Ok(())
        }

        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
        pub async fn reset(&mut self) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError> {
            self.events.reset();

            self.propagate_changes().await?;

            self.waited_for_initial_prev_token = false;
            self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            let diff_updates = self.events.updates_as_vector_diffs();

            debug_assert_eq!(diff_updates.len(), 1);
            debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));

            Ok(diff_updates)
        }

        pub fn events(&self) -> &RoomEvents {
            &self.events
        }

        pub async fn find_event(
            &self,
            event_id: &EventId,
        ) -> Result<Option<(EventLocation, TimelineEvent)>, EventCacheError> {
            for (position, event) in self.events().revents() {
                if event.event_id().as_deref() == Some(event_id) {
                    return Ok(Some((EventLocation::Memory(position), event.clone())));
                }
            }

            let store = self.store.lock().await?;

            Ok(store
                .find_event(&self.room, event_id)
                .await?
                .map(|event| (EventLocation::Store, event)))
        }

        pub async fn find_event_with_relations(
            &self,
            event_id: &EventId,
            filters: Option<Vec<RelationType>>,
        ) -> Result<Option<(TimelineEvent, Vec<TimelineEvent>)>, EventCacheError> {
            let store = self.store.lock().await?;

            let found = store.find_event(&self.room, event_id).await?;

            let Some(target) = found else {
                return Ok(None);
            };

            let mut related =
                store.find_event_relations(&self.room, event_id, filters.as_deref()).await?;
            let mut stack = related.iter().filter_map(|event| event.event_id()).collect::<Vec<_>>();

            let mut already_seen = HashSet::new();
            already_seen.insert(event_id.to_owned());

            let mut num_iters = 1;

            while let Some(event_id) = stack.pop() {
                if !already_seen.insert(event_id.clone()) {
                    continue;
                }

                let other_related =
                    store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?;

                stack.extend(other_related.iter().filter_map(|event| event.event_id()));
                related.extend(other_related);

                num_iters += 1;
            }

            trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");

            Ok(Some((target, related)))
        }

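        // Worked example of the traversal above (it mirrors
        // `test_event_with_recursive_relation` at the bottom of this file): relations
        // are followed transitively, so a reaction to an edit is reported as related
        // to the original message.
        //
        //     original <-m.replace- edit <-m.annotation- reaction
        //     find_event_with_relations(original, None) == Some((original, [edit, reaction]))
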
        #[must_use = "Updates as `VectorDiff` must probably be propagated via `RoomEventCacheUpdate`"]
        #[instrument(skip_all, fields(room_id = %self.room))]
        pub async fn with_events_mut<F>(
            &mut self,
            is_live_sync: bool,
            func: F,
        ) -> Result<Vec<VectorDiff<TimelineEvent>>, EventCacheError>
        where
            F: FnOnce(&mut RoomEvents) -> Vec<TimelineEvent>,
        {
            let events_to_post_process = func(&mut self.events);

            self.propagate_changes().await?;

            for event in events_to_post_process {
                self.maybe_apply_new_redaction(&event).await?;

                self.analyze_thread_root(&event, is_live_sync).await?;

                if let Some(bundled_thread) = event.bundled_latest_thread_event {
                    self.save_event([*bundled_thread]).await?;
                }
            }

            if !self.waited_for_initial_prev_token
                && self.events.chunks().any(|chunk| chunk.is_gap())
            {
                self.waited_for_initial_prev_token = true;
            }

            let updates_as_vector_diffs = self.events.updates_as_vector_diffs();

            Ok(updates_as_vector_diffs)
        }

        #[instrument(skip_all)]
        async fn analyze_thread_root(
            &mut self,
            event: &Event,
            is_live_sync: bool,
        ) -> Result<(), EventCacheError> {
            let Some(thread_root) = extract_thread_root(event.raw()) else {
                return Ok(());
            };

            let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
                trace!("thread root event is missing from the linked chunk");
                return Ok(());
            };

            let num_replies = {
                let store_guard = &*self.store.lock().await?;
                let related_thread_events = store_guard
                    .find_event_relations(&self.room, &thread_root, Some(&[RelationType::Thread]))
                    .await?;
                related_thread_events.len()
            };

            let prev_summary = target_event.thread_summary.summary();
            let mut latest_reply =
                prev_summary.as_ref().and_then(|summary| summary.latest_reply.clone());

            if is_live_sync || latest_reply.is_none() {
                latest_reply = event.event_id();
            }

            let new_summary = ThreadSummary { num_replies, latest_reply };

            if prev_summary == Some(&new_summary) {
                trace!("thread summary is already up-to-date");
                return Ok(());
            }

            target_event.thread_summary = ThreadSummaryStatus::Some(new_summary);
            self.replace_event_at(location, target_event).await?;

            Ok(())
        }

        async fn replace_event_at(
            &mut self,
            location: EventLocation,
            event: TimelineEvent,
        ) -> Result<(), EventCacheError> {
            match location {
                EventLocation::Memory(position) => {
                    self.events
                        .replace_event_at(position, event)
                        .expect("should have been a valid position of an item");
                    self.propagate_changes().await?;
                }
                EventLocation::Store => {
                    self.save_event([event]).await?;
                }
            }

            Ok(())
        }

        #[instrument(skip_all)]
        async fn maybe_apply_new_redaction(
            &mut self,
            event: &Event,
        ) -> Result<(), EventCacheError> {
            let raw_event = event.raw();

            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
                raw_event.get_field::<MessageLikeEventType>("type")
            else {
                return Ok(());
            };

            let Ok(AnySyncTimelineEvent::MessageLike(
                ruma::events::AnySyncMessageLikeEvent::RoomRedaction(redaction),
            )) = event.raw().deserialize()
            else {
                return Ok(());
            };

            let Some(event_id) = redaction.redacts(&self.room_version) else {
                warn!("missing target event id from the redaction event");
                return Ok(());
            };

            if let Some((location, mut target_event)) = self.find_event(event_id).await? {
                if let Ok(deserialized) = target_event.raw().deserialize() {
                    match deserialized {
                        AnySyncTimelineEvent::MessageLike(ev) => {
                            if ev.is_redacted() {
                                return Ok(());
                            }
                        }
                        AnySyncTimelineEvent::State(ev) => {
                            if ev.is_redacted() {
                                return Ok(());
                            }
                        }
                    }
                }

                if let Some(redacted_event) = apply_redaction(
                    target_event.raw(),
                    event.raw().cast_ref::<SyncRoomRedactionEvent>(),
                    &self.room_version,
                ) {
                    target_event.replace_raw(redacted_event.cast());

                    self.replace_event_at(location, target_event).await?;
                }
            } else {
                trace!("redacted event is missing from the linked chunk");
            }

            Ok(())
        }

        pub async fn save_event(
            &self,
            events: impl IntoIterator<Item = TimelineEvent>,
        ) -> Result<(), EventCacheError> {
            let store = self.store.clone();
            let room_id = self.room.clone();
            let events = events.into_iter().collect::<Vec<_>>();

            spawn(async move {
                let store = store.lock().await?;
                for event in events {
                    store.save_event(&room_id, event).await?;
                }
                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            Ok(())
        }
    }
}

pub(super) enum EventLocation {
    Memory(Position),

    Store,
}

pub(super) use private::RoomEventCacheState;

#[cfg(test)]
mod tests {

    use matrix_sdk_common::deserialized_responses::TimelineEvent;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id, RoomId,
    };

    use crate::test_utils::logged_in_client;

    #[async_test]
    async fn test_event_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("An edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    #[async_test]
    async fn test_event_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        room_event_cache.save_events([original_event]).await;

        room_event_cache.save_events([related_event]).await;

        room_event_cache.save_events([associated_related_event]).await;

        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) =
            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        assert_eq!(related_events.len(), 1);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);

        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) =
            room_event_cache.event_with_relations(original_id, filter).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        assert!(related_events.is_empty());
    }

    #[async_test]
    async fn test_event_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        room_event_cache.save_events([original_event]).await;

        room_event_cache.save_events([related_event]).await;

        room_event_cache.save_events([associated_related_event]).await;

        let (event, related_events) =
            room_event_cache.event_with_relations(original_id, None).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }

    async fn assert_relations(
        room_id: &RoomId,
        original_event: TimelineEvent,
        related_event: TimelineEvent,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_events([original_event]).await;

        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_events([event_factory
                .text_msg("An unrelated event")
                .event_id(unrelated_id)
                .into()])
            .await;

        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_events([related_event]).await;

        let (event, related_events) =
            room_event_cache.event_with_relations(&original_event_id, None).await.unwrap();
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }
}

#[cfg(all(test, not(target_family = "wasm")))]
mod timed_tests {
    use std::sync::Arc;

    use assert_matches::assert_matches;
    use assert_matches2::assert_let;
    use eyeball_im::VectorDiff;
    use matrix_sdk_base::{
        event_cache::{
            store::{EventCacheStore as _, MemoryStore},
            Gap,
        },
        linked_chunk::{
            lazy_loader::from_all_chunks, ChunkContent, ChunkIdentifier, LinkedChunkId, Position,
            Update,
        },
        store::StoreConfig,
        sync::{JoinedRoomUpdate, Timeline},
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory, ALICE, BOB};
    use ruma::{
        event_id,
        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
        room_id, user_id,
    };
    use tokio::task::yield_now;

    use crate::{
        assert_let_timeout,
        event_cache::{room::LoadMoreEventsBackwardsOutcome, RoomEventCacheUpdate},
        test_utils::client::MockClientBuilder,
    };

    #[async_test]
    async fn test_write_to_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let timeline = Timeline {
            limited: true,
            prev_batch: Some("raclette".to_owned()),
            events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
        };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 2);

        let mut chunks = linked_chunk.chunks();

        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
            assert_eq!(gap.prev_token, "raclette");
        });

        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);
            let deserialized = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
            assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
        });

        assert!(chunks.next().is_none());
    }

    #[async_test]
    async fn test_write_to_storage_strips_bundled_relations() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let ev = f
            .text_msg("hey yo")
            .sender(*ALICE)
            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
            .into_event();

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        {
            let events = room_event_cache.events().await;

            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(
                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
            );

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_some());
        }

        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 1);

        let mut chunks = linked_chunk.chunks();
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_none());
        });

        assert!(chunks.next().is_none());
    }

    #[async_test]
    async fn test_clear() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "comté".to_owned() },
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await;

        {
            assert!(room_event_cache.event(event_id1).await.is_some());
            assert!(room_event_cache.event(event_id2).await.is_some());
        }

        {
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].event_id().unwrap(), event_id2);

            assert!(stream.is_empty());
        }

        {
            room_event_cache.pagination().run_backwards_once(20).await.unwrap();

            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
            );
            assert_eq!(diffs.len(), 1);
            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
                assert_eq!(event.event_id().unwrap(), event_id1);
            });

            assert!(stream.is_empty());
        }

        room_event_cache.clear().await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_let!(VectorDiff::Clear = &diffs[0]);

        assert!(room_event_cache.event(event_id1).await.is_some());

        let items = room_event_cache.events().await;
        assert!(items.is_empty());

        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.num_items(), 0);
    }

    #[async_test]
    async fn test_load_from_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "cheddar".to_owned() },
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await;

        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);
        assert!(stream.is_empty());

        assert!(room_event_cache.event(event_id1).await.is_some());
        assert!(room_event_cache.event(event_id2).await.is_some());

        room_event_cache.pagination().run_backwards_once(20).await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
            assert_eq!(event.event_id().unwrap(), event_id1);
        });

        assert!(stream.is_empty());

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        let items = room_event_cache.events().await;
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].event_id().unwrap(), event_id1);
        assert_eq!(items[1].event_id().unwrap(), event_id2);
    }

    #[async_test]
    async fn test_load_from_storage_resilient_to_failure() {
        let room_id = room_id!("!fondue:patate.ch");
        let event_cache_store = Arc::new(MemoryStore::new());

        let event = EventFactory::new()
            .room(room_id)
            .sender(user_id!("@ben:saucisse.bzh"))
            .text_msg("foo")
            .event_id(event_id!("$42"))
            .into_event();

        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: Some(ChunkIdentifier::new(0)),
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new("http://localhost".to_owned())
            .store_config(
                StoreConfig::new("holder".to_owned()).event_cache_store(event_cache_store.clone()),
            )
            .build()
            .await;

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let items = room_event_cache.events().await;

        assert!(items.is_empty());

        let raw_chunks =
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
        assert!(raw_chunks.is_empty());
    }

    #[async_test]
    async fn test_no_useless_gaps() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("raclette".to_owned()),
                    events: vec![f.text_msg("hey yo").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        {
            let mut state = room_event_cache.inner.state.write().await;

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.events().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            assert_eq!(num_gaps, 0);
            assert_eq!(num_events, 1);

            assert_matches!(
                state.load_more_events_backwards().await.unwrap(),
                LoadMoreEventsBackwardsOutcome::Gap { .. }
            );

            num_gaps = 0;
            num_events = 0;
            for c in state.events().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 1);
        }

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: false,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![f.text_msg("sup").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        {
            let state = room_event_cache.inner.state.read().await;

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.events().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(gap) => {
                        assert_eq!(gap.prev_token, "raclette");
                        num_gaps += 1;
                    }
                }
            }

            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 2);
        }
    }

    #[async_test]
    async fn test_shrink_to_last_chunk() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        {
            let store = client.event_cache_store();
            let store = store.lock().await.unwrap();
            store
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (events, mut stream) = room_event_cache.subscribe().await;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
        assert!(stream.is_empty());

        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream.is_empty());

        let diffs = room_event_cache
            .inner
            .state
            .write()
            .await
            .shrink_to_last_chunk()
            .await
            .expect("shrinking should succeed")
            .unwrap();

        assert_eq!(diffs.len(), 2);
        assert_matches!(&diffs[0], VectorDiff::Clear);
        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
            assert_eq!(values.len(), 1);
            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
        });

        assert!(stream.is_empty());

        let events = room_event_cache.events().await;
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));

        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);
    }

    #[async_test]
    async fn test_auto_shrink_after_all_subscribers_are_gone() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new("http://localhost".to_owned()).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        {
            let store = client.event_cache_store();
            let store = store.lock().await.unwrap();
            store
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (events1, mut stream1) = room_event_cache.subscribe().await;
        assert_eq!(events1.len(), 1);
        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
        assert!(stream1.is_empty());

        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream1.is_empty());

        let (events2, stream2) = room_event_cache.subscribe().await;
        assert_eq!(events2.len(), 2);
        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
        assert!(stream2.is_empty());

        drop(stream1);
        yield_now().await;

        assert!(stream2.is_empty());

        drop(stream2);
        yield_now().await;

        {
            let state = room_event_cache.inner.state.read().await;
            assert_eq!(state.listener_count.load(std::sync::atomic::Ordering::SeqCst), 0);
        }

        let events3 = room_event_cache.events().await;
        assert_eq!(events3.len(), 1);
        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
    }
}