matrix_sdk/event_cache/mod.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The event cache is an abstraction layer, sitting between the Rust SDK and a
16//! final client, that acts as a global observer of all the rooms, gathering and
17//! inferring some extra useful information about each room. In particular,
18//! getting access to this information doesn't require subscribing to any
19//! specific room.
20//!
21//! It's intended to be fast, robust and easy to maintain, having learned from
22//! previous endeavours at implementing middle to high level features elsewhere
23//! in the SDK, notably in the UI's Timeline object.
24//!
25//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more
26//! details about the historical reasons that led us to start writing this.
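//!
//! # Example
//!
//! A minimal usage sketch (the method names are the ones exercised by the
//! tests at the bottom of this module; error handling is simplified):
//!
//! ```no_run
//! # async fn example(client: matrix_sdk::Client, room_id: &matrix_sdk::ruma::RoomId) -> Result<(), Box<dyn std::error::Error>> {
//! // Subscribe the event cache to sync responses, once, early on.
//! client.event_cache().subscribe()?;
//!
//! // Get a room-wide view over the cached events of a given room.
//! let room = client.get_room(room_id).ok_or("unknown room")?;
//! let (room_event_cache, _drop_handles) = room.event_cache().await?;
//!
//! // Read the currently cached events, and follow subsequent updates.
//! let (initial_events, updates) = room_event_cache.subscribe().await;
//! # Ok(())
//! # }
//! ```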
27
28#![forbid(missing_docs)]
29
30use std::{
31 collections::BTreeMap,
32 fmt::Debug,
33 sync::{Arc, OnceLock},
34};
35
36use eyeball::{SharedObservable, Subscriber};
37use eyeball_im::VectorDiff;
38use futures_util::future::{join_all, try_join_all};
39use matrix_sdk_base::{
40 deserialized_responses::{AmbiguityChange, TimelineEvent},
41 event_cache::store::{EventCacheStoreError, EventCacheStoreLock},
42 linked_chunk::lazy_loader::LazyLoaderError,
43 store_locks::LockStoreError,
44 sync::RoomUpdates,
45};
46use matrix_sdk_common::executor::{spawn, JoinHandle};
47use room::RoomEventCacheState;
48use ruma::{
49 events::AnySyncEphemeralRoomEvent, serde::Raw, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
50};
51use tokio::sync::{
52 broadcast::{error::RecvError, Receiver},
53 mpsc, Mutex, RwLock,
54};
55use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument as _, Span};
56
57use self::paginator::PaginatorError;
58use crate::{client::WeakClient, Client};
59
60mod deduplicator;
61mod pagination;
62mod room;
63
64pub mod paginator;
65pub use pagination::{PaginationToken, RoomPagination, RoomPaginationStatus};
66pub use room::{RoomEventCache, RoomEventCacheListener};
67
68/// An error observed in the [`EventCache`].
69#[derive(thiserror::Error, Debug)]
70pub enum EventCacheError {
71 /// The [`EventCache`] instance hasn't been initialized with
72 /// [`EventCache::subscribe`].
73 #[error(
74 "The EventCache hasn't subscribed to sync responses yet, call `EventCache::subscribe()`"
75 )]
76 NotSubscribedYet,
77
78 /// An error has been observed while back-paginating.
79 #[error(transparent)]
80 BackpaginationError(#[from] PaginatorError),
81
82 /// Back-pagination was already happening in a given room when we tried to
83 /// back-paginate again.
84 #[error("We were already back-paginating.")]
85 AlreadyBackpaginating,
86
87 /// An error happening when interacting with storage.
88 #[error(transparent)]
89 Storage(#[from] EventCacheStoreError),
90
91 /// An error happening when attempting to (cross-process) lock storage.
92 #[error(transparent)]
93 LockingStorage(#[from] LockStoreError),
94
95 /// The [`EventCache`] owns a weak reference to the [`Client`] it pertains
96 /// to. It's possible this weak reference points to nothing anymore, at
97 /// times when we try to use the client.
98 #[error("The owning client of the event cache has been dropped.")]
99 ClientDropped,
100
101 /// An error happening when interacting with the [`LinkedChunk`]'s lazy
102 /// loader.
103 ///
104 /// [`LinkedChunk`]: matrix_sdk_common::linked_chunk::LinkedChunk
105 #[error(transparent)]
106 LinkedChunkLoader(#[from] LazyLoaderError),
107}
108
109/// A result using the [`EventCacheError`].
110pub type Result<T> = std::result::Result<T, EventCacheError>;
111
112/// Hold handles to the tasks spawned by the [`EventCache`].
113pub struct EventCacheDropHandles {
114 /// Task that listens to room updates.
115 listen_updates_task: JoinHandle<()>,
116
117 /// Task that listens to updates to the user's ignored list.
118 ignore_user_list_update_task: JoinHandle<()>,
119
120 /// The task used to automatically shrink the linked chunks.
121 auto_shrink_linked_chunk_task: JoinHandle<()>,
122}
123
124impl Debug for EventCacheDropHandles {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
127 }
128}
129
130impl Drop for EventCacheDropHandles {
131 fn drop(&mut self) {
132 self.listen_updates_task.abort();
133 self.ignore_user_list_update_task.abort();
134 self.auto_shrink_linked_chunk_task.abort();
135 }
136}
137
138/// An event cache, providing lots of useful functionality for clients.
139///
140/// Cloning is shallow, and thus is cheap to do.
141///
142/// See also the module-level comment.
143#[derive(Clone)]
144pub struct EventCache {
145 /// Reference to the inner cache.
146 inner: Arc<EventCacheInner>,
147}
148
149impl Debug for EventCache {
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 f.debug_struct("EventCache").finish_non_exhaustive()
152 }
153}
154
155impl EventCache {
156 /// Create a new [`EventCache`] for the given client.
157 pub(crate) fn new(client: WeakClient, event_cache_store: EventCacheStoreLock) -> Self {
158 Self {
159 inner: Arc::new(EventCacheInner {
160 client,
161 store: event_cache_store,
162 multiple_room_updates_lock: Default::default(),
163 by_room: Default::default(),
164 drop_handles: Default::default(),
165 auto_shrink_sender: Default::default(),
166 }),
167 }
168 }
169
170 /// Starts subscribing the [`EventCache`] to sync responses, if not done
171 /// before.
172 ///
173 /// Re-running this has no effect if we already subscribed before, and is
174 /// cheap.
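    ///
    /// # Example
    ///
    /// A small sketch, assuming a `client` in scope; calling this more than
    /// once is harmless:
    ///
    /// ```ignore
    /// client.event_cache().subscribe()?;
    /// client.event_cache().subscribe()?; // No-op the second time.
    /// ```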
175 pub fn subscribe(&self) -> Result<()> {
176 let client = self.inner.client()?;
177
178 // Initialize the drop handles.
179 let _ = self.inner.drop_handles.get_or_init(|| {
180 // Spawn the task that will listen to all the room updates at once.
181 let listen_updates_task = spawn(Self::listen_task(
182 self.inner.clone(),
183 client.subscribe_to_all_room_updates(),
184 ));
185
186 let ignore_user_list_update_task = spawn(Self::ignore_user_list_update_task(
187 self.inner.clone(),
188 client.subscribe_to_ignore_user_list_changes(),
189 ));
190
191 let (tx, rx) = mpsc::channel(32);
192
193 // Force-initialize the sender in the [`EventCacheInner`].
194 self.inner.auto_shrink_sender.get_or_init(|| tx);
195
196 let auto_shrink_linked_chunk_task =
197 spawn(Self::auto_shrink_linked_chunk_task(self.inner.clone(), rx));
198
199 Arc::new(EventCacheDropHandles {
200 listen_updates_task,
201 ignore_user_list_update_task,
202 auto_shrink_linked_chunk_task,
203 })
204 });
205
206 Ok(())
207 }
208
209 #[instrument(skip_all)]
210 async fn ignore_user_list_update_task(
211 inner: Arc<EventCacheInner>,
212 mut ignore_user_list_stream: Subscriber<Vec<String>>,
213 ) {
214 let span = info_span!(parent: Span::none(), "ignore_user_list_update_task");
215 span.follows_from(Span::current());
216
217 async move {
218 while ignore_user_list_stream.next().await.is_some() {
219 info!("Received an ignore user list change");
220 if let Err(err) = inner.clear_all_rooms().await {
221 error!("when clearing room storage after ignore user list change: {err}");
222 }
223 }
224 info!("Ignore user list stream has closed");
225 }
226 .instrument(span)
227 .await;
228 }
229
230 #[instrument(skip_all)]
231 async fn listen_task(
232 inner: Arc<EventCacheInner>,
233 mut room_updates_feed: Receiver<RoomUpdates>,
234 ) {
235 trace!("Spawning the listen task");
236 loop {
237 match room_updates_feed.recv().await {
238 Ok(updates) => {
239 if let Err(err) = inner.handle_room_updates(updates).await {
240 match err {
241 EventCacheError::ClientDropped => {
242 // The client has dropped, exit the listen task.
243 info!("Closing the event cache global listen task because client dropped");
244 break;
245 }
246 err => {
247 error!("Error when handling room updates: {err}");
248 }
249 }
250 }
251 }
252
253 Err(RecvError::Lagged(num_skipped)) => {
254 // Forget everything we know; we could have missed events, and we have
255 // no way to reconcile at the moment!
256 // TODO: implement Smart Matching™.
257 warn!(num_skipped, "Lagged behind room updates, clearing all rooms");
258 if let Err(err) = inner.clear_all_rooms().await {
259 error!("when clearing storage after lag in listen_task: {err}");
260 }
261 }
262
263 Err(RecvError::Closed) => {
264 // The sender has shut down, exit.
265 info!("Closing the event cache global listen task because receiver closed");
266 break;
267 }
268 }
269 }
270 }
271
272 /// Spawns the task that will listen to auto-shrink notifications.
273 ///
274 /// The auto-shrink mechanism works this way:
275 ///
276 /// - Each time there's a new subscriber to a [`RoomEventCache`], it will
277 /// increment the number of active listeners for that room, aka
278 /// [`RoomEventCacheState::listener_count`].
279 /// - When that subscriber is dropped, it will decrement that count; and
280 /// notify the task below if it reached 0.
281 /// - The task spawned here, owned by the [`EventCacheInner`], will listen
282 /// to such notifications that a room may be shrunk. It will attempt an
283 /// auto-shrink, by letting the inner state decide whether this is a good
284 /// time to do so (new listeners might have spawned in the meanwhile).
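    ///
    /// An illustrative sketch of the notification side (hypothetical names;
    /// the real bookkeeping lives in the room's listener drop guard):
    ///
    /// ```ignore
    /// // When the last listener of a room is dropped…
    /// if listener_count == 0 {
    ///     // …notify this task, best-effort; it re-checks the listener count
    ///     // before actually shrinking.
    ///     let _ = auto_shrink_sender.send(room_id.clone()).await;
    /// }
    /// ```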
285 #[instrument(skip_all)]
286 async fn auto_shrink_linked_chunk_task(
287 inner: Arc<EventCacheInner>,
288 mut rx: mpsc::Receiver<AutoShrinkChannelPayload>,
289 ) {
290 while let Some(room_id) = rx.recv().await {
291 trace!(for_room = %room_id, "received notification to shrink");
292
293 let room = match inner.for_room(&room_id).await {
294 Ok(room) => room,
295 Err(err) => {
296 warn!(for_room = %room_id, "error when getting a RoomEventCache: {err}");
297 continue;
298 }
299 };
300
301 trace!("waiting for state lock…");
302 let mut state = room.inner.state.write().await;
303
304 match state.auto_shrink_if_no_listeners().await {
305 Ok(diffs) => {
306 if let Some(diffs) = diffs {
307 // Hey, fun stuff: we shrunk the linked chunk, so there shouldn't be any
308 // listeners, right? RIGHT? Especially because the state is guarded behind
309 // a lock.
310 //
311 // However, better safe than sorry, and it's cheap to send an update here,
312 // so let's do it!
313 if !diffs.is_empty() {
314 let _ = room.inner.sender.send(
315 RoomEventCacheUpdate::UpdateTimelineEvents {
316 diffs,
317 origin: EventsOrigin::Cache,
318 },
319 );
320 }
321 } else {
322 debug!("auto-shrinking didn't happen");
323 }
324 }
325
326 Err(err) => {
327 // There's not much we can do here, unfortunately.
328 warn!(for_room = %room_id, "error when attempting to shrink linked chunk: {err}");
329 }
330 }
331 }
332 }
333
334 /// Return a room-specific view over the [`EventCache`].
335 pub(crate) async fn for_room(
336 &self,
337 room_id: &RoomId,
338 ) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
339 let Some(drop_handles) = self.inner.drop_handles.get().cloned() else {
340 return Err(EventCacheError::NotSubscribedYet);
341 };
342
343 let room = self.inner.for_room(room_id).await?;
344
345 Ok((room, drop_handles))
346 }
347
348 /// Cleanly clear all the rooms' event caches.
349 ///
350 /// This will notify any live observers that the rooms have been cleared.
351 pub async fn clear_all_rooms(&self) -> Result<()> {
352 self.inner.clear_all_rooms().await
353 }
354}
355
356struct EventCacheInner {
357 /// A weak reference to the owning client, used to get a strong handle on it
358 /// when needed.
359 client: WeakClient,
360
361 /// Reference to the underlying store.
362 store: EventCacheStoreLock,
363
364 /// A lock used when many rooms must be updated at once.
365 ///
366 /// [`Mutex`] is “fair”, as it is implemented as a FIFO. It is important to
367 /// ensure that multiple updates will be applied in the correct order, which
368 /// is enforced by taking this lock when handling an update.
369 // TODO: that's the place to add a cross-process lock!
370 multiple_room_updates_lock: Mutex<()>,
371
372 /// Lazily-filled cache of live [`RoomEventCache`], once per room.
373 by_room: RwLock<BTreeMap<OwnedRoomId, RoomEventCache>>,
374
375 /// Handles to keep alive the task listening to updates.
376 drop_handles: OnceLock<Arc<EventCacheDropHandles>>,
377
378 /// A sender for notifications that a room *may* need to be auto-shrunk.
379 ///
380 /// Needs to live here, so it may be passed to each [`RoomEventCache`]
381 /// instance.
382 ///
383 /// See doc comment of [`EventCache::auto_shrink_linked_chunk_task`].
384 auto_shrink_sender: OnceLock<mpsc::Sender<AutoShrinkChannelPayload>>,
385}
386
387type AutoShrinkChannelPayload = OwnedRoomId;
388
389impl EventCacheInner {
390 fn client(&self) -> Result<Client> {
391 self.client.get().ok_or(EventCacheError::ClientDropped)
392 }
393
394 /// Clears all the rooms' data.
395 async fn clear_all_rooms(&self) -> Result<()> {
396 // Okay, here's where things get complicated.
397 //
398 // On the one hand, `by_room` may include storage for *some* rooms that we know
399 // about, but not *all* of them. Any room that hasn't been loaded in the
400 // client, or touched by a sync, will remain unloaded in memory, so it
401 // will be missing from `self.by_room`. As a result, we need to make
402 // sure that we're hitting the storage backend to *really* clear all the
403 // rooms, including those that haven't been loaded yet.
404 //
405 // On the other hand, one must NOT clear the `by_room` map, because if someone
406 // subscribed to a room update, they would never get any new update for
407 // that room, since re-creating the `RoomEventCache` would create a new,
408 // unrelated sender.
409 //
410 // So we need to *keep* the rooms in `by_room` alive, while clearing them in the
411 // store backend.
412 //
413 // As a result, for a short while, the in-memory linked chunks
414 // will be desynchronized from the storage. We need to be careful then. During
415 // that short while, we don't want *anyone* to touch the linked chunk
416 // (be it in memory or in the storage).
417 //
418 // And since that requirement applies to *any* room in `by_room` at the same
419 // time, we'll have to take the locks for *all* the live rooms, so as to
420 // properly clear the underlying storage.
421 //
422 // At this point, you might be scared about the potential for deadlocking. I am
423 // as well, but I'm convinced we're fine:
424 // 1. the lock for `by_room` is usually held only for a short while, and
425 // independently of the other two kinds.
426 // 2. the state may acquire the store cross-process lock internally, but only
427 // while the state's methods are called (so it's always transient). As a
428 // result, as soon as we've acquired the state locks, the store lock ought to
429 // be free.
430 // 3. The store lock is held explicitly only in a small scoped area below.
431 // 4. Then the store lock will be held internally when calling `reset()`, but at
432 // this point it's only held for a short while each time, so rooms will take
433 // turns to acquire it.
434
435 let rooms = self.by_room.write().await;
436
437 // Collect all the rooms' state locks, first: we can clear the storage only when
438 // nobody will touch it at the same time.
439 let room_locks = join_all(
440 rooms.values().map(|room| async move { (room, room.inner.state.write().await) }),
441 )
442 .await;
443
444 // Clear the storage for all the rooms, using the storage facility.
445 self.store.lock().await?.clear_all_linked_chunks().await?;
446
447 // At this point, all the in-memory linked chunks are desynchronized from the
448 // storage. Resynchronize them manually by calling reset(), and
449 // propagate updates to observers.
450 try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move {
451 let updates_as_vector_diffs = state_guard.reset().await?;
452 let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents {
453 diffs: updates_as_vector_diffs,
454 origin: EventsOrigin::Cache,
455 });
456 Ok::<_, EventCacheError>(())
457 }))
458 .await?;
459
460 Ok(())
461 }
462
463 /// Handles a single set of room updates at once.
464 #[instrument(skip(self, updates))]
465 async fn handle_room_updates(&self, updates: RoomUpdates) -> Result<()> {
466 // First, take the lock that indicates we're processing updates, to avoid
467 // handling multiple updates concurrently.
468 let _lock = self.multiple_room_updates_lock.lock().await;
469
470 // Left rooms.
471 for (room_id, left_room_update) in updates.left {
472 let room = self.for_room(&room_id).await?;
473
474 if let Err(err) = room.inner.handle_left_room_update(left_room_update).await {
475 // Non-fatal error, try to continue to the next room.
476 error!("handling left room update: {err}");
477 }
478 }
479
480 // Joined rooms.
481 for (room_id, joined_room_update) in updates.joined {
482 let room = self.for_room(&room_id).await?;
483
484 if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await {
485 // Non-fatal error, try to continue to the next room.
486 error!(%room_id, "handling joined room update: {err}");
487 }
488 }
489
490 // Invited rooms.
491 // TODO: we don't do anything with `updates.invite` at this point.
492
493 Ok(())
494 }
495
496 /// Return a room-specific view over the [`EventCache`].
497 ///
498 /// If there is no cache for this room yet, it is created on the fly; if the
499 /// room isn't known to the client, a default room version is assumed.
500 async fn for_room(&self, room_id: &RoomId) -> Result<RoomEventCache> {
501 // Fast path: the entry exists; let's acquire a read lock, it's cheaper than a
502 // write lock.
503 let by_room_guard = self.by_room.read().await;
504
505 match by_room_guard.get(room_id) {
506 Some(room) => Ok(room.clone()),
507
508 None => {
509 // Slow-path: the entry doesn't exist; let's acquire a write lock.
510 drop(by_room_guard);
511 let mut by_room_guard = self.by_room.write().await;
512
513 // In the meanwhile, some other caller might have obtained write access and done
514 // the same, so check for existence again.
515 if let Some(room) = by_room_guard.get(room_id) {
516 return Ok(room.clone());
517 }
518
519 let pagination_status =
520 SharedObservable::new(RoomPaginationStatus::Idle { hit_timeline_start: false });
521
522 let room_version = self
523 .client
524 .get()
525 .and_then(|client| client.get_room(room_id))
526 .as_ref()
527 .map(|room| room.clone_info().room_version_or_default())
528 .unwrap_or_else(|| {
529 warn!("unknown room version for {room_id}, using default V1");
530 RoomVersionId::V1
531 });
532
533 let room_state = RoomEventCacheState::new(
534 room_id.to_owned(),
535 room_version,
536 self.store.clone(),
537 pagination_status.clone(),
538 )
539 .await?;
540
541 // SAFETY: we must have subscribed before reaching this code, otherwise
542 // something is very wrong.
543 let auto_shrink_sender =
544 self.auto_shrink_sender.get().cloned().expect(
545 "we must have called `EventCache::subscribe()` before calling here.",
546 );
547
548 let room_event_cache = RoomEventCache::new(
549 self.client.clone(),
550 room_state,
551 pagination_status,
552 room_id.to_owned(),
553 auto_shrink_sender,
554 );
555
556 by_room_guard.insert(room_id.to_owned(), room_event_cache.clone());
557
558 Ok(room_event_cache)
559 }
560 }
561 }
562}
563
564/// The result of a single back-pagination request.
565#[derive(Debug)]
566pub struct BackPaginationOutcome {
567 /// Did the back-pagination reach the start of the timeline?
568 pub reached_start: bool,
569
570 /// All the events that have been returned in the back-pagination
571 /// request.
572 ///
573 /// Events are presented in reverse order: the first element of the vec,
574 /// if present, is the most "recent" event from the chunk (or
575 /// technically, the last one in the topological ordering).
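    ///
    /// A sketch: to process them in chronological (oldest-to-newest) order,
    /// iterate in reverse.
    ///
    /// ```ignore
    /// for event in outcome.events.iter().rev() {
    ///     // `event` goes from oldest to most recent here.
    /// }
    /// ```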
576 pub events: Vec<TimelineEvent>,
577}
578
579/// An update related to events that happened in a room.
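///
/// A sketch of consuming these updates from a `RoomEventCacheListener` (here
/// named `listener`), as returned by `RoomEventCache::subscribe`:
///
/// ```ignore
/// while let Ok(update) = listener.recv().await {
///     match update {
///         RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => {
///             // Apply the `VectorDiff`s to a local copy of the timeline.
///         }
///         RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => {
///             // Move the fully-read marker to `event_id`.
///         }
///         _ => {}
///     }
/// }
/// ```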
580#[derive(Debug, Clone)]
581pub enum RoomEventCacheUpdate {
582 /// The fully read marker has moved to a different event.
583 MoveReadMarkerTo {
584 /// Event at which the read marker is now pointing.
585 event_id: OwnedEventId,
586 },
587
588 /// The members have changed.
589 UpdateMembers {
590 /// Collection of ambiguity changes that room member events trigger.
591 ///
592 /// This is a map of event ID of the `m.room.member` event to the
593 /// details of the ambiguity change.
594 ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
595 },
596
597 /// The room has received updates for the timeline as _diffs_.
598 UpdateTimelineEvents {
599 /// Diffs to apply to the timeline.
600 diffs: Vec<VectorDiff<TimelineEvent>>,
601
602 /// Where the diffs are coming from.
603 origin: EventsOrigin,
604 },
605
606 /// The room has received new ephemeral events.
607 AddEphemeralEvents {
608 /// XXX: this is temporary, until read receipts are handled in the event
609 /// cache
610 events: Vec<Raw<AnySyncEphemeralRoomEvent>>,
611 },
612}
613
614/// Indicate where events are coming from.
615#[derive(Debug, Clone)]
616pub enum EventsOrigin {
617 /// Events are coming from a sync.
618 Sync,
619
620 /// Events are coming from pagination.
621 Pagination,
622
623 /// The cause of the change is purely internal to the cache.
624 Cache,
625}
626
627#[cfg(test)]
628mod tests {
629 use assert_matches::assert_matches;
630 use futures_util::FutureExt as _;
631 use matrix_sdk_base::sync::{JoinedRoomUpdate, RoomUpdates, Timeline};
632 use matrix_sdk_test::{async_test, event_factory::EventFactory};
633 use ruma::{event_id, room_id, serde::Raw, user_id};
634 use serde_json::json;
635
636 use super::{EventCacheError, RoomEventCacheUpdate};
637 use crate::test_utils::{assert_event_matches_msg, logged_in_client};
638
639 #[async_test]
640 async fn test_must_explicitly_subscribe() {
641 let client = logged_in_client(None).await;
642
643 let event_cache = client.event_cache();
644
645 // If I create a room event subscriber for a room before subscribing the event
646 // cache,
647 let room_id = room_id!("!omelette:fromage.fr");
648 let result = event_cache.for_room(room_id).await;
649
650 // Then it fails, because one must explicitly call `.subscribe()` on the event
651 // cache.
652 assert_matches!(result, Err(EventCacheError::NotSubscribedYet));
653 }
654
655 #[async_test]
656 async fn test_uniq_read_marker() {
657 let client = logged_in_client(None).await;
658 let room_id = room_id!("!galette:saucisse.bzh");
659 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
660
661 let event_cache = client.event_cache();
662
663 event_cache.subscribe().unwrap();
664
665 let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap();
666
667 let (events, mut stream) = room_event_cache.subscribe().await;
668
669 assert!(events.is_empty());
670
671 // When sending the same read marker event multiple times,…
672 let read_marker_event = Raw::from_json_string(
673 json!({
674 "content": {
675 "event_id": "$crepe:saucisse.bzh"
676 },
677 "room_id": "!galette:saucisse.bzh",
678 "type": "m.fully_read"
679 })
680 .to_string(),
681 )
682 .unwrap();
683 let account_data = vec![read_marker_event; 100];
684
685 room_event_cache
686 .inner
687 .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() })
688 .await
689 .unwrap();
690
691 // … there's only one read marker update.
692 assert_matches!(
693 stream.recv().await.unwrap(),
694 RoomEventCacheUpdate::MoveReadMarkerTo { .. }
695 );
696
697 assert!(stream.recv().now_or_never().is_none());
698 }
699
700 #[async_test]
701 async fn test_get_event_by_id() {
702 let client = logged_in_client(None).await;
703 let room_id1 = room_id!("!galette:saucisse.bzh");
704 let room_id2 = room_id!("!crepe:saucisse.bzh");
705
706 let event_cache = client.event_cache();
707 event_cache.subscribe().unwrap();
708
709 // Insert two rooms with a few events.
710 let f = EventFactory::new().room(room_id1).sender(user_id!("@ben:saucisse.bzh"));
711
712 let eid1 = event_id!("$1");
713 let eid2 = event_id!("$2");
714 let eid3 = event_id!("$3");
715
716 let joined_room_update1 = JoinedRoomUpdate {
717 timeline: Timeline {
718 events: vec![
719 f.text_msg("hey").event_id(eid1).into(),
720 f.text_msg("you").event_id(eid2).into(),
721 ],
722 ..Default::default()
723 },
724 ..Default::default()
725 };
726
727 let joined_room_update2 = JoinedRoomUpdate {
728 timeline: Timeline {
729 events: vec![f.text_msg("bjr").event_id(eid3).into()],
730 ..Default::default()
731 },
732 ..Default::default()
733 };
734
735 let mut updates = RoomUpdates::default();
736 updates.joined.insert(room_id1.to_owned(), joined_room_update1);
737 updates.joined.insert(room_id2.to_owned(), joined_room_update2);
738
739 // Have the event cache handle them.
740 event_cache.inner.handle_room_updates(updates).await.unwrap();
741
742 // We can find the events in a single room.
743 client.base_client().get_or_create_room(room_id1, matrix_sdk_base::RoomState::Joined);
744 let room1 = client.get_room(room_id1).unwrap();
745
746 let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap();
747
748 let found1 = room_event_cache.event(eid1).await.unwrap();
749 assert_event_matches_msg(&found1, "hey");
750
751 let found2 = room_event_cache.event(eid2).await.unwrap();
752 assert_event_matches_msg(&found2, "you");
753
754 // Retrieving the event with `eid3` from a room which doesn't contain it
755 // returns `None`.
756 assert!(room_event_cache.event(eid3).await.is_none());
757 }
758
759 #[async_test]
760 async fn test_save_event() {
761 let client = logged_in_client(None).await;
762 let room_id = room_id!("!galette:saucisse.bzh");
763
764 let event_cache = client.event_cache();
765 event_cache.subscribe().unwrap();
766
767 let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
768 let event_id = event_id!("$1");
769
770 client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
771 let room = client.get_room(room_id).unwrap();
772
773 let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
774 room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await;
775
776 // Retrieving the event from the room-wide cache works.
777 assert!(room_event_cache.event(event_id).await.is_some());
778 }
779}