Skip to main content

matrix_sdk_ui/timeline/controller/
read_receipts.rs

1// Copyright 2023 Kévin Commaille
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{cmp::Ordering, collections::HashMap};
16
17use futures_core::Stream;
18use indexmap::IndexMap;
19use ruma::{
20    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, UserId,
21    events::receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
22};
23use tokio::sync::watch;
24use tokio_stream::wrappers::WatchStream;
25use tracing::{debug, error, instrument, trace, warn};
26
27use super::{
28    AllRemoteEvents, ObservableItemsTransaction, RelativePosition, RoomDataProvider,
29    TimelineMetadata, TimelineState, rfind_event_by_id,
30};
31use crate::timeline::{TimelineItem, TimelineItemContent, controller::TimelineStateTransaction};
32
/// In-memory caches for read receipts.
///
/// Receipts are indexed both by the event they point to and by the user that
/// sent them.
#[derive(Clone, Debug, Default)]
pub(super) struct ReadReceipts {
    /// Map of public read receipts on events.
    ///
    /// Event ID => User ID => Read receipt of the user.
    ///
    /// `maybe_update_read_receipt` never records the receipts of our own user
    /// in this map, and `remove_event_receipt_for_user` drops the entry of an
    /// event as soon as its last receipt is removed.
    by_event: HashMap<OwnedEventId, IndexMap<OwnedUserId, Receipt>>,

    /// In-memory cache of all latest read receipts by user.
    ///
    /// User ID => Receipt type => Read receipt of the user of the given
    /// type.
    ///
    /// Each value pairs the receipt with the ID of the event it points to.
    latest_by_user: HashMap<OwnedUserId, HashMap<ReceiptType, (OwnedEventId, Receipt)>>,

    /// A sender to notify of changes to the receipts of our own user.
    ///
    /// The payload is `()`: subscribers only learn that something changed.
    own_user_read_receipts_changed_sender: watch::Sender<()>,
}
50
51impl ReadReceipts {
52    /// Empty the caches.
53    pub(super) fn clear(&mut self) {
54        self.by_event.clear();
55        self.latest_by_user.clear();
56    }
57
58    /// Subscribe to changes in the read receipts of our own user.
59    pub(super) fn subscribe_own_user_read_receipts_changed(
60        &self,
61    ) -> impl Stream<Item = ()> + use<> {
62        let subscriber = self.own_user_read_receipts_changed_sender.subscribe();
63        WatchStream::from_changes(subscriber)
64    }
65
66    /// Read the latest read receipt of the given type for the given user, from
67    /// the in-memory cache.
68    pub(crate) fn get_latest(
69        &self,
70        user_id: &UserId,
71        receipt_type: &ReceiptType,
72    ) -> Option<&(OwnedEventId, Receipt)> {
73        self.latest_by_user.get(user_id).and_then(|map| map.get(receipt_type))
74    }
75
76    /// Insert or update in the local cache the latest read receipt for the
77    /// given user.
78    fn upsert_latest(
79        &mut self,
80        user_id: OwnedUserId,
81        receipt_type: ReceiptType,
82        read_receipt: (OwnedEventId, Receipt),
83    ) {
84        self.latest_by_user.entry(user_id).or_default().insert(receipt_type, read_receipt);
85    }
86
87    /// Update the timeline items with the given read receipt if it is more
88    /// recent than the current one.
89    ///
90    /// In the process, if applicable, this method updates the inner maps to use
91    /// the new receipt. If `is_own_user_id` is `false`, it also updates the
92    /// receipts on the corresponding timeline items.
93    ///
94    /// Currently this method only works reliably if the timeline was started
95    /// from the end of the timeline.
96    #[instrument(skip_all, fields(user_id = %new_receipt.user_id, event_id = %new_receipt.event_id))]
97    fn maybe_update_read_receipt(
98        &mut self,
99        new_receipt: FullReceipt<'_>,
100        is_own_user_id: bool,
101        timeline_items: &mut ObservableItemsTransaction<'_>,
102    ) {
103        let all_events = timeline_items.all_remote_events();
104
105        // Get old receipt.
106        let old_receipt = self.get_latest(new_receipt.user_id, &new_receipt.receipt_type);
107
108        if old_receipt
109            .as_ref()
110            .is_some_and(|(old_receipt_event_id, _)| old_receipt_event_id == new_receipt.event_id)
111        {
112            // The receipt has not changed so there is nothing to do.
113            if !is_own_user_id {
114                trace!("receipt hasn't changed, nothing to do");
115            }
116            return;
117        }
118
119        let old_event_id = old_receipt.map(|(event_id, _)| event_id);
120
121        // Find receipts positions.
122        let mut old_receipt_pos = None;
123        let mut old_item_pos = None;
124        let mut old_item_event_id = None;
125        let mut new_receipt_pos = None;
126        let mut new_item_pos = None;
127        let mut new_item_event_id = None;
128
129        for (pos, event) in all_events.iter().rev().enumerate() {
130            if old_receipt_pos.is_none() && old_event_id == Some(&event.event_id) {
131                old_receipt_pos = Some(pos);
132            }
133
134            // The receipt should appear on the first visible event that can show read
135            // receipts.
136            if old_receipt_pos.is_some()
137                && old_item_event_id.is_none()
138                && event.visible
139                && event.can_show_read_receipts
140            {
141                old_item_pos = event.timeline_item_index;
142                old_item_event_id = Some(event.event_id.clone());
143            }
144
145            if new_receipt_pos.is_none() && new_receipt.event_id == event.event_id {
146                new_receipt_pos = Some(pos);
147            }
148
149            // The receipt should appear on the first visible event that can show read
150            // receipts.
151            if new_receipt_pos.is_some()
152                && new_item_event_id.is_none()
153                && event.visible
154                && event.can_show_read_receipts
155            {
156                new_item_pos = event.timeline_item_index;
157                new_item_event_id = Some(event.event_id.clone());
158            }
159
160            if old_item_event_id.is_some() && new_item_event_id.is_some() {
161                // We have everything we need, stop.
162                break;
163            }
164        }
165
166        // Check if the old receipt is more recent than the new receipt.
167        if let Some(old_receipt_pos) = old_receipt_pos {
168            let Some(new_receipt_pos) = new_receipt_pos else {
169                // The old receipt is more recent since we can't find the new receipt in the
170                // timeline and we supposedly have all events since the end of the timeline.
171                if !is_own_user_id {
172                    trace!(
173                        "we had a previous read receipt, but couldn't find the event \
174                         targeted by the new read receipt in the timeline, exiting"
175                    );
176                }
177                return;
178            };
179
180            if old_receipt_pos < new_receipt_pos {
181                // The old receipt is more recent than the new one.
182                if !is_own_user_id {
183                    trace!("the previous read receipt is more recent than the new one, exiting");
184                }
185                return;
186            }
187        }
188
189        // The new receipt is deemed more recent from now on because:
190        // - If old_receipt_pos is Some, we already checked all the cases where it
191        //   wouldn't be more recent.
192        // - If both old_receipt_pos and new_receipt_pos are None, they are both
193        //   explicit read receipts so the server should only send us a more recent
194        //   receipt.
195        // - If old_receipt_pos is None and new_receipt_pos is Some, the new receipt is
196        //   more recent because it has a place in the timeline.
197
198        if !is_own_user_id {
199            trace!(
200                from_event = ?old_event_id,
201                from_visible_event = ?old_item_event_id,
202                to_event = ?new_receipt.event_id,
203                to_visible_event = ?new_item_event_id,
204                ?old_item_pos,
205                ?new_item_pos,
206                "moving read receipt",
207            );
208
209            // Remove the old receipt from the old event.
210            if let Some(old_event_id) = old_event_id.cloned() {
211                self.remove_event_receipt_for_user(&old_event_id, new_receipt.user_id);
212            }
213
214            // Add the new receipt to the new event.
215            self.add_event_receipt_for_user(
216                new_receipt.event_id.to_owned(),
217                new_receipt.user_id.to_owned(),
218                new_receipt.receipt.clone(),
219            );
220        }
221
222        // Update the receipt of the user.
223        self.upsert_latest(
224            new_receipt.user_id.to_owned(),
225            new_receipt.receipt_type,
226            (new_receipt.event_id.to_owned(), new_receipt.receipt.clone()),
227        );
228
229        if is_own_user_id {
230            self.own_user_read_receipts_changed_sender.send_replace(());
231            // This receipt cannot change items in the timeline.
232            return;
233        }
234
235        if new_item_event_id == old_item_event_id {
236            // The receipt did not change in the timeline.
237            return;
238        }
239
240        let timeline_update = ReadReceiptTimelineUpdate {
241            old_item_pos,
242            old_event_id: old_item_event_id,
243            new_item_pos,
244            new_event_id: new_item_event_id,
245        };
246
247        timeline_update.apply(
248            timeline_items,
249            new_receipt.user_id.to_owned(),
250            new_receipt.receipt.clone(),
251        );
252    }
253
254    /// Returns the cached receipts by user for a given `event_id`.
255    fn get_event_receipts(&self, event_id: &EventId) -> Option<&IndexMap<OwnedUserId, Receipt>> {
256        self.by_event.get(event_id)
257    }
258
259    /// Mark the given event as seen by the user with the given receipt.
260    fn add_event_receipt_for_user(
261        &mut self,
262        event_id: OwnedEventId,
263        user_id: OwnedUserId,
264        receipt: Receipt,
265    ) {
266        self.by_event.entry(event_id).or_default().insert(user_id, receipt);
267    }
268
269    /// Unmark the given event as seen by the user.
270    fn remove_event_receipt_for_user(&mut self, event_id: &EventId, user_id: &UserId) {
271        if let Some(map) = self.by_event.get_mut(event_id) {
272            map.swap_remove(user_id);
273            // Remove the entire map if this was the last entry.
274            if map.is_empty() {
275                self.by_event.remove(event_id);
276            }
277        }
278    }
279
280    /// Get the read receipts by user for the given event.
281    ///
282    /// This includes all the receipts on the event as well as all the receipts
283    /// on the following events that are filtered out (not visible).
284    #[instrument(skip(self, timeline_items, at_end))]
285    pub(super) fn compute_event_receipts(
286        &self,
287        event_id: &EventId,
288        timeline_items: &mut ObservableItemsTransaction<'_>,
289        at_end: bool,
290    ) -> IndexMap<OwnedUserId, Receipt> {
291        let mut all_receipts = self.get_event_receipts(event_id).cloned().unwrap_or_default();
292
293        if at_end {
294            // No need to search for extra receipts, there are no events after.
295            trace!(
296                "early return because @end, retrieved receipts: {}",
297                all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
298            );
299            return all_receipts;
300        }
301
302        trace!(
303            "loaded receipts: {}",
304            all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
305        );
306
307        // We are going to add receipts for hidden events to this item.
308        //
309        // However: since we may be inserting an event at a random position, the
310        // previous timeline item may already be holding some hidden read
311        // receipts. As a result, we need to be careful here: if we're inserting
312        // after an event that holds hidden read receipts, then we should steal
313        // them from it.
314        //
315        // Find the event, go past it, and keep a reference to the previous rendered
316        // timeline item, if any.
317        let mut events_iter = timeline_items.all_remote_events().iter();
318        let mut prev_event_and_item_index = None;
319
320        for meta in events_iter.by_ref() {
321            if meta.event_id == event_id {
322                break;
323            }
324            if let Some(item_index) = meta.timeline_item_index {
325                prev_event_and_item_index = Some((meta.event_id.clone(), item_index));
326            }
327        }
328
329        // Include receipts from all the following events that are hidden or can't show
330        // read receipts.
331        let mut hidden = Vec::new();
332        for hidden_receipt_event_meta in
333            events_iter.take_while(|meta| !meta.visible || !meta.can_show_read_receipts)
334        {
335            if let Some(event_receipts) =
336                self.get_event_receipts(&hidden_receipt_event_meta.event_id)
337            {
338                trace!(%hidden_receipt_event_meta.event_id, "found receipts on hidden event");
339                hidden.extend(event_receipts.clone());
340            }
341        }
342
343        // Steal hidden receipts from the previous timeline item, if it carried them.
344        if let Some((prev_event_id, prev_item_index)) = prev_event_and_item_index {
345            let prev_item = &timeline_items[prev_item_index];
346            // Technically, we could unwrap the `as_event()`, because this is a rendered
347            // item for an event in all_remote_events, but this extra check is
348            // cheap.
349            if let Some(remote_prev_item) = prev_item.as_event() {
350                let prev_receipts = remote_prev_item.read_receipts().clone();
351                for (user_id, _) in &hidden {
352                    if !prev_receipts.contains_key(user_id) {
353                        continue;
354                    }
355                    let mut up = ReadReceiptTimelineUpdate {
356                        old_item_pos: Some(prev_item_index),
357                        old_event_id: Some(prev_event_id.clone()),
358                        new_item_pos: None,
359                        new_event_id: None,
360                    };
361                    up.remove_old_receipt(timeline_items, user_id);
362                }
363            }
364        }
365
366        all_receipts.extend(hidden);
367        trace!(
368            "computed receipts: {}",
369            all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
370        );
371        all_receipts
372    }
373}
374
/// A read receipt along with everything needed to identify it: the event it
/// points to, the user it belongs to and its type.
///
/// All the data is borrowed, except for the receipt type.
struct FullReceipt<'a> {
    /// The event the receipt points to.
    event_id: &'a EventId,
    /// The user the receipt belongs to.
    user_id: &'a UserId,
    /// The type of the receipt.
    receipt_type: ReceiptType,
    /// The receipt itself.
    receipt: &'a Receipt,
}
381
/// A read receipt update in the timeline.
///
/// Describes the move of the receipt of one user from the timeline item of
/// one event to the timeline item of another event. Each side is optional.
///
/// When an event ID is set but the matching item position is not,
/// `remove_old_receipt` and `add_new_receipt` search the timeline items for
/// the event.
#[derive(Clone, Debug, Default)]
struct ReadReceiptTimelineUpdate {
    /// The position of the timeline item that had the old receipt of the user,
    /// if any.
    old_item_pos: Option<usize>,
    /// The old event that had the receipt of the user, if any.
    old_event_id: Option<OwnedEventId>,
    /// The position of the timeline item that has the new receipt of the user,
    /// if any.
    new_item_pos: Option<usize>,
    /// The new event that has the receipt of the user, if any.
    new_event_id: Option<OwnedEventId>,
}
396
397impl ReadReceiptTimelineUpdate {
398    /// Remove the old receipt from the corresponding timeline item.
399    #[instrument(skip_all)]
400    fn remove_old_receipt(&mut self, items: &mut ObservableItemsTransaction<'_>, user_id: &UserId) {
401        let Some(event_id) = &self.old_event_id else {
402            // Nothing to do.
403            return;
404        };
405
406        let item_pos = self.old_item_pos.or_else(|| {
407            items
408                .iter_remotes_region()
409                .rev()
410                .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
411                .find_map(|(nth, event_item)| {
412                    (event_item.event_id() == Some(event_id)).then_some(nth)
413                })
414        });
415
416        let Some(item_pos) = item_pos else {
417            debug!(%event_id, %user_id, "inconsistent state: old event item for read receipt was not found");
418            return;
419        };
420
421        self.old_item_pos = Some(item_pos);
422
423        let event_item = &items[item_pos];
424        let event_item_id = event_item.unique_id().to_owned();
425
426        let Some(mut event_item) = event_item.as_event().cloned() else {
427            warn!("received a read receipt for a virtual item, this should not be possible");
428            return;
429        };
430
431        if let Some(remote_event_item) = event_item.as_remote_mut() {
432            if remote_event_item.read_receipts.swap_remove(user_id).is_none() {
433                debug!(
434                    %event_id, %user_id,
435                    "inconsistent state: old event item for user's read \
436                     receipt doesn't have a receipt for the user"
437                );
438            }
439            trace!(%user_id, %event_id, "removed read receipt from event item");
440            items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
441        } else {
442            warn!("received a read receipt for a local item, this should not be possible");
443        }
444    }
445
446    /// Add the new receipt to the corresponding timeline item.
447    #[instrument(skip_all)]
448    fn add_new_receipt(
449        self,
450        items: &mut ObservableItemsTransaction<'_>,
451        user_id: OwnedUserId,
452        receipt: Receipt,
453    ) {
454        let Some(event_id) = self.new_event_id else {
455            // Nothing to do.
456            return;
457        };
458
459        let old_item_pos = self.old_item_pos.unwrap_or(0);
460
461        let item_pos = self.new_item_pos.or_else(|| {
462            items
463                .iter_remotes_region()
464                // Don't iterate over all items if the `old_item_pos` is known: the `item_pos`
465                // for the new item is necessarily _after_ the old item.
466                .skip_while(|(nth, _)| *nth < old_item_pos)
467                .find_map(|(nth, item)| {
468                    if let Some(event_item) = item.as_event() {
469                        (event_item.event_id() == Some(&event_id)).then_some(nth)
470                    } else {
471                        None
472                    }
473                })
474        });
475
476        let Some(item_pos) = item_pos else {
477            debug!(
478                %event_id, %user_id,
479                "inconsistent state: new event item for read receipt was not found",
480            );
481            return;
482        };
483
484        debug_assert!(
485            item_pos >= self.old_item_pos.unwrap_or(0),
486            "The new receipt must be added on a timeline item that is _after_ the timeline item \
487             that was holding the old receipt"
488        );
489
490        let event_item = &items[item_pos];
491        let event_item_id = event_item.unique_id().to_owned();
492
493        let Some(mut event_item) = event_item.as_event().cloned() else {
494            warn!("received a read receipt for a virtual item, this should not be possible");
495            return;
496        };
497
498        if let Some(remote_event_item) = event_item.as_remote_mut() {
499            trace!(%user_id, %event_id, "added read receipt to event item");
500            remote_event_item.read_receipts.insert(user_id, receipt);
501            items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
502        } else {
503            warn!("received a read receipt for a local item, this should not be possible");
504        }
505    }
506
507    /// Apply this update to the timeline.
508    fn apply(
509        mut self,
510        items: &mut ObservableItemsTransaction<'_>,
511        user_id: OwnedUserId,
512        receipt: Receipt,
513    ) {
514        self.remove_old_receipt(items, &user_id);
515        self.add_new_receipt(items, user_id, receipt);
516    }
517}
518
519impl<P: RoomDataProvider> TimelineStateTransaction<'_, P> {
520    pub(super) fn handle_explicit_read_receipts(
521        &mut self,
522        receipt_event_content: ReceiptEventContent,
523        own_user_id: &UserId,
524    ) {
525        trace!("handling explicit read receipts");
526        let own_receipt_thread = self.focus.receipt_thread();
527
528        for (event_id, receipt_types) in receipt_event_content.0 {
529            for (receipt_type, receipts) in receipt_types {
530                // Discard the read marker updates in this function.
531                if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) {
532                    continue;
533                }
534
535                for (user_id, receipt) in receipts {
536                    if matches!(own_receipt_thread, ReceiptThread::Unthreaded | ReceiptThread::Main)
537                    {
538                        // If the own receipt thread is unthreaded or main, we maintain maximal
539                        // compatibility with clients using either unthreaded or main-thread read
540                        // receipts by allowing both here.
541                        match receipt.thread {
542                            ReceiptThread::Unthreaded | ReceiptThread::Main => {
543                                // Processing happens below.
544                            }
545
546                            ReceiptThread::Thread(thread_root) => {
547                                // Special processing for threads: try to find a timeline item for
548                                // the root, and update its thread summary, if the receipt is for
549                                // ourselves.
550                                //
551                                // TODO: This is temporary code, and should be removed when #4113 is
552                                // done.
553                                if user_id == self.meta.own_user_id
554                                    && let Some((item_pos, item)) =
555                                        rfind_event_by_id(&self.items, &thread_root)
556                                {
557                                    trace!(
558                                        "thread root has been found; will update thread summary with the latest receipts"
559                                    );
560
561                                    let item_id = item.internal_id.to_owned();
562                                    let mut new_item = item.clone();
563
564                                    let TimelineItemContent::MsgLike(msglike) =
565                                        &mut new_item.content
566                                    else {
567                                        // Only MsgLike items have a thread summary.
568                                        continue;
569                                    };
570
571                                    let Some(thread_summary) = &mut msglike.thread_summary else {
572                                        // No thread summary to update.
573                                        continue;
574                                    };
575
576                                    // Assume read receipts only move forward, at the moment.
577                                    if receipt_type == ReceiptType::Read {
578                                        thread_summary.public_read_receipt_event_id =
579                                            Some(event_id.clone());
580                                    } else if receipt_type == ReceiptType::ReadPrivate {
581                                        thread_summary.private_read_receipt_event_id =
582                                            Some(event_id.clone());
583                                    } else {
584                                        // We don't know this receipt type; skip it.
585                                        continue;
586                                    }
587
588                                    self.items
589                                        .replace(item_pos, TimelineItem::new(new_item, item_id));
590                                }
591
592                                // Couldn't find an item for this new read receipt; process the
593                                // next receipt.
594                                continue;
595                            }
596
597                            _ => {
598                                // No processing: ignore.
599                                continue;
600                            }
601                        }
602                    } else if own_receipt_thread != receipt.thread {
603                        // Otherwise, we only keep the receipts of the same thread kind.
604                        continue;
605                    }
606
607                    let is_own_user_id = user_id == own_user_id;
608                    let full_receipt = FullReceipt {
609                        event_id: &event_id,
610                        user_id: &user_id,
611                        receipt_type: receipt_type.clone(),
612                        receipt: &receipt,
613                    };
614
615                    self.meta.read_receipts.maybe_update_read_receipt(
616                        full_receipt,
617                        is_own_user_id,
618                        &mut self.items,
619                    );
620                }
621            }
622        }
623    }
624
625    /// Load the read receipts from the store for the given event ID.
626    ///
627    /// Populates the read receipts in-memory caches.
628    pub(super) async fn load_read_receipts_for_event(
629        &mut self,
630        event_id: &EventId,
631        room_data_provider: &P,
632    ) {
633        trace!(%event_id, "loading initial receipts for an event");
634
635        let receipt_thread = self.focus.receipt_thread();
636
637        let receipts = if matches!(receipt_thread, ReceiptThread::Unthreaded | ReceiptThread::Main)
638        {
639            // If the requested receipt thread is unthreaded or main, we maintain maximal
640            // compatibility with clients using either unthreaded or main-thread read
641            // receipts by allowing both here.
642
643            // First, load the main receipts.
644            let mut main_receipts =
645                room_data_provider.load_event_receipts(event_id, ReceiptThread::Main).await;
646
647            // Then, load the unthreaded receipts.
648            let unthreaded_receipts =
649                room_data_provider.load_event_receipts(event_id, ReceiptThread::Unthreaded).await;
650
651            // We can safely extend both here: if a key is already set, then that means that
652            // the user has the unthreaded and main receipt on the main event,
653            // which is fine, and something we display as the one user receipt.
654            main_receipts.extend(unthreaded_receipts);
655            main_receipts
656        } else {
657            // In all other cases, return what's requested, and only that (threaded
658            // receipts).
659            room_data_provider.load_event_receipts(event_id, receipt_thread.clone()).await
660        };
661
662        let own_user_id = room_data_provider.own_user_id();
663
664        // Since they are explicit read receipts, we need to check if they are
665        // superseded by implicit read receipts.
666        for (user_id, receipt) in receipts {
667            let full_receipt = FullReceipt {
668                event_id,
669                user_id: &user_id,
670                receipt_type: ReceiptType::Read,
671                receipt: &receipt,
672            };
673
674            self.meta.read_receipts.maybe_update_read_receipt(
675                full_receipt,
676                user_id == own_user_id,
677                &mut self.items,
678            );
679        }
680    }
681
682    /// Add an implicit read receipt to the given event item, if it is more
683    /// recent than the current read receipt for the sender of the event.
684    ///
685    /// According to the spec, read receipts should not point to events sent by
686    /// our own user, but these events are used to reset the notification
687    /// count, so we need to handle them locally too. For that we create an
688    /// "implicit" read receipt, compared to the "explicit" ones sent by the
689    /// client.
690    pub(super) fn maybe_add_implicit_read_receipt(
691        &mut self,
692        event_id: &EventId,
693        sender: Option<&UserId>,
694        timestamp: Option<MilliSecondsSinceUnixEpoch>,
695    ) {
696        let (Some(user_id), Some(timestamp)) = (sender, timestamp) else {
697            // We cannot add a read receipt if we do not know the user or the timestamp.
698            return;
699        };
700
701        trace!(%user_id, %event_id, "adding implicit read receipt");
702
703        let mut receipt = Receipt::new(timestamp);
704        receipt.thread = self.focus.receipt_thread();
705
706        let full_receipt =
707            FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt };
708
709        let is_own_event = sender.is_some_and(|sender| sender == self.meta.own_user_id);
710
711        self.meta.read_receipts.maybe_update_read_receipt(
712            full_receipt,
713            is_own_event,
714            &mut self.items,
715        );
716    }
717
718    /// Update the read receipts on the event with the given event ID and the
719    /// previous visible event because of a visibility change.
720    #[instrument(skip(self))]
721    pub(super) fn maybe_update_read_receipts_of_prev_event(&mut self, event_id: &EventId) {
722        // Find the previous visible event, if there is one.
723        let Some(prev_event_meta) = self
724            .items
725            .all_remote_events()
726            .iter()
727            .rev()
728            // Find the event item.
729            .skip_while(|meta| meta.event_id != event_id)
730            // Go past the event item.
731            .skip(1)
732            // Find the first visible item that can show read receipts.
733            .find(|meta| meta.visible && meta.can_show_read_receipts)
734        else {
735            trace!("Couldn't find any previous visible event, exiting");
736            return;
737        };
738
739        let Some((prev_item_pos, prev_event_item)) =
740            rfind_event_by_id(&self.items, &prev_event_meta.event_id)
741        else {
742            error!("inconsistent state: timeline item of visible event was not found");
743            return;
744        };
745
746        let prev_event_item_id = prev_event_item.internal_id.to_owned();
747        let mut prev_event_item = prev_event_item.clone();
748
749        let Some(remote_prev_event_item) = prev_event_item.as_remote_mut() else {
750            warn!("loading read receipts for a local item, this should not be possible");
751            return;
752        };
753
754        let read_receipts = self.meta.read_receipts.compute_event_receipts(
755            &remote_prev_event_item.event_id,
756            &mut self.items,
757            false,
758        );
759
760        // If the count did not change, the receipts did not change either.
761        if read_receipts.len() == remote_prev_event_item.read_receipts.len() {
762            trace!("same count of read receipts, not doing anything");
763            return;
764        }
765
766        trace!("replacing read receipts with the new ones");
767        remote_prev_event_item.read_receipts = read_receipts;
768        self.items.replace(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id));
769    }
770}
771
772impl<P: RoomDataProvider> TimelineState<P> {
773    /// Populates our own latest read receipt in the in-memory by-user read
774    /// receipt cache.
775    pub(super) async fn populate_initial_user_receipt(
776        &mut self,
777        room_data_provider: &P,
778        receipt_type: ReceiptType,
779    ) {
780        let own_user_id = room_data_provider.own_user_id().to_owned();
781
782        let receipt_thread = self.focus.receipt_thread();
783        let wants_unthreaded_receipts = receipt_thread == ReceiptThread::Unthreaded;
784
785        let mut read_receipt = room_data_provider
786            .load_user_receipt(receipt_type.clone(), receipt_thread, &own_user_id)
787            .await;
788
789        if wants_unthreaded_receipts && read_receipt.is_none() {
790            // Fallback to the one in the main thread.
791            read_receipt = room_data_provider
792                .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
793                .await;
794        }
795
796        if let Some(read_receipt) = read_receipt {
797            self.meta.read_receipts.upsert_latest(own_user_id, receipt_type, read_receipt);
798        }
799    }
800
801    /// Get the latest read receipt for the given user.
802    ///
803    /// Useful to get the latest read receipt, whether it's private or public.
804    pub(super) async fn latest_user_read_receipt(
805        &self,
806        user_id: &UserId,
807        receipt_thread: ReceiptThread,
808        room_data_provider: &P,
809    ) -> Option<(OwnedEventId, Receipt)> {
810        let all_remote_events = self.items.all_remote_events();
811
812        let public_read_receipt = self
813            .meta
814            .user_receipt(
815                user_id,
816                ReceiptType::Read,
817                receipt_thread.clone(),
818                room_data_provider,
819                all_remote_events,
820            )
821            .await;
822
823        let private_read_receipt = self
824            .meta
825            .user_receipt(
826                user_id,
827                ReceiptType::ReadPrivate,
828                receipt_thread,
829                room_data_provider,
830                all_remote_events,
831            )
832            .await;
833
834        // Let's assume that a private read receipt should be more recent than a public
835        // read receipt (otherwise there's no point in the private read receipt),
836        // and use it as the default.
837        match TimelineMetadata::compare_optional_receipts(
838            public_read_receipt.as_ref(),
839            private_read_receipt.as_ref(),
840            all_remote_events,
841        ) {
842            Ordering::Greater => public_read_receipt,
843            Ordering::Less => private_read_receipt,
844            _ => unreachable!(),
845        }
846    }
847
848    /// Get the ID of the visible timeline event with the latest read receipt
849    /// for the given user.
850    pub(super) fn latest_user_read_receipt_timeline_event_id(
851        &self,
852        user_id: &UserId,
853    ) -> Option<OwnedEventId> {
854        // We only need to use the local map, since receipts for known events are
855        // already loaded from the store.
856        let public_read_receipt = self.meta.read_receipts.get_latest(user_id, &ReceiptType::Read);
857        let private_read_receipt =
858            self.meta.read_receipts.get_latest(user_id, &ReceiptType::ReadPrivate);
859
860        // Let's assume that a private read receipt should be more recent than a public
861        // read receipt, otherwise there's no point in the private read receipt,
862        // and use it as default.
863        let (latest_receipt_id, _) = match TimelineMetadata::compare_optional_receipts(
864            public_read_receipt,
865            private_read_receipt,
866            self.items.all_remote_events(),
867        ) {
868            Ordering::Greater => public_read_receipt?,
869            Ordering::Less => private_read_receipt?,
870            _ => unreachable!(),
871        };
872
873        // Find the corresponding visible event.
874        self.items
875            .all_remote_events()
876            .iter()
877            .rev()
878            .skip_while(|ev| ev.event_id != *latest_receipt_id)
879            .find(|ev| ev.visible && ev.can_show_read_receipts)
880            .map(|ev| ev.event_id.clone())
881    }
882}
883
884impl TimelineMetadata {
885    /// Get the latest receipt of the given type for the given user in the
886    /// timeline.
887    ///
888    /// This will attempt to read the latest user receipt for a user from the
889    /// cache, or load it from the storage if missing from the cache.
890    ///
891    /// If the `ReceiptThread` is `Unthreaded`, it will try to find either the
892    /// unthreaded or the main-thread read receipt, to be maximally
893    /// compatible with clients using one or the other. Otherwise, it will
894    /// select only the receipts for that specific thread.
895    pub(super) async fn user_receipt<P: RoomDataProvider>(
896        &self,
897        user_id: &UserId,
898        receipt_type: ReceiptType,
899        receipt_thread: ReceiptThread,
900        room_data_provider: &P,
901        all_remote_events: &AllRemoteEvents,
902    ) -> Option<(OwnedEventId, Receipt)> {
903        if let Some(receipt) = self.read_receipts.get_latest(user_id, &receipt_type) {
904            // Since it is in the timeline, it should be the most recent.
905            return Some(receipt.clone());
906        }
907
908        if receipt_thread == ReceiptThread::Unthreaded {
909            // Maintain compatibility with clients using either the unthreaded and main read
910            // receipts, and try to find the most recent one.
911            let unthreaded_read_receipt = room_data_provider
912                .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
913                .await;
914
915            let main_thread_read_receipt = room_data_provider
916                .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
917                .await;
918
919            // Let's use the unthreaded read receipt as default, since it's the one we
920            // should be using.
921            match Self::compare_optional_receipts(
922                main_thread_read_receipt.as_ref(),
923                unthreaded_read_receipt.as_ref(),
924                all_remote_events,
925            ) {
926                Ordering::Greater => main_thread_read_receipt,
927                Ordering::Less => unthreaded_read_receipt,
928                _ => unreachable!(),
929            }
930        } else {
931            // In all the other cases, use the thread's read receipt. A main-thread receipt
932            // in particular will use this code path, and not be compatible with
933            // an unthreaded read receipt.
934            room_data_provider
935                .load_user_receipt(receipt_type.clone(), receipt_thread, user_id)
936                .await
937        }
938    }
939
940    /// Compares two optional receipts to know which one is more recent.
941    ///
942    /// Returns `Ordering::Greater` if the left-hand side is more recent than
943    /// the right-hand side, and `Ordering::Less` if it is older. If it's
944    /// not possible to know which one is the more recent, defaults to
945    /// `Ordering::Less`, making the right-hand side the default.
946    fn compare_optional_receipts(
947        lhs: Option<&(OwnedEventId, Receipt)>,
948        rhs_or_default: Option<&(OwnedEventId, Receipt)>,
949        all_remote_events: &AllRemoteEvents,
950    ) -> Ordering {
951        // If we only have one, use it.
952        let Some((lhs_event_id, lhs_receipt)) = lhs else {
953            return Ordering::Less;
954        };
955        let Some((rhs_event_id, rhs_receipt)) = rhs_or_default else {
956            return Ordering::Greater;
957        };
958
959        // Compare by position in the timeline.
960        if let Some(relative_pos) =
961            Self::compare_events_positions(lhs_event_id, rhs_event_id, all_remote_events)
962        {
963            if relative_pos == RelativePosition::Before {
964                return Ordering::Greater;
965            }
966
967            return Ordering::Less;
968        }
969
970        // Compare by timestamp.
971        if let Some((lhs_ts, rhs_ts)) = lhs_receipt.ts.zip(rhs_receipt.ts) {
972            if lhs_ts > rhs_ts {
973                return Ordering::Greater;
974            }
975
976            return Ordering::Less;
977        }
978
979        Ordering::Less
980    }
981}