matrix_sdk/latest_events/
latest_event.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{iter::once, ops::Not};
16
17use eyeball::{AsyncLock, SharedObservable, Subscriber};
18pub use matrix_sdk_base::latest_event::{
19    LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
20};
21use matrix_sdk_base::{
22    deserialized_responses::TimelineEvent, store::SerializableEventContent,
23    RoomInfoNotableUpdateReasons, StateChanges,
24};
25use ruma::{
26    events::{
27        relation::RelationType,
28        room::{member::MembershipState, message::MessageType, power_levels::RoomPowerLevels},
29        AnyMessageLikeEventContent, AnySyncStateEvent, AnySyncTimelineEvent, SyncStateEvent,
30    },
31    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
32};
33use tracing::{error, instrument, warn};
34
35use crate::{event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate};
36
/// The latest event of a room or a thread.
///
/// Use [`LatestEvent::subscribe`] to get a stream of updates.
#[derive(Debug)]
pub(super) struct LatestEvent {
    /// The room owning this latest event.
    ///
    /// It is resolved with `get()` when the value has to be persisted; if the
    /// room cannot be accessed any more, persisting is skipped.
    weak_room: WeakRoom,

    /// The thread (if any) owning this latest event.
    _thread_id: Option<OwnedEventId>,

    /// A buffer of the current [`LatestEventValue`] computed for local events
    /// seen by the send queue. See [`LatestEventValuesForLocalEvents`] to learn
    /// more.
    ///
    /// As long as this buffer is not empty, values computed from the event
    /// cache do not overwrite the current value.
    buffer_of_values_for_local_events: LatestEventValuesForLocalEvents,

    /// The latest event value.
    ///
    /// This is the observable handed out by [`LatestEvent::subscribe`].
    current_value: SharedObservable<LatestEventValue, AsyncLock>,
}
56
57impl LatestEvent {
58    pub(super) async fn new(
59        weak_room: &WeakRoom,
60        thread_id: Option<&EventId>,
61        room_event_cache: &RoomEventCache,
62    ) -> Self {
63        Self {
64            weak_room: weak_room.clone(),
65            _thread_id: thread_id.map(ToOwned::to_owned),
66            buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(),
67            current_value: SharedObservable::new_async(
68                LatestEventValueBuilder::new_remote(room_event_cache, weak_room).await,
69            ),
70        }
71    }
72
73    /// Return a [`Subscriber`] to new values.
74    pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
75        self.current_value.subscribe().await
76    }
77
78    /// Update the inner latest event value, based on the event cache
79    /// (specifically with the [`RoomEventCache`]), if and only if there is no
80    /// local latest event value waiting.
81    ///
82    /// It is only necessary to compute a new [`LatestEventValue`] from the
83    /// event cache if there is no [`LatestEventValue`] to be compute from the
84    /// send queue. Indeed, anything coming from the send queue has the priority
85    /// over the anything coming from the event cache. We believe it provides a
86    /// better user experience.
87    pub async fn update_with_event_cache(
88        &mut self,
89        room_event_cache: &RoomEventCache,
90        power_levels: &Option<(&UserId, RoomPowerLevels)>,
91    ) {
92        if self.buffer_of_values_for_local_events.is_empty().not() {
93            // At least one `LatestEventValue` exists for local events (i.e. coming from the
94            // send queue). In this case, we don't overwrite the current value with a newly
95            // computed one from the event cache.
96            return;
97        }
98
99        let new_value =
100            LatestEventValueBuilder::new_remote_with_power_levels(room_event_cache, power_levels)
101                .await;
102
103        self.update(new_value).await;
104    }
105
106    /// Update the inner latest event value, based on the send queue
107    /// (specifically with the [`RoomSendQueueUpdate`]).
108    pub async fn update_with_send_queue(
109        &mut self,
110        send_queue_update: &RoomSendQueueUpdate,
111        room_event_cache: &RoomEventCache,
112        power_levels: &Option<(&UserId, RoomPowerLevels)>,
113    ) {
114        let new_value = LatestEventValueBuilder::new_local(
115            send_queue_update,
116            &mut self.buffer_of_values_for_local_events,
117            room_event_cache,
118            power_levels,
119        )
120        .await;
121
122        self.update(new_value).await;
123    }
124
125    /// Update [`Self::current_value`] if and only if the `new_value` is not
126    /// [`LatestEventValue::None`].
127    async fn update(&mut self, new_value: LatestEventValue) {
128        if let LatestEventValue::None = new_value {
129            // Do not update to a `None` value.
130        } else {
131            self.current_value.set(new_value.clone()).await;
132            self.store(new_value).await;
133        }
134    }
135
136    /// Update the `RoomInfo` associated to this room to set the new
137    /// [`LatestEventValue`], and persist it in the
138    /// [`StateStore`][matrix_sdk_base::StateStore] (the one from
139    /// [`Client::state_store`][crate::Client::state_store]).
140    #[instrument(skip_all)]
141    async fn store(&mut self, new_value: LatestEventValue) {
142        let Some(room) = self.weak_room.get() else {
143            warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
144            return;
145        };
146
147        // Compute a new `RoomInfo`.
148        let mut room_info = room.clone_info();
149        room_info.set_new_latest_event(new_value);
150
151        let mut state_changes = StateChanges::default();
152        state_changes.add_room(room_info.clone());
153
154        let client = room.client();
155
156        // Take the sync lock.
157        let _sync_lock = client.base_client().sync_lock().lock().await;
158
159        // Update the `RoomInfo` in the state store.
160        if let Err(error) = client.state_store().save_changes(&state_changes).await {
161            error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
162        }
163
164        // Update the `RoomInfo` of the room.
165        room.set_room_info(room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT);
166    }
167}
168
// Tests for `LatestEvent`: ignoring `None` updates, priority of local values
// over remote ones, and persistence of the value in the `RoomInfo`.
#[cfg(all(not(target_family = "wasm"), test))]
mod tests_latest_event {
    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        store::StoreConfig,
        RoomInfoNotableUpdateReasons, RoomState,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        event_id,
        events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
        room_id,
        serde::Raw,
        user_id, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
    };

    use super::{LatestEvent, LatestEventValue, LocalLatestEventValue, SerializableEventContent};
    use crate::{
        client::WeakClient,
        room::WeakRoom,
        send_queue::{LocalEcho, LocalEchoContent, RoomSendQueue, RoomSendQueueUpdate, SendHandle},
        test_utils::mocks::MatrixMockServer,
    };

    /// Build a [`LocalLatestEventValue`] holding a plain text `m.room.message`
    /// with `body`, timestamped at the current time.
    fn local_room_message(body: &str) -> LocalLatestEventValue {
        LocalLatestEventValue {
            timestamp: MilliSecondsSinceUnixEpoch::now(),
            content: SerializableEventContent::from_raw(
                Raw::new(&AnyMessageLikeEventContent::RoomMessage(
                    RoomMessageEventContent::text_plain(body),
                ))
                .unwrap(),
                "m.room.message".to_owned(),
            ),
        }
    }

    /// Build a [`LocalEchoContent::Event`] holding a plain text message with
    /// `body`, attached to `room_send_queue` under `transaction_id`, with no
    /// send error.
    fn new_local_echo_content(
        room_send_queue: &RoomSendQueue,
        transaction_id: &OwnedTransactionId,
        body: &str,
    ) -> LocalEchoContent {
        LocalEchoContent::Event {
            serialized_event: SerializableEventContent::new(
                &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
            )
            .unwrap(),
            send_handle: SendHandle::new(
                room_send_queue.clone(),
                transaction_id.clone(),
                MilliSecondsSinceUnixEpoch::now(),
            ),
            send_error: None,
        }
    }

    /// `LatestEvent::update` must keep the current value when it is given a
    /// `LatestEventValue::None`.
    #[async_test]
    async fn test_update_ignores_none_value() {
        let room_id = room_id!("!r0");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let weak_client = WeakClient::from_client(&client);

        // Create the room.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let weak_room = WeakRoom::new(weak_client, room_id.to_owned());

        // Get a `RoomEventCache`.
        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;

        // First off, check the default value is `None`!
        assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);

        // Second, set a new value.
        latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;

        assert_matches!(
            latest_event.current_value.get().await,
            LatestEventValue::LocalIsSending(_)
        );

        // Finally, set a new `None` value. It must be ignored.
        latest_event.update(LatestEventValue::None).await;

        assert_matches!(
            latest_event.current_value.get().await,
            LatestEventValue::LocalIsSending(_)
        );
    }

    /// A value coming from the send queue must not be overwritten by a value
    /// coming from the event cache as long as a local event is buffered.
    #[async_test]
    async fn test_local_has_priority_over_remote() {
        let room_id = room_id!("!r0").to_owned();
        let user_id = user_id!("@mnt_io:matrix.org");
        let event_factory = EventFactory::new().sender(user_id).room(&room_id);

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
        let room = client.get_room(&room_id).unwrap();
        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Fill the event cache with one event.
        client
            .event_cache_store()
            .lock()
            .await
            .unwrap()
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(&room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_factory.text_msg("A").event_id(event_id!("$ev0")).into()],
                    },
                ],
            )
            .await
            .unwrap();

        let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();

        let send_queue = client.send_queue();
        let room_send_queue = send_queue.for_room(room);

        let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;

        // First, let's create a `LatestEventValue` from the event cache. It must work.
        {
            latest_event.update_with_event_cache(&room_event_cache, &None).await;

            assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
        }

        // Second, let's create a `LatestEventValue` from the send queue. It
        // must overwrite the current `LatestEventValue`.
        let transaction_id = OwnedTransactionId::from("txnid0");

        {
            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");

            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
                transaction_id: transaction_id.clone(),
                content,
            });

            latest_event.update_with_send_queue(&update, &room_event_cache, &None).await;

            assert_matches!(
                latest_event.current_value.get().await,
                LatestEventValue::LocalIsSending(_)
            );
        }

        // Third, let's create a `LatestEventValue` from the event cache.
        // Nothing must happen, it cannot overwrite the current
        // `LatestEventValue` because the local event isn't sent yet.
        {
            latest_event.update_with_event_cache(&room_event_cache, &None).await;

            assert_matches!(
                latest_event.current_value.get().await,
                LatestEventValue::LocalIsSending(_)
            );
        }

        // Fourth, let's create a `LatestEventValue` from the send queue. It must stay
        // the same, but now the local event is sent.
        {
            let update = RoomSendQueueUpdate::SentEvent {
                transaction_id,
                event_id: event_id!("$ev1").to_owned(),
            };

            latest_event.update_with_send_queue(&update, &room_event_cache, &None).await;

            assert_matches!(
                latest_event.current_value.get().await,
                LatestEventValue::LocalIsSending(_)
            );
        }

        // Finally, let's create a `LatestEventValue` from the event cache. _Now_ it's
        // possible, because there is no more local events.
        {
            latest_event.update_with_event_cache(&room_event_cache, &None).await;

            assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
        }
    }

    /// A newly computed value must be written in the `RoomInfo`, must trigger
    /// a `LATEST_EVENT` notable update, and must still be there once a client
    /// is rebuilt with the same store configuration.
    #[async_test]
    async fn test_store_latest_event_value() {
        let room_id = room_id!("!r0").to_owned();
        let user_id = user_id!("@mnt_io:matrix.org");
        let event_factory = EventFactory::new().sender(user_id).room(&room_id);

        let server = MatrixMockServer::new().await;

        let store_config = StoreConfig::new("cross-process-lock-holder".to_owned());

        // Load the client for the first time, and run some operations.
        {
            let client = server
                .client_builder()
                .on_builder(|builder| builder.store_config(store_config.clone()))
                .build()
                .await;
            let mut room_info_notable_update_receiver = client.room_info_notable_update_receiver();
            let room = client.base_client().get_or_create_room(&room_id, RoomState::Joined);
            let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());

            let event_cache = client.event_cache();
            event_cache.subscribe().unwrap();

            // Fill the event cache with one event.
            client
                .event_cache_store()
                .lock()
                .await
                .unwrap()
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(&room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![event_factory
                                .text_msg("A")
                                .event_id(event_id!("$ev0"))
                                .into()],
                        },
                    ],
                )
                .await
                .unwrap();

            let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();

            // Check there is no `LatestEventValue` for the moment.
            {
                let latest_event = room.new_latest_event();

                assert_matches!(latest_event, LatestEventValue::None);
            }

            // Generate a new `LatestEventValue`.
            {
                let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
                latest_event.update_with_event_cache(&room_event_cache, &None).await;

                assert_matches!(
                    latest_event.current_value.get().await,
                    LatestEventValue::Remote(_)
                );
            }

            // We see the `RoomInfoNotableUpdateReasons`.
            {
                let update = room_info_notable_update_receiver.recv().await.unwrap();

                assert_eq!(update.room_id, room_id);
                assert!(update.reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
            }

            // Check it's in the `RoomInfo` and in `Room`.
            {
                let latest_event = room.new_latest_event();

                assert_matches!(latest_event, LatestEventValue::Remote(_));
            }
        }

        // Reload the client with the same store config, and see the `LatestEventValue`
        // is inside the `RoomInfo`.
        {
            let client = server
                .client_builder()
                .on_builder(|builder| builder.store_config(store_config))
                .build()
                .await;
            let room = client.get_room(&room_id).unwrap();
            let latest_event = room.new_latest_event();

            assert_matches!(latest_event, LatestEventValue::Remote(_));
        }
    }
}
476
/// A builder of [`LatestEventValue`]s.
///
/// This type holds no data: it only groups the associated functions that
/// compute a [`LatestEventValue`] from the event cache or from a send queue
/// update.
struct LatestEventValueBuilder;
479
480impl LatestEventValueBuilder {
481    /// Create a new [`LatestEventValue::Remote`].
482    async fn new_remote(
483        room_event_cache: &RoomEventCache,
484        weak_room: &WeakRoom,
485    ) -> LatestEventValue {
486        // Get the power levels of the user for the current room if the `WeakRoom` is
487        // still valid.
488        let room = weak_room.get();
489        let power_levels = match &room {
490            Some(room) => {
491                let power_levels = room.power_levels().await.ok();
492
493                Some(room.own_user_id()).zip(power_levels)
494            }
495
496            None => None,
497        };
498
499        Self::new_remote_with_power_levels(room_event_cache, &power_levels).await
500    }
501
502    /// Create a new [`LatestEventValue::Remote`] based on existing power
503    /// levels.
504    async fn new_remote_with_power_levels(
505        room_event_cache: &RoomEventCache,
506        power_levels: &Option<(&UserId, RoomPowerLevels)>,
507    ) -> LatestEventValue {
508        room_event_cache
509            .rfind_map_event_in_memory_by(|event| {
510                filter_timeline_event(event, power_levels).then(|| event.clone())
511            })
512            .await
513            .map(LatestEventValue::Remote)
514            .unwrap_or_default()
515    }
516
517    /// Create a new [`LatestEventValue::LocalIsSending`] or
518    /// [`LatestEventValue::LocalCannotBeSent`].
519    async fn new_local(
520        send_queue_update: &RoomSendQueueUpdate,
521        buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
522        room_event_cache: &RoomEventCache,
523        power_levels: &Option<(&UserId, RoomPowerLevels)>,
524    ) -> LatestEventValue {
525        use crate::send_queue::{LocalEcho, LocalEchoContent};
526
527        match send_queue_update {
528            // A new local event is being sent.
529            //
530            // Let's create the `LatestEventValue` and push it in the buffer of values.
531            RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
532                transaction_id,
533                content: local_echo_content,
534            }) => match local_echo_content {
535                LocalEchoContent::Event { serialized_event: serialized_event_content, .. } => {
536                    match serialized_event_content.deserialize() {
537                        Ok(content) => {
538                            if filter_any_message_like_event_content(content) {
539                                let value =
540                                    LatestEventValue::LocalIsSending(LocalLatestEventValue {
541                                        timestamp: MilliSecondsSinceUnixEpoch::now(),
542                                        content: serialized_event_content.clone(),
543                                    });
544
545                                buffer_of_values_for_local_events
546                                    .push(transaction_id.to_owned(), value.clone());
547
548                                value
549                            } else {
550                                LatestEventValue::None
551                            }
552                        }
553
554                        Err(error) => {
555                            error!(?error, "Failed to deserialize an event from `RoomSendQueueUpdate::NewLocalEvent`");
556
557                            LatestEventValue::None
558                        }
559                    }
560                }
561
562                LocalEchoContent::React { .. } => LatestEventValue::None,
563            },
564
565            // A local event has been cancelled before being sent.
566            //
567            // Remove the calculated `LatestEventValue` from the buffer of values, and return the
568            // last `LatestEventValue` or calculate a new one.
569            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
570                if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
571                    buffer_of_values_for_local_events.remove(position);
572                }
573
574                Self::new_local_or_remote(
575                    buffer_of_values_for_local_events,
576                    room_event_cache,
577                    power_levels,
578                )
579                .await
580            }
581
582            // A local event has successfully been sent!
583            //
584            // Mark all “cannot be sent” values as “is sending” after the one matching
585            // `transaction_id`. Indeed, if an event has been sent, it means the send queue is
586            // working, so if any value has been marked as “cannot be sent”, it must be marked as
587            // “is sending”. Then, remove the calculated `LatestEventValue` from the buffer of
588            // values. Finally, return the last `LatestEventValue` or calculate a new
589            // one.
590            RoomSendQueueUpdate::SentEvent { transaction_id, .. } => {
591                let position =
592                    buffer_of_values_for_local_events.mark_is_sending_after(transaction_id);
593
594                // First, compute the new value. Then we remove the sent local event from the
595                // buffer.
596                //
597                // Why in this order? Because in between sending and remote echoing, the event
598                // will only be stored as a local echo and not as a full event in the event
599                // cache. Just after sending, it won't show up as a `LatestEventValue` as it
600                // will immediately be replaced by an event from the event cache. By computing
601                // the new value before removing it from the buffer, we ensure the
602                // `LatestEventValue` represents the just sent local event.
603                //
604                // Note: the next sync may not include the just sent local event. This is a race
605                // condition we are aware of, see https://github.com/matrix-org/matrix-rust-sdk/issues/3941.
606                let value = Self::new_local_or_remote(
607                    buffer_of_values_for_local_events,
608                    room_event_cache,
609                    power_levels,
610                )
611                .await;
612
613                if let Some(position) = position {
614                    buffer_of_values_for_local_events.remove(position);
615                }
616
617                value
618            }
619
620            // A local event has been replaced by another one.
621            //
622            // Replace the latest event value matching `transaction_id` in the buffer if it exists
623            // (note: it should!), and return the last `LatestEventValue` or calculate a new one.
624            RoomSendQueueUpdate::ReplacedLocalEvent {
625                transaction_id,
626                new_content: new_serialized_event_content,
627            } => {
628                if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
629                    match new_serialized_event_content.deserialize() {
630                        Ok(content) => {
631                            if filter_any_message_like_event_content(content) {
632                                buffer_of_values_for_local_events.replace_content(
633                                    position,
634                                    new_serialized_event_content.clone(),
635                                );
636                            } else {
637                                buffer_of_values_for_local_events.remove(position);
638                            }
639                        }
640
641                        Err(error) => {
642                            error!(?error, "Failed to deserialize an event from `RoomSendQueueUpdate::ReplacedLocalEvent`");
643
644                            return LatestEventValue::None;
645                        }
646                    }
647                }
648
649                Self::new_local_or_remote(
650                    buffer_of_values_for_local_events,
651                    room_event_cache,
652                    power_levels,
653                )
654                .await
655            }
656
657            // An error has occurred.
658            //
659            // Mark the latest event value matching `transaction_id`, and all its following values,
660            // as “cannot be sent”.
661            RoomSendQueueUpdate::SendError { transaction_id, .. } => {
662                buffer_of_values_for_local_events.mark_cannot_be_sent_from(transaction_id);
663
664                Self::new_local_or_remote(
665                    buffer_of_values_for_local_events,
666                    room_event_cache,
667                    power_levels,
668                )
669                .await
670            }
671
672            // A local event has been unwedged and sending is being retried.
673            //
674            // Mark the latest event value matching `transaction_id`, and all its following values,
675            // as “is sending”.
676            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
677                buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
678
679                Self::new_local_or_remote(
680                    buffer_of_values_for_local_events,
681                    room_event_cache,
682                    power_levels,
683                )
684                .await
685            }
686
687            // A media upload has made progress.
688            //
689            // Nothing to do here.
690            RoomSendQueueUpdate::MediaUpload { .. } => LatestEventValue::None,
691        }
692    }
693
694    /// Get the last [`LatestEventValue`] from the local latest event values if
695    /// any, or create a new [`LatestEventValue`] from the remote events.
696    ///
697    /// If the buffer of latest event values is not empty, let's return the last
698    /// one. Otherwise, it means we no longer have any local event: let's
699    /// fallback on remote event!
700    async fn new_local_or_remote(
701        buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
702        room_event_cache: &RoomEventCache,
703        power_levels: &Option<(&UserId, RoomPowerLevels)>,
704    ) -> LatestEventValue {
705        if let Some(value) = buffer_of_values_for_local_events.last() {
706            value.clone()
707        } else {
708            Self::new_remote_with_power_levels(room_event_cache, power_levels).await
709        }
710    }
711}
712
/// A buffer of the current [`LatestEventValue`] computed for local events
/// seen by the send queue. It is used by
/// [`LatestEvent::buffer_of_values_for_local_events`].
///
/// The system only receives [`RoomSendQueueUpdate`]s. It's not designed to
/// iterate over local events in the send queue when a local event is changed
/// (cancelled, or updated for example). That's why we keep our own buffer here.
/// Imagine the system receives 4 [`RoomSendQueueUpdate`]s:
///
/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
/// 3. [`RoomSendQueueUpdate::ReplacedLocalEvent`]: replaced the first local
///    event,
/// 4. [`RoomSendQueueUpdate::CancelledLocalEvent`]: cancelled the second local
///    event.
///
/// `NewLocalEvent`s will trigger the computation of new
/// `LatestEventValue`s, but `CancelledLocalEvent` for example doesn't hold
/// any information to compute a new `LatestEventValue`, so we need to
/// remember the previous values, until the local events are sent and
/// removed from this buffer.
///
/// Another reason why we need a buffer is to handle wedged local events.
/// Imagine the system receives 3 [`RoomSendQueueUpdate`]s:
///
/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
/// 3. [`RoomSendQueueUpdate::SendError`]: the first local event has failed to
///    be sent.
///
/// Because a `SendError` is received (targeting the first `NewLocalEvent`), the
/// send queue is stopped. However, the `LatestEventValue` targets the second
/// `NewLocalEvent`. The system must consider that when a local event is wedged,
/// all the following local events must also be marked as “cannot be sent”. And
/// vice versa, when the send queue is able to send an event again, all the
/// following local events must be marked as “is sending”.
///
/// This type isolates a couple of methods designed to manage these specific
/// behaviours.
#[derive(Debug)]
struct LatestEventValuesForLocalEvents {
    /// The buffered values, each one paired with the transaction ID of its
    /// local event. New values are pushed at the end, so the last item is the
    /// most recently pushed one.
    buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
}
756
757impl LatestEventValuesForLocalEvents {
758    /// Create a new [`LatestEventValuesForLocalEvents`].
759    fn new() -> Self {
760        Self { buffer: Vec::with_capacity(2) }
761    }
762
763    /// Check the buffer is empty.
764    fn is_empty(&self) -> bool {
765        self.buffer.is_empty()
766    }
767
768    /// Get the last [`LatestEventValue`].
769    fn last(&self) -> Option<&LatestEventValue> {
770        self.buffer.last().map(|(_, value)| value)
771    }
772
773    /// Find the position of the [`LatestEventValue`] matching `transaction_id`.
774    fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
775        self.buffer
776            .iter()
777            .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
778    }
779
780    /// Push a new [`LatestEventValue`].
781    ///
782    /// # Panics
783    ///
784    /// Panics if `value` is not of kind [`LatestEventValue::LocalIsSending`] or
785    /// [`LatestEventValue::LocalCannotBeSent`].
786    fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
787        assert!(
788            matches!(
789                value,
790                LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalCannotBeSent(_)
791            ),
792            "`value` must be either `LocalIsSending` or `LocalCannotBeSent`"
793        );
794
795        self.buffer.push((transaction_id, value));
796    }
797
798    /// Replace the content of the [`LatestEventValue`] at position `position`.
799    ///
800    /// # Panics
801    ///
802    /// Panics if:
803    /// - `position` is strictly greater than buffer's length,
804    /// - the [`LatestEventValue`] is not of kind
805    ///   [`LatestEventValue::LocalIsSending`] or
806    ///   [`LatestEventValue::LocalCannotBeSent`].
807    fn replace_content(&mut self, position: usize, new_content: SerializableEventContent) {
808        let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");
809
810        match value {
811            LatestEventValue::LocalIsSending(LocalLatestEventValue { content, .. }) => {
812                *content = new_content;
813            }
814
815            LatestEventValue::LocalCannotBeSent(LocalLatestEventValue { content, .. }) => {
816                *content = new_content;
817            }
818
819            _ => panic!("`value` must be either `LocalIsSending` or `LocalCannotBeSent`"),
820        }
821    }
822
823    /// Remove the [`LatestEventValue`] at position `position`.
824    ///
825    /// # Panics
826    ///
827    /// Panics if `position` is strictly greater than buffer's length.
828    fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
829        self.buffer.remove(position)
830    }
831
832    /// Mark the `LatestEventValue` matching `transaction_id`, and all the
833    /// following values, as “cannot be sent”.
834    fn mark_cannot_be_sent_from(&mut self, transaction_id: &TransactionId) {
835        let mut values = self.buffer.iter_mut();
836
837        if let Some(first_value_to_wedge) = values
838            .by_ref()
839            .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
840        {
841            // Iterate over the found value and the following ones.
842            for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) {
843                if let LatestEventValue::LocalIsSending(content) = value_to_wedge {
844                    *value_to_wedge = LatestEventValue::LocalCannotBeSent(content.clone());
845                }
846            }
847        }
848    }
849
850    /// Mark the `LatestEventValue` matching `transaction_id`, and all the
851    /// following values, as “is sending”.
852    fn mark_is_sending_from(&mut self, transaction_id: &TransactionId) {
853        let mut values = self.buffer.iter_mut();
854
855        if let Some(first_value_to_unwedge) = values
856            .by_ref()
857            .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
858        {
859            // Iterate over the found value and the following ones.
860            for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) {
861                if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
862                    *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
863                }
864            }
865        }
866    }
867
868    /// Mark all the following values after the `LatestEventValue` matching
869    /// `transaction_id` as “is sending”.
870    ///
871    /// Note that contrary to [`Self::mark_is_sending_from`], the
872    /// `LatestEventValue` is untouched. However, its position is returned
873    /// (if any).
874    fn mark_is_sending_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
875        let mut values = self.buffer.iter_mut();
876
877        if let Some(position) = values
878            .by_ref()
879            .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
880        {
881            // Iterate over all values after the found one.
882            for (_, value_to_unwedge) in values {
883                if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
884                    *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
885                }
886            }
887
888            Some(position)
889        } else {
890            None
891        }
892    }
893}
894
895fn filter_timeline_event(
896    event: &TimelineEvent,
897    power_levels: &Option<(&UserId, RoomPowerLevels)>,
898) -> bool {
899    // Cast the event into an `AnySyncTimelineEvent`. If deserializing fails, we
900    // ignore the event.
901    let event = match event.raw().deserialize() {
902        Ok(event) => event,
903        Err(error) => {
904            error!(
905                ?error,
906                "Failed to deserialize the event when looking for a suitable latest event"
907            );
908
909            return false;
910        }
911    };
912
913    match event {
914        AnySyncTimelineEvent::MessageLike(message_like_event) => {
915            match message_like_event.original_content() {
916                Some(any_message_like_event_content) => {
917                    filter_any_message_like_event_content(any_message_like_event_content)
918                }
919
920                // The event has been redacted.
921                None => true,
922            }
923        }
924
925        // We don't currently support most state events…
926        AnySyncTimelineEvent::State(state) => {
927            // … but we make an exception for knocked state events _if_ the current user
928            // can either accept or decline them.
929            if let AnySyncStateEvent::RoomMember(member) = state {
930                if matches!(member.membership(), MembershipState::Knock) {
931                    let can_accept_or_decline_knocks = match power_levels {
932                        Some((own_user_id, room_power_levels)) => {
933                            room_power_levels.user_can_invite(own_user_id)
934                                || room_power_levels.user_can_kick(own_user_id)
935                        }
936                        _ => false,
937                    };
938
939                    // The current user can act on the knock changes, so they should be
940                    // displayed
941                    if can_accept_or_decline_knocks {
942                        // We can only decide whether the user can accept or decline knocks if the
943                        // event isn't redacted.
944                        return matches!(member, SyncStateEvent::Original(_));
945                    }
946                }
947            }
948
949            false
950        }
951    }
952}
953
954fn filter_any_message_like_event_content(event: AnyMessageLikeEventContent) -> bool {
955    match event {
956        AnyMessageLikeEventContent::RoomMessage(message) => {
957            // Don't show incoming verification requests.
958            if let MessageType::VerificationRequest(_) = message.msgtype {
959                return false;
960            }
961
962            // Check if this is a replacement for another message. If it is, ignore
963            // it.
964            //
965            // TODO: if we want to support something like
966            // `LatestEventContent::EditedRoomMessage`, it's here :-].
967            let is_replacement = message.relates_to.as_ref().is_some_and(|relates_to| {
968                if let Some(relation_type) = relates_to.rel_type() {
969                    relation_type == RelationType::Replacement
970                } else {
971                    false
972                }
973            });
974
975            !is_replacement
976        }
977
978        AnyMessageLikeEventContent::UnstablePollStart(_)
979        | AnyMessageLikeEventContent::CallInvite(_)
980        | AnyMessageLikeEventContent::CallNotify(_)
981        | AnyMessageLikeEventContent::Sticker(_) => true,
982
983        // Encrypted events are not suitable.
984        AnyMessageLikeEventContent::RoomEncrypted(_) => false,
985
986        // Everything else is considered not suitable.
987        _ => false,
988    }
989}
990
#[cfg(test)]
mod tests_latest_event_content {
    use matrix_sdk_test::event_factory::EventFactory;
    use ruma::{
        event_id,
        events::{
            call::{
                notify::{ApplicationType, NotifyType},
                SessionDescription,
            },
            room::{
                encrypted::{
                    EncryptedEventScheme, MegolmV1AesSha2ContentInit, RoomEncryptedEventContent,
                },
                member::MembershipState,
                message::{
                    KeyVerificationRequestEventContent, MessageType,
                    RedactedRoomMessageEventContent, RoomMessageEventContent,
                },
                power_levels::{RoomPowerLevels, RoomPowerLevelsSource},
                ImageInfo,
            },
            Mentions,
        },
        room_version_rules::AuthorizationRules,
        user_id, OwnedDeviceId, OwnedMxcUri, OwnedVoipId, UInt, VoipVersionId,
    };

    use super::filter_timeline_event;

    // Build an event with an `EventFactory` whose sender is
    // `@mnt_io:matrix.org`, then assert whether `filter_timeline_event`
    // (called without power levels) accepts it as a latest event candidate.
    macro_rules! assert_latest_event_content {
        ( event | $factory:ident | $build:block
          is a candidate ) => {
            assert_latest_event_content!(@check | $factory | $build, true);
        };

        ( event | $factory:ident | $build:block
          is not a candidate ) => {
            assert_latest_event_content!(@check | $factory | $build, false);
        };

        ( @check | $factory:ident | $build:block, $is_candidate:literal ) => {
            let sender = user_id!("@mnt_io:matrix.org");
            let $factory = EventFactory::new().sender(sender);
            let built_event = $build;

            assert_eq!(filter_timeline_event(&built_event, &None), $is_candidate);
        };
    }

    #[test]
    fn test_room_message() {
        assert_latest_event_content!(
            event | factory | { factory.text_msg("hello").into_event() }
            is a candidate
        );
    }

    #[test]
    fn test_redacted() {
        assert_latest_event_content!(
            event | factory | {
                factory
                    .redacted(
                        user_id!("@mnt_io:matrix.org"),
                        RedactedRoomMessageEventContent::new(),
                    )
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_room_message_replacement() {
        assert_latest_event_content!(
            event | factory | {
                factory
                    .text_msg("bonjour")
                    .edit(event_id!("$ev0"), RoomMessageEventContent::text_plain("hello").into())
                    .into_event()
            }
            is not a candidate
        );
    }

    #[test]
    fn test_poll() {
        assert_latest_event_content!(
            event | factory | {
                factory
                    .poll_start("the people need to know", "comté > gruyère", vec!["yes", "oui"])
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_call_invite() {
        assert_latest_event_content!(
            event | factory | {
                factory
                    .call_invite(
                        OwnedVoipId::from("vvooiipp".to_owned()),
                        UInt::from(1234u32),
                        SessionDescription::new("type".to_owned(), "sdp".to_owned()),
                        VoipVersionId::V1,
                    )
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_call_notify() {
        assert_latest_event_content!(
            event | factory | {
                factory
                    .call_notify(
                        "call_id".to_owned(),
                        ApplicationType::Call,
                        NotifyType::Ring,
                        Mentions::new(),
                    )
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_sticker() {
        assert_latest_event_content!(
            event | factory | {
                factory
                    .sticker("wink wink", ImageInfo::new(), OwnedMxcUri::from("mxc://foo/bar"))
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_encrypted_room_message() {
        assert_latest_event_content!(
            event | factory | {
                factory
                    .event(RoomEncryptedEventContent::new(
                        EncryptedEventScheme::MegolmV1AesSha2(
                            MegolmV1AesSha2ContentInit {
                                ciphertext: "cipher".to_owned(),
                                sender_key: "sender_key".to_owned(),
                                device_id: "device_id".into(),
                                session_id: "session_id".to_owned(),
                            }
                            .into(),
                        ),
                        None,
                    ))
                    .into_event()
            }
            is not a candidate
        );
    }

    #[test]
    fn test_reaction() {
        // A reaction stands for “any other message-like event”.
        assert_latest_event_content!(
            event | factory | { factory.reaction(event_id!("$ev0"), "+1").into_event() }
            is not a candidate
        );
    }

    #[test]
    fn test_state_event() {
        assert_latest_event_content!(
            event | factory | { factory.room_topic("new room topic").into_event() }
            is not a candidate
        );
    }

    #[test]
    fn test_knocked_state_event_without_power_levels() {
        assert_latest_event_content!(
            event | factory | {
                factory
                    .member(user_id!("@other_mnt_io:server.name"))
                    .membership(MembershipState::Knock)
                    .into_event()
            }
            is not a candidate
        );
    }

    #[test]
    fn test_knocked_state_event_with_power_levels() {
        let own_user_id = user_id!("@mnt_io:matrix.org");
        let knocking_user_id = user_id!("@other_mnt_io:server.name");
        let knock_event = EventFactory::new()
            .sender(own_user_id)
            .member(knocking_user_id)
            .membership(MembershipState::Knock)
            .into_event();

        // Every user has a power level of 5 by default.
        let mut base_power_levels =
            RoomPowerLevels::new(RoomPowerLevelsSource::None, &AuthorizationRules::V1, []);
        base_power_levels.users_default = 5.into();

        // (level required to invite, level required to kick, is a candidate?,
        // description of the case)
        let cases: [(i32, i32, bool, &str); 4] = [
            (10, 10, false, "cannot accept, cannot decline"),
            (0, 10, true, "can accept, cannot decline"),
            (10, 0, true, "cannot accept, can decline"),
            (0, 0, true, "can accept, can decline"),
        ];

        for (invite, kick, is_candidate, description) in cases {
            let mut room_power_levels = base_power_levels.clone();
            room_power_levels.invite = invite.into();
            room_power_levels.kick = kick.into();

            assert!(
                filter_timeline_event(&knock_event, &Some((own_user_id, room_power_levels)))
                    == is_candidate,
                "{description}",
            );
        }
    }

    #[test]
    fn test_room_message_verification_request() {
        assert_latest_event_content!(
            event | factory | {
                factory
                    .event(RoomMessageEventContent::new(MessageType::VerificationRequest(
                        KeyVerificationRequestEventContent::new(
                            "body".to_owned(),
                            vec![],
                            OwnedDeviceId::from("device_id"),
                            user_id!("@user:server.name").to_owned(),
                        ),
                    )))
                    .into_event()
            }
            is not a candidate
        );
    }
}
1262
#[cfg(test)]
mod tests_latest_event_values_for_local_events {
    use assert_matches::assert_matches;
    use ruma::{
        events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
        serde::Raw,
        MilliSecondsSinceUnixEpoch, OwnedTransactionId,
    };
    use serde_json::json;

    use super::{
        LatestEventValue, LatestEventValuesForLocalEvents, LocalLatestEventValue,
        RemoteLatestEventValue, SerializableEventContent,
    };

    /// Build a remote value wrapping a plain-text `m.room.message` event whose
    /// body is `body`.
    fn remote_room_message(body: &str) -> RemoteLatestEventValue {
        RemoteLatestEventValue::from_plaintext(
            Raw::from_json_string(
                json!({
                    "content": RoomMessageEventContent::text_plain(body),
                    "type": "m.room.message",
                    "event_id": "$ev0",
                    "room_id": "!r0",
                    "origin_server_ts": 42,
                    "sender": "@mnt_io:matrix.org",
                })
                .to_string(),
            )
            .unwrap(),
        )
    }

    /// Build a local value wrapping a plain-text `m.room.message` content whose
    /// body is `body`, timestamped with the current time.
    fn local_room_message(body: &str) -> LocalLatestEventValue {
        LocalLatestEventValue {
            timestamp: MilliSecondsSinceUnixEpoch::now(),
            content: SerializableEventContent::from_raw(
                Raw::new(&AnyMessageLikeEventContent::RoomMessage(
                    RoomMessageEventContent::text_plain(body),
                ))
                .unwrap(),
                "m.room.message".to_owned(),
            ),
        }
    }

    #[test]
    fn test_last() {
        let mut buffer = LatestEventValuesForLocalEvents::new();

        assert!(buffer.is_empty());
        assert!(buffer.last().is_none());

        buffer.push(
            OwnedTransactionId::from("txnid"),
            LatestEventValue::LocalIsSending(local_room_message("tome")),
        );

        assert!(!buffer.is_empty());
        assert_matches!(buffer.last(), Some(LatestEventValue::LocalIsSending(_)));
    }

    #[test]
    fn test_position() {
        let mut buffer = LatestEventValuesForLocalEvents::new();
        let transaction_id = OwnedTransactionId::from("txnid");

        assert!(buffer.position(&transaction_id).is_none());

        buffer.push(
            transaction_id.clone(),
            LatestEventValue::LocalIsSending(local_room_message("raclette")),
        );
        buffer.push(
            OwnedTransactionId::from("othertxnid"),
            LatestEventValue::LocalIsSending(local_room_message("tome")),
        );

        assert_eq!(buffer.position(&transaction_id), Some(0));
    }

    // The expected message is pinned so that the test cannot pass because of
    // an unrelated panic.
    #[test]
    #[should_panic(expected = "`value` must be either `LocalIsSending` or `LocalCannotBeSent`")]
    fn test_push_none() {
        let mut buffer = LatestEventValuesForLocalEvents::new();

        buffer.push(OwnedTransactionId::from("txnid"), LatestEventValue::None);
    }

    #[test]
    #[should_panic(expected = "`value` must be either `LocalIsSending` or `LocalCannotBeSent`")]
    fn test_push_remote() {
        let mut buffer = LatestEventValuesForLocalEvents::new();

        buffer.push(
            OwnedTransactionId::from("txnid"),
            LatestEventValue::Remote(remote_room_message("tome")),
        );
    }

    #[test]
    fn test_push_local() {
        let mut buffer = LatestEventValuesForLocalEvents::new();

        buffer.push(
            OwnedTransactionId::from("txnid0"),
            LatestEventValue::LocalIsSending(local_room_message("tome")),
        );
        buffer.push(
            OwnedTransactionId::from("txnid1"),
            LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
        );

        // no panic.
    }

    #[test]
    fn test_replace_content() {
        let mut buffer = LatestEventValuesForLocalEvents::new();

        buffer.push(
            OwnedTransactionId::from("txnid0"),
            LatestEventValue::LocalIsSending(local_room_message("gruyère")),
        );

        let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");

        buffer.replace_content(0, new_content);

        assert_matches!(
            buffer.last(),
            Some(LatestEventValue::LocalIsSending(local_event)) => {
                assert_matches!(
                    local_event.content.deserialize().unwrap(),
                    AnyMessageLikeEventContent::RoomMessage(content) => {
                        assert_eq!(content.body(), "comté");
                    }
                );
            }
        );
    }

    #[test]
    fn test_replace_content_of_cannot_be_sent() {
        let mut buffer = LatestEventValuesForLocalEvents::new();

        buffer.push(
            OwnedTransactionId::from("txnid0"),
            LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
        );

        let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");

        buffer.replace_content(0, new_content);

        // The content is replaced, the kind of the value is preserved.
        assert_matches!(
            buffer.last(),
            Some(LatestEventValue::LocalCannotBeSent(local_event)) => {
                assert_matches!(
                    local_event.content.deserialize().unwrap(),
                    AnyMessageLikeEventContent::RoomMessage(content) => {
                        assert_eq!(content.body(), "comté");
                    }
                );
            }
        );
    }

    #[test]
    #[should_panic(expected = "`position` must be valid")]
    fn test_replace_content_out_of_bounds() {
        let mut buffer = LatestEventValuesForLocalEvents::new();

        buffer.push(
            OwnedTransactionId::from("txnid0"),
            LatestEventValue::LocalIsSending(local_room_message("gruyère")),
        );

        let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");

        // A position equal to the buffer's length is already out of bounds.
        buffer.replace_content(1, new_content);
    }

    #[test]
    fn test_remove() {
        let mut buffer = LatestEventValuesForLocalEvents::new();
        let transaction_id = OwnedTransactionId::from("txnid");

        buffer.push(
            transaction_id.clone(),
            LatestEventValue::LocalIsSending(local_room_message("gryuère")),
        );

        assert!(buffer.last().is_some());

        let (removed_transaction_id, removed_value) = buffer.remove(0);

        assert_eq!(removed_transaction_id, transaction_id);
        assert_matches!(removed_value, LatestEventValue::LocalIsSending(_));
        assert!(buffer.last().is_none());
        assert!(buffer.is_empty());
    }

    // No expected message here: the panic comes from the standard library.
    #[test]
    #[should_panic]
    fn test_remove_out_of_bounds() {
        let mut buffer = LatestEventValuesForLocalEvents::new();

        buffer.push(
            OwnedTransactionId::from("txnid"),
            LatestEventValue::LocalIsSending(local_room_message("gruyère")),
        );

        // A position equal to the buffer's length is already out of bounds.
        buffer.remove(1);
    }

    #[test]
    fn test_mark_cannot_be_sent_from() {
        let mut buffer = LatestEventValuesForLocalEvents::new();
        let transaction_id_0 = OwnedTransactionId::from("txnid0");
        let transaction_id_1 = OwnedTransactionId::from("txnid1");
        let transaction_id_2 = OwnedTransactionId::from("txnid2");

        buffer.push(
            transaction_id_0,
            LatestEventValue::LocalIsSending(local_room_message("gruyère")),
        );
        buffer.push(
            transaction_id_1.clone(),
            LatestEventValue::LocalIsSending(local_room_message("brigand")),
        );
        buffer.push(
            transaction_id_2,
            LatestEventValue::LocalIsSending(local_room_message("raclette")),
        );

        buffer.mark_cannot_be_sent_from(&transaction_id_1);

        assert_eq!(buffer.buffer.len(), 3);
        assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
        assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
        assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalCannotBeSent(_));
    }

    #[test]
    fn test_mark_is_sending_from() {
        let mut buffer = LatestEventValuesForLocalEvents::new();
        let transaction_id_0 = OwnedTransactionId::from("txnid0");
        let transaction_id_1 = OwnedTransactionId::from("txnid1");
        let transaction_id_2 = OwnedTransactionId::from("txnid2");

        buffer.push(
            transaction_id_0,
            LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
        );
        buffer.push(
            transaction_id_1.clone(),
            LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
        );
        buffer.push(
            transaction_id_2,
            LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
        );

        buffer.mark_is_sending_from(&transaction_id_1);

        assert_eq!(buffer.buffer.len(), 3);
        assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
        assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
        assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
    }

    #[test]
    fn test_mark_is_sending_after() {
        let mut buffer = LatestEventValuesForLocalEvents::new();
        let transaction_id_0 = OwnedTransactionId::from("txnid0");
        let transaction_id_1 = OwnedTransactionId::from("txnid1");
        let transaction_id_2 = OwnedTransactionId::from("txnid2");

        buffer.push(
            transaction_id_0,
            LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
        );
        buffer.push(
            transaction_id_1.clone(),
            LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
        );
        buffer.push(
            transaction_id_2,
            LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
        );

        // An unknown transaction ID changes nothing and yields no position.
        assert_eq!(buffer.mark_is_sending_after(&OwnedTransactionId::from("unknown")), None);
        assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalCannotBeSent(_));

        // The position of the matching value is returned.
        assert_eq!(buffer.mark_is_sending_after(&transaction_id_1), Some(1));

        assert_eq!(buffer.buffer.len(), 3);
        assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
        assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
        assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
    }
}
1502
1503#[cfg(all(not(target_family = "wasm"), test))]
1504mod tests_latest_event_value_builder {
1505    use std::sync::Arc;
1506
1507    use assert_matches::assert_matches;
1508    use matrix_sdk_base::{
1509        deserialized_responses::TimelineEventKind,
1510        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1511        store::SerializableEventContent,
1512        RoomState,
1513    };
1514    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1515    use ruma::{
1516        event_id,
1517        events::{
1518            reaction::ReactionEventContent, relation::Annotation,
1519            room::message::RoomMessageEventContent, AnyMessageLikeEventContent,
1520            AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent,
1521        },
1522        room_id, user_id, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedTransactionId,
1523    };
1524
1525    use super::{
1526        LatestEventValue, LatestEventValueBuilder, LatestEventValuesForLocalEvents,
1527        RemoteLatestEventValue, RoomEventCache, RoomSendQueueUpdate,
1528    };
1529    use crate::{
1530        client::WeakClient,
1531        room::WeakRoom,
1532        send_queue::{AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle},
1533        test_utils::mocks::MatrixMockServer,
1534        Client, Error,
1535    };
1536
1537    macro_rules! assert_remote_value_matches_room_message_with_body {
1538        ( $latest_event_value:expr => with body = $body:expr ) => {
1539            assert_matches!(
1540                $latest_event_value,
1541                LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => {
1542                    assert_matches!(
1543                        event.deserialize().unwrap(),
1544                        AnySyncTimelineEvent::MessageLike(
1545                            AnySyncMessageLikeEvent::RoomMessage(
1546                                SyncMessageLikeEvent::Original(message_content)
1547                            )
1548                        ) => {
1549                            assert_eq!(message_content.content.body(), $body);
1550                        }
1551                    );
1552                }
1553            );
1554        };
1555    }
1556
1557    macro_rules! assert_local_value_matches_room_message_with_body {
1558        ( $latest_event_value:expr, $pattern:path => with body = $body:expr ) => {
1559            assert_matches!(
1560                $latest_event_value,
1561                $pattern (local_event) => {
1562                    assert_matches!(
1563                        local_event.content.deserialize().unwrap(),
1564                        AnyMessageLikeEventContent::RoomMessage(message_content) => {
1565                            assert_eq!(message_content.body(), $body);
1566                        }
1567                    );
1568                }
1569            );
1570        };
1571    }
1572
1573    #[async_test]
1574    async fn test_remote_is_scanning_event_backwards_from_event_cache() {
1575        let room_id = room_id!("!r0");
1576        let user_id = user_id!("@mnt_io:matrix.org");
1577        let event_factory = EventFactory::new().sender(user_id).room(room_id);
1578        let event_id_0 = event_id!("$ev0");
1579        let event_id_1 = event_id!("$ev1");
1580        let event_id_2 = event_id!("$ev2");
1581
1582        let server = MatrixMockServer::new().await;
1583        let client = server.client_builder().build().await;
1584
1585        // Prelude.
1586        {
1587            // Create the room.
1588            client.base_client().get_or_create_room(room_id, RoomState::Joined);
1589
1590            // Initialise the event cache store.
1591            client
1592                .event_cache_store()
1593                .lock()
1594                .await
1595                .unwrap()
1596                .handle_linked_chunk_updates(
1597                    LinkedChunkId::Room(room_id),
1598                    vec![
1599                        Update::NewItemsChunk {
1600                            previous: None,
1601                            new: ChunkIdentifier::new(0),
1602                            next: None,
1603                        },
1604                        Update::PushItems {
1605                            at: Position::new(ChunkIdentifier::new(0), 0),
1606                            items: vec![
1607                                // a latest event candidate
1608                                event_factory.text_msg("hello").event_id(event_id_0).into(),
1609                                // a latest event candidate
1610                                event_factory.text_msg("world").event_id(event_id_1).into(),
1611                                // not a latest event candidate
1612                                event_factory
1613                                    .room_topic("new room topic")
1614                                    .event_id(event_id_2)
1615                                    .into(),
1616                            ],
1617                        },
1618                    ],
1619                )
1620                .await
1621                .unwrap();
1622        }
1623
1624        let event_cache = client.event_cache();
1625        event_cache.subscribe().unwrap();
1626
1627        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1628        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned());
1629
1630        assert_remote_value_matches_room_message_with_body!(
1631            // We get `event_id_1` because `event_id_2` isn't a candidate,
1632            // and `event_id_0` hasn't been read yet (because events are read
1633            // backwards).
1634            LatestEventValueBuilder::new_remote(&room_event_cache, &weak_room).await => with body = "world"
1635        );
1636    }
1637
1638    async fn local_prelude() -> (Client, OwnedRoomId, RoomSendQueue, RoomEventCache) {
1639        let room_id = room_id!("!r0").to_owned();
1640
1641        let server = MatrixMockServer::new().await;
1642        let client = server.client_builder().build().await;
1643        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1644        let room = client.get_room(&room_id).unwrap();
1645
1646        let event_cache = client.event_cache();
1647        event_cache.subscribe().unwrap();
1648
1649        let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
1650
1651        let send_queue = client.send_queue();
1652        let room_send_queue = send_queue.for_room(room);
1653
1654        (client, room_id, room_send_queue, room_event_cache)
1655    }
1656
1657    fn new_local_echo_content(
1658        room_send_queue: &RoomSendQueue,
1659        transaction_id: &OwnedTransactionId,
1660        body: &str,
1661    ) -> LocalEchoContent {
1662        LocalEchoContent::Event {
1663            serialized_event: SerializableEventContent::new(
1664                &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
1665            )
1666            .unwrap(),
1667            send_handle: SendHandle::new(
1668                room_send_queue.clone(),
1669                transaction_id.clone(),
1670                MilliSecondsSinceUnixEpoch::now(),
1671            ),
1672            send_error: None,
1673        }
1674    }
1675
1676    #[async_test]
1677    async fn test_local_new_local_event() {
1678        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1679
1680        let mut buffer = LatestEventValuesForLocalEvents::new();
1681
1682        // Receiving one `NewLocalEvent`.
1683        {
1684            let transaction_id = OwnedTransactionId::from("txnid0");
1685            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1686
1687            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1688
1689            // The `LatestEventValue` matches the new local event.
1690            assert_local_value_matches_room_message_with_body!(
1691                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1692                LatestEventValue::LocalIsSending => with body = "A"
1693            );
1694        }
1695
1696        // Receiving another `NewLocalEvent`, ensuring it's pushed back in the buffer.
1697        {
1698            let transaction_id = OwnedTransactionId::from("txnid1");
1699            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
1700
1701            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1702
1703            // The `LatestEventValue` matches the new local event.
1704            assert_local_value_matches_room_message_with_body!(
1705                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1706                LatestEventValue::LocalIsSending => with body = "B"
1707            );
1708        }
1709
1710        assert_eq!(buffer.buffer.len(), 2);
1711    }
1712
1713    #[async_test]
1714    async fn test_local_cancelled_local_event() {
1715        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1716
1717        let mut buffer = LatestEventValuesForLocalEvents::new();
1718        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1719        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1720        let transaction_id_2 = OwnedTransactionId::from("txnid2");
1721
1722        // Receiving three `NewLocalEvent`s.
1723        {
1724            for (transaction_id, body) in
1725                [(&transaction_id_0, "A"), (&transaction_id_1, "B"), (&transaction_id_2, "C")]
1726            {
1727                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
1728
1729                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1730                    transaction_id: transaction_id.clone(),
1731                    content,
1732                });
1733
1734                // The `LatestEventValue` matches the new local event.
1735                assert_local_value_matches_room_message_with_body!(
1736                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1737                    LatestEventValue::LocalIsSending => with body = body
1738                );
1739            }
1740
1741            assert_eq!(buffer.buffer.len(), 3);
1742        }
1743
1744        // Receiving a `CancelledLocalEvent` targeting the second event. The
1745        // `LatestEventValue` must not change.
1746        {
1747            let update = RoomSendQueueUpdate::CancelledLocalEvent {
1748                transaction_id: transaction_id_1.clone(),
1749            };
1750
1751            // The `LatestEventValue` hasn't changed, it still matches the latest local
1752            // event.
1753            assert_local_value_matches_room_message_with_body!(
1754                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1755                LatestEventValue::LocalIsSending => with body = "C"
1756            );
1757
1758            assert_eq!(buffer.buffer.len(), 2);
1759        }
1760
1761        // Receiving a `CancelledLocalEvent` targeting the second (so the last) event.
1762        // The `LatestEventValue` must point to the first local event.
1763        {
1764            let update = RoomSendQueueUpdate::CancelledLocalEvent {
1765                transaction_id: transaction_id_2.clone(),
1766            };
1767
1768            // The `LatestEventValue` has changed, it matches the previous (so the first)
1769            // local event.
1770            assert_local_value_matches_room_message_with_body!(
1771                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1772                LatestEventValue::LocalIsSending => with body = "A"
1773            );
1774
1775            assert_eq!(buffer.buffer.len(), 1);
1776        }
1777
1778        // Receiving a `CancelledLocalEvent` targeting the first (so the last) event.
1779        // The `LatestEventValue` cannot be computed from the send queue and will
1780        // fallback to the event cache. The event cache is empty in this case, so we get
1781        // nothing.
1782        {
1783            let update =
1784                RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: transaction_id_0 };
1785
1786            // The `LatestEventValue` has changed, it's empty!
1787            assert_matches!(
1788                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None)
1789                    .await,
1790                LatestEventValue::None
1791            );
1792
1793            assert!(buffer.buffer.is_empty());
1794        }
1795    }
1796
1797    #[async_test]
1798    async fn test_local_sent_event() {
1799        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1800
1801        let mut buffer = LatestEventValuesForLocalEvents::new();
1802        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1803        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1804
1805        // Receiving two `NewLocalEvent`s.
1806        {
1807            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
1808                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
1809
1810                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1811                    transaction_id: transaction_id.clone(),
1812                    content,
1813                });
1814
1815                // The `LatestEventValue` matches the new local event.
1816                assert_local_value_matches_room_message_with_body!(
1817                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1818                    LatestEventValue::LocalIsSending => with body = body
1819                );
1820            }
1821
1822            assert_eq!(buffer.buffer.len(), 2);
1823        }
1824
1825        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
1826        // must not change.
1827        {
1828            let update = RoomSendQueueUpdate::SentEvent {
1829                transaction_id: transaction_id_0.clone(),
1830                event_id: event_id!("$ev0").to_owned(),
1831            };
1832
1833            // The `LatestEventValue` hasn't changed, it still matches the latest local
1834            // event.
1835            assert_local_value_matches_room_message_with_body!(
1836                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1837                LatestEventValue::LocalIsSending => with body = "B"
1838            );
1839
1840            assert_eq!(buffer.buffer.len(), 1);
1841        }
1842
1843        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
1844        // hasn't changed, this is still this event.
1845        {
1846            let update = RoomSendQueueUpdate::SentEvent {
1847                transaction_id: transaction_id_1,
1848                event_id: event_id!("$ev1").to_owned(),
1849            };
1850
1851            // The `LatestEventValue` hasn't changed.
1852            assert_local_value_matches_room_message_with_body!(
1853                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1854                LatestEventValue::LocalIsSending => with body = "B"
1855            );
1856
1857            assert!(buffer.buffer.is_empty());
1858        }
1859    }
1860
1861    #[async_test]
1862    async fn test_local_replaced_local_event() {
1863        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1864
1865        let mut buffer = LatestEventValuesForLocalEvents::new();
1866        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1867        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1868
1869        // Receiving two `NewLocalEvent`s.
1870        {
1871            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
1872                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
1873
1874                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1875                    transaction_id: transaction_id.clone(),
1876                    content,
1877                });
1878
1879                // The `LatestEventValue` matches the new local event.
1880                assert_local_value_matches_room_message_with_body!(
1881                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1882                    LatestEventValue::LocalIsSending => with body = body
1883                );
1884            }
1885
1886            assert_eq!(buffer.buffer.len(), 2);
1887        }
1888
1889        // Receiving a `ReplacedLocalEvent` targeting the first event. The
1890        // `LatestEventValue` must not change.
1891        {
1892            let transaction_id = &transaction_id_0;
1893            let LocalEchoContent::Event { serialized_event: new_content, .. } =
1894                new_local_echo_content(&room_send_queue, transaction_id, "A.")
1895            else {
1896                panic!("oopsy");
1897            };
1898
1899            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
1900                transaction_id: transaction_id.clone(),
1901                new_content,
1902            };
1903
1904            // The `LatestEventValue` hasn't changed, it still matches the latest local
1905            // event.
1906            assert_local_value_matches_room_message_with_body!(
1907                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1908                LatestEventValue::LocalIsSending => with body = "B"
1909            );
1910
1911            assert_eq!(buffer.buffer.len(), 2);
1912        }
1913
1914        // Receiving a `ReplacedLocalEvent` targeting the second (so the last) event.
1915        // The `LatestEventValue` is changing.
1916        {
1917            let transaction_id = &transaction_id_1;
1918            let LocalEchoContent::Event { serialized_event: new_content, .. } =
1919                new_local_echo_content(&room_send_queue, transaction_id, "B.")
1920            else {
1921                panic!("oopsy");
1922            };
1923
1924            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
1925                transaction_id: transaction_id.clone(),
1926                new_content,
1927            };
1928
1929            // The `LatestEventValue` has changed, it still matches the latest local
1930            // event but with its new content.
1931            assert_local_value_matches_room_message_with_body!(
1932                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1933                LatestEventValue::LocalIsSending => with body = "B."
1934            );
1935
1936            assert_eq!(buffer.buffer.len(), 2);
1937        }
1938    }
1939
1940    #[async_test]
1941    async fn test_local_replaced_local_event_by_a_non_suitable_event() {
1942        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1943
1944        let mut buffer = LatestEventValuesForLocalEvents::new();
1945        let transaction_id = OwnedTransactionId::from("txnid0");
1946
1947        // Receiving one `NewLocalEvent`.
1948        {
1949            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1950
1951            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1952                transaction_id: transaction_id.clone(),
1953                content,
1954            });
1955
1956            // The `LatestEventValue` matches the new local event.
1957            assert_local_value_matches_room_message_with_body!(
1958                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1959                LatestEventValue::LocalIsSending => with body = "A"
1960            );
1961
1962            assert_eq!(buffer.buffer.len(), 1);
1963        }
1964
1965        // Receiving a `ReplacedLocalEvent` targeting the first event. Sadly, the new
1966        // event cannot be mapped to a `LatestEventValue`! The first event is removed
1967        // from the buffer, and the `LatestEventValue` becomes `None` because there is
1968        // no other alternative.
1969        {
1970            let new_content = SerializableEventContent::new(&AnyMessageLikeEventContent::Reaction(
1971                ReactionEventContent::new(Annotation::new(
1972                    event_id!("$ev0").to_owned(),
1973                    "+1".to_owned(),
1974                )),
1975            ))
1976            .unwrap();
1977
1978            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
1979                transaction_id: transaction_id.clone(),
1980                new_content,
1981            };
1982
1983            // The `LatestEventValue` has changed!
1984            assert_matches!(
1985                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None)
1986                    .await,
1987                LatestEventValue::None
1988            );
1989
1990            assert_eq!(buffer.buffer.len(), 0);
1991        }
1992    }
1993
1994    #[async_test]
1995    async fn test_local_send_error() {
1996        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1997
1998        let mut buffer = LatestEventValuesForLocalEvents::new();
1999        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2000        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2001
2002        // Receiving two `NewLocalEvent`s.
2003        {
2004            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2005                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2006
2007                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2008                    transaction_id: transaction_id.clone(),
2009                    content,
2010                });
2011
2012                // The `LatestEventValue` matches the new local event.
2013                assert_local_value_matches_room_message_with_body!(
2014                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2015                    LatestEventValue::LocalIsSending => with body = body
2016                );
2017            }
2018
2019            assert_eq!(buffer.buffer.len(), 2);
2020        }
2021
2022        // Receiving a `SendError` targeting the first event. The
2023        // `LatestEventValue` must change to indicate it's “cannot be sent”.
2024        {
2025            let update = RoomSendQueueUpdate::SendError {
2026                transaction_id: transaction_id_0.clone(),
2027                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2028                is_recoverable: true,
2029            };
2030
2031            // The `LatestEventValue` has changed, it still matches the latest local
2032            // event but it's marked as “cannot be sent”.
2033            assert_local_value_matches_room_message_with_body!(
2034                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2035                LatestEventValue::LocalCannotBeSent => with body = "B"
2036            );
2037
2038            assert_eq!(buffer.buffer.len(), 2);
2039            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2040            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2041        }
2042
2043        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
2044        // must change: since an event has been sent, the following events are now
2045        // “is sending”.
2046        {
2047            let update = RoomSendQueueUpdate::SentEvent {
2048                transaction_id: transaction_id_0.clone(),
2049                event_id: event_id!("$ev0").to_owned(),
2050            };
2051
2052            // The `LatestEventValue` has changed, it still matches the latest local
2053            // event but it's “is sending”.
2054            assert_local_value_matches_room_message_with_body!(
2055                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2056                LatestEventValue::LocalIsSending => with body = "B"
2057            );
2058
2059            assert_eq!(buffer.buffer.len(), 1);
2060            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2061        }
2062    }
2063
2064    #[async_test]
2065    async fn test_local_retry_event() {
2066        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2067
2068        let mut buffer = LatestEventValuesForLocalEvents::new();
2069        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2070        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2071
2072        // Receiving two `NewLocalEvent`s.
2073        {
2074            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2075                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2076
2077                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2078                    transaction_id: transaction_id.clone(),
2079                    content,
2080                });
2081
2082                // The `LatestEventValue` matches the new local event.
2083                assert_local_value_matches_room_message_with_body!(
2084                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2085                    LatestEventValue::LocalIsSending => with body = body
2086                );
2087            }
2088
2089            assert_eq!(buffer.buffer.len(), 2);
2090        }
2091
2092        // Receiving a `SendError` targeting the first event. The
2093        // `LatestEventValue` must change to indicate it's “cannot be sent”.
2094        {
2095            let update = RoomSendQueueUpdate::SendError {
2096                transaction_id: transaction_id_0.clone(),
2097                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2098                is_recoverable: true,
2099            };
2100
2101            // The `LatestEventValue` has changed, it still matches the latest local
2102            // event but it's marked as “cannot be sent”.
2103            assert_local_value_matches_room_message_with_body!(
2104                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2105                LatestEventValue::LocalCannotBeSent => with body = "B"
2106            );
2107
2108            assert_eq!(buffer.buffer.len(), 2);
2109            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2110            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2111        }
2112
2113        // Receiving a `RetryEvent` targeting the first event. The `LatestEventValue`
2114        // must change: this local event and its following must be “is sending”.
2115        {
2116            let update =
2117                RoomSendQueueUpdate::RetryEvent { transaction_id: transaction_id_0.clone() };
2118
2119            // The `LatestEventValue` has changed, it still matches the latest local
2120            // event but it's “is sending”.
2121            assert_local_value_matches_room_message_with_body!(
2122                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2123                LatestEventValue::LocalIsSending => with body = "B"
2124            );
2125
2126            assert_eq!(buffer.buffer.len(), 2);
2127            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2128            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
2129        }
2130    }
2131
2132    #[async_test]
2133    async fn test_local_media_upload() {
2134        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2135
2136        let mut buffer = LatestEventValuesForLocalEvents::new();
2137        let transaction_id = OwnedTransactionId::from("txnid");
2138
2139        // Receiving a `NewLocalEvent`.
2140        {
2141            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2142
2143            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2144                transaction_id: transaction_id.clone(),
2145                content,
2146            });
2147
2148            // The `LatestEventValue` matches the new local event.
2149            assert_local_value_matches_room_message_with_body!(
2150                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2151                LatestEventValue::LocalIsSending => with body = "A"
2152            );
2153
2154            assert_eq!(buffer.buffer.len(), 1);
2155        }
2156
2157        // Receiving a `MediaUpload` targeting the first event. The
2158        // `LatestEventValue` must not change as `MediaUpload` are ignored.
2159        {
2160            let update = RoomSendQueueUpdate::MediaUpload {
2161                related_to: transaction_id,
2162                file: None,
2163                index: 0,
2164                progress: AbstractProgress { current: 0, total: 0 },
2165            };
2166
2167            // The `LatestEventValue` has changed somehow, it tells no new
2168            // `LatestEventValue` is computed.
2169            assert_matches!(
2170                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None)
2171                    .await,
2172                LatestEventValue::None
2173            );
2174
2175            assert_eq!(buffer.buffer.len(), 1);
2176        }
2177    }
2178
2179    #[async_test]
2180    async fn test_local_fallbacks_to_remote_when_empty() {
2181        let room_id = room_id!("!r0");
2182        let user_id = user_id!("@mnt_io:matrix.org");
2183        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2184        let event_id_0 = event_id!("$ev0");
2185        let event_id_1 = event_id!("$ev1");
2186
2187        let server = MatrixMockServer::new().await;
2188        let client = server.client_builder().build().await;
2189
2190        // Prelude.
2191        {
2192            // Create the room.
2193            client.base_client().get_or_create_room(room_id, RoomState::Joined);
2194
2195            // Initialise the event cache store.
2196            client
2197                .event_cache_store()
2198                .lock()
2199                .await
2200                .unwrap()
2201                .handle_linked_chunk_updates(
2202                    LinkedChunkId::Room(room_id),
2203                    vec![
2204                        Update::NewItemsChunk {
2205                            previous: None,
2206                            new: ChunkIdentifier::new(0),
2207                            next: None,
2208                        },
2209                        Update::PushItems {
2210                            at: Position::new(ChunkIdentifier::new(0), 0),
2211                            items: vec![event_factory
2212                                .text_msg("hello")
2213                                .event_id(event_id_0)
2214                                .into()],
2215                        },
2216                    ],
2217                )
2218                .await
2219                .unwrap();
2220        }
2221
2222        let event_cache = client.event_cache();
2223        event_cache.subscribe().unwrap();
2224
2225        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2226
2227        let mut buffer = LatestEventValuesForLocalEvents::new();
2228
2229        // We get a `Remote` because there is no `Local*` values!
2230        assert_remote_value_matches_room_message_with_body!(
2231            LatestEventValueBuilder::new_local(
2232                // An update that won't be create a new `LatestEventValue`.
2233                &RoomSendQueueUpdate::SentEvent {
2234                    transaction_id: OwnedTransactionId::from("txnid"),
2235                    event_id: event_id_1.to_owned(),
2236                },
2237                &mut buffer,
2238                &room_event_cache,
2239                &None,
2240            )
2241            .await
2242             => with body = "hello"
2243        );
2244    }
2245}