matrix_sdk_ui/timeline/controller/
read_receipts.rs

1// Copyright 2023 Kévin Commaille
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{cmp::Ordering, collections::HashMap};
16
17use futures_core::Stream;
18use indexmap::IndexMap;
19use ruma::{
20    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, UserId,
21    events::receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
22};
23use tokio::sync::watch;
24use tokio_stream::wrappers::WatchStream;
25use tracing::{debug, error, instrument, trace, warn};
26
27use super::{
28    AllRemoteEvents, ObservableItemsTransaction, RelativePosition, RoomDataProvider,
29    TimelineMetadata, TimelineState, rfind_event_by_id,
30};
31use crate::timeline::{TimelineItem, controller::TimelineStateTransaction};
32
/// In-memory caches for read receipts.
///
/// Receipts are indexed both by the event they target (`by_event`) and by the
/// user who sent them (`latest_by_user`). A watch channel is used to signal
/// changes to the receipts of our own user.
#[derive(Clone, Debug, Default)]
pub(super) struct ReadReceipts {
    /// Map of public read receipts on events.
    ///
    /// Event ID => User ID => Read receipt of the user.
    by_event: HashMap<OwnedEventId, IndexMap<OwnedUserId, Receipt>>,

    /// In-memory cache of all latest read receipts by user.
    ///
    /// User ID => Receipt type => Read receipt of the user of the given
    /// type.
    latest_by_user: HashMap<OwnedUserId, HashMap<ReceiptType, (OwnedEventId, Receipt)>>,

    /// A sender to notify of changes to the receipts of our own user.
    ///
    /// Subscribers are obtained through
    /// `subscribe_own_user_read_receipts_changed`.
    own_user_read_receipts_changed_sender: watch::Sender<()>,
}
50
51impl ReadReceipts {
52    /// Empty the caches.
53    pub(super) fn clear(&mut self) {
54        self.by_event.clear();
55        self.latest_by_user.clear();
56    }
57
58    /// Subscribe to changes in the read receipts of our own user.
59    pub(super) fn subscribe_own_user_read_receipts_changed(
60        &self,
61    ) -> impl Stream<Item = ()> + use<> {
62        let subscriber = self.own_user_read_receipts_changed_sender.subscribe();
63        WatchStream::from_changes(subscriber)
64    }
65
66    /// Read the latest read receipt of the given type for the given user, from
67    /// the in-memory cache.
68    fn get_latest(
69        &self,
70        user_id: &UserId,
71        receipt_type: &ReceiptType,
72    ) -> Option<&(OwnedEventId, Receipt)> {
73        self.latest_by_user.get(user_id).and_then(|map| map.get(receipt_type))
74    }
75
76    /// Insert or update in the local cache the latest read receipt for the
77    /// given user.
78    fn upsert_latest(
79        &mut self,
80        user_id: OwnedUserId,
81        receipt_type: ReceiptType,
82        read_receipt: (OwnedEventId, Receipt),
83    ) {
84        self.latest_by_user.entry(user_id).or_default().insert(receipt_type, read_receipt);
85    }
86
87    /// Update the timeline items with the given read receipt if it is more
88    /// recent than the current one.
89    ///
90    /// In the process, if applicable, this method updates the inner maps to use
91    /// the new receipt. If `is_own_user_id` is `false`, it also updates the
92    /// receipts on the corresponding timeline items.
93    ///
94    /// Currently this method only works reliably if the timeline was started
95    /// from the end of the timeline.
96    #[instrument(skip_all, fields(user_id = %new_receipt.user_id, event_id = %new_receipt.event_id))]
97    fn maybe_update_read_receipt(
98        &mut self,
99        new_receipt: FullReceipt<'_>,
100        is_own_user_id: bool,
101        timeline_items: &mut ObservableItemsTransaction<'_>,
102    ) {
103        let all_events = timeline_items.all_remote_events();
104
105        // Get old receipt.
106        let old_receipt = self.get_latest(new_receipt.user_id, &new_receipt.receipt_type);
107
108        if old_receipt
109            .as_ref()
110            .is_some_and(|(old_receipt_event_id, _)| old_receipt_event_id == new_receipt.event_id)
111        {
112            // The receipt has not changed so there is nothing to do.
113            if !is_own_user_id {
114                trace!("receipt hasn't changed, nothing to do");
115            }
116            return;
117        }
118
119        let old_event_id = old_receipt.map(|(event_id, _)| event_id);
120
121        // Find receipts positions.
122        let mut old_receipt_pos = None;
123        let mut old_item_pos = None;
124        let mut old_item_event_id = None;
125        let mut new_receipt_pos = None;
126        let mut new_item_pos = None;
127        let mut new_item_event_id = None;
128
129        for (pos, event) in all_events.iter().rev().enumerate() {
130            if old_receipt_pos.is_none() && old_event_id == Some(&event.event_id) {
131                old_receipt_pos = Some(pos);
132            }
133
134            // The receipt should appear on the first event that is visible.
135            if old_receipt_pos.is_some() && old_item_event_id.is_none() && event.visible {
136                old_item_pos = event.timeline_item_index;
137                old_item_event_id = Some(event.event_id.clone());
138            }
139
140            if new_receipt_pos.is_none() && new_receipt.event_id == event.event_id {
141                new_receipt_pos = Some(pos);
142            }
143
144            // The receipt should appear on the first event that is visible.
145            if new_receipt_pos.is_some() && new_item_event_id.is_none() && event.visible {
146                new_item_pos = event.timeline_item_index;
147                new_item_event_id = Some(event.event_id.clone());
148            }
149
150            if old_item_event_id.is_some() && new_item_event_id.is_some() {
151                // We have everything we need, stop.
152                break;
153            }
154        }
155
156        // Check if the old receipt is more recent than the new receipt.
157        if let Some(old_receipt_pos) = old_receipt_pos {
158            let Some(new_receipt_pos) = new_receipt_pos else {
159                // The old receipt is more recent since we can't find the new receipt in the
160                // timeline and we supposedly have all events since the end of the timeline.
161                if !is_own_user_id {
162                    trace!(
163                        "we had a previous read receipt, but couldn't find the event \
164                         targeted by the new read receipt in the timeline, exiting"
165                    );
166                }
167                return;
168            };
169
170            if old_receipt_pos < new_receipt_pos {
171                // The old receipt is more recent than the new one.
172                if !is_own_user_id {
173                    trace!("the previous read receipt is more recent than the new one, exiting");
174                }
175                return;
176            }
177        }
178
179        // The new receipt is deemed more recent from now on because:
180        // - If old_receipt_pos is Some, we already checked all the cases where it
181        //   wouldn't be more recent.
182        // - If both old_receipt_pos and new_receipt_pos are None, they are both
183        //   explicit read receipts so the server should only send us a more recent
184        //   receipt.
185        // - If old_receipt_pos is None and new_receipt_pos is Some, the new receipt is
186        //   more recent because it has a place in the timeline.
187
188        if !is_own_user_id {
189            trace!(
190                from_event = ?old_event_id,
191                from_visible_event = ?old_item_event_id,
192                to_event = ?new_receipt.event_id,
193                to_visible_event = ?new_item_event_id,
194                ?old_item_pos,
195                ?new_item_pos,
196                "moving read receipt",
197            );
198
199            // Remove the old receipt from the old event.
200            if let Some(old_event_id) = old_event_id.cloned() {
201                self.remove_event_receipt_for_user(&old_event_id, new_receipt.user_id);
202            }
203
204            // Add the new receipt to the new event.
205            self.add_event_receipt_for_user(
206                new_receipt.event_id.to_owned(),
207                new_receipt.user_id.to_owned(),
208                new_receipt.receipt.clone(),
209            );
210        }
211
212        // Update the receipt of the user.
213        self.upsert_latest(
214            new_receipt.user_id.to_owned(),
215            new_receipt.receipt_type,
216            (new_receipt.event_id.to_owned(), new_receipt.receipt.clone()),
217        );
218
219        if is_own_user_id {
220            self.own_user_read_receipts_changed_sender.send_replace(());
221            // This receipt cannot change items in the timeline.
222            return;
223        }
224
225        if new_item_event_id == old_item_event_id {
226            // The receipt did not change in the timeline.
227            return;
228        }
229
230        let timeline_update = ReadReceiptTimelineUpdate {
231            old_item_pos,
232            old_event_id: old_item_event_id,
233            new_item_pos,
234            new_event_id: new_item_event_id,
235        };
236
237        timeline_update.apply(
238            timeline_items,
239            new_receipt.user_id.to_owned(),
240            new_receipt.receipt.clone(),
241        );
242    }
243
244    /// Returns the cached receipts by user for a given `event_id`.
245    fn get_event_receipts(&self, event_id: &EventId) -> Option<&IndexMap<OwnedUserId, Receipt>> {
246        self.by_event.get(event_id)
247    }
248
249    /// Mark the given event as seen by the user with the given receipt.
250    fn add_event_receipt_for_user(
251        &mut self,
252        event_id: OwnedEventId,
253        user_id: OwnedUserId,
254        receipt: Receipt,
255    ) {
256        self.by_event.entry(event_id).or_default().insert(user_id, receipt);
257    }
258
259    /// Unmark the given event as seen by the user.
260    fn remove_event_receipt_for_user(&mut self, event_id: &EventId, user_id: &UserId) {
261        if let Some(map) = self.by_event.get_mut(event_id) {
262            map.swap_remove(user_id);
263            // Remove the entire map if this was the last entry.
264            if map.is_empty() {
265                self.by_event.remove(event_id);
266            }
267        }
268    }
269
270    /// Get the read receipts by user for the given event.
271    ///
272    /// This includes all the receipts on the event as well as all the receipts
273    /// on the following events that are filtered out (not visible).
274    #[instrument(skip(self, timeline_items, at_end))]
275    pub(super) fn compute_event_receipts(
276        &self,
277        event_id: &EventId,
278        timeline_items: &mut ObservableItemsTransaction<'_>,
279        at_end: bool,
280    ) -> IndexMap<OwnedUserId, Receipt> {
281        let mut all_receipts = self.get_event_receipts(event_id).cloned().unwrap_or_default();
282
283        if at_end {
284            // No need to search for extra receipts, there are no events after.
285            trace!(
286                "early return because @end, retrieved receipts: {}",
287                all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
288            );
289            return all_receipts;
290        }
291
292        trace!(
293            "loaded receipts: {}",
294            all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
295        );
296
297        // We are going to add receipts for hidden events to this item.
298        //
299        // However: since we may be inserting an event at a random position, the
300        // previous timeline item may already be holding some hidden read
301        // receipts. As a result, we need to be careful here: if we're inserting
302        // after an event that holds hidden read receipts, then we should steal
303        // them from it.
304        //
305        // Find the event, go past it, and keep a reference to the previous rendered
306        // timeline item, if any.
307        let mut events_iter = timeline_items.all_remote_events().iter();
308        let mut prev_event_and_item_index = None;
309
310        for meta in events_iter.by_ref() {
311            if meta.event_id == event_id {
312                break;
313            }
314            if let Some(item_index) = meta.timeline_item_index {
315                prev_event_and_item_index = Some((meta.event_id.clone(), item_index));
316            }
317        }
318
319        // Include receipts for all the following non-visible events.
320        let mut hidden = Vec::new();
321        for hidden_event_meta in events_iter.take_while(|meta| !meta.visible) {
322            if let Some(event_receipts) = self.get_event_receipts(&hidden_event_meta.event_id) {
323                trace!(%hidden_event_meta.event_id, "found receipts on hidden event");
324                hidden.extend(event_receipts.clone());
325            }
326        }
327
328        // Steal hidden receipts from the previous timeline item, if it carried them.
329        if let Some((prev_event_id, prev_item_index)) = prev_event_and_item_index {
330            let prev_item = &timeline_items[prev_item_index];
331            // Technically, we could unwrap the `as_event()`, because this is a rendered
332            // item for an event in all_remote_events, but this extra check is
333            // cheap.
334            if let Some(remote_prev_item) = prev_item.as_event() {
335                let prev_receipts = remote_prev_item.read_receipts().clone();
336                for (user_id, _) in &hidden {
337                    if !prev_receipts.contains_key(user_id) {
338                        continue;
339                    }
340                    let mut up = ReadReceiptTimelineUpdate {
341                        old_item_pos: Some(prev_item_index),
342                        old_event_id: Some(prev_event_id.clone()),
343                        new_item_pos: None,
344                        new_event_id: None,
345                    };
346                    up.remove_old_receipt(timeline_items, user_id);
347                }
348            }
349        }
350
351        all_receipts.extend(hidden);
352        trace!(
353            "computed receipts: {}",
354            all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
355        );
356        all_receipts
357    }
358}
359
/// A read receipt bundled with the event it targets, the user it belongs to
/// and its type.
///
/// This is the input of `ReadReceipts::maybe_update_read_receipt`.
struct FullReceipt<'a> {
    /// The event targeted by the receipt.
    event_id: &'a EventId,
    /// The user the receipt belongs to.
    user_id: &'a UserId,
    /// The type of the receipt.
    receipt_type: ReceiptType,
    /// The receipt itself.
    receipt: &'a Receipt,
}
366
/// A read receipt update in the timeline.
///
/// Describes the move of a user's receipt from one timeline item to another.
/// When an event ID is set but the matching item position is `None`, the
/// position is searched for among the timeline items when the update is
/// applied.
#[derive(Clone, Debug, Default)]
struct ReadReceiptTimelineUpdate {
    /// The position of the timeline item that had the old receipt of the user,
    /// if any.
    old_item_pos: Option<usize>,
    /// The old event that had the receipt of the user, if any.
    old_event_id: Option<OwnedEventId>,
    /// The position of the timeline item that has the new receipt of the user,
    /// if any.
    new_item_pos: Option<usize>,
    /// The new event that has the receipt of the user, if any.
    new_event_id: Option<OwnedEventId>,
}
381
382impl ReadReceiptTimelineUpdate {
383    /// Remove the old receipt from the corresponding timeline item.
384    #[instrument(skip_all)]
385    fn remove_old_receipt(&mut self, items: &mut ObservableItemsTransaction<'_>, user_id: &UserId) {
386        let Some(event_id) = &self.old_event_id else {
387            // Nothing to do.
388            return;
389        };
390
391        let item_pos = self.old_item_pos.or_else(|| {
392            items
393                .iter_remotes_region()
394                .rev()
395                .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
396                .find_map(|(nth, event_item)| {
397                    (event_item.event_id() == Some(event_id)).then_some(nth)
398                })
399        });
400
401        let Some(item_pos) = item_pos else {
402            debug!(%event_id, %user_id, "inconsistent state: old event item for read receipt was not found");
403            return;
404        };
405
406        self.old_item_pos = Some(item_pos);
407
408        let event_item = &items[item_pos];
409        let event_item_id = event_item.unique_id().to_owned();
410
411        let Some(mut event_item) = event_item.as_event().cloned() else {
412            warn!("received a read receipt for a virtual item, this should not be possible");
413            return;
414        };
415
416        if let Some(remote_event_item) = event_item.as_remote_mut() {
417            if remote_event_item.read_receipts.swap_remove(user_id).is_none() {
418                debug!(
419                    %event_id, %user_id,
420                    "inconsistent state: old event item for user's read \
421                     receipt doesn't have a receipt for the user"
422                );
423            }
424            trace!(%user_id, %event_id, "removed read receipt from event item");
425            items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
426        } else {
427            warn!("received a read receipt for a local item, this should not be possible");
428        }
429    }
430
431    /// Add the new receipt to the corresponding timeline item.
432    #[instrument(skip_all)]
433    fn add_new_receipt(
434        self,
435        items: &mut ObservableItemsTransaction<'_>,
436        user_id: OwnedUserId,
437        receipt: Receipt,
438    ) {
439        let Some(event_id) = self.new_event_id else {
440            // Nothing to do.
441            return;
442        };
443
444        let old_item_pos = self.old_item_pos.unwrap_or(0);
445
446        let item_pos = self.new_item_pos.or_else(|| {
447            items
448                .iter_remotes_region()
449                // Don't iterate over all items if the `old_item_pos` is known: the `item_pos`
450                // for the new item is necessarily _after_ the old item.
451                .skip_while(|(nth, _)| *nth < old_item_pos)
452                .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
453                .find_map(|(nth, event_item)| {
454                    (event_item.event_id() == Some(&event_id)).then_some(nth)
455                })
456        });
457
458        let Some(item_pos) = item_pos else {
459            debug!(
460                %event_id, %user_id,
461                "inconsistent state: new event item for read receipt was not found",
462            );
463            return;
464        };
465
466        debug_assert!(
467            item_pos >= self.old_item_pos.unwrap_or(0),
468            "The new receipt must be added on a timeline item that is _after_ the timeline item \
469             that was holding the old receipt"
470        );
471
472        let event_item = &items[item_pos];
473        let event_item_id = event_item.unique_id().to_owned();
474
475        let Some(mut event_item) = event_item.as_event().cloned() else {
476            warn!("received a read receipt for a virtual item, this should not be possible");
477            return;
478        };
479
480        if let Some(remote_event_item) = event_item.as_remote_mut() {
481            trace!(%user_id, %event_id, "added read receipt to event item");
482            remote_event_item.read_receipts.insert(user_id, receipt);
483            items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
484        } else {
485            warn!("received a read receipt for a local item, this should not be possible");
486        }
487    }
488
    /// Apply this update to the timeline.
    ///
    /// The receipt of `user_id` is first removed from the item of the old
    /// event, then `receipt` is added to the item of the new event. Each step
    /// is a no-op when the matching event ID is `None`.
    fn apply(
        mut self,
        items: &mut ObservableItemsTransaction<'_>,
        user_id: OwnedUserId,
        receipt: Receipt,
    ) {
        // The removal goes first: it may fill in `old_item_pos`, which the addition
        // uses as the starting point when it has to search for the new item.
        self.remove_old_receipt(items, &user_id);
        self.add_new_receipt(items, user_id, receipt);
    }
499}
500
501impl<P: RoomDataProvider> TimelineStateTransaction<'_, P> {
502    pub(super) fn handle_explicit_read_receipts(
503        &mut self,
504        receipt_event_content: ReceiptEventContent,
505        own_user_id: &UserId,
506    ) {
507        trace!("handling explicit read receipts");
508        let own_receipt_thread = self.focus.receipt_thread();
509
510        for (event_id, receipt_types) in receipt_event_content.0 {
511            for (receipt_type, receipts) in receipt_types {
512                // Discard the read marker updates in this function.
513                if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) {
514                    continue;
515                }
516
517                for (user_id, receipt) in receipts {
518                    if matches!(own_receipt_thread, ReceiptThread::Unthreaded | ReceiptThread::Main)
519                    {
520                        // If the own receipt thread is unthreaded or main, we maintain maximal
521                        // compatibility with clients using either unthreaded or main-thread read
522                        // receipts by allowing both here.
523                        if !matches!(
524                            receipt.thread,
525                            ReceiptThread::Unthreaded | ReceiptThread::Main
526                        ) {
527                            continue;
528                        }
529                    } else if own_receipt_thread != receipt.thread {
530                        // Otherwise, we only keep the receipts of the same thread kind.
531                        continue;
532                    }
533
534                    let is_own_user_id = user_id == own_user_id;
535                    let full_receipt = FullReceipt {
536                        event_id: &event_id,
537                        user_id: &user_id,
538                        receipt_type: receipt_type.clone(),
539                        receipt: &receipt,
540                    };
541
542                    self.meta.read_receipts.maybe_update_read_receipt(
543                        full_receipt,
544                        is_own_user_id,
545                        &mut self.items,
546                    );
547                }
548            }
549        }
550    }
551
552    /// Load the read receipts from the store for the given event ID.
553    ///
554    /// Populates the read receipts in-memory caches.
555    pub(super) async fn load_read_receipts_for_event(
556        &mut self,
557        event_id: &EventId,
558        room_data_provider: &P,
559    ) {
560        trace!(%event_id, "loading initial receipts for an event");
561
562        let receipt_thread = self.focus.receipt_thread();
563        let read_receipts = room_data_provider.load_event_receipts(event_id, receipt_thread).await;
564        let own_user_id = room_data_provider.own_user_id();
565
566        // Since they are explicit read receipts, we need to check if they are
567        // superseded by implicit read receipts.
568        for (user_id, receipt) in read_receipts {
569            let full_receipt = FullReceipt {
570                event_id,
571                user_id: &user_id,
572                receipt_type: ReceiptType::Read,
573                receipt: &receipt,
574            };
575
576            self.meta.read_receipts.maybe_update_read_receipt(
577                full_receipt,
578                user_id == own_user_id,
579                &mut self.items,
580            );
581        }
582    }
583
584    /// Add an implicit read receipt to the given event item, if it is more
585    /// recent than the current read receipt for the sender of the event.
586    ///
587    /// According to the spec, read receipts should not point to events sent by
588    /// our own user, but these events are used to reset the notification
589    /// count, so we need to handle them locally too. For that we create an
590    /// "implicit" read receipt, compared to the "explicit" ones sent by the
591    /// client.
592    pub(super) fn maybe_add_implicit_read_receipt(
593        &mut self,
594        event_id: &EventId,
595        sender: Option<&UserId>,
596        timestamp: Option<MilliSecondsSinceUnixEpoch>,
597    ) {
598        let (Some(user_id), Some(timestamp)) = (sender, timestamp) else {
599            // We cannot add a read receipt if we do not know the user or the timestamp.
600            return;
601        };
602
603        trace!(%user_id, %event_id, "adding implicit read receipt");
604
605        let mut receipt = Receipt::new(timestamp);
606        receipt.thread = self.focus.receipt_thread();
607
608        let full_receipt =
609            FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt };
610
611        let is_own_event = sender.is_some_and(|sender| sender == self.meta.own_user_id);
612
613        self.meta.read_receipts.maybe_update_read_receipt(
614            full_receipt,
615            is_own_event,
616            &mut self.items,
617        );
618    }
619
620    /// Update the read receipts on the event with the given event ID and the
621    /// previous visible event because of a visibility change.
622    #[instrument(skip(self))]
623    pub(super) fn maybe_update_read_receipts_of_prev_event(&mut self, event_id: &EventId) {
624        // Find the previous visible event, if there is one.
625        let Some(prev_event_meta) = self
626            .items
627            .all_remote_events()
628            .iter()
629            .rev()
630            // Find the event item.
631            .skip_while(|meta| meta.event_id != event_id)
632            // Go past the event item.
633            .skip(1)
634            // Find the first visible item.
635            .find(|meta| meta.visible)
636        else {
637            trace!("Couldn't find any previous visible event, exiting");
638            return;
639        };
640
641        let Some((prev_item_pos, prev_event_item)) =
642            rfind_event_by_id(&self.items, &prev_event_meta.event_id)
643        else {
644            error!("inconsistent state: timeline item of visible event was not found");
645            return;
646        };
647
648        let prev_event_item_id = prev_event_item.internal_id.to_owned();
649        let mut prev_event_item = prev_event_item.clone();
650
651        let Some(remote_prev_event_item) = prev_event_item.as_remote_mut() else {
652            warn!("loading read receipts for a local item, this should not be possible");
653            return;
654        };
655
656        let read_receipts = self.meta.read_receipts.compute_event_receipts(
657            &remote_prev_event_item.event_id,
658            &mut self.items,
659            false,
660        );
661
662        // If the count did not change, the receipts did not change either.
663        if read_receipts.len() == remote_prev_event_item.read_receipts.len() {
664            trace!("same count of read receipts, not doing anything");
665            return;
666        }
667
668        trace!("replacing read receipts with the new ones");
669        remote_prev_event_item.read_receipts = read_receipts;
670        self.items.replace(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id));
671    }
672}
673
674impl<P: RoomDataProvider> TimelineState<P> {
675    /// Populates our own latest read receipt in the in-memory by-user read
676    /// receipt cache.
677    pub(super) async fn populate_initial_user_receipt(
678        &mut self,
679        room_data_provider: &P,
680        receipt_type: ReceiptType,
681    ) {
682        let own_user_id = room_data_provider.own_user_id().to_owned();
683
684        let receipt_thread = self.focus.receipt_thread();
685        let wants_unthreaded_receipts = receipt_thread == ReceiptThread::Unthreaded;
686
687        let mut read_receipt = room_data_provider
688            .load_user_receipt(receipt_type.clone(), receipt_thread, &own_user_id)
689            .await;
690
691        if wants_unthreaded_receipts && read_receipt.is_none() {
692            // Fallback to the one in the main thread.
693            read_receipt = room_data_provider
694                .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
695                .await;
696        }
697
698        if let Some(read_receipt) = read_receipt {
699            self.meta.read_receipts.upsert_latest(own_user_id, receipt_type, read_receipt);
700        }
701    }
702
703    /// Get the latest read receipt for the given user.
704    ///
705    /// Useful to get the latest read receipt, whether it's private or public.
706    pub(super) async fn latest_user_read_receipt(
707        &self,
708        user_id: &UserId,
709        receipt_thread: ReceiptThread,
710        room_data_provider: &P,
711    ) -> Option<(OwnedEventId, Receipt)> {
712        let all_remote_events = self.items.all_remote_events();
713
714        let public_read_receipt = self
715            .meta
716            .user_receipt(
717                user_id,
718                ReceiptType::Read,
719                receipt_thread.clone(),
720                room_data_provider,
721                all_remote_events,
722            )
723            .await;
724
725        let private_read_receipt = self
726            .meta
727            .user_receipt(
728                user_id,
729                ReceiptType::ReadPrivate,
730                receipt_thread,
731                room_data_provider,
732                all_remote_events,
733            )
734            .await;
735
736        // Let's assume that a private read receipt should be more recent than a public
737        // read receipt (otherwise there's no point in the private read receipt),
738        // and use it as the default.
739        match TimelineMetadata::compare_optional_receipts(
740            public_read_receipt.as_ref(),
741            private_read_receipt.as_ref(),
742            all_remote_events,
743        ) {
744            Ordering::Greater => public_read_receipt,
745            Ordering::Less => private_read_receipt,
746            _ => unreachable!(),
747        }
748    }
749
750    /// Get the ID of the visible timeline event with the latest read receipt
751    /// for the given user.
752    pub(super) fn latest_user_read_receipt_timeline_event_id(
753        &self,
754        user_id: &UserId,
755    ) -> Option<OwnedEventId> {
756        // We only need to use the local map, since receipts for known events are
757        // already loaded from the store.
758        let public_read_receipt = self.meta.read_receipts.get_latest(user_id, &ReceiptType::Read);
759        let private_read_receipt =
760            self.meta.read_receipts.get_latest(user_id, &ReceiptType::ReadPrivate);
761
762        // Let's assume that a private read receipt should be more recent than a public
763        // read receipt, otherwise there's no point in the private read receipt,
764        // and use it as default.
765        let (latest_receipt_id, _) = match TimelineMetadata::compare_optional_receipts(
766            public_read_receipt,
767            private_read_receipt,
768            self.items.all_remote_events(),
769        ) {
770            Ordering::Greater => public_read_receipt?,
771            Ordering::Less => private_read_receipt?,
772            _ => unreachable!(),
773        };
774
775        // Find the corresponding visible event.
776        self.items
777            .all_remote_events()
778            .iter()
779            .rev()
780            .skip_while(|ev| ev.event_id != *latest_receipt_id)
781            .find(|ev| ev.visible)
782            .map(|ev| ev.event_id.clone())
783    }
784}
785
786impl TimelineMetadata {
787    /// Get the latest receipt of the given type for the given user in the
788    /// timeline.
789    ///
790    /// This will attempt to read the latest user receipt for a user from the
791    /// cache, or load it from the storage if missing from the cache.
792    ///
793    /// If the `ReceiptThread` is `Unthreaded`, it will try to find either the
794    /// unthreaded or the main-thread read receipt, to be maximally
795    /// compatible with clients using one or the other. Otherwise, it will
796    /// select only the receipts for that specific thread.
797    pub(super) async fn user_receipt<P: RoomDataProvider>(
798        &self,
799        user_id: &UserId,
800        receipt_type: ReceiptType,
801        receipt_thread: ReceiptThread,
802        room_data_provider: &P,
803        all_remote_events: &AllRemoteEvents,
804    ) -> Option<(OwnedEventId, Receipt)> {
805        if let Some(receipt) = self.read_receipts.get_latest(user_id, &receipt_type) {
806            // Since it is in the timeline, it should be the most recent.
807            return Some(receipt.clone());
808        }
809
810        if receipt_thread == ReceiptThread::Unthreaded {
811            // Maintain compatibility with clients using either the unthreaded and main read
812            // receipts, and try to find the most recent one.
813            let unthreaded_read_receipt = room_data_provider
814                .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
815                .await;
816
817            let main_thread_read_receipt = room_data_provider
818                .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
819                .await;
820
821            // Let's use the unthreaded read receipt as default, since it's the one we
822            // should be using.
823            match Self::compare_optional_receipts(
824                main_thread_read_receipt.as_ref(),
825                unthreaded_read_receipt.as_ref(),
826                all_remote_events,
827            ) {
828                Ordering::Greater => main_thread_read_receipt,
829                Ordering::Less => unthreaded_read_receipt,
830                _ => unreachable!(),
831            }
832        } else {
833            // In all the other cases, use the thread's read receipt. A main-thread receipt
834            // in particular will use this code path, and not be compatible with
835            // an unthreaded read receipt.
836            room_data_provider
837                .load_user_receipt(receipt_type.clone(), receipt_thread, user_id)
838                .await
839        }
840    }
841
842    /// Compares two optional receipts to know which one is more recent.
843    ///
844    /// Returns `Ordering::Greater` if the left-hand side is more recent than
845    /// the right-hand side, and `Ordering::Less` if it is older. If it's
846    /// not possible to know which one is the more recent, defaults to
847    /// `Ordering::Less`, making the right-hand side the default.
848    fn compare_optional_receipts(
849        lhs: Option<&(OwnedEventId, Receipt)>,
850        rhs_or_default: Option<&(OwnedEventId, Receipt)>,
851        all_remote_events: &AllRemoteEvents,
852    ) -> Ordering {
853        // If we only have one, use it.
854        let Some((lhs_event_id, lhs_receipt)) = lhs else {
855            return Ordering::Less;
856        };
857        let Some((rhs_event_id, rhs_receipt)) = rhs_or_default else {
858            return Ordering::Greater;
859        };
860
861        // Compare by position in the timeline.
862        if let Some(relative_pos) =
863            Self::compare_events_positions(lhs_event_id, rhs_event_id, all_remote_events)
864        {
865            if relative_pos == RelativePosition::Before {
866                return Ordering::Greater;
867            }
868
869            return Ordering::Less;
870        }
871
872        // Compare by timestamp.
873        if let Some((lhs_ts, rhs_ts)) = lhs_receipt.ts.zip(rhs_receipt.ts) {
874            if lhs_ts > rhs_ts {
875                return Ordering::Greater;
876            }
877
878            return Ordering::Less;
879        }
880
881        Ordering::Less
882    }
883}