1use std::{
16 collections::BTreeSet,
17 fmt,
18 sync::{Arc, OnceLock},
19};
20
21use as_variant::as_variant;
22use eyeball_im::{VectorDiff, VectorSubscriberStream};
23use eyeball_im_util::vector::{FilterMap, VectorObserverExt};
24use futures_core::Stream;
25use imbl::Vector;
26use matrix_sdk::{
27 deserialized_responses::TimelineEvent,
28 event_cache::{
29 DecryptionRetryRequest, EventFocusThreadMode, PaginationStatus, RoomEventCache,
30 TimelineVectorDiffs,
31 },
32 send_queue::{
33 LocalEcho, LocalEchoContent, RoomSendQueueUpdate, SendHandle, SendReactionHandle,
34 },
35 task_monitor::BackgroundTaskHandle,
36};
37#[cfg(test)]
38use ruma::events::receipt::ReceiptEventContent;
39use ruma::{
40 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
41 api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
42 events::{
43 AnyMessageLikeEventContent, AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent,
44 AnySyncTimelineEvent, MessageLikeEventType,
45 poll::unstable_start::UnstablePollStartEventContent,
46 reaction::ReactionEventContent,
47 receipt::{Receipt, ReceiptThread, ReceiptType},
48 relation::Annotation,
49 room::message::{MessageType, Relation},
50 },
51 room_version_rules::RoomVersionRules,
52 serde::Raw,
53};
54use tokio::sync::{RwLock, RwLockWriteGuard, broadcast};
55use tracing::{
56 Instrument as _, Span, debug, error, field::debug, info, info_span, instrument, trace, warn,
57};
58
59pub(super) use self::{
60 metadata::{RelativePosition, TimelineMetadata},
61 observable_items::{
62 AllRemoteEvents, ObservableItems, ObservableItemsEntry, ObservableItemsTransaction,
63 ObservableItemsTransactionEntry,
64 },
65 state::TimelineState,
66 state_transaction::TimelineStateTransaction,
67};
68use super::{
69 DateDividerMode, EmbeddedEvent, Error, EventSendState, EventTimelineItem, InReplyToDetails,
70 MediaUploadProgress, PaginationError, Profile, TimelineDetails, TimelineEventItemId,
71 TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind,
72 TimelineReadReceiptTracking, VirtualTimelineItem,
73 algorithms::{rfind_event_by_id, rfind_event_item},
74 event_item::{ReactionStatus, RemoteEventOrigin},
75 item::TimelineUniqueId,
76 subscriber::TimelineSubscriber,
77 traits::RoomDataProvider,
78};
79use crate::{
80 timeline::{
81 MsgLikeContent, MsgLikeKind, Room, TimelineEventFilterFn, TimelineEventFocusThreadMode,
82 algorithms::rfind_event_by_item_id,
83 controller::decryption_retry_task::compute_redecryption_candidates,
84 date_dividers::DateDividerAdjuster,
85 event_item::TimelineItemHandle,
86 tasks::{event_focused_task, pinned_events_task, thread_updates_task},
87 },
88 unable_to_decrypt_hook::UtdHookManager,
89};
90
91pub(in crate::timeline) mod aggregations;
92mod decryption_retry_task;
93mod metadata;
94mod observable_items;
95mod read_receipts;
96mod state;
97mod state_transaction;
98
99pub(super) use aggregations::*;
100pub(super) use decryption_retry_task::{CryptoDropHandles, spawn_crypto_tasks};
101
102#[derive(Debug)]
108pub(in crate::timeline) enum TimelineFocusKind {
109 Live {
111 hide_threaded_events: bool,
113 },
114
115 Event {
118 focused_event_id: OwnedEventId,
120
121 thread_root: OnceLock<OwnedEventId>,
127
128 thread_mode: TimelineEventFocusThreadMode,
131 },
132
133 Thread {
135 root_event_id: OwnedEventId,
137 },
138
139 PinnedEvents,
140}
141
142impl TimelineFocusKind {
143 pub(super) fn receipt_thread(&self) -> ReceiptThread {
150 if let Some(thread_root) = self.thread_root() {
151 ReceiptThread::Thread(thread_root.to_owned())
152 } else if self.hide_threaded_events() {
153 ReceiptThread::Main
154 } else {
155 ReceiptThread::Unthreaded
156 }
157 }
158
159 fn hide_threaded_events(&self) -> bool {
161 match self {
162 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
163 TimelineFocusKind::Event { thread_mode, .. } => {
164 matches!(
165 thread_mode,
166 TimelineEventFocusThreadMode::Automatic { hide_threaded_events: true }
167 )
168 }
169 TimelineFocusKind::Thread { .. } | TimelineFocusKind::PinnedEvents => false,
170 }
171 }
172
173 fn is_thread(&self) -> bool {
176 self.thread_root().is_some()
177 }
178
179 fn thread_root(&self) -> Option<&EventId> {
181 match self {
182 TimelineFocusKind::Event { thread_root, .. } => thread_root.get().map(|v| &**v),
183 TimelineFocusKind::Live { .. } | TimelineFocusKind::PinnedEvents => None,
184 TimelineFocusKind::Thread { root_event_id } => Some(root_event_id),
185 }
186 }
187}
188
189#[derive(Clone, Debug)]
190pub(super) struct TimelineController<P: RoomDataProvider = Room> {
191 state: Arc<RwLock<TimelineState<P>>>,
193
194 focus: Arc<TimelineFocusKind>,
196
197 pub(crate) room_data_provider: P,
202
203 pub(super) settings: TimelineSettings,
205}
206
207#[derive(Clone)]
208pub(super) struct TimelineSettings {
209 pub(super) track_read_receipts: TimelineReadReceiptTracking,
212
213 pub(super) event_filter: Arc<TimelineEventFilterFn>,
216
217 pub(super) add_failed_to_parse: bool,
219
220 pub(super) date_divider_mode: DateDividerMode,
222}
223
224#[cfg(not(tarpaulin_include))]
225impl fmt::Debug for TimelineSettings {
226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227 f.debug_struct("TimelineSettings")
228 .field("track_read_receipts", &self.track_read_receipts)
229 .field("add_failed_to_parse", &self.add_failed_to_parse)
230 .finish_non_exhaustive()
231 }
232}
233
234impl Default for TimelineSettings {
235 fn default() -> Self {
236 Self {
237 track_read_receipts: TimelineReadReceiptTracking::Disabled,
238 event_filter: Arc::new(default_event_filter),
239 add_failed_to_parse: true,
240 date_divider_mode: DateDividerMode::Daily,
241 }
242 }
243}
244
245pub fn default_event_filter(event: &AnySyncTimelineEvent, rules: &RoomVersionRules) -> bool {
255 match event {
256 AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) => {
257 if ev.redacts(&rules.redaction).is_some() {
258 false
261 } else {
262 ev.event_type() != MessageLikeEventType::Reaction
265 }
266 }
267
268 AnySyncTimelineEvent::MessageLike(msg) => {
269 match msg.original_content() {
270 None => {
271 msg.event_type() != MessageLikeEventType::Reaction
274 }
275
276 Some(original_content) => {
277 match original_content {
278 AnyMessageLikeEventContent::RoomMessage(content) => {
279 if content
280 .relates_to
281 .as_ref()
282 .is_some_and(|rel| matches!(rel, Relation::Replacement(_)))
283 {
284 return false;
286 }
287
288 match content.msgtype {
289 MessageType::Audio(_)
290 | MessageType::Emote(_)
291 | MessageType::File(_)
292 | MessageType::Image(_)
293 | MessageType::Location(_)
294 | MessageType::Notice(_)
295 | MessageType::ServerNotice(_)
296 | MessageType::Text(_)
297 | MessageType::Video(_)
298 | MessageType::VerificationRequest(_) => true,
299 #[cfg(feature = "unstable-msc4274")]
300 MessageType::Gallery(_) => true,
301 _ => false,
302 }
303 }
304
305 AnyMessageLikeEventContent::Sticker(_)
306 | AnyMessageLikeEventContent::UnstablePollStart(
307 UnstablePollStartEventContent::New(_),
308 )
309 | AnyMessageLikeEventContent::CallInvite(_)
310 | AnyMessageLikeEventContent::RtcNotification(_)
311 | AnyMessageLikeEventContent::RoomEncrypted(_) => true,
312
313 AnyMessageLikeEventContent::Beacon(_) => false,
317 AnyMessageLikeEventContent::RtcDecline(_) => false,
320
321 _ => false,
322 }
323 }
324 }
325 }
326
327 AnySyncTimelineEvent::State(_) => {
328 true
330 }
331 }
332}
333
334pub(super) struct InitFocusResult {
336 pub has_events: bool,
338 pub focus_task: Option<BackgroundTaskHandle>,
341}
342
343impl<P: RoomDataProvider> TimelineController<P> {
344 pub(super) fn new(
345 room_data_provider: P,
346 focus: TimelineFocus,
347 internal_id_prefix: Option<String>,
348 unable_to_decrypt_hook: Option<Arc<UtdHookManager>>,
349 is_room_encrypted: bool,
350 settings: TimelineSettings,
351 ) -> Self {
352 let focus = match focus {
353 TimelineFocus::Live { hide_threaded_events } => {
354 TimelineFocusKind::Live { hide_threaded_events }
355 }
356
357 TimelineFocus::Event { target, thread_mode, .. } => {
358 TimelineFocusKind::Event {
359 focused_event_id: target,
360 thread_root: OnceLock::new(),
362 thread_mode,
363 }
364 }
365
366 TimelineFocus::Thread { root_event_id, .. } => {
367 TimelineFocusKind::Thread { root_event_id }
368 }
369
370 TimelineFocus::PinnedEvents => TimelineFocusKind::PinnedEvents,
371 };
372
373 let focus = Arc::new(focus);
374 let state = Arc::new(RwLock::new(TimelineState::new(
375 focus.clone(),
376 room_data_provider.own_user_id().to_owned(),
377 room_data_provider.room_version_rules(),
378 internal_id_prefix,
379 unable_to_decrypt_hook,
380 is_room_encrypted,
381 )));
382
383 Self { state, focus, room_data_provider, settings }
384 }
385
386 pub async fn handle_encryption_state_changes(&self) {
391 let mut room_info = self.room_data_provider.room_info();
392
393 let mark_encrypted = || async {
395 let mut state = self.state.write().await;
396 state.meta.is_room_encrypted = true;
397 state.mark_all_events_as_encrypted();
398 };
399
400 if room_info.get().encryption_state().is_encrypted() {
401 mark_encrypted().await;
404 return;
405 }
406
407 while let Some(info) = room_info.next().await {
408 if info.encryption_state().is_encrypted() {
409 mark_encrypted().await;
410 break;
413 }
414 }
415 }
416
417 pub(super) async fn live_lazy_paginate_backwards(&self, num_events: u16) -> Option<usize> {
426 let state = self.state.read().await;
427
428 let (count, needs) = state
429 .meta
430 .subscriber_skip_count
431 .compute_next_when_paginating_backwards(num_events.into());
432
433 let is_live_timeline = true;
435 state.meta.subscriber_skip_count.update(count, is_live_timeline);
436
437 needs
438 }
439
440 pub(super) fn is_live(&self) -> bool {
442 matches!(&*self.focus, TimelineFocusKind::Live { .. })
443 }
444
445 pub(super) fn is_threaded(&self) -> bool {
447 self.focus.is_thread()
448 }
449
450 pub(super) fn thread_root(&self) -> Option<OwnedEventId> {
453 self.focus.thread_root().map(ToOwned::to_owned)
454 }
455
456 pub(super) async fn items(&self) -> Vector<Arc<TimelineItem>> {
460 self.state.read().await.items.clone_items()
461 }
462
463 #[cfg(test)]
464 pub(super) async fn subscribe_raw(
465 &self,
466 ) -> (Vector<Arc<TimelineItem>>, VectorSubscriberStream<Arc<TimelineItem>>) {
467 self.state.read().await.items.subscribe().into_values_and_stream()
468 }
469
470 pub(super) async fn subscribe(&self) -> (Vector<Arc<TimelineItem>>, TimelineSubscriber) {
471 let state = self.state.read().await;
472
473 TimelineSubscriber::new(&state.items, &state.meta.subscriber_skip_count)
474 }
475
476 pub(super) async fn subscribe_filter_map<U, F>(
477 &self,
478 f: F,
479 ) -> (Vector<U>, FilterMap<VectorSubscriberStream<Arc<TimelineItem>>, F>)
480 where
481 U: Clone,
482 F: Fn(Arc<TimelineItem>) -> Option<U>,
483 {
484 self.state.read().await.items.subscribe().filter_map(f)
485 }
486
487 #[instrument(skip_all)]
491 pub(super) async fn toggle_reaction_local(
492 &self,
493 item_id: &TimelineEventItemId,
494 key: &str,
495 ) -> Result<bool, Error> {
496 let mut state = self.state.write().await;
497
498 let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, item_id) else {
499 warn!("Timeline item not found, can't add reaction");
500 return Err(Error::FailedToToggleReaction);
501 };
502
503 let user_id = self.room_data_provider.own_user_id();
504 let prev_status = item
505 .content()
506 .reactions()
507 .and_then(|map| Some(map.get(key)?.get(user_id)?.status.clone()));
508
509 let Some(prev_status) = prev_status else {
510 match item.handle() {
512 TimelineItemHandle::Local(send_handle) => {
513 if send_handle
514 .react(key.to_owned())
515 .await
516 .map_err(|err| Error::SendQueueError(err.into()))?
517 .is_some()
518 {
519 trace!("adding a reaction to a local echo");
520 return Ok(true);
521 }
522
523 warn!("couldn't toggle reaction for local echo");
524 return Ok(false);
525 }
526
527 TimelineItemHandle::Remote(event_id) => {
528 trace!("adding a reaction to a remote echo");
532 let annotation = Annotation::new(event_id.to_owned(), key.to_owned());
533 self.room_data_provider
534 .send(ReactionEventContent::from(annotation).into())
535 .await?;
536 return Ok(true);
537 }
538 }
539 };
540
541 trace!("removing a previous reaction");
542 match prev_status {
543 ReactionStatus::LocalToLocal(send_reaction_handle) => {
544 if let Some(handle) = send_reaction_handle {
545 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
546 warn!("unexpectedly unable to abort sending of local reaction");
549 }
550 } else {
551 warn!("no send reaction handle (this should only happen in testing contexts)");
552 }
553 }
554
555 ReactionStatus::LocalToRemote(send_handle) => {
556 trace!("aborting send of the previous reaction that was a local echo");
559 if let Some(handle) = send_handle {
560 if !handle.abort().await.map_err(|err| Error::SendQueueError(err.into()))? {
561 warn!("unexpectedly unable to abort sending of local reaction");
564 }
565 } else {
566 warn!("no send handle (this should only happen in testing contexts)");
567 }
568 }
569
570 ReactionStatus::RemoteToRemote(event_id) => {
571 let Some(annotated_event_id) =
573 item.as_remote().map(|event_item| event_item.event_id.clone())
574 else {
575 warn!("remote reaction to remote event, but the associated item isn't remote");
576 return Ok(false);
577 };
578
579 let mut reactions = item.content().reactions().cloned().unwrap_or_default();
580 let reaction_info = reactions.remove_reaction(user_id, key);
581
582 if reaction_info.is_some() {
583 let new_item = item.with_reactions(reactions);
584 state.items.replace(item_pos, new_item);
585 } else {
586 warn!(
587 "reaction is missing on the item, not removing it locally, \
588 but sending redaction."
589 );
590 }
591
592 drop(state);
594
595 trace!("sending redact for a previous reaction");
596 if let Err(err) = self.room_data_provider.redact(&event_id, None, None).await {
597 if let Some(reaction_info) = reaction_info {
598 debug!("sending redact failed, adding the reaction back to the list");
599
600 let mut state = self.state.write().await;
601 if let Some((item_pos, item)) =
602 rfind_event_by_id(&state.items, &annotated_event_id)
603 {
604 let mut reactions =
606 item.content().reactions().cloned().unwrap_or_default();
607 reactions
608 .entry(key.to_owned())
609 .or_default()
610 .insert(user_id.to_owned(), reaction_info);
611 let new_item = item.with_reactions(reactions);
612 state.items.replace(item_pos, new_item);
613 } else {
614 warn!(
615 "couldn't find item to re-add reaction anymore; \
616 maybe it's been redacted?"
617 );
618 }
619 }
620
621 return Err(err);
622 }
623 }
624 }
625
626 Ok(false)
627 }
628
629 pub(super) async fn handle_remote_events_with_diffs(
631 &self,
632 diffs: Vec<VectorDiff<TimelineEvent>>,
633 origin: RemoteEventOrigin,
634 ) {
635 if diffs.is_empty() {
636 return;
637 }
638
639 let mut state = self.state.write().await;
640 state
641 .handle_remote_events_with_diffs(
642 diffs,
643 origin,
644 &self.room_data_provider,
645 &self.settings,
646 )
647 .await
648 }
649
650 pub(super) async fn handle_remote_aggregations(
652 &self,
653 diffs: Vec<VectorDiff<TimelineEvent>>,
654 origin: RemoteEventOrigin,
655 ) {
656 if diffs.is_empty() {
657 return;
658 }
659
660 let mut state = self.state.write().await;
661 state
662 .handle_remote_aggregations(diffs, origin, &self.room_data_provider, &self.settings)
663 .await
664 }
665
666 pub(super) async fn clear(&self) {
667 self.state.write().await.clear();
668 }
669
670 pub(super) async fn replace_with_initial_remote_events<Events>(
678 &self,
679 events: Events,
680 origin: RemoteEventOrigin,
681 ) where
682 Events: IntoIterator,
683 <Events as IntoIterator>::Item: Into<TimelineEvent>,
684 {
685 let mut state = self.state.write().await;
686
687 let track_read_markers = &self.settings.track_read_receipts;
688 if track_read_markers.is_enabled() {
689 state.populate_initial_user_receipt(&self.room_data_provider, ReceiptType::Read).await;
690 state
691 .populate_initial_user_receipt(&self.room_data_provider, ReceiptType::ReadPrivate)
692 .await;
693 }
694
695 let mut events = events.into_iter().peekable();
701 if !state.items.is_empty() || events.peek().is_some() {
702 state
703 .replace_with_remote_events(
704 events,
705 origin,
706 &self.room_data_provider,
707 &self.settings,
708 )
709 .await;
710 }
711
712 if track_read_markers.is_enabled() {
713 if let Some(fully_read_event_id) =
714 self.room_data_provider.load_fully_read_marker().await
715 {
716 state.handle_fully_read_marker(fully_read_event_id);
717 } else if let Some(latest_receipt_event_id) = state
718 .latest_user_read_receipt_timeline_event_id(self.room_data_provider.own_user_id())
719 {
720 debug!("no `m.fully_read` marker found, falling back to read receipt");
722 state.handle_fully_read_marker(latest_receipt_event_id);
723 }
724 }
725 }
726
727 pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
728 self.state.write().await.handle_fully_read_marker(fully_read_event_id);
729 }
730
731 pub(super) async fn handle_ephemeral_events(
732 &self,
733 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
734 ) {
735 if events.is_empty() {
737 return;
738 }
739 let mut state = self.state.write().await;
740 state.handle_ephemeral_events(events, &self.room_data_provider).await;
741 }
742
743 #[instrument(skip_all)]
745 pub(super) async fn handle_local_event(
746 &self,
747 txn_id: OwnedTransactionId,
748 content: AnyMessageLikeEventContent,
749 send_handle: Option<SendHandle>,
750 ) {
751 let sender = self.room_data_provider.own_user_id().to_owned();
752 let profile = self.room_data_provider.profile_from_user_id(&sender).await;
753
754 let date_divider_mode = self.settings.date_divider_mode.clone();
755
756 let mut state = self.state.write().await;
757 state
758 .handle_local_event(sender, profile, date_divider_mode, txn_id, send_handle, content)
759 .await;
760 }
761
762 #[instrument(skip(self))]
767 pub(super) async fn update_event_send_state(
768 &self,
769 txn_id: &TransactionId,
770 send_state: EventSendState,
771 ) {
772 let mut state = self.state.write().await;
773 let mut txn = state.transaction();
774
775 let new_event_id: Option<&EventId> =
776 as_variant!(&send_state, EventSendState::Sent { event_id } => event_id);
777
778 if rfind_event_item(&txn.items, |it| {
781 new_event_id.is_some() && it.event_id() == new_event_id && it.as_remote().is_some()
782 })
783 .is_some()
784 {
785 trace!("Remote echo received before send-event response");
787
788 let local_echo = rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id));
789
790 if let Some((idx, _)) = local_echo {
794 warn!("Message echo got duplicated, removing the local one");
795 txn.items.remove(idx);
796
797 let mut adjuster =
799 DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
800 adjuster.run(&mut txn.items, &mut txn.meta);
801 }
802
803 txn.commit();
804 return;
805 }
806
807 let result = rfind_event_item(&txn.items, |it| {
809 it.transaction_id() == Some(txn_id)
810 || new_event_id.is_some()
811 && it.event_id() == new_event_id
812 && it.as_local().is_some()
813 });
814
815 let Some((idx, item)) = result else {
816 if let Some(new_event_id) = new_event_id {
821 if txn.meta.aggregations.mark_aggregation_as_sent(
822 txn_id.to_owned(),
823 new_event_id.to_owned(),
824 &mut txn.items,
825 &txn.meta.room_version_rules,
826 ) {
827 trace!("Aggregation marked as sent");
828 txn.commit();
829 return;
830 }
831
832 trace!("Sent aggregation was not found");
833 }
834
835 warn!("Timeline item not found, can't update send state");
836 return;
837 };
838
839 let Some(local_item) = item.as_local() else {
840 warn!("We looked for a local item, but it transitioned to remote.");
841 return;
842 };
843
844 if let EventSendState::Sent { event_id: existing_event_id } = &local_item.send_state {
847 error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
848 }
849
850 if let Some(new_event_id) = new_event_id {
853 txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned());
854 }
855
856 let new_item = item.with_inner_kind(local_item.with_send_state(send_state));
857 txn.items.replace(idx, new_item);
858
859 txn.commit();
860 }
861
862 pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
863 let mut state = self.state.write().await;
864
865 if let Some((idx, _)) =
866 rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))
867 {
868 let mut txn = state.transaction();
869
870 txn.items.remove(idx);
871
872 let mut adjuster = DateDividerAdjuster::new(self.settings.date_divider_mode.clone());
875 adjuster.run(&mut txn.items, &mut txn.meta);
876
877 txn.meta.update_read_marker(&mut txn.items);
878
879 txn.commit();
880
881 debug!("discarded local echo");
882 return true;
883 }
884
885 let mut txn = state.transaction();
888
889 let found_aggregation = match txn.meta.aggregations.try_remove_aggregation(
891 &TimelineEventItemId::TransactionId(txn_id.to_owned()),
892 &mut txn.items,
893 ) {
894 Ok(val) => val,
895 Err(err) => {
896 warn!("error when discarding local echo for an aggregation: {err}");
897 true
899 }
900 };
901
902 if found_aggregation {
903 txn.commit();
904 }
905
906 found_aggregation
907 }
908
909 pub(super) async fn replace_local_echo(
910 &self,
911 txn_id: &TransactionId,
912 content: AnyMessageLikeEventContent,
913 ) -> bool {
914 let AnyMessageLikeEventContent::RoomMessage(content) = content else {
915 warn!("Replacing a local echo for a non-RoomMessage-like event NYI");
920 return false;
921 };
922
923 let mut state = self.state.write().await;
924 let mut txn = state.transaction();
925
926 let Some((idx, prev_item)) =
927 rfind_event_item(&txn.items, |it| it.transaction_id() == Some(txn_id))
928 else {
929 debug!("Can't find local echo to replace");
930 return false;
931 };
932
933 let ti_kind = {
936 let Some(prev_local_item) = prev_item.as_local() else {
937 warn!("We looked for a local item, but it transitioned as remote??");
938 return false;
939 };
940 let progress = as_variant!(&prev_local_item.send_state,
942 EventSendState::NotSentYet { progress } => progress.clone())
943 .flatten();
944 prev_local_item.with_send_state(EventSendState::NotSentYet { progress })
945 };
946
947 let new_item = TimelineItem::new(
949 prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message(
950 content.msgtype,
951 content.mentions,
952 prev_item.content().reactions().cloned().unwrap_or_default(),
953 prev_item.content().thread_root(),
954 prev_item.content().in_reply_to(),
955 prev_item.content().thread_summary(),
956 )),
957 prev_item.internal_id.to_owned(),
958 );
959
960 txn.items.replace(idx, new_item);
961
962 txn.commit();
966
967 debug!("Replaced local echo");
968 true
969 }
970
971 pub(super) async fn compute_redecryption_candidates(
972 &self,
973 ) -> (BTreeSet<String>, BTreeSet<String>) {
974 let state = self.state.read().await;
975 compute_redecryption_candidates(&state.items)
976 }
977
978 pub(super) async fn set_sender_profiles_pending(&self) {
979 self.set_non_ready_sender_profiles(TimelineDetails::Pending).await;
980 }
981
982 pub(super) async fn set_sender_profiles_error(&self, error: Arc<matrix_sdk::Error>) {
983 self.set_non_ready_sender_profiles(TimelineDetails::Error(error)).await;
984 }
985
986 async fn set_non_ready_sender_profiles(&self, profile_state: TimelineDetails<Profile>) {
987 self.state.write().await.items.for_each(|mut entry| {
988 let Some(event_item) = entry.as_event() else { return };
989 if !matches!(event_item.sender_profile(), TimelineDetails::Ready(_)) {
990 let new_item = entry.with_kind(TimelineItemKind::Event(
991 event_item.with_sender_profile(profile_state.clone()),
992 ));
993 ObservableItemsEntry::replace(&mut entry, new_item);
994 }
995 });
996 }
997
998 pub(super) async fn update_missing_sender_profiles(&self) {
999 trace!("Updating missing sender profiles");
1000
1001 let mut state = self.state.write().await;
1002 let mut entries = state.items.entries();
1003 while let Some(mut entry) = entries.next() {
1004 let Some(event_item) = entry.as_event() else { continue };
1005 let event_id = event_item.event_id().map(debug);
1006 let transaction_id = event_item.transaction_id().map(debug);
1007
1008 if event_item.sender_profile().is_ready() {
1009 trace!(event_id, transaction_id, "Profile already set");
1010 continue;
1011 }
1012
1013 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1014 Some(profile) => {
1015 trace!(event_id, transaction_id, "Adding profile");
1016 let updated_item =
1017 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1018 let new_item = entry.with_kind(updated_item);
1019 ObservableItemsEntry::replace(&mut entry, new_item);
1020 }
1021 None => {
1022 if !event_item.sender_profile().is_unavailable() {
1023 trace!(event_id, transaction_id, "Marking profile unavailable");
1024 let updated_item =
1025 event_item.with_sender_profile(TimelineDetails::Unavailable);
1026 let new_item = entry.with_kind(updated_item);
1027 ObservableItemsEntry::replace(&mut entry, new_item);
1028 } else {
1029 debug!(event_id, transaction_id, "Profile already marked unavailable");
1030 }
1031 }
1032 }
1033 }
1034
1035 trace!("Done updating missing sender profiles");
1036 }
1037
1038 pub(super) async fn force_update_sender_profiles(&self, sender_ids: &BTreeSet<&UserId>) {
1040 trace!("Forcing update of sender profiles: {sender_ids:?}");
1041
1042 let mut state = self.state.write().await;
1043 let mut entries = state.items.entries();
1044 while let Some(mut entry) = entries.next() {
1045 let Some(event_item) = entry.as_event() else { continue };
1046 if !sender_ids.contains(event_item.sender()) {
1047 continue;
1048 }
1049
1050 let event_id = event_item.event_id().map(debug);
1051 let transaction_id = event_item.transaction_id().map(debug);
1052
1053 match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
1054 Some(profile) => {
1055 if matches!(event_item.sender_profile(), TimelineDetails::Ready(old_profile) if *old_profile == profile)
1056 {
1057 debug!(event_id, transaction_id, "Profile already up-to-date");
1058 } else {
1059 trace!(event_id, transaction_id, "Updating profile");
1060 let updated_item =
1061 event_item.with_sender_profile(TimelineDetails::Ready(profile));
1062 let new_item = entry.with_kind(updated_item);
1063 ObservableItemsEntry::replace(&mut entry, new_item);
1064 }
1065 }
1066 None => {
1067 if !event_item.sender_profile().is_unavailable() {
1068 trace!(event_id, transaction_id, "Marking profile unavailable");
1069 let updated_item =
1070 event_item.with_sender_profile(TimelineDetails::Unavailable);
1071 let new_item = entry.with_kind(updated_item);
1072 ObservableItemsEntry::replace(&mut entry, new_item);
1073 } else {
1074 debug!(event_id, transaction_id, "Profile already marked unavailable");
1075 }
1076 }
1077 }
1078 }
1079
1080 trace!("Done forcing update of sender profiles");
1081 }
1082
1083 #[cfg(test)]
1084 pub(super) async fn handle_read_receipts(&self, receipt_event_content: ReceiptEventContent) {
1085 let own_user_id = self.room_data_provider.own_user_id();
1086 self.state.write().await.handle_read_receipts(receipt_event_content, own_user_id);
1087 }
1088
1089 pub(super) async fn latest_user_read_receipt(
1093 &self,
1094 user_id: &UserId,
1095 ) -> Option<(OwnedEventId, Receipt)> {
1096 let receipt_thread = self.focus.receipt_thread();
1097
1098 self.state
1099 .read()
1100 .await
1101 .latest_user_read_receipt(user_id, receipt_thread, &self.room_data_provider)
1102 .await
1103 }
1104
1105 pub(super) async fn latest_user_read_receipt_timeline_event_id(
1108 &self,
1109 user_id: &UserId,
1110 ) -> Option<OwnedEventId> {
1111 self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
1112 }
1113
1114 pub async fn subscribe_own_user_read_receipts_changed(
1116 &self,
1117 ) -> impl Stream<Item = ()> + use<P> {
1118 self.state.read().await.meta.read_receipts.subscribe_own_user_read_receipts_changed()
1119 }
1120
1121 pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
1123 match echo.content {
1124 LocalEchoContent::Event { serialized_event, send_handle, send_error } => {
1125 let content = match serialized_event.deserialize() {
1126 Ok(d) => d,
1127 Err(err) => {
1128 warn!("error deserializing local echo: {err}");
1129 return;
1130 }
1131 };
1132
1133 self.handle_local_event(echo.transaction_id.clone(), content, Some(send_handle))
1134 .await;
1135
1136 if let Some(send_error) = send_error {
1137 self.update_event_send_state(
1138 &echo.transaction_id,
1139 EventSendState::SendingFailed {
1140 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1141 send_error,
1142 ))),
1143 is_recoverable: false,
1144 },
1145 )
1146 .await;
1147 }
1148 }
1149
1150 LocalEchoContent::React { key, send_handle, applies_to } => {
1151 self.handle_local_reaction(key, send_handle, applies_to).await;
1152 }
1153
1154 LocalEchoContent::Redaction { redacts, send_error, .. } => {
1155 self.handle_local_redaction(echo.transaction_id.clone(), redacts).await;
1156
1157 if let Some(send_error) = send_error {
1158 self.update_event_send_state(
1159 &echo.transaction_id,
1160 EventSendState::SendingFailed {
1161 error: Arc::new(matrix_sdk::Error::SendQueueWedgeError(Box::new(
1162 send_error,
1163 ))),
1164 is_recoverable: false,
1165 },
1166 )
1167 .await;
1168 }
1169 }
1170 }
1171 }
1172
1173 #[instrument(skip(self, send_handle))]
1175 async fn handle_local_reaction(
1176 &self,
1177 reaction_key: String,
1178 send_handle: SendReactionHandle,
1179 applies_to: OwnedTransactionId,
1180 ) {
1181 let mut state = self.state.write().await;
1182 let mut tr = state.transaction();
1183
1184 let target = TimelineEventItemId::TransactionId(applies_to);
1185
1186 let reaction_txn_id = send_handle.transaction_id().to_owned();
1187 let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle));
1188 let aggregation = Aggregation::new(
1189 TimelineEventItemId::TransactionId(reaction_txn_id),
1190 AggregationKind::Reaction {
1191 key: reaction_key.clone(),
1192 sender: self.room_data_provider.own_user_id().to_owned(),
1193 timestamp: MilliSecondsSinceUnixEpoch::now(),
1194 reaction_status,
1195 },
1196 );
1197
1198 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1199 find_item_and_apply_aggregation(
1200 &tr.meta.aggregations,
1201 &mut tr.items,
1202 &target,
1203 aggregation,
1204 &tr.meta.room_version_rules,
1205 );
1206
1207 tr.commit();
1208 }
1209
1210 pub(super) async fn handle_local_redaction(
1212 &self,
1213 txn_id: OwnedTransactionId,
1214 redacts: OwnedEventId,
1215 ) {
1216 let mut state = self.state.write().await;
1217 let mut tr = state.transaction();
1218
1219 let target = TimelineEventItemId::EventId(redacts);
1220
1221 let aggregation = Aggregation::new(
1222 TimelineEventItemId::TransactionId(txn_id),
1223 AggregationKind::Redaction { is_local: true },
1224 );
1225
1226 tr.meta.aggregations.add(target.clone(), aggregation.clone());
1227 find_item_and_apply_aggregation(
1228 &tr.meta.aggregations,
1229 &mut tr.items,
1230 &target,
1231 aggregation,
1232 &tr.meta.room_version_rules,
1233 );
1234
1235 tr.commit();
1236 }
1237
1238 pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
1240 match update {
1241 RoomSendQueueUpdate::NewLocalEvent(echo) => {
1242 self.handle_local_echo(echo).await;
1243 }
1244
1245 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
1246 if !self.discard_local_echo(&transaction_id).await {
1247 warn!("couldn't find the local echo to discard");
1248 }
1249 }
1250
1251 RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
1252 let content = match new_content.deserialize() {
1253 Ok(d) => d,
1254 Err(err) => {
1255 warn!("error deserializing local echo (upon edit): {err}");
1256 return;
1257 }
1258 };
1259
1260 if !self.replace_local_echo(&transaction_id, content).await {
1261 warn!("couldn't find the local echo to replace");
1262 }
1263 }
1264
1265 RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
1266 self.update_event_send_state(
1267 &transaction_id,
1268 EventSendState::SendingFailed { error, is_recoverable },
1269 )
1270 .await;
1271 }
1272
1273 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
1274 self.update_event_send_state(
1275 &transaction_id,
1276 EventSendState::NotSentYet { progress: None },
1277 )
1278 .await;
1279 }
1280
1281 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
1282 self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
1283 .await;
1284 }
1285
1286 RoomSendQueueUpdate::MediaUpload { related_to, index, progress, .. } => {
1287 self.update_event_send_state(
1288 &related_to,
1289 EventSendState::NotSentYet {
1290 progress: Some(MediaUploadProgress { index, progress }),
1291 },
1292 )
1293 .await;
1294 }
1295 }
1296 }
1297
1298 pub async fn insert_timeline_start_if_missing(&self) {
1301 let mut state = self.state.write().await;
1302 let mut txn = state.transaction();
1303 txn.items.push_timeline_start_if_missing(
1304 txn.meta.new_timeline_item(VirtualTimelineItem::TimelineStart),
1305 );
1306 txn.commit();
1307 }
1308
1309 pub(super) async fn make_replied_to(
1315 &self,
1316 event: TimelineEvent,
1317 ) -> Result<Option<EmbeddedEvent>, Error> {
1318 let state = self.state.read().await;
1319 EmbeddedEvent::try_from_timeline_event(event, &self.room_data_provider, &state.meta).await
1320 }
1321}
1322
1323impl TimelineController {
1324 pub(super) fn room(&self) -> &Room {
1325 &self.room_data_provider
1326 }
1327
1328 pub(super) async fn init_focus(
1333 &self,
1334 focus: &TimelineFocus,
1335 room_event_cache: &RoomEventCache,
1336 ) -> Result<InitFocusResult, Error> {
1337 match focus {
1338 TimelineFocus::Live { .. } => {
1339 let events = room_event_cache.events().await?;
1341
1342 let has_events = !events.is_empty();
1343
1344 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1345
1346 match room_event_cache.pagination().status().get() {
1347 PaginationStatus::Idle { hit_timeline_start } => {
1348 if hit_timeline_start {
1349 self.insert_timeline_start_if_missing().await;
1352 }
1353 }
1354 PaginationStatus::Paginating => {}
1355 }
1356
1357 Ok(InitFocusResult { has_events, focus_task: None })
1358 }
1359
1360 TimelineFocus::Event { target: event_id, num_context_events, thread_mode } => {
1361 let event_cache_thread_mode = match thread_mode {
1363 TimelineEventFocusThreadMode::ForceThread => EventFocusThreadMode::ForceThread,
1364 TimelineEventFocusThreadMode::Automatic { .. } => {
1365 EventFocusThreadMode::Automatic
1366 }
1367 };
1368
1369 let cache = room_event_cache
1370 .get_or_create_event_focused_cache(
1371 event_id.clone(),
1372 *num_context_events,
1373 event_cache_thread_mode,
1374 )
1375 .await
1376 .map_err(PaginationError::EventCache)?;
1377
1378 let (events, receiver) = cache.subscribe().await;
1379
1380 let has_events = !events.is_empty();
1381
1382 match &*self.focus {
1385 TimelineFocusKind::Event { thread_root: focus_thread_root, .. } => {
1386 if let Some(thread_root) = cache.thread_root().await {
1387 focus_thread_root.get_or_init(|| thread_root);
1388 }
1389 }
1390 TimelineFocusKind::Live { .. }
1391 | TimelineFocusKind::Thread { .. }
1392 | TimelineFocusKind::PinnedEvents => {
1393 panic!("unexpected focus for an event-focused timeline")
1394 }
1395 }
1396
1397 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Pagination)
1398 .await;
1399
1400 let task = self
1401 .room_data_provider
1402 .client()
1403 .task_monitor()
1404 .spawn_infinite_task(
1405 "timeline::event_focused_cache_updates",
1406 event_focused_task(
1407 event_id.clone(),
1408 (*thread_mode).into(),
1409 room_event_cache.clone(),
1410 self.clone(),
1411 receiver,
1412 ),
1413 )
1414 .abort_on_drop();
1415
1416 Ok(InitFocusResult { has_events, focus_task: Some(task) })
1417 }
1418
1419 TimelineFocus::Thread { root_event_id, .. } => {
1420 let (has_events, receiver) =
1421 self.init_with_thread_root(root_event_id, room_event_cache).await?;
1422
1423 let room = &self.room_data_provider;
1424 let span = info_span!(
1425 parent: Span::none(),
1426 "thread_live_update_handler",
1427 room_id = ?room.room_id(),
1428 );
1429 span.follows_from(Span::current());
1430
1431 let task = room
1432 .client()
1433 .task_monitor()
1434 .spawn_infinite_task(
1435 "timeline::thread_event_cache_updates",
1436 thread_updates_task(
1437 receiver,
1438 room_event_cache.clone(),
1439 self.clone(),
1440 root_event_id.clone(),
1441 )
1442 .instrument(span),
1443 )
1444 .abort_on_drop();
1445
1446 Ok(InitFocusResult { has_events, focus_task: Some(task) })
1447 }
1448
1449 TimelineFocus::PinnedEvents => {
1450 let (initial_events, pinned_events_recv) =
1451 room_event_cache.subscribe_to_pinned_events().await?;
1452
1453 let has_events = !initial_events.is_empty();
1454
1455 self.replace_with_initial_remote_events(
1456 initial_events,
1457 RemoteEventOrigin::Pagination,
1458 )
1459 .await;
1460
1461 let task = self
1462 .room_data_provider
1463 .client()
1464 .task_monitor()
1465 .spawn_infinite_task(
1466 "timeline::pinned_event_cache_updates",
1467 pinned_events_task(
1468 room_event_cache.clone(),
1469 self.clone(),
1470 pinned_events_recv,
1471 ),
1472 )
1473 .abort_on_drop();
1474
1475 Ok(InitFocusResult { has_events, focus_task: Some(task) })
1476 }
1477 }
1478 }
1479
1480 pub(super) async fn init_with_thread_root(
1487 &self,
1488 root_event_id: &OwnedEventId,
1489 room_event_cache: &RoomEventCache,
1490 ) -> Result<(bool, broadcast::Receiver<TimelineVectorDiffs>), Error> {
1491 let (events, receiver) =
1492 room_event_cache.subscribe_to_thread(root_event_id.clone()).await?;
1493 let has_events = !events.is_empty();
1494
1495 let mut related_events = Vector::new();
1499 for event_id in events.iter().filter_map(|event| event.event_id()) {
1500 if let Some((_original, related)) =
1501 room_event_cache.find_event_with_relations(&event_id, None).await?
1502 {
1503 related_events.extend(related);
1504 }
1505 }
1506
1507 self.replace_with_initial_remote_events(events, RemoteEventOrigin::Cache).await;
1508
1509 if !related_events.is_empty() {
1511 self.handle_remote_aggregations(
1512 vec![VectorDiff::Append { values: related_events }],
1513 RemoteEventOrigin::Cache,
1514 )
1515 .await;
1516 }
1517
1518 Ok((has_events, receiver))
1519 }
1520
1521 #[instrument(skip(self))]
1524 pub(super) async fn fetch_in_reply_to_details(&self, event_id: &EventId) -> Result<(), Error> {
1525 let state_guard = self.state.write().await;
1526 let (index, item) = rfind_event_by_id(&state_guard.items, event_id)
1527 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1528 let remote_item = item
1529 .as_remote()
1530 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?
1531 .clone();
1532
1533 let TimelineItemContent::MsgLike(msglike) = item.content().clone() else {
1534 debug!("Event is not a message");
1535 return Ok(());
1536 };
1537 let Some(in_reply_to) = msglike.in_reply_to.clone() else {
1538 debug!("Event is not a reply");
1539 return Ok(());
1540 };
1541 if let TimelineDetails::Pending = &in_reply_to.event {
1542 debug!("Replied-to event is already being fetched");
1543 return Ok(());
1544 }
1545 if let TimelineDetails::Ready(_) = &in_reply_to.event {
1546 debug!("Replied-to event has already been fetched");
1547 return Ok(());
1548 }
1549
1550 let internal_id = item.internal_id.to_owned();
1551 let item = item.clone();
1552 let event = fetch_replied_to_event(
1553 state_guard,
1554 &self.state,
1555 index,
1556 &item,
1557 internal_id,
1558 &msglike,
1559 &in_reply_to.event_id,
1560 self.room(),
1561 )
1562 .await?;
1563
1564 let mut state = self.state.write().await;
1567 let (index, item) = rfind_event_by_id(&state.items, &remote_item.event_id)
1568 .ok_or(Error::EventNotInTimeline(TimelineEventItemId::EventId(event_id.to_owned())))?;
1569
1570 let TimelineItemContent::MsgLike(MsgLikeContent {
1573 kind: MsgLikeKind::Message(message),
1574 reactions,
1575 thread_root,
1576 in_reply_to,
1577 thread_summary,
1578 }) = item.content().clone()
1579 else {
1580 info!("Event is no longer a message (redacted?)");
1581 return Ok(());
1582 };
1583 let Some(in_reply_to) = in_reply_to else {
1584 warn!("Event no longer has a reply (bug?)");
1585 return Ok(());
1586 };
1587
1588 trace!("Updating in-reply-to details");
1591 let internal_id = item.internal_id.to_owned();
1592 let mut item = item.clone();
1593 item.set_content(TimelineItemContent::MsgLike(MsgLikeContent {
1594 kind: MsgLikeKind::Message(message),
1595 reactions,
1596 thread_root,
1597 in_reply_to: Some(InReplyToDetails { event_id: in_reply_to.event_id, event }),
1598 thread_summary,
1599 }));
1600 state.items.replace(index, TimelineItem::new(item, internal_id));
1601
1602 Ok(())
1603 }
1604
1605 pub(super) fn infer_thread_for_read_receipt(
1611 &self,
1612 receipt_type: &SendReceiptType,
1613 ) -> ReceiptThread {
1614 if matches!(receipt_type, SendReceiptType::FullyRead) {
1615 ReceiptThread::Unthreaded
1616 } else {
1617 self.focus.receipt_thread()
1618 }
1619 }
1620
1621 pub(super) async fn should_send_receipt(
1625 &self,
1626 receipt_type: &SendReceiptType,
1627 receipt_thread: &ReceiptThread,
1628 event_id: &EventId,
1629 ) -> bool {
1630 let own_user_id = self.room().own_user_id();
1631 let state = self.state.read().await;
1632 let room = self.room();
1633
1634 match receipt_type {
1635 SendReceiptType::Read => {
1636 if let Some((old_pub_read, _)) = state
1637 .meta
1638 .user_receipt(
1639 own_user_id,
1640 ReceiptType::Read,
1641 receipt_thread.clone(),
1642 room,
1643 state.items.all_remote_events(),
1644 )
1645 .await
1646 {
1647 trace!(%old_pub_read, "found a previous public receipt");
1648 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1649 &old_pub_read,
1650 event_id,
1651 state.items.all_remote_events(),
1652 ) {
1653 trace!(
1654 "event referred to new receipt is {relative_pos:?} the previous receipt"
1655 );
1656 return relative_pos == RelativePosition::After;
1657 }
1658 }
1659 }
1660
1661 SendReceiptType::ReadPrivate => {
1664 if let Some((old_priv_read, _)) =
1665 state.latest_user_read_receipt(own_user_id, receipt_thread.clone(), room).await
1666 {
1667 trace!(%old_priv_read, "found a previous private receipt");
1668 if let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1669 &old_priv_read,
1670 event_id,
1671 state.items.all_remote_events(),
1672 ) {
1673 trace!(
1674 "event referred to new receipt is {relative_pos:?} the previous receipt"
1675 );
1676 return relative_pos == RelativePosition::After;
1677 }
1678 }
1679 }
1680
1681 SendReceiptType::FullyRead => {
1682 if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
1683 && let Some(relative_pos) = TimelineMetadata::compare_events_positions(
1684 &prev_event_id,
1685 event_id,
1686 state.items.all_remote_events(),
1687 )
1688 {
1689 return relative_pos == RelativePosition::After;
1690 }
1691 }
1692
1693 _ => {}
1694 }
1695
1696 true
1698 }
1699
1700 pub(crate) async fn latest_event_id(&self) -> Option<OwnedEventId> {
1703 let state = self.state.read().await;
1704 let filter_out_thread_events = match self.focus() {
1705 TimelineFocusKind::Thread { .. } => false,
1706 TimelineFocusKind::Live { hide_threaded_events } => *hide_threaded_events,
1707 TimelineFocusKind::Event { .. } => {
1708 false
1710 }
1711 TimelineFocusKind::PinnedEvents => true,
1712 };
1713
1714 state
1715 .items
1716 .all_remote_events()
1717 .iter()
1718 .rev()
1719 .filter_map(|event_meta| {
1720 if !filter_out_thread_events {
1721 Some(event_meta.event_id.clone())
1723 } else if event_meta.thread_root_id.is_none() {
1724 if let Some(TimelineEventItemId::EventId(target_event_id)) =
1730 state.meta.aggregations.is_aggregation_of(&TimelineEventItemId::EventId(
1731 event_meta.event_id.clone(),
1732 ))
1733 && let Some(target_meta) =
1734 state.items.all_remote_events().get_by_event_id(target_event_id)
1735 && target_meta.thread_root_id.is_some()
1736 {
1737 None
1739 } else {
1740 Some(event_meta.event_id.clone())
1743 }
1744 } else {
1745 None
1748 }
1749 })
1750 .next()
1751 }
1752
1753 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
1754 pub(super) async fn retry_event_decryption(&self, session_ids: Option<BTreeSet<String>>) {
1755 let (utds, decrypted) = self.compute_redecryption_candidates().await;
1756
1757 let request = DecryptionRetryRequest {
1758 room_id: self.room().room_id().to_owned(),
1759 utd_session_ids: utds,
1760 refresh_info_session_ids: decrypted,
1761 };
1762
1763 self.room().client().event_cache().request_decryption(request);
1764 }
1765
1766 pub(super) async fn map_pagination_status(&self, status: PaginationStatus) -> PaginationStatus {
1774 match status {
1775 PaginationStatus::Idle { hit_timeline_start } => {
1776 if hit_timeline_start {
1777 let state = self.state.read().await;
1778 if state.meta.subscriber_skip_count.get() > 0 {
1782 return PaginationStatus::Idle { hit_timeline_start: false };
1783 }
1784 }
1785 }
1786 PaginationStatus::Paginating => {}
1787 }
1788
1789 status
1791 }
1792}
1793
1794impl<P: RoomDataProvider> TimelineController<P> {
1795 pub(super) fn focus(&self) -> &TimelineFocusKind {
1797 &self.focus
1798 }
1799}
1800
1801#[allow(clippy::too_many_arguments)]
1802async fn fetch_replied_to_event<P: RoomDataProvider>(
1803 mut state_guard: RwLockWriteGuard<'_, TimelineState<P>>,
1804 state_lock: &RwLock<TimelineState<P>>,
1805 index: usize,
1806 item: &EventTimelineItem,
1807 internal_id: TimelineUniqueId,
1808 msglike: &MsgLikeContent,
1809 in_reply_to: &EventId,
1810 room: &Room,
1811) -> Result<TimelineDetails<Box<EmbeddedEvent>>, Error> {
1812 if let Some((_, item)) = rfind_event_by_id(&state_guard.items, in_reply_to) {
1813 let details = TimelineDetails::Ready(Box::new(EmbeddedEvent::from_timeline_item(&item)));
1814 trace!("Found replied-to event locally");
1815 return Ok(details);
1816 }
1817
1818 trace!("Setting in-reply-to details to pending");
1821 let in_reply_to_details =
1822 InReplyToDetails { event_id: in_reply_to.to_owned(), event: TimelineDetails::Pending };
1823
1824 let event_item = item
1825 .with_content(TimelineItemContent::MsgLike(msglike.with_in_reply_to(in_reply_to_details)));
1826
1827 let new_timeline_item = TimelineItem::new(event_item, internal_id);
1828 state_guard.items.replace(index, new_timeline_item);
1829
1830 drop(state_guard);
1832
1833 trace!("Fetching replied-to event");
1834 let res = match room.load_or_fetch_event(in_reply_to, None).await {
1835 Ok(timeline_event) => {
1836 let state = state_lock.read().await;
1837
1838 let replied_to_item =
1839 EmbeddedEvent::try_from_timeline_event(timeline_event, room, &state.meta).await?;
1840
1841 if let Some(item) = replied_to_item {
1842 TimelineDetails::Ready(Box::new(item))
1843 } else {
1844 return Err(Error::UnsupportedEvent);
1846 }
1847 }
1848
1849 Err(e) => TimelineDetails::Error(Arc::new(e)),
1850 };
1851
1852 Ok(res)
1853}