Skip to main content

matrix_sdk/latest_events/latest_event/
mod.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod builder;
16
17use std::ops::{Deref, DerefMut, Not};
18
19use builder::{BufferOfValuesForLocalEvents, Builder};
20use eyeball::{AsyncLock, ObservableWriteGuard, SharedObservable, Subscriber};
21pub use matrix_sdk_base::latest_event::{
22    LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
23};
24use matrix_sdk_base::{RoomInfoNotableUpdateReasons, RoomState};
25use ruma::{EventId, OwnedEventId, UserId, events::room::power_levels::RoomPowerLevels};
26use tracing::{error, info, instrument, trace, warn};
27
28use crate::{Room, event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate};
29
30/// The latest event of a room or a thread.
31///
32/// Use [`LatestEvent::subscribe`] to get a stream of updates.
33#[derive(Debug)]
34pub(super) struct LatestEvent {
35    /// The room owning this latest event.
36    weak_room: WeakRoom,
37
38    /// The thread (if any) owning this latest event.
39    _thread_id: Option<OwnedEventId>,
40
41    /// A buffer of the current [`LatestEventValue`]s computed for local events
42    /// seen by the send queue. See [`BufferOfValuesForLocalEvents`] to learn
43    /// more.
44    buffer_of_values_for_local_events: BufferOfValuesForLocalEvents,
45
46    /// The latest event value.
47    current_value: SharedObservable<LatestEventValue, AsyncLock>,
48}
49
50impl LatestEvent {
51    pub fn new(
52        weak_room: &WeakRoom,
53        thread_id: Option<&EventId>,
54    ) -> With<Self, IsLatestEventValueNone> {
55        let latest_event_value = match thread_id {
56            Some(_thread_id) => LatestEventValue::default(),
57            None => weak_room.get().map(|room| room.latest_event()).unwrap_or_default(),
58        };
59        let is_none = latest_event_value.is_none();
60
61        With {
62            result: Self {
63                weak_room: weak_room.clone(),
64                _thread_id: thread_id.map(ToOwned::to_owned),
65                buffer_of_values_for_local_events: BufferOfValuesForLocalEvents::new(),
66                current_value: SharedObservable::new_async(latest_event_value),
67            },
68            with: is_none,
69        }
70    }
71
72    /// Return a [`Subscriber`] to new values.
73    pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
74        self.current_value.subscribe().await
75    }
76
77    #[cfg(test)]
78    pub async fn get(&self) -> LatestEventValue {
79        self.current_value.get().await
80    }
81
82    /// Update the inner latest event value, based on the event cache
83    /// (specifically with the [`RoomEventCache`]), if and only if there is no
84    /// local latest event value waiting.
85    ///
86    /// It is only necessary to compute a new [`LatestEventValue`] from the
87    /// event cache if there is no [`LatestEventValue`] to be compute from the
88    /// send queue. Indeed, anything coming from the send queue has the priority
89    /// over the anything coming from the event cache. We believe it provides a
90    /// better user experience.
91    pub async fn update_with_event_cache(
92        &mut self,
93        room_event_cache: &RoomEventCache,
94        own_user_id: &UserId,
95        power_levels: Option<&RoomPowerLevels>,
96    ) {
97        if self.buffer_of_values_for_local_events.is_empty().not() {
98            // At least one `LatestEventValue` exists for local events (i.e. coming from the
99            // send queue). In this case, we don't overwrite the current value with a newly
100            // computed one from the event cache.
101            return;
102        }
103
104        let current_event = self.current_value.get().await;
105        let new_value =
106            Builder::new_remote(room_event_cache, current_event, own_user_id, power_levels).await;
107
108        trace!(value = ?new_value, "Computed a remote `LatestEventValue`");
109
110        if let Some(new_value) = new_value {
111            self.update(new_value).await;
112        }
113    }
114
115    /// Update the inner latest event value, based on the send queue
116    /// (specifically with the [`RoomSendQueueUpdate`]).
117    pub async fn update_with_send_queue(
118        &mut self,
119        send_queue_update: &RoomSendQueueUpdate,
120        room_event_cache: &RoomEventCache,
121        own_user_id: &UserId,
122        power_levels: Option<&RoomPowerLevels>,
123    ) {
124        let current_event = self.current_value.get().await;
125        let new_value = Builder::new_local(
126            send_queue_update,
127            &mut self.buffer_of_values_for_local_events,
128            room_event_cache,
129            current_event,
130            own_user_id,
131            power_levels,
132        )
133        .await;
134
135        trace!(value = ?new_value, "Computed a local `LatestEventValue`");
136
137        if let Some(new_value) = new_value {
138            self.update(new_value).await;
139        }
140    }
141
142    /// Update the inner latest event value, based on the room info.
143    pub async fn update_with_room_info(
144        &mut self,
145        room: Room,
146        reasons: RoomInfoNotableUpdateReasons,
147    ) {
148        // If the `RoomInfo` has been updated due to a change of the own membership.
149        if reasons.contains(RoomInfoNotableUpdateReasons::MEMBERSHIP) {
150            let new_value = match room.state() {
151                // If the room' state is `Invited`, it means the current user has been recently
152                // invited to this room.
153                RoomState::Invited => {
154                    // Short: Let's not update a `RemoteInvite` to another `RemoteInvite`.
155                    //
156                    // Long: An invite room is only constituted of stripped-state events. These
157                    // events do not have an `origin_server_ts` field. It means we cannot compute
158                    // the timestamp of the `LatestEventValue`. To workaround this, we set the
159                    // timestamp to `now()`. See `Builder::new_remote_for_invite` to learn more.
160                    // If an invite room receives a new event, its `LatestEventValue`'s timestamp
161                    // will be updated to `now()`, which will make the room bumps to the top of the
162                    // room list for example. This is not an acceptable behaviour because it can be
163                    // an “attack vector”, i.e. a way to annoy people with spammy invites. That's
164                    // why, once a `RemoteInvite` has been computed, we do not refresh it.
165                    if matches!(
166                        self.current_value.read().await.deref(),
167                        LatestEventValue::RemoteInvite { .. }
168                    ) {
169                        return;
170                    }
171
172                    let new_value = Builder::new_remote_for_invite(&room).await;
173
174                    trace!(value = ?new_value, "Computed a remote `LatestEventValue` for invite");
175
176                    new_value
177                }
178
179                _ => {
180                    info!(
181                        "Skipping the computation of a remote `LatestEventValue` from a `RoomInfo`"
182                    );
183
184                    return;
185                }
186            };
187
188            self.update(new_value).await;
189        }
190    }
191
192    /// Update [`Self::current_value`], and persist the `new_value` in the
193    /// store.
194    ///
195    /// If the `new_value` is [`LatestEventValue::None`], it is accepted: if the
196    /// previous latest event value has been redacted and no other candidate has
197    /// been found, we want to latest event value to be `None`, so that it is
198    /// erased correctly.
199    async fn update(&mut self, new_value: LatestEventValue) {
200        // Ideally, we would set `new_value` if and only if it is different from the
201        // previous value. However, `LatestEventValue` cannot implement `PartialEq` at
202        // the time of writing (2025-12-12). So we are only updating if:
203        //
204        // - if `LatestEventValue` and the previous value aren't `None`,
205        // - if the event IDs are different.
206        //
207        // We must be careful when comparing the event IDs: `None` and `Local*` have no
208        // event ID, we can't compare them at this point. Hence the `match` statement to
209        // have a fine-grained decision.
210        {
211            let mut guard = self.current_value.write().await;
212            let previous_value = guard.deref();
213
214            let do_update = match (previous_value, &new_value) {
215                // If both are `None`, no.
216                (LatestEventValue::None, LatestEventValue::None) => false,
217
218                // If at least one is `None`, yes.
219                (_, LatestEventValue::None) | (LatestEventValue::None, _) => true,
220
221                // If the event IDs are identical, no.
222                (previous, new) if previous.event_id() == new.event_id() => false,
223
224                // Otherwise, yes.
225                (_, _) => true,
226            };
227
228            if do_update {
229                ObservableWriteGuard::set(&mut guard, new_value.clone());
230
231                // Release the write guard over the current value before hitting the store.
232                drop(guard);
233
234                self.store(new_value).await;
235            }
236        }
237    }
238
239    /// Update the `RoomInfo` associated to this room to set the new
240    /// [`LatestEventValue`], and persist it in the
241    /// [`StateStore`][matrix_sdk_base::StateStore] (the one from
242    /// [`Client::state_store`][crate::Client::state_store]).
243    #[instrument(skip_all)]
244    async fn store(&mut self, new_value: LatestEventValue) {
245        let Some(room) = self.weak_room.get() else {
246            warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
247            return;
248        };
249        let result = room
250            .update_and_save_room_info(|mut info| {
251                info.set_latest_event(new_value);
252                (info, RoomInfoNotableUpdateReasons::LATEST_EVENT)
253            })
254            .await;
255        if let Err(error) = result {
256            error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
257        }
258    }
259}
260
261/// Semantic type similar to a tuple where the left part is the main result and
262/// the right part is an “attached” value.
263pub(super) struct With<T, W> {
264    /// The main value.
265    result: T,
266
267    /// The “attached” value.
268    with: W,
269}
270
271impl<T, W> With<T, W> {
272    /// Map the main result without changing the “attached” value.
273    pub fn map<F, O>(this: With<T, W>, f: F) -> With<O, W>
274    where
275        F: FnOnce(T) -> O,
276    {
277        With { result: f(this.result), with: this.with }
278    }
279
280    /// Get the main result.
281    pub fn inner(this: With<T, W>) -> T {
282        this.result
283    }
284
285    /// Get a tuple.
286    pub fn unzip(this: With<T, W>) -> (T, W) {
287        (this.result, this.with)
288    }
289}
290
291impl<T, W> Deref for With<T, W> {
292    type Target = T;
293
294    fn deref(&self) -> &Self::Target {
295        &self.result
296    }
297}
298
299impl<T, W> DerefMut for With<T, W> {
300    fn deref_mut(&mut self) -> &mut Self::Target {
301        &mut self.result
302    }
303}
304
305pub(super) type IsLatestEventValueNone = bool;
306
307#[cfg(all(not(target_family = "wasm"), test))]
308mod tests_latest_event {
309    use std::ops::Not;
310
311    use assert_matches::assert_matches;
312    use matrix_sdk_base::{
313        RoomInfoNotableUpdateReasons, RoomState,
314        latest_event::RemoteLatestEventValue,
315        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
316        store::{SerializableEventContent, StoreConfig},
317    };
318    use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
319    use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
320    use ruma::{
321        MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
322        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
323        owned_event_id, owned_room_id, room_id, user_id,
324    };
325    use stream_assert::{assert_next_matches, assert_pending};
326    use tokio::task::yield_now;
327
328    use super::{super::local_room_message, LatestEvent, LatestEventValue, With};
329    use crate::{
330        client::WeakClient,
331        room::WeakRoom,
332        send_queue::{LocalEcho, LocalEchoContent, RoomSendQueue, RoomSendQueueUpdate, SendHandle},
333        test_utils::mocks::MatrixMockServer,
334    };
335
336    fn new_local_echo_content(
337        room_send_queue: &RoomSendQueue,
338        transaction_id: &OwnedTransactionId,
339        body: &str,
340    ) -> LocalEchoContent {
341        LocalEchoContent::Event {
342            serialized_event: SerializableEventContent::new(
343                &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
344            )
345            .unwrap(),
346            send_handle: SendHandle::new(
347                room_send_queue.clone(),
348                transaction_id.clone(),
349                MilliSecondsSinceUnixEpoch::now(),
350            ),
351            send_error: None,
352        }
353    }
354
355    #[async_test]
356    async fn test_new_loads_from_room_info() {
357        let room_id = room_id!("!r0");
358
359        let server = MatrixMockServer::new().await;
360        let client = server.client_builder().build().await;
361        let weak_client = WeakClient::from_client(&client);
362
363        // Create the room.
364        let room = client.base_client().get_or_create_room(room_id, RoomState::Joined);
365        let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
366
367        // First time `LatestEvent` is created: we get `None`.
368        {
369            let (latest_event, is_none) = With::unzip(LatestEvent::new(&weak_room, None));
370
371            // By default, it's `None`.
372            assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
373            assert!(is_none);
374        }
375
376        // Set the `RoomInfo`.
377        {
378            room.update_room_info(|mut info| {
379                info.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
380                (info, Default::default())
381            })
382            .await;
383        }
384
385        // Second time. We get `LocalIsSending` from `RoomInfo`.
386        {
387            let (latest_event, is_none) = With::unzip(LatestEvent::new(&weak_room, None));
388
389            // By default, it's `None`.
390            assert_matches!(
391                latest_event.current_value.get().await,
392                LatestEventValue::LocalIsSending(_)
393            );
394            assert!(is_none.not());
395        }
396    }
397
398    #[async_test]
399    async fn test_update_do_not_ignore_none_value() {
400        let room_id = room_id!("!r0");
401
402        let server = MatrixMockServer::new().await;
403        let client = server.client_builder().build().await;
404        let weak_client = WeakClient::from_client(&client);
405
406        // Create the room.
407        client.base_client().get_or_create_room(room_id, RoomState::Joined);
408        let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
409
410        // Get a `RoomEventCache`.
411        let event_cache = client.event_cache();
412        event_cache.subscribe().unwrap();
413
414        let mut latest_event = LatestEvent::new(&weak_room, None);
415
416        // First off, check the default value is `None`!
417        assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
418
419        // Second, set a new value.
420        latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
421
422        assert_matches!(
423            latest_event.current_value.get().await,
424            LatestEventValue::LocalIsSending(_)
425        );
426
427        // Finally, set a new `None` value. It must NOT be ignored.
428        latest_event.update(LatestEventValue::None).await;
429
430        assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
431    }
432
433    #[async_test]
434    async fn test_update_ignore_none_if_previous_value_is_none() {
435        let room_id = room_id!("!r0");
436
437        let server = MatrixMockServer::new().await;
438        let client = server.client_builder().build().await;
439        let weak_client = WeakClient::from_client(&client);
440
441        // Create the room.
442        client.base_client().get_or_create_room(room_id, RoomState::Joined);
443        let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
444
445        let mut latest_event = LatestEvent::new(&weak_room, None);
446
447        let mut stream = latest_event.subscribe().await;
448        assert_pending!(stream);
449
450        // Set a non-`None` value.
451        latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
452        // We get it.
453        assert_next_matches!(stream, LatestEventValue::LocalIsSending(_));
454
455        // Set a `None` value.
456        latest_event.update(LatestEventValue::None).await;
457        // We get it.
458        assert_next_matches!(stream, LatestEventValue::None);
459
460        // Set a `None` value, again!
461        latest_event.update(LatestEventValue::None).await;
462        // We get it? No!
463        assert_pending!(stream);
464
465        // Set a `None` value, again, and again!
466        latest_event.update(LatestEventValue::None).await;
467        // No means No!
468        assert_pending!(stream);
469
470        // Set a non-`None` value.
471        latest_event.update(LatestEventValue::LocalIsSending(local_room_message("bar"))).await;
472        // We get it. Oof.
473        assert_next_matches!(stream, LatestEventValue::LocalIsSending(_));
474
475        assert_pending!(stream);
476    }
477
478    #[async_test]
479    async fn test_update_ignore_when_previous_value_has_the_same_event_id() {
480        let room_id = room_id!("!r0");
481        let user_id = user_id!("@mnt_io:matrix.org");
482        let event_factory = EventFactory::new().sender(user_id).room(room_id);
483
484        let server = MatrixMockServer::new().await;
485        let client = server.client_builder().build().await;
486        let weak_client = WeakClient::from_client(&client);
487
488        // Create the room.
489        client.base_client().get_or_create_room(room_id, RoomState::Joined);
490        let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
491
492        let mut latest_event = LatestEvent::new(&weak_room, None);
493
494        let mut stream = latest_event.subscribe().await;
495        assert_pending!(stream);
496
497        // Set a non-`None` value.
498        latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
499        // We get it.
500        assert_next_matches!(stream, LatestEventValue::LocalIsSending(_));
501
502        // Set a non-`None` value, with a specific event ID.
503        let first_event: RemoteLatestEventValue =
504            event_factory.text_msg("A").event_id(event_id!("$ev0")).into();
505        latest_event.update(LatestEventValue::Remote(first_event.clone())).await;
506        // We get it.
507        assert_next_matches!(stream, LatestEventValue::Remote(_));
508
509        // Set a non-`None` value again, with the same event ID!
510        latest_event.update(LatestEventValue::Remote(first_event)).await;
511        // It's ignored!
512        assert_pending!(stream);
513
514        // Set a non-`None` value again, with a different event ID!
515        let second_event = event_factory.text_msg("A").event_id(event_id!("$ev1")).into();
516        latest_event.update(LatestEventValue::Remote(second_event)).await;
517        // We get it!
518        assert_next_matches!(stream, LatestEventValue::Remote(_));
519
520        assert_pending!(stream);
521    }
522
523    #[async_test]
524    async fn test_local_has_priority_over_remote() {
525        let room_id = owned_room_id!("!r0");
526        let user_id = user_id!("@mnt_io:matrix.org");
527        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
528
529        let server = MatrixMockServer::new().await;
530        let client = server.client_builder().build().await;
531        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
532        let room = client.get_room(&room_id).unwrap();
533        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
534
535        let event_cache = client.event_cache();
536        event_cache.subscribe().unwrap();
537
538        // Fill the event cache with one event.
539        client
540            .event_cache_store()
541            .lock()
542            .await
543            .expect("Could not acquire the event cache lock")
544            .as_clean()
545            .expect("Could not acquire a clean event cache lock")
546            .handle_linked_chunk_updates(
547                LinkedChunkId::Room(&room_id),
548                vec![
549                    Update::NewItemsChunk {
550                        previous: None,
551                        new: ChunkIdentifier::new(0),
552                        next: None,
553                    },
554                    Update::PushItems {
555                        at: Position::new(ChunkIdentifier::new(0), 0),
556                        items: vec![event_factory.text_msg("A").event_id(event_id!("$ev0")).into()],
557                    },
558                ],
559            )
560            .await
561            .unwrap();
562
563        let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
564
565        let send_queue = client.send_queue();
566        let room_send_queue = send_queue.for_room(room);
567
568        let mut latest_event = LatestEvent::new(&weak_room, None);
569
570        // First, let's create a `LatestEventValue` from the event cache. It must work.
571        {
572            latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
573
574            assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
575        }
576
577        // Second, let's create a `LatestEventValue` from the send queue. It
578        // must overwrite the current `LatestEventValue`.
579        let transaction_id = OwnedTransactionId::from("txnid0");
580
581        {
582            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
583
584            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
585                transaction_id: transaction_id.clone(),
586                content,
587            });
588
589            latest_event.update_with_send_queue(&update, &room_event_cache, user_id, None).await;
590
591            assert_matches!(
592                latest_event.current_value.get().await,
593                LatestEventValue::LocalIsSending(_)
594            );
595        }
596
597        // Third, let's create a `LatestEventValue` from the event cache.
598        // Nothing must happen, it cannot overwrite the current
599        // `LatestEventValue` because the local event isn't sent yet.
600        {
601            latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
602
603            assert_matches!(
604                latest_event.current_value.get().await,
605                LatestEventValue::LocalIsSending(_)
606            );
607        }
608
609        // Fourth, let's a `LatestEventValue` from the send queue. It must stay the
610        // same, but now the local event is sent.
611        {
612            let update = RoomSendQueueUpdate::SentEvent {
613                transaction_id,
614                event_id: owned_event_id!("$ev1"),
615            };
616
617            latest_event.update_with_send_queue(&update, &room_event_cache, user_id, None).await;
618
619            assert_matches!(
620                latest_event.current_value.get().await,
621                LatestEventValue::LocalHasBeenSent { .. }
622            );
623        }
624
625        // Finally, let's create a `LatestEventValue` from the event cache. _Now_ it's
626        // possible, because there is no more local events.
627        {
628            latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
629
630            assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
631        }
632    }
633
634    #[async_test]
635    async fn test_redacted_latest_event_is_removed() {
636        let room_id = owned_room_id!("!r0");
637        let user_id = user_id!("@mnt_io:matrix.org");
638        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
639
640        let server = MatrixMockServer::new().await;
641        let client = server.client_builder().build().await;
642        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
643        let _room = client.get_room(&room_id).unwrap();
644        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
645
646        let event_cache = client.event_cache();
647        event_cache.subscribe().unwrap();
648
649        let event_id_0 = event_id!("$ev0");
650        let event_id_1 = event_id!("$ev1");
651
652        // Fill the event cache with two events.
653        client
654            .event_cache_store()
655            .lock()
656            .await
657            .expect("Could not acquire the event cache lock")
658            .as_clean()
659            .expect("Could not acquire a clean event cache lock")
660            .handle_linked_chunk_updates(
661                LinkedChunkId::Room(&room_id),
662                vec![
663                    Update::NewItemsChunk {
664                        previous: None,
665                        new: ChunkIdentifier::new(0),
666                        next: None,
667                    },
668                    Update::PushItems {
669                        at: Position::new(ChunkIdentifier::new(0), 0),
670                        items: vec![
671                            event_factory.text_msg("A").event_id(event_id_0).into(),
672                            event_factory.text_msg("B").event_id(event_id_1).into(),
673                        ],
674                    },
675                ],
676            )
677            .await
678            .unwrap();
679
680        let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
681
682        let mut latest_event = LatestEvent::new(&weak_room, None);
683
684        // Let's create a `LatestEventValue` from the event cache. It must work.
685        {
686            latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
687
688            assert_matches!(
689                latest_event.current_value.get().await,
690                LatestEventValue::Remote(remote) => {
691                    assert_eq!(remote.event_id().as_deref(), Some(event_id_1));
692                }
693            );
694        }
695
696        // Now, let's redact `$ev1`.
697        {
698            server
699                .mock_sync()
700                .ok_and_run(&client, |builder| {
701                    builder.add_joined_room(
702                        JoinedRoomBuilder::new(&room_id)
703                            .add_timeline_event(event_factory.redaction(event_id_1)),
704                    );
705                })
706                .await;
707
708            yield_now().await;
709
710            latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
711
712            assert_matches!(
713                latest_event.current_value.get().await,
714                LatestEventValue::Remote(remote) => {
715                    // `$ev1` has been redacted, so `$ev0` is the new candidate!
716                    assert_eq!(remote.event_id().as_deref(), Some(event_id_0));
717                }
718            );
719        }
720    }
721
722    #[async_test]
723    async fn test_store_latest_event_value() {
724        let room_id = owned_room_id!("!r0");
725        let user_id = user_id!("@mnt_io:matrix.org");
726        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
727
728        let server = MatrixMockServer::new().await;
729
730        let store_config =
731            StoreConfig::new(CrossProcessLockConfig::multi_process("cross-process-lock-holder"));
732
733        // Load the client for the first time, and run some operations.
734        {
735            let client = server
736                .client_builder()
737                .on_builder(|builder| builder.store_config(store_config.clone()))
738                .build()
739                .await;
740            let mut room_info_notable_update_receiver = client.room_info_notable_update_receiver();
741            let room = client.base_client().get_or_create_room(&room_id, RoomState::Joined);
742            let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
743
744            let event_cache = client.event_cache();
745            event_cache.subscribe().unwrap();
746
747            // Fill the event cache with one event.
748            client
749                .event_cache_store()
750                .lock()
751                .await
752                .expect("Could not acquire the event cache lock")
753                .as_clean()
754                .expect("Could not acquire a clean event cache lock")
755                .handle_linked_chunk_updates(
756                    LinkedChunkId::Room(&room_id),
757                    vec![
758                        Update::NewItemsChunk {
759                            previous: None,
760                            new: ChunkIdentifier::new(0),
761                            next: None,
762                        },
763                        Update::PushItems {
764                            at: Position::new(ChunkIdentifier::new(0), 0),
765                            items: vec![
766                                event_factory.text_msg("A").event_id(event_id!("$ev0")).into(),
767                            ],
768                        },
769                    ],
770                )
771                .await
772                .unwrap();
773
774            let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
775
776            // Check there is no `LatestEventValue` for the moment.
777            {
778                let latest_event = room.latest_event();
779
780                assert_matches!(latest_event, LatestEventValue::None);
781            }
782
783            // Generate a new `LatestEventValue`.
784            {
785                let mut latest_event = LatestEvent::new(&weak_room, None);
786                latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
787
788                assert_matches!(
789                    latest_event.current_value.get().await,
790                    LatestEventValue::Remote(_)
791                );
792            }
793
794            // We see the `RoomInfoNotableUpdateReasons`.
795            {
796                let update = room_info_notable_update_receiver.recv().await.unwrap();
797
798                assert_eq!(update.room_id, room_id);
799                assert!(update.reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
800            }
801
802            // Check it's in the `RoomInfo` and in `Room`.
803            {
804                let latest_event = room.latest_event();
805
806                assert_matches!(latest_event, LatestEventValue::Remote(_));
807            }
808        }
809
810        // Reload the client with the same store config, and see the `LatestEventValue`
811        // is inside the `RoomInfo`.
812        {
813            let client = server
814                .client_builder()
815                .on_builder(|builder| builder.store_config(store_config))
816                .build()
817                .await;
818            let room = client.get_room(&room_id).unwrap();
819            let latest_event = room.latest_event();
820
821            assert_matches!(latest_event, LatestEventValue::Remote(_));
822        }
823    }
824}