matrix_sdk_base/response_processors/
state_events.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use ruma::{events::AnySyncStateEvent, serde::Raw};
16use serde::Deserialize;
17use tracing::warn;
18
19use super::Context;
20
21/// Collect [`AnySyncStateEvent`].
22pub mod sync {
23    use std::{collections::BTreeSet, iter};
24
25    use ruma::{
26        events::{
27            room::member::{MembershipState, RoomMemberEventContent},
28            AnySyncTimelineEvent, SyncStateEvent,
29        },
30        OwnedUserId, RoomId, UserId,
31    };
32    use tracing::instrument;
33
34    use super::{super::profiles, AnySyncStateEvent, Context, Raw};
35    use crate::{
36        store::{ambiguity_map::AmbiguityCache, Result as StoreResult},
37        RoomInfo,
38    };
39
40    /// Collect [`AnySyncStateEvent`] to [`AnySyncStateEvent`].
41    pub fn collect(
42        raw_events: &[Raw<AnySyncStateEvent>],
43    ) -> (Vec<Raw<AnySyncStateEvent>>, Vec<AnySyncStateEvent>) {
44        super::collect(raw_events)
45    }
46
47    /// Collect [`AnySyncTimelineEvent`] to [`AnySyncStateEvent`].
48    ///
49    /// A [`AnySyncTimelineEvent`] can represent either message-like events or
50    /// state events. The message-like events are filtered out.
51    pub fn collect_from_timeline(
52        raw_events: &[Raw<AnySyncTimelineEvent>],
53    ) -> (Vec<Raw<AnySyncStateEvent>>, Vec<AnySyncStateEvent>) {
54        super::collect(raw_events.iter().filter_map(|raw_event| {
55            // Only state events have a `state_key` field.
56            match raw_event.get_field::<&str>("state_key") {
57                Ok(Some(_)) => Some(raw_event.cast_ref()),
58                _ => None,
59            }
60        }))
61    }
62
63    /// Dispatch the sync state events.
64    ///
65    /// `raw_events` and `events` must be generated from [`collect`].
66    /// Events must be exactly the same list of events that are in
67    /// `raw_events`, but deserialised. We demand them here to avoid
68    /// deserialising multiple times.
69    ///
70    /// The `new_users` mutable reference allows to collect the new users for
71    /// this room.
72    #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
73    pub async fn dispatch<U>(
74        context: &mut Context,
75        (raw_events, events): (&[Raw<AnySyncStateEvent>], &[AnySyncStateEvent]),
76        room_info: &mut RoomInfo,
77        ambiguity_cache: &mut AmbiguityCache,
78        new_users: &mut U,
79    ) -> StoreResult<()>
80    where
81        U: NewUsers,
82    {
83        for (raw_event, event) in iter::zip(raw_events, events) {
84            room_info.handle_state_event(event);
85
86            if let AnySyncStateEvent::RoomMember(member) = event {
87                dispatch_room_member(
88                    context,
89                    &room_info.room_id,
90                    member,
91                    ambiguity_cache,
92                    new_users,
93                )
94                .await?;
95            }
96
97            context
98                .state_changes
99                .state
100                .entry(room_info.room_id.to_owned())
101                .or_default()
102                .entry(event.event_type())
103                .or_default()
104                .insert(event.state_key().to_owned(), raw_event.clone());
105        }
106
107        Ok(())
108    }
109
110    /// Dispatch a [`RoomMemberEventContent>`] state event.
111    async fn dispatch_room_member<U>(
112        context: &mut Context,
113        room_id: &RoomId,
114        event: &SyncStateEvent<RoomMemberEventContent>,
115        ambiguity_cache: &mut AmbiguityCache,
116        new_users: &mut U,
117    ) -> StoreResult<()>
118    where
119        U: NewUsers,
120    {
121        ambiguity_cache.handle_event(&context.state_changes, room_id, event).await?;
122
123        match event.membership() {
124            MembershipState::Join | MembershipState::Invite => {
125                new_users.insert(event.state_key());
126            }
127            _ => (),
128        }
129
130        profiles::upsert_or_delete(context, room_id, event);
131
132        Ok(())
133    }
134
135    /// A trait to collect new users in [`dispatch`].
136    trait NewUsers {
137        /// Insert a new user in the collection of new users.
138        fn insert(&mut self, user_id: &UserId);
139    }
140
141    impl NewUsers for BTreeSet<OwnedUserId> {
142        fn insert(&mut self, user_id: &UserId) {
143            self.insert(user_id.to_owned());
144        }
145    }
146
147    impl NewUsers for () {
148        fn insert(&mut self, _user_id: &UserId) {}
149    }
150}
151
152/// Collect [`AnyStrippedStateEvent`].
153pub mod stripped {
154    use std::{collections::BTreeMap, iter};
155
156    use ruma::{events::AnyStrippedStateEvent, push::Action};
157    use tracing::instrument;
158
159    use super::{
160        super::{notification, timeline},
161        Context, Raw,
162    };
163    use crate::{Result, Room, RoomInfo};
164
165    /// Collect [`AnyStrippedStateEvent`] to [`AnyStrippedStateEvent`].
166    pub fn collect(
167        raw_events: &[Raw<AnyStrippedStateEvent>],
168    ) -> (Vec<Raw<AnyStrippedStateEvent>>, Vec<AnyStrippedStateEvent>) {
169        super::collect(raw_events)
170    }
171
172    /// Dispatch the stripped state events.
173    ///
174    /// `raw_events` and `events` must be generated from [`collect`].
175    /// Events must be exactly the same list of events that are in
176    /// `raw_events`, but deserialised. We demand them here to avoid
177    /// deserialising multiple times.
178    ///
179    /// Dispatch the stripped state events in `invite_state` or `knock_state`,
180    /// modifying the room's info and posting notifications as needed.
181    ///
182    /// * `raw_events` and `events` - The contents of `invite_state` in the form
183    ///   of list of pairs of raw stripped state events with their deserialized
184    ///   counterpart.
185    /// * `room` - The [`Room`] to modify.
186    /// * `room_info` - The current room's info.
187    /// * `notifications` - Notifications to post for the current room.
188    #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
189    pub(crate) async fn dispatch_invite_or_knock(
190        context: &mut Context,
191        (raw_events, events): (&[Raw<AnyStrippedStateEvent>], &[AnyStrippedStateEvent]),
192        room: &Room,
193        room_info: &mut RoomInfo,
194        mut notification: notification::Notification<'_>,
195    ) -> Result<()> {
196        let mut state_events = BTreeMap::new();
197
198        for (raw_event, event) in iter::zip(raw_events, events) {
199            room_info.handle_stripped_state_event(event);
200            state_events
201                .entry(event.event_type())
202                .or_insert_with(BTreeMap::new)
203                .insert(event.state_key().to_owned(), raw_event.clone());
204        }
205
206        context
207            .state_changes
208            .stripped_state
209            .insert(room_info.room_id().to_owned(), state_events.clone());
210
211        // We need to check for notifications after we have handled all state
212        // events, to make sure we have the full push context.
213        if let Some(push_condition_room_ctx) =
214            timeline::get_push_room_context(context, room, room_info, notification.state_store)
215                .await?
216        {
217            let room_id = room.room_id();
218
219            // Check every event again for notification.
220            for event in state_events.values().flat_map(|map| map.values()) {
221                notification.push_notification_from_event_if(
222                    room_id,
223                    &push_condition_room_ctx,
224                    event,
225                    Action::should_notify,
226                );
227            }
228        }
229
230        Ok(())
231    }
232}
233
234fn collect<'a, I, T>(raw_events: I) -> (Vec<Raw<T>>, Vec<T>)
235where
236    I: IntoIterator<Item = &'a Raw<T>>,
237    T: Deserialize<'a> + 'a,
238{
239    raw_events
240        .into_iter()
241        .filter_map(|raw_event| match raw_event.deserialize() {
242            Ok(event) => Some((raw_event.clone(), event)),
243            Err(e) => {
244                warn!("Couldn't deserialize stripped state event: {e}");
245                None
246            }
247        })
248        .unzip()
249}
250
251#[cfg(test)]
252mod tests {
253    use matrix_sdk_test::{
254        async_test, event_factory::EventFactory, JoinedRoomBuilder, StateTestEvent,
255        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
256    };
257    use ruma::{event_id, user_id};
258
259    use crate::test_utils::logged_in_base_client;
260
261    #[async_test]
262    async fn test_state_events_after_sync() {
263        // Given a room
264        let user_id = user_id!("@u:u.to");
265
266        let client = logged_in_base_client(Some(user_id)).await;
267        let mut sync_builder = SyncResponseBuilder::new();
268
269        let room_name = EventFactory::new()
270            .sender(user_id)
271            .room_topic("this is the test topic in the timeline")
272            .event_id(event_id!("$2"))
273            .into_raw_sync();
274
275        let response = sync_builder
276            .add_joined_room(
277                JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
278                    .add_timeline_event(room_name)
279                    .add_state_event(StateTestEvent::PowerLevels),
280            )
281            .build_sync_response();
282        client.receive_sync_response(response).await.unwrap();
283
284        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Just-created room not found!");
285
286        // ensure that we have the power levels
287        assert!(room.power_levels().await.is_ok());
288
289        // ensure that we have the topic
290        assert_eq!(room.topic().unwrap(), "this is the test topic in the timeline");
291    }
292}