1use std::{
16 iter::once,
17 ops::{Deref, Not},
18};
19
20use eyeball::{AsyncLock, SharedObservable, Subscriber};
21pub use matrix_sdk_base::latest_event::{
22 LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
23};
24use matrix_sdk_base::{
25 RoomInfoNotableUpdateReasons, StateChanges, deserialized_responses::TimelineEvent,
26 store::SerializableEventContent,
27};
28use ruma::{
29 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
30 events::{
31 AnyMessageLikeEventContent, AnySyncStateEvent, AnySyncTimelineEvent, SyncStateEvent,
32 relation::Replacement,
33 room::{
34 member::MembershipState,
35 message::{MessageType, Relation},
36 power_levels::RoomPowerLevels,
37 },
38 },
39};
40use tracing::{error, instrument, warn};
41
42use crate::{event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate};
43
44#[derive(Debug)]
48pub(super) struct LatestEvent {
49 weak_room: WeakRoom,
51
52 _thread_id: Option<OwnedEventId>,
54
55 buffer_of_values_for_local_events: LatestEventValuesForLocalEvents,
59
60 current_value: SharedObservable<LatestEventValue, AsyncLock>,
62}
63
64impl LatestEvent {
65 pub(super) async fn new(
66 weak_room: &WeakRoom,
67 thread_id: Option<&EventId>,
68 room_event_cache: &RoomEventCache,
69 ) -> Self {
70 Self {
71 weak_room: weak_room.clone(),
72 _thread_id: thread_id.map(ToOwned::to_owned),
73 buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(),
74 current_value: SharedObservable::new_async(
75 LatestEventValueBuilder::new_remote(room_event_cache, weak_room).await,
76 ),
77 }
78 }
79
80 pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
82 self.current_value.subscribe().await
83 }
84
85 pub async fn update_with_event_cache(
95 &mut self,
96 room_event_cache: &RoomEventCache,
97 own_user_id: Option<&UserId>,
98 power_levels: Option<&RoomPowerLevels>,
99 ) {
100 if self.buffer_of_values_for_local_events.is_empty().not() {
101 return;
105 }
106
107 let new_value = LatestEventValueBuilder::new_remote_with_power_levels(
108 room_event_cache,
109 own_user_id,
110 power_levels,
111 )
112 .await;
113
114 self.update(new_value).await;
115 }
116
117 pub async fn update_with_send_queue(
120 &mut self,
121 send_queue_update: &RoomSendQueueUpdate,
122 room_event_cache: &RoomEventCache,
123 own_user_id: Option<&UserId>,
124 power_levels: Option<&RoomPowerLevels>,
125 ) {
126 let new_value = LatestEventValueBuilder::new_local(
127 send_queue_update,
128 &mut self.buffer_of_values_for_local_events,
129 room_event_cache,
130 own_user_id,
131 power_levels,
132 )
133 .await;
134
135 self.update(new_value).await;
136 }
137
138 async fn update(&mut self, new_value: LatestEventValue) {
141 if let LatestEventValue::None = new_value {
142 } else {
144 self.current_value.set(new_value.clone()).await;
145 self.store(new_value).await;
146 }
147 }
148
149 #[instrument(skip_all)]
154 async fn store(&mut self, new_value: LatestEventValue) {
155 let Some(room) = self.weak_room.get() else {
156 warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
157 return;
158 };
159
160 let mut room_info = room.clone_info();
162 room_info.set_new_latest_event(new_value);
163
164 let mut state_changes = StateChanges::default();
165 state_changes.add_room(room_info.clone());
166
167 let client = room.client();
168
169 let _state_store_lock = client.base_client().state_store_lock().lock().await;
171
172 if let Err(error) = client.state_store().save_changes(&state_changes).await {
174 error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
175 }
176
177 room.set_room_info(room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT);
179 }
180}
181
182#[cfg(all(not(target_family = "wasm"), test))]
183mod tests_latest_event {
184 use assert_matches::assert_matches;
185 use matrix_sdk_base::{
186 RoomInfoNotableUpdateReasons, RoomState,
187 linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
188 store::StoreConfig,
189 };
190 use matrix_sdk_test::{async_test, event_factory::EventFactory};
191 use ruma::{
192 MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
193 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
194 room_id, user_id,
195 };
196
197 use super::{LatestEvent, LatestEventValue, LocalLatestEventValue, SerializableEventContent};
198 use crate::{
199 client::WeakClient,
200 room::WeakRoom,
201 send_queue::{LocalEcho, LocalEchoContent, RoomSendQueue, RoomSendQueueUpdate, SendHandle},
202 test_utils::mocks::MatrixMockServer,
203 };
204
205 fn local_room_message(body: &str) -> LocalLatestEventValue {
206 LocalLatestEventValue {
207 timestamp: MilliSecondsSinceUnixEpoch::now(),
208 content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
209 RoomMessageEventContent::text_plain(body),
210 ))
211 .unwrap(),
212 }
213 }
214
215 fn new_local_echo_content(
216 room_send_queue: &RoomSendQueue,
217 transaction_id: &OwnedTransactionId,
218 body: &str,
219 ) -> LocalEchoContent {
220 LocalEchoContent::Event {
221 serialized_event: SerializableEventContent::new(
222 &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
223 )
224 .unwrap(),
225 send_handle: SendHandle::new(
226 room_send_queue.clone(),
227 transaction_id.clone(),
228 MilliSecondsSinceUnixEpoch::now(),
229 ),
230 send_error: None,
231 }
232 }
233
234 #[async_test]
235 async fn test_update_ignores_none_value() {
236 let room_id = room_id!("!r0");
237
238 let server = MatrixMockServer::new().await;
239 let client = server.client_builder().build().await;
240 let weak_client = WeakClient::from_client(&client);
241
242 client.base_client().get_or_create_room(room_id, RoomState::Joined);
244 let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
245
246 let event_cache = client.event_cache();
248 event_cache.subscribe().unwrap();
249
250 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
251
252 let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
253
254 assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
256
257 latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
259
260 assert_matches!(
261 latest_event.current_value.get().await,
262 LatestEventValue::LocalIsSending(_)
263 );
264
265 latest_event.update(LatestEventValue::None).await;
267
268 assert_matches!(
269 latest_event.current_value.get().await,
270 LatestEventValue::LocalIsSending(_)
271 );
272 }
273
274 #[async_test]
275 async fn test_local_has_priority_over_remote() {
276 let room_id = room_id!("!r0").to_owned();
277 let user_id = user_id!("@mnt_io:matrix.org");
278 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
279
280 let server = MatrixMockServer::new().await;
281 let client = server.client_builder().build().await;
282 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
283 let room = client.get_room(&room_id).unwrap();
284 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
285
286 let event_cache = client.event_cache();
287 event_cache.subscribe().unwrap();
288
289 client
291 .event_cache_store()
292 .lock()
293 .await
294 .expect("Could not acquire the event cache lock")
295 .as_clean()
296 .expect("Could not acquire a clean event cache lock")
297 .handle_linked_chunk_updates(
298 LinkedChunkId::Room(&room_id),
299 vec![
300 Update::NewItemsChunk {
301 previous: None,
302 new: ChunkIdentifier::new(0),
303 next: None,
304 },
305 Update::PushItems {
306 at: Position::new(ChunkIdentifier::new(0), 0),
307 items: vec![event_factory.text_msg("A").event_id(event_id!("$ev0")).into()],
308 },
309 ],
310 )
311 .await
312 .unwrap();
313
314 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
315
316 let send_queue = client.send_queue();
317 let room_send_queue = send_queue.for_room(room);
318
319 let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
320
321 {
323 latest_event.update_with_event_cache(&room_event_cache, None, None).await;
324
325 assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
326 }
327
328 let transaction_id = OwnedTransactionId::from("txnid0");
331
332 {
333 let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
334
335 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
336 transaction_id: transaction_id.clone(),
337 content,
338 });
339
340 latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await;
341
342 assert_matches!(
343 latest_event.current_value.get().await,
344 LatestEventValue::LocalIsSending(_)
345 );
346 }
347
348 {
352 latest_event.update_with_event_cache(&room_event_cache, None, None).await;
353
354 assert_matches!(
355 latest_event.current_value.get().await,
356 LatestEventValue::LocalIsSending(_)
357 );
358 }
359
360 {
363 let update = RoomSendQueueUpdate::SentEvent {
364 transaction_id,
365 event_id: event_id!("$ev1").to_owned(),
366 };
367
368 latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await;
369
370 assert_matches!(
371 latest_event.current_value.get().await,
372 LatestEventValue::LocalIsSending(_)
373 );
374 }
375
376 {
379 latest_event.update_with_event_cache(&room_event_cache, None, None).await;
380
381 assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
382 }
383 }
384
385 #[async_test]
386 async fn test_store_latest_event_value() {
387 let room_id = room_id!("!r0").to_owned();
388 let user_id = user_id!("@mnt_io:matrix.org");
389 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
390
391 let server = MatrixMockServer::new().await;
392
393 let store_config = StoreConfig::new("cross-process-lock-holder".to_owned());
394
395 {
397 let client = server
398 .client_builder()
399 .on_builder(|builder| builder.store_config(store_config.clone()))
400 .build()
401 .await;
402 let mut room_info_notable_update_receiver = client.room_info_notable_update_receiver();
403 let room = client.base_client().get_or_create_room(&room_id, RoomState::Joined);
404 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
405
406 let event_cache = client.event_cache();
407 event_cache.subscribe().unwrap();
408
409 client
411 .event_cache_store()
412 .lock()
413 .await
414 .expect("Could not acquire the event cache lock")
415 .as_clean()
416 .expect("Could not acquire a clean event cache lock")
417 .handle_linked_chunk_updates(
418 LinkedChunkId::Room(&room_id),
419 vec![
420 Update::NewItemsChunk {
421 previous: None,
422 new: ChunkIdentifier::new(0),
423 next: None,
424 },
425 Update::PushItems {
426 at: Position::new(ChunkIdentifier::new(0), 0),
427 items: vec![
428 event_factory.text_msg("A").event_id(event_id!("$ev0")).into(),
429 ],
430 },
431 ],
432 )
433 .await
434 .unwrap();
435
436 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
437
438 {
440 let latest_event = room.new_latest_event();
441
442 assert_matches!(latest_event, LatestEventValue::None);
443 }
444
445 {
447 let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
448 latest_event.update_with_event_cache(&room_event_cache, None, None).await;
449
450 assert_matches!(
451 latest_event.current_value.get().await,
452 LatestEventValue::Remote(_)
453 );
454 }
455
456 {
458 let update = room_info_notable_update_receiver.recv().await.unwrap();
459
460 assert_eq!(update.room_id, room_id);
461 assert!(update.reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
462 }
463
464 {
466 let latest_event = room.new_latest_event();
467
468 assert_matches!(latest_event, LatestEventValue::Remote(_));
469 }
470 }
471
472 {
475 let client = server
476 .client_builder()
477 .on_builder(|builder| builder.store_config(store_config))
478 .build()
479 .await;
480 let room = client.get_room(&room_id).unwrap();
481 let latest_event = room.new_latest_event();
482
483 assert_matches!(latest_event, LatestEventValue::Remote(_));
484 }
485 }
486}
487
488struct LatestEventValueBuilder;
490
491impl LatestEventValueBuilder {
492 async fn new_remote(
494 room_event_cache: &RoomEventCache,
495 weak_room: &WeakRoom,
496 ) -> LatestEventValue {
497 let room = weak_room.get();
500 let (own_user_id, power_levels) = match &room {
501 Some(room) => {
502 let power_levels = room.power_levels().await.ok();
503
504 (Some(room.own_user_id()), power_levels)
505 }
506
507 None => (None, None),
508 };
509
510 Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels.as_ref())
511 .await
512 }
513
514 async fn new_remote_with_power_levels(
517 room_event_cache: &RoomEventCache,
518 own_user_id: Option<&UserId>,
519 power_levels: Option<&RoomPowerLevels>,
520 ) -> LatestEventValue {
521 if let Ok(Some(event)) = room_event_cache
522 .rfind_map_event_in_memory_by(|event, previous_event_id| {
523 filter_timeline_event(event, previous_event_id, own_user_id, power_levels)
524 .then(|| event.clone())
525 })
526 .await
527 {
528 LatestEventValue::Remote(event)
529 } else {
530 LatestEventValue::default()
531 }
532 }
533
534 async fn new_local(
537 send_queue_update: &RoomSendQueueUpdate,
538 buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
539 room_event_cache: &RoomEventCache,
540 own_user_id: Option<&UserId>,
541 power_levels: Option<&RoomPowerLevels>,
542 ) -> LatestEventValue {
543 use crate::send_queue::{LocalEcho, LocalEchoContent};
544
545 match send_queue_update {
546 RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
550 transaction_id,
551 content: local_echo_content,
552 }) => match local_echo_content {
553 LocalEchoContent::Event { serialized_event: serialized_event_content, .. } => {
554 match serialized_event_content.deserialize() {
555 Ok(content) => {
556 if filter_any_message_like_event_content(content, None) {
557 let local_value = LocalLatestEventValue {
558 timestamp: MilliSecondsSinceUnixEpoch::now(),
559 content: serialized_event_content.clone(),
560 };
561
562 let value = if let Some(LatestEventValue::LocalCannotBeSent(_)) =
566 buffer_of_values_for_local_events.last()
567 {
568 LatestEventValue::LocalCannotBeSent(local_value)
569 } else {
570 LatestEventValue::LocalIsSending(local_value)
571 };
572
573 buffer_of_values_for_local_events
574 .push(transaction_id.to_owned(), value.clone());
575
576 value
577 } else {
578 LatestEventValue::None
579 }
580 }
581
582 Err(error) => {
583 error!(
584 ?error,
585 "Failed to deserialize an event from `RoomSendQueueUpdate::NewLocalEvent`"
586 );
587
588 LatestEventValue::None
589 }
590 }
591 }
592
593 LocalEchoContent::React { .. } => LatestEventValue::None,
594 },
595
596 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
601 if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
602 buffer_of_values_for_local_events.remove(position);
603 }
604
605 Self::new_local_or_remote(
606 buffer_of_values_for_local_events,
607 room_event_cache,
608 own_user_id,
609 power_levels,
610 )
611 .await
612 }
613
614 RoomSendQueueUpdate::SentEvent { transaction_id, .. } => {
623 let position =
624 buffer_of_values_for_local_events.mark_is_sending_after(transaction_id);
625
626 let value = Self::new_local_or_remote(
639 buffer_of_values_for_local_events,
640 room_event_cache,
641 own_user_id,
642 power_levels,
643 )
644 .await;
645
646 if let Some(position) = position {
647 buffer_of_values_for_local_events.remove(position);
648 }
649
650 value
651 }
652
653 RoomSendQueueUpdate::ReplacedLocalEvent {
658 transaction_id,
659 new_content: new_serialized_event_content,
660 } => {
661 if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
662 match new_serialized_event_content.deserialize() {
663 Ok(content) => {
664 if filter_any_message_like_event_content(content, None) {
665 buffer_of_values_for_local_events.replace_content(
666 position,
667 new_serialized_event_content.clone(),
668 );
669 } else {
670 buffer_of_values_for_local_events.remove(position);
671 }
672 }
673
674 Err(error) => {
675 error!(
676 ?error,
677 "Failed to deserialize an event from `RoomSendQueueUpdate::ReplacedLocalEvent`"
678 );
679
680 return LatestEventValue::None;
681 }
682 }
683 }
684
685 Self::new_local_or_remote(
686 buffer_of_values_for_local_events,
687 room_event_cache,
688 own_user_id,
689 power_levels,
690 )
691 .await
692 }
693
694 RoomSendQueueUpdate::SendError { transaction_id, .. } => {
699 buffer_of_values_for_local_events.mark_cannot_be_sent_from(transaction_id);
700
701 Self::new_local_or_remote(
702 buffer_of_values_for_local_events,
703 room_event_cache,
704 own_user_id,
705 power_levels,
706 )
707 .await
708 }
709
710 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
715 buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
716
717 Self::new_local_or_remote(
718 buffer_of_values_for_local_events,
719 room_event_cache,
720 own_user_id,
721 power_levels,
722 )
723 .await
724 }
725
726 RoomSendQueueUpdate::MediaUpload { .. } => LatestEventValue::None,
730 }
731 }
732
733 async fn new_local_or_remote(
740 buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
741 room_event_cache: &RoomEventCache,
742 own_user_id: Option<&UserId>,
743 power_levels: Option<&RoomPowerLevels>,
744 ) -> LatestEventValue {
745 if let Some(value) = buffer_of_values_for_local_events.last() {
746 value.clone()
747 } else {
748 Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels).await
749 }
750 }
751}
752
753#[derive(Debug)]
793struct LatestEventValuesForLocalEvents {
794 buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
795}
796
797impl LatestEventValuesForLocalEvents {
798 fn new() -> Self {
800 Self { buffer: Vec::with_capacity(2) }
801 }
802
803 fn is_empty(&self) -> bool {
805 self.buffer.is_empty()
806 }
807
808 fn last(&self) -> Option<&LatestEventValue> {
810 self.buffer.last().map(|(_, value)| value)
811 }
812
813 fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
815 self.buffer
816 .iter()
817 .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
818 }
819
820 fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
827 assert!(
828 matches!(
829 value,
830 LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalCannotBeSent(_)
831 ),
832 "`value` must be either `LocalIsSending` or `LocalCannotBeSent`"
833 );
834
835 self.buffer.push((transaction_id, value));
836 }
837
838 fn replace_content(&mut self, position: usize, new_content: SerializableEventContent) {
848 let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");
849
850 match value {
851 LatestEventValue::LocalIsSending(LocalLatestEventValue { content, .. }) => {
852 *content = new_content;
853 }
854
855 LatestEventValue::LocalCannotBeSent(LocalLatestEventValue { content, .. }) => {
856 *content = new_content;
857 }
858
859 _ => panic!("`value` must be either `LocalIsSending` or `LocalCannotBeSent`"),
860 }
861 }
862
863 fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
869 self.buffer.remove(position)
870 }
871
872 fn mark_cannot_be_sent_from(&mut self, transaction_id: &TransactionId) {
875 let mut values = self.buffer.iter_mut();
876
877 if let Some(first_value_to_wedge) = values
878 .by_ref()
879 .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
880 {
881 for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) {
883 if let LatestEventValue::LocalIsSending(content) = value_to_wedge {
884 *value_to_wedge = LatestEventValue::LocalCannotBeSent(content.clone());
885 }
886 }
887 }
888 }
889
890 fn mark_is_sending_from(&mut self, transaction_id: &TransactionId) {
893 let mut values = self.buffer.iter_mut();
894
895 if let Some(first_value_to_unwedge) = values
896 .by_ref()
897 .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
898 {
899 for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) {
901 if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
902 *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
903 }
904 }
905 }
906 }
907
908 fn mark_is_sending_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
915 let mut values = self.buffer.iter_mut();
916
917 if let Some(position) = values
918 .by_ref()
919 .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
920 {
921 for (_, value_to_unwedge) in values {
923 if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
924 *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
925 }
926 }
927
928 Some(position)
929 } else {
930 None
931 }
932 }
933}
934
935fn filter_timeline_event(
936 event: &TimelineEvent,
937 previous_event_id: Option<OwnedEventId>,
938 own_user_id: Option<&UserId>,
939 power_levels: Option<&RoomPowerLevels>,
940) -> bool {
941 let event = match event.raw().deserialize() {
944 Ok(event) => event,
945 Err(error) => {
946 error!(
947 ?error,
948 "Failed to deserialize the event when looking for a suitable latest event"
949 );
950
951 return false;
952 }
953 };
954
955 match event {
956 AnySyncTimelineEvent::MessageLike(message_like_event) => {
957 match message_like_event.original_content() {
958 Some(any_message_like_event_content) => filter_any_message_like_event_content(
959 any_message_like_event_content,
960 previous_event_id,
961 ),
962
963 None => true,
965 }
966 }
967
968 AnySyncTimelineEvent::State(state) => {
969 filter_any_sync_state_event(state, own_user_id, power_levels)
970 }
971 }
972}
973
974fn filter_any_message_like_event_content(
975 event: AnyMessageLikeEventContent,
976 previous_event_id: Option<OwnedEventId>,
977) -> bool {
978 match event {
979 AnyMessageLikeEventContent::RoomMessage(message) => {
980 if let MessageType::VerificationRequest(_) = message.msgtype {
982 return false;
983 }
984
985 match &message.relates_to {
987 Some(Relation::Replacement(Replacement { event_id, .. })) => {
988 Some(event_id) == previous_event_id.as_ref()
991 }
992
993 _ => true,
994 }
995 }
996
997 AnyMessageLikeEventContent::UnstablePollStart(_)
998 | AnyMessageLikeEventContent::CallInvite(_)
999 | AnyMessageLikeEventContent::RtcNotification(_)
1000 | AnyMessageLikeEventContent::Sticker(_) => true,
1001
1002 AnyMessageLikeEventContent::RoomEncrypted(_) => false,
1004
1005 _ => false,
1007 }
1008}
1009
1010fn filter_any_sync_state_event(
1011 event: AnySyncStateEvent,
1012 own_user_id: Option<&UserId>,
1013 power_levels: Option<&RoomPowerLevels>,
1014) -> bool {
1015 match event {
1016 AnySyncStateEvent::RoomMember(member) => {
1017 match member.membership() {
1018 MembershipState::Knock => {
1019 let can_accept_or_decline_knocks = match (own_user_id, power_levels) {
1020 (Some(own_user_id), Some(room_power_levels)) => {
1021 room_power_levels.user_can_invite(own_user_id)
1022 || room_power_levels
1023 .user_can_kick_user(own_user_id, member.state_key())
1024 }
1025 _ => false,
1026 };
1027
1028 if can_accept_or_decline_knocks {
1031 return matches!(member, SyncStateEvent::Original(_));
1034 }
1035
1036 false
1037 }
1038
1039 MembershipState::Invite => {
1040 match member {
1042 SyncStateEvent::Original(state) => {
1045 Some(state.state_key.deref()) == own_user_id
1046 }
1047
1048 _ => false,
1049 }
1050 }
1051
1052 _ => false,
1053 }
1054 }
1055
1056 _ => false,
1057 }
1058}
1059
1060#[cfg(test)]
1061mod tests_latest_event_content {
1062 use std::ops::Not;
1063
1064 use matrix_sdk_test::event_factory::EventFactory;
1065 use ruma::{
1066 event_id,
1067 events::{room::message::RoomMessageEventContent, rtc::notification::NotificationType},
1068 owned_user_id, user_id,
1069 };
1070
1071 use super::filter_timeline_event;
1072
1073 macro_rules! assert_latest_event_content {
1074 ( event | $event_factory:ident | $event_builder:block
1075 is a candidate ) => {
1076 assert_latest_event_content!(@_ | $event_factory | $event_builder, true);
1077 };
1078
1079 ( event | $event_factory:ident | $event_builder:block
1080 is not a candidate ) => {
1081 assert_latest_event_content!(@_ | $event_factory | $event_builder, false);
1082 };
1083
1084 ( @_ | $event_factory:ident | $event_builder:block, $expect:literal ) => {
1085 let user_id = user_id!("@mnt_io:matrix.org");
1086 let event_factory = EventFactory::new().sender(user_id);
1087 let event = {
1088 let $event_factory = event_factory;
1089 $event_builder
1090 };
1091
1092 assert_eq!(filter_timeline_event(&event, None, Some(user_id!("@mnt_io:matrix.org")), None), $expect );
1093 };
1094 }
1095
1096 #[test]
1097 fn test_room_message() {
1098 assert_latest_event_content!(
1099 event | event_factory | { event_factory.text_msg("hello").into_event() }
1100 is a candidate
1101 );
1102 }
1103
1104 #[test]
1105 fn test_redacted() {
1106 assert_latest_event_content!(
1107 event | event_factory | {
1108 event_factory
1109 .redacted(
1110 user_id!("@mnt_io:matrix.org"),
1111 ruma::events::room::message::RedactedRoomMessageEventContent::new(),
1112 )
1113 .into_event()
1114 }
1115 is a candidate
1116 );
1117 }
1118
1119 #[test]
1120 fn test_room_message_replacement() {
1121 let user_id = user_id!("@mnt_io:matrix.org");
1122 let event_factory = EventFactory::new().sender(user_id);
1123 let event = event_factory
1124 .text_msg("bonjour")
1125 .edit(event_id!("$ev0"), RoomMessageEventContent::text_plain("hello").into())
1126 .into_event();
1127
1128 {
1135 let previous_event_id = None;
1136
1137 assert!(
1138 filter_timeline_event(
1139 &event,
1140 previous_event_id,
1141 Some(user_id!("@mnt_io:matrix.org")),
1142 None
1143 )
1144 .not()
1145 );
1146 }
1147
1148 {
1150 let previous_event_id = Some(event_id!("$ev1").to_owned());
1151
1152 assert!(
1153 filter_timeline_event(
1154 &event,
1155 previous_event_id,
1156 Some(user_id!("@mnt_io:matrix.org")),
1157 None
1158 )
1159 .not()
1160 );
1161 }
1162
1163 {
1165 let previous_event_id = Some(event_id!("$ev0").to_owned());
1166
1167 assert!(filter_timeline_event(
1168 &event,
1169 previous_event_id,
1170 Some(user_id!("@mnt_io:matrix.org")),
1171 None
1172 ));
1173 }
1174 }
1175
1176 #[test]
1177 fn test_poll() {
1178 assert_latest_event_content!(
1179 event | event_factory | {
1180 event_factory
1181 .poll_start("the people need to know", "comté > gruyère", vec!["yes", "oui"])
1182 .into_event()
1183 }
1184 is a candidate
1185 );
1186 }
1187
1188 #[test]
1189 fn test_call_invite() {
1190 assert_latest_event_content!(
1191 event | event_factory | {
1192 event_factory
1193 .call_invite(
1194 ruma::OwnedVoipId::from("vvooiipp".to_owned()),
1195 ruma::UInt::from(1234u32),
1196 ruma::events::call::SessionDescription::new(
1197 "type".to_owned(),
1198 "sdp".to_owned(),
1199 ),
1200 ruma::VoipVersionId::V1,
1201 )
1202 .into_event()
1203 }
1204 is a candidate
1205 );
1206 }
1207
1208 #[test]
1209 fn test_rtc_notification() {
1210 assert_latest_event_content!(
1211 event | event_factory | {
1212 event_factory
1213 .rtc_notification(
1214 NotificationType::Ring,
1215 )
1216 .mentions(vec![owned_user_id!("@alice:server.name")])
1217 .relates_to_membership_state_event(ruma::OwnedEventId::try_from("$abc:server.name").unwrap())
1218 .lifetime(60)
1219 .into_event()
1220 }
1221 is a candidate
1222 );
1223 }
1224
1225 #[test]
1226 fn test_sticker() {
1227 assert_latest_event_content!(
1228 event | event_factory | {
1229 event_factory
1230 .sticker(
1231 "wink wink",
1232 ruma::events::room::ImageInfo::new(),
1233 ruma::OwnedMxcUri::from("mxc://foo/bar"),
1234 )
1235 .into_event()
1236 }
1237 is a candidate
1238 );
1239 }
1240
1241 #[test]
1242 fn test_encrypted_room_message() {
1243 assert_latest_event_content!(
1244 event | event_factory | {
1245 event_factory
1246 .event(ruma::events::room::encrypted::RoomEncryptedEventContent::new(
1247 ruma::events::room::encrypted::EncryptedEventScheme::MegolmV1AesSha2(
1248 ruma::events::room::encrypted::MegolmV1AesSha2ContentInit {
1249 ciphertext: "cipher".to_owned(),
1250 sender_key: "sender_key".to_owned(),
1251 device_id: "device_id".into(),
1252 session_id: "session_id".to_owned(),
1253 }
1254 .into(),
1255 ),
1256 None,
1257 ))
1258 .into_event()
1259 }
1260 is not a candidate
1261 );
1262 }
1263
1264 #[test]
1265 fn test_reaction() {
1266 assert_latest_event_content!(
1268 event | event_factory | { event_factory.reaction(event_id!("$ev0"), "+1").into_event() }
1269 is not a candidate
1270 );
1271 }
1272
1273 #[test]
1274 fn test_state_event() {
1275 assert_latest_event_content!(
1276 event | event_factory | { event_factory.room_topic("new room topic").into_event() }
1277 is not a candidate
1278 );
1279 }
1280
1281 #[test]
1282 fn test_knocked_state_event_without_power_levels() {
1283 assert_latest_event_content!(
1284 event | event_factory | {
1285 event_factory
1286 .member(user_id!("@other_mnt_io:server.name"))
1287 .membership(ruma::events::room::member::MembershipState::Knock)
1288 .into_event()
1289 }
1290 is not a candidate
1291 );
1292 }
1293
1294 #[test]
1295 fn test_knocked_state_event_with_power_levels() {
1296 use ruma::{
1297 events::room::{
1298 member::MembershipState,
1299 power_levels::{RoomPowerLevels, RoomPowerLevelsSource},
1300 },
1301 room_version_rules::AuthorizationRules,
1302 };
1303
1304 let user_id = user_id!("@mnt_io:matrix.org");
1305 let other_user_id = user_id!("@other_mnt_io:server.name");
1306 let event_factory = EventFactory::new().sender(user_id);
1307 let event =
1308 event_factory.member(other_user_id).membership(MembershipState::Knock).into_event();
1309
1310 let mut room_power_levels =
1311 RoomPowerLevels::new(RoomPowerLevelsSource::None, &AuthorizationRules::V1, []);
1312 room_power_levels.users.insert(user_id.to_owned(), 5.into());
1313 room_power_levels.users.insert(other_user_id.to_owned(), 4.into());
1314
1315 {
1317 room_power_levels.invite = 10.into();
1318 room_power_levels.kick = 10.into();
1319 assert!(
1320 filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)).not(),
1321 "cannot accept, cannot decline",
1322 );
1323 }
1324
1325 {
1327 room_power_levels.invite = 0.into();
1328 room_power_levels.kick = 10.into();
1329 assert!(
1330 filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
1331 "can accept, cannot decline",
1332 );
1333 }
1334
1335 {
1337 room_power_levels.invite = 10.into();
1338 room_power_levels.kick = 0.into();
1339 assert!(
1340 filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
1341 "cannot accept, can decline",
1342 );
1343 }
1344
1345 {
1347 room_power_levels.invite = 0.into();
1348 room_power_levels.kick = 0.into();
1349 assert!(
1350 filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)),
1351 "can accept, can decline",
1352 );
1353 }
1354
1355 {
1359 room_power_levels.users.insert(user_id.to_owned(), 5.into());
1360 room_power_levels.users.insert(other_user_id.to_owned(), 5.into());
1361
1362 room_power_levels.invite = 10.into();
1363 room_power_levels.kick = 0.into();
1364
1365 assert!(
1366 filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)).not(),
1367 "cannot accept, can decline, at least same user levels",
1368 );
1369 }
1370 }
1371
1372 #[test]
1373 fn test_invite_state_event() {
1374 use ruma::events::room::member::MembershipState;
1375
1376 assert_latest_event_content!(
1378 event | event_factory | {
1379 event_factory
1380 .member(user_id!("@mnt_io:matrix.org"))
1381 .membership(MembershipState::Invite)
1382 .into_event()
1383 }
1384 is a candidate
1385 );
1386 }
1387
1388 #[test]
1389 fn test_invite_state_event_for_someone_else() {
1390 use ruma::events::room::member::MembershipState;
1391
1392 assert_latest_event_content!(
1394 event | event_factory | {
1395 event_factory
1396 .member(user_id!("@other_mnt_io:server.name"))
1397 .membership(MembershipState::Invite)
1398 .into_event()
1399 }
1400 is not a candidate
1401 );
1402 }
1403
1404 #[test]
1405 fn test_room_message_verification_request() {
1406 use ruma::{OwnedDeviceId, events::room::message};
1407
1408 assert_latest_event_content!(
1409 event | event_factory | {
1410 event_factory
1411 .event(RoomMessageEventContent::new(message::MessageType::VerificationRequest(
1412 message::KeyVerificationRequestEventContent::new(
1413 "body".to_owned(),
1414 vec![],
1415 OwnedDeviceId::from("device_id"),
1416 user_id!("@user:server.name").to_owned(),
1417 ),
1418 )))
1419 .into_event()
1420 }
1421 is not a candidate
1422 );
1423 }
1424}
1425
1426#[cfg(test)]
1427mod tests_latest_event_values_for_local_events {
1428 use assert_matches::assert_matches;
1429 use ruma::{
1430 MilliSecondsSinceUnixEpoch, OwnedTransactionId,
1431 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
1432 serde::Raw,
1433 };
1434 use serde_json::json;
1435
1436 use super::{
1437 LatestEventValue, LatestEventValuesForLocalEvents, LocalLatestEventValue,
1438 RemoteLatestEventValue, SerializableEventContent,
1439 };
1440
1441 fn remote_room_message(body: &str) -> RemoteLatestEventValue {
1442 RemoteLatestEventValue::from_plaintext(
1443 Raw::from_json_string(
1444 json!({
1445 "content": RoomMessageEventContent::text_plain(body),
1446 "type": "m.room.message",
1447 "event_id": "$ev0",
1448 "origin_server_ts": 42,
1449 "sender": "@mnt_io:matrix.org",
1450 })
1451 .to_string(),
1452 )
1453 .unwrap(),
1454 )
1455 }
1456
1457 fn local_room_message(body: &str) -> LocalLatestEventValue {
1458 LocalLatestEventValue {
1459 timestamp: MilliSecondsSinceUnixEpoch::now(),
1460 content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
1461 RoomMessageEventContent::text_plain(body),
1462 ))
1463 .unwrap(),
1464 }
1465 }
1466
1467 #[test]
1468 fn test_last() {
1469 let mut buffer = LatestEventValuesForLocalEvents::new();
1470
1471 assert!(buffer.last().is_none());
1472
1473 buffer.push(
1474 OwnedTransactionId::from("txnid"),
1475 LatestEventValue::LocalIsSending(local_room_message("tome")),
1476 );
1477
1478 assert_matches!(buffer.last(), Some(LatestEventValue::LocalIsSending(_)));
1479 }
1480
1481 #[test]
1482 fn test_position() {
1483 let mut buffer = LatestEventValuesForLocalEvents::new();
1484 let transaction_id = OwnedTransactionId::from("txnid");
1485
1486 assert!(buffer.position(&transaction_id).is_none());
1487
1488 buffer.push(
1489 transaction_id.clone(),
1490 LatestEventValue::LocalIsSending(local_room_message("raclette")),
1491 );
1492 buffer.push(
1493 OwnedTransactionId::from("othertxnid"),
1494 LatestEventValue::LocalIsSending(local_room_message("tome")),
1495 );
1496
1497 assert_eq!(buffer.position(&transaction_id), Some(0));
1498 }
1499
1500 #[test]
1501 #[should_panic]
1502 fn test_push_none() {
1503 let mut buffer = LatestEventValuesForLocalEvents::new();
1504
1505 buffer.push(OwnedTransactionId::from("txnid"), LatestEventValue::None);
1506 }
1507
1508 #[test]
1509 #[should_panic]
1510 fn test_push_remote() {
1511 let mut buffer = LatestEventValuesForLocalEvents::new();
1512
1513 buffer.push(
1514 OwnedTransactionId::from("txnid"),
1515 LatestEventValue::Remote(remote_room_message("tome")),
1516 );
1517 }
1518
1519 #[test]
1520 fn test_push_local() {
1521 let mut buffer = LatestEventValuesForLocalEvents::new();
1522
1523 buffer.push(
1524 OwnedTransactionId::from("txnid0"),
1525 LatestEventValue::LocalIsSending(local_room_message("tome")),
1526 );
1527 buffer.push(
1528 OwnedTransactionId::from("txnid1"),
1529 LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1530 );
1531
1532 }
1534
1535 #[test]
1536 fn test_replace_content() {
1537 let mut buffer = LatestEventValuesForLocalEvents::new();
1538
1539 buffer.push(
1540 OwnedTransactionId::from("txnid0"),
1541 LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1542 );
1543
1544 let LocalLatestEventValue { content: new_content, .. } = local_room_message("comté");
1545
1546 buffer.replace_content(0, new_content);
1547
1548 assert_matches!(
1549 buffer.last(),
1550 Some(LatestEventValue::LocalIsSending(local_event)) => {
1551 assert_matches!(
1552 local_event.content.deserialize().unwrap(),
1553 AnyMessageLikeEventContent::RoomMessage(content) => {
1554 assert_eq!(content.body(), "comté");
1555 }
1556 );
1557 }
1558 );
1559 }
1560
1561 #[test]
1562 fn test_remove() {
1563 let mut buffer = LatestEventValuesForLocalEvents::new();
1564
1565 buffer.push(
1566 OwnedTransactionId::from("txnid"),
1567 LatestEventValue::LocalIsSending(local_room_message("gryuère")),
1568 );
1569
1570 assert!(buffer.last().is_some());
1571
1572 buffer.remove(0);
1573
1574 assert!(buffer.last().is_none());
1575 }
1576
1577 #[test]
1578 fn test_mark_cannot_be_sent_from() {
1579 let mut buffer = LatestEventValuesForLocalEvents::new();
1580 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1581 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1582 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1583
1584 buffer.push(
1585 transaction_id_0,
1586 LatestEventValue::LocalIsSending(local_room_message("gruyère")),
1587 );
1588 buffer.push(
1589 transaction_id_1.clone(),
1590 LatestEventValue::LocalIsSending(local_room_message("brigand")),
1591 );
1592 buffer.push(
1593 transaction_id_2,
1594 LatestEventValue::LocalIsSending(local_room_message("raclette")),
1595 );
1596
1597 buffer.mark_cannot_be_sent_from(&transaction_id_1);
1598
1599 assert_eq!(buffer.buffer.len(), 3);
1600 assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
1601 assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1602 assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalCannotBeSent(_));
1603 }
1604
1605 #[test]
1606 fn test_mark_is_sending_from() {
1607 let mut buffer = LatestEventValuesForLocalEvents::new();
1608 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1609 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1610 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1611
1612 buffer.push(
1613 transaction_id_0,
1614 LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1615 );
1616 buffer.push(
1617 transaction_id_1.clone(),
1618 LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
1619 );
1620 buffer.push(
1621 transaction_id_2,
1622 LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1623 );
1624
1625 buffer.mark_is_sending_from(&transaction_id_1);
1626
1627 assert_eq!(buffer.buffer.len(), 3);
1628 assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1629 assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
1630 assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
1631 }
1632
1633 #[test]
1634 fn test_mark_is_sending_after() {
1635 let mut buffer = LatestEventValuesForLocalEvents::new();
1636 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1637 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1638 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1639
1640 buffer.push(
1641 transaction_id_0,
1642 LatestEventValue::LocalCannotBeSent(local_room_message("gruyère")),
1643 );
1644 buffer.push(
1645 transaction_id_1.clone(),
1646 LatestEventValue::LocalCannotBeSent(local_room_message("brigand")),
1647 );
1648 buffer.push(
1649 transaction_id_2,
1650 LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
1651 );
1652
1653 buffer.mark_is_sending_after(&transaction_id_1);
1654
1655 assert_eq!(buffer.buffer.len(), 3);
1656 assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1657 assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1658 assert_matches!(buffer.buffer[2].1, LatestEventValue::LocalIsSending(_));
1659 }
1660}
1661
1662#[cfg(all(not(target_family = "wasm"), test))]
1663mod tests_latest_event_value_builder {
1664 use std::sync::Arc;
1665
1666 use assert_matches::assert_matches;
1667 use matrix_sdk_base::{
1668 RoomState,
1669 deserialized_responses::TimelineEventKind,
1670 linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1671 store::SerializableEventContent,
1672 };
1673 use matrix_sdk_test::{async_test, event_factory::EventFactory};
1674 use ruma::{
1675 MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedTransactionId, event_id,
1676 events::{
1677 AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
1678 SyncMessageLikeEvent, reaction::ReactionEventContent, relation::Annotation,
1679 room::message::RoomMessageEventContent,
1680 },
1681 room_id, user_id,
1682 };
1683
1684 use super::{
1685 LatestEventValue, LatestEventValueBuilder, LatestEventValuesForLocalEvents,
1686 RemoteLatestEventValue, RoomEventCache, RoomSendQueueUpdate,
1687 };
1688 use crate::{
1689 Client, Error,
1690 client::WeakClient,
1691 room::WeakRoom,
1692 send_queue::{AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle},
1693 test_utils::mocks::MatrixMockServer,
1694 };
1695
1696 macro_rules! assert_remote_value_matches_room_message_with_body {
1697 ( $latest_event_value:expr => with body = $body:expr ) => {
1698 assert_matches!(
1699 $latest_event_value,
1700 LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => {
1701 assert_matches!(
1702 event.deserialize().unwrap(),
1703 AnySyncTimelineEvent::MessageLike(
1704 AnySyncMessageLikeEvent::RoomMessage(
1705 SyncMessageLikeEvent::Original(message_content)
1706 )
1707 ) => {
1708 assert_eq!(message_content.content.body(), $body);
1709 }
1710 );
1711 }
1712 );
1713 };
1714 }
1715
1716 macro_rules! assert_local_value_matches_room_message_with_body {
1717 ( $latest_event_value:expr, $pattern:path => with body = $body:expr ) => {
1718 assert_matches!(
1719 $latest_event_value,
1720 $pattern (local_event) => {
1721 assert_matches!(
1722 local_event.content.deserialize().unwrap(),
1723 AnyMessageLikeEventContent::RoomMessage(message_content) => {
1724 assert_eq!(message_content.body(), $body);
1725 }
1726 );
1727 }
1728 );
1729 };
1730 }
1731
1732 #[async_test]
1733 async fn test_remote_is_scanning_event_backwards_from_event_cache() {
1734 let room_id = room_id!("!r0");
1735 let user_id = user_id!("@mnt_io:matrix.org");
1736 let event_factory = EventFactory::new().sender(user_id).room(room_id);
1737 let event_id_0 = event_id!("$ev0");
1738 let event_id_1 = event_id!("$ev1");
1739 let event_id_2 = event_id!("$ev2");
1740
1741 let server = MatrixMockServer::new().await;
1742 let client = server.client_builder().build().await;
1743
1744 {
1746 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1748
1749 client
1751 .event_cache_store()
1752 .lock()
1753 .await
1754 .expect("Could not acquire the event cache lock")
1755 .as_clean()
1756 .expect("Could not acquire a clean event cache lock")
1757 .handle_linked_chunk_updates(
1758 LinkedChunkId::Room(room_id),
1759 vec![
1760 Update::NewItemsChunk {
1761 previous: None,
1762 new: ChunkIdentifier::new(0),
1763 next: None,
1764 },
1765 Update::PushItems {
1766 at: Position::new(ChunkIdentifier::new(0), 0),
1767 items: vec![
1768 event_factory.text_msg("hello").event_id(event_id_0).into(),
1770 event_factory.text_msg("world").event_id(event_id_1).into(),
1772 event_factory
1774 .room_topic("new room topic")
1775 .event_id(event_id_2)
1776 .into(),
1777 ],
1778 },
1779 ],
1780 )
1781 .await
1782 .unwrap();
1783 }
1784
1785 let event_cache = client.event_cache();
1786 event_cache.subscribe().unwrap();
1787
1788 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1789 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned());
1790
1791 assert_remote_value_matches_room_message_with_body!(
1792 LatestEventValueBuilder::new_remote(&room_event_cache, &weak_room).await => with body = "world"
1796 );
1797 }
1798
1799 async fn local_prelude() -> (Client, OwnedRoomId, RoomSendQueue, RoomEventCache) {
1800 let room_id = room_id!("!r0").to_owned();
1801
1802 let server = MatrixMockServer::new().await;
1803 let client = server.client_builder().build().await;
1804 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1805 let room = client.get_room(&room_id).unwrap();
1806
1807 let event_cache = client.event_cache();
1808 event_cache.subscribe().unwrap();
1809
1810 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
1811
1812 let send_queue = client.send_queue();
1813 let room_send_queue = send_queue.for_room(room);
1814
1815 (client, room_id, room_send_queue, room_event_cache)
1816 }
1817
1818 fn new_local_echo_content(
1819 room_send_queue: &RoomSendQueue,
1820 transaction_id: &OwnedTransactionId,
1821 body: &str,
1822 ) -> LocalEchoContent {
1823 LocalEchoContent::Event {
1824 serialized_event: SerializableEventContent::new(
1825 &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
1826 )
1827 .unwrap(),
1828 send_handle: SendHandle::new(
1829 room_send_queue.clone(),
1830 transaction_id.clone(),
1831 MilliSecondsSinceUnixEpoch::now(),
1832 ),
1833 send_error: None,
1834 }
1835 }
1836
1837 #[async_test]
1838 async fn test_local_new_local_event() {
1839 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1840
1841 let mut buffer = LatestEventValuesForLocalEvents::new();
1842
1843 {
1845 let transaction_id = OwnedTransactionId::from("txnid0");
1846 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1847
1848 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1849
1850 assert_local_value_matches_room_message_with_body!(
1852 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1853 LatestEventValue::LocalIsSending => with body = "A"
1854 );
1855 }
1856
1857 {
1859 let transaction_id = OwnedTransactionId::from("txnid1");
1860 let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
1861
1862 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1863
1864 assert_local_value_matches_room_message_with_body!(
1866 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1867 LatestEventValue::LocalIsSending => with body = "B"
1868 );
1869 }
1870
1871 assert_eq!(buffer.buffer.len(), 2);
1872 }
1873
1874 #[async_test]
1875 async fn test_local_new_local_event_when_previous_local_event_cannot_be_sent() {
1876 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1877
1878 let mut buffer = LatestEventValuesForLocalEvents::new();
1879
1880 let transaction_id_0 = {
1882 let transaction_id = OwnedTransactionId::from("txnid0");
1883 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1884
1885 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1886 transaction_id: transaction_id.clone(),
1887 content,
1888 });
1889
1890 assert_local_value_matches_room_message_with_body!(
1892 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1893 LatestEventValue::LocalIsSending => with body = "A"
1894 );
1895
1896 transaction_id
1897 };
1898
1899 {
1902 let update = RoomSendQueueUpdate::SendError {
1903 transaction_id: transaction_id_0.clone(),
1904 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
1905 is_recoverable: true,
1906 };
1907
1908 assert_local_value_matches_room_message_with_body!(
1911 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1912 LatestEventValue::LocalCannotBeSent => with body = "A"
1913 );
1914
1915 assert_eq!(buffer.buffer.len(), 1);
1916 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1917 }
1918
1919 {
1923 let transaction_id = OwnedTransactionId::from("txnid1");
1924 let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
1925
1926 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1927
1928 assert_local_value_matches_room_message_with_body!(
1930 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1931 LatestEventValue::LocalCannotBeSent => with body = "B"
1932 );
1933 }
1934
1935 assert_eq!(buffer.buffer.len(), 2);
1936 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
1937 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
1938 }
1939
1940 #[async_test]
1941 async fn test_local_cancelled_local_event() {
1942 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1943
1944 let mut buffer = LatestEventValuesForLocalEvents::new();
1945 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1946 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1947 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1948
1949 {
1951 for (transaction_id, body) in
1952 [(&transaction_id_0, "A"), (&transaction_id_1, "B"), (&transaction_id_2, "C")]
1953 {
1954 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
1955
1956 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1957 transaction_id: transaction_id.clone(),
1958 content,
1959 });
1960
1961 assert_local_value_matches_room_message_with_body!(
1963 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1964 LatestEventValue::LocalIsSending => with body = body
1965 );
1966 }
1967
1968 assert_eq!(buffer.buffer.len(), 3);
1969 }
1970
1971 {
1974 let update = RoomSendQueueUpdate::CancelledLocalEvent {
1975 transaction_id: transaction_id_1.clone(),
1976 };
1977
1978 assert_local_value_matches_room_message_with_body!(
1981 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1982 LatestEventValue::LocalIsSending => with body = "C"
1983 );
1984
1985 assert_eq!(buffer.buffer.len(), 2);
1986 }
1987
1988 {
1991 let update = RoomSendQueueUpdate::CancelledLocalEvent {
1992 transaction_id: transaction_id_2.clone(),
1993 };
1994
1995 assert_local_value_matches_room_message_with_body!(
1998 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
1999 LatestEventValue::LocalIsSending => with body = "A"
2000 );
2001
2002 assert_eq!(buffer.buffer.len(), 1);
2003 }
2004
2005 {
2010 let update =
2011 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: transaction_id_0 };
2012
2013 assert_matches!(
2015 LatestEventValueBuilder::new_local(
2016 &update,
2017 &mut buffer,
2018 &room_event_cache,
2019 None,
2020 None
2021 )
2022 .await,
2023 LatestEventValue::None
2024 );
2025
2026 assert!(buffer.buffer.is_empty());
2027 }
2028 }
2029
2030 #[async_test]
2031 async fn test_local_sent_event() {
2032 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2033
2034 let mut buffer = LatestEventValuesForLocalEvents::new();
2035 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2036 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2037
2038 {
2040 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2041 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2042
2043 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2044 transaction_id: transaction_id.clone(),
2045 content,
2046 });
2047
2048 assert_local_value_matches_room_message_with_body!(
2050 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2051 LatestEventValue::LocalIsSending => with body = body
2052 );
2053 }
2054
2055 assert_eq!(buffer.buffer.len(), 2);
2056 }
2057
2058 {
2061 let update = RoomSendQueueUpdate::SentEvent {
2062 transaction_id: transaction_id_0.clone(),
2063 event_id: event_id!("$ev0").to_owned(),
2064 };
2065
2066 assert_local_value_matches_room_message_with_body!(
2069 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2070 LatestEventValue::LocalIsSending => with body = "B"
2071 );
2072
2073 assert_eq!(buffer.buffer.len(), 1);
2074 }
2075
2076 {
2079 let update = RoomSendQueueUpdate::SentEvent {
2080 transaction_id: transaction_id_1,
2081 event_id: event_id!("$ev1").to_owned(),
2082 };
2083
2084 assert_local_value_matches_room_message_with_body!(
2086 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2087 LatestEventValue::LocalIsSending => with body = "B"
2088 );
2089
2090 assert!(buffer.buffer.is_empty());
2091 }
2092 }
2093
2094 #[async_test]
2095 async fn test_local_replaced_local_event() {
2096 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2097
2098 let mut buffer = LatestEventValuesForLocalEvents::new();
2099 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2100 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2101
2102 {
2104 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2105 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2106
2107 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2108 transaction_id: transaction_id.clone(),
2109 content,
2110 });
2111
2112 assert_local_value_matches_room_message_with_body!(
2114 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2115 LatestEventValue::LocalIsSending => with body = body
2116 );
2117 }
2118
2119 assert_eq!(buffer.buffer.len(), 2);
2120 }
2121
2122 {
2125 let transaction_id = &transaction_id_0;
2126 let LocalEchoContent::Event { serialized_event: new_content, .. } =
2127 new_local_echo_content(&room_send_queue, transaction_id, "A.")
2128 else {
2129 panic!("oopsy");
2130 };
2131
2132 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2133 transaction_id: transaction_id.clone(),
2134 new_content,
2135 };
2136
2137 assert_local_value_matches_room_message_with_body!(
2140 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2141 LatestEventValue::LocalIsSending => with body = "B"
2142 );
2143
2144 assert_eq!(buffer.buffer.len(), 2);
2145 }
2146
2147 {
2150 let transaction_id = &transaction_id_1;
2151 let LocalEchoContent::Event { serialized_event: new_content, .. } =
2152 new_local_echo_content(&room_send_queue, transaction_id, "B.")
2153 else {
2154 panic!("oopsy");
2155 };
2156
2157 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2158 transaction_id: transaction_id.clone(),
2159 new_content,
2160 };
2161
2162 assert_local_value_matches_room_message_with_body!(
2165 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2166 LatestEventValue::LocalIsSending => with body = "B."
2167 );
2168
2169 assert_eq!(buffer.buffer.len(), 2);
2170 }
2171 }
2172
2173 #[async_test]
2174 async fn test_local_replaced_local_event_by_a_non_suitable_event() {
2175 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2176
2177 let mut buffer = LatestEventValuesForLocalEvents::new();
2178 let transaction_id = OwnedTransactionId::from("txnid0");
2179
2180 {
2182 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2183
2184 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2185 transaction_id: transaction_id.clone(),
2186 content,
2187 });
2188
2189 assert_local_value_matches_room_message_with_body!(
2191 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2192 LatestEventValue::LocalIsSending => with body = "A"
2193 );
2194
2195 assert_eq!(buffer.buffer.len(), 1);
2196 }
2197
2198 {
2203 let new_content = SerializableEventContent::new(&AnyMessageLikeEventContent::Reaction(
2204 ReactionEventContent::new(Annotation::new(
2205 event_id!("$ev0").to_owned(),
2206 "+1".to_owned(),
2207 )),
2208 ))
2209 .unwrap();
2210
2211 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
2212 transaction_id: transaction_id.clone(),
2213 new_content,
2214 };
2215
2216 assert_matches!(
2218 LatestEventValueBuilder::new_local(
2219 &update,
2220 &mut buffer,
2221 &room_event_cache,
2222 None,
2223 None
2224 )
2225 .await,
2226 LatestEventValue::None
2227 );
2228
2229 assert_eq!(buffer.buffer.len(), 0);
2230 }
2231 }
2232
2233 #[async_test]
2234 async fn test_local_send_error() {
2235 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2236
2237 let mut buffer = LatestEventValuesForLocalEvents::new();
2238 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2239 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2240
2241 {
2243 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2244 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2245
2246 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2247 transaction_id: transaction_id.clone(),
2248 content,
2249 });
2250
2251 assert_local_value_matches_room_message_with_body!(
2253 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2254 LatestEventValue::LocalIsSending => with body = body
2255 );
2256 }
2257
2258 assert_eq!(buffer.buffer.len(), 2);
2259 }
2260
2261 {
2264 let update = RoomSendQueueUpdate::SendError {
2265 transaction_id: transaction_id_0.clone(),
2266 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2267 is_recoverable: true,
2268 };
2269
2270 assert_local_value_matches_room_message_with_body!(
2273 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2274 LatestEventValue::LocalCannotBeSent => with body = "B"
2275 );
2276
2277 assert_eq!(buffer.buffer.len(), 2);
2278 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2279 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2280 }
2281
2282 {
2286 let update = RoomSendQueueUpdate::SentEvent {
2287 transaction_id: transaction_id_0.clone(),
2288 event_id: event_id!("$ev0").to_owned(),
2289 };
2290
2291 assert_local_value_matches_room_message_with_body!(
2294 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2295 LatestEventValue::LocalIsSending => with body = "B"
2296 );
2297
2298 assert_eq!(buffer.buffer.len(), 1);
2299 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2300 }
2301 }
2302
2303 #[async_test]
2304 async fn test_local_retry_event() {
2305 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2306
2307 let mut buffer = LatestEventValuesForLocalEvents::new();
2308 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2309 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2310
2311 {
2313 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2314 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2315
2316 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2317 transaction_id: transaction_id.clone(),
2318 content,
2319 });
2320
2321 assert_local_value_matches_room_message_with_body!(
2323 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2324 LatestEventValue::LocalIsSending => with body = body
2325 );
2326 }
2327
2328 assert_eq!(buffer.buffer.len(), 2);
2329 }
2330
2331 {
2334 let update = RoomSendQueueUpdate::SendError {
2335 transaction_id: transaction_id_0.clone(),
2336 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2337 is_recoverable: true,
2338 };
2339
2340 assert_local_value_matches_room_message_with_body!(
2343 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2344 LatestEventValue::LocalCannotBeSent => with body = "B"
2345 );
2346
2347 assert_eq!(buffer.buffer.len(), 2);
2348 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2349 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2350 }
2351
2352 {
2355 let update =
2356 RoomSendQueueUpdate::RetryEvent { transaction_id: transaction_id_0.clone() };
2357
2358 assert_local_value_matches_room_message_with_body!(
2361 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2362 LatestEventValue::LocalIsSending => with body = "B"
2363 );
2364
2365 assert_eq!(buffer.buffer.len(), 2);
2366 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2367 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
2368 }
2369 }
2370
2371 #[async_test]
2372 async fn test_local_media_upload() {
2373 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2374
2375 let mut buffer = LatestEventValuesForLocalEvents::new();
2376 let transaction_id = OwnedTransactionId::from("txnid");
2377
2378 {
2380 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2381
2382 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2383 transaction_id: transaction_id.clone(),
2384 content,
2385 });
2386
2387 assert_local_value_matches_room_message_with_body!(
2389 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await,
2390 LatestEventValue::LocalIsSending => with body = "A"
2391 );
2392
2393 assert_eq!(buffer.buffer.len(), 1);
2394 }
2395
2396 {
2399 let update = RoomSendQueueUpdate::MediaUpload {
2400 related_to: transaction_id,
2401 file: None,
2402 index: 0,
2403 progress: AbstractProgress { current: 0, total: 0 },
2404 };
2405
2406 assert_matches!(
2409 LatestEventValueBuilder::new_local(
2410 &update,
2411 &mut buffer,
2412 &room_event_cache,
2413 None,
2414 None
2415 )
2416 .await,
2417 LatestEventValue::None
2418 );
2419
2420 assert_eq!(buffer.buffer.len(), 1);
2421 }
2422 }
2423
2424 #[async_test]
2425 async fn test_local_fallbacks_to_remote_when_empty() {
2426 let room_id = room_id!("!r0");
2427 let user_id = user_id!("@mnt_io:matrix.org");
2428 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2429 let event_id_0 = event_id!("$ev0");
2430 let event_id_1 = event_id!("$ev1");
2431
2432 let server = MatrixMockServer::new().await;
2433 let client = server.client_builder().build().await;
2434
2435 {
2437 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2439
2440 client
2442 .event_cache_store()
2443 .lock()
2444 .await
2445 .expect("Could not acquire the event cache lock")
2446 .as_clean()
2447 .expect("Could not acquire a clean event cache lock")
2448 .handle_linked_chunk_updates(
2449 LinkedChunkId::Room(room_id),
2450 vec![
2451 Update::NewItemsChunk {
2452 previous: None,
2453 new: ChunkIdentifier::new(0),
2454 next: None,
2455 },
2456 Update::PushItems {
2457 at: Position::new(ChunkIdentifier::new(0), 0),
2458 items: vec![
2459 event_factory.text_msg("hello").event_id(event_id_0).into(),
2460 ],
2461 },
2462 ],
2463 )
2464 .await
2465 .unwrap();
2466 }
2467
2468 let event_cache = client.event_cache();
2469 event_cache.subscribe().unwrap();
2470
2471 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2472
2473 let mut buffer = LatestEventValuesForLocalEvents::new();
2474
2475 assert_remote_value_matches_room_message_with_body!(
2477 LatestEventValueBuilder::new_local(
2478 &RoomSendQueueUpdate::SentEvent {
2480 transaction_id: OwnedTransactionId::from("txnid"),
2481 event_id: event_id_1.to_owned(),
2482 },
2483 &mut buffer,
2484 &room_event_cache,
2485 None,
2486 None,
2487 )
2488 .await
2489 => with body = "hello"
2490 );
2491 }
2492}