use std::{collections::HashMap, sync::Arc};
use async_once_cell::OnceCell;
use matrix_sdk_base::RoomInfoNotableUpdateReasons;
use ruma::{EventId, OwnedEventId};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
use tracing::error;
use super::{
LatestEvent,
latest_event::{IsLatestEventValueNone, With},
};
use crate::{
event_cache::{EventCache, EventCacheError, RoomEventCache},
room::WeakRoom,
send_queue::RoomSendQueueUpdate,
};
/// Holds the latest events of a single room: one for the room itself and one
/// per tracked thread.
///
/// The data is only reachable through [`RoomLatestEvents::read`] and
/// [`RoomLatestEvents::write`], which hand out guards over the shared state.
#[derive(Debug)]
pub(super) struct RoomLatestEvents {
/// The state, behind an asynchronous read/write lock. It is wrapped in an
/// `Arc` so that owned lock guards can be produced from it.
state: Arc<RwLock<RoomLatestEventsState>>,
}
impl RoomLatestEvents {
pub fn new(
weak_room: WeakRoom,
event_cache: &EventCache,
) -> With<Self, IsLatestEventValueNone> {
let latest_event_with = Self::create_latest_event(&weak_room, None);
With::map(latest_event_with, |for_the_room| Self {
state: Arc::new(RwLock::new(RoomLatestEventsState {
for_the_room,
per_thread: HashMap::new(),
weak_room,
event_cache: event_cache.clone(),
room_event_cache: OnceCell::new(),
})),
})
}
fn create_latest_event(
weak_room: &WeakRoom,
thread_id: Option<&EventId>,
) -> With<LatestEvent, IsLatestEventValueNone> {
LatestEvent::new(weak_room, thread_id)
}
pub async fn read(&self) -> RoomLatestEventsReadGuard {
RoomLatestEventsReadGuard { inner: self.state.clone().read_owned().await }
}
pub async fn write(&self) -> RoomLatestEventsWriteGuard {
RoomLatestEventsWriteGuard { inner: self.state.clone().write_owned().await }
}
}
/// The lock-protected state behind a `RoomLatestEvents`.
#[derive(Debug)]
struct RoomLatestEventsState {
/// The latest event of the room itself (created without a thread id).
for_the_room: LatestEvent,
/// The latest events of the tracked threads, keyed by thread id.
per_thread: HashMap<OwnedEventId, LatestEvent>,
/// The event cache, used to obtain the room's `RoomEventCache` on demand.
event_cache: EventCache,
/// The room's `RoomEventCache`, initialised lazily by the first update that
/// needs it and reused afterwards.
room_event_cache: OnceCell<RoomEventCache>,
/// Weak handle on the room; resolving it can fail, in which case updates
/// are skipped.
weak_room: WeakRoom,
}
/// Read-only access to the latest events of a room.
///
/// Wraps an owned read guard on the state lock.
pub(super) struct RoomLatestEventsReadGuard {
/// The owned read guard on the state.
inner: OwnedRwLockReadGuard<RoomLatestEventsState>,
}
impl RoomLatestEventsReadGuard {
pub fn for_room(&self) -> &LatestEvent {
&self.inner.for_the_room
}
pub fn for_thread(&self, thread_id: &EventId) -> Option<&LatestEvent> {
self.inner.per_thread.get(thread_id)
}
#[cfg(test)]
pub fn per_thread(&self) -> &HashMap<OwnedEventId, LatestEvent> {
&self.inner.per_thread
}
}
/// Write access to the latest events of a room.
///
/// Wraps an owned write guard on the state lock.
pub(super) struct RoomLatestEventsWriteGuard {
/// The owned write guard on the state.
inner: OwnedRwLockWriteGuard<RoomLatestEventsState>,
}
impl RoomLatestEventsWriteGuard {
pub fn has_thread(&self, thread_id: &EventId) -> bool {
self.inner.per_thread.contains_key(thread_id)
}
pub fn create_and_insert_latest_event_for_thread(&mut self, thread_id: &EventId) {
let latest_event_with =
RoomLatestEvents::create_latest_event(&self.inner.weak_room, Some(thread_id));
self.inner.per_thread.insert(thread_id.to_owned(), With::inner(latest_event_with));
}
pub fn forget_thread(&mut self, thread_id: &EventId) {
self.inner.per_thread.remove(thread_id);
}
pub async fn update_with_event_cache(&mut self) {
let Some(room) = self.inner.weak_room.get() else {
error!(room = ?self.inner.weak_room, "Room is unknown");
return;
};
let own_user_id = room.own_user_id();
let power_levels = room.power_levels().await.ok();
let inner = &mut *self.inner;
let for_the_room = &mut inner.for_the_room;
let per_thread = &mut inner.per_thread;
let room_event_cache = match inner
.room_event_cache
.get_or_try_init(async {
let (room_event_cache, _drop_handles) =
inner.event_cache.for_room(room.room_id()).await?;
Ok::<RoomEventCache, EventCacheError>(room_event_cache)
})
.await
{
Ok(room_event_cache) => room_event_cache,
Err(err) => {
error!(room_id = ?room.room_id(), ?err, "Failed to fetch the `RoomEventCache`");
return;
}
};
for_the_room
.update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref())
.await;
for latest_event in per_thread.values_mut() {
latest_event
.update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref())
.await;
}
}
pub async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) {
let Some(room) = self.inner.weak_room.get() else {
return;
};
let own_user_id = room.own_user_id();
let power_levels = room.power_levels().await.ok();
let inner = &mut *self.inner;
let for_the_room = &mut inner.for_the_room;
let per_thread = &mut inner.per_thread;
let room_event_cache = match inner
.room_event_cache
.get_or_try_init(async {
let (room_event_cache, _drop_handles) =
inner.event_cache.for_room(room.room_id()).await?;
Ok::<RoomEventCache, EventCacheError>(room_event_cache)
})
.await
{
Ok(room_event_cache) => room_event_cache,
Err(err) => {
error!(room_id = ?room.room_id(), ?err, "Failed to fetch the `RoomEventCache`");
return;
}
};
for_the_room
.update_with_send_queue(
send_queue_update,
room_event_cache,
own_user_id,
power_levels.as_ref(),
)
.await;
for latest_event in per_thread.values_mut() {
latest_event
.update_with_send_queue(
send_queue_update,
room_event_cache,
own_user_id,
power_levels.as_ref(),
)
.await;
}
}
pub async fn update_with_room_info(&mut self, reasons: RoomInfoNotableUpdateReasons) {
let Some(room) = self.inner.weak_room.get() else {
return;
};
self.inner.for_the_room.update_with_room_info(room, reasons).await;
}
}