matrix_sdk_base/response_processors/room/
sync_v2.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use ruma::{
18    api::client::sync::sync_events::v3::{InvitedRoom, JoinedRoom, KnockedRoom, LeftRoom},
19    OwnedRoomId, OwnedUserId, RoomId,
20};
21use tokio::sync::broadcast::Sender;
22
23#[cfg(feature = "e2e-encryption")]
24use super::super::e2ee;
25use super::{
26    super::{account_data, ephemeral_events, notification, state_events, timeline, Context},
27    RoomCreationData,
28};
29use crate::{
30    sync::{InvitedRoomUpdate, JoinedRoomUpdate, KnockedRoomUpdate, LeftRoomUpdate},
31    Result, RoomInfoNotableUpdate, RoomState,
32};
33
34/// Process updates of a joined room.
35#[allow(clippy::too_many_arguments)]
36pub async fn update_joined_room(
37    context: &mut Context,
38    room_creation_data: RoomCreationData<'_>,
39    joined_room: JoinedRoom,
40    updated_members_in_room: &mut BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>>,
41    notification: notification::Notification<'_>,
42    #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
43) -> Result<JoinedRoomUpdate> {
44    let RoomCreationData {
45        room_id,
46        room_info_notable_update_sender,
47        requested_required_states,
48        ambiguity_cache,
49    } = room_creation_data;
50
51    let state_store = notification.state_store;
52
53    let room =
54        state_store.get_or_create_room(room_id, RoomState::Joined, room_info_notable_update_sender);
55
56    let mut room_info = room.clone_info();
57
58    room_info.mark_as_joined();
59    room_info.update_from_ruma_summary(&joined_room.summary);
60    room_info.set_prev_batch(joined_room.timeline.prev_batch.as_deref());
61    room_info.mark_state_fully_synced();
62    room_info.handle_encryption_state(requested_required_states.for_room(room_id));
63
64    let (raw_state_events, state_events) = state_events::sync::collect(&joined_room.state.events);
65
66    let mut new_user_ids = BTreeSet::new();
67
68    state_events::sync::dispatch(
69        context,
70        (&raw_state_events, &state_events),
71        &mut room_info,
72        ambiguity_cache,
73        &mut new_user_ids,
74    )
75    .await?;
76
77    ephemeral_events::dispatch(context, &joined_room.ephemeral.events, room_id);
78
79    if joined_room.timeline.limited {
80        room_info.mark_members_missing();
81    }
82
83    let (raw_state_events_from_timeline, state_events_from_timeline) =
84        state_events::sync::collect_from_timeline(&joined_room.timeline.events);
85
86    state_events::sync::dispatch(
87        context,
88        (&raw_state_events_from_timeline, &state_events_from_timeline),
89        &mut room_info,
90        ambiguity_cache,
91        &mut new_user_ids,
92    )
93    .await?;
94
95    #[cfg(feature = "e2e-encryption")]
96    let olm_machine = e2ee.olm_machine;
97
98    let timeline = timeline::build(
99        context,
100        &room,
101        &mut room_info,
102        timeline::builder::Timeline::from(joined_room.timeline),
103        notification,
104        #[cfg(feature = "e2e-encryption")]
105        e2ee,
106    )
107    .await?;
108
109    // Save the new `RoomInfo`.
110    context.state_changes.add_room(room_info);
111
112    account_data::for_room(context, room_id, &joined_room.account_data.events, state_store).await;
113
114    // `processors::account_data::from_room` might have updated the `RoomInfo`.
115    // Let's fetch it again.
116    //
117    // SAFETY: `expect` is safe because the `RoomInfo` has been inserted 2 lines
118    // above.
119    let mut room_info = context
120        .state_changes
121        .room_infos
122        .get(room_id)
123        .expect("`RoomInfo` must exist in `StateChanges` at this point")
124        .clone();
125
126    #[cfg(feature = "e2e-encryption")]
127    e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
128        olm_machine,
129        &new_user_ids,
130        room_info.encryption_state(),
131        room.encryption_state(),
132        room_id,
133        state_store,
134    )
135    .await?;
136
137    updated_members_in_room.insert(room_id.to_owned(), new_user_ids);
138
139    let notification_count = joined_room.unread_notifications.into();
140    room_info.update_notification_count(notification_count);
141
142    context.state_changes.add_room(room_info);
143
144    Ok(JoinedRoomUpdate::new(
145        timeline,
146        joined_room.state.events,
147        joined_room.account_data.events,
148        joined_room.ephemeral.events,
149        notification_count,
150        ambiguity_cache.changes.remove(room_id).unwrap_or_default(),
151    ))
152}
153
154/// Process historical updates of a left room.
155#[allow(clippy::too_many_arguments)]
156pub async fn update_left_room(
157    context: &mut Context,
158    room_creation_data: RoomCreationData<'_>,
159    left_room: LeftRoom,
160    notification: notification::Notification<'_>,
161    #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
162) -> Result<LeftRoomUpdate> {
163    let RoomCreationData {
164        room_id,
165        room_info_notable_update_sender,
166        requested_required_states,
167        ambiguity_cache,
168    } = room_creation_data;
169
170    let state_store = notification.state_store;
171
172    let room =
173        state_store.get_or_create_room(room_id, RoomState::Left, room_info_notable_update_sender);
174
175    let mut room_info = room.clone_info();
176    room_info.mark_as_left();
177    room_info.mark_state_partially_synced();
178    room_info.handle_encryption_state(requested_required_states.for_room(room_id));
179
180    let (raw_state_events, state_events) = state_events::sync::collect(&left_room.state.events);
181
182    state_events::sync::dispatch(
183        context,
184        (&raw_state_events, &state_events),
185        &mut room_info,
186        ambiguity_cache,
187        &mut (),
188    )
189    .await?;
190
191    let (raw_state_events_from_timeline, state_events_from_timeline) =
192        state_events::sync::collect_from_timeline(&left_room.timeline.events);
193
194    state_events::sync::dispatch(
195        context,
196        (&raw_state_events_from_timeline, &state_events_from_timeline),
197        &mut room_info,
198        ambiguity_cache,
199        &mut (),
200    )
201    .await?;
202
203    let timeline = timeline::build(
204        context,
205        &room,
206        &mut room_info,
207        timeline::builder::Timeline::from(left_room.timeline),
208        notification,
209        #[cfg(feature = "e2e-encryption")]
210        e2ee,
211    )
212    .await?;
213
214    // Save the new `RoomInfo`.
215    context.state_changes.add_room(room_info);
216
217    account_data::for_room(context, room_id, &left_room.account_data.events, state_store).await;
218
219    let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default();
220
221    Ok(LeftRoomUpdate::new(
222        timeline,
223        left_room.state.events,
224        left_room.account_data.events,
225        ambiguity_changes,
226    ))
227}
228
229/// Process updates of an invited room.
230pub async fn update_invited_room(
231    context: &mut Context,
232    room_id: &RoomId,
233    invited_room: InvitedRoom,
234    room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
235    notification: notification::Notification<'_>,
236) -> Result<InvitedRoomUpdate> {
237    let state_store = notification.state_store;
238
239    let room = state_store.get_or_create_room(
240        room_id,
241        RoomState::Invited,
242        room_info_notable_update_sender,
243    );
244
245    let (raw_events, events) = state_events::stripped::collect(&invited_room.invite_state.events);
246
247    let mut room_info = room.clone_info();
248    room_info.mark_as_invited();
249    room_info.mark_state_fully_synced();
250
251    state_events::stripped::dispatch_invite_or_knock(
252        context,
253        (&raw_events, &events),
254        &room,
255        &mut room_info,
256        notification,
257    )
258    .await?;
259
260    context.state_changes.add_room(room_info);
261
262    Ok(invited_room)
263}
264
265/// Process updates of a knocked room.
266pub async fn update_knocked_room(
267    context: &mut Context,
268    room_id: &RoomId,
269    knocked_room: KnockedRoom,
270    room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
271    notification: notification::Notification<'_>,
272) -> Result<KnockedRoomUpdate> {
273    let state_store = notification.state_store;
274
275    let room = state_store.get_or_create_room(
276        room_id,
277        RoomState::Knocked,
278        room_info_notable_update_sender,
279    );
280
281    let (raw_events, events) = state_events::stripped::collect(&knocked_room.knock_state.events);
282
283    let mut room_info = room.clone_info();
284    room_info.mark_as_knocked();
285    room_info.mark_state_fully_synced();
286
287    state_events::stripped::dispatch_invite_or_knock(
288        context,
289        (&raw_events, &events),
290        &room,
291        &mut room_info,
292        notification,
293    )
294    .await?;
295
296    context.state_changes.add_room(room_info);
297
298    Ok(knocked_room)
299}