1mod builder;
16
17use std::ops::{Deref, DerefMut, Not};
18
19use builder::{BufferOfValuesForLocalEvents, Builder};
20use eyeball::{AsyncLock, ObservableWriteGuard, SharedObservable, Subscriber};
21pub use matrix_sdk_base::latest_event::{
22 LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
23};
24use matrix_sdk_base::{RoomInfoNotableUpdateReasons, RoomState};
25use ruma::{EventId, OwnedEventId, UserId, events::room::power_levels::RoomPowerLevels};
26use tracing::{error, info, instrument, trace, warn};
27
28use crate::{Room, event_cache::RoomEventCache, room::WeakRoom, send_queue::RoomSendQueueUpdate};
29
30#[derive(Debug)]
34pub(super) struct LatestEvent {
35 weak_room: WeakRoom,
37
38 _thread_id: Option<OwnedEventId>,
40
41 buffer_of_values_for_local_events: BufferOfValuesForLocalEvents,
45
46 current_value: SharedObservable<LatestEventValue, AsyncLock>,
48}
49
50impl LatestEvent {
51 pub fn new(
52 weak_room: &WeakRoom,
53 thread_id: Option<&EventId>,
54 ) -> With<Self, IsLatestEventValueNone> {
55 let latest_event_value = match thread_id {
56 Some(_thread_id) => LatestEventValue::default(),
57 None => weak_room.get().map(|room| room.latest_event()).unwrap_or_default(),
58 };
59 let is_none = latest_event_value.is_none();
60
61 With {
62 result: Self {
63 weak_room: weak_room.clone(),
64 _thread_id: thread_id.map(ToOwned::to_owned),
65 buffer_of_values_for_local_events: BufferOfValuesForLocalEvents::new(),
66 current_value: SharedObservable::new_async(latest_event_value),
67 },
68 with: is_none,
69 }
70 }
71
72 pub async fn subscribe(&self) -> Subscriber<LatestEventValue, AsyncLock> {
74 self.current_value.subscribe().await
75 }
76
77 #[cfg(test)]
78 pub async fn get(&self) -> LatestEventValue {
79 self.current_value.get().await
80 }
81
82 pub async fn update_with_event_cache(
92 &mut self,
93 room_event_cache: &RoomEventCache,
94 own_user_id: &UserId,
95 power_levels: Option<&RoomPowerLevels>,
96 ) {
97 if self.buffer_of_values_for_local_events.is_empty().not() {
98 return;
102 }
103
104 let current_event = self.current_value.get().await;
105 let new_value =
106 Builder::new_remote(room_event_cache, current_event, own_user_id, power_levels).await;
107
108 trace!(value = ?new_value, "Computed a remote `LatestEventValue`");
109
110 if let Some(new_value) = new_value {
111 self.update(new_value).await;
112 }
113 }
114
115 pub async fn update_with_send_queue(
118 &mut self,
119 send_queue_update: &RoomSendQueueUpdate,
120 room_event_cache: &RoomEventCache,
121 own_user_id: &UserId,
122 power_levels: Option<&RoomPowerLevels>,
123 ) {
124 let current_event = self.current_value.get().await;
125 let new_value = Builder::new_local(
126 send_queue_update,
127 &mut self.buffer_of_values_for_local_events,
128 room_event_cache,
129 current_event,
130 own_user_id,
131 power_levels,
132 )
133 .await;
134
135 trace!(value = ?new_value, "Computed a local `LatestEventValue`");
136
137 if let Some(new_value) = new_value {
138 self.update(new_value).await;
139 }
140 }
141
142 pub async fn update_with_room_info(
144 &mut self,
145 room: Room,
146 reasons: RoomInfoNotableUpdateReasons,
147 ) {
148 if reasons.contains(RoomInfoNotableUpdateReasons::MEMBERSHIP) {
150 let new_value = match room.state() {
151 RoomState::Invited => {
154 if matches!(
166 self.current_value.read().await.deref(),
167 LatestEventValue::RemoteInvite { .. }
168 ) {
169 return;
170 }
171
172 let new_value = Builder::new_remote_for_invite(&room).await;
173
174 trace!(value = ?new_value, "Computed a remote `LatestEventValue` for invite");
175
176 new_value
177 }
178
179 _ => {
180 info!(
181 "Skipping the computation of a remote `LatestEventValue` from a `RoomInfo`"
182 );
183
184 return;
185 }
186 };
187
188 self.update(new_value).await;
189 }
190 }
191
192 async fn update(&mut self, new_value: LatestEventValue) {
200 {
211 let mut guard = self.current_value.write().await;
212 let previous_value = guard.deref();
213
214 let do_update = match (previous_value, &new_value) {
215 (LatestEventValue::None, LatestEventValue::None) => false,
217
218 (_, LatestEventValue::None) | (LatestEventValue::None, _) => true,
220
221 (previous, new) if previous.event_id() == new.event_id() => false,
223
224 (_, _) => true,
226 };
227
228 if do_update {
229 ObservableWriteGuard::set(&mut guard, new_value.clone());
230
231 drop(guard);
233
234 self.store(new_value).await;
235 }
236 }
237 }
238
239 #[instrument(skip_all)]
244 async fn store(&mut self, new_value: LatestEventValue) {
245 let Some(room) = self.weak_room.get() else {
246 warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
247 return;
248 };
249 let result = room
250 .update_and_save_room_info(|mut info| {
251 info.set_latest_event(new_value);
252 (info, RoomInfoNotableUpdateReasons::LATEST_EVENT)
253 })
254 .await;
255 if let Err(error) = result {
256 error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
257 }
258 }
259}
260
261pub(super) struct With<T, W> {
264 result: T,
266
267 with: W,
269}
270
271impl<T, W> With<T, W> {
272 pub fn map<F, O>(this: With<T, W>, f: F) -> With<O, W>
274 where
275 F: FnOnce(T) -> O,
276 {
277 With { result: f(this.result), with: this.with }
278 }
279
280 pub fn inner(this: With<T, W>) -> T {
282 this.result
283 }
284
285 pub fn unzip(this: With<T, W>) -> (T, W) {
287 (this.result, this.with)
288 }
289}
290
291impl<T, W> Deref for With<T, W> {
292 type Target = T;
293
294 fn deref(&self) -> &Self::Target {
295 &self.result
296 }
297}
298
299impl<T, W> DerefMut for With<T, W> {
300 fn deref_mut(&mut self) -> &mut Self::Target {
301 &mut self.result
302 }
303}
304
305pub(super) type IsLatestEventValueNone = bool;
306
307#[cfg(all(not(target_family = "wasm"), test))]
308mod tests_latest_event {
309 use std::ops::Not;
310
311 use assert_matches::assert_matches;
312 use matrix_sdk_base::{
313 RoomInfoNotableUpdateReasons, RoomState,
314 latest_event::RemoteLatestEventValue,
315 linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
316 store::{SerializableEventContent, StoreConfig},
317 };
318 use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
319 use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
320 use ruma::{
321 MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
322 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
323 owned_event_id, owned_room_id, room_id, user_id,
324 };
325 use stream_assert::{assert_next_matches, assert_pending};
326 use tokio::task::yield_now;
327
328 use super::{super::local_room_message, LatestEvent, LatestEventValue, With};
329 use crate::{
330 client::WeakClient,
331 room::WeakRoom,
332 send_queue::{LocalEcho, LocalEchoContent, RoomSendQueue, RoomSendQueueUpdate, SendHandle},
333 test_utils::mocks::MatrixMockServer,
334 };
335
336 fn new_local_echo_content(
337 room_send_queue: &RoomSendQueue,
338 transaction_id: &OwnedTransactionId,
339 body: &str,
340 ) -> LocalEchoContent {
341 LocalEchoContent::Event {
342 serialized_event: SerializableEventContent::new(
343 &AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::text_plain(body)),
344 )
345 .unwrap(),
346 send_handle: SendHandle::new(
347 room_send_queue.clone(),
348 transaction_id.clone(),
349 MilliSecondsSinceUnixEpoch::now(),
350 ),
351 send_error: None,
352 }
353 }
354
355 #[async_test]
356 async fn test_new_loads_from_room_info() {
357 let room_id = room_id!("!r0");
358
359 let server = MatrixMockServer::new().await;
360 let client = server.client_builder().build().await;
361 let weak_client = WeakClient::from_client(&client);
362
363 let room = client.base_client().get_or_create_room(room_id, RoomState::Joined);
365 let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
366
367 {
369 let (latest_event, is_none) = With::unzip(LatestEvent::new(&weak_room, None));
370
371 assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
373 assert!(is_none);
374 }
375
376 {
378 room.update_room_info(|mut info| {
379 info.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
380 (info, Default::default())
381 })
382 .await;
383 }
384
385 {
387 let (latest_event, is_none) = With::unzip(LatestEvent::new(&weak_room, None));
388
389 assert_matches!(
391 latest_event.current_value.get().await,
392 LatestEventValue::LocalIsSending(_)
393 );
394 assert!(is_none.not());
395 }
396 }
397
398 #[async_test]
399 async fn test_update_do_not_ignore_none_value() {
400 let room_id = room_id!("!r0");
401
402 let server = MatrixMockServer::new().await;
403 let client = server.client_builder().build().await;
404 let weak_client = WeakClient::from_client(&client);
405
406 client.base_client().get_or_create_room(room_id, RoomState::Joined);
408 let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
409
410 let event_cache = client.event_cache();
412 event_cache.subscribe().unwrap();
413
414 let mut latest_event = LatestEvent::new(&weak_room, None);
415
416 assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
418
419 latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
421
422 assert_matches!(
423 latest_event.current_value.get().await,
424 LatestEventValue::LocalIsSending(_)
425 );
426
427 latest_event.update(LatestEventValue::None).await;
429
430 assert_matches!(latest_event.current_value.get().await, LatestEventValue::None);
431 }
432
433 #[async_test]
434 async fn test_update_ignore_none_if_previous_value_is_none() {
435 let room_id = room_id!("!r0");
436
437 let server = MatrixMockServer::new().await;
438 let client = server.client_builder().build().await;
439 let weak_client = WeakClient::from_client(&client);
440
441 client.base_client().get_or_create_room(room_id, RoomState::Joined);
443 let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
444
445 let mut latest_event = LatestEvent::new(&weak_room, None);
446
447 let mut stream = latest_event.subscribe().await;
448 assert_pending!(stream);
449
450 latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
452 assert_next_matches!(stream, LatestEventValue::LocalIsSending(_));
454
455 latest_event.update(LatestEventValue::None).await;
457 assert_next_matches!(stream, LatestEventValue::None);
459
460 latest_event.update(LatestEventValue::None).await;
462 assert_pending!(stream);
464
465 latest_event.update(LatestEventValue::None).await;
467 assert_pending!(stream);
469
470 latest_event.update(LatestEventValue::LocalIsSending(local_room_message("bar"))).await;
472 assert_next_matches!(stream, LatestEventValue::LocalIsSending(_));
474
475 assert_pending!(stream);
476 }
477
478 #[async_test]
479 async fn test_update_ignore_when_previous_value_has_the_same_event_id() {
480 let room_id = room_id!("!r0");
481 let user_id = user_id!("@mnt_io:matrix.org");
482 let event_factory = EventFactory::new().sender(user_id).room(room_id);
483
484 let server = MatrixMockServer::new().await;
485 let client = server.client_builder().build().await;
486 let weak_client = WeakClient::from_client(&client);
487
488 client.base_client().get_or_create_room(room_id, RoomState::Joined);
490 let weak_room = WeakRoom::new(weak_client, room_id.to_owned());
491
492 let mut latest_event = LatestEvent::new(&weak_room, None);
493
494 let mut stream = latest_event.subscribe().await;
495 assert_pending!(stream);
496
497 latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await;
499 assert_next_matches!(stream, LatestEventValue::LocalIsSending(_));
501
502 let first_event: RemoteLatestEventValue =
504 event_factory.text_msg("A").event_id(event_id!("$ev0")).into();
505 latest_event.update(LatestEventValue::Remote(first_event.clone())).await;
506 assert_next_matches!(stream, LatestEventValue::Remote(_));
508
509 latest_event.update(LatestEventValue::Remote(first_event)).await;
511 assert_pending!(stream);
513
514 let second_event = event_factory.text_msg("A").event_id(event_id!("$ev1")).into();
516 latest_event.update(LatestEventValue::Remote(second_event)).await;
517 assert_next_matches!(stream, LatestEventValue::Remote(_));
519
520 assert_pending!(stream);
521 }
522
523 #[async_test]
524 async fn test_local_has_priority_over_remote() {
525 let room_id = owned_room_id!("!r0");
526 let user_id = user_id!("@mnt_io:matrix.org");
527 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
528
529 let server = MatrixMockServer::new().await;
530 let client = server.client_builder().build().await;
531 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
532 let room = client.get_room(&room_id).unwrap();
533 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
534
535 let event_cache = client.event_cache();
536 event_cache.subscribe().unwrap();
537
538 client
540 .event_cache_store()
541 .lock()
542 .await
543 .expect("Could not acquire the event cache lock")
544 .as_clean()
545 .expect("Could not acquire a clean event cache lock")
546 .handle_linked_chunk_updates(
547 LinkedChunkId::Room(&room_id),
548 vec![
549 Update::NewItemsChunk {
550 previous: None,
551 new: ChunkIdentifier::new(0),
552 next: None,
553 },
554 Update::PushItems {
555 at: Position::new(ChunkIdentifier::new(0), 0),
556 items: vec![event_factory.text_msg("A").event_id(event_id!("$ev0")).into()],
557 },
558 ],
559 )
560 .await
561 .unwrap();
562
563 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
564
565 let send_queue = client.send_queue();
566 let room_send_queue = send_queue.for_room(room);
567
568 let mut latest_event = LatestEvent::new(&weak_room, None);
569
570 {
572 latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
573
574 assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
575 }
576
577 let transaction_id = OwnedTransactionId::from("txnid0");
580
581 {
582 let content = new_local_echo_content(&room_send_queue, &transaction_id, "B");
583
584 let update = RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
585 transaction_id: transaction_id.clone(),
586 content,
587 });
588
589 latest_event.update_with_send_queue(&update, &room_event_cache, user_id, None).await;
590
591 assert_matches!(
592 latest_event.current_value.get().await,
593 LatestEventValue::LocalIsSending(_)
594 );
595 }
596
597 {
601 latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
602
603 assert_matches!(
604 latest_event.current_value.get().await,
605 LatestEventValue::LocalIsSending(_)
606 );
607 }
608
609 {
612 let update = RoomSendQueueUpdate::SentEvent {
613 transaction_id,
614 event_id: owned_event_id!("$ev1"),
615 };
616
617 latest_event.update_with_send_queue(&update, &room_event_cache, user_id, None).await;
618
619 assert_matches!(
620 latest_event.current_value.get().await,
621 LatestEventValue::LocalHasBeenSent { .. }
622 );
623 }
624
625 {
628 latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
629
630 assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_));
631 }
632 }
633
634 #[async_test]
635 async fn test_redacted_latest_event_is_removed() {
636 let room_id = owned_room_id!("!r0");
637 let user_id = user_id!("@mnt_io:matrix.org");
638 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
639
640 let server = MatrixMockServer::new().await;
641 let client = server.client_builder().build().await;
642 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
643 let _room = client.get_room(&room_id).unwrap();
644 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
645
646 let event_cache = client.event_cache();
647 event_cache.subscribe().unwrap();
648
649 let event_id_0 = event_id!("$ev0");
650 let event_id_1 = event_id!("$ev1");
651
652 client
654 .event_cache_store()
655 .lock()
656 .await
657 .expect("Could not acquire the event cache lock")
658 .as_clean()
659 .expect("Could not acquire a clean event cache lock")
660 .handle_linked_chunk_updates(
661 LinkedChunkId::Room(&room_id),
662 vec![
663 Update::NewItemsChunk {
664 previous: None,
665 new: ChunkIdentifier::new(0),
666 next: None,
667 },
668 Update::PushItems {
669 at: Position::new(ChunkIdentifier::new(0), 0),
670 items: vec![
671 event_factory.text_msg("A").event_id(event_id_0).into(),
672 event_factory.text_msg("B").event_id(event_id_1).into(),
673 ],
674 },
675 ],
676 )
677 .await
678 .unwrap();
679
680 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
681
682 let mut latest_event = LatestEvent::new(&weak_room, None);
683
684 {
686 latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
687
688 assert_matches!(
689 latest_event.current_value.get().await,
690 LatestEventValue::Remote(remote) => {
691 assert_eq!(remote.event_id().as_deref(), Some(event_id_1));
692 }
693 );
694 }
695
696 {
698 server
699 .mock_sync()
700 .ok_and_run(&client, |builder| {
701 builder.add_joined_room(
702 JoinedRoomBuilder::new(&room_id)
703 .add_timeline_event(event_factory.redaction(event_id_1)),
704 );
705 })
706 .await;
707
708 yield_now().await;
709
710 latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
711
712 assert_matches!(
713 latest_event.current_value.get().await,
714 LatestEventValue::Remote(remote) => {
715 assert_eq!(remote.event_id().as_deref(), Some(event_id_0));
717 }
718 );
719 }
720 }
721
722 #[async_test]
723 async fn test_store_latest_event_value() {
724 let room_id = owned_room_id!("!r0");
725 let user_id = user_id!("@mnt_io:matrix.org");
726 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
727
728 let server = MatrixMockServer::new().await;
729
730 let store_config =
731 StoreConfig::new(CrossProcessLockConfig::multi_process("cross-process-lock-holder"));
732
733 {
735 let client = server
736 .client_builder()
737 .on_builder(|builder| builder.store_config(store_config.clone()))
738 .build()
739 .await;
740 let mut room_info_notable_update_receiver = client.room_info_notable_update_receiver();
741 let room = client.base_client().get_or_create_room(&room_id, RoomState::Joined);
742 let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.clone());
743
744 let event_cache = client.event_cache();
745 event_cache.subscribe().unwrap();
746
747 client
749 .event_cache_store()
750 .lock()
751 .await
752 .expect("Could not acquire the event cache lock")
753 .as_clean()
754 .expect("Could not acquire a clean event cache lock")
755 .handle_linked_chunk_updates(
756 LinkedChunkId::Room(&room_id),
757 vec![
758 Update::NewItemsChunk {
759 previous: None,
760 new: ChunkIdentifier::new(0),
761 next: None,
762 },
763 Update::PushItems {
764 at: Position::new(ChunkIdentifier::new(0), 0),
765 items: vec![
766 event_factory.text_msg("A").event_id(event_id!("$ev0")).into(),
767 ],
768 },
769 ],
770 )
771 .await
772 .unwrap();
773
774 let (room_event_cache, _) = event_cache.for_room(&room_id).await.unwrap();
775
776 {
778 let latest_event = room.latest_event();
779
780 assert_matches!(latest_event, LatestEventValue::None);
781 }
782
783 {
785 let mut latest_event = LatestEvent::new(&weak_room, None);
786 latest_event.update_with_event_cache(&room_event_cache, user_id, None).await;
787
788 assert_matches!(
789 latest_event.current_value.get().await,
790 LatestEventValue::Remote(_)
791 );
792 }
793
794 {
796 let update = room_info_notable_update_receiver.recv().await.unwrap();
797
798 assert_eq!(update.room_id, room_id);
799 assert!(update.reasons.contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
800 }
801
802 {
804 let latest_event = room.latest_event();
805
806 assert_matches!(latest_event, LatestEventValue::Remote(_));
807 }
808 }
809
810 {
813 let client = server
814 .client_builder()
815 .on_builder(|builder| builder.store_config(store_config))
816 .build()
817 .await;
818 let room = client.get_room(&room_id).unwrap();
819 let latest_event = room.latest_event();
820
821 assert_matches!(latest_event, LatestEventValue::Remote(_));
822 }
823 }
824}