matrix_sdk_base/response_processors/room/
sync_v2.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use ruma::{
18    api::client::sync::sync_events::v3::{InvitedRoom, JoinedRoom, KnockedRoom, LeftRoom},
19    OwnedRoomId, OwnedUserId, RoomId,
20};
21use tokio::sync::broadcast::Sender;
22
23#[cfg(feature = "e2e-encryption")]
24use super::super::e2ee;
25use super::{
26    super::{account_data, ephemeral_events, notification, state_events, timeline, Context},
27    RoomCreationData,
28};
29use crate::{
30    sync::{InvitedRoomUpdate, JoinedRoomUpdate, KnockedRoomUpdate, LeftRoomUpdate},
31    Result, RoomInfoNotableUpdate, RoomState,
32};
33
34/// Process updates of a joined room.
35#[allow(clippy::too_many_arguments)]
36pub async fn update_joined_room(
37    context: &mut Context,
38    room_creation_data: RoomCreationData<'_>,
39    joined_room: JoinedRoom,
40    updated_members_in_room: &mut BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>>,
41    notification: notification::Notification<'_>,
42    #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
43) -> Result<JoinedRoomUpdate> {
44    let RoomCreationData {
45        room_id,
46        room_info_notable_update_sender,
47        requested_required_states,
48        ambiguity_cache,
49    } = room_creation_data;
50
51    let state_store = notification.state_store;
52
53    let room =
54        state_store.get_or_create_room(room_id, RoomState::Joined, room_info_notable_update_sender);
55
56    let mut room_info = room.clone_info();
57
58    room_info.mark_as_joined();
59    room_info.update_from_ruma_summary(&joined_room.summary);
60    room_info.set_prev_batch(joined_room.timeline.prev_batch.as_deref());
61    room_info.mark_state_fully_synced();
62    room_info.handle_encryption_state(requested_required_states.for_room(room_id));
63
64    let (raw_state_events, state_events) = state_events::sync::collect(&joined_room.state.events);
65
66    let mut new_user_ids = BTreeSet::new();
67
68    state_events::sync::dispatch(
69        context,
70        (&raw_state_events, &state_events),
71        &mut room_info,
72        ambiguity_cache,
73        &mut new_user_ids,
74        state_store,
75    )
76    .await?;
77
78    ephemeral_events::dispatch(context, &joined_room.ephemeral.events, room_id);
79
80    if joined_room.timeline.limited {
81        room_info.mark_members_missing();
82    }
83
84    let (raw_state_events_from_timeline, state_events_from_timeline) =
85        state_events::sync::collect_from_timeline(&joined_room.timeline.events);
86
87    state_events::sync::dispatch(
88        context,
89        (&raw_state_events_from_timeline, &state_events_from_timeline),
90        &mut room_info,
91        ambiguity_cache,
92        &mut new_user_ids,
93        state_store,
94    )
95    .await?;
96
97    #[cfg(feature = "e2e-encryption")]
98    let olm_machine = e2ee.olm_machine;
99
100    let timeline = timeline::build(
101        context,
102        &room,
103        &mut room_info,
104        timeline::builder::Timeline::from(joined_room.timeline),
105        notification,
106        #[cfg(feature = "e2e-encryption")]
107        e2ee,
108    )
109    .await?;
110
111    // Save the new `RoomInfo`.
112    context.state_changes.add_room(room_info);
113
114    account_data::for_room(context, room_id, &joined_room.account_data.events, state_store).await;
115
116    // `processors::account_data::from_room` might have updated the `RoomInfo`.
117    // Let's fetch it again.
118    //
119    // SAFETY: `expect` is safe because the `RoomInfo` has been inserted 2 lines
120    // above.
121    let mut room_info = context
122        .state_changes
123        .room_infos
124        .get(room_id)
125        .expect("`RoomInfo` must exist in `StateChanges` at this point")
126        .clone();
127
128    #[cfg(feature = "e2e-encryption")]
129    e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
130        olm_machine,
131        &new_user_ids,
132        room_info.encryption_state(),
133        room.encryption_state(),
134        room_id,
135        state_store,
136    )
137    .await?;
138
139    updated_members_in_room.insert(room_id.to_owned(), new_user_ids);
140
141    let notification_count = joined_room.unread_notifications.into();
142    room_info.update_notification_count(notification_count);
143
144    context.state_changes.add_room(room_info);
145
146    Ok(JoinedRoomUpdate::new(
147        timeline,
148        joined_room.state.events,
149        joined_room.account_data.events,
150        joined_room.ephemeral.events,
151        notification_count,
152        ambiguity_cache.changes.remove(room_id).unwrap_or_default(),
153    ))
154}
155
156/// Process historical updates of a left room.
157#[allow(clippy::too_many_arguments)]
158pub async fn update_left_room(
159    context: &mut Context,
160    room_creation_data: RoomCreationData<'_>,
161    left_room: LeftRoom,
162    notification: notification::Notification<'_>,
163    #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
164) -> Result<LeftRoomUpdate> {
165    let RoomCreationData {
166        room_id,
167        room_info_notable_update_sender,
168        requested_required_states,
169        ambiguity_cache,
170    } = room_creation_data;
171
172    let state_store = notification.state_store;
173
174    let room =
175        state_store.get_or_create_room(room_id, RoomState::Left, room_info_notable_update_sender);
176
177    let mut room_info = room.clone_info();
178    room_info.mark_as_left();
179    room_info.mark_state_partially_synced();
180    room_info.handle_encryption_state(requested_required_states.for_room(room_id));
181
182    let (raw_state_events, state_events) = state_events::sync::collect(&left_room.state.events);
183
184    state_events::sync::dispatch(
185        context,
186        (&raw_state_events, &state_events),
187        &mut room_info,
188        ambiguity_cache,
189        &mut (),
190        state_store,
191    )
192    .await?;
193
194    let (raw_state_events_from_timeline, state_events_from_timeline) =
195        state_events::sync::collect_from_timeline(&left_room.timeline.events);
196
197    state_events::sync::dispatch(
198        context,
199        (&raw_state_events_from_timeline, &state_events_from_timeline),
200        &mut room_info,
201        ambiguity_cache,
202        &mut (),
203        state_store,
204    )
205    .await?;
206
207    let timeline = timeline::build(
208        context,
209        &room,
210        &mut room_info,
211        timeline::builder::Timeline::from(left_room.timeline),
212        notification,
213        #[cfg(feature = "e2e-encryption")]
214        e2ee,
215    )
216    .await?;
217
218    // Save the new `RoomInfo`.
219    context.state_changes.add_room(room_info);
220
221    account_data::for_room(context, room_id, &left_room.account_data.events, state_store).await;
222
223    let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default();
224
225    Ok(LeftRoomUpdate::new(
226        timeline,
227        left_room.state.events,
228        left_room.account_data.events,
229        ambiguity_changes,
230    ))
231}
232
233/// Process updates of an invited room.
234pub async fn update_invited_room(
235    context: &mut Context,
236    room_id: &RoomId,
237    invited_room: InvitedRoom,
238    room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
239    notification: notification::Notification<'_>,
240) -> Result<InvitedRoomUpdate> {
241    let state_store = notification.state_store;
242
243    let room = state_store.get_or_create_room(
244        room_id,
245        RoomState::Invited,
246        room_info_notable_update_sender,
247    );
248
249    let (raw_events, events) = state_events::stripped::collect(&invited_room.invite_state.events);
250
251    let mut room_info = room.clone_info();
252    room_info.mark_as_invited();
253    room_info.mark_state_fully_synced();
254
255    state_events::stripped::dispatch_invite_or_knock(
256        context,
257        (&raw_events, &events),
258        &room,
259        &mut room_info,
260        notification,
261    )
262    .await?;
263
264    context.state_changes.add_room(room_info);
265
266    Ok(invited_room)
267}
268
269/// Process updates of a knocked room.
270pub async fn update_knocked_room(
271    context: &mut Context,
272    room_id: &RoomId,
273    knocked_room: KnockedRoom,
274    room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
275    notification: notification::Notification<'_>,
276) -> Result<KnockedRoomUpdate> {
277    let state_store = notification.state_store;
278
279    let room = state_store.get_or_create_room(
280        room_id,
281        RoomState::Knocked,
282        room_info_notable_update_sender,
283    );
284
285    let (raw_events, events) = state_events::stripped::collect(&knocked_room.knock_state.events);
286
287    let mut room_info = room.clone_info();
288    room_info.mark_as_knocked();
289    room_info.mark_state_fully_synced();
290
291    state_events::stripped::dispatch_invite_or_knock(
292        context,
293        (&raw_events, &events),
294        &room,
295        &mut room_info,
296        notification,
297    )
298    .await?;
299
300    context.state_changes.add_room(room_info);
301
302    Ok(knocked_room)
303}