1use std::{
16 collections::{
17 btree_map::{IntoIter, Iter},
18 BTreeMap,
19 },
20 sync::{Arc, Mutex},
21 time::Duration,
22};
23
24use futures_util::{pin_mut, StreamExt as _};
25use matrix_sdk::{
26 room::Room, sleep::sleep, Client, ClientBuildError, SlidingSyncList, SlidingSyncMode,
27};
28use matrix_sdk_base::{deserialized_responses::TimelineEvent, RoomState, StoreError};
29use ruma::{
30 api::client::sync::sync_events::v5 as http,
31 assign,
32 directory::RoomTypeFilter,
33 events::{
34 room::{
35 join_rules::JoinRule,
36 member::{MembershipState, StrippedRoomMemberEvent},
37 message::{Relation, SyncRoomMessageEvent},
38 },
39 AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
40 AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
41 TimelineEventType,
42 },
43 html::RemoveReplyFallback,
44 push::Action,
45 serde::Raw,
46 uint, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
47};
48use thiserror::Error;
49use tokio::sync::Mutex as AsyncMutex;
50use tracing::{debug, info, instrument, trace, warn};
51
52use crate::{
53 encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
54 sync_service::SyncService,
55 DEFAULT_SANITIZER_MODE,
56};
57
58#[derive(Clone)]
60pub enum NotificationProcessSetup {
61 MultipleProcesses,
70
71 SingleProcess { sync_service: Arc<SyncService> },
79}
80
81pub struct NotificationClient {
87 client: Client,
89
90 parent_client: Client,
92
93 process_setup: NotificationProcessSetup,
95
96 notification_sync_mutex: AsyncMutex<()>,
104
105 encryption_sync_mutex: AsyncMutex<()>,
110}
111
112impl NotificationClient {
113 const CONNECTION_ID: &'static str = "notifications";
114 const LOCK_ID: &'static str = "notifications";
115
116 pub async fn new(
118 parent_client: Client,
119 process_setup: NotificationProcessSetup,
120 ) -> Result<Self, Error> {
121 let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
122
123 Ok(NotificationClient {
124 client,
125 parent_client,
126 notification_sync_mutex: AsyncMutex::new(()),
127 encryption_sync_mutex: AsyncMutex::new(()),
128 process_setup,
129 })
130 }
131
132 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
136 self.client.get_room(room_id)
137 }
138
139 #[instrument(skip(self))]
150 pub async fn get_notification(
151 &self,
152 room_id: &RoomId,
153 event_id: &EventId,
154 ) -> Result<Option<NotificationItem>, Error> {
155 match self.get_notification_with_sliding_sync(room_id, event_id).await? {
156 NotificationStatus::Event(event) => Ok(Some(*event)),
157 NotificationStatus::EventFilteredOut => Ok(None),
158 NotificationStatus::EventNotFound => {
159 self.get_notification_with_context(room_id, event_id).await
160 }
161 }
162 }
163
164 pub async fn get_notifications(
178 &self,
179 requests: &[NotificationItemsRequest],
180 ) -> Result<BatchNotificationFetchingResult<NotificationItem>, Error> {
181 let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
182 let mut notification_items = BatchNotificationFetchingResult::new();
183
184 for request in requests {
185 for event_id in &request.event_ids {
186 match notifications.remove(event_id) {
187 Some(Ok(NotificationStatus::Event(item))) => {
188 notification_items.add_notification(event_id.to_owned(), *item);
189 }
190 Some(Ok(NotificationStatus::EventNotFound)) | None => {
191 match self.get_notification_with_context(&request.room_id, event_id).await {
192 Ok(Some(item)) => {
193 notification_items.add_notification(event_id.to_owned(), item)
194 }
195 Ok(None) => (),
197 Err(error) => notification_items
198 .mark_fetching_notification_failed(event_id.to_owned(), error),
199 }
200 }
201 Some(Ok(NotificationStatus::EventFilteredOut)) => (),
203 Some(Err(e)) => {
204 notification_items
205 .mark_fetching_notification_failed(event_id.to_owned(), e);
206 }
207 }
208 }
209 }
210
211 Ok(notification_items)
212 }
213
214 #[instrument(skip_all)]
222 async fn retry_decryption(
223 &self,
224 room: &Room,
225 raw_event: &Raw<AnySyncTimelineEvent>,
226 ) -> Result<Option<TimelineEvent>, Error> {
227 let event: AnySyncTimelineEvent =
228 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
229
230 if !is_event_encrypted(event.event_type()) {
231 return Ok(None);
232 }
233
234 let _guard = self.encryption_sync_mutex.lock().await;
236
237 let with_locking = WithLocking::from(matches!(
248 self.process_setup,
249 NotificationProcessSetup::MultipleProcesses
250 ));
251
252 let push_ctx = room.push_context().await?;
253 let sync_permit_guard = match &self.process_setup {
254 NotificationProcessSetup::MultipleProcesses => {
255 let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
259 sync_permit.lock_owned().await
260 }
261
262 NotificationProcessSetup::SingleProcess { sync_service } => {
263 if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
264 permit_guard
265 } else {
266 let mut wait = 200;
275
276 debug!("Encryption sync running in background");
277 for _ in 0..3 {
278 trace!("waiting for decryption…");
279
280 sleep(Duration::from_millis(wait)).await;
281
282 let new_event =
283 room.decrypt_event(raw_event.cast_ref(), push_ctx.as_ref()).await?;
284
285 match new_event.kind {
286 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
287 utd_info, ..} => {
288 if utd_info.reason.is_missing_room_key() {
289 wait *= 2;
292 } else {
293 debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
294 return Ok(None);
295 }
296 }
297 _ => {
298 trace!("Waiting succeeded and event could be decrypted!");
299 return Ok(Some(new_event));
300 }
301 }
302 }
303
304 debug!("Timeout waiting for the encryption sync to decrypt notification.");
306 return Ok(None);
307 }
308 }
309 };
310
311 let encryption_sync = EncryptionSyncService::new(
312 self.client.clone(),
313 Some((Duration::from_secs(3), Duration::from_secs(4))),
314 with_locking,
315 )
316 .await;
317
318 match encryption_sync {
323 Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
324 Ok(()) => match room.decrypt_event(raw_event.cast_ref(), push_ctx.as_ref()).await {
325 Ok(new_event) => match new_event.kind {
326 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
327 utd_info, ..
328 } => {
329 trace!(
330 "Encryption sync failed to decrypt the event: {:?}",
331 utd_info.reason
332 );
333 Ok(None)
334 }
335 _ => {
336 trace!("Encryption sync managed to decrypt the event.");
337 Ok(Some(new_event))
338 }
339 },
340 Err(err) => {
341 trace!("Encryption sync failed to decrypt the event: {err}");
342 Ok(None)
343 }
344 },
345 Err(err) => {
346 warn!("Encryption sync error: {err:#}");
347 Ok(None)
348 }
349 },
350 Err(err) => {
351 warn!("Encryption sync build error: {err:#}",);
352 Ok(None)
353 }
354 }
355 }
356
357 #[instrument(skip_all)]
376 async fn try_sliding_sync(
377 &self,
378 requests: &[NotificationItemsRequest],
379 ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
380 let _guard = self.notification_sync_mutex.lock().await;
383
384 let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
389
390 let handler_raw_notification = raw_notifications.clone();
391
392 let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
393
394 let timeline_event_handler = self.client.add_event_handler({
395 let requests = requests.clone();
396 move |raw: Raw<AnySyncTimelineEvent>| async move {
397 match &raw.get_field::<OwnedEventId>("event_id") {
398 Ok(Some(event_id)) => {
399 let request =
400 &requests.iter().find(|request| request.event_ids.contains(event_id));
401 if request.is_none() {
402 return;
403 }
404 let room_id = request.unwrap().room_id.clone();
405 for request in requests.iter() {
406 if request.event_ids.contains(event_id) {
407 handler_raw_notification.lock().unwrap().insert(
411 event_id.to_owned(),
412 (room_id, Some(RawNotificationEvent::Timeline(raw))),
413 );
414 return;
415 }
416 }
417 }
418 Ok(None) => {
419 warn!("a sync event had no event id");
420 }
421 Err(err) => {
422 warn!("a sync event id couldn't be decoded: {err}");
423 }
424 }
425 }
426 });
427
428 let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
430
431 let user_id = self.client.user_id().unwrap().to_owned();
432 let handler_raw_invites = raw_invites.clone();
433 let handler_raw_notifications = raw_notifications.clone();
434 let stripped_member_handler = self.client.add_event_handler({
435 let requests = requests.clone();
436 move |raw: Raw<StrippedRoomMemberEvent>| async move {
437 let deserialized = match raw.deserialize() {
438 Ok(d) => d,
439 Err(err) => {
440 warn!("failed to deserialize raw stripped room member event: {err}");
441 return;
442 }
443 };
444
445 trace!("received a stripped room member event");
446
447 match &raw.get_field::<OwnedEventId>("event_id") {
450 Ok(Some(event_id)) => {
451 let request =
452 &requests.iter().find(|request| request.event_ids.contains(event_id));
453 if request.is_none() {
454 return;
455 }
456 let room_id = request.unwrap().room_id.clone();
457
458 handler_raw_notifications.lock().unwrap().insert(
462 event_id.to_owned(),
463 (room_id, Some(RawNotificationEvent::Invite(raw))),
464 );
465 return;
466 }
467 Ok(None) => {
468 debug!("a room member event had no id");
469 }
470 Err(err) => {
471 debug!("a room member event id couldn't be decoded: {err}");
472 }
473 }
474
475 if deserialized.content.membership == MembershipState::Invite
477 && deserialized.state_key == user_id
478 {
479 debug!("found an invite event for the current user");
480 handler_raw_invites
484 .lock()
485 .unwrap()
486 .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
487 } else {
488 debug!("not an invite event, or not for the current user");
489 }
490 }
491 });
492
493 let required_state = vec![
495 (StateEventType::RoomEncryption, "".to_owned()),
496 (StateEventType::RoomMember, "$LAZY".to_owned()),
497 (StateEventType::RoomMember, "$ME".to_owned()),
498 (StateEventType::RoomCanonicalAlias, "".to_owned()),
499 (StateEventType::RoomName, "".to_owned()),
500 (StateEventType::RoomPowerLevels, "".to_owned()),
501 (StateEventType::CallMember, "*".to_owned()),
502 ];
503
504 let invites = SlidingSyncList::builder("invites")
505 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
506 .timeline_limit(8)
507 .required_state(required_state.clone())
508 .filters(Some(assign!(http::request::ListFilters::default(), {
509 is_invite: Some(true),
510 not_room_types: vec![RoomTypeFilter::Space],
511 })));
512
513 let sync = self
514 .client
515 .sliding_sync(Self::CONNECTION_ID)?
516 .poll_timeout(Duration::from_secs(1))
517 .network_timeout(Duration::from_secs(3))
518 .with_account_data_extension(
519 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
520 )
521 .add_list(invites)
522 .build()
523 .await?;
524
525 let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
526 sync.subscribe_to_rooms(
527 &room_ids,
528 Some(assign!(http::request::RoomSubscription::default(), {
529 required_state,
530 timeline_limit: uint!(16)
531 })),
532 true,
533 );
534
535 let mut remaining_attempts = 3;
536
537 let stream = sync.sync();
538 pin_mut!(stream);
539
540 let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
542
543 loop {
544 if stream.next().await.is_none() {
545 break;
547 }
548
549 if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
550 == expected_event_count
551 {
552 break;
554 }
555
556 remaining_attempts -= 1;
557 if remaining_attempts == 0 {
558 break;
560 }
561 }
562
563 self.client.remove_event_handler(stripped_member_handler);
564 self.client.remove_event_handler(timeline_event_handler);
565
566 let mut notifications = raw_notifications.clone().lock().unwrap().clone();
567 let mut missing_event_ids = Vec::new();
568
569 for request in requests.iter() {
571 for event_id in &request.event_ids {
572 if !notifications.contains_key(event_id) {
573 missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
574 }
575 }
576 }
577
578 for (room_id, missing_event_id) in missing_event_ids {
580 trace!("we didn't have a non-invite event, looking for invited room now");
581 if let Some(room) = self.client.get_room(&room_id) {
582 if room.state() == RoomState::Invited {
583 if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
584 notifications.insert(
585 missing_event_id.to_owned(),
586 (room_id.to_owned(), stripped_event),
587 );
588 }
589 } else {
590 debug!("the room isn't in the invited state");
591 }
592 } else {
593 debug!("the room isn't an invite");
594 }
595 }
596
597 let found = if notifications.len() == expected_event_count { "" } else { "not " };
598 trace!("all notification events have{found} been found");
599
600 Ok(notifications)
601 }
602
603 pub async fn get_notification_with_sliding_sync(
604 &self,
605 room_id: &RoomId,
606 event_id: &EventId,
607 ) -> Result<NotificationStatus, Error> {
608 let event_ids = vec![event_id.to_owned()];
609 let request = NotificationItemsRequest { room_id: room_id.to_owned(), event_ids };
610 let mut get_notifications_result =
611 self.get_notifications_with_sliding_sync(&[request]).await?;
612 get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
613 }
614
615 pub async fn get_notifications_with_sliding_sync(
620 &self,
621 requests: &[NotificationItemsRequest],
622 ) -> Result<BatchNotificationFetchingResult<NotificationStatus>, Error> {
623 let raw_events = self.try_sliding_sync(requests).await?;
624
625 let mut result = BatchNotificationFetchingResult::new();
626
627 for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
628 let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
630
631 if let Some(raw_event) = raw_event {
632 let (raw_event, push_actions) = match &raw_event {
633 RawNotificationEvent::Timeline(timeline_event) => {
634 match self.retry_decryption(&room, timeline_event).await {
636 Ok(Some(timeline_event)) => {
637 let push_actions =
638 timeline_event.push_actions().map(ToOwned::to_owned);
639 (
640 RawNotificationEvent::Timeline(timeline_event.into_raw()),
641 push_actions,
642 )
643 }
644 Ok(None) => {
645 match room.event_push_actions(timeline_event).await {
646 Ok(push_actions) => (raw_event.clone(), push_actions),
647 Err(error) => {
648 result.mark_fetching_notification_failed(
650 event_id,
651 error.into(),
652 );
653 continue;
654 }
655 }
656 }
657 Err(error) => {
658 result.mark_fetching_notification_failed(event_id, error);
659 continue;
660 }
661 }
662 }
663 RawNotificationEvent::Invite(invite_event) => {
664 match room.event_push_actions(invite_event).await {
666 Ok(push_actions) => {
667 (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
668 }
669 Err(error) => {
670 result.mark_fetching_notification_failed(event_id, error.into());
671 continue;
672 }
673 }
674 }
675 };
676
677 let should_notify = push_actions
678 .as_ref()
679 .map(|actions| actions.iter().any(|a| a.should_notify()))
680 .unwrap_or(false);
681
682 if !should_notify {
683 result.add_notification(event_id, NotificationStatus::EventFilteredOut);
684 } else {
685 let notification_result = NotificationItem::new(
686 &room,
687 raw_event,
688 push_actions.as_deref(),
689 Vec::new(),
690 )
691 .await
692 .map(|event| NotificationStatus::Event(Box::new(event)));
693
694 match notification_result {
695 Ok(notification_status) => match notification_status {
696 NotificationStatus::Event(event) => {
697 if self.client.is_user_ignored(event.event.sender()).await {
698 result.add_notification(
699 event_id,
700 NotificationStatus::EventFilteredOut,
701 );
702 } else {
703 result.add_notification(
704 event_id,
705 NotificationStatus::Event(event),
706 );
707 }
708 }
709 _ => result.add_notification(event_id, notification_status),
710 },
711 Err(error) => {
712 result.mark_fetching_notification_failed(event_id, error);
713 }
714 }
715 }
716 } else {
717 result.add_notification(event_id, NotificationStatus::EventNotFound);
718 }
719 }
720
721 Ok(result)
722 }
723
724 pub async fn get_notification_with_context(
737 &self,
738 room_id: &RoomId,
739 event_id: &EventId,
740 ) -> Result<Option<NotificationItem>, Error> {
741 info!("fetching notification event with a /context query");
742
743 let Some(room) = self.parent_client.get_room(room_id) else {
745 return Err(Error::UnknownRoom);
746 };
747
748 let response = room.event_with_context(event_id, true, uint!(0), None).await?;
749
750 let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
751 let state_events = response.state;
752
753 if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
754 timeline_event = decrypted_event;
755 }
756
757 if let Some(actions) = timeline_event.push_actions() {
758 if !actions.iter().any(|a| a.should_notify()) {
759 return Ok(None);
760 }
761 }
762
763 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
764 let notification_item = NotificationItem::new(
765 &room,
766 RawNotificationEvent::Timeline(timeline_event.into_raw()),
767 push_actions.as_deref(),
768 state_events,
769 )
770 .await?;
771
772 if self.client.is_user_ignored(notification_item.event.sender()).await {
773 Ok(None)
774 } else {
775 Ok(Some(notification_item))
776 }
777 }
778}
779
780fn is_event_encrypted(event_type: TimelineEventType) -> bool {
781 let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
782
783 #[cfg(feature = "unstable-msc3956")]
784 let is_still_encrypted =
785 is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
786
787 is_still_encrypted
788}
789
790#[derive(Debug)]
791pub enum NotificationStatus {
792 Event(Box<NotificationItem>),
793 EventNotFound,
794 EventFilteredOut,
795}
796
797#[derive(Debug, Clone)]
798pub struct NotificationItemsRequest {
799 pub room_id: OwnedRoomId,
800 pub event_ids: Vec<OwnedEventId>,
801}
802
803#[derive(Default)]
804pub struct BatchNotificationFetchingResult<T> {
805 notifications: BTreeMap<OwnedEventId, Result<T, Error>>,
806}
807
808impl<T> BatchNotificationFetchingResult<T> {
809 pub fn new() -> Self {
810 Self { notifications: BTreeMap::new() }
811 }
812
813 fn add_notification(&mut self, event_id: OwnedEventId, notification: T) {
814 self.notifications.insert(event_id, Ok(notification));
815 }
816
817 fn mark_fetching_notification_failed(&mut self, event_id: OwnedEventId, error: Error) {
818 self.notifications.insert(event_id, Err(error));
819 }
820
821 pub fn remove(&mut self, id: &EventId) -> Option<Result<T, Error>> {
822 self.notifications.remove(id)
823 }
824
825 pub fn iter(&self) -> Iter<'_, OwnedEventId, Result<T, Error>> {
826 self.notifications.iter()
827 }
828}
829
830impl<T> IntoIterator for BatchNotificationFetchingResult<T> {
831 type Item = (OwnedEventId, Result<T, Error>);
832 type IntoIter = IntoIter<OwnedEventId, Result<T, Error>>;
833 fn into_iter(self) -> Self::IntoIter {
834 self.notifications.into_iter()
835 }
836}
837
838#[derive(Debug, Clone)]
843pub enum RawNotificationEvent {
844 Timeline(Raw<AnySyncTimelineEvent>),
846 Invite(Raw<StrippedRoomMemberEvent>),
849}
850
851#[derive(Debug)]
854pub enum NotificationEvent {
855 Timeline(Box<AnySyncTimelineEvent>),
857 Invite(Box<StrippedRoomMemberEvent>),
859}
860
861impl NotificationEvent {
862 pub fn sender(&self) -> &UserId {
863 match self {
864 NotificationEvent::Timeline(ev) => ev.sender(),
865 NotificationEvent::Invite(ev) => &ev.sender,
866 }
867 }
868
869 fn thread_id(&self) -> Option<OwnedEventId> {
872 let NotificationEvent::Timeline(sync_timeline_event) = &self else {
873 return None;
874 };
875 let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
876 return None;
877 };
878 let content = event.original_content()?;
879 match content {
880 AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
881 Relation::Thread(thread) => Some(thread.event_id),
882 _ => None,
883 },
884 _ => None,
885 }
886 }
887}
888
889#[derive(Debug)]
891pub struct NotificationItem {
892 pub event: NotificationEvent,
894
895 pub raw_event: RawNotificationEvent,
897
898 pub sender_display_name: Option<String>,
900 pub sender_avatar_url: Option<String>,
902 pub is_sender_name_ambiguous: bool,
904
905 pub room_computed_display_name: String,
907 pub room_avatar_url: Option<String>,
909 pub room_canonical_alias: Option<String>,
911 pub room_join_rule: JoinRule,
913 pub is_room_encrypted: Option<bool>,
915 pub is_room_public: bool,
917 pub is_direct_message_room: bool,
919 pub joined_members_count: u64,
921
922 pub is_noisy: Option<bool>,
927 pub has_mention: Option<bool>,
928 pub thread_id: Option<OwnedEventId>,
929}
930
931impl NotificationItem {
932 async fn new(
933 room: &Room,
934 raw_event: RawNotificationEvent,
935 push_actions: Option<&[Action]>,
936 state_events: Vec<Raw<AnyStateEvent>>,
937 ) -> Result<Self, Error> {
938 let event = match &raw_event {
939 RawNotificationEvent::Timeline(raw_event) => {
940 let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
941 if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
942 SyncRoomMessageEvent::Original(ev),
943 )) = &mut event
944 {
945 ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
946 }
947 NotificationEvent::Timeline(Box::new(event))
948 }
949 RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
950 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
951 )),
952 };
953
954 let sender = match room.state() {
955 RoomState::Invited => room.invite_details().await?.inviter,
956 _ => room.get_member_no_sync(event.sender()).await?,
957 };
958
959 let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
960 match &sender {
961 Some(sender) => (
962 sender.display_name().map(|s| s.to_owned()),
963 sender.avatar_url().map(|s| s.to_string()),
964 sender.name_ambiguous(),
965 ),
966 None => (None, None, false),
967 };
968
969 if sender_display_name.is_none() || sender_avatar_url.is_none() {
970 let sender_id = event.sender();
971 for ev in state_events {
972 let ev = match ev.deserialize() {
973 Ok(ev) => ev,
974 Err(error) => {
975 warn!(?error, "Failed to deserialize a state event");
976 continue;
977 }
978 };
979 if ev.sender() != sender_id {
980 continue;
981 }
982 if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
983 content,
984 ..
985 }) = ev.content()
986 {
987 if sender_display_name.is_none() {
988 sender_display_name = content.displayname;
989 }
990 if sender_avatar_url.is_none() {
991 sender_avatar_url = content.avatar_url.map(|url| url.to_string());
992 }
993 }
994 }
995 }
996
997 let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
998 let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
999 let thread_id = event.thread_id().clone();
1000
1001 let item = NotificationItem {
1002 event,
1003 raw_event,
1004 sender_display_name,
1005 sender_avatar_url,
1006 is_sender_name_ambiguous,
1007 room_computed_display_name: room.display_name().await?.to_string(),
1008 room_avatar_url: room.avatar_url().map(|s| s.to_string()),
1009 room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
1010 room_join_rule: room.join_rule(),
1011 is_direct_message_room: room.is_direct().await?,
1012 is_room_public: room.is_public(),
1013 is_room_encrypted: room
1014 .latest_encryption_state()
1015 .await
1016 .map(|state| state.is_encrypted())
1017 .ok(),
1018 joined_members_count: room.joined_members_count(),
1019 is_noisy,
1020 has_mention,
1021 thread_id,
1022 };
1023
1024 Ok(item)
1025 }
1026}
1027
1028#[derive(Debug, Error)]
1030pub enum Error {
1031 #[error(transparent)]
1032 BuildingLocalClient(ClientBuildError),
1033
1034 #[error("unknown room for a notification")]
1036 UnknownRoom,
1037
1038 #[error("invalid ruma event")]
1040 InvalidRumaEvent,
1041
1042 #[error("the sliding sync response doesn't include the target room")]
1045 SlidingSyncEmptyRoom,
1046
1047 #[error("the event was missing in the `/context` query")]
1048 ContextMissingEvent,
1049
1050 #[error(transparent)]
1052 SdkError(#[from] matrix_sdk::Error),
1053
1054 #[error(transparent)]
1056 StoreError(#[from] StoreError),
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061 use assert_matches2::assert_let;
1062 use matrix_sdk::test_utils::mocks::MatrixMockServer;
1063 use matrix_sdk_test::{async_test, event_factory::EventFactory};
1064 use ruma::{event_id, room_id, user_id};
1065
1066 use crate::notification_client::{NotificationItem, RawNotificationEvent};
1067
1068 #[async_test]
1069 async fn test_notification_item_returns_thread_id() {
1070 let server = MatrixMockServer::new().await;
1071 let client = server.client_builder().build().await;
1072
1073 let room_id = room_id!("!a:b.c");
1074 let thread_root_event_id = event_id!("$root:b.c");
1075 let message = EventFactory::new()
1076 .room(room_id)
1077 .sender(user_id!("@sender:b.c"))
1078 .text_msg("Threaded")
1079 .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
1080 .into_raw_sync();
1081 let room = server.sync_joined_room(&client, room_id).await;
1082
1083 let raw_notification_event = RawNotificationEvent::Timeline(message);
1084 let notification_item =
1085 NotificationItem::new(&room, raw_notification_event, None, Vec::new())
1086 .await
1087 .expect("Could not create notification item");
1088
1089 assert_let!(Some(thread_id) = notification_item.thread_id);
1090 assert_eq!(thread_id, thread_root_event_id);
1091 }
1092}