1use std::{
16 collections::HashMap,
17 iter::once,
18 ops::{ControlFlow, Deref},
19};
20
21pub use matrix_sdk_base::latest_event::{LatestEventValue, LocalLatestEventValue};
22use matrix_sdk_base::{
23 check_validity_of_replacement_events, deserialized_responses::TimelineEvent,
24 store::SerializableEventContent,
25};
26use ruma::{
27 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
28 events::{
29 AnyMessageLikeEventContent, AnySyncStateEvent, AnySyncTimelineEvent, SyncStateEvent,
30 relation::Replacement,
31 room::{
32 member::MembershipChange,
33 message::{MessageType, Relation, RoomMessageEventContent},
34 power_levels::RoomPowerLevels,
35 redaction::RoomRedactionEventContent,
36 },
37 },
38};
39use tracing::{debug, error};
40
41use crate::{Room, event_cache::RoomEventCache, room::Invite, send_queue::RoomSendQueueUpdate};
42
43pub(super) struct Builder;
45
46impl Builder {
47 pub async fn new_remote(
49 room_event_cache: &RoomEventCache,
50 current_event: LatestEventValue,
51 own_user_id: &UserId,
52 power_levels: Option<&RoomPowerLevels>,
53 ) -> Option<LatestEventValue> {
54 let mut room_has_been_emptied = true;
62 let mut current_value_must_be_erased = false;
63
64 let mut latest_edit_for_event: HashMap<OwnedEventId, TimelineEvent> = HashMap::new();
66
67 if let Ok(Some(event)) = room_event_cache
68 .rfind_map_event_in_memory_by(|event| {
69 room_has_been_emptied = false;
71
72 match filter_timeline_event(
73 event,
74 current_event.event_id().as_ref(),
75 own_user_id,
76 power_levels,
77 ) {
78 ControlFlow::Continue(FilterContinue {
80 current_value_must_be_erased: erased,
81 edited_event_id,
82 }) => {
83 current_value_must_be_erased = erased;
84
85 if let Some(edited_event_id) = edited_event_id {
86 latest_edit_for_event.entry(edited_event_id).or_insert(event.clone());
89 }
90
91 None
92 }
93
94 ControlFlow::Break(()) => {
96 if let Some(event_id) = event.event_id()
99 && let Some(edit) = latest_edit_for_event.get(&event_id)
100 {
101 let original = event.kind.raw();
102 let original_encryption_info = event.kind.encryption_info();
103
104 let replacement = edit.kind.raw();
105 let replacement_encryption_info = event.kind.encryption_info();
106
107 Some(
108 match check_validity_of_replacement_events(
109 original,
110 original_encryption_info.map(|e| &(**e)),
111 replacement,
112 replacement_encryption_info.map(|e| &(**e)),
113 ) {
114 Ok(_) => edit.clone(),
115 Err(e) => {
116 debug!(
117 "Skipping an edit of a latest event due to the replacement event being invalid: {e}"
118 );
119 event.clone()
120 }
121 },
122 )
123 } else {
124 Some(event.clone())
125 }
126 }
127 }
128 })
129 .await
130 {
131 Some(LatestEventValue::Remote(event))
132 } else {
133 if room_has_been_emptied {
135 current_value_must_be_erased = true;
136 }
137
138 current_value_must_be_erased.then(LatestEventValue::default)
139 }
140 }
141
142 pub async fn new_remote_for_invite(room: &Room) -> LatestEventValue {
143 let (event_id, timestamp, inviter_id) = room
144 .invite_details()
145 .await
146 .map(|Invite { invitee, inviter_id, .. }| {
147 let event = invitee.event();
148
149 (
150 event.event_id().map(ToOwned::to_owned),
153 event.timestamp().map(MilliSecondsSinceUnixEpoch),
157 Some(inviter_id),
158 )
159 })
160 .unwrap_or_else(|_| (None, None, None));
161
162 LatestEventValue::RemoteInvite {
163 event_id,
164 timestamp: timestamp.unwrap_or_else(MilliSecondsSinceUnixEpoch::now),
168 inviter: inviter_id,
169 }
170 }
171
172 pub async fn new_local(
175 send_queue_update: &RoomSendQueueUpdate,
176 buffer_of_values_for_local_events: &mut BufferOfValuesForLocalEvents,
177 room_event_cache: &RoomEventCache,
178 current_event: LatestEventValue,
179 own_user_id: &UserId,
180 power_levels: Option<&RoomPowerLevels>,
181 ) -> Option<LatestEventValue> {
182 use crate::send_queue::{LocalEcho, LocalEchoContent};
183
184 match send_queue_update {
185 RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
189 transaction_id,
190 content: local_echo_content,
191 }) => match local_echo_content {
192 LocalEchoContent::Event { serialized_event: serialized_event_content, .. } => {
193 Some(match serialized_event_content.deserialize() {
194 Ok(content) => {
195 if filter_any_message_like_event_content(
196 content,
197 current_event.event_id().as_ref(),
198 )
199 .is_break()
200 {
201 let local_value = LocalLatestEventValue {
202 timestamp: MilliSecondsSinceUnixEpoch::now(),
203 content: serialized_event_content.clone(),
204 };
205
206 let value =
210 if let Some((_, LatestEventValue::LocalCannotBeSent(_))) =
211 buffer_of_values_for_local_events.last()
212 {
213 LatestEventValue::LocalCannotBeSent(local_value)
214 } else {
215 LatestEventValue::LocalIsSending(local_value)
216 };
217
218 buffer_of_values_for_local_events
219 .push(transaction_id.to_owned(), value.clone());
220
221 value
222 } else {
223 return None;
224 }
225 }
226
227 Err(error) => {
228 error!(
229 ?error,
230 "Failed to deserialize an event from `RoomSendQueueUpdate::NewLocalEvent`"
231 );
232
233 return None;
234 }
235 })
236 }
237
238 LocalEchoContent::React { .. } => None,
239
240 LocalEchoContent::Redaction { .. } => None,
244 },
245
246 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
251 let or = if let Some(position) =
252 buffer_of_values_for_local_events.position(transaction_id)
253 {
254 buffer_of_values_for_local_events.remove(position);
255
256 Some(LatestEventValue::None)
260 } else {
261 None
262 };
263
264 Self::new_local_or_remote(
265 buffer_of_values_for_local_events,
266 room_event_cache,
267 current_event,
268 own_user_id,
269 power_levels,
270 )
271 .await
272 .or(or)
273 }
274
275 RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
284 if let Some(position) =
285 buffer_of_values_for_local_events.mark_is_sending_after(transaction_id)
286 {
287 let (_, value) = buffer_of_values_for_local_events.remove(position);
288
289 if buffer_of_values_for_local_events.last().is_none() {
293 if let Ok(Some(_)) = room_event_cache.find_event(event_id).await
296 && let Some(latest_event_value) = Self::new_remote(
297 room_event_cache,
298 current_event,
299 own_user_id,
300 power_levels,
301 )
302 .await
303 {
304 return Some(latest_event_value);
305 }
306
307 match value {
308 LatestEventValue::LocalIsSending(local_value)
309 | LatestEventValue::LocalCannotBeSent(local_value)
310 | LatestEventValue::LocalHasBeenSent { value: local_value, .. } => {
312 return Some(LatestEventValue::LocalHasBeenSent { event_id: event_id.clone(), value: local_value })
313 }
314 LatestEventValue::Remote(_) | LatestEventValue::RemoteInvite { .. } | LatestEventValue::None => unreachable!("Impossible to get a remote `LatestEventValue`"),
315 };
316 }
317 }
318
319 Self::new_local_or_remote(
320 buffer_of_values_for_local_events,
321 room_event_cache,
322 current_event,
323 own_user_id,
324 power_levels,
325 )
326 .await
327 }
328
329 RoomSendQueueUpdate::ReplacedLocalEvent {
334 transaction_id,
335 new_content: new_serialized_event_content,
336 } => {
337 if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
338 match new_serialized_event_content.deserialize() {
339 Ok(content) => {
340 if filter_any_message_like_event_content(
341 content,
342 current_event.event_id().as_ref(),
343 )
344 .is_break()
345 {
346 buffer_of_values_for_local_events.replace_content(
347 position,
348 new_serialized_event_content.clone(),
349 );
350 } else {
351 buffer_of_values_for_local_events.remove(position);
352 }
353 }
354
355 Err(error) => {
356 error!(
357 ?error,
358 "Failed to deserialize an event from `RoomSendQueueUpdate::ReplacedLocalEvent`"
359 );
360
361 return None;
362 }
363 }
364 }
365
366 Self::new_local_or_remote(
367 buffer_of_values_for_local_events,
368 room_event_cache,
369 current_event,
370 own_user_id,
371 power_levels,
372 )
373 .await
374 }
375
376 RoomSendQueueUpdate::SendError { transaction_id, is_recoverable, .. } => {
382 if *is_recoverable {
383 buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
387 } else {
388 buffer_of_values_for_local_events.mark_cannot_be_sent_from(transaction_id);
390 }
391
392 Self::new_local_or_remote(
393 buffer_of_values_for_local_events,
394 room_event_cache,
395 current_event,
396 own_user_id,
397 power_levels,
398 )
399 .await
400 }
401
402 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
407 buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
408
409 Self::new_local_or_remote(
410 buffer_of_values_for_local_events,
411 room_event_cache,
412 current_event,
413 own_user_id,
414 power_levels,
415 )
416 .await
417 }
418
419 RoomSendQueueUpdate::MediaUpload { .. } => None,
423 }
424 }
425
426 async fn new_local_or_remote(
433 buffer_of_values_for_local_events: &mut BufferOfValuesForLocalEvents,
434 room_event_cache: &RoomEventCache,
435 current_event: LatestEventValue,
436 own_user_id: &UserId,
437 power_levels: Option<&RoomPowerLevels>,
438 ) -> Option<LatestEventValue> {
439 if let Some((_, value)) = buffer_of_values_for_local_events.last() {
440 Some(value.clone())
441 } else {
442 Self::new_remote(room_event_cache, current_event, own_user_id, power_levels).await
443 }
444 }
445}
446
447#[derive(Debug)]
487pub(super) struct BufferOfValuesForLocalEvents {
488 buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
489}
490
491impl BufferOfValuesForLocalEvents {
492 pub fn new() -> Self {
494 Self { buffer: Vec::with_capacity(2) }
495 }
496
497 pub fn is_empty(&self) -> bool {
499 self.buffer.is_empty()
500 }
501
502 fn last(&self) -> Option<&(OwnedTransactionId, LatestEventValue)> {
504 self.buffer.last()
505 }
506
507 fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
509 self.buffer
510 .iter()
511 .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
512 }
513
514 fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
521 assert!(value.is_local(), "`value` must be a local `LatestEventValue`");
522
523 self.buffer.push((transaction_id, value));
524 }
525
526 fn replace_content(&mut self, position: usize, new_content: SerializableEventContent) {
536 let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");
537
538 match value {
539 LatestEventValue::LocalIsSending(LocalLatestEventValue { content, .. })
540 | LatestEventValue::LocalCannotBeSent(LocalLatestEventValue { content, .. }) => {
541 *content = new_content;
542 }
543
544 _ => panic!("`value` must be either `LocalIsSending` or `LocalCannotBeSent`"),
545 }
546 }
547
548 fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
554 self.buffer.remove(position)
555 }
556
557 fn mark_cannot_be_sent_from(&mut self, transaction_id: &TransactionId) {
560 let mut values = self.buffer.iter_mut();
561
562 if let Some(first_value_to_wedge) = values
563 .by_ref()
564 .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
565 {
566 for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) {
568 if let LatestEventValue::LocalIsSending(content) = value_to_wedge {
569 *value_to_wedge = LatestEventValue::LocalCannotBeSent(content.clone());
570 }
571 }
572 }
573 }
574
575 fn mark_is_sending_from(&mut self, transaction_id: &TransactionId) {
578 let mut values = self.buffer.iter_mut();
579
580 if let Some(first_value_to_unwedge) = values
581 .by_ref()
582 .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
583 {
584 for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) {
586 if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
587 *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
588 }
589 }
590 }
591 }
592
593 fn mark_is_sending_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
600 let mut values = self.buffer.iter_mut();
601
602 if let Some(position) = values
603 .by_ref()
604 .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
605 {
606 for (_, value_to_unwedge) in values {
608 if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
609 *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
610 }
611 }
612
613 Some(position)
614 } else {
615 None
616 }
617 }
618}
619
620#[derive(Debug)]
622struct FilterContinue {
623 current_value_must_be_erased: bool,
625 edited_event_id: Option<OwnedEventId>,
627}
628
629fn filter_break() -> ControlFlow<(), FilterContinue> {
631 ControlFlow::Break(())
632}
633
634fn filter_continue() -> ControlFlow<(), FilterContinue> {
636 ControlFlow::Continue(FilterContinue {
637 current_value_must_be_erased: false,
638 edited_event_id: None,
639 })
640}
641
642fn filter_continue_with_erasing() -> ControlFlow<(), FilterContinue> {
644 ControlFlow::Continue(FilterContinue {
645 current_value_must_be_erased: true,
646 edited_event_id: None,
647 })
648}
649
650fn filter_continue_with_edit(edited_event_id: OwnedEventId) -> ControlFlow<(), FilterContinue> {
653 ControlFlow::Continue(FilterContinue {
654 current_value_must_be_erased: false,
655 edited_event_id: Some(edited_event_id),
656 })
657}
658
659fn filter_timeline_event(
667 event: &TimelineEvent,
668 current_value_event_id: Option<&OwnedEventId>,
669 own_user_id: &UserId,
670 power_levels: Option<&RoomPowerLevels>,
671) -> ControlFlow<(), FilterContinue> {
672 let event = match event.raw().deserialize() {
675 Ok(event) => event,
676 Err(error) => {
677 error!(
678 ?error,
679 "Failed to deserialize the event when looking for a suitable latest event"
680 );
681
682 return filter_continue();
683 }
684 };
685
686 match event {
687 AnySyncTimelineEvent::MessageLike(message_like_event) => {
688 match message_like_event.original_content() {
689 Some(any_message_like_event_content) => filter_any_message_like_event_content(
690 any_message_like_event_content,
691 current_value_event_id,
692 ),
693
694 None => filter_continue(),
696 }
697 }
698
699 AnySyncTimelineEvent::State(state) => {
700 filter_any_sync_state_event(state, own_user_id, power_levels)
701 }
702 }
703}
704
705fn filter_any_message_like_event_content(
706 event: AnyMessageLikeEventContent,
707 current_value_event_id: Option<&OwnedEventId>,
708) -> ControlFlow<(), FilterContinue> {
709 match event {
710 AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent {
712 msgtype,
713 relates_to,
714 ..
715 }) => {
716 if let MessageType::VerificationRequest(_) = msgtype {
718 return filter_continue();
719 }
720
721 match relates_to {
723 Some(Relation::Replacement(Replacement { event_id, .. })) => {
724 filter_continue_with_edit(event_id)
728 }
729
730 _ => filter_break(),
731 }
732 }
733
734 AnyMessageLikeEventContent::UnstablePollStart(_)
739 | AnyMessageLikeEventContent::CallInvite(_)
740 | AnyMessageLikeEventContent::RtcNotification(_)
741 | AnyMessageLikeEventContent::Sticker(_) => filter_break(),
742
743 AnyMessageLikeEventContent::RoomRedaction(RoomRedactionEventContent {
745 redacts, ..
746 }) => {
747 if redacts.as_ref() == current_value_event_id {
752 filter_continue_with_erasing()
753 } else {
754 filter_continue()
755 }
756 }
757
758 AnyMessageLikeEventContent::RoomEncrypted(_) => {
760 filter_continue()
762 }
763
764 _ => filter_continue(),
766 }
767}
768
769fn filter_any_sync_state_event(
770 event: AnySyncStateEvent,
771 own_user_id: &UserId,
772 power_levels: Option<&RoomPowerLevels>,
773) -> ControlFlow<(), FilterContinue> {
774 match event {
775 AnySyncStateEvent::RoomMember(SyncStateEvent::Original(member)) => {
776 match member.membership_change() {
777 MembershipChange::Knocked => {
778 let can_accept_or_decline_knocks = match power_levels {
779 Some(room_power_levels) => {
780 room_power_levels.user_can_invite(own_user_id)
781 || room_power_levels
782 .user_can_kick_user(own_user_id, &member.state_key)
783 }
784 None => false,
785 };
786
787 if can_accept_or_decline_knocks {
790 return filter_break();
791 }
792
793 filter_continue()
794 }
795
796 MembershipChange::Joined
798 | MembershipChange::Invited
799 | MembershipChange::InvitationAccepted
800 | MembershipChange::KnockAccepted => {
801 if member.state_key.deref() == own_user_id {
809 filter_break()
810 } else {
811 filter_continue()
812 }
813 }
814
815 _ => filter_continue(),
816 }
817 }
818
819 AnySyncStateEvent::BeaconInfo(SyncStateEvent::Original(_)) => filter_break(),
825
826 _ => filter_continue(),
827 }
828}
829
830#[cfg(test)]
831mod filter_tests {
832 use std::ops::Not;
833
834 use assert_matches::assert_matches;
835 use matrix_sdk_test::event_factory::{EventFactory, PreviousMembership};
836 use ruma::{
837 event_id,
838 events::{
839 room::{member::MembershipState, message::RoomMessageEventContent},
840 rtc::notification::NotificationType,
841 },
842 owned_event_id, owned_user_id, user_id,
843 };
844
845 use super::{ControlFlow, FilterContinue, filter_timeline_event};
846
847 macro_rules! assert_latest_event_content {
848 ( event | $event_factory:ident | $event_builder:block
849 is a candidate ) => {
850 assert_latest_event_content!(@_ | $event_factory | $event_builder, ControlFlow::Break(_));
851 };
852
853 ( event | $event_factory:ident | $event_builder:block
854 is not a candidate ) => {
855 assert_latest_event_content!(@_ | $event_factory | $event_builder, ControlFlow::Continue(_));
856 };
857
858 ( @_ | $event_factory:ident | $event_builder:block, $expect:pat) => {
859 let user_id = user_id!("@mnt_io:matrix.org");
860 let event_factory = EventFactory::new().sender(user_id);
861 let event = {
862 let $event_factory = event_factory;
863 $event_builder
864 };
865
866 assert_matches!(
867 filter_timeline_event(&event, None, user_id!("@mnt_io:matrix.org"), None),
868 $expect
869 );
870 };
871 }
872
873 #[test]
874 fn test_room_message() {
875 assert_latest_event_content!(
876 event | event_factory | { event_factory.text_msg("hello").into_event() }
877 is a candidate
878 );
879 }
880
881 #[test]
882 fn test_room_message_replacement() {
883 let user_id = user_id!("@mnt_io:matrix.org");
884 let event_factory = EventFactory::new().sender(user_id);
885 let event_id = event_id!("$ev0");
886 let event = event_factory
887 .text_msg("bonjour")
888 .edit(event_id, RoomMessageEventContent::text_plain("hello").into())
889 .into_event();
890
891 assert_matches!(filter_timeline_event(&event, None, user_id, None), ControlFlow::Continue(FilterContinue { current_value_must_be_erased, edited_event_id }) => {
892 assert!(current_value_must_be_erased.not());
893 assert_eq!(edited_event_id, Some(event_id.to_owned()));
894 }
895 );
896 }
897
898 #[test]
899 fn test_redaction() {
900 let user_id = user_id!("@mnt_io:matrix.org");
901 let event_factory = EventFactory::new().sender(user_id);
902 let event_id = event_id!("$ev0");
903 let event = event_factory.redaction(event_id).into_event();
904
905 {
908 let current_value_event_id = None;
909
910 assert_matches!(
911 filter_timeline_event(&event, current_value_event_id, user_id, None),
912 ControlFlow::Continue(FilterContinue { current_value_must_be_erased, edited_event_id }) => {
913 assert!(current_value_must_be_erased.not());
914 assert!(edited_event_id.is_none());
915 }
916 );
917 }
918
919 {
922 let current_value_event_id = Some(owned_event_id!("$ev1"));
923
924 assert_matches!(
925 filter_timeline_event(&event, current_value_event_id.as_ref(), user_id, None),
926 ControlFlow::Continue(FilterContinue { current_value_must_be_erased, edited_event_id }) => {
927 assert!(current_value_must_be_erased.not());
928 assert!(edited_event_id.is_none());
929 }
930 );
931 }
932
933 {
936 let current_value_event_id = Some(event_id.to_owned());
937
938 assert_matches!(
939 filter_timeline_event(&event, current_value_event_id.as_ref(), user_id, None),
940 ControlFlow::Continue(FilterContinue { current_value_must_be_erased, edited_event_id }) => {
941 assert!(current_value_must_be_erased);
942 assert!(edited_event_id.is_none());
943 }
944 );
945 }
946 }
947
948 #[test]
949 fn test_redacted() {
950 assert_latest_event_content!(
951 event | event_factory | {
952 event_factory
953 .redacted(
954 user_id!("@mnt_io:matrix.org"),
955 ruma::events::room::message::RedactedRoomMessageEventContent::new(),
956 )
957 .event_id(event_id!("$ev0"))
958 .into_event()
959 }
960 is not a candidate
961 );
962 }
963
964 #[test]
965 fn test_poll() {
966 assert_latest_event_content!(
967 event | event_factory | {
968 event_factory
969 .poll_start("the people need to know", "comté > gruyère", vec!["yes", "oui"])
970 .into_event()
971 }
972 is a candidate
973 );
974 }
975
976 #[test]
977 fn test_call_invite() {
978 assert_latest_event_content!(
979 event | event_factory | {
980 event_factory
981 .call_invite(
982 ruma::OwnedVoipId::from("vvooiipp".to_owned()),
983 ruma::UInt::from(1234u32),
984 ruma::events::call::SessionDescription::new(
985 "type".to_owned(),
986 "sdp".to_owned(),
987 ),
988 ruma::VoipVersionId::V1,
989 )
990 .into_event()
991 }
992 is a candidate
993 );
994 }
995
996 #[test]
997 fn test_rtc_notification() {
998 assert_latest_event_content!(
999 event | event_factory | {
1000 event_factory
1001 .rtc_notification(
1002 NotificationType::Ring,
1003 )
1004 .mentions(vec![owned_user_id!("@alice:server.name")])
1005 .relates_to_membership_state_event(ruma::OwnedEventId::try_from("$abc:server.name").unwrap())
1006 .lifetime(60)
1007 .into_event()
1008 }
1009 is a candidate
1010 );
1011 }
1012
1013 #[test]
1014 fn test_sticker() {
1015 assert_latest_event_content!(
1016 event | event_factory | {
1017 event_factory
1018 .sticker(
1019 "wink wink",
1020 ruma::events::room::ImageInfo::new(),
1021 ruma::OwnedMxcUri::from("mxc://foo/bar"),
1022 )
1023 .into_event()
1024 }
1025 is a candidate
1026 );
1027 }
1028
1029 #[test]
1030 fn test_encrypted_room_message() {
1031 assert_latest_event_content!(
1032 event | event_factory | {
1033 event_factory
1034 .event(ruma::events::room::encrypted::RoomEncryptedEventContent::new(
1035 ruma::events::room::encrypted::EncryptedEventScheme::MegolmV1AesSha2(
1036 ruma::events::room::encrypted::MegolmV1AesSha2ContentInit {
1037 ciphertext: "cipher".to_owned(),
1038 sender_key: "sender_key".to_owned(),
1039 device_id: "device_id".into(),
1040 session_id: "session_id".to_owned(),
1041 }
1042 .into(),
1043 ),
1044 None,
1045 ))
1046 .into_event()
1047 }
1048 is not a candidate
1049 );
1050 }
1051
1052 #[test]
1053 fn test_reaction() {
1054 assert_latest_event_content!(
1056 event | event_factory | { event_factory.reaction(event_id!("$ev0"), "+1").into_event() }
1057 is not a candidate
1058 );
1059 }
1060
1061 #[test]
1062 fn test_state_event() {
1063 assert_latest_event_content!(
1064 event | event_factory | { event_factory.room_topic("new room topic").into_event() }
1065 is not a candidate
1066 );
1067 }
1068
1069 #[test]
1070 fn test_knocked_without_power_levels() {
1071 assert_latest_event_content!(
1072 event | event_factory | {
1073 event_factory
1074 .member(user_id!("@other_mnt_io:server.name"))
1075 .membership(MembershipState::Knock)
1076 .into_event()
1077 }
1078 is not a candidate
1079 );
1080 }
1081
1082 #[test]
1083 fn test_knocked_with_power_levels() {
1084 use ruma::{
1085 events::room::power_levels::{RoomPowerLevels, RoomPowerLevelsSource},
1086 room_version_rules::AuthorizationRules,
1087 };
1088
1089 let user_id = user_id!("@mnt_io:matrix.org");
1090 let other_user_id = user_id!("@other_mnt_io:server.name");
1091 let event_factory = EventFactory::new().sender(user_id);
1092 let event =
1093 event_factory.member(other_user_id).membership(MembershipState::Knock).into_event();
1094
1095 let mut room_power_levels =
1096 RoomPowerLevels::new(RoomPowerLevelsSource::None, &AuthorizationRules::V1, []);
1097 room_power_levels.users.insert(user_id.to_owned(), 5.into());
1098 room_power_levels.users.insert(other_user_id.to_owned(), 4.into());
1099
1100 {
1102 room_power_levels.invite = 10.into();
1103 room_power_levels.kick = 10.into();
1104 assert!(
1105 filter_timeline_event(&event, None, user_id, Some(&room_power_levels))
1106 .is_continue(),
1107 "cannot accept, cannot decline",
1108 );
1109 }
1110
1111 {
1113 room_power_levels.invite = 0.into();
1114 room_power_levels.kick = 10.into();
1115 assert!(
1116 filter_timeline_event(&event, None, user_id, Some(&room_power_levels)).is_break(),
1117 "can accept, cannot decline",
1118 );
1119 }
1120
1121 {
1123 room_power_levels.invite = 10.into();
1124 room_power_levels.kick = 0.into();
1125 assert!(
1126 filter_timeline_event(&event, None, user_id, Some(&room_power_levels)).is_break(),
1127 "cannot accept, can decline",
1128 );
1129 }
1130
1131 {
1133 room_power_levels.invite = 0.into();
1134 room_power_levels.kick = 0.into();
1135 assert!(
1136 filter_timeline_event(&event, None, user_id, Some(&room_power_levels)).is_break(),
1137 "can accept, can decline",
1138 );
1139 }
1140
1141 {
1145 room_power_levels.users.insert(user_id.to_owned(), 5.into());
1146 room_power_levels.users.insert(other_user_id.to_owned(), 5.into());
1147
1148 room_power_levels.invite = 10.into();
1149 room_power_levels.kick = 0.into();
1150
1151 assert!(
1152 filter_timeline_event(&event, None, user_id, Some(&room_power_levels))
1153 .is_continue(),
1154 "cannot accept, can decline, at least same user levels",
1155 );
1156 }
1157 }
1158
1159 #[test]
1160 fn test_knock_accepted() {
1161 assert_latest_event_content!(
1163 event | event_factory | {
1164 event_factory
1165 .member(user_id!("@mnt_io:matrix.org"))
1166 .membership(MembershipState::Invite)
1167 .previous(MembershipState::Knock)
1168 .into_event()
1169 }
1170 is a candidate
1171 );
1172 }
1173
1174 #[test]
1175 fn test_knock_accepted_from_someone_else() {
1176 assert_latest_event_content!(
1178 event | event_factory | {
1179 event_factory
1180 .member(user_id!("@other_mnt_io:server.name"))
1181 .membership(MembershipState::Invite)
1182 .previous(MembershipState::Knock)
1183 .into_event()
1184 }
1185 is not a candidate
1186 );
1187 }
1188
1189 #[test]
1190 fn test_joined() {
1191 assert_latest_event_content!(
1193 event | event_factory | {
1194 event_factory
1195 .member(user_id!("@mnt_io:matrix.org"))
1196 .membership(MembershipState::Join)
1197 .into_event()
1198 }
1199 is a candidate
1200 );
1201 }
1202
1203 #[test]
1204 fn test_joined_from_left() {
1205 assert_latest_event_content!(
1207 event | event_factory | {
1208 event_factory
1209 .member(user_id!("@mnt_io:matrix.org"))
1210 .membership(MembershipState::Join)
1211 .previous(MembershipState::Leave)
1212 .into_event()
1213 }
1214 is a candidate
1215 );
1216 }
1217
1218 #[test]
1219 fn test_joined_for_someone_else() {
1220 use ruma::events::room::member::MembershipState;
1221
1222 assert_latest_event_content!(
1224 event | event_factory | {
1225 event_factory
1226 .member(user_id!("@other_mnt_io:server.name"))
1227 .membership(MembershipState::Join)
1228 .into_event()
1229 }
1230 is not a candidate
1231 );
1232 }
1233
1234 #[test]
1235 fn test_invited() {
1236 assert_latest_event_content!(
1237 event | event_factory | {
1238 event_factory
1239 .member(user_id!("@mnt_io:matrix.org"))
1240 .membership(MembershipState::Invite)
1241 .into_event()
1242 }
1243 is a candidate
1244 );
1245 }
1246
1247 #[test]
1248 fn test_invited_from_left() {
1249 assert_latest_event_content!(
1251 event | event_factory | {
1252 event_factory
1253 .member(user_id!("@mnt_io:matrix.org"))
1254 .membership(MembershipState::Invite)
1255 .previous(MembershipState::Leave)
1256 .into_event()
1257 }
1258 is a candidate
1259 );
1260 }
1261
1262 #[test]
1263 fn test_invited_for_someone_else() {
1264 assert_latest_event_content!(
1266 event | event_factory | {
1267 event_factory
1268 .member(user_id!("@other_mnt_io:server.name"))
1269 .membership(MembershipState::Invite)
1270 .into_event()
1271 }
1272 is not a candidate
1273 );
1274 }
1275
1276 #[test]
1277 fn test_invitation_accepted() {
1278 assert_latest_event_content!(
1280 event | event_factory | {
1281 event_factory
1282 .member(user_id!("@mnt_io:matrix.org"))
1283 .membership(MembershipState::Join)
1284 .previous(MembershipState::Invite)
1285 .into_event()
1286 }
1287 is a candidate
1288 );
1289 }
1290
1291 #[test]
1292 fn test_invitation_accepted_from_someone_else() {
1293 assert_latest_event_content!(
1295 event | event_factory | {
1296 event_factory
1297 .member(user_id!("@other_mnt_io:server.name"))
1298 .membership(MembershipState::Join)
1299 .previous(MembershipState::Invite)
1300 .into_event()
1301 }
1302 is not a candidate
1303 );
1304 }
1305
1306 #[test]
1307 fn test_profile_changed() {
1308 assert_latest_event_content!(
1310 event | event_factory | {
1311 event_factory
1312 .member(user_id!("@mnt_io:matrix.org"))
1313 .membership(MembershipState::Join)
1314 .previous(PreviousMembership::new(MembershipState::Join).display_name("Coucou"))
1315 .into_event()
1316 }
1317 is not a candidate
1318 );
1319 }
1320
1321 #[test]
1322 fn test_beacon_info_live() {
1323 use std::time::Duration;
1324
1325 assert_latest_event_content!(
1327 event | event_factory | {
1328 event_factory
1329 .beacon_info(None, Duration::from_secs(60), true, None)
1330 .state_key(user_id!("@mnt_io:matrix.org"))
1331 .into_event()
1332 }
1333 is a candidate
1334 );
1335 }
1336
1337 #[test]
1338 fn test_beacon_info_stopped() {
1339 use std::time::Duration;
1340
1341 assert_latest_event_content!(
1344 event | event_factory | {
1345 event_factory
1346 .beacon_info(None, Duration::from_secs(60), false, None)
1347 .state_key(user_id!("@mnt_io:matrix.org"))
1348 .into_event()
1349 }
1350 is a candidate
1351 );
1352 }
1353
1354 #[test]
1355 fn test_room_message_verification_request() {
1356 use ruma::{OwnedDeviceId, events::room::message};
1357
1358 assert_latest_event_content!(
1359 event | event_factory | {
1360 event_factory
1361 .event(RoomMessageEventContent::new(message::MessageType::VerificationRequest(
1362 message::KeyVerificationRequestEventContent::new(
1363 "body".to_owned(),
1364 vec![],
1365 OwnedDeviceId::from("device_id"),
1366 owned_user_id!("@user:server.name"),
1367 ),
1368 )))
1369 .into_event()
1370 }
1371 is not a candidate
1372 );
1373 }
1374}
1375
1376#[cfg(test)]
1377mod buffer_of_values_for_local_event_tests {
1378 use assert_matches::assert_matches;
1379 use ruma::{
1380 OwnedTransactionId,
1381 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
1382 owned_event_id,
1383 serde::Raw,
1384 };
1385 use serde_json::json;
1386
1387 use super::{
1388 super::{super::local_room_message, RemoteLatestEventValue},
1389 BufferOfValuesForLocalEvents, LatestEventValue, LocalLatestEventValue,
1390 };
1391
1392 fn remote_room_message(body: &str) -> RemoteLatestEventValue {
1393 RemoteLatestEventValue::from_plaintext(
1394 Raw::from_json_string(
1395 json!({
1396 "content": RoomMessageEventContent::text_plain(body),
1397 "type": "m.room.message",
1398 "event_id": "$ev0",
1399 "origin_server_ts": 42,
1400 "sender": "@mnt_io:matrix.org",
1401 })
1402 .to_string(),
1403 )
1404 .unwrap(),
1405 )
1406 }
1407
1408 #[test]
1409 fn test_last() {
1410 let mut buffer = BufferOfValuesForLocalEvents::new();
1411
1412 assert!(buffer.last().is_none());
1413
1414 let transaction_id = OwnedTransactionId::from("txnid");
1415 buffer.push(
1416 transaction_id.clone(),
1417 LatestEventValue::LocalIsSending(local_room_message("tome")),
1418 );
1419
1420 assert_matches!(
1421 buffer.last(),
1422 Some((expected_transaction_id, LatestEventValue::LocalIsSending(_))) => {
1423 assert_eq!(expected_transaction_id, &transaction_id);
1424 }
1425 );
1426 }
1427
1428 #[test]
1429 fn test_position() {
1430 let mut buffer = BufferOfValuesForLocalEvents::new();
1431 let transaction_id = OwnedTransactionId::from("txnid");
1432
1433 assert!(buffer.position(&transaction_id).is_none());
1434
1435 buffer.push(
1436 transaction_id.clone(),
1437 LatestEventValue::LocalIsSending(local_room_message("raclette")),
1438 );
1439 buffer.push(
1440 OwnedTransactionId::from("othertxnid"),
1441 LatestEventValue::LocalIsSending(local_room_message("tome")),
1442 );
1443
1444 assert_eq!(buffer.position(&transaction_id), Some(0));
1445 }
1446
1447 #[test]
1448 #[should_panic]
1449 fn test_push_none() {
1450 let mut buffer = BufferOfValuesForLocalEvents::new();
1451
1452 buffer.push(OwnedTransactionId::from("txnid"), LatestEventValue::None);
1453 }
1454
1455 #[test]
1456 #[should_panic]
1457 fn test_push_remote() {
1458 let mut buffer = BufferOfValuesForLocalEvents::new();
1459
1460 buffer.push(
1461 OwnedTransactionId::from("txnid"),
1462 LatestEventValue::Remote(remote_room_message("tome")),
1463 );
1464 }
1465
1466 #[test]
1467 fn test_push_local() {
1468 let mut buffer = BufferOfValuesForLocalEvents::new();
1469
1470 buffer.push(
1471 OwnedTransactionId::from("txnid0"),
1472 LatestEventValue::LocalIsSending(local_room_message("tome")),
1473 );
1474 buffer.push(
1475 OwnedTransactionId::from("txnid1"),
1476 LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1477 );
1478 buffer.push(
1479 OwnedTransactionId::from("txnid1"),
1480 LatestEventValue::LocalHasBeenSent {
1481 event_id: owned_event_id!("$ev0"),
1482 value: local_room_message("raclette"),
1483 },
1484 );
1485
1486 }
1488
1489 #[test]
1490 #[should_panic]
1491 fn test_replace_content_on_remote() {
1492 let mut buffer = BufferOfValuesForLocalEvents::new();
1493
1494 buffer.push(
1495 OwnedTransactionId::from("txnid"),
1496 LatestEventValue::Remote(remote_room_message("gruyère")),
1497 );
1498
1499 let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1500
1501 buffer.replace_content(0, new_content);
1502 }
1503
1504 #[test]
1505 #[should_panic]
1506 fn test_replace_content_on_local_has_been_sent() {
1507 let mut buffer = BufferOfValuesForLocalEvents::new();
1508
1509 buffer.push(
1510 OwnedTransactionId::from("txnid"),
1511 LatestEventValue::LocalHasBeenSent {
1512 event_id: owned_event_id!("$ev0"),
1513 value: local_room_message("gruyère"),
1514 },
1515 );
1516
1517 let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1518
1519 buffer.replace_content(0, new_content);
1520 }
1521
1522 #[test]
1523 fn test_replace_content_on_local_is_sending() {
1524 let mut buffer = BufferOfValuesForLocalEvents::new();
1525
1526 let transaction_id = OwnedTransactionId::from("txnid0");
1527 buffer.push(
1528 transaction_id.clone(),
1529 LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1530 );
1531
1532 let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1533
1534 buffer.replace_content(0, new_content);
1535
1536 assert_matches!(
1537 buffer.last(),
1538 Some((expected_transaction_id, LatestEventValue::LocalIsSending(local_event))) => {
1539 assert_eq!(expected_transaction_id, &transaction_id);
1540 assert_matches!(
1541 local_event.content.deserialize().unwrap(),
1542 AnyMessageLikeEventContent::RoomMessage(content) => {
1543 assert_eq!(content.body(), "comté");
1544 }
1545 );
1546 }
1547 );
1548 }
1549
1550 #[test]
1551 fn test_replace_content_on_local_cannot_be_sent() {
1552 let mut buffer = BufferOfValuesForLocalEvents::new();
1553
1554 let transaction_id = OwnedTransactionId::from("txnid0");
1555 buffer.push(
1556 transaction_id.clone(),
1557 LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1558 );
1559
1560 let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1561
1562 buffer.replace_content(0, new_content);
1563
1564 assert_matches!(
1565 buffer.last(),
1566 Some((expected_transaction_id, LatestEventValue::LocalCannotBeSent(local_event))) => {
1567 assert_eq!(expected_transaction_id, &transaction_id);
1568 assert_matches!(
1569 local_event.content.deserialize().unwrap(),
1570 AnyMessageLikeEventContent::RoomMessage(content) => {
1571 assert_eq!(content.body(), "comté");
1572 }
1573 );
1574 }
1575 );
1576 }
1577
1578 #[test]
1579 fn test_remove() {
1580 let mut buffer = BufferOfValuesForLocalEvents::new();
1581
1582 buffer.push(
1583 OwnedTransactionId::from("txnid"),
1584 LatestEventValue::LocalIsSending(local_room_message("gryuère")),
1585 );
1586
1587 assert!(buffer.last().is_some());
1588
1589 buffer.remove(0);
1590
1591 assert!(buffer.last().is_none());
1592 }
1593
1594 #[test]
1595 fn test_mark_cannot_be_sent_from() {
1596 let mut buffer = BufferOfValuesForLocalEvents::new();
1597 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1598 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1599 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1600
1601 buffer.push(
1602 transaction_id_0,
1603 LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1604 );
1605 buffer.push(
1606 transaction_id_1.clone(),
1607 LatestEventValue::LocalIsSending(local_room_message("brigand")),
1608 );
1609 buffer.push(
1610 transaction_id_2,
1611 LatestEventValue::LocalIsSending(local_room_message("raclette")),
1612 );
1613
1614 buffer.mark_cannot_be_sent_from(&transaction_id_1);
1615
1616 assert_eq!(buffer.buffer.len(), 3);
1617 assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
1618 assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1619 assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalCannotBeSent(_));
1620 }
1621
1622 #[test]
1623 fn test_mark_is_sending_from() {
1624 let mut buffer = BufferOfValuesForLocalEvents::new();
1625 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1626 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1627 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1628
1629 buffer.push(
1630 transaction_id_0,
1631 LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1632 );
1633 buffer.push(
1634 transaction_id_1.clone(),
1635 LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
1636 );
1637 buffer.push(
1638 transaction_id_2,
1639 LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1640 );
1641
1642 buffer.mark_is_sending_from(&transaction_id_1);
1643
1644 assert_eq!(buffer.buffer.len(), 3);
1645 assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1646 assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
1647 assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
1648 }
1649
1650 #[test]
1651 fn test_mark_is_sending_after() {
1652 let mut buffer = BufferOfValuesForLocalEvents::new();
1653 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1654 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1655 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1656
1657 buffer.push(
1658 transaction_id_0,
1659 LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1660 );
1661 buffer.push(
1662 transaction_id_1.clone(),
1663 LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
1664 );
1665 buffer.push(
1666 transaction_id_2,
1667 LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1668 );
1669
1670 buffer.mark_is_sending_after(&transaction_id_1);
1671
1672 assert_eq!(buffer.buffer.len(), 3);
1673 assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1674 assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1675 assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
1676 }
1677}
1678
1679#[cfg(all(not(target_family = "wasm"), test))]
1680mod builder_tests {
1681 use std::sync::Arc;
1682
1683 use assert_matches::assert_matches;
1684 use matrix_sdk_base::{
1685 RoomState,
1686 deserialized_responses::TimelineEventKind,
1687 linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1688 store::{ChildTransactionId, SerializableEventContent},
1689 };
1690 use matrix_sdk_test::{async_test, event_factory::EventFactory};
1691 use ruma::{
1692 EventId, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedTransactionId, event_id,
1693 events::{
1694 AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
1695 SyncMessageLikeEvent, reaction::ReactionEventContent, relation::Annotation,
1696 room::message::RoomMessageEventContent,
1697 },
1698 owned_event_id, owned_room_id, room_id,
1699 serde::Raw,
1700 user_id,
1701 };
1702 use serde_json::json;
1703
1704 use super::{
1705 super::RemoteLatestEventValue, BufferOfValuesForLocalEvents, Builder, LatestEventValue,
1706 RoomEventCache, RoomSendQueueUpdate,
1707 };
1708 use crate::{
1709 Client, Error,
1710 send_queue::{
1711 AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle,
1712 SendReactionHandle, SendRedactionHandle,
1713 },
1714 test_utils::mocks::MatrixMockServer,
1715 };
1716
1717 macro_rules! assert_remote_value_matches_room_message_with_body {
1718 ( $latest_event_value:expr => with body = $body:expr ) => {{
1719 let latest_event_value = $latest_event_value;
1720
1721 assert_matches!(
1722 latest_event_value,
1723 Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event: ref event }, .. })) => {
1724 assert_matches!(
1725 event.deserialize().unwrap(),
1726 AnySyncTimelineEvent::MessageLike(
1727 AnySyncMessageLikeEvent::RoomMessage(
1728 SyncMessageLikeEvent::Original(message_content)
1729 )
1730 ) => {
1731 assert_eq!(message_content.content.body(), $body);
1732 }
1733 );
1734
1735 latest_event_value.unwrap()
1736 }
1737 )
1738 }};
1739 }
1740
1741 macro_rules! assert_local_value_matches_room_message_with_body {
1742 ( $latest_event_value:expr, $pattern:path => with body = $body:expr ) => {{
1743 let latest_event_value = $latest_event_value;
1744
1745 assert_matches!(
1746 latest_event_value,
1747 Some( $pattern (ref local_event)) => {
1748 assert_matches!(
1749 local_event.content.deserialize().unwrap(),
1750 AnyMessageLikeEventContent::RoomMessage(message_content) => {
1751 assert_eq!(message_content.body(), $body);
1752 }
1753 );
1754
1755 latest_event_value.unwrap()
1756 }
1757 )
1758 }};
1759
1760 ( $latest_event_value:expr, $pattern:path {
1761 $local_value:ident with body = $body:expr
1762 $( , $field:ident => $more:block )*
1763 } ) => {{
1764 let latest_event_value = $latest_event_value;
1765
1766 assert_matches!(
1767 latest_event_value,
1768 Some( $pattern { ref $local_value, $( ref $field, )* .. }) => {
1769 assert_matches!(
1770 $local_value .content.deserialize().unwrap(),
1771 AnyMessageLikeEventContent::RoomMessage(message_content) => {
1772 assert_eq!(message_content.body(), $body);
1773
1774 $({
1775 let $field = $field;
1776 $more
1777 })*
1778 }
1779 );
1780
1781 latest_event_value.unwrap()
1782 }
1783 )
1784 }};
1785 }
1786
1787 fn remote_room_message(event_id: &EventId, body: &str) -> RemoteLatestEventValue {
1788 RemoteLatestEventValue::from_plaintext(
1789 Raw::from_json_string(
1790 json!({
1791 "content": RoomMessageEventContent::text_plain(body),
1792 "type": "m.room.message",
1793 "event_id": event_id,
1794 "origin_server_ts": 42,
1795 "sender": "@mnt_io:matrix.org",
1796 })
1797 .to_string(),
1798 )
1799 .unwrap(),
1800 )
1801 }
1802
1803 #[async_test]
1804 async fn test_remote_is_scanning_event_backwards_from_event_cache() {
1805 let room_id = room_id!("!r0");
1806 let user_id = user_id!("@mnt_io:matrix.org");
1807 let event_factory = EventFactory::new().sender(user_id).room(room_id);
1808 let event_id_0 = event_id!("$ev0");
1809 let event_id_1 = event_id!("$ev1");
1810 let event_id_2 = event_id!("$ev2");
1811
1812 let server = MatrixMockServer::new().await;
1813 let client = server.client_builder().build().await;
1814
1815 {
1817 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1819
1820 client
1822 .event_cache_store()
1823 .lock()
1824 .await
1825 .expect("Could not acquire the event cache lock")
1826 .as_clean()
1827 .expect("Could not acquire a clean event cache lock")
1828 .handle_linked_chunk_updates(
1829 LinkedChunkId::Room(room_id),
1830 vec![
1831 Update::NewItemsChunk {
1832 previous: None,
1833 new: ChunkIdentifier::new(0),
1834 next: None,
1835 },
1836 Update::PushItems {
1837 at: Position::new(ChunkIdentifier::new(0), 0),
1838 items: vec![
1839 event_factory.text_msg("hello").event_id(event_id_0).into(),
1841 event_factory.text_msg("world").event_id(event_id_1).into(),
1843 event_factory
1845 .room_topic("new room topic")
1846 .event_id(event_id_2)
1847 .into(),
1848 ],
1849 },
1850 ],
1851 )
1852 .await
1853 .unwrap();
1854 }
1855
1856 let event_cache = client.event_cache();
1857 event_cache.subscribe().unwrap();
1858
1859 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1860
1861 assert_remote_value_matches_room_message_with_body!(
1862 Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "world"
1866 );
1867 }
1868
1869 #[async_test]
1870 async fn test_remote_without_a_candidate() {
1871 let room_id = room_id!("!r0");
1872
1873 let server = MatrixMockServer::new().await;
1874 let client = server.client_builder().build().await;
1875 let user_id = client.user_id().unwrap();
1876 let event_factory = EventFactory::new().sender(user_id).room(room_id);
1877
1878 let room = client.base_client().get_or_create_room(room_id, RoomState::Joined);
1879
1880 client
1882 .event_cache_store()
1883 .lock()
1884 .await
1885 .expect("Could not acquire the event cache lock")
1886 .as_clean()
1887 .expect("Could not acquire a clean event cache lock")
1888 .handle_linked_chunk_updates(
1889 LinkedChunkId::Room(room_id),
1890 vec![
1891 Update::NewItemsChunk {
1892 previous: None,
1893 new: ChunkIdentifier::new(0),
1894 next: None,
1895 },
1896 Update::PushItems {
1897 at: Position::new(ChunkIdentifier::new(0), 0),
1898 items: vec![event_factory.room_topic("new room topic").into()],
1899 },
1900 ],
1901 )
1902 .await
1903 .unwrap();
1904
1905 let event_cache = client.event_cache();
1906 event_cache.subscribe().unwrap();
1907
1908 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1909
1910 let current_value = {
1912 let value = room.latest_event();
1913
1914 assert_matches!(value, LatestEventValue::None);
1915
1916 value
1917 };
1918
1919 assert_matches!(
1923 Builder::new_remote(&room_event_cache, current_value, user_id, None,).await,
1924 None
1925 );
1926 }
1927
1928 #[async_test]
1929 async fn test_remote_with_a_candidate() {
1930 let room_id = room_id!("!r0");
1931
1932 let server = MatrixMockServer::new().await;
1933 let client = server.client_builder().build().await;
1934 let user_id = client.user_id().unwrap();
1935 let event_factory = EventFactory::new().sender(user_id).room(room_id);
1936
1937 let room = client.base_client().get_or_create_room(room_id, RoomState::Joined);
1938
1939 client
1941 .event_cache_store()
1942 .lock()
1943 .await
1944 .expect("Could not acquire the event cache lock")
1945 .as_clean()
1946 .expect("Could not acquire a clean event cache lock")
1947 .handle_linked_chunk_updates(
1948 LinkedChunkId::Room(room_id),
1949 vec![
1950 Update::NewItemsChunk {
1951 previous: None,
1952 new: ChunkIdentifier::new(0),
1953 next: None,
1954 },
1955 Update::PushItems {
1956 at: Position::new(ChunkIdentifier::new(0), 0),
1957 items: vec![
1958 event_factory.text_msg("hello").event_id(event_id!("$ev0")).into(),
1959 ],
1960 },
1961 ],
1962 )
1963 .await
1964 .unwrap();
1965
1966 let event_cache = client.event_cache();
1967 event_cache.subscribe().unwrap();
1968
1969 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1970
1971 let current_value = {
1973 let value = room.latest_event();
1974
1975 assert_matches!(value, LatestEventValue::None);
1976
1977 value
1978 };
1979
1980 assert_remote_value_matches_room_message_with_body!(
1986 Builder::new_remote(&room_event_cache, current_value, user_id, None).await => with body = "hello"
1987 );
1988 }
1989
1990 #[async_test]
1991 async fn test_remote_without_a_candidate_but_with_an_existing_latest_event_value() {
1992 let room_id = room_id!("!r0");
1993
1994 let server = MatrixMockServer::new().await;
1995 let client = server.client_builder().build().await;
1996 let user_id = client.user_id().unwrap();
1997 let event_factory = EventFactory::new().sender(user_id).room(room_id);
1998
1999 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2000
2001 client
2003 .event_cache_store()
2004 .lock()
2005 .await
2006 .expect("Could not acquire the event cache lock")
2007 .as_clean()
2008 .expect("Could not acquire a clean event cache lock")
2009 .handle_linked_chunk_updates(
2010 LinkedChunkId::Room(room_id),
2011 vec![
2012 Update::NewItemsChunk {
2013 previous: None,
2014 new: ChunkIdentifier::new(1),
2015 next: None,
2016 },
2017 Update::PushItems {
2018 at: Position::new(ChunkIdentifier::new(1), 0),
2019 items: vec![event_factory.room_topic("new room topic").into()],
2020 },
2021 ],
2022 )
2023 .await
2024 .unwrap();
2025
2026 let event_cache = client.event_cache();
2027 event_cache.subscribe().unwrap();
2028
2029 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2030
2031 let current_value =
2033 LatestEventValue::Remote(remote_room_message(event_id!("$ev0"), "hello"));
2034
2035 assert_matches!(
2039 Builder::new_remote(&room_event_cache, current_value, user_id, None).await,
2040 None
2041 );
2042 }
2043
2044 #[async_test]
2045 async fn test_remote_without_a_candidate_but_with_an_erasable_existing_latest_event_value() {
2046 let room_id = room_id!("!r0");
2047 let event_id = event_id!("$ev0");
2048
2049 let server = MatrixMockServer::new().await;
2050 let client = server.client_builder().build().await;
2051 let user_id = client.user_id().unwrap();
2052 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2053
2054 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2055
2056 client
2058 .event_cache_store()
2059 .lock()
2060 .await
2061 .expect("Could not acquire the event cache lock")
2062 .as_clean()
2063 .expect("Could not acquire a clean event cache lock")
2064 .handle_linked_chunk_updates(
2065 LinkedChunkId::Room(room_id),
2066 vec![
2067 Update::NewItemsChunk {
2068 previous: None,
2069 new: ChunkIdentifier::new(1),
2070 next: None,
2071 },
2072 Update::PushItems {
2073 at: Position::new(ChunkIdentifier::new(1), 0),
2074 items: vec![event_factory.redaction(event_id).into()],
2075 },
2076 ],
2077 )
2078 .await
2079 .unwrap();
2080
2081 let event_cache = client.event_cache();
2082 event_cache.subscribe().unwrap();
2083
2084 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2085
2086 let current_value = LatestEventValue::Remote(remote_room_message(event_id, "hello"));
2088
2089 assert_matches!(
2096 Builder::new_remote(&room_event_cache, current_value, user_id, None).await,
2097 Some(LatestEventValue::None)
2098 );
2099 }
2100
2101 #[async_test]
2102 async fn test_remote_with_a_candidate_and_an_erasable_existing_latest_event_value() {
2103 let room_id = room_id!("!r0");
2104 let event_id = event_id!("$ev0");
2105
2106 let server = MatrixMockServer::new().await;
2107 let client = server.client_builder().build().await;
2108 let user_id = client.user_id().unwrap();
2109 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2110
2111 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2112
2113 client
2115 .event_cache_store()
2116 .lock()
2117 .await
2118 .expect("Could not acquire the event cache lock")
2119 .as_clean()
2120 .expect("Could not acquire a clean event cache lock")
2121 .handle_linked_chunk_updates(
2122 LinkedChunkId::Room(room_id),
2123 vec![
2124 Update::NewItemsChunk {
2125 previous: None,
2126 new: ChunkIdentifier::new(1),
2127 next: None,
2128 },
2129 Update::PushItems {
2130 at: Position::new(ChunkIdentifier::new(1), 0),
2131 items: vec![
2132 event_factory.redaction(event_id).into(),
2133 event_factory.text_msg("world").into(),
2134 ],
2135 },
2136 ],
2137 )
2138 .await
2139 .unwrap();
2140
2141 let event_cache = client.event_cache();
2142 event_cache.subscribe().unwrap();
2143
2144 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2145
2146 let current_value = LatestEventValue::Remote(remote_room_message(event_id, "hello"));
2148
2149 assert_remote_value_matches_room_message_with_body!(
2156 Builder::new_remote(&room_event_cache, current_value, user_id, None).await => with body = "world"
2157 );
2158 }
2159
2160 #[async_test]
2161 async fn test_remote_when_room_has_been_emptied() {
2162 let room_id = room_id!("!r0");
2163
2164 let server = MatrixMockServer::new().await;
2165 let client = server.client_builder().build().await;
2166 let user_id = client.user_id().unwrap();
2167 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2168
2169 let room = client.base_client().get_or_create_room(room_id, RoomState::Joined);
2170
2171 client
2173 .event_cache_store()
2174 .lock()
2175 .await
2176 .expect("Could not acquire the event cache lock")
2177 .as_clean()
2178 .expect("Could not acquire a clean event cache lock")
2179 .handle_linked_chunk_updates(
2180 LinkedChunkId::Room(room_id),
2181 vec![
2182 Update::NewItemsChunk {
2183 previous: None,
2184 new: ChunkIdentifier::new(0),
2185 next: None,
2186 },
2187 Update::PushItems {
2188 at: Position::new(ChunkIdentifier::new(0), 0),
2189 items: vec![
2190 event_factory.text_msg("hello").event_id(event_id!("$ev0")).into(),
2191 ],
2192 },
2193 ],
2194 )
2195 .await
2196 .unwrap();
2197
2198 let event_cache = client.event_cache();
2199 event_cache.subscribe().unwrap();
2200
2201 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2202
2203 let current_value = {
2205 let value = room.latest_event();
2206
2207 assert_matches!(value, LatestEventValue::None);
2208
2209 value
2210 };
2211
2212 let current_value = assert_remote_value_matches_room_message_with_body!(
2215 Builder::new_remote(&room_event_cache, current_value, user_id, None).await => with body = "hello"
2216 );
2217
2218 event_cache.clear_all_rooms().await.unwrap();
2220
2221 assert_matches!(
2223 Builder::new_remote(&room_event_cache, current_value, user_id, None).await,
2224 Some(LatestEventValue::None)
2225 );
2226 }
2227
2228 #[async_test]
2229 async fn test_remote_edit() {
2230 let room_id = room_id!("!r0");
2231 let user_id = user_id!("@mnt_io:matrix.org");
2232 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2233 let event_id_0 = event_id!("$ev0");
2234 let event_id_1 = event_id!("$ev1");
2235
2236 let server = MatrixMockServer::new().await;
2237 let client = server.client_builder().build().await;
2238
2239 {
2241 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2243
2244 client
2246 .event_cache_store()
2247 .lock()
2248 .await
2249 .expect("Could not acquire the event cache lock")
2250 .as_clean()
2251 .expect("Could not acquire a clean event cache lock")
2252 .handle_linked_chunk_updates(
2253 LinkedChunkId::Room(room_id),
2254 vec![
2255 Update::NewItemsChunk {
2256 previous: None,
2257 new: ChunkIdentifier::new(0),
2258 next: None,
2259 },
2260 Update::PushItems {
2261 at: Position::new(ChunkIdentifier::new(0), 0),
2262 items: vec![
2263 event_factory.text_msg("hello").event_id(event_id_0).into(),
2265 event_factory
2267 .text_msg("* goodbye")
2268 .event_id(event_id_1)
2269 .edit(
2270 event_id_0,
2271 RoomMessageEventContent::text_plain("goodbye").into(),
2272 )
2273 .into(),
2274 ],
2275 },
2276 ],
2277 )
2278 .await
2279 .unwrap();
2280 }
2281
2282 let event_cache = client.event_cache();
2283 event_cache.subscribe().unwrap();
2284
2285 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2286
2287 assert_remote_value_matches_room_message_with_body!(
2288 Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "* goodbye"
2290 );
2291 }
2292
2293 #[async_test]
2294 async fn test_remote_edit_invalid_edit() {
2295 let room_id = room_id!("!r0");
2296 let user_id = user_id!("@mnt_io:matrix.org");
2297 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2298 let event_id_0 = event_id!("$ev0");
2299 let event_id_1 = event_id!("$ev1");
2300
2301 let server = MatrixMockServer::new().await;
2302 let client = server.client_builder().build().await;
2303
2304 {
2306 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2308
2309 client
2311 .event_cache_store()
2312 .lock()
2313 .await
2314 .expect("Could not acquire the event cache lock")
2315 .as_clean()
2316 .expect("Could not acquire a clean event cache lock")
2317 .handle_linked_chunk_updates(
2318 LinkedChunkId::Room(room_id),
2319 vec![
2320 Update::NewItemsChunk {
2321 previous: None,
2322 new: ChunkIdentifier::new(0),
2323 next: None,
2324 },
2325 Update::PushItems {
2326 at: Position::new(ChunkIdentifier::new(0), 0),
2327 items: vec![
2328 event_factory
2330 .text_msg("hello")
2331 .sender(user_id!("@alice:example.org"))
2332 .event_id(event_id_0)
2333 .into(),
2334 event_factory
2336 .text_msg("* goodbye")
2337 .event_id(event_id_1)
2338 .sender(user_id!("@malory:example.org"))
2339 .edit(
2340 event_id_0,
2341 RoomMessageEventContent::text_plain("goodbye").into(),
2342 )
2343 .into(),
2344 ],
2345 },
2346 ],
2347 )
2348 .await
2349 .unwrap();
2350 }
2351
2352 let event_cache = client.event_cache();
2353 event_cache.subscribe().unwrap();
2354
2355 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2356
2357 assert_remote_value_matches_room_message_with_body!(
2358 Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "hello"
2360 );
2361 }
2362
2363 #[async_test]
2364 async fn test_remote_double_edit() {
2365 let room_id = room_id!("!r0");
2366 let user_id = user_id!("@mnt_io:matrix.org");
2367 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2368 let event_id_0 = event_id!("$ev0");
2369 let event_id_1 = event_id!("$ev1");
2370 let event_id_2 = event_id!("$ev2");
2371
2372 let server = MatrixMockServer::new().await;
2373 let client = server.client_builder().build().await;
2374
2375 {
2377 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2379
2380 client
2382 .event_cache_store()
2383 .lock()
2384 .await
2385 .expect("Could not acquire the event cache lock")
2386 .as_clean()
2387 .expect("Could not acquire a clean event cache lock")
2388 .handle_linked_chunk_updates(
2389 LinkedChunkId::Room(room_id),
2390 vec![
2391 Update::NewItemsChunk {
2392 previous: None,
2393 new: ChunkIdentifier::new(0),
2394 next: None,
2395 },
2396 Update::PushItems {
2397 at: Position::new(ChunkIdentifier::new(0), 0),
2398 items: vec![
2399 event_factory.text_msg("hello").event_id(event_id_0).into(),
2401 event_factory
2403 .text_msg("* goodbye")
2404 .event_id(event_id_1)
2405 .edit(
2406 event_id_0,
2407 RoomMessageEventContent::text_plain("goodbye").into(),
2408 )
2409 .into(),
2410 event_factory
2412 .text_msg("* err, hello")
2413 .event_id(event_id_2)
2414 .edit(
2415 event_id_0,
2416 RoomMessageEventContent::text_plain("err, hello").into(),
2417 )
2418 .into(),
2419 ],
2420 },
2421 ],
2422 )
2423 .await
2424 .unwrap();
2425 }
2426
2427 let event_cache = client.event_cache();
2428 event_cache.subscribe().unwrap();
2429
2430 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2431
2432 assert_remote_value_matches_room_message_with_body!(
2433 Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "* err, hello"
2437 );
2438 }
2439
2440 #[async_test]
2441 async fn test_remote_latest_edit_targets_older_event() {
2442 let room_id = room_id!("!r0");
2443 let user_id = user_id!("@mnt_io:matrix.org");
2444 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2445 let event_id_0 = event_id!("$ev0");
2446 let event_id_1 = event_id!("$ev1");
2447 let event_id_2 = event_id!("$ev2");
2448 let event_id_3 = event_id!("$ev3");
2449 let event_id_4 = event_id!("$ev4");
2450
2451 let server = MatrixMockServer::new().await;
2452 let client = server.client_builder().build().await;
2453
2454 {
2456 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2458
2459 client
2461 .event_cache_store()
2462 .lock()
2463 .await
2464 .expect("Could not acquire the event cache lock")
2465 .as_clean()
2466 .expect("Could not acquire a clean event cache lock")
2467 .handle_linked_chunk_updates(
2468 LinkedChunkId::Room(room_id),
2469 vec![
2470 Update::NewItemsChunk {
2471 previous: None,
2472 new: ChunkIdentifier::new(0),
2473 next: None,
2474 },
2475 Update::PushItems {
2476 at: Position::new(ChunkIdentifier::new(0), 0),
2477 items: vec![
2478 event_factory.text_msg("A").event_id(event_id_0).into(),
2480 event_factory.text_msg("B").event_id(event_id_1).into(),
2482 event_factory.text_msg("C").event_id(event_id_2).into(),
2484 event_factory
2486 .text_msg("* D")
2487 .event_id(event_id_3)
2488 .edit(
2489 event_id_2,
2490 RoomMessageEventContent::text_plain("D").into(),
2491 )
2492 .into(),
2493 event_factory
2495 .text_msg("* X")
2496 .event_id(event_id_4)
2497 .edit(
2498 event_id_0,
2499 RoomMessageEventContent::text_plain("X").into(),
2500 )
2501 .into(),
2502 ],
2503 },
2504 ],
2505 )
2506 .await
2507 .unwrap();
2508 }
2509
2510 let event_cache = client.event_cache();
2511 event_cache.subscribe().unwrap();
2512
2513 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2514
2515 assert_remote_value_matches_room_message_with_body!(
2516 Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "* D"
2518 );
2519 }
2520
2521 #[async_test]
2522 async fn test_remote_latest_edit_preceded_by_unrelated_event() {
2523 let room_id = room_id!("!r0");
2524 let user_id = user_id!("@mnt_io:matrix.org");
2525 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2526 let event_id_0 = event_id!("$ev0");
2527 let event_id_1 = event_id!("$ev1");
2528 let event_id_2 = event_id!("$ev2");
2529
2530 let server = MatrixMockServer::new().await;
2531 let client = server.client_builder().build().await;
2532
2533 {
2535 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2537
2538 client
2540 .event_cache_store()
2541 .lock()
2542 .await
2543 .expect("Could not acquire the event cache lock")
2544 .as_clean()
2545 .expect("Could not acquire a clean event cache lock")
2546 .handle_linked_chunk_updates(
2547 LinkedChunkId::Room(room_id),
2548 vec![
2549 Update::NewItemsChunk {
2550 previous: None,
2551 new: ChunkIdentifier::new(0),
2552 next: None,
2553 },
2554 Update::PushItems {
2555 at: Position::new(ChunkIdentifier::new(0), 0),
2556 items: vec![
2557 event_factory.text_msg("A").event_id(event_id_0).into(),
2559 event_factory
2561 .text_msg("* B")
2562 .event_id(event_id_1)
2563 .edit(
2564 event_id_2,
2565 RoomMessageEventContent::text_plain("B").into(),
2566 )
2567 .into(),
2568 ],
2569 },
2570 ],
2571 )
2572 .await
2573 .unwrap();
2574 }
2575
2576 let event_cache = client.event_cache();
2577 event_cache.subscribe().unwrap();
2578
2579 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2580
2581 assert_remote_value_matches_room_message_with_body!(
2582 Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None).await => with body = "A"
2584 );
2585 }
2586
2587 #[async_test]
2588 async fn test_remote_latest_edit_without_previous_events() {
2589 let room_id = room_id!("!r0");
2590 let user_id = user_id!("@mnt_io:matrix.org");
2591 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2592 let event_id_0 = event_id!("$ev0");
2593 let event_id_1 = event_id!("$ev1");
2594
2595 let server = MatrixMockServer::new().await;
2596 let client = server.client_builder().build().await;
2597
2598 {
2600 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2602
2603 client
2605 .event_cache_store()
2606 .lock()
2607 .await
2608 .expect("Could not acquire the event cache lock")
2609 .as_clean()
2610 .expect("Could not acquire a clean event cache lock")
2611 .handle_linked_chunk_updates(
2612 LinkedChunkId::Room(room_id),
2613 vec![
2614 Update::NewItemsChunk {
2615 previous: None,
2616 new: ChunkIdentifier::new(0),
2617 next: None,
2618 },
2619 Update::PushItems {
2620 at: Position::new(ChunkIdentifier::new(0), 0),
2621 items: vec![
2622 event_factory
2624 .text_msg("* B")
2625 .event_id(event_id_0)
2626 .edit(
2627 event_id_1,
2628 RoomMessageEventContent::text_plain("B").into(),
2629 )
2630 .into(),
2631 ],
2632 },
2633 ],
2634 )
2635 .await
2636 .unwrap();
2637 }
2638
2639 let event_cache = client.event_cache();
2640 event_cache.subscribe().unwrap();
2641
2642 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2643
2644 assert!(
2646 Builder::new_remote(&room_event_cache, LatestEventValue::None, user_id, None)
2647 .await
2648 .is_none()
2649 );
2650 }
2651
2652 async fn local_prelude() -> (Client, OwnedRoomId, RoomSendQueue, RoomEventCache) {
2653 let room_id = owned_room_id!("!r0");
2654
2655 let server = MatrixMockServer::new().await;
2656 let client = server.client_builder().build().await;
2657 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
2658 let room = client.get_room(&room_id).unwrap();
2659 let user_id = client.user_id().unwrap();
2660 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
2661
2662 client
2664 .event_cache_store()
2665 .lock()
2666 .await
2667 .expect("Could not acquire the event cache lock")
2668 .as_clean()
2669 .expect("Could not acquire a clean event cache lock")
2670 .handle_linked_chunk_updates(
2671 LinkedChunkId::Room(&room_id),
2672 vec![
2673 Update::NewItemsChunk {
2674 previous: None,
2675 new: ChunkIdentifier::new(0),
2676 next: None,
2677 },
2678 Update::PushItems {
2679 at: Position::new(ChunkIdentifier::new(0), 0),
2680 items: vec![event_factory.room_topic("new room topic").into()],
2681 },
2682 ],
2683 )
2684 .await
2685 .unwrap();
2686
2687 let event_cache = client.event_cache();
2688 event_cache.subscribe().unwrap();
2689
2690 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
2691
2692 let send_queue = client.send_queue();
2693 let room_send_queue = send_queue.for_room(room);
2694
2695 (client, room_id, room_send_queue, room_event_cache)
2696 }
2697
2698 fn new_local_echo_content(
2699 room_send_queue: &RoomSendQueue,
2700 transaction_id: &OwnedTransactionId,
2701 body: &str,
2702 ) -> LocalEchoContent {
2703 LocalEchoContent::Event {
2704 serialized_event: SerializableEventContent::new(
2705 &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
2706 )
2707 .unwrap(),
2708 send_handle: SendHandle::new(
2709 room_send_queue.clone(),
2710 transaction_id.clone(),
2711 MilliSecondsSinceUnixEpoch::now(),
2712 ),
2713 send_error: None,
2714 }
2715 }
2716
2717 #[async_test]
2718 async fn test_local_new_local_event_with_content_of_kind_event() {
2719 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2720 let user_id = client.user_id().unwrap();
2721
2722 let mut buffer = BufferOfValuesForLocalEvents::new();
2723
2724 let previous_value = {
2726 let transaction_id = OwnedTransactionId::from("txnid0");
2727 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2728
2729 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
2730
2731 assert_local_value_matches_room_message_with_body!(
2733 Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
2734 LatestEventValue::LocalIsSending => with body = "A"
2735 )
2736 };
2737
2738 {
2740 let transaction_id = OwnedTransactionId::from("txnid1");
2741 let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
2742
2743 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
2744
2745 assert_local_value_matches_room_message_with_body!(
2747 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
2748 LatestEventValue::LocalIsSending => with body = "B"
2749 );
2750 }
2751
2752 assert_eq!(buffer.buffer.len(), 2);
2753 }
2754
2755 #[async_test]
2756 async fn test_local_new_local_event_with_content_of_kind_event_when_previous_local_event_cannot_be_sent()
2757 {
2758 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2759 let user_id = client.user_id().unwrap();
2760
2761 let mut buffer = BufferOfValuesForLocalEvents::new();
2762
2763 let (transaction_id_0, previous_value) = {
2765 let transaction_id = OwnedTransactionId::from("txnid0");
2766 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2767
2768 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2769 transaction_id: transaction_id.clone(),
2770 content,
2771 });
2772
2773 let value = assert_local_value_matches_room_message_with_body!(
2775 Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
2776 LatestEventValue::LocalIsSending => with body = "A"
2777 );
2778
2779 (transaction_id, value)
2780 };
2781
2782 let previous_value = {
2786 let update = RoomSendQueueUpdate::SendError {
2787 transaction_id: transaction_id_0.clone(),
2788 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2789 is_recoverable: false,
2790 };
2791
2792 let value = assert_local_value_matches_room_message_with_body!(
2795 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
2796 LatestEventValue::LocalCannotBeSent => with body = "A"
2797 );
2798
2799 assert_eq!(buffer.buffer.len(), 1);
2800 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2801
2802 value
2803 };
2804
2805 {
2809 let transaction_id = OwnedTransactionId::from("txnid1");
2810 let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
2811
2812 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
2813
2814 assert_local_value_matches_room_message_with_body!(
2816 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
2817 LatestEventValue::LocalCannotBeSent => with body = "B"
2818 );
2819 }
2820
2821 assert_eq!(buffer.buffer.len(), 2);
2822 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2823 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2824 }
2825
2826 #[async_test]
2827 async fn test_local_new_local_event_with_content_of_kind_react() {
2828 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2829 let user_id = client.user_id().unwrap();
2830
2831 let mut buffer = BufferOfValuesForLocalEvents::new();
2832
2833 let (transaction_id_0, previous_value) = {
2835 let transaction_id = OwnedTransactionId::from("txnid0");
2836 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2837
2838 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2839 transaction_id: transaction_id.clone(),
2840 content,
2841 });
2842
2843 let value = assert_local_value_matches_room_message_with_body!(
2845 Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
2846 LatestEventValue::LocalIsSending => with body = "A"
2847 );
2848
2849 (transaction_id, value)
2850 };
2851
2852 {
2855 let transaction_id = OwnedTransactionId::from("txnid1");
2856 let content = LocalEchoContent::React {
2857 key: "<< 1".to_owned(),
2858 send_handle: SendReactionHandle::new(
2859 room_send_queue.clone(),
2860 ChildTransactionId::new(),
2861 ),
2862 applies_to: transaction_id_0,
2863 };
2864
2865 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
2866
2867 assert_matches!(
2869 Builder::new_local(
2870 &update,
2871 &mut buffer,
2872 &room_event_cache,
2873 previous_value,
2874 user_id,
2875 None
2876 )
2877 .await,
2878 None
2879 )
2880 }
2881
2882 assert_eq!(buffer.buffer.len(), 1);
2883 }
2884
2885 #[async_test]
2886 async fn test_local_cancelled_local_event() {
2887 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2888 let user_id = client.user_id().unwrap();
2889
2890 let mut buffer = BufferOfValuesForLocalEvents::new();
2891 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2892 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2893 let transaction_id_2 = OwnedTransactionId::from("txnid2");
2894
2895 let previous_value = {
2897 let mut value = None;
2898
2899 for (transaction_id, body) in
2900 [(&transaction_id_0, "A"), (&transaction_id_1, "B"), (&transaction_id_2, "C")]
2901 {
2902 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2903
2904 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2905 transaction_id: transaction_id.clone(),
2906 content,
2907 });
2908
2909 value = Some(assert_local_value_matches_room_message_with_body!(
2911 Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
2912 LatestEventValue::LocalIsSending => with body = body
2913 ));
2914 }
2915
2916 assert_eq!(buffer.buffer.len(), 3);
2917
2918 value.unwrap()
2919 };
2920
2921 let previous_value = {
2924 let update = RoomSendQueueUpdate::CancelledLocalEvent {
2925 transaction_id: transaction_id_1.clone(),
2926 };
2927
2928 let value = assert_local_value_matches_room_message_with_body!(
2931 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
2932 LatestEventValue::LocalIsSending => with body = "C"
2933 );
2934
2935 assert_eq!(buffer.buffer.len(), 2);
2936
2937 value
2938 };
2939
2940 let previous_value = {
2943 let update = RoomSendQueueUpdate::CancelledLocalEvent {
2944 transaction_id: transaction_id_2.clone(),
2945 };
2946
2947 let value = assert_local_value_matches_room_message_with_body!(
2950 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
2951 LatestEventValue::LocalIsSending => with body = "A"
2952 );
2953
2954 assert_eq!(buffer.buffer.len(), 1);
2955
2956 value
2957 };
2958
2959 {
2964 let update =
2965 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: transaction_id_0 };
2966
2967 assert_matches!(
2969 Builder::new_local(
2970 &update,
2971 &mut buffer,
2972 &room_event_cache,
2973 previous_value,
2974 user_id,
2975 None
2976 )
2977 .await,
2978 Some(LatestEventValue::None)
2979 );
2980
2981 assert!(buffer.buffer.is_empty());
2982 }
2983 }
2984
2985 #[async_test]
2986 async fn test_local_sent_event() {
2987 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2988 let user_id = client.user_id().unwrap();
2989
2990 let mut buffer = BufferOfValuesForLocalEvents::new();
2991 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2992 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2993
2994 let previous_value = {
2996 let mut value = None;
2997
2998 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2999 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
3000
3001 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3002 transaction_id: transaction_id.clone(),
3003 content,
3004 });
3005
3006 value = Some(assert_local_value_matches_room_message_with_body!(
3008 Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
3009 LatestEventValue::LocalIsSending => with body = body
3010 ));
3011 }
3012
3013 assert_eq!(buffer.buffer.len(), 2);
3014
3015 value.unwrap()
3016 };
3017
3018 let previous_value = {
3021 let update = RoomSendQueueUpdate::SentEvent {
3022 transaction_id: transaction_id_0.clone(),
3023 event_id: owned_event_id!("$ev0"),
3024 };
3025
3026 let value = assert_local_value_matches_room_message_with_body!(
3029 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3030 LatestEventValue::LocalIsSending => with body = "B"
3031 );
3032
3033 assert_eq!(buffer.buffer.len(), 1);
3034
3035 value
3036 };
3037
3038 {
3042 let expected_event_id = event_id!("$ev1");
3043 let update = RoomSendQueueUpdate::SentEvent {
3044 transaction_id: transaction_id_1,
3045 event_id: expected_event_id.to_owned(),
3046 };
3047
3048 assert_local_value_matches_room_message_with_body!(
3050 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3051 LatestEventValue::LocalHasBeenSent {
3052 value with body = "B",
3053 event_id => {
3054 assert_eq!(event_id, expected_event_id);
3055 }
3056 }
3057 );
3058
3059 assert!(buffer.buffer.is_empty());
3060 }
3061 }
3062
3063 #[async_test]
3064 async fn test_local_replaced_local_event() {
3065 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3066 let user_id = client.user_id().unwrap();
3067
3068 let mut buffer = BufferOfValuesForLocalEvents::new();
3069 let transaction_id_0 = OwnedTransactionId::from("txnid0");
3070 let transaction_id_1 = OwnedTransactionId::from("txnid1");
3071
3072 let previous_value = {
3074 let mut value = None;
3075
3076 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
3077 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
3078
3079 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3080 transaction_id: transaction_id.clone(),
3081 content,
3082 });
3083
3084 value = Some(assert_local_value_matches_room_message_with_body!(
3086 Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
3087 LatestEventValue::LocalIsSending => with body = body
3088 ));
3089 }
3090
3091 assert_eq!(buffer.buffer.len(), 2);
3092
3093 value.unwrap()
3094 };
3095
3096 let previous_value = {
3099 let transaction_id = &transaction_id_0;
3100 let LocalEchoContent::Event { serialized_event: new_content, .. } =
3101 new_local_echo_content(&room_send_queue, transaction_id, "A.")
3102 else {
3103 panic!("oopsy");
3104 };
3105
3106 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
3107 transaction_id: transaction_id.clone(),
3108 new_content,
3109 };
3110
3111 let value = assert_local_value_matches_room_message_with_body!(
3114 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3115 LatestEventValue::LocalIsSending => with body = "B"
3116 );
3117
3118 assert_eq!(buffer.buffer.len(), 2);
3119
3120 value
3121 };
3122
3123 {
3126 let transaction_id = &transaction_id_1;
3127 let LocalEchoContent::Event { serialized_event: new_content, .. } =
3128 new_local_echo_content(&room_send_queue, transaction_id, "B.")
3129 else {
3130 panic!("oopsy");
3131 };
3132
3133 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
3134 transaction_id: transaction_id.clone(),
3135 new_content,
3136 };
3137
3138 assert_local_value_matches_room_message_with_body!(
3141 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3142 LatestEventValue::LocalIsSending => with body = "B."
3143 );
3144
3145 assert_eq!(buffer.buffer.len(), 2);
3146 }
3147 }
3148
3149 #[async_test]
3150 async fn test_local_replaced_local_event_by_a_non_suitable_event() {
3151 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3152 let user_id = client.user_id().unwrap();
3153
3154 let mut buffer = BufferOfValuesForLocalEvents::new();
3155 let transaction_id = OwnedTransactionId::from("txnid0");
3156
3157 let previous_value = {
3159 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
3160
3161 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3162 transaction_id: transaction_id.clone(),
3163 content,
3164 });
3165
3166 let value = assert_local_value_matches_room_message_with_body!(
3168 Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
3169 LatestEventValue::LocalIsSending => with body = "A"
3170 );
3171
3172 assert_eq!(buffer.buffer.len(), 1);
3173
3174 value
3175 };
3176
3177 {
3182 let new_content = SerializableEventContent::new(&AnyMessageLikeEventContent::Reaction(
3183 ReactionEventContent::new(Annotation::new(
3184 owned_event_id!("$ev0"),
3185 "+1".to_owned(),
3186 )),
3187 ))
3188 .unwrap();
3189
3190 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
3191 transaction_id: transaction_id.clone(),
3192 new_content,
3193 };
3194
3195 assert_matches!(
3197 Builder::new_local(
3198 &update,
3199 &mut buffer,
3200 &room_event_cache,
3201 previous_value,
3202 user_id,
3203 None
3204 )
3205 .await,
3206 None
3207 );
3208
3209 assert_eq!(buffer.buffer.len(), 0);
3210 }
3211 }
3212
3213 #[async_test]
3214 async fn test_local_replaced_local_event_twice() {
3215 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3216 let user_id = client.user_id().unwrap();
3217
3218 let mut buffer = BufferOfValuesForLocalEvents::new();
3219 let transaction_id = OwnedTransactionId::from("txnid0");
3220
3221 let previous_value = {
3223 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
3224
3225 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3226 transaction_id: transaction_id.clone(),
3227 content,
3228 });
3229
3230 let value = assert_local_value_matches_room_message_with_body!(
3232 Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
3233 LatestEventValue::LocalIsSending => with body = "A"
3234 );
3235
3236 assert_eq!(buffer.buffer.len(), 1);
3237
3238 value
3239 };
3240
3241 {
3244 let LocalEchoContent::Event { serialized_event: new_content, .. } =
3245 new_local_echo_content(&room_send_queue, &transaction_id, "B")
3246 else {
3247 panic!("oopsy");
3248 };
3249
3250 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
3251 transaction_id: transaction_id.clone(),
3252 new_content,
3253 };
3254
3255 assert_local_value_matches_room_message_with_body!(
3258 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value.clone(), user_id, None).await,
3259 LatestEventValue::LocalIsSending => with body = "B"
3260 );
3261
3262 assert_eq!(buffer.buffer.len(), 1);
3263 }
3264
3265 {
3268 let LocalEchoContent::Event { serialized_event: new_content, .. } =
3269 new_local_echo_content(&room_send_queue, &transaction_id, "C")
3270 else {
3271 panic!("oopsy");
3272 };
3273
3274 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
3275 transaction_id: transaction_id.clone(),
3276 new_content,
3277 };
3278
3279 assert_local_value_matches_room_message_with_body!(
3282 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3283 LatestEventValue::LocalIsSending => with body = "C"
3284 );
3285
3286 assert_eq!(buffer.buffer.len(), 1);
3287 }
3288 }
3289
3290 #[async_test]
3291 async fn test_local_send_unrecoverable_error() {
3292 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3293 let user_id = client.user_id().unwrap();
3294
3295 let mut buffer = BufferOfValuesForLocalEvents::new();
3296 let transaction_id_0 = OwnedTransactionId::from("txnid0");
3297 let transaction_id_1 = OwnedTransactionId::from("txnid1");
3298
3299 let previous_value = {
3301 let mut value = None;
3302
3303 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
3304 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
3305
3306 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3307 transaction_id: transaction_id.clone(),
3308 content,
3309 });
3310
3311 value = Some(assert_local_value_matches_room_message_with_body!(
3313 Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
3314 LatestEventValue::LocalIsSending => with body = body
3315 ));
3316 }
3317
3318 assert_eq!(buffer.buffer.len(), 2);
3319
3320 value.unwrap()
3321 };
3322
3323 let previous_value = {
3327 let update = RoomSendQueueUpdate::SendError {
3328 transaction_id: transaction_id_0.clone(),
3329 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
3330 is_recoverable: false,
3331 };
3332
3333 let value = assert_local_value_matches_room_message_with_body!(
3336 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3337 LatestEventValue::LocalCannotBeSent => with body = "B"
3338 );
3339
3340 assert_eq!(buffer.buffer.len(), 2);
3341 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
3342 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
3343
3344 value
3345 };
3346
3347 {
3351 let update = RoomSendQueueUpdate::SentEvent {
3352 transaction_id: transaction_id_0.clone(),
3353 event_id: owned_event_id!("$ev0"),
3354 };
3355
3356 assert_local_value_matches_room_message_with_body!(
3359 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3360 LatestEventValue::LocalIsSending => with body = "B"
3361 );
3362
3363 assert_eq!(buffer.buffer.len(), 1);
3364 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
3365 }
3366 }
3367
3368 #[async_test]
3369 async fn test_local_send_recoverable_error() {
3370 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3371 let user_id = client.user_id().unwrap();
3372
3373 let mut buffer = BufferOfValuesForLocalEvents::new();
3374 let transaction_id_0 = OwnedTransactionId::from("txnid0");
3375 let transaction_id_1 = OwnedTransactionId::from("txnid1");
3376
3377 let previous_value = {
3379 let mut value = None;
3380
3381 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
3382 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
3383
3384 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3385 transaction_id: transaction_id.clone(),
3386 content,
3387 });
3388
3389 value = Some(assert_local_value_matches_room_message_with_body!(
3391 Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
3392 LatestEventValue::LocalIsSending => with body = body
3393 ));
3394 }
3395
3396 assert_eq!(buffer.buffer.len(), 2);
3397
3398 value.unwrap()
3399 };
3400
3401 let previous_value = {
3405 let update = RoomSendQueueUpdate::SendError {
3406 transaction_id: transaction_id_0.clone(),
3407 error: Arc::new(Error::UnknownError("no more network".to_owned().into())),
3408 is_recoverable: true,
3409 };
3410
3411 let value = assert_local_value_matches_room_message_with_body!(
3414 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3415 LatestEventValue::LocalIsSending => with body = "B"
3416 );
3417
3418 assert_eq!(buffer.buffer.len(), 2);
3419 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
3420 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
3421
3422 value
3423 };
3424
3425 {
3429 let update = RoomSendQueueUpdate::SentEvent {
3430 transaction_id: transaction_id_0.clone(),
3431 event_id: owned_event_id!("$ev0"),
3432 };
3433
3434 assert_local_value_matches_room_message_with_body!(
3437 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3438 LatestEventValue::LocalIsSending => with body = "B"
3439 );
3440
3441 assert_eq!(buffer.buffer.len(), 1);
3442 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
3443 }
3444 }
3445
3446 #[async_test]
3447 async fn test_local_retry_event() {
3448 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3449 let user_id = client.user_id().unwrap();
3450
3451 let mut buffer = BufferOfValuesForLocalEvents::new();
3452 let transaction_id_0 = OwnedTransactionId::from("txnid0");
3453 let transaction_id_1 = OwnedTransactionId::from("txnid1");
3454
3455 let previous_value = {
3457 let mut value = None;
3458
3459 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
3460 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
3461
3462 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3463 transaction_id: transaction_id.clone(),
3464 content,
3465 });
3466
3467 value = Some(assert_local_value_matches_room_message_with_body!(
3469 Builder::new_local(&update, &mut buffer, &room_event_cache, value.unwrap_or_default(), user_id, None).await,
3470 LatestEventValue::LocalIsSending => with body = body
3471 ));
3472 }
3473
3474 assert_eq!(buffer.buffer.len(), 2);
3475
3476 value.unwrap()
3477 };
3478
3479 let previous_value = {
3483 let update = RoomSendQueueUpdate::SendError {
3484 transaction_id: transaction_id_0.clone(),
3485 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
3486 is_recoverable: false,
3487 };
3488
3489 let value = assert_local_value_matches_room_message_with_body!(
3492 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3493 LatestEventValue::LocalCannotBeSent => with body = "B"
3494 );
3495
3496 assert_eq!(buffer.buffer.len(), 2);
3497 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
3498 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
3499
3500 value
3501 };
3502
3503 {
3506 let update =
3507 RoomSendQueueUpdate::RetryEvent { transaction_id: transaction_id_0.clone() };
3508
3509 assert_local_value_matches_room_message_with_body!(
3512 Builder::new_local(&update, &mut buffer, &room_event_cache, previous_value, user_id, None).await,
3513 LatestEventValue::LocalIsSending => with body = "B"
3514 );
3515
3516 assert_eq!(buffer.buffer.len(), 2);
3517 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
3518 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
3519 }
3520 }
3521
3522 #[async_test]
3523 async fn test_local_media_upload() {
3524 let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
3525 let user_id = client.user_id().unwrap();
3526
3527 let mut buffer = BufferOfValuesForLocalEvents::new();
3528 let transaction_id = OwnedTransactionId::from("txnid");
3529
3530 let previous_value = {
3532 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
3533
3534 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3535 transaction_id: transaction_id.clone(),
3536 content,
3537 });
3538
3539 let value = assert_local_value_matches_room_message_with_body!(
3541 Builder::new_local(&update, &mut buffer, &room_event_cache, LatestEventValue::None, user_id, None).await,
3542 LatestEventValue::LocalIsSending => with body = "A"
3543 );
3544
3545 assert_eq!(buffer.buffer.len(), 1);
3546
3547 value
3548 };
3549
3550 {
3553 let update = RoomSendQueueUpdate::MediaUpload {
3554 related_to: transaction_id,
3555 file: None,
3556 index: 0,
3557 progress: AbstractProgress { current: 0, total: 0 },
3558 };
3559
3560 assert_matches!(
3563 Builder::new_local(
3564 &update,
3565 &mut buffer,
3566 &room_event_cache,
3567 previous_value,
3568 user_id,
3569 None
3570 )
3571 .await,
3572 None
3573 );
3574
3575 assert_eq!(buffer.buffer.len(), 1);
3576 }
3577 }
3578
3579 #[async_test]
3580 async fn test_local_fallbacks_to_remote_when_empty() {
3581 let room_id = room_id!("!r0");
3582 let user_id = user_id!("@mnt_io:matrix.org");
3583 let event_factory = EventFactory::new().sender(user_id).room(room_id);
3584 let event_id_0 = event_id!("$ev0");
3585 let event_id_1 = event_id!("$ev1");
3586
3587 let server = MatrixMockServer::new().await;
3588 let client = server.client_builder().build().await;
3589
3590 {
3592 client.base_client().get_or_create_room(room_id, RoomState::Joined);
3594
3595 client
3597 .event_cache_store()
3598 .lock()
3599 .await
3600 .expect("Could not acquire the event cache lock")
3601 .as_clean()
3602 .expect("Could not acquire a clean event cache lock")
3603 .handle_linked_chunk_updates(
3604 LinkedChunkId::Room(room_id),
3605 vec![
3606 Update::NewItemsChunk {
3607 previous: None,
3608 new: ChunkIdentifier::new(0),
3609 next: None,
3610 },
3611 Update::PushItems {
3612 at: Position::new(ChunkIdentifier::new(0), 0),
3613 items: vec![
3614 event_factory.text_msg("hello").event_id(event_id_0).into(),
3615 ],
3616 },
3617 ],
3618 )
3619 .await
3620 .unwrap();
3621 }
3622
3623 let event_cache = client.event_cache();
3624 event_cache.subscribe().unwrap();
3625
3626 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
3627
3628 let mut buffer = BufferOfValuesForLocalEvents::new();
3629
3630 assert_remote_value_matches_room_message_with_body!(
3632 Builder::new_local(
3633 &RoomSendQueueUpdate::SentEvent {
3636 transaction_id: OwnedTransactionId::from("txnid"),
3637 event_id: event_id_1.to_owned(),
3638 },
3639 &mut buffer,
3640 &room_event_cache,
3641 LatestEventValue::None,
3642 user_id,
3643 None,
3644 )
3645 .await
3646 => with body = "hello"
3647 );
3648 }
3649
3650 #[async_test]
3651 async fn test_local_sent_event_resolves_to_remote_when_remote_echo_is_processed_first() {
3652 let room_id = room_id!("!r0");
3653 let user_id = user_id!("@mnt_io:matrix.org");
3654 let event_factory = EventFactory::new().sender(user_id).room(room_id);
3655 let sent_event_id = event_id!("$ev0");
3656
3657 let server = MatrixMockServer::new().await;
3658 let client = server.client_builder().build().await;
3659
3660 {
3662 client.base_client().get_or_create_room(room_id, RoomState::Joined);
3664
3665 client
3667 .event_cache_store()
3668 .lock()
3669 .await
3670 .expect("Could not acquire the event cache lock")
3671 .as_clean()
3672 .expect("Could not acquire a clean event cache lock")
3673 .handle_linked_chunk_updates(
3674 LinkedChunkId::Room(room_id),
3675 vec![
3676 Update::NewItemsChunk {
3677 previous: None,
3678 new: ChunkIdentifier::new(0),
3679 next: None,
3680 },
3681 Update::PushItems {
3682 at: Position::new(ChunkIdentifier::new(0), 0),
3683 items: vec![
3684 event_factory.text_msg("hello").event_id(sent_event_id).into(),
3686 ],
3687 },
3688 ],
3689 )
3690 .await
3691 .unwrap();
3692 }
3693
3694 let event_cache = client.event_cache();
3695 event_cache.subscribe().unwrap();
3696
3697 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
3698 let send_queue = client.send_queue();
3699 let room = client.get_room(room_id).unwrap();
3700 let room_send_queue = send_queue.for_room(room);
3701
3702 let mut buffer = BufferOfValuesForLocalEvents::new();
3703 let transaction_id = OwnedTransactionId::from("txnid0");
3704
3705 let previous_value = {
3707 let content = new_local_echo_content(&room_send_queue, &transaction_id, "hello");
3708 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3709 transaction_id: transaction_id.clone(),
3710 content,
3711 });
3712
3713 assert_local_value_matches_room_message_with_body!(
3714 Builder::new_local(
3715 &update,
3716 &mut buffer,
3717 &room_event_cache,
3718 LatestEventValue::None,
3719 user_id,
3720 None,
3721 )
3722 .await,
3723 LatestEventValue::LocalIsSending => with body = "hello"
3724 )
3725 };
3726 assert_eq!(buffer.buffer.len(), 1);
3727
3728 assert_remote_value_matches_room_message_with_body!(
3732 Builder::new_local(
3733 &RoomSendQueueUpdate::SentEvent {
3734 transaction_id,
3735 event_id: sent_event_id.to_owned(),
3736 },
3737 &mut buffer,
3738 &room_event_cache,
3739 previous_value,
3740 user_id,
3741 None,
3742 )
3743 .await
3744 => with body = "hello"
3745 );
3746 assert!(buffer.buffer.is_empty());
3747 }
3748
3749 #[async_test]
3750 async fn test_local_redaction() {
3751 let room_id = room_id!("!r0");
3752 let user_id = user_id!("@mnt_io:matrix.org");
3753 let event_factory = EventFactory::new().sender(user_id).room(room_id);
3754 let event_id = event_id!("$ev0");
3755
3756 let server = MatrixMockServer::new().await;
3757 let client = server.client_builder().build().await;
3758
3759 {
3761 client.base_client().get_or_create_room(room_id, RoomState::Joined);
3763
3764 client
3766 .event_cache_store()
3767 .lock()
3768 .await
3769 .expect("Could not acquire the event cache lock")
3770 .as_clean()
3771 .expect("Could not acquire a clean event cache lock")
3772 .handle_linked_chunk_updates(
3773 LinkedChunkId::Room(room_id),
3774 vec![
3775 Update::NewItemsChunk {
3776 previous: None,
3777 new: ChunkIdentifier::new(0),
3778 next: None,
3779 },
3780 Update::PushItems {
3781 at: Position::new(ChunkIdentifier::new(0), 0),
3782 items: vec![event_factory.text_msg("hello").event_id(event_id).into()],
3783 },
3784 ],
3785 )
3786 .await
3787 .unwrap();
3788 }
3789
3790 let event_cache = client.event_cache();
3791 event_cache.subscribe().unwrap();
3792
3793 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
3794 let send_queue = client.send_queue();
3795 let room = client.get_room(room_id).unwrap();
3796 let room_send_queue = send_queue.for_room(room);
3797
3798 let mut buffer = BufferOfValuesForLocalEvents::new();
3799
3800 assert_remote_value_matches_room_message_with_body!(
3802 Builder::new_local(
3803 &RoomSendQueueUpdate::SentEvent {
3806 transaction_id: OwnedTransactionId::from("txnid"),
3807 event_id: event_id.to_owned(),
3808 },
3809 &mut buffer,
3810 &room_event_cache,
3811 LatestEventValue::None,
3812 user_id,
3813 None,
3814 )
3815 .await
3816 => with body = "hello"
3817 );
3818
3819 {
3821 let previous_value =
3822 LatestEventValue::Remote(remote_room_message(event_id!("$foo"), "Hello world"));
3823
3824 let transaction_id = OwnedTransactionId::from("txnid0");
3825 let content = LocalEchoContent::Redaction {
3826 redacts: event_id.to_owned(),
3827 reason: Some("whatever".to_owned()),
3828 send_handle: SendRedactionHandle::new(
3829 room_send_queue.clone(),
3830 transaction_id.clone(),
3831 ),
3832 send_error: None,
3833 };
3834 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
3835 transaction_id: transaction_id.clone(),
3836 content,
3837 });
3838
3839 assert_matches!(
3841 Builder::new_local(
3842 &update,
3843 &mut buffer,
3844 &room_event_cache,
3845 previous_value,
3846 user_id,
3847 None
3848 )
3849 .await,
3850 None
3851 );
3852 };
3853 assert_eq!(buffer.buffer.len(), 0);
3854 }
3855}