matrix_sdk_ui/timeline/controller/
read_receipts.rs

1// Copyright 2023 Kévin Commaille
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{cmp::Ordering, collections::HashMap};
16
17use futures_core::Stream;
18use indexmap::IndexMap;
19use ruma::{
20    events::receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
21    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, UserId,
22};
23use tokio::sync::watch;
24use tokio_stream::wrappers::WatchStream;
25use tracing::{debug, error, instrument, trace, warn};
26
27use super::{
28    rfind_event_by_id, AllRemoteEvents, ObservableItemsTransaction, RelativePosition,
29    RoomDataProvider, TimelineMetadata, TimelineState,
30};
31use crate::timeline::{controller::TimelineStateTransaction, TimelineItem};
32
33/// In-memory caches for read receipts.
34#[derive(Clone, Debug, Default)]
35pub(super) struct ReadReceipts {
36    /// Map of public read receipts on events.
37    ///
38    /// Event ID => User ID => Read receipt of the user.
39    by_event: HashMap<OwnedEventId, IndexMap<OwnedUserId, Receipt>>,
40
41    /// In-memory cache of all latest read receipts by user.
42    ///
43    /// User ID => Receipt type => Read receipt of the user of the given
44    /// type.
45    latest_by_user: HashMap<OwnedUserId, HashMap<ReceiptType, (OwnedEventId, Receipt)>>,
46
47    /// A sender to notify of changes to the receipts of our own user.
48    own_user_read_receipts_changed_sender: watch::Sender<()>,
49}
50
51impl ReadReceipts {
52    /// Empty the caches.
53    pub(super) fn clear(&mut self) {
54        self.by_event.clear();
55        self.latest_by_user.clear();
56    }
57
58    /// Subscribe to changes in the read receipts of our own user.
59    pub(super) fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
60        let subscriber = self.own_user_read_receipts_changed_sender.subscribe();
61        WatchStream::from_changes(subscriber)
62    }
63
64    /// Read the latest read receipt of the given type for the given user, from
65    /// the in-memory cache.
66    fn get_latest(
67        &self,
68        user_id: &UserId,
69        receipt_type: &ReceiptType,
70    ) -> Option<&(OwnedEventId, Receipt)> {
71        self.latest_by_user.get(user_id).and_then(|map| map.get(receipt_type))
72    }
73
74    /// Insert or update in the local cache the latest read receipt for the
75    /// given user.
76    fn upsert_latest(
77        &mut self,
78        user_id: OwnedUserId,
79        receipt_type: ReceiptType,
80        read_receipt: (OwnedEventId, Receipt),
81    ) {
82        self.latest_by_user.entry(user_id).or_default().insert(receipt_type, read_receipt);
83    }
84
85    /// Update the timeline items with the given read receipt if it is more
86    /// recent than the current one.
87    ///
88    /// In the process, if applicable, this method updates the inner maps to use
89    /// the new receipt. If `is_own_user_id` is `false`, it also updates the
90    /// receipts on the corresponding timeline items.
91    ///
92    /// Currently this method only works reliably if the timeline was started
93    /// from the end of the timeline.
94    #[instrument(skip_all, fields(user_id = %new_receipt.user_id, event_id = %new_receipt.event_id))]
95    fn maybe_update_read_receipt(
96        &mut self,
97        new_receipt: FullReceipt<'_>,
98        is_own_user_id: bool,
99        timeline_items: &mut ObservableItemsTransaction<'_>,
100    ) {
101        let all_events = timeline_items.all_remote_events();
102
103        // Get old receipt.
104        let old_receipt = self.get_latest(new_receipt.user_id, &new_receipt.receipt_type);
105
106        if old_receipt
107            .as_ref()
108            .is_some_and(|(old_receipt_event_id, _)| old_receipt_event_id == new_receipt.event_id)
109        {
110            // The receipt has not changed so there is nothing to do.
111            if !is_own_user_id {
112                trace!("receipt hasn't changed, nothing to do");
113            }
114            return;
115        }
116
117        let old_event_id = old_receipt.map(|(event_id, _)| event_id);
118
119        // Find receipts positions.
120        let mut old_receipt_pos = None;
121        let mut old_item_pos = None;
122        let mut old_item_event_id = None;
123        let mut new_receipt_pos = None;
124        let mut new_item_pos = None;
125        let mut new_item_event_id = None;
126
127        for (pos, event) in all_events.iter().rev().enumerate() {
128            if old_receipt_pos.is_none() && old_event_id == Some(&event.event_id) {
129                old_receipt_pos = Some(pos);
130            }
131
132            // The receipt should appear on the first event that is visible.
133            if old_receipt_pos.is_some() && old_item_event_id.is_none() && event.visible {
134                old_item_pos = event.timeline_item_index;
135                old_item_event_id = Some(event.event_id.clone());
136            }
137
138            if new_receipt_pos.is_none() && new_receipt.event_id == event.event_id {
139                new_receipt_pos = Some(pos);
140            }
141
142            // The receipt should appear on the first event that is visible.
143            if new_receipt_pos.is_some() && new_item_event_id.is_none() && event.visible {
144                new_item_pos = event.timeline_item_index;
145                new_item_event_id = Some(event.event_id.clone());
146            }
147
148            if old_item_event_id.is_some() && new_item_event_id.is_some() {
149                // We have everything we need, stop.
150                break;
151            }
152        }
153
154        // Check if the old receipt is more recent than the new receipt.
155        if let Some(old_receipt_pos) = old_receipt_pos {
156            let Some(new_receipt_pos) = new_receipt_pos else {
157                // The old receipt is more recent since we can't find the new receipt in the
158                // timeline and we supposedly have all events since the end of the timeline.
159                if !is_own_user_id {
160                    trace!("we had a previous read receipt, but couldn't find the event targeted by the new read receipt in the timeline, exiting");
161                }
162                return;
163            };
164
165            if old_receipt_pos < new_receipt_pos {
166                // The old receipt is more recent than the new one.
167                if !is_own_user_id {
168                    trace!("the previous read receipt is more recent than the new one, exiting");
169                }
170                return;
171            }
172        }
173
174        // The new receipt is deemed more recent from now on because:
175        // - If old_receipt_pos is Some, we already checked all the cases where it
176        //   wouldn't be more recent.
177        // - If both old_receipt_pos and new_receipt_pos are None, they are both
178        //   explicit read receipts so the server should only send us a more recent
179        //   receipt.
180        // - If old_receipt_pos is None and new_receipt_pos is Some, the new receipt is
181        //   more recent because it has a place in the timeline.
182
183        if !is_own_user_id {
184            trace!(
185                from_event = ?old_event_id,
186                from_visible_event = ?old_item_event_id,
187                to_event = ?new_receipt.event_id,
188                to_visible_event = ?new_item_event_id,
189                ?old_item_pos,
190                ?new_item_pos,
191                "moving read receipt",
192            );
193
194            // Remove the old receipt from the old event.
195            if let Some(old_event_id) = old_event_id.cloned() {
196                self.remove_event_receipt_for_user(&old_event_id, new_receipt.user_id);
197            }
198
199            // Add the new receipt to the new event.
200            self.add_event_receipt_for_user(
201                new_receipt.event_id.to_owned(),
202                new_receipt.user_id.to_owned(),
203                new_receipt.receipt.clone(),
204            );
205        }
206
207        // Update the receipt of the user.
208        self.upsert_latest(
209            new_receipt.user_id.to_owned(),
210            new_receipt.receipt_type,
211            (new_receipt.event_id.to_owned(), new_receipt.receipt.clone()),
212        );
213
214        if is_own_user_id {
215            self.own_user_read_receipts_changed_sender.send_replace(());
216            // This receipt cannot change items in the timeline.
217            return;
218        }
219
220        if new_item_event_id == old_item_event_id {
221            // The receipt did not change in the timeline.
222            return;
223        }
224
225        let timeline_update = ReadReceiptTimelineUpdate {
226            old_item_pos,
227            old_event_id: old_item_event_id,
228            new_item_pos,
229            new_event_id: new_item_event_id,
230        };
231
232        timeline_update.apply(
233            timeline_items,
234            new_receipt.user_id.to_owned(),
235            new_receipt.receipt.clone(),
236        );
237    }
238
239    /// Returns the cached receipts by user for a given `event_id`.
240    fn get_event_receipts(&self, event_id: &EventId) -> Option<&IndexMap<OwnedUserId, Receipt>> {
241        self.by_event.get(event_id)
242    }
243
244    /// Mark the given event as seen by the user with the given receipt.
245    fn add_event_receipt_for_user(
246        &mut self,
247        event_id: OwnedEventId,
248        user_id: OwnedUserId,
249        receipt: Receipt,
250    ) {
251        self.by_event.entry(event_id).or_default().insert(user_id, receipt);
252    }
253
254    /// Unmark the given event as seen by the user.
255    fn remove_event_receipt_for_user(&mut self, event_id: &EventId, user_id: &UserId) {
256        if let Some(map) = self.by_event.get_mut(event_id) {
257            map.swap_remove(user_id);
258            // Remove the entire map if this was the last entry.
259            if map.is_empty() {
260                self.by_event.remove(event_id);
261            }
262        }
263    }
264
265    /// Get the read receipts by user for the given event.
266    ///
267    /// This includes all the receipts on the event as well as all the receipts
268    /// on the following events that are filtered out (not visible).
269    #[instrument(skip(self, timeline_items, at_end))]
270    pub(super) fn compute_event_receipts(
271        &self,
272        event_id: &EventId,
273        timeline_items: &mut ObservableItemsTransaction<'_>,
274        at_end: bool,
275    ) -> IndexMap<OwnedUserId, Receipt> {
276        let mut all_receipts = self.get_event_receipts(event_id).cloned().unwrap_or_default();
277
278        if at_end {
279            // No need to search for extra receipts, there are no events after.
280            trace!(
281                "early return because @end, retrieved receipts: {}",
282                all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
283            );
284            return all_receipts;
285        }
286
287        trace!(
288            "loaded receipts: {}",
289            all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
290        );
291
292        // We are going to add receipts for hidden events to this item.
293        //
294        // However: since we may be inserting an event at a random position, the
295        // previous timeline item may already be holding some hidden read
296        // receipts. As a result, we need to be careful here: if we're inserting
297        // after an event that holds hidden read receipts, then we should steal
298        // them from it.
299        //
300        // Find the event, go past it, and keep a reference to the previous rendered
301        // timeline item, if any.
302        let mut events_iter = timeline_items.all_remote_events().iter();
303        let mut prev_event_and_item_index = None;
304
305        for meta in events_iter.by_ref() {
306            if meta.event_id == event_id {
307                break;
308            }
309            if let Some(item_index) = meta.timeline_item_index {
310                prev_event_and_item_index = Some((meta.event_id.clone(), item_index));
311            }
312        }
313
314        // Include receipts for all the following non-visible events.
315        let mut hidden = Vec::new();
316        for hidden_event_meta in events_iter.take_while(|meta| !meta.visible) {
317            if let Some(event_receipts) = self.get_event_receipts(&hidden_event_meta.event_id) {
318                trace!(%hidden_event_meta.event_id, "found receipts on hidden event");
319                hidden.extend(event_receipts.clone());
320            }
321        }
322
323        // Steal hidden receipts from the previous timeline item, if it carried them.
324        if let Some((prev_event_id, prev_item_index)) = prev_event_and_item_index {
325            let prev_item = &timeline_items[prev_item_index];
326            // Technically, we could unwrap the `as_event()`, because this is a rendered
327            // item for an event in all_remote_events, but this extra check is
328            // cheap.
329            if let Some(remote_prev_item) = prev_item.as_event() {
330                let prev_receipts = remote_prev_item.read_receipts().clone();
331                for (user_id, _) in &hidden {
332                    if !prev_receipts.contains_key(user_id) {
333                        continue;
334                    }
335                    let mut up = ReadReceiptTimelineUpdate {
336                        old_item_pos: Some(prev_item_index),
337                        old_event_id: Some(prev_event_id.clone()),
338                        new_item_pos: None,
339                        new_event_id: None,
340                    };
341                    up.remove_old_receipt(timeline_items, user_id);
342                }
343            }
344        }
345
346        all_receipts.extend(hidden);
347        trace!(
348            "computed receipts: {}",
349            all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
350        );
351        all_receipts
352    }
353}
354
355struct FullReceipt<'a> {
356    event_id: &'a EventId,
357    user_id: &'a UserId,
358    receipt_type: ReceiptType,
359    receipt: &'a Receipt,
360}
361
362/// A read receipt update in the timeline.
363#[derive(Clone, Debug, Default)]
364struct ReadReceiptTimelineUpdate {
365    /// The position of the timeline item that had the old receipt of the user,
366    /// if any.
367    old_item_pos: Option<usize>,
368    /// The old event that had the receipt of the user, if any.
369    old_event_id: Option<OwnedEventId>,
370    /// The position of the timeline item that has the new receipt of the user,
371    /// if any.
372    new_item_pos: Option<usize>,
373    /// The new event that has the receipt of the user, if any.
374    new_event_id: Option<OwnedEventId>,
375}
376
377impl ReadReceiptTimelineUpdate {
378    /// Remove the old receipt from the corresponding timeline item.
379    #[instrument(skip_all)]
380    fn remove_old_receipt(&mut self, items: &mut ObservableItemsTransaction<'_>, user_id: &UserId) {
381        let Some(event_id) = &self.old_event_id else {
382            // Nothing to do.
383            return;
384        };
385
386        let item_pos = self.old_item_pos.or_else(|| {
387            items
388                .iter_remotes_region()
389                .rev()
390                .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
391                .find_map(|(nth, event_item)| {
392                    (event_item.event_id() == Some(event_id)).then_some(nth)
393                })
394        });
395
396        let Some(item_pos) = item_pos else {
397            debug!(%event_id, %user_id, "inconsistent state: old event item for read receipt was not found");
398            return;
399        };
400
401        self.old_item_pos = Some(item_pos);
402
403        let event_item = &items[item_pos];
404        let event_item_id = event_item.unique_id().to_owned();
405
406        let Some(mut event_item) = event_item.as_event().cloned() else {
407            warn!("received a read receipt for a virtual item, this should not be possible");
408            return;
409        };
410
411        if let Some(remote_event_item) = event_item.as_remote_mut() {
412            if remote_event_item.read_receipts.swap_remove(user_id).is_none() {
413                debug!(
414                    %event_id, %user_id,
415                    "inconsistent state: old event item for user's read \
416                     receipt doesn't have a receipt for the user"
417                );
418            }
419            trace!(%user_id, %event_id, "removed read receipt from event item");
420            items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
421        } else {
422            warn!("received a read receipt for a local item, this should not be possible");
423        }
424    }
425
426    /// Add the new receipt to the corresponding timeline item.
427    #[instrument(skip_all)]
428    fn add_new_receipt(
429        self,
430        items: &mut ObservableItemsTransaction<'_>,
431        user_id: OwnedUserId,
432        receipt: Receipt,
433    ) {
434        let Some(event_id) = self.new_event_id else {
435            // Nothing to do.
436            return;
437        };
438
439        let old_item_pos = self.old_item_pos.unwrap_or(0);
440
441        let item_pos = self.new_item_pos.or_else(|| {
442            items
443                .iter_remotes_region()
444                // Don't iterate over all items if the `old_item_pos` is known: the `item_pos`
445                // for the new item is necessarily _after_ the old item.
446                .skip_while(|(nth, _)| *nth < old_item_pos)
447                .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
448                .find_map(|(nth, event_item)| {
449                    (event_item.event_id() == Some(&event_id)).then_some(nth)
450                })
451        });
452
453        let Some(item_pos) = item_pos else {
454            debug!(
455                %event_id, %user_id,
456                "inconsistent state: new event item for read receipt was not found",
457            );
458            return;
459        };
460
461        debug_assert!(
462            item_pos >= self.old_item_pos.unwrap_or(0),
463            "The new receipt must be added on a timeline item that is _after_ the timeline item \
464             that was holding the old receipt"
465        );
466
467        let event_item = &items[item_pos];
468        let event_item_id = event_item.unique_id().to_owned();
469
470        let Some(mut event_item) = event_item.as_event().cloned() else {
471            warn!("received a read receipt for a virtual item, this should not be possible");
472            return;
473        };
474
475        if let Some(remote_event_item) = event_item.as_remote_mut() {
476            trace!(%user_id, %event_id, "added read receipt to event item");
477            remote_event_item.read_receipts.insert(user_id, receipt);
478            items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
479        } else {
480            warn!("received a read receipt for a local item, this should not be possible");
481        }
482    }
483
484    /// Apply this update to the timeline.
485    fn apply(
486        mut self,
487        items: &mut ObservableItemsTransaction<'_>,
488        user_id: OwnedUserId,
489        receipt: Receipt,
490    ) {
491        self.remove_old_receipt(items, &user_id);
492        self.add_new_receipt(items, user_id, receipt);
493    }
494}
495
496impl TimelineStateTransaction<'_> {
497    pub(super) fn handle_explicit_read_receipts(
498        &mut self,
499        receipt_event_content: ReceiptEventContent,
500        own_user_id: &UserId,
501    ) {
502        trace!("handling explicit read receipts");
503        for (event_id, receipt_types) in receipt_event_content.0 {
504            for (receipt_type, receipts) in receipt_types {
505                // We only care about read receipts here.
506                if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) {
507                    continue;
508                }
509
510                for (user_id, receipt) in receipts {
511                    if !matches!(receipt.thread, ReceiptThread::Unthreaded | ReceiptThread::Main) {
512                        continue;
513                    }
514
515                    let is_own_user_id = user_id == own_user_id;
516                    let full_receipt = FullReceipt {
517                        event_id: &event_id,
518                        user_id: &user_id,
519                        receipt_type: receipt_type.clone(),
520                        receipt: &receipt,
521                    };
522
523                    self.meta.read_receipts.maybe_update_read_receipt(
524                        full_receipt,
525                        is_own_user_id,
526                        &mut self.items,
527                    );
528                }
529            }
530        }
531    }
532
533    /// Load the read receipts from the store for the given event ID.
534    ///
535    /// Populates the read receipts in-memory caches.
536    pub(super) async fn load_read_receipts_for_event<P: RoomDataProvider>(
537        &mut self,
538        event_id: &EventId,
539        room_data_provider: &P,
540    ) {
541        trace!(%event_id, "loading initial receipts for an event");
542        let read_receipts = room_data_provider.load_event_receipts(event_id).await;
543        let own_user_id = room_data_provider.own_user_id();
544
545        // Since they are explicit read receipts, we need to check if they are
546        // superseded by implicit read receipts.
547        for (user_id, receipt) in read_receipts {
548            let full_receipt = FullReceipt {
549                event_id,
550                user_id: &user_id,
551                receipt_type: ReceiptType::Read,
552                receipt: &receipt,
553            };
554
555            self.meta.read_receipts.maybe_update_read_receipt(
556                full_receipt,
557                user_id == own_user_id,
558                &mut self.items,
559            );
560        }
561    }
562
563    /// Add an implicit read receipt to the given event item, if it is more
564    /// recent than the current read receipt for the sender of the event.
565    ///
566    /// According to the spec, read receipts should not point to events sent by
567    /// our own user, but these events are used to reset the notification
568    /// count, so we need to handle them locally too. For that we create an
569    /// "implicit" read receipt, compared to the "explicit" ones sent by the
570    /// client.
571    pub(super) fn maybe_add_implicit_read_receipt(
572        &mut self,
573        event_id: &EventId,
574        sender: Option<&UserId>,
575        timestamp: Option<MilliSecondsSinceUnixEpoch>,
576    ) {
577        let (Some(user_id), Some(timestamp)) = (sender, timestamp) else {
578            // We cannot add a read receipt if we do not know the user or the timestamp.
579            return;
580        };
581
582        trace!(%event_id, "adding implicit read receipt");
583        let receipt = Receipt::new(timestamp);
584        let full_receipt =
585            FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt };
586
587        let is_own_event = sender.is_some_and(|sender| sender == self.meta.own_user_id);
588        self.meta.read_receipts.maybe_update_read_receipt(
589            full_receipt,
590            is_own_event,
591            &mut self.items,
592        );
593    }
594
595    /// Update the read receipts on the event with the given event ID and the
596    /// previous visible event because of a visibility change.
597    #[instrument(skip(self))]
598    pub(super) fn maybe_update_read_receipts_of_prev_event(&mut self, event_id: &EventId) {
599        // Find the previous visible event, if there is one.
600        let Some(prev_event_meta) = self
601            .items
602            .all_remote_events()
603            .iter()
604            .rev()
605            // Find the event item.
606            .skip_while(|meta| meta.event_id != event_id)
607            // Go past the event item.
608            .skip(1)
609            // Find the first visible item.
610            .find(|meta| meta.visible)
611        else {
612            trace!("Couldn't find any previous visible event, exiting");
613            return;
614        };
615
616        let Some((prev_item_pos, prev_event_item)) =
617            rfind_event_by_id(&self.items, &prev_event_meta.event_id)
618        else {
619            error!("inconsistent state: timeline item of visible event was not found");
620            return;
621        };
622
623        let prev_event_item_id = prev_event_item.internal_id.to_owned();
624        let mut prev_event_item = prev_event_item.clone();
625
626        let Some(remote_prev_event_item) = prev_event_item.as_remote_mut() else {
627            warn!("loading read receipts for a local item, this should not be possible");
628            return;
629        };
630
631        let read_receipts = self.meta.read_receipts.compute_event_receipts(
632            &remote_prev_event_item.event_id,
633            &mut self.items,
634            false,
635        );
636
637        // If the count did not change, the receipts did not change either.
638        if read_receipts.len() == remote_prev_event_item.read_receipts.len() {
639            trace!("same count of read receipts, not doing anything");
640            return;
641        }
642
643        trace!("replacing read receipts with the new ones");
644        remote_prev_event_item.read_receipts = read_receipts;
645        self.items.replace(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id));
646    }
647}
648
649impl TimelineState {
650    /// Populates our own latest read receipt in the in-memory by-user read
651    /// receipt cache.
652    pub(super) async fn populate_initial_user_receipt<P: RoomDataProvider>(
653        &mut self,
654        room_data_provider: &P,
655        receipt_type: ReceiptType,
656    ) {
657        let own_user_id = room_data_provider.own_user_id().to_owned();
658
659        let mut read_receipt = room_data_provider
660            .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, &own_user_id)
661            .await;
662
663        // Fallback to the one in the main thread.
664        if read_receipt.is_none() {
665            read_receipt = room_data_provider
666                .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
667                .await;
668        }
669
670        if let Some(read_receipt) = read_receipt {
671            self.meta.read_receipts.upsert_latest(own_user_id, receipt_type, read_receipt);
672        }
673    }
674
675    /// Get the latest read receipt for the given user.
676    ///
677    /// Useful to get the latest read receipt, whether it's private or public.
678    pub(super) async fn latest_user_read_receipt<P: RoomDataProvider>(
679        &self,
680        user_id: &UserId,
681        room_data_provider: &P,
682    ) -> Option<(OwnedEventId, Receipt)> {
683        let all_remote_events = self.items.all_remote_events();
684        let public_read_receipt = self
685            .meta
686            .user_receipt(user_id, ReceiptType::Read, room_data_provider, all_remote_events)
687            .await;
688        let private_read_receipt = self
689            .meta
690            .user_receipt(user_id, ReceiptType::ReadPrivate, room_data_provider, all_remote_events)
691            .await;
692
693        // Let's assume that a private read receipt should be more recent than a public
694        // read receipt, otherwise there's no point in the private read receipt,
695        // and use it as default.
696        match self.meta.compare_optional_receipts(
697            public_read_receipt.as_ref(),
698            private_read_receipt.as_ref(),
699            self.items.all_remote_events(),
700        ) {
701            Ordering::Greater => public_read_receipt,
702            Ordering::Less => private_read_receipt,
703            _ => unreachable!(),
704        }
705    }
706
707    /// Get the ID of the visible timeline event with the latest read receipt
708    /// for the given user.
709    pub(super) fn latest_user_read_receipt_timeline_event_id(
710        &self,
711        user_id: &UserId,
712    ) -> Option<OwnedEventId> {
713        // We only need to use the local map, since receipts for known events are
714        // already loaded from the store.
715        let public_read_receipt = self.meta.read_receipts.get_latest(user_id, &ReceiptType::Read);
716        let private_read_receipt =
717            self.meta.read_receipts.get_latest(user_id, &ReceiptType::ReadPrivate);
718
719        // Let's assume that a private read receipt should be more recent than a public
720        // read receipt, otherwise there's no point in the private read receipt,
721        // and use it as default.
722        let (latest_receipt_id, _) = match self.meta.compare_optional_receipts(
723            public_read_receipt,
724            private_read_receipt,
725            self.items.all_remote_events(),
726        ) {
727            Ordering::Greater => public_read_receipt?,
728            Ordering::Less => private_read_receipt?,
729            _ => unreachable!(),
730        };
731
732        // Find the corresponding visible event.
733        self.items
734            .all_remote_events()
735            .iter()
736            .rev()
737            .skip_while(|ev| ev.event_id != *latest_receipt_id)
738            .find(|ev| ev.visible)
739            .map(|ev| ev.event_id.clone())
740    }
741}
742
743impl TimelineMetadata {
744    /// Get the latest receipt of the given type for the given user in the
745    /// timeline.
746    ///
747    /// This will attempt to read the latest user receipt for a user from the
748    /// cache, or load it from the storage if missing from the cache.
749    pub(super) async fn user_receipt<P: RoomDataProvider>(
750        &self,
751        user_id: &UserId,
752        receipt_type: ReceiptType,
753        room_data_provider: &P,
754        all_remote_events: &AllRemoteEvents,
755    ) -> Option<(OwnedEventId, Receipt)> {
756        if let Some(receipt) = self.read_receipts.get_latest(user_id, &receipt_type) {
757            // Since it is in the timeline, it should be the most recent.
758            return Some(receipt.clone());
759        }
760
761        let unthreaded_read_receipt = room_data_provider
762            .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
763            .await;
764
765        let main_thread_read_receipt = room_data_provider
766            .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
767            .await;
768
769        // Let's use the unthreaded read receipt as default, since it's the one we
770        // should be using.
771        match self.compare_optional_receipts(
772            main_thread_read_receipt.as_ref(),
773            unthreaded_read_receipt.as_ref(),
774            all_remote_events,
775        ) {
776            Ordering::Greater => main_thread_read_receipt,
777            Ordering::Less => unthreaded_read_receipt,
778            _ => unreachable!(),
779        }
780    }
781
782    /// Compares two optional receipts to know which one is more recent.
783    ///
784    /// Returns `Ordering::Greater` if the left-hand side is more recent than
785    /// the right-hand side, and `Ordering::Less` if it is older. If it's
786    /// not possible to know which one is the more recent, defaults to
787    /// `Ordering::Less`, making the right-hand side the default.
788    fn compare_optional_receipts(
789        &self,
790        lhs: Option<&(OwnedEventId, Receipt)>,
791        rhs_or_default: Option<&(OwnedEventId, Receipt)>,
792        all_remote_events: &AllRemoteEvents,
793    ) -> Ordering {
794        // If we only have one, use it.
795        let Some((lhs_event_id, lhs_receipt)) = lhs else {
796            return Ordering::Less;
797        };
798        let Some((rhs_event_id, rhs_receipt)) = rhs_or_default else {
799            return Ordering::Greater;
800        };
801
802        // Compare by position in the timeline.
803        if let Some(relative_pos) =
804            self.compare_events_positions(lhs_event_id, rhs_event_id, all_remote_events)
805        {
806            if relative_pos == RelativePosition::Before {
807                return Ordering::Greater;
808            }
809
810            return Ordering::Less;
811        }
812
813        // Compare by timestamp.
814        if let Some((lhs_ts, rhs_ts)) = lhs_receipt.ts.zip(rhs_receipt.ts) {
815            if lhs_ts > rhs_ts {
816                return Ordering::Greater;
817            }
818
819            return Ordering::Less;
820        }
821
822        Ordering::Less
823    }
824}