1#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37 borrow::Cow,
38 fmt,
39 future::Future,
40 pin::Pin,
41 sync::{
42 atomic::{AtomicU64, Ordering::SeqCst},
43 Arc, RwLock, Weak,
44 },
45 task::{Context, Poll},
46};
47
48#[cfg(target_family = "wasm")]
49use anymap2::any::CloneAny;
50#[cfg(not(target_family = "wasm"))]
51use anymap2::any::CloneAnySendSync;
52use eyeball::{SharedObservable, Subscriber};
53use futures_core::Stream;
54use futures_util::stream::{FuturesUnordered, StreamExt};
55use matrix_sdk_base::{
56 deserialized_responses::{EncryptionInfo, TimelineEvent},
57 SendOutsideWasm, SyncOutsideWasm,
58};
59use pin_project_lite::pin_project;
60use ruma::{events::AnySyncStateEvent, push::Action, serde::Raw, OwnedRoomId};
61use serde::{de::DeserializeOwned, Deserialize};
62use serde_json::value::RawValue as RawJsonValue;
63use tracing::{debug, error, field::debug, instrument, warn};
64
65use self::maps::EventHandlerMaps;
66use crate::{Client, Room};
67
68mod context;
69mod maps;
70mod static_events;
71
72pub use self::context::{Ctx, EventHandlerContext, RawEvent};
73
74#[cfg(not(target_family = "wasm"))]
75type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
76#[cfg(target_family = "wasm")]
77type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
78
79#[cfg(not(target_family = "wasm"))]
80type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
81#[cfg(target_family = "wasm")]
82type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;
83
84#[cfg(not(target_family = "wasm"))]
85type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
86#[cfg(target_family = "wasm")]
87type AnyMap = anymap2::Map<dyn CloneAny>;
88
89#[derive(Default)]
90pub(crate) struct EventHandlerStore {
91 handlers: RwLock<EventHandlerMaps>,
92 context: RwLock<AnyMap>,
93 counter: AtomicU64,
94}
95
96impl EventHandlerStore {
97 pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
98 self.handlers.write().unwrap().add(handle, handler_fn);
99 }
100
101 pub fn add_context<T>(&self, ctx: T)
102 where
103 T: Clone + Send + Sync + 'static,
104 {
105 self.context.write().unwrap().insert(ctx);
106 }
107
108 pub fn remove(&self, handle: EventHandlerHandle) {
109 self.handlers.write().unwrap().remove(handle);
110 }
111
112 #[cfg(test)]
113 fn len(&self) -> usize {
114 self.handlers.read().unwrap().len()
115 }
116}
117
118#[doc(hidden)]
119#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
120pub enum HandlerKind {
121 GlobalAccountData,
122 RoomAccountData,
123 EphemeralRoomData,
124 Timeline,
125 MessageLike,
126 OriginalMessageLike,
127 RedactedMessageLike,
128 State,
129 OriginalState,
130 RedactedState,
131 StrippedState,
132 ToDevice,
133 Presence,
134}
135
136impl HandlerKind {
137 fn message_like_redacted(redacted: bool) -> Self {
138 if redacted {
139 Self::RedactedMessageLike
140 } else {
141 Self::OriginalMessageLike
142 }
143 }
144
145 fn state_redacted(redacted: bool) -> Self {
146 if redacted {
147 Self::RedactedState
148 } else {
149 Self::OriginalState
150 }
151 }
152}
153
154pub trait SyncEvent {
156 #[doc(hidden)]
157 const KIND: HandlerKind;
158 #[doc(hidden)]
159 const TYPE: Option<&'static str>;
160}
161
162pub(crate) struct EventHandlerWrapper {
163 handler_fn: Box<EventHandlerFn>,
164 pub handler_id: u64,
165}
166
167#[derive(Clone, Debug)]
170pub struct EventHandlerHandle {
171 pub(crate) ev_kind: HandlerKind,
172 pub(crate) ev_type: Option<&'static str>,
173 pub(crate) room_id: Option<OwnedRoomId>,
174 pub(crate) handler_id: u64,
175}
176
177pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
211 #[doc(hidden)]
213 type Future: EventHandlerFuture;
214
215 #[doc(hidden)]
222 fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
223}
224
225#[doc(hidden)]
226pub trait EventHandlerFuture:
227 Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
228{
229 type Output: EventHandlerResult;
230}
231
232impl<T> EventHandlerFuture for T
233where
234 T: Future + SendOutsideWasm + 'static,
235 <T as Future>::Output: EventHandlerResult,
236{
237 type Output = <T as Future>::Output;
238}
239
240#[doc(hidden)]
241#[derive(Debug)]
242pub struct EventHandlerData<'a> {
243 client: Client,
244 room: Option<Room>,
245 raw: &'a RawJsonValue,
246 encryption_info: Option<&'a EncryptionInfo>,
247 push_actions: &'a [Action],
248 handle: EventHandlerHandle,
249}
250
251pub trait EventHandlerResult: Sized {
255 #[doc(hidden)]
256 fn print_error(&self, event_type: Option<&str>);
257}
258
259impl EventHandlerResult for () {
260 fn print_error(&self, _event_type: Option<&str>) {}
261}
262
263impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
264 fn print_error(&self, event_type: Option<&str>) {
265 let msg_fragment = match event_type {
266 Some(event_type) => format!(" for `{event_type}`"),
267 None => "".to_owned(),
268 };
269
270 match self {
271 #[cfg(feature = "anyhow")]
272 Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
273 error!("Event handler{msg_fragment} failed: {e:?}");
274 }
275 #[cfg(feature = "eyre")]
276 Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
277 error!("Event handler{msg_fragment} failed: {e:?}");
278 }
279 Err(e) => {
280 error!("Event handler{msg_fragment} failed: {e}");
281 }
282 Ok(_) => {}
283 }
284 }
285}
286
287#[derive(Deserialize)]
288struct UnsignedDetails {
289 redacted_because: Option<serde::de::IgnoredAny>,
290}
291
292impl Client {
294 pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
295 &self,
296 handler: H,
297 room_id: Option<OwnedRoomId>,
298 ) -> EventHandlerHandle
299 where
300 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
301 H: EventHandler<Ev, Ctx>,
302 {
303 let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
304 let maybe_fut = serde_json::from_str(data.raw.get())
305 .map(|ev| handler.clone().handle_event(ev, data));
306
307 Box::pin(async move {
308 match maybe_fut {
309 Ok(Some(fut)) => {
310 fut.await.print_error(Ev::TYPE);
311 }
312 Ok(None) => {
313 error!(
314 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
315 "Event handler has an invalid context argument",
316 );
317 }
318 Err(e) => {
319 warn!(
320 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
321 "Failed to deserialize event, skipping event handler.\n
322 Deserialization error: {e}",
323 );
324 }
325 }
326 })
327 });
328
329 let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
330 let handle =
331 EventHandlerHandle { ev_kind: Ev::KIND, ev_type: Ev::TYPE, room_id, handler_id };
332
333 self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
334
335 handle
336 }
337
338 pub(crate) async fn handle_sync_events<T>(
339 &self,
340 kind: HandlerKind,
341 room: Option<&Room>,
342 events: &[Raw<T>],
343 ) -> serde_json::Result<()> {
344 #[derive(Deserialize)]
345 struct ExtractType<'a> {
346 #[serde(borrow, rename = "type")]
347 event_type: Cow<'a, str>,
348 }
349
350 for raw_event in events {
351 let event_type = raw_event.deserialize_as::<ExtractType<'_>>()?.event_type;
352 self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
353 }
354
355 Ok(())
356 }
357
358 pub(crate) async fn handle_sync_state_events(
359 &self,
360 room: Option<&Room>,
361 state_events: &[Raw<AnySyncStateEvent>],
362 ) -> serde_json::Result<()> {
363 #[derive(Deserialize)]
364 struct StateEventDetails<'a> {
365 #[serde(borrow, rename = "type")]
366 event_type: Cow<'a, str>,
367 unsigned: Option<UnsignedDetails>,
368 }
369
370 self.handle_sync_events(HandlerKind::State, room, state_events).await?;
372
373 for raw_event in state_events {
375 let StateEventDetails { event_type, unsigned } = raw_event.deserialize_as()?;
376 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
377 let handler_kind = HandlerKind::state_redacted(redacted);
378
379 self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
380 .await;
381 }
382
383 Ok(())
384 }
385
386 pub(crate) async fn handle_sync_timeline_events(
387 &self,
388 room: Option<&Room>,
389 timeline_events: &[TimelineEvent],
390 ) -> serde_json::Result<()> {
391 #[derive(Deserialize)]
392 struct TimelineEventDetails<'a> {
393 #[serde(borrow, rename = "type")]
394 event_type: Cow<'a, str>,
395 state_key: Option<serde::de::IgnoredAny>,
396 unsigned: Option<UnsignedDetails>,
397 }
398
399 for item in timeline_events {
400 let TimelineEventDetails { event_type, state_key, unsigned } =
401 item.raw().deserialize_as()?;
402
403 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
404 let (handler_kind_g, handler_kind_r) = match state_key {
405 Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
406 None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
407 };
408
409 let raw_event = item.raw().json();
410 let encryption_info = item.encryption_info().map(|i| &**i);
411 let push_actions = item.push_actions().unwrap_or(&[]);
412
413 self.call_event_handlers(
415 room,
416 raw_event,
417 handler_kind_g,
418 &event_type,
419 encryption_info,
420 push_actions,
421 )
422 .await;
423
424 self.call_event_handlers(
426 room,
427 raw_event,
428 handler_kind_r,
429 &event_type,
430 encryption_info,
431 push_actions,
432 )
433 .await;
434
435 let kind = HandlerKind::Timeline;
437 self.call_event_handlers(
438 room,
439 raw_event,
440 kind,
441 &event_type,
442 encryption_info,
443 push_actions,
444 )
445 .await;
446 }
447
448 Ok(())
449 }
450
451 #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
452 async fn call_event_handlers(
453 &self,
454 room: Option<&Room>,
455 raw: &RawJsonValue,
456 event_kind: HandlerKind,
457 event_type: &str,
458 encryption_info: Option<&EncryptionInfo>,
459 push_actions: &[Action],
460 ) {
461 let room_id = room.map(|r| r.room_id());
462 if let Some(room_id) = room_id {
463 tracing::Span::current().record("room_id", debug(room_id));
464 }
465
466 let mut futures: FuturesUnordered<_> = self
468 .inner
469 .event_handlers
470 .handlers
471 .read()
472 .unwrap()
473 .get_handlers(event_kind, event_type, room_id)
474 .map(|(handle, handler_fn)| {
475 let data = EventHandlerData {
476 client: self.clone(),
477 room: room.cloned(),
478 raw,
479 encryption_info,
480 push_actions,
481 handle,
482 };
483
484 (handler_fn)(data)
485 })
486 .collect();
487
488 if !futures.is_empty() {
489 debug!(amount = futures.len(), "Calling event handlers");
490
491 while let Some(()) = futures.next().await {}
494 }
495 }
496}
497
498#[derive(Debug)]
503pub struct EventHandlerDropGuard {
504 handle: EventHandlerHandle,
505 client: Client,
506}
507
508impl EventHandlerDropGuard {
509 pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
510 Self { handle, client }
511 }
512}
513
514impl Drop for EventHandlerDropGuard {
515 fn drop(&mut self) {
516 self.client.remove_event_handler(self.handle.clone());
517 }
518}
519
520macro_rules! impl_event_handler {
521 ($($ty:ident),* $(,)?) => {
522 impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
523 where
524 Ev: SyncEvent,
525 Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
526 Fut: EventHandlerFuture,
527 $($ty: EventHandlerContext),*
528 {
529 type Future = Fut;
530
531 fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
532 Some((self)(ev, $($ty::from_data(&_d)?),*))
533 }
534 }
535 };
536}
537
538impl_event_handler!();
539impl_event_handler!(A);
540impl_event_handler!(A, B);
541impl_event_handler!(A, B, C);
542impl_event_handler!(A, B, C, D);
543impl_event_handler!(A, B, C, D, E);
544impl_event_handler!(A, B, C, D, E, F);
545impl_event_handler!(A, B, C, D, E, F, G);
546impl_event_handler!(A, B, C, D, E, F, G, H);
547
548#[derive(Debug)]
556pub struct ObservableEventHandler<T> {
557 shared_observable: SharedObservable<Option<T>>,
562
563 event_handler_guard: Arc<EventHandlerDropGuard>,
570}
571
572impl<T> ObservableEventHandler<T> {
573 pub(crate) fn new(
574 shared_observable: SharedObservable<Option<T>>,
575 event_handler_guard: EventHandlerDropGuard,
576 ) -> Self {
577 Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
578 }
579
580 pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
585 EventHandlerSubscriber::new(
586 self.shared_observable.subscribe(),
587 Arc::downgrade(&self.event_handler_guard),
590 )
591 }
592}
593
594pin_project! {
595 #[derive(Debug)]
604 pub struct EventHandlerSubscriber<T> {
605 #[pin]
611 subscriber: Subscriber<Option<T>>,
612
613 event_handler_guard: Weak<EventHandlerDropGuard>,
619 }
620}
621
622impl<T> EventHandlerSubscriber<T> {
623 fn new(
624 subscriber: Subscriber<Option<T>>,
625 event_handler_handle: Weak<EventHandlerDropGuard>,
626 ) -> Self {
627 Self { subscriber, event_handler_guard: event_handler_handle }
628 }
629}
630
631impl<T> Stream for EventHandlerSubscriber<T>
632where
633 T: Clone,
634{
635 type Item = T;
636
637 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
638 let mut this = self.project();
639
640 let Some(_) = this.event_handler_guard.upgrade() else {
641 return Poll::Ready(None);
645 };
646
647 loop {
657 match this.subscriber.as_mut().poll_next(context) {
658 Poll::Ready(None) => return Poll::Ready(None),
660
661 Poll::Ready(Some(None)) => {
664 continue;
666 }
667
668 Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
670
671 Poll::Pending => return Poll::Pending,
673 }
674 }
675 }
676}
677
678#[cfg(test)]
679mod tests {
680 use matrix_sdk_test::{
681 async_test,
682 event_factory::{EventFactory, PreviousMembership},
683 InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID,
684 };
685 use stream_assert::{assert_closed, assert_pending, assert_ready};
686 #[cfg(target_family = "wasm")]
687 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
688 use std::{
689 future,
690 sync::{
691 atomic::{AtomicU8, Ordering::SeqCst},
692 Arc,
693 },
694 };
695
696 use matrix_sdk_test::{StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder};
697 use once_cell::sync::Lazy;
698 use ruma::{
699 event_id,
700 events::{
701 room::{
702 member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
703 name::OriginalSyncRoomNameEvent,
704 power_levels::OriginalSyncRoomPowerLevelsEvent,
705 },
706 typing::SyncTypingEvent,
707 AnySyncStateEvent, AnySyncTimelineEvent,
708 },
709 room_id,
710 serde::Raw,
711 user_id,
712 };
713 use serde_json::json;
714
715 use crate::{
716 event_handler::Ctx,
717 test_utils::{logged_in_client, no_retry_test_client},
718 Client, Room,
719 };
720
721 static MEMBER_EVENT: Lazy<Raw<AnySyncTimelineEvent>> = Lazy::new(|| {
722 EventFactory::new()
723 .member(user_id!("@example:localhost"))
724 .membership(MembershipState::Join)
725 .display_name("example")
726 .event_id(event_id!("$151800140517rfvjc:localhost"))
727 .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
728 .into()
729 });
730
731 #[async_test]
732 async fn test_add_event_handler() -> crate::Result<()> {
733 let client = logged_in_client(None).await;
734
735 let member_count = Arc::new(AtomicU8::new(0));
736 let typing_count = Arc::new(AtomicU8::new(0));
737 let power_levels_count = Arc::new(AtomicU8::new(0));
738 let invited_member_count = Arc::new(AtomicU8::new(0));
739
740 client.add_event_handler({
741 let member_count = member_count.clone();
742 move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
743 member_count.fetch_add(1, SeqCst);
744 }
745 });
746 client.add_event_handler({
747 let typing_count = typing_count.clone();
748 move |_ev: SyncTypingEvent| async move {
749 typing_count.fetch_add(1, SeqCst);
750 }
751 });
752 client.add_event_handler({
753 let power_levels_count = power_levels_count.clone();
754 move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
755 power_levels_count.fetch_add(1, SeqCst);
756 }
757 });
758 client.add_event_handler({
759 let invited_member_count = invited_member_count.clone();
760 move |_ev: StrippedRoomMemberEvent| async move {
761 invited_member_count.fetch_add(1, SeqCst);
762 }
763 });
764
765 let f = EventFactory::new();
766 let response = SyncResponseBuilder::default()
767 .add_joined_room(
768 JoinedRoomBuilder::default()
769 .add_timeline_event(MEMBER_EVENT.clone())
770 .add_typing(
771 f.typing(vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")]),
772 )
773 .add_state_event(StateTestEvent::PowerLevels),
774 )
775 .add_invited_room(
776 InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event(
777 StrippedStateTestEvent::Custom(json!({
778 "content": {
779 "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
780 "displayname": "Alice",
781 "membership": "invite",
782 },
783 "event_id": "$143273582443PhrSn:example.org",
784 "origin_server_ts": 1432735824653u64,
785 "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
786 "sender": "@example:example.org",
787 "state_key": "@alice:example.org",
788 "type": "m.room.member",
789 "unsigned": {
790 "age": 1234,
791 "invite_room_state": [
792 {
793 "content": {
794 "name": "Example Room"
795 },
796 "sender": "@bob:example.org",
797 "state_key": "",
798 "type": "m.room.name"
799 },
800 {
801 "content": {
802 "join_rule": "invite"
803 },
804 "sender": "@bob:example.org",
805 "state_key": "",
806 "type": "m.room.join_rules"
807 }
808 ]
809 }
810 })),
811 ),
812 )
813 .build_sync_response();
814 client.process_sync(response).await?;
815
816 assert_eq!(member_count.load(SeqCst), 1);
817 assert_eq!(typing_count.load(SeqCst), 1);
818 assert_eq!(power_levels_count.load(SeqCst), 1);
819 assert_eq!(invited_member_count.load(SeqCst), 1);
820
821 Ok(())
822 }
823
824 #[async_test]
825 #[allow(dependency_on_unit_never_type_fallback)]
826 async fn test_add_room_event_handler() -> crate::Result<()> {
827 let client = logged_in_client(None).await;
828
829 let room_id_a = room_id!("!foo:example.org");
830 let room_id_b = room_id!("!bar:matrix.org");
831
832 let member_count = Arc::new(AtomicU8::new(0));
833 let power_levels_count = Arc::new(AtomicU8::new(0));
834
835 client.add_room_event_handler(room_id_a, {
837 let member_count = member_count.clone();
838 move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
839 member_count.fetch_add(1, SeqCst);
840 future::ready(())
841 }
842 });
843 client.add_room_event_handler(room_id_b, {
844 let member_count = member_count.clone();
845 move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
846 member_count.fetch_add(1, SeqCst);
847 future::ready(())
848 }
849 });
850
851 client.add_room_event_handler(room_id_a, {
853 let power_levels_count = power_levels_count.clone();
854 move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
855 power_levels_count.fetch_add(1, SeqCst);
856 future::ready(())
857 }
858 });
859
860 client.add_room_event_handler(room_id_b, move |_ev: OriginalSyncRoomNameEvent| async {
862 unreachable!("No room event in room B")
863 });
864
865 let response = SyncResponseBuilder::default()
866 .add_joined_room(
867 JoinedRoomBuilder::new(room_id_a)
868 .add_timeline_event(MEMBER_EVENT.clone())
869 .add_state_event(StateTestEvent::PowerLevels)
870 .add_state_event(StateTestEvent::RoomName),
871 )
872 .add_joined_room(
873 JoinedRoomBuilder::new(room_id_b)
874 .add_timeline_event(MEMBER_EVENT.clone())
875 .add_state_event(StateTestEvent::PowerLevels),
876 )
877 .build_sync_response();
878 client.process_sync(response).await?;
879
880 assert_eq!(member_count.load(SeqCst), 2);
881 assert_eq!(power_levels_count.load(SeqCst), 1);
882
883 Ok(())
884 }
885
886 #[async_test]
887 #[allow(dependency_on_unit_never_type_fallback)]
888 async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
889 let client = logged_in_client(None).await;
890
891 client.add_event_handler(
892 |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
893 );
894
895 Ok(())
898 }
899
900 #[async_test]
901 #[allow(dependency_on_unit_never_type_fallback)]
902 async fn test_remove_event_handler() -> crate::Result<()> {
903 let client = logged_in_client(None).await;
904
905 let member_count = Arc::new(AtomicU8::new(0));
906
907 client.add_event_handler({
908 let member_count = member_count.clone();
909 move |_ev: OriginalSyncRoomMemberEvent| async move {
910 member_count.fetch_add(1, SeqCst);
911 }
912 });
913
914 let handle_a = client.add_event_handler(move |_ev: OriginalSyncRoomMemberEvent| async {
915 panic!("handler should have been removed");
916 });
917 let handle_b = client.add_room_event_handler(
918 #[allow(unknown_lints, clippy::explicit_auto_deref)] *DEFAULT_TEST_ROOM_ID,
920 move |_ev: OriginalSyncRoomMemberEvent| async {
921 panic!("handler should have been removed");
922 },
923 );
924
925 client.add_event_handler({
926 let member_count = member_count.clone();
927 move |_ev: OriginalSyncRoomMemberEvent| async move {
928 member_count.fetch_add(1, SeqCst);
929 }
930 });
931
932 let response = SyncResponseBuilder::default()
933 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
934 .build_sync_response();
935
936 client.remove_event_handler(handle_a);
937 client.remove_event_handler(handle_b);
938
939 client.process_sync(response).await?;
940
941 assert_eq!(member_count.load(SeqCst), 2);
942
943 Ok(())
944 }
945
946 #[async_test]
947 async fn test_event_handler_drop_guard() {
948 let client = no_retry_test_client(None).await;
949
950 let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
951 assert_eq!(client.inner.event_handlers.len(), 1);
952
953 {
954 let _guard = client.event_handler_drop_guard(handle);
955 assert_eq!(client.inner.event_handlers.len(), 1);
956 }
958
959 assert_eq!(client.inner.event_handlers.len(), 0);
960 }
961
962 #[async_test]
963 async fn test_use_client_in_handler() {
964 let client = no_retry_test_client(None).await;
968
969 client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
970 let _caps = client.get_capabilities().await.map_err(|e| anyhow::anyhow!("{}", e))?;
974 anyhow::Ok(())
975 });
976 }
977
978 #[async_test]
979 async fn test_raw_event_handler() -> crate::Result<()> {
980 let client = logged_in_client(None).await;
981 let counter = Arc::new(AtomicU8::new(0));
982 client.add_event_handler_context(counter.clone());
983 client.add_event_handler(
984 |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
985 counter.fetch_add(1, SeqCst);
986 },
987 );
988
989 let response = SyncResponseBuilder::default()
990 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
991 .build_sync_response();
992 client.process_sync(response).await?;
993
994 assert_eq!(counter.load(SeqCst), 1);
995 Ok(())
996 }
997
998 #[async_test]
999 async fn test_enum_event_handler() -> crate::Result<()> {
1000 let client = logged_in_client(None).await;
1001 let counter = Arc::new(AtomicU8::new(0));
1002 client.add_event_handler_context(counter.clone());
1003 client.add_event_handler(
1004 |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
1005 counter.fetch_add(1, SeqCst);
1006 },
1007 );
1008
1009 let response = SyncResponseBuilder::default()
1010 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1011 .build_sync_response();
1012 client.process_sync(response).await?;
1013
1014 assert_eq!(counter.load(SeqCst), 1);
1015 Ok(())
1016 }
1017
1018 #[async_test]
1019 #[allow(dependency_on_unit_never_type_fallback)]
1020 async fn test_observe_events() -> crate::Result<()> {
1021 let client = logged_in_client(None).await;
1022
1023 let room_id_0 = room_id!("!r0.matrix.org");
1024 let room_id_1 = room_id!("!r1.matrix.org");
1025
1026 let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();
1027
1028 let mut subscriber = observable.subscribe();
1029
1030 assert_pending!(subscriber);
1031
1032 let mut response_builder = SyncResponseBuilder::new();
1033 let response = response_builder
1034 .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
1035 StateTestEvent::Custom(json!({
1036 "content": {
1037 "name": "Name 0"
1038 },
1039 "event_id": "$ev0",
1040 "origin_server_ts": 1,
1041 "sender": "@mnt_io:matrix.org",
1042 "state_key": "",
1043 "type": "m.room.name",
1044 "unsigned": {
1045 "age": 1,
1046 }
1047 })),
1048 ))
1049 .build_sync_response();
1050 client.process_sync(response).await?;
1051
1052 let (room_name, room) = assert_ready!(subscriber);
1053
1054 assert_eq!(room_name.event_id.as_str(), "$ev0");
1055 assert_eq!(room.room_id(), room_id_0);
1056 assert_eq!(room.name().unwrap(), "Name 0");
1057
1058 assert_pending!(subscriber);
1059
1060 let response = response_builder
1061 .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
1062 StateTestEvent::Custom(json!({
1063 "content": {
1064 "name": "Name 1"
1065 },
1066 "event_id": "$ev1",
1067 "origin_server_ts": 2,
1068 "sender": "@mnt_io:matrix.org",
1069 "state_key": "",
1070 "type": "m.room.name",
1071 "unsigned": {
1072 "age": 2,
1073 }
1074 })),
1075 ))
1076 .build_sync_response();
1077 client.process_sync(response).await?;
1078
1079 let (room_name, room) = assert_ready!(subscriber);
1080
1081 assert_eq!(room_name.event_id.as_str(), "$ev1");
1082 assert_eq!(room.room_id(), room_id_1);
1083 assert_eq!(room.name().unwrap(), "Name 1");
1084
1085 assert_pending!(subscriber);
1086
1087 drop(observable);
1088 assert_closed!(subscriber);
1089
1090 Ok(())
1091 }
1092
1093 #[async_test]
1094 #[allow(dependency_on_unit_never_type_fallback)]
1095 async fn test_observe_room_events() -> crate::Result<()> {
1096 let client = logged_in_client(None).await;
1097
1098 let room_id = room_id!("!r0.matrix.org");
1099
1100 let observable_for_room =
1101 client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1102
1103 let mut subscriber_for_room = observable_for_room.subscribe();
1104
1105 assert_pending!(subscriber_for_room);
1106
1107 let mut response_builder = SyncResponseBuilder::new();
1108 let response = response_builder
1109 .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1110 StateTestEvent::Custom(json!({
1111 "content": {
1112 "name": "Name 0"
1113 },
1114 "event_id": "$ev0",
1115 "origin_server_ts": 1,
1116 "sender": "@mnt_io:matrix.org",
1117 "state_key": "",
1118 "type": "m.room.name",
1119 "unsigned": {
1120 "age": 1,
1121 }
1122 })),
1123 ))
1124 .build_sync_response();
1125 client.process_sync(response).await?;
1126
1127 let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1128
1129 assert_eq!(room_name.event_id.as_str(), "$ev0");
1130 assert_eq!(room.name().unwrap(), "Name 0");
1131
1132 assert_pending!(subscriber_for_room);
1133
1134 let response = response_builder
1135 .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1136 StateTestEvent::Custom(json!({
1137 "content": {
1138 "name": "Name 1"
1139 },
1140 "event_id": "$ev1",
1141 "origin_server_ts": 2,
1142 "sender": "@mnt_io:matrix.org",
1143 "state_key": "",
1144 "type": "m.room.name",
1145 "unsigned": {
1146 "age": 2,
1147 }
1148 })),
1149 ))
1150 .build_sync_response();
1151 client.process_sync(response).await?;
1152
1153 let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1154
1155 assert_eq!(room_name.event_id.as_str(), "$ev1");
1156 assert_eq!(room.name().unwrap(), "Name 1");
1157
1158 assert_pending!(subscriber_for_room);
1159
1160 drop(observable_for_room);
1161 assert_closed!(subscriber_for_room);
1162
1163 Ok(())
1164 }
1165
1166 #[async_test]
1167 async fn test_observe_several_room_events() -> crate::Result<()> {
1168 let client = logged_in_client(None).await;
1169
1170 let room_id = room_id!("!r0.matrix.org");
1171
1172 let observable_for_room =
1173 client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1174
1175 let mut subscriber_for_room = observable_for_room.subscribe();
1176
1177 assert_pending!(subscriber_for_room);
1178
1179 let mut response_builder = SyncResponseBuilder::new();
1180 let response = response_builder
1181 .add_joined_room(
1182 JoinedRoomBuilder::new(room_id)
1183 .add_state_event(StateTestEvent::Custom(json!({
1184 "content": {
1185 "name": "Name 0"
1186 },
1187 "event_id": "$ev0",
1188 "origin_server_ts": 1,
1189 "sender": "@mnt_io:matrix.org",
1190 "state_key": "",
1191 "type": "m.room.name",
1192 "unsigned": {
1193 "age": 1,
1194 }
1195 })))
1196 .add_state_event(StateTestEvent::Custom(json!({
1197 "content": {
1198 "name": "Name 1"
1199 },
1200 "event_id": "$ev1",
1201 "origin_server_ts": 2,
1202 "sender": "@mnt_io:matrix.org",
1203 "state_key": "",
1204 "type": "m.room.name",
1205 "unsigned": {
1206 "age": 1,
1207 }
1208 })))
1209 .add_state_event(StateTestEvent::Custom(json!({
1210 "content": {
1211 "name": "Name 2"
1212 },
1213 "event_id": "$ev2",
1214 "origin_server_ts": 3,
1215 "sender": "@mnt_io:matrix.org",
1216 "state_key": "",
1217 "type": "m.room.name",
1218 "unsigned": {
1219 "age": 1,
1220 }
1221 }))),
1222 )
1223 .build_sync_response();
1224 client.process_sync(response).await?;
1225
1226 let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1227
1228 assert_eq!(room_name.event_id.as_str(), "$ev2");
1230 assert_eq!(room.name().unwrap(), "Name 2");
1231
1232 assert_pending!(subscriber_for_room);
1233
1234 drop(observable_for_room);
1235 assert_closed!(subscriber_for_room);
1236
1237 Ok(())
1238 }
1239}