use std::{collections::BTreeSet, sync::Arc};
use futures_util::{StreamExt as _, stream};
use matrix_sdk_base::{
event_cache::{Event, store::EventCacheStoreLock},
linked_chunk::{LinkedChunkId, OwnedLinkedChunkId},
serde_helpers::extract_relation,
task_monitor::BackgroundTaskHandle,
};
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId,
events::{
AnySyncMessageLikeEvent, AnySyncTimelineEvent, MessageLikeEventType, relation::RelationType,
},
room_version_rules::RedactionRules,
serde::Raw,
};
use tokio::sync::broadcast::{Receiver, Sender};
use tracing::{debug, instrument, trace, warn};
#[cfg(feature = "e2e-encryption")]
use super::super::redecryptor::ResolvedUtd;
use super::{
super::{EventCacheError, EventsOrigin, Result, persistence::send_updates_to_store},
event_linked_chunk::EventLinkedChunk,
lock,
room::RoomEventCacheLinkedChunkUpdate,
};
use crate::{
Room, client::WeakClient, config::RequestConfig, event_cache::TimelineVectorDiffs,
room::WeakRoom,
};
pub(in super::super) struct PinnedEventCacheState {
room_id: OwnedRoomId,
sender: Sender<TimelineVectorDiffs>,
chunk: EventLinkedChunk,
store: EventCacheStoreLock,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}
impl lock::Store for PinnedEventCacheState {
fn store(&self) -> &EventCacheStoreLock {
&self.store
}
}
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for PinnedEventCacheState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PinnedEventCacheState")
.field("room_id", &self.room_id)
.field("chunk", &self.chunk)
.finish_non_exhaustive()
}
}
pub type PinnedEventCacheStateLock = lock::StateLock<PinnedEventCacheState>;
pub type PinnedEventCacheStateLockWriteGuard<'a> =
lock::StateLockWriteGuard<'a, PinnedEventCacheState>;
impl<'a> lock::Reload for PinnedEventCacheStateLockWriteGuard<'a> {
async fn reload(&mut self) -> Result<()> {
self.reload_from_storage().await?;
Ok(())
}
}
impl<'a> PinnedEventCacheStateLockWriteGuard<'a> {
async fn reload_from_storage(&mut self) -> Result<()> {
let room_id = self.state.room_id.clone();
let linked_chunk_id = LinkedChunkId::PinnedEvents(&room_id);
let (last_chunk, chunk_id_gen) = self.store.load_last_chunk(linked_chunk_id).await?;
let Some(last_chunk) = last_chunk else {
if self.state.chunk.events().next().is_some() {
self.state.chunk.reset();
self.notify_subscribers(EventsOrigin::Sync);
}
return Ok(());
};
{
let mut current_chunk_identifier = last_chunk.identifier;
self.state.chunk.replace_with(Some(last_chunk), chunk_id_gen)?;
while let Some(previous_chunk) =
self.store.load_previous_chunk(linked_chunk_id, current_chunk_identifier).await?
{
current_chunk_identifier = previous_chunk.identifier;
self.state.chunk.insert_new_chunk_as_first(previous_chunk)?;
}
}
self.state.chunk.store_updates().take();
self.notify_subscribers(EventsOrigin::Cache);
Ok(())
}
async fn replace_all_events(&mut self, new_events: Vec<Event>) -> Result<()> {
trace!("resetting all pinned events in linked chunk");
let previous_pinned_event_ids = self.state.current_event_ids();
if new_events.iter().filter_map(|e| e.event_id()).collect::<BTreeSet<_>>()
== previous_pinned_event_ids.iter().cloned().collect()
{
return Ok(());
}
if self.state.chunk.events().next().is_some() {
self.state.chunk.reset();
}
self.state.chunk.push_live_events(None, &new_events);
self.propagate_changes().await?;
self.notify_subscribers(EventsOrigin::Sync);
Ok(())
}
async fn propagate_changes(&mut self) -> Result<()> {
let updates = self.state.chunk.store_updates().take();
let linked_chunk_id = OwnedLinkedChunkId::PinnedEvents(self.state.room_id.clone());
send_updates_to_store(
&self.store,
linked_chunk_id,
&self.state.linked_chunk_update_sender,
updates,
)
.await
}
fn notify_subscribers(&mut self, origin: EventsOrigin) {
let diffs = self.state.chunk.updates_as_vector_diffs();
if !diffs.is_empty() {
let _ = self.state.sender.send(TimelineVectorDiffs { diffs, origin });
}
}
}
impl PinnedEventCacheState {
fn current_event_ids(&self) -> Vec<OwnedEventId> {
self.chunk.events().filter_map(|(_position, event)| event.event_id()).collect()
}
}
#[derive(Clone)]
pub struct PinnedEventCache {
state: Arc<PinnedEventCacheStateLock>,
_task: Arc<BackgroundTaskHandle>,
}
impl PinnedEventCache {
pub(in super::super) fn new(
room: Room,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
store: EventCacheStoreLock,
) -> Self {
let sender = Sender::new(32);
let room_id = room.room_id().to_owned();
let chunk = EventLinkedChunk::new();
let state =
PinnedEventCacheState { room_id, chunk, sender, linked_chunk_update_sender, store };
let state = Arc::new(PinnedEventCacheStateLock::new_inner(state));
let task = Arc::new(
room.client()
.task_monitor()
.spawn_infinite_task(
"pinned_event_listener_task",
Self::pinned_event_listener_task(room, state.clone()),
)
.abort_on_drop(),
);
Self { state, _task: task }
}
pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
let guard = self.state.read().await?;
let events = guard.state.chunk.events().map(|(_position, item)| item.clone()).collect();
let recv = guard.state.sender.subscribe();
Ok((events, recv))
}
#[cfg(feature = "e2e-encryption")]
pub(in crate::event_cache) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> {
let mut guard = self.state.write().await?;
if guard.state.chunk.replace_utds(events) {
guard.propagate_changes().await?;
guard.notify_subscribers(EventsOrigin::Cache);
}
Ok(())
}
fn extract_relation_target(raw: &Raw<AnySyncTimelineEvent>) -> Option<OwnedEventId> {
let (rel_type, event_id) = extract_relation(raw)?;
match rel_type {
RelationType::Thread => None,
_ => Some(event_id),
}
}
fn extract_redaction_target(
raw: &Raw<AnySyncTimelineEvent>,
room_redaction_rules: &RedactionRules,
) -> Option<OwnedEventId> {
if raw.get_field::<MessageLikeEventType>("type").ok()??
!= MessageLikeEventType::RoomRedaction
{
return None;
}
let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(redaction)) =
raw.deserialize().ok()?
else {
return None;
};
redaction.redacts(room_redaction_rules).map(ToOwned::to_owned).or_else(|| {
warn!("missing target event id from the redaction event");
None
})
}
pub(in super::super) async fn maybe_add_live_related_events(
&mut self,
events: &[Event],
room_redaction_rules: &RedactionRules,
) -> Result<()> {
trace!("checking live events for relations to pinned events");
let mut guard = self.state.write().await?;
let pinned_event_ids: BTreeSet<OwnedEventId> =
guard.state.current_event_ids().into_iter().collect();
if pinned_event_ids.is_empty() {
return Ok(());
}
let mut new_relations = Vec::new();
for ev in events {
if let Some(relation_target) = Self::extract_relation_target(ev.raw())
&& pinned_event_ids.contains(&relation_target)
{
new_relations.push(ev.clone());
continue;
}
if let Some(redaction_target) =
Self::extract_redaction_target(ev.raw(), room_redaction_rules)
&& pinned_event_ids.contains(&redaction_target)
{
new_relations.push(ev.clone());
continue;
}
}
if !new_relations.is_empty() {
trace!("found {} new related events to pinned events", new_relations.len());
guard.state.chunk.push_live_events(None, &new_relations);
guard.propagate_changes().await?;
guard.notify_subscribers(EventsOrigin::Sync);
}
Ok(())
}
#[instrument(fields(%room_id = room.room_id()), skip(room, state))]
async fn pinned_event_listener_task(room: Room, state: Arc<PinnedEventCacheStateLock>) {
debug!("pinned events listener task started");
let reload_from_network = async |room: Room| {
let events = match Self::reload_pinned_events(room).await {
Ok(Some(events)) => events,
Ok(None) => Vec::new(),
Err(err) => {
warn!("error when loading pinned events: {err}");
return;
}
};
match state.write().await {
Ok(mut guard) => {
guard.replace_all_events(events).await.unwrap_or_else(|err| {
warn!("error when replacing pinned events: {err}");
});
}
Err(err) => {
warn!("error when acquiring write lock to replace pinned events: {err}");
}
}
};
match state.write().await {
Ok(mut guard) => {
guard.reload_from_storage().await.unwrap_or_else(|err| {
warn!("error when reloading pinned events from storage, at start: {err}");
});
let actual_pinned_events = room.pinned_event_ids().unwrap_or_default();
let reloaded_set =
guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();
if actual_pinned_events.len() != reloaded_set.len()
|| actual_pinned_events.iter().any(|event_id| !reloaded_set.contains(event_id))
{
drop(guard);
reload_from_network(room.clone()).await;
}
}
Err(err) => {
warn!("error when acquiring write lock to initialize pinned events: {err}");
}
}
let weak_room =
WeakRoom::new(WeakClient::from_client(&room.client()), room.room_id().to_owned());
let mut stream = room.pinned_event_ids_stream();
drop(room);
while let Some(new_list) = stream.next().await {
trace!("handling update");
let guard = match state.read().await {
Ok(guard) => guard,
Err(err) => {
warn!("error when acquiring read lock to handle pinned events update: {err}");
break;
}
};
let current_set = guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();
if !new_list.is_empty()
&& new_list.iter().all(|event_id| current_set.contains(event_id))
{
continue;
}
let Some(room) = weak_room.get() else {
debug!("room has been dropped, ending pinned events listener task");
break;
};
drop(guard);
reload_from_network(room).await;
}
debug!("pinned events listener task ended");
}
async fn reload_pinned_events(room: Room) -> Result<Option<Vec<Event>>> {
let (max_events_to_load, max_concurrent_requests) = {
let client = room.client();
let config = client.event_cache().config();
(config.max_pinned_events_to_load, config.max_pinned_events_concurrent_requests)
};
let pinned_event_ids: Vec<OwnedEventId> = room
.pinned_event_ids()
.unwrap_or_default()
.into_iter()
.rev()
.take(max_events_to_load)
.rev()
.collect();
if pinned_event_ids.is_empty() {
return Ok(Some(Vec::new()));
}
let mut num_successful_loads = 0;
let mut loaded_events: Vec<Event> =
stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
let room = room.clone();
let filter = vec![RelationType::Annotation, RelationType::Replacement];
let request_config = RequestConfig::default().retry_limit(3);
async move {
let (target, mut relations) = room
.load_or_fetch_event_with_relations(
&event_id,
Some(filter),
Some(request_config),
)
.await?;
relations.insert(0, target);
Ok::<_, crate::Error>(relations)
}
}))
.buffer_unordered(max_concurrent_requests)
.inspect(|result| {
if result.is_ok() {
num_successful_loads += 1;
}
})
.flat_map(stream::iter)
.flat_map(stream::iter)
.collect()
.await;
if num_successful_loads != pinned_event_ids.len() {
warn!(
"only successfully loaded {} out of {} pinned events",
num_successful_loads,
pinned_event_ids.len()
);
}
if loaded_events.is_empty() {
return Err(EventCacheError::UnableToLoadPinnedEvents);
}
loaded_events.sort_by_key(|item| {
item.raw()
.deserialize()
.map(|e| e.origin_server_ts())
.unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
});
Ok(Some(loaded_events))
}
}