matrix_sdk_base/response_processors/
state_events.rs1use ruma::{events::AnySyncStateEvent, serde::Raw};
16use serde::Deserialize;
17use tracing::warn;
18
19use super::Context;
20
21pub mod sync {
23 use std::{collections::BTreeSet, iter};
24
25 use ruma::{
26 events::{
27 room::member::{MembershipState, RoomMemberEventContent},
28 AnySyncTimelineEvent, SyncStateEvent,
29 },
30 OwnedUserId, RoomId, UserId,
31 };
32 use tracing::instrument;
33
34 use super::{super::profiles, AnySyncStateEvent, Context, Raw};
35 use crate::{
36 store::{ambiguity_map::AmbiguityCache, Result as StoreResult},
37 RoomInfo,
38 };
39
40 pub fn collect(
42 raw_events: &[Raw<AnySyncStateEvent>],
43 ) -> (Vec<Raw<AnySyncStateEvent>>, Vec<AnySyncStateEvent>) {
44 super::collect(raw_events)
45 }
46
47 pub fn collect_from_timeline(
52 raw_events: &[Raw<AnySyncTimelineEvent>],
53 ) -> (Vec<Raw<AnySyncStateEvent>>, Vec<AnySyncStateEvent>) {
54 super::collect(raw_events.iter().filter_map(|raw_event| {
55 match raw_event.get_field::<&str>("state_key") {
57 Ok(Some(_)) => Some(raw_event.cast_ref()),
58 _ => None,
59 }
60 }))
61 }
62
63 #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
73 pub async fn dispatch<U>(
74 context: &mut Context,
75 (raw_events, events): (&[Raw<AnySyncStateEvent>], &[AnySyncStateEvent]),
76 room_info: &mut RoomInfo,
77 ambiguity_cache: &mut AmbiguityCache,
78 new_users: &mut U,
79 ) -> StoreResult<()>
80 where
81 U: NewUsers,
82 {
83 for (raw_event, event) in iter::zip(raw_events, events) {
84 room_info.handle_state_event(event);
85
86 if let AnySyncStateEvent::RoomMember(member) = event {
87 dispatch_room_member(
88 context,
89 &room_info.room_id,
90 member,
91 ambiguity_cache,
92 new_users,
93 )
94 .await?;
95 }
96
97 context
98 .state_changes
99 .state
100 .entry(room_info.room_id.to_owned())
101 .or_default()
102 .entry(event.event_type())
103 .or_default()
104 .insert(event.state_key().to_owned(), raw_event.clone());
105 }
106
107 Ok(())
108 }
109
110 async fn dispatch_room_member<U>(
112 context: &mut Context,
113 room_id: &RoomId,
114 event: &SyncStateEvent<RoomMemberEventContent>,
115 ambiguity_cache: &mut AmbiguityCache,
116 new_users: &mut U,
117 ) -> StoreResult<()>
118 where
119 U: NewUsers,
120 {
121 ambiguity_cache.handle_event(&context.state_changes, room_id, event).await?;
122
123 match event.membership() {
124 MembershipState::Join | MembershipState::Invite => {
125 new_users.insert(event.state_key());
126 }
127 _ => (),
128 }
129
130 profiles::upsert_or_delete(context, room_id, event);
131
132 Ok(())
133 }
134
135 trait NewUsers {
137 fn insert(&mut self, user_id: &UserId);
139 }
140
141 impl NewUsers for BTreeSet<OwnedUserId> {
142 fn insert(&mut self, user_id: &UserId) {
143 self.insert(user_id.to_owned());
144 }
145 }
146
147 impl NewUsers for () {
148 fn insert(&mut self, _user_id: &UserId) {}
149 }
150}
151
152pub mod stripped {
154 use std::{collections::BTreeMap, iter};
155
156 use ruma::{events::AnyStrippedStateEvent, push::Action};
157 use tracing::instrument;
158
159 use super::{
160 super::{notification, timeline},
161 Context, Raw,
162 };
163 use crate::{Result, Room, RoomInfo};
164
165 pub fn collect(
167 raw_events: &[Raw<AnyStrippedStateEvent>],
168 ) -> (Vec<Raw<AnyStrippedStateEvent>>, Vec<AnyStrippedStateEvent>) {
169 super::collect(raw_events)
170 }
171
172 #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
189 pub(crate) async fn dispatch_invite_or_knock(
190 context: &mut Context,
191 (raw_events, events): (&[Raw<AnyStrippedStateEvent>], &[AnyStrippedStateEvent]),
192 room: &Room,
193 room_info: &mut RoomInfo,
194 mut notification: notification::Notification<'_>,
195 ) -> Result<()> {
196 let mut state_events = BTreeMap::new();
197
198 for (raw_event, event) in iter::zip(raw_events, events) {
199 room_info.handle_stripped_state_event(event);
200 state_events
201 .entry(event.event_type())
202 .or_insert_with(BTreeMap::new)
203 .insert(event.state_key().to_owned(), raw_event.clone());
204 }
205
206 context
207 .state_changes
208 .stripped_state
209 .insert(room_info.room_id().to_owned(), state_events.clone());
210
211 if let Some(push_condition_room_ctx) =
214 timeline::get_push_room_context(context, room, room_info, notification.state_store)
215 .await?
216 {
217 let room_id = room.room_id();
218
219 for event in state_events.values().flat_map(|map| map.values()) {
221 notification.push_notification_from_event_if(
222 room_id,
223 &push_condition_room_ctx,
224 event,
225 Action::should_notify,
226 );
227 }
228 }
229
230 Ok(())
231 }
232}
233
234fn collect<'a, I, T>(raw_events: I) -> (Vec<Raw<T>>, Vec<T>)
235where
236 I: IntoIterator<Item = &'a Raw<T>>,
237 T: Deserialize<'a> + 'a,
238{
239 raw_events
240 .into_iter()
241 .filter_map(|raw_event| match raw_event.deserialize() {
242 Ok(event) => Some((raw_event.clone(), event)),
243 Err(e) => {
244 warn!("Couldn't deserialize stripped state event: {e}");
245 None
246 }
247 })
248 .unzip()
249}
250
#[cfg(test)]
mod tests {
    use matrix_sdk_test::{
        async_test, event_factory::EventFactory, JoinedRoomBuilder, StateTestEvent,
        SyncResponseBuilder, DEFAULT_TEST_ROOM_ID,
    };
    use ruma::{event_id, user_id};

    use crate::test_utils::logged_in_base_client;

    /// State events are applied to the room whether they arrive in the
    /// `state` section or in the timeline of a sync response.
    #[async_test]
    async fn test_state_events_after_sync() {
        // Given a logged-in client.
        let own_user_id = user_id!("@u:u.to");
        let client = logged_in_base_client(Some(own_user_id)).await;

        // And a joined room with a topic event in its timeline and a power
        // levels event in its state.
        let topic_event = EventFactory::new()
            .sender(own_user_id)
            .room_topic("this is the test topic in the timeline")
            .event_id(event_id!("$2"))
            .into_raw_sync();

        let joined_room = JoinedRoomBuilder::new(&DEFAULT_TEST_ROOM_ID)
            .add_timeline_event(topic_event)
            .add_state_event(StateTestEvent::PowerLevels);

        // When the client receives the sync response.
        let mut sync_builder = SyncResponseBuilder::new();
        let response = sync_builder.add_joined_room(joined_room).build_sync_response();
        client.receive_sync_response(response).await.unwrap();

        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Just-created room not found!");

        // Then the state event from the `state` section has been handled.
        assert!(room.power_levels().await.is_ok());

        // And so has the state event from the timeline.
        assert_eq!(room.topic().unwrap(), "this is the test topic in the timeline");
    }
}