1use std::{cmp::Ordering, collections::HashMap};
16
17use futures_core::Stream;
18use indexmap::IndexMap;
19use ruma::{
20 events::receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
21 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, UserId,
22};
23use tokio::sync::watch;
24use tokio_stream::wrappers::WatchStream;
25use tracing::{debug, error, instrument, trace, warn};
26
27use super::{
28 rfind_event_by_id, AllRemoteEvents, ObservableItemsTransaction, RelativePosition,
29 RoomDataProvider, TimelineMetadata, TimelineState,
30};
31use crate::timeline::{controller::TimelineStateTransaction, TimelineItem};
32
33#[derive(Clone, Debug, Default)]
35pub(super) struct ReadReceipts {
36 by_event: HashMap<OwnedEventId, IndexMap<OwnedUserId, Receipt>>,
40
41 latest_by_user: HashMap<OwnedUserId, HashMap<ReceiptType, (OwnedEventId, Receipt)>>,
46
47 own_user_read_receipts_changed_sender: watch::Sender<()>,
49}
50
51impl ReadReceipts {
52 pub(super) fn clear(&mut self) {
54 self.by_event.clear();
55 self.latest_by_user.clear();
56 }
57
58 pub(super) fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
60 let subscriber = self.own_user_read_receipts_changed_sender.subscribe();
61 WatchStream::from_changes(subscriber)
62 }
63
64 fn get_latest(
67 &self,
68 user_id: &UserId,
69 receipt_type: &ReceiptType,
70 ) -> Option<&(OwnedEventId, Receipt)> {
71 self.latest_by_user.get(user_id).and_then(|map| map.get(receipt_type))
72 }
73
74 fn upsert_latest(
77 &mut self,
78 user_id: OwnedUserId,
79 receipt_type: ReceiptType,
80 read_receipt: (OwnedEventId, Receipt),
81 ) {
82 self.latest_by_user.entry(user_id).or_default().insert(receipt_type, read_receipt);
83 }
84
85 #[instrument(skip_all, fields(user_id = %new_receipt.user_id, event_id = %new_receipt.event_id))]
95 fn maybe_update_read_receipt(
96 &mut self,
97 new_receipt: FullReceipt<'_>,
98 is_own_user_id: bool,
99 timeline_items: &mut ObservableItemsTransaction<'_>,
100 ) {
101 let all_events = timeline_items.all_remote_events();
102
103 let old_receipt = self.get_latest(new_receipt.user_id, &new_receipt.receipt_type);
105
106 if old_receipt
107 .as_ref()
108 .is_some_and(|(old_receipt_event_id, _)| old_receipt_event_id == new_receipt.event_id)
109 {
110 if !is_own_user_id {
112 trace!("receipt hasn't changed, nothing to do");
113 }
114 return;
115 }
116
117 let old_event_id = old_receipt.map(|(event_id, _)| event_id);
118
119 let mut old_receipt_pos = None;
121 let mut old_item_pos = None;
122 let mut old_item_event_id = None;
123 let mut new_receipt_pos = None;
124 let mut new_item_pos = None;
125 let mut new_item_event_id = None;
126
127 for (pos, event) in all_events.iter().rev().enumerate() {
128 if old_receipt_pos.is_none() && old_event_id == Some(&event.event_id) {
129 old_receipt_pos = Some(pos);
130 }
131
132 if old_receipt_pos.is_some() && old_item_event_id.is_none() && event.visible {
134 old_item_pos = event.timeline_item_index;
135 old_item_event_id = Some(event.event_id.clone());
136 }
137
138 if new_receipt_pos.is_none() && new_receipt.event_id == event.event_id {
139 new_receipt_pos = Some(pos);
140 }
141
142 if new_receipt_pos.is_some() && new_item_event_id.is_none() && event.visible {
144 new_item_pos = event.timeline_item_index;
145 new_item_event_id = Some(event.event_id.clone());
146 }
147
148 if old_item_event_id.is_some() && new_item_event_id.is_some() {
149 break;
151 }
152 }
153
154 if let Some(old_receipt_pos) = old_receipt_pos {
156 let Some(new_receipt_pos) = new_receipt_pos else {
157 if !is_own_user_id {
160 trace!("we had a previous read receipt, but couldn't find the event targeted by the new read receipt in the timeline, exiting");
161 }
162 return;
163 };
164
165 if old_receipt_pos < new_receipt_pos {
166 if !is_own_user_id {
168 trace!("the previous read receipt is more recent than the new one, exiting");
169 }
170 return;
171 }
172 }
173
174 if !is_own_user_id {
184 trace!(
185 from_event = ?old_event_id,
186 from_visible_event = ?old_item_event_id,
187 to_event = ?new_receipt.event_id,
188 to_visible_event = ?new_item_event_id,
189 ?old_item_pos,
190 ?new_item_pos,
191 "moving read receipt",
192 );
193
194 if let Some(old_event_id) = old_event_id.cloned() {
196 self.remove_event_receipt_for_user(&old_event_id, new_receipt.user_id);
197 }
198
199 self.add_event_receipt_for_user(
201 new_receipt.event_id.to_owned(),
202 new_receipt.user_id.to_owned(),
203 new_receipt.receipt.clone(),
204 );
205 }
206
207 self.upsert_latest(
209 new_receipt.user_id.to_owned(),
210 new_receipt.receipt_type,
211 (new_receipt.event_id.to_owned(), new_receipt.receipt.clone()),
212 );
213
214 if is_own_user_id {
215 self.own_user_read_receipts_changed_sender.send_replace(());
216 return;
218 }
219
220 if new_item_event_id == old_item_event_id {
221 return;
223 }
224
225 let timeline_update = ReadReceiptTimelineUpdate {
226 old_item_pos,
227 old_event_id: old_item_event_id,
228 new_item_pos,
229 new_event_id: new_item_event_id,
230 };
231
232 timeline_update.apply(
233 timeline_items,
234 new_receipt.user_id.to_owned(),
235 new_receipt.receipt.clone(),
236 );
237 }
238
239 fn get_event_receipts(&self, event_id: &EventId) -> Option<&IndexMap<OwnedUserId, Receipt>> {
241 self.by_event.get(event_id)
242 }
243
244 fn add_event_receipt_for_user(
246 &mut self,
247 event_id: OwnedEventId,
248 user_id: OwnedUserId,
249 receipt: Receipt,
250 ) {
251 self.by_event.entry(event_id).or_default().insert(user_id, receipt);
252 }
253
254 fn remove_event_receipt_for_user(&mut self, event_id: &EventId, user_id: &UserId) {
256 if let Some(map) = self.by_event.get_mut(event_id) {
257 map.swap_remove(user_id);
258 if map.is_empty() {
260 self.by_event.remove(event_id);
261 }
262 }
263 }
264
265 #[instrument(skip(self, timeline_items, at_end))]
270 pub(super) fn compute_event_receipts(
271 &self,
272 event_id: &EventId,
273 timeline_items: &mut ObservableItemsTransaction<'_>,
274 at_end: bool,
275 ) -> IndexMap<OwnedUserId, Receipt> {
276 let mut all_receipts = self.get_event_receipts(event_id).cloned().unwrap_or_default();
277
278 if at_end {
279 trace!(
281 "early return because @end, retrieved receipts: {}",
282 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
283 );
284 return all_receipts;
285 }
286
287 trace!(
288 "loaded receipts: {}",
289 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
290 );
291
292 let mut events_iter = timeline_items.all_remote_events().iter();
303 let mut prev_event_and_item_index = None;
304
305 for meta in events_iter.by_ref() {
306 if meta.event_id == event_id {
307 break;
308 }
309 if let Some(item_index) = meta.timeline_item_index {
310 prev_event_and_item_index = Some((meta.event_id.clone(), item_index));
311 }
312 }
313
314 let mut hidden = Vec::new();
316 for hidden_event_meta in events_iter.take_while(|meta| !meta.visible) {
317 if let Some(event_receipts) = self.get_event_receipts(&hidden_event_meta.event_id) {
318 trace!(%hidden_event_meta.event_id, "found receipts on hidden event");
319 hidden.extend(event_receipts.clone());
320 }
321 }
322
323 if let Some((prev_event_id, prev_item_index)) = prev_event_and_item_index {
325 let prev_item = &timeline_items[prev_item_index];
326 if let Some(remote_prev_item) = prev_item.as_event() {
330 let prev_receipts = remote_prev_item.read_receipts().clone();
331 for (user_id, _) in &hidden {
332 if !prev_receipts.contains_key(user_id) {
333 continue;
334 }
335 let mut up = ReadReceiptTimelineUpdate {
336 old_item_pos: Some(prev_item_index),
337 old_event_id: Some(prev_event_id.clone()),
338 new_item_pos: None,
339 new_event_id: None,
340 };
341 up.remove_old_receipt(timeline_items, user_id);
342 }
343 }
344 }
345
346 all_receipts.extend(hidden);
347 trace!(
348 "computed receipts: {}",
349 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
350 );
351 all_receipts
352 }
353}
354
355struct FullReceipt<'a> {
356 event_id: &'a EventId,
357 user_id: &'a UserId,
358 receipt_type: ReceiptType,
359 receipt: &'a Receipt,
360}
361
362#[derive(Clone, Debug, Default)]
364struct ReadReceiptTimelineUpdate {
365 old_item_pos: Option<usize>,
368 old_event_id: Option<OwnedEventId>,
370 new_item_pos: Option<usize>,
373 new_event_id: Option<OwnedEventId>,
375}
376
377impl ReadReceiptTimelineUpdate {
378 #[instrument(skip_all)]
380 fn remove_old_receipt(&mut self, items: &mut ObservableItemsTransaction<'_>, user_id: &UserId) {
381 let Some(event_id) = &self.old_event_id else {
382 return;
384 };
385
386 let item_pos = self.old_item_pos.or_else(|| {
387 items
388 .iter_remotes_region()
389 .rev()
390 .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
391 .find_map(|(nth, event_item)| {
392 (event_item.event_id() == Some(event_id)).then_some(nth)
393 })
394 });
395
396 let Some(item_pos) = item_pos else {
397 debug!(%event_id, %user_id, "inconsistent state: old event item for read receipt was not found");
398 return;
399 };
400
401 self.old_item_pos = Some(item_pos);
402
403 let event_item = &items[item_pos];
404 let event_item_id = event_item.unique_id().to_owned();
405
406 let Some(mut event_item) = event_item.as_event().cloned() else {
407 warn!("received a read receipt for a virtual item, this should not be possible");
408 return;
409 };
410
411 if let Some(remote_event_item) = event_item.as_remote_mut() {
412 if remote_event_item.read_receipts.swap_remove(user_id).is_none() {
413 debug!(
414 %event_id, %user_id,
415 "inconsistent state: old event item for user's read \
416 receipt doesn't have a receipt for the user"
417 );
418 }
419 trace!(%user_id, %event_id, "removed read receipt from event item");
420 items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
421 } else {
422 warn!("received a read receipt for a local item, this should not be possible");
423 }
424 }
425
426 #[instrument(skip_all)]
428 fn add_new_receipt(
429 self,
430 items: &mut ObservableItemsTransaction<'_>,
431 user_id: OwnedUserId,
432 receipt: Receipt,
433 ) {
434 let Some(event_id) = self.new_event_id else {
435 return;
437 };
438
439 let old_item_pos = self.old_item_pos.unwrap_or(0);
440
441 let item_pos = self.new_item_pos.or_else(|| {
442 items
443 .iter_remotes_region()
444 .skip_while(|(nth, _)| *nth < old_item_pos)
447 .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
448 .find_map(|(nth, event_item)| {
449 (event_item.event_id() == Some(&event_id)).then_some(nth)
450 })
451 });
452
453 let Some(item_pos) = item_pos else {
454 debug!(
455 %event_id, %user_id,
456 "inconsistent state: new event item for read receipt was not found",
457 );
458 return;
459 };
460
461 debug_assert!(
462 item_pos >= self.old_item_pos.unwrap_or(0),
463 "The new receipt must be added on a timeline item that is _after_ the timeline item \
464 that was holding the old receipt"
465 );
466
467 let event_item = &items[item_pos];
468 let event_item_id = event_item.unique_id().to_owned();
469
470 let Some(mut event_item) = event_item.as_event().cloned() else {
471 warn!("received a read receipt for a virtual item, this should not be possible");
472 return;
473 };
474
475 if let Some(remote_event_item) = event_item.as_remote_mut() {
476 trace!(%user_id, %event_id, "added read receipt to event item");
477 remote_event_item.read_receipts.insert(user_id, receipt);
478 items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
479 } else {
480 warn!("received a read receipt for a local item, this should not be possible");
481 }
482 }
483
484 fn apply(
486 mut self,
487 items: &mut ObservableItemsTransaction<'_>,
488 user_id: OwnedUserId,
489 receipt: Receipt,
490 ) {
491 self.remove_old_receipt(items, &user_id);
492 self.add_new_receipt(items, user_id, receipt);
493 }
494}
495
496impl TimelineStateTransaction<'_> {
497 pub(super) fn handle_explicit_read_receipts(
498 &mut self,
499 receipt_event_content: ReceiptEventContent,
500 own_user_id: &UserId,
501 ) {
502 trace!("handling explicit read receipts");
503 for (event_id, receipt_types) in receipt_event_content.0 {
504 for (receipt_type, receipts) in receipt_types {
505 if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) {
507 continue;
508 }
509
510 for (user_id, receipt) in receipts {
511 if !matches!(receipt.thread, ReceiptThread::Unthreaded | ReceiptThread::Main) {
512 continue;
513 }
514
515 let is_own_user_id = user_id == own_user_id;
516 let full_receipt = FullReceipt {
517 event_id: &event_id,
518 user_id: &user_id,
519 receipt_type: receipt_type.clone(),
520 receipt: &receipt,
521 };
522
523 self.meta.read_receipts.maybe_update_read_receipt(
524 full_receipt,
525 is_own_user_id,
526 &mut self.items,
527 );
528 }
529 }
530 }
531 }
532
533 pub(super) async fn load_read_receipts_for_event<P: RoomDataProvider>(
537 &mut self,
538 event_id: &EventId,
539 room_data_provider: &P,
540 ) {
541 trace!(%event_id, "loading initial receipts for an event");
542 let read_receipts = room_data_provider.load_event_receipts(event_id).await;
543 let own_user_id = room_data_provider.own_user_id();
544
545 for (user_id, receipt) in read_receipts {
548 let full_receipt = FullReceipt {
549 event_id,
550 user_id: &user_id,
551 receipt_type: ReceiptType::Read,
552 receipt: &receipt,
553 };
554
555 self.meta.read_receipts.maybe_update_read_receipt(
556 full_receipt,
557 user_id == own_user_id,
558 &mut self.items,
559 );
560 }
561 }
562
563 pub(super) fn maybe_add_implicit_read_receipt(
572 &mut self,
573 event_id: &EventId,
574 sender: Option<&UserId>,
575 timestamp: Option<MilliSecondsSinceUnixEpoch>,
576 ) {
577 let (Some(user_id), Some(timestamp)) = (sender, timestamp) else {
578 return;
580 };
581
582 trace!(%event_id, "adding implicit read receipt");
583 let receipt = Receipt::new(timestamp);
584 let full_receipt =
585 FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt };
586
587 let is_own_event = sender.is_some_and(|sender| sender == self.meta.own_user_id);
588 self.meta.read_receipts.maybe_update_read_receipt(
589 full_receipt,
590 is_own_event,
591 &mut self.items,
592 );
593 }
594
595 #[instrument(skip(self))]
598 pub(super) fn maybe_update_read_receipts_of_prev_event(&mut self, event_id: &EventId) {
599 let Some(prev_event_meta) = self
601 .items
602 .all_remote_events()
603 .iter()
604 .rev()
605 .skip_while(|meta| meta.event_id != event_id)
607 .skip(1)
609 .find(|meta| meta.visible)
611 else {
612 trace!("Couldn't find any previous visible event, exiting");
613 return;
614 };
615
616 let Some((prev_item_pos, prev_event_item)) =
617 rfind_event_by_id(&self.items, &prev_event_meta.event_id)
618 else {
619 error!("inconsistent state: timeline item of visible event was not found");
620 return;
621 };
622
623 let prev_event_item_id = prev_event_item.internal_id.to_owned();
624 let mut prev_event_item = prev_event_item.clone();
625
626 let Some(remote_prev_event_item) = prev_event_item.as_remote_mut() else {
627 warn!("loading read receipts for a local item, this should not be possible");
628 return;
629 };
630
631 let read_receipts = self.meta.read_receipts.compute_event_receipts(
632 &remote_prev_event_item.event_id,
633 &mut self.items,
634 false,
635 );
636
637 if read_receipts.len() == remote_prev_event_item.read_receipts.len() {
639 trace!("same count of read receipts, not doing anything");
640 return;
641 }
642
643 trace!("replacing read receipts with the new ones");
644 remote_prev_event_item.read_receipts = read_receipts;
645 self.items.replace(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id));
646 }
647}
648
649impl TimelineState {
650 pub(super) async fn populate_initial_user_receipt<P: RoomDataProvider>(
653 &mut self,
654 room_data_provider: &P,
655 receipt_type: ReceiptType,
656 ) {
657 let own_user_id = room_data_provider.own_user_id().to_owned();
658
659 let mut read_receipt = room_data_provider
660 .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, &own_user_id)
661 .await;
662
663 if read_receipt.is_none() {
665 read_receipt = room_data_provider
666 .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
667 .await;
668 }
669
670 if let Some(read_receipt) = read_receipt {
671 self.meta.read_receipts.upsert_latest(own_user_id, receipt_type, read_receipt);
672 }
673 }
674
675 pub(super) async fn latest_user_read_receipt<P: RoomDataProvider>(
679 &self,
680 user_id: &UserId,
681 room_data_provider: &P,
682 ) -> Option<(OwnedEventId, Receipt)> {
683 let all_remote_events = self.items.all_remote_events();
684 let public_read_receipt = self
685 .meta
686 .user_receipt(user_id, ReceiptType::Read, room_data_provider, all_remote_events)
687 .await;
688 let private_read_receipt = self
689 .meta
690 .user_receipt(user_id, ReceiptType::ReadPrivate, room_data_provider, all_remote_events)
691 .await;
692
693 match self.meta.compare_optional_receipts(
697 public_read_receipt.as_ref(),
698 private_read_receipt.as_ref(),
699 self.items.all_remote_events(),
700 ) {
701 Ordering::Greater => public_read_receipt,
702 Ordering::Less => private_read_receipt,
703 _ => unreachable!(),
704 }
705 }
706
707 pub(super) fn latest_user_read_receipt_timeline_event_id(
710 &self,
711 user_id: &UserId,
712 ) -> Option<OwnedEventId> {
713 let public_read_receipt = self.meta.read_receipts.get_latest(user_id, &ReceiptType::Read);
716 let private_read_receipt =
717 self.meta.read_receipts.get_latest(user_id, &ReceiptType::ReadPrivate);
718
719 let (latest_receipt_id, _) = match self.meta.compare_optional_receipts(
723 public_read_receipt,
724 private_read_receipt,
725 self.items.all_remote_events(),
726 ) {
727 Ordering::Greater => public_read_receipt?,
728 Ordering::Less => private_read_receipt?,
729 _ => unreachable!(),
730 };
731
732 self.items
734 .all_remote_events()
735 .iter()
736 .rev()
737 .skip_while(|ev| ev.event_id != *latest_receipt_id)
738 .find(|ev| ev.visible)
739 .map(|ev| ev.event_id.clone())
740 }
741}
742
743impl TimelineMetadata {
744 pub(super) async fn user_receipt<P: RoomDataProvider>(
750 &self,
751 user_id: &UserId,
752 receipt_type: ReceiptType,
753 room_data_provider: &P,
754 all_remote_events: &AllRemoteEvents,
755 ) -> Option<(OwnedEventId, Receipt)> {
756 if let Some(receipt) = self.read_receipts.get_latest(user_id, &receipt_type) {
757 return Some(receipt.clone());
759 }
760
761 let unthreaded_read_receipt = room_data_provider
762 .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
763 .await;
764
765 let main_thread_read_receipt = room_data_provider
766 .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
767 .await;
768
769 match self.compare_optional_receipts(
772 main_thread_read_receipt.as_ref(),
773 unthreaded_read_receipt.as_ref(),
774 all_remote_events,
775 ) {
776 Ordering::Greater => main_thread_read_receipt,
777 Ordering::Less => unthreaded_read_receipt,
778 _ => unreachable!(),
779 }
780 }
781
782 fn compare_optional_receipts(
789 &self,
790 lhs: Option<&(OwnedEventId, Receipt)>,
791 rhs_or_default: Option<&(OwnedEventId, Receipt)>,
792 all_remote_events: &AllRemoteEvents,
793 ) -> Ordering {
794 let Some((lhs_event_id, lhs_receipt)) = lhs else {
796 return Ordering::Less;
797 };
798 let Some((rhs_event_id, rhs_receipt)) = rhs_or_default else {
799 return Ordering::Greater;
800 };
801
802 if let Some(relative_pos) =
804 self.compare_events_positions(lhs_event_id, rhs_event_id, all_remote_events)
805 {
806 if relative_pos == RelativePosition::Before {
807 return Ordering::Greater;
808 }
809
810 return Ordering::Less;
811 }
812
813 if let Some((lhs_ts, rhs_ts)) = lhs_receipt.ts.zip(rhs_receipt.ts) {
815 if lhs_ts > rhs_ts {
816 return Ordering::Greater;
817 }
818
819 return Ordering::Less;
820 }
821
822 Ordering::Less
823 }
824}