Skip to main content

matrix_sdk/latest_events/
latest_event.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    iter::once,
17    ops::{Deref, Not},
18};
19
20use eyeball::{AsyncLock, SharedObservable, Subscriber};
21pub use matrix_sdk_base::latest_event::{
22    LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
23};
24use matrix_sdk_base::{
25    RoomInfoNotableUpdateReasons, StateChanges, check_validity_of_replacement_events,
26    deserialized_responses::TimelineEvent, store::SerializableEventContent,
27};
28use ruma::{
29    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
30    events::{
31        AnyMessageLikeEventContent, AnySyncStateEvent, AnySyncTimelineEvent, SyncStateEvent,
32        relation::Replacement,
33        room::{
34            member::MembershipState,
35            message::{MessageType, Relation},
36            power_levels::RoomPowerLevels,
37        },
38    },
39};
40use tracing::{debug, error, instrument, warn};
41
42use crate::{event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate};
43
44/// The latest event of a room or a thread.
45///
46/// Use [`LatestEvent::subscribe`] to get a stream of updates.
47#[derive(Debug)]
48pub(super) struct LatestEvent {
49    /// The room owning this latest event.
50    weak_room: WeakRoom,
51
52    /// The thread (if any) owning this latest event.
53    _thread_id: Option<OwnedEventId>,
54
55    /// A buffer of the current [`LatestEventValue`] computed for local events
56    /// seen by the send queue. See [`LatestEventValuesForLocalEvents`] to learn
57    /// more.
58    buffer_of_values_for_local_events: LatestEventValuesForLocalEvents,
59
60    /// The latest event value.
61    current_value: SharedObservable<LatestEventValue, AsyncLock>,
62}
63
64impl LatestEvent {
65    pub(super) async fn new(
66        weak_room: &WeakRoom,
67        thread_id: Option<&EventId>,
68        room_event_cache: &RoomEventCache,
69    ) -> Self {
70        Self {
71            weak_room: weak_room.clone(),
72            _thread_id: thread_id.map(ToOwned::to_owned),
73            buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(),
74            current_value: SharedObservable::new_async(
75                LatestEventValueBuilder::new_remote(room_event_cache, weak_room).await,
76            ),
77        }
78    }
79
80    /// Return a [`Subscriber`] to new values.
81    pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
82        self.current_value.subscribe().await
83    }
84
85    /// Update the inner latest event value, based on the event cache
86    /// (specifically with the [`RoomEventCache`]), if and only if there is no
87    /// local latest event value waiting.
88    ///
89    /// It is only necessary to compute a new [`LatestEventValue`] from the
90    /// event cache if there is no [`LatestEventValue`] to be compute from the
91    /// send queue. Indeed, anything coming from the send queue has the priority
92    /// over the anything coming from the event cache. We believe it provides a
93    /// better user experience.
94    pub async fn update_with_event_cache(
95        &mut self,
96        room_event_cache: &RoomEventCache,
97        own_user_id: Option<&UserId>,
98        power_levels: Option<&RoomPowerLevels>,
99    ) {
100        if self.buffer_of_values_for_local_events.is_empty().not() {
101            // At least one `LatestEventValue` exists for local events (i.e. coming from the
102            // send queue). In this case, we don't overwrite the current value with a newly
103            // computed one from the event cache.
104            return;
105        }
106
107        let new_value = LatestEventValueBuilder::new_remote_with_power_levels(
108            room_event_cache,
109            own_user_id,
110            power_levels,
111        )
112        .await;
113
114        self.update(new_value).await;
115    }
116
117    /// Update the inner latest event value, based on the send queue
118    /// (specifically with the [`RoomSendQueueUpdate`]).
119    pub async fn update_with_send_queue(
120        &mut self,
121        send_queue_update: &RoomSendQueueUpdate,
122        room_event_cache: &RoomEventCache,
123        own_user_id: Option<&UserId>,
124        power_levels: Option<&RoomPowerLevels>,
125    ) {
126        let new_value = LatestEventValueBuilder::new_local(
127            send_queue_update,
128            &mut self.buffer_of_values_for_local_events,
129            room_event_cache,
130            own_user_id,
131            power_levels,
132        )
133        .await;
134
135        self.update(new_value).await;
136    }
137
138    /// Update [`Self::current_value`] if and only if the `new_value` is not
139    /// [`LatestEventValue::None`].
140    async fn update(&mut self, new_value: LatestEventValue) {
141        if let LatestEventValue::None = new_value {
142            // Do not update to a `None` value.
143        } else {
144            self.current_value.set(new_value.clone()).await;
145            self.store(new_value).await;
146        }
147    }
148
149    /// Update the `RoomInfo` associated to this room to set the new
150    /// [`LatestEventValue`], and persist it in the
151    /// [`StateStore`][matrix_sdk_base::StateStore] (the one from
152    /// [`Client::state_store`][crate::Client::state_store]).
153    #[instrument(skip_all)]
154    async fn store(&mut self, new_value: LatestEventValue) {
155        let Some(room) = self.weak_room.get() else {
156            warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
157            return;
158        };
159
160        // Compute a new `RoomInfo`.
161        let mut room_info = room.clone_info();
162        room_info.set_new_latest_event(new_value);
163
164        let mut state_changes = StateChanges::default();
165        state_changes.add_room(room_info.clone());
166
167        let client = room.client();
168
169        // Take the state store lock.
170        let _state_store_lock = client.base_client().state_store_lock().lock().await;
171
172        // Update the `RoomInfo` in the state store.
173        if let Err(error) = client.state_store().save_changes(&state_changes).await {
174            error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
175        }
176
177        // Update the `RoomInfo` of the room.
178        room.set_room_info(room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT);
179    }
180}
181
182#[cfg(all(not(target_family = "wasm"), test))]
183mod tests_latest_event {
184    use assert_matches::assert_matches;
185    use matrix_sdk_base::{
186        RoomInfoNotableUpdateReasons, RoomState,
187        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
188        store::StoreConfig,
189    };
190    use matrix_sdk_test::{async_test, event_factory::EventFactory};
191    use ruma::{
192        MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
193        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
194        room_id, user_id,
195    };
196
197    use super::{LatestEvent, LatestEventValue, LocalLatestEventValue, SerializableEventContent};
198    use crate::{
199        client::WeakClient,
200        room::WeakRoom,
201        send_queue::{LocalEcho, LocalEchoContent, RoomSendQueue, RoomSendQueueUpdate, SendHandle},
202        test_utils::mocks::MatrixMockServer,
203    };
204
205    fn local_room_message(body: &str) -> LocalLatestEventValue {
206        LocalLatestEventValue {
207            timestamp: MilliSecondsSinceUnixEpoch::now(),
208            content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
209                RoomMessageEventContent::text_plain(body),
210            ))
211            .unwrap(),
212        }
213    }
214
215    fn new_local_echo_content(
216        room_send_queue: &RoomSendQueue,
217        transaction_id: &OwnedTransactionId,
218        body: &str,
219    ) -> LocalEchoContent {
220        LocalEchoContent::Event {
221            serialized_event: SerializableEventContent::new(
222                &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
223            )
224            .unwrap(),
225            send_handle: SendHandle::new(
226                room_send_queue.clone(),
227                transaction_id.clone(),
228                MilliSecondsSinceUnixEpoch::now(),
229            ),
230            send_error: None,
231        }
232    }
233
234    #[async_test]
235    async fn test_update_ignores_none_value() {
236        let room_id = room_id!("!r0");
237
238        let server = MatrixMockServer::new().await;
239        let client = server.client_builder().build().await;
240        let weak_client = WeakClient::from_client(&client);
241
242        // Create the room.
243        client.base_client().get_or_create_room(room_id, RoomState::Joined);
244        let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
245
246        // Get a `RoomEventCache`.
247        let event_cache = client.event_cache();
248        event_cache.subscribe().unwrap();
249
250        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
251
252        let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
253
254        // First off, check the default value is `None`!
255        assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
256
257        // Second, set a new value.
258        latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
259
260        assert_matches!(
261            latest_event.current_value.get().await,
262            LatestEventValue::LocalIsSending(_)
263        );
264
265        // Finally, set a new `None` value. It must be ignored.
266        latest_event.update(LatestEventValue::None).await;
267
268        assert_matches!(
269            latest_event.current_value.get().await,
270            LatestEventValue::LocalIsSending(_)
271        );
272    }
273
274    #[async_test]
275    async fn test_local_has_priority_over_remote() {
276        let room_id = room_id!("!r0").to_owned();
277        let user_id = user_id!("@mnt_io:matrix.org");
278        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
279
280        let server = MatrixMockServer::new().await;
281        let client = server.client_builder().build().await;
282        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
283        let room = client.get_room(&room_id).unwrap();
284        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
285
286        let event_cache = client.event_cache();
287        event_cache.subscribe().unwrap();
288
289        // Fill the event cache with one event.
290        client
291            .event_cache_store()
292            .lock()
293            .await
294            .expect("Could not acquire the event cache lock")
295            .as_clean()
296            .expect("Could not acquire a clean event cache lock")
297            .handle_linked_chunk_updates(
298                LinkedChunkId::Room(&room_id),
299                vec![
300                    Update::NewItemsChunk {
301                        previous: None,
302                        new: ChunkIdentifier::new(0),
303                        next: None,
304                    },
305                    Update::PushItems {
306                        at: Position::new(ChunkIdentifier::new(0), 0),
307                        items: vec![event_factory.text_msg("A").event_id(event_id!("$ev0")).into()],
308                    },
309                ],
310            )
311            .await
312            .unwrap();
313
314        let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
315
316        let send_queue = client.send_queue();
317        let room_send_queue = send_queue.for_room(room);
318
319        let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
320
321        // First, let's create a `LatestEventValue` from the event cache. It must work.
322        {
323            latest_event.update_with_event_cache(&room_event_cache, None, None).await;
324
325            assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
326        }
327
328        // Second, let's create a `LatestEventValue` from the send queue. It
329        // must overwrite the current `LatestEventValue`.
330        let transaction_id = OwnedTransactionId::from("txnid0");
331
332        {
333            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
334
335            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
336                transaction_id: transaction_id.clone(),
337                content,
338            });
339
340            latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await;
341
342            assert_matches!(
343                latest_event.current_value.get().await,
344                LatestEventValue::LocalIsSending(_)
345            );
346        }
347
348        // Third, let's create a `LatestEventValue` from the event cache.
349        // Nothing must happen, it cannot overwrite the current
350        // `LatestEventValue` because the local event isn't sent yet.
351        {
352            latest_event.update_with_event_cache(&room_event_cache, None, None).await;
353
354            assert_matches!(
355                latest_event.current_value.get().await,
356                LatestEventValue::LocalIsSending(_)
357            );
358        }
359
360        // Fourth, let's a `LatestEventValue` from the send queue. It must stay the
361        // same, but now the local event is sent.
362        {
363            let update = RoomSendQueueUpdate::SentEvent {
364                transaction_id,
365                event_id: event_id!("$ev1").to_owned(),
366            };
367
368            latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await;
369
370            assert_matches!(
371                latest_event.current_value.get().await,
372                LatestEventValue::LocalIsSending(_)
373            );
374        }
375
376        // Finally, let's create a `LatestEventValue` from the event cache. _Now_ it's
377        // possible, because there is no more local events.
378        {
379            latest_event.update_with_event_cache(&room_event_cache, None, None).await;
380
381            assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
382        }
383    }
384
385    #[async_test]
386    async fn test_store_latest_event_value() {
387        let room_id = room_id!("!r0").to_owned();
388        let user_id = user_id!("@mnt_io:matrix.org");
389        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
390
391        let server = MatrixMockServer::new().await;
392
393        let store_config = StoreConfig::new("cross-process-lock-holder".to_owned());
394
395        // Load the client for the first time, and run some operations.
396        {
397            let client = server
398                .client_builder()
399                .on_builder(|builder| builder.store_config(store_config.clone()))
400                .build()
401                .await;
402            let mut room_info_notable_update_receiver = client.room_info_notable_update_receiver();
403            let room = client.base_client().get_or_create_room(&room_id, RoomState::Joined);
404            let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
405
406            let event_cache = client.event_cache();
407            event_cache.subscribe().unwrap();
408
409            // Fill the event cache with one event.
410            client
411                .event_cache_store()
412                .lock()
413                .await
414                .expect("Could not acquire the event cache lock")
415                .as_clean()
416                .expect("Could not acquire a clean event cache lock")
417                .handle_linked_chunk_updates(
418                    LinkedChunkId::Room(&room_id),
419                    vec![
420                        Update::NewItemsChunk {
421                            previous: None,
422                            new: ChunkIdentifier::new(0),
423                            next: None,
424                        },
425                        Update::PushItems {
426                            at: Position::new(ChunkIdentifier::new(0), 0),
427                            items: vec![
428                                event_factory.text_msg("A").event_id(event_id!("$ev0")).into(),
429                            ],
430                        },
431                    ],
432                )
433                .await
434                .unwrap();
435
436            let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
437
438            // Check there is no `LatestEventValue` for the moment.
439            {
440                let latest_event = room.new_latest_event();
441
442                assert_matches!(latest_event, LatestEventValue::None);
443            }
444
445            // Generate a new `LatestEventValue`.
446            {
447                let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
448                latest_event.update_with_event_cache(&room_event_cache, None, None).await;
449
450                assert_matches!(
451                    latest_event.current_value.get().await,
452                    LatestEventValue::Remote(_)
453                );
454            }
455
456            // We see the `RoomInfoNotableUpdateReasons`.
457            {
458                let update = room_info_notable_update_receiver.recv().await.unwrap();
459
460                assert_eq!(update.room_id, room_id);
461                assert!(update.reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
462            }
463
464            // Check it's in the `RoomInfo` and in `Room`.
465            {
466                let latest_event = room.new_latest_event();
467
468                assert_matches!(latest_event, LatestEventValue::Remote(_));
469            }
470        }
471
472        // Reload the client with the same store config, and see the `LatestEventValue`
473        // is inside the `RoomInfo`.
474        {
475            let client = server
476                .client_builder()
477                .on_builder(|builder| builder.store_config(store_config))
478                .build()
479                .await;
480            let room = client.get_room(&room_id).unwrap();
481            let latest_event = room.new_latest_event();
482
483            assert_matches!(latest_event, LatestEventValue::Remote(_));
484        }
485    }
486}
487
488/// A builder of [`LatestEventValue`]s.
489struct LatestEventValueBuilder;
490
491impl LatestEventValueBuilder {
492    /// Create a new [`LatestEventValue::Remote`].
493    async fn new_remote(
494        room_event_cache: &RoomEventCache,
495        weak_room: &WeakRoom,
496    ) -> LatestEventValue {
497        // Get the power levels of the user for the current room if the `WeakRoom` is
498        // still valid.
499        let room = weak_room.get();
500        let (own_user_id, power_levels) = match &room {
501            Some(room) => {
502                let power_levels = room.power_levels().await.ok();
503
504                (Some(room.own_user_id()), power_levels)
505            }
506
507            None => (None, None),
508        };
509
510        Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels.as_ref())
511            .await
512    }
513
514    /// Create a new [`LatestEventValue::Remote`] based on existing power
515    /// levels.
516    async fn new_remote_with_power_levels(
517        room_event_cache: &RoomEventCache,
518        own_user_id: Option<&UserId>,
519        power_levels: Option<&RoomPowerLevels>,
520    ) -> LatestEventValue {
521        if let Ok(Some(event)) = room_event_cache
522            .rfind_map_event_in_memory_by(|event, previous_event| {
523                filter_timeline_event(event, previous_event, own_user_id, power_levels)
524                    .then(|| event.clone())
525            })
526            .await
527        {
528            LatestEventValue::Remote(event)
529        } else {
530            LatestEventValue::default()
531        }
532    }
533
534    /// Create a new [`LatestEventValue::LocalIsSending`] or
535    /// [`LatestEventValue::LocalCannotBeSent`].
536    async fn new_local(
537        send_queue_update: &RoomSendQueueUpdate,
538        buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
539        room_event_cache: &RoomEventCache,
540        own_user_id: Option<&UserId>,
541        power_levels: Option<&RoomPowerLevels>,
542    ) -> LatestEventValue {
543        use crate::send_queue::{LocalEcho, LocalEchoContent};
544
545        match send_queue_update {
546            // A new local event is being sent.
547            //
548            // Let's create the `LatestEventValue` and push it in the buffer of values.
549            RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
550                transaction_id,
551                content: local_echo_content,
552            }) => match local_echo_content {
553                LocalEchoContent::Event { serialized_event: serialized_event_content, .. } => {
554                    match serialized_event_content.deserialize() {
555                        Ok(content) => {
556                            if filter_any_message_like_event_content(content, None, None) {
557                                let local_value = LocalLatestEventValue {
558                                    timestamp: MilliSecondsSinceUnixEpoch::now(),
559                                    content: serialized_event_content.clone(),
560                                };
561
562                                // If a local previous `LatestEventValue` exists and has been marked
563                                // as “cannot be sent”, it means the new `LatestEventValue` must
564                                // also be marked as “cannot be sent”.
565                                let value = if let Some(LatestEventValue::LocalCannotBeSent(_)) =
566                                    buffer_of_values_for_local_events.last()
567                                {
568                                    LatestEventValue::LocalCannotBeSent(local_value)
569                                } else {
570                                    LatestEventValue::LocalIsSending(local_value)
571                                };
572
573                                buffer_of_values_for_local_events
574                                    .push(transaction_id.to_owned(), value.clone());
575
576                                value
577                            } else {
578                                LatestEventValue::None
579                            }
580                        }
581
582                        Err(error) => {
583                            error!(
584                                ?error,
585                                "Failed to deserialize an event from `RoomSendQueueUpdate::NewLocalEvent`"
586                            );
587
588                            LatestEventValue::None
589                        }
590                    }
591                }
592
593                LocalEchoContent::React { .. } => LatestEventValue::None,
594            },
595
596            // A local event has been cancelled before being sent.
597            //
598            // Remove the calculated `LatestEventValue` from the buffer of values, and return the
599            // last `LatestEventValue` or calculate a new one.
600            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
601                if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
602                    buffer_of_values_for_local_events.remove(position);
603                }
604
605                Self::new_local_or_remote(
606                    buffer_of_values_for_local_events,
607                    room_event_cache,
608                    own_user_id,
609                    power_levels,
610                )
611                .await
612            }
613
614            // A local event has successfully been sent!
615            //
616            // Mark all “cannot be sent” values as “is sending” after the one matching
617            // `transaction_id`. Indeed, if an event has been sent, it means the send queue is
618            // working, so if any value has been marked as “cannot be sent”, it must be marked as
619            // “is sending”. Then, remove the calculated `LatestEventValue` from the buffer of
620            // values. Finally, return the last `LatestEventValue` or calculate a new
621            // one.
622            RoomSendQueueUpdate::SentEvent { transaction_id, .. } => {
623                let position =
624                    buffer_of_values_for_local_events.mark_is_sending_after(transaction_id);
625
626                // First, compute the new value. Then we remove the sent local event from the
627                // buffer.
628                //
629                // Why in this order? Because in between sending and remote echoing, the event
630                // will only be stored as a local echo and not as a full event in the event
631                // cache. Just after sending, it won't show up as a `LatestEventValue` as it
632                // will immediately be replaced by an event from the event cache. By computing
633                // the new value before removing it from the buffer, we ensure the
634                // `LatestEventValue` represents the just sent local event.
635                //
636                // Note: the next sync may not include the just sent local event. This is a race
637                // condition we are aware of, see https://github.com/matrix-org/matrix-rust-sdk/issues/3941.
638                let value = Self::new_local_or_remote(
639                    buffer_of_values_for_local_events,
640                    room_event_cache,
641                    own_user_id,
642                    power_levels,
643                )
644                .await;
645
646                if let Some(position) = position {
647                    buffer_of_values_for_local_events.remove(position);
648                }
649
650                value
651            }
652
653            // A local event has been replaced by another one.
654            //
655            // Replace the latest event value matching `transaction_id` in the buffer if it exists
656            // (note: it should!), and return the last `LatestEventValue` or calculate a new one.
657            RoomSendQueueUpdate::ReplacedLocalEvent {
658                transaction_id,
659                new_content: new_serialized_event_content,
660            } => {
661                if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
662                    match new_serialized_event_content.deserialize() {
663                        Ok(content) => {
664                            if filter_any_message_like_event_content(content, None, None) {
665                                buffer_of_values_for_local_events.replace_content(
666                                    position,
667                                    new_serialized_event_content.clone(),
668                                );
669                            } else {
670                                buffer_of_values_for_local_events.remove(position);
671                            }
672                        }
673
674                        Err(error) => {
675                            error!(
676                                ?error,
677                                "Failed to deserialize an event from `RoomSendQueueUpdate::ReplacedLocalEvent`"
678                            );
679
680                            return LatestEventValue::None;
681                        }
682                    }
683                }
684
685                Self::new_local_or_remote(
686                    buffer_of_values_for_local_events,
687                    room_event_cache,
688                    own_user_id,
689                    power_levels,
690                )
691                .await
692            }
693
694            // An error has occurred.
695            //
696            // Mark the latest event value matching `transaction_id`, and all its following values,
697            // as “cannot be sent”.
698            RoomSendQueueUpdate::SendError { transaction_id, .. } => {
699                buffer_of_values_for_local_events.mark_cannot_be_sent_from(transaction_id);
700
701                Self::new_local_or_remote(
702                    buffer_of_values_for_local_events,
703                    room_event_cache,
704                    own_user_id,
705                    power_levels,
706                )
707                .await
708            }
709
710            // A local event has been unwedged and sending is being retried.
711            //
712            // Mark the latest event value matching `transaction_id`, and all its following values,
713            // as “is sending”.
714            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
715                buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
716
717                Self::new_local_or_remote(
718                    buffer_of_values_for_local_events,
719                    room_event_cache,
720                    own_user_id,
721                    power_levels,
722                )
723                .await
724            }
725
726            // A media upload has made progress.
727            //
728            // Nothing to do here.
729            RoomSendQueueUpdate::MediaUpload { .. } => LatestEventValue::None,
730        }
731    }
732
733    /// Get the last [`LatestEventValue`] from the local latest event values if
734    /// any, or create a new [`LatestEventValue`] from the remote events.
735    ///
736    /// If the buffer of latest event values is not empty, let's return the last
737    /// one. Otherwise, it means we no longer have any local event: let's
738    /// fallback on remote event!
739    async fn new_local_or_remote(
740        buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
741        room_event_cache: &RoomEventCache,
742        own_user_id: Option<&UserId>,
743        power_levels: Option<&RoomPowerLevels>,
744    ) -> LatestEventValue {
745        if let Some(value) = buffer_of_values_for_local_events.last() {
746            value.clone()
747        } else {
748            Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels).await
749        }
750    }
751}
752
753/// A buffer of the current [`LatestEventValue`] computed for local events
754/// seen by the send queue. It is used by
755/// [`LatestEvent::buffer_of_values_for_local_events`].
756///
757/// The system does only receive [`RoomSendQueueUpdate`]s. It's not designed to
758/// iterate over local events in the send queue when a local event is changed
759/// (cancelled, or updated for example). That's why we keep our own buffer here.
760/// Imagine the system receives 4 [`RoomSendQueueUpdate`]:
761///
762/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
763/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
764/// 3. [`RoomSendQueueUpdate::ReplacedLocalEvent`]: replaced the first local
765///    event,
766/// 4. [`RoomSendQueueUpdate::CancelledLocalEvent`]: cancelled the second local
767///    event.
768///
769/// `NewLocalEvent`s will trigger the computation of new
770/// `LatestEventValue`s, but `CancelledLocalEvent` for example doesn't hold
771/// any information to compute a new `LatestEventValue`, so we need to
772/// remember the previous values, until the local events are sent and
773/// removed from this buffer.
774///
775/// Another reason why we need a buffer is to handle wedged local event. Imagine
776/// the system receives 3 [`RoomSendQueueUpdate`]:
777///
778/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
779/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
780/// 3. [`RoomSendQueueUpdate::SendError`]: the first local event has failed to
781///    be sent.
782///
783/// Because a `SendError` is received (targeting the first `NewLocalEvent`), the
784/// send queue is stopped. However, the `LatestEventValue` targets the second
785/// `NewLocalEvent`. The system must consider that when a local event is wedged,
786/// all the following local events must also be marked as “cannot be sent”. And
787/// vice versa, when the send queue is able to send an event again, all the
788/// following local events must be marked as “is sending”.
789///
790/// This type isolates a couple of methods designed to manage these specific
791/// behaviours.
792#[derive(Debug)]
793struct LatestEventValuesForLocalEvents {
794    buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
795}
796
797impl LatestEventValuesForLocalEvents {
798    /// Create a new [`LatestEventValuesForLocalEvents`].
799    fn new() -> Self {
800        Self { buffer: Vec::with_capacity(2) }
801    }
802
803    /// Check the buffer is empty.
804    fn is_empty(&self) -> bool {
805        self.buffer.is_empty()
806    }
807
808    /// Get the last [`LatestEventValue`].
809    fn last(&self) -> Option<&LatestEventValue> {
810        self.buffer.last().map(|(_, value)| value)
811    }
812
813    /// Find the position of the [`LatestEventValue`] matching `transaction_id`.
814    fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
815        self.buffer
816            .iter()
817            .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
818    }
819
820    /// Push a new [`LatestEventValue`].
821    ///
822    /// # Panics
823    ///
824    /// Panics if `value` is not of kind [`LatestEventValue::LocalIsSending`] or
825    /// [`LatestEventValue::LocalCannotBeSent`].
826    fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
827        assert!(
828            matches!(
829                value,
830                LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalCannotBeSent(_)
831            ),
832            "`value` must be either `LocalIsSending` or `LocalCannotBeSent`"
833        );
834
835        self.buffer.push((transaction_id, value));
836    }
837
838    /// Replace the content of the [`LatestEventValue`] at position `position`.
839    ///
840    /// # Panics
841    ///
842    /// Panics if:
843    /// - `position` is strictly greater than buffer's length,
844    /// - the [`LatestEventValue`] is not of kind
845    ///   [`LatestEventValue::LocalIsSending`] or
846    ///   [`LatestEventValue::LocalCannotBeSent`].
847    fn replace_content(&mut self, position: usize, new_content: SerializableEventContent) {
848        let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");
849
850        match value {
851            LatestEventValue::LocalIsSending(LocalLatestEventValue { content, .. }) => {
852                *content = new_content;
853            }
854
855            LatestEventValue::LocalCannotBeSent(LocalLatestEventValue { content, .. }) => {
856                *content = new_content;
857            }
858
859            _ => panic!("`value` must be either `LocalIsSending` or `LocalCannotBeSent`"),
860        }
861    }
862
863    /// Remove the [`LatestEventValue`] at position `position`.
864    ///
865    /// # Panics
866    ///
867    /// Panics if `position` is strictly greater than buffer's length.
868    fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
869        self.buffer.remove(position)
870    }
871
872    /// Mark the `LatestEventValue` matching `transaction_id`, and all the
873    /// following values, as “cannot be sent”.
874    fn mark_cannot_be_sent_from(&mut self, transaction_id: &TransactionId) {
875        let mut values = self.buffer.iter_mut();
876
877        if let Some(first_value_to_wedge) = values
878            .by_ref()
879            .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
880        {
881            // Iterate over the found value and the following ones.
882            for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) {
883                if let LatestEventValue::LocalIsSending(content) = value_to_wedge {
884                    *value_to_wedge = LatestEventValue::LocalCannotBeSent(content.clone());
885                }
886            }
887        }
888    }
889
890    /// Mark the `LatestEventValue` matching `transaction_id`, and all the
891    /// following values, as “is sending”.
892    fn mark_is_sending_from(&mut self, transaction_id: &TransactionId) {
893        let mut values = self.buffer.iter_mut();
894
895        if let Some(first_value_to_unwedge) = values
896            .by_ref()
897            .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
898        {
899            // Iterate over the found value and the following ones.
900            for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) {
901                if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
902                    *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
903                }
904            }
905        }
906    }
907
908    /// Mark all the following values after the `LatestEventValue` matching
909    /// `transaction_id` as “is sending”.
910    ///
911    /// Note that contrary to [`Self::mark_is_sending_from`], the
912    /// `LatestEventValue` is untouched. However, its position is returned
913    /// (if any).
914    fn mark_is_sending_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
915        let mut values = self.buffer.iter_mut();
916
917        if let Some(position) = values
918            .by_ref()
919            .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
920        {
921            // Iterate over all values after the found one.
922            for (_, value_to_unwedge) in values {
923                if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
924                    *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
925                }
926            }
927
928            Some(position)
929        } else {
930            None
931        }
932    }
933}
934
935fn filter_timeline_event(
936    timeline_event: &TimelineEvent,
937    previous_event: Option<&TimelineEvent>,
938    own_user_id: Option<&UserId>,
939    power_levels: Option<&RoomPowerLevels>,
940) -> bool {
941    // Cast the event into an `AnySyncTimelineEvent`. If deserializing fails, we
942    // ignore the event.
943    let event = match timeline_event.raw().deserialize() {
944        Ok(event) => event,
945        Err(error) => {
946            error!(
947                ?error,
948                "Failed to deserialize the event when looking for a suitable latest event"
949            );
950
951            return false;
952        }
953    };
954
955    match event {
956        AnySyncTimelineEvent::MessageLike(message_like_event) => {
957            match message_like_event.original_content() {
958                Some(any_message_like_event_content) => filter_any_message_like_event_content(
959                    any_message_like_event_content,
960                    Some(timeline_event),
961                    previous_event,
962                ),
963
964                // The event has been redacted.
965                None => true,
966            }
967        }
968
969        AnySyncTimelineEvent::State(state) => {
970            filter_any_sync_state_event(state, own_user_id, power_levels)
971        }
972    }
973}
974
975fn filter_any_message_like_event_content(
976    content: AnyMessageLikeEventContent,
977    new_event: Option<&TimelineEvent>,
978    previous_event: Option<&TimelineEvent>,
979) -> bool {
980    match content {
981        AnyMessageLikeEventContent::RoomMessage(message) => {
982            // Don't show incoming verification requests.
983            if let MessageType::VerificationRequest(_) = message.msgtype {
984                return false;
985            }
986
987            // Not all relations are accepted. Let's filter them.
988            match &message.relates_to {
989                Some(Relation::Replacement(Replacement { .. })) => {
990                    // If the edit is a valid replacement event for the previous latest event, it's
991                    // a valid new latest event, otherwise let's ignore it.
992                    if let Some(event) = previous_event
993                        && let Some(edit) = new_event
994                    {
995                        let original = event.kind.raw();
996                        let original_encryption_info = event.kind.encryption_info();
997
998                        let replacement = edit.kind.raw();
999                        let replacement_encryption_info = event.kind.encryption_info();
1000
1001                        match check_validity_of_replacement_events(
1002                            original,
1003                            original_encryption_info.map(|e| &(**e)),
1004                            replacement,
1005                            replacement_encryption_info.map(|e| &(**e)),
1006                        ) {
1007                            Ok(_) => true,
1008                            Err(e) => {
1009                                debug!(
1010                                    "Skipping an edit of a latest event due to the replacement event being invalid: {e}"
1011                                );
1012                                false
1013                            }
1014                        }
1015                    } else {
1016                        false
1017                    }
1018                }
1019
1020                _ => true,
1021            }
1022        }
1023
1024        AnyMessageLikeEventContent::UnstablePollStart(_)
1025        | AnyMessageLikeEventContent::CallInvite(_)
1026        | AnyMessageLikeEventContent::RtcNotification(_)
1027        | AnyMessageLikeEventContent::Sticker(_) => true,
1028
1029        // Encrypted events are not suitable.
1030        AnyMessageLikeEventContent::RoomEncrypted(_) => false,
1031
1032        // Everything else is considered not suitable.
1033        _ => false,
1034    }
1035}
1036
1037fn filter_any_sync_state_event(
1038    event: AnySyncStateEvent,
1039    own_user_id: Option<&UserId>,
1040    power_levels: Option<&RoomPowerLevels>,
1041) -> bool {
1042    match event {
1043        AnySyncStateEvent::RoomMember(member) => {
1044            match member.membership() {
1045                MembershipState::Knock => {
1046                    let can_accept_or_decline_knocks = match (own_user_id, power_levels) {
1047                        (Some(own_user_id), Some(room_power_levels)) => {
1048                            room_power_levels.user_can_invite(own_user_id)
1049                                || room_power_levels
1050                                    .user_can_kick_user(own_user_id, member.state_key())
1051                        }
1052                        _ => false,
1053                    };
1054
1055                    // The current user can act on the knock changes, so they should be
1056                    // displayed
1057                    if can_accept_or_decline_knocks {
1058                        // We can only decide whether the user can accept or decline knocks if the
1059                        // event isn't redacted.
1060                        return matches!(member, SyncStateEvent::Original(_));
1061                    }
1062
1063                    false
1064                }
1065
1066                MembershipState::Invite => {
1067                    // The current _is_ invited (not someone else).
1068                    match member {
1069                        // We can only decide whether the user is invited if the event isn't
1070                        // redacted.
1071                        SyncStateEvent::Original(state) => {
1072                            Some(state.state_key.deref()) == own_user_id
1073                        }
1074
1075                        _ => false,
1076                    }
1077                }
1078
1079                _ => false,
1080            }
1081        }
1082
1083        _ => false,
1084    }
1085}
1086
1087#[cfg(test)]
1088mod tests_latest_event_content {
1089    use std::ops::Not;
1090
1091    use matrix_sdk_test::event_factory::EventFactory;
1092    use ruma::{
1093        event_id,
1094        events::{room::message::RoomMessageEventContent, rtc::notification::NotificationType},
1095        owned_user_id, user_id,
1096    };
1097
1098    use super::filter_timeline_event;
1099    use crate::latest_events::latest_event::tests_latest_event_values_for_local_events::remote_room_message_with_event_id;
1100
1101    macro_rules! assert_latest_event_content {
1102        ( event | $event_factory:ident | $event_builder:block
1103          is a candidate ) => {
1104            assert_latest_event_content!(@_ | $event_factory | $event_builder, true);
1105        };
1106
1107        ( event | $event_factory:ident | $event_builder:block
1108          is not a candidate ) => {
1109            assert_latest_event_content!(@_ | $event_factory | $event_builder, false);
1110        };
1111
1112        ( @_ | $event_factory:ident | $event_builder:block, $expect:literal ) => {
1113            let user_id = user_id!("@mnt_io:matrix.org");
1114            let event_factory = EventFactory::new().sender(user_id);
1115            let event = {
1116                let $event_factory = event_factory;
1117                $event_builder
1118            };
1119
1120            assert_eq!(filter_timeline_event(&event, None, Some(user_id!("@mnt_io:matrix.org")), None), $expect );
1121        };
1122    }
1123
1124    #[test]
1125    fn test_room_message() {
1126        assert_latest_event_content!(
1127            event | event_factory | { event_factory.text_msg("hello").into_event() }
1128            is a candidate
1129        );
1130    }
1131
1132    #[test]
1133    fn test_redacted() {
1134        assert_latest_event_content!(
1135            event | event_factory | {
1136                event_factory
1137                    .redacted(
1138                        user_id!("@mnt_io:matrix.org"),
1139                        ruma::events::room::message::RedactedRoomMessageEventContent::new(),
1140                    )
1141                    .into_event()
1142            }
1143            is a candidate
1144        );
1145    }
1146
1147    #[test]
1148    fn test_room_message_replacement() {
1149        let user_id = user_id!("@mnt_io:matrix.org");
1150        let event_factory = EventFactory::new().sender(user_id);
1151        let event = event_factory
1152            .text_msg("bonjour")
1153            .edit(event_id!("$ev0"), RoomMessageEventContent::text_plain("hello").into())
1154            .into_event();
1155
1156        // Without a previous event.
1157        //
1158        // This is an edge case where either the event cache has been emptied and only
1159        // the edit is received via the sync for example, or either the previous event
1160        // is part of another chunk that is not loaded in memory yet. In this case,
1161        // let's not consider the event as a `LatestEventValue` candidate.
1162        {
1163            let previous_event_id = None;
1164
1165            assert!(
1166                filter_timeline_event(
1167                    &event,
1168                    previous_event_id,
1169                    Some(user_id!("@mnt_io:matrix.org")),
1170                    None
1171                )
1172                .not()
1173            );
1174        }
1175
1176        // With a previous event, but the one being replaced.
1177        {
1178            let previous_value =
1179                remote_room_message_with_event_id(event_id!("$ev1"), "Hello world");
1180
1181            assert!(
1182                filter_timeline_event(
1183                    &event,
1184                    Some(&previous_value),
1185                    Some(user_id!("@mnt_io:matrix.org")),
1186                    None
1187                )
1188                .not()
1189            );
1190        }
1191
1192        // With a previous event, and that's the one being replaced!
1193        {
1194            let previous_value =
1195                remote_room_message_with_event_id(event_id!("$ev0"), "Hello world");
1196
1197            assert!(filter_timeline_event(
1198                &event,
1199                Some(&previous_value),
1200                Some(user_id!("@mnt_io:matrix.org")),
1201                None
1202            ));
1203        }
1204    }
1205
1206    #[test]
1207    fn test_poll() {
1208        assert_latest_event_content!(
1209            event | event_factory | {
1210                event_factory
1211                    .poll_start("the people need to know", "comté > gruyère", vec!["yes", "oui"])
1212                    .into_event()
1213            }
1214            is a candidate
1215        );
1216    }
1217
1218    #[test]
1219    fn test_call_invite() {
1220        assert_latest_event_content!(
1221            event | event_factory | {
1222                event_factory
1223                    .call_invite(
1224                        ruma::OwnedVoipId::from("vvooiipp".to_owned()),
1225                        ruma::UInt::from(1234u32),
1226                        ruma::events::call::SessionDescription::new(
1227                            "type".to_owned(),
1228                            "sdp".to_owned(),
1229                        ),
1230                        ruma::VoipVersionId::V1,
1231                    )
1232                    .into_event()
1233            }
1234            is a candidate
1235        );
1236    }
1237
1238    #[test]
1239    fn test_rtc_notification() {
1240        assert_latest_event_content!(
1241            event | event_factory | {
1242                event_factory
1243                     .rtc_notification(
1244                        NotificationType::Ring,
1245                    )
1246                    .mentions(vec![owned_user_id!("@alice:server.name")])
1247                    .relates_to_membership_state_event(ruma::OwnedEventId::try_from("$abc:server.name").unwrap())
1248                    .lifetime(60)
1249                    .into_event()
1250            }
1251            is a candidate
1252        );
1253    }
1254
1255    #[test]
1256    fn test_sticker() {
1257        assert_latest_event_content!(
1258            event | event_factory | {
1259                event_factory
1260                    .sticker(
1261                        "wink wink",
1262                        ruma::events::room::ImageInfo::new(),
1263                        ruma::OwnedMxcUri::from("mxc://foo/bar"),
1264                    )
1265                    .into_event()
1266            }
1267            is a candidate
1268        );
1269    }
1270
1271    #[test]
1272    fn test_encrypted_room_message() {
1273        assert_latest_event_content!(
1274            event | event_factory | {
1275                event_factory
1276                    .event(ruma::events::room::encrypted::RoomEncryptedEventContent::new(
1277                        ruma::events::room::encrypted::EncryptedEventScheme::MegolmV1AesSha2(
1278                            ruma::events::room::encrypted::MegolmV1AesSha2ContentInit {
1279                                ciphertext: "cipher".to_owned(),
1280                                sender_key: "sender_key".to_owned(),
1281                                device_id: "device_id".into(),
1282                                session_id: "session_id".to_owned(),
1283                            }
1284                            .into(),
1285                        ),
1286                        None,
1287                    ))
1288                    .into_event()
1289            }
1290            is not a candidate
1291        );
1292    }
1293
1294    #[test]
1295    fn test_reaction() {
1296        // Take a random message-like event.
1297        assert_latest_event_content!(
1298            event | event_factory | { event_factory.reaction(event_id!("$ev0"), "+1").into_event() }
1299            is not a candidate
1300        );
1301    }
1302
1303    #[test]
1304    fn test_state_event() {
1305        assert_latest_event_content!(
1306            event | event_factory | { event_factory.room_topic("new room topic").into_event() }
1307            is not a candidate
1308        );
1309    }
1310
1311    #[test]
1312    fn test_knocked_state_event_without_power_levels() {
1313        assert_latest_event_content!(
1314            event | event_factory | {
1315                event_factory
1316                    .member(user_id!("@other_mnt_io:server.name"))
1317                    .membership(ruma::events::room::member::MembershipState::Knock)
1318                    .into_event()
1319            }
1320            is not a candidate
1321        );
1322    }
1323
1324    #[test]
1325    fn test_knocked_state_event_with_power_levels() {
1326        use ruma::{
1327            events::room::{
1328                member::MembershipState,
1329                power_levels::{RoomPowerLevels, RoomPowerLevelsSource},
1330            },
1331            room_version_rules::AuthorizationRules,
1332        };
1333
1334        let user_id = user_id!("@mnt_io:matrix.org");
1335        let other_user_id = user_id!("@other_mnt_io:server.name");
1336        let event_factory = EventFactory::new().sender(user_id);
1337        let event =
1338            event_factory.member(other_user_id).membership(MembershipState::Knock).into_event();
1339
1340        let mut room_power_levels =
1341            RoomPowerLevels::new(RoomPowerLevelsSource::None, &AuthorizationRules::V1, []);
1342        room_power_levels.users.insert(user_id.to_owned(), 5.into());
1343        room_power_levels.users.insert(other_user_id.to_owned(), 4.into());
1344
1345        // Cannot accept. Cannot decline.
1346        {
1347            room_power_levels.invite = 10.into();
1348            room_power_levels.kick = 10.into();
1349            assert!(
1350                filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)).not(),
1351                "cannot accept, cannot decline",
1352            );
1353        }
1354
1355        // Can accept. Cannot decline.
1356        {
1357            room_power_levels.invite = 0.into();
1358            room_power_levels.kick = 10.into();
1359            assert!(
1360                filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
1361                "can accept, cannot decline",
1362            );
1363        }
1364
1365        // Cannot accept. Can decline.
1366        {
1367            room_power_levels.invite = 10.into();
1368            room_power_levels.kick = 0.into();
1369            assert!(
1370                filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
1371                "cannot accept, can decline",
1372            );
1373        }
1374
1375        // Can accept. Can decline.
1376        {
1377            room_power_levels.invite = 0.into();
1378            room_power_levels.kick = 0.into();
1379            assert!(
1380                filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
1381                "can accept, can decline",
1382            );
1383        }
1384
1385        // Cannot accept. Can decline. But with an other user ID with at least the same
1386        // levels, i.e. the current user cannot kick another user with the same
1387        // or higher levels.
1388        {
1389            room_power_levels.users.insert(user_id.to_owned(), 5.into());
1390            room_power_levels.users.insert(other_user_id.to_owned(), 5.into());
1391
1392            room_power_levels.invite = 10.into();
1393            room_power_levels.kick = 0.into();
1394
1395            assert!(
1396                filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)).not(),
1397                "cannot accept, can decline, at least same user levels",
1398            );
1399        }
1400    }
1401
1402    #[test]
1403    fn test_invite_state_event() {
1404        use ruma::events::room::member::MembershipState;
1405
1406        // The current user is receiving an invite.
1407        assert_latest_event_content!(
1408            event | event_factory | {
1409                event_factory
1410                    .member(user_id!("@mnt_io:matrix.org"))
1411                    .membership(MembershipState::Invite)
1412                    .into_event()
1413            }
1414            is a candidate
1415        );
1416    }
1417
1418    #[test]
1419    fn test_invite_state_event_for_someone_else() {
1420        use ruma::events::room::member::MembershipState;
1421
1422        // The current user sees an invite but for someone else.
1423        assert_latest_event_content!(
1424            event | event_factory | {
1425                event_factory
1426                    .member(user_id!("@other_mnt_io:server.name"))
1427                    .membership(MembershipState::Invite)
1428                    .into_event()
1429            }
1430            is not a candidate
1431        );
1432    }
1433
1434    #[test]
1435    fn test_room_message_verification_request() {
1436        use ruma::{OwnedDeviceId, events::room::message};
1437
1438        assert_latest_event_content!(
1439            event | event_factory | {
1440                event_factory
1441                    .event(RoomMessageEventContent::new(message::MessageType::VerificationRequest(
1442                        message::KeyVerificationRequestEventContent::new(
1443                            "body".to_owned(),
1444                            vec![],
1445                            OwnedDeviceId::from("device_id"),
1446                            user_id!("@user:server.name").to_owned(),
1447                        ),
1448                    )))
1449                    .into_event()
1450            }
1451            is not a candidate
1452        );
1453    }
1454}
1455
1456#[cfg(test)]
1457mod tests_latest_event_values_for_local_events {
1458    use assert_matches::assert_matches;
1459    use ruma::{
1460        EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
1461        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
1462        serde::Raw,
1463    };
1464    use serde_json::json;
1465
1466    use super::{
1467        LatestEventValue, LatestEventValuesForLocalEvents, LocalLatestEventValue,
1468        RemoteLatestEventValue, SerializableEventContent,
1469    };
1470
1471    pub fn remote_room_message_with_event_id(
1472        event_id: &EventId,
1473        body: &str,
1474    ) -> RemoteLatestEventValue {
1475        RemoteLatestEventValue::from_plaintext(
1476            Raw::from_json_string(
1477                json!({
1478                    "content": RoomMessageEventContent::text_plain(body),
1479                    "type": "m.room.message",
1480                    "event_id": event_id,
1481                    "origin_server_ts": 42,
1482                    "sender": "@mnt_io:matrix.org",
1483                })
1484                .to_string(),
1485            )
1486            .unwrap(),
1487        )
1488    }
1489
1490    fn remote_room_message(body: &str) -> RemoteLatestEventValue {
1491        let event_id = event_id!("$ev0");
1492        remote_room_message_with_event_id(&event_id, body)
1493    }
1494
1495    fn local_room_message(body: &str) -> LocalLatestEventValue {
1496        LocalLatestEventValue {
1497            timestamp: MilliSecondsSinceUnixEpoch::now(),
1498            content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
1499                RoomMessageEventContent::text_plain(body),
1500            ))
1501            .unwrap(),
1502        }
1503    }
1504
1505    #[test]
1506    fn test_last() {
1507        let mut buffer = LatestEventValuesForLocalEvents::new();
1508
1509        assert!(buffer.last().is_none());
1510
1511        buffer.push(
1512            OwnedTransactionId::from("txnid"),
1513            LatestEventValue::LocalIsSending(local_room_message("tome")),
1514        );
1515
1516        assert_matches!(buffer.last(), Some(LatestEventValue::LocalIsSending(_)));
1517    }
1518
1519    #[test]
1520    fn test_position() {
1521        let mut buffer = LatestEventValuesForLocalEvents::new();
1522        let transaction_id = OwnedTransactionId::from("txnid");
1523
1524        assert!(buffer.position(&transaction_id).is_none());
1525
1526        buffer.push(
1527            transaction_id.clone(),
1528            LatestEventValue::LocalIsSending(local_room_message("raclette")),
1529        );
1530        buffer.push(
1531            OwnedTransactionId::from("othertxnid"),
1532            LatestEventValue::LocalIsSending(local_room_message("tome")),
1533        );
1534
1535        assert_eq!(buffer.position(&transaction_id), Some(0));
1536    }
1537
1538    #[test]
1539    #[should_panic]
1540    fn test_push_none() {
1541        let mut buffer = LatestEventValuesForLocalEvents::new();
1542
1543        buffer.push(OwnedTransactionId::from("txnid"), LatestEventValue::None);
1544    }
1545
1546    #[test]
1547    #[should_panic]
1548    fn test_push_remote() {
1549        let mut buffer = LatestEventValuesForLocalEvents::new();
1550
1551        buffer.push(
1552            OwnedTransactionId::from("txnid"),
1553            LatestEventValue::Remote(remote_room_message("tome")),
1554        );
1555    }
1556
1557    #[test]
1558    fn test_push_local() {
1559        let mut buffer = LatestEventValuesForLocalEvents::new();
1560
1561        buffer.push(
1562            OwnedTransactionId::from("txnid0"),
1563            LatestEventValue::LocalIsSending(local_room_message("tome")),
1564        );
1565        buffer.push(
1566            OwnedTransactionId::from("txnid1"),
1567            LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1568        );
1569
1570        // no panic.
1571    }
1572
1573    #[test]
1574    fn test_replace_content() {
1575        let mut buffer = LatestEventValuesForLocalEvents::new();
1576
1577        buffer.push(
1578            OwnedTransactionId::from("txnid0"),
1579            LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1580        );
1581
1582        let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1583
1584        buffer.replace_content(0, new_content);
1585
1586        assert_matches!(
1587            buffer.last(),
1588            Some(LatestEventValue::LocalIsSending(local_event)) => {
1589                assert_matches!(
1590                    local_event.content.deserialize().unwrap(),
1591                    AnyMessageLikeEventContent::RoomMessage(content) => {
1592                        assert_eq!(content.body(), "comté");
1593                    }
1594                );
1595            }
1596        );
1597    }
1598
1599    #[test]
1600    fn test_remove() {
1601        let mut buffer = LatestEventValuesForLocalEvents::new();
1602
1603        buffer.push(
1604            OwnedTransactionId::from("txnid"),
1605            LatestEventValue::LocalIsSending(local_room_message("gryuère")),
1606        );
1607
1608        assert!(buffer.last().is_some());
1609
1610        buffer.remove(0);
1611
1612        assert!(buffer.last().is_none());
1613    }
1614
1615    #[test]
1616    fn test_mark_cannot_be_sent_from() {
1617        let mut buffer = LatestEventValuesForLocalEvents::new();
1618        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1619        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1620        let transaction_id_2 = OwnedTransactionId::from("txnid2");
1621
1622        buffer.push(
1623            transaction_id_0,
1624            LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1625        );
1626        buffer.push(
1627            transaction_id_1.clone(),
1628            LatestEventValue::LocalIsSending(local_room_message("brigand")),
1629        );
1630        buffer.push(
1631            transaction_id_2,
1632            LatestEventValue::LocalIsSending(local_room_message("raclette")),
1633        );
1634
1635        buffer.mark_cannot_be_sent_from(&transaction_id_1);
1636
1637        assert_eq!(buffer.buffer.len(), 3);
1638        assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
1639        assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1640        assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalCannotBeSent(_));
1641    }
1642
1643    #[test]
1644    fn test_mark_is_sending_from() {
1645        let mut buffer = LatestEventValuesForLocalEvents::new();
1646        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1647        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1648        let transaction_id_2 = OwnedTransactionId::from("txnid2");
1649
1650        buffer.push(
1651            transaction_id_0,
1652            LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1653        );
1654        buffer.push(
1655            transaction_id_1.clone(),
1656            LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
1657        );
1658        buffer.push(
1659            transaction_id_2,
1660            LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1661        );
1662
1663        buffer.mark_is_sending_from(&transaction_id_1);
1664
1665        assert_eq!(buffer.buffer.len(), 3);
1666        assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1667        assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
1668        assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
1669    }
1670
1671    #[test]
1672    fn test_mark_is_sending_after() {
1673        let mut buffer = LatestEventValuesForLocalEvents::new();
1674        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1675        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1676        let transaction_id_2 = OwnedTransactionId::from("txnid2");
1677
1678        buffer.push(
1679            transaction_id_0,
1680            LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1681        );
1682        buffer.push(
1683            transaction_id_1.clone(),
1684            LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
1685        );
1686        buffer.push(
1687            transaction_id_2,
1688            LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1689        );
1690
1691        buffer.mark_is_sending_after(&transaction_id_1);
1692
1693        assert_eq!(buffer.buffer.len(), 3);
1694        assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1695        assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1696        assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
1697    }
1698}
1699
1700#[cfg(all(not(target_family = "wasm"), test))]
1701mod tests_latest_event_value_builder {
1702    use std::sync::Arc;
1703
1704    use assert_matches::assert_matches;
1705    use matrix_sdk_base::{
1706        RoomState,
1707        deserialized_responses::TimelineEventKind,
1708        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1709        store::SerializableEventContent,
1710    };
1711    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1712    use ruma::{
1713        MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedTransactionId, event_id,
1714        events::{
1715            AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
1716            SyncMessageLikeEvent, reaction::ReactionEventContent, relation::Annotation,
1717            room::message::RoomMessageEventContent,
1718        },
1719        room_id, user_id,
1720    };
1721
1722    use super::{
1723        LatestEventValue, LatestEventValueBuilder, LatestEventValuesForLocalEvents,
1724        RemoteLatestEventValue, RoomEventCache, RoomSendQueueUpdate,
1725    };
1726    use crate::{
1727        Client, Error,
1728        client::WeakClient,
1729        room::WeakRoom,
1730        send_queue::{AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle},
1731        test_utils::mocks::MatrixMockServer,
1732    };
1733
1734    macro_rules! assert_remote_value_matches_room_message_with_body {
1735        ( $latest_event_value:expr => with body = $body:expr ) => {
1736            assert_matches!(
1737                $latest_event_value,
1738                LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => {
1739                    assert_matches!(
1740                        event.deserialize().unwrap(),
1741                        AnySyncTimelineEvent::MessageLike(
1742                            AnySyncMessageLikeEvent::RoomMessage(
1743                                SyncMessageLikeEvent::Original(message_content)
1744                            )
1745                        ) => {
1746                            assert_eq!(message_content.content.body(), $body);
1747                        }
1748                    );
1749                }
1750            );
1751        };
1752    }
1753
1754    macro_rules! assert_local_value_matches_room_message_with_body {
1755        ( $latest_event_value:expr, $pattern:path => with body = $body:expr ) => {
1756            assert_matches!(
1757                $latest_event_value,
1758                $pattern (local_event) => {
1759                    assert_matches!(
1760                        local_event.content.deserialize().unwrap(),
1761                        AnyMessageLikeEventContent::RoomMessage(message_content) => {
1762                            assert_eq!(message_content.body(), $body);
1763                        }
1764                    );
1765                }
1766            );
1767        };
1768    }
1769
1770    #[async_test]
1771    async fn test_remote_is_scanning_event_backwards_from_event_cache() {
1772        let room_id = room_id!("!r0");
1773        let user_id = user_id!("@mnt_io:matrix.org");
1774        let event_factory = EventFactory::new().sender(user_id).room(room_id);
1775        let event_id_0 = event_id!("$ev0");
1776        let event_id_1 = event_id!("$ev1");
1777        let event_id_2 = event_id!("$ev2");
1778
1779        let server = MatrixMockServer::new().await;
1780        let client = server.client_builder().build().await;
1781
1782        // Prelude.
1783        {
1784            // Create the room.
1785            client.base_client().get_or_create_room(room_id, RoomState::Joined);
1786
1787            // Initialise the event cache store.
1788            client
1789                .event_cache_store()
1790                .lock()
1791                .await
1792                .expect("Could not acquire the event cache lock")
1793                .as_clean()
1794                .expect("Could not acquire a clean event cache lock")
1795                .handle_linked_chunk_updates(
1796                    LinkedChunkId::Room(room_id),
1797                    vec![
1798                        Update::NewItemsChunk {
1799                            previous: None,
1800                            new: ChunkIdentifier::new(0),
1801                            next: None,
1802                        },
1803                        Update::PushItems {
1804                            at: Position::new(ChunkIdentifier::new(0), 0),
1805                            items: vec![
1806                                // a latest event candidate
1807                                event_factory.text_msg("hello").event_id(event_id_0).into(),
1808                                // a latest event candidate
1809                                event_factory.text_msg("world").event_id(event_id_1).into(),
1810                                // not a latest event candidate
1811                                event_factory
1812                                    .room_topic("new room topic")
1813                                    .event_id(event_id_2)
1814                                    .into(),
1815                            ],
1816                        },
1817                    ],
1818                )
1819                .await
1820                .unwrap();
1821        }
1822
1823        let event_cache = client.event_cache();
1824        event_cache.subscribe().unwrap();
1825
1826        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1827        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned());
1828
1829        assert_remote_value_matches_room_message_with_body!(
1830            // We get `event_id_1` because `event_id_2` isn't a candidate,
1831            // and `event_id_0` hasn't been read yet (because events are read
1832            // backwards).
1833            LatestEventValueBuilder::new_remote(&room_event_cache, &weak_room).await => with body = "world"
1834        );
1835    }
1836
1837    async fn local_prelude() -> (Client, OwnedRoomId, RoomSendQueue, RoomEventCache) {
1838        let room_id = room_id!("!r0").to_owned();
1839
1840        let server = MatrixMockServer::new().await;
1841        let client = server.client_builder().build().await;
1842        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1843        let room = client.get_room(&room_id).unwrap();
1844
1845        let event_cache = client.event_cache();
1846        event_cache.subscribe().unwrap();
1847
1848        let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
1849
1850        let send_queue = client.send_queue();
1851        let room_send_queue = send_queue.for_room(room);
1852
1853        (client, room_id, room_send_queue, room_event_cache)
1854    }
1855
1856    fn new_local_echo_content(
1857        room_send_queue: &RoomSendQueue,
1858        transaction_id: &OwnedTransactionId,
1859        body: &str,
1860    ) -> LocalEchoContent {
1861        LocalEchoContent::Event {
1862            serialized_event: SerializableEventContent::new(
1863                &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
1864            )
1865            .unwrap(),
1866            send_handle: SendHandle::new(
1867                room_send_queue.clone(),
1868                transaction_id.clone(),
1869                MilliSecondsSinceUnixEpoch::now(),
1870            ),
1871            send_error: None,
1872        }
1873    }
1874
1875    #[async_test]
1876    async fn test_local_new_local_event() {
1877        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1878
1879        let mut buffer = LatestEventValuesForLocalEvents::new();
1880
1881        // Receiving one `NewLocalEvent`.
1882        {
1883            let transaction_id = OwnedTransactionId::from("txnid0");
1884            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1885
1886            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1887
1888            // The `LatestEventValue` matches the new local event.
1889            assert_local_value_matches_room_message_with_body!(
1890                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1891                LatestEventValue::LocalIsSending => with body = "A"
1892            );
1893        }
1894
1895        // Receiving another `NewLocalEvent`, ensuring it's pushed back in the buffer.
1896        {
1897            let transaction_id = OwnedTransactionId::from("txnid1");
1898            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
1899
1900            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1901
1902            // The `LatestEventValue` matches the new local event.
1903            assert_local_value_matches_room_message_with_body!(
1904                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1905                LatestEventValue::LocalIsSending => with body = "B"
1906            );
1907        }
1908
1909        assert_eq!(buffer.buffer.len(), 2);
1910    }
1911
1912    #[async_test]
1913    async fn test_local_new_local_event_when_previous_local_event_cannot_be_sent() {
1914        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1915
1916        let mut buffer = LatestEventValuesForLocalEvents::new();
1917
1918        // Receiving one `NewLocalEvent`.
1919        let transaction_id_0 = {
1920            let transaction_id = OwnedTransactionId::from("txnid0");
1921            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1922
1923            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1924                transaction_id: transaction_id.clone(),
1925                content,
1926            });
1927
1928            // The `LatestEventValue` matches the new local event.
1929            assert_local_value_matches_room_message_with_body!(
1930                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1931                LatestEventValue::LocalIsSending => with body = "A"
1932            );
1933
1934            transaction_id
1935        };
1936
1937        // Receiving a `SendError` targeting the first event. The
1938        // `LatestEventValue` must change to indicate it “cannot be sent”.
1939        {
1940            let update = RoomSendQueueUpdate::SendError {
1941                transaction_id: transaction_id_0.clone(),
1942                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
1943                is_recoverable: true,
1944            };
1945
1946            // The `LatestEventValue` has changed, it still matches the latest local
1947            // event but it's marked as “cannot be sent”.
1948            assert_local_value_matches_room_message_with_body!(
1949                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1950                LatestEventValue::LocalCannotBeSent => with body = "A"
1951            );
1952
1953            assert_eq!(buffer.buffer.len(), 1);
1954            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1955        }
1956
1957        // Receiving another `NewLocalEvent`, ensuring it's pushed back in the buffer,
1958        // and as a `LocalCannotBeSent` because the previous value is itself
1959        // `LocalCannotBeSent`.
1960        {
1961            let transaction_id = OwnedTransactionId::from("txnid1");
1962            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
1963
1964            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1965
1966            // The `LatestEventValue` matches the new local event.
1967            assert_local_value_matches_room_message_with_body!(
1968                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1969                LatestEventValue::LocalCannotBeSent => with body = "B"
1970            );
1971        }
1972
1973        assert_eq!(buffer.buffer.len(), 2);
1974        assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1975        assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1976    }
1977
1978    #[async_test]
1979    async fn test_local_cancelled_local_event() {
1980        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1981
1982        let mut buffer = LatestEventValuesForLocalEvents::new();
1983        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1984        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1985        let transaction_id_2 = OwnedTransactionId::from("txnid2");
1986
1987        // Receiving three `NewLocalEvent`s.
1988        {
1989            for (transaction_id, body) in
1990                [(&transaction_id_0, "A"), (&transaction_id_1, "B"), (&transaction_id_2, "C")]
1991            {
1992                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
1993
1994                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1995                    transaction_id: transaction_id.clone(),
1996                    content,
1997                });
1998
1999                // The `LatestEventValue` matches the new local event.
2000                assert_local_value_matches_room_message_with_body!(
2001                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2002                    LatestEventValue::LocalIsSending => with body = body
2003                );
2004            }
2005
2006            assert_eq!(buffer.buffer.len(), 3);
2007        }
2008
2009        // Receiving a `CancelledLocalEvent` targeting the second event. The
2010        // `LatestEventValue` must not change.
2011        {
2012            let update = RoomSendQueueUpdate::CancelledLocalEvent {
2013                transaction_id: transaction_id_1.clone(),
2014            };
2015
2016            // The `LatestEventValue` hasn't changed, it still matches the latest local
2017            // event.
2018            assert_local_value_matches_room_message_with_body!(
2019                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2020                LatestEventValue::LocalIsSending => with body = "C"
2021            );
2022
2023            assert_eq!(buffer.buffer.len(), 2);
2024        }
2025
2026        // Receiving a `CancelledLocalEvent` targeting the second (so the last) event.
2027        // The `LatestEventValue` must point to the first local event.
2028        {
2029            let update = RoomSendQueueUpdate::CancelledLocalEvent {
2030                transaction_id: transaction_id_2.clone(),
2031            };
2032
2033            // The `LatestEventValue` has changed, it matches the previous (so the first)
2034            // local event.
2035            assert_local_value_matches_room_message_with_body!(
2036                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2037                LatestEventValue::LocalIsSending => with body = "A"
2038            );
2039
2040            assert_eq!(buffer.buffer.len(), 1);
2041        }
2042
2043        // Receiving a `CancelledLocalEvent` targeting the first (so the last) event.
2044        // The `LatestEventValue` cannot be computed from the send queue and will
2045        // fallback to the event cache. The event cache is empty in this case, so we get
2046        // nothing.
2047        {
2048            let update =
2049                RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: transaction_id_0 };
2050
2051            // The `LatestEventValue` has changed, it's empty!
2052            assert_matches!(
2053                LatestEventValueBuilder::new_local(
2054                    &update,
2055                    &mut buffer,
2056                    &room_event_cache,
2057                    None,
2058                    None
2059                )
2060                .await,
2061                LatestEventValue::None
2062            );
2063
2064            assert!(buffer.buffer.is_empty());
2065        }
2066    }
2067
2068    #[async_test]
2069    async fn test_local_sent_event() {
2070        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2071
2072        let mut buffer = LatestEventValuesForLocalEvents::new();
2073        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2074        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2075
2076        // Receiving two `NewLocalEvent`s.
2077        {
2078            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2079                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2080
2081                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2082                    transaction_id: transaction_id.clone(),
2083                    content,
2084                });
2085
2086                // The `LatestEventValue` matches the new local event.
2087                assert_local_value_matches_room_message_with_body!(
2088                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2089                    LatestEventValue::LocalIsSending => with body = body
2090                );
2091            }
2092
2093            assert_eq!(buffer.buffer.len(), 2);
2094        }
2095
2096        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
2097        // must not change.
2098        {
2099            let update = RoomSendQueueUpdate::SentEvent {
2100                transaction_id: transaction_id_0.clone(),
2101                event_id: event_id!("$ev0").to_owned(),
2102            };
2103
2104            // The `LatestEventValue` hasn't changed, it still matches the latest local
2105            // event.
2106            assert_local_value_matches_room_message_with_body!(
2107                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2108                LatestEventValue::LocalIsSending => with body = "B"
2109            );
2110
2111            assert_eq!(buffer.buffer.len(), 1);
2112        }
2113
2114        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
2115        // hasn't changed, this is still this event.
2116        {
2117            let update = RoomSendQueueUpdate::SentEvent {
2118                transaction_id: transaction_id_1,
2119                event_id: event_id!("$ev1").to_owned(),
2120            };
2121
2122            // The `LatestEventValue` hasn't changed.
2123            assert_local_value_matches_room_message_with_body!(
2124                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2125                LatestEventValue::LocalIsSending => with body = "B"
2126            );
2127
2128            assert!(buffer.buffer.is_empty());
2129        }
2130    }
2131
2132    #[async_test]
2133    async fn test_local_replaced_local_event() {
2134        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2135
2136        let mut buffer = LatestEventValuesForLocalEvents::new();
2137        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2138        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2139
2140        // Receiving two `NewLocalEvent`s.
2141        {
2142            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2143                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2144
2145                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2146                    transaction_id: transaction_id.clone(),
2147                    content,
2148                });
2149
2150                // The `LatestEventValue` matches the new local event.
2151                assert_local_value_matches_room_message_with_body!(
2152                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2153                    LatestEventValue::LocalIsSending => with body = body
2154                );
2155            }
2156
2157            assert_eq!(buffer.buffer.len(), 2);
2158        }
2159
2160        // Receiving a `ReplacedLocalEvent` targeting the first event. The
2161        // `LatestEventValue` must not change.
2162        {
2163            let transaction_id = &transaction_id_0;
2164            let LocalEchoContent::Event { serialized_event: new_content, .. } =
2165                new_local_echo_content(&room_send_queue, transaction_id, "A.")
2166            else {
2167                panic!("oopsy");
2168            };
2169
2170            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2171                transaction_id: transaction_id.clone(),
2172                new_content,
2173            };
2174
2175            // The `LatestEventValue` hasn't changed, it still matches the latest local
2176            // event.
2177            assert_local_value_matches_room_message_with_body!(
2178                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2179                LatestEventValue::LocalIsSending => with body = "B"
2180            );
2181
2182            assert_eq!(buffer.buffer.len(), 2);
2183        }
2184
2185        // Receiving a `ReplacedLocalEvent` targeting the second (so the last) event.
2186        // The `LatestEventValue` is changing.
2187        {
2188            let transaction_id = &transaction_id_1;
2189            let LocalEchoContent::Event { serialized_event: new_content, .. } =
2190                new_local_echo_content(&room_send_queue, transaction_id, "B.")
2191            else {
2192                panic!("oopsy");
2193            };
2194
2195            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2196                transaction_id: transaction_id.clone(),
2197                new_content,
2198            };
2199
2200            // The `LatestEventValue` has changed, it still matches the latest local
2201            // event but with its new content.
2202            assert_local_value_matches_room_message_with_body!(
2203                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2204                LatestEventValue::LocalIsSending => with body = "B."
2205            );
2206
2207            assert_eq!(buffer.buffer.len(), 2);
2208        }
2209    }
2210
2211    #[async_test]
2212    async fn test_local_replaced_local_event_by_a_non_suitable_event() {
2213        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2214
2215        let mut buffer = LatestEventValuesForLocalEvents::new();
2216        let transaction_id = OwnedTransactionId::from("txnid0");
2217
2218        // Receiving one `NewLocalEvent`.
2219        {
2220            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2221
2222            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2223                transaction_id: transaction_id.clone(),
2224                content,
2225            });
2226
2227            // The `LatestEventValue` matches the new local event.
2228            assert_local_value_matches_room_message_with_body!(
2229                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2230                LatestEventValue::LocalIsSending => with body = "A"
2231            );
2232
2233            assert_eq!(buffer.buffer.len(), 1);
2234        }
2235
2236        // Receiving a `ReplacedLocalEvent` targeting the first event. Sadly, the new
2237        // event cannot be mapped to a `LatestEventValue`! The first event is removed
2238        // from the buffer, and the `LatestEventValue` becomes `None` because there is
2239        // no other alternative.
2240        {
2241            let new_content = SerializableEventContent::new(&AnyMessageLikeEventContent::Reaction(
2242                ReactionEventContent::new(Annotation::new(
2243                    event_id!("$ev0").to_owned(),
2244                    "+1".to_owned(),
2245                )),
2246            ))
2247            .unwrap();
2248
2249            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2250                transaction_id: transaction_id.clone(),
2251                new_content,
2252            };
2253
2254            // The `LatestEventValue` has changed!
2255            assert_matches!(
2256                LatestEventValueBuilder::new_local(
2257                    &update,
2258                    &mut buffer,
2259                    &room_event_cache,
2260                    None,
2261                    None
2262                )
2263                .await,
2264                LatestEventValue::None
2265            );
2266
2267            assert_eq!(buffer.buffer.len(), 0);
2268        }
2269    }
2270
2271    #[async_test]
2272    async fn test_remote_edit_invalid_edit() {
2273        let room_id = room_id!("!r0");
2274        let user_id = user_id!("@mnt_io:matrix.org");
2275        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2276        let event_id_0 = event_id!("$ev0");
2277        let event_id_1 = event_id!("$ev1");
2278
2279        let server = MatrixMockServer::new().await;
2280        let client = server.client_builder().build().await;
2281
2282        // Prelude.
2283        {
2284            // Create the room.
2285            client.base_client().get_or_create_room(room_id, RoomState::Joined);
2286
2287            // Initialise the event cache store.
2288            client
2289                .event_cache_store()
2290                .lock()
2291                .await
2292                .expect("Could not acquire the event cache lock")
2293                .as_clean()
2294                .expect("Could not acquire a clean event cache lock")
2295                .handle_linked_chunk_updates(
2296                    LinkedChunkId::Room(room_id),
2297                    vec![
2298                        Update::NewItemsChunk {
2299                            previous: None,
2300                            new: ChunkIdentifier::new(0),
2301                            next: None,
2302                        },
2303                        Update::PushItems {
2304                            at: Position::new(ChunkIdentifier::new(0), 0),
2305                            items: vec![
2306                                // a text message
2307                                event_factory
2308                                    .text_msg("hello")
2309                                    .sender(user_id!("@alice:example.org"))
2310                                    .event_id(event_id_0)
2311                                    .into(),
2312                                // a replacement of the previous message
2313                                event_factory
2314                                    .text_msg("* goodbye")
2315                                    .event_id(event_id_1)
2316                                    .sender(user_id!("@malory:example.org"))
2317                                    .edit(
2318                                        event_id_0,
2319                                        RoomMessageEventContent::text_plain("goodbye").into(),
2320                                    )
2321                                    .into(),
2322                            ],
2323                        },
2324                    ],
2325                )
2326                .await
2327                .unwrap();
2328        }
2329
2330        let event_cache = client.event_cache();
2331        event_cache.subscribe().unwrap();
2332
2333        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2334        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned());
2335
2336        assert_remote_value_matches_room_message_with_body!(
2337            // We get `event_id_0` because the edit `event_id_1` is invalid.
2338            LatestEventValueBuilder::new_remote(&room_event_cache, &weak_room).await => with body = "hello"
2339        );
2340    }
2341
2342    #[async_test]
2343    async fn test_local_send_error() {
2344        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2345
2346        let mut buffer = LatestEventValuesForLocalEvents::new();
2347        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2348        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2349
2350        // Receiving two `NewLocalEvent`s.
2351        {
2352            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2353                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2354
2355                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2356                    transaction_id: transaction_id.clone(),
2357                    content,
2358                });
2359
2360                // The `LatestEventValue` matches the new local event.
2361                assert_local_value_matches_room_message_with_body!(
2362                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2363                    LatestEventValue::LocalIsSending => with body = body
2364                );
2365            }
2366
2367            assert_eq!(buffer.buffer.len(), 2);
2368        }
2369
2370        // Receiving a `SendError` targeting the first event. The
2371        // `LatestEventValue` must change to indicate it's “cannot be sent”.
2372        {
2373            let update = RoomSendQueueUpdate::SendError {
2374                transaction_id: transaction_id_0.clone(),
2375                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2376                is_recoverable: true,
2377            };
2378
2379            // The `LatestEventValue` has changed, it still matches the latest local
2380            // event but it's marked as “cannot be sent”.
2381            assert_local_value_matches_room_message_with_body!(
2382                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2383                LatestEventValue::LocalCannotBeSent => with body = "B"
2384            );
2385
2386            assert_eq!(buffer.buffer.len(), 2);
2387            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2388            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2389        }
2390
2391        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
2392        // must change: since an event has been sent, the following events are now
2393        // “is sending”.
2394        {
2395            let update = RoomSendQueueUpdate::SentEvent {
2396                transaction_id: transaction_id_0.clone(),
2397                event_id: event_id!("$ev0").to_owned(),
2398            };
2399
2400            // The `LatestEventValue` has changed, it still matches the latest local
2401            // event but it's “is sending”.
2402            assert_local_value_matches_room_message_with_body!(
2403                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2404                LatestEventValue::LocalIsSending => with body = "B"
2405            );
2406
2407            assert_eq!(buffer.buffer.len(), 1);
2408            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2409        }
2410    }
2411
2412    #[async_test]
2413    async fn test_local_retry_event() {
2414        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2415
2416        let mut buffer = LatestEventValuesForLocalEvents::new();
2417        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2418        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2419
2420        // Receiving two `NewLocalEvent`s.
2421        {
2422            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2423                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2424
2425                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2426                    transaction_id: transaction_id.clone(),
2427                    content,
2428                });
2429
2430                // The `LatestEventValue` matches the new local event.
2431                assert_local_value_matches_room_message_with_body!(
2432                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2433                    LatestEventValue::LocalIsSending => with body = body
2434                );
2435            }
2436
2437            assert_eq!(buffer.buffer.len(), 2);
2438        }
2439
2440        // Receiving a `SendError` targeting the first event. The
2441        // `LatestEventValue` must change to indicate it's “cannot be sent”.
2442        {
2443            let update = RoomSendQueueUpdate::SendError {
2444                transaction_id: transaction_id_0.clone(),
2445                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2446                is_recoverable: true,
2447            };
2448
2449            // The `LatestEventValue` has changed, it still matches the latest local
2450            // event but it's marked as “cannot be sent”.
2451            assert_local_value_matches_room_message_with_body!(
2452                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2453                LatestEventValue::LocalCannotBeSent => with body = "B"
2454            );
2455
2456            assert_eq!(buffer.buffer.len(), 2);
2457            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2458            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2459        }
2460
2461        // Receiving a `RetryEvent` targeting the first event. The `LatestEventValue`
2462        // must change: this local event and its following must be “is sending”.
2463        {
2464            let update =
2465                RoomSendQueueUpdate::RetryEvent { transaction_id: transaction_id_0.clone() };
2466
2467            // The `LatestEventValue` has changed, it still matches the latest local
2468            // event but it's “is sending”.
2469            assert_local_value_matches_room_message_with_body!(
2470                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2471                LatestEventValue::LocalIsSending => with body = "B"
2472            );
2473
2474            assert_eq!(buffer.buffer.len(), 2);
2475            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2476            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
2477        }
2478    }
2479
2480    #[async_test]
2481    async fn test_local_media_upload() {
2482        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2483
2484        let mut buffer = LatestEventValuesForLocalEvents::new();
2485        let transaction_id = OwnedTransactionId::from("txnid");
2486
2487        // Receiving a `NewLocalEvent`.
2488        {
2489            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2490
2491            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2492                transaction_id: transaction_id.clone(),
2493                content,
2494            });
2495
2496            // The `LatestEventValue` matches the new local event.
2497            assert_local_value_matches_room_message_with_body!(
2498                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2499                LatestEventValue::LocalIsSending => with body = "A"
2500            );
2501
2502            assert_eq!(buffer.buffer.len(), 1);
2503        }
2504
2505        // Receiving a `MediaUpload` targeting the first event. The
2506        // `LatestEventValue` must not change as `MediaUpload` are ignored.
2507        {
2508            let update = RoomSendQueueUpdate::MediaUpload {
2509                related_to: transaction_id,
2510                file: None,
2511                index: 0,
2512                progress: AbstractProgress { current: 0, total: 0 },
2513            };
2514
2515            // The `LatestEventValue` has changed somehow, it tells no new
2516            // `LatestEventValue` is computed.
2517            assert_matches!(
2518                LatestEventValueBuilder::new_local(
2519                    &update,
2520                    &mut buffer,
2521                    &room_event_cache,
2522                    None,
2523                    None
2524                )
2525                .await,
2526                LatestEventValue::None
2527            );
2528
2529            assert_eq!(buffer.buffer.len(), 1);
2530        }
2531    }
2532
2533    #[async_test]
2534    async fn test_local_fallbacks_to_remote_when_empty() {
2535        let room_id = room_id!("!r0");
2536        let user_id = user_id!("@mnt_io:matrix.org");
2537        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2538        let event_id_0 = event_id!("$ev0");
2539        let event_id_1 = event_id!("$ev1");
2540
2541        let server = MatrixMockServer::new().await;
2542        let client = server.client_builder().build().await;
2543
2544        // Prelude.
2545        {
2546            // Create the room.
2547            client.base_client().get_or_create_room(room_id, RoomState::Joined);
2548
2549            // Initialise the event cache store.
2550            client
2551                .event_cache_store()
2552                .lock()
2553                .await
2554                .expect("Could not acquire the event cache lock")
2555                .as_clean()
2556                .expect("Could not acquire a clean event cache lock")
2557                .handle_linked_chunk_updates(
2558                    LinkedChunkId::Room(room_id),
2559                    vec![
2560                        Update::NewItemsChunk {
2561                            previous: None,
2562                            new: ChunkIdentifier::new(0),
2563                            next: None,
2564                        },
2565                        Update::PushItems {
2566                            at: Position::new(ChunkIdentifier::new(0), 0),
2567                            items: vec![
2568                                event_factory.text_msg("hello").event_id(event_id_0).into(),
2569                            ],
2570                        },
2571                    ],
2572                )
2573                .await
2574                .unwrap();
2575        }
2576
2577        let event_cache = client.event_cache();
2578        event_cache.subscribe().unwrap();
2579
2580        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2581
2582        let mut buffer = LatestEventValuesForLocalEvents::new();
2583
2584        // We get a `Remote` because there is no `Local*` values!
2585        assert_remote_value_matches_room_message_with_body!(
2586            LatestEventValueBuilder::new_local(
2587                // An update that won't be create a new `LatestEventValue`.
2588                &RoomSendQueueUpdate::SentEvent {
2589                    transaction_id: OwnedTransactionId::from("txnid"),
2590                    event_id: event_id_1.to_owned(),
2591                },
2592                &mut buffer,
2593                &room_event_cache,
2594                None,
2595                None,
2596            )
2597            .await
2598             => with body = "hello"
2599        );
2600    }
2601}