matrix_sdk/event_handler/
mod.rs

1// Copyright 2021 Jonas Platte
2// Copyright 2022 Famedly GmbH
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! Types and traits related to event handlers. For usage, see
17//! [`Client::add_event_handler`].
18//!
19//! ### How it works
20//!
21//! The `add_event_handler` method registers event handlers of different
22//! signatures by actually storing boxed closures that all have the same
23//! signature of `async (EventHandlerData) -> ()` where `EventHandlerData` is a
24//! private type that contains all of the data an event handler *might* need.
25//!
26//! The stored closure takes care of deserializing the event which the
27//! `EventHandlerData` contains as a (borrowed) [`serde_json::value::RawValue`],
28//! extracting the context arguments from other fields of `EventHandlerData` and
29//! calling / `.await`ing the event handler if the previous steps succeeded.
30//! It also logs any errors from the above chain of function calls.
31//!
32//! For more details, see the [`EventHandler`] trait.
33
34#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37    borrow::Cow,
38    fmt,
39    future::Future,
40    pin::Pin,
41    sync::{
42        atomic::{AtomicU64, Ordering::SeqCst},
43        Arc, RwLock, Weak,
44    },
45    task::{Context, Poll},
46};
47
48#[cfg(target_family = "wasm")]
49use anymap2::any::CloneAny;
50#[cfg(not(target_family = "wasm"))]
51use anymap2::any::CloneAnySendSync;
52use eyeball::{SharedObservable, Subscriber};
53use futures_core::Stream;
54use futures_util::stream::{FuturesUnordered, StreamExt};
55use matrix_sdk_base::{
56    deserialized_responses::{EncryptionInfo, TimelineEvent},
57    SendOutsideWasm, SyncOutsideWasm,
58};
59use pin_project_lite::pin_project;
60use ruma::{events::AnySyncStateEvent, push::Action, serde::Raw, OwnedRoomId};
61use serde::{de::DeserializeOwned, Deserialize};
62use serde_json::value::RawValue as RawJsonValue;
63use tracing::{debug, error, field::debug, instrument, warn};
64
65use self::maps::EventHandlerMaps;
66use crate::{Client, Room};
67
68mod context;
69mod maps;
70mod static_events;
71
72pub use self::context::{Ctx, EventHandlerContext, RawEvent};
73
74#[cfg(not(target_family = "wasm"))]
75type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
76#[cfg(target_family = "wasm")]
77type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
78
79#[cfg(not(target_family = "wasm"))]
80type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
81#[cfg(target_family = "wasm")]
82type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;
83
84#[cfg(not(target_family = "wasm"))]
85type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
86#[cfg(target_family = "wasm")]
87type AnyMap = anymap2::Map<dyn CloneAny>;
88
89#[derive(Default)]
90pub(crate) struct EventHandlerStore {
91    handlers: RwLock<EventHandlerMaps>,
92    context: RwLock<AnyMap>,
93    counter: AtomicU64,
94}
95
96impl EventHandlerStore {
97    pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
98        self.handlers.write().unwrap().add(handle, handler_fn);
99    }
100
101    pub fn add_context<T>(&self, ctx: T)
102    where
103        T: Clone + Send + Sync + 'static,
104    {
105        self.context.write().unwrap().insert(ctx);
106    }
107
108    pub fn remove(&self, handle: EventHandlerHandle) {
109        self.handlers.write().unwrap().remove(handle);
110    }
111
112    #[cfg(test)]
113    fn len(&self) -> usize {
114        self.handlers.read().unwrap().len()
115    }
116}
117
/// The category of events an event handler is registered for.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum HandlerKind {
    GlobalAccountData,
    RoomAccountData,
    EphemeralRoomData,
    Timeline,
    MessageLike,
    OriginalMessageLike,
    RedactedMessageLike,
    State,
    OriginalState,
    RedactedState,
    StrippedState,
    ToDevice,
    Presence,
}

impl HandlerKind {
    /// Selects the message-like kind matching the redaction status:
    /// `RedactedMessageLike` when `redacted` is set, `OriginalMessageLike`
    /// otherwise.
    fn message_like_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedMessageLike,
            false => Self::OriginalMessageLike,
        }
    }

    /// Selects the state kind matching the redaction status: `RedactedState`
    /// when `redacted` is set, `OriginalState` otherwise.
    fn state_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedState,
            false => Self::OriginalState,
        }
    }
}
153
154/// A statically-known event kind/type that can be retrieved from an event sync.
155pub trait SyncEvent {
156    #[doc(hidden)]
157    const KIND: HandlerKind;
158    #[doc(hidden)]
159    const TYPE: Option<&'static str>;
160}
161
162pub(crate) struct EventHandlerWrapper {
163    handler_fn: Box<EventHandlerFn>,
164    pub handler_id: u64,
165}
166
167/// Handle to remove a registered event handler by passing it to
168/// [`Client::remove_event_handler`].
169#[derive(Clone, Debug)]
170pub struct EventHandlerHandle {
171    pub(crate) ev_kind: HandlerKind,
172    pub(crate) ev_type: Option<&'static str>,
173    pub(crate) room_id: Option<OwnedRoomId>,
174    pub(crate) handler_id: u64,
175}
176
177/// Interface for event handlers.
178///
179/// This trait is an abstraction for a certain kind of functions / closures,
180/// specifically:
181///
182/// * They must have at least one argument, which is the event itself, a type
183///   that implements [`SyncEvent`]. Any additional arguments need to implement
184///   the [`EventHandlerContext`] trait.
185/// * Their return type has to be one of: `()`, `Result<(), impl Display + Debug
186///   + 'static>` (if you are using `anyhow::Result` or `eyre::Result` you can
187///   additionally enable the `anyhow` / `eyre` feature to get the verbose
188///   `Debug` output printed on error)
189///
190/// ### How it works
191///
192/// This trait is basically a very constrained version of `Fn`: It requires at
193/// least one argument, which is represented as its own generic parameter `Ev`
194/// with the remaining parameter types being represented by the second generic
195/// parameter `Ctx`; they have to be stuffed into one generic parameter as a
196/// tuple because Rust doesn't have variadic generics.
197///
198/// `Ev` and `Ctx` are generic parameters rather than associated types because
199/// the argument list is a generic parameter for the `Fn` traits too, so a
200/// single type could implement `Fn` multiple times with different argument
201/// lists¹. Luckily, when calling [`Client::add_event_handler`] with a
202/// closure argument the trait solver takes into account that only a single one
203/// of the implementations applies (even though this could theoretically change
204/// through a dependency upgrade) and uses that rather than raising an ambiguity
205/// error. This is the same trick used by web frameworks like actix-web and
206/// axum.
207///
208/// ¹ the only thing stopping such types from existing in stable Rust is that
209/// all manual implementations of the `Fn` traits require a Nightly feature
210pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
211    /// The future returned by `handle_event`.
212    #[doc(hidden)]
213    type Future: EventHandlerFuture;
214
215    /// Create a future for handling the given event.
216    ///
217    /// `data` provides additional data about the event, for example the room it
218    /// appeared in.
219    ///
220    /// Returns `None` if one of the context extractors failed.
221    #[doc(hidden)]
222    fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
223}
224
225#[doc(hidden)]
226pub trait EventHandlerFuture:
227    Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
228{
229    type Output: EventHandlerResult;
230}
231
232impl<T> EventHandlerFuture for T
233where
234    T: Future + SendOutsideWasm + 'static,
235    <T as Future>::Output: EventHandlerResult,
236{
237    type Output = <T as Future>::Output;
238}
239
240#[doc(hidden)]
241#[derive(Debug)]
242pub struct EventHandlerData<'a> {
243    client: Client,
244    room: Option<Room>,
245    raw: &'a RawJsonValue,
246    encryption_info: Option<&'a EncryptionInfo>,
247    push_actions: &'a [Action],
248    handle: EventHandlerHandle,
249}
250
/// Implemented by the return types that are supported for event handlers.
///
/// It is not meant to be implemented outside of matrix-sdk.
pub trait EventHandlerResult: Sized {
    /// Log the error carried by this result, if any. `event_type` names the
    /// event type the handler was registered for, when it is known.
    #[doc(hidden)]
    fn print_error(&self, event_type: Option<&str>);
}

impl EventHandlerResult for () {
    /// `()` cannot carry an error, so this does nothing.
    fn print_error(&self, _event_type: Option<&str>) {}
}
262
263impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
264    fn print_error(&self, event_type: Option<&str>) {
265        let msg_fragment = match event_type {
266            Some(event_type) => format!(" for `{event_type}`"),
267            None => "".to_owned(),
268        };
269
270        match self {
271            #[cfg(feature = "anyhow")]
272            Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
273                error!("Event handler{msg_fragment} failed: {e:?}");
274            }
275            #[cfg(feature = "eyre")]
276            Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
277                error!("Event handler{msg_fragment} failed: {e:?}");
278            }
279            Err(e) => {
280                error!("Event handler{msg_fragment} failed: {e}");
281            }
282            Ok(_) => {}
283        }
284    }
285}
286
287#[derive(Deserialize)]
288struct UnsignedDetails {
289    redacted_because: Option<serde::de::IgnoredAny>,
290}
291
292/// Event handling internals.
293impl Client {
294    pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
295        &self,
296        handler: H,
297        room_id: Option<OwnedRoomId>,
298    ) -> EventHandlerHandle
299    where
300        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
301        H: EventHandler<Ev, Ctx>,
302    {
303        let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
304            let maybe_fut = serde_json::from_str(data.raw.get())
305                .map(|ev| handler.clone().handle_event(ev, data));
306
307            Box::pin(async move {
308                match maybe_fut {
309                    Ok(Some(fut)) => {
310                        fut.await.print_error(Ev::TYPE);
311                    }
312                    Ok(None) => {
313                        error!(
314                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
315                            "Event handler has an invalid context argument",
316                        );
317                    }
318                    Err(e) => {
319                        warn!(
320                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
321                            "Failed to deserialize event, skipping event handler.\n
322                             Deserialization error: {e}",
323                        );
324                    }
325                }
326            })
327        });
328
329        let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
330        let handle =
331            EventHandlerHandle { ev_kind: Ev::KIND, ev_type: Ev::TYPE, room_id, handler_id };
332
333        self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
334
335        handle
336    }
337
338    pub(crate) async fn handle_sync_events<T>(
339        &self,
340        kind: HandlerKind,
341        room: Option<&Room>,
342        events: &[Raw<T>],
343    ) -> serde_json::Result<()> {
344        #[derive(Deserialize)]
345        struct ExtractType<'a> {
346            #[serde(borrow, rename = "type")]
347            event_type: Cow<'a, str>,
348        }
349
350        for raw_event in events {
351            let event_type = raw_event.deserialize_as::<ExtractType<'_>>()?.event_type;
352            self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
353        }
354
355        Ok(())
356    }
357
358    pub(crate) async fn handle_sync_state_events(
359        &self,
360        room: Option<&Room>,
361        state_events: &[Raw<AnySyncStateEvent>],
362    ) -> serde_json::Result<()> {
363        #[derive(Deserialize)]
364        struct StateEventDetails<'a> {
365            #[serde(borrow, rename = "type")]
366            event_type: Cow<'a, str>,
367            unsigned: Option<UnsignedDetails>,
368        }
369
370        // Event handlers for possibly-redacted state events
371        self.handle_sync_events(HandlerKind::State, room, state_events).await?;
372
373        // Event handlers specifically for redacted OR unredacted state events
374        for raw_event in state_events {
375            let StateEventDetails { event_type, unsigned } = raw_event.deserialize_as()?;
376            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
377            let handler_kind = HandlerKind::state_redacted(redacted);
378
379            self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
380                .await;
381        }
382
383        Ok(())
384    }
385
386    pub(crate) async fn handle_sync_timeline_events(
387        &self,
388        room: Option<&Room>,
389        timeline_events: &[TimelineEvent],
390    ) -> serde_json::Result<()> {
391        #[derive(Deserialize)]
392        struct TimelineEventDetails<'a> {
393            #[serde(borrow, rename = "type")]
394            event_type: Cow<'a, str>,
395            state_key: Option<serde::de::IgnoredAny>,
396            unsigned: Option<UnsignedDetails>,
397        }
398
399        for item in timeline_events {
400            let TimelineEventDetails { event_type, state_key, unsigned } =
401                item.raw().deserialize_as()?;
402
403            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
404            let (handler_kind_g, handler_kind_r) = match state_key {
405                Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
406                None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
407            };
408
409            let raw_event = item.raw().json();
410            let encryption_info = item.encryption_info().map(|i| &**i);
411            let push_actions = item.push_actions().unwrap_or(&[]);
412
413            // Event handlers for possibly-redacted timeline events
414            self.call_event_handlers(
415                room,
416                raw_event,
417                handler_kind_g,
418                &event_type,
419                encryption_info,
420                push_actions,
421            )
422            .await;
423
424            // Event handlers specifically for redacted OR unredacted timeline events
425            self.call_event_handlers(
426                room,
427                raw_event,
428                handler_kind_r,
429                &event_type,
430                encryption_info,
431                push_actions,
432            )
433            .await;
434
435            // Event handlers for `AnySyncTimelineEvent`
436            let kind = HandlerKind::Timeline;
437            self.call_event_handlers(
438                room,
439                raw_event,
440                kind,
441                &event_type,
442                encryption_info,
443                push_actions,
444            )
445            .await;
446        }
447
448        Ok(())
449    }
450
451    #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
452    async fn call_event_handlers(
453        &self,
454        room: Option<&Room>,
455        raw: &RawJsonValue,
456        event_kind: HandlerKind,
457        event_type: &str,
458        encryption_info: Option<&EncryptionInfo>,
459        push_actions: &[Action],
460    ) {
461        let room_id = room.map(|r| r.room_id());
462        if let Some(room_id) = room_id {
463            tracing::Span::current().record("room_id", debug(room_id));
464        }
465
466        // Construct event handler futures
467        let mut futures: FuturesUnordered<_> = self
468            .inner
469            .event_handlers
470            .handlers
471            .read()
472            .unwrap()
473            .get_handlers(event_kind, event_type, room_id)
474            .map(|(handle, handler_fn)| {
475                let data = EventHandlerData {
476                    client: self.clone(),
477                    room: room.cloned(),
478                    raw,
479                    encryption_info,
480                    push_actions,
481                    handle,
482                };
483
484                (handler_fn)(data)
485            })
486            .collect();
487
488        if !futures.is_empty() {
489            debug!(amount = futures.len(), "Calling event handlers");
490
491            // Run the event handler futures with the `self.event_handlers.handlers`
492            // lock no longer being held.
493            while let Some(()) = futures.next().await {}
494        }
495    }
496}
497
498/// A guard type that removes an event handler when it drops (goes out of
499/// scope).
500///
501/// Created with [`Client::event_handler_drop_guard`].
502#[derive(Debug)]
503pub struct EventHandlerDropGuard {
504    handle: EventHandlerHandle,
505    client: Client,
506}
507
508impl EventHandlerDropGuard {
509    pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
510        Self { handle, client }
511    }
512}
513
514impl Drop for EventHandlerDropGuard {
515    fn drop(&mut self) {
516        self.client.remove_event_handler(self.handle.clone());
517    }
518}
519
520macro_rules! impl_event_handler {
521    ($($ty:ident),* $(,)?) => {
522        impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
523        where
524            Ev: SyncEvent,
525            Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
526            Fut: EventHandlerFuture,
527            $($ty: EventHandlerContext),*
528        {
529            type Future = Fut;
530
531            fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
532                Some((self)(ev, $($ty::from_data(&_d)?),*))
533            }
534        }
535    };
536}
537
538impl_event_handler!();
539impl_event_handler!(A);
540impl_event_handler!(A, B);
541impl_event_handler!(A, B, C);
542impl_event_handler!(A, B, C, D);
543impl_event_handler!(A, B, C, D, E);
544impl_event_handler!(A, B, C, D, E, F);
545impl_event_handler!(A, B, C, D, E, F, G);
546impl_event_handler!(A, B, C, D, E, F, G, H);
547
548/// An observer of events (may be tailored to a room).
549///
550/// Only the most recent value can be observed. Subscribers are notified when a
551/// new value is sent, but there is no guarantee that they will see all values.
552///
553/// To create such observer, use [`Client::observe_events`] or
554/// [`Client::observe_room_events`].
555#[derive(Debug)]
556pub struct ObservableEventHandler<T> {
557    /// This type is actually nothing more than a thin glue layer between the
558    /// [`EventHandler`] mechanism and the reactive programming types from
559    /// [`eyeball`]. Here, we use a [`SharedObservable`] that is updated by the
560    /// [`EventHandler`].
561    shared_observable: SharedObservable<Option<T>>,
562
563    /// This type owns the [`EventHandlerDropGuard`]. As soon as this type goes
564    /// out of scope, the event handler is unregistered/removed.
565    ///
566    /// [`EventHandlerSubscriber`] holds a weak, non-owning reference, to this
567    /// guard. It is useful to detect when to close the [`Stream`]: as soon as
568    /// this type goes out of scope, the subscriber will close itself on poll.
569    event_handler_guard: Arc<EventHandlerDropGuard>,
570}
571
572impl<T> ObservableEventHandler<T> {
573    pub(crate) fn new(
574        shared_observable: SharedObservable<Option<T>>,
575        event_handler_guard: EventHandlerDropGuard,
576    ) -> Self {
577        Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
578    }
579
580    /// Subscribe to this observer.
581    ///
582    /// It returns an [`EventHandlerSubscriber`], which implements [`Stream`].
583    /// See its documentation to learn more.
584    pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
585        EventHandlerSubscriber::new(
586            self.shared_observable.subscribe(),
587            // The subscriber holds a weak non-owning reference to the event handler guard, so that
588            // it can detect when this observer is dropped, and can close the subscriber's stream.
589            Arc::downgrade(&self.event_handler_guard),
590        )
591    }
592}
593
594pin_project! {
595    /// The subscriber of an [`ObservableEventHandler`].
596    ///
597    /// To create such subscriber, use [`ObservableEventHandler::subscribe`].
598    ///
599    /// This type implements [`Stream`], which means it is possible to poll the
600    /// next value asynchronously. In other terms, polling this type will return
601    /// the new event as soon as they are synced. See [`Client::observe_events`]
602    /// to learn more.
603    #[derive(Debug)]
604    pub struct EventHandlerSubscriber<T> {
605        // The `Subscriber` associated to the `SharedObservable` inside
606        // `ObservableEventHandle`.
607        //
608        // Keep in mind all this API is just a thin glue layer between
609        // `EventHandle` and `SharedObservable`, that's… maagiic!
610        #[pin]
611        subscriber: Subscriber<Option<T>>,
612
613        // A weak non-owning reference to the event handler guard from
614        // `ObservableEventHandler`. When this type is polled (via its `Stream`
615        // implementation), it is possible to detect whether the observable has
616        // been dropped by upgrading this weak reference, and close the `Stream`
617        // if it needs to.
618        event_handler_guard: Weak<EventHandlerDropGuard>,
619    }
620}
621
622impl<T> EventHandlerSubscriber<T> {
623    fn new(
624        subscriber: Subscriber<Option<T>>,
625        event_handler_handle: Weak<EventHandlerDropGuard>,
626    ) -> Self {
627        Self { subscriber, event_handler_guard: event_handler_handle }
628    }
629}
630
631impl<T> Stream for EventHandlerSubscriber<T>
632where
633    T: Clone,
634{
635    type Item = T;
636
637    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
638        let mut this = self.project();
639
640        let Some(_) = this.event_handler_guard.upgrade() else {
641            // The `EventHandlerHandle` has been dropped via `EventHandlerDropGuard`. It
642            // means the `ObservableEventHandler` has been dropped. It's time to
643            // close this stream.
644            return Poll::Ready(None);
645        };
646
647        // First off, the subscriber is of type `Subscriber<Option<T>>` because the
648        // `SharedObservable` starts with a `None` value to indicate it has no yet
649        // received any update. We want the `Stream` to return `T`, not `Option<T>`. We
650        // then filter out all `None` value.
651        //
652        // Second, when a `None` value is met, we want to poll again (hence the `loop`).
653        // At best, there is a new value to return. At worst, the subscriber will return
654        // `Poll::Pending` and will register the wakers accordingly.
655
656        loop {
657            match this.subscriber.as_mut().poll_next(context) {
658                // Stream has been closed somehow.
659                Poll::Ready(None) => return Poll::Ready(None),
660
661                // The initial value (of the `SharedObservable` behind `self.subscriber`) has been
662                // polled. We want to filter it out.
663                Poll::Ready(Some(None)) => {
664                    // Loop over.
665                    continue;
666                }
667
668                // We have a new value!
669                Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
670
671                // Classical pending.
672                Poll::Pending => return Poll::Pending,
673            }
674        }
675    }
676}
677
678#[cfg(test)]
679mod tests {
680    use matrix_sdk_test::{
681        async_test,
682        event_factory::{EventFactory, PreviousMembership},
683        InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID,
684    };
685    use stream_assert::{assert_closed, assert_pending, assert_ready};
686    #[cfg(target_family = "wasm")]
687    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
688    use std::{
689        future,
690        sync::{
691            atomic::{AtomicU8, Ordering::SeqCst},
692            Arc,
693        },
694    };
695
696    use matrix_sdk_test::{StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder};
697    use once_cell::sync::Lazy;
698    use ruma::{
699        event_id,
700        events::{
701            room::{
702                member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
703                name::OriginalSyncRoomNameEvent,
704                power_levels::OriginalSyncRoomPowerLevelsEvent,
705            },
706            typing::SyncTypingEvent,
707            AnySyncStateEvent, AnySyncTimelineEvent,
708        },
709        room_id,
710        serde::Raw,
711        user_id,
712    };
713    use serde_json::json;
714
715    use crate::{
716        event_handler::Ctx,
717        test_utils::{logged_in_client, no_retry_test_client},
718        Client, Room,
719    };
720
721    static MEMBER_EVENT: Lazy<Raw<AnySyncTimelineEvent>> = Lazy::new(|| {
722        EventFactory::new()
723            .member(user_id!("@example:localhost"))
724            .membership(MembershipState::Join)
725            .display_name("example")
726            .event_id(event_id!("$151800140517rfvjc:localhost"))
727            .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
728            .into()
729    });
730
731    #[async_test]
732    async fn test_add_event_handler() -> crate::Result<()> {
733        let client = logged_in_client(None).await;
734
735        let member_count = Arc::new(AtomicU8::new(0));
736        let typing_count = Arc::new(AtomicU8::new(0));
737        let power_levels_count = Arc::new(AtomicU8::new(0));
738        let invited_member_count = Arc::new(AtomicU8::new(0));
739
740        client.add_event_handler({
741            let member_count = member_count.clone();
742            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
743                member_count.fetch_add(1, SeqCst);
744            }
745        });
746        client.add_event_handler({
747            let typing_count = typing_count.clone();
748            move |_ev: SyncTypingEvent| async move {
749                typing_count.fetch_add(1, SeqCst);
750            }
751        });
752        client.add_event_handler({
753            let power_levels_count = power_levels_count.clone();
754            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
755                power_levels_count.fetch_add(1, SeqCst);
756            }
757        });
758        client.add_event_handler({
759            let invited_member_count = invited_member_count.clone();
760            move |_ev: StrippedRoomMemberEvent| async move {
761                invited_member_count.fetch_add(1, SeqCst);
762            }
763        });
764
765        let f = EventFactory::new();
766        let response = SyncResponseBuilder::default()
767            .add_joined_room(
768                JoinedRoomBuilder::default()
769                    .add_timeline_event(MEMBER_EVENT.clone())
770                    .add_typing(
771                        f.typing(vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")]),
772                    )
773                    .add_state_event(StateTestEvent::PowerLevels),
774            )
775            .add_invited_room(
776                InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event(
777                    StrippedStateTestEvent::Custom(json!({
778                        "content": {
779                            "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
780                            "displayname": "Alice",
781                            "membership": "invite",
782                        },
783                        "event_id": "$143273582443PhrSn:example.org",
784                        "origin_server_ts": 1432735824653u64,
785                        "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
786                        "sender": "@example:example.org",
787                        "state_key": "@alice:example.org",
788                        "type": "m.room.member",
789                        "unsigned": {
790                            "age": 1234,
791                            "invite_room_state": [
792                                {
793                                    "content": {
794                                        "name": "Example Room"
795                                    },
796                                    "sender": "@bob:example.org",
797                                    "state_key": "",
798                                    "type": "m.room.name"
799                                },
800                                {
801                                    "content": {
802                                        "join_rule": "invite"
803                                    },
804                                    "sender": "@bob:example.org",
805                                    "state_key": "",
806                                    "type": "m.room.join_rules"
807                                }
808                            ]
809                        }
810                    })),
811                ),
812            )
813            .build_sync_response();
814        client.process_sync(response).await?;
815
816        assert_eq!(member_count.load(SeqCst), 1);
817        assert_eq!(typing_count.load(SeqCst), 1);
818        assert_eq!(power_levels_count.load(SeqCst), 1);
819        assert_eq!(invited_member_count.load(SeqCst), 1);
820
821        Ok(())
822    }
823
824    #[async_test]
825    #[allow(dependency_on_unit_never_type_fallback)]
826    async fn test_add_room_event_handler() -> crate::Result<()> {
827        let client = logged_in_client(None).await;
828
829        let room_id_a = room_id!("!foo:example.org");
830        let room_id_b = room_id!("!bar:matrix.org");
831
832        let member_count = Arc::new(AtomicU8::new(0));
833        let power_levels_count = Arc::new(AtomicU8::new(0));
834
835        // Room event handlers for member events in both rooms
836        client.add_room_event_handler(room_id_a, {
837            let member_count = member_count.clone();
838            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
839                member_count.fetch_add(1, SeqCst);
840                future::ready(())
841            }
842        });
843        client.add_room_event_handler(room_id_b, {
844            let member_count = member_count.clone();
845            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
846                member_count.fetch_add(1, SeqCst);
847                future::ready(())
848            }
849        });
850
851        // Power levels event handlers for member events in room A
852        client.add_room_event_handler(room_id_a, {
853            let power_levels_count = power_levels_count.clone();
854            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
855                power_levels_count.fetch_add(1, SeqCst);
856                future::ready(())
857            }
858        });
859
860        // Room name event handler for room name events in room B
861        client.add_room_event_handler(room_id_b, move |_ev: OriginalSyncRoomNameEvent| async {
862            unreachable!("No room event in room B")
863        });
864
865        let response = SyncResponseBuilder::default()
866            .add_joined_room(
867                JoinedRoomBuilder::new(room_id_a)
868                    .add_timeline_event(MEMBER_EVENT.clone())
869                    .add_state_event(StateTestEvent::PowerLevels)
870                    .add_state_event(StateTestEvent::RoomName),
871            )
872            .add_joined_room(
873                JoinedRoomBuilder::new(room_id_b)
874                    .add_timeline_event(MEMBER_EVENT.clone())
875                    .add_state_event(StateTestEvent::PowerLevels),
876            )
877            .build_sync_response();
878        client.process_sync(response).await?;
879
880        assert_eq!(member_count.load(SeqCst), 2);
881        assert_eq!(power_levels_count.load(SeqCst), 1);
882
883        Ok(())
884    }
885
886    #[async_test]
887    #[allow(dependency_on_unit_never_type_fallback)]
888    async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
889        let client = logged_in_client(None).await;
890
891        client.add_event_handler(
892            |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
893        );
894
895        // If it compiles, it works. No need to assert anything.
896
897        Ok(())
898    }
899
900    #[async_test]
901    #[allow(dependency_on_unit_never_type_fallback)]
902    async fn test_remove_event_handler() -> crate::Result<()> {
903        let client = logged_in_client(None).await;
904
905        let member_count = Arc::new(AtomicU8::new(0));
906
907        client.add_event_handler({
908            let member_count = member_count.clone();
909            move |_ev: OriginalSyncRoomMemberEvent| async move {
910                member_count.fetch_add(1, SeqCst);
911            }
912        });
913
914        let handle_a = client.add_event_handler(move |_ev: OriginalSyncRoomMemberEvent| async {
915            panic!("handler should have been removed");
916        });
917        let handle_b = client.add_room_event_handler(
918            #[allow(unknown_lints, clippy::explicit_auto_deref)] // lint is buggy
919            *DEFAULT_TEST_ROOM_ID,
920            move |_ev: OriginalSyncRoomMemberEvent| async {
921                panic!("handler should have been removed");
922            },
923        );
924
925        client.add_event_handler({
926            let member_count = member_count.clone();
927            move |_ev: OriginalSyncRoomMemberEvent| async move {
928                member_count.fetch_add(1, SeqCst);
929            }
930        });
931
932        let response = SyncResponseBuilder::default()
933            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
934            .build_sync_response();
935
936        client.remove_event_handler(handle_a);
937        client.remove_event_handler(handle_b);
938
939        client.process_sync(response).await?;
940
941        assert_eq!(member_count.load(SeqCst), 2);
942
943        Ok(())
944    }
945
946    #[async_test]
947    async fn test_event_handler_drop_guard() {
948        let client = no_retry_test_client(None).await;
949
950        let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
951        assert_eq!(client.inner.event_handlers.len(), 1);
952
953        {
954            let _guard = client.event_handler_drop_guard(handle);
955            assert_eq!(client.inner.event_handlers.len(), 1);
956            // guard dropped here
957        }
958
959        assert_eq!(client.inner.event_handlers.len(), 0);
960    }
961
962    #[async_test]
963    async fn test_use_client_in_handler() {
964        // This used to not work because we were requiring `Send` of event
965        // handler futures even on WASM, where practically all futures that do
966        // I/O aren't.
967        let client = no_retry_test_client(None).await;
968
969        client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
970            // All of Client's async methods that do network requests (and
971            // possibly some that don't) are `!Send` on wasm. We obviously want
972            // to be able to use them in event handlers.
973            let _caps = client.get_capabilities().await.map_err(|e| anyhow::anyhow!("{}", e))?;
974            anyhow::Ok(())
975        });
976    }
977
978    #[async_test]
979    async fn test_raw_event_handler() -> crate::Result<()> {
980        let client = logged_in_client(None).await;
981        let counter = Arc::new(AtomicU8::new(0));
982        client.add_event_handler_context(counter.clone());
983        client.add_event_handler(
984            |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
985                counter.fetch_add(1, SeqCst);
986            },
987        );
988
989        let response = SyncResponseBuilder::default()
990            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
991            .build_sync_response();
992        client.process_sync(response).await?;
993
994        assert_eq!(counter.load(SeqCst), 1);
995        Ok(())
996    }
997
998    #[async_test]
999    async fn test_enum_event_handler() -> crate::Result<()> {
1000        let client = logged_in_client(None).await;
1001        let counter = Arc::new(AtomicU8::new(0));
1002        client.add_event_handler_context(counter.clone());
1003        client.add_event_handler(
1004            |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
1005                counter.fetch_add(1, SeqCst);
1006            },
1007        );
1008
1009        let response = SyncResponseBuilder::default()
1010            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1011            .build_sync_response();
1012        client.process_sync(response).await?;
1013
1014        assert_eq!(counter.load(SeqCst), 1);
1015        Ok(())
1016    }
1017
1018    #[async_test]
1019    #[allow(dependency_on_unit_never_type_fallback)]
1020    async fn test_observe_events() -> crate::Result<()> {
1021        let client = logged_in_client(None).await;
1022
1023        let room_id_0 = room_id!("!r0.matrix.org");
1024        let room_id_1 = room_id!("!r1.matrix.org");
1025
1026        let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();
1027
1028        let mut subscriber = observable.subscribe();
1029
1030        assert_pending!(subscriber);
1031
1032        let mut response_builder = SyncResponseBuilder::new();
1033        let response = response_builder
1034            .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
1035                StateTestEvent::Custom(json!({
1036                    "content": {
1037                        "name": "Name 0"
1038                    },
1039                    "event_id": "$ev0",
1040                    "origin_server_ts": 1,
1041                    "sender": "@mnt_io:matrix.org",
1042                    "state_key": "",
1043                    "type": "m.room.name",
1044                    "unsigned": {
1045                        "age": 1,
1046                    }
1047                })),
1048            ))
1049            .build_sync_response();
1050        client.process_sync(response).await?;
1051
1052        let (room_name, room) = assert_ready!(subscriber);
1053
1054        assert_eq!(room_name.event_id.as_str(), "$ev0");
1055        assert_eq!(room.room_id(), room_id_0);
1056        assert_eq!(room.name().unwrap(), "Name 0");
1057
1058        assert_pending!(subscriber);
1059
1060        let response = response_builder
1061            .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
1062                StateTestEvent::Custom(json!({
1063                    "content": {
1064                        "name": "Name 1"
1065                    },
1066                    "event_id": "$ev1",
1067                    "origin_server_ts": 2,
1068                    "sender": "@mnt_io:matrix.org",
1069                    "state_key": "",
1070                    "type": "m.room.name",
1071                    "unsigned": {
1072                        "age": 2,
1073                    }
1074                })),
1075            ))
1076            .build_sync_response();
1077        client.process_sync(response).await?;
1078
1079        let (room_name, room) = assert_ready!(subscriber);
1080
1081        assert_eq!(room_name.event_id.as_str(), "$ev1");
1082        assert_eq!(room.room_id(), room_id_1);
1083        assert_eq!(room.name().unwrap(), "Name 1");
1084
1085        assert_pending!(subscriber);
1086
1087        drop(observable);
1088        assert_closed!(subscriber);
1089
1090        Ok(())
1091    }
1092
1093    #[async_test]
1094    #[allow(dependency_on_unit_never_type_fallback)]
1095    async fn test_observe_room_events() -> crate::Result<()> {
1096        let client = logged_in_client(None).await;
1097
1098        let room_id = room_id!("!r0.matrix.org");
1099
1100        let observable_for_room =
1101            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1102
1103        let mut subscriber_for_room = observable_for_room.subscribe();
1104
1105        assert_pending!(subscriber_for_room);
1106
1107        let mut response_builder = SyncResponseBuilder::new();
1108        let response = response_builder
1109            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1110                StateTestEvent::Custom(json!({
1111                    "content": {
1112                        "name": "Name 0"
1113                    },
1114                    "event_id": "$ev0",
1115                    "origin_server_ts": 1,
1116                    "sender": "@mnt_io:matrix.org",
1117                    "state_key": "",
1118                    "type": "m.room.name",
1119                    "unsigned": {
1120                        "age": 1,
1121                    }
1122                })),
1123            ))
1124            .build_sync_response();
1125        client.process_sync(response).await?;
1126
1127        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1128
1129        assert_eq!(room_name.event_id.as_str(), "$ev0");
1130        assert_eq!(room.name().unwrap(), "Name 0");
1131
1132        assert_pending!(subscriber_for_room);
1133
1134        let response = response_builder
1135            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1136                StateTestEvent::Custom(json!({
1137                    "content": {
1138                        "name": "Name 1"
1139                    },
1140                    "event_id": "$ev1",
1141                    "origin_server_ts": 2,
1142                    "sender": "@mnt_io:matrix.org",
1143                    "state_key": "",
1144                    "type": "m.room.name",
1145                    "unsigned": {
1146                        "age": 2,
1147                    }
1148                })),
1149            ))
1150            .build_sync_response();
1151        client.process_sync(response).await?;
1152
1153        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1154
1155        assert_eq!(room_name.event_id.as_str(), "$ev1");
1156        assert_eq!(room.name().unwrap(), "Name 1");
1157
1158        assert_pending!(subscriber_for_room);
1159
1160        drop(observable_for_room);
1161        assert_closed!(subscriber_for_room);
1162
1163        Ok(())
1164    }
1165
1166    #[async_test]
1167    async fn test_observe_several_room_events() -> crate::Result<()> {
1168        let client = logged_in_client(None).await;
1169
1170        let room_id = room_id!("!r0.matrix.org");
1171
1172        let observable_for_room =
1173            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1174
1175        let mut subscriber_for_room = observable_for_room.subscribe();
1176
1177        assert_pending!(subscriber_for_room);
1178
1179        let mut response_builder = SyncResponseBuilder::new();
1180        let response = response_builder
1181            .add_joined_room(
1182                JoinedRoomBuilder::new(room_id)
1183                    .add_state_event(StateTestEvent::Custom(json!({
1184                        "content": {
1185                            "name": "Name 0"
1186                        },
1187                        "event_id": "$ev0",
1188                        "origin_server_ts": 1,
1189                        "sender": "@mnt_io:matrix.org",
1190                        "state_key": "",
1191                        "type": "m.room.name",
1192                        "unsigned": {
1193                            "age": 1,
1194                        }
1195                    })))
1196                    .add_state_event(StateTestEvent::Custom(json!({
1197                        "content": {
1198                            "name": "Name 1"
1199                        },
1200                        "event_id": "$ev1",
1201                        "origin_server_ts": 2,
1202                        "sender": "@mnt_io:matrix.org",
1203                        "state_key": "",
1204                        "type": "m.room.name",
1205                        "unsigned": {
1206                            "age": 1,
1207                        }
1208                    })))
1209                    .add_state_event(StateTestEvent::Custom(json!({
1210                        "content": {
1211                            "name": "Name 2"
1212                        },
1213                        "event_id": "$ev2",
1214                        "origin_server_ts": 3,
1215                        "sender": "@mnt_io:matrix.org",
1216                        "state_key": "",
1217                        "type": "m.room.name",
1218                        "unsigned": {
1219                            "age": 1,
1220                        }
1221                    }))),
1222            )
1223            .build_sync_response();
1224        client.process_sync(response).await?;
1225
1226        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1227
1228        // Check we only get notified about the latest received event
1229        assert_eq!(room_name.event_id.as_str(), "$ev2");
1230        assert_eq!(room.name().unwrap(), "Name 2");
1231
1232        assert_pending!(subscriber_for_room);
1233
1234        drop(observable_for_room);
1235        assert_closed!(subscriber_for_room);
1236
1237        Ok(())
1238    }
1239}