matrix_sdk/latest_events/
latest_event.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    iter::once,
17    ops::{Deref, Not},
18};
19
20use eyeball::{AsyncLock, SharedObservable, Subscriber};
21pub use matrix_sdk_base::latest_event::{
22    LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
23};
24use matrix_sdk_base::{
25    RoomInfoNotableUpdateReasons, StateChanges, deserialized_responses::TimelineEvent,
26    store::SerializableEventContent,
27};
28use ruma::{
29    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
30    events::{
31        AnyMessageLikeEventContent, AnySyncStateEvent, AnySyncTimelineEvent, SyncStateEvent,
32        relation::Replacement,
33        room::{
34            member::MembershipState,
35            message::{MessageType, Relation},
36            power_levels::RoomPowerLevels,
37        },
38    },
39};
40use tracing::{error, instrument, warn};
41
42use crate::{event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate};
43
44/// The latest event of a room or a thread.
45///
46/// Use [`LatestEvent::subscribe`] to get a stream of updates.
47#[derive(Debug)]
48pub(super) struct LatestEvent {
49    /// The room owning this latest event.
50    weak_room: WeakRoom,
51
52    /// The thread (if any) owning this latest event.
53    _thread_id: Option<OwnedEventId>,
54
55    /// A buffer of the current [`LatestEventValue`] computed for local events
56    /// seen by the send queue. See [`LatestEventValuesForLocalEvents`] to learn
57    /// more.
58    buffer_of_values_for_local_events: LatestEventValuesForLocalEvents,
59
60    /// The latest event value.
61    current_value: SharedObservable<LatestEventValue, AsyncLock>,
62}
63
64impl LatestEvent {
65    pub(super) async fn new(
66        weak_room: &WeakRoom,
67        thread_id: Option<&EventId>,
68        room_event_cache: &RoomEventCache,
69    ) -> Self {
70        Self {
71            weak_room: weak_room.clone(),
72            _thread_id: thread_id.map(ToOwned::to_owned),
73            buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(),
74            current_value: SharedObservable::new_async(
75                LatestEventValueBuilder::new_remote(room_event_cache, weak_room).await,
76            ),
77        }
78    }
79
80    /// Return a [`Subscriber`] to new values.
81    pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
82        self.current_value.subscribe().await
83    }
84
85    /// Update the inner latest event value, based on the event cache
86    /// (specifically with the [`RoomEventCache`]), if and only if there is no
87    /// local latest event value waiting.
88    ///
89    /// It is only necessary to compute a new [`LatestEventValue`] from the
90    /// event cache if there is no [`LatestEventValue`] to be compute from the
91    /// send queue. Indeed, anything coming from the send queue has the priority
92    /// over the anything coming from the event cache. We believe it provides a
93    /// better user experience.
94    pub async fn update_with_event_cache(
95        &mut self,
96        room_event_cache: &RoomEventCache,
97        own_user_id: Option<&UserId>,
98        power_levels: Option<&RoomPowerLevels>,
99    ) {
100        if self.buffer_of_values_for_local_events.is_empty().not() {
101            // At least one `LatestEventValue` exists for local events (i.e. coming from the
102            // send queue). In this case, we don't overwrite the current value with a newly
103            // computed one from the event cache.
104            return;
105        }
106
107        let new_value = LatestEventValueBuilder::new_remote_with_power_levels(
108            room_event_cache,
109            own_user_id,
110            power_levels,
111        )
112        .await;
113
114        self.update(new_value).await;
115    }
116
117    /// Update the inner latest event value, based on the send queue
118    /// (specifically with the [`RoomSendQueueUpdate`]).
119    pub async fn update_with_send_queue(
120        &mut self,
121        send_queue_update: &RoomSendQueueUpdate,
122        room_event_cache: &RoomEventCache,
123        own_user_id: Option<&UserId>,
124        power_levels: Option<&RoomPowerLevels>,
125    ) {
126        let new_value = LatestEventValueBuilder::new_local(
127            send_queue_update,
128            &mut self.buffer_of_values_for_local_events,
129            room_event_cache,
130            own_user_id,
131            power_levels,
132        )
133        .await;
134
135        self.update(new_value).await;
136    }
137
138    /// Update [`Self::current_value`] if and only if the `new_value` is not
139    /// [`LatestEventValue::None`].
140    async fn update(&mut self, new_value: LatestEventValue) {
141        if let LatestEventValue::None = new_value {
142            // Do not update to a `None` value.
143        } else {
144            self.current_value.set(new_value.clone()).await;
145            self.store(new_value).await;
146        }
147    }
148
149    /// Update the `RoomInfo` associated to this room to set the new
150    /// [`LatestEventValue`], and persist it in the
151    /// [`StateStore`][matrix_sdk_base::StateStore] (the one from
152    /// [`Client::state_store`][crate::Client::state_store]).
153    #[instrument(skip_all)]
154    async fn store(&mut self, new_value: LatestEventValue) {
155        let Some(room) = self.weak_room.get() else {
156            warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
157            return;
158        };
159
160        // Compute a new `RoomInfo`.
161        let mut room_info = room.clone_info();
162        room_info.set_new_latest_event(new_value);
163
164        let mut state_changes = StateChanges::default();
165        state_changes.add_room(room_info.clone());
166
167        let client = room.client();
168
169        // Take the state store lock.
170        let _state_store_lock = client.base_client().state_store_lock().lock().await;
171
172        // Update the `RoomInfo` in the state store.
173        if let Err(error) = client.state_store().save_changes(&state_changes).await {
174            error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
175        }
176
177        // Update the `RoomInfo` of the room.
178        room.set_room_info(room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT);
179    }
180}
181
#[cfg(all(not(target_family = "wasm"), test))]
mod tests_latest_event {
    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        RoomInfoNotableUpdateReasons, RoomState,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        store::StoreConfig,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        MilliSecondsSinceUnixEpoch, OwnedTransactionId, RoomId, event_id,
        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
        room_id, user_id,
    };

    use super::{LatestEvent, LatestEventValue, LocalLatestEventValue, SerializableEventContent};
    use crate::{
        Client,
        client::WeakClient,
        room::WeakRoom,
        send_queue::{LocalEcho, LocalEchoContent, RoomSendQueue, RoomSendQueueUpdate, SendHandle},
        test_utils::mocks::MatrixMockServer,
    };

    /// Serialize a plain text `m.room.message` content with the given `body`.
    fn serialized_text_message(body: &str) -> SerializableEventContent {
        SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
            RoomMessageEventContent::text_plain(body),
        ))
        .unwrap()
    }

    /// Build a [`LocalLatestEventValue`] holding a plain text message,
    /// timestamped with the current time.
    fn local_room_message(body: &str) -> LocalLatestEventValue {
        LocalLatestEventValue {
            timestamp: MilliSecondsSinceUnixEpoch::now(),
            content: serialized_text_message(body),
        }
    }

    /// Build a [`LocalEchoContent::Event`] holding a plain text message, with
    /// no send error.
    fn new_local_echo_content(
        room_send_queue: &RoomSendQueue,
        transaction_id: &OwnedTransactionId,
        body: &str,
    ) -> LocalEchoContent {
        LocalEchoContent::Event {
            serialized_event: serialized_text_message(body),
            send_handle: SendHandle::new(
                room_send_queue.clone(),
                transaction_id.clone(),
                MilliSecondsSinceUnixEpoch::now(),
            ),
            send_error: None,
        }
    }

    /// Fill the event cache store of `room_id` with a single chunk containing
    /// one text message (`"A"`, with the event ID `$ev0`).
    async fn fill_event_cache_with_one_event(
        client: &Client,
        room_id: &RoomId,
        event_factory: &EventFactory,
    ) {
        client
            .event_cache_store()
            .lock()
            .await
            .expect("Could not acquire the event cache lock")
            .as_clean()
            .expect("Could not acquire a clean event cache lock")
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_factory.text_msg("A").event_id(event_id!("$ev0")).into()],
                    },
                ],
            )
            .await
            .unwrap();
    }

    #[async_test]
    async fn test_update_ignores_none_value() {
        let room_id = room_id!("!r0");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let weak_client = WeakClient::from_client(&client);

        // Create the room.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let weak_room = WeakRoom::new(weak_client, room_id.to_owned());

        // Get a `RoomEventCache`.
        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;

        // First off, check the default value is `None`!
        assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);

        // Second, set a new value.
        latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;

        assert_matches!(
            latest_event.current_value.get().await,
            LatestEventValue::LocalIsSending(_)
        );

        // Finally, set a new `None` value. It must be ignored.
        latest_event.update(LatestEventValue::None).await;

        assert_matches!(
            latest_event.current_value.get().await,
            LatestEventValue::LocalIsSending(_)
        );
    }

    #[async_test]
    async fn test_local_has_priority_over_remote() {
        let room_id = room_id!("!r0").to_owned();
        let user_id = user_id!("@mnt_io:matrix.org");
        let event_factory = EventFactory::new().sender(user_id).room(&room_id);

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
        let room = client.get_room(&room_id).unwrap();
        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Fill the event cache with one event.
        fill_event_cache_with_one_event(&client, &room_id, &event_factory).await;

        let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();

        let send_queue = client.send_queue();
        let room_send_queue = send_queue.for_room(room);

        let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;

        // First, let's create a `LatestEventValue` from the event cache. It must work.
        {
            latest_event.update_with_event_cache(&room_event_cache, None, None).await;

            assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
        }

        // Second, let's create a `LatestEventValue` from the send queue. It
        // must overwrite the current `LatestEventValue`.
        let transaction_id = OwnedTransactionId::from("txnid0");

        {
            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");

            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
                transaction_id: transaction_id.clone(),
                content,
            });

            latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await;

            assert_matches!(
                latest_event.current_value.get().await,
                LatestEventValue::LocalIsSending(_)
            );
        }

        // Third, let's create a `LatestEventValue` from the event cache.
        // Nothing must happen, it cannot overwrite the current
        // `LatestEventValue` because the local event isn't sent yet.
        {
            latest_event.update_with_event_cache(&room_event_cache, None, None).await;

            assert_matches!(
                latest_event.current_value.get().await,
                LatestEventValue::LocalIsSending(_)
            );
        }

        // Fourth, let's create a `LatestEventValue` from the send queue. It must stay
        // the same, but now the local event is sent.
        {
            let update = RoomSendQueueUpdate::SentEvent {
                transaction_id,
                event_id: event_id!("$ev1").to_owned(),
            };

            latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await;

            assert_matches!(
                latest_event.current_value.get().await,
                LatestEventValue::LocalIsSending(_)
            );
        }

        // Finally, let's create a `LatestEventValue` from the event cache. _Now_ it's
        // possible, because there are no more local events.
        {
            latest_event.update_with_event_cache(&room_event_cache, None, None).await;

            assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
        }
    }

    #[async_test]
    async fn test_store_latest_event_value() {
        let room_id = room_id!("!r0").to_owned();
        let user_id = user_id!("@mnt_io:matrix.org");
        let event_factory = EventFactory::new().sender(user_id).room(&room_id);

        let server = MatrixMockServer::new().await;

        let store_config = StoreConfig::new("cross-process-lock-holder".to_owned());

        // Load the client for the first time, and run some operations.
        {
            let client = server
                .client_builder()
                .on_builder(|builder| builder.store_config(store_config.clone()))
                .build()
                .await;
            let mut room_info_notable_update_receiver = client.room_info_notable_update_receiver();
            let room = client.base_client().get_or_create_room(&room_id, RoomState::Joined);
            let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());

            let event_cache = client.event_cache();
            event_cache.subscribe().unwrap();

            // Fill the event cache with one event.
            fill_event_cache_with_one_event(&client, &room_id, &event_factory).await;

            let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();

            // Check there is no `LatestEventValue` for the moment.
            {
                let latest_event = room.new_latest_event();

                assert_matches!(latest_event, LatestEventValue::None);
            }

            // Generate a new `LatestEventValue`.
            {
                let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
                latest_event.update_with_event_cache(&room_event_cache, None, None).await;

                assert_matches!(
                    latest_event.current_value.get().await,
                    LatestEventValue::Remote(_)
                );
            }

            // We see the `RoomInfoNotableUpdateReasons`.
            {
                let update = room_info_notable_update_receiver.recv().await.unwrap();

                assert_eq!(update.room_id, room_id);
                assert!(update.reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
            }

            // Check it's in the `RoomInfo` and in `Room`.
            {
                let latest_event = room.new_latest_event();

                assert_matches!(latest_event, LatestEventValue::Remote(_));
            }
        }

        // Reload the client with the same store config, and see the `LatestEventValue`
        // is inside the `RoomInfo`.
        {
            let client = server
                .client_builder()
                .on_builder(|builder| builder.store_config(store_config))
                .build()
                .await;
            let room = client.get_room(&room_id).unwrap();
            let latest_event = room.new_latest_event();

            assert_matches!(latest_event, LatestEventValue::Remote(_));
        }
    }
}
487
488/// A builder of [`LatestEventValue`]s.
489struct LatestEventValueBuilder;
490
491impl LatestEventValueBuilder {
492    /// Create a new [`LatestEventValue::Remote`].
493    async fn new_remote(
494        room_event_cache: &RoomEventCache,
495        weak_room: &WeakRoom,
496    ) -> LatestEventValue {
497        // Get the power levels of the user for the current room if the `WeakRoom` is
498        // still valid.
499        let room = weak_room.get();
500        let (own_user_id, power_levels) = match &room {
501            Some(room) => {
502                let power_levels = room.power_levels().await.ok();
503
504                (Some(room.own_user_id()), power_levels)
505            }
506
507            None => (None, None),
508        };
509
510        Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels.as_ref())
511            .await
512    }
513
514    /// Create a new [`LatestEventValue::Remote`] based on existing power
515    /// levels.
516    async fn new_remote_with_power_levels(
517        room_event_cache: &RoomEventCache,
518        own_user_id: Option<&UserId>,
519        power_levels: Option<&RoomPowerLevels>,
520    ) -> LatestEventValue {
521        if let Ok(Some(event)) = room_event_cache
522            .rfind_map_event_in_memory_by(|event, previous_event_id| {
523                filter_timeline_event(event, previous_event_id, own_user_id, power_levels)
524                    .then(|| event.clone())
525            })
526            .await
527        {
528            LatestEventValue::Remote(event)
529        } else {
530            LatestEventValue::default()
531        }
532    }
533
534    /// Create a new [`LatestEventValue::LocalIsSending`] or
535    /// [`LatestEventValue::LocalCannotBeSent`].
536    async fn new_local(
537        send_queue_update: &RoomSendQueueUpdate,
538        buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
539        room_event_cache: &RoomEventCache,
540        own_user_id: Option<&UserId>,
541        power_levels: Option<&RoomPowerLevels>,
542    ) -> LatestEventValue {
543        use crate::send_queue::{LocalEcho, LocalEchoContent};
544
545        match send_queue_update {
546            // A new local event is being sent.
547            //
548            // Let's create the `LatestEventValue` and push it in the buffer of values.
549            RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
550                transaction_id,
551                content: local_echo_content,
552            }) => match local_echo_content {
553                LocalEchoContent::Event { serialized_event: serialized_event_content, .. } => {
554                    match serialized_event_content.deserialize() {
555                        Ok(content) => {
556                            if filter_any_message_like_event_content(content, None) {
557                                let local_value = LocalLatestEventValue {
558                                    timestamp: MilliSecondsSinceUnixEpoch::now(),
559                                    content: serialized_event_content.clone(),
560                                };
561
562                                // If a local previous `LatestEventValue` exists and has been marked
563                                // as “cannot be sent”, it means the new `LatestEventValue` must
564                                // also be marked as “cannot be sent”.
565                                let value = if let Some(LatestEventValue::LocalCannotBeSent(_)) =
566                                    buffer_of_values_for_local_events.last()
567                                {
568                                    LatestEventValue::LocalCannotBeSent(local_value)
569                                } else {
570                                    LatestEventValue::LocalIsSending(local_value)
571                                };
572
573                                buffer_of_values_for_local_events
574                                    .push(transaction_id.to_owned(), value.clone());
575
576                                value
577                            } else {
578                                LatestEventValue::None
579                            }
580                        }
581
582                        Err(error) => {
583                            error!(
584                                ?error,
585                                "Failed to deserialize an event from `RoomSendQueueUpdate::NewLocalEvent`"
586                            );
587
588                            LatestEventValue::None
589                        }
590                    }
591                }
592
593                LocalEchoContent::React { .. } => LatestEventValue::None,
594            },
595
596            // A local event has been cancelled before being sent.
597            //
598            // Remove the calculated `LatestEventValue` from the buffer of values, and return the
599            // last `LatestEventValue` or calculate a new one.
600            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
601                if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
602                    buffer_of_values_for_local_events.remove(position);
603                }
604
605                Self::new_local_or_remote(
606                    buffer_of_values_for_local_events,
607                    room_event_cache,
608                    own_user_id,
609                    power_levels,
610                )
611                .await
612            }
613
614            // A local event has successfully been sent!
615            //
616            // Mark all “cannot be sent” values as “is sending” after the one matching
617            // `transaction_id`. Indeed, if an event has been sent, it means the send queue is
618            // working, so if any value has been marked as “cannot be sent”, it must be marked as
619            // “is sending”. Then, remove the calculated `LatestEventValue` from the buffer of
620            // values. Finally, return the last `LatestEventValue` or calculate a new
621            // one.
622            RoomSendQueueUpdate::SentEvent { transaction_id, .. } => {
623                let position =
624                    buffer_of_values_for_local_events.mark_is_sending_after(transaction_id);
625
626                // First, compute the new value. Then we remove the sent local event from the
627                // buffer.
628                //
629                // Why in this order? Because in between sending and remote echoing, the event
630                // will only be stored as a local echo and not as a full event in the event
631                // cache. Just after sending, it won't show up as a `LatestEventValue` as it
632                // will immediately be replaced by an event from the event cache. By computing
633                // the new value before removing it from the buffer, we ensure the
634                // `LatestEventValue` represents the just sent local event.
635                //
636                // Note: the next sync may not include the just sent local event. This is a race
637                // condition we are aware of, see https://github.com/matrix-org/matrix-rust-sdk/issues/3941.
638                let value = Self::new_local_or_remote(
639                    buffer_of_values_for_local_events,
640                    room_event_cache,
641                    own_user_id,
642                    power_levels,
643                )
644                .await;
645
646                if let Some(position) = position {
647                    buffer_of_values_for_local_events.remove(position);
648                }
649
650                value
651            }
652
653            // A local event has been replaced by another one.
654            //
655            // Replace the latest event value matching `transaction_id` in the buffer if it exists
656            // (note: it should!), and return the last `LatestEventValue` or calculate a new one.
657            RoomSendQueueUpdate::ReplacedLocalEvent {
658                transaction_id,
659                new_content: new_serialized_event_content,
660            } => {
661                if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
662                    match new_serialized_event_content.deserialize() {
663                        Ok(content) => {
664                            if filter_any_message_like_event_content(content, None) {
665                                buffer_of_values_for_local_events.replace_content(
666                                    position,
667                                    new_serialized_event_content.clone(),
668                                );
669                            } else {
670                                buffer_of_values_for_local_events.remove(position);
671                            }
672                        }
673
674                        Err(error) => {
675                            error!(
676                                ?error,
677                                "Failed to deserialize an event from `RoomSendQueueUpdate::ReplacedLocalEvent`"
678                            );
679
680                            return LatestEventValue::None;
681                        }
682                    }
683                }
684
685                Self::new_local_or_remote(
686                    buffer_of_values_for_local_events,
687                    room_event_cache,
688                    own_user_id,
689                    power_levels,
690                )
691                .await
692            }
693
694            // An error has occurred.
695            //
696            // Mark the latest event value matching `transaction_id`, and all its following values,
697            // as “cannot be sent”.
698            RoomSendQueueUpdate::SendError { transaction_id, .. } => {
699                buffer_of_values_for_local_events.mark_cannot_be_sent_from(transaction_id);
700
701                Self::new_local_or_remote(
702                    buffer_of_values_for_local_events,
703                    room_event_cache,
704                    own_user_id,
705                    power_levels,
706                )
707                .await
708            }
709
710            // A local event has been unwedged and sending is being retried.
711            //
712            // Mark the latest event value matching `transaction_id`, and all its following values,
713            // as “is sending”.
714            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
715                buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
716
717                Self::new_local_or_remote(
718                    buffer_of_values_for_local_events,
719                    room_event_cache,
720                    own_user_id,
721                    power_levels,
722                )
723                .await
724            }
725
726            // A media upload has made progress.
727            //
728            // Nothing to do here.
729            RoomSendQueueUpdate::MediaUpload { .. } => LatestEventValue::None,
730        }
731    }
732
733    /// Get the last [`LatestEventValue`] from the local latest event values if
734    /// any, or create a new [`LatestEventValue`] from the remote events.
735    ///
736    /// If the buffer of latest event values is not empty, let's return the last
737    /// one. Otherwise, it means we no longer have any local event: let's
738    /// fallback on remote event!
739    async fn new_local_or_remote(
740        buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
741        room_event_cache: &RoomEventCache,
742        own_user_id: Option<&UserId>,
743        power_levels: Option<&RoomPowerLevels>,
744    ) -> LatestEventValue {
745        if let Some(value) = buffer_of_values_for_local_events.last() {
746            value.clone()
747        } else {
748            Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels).await
749        }
750    }
751}
752
753/// A buffer of the current [`LatestEventValue`] computed for local events
754/// seen by the send queue. It is used by
755/// [`LatestEvent::buffer_of_values_for_local_events`].
756///
757/// The system does only receive [`RoomSendQueueUpdate`]s. It's not designed to
758/// iterate over local events in the send queue when a local event is changed
759/// (cancelled, or updated for example). That's why we keep our own buffer here.
760/// Imagine the system receives 4 [`RoomSendQueueUpdate`]:
761///
762/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
763/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
764/// 3. [`RoomSendQueueUpdate::ReplacedLocalEvent`]: replaced the first local
765///    event,
766/// 4. [`RoomSendQueueUpdate::CancelledLocalEvent`]: cancelled the second local
767///    event.
768///
769/// `NewLocalEvent`s will trigger the computation of new
770/// `LatestEventValue`s, but `CancelledLocalEvent` for example doesn't hold
771/// any information to compute a new `LatestEventValue`, so we need to
772/// remember the previous values, until the local events are sent and
773/// removed from this buffer.
774///
775/// Another reason why we need a buffer is to handle wedged local event. Imagine
776/// the system receives 3 [`RoomSendQueueUpdate`]:
777///
778/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
779/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
780/// 3. [`RoomSendQueueUpdate::SendError`]: the first local event has failed to
781///    be sent.
782///
783/// Because a `SendError` is received (targeting the first `NewLocalEvent`), the
784/// send queue is stopped. However, the `LatestEventValue` targets the second
785/// `NewLocalEvent`. The system must consider that when a local event is wedged,
786/// all the following local events must also be marked as “cannot be sent”. And
787/// vice versa, when the send queue is able to send an event again, all the
788/// following local events must be marked as “is sending”.
789///
790/// This type isolates a couple of methods designed to manage these specific
791/// behaviours.
792#[derive(Debug)]
793struct LatestEventValuesForLocalEvents {
794    buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
795}
796
797impl LatestEventValuesForLocalEvents {
798    /// Create a new [`LatestEventValuesForLocalEvents`].
799    fn new() -> Self {
800        Self { buffer: Vec::with_capacity(2) }
801    }
802
803    /// Check the buffer is empty.
804    fn is_empty(&self) -> bool {
805        self.buffer.is_empty()
806    }
807
808    /// Get the last [`LatestEventValue`].
809    fn last(&self) -> Option<&LatestEventValue> {
810        self.buffer.last().map(|(_, value)| value)
811    }
812
813    /// Find the position of the [`LatestEventValue`] matching `transaction_id`.
814    fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
815        self.buffer
816            .iter()
817            .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
818    }
819
820    /// Push a new [`LatestEventValue`].
821    ///
822    /// # Panics
823    ///
824    /// Panics if `value` is not of kind [`LatestEventValue::LocalIsSending`] or
825    /// [`LatestEventValue::LocalCannotBeSent`].
826    fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
827        assert!(
828            matches!(
829                value,
830                LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalCannotBeSent(_)
831            ),
832            "`value` must be either `LocalIsSending` or `LocalCannotBeSent`"
833        );
834
835        self.buffer.push((transaction_id, value));
836    }
837
838    /// Replace the content of the [`LatestEventValue`] at position `position`.
839    ///
840    /// # Panics
841    ///
842    /// Panics if:
843    /// - `position` is strictly greater than buffer's length,
844    /// - the [`LatestEventValue`] is not of kind
845    ///   [`LatestEventValue::LocalIsSending`] or
846    ///   [`LatestEventValue::LocalCannotBeSent`].
847    fn replace_content(&mut self, position: usize, new_content: SerializableEventContent) {
848        let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");
849
850        match value {
851            LatestEventValue::LocalIsSending(LocalLatestEventValue { content, .. }) => {
852                *content = new_content;
853            }
854
855            LatestEventValue::LocalCannotBeSent(LocalLatestEventValue { content, .. }) => {
856                *content = new_content;
857            }
858
859            _ => panic!("`value` must be either `LocalIsSending` or `LocalCannotBeSent`"),
860        }
861    }
862
863    /// Remove the [`LatestEventValue`] at position `position`.
864    ///
865    /// # Panics
866    ///
867    /// Panics if `position` is strictly greater than buffer's length.
868    fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
869        self.buffer.remove(position)
870    }
871
872    /// Mark the `LatestEventValue` matching `transaction_id`, and all the
873    /// following values, as “cannot be sent”.
874    fn mark_cannot_be_sent_from(&mut self, transaction_id: &TransactionId) {
875        let mut values = self.buffer.iter_mut();
876
877        if let Some(first_value_to_wedge) = values
878            .by_ref()
879            .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
880        {
881            // Iterate over the found value and the following ones.
882            for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) {
883                if let LatestEventValue::LocalIsSending(content) = value_to_wedge {
884                    *value_to_wedge = LatestEventValue::LocalCannotBeSent(content.clone());
885                }
886            }
887        }
888    }
889
890    /// Mark the `LatestEventValue` matching `transaction_id`, and all the
891    /// following values, as “is sending”.
892    fn mark_is_sending_from(&mut self, transaction_id: &TransactionId) {
893        let mut values = self.buffer.iter_mut();
894
895        if let Some(first_value_to_unwedge) = values
896            .by_ref()
897            .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
898        {
899            // Iterate over the found value and the following ones.
900            for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) {
901                if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
902                    *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
903                }
904            }
905        }
906    }
907
908    /// Mark all the following values after the `LatestEventValue` matching
909    /// `transaction_id` as “is sending”.
910    ///
911    /// Note that contrary to [`Self::mark_is_sending_from`], the
912    /// `LatestEventValue` is untouched. However, its position is returned
913    /// (if any).
914    fn mark_is_sending_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
915        let mut values = self.buffer.iter_mut();
916
917        if let Some(position) = values
918            .by_ref()
919            .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
920        {
921            // Iterate over all values after the found one.
922            for (_, value_to_unwedge) in values {
923                if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
924                    *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
925                }
926            }
927
928            Some(position)
929        } else {
930            None
931        }
932    }
933}
934
935fn filter_timeline_event(
936    event: &TimelineEvent,
937    previous_event_id: Option<OwnedEventId>,
938    own_user_id: Option<&UserId>,
939    power_levels: Option<&RoomPowerLevels>,
940) -> bool {
941    // Cast the event into an `AnySyncTimelineEvent`. If deserializing fails, we
942    // ignore the event.
943    let event = match event.raw().deserialize() {
944        Ok(event) => event,
945        Err(error) => {
946            error!(
947                ?error,
948                "Failed to deserialize the event when looking for a suitable latest event"
949            );
950
951            return false;
952        }
953    };
954
955    match event {
956        AnySyncTimelineEvent::MessageLike(message_like_event) => {
957            match message_like_event.original_content() {
958                Some(any_message_like_event_content) => filter_any_message_like_event_content(
959                    any_message_like_event_content,
960                    previous_event_id,
961                ),
962
963                // The event has been redacted.
964                None => true,
965            }
966        }
967
968        AnySyncTimelineEvent::State(state) => {
969            filter_any_sync_state_event(state, own_user_id, power_levels)
970        }
971    }
972}
973
974fn filter_any_message_like_event_content(
975    event: AnyMessageLikeEventContent,
976    previous_event_id: Option<OwnedEventId>,
977) -> bool {
978    match event {
979        AnyMessageLikeEventContent::RoomMessage(message) => {
980            // Don't show incoming verification requests.
981            if let MessageType::VerificationRequest(_) = message.msgtype {
982                return false;
983            }
984
985            // Not all relations are accepted. Let's filter them.
986            match &message.relates_to {
987                Some(Relation::Replacement(Replacement { event_id, .. })) => {
988                    // If the edit relates to the immediate previous event, this is an acceptable
989                    // latest event, otherwise let's ignore it.
990                    Some(event_id) == previous_event_id.as_ref()
991                }
992
993                _ => true,
994            }
995        }
996
997        AnyMessageLikeEventContent::UnstablePollStart(_)
998        | AnyMessageLikeEventContent::CallInvite(_)
999        | AnyMessageLikeEventContent::RtcNotification(_)
1000        | AnyMessageLikeEventContent::Sticker(_) => true,
1001
1002        // Encrypted events are not suitable.
1003        AnyMessageLikeEventContent::RoomEncrypted(_) => false,
1004
1005        // Everything else is considered not suitable.
1006        _ => false,
1007    }
1008}
1009
1010fn filter_any_sync_state_event(
1011    event: AnySyncStateEvent,
1012    own_user_id: Option<&UserId>,
1013    power_levels: Option<&RoomPowerLevels>,
1014) -> bool {
1015    match event {
1016        AnySyncStateEvent::RoomMember(member) => {
1017            match member.membership() {
1018                MembershipState::Knock => {
1019                    let can_accept_or_decline_knocks = match (own_user_id, power_levels) {
1020                        (Some(own_user_id), Some(room_power_levels)) => {
1021                            room_power_levels.user_can_invite(own_user_id)
1022                                || room_power_levels
1023                                    .user_can_kick_user(own_user_id, member.state_key())
1024                        }
1025                        _ => false,
1026                    };
1027
1028                    // The current user can act on the knock changes, so they should be
1029                    // displayed
1030                    if can_accept_or_decline_knocks {
1031                        // We can only decide whether the user can accept or decline knocks if the
1032                        // event isn't redacted.
1033                        return matches!(member, SyncStateEvent::Original(_));
1034                    }
1035
1036                    false
1037                }
1038
1039                MembershipState::Invite => {
1040                    // The current _is_ invited (not someone else).
1041                    match member {
1042                        // We can only decide whether the user is invited if the event isn't
1043                        // redacted.
1044                        SyncStateEvent::Original(state) => {
1045                            Some(state.state_key.deref()) == own_user_id
1046                        }
1047
1048                        _ => false,
1049                    }
1050                }
1051
1052                _ => false,
1053            }
1054        }
1055
1056        _ => false,
1057    }
1058}
1059
#[cfg(test)]
mod tests_latest_event_content {
    use std::ops::Not;

    use matrix_sdk_test::event_factory::EventFactory;
    use ruma::{
        OwnedEventId, event_id,
        events::{
            room::{member::MembershipState, message::RoomMessageEventContent},
            rtc::notification::NotificationType,
        },
        owned_user_id, user_id,
    };

    use super::filter_timeline_event;

    // Build an event with an `EventFactory` whose sender is `@mnt_io:matrix.org`,
    // then assert whether `filter_timeline_event` considers it as a candidate.
    // The filter is called without a previous event and without power levels; the
    // current user is `@mnt_io:matrix.org`.
    macro_rules! assert_latest_event_content {
        ( event | $event_factory:ident | $event_builder:block
          is a candidate ) => {
            assert_latest_event_content!(@_ | $event_factory | $event_builder, true);
        };

        ( event | $event_factory:ident | $event_builder:block
          is not a candidate ) => {
            assert_latest_event_content!(@_ | $event_factory | $event_builder, false);
        };

        ( @_ | $event_factory:ident | $event_builder:block, $expect:literal ) => {
            let own_user_id = user_id!("@mnt_io:matrix.org");
            let factory = EventFactory::new().sender(own_user_id);
            let event = {
                // Expose the factory to the builder under the name chosen by the caller.
                let $event_factory = factory;
                $event_builder
            };

            assert_eq!(filter_timeline_event(&event, None, Some(own_user_id), None), $expect);
        };
    }

    #[test]
    fn test_room_message() {
        assert_latest_event_content!(
            event | event_factory | { event_factory.text_msg("hello").into_event() }
            is a candidate
        );
    }

    #[test]
    fn test_redacted() {
        use ruma::events::room::message::RedactedRoomMessageEventContent;

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .redacted(
                        user_id!("@mnt_io:matrix.org"),
                        RedactedRoomMessageEventContent::new(),
                    )
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_room_message_replacement() {
        let own_user_id = user_id!("@mnt_io:matrix.org");
        let event_factory = EventFactory::new().sender(own_user_id);
        let edit = event_factory
            .text_msg("bonjour")
            .edit(event_id!("$ev0"), RoomMessageEventContent::text_plain("hello").into())
            .into_event();

        let is_candidate = |previous_event_id: Option<OwnedEventId>| {
            filter_timeline_event(&edit, previous_event_id, Some(own_user_id), None)
        };

        // Without a previous event.
        //
        // This is an edge case where either the event cache has been emptied and only
        // the edit is received via the sync for example, or the previous event is
        // part of another chunk that is not loaded in memory yet. In this case,
        // let's not consider the event as a `LatestEventValue` candidate.
        assert!(is_candidate(None).not());

        // With a previous event, but not the one being replaced.
        assert!(is_candidate(Some(event_id!("$ev1").to_owned())).not());

        // With a previous event, and that's the one being replaced!
        assert!(is_candidate(Some(event_id!("$ev0").to_owned())));
    }

    #[test]
    fn test_poll() {
        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .poll_start("the people need to know", "comté > gruyère", vec!["yes", "oui"])
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_call_invite() {
        use ruma::{OwnedVoipId, UInt, VoipVersionId, events::call::SessionDescription};

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .call_invite(
                        OwnedVoipId::from("vvooiipp".to_owned()),
                        UInt::from(1234u32),
                        SessionDescription::new("type".to_owned(), "sdp".to_owned()),
                        VoipVersionId::V1,
                    )
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_rtc_notification() {
        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .rtc_notification(NotificationType::Ring)
                    .mentions(vec![owned_user_id!("@alice:server.name")])
                    .relates_to_membership_state_event(
                        OwnedEventId::try_from("$abc:server.name").unwrap(),
                    )
                    .lifetime(60)
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_sticker() {
        use ruma::{OwnedMxcUri, events::room::ImageInfo};

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .sticker("wink wink", ImageInfo::new(), OwnedMxcUri::from("mxc://foo/bar"))
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_encrypted_room_message() {
        use ruma::events::room::encrypted::{
            EncryptedEventScheme, MegolmV1AesSha2ContentInit, RoomEncryptedEventContent,
        };

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .event(RoomEncryptedEventContent::new(
                        EncryptedEventScheme::MegolmV1AesSha2(
                            MegolmV1AesSha2ContentInit {
                                ciphertext: "cipher".to_owned(),
                                sender_key: "sender_key".to_owned(),
                                device_id: "device_id".into(),
                                session_id: "session_id".to_owned(),
                            }
                            .into(),
                        ),
                        None,
                    ))
                    .into_event()
            }
            is not a candidate
        );
    }

    #[test]
    fn test_reaction() {
        // Take a random message-like event.
        assert_latest_event_content!(
            event | event_factory | { event_factory.reaction(event_id!("$ev0"), "+1").into_event() }
            is not a candidate
        );
    }

    #[test]
    fn test_state_event() {
        assert_latest_event_content!(
            event | event_factory | { event_factory.room_topic("new room topic").into_event() }
            is not a candidate
        );
    }

    #[test]
    fn test_knocked_state_event_without_power_levels() {
        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .member(user_id!("@other_mnt_io:server.name"))
                    .membership(MembershipState::Knock)
                    .into_event()
            }
            is not a candidate
        );
    }

    #[test]
    fn test_knocked_state_event_with_power_levels() {
        use ruma::{
            events::room::power_levels::{RoomPowerLevels, RoomPowerLevelsSource},
            room_version_rules::AuthorizationRules,
        };

        let own_user_id = user_id!("@mnt_io:matrix.org");
        let knocking_user_id = user_id!("@other_mnt_io:server.name");
        let event_factory = EventFactory::new().sender(own_user_id);
        let knock =
            event_factory.member(knocking_user_id).membership(MembershipState::Knock).into_event();

        let is_candidate = |room_power_levels: &RoomPowerLevels| {
            filter_timeline_event(&knock, None, Some(own_user_id), Some(room_power_levels))
        };

        let mut room_power_levels =
            RoomPowerLevels::new(RoomPowerLevelsSource::None, &AuthorizationRules::V1, []);
        room_power_levels.users.insert(own_user_id.to_owned(), 5.into());
        room_power_levels.users.insert(knocking_user_id.to_owned(), 4.into());

        // Cannot accept. Cannot decline.
        room_power_levels.invite = 10.into();
        room_power_levels.kick = 10.into();
        assert!(is_candidate(&room_power_levels).not(), "cannot accept, cannot decline");

        // Can accept. Cannot decline.
        room_power_levels.invite = 0.into();
        room_power_levels.kick = 10.into();
        assert!(is_candidate(&room_power_levels), "can accept, cannot decline");

        // Cannot accept. Can decline.
        room_power_levels.invite = 10.into();
        room_power_levels.kick = 0.into();
        assert!(is_candidate(&room_power_levels), "cannot accept, can decline");

        // Can accept. Can decline.
        room_power_levels.invite = 0.into();
        room_power_levels.kick = 0.into();
        assert!(is_candidate(&room_power_levels), "can accept, can decline");

        // Cannot accept. Can decline. But with an other user ID with at least the same
        // levels, i.e. the current user cannot kick another user with the same
        // or higher levels.
        room_power_levels.users.insert(own_user_id.to_owned(), 5.into());
        room_power_levels.users.insert(knocking_user_id.to_owned(), 5.into());
        room_power_levels.invite = 10.into();
        room_power_levels.kick = 0.into();
        assert!(
            is_candidate(&room_power_levels).not(),
            "cannot accept, can decline, at least same user levels",
        );
    }

    #[test]
    fn test_invite_state_event() {
        // The current user is receiving an invite.
        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .member(user_id!("@mnt_io:matrix.org"))
                    .membership(MembershipState::Invite)
                    .into_event()
            }
            is a candidate
        );
    }

    #[test]
    fn test_invite_state_event_for_someone_else() {
        // The current user sees an invite but for someone else.
        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .member(user_id!("@other_mnt_io:server.name"))
                    .membership(MembershipState::Invite)
                    .into_event()
            }
            is not a candidate
        );
    }

    #[test]
    fn test_room_message_verification_request() {
        use ruma::{
            OwnedDeviceId,
            events::room::message::{KeyVerificationRequestEventContent, MessageType},
        };

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .event(RoomMessageEventContent::new(MessageType::VerificationRequest(
                        KeyVerificationRequestEventContent::new(
                            "body".to_owned(),
                            vec![],
                            OwnedDeviceId::from("device_id"),
                            user_id!("@user:server.name").to_owned(),
                        ),
                    )))
                    .into_event()
            }
            is not a candidate
        );
    }
}
1425
1426#[cfg(test)]
1427mod tests_latest_event_values_for_local_events {
1428    use assert_matches::assert_matches;
1429    use ruma::{
1430        MilliSecondsSinceUnixEpoch, OwnedTransactionId,
1431        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
1432        serde::Raw,
1433    };
1434    use serde_json::json;
1435
1436    use super::{
1437        LatestEventValue, LatestEventValuesForLocalEvents, LocalLatestEventValue,
1438        RemoteLatestEventValue, SerializableEventContent,
1439    };
1440
1441    fn remote_room_message(body: &str) -> RemoteLatestEventValue {
1442        RemoteLatestEventValue::from_plaintext(
1443            Raw::from_json_string(
1444                json!({
1445                    "content": RoomMessageEventContent::text_plain(body),
1446                    "type": "m.room.message",
1447                    "event_id": "$ev0",
1448                    "origin_server_ts": 42,
1449                    "sender": "@mnt_io:matrix.org",
1450                })
1451                .to_string(),
1452            )
1453            .unwrap(),
1454        )
1455    }
1456
1457    fn local_room_message(body: &str) -> LocalLatestEventValue {
1458        LocalLatestEventValue {
1459            timestamp: MilliSecondsSinceUnixEpoch::now(),
1460            content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
1461                RoomMessageEventContent::text_plain(body),
1462            ))
1463            .unwrap(),
1464        }
1465    }
1466
1467    #[test]
1468    fn test_last() {
1469        let mut buffer = LatestEventValuesForLocalEvents::new();
1470
1471        assert!(buffer.last().is_none());
1472
1473        buffer.push(
1474            OwnedTransactionId::from("txnid"),
1475            LatestEventValue::LocalIsSending(local_room_message("tome")),
1476        );
1477
1478        assert_matches!(buffer.last(), Some(LatestEventValue::LocalIsSending(_)));
1479    }
1480
1481    #[test]
1482    fn test_position() {
1483        let mut buffer = LatestEventValuesForLocalEvents::new();
1484        let transaction_id = OwnedTransactionId::from("txnid");
1485
1486        assert!(buffer.position(&transaction_id).is_none());
1487
1488        buffer.push(
1489            transaction_id.clone(),
1490            LatestEventValue::LocalIsSending(local_room_message("raclette")),
1491        );
1492        buffer.push(
1493            OwnedTransactionId::from("othertxnid"),
1494            LatestEventValue::LocalIsSending(local_room_message("tome")),
1495        );
1496
1497        assert_eq!(buffer.position(&transaction_id), Some(0));
1498    }
1499
1500    #[test]
1501    #[should_panic]
1502    fn test_push_none() {
1503        let mut buffer = LatestEventValuesForLocalEvents::new();
1504
1505        buffer.push(OwnedTransactionId::from("txnid"), LatestEventValue::None);
1506    }
1507
1508    #[test]
1509    #[should_panic]
1510    fn test_push_remote() {
1511        let mut buffer = LatestEventValuesForLocalEvents::new();
1512
1513        buffer.push(
1514            OwnedTransactionId::from("txnid"),
1515            LatestEventValue::Remote(remote_room_message("tome")),
1516        );
1517    }
1518
1519    #[test]
1520    fn test_push_local() {
1521        let mut buffer = LatestEventValuesForLocalEvents::new();
1522
1523        buffer.push(
1524            OwnedTransactionId::from("txnid0"),
1525            LatestEventValue::LocalIsSending(local_room_message("tome")),
1526        );
1527        buffer.push(
1528            OwnedTransactionId::from("txnid1"),
1529            LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1530        );
1531
1532        // no panic.
1533    }
1534
1535    #[test]
1536    fn test_replace_content() {
1537        let mut buffer = LatestEventValuesForLocalEvents::new();
1538
1539        buffer.push(
1540            OwnedTransactionId::from("txnid0"),
1541            LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1542        );
1543
1544        let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1545
1546        buffer.replace_content(0, new_content);
1547
1548        assert_matches!(
1549            buffer.last(),
1550            Some(LatestEventValue::LocalIsSending(local_event)) => {
1551                assert_matches!(
1552                    local_event.content.deserialize().unwrap(),
1553                    AnyMessageLikeEventContent::RoomMessage(content) => {
1554                        assert_eq!(content.body(), "comté");
1555                    }
1556                );
1557            }
1558        );
1559    }
1560
1561    #[test]
1562    fn test_remove() {
1563        let mut buffer = LatestEventValuesForLocalEvents::new();
1564
1565        buffer.push(
1566            OwnedTransactionId::from("txnid"),
1567            LatestEventValue::LocalIsSending(local_room_message("gryuère")),
1568        );
1569
1570        assert!(buffer.last().is_some());
1571
1572        buffer.remove(0);
1573
1574        assert!(buffer.last().is_none());
1575    }
1576
/// `mark_cannot_be_sent_from` flips the targeted value and every value pushed
/// after it to `LocalCannotBeSent`, and leaves the values before it untouched.
#[test]
fn test_mark_cannot_be_sent_from() {
    // The transaction the marking starts from.
    let pivot = OwnedTransactionId::from("txnid1");

    let mut local_values = LatestEventValuesForLocalEvents::new();

    // Three values, all “is sending”, the pivot being in the middle.
    let entries = [
        (OwnedTransactionId::from("txnid0"), "gruyère"),
        (pivot.clone(), "brigand"),
        (OwnedTransactionId::from("txnid2"), "raclette"),
    ];

    for (transaction_id, body) in entries {
        local_values
            .push(transaction_id, LatestEventValue::LocalIsSending(local_room_message(body)));
    }

    local_values.mark_cannot_be_sent_from(&pivot);

    // No value is added or removed, only the states change.
    assert_eq!(local_values.buffer.len(), 3);
    assert_matches!(local_values.buffer[0].1, LatestEventValue::LocalIsSending(_));
    assert_matches!(local_values.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
    assert_matches!(local_values.buffer[2].1, LatestEventValue::LocalCannotBeSent(_));
}
1604
/// `mark_is_sending_from` flips the targeted value and every value pushed
/// after it to `LocalIsSending`, and leaves the values before it untouched.
#[test]
fn test_mark_is_sending_from() {
    // Build a “cannot be sent” value from a message body.
    let cannot_be_sent =
        |body: &str| LatestEventValue::LocalCannotBeSent(local_room_message(body));

    // The transaction the marking starts from.
    let pivot = OwnedTransactionId::from("txnid1");

    let mut local_values = LatestEventValuesForLocalEvents::new();
    local_values.push(OwnedTransactionId::from("txnid0"), cannot_be_sent("gruyère"));
    local_values.push(pivot.clone(), cannot_be_sent("brigand"));
    local_values.push(OwnedTransactionId::from("txnid2"), cannot_be_sent("raclette"));

    local_values.mark_is_sending_from(&pivot);

    // No value is added or removed, only the states change.
    assert_eq!(local_values.buffer.len(), 3);
    assert_matches!(local_values.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
    assert_matches!(local_values.buffer[1].1, LatestEventValue::LocalIsSending(_));
    assert_matches!(local_values.buffer[2].1, LatestEventValue::LocalIsSending(_));
}
1632
/// `mark_is_sending_after` flips only the values pushed after the targeted
/// one to `LocalIsSending`; the targeted value itself keeps its state.
#[test]
fn test_mark_is_sending_after() {
    // The transaction after which the marking starts.
    let pivot = OwnedTransactionId::from("txnid1");

    let mut local_values = LatestEventValuesForLocalEvents::new();

    // Three values, all “cannot be sent”, the pivot being in the middle.
    for (transaction_id, body) in [
        (OwnedTransactionId::from("txnid0"), "gruyère"),
        (pivot.clone(), "brigand"),
        (OwnedTransactionId::from("txnid2"), "raclette"),
    ] {
        let value = LatestEventValue::LocalCannotBeSent(local_room_message(body));
        local_values.push(transaction_id, value);
    }

    local_values.mark_is_sending_after(&pivot);

    // No value is added or removed, and only the last state changes.
    assert_eq!(local_values.buffer.len(), 3);
    assert_matches!(local_values.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
    assert_matches!(local_values.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
    assert_matches!(local_values.buffer[2].1, LatestEventValue::LocalIsSending(_));
}
1660}
1661
1662#[cfg(all(not(target_family = "wasm"), test))]
1663mod tests_latest_event_value_builder {
1664    use std::sync::Arc;
1665
1666    use assert_matches::assert_matches;
1667    use matrix_sdk_base::{
1668        RoomState,
1669        deserialized_responses::TimelineEventKind,
1670        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1671        store::SerializableEventContent,
1672    };
1673    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1674    use ruma::{
1675        MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedTransactionId, event_id,
1676        events::{
1677            AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
1678            SyncMessageLikeEvent, reaction::ReactionEventContent, relation::Annotation,
1679            room::message::RoomMessageEventContent,
1680        },
1681        room_id, user_id,
1682    };
1683
1684    use super::{
1685        LatestEventValue, LatestEventValueBuilder, LatestEventValuesForLocalEvents,
1686        RemoteLatestEventValue, RoomEventCache, RoomSendQueueUpdate,
1687    };
1688    use crate::{
1689        Client, Error,
1690        client::WeakClient,
1691        room::WeakRoom,
1692        send_queue::{AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle},
1693        test_utils::mocks::MatrixMockServer,
1694    };
1695
/// Asserts that a `LatestEventValue` is a `Remote` value holding a plain-text
/// event, that this event deserializes to an original room message, and that
/// the body of this message equals `$body`.
macro_rules! assert_remote_value_matches_room_message_with_body {
    ( $latest_event_value:expr => with body = $body:expr ) => {
        assert_matches!(
            $latest_event_value,
            LatestEventValue::Remote(RemoteLatestEventValue {
                kind: TimelineEventKind::PlainText { event: raw_event },
                ..
            }) => {
                // A raw event that cannot be deserialized fails the assertion.
                let timeline_event = raw_event.deserialize().unwrap();

                assert_matches!(
                    timeline_event,
                    AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
                        SyncMessageLikeEvent::Original(original_message)
                    )) => {
                        assert_eq!(original_message.content.body(), $body);
                    }
                );
            }
        );
    };
}
1715
/// Asserts that a `LatestEventValue` is the local variant designated by
/// `$pattern`, that its content deserializes to a room message, and that the
/// body of this message equals `$body`.
macro_rules! assert_local_value_matches_room_message_with_body {
    ( $latest_event_value:expr, $pattern:path => with body = $body:expr ) => {
        assert_matches!(
            $latest_event_value,
            $pattern (local_value) => {
                // A content that cannot be deserialized fails the assertion.
                let deserialized_content = local_value.content.deserialize().unwrap();

                assert_matches!(
                    deserialized_content,
                    AnyMessageLikeEventContent::RoomMessage(room_message) => {
                        assert_eq!(room_message.body(), $body);
                    }
                );
            }
        );
    };
}
1731
1732    #[async_test]
1733    async fn test_remote_is_scanning_event_backwards_from_event_cache() {
1734        let room_id = room_id!("!r0");
1735        let user_id = user_id!("@mnt_io:matrix.org");
1736        let event_factory = EventFactory::new().sender(user_id).room(room_id);
1737        let event_id_0 = event_id!("$ev0");
1738        let event_id_1 = event_id!("$ev1");
1739        let event_id_2 = event_id!("$ev2");
1740
1741        let server = MatrixMockServer::new().await;
1742        let client = server.client_builder().build().await;
1743
1744        // Prelude.
1745        {
1746            // Create the room.
1747            client.base_client().get_or_create_room(room_id, RoomState::Joined);
1748
1749            // Initialise the event cache store.
1750            client
1751                .event_cache_store()
1752                .lock()
1753                .await
1754                .expect("Could not acquire the event cache lock")
1755                .as_clean()
1756                .expect("Could not acquire a clean event cache lock")
1757                .handle_linked_chunk_updates(
1758                    LinkedChunkId::Room(room_id),
1759                    vec![
1760                        Update::NewItemsChunk {
1761                            previous: None,
1762                            new: ChunkIdentifier::new(0),
1763                            next: None,
1764                        },
1765                        Update::PushItems {
1766                            at: Position::new(ChunkIdentifier::new(0), 0),
1767                            items: vec![
1768                                // a latest event candidate
1769                                event_factory.text_msg("hello").event_id(event_id_0).into(),
1770                                // a latest event candidate
1771                                event_factory.text_msg("world").event_id(event_id_1).into(),
1772                                // not a latest event candidate
1773                                event_factory
1774                                    .room_topic("new room topic")
1775                                    .event_id(event_id_2)
1776                                    .into(),
1777                            ],
1778                        },
1779                    ],
1780                )
1781                .await
1782                .unwrap();
1783        }
1784
1785        let event_cache = client.event_cache();
1786        event_cache.subscribe().unwrap();
1787
1788        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1789        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned());
1790
1791        assert_remote_value_matches_room_message_with_body!(
1792            // We get `event_id_1` because `event_id_2` isn't a candidate,
1793            // and `event_id_0` hasn't been read yet (because events are read
1794            // backwards).
1795            LatestEventValueBuilder::new_remote(&room_event_cache, &weak_room).await => with body = "world"
1796        );
1797    }
1798
1799    async fn local_prelude() -> (Client, OwnedRoomId, RoomSendQueue, RoomEventCache) {
1800        let room_id = room_id!("!r0").to_owned();
1801
1802        let server = MatrixMockServer::new().await;
1803        let client = server.client_builder().build().await;
1804        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1805        let room = client.get_room(&room_id).unwrap();
1806
1807        let event_cache = client.event_cache();
1808        event_cache.subscribe().unwrap();
1809
1810        let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
1811
1812        let send_queue = client.send_queue();
1813        let room_send_queue = send_queue.for_room(room);
1814
1815        (client, room_id, room_send_queue, room_event_cache)
1816    }
1817
1818    fn new_local_echo_content(
1819        room_send_queue: &RoomSendQueue,
1820        transaction_id: &OwnedTransactionId,
1821        body: &str,
1822    ) -> LocalEchoContent {
1823        LocalEchoContent::Event {
1824            serialized_event: SerializableEventContent::new(
1825                &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
1826            )
1827            .unwrap(),
1828            send_handle: SendHandle::new(
1829                room_send_queue.clone(),
1830                transaction_id.clone(),
1831                MilliSecondsSinceUnixEpoch::now(),
1832            ),
1833            send_error: None,
1834        }
1835    }
1836
1837    #[async_test]
1838    async fn test_local_new_local_event() {
1839        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1840
1841        let mut buffer = LatestEventValuesForLocalEvents::new();
1842
1843        // Receiving one `NewLocalEvent`.
1844        {
1845            let transaction_id = OwnedTransactionId::from("txnid0");
1846            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1847
1848            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1849
1850            // The `LatestEventValue` matches the new local event.
1851            assert_local_value_matches_room_message_with_body!(
1852                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1853                LatestEventValue::LocalIsSending => with body = "A"
1854            );
1855        }
1856
1857        // Receiving another `NewLocalEvent`, ensuring it's pushed back in the buffer.
1858        {
1859            let transaction_id = OwnedTransactionId::from("txnid1");
1860            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
1861
1862            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1863
1864            // The `LatestEventValue` matches the new local event.
1865            assert_local_value_matches_room_message_with_body!(
1866                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1867                LatestEventValue::LocalIsSending => with body = "B"
1868            );
1869        }
1870
1871        assert_eq!(buffer.buffer.len(), 2);
1872    }
1873
1874    #[async_test]
1875    async fn test_local_new_local_event_when_previous_local_event_cannot_be_sent() {
1876        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1877
1878        let mut buffer = LatestEventValuesForLocalEvents::new();
1879
1880        // Receiving one `NewLocalEvent`.
1881        let transaction_id_0 = {
1882            let transaction_id = OwnedTransactionId::from("txnid0");
1883            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1884
1885            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1886                transaction_id: transaction_id.clone(),
1887                content,
1888            });
1889
1890            // The `LatestEventValue` matches the new local event.
1891            assert_local_value_matches_room_message_with_body!(
1892                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1893                LatestEventValue::LocalIsSending => with body = "A"
1894            );
1895
1896            transaction_id
1897        };
1898
1899        // Receiving a `SendError` targeting the first event. The
1900        // `LatestEventValue` must change to indicate it “cannot be sent”.
1901        {
1902            let update = RoomSendQueueUpdate::SendError {
1903                transaction_id: transaction_id_0.clone(),
1904                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
1905                is_recoverable: true,
1906            };
1907
1908            // The `LatestEventValue` has changed, it still matches the latest local
1909            // event but it's marked as “cannot be sent”.
1910            assert_local_value_matches_room_message_with_body!(
1911                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1912                LatestEventValue::LocalCannotBeSent => with body = "A"
1913            );
1914
1915            assert_eq!(buffer.buffer.len(), 1);
1916            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1917        }
1918
1919        // Receiving another `NewLocalEvent`, ensuring it's pushed back in the buffer,
1920        // and as a `LocalCannotBeSent` because the previous value is itself
1921        // `LocalCannotBeSent`.
1922        {
1923            let transaction_id = OwnedTransactionId::from("txnid1");
1924            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
1925
1926            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1927
1928            // The `LatestEventValue` matches the new local event.
1929            assert_local_value_matches_room_message_with_body!(
1930                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1931                LatestEventValue::LocalCannotBeSent => with body = "B"
1932            );
1933        }
1934
1935        assert_eq!(buffer.buffer.len(), 2);
1936        assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1937        assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1938    }
1939
1940    #[async_test]
1941    async fn test_local_cancelled_local_event() {
1942        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1943
1944        let mut buffer = LatestEventValuesForLocalEvents::new();
1945        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1946        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1947        let transaction_id_2 = OwnedTransactionId::from("txnid2");
1948
1949        // Receiving three `NewLocalEvent`s.
1950        {
1951            for (transaction_id, body) in
1952                [(&transaction_id_0, "A"), (&transaction_id_1, "B"), (&transaction_id_2, "C")]
1953            {
1954                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
1955
1956                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1957                    transaction_id: transaction_id.clone(),
1958                    content,
1959                });
1960
1961                // The `LatestEventValue` matches the new local event.
1962                assert_local_value_matches_room_message_with_body!(
1963                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1964                    LatestEventValue::LocalIsSending => with body = body
1965                );
1966            }
1967
1968            assert_eq!(buffer.buffer.len(), 3);
1969        }
1970
1971        // Receiving a `CancelledLocalEvent` targeting the second event. The
1972        // `LatestEventValue` must not change.
1973        {
1974            let update = RoomSendQueueUpdate::CancelledLocalEvent {
1975                transaction_id: transaction_id_1.clone(),
1976            };
1977
1978            // The `LatestEventValue` hasn't changed, it still matches the latest local
1979            // event.
1980            assert_local_value_matches_room_message_with_body!(
1981                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1982                LatestEventValue::LocalIsSending => with body = "C"
1983            );
1984
1985            assert_eq!(buffer.buffer.len(), 2);
1986        }
1987
1988        // Receiving a `CancelledLocalEvent` targeting the second (so the last) event.
1989        // The `LatestEventValue` must point to the first local event.
1990        {
1991            let update = RoomSendQueueUpdate::CancelledLocalEvent {
1992                transaction_id: transaction_id_2.clone(),
1993            };
1994
1995            // The `LatestEventValue` has changed, it matches the previous (so the first)
1996            // local event.
1997            assert_local_value_matches_room_message_with_body!(
1998                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1999                LatestEventValue::LocalIsSending => with body = "A"
2000            );
2001
2002            assert_eq!(buffer.buffer.len(), 1);
2003        }
2004
2005        // Receiving a `CancelledLocalEvent` targeting the first (so the last) event.
2006        // The `LatestEventValue` cannot be computed from the send queue and will
2007        // fallback to the event cache. The event cache is empty in this case, so we get
2008        // nothing.
2009        {
2010            let update =
2011                RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: transaction_id_0 };
2012
2013            // The `LatestEventValue` has changed, it's empty!
2014            assert_matches!(
2015                LatestEventValueBuilder::new_local(
2016                    &update,
2017                    &mut buffer,
2018                    &room_event_cache,
2019                    None,
2020                    None
2021                )
2022                .await,
2023                LatestEventValue::None
2024            );
2025
2026            assert!(buffer.buffer.is_empty());
2027        }
2028    }
2029
2030    #[async_test]
2031    async fn test_local_sent_event() {
2032        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2033
2034        let mut buffer = LatestEventValuesForLocalEvents::new();
2035        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2036        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2037
2038        // Receiving two `NewLocalEvent`s.
2039        {
2040            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2041                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2042
2043                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2044                    transaction_id: transaction_id.clone(),
2045                    content,
2046                });
2047
2048                // The `LatestEventValue` matches the new local event.
2049                assert_local_value_matches_room_message_with_body!(
2050                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2051                    LatestEventValue::LocalIsSending => with body = body
2052                );
2053            }
2054
2055            assert_eq!(buffer.buffer.len(), 2);
2056        }
2057
2058        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
2059        // must not change.
2060        {
2061            let update = RoomSendQueueUpdate::SentEvent {
2062                transaction_id: transaction_id_0.clone(),
2063                event_id: event_id!("$ev0").to_owned(),
2064            };
2065
2066            // The `LatestEventValue` hasn't changed, it still matches the latest local
2067            // event.
2068            assert_local_value_matches_room_message_with_body!(
2069                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2070                LatestEventValue::LocalIsSending => with body = "B"
2071            );
2072
2073            assert_eq!(buffer.buffer.len(), 1);
2074        }
2075
2076        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
2077        // hasn't changed, this is still this event.
2078        {
2079            let update = RoomSendQueueUpdate::SentEvent {
2080                transaction_id: transaction_id_1,
2081                event_id: event_id!("$ev1").to_owned(),
2082            };
2083
2084            // The `LatestEventValue` hasn't changed.
2085            assert_local_value_matches_room_message_with_body!(
2086                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2087                LatestEventValue::LocalIsSending => with body = "B"
2088            );
2089
2090            assert!(buffer.buffer.is_empty());
2091        }
2092    }
2093
2094    #[async_test]
2095    async fn test_local_replaced_local_event() {
2096        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2097
2098        let mut buffer = LatestEventValuesForLocalEvents::new();
2099        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2100        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2101
2102        // Receiving two `NewLocalEvent`s.
2103        {
2104            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2105                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2106
2107                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2108                    transaction_id: transaction_id.clone(),
2109                    content,
2110                });
2111
2112                // The `LatestEventValue` matches the new local event.
2113                assert_local_value_matches_room_message_with_body!(
2114                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2115                    LatestEventValue::LocalIsSending => with body = body
2116                );
2117            }
2118
2119            assert_eq!(buffer.buffer.len(), 2);
2120        }
2121
2122        // Receiving a `ReplacedLocalEvent` targeting the first event. The
2123        // `LatestEventValue` must not change.
2124        {
2125            let transaction_id = &transaction_id_0;
2126            let LocalEchoContent::Event { serialized_event: new_content, .. } =
2127                new_local_echo_content(&room_send_queue, transaction_id, "A.")
2128            else {
2129                panic!("oopsy");
2130            };
2131
2132            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2133                transaction_id: transaction_id.clone(),
2134                new_content,
2135            };
2136
2137            // The `LatestEventValue` hasn't changed, it still matches the latest local
2138            // event.
2139            assert_local_value_matches_room_message_with_body!(
2140                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2141                LatestEventValue::LocalIsSending => with body = "B"
2142            );
2143
2144            assert_eq!(buffer.buffer.len(), 2);
2145        }
2146
2147        // Receiving a `ReplacedLocalEvent` targeting the second (so the last) event.
2148        // The `LatestEventValue` is changing.
2149        {
2150            let transaction_id = &transaction_id_1;
2151            let LocalEchoContent::Event { serialized_event: new_content, .. } =
2152                new_local_echo_content(&room_send_queue, transaction_id, "B.")
2153            else {
2154                panic!("oopsy");
2155            };
2156
2157            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2158                transaction_id: transaction_id.clone(),
2159                new_content,
2160            };
2161
2162            // The `LatestEventValue` has changed, it still matches the latest local
2163            // event but with its new content.
2164            assert_local_value_matches_room_message_with_body!(
2165                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2166                LatestEventValue::LocalIsSending => with body = "B."
2167            );
2168
2169            assert_eq!(buffer.buffer.len(), 2);
2170        }
2171    }
2172
2173    #[async_test]
2174    async fn test_local_replaced_local_event_by_a_non_suitable_event() {
2175        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2176
2177        let mut buffer = LatestEventValuesForLocalEvents::new();
2178        let transaction_id = OwnedTransactionId::from("txnid0");
2179
2180        // Receiving one `NewLocalEvent`.
2181        {
2182            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2183
2184            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2185                transaction_id: transaction_id.clone(),
2186                content,
2187            });
2188
2189            // The `LatestEventValue` matches the new local event.
2190            assert_local_value_matches_room_message_with_body!(
2191                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2192                LatestEventValue::LocalIsSending => with body = "A"
2193            );
2194
2195            assert_eq!(buffer.buffer.len(), 1);
2196        }
2197
2198        // Receiving a `ReplacedLocalEvent` targeting the first event. Sadly, the new
2199        // event cannot be mapped to a `LatestEventValue`! The first event is removed
2200        // from the buffer, and the `LatestEventValue` becomes `None` because there is
2201        // no other alternative.
2202        {
2203            let new_content = SerializableEventContent::new(&AnyMessageLikeEventContent::Reaction(
2204                ReactionEventContent::new(Annotation::new(
2205                    event_id!("$ev0").to_owned(),
2206                    "+1".to_owned(),
2207                )),
2208            ))
2209            .unwrap();
2210
2211            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2212                transaction_id: transaction_id.clone(),
2213                new_content,
2214            };
2215
2216            // The `LatestEventValue` has changed!
2217            assert_matches!(
2218                LatestEventValueBuilder::new_local(
2219                    &update,
2220                    &mut buffer,
2221                    &room_event_cache,
2222                    None,
2223                    None
2224                )
2225                .await,
2226                LatestEventValue::None
2227            );
2228
2229            assert_eq!(buffer.buffer.len(), 0);
2230        }
2231    }
2232
2233    #[async_test]
2234    async fn test_local_send_error() {
2235        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2236
2237        let mut buffer = LatestEventValuesForLocalEvents::new();
2238        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2239        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2240
2241        // Receiving two `NewLocalEvent`s.
2242        {
2243            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2244                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2245
2246                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2247                    transaction_id: transaction_id.clone(),
2248                    content,
2249                });
2250
2251                // The `LatestEventValue` matches the new local event.
2252                assert_local_value_matches_room_message_with_body!(
2253                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2254                    LatestEventValue::LocalIsSending => with body = body
2255                );
2256            }
2257
2258            assert_eq!(buffer.buffer.len(), 2);
2259        }
2260
2261        // Receiving a `SendError` targeting the first event. The
2262        // `LatestEventValue` must change to indicate it's “cannot be sent”.
2263        {
2264            let update = RoomSendQueueUpdate::SendError {
2265                transaction_id: transaction_id_0.clone(),
2266                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2267                is_recoverable: true,
2268            };
2269
2270            // The `LatestEventValue` has changed, it still matches the latest local
2271            // event but it's marked as “cannot be sent”.
2272            assert_local_value_matches_room_message_with_body!(
2273                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2274                LatestEventValue::LocalCannotBeSent => with body = "B"
2275            );
2276
2277            assert_eq!(buffer.buffer.len(), 2);
2278            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2279            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2280        }
2281
2282        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
2283        // must change: since an event has been sent, the following events are now
2284        // “is sending”.
2285        {
2286            let update = RoomSendQueueUpdate::SentEvent {
2287                transaction_id: transaction_id_0.clone(),
2288                event_id: event_id!("$ev0").to_owned(),
2289            };
2290
2291            // The `LatestEventValue` has changed, it still matches the latest local
2292            // event but it's “is sending”.
2293            assert_local_value_matches_room_message_with_body!(
2294                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2295                LatestEventValue::LocalIsSending => with body = "B"
2296            );
2297
2298            assert_eq!(buffer.buffer.len(), 1);
2299            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2300        }
2301    }
2302
2303    #[async_test]
        // Feeds `LatestEventValueBuilder::new_local` with two `NewLocalEvent`s, then a
        // `SendError` and a `RetryEvent` both targeting the first one, checking the
        // returned value and the content of `buffer` after each update.
2304    async fn test_local_retry_event() {
2305        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2306
2307        let mut buffer = LatestEventValuesForLocalEvents::new();
2308        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2309        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2310
2311        // Receiving two `NewLocalEvent`s, with bodies "A" then "B".
2312        {
2313            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2314                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2315
2316                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2317                    transaction_id: transaction_id.clone(),
2318                    content,
2319                });
2320
2321                // The `LatestEventValue` matches the new local event.
2322                assert_local_value_matches_room_message_with_body!(
2323                    LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2324                    LatestEventValue::LocalIsSending => with body = body
2325                );
2326            }
2327
2328            assert_eq!(buffer.buffer.len(), 2);
2329        }
2330
2331        // Receiving a `SendError` targeting the first event. The
2332        // `LatestEventValue` must change to indicate it's “cannot be sent”.
2333        {
2334            let update = RoomSendQueueUpdate::SendError {
2335                transaction_id: transaction_id_0.clone(),
2336                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2337                is_recoverable: true,
2338            };
2339
2340            // The `LatestEventValue` has changed: it still matches the latest local
2341            // event (body "B") but it's now marked as “cannot be sent”.
2342            assert_local_value_matches_room_message_with_body!(
2343                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2344                LatestEventValue::LocalCannotBeSent => with body = "B"
2345            );
2346
                // Both buffered values are flagged, not only the one targeted by the error.
2347            assert_eq!(buffer.buffer.len(), 2);
2348            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2349            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2350        }
2351
2352        // Receiving a `RetryEvent` targeting the first event. The `LatestEventValue`
2353        // must change: this local event and the ones following it must be “is sending”.
2354        {
2355            let update =
2356                RoomSendQueueUpdate::RetryEvent { transaction_id: transaction_id_0.clone() };
2357
2358            // The `LatestEventValue` has changed: it still matches the latest local
2359            // event (body "B") but it's back to “is sending”.
2360            assert_local_value_matches_room_message_with_body!(
2361                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2362                LatestEventValue::LocalIsSending => with body = "B"
2363            );
2364
2365            assert_eq!(buffer.buffer.len(), 2);
2366            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2367            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
2368        }
2369    }
2370
2371    #[async_test]
        // Checks that, after one `NewLocalEvent`, a `MediaUpload` update makes
        // `LatestEventValueBuilder::new_local` return `LatestEventValue::None` and
        // leaves the number of buffered values unchanged.
2372    async fn test_local_media_upload() {
2373        let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2374
2375        let mut buffer = LatestEventValuesForLocalEvents::new();
2376        let transaction_id = OwnedTransactionId::from("txnid");
2377
2378        // Receiving a `NewLocalEvent`.
2379        {
2380            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2381
2382            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2383                transaction_id: transaction_id.clone(),
2384                content,
2385            });
2386
2387            // The `LatestEventValue` matches the new local event.
2388            assert_local_value_matches_room_message_with_body!(
2389                LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2390                LatestEventValue::LocalIsSending => with body = "A"
2391            );
2392
2393            assert_eq!(buffer.buffer.len(), 1);
2394        }
2395
2396        // Receiving a `MediaUpload` targeting the first event. The
2397        // `LatestEventValue` must not change as `MediaUpload` updates are ignored.
2398        {
2399            let update = RoomSendQueueUpdate::MediaUpload {
2400                related_to: transaction_id,
2401                file: None,
2402                index: 0,
2403                progress: AbstractProgress { current: 0, total: 0 },
2404            };
2405
2406            // The builder returns `LatestEventValue::None` for this update: no new
2407            // `LatestEventValue` is computed.
2408            assert_matches!(
2409                LatestEventValueBuilder::new_local(
2410                    &update,
2411                    &mut buffer,
2412                    &room_event_cache,
2413                    None,
2414                    None
2415                )
2416                .await,
2417                LatestEventValue::None
2418            );
2419
                // The buffer still holds the single local value pushed above.
2420            assert_eq!(buffer.buffer.len(), 1);
2421        }
2422    }
2423
2424    #[async_test]
        // Checks that `LatestEventValueBuilder::new_local`, called with an empty
        // `buffer`, yields a remote value whose body is the "hello" message that was
        // stored in the event cache store beforehand.
2425    async fn test_local_fallbacks_to_remote_when_empty() {
2426        let room_id = room_id!("!r0");
2427        let user_id = user_id!("@mnt_io:matrix.org");
2428        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2429        let event_id_0 = event_id!("$ev0");
2430        let event_id_1 = event_id!("$ev1");
2431
2432        let server = MatrixMockServer::new().await;
2433        let client = server.client_builder().build().await;
2434
2435        // Prelude.
2436        {
2437            // Create the room.
2438            client.base_client().get_or_create_room(room_id, RoomState::Joined);
2439
2440            // Initialise the event cache store with one chunk holding a "hello" message.
2441            client
2442                .event_cache_store()
2443                .lock()
2444                .await
2445                .expect("Could not acquire the event cache lock")
2446                .as_clean()
2447                .expect("Could not acquire a clean event cache lock")
2448                .handle_linked_chunk_updates(
2449                    LinkedChunkId::Room(room_id),
2450                    vec![
2451                        Update::NewItemsChunk {
2452                            previous: None,
2453                            new: ChunkIdentifier::new(0),
2454                            next: None,
2455                        },
2456                        Update::PushItems {
2457                            at: Position::new(ChunkIdentifier::new(0), 0),
2458                            items: vec![
2459                                event_factory.text_msg("hello").event_id(event_id_0).into(),
2460                            ],
2461                        },
2462                    ],
2463                )
2464                .await
2465                .unwrap();
2466        }
2467
2468        let event_cache = client.event_cache();
2469        event_cache.subscribe().unwrap();
2470
2471        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2472
2473        let mut buffer = LatestEventValuesForLocalEvents::new();
2474
2475        // We get a `Remote` because there are no `Local*` values: `buffer` is empty!
2476        assert_remote_value_matches_room_message_with_body!(
2477            LatestEventValueBuilder::new_local(
2478                // An update that won't create a new `LatestEventValue`.
2479                &RoomSendQueueUpdate::SentEvent {
2480                    transaction_id: OwnedTransactionId::from("txnid"),
2481                    event_id: event_id_1.to_owned(),
2482                },
2483                &mut buffer,
2484                &room_event_cache,
2485                None,
2486                None,
2487            )
2488            .await
2489             => with body = "hello"
2490        );
2491    }
2492}