1use std::{cmp::Ordering, collections::HashMap};
16
17use futures_core::Stream;
18use indexmap::IndexMap;
19use ruma::{
20 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, UserId,
21 events::receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
22};
23use tokio::sync::watch;
24use tokio_stream::wrappers::WatchStream;
25use tracing::{debug, error, instrument, trace, warn};
26
27use super::{
28 AllRemoteEvents, ObservableItemsTransaction, RelativePosition, RoomDataProvider,
29 TimelineMetadata, TimelineState, rfind_event_by_id,
30};
31use crate::timeline::{TimelineItem, TimelineItemContent, controller::TimelineStateTransaction};
32
/// In-memory bookkeeping of the read receipts known to a timeline.
#[derive(Clone, Debug, Default)]
pub(super) struct ReadReceipts {
    /// Receipts indexed by the event they are attached to, then by the user
    /// who sent them.
    by_event: HashMap<OwnedEventId, IndexMap<OwnedUserId, Receipt>>,

    /// For every user and receipt type, the most recently recorded receipt
    /// together with the ID of the event it targets.
    latest_by_user: HashMap<OwnedUserId, HashMap<ReceiptType, (OwnedEventId, Receipt)>>,

    /// Watch channel poked (with `send_replace`) every time a receipt of the
    /// own user is recorded by `maybe_update_read_receipt`.
    own_user_read_receipts_changed_sender: watch::Sender<()>,
}
50
51impl ReadReceipts {
52 pub(super) fn clear(&mut self) {
54 self.by_event.clear();
55 self.latest_by_user.clear();
56 }
57
58 pub(super) fn subscribe_own_user_read_receipts_changed(
60 &self,
61 ) -> impl Stream<Item = ()> + use<> {
62 let subscriber = self.own_user_read_receipts_changed_sender.subscribe();
63 WatchStream::from_changes(subscriber)
64 }
65
66 pub(crate) fn get_latest(
69 &self,
70 user_id: &UserId,
71 receipt_type: &ReceiptType,
72 ) -> Option<&(OwnedEventId, Receipt)> {
73 self.latest_by_user.get(user_id).and_then(|map| map.get(receipt_type))
74 }
75
76 fn upsert_latest(
79 &mut self,
80 user_id: OwnedUserId,
81 receipt_type: ReceiptType,
82 read_receipt: (OwnedEventId, Receipt),
83 ) {
84 self.latest_by_user.entry(user_id).or_default().insert(receipt_type, read_receipt);
85 }
86
87 #[instrument(skip_all, fields(user_id = %new_receipt.user_id, event_id = %new_receipt.event_id))]
97 fn maybe_update_read_receipt(
98 &mut self,
99 new_receipt: FullReceipt<'_>,
100 is_own_user_id: bool,
101 timeline_items: &mut ObservableItemsTransaction<'_>,
102 ) {
103 let all_events = timeline_items.all_remote_events();
104
105 let old_receipt = self.get_latest(new_receipt.user_id, &new_receipt.receipt_type);
107
108 if old_receipt
109 .as_ref()
110 .is_some_and(|(old_receipt_event_id, _)| old_receipt_event_id == new_receipt.event_id)
111 {
112 if !is_own_user_id {
114 trace!("receipt hasn't changed, nothing to do");
115 }
116 return;
117 }
118
119 let old_event_id = old_receipt.map(|(event_id, _)| event_id);
120
121 let mut old_receipt_pos = None;
123 let mut old_item_pos = None;
124 let mut old_item_event_id = None;
125 let mut new_receipt_pos = None;
126 let mut new_item_pos = None;
127 let mut new_item_event_id = None;
128
129 for (pos, event) in all_events.iter().rev().enumerate() {
130 if old_receipt_pos.is_none() && old_event_id == Some(&event.event_id) {
131 old_receipt_pos = Some(pos);
132 }
133
134 if old_receipt_pos.is_some()
137 && old_item_event_id.is_none()
138 && event.visible
139 && event.can_show_read_receipts
140 {
141 old_item_pos = event.timeline_item_index;
142 old_item_event_id = Some(event.event_id.clone());
143 }
144
145 if new_receipt_pos.is_none() && new_receipt.event_id == event.event_id {
146 new_receipt_pos = Some(pos);
147 }
148
149 if new_receipt_pos.is_some()
152 && new_item_event_id.is_none()
153 && event.visible
154 && event.can_show_read_receipts
155 {
156 new_item_pos = event.timeline_item_index;
157 new_item_event_id = Some(event.event_id.clone());
158 }
159
160 if old_item_event_id.is_some() && new_item_event_id.is_some() {
161 break;
163 }
164 }
165
166 if let Some(old_receipt_pos) = old_receipt_pos {
168 let Some(new_receipt_pos) = new_receipt_pos else {
169 if !is_own_user_id {
172 trace!(
173 "we had a previous read receipt, but couldn't find the event \
174 targeted by the new read receipt in the timeline, exiting"
175 );
176 }
177 return;
178 };
179
180 if old_receipt_pos < new_receipt_pos {
181 if !is_own_user_id {
183 trace!("the previous read receipt is more recent than the new one, exiting");
184 }
185 return;
186 }
187 }
188
189 if !is_own_user_id {
199 trace!(
200 from_event = ?old_event_id,
201 from_visible_event = ?old_item_event_id,
202 to_event = ?new_receipt.event_id,
203 to_visible_event = ?new_item_event_id,
204 ?old_item_pos,
205 ?new_item_pos,
206 "moving read receipt",
207 );
208
209 if let Some(old_event_id) = old_event_id.cloned() {
211 self.remove_event_receipt_for_user(&old_event_id, new_receipt.user_id);
212 }
213
214 self.add_event_receipt_for_user(
216 new_receipt.event_id.to_owned(),
217 new_receipt.user_id.to_owned(),
218 new_receipt.receipt.clone(),
219 );
220 }
221
222 self.upsert_latest(
224 new_receipt.user_id.to_owned(),
225 new_receipt.receipt_type,
226 (new_receipt.event_id.to_owned(), new_receipt.receipt.clone()),
227 );
228
229 if is_own_user_id {
230 self.own_user_read_receipts_changed_sender.send_replace(());
231 return;
233 }
234
235 if new_item_event_id == old_item_event_id {
236 return;
238 }
239
240 let timeline_update = ReadReceiptTimelineUpdate {
241 old_item_pos,
242 old_event_id: old_item_event_id,
243 new_item_pos,
244 new_event_id: new_item_event_id,
245 };
246
247 timeline_update.apply(
248 timeline_items,
249 new_receipt.user_id.to_owned(),
250 new_receipt.receipt.clone(),
251 );
252 }
253
254 fn get_event_receipts(&self, event_id: &EventId) -> Option<&IndexMap<OwnedUserId, Receipt>> {
256 self.by_event.get(event_id)
257 }
258
259 fn add_event_receipt_for_user(
261 &mut self,
262 event_id: OwnedEventId,
263 user_id: OwnedUserId,
264 receipt: Receipt,
265 ) {
266 self.by_event.entry(event_id).or_default().insert(user_id, receipt);
267 }
268
269 fn remove_event_receipt_for_user(&mut self, event_id: &EventId, user_id: &UserId) {
271 if let Some(map) = self.by_event.get_mut(event_id) {
272 map.swap_remove(user_id);
273 if map.is_empty() {
275 self.by_event.remove(event_id);
276 }
277 }
278 }
279
280 #[instrument(skip(self, timeline_items, at_end))]
285 pub(super) fn compute_event_receipts(
286 &self,
287 event_id: &EventId,
288 timeline_items: &mut ObservableItemsTransaction<'_>,
289 at_end: bool,
290 ) -> IndexMap<OwnedUserId, Receipt> {
291 let mut all_receipts = self.get_event_receipts(event_id).cloned().unwrap_or_default();
292
293 if at_end {
294 trace!(
296 "early return because @end, retrieved receipts: {}",
297 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
298 );
299 return all_receipts;
300 }
301
302 trace!(
303 "loaded receipts: {}",
304 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
305 );
306
307 let mut events_iter = timeline_items.all_remote_events().iter();
318 let mut prev_event_and_item_index = None;
319
320 for meta in events_iter.by_ref() {
321 if meta.event_id == event_id {
322 break;
323 }
324 if let Some(item_index) = meta.timeline_item_index {
325 prev_event_and_item_index = Some((meta.event_id.clone(), item_index));
326 }
327 }
328
329 let mut hidden = Vec::new();
332 for hidden_receipt_event_meta in
333 events_iter.take_while(|meta| !meta.visible || !meta.can_show_read_receipts)
334 {
335 if let Some(event_receipts) =
336 self.get_event_receipts(&hidden_receipt_event_meta.event_id)
337 {
338 trace!(%hidden_receipt_event_meta.event_id, "found receipts on hidden event");
339 hidden.extend(event_receipts.clone());
340 }
341 }
342
343 if let Some((prev_event_id, prev_item_index)) = prev_event_and_item_index {
345 let prev_item = &timeline_items[prev_item_index];
346 if let Some(remote_prev_item) = prev_item.as_event() {
350 let prev_receipts = remote_prev_item.read_receipts().clone();
351 for (user_id, _) in &hidden {
352 if !prev_receipts.contains_key(user_id) {
353 continue;
354 }
355 let mut up = ReadReceiptTimelineUpdate {
356 old_item_pos: Some(prev_item_index),
357 old_event_id: Some(prev_event_id.clone()),
358 new_item_pos: None,
359 new_event_id: None,
360 };
361 up.remove_old_receipt(timeline_items, user_id);
362 }
363 }
364 }
365
366 all_receipts.extend(hidden);
367 trace!(
368 "computed receipts: {}",
369 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
370 );
371 all_receipts
372 }
373}
374
/// A read receipt bundled with everything that identifies it, all borrowed
/// except for the receipt type.
struct FullReceipt<'a> {
    /// The event targeted by the receipt.
    event_id: &'a EventId,
    /// The user who sent the receipt.
    user_id: &'a UserId,
    /// The type of the receipt.
    receipt_type: ReceiptType,
    /// The receipt content itself.
    receipt: &'a Receipt,
}
381
/// Describes the move of one user's read receipt from one timeline item to
/// another.
#[derive(Clone, Debug, Default)]
struct ReadReceiptTimelineUpdate {
    /// Index of the timeline item currently holding the receipt, when known.
    /// `remove_old_receipt` fills it in after locating the item by event ID.
    old_item_pos: Option<usize>,
    /// ID of the event currently holding the receipt; `None` means there is
    /// nothing to remove.
    old_event_id: Option<OwnedEventId>,
    /// Index of the timeline item that must receive the receipt, when known.
    new_item_pos: Option<usize>,
    /// ID of the event that must receive the receipt; `None` means there is
    /// nothing to add.
    new_event_id: Option<OwnedEventId>,
}
396
397impl ReadReceiptTimelineUpdate {
398 #[instrument(skip_all)]
400 fn remove_old_receipt(&mut self, items: &mut ObservableItemsTransaction<'_>, user_id: &UserId) {
401 let Some(event_id) = &self.old_event_id else {
402 return;
404 };
405
406 let item_pos = self.old_item_pos.or_else(|| {
407 items
408 .iter_remotes_region()
409 .rev()
410 .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
411 .find_map(|(nth, event_item)| {
412 (event_item.event_id() == Some(event_id)).then_some(nth)
413 })
414 });
415
416 let Some(item_pos) = item_pos else {
417 debug!(%event_id, %user_id, "inconsistent state: old event item for read receipt was not found");
418 return;
419 };
420
421 self.old_item_pos = Some(item_pos);
422
423 let event_item = &items[item_pos];
424 let event_item_id = event_item.unique_id().to_owned();
425
426 let Some(mut event_item) = event_item.as_event().cloned() else {
427 warn!("received a read receipt for a virtual item, this should not be possible");
428 return;
429 };
430
431 if let Some(remote_event_item) = event_item.as_remote_mut() {
432 if remote_event_item.read_receipts.swap_remove(user_id).is_none() {
433 debug!(
434 %event_id, %user_id,
435 "inconsistent state: old event item for user's read \
436 receipt doesn't have a receipt for the user"
437 );
438 }
439 trace!(%user_id, %event_id, "removed read receipt from event item");
440 items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
441 } else {
442 warn!("received a read receipt for a local item, this should not be possible");
443 }
444 }
445
446 #[instrument(skip_all)]
448 fn add_new_receipt(
449 self,
450 items: &mut ObservableItemsTransaction<'_>,
451 user_id: OwnedUserId,
452 receipt: Receipt,
453 ) {
454 let Some(event_id) = self.new_event_id else {
455 return;
457 };
458
459 let old_item_pos = self.old_item_pos.unwrap_or(0);
460
461 let item_pos = self.new_item_pos.or_else(|| {
462 items
463 .iter_remotes_region()
464 .skip_while(|(nth, _)| *nth < old_item_pos)
467 .find_map(|(nth, item)| {
468 if let Some(event_item) = item.as_event() {
469 (event_item.event_id() == Some(&event_id)).then_some(nth)
470 } else {
471 None
472 }
473 })
474 });
475
476 let Some(item_pos) = item_pos else {
477 debug!(
478 %event_id, %user_id,
479 "inconsistent state: new event item for read receipt was not found",
480 );
481 return;
482 };
483
484 debug_assert!(
485 item_pos >= self.old_item_pos.unwrap_or(0),
486 "The new receipt must be added on a timeline item that is _after_ the timeline item \
487 that was holding the old receipt"
488 );
489
490 let event_item = &items[item_pos];
491 let event_item_id = event_item.unique_id().to_owned();
492
493 let Some(mut event_item) = event_item.as_event().cloned() else {
494 warn!("received a read receipt for a virtual item, this should not be possible");
495 return;
496 };
497
498 if let Some(remote_event_item) = event_item.as_remote_mut() {
499 trace!(%user_id, %event_id, "added read receipt to event item");
500 remote_event_item.read_receipts.insert(user_id, receipt);
501 items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
502 } else {
503 warn!("received a read receipt for a local item, this should not be possible");
504 }
505 }
506
507 fn apply(
509 mut self,
510 items: &mut ObservableItemsTransaction<'_>,
511 user_id: OwnedUserId,
512 receipt: Receipt,
513 ) {
514 self.remove_old_receipt(items, &user_id);
515 self.add_new_receipt(items, user_id, receipt);
516 }
517}
518
519impl<P: RoomDataProvider> TimelineStateTransaction<'_, P> {
520 pub(super) fn handle_explicit_read_receipts(
521 &mut self,
522 receipt_event_content: ReceiptEventContent,
523 own_user_id: &UserId,
524 ) {
525 trace!("handling explicit read receipts");
526 let own_receipt_thread = self.focus.receipt_thread();
527
528 for (event_id, receipt_types) in receipt_event_content.0 {
529 for (receipt_type, receipts) in receipt_types {
530 if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) {
532 continue;
533 }
534
535 for (user_id, receipt) in receipts {
536 if matches!(own_receipt_thread, ReceiptThread::Unthreaded | ReceiptThread::Main)
537 {
538 match receipt.thread {
542 ReceiptThread::Unthreaded | ReceiptThread::Main => {
543 }
545
546 ReceiptThread::Thread(thread_root) => {
547 if user_id == self.meta.own_user_id
554 && let Some((item_pos, item)) =
555 rfind_event_by_id(&self.items, &thread_root)
556 {
557 trace!(
558 "thread root has been found; will update thread summary with the latest receipts"
559 );
560
561 let item_id = item.internal_id.to_owned();
562 let mut new_item = item.clone();
563
564 let TimelineItemContent::MsgLike(msglike) =
565 &mut new_item.content
566 else {
567 continue;
569 };
570
571 let Some(thread_summary) = &mut msglike.thread_summary else {
572 continue;
574 };
575
576 if receipt_type == ReceiptType::Read {
578 thread_summary.public_read_receipt_event_id =
579 Some(event_id.clone());
580 } else if receipt_type == ReceiptType::ReadPrivate {
581 thread_summary.private_read_receipt_event_id =
582 Some(event_id.clone());
583 } else {
584 continue;
586 }
587
588 self.items
589 .replace(item_pos, TimelineItem::new(new_item, item_id));
590 }
591
592 continue;
595 }
596
597 _ => {
598 continue;
600 }
601 }
602 } else if own_receipt_thread != receipt.thread {
603 continue;
605 }
606
607 let is_own_user_id = user_id == own_user_id;
608 let full_receipt = FullReceipt {
609 event_id: &event_id,
610 user_id: &user_id,
611 receipt_type: receipt_type.clone(),
612 receipt: &receipt,
613 };
614
615 self.meta.read_receipts.maybe_update_read_receipt(
616 full_receipt,
617 is_own_user_id,
618 &mut self.items,
619 );
620 }
621 }
622 }
623 }
624
625 pub(super) async fn load_read_receipts_for_event(
629 &mut self,
630 event_id: &EventId,
631 room_data_provider: &P,
632 ) {
633 trace!(%event_id, "loading initial receipts for an event");
634
635 let receipt_thread = self.focus.receipt_thread();
636
637 let receipts = if matches!(receipt_thread, ReceiptThread::Unthreaded | ReceiptThread::Main)
638 {
639 let mut main_receipts =
645 room_data_provider.load_event_receipts(event_id, ReceiptThread::Main).await;
646
647 let unthreaded_receipts =
649 room_data_provider.load_event_receipts(event_id, ReceiptThread::Unthreaded).await;
650
651 main_receipts.extend(unthreaded_receipts);
655 main_receipts
656 } else {
657 room_data_provider.load_event_receipts(event_id, receipt_thread.clone()).await
660 };
661
662 let own_user_id = room_data_provider.own_user_id();
663
664 for (user_id, receipt) in receipts {
667 let full_receipt = FullReceipt {
668 event_id,
669 user_id: &user_id,
670 receipt_type: ReceiptType::Read,
671 receipt: &receipt,
672 };
673
674 self.meta.read_receipts.maybe_update_read_receipt(
675 full_receipt,
676 user_id == own_user_id,
677 &mut self.items,
678 );
679 }
680 }
681
682 pub(super) fn maybe_add_implicit_read_receipt(
691 &mut self,
692 event_id: &EventId,
693 sender: Option<&UserId>,
694 timestamp: Option<MilliSecondsSinceUnixEpoch>,
695 ) {
696 let (Some(user_id), Some(timestamp)) = (sender, timestamp) else {
697 return;
699 };
700
701 trace!(%user_id, %event_id, "adding implicit read receipt");
702
703 let mut receipt = Receipt::new(timestamp);
704 receipt.thread = self.focus.receipt_thread();
705
706 let full_receipt =
707 FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt };
708
709 let is_own_event = sender.is_some_and(|sender| sender == self.meta.own_user_id);
710
711 self.meta.read_receipts.maybe_update_read_receipt(
712 full_receipt,
713 is_own_event,
714 &mut self.items,
715 );
716 }
717
718 #[instrument(skip(self))]
721 pub(super) fn maybe_update_read_receipts_of_prev_event(&mut self, event_id: &EventId) {
722 let Some(prev_event_meta) = self
724 .items
725 .all_remote_events()
726 .iter()
727 .rev()
728 .skip_while(|meta| meta.event_id != event_id)
730 .skip(1)
732 .find(|meta| meta.visible && meta.can_show_read_receipts)
734 else {
735 trace!("Couldn't find any previous visible event, exiting");
736 return;
737 };
738
739 let Some((prev_item_pos, prev_event_item)) =
740 rfind_event_by_id(&self.items, &prev_event_meta.event_id)
741 else {
742 error!("inconsistent state: timeline item of visible event was not found");
743 return;
744 };
745
746 let prev_event_item_id = prev_event_item.internal_id.to_owned();
747 let mut prev_event_item = prev_event_item.clone();
748
749 let Some(remote_prev_event_item) = prev_event_item.as_remote_mut() else {
750 warn!("loading read receipts for a local item, this should not be possible");
751 return;
752 };
753
754 let read_receipts = self.meta.read_receipts.compute_event_receipts(
755 &remote_prev_event_item.event_id,
756 &mut self.items,
757 false,
758 );
759
760 if read_receipts.len() == remote_prev_event_item.read_receipts.len() {
762 trace!("same count of read receipts, not doing anything");
763 return;
764 }
765
766 trace!("replacing read receipts with the new ones");
767 remote_prev_event_item.read_receipts = read_receipts;
768 self.items.replace(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id));
769 }
770}
771
772impl<P: RoomDataProvider> TimelineState<P> {
773 pub(super) async fn populate_initial_user_receipt(
776 &mut self,
777 room_data_provider: &P,
778 receipt_type: ReceiptType,
779 ) {
780 let own_user_id = room_data_provider.own_user_id().to_owned();
781
782 let receipt_thread = self.focus.receipt_thread();
783 let wants_unthreaded_receipts = receipt_thread == ReceiptThread::Unthreaded;
784
785 let mut read_receipt = room_data_provider
786 .load_user_receipt(receipt_type.clone(), receipt_thread, &own_user_id)
787 .await;
788
789 if wants_unthreaded_receipts && read_receipt.is_none() {
790 read_receipt = room_data_provider
792 .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
793 .await;
794 }
795
796 if let Some(read_receipt) = read_receipt {
797 self.meta.read_receipts.upsert_latest(own_user_id, receipt_type, read_receipt);
798 }
799 }
800
801 pub(super) async fn latest_user_read_receipt(
805 &self,
806 user_id: &UserId,
807 receipt_thread: ReceiptThread,
808 room_data_provider: &P,
809 ) -> Option<(OwnedEventId, Receipt)> {
810 let all_remote_events = self.items.all_remote_events();
811
812 let public_read_receipt = self
813 .meta
814 .user_receipt(
815 user_id,
816 ReceiptType::Read,
817 receipt_thread.clone(),
818 room_data_provider,
819 all_remote_events,
820 )
821 .await;
822
823 let private_read_receipt = self
824 .meta
825 .user_receipt(
826 user_id,
827 ReceiptType::ReadPrivate,
828 receipt_thread,
829 room_data_provider,
830 all_remote_events,
831 )
832 .await;
833
834 match TimelineMetadata::compare_optional_receipts(
838 public_read_receipt.as_ref(),
839 private_read_receipt.as_ref(),
840 all_remote_events,
841 ) {
842 Ordering::Greater => public_read_receipt,
843 Ordering::Less => private_read_receipt,
844 _ => unreachable!(),
845 }
846 }
847
848 pub(super) fn latest_user_read_receipt_timeline_event_id(
851 &self,
852 user_id: &UserId,
853 ) -> Option<OwnedEventId> {
854 let public_read_receipt = self.meta.read_receipts.get_latest(user_id, &ReceiptType::Read);
857 let private_read_receipt =
858 self.meta.read_receipts.get_latest(user_id, &ReceiptType::ReadPrivate);
859
860 let (latest_receipt_id, _) = match TimelineMetadata::compare_optional_receipts(
864 public_read_receipt,
865 private_read_receipt,
866 self.items.all_remote_events(),
867 ) {
868 Ordering::Greater => public_read_receipt?,
869 Ordering::Less => private_read_receipt?,
870 _ => unreachable!(),
871 };
872
873 self.items
875 .all_remote_events()
876 .iter()
877 .rev()
878 .skip_while(|ev| ev.event_id != *latest_receipt_id)
879 .find(|ev| ev.visible && ev.can_show_read_receipts)
880 .map(|ev| ev.event_id.clone())
881 }
882}
883
884impl TimelineMetadata {
885 pub(super) async fn user_receipt<P: RoomDataProvider>(
896 &self,
897 user_id: &UserId,
898 receipt_type: ReceiptType,
899 receipt_thread: ReceiptThread,
900 room_data_provider: &P,
901 all_remote_events: &AllRemoteEvents,
902 ) -> Option<(OwnedEventId, Receipt)> {
903 if let Some(receipt) = self.read_receipts.get_latest(user_id, &receipt_type) {
904 return Some(receipt.clone());
906 }
907
908 if receipt_thread == ReceiptThread::Unthreaded {
909 let unthreaded_read_receipt = room_data_provider
912 .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
913 .await;
914
915 let main_thread_read_receipt = room_data_provider
916 .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
917 .await;
918
919 match Self::compare_optional_receipts(
922 main_thread_read_receipt.as_ref(),
923 unthreaded_read_receipt.as_ref(),
924 all_remote_events,
925 ) {
926 Ordering::Greater => main_thread_read_receipt,
927 Ordering::Less => unthreaded_read_receipt,
928 _ => unreachable!(),
929 }
930 } else {
931 room_data_provider
935 .load_user_receipt(receipt_type.clone(), receipt_thread, user_id)
936 .await
937 }
938 }
939
940 fn compare_optional_receipts(
947 lhs: Option<&(OwnedEventId, Receipt)>,
948 rhs_or_default: Option<&(OwnedEventId, Receipt)>,
949 all_remote_events: &AllRemoteEvents,
950 ) -> Ordering {
951 let Some((lhs_event_id, lhs_receipt)) = lhs else {
953 return Ordering::Less;
954 };
955 let Some((rhs_event_id, rhs_receipt)) = rhs_or_default else {
956 return Ordering::Greater;
957 };
958
959 if let Some(relative_pos) =
961 Self::compare_events_positions(lhs_event_id, rhs_event_id, all_remote_events)
962 {
963 if relative_pos == RelativePosition::Before {
964 return Ordering::Greater;
965 }
966
967 return Ordering::Less;
968 }
969
970 if let Some((lhs_ts, rhs_ts)) = lhs_receipt.ts.zip(rhs_receipt.ts) {
972 if lhs_ts > rhs_ts {
973 return Ordering::Greater;
974 }
975
976 return Ordering::Less;
977 }
978
979 Ordering::Less
980 }
981}