1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use decryption_retry_task::DecryptionRetryTask;
19use eyeball_im::VectorDiff;
20use eyeball_im_util::vector::VectorObserverExt;
21use futures_core::Stream;
22use imbl::Vector;
23#[cfg(test)]
24use matrix_sdk::{crypto::OlmMachine, SendOutsideWasm};
25use matrix_sdk::{
26 deserialized_responses::TimelineEvent,
27 event_cache::{
28 paginator::{PaginationResult, Paginator},
29 RoomEventCache, RoomPaginationStatus,
30 },
31 send_queue::{
32 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
33 },
34 Result, Room,
35};
36use ruma::{
37 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
38 events::{
39 poll::unstable_start::UnstablePollStartEventContent,
40 reaction::ReactionEventContent,
41 receipt::{Receipt, ReceiptThread, ReceiptType},
42 relation::Annotation,
43 room::message::{MessageType, Relation},
44 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
45 AnySyncTimelineEvent, MessageLikeEventType,
46 },
47 serde::Raw,
48 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, RoomVersionId,
49 TransactionId, UserId,
50};
51#[cfg(test)]
52use ruma::{events::receipt::ReceiptEventContent, OwnedRoomId, RoomId};
53use tokio::sync::{RwLock, RwLockWriteGuard};
54use tracing::{debug, error, field::debug, info, instrument, trace, warn};
55
56pub(super) use self::{
57 metadata::{RelativePosition, TimelineMetadata},
58 observable_items::{
59 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
60 ObservableItemsTransactionEntry,
61 },
62 state::TimelineState,
63 state_transaction::TimelineStateTransaction,
64};
65use super::{
66 algorithms::{rfind_event_by_id, rfind_event_item},
67 event_item::{ReactionStatus, RemoteEventOrigin},
68 item::TimelineUniqueId,
69 subscriber::TimelineSubscriber,
70 threaded_events_loader::ThreadedEventsLoader,
71 traits::{Decryptor, RoomDataProvider},
72 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
73 PaginationError, Profile, TimelineDetails, TimelineEventItemId, TimelineFocus, TimelineItem,
74 TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
75};
76use crate::{
77 timeline::{
78 algorithms::rfind_event_by_item_id,
79 date_dividers::DateDividerAdjuster,
80 event_item::EventTimelineItemKind,
81 pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
82 MsgLikeContent, MsgLikeKind, TimelineEventFilterFn,
83 },
84 unable_to_decrypt_hook::UtdHookManager,
85};
86
87pub(in crate::timeline) mod aggregations;
88mod decryption_retry_task;
89mod metadata;
90mod observable_items;
91mod read_receipts;
92mod state;
93mod state_transaction;
94
95pub(super) use aggregations::*;
96
97#[derive(Debug)]
99enum TimelineFocusData<P: RoomDataProvider> {
100 Live,
102
103 Event {
106 event_id: OwnedEventId,
108 paginator: Paginator<P>,
110 num_context_events: u16,
112 },
113
114 Thread {
115 loader: ThreadedEventsLoader<P>,
116
117 num_events: u16,
119 },
120
121 PinnedEvents {
122 loader: PinnedEventsLoader,
123 },
124}
125
126#[derive(Clone, Debug)]
127pub(super) struct TimelineController<P: RoomDataProvider = Room, D: Decryptor = Room> {
128 state: Arc<RwLock<TimelineState>>,
130
131 focus: Arc<RwLock<TimelineFocusData<P>>>,
133
134 pub(crate) room_data_provider: P,
138
139 pub(super) settings: TimelineSettings,
141
142 decryption_retry_task: DecryptionRetryTask<D>,
145}
146
147#[derive(Clone)]
148pub(super) struct TimelineSettings {
149 pub(super) track_read_receipts: bool,
151
152 pub(super) event_filter: Arc<TimelineEventFilterFn>,
155
156 pub(super) add_failed_to_parse: bool,
158
159 pub(super) date_divider_mode: DateDividerMode,
161}
162
163#[cfg(not(tarpaulin_include))]
164impl fmt::Debug for TimelineSettings {
165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166 f.debug_struct("TimelineSettings")
167 .field("track_read_receipts", &self.track_read_receipts)
168 .field("add_failed_to_parse", &self.add_failed_to_parse)
169 .finish_non_exhaustive()
170 }
171}
172
173impl Default for TimelineSettings {
174 fn default() -> Self {
175 Self {
176 track_read_receipts: false,
177 event_filter: Arc::new(default_event_filter),
178 add_failed_to_parse: true,
179 date_divider_mode: DateDividerMode::Daily,
180 }
181 }
182}
183
184#[derive(Debug, Clone)]
185pub(super) enum TimelineFocusKind {
186 Live { hide_threaded_events: bool },
187 Event { hide_threaded_events: bool },
188 Thread { root_event_id: OwnedEventId },
189 PinnedEvents,
190}
191
192pub fn default_event_filter(event: &AnySyncTimelineEvent, room_version: &RoomVersionId) -> bool {
202 match event {
203 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
204 if ev.redacts(room_version).is_some() {
205 false
208 } else {
209 ev.event_type() != MessageLikeEventType::Reaction
212 }
213 }
214
215 AnySyncTimelineEvent::MessageLike(msg) => {
216 match msg.original_content() {
217 None => {
218 msg.event_type() != MessageLikeEventType::Reaction
221 }
222
223 Some(original_content) => {
224 match original_content {
225 AnyMessageLikeEventContent::RoomMessage(content) => {
226 if content
227 .relates_to
228 .as_ref()
229 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
230 {
231 return false;
233 }
234
235 match content.msgtype {
236 MessageType::Audio(_)
237 | MessageType::Emote(_)
238 | MessageType::File(_)
239 | MessageType::Image(_)
240 | MessageType::Location(_)
241 | MessageType::Notice(_)
242 | MessageType::ServerNotice(_)
243 | MessageType::Text(_)
244 | MessageType::Video(_)
245 | MessageType::VerificationRequest(_) => true,
246 #[cfg(feature = "unstable-msc4274")]
247 MessageType::Gallery(_) => true,
248 _ => false,
249 }
250 }
251
252 AnyMessageLikeEventContent::Sticker(_)
253 | AnyMessageLikeEventContent::UnstablePollStart(
254 UnstablePollStartEventContent::New(_),
255 )
256 | AnyMessageLikeEventContent::CallInvite(_)
257 | AnyMessageLikeEventContent::CallNotify(_)
258 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
259
260 _ => false,
261 }
262 }
263 }
264 }
265
266 AnySyncTimelineEvent::State(_) => {
267 true
269 }
270 }
271}
272
273impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
274 pub(super) fn new(
275 room_data_provider: P,
276 focus: TimelineFocus,
277 internal_id_prefix: Option<String>,
278 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
279 is_room_encrypted: bool,
280 ) -> Self {
281 let (focus_data, focus_kind) = match focus {
282 TimelineFocus::Live { hide_threaded_events } => {
283 (TimelineFocusData::Live, TimelineFocusKind::Live { hide_threaded_events })
284 }
285
286 TimelineFocus::Event { target, num_context_events, hide_threaded_events } => {
287 let paginator = Paginator::new(room_data_provider.clone());
288 (
289 TimelineFocusData::Event { paginator, event_id: target, num_context_events },
290 TimelineFocusKind::Event { hide_threaded_events },
291 )
292 }
293
294 TimelineFocus::Thread { root_event_id, num_events } => (
295 TimelineFocusData::Thread {
296 loader: ThreadedEventsLoader::new(
297 room_data_provider.clone(),
298 root_event_id.clone(),
299 ),
300 num_events,
301 },
302 TimelineFocusKind::Thread { root_event_id },
303 ),
304
305 TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => (
306 TimelineFocusData::PinnedEvents {
307 loader: PinnedEventsLoader::new(
308 Arc::new(room_data_provider.clone()),
309 max_events_to_load as usize,
310 max_concurrent_requests as usize,
311 ),
312 },
313 TimelineFocusKind::PinnedEvents,
314 ),
315 };
316
317 let state = Arc::new(RwLock::new(TimelineState::new(
318 focus_kind,
319 room_data_provider.own_user_id().to_owned(),
320 room_data_provider.room_version(),
321 internal_id_prefix,
322 unable_to_decrypt_hook,
323 is_room_encrypted,
324 )));
325
326 let settings = TimelineSettings::default();
327
328 let decryption_retry_task =
329 DecryptionRetryTask::new(state.clone(), room_data_provider.clone());
330
331 Self {
332 state,
333 focus: Arc::new(RwLock::new(focus_data)),
334 room_data_provider,
335 settings,
336 decryption_retry_task,
337 }
338 }
339
340 pub(super) async fn init_focus(
347 &self,
348 room_event_cache: &RoomEventCache,
349 ) -> Result<bool, Error> {
350 let focus_guard = self.focus.read().await;
351
352 match &*focus_guard {
353 TimelineFocusData::Live => {
354 let events = room_event_cache.events().await;
356
357 let has_events = !events.is_empty();
358
359 self.replace_with_initial_remote_events(
360 events.into_iter(),
361 RemoteEventOrigin::Cache,
362 )
363 .await;
364
365 match room_event_cache.pagination().status().get() {
366 RoomPaginationStatus::Idle { hit_timeline_start } => {
367 if hit_timeline_start {
368 self.insert_timeline_start_if_missing().await;
371 }
372 }
373 RoomPaginationStatus::Paginating => {}
374 }
375
376 Ok(has_events)
377 }
378
379 TimelineFocusData::Event { event_id, paginator, num_context_events } => {
380 let start_from_result = paginator
382 .start_from(event_id, (*num_context_events).into())
383 .await
384 .map_err(PaginationError::Paginator)?;
385
386 drop(focus_guard);
387
388 let has_events = !start_from_result.events.is_empty();
389
390 self.replace_with_initial_remote_events(
391 start_from_result.events.into_iter(),
392 RemoteEventOrigin::Pagination,
393 )
394 .await;
395
396 Ok(has_events)
397 }
398
399 TimelineFocusData::Thread { loader, num_events } => {
400 let result = loader
401 .paginate_backwards((*num_events).into())
402 .await
403 .map_err(PaginationError::Paginator)?;
404
405 drop(focus_guard);
406
407 self.replace_with_initial_remote_events(
409 result.events.into_iter().rev(),
410 RemoteEventOrigin::Pagination,
411 )
412 .await;
413
414 Ok(true)
415 }
416
417 TimelineFocusData::PinnedEvents { loader } => {
418 let Some(loaded_events) =
419 loader.load_events().await.map_err(Error::PinnedEventsError)?
420 else {
421 return Ok(false);
423 };
424
425 drop(focus_guard);
426
427 let has_events = !loaded_events.is_empty();
428
429 self.replace_with_initial_remote_events(
430 loaded_events.into_iter(),
431 RemoteEventOrigin::Pagination,
432 )
433 .await;
434
435 Ok(has_events)
436 }
437 }
438 }
439
440 pub async fn handle_encryption_state_changes(&self) {
445 let mut room_info = self.room_data_provider.room_info();
446
447 let mark_encrypted = || async {
449 let mut state = self.state.write().await;
450 state.meta.is_room_encrypted = true;
451 state.mark_all_events_as_encrypted();
452 };
453
454 if room_info.get().encryption_state().is_encrypted() {
455 mark_encrypted().await;
458 return;
459 }
460
461 while let Some(info) = room_info.next().await {
462 if info.encryption_state().is_encrypted() {
463 mark_encrypted().await;
464 break;
467 }
468 }
469 }
470
471 pub(crate) async fn reload_pinned_events(
472 &self,
473 ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
474 let focus_guard = self.focus.read().await;
475
476 if let TimelineFocusData::PinnedEvents { loader } = &*focus_guard {
477 loader.load_events().await
478 } else {
479 Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
480 }
481 }
482
483 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
492 let state = self.state.read().await;
493
494 let (count, needs) = state
495 .meta
496 .subscriber_skip_count
497 .compute_next_when_paginating_backwards(num_events.into());
498
499 state.meta.subscriber_skip_count.update(count, &state.timeline_focus);
500
501 needs
502 }
503
504 pub(super) async fn focused_paginate_backwards(
509 &self,
510 num_events: u16,
511 ) -> Result<bool, PaginationError> {
512 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
513 TimelineFocusData::Live | TimelineFocusData::PinnedEvents { .. } => {
514 return Err(PaginationError::NotSupported)
515 }
516 TimelineFocusData::Event { paginator, .. } => paginator
517 .paginate_backward(num_events.into())
518 .await
519 .map_err(PaginationError::Paginator)?,
520 TimelineFocusData::Thread { loader, num_events } => loader
521 .paginate_backwards((*num_events).into())
522 .await
523 .map_err(PaginationError::Paginator)?,
524 };
525
526 self.handle_remote_events_with_diffs(
529 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
530 RemoteEventOrigin::Pagination,
531 )
532 .await;
533
534 Ok(hit_end_of_timeline)
535 }
536
537 pub(super) async fn focused_paginate_forwards(
542 &self,
543 num_events: u16,
544 ) -> Result<bool, PaginationError> {
545 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus.read().await {
546 TimelineFocusData::Live
547 | TimelineFocusData::PinnedEvents { .. }
548 | TimelineFocusData::Thread { .. } => return Err(PaginationError::NotSupported),
549
550 TimelineFocusData::Event { paginator, .. } => paginator
551 .paginate_forward(num_events.into())
552 .await
553 .map_err(PaginationError::Paginator)?,
554 };
555
556 self.handle_remote_events_with_diffs(
559 vec![VectorDiff::Append { values: events.into() }],
560 RemoteEventOrigin::Pagination,
561 )
562 .await;
563
564 Ok(hit_end_of_timeline)
565 }
566
567 pub(super) async fn is_live(&self) -> bool {
569 matches!(&*self.focus.read().await, TimelineFocusData::Live)
570 }
571
572 pub(super) fn with_settings(mut self, settings: TimelineSettings) -> Self {
573 self.settings = settings;
574 self
575 }
576
577 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
581 self.state.read().await.items.clone_items()
582 }
583
584 #[cfg(test)]
585 pub(super) async fn subscribe_raw(
586 &self,
587 ) -> (
588 Vector<Arc<TimelineItem>>,
589 impl Stream<Item = VectorDiff<Arc<TimelineItem>>> + SendOutsideWasm,
590 ) {
591 let state = self.state.read().await;
592
593 state.items.subscribe().into_values_and_stream()
594 }
595
596 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
597 let state = self.state.read().await;
598
599 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
600 }
601
602 pub(super) async fn subscribe_filter_map<U, F>(
603 &self,
604 f: F,
605 ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>)
606 where
607 U: Clone,
608 F: Fn(Arc<TimelineItem>) -> Option<U>,
609 {
610 self.state.read().await.items.subscribe().filter_map(f)
611 }
612
613 #[instrument(skip_all)]
617 pub(super) async fn toggle_reaction_local(
618 &self,
619 item_id: &TimelineEventItemId,
620 key: &str,
621 ) -> Result<bool, Error> {
622 let mut state = self.state.write().await;
623
624 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
625 warn!("Timeline item not found, can't add reaction");
626 return Err(Error::FailedToToggleReaction);
627 };
628
629 let user_id = self.room_data_provider.own_user_id();
630 let prev_status = item
631 .content()
632 .reactions()
633 .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
634
635 let Some(prev_status) = prev_status else {
636 match &item.kind {
637 EventTimelineItemKind::Local(local) => {
638 if let Some(send_handle) = &local.send_handle {
639 if send_handle
640 .react(key.to_owned())
641 .await
642 .map_err(|err| Error::SendQueueError(err.into()))?
643 .is_some()
644 {
645 trace!("adding a reaction to a local echo");
646 return Ok(true);
647 }
648
649 warn!("couldn't toggle reaction for local echo");
650 return Ok(false);
651 }
652
653 warn!("missing send handle for local echo; is this a test?");
654 return Ok(false);
655 }
656
657 EventTimelineItemKind::Remote(remote) => {
658 trace!("adding a reaction to a remote echo");
662 let annotation = Annotation::new(remote.event_id.to_owned(), key.to_owned());
663 self.room_data_provider
664 .send(ReactionEventContent::from(annotation).into())
665 .await?;
666 return Ok(true);
667 }
668 }
669 };
670
671 trace!("removing a previous reaction");
672 match prev_status {
673 ReactionStatus::LocalToLocal(send_reaction_handle) => {
674 if let Some(handle) = send_reaction_handle {
675 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
676 warn!("unexpectedly unable to abort sending of local reaction");
679 }
680 } else {
681 warn!("no send reaction handle (this should only happen in testing contexts)");
682 }
683 }
684
685 ReactionStatus::LocalToRemote(send_handle) => {
686 trace!("aborting send of the previous reaction that was a local echo");
689 if let Some(handle) = send_handle {
690 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
691 warn!("unexpectedly unable to abort sending of local reaction");
694 }
695 } else {
696 warn!("no send handle (this should only happen in testing contexts)");
697 }
698 }
699
700 ReactionStatus::RemoteToRemote(event_id) => {
701 let Some(annotated_event_id) =
703 item.as_remote().map(|event_item| event_item.event_id.clone())
704 else {
705 warn!("remote reaction to remote event, but the associated item isn't remote");
706 return Ok(false);
707 };
708
709 let mut reactions = item.content().reactions().cloned().unwrap_or_default();
710 let reaction_info = reactions.remove_reaction(user_id, key);
711
712 if reaction_info.is_some() {
713 let new_item = item.with_reactions(reactions);
714 state.items.replace(item_pos, new_item);
715 } else {
716 warn!("reaction is missing on the item, not removing it locally, but sending redaction.");
717 }
718
719 drop(state);
721
722 trace!("sending redact for a previous reaction");
723 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
724 if let Some(reaction_info) = reaction_info {
725 debug!("sending redact failed, adding the reaction back to the list");
726
727 let mut state = self.state.write().await;
728 if let Some((item_pos, item)) =
729 rfind_event_by_id(&state.items, &annotated_event_id)
730 {
731 let mut reactions =
733 item.content().reactions().cloned().unwrap_or_default();
734 reactions
735 .entry(key.to_owned())
736 .or_default()
737 .insert(user_id.to_owned(), reaction_info);
738 let new_item = item.with_reactions(reactions);
739 state.items.replace(item_pos, new_item);
740 } else {
741 warn!("couldn't find item to re-add reaction anymore; maybe it's been redacted?");
742 }
743 }
744
745 return Err(err);
746 }
747 }
748 }
749
750 Ok(false)
751 }
752
753 pub(super) async fn handle_remote_events_with_diffs(
755 &self,
756 diffs: Vec<VectorDiff<TimelineEvent>>,
757 origin: RemoteEventOrigin,
758 ) {
759 if diffs.is_empty() {
760 return;
761 }
762
763 let mut state = self.state.write().await;
764 state
765 .handle_remote_events_with_diffs(
766 diffs,
767 origin,
768 &self.room_data_provider,
769 &self.settings,
770 )
771 .await
772 }
773
774 pub(super) async fn handle_remote_aggregations(
776 &self,
777 diffs: Vec<VectorDiff<TimelineEvent>>,
778 origin: RemoteEventOrigin,
779 ) {
780 if diffs.is_empty() {
781 return;
782 }
783
784 let mut state = self.state.write().await;
785 state
786 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
787 .await
788 }
789
790 pub(super) async fn clear(&self) {
791 self.state.write().await.clear();
792 }
793
794 pub(super) async fn replace_with_initial_remote_events<Events>(
802 &self,
803 events: Events,
804 origin: RemoteEventOrigin,
805 ) where
806 Events: IntoIterator + ExactSizeIterator,
807 <Events as IntoIterator>::Item: Into<TimelineEvent>,
808 {
809 let mut state = self.state.write().await;
810
811 let track_read_markers = self.settings.track_read_receipts;
812 if track_read_markers {
813 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
814 state
815 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
816 .await;
817 }
818
819 if !state.items.is_empty() || events.len() > 0 {
825 state
826 .replace_with_remote_events(
827 events,
828 origin,
829 &self.room_data_provider,
830 &self.settings,
831 )
832 .await;
833 }
834
835 if track_read_markers {
836 if let Some(fully_read_event_id) =
837 self.room_data_provider.load_fully_read_marker().await
838 {
839 state.handle_fully_read_marker(fully_read_event_id);
840 }
841 }
842 }
843
844 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
845 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
846 }
847
848 pub(super) async fn handle_ephemeral_events(
849 &self,
850 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
851 ) {
852 let mut state = self.state.write().await;
853 state.handle_ephemeral_events(events, &self.room_data_provider).await;
854 }
855
856 #[instrument(skip_all)]
858 pub(super) async fn handle_local_event(
859 &self,
860 txn_id: OwnedTransactionId,
861 content: AnyMessageLikeEventContent,
862 send_handle: Option<SendHandle>,
863 ) {
864 let sender = self.room_data_provider.own_user_id().to_owned();
865 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
866
867 let date_divider_mode = self.settings.date_divider_mode.clone();
868
869 let mut state = self.state.write().await;
870 state
871 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
872 .await;
873 }
874
875 #[instrument(skip(self))]
880 pub(super) async fn update_event_send_state(
881 &self,
882 txn_id: &TransactionId,
883 send_state: EventSendState,
884 ) {
885 let mut state = self.state.write().await;
886 let mut txn = state.transaction();
887
888 let new_event_id: Option<&EventId> =
889 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
890
891 if rfind_event_item(&txn.items, |it| {
894 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
895 })
896 .is_some()
897 {
898 trace!("Remote echo received before send-event response");
900
901 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
902
903 if let Some((idx, _)) = local_echo {
907 warn!("Message echo got duplicated, removing the local one");
908 txn.items.remove(idx);
909
910 let mut adjuster =
912 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
913 adjuster.run(&mut txn.items, &mut txn.meta);
914 }
915
916 txn.commit();
917 return;
918 }
919
920 let result = rfind_event_item(&txn.items, |it| {
922 it.transaction_id() == Some(txn_id)
923 || new_event_id.is_some()
924 && it.event_id() == new_event_id
925 && it.as_local().is_some()
926 });
927
928 let Some((idx, item)) = result else {
929 if let Some(new_event_id) = new_event_id {
934 if txn.meta.aggregations.mark_aggregation_as_sent(
935 txn_id.to_owned(),
936 new_event_id.to_owned(),
937 &mut txn.items,
938 &txn.meta.room_version,
939 ) {
940 trace!("Aggregation marked as sent");
941 txn.commit();
942 return;
943 }
944
945 trace!("Sent aggregation was not found");
946 }
947
948 warn!("Timeline item not found, can't update send state");
949 return;
950 };
951
952 let Some(local_item) = item.as_local() else {
953 warn!("We looked for a local item, but it transitioned to remote.");
954 return;
955 };
956
957 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
960 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
961 }
962
963 if let Some(new_event_id) = new_event_id {
966 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
967 }
968
969 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
970 txn.items.replace(idx, new_item);
971
972 txn.commit();
973 }
974
975 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
976 let mut state = self.state.write().await;
977
978 if let Some((idx, _)) =
979 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
980 {
981 let mut txn = state.transaction();
982
983 txn.items.remove(idx);
984
985 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
988 adjuster.run(&mut txn.items, &mut txn.meta);
989
990 txn.meta.update_read_marker(&mut txn.items);
991
992 txn.commit();
993
994 debug!("discarded local echo");
995 return true;
996 }
997
998 let mut txn = state.transaction();
1001
1002 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1004 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1005 &mut txn.items,
1006 ) {
1007 Ok(val) => val,
1008 Err(err) => {
1009 warn!("error when discarding local echo for an aggregation: {err}");
1010 true
1012 }
1013 };
1014
1015 if found_aggregation {
1016 txn.commit();
1017 }
1018
1019 found_aggregation
1020 }
1021
1022 pub(super) async fn replace_local_echo(
1023 &self,
1024 txn_id: &TransactionId,
1025 content: AnyMessageLikeEventContent,
1026 ) -> bool {
1027 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1028 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1033 return false;
1034 };
1035
1036 let mut state = self.state.write().await;
1037 let mut txn = state.transaction();
1038
1039 let Some((idx, prev_item)) =
1040 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1041 else {
1042 debug!("Can't find local echo to replace");
1043 return false;
1044 };
1045
1046 let ti_kind = {
1049 let Some(prev_local_item) = prev_item.as_local() else {
1050 warn!("We looked for a local item, but it transitioned as remote??");
1051 return false;
1052 };
1053 prev_local_item.with_send_state(EventSendState::NotSentYet)
1054 };
1055
1056 let new_item = TimelineItem::new(
1058 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1059 content.msgtype,
1060 content.mentions,
1061 prev_item.content().reactions().cloned().unwrap_or_default(),
1062 prev_item.content().thread_root(),
1063 prev_item.content().in_reply_to(),
1064 prev_item.content().thread_summary(),
1065 )),
1066 prev_item.internal_id.to_owned(),
1067 );
1068
1069 txn.items.replace(idx, new_item);
1070
1071 txn.commit();
1075
1076 debug!("Replaced local echo");
1077 true
1078 }
1079
1080 async fn retry_event_decryption_inner(
1081 &self,
1082 decryptor: D,
1083 session_ids: Option<BTreeSet<String>>,
1084 ) {
1085 self.decryption_retry_task.decrypt(decryptor, session_ids, self.settings.clone()).await;
1086 }
1087
1088 pub(super) async fn set_sender_profiles_pending(&self) {
1089 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1090 }
1091
1092 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1093 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1094 }
1095
1096 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1097 self.state.write().await.items.for_each(|mut entry| {
1098 let Some(event_item) = entry.as_event() else { return };
1099 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1100 let new_item = entry.with_kind(TimelineItemKind::Event(
1101 event_item.with_sender_profile(profile_state.clone()),
1102 ));
1103 ObservableItemsEntry::replace(&mut entry, new_item);
1104 }
1105 });
1106 }
1107
1108 pub(super) async fn update_missing_sender_profiles(&self) {
1109 trace!("Updating missing sender profiles");
1110
1111 let mut state = self.state.write().await;
1112 let mut entries = state.items.entries();
1113 while let Some(mut entry) = entries.next() {
1114 let Some(event_item) = entry.as_event() else { continue };
1115 let event_id = event_item.event_id().map(debug);
1116 let transaction_id = event_item.transaction_id().map(debug);
1117
1118 if event_item.sender_profile().is_ready() {
1119 trace!(event_id, transaction_id, "Profile already set");
1120 continue;
1121 }
1122
1123 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1124 Some(profile) => {
1125 trace!(event_id, transaction_id, "Adding profile");
1126 let updated_item =
1127 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1128 let new_item = entry.with_kind(updated_item);
1129 ObservableItemsEntry::replace(&mut entry, new_item);
1130 }
1131 None => {
1132 if !event_item.sender_profile().is_unavailable() {
1133 trace!(event_id, transaction_id, "Marking profile unavailable");
1134 let updated_item =
1135 event_item.with_sender_profile(TimelineDetails::Unavailable);
1136 let new_item = entry.with_kind(updated_item);
1137 ObservableItemsEntry::replace(&mut entry, new_item);
1138 } else {
1139 debug!(event_id, transaction_id, "Profile already marked unavailable");
1140 }
1141 }
1142 }
1143 }
1144
1145 trace!("Done updating missing sender profiles");
1146 }
1147
1148 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1150 trace!("Forcing update of sender profiles: {sender_ids:?}");
1151
1152 let mut state = self.state.write().await;
1153 let mut entries = state.items.entries();
1154 while let Some(mut entry) = entries.next() {
1155 let Some(event_item) = entry.as_event() else { continue };
1156 if !sender_ids.contains(event_item.sender()) {
1157 continue;
1158 }
1159
1160 let event_id = event_item.event_id().map(debug);
1161 let transaction_id = event_item.transaction_id().map(debug);
1162
1163 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1164 Some(profile) => {
1165 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1166 {
1167 debug!(event_id, transaction_id, "Profile already up-to-date");
1168 } else {
1169 trace!(event_id, transaction_id, "Updating profile");
1170 let updated_item =
1171 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1172 let new_item = entry.with_kind(updated_item);
1173 ObservableItemsEntry::replace(&mut entry, new_item);
1174 }
1175 }
1176 None => {
1177 if !event_item.sender_profile().is_unavailable() {
1178 trace!(event_id, transaction_id, "Marking profile unavailable");
1179 let updated_item =
1180 event_item.with_sender_profile(TimelineDetails::Unavailable);
1181 let new_item = entry.with_kind(updated_item);
1182 ObservableItemsEntry::replace(&mut entry, new_item);
1183 } else {
1184 debug!(event_id, transaction_id, "Profile already marked unavailable");
1185 }
1186 }
1187 }
1188 }
1189
1190 trace!("Done forcing update of sender profiles");
1191 }
1192
1193 #[cfg(test)]
1194 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1195 let own_user_id = self.room_data_provider.own_user_id();
1196 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1197 }
1198
1199 pub(super) async fn latest_user_read_receipt(
1203 &self,
1204 user_id: &UserId,
1205 ) -> Option<(OwnedEventId, Receipt)> {
1206 self.state.read().await.latest_user_read_receipt(user_id, &self.room_data_provider).await
1207 }
1208
1209 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1212 &self,
1213 user_id: &UserId,
1214 ) -> Option<OwnedEventId> {
1215 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1216 }
1217
1218 pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
1220 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1221 }
1222
1223 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1225 match echo.content {
1226 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1227 let content = match serialized_event.deserialize() {
1228 Ok(d) => d,
1229 Err(err) => {
1230 warn!("error deserializing local echo: {err}");
1231 return;
1232 }
1233 };
1234
1235 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1236 .await;
1237
1238 if let Some(send_error) = send_error {
1239 self.update_event_send_state(
1240 &echo.transaction_id,
1241 EventSendState::SendingFailed {
1242 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1243 send_error,
1244 ))),
1245 is_recoverable: false,
1246 },
1247 )
1248 .await;
1249 }
1250 }
1251
1252 LocalEchoContent::React { key, send_handle, applies_to } => {
1253 self.handle_local_reaction(key, send_handle, applies_to).await;
1254 }
1255 }
1256 }
1257
1258 #[instrument(skip(self, send_handle))]
1260 async fn handle_local_reaction(
1261 &self,
1262 reaction_key: String,
1263 send_handle: SendReactionHandle,
1264 applies_to: OwnedTransactionId,
1265 ) {
1266 let mut state = self.state.write().await;
1267 let mut tr = state.transaction();
1268
1269 let target = TimelineEventItemId::TransactionId(applies_to);
1270
1271 let reaction_txn_id = send_handle.transaction_id().to_owned();
1272 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1273 let aggregation = Aggregation::new(
1274 TimelineEventItemId::TransactionId(reaction_txn_id),
1275 AggregationKind::Reaction {
1276 key: reaction_key.clone(),
1277 sender: self.room_data_provider.own_user_id().to_owned(),
1278 timestamp: MilliSecondsSinceUnixEpoch::now(),
1279 reaction_status,
1280 },
1281 );
1282
1283 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1284 find_item_and_apply_aggregation(
1285 &tr.meta.aggregations,
1286 &mut tr.items,
1287 &target,
1288 aggregation,
1289 &tr.meta.room_version,
1290 );
1291
1292 tr.commit();
1293 }
1294
1295 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1297 match update {
1298 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1299 self.handle_local_echo(echo).await;
1300 }
1301
1302 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1303 if !self.discard_local_echo(&transaction_id).await {
1304 warn!("couldn't find the local echo to discard");
1305 }
1306 }
1307
1308 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1309 let content = match new_content.deserialize() {
1310 Ok(d) => d,
1311 Err(err) => {
1312 warn!("error deserializing local echo (upon edit): {err}");
1313 return;
1314 }
1315 };
1316
1317 if !self.replace_local_echo(&transaction_id, content).await {
1318 warn!("couldn't find the local echo to replace");
1319 }
1320 }
1321
1322 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1323 self.update_event_send_state(
1324 &transaction_id,
1325 EventSendState::SendingFailed { error, is_recoverable },
1326 )
1327 .await;
1328 }
1329
1330 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1331 self.update_event_send_state(&transaction_id, EventSendState::NotSentYet).await;
1332 }
1333
1334 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1335 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1336 .await;
1337 }
1338
1339 RoomSendQueueUpdate::UploadedMedia { related_to, .. } => {
1340 info!(txn_id = %related_to, "some media for a media event has been uploaded");
1342 }
1343 }
1344 }
1345
1346 pub async fn insert_timeline_start_if_missing(&self) {
1349 let mut state = self.state.write().await;
1350 let mut txn = state.transaction();
1351 txn.items.push_timeline_start_if_missing(
1352 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1353 );
1354 txn.commit();
1355 }
1356
1357 pub(super) async fn make_replied_to(
1363 &self,
1364 event: TimelineEvent,
1365 ) -> Result<Option<EmbeddedEvent>, Error> {
1366 let state = self.state.read().await;
1367 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1368 }
1369}
1370
1371impl TimelineController {
1372 pub(super) fn room(&self) -> &Room {
1373 &self.room_data_provider
1374 }
1375
1376 #[instrument(skip(self))]
1379 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1380 let state_guard = self.state.write().await;
1381 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1382 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1383 let remote_item = item
1384 .as_remote()
1385 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1386 .clone();
1387
1388 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1389 debug!("Event is not a message");
1390 return Ok(());
1391 };
1392 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1393 debug!("Event is not a reply");
1394 return Ok(());
1395 };
1396 if let TimelineDetails::Pending = &in_reply_to.event {
1397 debug!("Replied-to event is already being fetched");
1398 return Ok(());
1399 }
1400 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1401 debug!("Replied-to event has already been fetched");
1402 return Ok(());
1403 }
1404
1405 let internal_id = item.internal_id.to_owned();
1406 let item = item.clone();
1407 let event = fetch_replied_to_event(
1408 state_guard,
1409 &self.state,
1410 index,
1411 &item,
1412 internal_id,
1413 &msglike,
1414 &in_reply_to.event_id,
1415 self.room(),
1416 )
1417 .await?;
1418
1419 let mut state = self.state.write().await;
1422 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1423 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1424
1425 let TimelineItemContent::MsgLike(MsgLikeContent {
1428 kind: MsgLikeKind::Message(message),
1429 reactions,
1430 thread_root,
1431 in_reply_to,
1432 thread_summary,
1433 }) = item.content().clone()
1434 else {
1435 info!("Event is no longer a message (redacted?)");
1436 return Ok(());
1437 };
1438 let Some(in_reply_to) = in_reply_to else {
1439 warn!("Event no longer has a reply (bug?)");
1440 return Ok(());
1441 };
1442
1443 trace!("Updating in-reply-to details");
1446 let internal_id = item.internal_id.to_owned();
1447 let mut item = item.clone();
1448 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1449 kind: MsgLikeKind::Message(message),
1450 reactions,
1451 thread_root,
1452 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1453 thread_summary,
1454 }));
1455 state.items.replace(index, TimelineItem::new(item, internal_id));
1456
1457 Ok(())
1458 }
1459
1460 pub(super) async fn should_send_receipt(
1464 &self,
1465 receipt_type: &SendReceiptType,
1466 thread: &ReceiptThread,
1467 event_id: &EventId,
1468 ) -> bool {
1469 if *thread != ReceiptThread::Unthreaded {
1471 return true;
1472 }
1473
1474 let own_user_id = self.room().own_user_id();
1475 let state = self.state.read().await;
1476 let room = self.room();
1477
1478 match receipt_type {
1479 SendReceiptType::Read => {
1480 if let Some((old_pub_read, _)) = state
1481 .meta
1482 .user_receipt(
1483 own_user_id,
1484 ReceiptType::Read,
1485 room,
1486 state.items.all_remote_events(),
1487 )
1488 .await
1489 {
1490 trace!(%old_pub_read, "found a previous public receipt");
1491 if let Some(relative_pos) = state.meta.compare_events_positions(
1492 &old_pub_read,
1493 event_id,
1494 state.items.all_remote_events(),
1495 ) {
1496 trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1497 return relative_pos == RelativePosition::After;
1498 }
1499 }
1500 }
1501 SendReceiptType::ReadPrivate => {
1504 if let Some((old_priv_read, _)) =
1505 state.latest_user_read_receipt(own_user_id, room).await
1506 {
1507 trace!(%old_priv_read, "found a previous private receipt");
1508 if let Some(relative_pos) = state.meta.compare_events_positions(
1509 &old_priv_read,
1510 event_id,
1511 state.items.all_remote_events(),
1512 ) {
1513 trace!("event referred to new receipt is {relative_pos:?} the previous receipt");
1514 return relative_pos == RelativePosition::After;
1515 }
1516 }
1517 }
1518 SendReceiptType::FullyRead => {
1519 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1520 {
1521 if let Some(relative_pos) = state.meta.compare_events_positions(
1522 &prev_event_id,
1523 event_id,
1524 state.items.all_remote_events(),
1525 ) {
1526 return relative_pos == RelativePosition::After;
1527 }
1528 }
1529 }
1530 _ => {}
1531 }
1532
1533 true
1535 }
1536
1537 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1540 let state = self.state.read().await;
1541 state.items.all_remote_events().last().map(|event_meta| &event_meta.event_id).cloned()
1542 }
1543
1544 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1545 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1546 self.retry_event_decryption_inner(self.room().clone(), session_ids).await
1547 }
1548}
1549
1550#[cfg(test)]
1551impl<P: RoomDataProvider> TimelineController<P, (OlmMachine, OwnedRoomId)> {
1552 pub(super) async fn retry_event_decryption_test(
1553 &self,
1554 room_id: &RoomId,
1555 olm_machine: OlmMachine,
1556 session_ids: Option<BTreeSet<String>>,
1557 ) {
1558 self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
1559 }
1560}
1561
1562#[allow(clippy::too_many_arguments)]
1563async fn fetch_replied_to_event(
1564 mut state_guard: RwLockWriteGuard<'_, TimelineState>,
1565 state_lock: &RwLock<TimelineState>,
1566 index: usize,
1567 item: &EventTimelineItem,
1568 internal_id: TimelineUniqueId,
1569 msglike: &MsgLikeContent,
1570 in_reply_to: &EventId,
1571 room: &Room,
1572) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1573 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1574 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1575 trace!("Found replied-to event locally");
1576 return Ok(details);
1577 };
1578
1579 trace!("Setting in-reply-to details to pending");
1582 let in_reply_to_details =
1583 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1584
1585 let event_item = item
1586 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1587
1588 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1589 state_guard.items.replace(index, new_timeline_item);
1590
1591 drop(state_guard);
1593
1594 trace!("Fetching replied-to event");
1595 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1596 Ok(timeline_event) => {
1597 let state = state_lock.read().await;
1598
1599 let replied_to_item =
1600 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1601
1602 if let Some(item) = replied_to_item {
1603 TimelineDetails::Ready(Box::new(item))
1604 } else {
1605 return Err(Error::UnsupportedEvent);
1607 }
1608 }
1609
1610 Err(e) => TimelineDetails::Error(Arc::new(e)),
1611 };
1612
1613 Ok(res)
1614}