1use std::{
18 collections::BTreeMap,
19 fmt,
20 ops::{Deref, DerefMut},
21 sync::{
22 Arc,
23 atomic::{AtomicUsize, Ordering},
24 },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31 deserialized_responses::AmbiguityChange,
32 event_cache::Event,
33 linked_chunk::Position,
34 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37 EventId, OwnedEventId, OwnedRoomId,
38 api::Direction,
39 events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40 serde::Raw,
41};
42use tokio::sync::{
43 Notify,
44 broadcast::{Receiver, Sender},
45 mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50 AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51 RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54 client::WeakClient,
55 event_cache::EventCacheError,
56 room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod threads;
61
62pub use threads::ThreadEventCacheUpdate;
63
64#[derive(Clone)]
68pub struct RoomEventCache {
69 pub(super) inner: Arc<RoomEventCacheInner>,
70}
71
72impl fmt::Debug for RoomEventCache {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 f.debug_struct("RoomEventCache").finish_non_exhaustive()
75 }
76}
77
78#[allow(missing_debug_implementations)]
88pub struct RoomEventCacheSubscriber {
89 recv: Receiver<RoomEventCacheUpdate>,
91
92 room_id: OwnedRoomId,
94
95 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
97
98 subscriber_count: Arc<AtomicUsize>,
100}
101
102impl Drop for RoomEventCacheSubscriber {
103 fn drop(&mut self) {
104 let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
105
106 trace!(
107 "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
108 );
109
110 if previous_subscriber_count == 1 {
111 let mut room_id = self.room_id.clone();
115
116 let mut num_attempts = 0;
122
123 while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
124 num_attempts += 1;
125
126 if num_attempts > 1024 {
127 warn!(
130 "couldn't send notification to the auto-shrink channel \
131 after 1024 attempts; giving up"
132 );
133 return;
134 }
135
136 match err {
137 mpsc::error::TrySendError::Full(stolen_room_id) => {
138 room_id = stolen_room_id;
139 }
140 mpsc::error::TrySendError::Closed(_) => return,
141 }
142 }
143
144 trace!("sent notification to the parent channel that we were the last subscriber");
145 }
146 }
147}
148
149impl Deref for RoomEventCacheSubscriber {
150 type Target = Receiver<RoomEventCacheUpdate>;
151
152 fn deref(&self) -> &Self::Target {
153 &self.recv
154 }
155}
156
157impl DerefMut for RoomEventCacheSubscriber {
158 fn deref_mut(&mut self) -> &mut Self::Target {
159 &mut self.recv
160 }
161}
162
163impl RoomEventCache {
164 pub(super) fn new(
166 client: WeakClient,
167 state: RoomEventCacheStateLock,
168 pagination_status: SharedObservable<RoomPaginationStatus>,
169 room_id: OwnedRoomId,
170 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
171 update_sender: Sender<RoomEventCacheUpdate>,
172 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
173 ) -> Self {
174 Self {
175 inner: Arc::new(RoomEventCacheInner::new(
176 client,
177 state,
178 pagination_status,
179 room_id,
180 auto_shrink_sender,
181 update_sender,
182 generic_update_sender,
183 )),
184 }
185 }
186
187 pub async fn events(&self) -> Result<Vec<Event>> {
192 let state = self.inner.state.read().await?;
193
194 Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
195 }
196
197 pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
204 let state = self.inner.state.read().await?;
205 let events =
206 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
207
208 let subscriber_count = state.subscriber_count();
209 let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
210 trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
211
212 let recv = self.inner.update_sender.subscribe();
213 let subscriber = RoomEventCacheSubscriber {
214 recv,
215 room_id: self.inner.room_id.clone(),
216 auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
217 subscriber_count: subscriber_count.clone(),
218 };
219
220 Ok((events, subscriber))
221 }
222
223 pub async fn subscribe_to_thread(
226 &self,
227 thread_root: OwnedEventId,
228 ) -> Result<(Vec<Event>, Receiver<ThreadEventCacheUpdate>)> {
229 let mut state = self.inner.state.write().await?;
230 Ok(state.subscribe_to_thread(thread_root))
231 }
232
233 #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
238 pub async fn paginate_thread_backwards(
239 &self,
240 thread_root: OwnedEventId,
241 num_events: u16,
242 ) -> Result<bool> {
243 let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;
244
245 let mut outcome =
247 self.inner.state.write().await?.load_more_thread_events_backwards(thread_root.clone());
248
249 loop {
250 match outcome {
251 LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
252 let options = RelationsOptions {
254 from: prev_token.clone(),
255 dir: Direction::Backward,
256 limit: Some(num_events.into()),
257 include_relations: IncludeRelations::AllRelations,
258 recurse: true,
259 };
260
261 let mut result = room
262 .relations(thread_root.clone(), options)
263 .await
264 .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;
265
266 let reached_start = result.next_batch_token.is_none();
267 trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");
268
269 let root_event =
272 if reached_start {
273 Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
275 |err| EventCacheError::BackpaginationError(Box::new(err)),
276 )?)
277 } else {
278 None
279 };
280
281 let mut state = self.inner.state.write().await?;
282
283 state.save_events(result.chunk.iter().cloned()).await?;
285
286 result.chunk.extend(root_event);
289
290 if let Some(outcome) = state.finish_thread_network_pagination(
291 thread_root.clone(),
292 prev_token,
293 result.next_batch_token,
294 result.chunk,
295 ) {
296 return Ok(outcome.reached_start);
297 }
298
299 outcome = state.load_more_thread_events_backwards(thread_root.clone());
301 }
302
303 LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
304 return Ok(true);
306 }
307
308 LoadMoreEventsBackwardsOutcome::Events { .. } => {
309 unimplemented!("loading from disk for threads is not implemented yet");
311 }
312 }
313 }
314 }
315
316 pub fn pagination(&self) -> RoomPagination {
319 RoomPagination { inner: self.inner.clone() }
320 }
321
322 pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
331 where
332 P: FnMut(&Event, Option<OwnedEventId>) -> Option<O>,
333 {
334 Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
335 }
336
337 pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
342 Ok(self
343 .inner
344 .state
345 .read()
346 .await?
347 .find_event(event_id)
348 .await
349 .ok()
350 .flatten()
351 .map(|(_loc, event)| event))
352 }
353
354 pub async fn find_event_with_relations(
366 &self,
367 event_id: &EventId,
368 filter: Option<Vec<RelationType>>,
369 ) -> Result<Option<(Event, Vec<Event>)>> {
370 Ok(self
372 .inner
373 .state
374 .read()
375 .await?
376 .find_event_with_relations(event_id, filter.clone())
377 .await
378 .ok()
379 .flatten())
380 }
381
382 pub async fn clear(&self) -> Result<()> {
387 let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
389
390 let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
392 diffs: updates_as_vector_diffs,
393 origin: EventsOrigin::Cache,
394 });
395
396 let _ = self
398 .inner
399 .generic_update_sender
400 .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });
401
402 Ok(())
403 }
404
405 pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
407 self.inner
408 .handle_timeline(
409 Timeline { limited: false, prev_batch: None, events: vec![event] },
410 Vec::new(),
411 BTreeMap::new(),
412 )
413 .await
414 }
415
416 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
419 match self.inner.state.write().await {
420 Ok(mut state_guard) => {
421 if let Err(err) = state_guard.save_events(events).await {
422 warn!("couldn't save event in the event cache: {err}");
423 }
424 }
425
426 Err(err) => {
427 warn!("couldn't save event in the event cache: {err}");
428 }
429 }
430 }
431
432 pub async fn debug_string(&self) -> Vec<String> {
435 match self.inner.state.read().await {
436 Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
437 Err(err) => {
438 warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
439
440 vec![]
441 }
442 }
443 }
444}
445
446pub(super) struct RoomEventCacheInner {
448 pub(super) room_id: OwnedRoomId,
450
451 pub weak_room: WeakRoom,
452
453 pub state: RoomEventCacheStateLock,
455
456 pub pagination_batch_token_notifier: Notify,
458
459 pub pagination_status: SharedObservable<RoomPaginationStatus>,
460
461 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
466
467 pub update_sender: Sender<RoomEventCacheUpdate>,
469
470 pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
476}
477
478impl RoomEventCacheInner {
479 fn new(
482 client: WeakClient,
483 state: RoomEventCacheStateLock,
484 pagination_status: SharedObservable<RoomPaginationStatus>,
485 room_id: OwnedRoomId,
486 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
487 update_sender: Sender<RoomEventCacheUpdate>,
488 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
489 ) -> Self {
490 let weak_room = WeakRoom::new(client, room_id);
491
492 Self {
493 room_id: weak_room.room_id().to_owned(),
494 weak_room,
495 state,
496 update_sender,
497 pagination_batch_token_notifier: Default::default(),
498 auto_shrink_sender,
499 pagination_status,
500 generic_update_sender,
501 }
502 }
503
504 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
505 if account_data.is_empty() {
506 return;
507 }
508
509 let mut handled_read_marker = false;
510
511 trace!("Handling account data");
512
513 for raw_event in account_data {
514 match raw_event.deserialize() {
515 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
516 if handled_read_marker {
519 continue;
520 }
521
522 handled_read_marker = true;
523
524 let _ = self.update_sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
526 event_id: ev.content.event_id,
527 });
528 }
529
530 Ok(_) => {
531 }
534
535 Err(e) => {
536 let event_type = raw_event.get_field::<String>("type").ok().flatten();
537 warn!(event_type, "Failed to deserialize account data: {e}");
538 }
539 }
540 }
541 }
542
543 #[instrument(skip_all, fields(room_id = %self.room_id))]
544 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
545 self.handle_timeline(
546 updates.timeline,
547 updates.ephemeral.clone(),
548 updates.ambiguity_changes,
549 )
550 .await?;
551 self.handle_account_data(updates.account_data);
552
553 Ok(())
554 }
555
556 #[instrument(skip_all, fields(room_id = %self.room_id))]
557 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
558 self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
559
560 Ok(())
561 }
562
563 async fn handle_timeline(
566 &self,
567 timeline: Timeline,
568 ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
569 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
570 ) -> Result<()> {
571 if timeline.events.is_empty()
572 && timeline.prev_batch.is_none()
573 && ephemeral_events.is_empty()
574 && ambiguity_changes.is_empty()
575 {
576 return Ok(());
577 }
578
579 trace!("adding new events");
581
582 let (stored_prev_batch_token, timeline_event_diffs) =
583 self.state.write().await?.handle_sync(timeline).await?;
584
585 if stored_prev_batch_token {
588 self.pagination_batch_token_notifier.notify_one();
589 }
590
591 if !timeline_event_diffs.is_empty() {
594 let _ = self.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
595 diffs: timeline_event_diffs,
596 origin: EventsOrigin::Sync,
597 });
598
599 let _ = self
600 .generic_update_sender
601 .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
602 }
603
604 if !ephemeral_events.is_empty() {
605 let _ = self
606 .update_sender
607 .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
608 }
609
610 if !ambiguity_changes.is_empty() {
611 let _ =
612 self.update_sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
613 }
614
615 Ok(())
616 }
617}
618
619#[derive(Debug)]
622pub(super) enum LoadMoreEventsBackwardsOutcome {
623 Gap {
625 prev_token: Option<String>,
628 },
629
630 StartOfTimeline,
632
633 Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
635}
636
637mod private {
639 use std::{
640 collections::{BTreeMap, HashMap, HashSet},
641 sync::{
642 Arc,
643 atomic::{AtomicBool, AtomicUsize, Ordering},
644 },
645 };
646
647 use eyeball::SharedObservable;
648 use eyeball_im::VectorDiff;
649 use itertools::Itertools;
650 use matrix_sdk_base::{
651 apply_redaction,
652 deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
653 event_cache::{
654 Event, Gap,
655 store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
656 },
657 linked_chunk::{
658 ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
659 OwnedLinkedChunkId, Position, Update, lazy_loader,
660 },
661 serde_helpers::{extract_edit_target, extract_thread_root},
662 sync::Timeline,
663 };
664 use matrix_sdk_common::executor::spawn;
665 use ruma::{
666 EventId, OwnedEventId, OwnedRoomId, RoomId,
667 events::{
668 AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
669 relation::RelationType, room::redaction::SyncRoomRedactionEvent,
670 },
671 room_version_rules::RoomVersionRules,
672 serde::Raw,
673 };
674 use tokio::sync::{
675 Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
676 broadcast::{Receiver, Sender},
677 };
678 use tracing::{debug, error, instrument, trace, warn};
679
680 use super::{
681 super::{
682 BackPaginationOutcome, EventCacheError, RoomEventCacheLinkedChunkUpdate,
683 RoomPaginationStatus, ThreadEventCacheUpdate,
684 deduplicator::{DeduplicationOutcome, filter_duplicate_events},
685 room::threads::ThreadEventCache,
686 },
687 EventLocation, EventsOrigin, LoadMoreEventsBackwardsOutcome, RoomEventCacheGenericUpdate,
688 RoomEventCacheUpdate,
689 events::EventLinkedChunk,
690 sort_positions_descending,
691 };
692
693 pub struct RoomEventCacheStateLock {
698 locked_state: RwLock<RoomEventCacheStateLockInner>,
700
701 read_lock_acquisition: Mutex<()>,
704 }
705
706 struct RoomEventCacheStateLockInner {
707 enabled_thread_support: bool,
709
710 room_id: OwnedRoomId,
712
713 store: EventCacheStoreLock,
715
716 room_linked_chunk: EventLinkedChunk,
719
720 threads: HashMap<OwnedEventId, ThreadEventCache>,
724
725 pagination_status: SharedObservable<RoomPaginationStatus>,
726
727 update_sender: Sender<RoomEventCacheUpdate>,
732
733 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
738
739 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
742
743 room_version_rules: RoomVersionRules,
745
746 waited_for_initial_prev_token: Arc<AtomicBool>,
751
752 subscriber_count: Arc<AtomicUsize>,
755 }
756
757 impl RoomEventCacheStateLock {
758 #[allow(clippy::too_many_arguments)]
769 pub async fn new(
770 room_id: OwnedRoomId,
771 room_version_rules: RoomVersionRules,
772 enabled_thread_support: bool,
773 update_sender: Sender<RoomEventCacheUpdate>,
774 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
775 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
776 store: EventCacheStoreLock,
777 pagination_status: SharedObservable<RoomPaginationStatus>,
778 ) -> Result<Self, EventCacheError> {
779 let store_guard = match store.lock().await? {
780 EventCacheStoreLockState::Clean(guard) => guard,
782
783 EventCacheStoreLockState::Dirty(guard) => {
786 EventCacheStoreLockGuard::clear_dirty(&guard);
787
788 guard
789 }
790 };
791
792 let linked_chunk_id = LinkedChunkId::Room(&room_id);
793
794 let full_linked_chunk_metadata =
799 match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
800 Ok(metas) => metas,
801 Err(err) => {
802 error!(
803 "error when loading a linked chunk's metadata from the store: {err}"
804 );
805
806 store_guard
808 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
809 .await?;
810
811 None
813 }
814 };
815
816 let linked_chunk = match store_guard
817 .load_last_chunk(linked_chunk_id)
818 .await
819 .map_err(EventCacheError::from)
820 .and_then(|(last_chunk, chunk_identifier_generator)| {
821 lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
822 .map_err(EventCacheError::from)
823 }) {
824 Ok(linked_chunk) => linked_chunk,
825 Err(err) => {
826 error!(
827 "error when loading a linked chunk's latest chunk from the store: {err}"
828 );
829
830 store_guard
832 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
833 .await?;
834
835 None
836 }
837 };
838
839 let waited_for_initial_prev_token = Arc::new(AtomicBool::new(false));
840
841 Ok(Self {
842 locked_state: RwLock::new(RoomEventCacheStateLockInner {
843 enabled_thread_support,
844 room_id,
845 store,
846 room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
847 linked_chunk,
848 full_linked_chunk_metadata,
849 ),
850 threads: HashMap::new(),
855 pagination_status,
856 update_sender,
857 generic_update_sender,
858 linked_chunk_update_sender,
859 room_version_rules,
860 waited_for_initial_prev_token,
861 subscriber_count: Default::default(),
862 }),
863 read_lock_acquisition: Mutex::new(()),
864 })
865 }
866
867 pub async fn read(&self) -> Result<RoomEventCacheStateLockReadGuard<'_>, EventCacheError> {
877 let _one_reader_guard = self.read_lock_acquisition.lock().await;
924
925 let state_guard = self.locked_state.read().await;
927
928 match state_guard.store.lock().await? {
929 EventCacheStoreLockState::Clean(store_guard) => {
930 Ok(RoomEventCacheStateLockReadGuard { state: state_guard, store: store_guard })
931 }
932 EventCacheStoreLockState::Dirty(store_guard) => {
933 drop(state_guard);
937 let state_guard = self.locked_state.write().await;
938
939 let mut guard = RoomEventCacheStateLockWriteGuard {
940 state: state_guard,
941 store: store_guard,
942 };
943
944 let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;
946
947 EventCacheStoreLockGuard::clear_dirty(&guard.store);
949
950 let guard = guard.downgrade();
952
953 if !updates_as_vector_diffs.is_empty() {
955 let _ = guard.state.update_sender.send(
957 RoomEventCacheUpdate::UpdateTimelineEvents {
958 diffs: updates_as_vector_diffs,
959 origin: EventsOrigin::Cache,
960 },
961 );
962
963 let _ =
965 guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
966 room_id: guard.state.room_id.clone(),
967 });
968 }
969
970 Ok(guard)
971 }
972 }
973 }
974
975 pub async fn write(
986 &self,
987 ) -> Result<RoomEventCacheStateLockWriteGuard<'_>, EventCacheError> {
988 let state_guard = self.locked_state.write().await;
989
990 match state_guard.store.lock().await? {
991 EventCacheStoreLockState::Clean(store_guard) => {
992 Ok(RoomEventCacheStateLockWriteGuard { state: state_guard, store: store_guard })
993 }
994 EventCacheStoreLockState::Dirty(store_guard) => {
995 let mut guard = RoomEventCacheStateLockWriteGuard {
996 state: state_guard,
997 store: store_guard,
998 };
999
1000 let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;
1002
1003 EventCacheStoreLockGuard::clear_dirty(&guard.store);
1005
1006 if !updates_as_vector_diffs.is_empty() {
1008 let _ = guard.state.update_sender.send(
1010 RoomEventCacheUpdate::UpdateTimelineEvents {
1011 diffs: updates_as_vector_diffs,
1012 origin: EventsOrigin::Cache,
1013 },
1014 );
1015
1016 let _ =
1018 guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
1019 room_id: guard.state.room_id.clone(),
1020 });
1021 }
1022
1023 Ok(guard)
1024 }
1025 }
1026 }
1027 }
1028
1029 pub struct RoomEventCacheStateLockReadGuard<'a> {
1031 state: RwLockReadGuard<'a, RoomEventCacheStateLockInner>,
1034
1035 store: EventCacheStoreLockGuard,
1037 }
1038
1039 pub struct RoomEventCacheStateLockWriteGuard<'a> {
1041 state: RwLockWriteGuard<'a, RoomEventCacheStateLockInner>,
1044
1045 store: EventCacheStoreLockGuard,
1047 }
1048
1049 impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1050 fn downgrade(self) -> RoomEventCacheStateLockReadGuard<'a> {
1058 RoomEventCacheStateLockReadGuard { state: self.state.downgrade(), store: self.store }
1059 }
1060 }
1061
1062 impl<'a> RoomEventCacheStateLockReadGuard<'a> {
1063 pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
1065 &self.state.room_linked_chunk
1066 }
1067
1068 pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
1069 &self.state.subscriber_count
1070 }
1071
1072 pub async fn find_event(
1077 &self,
1078 event_id: &EventId,
1079 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1080 find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1081 .await
1082 }
1083
1084 pub async fn find_event_with_relations(
1098 &self,
1099 event_id: &EventId,
1100 filters: Option<Vec<RelationType>>,
1101 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1102 find_event_with_relations(
1103 event_id,
1104 &self.state.room_id,
1105 filters,
1106 &self.state.room_linked_chunk,
1107 &self.store,
1108 )
1109 .await
1110 }
1111
1112 pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
1121 where
1122 P: FnMut(&Event, Option<OwnedEventId>) -> Option<O>,
1123 {
1124 self.state
1125 .room_linked_chunk
1126 .revents()
1127 .peekable()
1128 .batching(|iter| {
1129 iter.next().map(|(_position, event)| {
1130 (
1131 event,
1132 iter.peek()
1133 .and_then(|(_next_position, next_event)| next_event.event_id()),
1134 )
1135 })
1136 })
1137 .find_map(|(event, next_event_id)| predicate(event, next_event_id))
1138 }
1139
1140 #[cfg(test)]
1141 pub fn is_dirty(&self) -> bool {
1142 EventCacheStoreLockGuard::is_dirty(&self.store)
1143 }
1144 }
1145
1146 impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1147 #[cfg(any(feature = "e2e-encryption", test))]
1149 pub fn room_linked_chunk(&mut self) -> &mut EventLinkedChunk {
1150 &mut self.state.room_linked_chunk
1151 }
1152
1153 pub fn waited_for_initial_prev_token(&self) -> &Arc<AtomicBool> {
1155 &self.state.waited_for_initial_prev_token
1156 }
1157
1158 pub async fn find_event(
1163 &self,
1164 event_id: &EventId,
1165 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1166 find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1167 .await
1168 }
1169
1170 pub async fn find_event_with_relations(
1184 &self,
1185 event_id: &EventId,
1186 filters: Option<Vec<RelationType>>,
1187 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1188 find_event_with_relations(
1189 event_id,
1190 &self.state.room_id,
1191 filters,
1192 &self.state.room_linked_chunk,
1193 &self.store,
1194 )
1195 .await
1196 }
1197
1198 pub async fn load_more_events_backwards(
1200 &mut self,
1201 ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
1202 if let Some(prev_token) = self.state.room_linked_chunk.rgap().map(|gap| gap.prev_token)
1205 {
1206 return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
1207 }
1208
1209 let prev_first_chunk = self
1210 .state
1211 .room_linked_chunk
1212 .chunks()
1213 .next()
1214 .expect("a linked chunk is never empty");
1215
1216 let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
1218 let new_first_chunk = match self
1219 .store
1220 .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
1221 .await
1222 {
1223 Ok(Some(new_first_chunk)) => {
1224 new_first_chunk
1226 }
1227
1228 Ok(None) => {
1229 if self.state.room_linked_chunk.events().next().is_some() {
1234 trace!("chunk is fully loaded and non-empty: reached_start=true");
1237 return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
1238 }
1239
1240 return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
1242 }
1243
1244 Err(err) => {
1245 error!("error when loading the previous chunk of a linked chunk: {err}");
1246
1247 self.store
1249 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1250 .await?;
1251
1252 return Err(err.into());
1254 }
1255 };
1256
1257 let chunk_content = new_first_chunk.content.clone();
1258
1259 let reached_start = new_first_chunk.previous.is_none();
1265
1266 if let Err(err) =
1267 self.state.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk)
1268 {
1269 error!("error when inserting the previous chunk into its linked chunk: {err}");
1270
1271 self.store
1273 .handle_linked_chunk_updates(
1274 LinkedChunkId::Room(&self.state.room_id),
1275 vec![Update::Clear],
1276 )
1277 .await?;
1278
1279 return Err(err.into());
1281 }
1282
1283 let _ = self.state.room_linked_chunk.store_updates().take();
1286
1287 let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1289
1290 Ok(match chunk_content {
1291 ChunkContent::Gap(gap) => {
1292 trace!("reloaded chunk from disk (gap)");
1293 LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
1294 }
1295
1296 ChunkContent::Items(events) => {
1297 trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
1298 LoadMoreEventsBackwardsOutcome::Events {
1299 events,
1300 timeline_event_diffs,
1301 reached_start,
1302 }
1303 }
1304 })
1305 }
1306
1307 pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
1316 let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
1318 let (last_chunk, chunk_identifier_generator) =
1319 match self.store.load_last_chunk(linked_chunk_id).await {
1320 Ok(pair) => pair,
1321
1322 Err(err) => {
1323 error!("error when reloading a linked chunk from memory: {err}");
1325
1326 self.store
1328 .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
1329 .await?;
1330
1331 (None, ChunkIdentifierGenerator::new_from_scratch())
1333 }
1334 };
1335
1336 debug!("unloading the linked chunk, and resetting it to its last chunk");
1337
1338 if let Err(err) =
1341 self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
1342 {
1343 error!("error when replacing the linked chunk: {err}");
1344 return self.reset_internal().await;
1345 }
1346
1347 self.state
1351 .pagination_status
1352 .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1353
1354 let _ = self.state.room_linked_chunk.store_updates().take();
1357
1358 Ok(())
1359 }
1360
1361 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1364 pub async fn auto_shrink_if_no_subscribers(
1365 &mut self,
1366 ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1367 let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);
1368
1369 trace!(subscriber_count, "received request to auto-shrink");
1370
1371 if subscriber_count == 0 {
1372 self.shrink_to_last_chunk().await?;
1375
1376 Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
1377 } else {
1378 Ok(None)
1379 }
1380 }
1381
1382 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1384 pub async fn force_shrink_to_last_chunk(
1385 &mut self,
1386 ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1387 self.shrink_to_last_chunk().await?;
1388
1389 Ok(self.state.room_linked_chunk.updates_as_vector_diffs())
1390 }
1391
1392 #[instrument(skip_all)]
1398 pub async fn remove_events(
1399 &mut self,
1400 in_memory_events: Vec<(OwnedEventId, Position)>,
1401 in_store_events: Vec<(OwnedEventId, Position)>,
1402 ) -> Result<(), EventCacheError> {
1403 if !in_store_events.is_empty() {
1405 let mut positions = in_store_events
1406 .into_iter()
1407 .map(|(_event_id, position)| position)
1408 .collect::<Vec<_>>();
1409
1410 sort_positions_descending(&mut positions);
1411
1412 let updates = positions
1413 .into_iter()
1414 .map(|pos| Update::RemoveItem { at: pos })
1415 .collect::<Vec<_>>();
1416
1417 self.apply_store_only_updates(updates).await?;
1418 }
1419
1420 if in_memory_events.is_empty() {
1422 return Ok(());
1424 }
1425
1426 self.state
1428 .room_linked_chunk
1429 .remove_events_by_position(
1430 in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
1431 )
1432 .expect("failed to remove an event");
1433
1434 self.propagate_changes().await
1435 }
1436
1437 async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1438 let updates = self.state.room_linked_chunk.store_updates().take();
1439 self.send_updates_to_store(updates).await
1440 }
1441
1442 async fn apply_store_only_updates(
1449 &mut self,
1450 updates: Vec<Update<Event, Gap>>,
1451 ) -> Result<(), EventCacheError> {
1452 self.state.room_linked_chunk.order_tracker.map_updates(&updates);
1453 self.send_updates_to_store(updates).await
1454 }
1455
1456 async fn send_updates_to_store(
1457 &mut self,
1458 mut updates: Vec<Update<Event, Gap>>,
1459 ) -> Result<(), EventCacheError> {
1460 if updates.is_empty() {
1461 return Ok(());
1462 }
1463
1464 for update in updates.iter_mut() {
1466 match update {
1467 Update::PushItems { items, .. } => strip_relations_from_events(items),
1468 Update::ReplaceItem { item, .. } => strip_relations_from_event(item),
1469 Update::NewItemsChunk { .. }
1471 | Update::NewGapChunk { .. }
1472 | Update::RemoveChunk(_)
1473 | Update::RemoveItem { .. }
1474 | Update::DetachLastItems { .. }
1475 | Update::StartReattachItems
1476 | Update::EndReattachItems
1477 | Update::Clear => {}
1478 }
1479 }
1480
1481 let store = self.store.clone();
1488 let room_id = self.state.room_id.clone();
1489 let cloned_updates = updates.clone();
1490
1491 spawn(async move {
1492 trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
1493 let linked_chunk_id = LinkedChunkId::Room(&room_id);
1494 store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
1495 trace!("linked chunk updates applied");
1496
1497 super::Result::Ok(())
1498 })
1499 .await
1500 .expect("joining failed")?;
1501
1502 let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
1504 linked_chunk_id: OwnedLinkedChunkId::Room(self.state.room_id.clone()),
1505 updates,
1506 });
1507
1508 Ok(())
1509 }
1510
1511 pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1517 self.reset_internal().await?;
1518
1519 let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();
1520
1521 debug_assert_eq!(diff_updates.len(), 1);
1523 debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1524
1525 Ok(diff_updates)
1526 }
1527
1528 async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1529 self.state.room_linked_chunk.reset();
1530
1531 for thread in self.state.threads.values_mut() {
1536 thread.clear();
1537 }
1538
1539 self.propagate_changes().await?;
1540
1541 self.state.waited_for_initial_prev_token.store(false, Ordering::SeqCst);
1545 self.state
1547 .pagination_status
1548 .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1549
1550 Ok(())
1551 }
1552
1553 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1561 pub async fn handle_sync(
1562 &mut self,
1563 mut timeline: Timeline,
1564 ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
1565 let mut prev_batch = timeline.prev_batch.take();
1566
1567 let DeduplicationOutcome {
1568 all_events: events,
1569 in_memory_duplicated_event_ids,
1570 in_store_duplicated_event_ids,
1571 non_empty_all_duplicates: all_duplicates,
1572 } = filter_duplicate_events(
1573 &self.store,
1574 LinkedChunkId::Room(&self.state.room_id),
1575 &self.state.room_linked_chunk,
1576 timeline.events,
1577 )
1578 .await?;
1579
1580 if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
1593 || all_duplicates
1594 {
1595 prev_batch = None;
1596 }
1597
1598 if prev_batch.is_some() {
1599 let mut summaries_to_update = Vec::new();
1604
1605 for (thread_root, thread) in self.state.threads.iter_mut() {
1606 thread.clear();
1608
1609 summaries_to_update.push(thread_root.clone());
1610 }
1611
1612 for thread_root in summaries_to_update {
1616 let Some((location, mut target_event)) = self.find_event(&thread_root).await?
1617 else {
1618 trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
1619 continue;
1620 };
1621
1622 if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
1623 prev_summary.latest_reply = None;
1624
1625 target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);
1626
1627 self.replace_event_at(location, target_event).await?;
1628 }
1629 }
1630 }
1631
1632 if all_duplicates {
1633 return Ok((false, Vec::new()));
1636 }
1637
1638 let has_new_gap = prev_batch.is_some();
1639
1640 if !self.state.waited_for_initial_prev_token.load(Ordering::SeqCst) && has_new_gap {
1643 self.state.waited_for_initial_prev_token.store(true, Ordering::SeqCst);
1644 }
1645
1646 self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1651 .await?;
1652
1653 self.state
1654 .room_linked_chunk
1655 .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);
1656
1657 self.post_process_new_events(events, true).await?;
1658
1659 if timeline.limited && has_new_gap {
1660 self.shrink_to_last_chunk().await?;
1667 }
1668
1669 let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1670
1671 Ok((has_new_gap, timeline_event_diffs))
1672 }
1673
1674 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1682 pub async fn handle_backpagination(
1683 &mut self,
1684 events: Vec<Event>,
1685 mut new_token: Option<String>,
1686 prev_token: Option<String>,
1687 ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
1688 {
1689 let prev_gap_id = if let Some(token) = prev_token {
1692 let gap_chunk_id = self.state.room_linked_chunk.chunk_identifier(|chunk| {
1694 matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
1695 });
1696
1697 if gap_chunk_id.is_none() {
1698 return Ok(None);
1704 }
1705
1706 gap_chunk_id
1707 } else {
1708 None
1709 };
1710
1711 let DeduplicationOutcome {
1712 all_events: mut events,
1713 in_memory_duplicated_event_ids,
1714 in_store_duplicated_event_ids,
1715 non_empty_all_duplicates: all_duplicates,
1716 } = filter_duplicate_events(
1717 &self.store,
1718 LinkedChunkId::Room(&self.state.room_id),
1719 &self.state.room_linked_chunk,
1720 events,
1721 )
1722 .await?;
1723
1724 if !all_duplicates {
1738 self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
1740 .await?;
1741 } else {
1742 events.clear();
1744 new_token = None;
1747 }
1748
1749 let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();
1752
1753 let new_gap = new_token.map(|prev_token| Gap { prev_token });
1754 let reached_start = self.state.room_linked_chunk.finish_back_pagination(
1755 prev_gap_id,
1756 new_gap,
1757 &topo_ordered_events,
1758 );
1759
1760 self.post_process_new_events(topo_ordered_events, false).await?;
1762
1763 let event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();
1764
1765 Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
1766 }
1767
1768 pub fn subscribe_to_thread(
1771 &mut self,
1772 root: OwnedEventId,
1773 ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1774 self.get_or_reload_thread(root).subscribe()
1775 }
1776
1777 pub fn finish_thread_network_pagination(
1781 &mut self,
1782 root: OwnedEventId,
1783 prev_token: Option<String>,
1784 new_token: Option<String>,
1785 events: Vec<Event>,
1786 ) -> Option<BackPaginationOutcome> {
1787 self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1788 }
1789
1790 pub fn load_more_thread_events_backwards(
1791 &mut self,
1792 root: OwnedEventId,
1793 ) -> LoadMoreEventsBackwardsOutcome {
1794 self.get_or_reload_thread(root).load_more_events_backwards()
1795 }
1796
1797 pub(in super::super) async fn post_process_new_events(
1806 &mut self,
1807 events: Vec<Event>,
1808 is_sync: bool,
1809 ) -> Result<(), EventCacheError> {
1810 self.propagate_changes().await?;
1812
1813 let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();
1814
1815 for event in events {
1816 self.maybe_apply_new_redaction(&event).await?;
1817
1818 if self.state.enabled_thread_support {
1819 if is_sync {
1824 if let Some(thread_root) = extract_thread_root(event.raw()) {
1825 new_events_by_thread
1826 .entry(thread_root)
1827 .or_default()
1828 .push(event.clone());
1829 } else if let Some(event_id) = event.event_id() {
1830 if self.state.threads.contains_key(&event_id) {
1832 new_events_by_thread
1833 .entry(event_id)
1834 .or_default()
1835 .push(event.clone());
1836 }
1837 }
1838 }
1839
1840 if let Some(edit_target) = extract_edit_target(event.raw()) {
1842 if let Some((_location, edit_target_event)) =
1844 self.find_event(&edit_target).await?
1845 && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
1846 {
1847 new_events_by_thread.entry(thread_root).or_default();
1850 }
1851 }
1852 }
1853
1854 if let Some(bundled_thread) = event.bundled_latest_thread_event {
1856 self.save_events([*bundled_thread]).await?;
1857 }
1858 }
1859
1860 if self.state.enabled_thread_support {
1861 self.update_threads(new_events_by_thread).await?;
1862 }
1863
1864 Ok(())
1865 }
1866
1867 fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1868 let room_id = self.state.room_id.clone();
1871 let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();
1872
1873 self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
1874 ThreadEventCache::new(room_id, root_event_id, linked_chunk_update_sender)
1875 })
1876 }
1877
1878 #[instrument(skip_all)]
1879 async fn update_threads(
1880 &mut self,
1881 new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
1882 ) -> Result<(), EventCacheError> {
1883 for (thread_root, new_events) in new_events_by_thread {
1884 let thread_cache = self.get_or_reload_thread(thread_root.clone());
1885
1886 thread_cache.add_live_events(new_events);
1887
1888 let mut latest_event_id = thread_cache.latest_event_id();
1889
1890 if let Some(event_id) = latest_event_id.as_ref()
1893 && let Some((_, edits)) = self
1894 .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
1895 .await?
1896 && let Some(latest_edit) = edits.last()
1897 {
1898 latest_event_id = latest_edit.event_id();
1899 }
1900
1901 self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
1902 }
1903
1904 Ok(())
1905 }
1906
1907 async fn maybe_update_thread_summary(
1909 &mut self,
1910 thread_root: OwnedEventId,
1911 latest_event_id: Option<OwnedEventId>,
1912 ) -> Result<(), EventCacheError> {
1913 let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
1917 trace!(%thread_root, "thread root event is missing from the room linked chunk");
1918 return Ok(());
1919 };
1920
1921 let prev_summary = target_event.thread_summary.summary();
1922
1923 let num_replies = {
1932 let thread_replies = self
1933 .store
1934 .find_event_relations(
1935 &self.state.room_id,
1936 &thread_root,
1937 Some(&[RelationType::Thread]),
1938 )
1939 .await?;
1940 thread_replies.len().try_into().unwrap_or(u32::MAX)
1941 };
1942
1943 let new_summary = if num_replies > 0 {
1944 Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
1945 } else {
1946 None
1947 };
1948
1949 if prev_summary == new_summary.as_ref() {
1950 trace!(%thread_root, "thread summary is already up-to-date");
1951 return Ok(());
1952 }
1953
1954 trace!(%thread_root, "updating thread summary: {new_summary:?}");
1956 target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
1957 self.replace_event_at(location, target_event).await
1958 }
1959
1960 pub(crate) async fn replace_event_at(
1967 &mut self,
1968 location: EventLocation,
1969 event: Event,
1970 ) -> Result<(), EventCacheError> {
1971 match location {
1972 EventLocation::Memory(position) => {
1973 self.state
1974 .room_linked_chunk
1975 .replace_event_at(position, event)
1976 .expect("should have been a valid position of an item");
1977 self.propagate_changes().await?;
1980 }
1981 EventLocation::Store => {
1982 self.save_events([event]).await?;
1983 }
1984 }
1985
1986 Ok(())
1987 }
1988
1989 #[instrument(skip_all)]
1993 async fn maybe_apply_new_redaction(
1994 &mut self,
1995 event: &Event,
1996 ) -> Result<(), EventCacheError> {
1997 let raw_event = event.raw();
1998
1999 let Ok(Some(MessageLikeEventType::RoomRedaction)) =
2002 raw_event.get_field::<MessageLikeEventType>("type")
2003 else {
2004 return Ok(());
2005 };
2006
2007 let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
2010 redaction,
2011 ))) = raw_event.deserialize()
2012 else {
2013 return Ok(());
2014 };
2015
2016 let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
2017 warn!("missing target event id from the redaction event");
2018 return Ok(());
2019 };
2020
2021 let Some((location, mut target_event)) = self.find_event(event_id).await? else {
2023 trace!("redacted event is missing from the linked chunk");
2024 return Ok(());
2025 };
2026
2027 let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
2029 match deserialized {
2032 AnySyncTimelineEvent::MessageLike(ev) => {
2033 if ev.is_redacted() {
2034 return Ok(());
2035 }
2036 }
2037 AnySyncTimelineEvent::State(ev) => {
2038 if ev.is_redacted() {
2039 return Ok(());
2040 }
2041 }
2042 }
2043
2044 extract_thread_root(target_event.raw())
2047 } else {
2048 warn!("failed to deserialize the event to redact");
2049 None
2050 };
2051
2052 if let Some(redacted_event) = apply_redaction(
2053 target_event.raw(),
2054 event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
2055 &self.state.room_version_rules.redaction,
2056 ) {
2057 target_event.replace_raw(redacted_event.cast_unchecked());
2062
2063 self.replace_event_at(location, target_event).await?;
2064
2065 if let Some(thread_root) = thread_root
2073 && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
2074 {
2075 thread_cache.remove_if_present(event_id);
2076
2077 let latest_event_id = thread_cache.latest_event_id();
2080 self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
2081 }
2082 }
2083
2084 Ok(())
2085 }
2086
2087 pub async fn save_events(
2089 &mut self,
2090 events: impl IntoIterator<Item = Event>,
2091 ) -> Result<(), EventCacheError> {
2092 let store = self.store.clone();
2093 let room_id = self.state.room_id.clone();
2094 let events = events.into_iter().collect::<Vec<_>>();
2095
2096 spawn(async move {
2098 for event in events {
2099 store.save_event(&room_id, event).await?;
2100 }
2101 super::Result::Ok(())
2102 })
2103 .await
2104 .expect("joining failed")?;
2105
2106 Ok(())
2107 }
2108
2109 #[cfg(test)]
2110 pub fn is_dirty(&self) -> bool {
2111 EventCacheStoreLockGuard::is_dirty(&self.store)
2112 }
2113 }
2114
2115 async fn load_linked_chunk_metadata(
2121 store_guard: &EventCacheStoreLockGuard,
2122 linked_chunk_id: LinkedChunkId<'_>,
2123 ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
2124 let mut all_chunks = store_guard
2125 .load_all_chunks_metadata(linked_chunk_id)
2126 .await
2127 .map_err(EventCacheError::from)?;
2128
2129 if all_chunks.is_empty() {
2130 return Ok(None);
2132 }
2133
2134 let chunk_map: HashMap<_, _> =
2136 all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();
2137
2138 let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
2140 let Some(last) = iter.next() else {
2141 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2142 details: "no last chunk found".to_owned(),
2143 });
2144 };
2145
2146 if let Some(other_last) = iter.next() {
2148 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2149 details: format!(
2150 "chunks {} and {} both claim to be last chunks",
2151 last.identifier.index(),
2152 other_last.identifier.index()
2153 ),
2154 });
2155 }
2156
2157 let mut seen = HashSet::new();
2160 let mut current = last;
2161 loop {
2162 if !seen.insert(current.identifier) {
2164 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2165 details: format!(
2166 "cycle detected in linked chunk at {}",
2167 current.identifier.index()
2168 ),
2169 });
2170 }
2171
2172 let Some(prev_id) = current.previous else {
2173 if seen.len() != all_chunks.len() {
2175 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2176 details: format!(
2177 "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
2178 seen.len(),
2179 all_chunks.len()
2180 ),
2181 });
2182 }
2183 break;
2184 };
2185
2186 let Some(pred_meta) = chunk_map.get(&prev_id) else {
2189 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2190 details: format!(
2191 "missing predecessor {} chunk for {}",
2192 prev_id.index(),
2193 current.identifier.index()
2194 ),
2195 });
2196 };
2197
2198 if pred_meta.next != Some(current.identifier) {
2200 return Err(EventCacheError::InvalidLinkedChunkMetadata {
2201 details: format!(
2202 "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
2203 pred_meta.identifier.index(),
2204 pred_meta.next.map(|chunk_id| chunk_id.index()),
2205 current.identifier.index()
2206 ),
2207 });
2208 }
2209
2210 current = *pred_meta;
2211 }
2212
2213 let mut current = current.identifier;
2221 for i in 0..all_chunks.len() {
2222 let j = all_chunks
2224 .iter()
2225 .rev()
2226 .position(|meta| meta.identifier == current)
2227 .map(|j| all_chunks.len() - 1 - j)
2228 .expect("the target chunk must be present in the metadata");
2229 if i != j {
2230 all_chunks.swap(i, j);
2231 }
2232 if let Some(next) = all_chunks[i].next {
2233 current = next;
2234 }
2235 }
2236
2237 Ok(Some(all_chunks))
2238 }
2239
2240 fn strip_relations_if_present<T>(event: &mut Raw<T>) {
2244 let mut closure = || -> Option<()> {
2248 let mut val: serde_json::Value = event.deserialize_as().ok()?;
2249 let unsigned = val.get_mut("unsigned")?;
2250 let unsigned_obj = unsigned.as_object_mut()?;
2251 if unsigned_obj.remove("m.relations").is_some() {
2252 *event = Raw::new(&val).ok()?.cast_unchecked();
2253 }
2254 None
2255 };
2256 let _ = closure();
2257 }
2258
2259 fn strip_relations_from_event(ev: &mut Event) {
2260 match &mut ev.kind {
2261 TimelineEventKind::Decrypted(decrypted) => {
2262 decrypted.unsigned_encryption_info = None;
2265
2266 strip_relations_if_present(&mut decrypted.event);
2268 }
2269
2270 TimelineEventKind::UnableToDecrypt { event, .. }
2271 | TimelineEventKind::PlainText { event } => {
2272 strip_relations_if_present(event);
2273 }
2274 }
2275 }
2276
2277 fn strip_relations_from_events(items: &mut [Event]) {
2279 for ev in items.iter_mut() {
2280 strip_relations_from_event(ev);
2281 }
2282 }
2283
2284 async fn find_event(
2287 event_id: &EventId,
2288 room_id: &RoomId,
2289 room_linked_chunk: &EventLinkedChunk,
2290 store: &EventCacheStoreLockGuard,
2291 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
2292 for (position, event) in room_linked_chunk.revents() {
2295 if event.event_id().as_deref() == Some(event_id) {
2296 return Ok(Some((EventLocation::Memory(position), event.clone())));
2297 }
2298 }
2299
2300 Ok(store.find_event(room_id, event_id).await?.map(|event| (EventLocation::Store, event)))
2301 }
2302
2303 async fn find_event_with_relations(
2307 event_id: &EventId,
2308 room_id: &RoomId,
2309 filters: Option<Vec<RelationType>>,
2310 room_linked_chunk: &EventLinkedChunk,
2311 store: &EventCacheStoreLockGuard,
2312 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
2313 let found = store.find_event(room_id, event_id).await?;
2315
2316 let Some(target) = found else {
2317 return Ok(None);
2319 };
2320
2321 let mut related = store.find_event_relations(room_id, event_id, filters.as_deref()).await?;
2324 let mut stack =
2325 related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();
2326
2327 let mut already_seen = HashSet::new();
2330 already_seen.insert(event_id.to_owned());
2331
2332 let mut num_iters = 1;
2333
2334 while let Some(event_id) = stack.pop() {
2336 if !already_seen.insert(event_id.clone()) {
2337 continue;
2339 }
2340
2341 let other_related =
2342 store.find_event_relations(room_id, &event_id, filters.as_deref()).await?;
2343
2344 stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
2345 related.extend(other_related);
2346
2347 num_iters += 1;
2348 }
2349
2350 trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
2351
2352 related.sort_by(|(_, lhs), (_, rhs)| {
2356 use std::cmp::Ordering;
2357
2358 match (lhs, rhs) {
2359 (None, None) => Ordering::Equal,
2360 (None, Some(_)) => Ordering::Less,
2361 (Some(_), None) => Ordering::Greater,
2362 (Some(lhs), Some(rhs)) => {
2363 let lhs = room_linked_chunk.event_order(*lhs);
2364 let rhs = room_linked_chunk.event_order(*rhs);
2365
2366 match (lhs, rhs) {
2370 (None, None) => Ordering::Equal,
2371 (None, Some(_)) => Ordering::Less,
2372 (Some(_), None) => Ordering::Greater,
2373 (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
2374 }
2375 }
2376 }
2377 });
2378
2379 let related = related.into_iter().map(|(event, _pos)| event).collect();
2381
2382 Ok(Some((target, related)))
2383 }
2384}
2385
2386pub(super) enum EventLocation {
2388 Memory(Position),
2390
2391 Store,
2393}
2394
2395pub(super) use private::RoomEventCacheStateLock;
2396
2397#[cfg(test)]
2398mod tests {
2399 use matrix_sdk_base::event_cache::Event;
2400 use matrix_sdk_test::{async_test, event_factory::EventFactory};
2401 use ruma::{
2402 RoomId, event_id,
2403 events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
2404 room_id, user_id,
2405 };
2406
2407 use crate::test_utils::logged_in_client;
2408
2409 #[async_test]
2410 async fn test_find_event_by_id_with_edit_relation() {
2411 let original_id = event_id!("$original");
2412 let related_id = event_id!("$related");
2413 let room_id = room_id!("!galette:saucisse.bzh");
2414 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2415
2416 assert_relations(
2417 room_id,
2418 f.text_msg("Original event").event_id(original_id).into(),
2419 f.text_msg("* An edited event")
2420 .edit(
2421 original_id,
2422 RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
2423 )
2424 .event_id(related_id)
2425 .into(),
2426 f,
2427 )
2428 .await;
2429 }
2430
2431 #[async_test]
2432 async fn test_find_event_by_id_with_thread_reply_relation() {
2433 let original_id = event_id!("$original");
2434 let related_id = event_id!("$related");
2435 let room_id = room_id!("!galette:saucisse.bzh");
2436 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2437
2438 assert_relations(
2439 room_id,
2440 f.text_msg("Original event").event_id(original_id).into(),
2441 f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
2442 f,
2443 )
2444 .await;
2445 }
2446
2447 #[async_test]
2448 async fn test_find_event_by_id_with_reaction_relation() {
2449 let original_id = event_id!("$original");
2450 let related_id = event_id!("$related");
2451 let room_id = room_id!("!galette:saucisse.bzh");
2452 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2453
2454 assert_relations(
2455 room_id,
2456 f.text_msg("Original event").event_id(original_id).into(),
2457 f.reaction(original_id, ":D").event_id(related_id).into(),
2458 f,
2459 )
2460 .await;
2461 }
2462
2463 #[async_test]
2464 async fn test_find_event_by_id_with_poll_response_relation() {
2465 let original_id = event_id!("$original");
2466 let related_id = event_id!("$related");
2467 let room_id = room_id!("!galette:saucisse.bzh");
2468 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2469
2470 assert_relations(
2471 room_id,
2472 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2473 .event_id(original_id)
2474 .into(),
2475 f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
2476 f,
2477 )
2478 .await;
2479 }
2480
2481 #[async_test]
2482 async fn test_find_event_by_id_with_poll_end_relation() {
2483 let original_id = event_id!("$original");
2484 let related_id = event_id!("$related");
2485 let room_id = room_id!("!galette:saucisse.bzh");
2486 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2487
2488 assert_relations(
2489 room_id,
2490 f.poll_start("Poll start event", "A poll question", vec!["An answer"])
2491 .event_id(original_id)
2492 .into(),
2493 f.poll_end("Poll ended", original_id).event_id(related_id).into(),
2494 f,
2495 )
2496 .await;
2497 }
2498
2499 #[async_test]
2500 async fn test_find_event_by_id_with_filtered_relationships() {
2501 let original_id = event_id!("$original");
2502 let related_id = event_id!("$related");
2503 let associated_related_id = event_id!("$recursive_related");
2504 let room_id = room_id!("!galette:saucisse.bzh");
2505 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2506
2507 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2508 let related_event = event_factory
2509 .text_msg("* Edited event")
2510 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2511 .event_id(related_id)
2512 .into();
2513 let associated_related_event =
2514 event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();
2515
2516 let client = logged_in_client(None).await;
2517
2518 let event_cache = client.event_cache();
2519 event_cache.subscribe().unwrap();
2520
2521 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2522 let room = client.get_room(room_id).unwrap();
2523
2524 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2525
2526 room_event_cache.save_events([original_event]).await;
2528
2529 room_event_cache.save_events([related_event]).await;
2531
2532 room_event_cache.save_events([associated_related_event]).await;
2534
2535 let filter = Some(vec![RelationType::Replacement]);
2536 let (event, related_events) = room_event_cache
2537 .find_event_with_relations(original_id, filter)
2538 .await
2539 .expect("Failed to find the event with relations")
2540 .expect("Event has no relation");
2541 let cached_event_id = event.event_id().unwrap();
2543 assert_eq!(cached_event_id, original_id);
2544
2545 assert_eq!(related_events.len(), 1);
2547
2548 let related_event_id = related_events[0].event_id().unwrap();
2549 assert_eq!(related_event_id, related_id);
2550
2551 let filter = Some(vec![RelationType::Thread]);
2553 let (event, related_events) = room_event_cache
2554 .find_event_with_relations(original_id, filter)
2555 .await
2556 .expect("Failed to find the event with relations")
2557 .expect("Event has no relation");
2558
2559 let cached_event_id = event.event_id().unwrap();
2561 assert_eq!(cached_event_id, original_id);
2562 assert!(related_events.is_empty());
2564 }
2565
2566 #[async_test]
2567 async fn test_find_event_by_id_with_recursive_relation() {
2568 let original_id = event_id!("$original");
2569 let related_id = event_id!("$related");
2570 let associated_related_id = event_id!("$recursive_related");
2571 let room_id = room_id!("!galette:saucisse.bzh");
2572 let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2573
2574 let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
2575 let related_event = event_factory
2576 .text_msg("* Edited event")
2577 .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
2578 .event_id(related_id)
2579 .into();
2580 let associated_related_event =
2581 event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();
2582
2583 let client = logged_in_client(None).await;
2584
2585 let event_cache = client.event_cache();
2586 event_cache.subscribe().unwrap();
2587
2588 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2589 let room = client.get_room(room_id).unwrap();
2590
2591 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2592
2593 room_event_cache.save_events([original_event]).await;
2595
2596 room_event_cache.save_events([related_event]).await;
2598
2599 room_event_cache.save_events([associated_related_event]).await;
2601
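        // Without a filter, relations are looked up recursively: both the edit and the reaction to the edit are returned.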
2602 let (event, related_events) = room_event_cache
2603 .find_event_with_relations(original_id, None)
2604 .await
2605 .expect("Failed to find the event with relations")
2606            .expect("Event not found in the event cache");
2607 let cached_event_id = event.event_id().unwrap();
2609 assert_eq!(cached_event_id, original_id);
2610
2611 assert_eq!(related_events.len(), 2);
2613
2614 let related_event_id = related_events[0].event_id().unwrap();
2615 assert_eq!(related_event_id, related_id);
2616 let related_event_id = related_events[1].event_id().unwrap();
2617 assert_eq!(related_event_id, associated_related_id);
2618 }
2619
2620 async fn assert_relations(
2621 room_id: &RoomId,
2622 original_event: Event,
2623 related_event: Event,
2624 event_factory: EventFactory,
2625 ) {
2626 let client = logged_in_client(None).await;
2627
2628 let event_cache = client.event_cache();
2629 event_cache.subscribe().unwrap();
2630
2631 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2632 let room = client.get_room(room_id).unwrap();
2633
2634 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2635
2636 let original_event_id = original_event.event_id().unwrap();
2638 room_event_cache.save_events([original_event]).await;
2639
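        // Also save an unrelated event; it must not appear among the related events.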
2640 let unrelated_id = event_id!("$2");
2642 room_event_cache
2643 .save_events([event_factory
2644 .text_msg("An unrelated event")
2645 .event_id(unrelated_id)
2646 .into()])
2647 .await;
2648
2649 let related_id = related_event.event_id().unwrap();
2651 room_event_cache.save_events([related_event]).await;
2652
2653 let (event, related_events) = room_event_cache
2654 .find_event_with_relations(&original_event_id, None)
2655 .await
2656 .expect("Failed to find the event with relations")
2657            .expect("Event not found in the event cache");
2658 let cached_event_id = event.event_id().unwrap();
2660 assert_eq!(cached_event_id, original_event_id);
2661
2662 let related_event_id = related_events[0].event_id().unwrap();
2664 assert_eq!(related_event_id, related_id);
2665 }
2666}
2667
2668#[cfg(all(test, not(target_family = "wasm")))]
2669mod timed_tests {
2670 use std::{ops::Not, sync::Arc};
2671
2672 use assert_matches::assert_matches;
2673 use assert_matches2::assert_let;
2674 use eyeball_im::VectorDiff;
2675 use futures_util::FutureExt;
2676 use matrix_sdk_base::{
2677 event_cache::{
2678 Gap,
2679 store::{EventCacheStore as _, MemoryStore},
2680 },
2681 linked_chunk::{
2682 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2683 lazy_loader::from_all_chunks,
2684 },
2685 store::StoreConfig,
2686 sync::{JoinedRoomUpdate, Timeline},
2687 };
2688 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2689 use ruma::{
2690 EventId, OwnedUserId, event_id,
2691 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2692 room_id, user_id,
2693 };
2694 use tokio::task::yield_now;
2695
2696 use super::RoomEventCacheGenericUpdate;
2697 use crate::{
2698 assert_let_timeout,
2699 event_cache::{RoomEventCache, RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2700 test_utils::client::MockClientBuilder,
2701 };
2702
2703 #[async_test]
2704 async fn test_write_to_storage() {
2705 let room_id = room_id!("!galette:saucisse.bzh");
2706 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2707
2708 let event_cache_store = Arc::new(MemoryStore::new());
2709
2710 let client = MockClientBuilder::new(None)
2711 .on_builder(|builder| {
2712 builder.store_config(
2713 StoreConfig::new("hodlor".to_owned())
2714 .event_cache_store(event_cache_store.clone()),
2715 )
2716 })
2717 .build()
2718 .await;
2719
2720 let event_cache = client.event_cache();
2721
2722 event_cache.subscribe().unwrap();
2724
2725 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2726 let room = client.get_room(room_id).unwrap();
2727
2728 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2729 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2730
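        // Propagate a limited sync carrying a previous-batch token and a single event.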
2731 let timeline = Timeline {
2733 limited: true,
2734 prev_batch: Some("raclette".to_owned()),
2735 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2736 };
2737
2738 room_event_cache
2739 .inner
2740 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2741 .await
2742 .unwrap();
2743
2744 assert_matches!(
2746 generic_stream.recv().await,
2747 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2748 assert_eq!(expected_room_id, room_id);
2749 }
2750 );
2751
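        // The store should now contain a gap chunk (holding the "raclette" token) followed by an items chunk with the event.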
2752 let linked_chunk = from_all_chunks::<3, _, _>(
2754 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2755 )
2756 .unwrap()
2757 .unwrap();
2758
2759 assert_eq!(linked_chunk.chunks().count(), 2);
2760
2761 let mut chunks = linked_chunk.chunks();
2762
2763 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2765 assert_eq!(gap.prev_token, "raclette");
2766 });
2767
2768 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2770 assert_eq!(events.len(), 1);
2771 let deserialized = events[0].raw().deserialize().unwrap();
2772 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2773 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2774 });
2775
2776 assert!(chunks.next().is_none());
2778 }
2779
2780 #[async_test]
2781 async fn test_write_to_storage_strips_bundled_relations() {
2782 let room_id = room_id!("!galette:saucisse.bzh");
2783 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2784
2785 let event_cache_store = Arc::new(MemoryStore::new());
2786
2787 let client = MockClientBuilder::new(None)
2788 .on_builder(|builder| {
2789 builder.store_config(
2790 StoreConfig::new("hodlor".to_owned())
2791 .event_cache_store(event_cache_store.clone()),
2792 )
2793 })
2794 .build()
2795 .await;
2796
2797 let event_cache = client.event_cache();
2798
2799 event_cache.subscribe().unwrap();
2801
2802 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2803 let room = client.get_room(room_id).unwrap();
2804
2805 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2806 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2807
2808 let ev = f
2810 .text_msg("hey yo")
2811 .sender(*ALICE)
2812 .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
2813 .into_event();
2814
2815 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };
2816
2817 room_event_cache
2818 .inner
2819 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2820 .await
2821 .unwrap();
2822
2823 assert_matches!(
2825 generic_stream.recv().await,
2826 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2827 assert_eq!(expected_room_id, room_id);
2828 }
2829 );
2830
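        // In memory, the event still carries its bundled edit in `unsigned.relations`.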
2831 {
2833 let events = room_event_cache.events().await.unwrap();
2834
2835 assert_eq!(events.len(), 1);
2836
2837 let ev = events[0].raw().deserialize().unwrap();
2838 assert_let!(
2839 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
2840 );
2841
2842 let original = msg.as_original().unwrap();
2843 assert_eq!(original.content.body(), "hey yo");
2844 assert!(original.unsigned.relations.replace.is_some());
2845 }
2846
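        // In the store, however, the bundled relations have been stripped before persisting the event.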
2847 let linked_chunk = from_all_chunks::<3, _, _>(
2849 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2850 )
2851 .unwrap()
2852 .unwrap();
2853
2854 assert_eq!(linked_chunk.chunks().count(), 1);
2855
2856 let mut chunks = linked_chunk.chunks();
2857 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2858 assert_eq!(events.len(), 1);
2859
2860 let ev = events[0].raw().deserialize().unwrap();
2861 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);
2862
2863 let original = msg.as_original().unwrap();
2864 assert_eq!(original.content.body(), "hey yo");
2865 assert!(original.unsigned.relations.replace.is_none());
2866 });
2867
2868 assert!(chunks.next().is_none());
2870 }
2871
2872 #[async_test]
2873 async fn test_clear() {
2874 let room_id = room_id!("!galette:saucisse.bzh");
2875 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2876
2877 let event_cache_store = Arc::new(MemoryStore::new());
2878
2879 let event_id1 = event_id!("$1");
2880 let event_id2 = event_id!("$2");
2881
2882 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
2883 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
2884
2885 event_cache_store
2887 .handle_linked_chunk_updates(
2888 LinkedChunkId::Room(room_id),
2889 vec![
2890 Update::NewItemsChunk {
2892 previous: None,
2893 new: ChunkIdentifier::new(0),
2894 next: None,
2895 },
2896 Update::NewGapChunk {
2898 previous: Some(ChunkIdentifier::new(0)),
2899 new: ChunkIdentifier::new(42),
2901 next: None,
2902 gap: Gap { prev_token: "comté".to_owned() },
2903 },
2904 Update::NewItemsChunk {
2906 previous: Some(ChunkIdentifier::new(42)),
2907 new: ChunkIdentifier::new(1),
2908 next: None,
2909 },
2910 Update::PushItems {
2911 at: Position::new(ChunkIdentifier::new(1), 0),
2912 items: vec![ev1.clone()],
2913 },
2914 Update::NewItemsChunk {
2916 previous: Some(ChunkIdentifier::new(1)),
2917 new: ChunkIdentifier::new(2),
2918 next: None,
2919 },
2920 Update::PushItems {
2921 at: Position::new(ChunkIdentifier::new(2), 0),
2922 items: vec![ev2.clone()],
2923 },
2924 ],
2925 )
2926 .await
2927 .unwrap();
2928
2929 let client = MockClientBuilder::new(None)
2930 .on_builder(|builder| {
2931 builder.store_config(
2932 StoreConfig::new("hodlor".to_owned())
2933 .event_cache_store(event_cache_store.clone()),
2934 )
2935 })
2936 .build()
2937 .await;
2938
2939 let event_cache = client.event_cache();
2940
2941 event_cache.subscribe().unwrap();
2943
2944 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2945 let room = client.get_room(room_id).unwrap();
2946
2947 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2948
2949 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
2950 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2951
2952 {
2954 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
2955 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
2956 }
2957
2958 {
2960 assert_eq!(items.len(), 1);
2962 assert_eq!(items[0].event_id().unwrap(), event_id2);
2963
2964 assert!(stream.is_empty());
2965 }
2966
2967 {
2969 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
2970
2971 assert_let_timeout!(
2972 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2973 );
2974 assert_eq!(diffs.len(), 1);
2975 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
2976 assert_eq!(event.event_id().unwrap(), event_id1);
2978 });
2979
2980 assert!(stream.is_empty());
2981 }
2982
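        // Clearing the room empties both the in-memory linked chunk and the stored one, and notifies subscribers with a `Clear` diff.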
2983 room_event_cache.clear().await.unwrap();
2985
2986 assert_let_timeout!(
2988 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
2989 );
2990 assert_eq!(diffs.len(), 1);
2991 assert_let!(VectorDiff::Clear = &diffs[0]);
2992
2993 assert_let_timeout!(
2995 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
2996 );
2997 assert_eq!(received_room_id, room_id);
2998
2999 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3002
3003 let items = room_event_cache.events().await.unwrap();
3005 assert!(items.is_empty());
3006
3007 let linked_chunk = from_all_chunks::<3, _, _>(
3009 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
3010 )
3011 .unwrap()
3012 .unwrap();
3013
3014 assert_eq!(linked_chunk.num_items(), 0);
3018 }
3019
3020 #[async_test]
3021 async fn test_load_from_storage() {
3022 let room_id = room_id!("!galette:saucisse.bzh");
3023 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
3024
3025 let event_cache_store = Arc::new(MemoryStore::new());
3026
3027 let event_id1 = event_id!("$1");
3028 let event_id2 = event_id!("$2");
3029
3030 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
3031 let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();
3032
3033 event_cache_store
3035 .handle_linked_chunk_updates(
3036 LinkedChunkId::Room(room_id),
3037 vec![
3038 Update::NewItemsChunk {
3040 previous: None,
3041 new: ChunkIdentifier::new(0),
3042 next: None,
3043 },
3044 Update::NewGapChunk {
3046 previous: Some(ChunkIdentifier::new(0)),
3047 new: ChunkIdentifier::new(42),
3049 next: None,
3050 gap: Gap { prev_token: "cheddar".to_owned() },
3051 },
3052 Update::NewItemsChunk {
3054 previous: Some(ChunkIdentifier::new(42)),
3055 new: ChunkIdentifier::new(1),
3056 next: None,
3057 },
3058 Update::PushItems {
3059 at: Position::new(ChunkIdentifier::new(1), 0),
3060 items: vec![ev1.clone()],
3061 },
3062 Update::NewItemsChunk {
3064 previous: Some(ChunkIdentifier::new(1)),
3065 new: ChunkIdentifier::new(2),
3066 next: None,
3067 },
3068 Update::PushItems {
3069 at: Position::new(ChunkIdentifier::new(2), 0),
3070 items: vec![ev2.clone()],
3071 },
3072 ],
3073 )
3074 .await
3075 .unwrap();
3076
3077 let client = MockClientBuilder::new(None)
3078 .on_builder(|builder| {
3079 builder.store_config(
3080 StoreConfig::new("hodlor".to_owned())
3081 .event_cache_store(event_cache_store.clone()),
3082 )
3083 })
3084 .build()
3085 .await;
3086
3087 let event_cache = client.event_cache();
3088
3089 event_cache.subscribe().unwrap();
3091
3092 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3094
3095 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3096 let room = client.get_room(room_id).unwrap();
3097
3098 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3099
3100 assert_matches!(
3103 generic_stream.recv().await,
3104 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3105 assert_eq!(room_id, expected_room_id);
3106 }
3107 );
3108
3109 let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
3110
3111 assert_eq!(items.len(), 1);
3114 assert_eq!(items[0].event_id().unwrap(), event_id2);
3115 assert!(stream.is_empty());
3116
3117 assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
3119 assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
3120
3121 room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3123
3124 assert_let_timeout!(
3125 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3126 );
3127 assert_eq!(diffs.len(), 1);
3128 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
3129 assert_eq!(event.event_id().unwrap(), event_id1);
3130 });
3131
3132 assert!(stream.is_empty());
3133
3134 assert_matches!(
3136 generic_stream.recv().await,
3137 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3138 assert_eq!(expected_room_id, room_id);
3139 }
3140 );
3141
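        // Receiving ev2 again in a non-limited sync is deduplicated: no generic update is emitted and the timeline keeps a single copy of it.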
3142 let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };
3144
3145 room_event_cache
3146 .inner
3147 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
3148 .await
3149 .unwrap();
3150
3151 assert!(generic_stream.recv().now_or_never().is_none());
3154
3155 let items = room_event_cache.events().await.unwrap();
3160 assert_eq!(items.len(), 2);
3161 assert_eq!(items[0].event_id().unwrap(), event_id1);
3162 assert_eq!(items[1].event_id().unwrap(), event_id2);
3163 }
3164
3165 #[async_test]
3166 async fn test_load_from_storage_resilient_to_failure() {
3167 let room_id = room_id!("!fondue:patate.ch");
3168 let event_cache_store = Arc::new(MemoryStore::new());
3169
3170 let event = EventFactory::new()
3171 .room(room_id)
3172 .sender(user_id!("@ben:saucisse.bzh"))
3173 .text_msg("foo")
3174 .event_id(event_id!("$42"))
3175 .into_event();
3176
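        // Prefill the store with an inconsistent linked chunk (the second chunk claims chunk 0 as both its previous and its next),
        // so reloading it must fail; the cache recovers by starting afresh and wiping the stored chunks.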
3177 event_cache_store
3179 .handle_linked_chunk_updates(
3180 LinkedChunkId::Room(room_id),
3181 vec![
3182 Update::NewItemsChunk {
3183 previous: None,
3184 new: ChunkIdentifier::new(0),
3185 next: None,
3186 },
3187 Update::PushItems {
3188 at: Position::new(ChunkIdentifier::new(0), 0),
3189 items: vec![event],
3190 },
3191 Update::NewItemsChunk {
3192 previous: Some(ChunkIdentifier::new(0)),
3193 new: ChunkIdentifier::new(1),
3194 next: Some(ChunkIdentifier::new(0)),
3195 },
3196 ],
3197 )
3198 .await
3199 .unwrap();
3200
3201 let client = MockClientBuilder::new(None)
3202 .on_builder(|builder| {
3203 builder.store_config(
3204 StoreConfig::new("holder".to_owned())
3205 .event_cache_store(event_cache_store.clone()),
3206 )
3207 })
3208 .build()
3209 .await;
3210
3211 let event_cache = client.event_cache();
3212
3213 event_cache.subscribe().unwrap();
3215
3216 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3217 let room = client.get_room(room_id).unwrap();
3218
3219 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3220
3221 let items = room_event_cache.events().await.unwrap();
3222
3223 assert!(items.is_empty());
3226
3227 let raw_chunks =
3230 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
3231 assert!(raw_chunks.is_empty());
3232 }
3233
3234 #[async_test]
3235 async fn test_no_useless_gaps() {
3236 let room_id = room_id!("!galette:saucisse.bzh");
3237
3238 let client = MockClientBuilder::new(None).build().await;
3239
3240 let event_cache = client.event_cache();
3241 event_cache.subscribe().unwrap();
3242
3243 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3244 let room = client.get_room(room_id).unwrap();
3245 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3246 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3247
3248 let f = EventFactory::new().room(room_id).sender(*ALICE);
3249
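        // A limited sync with a previous-batch token: a gap is stored, though the in-memory view only contains the event until we paginate.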
3250 room_event_cache
3253 .inner
3254 .handle_joined_room_update(JoinedRoomUpdate {
3255 timeline: Timeline {
3256 limited: true,
3257 prev_batch: Some("raclette".to_owned()),
3258 events: vec![f.text_msg("hey yo").into_event()],
3259 },
3260 ..Default::default()
3261 })
3262 .await
3263 .unwrap();
3264
3265 assert_matches!(
3267 generic_stream.recv().await,
3268 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3269 assert_eq!(expected_room_id, room_id);
3270 }
3271 );
3272
3273 {
3274 let mut state = room_event_cache.inner.state.write().await.unwrap();
3275
3276 let mut num_gaps = 0;
3277 let mut num_events = 0;
3278
3279 for c in state.room_linked_chunk().chunks() {
3280 match c.content() {
3281 ChunkContent::Items(items) => num_events += items.len(),
3282 ChunkContent::Gap(_) => num_gaps += 1,
3283 }
3284 }
3285
3286 assert_eq!(num_gaps, 0);
3289 assert_eq!(num_events, 1);
3290
3291 assert_matches!(
3293 state.load_more_events_backwards().await.unwrap(),
3294 LoadMoreEventsBackwardsOutcome::Gap { .. }
3295 );
3296
3297 num_gaps = 0;
3298 num_events = 0;
3299 for c in state.room_linked_chunk().chunks() {
3300 match c.content() {
3301 ChunkContent::Items(items) => num_events += items.len(),
3302 ChunkContent::Gap(_) => num_gaps += 1,
3303 }
3304 }
3305
3306 assert_eq!(num_gaps, 1);
3308 assert_eq!(num_events, 1);
3309 }
3310
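        // A non-limited sync also carries a token, but nothing is missing, so no extra gap should be stored.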
3311 room_event_cache
3314 .inner
3315 .handle_joined_room_update(JoinedRoomUpdate {
3316 timeline: Timeline {
3317 limited: false,
3318 prev_batch: Some("fondue".to_owned()),
3319 events: vec![f.text_msg("sup").into_event()],
3320 },
3321 ..Default::default()
3322 })
3323 .await
3324 .unwrap();
3325
3326 assert_matches!(
3328 generic_stream.recv().await,
3329 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
3330 assert_eq!(expected_room_id, room_id);
3331 }
3332 );
3333
3334 {
3335 let state = room_event_cache.inner.state.read().await.unwrap();
3336
3337 let mut num_gaps = 0;
3338 let mut num_events = 0;
3339
3340 for c in state.room_linked_chunk().chunks() {
3341 match c.content() {
3342 ChunkContent::Items(items) => num_events += items.len(),
3343 ChunkContent::Gap(gap) => {
3344 assert_eq!(gap.prev_token, "raclette");
3345 num_gaps += 1;
3346 }
3347 }
3348 }
3349
3350 assert_eq!(num_gaps, 1);
3352 assert_eq!(num_events, 2);
3353 }
3354 }
3355
3356 #[async_test]
3357 async fn test_shrink_to_last_chunk() {
3358 let room_id = room_id!("!galette:saucisse.bzh");
3359
3360 let client = MockClientBuilder::new(None).build().await;
3361
3362 let f = EventFactory::new().room(room_id);
3363
3364 let evid1 = event_id!("$1");
3365 let evid2 = event_id!("$2");
3366
3367 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3368 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3369
3370 {
3372 client
3373 .event_cache_store()
3374 .lock()
3375 .await
3376 .expect("Could not acquire the event cache lock")
3377 .as_clean()
3378 .expect("Could not acquire a clean event cache lock")
3379 .handle_linked_chunk_updates(
3380 LinkedChunkId::Room(room_id),
3381 vec![
3382 Update::NewItemsChunk {
3383 previous: None,
3384 new: ChunkIdentifier::new(0),
3385 next: None,
3386 },
3387 Update::PushItems {
3388 at: Position::new(ChunkIdentifier::new(0), 0),
3389 items: vec![ev1],
3390 },
3391 Update::NewItemsChunk {
3392 previous: Some(ChunkIdentifier::new(0)),
3393 new: ChunkIdentifier::new(1),
3394 next: None,
3395 },
3396 Update::PushItems {
3397 at: Position::new(ChunkIdentifier::new(1), 0),
3398 items: vec![ev2],
3399 },
3400 ],
3401 )
3402 .await
3403 .unwrap();
3404 }
3405
3406 let event_cache = client.event_cache();
3407 event_cache.subscribe().unwrap();
3408
3409 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3410 let room = client.get_room(room_id).unwrap();
3411 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3412
3413 let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
3415 assert_eq!(events.len(), 1);
3416 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3417 assert!(stream.is_empty());
3418
3419 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
3420
3421 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3423 assert_eq!(outcome.events.len(), 1);
3424 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3425 assert!(outcome.reached_start);
3426
3427 assert_let_timeout!(
3429 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
3430 );
3431 assert_eq!(diffs.len(), 1);
3432 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3433 assert_eq!(value.event_id().as_deref(), Some(evid1));
3434 });
3435
3436 assert!(stream.is_empty());
3437
3438 assert_let_timeout!(
3440 Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
3441 );
3442 assert_eq!(received_room_id, room_id);
3443
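        // Force the state to shrink to its last chunk: subscribers observe a `Clear` followed by an `Append` of that chunk's events.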
3444 let diffs = room_event_cache
3446 .inner
3447 .state
3448 .write()
3449 .await
3450 .unwrap()
3451 .force_shrink_to_last_chunk()
3452 .await
3453 .expect("shrinking should succeed");
3454
3455 assert_eq!(diffs.len(), 2);
3457 assert_matches!(&diffs[0], VectorDiff::Clear);
3458        assert_matches!(&diffs[1], VectorDiff::Append { values } => {
3459 assert_eq!(values.len(), 1);
3460 assert_eq!(values[0].event_id().as_deref(), Some(evid2));
3461 });
3462
3463 assert!(stream.is_empty());
3464
3465 assert!(generic_stream.is_empty());
3467
3468 let events = room_event_cache.events().await.unwrap();
3470 assert_eq!(events.len(), 1);
3471 assert_eq!(events[0].event_id().as_deref(), Some(evid2));
3472
3473 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3476 assert_eq!(outcome.events.len(), 1);
3477 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3478 assert!(outcome.reached_start);
3479 }
3480
3481 #[async_test]
3482 async fn test_room_ordering() {
3483 let room_id = room_id!("!galette:saucisse.bzh");
3484
3485 let client = MockClientBuilder::new(None).build().await;
3486
3487 let f = EventFactory::new().room(room_id).sender(*ALICE);
3488
3489 let evid1 = event_id!("$1");
3490 let evid2 = event_id!("$2");
3491 let evid3 = event_id!("$3");
3492
3493 let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
3494 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3495 let ev3 = f.text_msg("yo").event_id(evid3).into_event();
3496
3497 {
3499 client
3500 .event_cache_store()
3501 .lock()
3502 .await
3503 .expect("Could not acquire the event cache lock")
3504 .as_clean()
3505 .expect("Could not acquire a clean event cache lock")
3506 .handle_linked_chunk_updates(
3507 LinkedChunkId::Room(room_id),
3508 vec![
3509 Update::NewItemsChunk {
3510 previous: None,
3511 new: ChunkIdentifier::new(0),
3512 next: None,
3513 },
3514 Update::PushItems {
3515 at: Position::new(ChunkIdentifier::new(0), 0),
3516 items: vec![ev1, ev2],
3517 },
3518 Update::NewItemsChunk {
3519 previous: Some(ChunkIdentifier::new(0)),
3520 new: ChunkIdentifier::new(1),
3521 next: None,
3522 },
3523 Update::PushItems {
3524 at: Position::new(ChunkIdentifier::new(1), 0),
3525 items: vec![ev3.clone()],
3526 },
3527 ],
3528 )
3529 .await
3530 .unwrap();
3531 }
3532
3533 let event_cache = client.event_cache();
3534 event_cache.subscribe().unwrap();
3535
3536 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3537 let room = client.get_room(room_id).unwrap();
3538 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3539
3540 {
3543 let state = room_event_cache.inner.state.read().await.unwrap();
3544 let room_linked_chunk = state.room_linked_chunk();
3545
3546 assert_eq!(
3548 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3549 Some(0)
3550 );
3551
3552 assert_eq!(
3554 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3555 Some(1)
3556 );
3557
3558 let mut events = room_linked_chunk.events();
3560 let (pos, ev) = events.next().unwrap();
3561 assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
3562 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3563 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3564
3565 assert!(events.next().is_none());
3567 }
3568
3569 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3571 assert!(outcome.reached_start);
3572
3573 {
3576 let state = room_event_cache.inner.state.read().await.unwrap();
3577 let room_linked_chunk = state.room_linked_chunk();
3578
3579 for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
3580 assert_eq!(room_linked_chunk.event_order(pos), Some(i));
3581 }
3582 }
3583
3584 let evid4 = event_id!("$4");
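        // A limited sync delivers the already-known ev3 plus a new ev4: ev3 is deduplicated (its old position loses its ordering) and the orderings are recomputed.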
3589 room_event_cache
3590 .inner
3591 .handle_joined_room_update(JoinedRoomUpdate {
3592 timeline: Timeline {
3593 limited: true,
3594 prev_batch: Some("fondue".to_owned()),
3595 events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
3596 },
3597 ..Default::default()
3598 })
3599 .await
3600 .unwrap();
3601
3602 {
3603 let state = room_event_cache.inner.state.read().await.unwrap();
3604 let room_linked_chunk = state.room_linked_chunk();
3605
3606 let mut events = room_linked_chunk.events();
3608
3609 let (pos, ev) = events.next().unwrap();
3610 assert_eq!(ev.event_id().as_deref(), Some(evid3));
3611 assert_eq!(room_linked_chunk.event_order(pos), Some(2));
3612
3613 let (pos, ev) = events.next().unwrap();
3614 assert_eq!(ev.event_id().as_deref(), Some(evid4));
3615 assert_eq!(room_linked_chunk.event_order(pos), Some(3));
3616
3617 assert!(events.next().is_none());
3619
3620 assert_eq!(
3622 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
3623 Some(0)
3624 );
3625 assert_eq!(
3626 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
3627 Some(1)
3628 );
3629
3630 assert_eq!(
3633 room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
3634 None
3635 );
3636 }
3637 }
3638
3639 #[async_test]
3640 async fn test_auto_shrink_after_all_subscribers_are_gone() {
3641 let room_id = room_id!("!galette:saucisse.bzh");
3642
3643 let client = MockClientBuilder::new(None).build().await;
3644
3645 let f = EventFactory::new().room(room_id);
3646
3647 let evid1 = event_id!("$1");
3648 let evid2 = event_id!("$2");
3649
3650 let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
3651 let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
3652
3653 {
3655 client
3656 .event_cache_store()
3657 .lock()
3658 .await
3659 .expect("Could not acquire the event cache lock")
3660 .as_clean()
3661 .expect("Could not acquire a clean event cache lock")
3662 .handle_linked_chunk_updates(
3663 LinkedChunkId::Room(room_id),
3664 vec![
3665 Update::NewItemsChunk {
3666 previous: None,
3667 new: ChunkIdentifier::new(0),
3668 next: None,
3669 },
3670 Update::PushItems {
3671 at: Position::new(ChunkIdentifier::new(0), 0),
3672 items: vec![ev1],
3673 },
3674 Update::NewItemsChunk {
3675 previous: Some(ChunkIdentifier::new(0)),
3676 new: ChunkIdentifier::new(1),
3677 next: None,
3678 },
3679 Update::PushItems {
3680 at: Position::new(ChunkIdentifier::new(1), 0),
3681 items: vec![ev2],
3682 },
3683 ],
3684 )
3685 .await
3686 .unwrap();
3687 }
3688
3689 let event_cache = client.event_cache();
3690 event_cache.subscribe().unwrap();
3691
3692 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3693 let room = client.get_room(room_id).unwrap();
3694 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3695
3696 let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
3698 assert_eq!(events1.len(), 1);
3699 assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
3700 assert!(stream1.is_empty());
3701
3702 let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
3704 assert_eq!(outcome.events.len(), 1);
3705 assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
3706 assert!(outcome.reached_start);
3707
3708 assert_let_timeout!(
3711 Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
3712 );
3713 assert_eq!(diffs.len(), 1);
3714 assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
3715 assert_eq!(value.event_id().as_deref(), Some(evid1));
3716 });
3717
3718 assert!(stream1.is_empty());
3719
3720 let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
3724 assert_eq!(events2.len(), 2);
3725 assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
3726 assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
3727 assert!(stream2.is_empty());
3728
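        // Dropping the first subscriber must not shrink anything: another subscriber is still alive.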
3729 drop(stream1);
3731 yield_now().await;
3732
3733 assert!(stream2.is_empty());
3735
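        // Dropping the last subscriber lets the auto-shrink task reduce the state to its last chunk.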
3736 drop(stream2);
3738 yield_now().await;
3739
3740 {
3743 let state = room_event_cache.inner.state.read().await.unwrap();
3745 assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
3746 }
3747
3748 let events3 = room_event_cache.events().await.unwrap();
3750 assert_eq!(events3.len(), 1);
3751 assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
3752 }
3753
3754 #[async_test]
3755 async fn test_rfind_map_event_in_memory_by() {
3756 let user_id = user_id!("@mnt_io:matrix.org");
3757 let room_id = room_id!("!raclette:patate.ch");
3758 let client = MockClientBuilder::new(None).build().await;
3759
3760 let event_factory = EventFactory::new().room(room_id);
3761
3762 let event_id_0 = event_id!("$ev0");
3763 let event_id_1 = event_id!("$ev1");
3764 let event_id_2 = event_id!("$ev2");
3765 let event_id_3 = event_id!("$ev3");
3766
3767 let event_0 =
3768 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3769 let event_1 =
3770 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3771 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3772 let event_3 =
3773 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3774
3775 {
3778 client
3779 .event_cache_store()
3780 .lock()
3781 .await
3782 .expect("Could not acquire the event cache lock")
3783 .as_clean()
3784 .expect("Could not acquire a clean event cache lock")
3785 .handle_linked_chunk_updates(
3786 LinkedChunkId::Room(room_id),
3787 vec![
3788 Update::NewItemsChunk {
3789 previous: None,
3790 new: ChunkIdentifier::new(0),
3791 next: None,
3792 },
3793 Update::PushItems {
3794 at: Position::new(ChunkIdentifier::new(0), 0),
3795 items: vec![event_3],
3796 },
3797 Update::NewItemsChunk {
3798 previous: Some(ChunkIdentifier::new(0)),
3799 new: ChunkIdentifier::new(1),
3800 next: None,
3801 },
3802 Update::PushItems {
3803 at: Position::new(ChunkIdentifier::new(1), 0),
3804 items: vec![event_0, event_1, event_2],
3805 },
3806 ],
3807 )
3808 .await
3809 .unwrap();
3810 }
3811
3812 let event_cache = client.event_cache();
3813 event_cache.subscribe().unwrap();
3814
3815 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3816 let room = client.get_room(room_id).unwrap();
3817 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3818
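        // Only the last chunk (ev0, ev1, ev2) is loaded in memory; ev3 lives in an older chunk and is invisible to the in-memory search.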
3819 assert_matches!(
3821 room_event_cache
3822 .rfind_map_event_in_memory_by(|event, previous_event_id| {
3823 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| (event.event_id(), previous_event_id))
3824 })
3825 .await,
3826 Ok(Some((event_id, previous_event_id))) => {
3827 assert_eq!(event_id.as_deref(), Some(event_id_0));
3828 assert!(previous_event_id.is_none());
3829 }
3830 );
3831
3832 assert_matches!(
3835 room_event_cache
3836 .rfind_map_event_in_memory_by(|event, previous_event_id| {
3837 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| (event.event_id(), previous_event_id))
3838 })
3839 .await,
3840 Ok(Some((event_id, previous_event_id))) => {
3841 assert_eq!(event_id.as_deref(), Some(event_id_2));
3842 assert_eq!(previous_event_id.as_deref(), Some(event_id_1));
3843 }
3844 );
3845
3846 assert!(
3848 room_event_cache
3849 .rfind_map_event_in_memory_by(|event, _| {
3850 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3851 == Some(user_id))
3852 .then(|| event.event_id())
3853 })
3854 .await
3855 .unwrap()
3856 .is_none()
3857 );
3858
3859 assert!(
3861 room_event_cache
3862 .rfind_map_event_in_memory_by(|_, _| None::<()>)
3863 .await
3864 .unwrap()
3865 .is_none()
3866 );
3867 }
3868
3869 #[async_test]
3870 async fn test_reload_when_dirty() {
3871 let user_id = user_id!("@mnt_io:matrix.org");
3872 let room_id = room_id!("!raclette:patate.ch");
3873
3874 let event_cache_store = MemoryStore::new();
3876
3877 let client_p0 = MockClientBuilder::new(None)
3879 .on_builder(|builder| {
3880 builder.store_config(
3881 StoreConfig::new("process #0".to_owned())
3882 .event_cache_store(event_cache_store.clone()),
3883 )
3884 })
3885 .build()
3886 .await;
3887
3888 let client_p1 = MockClientBuilder::new(None)
3890 .on_builder(|builder| {
3891 builder.store_config(
3892 StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
3893 )
3894 })
3895 .build()
3896 .await;
3897
3898 let event_factory = EventFactory::new().room(room_id).sender(user_id);
3899
3900 let ev_id_0 = event_id!("$ev_0");
3901 let ev_id_1 = event_id!("$ev_1");
3902
3903 let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
3904 let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
3905
3906 client_p0
3908 .event_cache_store()
3909 .lock()
3910 .await
3911 .expect("[p0] Could not acquire the event cache lock")
3912 .as_clean()
3913 .expect("[p0] Could not acquire a clean event cache lock")
3914 .handle_linked_chunk_updates(
3915 LinkedChunkId::Room(room_id),
3916 vec![
3917 Update::NewItemsChunk {
3918 previous: None,
3919 new: ChunkIdentifier::new(0),
3920 next: None,
3921 },
3922 Update::PushItems {
3923 at: Position::new(ChunkIdentifier::new(0), 0),
3924 items: vec![ev_0],
3925 },
3926 Update::NewItemsChunk {
3927 previous: Some(ChunkIdentifier::new(0)),
3928 new: ChunkIdentifier::new(1),
3929 next: None,
3930 },
3931 Update::PushItems {
3932 at: Position::new(ChunkIdentifier::new(1), 0),
3933 items: vec![ev_1],
3934 },
3935 ],
3936 )
3937 .await
3938 .unwrap();
3939
3940 let (room_event_cache_p0, room_event_cache_p1) = {
3942 let event_cache_p0 = client_p0.event_cache();
3943 event_cache_p0.subscribe().unwrap();
3944
3945 let event_cache_p1 = client_p1.event_cache();
3946 event_cache_p1.subscribe().unwrap();
3947
3948 client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3949 client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3950
3951 let (room_event_cache_p0, _drop_handles) =
3952 client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
3953 let (room_event_cache_p1, _drop_handles) =
3954 client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
3955
3956 (room_event_cache_p0, room_event_cache_p1)
3957 };
3958
3959 let mut updates_stream_p0 = {
3964 let room_event_cache = &room_event_cache_p0;
3965
3966 let (initial_updates, mut updates_stream) =
3967 room_event_cache_p0.subscribe().await.unwrap();
3968
3969 assert_eq!(initial_updates.len(), 1);
3971 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
3972 assert!(updates_stream.is_empty());
3973
3974 assert!(event_loaded(room_event_cache, ev_id_1).await);
3976
3977 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
3979
3980 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
3982
3983 assert_matches!(
3985 updates_stream.recv().await.unwrap(),
3986 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
3987 assert_eq!(diffs.len(), 1, "{diffs:#?}");
3988 assert_matches!(
3989 &diffs[0],
3990 VectorDiff::Insert { index: 0, value: event } => {
3991 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
3992 }
3993 );
3994 }
3995 );
3996
3997 assert!(event_loaded(room_event_cache, ev_id_0).await);
3999
4000 updates_stream
4001 };
4002
4003 let mut updates_stream_p1 = {
4005 let room_event_cache = &room_event_cache_p1;
4006 let (initial_updates, mut updates_stream) =
4007 room_event_cache_p1.subscribe().await.unwrap();
4008
4009 assert_eq!(initial_updates.len(), 1);
4011 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4012 assert!(updates_stream.is_empty());
4013
4014 assert!(event_loaded(room_event_cache, ev_id_1).await);
4016
4017 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4019
4020 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4022
4023 assert_matches!(
4025 updates_stream.recv().await.unwrap(),
4026 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4027 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4028 assert_matches!(
4029 &diffs[0],
4030 VectorDiff::Insert { index: 0, value: event } => {
4031 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4032 }
4033 );
4034 }
4035 );
4036
4037 assert!(event_loaded(room_event_cache, ev_id_0).await);
4039
4040 updates_stream
4041 };
4042
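        // Alternate between the two processes: whenever the other process has touched the store, the local
        // state is dirty and reloads to its last chunk (a `Clear` then an `Append`) before paginating again.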
4043 for _ in 0..3 {
4045 {
4049 let room_event_cache = &room_event_cache_p0;
4050 let updates_stream = &mut updates_stream_p0;
4051
4052 assert!(event_loaded(room_event_cache, ev_id_1).await);
4054
4055 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4058
4059 assert_matches!(
4061 updates_stream.recv().await.unwrap(),
4062 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4063 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4064 assert_matches!(&diffs[0], VectorDiff::Clear);
4065 assert_matches!(
4066 &diffs[1],
4067 VectorDiff::Append { values: events } => {
4068 assert_eq!(events.len(), 1);
4069 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4070 }
4071 );
4072 }
4073 );
4074
4075 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4077
4078 assert!(event_loaded(room_event_cache, ev_id_0).await);
4080
4081 assert_matches!(
4083 updates_stream.recv().await.unwrap(),
4084 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4085 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4086 assert_matches!(
4087 &diffs[0],
4088 VectorDiff::Insert { index: 0, value: event } => {
4089 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4090 }
4091 );
4092 }
4093 );
4094 }
4095
4096 {
4100 let room_event_cache = &room_event_cache_p1;
4101 let updates_stream = &mut updates_stream_p1;
4102
4103 assert!(event_loaded(room_event_cache, ev_id_1).await);
4105
4106 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4109
4110 assert_matches!(
4112 updates_stream.recv().await.unwrap(),
4113 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4114 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4115 assert_matches!(&diffs[0], VectorDiff::Clear);
4116 assert_matches!(
4117 &diffs[1],
4118 VectorDiff::Append { values: events } => {
4119 assert_eq!(events.len(), 1);
4120 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4121 }
4122 );
4123 }
4124 );
4125
4126 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4128
4129 assert!(event_loaded(room_event_cache, ev_id_0).await);
4131
4132 assert_matches!(
4134 updates_stream.recv().await.unwrap(),
4135 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4136 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4137 assert_matches!(
4138 &diffs[0],
4139 VectorDiff::Insert { index: 0, value: event } => {
4140 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4141 }
4142 );
4143 }
4144 );
4145 }
4146 }
4147
4148 for _ in 0..3 {
4151 {
4152 let room_event_cache = &room_event_cache_p0;
4153 let updates_stream = &mut updates_stream_p0;
4154
4155 let guard = room_event_cache.inner.state.read().await.unwrap();
4156
4157 assert!(guard.is_dirty().not());
4163
4164 assert_matches!(
4166 updates_stream.recv().await.unwrap(),
4167 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4168 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4169 assert_matches!(&diffs[0], VectorDiff::Clear);
4170 assert_matches!(
4171 &diffs[1],
4172 VectorDiff::Append { values: events } => {
4173 assert_eq!(events.len(), 1);
4174 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4175 }
4176 );
4177 }
4178 );
4179
4180 assert!(event_loaded(room_event_cache, ev_id_1).await);
4181 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4182
4183 drop(guard);
4189
4190 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4191 assert!(event_loaded(room_event_cache, ev_id_0).await);
4192
4193 assert_matches!(
4195 updates_stream.recv().await.unwrap(),
4196 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4197 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4198 assert_matches!(
4199 &diffs[0],
4200 VectorDiff::Insert { index: 0, value: event } => {
4201 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4202 }
4203 );
4204 }
4205 );
4206 }
4207
4208 {
4209 let room_event_cache = &room_event_cache_p1;
4210 let updates_stream = &mut updates_stream_p1;
4211
4212 let guard = room_event_cache.inner.state.read().await.unwrap();
4213
4214 assert!(guard.is_dirty().not());
4219
4220 assert_matches!(
4222 updates_stream.recv().await.unwrap(),
4223 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4224 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4225 assert_matches!(&diffs[0], VectorDiff::Clear);
4226 assert_matches!(
4227 &diffs[1],
4228 VectorDiff::Append { values: events } => {
4229 assert_eq!(events.len(), 1);
4230 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4231 }
4232 );
4233 }
4234 );
4235
4236 assert!(event_loaded(room_event_cache, ev_id_1).await);
4237 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4238
4239 drop(guard);
4245
4246 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4247 assert!(event_loaded(room_event_cache, ev_id_0).await);
4248
4249 assert_matches!(
4251 updates_stream.recv().await.unwrap(),
4252 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4253 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4254 assert_matches!(
4255 &diffs[0],
4256 VectorDiff::Insert { index: 0, value: event } => {
4257 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4258 }
4259 );
4260 }
4261 );
4262 }
4263 }
4264
4265 for _ in 0..3 {
4267 {
4268 let room_event_cache = &room_event_cache_p0;
4269 let updates_stream = &mut updates_stream_p0;
4270
4271 let guard = room_event_cache.inner.state.write().await.unwrap();
4272
4273 assert!(guard.is_dirty().not());
4275
4276 assert_matches!(
4278 updates_stream.recv().await.unwrap(),
4279 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4280 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4281 assert_matches!(&diffs[0], VectorDiff::Clear);
4282 assert_matches!(
4283 &diffs[1],
4284 VectorDiff::Append { values: events } => {
4285 assert_eq!(events.len(), 1);
4286 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4287 }
4288 );
4289 }
4290 );
4291
4292 drop(guard);
4295
4296 assert!(event_loaded(room_event_cache, ev_id_1).await);
4297 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4298
4299 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4300 assert!(event_loaded(room_event_cache, ev_id_0).await);
4301
4302 assert_matches!(
4304 updates_stream.recv().await.unwrap(),
4305 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4306 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4307 assert_matches!(
4308 &diffs[0],
4309 VectorDiff::Insert { index: 0, value: event } => {
4310 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4311 }
4312 );
4313 }
4314 );
4315 }
4316
4317 {
4318 let room_event_cache = &room_event_cache_p1;
4319 let updates_stream = &mut updates_stream_p1;
4320
4321 let guard = room_event_cache.inner.state.write().await.unwrap();
4322
4323 assert!(guard.is_dirty().not());
4325
4326 assert_matches!(
4328 updates_stream.recv().await.unwrap(),
4329 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4330 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4331 assert_matches!(&diffs[0], VectorDiff::Clear);
4332 assert_matches!(
4333 &diffs[1],
4334 VectorDiff::Append { values: events } => {
4335 assert_eq!(events.len(), 1);
4336 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4337 }
4338 );
4339 }
4340 );
4341
4342 drop(guard);
4345
4346 assert!(event_loaded(room_event_cache, ev_id_1).await);
4347 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4348
4349 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4350 assert!(event_loaded(room_event_cache, ev_id_0).await);
4351
4352 assert_matches!(
4354 updates_stream.recv().await.unwrap(),
4355 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4356 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4357 assert_matches!(
4358 &diffs[0],
4359 VectorDiff::Insert { index: 0, value: event } => {
4360 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4361 }
4362 );
4363 }
4364 );
4365 }
4366 }
4367 }
4368
4369 #[async_test]
4370 async fn test_load_when_dirty() {
4371 let room_id_0 = room_id!("!raclette:patate.ch");
4372 let room_id_1 = room_id!("!morbiflette:patate.ch");
4373
4374 let event_cache_store = MemoryStore::new();
4376
4377 let client_p0 = MockClientBuilder::new(None)
4379 .on_builder(|builder| {
4380 builder.store_config(
4381 StoreConfig::new("process #0".to_owned())
4382 .event_cache_store(event_cache_store.clone()),
4383 )
4384 })
4385 .build()
4386 .await;
4387
4388 let client_p1 = MockClientBuilder::new(None)
4390 .on_builder(|builder| {
4391 builder.store_config(
4392 StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
4393 )
4394 })
4395 .build()
4396 .await;
4397
4398 let (room_event_cache_0_p0, room_event_cache_0_p1) = {
4400 let event_cache_p0 = client_p0.event_cache();
4401 event_cache_p0.subscribe().unwrap();
4402
4403 let event_cache_p1 = client_p1.event_cache();
4404 event_cache_p1.subscribe().unwrap();
4405
4406 client_p0
4407 .base_client()
4408 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4409 client_p0
4410 .base_client()
4411 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4412
4413 client_p1
4414 .base_client()
4415 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4416 client_p1
4417 .base_client()
4418 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4419
4420 let (room_event_cache_0_p0, _drop_handles) =
4421 client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4422 let (room_event_cache_0_p1, _drop_handles) =
4423 client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4424
4425 (room_event_cache_0_p0, room_event_cache_0_p1)
4426 };
4427
4428 {
4430 drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
4431 drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
4432 }
4433
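        // Opening the event cache of another room must start from a clean, non-dirty state.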
4434 let (room_event_cache_1_p0, _) =
4438 client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
4439
4440 {
4442 let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
4443 assert!(guard.is_dirty().not());
4444 }
4445
4446 }
4449
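    /// Returns `true` if the event with the given id is currently loaded in memory by this `RoomEventCache`.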
4450 async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
4451 room_event_cache
4452 .rfind_map_event_in_memory_by(|event, _previous_event_id| {
4453 (event.event_id().as_deref() == Some(event_id)).then_some(())
4454 })
4455 .await
4456 .unwrap()
4457 .is_some()
4458 }
4459}