1use std::{fs, path::PathBuf, sync::Arc};
20
21use algorithms::rfind_event_by_item_id;
22use event_item::TimelineItemHandle;
23use eyeball_im::VectorDiff;
24#[cfg(feature = "unstable-msc4274")]
25use futures::SendGallery;
26use futures_core::Stream;
27use imbl::Vector;
28#[cfg(feature = "unstable-msc4274")]
29use matrix_sdk::attachment::{AttachmentInfo, Thumbnail};
30use matrix_sdk::{
31 attachment::AttachmentConfig,
32 deserialized_responses::TimelineEvent,
33 event_cache::{EventCacheDropHandles, RoomEventCache},
34 event_handler::EventHandlerHandle,
35 executor::JoinHandle,
36 room::{edit::EditedContent, reply::Reply, Receipts, Room},
37 send_queue::{RoomSendQueueError, SendHandle},
38 Client, Result,
39};
40use mime::Mime;
41use pinned_events_loader::PinnedEventsRoom;
42use ruma::{
43 api::client::receipt::create_receipt::v3::ReceiptType,
44 events::{
45 poll::unstable_start::{NewUnstablePollStartEventContent, UnstablePollStartEventContent},
46 receipt::{Receipt, ReceiptThread},
47 room::{
48 message::RoomMessageEventContentWithoutRelation,
49 pinned_events::RoomPinnedEventsEventContent,
50 },
51 AnyMessageLikeEventContent, AnySyncTimelineEvent,
52 },
53 EventId, OwnedEventId, RoomVersionId, UserId,
54};
55#[cfg(feature = "unstable-msc4274")]
56use ruma::{
57 events::{room::message::FormattedBody, Mentions},
58 OwnedTransactionId,
59};
60use subscriber::TimelineWithDropHandle;
61use thiserror::Error;
62use tracing::{instrument, trace, warn};
63
64use self::{
65 algorithms::rfind_event_by_id, controller::TimelineController, futures::SendAttachment,
66};
67
68mod algorithms;
69mod builder;
70mod controller;
71mod date_dividers;
72mod error;
73mod event_handler;
74mod event_item;
75pub mod event_type_filter;
76pub mod futures;
77mod item;
78mod pagination;
79mod pinned_events_loader;
80mod subscriber;
81#[cfg(test)]
82mod tests;
83mod threaded_events_loader;
84mod to_device;
85mod traits;
86mod virtual_item;
87
88pub use self::{
89 builder::TimelineBuilder,
90 controller::default_event_filter,
91 error::*,
92 event_item::{
93 AnyOtherFullStateEventContent, EmbeddedEvent, EncryptedMessage, EventItemOrigin,
94 EventSendState, EventTimelineItem, InReplyToDetails, MemberProfileChange, MembershipChange,
95 Message, MsgLikeContent, MsgLikeKind, OtherState, PollResult, PollState, Profile,
96 ReactionInfo, ReactionStatus, ReactionsByKeyBySender, RoomMembershipChange,
97 RoomPinnedEventsChange, Sticker, ThreadSummary, TimelineDetails, TimelineEventItemId,
98 TimelineItemContent,
99 },
100 event_type_filter::TimelineEventTypeFilter,
101 item::{TimelineItem, TimelineItemKind, TimelineUniqueId},
102 traits::RoomExt,
103 virtual_item::VirtualTimelineItem,
104};
105
106#[derive(Debug)]
112pub struct Timeline {
113 controller: TimelineController,
116
117 event_cache: RoomEventCache,
119
120 drop_handle: Arc<TimelineDropHandle>,
122}
123
124#[derive(Clone, Debug, PartialEq)]
126pub enum TimelineFocus {
127 Live {
130 hide_threaded_events: bool,
135 },
136
137 Event {
139 target: OwnedEventId,
140 num_context_events: u16,
141 hide_threaded_events: bool,
146 },
147
148 Thread {
150 root_event_id: OwnedEventId,
151 num_events: u16,
153 },
154
155 PinnedEvents { max_events_to_load: u16, max_concurrent_requests: u16 },
157}
158
159impl TimelineFocus {
160 pub(super) fn debug_string(&self) -> String {
161 match self {
162 TimelineFocus::Live { .. } => "live".to_owned(),
163 TimelineFocus::Event { target, .. } => format!("permalink:{target}"),
164 TimelineFocus::Thread { root_event_id, .. } => format!("thread:{root_event_id}"),
165 TimelineFocus::PinnedEvents { .. } => "pinned-events".to_owned(),
166 }
167 }
168}
169
170#[derive(Debug, Clone)]
173pub enum DateDividerMode {
174 Daily,
175 Monthly,
176}
177
178impl Timeline {
179 pub fn room(&self) -> &Room {
181 self.controller.room()
182 }
183
184 pub async fn clear(&self) {
186 self.controller.clear().await;
187 }
188
189 pub async fn retry_decryption<S: Into<String>>(
214 &self,
215 session_ids: impl IntoIterator<Item = S>,
216 ) {
217 self.controller
218 .retry_event_decryption(Some(session_ids.into_iter().map(Into::into).collect()))
219 .await;
220 }
221
222 #[tracing::instrument(skip(self))]
223 async fn retry_decryption_for_all_events(&self) {
224 self.controller.retry_event_decryption(None).await;
225 }
226
227 pub async fn item_by_event_id(&self, event_id: &EventId) -> Option<EventTimelineItem> {
236 let items = self.controller.items().await;
237 let (_, item) = rfind_event_by_id(&items, event_id)?;
238 Some(item.to_owned())
239 }
240
241 pub async fn latest_event(&self) -> Option<EventTimelineItem> {
243 if self.controller.is_live().await {
244 self.controller.items().await.last()?.as_event().cloned()
245 } else {
246 None
247 }
248 }
249
250 pub async fn subscribe(
257 &self,
258 ) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>>) {
259 let (items, stream) = self.controller.subscribe().await;
260 let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
261 (items, stream)
262 }
263
264 #[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
282 pub async fn send(
283 &self,
284 content: AnyMessageLikeEventContent,
285 ) -> Result<SendHandle, RoomSendQueueError> {
286 self.room().send_queue().send(content).await
287 }
288
289 #[instrument(skip(self, content))]
307 pub async fn send_reply(
308 &self,
309 content: RoomMessageEventContentWithoutRelation,
310 reply: Reply,
311 ) -> Result<(), Error> {
312 let content = self.room().make_reply_event(content, reply).await?;
313 self.send(content.into()).await?;
314 Ok(())
315 }
316
317 #[instrument(skip(self, new_content))]
322 pub async fn edit(
323 &self,
324 item_id: &TimelineEventItemId,
325 new_content: EditedContent,
326 ) -> Result<(), Error> {
327 let items = self.items().await;
328 let Some((_pos, item)) = rfind_event_by_item_id(&items, item_id) else {
329 return Err(Error::EventNotInTimeline(item_id.clone()));
330 };
331
332 match item.handle() {
333 TimelineItemHandle::Remote(event_id) => {
334 let content = self
335 .room()
336 .make_edit_event(event_id, new_content)
337 .await
338 .map_err(EditError::RoomError)?;
339 self.send(content).await?;
340 Ok(())
341 }
342
343 TimelineItemHandle::Local(handle) => {
344 let new_content: AnyMessageLikeEventContent = match new_content {
346 EditedContent::RoomMessage(message) => {
347 if item.content.is_message() {
348 AnyMessageLikeEventContent::RoomMessage(message.into())
349 } else {
350 return Err(EditError::ContentMismatch {
351 original: item.content.debug_string().to_owned(),
352 new: "a message".to_owned(),
353 }
354 .into());
355 }
356 }
357
358 EditedContent::PollStart { new_content, .. } => {
359 if item.content.is_poll() {
360 AnyMessageLikeEventContent::UnstablePollStart(
361 UnstablePollStartEventContent::New(
362 NewUnstablePollStartEventContent::new(new_content),
363 ),
364 )
365 } else {
366 return Err(EditError::ContentMismatch {
367 original: item.content.debug_string().to_owned(),
368 new: "a poll".to_owned(),
369 }
370 .into());
371 }
372 }
373
374 EditedContent::MediaCaption { caption, formatted_caption, mentions } => {
375 if handle
376 .edit_media_caption(caption, formatted_caption, mentions)
377 .await
378 .map_err(RoomSendQueueError::StorageError)?
379 {
380 return Ok(());
381 }
382 return Err(EditError::InvalidLocalEchoState.into());
383 }
384 };
385
386 if !handle.edit(new_content).await.map_err(RoomSendQueueError::StorageError)? {
387 return Err(EditError::InvalidLocalEchoState.into());
388 }
389
390 Ok(())
391 }
392 }
393 }
394
395 pub async fn toggle_reaction(
405 &self,
406 item_id: &TimelineEventItemId,
407 reaction_key: &str,
408 ) -> Result<(), Error> {
409 self.controller.toggle_reaction_local(item_id, reaction_key).await?;
410 Ok(())
411 }
412
413 #[instrument(skip_all)]
437 pub fn send_attachment(
438 &self,
439 source: impl Into<AttachmentSource>,
440 mime_type: Mime,
441 config: AttachmentConfig,
442 ) -> SendAttachment<'_> {
443 SendAttachment::new(self, source.into(), mime_type, config)
444 }
445
446 #[cfg(feature = "unstable-msc4274")]
463 #[instrument(skip_all)]
464 pub fn send_gallery(&self, gallery: GalleryConfig) -> SendGallery<'_> {
465 SendGallery::new(self, gallery)
466 }
467
468 pub async fn redact(
471 &self,
472 item_id: &TimelineEventItemId,
473 reason: Option<&str>,
474 ) -> Result<(), Error> {
475 let items = self.items().await;
476 let Some((_pos, event)) = rfind_event_by_item_id(&items, item_id) else {
477 return Err(RedactError::ItemNotFound(item_id.clone()).into());
478 };
479
480 match event.handle() {
481 TimelineItemHandle::Remote(event_id) => {
482 self.room().redact(event_id, reason, None).await.map_err(RedactError::HttpError)?;
483 }
484 TimelineItemHandle::Local(handle) => {
485 if !handle.abort().await.map_err(RoomSendQueueError::StorageError)? {
486 return Err(RedactError::InvalidLocalEchoState.into());
487 }
488 }
489 }
490
491 Ok(())
492 }
493
494 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
514 pub async fn fetch_details_for_event(&self, event_id: &EventId) -> Result<(), Error> {
515 self.controller.fetch_in_reply_to_details(event_id).await
516 }
517
518 #[instrument(skip_all)]
526 pub async fn fetch_members(&self) {
527 self.controller.set_sender_profiles_pending().await;
528 match self.room().sync_members().await {
529 Ok(_) => {
530 self.controller.update_missing_sender_profiles().await;
531 }
532 Err(e) => {
533 self.controller.set_sender_profiles_error(Arc::new(e)).await;
534 }
535 }
536 }
537
538 #[instrument(skip(self))]
544 pub async fn latest_user_read_receipt(
545 &self,
546 user_id: &UserId,
547 ) -> Option<(OwnedEventId, Receipt)> {
548 self.controller.latest_user_read_receipt(user_id).await
549 }
550
551 #[instrument(skip(self))]
559 pub async fn latest_user_read_receipt_timeline_event_id(
560 &self,
561 user_id: &UserId,
562 ) -> Option<OwnedEventId> {
563 self.controller.latest_user_read_receipt_timeline_event_id(user_id).await
564 }
565
566 pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
568 self.controller.subscribe_own_user_read_receipts_changed().await
569 }
570
571 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
582 pub async fn send_single_receipt(
583 &self,
584 receipt_type: ReceiptType,
585 thread: ReceiptThread,
586 event_id: OwnedEventId,
587 ) -> Result<bool> {
588 if !self.controller.should_send_receipt(&receipt_type, &thread, &event_id).await {
589 trace!(
590 "not sending receipt, because we already cover the event with a previous receipt"
591 );
592
593 if thread == ReceiptThread::Unthreaded {
594 self.room().set_unread_flag(false).await?;
596 }
597
598 return Ok(false);
599 }
600
601 trace!("sending receipt");
602 self.room().send_single_receipt(receipt_type, thread, event_id).await?;
603 Ok(true)
604 }
605
606 #[instrument(skip(self))]
615 pub async fn send_multiple_receipts(&self, mut receipts: Receipts) -> Result<()> {
616 if let Some(fully_read) = &receipts.fully_read {
617 if !self
618 .controller
619 .should_send_receipt(
620 &ReceiptType::FullyRead,
621 &ReceiptThread::Unthreaded,
622 fully_read,
623 )
624 .await
625 {
626 receipts.fully_read = None;
627 }
628 }
629
630 if let Some(read_receipt) = &receipts.public_read_receipt {
631 if !self
632 .controller
633 .should_send_receipt(&ReceiptType::Read, &ReceiptThread::Unthreaded, read_receipt)
634 .await
635 {
636 receipts.public_read_receipt = None;
637 }
638 }
639
640 if let Some(private_read_receipt) = &receipts.private_read_receipt {
641 if !self
642 .controller
643 .should_send_receipt(
644 &ReceiptType::ReadPrivate,
645 &ReceiptThread::Unthreaded,
646 private_read_receipt,
647 )
648 .await
649 {
650 receipts.private_read_receipt = None;
651 }
652 }
653
654 let room = self.room();
655
656 if !receipts.is_empty() {
657 room.send_multiple_receipts(receipts).await?;
658 } else {
659 room.set_unread_flag(false).await?;
660 }
661
662 Ok(())
663 }
664
665 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
676 pub async fn mark_as_read(&self, receipt_type: ReceiptType) -> Result<bool> {
677 if let Some(event_id) = self.controller.latest_event_id().await {
678 self.send_single_receipt(receipt_type, ReceiptThread::Unthreaded, event_id).await
679 } else {
680 trace!("can't mark room as read because there's no latest event id");
681
682 self.room().set_unread_flag(false).await?;
684
685 Ok(false)
686 }
687 }
688
689 pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
699 let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
700 event_ids
701 } else {
702 self.room().load_pinned_events().await?.unwrap_or_default()
703 };
704 let event_id = event_id.to_owned();
705 if pinned_event_ids.contains(&event_id) {
706 Ok(false)
707 } else {
708 pinned_event_ids.push(event_id);
709 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
710 self.room().send_state_event(content).await?;
711 Ok(true)
712 }
713 }
714
715 pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
725 let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
726 event_ids
727 } else {
728 self.room().load_pinned_events().await?.unwrap_or_default()
729 };
730 let event_id = event_id.to_owned();
731 if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
732 pinned_event_ids.remove(idx);
733 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
734 self.room().send_state_event(content).await?;
735 Ok(true)
736 } else {
737 Ok(false)
738 }
739 }
740
741 pub async fn make_replied_to(
747 &self,
748 event: TimelineEvent,
749 ) -> Result<Option<EmbeddedEvent>, Error> {
750 self.controller.make_replied_to(event).await
751 }
752}
753
754#[doc(hidden)]
756impl Timeline {
757 pub async fn items(&self) -> Vector<Arc<TimelineItem>> {
759 self.controller.items().await
760 }
761
762 pub async fn subscribe_filter_map<U: Clone>(
763 &self,
764 f: impl Fn(Arc<TimelineItem>) -> Option<U>,
765 ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>) {
766 let (items, stream) = self.controller.subscribe_filter_map(f).await;
767 let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
768 (items, stream)
769 }
770}
771
772#[derive(Debug)]
773struct TimelineDropHandle {
774 client: Client,
775 event_handler_handles: Vec<EventHandlerHandle>,
776 room_update_join_handle: JoinHandle<()>,
777 pinned_events_join_handle: Option<JoinHandle<()>>,
778 room_key_from_backups_join_handle: JoinHandle<()>,
779 room_keys_received_join_handle: JoinHandle<()>,
780 room_key_backup_enabled_join_handle: JoinHandle<()>,
781 local_echo_listener_handle: JoinHandle<()>,
782 _event_cache_drop_handle: Arc<EventCacheDropHandles>,
783 encryption_changes_handle: JoinHandle<()>,
784}
785
786impl Drop for TimelineDropHandle {
787 fn drop(&mut self) {
788 for handle in self.event_handler_handles.drain(..) {
789 self.client.remove_event_handler(handle);
790 }
791
792 if let Some(handle) = self.pinned_events_join_handle.take() {
793 handle.abort()
794 };
795
796 self.local_echo_listener_handle.abort();
797 self.room_update_join_handle.abort();
798 self.room_key_from_backups_join_handle.abort();
799 self.room_key_backup_enabled_join_handle.abort();
800 self.room_keys_received_join_handle.abort();
801 self.encryption_changes_handle.abort();
802 }
803}
804
805#[cfg(not(target_family = "wasm"))]
806pub type TimelineEventFilterFn =
807 dyn Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool + Send + Sync;
808#[cfg(target_family = "wasm")]
809pub type TimelineEventFilterFn = dyn Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool;
810
811#[derive(Debug, Clone)]
816pub enum AttachmentSource {
817 Data {
819 bytes: Vec<u8>,
821
822 filename: String,
824 },
825
826 File(PathBuf),
830}
831
832impl AttachmentSource {
833 pub(crate) fn try_into_bytes_and_filename(self) -> Result<(Vec<u8>, String), Error> {
835 match self {
836 Self::Data { bytes, filename } => Ok((bytes, filename)),
837 Self::File(path) => {
838 let filename = path
839 .file_name()
840 .ok_or(Error::InvalidAttachmentFileName)?
841 .to_str()
842 .ok_or(Error::InvalidAttachmentFileName)?
843 .to_owned();
844 let bytes = fs::read(&path).map_err(|_| Error::InvalidAttachmentData)?;
845 Ok((bytes, filename))
846 }
847 }
848 }
849}
850
851impl<P> From<P> for AttachmentSource
852where
853 P: Into<PathBuf>,
854{
855 fn from(value: P) -> Self {
856 Self::File(value.into())
857 }
858}
859
860#[cfg(feature = "unstable-msc4274")]
867#[derive(Debug, Default)]
868pub struct GalleryConfig {
869 pub(crate) txn_id: Option<OwnedTransactionId>,
870 pub(crate) items: Vec<GalleryItemInfo>,
871 pub(crate) caption: Option<String>,
872 pub(crate) formatted_caption: Option<FormattedBody>,
873 pub(crate) mentions: Option<Mentions>,
874 pub(crate) reply: Option<Reply>,
875}
876
877#[cfg(feature = "unstable-msc4274")]
878impl GalleryConfig {
879 pub fn new() -> Self {
881 Self::default()
882 }
883
884 #[must_use]
892 pub fn txn_id(mut self, txn_id: OwnedTransactionId) -> Self {
893 self.txn_id = Some(txn_id);
894 self
895 }
896
897 #[must_use]
903 pub fn add_item(mut self, item: GalleryItemInfo) -> Self {
904 self.items.push(item);
905 self
906 }
907
908 pub fn caption(mut self, caption: Option<String>) -> Self {
914 self.caption = caption;
915 self
916 }
917
918 pub fn formatted_caption(mut self, formatted_caption: Option<FormattedBody>) -> Self {
924 self.formatted_caption = formatted_caption;
925 self
926 }
927
928 pub fn mentions(mut self, mentions: Option<Mentions>) -> Self {
934 self.mentions = mentions;
935 self
936 }
937
938 pub fn reply(mut self, reply: Option<Reply>) -> Self {
944 self.reply = reply;
945 self
946 }
947
948 pub fn len(&self) -> usize {
950 self.items.len()
951 }
952
953 pub fn is_empty(&self) -> bool {
955 self.items.is_empty()
956 }
957}
958
959#[cfg(feature = "unstable-msc4274")]
960impl TryFrom<GalleryConfig> for matrix_sdk::attachment::GalleryConfig {
961 type Error = Error;
962
963 fn try_from(value: GalleryConfig) -> Result<Self, Self::Error> {
964 let mut config = matrix_sdk::attachment::GalleryConfig::new();
965
966 if let Some(txn_id) = value.txn_id {
967 config = config.txn_id(txn_id);
968 }
969
970 for item in value.items {
971 config = config.add_item(item.try_into()?);
972 }
973
974 config = config.caption(value.caption);
975 config = config.formatted_caption(value.formatted_caption);
976 config = config.mentions(value.mentions);
977 config = config.reply(value.reply);
978
979 Ok(config)
980 }
981}
982
983#[cfg(feature = "unstable-msc4274")]
984#[derive(Debug)]
985pub struct GalleryItemInfo {
987 pub source: AttachmentSource,
989 pub content_type: Mime,
991 pub attachment_info: AttachmentInfo,
993 pub caption: Option<String>,
995 pub formatted_caption: Option<FormattedBody>,
997 pub thumbnail: Option<Thumbnail>,
999}
1000
1001#[cfg(feature = "unstable-msc4274")]
1002impl TryFrom<GalleryItemInfo> for matrix_sdk::attachment::GalleryItemInfo {
1003 type Error = Error;
1004
1005 fn try_from(value: GalleryItemInfo) -> Result<Self, Self::Error> {
1006 let (data, filename) = value.source.try_into_bytes_and_filename()?;
1007 Ok(matrix_sdk::attachment::GalleryItemInfo {
1008 filename,
1009 content_type: value.content_type,
1010 data,
1011 attachment_info: value.attachment_info,
1012 caption: value.caption,
1013 formatted_caption: value.formatted_caption,
1014 thumbnail: value.thumbnail,
1015 })
1016 }
1017}