1use std::{
18 collections::BTreeMap,
19 fmt,
20 ops::{Deref, DerefMut},
21 sync::{
22 Arc,
23 atomic::{AtomicUsize, Ordering},
24 },
25};
26
27use events::sort_positions_descending;
28use eyeball::SharedObservable;
29use eyeball_im::VectorDiff;
30use matrix_sdk_base::{
31 deserialized_responses::AmbiguityChange,
32 event_cache::Event,
33 linked_chunk::Position,
34 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
35};
36use ruma::{
37 EventId, OwnedEventId, OwnedRoomId,
38 api::Direction,
39 events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, relation::RelationType},
40 serde::Raw,
41};
42use tokio::sync::{
43 Notify,
44 broadcast::{Receiver, Sender},
45 mpsc,
46};
47use tracing::{instrument, trace, warn};
48
49use super::{
50 AutoShrinkChannelPayload, EventsOrigin, Result, RoomEventCacheGenericUpdate,
51 RoomEventCacheUpdate, RoomPagination, RoomPaginationStatus,
52};
53use crate::{
54 client::WeakClient,
55 event_cache::EventCacheError,
56 room::{IncludeRelations, RelationsOptions, WeakRoom},
57};
58
59pub(super) mod events;
60mod threads;
61
62pub use threads::ThreadEventCacheUpdate;
63
/// A per-room handle to the event cache.
///
/// Cloning is cheap: all state is shared behind an `Arc`.
#[derive(Clone)]
pub struct RoomEventCache {
    /// The shared, internal state of this room's event cache.
    pub(super) inner: Arc<RoomEventCacheInner>,
}
71
72impl fmt::Debug for RoomEventCache {
73 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
74 f.debug_struct("RoomEventCache").finish_non_exhaustive()
75 }
76}
77
/// A subscriber to a [`RoomEventCache`]'s updates.
///
/// Derefs to the underlying broadcast receiver, and participates in
/// subscriber counting: dropping the last live subscriber notifies the
/// auto-shrink channel (see the `Drop` impl).
#[allow(missing_debug_implementations)]
pub struct RoomEventCacheSubscriber {
    // Receiver of the room's `RoomEventCacheUpdate`s.
    recv: Receiver<RoomEventCacheUpdate>,

    // Room this subscriber belongs to; sent as the auto-shrink payload.
    room_id: OwnedRoomId,

    // Sender half of the auto-shrink channel, used on last drop.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    // Shared count of live subscribers for this room.
    subscriber_count: Arc<AtomicUsize>,
}
101
102impl Drop for RoomEventCacheSubscriber {
103 fn drop(&mut self) {
104 let previous_subscriber_count = self.subscriber_count.fetch_sub(1, Ordering::SeqCst);
105
106 trace!(
107 "dropping a room event cache subscriber; previous count: {previous_subscriber_count}"
108 );
109
110 if previous_subscriber_count == 1 {
111 let mut room_id = self.room_id.clone();
115
116 let mut num_attempts = 0;
122
123 while let Err(err) = self.auto_shrink_sender.try_send(room_id) {
124 num_attempts += 1;
125
126 if num_attempts > 1024 {
127 warn!(
130 "couldn't send notification to the auto-shrink channel \
131 after 1024 attempts; giving up"
132 );
133 return;
134 }
135
136 match err {
137 mpsc::error::TrySendError::Full(stolen_room_id) => {
138 room_id = stolen_room_id;
139 }
140 mpsc::error::TrySendError::Closed(_) => return,
141 }
142 }
143
144 trace!("sent notification to the parent channel that we were the last subscriber");
145 }
146 }
147}
148
149impl Deref for RoomEventCacheSubscriber {
150 type Target = Receiver<RoomEventCacheUpdate>;
151
152 fn deref(&self) -> &Self::Target {
153 &self.recv
154 }
155}
156
157impl DerefMut for RoomEventCacheSubscriber {
158 fn deref_mut(&mut self) -> &mut Self::Target {
159 &mut self.recv
160 }
161}
162
163impl RoomEventCache {
164 pub(super) fn new(
166 client: WeakClient,
167 state: RoomEventCacheStateLock,
168 pagination_status: SharedObservable<RoomPaginationStatus>,
169 room_id: OwnedRoomId,
170 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
171 update_sender: Sender<RoomEventCacheUpdate>,
172 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
173 ) -> Self {
174 Self {
175 inner: Arc::new(RoomEventCacheInner::new(
176 client,
177 state,
178 pagination_status,
179 room_id,
180 auto_shrink_sender,
181 update_sender,
182 generic_update_sender,
183 )),
184 }
185 }
186
187 pub async fn events(&self) -> Result<Vec<Event>> {
192 let state = self.inner.state.read().await?;
193
194 Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect())
195 }
196
197 pub async fn subscribe(&self) -> Result<(Vec<Event>, RoomEventCacheSubscriber)> {
204 let state = self.inner.state.read().await?;
205 let events =
206 state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect();
207
208 let subscriber_count = state.subscriber_count();
209 let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst);
210 trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1);
211
212 let recv = self.inner.update_sender.subscribe();
213 let subscriber = RoomEventCacheSubscriber {
214 recv,
215 room_id: self.inner.room_id.clone(),
216 auto_shrink_sender: self.inner.auto_shrink_sender.clone(),
217 subscriber_count: subscriber_count.clone(),
218 };
219
220 Ok((events, subscriber))
221 }
222
223 pub async fn subscribe_to_thread(
226 &self,
227 thread_root: OwnedEventId,
228 ) -> Result<(Vec<Event>, Receiver<ThreadEventCacheUpdate>)> {
229 let mut state = self.inner.state.write().await?;
230 Ok(state.subscribe_to_thread(thread_root))
231 }
232
    /// Paginates a thread backwards, using the `/relations` endpoint.
    ///
    /// Returns `true` when the start of the thread has been reached.
    #[instrument(skip(self), fields(room_id = %self.inner.room_id))]
    pub async fn paginate_thread_backwards(
        &self,
        thread_root: OwnedEventId,
        num_events: u16,
    ) -> Result<bool> {
        let room = self.inner.weak_room.get().ok_or(EventCacheError::ClientDropped)?;

        // Ask the thread cache what to do next; the write lock is only held
        // for the duration of this call.
        let mut outcome =
            self.inner.state.write().await?.load_more_thread_events_backwards(thread_root.clone());

        loop {
            match outcome {
                LoadMoreEventsBackwardsOutcome::Gap { prev_token } => {
                    // A gap must be filled over the network. Note: the state
                    // lock is deliberately NOT held across the request.
                    let options = RelationsOptions {
                        from: prev_token.clone(),
                        dir: Direction::Backward,
                        limit: Some(num_events.into()),
                        include_relations: IncludeRelations::AllRelations,
                        recurse: true,
                    };

                    let mut result = room
                        .relations(thread_root.clone(), options)
                        .await
                        .map_err(|err| EventCacheError::BackpaginationError(Box::new(err)))?;

                    // No next-batch token means we've paginated all the way
                    // to the start of the thread.
                    let reached_start = result.next_batch_token.is_none();
                    trace!(num_events = result.chunk.len(), %reached_start, "received a /relations response");

                    // `/relations` never returns the thread root itself; once
                    // the start is reached, fetch it so it's part of the batch.
                    let root_event =
                        if reached_start {
                            Some(room.load_or_fetch_event(&thread_root, None).await.map_err(
                                |err| EventCacheError::BackpaginationError(Box::new(err)),
                            )?)
                        } else {
                            None
                        };

                    let mut state = self.inner.state.write().await?;

                    // Persist the received events before touching the thread's
                    // linked chunk.
                    state.save_events(result.chunk.iter().cloned()).await?;

                    // Append the root event (if any) to the backwards batch.
                    result.chunk.extend(root_event);

                    // NOTE(review): a `None` return presumably means the
                    // pagination token changed concurrently and the pagination
                    // must be restarted — confirm against
                    // `finish_thread_network_pagination`'s definition.
                    if let Some(outcome) = state.finish_thread_network_pagination(
                        thread_root.clone(),
                        prev_token,
                        result.next_batch_token,
                        result.chunk,
                    ) {
                        return Ok(outcome.reached_start);
                    }

                    // Restart with a fresh outcome.
                    outcome = state.load_more_thread_events_backwards(thread_root.clone());
                }

                LoadMoreEventsBackwardsOutcome::StartOfTimeline => {
                    // Nothing left to paginate.
                    return Ok(true);
                }

                LoadMoreEventsBackwardsOutcome::Events { .. } => {
                    unimplemented!("loading from disk for threads is not implemented yet");
                }
            }
        }
    }
315
316 pub fn pagination(&self) -> RoomPagination {
319 RoomPagination { inner: self.inner.clone() }
320 }
321
322 pub async fn rfind_map_event_in_memory_by<O, P>(&self, predicate: P) -> Result<Option<O>>
331 where
332 P: FnMut(&Event, Option<&Event>) -> Option<O>,
333 {
334 Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate))
335 }
336
337 pub async fn find_event(&self, event_id: &EventId) -> Result<Option<Event>> {
342 Ok(self
343 .inner
344 .state
345 .read()
346 .await?
347 .find_event(event_id)
348 .await
349 .ok()
350 .flatten()
351 .map(|(_loc, event)| event))
352 }
353
354 pub async fn find_event_with_relations(
366 &self,
367 event_id: &EventId,
368 filter: Option<Vec<RelationType>>,
369 ) -> Result<Option<(Event, Vec<Event>)>> {
370 Ok(self
372 .inner
373 .state
374 .read()
375 .await?
376 .find_event_with_relations(event_id, filter.clone())
377 .await
378 .ok()
379 .flatten())
380 }
381
382 pub async fn clear(&self) -> Result<()> {
387 let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?;
389
390 let _ = self.inner.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
392 diffs: updates_as_vector_diffs,
393 origin: EventsOrigin::Cache,
394 });
395
396 let _ = self
398 .inner
399 .generic_update_sender
400 .send(RoomEventCacheGenericUpdate { room_id: self.inner.room_id.clone() });
401
402 Ok(())
403 }
404
405 pub(crate) async fn insert_sent_event_from_send_queue(&self, event: Event) -> Result<()> {
407 self.inner
408 .handle_timeline(
409 Timeline { limited: false, prev_batch: None, events: vec![event] },
410 Vec::new(),
411 BTreeMap::new(),
412 )
413 .await
414 }
415
416 pub(crate) async fn save_events(&self, events: impl IntoIterator<Item = Event>) {
419 match self.inner.state.write().await {
420 Ok(mut state_guard) => {
421 if let Err(err) = state_guard.save_events(events).await {
422 warn!("couldn't save event in the event cache: {err}");
423 }
424 }
425
426 Err(err) => {
427 warn!("couldn't save event in the event cache: {err}");
428 }
429 }
430 }
431
432 pub async fn debug_string(&self) -> Vec<String> {
435 match self.inner.state.read().await {
436 Ok(read_guard) => read_guard.room_linked_chunk().debug_string(),
437 Err(err) => {
438 warn!(?err, "Failed to obtain the read guard for the `RoomEventCache`");
439
440 vec![]
441 }
442 }
443 }
444}
445
/// Shared, internal state of a [`RoomEventCache`].
pub(super) struct RoomEventCacheInner {
    /// The room this cache belongs to.
    pub(super) room_id: OwnedRoomId,

    /// Weak reference to the room, used for network requests (e.g. the
    /// `/relations` calls in thread pagination).
    pub weak_room: WeakRoom,

    /// The locked state: linked chunk, threads, and store access.
    pub state: RoomEventCacheStateLock,

    /// Notified whenever a previous-batch token has been stored after a sync
    /// (see `handle_timeline`).
    pub pagination_batch_token_notifier: Notify,

    /// Observable status of back-paginations for this room.
    pub pagination_status: SharedObservable<RoomPaginationStatus>,

    /// Sender to the auto-shrink channel, used by subscribers on last drop.
    auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,

    /// Sender for fine-grained room updates, broadcast to subscribers.
    pub update_sender: Sender<RoomEventCacheUpdate>,

    /// Sender for coarse-grained "something changed in this room" updates.
    pub(super) generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
}
477
478impl RoomEventCacheInner {
479 fn new(
482 client: WeakClient,
483 state: RoomEventCacheStateLock,
484 pagination_status: SharedObservable<RoomPaginationStatus>,
485 room_id: OwnedRoomId,
486 auto_shrink_sender: mpsc::Sender<AutoShrinkChannelPayload>,
487 update_sender: Sender<RoomEventCacheUpdate>,
488 generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
489 ) -> Self {
490 let weak_room = WeakRoom::new(client, room_id);
491
492 Self {
493 room_id: weak_room.room_id().to_owned(),
494 weak_room,
495 state,
496 update_sender,
497 pagination_batch_token_notifier: Default::default(),
498 auto_shrink_sender,
499 pagination_status,
500 generic_update_sender,
501 }
502 }
503
504 fn handle_account_data(&self, account_data: Vec<Raw<AnyRoomAccountDataEvent>>) {
505 if account_data.is_empty() {
506 return;
507 }
508
509 let mut handled_read_marker = false;
510
511 trace!("Handling account data");
512
513 for raw_event in account_data {
514 match raw_event.deserialize() {
515 Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
516 if handled_read_marker {
519 continue;
520 }
521
522 handled_read_marker = true;
523
524 let _ = self.update_sender.send(RoomEventCacheUpdate::MoveReadMarkerTo {
526 event_id: ev.content.event_id,
527 });
528 }
529
530 Ok(_) => {
531 }
534
535 Err(e) => {
536 let event_type = raw_event.get_field::<String>("type").ok().flatten();
537 warn!(event_type, "Failed to deserialize account data: {e}");
538 }
539 }
540 }
541 }
542
543 #[instrument(skip_all, fields(room_id = %self.room_id))]
544 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
545 self.handle_timeline(
546 updates.timeline,
547 updates.ephemeral.clone(),
548 updates.ambiguity_changes,
549 )
550 .await?;
551 self.handle_account_data(updates.account_data);
552
553 Ok(())
554 }
555
556 #[instrument(skip_all, fields(room_id = %self.room_id))]
557 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
558 self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?;
559
560 Ok(())
561 }
562
    /// Handles a sync timeline for this room, broadcasting the resulting
    /// updates to observers.
    async fn handle_timeline(
        &self,
        timeline: Timeline,
        ephemeral_events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    ) -> Result<()> {
        // Nothing to do at all? Don't even take the write lock.
        if timeline.events.is_empty()
            && timeline.prev_batch.is_none()
            && ephemeral_events.is_empty()
            && ambiguity_changes.is_empty()
        {
            return Ok(());
        }

        trace!("adding new events");

        let (stored_prev_batch_token, timeline_event_diffs) =
            self.state.write().await?.handle_sync(timeline).await?;

        // Wake up any back-pagination waiting for a previous-batch token.
        if stored_prev_batch_token {
            self.pagination_batch_token_notifier.notify_one();
        }

        // Broadcast the timeline diffs, both fine-grained and generic; send
        // errors are ignored throughout, since there may be no receivers.
        if !timeline_event_diffs.is_empty() {
            let _ = self.update_sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
                diffs: timeline_event_diffs,
                origin: EventsOrigin::Sync,
            });

            let _ = self
                .generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: self.room_id.clone() });
        }

        if !ephemeral_events.is_empty() {
            let _ = self
                .update_sender
                .send(RoomEventCacheUpdate::AddEphemeralEvents { events: ephemeral_events });
        }

        if !ambiguity_changes.is_empty() {
            let _ =
                self.update_sender.send(RoomEventCacheUpdate::UpdateMembers { ambiguity_changes });
        }

        Ok(())
    }
617}
618
/// Outcome of a "load more events backwards" request on a linked chunk.
#[derive(Debug)]
pub(super) enum LoadMoreEventsBackwardsOutcome {
    /// A gap was found; fill it with a network back-pagination.
    Gap {
        /// Token to paginate from; `None` means paginating from the end.
        prev_token: Option<String>,
    },

    /// The start of the timeline has been reached: nothing left to load.
    StartOfTimeline,

    /// Events were reloaded from the store, along with the corresponding
    /// timeline diffs and whether the start of the timeline was reached.
    Events { events: Vec<Event>, timeline_event_diffs: Vec<VectorDiff<Event>>, reached_start: bool },
}
636
637mod private {
639 use std::{
640 collections::{BTreeMap, HashMap, HashSet},
641 sync::{
642 Arc,
643 atomic::{AtomicBool, AtomicUsize, Ordering},
644 },
645 };
646
647 use eyeball::SharedObservable;
648 use eyeball_im::VectorDiff;
649 use itertools::Itertools;
650 use matrix_sdk_base::{
651 apply_redaction, check_validity_of_replacement_events,
652 deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
653 event_cache::{
654 Event, Gap,
655 store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
656 },
657 linked_chunk::{
658 ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId,
659 OwnedLinkedChunkId, Position, Update, lazy_loader,
660 },
661 serde_helpers::{extract_edit_target, extract_thread_root},
662 sync::Timeline,
663 };
664 use matrix_sdk_common::executor::spawn;
665 use ruma::{
666 EventId, OwnedEventId, OwnedRoomId, RoomId,
667 events::{
668 AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType,
669 relation::RelationType, room::redaction::SyncRoomRedactionEvent,
670 },
671 room_version_rules::RoomVersionRules,
672 serde::Raw,
673 };
674 use tokio::sync::{
675 Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
676 broadcast::{Receiver, Sender},
677 };
678 use tracing::{debug, error, instrument, trace, warn};
679
680 use super::{
681 super::{
682 BackPaginationOutcome, EventCacheError, RoomEventCacheLinkedChunkUpdate,
683 RoomPaginationStatus, ThreadEventCacheUpdate,
684 deduplicator::{DeduplicationOutcome, filter_duplicate_events},
685 room::threads::ThreadEventCache,
686 },
687 EventLocation, EventsOrigin, LoadMoreEventsBackwardsOutcome, RoomEventCacheGenericUpdate,
688 RoomEventCacheUpdate,
689 events::EventLinkedChunk,
690 sort_positions_descending,
691 };
692
    /// Lock protecting a room's event cache state, coordinating the
    /// in-memory state with the (cross-process) event cache store lock.
    pub struct RoomEventCacheStateLock {
        /// The actual state, behind an async read-write lock.
        locked_state: RwLock<RoomEventCacheStateLockInner>,

        /// Serializes callers of `read()`, so at most one task at a time goes
        /// through the read-acquisition (and possible dirty-recovery) path.
        read_lock_acquisition: Mutex<()>,
    }
705
    /// State guarded by [`RoomEventCacheStateLock`].
    struct RoomEventCacheStateLockInner {
        /// Whether thread support is enabled for this cache.
        enabled_thread_support: bool,

        /// The room this state belongs to.
        room_id: OwnedRoomId,

        /// Cross-process lock over the event cache store.
        store: EventCacheStoreLock,

        /// The room's (lazily loaded) linked chunk of events.
        room_linked_chunk: EventLinkedChunk,

        /// Per-thread event caches, keyed by thread root event id.
        threads: HashMap<OwnedEventId, ThreadEventCache>,

        /// Observable status of back-paginations for this room.
        pagination_status: SharedObservable<RoomPaginationStatus>,

        /// Sender for fine-grained room updates.
        update_sender: Sender<RoomEventCacheUpdate>,

        /// Sender for coarse-grained "something changed" updates.
        generic_update_sender: Sender<RoomEventCacheGenericUpdate>,

        /// Sender forwarding the raw linked-chunk updates that were applied
        /// to the store (see `send_updates_to_store`).
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

        /// Rules of the room's version.
        // NOTE(review): presumably used when applying redactions (cf. the
        // `apply_redaction` import) — confirm in the rest of this module.
        room_version_rules: RoomVersionRules,

        /// Whether the initial previous-batch token has been waited for;
        /// reset to `false` by `reset_internal`.
        waited_for_initial_prev_token: Arc<AtomicBool>,

        /// Number of live subscribers for this room.
        subscriber_count: Arc<AtomicUsize>,
    }
756
757 impl RoomEventCacheStateLock {
        /// Creates a new state lock for a room's event cache.
        ///
        /// Acquires the store lock, then loads the room's linked chunk
        /// metadata and its last chunk from the store. Any load error is
        /// recovered from by clearing the stored linked chunk and starting
        /// from scratch.
        #[allow(clippy::too_many_arguments)]
        pub async fn new(
            room_id: OwnedRoomId,
            room_version_rules: RoomVersionRules,
            enabled_thread_support: bool,
            update_sender: Sender<RoomEventCacheUpdate>,
            generic_update_sender: Sender<RoomEventCacheGenericUpdate>,
            linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
            store: EventCacheStoreLock,
            pagination_status: SharedObservable<RoomPaginationStatus>,
        ) -> Result<Self, EventCacheError> {
            let store_guard = match store.lock().await? {
                EventCacheStoreLockState::Clean(guard) => guard,

                EventCacheStoreLockState::Dirty(guard) => {
                    // We're initializing from the store anyway, so the dirty
                    // flag can simply be reset here.
                    EventCacheStoreLockGuard::clear_dirty(&guard);

                    guard
                }
            };

            let linked_chunk_id = LinkedChunkId::Room(&room_id);

            // Load the metadata of all chunks; on failure, clear the stored
            // linked chunk and continue without metadata.
            let full_linked_chunk_metadata =
                match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
                    Ok(metas) => metas,
                    Err(err) => {
                        error!(
                            "error when loading a linked chunk's metadata from the store: {err}"
                        );

                        store_guard
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        None
                    }
                };

            // Lazily load only the *last* chunk of events; same clear-and-
            // restart recovery on failure as above.
            let linked_chunk = match store_guard
                .load_last_chunk(linked_chunk_id)
                .await
                .map_err(EventCacheError::from)
                .and_then(|(last_chunk, chunk_identifier_generator)| {
                    lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                        .map_err(EventCacheError::from)
                }) {
                Ok(linked_chunk) => linked_chunk,
                Err(err) => {
                    error!(
                        "error when loading a linked chunk's latest chunk from the store: {err}"
                    );

                    store_guard
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    None
                }
            };

            let waited_for_initial_prev_token = Arc::new(AtomicBool::new(false));

            Ok(Self {
                locked_state: RwLock::new(RoomEventCacheStateLockInner {
                    enabled_thread_support,
                    room_id,
                    store,
                    room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
                        linked_chunk,
                        full_linked_chunk_metadata,
                    ),
                    // No threads loaded yet.
                    threads: HashMap::new(),
                    pagination_status,
                    update_sender,
                    generic_update_sender,
                    linked_chunk_update_sender,
                    room_version_rules,
                    waited_for_initial_prev_token,
                    subscriber_count: Default::default(),
                }),
                read_lock_acquisition: Mutex::new(()),
            })
        }
866
        /// Acquires a read guard over the state.
        ///
        /// If the store lock is dirty (another holder touched the store since
        /// we last held it), the in-memory state is first reloaded under a
        /// write lock (shrunk to the store's last chunk), the resulting
        /// updates are broadcast, and the write guard is downgraded.
        pub async fn read(&self) -> Result<RoomEventCacheStateLockReadGuard<'_>, EventCacheError> {
            // Serialize read acquisitions, so that only one task at a time
            // can run through the dirty-recovery path below.
            let _one_reader_guard = self.read_lock_acquisition.lock().await;

            let state_guard = self.locked_state.read().await;

            match state_guard.store.lock().await? {
                EventCacheStoreLockState::Clean(store_guard) => {
                    Ok(RoomEventCacheStateLockReadGuard { state: state_guard, store: store_guard })
                }
                EventCacheStoreLockState::Dirty(store_guard) => {
                    // Recovery needs mutable access: release the read guard
                    // and re-acquire as a writer. (The acquisition mutex is
                    // still held, keeping other readers out meanwhile.)
                    drop(state_guard);
                    let state_guard = self.locked_state.write().await;

                    let mut guard = RoomEventCacheStateLockWriteGuard {
                        state: state_guard,
                        store: store_guard,
                    };

                    // Reload the in-memory state from the store.
                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;

                    // We're in sync with the store again.
                    EventCacheStoreLockGuard::clear_dirty(&guard.store);

                    let guard = guard.downgrade();

                    // Let observers know the in-memory view changed; send
                    // errors (no receivers) are ignored.
                    if !updates_as_vector_diffs.is_empty() {
                        let _ = guard.state.update_sender.send(
                            RoomEventCacheUpdate::UpdateTimelineEvents {
                                diffs: updates_as_vector_diffs,
                                origin: EventsOrigin::Cache,
                            },
                        );

                        let _ =
                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
                                room_id: guard.state.room_id.clone(),
                            });
                    }

                    Ok(guard)
                }
            }
        }
974
        /// Acquires a write guard over the state.
        ///
        /// Performs the same dirty-store recovery as [`Self::read`]: if the
        /// store lock comes back dirty, the in-memory state is shrunk to the
        /// store's last chunk and the resulting updates are broadcast before
        /// the guard is returned.
        pub async fn write(
            &self,
        ) -> Result<RoomEventCacheStateLockWriteGuard<'_>, EventCacheError> {
            let state_guard = self.locked_state.write().await;

            match state_guard.store.lock().await? {
                EventCacheStoreLockState::Clean(store_guard) => {
                    Ok(RoomEventCacheStateLockWriteGuard { state: state_guard, store: store_guard })
                }
                EventCacheStoreLockState::Dirty(store_guard) => {
                    let mut guard = RoomEventCacheStateLockWriteGuard {
                        state: state_guard,
                        store: store_guard,
                    };

                    // Reload the in-memory state from the store.
                    let updates_as_vector_diffs = guard.force_shrink_to_last_chunk().await?;

                    // We're in sync with the store again.
                    EventCacheStoreLockGuard::clear_dirty(&guard.store);

                    // Broadcast what changed; send errors are ignored.
                    if !updates_as_vector_diffs.is_empty() {
                        let _ = guard.state.update_sender.send(
                            RoomEventCacheUpdate::UpdateTimelineEvents {
                                diffs: updates_as_vector_diffs,
                                origin: EventsOrigin::Cache,
                            },
                        );

                        let _ =
                            guard.state.generic_update_sender.send(RoomEventCacheGenericUpdate {
                                room_id: guard.state.room_id.clone(),
                            });
                    }

                    Ok(guard)
                }
            }
        }
1027 }
1028
    /// Read guard over a room's event cache state, holding both the state
    /// read lock and the store lock guard.
    pub struct RoomEventCacheStateLockReadGuard<'a> {
        /// Shared (read) access to the in-memory state.
        state: RwLockReadGuard<'a, RoomEventCacheStateLockInner>,

        /// Guard over the event cache store lock.
        store: EventCacheStoreLockGuard,
    }
1038
    /// Write guard over a room's event cache state, holding both the state
    /// write lock and the store lock guard.
    pub struct RoomEventCacheStateLockWriteGuard<'a> {
        /// Exclusive (write) access to the in-memory state.
        state: RwLockWriteGuard<'a, RoomEventCacheStateLockInner>,

        /// Guard over the event cache store lock.
        store: EventCacheStoreLockGuard,
    }
1048
1049 impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1050 fn downgrade(self) -> RoomEventCacheStateLockReadGuard<'a> {
1058 RoomEventCacheStateLockReadGuard { state: self.state.downgrade(), store: self.store }
1059 }
1060 }
1061
    impl<'a> RoomEventCacheStateLockReadGuard<'a> {
        /// Returns the room's in-memory linked chunk of events.
        pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
            &self.state.room_linked_chunk
        }

        /// Returns the shared subscriber counter for this room.
        pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
            &self.state.subscriber_count
        }

        /// Looks up an event by its id, delegating to the module-level
        /// `find_event` helper.
        pub async fn find_event(
            &self,
            event_id: &EventId,
        ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
            find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
                .await
        }

        /// Looks up an event and its related events, optionally restricted by
        /// relation type, delegating to the module-level helper.
        pub async fn find_event_with_relations(
            &self,
            event_id: &EventId,
            filters: Option<Vec<RelationType>>,
        ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
            find_event_with_relations(
                event_id,
                &self.state.room_id,
                filters,
                &self.state.room_linked_chunk,
                &self.store,
            )
            .await
        }

        /// Iterates the in-memory events in reverse order (`revents`),
        /// calling `predicate` with each event and the next item yielded by
        /// the reversed iteration (if any), and returns the first non-`None`
        /// result.
        pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
        where
            P: FnMut(&Event, Option<&Event>) -> Option<O>,
        {
            self.state
                .room_linked_chunk
                .revents()
                .peekable()
                // Pair each event with a peek at the following one, without
                // consuming that following one.
                .batching(|iter| {
                    iter.next().map(|(_position, event)| {
                        (event, iter.peek().map(|(_next_position, next_event)| *next_event))
                    })
                })
                .find_map(|(event, next_event_id)| predicate(event, next_event_id))
        }

        /// Whether the underlying store lock is currently marked dirty.
        #[cfg(test)]
        pub fn is_dirty(&self) -> bool {
            EventCacheStoreLockGuard::is_dirty(&self.store)
        }
    }
1141
1142 impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
1143 #[cfg(any(feature = "e2e-encryption", test))]
1145 pub fn room_linked_chunk(&mut self) -> &mut EventLinkedChunk {
1146 &mut self.state.room_linked_chunk
1147 }
1148
1149 pub fn waited_for_initial_prev_token(&self) -> &Arc<AtomicBool> {
1151 &self.state.waited_for_initial_prev_token
1152 }
1153
1154 pub async fn find_event(
1159 &self,
1160 event_id: &EventId,
1161 ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
1162 find_event(event_id, &self.state.room_id, &self.state.room_linked_chunk, &self.store)
1163 .await
1164 }
1165
1166 pub async fn find_event_with_relations(
1180 &self,
1181 event_id: &EventId,
1182 filters: Option<Vec<RelationType>>,
1183 ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
1184 find_event_with_relations(
1185 event_id,
1186 &self.state.room_id,
1187 filters,
1188 &self.state.room_linked_chunk,
1189 &self.store,
1190 )
1191 .await
1192 }
1193
        /// Loads more events backwards: returns a gap to fill over the
        /// network, the start of the timeline, or a chunk of events reloaded
        /// from the store.
        pub async fn load_more_events_backwards(
            &mut self,
        ) -> Result<LoadMoreEventsBackwardsOutcome, EventCacheError> {
            // If a gap is already present in memory, resolving it comes
            // first.
            if let Some(prev_token) = self.state.room_linked_chunk.rgap().map(|gap| gap.prev_token)
            {
                return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) });
            }

            let prev_first_chunk = self
                .state
                .room_linked_chunk
                .chunks()
                .next()
                .expect("a linked chunk is never empty");

            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
            // Try to lazy-load the chunk preceding the first in-memory one.
            let new_first_chunk = match self
                .store
                .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier())
                .await
            {
                Ok(Some(new_first_chunk)) => {
                    new_first_chunk
                }

                Ok(None) => {
                    // No previous chunk in the store: everything is loaded.
                    if self.state.room_linked_chunk.events().next().is_some() {
                        trace!("chunk is fully loaded and non-empty: reached_start=true");
                        return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
                    }

                    // Fully loaded but empty: request a network pagination
                    // from the end (no token).
                    return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: None });
                }

                Err(err) => {
                    error!("error when loading the previous chunk of a linked chunk: {err}");

                    // The store content is suspect: clear it before bailing.
                    self.store
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    return Err(err.into());
                }
            };

            let chunk_content = new_first_chunk.content.clone();

            // No predecessor means the loaded chunk is the very first one.
            let reached_start = new_first_chunk.previous.is_none();

            if let Err(err) =
                self.state.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk)
            {
                error!("error when inserting the previous chunk into its linked chunk: {err}");

                // Same clear-the-store recovery as above.
                self.store
                    .handle_linked_chunk_updates(
                        LinkedChunkId::Room(&self.state.room_id),
                        vec![Update::Clear],
                    )
                    .await?;

                return Err(err.into());
            }

            // The chunk came *from* the store: discard the pending store
            // updates so its insertion isn't echoed back.
            let _ = self.state.room_linked_chunk.store_updates().take();

            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok(match chunk_content {
                ChunkContent::Gap(gap) => {
                    trace!("reloaded chunk from disk (gap)");
                    LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(gap.prev_token) }
                }

                ChunkContent::Items(events) => {
                    trace!(?reached_start, "reloaded chunk from disk ({} items)", events.len());
                    LoadMoreEventsBackwardsOutcome::Events {
                        events,
                        timeline_event_diffs,
                        reached_start,
                    }
                }
            })
        }
1302
        /// Unloads the in-memory linked chunk, resetting it to the last chunk
        /// stored on disk.
        ///
        /// On a store load error, the stored linked chunk is cleared and the
        /// in-memory one restarts from scratch.
        pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
            let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
            let (last_chunk, chunk_identifier_generator) =
                match self.store.load_last_chunk(linked_chunk_id).await {
                    Ok(pair) => pair,

                    Err(err) => {
                        error!("error when reloading a linked chunk from memory: {err}");

                        // Recovery: clear the store and start afresh.
                        self.store
                            .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                            .await?;

                        (None, ChunkIdentifierGenerator::new_from_scratch())
                    }
                };

            debug!("unloading the linked chunk, and resetting it to its last chunk");

            // Replace the in-memory linked chunk with the loaded last chunk;
            // fall back to a full reset if the replacement fails.
            if let Err(err) =
                self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
            {
                error!("error when replacing the linked chunk: {err}");
                return self.reset_internal().await;
            }

            // Shrinking invalidates any previous pagination progress.
            self.state
                .pagination_status
                .set(RoomPaginationStatus::Idle { hit_timeline_start: false });

            // The new content came from the store; don't write it back there.
            let _ = self.state.room_linked_chunk.store_updates().take();

            Ok(())
        }
1356
1357 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1360 pub async fn auto_shrink_if_no_subscribers(
1361 &mut self,
1362 ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
1363 let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);
1364
1365 trace!(subscriber_count, "received request to auto-shrink");
1366
1367 if subscriber_count == 0 {
1368 self.shrink_to_last_chunk().await?;
1371
1372 Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
1373 } else {
1374 Ok(None)
1375 }
1376 }
1377
1378 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
1380 pub async fn force_shrink_to_last_chunk(
1381 &mut self,
1382 ) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1383 self.shrink_to_last_chunk().await?;
1384
1385 Ok(self.state.room_linked_chunk.updates_as_vector_diffs())
1386 }
1387
1388 #[instrument(skip_all)]
1394 pub async fn remove_events(
1395 &mut self,
1396 in_memory_events: Vec<(OwnedEventId, Position)>,
1397 in_store_events: Vec<(OwnedEventId, Position)>,
1398 ) -> Result<(), EventCacheError> {
1399 if !in_store_events.is_empty() {
1401 let mut positions = in_store_events
1402 .into_iter()
1403 .map(|(_event_id, position)| position)
1404 .collect::<Vec<_>>();
1405
1406 sort_positions_descending(&mut positions);
1407
1408 let updates = positions
1409 .into_iter()
1410 .map(|pos| Update::RemoveItem { at: pos })
1411 .collect::<Vec<_>>();
1412
1413 self.apply_store_only_updates(updates).await?;
1414 }
1415
1416 if in_memory_events.is_empty() {
1418 return Ok(());
1420 }
1421
1422 self.state
1424 .room_linked_chunk
1425 .remove_events_by_position(
1426 in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
1427 )
1428 .expect("failed to remove an event");
1429
1430 self.propagate_changes().await
1431 }
1432
1433 async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
1434 let updates = self.state.room_linked_chunk.store_updates().take();
1435 self.send_updates_to_store(updates).await
1436 }
1437
1438 async fn apply_store_only_updates(
1445 &mut self,
1446 updates: Vec<Update<Event, Gap>>,
1447 ) -> Result<(), EventCacheError> {
1448 self.state.room_linked_chunk.order_tracker.map_updates(&updates);
1449 self.send_updates_to_store(updates).await
1450 }
1451
        /// Applies the given linked-chunk updates to the store, then forwards
        /// them on the linked-chunk update channel.
        async fn send_updates_to_store(
            &mut self,
            mut updates: Vec<Update<Event, Gap>>,
        ) -> Result<(), EventCacheError> {
            if updates.is_empty() {
                return Ok(());
            }

            // Strip bundled relations from the events about to be persisted;
            // only the item-carrying update variants need this.
            for update in updates.iter_mut() {
                match update {
                    Update::PushItems { items, .. } => strip_relations_from_events(items),
                    Update::ReplaceItem { item, .. } => strip_relations_from_event(item),
                    Update::NewItemsChunk { .. }
                    | Update::NewGapChunk { .. }
                    | Update::RemoveChunk(_)
                    | Update::RemoveItem { .. }
                    | Update::DetachLastItems { .. }
                    | Update::StartReattachItems
                    | Update::EndReattachItems
                    | Update::Clear => {}
                }
            }

            let store = self.store.clone();
            let room_id = self.state.room_id.clone();
            let cloned_updates = updates.clone();

            // NOTE(review): the store write runs in a spawned task that is
            // awaited immediately — presumably so the write completes even if
            // this future is cancelled; confirm.
            spawn(async move {
                trace!(updates = ?cloned_updates, "sending linked chunk updates to the store");
                let linked_chunk_id = LinkedChunkId::Room(&room_id);
                store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?;
                trace!("linked chunk updates applied");

                super::Result::Ok(())
            })
            .await
            .expect("joining failed")?;

            // Tell listeners about the updates that were just applied; send
            // errors (no receivers) are ignored.
            let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
                linked_chunk_id: OwnedLinkedChunkId::Room(self.state.room_id.clone()),
                updates,
            });

            Ok(())
        }
1506
1507 pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
1513 self.reset_internal().await?;
1514
1515 let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();
1516
1517 debug_assert_eq!(diff_updates.len(), 1);
1519 debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));
1520
1521 Ok(diff_updates)
1522 }
1523
1524 async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
1525 self.state.room_linked_chunk.reset();
1526
1527 for thread in self.state.threads.values_mut() {
1532 thread.clear();
1533 }
1534
1535 self.propagate_changes().await?;
1536
1537 self.state.waited_for_initial_prev_token.store(false, Ordering::SeqCst);
1541 self.state
1543 .pagination_status
1544 .set(RoomPaginationStatus::Idle { hit_timeline_start: false });
1545
1546 Ok(())
1547 }
1548
        /// Handle the `timeline` portion of a sync response for this room.
        ///
        /// Deduplicates the incoming events against memory and store, manages
        /// the previous-batch token (gap), and appends the new events to the
        /// room's linked chunk.
        ///
        /// Returns whether a new gap was stored, along with the timeline
        /// diffs to broadcast to observers.
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_sync(
            &mut self,
            mut timeline: Timeline,
        ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
            let mut prev_batch = timeline.prev_batch.take();

            let DeduplicationOutcome {
                all_events: events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(&self.state.room_id),
                &self.state.room_linked_chunk,
                timeline.events,
            )
            .await?;

            // Ignore the previous-batch token if the sync isn't limited and we already
            // have events (no hole in the timeline), or if every received event is a
            // duplicate (the gap would point at events we already know about).
            if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
                || all_duplicates
            {
                prev_batch = None;
            }

            if prev_batch.is_some() {
                // A gap means we may have missed thread replies: clear the in-memory
                // thread caches and remember which summaries need a refresh.
                let mut summaries_to_update = Vec::new();

                for (thread_root, thread) in self.state.threads.iter_mut() {
                    thread.clear();

                    summaries_to_update.push(thread_root.clone());
                }

                // Invalidate the `latest_reply` field of each known thread summary,
                // since we can't tell anymore what the latest reply is.
                for thread_root in summaries_to_update {
                    let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                    else {
                        trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
                        continue;
                    };

                    if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
                        prev_summary.latest_reply = None;

                        target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);

                        self.replace_event_at(location, target_event).await?;
                    }
                }
            }

            if all_duplicates {
                // Nothing new: no gap, no diffs.
                return Ok((false, Vec::new()));
            }

            let has_new_gap = prev_batch.is_some();

            // Record that we've seen a previous-batch token at least once, so later
            // paginations don't need to wait for one (NOTE(review): exact semantics of
            // this flag depend on code outside this view).
            if !self.state.waited_for_initial_prev_token.load(Ordering::SeqCst) && has_new_gap {
                self.state.waited_for_initial_prev_token.store(true, Ordering::SeqCst);
            }

            // Remove the duplicated events from their previous positions, before
            // re-adding them at the tail of the timeline below.
            self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                .await?;

            // Push the new gap (if any) followed by the new events.
            self.state
                .room_linked_chunk
                .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events);

            self.post_process_new_events(events, true).await?;

            if timeline.limited && has_new_gap {
                // Keep only the last chunk in memory after a limited sync with a gap,
                // so observers restart from the most recent events.
                self.shrink_to_last_chunk().await?;
            }

            let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok((has_new_gap, timeline_event_diffs))
        }
1669
        /// Handle the result of a network back-pagination for this room.
        ///
        /// `events` are in the response (reverse-topological) order;
        /// `new_token` is the token for the *next* (older) page, and
        /// `prev_token` is the token that was used for this request.
        ///
        /// Returns `None` if the gap matching `prev_token` disappeared in the
        /// meantime (e.g. the timeline was reset), in which case the caller
        /// should restart the pagination; otherwise returns the pagination
        /// outcome and the timeline diffs to broadcast.
        #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
        pub async fn handle_backpagination(
            &mut self,
            events: Vec<Event>,
            mut new_token: Option<String>,
            prev_token: Option<String>,
        ) -> Result<Option<(BackPaginationOutcome, Vec<VectorDiff<Event>>)>, EventCacheError>
        {
            // Locate the gap chunk the pagination token came from, if any.
            let prev_gap_id = if let Some(token) = prev_token {
                let gap_chunk_id = self.state.room_linked_chunk.chunk_identifier(|chunk| {
                    matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token)
                });

                if gap_chunk_id.is_none() {
                    // The gap we paginated from is gone: abort and let the caller retry.
                    return Ok(None);
                }

                gap_chunk_id
            } else {
                None
            };

            let DeduplicationOutcome {
                all_events: mut events,
                in_memory_duplicated_event_ids,
                in_store_duplicated_event_ids,
                non_empty_all_duplicates: all_duplicates,
            } = filter_duplicate_events(
                &self.store,
                LinkedChunkId::Room(&self.state.room_id),
                &self.state.room_linked_chunk,
                events,
            )
            .await?;

            if !all_duplicates {
                // Remove the duplicates from their old positions; they'll be re-inserted
                // at their new position below.
                self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids)
                    .await?;
            } else {
                // Only duplicates: don't insert anything, and drop the next-page token
                // since we've already seen everything this gap could yield.
                events.clear();
                new_token = None;
            }

            // The response is in reverse-topological order; flip it back to
            // topological (oldest first) order for insertion.
            let topo_ordered_events = events.iter().rev().cloned().collect::<Vec<_>>();

            let new_gap = new_token.map(|prev_token| Gap { prev_token });
            let reached_start = self.state.room_linked_chunk.finish_back_pagination(
                prev_gap_id,
                new_gap,
                &topo_ordered_events,
            );

            self.post_process_new_events(topo_ordered_events, false).await?;

            let event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

            Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs)))
        }
1763
1764 pub fn subscribe_to_thread(
1767 &mut self,
1768 root: OwnedEventId,
1769 ) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
1770 self.get_or_reload_thread(root).subscribe()
1771 }
1772
1773 pub fn finish_thread_network_pagination(
1777 &mut self,
1778 root: OwnedEventId,
1779 prev_token: Option<String>,
1780 new_token: Option<String>,
1781 events: Vec<Event>,
1782 ) -> Option<BackPaginationOutcome> {
1783 self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events)
1784 }
1785
1786 pub fn load_more_thread_events_backwards(
1787 &mut self,
1788 root: OwnedEventId,
1789 ) -> LoadMoreEventsBackwardsOutcome {
1790 self.get_or_reload_thread(root).load_more_events_backwards()
1791 }
1792
        /// Post-process a batch of events that was just added to the room's
        /// linked chunk: persist the changes, apply redactions, and dispatch
        /// events to their threads.
        ///
        /// `is_sync` must be true when the events come from a sync response
        /// (live events), false for back-paginated events.
        pub(in super::super) async fn post_process_new_events(
            &mut self,
            events: Vec<Event>,
            is_sync: bool,
        ) -> Result<(), EventCacheError> {
            // Persist the linked-chunk changes before anything else.
            self.propagate_changes().await?;

            // Events to forward to the thread caches, grouped by thread root.
            let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();

            for event in events {
                self.maybe_apply_new_redaction(&event).await?;

                if self.state.enabled_thread_support {
                    // Only live (sync) events feed the thread caches here;
                    // back-paginated thread events go through thread pagination.
                    if is_sync {
                        if let Some(thread_root) = extract_thread_root(event.raw()) {
                            // The event is a reply in a thread.
                            new_events_by_thread
                                .entry(thread_root)
                                .or_default()
                                .push(event.clone());
                        } else if let Some(event_id) = event.event_id() {
                            if self.state.threads.contains_key(&event_id) {
                                // The event is itself the root of a known thread.
                                new_events_by_thread
                                    .entry(event_id)
                                    .or_default()
                                    .push(event.clone());
                            }
                        }
                    }

                    // An edit may change the latest event of a thread: make sure the
                    // edited event's thread gets its summary refreshed, even with no
                    // new thread events (hence the empty `or_default` entry).
                    if let Some(edit_target) = extract_edit_target(event.raw()) {
                        if let Some((_location, edit_target_event)) =
                            self.find_event(&edit_target).await?
                            && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
                        {
                            new_events_by_thread.entry(thread_root).or_default();
                        }
                    }
                }

                // Save any bundled latest thread event, so it can be found later.
                if let Some(bundled_thread) = event.bundled_latest_thread_event {
                    self.save_events([*bundled_thread]).await?;
                }
            }

            if self.state.enabled_thread_support {
                self.update_threads(new_events_by_thread).await?;
            }

            Ok(())
        }
1862
1863 fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache {
1864 let room_id = self.state.room_id.clone();
1867 let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();
1868
1869 self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
1870 ThreadEventCache::new(room_id, root_event_id, linked_chunk_update_sender)
1871 })
1872 }
1873
        /// Feed the given per-thread batches of new events into their thread
        /// caches, and refresh each affected thread's summary.
        #[instrument(skip_all)]
        async fn update_threads(
            &mut self,
            new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
        ) -> Result<(), EventCacheError> {
            for (thread_root, new_events) in new_events_by_thread {
                let thread_cache = self.get_or_reload_thread(thread_root.clone());

                thread_cache.add_live_events(new_events);

                let mut latest_event_id = thread_cache.latest_event_id();

                // If the latest thread event has been edited, the summary should point
                // at the latest *valid* edit instead of the original.
                if let Some(event_id) = latest_event_id.as_ref()
                    && let Some((original_event, edits)) = self
                        .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
                        .await?
                {
                    // `rfind` scans from the end, i.e. picks the most recent edit (in
                    // the relations ordering) that passes the validity checks.
                    let latest_valid_edit = edits.into_iter().rfind(|edit| {
                        let original_json = original_event.raw();
                        let original_encryption_info = original_event.encryption_info();
                        let replacement_json = edit.raw();
                        let replacement_encryption_info = edit.encryption_info();

                        check_validity_of_replacement_events(
                            original_json,
                            original_encryption_info.map(|v| &**v),
                            replacement_json,
                            replacement_encryption_info.map(|v| &**v),
                        )
                        .is_ok()
                    });

                    if let Some(latest_valid_edit) = latest_valid_edit {
                        latest_event_id = latest_valid_edit.event_id();
                    }
                }

                self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
            }

            Ok(())
        }
1918
        /// Recompute the thread summary attached to the `thread_root` event,
        /// and persist it if it changed.
        ///
        /// `latest_event_id` is the id of the event to advertise as the
        /// latest reply in the thread, if any.
        async fn maybe_update_thread_summary(
            &mut self,
            thread_root: OwnedEventId,
            latest_event_id: Option<OwnedEventId>,
        ) -> Result<(), EventCacheError> {
            let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
                trace!(%thread_root, "thread root event is missing from the room linked chunk");
                return Ok(());
            };

            let prev_summary = target_event.thread_summary.summary();

            // Count the thread replies known to the store. Saturate at `u32::MAX`
            // rather than failing on (unrealistically) huge threads.
            let num_replies = {
                let thread_replies = self
                    .store
                    .find_event_relations(
                        &self.state.room_id,
                        &thread_root,
                        Some(&[RelationType::Thread]),
                    )
                    .await?;
                thread_replies.len().try_into().unwrap_or(u32::MAX)
            };

            // No replies means no summary at all.
            let new_summary = if num_replies > 0 {
                Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
            } else {
                None
            };

            if prev_summary == new_summary.as_ref() {
                trace!(%thread_root, "thread summary is already up-to-date");
                return Ok(());
            }

            trace!(%thread_root, "updating thread summary: {new_summary:?}");
            target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
            self.replace_event_at(location, target_event).await
        }
1971
        /// Replace an event with a new version of itself, wherever it lives.
        ///
        /// If the event is in the in-memory linked chunk, it's replaced in
        /// place and the change is persisted; if it only lives in the store,
        /// the new version is saved over it.
        pub(crate) async fn replace_event_at(
            &mut self,
            location: EventLocation,
            event: Event,
        ) -> Result<(), EventCacheError> {
            match location {
                EventLocation::Memory(position) => {
                    self.state
                        .room_linked_chunk
                        .replace_event_at(position, event)
                        // The position came from a lookup on the same linked chunk, so
                        // it must still be valid here.
                        .expect("should have been a valid position of an item");
                    self.propagate_changes().await?;
                }
                EventLocation::Store => {
                    self.save_events([event]).await?;
                }
            }

            Ok(())
        }
2000
        /// If `event` is a room redaction, apply it to its target event (when
        /// known), and update the target's thread bookkeeping accordingly.
        ///
        /// Best-effort: unknown targets, undecodable events, and already
        /// redacted targets are silently skipped.
        #[instrument(skip_all)]
        async fn maybe_apply_new_redaction(
            &mut self,
            event: &Event,
        ) -> Result<(), EventCacheError> {
            let raw_event = event.raw();

            // Cheap pre-filter on the `type` field, to avoid a full deserialization of
            // non-redaction events.
            let Ok(Some(MessageLikeEventType::RoomRedaction)) =
                raw_event.get_field::<MessageLikeEventType>("type")
            else {
                return Ok(());
            };

            let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
                redaction,
            ))) = raw_event.deserialize()
            else {
                return Ok(());
            };

            let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
                warn!("missing target event id from the redaction event");
                return Ok(());
            };

            let Some((location, mut target_event)) = self.find_event(event_id).await? else {
                trace!("redacted event is missing from the linked chunk");
                return Ok(());
            };

            // Find the thread root of the target (needed for the thread cleanup
            // below), and bail out early if the target is already redacted.
            let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
                match deserialized {
                    AnySyncTimelineEvent::MessageLike(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                    AnySyncTimelineEvent::State(ev) => {
                        if ev.is_redacted() {
                            return Ok(());
                        }
                    }
                }

                extract_thread_root(target_event.raw())
            } else {
                warn!("failed to deserialize the event to redact");
                None
            };

            if let Some(redacted_event) = apply_redaction(
                target_event.raw(),
                event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
                &self.state.room_version_rules.redaction,
            ) {
                target_event.replace_raw(redacted_event.cast_unchecked());

                self.replace_event_at(location, target_event).await?;

                // The redacted event no longer belongs to its thread: drop it from the
                // thread cache and refresh the thread summary.
                if let Some(thread_root) = thread_root
                    && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
                {
                    thread_cache.remove_if_present(event_id);

                    let latest_event_id = thread_cache.latest_event_id();
                    self.maybe_update_thread_summary(thread_root, latest_event_id).await?;
                }
            }

            Ok(())
        }
2098
2099 pub async fn save_events(
2101 &mut self,
2102 events: impl IntoIterator<Item = Event>,
2103 ) -> Result<(), EventCacheError> {
2104 let store = self.store.clone();
2105 let room_id = self.state.room_id.clone();
2106 let events = events.into_iter().collect::<Vec<_>>();
2107
2108 spawn(async move {
2110 for event in events {
2111 store.save_event(&room_id, event).await?;
2112 }
2113 super::Result::Ok(())
2114 })
2115 .await
2116 .expect("joining failed")?;
2117
2118 Ok(())
2119 }
2120
        /// Test-only helper exposing the dirtiness of the underlying store
        /// lock guard. (UFCS call to disambiguate from any inherent `is_dirty`
        /// reachable through `Deref` — NOTE(review): confirm the exact
        /// semantics against `EventCacheStoreLockGuard`.)
        #[cfg(test)]
        pub fn is_dirty(&self) -> bool {
            EventCacheStoreLockGuard::is_dirty(&self.store)
        }
2125 }
2126
    /// Load the metadata of all chunks of the given linked chunk from the
    /// store, validate their linkage, and return them sorted from first to
    /// last chunk.
    ///
    /// Returns `Ok(None)` when the store has no chunks for this id, and
    /// `EventCacheError::InvalidLinkedChunkMetadata` when the stored chunks
    /// don't form a single well-linked chain (no last chunk, several last
    /// chunks, cycles, broken prev/next links, or disconnected components).
    async fn load_linked_chunk_metadata(
        store_guard: &EventCacheStoreLockGuard,
        linked_chunk_id: LinkedChunkId<'_>,
    ) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
        let mut all_chunks = store_guard
            .load_all_chunks_metadata(linked_chunk_id)
            .await
            .map_err(EventCacheError::from)?;

        if all_chunks.is_empty() {
            // There's no linked chunk at all for this id.
            return Ok(None);
        }

        // Index the chunks by identifier for O(1) lookups while walking the chain.
        let chunk_map: HashMap<_, _> =
            all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();

        // There must be exactly one chunk without a successor: the last chunk.
        let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
        let Some(last) = iter.next() else {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: "no last chunk found".to_owned(),
            });
        };

        if let Some(other_last) = iter.next() {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: format!(
                    "chunks {} and {} both claim to be last chunks",
                    last.identifier.index(),
                    other_last.identifier.index()
                ),
            });
        }

        // Walk the chain backwards from the last chunk, checking that every
        // `previous` link has a matching `next` link, that there are no cycles, and
        // that the chain covers every chunk (single connected component).
        let mut seen = HashSet::new();
        let mut current = last;
        loop {
            if !seen.insert(current.identifier) {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "cycle detected in linked chunk at {}",
                        current.identifier.index()
                    ),
                });
            }

            let Some(prev_id) = current.previous else {
                // Reached the first chunk: the walk must have visited every chunk.
                if seen.len() != all_chunks.len() {
                    return Err(EventCacheError::InvalidLinkedChunkMetadata {
                        details: format!(
                            "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
                            seen.len(),
                            all_chunks.len()
                        ),
                    });
                }
                break;
            };

            let Some(pred_meta) = chunk_map.get(&prev_id) else {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "missing predecessor {} chunk for {}",
                        prev_id.index(),
                        current.identifier.index()
                    ),
                });
            };

            // Forward and backward links must agree.
            if pred_meta.next != Some(current.identifier) {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
                        pred_meta.identifier.index(),
                        pred_meta.next.map(|chunk_id| chunk_id.index()),
                        current.identifier.index()
                    ),
                });
            }

            current = *pred_meta;
        }

        // `current` is now the first chunk. Reorder `all_chunks` in place by
        // following the `next` pointers: at step `i`, locate the chunk that should
        // sit at position `i` (it's always at `j >= i`) and swap it in.
        let mut current = current.identifier;
        for i in 0..all_chunks.len() {
            let j = all_chunks
                .iter()
                .rev()
                .position(|meta| meta.identifier == current)
                .map(|j| all_chunks.len() - 1 - j)
                .expect("the target chunk must be present in the metadata");
            if i != j {
                all_chunks.swap(i, j);
            }
            if let Some(next) = all_chunks[i].next {
                current = next;
            }
        }

        Ok(Some(all_chunks))
    }
2251
2252 fn strip_relations_if_present<T>(event: &mut Raw<T>) {
2256 let mut closure = || -> Option<()> {
2260 let mut val: serde_json::Value = event.deserialize_as().ok()?;
2261 let unsigned = val.get_mut("unsigned")?;
2262 let unsigned_obj = unsigned.as_object_mut()?;
2263 if unsigned_obj.remove("m.relations").is_some() {
2264 *event = Raw::new(&val).ok()?.cast_unchecked();
2265 }
2266 None
2267 };
2268 let _ = closure();
2269 }
2270
2271 fn strip_relations_from_event(ev: &mut Event) {
2272 match &mut ev.kind {
2273 TimelineEventKind::Decrypted(decrypted) => {
2274 decrypted.unsigned_encryption_info = None;
2277
2278 strip_relations_if_present(&mut decrypted.event);
2280 }
2281
2282 TimelineEventKind::UnableToDecrypt { event, .. }
2283 | TimelineEventKind::PlainText { event } => {
2284 strip_relations_if_present(event);
2285 }
2286 }
2287 }
2288
2289 fn strip_relations_from_events(items: &mut [Event]) {
2291 for ev in items.iter_mut() {
2292 strip_relations_from_event(ev);
2293 }
2294 }
2295
    /// Find a single event in this room, looking at the in-memory linked
    /// chunk first (most recent events first), then falling back to the
    /// store.
    ///
    /// Returns the event along with where it was found, or `None` if it is
    /// unknown in both places.
    async fn find_event(
        event_id: &EventId,
        room_id: &RoomId,
        room_linked_chunk: &EventLinkedChunk,
        store: &EventCacheStoreLockGuard,
    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
        // Walk the in-memory events in reverse order: recent events are the more
        // likely lookup targets.
        for (position, event) in room_linked_chunk.revents() {
            if event.event_id().as_deref() == Some(event_id) {
                return Ok(Some((EventLocation::Memory(position), event.clone())));
            }
        }

        // Not in memory: ask the store.
        Ok(store.find_event(room_id, event_id).await?.map(|event| (EventLocation::Store, event)))
    }
2314
    /// Find an event in the store along with the transitive closure of its
    /// related events (relations of relations, etc.), optionally restricted
    /// to the given relation types.
    ///
    /// The related events are returned sorted by their position in the room's
    /// linked chunk, with events of unknown position sorted first.
    async fn find_event_with_relations(
        event_id: &EventId,
        room_id: &RoomId,
        filters: Option<Vec<RelationType>>,
        room_linked_chunk: &EventLinkedChunk,
        store: &EventCacheStoreLockGuard,
    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
        let found = store.find_event(room_id, event_id).await?;

        let Some(target) = found else {
            // Unknown target event: nothing to return.
            return Ok(None);
        };

        // Seed with the direct relations of the target event; the stack holds the
        // event ids whose own relations still have to be fetched.
        let mut related = store.find_event_relations(room_id, event_id, filters.as_deref()).await?;
        let mut stack =
            related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();

        // Guard against relation cycles: never visit the same event id twice.
        let mut already_seen = HashSet::new();
        already_seen.insert(event_id.to_owned());

        let mut num_iters = 1;

        // Depth-first traversal of the relation graph.
        while let Some(event_id) = stack.pop() {
            if !already_seen.insert(event_id.clone()) {
                continue;
            }

            let other_related =
                store.find_event_relations(room_id, &event_id, filters.as_deref()).await?;

            stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
            related.extend(other_related);

            num_iters += 1;
        }

        trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");

        // Sort by linked-chunk order. An absent position (either from the store or
        // unresolvable in the linked chunk) sorts before any known position.
        related.sort_by(|(_, lhs), (_, rhs)| {
            use std::cmp::Ordering;

            match (lhs, rhs) {
                (None, None) => Ordering::Equal,
                (None, Some(_)) => Ordering::Less,
                (Some(_), None) => Ordering::Greater,
                (Some(lhs), Some(rhs)) => {
                    let lhs = room_linked_chunk.event_order(*lhs);
                    let rhs = room_linked_chunk.event_order(*rhs);

                    match (lhs, rhs) {
                        (None, None) => Ordering::Equal,
                        (None, Some(_)) => Ordering::Less,
                        (Some(_), None) => Ordering::Greater,
                        (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
                    }
                }
            }
        });

        // Drop the positions: callers only care about the events.
        let related = related.into_iter().map(|(event, _pos)| event).collect();

        Ok(Some((target, related)))
    }
2396}
2397
/// Where an event was found when looking it up in the event cache.
pub(super) enum EventLocation {
    /// The event lives in the in-memory linked chunk, at the given position.
    Memory(Position),

    /// The event was only found in the store.
    Store,
}
2406
2407pub(super) use private::RoomEventCacheStateLock;
2408
#[cfg(test)]
mod tests {
    use matrix_sdk_base::event_cache::Event;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        RoomId, event_id,
        events::{relation::RelationType, room::message::RoomMessageEventContentWithoutRelation},
        room_id, user_id,
    };

    use crate::test_utils::logged_in_client;

    /// An edit relation on a saved event is found by
    /// `find_event_with_relations`.
    #[async_test]
    async fn test_find_event_by_id_with_edit_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("* An edited event")
                .edit(
                    original_id,
                    RoomMessageEventContentWithoutRelation::text_plain("And edited event"),
                )
                .event_id(related_id)
                .into(),
            f,
        )
        .await;
    }

    /// A thread reply relation on a saved event is found by
    /// `find_event_with_relations`.
    #[async_test]
    async fn test_find_event_by_id_with_thread_reply_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.text_msg("A reply").in_thread(original_id, related_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    /// A reaction relation on a saved event is found by
    /// `find_event_with_relations`.
    #[async_test]
    async fn test_find_event_by_id_with_reaction_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.text_msg("Original event").event_id(original_id).into(),
            f.reaction(original_id, ":D").event_id(related_id).into(),
            f,
        )
        .await;
    }

    /// A poll response relation on a saved event is found by
    /// `find_event_with_relations`.
    #[async_test]
    async fn test_find_event_by_id_with_poll_response_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_response(vec!["1"], original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    /// A poll end relation on a saved event is found by
    /// `find_event_with_relations`.
    #[async_test]
    async fn test_find_event_by_id_with_poll_end_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        assert_relations(
            room_id,
            f.poll_start("Poll start event", "A poll question", vec!["An answer"])
                .event_id(original_id)
                .into(),
            f.poll_end("Poll ended", original_id).event_id(related_id).into(),
            f,
        )
        .await;
    }

    /// The relation-type filter of `find_event_with_relations` only returns
    /// relations of the requested type.
    #[async_test]
    async fn test_find_event_by_id_with_filtered_relationships() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "🤡").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the event related to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        // Filtering by replacements should only return the edit, not the reaction on
        // the edit.
        let filter = Some(vec![RelationType::Replacement]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        assert_eq!(related_events.len(), 1);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);

        // Filtering by a relation type that doesn't exist should return nothing.
        let filter = Some(vec![RelationType::Thread]);
        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, filter)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");

        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);
        assert!(related_events.is_empty());
    }

    /// Without a filter, `find_event_with_relations` returns the transitive
    /// closure of relations (relations of relations).
    #[async_test]
    async fn test_find_event_by_id_with_recursive_relation() {
        let original_id = event_id!("$original");
        let related_id = event_id!("$related");
        let associated_related_id = event_id!("$recursive_related");
        let room_id = room_id!("!galette:saucisse.bzh");
        let event_factory = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let original_event = event_factory.text_msg("Original event").event_id(original_id).into();
        let related_event = event_factory
            .text_msg("* Edited event")
            .edit(original_id, RoomMessageEventContentWithoutRelation::text_plain("Edited event"))
            .event_id(related_id)
            .into();
        let associated_related_event =
            event_factory.reaction(related_id, "👍").event_id(associated_related_id).into();

        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        room_event_cache.save_events([original_event]).await;

        // Save the related event.
        room_event_cache.save_events([related_event]).await;

        // Save the event related to the related event.
        room_event_cache.save_events([associated_related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(original_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_id);

        // Both the edit and the reaction-on-the-edit must be returned.
        assert_eq!(related_events.len(), 2);

        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
        let related_event_id = related_events[1].event_id().unwrap();
        assert_eq!(related_event_id, associated_related_id);
    }

    /// Shared helper: save an original event, an unrelated event, and a
    /// related event, then check that `find_event_with_relations` returns
    /// only the related one.
    async fn assert_relations(
        room_id: &RoomId,
        original_event: Event,
        related_event: Event,
        event_factory: EventFactory,
    ) {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Save the original event.
        let original_event_id = original_event.event_id().unwrap();
        room_event_cache.save_events([original_event]).await;

        // Save an unrelated event to check it's not in the related events list.
        let unrelated_id = event_id!("$2");
        room_event_cache
            .save_events([event_factory
                .text_msg("An unrelated event")
                .event_id(unrelated_id)
                .into()])
            .await;

        // Save the related event.
        let related_id = related_event.event_id().unwrap();
        room_event_cache.save_events([related_event]).await;

        let (event, related_events) = room_event_cache
            .find_event_with_relations(&original_event_id, None)
            .await
            .expect("Failed to find the event with relations")
            .expect("Event has no relation");
        let cached_event_id = event.event_id().unwrap();
        assert_eq!(cached_event_id, original_event_id);

        // The unrelated event must not be returned; only the related one.
        let related_event_id = related_events[0].event_id().unwrap();
        assert_eq!(related_event_id, related_id);
    }
}
2679
2680#[cfg(all(test, not(target_family = "wasm")))] mod timed_tests {
2682 use std::{ops::Not, sync::Arc};
2683
2684 use assert_matches::assert_matches;
2685 use assert_matches2::assert_let;
2686 use eyeball_im::VectorDiff;
2687 use futures_util::FutureExt;
2688 use matrix_sdk_base::{
2689 event_cache::{
2690 Gap,
2691 store::{EventCacheStore as _, MemoryStore},
2692 },
2693 linked_chunk::{
2694 ChunkContent, ChunkIdentifier, LinkedChunkId, Position, Update,
2695 lazy_loader::from_all_chunks,
2696 },
2697 store::StoreConfig,
2698 sync::{JoinedRoomUpdate, Timeline},
2699 };
2700 use matrix_sdk_test::{ALICE, BOB, async_test, event_factory::EventFactory};
2701 use ruma::{
2702 EventId, OwnedUserId, event_id,
2703 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
2704 room_id, user_id,
2705 };
2706 use tokio::task::yield_now;
2707
2708 use super::RoomEventCacheGenericUpdate;
2709 use crate::{
2710 assert_let_timeout,
2711 event_cache::{RoomEventCache, RoomEventCacheUpdate, room::LoadMoreEventsBackwardsOutcome},
2712 test_utils::client::MockClientBuilder,
2713 };
2714
2715 #[async_test]
2716 async fn test_write_to_storage() {
2717 let room_id = room_id!("!galette:saucisse.bzh");
2718 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
2719
2720 let event_cache_store = Arc::new(MemoryStore::new());
2721
2722 let client = MockClientBuilder::new(None)
2723 .on_builder(|builder| {
2724 builder.store_config(
2725 StoreConfig::new("hodlor".to_owned())
2726 .event_cache_store(event_cache_store.clone()),
2727 )
2728 })
2729 .build()
2730 .await;
2731
2732 let event_cache = client.event_cache();
2733
2734 event_cache.subscribe().unwrap();
2736
2737 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
2738 let room = client.get_room(room_id).unwrap();
2739
2740 let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
2741 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
2742
2743 let timeline = Timeline {
2745 limited: true,
2746 prev_batch: Some("raclette".to_owned()),
2747 events: vec![f.text_msg("hey yo").sender(*ALICE).into_event()],
2748 };
2749
2750 room_event_cache
2751 .inner
2752 .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
2753 .await
2754 .unwrap();
2755
2756 assert_matches!(
2758 generic_stream.recv().await,
2759 Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
2760 assert_eq!(expected_room_id, room_id);
2761 }
2762 );
2763
2764 let linked_chunk = from_all_chunks::<3, _, _>(
2766 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
2767 )
2768 .unwrap()
2769 .unwrap();
2770
2771 assert_eq!(linked_chunk.chunks().count(), 2);
2772
2773 let mut chunks = linked_chunk.chunks();
2774
2775 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => {
2777 assert_eq!(gap.prev_token, "raclette");
2778 });
2779
2780 assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
2782 assert_eq!(events.len(), 1);
2783 let deserialized = events[0].raw().deserialize().unwrap();
2784 assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = deserialized);
2785 assert_eq!(msg.as_original().unwrap().content.body(), "hey yo");
2786 });
2787
2788 assert!(chunks.next().is_none());
2790 }
2791
    // When a sync event carries bundled relations (here, a bundled edit), the
    // copy written to the event cache store must have those bundled relations
    // stripped, while the in-memory copy keeps them.
    #[async_test]
    async fn test_write_to_storage_strips_bundled_relations() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        // Keep a handle on the in-memory store so we can inspect what got persisted.
        let event_cache_store = Arc::new(MemoryStore::new());

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // The event cache must be subscribed to before it can be used.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        // Subscribe to generic updates *before* feeding the sync, so the
        // notification isn't missed.
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Build a message carrying a bundled edit, and propagate it via a sync.
        let ev = f
            .text_msg("hey yo")
            .sender(*ALICE)
            .with_bundled_edit(f.text_msg("Hello, Kind Sir").sender(*ALICE))
            .into_event();

        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev] };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // A generic update is emitted for the room that received the event.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );

        {
            // The in-memory copy still carries the bundled edit relation.
            let events = room_event_cache.events().await.unwrap();

            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(
                AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev
            );

            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_some());
        }

        // Now inspect what actually landed in the store.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.chunks().count(), 1);

        let mut chunks = linked_chunk.chunks();
        assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => {
            assert_eq!(events.len(), 1);

            let ev = events[0].raw().deserialize().unwrap();
            assert_let!(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(msg)) = ev);

            // Same body, but the bundled edit has been stripped in storage.
            let original = msg.as_original().unwrap();
            assert_eq!(original.content.body(), "hey yo");
            assert!(original.unsigned.relations.replace.is_none());
        });

        assert!(chunks.next().is_none());
    }
2883
    // Clearing a room's event cache empties the in-memory timeline and the
    // persisted linked chunk, and notifies both the per-room update stream
    // (with a `Clear` diff) and the generic update stream.
    #[async_test]
    async fn test_clear() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store: empty chunk → gap ("comté") → chunk holding `ev1`
        // → chunk holding `ev2`.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "comté".to_owned() },
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // The event cache must be subscribed to before it can be used.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        {
            // Both events are resolvable before the clear.
            assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
            assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());
        }

        {
            // Initially only the last chunk (containing `ev2`) is loaded in memory.
            assert_eq!(items.len(), 1);
            assert_eq!(items[0].event_id().unwrap(), event_id2);

            assert!(stream.is_empty());
        }

        {
            // One backwards pagination pulls `ev1` from the store into memory.
            room_event_cache.pagination().run_backwards_once(20).await.unwrap();

            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
            );
            assert_eq!(diffs.len(), 1);
            assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
                assert_eq!(event.event_id().unwrap(), event_id1);
            });

            assert!(stream.is_empty());
        }

        room_event_cache.clear().await.unwrap();

        // Subscribers receive a single `Clear` diff…
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_let!(VectorDiff::Clear = &diffs[0]);

        // …and the generic stream is notified as well.
        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);

        // NOTE(review): the event is still resolvable by id after the clear —
        // clearing resets the timeline linked chunk but, per this assertion,
        // not the per-event lookup; confirm this is the intended contract.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());

        // The timeline is now empty in memory…
        let items = room_event_cache.events().await.unwrap();
        assert!(items.is_empty());

        // …and the persisted linked chunk holds no items either.
        let linked_chunk = from_all_chunks::<3, _, _>(
            event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap(),
        )
        .unwrap()
        .unwrap();

        assert_eq!(linked_chunk.num_items(), 0);
    }
3031
    // A fresh `RoomEventCache` lazily loads from storage: only the last chunk
    // is in memory at first, pagination pulls older chunks in, and re-syncing
    // an already-known event is deduplicated without spurious notifications.
    #[async_test]
    async fn test_load_from_storage() {
        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let event_id1 = event_id!("$1");
        let event_id2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event();

        // Prefill the store: empty chunk → gap ("cheddar") → chunk holding
        // `ev1` → chunk holding `ev2`.
        event_cache_store
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::NewGapChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(42),
                        next: None,
                        gap: Gap { prev_token: "cheddar".to_owned() },
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(1)),
                        new: ChunkIdentifier::new(2),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(2), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let client = MockClientBuilder::new(None)
            .on_builder(|builder| {
                builder.store_config(
                    StoreConfig::new("hodlor".to_owned())
                        .event_cache_store(event_cache_store.clone()),
                )
            })
            .build()
            .await;

        let event_cache = client.event_cache();

        // The event cache must be subscribed to before it can be used.
        event_cache.subscribe().unwrap();

        // Subscribe before creating the room cache, so the initial-load
        // notification is observed.
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Creating the room cache triggers a generic update for the initial load.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(room_id, expected_room_id);
            }
        );

        let (items, mut stream) = room_event_cache.subscribe().await.unwrap();

        // Lazy loading: only the last chunk (`ev2`) is in memory.
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].event_id().unwrap(), event_id2);
        assert!(stream.is_empty());

        // Both events are findable even though only one is loaded in memory.
        assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some());
        assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some());

        // Paginating backwards loads `ev1` into memory.
        room_event_cache.pagination().run_backwards_once(20).await.unwrap();

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: event } => {
            assert_eq!(event.event_id().unwrap(), event_id1);
        });

        assert!(stream.is_empty());

        // The pagination emits a generic update too.
        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );

        // Re-sync `ev2`, which is already known.
        let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] };

        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() })
            .await
            .unwrap();

        // Receiving a duplicate of a known event causes no generic update.
        assert!(generic_stream.recv().now_or_never().is_none());

        // Deduplication kept a single copy of each event, in order.
        let items = room_event_cache.events().await.unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].event_id().unwrap(), event_id1);
        assert_eq!(items[1].event_id().unwrap(), event_id2);
    }
3176
3177 #[async_test]
3178 async fn test_load_from_storage_resilient_to_failure() {
3179 let room_id = room_id!("!fondue:patate.ch");
3180 let event_cache_store = Arc::new(MemoryStore::new());
3181
3182 let event = EventFactory::new()
3183 .room(room_id)
3184 .sender(user_id!("@ben:saucisse.bzh"))
3185 .text_msg("foo")
3186 .event_id(event_id!("$42"))
3187 .into_event();
3188
3189 event_cache_store
3191 .handle_linked_chunk_updates(
3192 LinkedChunkId::Room(room_id),
3193 vec![
3194 Update::NewItemsChunk {
3195 previous: None,
3196 new: ChunkIdentifier::new(0),
3197 next: None,
3198 },
3199 Update::PushItems {
3200 at: Position::new(ChunkIdentifier::new(0), 0),
3201 items: vec![event],
3202 },
3203 Update::NewItemsChunk {
3204 previous: Some(ChunkIdentifier::new(0)),
3205 new: ChunkIdentifier::new(1),
3206 next: Some(ChunkIdentifier::new(0)),
3207 },
3208 ],
3209 )
3210 .await
3211 .unwrap();
3212
3213 let client = MockClientBuilder::new(None)
3214 .on_builder(|builder| {
3215 builder.store_config(
3216 StoreConfig::new("holder".to_owned())
3217 .event_cache_store(event_cache_store.clone()),
3218 )
3219 })
3220 .build()
3221 .await;
3222
3223 let event_cache = client.event_cache();
3224
3225 event_cache.subscribe().unwrap();
3227
3228 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3229 let room = client.get_room(room_id).unwrap();
3230
3231 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3232
3233 let items = room_event_cache.events().await.unwrap();
3234
3235 assert!(items.is_empty());
3238
3239 let raw_chunks =
3242 event_cache_store.load_all_chunks(LinkedChunkId::Room(room_id)).await.unwrap();
3243 assert!(raw_chunks.is_empty());
3244 }
3245
    // Sync-provided `prev_batch` tokens must only create gap chunks when they
    // are actually useful: the gap from a limited sync stays in storage until
    // loaded, and a non-limited sync's token creates no gap at all.
    #[async_test]
    async fn test_no_useless_gaps() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let event_cache = client.event_cache();
        // The event cache must be subscribed to before it can be used.
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        // A limited sync, with a prev-batch token.
        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("raclette".to_owned()),
                    events: vec![f.text_msg("hey yo").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );

        {
            // Write lock: `load_more_events_backwards` below mutates the state.
            let mut state = room_event_cache.inner.state.write().await.unwrap();

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            // The gap is not loaded in memory yet — only the event is.
            assert_eq!(num_gaps, 0);
            assert_eq!(num_events, 1);

            // Loading backwards surfaces the gap from storage.
            assert_matches!(
                state.load_more_events_backwards().await.unwrap(),
                LoadMoreEventsBackwardsOutcome::Gap { .. }
            );

            // Recount: the gap is now present in memory, alongside the event.
            num_gaps = 0;
            num_events = 0;
            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(_) => num_gaps += 1,
                }
            }

            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 1);
        }

        // A non-limited sync, also carrying a prev-batch token.
        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: false,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![f.text_msg("sup").into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        assert_matches!(
            generic_stream.recv().await,
            Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => {
                assert_eq!(expected_room_id, room_id);
            }
        );

        {
            let state = room_event_cache.inner.state.read().await.unwrap();

            let mut num_gaps = 0;
            let mut num_events = 0;

            for c in state.room_linked_chunk().chunks() {
                match c.content() {
                    ChunkContent::Items(items) => num_events += items.len(),
                    ChunkContent::Gap(gap) => {
                        // Only the original gap remains; "fondue" created none
                        // since the timeline wasn't limited.
                        assert_eq!(gap.prev_token, "raclette");
                        num_gaps += 1;
                    }
                }
            }

            assert_eq!(num_gaps, 1);
            assert_eq!(num_events, 2);
        }
    }
3367
    // `force_shrink_to_last_chunk` drops everything but the last chunk from
    // memory, reports the resulting diffs (Clear + Append of the last chunk),
    // emits no generic update, and leaves older events paginate-able again.
    #[async_test]
    async fn test_shrink_to_last_chunk() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        {
            // Prefill the store with two single-event chunks.
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Only the last chunk (`ev2`) is loaded at first.
        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));
        assert!(stream.is_empty());

        let mut generic_stream = event_cache.subscribe_to_room_generic_updates();

        // Paginate once to bring `ev1` into memory.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream.is_empty());

        assert_let_timeout!(
            Ok(RoomEventCacheGenericUpdate { room_id: received_room_id }) = generic_stream.recv()
        );
        assert_eq!(received_room_id, room_id);

        // Force a shrink back to the last chunk.
        let diffs = room_event_cache
            .inner
            .state
            .write()
            .await
            .unwrap()
            .force_shrink_to_last_chunk()
            .await
            .expect("shrinking should succeed");

        // The shrink is reported as a Clear followed by an Append of the last
        // chunk's events.
        assert_eq!(diffs.len(), 2);
        assert_matches!(&diffs[0], VectorDiff::Clear);
        assert_matches!(&diffs[1], VectorDiff::Append { values} => {
            assert_eq!(values.len(), 1);
            assert_eq!(values[0].event_id().as_deref(), Some(evid2));
        });

        assert!(stream.is_empty());

        // Shrinking doesn't generate a generic update: the room's content
        // didn't change, only what's loaded in memory.
        assert!(generic_stream.is_empty());

        // In-memory view is back to the last chunk only.
        let events = room_event_cache.events().await.unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_id().as_deref(), Some(evid2));

        // And the older event can be paginated back in again.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);
    }
3492
    // `event_order` assigns a stable total order to events across chunks, both
    // for loaded and not-yet-loaded positions, and keeps it consistent after
    // pagination and after a deduplicating sync moves an event.
    #[async_test]
    async fn test_room_ordering() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id).sender(*ALICE);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");
        let evid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
        let ev3 = f.text_msg("yo").event_id(evid3).into_event();

        {
            // Prefill the store: chunk 0 holds `ev1`+`ev2`, chunk 1 holds `ev3`.
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1, ev2],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev3.clone()],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            // Positions in the not-yet-loaded chunk 0 still have a known order.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
                Some(0)
            );

            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
                Some(1)
            );

            // Only `ev3` (order 2) is loaded in memory.
            let mut events = room_linked_chunk.events();
            let (pos, ev) = events.next().unwrap();
            assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
            assert_eq!(ev.event_id().as_deref(), Some(evid3));
            assert_eq!(room_linked_chunk.event_order(pos), Some(2));

            assert!(events.next().is_none());
        }

        // Load everything into memory.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert!(outcome.reached_start);

        {
            // After pagination, iteration order and `event_order` agree.
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            for (i, (pos, _)) in room_linked_chunk.events().enumerate() {
                assert_eq!(room_linked_chunk.event_order(pos), Some(i));
            }
        }

        // A limited sync re-delivers `ev3` (a duplicate) plus a new `ev4`.
        let evid4 = event_id!("$4");
        room_event_cache
            .inner
            .handle_joined_room_update(JoinedRoomUpdate {
                timeline: Timeline {
                    limited: true,
                    prev_batch: Some("fondue".to_owned()),
                    events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
                },
                ..Default::default()
            })
            .await
            .unwrap();

        {
            let state = room_event_cache.inner.state.read().await.unwrap();
            let room_linked_chunk = state.room_linked_chunk();

            let mut events = room_linked_chunk.events();

            // The deduplicated `ev3` keeps order 2, `ev4` comes after it.
            let (pos, ev) = events.next().unwrap();
            assert_eq!(ev.event_id().as_deref(), Some(evid3));
            assert_eq!(room_linked_chunk.event_order(pos), Some(2));

            let (pos, ev) = events.next().unwrap();
            assert_eq!(ev.event_id().as_deref(), Some(evid4));
            assert_eq!(room_linked_chunk.event_order(pos), Some(3));

            assert!(events.next().is_none());

            // Chunk 0's positions keep their order.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 0)),
                Some(0)
            );
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(0), 1)),
                Some(1)
            );

            // `ev3`'s old position in chunk 1 is no longer valid: the event
            // was moved by the deduplicating sync.
            assert_eq!(
                room_linked_chunk.event_order(Position::new(ChunkIdentifier::new(1), 0)),
                None
            );
        }
    }
3650
    // When the last subscriber of a room event cache is dropped, the cache
    // automatically shrinks its in-memory view back to the last chunk; as long
    // as one subscriber remains, no shrink happens.
    #[async_test]
    async fn test_auto_shrink_after_all_subscribers_are_gone() {
        let room_id = room_id!("!galette:saucisse.bzh");

        let client = MockClientBuilder::new(None).build().await;

        let f = EventFactory::new().room(room_id);

        let evid1 = event_id!("$1");
        let evid2 = event_id!("$2");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(evid1).into_event();
        let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();

        {
            // Prefill the store with two single-event chunks.
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![ev1],
                        },
                        Update::NewItemsChunk {
                            previous: Some(ChunkIdentifier::new(0)),
                            new: ChunkIdentifier::new(1),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(1), 0),
                            items: vec![ev2],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // First subscriber: sees only the last chunk (`ev2`).
        let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(events1.len(), 1);
        assert_eq!(events1[0].event_id().as_deref(), Some(evid2));
        assert!(stream1.is_empty());

        // Paginate to bring `ev1` into memory.
        let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
        assert_eq!(outcome.events.len(), 1);
        assert_eq!(outcome.events[0].event_id().as_deref(), Some(evid1));
        assert!(outcome.reached_start);

        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream1.recv()
        );
        assert_eq!(diffs.len(), 1);
        assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value } => {
            assert_eq!(value.event_id().as_deref(), Some(evid1));
        });

        assert!(stream1.is_empty());

        // Second subscriber: its initial snapshot holds both loaded events.
        let (events2, stream2) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(events2.len(), 2);
        assert_eq!(events2[0].event_id().as_deref(), Some(evid1));
        assert_eq!(events2[1].event_id().as_deref(), Some(evid2));
        assert!(stream2.is_empty());

        // Drop the first subscriber; yield so the auto-shrink task can run.
        drop(stream1);
        yield_now().await;

        // One subscriber remains, so no shrink happened (no update on stream2).
        assert!(stream2.is_empty());

        // Drop the last subscriber.
        drop(stream2);
        yield_now().await;

        {
            // Everyone's gone: the subscriber count is back to zero.
            let state = room_event_cache.inner.state.read().await.unwrap();
            assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0);
        }

        // The in-memory view has been auto-shrunk to the last chunk.
        let events3 = room_event_cache.events().await.unwrap();
        assert_eq!(events3.len(), 1);
        assert_eq!(events3[0].event_id().as_deref(), Some(evid2));
    }
3765
3766 #[async_test]
3767 async fn test_rfind_map_event_in_memory_by() {
3768 let user_id = user_id!("@mnt_io:matrix.org");
3769 let room_id = room_id!("!raclette:patate.ch");
3770 let client = MockClientBuilder::new(None).build().await;
3771
3772 let event_factory = EventFactory::new().room(room_id);
3773
3774 let event_id_0 = event_id!("$ev0");
3775 let event_id_1 = event_id!("$ev1");
3776 let event_id_2 = event_id!("$ev2");
3777 let event_id_3 = event_id!("$ev3");
3778
3779 let event_0 =
3780 event_factory.text_msg("hello").sender(*BOB).event_id(event_id_0).into_event();
3781 let event_1 =
3782 event_factory.text_msg("world").sender(*ALICE).event_id(event_id_1).into_event();
3783 let event_2 = event_factory.text_msg("!").sender(*ALICE).event_id(event_id_2).into_event();
3784 let event_3 =
3785 event_factory.text_msg("eh!").sender(user_id).event_id(event_id_3).into_event();
3786
3787 {
3790 client
3791 .event_cache_store()
3792 .lock()
3793 .await
3794 .expect("Could not acquire the event cache lock")
3795 .as_clean()
3796 .expect("Could not acquire a clean event cache lock")
3797 .handle_linked_chunk_updates(
3798 LinkedChunkId::Room(room_id),
3799 vec![
3800 Update::NewItemsChunk {
3801 previous: None,
3802 new: ChunkIdentifier::new(0),
3803 next: None,
3804 },
3805 Update::PushItems {
3806 at: Position::new(ChunkIdentifier::new(0), 0),
3807 items: vec![event_3],
3808 },
3809 Update::NewItemsChunk {
3810 previous: Some(ChunkIdentifier::new(0)),
3811 new: ChunkIdentifier::new(1),
3812 next: None,
3813 },
3814 Update::PushItems {
3815 at: Position::new(ChunkIdentifier::new(1), 0),
3816 items: vec![event_0, event_1, event_2],
3817 },
3818 ],
3819 )
3820 .await
3821 .unwrap();
3822 }
3823
3824 let event_cache = client.event_cache();
3825 event_cache.subscribe().unwrap();
3826
3827 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3828 let room = client.get_room(room_id).unwrap();
3829 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
3830
3831 assert_matches!(
3833 room_event_cache
3834 .rfind_map_event_in_memory_by(|event, previous_event| {
3835 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*BOB)).then(|| (event.event_id(), previous_event.and_then(|e| e.event_id())))
3836 })
3837 .await,
3838 Ok(Some((event_id, previous_event_id))) => {
3839 assert_eq!(event_id.as_deref(), Some(event_id_0));
3840 assert!(previous_event_id.is_none());
3841 }
3842 );
3843
3844 assert_matches!(
3847 room_event_cache
3848 .rfind_map_event_in_memory_by(|event, previous_event| {
3849 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref() == Some(*ALICE)).then(|| (event.event_id(), previous_event.and_then(|e| e.event_id())))
3850 })
3851 .await,
3852 Ok(Some((event_id, previous_event_id))) => {
3853 assert_eq!(event_id.as_deref(), Some(event_id_2));
3854 assert_eq!(previous_event_id.as_deref(), Some(event_id_1));
3855 }
3856 );
3857
3858 assert!(
3860 room_event_cache
3861 .rfind_map_event_in_memory_by(|event, _| {
3862 (event.raw().get_field::<OwnedUserId>("sender").unwrap().as_deref()
3863 == Some(user_id))
3864 .then(|| event.event_id())
3865 })
3866 .await
3867 .unwrap()
3868 .is_none()
3869 );
3870
3871 assert!(
3873 room_event_cache
3874 .rfind_map_event_in_memory_by(|_, _| None::<()>)
3875 .await
3876 .unwrap()
3877 .is_none()
3878 );
3879 }
3880
3881 #[async_test]
3882 async fn test_reload_when_dirty() {
3883 let user_id = user_id!("@mnt_io:matrix.org");
3884 let room_id = room_id!("!raclette:patate.ch");
3885
3886 let event_cache_store = MemoryStore::new();
3888
3889 let client_p0 = MockClientBuilder::new(None)
3891 .on_builder(|builder| {
3892 builder.store_config(
3893 StoreConfig::new("process #0".to_owned())
3894 .event_cache_store(event_cache_store.clone()),
3895 )
3896 })
3897 .build()
3898 .await;
3899
3900 let client_p1 = MockClientBuilder::new(None)
3902 .on_builder(|builder| {
3903 builder.store_config(
3904 StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
3905 )
3906 })
3907 .build()
3908 .await;
3909
3910 let event_factory = EventFactory::new().room(room_id).sender(user_id);
3911
3912 let ev_id_0 = event_id!("$ev_0");
3913 let ev_id_1 = event_id!("$ev_1");
3914
3915 let ev_0 = event_factory.text_msg("comté").event_id(ev_id_0).into_event();
3916 let ev_1 = event_factory.text_msg("morbier").event_id(ev_id_1).into_event();
3917
3918 client_p0
3920 .event_cache_store()
3921 .lock()
3922 .await
3923 .expect("[p0] Could not acquire the event cache lock")
3924 .as_clean()
3925 .expect("[p0] Could not acquire a clean event cache lock")
3926 .handle_linked_chunk_updates(
3927 LinkedChunkId::Room(room_id),
3928 vec![
3929 Update::NewItemsChunk {
3930 previous: None,
3931 new: ChunkIdentifier::new(0),
3932 next: None,
3933 },
3934 Update::PushItems {
3935 at: Position::new(ChunkIdentifier::new(0), 0),
3936 items: vec![ev_0],
3937 },
3938 Update::NewItemsChunk {
3939 previous: Some(ChunkIdentifier::new(0)),
3940 new: ChunkIdentifier::new(1),
3941 next: None,
3942 },
3943 Update::PushItems {
3944 at: Position::new(ChunkIdentifier::new(1), 0),
3945 items: vec![ev_1],
3946 },
3947 ],
3948 )
3949 .await
3950 .unwrap();
3951
3952 let (room_event_cache_p0, room_event_cache_p1) = {
3954 let event_cache_p0 = client_p0.event_cache();
3955 event_cache_p0.subscribe().unwrap();
3956
3957 let event_cache_p1 = client_p1.event_cache();
3958 event_cache_p1.subscribe().unwrap();
3959
3960 client_p0.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3961 client_p1.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
3962
3963 let (room_event_cache_p0, _drop_handles) =
3964 client_p0.get_room(room_id).unwrap().event_cache().await.unwrap();
3965 let (room_event_cache_p1, _drop_handles) =
3966 client_p1.get_room(room_id).unwrap().event_cache().await.unwrap();
3967
3968 (room_event_cache_p0, room_event_cache_p1)
3969 };
3970
3971 let mut updates_stream_p0 = {
3976 let room_event_cache = &room_event_cache_p0;
3977
3978 let (initial_updates, mut updates_stream) =
3979 room_event_cache_p0.subscribe().await.unwrap();
3980
3981 assert_eq!(initial_updates.len(), 1);
3983 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
3984 assert!(updates_stream.is_empty());
3985
3986 assert!(event_loaded(room_event_cache, ev_id_1).await);
3988
3989 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
3991
3992 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
3994
3995 assert_matches!(
3997 updates_stream.recv().await.unwrap(),
3998 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
3999 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4000 assert_matches!(
4001 &diffs[0],
4002 VectorDiff::Insert { index: 0, value: event } => {
4003 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4004 }
4005 );
4006 }
4007 );
4008
4009 assert!(event_loaded(room_event_cache, ev_id_0).await);
4011
4012 updates_stream
4013 };
4014
4015 let mut updates_stream_p1 = {
4017 let room_event_cache = &room_event_cache_p1;
4018 let (initial_updates, mut updates_stream) =
4019 room_event_cache_p1.subscribe().await.unwrap();
4020
4021 assert_eq!(initial_updates.len(), 1);
4023 assert_eq!(initial_updates[0].event_id().as_deref(), Some(ev_id_1));
4024 assert!(updates_stream.is_empty());
4025
4026 assert!(event_loaded(room_event_cache, ev_id_1).await);
4028
4029 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4031
4032 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4034
4035 assert_matches!(
4037 updates_stream.recv().await.unwrap(),
4038 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4039 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4040 assert_matches!(
4041 &diffs[0],
4042 VectorDiff::Insert { index: 0, value: event } => {
4043 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4044 }
4045 );
4046 }
4047 );
4048
4049 assert!(event_loaded(room_event_cache, ev_id_0).await);
4051
4052 updates_stream
4053 };
4054
4055 for _ in 0..3 {
4057 {
4061 let room_event_cache = &room_event_cache_p0;
4062 let updates_stream = &mut updates_stream_p0;
4063
4064 assert!(event_loaded(room_event_cache, ev_id_1).await);
4066
4067 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4070
4071 assert_matches!(
4073 updates_stream.recv().await.unwrap(),
4074 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4075 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4076 assert_matches!(&diffs[0], VectorDiff::Clear);
4077 assert_matches!(
4078 &diffs[1],
4079 VectorDiff::Append { values: events } => {
4080 assert_eq!(events.len(), 1);
4081 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4082 }
4083 );
4084 }
4085 );
4086
4087 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4089
4090 assert!(event_loaded(room_event_cache, ev_id_0).await);
4092
4093 assert_matches!(
4095 updates_stream.recv().await.unwrap(),
4096 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4097 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4098 assert_matches!(
4099 &diffs[0],
4100 VectorDiff::Insert { index: 0, value: event } => {
4101 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4102 }
4103 );
4104 }
4105 );
4106 }
4107
4108 {
4112 let room_event_cache = &room_event_cache_p1;
4113 let updates_stream = &mut updates_stream_p1;
4114
4115 assert!(event_loaded(room_event_cache, ev_id_1).await);
4117
4118 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4121
4122 assert_matches!(
4124 updates_stream.recv().await.unwrap(),
4125 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4126 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4127 assert_matches!(&diffs[0], VectorDiff::Clear);
4128 assert_matches!(
4129 &diffs[1],
4130 VectorDiff::Append { values: events } => {
4131 assert_eq!(events.len(), 1);
4132 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4133 }
4134 );
4135 }
4136 );
4137
4138 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4140
4141 assert!(event_loaded(room_event_cache, ev_id_0).await);
4143
4144 assert_matches!(
4146 updates_stream.recv().await.unwrap(),
4147 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4148 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4149 assert_matches!(
4150 &diffs[0],
4151 VectorDiff::Insert { index: 0, value: event } => {
4152 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4153 }
4154 );
4155 }
4156 );
4157 }
4158 }
4159
4160 for _ in 0..3 {
4163 {
4164 let room_event_cache = &room_event_cache_p0;
4165 let updates_stream = &mut updates_stream_p0;
4166
4167 let guard = room_event_cache.inner.state.read().await.unwrap();
4168
4169 assert!(guard.is_dirty().not());
4175
4176 assert_matches!(
4178 updates_stream.recv().await.unwrap(),
4179 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4180 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4181 assert_matches!(&diffs[0], VectorDiff::Clear);
4182 assert_matches!(
4183 &diffs[1],
4184 VectorDiff::Append { values: events } => {
4185 assert_eq!(events.len(), 1);
4186 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4187 }
4188 );
4189 }
4190 );
4191
4192 assert!(event_loaded(room_event_cache, ev_id_1).await);
4193 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4194
4195 drop(guard);
4201
4202 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4203 assert!(event_loaded(room_event_cache, ev_id_0).await);
4204
4205 assert_matches!(
4207 updates_stream.recv().await.unwrap(),
4208 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4209 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4210 assert_matches!(
4211 &diffs[0],
4212 VectorDiff::Insert { index: 0, value: event } => {
4213 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4214 }
4215 );
4216 }
4217 );
4218 }
4219
4220 {
4221 let room_event_cache = &room_event_cache_p1;
4222 let updates_stream = &mut updates_stream_p1;
4223
4224 let guard = room_event_cache.inner.state.read().await.unwrap();
4225
4226 assert!(guard.is_dirty().not());
4231
4232 assert_matches!(
4234 updates_stream.recv().await.unwrap(),
4235 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4236 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4237 assert_matches!(&diffs[0], VectorDiff::Clear);
4238 assert_matches!(
4239 &diffs[1],
4240 VectorDiff::Append { values: events } => {
4241 assert_eq!(events.len(), 1);
4242 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4243 }
4244 );
4245 }
4246 );
4247
4248 assert!(event_loaded(room_event_cache, ev_id_1).await);
4249 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4250
4251 drop(guard);
4257
4258 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4259 assert!(event_loaded(room_event_cache, ev_id_0).await);
4260
4261 assert_matches!(
4263 updates_stream.recv().await.unwrap(),
4264 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4265 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4266 assert_matches!(
4267 &diffs[0],
4268 VectorDiff::Insert { index: 0, value: event } => {
4269 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4270 }
4271 );
4272 }
4273 );
4274 }
4275 }
4276
4277 for _ in 0..3 {
4279 {
4280 let room_event_cache = &room_event_cache_p0;
4281 let updates_stream = &mut updates_stream_p0;
4282
4283 let guard = room_event_cache.inner.state.write().await.unwrap();
4284
4285 assert!(guard.is_dirty().not());
4287
4288 assert_matches!(
4290 updates_stream.recv().await.unwrap(),
4291 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4292 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4293 assert_matches!(&diffs[0], VectorDiff::Clear);
4294 assert_matches!(
4295 &diffs[1],
4296 VectorDiff::Append { values: events } => {
4297 assert_eq!(events.len(), 1);
4298 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4299 }
4300 );
4301 }
4302 );
4303
4304 drop(guard);
4307
4308 assert!(event_loaded(room_event_cache, ev_id_1).await);
4309 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4310
4311 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4312 assert!(event_loaded(room_event_cache, ev_id_0).await);
4313
4314 assert_matches!(
4316 updates_stream.recv().await.unwrap(),
4317 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4318 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4319 assert_matches!(
4320 &diffs[0],
4321 VectorDiff::Insert { index: 0, value: event } => {
4322 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4323 }
4324 );
4325 }
4326 );
4327 }
4328
4329 {
4330 let room_event_cache = &room_event_cache_p1;
4331 let updates_stream = &mut updates_stream_p1;
4332
4333 let guard = room_event_cache.inner.state.write().await.unwrap();
4334
4335 assert!(guard.is_dirty().not());
4337
4338 assert_matches!(
4340 updates_stream.recv().await.unwrap(),
4341 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4342 assert_eq!(diffs.len(), 2, "{diffs:#?}");
4343 assert_matches!(&diffs[0], VectorDiff::Clear);
4344 assert_matches!(
4345 &diffs[1],
4346 VectorDiff::Append { values: events } => {
4347 assert_eq!(events.len(), 1);
4348 assert_eq!(events[0].event_id().as_deref(), Some(ev_id_1));
4349 }
4350 );
4351 }
4352 );
4353
4354 drop(guard);
4357
4358 assert!(event_loaded(room_event_cache, ev_id_1).await);
4359 assert!(event_loaded(room_event_cache, ev_id_0).await.not());
4360
4361 room_event_cache.pagination().run_backwards_once(1).await.unwrap();
4362 assert!(event_loaded(room_event_cache, ev_id_0).await);
4363
4364 assert_matches!(
4366 updates_stream.recv().await.unwrap(),
4367 RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. } => {
4368 assert_eq!(diffs.len(), 1, "{diffs:#?}");
4369 assert_matches!(
4370 &diffs[0],
4371 VectorDiff::Insert { index: 0, value: event } => {
4372 assert_eq!(event.event_id().as_deref(), Some(ev_id_0));
4373 }
4374 );
4375 }
4376 );
4377 }
4378 }
4379 }
4380
4381 #[async_test]
4382 async fn test_load_when_dirty() {
4383 let room_id_0 = room_id!("!raclette:patate.ch");
4384 let room_id_1 = room_id!("!morbiflette:patate.ch");
4385
4386 let event_cache_store = MemoryStore::new();
4388
4389 let client_p0 = MockClientBuilder::new(None)
4391 .on_builder(|builder| {
4392 builder.store_config(
4393 StoreConfig::new("process #0".to_owned())
4394 .event_cache_store(event_cache_store.clone()),
4395 )
4396 })
4397 .build()
4398 .await;
4399
4400 let client_p1 = MockClientBuilder::new(None)
4402 .on_builder(|builder| {
4403 builder.store_config(
4404 StoreConfig::new("process #1".to_owned()).event_cache_store(event_cache_store),
4405 )
4406 })
4407 .build()
4408 .await;
4409
4410 let (room_event_cache_0_p0, room_event_cache_0_p1) = {
4412 let event_cache_p0 = client_p0.event_cache();
4413 event_cache_p0.subscribe().unwrap();
4414
4415 let event_cache_p1 = client_p1.event_cache();
4416 event_cache_p1.subscribe().unwrap();
4417
4418 client_p0
4419 .base_client()
4420 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4421 client_p0
4422 .base_client()
4423 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4424
4425 client_p1
4426 .base_client()
4427 .get_or_create_room(room_id_0, matrix_sdk_base::RoomState::Joined);
4428 client_p1
4429 .base_client()
4430 .get_or_create_room(room_id_1, matrix_sdk_base::RoomState::Joined);
4431
4432 let (room_event_cache_0_p0, _drop_handles) =
4433 client_p0.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4434 let (room_event_cache_0_p1, _drop_handles) =
4435 client_p1.get_room(room_id_0).unwrap().event_cache().await.unwrap();
4436
4437 (room_event_cache_0_p0, room_event_cache_0_p1)
4438 };
4439
4440 {
4442 drop(room_event_cache_0_p0.inner.state.read().await.unwrap());
4443 drop(room_event_cache_0_p1.inner.state.read().await.unwrap());
4444 }
4445
4446 let (room_event_cache_1_p0, _) =
4450 client_p0.get_room(room_id_1).unwrap().event_cache().await.unwrap();
4451
4452 {
4454 let guard = room_event_cache_1_p0.inner.state.read().await.unwrap();
4455 assert!(guard.is_dirty().not());
4456 }
4457
4458 }
4461
4462 async fn event_loaded(room_event_cache: &RoomEventCache, event_id: &EventId) -> bool {
4463 room_event_cache
4464 .rfind_map_event_in_memory_by(|event, _previous_event_id| {
4465 (event.event_id().as_deref() == Some(event_id)).then_some(())
4466 })
4467 .await
4468 .unwrap()
4469 .is_some()
4470 }
4471}