1use std::{
16 collections::{BTreeMap, HashMap, HashSet},
17 sync::{
18 Arc, OnceLock,
19 atomic::{AtomicUsize, Ordering},
20 },
21};
22
23use eyeball::SharedObservable;
24use eyeball_im::VectorDiff;
25use matrix_sdk_base::{
26 RoomInfoNotableUpdateReasons, apply_redaction, check_validity_of_replacement_events,
27 deserialized_responses::{ThreadSummary, ThreadSummaryStatus},
28 event_cache::{
29 Event, Gap,
30 store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState},
31 },
32 linked_chunk::{
33 ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, OwnedLinkedChunkId, Position,
34 Update, lazy_loader,
35 },
36 serde_helpers::{extract_edit_target, extract_thread_root},
37 sync::Timeline,
38};
39use matrix_sdk_common::executor::spawn;
40use ruma::{
41 EventId, OwnedEventId, OwnedRoomId, OwnedUserId,
42 events::{
43 AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
44 MessageLikeEventType,
45 receipt::{ReceiptEventContent, SyncReceiptEvent},
46 relation::RelationType,
47 room::redaction::SyncRoomRedactionEvent,
48 },
49 room_version_rules::RoomVersionRules,
50 serde::Raw,
51};
52use tokio::sync::broadcast::{Receiver, Sender};
53use tracing::{debug, error, instrument, trace, warn};
54
55use super::{
56 super::{
57 super::{
58 EventCacheError,
59 deduplicator::{DeduplicationOutcome, filter_duplicate_events},
60 persistence::send_updates_to_store,
61 },
62 EventLocation, TimelineVectorDiffs,
63 event_focused::{EventFocusThreadMode, EventFocusedCache},
64 event_linked_chunk::EventLinkedChunk,
65 lock,
66 pinned_events::PinnedEventCache,
67 read_receipts::compute_unread_counts,
68 thread::ThreadEventCache,
69 },
70 EventsOrigin, PostProcessingOrigin, RoomEventCacheGenericUpdate,
71 RoomEventCacheLinkedChunkUpdate, RoomEventCacheUpdate, RoomEventCacheUpdateSender,
72 sort_positions_descending,
73};
74use crate::{
75 Room,
76 event_cache::{
77 automatic_pagination::AutomaticPagination, caches::pagination::SharedPaginationStatus,
78 },
79 room::WeakRoom,
80};
81
/// Key identifying one event-focused cache of a room.
///
/// Two focused caches are the same entry only if they focus on the same
/// event *and* use the same thread mode.
#[derive(Hash, PartialEq, Eq)]
struct EventFocusedCacheKey {
    /// The event this cache is focused on.
    focused: OwnedEventId,
    /// The thread mode the focused cache was created with.
    thread_mode: EventFocusThreadMode,
}
90
/// All the state kept by a room's event cache.
///
/// Always accessed through the store lock (see [`LockedRoomEventCacheState`]
/// and the read/write guard types below).
pub struct RoomEventCacheState {
    /// Whether thread support is enabled; gates the thread bookkeeping done
    /// in `post_process_new_events` and `update_read_receipts`.
    enabled_thread_support: bool,

    /// The room this state belongs to.
    pub room_id: OwnedRoomId,

    /// Weak reference back to the `Room`; may be gone when the client is
    /// shutting down (see `update_read_receipts`).
    weak_room: WeakRoom,

    /// The user id of the logged-in user.
    pub own_user_id: OwnedUserId,

    /// Handle to the event cache store and its cross-process lock.
    store: EventCacheStoreLock,

    /// The in-memory linked chunk of this room's timeline events, lazily
    /// loaded from the store (last chunk only at first).
    room_linked_chunk: EventLinkedChunk,

    /// Per-thread event caches, keyed by the thread root event id.
    threads: HashMap<OwnedEventId, ThreadEventCache>,

    /// Event-focused caches, keyed by (focused event, thread mode).
    event_focused_caches: HashMap<EventFocusedCacheKey, EventFocusedCache>,

    /// Cache of the room's pinned events; lazily initialized on the first
    /// call to `subscribe_to_pinned_events`.
    pinned_event_cache: OnceLock<PinnedEventCache>,

    /// Shared observable reporting the room's pagination status.
    pagination_status: SharedObservable<SharedPaginationStatus>,

    /// Sender for high-level room event cache updates.
    update_sender: RoomEventCacheUpdateSender,

    /// Sender for low-level linked chunk updates.
    pub(super) linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,

    /// The room version rules, used to interpret and apply redactions.
    room_version_rules: RoomVersionRules,

    /// Whether an initial previous-batch token has already been observed for
    /// this room; reset by `reset_internal`.
    waited_for_initial_prev_token: bool,

    /// Number of live subscribers to this room's event cache; checked by
    /// `auto_shrink_if_no_subscribers`.
    subscriber_count: Arc<AtomicUsize>,

    /// Optional automatic pagination hook, forwarded to the unread-counts
    /// computation.
    automatic_pagination: Option<AutomaticPagination>,
}
154
impl RoomEventCacheState {
    /// Returns a reference to the in-memory linked chunk of room events.
    pub fn room_linked_chunk(&self) -> &EventLinkedChunk {
        &self.room_linked_chunk
    }

    /// Looks for an event by id, first in the in-memory linked chunk, then
    /// in the store.
    ///
    /// Returns the event alongside its [`EventLocation`], or `None` if it is
    /// unknown to both.
    async fn find_event(
        &self,
        event_id: &EventId,
        store: &EventCacheStoreLockGuard,
    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
        // In-memory search first (iterating events in reverse).
        for (position, event) in self.room_linked_chunk.revents() {
            if event.event_id().as_deref() == Some(event_id) {
                return Ok(Some((EventLocation::Memory(position), event.clone())));
            }
        }

        // Fall back to the store.
        Ok(store
            .find_event(&self.room_id, event_id)
            .await?
            .map(|event| (EventLocation::Store, event)))
    }

    /// Finds an event in the store along with all the events (transitively)
    /// related to it, optionally filtered by relation types.
    ///
    /// Returns `None` when the target event itself isn't in the store.
    async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filters: Option<Vec<RelationType>>,
        store: &EventCacheStoreLockGuard,
    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
        let found = store.find_event(&self.room_id, event_id).await?;

        let Some(target) = found else {
            return Ok(None);
        };

        let related = self.find_event_relations(event_id, filters, store).await?;

        Ok(Some((target, related)))
    }

    /// Computes the transitive closure of events related to `event_id`
    /// (relations, relations-of-relations, …), optionally filtered by
    /// relation types, sorted by their order in the room's linked chunk.
    ///
    /// Events without a position, or whose position isn't known to the
    /// current linked chunk, sort before positioned ones.
    async fn find_event_relations(
        &self,
        event_id: &EventId,
        filters: Option<Vec<RelationType>>,
        store: &EventCacheStoreLockGuard,
    ) -> Result<Vec<Event>, EventCacheError> {
        // Start with the direct relations, then walk outwards.
        let mut related =
            store.find_event_relations(&self.room_id, event_id, filters.as_deref()).await?;
        let mut stack =
            related.iter().filter_map(|(event, _pos)| event.event_id()).collect::<Vec<_>>();

        // Guard against cycles: never query the same event id twice.
        let mut already_seen = HashSet::new();
        already_seen.insert(event_id.to_owned());

        let mut num_iters = 1;

        while let Some(event_id) = stack.pop() {
            if !already_seen.insert(event_id.clone()) {
                continue;
            }

            let other_related =
                store.find_event_relations(&self.room_id, &event_id, filters.as_deref()).await?;

            stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id()));
            related.extend(other_related);

            num_iters += 1;
        }

        trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");

        // Sort by linked chunk order; `None` positions (and positions with no
        // known order) come first.
        related.sort_by(|(_, lhs), (_, rhs)| {
            use std::cmp::Ordering;

            match (lhs, rhs) {
                (None, None) => Ordering::Equal,
                (None, Some(_)) => Ordering::Less,
                (Some(_), None) => Ordering::Greater,
                (Some(lhs), Some(rhs)) => {
                    let lhs = self.room_linked_chunk.event_order(*lhs);
                    let rhs = self.room_linked_chunk.event_order(*rhs);

                    match (lhs, rhs) {
                        (None, None) => Ordering::Equal,
                        (None, Some(_)) => Ordering::Less,
                        (Some(_), None) => Ordering::Greater,
                        (Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
                    }
                }
            }
        });

        // Drop the positions, keep only the events.
        let related = related.into_iter().map(|(event, _pos)| event).collect();

        Ok(related)
    }
}
278
// Gives the generic `lock::StateLock` machinery access to this state's store
// lock.
impl lock::Store for RoomEventCacheState {
    fn store(&self) -> &EventCacheStoreLock {
        &self.store
    }
}
284
/// A [`RoomEventCacheState`] protected by the cross-process store lock.
pub type LockedRoomEventCacheState = lock::StateLock<RoomEventCacheState>;
290
impl LockedRoomEventCacheState {
    /// Creates the locked event cache state for the given room.
    ///
    /// Acquires the store lock (clearing the dirty flag if another process
    /// left it set), then loads the room's linked chunk metadata and its
    /// last chunk from the store. On any loading error, the persisted linked
    /// chunk is cleared and the cache starts empty.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        own_user_id: OwnedUserId,
        room_id: OwnedRoomId,
        weak_room: WeakRoom,
        room_version_rules: RoomVersionRules,
        enabled_thread_support: bool,
        update_sender: RoomEventCacheUpdateSender,
        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
        store: EventCacheStoreLock,
        pagination_status: SharedObservable<SharedPaginationStatus>,
        automatic_pagination: Option<AutomaticPagination>,
    ) -> Result<Self, EventCacheError> {
        let store_guard = match store.lock().await? {
            EventCacheStoreLockState::Clean(guard) => guard,

            EventCacheStoreLockState::Dirty(guard) => {
                // The lock was dirty: acknowledge it before proceeding, since
                // we're loading everything from the store anyway.
                EventCacheStoreLockGuard::clear_dirty(&guard);

                guard
            }
        };

        let linked_chunk_id = LinkedChunkId::Room(&room_id);

        // Load and validate the full linked chunk metadata; on failure, wipe
        // the persisted linked chunk and continue without metadata.
        let full_linked_chunk_metadata =
            match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await {
                Ok(metas) => metas,
                Err(err) => {
                    error!("error when loading a linked chunk's metadata from the store: {err}");

                    store_guard
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    None
                }
            };

        // Lazily load only the last chunk of events; same recovery strategy
        // as above on failure.
        let linked_chunk = match store_guard
            .load_last_chunk(linked_chunk_id)
            .await
            .map_err(EventCacheError::from)
            .and_then(|(last_chunk, chunk_identifier_generator)| {
                lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
                    .map_err(EventCacheError::from)
            }) {
            Ok(linked_chunk) => linked_chunk,
            Err(err) => {
                error!("error when loading a linked chunk's latest chunk from the store: {err}");

                store_guard
                    .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                    .await?;

                None
            }
        };

        Ok(Self::new_inner(RoomEventCacheState {
            own_user_id,
            enabled_thread_support,
            room_id,
            weak_room,
            store,
            room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk(
                linked_chunk,
                full_linked_chunk_metadata,
            ),
            threads: HashMap::new(),
            event_focused_caches: HashMap::new(),
            pagination_status,
            update_sender,
            linked_chunk_update_sender,
            room_version_rules,
            waited_for_initial_prev_token: false,
            subscriber_count: Default::default(),
            pinned_event_cache: OnceLock::new(),
            automatic_pagination,
        }))
    }
}
400
/// Read guard over a [`RoomEventCacheState`].
pub type RoomEventCacheStateLockReadGuard<'a> = lock::StateLockReadGuard<'a, RoomEventCacheState>;

/// Write guard over a [`RoomEventCacheState`].
pub type RoomEventCacheStateLockWriteGuard<'a> = lock::StateLockWriteGuard<'a, RoomEventCacheState>;
410
impl<'a> lock::Reload for RoomEventCacheStateLockWriteGuard<'a> {
    /// Reloads the in-memory state from the store: shrink the linked chunk
    /// to its last persisted chunk, then broadcast the resulting diffs with
    /// a cache origin.
    async fn reload(&mut self) -> Result<(), EventCacheError> {
        self.shrink_to_last_chunk().await?;

        let diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

        self.state.update_sender.send(
            RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
                diffs,
                origin: EventsOrigin::Cache,
            }),
            Some(RoomEventCacheGenericUpdate { room_id: self.state.room_id.clone() }),
        );

        Ok(())
    }
}
430
impl<'a> RoomEventCacheStateLockReadGuard<'a> {
    /// Returns the shared counter of live subscribers to this room's cache.
    pub fn subscriber_count(&self) -> &Arc<AtomicUsize> {
        &self.state.subscriber_count
    }

    /// Looks for an event by id, in memory first, then in the store.
    pub async fn find_event(
        &self,
        event_id: &EventId,
    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
        self.state.find_event(event_id, &self.store).await
    }

    /// Finds an event and the transitive closure of its related events,
    /// optionally filtered by relation types.
    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filters: Option<Vec<RelationType>>,
    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
        self.state.find_event_with_relations(event_id, filters, &self.store).await
    }

    /// Finds all the events (transitively) related to the given one,
    /// optionally filtered by relation types.
    pub async fn find_event_relations(
        &self,
        event_id: &EventId,
        filters: Option<Vec<RelationType>>,
    ) -> Result<Vec<Event>, EventCacheError> {
        self.state.find_event_relations(event_id, filters, &self.store).await
    }

    /// Applies `predicate` to the in-memory events in reverse order and
    /// returns the first non-`None` result.
    pub fn rfind_map_event_in_memory_by<O, P>(&self, mut predicate: P) -> Option<O>
    where
        P: FnMut(&Event) -> Option<O>,
    {
        self.state.room_linked_chunk.revents().find_map(|(_, event)| predicate(event))
    }

    /// Whether the store lock guard is flagged dirty (test helper).
    #[cfg(test)]
    pub fn is_dirty(&self) -> bool {
        EventCacheStoreLockGuard::is_dirty(&self.store)
    }

    /// Subscribes to the room's pinned events, lazily creating the pinned
    /// event cache on first use.
    ///
    /// Returns the currently known pinned events and a receiver for
    /// subsequent updates.
    pub async fn subscribe_to_pinned_events(
        &self,
        room: Room,
    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>), EventCacheError> {
        let pinned_event_cache = self.state.pinned_event_cache.get_or_init(|| {
            PinnedEventCache::new(
                room,
                self.state.linked_chunk_update_sender.clone(),
                self.state.store.clone(),
            )
        });

        pinned_event_cache.subscribe().await
    }

    /// Returns the event-focused cache for the given event and thread mode,
    /// if one exists.
    pub fn get_event_focused_cache(
        &self,
        event_id: OwnedEventId,
        thread_mode: EventFocusThreadMode,
    ) -> Option<EventFocusedCache> {
        get_event_focused_cache(&self.state, event_id, thread_mode)
    }
}
545
546impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
    /// Returns a mutable reference to the room's in-memory linked chunk.
    pub fn room_linked_chunk_mut(&mut self) -> &mut EventLinkedChunk {
        &mut self.state.room_linked_chunk
    }
551
    /// Returns the pinned event cache, if it has been initialized already.
    #[cfg(any(feature = "e2e-encryption", test))]
    pub fn pinned_event_cache(&self) -> Option<&PinnedEventCache> {
        self.state.pinned_event_cache.get()
    }
558
    /// Iterates over all the event-focused caches of this room.
    #[cfg(feature = "e2e-encryption")]
    pub fn event_focused_caches(&self) -> impl Iterator<Item = &EventFocusedCache> {
        self.state.event_focused_caches.values()
    }
564
    /// Whether an initial previous-batch token has already been observed.
    pub fn waited_for_initial_prev_token(&self) -> bool {
        self.state.waited_for_initial_prev_token
    }
569
    /// Mutable access to the `waited_for_initial_prev_token` flag.
    pub fn waited_for_initial_prev_token_mut(&mut self) -> &mut bool {
        &mut self.state.waited_for_initial_prev_token
    }
574
    /// Looks for an event by id, in memory first, then in the store.
    pub async fn find_event(
        &self,
        event_id: &EventId,
    ) -> Result<Option<(EventLocation, Event)>, EventCacheError> {
        self.state.find_event(event_id, &self.store).await
    }
585
    /// Finds an event and the transitive closure of its related events,
    /// optionally filtered by relation types.
    pub async fn find_event_with_relations(
        &self,
        event_id: &EventId,
        filters: Option<Vec<RelationType>>,
    ) -> Result<Option<(Event, Vec<Event>)>, EventCacheError> {
        self.state.find_event_with_relations(event_id, filters, &self.store).await
    }
605
    /// Unloads the in-memory linked chunk and resets it to only its last
    /// chunk, as persisted in the store.
    ///
    /// If the store can't be read, the persisted linked chunk is cleared and
    /// the in-memory one starts empty. If the replacement itself fails, the
    /// whole state is reset.
    pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> {
        let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id);
        let (last_chunk, chunk_identifier_generator) =
            match self.store.load_last_chunk(linked_chunk_id).await {
                Ok(pair) => pair,

                Err(err) => {
                    error!("error when reloading a linked chunk from memory: {err}");

                    // Recovery: wipe the persisted linked chunk and start
                    // from scratch.
                    self.store
                        .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
                        .await?;

                    (None, ChunkIdentifierGenerator::new_from_scratch())
                }
            };

        debug!("unloading the linked chunk, and resetting it to its last chunk");

        if let Err(err) =
            self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator)
        {
            error!("error when replacing the linked chunk: {err}");
            return self.reset_internal().await;
        }

        // Pagination starts anew after shrinking.
        self.state
            .pagination_status
            .set(SharedPaginationStatus::Idle { hit_timeline_start: false });

        // Discard the store updates generated by the replacement: the data
        // just came from the store, there's nothing to write back.
        let _ = self.state.room_linked_chunk.store_updates().take();

        Ok(())
    }
658
659 #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
662 pub async fn auto_shrink_if_no_subscribers(
663 &mut self,
664 ) -> Result<Option<Vec<VectorDiff<Event>>>, EventCacheError> {
665 let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst);
666
667 trace!(subscriber_count, "received request to auto-shrink");
668
669 if subscriber_count == 0 {
670 self.shrink_to_last_chunk().await?;
673
674 Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs()))
675 } else {
676 Ok(None)
677 }
678 }
679
    /// Removes the given events from the linked chunk: those present in
    /// memory and those only present in the store.
    ///
    /// Store removals are applied in descending position order, so that an
    /// earlier removal doesn't shift the positions of later ones.
    #[instrument(skip_all)]
    pub async fn remove_events(
        &mut self,
        in_memory_events: Vec<(OwnedEventId, Position)>,
        in_store_events: Vec<(OwnedEventId, Position)>,
    ) -> Result<(), EventCacheError> {
        if !in_store_events.is_empty() {
            let mut positions = in_store_events
                .into_iter()
                .map(|(_event_id, position)| position)
                .collect::<Vec<_>>();

            sort_positions_descending(&mut positions);

            let updates =
                positions.into_iter().map(|pos| Update::RemoveItem { at: pos }).collect::<Vec<_>>();

            self.apply_store_only_updates(updates).await?;
        }

        if in_memory_events.is_empty() {
            // Nothing left to do in memory.
            return Ok(());
        }

        self.state
            .room_linked_chunk
            .remove_events_by_position(
                in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
            )
            .expect("failed to remove an event");

        self.propagate_changes().await
    }
722
    /// Flushes the linked chunk's pending in-memory updates to the store.
    async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
        let updates = self.state.room_linked_chunk.store_updates().take();

        self.send_updates_to_store(updates).await
    }
728
    /// Applies updates to the store only, without touching the in-memory
    /// linked chunk — except for its order tracker, which must stay in sync
    /// with the store's view of the events.
    async fn apply_store_only_updates(
        &mut self,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), EventCacheError> {
        self.state.room_linked_chunk.order_tracker.map_updates(&updates);
        self.send_updates_to_store(updates).await
    }
742
    /// Forwards linked chunk updates to the store (and to the linked chunk
    /// update channel), tagged with this room's linked chunk id.
    async fn send_updates_to_store(
        &mut self,
        updates: Vec<Update<Event, Gap>>,
    ) -> Result<(), EventCacheError> {
        let linked_chunk_id = OwnedLinkedChunkId::Room(self.state.room_id.clone());

        send_updates_to_store(
            &self.store,
            linked_chunk_id,
            &self.state.linked_chunk_update_sender,
            updates,
        )
        .await
    }
757
    /// Resets the room's linked chunk and thread caches, and returns the
    /// `VectorDiff`s to broadcast — always a single `Clear`.
    pub async fn reset(&mut self) -> Result<Vec<VectorDiff<Event>>, EventCacheError> {
        self.reset_internal().await?;

        let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs();

        // A reset must translate to exactly one `Clear` diff.
        debug_assert_eq!(diff_updates.len(), 1);
        debug_assert!(matches!(diff_updates[0], VectorDiff::Clear));

        Ok(diff_updates)
    }
774
    /// Clears the in-memory linked chunk and every thread cache, propagates
    /// the changes to the store, and resets the pagination-related state.
    async fn reset_internal(&mut self) -> Result<(), EventCacheError> {
        self.state.room_linked_chunk.reset();

        for thread in self.state.threads.values_mut() {
            thread.clear().await?;
        }

        self.propagate_changes().await?;

        // Allow the next sync to provide the initial prev-batch token again.
        self.state.waited_for_initial_prev_token = false;

        self.state
            .pagination_status
            .set(SharedPaginationStatus::Idle { hit_timeline_start: false });

        Ok(())
    }
800
    /// Handles the timeline of a sync response for this room.
    ///
    /// Deduplicates the incoming events, maintains gaps (previous-batch
    /// tokens), clears thread caches on gappy syncs, and post-processes the
    /// new events. Returns whether a new gap was stored, along with the
    /// `VectorDiff`s to broadcast.
    #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"]
    pub async fn handle_sync(
        &mut self,
        mut timeline: Timeline,
        ephemeral_events: &[Raw<AnySyncEphemeralRoomEvent>],
    ) -> Result<(bool, Vec<VectorDiff<Event>>), EventCacheError> {
        let mut prev_batch = timeline.prev_batch.take();

        let DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
            non_empty_all_duplicates: all_duplicates,
        } = filter_duplicate_events(
            &self.state.own_user_id,
            &self.store,
            LinkedChunkId::Room(&self.state.room_id),
            &self.state.room_linked_chunk,
            timeline.events,
        )
        .await?;

        // Ignore the previous-batch token when (the sync isn't limited and we
        // already have events in the linked chunk), or when all the events
        // were duplicates. Note: `&&` binds tighter than `||` here.
        if !timeline.limited && self.state.room_linked_chunk.events().next().is_some()
            || all_duplicates
        {
            prev_batch = None;
        }

        let has_new_gap = prev_batch.is_some();

        if has_new_gap {
            // A gap means events may be missing in between: clear all thread
            // caches, and forget each known thread summary's latest reply,
            // which may be outdated now.
            let mut summaries_to_update = Vec::new();

            for (thread_root, thread) in self.state.threads.iter_mut() {
                thread.clear().await?;

                summaries_to_update.push(thread_root.clone());
            }

            for thread_root in summaries_to_update {
                let Some((location, mut target_event)) = self.find_event(&thread_root).await?
                else {
                    trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync");
                    continue;
                };

                if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() {
                    prev_summary.latest_reply = None;

                    target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary);

                    self.replace_event_at(location, target_event).await?;
                }
            }
        }

        if all_duplicates {
            // Nothing new to store, but read receipts may still have changed.
            if let Some(new_receipt) = extract_read_receipt(ephemeral_events) {
                self.update_read_receipts(Some(&new_receipt)).await?;
            }

            return Ok((false, Vec::new()));
        }

        if !self.state.waited_for_initial_prev_token && has_new_gap {
            self.state.waited_for_initial_prev_token = true;
        }

        // Drop the duplicates before pushing their fresher copies.
        self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?;

        self.state
            .room_linked_chunk
            .push_live_events(prev_batch.map(|prev_token| Gap { token: prev_token }), &events);

        let new_receipt = extract_read_receipt(ephemeral_events);
        self.post_process_new_events(events, PostProcessingOrigin::Sync, new_receipt).await?;

        if timeline.limited && has_new_gap {
            // Limited, gappy sync: shrink so the in-memory view only keeps
            // the latest events.
            self.shrink_to_last_chunk().await?;
        }

        let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs();

        Ok((has_new_gap, timeline_event_diffs))
    }
931
    /// Subscribes to the thread rooted at `root`, returning the events
    /// already known for it and a receiver for subsequent updates.
    pub async fn subscribe_to_thread(
        &mut self,
        root: OwnedEventId,
    ) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>), EventCacheError> {
        self.get_or_reload_thread(root).subscribe().await
    }
940
    /// Post-processes events freshly added to the linked chunk: applies
    /// redactions, routes events to their thread caches, feeds the pinned
    /// event cache, and recomputes the read receipts.
    ///
    /// Pending linked chunk updates are flushed to the store first.
    pub async fn post_process_new_events(
        &mut self,
        events: Vec<Event>,
        post_processing_origin: PostProcessingOrigin,
        receipt_event: Option<ReceiptEventContent>,
    ) -> Result<(), EventCacheError> {
        self.propagate_changes().await?;

        let state = &mut *self.state;

        if let Some(pinned_event_cache) = state.pinned_event_cache.get_mut() {
            pinned_event_cache
                .maybe_add_live_related_events(&events, &state.room_version_rules.redaction)
                .await?;
        }

        // Batch events per thread root so each thread is updated once.
        let mut new_events_by_thread: BTreeMap<_, Vec<_>> = BTreeMap::new();

        for event in events {
            self.maybe_apply_new_redaction(&event, post_processing_origin).await?;

            if self.state.enabled_thread_support {
                if matches!(post_processing_origin, PostProcessingOrigin::Sync) {
                    // An in-thread event goes to its thread's batch; a root
                    // event of an already-known thread is routed there too.
                    if let Some(thread_root) = extract_thread_root(event.raw()) {
                        new_events_by_thread.entry(thread_root).or_default().push(event.clone());
                    } else if let Some(event_id) = event.event_id() {
                        if self.state.threads.contains_key(&event_id) {
                            new_events_by_thread.entry(event_id).or_default().push(event.clone());
                        }
                    }
                }

                // A redecrypted in-thread event must trigger a thread
                // summary refresh, even with no new event to push.
                #[cfg(feature = "e2e-encryption")]
                if matches!(post_processing_origin, PostProcessingOrigin::Redecryption)
                    && let Some(thread_root) = extract_thread_root(event.raw())
                {
                    new_events_by_thread.entry(thread_root).or_default();
                }

                // An edit may change a thread's latest event: make sure the
                // edited event's thread gets its summary refreshed too.
                if let Some(edit_target) = extract_edit_target(event.raw()) {
                    if let Some((_location, edit_target_event)) =
                        self.find_event(&edit_target).await?
                        && let Some(thread_root) = extract_thread_root(edit_target_event.raw())
                    {
                        new_events_by_thread.entry(thread_root).or_default();
                    }
                }
            }

            // Persist a bundled latest thread event, if the server sent one.
            if let Some(bundled_thread) = event.bundled_latest_thread_event {
                self.save_events([*bundled_thread]).await?;
            }
        }

        if self.state.enabled_thread_support {
            self.update_threads(new_events_by_thread, post_processing_origin).await?;
        }

        self.update_read_receipts(receipt_event.as_ref()).await?;

        Ok(())
    }
1027
    /// Recomputes the room's unread counts from its read receipts, and saves
    /// the updated `RoomInfo` if anything changed.
    ///
    /// A no-op when the `Room` is gone (client shutting down).
    pub async fn update_read_receipts(
        &mut self,
        receipt_event: Option<&ReceiptEventContent>,
    ) -> Result<(), EventCacheError> {
        let Some(room) = self.state.weak_room.get() else {
            debug!("can't update read receipts: client's closing");
            return Ok(());
        };

        let user_id = &self.state.own_user_id;
        let room_id = &self.state.room_id;

        // Work on a copy, so changes can be detected afterwards.
        let prev_read_receipts = room.read_receipts().clone();
        let mut read_receipts = prev_read_receipts.clone();

        compute_unread_counts(
            user_id,
            room_id,
            receipt_event,
            &self.state.room_linked_chunk,
            &mut read_receipts,
            self.state.enabled_thread_support,
            self.state.automatic_pagination.as_ref(),
            room.client().state_store(),
        )
        .await;

        // Only persist (and notify) when the counts actually changed.
        if prev_read_receipts != read_receipts {
            let result = room
                .update_and_save_room_info(|mut room_info| {
                    room_info.set_read_receipts(read_receipts);
                    (room_info, RoomInfoNotableUpdateReasons::READ_RECEIPT)
                })
                .await;
            if let Err(error) = result {
                error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
            }
        }

        Ok(())
    }
1074
    /// Returns the event cache for the thread rooted at `root_event_id`,
    /// creating it if it doesn't exist yet.
    pub(in super::super) fn get_or_reload_thread(
        &mut self,
        root_event_id: OwnedEventId,
    ) -> &mut ThreadEventCache {
        // Clone everything upfront: the `entry` call below borrows
        // `self.state.threads` mutably.
        let room_id = self.state.room_id.clone();
        let weak_room = self.state.weak_room.clone();
        let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone();
        let store = self.state.store.clone();

        self.state.threads.entry(root_event_id.clone()).or_insert_with(|| {
            ThreadEventCache::new(
                room_id,
                root_event_id,
                weak_room,
                store,
                linked_chunk_update_sender,
            )
        })
    }
1096
    /// Pushes the given per-thread batches of new events into their thread
    /// caches, then refreshes each thread's summary.
    ///
    /// When a thread's latest event has valid edits, the most recent valid
    /// edit is reported as the thread's latest event instead.
    #[instrument(skip_all)]
    async fn update_threads(
        &mut self,
        new_events_by_thread: BTreeMap<OwnedEventId, Vec<Event>>,
        post_processing_origin: PostProcessingOrigin,
    ) -> Result<(), EventCacheError> {
        for (thread_root, new_events) in new_events_by_thread {
            let thread_cache = self.get_or_reload_thread(thread_root.clone());

            thread_cache.add_live_events(new_events).await?;

            let mut latest_event_id = thread_cache.latest_event_id().await?;

            if let Some(event_id) = latest_event_id.as_ref()
                && let Some((original_event, edits)) = self
                    .find_event_with_relations(event_id, Some(vec![RelationType::Replacement]))
                    .await?
            {
                // Edits come back ordered (see `find_event_relations`), so
                // `rfind` picks the most recent valid one.
                let latest_valid_edit = edits.into_iter().rfind(|edit| {
                    let original_json = original_event.raw();
                    let original_encryption_info = original_event.encryption_info();
                    let replacement_json = edit.raw();
                    let replacement_encryption_info = edit.encryption_info();

                    check_validity_of_replacement_events(
                        original_json,
                        original_encryption_info.map(|v| &**v),
                        replacement_json,
                        replacement_encryption_info.map(|v| &**v),
                    )
                    .is_ok()
                });

                if let Some(latest_valid_edit) = latest_valid_edit {
                    latest_event_id = latest_valid_edit.event_id();
                }
            }

            self.maybe_update_thread_summary(thread_root, latest_event_id, post_processing_origin)
                .await?;
        }

        Ok(())
    }
1143
    /// Recomputes the summary (reply count + latest reply) of the thread
    /// rooted at `thread_root`, and replaces the root event if the summary
    /// changed.
    ///
    /// When the origin is a re-decryption (with e2e enabled), the root event
    /// is replaced even if the summary compares equal.
    async fn maybe_update_thread_summary(
        &mut self,
        thread_root: OwnedEventId,
        latest_event_id: Option<OwnedEventId>,
        _post_processing_origin: PostProcessingOrigin,
    ) -> Result<(), EventCacheError> {
        let Some((location, mut target_event)) = self.find_event(&thread_root).await? else {
            trace!(%thread_root, "thread root event is missing from the room linked chunk");
            return Ok(());
        };

        let prev_summary = target_event.thread_summary.summary();

        // Count all the in-thread replies known to the store.
        let num_replies = {
            let thread_replies = self
                .store
                .find_event_relations(
                    &self.state.room_id,
                    &thread_root,
                    Some(&[RelationType::Thread]),
                )
                .await?;
            // Saturate instead of failing on an absurd number of replies.
            thread_replies.len().try_into().unwrap_or(u32::MAX)
        };

        let new_summary = if num_replies > 0 {
            Some(ThreadSummary { num_replies, latest_reply: latest_event_id })
        } else {
            None
        };

        #[cfg(feature = "e2e-encryption")]
        let update_if_same_summaries =
            matches!(_post_processing_origin, PostProcessingOrigin::Redecryption);
        #[cfg(not(feature = "e2e-encryption"))]
        let update_if_same_summaries = false;

        if !update_if_same_summaries && prev_summary == new_summary.as_ref() {
            trace!(%thread_root, "thread summary is up-to-date, no need to update it");
            return Ok(());
        }

        trace!(%thread_root, "updating thread summary: {new_summary:?}");
        target_event.thread_summary = ThreadSummaryStatus::from_opt(new_summary);
        self.replace_event_at(location, target_event).await
    }
1206
    /// Replaces the event at the given location with a new version of
    /// itself.
    ///
    /// An in-memory replacement is also propagated to the store; an event
    /// only known to the store is simply re-saved.
    pub async fn replace_event_at(
        &mut self,
        location: EventLocation,
        event: Event,
    ) -> Result<(), EventCacheError> {
        match location {
            EventLocation::Memory(position) => {
                self.state
                    .room_linked_chunk
                    .replace_event_at(position, event)
                    .expect("should have been a valid position of an item");
                self.propagate_changes().await?;
            }
            EventLocation::Store => {
                self.save_events([event]).await?;
            }
        }

        Ok(())
    }
1235
    /// If the given event is a redaction, applies it to its target event
    /// (when known) and updates the target's thread accordingly.
    ///
    /// A no-op when the event isn't a redaction, fails to deserialize, lacks
    /// a target id, or targets an unknown or already-redacted event.
    #[instrument(skip_all)]
    async fn maybe_apply_new_redaction(
        &mut self,
        event: &Event,
        post_processing_origin: PostProcessingOrigin,
    ) -> Result<(), EventCacheError> {
        let raw_event = event.raw();

        // Cheap pre-filter on the `type` field, to avoid fully deserializing
        // every non-redaction event.
        let Ok(Some(MessageLikeEventType::RoomRedaction)) =
            raw_event.get_field::<MessageLikeEventType>("type")
        else {
            return Ok(());
        };

        let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
            redaction,
        ))) = raw_event.deserialize()
        else {
            return Ok(());
        };

        let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else {
            warn!("missing target event id from the redaction event");
            return Ok(());
        };

        let Some((location, mut target_event)) = self.find_event(event_id).await? else {
            trace!("redacted event is missing from the linked chunk");
            return Ok(());
        };

        // Skip already-redacted targets; otherwise remember the target's
        // thread root (if any) so the thread can be refreshed afterwards.
        let thread_root = if let Ok(deserialized) = target_event.raw().deserialize() {
            if deserialized.is_redacted() {
                return Ok(());
            }

            extract_thread_root(target_event.raw())
        } else {
            warn!("failed to deserialize the event to redact");
            None
        };

        if let Some(redacted_event) = apply_redaction(
            target_event.raw(),
            event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
            &self.state.room_version_rules.redaction,
        ) {
            target_event.replace_raw(redacted_event.cast_unchecked());

            self.replace_event_at(location, target_event).await?;

            // The redacted event may have been a thread's latest reply:
            // remove it from the thread cache and refresh the summary.
            if let Some(thread_root) = thread_root
                && let Some(thread_cache) = self.state.threads.get_mut(&thread_root)
            {
                thread_cache.remove_if_present(event_id).await?;

                let latest_event_id = thread_cache.latest_event_id().await?;

                self.maybe_update_thread_summary(
                    thread_root,
                    latest_event_id,
                    post_processing_origin,
                )
                .await?;
            }
        }

        Ok(())
    }
1329
    /// Persists the given events into the event cache store.
    pub async fn save_events(
        &mut self,
        events: impl IntoIterator<Item = Event>,
    ) -> Result<(), EventCacheError> {
        let store = self.store.clone();
        let room_id = self.state.room_id.clone();
        let events = events.into_iter().collect::<Vec<_>>();

        // NOTE(review): presumably spawned so the store writes run to
        // completion even if the caller's future is dropped — confirm.
        spawn(async move {
            for event in events {
                store.save_event(&room_id, event).await?;
            }
            super::Result::Ok(())
        })
        .await
        .expect("joining failed")?;

        Ok(())
    }
1351
    /// Whether the store lock guard is flagged dirty (test helper).
    #[cfg(test)]
    pub fn is_dirty(&self) -> bool {
        EventCacheStoreLockGuard::is_dirty(&self.store)
    }
1356
1357 pub fn insert_event_focused_cache(
1359 &mut self,
1360 event_id: OwnedEventId,
1361 thread_mode: EventFocusThreadMode,
1362 cache: EventFocusedCache,
1363 ) {
1364 let key = EventFocusedCacheKey { focused: event_id, thread_mode };
1365 self.state.event_focused_caches.insert(key, cache);
1366 }
1367
    /// Returns the event-focused cache for the given event and thread mode,
    /// if one exists.
    pub fn get_event_focused_cache(
        &self,
        event_id: OwnedEventId,
        thread_mode: EventFocusThreadMode,
    ) -> Option<EventFocusedCache> {
        get_event_focused_cache(&self.state, event_id, thread_mode)
    }
1379}
1380
1381fn extract_read_receipt(
1384 ephemeral_events: &[Raw<AnySyncEphemeralRoomEvent>],
1385) -> Option<ReceiptEventContent> {
1386 let mut receipt_event = None;
1387
1388 for raw_ephemeral in ephemeral_events {
1389 match raw_ephemeral.deserialize() {
1390 Ok(AnySyncEphemeralRoomEvent::Receipt(SyncReceiptEvent { content, .. })) => {
1391 receipt_event = Some(content);
1392 break;
1393 }
1394
1395 Ok(_) => {}
1396
1397 Err(err) => {
1398 error!("error when deserializing an ephemeral event from sync: {err}");
1399 }
1400 }
1401 }
1402
1403 receipt_event
1404}
1405
1406fn get_event_focused_cache(
1413 state: &RoomEventCacheState,
1414 event_id: OwnedEventId,
1415 thread_mode: EventFocusThreadMode,
1416) -> Option<EventFocusedCache> {
1417 let key = EventFocusedCacheKey { focused: event_id, thread_mode };
1418 state.event_focused_caches.get(&key).cloned()
1419}
1420
/// Loads a linked chunk's full metadata from the store and validates that it
/// forms a single, well-formed chain.
///
/// Returns the chunks' metadata sorted in linking order (first chunk first),
/// `None` when the store holds no chunks for this linked chunk, or an error
/// when the persisted metadata is inconsistent: no unique last chunk,
/// dangling or mismatched links, cycles, or disconnected components.
async fn load_linked_chunk_metadata(
    store_guard: &EventCacheStoreLockGuard,
    linked_chunk_id: LinkedChunkId<'_>,
) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
    let mut all_chunks = store_guard
        .load_all_chunks_metadata(linked_chunk_id)
        .await
        .map_err(EventCacheError::from)?;

    if all_chunks.is_empty() {
        // No chunks at all: nothing to validate.
        return Ok(None);
    }

    // Index the chunks by identifier for O(1) predecessor lookups.
    let chunk_map: HashMap<_, _> = all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();

    // There must be exactly one chunk without a successor: the last chunk.
    let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
    let Some(last) = iter.next() else {
        return Err(EventCacheError::InvalidLinkedChunkMetadata {
            details: "no last chunk found".to_owned(),
        });
    };

    if let Some(other_last) = iter.next() {
        return Err(EventCacheError::InvalidLinkedChunkMetadata {
            details: format!(
                "chunks {} and {} both claim to be last chunks",
                last.identifier.index(),
                other_last.identifier.index()
            ),
        });
    }

    // Walk the chain backwards from the last chunk, checking for cycles,
    // dangling `previous` pointers, and back/forward link mismatches.
    let mut seen = HashSet::new();
    let mut current = last;
    loop {
        if !seen.insert(current.identifier) {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: format!(
                    "cycle detected in linked chunk at {}",
                    current.identifier.index()
                ),
            });
        }

        let Some(prev_id) = current.previous else {
            // Reached the first chunk: the walk must have covered every
            // chunk, otherwise the graph has disconnected components.
            if seen.len() != all_chunks.len() {
                return Err(EventCacheError::InvalidLinkedChunkMetadata {
                    details: format!(
                        "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
                        seen.len(),
                        all_chunks.len()
                    ),
                });
            }
            break;
        };

        let Some(pred_meta) = chunk_map.get(&prev_id) else {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: format!(
                    "missing predecessor {} chunk for {}",
                    prev_id.index(),
                    current.identifier.index()
                ),
            });
        };

        // The predecessor must agree that `current` is its successor.
        if pred_meta.next != Some(current.identifier) {
            return Err(EventCacheError::InvalidLinkedChunkMetadata {
                details: format!(
                    "chunk {}'s next ({:?}) doesn't match the current chunk ({})",
                    pred_meta.identifier.index(),
                    pred_meta.next.map(|chunk_id| chunk_id.index()),
                    current.identifier.index()
                ),
            });
        }

        current = *pred_meta;
    }

    // `current` now points at the first chunk. Rearrange `all_chunks` in
    // place into linking order by selection over the `next` pointers: slot i
    // receives the chunk identified by `current`, then `current` advances.
    let mut current = current.identifier;
    for i in 0..all_chunks.len() {
        let j = all_chunks
            .iter()
            .rev()
            .position(|meta| meta.identifier == current)
            .map(|j| all_chunks.len() - 1 - j)
            .expect("the target chunk must be present in the metadata");
        if i != j {
            all_chunks.swap(i, j);
        }
        if let Some(next) = all_chunks[i].next {
            current = next;
        }
    }

    Ok(Some(all_chunks))
}