use std::{fmt::Formatter, sync::Arc};
use futures_util::{StreamExt, stream};
use matrix_sdk::{BoxFuture, Room, SendOutsideWasm, SyncOutsideWasm, config::RequestConfig};
use matrix_sdk_base::deserialized_responses::TimelineEvent;
use ruma::{EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, events::relation::RelationType};
use thiserror::Error;
use tokio::sync::Mutex;
use tracing::{debug, warn};
pub struct PinnedEventsLoader {
room: Arc<dyn PinnedEventsRoom>,
previous_pinned_event_ids: Mutex<Vec<OwnedEventId>>,
max_events_to_load: usize,
max_concurrent_requests: usize,
}
impl PinnedEventsLoader {
pub fn new(
room: Arc<dyn PinnedEventsRoom>,
max_events_to_load: usize,
max_concurrent_requests: usize,
) -> Self {
Self {
room,
max_events_to_load,
max_concurrent_requests,
previous_pinned_event_ids: Mutex::new(Vec::new()),
}
}
pub async fn load_events(&self) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
let pinned_event_ids: Vec<OwnedEventId> = self
.room
.pinned_event_ids()
.unwrap_or_default()
.into_iter()
.rev()
.take(self.max_events_to_load)
.rev()
.collect();
if pinned_event_ids == *self.previous_pinned_event_ids.lock().await {
return Ok(None);
}
if pinned_event_ids.is_empty() {
*self.previous_pinned_event_ids.lock().await = Vec::new();
return Ok(Some(Vec::new()));
}
let request_config = Some(RequestConfig::default().retry_limit(3));
let mut loaded_events: Vec<TimelineEvent> =
stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
let provider = self.room.clone();
let relations_filter =
Some(vec![RelationType::Annotation, RelationType::Replacement]);
async move {
match provider
.load_event_with_relations(&event_id, request_config, relations_filter)
.await
{
Ok((event, related_events)) => {
let mut events = vec![event];
events.extend(related_events);
Some(events)
}
Err(err) => {
warn!("error when loading pinned event: {err}");
None
}
}
}
}))
.buffer_unordered(self.max_concurrent_requests)
.flat_map(stream::iter)
.flat_map(stream::iter)
.collect()
.await;
if loaded_events.is_empty() {
return Err(PinnedEventsLoaderError::TimelineReloadFailed);
}
loaded_events.sort_by_key(|item| {
item.raw()
.deserialize()
.map(|e| e.origin_server_ts())
.unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
});
*self.previous_pinned_event_ids.lock().await = pinned_event_ids;
Ok(Some(loaded_events))
}
}
pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
fn load_event_with_relations<'a>(
&'a self,
event_id: &'a EventId,
request_config: Option<RequestConfig>,
related_event_filters: Option<Vec<RelationType>>,
) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>>;
fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>>;
fn is_pinned_event(&self, event_id: &EventId) -> bool;
}
impl PinnedEventsRoom for Room {
fn load_event_with_relations<'a>(
&'a self,
event_id: &'a EventId,
request_config: Option<RequestConfig>,
related_event_filters: Option<Vec<RelationType>>,
) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>> {
Box::pin(async move {
if let Ok((cache, _handles)) = self.event_cache().await
&& let Some(ret) =
cache.find_event_with_relations(event_id, related_event_filters).await?
{
debug!("Loaded pinned event {event_id} and related events from cache");
return Ok(ret);
}
debug!("Loading pinned event {event_id} from HS");
self.event(event_id, request_config).await.map(|e| (e, Vec::new()))
})
}
fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>> {
self.clone_info().pinned_event_ids()
}
fn is_pinned_event(&self, event_id: &EventId) -> bool {
self.clone_info().is_pinned_event(event_id)
}
}
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for PinnedEventsLoader {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PinnedEventsLoader")
.field("max_events_to_load", &self.max_events_to_load)
.finish()
}
}
#[derive(Error, Debug)]
pub enum PinnedEventsLoaderError {
#[error("No event found for the given event id.")]
EventNotFound(OwnedEventId),
#[error("Timeline focus is not pinned events.")]
TimelineFocusNotPinnedEvents,
#[error("Could not load pinned events.")]
TimelineReloadFailed,
}