1use std::{iter::once, ops::Not};
16
17use eyeball::{AsyncLock, SharedObservable, Subscriber};
18pub use matrix_sdk_base::latest_event::{
19 LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
20};
21use matrix_sdk_base::{
22 deserialized_responses::TimelineEvent, store::SerializableEventContent,
23 RoomInfoNotableUpdateReasons, StateChanges,
24};
25use ruma::{
26 events::{
27 relation::RelationType,
28 room::{member::MembershipState, message::MessageType, power_levels::RoomPowerLevels},
29 AnyMessageLikeEventContent, AnySyncStateEvent, AnySyncTimelineEvent, SyncStateEvent,
30 },
31 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, TransactionId, UserId,
32};
33use tracing::{error, instrument, warn};
34
35use crate::{event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate};
36
37#[derive(Debug)]
41pub(super) struct LatestEvent {
42 weak_room: WeakRoom,
44
45 _thread_id: Option<OwnedEventId>,
47
48 buffer_of_values_for_local_events: LatestEventValuesForLocalEvents,
52
53 current_value: SharedObservable<LatestEventValue, AsyncLock>,
55}
56
57impl LatestEvent {
58 pub(super) async fn new(
59 weak_room: &WeakRoom,
60 thread_id: Option<&EventId>,
61 room_event_cache: &RoomEventCache,
62 ) -> Self {
63 Self {
64 weak_room: weak_room.clone(),
65 _thread_id: thread_id.map(ToOwned::to_owned),
66 buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(),
67 current_value: SharedObservable::new_async(
68 LatestEventValueBuilder::new_remote(room_event_cache, weak_room).await,
69 ),
70 }
71 }
72
73 pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
75 self.current_value.subscribe().await
76 }
77
78 pub async fn update_with_event_cache(
88 &mut self,
89 room_event_cache: &RoomEventCache,
90 power_levels: &Option<(&UserId, RoomPowerLevels)>,
91 ) {
92 if self.buffer_of_values_for_local_events.is_empty().not() {
93 return;
97 }
98
99 let new_value =
100 LatestEventValueBuilder::new_remote_with_power_levels(room_event_cache, power_levels)
101 .await;
102
103 self.update(new_value).await;
104 }
105
106 pub async fn update_with_send_queue(
109 &mut self,
110 send_queue_update: &RoomSendQueueUpdate,
111 room_event_cache: &RoomEventCache,
112 power_levels: &Option<(&UserId, RoomPowerLevels)>,
113 ) {
114 let new_value = LatestEventValueBuilder::new_local(
115 send_queue_update,
116 &mut self.buffer_of_values_for_local_events,
117 room_event_cache,
118 power_levels,
119 )
120 .await;
121
122 self.update(new_value).await;
123 }
124
125 async fn update(&mut self, new_value: LatestEventValue) {
128 if let LatestEventValue::None = new_value {
129 } else {
131 self.current_value.set(new_value.clone()).await;
132 self.store(new_value).await;
133 }
134 }
135
136 #[instrument(skip_all)]
141 async fn store(&mut self, new_value: LatestEventValue) {
142 let Some(room) = self.weak_room.get() else {
143 warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
144 return;
145 };
146
147 let mut room_info = room.clone_info();
149 room_info.set_new_latest_event(new_value);
150
151 let mut state_changes = StateChanges::default();
152 state_changes.add_room(room_info.clone());
153
154 let client = room.client();
155
156 let _sync_lock = client.base_client().sync_lock().lock().await;
158
159 if let Err(error) = client.state_store().save_changes(&state_changes).await {
161 error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
162 }
163
164 room.set_room_info(room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT);
166 }
167}
168
#[cfg(all(not(target_family = "wasm"), test))]
mod tests_latest_event {
    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        store::StoreConfig,
        RoomInfoNotableUpdateReasons, RoomState,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{
        event_id,
        events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
        room_id,
        serde::Raw,
        user_id, MilliSecondsSinceUnixEpoch, OwnedTransactionId,
    };

    use super::{LatestEvent, LatestEventValue, LocalLatestEventValue, SerializableEventContent};
    use crate::{
        client::WeakClient,
        room::WeakRoom,
        send_queue::{LocalEcho, LocalEchoContent, RoomSendQueue, RoomSendQueueUpdate, SendHandle},
        test_utils::mocks::MatrixMockServer,
    };

    /// Build a local value holding a plain-text `m.room.message` with `body`.
    fn local_room_message(body: &str) -> LocalLatestEventValue {
        let timestamp = MilliSecondsSinceUnixEpoch::now();
        let message =
            AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body));
        let content = SerializableEventContent::from_raw(
            Raw::new(&message).unwrap(),
            "m.room.message".to_owned(),
        );

        LocalLatestEventValue { timestamp, content }
    }

    /// Build the content of a local echo for a plain-text message with `body`,
    /// attached to `room_send_queue` under `transaction_id`.
    fn new_local_echo_content(
        room_send_queue: &RoomSendQueue,
        transaction_id: &OwnedTransactionId,
        body: &str,
    ) -> LocalEchoContent {
        let message =
            AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body));
        let serialized_event = SerializableEventContent::new(&message).unwrap();
        let send_handle = SendHandle::new(
            room_send_queue.clone(),
            transaction_id.clone(),
            MilliSecondsSinceUnixEpoch::now(),
        );

        LocalEchoContent::Event { serialized_event, send_handle, send_error: None }
    }

    /// `LatestEvent::update` must keep the previous value when it receives
    /// `LatestEventValue::None`.
    #[async_test]
    async fn test_update_ignores_none_value() {
        let room_id = room_id!("!r0");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        // Create the room.
        client.base_client().get_or_create_room(room_id, RoomState::Joined);
        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned());

        // Get a `RoomEventCache`.
        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();

        let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;

        // Nothing is in the event cache: the initial value is `None`.
        assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);

        // A non-`None` value is accepted.
        let local_value = LatestEventValue::LocalIsSending(local_room_message("foo"));
        latest_event.update(local_value).await;

        assert_matches!(
            latest_event.current_value.get().await,
            LatestEventValue::LocalIsSending(_)
        );

        // A `None` value is ignored: the previous value is still there.
        latest_event.update(LatestEventValue::None).await;

        assert_matches!(
            latest_event.current_value.get().await,
            LatestEventValue::LocalIsSending(_)
        );
    }

    /// While a local event is buffered, updates from the event cache must not
    /// replace the local value.
    #[async_test]
    async fn test_local_has_priority_over_remote() {
        let room_id = room_id!("!r0").to_owned();
        let event_factory =
            EventFactory::new().sender(user_id!("@mnt_io:matrix.org")).room(&room_id);

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
        let room = client.get_room(&room_id).unwrap();
        let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        // Fill the event cache store with one event, `$ev0`.
        client
            .event_cache_store()
            .lock()
            .await
            .unwrap()
            .handle_linked_chunk_updates(
                LinkedChunkId::Room(&room_id),
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_factory.text_msg("A").event_id(event_id!("$ev0")).into()],
                    },
                ],
            )
            .await
            .unwrap();

        let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();

        let room_send_queue = client.send_queue().for_room(room);

        let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;

        // Step 1: an update from the event cache yields a remote value.
        latest_event.update_with_event_cache(&room_event_cache, &None).await;

        assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));

        let transaction_id = OwnedTransactionId::from("txnid0");

        // Step 2: a new local event yields a local value.
        {
            let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
            let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
                transaction_id: transaction_id.clone(),
                content,
            });

            latest_event.update_with_send_queue(&update, &room_event_cache, &None).await;

            assert_matches!(
                latest_event.current_value.get().await,
                LatestEventValue::LocalIsSending(_)
            );
        }

        // Step 3: an update from the event cache is ignored, the local value stays.
        latest_event.update_with_event_cache(&room_event_cache, &None).await;

        assert_matches!(
            latest_event.current_value.get().await,
            LatestEventValue::LocalIsSending(_)
        );

        // Step 4: the local event is sent; the value computed for this update
        // is still the local one.
        {
            let update = RoomSendQueueUpdate::SentEvent {
                transaction_id,
                event_id: event_id!("$ev1").to_owned(),
            };

            latest_event.update_with_send_queue(&update, &room_event_cache, &None).await;

            assert_matches!(
                latest_event.current_value.get().await,
                LatestEventValue::LocalIsSending(_)
            );
        }

        // Step 5: the local buffer is now empty, so an update from the event
        // cache is taken into account again.
        latest_event.update_with_event_cache(&room_event_cache, &None).await;

        assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
    }

    /// An updated value must be written to the room info, announced with the
    /// `LATEST_EVENT` reason, and still be there for a client built on the
    /// same store.
    #[async_test]
    async fn test_store_latest_event_value() {
        let room_id = room_id!("!r0").to_owned();
        let event_factory =
            EventFactory::new().sender(user_id!("@mnt_io:matrix.org")).room(&room_id);

        let server = MatrixMockServer::new().await;

        let store_config = StoreConfig::new("cross-process-lock-holder".to_owned());

        // First client: compute and store the value.
        {
            let client = server
                .client_builder()
                .on_builder(|builder| builder.store_config(store_config.clone()))
                .build()
                .await;
            let mut room_info_notable_update_receiver = client.room_info_notable_update_receiver();
            let room = client.base_client().get_or_create_room(&room_id, RoomState::Joined);
            let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());

            let event_cache = client.event_cache();
            event_cache.subscribe().unwrap();

            // Fill the event cache store with one event, `$ev0`.
            client
                .event_cache_store()
                .lock()
                .await
                .unwrap()
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(&room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![event_factory
                                .text_msg("A")
                                .event_id(event_id!("$ev0"))
                                .into()],
                        },
                    ],
                )
                .await
                .unwrap();

            let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();

            // Before any update, the room info holds no latest event value.
            assert_matches!(room.new_latest_event(), LatestEventValue::None);

            // Update from the event cache: the value becomes a remote one.
            {
                let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await;
                latest_event.update_with_event_cache(&room_event_cache, &None).await;

                assert_matches!(
                    latest_event.current_value.get().await,
                    LatestEventValue::Remote(_)
                );
            }

            // The room info update has been announced for the right reason.
            {
                let update = room_info_notable_update_receiver.recv().await.unwrap();

                assert_eq!(update.room_id, room_id);
                assert!(update.reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
            }

            // The in-memory room info now holds the remote value.
            assert_matches!(room.new_latest_event(), LatestEventValue::Remote(_));
        }

        // Second client on the same store: the value has been persisted.
        {
            let client = server
                .client_builder()
                .on_builder(|builder| builder.store_config(store_config))
                .build()
                .await;
            let room = client.get_room(&room_id).unwrap();

            assert_matches!(room.new_latest_event(), LatestEventValue::Remote(_));
        }
    }
}
476
477struct LatestEventValueBuilder;
479
480impl LatestEventValueBuilder {
481 async fn new_remote(
483 room_event_cache: &RoomEventCache,
484 weak_room: &WeakRoom,
485 ) -> LatestEventValue {
486 let room = weak_room.get();
489 let power_levels = match &room {
490 Some(room) => {
491 let power_levels = room.power_levels().await.ok();
492
493 Some(room.own_user_id()).zip(power_levels)
494 }
495
496 None => None,
497 };
498
499 Self::new_remote_with_power_levels(room_event_cache, &power_levels).await
500 }
501
502 async fn new_remote_with_power_levels(
505 room_event_cache: &RoomEventCache,
506 power_levels: &Option<(&UserId, RoomPowerLevels)>,
507 ) -> LatestEventValue {
508 room_event_cache
509 .rfind_map_event_in_memory_by(|event| {
510 filter_timeline_event(event, power_levels).then(|| event.clone())
511 })
512 .await
513 .map(LatestEventValue::Remote)
514 .unwrap_or_default()
515 }
516
517 async fn new_local(
520 send_queue_update: &RoomSendQueueUpdate,
521 buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
522 room_event_cache: &RoomEventCache,
523 power_levels: &Option<(&UserId, RoomPowerLevels)>,
524 ) -> LatestEventValue {
525 use crate::send_queue::{LocalEcho, LocalEchoContent};
526
527 match send_queue_update {
528 RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
532 transaction_id,
533 content: local_echo_content,
534 }) => match local_echo_content {
535 LocalEchoContent::Event { serialized_event: serialized_event_content, .. } => {
536 match serialized_event_content.deserialize() {
537 Ok(content) => {
538 if filter_any_message_like_event_content(content) {
539 let value =
540 LatestEventValue::LocalIsSending(LocalLatestEventValue {
541 timestamp: MilliSecondsSinceUnixEpoch::now(),
542 content: serialized_event_content.clone(),
543 });
544
545 buffer_of_values_for_local_events
546 .push(transaction_id.to_owned(), value.clone());
547
548 value
549 } else {
550 LatestEventValue::None
551 }
552 }
553
554 Err(error) => {
555 error!(?error, "Failed to deserialize an event from `RoomSendQueueUpdate::NewLocalEvent`");
556
557 LatestEventValue::None
558 }
559 }
560 }
561
562 LocalEchoContent::React { .. } => LatestEventValue::None,
563 },
564
565 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
570 if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
571 buffer_of_values_for_local_events.remove(position);
572 }
573
574 Self::new_local_or_remote(
575 buffer_of_values_for_local_events,
576 room_event_cache,
577 power_levels,
578 )
579 .await
580 }
581
582 RoomSendQueueUpdate::SentEvent { transaction_id, .. } => {
591 let position =
592 buffer_of_values_for_local_events.mark_is_sending_after(transaction_id);
593
594 let value = Self::new_local_or_remote(
607 buffer_of_values_for_local_events,
608 room_event_cache,
609 power_levels,
610 )
611 .await;
612
613 if let Some(position) = position {
614 buffer_of_values_for_local_events.remove(position);
615 }
616
617 value
618 }
619
620 RoomSendQueueUpdate::ReplacedLocalEvent {
625 transaction_id,
626 new_content: new_serialized_event_content,
627 } => {
628 if let Some(position) = buffer_of_values_for_local_events.position(transaction_id) {
629 match new_serialized_event_content.deserialize() {
630 Ok(content) => {
631 if filter_any_message_like_event_content(content) {
632 buffer_of_values_for_local_events.replace_content(
633 position,
634 new_serialized_event_content.clone(),
635 );
636 } else {
637 buffer_of_values_for_local_events.remove(position);
638 }
639 }
640
641 Err(error) => {
642 error!(?error, "Failed to deserialize an event from `RoomSendQueueUpdate::ReplacedLocalEvent`");
643
644 return LatestEventValue::None;
645 }
646 }
647 }
648
649 Self::new_local_or_remote(
650 buffer_of_values_for_local_events,
651 room_event_cache,
652 power_levels,
653 )
654 .await
655 }
656
657 RoomSendQueueUpdate::SendError { transaction_id, .. } => {
662 buffer_of_values_for_local_events.mark_cannot_be_sent_from(transaction_id);
663
664 Self::new_local_or_remote(
665 buffer_of_values_for_local_events,
666 room_event_cache,
667 power_levels,
668 )
669 .await
670 }
671
672 RoomSendQueueUpdate::RetryEvent { transaction_id } => {
677 buffer_of_values_for_local_events.mark_is_sending_from(transaction_id);
678
679 Self::new_local_or_remote(
680 buffer_of_values_for_local_events,
681 room_event_cache,
682 power_levels,
683 )
684 .await
685 }
686
687 RoomSendQueueUpdate::MediaUpload { .. } => LatestEventValue::None,
691 }
692 }
693
694 async fn new_local_or_remote(
701 buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents,
702 room_event_cache: &RoomEventCache,
703 power_levels: &Option<(&UserId, RoomPowerLevels)>,
704 ) -> LatestEventValue {
705 if let Some(value) = buffer_of_values_for_local_events.last() {
706 value.clone()
707 } else {
708 Self::new_remote_with_power_levels(room_event_cache, power_levels).await
709 }
710 }
711}
712
713#[derive(Debug)]
753struct LatestEventValuesForLocalEvents {
754 buffer: Vec<(OwnedTransactionId, LatestEventValue)>,
755}
756
757impl LatestEventValuesForLocalEvents {
758 fn new() -> Self {
760 Self { buffer: Vec::with_capacity(2) }
761 }
762
763 fn is_empty(&self) -> bool {
765 self.buffer.is_empty()
766 }
767
768 fn last(&self) -> Option<&LatestEventValue> {
770 self.buffer.last().map(|(_, value)| value)
771 }
772
773 fn position(&self, transaction_id: &TransactionId) -> Option<usize> {
775 self.buffer
776 .iter()
777 .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
778 }
779
780 fn push(&mut self, transaction_id: OwnedTransactionId, value: LatestEventValue) {
787 assert!(
788 matches!(
789 value,
790 LatestEventValue::LocalIsSending(_) | LatestEventValue::LocalCannotBeSent(_)
791 ),
792 "`value` must be either `LocalIsSending` or `LocalCannotBeSent`"
793 );
794
795 self.buffer.push((transaction_id, value));
796 }
797
798 fn replace_content(&mut self, position: usize, new_content: SerializableEventContent) {
808 let (_, value) = self.buffer.get_mut(position).expect("`position` must be valid");
809
810 match value {
811 LatestEventValue::LocalIsSending(LocalLatestEventValue { content, .. }) => {
812 *content = new_content;
813 }
814
815 LatestEventValue::LocalCannotBeSent(LocalLatestEventValue { content, .. }) => {
816 *content = new_content;
817 }
818
819 _ => panic!("`value` must be either `LocalIsSending` or `LocalCannotBeSent`"),
820 }
821 }
822
823 fn remove(&mut self, position: usize) -> (OwnedTransactionId, LatestEventValue) {
829 self.buffer.remove(position)
830 }
831
832 fn mark_cannot_be_sent_from(&mut self, transaction_id: &TransactionId) {
835 let mut values = self.buffer.iter_mut();
836
837 if let Some(first_value_to_wedge) = values
838 .by_ref()
839 .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
840 {
841 for (_, value_to_wedge) in once(first_value_to_wedge).chain(values) {
843 if let LatestEventValue::LocalIsSending(content) = value_to_wedge {
844 *value_to_wedge = LatestEventValue::LocalCannotBeSent(content.clone());
845 }
846 }
847 }
848 }
849
850 fn mark_is_sending_from(&mut self, transaction_id: &TransactionId) {
853 let mut values = self.buffer.iter_mut();
854
855 if let Some(first_value_to_unwedge) = values
856 .by_ref()
857 .find(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
858 {
859 for (_, value_to_unwedge) in once(first_value_to_unwedge).chain(values) {
861 if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
862 *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
863 }
864 }
865 }
866 }
867
868 fn mark_is_sending_after(&mut self, transaction_id: &TransactionId) -> Option<usize> {
875 let mut values = self.buffer.iter_mut();
876
877 if let Some(position) = values
878 .by_ref()
879 .position(|(transaction_id_candidate, _)| transaction_id == transaction_id_candidate)
880 {
881 for (_, value_to_unwedge) in values {
883 if let LatestEventValue::LocalCannotBeSent(content) = value_to_unwedge {
884 *value_to_unwedge = LatestEventValue::LocalIsSending(content.clone());
885 }
886 }
887
888 Some(position)
889 } else {
890 None
891 }
892 }
893}
894
895fn filter_timeline_event(
896 event: &TimelineEvent,
897 power_levels: &Option<(&UserId, RoomPowerLevels)>,
898) -> bool {
899 let event = match event.raw().deserialize() {
902 Ok(event) => event,
903 Err(error) => {
904 error!(
905 ?error,
906 "Failed to deserialize the event when looking for a suitable latest event"
907 );
908
909 return false;
910 }
911 };
912
913 match event {
914 AnySyncTimelineEvent::MessageLike(message_like_event) => {
915 match message_like_event.original_content() {
916 Some(any_message_like_event_content) => {
917 filter_any_message_like_event_content(any_message_like_event_content)
918 }
919
920 None => true,
922 }
923 }
924
925 AnySyncTimelineEvent::State(state) => {
927 if let AnySyncStateEvent::RoomMember(member) = state {
930 if matches!(member.membership(), MembershipState::Knock) {
931 let can_accept_or_decline_knocks = match power_levels {
932 Some((own_user_id, room_power_levels)) => {
933 room_power_levels.user_can_invite(own_user_id)
934 || room_power_levels.user_can_kick(own_user_id)
935 }
936 _ => false,
937 };
938
939 if can_accept_or_decline_knocks {
942 return matches!(member, SyncStateEvent::Original(_));
945 }
946 }
947 }
948
949 false
950 }
951 }
952}
953
954fn filter_any_message_like_event_content(event: AnyMessageLikeEventContent) -> bool {
955 match event {
956 AnyMessageLikeEventContent::RoomMessage(message) => {
957 if let MessageType::VerificationRequest(_) = message.msgtype {
959 return false;
960 }
961
962 let is_replacement = message.relates_to.as_ref().is_some_and(|relates_to| {
968 if let Some(relation_type) = relates_to.rel_type() {
969 relation_type == RelationType::Replacement
970 } else {
971 false
972 }
973 });
974
975 !is_replacement
976 }
977
978 AnyMessageLikeEventContent::UnstablePollStart(_)
979 | AnyMessageLikeEventContent::CallInvite(_)
980 | AnyMessageLikeEventContent::CallNotify(_)
981 | AnyMessageLikeEventContent::Sticker(_) => true,
982
983 AnyMessageLikeEventContent::RoomEncrypted(_) => false,
985
986 _ => false,
988 }
989}
990
#[cfg(test)]
mod tests_latest_event_content {
    use std::ops::Not;

    use matrix_sdk_test::event_factory::EventFactory;
    use ruma::{event_id, events::room::message::RoomMessageEventContent, user_id};

    use super::filter_timeline_event;

    /// Assert that the event built by a block is, or is not, accepted by
    /// `filter_timeline_event` when no power levels are provided.
    ///
    /// The block receives an `EventFactory` (bound to the given identifier)
    /// whose sender is `@mnt_io:matrix.org`.
    macro_rules! assert_latest_event_content {
        ( event | $event_factory:ident | $event_builder:block
          is a candidate ) => {
            assert_latest_event_content!(@_ | $event_factory | $event_builder, true);
        };

        ( event | $event_factory:ident | $event_builder:block
          is not a candidate ) => {
            assert_latest_event_content!(@_ | $event_factory | $event_builder, false);
        };

        // Internal rule shared by the two public forms above.
        ( @_ | $event_factory:ident | $event_builder:block, $expect:literal ) => {
            let event = {
                let $event_factory = EventFactory::new().sender(user_id!("@mnt_io:matrix.org"));
                $event_builder
            };

            assert_eq!(filter_timeline_event(&event, &None), $expect);
        };
    }

    /// A plain room message is a candidate.
    #[test]
    fn test_room_message() {
        assert_latest_event_content!(
            event | event_factory | { event_factory.text_msg("hello").into_event() }
            is a candidate
        );
    }

    /// A redacted room message is a candidate.
    #[test]
    fn test_redacted() {
        use ruma::events::room::message::RedactedRoomMessageEventContent;

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .redacted(user_id!("@mnt_io:matrix.org"), RedactedRoomMessageEventContent::new())
                    .into_event()
            }
            is a candidate
        );
    }

    /// An edit of a room message is not a candidate.
    #[test]
    fn test_room_message_replacement() {
        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .text_msg("bonjour")
                    .edit(event_id!("$ev0"), RoomMessageEventContent::text_plain("hello").into())
                    .into_event()
            }
            is not a candidate
        );
    }

    /// A poll start is a candidate.
    #[test]
    fn test_poll() {
        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .poll_start("the people need to know", "comté > gruyère", vec!["yes", "oui"])
                    .into_event()
            }
            is a candidate
        );
    }

    /// A call invite is a candidate.
    #[test]
    fn test_call_invite() {
        use ruma::{events::call::SessionDescription, OwnedVoipId, UInt, VoipVersionId};

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .call_invite(
                        OwnedVoipId::from("vvooiipp".to_owned()),
                        UInt::from(1234u32),
                        SessionDescription::new("type".to_owned(), "sdp".to_owned()),
                        VoipVersionId::V1,
                    )
                    .into_event()
            }
            is a candidate
        );
    }

    /// A call notification is a candidate.
    #[test]
    fn test_call_notify() {
        use ruma::events::{
            call::notify::{ApplicationType, NotifyType},
            Mentions,
        };

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .call_notify(
                        "call_id".to_owned(),
                        ApplicationType::Call,
                        NotifyType::Ring,
                        Mentions::new(),
                    )
                    .into_event()
            }
            is a candidate
        );
    }

    /// A sticker is a candidate.
    #[test]
    fn test_sticker() {
        use ruma::{events::room::ImageInfo, OwnedMxcUri};

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .sticker("wink wink", ImageInfo::new(), OwnedMxcUri::from("mxc://foo/bar"))
                    .into_event()
            }
            is a candidate
        );
    }

    /// An encrypted event is not a candidate.
    #[test]
    fn test_encrypted_room_message() {
        use ruma::events::room::encrypted::{
            EncryptedEventScheme, MegolmV1AesSha2ContentInit, RoomEncryptedEventContent,
        };

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .event(RoomEncryptedEventContent::new(
                        EncryptedEventScheme::MegolmV1AesSha2(
                            MegolmV1AesSha2ContentInit {
                                ciphertext: "cipher".to_owned(),
                                sender_key: "sender_key".to_owned(),
                                device_id: "device_id".into(),
                                session_id: "session_id".to_owned(),
                            }
                            .into(),
                        ),
                        None,
                    ))
                    .into_event()
            }
            is not a candidate
        );
    }

    /// A reaction is not a candidate.
    #[test]
    fn test_reaction() {
        assert_latest_event_content!(
            event | event_factory | { event_factory.reaction(event_id!("$ev0"), "+1").into_event() }
            is not a candidate
        );
    }

    /// A state event that is not a knock (here a room topic) is not a
    /// candidate.
    #[test]
    fn test_state_event() {
        assert_latest_event_content!(
            event | event_factory | { event_factory.room_topic("new room topic").into_event() }
            is not a candidate
        );
    }

    /// Without power levels, a knock is not a candidate.
    #[test]
    fn test_knocked_state_event_without_power_levels() {
        use ruma::events::room::member::MembershipState;

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .member(user_id!("@other_mnt_io:server.name"))
                    .membership(MembershipState::Knock)
                    .into_event()
            }
            is not a candidate
        );
    }

    /// With power levels, a knock is a candidate as soon as the current user
    /// can invite (accept the knock) or kick (decline the knock).
    #[test]
    fn test_knocked_state_event_with_power_levels() {
        use ruma::{
            events::room::{
                member::MembershipState,
                power_levels::{RoomPowerLevels, RoomPowerLevelsSource},
            },
            room_version_rules::AuthorizationRules,
        };

        let user_id = user_id!("@mnt_io:matrix.org");
        let other_user_id = user_id!("@other_mnt_io:server.name");
        let event = EventFactory::new()
            .sender(user_id)
            .member(other_user_id)
            .membership(MembershipState::Knock)
            .into_event();

        // The current user has a power level of 5.
        let mut room_power_levels =
            RoomPowerLevels::new(RoomPowerLevelsSource::None, &AuthorizationRules::V1, []);
        room_power_levels.users_default = 5.into();

        // Run the filter with the given levels required to invite and to kick.
        let is_candidate_with = |invite: i32, kick: i32| {
            let mut room_power_levels = room_power_levels.clone();
            room_power_levels.invite = invite.into();
            room_power_levels.kick = kick.into();

            filter_timeline_event(&event, &Some((user_id, room_power_levels)))
        };

        assert!(is_candidate_with(10, 10).not(), "cannot accept, cannot decline");
        assert!(is_candidate_with(0, 10), "can accept, cannot decline");
        assert!(is_candidate_with(10, 0), "cannot accept, can decline");
        assert!(is_candidate_with(0, 0), "can accept, can decline");
    }

    /// A verification request sent as a room message is not a candidate.
    #[test]
    fn test_room_message_verification_request() {
        use ruma::{
            events::room::message::{KeyVerificationRequestEventContent, MessageType},
            OwnedDeviceId,
        };

        assert_latest_event_content!(
            event | event_factory | {
                event_factory
                    .event(RoomMessageEventContent::new(MessageType::VerificationRequest(
                        KeyVerificationRequestEventContent::new(
                            "body".to_owned(),
                            vec![],
                            OwnedDeviceId::from("device_id"),
                            user_id!("@user:server.name").to_owned(),
                        ),
                    )))
                    .into_event()
            }
            is not a candidate
        );
    }
}
1262
1263#[cfg(test)]
1264mod tests_latest_event_values_for_local_events {
1265 use assert_matches::assert_matches;
1266 use ruma::{
1267 events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
1268 serde::Raw,
1269 MilliSecondsSinceUnixEpoch, OwnedTransactionId,
1270 };
1271 use serde_json::json;
1272
1273 use super::{
1274 LatestEventValue, LatestEventValuesForLocalEvents, LocalLatestEventValue,
1275 RemoteLatestEventValue, SerializableEventContent,
1276 };
1277
1278 fn remote_room_message(body: &str) -> RemoteLatestEventValue {
1279 RemoteLatestEventValue::from_plaintext(
1280 Raw::from_json_string(
1281 json!({
1282 "content": RoomMessageEventContent::text_plain(body),
1283 "type": "m.room.message",
1284 "event_id": "$ev0",
1285 "room_id": "!r0",
1286 "origin_server_ts": 42,
1287 "sender": "@mnt_io:matrix.org",
1288 })
1289 .to_string(),
1290 )
1291 .unwrap(),
1292 )
1293 }
1294
1295 fn local_room_message(body: &str) -> LocalLatestEventValue {
1296 LocalLatestEventValue {
1297 timestamp: MilliSecondsSinceUnixEpoch::now(),
1298 content: SerializableEventContent::from_raw(
1299 Raw::new(&AnyMessageLikeEventContent::RoomMessage(
1300 RoomMessageEventContent::text_plain(body),
1301 ))
1302 .unwrap(),
1303 "m.room.message".to_owned(),
1304 ),
1305 }
1306 }
1307
1308 #[test]
1309 fn test_last() {
1310 let mut buffer = LatestEventValuesForLocalEvents::new();
1311
1312 assert!(buffer.last().is_none());
1313
1314 buffer.push(
1315 OwnedTransactionId::from("txnid"),
1316 LatestEventValue::LocalIsSending(local_room_message("tome")),
1317 );
1318
1319 assert_matches!(buffer.last(), Some(LatestEventValue::LocalIsSending(_)));
1320 }
1321
1322 #[test]
1323 fn test_position() {
1324 let mut buffer = LatestEventValuesForLocalEvents::new();
1325 let transaction_id = OwnedTransactionId::from("txnid");
1326
1327 assert!(buffer.position(&transaction_id).is_none());
1328
1329 buffer.push(
1330 transaction_id.clone(),
1331 LatestEventValue::LocalIsSending(local_room_message("raclette")),
1332 );
1333 buffer.push(
1334 OwnedTransactionId::from("othertxnid"),
1335 LatestEventValue::LocalIsSending(local_room_message("tome")),
1336 );
1337
1338 assert_eq!(buffer.position(&transaction_id), Some(0));
1339 }
1340
1341 #[test]
1342 #[should_panic]
1343 fn test_push_none() {
1344 let mut buffer = LatestEventValuesForLocalEvents::new();
1345
1346 buffer.push(OwnedTransactionId::from("txnid"), LatestEventValue::None);
1347 }
1348
1349 #[test]
1350 #[should_panic]
1351 fn test_push_remote() {
1352 let mut buffer = LatestEventValuesForLocalEvents::new();
1353
1354 buffer.push(
1355 OwnedTransactionId::from("txnid"),
1356 LatestEventValue::Remote(remote_room_message("tome")),
1357 );
1358 }
1359
/// Pushing local values (`LocalIsSending` and `LocalCannotBeSent`) must not
/// panic, and the values must be stored in insertion order.
#[test]
fn test_push_local() {
    let mut buffer = LatestEventValuesForLocalEvents::new();

    buffer.push(
        OwnedTransactionId::from("txnid0"),
        LatestEventValue::LocalIsSending(local_room_message("tome")),
    );
    buffer.push(
        OwnedTransactionId::from("txnid1"),
        LatestEventValue::LocalCannotBeSent(local_room_message("raclette")),
    );

    // Reaching this point already proves that no panic happened. Also verify
    // that both values were really recorded, in the order they were pushed,
    // so that this test cannot pass if `push` silently drops a value.
    assert_eq!(buffer.buffer.len(), 2);
    assert_matches!(buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
    assert_matches!(buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
}
1375
/// `replace_content` swaps the content of the value at the given index while
/// keeping its `LocalIsSending` state.
#[test]
fn test_replace_content() {
    let mut local_values = LatestEventValuesForLocalEvents::new();

    let transaction_id = OwnedTransactionId::from("txnid0");
    let initial_value = LatestEventValue::LocalIsSending(local_room_message("gruyère"));
    local_values.push(transaction_id, initial_value);

    // Only the content of the freshly built value is of interest here.
    let replacement = local_room_message("comté").content;
    local_values.replace_content(0, replacement);

    assert_matches!(
        local_values.last(),
        Some(LatestEventValue::LocalIsSending(stored)) => {
            let deserialized = stored.content.deserialize().unwrap();

            assert_matches!(
                deserialized,
                AnyMessageLikeEventContent::RoomMessage(message) => {
                    assert_eq!(message.body(), "comté");
                }
            );
        }
    );
}
1401
/// `remove` drops the value at the given index; removing the only value
/// leaves the buffer without a last value.
#[test]
fn test_remove() {
    let mut local_values = LatestEventValuesForLocalEvents::new();

    let transaction_id = OwnedTransactionId::from("txnid");
    let value = LatestEventValue::LocalIsSending(local_room_message("gryuère"));
    local_values.push(transaction_id, value);

    // One value is present.
    assert!(local_values.last().is_some());

    local_values.remove(0);

    // And now it is gone.
    assert!(local_values.last().is_none());
}
1417
/// `mark_cannot_be_sent_from` turns the targeted value and every value pushed
/// after it into `LocalCannotBeSent`, leaving the earlier one untouched.
#[test]
fn test_mark_cannot_be_sent_from() {
    let mut local_values = LatestEventValuesForLocalEvents::new();
    let target = OwnedTransactionId::from("txnid1");

    // Three values, all being sent.
    for (transaction_id, body) in [
        (OwnedTransactionId::from("txnid0"), "gruyère"),
        (target.clone(), "brigand"),
        (OwnedTransactionId::from("txnid2"), "raclette"),
    ] {
        local_values
            .push(transaction_id, LatestEventValue::LocalIsSending(local_room_message(body)));
    }

    local_values.mark_cannot_be_sent_from(&target);

    let values = &local_values.buffer;

    assert_eq!(values.len(), 3);
    // Before the target: unchanged.
    assert_matches!(values[0].1, LatestEventValue::LocalIsSending(_));
    // The target and what follows: marked.
    assert_matches!(values[1].1, LatestEventValue::LocalCannotBeSent(_));
    assert_matches!(values[2].1, LatestEventValue::LocalCannotBeSent(_));
}
1445
/// `mark_is_sending_from` turns the targeted value and every value pushed
/// after it into `LocalIsSending`, leaving the earlier one untouched.
#[test]
fn test_mark_is_sending_from() {
    let mut local_values = LatestEventValuesForLocalEvents::new();
    let target = OwnedTransactionId::from("txnid1");

    // Three values, none of which can be sent.
    for (transaction_id, body) in [
        (OwnedTransactionId::from("txnid0"), "gruyère"),
        (target.clone(), "brigand"),
        (OwnedTransactionId::from("txnid2"), "raclette"),
    ] {
        local_values
            .push(transaction_id, LatestEventValue::LocalCannotBeSent(local_room_message(body)));
    }

    local_values.mark_is_sending_from(&target);

    let values = &local_values.buffer;

    assert_eq!(values.len(), 3);
    // Before the target: unchanged.
    assert_matches!(values[0].1, LatestEventValue::LocalCannotBeSent(_));
    // The target and what follows: marked.
    assert_matches!(values[1].1, LatestEventValue::LocalIsSending(_));
    assert_matches!(values[2].1, LatestEventValue::LocalIsSending(_));
}
1473
/// `mark_is_sending_after` turns only the values pushed after the targeted
/// one into `LocalIsSending`; the target itself and earlier values are left
/// untouched.
#[test]
fn test_mark_is_sending_after() {
    let mut local_values = LatestEventValuesForLocalEvents::new();
    let target = OwnedTransactionId::from("txnid1");

    // Three values, none of which can be sent.
    for (transaction_id, body) in [
        (OwnedTransactionId::from("txnid0"), "gruyère"),
        (target.clone(), "brigand"),
        (OwnedTransactionId::from("txnid2"), "raclette"),
    ] {
        local_values
            .push(transaction_id, LatestEventValue::LocalCannotBeSent(local_room_message(body)));
    }

    local_values.mark_is_sending_after(&target);

    let values = &local_values.buffer;

    assert_eq!(values.len(), 3);
    // Up to and including the target: unchanged.
    assert_matches!(values[0].1, LatestEventValue::LocalCannotBeSent(_));
    assert_matches!(values[1].1, LatestEventValue::LocalCannotBeSent(_));
    // Strictly after the target: marked.
    assert_matches!(values[2].1, LatestEventValue::LocalIsSending(_));
}
1501}
1502
1503#[cfg(all(not(target_family = "wasm"), test))]
1504mod tests_latest_event_value_builder {
1505 use std::sync::Arc;
1506
1507 use assert_matches::assert_matches;
1508 use matrix_sdk_base::{
1509 deserialized_responses::TimelineEventKind,
1510 linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
1511 store::SerializableEventContent,
1512 RoomState,
1513 };
1514 use matrix_sdk_test::{async_test, event_factory::EventFactory};
1515 use ruma::{
1516 event_id,
1517 events::{
1518 reaction::ReactionEventContent, relation::Annotation,
1519 room::message::RoomMessageEventContent, AnyMessageLikeEventContent,
1520 AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent,
1521 },
1522 room_id, user_id, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedTransactionId,
1523 };
1524
1525 use super::{
1526 LatestEventValue, LatestEventValueBuilder, LatestEventValuesForLocalEvents,
1527 RemoteLatestEventValue, RoomEventCache, RoomSendQueueUpdate,
1528 };
1529 use crate::{
1530 client::WeakClient,
1531 room::WeakRoom,
1532 send_queue::{AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle},
1533 test_utils::mocks::MatrixMockServer,
1534 Client, Error,
1535 };
1536
/// Asserts that a `LatestEventValue` is a `Remote` plain-text event which
/// deserializes to an original `m.room.message` with the expected body.
///
/// Usage: `assert_remote_value_matches_room_message_with_body!(value => with
/// body = "…")`.
macro_rules! assert_remote_value_matches_room_message_with_body {
    ( $value:expr => with body = $expected_body:expr ) => {
        assert_matches!(
            $value,
            LatestEventValue::Remote(RemoteLatestEventValue {
                kind: TimelineEventKind::PlainText { event: raw_event },
                ..
            }) => {
                let timeline_event = raw_event.deserialize().unwrap();

                assert_matches!(
                    timeline_event,
                    AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
                        SyncMessageLikeEvent::Original(original_message)
                    )) => {
                        assert_eq!(original_message.content.body(), $expected_body);
                    }
                );
            }
        );
    };
}
1556
/// Asserts that a `LatestEventValue` is the given local variant (passed as a
/// path, e.g. `LatestEventValue::LocalIsSending`) and that its content
/// deserializes to an `m.room.message` with the expected body.
///
/// Usage: `assert_local_value_matches_room_message_with_body!(value,
/// LatestEventValue::LocalIsSending => with body = "…")`.
macro_rules! assert_local_value_matches_room_message_with_body {
    ( $value:expr, $variant:path => with body = $expected_body:expr ) => {
        assert_matches!(
            $value,
            $variant(local_value) => {
                let deserialized_content = local_value.content.deserialize().unwrap();

                assert_matches!(
                    deserialized_content,
                    AnyMessageLikeEventContent::RoomMessage(room_message) => {
                        assert_eq!(room_message.body(), $expected_body);
                    }
                );
            }
        );
    };
}
1572
1573 #[async_test]
1574 async fn test_remote_is_scanning_event_backwards_from_event_cache() {
1575 let room_id = room_id!("!r0");
1576 let user_id = user_id!("@mnt_io:matrix.org");
1577 let event_factory = EventFactory::new().sender(user_id).room(room_id);
1578 let event_id_0 = event_id!("$ev0");
1579 let event_id_1 = event_id!("$ev1");
1580 let event_id_2 = event_id!("$ev2");
1581
1582 let server = MatrixMockServer::new().await;
1583 let client = server.client_builder().build().await;
1584
1585 {
1587 client.base_client().get_or_create_room(room_id, RoomState::Joined);
1589
1590 client
1592 .event_cache_store()
1593 .lock()
1594 .await
1595 .unwrap()
1596 .handle_linked_chunk_updates(
1597 LinkedChunkId::Room(room_id),
1598 vec![
1599 Update::NewItemsChunk {
1600 previous: None,
1601 new: ChunkIdentifier::new(0),
1602 next: None,
1603 },
1604 Update::PushItems {
1605 at: Position::new(ChunkIdentifier::new(0), 0),
1606 items: vec![
1607 event_factory.text_msg("hello").event_id(event_id_0).into(),
1609 event_factory.text_msg("world").event_id(event_id_1).into(),
1611 event_factory
1613 .room_topic("new room topic")
1614 .event_id(event_id_2)
1615 .into(),
1616 ],
1617 },
1618 ],
1619 )
1620 .await
1621 .unwrap();
1622 }
1623
1624 let event_cache = client.event_cache();
1625 event_cache.subscribe().unwrap();
1626
1627 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
1628 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned());
1629
1630 assert_remote_value_matches_room_message_with_body!(
1631 LatestEventValueBuilder::new_remote(&room_event_cache, &weak_room).await => with body = "world"
1635 );
1636 }
1637
1638 async fn local_prelude() -> (Client, OwnedRoomId, RoomSendQueue, RoomEventCache) {
1639 let room_id = room_id!("!r0").to_owned();
1640
1641 let server = MatrixMockServer::new().await;
1642 let client = server.client_builder().build().await;
1643 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1644 let room = client.get_room(&room_id).unwrap();
1645
1646 let event_cache = client.event_cache();
1647 event_cache.subscribe().unwrap();
1648
1649 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
1650
1651 let send_queue = client.send_queue();
1652 let room_send_queue = send_queue.for_room(room);
1653
1654 (client, room_id, room_send_queue, room_event_cache)
1655 }
1656
1657 fn new_local_echo_content(
1658 room_send_queue: &RoomSendQueue,
1659 transaction_id: &OwnedTransactionId,
1660 body: &str,
1661 ) -> LocalEchoContent {
1662 LocalEchoContent::Event {
1663 serialized_event: SerializableEventContent::new(
1664 &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
1665 )
1666 .unwrap(),
1667 send_handle: SendHandle::new(
1668 room_send_queue.clone(),
1669 transaction_id.clone(),
1670 MilliSecondsSinceUnixEpoch::now(),
1671 ),
1672 send_error: None,
1673 }
1674 }
1675
1676 #[async_test]
1677 async fn test_local_new_local_event() {
1678 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1679
1680 let mut buffer = LatestEventValuesForLocalEvents::new();
1681
1682 {
1684 let transaction_id = OwnedTransactionId::from("txnid0");
1685 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1686
1687 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1688
1689 assert_local_value_matches_room_message_with_body!(
1691 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1692 LatestEventValue::LocalIsSending => with body = "A"
1693 );
1694 }
1695
1696 {
1698 let transaction_id = OwnedTransactionId::from("txnid1");
1699 let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
1700
1701 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content });
1702
1703 assert_local_value_matches_room_message_with_body!(
1705 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1706 LatestEventValue::LocalIsSending => with body = "B"
1707 );
1708 }
1709
1710 assert_eq!(buffer.buffer.len(), 2);
1711 }
1712
1713 #[async_test]
1714 async fn test_local_cancelled_local_event() {
1715 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1716
1717 let mut buffer = LatestEventValuesForLocalEvents::new();
1718 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1719 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1720 let transaction_id_2 = OwnedTransactionId::from("txnid2");
1721
1722 {
1724 for (transaction_id, body) in
1725 [(&transaction_id_0, "A"), (&transaction_id_1, "B"), (&transaction_id_2, "C")]
1726 {
1727 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
1728
1729 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1730 transaction_id: transaction_id.clone(),
1731 content,
1732 });
1733
1734 assert_local_value_matches_room_message_with_body!(
1736 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1737 LatestEventValue::LocalIsSending => with body = body
1738 );
1739 }
1740
1741 assert_eq!(buffer.buffer.len(), 3);
1742 }
1743
1744 {
1747 let update = RoomSendQueueUpdate::CancelledLocalEvent {
1748 transaction_id: transaction_id_1.clone(),
1749 };
1750
1751 assert_local_value_matches_room_message_with_body!(
1754 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1755 LatestEventValue::LocalIsSending => with body = "C"
1756 );
1757
1758 assert_eq!(buffer.buffer.len(), 2);
1759 }
1760
1761 {
1764 let update = RoomSendQueueUpdate::CancelledLocalEvent {
1765 transaction_id: transaction_id_2.clone(),
1766 };
1767
1768 assert_local_value_matches_room_message_with_body!(
1771 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1772 LatestEventValue::LocalIsSending => with body = "A"
1773 );
1774
1775 assert_eq!(buffer.buffer.len(), 1);
1776 }
1777
1778 {
1783 let update =
1784 RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: transaction_id_0 };
1785
1786 assert_matches!(
1788 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None)
1789 .await,
1790 LatestEventValue::None
1791 );
1792
1793 assert!(buffer.buffer.is_empty());
1794 }
1795 }
1796
1797 #[async_test]
1798 async fn test_local_sent_event() {
1799 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1800
1801 let mut buffer = LatestEventValuesForLocalEvents::new();
1802 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1803 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1804
1805 {
1807 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
1808 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
1809
1810 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1811 transaction_id: transaction_id.clone(),
1812 content,
1813 });
1814
1815 assert_local_value_matches_room_message_with_body!(
1817 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1818 LatestEventValue::LocalIsSending => with body = body
1819 );
1820 }
1821
1822 assert_eq!(buffer.buffer.len(), 2);
1823 }
1824
1825 {
1828 let update = RoomSendQueueUpdate::SentEvent {
1829 transaction_id: transaction_id_0.clone(),
1830 event_id: event_id!("$ev0").to_owned(),
1831 };
1832
1833 assert_local_value_matches_room_message_with_body!(
1836 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1837 LatestEventValue::LocalIsSending => with body = "B"
1838 );
1839
1840 assert_eq!(buffer.buffer.len(), 1);
1841 }
1842
1843 {
1846 let update = RoomSendQueueUpdate::SentEvent {
1847 transaction_id: transaction_id_1,
1848 event_id: event_id!("$ev1").to_owned(),
1849 };
1850
1851 assert_local_value_matches_room_message_with_body!(
1853 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1854 LatestEventValue::LocalIsSending => with body = "B"
1855 );
1856
1857 assert!(buffer.buffer.is_empty());
1858 }
1859 }
1860
1861 #[async_test]
1862 async fn test_local_replaced_local_event() {
1863 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1864
1865 let mut buffer = LatestEventValuesForLocalEvents::new();
1866 let transaction_id_0 = OwnedTransactionId::from("txnid0");
1867 let transaction_id_1 = OwnedTransactionId::from("txnid1");
1868
1869 {
1871 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
1872 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
1873
1874 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1875 transaction_id: transaction_id.clone(),
1876 content,
1877 });
1878
1879 assert_local_value_matches_room_message_with_body!(
1881 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1882 LatestEventValue::LocalIsSending => with body = body
1883 );
1884 }
1885
1886 assert_eq!(buffer.buffer.len(), 2);
1887 }
1888
1889 {
1892 let transaction_id = &transaction_id_0;
1893 let LocalEchoContent::Event { serialized_event: new_content, .. } =
1894 new_local_echo_content(&room_send_queue, transaction_id, "A.")
1895 else {
1896 panic!("oopsy");
1897 };
1898
1899 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
1900 transaction_id: transaction_id.clone(),
1901 new_content,
1902 };
1903
1904 assert_local_value_matches_room_message_with_body!(
1907 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1908 LatestEventValue::LocalIsSending => with body = "B"
1909 );
1910
1911 assert_eq!(buffer.buffer.len(), 2);
1912 }
1913
1914 {
1917 let transaction_id = &transaction_id_1;
1918 let LocalEchoContent::Event { serialized_event: new_content, .. } =
1919 new_local_echo_content(&room_send_queue, transaction_id, "B.")
1920 else {
1921 panic!("oopsy");
1922 };
1923
1924 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
1925 transaction_id: transaction_id.clone(),
1926 new_content,
1927 };
1928
1929 assert_local_value_matches_room_message_with_body!(
1932 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1933 LatestEventValue::LocalIsSending => with body = "B."
1934 );
1935
1936 assert_eq!(buffer.buffer.len(), 2);
1937 }
1938 }
1939
1940 #[async_test]
1941 async fn test_local_replaced_local_event_by_a_non_suitable_event() {
1942 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1943
1944 let mut buffer = LatestEventValuesForLocalEvents::new();
1945 let transaction_id = OwnedTransactionId::from("txnid0");
1946
1947 {
1949 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
1950
1951 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
1952 transaction_id: transaction_id.clone(),
1953 content,
1954 });
1955
1956 assert_local_value_matches_room_message_with_body!(
1958 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
1959 LatestEventValue::LocalIsSending => with body = "A"
1960 );
1961
1962 assert_eq!(buffer.buffer.len(), 1);
1963 }
1964
1965 {
1970 let new_content = SerializableEventContent::new(&AnyMessageLikeEventContent::Reaction(
1971 ReactionEventContent::new(Annotation::new(
1972 event_id!("$ev0").to_owned(),
1973 "+1".to_owned(),
1974 )),
1975 ))
1976 .unwrap();
1977
1978 let update = RoomSendQueueUpdate::ReplacedLocalEvent {
1979 transaction_id: transaction_id.clone(),
1980 new_content,
1981 };
1982
1983 assert_matches!(
1985 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None)
1986 .await,
1987 LatestEventValue::None
1988 );
1989
1990 assert_eq!(buffer.buffer.len(), 0);
1991 }
1992 }
1993
1994 #[async_test]
1995 async fn test_local_send_error() {
1996 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
1997
1998 let mut buffer = LatestEventValuesForLocalEvents::new();
1999 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2000 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2001
2002 {
2004 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2005 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2006
2007 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2008 transaction_id: transaction_id.clone(),
2009 content,
2010 });
2011
2012 assert_local_value_matches_room_message_with_body!(
2014 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2015 LatestEventValue::LocalIsSending => with body = body
2016 );
2017 }
2018
2019 assert_eq!(buffer.buffer.len(), 2);
2020 }
2021
2022 {
2025 let update = RoomSendQueueUpdate::SendError {
2026 transaction_id: transaction_id_0.clone(),
2027 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2028 is_recoverable: true,
2029 };
2030
2031 assert_local_value_matches_room_message_with_body!(
2034 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2035 LatestEventValue::LocalCannotBeSent => with body = "B"
2036 );
2037
2038 assert_eq!(buffer.buffer.len(), 2);
2039 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2040 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2041 }
2042
2043 {
2047 let update = RoomSendQueueUpdate::SentEvent {
2048 transaction_id: transaction_id_0.clone(),
2049 event_id: event_id!("$ev0").to_owned(),
2050 };
2051
2052 assert_local_value_matches_room_message_with_body!(
2055 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2056 LatestEventValue::LocalIsSending => with body = "B"
2057 );
2058
2059 assert_eq!(buffer.buffer.len(), 1);
2060 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2061 }
2062 }
2063
2064 #[async_test]
2065 async fn test_local_retry_event() {
2066 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2067
2068 let mut buffer = LatestEventValuesForLocalEvents::new();
2069 let transaction_id_0 = OwnedTransactionId::from("txnid0");
2070 let transaction_id_1 = OwnedTransactionId::from("txnid1");
2071
2072 {
2074 for (transaction_id, body) in [(&transaction_id_0, "A"), (&transaction_id_1, "B")] {
2075 let content = new_local_echo_content(&room_send_queue, transaction_id, body);
2076
2077 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2078 transaction_id: transaction_id.clone(),
2079 content,
2080 });
2081
2082 assert_local_value_matches_room_message_with_body!(
2084 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2085 LatestEventValue::LocalIsSending => with body = body
2086 );
2087 }
2088
2089 assert_eq!(buffer.buffer.len(), 2);
2090 }
2091
2092 {
2095 let update = RoomSendQueueUpdate::SendError {
2096 transaction_id: transaction_id_0.clone(),
2097 error: Arc::new(Error::UnknownError("oopsy".to_owned().into())),
2098 is_recoverable: true,
2099 };
2100
2101 assert_local_value_matches_room_message_with_body!(
2104 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2105 LatestEventValue::LocalCannotBeSent => with body = "B"
2106 );
2107
2108 assert_eq!(buffer.buffer.len(), 2);
2109 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalCannotBeSent(_));
2110 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalCannotBeSent(_));
2111 }
2112
2113 {
2116 let update =
2117 RoomSendQueueUpdate::RetryEvent { transaction_id: transaction_id_0.clone() };
2118
2119 assert_local_value_matches_room_message_with_body!(
2122 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2123 LatestEventValue::LocalIsSending => with body = "B"
2124 );
2125
2126 assert_eq!(buffer.buffer.len(), 2);
2127 assert_matches!(&buffer.buffer[0].1, LatestEventValue::LocalIsSending(_));
2128 assert_matches!(&buffer.buffer[1].1, LatestEventValue::LocalIsSending(_));
2129 }
2130 }
2131
2132 #[async_test]
2133 async fn test_local_media_upload() {
2134 let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await;
2135
2136 let mut buffer = LatestEventValuesForLocalEvents::new();
2137 let transaction_id = OwnedTransactionId::from("txnid");
2138
2139 {
2141 let content = new_local_echo_content(&room_send_queue, &transaction_id, "A");
2142
2143 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2144 transaction_id: transaction_id.clone(),
2145 content,
2146 });
2147
2148 assert_local_value_matches_room_message_with_body!(
2150 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None).await,
2151 LatestEventValue::LocalIsSending => with body = "A"
2152 );
2153
2154 assert_eq!(buffer.buffer.len(), 1);
2155 }
2156
2157 {
2160 let update = RoomSendQueueUpdate::MediaUpload {
2161 related_to: transaction_id,
2162 file: None,
2163 index: 0,
2164 progress: AbstractProgress { current: 0, total: 0 },
2165 };
2166
2167 assert_matches!(
2170 LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, &None)
2171 .await,
2172 LatestEventValue::None
2173 );
2174
2175 assert_eq!(buffer.buffer.len(), 1);
2176 }
2177 }
2178
2179 #[async_test]
2180 async fn test_local_fallbacks_to_remote_when_empty() {
2181 let room_id = room_id!("!r0");
2182 let user_id = user_id!("@mnt_io:matrix.org");
2183 let event_factory = EventFactory::new().sender(user_id).room(room_id);
2184 let event_id_0 = event_id!("$ev0");
2185 let event_id_1 = event_id!("$ev1");
2186
2187 let server = MatrixMockServer::new().await;
2188 let client = server.client_builder().build().await;
2189
2190 {
2192 client.base_client().get_or_create_room(room_id, RoomState::Joined);
2194
2195 client
2197 .event_cache_store()
2198 .lock()
2199 .await
2200 .unwrap()
2201 .handle_linked_chunk_updates(
2202 LinkedChunkId::Room(room_id),
2203 vec![
2204 Update::NewItemsChunk {
2205 previous: None,
2206 new: ChunkIdentifier::new(0),
2207 next: None,
2208 },
2209 Update::PushItems {
2210 at: Position::new(ChunkIdentifier::new(0), 0),
2211 items: vec![event_factory
2212 .text_msg("hello")
2213 .event_id(event_id_0)
2214 .into()],
2215 },
2216 ],
2217 )
2218 .await
2219 .unwrap();
2220 }
2221
2222 let event_cache = client.event_cache();
2223 event_cache.subscribe().unwrap();
2224
2225 let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap();
2226
2227 let mut buffer = LatestEventValuesForLocalEvents::new();
2228
2229 assert_remote_value_matches_room_message_with_body!(
2231 LatestEventValueBuilder::new_local(
2232 &RoomSendQueueUpdate::SentEvent {
2234 transaction_id: OwnedTransactionId::from("txnid"),
2235 event_id: event_id_1.to_owned(),
2236 },
2237 &mut buffer,
2238 &room_event_cache,
2239 &None,
2240 )
2241 .await
2242 => with body = "hello"
2243 );
2244 }
2245}