1#![forbid(missing_docs)]
29
30use std::{
31 collections::BTreeMap,
32 fmt::Debug,
33 sync::{Arc, OnceLock},
34};
35
36use eyeball::Subscriber;
37use eyeball_im::VectorDiff;
38use matrix_sdk_base::{
39 deserialized_responses::{AmbiguityChange, TimelineEvent},
40 event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
41 store_locks::LockStoreError,
42 sync::RoomUpdates,
43};
44use matrix_sdk_common::executor::{spawn, JoinHandle};
45use once_cell::sync::OnceCell;
46use room::RoomEventCacheState;
47use ruma::{
48 events::{
49 relation::RelationType,
50 room::{message::Relation, redaction::SyncRoomRedactionEvent},
51 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
52 AnySyncTimelineEvent,
53 },
54 serde::Raw,
55 EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
56};
57use tokio::sync::{
58 broadcast::{error::RecvError, Receiver},
59 Mutex, RwLock,
60};
61use tracing::{error, info, info_span, instrument, trace, warn, Instrument as _, Span};
62
63use self::paginator::PaginatorError;
64use crate::{client::WeakClient, Client};
65
66mod deduplicator;
67mod pagination;
68mod room;
69
70pub mod paginator;
71pub use pagination::{PaginationToken, RoomPagination, TimelineHasBeenResetWhilePaginating};
72pub use room::RoomEventCache;
73
/// An error observed in the [`EventCache`].
#[derive(thiserror::Error, Debug)]
pub enum EventCacheError {
    /// The [`EventCache`] has not been subscribed to sync responses yet.
    ///
    /// Requesting a per-room cache before [`EventCache::subscribe`] has been
    /// called fails with this error.
    #[error(
        "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
    )]
    NotSubscribedYet,

    /// The room with the given identifier hasn't been found in the client.
    #[error("Room {0} hasn't been found in the Client.")]
    RoomNotFound(OwnedRoomId),

    /// The back-pagination token that was provided is not known to the event
    /// cache.
    #[error("The given back-pagination token is unknown to the event cache.")]
    UnknownBackpaginationToken,

    /// An error has been observed while back-paginating; the underlying
    /// [`PaginatorError`] is attached.
    #[error("Error observed while back-paginating: {0}")]
    BackpaginationError(#[from] PaginatorError),

    /// An error reported by the event cache store, forwarded as is.
    #[error(transparent)]
    Storage(#[from] EventCacheStoreError),

    /// An error reported while locking the event cache store, forwarded as
    /// is.
    #[error(transparent)]
    LockingStorage(#[from] LockStoreError),

    /// The client owning the event cache is gone: the cache only keeps a weak
    /// reference to it, and that reference could not be resolved anymore.
    #[error("The owning client of the event cache has been dropped.")]
    ClientDropped,
}
113
/// A result type whose error is an [`EventCacheError`].
pub type Result<T> = std::result::Result<T, EventCacheError>;
116
/// Holds the handles of the background tasks spawned by
/// [`EventCache::subscribe`].
///
/// Both tasks are aborted when this value is dropped.
pub struct EventCacheDropHandles {
    /// Handle of the task that listens to room updates.
    listen_updates_task: JoinHandle<()>,

    /// Handle of the task that listens to changes of the ignore user list.
    ignore_user_list_update_task: JoinHandle<()>,
}
125
126impl Debug for EventCacheDropHandles {
127 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128 f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
129 }
130}
131
132impl Drop for EventCacheDropHandles {
133 fn drop(&mut self) {
134 self.listen_updates_task.abort();
135 self.ignore_user_list_update_task.abort();
136 }
137}
138
/// An event cache, shared across the rooms of a client.
///
/// Cloning is shallow: every clone points to the same inner state through an
/// [`Arc`].
#[derive(Clone)]
pub struct EventCache {
    /// Shared inner state of the cache.
    inner: Arc<EventCacheInner>,
}
149
150impl Debug for EventCache {
151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
152 f.debug_struct("EventCache").finish_non_exhaustive()
153 }
154}
155
156impl EventCache {
157 pub(crate) fn new(client: WeakClient) -> Self {
159 Self {
160 inner: Arc::new(EventCacheInner {
161 client,
162 store: Default::default(),
163 multiple_room_updates_lock: Default::default(),
164 by_room: Default::default(),
165 drop_handles: Default::default(),
166 all_events: Default::default(),
167 }),
168 }
169 }
170
171 pub fn enable_storage(&self) -> Result<()> {
176 let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| {
177 let client = self.inner.client()?;
178 Ok(client.event_cache_store().clone())
179 })?;
180 Ok(())
181 }
182
183 pub fn has_storage(&self) -> bool {
185 self.inner.has_storage()
186 }
187
188 pub fn subscribe(&self) -> Result<()> {
194 let client = self.inner.client()?;
195
196 let _ = self.inner.drop_handles.get_or_init(|| {
197 let listen_updates_task = spawn(Self::listen_task(
199 self.inner.clone(),
200 client.subscribe_to_all_room_updates(),
201 ));
202
203 let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
204 self.inner.clone(),
205 client.subscribe_to_ignore_user_list_changes(),
206 ));
207
208 Arc::new(EventCacheDropHandles { listen_updates_task, ignore_user_list_update_task })
209 });
210
211 Ok(())
212 }
213
214 pub async fn event(&self, event_id: &EventId) -> Option<TimelineEvent> {
218 self.inner
219 .all_events
220 .read()
221 .await
222 .events
223 .get(event_id)
224 .map(|(_room_id, event)| event.clone())
225 }
226
227 #[cfg(any(test, feature = "testing"))]
237 pub async fn empty_immutable_cache(&self) {
238 self.inner.all_events.write().await.events.clear();
239 }
240
241 #[instrument(skip_all)]
242 async fn ignore_user_list_update_task(
243 inner: Arc<EventCacheInner>,
244 mut ignore_user_list_stream: Subscriber<Vec<String>>,
245 ) {
246 let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
247 span.follows_from(Span::current());
248
249 async move {
250 while ignore_user_list_stream.next().await.is_some() {
251 info!("Received an ignore user list change");
252 if let Err(err) = inner.clear_all_rooms().await {
253 error!("when clearing room storage after ignore user list change: {err}");
254 }
255 }
256 info!("Ignore user list stream has closed");
257 }
258 .instrument(span)
259 .await;
260 }
261
262 #[instrument(skip_all)]
263 async fn listen_task(
264 inner: Arc<EventCacheInner>,
265 mut room_updates_feed: Receiver<RoomUpdates>,
266 ) {
267 trace!("Spawning the listen task");
268 loop {
269 match room_updates_feed.recv().await {
270 Ok(updates) => {
271 if let Err(err) = inner.handle_room_updates(updates).await {
272 match err {
273 EventCacheError::ClientDropped => {
274 info!("Closing the event cache global listen task because client dropped");
276 break;
277 }
278 err => {
279 error!("Error when handling room updates: {err}");
280 }
281 }
282 }
283 }
284
285 Err(RecvError::Lagged(num_skipped)) => {
286 warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
290 if let Err(err) = inner.clear_all_rooms().await {
291 error!("when clearing storage after lag in listen_task: {err}");
292 }
293 }
294
295 Err(RecvError::Closed) => {
296 info!("Closing the event cache global listen task because receiver closed");
298 break;
299 }
300 }
301 }
302 }
303
304 pub(crate) async fn for_room(
306 &self,
307 room_id: &RoomId,
308 ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
309 let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
310 return Err(EventCacheError::NotSubscribedYet);
311 };
312
313 let room = self.inner.for_room(room_id).await?;
314
315 Ok((room, drop_handles))
316 }
317
318 #[instrument(skip(self, events))]
323 pub async fn add_initial_events(
324 &self,
325 room_id: &RoomId,
326 events: Vec<TimelineEvent>,
327 prev_batch: Option<String>,
328 ) -> Result<()> {
329 if self.inner.has_storage() {
331 return Ok(());
332 }
333
334 let room_cache = self.inner.for_room(room_id).await?;
335
336 if !room_cache.inner.state.read().await.events().is_empty() {
339 return Ok(());
340 }
341
342 room_cache
347 .inner
348 .replace_all_events_by(events, prev_batch, Default::default(), Default::default())
349 .await?;
350
351 Ok(())
352 }
353}
354
/// Maps an event identifier to the pair made of a room identifier and the
/// event itself.
type AllEventsMap = BTreeMap<OwnedEventId, (OwnedRoomId, TimelineEvent)>;
/// Maps the identifier of a target event to the events relating to it, each
/// of them associated to its relation type.
type RelationsMap = BTreeMap<OwnedEventId, BTreeMap<OwnedEventId, RelationType>>;
357
/// Events and relations between events, for all the rooms.
#[derive(Default, Clone)]
struct AllEventsCache {
    /// All the events, indexed by their identifier.
    events: AllEventsMap,
    /// For each target event, the events that relate to it.
    relations: RelationsMap,
}
368
369impl AllEventsCache {
370 fn clear(&mut self) {
371 self.events.clear();
372 self.relations.clear();
373 }
374
375 fn append_related_event(&mut self, event: &TimelineEvent) {
378 let Ok(AnySyncTimelineEvent::MessageLike(ev)) = event.raw().deserialize() else {
380 return;
381 };
382
383 if let AnySyncMessageLikeEvent::RoomRedaction(room_redaction) = &ev {
385 let redacted_event_id = match room_redaction {
386 SyncRoomRedactionEvent::Original(ev) => {
387 ev.content.redacts.as_ref().or(ev.redacts.as_ref())
388 }
389 SyncRoomRedactionEvent::Redacted(redacted_redaction) => {
390 redacted_redaction.content.redacts.as_ref()
391 }
392 };
393
394 if let Some(redacted_event_id) = redacted_event_id {
395 self.relations
396 .entry(redacted_event_id.to_owned())
397 .or_default()
398 .insert(ev.event_id().to_owned(), RelationType::Replacement);
399 }
400
401 return;
402 }
403
404 let relationship = match ev.original_content() {
405 Some(AnyMessageLikeEventContent::RoomMessage(c)) => {
406 if let Some(relation) = c.relates_to {
407 match relation {
408 Relation::Replacement(replacement) => {
409 Some((replacement.event_id, RelationType::Replacement))
410 }
411 Relation::Reply { in_reply_to } => {
412 Some((in_reply_to.event_id, RelationType::Reference))
413 }
414 Relation::Thread(thread) => Some((thread.event_id, RelationType::Thread)),
415 _ => None,
417 }
418 } else {
419 None
420 }
421 }
422 Some(AnyMessageLikeEventContent::PollResponse(c)) => {
423 Some((c.relates_to.event_id, RelationType::Reference))
424 }
425 Some(AnyMessageLikeEventContent::PollEnd(c)) => {
426 Some((c.relates_to.event_id, RelationType::Reference))
427 }
428 Some(AnyMessageLikeEventContent::UnstablePollResponse(c)) => {
429 Some((c.relates_to.event_id, RelationType::Reference))
430 }
431 Some(AnyMessageLikeEventContent::UnstablePollEnd(c)) => {
432 Some((c.relates_to.event_id, RelationType::Reference))
433 }
434 Some(AnyMessageLikeEventContent::Reaction(c)) => {
435 Some((c.relates_to.event_id, RelationType::Annotation))
436 }
437 _ => None,
438 };
439
440 if let Some(relationship) = relationship {
441 self.relations
442 .entry(relationship.0)
443 .or_default()
444 .insert(ev.event_id().to_owned(), relationship.1);
445 }
446 }
447
448 fn collect_related_events(
452 &self,
453 event_id: &EventId,
454 filter: Option<&[RelationType]>,
455 ) -> Vec<TimelineEvent> {
456 let mut results = Vec::new();
457 self.collect_related_events_rec(event_id, filter, &mut results);
458 results
459 }
460
461 fn collect_related_events_rec(
462 &self,
463 event_id: &EventId,
464 filter: Option<&[RelationType]>,
465 results: &mut Vec<TimelineEvent>,
466 ) {
467 let Some(related_event_ids) = self.relations.get(event_id) else {
468 return;
469 };
470
471 for (related_event_id, relation_type) in related_event_ids {
472 if let Some(filter) = filter {
473 if !filter.contains(relation_type) {
474 continue;
475 }
476 }
477
478 if results.iter().any(|event| {
480 event.event_id().is_some_and(|added_related_event_id| {
481 added_related_event_id == *related_event_id
482 })
483 }) {
484 continue;
485 }
486
487 if let Some((_, ev)) = self.events.get(related_event_id) {
488 results.push(ev.clone());
489 self.collect_related_events_rec(related_event_id, filter, results);
490 }
491 }
492 }
493}
494
/// State shared by all the clones of an [`EventCache`].
struct EventCacheInner {
    /// A weak reference to the owning client; it must be resolved with
    /// [`Self::client`] before use.
    client: WeakClient,

    /// The event cache store, set at most once when storage gets enabled.
    ///
    /// A clone of this handle is given to every room state on creation.
    store: Arc<OnceCell<EventCacheStoreLock>>,

    /// A lock held for the whole duration of [`Self::handle_room_updates`].
    multiple_room_updates_lock: Mutex<()>,

    /// The per-room caches, created lazily by [`Self::for_room`].
    by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,

    /// Events and relations for all the rooms; the same handle is given to
    /// every [`RoomEventCache`] on creation.
    all_events: Arc<RwLock<AllEventsCache>>,

    /// Handles of the background tasks, set once by `EventCache::subscribe`.
    drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
}
530
531impl EventCacheInner {
532 fn client(&self) -> Result<Client> {
533 self.client.get().ok_or(EventCacheError::ClientDropped)
534 }
535
536 fn has_storage(&self) -> bool {
538 self.store.get().is_some()
539 }
540
541 async fn clear_all_rooms(&self) -> Result<()> {
543 let rooms = self.by_room.write().await;
551 for room in rooms.values() {
552 let _ = room.inner.sender.send(RoomEventCacheUpdate::Clear);
555 room.inner.state.write().await.reset().await?;
557 }
558
559 Ok(())
560 }
561
562 #[instrument(skip(self, updates))]
564 async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
565 let _lock = self.multiple_room_updates_lock.lock().await;
568
569 for (room_id, left_room_update) in updates.leave {
571 let room = self.for_room(&room_id).await?;
572
573 if let Err(err) =
574 room.inner.handle_left_room_update(self.has_storage(), left_room_update).await
575 {
576 error!("handling left room update: {err}");
578 }
579 }
580
581 for (room_id, joined_room_update) in updates.join {
583 let room = self.for_room(&room_id).await?;
584
585 if let Err(err) =
586 room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await
587 {
588 error!("handling joined room update: {err}");
590 }
591 }
592
593 Ok(())
597 }
598
599 async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
604 let by_room_guard = self.by_room.read().await;
607
608 match by_room_guard.get(room_id) {
609 Some(room) => Ok(room.clone()),
610
611 None => {
612 drop(by_room_guard);
614 let mut by_room_guard = self.by_room.write().await;
615
616 if let Some(room) = by_room_guard.get(room_id) {
619 return Ok(room.clone());
620 }
621
622 let room_state =
623 RoomEventCacheState::new(room_id.to_owned(), self.store.clone()).await?;
624
625 let room_version = self
626 .client
627 .get()
628 .and_then(|client| client.get_room(room_id))
629 .map(|room| room.clone_info().room_version_or_default())
630 .unwrap_or_else(|| {
631 warn!("unknown room version for {room_id}, using default V1");
632 RoomVersionId::V1
633 });
634
635 let room_event_cache = RoomEventCache::new(
636 self.client.clone(),
637 room_state,
638 room_id.to_owned(),
639 room_version,
640 self.all_events.clone(),
641 );
642
643 by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
644
645 Ok(room_event_cache)
646 }
647 }
648 }
649}
650
/// The result of a single back-pagination request.
#[derive(Debug)]
pub struct BackPaginationOutcome {
    /// Whether the start of the timeline has been reached.
    // NOTE(review): meaning inferred from the field name; confirm against the
    // pagination module.
    pub reached_start: bool,

    /// The events carried by this outcome.
    // NOTE(review): the ordering of these events is not visible from this
    // file; confirm against the pagination module before relying on it.
    pub events: Vec<TimelineEvent>,
}
667
/// An update related to events happened in a room.
#[derive(Debug, Clone)]
pub enum RoomEventCacheUpdate {
    /// The room state is being cleared; sent to the observers of every room
    /// when all the rooms get cleared.
    Clear,

    /// The fully read marker has moved to a different event.
    MoveReadMarkerTo {
        /// Event at which the read marker is now pointing.
        event_id: OwnedEventId,
    },

    /// The members have changed.
    UpdateMembers {
        /// Collection of ambiguity changes, keyed by event identifier.
        // NOTE(review): presumably the keys are the member events that caused
        // the change; confirm against the producer of this update.
        ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
    },

    /// The timeline events of the room have changed.
    UpdateTimelineEvents {
        /// The changes, expressed as a list of diffs.
        diffs: Vec<VectorDiff<TimelineEvent>>,

        /// Where the diffs are coming from.
        origin: EventsOrigin,
    },

    /// The room has received new ephemeral events.
    AddEphemeralEvents {
        /// The raw ephemeral events.
        events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
    },
}
705
/// Indicates where events are coming from.
#[derive(Debug, Clone)]
pub enum EventsOrigin {
    /// Events are coming from a sync.
    Sync,

    /// Events are coming from pagination.
    Pagination,
}
715
// Tests for the global event cache: subscription requirement, read marker
// updates, lookup of events by id, and seeding of initial events.
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;
    use futures_util::FutureExt as _;
    use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, serde::Raw, user_id};
    use serde_json::json;

    use super::{EventCacheError, RoomEventCacheUpdate};
    use crate::test_utils::{assert_event_matches_msg, logged_in_client};

    // Requesting a room cache without subscribing first must fail.
    #[async_test]
    async fn test_must_explicitly_subscribe() {
        let client = logged_in_client(None).await;

        let event_cache = client.event_cache();

        // `subscribe()` is deliberately not called here.
        let room_id = room_id!("!omelette:fromage.fr");
        let result = event_cache.for_room(room_id).await;

        assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
    }

    // A hundred identical read marker events must result in a single read
    // marker update.
    #[async_test]
    async fn test_uniq_read_marker() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");
        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);

        let event_cache = client.event_cache();

        event_cache.subscribe().unwrap();

        let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();

        let (events, mut stream) = room_event_cache.subscribe().await.unwrap();

        assert!(events.is_empty());

        // Build the same `m.fully_read` account data event, a hundred times.
        let read_marker_event = Raw::from_json_string(
            json!({
                "content": {
                    "event_id": "$crepe:saucisse.bzh"
                },
                "room_id": "!galette:saucisse.bzh",
                "type": "m.fully_read"
            })
            .to_string(),
        )
        .unwrap();
        let account_data = vec![read_marker_event; 100];

        room_event_cache
            .inner
            .handle_joined_room_update(
                event_cache.inner.has_storage(),
                JoinedRoomUpdate { account_data, ..Default::default() },
            )
            .await
            .unwrap();

        // Exactly one update is received…
        assert_matches!(
            stream.recv().await.unwrap(),
            RoomEventCacheUpdate::MoveReadMarkerTo { .. }
        );

        // …and nothing else is immediately available.
        assert!(stream.recv().now_or_never().is_none());
    }

    // Events received in room updates can be found by id, from the global
    // cache for any room, and from a room cache for its own events only.
    #[async_test]
    async fn test_get_event_by_id() {
        let client = logged_in_client(None).await;
        let room_id1 = room_id!("!galette:saucisse.bzh");
        let room_id2 = room_id!("!crepe:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Note: the factory is bound to the first room for all the events.
        let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        // Two events for the first room.
        let joined_room_update1 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![
                    f.text_msg("hey").event_id(eid1).into(),
                    f.text_msg("you").event_id(eid2).into(),
                ],
                ..Default::default()
            },
            ..Default::default()
        };

        // One event for the second room.
        let joined_room_update2 = JoinedRoomUpdate {
            timeline: Timeline {
                events: vec![f.text_msg("bjr").event_id(eid3).into()],
                ..Default::default()
            },
            ..Default::default()
        };

        let mut updates = RoomUpdates::default();
        updates.join.insert(room_id1.to_owned(), joined_room_update1);
        updates.join.insert(room_id2.to_owned(), joined_room_update2);

        // Feed the updates directly, bypassing the listen task.
        event_cache.inner.handle_room_updates(updates).await.unwrap();

        // All three events are known to the global cache.
        let found1 = event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        let found3 = event_cache.event(eid3).await.unwrap();
        assert_event_matches_msg(&found3, "bjr");

        // An unknown event is not found.
        assert!(event_cache.event(event_id!("$unknown")).await.is_none());

        // Now look events up through the cache of the first room.
        client.base_client().get_or_create_room(room_id1, matrix_sdk_base::RoomState::Joined);
        let room1 = client.get_room(room_id1).unwrap();

        let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();

        let found1 = room_event_cache.event(eid1).await.unwrap();
        assert_event_matches_msg(&found1, "hey");

        let found2 = room_event_cache.event(eid2).await.unwrap();
        assert_event_matches_msg(&found2, "you");

        // The third event was sent to the second room: the cache of the first
        // room doesn't return it, while the global cache still does.
        assert!(room_event_cache.event(eid3).await.is_none());
        assert!(event_cache.event(eid3).await.is_some());
    }

    // An event saved through a room cache is found until the shared events
    // map gets emptied.
    #[async_test]
    async fn test_save_event_and_clear() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        let event_id = event_id!("$1");

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        room_event_cache.save_event(f.text_msg("hey there").event_id(event_id).into()).await;

        // The saved event is found from both the room cache and the global
        // cache.
        assert!(room_event_cache.event(event_id).await.is_some());
        assert!(event_cache.event(event_id).await.is_some());

        event_cache.empty_immutable_cache().await;

        // After emptying the shared map, the event is found from neither.
        assert!(room_event_cache.event(event_id).await.is_none());
        assert!(event_cache.event(event_id).await.is_none());
    }

    // Initial events added before the room is known to the client are
    // returned when subscribing to the room cache.
    #[async_test]
    async fn test_add_initial_events() {
        let client = logged_in_client(None).await;
        let room_id = room_id!("!galette:saucisse.bzh");

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // The room doesn't exist in the client yet at this point.
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
        event_cache
            .add_initial_events(room_id, vec![f.text_msg("hey").into()], None)
            .await
            .unwrap();

        client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
        let room = client.get_room(room_id).unwrap();

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
        let (initial_events, _) = room_event_cache.subscribe().await.unwrap();
        assert_eq!(initial_events.len(), 1);
    }
}