1use std::{collections::BTreeSet, fmt, sync::Arc};
16
17use as_variant::as_variant;
18use eyeball_im::{VectorDiff, VectorSubscriberStream};
19use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
20use futures_core::Stream;
21use imbl::Vector;
22#[cfg(test)]
23use matrix_sdk::Result;
24use matrix_sdk::{
25 deserialized_responses::TimelineEvent,
26 event_cache::{DecryptionRetryRequest, RoomEventCache, RoomPaginationStatus},
27 paginators::{PaginationResult, PaginationToken, Paginator},
28 send_queue::{
29 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
30 },
31};
32#[cfg(test)]
33use ruma::events::receipt::ReceiptEventContent;
34use ruma::{
35 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
36 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
37 events::{
38 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
39 AnySyncTimelineEvent, MessageLikeEventType,
40 poll::unstable_start::UnstablePollStartEventContent,
41 reaction::ReactionEventContent,
42 receipt::{Receipt, ReceiptThread, ReceiptType},
43 relation::Annotation,
44 room::message::{MessageType, Relation},
45 },
46 room_version_rules::RoomVersionRules,
47 serde::Raw,
48};
49use tokio::sync::{OnceCell, RwLock, RwLockWriteGuard};
50use tracing::{debug, error, field::debug, info, instrument, trace, warn};
51
52pub(super) use self::{
53 metadata::{RelativePosition, TimelineMetadata},
54 observable_items::{
55 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
56 ObservableItemsTransactionEntry,
57 },
58 state::TimelineState,
59 state_transaction::TimelineStateTransaction,
60};
61use super::{
62 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
63 MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
64 TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
65 TimelineReadReceiptTracking, VirtualTimelineItem,
66 algorithms::{rfind_event_by_id, rfind_event_item},
67 event_item::{ReactionStatus, RemoteEventOrigin},
68 item::TimelineUniqueId,
69 subscriber::TimelineSubscriber,
70 traits::RoomDataProvider,
71};
72use crate::{
73 timeline::{
74 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn,
75 algorithms::rfind_event_by_item_id,
76 controller::decryption_retry_task::compute_redecryption_candidates,
77 date_dividers::DateDividerAdjuster,
78 event_item::TimelineItemHandle,
79 pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError},
80 },
81 unable_to_decrypt_hook::UtdHookManager,
82};
83
84pub(in crate::timeline) mod aggregations;
85mod decryption_retry_task;
86mod metadata;
87mod observable_items;
88mod read_receipts;
89mod state;
90mod state_transaction;
91
92pub(super) use aggregations::*;
93pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
94use matrix_sdk::paginators::{PaginatorError, thread::ThreadedEventsLoader};
95use matrix_sdk_common::serde_helpers::extract_thread_root;
96
97#[derive(Debug)]
103pub(in crate::timeline) enum TimelineFocusKind<P: RoomDataProvider> {
104 Live {
106 hide_threaded_events: bool,
108 },
109
110 Event {
113 paginator: OnceCell<AnyPaginator<P>>,
115 },
116
117 Thread {
119 root_event_id: OwnedEventId,
121 },
122
123 PinnedEvents {
124 loader: PinnedEventsLoader,
125 },
126}
127
128#[derive(Debug)]
129pub(in crate::timeline) enum AnyPaginator<P: RoomDataProvider> {
130 Unthreaded {
131 paginator: Paginator<P>,
133 hide_threaded_events: bool,
135 },
136 Threaded(ThreadedEventsLoader<P>),
137}
138
139impl<P: RoomDataProvider> AnyPaginator<P> {
140 pub async fn paginate_backwards(
149 &self,
150 num_events: u16,
151 ) -> Result<PaginationResult, PaginatorError> {
152 match self {
153 Self::Unthreaded { paginator, .. } => {
154 paginator.paginate_backward(num_events.into()).await
155 }
156 Self::Threaded(threaded_paginator) => {
157 threaded_paginator.paginate_backwards(num_events.into()).await
158 }
159 }
160 }
161
162 pub async fn paginate_forwards(
170 &self,
171 num_events: u16,
172 ) -> Result<PaginationResult, PaginatorError> {
173 match self {
174 Self::Unthreaded { paginator, .. } => {
175 paginator.paginate_forward(num_events.into()).await
176 }
177 Self::Threaded(threaded_paginator) => {
178 threaded_paginator.paginate_forwards(num_events.into()).await
179 }
180 }
181 }
182
183 pub fn hide_threaded_events(&self) -> bool {
185 match self {
186 Self::Unthreaded { hide_threaded_events, .. } => *hide_threaded_events,
187 Self::Threaded(_) => false,
188 }
189 }
190
191 pub fn thread_root(&self) -> Option<&EventId> {
194 match self {
195 Self::Unthreaded { .. } => None,
196 Self::Threaded(thread_events_loader) => {
197 Some(thread_events_loader.thread_root_event_id())
198 }
199 }
200 }
201}
202
203impl<P: RoomDataProvider> TimelineFocusKind<P> {
204 pub(super) fn receipt_thread(&self) -> ReceiptThread {
211 if let Some(thread_root) = self.thread_root() {
212 ReceiptThread::Thread(thread_root.to_owned())
213 } else if self.hide_threaded_events() {
214 ReceiptThread::Main
215 } else {
216 ReceiptThread::Unthreaded
217 }
218 }
219
220 fn hide_threaded_events(&self) -> bool {
222 match self {
223 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
224 TimelineFocusKind::Event { paginator } => {
225 paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
226 }
227 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents { .. } => false,
228 }
229 }
230
231 fn is_thread(&self) -> bool {
234 self.thread_root().is_some()
235 }
236
237 fn thread_root(&self) -> Option<&EventId> {
239 match self {
240 TimelineFocusKind::Event { paginator, .. } => {
241 paginator.get().and_then(|paginator| paginator.thread_root())
242 }
243 TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents { .. } => None,
244 TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
245 }
246 }
247}
248
249#[derive(Clone, Debug)]
250pub(super) struct TimelineController<P: RoomDataProvider = Room> {
251 state: Arc<RwLock<TimelineState<P>>>,
253
254 focus: Arc<TimelineFocusKind<P>>,
256
257 pub(crate) room_data_provider: P,
262
263 pub(super) settings: TimelineSettings,
265}
266
267#[derive(Clone)]
268pub(super) struct TimelineSettings {
269 pub(super) track_read_receipts: TimelineReadReceiptTracking,
272
273 pub(super) event_filter: Arc<TimelineEventFilterFn>,
276
277 pub(super) add_failed_to_parse: bool,
279
280 pub(super) date_divider_mode: DateDividerMode,
282}
283
284#[cfg(not(tarpaulin_include))]
285impl fmt::Debug for TimelineSettings {
286 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
287 f.debug_struct("TimelineSettings")
288 .field("track_read_receipts", &self.track_read_receipts)
289 .field("add_failed_to_parse", &self.add_failed_to_parse)
290 .finish_non_exhaustive()
291 }
292}
293
294impl Default for TimelineSettings {
295 fn default() -> Self {
296 Self {
297 track_read_receipts: TimelineReadReceiptTracking::Disabled,
298 event_filter: Arc::new(default_event_filter),
299 add_failed_to_parse: true,
300 date_divider_mode: DateDividerMode::Daily,
301 }
302 }
303}
304
305pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
315 match event {
316 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
317 if ev.redacts(&rules.redaction).is_some() {
318 false
321 } else {
322 ev.event_type() != MessageLikeEventType::Reaction
325 }
326 }
327
328 AnySyncTimelineEvent::MessageLike(msg) => {
329 match msg.original_content() {
330 None => {
331 msg.event_type() != MessageLikeEventType::Reaction
334 }
335
336 Some(original_content) => {
337 match original_content {
338 AnyMessageLikeEventContent::RoomMessage(content) => {
339 if content
340 .relates_to
341 .as_ref()
342 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
343 {
344 return false;
346 }
347
348 match content.msgtype {
349 MessageType::Audio(_)
350 | MessageType::Emote(_)
351 | MessageType::File(_)
352 | MessageType::Image(_)
353 | MessageType::Location(_)
354 | MessageType::Notice(_)
355 | MessageType::ServerNotice(_)
356 | MessageType::Text(_)
357 | MessageType::Video(_)
358 | MessageType::VerificationRequest(_) => true,
359 #[cfg(feature = "unstable-msc4274")]
360 MessageType::Gallery(_) => true,
361 _ => false,
362 }
363 }
364
365 AnyMessageLikeEventContent::Sticker(_)
366 | AnyMessageLikeEventContent::UnstablePollStart(
367 UnstablePollStartEventContent::New(_),
368 )
369 | AnyMessageLikeEventContent::CallInvite(_)
370 | AnyMessageLikeEventContent::RtcNotification(_)
371 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
372
373 _ => false,
374 }
375 }
376 }
377 }
378
379 AnySyncTimelineEvent::State(_) => {
380 true
382 }
383 }
384}
385
386impl<P: RoomDataProvider> TimelineController<P> {
387 pub(super) fn new(
388 room_data_provider: P,
389 focus: TimelineFocus,
390 internal_id_prefix: Option<String>,
391 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
392 is_room_encrypted: bool,
393 settings: TimelineSettings,
394 ) -> Self {
395 let focus = match focus {
396 TimelineFocus::Live { hide_threaded_events } => {
397 TimelineFocusKind::Live { hide_threaded_events }
398 }
399
400 TimelineFocus::Event { .. } => TimelineFocusKind::Event { paginator: OnceCell::new() },
401
402 TimelineFocus::Thread { root_event_id, .. } => {
403 TimelineFocusKind::Thread { root_event_id }
404 }
405
406 TimelineFocus::PinnedEvents { max_events_to_load, max_concurrent_requests } => {
407 TimelineFocusKind::PinnedEvents {
408 loader: PinnedEventsLoader::new(
409 Arc::new(room_data_provider.clone()),
410 max_events_to_load as usize,
411 max_concurrent_requests as usize,
412 ),
413 }
414 }
415 };
416
417 let focus = Arc::new(focus);
418 let state = Arc::new(RwLock::new(TimelineState::new(
419 focus.clone(),
420 room_data_provider.own_user_id().to_owned(),
421 room_data_provider.room_version_rules(),
422 internal_id_prefix,
423 unable_to_decrypt_hook,
424 is_room_encrypted,
425 )));
426
427 Self { state, focus, room_data_provider, settings }
428 }
429
430 pub(super) async fn init_focus(
437 &self,
438 focus: &TimelineFocus,
439 room_event_cache: &RoomEventCache,
440 ) -> Result<bool, Error> {
441 match focus {
442 TimelineFocus::Live { .. } => {
443 let events = room_event_cache.events().await?;
445
446 let has_events = !events.is_empty();
447
448 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
449
450 match room_event_cache.pagination().status().get() {
451 RoomPaginationStatus::Idle { hit_timeline_start } => {
452 if hit_timeline_start {
453 self.insert_timeline_start_if_missing().await;
456 }
457 }
458 RoomPaginationStatus::Paginating => {}
459 }
460
461 Ok(has_events)
462 }
463
464 TimelineFocus::Event { target: event_id, num_context_events, hide_threaded_events } => {
465 let TimelineFocusKind::Event { paginator, .. } = &*self.focus else {
466 unreachable!();
468 };
469
470 let event_paginator = Paginator::new(self.room_data_provider.clone());
471
472 let start_from_result = event_paginator
475 .start_from(event_id, (*num_context_events).into())
476 .await
477 .map_err(PaginationError::Paginator)?;
478
479 let thread_root_event_id = start_from_result
481 .events
482 .iter()
483 .find(
484 |event| {
485 if let Some(id) = event.event_id() { id == *event_id } else { false }
486 },
487 )
488 .and_then(|event| extract_thread_root(event.raw()));
489
490 let _ = paginator.set(match thread_root_event_id {
491 Some(root_id) => {
492 let mut tokens = event_paginator.tokens();
493
494 let includes_root_event = start_from_result.events.iter().any(|event| {
498 if let Some(id) = event.event_id() { id == root_id } else { false }
499 });
500
501 if includes_root_event {
502 tokens.previous = PaginationToken::HitEnd;
505 }
506
507 AnyPaginator::Threaded(ThreadedEventsLoader::new(
508 self.room_data_provider.clone(),
509 root_id,
510 tokens,
511 ))
512 }
513
514 None => AnyPaginator::Unthreaded {
515 paginator: event_paginator,
516 hide_threaded_events: *hide_threaded_events,
517 },
518 });
519
520 let has_events = !start_from_result.events.is_empty();
521 let events = start_from_result.events;
522
523 match paginator.get().expect("Paginator was not instantiated") {
524 AnyPaginator::Unthreaded { .. } => {
525 self.replace_with_initial_remote_events(
526 events,
527 RemoteEventOrigin::Pagination,
528 )
529 .await;
530 }
531
532 AnyPaginator::Threaded(threaded_events_loader) => {
533 let thread_root = threaded_events_loader.thread_root_event_id();
536 let events_in_thread = events.into_iter().filter(|event| {
537 extract_thread_root(event.raw())
538 .is_some_and(|event_thread_root| event_thread_root == thread_root)
539 || event.event_id().as_deref() == Some(thread_root)
540 });
541
542 self.replace_with_initial_remote_events(
543 events_in_thread,
544 RemoteEventOrigin::Pagination,
545 )
546 .await;
547 }
548 }
549
550 Ok(has_events)
551 }
552
553 TimelineFocus::Thread { root_event_id, .. } => {
554 let (events, _) =
555 room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
556 let has_events = !events.is_empty();
557
558 let mut related_events = Vector::new();
562 for event_id in events.iter().filter_map(|event| event.event_id()) {
563 if let Some((_original, related)) =
564 room_event_cache.find_event_with_relations(&event_id, None).await?
565 {
566 related_events.extend(related);
567 }
568 }
569
570 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
571
572 if !related_events.is_empty() {
574 self.handle_remote_aggregations(
575 vec![VectorDiff::Append { values: related_events }],
576 RemoteEventOrigin::Cache,
577 )
578 .await;
579 }
580
581 Ok(has_events)
582 }
583
584 TimelineFocus::PinnedEvents { .. } => {
585 let TimelineFocusKind::PinnedEvents { loader } = &*self.focus else {
586 unreachable!();
588 };
589
590 let Some(loaded_events) =
591 loader.load_events().await.map_err(Error::PinnedEventsError)?
592 else {
593 return Ok(false);
595 };
596
597 let has_events = !loaded_events.is_empty();
598
599 self.replace_with_initial_remote_events(
600 loaded_events,
601 RemoteEventOrigin::Pagination,
602 )
603 .await;
604
605 Ok(has_events)
606 }
607 }
608 }
609
610 pub async fn handle_encryption_state_changes(&self) {
615 let mut room_info = self.room_data_provider.room_info();
616
617 let mark_encrypted = || async {
619 let mut state = self.state.write().await;
620 state.meta.is_room_encrypted = true;
621 state.mark_all_events_as_encrypted();
622 };
623
624 if room_info.get().encryption_state().is_encrypted() {
625 mark_encrypted().await;
628 return;
629 }
630
631 while let Some(info) = room_info.next().await {
632 if info.encryption_state().is_encrypted() {
633 mark_encrypted().await;
634 break;
637 }
638 }
639 }
640
641 pub(crate) async fn reload_pinned_events(
642 &self,
643 ) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
644 if let TimelineFocusKind::PinnedEvents { loader } = &*self.focus {
645 loader.load_events().await
646 } else {
647 Err(PinnedEventsLoaderError::TimelineFocusNotPinnedEvents)
648 }
649 }
650
651 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
660 let state = self.state.read().await;
661
662 let (count, needs) = state
663 .meta
664 .subscriber_skip_count
665 .compute_next_when_paginating_backwards(num_events.into());
666
667 let is_live_timeline = true;
669 state.meta.subscriber_skip_count.update(count, is_live_timeline);
670
671 needs
672 }
673
674 pub(super) async fn focused_paginate_backwards(
679 &self,
680 num_events: u16,
681 ) -> Result<bool, PaginationError> {
682 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
683 TimelineFocusKind::Live { .. }
684 | TimelineFocusKind::PinnedEvents { .. }
685 | TimelineFocusKind::Thread { .. } => {
686 return Err(PaginationError::NotSupported);
687 }
688 TimelineFocusKind::Event { paginator, .. } => paginator
689 .get()
690 .expect("Paginator was not instantiated")
691 .paginate_backwards(num_events)
692 .await
693 .map_err(PaginationError::Paginator)?,
694 };
695
696 self.handle_remote_events_with_diffs(
699 events.into_iter().map(|event| VectorDiff::PushFront { value: event }).collect(),
700 RemoteEventOrigin::Pagination,
701 )
702 .await;
703
704 Ok(hit_end_of_timeline)
705 }
706
707 pub(super) async fn focused_paginate_forwards(
712 &self,
713 num_events: u16,
714 ) -> Result<bool, PaginationError> {
715 let PaginationResult { events, hit_end_of_timeline } = match &*self.focus {
716 TimelineFocusKind::Live { .. }
717 | TimelineFocusKind::PinnedEvents { .. }
718 | TimelineFocusKind::Thread { .. } => return Err(PaginationError::NotSupported),
719
720 TimelineFocusKind::Event { paginator, .. } => paginator
721 .get()
722 .expect("Paginator was not instantiated")
723 .paginate_forwards(num_events)
724 .await
725 .map_err(PaginationError::Paginator)?,
726 };
727
728 self.handle_remote_events_with_diffs(
731 vec![VectorDiff::Append { values: events.into() }],
732 RemoteEventOrigin::Pagination,
733 )
734 .await;
735
736 Ok(hit_end_of_timeline)
737 }
738
739 pub(super) fn is_live(&self) -> bool {
741 matches!(&*self.focus, TimelineFocusKind::Live { .. })
742 }
743
744 pub(super) fn is_threaded(&self) -> bool {
746 self.focus.is_thread()
747 }
748
749 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
752 self.focus.thread_root().map(ToOwned::to_owned)
753 }
754
755 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
759 self.state.read().await.items.clone_items()
760 }
761
762 #[cfg(test)]
763 pub(super) async fn subscribe_raw(
764 &self,
765 ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
766 self.state.read().await.items.subscribe().into_values_and_stream()
767 }
768
769 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
770 let state = self.state.read().await;
771
772 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
773 }
774
775 pub(super) async fn subscribe_filter_map<U, F>(
776 &self,
777 f: F,
778 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
779 where
780 U: Clone,
781 F: Fn(Arc<TimelineItem>) -> Option<U>,
782 {
783 self.state.read().await.items.subscribe().filter_map(f)
784 }
785
786 #[instrument(skip_all)]
790 pub(super) async fn toggle_reaction_local(
791 &self,
792 item_id: &TimelineEventItemId,
793 key: &str,
794 ) -> Result<bool, Error> {
795 let mut state = self.state.write().await;
796
797 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
798 warn!("Timeline item not found, can't add reaction");
799 return Err(Error::FailedToToggleReaction);
800 };
801
802 let user_id = self.room_data_provider.own_user_id();
803 let prev_status = item
804 .content()
805 .reactions()
806 .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
807
808 let Some(prev_status) = prev_status else {
809 match item.handle() {
811 TimelineItemHandle::Local(send_handle) => {
812 if send_handle
813 .react(key.to_owned())
814 .await
815 .map_err(|err| Error::SendQueueError(err.into()))?
816 .is_some()
817 {
818 trace!("adding a reaction to a local echo");
819 return Ok(true);
820 }
821
822 warn!("couldn't toggle reaction for local echo");
823 return Ok(false);
824 }
825
826 TimelineItemHandle::Remote(event_id) => {
827 trace!("adding a reaction to a remote echo");
831 let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
832 self.room_data_provider
833 .send(ReactionEventContent::from(annotation).into())
834 .await?;
835 return Ok(true);
836 }
837 }
838 };
839
840 trace!("removing a previous reaction");
841 match prev_status {
842 ReactionStatus::LocalToLocal(send_reaction_handle) => {
843 if let Some(handle) = send_reaction_handle {
844 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
845 warn!("unexpectedly unable to abort sending of local reaction");
848 }
849 } else {
850 warn!("no send reaction handle (this should only happen in testing contexts)");
851 }
852 }
853
854 ReactionStatus::LocalToRemote(send_handle) => {
855 trace!("aborting send of the previous reaction that was a local echo");
858 if let Some(handle) = send_handle {
859 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
860 warn!("unexpectedly unable to abort sending of local reaction");
863 }
864 } else {
865 warn!("no send handle (this should only happen in testing contexts)");
866 }
867 }
868
869 ReactionStatus::RemoteToRemote(event_id) => {
870 let Some(annotated_event_id) =
872 item.as_remote().map(|event_item| event_item.event_id.clone())
873 else {
874 warn!("remote reaction to remote event, but the associated item isn't remote");
875 return Ok(false);
876 };
877
878 let mut reactions = item.content().reactions().cloned().unwrap_or_default();
879 let reaction_info = reactions.remove_reaction(user_id, key);
880
881 if reaction_info.is_some() {
882 let new_item = item.with_reactions(reactions);
883 state.items.replace(item_pos, new_item);
884 } else {
885 warn!(
886 "reaction is missing on the item, not removing it locally, \
887 but sending redaction."
888 );
889 }
890
891 drop(state);
893
894 trace!("sending redact for a previous reaction");
895 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
896 if let Some(reaction_info) = reaction_info {
897 debug!("sending redact failed, adding the reaction back to the list");
898
899 let mut state = self.state.write().await;
900 if let Some((item_pos, item)) =
901 rfind_event_by_id(&state.items, &annotated_event_id)
902 {
903 let mut reactions =
905 item.content().reactions().cloned().unwrap_or_default();
906 reactions
907 .entry(key.to_owned())
908 .or_default()
909 .insert(user_id.to_owned(), reaction_info);
910 let new_item = item.with_reactions(reactions);
911 state.items.replace(item_pos, new_item);
912 } else {
913 warn!(
914 "couldn't find item to re-add reaction anymore; \
915 maybe it's been redacted?"
916 );
917 }
918 }
919
920 return Err(err);
921 }
922 }
923 }
924
925 Ok(false)
926 }
927
928 pub(super) async fn handle_remote_events_with_diffs(
930 &self,
931 diffs: Vec<VectorDiff<TimelineEvent>>,
932 origin: RemoteEventOrigin,
933 ) {
934 if diffs.is_empty() {
935 return;
936 }
937
938 let mut state = self.state.write().await;
939 state
940 .handle_remote_events_with_diffs(
941 diffs,
942 origin,
943 &self.room_data_provider,
944 &self.settings,
945 )
946 .await
947 }
948
949 pub(super) async fn handle_remote_aggregations(
951 &self,
952 diffs: Vec<VectorDiff<TimelineEvent>>,
953 origin: RemoteEventOrigin,
954 ) {
955 if diffs.is_empty() {
956 return;
957 }
958
959 let mut state = self.state.write().await;
960 state
961 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
962 .await
963 }
964
965 pub(super) async fn clear(&self) {
966 self.state.write().await.clear();
967 }
968
969 pub(super) async fn replace_with_initial_remote_events<Events>(
977 &self,
978 events: Events,
979 origin: RemoteEventOrigin,
980 ) where
981 Events: IntoIterator,
982 <Events as IntoIterator>::Item: Into<TimelineEvent>,
983 {
984 let mut state = self.state.write().await;
985
986 let track_read_markers = &self.settings.track_read_receipts;
987 if track_read_markers.is_enabled() {
988 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
989 state
990 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
991 .await;
992 }
993
994 let mut events = events.into_iter().peekable();
1000 if !state.items.is_empty() || events.peek().is_some() {
1001 state
1002 .replace_with_remote_events(
1003 events,
1004 origin,
1005 &self.room_data_provider,
1006 &self.settings,
1007 )
1008 .await;
1009 }
1010
1011 if track_read_markers.is_enabled() {
1012 if let Some(fully_read_event_id) =
1013 self.room_data_provider.load_fully_read_marker().await
1014 {
1015 state.handle_fully_read_marker(fully_read_event_id);
1016 } else if let Some(latest_receipt_event_id) = state
1017 .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
1018 {
1019 debug!("no `m.fully_read` marker found, falling back to read receipt");
1021 state.handle_fully_read_marker(latest_receipt_event_id);
1022 }
1023 }
1024 }
1025
1026 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
1027 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
1028 }
1029
1030 pub(super) async fn handle_ephemeral_events(
1031 &self,
1032 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
1033 ) {
1034 let mut state = self.state.write().await;
1035 state.handle_ephemeral_events(events, &self.room_data_provider).await;
1036 }
1037
1038 #[instrument(skip_all)]
1040 pub(super) async fn handle_local_event(
1041 &self,
1042 txn_id: OwnedTransactionId,
1043 content: AnyMessageLikeEventContent,
1044 send_handle: Option<SendHandle>,
1045 ) {
1046 let sender = self.room_data_provider.own_user_id().to_owned();
1047 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
1048
1049 let date_divider_mode = self.settings.date_divider_mode.clone();
1050
1051 let mut state = self.state.write().await;
1052 state
1053 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
1054 .await;
1055 }
1056
1057 #[instrument(skip(self))]
1062 pub(super) async fn update_event_send_state(
1063 &self,
1064 txn_id: &TransactionId,
1065 send_state: EventSendState,
1066 ) {
1067 let mut state = self.state.write().await;
1068 let mut txn = state.transaction();
1069
1070 let new_event_id: Option<&EventId> =
1071 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
1072
1073 if rfind_event_item(&txn.items, |it| {
1076 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
1077 })
1078 .is_some()
1079 {
1080 trace!("Remote echo received before send-event response");
1082
1083 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
1084
1085 if let Some((idx, _)) = local_echo {
1089 warn!("Message echo got duplicated, removing the local one");
1090 txn.items.remove(idx);
1091
1092 let mut adjuster =
1094 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1095 adjuster.run(&mut txn.items, &mut txn.meta);
1096 }
1097
1098 txn.commit();
1099 return;
1100 }
1101
1102 let result = rfind_event_item(&txn.items, |it| {
1104 it.transaction_id() == Some(txn_id)
1105 || new_event_id.is_some()
1106 && it.event_id() == new_event_id
1107 && it.as_local().is_some()
1108 });
1109
1110 let Some((idx, item)) = result else {
1111 if let Some(new_event_id) = new_event_id {
1116 if txn.meta.aggregations.mark_aggregation_as_sent(
1117 txn_id.to_owned(),
1118 new_event_id.to_owned(),
1119 &mut txn.items,
1120 &txn.meta.room_version_rules,
1121 ) {
1122 trace!("Aggregation marked as sent");
1123 txn.commit();
1124 return;
1125 }
1126
1127 trace!("Sent aggregation was not found");
1128 }
1129
1130 warn!("Timeline item not found, can't update send state");
1131 return;
1132 };
1133
1134 let Some(local_item) = item.as_local() else {
1135 warn!("We looked for a local item, but it transitioned to remote.");
1136 return;
1137 };
1138
1139 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
1142 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
1143 }
1144
1145 if let Some(new_event_id) = new_event_id {
1148 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
1149 }
1150
1151 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
1152 txn.items.replace(idx, new_item);
1153
1154 txn.commit();
1155 }
1156
1157 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
1158 let mut state = self.state.write().await;
1159
1160 if let Some((idx, _)) =
1161 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
1162 {
1163 let mut txn = state.transaction();
1164
1165 txn.items.remove(idx);
1166
1167 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
1170 adjuster.run(&mut txn.items, &mut txn.meta);
1171
1172 txn.meta.update_read_marker(&mut txn.items);
1173
1174 txn.commit();
1175
1176 debug!("discarded local echo");
1177 return true;
1178 }
1179
1180 let mut txn = state.transaction();
1183
1184 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
1186 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
1187 &mut txn.items,
1188 ) {
1189 Ok(val) => val,
1190 Err(err) => {
1191 warn!("error when discarding local echo for an aggregation: {err}");
1192 true
1194 }
1195 };
1196
1197 if found_aggregation {
1198 txn.commit();
1199 }
1200
1201 found_aggregation
1202 }
1203
1204 pub(super) async fn replace_local_echo(
1205 &self,
1206 txn_id: &TransactionId,
1207 content: AnyMessageLikeEventContent,
1208 ) -> bool {
1209 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
1210 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
1215 return false;
1216 };
1217
1218 let mut state = self.state.write().await;
1219 let mut txn = state.transaction();
1220
1221 let Some((idx, prev_item)) =
1222 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
1223 else {
1224 debug!("Can't find local echo to replace");
1225 return false;
1226 };
1227
1228 let ti_kind = {
1231 let Some(prev_local_item) = prev_item.as_local() else {
1232 warn!("We looked for a local item, but it transitioned as remote??");
1233 return false;
1234 };
1235 let progress = as_variant!(&prev_local_item.send_state,
1237 EventSendState::NotSentYet { progress } => progress.clone())
1238 .flatten();
1239 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
1240 };
1241
1242 let new_item = TimelineItem::new(
1244 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
1245 content.msgtype,
1246 content.mentions,
1247 prev_item.content().reactions().cloned().unwrap_or_default(),
1248 prev_item.content().thread_root(),
1249 prev_item.content().in_reply_to(),
1250 prev_item.content().thread_summary(),
1251 )),
1252 prev_item.internal_id.to_owned(),
1253 );
1254
1255 txn.items.replace(idx, new_item);
1256
1257 txn.commit();
1261
1262 debug!("Replaced local echo");
1263 true
1264 }
1265
1266 pub(super) async fn compute_redecryption_candidates(
1267 &self,
1268 ) -> (BTreeSet<String>, BTreeSet<String>) {
1269 let state = self.state.read().await;
1270 compute_redecryption_candidates(&state.items)
1271 }
1272
1273 pub(super) async fn set_sender_profiles_pending(&self) {
1274 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
1275 }
1276
1277 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
1278 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
1279 }
1280
1281 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
1282 self.state.write().await.items.for_each(|mut entry| {
1283 let Some(event_item) = entry.as_event() else { return };
1284 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
1285 let new_item = entry.with_kind(TimelineItemKind::Event(
1286 event_item.with_sender_profile(profile_state.clone()),
1287 ));
1288 ObservableItemsEntry::replace(&mut entry, new_item);
1289 }
1290 });
1291 }
1292
1293 pub(super) async fn update_missing_sender_profiles(&self) {
1294 trace!("Updating missing sender profiles");
1295
1296 let mut state = self.state.write().await;
1297 let mut entries = state.items.entries();
1298 while let Some(mut entry) = entries.next() {
1299 let Some(event_item) = entry.as_event() else { continue };
1300 let event_id = event_item.event_id().map(debug);
1301 let transaction_id = event_item.transaction_id().map(debug);
1302
1303 if event_item.sender_profile().is_ready() {
1304 trace!(event_id, transaction_id, "Profile already set");
1305 continue;
1306 }
1307
1308 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1309 Some(profile) => {
1310 trace!(event_id, transaction_id, "Adding profile");
1311 let updated_item =
1312 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1313 let new_item = entry.with_kind(updated_item);
1314 ObservableItemsEntry::replace(&mut entry, new_item);
1315 }
1316 None => {
1317 if !event_item.sender_profile().is_unavailable() {
1318 trace!(event_id, transaction_id, "Marking profile unavailable");
1319 let updated_item =
1320 event_item.with_sender_profile(TimelineDetails::Unavailable);
1321 let new_item = entry.with_kind(updated_item);
1322 ObservableItemsEntry::replace(&mut entry, new_item);
1323 } else {
1324 debug!(event_id, transaction_id, "Profile already marked unavailable");
1325 }
1326 }
1327 }
1328 }
1329
1330 trace!("Done updating missing sender profiles");
1331 }
1332
1333 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1335 trace!("Forcing update of sender profiles: {sender_ids:?}");
1336
1337 let mut state = self.state.write().await;
1338 let mut entries = state.items.entries();
1339 while let Some(mut entry) = entries.next() {
1340 let Some(event_item) = entry.as_event() else { continue };
1341 if !sender_ids.contains(event_item.sender()) {
1342 continue;
1343 }
1344
1345 let event_id = event_item.event_id().map(debug);
1346 let transaction_id = event_item.transaction_id().map(debug);
1347
1348 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1349 Some(profile) => {
1350 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1351 {
1352 debug!(event_id, transaction_id, "Profile already up-to-date");
1353 } else {
1354 trace!(event_id, transaction_id, "Updating profile");
1355 let updated_item =
1356 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1357 let new_item = entry.with_kind(updated_item);
1358 ObservableItemsEntry::replace(&mut entry, new_item);
1359 }
1360 }
1361 None => {
1362 if !event_item.sender_profile().is_unavailable() {
1363 trace!(event_id, transaction_id, "Marking profile unavailable");
1364 let updated_item =
1365 event_item.with_sender_profile(TimelineDetails::Unavailable);
1366 let new_item = entry.with_kind(updated_item);
1367 ObservableItemsEntry::replace(&mut entry, new_item);
1368 } else {
1369 debug!(event_id, transaction_id, "Profile already marked unavailable");
1370 }
1371 }
1372 }
1373 }
1374
1375 trace!("Done forcing update of sender profiles");
1376 }
1377
1378 #[cfg(test)]
1379 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1380 let own_user_id = self.room_data_provider.own_user_id();
1381 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1382 }
1383
1384 pub(super) async fn latest_user_read_receipt(
1388 &self,
1389 user_id: &UserId,
1390 ) -> Option<(OwnedEventId, Receipt)> {
1391 let receipt_thread = self.focus.receipt_thread();
1392
1393 self.state
1394 .read()
1395 .await
1396 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1397 .await
1398 }
1399
1400 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1403 &self,
1404 user_id: &UserId,
1405 ) -> Option<OwnedEventId> {
1406 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1407 }
1408
1409 pub async fn subscribe_own_user_read_receipts_changed(
1411 &self,
1412 ) -> impl Stream<Item = ()> + use<P> {
1413 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1414 }
1415
1416 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1418 match echo.content {
1419 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1420 let content = match serialized_event.deserialize() {
1421 Ok(d) => d,
1422 Err(err) => {
1423 warn!("error deserializing local echo: {err}");
1424 return;
1425 }
1426 };
1427
1428 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1429 .await;
1430
1431 if let Some(send_error) = send_error {
1432 self.update_event_send_state(
1433 &echo.transaction_id,
1434 EventSendState::SendingFailed {
1435 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1436 send_error,
1437 ))),
1438 is_recoverable: false,
1439 },
1440 )
1441 .await;
1442 }
1443 }
1444
1445 LocalEchoContent::React { key, send_handle, applies_to } => {
1446 self.handle_local_reaction(key, send_handle, applies_to).await;
1447 }
1448 }
1449 }
1450
1451 #[instrument(skip(self, send_handle))]
1453 async fn handle_local_reaction(
1454 &self,
1455 reaction_key: String,
1456 send_handle: SendReactionHandle,
1457 applies_to: OwnedTransactionId,
1458 ) {
1459 let mut state = self.state.write().await;
1460 let mut tr = state.transaction();
1461
1462 let target = TimelineEventItemId::TransactionId(applies_to);
1463
1464 let reaction_txn_id = send_handle.transaction_id().to_owned();
1465 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1466 let aggregation = Aggregation::new(
1467 TimelineEventItemId::TransactionId(reaction_txn_id),
1468 AggregationKind::Reaction {
1469 key: reaction_key.clone(),
1470 sender: self.room_data_provider.own_user_id().to_owned(),
1471 timestamp: MilliSecondsSinceUnixEpoch::now(),
1472 reaction_status,
1473 },
1474 );
1475
1476 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1477 find_item_and_apply_aggregation(
1478 &tr.meta.aggregations,
1479 &mut tr.items,
1480 &target,
1481 aggregation,
1482 &tr.meta.room_version_rules,
1483 );
1484
1485 tr.commit();
1486 }
1487
1488 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1490 match update {
1491 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1492 self.handle_local_echo(echo).await;
1493 }
1494
1495 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1496 if !self.discard_local_echo(&transaction_id).await {
1497 warn!("couldn't find the local echo to discard");
1498 }
1499 }
1500
1501 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1502 let content = match new_content.deserialize() {
1503 Ok(d) => d,
1504 Err(err) => {
1505 warn!("error deserializing local echo (upon edit): {err}");
1506 return;
1507 }
1508 };
1509
1510 if !self.replace_local_echo(&transaction_id, content).await {
1511 warn!("couldn't find the local echo to replace");
1512 }
1513 }
1514
1515 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1516 self.update_event_send_state(
1517 &transaction_id,
1518 EventSendState::SendingFailed { error, is_recoverable },
1519 )
1520 .await;
1521 }
1522
1523 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1524 self.update_event_send_state(
1525 &transaction_id,
1526 EventSendState::NotSentYet { progress: None },
1527 )
1528 .await;
1529 }
1530
1531 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1532 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1533 .await;
1534 }
1535
1536 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1537 self.update_event_send_state(
1538 &related_to,
1539 EventSendState::NotSentYet {
1540 progress: Some(MediaUploadProgress { index, progress }),
1541 },
1542 )
1543 .await;
1544 }
1545 }
1546 }
1547
1548 pub async fn insert_timeline_start_if_missing(&self) {
1551 let mut state = self.state.write().await;
1552 let mut txn = state.transaction();
1553 txn.items.push_timeline_start_if_missing(
1554 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1555 );
1556 txn.commit();
1557 }
1558
1559 pub(super) async fn make_replied_to(
1565 &self,
1566 event: TimelineEvent,
1567 ) -> Result<Option<EmbeddedEvent>, Error> {
1568 let state = self.state.read().await;
1569 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1570 }
1571}
1572
1573impl TimelineController {
1574 pub(super) fn room(&self) -> &Room {
1575 &self.room_data_provider
1576 }
1577
1578 #[instrument(skip(self))]
1581 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1582 let state_guard = self.state.write().await;
1583 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1584 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1585 let remote_item = item
1586 .as_remote()
1587 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1588 .clone();
1589
1590 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1591 debug!("Event is not a message");
1592 return Ok(());
1593 };
1594 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1595 debug!("Event is not a reply");
1596 return Ok(());
1597 };
1598 if let TimelineDetails::Pending = &in_reply_to.event {
1599 debug!("Replied-to event is already being fetched");
1600 return Ok(());
1601 }
1602 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1603 debug!("Replied-to event has already been fetched");
1604 return Ok(());
1605 }
1606
1607 let internal_id = item.internal_id.to_owned();
1608 let item = item.clone();
1609 let event = fetch_replied_to_event(
1610 state_guard,
1611 &self.state,
1612 index,
1613 &item,
1614 internal_id,
1615 &msglike,
1616 &in_reply_to.event_id,
1617 self.room(),
1618 )
1619 .await?;
1620
1621 let mut state = self.state.write().await;
1624 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1625 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1626
1627 let TimelineItemContent::MsgLike(MsgLikeContent {
1630 kind: MsgLikeKind::Message(message),
1631 reactions,
1632 thread_root,
1633 in_reply_to,
1634 thread_summary,
1635 }) = item.content().clone()
1636 else {
1637 info!("Event is no longer a message (redacted?)");
1638 return Ok(());
1639 };
1640 let Some(in_reply_to) = in_reply_to else {
1641 warn!("Event no longer has a reply (bug?)");
1642 return Ok(());
1643 };
1644
1645 trace!("Updating in-reply-to details");
1648 let internal_id = item.internal_id.to_owned();
1649 let mut item = item.clone();
1650 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1651 kind: MsgLikeKind::Message(message),
1652 reactions,
1653 thread_root,
1654 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1655 thread_summary,
1656 }));
1657 state.items.replace(index, TimelineItem::new(item, internal_id));
1658
1659 Ok(())
1660 }
1661
1662 pub(super) fn infer_thread_for_read_receipt(
1668 &self,
1669 receipt_type: &SendReceiptType,
1670 ) -> ReceiptThread {
1671 if matches!(receipt_type, SendReceiptType::FullyRead) {
1672 ReceiptThread::Unthreaded
1673 } else {
1674 self.focus.receipt_thread()
1675 }
1676 }
1677
1678 pub(super) async fn should_send_receipt(
1682 &self,
1683 receipt_type: &SendReceiptType,
1684 receipt_thread: &ReceiptThread,
1685 event_id: &EventId,
1686 ) -> bool {
1687 let own_user_id = self.room().own_user_id();
1688 let state = self.state.read().await;
1689 let room = self.room();
1690
1691 match receipt_type {
1692 SendReceiptType::Read => {
1693 if let Some((old_pub_read, _)) = state
1694 .meta
1695 .user_receipt(
1696 own_user_id,
1697 ReceiptType::Read,
1698 receipt_thread.clone(),
1699 room,
1700 state.items.all_remote_events(),
1701 )
1702 .await
1703 {
1704 trace!(%old_pub_read, "found a previous public receipt");
1705 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1706 &old_pub_read,
1707 event_id,
1708 state.items.all_remote_events(),
1709 ) {
1710 trace!(
1711 "event referred to new receipt is {relative_pos:?} the previous receipt"
1712 );
1713 return relative_pos == RelativePosition::After;
1714 }
1715 }
1716 }
1717
1718 SendReceiptType::ReadPrivate => {
1721 if let Some((old_priv_read, _)) =
1722 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1723 {
1724 trace!(%old_priv_read, "found a previous private receipt");
1725 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1726 &old_priv_read,
1727 event_id,
1728 state.items.all_remote_events(),
1729 ) {
1730 trace!(
1731 "event referred to new receipt is {relative_pos:?} the previous receipt"
1732 );
1733 return relative_pos == RelativePosition::After;
1734 }
1735 }
1736 }
1737
1738 SendReceiptType::FullyRead => {
1739 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1740 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1741 &prev_event_id,
1742 event_id,
1743 state.items.all_remote_events(),
1744 )
1745 {
1746 return relative_pos == RelativePosition::After;
1747 }
1748 }
1749
1750 _ => {}
1751 }
1752
1753 true
1755 }
1756
1757 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1760 let state = self.state.read().await;
1761 let filter_out_thread_events = match self.focus() {
1762 TimelineFocusKind::Thread { .. } => false,
1763 TimelineFocusKind::Live { hide_threaded_events } => hide_threaded_events.to_owned(),
1764 TimelineFocusKind::Event { paginator } => {
1765 paginator.get().is_some_and(|paginator| paginator.hide_threaded_events())
1766 }
1767 _ => true,
1768 };
1769
1770 state
1775 .items
1776 .all_remote_events()
1777 .iter()
1778 .rev()
1779 .filter_map(|item| {
1780 if !filter_out_thread_events || item.thread_root_id.is_none() {
1781 Some(item.event_id.clone())
1782 } else {
1783 None
1784 }
1785 })
1786 .next()
1787 }
1788
1789 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1790 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1791 let (utds, decrypted) = self.compute_redecryption_candidates().await;
1792
1793 let request = DecryptionRetryRequest {
1794 room_id: self.room().room_id().to_owned(),
1795 utd_session_ids: utds,
1796 refresh_info_session_ids: decrypted,
1797 };
1798
1799 self.room().client().event_cache().request_decryption(request);
1800 }
1801
1802 pub(super) async fn map_pagination_status(
1810 &self,
1811 status: RoomPaginationStatus,
1812 ) -> RoomPaginationStatus {
1813 match status {
1814 RoomPaginationStatus::Idle { hit_timeline_start } => {
1815 if hit_timeline_start {
1816 let state = self.state.read().await;
1817 if state.meta.subscriber_skip_count.get() > 0 {
1821 return RoomPaginationStatus::Idle { hit_timeline_start: false };
1822 }
1823 }
1824 }
1825 RoomPaginationStatus::Paginating => {}
1826 }
1827
1828 status
1830 }
1831}
1832
1833impl<P: RoomDataProvider> TimelineController<P> {
1834 pub(super) fn focus(&self) -> &TimelineFocusKind<P> {
1836 &self.focus
1837 }
1838}
1839
1840#[allow(clippy::too_many_arguments)]
1841async fn fetch_replied_to_event<P: RoomDataProvider>(
1842 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1843 state_lock: &RwLock<TimelineState<P>>,
1844 index: usize,
1845 item: &EventTimelineItem,
1846 internal_id: TimelineUniqueId,
1847 msglike: &MsgLikeContent,
1848 in_reply_to: &EventId,
1849 room: &Room,
1850) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1851 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1852 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1853 trace!("Found replied-to event locally");
1854 return Ok(details);
1855 }
1856
1857 trace!("Setting in-reply-to details to pending");
1860 let in_reply_to_details =
1861 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1862
1863 let event_item = item
1864 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1865
1866 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1867 state_guard.items.replace(index, new_timeline_item);
1868
1869 drop(state_guard);
1871
1872 trace!("Fetching replied-to event");
1873 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1874 Ok(timeline_event) => {
1875 let state = state_lock.read().await;
1876
1877 let replied_to_item =
1878 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1879
1880 if let Some(item) = replied_to_item {
1881 TimelineDetails::Ready(Box::new(item))
1882 } else {
1883 return Err(Error::UnsupportedEvent);
1885 }
1886 }
1887
1888 Err(e) => TimelineDetails::Error(Arc::new(e)),
1889 };
1890
1891 Ok(res)
1892}