1use std::{cmp::Ordering, collections::HashMap};
16
17use futures_core::Stream;
18use indexmap::IndexMap;
19use ruma::{
20 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, UserId,
21 events::receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
22};
23use tokio::sync::watch;
24use tokio_stream::wrappers::WatchStream;
25use tracing::{debug, error, instrument, trace, warn};
26
27use super::{
28 AllRemoteEvents, ObservableItemsTransaction, RelativePosition, RoomDataProvider,
29 TimelineMetadata, TimelineState, rfind_event_by_id,
30};
31use crate::timeline::{TimelineItem, controller::TimelineStateTransaction};
32
33#[derive(Clone, Debug, Default)]
35pub(super) struct ReadReceipts {
36 by_event: HashMap<OwnedEventId, IndexMap<OwnedUserId, Receipt>>,
40
41 latest_by_user: HashMap<OwnedUserId, HashMap<ReceiptType, (OwnedEventId, Receipt)>>,
46
47 own_user_read_receipts_changed_sender: watch::Sender<()>,
49}
50
51impl ReadReceipts {
52 pub(super) fn clear(&mut self) {
54 self.by_event.clear();
55 self.latest_by_user.clear();
56 }
57
58 pub(super) fn subscribe_own_user_read_receipts_changed(
60 &self,
61 ) -> impl Stream<Item = ()> + use<> {
62 let subscriber = self.own_user_read_receipts_changed_sender.subscribe();
63 WatchStream::from_changes(subscriber)
64 }
65
66 fn get_latest(
69 &self,
70 user_id: &UserId,
71 receipt_type: &ReceiptType,
72 ) -> Option<&(OwnedEventId, Receipt)> {
73 self.latest_by_user.get(user_id).and_then(|map| map.get(receipt_type))
74 }
75
76 fn upsert_latest(
79 &mut self,
80 user_id: OwnedUserId,
81 receipt_type: ReceiptType,
82 read_receipt: (OwnedEventId, Receipt),
83 ) {
84 self.latest_by_user.entry(user_id).or_default().insert(receipt_type, read_receipt);
85 }
86
87 #[instrument(skip_all, fields(user_id = %new_receipt.user_id, event_id = %new_receipt.event_id))]
97 fn maybe_update_read_receipt(
98 &mut self,
99 new_receipt: FullReceipt<'_>,
100 is_own_user_id: bool,
101 timeline_items: &mut ObservableItemsTransaction<'_>,
102 ) {
103 let all_events = timeline_items.all_remote_events();
104
105 let old_receipt = self.get_latest(new_receipt.user_id, &new_receipt.receipt_type);
107
108 if old_receipt
109 .as_ref()
110 .is_some_and(|(old_receipt_event_id, _)| old_receipt_event_id == new_receipt.event_id)
111 {
112 if !is_own_user_id {
114 trace!("receipt hasn't changed, nothing to do");
115 }
116 return;
117 }
118
119 let old_event_id = old_receipt.map(|(event_id, _)| event_id);
120
121 let mut old_receipt_pos = None;
123 let mut old_item_pos = None;
124 let mut old_item_event_id = None;
125 let mut new_receipt_pos = None;
126 let mut new_item_pos = None;
127 let mut new_item_event_id = None;
128
129 for (pos, event) in all_events.iter().rev().enumerate() {
130 if old_receipt_pos.is_none() && old_event_id == Some(&event.event_id) {
131 old_receipt_pos = Some(pos);
132 }
133
134 if old_receipt_pos.is_some() && old_item_event_id.is_none() && event.visible {
136 old_item_pos = event.timeline_item_index;
137 old_item_event_id = Some(event.event_id.clone());
138 }
139
140 if new_receipt_pos.is_none() && new_receipt.event_id == event.event_id {
141 new_receipt_pos = Some(pos);
142 }
143
144 if new_receipt_pos.is_some() && new_item_event_id.is_none() && event.visible {
146 new_item_pos = event.timeline_item_index;
147 new_item_event_id = Some(event.event_id.clone());
148 }
149
150 if old_item_event_id.is_some() && new_item_event_id.is_some() {
151 break;
153 }
154 }
155
156 if let Some(old_receipt_pos) = old_receipt_pos {
158 let Some(new_receipt_pos) = new_receipt_pos else {
159 if !is_own_user_id {
162 trace!(
163 "we had a previous read receipt, but couldn't find the event \
164 targeted by the new read receipt in the timeline, exiting"
165 );
166 }
167 return;
168 };
169
170 if old_receipt_pos < new_receipt_pos {
171 if !is_own_user_id {
173 trace!("the previous read receipt is more recent than the new one, exiting");
174 }
175 return;
176 }
177 }
178
179 if !is_own_user_id {
189 trace!(
190 from_event = ?old_event_id,
191 from_visible_event = ?old_item_event_id,
192 to_event = ?new_receipt.event_id,
193 to_visible_event = ?new_item_event_id,
194 ?old_item_pos,
195 ?new_item_pos,
196 "moving read receipt",
197 );
198
199 if let Some(old_event_id) = old_event_id.cloned() {
201 self.remove_event_receipt_for_user(&old_event_id, new_receipt.user_id);
202 }
203
204 self.add_event_receipt_for_user(
206 new_receipt.event_id.to_owned(),
207 new_receipt.user_id.to_owned(),
208 new_receipt.receipt.clone(),
209 );
210 }
211
212 self.upsert_latest(
214 new_receipt.user_id.to_owned(),
215 new_receipt.receipt_type,
216 (new_receipt.event_id.to_owned(), new_receipt.receipt.clone()),
217 );
218
219 if is_own_user_id {
220 self.own_user_read_receipts_changed_sender.send_replace(());
221 return;
223 }
224
225 if new_item_event_id == old_item_event_id {
226 return;
228 }
229
230 let timeline_update = ReadReceiptTimelineUpdate {
231 old_item_pos,
232 old_event_id: old_item_event_id,
233 new_item_pos,
234 new_event_id: new_item_event_id,
235 };
236
237 timeline_update.apply(
238 timeline_items,
239 new_receipt.user_id.to_owned(),
240 new_receipt.receipt.clone(),
241 );
242 }
243
244 fn get_event_receipts(&self, event_id: &EventId) -> Option<&IndexMap<OwnedUserId, Receipt>> {
246 self.by_event.get(event_id)
247 }
248
249 fn add_event_receipt_for_user(
251 &mut self,
252 event_id: OwnedEventId,
253 user_id: OwnedUserId,
254 receipt: Receipt,
255 ) {
256 self.by_event.entry(event_id).or_default().insert(user_id, receipt);
257 }
258
259 fn remove_event_receipt_for_user(&mut self, event_id: &EventId, user_id: &UserId) {
261 if let Some(map) = self.by_event.get_mut(event_id) {
262 map.swap_remove(user_id);
263 if map.is_empty() {
265 self.by_event.remove(event_id);
266 }
267 }
268 }
269
270 #[instrument(skip(self, timeline_items, at_end))]
275 pub(super) fn compute_event_receipts(
276 &self,
277 event_id: &EventId,
278 timeline_items: &mut ObservableItemsTransaction<'_>,
279 at_end: bool,
280 ) -> IndexMap<OwnedUserId, Receipt> {
281 let mut all_receipts = self.get_event_receipts(event_id).cloned().unwrap_or_default();
282
283 if at_end {
284 trace!(
286 "early return because @end, retrieved receipts: {}",
287 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
288 );
289 return all_receipts;
290 }
291
292 trace!(
293 "loaded receipts: {}",
294 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
295 );
296
297 let mut events_iter = timeline_items.all_remote_events().iter();
308 let mut prev_event_and_item_index = None;
309
310 for meta in events_iter.by_ref() {
311 if meta.event_id == event_id {
312 break;
313 }
314 if let Some(item_index) = meta.timeline_item_index {
315 prev_event_and_item_index = Some((meta.event_id.clone(), item_index));
316 }
317 }
318
319 let mut hidden = Vec::new();
321 for hidden_event_meta in events_iter.take_while(|meta| !meta.visible) {
322 if let Some(event_receipts) = self.get_event_receipts(&hidden_event_meta.event_id) {
323 trace!(%hidden_event_meta.event_id, "found receipts on hidden event");
324 hidden.extend(event_receipts.clone());
325 }
326 }
327
328 if let Some((prev_event_id, prev_item_index)) = prev_event_and_item_index {
330 let prev_item = &timeline_items[prev_item_index];
331 if let Some(remote_prev_item) = prev_item.as_event() {
335 let prev_receipts = remote_prev_item.read_receipts().clone();
336 for (user_id, _) in &hidden {
337 if !prev_receipts.contains_key(user_id) {
338 continue;
339 }
340 let mut up = ReadReceiptTimelineUpdate {
341 old_item_pos: Some(prev_item_index),
342 old_event_id: Some(prev_event_id.clone()),
343 new_item_pos: None,
344 new_event_id: None,
345 };
346 up.remove_old_receipt(timeline_items, user_id);
347 }
348 }
349 }
350
351 all_receipts.extend(hidden);
352 trace!(
353 "computed receipts: {}",
354 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
355 );
356 all_receipts
357 }
358}
359
360struct FullReceipt<'a> {
361 event_id: &'a EventId,
362 user_id: &'a UserId,
363 receipt_type: ReceiptType,
364 receipt: &'a Receipt,
365}
366
367#[derive(Clone, Debug, Default)]
369struct ReadReceiptTimelineUpdate {
370 old_item_pos: Option<usize>,
373 old_event_id: Option<OwnedEventId>,
375 new_item_pos: Option<usize>,
378 new_event_id: Option<OwnedEventId>,
380}
381
382impl ReadReceiptTimelineUpdate {
383 #[instrument(skip_all)]
385 fn remove_old_receipt(&mut self, items: &mut ObservableItemsTransaction<'_>, user_id: &UserId) {
386 let Some(event_id) = &self.old_event_id else {
387 return;
389 };
390
391 let item_pos = self.old_item_pos.or_else(|| {
392 items
393 .iter_remotes_region()
394 .rev()
395 .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
396 .find_map(|(nth, event_item)| {
397 (event_item.event_id() == Some(event_id)).then_some(nth)
398 })
399 });
400
401 let Some(item_pos) = item_pos else {
402 debug!(%event_id, %user_id, "inconsistent state: old event item for read receipt was not found");
403 return;
404 };
405
406 self.old_item_pos = Some(item_pos);
407
408 let event_item = &items[item_pos];
409 let event_item_id = event_item.unique_id().to_owned();
410
411 let Some(mut event_item) = event_item.as_event().cloned() else {
412 warn!("received a read receipt for a virtual item, this should not be possible");
413 return;
414 };
415
416 if let Some(remote_event_item) = event_item.as_remote_mut() {
417 if remote_event_item.read_receipts.swap_remove(user_id).is_none() {
418 debug!(
419 %event_id, %user_id,
420 "inconsistent state: old event item for user's read \
421 receipt doesn't have a receipt for the user"
422 );
423 }
424 trace!(%user_id, %event_id, "removed read receipt from event item");
425 items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
426 } else {
427 warn!("received a read receipt for a local item, this should not be possible");
428 }
429 }
430
431 #[instrument(skip_all)]
433 fn add_new_receipt(
434 self,
435 items: &mut ObservableItemsTransaction<'_>,
436 user_id: OwnedUserId,
437 receipt: Receipt,
438 ) {
439 let Some(event_id) = self.new_event_id else {
440 return;
442 };
443
444 let old_item_pos = self.old_item_pos.unwrap_or(0);
445
446 let item_pos = self.new_item_pos.or_else(|| {
447 items
448 .iter_remotes_region()
449 .skip_while(|(nth, _)| *nth < old_item_pos)
452 .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
453 .find_map(|(nth, event_item)| {
454 (event_item.event_id() == Some(&event_id)).then_some(nth)
455 })
456 });
457
458 let Some(item_pos) = item_pos else {
459 debug!(
460 %event_id, %user_id,
461 "inconsistent state: new event item for read receipt was not found",
462 );
463 return;
464 };
465
466 debug_assert!(
467 item_pos >= self.old_item_pos.unwrap_or(0),
468 "The new receipt must be added on a timeline item that is _after_ the timeline item \
469 that was holding the old receipt"
470 );
471
472 let event_item = &items[item_pos];
473 let event_item_id = event_item.unique_id().to_owned();
474
475 let Some(mut event_item) = event_item.as_event().cloned() else {
476 warn!("received a read receipt for a virtual item, this should not be possible");
477 return;
478 };
479
480 if let Some(remote_event_item) = event_item.as_remote_mut() {
481 trace!(%user_id, %event_id, "added read receipt to event item");
482 remote_event_item.read_receipts.insert(user_id, receipt);
483 items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
484 } else {
485 warn!("received a read receipt for a local item, this should not be possible");
486 }
487 }
488
489 fn apply(
491 mut self,
492 items: &mut ObservableItemsTransaction<'_>,
493 user_id: OwnedUserId,
494 receipt: Receipt,
495 ) {
496 self.remove_old_receipt(items, &user_id);
497 self.add_new_receipt(items, user_id, receipt);
498 }
499}
500
501impl<P: RoomDataProvider> TimelineStateTransaction<'_, P> {
502 pub(super) fn handle_explicit_read_receipts(
503 &mut self,
504 receipt_event_content: ReceiptEventContent,
505 own_user_id: &UserId,
506 ) {
507 trace!("handling explicit read receipts");
508 let own_receipt_thread = self.focus.receipt_thread();
509
510 for (event_id, receipt_types) in receipt_event_content.0 {
511 for (receipt_type, receipts) in receipt_types {
512 if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) {
514 continue;
515 }
516
517 for (user_id, receipt) in receipts {
518 if matches!(own_receipt_thread, ReceiptThread::Unthreaded | ReceiptThread::Main)
519 {
520 if !matches!(
524 receipt.thread,
525 ReceiptThread::Unthreaded | ReceiptThread::Main
526 ) {
527 continue;
528 }
529 } else if own_receipt_thread != receipt.thread {
530 continue;
532 }
533
534 let is_own_user_id = user_id == own_user_id;
535 let full_receipt = FullReceipt {
536 event_id: &event_id,
537 user_id: &user_id,
538 receipt_type: receipt_type.clone(),
539 receipt: &receipt,
540 };
541
542 self.meta.read_receipts.maybe_update_read_receipt(
543 full_receipt,
544 is_own_user_id,
545 &mut self.items,
546 );
547 }
548 }
549 }
550 }
551
552 pub(super) async fn load_read_receipts_for_event(
556 &mut self,
557 event_id: &EventId,
558 room_data_provider: &P,
559 ) {
560 trace!(%event_id, "loading initial receipts for an event");
561
562 let receipt_thread = self.focus.receipt_thread();
563 let read_receipts = room_data_provider.load_event_receipts(event_id, receipt_thread).await;
564 let own_user_id = room_data_provider.own_user_id();
565
566 for (user_id, receipt) in read_receipts {
569 let full_receipt = FullReceipt {
570 event_id,
571 user_id: &user_id,
572 receipt_type: ReceiptType::Read,
573 receipt: &receipt,
574 };
575
576 self.meta.read_receipts.maybe_update_read_receipt(
577 full_receipt,
578 user_id == own_user_id,
579 &mut self.items,
580 );
581 }
582 }
583
584 pub(super) fn maybe_add_implicit_read_receipt(
593 &mut self,
594 event_id: &EventId,
595 sender: Option<&UserId>,
596 timestamp: Option<MilliSecondsSinceUnixEpoch>,
597 ) {
598 let (Some(user_id), Some(timestamp)) = (sender, timestamp) else {
599 return;
601 };
602
603 trace!(%user_id, %event_id, "adding implicit read receipt");
604
605 let mut receipt = Receipt::new(timestamp);
606 receipt.thread = self.focus.receipt_thread();
607
608 let full_receipt =
609 FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt };
610
611 let is_own_event = sender.is_some_and(|sender| sender == self.meta.own_user_id);
612
613 self.meta.read_receipts.maybe_update_read_receipt(
614 full_receipt,
615 is_own_event,
616 &mut self.items,
617 );
618 }
619
620 #[instrument(skip(self))]
623 pub(super) fn maybe_update_read_receipts_of_prev_event(&mut self, event_id: &EventId) {
624 let Some(prev_event_meta) = self
626 .items
627 .all_remote_events()
628 .iter()
629 .rev()
630 .skip_while(|meta| meta.event_id != event_id)
632 .skip(1)
634 .find(|meta| meta.visible)
636 else {
637 trace!("Couldn't find any previous visible event, exiting");
638 return;
639 };
640
641 let Some((prev_item_pos, prev_event_item)) =
642 rfind_event_by_id(&self.items, &prev_event_meta.event_id)
643 else {
644 error!("inconsistent state: timeline item of visible event was not found");
645 return;
646 };
647
648 let prev_event_item_id = prev_event_item.internal_id.to_owned();
649 let mut prev_event_item = prev_event_item.clone();
650
651 let Some(remote_prev_event_item) = prev_event_item.as_remote_mut() else {
652 warn!("loading read receipts for a local item, this should not be possible");
653 return;
654 };
655
656 let read_receipts = self.meta.read_receipts.compute_event_receipts(
657 &remote_prev_event_item.event_id,
658 &mut self.items,
659 false,
660 );
661
662 if read_receipts.len() == remote_prev_event_item.read_receipts.len() {
664 trace!("same count of read receipts, not doing anything");
665 return;
666 }
667
668 trace!("replacing read receipts with the new ones");
669 remote_prev_event_item.read_receipts = read_receipts;
670 self.items.replace(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id));
671 }
672}
673
674impl<P: RoomDataProvider> TimelineState<P> {
675 pub(super) async fn populate_initial_user_receipt(
678 &mut self,
679 room_data_provider: &P,
680 receipt_type: ReceiptType,
681 ) {
682 let own_user_id = room_data_provider.own_user_id().to_owned();
683
684 let receipt_thread = self.focus.receipt_thread();
685 let wants_unthreaded_receipts = receipt_thread == ReceiptThread::Unthreaded;
686
687 let mut read_receipt = room_data_provider
688 .load_user_receipt(receipt_type.clone(), receipt_thread, &own_user_id)
689 .await;
690
691 if wants_unthreaded_receipts && read_receipt.is_none() {
692 read_receipt = room_data_provider
694 .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
695 .await;
696 }
697
698 if let Some(read_receipt) = read_receipt {
699 self.meta.read_receipts.upsert_latest(own_user_id, receipt_type, read_receipt);
700 }
701 }
702
703 pub(super) async fn latest_user_read_receipt(
707 &self,
708 user_id: &UserId,
709 receipt_thread: ReceiptThread,
710 room_data_provider: &P,
711 ) -> Option<(OwnedEventId, Receipt)> {
712 let all_remote_events = self.items.all_remote_events();
713
714 let public_read_receipt = self
715 .meta
716 .user_receipt(
717 user_id,
718 ReceiptType::Read,
719 receipt_thread.clone(),
720 room_data_provider,
721 all_remote_events,
722 )
723 .await;
724
725 let private_read_receipt = self
726 .meta
727 .user_receipt(
728 user_id,
729 ReceiptType::ReadPrivate,
730 receipt_thread,
731 room_data_provider,
732 all_remote_events,
733 )
734 .await;
735
736 match TimelineMetadata::compare_optional_receipts(
740 public_read_receipt.as_ref(),
741 private_read_receipt.as_ref(),
742 all_remote_events,
743 ) {
744 Ordering::Greater => public_read_receipt,
745 Ordering::Less => private_read_receipt,
746 _ => unreachable!(),
747 }
748 }
749
750 pub(super) fn latest_user_read_receipt_timeline_event_id(
753 &self,
754 user_id: &UserId,
755 ) -> Option<OwnedEventId> {
756 let public_read_receipt = self.meta.read_receipts.get_latest(user_id, &ReceiptType::Read);
759 let private_read_receipt =
760 self.meta.read_receipts.get_latest(user_id, &ReceiptType::ReadPrivate);
761
762 let (latest_receipt_id, _) = match TimelineMetadata::compare_optional_receipts(
766 public_read_receipt,
767 private_read_receipt,
768 self.items.all_remote_events(),
769 ) {
770 Ordering::Greater => public_read_receipt?,
771 Ordering::Less => private_read_receipt?,
772 _ => unreachable!(),
773 };
774
775 self.items
777 .all_remote_events()
778 .iter()
779 .rev()
780 .skip_while(|ev| ev.event_id != *latest_receipt_id)
781 .find(|ev| ev.visible)
782 .map(|ev| ev.event_id.clone())
783 }
784}
785
786impl TimelineMetadata {
787 pub(super) async fn user_receipt<P: RoomDataProvider>(
798 &self,
799 user_id: &UserId,
800 receipt_type: ReceiptType,
801 receipt_thread: ReceiptThread,
802 room_data_provider: &P,
803 all_remote_events: &AllRemoteEvents,
804 ) -> Option<(OwnedEventId, Receipt)> {
805 if let Some(receipt) = self.read_receipts.get_latest(user_id, &receipt_type) {
806 return Some(receipt.clone());
808 }
809
810 if receipt_thread == ReceiptThread::Unthreaded {
811 let unthreaded_read_receipt = room_data_provider
814 .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
815 .await;
816
817 let main_thread_read_receipt = room_data_provider
818 .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
819 .await;
820
821 match Self::compare_optional_receipts(
824 main_thread_read_receipt.as_ref(),
825 unthreaded_read_receipt.as_ref(),
826 all_remote_events,
827 ) {
828 Ordering::Greater => main_thread_read_receipt,
829 Ordering::Less => unthreaded_read_receipt,
830 _ => unreachable!(),
831 }
832 } else {
833 room_data_provider
837 .load_user_receipt(receipt_type.clone(), receipt_thread, user_id)
838 .await
839 }
840 }
841
842 fn compare_optional_receipts(
849 lhs: Option<&(OwnedEventId, Receipt)>,
850 rhs_or_default: Option<&(OwnedEventId, Receipt)>,
851 all_remote_events: &AllRemoteEvents,
852 ) -> Ordering {
853 let Some((lhs_event_id, lhs_receipt)) = lhs else {
855 return Ordering::Less;
856 };
857 let Some((rhs_event_id, rhs_receipt)) = rhs_or_default else {
858 return Ordering::Greater;
859 };
860
861 if let Some(relative_pos) =
863 Self::compare_events_positions(lhs_event_id, rhs_event_id, all_remote_events)
864 {
865 if relative_pos == RelativePosition::Before {
866 return Ordering::Greater;
867 }
868
869 return Ordering::Less;
870 }
871
872 if let Some((lhs_ts, rhs_ts)) = lhs_receipt.ts.zip(rhs_receipt.ts) {
874 if lhs_ts > rhs_ts {
875 return Ordering::Greater;
876 }
877
878 return Ordering::Less;
879 }
880
881 Ordering::Less
882 }
883}