1use std::{
16 sync::{Arc, Mutex},
17 time::Duration,
18};
19
20use futures_util::{pin_mut, StreamExt as _};
21use matrix_sdk::{
22 room::Room, sleep::sleep, Client, ClientBuildError, SlidingSyncList, SlidingSyncMode,
23};
24use matrix_sdk_base::{
25 deserialized_responses::TimelineEvent, sliding_sync::http, RoomState, StoreError,
26};
27use ruma::{
28 assign,
29 directory::RoomTypeFilter,
30 events::{
31 room::{
32 member::{MembershipState, StrippedRoomMemberEvent},
33 message::SyncRoomMessageEvent,
34 },
35 AnyFullStateEventContent, AnyStateEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
36 FullStateEventContent, StateEventType, TimelineEventType,
37 },
38 html::RemoveReplyFallback,
39 push::Action,
40 serde::Raw,
41 uint, EventId, OwnedEventId, RoomId, UserId,
42};
43use thiserror::Error;
44use tokio::sync::Mutex as AsyncMutex;
45use tracing::{debug, info, instrument, trace, warn};
46
47use crate::{
48 encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
49 sync_service::SyncService,
50 DEFAULT_SANITIZER_MODE,
51};
52
53#[derive(Clone)]
55pub enum NotificationProcessSetup {
56 MultipleProcesses,
65
66 SingleProcess { sync_service: Arc<SyncService> },
74}
75
76pub struct NotificationClient {
82 client: Client,
84
85 parent_client: Client,
87
88 process_setup: NotificationProcessSetup,
90
91 notification_sync_mutex: AsyncMutex<()>,
99
100 encryption_sync_mutex: AsyncMutex<()>,
105}
106
107impl NotificationClient {
108 const CONNECTION_ID: &'static str = "notifications";
109 const LOCK_ID: &'static str = "notifications";
110
111 pub async fn new(
113 parent_client: Client,
114 process_setup: NotificationProcessSetup,
115 ) -> Result<Self, Error> {
116 let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
117
118 Ok(NotificationClient {
119 client,
120 parent_client,
121 notification_sync_mutex: AsyncMutex::new(()),
122 encryption_sync_mutex: AsyncMutex::new(()),
123 process_setup,
124 })
125 }
126
127 #[instrument(skip(self))]
138 pub async fn get_notification(
139 &self,
140 room_id: &RoomId,
141 event_id: &EventId,
142 ) -> Result<Option<NotificationItem>, Error> {
143 match self.get_notification_with_sliding_sync(room_id, event_id).await? {
144 NotificationStatus::Event(event) => Ok(Some(event)),
145 NotificationStatus::EventFilteredOut => Ok(None),
146 NotificationStatus::EventNotFound => {
147 self.get_notification_with_context(room_id, event_id).await
148 }
149 }
150 }
151
152 #[instrument(skip_all)]
160 async fn retry_decryption(
161 &self,
162 room: &Room,
163 raw_event: &Raw<AnySyncTimelineEvent>,
164 ) -> Result<Option<TimelineEvent>, Error> {
165 let event: AnySyncTimelineEvent =
166 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
167
168 if !is_event_encrypted(event.event_type()) {
169 return Ok(None);
170 }
171
172 let _guard = self.encryption_sync_mutex.lock().await;
174
175 let with_locking = WithLocking::from(matches!(
186 self.process_setup,
187 NotificationProcessSetup::MultipleProcesses
188 ));
189
190 let sync_permit_guard = match &self.process_setup {
191 NotificationProcessSetup::MultipleProcesses => {
192 let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
196 sync_permit.lock_owned().await
197 }
198
199 NotificationProcessSetup::SingleProcess { sync_service } => {
200 if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
201 permit_guard
202 } else {
203 let mut wait = 200;
212
213 debug!("Encryption sync running in background");
214 for _ in 0..3 {
215 trace!("waiting for decryption…");
216
217 sleep(Duration::from_millis(wait)).await;
218
219 let new_event = room.decrypt_event(raw_event.cast_ref()).await?;
220
221 match new_event.kind {
222 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
223 utd_info, ..} => {
224 if utd_info.reason.is_missing_room_key() {
225 wait *= 2;
228 } else {
229 debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
230 return Ok(None);
231 }
232 }
233 _ => {
234 trace!("Waiting succeeded and event could be decrypted!");
235 return Ok(Some(new_event));
236 }
237 }
238 }
239
240 debug!("Timeout waiting for the encryption sync to decrypt notification.");
242 return Ok(None);
243 }
244 }
245 };
246
247 let encryption_sync = EncryptionSyncService::new(
248 self.client.clone(),
249 Some((Duration::from_secs(3), Duration::from_secs(4))),
250 with_locking,
251 )
252 .await;
253
254 match encryption_sync {
259 Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
260 Ok(()) => match room.decrypt_event(raw_event.cast_ref()).await {
261 Ok(new_event) => match new_event.kind {
262 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
263 utd_info, ..
264 } => {
265 trace!(
266 "Encryption sync failed to decrypt the event: {:?}",
267 utd_info.reason
268 );
269 Ok(None)
270 }
271 _ => {
272 trace!("Encryption sync managed to decrypt the event.");
273 Ok(Some(new_event))
274 }
275 },
276 Err(err) => {
277 trace!("Encryption sync failed to decrypt the event: {err}");
278 Ok(None)
279 }
280 },
281 Err(err) => {
282 warn!("Encryption sync error: {err:#}");
283 Ok(None)
284 }
285 },
286 Err(err) => {
287 warn!("Encryption sync build error: {err:#}",);
288 Ok(None)
289 }
290 }
291 }
292
293 #[instrument(skip_all)]
312 async fn try_sliding_sync(
313 &self,
314 room_id: &RoomId,
315 event_id: &EventId,
316 ) -> Result<Option<RawNotificationEvent>, Error> {
317 let _guard = self.notification_sync_mutex.lock().await;
320
321 let raw_notification = Arc::new(Mutex::new(None));
326
327 let handler_raw_notification = raw_notification.clone();
328 let target_event_id = event_id.to_owned();
329
330 let timeline_event_handler =
331 self.client.add_event_handler(move |raw: Raw<AnySyncTimelineEvent>| async move {
332 match raw.get_field::<OwnedEventId>("event_id") {
333 Ok(Some(event_id)) => {
334 if event_id == target_event_id {
335 *handler_raw_notification.lock().unwrap() =
338 Some(RawNotificationEvent::Timeline(raw));
339 }
340 }
341 Ok(None) => {
342 warn!("a sync event had no event id");
343 }
344 Err(err) => {
345 warn!("a sync event id couldn't be decoded: {err}");
346 }
347 }
348 });
349
350 let raw_invite = Arc::new(Mutex::new(None));
352
353 let target_event_id = event_id.to_owned();
354 let user_id = self.client.user_id().unwrap().to_owned();
355 let handler_raw_invite = raw_invite.clone();
356 let handler_raw_notification = raw_notification.clone();
357 let stripped_member_handler =
358 self.client.add_event_handler(move |raw: Raw<StrippedRoomMemberEvent>| async move {
359 let deserialized = match raw.deserialize() {
360 Ok(d) => d,
361 Err(err) => {
362 warn!("failed to deserialize raw stripped room member event: {err}");
363 return;
364 }
365 };
366
367 trace!("received a stripped room member event");
368
369 match raw.get_field::<OwnedEventId>("event_id") {
372 Ok(Some(event_id)) => {
373 if event_id == target_event_id {
374 *handler_raw_notification.lock().unwrap() =
377 Some(RawNotificationEvent::Invite(raw));
378 return;
379 }
380 }
381 Ok(None) => {
382 debug!("a room member event had no id");
383 }
384 Err(err) => {
385 debug!("a room member event id couldn't be decoded: {err}");
386 }
387 }
388
389 if deserialized.content.membership == MembershipState::Invite
391 && deserialized.state_key == user_id
392 {
393 debug!("found an invite event for the current user");
394 *handler_raw_invite.lock().unwrap() = Some(RawNotificationEvent::Invite(raw));
398 } else {
399 debug!("not an invite event, or not for the current user");
400 }
401 });
402
403 let required_state = vec![
405 (StateEventType::RoomEncryption, "".to_owned()),
406 (StateEventType::RoomMember, "$LAZY".to_owned()),
407 (StateEventType::RoomMember, "$ME".to_owned()),
408 (StateEventType::RoomCanonicalAlias, "".to_owned()),
409 (StateEventType::RoomName, "".to_owned()),
410 (StateEventType::RoomPowerLevels, "".to_owned()),
411 ];
412
413 let invites = SlidingSyncList::builder("invites")
414 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
415 .timeline_limit(8)
416 .required_state(required_state.clone())
417 .filters(Some(assign!(http::request::ListFilters::default(), {
418 is_invite: Some(true),
419 not_room_types: vec![RoomTypeFilter::Space],
420 })));
421
422 let sync = self
423 .client
424 .sliding_sync(Self::CONNECTION_ID)?
425 .poll_timeout(Duration::from_secs(1))
426 .network_timeout(Duration::from_secs(3))
427 .with_account_data_extension(
428 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
429 )
430 .add_list(invites)
431 .build()
432 .await?;
433
434 sync.subscribe_to_rooms(
435 &[room_id],
436 Some(assign!(http::request::RoomSubscription::default(), {
437 required_state,
438 timeline_limit: uint!(16)
439 })),
440 true,
441 );
442
443 let mut remaining_attempts = 3;
444
445 let stream = sync.sync();
446 pin_mut!(stream);
447
448 loop {
449 if stream.next().await.is_none() {
450 break;
452 }
453
454 if raw_notification.lock().unwrap().is_some() || raw_invite.lock().unwrap().is_some() {
455 break;
457 }
458
459 remaining_attempts -= 1;
460 if remaining_attempts == 0 {
461 break;
463 }
464 }
465
466 self.client.remove_event_handler(stripped_member_handler);
467 self.client.remove_event_handler(timeline_event_handler);
468
469 let mut maybe_event = raw_notification.lock().unwrap().take();
470
471 if maybe_event.is_none() {
472 trace!("we didn't have a non-invite event, looking for invited room now");
473 if let Some(room) = self.client.get_room(room_id) {
474 if room.state() == RoomState::Invited {
475 maybe_event = raw_invite.lock().unwrap().take();
476 } else {
477 debug!("the room isn't in the invited state");
478 }
479 } else {
480 debug!("the room isn't an invite");
481 }
482 }
483
484 let found = if maybe_event.is_some() { "" } else { "not " };
485 trace!("the notification event has been {found}found");
486
487 Ok(maybe_event)
488 }
489
490 pub async fn get_notification_with_sliding_sync(
495 &self,
496 room_id: &RoomId,
497 event_id: &EventId,
498 ) -> Result<NotificationStatus, Error> {
499 let Some(mut raw_event) = self.try_sliding_sync(room_id, event_id).await? else {
500 return Ok(NotificationStatus::EventNotFound);
501 };
502
503 let Some(room) = self.client.get_room(room_id) else { return Err(Error::UnknownRoom) };
505
506 let push_actions = match &raw_event {
507 RawNotificationEvent::Timeline(timeline_event) => {
508 if let Some(mut timeline_event) =
510 self.retry_decryption(&room, timeline_event).await?
511 {
512 let push_actions = timeline_event.push_actions.take();
513 raw_event = RawNotificationEvent::Timeline(timeline_event.into_raw());
514 push_actions
515 } else {
516 room.event_push_actions(timeline_event).await?
517 }
518 }
519 RawNotificationEvent::Invite(invite_event) => {
520 room.event_push_actions(invite_event).await?
522 }
523 };
524
525 if let Some(push_actions) = &push_actions {
526 if !push_actions.iter().any(|a| a.should_notify()) {
527 return Ok(NotificationStatus::EventFilteredOut);
528 }
529 }
530
531 Ok(NotificationStatus::Event(
532 NotificationItem::new(&room, raw_event, push_actions.as_deref(), Vec::new()).await?,
533 ))
534 }
535
536 pub async fn get_notification_with_context(
549 &self,
550 room_id: &RoomId,
551 event_id: &EventId,
552 ) -> Result<Option<NotificationItem>, Error> {
553 info!("fetching notification event with a /context query");
554
555 let Some(room) = self.parent_client.get_room(room_id) else {
557 return Err(Error::UnknownRoom);
558 };
559
560 let response = room.event_with_context(event_id, true, uint!(0), None).await?;
561
562 let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
563 let state_events = response.state;
564
565 if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
566 timeline_event = decrypted_event;
567 }
568
569 if let Some(actions) = timeline_event.push_actions.as_ref() {
570 if !actions.iter().any(|a| a.should_notify()) {
571 return Ok(None);
572 }
573 }
574
575 let push_actions = timeline_event.push_actions.take();
576 Ok(Some(
577 NotificationItem::new(
578 &room,
579 RawNotificationEvent::Timeline(timeline_event.into_raw()),
580 push_actions.as_deref(),
581 state_events,
582 )
583 .await?,
584 ))
585 }
586}
587
588fn is_event_encrypted(event_type: TimelineEventType) -> bool {
589 let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
590
591 #[cfg(feature = "unstable-msc3956")]
592 let is_still_encrypted =
593 is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
594
595 is_still_encrypted
596}
597
598#[derive(Debug)]
599pub enum NotificationStatus {
600 Event(NotificationItem),
601 EventNotFound,
602 EventFilteredOut,
603}
604
605#[derive(Debug)]
610pub enum RawNotificationEvent {
611 Timeline(Raw<AnySyncTimelineEvent>),
613 Invite(Raw<StrippedRoomMemberEvent>),
616}
617
618#[derive(Debug)]
621pub enum NotificationEvent {
622 Timeline(AnySyncTimelineEvent),
624 Invite(StrippedRoomMemberEvent),
626}
627
628impl NotificationEvent {
629 pub fn sender(&self) -> &UserId {
630 match self {
631 NotificationEvent::Timeline(ev) => ev.sender(),
632 NotificationEvent::Invite(ev) => &ev.sender,
633 }
634 }
635}
636
637#[derive(Debug)]
639pub struct NotificationItem {
640 pub event: NotificationEvent,
642
643 pub raw_event: RawNotificationEvent,
645
646 pub sender_display_name: Option<String>,
648 pub sender_avatar_url: Option<String>,
650 pub is_sender_name_ambiguous: bool,
652
653 pub room_computed_display_name: String,
655 pub room_avatar_url: Option<String>,
657 pub room_canonical_alias: Option<String>,
659 pub is_room_encrypted: Option<bool>,
661 pub is_direct_message_room: bool,
663 pub joined_members_count: u64,
665
666 pub is_noisy: Option<bool>,
671 pub has_mention: Option<bool>,
672}
673
674impl NotificationItem {
675 async fn new(
676 room: &Room,
677 raw_event: RawNotificationEvent,
678 push_actions: Option<&[Action]>,
679 state_events: Vec<Raw<AnyStateEvent>>,
680 ) -> Result<Self, Error> {
681 let event = match &raw_event {
682 RawNotificationEvent::Timeline(raw_event) => {
683 let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
684 if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
685 SyncRoomMessageEvent::Original(ev),
686 )) = &mut event
687 {
688 ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
689 }
690 NotificationEvent::Timeline(event)
691 }
692 RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(
693 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
694 ),
695 };
696
697 let sender = match room.state() {
698 RoomState::Invited => room.invite_details().await?.inviter,
699 _ => room.get_member_no_sync(event.sender()).await?,
700 };
701
702 let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
703 match &sender {
704 Some(sender) => (
705 sender.display_name().map(|s| s.to_owned()),
706 sender.avatar_url().map(|s| s.to_string()),
707 sender.name_ambiguous(),
708 ),
709 None => (None, None, false),
710 };
711
712 if sender_display_name.is_none() || sender_avatar_url.is_none() {
713 let sender_id = event.sender();
714 for ev in state_events {
715 let Ok(ev) = ev.deserialize() else {
716 continue;
717 };
718 if ev.sender() != sender_id {
719 continue;
720 }
721 if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
722 content,
723 ..
724 }) = ev.content()
725 {
726 if sender_display_name.is_none() {
727 sender_display_name = content.displayname;
728 }
729 if sender_avatar_url.is_none() {
730 sender_avatar_url = content.avatar_url.map(|url| url.to_string());
731 }
732 }
733 }
734 }
735
736 let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
737 let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
738
739 let item = NotificationItem {
740 event,
741 raw_event,
742 sender_display_name,
743 sender_avatar_url,
744 is_sender_name_ambiguous,
745 room_computed_display_name: room.display_name().await?.to_string(),
746 room_avatar_url: room.avatar_url().map(|s| s.to_string()),
747 room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
748 is_direct_message_room: room.is_direct().await?,
749 is_room_encrypted: room.is_encrypted().await.ok(),
750 joined_members_count: room.joined_members_count(),
751 is_noisy,
752 has_mention,
753 };
754
755 Ok(item)
756 }
757}
758
759#[derive(Debug, Error)]
761pub enum Error {
762 #[error(transparent)]
763 BuildingLocalClient(ClientBuildError),
764
765 #[error("unknown room for a notification")]
767 UnknownRoom,
768
769 #[error("invalid ruma event")]
771 InvalidRumaEvent,
772
773 #[error("the sliding sync response doesn't include the target room")]
776 SlidingSyncEmptyRoom,
777
778 #[error("the event was missing in the `/context` query")]
779 ContextMissingEvent,
780
781 #[error(transparent)]
783 SdkError(#[from] matrix_sdk::Error),
784
785 #[error(transparent)]
787 StoreError(#[from] StoreError),
788}