use std::{fs, path::PathBuf, sync::Arc};
use algorithms::rfind_event_by_item_id;
use event_item::TimelineItemHandle;
use eyeball_im::VectorDiff;
#[cfg(feature = "unstable-msc4274")]
use futures::SendGallery;
use futures_core::Stream;
use imbl::Vector;
use matrix_sdk::{
Result,
attachment::{AttachmentInfo, Thumbnail},
deserialized_responses::TimelineEvent,
event_cache::{EventCacheDropHandles, EventFocusThreadMode, RoomEventCache},
room::{
Receipts, Room,
edit::EditedContent,
reply::{EnforceThread, Reply},
},
send_queue::{RoomSendQueueError, SendHandle},
task_monitor::BackgroundTaskHandle,
};
use mime::Mime;
use ruma::{
EventId, OwnedEventId, OwnedTransactionId, UserId,
api::client::receipt::create_receipt::v3::ReceiptType,
events::{
AnyMessageLikeEventContent, AnySyncTimelineEvent, Mentions,
poll::unstable_start::{NewUnstablePollStartEventContent, UnstablePollStartEventContent},
receipt::{Receipt, ReceiptThread},
relation::Thread,
room::message::{
AddMentions, Relation, RelationWithoutReplacement, ReplyWithinThread,
RoomMessageEventContentWithoutRelation, TextMessageEventContent,
},
},
room_version_rules::RoomVersionRules,
};
use subscriber::TimelineWithDropHandle;
use thiserror::Error;
use tracing::{instrument, trace, warn};
use self::{
algorithms::rfind_event_by_id, controller::TimelineController, futures::SendAttachment,
};
use crate::timeline::controller::CryptoDropHandles;
mod algorithms;
mod builder;
mod controller;
mod date_dividers;
mod error;
pub mod event_filter;
mod event_handler;
mod event_item;
pub mod futures;
mod item;
mod latest_event;
mod pagination;
mod subscriber;
mod tasks;
#[cfg(test)]
mod tests;
pub mod thread_list_service;
mod traits;
mod virtual_item;
pub use self::{
builder::TimelineBuilder,
controller::default_event_filter,
error::*,
event_filter::{TimelineEventCondition, TimelineEventFilter},
event_item::{
AnyOtherStateEventContentChange, BeaconInfo, EmbeddedEvent, EncryptedMessage,
EventItemOrigin, EventSendState, EventTimelineItem, InReplyToDetails, LiveLocationState,
MediaUploadProgress, MemberProfileChange, MembershipChange, Message, MsgLikeContent,
MsgLikeKind, OtherMessageLike, OtherState, PollResult, PollState, Profile, ReactionInfo,
ReactionStatus, ReactionsByKeyBySender, RoomMembershipChange, RoomPinnedEventsChange,
Sticker, ThreadSummary, TimelineDetails, TimelineEventItemId, TimelineEventShieldState,
TimelineEventShieldStateCode, TimelineItemContent,
},
item::{TimelineItem, TimelineItemKind, TimelineUniqueId},
latest_event::{LatestEventValue, LatestEventValueLocalState},
thread_list_service::{ThreadListPaginationState, ThreadListService},
traits::RoomExt,
virtual_item::VirtualTimelineItem,
};
/// Handle through which the methods in this file read and modify a room's
/// timeline.
///
/// Most operations are forwarded to the inner `TimelineController`.
#[derive(Debug)]
pub struct Timeline {
/// Controller that the methods of this type delegate to.
controller: TimelineController,
/// Event cache handle for the room.
///
/// NOTE(review): not read by any code visible in this part of the file;
/// presumably used by sibling modules (builder, pagination) — confirm.
event_cache: RoomEventCache,
/// Shared drop handle; a clone is attached to every stream returned by the
/// `subscribe*` methods.
drop_handle: Arc<TimelineDropHandle>,
}
/// Selects what a timeline is focused on.
///
/// `TimelineFocus::debug_string` renders the variants as `live`,
/// `permalink:<target>`, `thread:<root_event_id>` and `pinned-events`.
#[derive(Clone, Debug, PartialEq)]
pub enum TimelineFocus {
/// The live timeline (rendered as `live`).
Live {
/// NOTE(review): presumably hides events that belong to a thread from the
/// live timeline — the flag is not interpreted in this part of the file.
hide_threaded_events: bool,
},
/// A timeline focused on a single event (rendered as `permalink:<target>`).
Event {
/// Id of the focused event.
target: OwnedEventId,
/// NOTE(review): presumably the number of surrounding events to load
/// around `target` — confirm against the controller.
num_context_events: u16,
/// Thread handling for the focused event, see
/// `TimelineEventFocusThreadMode`.
thread_mode: TimelineEventFocusThreadMode,
},
/// A timeline focused on one thread (rendered as `thread:<root_event_id>`).
Thread { root_event_id: OwnedEventId },
/// A timeline of pinned events (rendered as `pinned-events`).
PinnedEvents,
}
/// Thread handling mode for a timeline focused on a single event.
///
/// Converts into the event cache's `EventFocusThreadMode`; that conversion
/// drops `hide_threaded_events`.
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum TimelineEventFocusThreadMode {
/// Maps to `EventFocusThreadMode::ForceThread`.
ForceThread,
/// Maps to `EventFocusThreadMode::Automatic`.
Automatic {
/// NOTE(review): presumably hides in-thread events when the focused
/// event turns out not to be part of a thread — not interpreted in this
/// part of the file, confirm against the controller.
hide_threaded_events: bool,
},
}
impl From<TimelineEventFocusThreadMode> for EventFocusThreadMode {
    /// Maps the timeline-level thread mode onto the event cache's thread
    /// mode. The `hide_threaded_events` flag has no counterpart on the target
    /// type and is discarded.
    fn from(val: TimelineEventFocusThreadMode) -> Self {
        match val {
            TimelineEventFocusThreadMode::Automatic { hide_threaded_events: _ } => Self::Automatic,
            TimelineEventFocusThreadMode::ForceThread => Self::ForceThread,
        }
    }
}
impl TimelineFocus {
    /// Short, human-readable label for this focus: `live`,
    /// `permalink:<target>`, `thread:<root_event_id>` or `pinned-events`.
    pub(super) fn debug_string(&self) -> String {
        match self {
            Self::Live { .. } => String::from("live"),
            Self::PinnedEvents => String::from("pinned-events"),
            Self::Event { target, .. } => format!("permalink:{target}"),
            Self::Thread { root_event_id, .. } => format!("thread:{root_event_id}"),
        }
    }
}
/// Granularity selector for date dividers.
///
/// NOTE(review): the variant names suggest one divider per day or per month;
/// the logic that consumes this enum is not in this part of the file
/// (presumably the `date_dividers` module) — confirm.
#[derive(Debug, Clone)]
pub enum DateDividerMode {
/// Day granularity.
Daily,
/// Month granularity.
Monthly,
}
/// Optional settings passed to `Timeline::send_attachment`.
///
/// Every field defaults to `None` through the derived `Default`.
/// NOTE(review): the fields are consumed by `SendAttachment`, which is defined
/// outside this part of the file; the per-field notes below are inferred from
/// names and types — confirm.
#[derive(Debug, Default)]
pub struct AttachmentConfig {
/// Transaction id to use for the send request, if any.
pub txn_id: Option<OwnedTransactionId>,
/// Extra metadata about the attachment, if any.
pub info: Option<AttachmentInfo>,
/// Thumbnail to send along with the attachment, if any.
pub thumbnail: Option<Thumbnail>,
/// Caption text for the attachment, if any.
pub caption: Option<TextMessageEventContent>,
/// Mentions to attach to the message, if any.
pub mentions: Option<Mentions>,
/// Event this attachment replies to, if any.
pub in_reply_to: Option<OwnedEventId>,
}
impl Timeline {
/// Returns the room this timeline belongs to, as held by the controller.
pub fn room(&self) -> &Room {
self.controller.room()
}
/// Asks the controller to clear the timeline.
///
/// NOTE(review): what exactly is removed (local echoes, cached events) is
/// decided by `TimelineController::clear` — confirm there.
pub async fn clear(&self) {
self.controller.clear().await;
}
/// Asks the controller to retry decryption, restricted to the given session
/// ids.
///
/// Each id is converted into a `String`; the collected ids are handed to the
/// controller wrapped in `Some(..)`.
pub async fn retry_decryption<S: Into<String>>(
    &self,
    session_ids: impl IntoIterator<Item = S>,
) {
    let requested_sessions = session_ids.into_iter().map(Into::into).collect();
    self.controller.retry_event_decryption(Some(requested_sessions)).await;
}
/// Asks the controller to retry decryption without a session-id restriction
/// (passes `None`).
#[tracing::instrument(skip(self))]
async fn retry_decryption_for_all_events(&self) {
self.controller.retry_event_decryption(None).await;
}
/// Looks up the timeline item for `event_id` in the controller's current
/// items, using `rfind_event_by_id`, and returns an owned copy of it.
///
/// Returns `None` when no such item is found.
pub async fn item_by_event_id(&self, event_id: &EventId) -> Option<EventTimelineItem> {
    let timeline_items = self.controller.items().await;
    rfind_event_by_id(&timeline_items, event_id).map(|(_index, found)| found.to_owned())
}
/// Returns the id of the latest event as reported by the controller, or
/// `None` when the controller has none.
pub async fn latest_event_id(&self) -> Option<OwnedEventId> {
self.controller.latest_event_id().await
}
/// Subscribes to the timeline.
///
/// Returns the controller's current items together with a stream of batched
/// diffs. The stream is wrapped so that it carries a clone of this timeline's
/// drop handle.
pub async fn subscribe(
    &self,
) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + use<>)
{
    let (initial_items, updates) = self.controller.subscribe().await;
    let keep_alive = Arc::clone(&self.drop_handle);
    (initial_items, TimelineWithDropHandle::new(updates, keep_alive))
}
/// Queues `content` for sending through the room's send queue.
///
/// When `content` carries no relation and `infer_reply(None)` yields a reply
/// (which it only does when the controller reports a thread root), a thread
/// relation is attached first:
///
/// * room messages are rebuilt through `Room::make_reply_event`;
/// * new-style unstable poll starts and stickers get a plain thread relation
///   pointing at the thread root and the inferred reply target;
/// * every other content type is sent unchanged.
///
/// # Errors
///
/// Propagates failures from building the reply event and from the send queue.
#[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
pub async fn send(&self, mut content: AnyMessageLikeEventContent) -> Result<SendHandle, Error> {
    // Content that already has a relation is never modified.
    let inferred_reply =
        if content.relation().is_none() { self.infer_reply(None).await } else { None };

    if let Some(reply) = inferred_reply {
        match &mut content {
            AnyMessageLikeEventContent::RoomMessage(message) => {
                // The conversion strips the relation, which was checked to
                // be absent above.
                let without_relation = message.clone().into();
                let threaded = self.room().make_reply_event(without_relation, reply).await?;
                content = threaded.into();
            }
            AnyMessageLikeEventContent::UnstablePollStart(
                UnstablePollStartEventContent::New(poll),
            ) => {
                if let Some(thread_root) = self.controller.thread_root() {
                    let thread = Thread::plain(thread_root, reply.event_id);
                    poll.relates_to = Some(RelationWithoutReplacement::Thread(thread));
                }
            }
            AnyMessageLikeEventContent::Sticker(sticker) => {
                if let Some(thread_root) = self.controller.thread_root() {
                    let thread = Thread::plain(thread_root, reply.event_id);
                    sticker.relates_to = Some(Relation::Thread(thread));
                }
            }
            _ => {}
        }
    }

    let send_handle = self.room().send_queue().send(content).await?;
    Ok(send_handle)
}
/// Sends `content` as a reply to the event `in_reply_to`.
///
/// The reply metadata comes from `infer_reply`, the reply event is built with
/// `Room::make_reply_event`, and the result is handed to [`Self::send`]. The
/// send handle returned by `send` is discarded.
///
/// # Errors
///
/// Propagates failures from building the reply event and from `send`.
#[instrument(skip(self, content))]
pub async fn send_reply(
    &self,
    content: RoomMessageEventContentWithoutRelation,
    in_reply_to: OwnedEventId,
) -> Result<(), Error> {
    let reply = self
        .infer_reply(Some(in_reply_to))
        .await
        .expect("the reply will always be set because we provided a replied-to event id");

    let reply_content = self.room().make_reply_event(content, reply).await?;
    self.send(reply_content.into()).await.map(|_send_handle| ())
}
/// Builds the [`Reply`] metadata to attach to an outgoing event.
///
/// * With an explicit `in_reply_to`, the reply targets that event and
///   mentions are added. The thread behaviour depends on the controller:
///   a threaded timeline produces a genuine in-thread reply, any other
///   timeline lets the thread relation be inferred (`MaybeThreaded`).
/// * Without `in_reply_to`, a reply is only produced when the controller
///   reports a thread root. It is a thread *fallback* (`ReplyWithinThread::No`)
///   pointing at the most recent item in the timeline that has an event id,
///   or at the thread root when there is none.
///
/// Returns `None` when there is neither an explicit target nor a thread root.
pub(crate) async fn infer_reply(&self, in_reply_to: Option<OwnedEventId>) -> Option<Reply> {
    // Explicit reply: only the thread enforcement has to be inferred.
    if let Some(in_reply_to) = in_reply_to {
        let enforce_thread = if self.controller.is_threaded() {
            EnforceThread::Threaded(ReplyWithinThread::Yes)
        } else {
            EnforceThread::MaybeThreaded
        };
        return Some(Reply {
            event_id: in_reply_to,
            enforce_thread,
            add_mentions: AddMentions::Yes,
        });
    }

    let thread_root = self.controller.thread_root()?;

    // Best effort: use the newest known event of this timeline as the
    // fallback target, and the thread root if no item has an event id yet.
    let latest_event_id = self
        .controller
        .items()
        .await
        .iter()
        .rev()
        .find_map(|item| {
            if let TimelineItemKind::Event(event) = item.kind() {
                event.event_id().map(ToOwned::to_owned)
            } else {
                None
            }
        })
        .unwrap_or(thread_root);

    Some(Reply {
        event_id: latest_event_id,
        enforce_thread: EnforceThread::Threaded(ReplyWithinThread::No),
        // The fallback target was not explicitly replied to, so its sender
        // must not be mentioned.
        add_mentions: AddMentions::No,
    })
}
#[instrument(skip(self, new_content))]
pub async fn edit(
&self,
item_id: &TimelineEventItemId,
new_content: EditedContent,
) -> Result<(), Error> {
let items = self.items().await;
let Some((_pos, item)) = rfind_event_by_item_id(&items, item_id) else {
return Err(Error::EventNotInTimeline(item_id.clone()));
};
match item.handle() {
TimelineItemHandle::Remote(event_id) => {
let content = self
.room()
.make_edit_event(event_id, new_content)
.await
.map_err(EditError::RoomError)?;
self.send(content).await?;
Ok(())
}
TimelineItemHandle::Local(handle) => {
let new_content: AnyMessageLikeEventContent = match new_content {
EditedContent::RoomMessage(message) => {
if item.content.is_message() {
AnyMessageLikeEventContent::RoomMessage(message.into())
} else {
return Err(EditError::ContentMismatch {
original: item.content.debug_string().to_owned(),
new: "a message".to_owned(),
}
.into());
}
}
EditedContent::PollStart { new_content, .. } => {
if item.content.is_poll() {
AnyMessageLikeEventContent::UnstablePollStart(
UnstablePollStartEventContent::New(
NewUnstablePollStartEventContent::new(new_content),
),
)
} else {
return Err(EditError::ContentMismatch {
original: item.content.debug_string().to_owned(),
new: "a poll".to_owned(),
}
.into());
}
}
EditedContent::MediaCaption { caption, formatted_caption, mentions } => {
if handle
.edit_media_caption(caption, formatted_caption, mentions)
.await
.map_err(RoomSendQueueError::StorageError)?
{
return Ok(());
}
return Err(EditError::InvalidLocalEchoState.into());
}
};
if !handle.edit(new_content).await.map_err(RoomSendQueueError::StorageError)? {
return Err(EditError::InvalidLocalEchoState.into());
}
Ok(())
}
}
}
/// Toggles the reaction `reaction_key` on the item identified by `item_id`,
/// delegating to `TimelineController::toggle_reaction_local`.
///
/// NOTE(review): the meaning of the returned `bool` (presumably whether the
/// reaction was added rather than removed) is defined by the controller —
/// confirm there.
pub async fn toggle_reaction(
&self,
item_id: &TimelineEventItemId,
reaction_key: &str,
) -> Result<bool, Error> {
self.controller.toggle_reaction_local(item_id, reaction_key).await
}
/// Creates a [`SendAttachment`] request for this timeline.
///
/// `source` is converted into an [`AttachmentSource`] and passed on together
/// with the MIME type and the attachment configuration.
#[instrument(skip_all)]
pub fn send_attachment(
    &self,
    source: impl Into<AttachmentSource>,
    mime_type: Mime,
    config: AttachmentConfig,
) -> SendAttachment<'_> {
    let source: AttachmentSource = source.into();
    SendAttachment::new(self, source, mime_type, config)
}
/// Creates a `SendGallery` request for this timeline from the given gallery
/// configuration.
///
/// Only available with the `unstable-msc4274` feature.
#[cfg(feature = "unstable-msc4274")]
#[instrument(skip_all)]
pub fn send_gallery(&self, gallery: GalleryConfig) -> SendGallery<'_> {
SendGallery::new(self, gallery)
}
/// Redacts the timeline item identified by `item_id`.
///
/// * Remote items are redacted through `Room::redact`, passing `reason`.
/// * Local items have their pending send aborted through the send handle.
///
/// # Errors
///
/// * `RedactError::ItemNotFound` when the item is not found.
/// * `RedactError::HttpError` when the room redaction request fails.
/// * `RedactError::InvalidLocalEchoState` when the send handle reports that
///   the local echo could not be aborted.
/// * `RoomSendQueueError::StorageError` when the send handle fails.
pub async fn redact(
    &self,
    item_id: &TimelineEventItemId,
    reason: Option<&str>,
) -> Result<(), Error> {
    let items = self.items().await;
    let (_pos, event) = rfind_event_by_item_id(&items, item_id)
        .ok_or_else(|| RedactError::ItemNotFound(item_id.clone()))?;

    match event.handle() {
        TimelineItemHandle::Local(handle) => {
            let aborted = handle.abort().await.map_err(RoomSendQueueError::StorageError)?;
            if aborted { Ok(()) } else { Err(RedactError::InvalidLocalEchoState.into()) }
        }
        TimelineItemHandle::Remote(event_id) => {
            self.room().redact(event_id, reason, None).await.map_err(RedactError::HttpError)?;
            Ok(())
        }
    }
}
/// Asks the controller to fetch the in-reply-to details for the event
/// `event_id` (delegates to `TimelineController::fetch_in_reply_to_details`).
///
/// # Errors
///
/// Returns whatever error the controller reports.
#[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
pub async fn fetch_details_for_event(&self, event_id: &EventId) -> Result<(), Error> {
self.controller.fetch_in_reply_to_details(event_id).await
}
/// Syncs the room members and updates the sender profiles of the timeline.
///
/// Profiles are first marked as pending. If the member sync succeeds, the
/// controller updates the missing sender profiles; if it fails, the error is
/// recorded on the controller instead of being returned.
#[instrument(skip_all)]
pub async fn fetch_members(&self) {
    self.controller.set_sender_profiles_pending().await;

    if let Err(sync_error) = self.room().sync_members().await {
        self.controller.set_sender_profiles_error(Arc::new(sync_error)).await;
        return;
    }

    self.controller.update_missing_sender_profiles().await;
}
/// Returns the latest read receipt of `user_id` as known to the controller:
/// the id of the event it is attached to, and the receipt itself.
///
/// Returns `None` when the controller has no such receipt.
#[instrument(skip(self))]
pub async fn latest_user_read_receipt(
&self,
user_id: &UserId,
) -> Option<(OwnedEventId, Receipt)> {
self.controller.latest_user_read_receipt(user_id).await
}
/// Returns the event id the controller associates with the latest read
/// receipt of `user_id` within the timeline.
///
/// NOTE(review): presumably this is the id of an event that is actually
/// displayed in the timeline, which may differ from the event the receipt was
/// sent for — confirm against the controller.
#[instrument(skip(self))]
pub async fn latest_user_read_receipt_timeline_event_id(
&self,
user_id: &UserId,
) -> Option<OwnedEventId> {
self.controller.latest_user_read_receipt_timeline_event_id(user_id).await
}
/// Returns the controller's stream of `()` notifications about the own
/// user's read receipts.
///
/// NOTE(review): presumably one item is emitted per change of the own user's
/// read receipts — the emission rules live in the controller.
pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> + use<> {
self.controller.subscribe_own_user_read_receipts_changed().await
}
/// Sends a single receipt of type `receipt_type` for `event_id`, unless the
/// controller decides it is not needed.
///
/// The receipt thread is inferred by the controller. When the controller
/// reports that the receipt should not be sent, nothing is sent; for an
/// unthreaded receipt the room's unread flag is still cleared.
///
/// Returns `Ok(true)` when a receipt was sent and `Ok(false)` otherwise.
///
/// # Errors
///
/// Propagates failures from sending the receipt or clearing the unread flag.
#[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
pub async fn send_single_receipt(
    &self,
    receipt_type: ReceiptType,
    event_id: OwnedEventId,
) -> Result<bool> {
    let thread = self.controller.infer_thread_for_read_receipt(&receipt_type);
    let should_send =
        self.controller.should_send_receipt(&receipt_type, &thread, &event_id).await;

    if should_send {
        trace!("sending receipt");
        self.room().send_single_receipt(receipt_type, thread, event_id).await?;
        return Ok(true);
    }

    trace!("not sending receipt, because we already cover the event with a previous receipt");

    if matches!(thread, ReceiptThread::Unthreaded) {
        // Even without a new receipt, the room is no longer marked unread.
        self.room().set_unread_flag(false).await?;
    }

    Ok(false)
}
/// Sends several receipts at once, skipping those the controller deems
/// unnecessary.
///
/// Each of the fully-read marker, the public read receipt and the private
/// read receipt is checked (as an unthreaded receipt) with the controller and
/// removed from `receipts` when it should not be sent. If anything is left,
/// it is sent in one request; otherwise only the room's unread flag is
/// cleared.
///
/// # Errors
///
/// Propagates failures from sending the receipts or clearing the unread flag.
#[instrument(skip(self))]
pub async fn send_multiple_receipts(&self, mut receipts: Receipts) -> Result<()> {
    let controller = &self.controller;

    let skip_fully_read = match &receipts.fully_read {
        Some(event_id) => {
            !controller
                .should_send_receipt(
                    &ReceiptType::FullyRead,
                    &ReceiptThread::Unthreaded,
                    event_id,
                )
                .await
        }
        None => false,
    };
    if skip_fully_read {
        receipts.fully_read = None;
    }

    let skip_public_read = match &receipts.public_read_receipt {
        Some(event_id) => {
            !controller
                .should_send_receipt(&ReceiptType::Read, &ReceiptThread::Unthreaded, event_id)
                .await
        }
        None => false,
    };
    if skip_public_read {
        receipts.public_read_receipt = None;
    }

    let skip_private_read = match &receipts.private_read_receipt {
        Some(event_id) => {
            !controller
                .should_send_receipt(
                    &ReceiptType::ReadPrivate,
                    &ReceiptThread::Unthreaded,
                    event_id,
                )
                .await
        }
        None => false,
    };
    if skip_private_read {
        receipts.private_read_receipt = None;
    }

    let room = self.room();
    if receipts.is_empty() {
        room.set_unread_flag(false).await?;
    } else {
        room.send_multiple_receipts(receipts).await?;
    }

    Ok(())
}
/// Marks the timeline as read by sending a receipt of type `receipt_type`
/// for the latest event known to the controller.
///
/// When there is no latest event, no receipt is sent; if the controller
/// reports a live timeline, the room's unread flag is still cleared.
///
/// Returns the result of [`Self::send_single_receipt`], or `Ok(false)` when
/// there was no latest event.
///
/// # Errors
///
/// Propagates failures from sending the receipt or clearing the unread flag.
#[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
pub async fn mark_as_read(&self, receipt_type: ReceiptType) -> Result<bool> {
    let Some(event_id) = self.controller.latest_event_id().await else {
        trace!("can't mark room as read because there's no latest event id");

        if self.controller.is_live() {
            self.room().set_unread_flag(false).await?;
        }

        return Ok(false);
    };

    self.send_single_receipt(receipt_type, event_id).await
}
/// Builds an `EmbeddedEvent` from `event`, delegating to
/// `TimelineController::make_replied_to`.
///
/// NOTE(review): the conditions under which the controller returns
/// `Ok(None)` are not visible in this part of the file — confirm there.
pub async fn make_replied_to(
&self,
event: TimelineEvent,
) -> Result<Option<EmbeddedEvent>, Error> {
self.controller.make_replied_to(event).await
}
/// Returns whether the controller reports this timeline as threaded.
pub fn is_threaded(&self) -> bool {
self.controller.is_threaded()
}
}
#[doc(hidden)]
impl Timeline {
    /// Returns the controller's current list of timeline items.
    pub async fn items(&self) -> Vector<Arc<TimelineItem>> {
        self.controller.items().await
    }

    /// Subscribes to the timeline through the filter-map function `f`.
    ///
    /// Returns the controller's filtered items together with a stream of
    /// diffs. The stream is wrapped so that it carries a clone of this
    /// timeline's drop handle.
    pub async fn subscribe_filter_map<U: Clone>(
        &self,
        f: impl Fn(Arc<TimelineItem>) -> Option<U>,
    ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>) {
        let (initial_items, updates) = self.controller.subscribe_filter_map(f).await;
        let keep_alive = Arc::clone(&self.drop_handle);
        (initial_items, TimelineWithDropHandle::new(updates, keep_alive))
    }
}
/// Bundle of handles owned by a `Timeline` and shared, through an `Arc`, with
/// the streams it hands out.
///
/// All fields are underscore-prefixed: they are only stored, never read here.
/// NOTE(review): presumably dropping this struct stops the associated
/// background tasks and releases the event-cache and crypto handles — the
/// drop behaviour is defined by the field types, confirm there.
#[derive(Debug)]
struct TimelineDropHandle {
/// Handle of the background task dealing with room updates.
_room_update_join_handle: BackgroundTaskHandle,
/// Handle of the background task listening for local echoes.
_local_echo_listener_handle: BackgroundTaskHandle,
/// Shared drop handles of the event cache.
_event_cache_drop_handle: Arc<EventCacheDropHandles>,
/// Optional handle of a focus-specific background task.
_focus_drop_handle: Option<BackgroundTaskHandle>,
/// Drop handles of the crypto-related tasks.
_crypto_drop_handles: CryptoDropHandles,
}
/// Signature of an event filter callback: receives an event and the room
/// version rules and returns a `bool`.
///
/// On non-wasm targets the callback must also be `Send + Sync`.
/// NOTE(review): presumably `true` means the event is kept in the timeline —
/// confirm against the code that calls the filter.
#[cfg(not(target_family = "wasm"))]
pub type TimelineEventFilterFn =
dyn Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool + Send + Sync;
/// Signature of an event filter callback on wasm targets: same as the
/// non-wasm alias, without the `Send + Sync` bounds.
#[cfg(target_family = "wasm")]
pub type TimelineEventFilterFn = dyn Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool;
/// Where the bytes of an attachment come from.
///
/// Anything convertible into a `PathBuf` converts into the `File` variant.
#[derive(Debug, Clone)]
pub enum AttachmentSource {
/// In-memory attachment.
Data {
/// Raw content of the attachment.
bytes: Vec<u8>,
/// File name to use for the attachment.
filename: String,
},
/// Attachment stored in a file; the content is read from this path and the
/// file name is taken from its last component.
File(PathBuf),
}
impl AttachmentSource {
pub(crate) fn try_into_bytes_and_filename(self) -> Result<(Vec<u8>, String), Error> {
match self {
Self::Data { bytes, filename } => Ok((bytes, filename)),
Self::File(path) => {
let filename = path
.file_name()
.ok_or(Error::InvalidAttachmentFileName)?
.to_str()
.ok_or(Error::InvalidAttachmentFileName)?
.to_owned();
let bytes = fs::read(&path).map_err(|_| Error::InvalidAttachmentData)?;
Ok((bytes, filename))
}
}
}
}
impl<P: Into<PathBuf>> From<P> for AttachmentSource {
    /// Converts anything path-like into the `File` variant.
    fn from(value: P) -> Self {
        let path: PathBuf = value.into();
        Self::File(path)
    }
}
/// Configuration of a gallery to send with `Timeline::send_gallery`.
///
/// Built with `GalleryConfig::new` and the consuming setter methods; every
/// field starts out empty through the derived `Default`.
/// NOTE(review): the fields are consumed by `SendGallery`, defined outside
/// this part of the file; the per-field notes are inferred from names and
/// types — confirm.
#[cfg(feature = "unstable-msc4274")]
#[derive(Debug, Default)]
pub struct GalleryConfig {
/// Transaction id to use for the send request, if any.
pub(crate) txn_id: Option<OwnedTransactionId>,
/// Media items of the gallery, in insertion order.
pub(crate) items: Vec<GalleryItemInfo>,
/// Caption of the gallery, if any.
pub(crate) caption: Option<TextMessageEventContent>,
/// Mentions to attach to the message, if any.
pub(crate) mentions: Option<Mentions>,
/// Event the gallery replies to, if any.
pub(crate) in_reply_to: Option<OwnedEventId>,
}
#[cfg(feature = "unstable-msc4274")]
impl GalleryConfig {
    /// Creates an empty gallery configuration.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the transaction id and returns the updated configuration.
    #[must_use]
    pub fn txn_id(mut self, txn_id: OwnedTransactionId) -> Self {
        self.txn_id = Some(txn_id);
        self
    }

    /// Appends a media item and returns the updated configuration.
    #[must_use]
    pub fn add_item(mut self, item: GalleryItemInfo) -> Self {
        self.items.push(item);
        self
    }

    /// Sets (or clears, with `None`) the caption and returns the updated
    /// configuration.
    // `#[must_use]` like the other consuming setters: dropping the returned
    // value silently discards the whole configuration.
    #[must_use]
    pub fn caption(mut self, caption: Option<TextMessageEventContent>) -> Self {
        self.caption = caption;
        self
    }

    /// Sets (or clears, with `None`) the mentions and returns the updated
    /// configuration.
    #[must_use]
    pub fn mentions(mut self, mentions: Option<Mentions>) -> Self {
        self.mentions = mentions;
        self
    }

    /// Sets (or clears, with `None`) the replied-to event and returns the
    /// updated configuration.
    #[must_use]
    pub fn in_reply_to(mut self, event_id: Option<OwnedEventId>) -> Self {
        self.in_reply_to = event_id;
        self
    }

    /// Number of media items in the gallery.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Whether the gallery contains no media item.
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }
}
/// Description of one media item of a gallery.
///
/// Converts (fallibly) into `matrix_sdk::attachment::GalleryItemInfo`, which
/// resolves `source` into bytes and a file name.
#[cfg(feature = "unstable-msc4274")]
#[derive(Debug)]
pub struct GalleryItemInfo {
/// Where the content of the item comes from.
pub source: AttachmentSource,
/// MIME type of the item.
pub content_type: Mime,
/// Metadata about the item.
pub attachment_info: AttachmentInfo,
/// Caption of the item, if any.
pub caption: Option<TextMessageEventContent>,
/// Thumbnail of the item, if any.
pub thumbnail: Option<Thumbnail>,
}
#[cfg(feature = "unstable-msc4274")]
impl TryFrom<GalleryItemInfo> for matrix_sdk::attachment::GalleryItemInfo {
    type Error = Error;

    /// Converts a timeline-level gallery item into the SDK's gallery item by
    /// resolving its source into bytes and a file name; all other fields are
    /// moved over unchanged.
    ///
    /// # Errors
    ///
    /// Returns the error of `AttachmentSource::try_into_bytes_and_filename`
    /// when the source cannot be resolved.
    fn try_from(value: GalleryItemInfo) -> Result<Self, Self::Error> {
        let GalleryItemInfo { source, content_type, attachment_info, caption, thumbnail } = value;
        let (data, filename) = source.try_into_bytes_and_filename()?;

        Ok(Self { filename, content_type, data, attachment_info, caption, thumbnail })
    }
}
/// Selects whether, and for which events, the timeline tracks read receipts.
///
/// `is_enabled` returns `true` for every variant except `Disabled`.
/// NOTE(review): the exact set of events covered by each variant is decided
/// outside this part of the file; the variant notes follow the names —
/// confirm.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum TimelineReadReceiptTracking {
/// Tracking enabled, for all events.
AllEvents,
/// Tracking enabled, for message-like events.
MessageLikeEvents,
/// Tracking disabled.
Disabled,
}
impl TimelineReadReceiptTracking {
pub fn is_enabled(&self) -> bool {
match self {
TimelineReadReceiptTracking::AllEvents
| TimelineReadReceiptTracking::MessageLikeEvents => true,
TimelineReadReceiptTracking::Disabled => false,
}
}
}