1use std::{
16 iter::once,
17 ops::{Deref, Not},
18};
19
20use eyeball::{AsyncLock, SharedObservable, Subscriber};
21pub use matrix_sdk_base::latest_event::{
22 LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
23};
24use matrix_sdk_base::{
25 RoomInfoNotableUpdateReasons, StateChanges, check_validity_of_replacement_events,
26 deserialized_responses::TimelineEvent, store::SerializableEventContent,
27};
28use ruma::{
29 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
30 events::{
31 AnyMessageLikeEventContent, AnySyncStateEvent, AnySyncTimelineEvent, SyncStateEvent,
32 relation::Replacement,
33 room::{
34 member::MembershipState,
35 message::{MessageType, Relation},
36 power_levels::RoomPowerLevels,
37 },
38 },
39};
40use tracing::{debug, error, instrument, warn};
41
42use crate::{event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate};
43
44#[derive(Debug)]
48pub(super) struct LatestEvent {
49 weak_room: WeakRoom,
51
52 _thread_id: Option<OwnedEventId>,
54
55 buffer_of_values_for_local_events: LatestEventValuesForLocalEvents,
59
60 current_value: SharedObservable<LatestEventValue, AsyncLock>,
62}
63
64impl LatestEvent {
65 pub(super) async fn new(
66 weak_room: &WeakRoom,
67 thread_id: Option<&EventId>,
68 room_event_cache: &RoomEventCache,
69 ) -> Self {
70 Self {
71 weak_room: weak_room.clone(),
72 _thread_id: thread_id.map(ToOwned::to_owned),
73 buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(),
74 current_value: SharedObservable::new_async(
75 LatestEventValueBuilder::new_remote(room_event_cache, weak_room).await,
76 ),
77 }
78 }
79
80 pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
82 self.current_value.subscribe().await
83 }
84
85 pub async fn update_with_event_cache(
95 &mut self,
96 room_event_cache: &RoomEventCache,
97 own_user_id: Option<&UserId>,
98 power_levels: Option<&RoomPowerLevels>,
99 ) {
100 if self.buffer_of_values_for_local_events.is_empty().not() {
101 return;
105 }
106
107 let new_value = LatestEventValueBuilder::new_remote_with_power_levels(
108 room_event_cache,
109 own_user_id,
110 power_levels,
111 )
112 .await;
113
114 self.update(new_value).await;
115 }
116
117 pub async fn update_with_send_queue(
120 &mut self,
121 send_queue_update: &RoomSendQueueUpdate,
122 room_event_cache: &RoomEventCache,
123 own_user_id: Option<&UserId>,
124 power_levels: Option<&RoomPowerLevels>,
125 ) {
126 let new_value = LatestEventValueBuilder::new_local(
127 send_queue_update,
128 &mut self.buffer_of_values_for_local_events,
129 room_event_cache,
130 own_user_id,
131 power_levels,
132 )
133 .await;
134
135 self.update(new_value).await;
136 }
137
138 async fn update(&mut self, new_value: LatestEventValue) {
141 if let LatestEventValue::None = new_value {
142 } else {
144 self.current_value.set(new_value.clone()).await;
145 self.store(new_value).await;
146 }
147 }
148
149 #[instrument(skip_all)]
154 async fn store(&mut self, new_value: LatestEventValue) {
155 let Some(room) = self.weak_room.get() else {
156 warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
157 return;
158 };
159
160 let mut room_info = room.clone_info();
162 room_info.set_new_latest_event(new_value);
163
164 let mut state_changes = StateChanges::default();
165 state_changes.add_room(room_info.clone());
166
167 let client = room.client();
168
169 let _state_store_lock = client.base_client().state_store_lock().lock().await;
171
172 if let Err(error) = client.state_store().save_changes(&state_changes).await {
174 error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
175 }
176
177 room.set_room_info(room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT);
179 }
180}
181
182#[cfg(all(not(target_family = "wasm"), test))]
183mod tests_latest_event {
184 use assert_matches::assert_matches;
185 use matrix_sdk_base::{
186 RoomInfoNotableUpdateReasons, RoomState,
187 linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
188 store::StoreConfig,
189 };
190 use matrix_sdk_test::{async_test, event_factory::EventFactory};
191 use ruma::{
192 MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
193 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
194 room_id, user_id,
195 };
196
197 use super::{LatestEvent, LatestEventValue, LocalLatestEventValue, SerializableEventContent};
198 use crate::{
199 client::WeakClient,
200 room::WeakRoom,
201 send_queue::{LocalEcho, LocalEchoContent, RoomSendQueue, RoomSendQueueUpdate, SendHandle},
202 test_utils::mocks::MatrixMockServer,
203 };
204
205 fn local_room_message(body: &str) -> LocalLatestEventValue {
206 LocalLatestEventValue {
207 timestamp: MilliSecondsSinceUnixEpoch::now(),
208 content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
209 RoomMessageEventContent::text_plain(body),
210 ))
211 .unwrap(),
212 }
213 }
214
215 fn new_local_echo_content(
216 room_send_queue: &RoomSendQueue,
217 transaction_id: &OwnedTransactionId,
218 body: &str,
219 ) -> LocalEchoContent {
220 LocalEchoContent::Event {
221 serialized_event: SerializableEventContent::new(
222 &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
223 )
224 .unwrap(),
225 send_handle: SendHandle::new(
226 room_send_queue.clone(),
227 transaction_id.clone(),
228 MilliSecondsSinceUnixEpoch::now(),
229 ),
230 send_error: None,
231 }
232 }
233
234 #[async_test]
235 async fn test_update_ignores_none_value() {
236 let room_id = room_id!("!r0");
237
238 let server = MatrixMockServer::new().await;
239 let client = server.client_builder().build().await;
240 let weak_client = WeakClient::from_client(&client);
241
242 client.base_client().get_or_create_room(room_id, RoomState::Joined);
244 let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
245
246 let event_cache = client.event_cache();
248 event_cache.subscribe().unwrap();
249
250 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
251
252 let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
253
254 assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
256
257 latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
259
260 assert_matches!(
261 latest_event.current_value.get().await,
262 LatestEventValue::LocalIsSending(_)
263 );
264
265 latest_event.update(LatestEventValue::None).await;
267
268 assert_matches!(
269 latest_event.current_value.get().await,
270 LatestEventValue::LocalIsSending(_)
271 );
272 }
273
274 #[async_test]
275 async fn test_local_has_priority_over_remote() {
276 let room_id = room_id!("!r0").to_owned();
277 let user_id = user_id!("@mnt_io:matrix.org");
278 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
279
280 let server = MatrixMockServer::new().await;
281 let client = server.client_builder().build().await;
282 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
283 let room = client.get_room(&room_id).unwrap();
284 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
285
286 let event_cache = client.event_cache();
287 event_cache.subscribe().unwrap();
288
289 client
291 .event_cache_store()
292 .lock()
293 .await
294 .expect("Could not acquire the event cache lock")
295 .as_clean()
296 .expect("Could not acquire a clean event cache lock")
297 .handle_linked_chunk_updates(
298 LinkedChunkId::Room(&room_id),
299 vec![
300 Update::NewItemsChunk {
301 previous: None,
302 new: ChunkIdentifier::new(0),
303 next: None,
304 },
305 Update::PushItems {
306 at: Position::new(ChunkIdentifier::new(0), 0),
307 items: vec![event_factory.text_msg("A").event_id(event_id!("$ev0")).into()],
308 },
309 ],
310 )
311 .await
312 .unwrap();
313
314 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
315
316 let send_queue = client.send_queue();
317 let room_send_queue = send_queue.for_room(room);
318
319 let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
320
321 {
323 latest_event.update_with_event_cache(&room_event_cache, None, None).await;
324
325 assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
326 }
327
328 let transaction_id = OwnedTransactionId::from("txnid0");
331
332 {
333 let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
334
335 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
336 transaction_id: transaction_id.clone(),
337 content,
338 });
339
340 latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await;
341
342 assert_matches!(
343 latest_event.current_value.get().await,
344 LatestEventValue::LocalIsSending(_)
345 );
346 }
347
348 {
352 latest_event.update_with_event_cache(&room_event_cache, None, None).await;
353
354 assert_matches!(
355 latest_event.current_value.get().await,
356 LatestEventValue::LocalIsSending(_)
357 );
358 }
359
360 {
363 let update = RoomSendQueueUpdate::SentEvent {
364 transaction_id,
365 event_id: event_id!("$ev1").to_owned(),
366 };
367
368 latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await;
369
370 assert_matches!(
371 latest_event.current_value.get().await,
372 LatestEventValue::LocalIsSending(_)
373 );
374 }
375
376 {
379 latest_event.update_with_event_cache(&room_event_cache, None, None).await;
380
381 assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
382 }
383 }
384
385 #[async_test]
386 async fn test_store_latest_event_value() {
387 let room_id = room_id!("!r0").to_owned();
388 let user_id = user_id!("@mnt_io:matrix.org");
389 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
390
391 let server = MatrixMockServer::new().await;
392
393 let store_config = StoreConfig::new("cross-process-lock-holder".to_owned());
394
395 {
397 let client = server
398 .client_builder()
399 .on_builder(|builder| builder.store_config(store_config.clone()))
400 .build()
401 .await;
402 let mut room_info_notable_update_receiver = client.room_info_notable_update_receiver();
403 let room = client.base_client().get_or_create_room(&room_id, RoomState::Joined);
404 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
405
406 let event_cache = client.event_cache();
407 event_cache.subscribe().unwrap();
408
409 client
411 .event_cache_store()
412 .lock()
413 .await
414 .expect("Could not acquire the event cache lock")
415 .as_clean()
416 .expect("Could not acquire a clean event cache lock")
417 .handle_linked_chunk_updates(
418 LinkedChunkId::Room(&room_id),
419 vec![
420 Update::NewItemsChunk {
421 previous: None,
422 new: ChunkIdentifier::new(0),
423 next: None,
424 },
425 Update::PushItems {
426 at: Position::new(ChunkIdentifier::new(0), 0),
427 items: vec![
428 event_factory.text_msg("A").event_id(event_id!("$ev0")).into(),
429 ],
430 },
431 ],
432 )
433 .await
434 .unwrap();
435
436 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
437
438 {
440 let latest_event = room.new_latest_event();
441
442 assert_matches!(latest_event, LatestEventValue::None);
443 }
444
445 {
447 let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
448 latest_event.update_with_event_cache(&room_event_cache, None, None).await;
449
450 assert_matches!(
451 latest_event.current_value.get().await,
452 LatestEventValue::Remote(_)
453 );
454 }
455
456 {
458 let update = room_info_notable_update_receiver.recv().await.unwrap();
459
460 assert_eq!(update.room_id, room_id);
461 assert!(update.reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
462 }
463
464 {
466 let latest_event = room.new_latest_event();
467
468 assert_matches!(latest_event, LatestEventValue::Remote(_));
469 }
470 }
471
472 {
475 let client = server
476 .client_builder()
477 .on_builder(|builder| builder.store_config(store_config))
478 .build()
479 .await;
480 let room = client.get_room(&room_id).unwrap();
481 let latest_event = room.new_latest_event();
482
483 assert_matches!(latest_event, LatestEventValue::Remote(_));
484 }
485 }
486}
487
488struct LatestEventValueBuilder;
490
491impl LatestEventValueBuilder {
492 async fn new_remote(
494 room_event_cache: &RoomEventCache,
495 weak_room: &WeakRoom,
496 ) -> LatestEventValue {
497 let room = weak_room.get();
500 let (own_user_id, power_levels) = match &room {
501 Some(room) => {
502 let power_levels = room.power_levels().await.ok();
503
504 (Some(room.own_user_id()), power_levels)
505 }
506
507 None => (None, None),
508 };
509
510 Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels.as_ref())
511 .await
512 }
513
514 async fn new_remote_with_power_levels(
517 room_event_cache: &RoomEventCache,
518 own_user_id: Option<&UserId>,
519 power_levels: Option<&RoomPowerLevels>,
520 ) -> LatestEventValue {
521 if let Ok(Some(event)) = room_event_cache
522 .rfind_map_event_in_memory_by(|event, previous_event| {
523 filter_timeline_event(event, previous_event, own_user_id, power_levels)
524 .then(|| event.clone())
525 })
526 .await
527 {
528 LatestEventValue::Remote(event)
529 } else {
530 LatestEventValue::default()
531 }
532 }
533
534 async fn new_local(
537 send_queue_update: &RoomSendQueueUpdate,
538 buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
539 room_event_cache: &RoomEventCache,
540 own_user_id: Option<&UserId>,
541 power_levels: Option<&RoomPowerLevels>,
542 ) -> LatestEventValue {
543 use crate::send_queue::{LocalEcho, LocalEchoContent};
544
545 match send_queue_update {
546 RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
550 transaction_id,
551 content: local_echo_content,
552 }) => match local_echo_content {
553 LocalEchoContent::Event { serialized_event: serialized_event_content, .. } => {
554 match serialized_event_content.deserialize() {
555 Ok(content) => {
556 if filter_any_message_like_event_content(content, None, None) {
557 let local_value = LocalLatestEventValue {
558 timestamp: MilliSecondsSinceUnixEpoch::now(),
559 content: serialized_event_content.clone(),
560 };
561
562 let value = if let Some(LatestEventValue::LocalCannotBeSent(_)) =
566 buffer_of_values_for_local_events.last()
567 {
568 LatestEventValue::LocalCannotBeSent(local_value)
569 } else {
570 LatestEventValue::LocalIsSending(local_value)
571 };
572
573 buffer_of_values_for_local_events
574 .push(transaction_id.to_owned(), value.clone());
575
576 value
577 } else {
578 LatestEventValue::None
579 }
580 }
581
582 Err(error) => {
583 error!(
584 ?error,
585 "Failed to deserialize an event from `RoomSendQueueUpdate::NewLocalEvent`"
586 );
587
588 LatestEventValue::None
589 }
590 }
591 }
592
593 LocalEchoContent::React { .. } => LatestEventValue::None,
594 },
595
596 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
601 if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
602 buffer_of_values_for_local_events.remove(position);
603 }
604
605 Self::new_local_or_remote(
606 buffer_of_values_for_local_events,
607 room_event_cache,
608 own_user_id,
609 power_levels,
610 )
611 .await
612 }
613
614 RoomSendQueueUpdate::SentEvent { transaction_id, .. } => {
623 let position =
624 buffer_of_values_for_local_events.mark_is_sending_after(transaction_id);
625
626 let value = Self::new_local_or_remote(
639 buffer_of_values_for_local_events,
640 room_event_cache,
641 own_user_id,
642 power_levels,
643 )
644 .await;
645
646 if let Some(position) = position {
647 buffer_of_values_for_local_events.remove(position);
648 }
649
650 value
651 }
652
653 RoomSendQueueUpdate::ReplacedLocalEvent {
658 transaction_id,
659 new_content: new_serialized_event_content,
660 } => {
661 if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
662 match new_serialized_event_content.deserialize() {
663 Ok(content) => {
664 if filter_any_message_like_event_content(content, None, None) {
665 buffer_of_values_for_local_events.replace_content(
666 position,
667 new_serialized_event_content.clone(),
668 );
669 } else {
670 buffer_of_values_for_local_events.remove(position);
671 }
672 }
673
674 Err(error) => {
675 error!(
676 ?error,
677 "Failed to deserialize an event from `RoomSendQueueUpdate::ReplacedLocalEvent`"
678 );
679
680 return LatestEventValue::None;
681 }
682 }
683 }
684
685 Self::new_local_or_remote(
686 buffer_of_values_for_local_events,
687 room_event_cache,
688 own_user_id,
689 power_levels,
690 )
691 .await
692 }
693
694 RoomSendQueueUpdate::SendError { transaction_id, .. } => {
699 buffer_of_values_for_local_events.mark_cannot_be_sent_from(transaction_id);
700
701 Self::new_local_or_remote(
702 buffer_of_values_for_local_events,
703 room_event_cache,
704 own_user_id,
705 power_levels,
706 )
707 .await
708 }
709
710 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
715 buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
716
717 Self::new_local_or_remote(
718 buffer_of_values_for_local_events,
719 room_event_cache,
720 own_user_id,
721 power_levels,
722 )
723 .await
724 }
725
726 RoomSendQueueUpdate::MediaUpload { .. } => LatestEventValue::None,
730 }
731 }
732
733 async fn new_local_or_remote(
740 buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
741 room_event_cache: &RoomEventCache,
742 own_user_id: Option<&UserId>,
743 power_levels: Option<&RoomPowerLevels>,
744 ) -> LatestEventValue {
745 if let Some(value) = buffer_of_values_for_local_events.last() {
746 value.clone()
747 } else {
748 Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels).await
749 }
750 }
751}
752
753#[derive(Debug)]
793struct LatestEventValuesForLocalEvents {
794 buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
795}
796
797impl LatestEventValuesForLocalEvents {
798 fn new() -> Self {
800 Self { buffer: Vec::with_capacity(2) }
801 }
802
803 fn is_empty(&self) -> bool {
805 self.buffer.is_empty()
806 }
807
808 fn last(&self) -> Option<&LatestEventValue> {
810 self.buffer.last().map(|(_, value)| value)
811 }
812
813 fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
815 self.buffer
816 .iter()
817 .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
818 }
819
820 fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
827 assert!(
828 matches!(
829 value,
830 LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalCannotBeSent(_)
831 ),
832 "`value` must be either `LocalIsSending` or `LocalCannotBeSent`"
833 );
834
835 self.buffer.push((transaction_id, value));
836 }
837
838 fn replace_content(&mut self, position: usize, new_content: SerializableEventContent) {
848 let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");
849
850 match value {
851 LatestEventValue::LocalIsSending(LocalLatestEventValue { content, .. }) => {
852 *content = new_content;
853 }
854
855 LatestEventValue::LocalCannotBeSent(LocalLatestEventValue { content, .. }) => {
856 *content = new_content;
857 }
858
859 _ => panic!("`value` must be either `LocalIsSending` or `LocalCannotBeSent`"),
860 }
861 }
862
863 fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
869 self.buffer.remove(position)
870 }
871
872 fn mark_cannot_be_sent_from(&mut self, transaction_id: &TransactionId) {
875 let mut values = self.buffer.iter_mut();
876
877 if let Some(first_value_to_wedge) = values
878 .by_ref()
879 .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
880 {
881 for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) {
883 if let LatestEventValue::LocalIsSending(content) = value_to_wedge {
884 *value_to_wedge = LatestEventValue::LocalCannotBeSent(content.clone());
885 }
886 }
887 }
888 }
889
890 fn mark_is_sending_from(&mut self, transaction_id: &TransactionId) {
893 let mut values = self.buffer.iter_mut();
894
895 if let Some(first_value_to_unwedge) = values
896 .by_ref()
897 .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
898 {
899 for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) {
901 if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
902 *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
903 }
904 }
905 }
906 }
907
908 fn mark_is_sending_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
915 let mut values = self.buffer.iter_mut();
916
917 if let Some(position) = values
918 .by_ref()
919 .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
920 {
921 for (_, value_to_unwedge) in values {
923 if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
924 *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
925 }
926 }
927
928 Some(position)
929 } else {
930 None
931 }
932 }
933}
934
935fn filter_timeline_event(
936 timeline_event: &TimelineEvent,
937 previous_event: Option<&TimelineEvent>,
938 own_user_id: Option<&UserId>,
939 power_levels: Option<&RoomPowerLevels>,
940) -> bool {
941 let event = match timeline_event.raw().deserialize() {
944 Ok(event) => event,
945 Err(error) => {
946 error!(
947 ?error,
948 "Failed to deserialize the event when looking for a suitable latest event"
949 );
950
951 return false;
952 }
953 };
954
955 match event {
956 AnySyncTimelineEvent::MessageLike(message_like_event) => {
957 match message_like_event.original_content() {
958 Some(any_message_like_event_content) => filter_any_message_like_event_content(
959 any_message_like_event_content,
960 Some(timeline_event),
961 previous_event,
962 ),
963
964 None => true,
966 }
967 }
968
969 AnySyncTimelineEvent::State(state) => {
970 filter_any_sync_state_event(state, own_user_id, power_levels)
971 }
972 }
973}
974
975fn filter_any_message_like_event_content(
976 content: AnyMessageLikeEventContent,
977 new_event: Option<&TimelineEvent>,
978 previous_event: Option<&TimelineEvent>,
979) -> bool {
980 match content {
981 AnyMessageLikeEventContent::RoomMessage(message) => {
982 if let MessageType::VerificationRequest(_) = message.msgtype {
984 return false;
985 }
986
987 match &message.relates_to {
989 Some(Relation::Replacement(Replacement { .. })) => {
990 if let Some(event) = previous_event
993 && let Some(edit) = new_event
994 {
995 let original = event.kind.raw();
996 let original_encryption_info = event.kind.encryption_info();
997
998 let replacement = edit.kind.raw();
999 let replacement_encryption_info = event.kind.encryption_info();
1000
1001 match check_validity_of_replacement_events(
1002 original,
1003 original_encryption_info.map(|e| &(**e)),
1004 replacement,
1005 replacement_encryption_info.map(|e| &(**e)),
1006 ) {
1007 Ok(_) => true,
1008 Err(e) => {
1009 debug!(
1010 "Skipping an edit of a latest event due to the replacement event being invalid: {e}"
1011 );
1012 false
1013 }
1014 }
1015 } else {
1016 false
1017 }
1018 }
1019
1020 _ => true,
1021 }
1022 }
1023
1024 AnyMessageLikeEventContent::UnstablePollStart(_)
1025 | AnyMessageLikeEventContent::CallInvite(_)
1026 | AnyMessageLikeEventContent::RtcNotification(_)
1027 | AnyMessageLikeEventContent::Sticker(_) => true,
1028
1029 AnyMessageLikeEventContent::RoomEncrypted(_) => false,
1031
1032 _ => false,
1034 }
1035}
1036
1037fn filter_any_sync_state_event(
1038 event: AnySyncStateEvent,
1039 own_user_id: Option<&UserId>,
1040 power_levels: Option<&RoomPowerLevels>,
1041) -> bool {
1042 match event {
1043 AnySyncStateEvent::RoomMember(member) => {
1044 match member.membership() {
1045 MembershipState::Knock => {
1046 let can_accept_or_decline_knocks = match (own_user_id, power_levels) {
1047 (Some(own_user_id), Some(room_power_levels)) => {
1048 room_power_levels.user_can_invite(own_user_id)
1049 || room_power_levels
1050 .user_can_kick_user(own_user_id, member.state_key())
1051 }
1052 _ => false,
1053 };
1054
1055 if can_accept_or_decline_knocks {
1058 return matches!(member, SyncStateEvent::Original(_));
1061 }
1062
1063 false
1064 }
1065
1066 MembershipState::Invite => {
1067 match member {
1069 SyncStateEvent::Original(state) => {
1072 Some(state.state_key.deref()) == own_user_id
1073 }
1074
1075 _ => false,
1076 }
1077 }
1078
1079 _ => false,
1080 }
1081 }
1082
1083 _ => false,
1084 }
1085}
1086
1087#[cfg(test)]
1088mod tests_latest_event_content {
1089 use std::ops::Not;
1090
1091 use matrix_sdk_test::event_factory::EventFactory;
1092 use ruma::{
1093 event_id,
1094 events::{room::message::RoomMessageEventContent, rtc::notification::NotificationType},
1095 owned_user_id, user_id,
1096 };
1097
1098 use super::filter_timeline_event;
1099 use crate::latest_events::latest_event::tests_latest_event_values_for_local_events::remote_room_message_with_event_id;
1100
1101 macro_rules! assert_latest_event_content {
1102 ( event | $event_factory:ident | $event_builder:block
1103 is a candidate ) => {
1104 assert_latest_event_content!(@_ | $event_factory | $event_builder, true);
1105 };
1106
1107 ( event | $event_factory:ident | $event_builder:block
1108 is not a candidate ) => {
1109 assert_latest_event_content!(@_ | $event_factory | $event_builder, false);
1110 };
1111
1112 ( @_ | $event_factory:ident | $event_builder:block, $expect:literal ) => {
1113 let user_id = user_id!("@mnt_io:matrix.org");
1114 let event_factory = EventFactory::new().sender(user_id);
1115 let event = {
1116 let $event_factory = event_factory;
1117 $event_builder
1118 };
1119
1120 assert_eq!(filter_timeline_event(&event, None, Some(user_id!("@mnt_io:matrix.org")), None), $expect );
1121 };
1122 }
1123
1124 #[test]
1125 fn test_room_message() {
1126 assert_latest_event_content!(
1127 event | event_factory | { event_factory.text_msg("hello").into_event() }
1128 is a candidate
1129 );
1130 }
1131
1132 #[test]
1133 fn test_redacted() {
1134 assert_latest_event_content!(
1135 event | event_factory | {
1136 event_factory
1137 .redacted(
1138 user_id!("@mnt_io:matrix.org"),
1139 ruma::events::room::message::RedactedRoomMessageEventContent::new(),
1140 )
1141 .into_event()
1142 }
1143 is a candidate
1144 );
1145 }
1146
1147 #[test]
1148 fn test_room_message_replacement() {
1149 let user_id = user_id!("@mnt_io:matrix.org");
1150 let event_factory = EventFactory::new().sender(user_id);
1151 let event = event_factory
1152 .text_msg("bonjour")
1153 .edit(event_id!("$ev0"), RoomMessageEventContent::text_plain("hello").into())
1154 .into_event();
1155
1156 {
1163 let previous_event_id = None;
1164
1165 assert!(
1166 filter_timeline_event(
1167 &event,
1168 previous_event_id,
1169 Some(user_id!("@mnt_io:matrix.org")),
1170 None
1171 )
1172 .not()
1173 );
1174 }
1175
1176 {
1178 let previous_value =
1179 remote_room_message_with_event_id(event_id!("$ev1"), "Hello world");
1180
1181 assert!(
1182 filter_timeline_event(
1183 &event,
1184 Some(&previous_value),
1185 Some(user_id!("@mnt_io:matrix.org")),
1186 None
1187 )
1188 .not()
1189 );
1190 }
1191
1192 {
1194 let previous_value =
1195 remote_room_message_with_event_id(event_id!("$ev0"), "Hello world");
1196
1197 assert!(filter_timeline_event(
1198 &event,
1199 Some(&previous_value),
1200 Some(user_id!("@mnt_io:matrix.org")),
1201 None
1202 ));
1203 }
1204 }
1205
1206 #[test]
1207 fn test_poll() {
1208 assert_latest_event_content!(
1209 event | event_factory | {
1210 event_factory
1211 .poll_start("the people need to know", "comté > gruyère", vec!["yes", "oui"])
1212 .into_event()
1213 }
1214 is a candidate
1215 );
1216 }
1217
1218 #[test]
1219 fn test_call_invite() {
1220 assert_latest_event_content!(
1221 event | event_factory | {
1222 event_factory
1223 .call_invite(
1224 ruma::OwnedVoipId::from("vvooiipp".to_owned()),
1225 ruma::UInt::from(1234u32),
1226 ruma::events::call::SessionDescription::new(
1227 "type".to_owned(),
1228 "sdp".to_owned(),
1229 ),
1230 ruma::VoipVersionId::V1,
1231 )
1232 .into_event()
1233 }
1234 is a candidate
1235 );
1236 }
1237
1238 #[test]
1239 fn test_rtc_notification() {
1240 assert_latest_event_content!(
1241 event | event_factory | {
1242 event_factory
1243 .rtc_notification(
1244 NotificationType::Ring,
1245 )
1246 .mentions(vec![owned_user_id!("@alice:server.name")])
1247 .relates_to_membership_state_event(ruma::OwnedEventId::try_from("$abc:server.name").unwrap())
1248 .lifetime(60)
1249 .into_event()
1250 }
1251 is a candidate
1252 );
1253 }
1254
1255 #[test]
1256 fn test_sticker() {
1257 assert_latest_event_content!(
1258 event | event_factory | {
1259 event_factory
1260 .sticker(
1261 "wink wink",
1262 ruma::events::room::ImageInfo::new(),
1263 ruma::OwnedMxcUri::from("mxc://foo/bar"),
1264 )
1265 .into_event()
1266 }
1267 is a candidate
1268 );
1269 }
1270
1271 #[test]
1272 fn test_encrypted_room_message() {
1273 assert_latest_event_content!(
1274 event | event_factory | {
1275 event_factory
1276 .event(ruma::events::room::encrypted::RoomEncryptedEventContent::new(
1277 ruma::events::room::encrypted::EncryptedEventScheme::MegolmV1AesSha2(
1278 ruma::events::room::encrypted::MegolmV1AesSha2ContentInit {
1279 ciphertext: "cipher".to_owned(),
1280 sender_key: "sender_key".to_owned(),
1281 device_id: "device_id".into(),
1282 session_id: "session_id".to_owned(),
1283 }
1284 .into(),
1285 ),
1286 None,
1287 ))
1288 .into_event()
1289 }
1290 is not a candidate
1291 );
1292 }
1293
1294 #[test]
1295 fn test_reaction() {
1296 assert_latest_event_content!(
1298 event | event_factory | { event_factory.reaction(event_id!("$ev0"), "+1").into_event() }
1299 is not a candidate
1300 );
1301 }
1302
1303 #[test]
1304 fn test_state_event() {
1305 assert_latest_event_content!(
1306 event | event_factory | { event_factory.room_topic("new room topic").into_event() }
1307 is not a candidate
1308 );
1309 }
1310
1311 #[test]
1312 fn test_knocked_state_event_without_power_levels() {
1313 assert_latest_event_content!(
1314 event | event_factory | {
1315 event_factory
1316 .member(user_id!("@other_mnt_io:server.name"))
1317 .membership(ruma::events::room::member::MembershipState::Knock)
1318 .into_event()
1319 }
1320 is not a candidate
1321 );
1322 }
1323
1324 #[test]
1325 fn test_knocked_state_event_with_power_levels() {
1326 use ruma::{
1327 events::room::{
1328 member::MembershipState,
1329 power_levels::{RoomPowerLevels, RoomPowerLevelsSource},
1330 },
1331 room_version_rules::AuthorizationRules,
1332 };
1333
1334 let user_id = user_id!("@mnt_io:matrix.org");
1335 let other_user_id = user_id!("@other_mnt_io:server.name");
1336 let event_factory = EventFactory::new().sender(user_id);
1337 let event =
1338 event_factory.member(other_user_id).membership(MembershipState::Knock).into_event();
1339
1340 let mut room_power_levels =
1341 RoomPowerLevels::new(RoomPowerLevelsSource::None, &AuthorizationRules::V1, []);
1342 room_power_levels.users.insert(user_id.to_owned(), 5.into());
1343 room_power_levels.users.insert(other_user_id.to_owned(), 4.into());
1344
1345 {
1347 room_power_levels.invite = 10.into();
1348 room_power_levels.kick = 10.into();
1349 assert!(
1350 filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)).not(),
1351 "cannot accept, cannot decline",
1352 );
1353 }
1354
1355 {
1357 room_power_levels.invite = 0.into();
1358 room_power_levels.kick = 10.into();
1359 assert!(
1360 filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
1361 "can accept, cannot decline",
1362 );
1363 }
1364
1365 {
1367 room_power_levels.invite = 10.into();
1368 room_power_levels.kick = 0.into();
1369 assert!(
1370 filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
1371 "cannot accept, can decline",
1372 );
1373 }
1374
1375 {
1377 room_power_levels.invite = 0.into();
1378 room_power_levels.kick = 0.into();
1379 assert!(
1380 filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
1381 "can accept, can decline",
1382 );
1383 }
1384
1385 {
1389 room_power_levels.users.insert(user_id.to_owned(), 5.into());
1390 room_power_levels.users.insert(other_user_id.to_owned(), 5.into());
1391
1392 room_power_levels.invite = 10.into();
1393 room_power_levels.kick = 0.into();
1394
1395 assert!(
1396 filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)).not(),
1397 "cannot accept, can decline, at least same user levels",
1398 );
1399 }
1400 }
1401
1402 #[test]
1403 fn test_invite_state_event() {
1404 use ruma::events::room::member::MembershipState;
1405
1406 assert_latest_event_content!(
1408 event | event_factory | {
1409 event_factory
1410 .member(user_id!("@mnt_io:matrix.org"))
1411 .membership(MembershipState::Invite)
1412 .into_event()
1413 }
1414 is a candidate
1415 );
1416 }
1417
1418 #[test]
1419 fn test_invite_state_event_for_someone_else() {
1420 use ruma::events::room::member::MembershipState;
1421
1422 assert_latest_event_content!(
1424 event | event_factory | {
1425 event_factory
1426 .member(user_id!("@other_mnt_io:server.name"))
1427 .membership(MembershipState::Invite)
1428 .into_event()
1429 }
1430 is not a candidate
1431 );
1432 }
1433
1434 #[test]
1435 fn test_room_message_verification_request() {
1436 use ruma::{OwnedDeviceId, events::room::message};
1437
1438 assert_latest_event_content!(
1439 event | event_factory | {
1440 event_factory
1441 .event(RoomMessageEventContent::new(message::MessageType::VerificationRequest(
1442 message::KeyVerificationRequestEventContent::new(
1443 "body".to_owned(),
1444 vec![],
1445 OwnedDeviceId::from("device_id"),
1446 user_id!("@user:server.name").to_owned(),
1447 ),
1448 )))
1449 .into_event()
1450 }
1451 is not a candidate
1452 );
1453 }
1454}
1455
1456#[cfg(test)]
1457mod tests_latest_event_values_for_local_events {
1458 use assert_matches::assert_matches;
1459 use ruma::{
1460 EventId, MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
1461 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
1462 serde::Raw,
1463 };
1464 use serde_json::json;
1465
1466 use super::{
1467 LatestEventValue, LatestEventValuesForLocalEvents, LocalLatestEventValue,
1468 RemoteLatestEventValue, SerializableEventContent,
1469 };
1470
1471 pub fn remote_room_message_with_event_id(
1472 event_id: &EventId,
1473 body: &str,
1474 ) -> RemoteLatestEventValue {
1475 RemoteLatestEventValue::from_plaintext(
1476 Raw::from_json_string(
1477 json!({
1478 "content": RoomMessageEventContent::text_plain(body),
1479 "type": "m.room.message",
1480 "event_id": event_id,
1481 "origin_server_ts": 42,
1482 "sender": "@mnt_io:matrix.org",
1483 })
1484 .to_string(),
1485 )
1486 .unwrap(),
1487 )
1488 }
1489
1490 fn remote_room_message(body: &str) -> RemoteLatestEventValue {
1491 let event_id = event_id!("$ev0");
1492 remote_room_message_with_event_id(&event_id, body)
1493 }
1494
1495 fn local_room_message(body: &str) -> LocalLatestEventValue {
1496 LocalLatestEventValue {
1497 timestamp: MilliSecondsSinceUnixEpoch::now(),
1498 content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
1499 RoomMessageEventContent::text_plain(body),
1500 ))
1501 .unwrap(),
1502 }
1503 }
1504
1505 #[test]
1506 fn test_last() {
1507 let mut buffer = LatestEventValuesForLocalEvents::new();
1508
1509 assert!(buffer.last().is_none());
1510
1511 buffer.push(
1512 OwnedTransactionId::from("txnid"),
1513 LatestEventValue::LocalIsSending(local_room_message("tome")),
1514 );
1515
1516 assert_matches!(buffer.last(), Some(LatestEventValue::LocalIsSending(_)));
1517 }
1518
1519 #[test]
1520 fn test_position() {
1521 let mut buffer = LatestEventValuesForLocalEvents::new();
1522 let transaction_id = OwnedTransactionId::from("txnid");
1523
1524 assert!(buffer.position(&transaction_id).is_none());
1525
1526 buffer.push(
1527 transaction_id.clone(),
1528 LatestEventValue::LocalIsSending(local_room_message("raclette")),
1529 );
1530 buffer.push(
1531 OwnedTransactionId::from("othertxnid"),
1532 LatestEventValue::LocalIsSending(local_room_message("tome")),
1533 );
1534
1535 assert_eq!(buffer.position(&transaction_id), Some(0));
1536 }
1537
1538 #[test]
1539 #[should_panic]
1540 fn test_push_none() {
1541 let mut buffer = LatestEventValuesForLocalEvents::new();
1542
1543 buffer.push(OwnedTransactionId::from("txnid"), LatestEventValue::None);
1544 }
1545
1546 #[test]
1547 #[should_panic]
1548 fn test_push_remote() {
1549 let mut buffer = LatestEventValuesForLocalEvents::new();
1550
1551 buffer.push(
1552 OwnedTransactionId::from("txnid"),
1553 LatestEventValue::Remote(remote_room_message("tome")),
1554 );
1555 }
1556
1557 #[test]
1558 fn test_push_local() {
1559 let mut buffer = LatestEventValuesForLocalEvents::new();
1560
1561 buffer.push(
1562 OwnedTransactionId::from("txnid0"),
1563 LatestEventValue::LocalIsSending(local_room_message("tome")),
1564 );
1565 buffer.push(
1566 OwnedTransactionId::from("txnid1"),
1567 LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1568 );
1569
1570 }
1572
1573 #[test]
1574 fn test_replace_content() {
1575 let mut buffer = LatestEventValuesForLocalEvents::new();
1576
1577 buffer.push(
1578 OwnedTransactionId::from("txnid0"),
1579 LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1580 );
1581
1582 let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1583
1584 buffer.replace_content(0, new_content);
1585
1586 assert_matches!(
1587 buffer.last(),
1588 Some(LatestEventValue::LocalIsSending(local_event)) => {
1589 assert_matches!(
1590 local_event.content.deserialize().unwrap(),
1591 AnyMessageLikeEventContent::RoomMessage(content) => {
1592 assert_eq!(content.body(), "comté");
1593 }
1594 );
1595 }
1596 );
1597 }
1598
1599 #[test]
1600 fn test_remove() {
1601 let mut buffer = LatestEventValuesForLocalEvents::new();
1602
1603 buffer.push(
1604 OwnedTransactionId::from("txnid"),
1605 LatestEventValue::LocalIsSending(local_room_message("gryuère")),
1606 );
1607
1608 assert!(buffer.last().is_some());
1609
1610 buffer.remove(0);
1611
1612 assert!(buffer.last().is_none());
1613 }
1614
1615 #[test]
1616 fn test_mark_cannot_be_sent_from() {
1617 let mut buffer = LatestEventValuesForLocalEvents::new();
1618 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1619 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1620 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1621
1622 buffer.push(
1623 transaction_id_0,
1624 LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1625 );
1626 buffer.push(
1627 transaction_id_1.clone(),
1628 LatestEventValue::LocalIsSending(local_room_message("brigand")),
1629 );
1630 buffer.push(
1631 transaction_id_2,
1632 LatestEventValue::LocalIsSending(local_room_message("raclette")),
1633 );
1634
1635 buffer.mark_cannot_be_sent_from(&transaction_id_1);
1636
1637 assert_eq!(buffer.buffer.len(), 3);
1638 assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
1639 assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1640 assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalCannotBeSent(_));
1641 }
1642
1643 #[test]
1644 fn test_mark_is_sending_from() {
1645 let mut buffer = LatestEventValuesForLocalEvents::new();
1646 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1647 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1648 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1649
1650 buffer.push(
1651 transaction_id_0,
1652 LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1653 );
1654 buffer.push(
1655 transaction_id_1.clone(),
1656 LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
1657 );
1658 buffer.push(
1659 transaction_id_2,
1660 LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1661 );
1662
1663 buffer.mark_is_sending_from(&transaction_id_1);
1664
1665 assert_eq!(buffer.buffer.len(), 3);
1666 assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1667 assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
1668 assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
1669 }
1670
1671 #[test]
1672 fn test_mark_is_sending_after() {
1673 let mut buffer = LatestEventValuesForLocalEvents::new();
1674 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1675 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1676 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1677
1678 buffer.push(
1679 transaction_id_0,
1680 LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1681 );
1682 buffer.push(
1683 transaction_id_1.clone(),
1684 LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
1685 );
1686 buffer.push(
1687 transaction_id_2,
1688 LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1689 );
1690
1691 buffer.mark_is_sending_after(&transaction_id_1);
1692
1693 assert_eq!(buffer.buffer.len(), 3);
1694 assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1695 assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1696 assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
1697 }
1698}
1699
1700#[cfg(all(not(target_family = "wasm"), test))]
1701mod tests_latest_event_value_builder {
1702 use std::sync::Arc;
1703
1704 use assert_matches::assert_matches;
1705 use matrix_sdk_base::{
1706 RoomState,
1707 deserialized_responses::TimelineEventKind,
1708 linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1709 store::SerializableEventContent,
1710 };
1711 use matrix_sdk_test::{async_test, event_factory::EventFactory};
1712 use ruma::{
1713 MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedTransactionId, event_id,
1714 events::{
1715 AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
1716 SyncMessageLikeEvent, reaction::ReactionEventContent, relation::Annotation,
1717 room::message::RoomMessageEventContent,
1718 },
1719 room_id, user_id,
1720 };
1721
1722 use super::{
1723 LatestEventValue, LatestEventValueBuilder, LatestEventValuesForLocalEvents,
1724 RemoteLatestEventValue, RoomEventCache, RoomSendQueueUpdate,
1725 };
1726 use crate::{
1727 Client, Error,
1728 client::WeakClient,
1729 room::WeakRoom,
1730 send_queue::{AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle},
1731 test_utils::mocks::MatrixMockServer,
1732 };
1733
1734 macro_rules! assert_remote_value_matches_room_message_with_body {
1735 ( $latest_event_value:expr => with body = $body:expr ) => {
1736 assert_matches!(
1737 $latest_event_value,
1738 LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => {
1739 assert_matches!(
1740 event.deserialize().unwrap(),
1741 AnySyncTimelineEvent::MessageLike(
1742 AnySyncMessageLikeEvent::RoomMessage(
1743 SyncMessageLikeEvent::Original(message_content)
1744 )
1745 ) => {
1746 assert_eq!(message_content.content.body(), $body);
1747 }
1748 );
1749 }
1750 );
1751 };
1752 }
1753
1754 macro_rules! assert_local_value_matches_room_message_with_body {
1755 ( $latest_event_value:expr, $pattern:path => with body = $body:expr ) => {
1756 assert_matches!(
1757 $latest_event_value,
1758 $pattern (local_event) => {
1759 assert_matches!(
1760 local_event.content.deserialize().unwrap(),
1761 AnyMessageLikeEventContent::RoomMessage(message_content) => {
1762 assert_eq!(message_content.body(), $body);
1763 }
1764 );
1765 }
1766 );
1767 };
1768 }
1769
1770 #[async_test]
1771 async fn test_remote_is_scanning_event_backwards_from_event_cache() {
1772 let room_id = room_id!("!r0");
1773 let user_id = user_id!("@mnt_io:matrix.org");
1774 let event_factory = EventFactory::new().sender(user_id).room(room_id);
1775 let event_id_0 = event_id!("$ev0");
1776 let event_id_1 = event_id!("$ev1");
1777 let event_id_2 = event_id!("$ev2");
1778
1779 let server = MatrixMockServer::new().await;
1780 let client = server.client_builder().build().await;
1781
1782 {
1784 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1786
1787 client
1789 .event_cache_store()
1790 .lock()
1791 .await
1792 .expect("Could not acquire the event cache lock")
1793 .as_clean()
1794 .expect("Could not acquire a clean event cache lock")
1795 .handle_linked_chunk_updates(
1796 LinkedChunkId::Room(room_id),
1797 vec![
1798 Update::NewItemsChunk {
1799 previous: None,
1800 new: ChunkIdentifier::new(0),
1801 next: None,
1802 },
1803 Update::PushItems {
1804 at: Position::new(ChunkIdentifier::new(0), 0),
1805 items: vec![
1806 event_factory.text_msg("hello").event_id(event_id_0).into(),
1808 event_factory.text_msg("world").event_id(event_id_1).into(),
1810 event_factory
1812 .room_topic("new room topic")
1813 .event_id(event_id_2)
1814 .into(),
1815 ],
1816 },
1817 ],
1818 )
1819 .await
1820 .unwrap();
1821 }
1822
1823 let event_cache = client.event_cache();
1824 event_cache.subscribe().unwrap();
1825
1826 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1827 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned());
1828
1829 assert_remote_value_matches_room_message_with_body!(
1830 LatestEventValueBuilder::new_remote(&room_event_cache, &weak_room).await => with body = "world"
1834 );
1835 }
1836
1837 async fn local_prelude() -> (Client, OwnedRoomId, RoomSendQueue, RoomEventCache) {
1838 let room_id = room_id!("!r0").to_owned();
1839
1840 let server = MatrixMockServer::new().await;
1841 let client = server.client_builder().build().await;
1842 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1843 let room = client.get_room(&room_id).unwrap();
1844
1845 let event_cache = client.event_cache();
1846 event_cache.subscribe().unwrap();
1847
1848 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
1849
1850 let send_queue = client.send_queue();
1851 let room_send_queue = send_queue.for_room(room);
1852
1853 (client, room_id, room_send_queue, room_event_cache)
1854 }
1855
1856 fn new_local_echo_content(
1857 room_send_queue: &RoomSendQueue,
1858 transaction_id: &OwnedTransactionId,
1859 body: &str,
1860 ) -> LocalEchoContent {
1861 LocalEchoContent::Event {
1862 serialized_event: SerializableEventContent::new(
1863 &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
1864 )
1865 .unwrap(),
1866 send_handle: SendHandle::new(
1867 room_send_queue.clone(),
1868 transaction_id.clone(),
1869 MilliSecondsSinceUnixEpoch::now(),
1870 ),
1871 send_error: None,
1872 }
1873 }
1874
1875 #[async_test]
1876 async fn test_local_new_local_event() {
1877 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1878
1879 let mut buffer = LatestEventValuesForLocalEvents::new();
1880
1881 {
1883 let transaction_id = OwnedTransactionId::from("txnid0");
1884 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1885
1886 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1887
1888 assert_local_value_matches_room_message_with_body!(
1890 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1891 LatestEventValue::LocalIsSending => with body = "A"
1892 );
1893 }
1894
1895 {
1897 let transaction_id = OwnedTransactionId::from("txnid1");
1898 let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
1899
1900 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1901
1902 assert_local_value_matches_room_message_with_body!(
1904 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1905 LatestEventValue::LocalIsSending => with body = "B"
1906 );
1907 }
1908
1909 assert_eq!(buffer.buffer.len(), 2);
1910 }
1911
1912 #[async_test]
1913 async fn test_local_new_local_event_when_previous_local_event_cannot_be_sent() {
1914 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1915
1916 let mut buffer = LatestEventValuesForLocalEvents::new();
1917
1918 let transaction_id_0 = {
1920 let transaction_id = OwnedTransactionId::from("txnid0");
1921 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1922
1923 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1924 transaction_id: transaction_id.clone(),
1925 content,
1926 });
1927
1928 assert_local_value_matches_room_message_with_body!(
1930 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1931 LatestEventValue::LocalIsSending => with body = "A"
1932 );
1933
1934 transaction_id
1935 };
1936
1937 {
1940 let update = RoomSendQueueUpdate::SendError {
1941 transaction_id: transaction_id_0.clone(),
1942 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
1943 is_recoverable: true,
1944 };
1945
1946 assert_local_value_matches_room_message_with_body!(
1949 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1950 LatestEventValue::LocalCannotBeSent => with body = "A"
1951 );
1952
1953 assert_eq!(buffer.buffer.len(), 1);
1954 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1955 }
1956
1957 {
1961 let transaction_id = OwnedTransactionId::from("txnid1");
1962 let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
1963
1964 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1965
1966 assert_local_value_matches_room_message_with_body!(
1968 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1969 LatestEventValue::LocalCannotBeSent => with body = "B"
1970 );
1971 }
1972
1973 assert_eq!(buffer.buffer.len(), 2);
1974 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1975 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1976 }
1977
1978 #[async_test]
1979 async fn test_local_cancelled_local_event() {
1980 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1981
1982 let mut buffer = LatestEventValuesForLocalEvents::new();
1983 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1984 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1985 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1986
1987 {
1989 for (transaction_id, body) in
1990 [(&transaction_id_0, "A"), (&transaction_id_1, "B"), (&transaction_id_2, "C")]
1991 {
1992 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
1993
1994 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1995 transaction_id: transaction_id.clone(),
1996 content,
1997 });
1998
1999 assert_local_value_matches_room_message_with_body!(
2001 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2002 LatestEventValue::LocalIsSending => with body = body
2003 );
2004 }
2005
2006 assert_eq!(buffer.buffer.len(), 3);
2007 }
2008
2009 {
2012 let update = RoomSendQueueUpdate::CancelledLocalEvent {
2013 transaction_id: transaction_id_1.clone(),
2014 };
2015
2016 assert_local_value_matches_room_message_with_body!(
2019 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2020 LatestEventValue::LocalIsSending => with body = "C"
2021 );
2022
2023 assert_eq!(buffer.buffer.len(), 2);
2024 }
2025
2026 {
2029 let update = RoomSendQueueUpdate::CancelledLocalEvent {
2030 transaction_id: transaction_id_2.clone(),
2031 };
2032
2033 assert_local_value_matches_room_message_with_body!(
2036 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2037 LatestEventValue::LocalIsSending => with body = "A"
2038 );
2039
2040 assert_eq!(buffer.buffer.len(), 1);
2041 }
2042
2043 {
2048 let update =
2049 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: transaction_id_0 };
2050
2051 assert_matches!(
2053 LatestEventValueBuilder::new_local(
2054 &update,
2055 &mut buffer,
2056 &room_event_cache,
2057 None,
2058 None
2059 )
2060 .await,
2061 LatestEventValue::None
2062 );
2063
2064 assert!(buffer.buffer.is_empty());
2065 }
2066 }
2067
2068 #[async_test]
2069 async fn test_local_sent_event() {
2070 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2071
2072 let mut buffer = LatestEventValuesForLocalEvents::new();
2073 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2074 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2075
2076 {
2078 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2079 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2080
2081 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2082 transaction_id: transaction_id.clone(),
2083 content,
2084 });
2085
2086 assert_local_value_matches_room_message_with_body!(
2088 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2089 LatestEventValue::LocalIsSending => with body = body
2090 );
2091 }
2092
2093 assert_eq!(buffer.buffer.len(), 2);
2094 }
2095
2096 {
2099 let update = RoomSendQueueUpdate::SentEvent {
2100 transaction_id: transaction_id_0.clone(),
2101 event_id: event_id!("$ev0").to_owned(),
2102 };
2103
2104 assert_local_value_matches_room_message_with_body!(
2107 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2108 LatestEventValue::LocalIsSending => with body = "B"
2109 );
2110
2111 assert_eq!(buffer.buffer.len(), 1);
2112 }
2113
2114 {
2117 let update = RoomSendQueueUpdate::SentEvent {
2118 transaction_id: transaction_id_1,
2119 event_id: event_id!("$ev1").to_owned(),
2120 };
2121
2122 assert_local_value_matches_room_message_with_body!(
2124 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2125 LatestEventValue::LocalIsSending => with body = "B"
2126 );
2127
2128 assert!(buffer.buffer.is_empty());
2129 }
2130 }
2131
2132 #[async_test]
2133 async fn test_local_replaced_local_event() {
2134 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2135
2136 let mut buffer = LatestEventValuesForLocalEvents::new();
2137 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2138 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2139
2140 {
2142 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2143 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2144
2145 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2146 transaction_id: transaction_id.clone(),
2147 content,
2148 });
2149
2150 assert_local_value_matches_room_message_with_body!(
2152 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2153 LatestEventValue::LocalIsSending => with body = body
2154 );
2155 }
2156
2157 assert_eq!(buffer.buffer.len(), 2);
2158 }
2159
2160 {
2163 let transaction_id = &transaction_id_0;
2164 let LocalEchoContent::Event { serialized_event: new_content, .. } =
2165 new_local_echo_content(&room_send_queue, transaction_id, "A.")
2166 else {
2167 panic!("oopsy");
2168 };
2169
2170 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2171 transaction_id: transaction_id.clone(),
2172 new_content,
2173 };
2174
2175 assert_local_value_matches_room_message_with_body!(
2178 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2179 LatestEventValue::LocalIsSending => with body = "B"
2180 );
2181
2182 assert_eq!(buffer.buffer.len(), 2);
2183 }
2184
2185 {
2188 let transaction_id = &transaction_id_1;
2189 let LocalEchoContent::Event { serialized_event: new_content, .. } =
2190 new_local_echo_content(&room_send_queue, transaction_id, "B.")
2191 else {
2192 panic!("oopsy");
2193 };
2194
2195 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2196 transaction_id: transaction_id.clone(),
2197 new_content,
2198 };
2199
2200 assert_local_value_matches_room_message_with_body!(
2203 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2204 LatestEventValue::LocalIsSending => with body = "B."
2205 );
2206
2207 assert_eq!(buffer.buffer.len(), 2);
2208 }
2209 }
2210
2211 #[async_test]
2212 async fn test_local_replaced_local_event_by_a_non_suitable_event() {
2213 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2214
2215 let mut buffer = LatestEventValuesForLocalEvents::new();
2216 let transaction_id = OwnedTransactionId::from("txnid0");
2217
2218 {
2220 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2221
2222 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2223 transaction_id: transaction_id.clone(),
2224 content,
2225 });
2226
2227 assert_local_value_matches_room_message_with_body!(
2229 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2230 LatestEventValue::LocalIsSending => with body = "A"
2231 );
2232
2233 assert_eq!(buffer.buffer.len(), 1);
2234 }
2235
2236 {
2241 let new_content = SerializableEventContent::new(&AnyMessageLikeEventContent::Reaction(
2242 ReactionEventContent::new(Annotation::new(
2243 event_id!("$ev0").to_owned(),
2244 "+1".to_owned(),
2245 )),
2246 ))
2247 .unwrap();
2248
2249 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2250 transaction_id: transaction_id.clone(),
2251 new_content,
2252 };
2253
2254 assert_matches!(
2256 LatestEventValueBuilder::new_local(
2257 &update,
2258 &mut buffer,
2259 &room_event_cache,
2260 None,
2261 None
2262 )
2263 .await,
2264 LatestEventValue::None
2265 );
2266
2267 assert_eq!(buffer.buffer.len(), 0);
2268 }
2269 }
2270
2271 #[async_test]
2272 async fn test_remote_edit_invalid_edit() {
2273 let room_id = room_id!("!r0");
2274 let user_id = user_id!("@mnt_io:matrix.org");
2275 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2276 let event_id_0 = event_id!("$ev0");
2277 let event_id_1 = event_id!("$ev1");
2278
2279 let server = MatrixMockServer::new().await;
2280 let client = server.client_builder().build().await;
2281
2282 {
2284 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2286
2287 client
2289 .event_cache_store()
2290 .lock()
2291 .await
2292 .expect("Could not acquire the event cache lock")
2293 .as_clean()
2294 .expect("Could not acquire a clean event cache lock")
2295 .handle_linked_chunk_updates(
2296 LinkedChunkId::Room(room_id),
2297 vec![
2298 Update::NewItemsChunk {
2299 previous: None,
2300 new: ChunkIdentifier::new(0),
2301 next: None,
2302 },
2303 Update::PushItems {
2304 at: Position::new(ChunkIdentifier::new(0), 0),
2305 items: vec![
2306 event_factory
2308 .text_msg("hello")
2309 .sender(user_id!("@alice:example.org"))
2310 .event_id(event_id_0)
2311 .into(),
2312 event_factory
2314 .text_msg("* goodbye")
2315 .event_id(event_id_1)
2316 .sender(user_id!("@malory:example.org"))
2317 .edit(
2318 event_id_0,
2319 RoomMessageEventContent::text_plain("goodbye").into(),
2320 )
2321 .into(),
2322 ],
2323 },
2324 ],
2325 )
2326 .await
2327 .unwrap();
2328 }
2329
2330 let event_cache = client.event_cache();
2331 event_cache.subscribe().unwrap();
2332
2333 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2334 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned());
2335
2336 assert_remote_value_matches_room_message_with_body!(
2337 LatestEventValueBuilder::new_remote(&room_event_cache, &weak_room).await => with body = "hello"
2339 );
2340 }
2341
2342 #[async_test]
2343 async fn test_local_send_error() {
2344 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2345
2346 let mut buffer = LatestEventValuesForLocalEvents::new();
2347 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2348 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2349
2350 {
2352 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2353 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2354
2355 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2356 transaction_id: transaction_id.clone(),
2357 content,
2358 });
2359
2360 assert_local_value_matches_room_message_with_body!(
2362 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2363 LatestEventValue::LocalIsSending => with body = body
2364 );
2365 }
2366
2367 assert_eq!(buffer.buffer.len(), 2);
2368 }
2369
2370 {
2373 let update = RoomSendQueueUpdate::SendError {
2374 transaction_id: transaction_id_0.clone(),
2375 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2376 is_recoverable: true,
2377 };
2378
2379 assert_local_value_matches_room_message_with_body!(
2382 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2383 LatestEventValue::LocalCannotBeSent => with body = "B"
2384 );
2385
2386 assert_eq!(buffer.buffer.len(), 2);
2387 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2388 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2389 }
2390
2391 {
2395 let update = RoomSendQueueUpdate::SentEvent {
2396 transaction_id: transaction_id_0.clone(),
2397 event_id: event_id!("$ev0").to_owned(),
2398 };
2399
2400 assert_local_value_matches_room_message_with_body!(
2403 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2404 LatestEventValue::LocalIsSending => with body = "B"
2405 );
2406
2407 assert_eq!(buffer.buffer.len(), 1);
2408 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2409 }
2410 }
2411
2412 #[async_test]
2413 async fn test_local_retry_event() {
2414 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2415
2416 let mut buffer = LatestEventValuesForLocalEvents::new();
2417 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2418 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2419
2420 {
2422 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2423 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2424
2425 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2426 transaction_id: transaction_id.clone(),
2427 content,
2428 });
2429
2430 assert_local_value_matches_room_message_with_body!(
2432 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2433 LatestEventValue::LocalIsSending => with body = body
2434 );
2435 }
2436
2437 assert_eq!(buffer.buffer.len(), 2);
2438 }
2439
2440 {
2443 let update = RoomSendQueueUpdate::SendError {
2444 transaction_id: transaction_id_0.clone(),
2445 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2446 is_recoverable: true,
2447 };
2448
2449 assert_local_value_matches_room_message_with_body!(
2452 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2453 LatestEventValue::LocalCannotBeSent => with body = "B"
2454 );
2455
2456 assert_eq!(buffer.buffer.len(), 2);
2457 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2458 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2459 }
2460
2461 {
2464 let update =
2465 RoomSendQueueUpdate::RetryEvent { transaction_id: transaction_id_0.clone() };
2466
2467 assert_local_value_matches_room_message_with_body!(
2470 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2471 LatestEventValue::LocalIsSending => with body = "B"
2472 );
2473
2474 assert_eq!(buffer.buffer.len(), 2);
2475 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2476 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
2477 }
2478 }
2479
2480 #[async_test]
2481 async fn test_local_media_upload() {
2482 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2483
2484 let mut buffer = LatestEventValuesForLocalEvents::new();
2485 let transaction_id = OwnedTransactionId::from("txnid");
2486
2487 {
2489 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2490
2491 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2492 transaction_id: transaction_id.clone(),
2493 content,
2494 });
2495
2496 assert_local_value_matches_room_message_with_body!(
2498 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2499 LatestEventValue::LocalIsSending => with body = "A"
2500 );
2501
2502 assert_eq!(buffer.buffer.len(), 1);
2503 }
2504
2505 {
2508 let update = RoomSendQueueUpdate::MediaUpload {
2509 related_to: transaction_id,
2510 file: None,
2511 index: 0,
2512 progress: AbstractProgress { current: 0, total: 0 },
2513 };
2514
2515 assert_matches!(
2518 LatestEventValueBuilder::new_local(
2519 &update,
2520 &mut buffer,
2521 &room_event_cache,
2522 None,
2523 None
2524 )
2525 .await,
2526 LatestEventValue::None
2527 );
2528
2529 assert_eq!(buffer.buffer.len(), 1);
2530 }
2531 }
2532
2533 #[async_test]
2534 async fn test_local_fallbacks_to_remote_when_empty() {
2535 let room_id = room_id!("!r0");
2536 let user_id = user_id!("@mnt_io:matrix.org");
2537 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2538 let event_id_0 = event_id!("$ev0");
2539 let event_id_1 = event_id!("$ev1");
2540
2541 let server = MatrixMockServer::new().await;
2542 let client = server.client_builder().build().await;
2543
2544 {
2546 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2548
2549 client
2551 .event_cache_store()
2552 .lock()
2553 .await
2554 .expect("Could not acquire the event cache lock")
2555 .as_clean()
2556 .expect("Could not acquire a clean event cache lock")
2557 .handle_linked_chunk_updates(
2558 LinkedChunkId::Room(room_id),
2559 vec![
2560 Update::NewItemsChunk {
2561 previous: None,
2562 new: ChunkIdentifier::new(0),
2563 next: None,
2564 },
2565 Update::PushItems {
2566 at: Position::new(ChunkIdentifier::new(0), 0),
2567 items: vec![
2568 event_factory.text_msg("hello").event_id(event_id_0).into(),
2569 ],
2570 },
2571 ],
2572 )
2573 .await
2574 .unwrap();
2575 }
2576
2577 let event_cache = client.event_cache();
2578 event_cache.subscribe().unwrap();
2579
2580 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2581
2582 let mut buffer = LatestEventValuesForLocalEvents::new();
2583
2584 assert_remote_value_matches_room_message_with_body!(
2586 LatestEventValueBuilder::new_local(
2587 &RoomSendQueueUpdate::SentEvent {
2589 transaction_id: OwnedTransactionId::from("txnid"),
2590 event_id: event_id_1.to_owned(),
2591 },
2592 &mut buffer,
2593 &room_event_cache,
2594 None,
2595 None,
2596 )
2597 .await
2598 => with body = "hello"
2599 );
2600 }
2601}