Skip to main content

matrix_sdk/latest_events/latest_event/
builder.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    iter::once,
18    ops::{ControlFlow, Deref},
19};
20
21pub use matrix_sdk_base::latest_event::{LatestEventValue, LocalLatestEventValue};
22use matrix_sdk_base::{
23    check_validity_of_replacement_events, deserialized_responses::TimelineEvent,
24    store::SerializableEventContent,
25};
26use ruma::{
27    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
28    events::{
29        AnyMessageLikeEventContent, AnySyncStateEvent, AnySyncTimelineEvent, SyncStateEvent,
30        relation::Replacement,
31        room::{
32            member::MembershipChange,
33            message::{MessageType, Relation, RoomMessageEventContent},
34            power_levels::RoomPowerLevels,
35            redaction::RoomRedactionEventContent,
36        },
37    },
38};
39use tracing::{debug, error};
40
41use crate::{Room, event_cache::RoomEventCache, room::Invite, send_queue::RoomSendQueueUpdate};
42
43/// A builder of [`LatestEventValue`]s.
44pub(super) struct Builder;
45
46impl Builder {
47    /// Create a new [`LatestEventValue::Remote`].
48    pub async fn new_remote(
49        room_event_cache: &RoomEventCache,
50        current_event: LatestEventValue,
51        own_user_id: &UserId,
52        power_levels: Option<&RoomPowerLevels>,
53    ) -> Option<LatestEventValue> {
54        // If we are computing a value from the Event Cache, it's because we have
55        // received an update from the Event Cache. This update falls in two categories:
56        // either an event has been added or updated, or the room has been emptied (via
57        // `EventCache::clear_all_rooms` for example).
58        //
59        // We consider the room has been emptied by default. If we are able to scan at
60        // least one in-memory event, we consider the room has not been emptied.
61        let mut room_has_been_emptied = true;
62        let mut current_value_must_be_erased = false;
63
64        // Track the most recent edit for each event.
65        let mut latest_edit_for_event: HashMap<OwnedEventId, TimelineEvent> = HashMap::new();
66
67        if let Ok(Some(event)) = room_event_cache
68            .rfind_map_event_in_memory_by(|event| {
69                // At least one event lives in-memory: we consider the room is not empty.
70                room_has_been_emptied = false;
71
72                match filter_timeline_event(
73                    event,
74                    current_event.event_id().as_ref(),
75                    own_user_id,
76                    power_levels,
77                ) {
78                    // Let's continue, event is not suitable.
79                    ControlFlow::Continue(FilterContinue {
80                        current_value_must_be_erased: erased,
81                        edited_event_id,
82                    }) => {
83                        current_value_must_be_erased = erased;
84
85                        if let Some(edited_event_id) = edited_event_id {
86                            // This is an edit. Store it if we haven't seen an edit for the
87                            // targeted event yet.
88                            latest_edit_for_event.entry(edited_event_id).or_insert(event.clone());
89                        }
90
91                        None
92                    }
93
94                    // Stop! We found a suitable event!
95                    ControlFlow::Break(()) => {
96                        // Return the latest known edit of the event or the event itself if it
97                        // hasn't been replaced or the replacement is invalid.
98                        if let Some(event_id) = event.event_id()
99                            && let Some(edit) = latest_edit_for_event.get(&event_id)
100                        {
101                            let original = event.kind.raw();
102                            let original_encryption_info = event.kind.encryption_info();
103
104                            let replacement = edit.kind.raw();
105                            let replacement_encryption_info = event.kind.encryption_info();
106
107                            Some(
108                                match check_validity_of_replacement_events(
109                                    original,
110                                    original_encryption_info.map(|e| &(**e)),
111                                    replacement,
112                                    replacement_encryption_info.map(|e| &(**e)),
113                                ) {
114                                    Ok(_) => edit.clone(),
115                                    Err(e) => {
116                                        debug!(
117                                        "Skipping an edit of a latest event due to the replacement event being invalid: {e}"
118                                    );
119                                        event.clone()
120                                    }
121                                },
122                            )
123                        } else {
124                            Some(event.clone())
125                        }
126                    }
127                }
128            })
129            .await
130        {
131            Some(LatestEventValue::Remote(event))
132        } else {
133            // When the room has been emptied, we must erase any previous value.
134            if room_has_been_emptied {
135                current_value_must_be_erased = true;
136            }
137
138            current_value_must_be_erased.then(LatestEventValue::default)
139        }
140    }
141
142    pub async fn new_remote_for_invite(room: &Room) -> LatestEventValue {
143        let (event_id, timestamp, inviter_id) = room
144            .invite_details()
145            .await
146            .map(|Invite { invitee, inviter_id, .. }| {
147                let event = invitee.event();
148
149                (
150                    // If the event is a stripped state event, it has no event ID. This is
151                    // acceptable.
152                    event.event_id().map(ToOwned::to_owned),
153                    // If the event is a stripped state event, it has no timestamp (no
154                    // `origin_server_ts`). If, in any case, it has one (it could happen depending
155                    // of the server implementation), let's use it.
156                    event.timestamp().map(MilliSecondsSinceUnixEpoch),
157                    Some(inviter_id),
158                )
159            })
160            .unwrap_or_else(|_| (None, None, None));
161
162        LatestEventValue::RemoteInvite {
163            event_id,
164            // If the event is a stripped state event, it should not have a timestamp. Let's use
165            // `now()` as a fallback so that the `LatestEventValue` can be used to sort rooms in a
166            // room list for example.
167            timestamp: timestamp.unwrap_or_else(MilliSecondsSinceUnixEpoch::now),
168            inviter: inviter_id,
169        }
170    }
171
172    /// Create a new [`LatestEventValue::LocalIsSending`] or
173    /// [`LatestEventValue::LocalCannotBeSent`].
174    pub async fn new_local(
175        send_queue_update: &RoomSendQueueUpdate,
176        buffer_of_values_for_local_events: &mut BufferOfValuesForLocalEvents,
177        room_event_cache: &RoomEventCache,
178        current_event: LatestEventValue,
179        own_user_id: &UserId,
180        power_levels: Option<&RoomPowerLevels>,
181    ) -> Option<LatestEventValue> {
182        use crate::send_queue::{LocalEcho, LocalEchoContent};
183
184        match send_queue_update {
185            // A new local event is being sent.
186            //
187            // Let's create the `LatestEventValue` and push it in the buffer of values.
188            RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
189                transaction_id,
190                content: local_echo_content,
191            }) => match local_echo_content {
192                LocalEchoContent::Event { serialized_event: serialized_event_content, .. } => {
193                    Some(match serialized_event_content.deserialize() {
194                        Ok(content) => {
195                            if filter_any_message_like_event_content(
196                                content,
197                                current_event.event_id().as_ref(),
198                            )
199                            .is_break()
200                            {
201                                let local_value = LocalLatestEventValue {
202                                    timestamp: MilliSecondsSinceUnixEpoch::now(),
203                                    content: serialized_event_content.clone(),
204                                };
205
206                                // If a local previous `LatestEventValue` exists and has been marked
207                                // as “cannot be sent”, it means the new `LatestEventValue` must
208                                // also be marked as “cannot be sent”.
209                                let value =
210                                    if let Some((_, LatestEventValue::LocalCannotBeSent(_))) =
211                                        buffer_of_values_for_local_events.last()
212                                    {
213                                        LatestEventValue::LocalCannotBeSent(local_value)
214                                    } else {
215                                        LatestEventValue::LocalIsSending(local_value)
216                                    };
217
218                                buffer_of_values_for_local_events
219                                    .push(transaction_id.to_owned(), value.clone());
220
221                                value
222                            } else {
223                                return None;
224                            }
225                        }
226
227                        Err(error) => {
228                            error!(
229                                ?error,
230                                "Failed to deserialize an event from `RoomSendQueueUpdate::NewLocalEvent`"
231                            );
232
233                            return None;
234                        }
235                    })
236                }
237
238                LocalEchoContent::React { .. } => None,
239
240                // TODO: Rework the latest event system to handle local redactions. This will
241                // require mixing the processing of local and remote events since a local redaction
242                // will target a previous remote event.
243                LocalEchoContent::Redaction { .. } => None,
244            },
245
246            // A local event has been cancelled before being sent.
247            //
248            // Remove the calculated `LatestEventValue` from the buffer of values, and return the
249            // last `LatestEventValue` or calculate a new one.
250            RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
251                let or = if let Some(position) =
252                    buffer_of_values_for_local_events.position(transaction_id)
253                {
254                    buffer_of_values_for_local_events.remove(position);
255
256                    // We have cancelled a local value. If there is no more local values, and if
257                    // there is no candidate for the Event Cache, we must generate some value, here
258                    // `LatestEventValue::None`, to erase any existing value.
259                    Some(LatestEventValue::None)
260                } else {
261                    None
262                };
263
264                Self::new_local_or_remote(
265                    buffer_of_values_for_local_events,
266                    room_event_cache,
267                    current_event,
268                    own_user_id,
269                    power_levels,
270                )
271                .await
272                .or(or)
273            }
274
275            // A local event has successfully been sent!
276            //
277            // Mark all “cannot be sent” values as “is sending” after the one matching
278            // `transaction_id`. Indeed, if an event has been sent, it means the send queue is
279            // working, so if any value has been marked as “cannot be sent”, it must be marked as
280            // “is sending”. Then, remove the calculated `LatestEventValue` from the buffer of
281            // values. Finally, return the last `LatestEventValue` or calculate a new
282            // one.
283            RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
284                if let Some(position) =
285                    buffer_of_values_for_local_events.mark_is_sending_after(transaction_id)
286                {
287                    let (_, value) = buffer_of_values_for_local_events.remove(position);
288
289                    // The sent event is the last of the buffer. Let's compute a proper “detached”
290                    // one if and only if the sent one was the last local one: not from the buffer,
291                    // not from the Event Cache!.
292                    if buffer_of_values_for_local_events.last().is_none() {
293                        // Try to resolve to a remote value from the event cache first. The remote
294                        // echo might already have been processed before we get here.
295                        if let Ok(Some(_)) = room_event_cache.find_event(event_id).await
296                            && let Some(latest_event_value) = Self::new_remote(
297                                room_event_cache,
298                                current_event,
299                                own_user_id,
300                                power_levels,
301                            )
302                            .await
303                        {
304                            return Some(latest_event_value);
305                        }
306
307                        match value {
308                            LatestEventValue::LocalIsSending(local_value)
309                            | LatestEventValue::LocalCannotBeSent(local_value)
310                            // Technically impossible, but it's not harmful to handle this that way.
311                            | LatestEventValue::LocalHasBeenSent { value: local_value, .. } => {
312                                return Some(LatestEventValue::LocalHasBeenSent { event_id: event_id.clone(), value: local_value })
313                            }
314                            LatestEventValue::Remote(_) | LatestEventValue::RemoteInvite { .. } | LatestEventValue::None => unreachable!("Impossible to get a remote `LatestEventValue`"),
315                        };
316                    }
317                }
318
319                Self::new_local_or_remote(
320                    buffer_of_values_for_local_events,
321                    room_event_cache,
322                    current_event,
323                    own_user_id,
324                    power_levels,
325                )
326                .await
327            }
328
329            // A local event has been replaced by another one.
330            //
331            // Replace the latest event value matching `transaction_id` in the buffer if it exists
332            // (note: it should!), and return the last `LatestEventValue` or calculate a new one.
333            RoomSendQueueUpdate::ReplacedLocalEvent {
334                transaction_id,
335                new_content: new_serialized_event_content,
336            } => {
337                if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
338                    match new_serialized_event_content.deserialize() {
339                        Ok(content) => {
340                            if filter_any_message_like_event_content(
341                                content,
342                                current_event.event_id().as_ref(),
343                            )
344                            .is_break()
345                            {
346                                buffer_of_values_for_local_events.replace_content(
347                                    position,
348                                    new_serialized_event_content.clone(),
349                                );
350                            } else {
351                                buffer_of_values_for_local_events.remove(position);
352                            }
353                        }
354
355                        Err(error) => {
356                            error!(
357                                ?error,
358                                "Failed to deserialize an event from `RoomSendQueueUpdate::ReplacedLocalEvent`"
359                            );
360
361                            return None;
362                        }
363                    }
364                }
365
366                Self::new_local_or_remote(
367                    buffer_of_values_for_local_events,
368                    room_event_cache,
369                    current_event,
370                    own_user_id,
371                    power_levels,
372                )
373                .await
374            }
375
376            // An error has occurred.
377            //
378            // Mark the latest event value matching `transaction_id`, and all its following values,
379            // as “cannot be sent” if the error isn't recoverable, and as "sending" if the error
380            // was recoverable.
381            RoomSendQueueUpdate::SendError { transaction_id, is_recoverable, .. } => {
382                if *is_recoverable {
383                    // If the room send queue error is recoverable, the send queue may retry to send
384                    // it in a short while, so the event should still be considered
385                    // sending. Leave it to that, at this point.
386                    buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
387                } else {
388                    // If the error isn't recoverable, mark as a true "cannot be sent".
389                    buffer_of_values_for_local_events.mark_cannot_be_sent_from(transaction_id);
390                }
391
392                Self::new_local_or_remote(
393                    buffer_of_values_for_local_events,
394                    room_event_cache,
395                    current_event,
396                    own_user_id,
397                    power_levels,
398                )
399                .await
400            }
401
402            // A local event has been unwedged and sending is being retried.
403            //
404            // Mark the latest event value matching `transaction_id`, and all its following values,
405            // as “is sending”.
406            RoomSendQueueUpdate::RetryEvent { transaction_id } => {
407                buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
408
409                Self::new_local_or_remote(
410                    buffer_of_values_for_local_events,
411                    room_event_cache,
412                    current_event,
413                    own_user_id,
414                    power_levels,
415                )
416                .await
417            }
418
419            // A media upload has made progress.
420            //
421            // Nothing to do here.
422            RoomSendQueueUpdate::MediaUpload { .. } => None,
423        }
424    }
425
426    /// Get the last [`LatestEventValue`] from the local latest event values if
427    /// any, or create a new [`LatestEventValue`] from the remote events.
428    ///
429    /// If the buffer of latest event values is not empty, let's return the last
430    /// one. Otherwise, it means we no longer have any local event: let's
431    /// fallback on remote event!
432    async fn new_local_or_remote(
433        buffer_of_values_for_local_events: &mut BufferOfValuesForLocalEvents,
434        room_event_cache: &RoomEventCache,
435        current_event: LatestEventValue,
436        own_user_id: &UserId,
437        power_levels: Option<&RoomPowerLevels>,
438    ) -> Option<LatestEventValue> {
439        if let Some((_, value)) = buffer_of_values_for_local_events.last() {
440            Some(value.clone())
441        } else {
442            Self::new_remote(room_event_cache, current_event, own_user_id, power_levels).await
443        }
444    }
445}
446
447/// A buffer of the current [`LatestEventValue`] computed for local events
448/// seen by the send queue. It is used by
449/// [`LatestEvent::buffer_of_values_for_local_events`].
450///
451/// The system does only receive [`RoomSendQueueUpdate`]s. It's not designed to
452/// iterate over local events in the send queue when a local event is changed
453/// (cancelled, or updated for example). That's why we keep our own buffer here.
454/// Imagine the system receives 4 [`RoomSendQueueUpdate`]:
455///
456/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
457/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
458/// 3. [`RoomSendQueueUpdate::ReplacedLocalEvent`]: replaced the first local
459///    event,
460/// 4. [`RoomSendQueueUpdate::CancelledLocalEvent`]: cancelled the second local
461///    event.
462///
463/// `NewLocalEvent`s will trigger the computation of new
464/// `LatestEventValue`s, but `CancelledLocalEvent` for example doesn't hold
465/// any information to compute a new `LatestEventValue`, so we need to
466/// remember the previous values, until the local events are sent and
467/// removed from this buffer.
468///
469/// Another reason why we need a buffer is to handle wedged local event. Imagine
470/// the system receives 3 [`RoomSendQueueUpdate`]:
471///
472/// 1. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
473/// 2. [`RoomSendQueueUpdate::NewLocalEvent`]: new local event,
474/// 3. [`RoomSendQueueUpdate::SendError`]: the first local event has failed to
475///    be sent.
476///
477/// Because a `SendError` is received (targeting the first `NewLocalEvent`), the
478/// send queue is stopped. However, the `LatestEventValue` targets the second
479/// `NewLocalEvent`. The system must consider that when a local event is wedged,
480/// all the following local events must also be marked as “cannot be sent”. And
481/// vice versa, when the send queue is able to send an event again, all the
482/// following local events must be marked as “is sending”.
483///
484/// This type isolates a couple of methods designed to manage these specific
485/// behaviours.
486#[derive(Debug)]
487pub(super) struct BufferOfValuesForLocalEvents {
488    buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
489}
490
491impl BufferOfValuesForLocalEvents {
492    /// Create a new [`BufferOfValuesForLocalEvents`].
493    pub fn new() -> Self {
494        Self { buffer: Vec::with_capacity(2) }
495    }
496
497    /// Check the buffer is empty.
498    pub fn is_empty(&self) -> bool {
499        self.buffer.is_empty()
500    }
501
502    /// Get the last [`LatestEventValue`].
503    fn last(&self) -> Option<&(OwnedTransactionId, LatestEventValue)> {
504        self.buffer.last()
505    }
506
507    /// Find the position of the [`LatestEventValue`] matching `transaction_id`.
508    fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
509        self.buffer
510            .iter()
511            .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
512    }
513
514    /// Push a new [`LatestEventValue`].
515    ///
516    /// # Panics
517    ///
518    /// Panics if `value` is not of kind [`LatestEventValue::LocalIsSending`] or
519    /// [`LatestEventValue::LocalCannotBeSent`].
520    fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
521        assert!(value.is_local(), "`value` must be a local `LatestEventValue`");
522
523        self.buffer.push((transaction_id, value));
524    }
525
526    /// Replace the content of the [`LatestEventValue`] at position `position`.
527    ///
528    /// # Panics
529    ///
530    /// Panics if:
531    /// - `position` is strictly greater than buffer's length,
532    /// - the [`LatestEventValue`] is not of kind
533    ///   [`LatestEventValue::LocalIsSending`] or
534    ///   [`LatestEventValue::LocalCannotBeSent`].
535    fn replace_content(&mut self, position: usize, new_content: SerializableEventContent) {
536        let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");
537
538        match value {
539            LatestEventValue::LocalIsSending(LocalLatestEventValue { content, .. })
540            | LatestEventValue::LocalCannotBeSent(LocalLatestEventValue { content, .. }) => {
541                *content = new_content;
542            }
543
544            _ => panic!("`value` must be either `LocalIsSending` or `LocalCannotBeSent`"),
545        }
546    }
547
548    /// Remove the [`LatestEventValue`] at position `position`.
549    ///
550    /// # Panics
551    ///
552    /// Panics if `position` is strictly greater than buffer's length.
553    fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
554        self.buffer.remove(position)
555    }
556
557    /// Mark the `LatestEventValue` matching `transaction_id`, and all the
558    /// following values, as “cannot be sent”.
559    fn mark_cannot_be_sent_from(&mut self, transaction_id: &TransactionId) {
560        let mut values = self.buffer.iter_mut();
561
562        if let Some(first_value_to_wedge) = values
563            .by_ref()
564            .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
565        {
566            // Iterate over the found value and the following ones.
567            for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) {
568                if let LatestEventValue::LocalIsSending(content) = value_to_wedge {
569                    *value_to_wedge = LatestEventValue::LocalCannotBeSent(content.clone());
570                }
571            }
572        }
573    }
574
575    /// Mark the `LatestEventValue` matching `transaction_id`, and all the
576    /// following values, as “is sending”.
577    fn mark_is_sending_from(&mut self, transaction_id: &TransactionId) {
578        let mut values = self.buffer.iter_mut();
579
580        if let Some(first_value_to_unwedge) = values
581            .by_ref()
582            .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
583        {
584            // Iterate over the found value and the following ones.
585            for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) {
586                if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
587                    *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
588                }
589            }
590        }
591    }
592
593    /// Mark all the following values after the `LatestEventValue` matching
594    /// `transaction_id` as “is sending”.
595    ///
596    /// Note that contrary to [`Self::mark_is_sending_from`], the
597    /// `LatestEventValue` is untouched. However, its position is returned
598    /// (if any).
599    fn mark_is_sending_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
600        let mut values = self.buffer.iter_mut();
601
602        if let Some(position) = values
603            .by_ref()
604            .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
605        {
606            // Iterate over all values after the found one.
607            for (_, value_to_unwedge) in values {
608                if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
609                    *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
610                }
611            }
612
613            Some(position)
614        } else {
615            None
616        }
617    }
618}
619
620/// The [`ControlFlow::Continue`] value used by the filters.
621#[derive(Debug)]
622struct FilterContinue {
623    /// Whether the current [`LatestEventValue`] must be erased or not.
624    current_value_must_be_erased: bool,
625    /// When the event is a replacement, this is the targeted event ID.
626    edited_event_id: Option<OwnedEventId>,
627}
628
629/// Build the [`ControlFlow::Break`] for the filters.
630fn filter_break() -> ControlFlow<(), FilterContinue> {
631    ControlFlow::Break(())
632}
633
634/// Build the [`ControlFlow::Continue`] for the filters.
635fn filter_continue() -> ControlFlow<(), FilterContinue> {
636    ControlFlow::Continue(FilterContinue {
637        current_value_must_be_erased: false,
638        edited_event_id: None,
639    })
640}
641
642/// Build the [`ControlFlow::Continue`] with erasing, for the filters.
643fn filter_continue_with_erasing() -> ControlFlow<(), FilterContinue> {
644    ControlFlow::Continue(FilterContinue {
645        current_value_must_be_erased: true,
646        edited_event_id: None,
647    })
648}
649
650/// Build the [`ControlFlow::Continue`] with an edited event ID, for the
651/// filters.
652fn filter_continue_with_edit(edited_event_id: OwnedEventId) -> ControlFlow<(), FilterContinue> {
653    ControlFlow::Continue(FilterContinue {
654        current_value_must_be_erased: false,
655        edited_event_id: Some(edited_event_id),
656    })
657}
658
659/// Filter a [`TimelineEvent`].
660///
661/// Be careful:
662///
663/// - `event` is the current event in the collection of events that is scanned.
664/// - `current_value_event_id` is the event ID of the current
665///   [`LatestEventValue`].
666fn filter_timeline_event(
667    event: &TimelineEvent,
668    current_value_event_id: Option<&OwnedEventId>,
669    own_user_id: &UserId,
670    power_levels: Option<&RoomPowerLevels>,
671) -> ControlFlow<(), FilterContinue> {
672    // Cast the event into an `AnySyncTimelineEvent`. If deserializing fails, we
673    // ignore the event.
674    let event = match event.raw().deserialize() {
675        Ok(event) => event,
676        Err(error) => {
677            error!(
678                ?error,
679                "Failed to deserialize the event when looking for a suitable latest event"
680            );
681
682            return filter_continue();
683        }
684    };
685
686    match event {
687        AnySyncTimelineEvent::MessageLike(message_like_event) => {
688            match message_like_event.original_content() {
689                Some(any_message_like_event_content) => filter_any_message_like_event_content(
690                    any_message_like_event_content,
691                    current_value_event_id,
692                ),
693
694                // The event has been redacted.
695                None => filter_continue(),
696            }
697        }
698
699        AnySyncTimelineEvent::State(state) => {
700            filter_any_sync_state_event(state, own_user_id, power_levels)
701        }
702    }
703}
704
705fn filter_any_message_like_event_content(
706    event: AnyMessageLikeEventContent,
707    current_value_event_id: Option<&OwnedEventId>,
708) -> ControlFlow<(), FilterContinue> {
709    match event {
710        // `m.room.message`
711        AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
712            msgtype,
713            relates_to,
714            ..
715        }) => {
716            // Don't show incoming verification requests.
717            if let MessageType::VerificationRequest(_) = msgtype {
718                return filter_continue();
719            }
720
721            // Not all relations are accepted. Let's filter them.
722            match relates_to {
723                Some(Relation::Replacement(Replacement { event_id, .. })) => {
724                    // Edits are suitable as latest events if and only if the targeted event is also
725                    // suitable. Let's remember this edit, and the associated
726                    // targeted event ID.
727                    filter_continue_with_edit(event_id)
728                }
729
730                _ => filter_break(),
731            }
732        }
733
734        // `org.matrix.msc3381.poll.start`
735        // `m.call.invite`
736        // `m.rtc.notification`
737        // `m.sticker`
738        AnyMessageLikeEventContent::UnstablePollStart(_)
739        | AnyMessageLikeEventContent::CallInvite(_)
740        | AnyMessageLikeEventContent::RtcNotification(_)
741        | AnyMessageLikeEventContent::Sticker(_) => filter_break(),
742
743        // `m.room.redaction`
744        AnyMessageLikeEventContent::RoomRedaction(RoomRedactionEventContent {
745            redacts, ..
746        }) => {
747            // A redaction is not suitable.
748            //
749            // However, this redaction targets the current `LatestEventValue`! It means the
750            // current value must be erased.
751            if redacts.as_ref() == current_value_event_id {
752                filter_continue_with_erasing()
753            } else {
754                filter_continue()
755            }
756        }
757
758        // `m.room.encrypted`
759        AnyMessageLikeEventContent::RoomEncrypted(_) => {
760            // **explicitly** not suitable.
761            filter_continue()
762        }
763
764        // Everything else is considered not suitable.
765        _ => filter_continue(),
766    }
767}
768
769fn filter_any_sync_state_event(
770    event: AnySyncStateEvent,
771    own_user_id: &UserId,
772    power_levels: Option<&RoomPowerLevels>,
773) -> ControlFlow<(), FilterContinue> {
774    match event {
775        AnySyncStateEvent::RoomMember(SyncStateEvent::Original(member)) => {
776            match member.membership_change() {
777                MembershipChange::Knocked => {
778                    let can_accept_or_decline_knocks = match power_levels {
779                        Some(room_power_levels) => {
780                            room_power_levels.user_can_invite(own_user_id)
781                                || room_power_levels
782                                    .user_can_kick_user(own_user_id, &member.state_key)
783                        }
784                        None => false,
785                    };
786
787                    // The current user can act on the knock changes, so they should be
788                    // displayed
789                    if can_accept_or_decline_knocks {
790                        return filter_break();
791                    }
792
793                    filter_continue()
794                }
795
796                // A member has joined one way or another.
797                MembershipChange::Joined
798                | MembershipChange::Invited
799                | MembershipChange::InvitationAccepted
800                | MembershipChange::KnockAccepted => {
801                    // This member _is_ the current user, not someone else! This is a valid state
802                    // event:
803                    //
804                    // - the user is joining a room (for the first time or again): we want a
805                    //   `LatestEventValue` to get a first value!
806                    // - the user is being invited: we want a `LatestEventValue` to represent the
807                    //   invitation!
808                    if member.state_key.deref() == own_user_id {
809                        filter_break()
810                    } else {
811                        filter_continue()
812                    }
813                }
814
815                _ => filter_continue(),
816            }
817        }
818
819        // `org.matrix.msc3672.beacon_info`
820        //
821        // Both the start (`is_live() == true`) and stop (`is_live() == false`)
822        // events are suitable, so that the item stays visible in the room
823        // summary even after the sharing session ends.
824        AnySyncStateEvent::BeaconInfo(SyncStateEvent::Original(_)) => filter_break(),
825
826        _ => filter_continue(),
827    }
828}
829
830#[cfg(test)]
831mod filter_tests {
832    use std::ops::Not;
833
834    use assert_matches::assert_matches;
835    use matrix_sdk_test::event_factory::{EventFactory, PreviousMembership};
836    use ruma::{
837        event_id,
838        events::{
839            room::{member::MembershipState, message::RoomMessageEventContent},
840            rtc::notification::NotificationType,
841        },
842        owned_event_id, owned_user_id, user_id,
843    };
844
845    use super::{ControlFlow, FilterContinue, filter_timeline_event};
846
847    macro_rules! assert_latest_event_content {
848        ( event | $event_factory:ident | $event_builder:block
849          is a candidate ) => {
850            assert_latest_event_content!(@_ | $event_factory | $event_builder, ControlFlow::Break(_));
851        };
852
853        ( event | $event_factory:ident | $event_builder:block
854          is not a candidate ) => {
855            assert_latest_event_content!(@_ | $event_factory | $event_builder, ControlFlow::Continue(_));
856        };
857
858        ( @_ | $event_factory:ident | $event_builder:block, $expect:pat) => {
859            let user_id = user_id!("@mnt_io:matrix.org");
860            let event_factory = EventFactory::new().sender(user_id);
861            let event = {
862                let $event_factory = event_factory;
863                $event_builder
864            };
865
866            assert_matches!(
867                filter_timeline_event(&event, None, user_id!("@mnt_io:matrix.org"), None),
868                $expect
869            );
870        };
871    }
872
873    #[test]
874    fn test_room_message() {
875        assert_latest_event_content!(
876            event | event_factory | { event_factory.text_msg("hello").into_event() }
877            is a candidate
878        );
879    }
880
881    #[test]
882    fn test_room_message_replacement() {
883        let user_id = user_id!("@mnt_io:matrix.org");
884        let event_factory = EventFactory::new().sender(user_id);
885        let event_id = event_id!("$ev0");
886        let event = event_factory
887            .text_msg("bonjour")
888            .edit(event_id, RoomMessageEventContent::text_plain("hello").into())
889            .into_event();
890
891        assert_matches!(filter_timeline_event(&event, None, user_id, None), ControlFlow::Continue(FilterContinue { current_value_must_be_erased, edited_event_id }) => {
892                    assert!(current_value_must_be_erased.not());
893                    assert_eq!(edited_event_id, Some(event_id.to_owned()));
894                }
895        );
896    }
897
898    #[test]
899    fn test_redaction() {
900        let user_id = user_id!("@mnt_io:matrix.org");
901        let event_factory = EventFactory::new().sender(user_id);
902        let event_id = event_id!("$ev0");
903        let event = event_factory.redaction(event_id).into_event();
904
905        // `current_value_event_id` is `None`, it cannot be used to decide whether the
906        // current value must be erased.
907        {
908            let current_value_event_id = None;
909
910            assert_matches!(
911                filter_timeline_event(&event, current_value_event_id, user_id, None),
912                ControlFlow::Continue(FilterContinue { current_value_must_be_erased, edited_event_id }) => {
913                    assert!(current_value_must_be_erased.not());
914                    assert!(edited_event_id.is_none());
915                }
916            );
917        }
918
919        // `current_value_event_id` is `Some(_)`, but the redaction event doesn't target
920        // this event ID.
921        {
922            let current_value_event_id = Some(owned_event_id!("$ev1"));
923
924            assert_matches!(
925                filter_timeline_event(&event, current_value_event_id.as_ref(), user_id, None),
926                ControlFlow::Continue(FilterContinue { current_value_must_be_erased, edited_event_id }) => {
927                    assert!(current_value_must_be_erased.not());
928                    assert!(edited_event_id.is_none());
929                }
930            );
931        }
932
933        // `current_value_event_id` is `Some(_)`, and the redaction event does target
934        // this event ID: great, the current value must be erased!
935        {
936            let current_value_event_id = Some(event_id.to_owned());
937
938            assert_matches!(
939                filter_timeline_event(&event, current_value_event_id.as_ref(), user_id, None),
940                ControlFlow::Continue(FilterContinue { current_value_must_be_erased, edited_event_id }) => {
941                    assert!(current_value_must_be_erased);
942                    assert!(edited_event_id.is_none());
943                }
944            );
945        }
946    }
947
948    #[test]
949    fn test_redacted() {
950        assert_latest_event_content!(
951            event | event_factory | {
952                event_factory
953                    .redacted(
954                        user_id!("@mnt_io:matrix.org"),
955                        ruma::events::room::message::RedactedRoomMessageEventContent::new(),
956                    )
957                    .event_id(event_id!("$ev0"))
958                    .into_event()
959            }
960            is not a candidate
961        );
962    }
963
964    #[test]
965    fn test_poll() {
966        assert_latest_event_content!(
967            event | event_factory | {
968                event_factory
969                    .poll_start("the people need to know", "comté > gruyère", vec!["yes", "oui"])
970                    .into_event()
971            }
972            is a candidate
973        );
974    }
975
976    #[test]
977    fn test_call_invite() {
978        assert_latest_event_content!(
979            event | event_factory | {
980                event_factory
981                    .call_invite(
982                        ruma::OwnedVoipId::from("vvooiipp".to_owned()),
983                        ruma::UInt::from(1234u32),
984                        ruma::events::call::SessionDescription::new(
985                            "type".to_owned(),
986                            "sdp".to_owned(),
987                        ),
988                        ruma::VoipVersionId::V1,
989                    )
990                    .into_event()
991            }
992            is a candidate
993        );
994    }
995
996    #[test]
997    fn test_rtc_notification() {
998        assert_latest_event_content!(
999            event | event_factory | {
1000                event_factory
1001                     .rtc_notification(
1002                        NotificationType::Ring,
1003                    )
1004                    .mentions(vec![owned_user_id!("@alice:server.name")])
1005                    .relates_to_membership_state_event(ruma::OwnedEventId::try_from("$abc:server.name").unwrap())
1006                    .lifetime(60)
1007                    .into_event()
1008            }
1009            is a candidate
1010        );
1011    }
1012
1013    #[test]
1014    fn test_sticker() {
1015        assert_latest_event_content!(
1016            event | event_factory | {
1017                event_factory
1018                    .sticker(
1019                        "wink wink",
1020                        ruma::events::room::ImageInfo::new(),
1021                        ruma::OwnedMxcUri::from("mxc://foo/bar"),
1022                    )
1023                    .into_event()
1024            }
1025            is a candidate
1026        );
1027    }
1028
1029    #[test]
1030    fn test_encrypted_room_message() {
1031        assert_latest_event_content!(
1032            event | event_factory | {
1033                event_factory
1034                    .event(ruma::events::room::encrypted::RoomEncryptedEventContent::new(
1035                        ruma::events::room::encrypted::EncryptedEventScheme::MegolmV1AesSha2(
1036                            ruma::events::room::encrypted::MegolmV1AesSha2ContentInit {
1037                                ciphertext: "cipher".to_owned(),
1038                                sender_key: "sender_key".to_owned(),
1039                                device_id: "device_id".into(),
1040                                session_id: "session_id".to_owned(),
1041                            }
1042                            .into(),
1043                        ),
1044                        None,
1045                    ))
1046                    .into_event()
1047            }
1048            is not a candidate
1049        );
1050    }
1051
1052    #[test]
1053    fn test_reaction() {
1054        // Take a random message-like event.
1055        assert_latest_event_content!(
1056            event | event_factory | { event_factory.reaction(event_id!("$ev0"), "+1").into_event() }
1057            is not a candidate
1058        );
1059    }
1060
1061    #[test]
1062    fn test_state_event() {
1063        assert_latest_event_content!(
1064            event | event_factory | { event_factory.room_topic("new room topic").into_event() }
1065            is not a candidate
1066        );
1067    }
1068
1069    #[test]
1070    fn test_knocked_without_power_levels() {
1071        assert_latest_event_content!(
1072            event | event_factory | {
1073                event_factory
1074                    .member(user_id!("@other_mnt_io:server.name"))
1075                    .membership(MembershipState::Knock)
1076                    .into_event()
1077            }
1078            is not a candidate
1079        );
1080    }
1081
1082    #[test]
1083    fn test_knocked_with_power_levels() {
1084        use ruma::{
1085            events::room::power_levels::{RoomPowerLevels, RoomPowerLevelsSource},
1086            room_version_rules::AuthorizationRules,
1087        };
1088
1089        let user_id = user_id!("@mnt_io:matrix.org");
1090        let other_user_id = user_id!("@other_mnt_io:server.name");
1091        let event_factory = EventFactory::new().sender(user_id);
1092        let event =
1093            event_factory.member(other_user_id).membership(MembershipState::Knock).into_event();
1094
1095        let mut room_power_levels =
1096            RoomPowerLevels::new(RoomPowerLevelsSource::None, &AuthorizationRules::V1, []);
1097        room_power_levels.users.insert(user_id.to_owned(), 5.into());
1098        room_power_levels.users.insert(other_user_id.to_owned(), 4.into());
1099
1100        // Cannot accept. Cannot decline.
1101        {
1102            room_power_levels.invite = 10.into();
1103            room_power_levels.kick = 10.into();
1104            assert!(
1105                filter_timeline_event(&event, None, user_id, Some(&room_power_levels))
1106                    .is_continue(),
1107                "cannot accept, cannot decline",
1108            );
1109        }
1110
1111        // Can accept. Cannot decline.
1112        {
1113            room_power_levels.invite = 0.into();
1114            room_power_levels.kick = 10.into();
1115            assert!(
1116                filter_timeline_event(&event, None, user_id, Some(&room_power_levels)).is_break(),
1117                "can accept, cannot decline",
1118            );
1119        }
1120
1121        // Cannot accept. Can decline.
1122        {
1123            room_power_levels.invite = 10.into();
1124            room_power_levels.kick = 0.into();
1125            assert!(
1126                filter_timeline_event(&event, None, user_id, Some(&room_power_levels)).is_break(),
1127                "cannot accept, can decline",
1128            );
1129        }
1130
1131        // Can accept. Can decline.
1132        {
1133            room_power_levels.invite = 0.into();
1134            room_power_levels.kick = 0.into();
1135            assert!(
1136                filter_timeline_event(&event, None, user_id, Some(&room_power_levels)).is_break(),
1137                "can accept, can decline",
1138            );
1139        }
1140
1141        // Cannot accept. Can decline. But with an other user ID with at least the same
1142        // levels, i.e. the current user cannot kick another user with the same
1143        // or higher levels.
1144        {
1145            room_power_levels.users.insert(user_id.to_owned(), 5.into());
1146            room_power_levels.users.insert(other_user_id.to_owned(), 5.into());
1147
1148            room_power_levels.invite = 10.into();
1149            room_power_levels.kick = 0.into();
1150
1151            assert!(
1152                filter_timeline_event(&event, None, user_id, Some(&room_power_levels))
1153                    .is_continue(),
1154                "cannot accept, can decline, at least same user levels",
1155            );
1156        }
1157    }
1158
1159    #[test]
1160    fn test_knock_accepted() {
1161        // The current user was knocking and now is invited.
1162        assert_latest_event_content!(
1163            event | event_factory | {
1164                event_factory
1165                    .member(user_id!("@mnt_io:matrix.org"))
1166                    .membership(MembershipState::Invite)
1167                    .previous(MembershipState::Knock)
1168                    .into_event()
1169            }
1170            is a candidate
1171        );
1172    }
1173
1174    #[test]
1175    fn test_knock_accepted_from_someone_else() {
1176        // The current user sees an accepted knock but for someone else.
1177        assert_latest_event_content!(
1178            event | event_factory | {
1179                event_factory
1180                    .member(user_id!("@other_mnt_io:server.name"))
1181                    .membership(MembershipState::Invite)
1182                    .previous(MembershipState::Knock)
1183                    .into_event()
1184            }
1185            is not a candidate
1186        );
1187    }
1188
1189    #[test]
1190    fn test_joined() {
1191        // The current user is joining the room.
1192        assert_latest_event_content!(
1193            event | event_factory | {
1194                event_factory
1195                    .member(user_id!("@mnt_io:matrix.org"))
1196                    .membership(MembershipState::Join)
1197                    .into_event()
1198            }
1199            is a candidate
1200        );
1201    }
1202
1203    #[test]
1204    fn test_joined_from_left() {
1205        // The current user was left and now joins again.
1206        assert_latest_event_content!(
1207            event | event_factory | {
1208                event_factory
1209                    .member(user_id!("@mnt_io:matrix.org"))
1210                    .membership(MembershipState::Join)
1211                    .previous(MembershipState::Leave)
1212                    .into_event()
1213            }
1214            is a candidate
1215        );
1216    }
1217
1218    #[test]
1219    fn test_joined_for_someone_else() {
1220        use ruma::events::room::member::MembershipState;
1221
1222        // The current user sees a join but for someone else.
1223        assert_latest_event_content!(
1224            event | event_factory | {
1225                event_factory
1226                    .member(user_id!("@other_mnt_io:server.name"))
1227                    .membership(MembershipState::Join)
1228                    .into_event()
1229            }
1230            is not a candidate
1231        );
1232    }
1233
1234    #[test]
1235    fn test_invited() {
1236        assert_latest_event_content!(
1237            event | event_factory | {
1238                event_factory
1239                    .member(user_id!("@mnt_io:matrix.org"))
1240                    .membership(MembershipState::Invite)
1241                    .into_event()
1242            }
1243            is a candidate
1244        );
1245    }
1246
1247    #[test]
1248    fn test_invited_from_left() {
1249        // The current user was left and now is invited again.
1250        assert_latest_event_content!(
1251            event | event_factory | {
1252                event_factory
1253                    .member(user_id!("@mnt_io:matrix.org"))
1254                    .membership(MembershipState::Invite)
1255                    .previous(MembershipState::Leave)
1256                    .into_event()
1257            }
1258            is a candidate
1259        );
1260    }
1261
1262    #[test]
1263    fn test_invited_for_someone_else() {
1264        // The current user sees an invite but for someone else.
1265        assert_latest_event_content!(
1266            event | event_factory | {
1267                event_factory
1268                    .member(user_id!("@other_mnt_io:server.name"))
1269                    .membership(MembershipState::Invite)
1270                    .into_event()
1271            }
1272            is not a candidate
1273        );
1274    }
1275
1276    #[test]
1277    fn test_invitation_accepted() {
1278        // The current user was invited and now is joined.
1279        assert_latest_event_content!(
1280            event | event_factory | {
1281                event_factory
1282                    .member(user_id!("@mnt_io:matrix.org"))
1283                    .membership(MembershipState::Join)
1284                    .previous(MembershipState::Invite)
1285                    .into_event()
1286            }
1287            is a candidate
1288        );
1289    }
1290
1291    #[test]
1292    fn test_invitation_accepted_from_someone_else() {
1293        // The current user sees an accepted invitation but for someone else.
1294        assert_latest_event_content!(
1295            event | event_factory | {
1296                event_factory
1297                    .member(user_id!("@other_mnt_io:server.name"))
1298                    .membership(MembershipState::Join)
1299                    .previous(MembershipState::Invite)
1300                    .into_event()
1301            }
1302            is not a candidate
1303        );
1304    }
1305
1306    #[test]
1307    fn test_profile_changed() {
1308        // The current user has a new display name.
1309        assert_latest_event_content!(
1310            event | event_factory | {
1311                event_factory
1312                    .member(user_id!("@mnt_io:matrix.org"))
1313                    .membership(MembershipState::Join)
1314                    .previous(PreviousMembership::new(MembershipState::Join).display_name("Coucou"))
1315                    .into_event()
1316            }
1317            is not a candidate
1318        );
1319    }
1320
1321    #[test]
1322    fn test_beacon_info_live() {
1323        use std::time::Duration;
1324
1325        // A live beacon_info (start of location sharing) is a candidate.
1326        assert_latest_event_content!(
1327            event | event_factory | {
1328                event_factory
1329                    .beacon_info(None, Duration::from_secs(60), true, None)
1330                    .state_key(user_id!("@mnt_io:matrix.org"))
1331                    .into_event()
1332            }
1333            is a candidate
1334        );
1335    }
1336
1337    #[test]
1338    fn test_beacon_info_stopped() {
1339        use std::time::Duration;
1340
1341        // A non-live beacon_info (stop event) is still a candidate — the item
1342        // should remain visible in the room summary after the session ends.
1343        assert_latest_event_content!(
1344            event | event_factory | {
1345                event_factory
1346                    .beacon_info(None, Duration::from_secs(60), false, None)
1347                    .state_key(user_id!("@mnt_io:matrix.org"))
1348                    .into_event()
1349            }
1350            is a candidate
1351        );
1352    }
1353
1354    #[test]
1355    fn test_room_message_verification_request() {
1356        use ruma::{OwnedDeviceId, events::room::message};
1357
1358        assert_latest_event_content!(
1359            event | event_factory | {
1360                event_factory
1361                    .event(RoomMessageEventContent::new(message::MessageType::VerificationRequest(
1362                        message::KeyVerificationRequestEventContent::new(
1363                            "body".to_owned(),
1364                            vec![],
1365                            OwnedDeviceId::from("device_id"),
1366                            owned_user_id!("@user:server.name"),
1367                        ),
1368                    )))
1369                    .into_event()
1370            }
1371            is not a candidate
1372        );
1373    }
1374}
1375
1376#[cfg(test)]
1377mod buffer_of_values_for_local_event_tests {
1378    use assert_matches::assert_matches;
1379    use ruma::{
1380        OwnedTransactionId,
1381        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
1382        owned_event_id,
1383        serde::Raw,
1384    };
1385    use serde_json::json;
1386
1387    use super::{
1388        super::{super::local_room_message, RemoteLatestEventValue},
1389        BufferOfValuesForLocalEvents, LatestEventValue, LocalLatestEventValue,
1390    };
1391
1392    fn remote_room_message(body: &str) -> RemoteLatestEventValue {
1393        RemoteLatestEventValue::from_plaintext(
1394            Raw::from_json_string(
1395                json!({
1396                    "content": RoomMessageEventContent::text_plain(body),
1397                    "type": "m.room.message",
1398                    "event_id": "$ev0",
1399                    "origin_server_ts": 42,
1400                    "sender": "@mnt_io:matrix.org",
1401                })
1402                .to_string(),
1403            )
1404            .unwrap(),
1405        )
1406    }
1407
1408    #[test]
1409    fn test_last() {
1410        let mut buffer = BufferOfValuesForLocalEvents::new();
1411
1412        assert!(buffer.last().is_none());
1413
1414        let transaction_id = OwnedTransactionId::from("txnid");
1415        buffer.push(
1416            transaction_id.clone(),
1417            LatestEventValue::LocalIsSending(local_room_message("tome")),
1418        );
1419
1420        assert_matches!(
1421            buffer.last(),
1422            Some((expected_transaction_id, LatestEventValue::LocalIsSending(_))) => {
1423                assert_eq!(expected_transaction_id, &transaction_id);
1424            }
1425        );
1426    }
1427
1428    #[test]
1429    fn test_position() {
1430        let mut buffer = BufferOfValuesForLocalEvents::new();
1431        let transaction_id = OwnedTransactionId::from("txnid");
1432
1433        assert!(buffer.position(&transaction_id).is_none());
1434
1435        buffer.push(
1436            transaction_id.clone(),
1437            LatestEventValue::LocalIsSending(local_room_message("raclette")),
1438        );
1439        buffer.push(
1440            OwnedTransactionId::from("othertxnid"),
1441            LatestEventValue::LocalIsSending(local_room_message("tome")),
1442        );
1443
1444        assert_eq!(buffer.position(&transaction_id), Some(0));
1445    }
1446
1447    #[test]
1448    #[should_panic]
1449    fn test_push_none() {
1450        let mut buffer = BufferOfValuesForLocalEvents::new();
1451
1452        buffer.push(OwnedTransactionId::from("txnid"), LatestEventValue::None);
1453    }
1454
1455    #[test]
1456    #[should_panic]
1457    fn test_push_remote() {
1458        let mut buffer = BufferOfValuesForLocalEvents::new();
1459
1460        buffer.push(
1461            OwnedTransactionId::from("txnid"),
1462            LatestEventValue::Remote(remote_room_message("tome")),
1463        );
1464    }
1465
1466    #[test]
1467    fn test_push_local() {
1468        let mut buffer = BufferOfValuesForLocalEvents::new();
1469
1470        buffer.push(
1471            OwnedTransactionId::from("txnid0"),
1472            LatestEventValue::LocalIsSending(local_room_message("tome")),
1473        );
1474        buffer.push(
1475            OwnedTransactionId::from("txnid1"),
1476            LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1477        );
1478        buffer.push(
1479            OwnedTransactionId::from("txnid1"),
1480            LatestEventValue::LocalHasBeenSent {
1481                event_id: owned_event_id!("$ev0"),
1482                value: local_room_message("raclette"),
1483            },
1484        );
1485
1486        // no panic.
1487    }
1488
1489    #[test]
1490    #[should_panic]
1491    fn test_replace_content_on_remote() {
1492        let mut buffer = BufferOfValuesForLocalEvents::new();
1493
1494        buffer.push(
1495            OwnedTransactionId::from("txnid"),
1496            LatestEventValue::Remote(remote_room_message("gruyère")),
1497        );
1498
1499        let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1500
1501        buffer.replace_content(0, new_content);
1502    }
1503
1504    #[test]
1505    #[should_panic]
1506    fn test_replace_content_on_local_has_been_sent() {
1507        let mut buffer = BufferOfValuesForLocalEvents::new();
1508
1509        buffer.push(
1510            OwnedTransactionId::from("txnid"),
1511            LatestEventValue::LocalHasBeenSent {
1512                event_id: owned_event_id!("$ev0"),
1513                value: local_room_message("gruyère"),
1514            },
1515        );
1516
1517        let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1518
1519        buffer.replace_content(0, new_content);
1520    }
1521
1522    #[test]
1523    fn test_replace_content_on_local_is_sending() {
1524        let mut buffer = BufferOfValuesForLocalEvents::new();
1525
1526        let transaction_id = OwnedTransactionId::from("txnid0");
1527        buffer.push(
1528            transaction_id.clone(),
1529            LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1530        );
1531
1532        let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1533
1534        buffer.replace_content(0, new_content);
1535
1536        assert_matches!(
1537            buffer.last(),
1538            Some((expected_transaction_id, LatestEventValue::LocalIsSending(local_event))) => {
1539                assert_eq!(expected_transaction_id, &transaction_id);
1540                assert_matches!(
1541                    local_event.content.deserialize().unwrap(),
1542                    AnyMessageLikeEventContent::RoomMessage(content) => {
1543                        assert_eq!(content.body(), "comté");
1544                    }
1545                );
1546            }
1547        );
1548    }
1549
1550    #[test]
1551    fn test_replace_content_on_local_cannot_be_sent() {
1552        let mut buffer = BufferOfValuesForLocalEvents::new();
1553
1554        let transaction_id = OwnedTransactionId::from("txnid0");
1555        buffer.push(
1556            transaction_id.clone(),
1557            LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1558        );
1559
1560        let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1561
1562        buffer.replace_content(0, new_content);
1563
1564        assert_matches!(
1565            buffer.last(),
1566            Some((expected_transaction_id, LatestEventValue::LocalCannotBeSent(local_event))) => {
1567                assert_eq!(expected_transaction_id, &transaction_id);
1568                assert_matches!(
1569                    local_event.content.deserialize().unwrap(),
1570                    AnyMessageLikeEventContent::RoomMessage(content) => {
1571                        assert_eq!(content.body(), "comté");
1572                    }
1573                );
1574            }
1575        );
1576    }
1577
1578    #[test]
1579    fn test_remove() {
1580        let mut buffer = BufferOfValuesForLocalEvents::new();
1581
1582        buffer.push(
1583            OwnedTransactionId::from("txnid"),
1584            LatestEventValue::LocalIsSending(local_room_message("gryuère")),
1585        );
1586
1587        assert!(buffer.last().is_some());
1588
1589        buffer.remove(0);
1590
1591        assert!(buffer.last().is_none());
1592    }
1593
1594    #[test]
1595    fn test_mark_cannot_be_sent_from() {
1596        let mut buffer = BufferOfValuesForLocalEvents::new();
1597        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1598        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1599        let transaction_id_2 = OwnedTransactionId::from("txnid2");
1600
1601        buffer.push(
1602            transaction_id_0,
1603            LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1604        );
1605        buffer.push(
1606            transaction_id_1.clone(),
1607            LatestEventValue::LocalIsSending(local_room_message("brigand")),
1608        );
1609        buffer.push(
1610            transaction_id_2,
1611            LatestEventValue::LocalIsSending(local_room_message("raclette")),
1612        );
1613
1614        buffer.mark_cannot_be_sent_from(&transaction_id_1);
1615
1616        assert_eq!(buffer.buffer.len(), 3);
1617        assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
1618        assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1619        assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalCannotBeSent(_));
1620    }
1621
1622    #[test]
1623    fn test_mark_is_sending_from() {
1624        let mut buffer = BufferOfValuesForLocalEvents::new();
1625        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1626        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1627        let transaction_id_2 = OwnedTransactionId::from("txnid2");
1628
1629        buffer.push(
1630            transaction_id_0,
1631            LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1632        );
1633        buffer.push(
1634            transaction_id_1.clone(),
1635            LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
1636        );
1637        buffer.push(
1638            transaction_id_2,
1639            LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1640        );
1641
1642        buffer.mark_is_sending_from(&transaction_id_1);
1643
1644        assert_eq!(buffer.buffer.len(), 3);
1645        assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1646        assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
1647        assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
1648    }
1649
1650    #[test]
1651    fn test_mark_is_sending_after() {
1652        let mut buffer = BufferOfValuesForLocalEvents::new();
1653        let transaction_id_0 = OwnedTransactionId::from("txnid0");
1654        let transaction_id_1 = OwnedTransactionId::from("txnid1");
1655        let transaction_id_2 = OwnedTransactionId::from("txnid2");
1656
1657        buffer.push(
1658            transaction_id_0,
1659            LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1660        );
1661        buffer.push(
1662            transaction_id_1.clone(),
1663            LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
1664        );
1665        buffer.push(
1666            transaction_id_2,
1667            LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1668        );
1669
1670        buffer.mark_is_sending_after(&transaction_id_1);
1671
1672        assert_eq!(buffer.buffer.len(), 3);
1673        assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1674        assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1675        assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
1676    }
1677}
1678
1679#[cfg(all(not(target_family = "wasm"), test))]
1680mod builder_tests {
1681    use std::sync::Arc;
1682
1683    use assert_matches::assert_matches;
1684    use matrix_sdk_base::{
1685        RoomState,
1686        deserialized_responses::TimelineEventKind,
1687        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1688        store::{ChildTransactionId, SerializableEventContent},
1689    };
1690    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1691    use ruma::{
1692        EventId, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedTransactionId, event_id,
1693        events::{
1694            AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
1695            SyncMessageLikeEvent, reaction::ReactionEventContent, relation::Annotation,
1696            room::message::RoomMessageEventContent,
1697        },
1698        owned_event_id, owned_room_id, room_id,
1699        serde::Raw,
1700        user_id,
1701    };
1702    use serde_json::json;
1703
1704    use super::{
1705        super::RemoteLatestEventValue, BufferOfValuesForLocalEvents, Builder, LatestEventValue,
1706        RoomEventCache, RoomSendQueueUpdate,
1707    };
1708    use crate::{
1709        Client, Error,
1710        send_queue::{
1711            AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle,
1712            SendReactionHandle, SendRedactionHandle,
1713        },
1714        test_utils::mocks::MatrixMockServer,
1715    };
1716
1717    macro_rules! assert_remote_value_matches_room_message_with_body {
1718        ( $latest_event_value:expr => with body = $body:expr ) => {{
1719            let latest_event_value = $latest_event_value;
1720
1721            assert_matches!(
1722                latest_event_value,
1723                Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event: ref event }, .. })) => {
1724                    assert_matches!(
1725                        event.deserialize().unwrap(),
1726                        AnySyncTimelineEvent::MessageLike(
1727                            AnySyncMessageLikeEvent::RoomMessage(
1728                                SyncMessageLikeEvent::Original(message_content)
1729                            )
1730                        ) => {
1731                            assert_eq!(message_content.content.body(), $body);
1732                        }
1733                    );
1734
1735                    latest_event_value.unwrap()
1736                }
1737            )
1738        }};
1739    }
1740
1741    macro_rules! assert_local_value_matches_room_message_with_body {
1742        ( $latest_event_value:expr, $pattern:path => with body = $body:expr ) => {{
1743            let latest_event_value = $latest_event_value;
1744
1745            assert_matches!(
1746                latest_event_value,
1747                Some( $pattern (ref local_event)) => {
1748                    assert_matches!(
1749                        local_event.content.deserialize().unwrap(),
1750                        AnyMessageLikeEventContent::RoomMessage(message_content) => {
1751                            assert_eq!(message_content.body(), $body);
1752                        }
1753                    );
1754
1755                    latest_event_value.unwrap()
1756                }
1757            )
1758        }};
1759
1760        ( $latest_event_value:expr, $pattern:path {
1761            $local_value:ident with body = $body:expr
1762            $( , $field:ident => $more:block )*
1763        } ) => {{
1764            let latest_event_value = $latest_event_value;
1765
1766            assert_matches!(
1767                latest_event_value,
1768                Some( $pattern { ref $local_value, $( ref $field, )* .. }) => {
1769                    assert_matches!(
1770                        $local_value .content.deserialize().unwrap(),
1771                        AnyMessageLikeEventContent::RoomMessage(message_content) => {
1772                            assert_eq!(message_content.body(), $body);
1773
1774                            $({
1775                                let $field = $field;
1776                                $more
1777                            })*
1778                        }
1779                    );
1780
1781                    latest_event_value.unwrap()
1782                }
1783            )
1784        }};
1785    }
1786
1787    fn remote_room_message(event_id: &EventId, body: &str) -> RemoteLatestEventValue {
1788        RemoteLatestEventValue::from_plaintext(
1789            Raw::from_json_string(
1790                json!({
1791                    "content": RoomMessageEventContent::text_plain(body),
1792                    "type": "m.room.message",
1793                    "event_id": event_id,
1794                    "origin_server_ts": 42,
1795                    "sender": "@mnt_io:matrix.org",
1796                })
1797                .to_string(),
1798            )
1799            .unwrap(),
1800        )
1801    }
1802
1803    #[async_test]
1804    async fn test_remote_is_scanning_event_backwards_from_event_cache() {
1805        let room_id = room_id!("!r0");
1806        let user_id = user_id!("@mnt_io:matrix.org");
1807        let event_factory = EventFactory::new().sender(user_id).room(room_id);
1808        let event_id_0 = event_id!("$ev0");
1809        let event_id_1 = event_id!("$ev1");
1810        let event_id_2 = event_id!("$ev2");
1811
1812        let server = MatrixMockServer::new().await;
1813        let client = server.client_builder().build().await;
1814
1815        // Prelude.
1816        {
1817            // Create the room.
1818            client.base_client().get_or_create_room(room_id, RoomState::Joined);
1819
1820            // Initialise the event cache store.
1821            client
1822                .event_cache_store()
1823                .lock()
1824                .await
1825                .expect("Could not acquire the event cache lock")
1826                .as_clean()
1827                .expect("Could not acquire a clean event cache lock")
1828                .handle_linked_chunk_updates(
1829                    LinkedChunkId::Room(room_id),
1830                    vec![
1831                        Update::NewItemsChunk {
1832                            previous: None,
1833                            new: ChunkIdentifier::new(0),
1834                            next: None,
1835                        },
1836                        Update::PushItems {
1837                            at: Position::new(ChunkIdentifier::new(0), 0),
1838                            items: vec![
1839                                // a latest event candidate
1840                                event_factory.text_msg("hello").event_id(event_id_0).into(),
1841                                // a latest event candidate
1842                                event_factory.text_msg("world").event_id(event_id_1).into(),
1843                                // not a latest event candidate
1844                                event_factory
1845                                    .room_topic("new room topic")
1846                                    .event_id(event_id_2)
1847                                    .into(),
1848                            ],
1849                        },
1850                    ],
1851                )
1852                .await
1853                .unwrap();
1854        }
1855
1856        let event_cache = client.event_cache();
1857        event_cache.subscribe().unwrap();
1858
1859        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1860
1861        assert_remote_value_matches_room_message_with_body!(
1862            // We get `event_id_1` because `event_id_2` isn't a candidate,
1863            // and `event_id_0` hasn't been read yet (because events are read
1864            // backwards).
1865            Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "world"
1866        );
1867    }
1868
1869    #[async_test]
1870    async fn test_remote_without_a_candidate() {
1871        let room_id = room_id!("!r0");
1872
1873        let server = MatrixMockServer::new().await;
1874        let client = server.client_builder().build().await;
1875        let user_id = client.user_id().unwrap();
1876        let event_factory = EventFactory::new().sender(user_id).room(room_id);
1877
1878        let room = client.base_client().get_or_create_room(room_id, RoomState::Joined);
1879
1880        // Insert an non-suitable candidate.
1881        client
1882            .event_cache_store()
1883            .lock()
1884            .await
1885            .expect("Could not acquire the event cache lock")
1886            .as_clean()
1887            .expect("Could not acquire a clean event cache lock")
1888            .handle_linked_chunk_updates(
1889                LinkedChunkId::Room(room_id),
1890                vec![
1891                    Update::NewItemsChunk {
1892                        previous: None,
1893                        new: ChunkIdentifier::new(0),
1894                        next: None,
1895                    },
1896                    Update::PushItems {
1897                        at: Position::new(ChunkIdentifier::new(0), 0),
1898                        items: vec![event_factory.room_topic("new room topic").into()],
1899                    },
1900                ],
1901            )
1902            .await
1903            .unwrap();
1904
1905        let event_cache = client.event_cache();
1906        event_cache.subscribe().unwrap();
1907
1908        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1909
1910        // Check initial state.
1911        let current_value = {
1912            let value = room.latest_event();
1913
1914            assert_matches!(value, LatestEventValue::None);
1915
1916            value
1917        };
1918
1919        // Compute a new remote value: not able to find a relevant candidate.
1920        //
1921        // No candidate is found, so it's just `None` here.
1922        assert_matches!(
1923            Builder::new_remote(&room_event_cache, current_value, user_id, None,).await,
1924            None
1925        );
1926    }
1927
1928    #[async_test]
1929    async fn test_remote_with_a_candidate() {
1930        let room_id = room_id!("!r0");
1931
1932        let server = MatrixMockServer::new().await;
1933        let client = server.client_builder().build().await;
1934        let user_id = client.user_id().unwrap();
1935        let event_factory = EventFactory::new().sender(user_id).room(room_id);
1936
1937        let room = client.base_client().get_or_create_room(room_id, RoomState::Joined);
1938
1939        // Insert a suitable candidate.
1940        client
1941            .event_cache_store()
1942            .lock()
1943            .await
1944            .expect("Could not acquire the event cache lock")
1945            .as_clean()
1946            .expect("Could not acquire a clean event cache lock")
1947            .handle_linked_chunk_updates(
1948                LinkedChunkId::Room(room_id),
1949                vec![
1950                    Update::NewItemsChunk {
1951                        previous: None,
1952                        new: ChunkIdentifier::new(0),
1953                        next: None,
1954                    },
1955                    Update::PushItems {
1956                        at: Position::new(ChunkIdentifier::new(0), 0),
1957                        items: vec![
1958                            event_factory.text_msg("hello").event_id(event_id!("$ev0")).into(),
1959                        ],
1960                    },
1961                ],
1962            )
1963            .await
1964            .unwrap();
1965
1966        let event_cache = client.event_cache();
1967        event_cache.subscribe().unwrap();
1968
1969        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1970
1971        // Check initial state.
1972        let current_value = {
1973            let value = room.latest_event();
1974
1975            assert_matches!(value, LatestEventValue::None);
1976
1977            value
1978        };
1979
1980        // Compute a new remote value: will be able to find a relevant
1981        // candidate.
1982        //
1983        // A candidate is found, so it's a `Some(LatestEventValue::Remote)`
1984        // that is returned! Let's check the event.
1985        assert_remote_value_matches_room_message_with_body!(
1986            Builder::new_remote(&room_event_cache, current_value, user_id, None).await => with body = "hello"
1987        );
1988    }
1989
1990    #[async_test]
1991    async fn test_remote_without_a_candidate_but_with_an_existing_latest_event_value() {
1992        let room_id = room_id!("!r0");
1993
1994        let server = MatrixMockServer::new().await;
1995        let client = server.client_builder().build().await;
1996        let user_id = client.user_id().unwrap();
1997        let event_factory = EventFactory::new().sender(user_id).room(room_id);
1998
1999        client.base_client().get_or_create_room(room_id, RoomState::Joined);
2000
2001        // Insert a non-suitable candidate.
2002        client
2003            .event_cache_store()
2004            .lock()
2005            .await
2006            .expect("Could not acquire the event cache lock")
2007            .as_clean()
2008            .expect("Could not acquire a clean event cache lock")
2009            .handle_linked_chunk_updates(
2010                LinkedChunkId::Room(room_id),
2011                vec![
2012                    Update::NewItemsChunk {
2013                        previous: None,
2014                        new: ChunkIdentifier::new(1),
2015                        next: None,
2016                    },
2017                    Update::PushItems {
2018                        at: Position::new(ChunkIdentifier::new(1), 0),
2019                        items: vec![event_factory.room_topic("new room topic").into()],
2020                    },
2021                ],
2022            )
2023            .await
2024            .unwrap();
2025
2026        let event_cache = client.event_cache();
2027        event_cache.subscribe().unwrap();
2028
2029        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2030
2031        // Initial state.
2032        let current_value =
2033            LatestEventValue::Remote(remote_room_message(event_id!("$ev0"), "hello"));
2034
2035        // Compute a new remote value: with no candidate.
2036        //
2037        // No candidate is found, so it's just a `None` here.
2038        assert_matches!(
2039            Builder::new_remote(&room_event_cache, current_value, user_id, None).await,
2040            None
2041        );
2042    }
2043
2044    #[async_test]
2045    async fn test_remote_without_a_candidate_but_with_an_erasable_existing_latest_event_value() {
2046        let room_id = room_id!("!r0");
2047        let event_id = event_id!("$ev0");
2048
2049        let server = MatrixMockServer::new().await;
2050        let client = server.client_builder().build().await;
2051        let user_id = client.user_id().unwrap();
2052        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2053
2054        client.base_client().get_or_create_room(room_id, RoomState::Joined);
2055
2056        // Insert a non-suitable candidate.
2057        client
2058            .event_cache_store()
2059            .lock()
2060            .await
2061            .expect("Could not acquire the event cache lock")
2062            .as_clean()
2063            .expect("Could not acquire a clean event cache lock")
2064            .handle_linked_chunk_updates(
2065                LinkedChunkId::Room(room_id),
2066                vec![
2067                    Update::NewItemsChunk {
2068                        previous: None,
2069                        new: ChunkIdentifier::new(1),
2070                        next: None,
2071                    },
2072                    Update::PushItems {
2073                        at: Position::new(ChunkIdentifier::new(1), 0),
2074                        items: vec![event_factory.redaction(event_id).into()],
2075                    },
2076                ],
2077            )
2078            .await
2079            .unwrap();
2080
2081        let event_cache = client.event_cache();
2082        event_cache.subscribe().unwrap();
2083
2084        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2085
2086        // Initial state.
2087        let current_value = LatestEventValue::Remote(remote_room_message(event_id, "hello"));
2088
2089        // Compute a new remote value: with no candidate, but it's a `m.room.redaction`
2090        // that erases `current_value`!
2091        //
2092        // No candidate is found, so it SHOULD BE a `None`, but since we found a
2093        // `m.room.redaction` targeting our `current_value`, it MUST BE a
2094        // `Some(LatestEventValue::None)` to erase it.
2095        assert_matches!(
2096            Builder::new_remote(&room_event_cache, current_value, user_id, None).await,
2097            Some(LatestEventValue::None)
2098        );
2099    }
2100
2101    #[async_test]
2102    async fn test_remote_with_a_candidate_and_an_erasable_existing_latest_event_value() {
2103        let room_id = room_id!("!r0");
2104        let event_id = event_id!("$ev0");
2105
2106        let server = MatrixMockServer::new().await;
2107        let client = server.client_builder().build().await;
2108        let user_id = client.user_id().unwrap();
2109        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2110
2111        client.base_client().get_or_create_room(room_id, RoomState::Joined);
2112
2113        // Insert a non-suitable candidate.
2114        client
2115            .event_cache_store()
2116            .lock()
2117            .await
2118            .expect("Could not acquire the event cache lock")
2119            .as_clean()
2120            .expect("Could not acquire a clean event cache lock")
2121            .handle_linked_chunk_updates(
2122                LinkedChunkId::Room(room_id),
2123                vec![
2124                    Update::NewItemsChunk {
2125                        previous: None,
2126                        new: ChunkIdentifier::new(1),
2127                        next: None,
2128                    },
2129                    Update::PushItems {
2130                        at: Position::new(ChunkIdentifier::new(1), 0),
2131                        items: vec![
2132                            event_factory.redaction(event_id).into(),
2133                            event_factory.text_msg("world").into(),
2134                        ],
2135                    },
2136                ],
2137            )
2138            .await
2139            .unwrap();
2140
2141        let event_cache = client.event_cache();
2142        event_cache.subscribe().unwrap();
2143
2144        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2145
2146        // Initial state.
2147        let current_value = LatestEventValue::Remote(remote_room_message(event_id, "hello"));
2148
2149        // Compute a new remote value: with a candidate, and a `m.room.redaction`
2150        // that erases `current_value`!
2151        //
2152        // A candidate is found, so it MUST BE a
2153        // `Some(LatestEventValue::Remote(_))`, whatever the `current_value`
2154        // is.
2155        assert_remote_value_matches_room_message_with_body!(
2156            Builder::new_remote(&room_event_cache, current_value, user_id, None).await => with body = "world"
2157        );
2158    }
2159
2160    #[async_test]
2161    async fn test_remote_when_room_has_been_emptied() {
2162        let room_id = room_id!("!r0");
2163
2164        let server = MatrixMockServer::new().await;
2165        let client = server.client_builder().build().await;
2166        let user_id = client.user_id().unwrap();
2167        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2168
2169        let room = client.base_client().get_or_create_room(room_id, RoomState::Joined);
2170
2171        // Insert a suitable candidate.
2172        client
2173            .event_cache_store()
2174            .lock()
2175            .await
2176            .expect("Could not acquire the event cache lock")
2177            .as_clean()
2178            .expect("Could not acquire a clean event cache lock")
2179            .handle_linked_chunk_updates(
2180                LinkedChunkId::Room(room_id),
2181                vec![
2182                    Update::NewItemsChunk {
2183                        previous: None,
2184                        new: ChunkIdentifier::new(0),
2185                        next: None,
2186                    },
2187                    Update::PushItems {
2188                        at: Position::new(ChunkIdentifier::new(0), 0),
2189                        items: vec![
2190                            event_factory.text_msg("hello").event_id(event_id!("$ev0")).into(),
2191                        ],
2192                    },
2193                ],
2194            )
2195            .await
2196            .unwrap();
2197
2198        let event_cache = client.event_cache();
2199        event_cache.subscribe().unwrap();
2200
2201        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2202
2203        // Check initial state.
2204        let current_value = {
2205            let value = room.latest_event();
2206
2207            assert_matches!(value, LatestEventValue::None);
2208
2209            value
2210        };
2211
2212        // Compute a new remote value: will be able to find a relevant
2213        // candidate.
2214        let current_value = assert_remote_value_matches_room_message_with_body!(
2215            Builder::new_remote(&room_event_cache, current_value, user_id, None).await => with body = "hello"
2216        );
2217
2218        // Now, let's clear all rooms.
2219        event_cache.clear_all_rooms().await.unwrap();
2220
2221        // The latest event has been erased!
2222        assert_matches!(
2223            Builder::new_remote(&room_event_cache, current_value, user_id, None).await,
2224            Some(LatestEventValue::None)
2225        );
2226    }
2227
2228    #[async_test]
2229    async fn test_remote_edit() {
2230        let room_id = room_id!("!r0");
2231        let user_id = user_id!("@mnt_io:matrix.org");
2232        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2233        let event_id_0 = event_id!("$ev0");
2234        let event_id_1 = event_id!("$ev1");
2235
2236        let server = MatrixMockServer::new().await;
2237        let client = server.client_builder().build().await;
2238
2239        // Prelude.
2240        {
2241            // Create the room.
2242            client.base_client().get_or_create_room(room_id, RoomState::Joined);
2243
2244            // Initialise the event cache store.
2245            client
2246                .event_cache_store()
2247                .lock()
2248                .await
2249                .expect("Could not acquire the event cache lock")
2250                .as_clean()
2251                .expect("Could not acquire a clean event cache lock")
2252                .handle_linked_chunk_updates(
2253                    LinkedChunkId::Room(room_id),
2254                    vec![
2255                        Update::NewItemsChunk {
2256                            previous: None,
2257                            new: ChunkIdentifier::new(0),
2258                            next: None,
2259                        },
2260                        Update::PushItems {
2261                            at: Position::new(ChunkIdentifier::new(0), 0),
2262                            items: vec![
2263                                // a text message
2264                                event_factory.text_msg("hello").event_id(event_id_0).into(),
2265                                // a replacement of the previous message
2266                                event_factory
2267                                    .text_msg("* goodbye")
2268                                    .event_id(event_id_1)
2269                                    .edit(
2270                                        event_id_0,
2271                                        RoomMessageEventContent::text_plain("goodbye").into(),
2272                                    )
2273                                    .into(),
2274                            ],
2275                        },
2276                    ],
2277                )
2278                .await
2279                .unwrap();
2280        }
2281
2282        let event_cache = client.event_cache();
2283        event_cache.subscribe().unwrap();
2284
2285        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2286
2287        assert_remote_value_matches_room_message_with_body!(
2288            // We get `event_id_1` because it edits `event_id_0` which is a candidate.
2289            Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "* goodbye"
2290        );
2291    }
2292
2293    #[async_test]
2294    async fn test_remote_edit_invalid_edit() {
2295        let room_id = room_id!("!r0");
2296        let user_id = user_id!("@mnt_io:matrix.org");
2297        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2298        let event_id_0 = event_id!("$ev0");
2299        let event_id_1 = event_id!("$ev1");
2300
2301        let server = MatrixMockServer::new().await;
2302        let client = server.client_builder().build().await;
2303
2304        // Prelude.
2305        {
2306            // Create the room.
2307            client.base_client().get_or_create_room(room_id, RoomState::Joined);
2308
2309            // Initialise the event cache store.
2310            client
2311                .event_cache_store()
2312                .lock()
2313                .await
2314                .expect("Could not acquire the event cache lock")
2315                .as_clean()
2316                .expect("Could not acquire a clean event cache lock")
2317                .handle_linked_chunk_updates(
2318                    LinkedChunkId::Room(room_id),
2319                    vec![
2320                        Update::NewItemsChunk {
2321                            previous: None,
2322                            new: ChunkIdentifier::new(0),
2323                            next: None,
2324                        },
2325                        Update::PushItems {
2326                            at: Position::new(ChunkIdentifier::new(0), 0),
2327                            items: vec![
2328                                // a text message
2329                                event_factory
2330                                    .text_msg("hello")
2331                                    .sender(user_id!("@alice:example.org"))
2332                                    .event_id(event_id_0)
2333                                    .into(),
2334                                // a replacement of the previous message
2335                                event_factory
2336                                    .text_msg("* goodbye")
2337                                    .event_id(event_id_1)
2338                                    .sender(user_id!("@malory:example.org"))
2339                                    .edit(
2340                                        event_id_0,
2341                                        RoomMessageEventContent::text_plain("goodbye").into(),
2342                                    )
2343                                    .into(),
2344                            ],
2345                        },
2346                    ],
2347                )
2348                .await
2349                .unwrap();
2350        }
2351
2352        let event_cache = client.event_cache();
2353        event_cache.subscribe().unwrap();
2354
2355        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2356
2357        assert_remote_value_matches_room_message_with_body!(
2358            // We get `event_id_0` because the edit `event_id_1` is invalid.
2359            Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "hello"
2360        );
2361    }
2362
2363    #[async_test]
2364    async fn test_remote_double_edit() {
2365        let room_id = room_id!("!r0");
2366        let user_id = user_id!("@mnt_io:matrix.org");
2367        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2368        let event_id_0 = event_id!("$ev0");
2369        let event_id_1 = event_id!("$ev1");
2370        let event_id_2 = event_id!("$ev2");
2371
2372        let server = MatrixMockServer::new().await;
2373        let client = server.client_builder().build().await;
2374
2375        // Prelude.
2376        {
2377            // Create the room.
2378            client.base_client().get_or_create_room(room_id, RoomState::Joined);
2379
2380            // Initialise the event cache store.
2381            client
2382                .event_cache_store()
2383                .lock()
2384                .await
2385                .expect("Could not acquire the event cache lock")
2386                .as_clean()
2387                .expect("Could not acquire a clean event cache lock")
2388                .handle_linked_chunk_updates(
2389                    LinkedChunkId::Room(room_id),
2390                    vec![
2391                        Update::NewItemsChunk {
2392                            previous: None,
2393                            new: ChunkIdentifier::new(0),
2394                            next: None,
2395                        },
2396                        Update::PushItems {
2397                            at: Position::new(ChunkIdentifier::new(0), 0),
2398                            items: vec![
2399                                // a text message
2400                                event_factory.text_msg("hello").event_id(event_id_0).into(),
2401                                // a replacement of the previous message
2402                                event_factory
2403                                    .text_msg("* goodbye")
2404                                    .event_id(event_id_1)
2405                                    .edit(
2406                                        event_id_0,
2407                                        RoomMessageEventContent::text_plain("goodbye").into(),
2408                                    )
2409                                    .into(),
2410                                // another replacement of the first message
2411                                event_factory
2412                                    .text_msg("* err, hello")
2413                                    .event_id(event_id_2)
2414                                    .edit(
2415                                        event_id_0,
2416                                        RoomMessageEventContent::text_plain("err, hello").into(),
2417                                    )
2418                                    .into(),
2419                            ],
2420                        },
2421                    ],
2422                )
2423                .await
2424                .unwrap();
2425        }
2426
2427        let event_cache = client.event_cache();
2428        event_cache.subscribe().unwrap();
2429
2430        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2431
2432        assert_remote_value_matches_room_message_with_body!(
2433            // We get `event_id_1` because `event_id_2` isn't a candidate,
2434            // and `event_id_0` hasn't been read yet (because events are read
2435            // backwards).
2436            Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "* err, hello"
2437        );
2438    }
2439
2440    #[async_test]
2441    async fn test_remote_latest_edit_targets_older_event() {
2442        let room_id = room_id!("!r0");
2443        let user_id = user_id!("@mnt_io:matrix.org");
2444        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2445        let event_id_0 = event_id!("$ev0");
2446        let event_id_1 = event_id!("$ev1");
2447        let event_id_2 = event_id!("$ev2");
2448        let event_id_3 = event_id!("$ev3");
2449        let event_id_4 = event_id!("$ev4");
2450
2451        let server = MatrixMockServer::new().await;
2452        let client = server.client_builder().build().await;
2453
2454        // Prelude.
2455        {
2456            // Create the room.
2457            client.base_client().get_or_create_room(room_id, RoomState::Joined);
2458
2459            // Initialise the event cache store.
2460            client
2461                .event_cache_store()
2462                .lock()
2463                .await
2464                .expect("Could not acquire the event cache lock")
2465                .as_clean()
2466                .expect("Could not acquire a clean event cache lock")
2467                .handle_linked_chunk_updates(
2468                    LinkedChunkId::Room(room_id),
2469                    vec![
2470                        Update::NewItemsChunk {
2471                            previous: None,
2472                            new: ChunkIdentifier::new(0),
2473                            next: None,
2474                        },
2475                        Update::PushItems {
2476                            at: Position::new(ChunkIdentifier::new(0), 0),
2477                            items: vec![
2478                                // a text message
2479                                event_factory.text_msg("A").event_id(event_id_0).into(),
2480                                // another text message
2481                                event_factory.text_msg("B").event_id(event_id_1).into(),
2482                                // and a third text message
2483                                event_factory.text_msg("C").event_id(event_id_2).into(),
2484                                // a replacement of the last message
2485                                event_factory
2486                                    .text_msg("* D")
2487                                    .event_id(event_id_3)
2488                                    .edit(
2489                                        event_id_2,
2490                                        RoomMessageEventContent::text_plain("D").into(),
2491                                    )
2492                                    .into(),
2493                                // another replacement but this time for the first message
2494                                event_factory
2495                                    .text_msg("* X")
2496                                    .event_id(event_id_4)
2497                                    .edit(
2498                                        event_id_0,
2499                                        RoomMessageEventContent::text_plain("X").into(),
2500                                    )
2501                                    .into(),
2502                            ],
2503                        },
2504                    ],
2505                )
2506                .await
2507                .unwrap();
2508        }
2509
2510        let event_cache = client.event_cache();
2511        event_cache.subscribe().unwrap();
2512
2513        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2514
2515        assert_remote_value_matches_room_message_with_body!(
2516            // We get `event_id_3` because `event_id_4` edits an older event.
2517            Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "* D"
2518        );
2519    }
2520
2521    #[async_test]
2522    async fn test_remote_latest_edit_preceded_by_unrelated_event() {
2523        let room_id = room_id!("!r0");
2524        let user_id = user_id!("@mnt_io:matrix.org");
2525        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2526        let event_id_0 = event_id!("$ev0");
2527        let event_id_1 = event_id!("$ev1");
2528        let event_id_2 = event_id!("$ev2");
2529
2530        let server = MatrixMockServer::new().await;
2531        let client = server.client_builder().build().await;
2532
2533        // Prelude.
2534        {
2535            // Create the room.
2536            client.base_client().get_or_create_room(room_id, RoomState::Joined);
2537
2538            // Initialise the event cache store.
2539            client
2540                .event_cache_store()
2541                .lock()
2542                .await
2543                .expect("Could not acquire the event cache lock")
2544                .as_clean()
2545                .expect("Could not acquire a clean event cache lock")
2546                .handle_linked_chunk_updates(
2547                    LinkedChunkId::Room(room_id),
2548                    vec![
2549                        Update::NewItemsChunk {
2550                            previous: None,
2551                            new: ChunkIdentifier::new(0),
2552                            next: None,
2553                        },
2554                        Update::PushItems {
2555                            at: Position::new(ChunkIdentifier::new(0), 0),
2556                            items: vec![
2557                                // a text message
2558                                event_factory.text_msg("A").event_id(event_id_0).into(),
2559                                // a replacement of another event
2560                                event_factory
2561                                    .text_msg("* B")
2562                                    .event_id(event_id_1)
2563                                    .edit(
2564                                        event_id_2,
2565                                        RoomMessageEventContent::text_plain("B").into(),
2566                                    )
2567                                    .into(),
2568                            ],
2569                        },
2570                    ],
2571                )
2572                .await
2573                .unwrap();
2574        }
2575
2576        let event_cache = client.event_cache();
2577        event_cache.subscribe().unwrap();
2578
2579        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2580
2581        assert_remote_value_matches_room_message_with_body!(
2582            // We get `event_id_0` because `event_id_1` edits an event that is unknown.
2583            Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "A"
2584        );
2585    }
2586
2587    #[async_test]
2588    async fn test_remote_latest_edit_without_previous_events() {
2589        let room_id = room_id!("!r0");
2590        let user_id = user_id!("@mnt_io:matrix.org");
2591        let event_factory = EventFactory::new().sender(user_id).room(room_id);
2592        let event_id_0 = event_id!("$ev0");
2593        let event_id_1 = event_id!("$ev1");
2594
2595        let server = MatrixMockServer::new().await;
2596        let client = server.client_builder().build().await;
2597
2598        // Prelude.
2599        {
2600            // Create the room.
2601            client.base_client().get_or_create_room(room_id, RoomState::Joined);
2602
2603            // Initialise the event cache store.
2604            client
2605                .event_cache_store()
2606                .lock()
2607                .await
2608                .expect("Could not acquire the event cache lock")
2609                .as_clean()
2610                .expect("Could not acquire a clean event cache lock")
2611                .handle_linked_chunk_updates(
2612                    LinkedChunkId::Room(room_id),
2613                    vec![
2614                        Update::NewItemsChunk {
2615                            previous: None,
2616                            new: ChunkIdentifier::new(0),
2617                            next: None,
2618                        },
2619                        Update::PushItems {
2620                            at: Position::new(ChunkIdentifier::new(0), 0),
2621                            items: vec![
2622                                // a replacement of an unknown event
2623                                event_factory
2624                                    .text_msg("* B")
2625                                    .event_id(event_id_0)
2626                                    .edit(
2627                                        event_id_1,
2628                                        RoomMessageEventContent::text_plain("B").into(),
2629                                    )
2630                                    .into(),
2631                            ],
2632                        },
2633                    ],
2634                )
2635                .await
2636                .unwrap();
2637        }
2638
2639        let event_cache = client.event_cache();
2640        event_cache.subscribe().unwrap();
2641
2642        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2643
2644        // We get no latest event value because no candidate event is known.
2645        assert!(
2646            Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None)
2647                .await
2648                .is_none()
2649        );
2650    }
2651
2652    async fn local_prelude() -> (Client, OwnedRoomId, RoomSendQueue, RoomEventCache) {
2653        let room_id = owned_room_id!("!r0");
2654
2655        let server = MatrixMockServer::new().await;
2656        let client = server.client_builder().build().await;
2657        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
2658        let room = client.get_room(&room_id).unwrap();
2659        let user_id = client.user_id().unwrap();
2660        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
2661
2662        // Insert a non-suitable candidate.
2663        client
2664            .event_cache_store()
2665            .lock()
2666            .await
2667            .expect("Could not acquire the event cache lock")
2668            .as_clean()
2669            .expect("Could not acquire a clean event cache lock")
2670            .handle_linked_chunk_updates(
2671                LinkedChunkId::Room(&room_id),
2672                vec![
2673                    Update::NewItemsChunk {
2674                        previous: None,
2675                        new: ChunkIdentifier::new(0),
2676                        next: None,
2677                    },
2678                    Update::PushItems {
2679                        at: Position::new(ChunkIdentifier::new(0), 0),
2680                        items: vec![event_factory.room_topic("new room topic").into()],
2681                    },
2682                ],
2683            )
2684            .await
2685            .unwrap();
2686
2687        let event_cache = client.event_cache();
2688        event_cache.subscribe().unwrap();
2689
2690        let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
2691
2692        let send_queue = client.send_queue();
2693        let room_send_queue = send_queue.for_room(room);
2694
2695        (client, room_id, room_send_queue, room_event_cache)
2696    }
2697
2698    fn new_local_echo_content(
2699        room_send_queue: &RoomSendQueue,
2700        transaction_id: &OwnedTransactionId,
2701        body: &str,
2702    ) -> LocalEchoContent {
2703        LocalEchoContent::Event {
2704            serialized_event: SerializableEventContent::new(
2705                &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
2706            )
2707            .unwrap(),
2708            send_handle: SendHandle::new(
2709                room_send_queue.clone(),
2710                transaction_id.clone(),
2711                MilliSecondsSinceUnixEpoch::now(),
2712            ),
2713            send_error: None,
2714        }
2715    }
2716
2717    #[async_test]
2718    async fn test_local_new_local_event_with_content_of_kind_event() {
2719        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2720        let user_id = client.user_id().unwrap();
2721
2722        let mut buffer = BufferOfValuesForLocalEvents::new();
2723
2724        // Receiving one `NewLocalEvent`.
2725        let previous_value = {
2726            let transaction_id = OwnedTransactionId::from("txnid0");
2727            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2728
2729            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
2730
2731            // The `LatestEventValue` matches the new local event.
2732            assert_local_value_matches_room_message_with_body!(
2733                Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
2734                LatestEventValue::LocalIsSending => with body = "A"
2735            )
2736        };
2737
2738        // Receiving another `NewLocalEvent`, ensuring it's pushed back in the buffer.
2739        {
2740            let transaction_id = OwnedTransactionId::from("txnid1");
2741            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
2742
2743            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
2744
2745            // The `LatestEventValue` matches the new local event.
2746            assert_local_value_matches_room_message_with_body!(
2747                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
2748                LatestEventValue::LocalIsSending => with body = "B"
2749            );
2750        }
2751
2752        assert_eq!(buffer.buffer.len(), 2);
2753    }
2754
2755    #[async_test]
2756    async fn test_local_new_local_event_with_content_of_kind_event_when_previous_local_event_cannot_be_sent()
2757     {
2758        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2759        let user_id = client.user_id().unwrap();
2760
2761        let mut buffer = BufferOfValuesForLocalEvents::new();
2762
2763        // Receiving one `NewLocalEvent`.
2764        let (transaction_id_0, previous_value) = {
2765            let transaction_id = OwnedTransactionId::from("txnid0");
2766            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2767
2768            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2769                transaction_id: transaction_id.clone(),
2770                content,
2771            });
2772
2773            // The `LatestEventValue` matches the new local event.
2774            let value = assert_local_value_matches_room_message_with_body!(
2775                Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
2776                LatestEventValue::LocalIsSending => with body = "A"
2777            );
2778
2779            (transaction_id, value)
2780        };
2781
2782        // Receiving a `SendError` targeting the first event. The
2783        // `LatestEventValue` must change to indicate it “cannot be sent”, because the
2784        // error is unrecoverable.
2785        let previous_value = {
2786            let update = RoomSendQueueUpdate::SendError {
2787                transaction_id: transaction_id_0.clone(),
2788                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2789                is_recoverable: false,
2790            };
2791
2792            // The `LatestEventValue` has changed, it still matches the latest local
2793            // event but it's marked as “cannot be sent”.
2794            let value = assert_local_value_matches_room_message_with_body!(
2795                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
2796                LatestEventValue::LocalCannotBeSent => with body = "A"
2797            );
2798
2799            assert_eq!(buffer.buffer.len(), 1);
2800            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2801
2802            value
2803        };
2804
2805        // Receiving another `NewLocalEvent`, ensuring it's pushed back in the buffer,
2806        // and as a `LocalCannotBeSent` because the previous value is itself
2807        // `LocalCannotBeSent`.
2808        {
2809            let transaction_id = OwnedTransactionId::from("txnid1");
2810            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
2811
2812            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
2813
2814            // The `LatestEventValue` matches the new local event.
2815            assert_local_value_matches_room_message_with_body!(
2816                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
2817                LatestEventValue::LocalCannotBeSent => with body = "B"
2818            );
2819        }
2820
2821        assert_eq!(buffer.buffer.len(), 2);
2822        assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2823        assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2824    }
2825
2826    #[async_test]
2827    async fn test_local_new_local_event_with_content_of_kind_react() {
2828        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2829        let user_id = client.user_id().unwrap();
2830
2831        let mut buffer = BufferOfValuesForLocalEvents::new();
2832
2833        // Receiving one `NewLocalEvent` with content of kind event.
2834        let (transaction_id_0, previous_value) = {
2835            let transaction_id = OwnedTransactionId::from("txnid0");
2836            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2837
2838            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2839                transaction_id: transaction_id.clone(),
2840                content,
2841            });
2842
2843            // The `LatestEventValue` matches the new local event.
2844            let value = assert_local_value_matches_room_message_with_body!(
2845                Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
2846                LatestEventValue::LocalIsSending => with body = "A"
2847            );
2848
2849            (transaction_id, value)
2850        };
2851
2852        // Receiving one `NewLocalEvent` with content of kind react! This time, it is
2853        // ignored.
2854        {
2855            let transaction_id = OwnedTransactionId::from("txnid1");
2856            let content = LocalEchoContent::React {
2857                key: "<< 1".to_owned(),
2858                send_handle: SendReactionHandle::new(
2859                    room_send_queue.clone(),
2860                    ChildTransactionId::new(),
2861                ),
2862                applies_to: transaction_id_0,
2863            };
2864
2865            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
2866
2867            // The `LatestEventValue` matches the previous local event.
2868            assert_matches!(
2869                Builder::new_local(
2870                    &update,
2871                    &mut buffer,
2872                    &room_event_cache,
2873                    previous_value,
2874                    user_id,
2875                    None
2876                )
2877                .await,
2878                None
2879            )
2880        }
2881
2882        assert_eq!(buffer.buffer.len(), 1);
2883    }
2884
2885    #[async_test]
2886    async fn test_local_cancelled_local_event() {
2887        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2888        let user_id = client.user_id().unwrap();
2889
2890        let mut buffer = BufferOfValuesForLocalEvents::new();
2891        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2892        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2893        let transaction_id_2 = OwnedTransactionId::from("txnid2");
2894
2895        // Receiving three `NewLocalEvent`s.
2896        let previous_value = {
2897            let mut value = None;
2898
2899            for (transaction_id, body) in
2900                [(&transaction_id_0, "A"), (&transaction_id_1, "B"), (&transaction_id_2, "C")]
2901            {
2902                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2903
2904                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2905                    transaction_id: transaction_id.clone(),
2906                    content,
2907                });
2908
2909                // The `LatestEventValue` matches the new local event.
2910                value = Some(assert_local_value_matches_room_message_with_body!(
2911                    Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
2912                    LatestEventValue::LocalIsSending => with body = body
2913                ));
2914            }
2915
2916            assert_eq!(buffer.buffer.len(), 3);
2917
2918            value.unwrap()
2919        };
2920
2921        // Receiving a `CancelledLocalEvent` targeting the second event. The
2922        // `LatestEventValue` must not change.
2923        let previous_value = {
2924            let update = RoomSendQueueUpdate::CancelledLocalEvent {
2925                transaction_id: transaction_id_1.clone(),
2926            };
2927
2928            // The `LatestEventValue` hasn't changed, it still matches the latest local
2929            // event.
2930            let value = assert_local_value_matches_room_message_with_body!(
2931                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
2932                LatestEventValue::LocalIsSending => with body = "C"
2933            );
2934
2935            assert_eq!(buffer.buffer.len(), 2);
2936
2937            value
2938        };
2939
2940        // Receiving a `CancelledLocalEvent` targeting the second (so the last) event.
2941        // The `LatestEventValue` must point to the first local event.
2942        let previous_value = {
2943            let update = RoomSendQueueUpdate::CancelledLocalEvent {
2944                transaction_id: transaction_id_2.clone(),
2945            };
2946
2947            // The `LatestEventValue` has changed, it matches the previous (so the first)
2948            // local event.
2949            let value = assert_local_value_matches_room_message_with_body!(
2950                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
2951                LatestEventValue::LocalIsSending => with body = "A"
2952            );
2953
2954            assert_eq!(buffer.buffer.len(), 1);
2955
2956            value
2957        };
2958
2959        // Receiving a `CancelledLocalEvent` targeting the first (so the last) event.
2960        // The `LatestEventValue` cannot be computed from the send queue and will
2961        // fallback to the event cache. The event cache is empty in this case, so we get
2962        // nothing.
2963        {
2964            let update =
2965                RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: transaction_id_0 };
2966
2967            // The `LatestEventValue` has changed, it's empty!
2968            assert_matches!(
2969                Builder::new_local(
2970                    &update,
2971                    &mut buffer,
2972                    &room_event_cache,
2973                    previous_value,
2974                    user_id,
2975                    None
2976                )
2977                .await,
2978                Some(LatestEventValue::None)
2979            );
2980
2981            assert!(buffer.buffer.is_empty());
2982        }
2983    }
2984
2985    #[async_test]
2986    async fn test_local_sent_event() {
2987        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2988        let user_id = client.user_id().unwrap();
2989
2990        let mut buffer = BufferOfValuesForLocalEvents::new();
2991        let transaction_id_0 = OwnedTransactionId::from("txnid0");
2992        let transaction_id_1 = OwnedTransactionId::from("txnid1");
2993
2994        // Receiving two `NewLocalEvent`s.
2995        let previous_value = {
2996            let mut value = None;
2997
2998            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2999                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
3000
3001                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3002                    transaction_id: transaction_id.clone(),
3003                    content,
3004                });
3005
3006                // The `LatestEventValue` matches the new local event.
3007                value = Some(assert_local_value_matches_room_message_with_body!(
3008                    Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
3009                    LatestEventValue::LocalIsSending => with body = body
3010                ));
3011            }
3012
3013            assert_eq!(buffer.buffer.len(), 2);
3014
3015            value.unwrap()
3016        };
3017
3018        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
3019        // must not change.
3020        let previous_value = {
3021            let update = RoomSendQueueUpdate::SentEvent {
3022                transaction_id: transaction_id_0.clone(),
3023                event_id: owned_event_id!("$ev0"),
3024            };
3025
3026            // The `LatestEventValue` hasn't changed, it still matches the latest local
3027            // event.
3028            let value = assert_local_value_matches_room_message_with_body!(
3029                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3030                LatestEventValue::LocalIsSending => with body = "B"
3031            );
3032
3033            assert_eq!(buffer.buffer.len(), 1);
3034
3035            value
3036        };
3037
3038        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
3039        // hasn't changed, this is still this event, but the status has changed to
3040        // `LocalHasBeenSent`.
3041        {
3042            let expected_event_id = event_id!("$ev1");
3043            let update = RoomSendQueueUpdate::SentEvent {
3044                transaction_id: transaction_id_1,
3045                event_id: expected_event_id.to_owned(),
3046            };
3047
3048            // The `LatestEventValue` hasn't changed.
3049            assert_local_value_matches_room_message_with_body!(
3050                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3051                LatestEventValue::LocalHasBeenSent {
3052                    value with body = "B",
3053                    event_id => {
3054                        assert_eq!(event_id, expected_event_id);
3055                    }
3056                }
3057            );
3058
3059            assert!(buffer.buffer.is_empty());
3060        }
3061    }
3062
3063    #[async_test]
3064    async fn test_local_replaced_local_event() {
3065        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3066        let user_id = client.user_id().unwrap();
3067
3068        let mut buffer = BufferOfValuesForLocalEvents::new();
3069        let transaction_id_0 = OwnedTransactionId::from("txnid0");
3070        let transaction_id_1 = OwnedTransactionId::from("txnid1");
3071
3072        // Receiving two `NewLocalEvent`s.
3073        let previous_value = {
3074            let mut value = None;
3075
3076            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
3077                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
3078
3079                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3080                    transaction_id: transaction_id.clone(),
3081                    content,
3082                });
3083
3084                // The `LatestEventValue` matches the new local event.
3085                value = Some(assert_local_value_matches_room_message_with_body!(
3086                    Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
3087                    LatestEventValue::LocalIsSending => with body = body
3088                ));
3089            }
3090
3091            assert_eq!(buffer.buffer.len(), 2);
3092
3093            value.unwrap()
3094        };
3095
3096        // Receiving a `ReplacedLocalEvent` targeting the first event. The
3097        // `LatestEventValue` must not change.
3098        let previous_value = {
3099            let transaction_id = &transaction_id_0;
3100            let LocalEchoContent::Event { serialized_event: new_content, .. } =
3101                new_local_echo_content(&room_send_queue, transaction_id, "A.")
3102            else {
3103                panic!("oopsy");
3104            };
3105
3106            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
3107                transaction_id: transaction_id.clone(),
3108                new_content,
3109            };
3110
3111            // The `LatestEventValue` hasn't changed, it still matches the latest local
3112            // event.
3113            let value = assert_local_value_matches_room_message_with_body!(
3114                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3115                LatestEventValue::LocalIsSending => with body = "B"
3116            );
3117
3118            assert_eq!(buffer.buffer.len(), 2);
3119
3120            value
3121        };
3122
3123        // Receiving a `ReplacedLocalEvent` targeting the second (so the last) event.
3124        // The `LatestEventValue` is changing.
3125        {
3126            let transaction_id = &transaction_id_1;
3127            let LocalEchoContent::Event { serialized_event: new_content, .. } =
3128                new_local_echo_content(&room_send_queue, transaction_id, "B.")
3129            else {
3130                panic!("oopsy");
3131            };
3132
3133            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
3134                transaction_id: transaction_id.clone(),
3135                new_content,
3136            };
3137
3138            // The `LatestEventValue` has changed, it still matches the latest local
3139            // event but with its new content.
3140            assert_local_value_matches_room_message_with_body!(
3141                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3142                LatestEventValue::LocalIsSending => with body = "B."
3143            );
3144
3145            assert_eq!(buffer.buffer.len(), 2);
3146        }
3147    }
3148
3149    #[async_test]
3150    async fn test_local_replaced_local_event_by_a_non_suitable_event() {
3151        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3152        let user_id = client.user_id().unwrap();
3153
3154        let mut buffer = BufferOfValuesForLocalEvents::new();
3155        let transaction_id = OwnedTransactionId::from("txnid0");
3156
3157        // Receiving one `NewLocalEvent`.
3158        let previous_value = {
3159            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
3160
3161            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3162                transaction_id: transaction_id.clone(),
3163                content,
3164            });
3165
3166            // The `LatestEventValue` matches the new local event.
3167            let value = assert_local_value_matches_room_message_with_body!(
3168                Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
3169                LatestEventValue::LocalIsSending => with body = "A"
3170            );
3171
3172            assert_eq!(buffer.buffer.len(), 1);
3173
3174            value
3175        };
3176
3177        // Receiving a `ReplacedLocalEvent` targeting the first event. Sadly, the new
3178        // event cannot be mapped to a `LatestEventValue`! The first event is removed
3179        // from the buffer, and the `LatestEventValue` becomes `None` because there is
3180        // no other alternative.
3181        {
3182            let new_content = SerializableEventContent::new(&AnyMessageLikeEventContent::Reaction(
3183                ReactionEventContent::new(Annotation::new(
3184                    owned_event_id!("$ev0"),
3185                    "+1".to_owned(),
3186                )),
3187            ))
3188            .unwrap();
3189
3190            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
3191                transaction_id: transaction_id.clone(),
3192                new_content,
3193            };
3194
3195            // The `LatestEventValue` has changed!
3196            assert_matches!(
3197                Builder::new_local(
3198                    &update,
3199                    &mut buffer,
3200                    &room_event_cache,
3201                    previous_value,
3202                    user_id,
3203                    None
3204                )
3205                .await,
3206                None
3207            );
3208
3209            assert_eq!(buffer.buffer.len(), 0);
3210        }
3211    }
3212
3213    #[async_test]
3214    async fn test_local_replaced_local_event_twice() {
3215        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3216        let user_id = client.user_id().unwrap();
3217
3218        let mut buffer = BufferOfValuesForLocalEvents::new();
3219        let transaction_id = OwnedTransactionId::from("txnid0");
3220
3221        // Receiving one `NewLocalEvent`.
3222        let previous_value = {
3223            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
3224
3225            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3226                transaction_id: transaction_id.clone(),
3227                content,
3228            });
3229
3230            // The `LatestEventValue` matches the new local event.
3231            let value = assert_local_value_matches_room_message_with_body!(
3232                Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
3233                LatestEventValue::LocalIsSending => with body = "A"
3234            );
3235
3236            assert_eq!(buffer.buffer.len(), 1);
3237
3238            value
3239        };
3240
3241        // Receiving a `ReplacedLocalEvent` targeting the event.
3242        // The `LatestEventValue` is changing.
3243        {
3244            let LocalEchoContent::Event { serialized_event: new_content, .. } =
3245                new_local_echo_content(&room_send_queue, &transaction_id, "B")
3246            else {
3247                panic!("oopsy");
3248            };
3249
3250            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
3251                transaction_id: transaction_id.clone(),
3252                new_content,
3253            };
3254
3255            // The `LatestEventValue` has changed, it still matches the latest local
3256            // event but with its new content.
3257            assert_local_value_matches_room_message_with_body!(
3258                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value.clone(), user_id, None).await,
3259                LatestEventValue::LocalIsSending => with body = "B"
3260            );
3261
3262            assert_eq!(buffer.buffer.len(), 1);
3263        }
3264
3265        // Receiving another `ReplacedLocalEvent` targeting the event.
3266        // The `LatestEventValue` is changing again.
3267        {
3268            let LocalEchoContent::Event { serialized_event: new_content, .. } =
3269                new_local_echo_content(&room_send_queue, &transaction_id, "C")
3270            else {
3271                panic!("oopsy");
3272            };
3273
3274            let update = RoomSendQueueUpdate::ReplacedLocalEvent {
3275                transaction_id: transaction_id.clone(),
3276                new_content,
3277            };
3278
3279            // The `LatestEventValue` has changed, it still matches the latest local
3280            // event but with its new content.
3281            assert_local_value_matches_room_message_with_body!(
3282                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3283                LatestEventValue::LocalIsSending => with body = "C"
3284            );
3285
3286            assert_eq!(buffer.buffer.len(), 1);
3287        }
3288    }
3289
3290    #[async_test]
3291    async fn test_local_send_unrecoverable_error() {
3292        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3293        let user_id = client.user_id().unwrap();
3294
3295        let mut buffer = BufferOfValuesForLocalEvents::new();
3296        let transaction_id_0 = OwnedTransactionId::from("txnid0");
3297        let transaction_id_1 = OwnedTransactionId::from("txnid1");
3298
3299        // Receiving two `NewLocalEvent`s.
3300        let previous_value = {
3301            let mut value = None;
3302
3303            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
3304                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
3305
3306                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3307                    transaction_id: transaction_id.clone(),
3308                    content,
3309                });
3310
3311                // The `LatestEventValue` matches the new local event.
3312                value = Some(assert_local_value_matches_room_message_with_body!(
3313                    Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
3314                    LatestEventValue::LocalIsSending => with body = body
3315                ));
3316            }
3317
3318            assert_eq!(buffer.buffer.len(), 2);
3319
3320            value.unwrap()
3321        };
3322
3323        // Receiving a `SendError` targeting the first event. The
3324        // `LatestEventValue` must change to indicate it “cannot be sent”, because the
3325        // error is unrecoverable.
3326        let previous_value = {
3327            let update = RoomSendQueueUpdate::SendError {
3328                transaction_id: transaction_id_0.clone(),
3329                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
3330                is_recoverable: false,
3331            };
3332
3333            // The `LatestEventValue` has changed, it still matches the latest local
3334            // event but it's marked as “cannot be sent”.
3335            let value = assert_local_value_matches_room_message_with_body!(
3336                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3337                LatestEventValue::LocalCannotBeSent => with body = "B"
3338            );
3339
3340            assert_eq!(buffer.buffer.len(), 2);
3341            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
3342            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
3343
3344            value
3345        };
3346
3347        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
3348        // must change: since an event has been sent, the following events are now
3349        // “is sending”.
3350        {
3351            let update = RoomSendQueueUpdate::SentEvent {
3352                transaction_id: transaction_id_0.clone(),
3353                event_id: owned_event_id!("$ev0"),
3354            };
3355
3356            // The `LatestEventValue` has changed, it still matches the latest local
3357            // event but it's “is sending”.
3358            assert_local_value_matches_room_message_with_body!(
3359                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3360                LatestEventValue::LocalIsSending => with body = "B"
3361            );
3362
3363            assert_eq!(buffer.buffer.len(), 1);
3364            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
3365        }
3366    }
3367
3368    #[async_test]
3369    async fn test_local_send_recoverable_error() {
3370        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3371        let user_id = client.user_id().unwrap();
3372
3373        let mut buffer = BufferOfValuesForLocalEvents::new();
3374        let transaction_id_0 = OwnedTransactionId::from("txnid0");
3375        let transaction_id_1 = OwnedTransactionId::from("txnid1");
3376
3377        // Receiving two `NewLocalEvent`s.
3378        let previous_value = {
3379            let mut value = None;
3380
3381            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
3382                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
3383
3384                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3385                    transaction_id: transaction_id.clone(),
3386                    content,
3387                });
3388
3389                // The `LatestEventValue` matches the new local event.
3390                value = Some(assert_local_value_matches_room_message_with_body!(
3391                    Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
3392                    LatestEventValue::LocalIsSending => with body = body
3393                ));
3394            }
3395
3396            assert_eq!(buffer.buffer.len(), 2);
3397
3398            value.unwrap()
3399        };
3400
3401        // Receiving a `SendError` targeting the first event. The
3402        // `LatestEventValue` doesn't change, because the sending error is recoverable
3403        // (and thus will be retried soon, or as soon as network comes back).
3404        let previous_value = {
3405            let update = RoomSendQueueUpdate::SendError {
3406                transaction_id: transaction_id_0.clone(),
3407                error: Arc::new(Error::UnknownError("no more network".to_owned().into())),
3408                is_recoverable: true,
3409            };
3410
3411            // The `LatestEventValue` still matches the latest local event and should still
3412            // be marked as a local being sent.
3413            let value = assert_local_value_matches_room_message_with_body!(
3414                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3415                LatestEventValue::LocalIsSending => with body = "B"
3416            );
3417
3418            assert_eq!(buffer.buffer.len(), 2);
3419            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
3420            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
3421
3422            value
3423        };
3424
3425        // Receiving a `SentEvent` targeting the first event. The `LatestEventValue`
3426        // must change: since an event has been sent, the following events are now
3427        // “is sending”.
3428        {
3429            let update = RoomSendQueueUpdate::SentEvent {
3430                transaction_id: transaction_id_0.clone(),
3431                event_id: owned_event_id!("$ev0"),
3432            };
3433
3434            // The `LatestEventValue` has changed, it still matches the latest local
3435            // event but it's “is sending”.
3436            assert_local_value_matches_room_message_with_body!(
3437                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3438                LatestEventValue::LocalIsSending => with body = "B"
3439            );
3440
3441            assert_eq!(buffer.buffer.len(), 1);
3442            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
3443        }
3444    }
3445
3446    #[async_test]
3447    async fn test_local_retry_event() {
3448        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3449        let user_id = client.user_id().unwrap();
3450
3451        let mut buffer = BufferOfValuesForLocalEvents::new();
3452        let transaction_id_0 = OwnedTransactionId::from("txnid0");
3453        let transaction_id_1 = OwnedTransactionId::from("txnid1");
3454
3455        // Receiving two `NewLocalEvent`s.
3456        let previous_value = {
3457            let mut value = None;
3458
3459            for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
3460                let content = new_local_echo_content(&room_send_queue, transaction_id, body);
3461
3462                let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3463                    transaction_id: transaction_id.clone(),
3464                    content,
3465                });
3466
3467                // The `LatestEventValue` matches the new local event.
3468                value = Some(assert_local_value_matches_room_message_with_body!(
3469                    Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
3470                    LatestEventValue::LocalIsSending => with body = body
3471                ));
3472            }
3473
3474            assert_eq!(buffer.buffer.len(), 2);
3475
3476            value.unwrap()
3477        };
3478
3479        // Receiving a `SendError` targeting the first event. The
3480        // `LatestEventValue` must change to indicate it “cannot be sent”, because the
3481        // error is unrecoverable.
3482        let previous_value = {
3483            let update = RoomSendQueueUpdate::SendError {
3484                transaction_id: transaction_id_0.clone(),
3485                error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
3486                is_recoverable: false,
3487            };
3488
3489            // The `LatestEventValue` has changed, it still matches the latest local
3490            // event but it's marked as “cannot be sent”.
3491            let value = assert_local_value_matches_room_message_with_body!(
3492                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3493                LatestEventValue::LocalCannotBeSent => with body = "B"
3494            );
3495
3496            assert_eq!(buffer.buffer.len(), 2);
3497            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
3498            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
3499
3500            value
3501        };
3502
3503        // Receiving a `RetryEvent` targeting the first event. The `LatestEventValue`
3504        // must change: this local event and its following must be “is sending”.
3505        {
3506            let update =
3507                RoomSendQueueUpdate::RetryEvent { transaction_id: transaction_id_0.clone() };
3508
3509            // The `LatestEventValue` has changed, it still matches the latest local
3510            // event but it's “is sending”.
3511            assert_local_value_matches_room_message_with_body!(
3512                Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3513                LatestEventValue::LocalIsSending => with body = "B"
3514            );
3515
3516            assert_eq!(buffer.buffer.len(), 2);
3517            assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
3518            assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
3519        }
3520    }
3521
3522    #[async_test]
3523    async fn test_local_media_upload() {
3524        let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3525        let user_id = client.user_id().unwrap();
3526
3527        let mut buffer = BufferOfValuesForLocalEvents::new();
3528        let transaction_id = OwnedTransactionId::from("txnid");
3529
3530        // Receiving a `NewLocalEvent`.
3531        let previous_value = {
3532            let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
3533
3534            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3535                transaction_id: transaction_id.clone(),
3536                content,
3537            });
3538
3539            // The `LatestEventValue` matches the new local event.
3540            let value = assert_local_value_matches_room_message_with_body!(
3541                Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
3542                LatestEventValue::LocalIsSending => with body = "A"
3543            );
3544
3545            assert_eq!(buffer.buffer.len(), 1);
3546
3547            value
3548        };
3549
3550        // Receiving a `MediaUpload` targeting the first event. The
3551        // `LatestEventValue` must not change as `MediaUpload` are ignored.
3552        {
3553            let update = RoomSendQueueUpdate::MediaUpload {
3554                related_to: transaction_id,
3555                file: None,
3556                index: 0,
3557                progress: AbstractProgress { current: 0, total: 0 },
3558            };
3559
3560            // The `LatestEventValue` has changed somehow, it tells no new
3561            // `LatestEventValue` is computed.
3562            assert_matches!(
3563                Builder::new_local(
3564                    &update,
3565                    &mut buffer,
3566                    &room_event_cache,
3567                    previous_value,
3568                    user_id,
3569                    None
3570                )
3571                .await,
3572                None
3573            );
3574
3575            assert_eq!(buffer.buffer.len(), 1);
3576        }
3577    }
3578
3579    #[async_test]
3580    async fn test_local_fallbacks_to_remote_when_empty() {
3581        let room_id = room_id!("!r0");
3582        let user_id = user_id!("@mnt_io:matrix.org");
3583        let event_factory = EventFactory::new().sender(user_id).room(room_id);
3584        let event_id_0 = event_id!("$ev0");
3585        let event_id_1 = event_id!("$ev1");
3586
3587        let server = MatrixMockServer::new().await;
3588        let client = server.client_builder().build().await;
3589
3590        // Prelude.
3591        {
3592            // Create the room.
3593            client.base_client().get_or_create_room(room_id, RoomState::Joined);
3594
3595            // Initialise the event cache store.
3596            client
3597                .event_cache_store()
3598                .lock()
3599                .await
3600                .expect("Could not acquire the event cache lock")
3601                .as_clean()
3602                .expect("Could not acquire a clean event cache lock")
3603                .handle_linked_chunk_updates(
3604                    LinkedChunkId::Room(room_id),
3605                    vec![
3606                        Update::NewItemsChunk {
3607                            previous: None,
3608                            new: ChunkIdentifier::new(0),
3609                            next: None,
3610                        },
3611                        Update::PushItems {
3612                            at: Position::new(ChunkIdentifier::new(0), 0),
3613                            items: vec![
3614                                event_factory.text_msg("hello").event_id(event_id_0).into(),
3615                            ],
3616                        },
3617                    ],
3618                )
3619                .await
3620                .unwrap();
3621        }
3622
3623        let event_cache = client.event_cache();
3624        event_cache.subscribe().unwrap();
3625
3626        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
3627
3628        let mut buffer = BufferOfValuesForLocalEvents::new();
3629
3630        // We get a `Remote` because there is no `Local*` values!
3631        assert_remote_value_matches_room_message_with_body!(
3632            Builder::new_local(
3633                // An update that won't create a new `LatestEventValue`: it maps
3634                // to zero existing local value.
3635                &RoomSendQueueUpdate::SentEvent {
3636                    transaction_id: OwnedTransactionId::from("txnid"),
3637                    event_id: event_id_1.to_owned(),
3638                },
3639                &mut buffer,
3640                &room_event_cache,
3641                LatestEventValue::None,
3642                user_id,
3643                None,
3644            )
3645            .await
3646            => with body = "hello"
3647        );
3648    }
3649
3650    #[async_test]
3651    async fn test_local_sent_event_resolves_to_remote_when_remote_echo_is_processed_first() {
3652        let room_id = room_id!("!r0");
3653        let user_id = user_id!("@mnt_io:matrix.org");
3654        let event_factory = EventFactory::new().sender(user_id).room(room_id);
3655        let sent_event_id = event_id!("$ev0");
3656
3657        let server = MatrixMockServer::new().await;
3658        let client = server.client_builder().build().await;
3659
3660        // Prelude.
3661        {
3662            // Create the room.
3663            client.base_client().get_or_create_room(room_id, RoomState::Joined);
3664
3665            // Initialise the event cache store.
3666            client
3667                .event_cache_store()
3668                .lock()
3669                .await
3670                .expect("Could not acquire the event cache lock")
3671                .as_clean()
3672                .expect("Could not acquire a clean event cache lock")
3673                .handle_linked_chunk_updates(
3674                    LinkedChunkId::Room(room_id),
3675                    vec![
3676                        Update::NewItemsChunk {
3677                            previous: None,
3678                            new: ChunkIdentifier::new(0),
3679                            next: None,
3680                        },
3681                        Update::PushItems {
3682                            at: Position::new(ChunkIdentifier::new(0), 0),
3683                            items: vec![
3684                                // The event that was sent and already synced back.
3685                                event_factory.text_msg("hello").event_id(sent_event_id).into(),
3686                            ],
3687                        },
3688                    ],
3689                )
3690                .await
3691                .unwrap();
3692        }
3693
3694        let event_cache = client.event_cache();
3695        event_cache.subscribe().unwrap();
3696
3697        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
3698        let send_queue = client.send_queue();
3699        let room = client.get_room(room_id).unwrap();
3700        let room_send_queue = send_queue.for_room(room);
3701
3702        let mut buffer = BufferOfValuesForLocalEvents::new();
3703        let transaction_id = OwnedTransactionId::from("txnid0");
3704
3705        // Step 1: A local event is being sent (added to the buffer).
3706        let previous_value = {
3707            let content = new_local_echo_content(&room_send_queue, &transaction_id, "hello");
3708            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3709                transaction_id: transaction_id.clone(),
3710                content,
3711            });
3712
3713            assert_local_value_matches_room_message_with_body!(
3714                Builder::new_local(
3715                    &update,
3716                    &mut buffer,
3717                    &room_event_cache,
3718                    LatestEventValue::None,
3719                    user_id,
3720                    None,
3721                )
3722                .await,
3723                LatestEventValue::LocalIsSending => with body = "hello"
3724            )
3725        };
3726        assert_eq!(buffer.buffer.len(), 1);
3727
3728        // Step 2: The SentEvent update arrives. The buffer is now empty but the remote
3729        // echo has already been processed and the event was added to the cache. The
3730        // builder should resolve to LatestEventValue::Remote.
3731        assert_remote_value_matches_room_message_with_body!(
3732            Builder::new_local(
3733                &RoomSendQueueUpdate::SentEvent {
3734                    transaction_id,
3735                    event_id: sent_event_id.to_owned(),
3736                },
3737                &mut buffer,
3738                &room_event_cache,
3739                previous_value,
3740                user_id,
3741                None,
3742            )
3743            .await
3744            => with body = "hello"
3745        );
3746        assert!(buffer.buffer.is_empty());
3747    }
3748
3749    #[async_test]
3750    async fn test_local_redaction() {
3751        let room_id = room_id!("!r0");
3752        let user_id = user_id!("@mnt_io:matrix.org");
3753        let event_factory = EventFactory::new().sender(user_id).room(room_id);
3754        let event_id = event_id!("$ev0");
3755
3756        let server = MatrixMockServer::new().await;
3757        let client = server.client_builder().build().await;
3758
3759        // Prelude.
3760        {
3761            // Create the room.
3762            client.base_client().get_or_create_room(room_id, RoomState::Joined);
3763
3764            // Initialise the event cache store.
3765            client
3766                .event_cache_store()
3767                .lock()
3768                .await
3769                .expect("Could not acquire the event cache lock")
3770                .as_clean()
3771                .expect("Could not acquire a clean event cache lock")
3772                .handle_linked_chunk_updates(
3773                    LinkedChunkId::Room(room_id),
3774                    vec![
3775                        Update::NewItemsChunk {
3776                            previous: None,
3777                            new: ChunkIdentifier::new(0),
3778                            next: None,
3779                        },
3780                        Update::PushItems {
3781                            at: Position::new(ChunkIdentifier::new(0), 0),
3782                            items: vec![event_factory.text_msg("hello").event_id(event_id).into()],
3783                        },
3784                    ],
3785                )
3786                .await
3787                .unwrap();
3788        }
3789
3790        let event_cache = client.event_cache();
3791        event_cache.subscribe().unwrap();
3792
3793        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
3794        let send_queue = client.send_queue();
3795        let room = client.get_room(room_id).unwrap();
3796        let room_send_queue = send_queue.for_room(room);
3797
3798        let mut buffer = BufferOfValuesForLocalEvents::new();
3799
3800        // We get a `Remote` because there is no `Local*` values!
3801        assert_remote_value_matches_room_message_with_body!(
3802            Builder::new_local(
3803                // An update that won't create a new `LatestEventValue`: it maps
3804                // to zero existing local value.
3805                &RoomSendQueueUpdate::SentEvent {
3806                    transaction_id: OwnedTransactionId::from("txnid"),
3807                    event_id: event_id.to_owned(),
3808                },
3809                &mut buffer,
3810                &room_event_cache,
3811                LatestEventValue::None,
3812                user_id,
3813                None,
3814            )
3815            .await
3816            => with body = "hello"
3817        );
3818
3819        // A local redaction of the latest event value is being sent
3820        {
3821            let previous_value =
3822                LatestEventValue::Remote(remote_room_message(event_id!("$foo"), "Hello world"));
3823
3824            let transaction_id = OwnedTransactionId::from("txnid0");
3825            let content = LocalEchoContent::Redaction {
3826                redacts: event_id.to_owned(),
3827                reason: Some("whatever".to_owned()),
3828                send_handle: SendRedactionHandle::new(
3829                    room_send_queue.clone(),
3830                    transaction_id.clone(),
3831                ),
3832                send_error: None,
3833            };
3834            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3835                transaction_id: transaction_id.clone(),
3836                content,
3837            });
3838
3839            // Local redactions are currently ignored.
3840            assert_matches!(
3841                Builder::new_local(
3842                    &update,
3843                    &mut buffer,
3844                    &room_event_cache,
3845                    previous_value,
3846                    user_id,
3847                    None
3848                )
3849                .await,
3850                None
3851            );
3852        };
3853        assert_eq!(buffer.buffer.len(), 0);
3854    }
3855}