Skip to main content

matrix_sdk_base/response_processors/room/
sync_v2.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use ruma::{
18    OwnedRoomId, OwnedUserId, RoomId, UserId,
19    api::client::sync::sync_events::v3::{
20        InvitedRoom, JoinedRoom, KnockedRoom, LeftRoom, State as RumaState,
21    },
22};
23use tracing::error;
24
25#[cfg(feature = "e2e-encryption")]
26use super::super::e2ee;
27use super::{
28    super::{Context, account_data, ephemeral_events, notification, state_events, timeline},
29    RoomCreationData,
30};
31use crate::{
32    Result, RoomState,
33    sync::{InvitedRoomUpdate, JoinedRoomUpdate, KnockedRoomUpdate, LeftRoomUpdate, State},
34};
35
36/// Process updates of a joined room.
37#[allow(clippy::too_many_arguments)]
38pub async fn update_joined_room(
39    context: &mut Context,
40    room_creation_data: RoomCreationData<'_>,
41    joined_room: JoinedRoom,
42    updated_members_in_room: &mut BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>>,
43    notification: notification::Notification<'_>,
44    #[cfg(feature = "e2e-encryption")] e2ee: &e2ee::E2EE<'_>,
45) -> Result<JoinedRoomUpdate> {
46    let RoomCreationData { room_id, requested_required_states, ambiguity_cache } =
47        room_creation_data;
48
49    let state_store = notification.state_store;
50
51    let room = state_store.get_or_create_room(room_id, RoomState::Joined);
52
53    let mut room_info = room.clone_info();
54
55    room_info.mark_as_joined();
56    room_info.update_from_ruma_summary(&joined_room.summary);
57    room_info.set_prev_batch(joined_room.timeline.prev_batch.as_deref());
58    room_info.mark_state_fully_synced();
59    room_info.handle_encryption_state(requested_required_states.for_room(room_id));
60
61    let mut new_user_ids = BTreeSet::new();
62
63    let state = State::from_sync_v2(joined_room.state);
64    let raw_state_events = state.collect(&joined_room.timeline.events);
65
66    state_events::sync::dispatch(
67        context,
68        raw_state_events,
69        &mut room_info,
70        ambiguity_cache,
71        &mut new_user_ids,
72        state_store,
73        #[cfg(feature = "experimental-encrypted-state-events")]
74        e2ee,
75    )
76    .await?;
77
78    ephemeral_events::dispatch(context, &joined_room.ephemeral.events, room_id);
79
80    if joined_room.timeline.limited {
81        room_info.mark_members_missing();
82    }
83
84    #[cfg(feature = "e2e-encryption")]
85    let olm_machine = e2ee.olm_machine;
86
87    let timeline = timeline::build(
88        context,
89        &room,
90        &mut room_info,
91        timeline::builder::Timeline::from(joined_room.timeline),
92        notification,
93        #[cfg(feature = "e2e-encryption")]
94        e2ee,
95    )
96    .await?;
97
98    // Save the new `RoomInfo`.
99    context.state_changes.add_room(room_info);
100
101    account_data::for_room(context, room_id, &joined_room.account_data.events, state_store);
102
103    // `processors::account_data::from_room` might have updated the `RoomInfo`.
104    // Let's fetch it again.
105    //
106    // SAFETY: `expect` is safe because the `RoomInfo` has been inserted 2 lines
107    // above.
108    let mut room_info = context
109        .state_changes
110        .room_infos
111        .get(room_id)
112        .expect("`RoomInfo` must exist in `StateChanges` at this point")
113        .clone();
114
115    #[cfg(feature = "e2e-encryption")]
116    e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
117        olm_machine,
118        &new_user_ids,
119        room_info.encryption_state(),
120        room.encryption_state(),
121        room_id,
122        state_store,
123    )
124    .await?;
125
126    updated_members_in_room.insert(room_id.to_owned(), new_user_ids);
127
128    let notification_count = joined_room.unread_notifications.into();
129    room_info.update_notification_count(notification_count);
130
131    context.state_changes.add_room(room_info);
132
133    Ok(JoinedRoomUpdate::new(
134        timeline,
135        state,
136        joined_room.account_data.events,
137        joined_room.ephemeral.events,
138        notification_count,
139        ambiguity_cache.changes.remove(room_id).unwrap_or_default(),
140    ))
141}
142
143/// Process historical updates of a left room.
144#[allow(clippy::too_many_arguments)]
145pub async fn update_left_room(
146    context: &mut Context,
147    room_creation_data: RoomCreationData<'_>,
148    left_room: LeftRoom,
149    notification: notification::Notification<'_>,
150    #[cfg(feature = "e2e-encryption")] e2ee: &e2ee::E2EE<'_>,
151) -> Result<LeftRoomUpdate> {
152    let RoomCreationData { room_id, requested_required_states, ambiguity_cache } =
153        room_creation_data;
154
155    #[cfg(feature = "e2e-encryption")]
156    let olm_machine = e2ee.olm_machine;
157
158    let state_store = notification.state_store;
159
160    let room = state_store.get_or_create_room(room_id, RoomState::Left);
161
162    let mut room_info = room.clone_info();
163    room_info.mark_as_left();
164    room_info.mark_state_partially_synced();
165    room_info.handle_encryption_state(requested_required_states.for_room(room_id));
166
167    let state = State::from_sync_v2(left_room.state);
168    let raw_state_events = state.collect(&left_room.timeline.events);
169
170    state_events::sync::dispatch(
171        context,
172        raw_state_events,
173        &mut room_info,
174        ambiguity_cache,
175        &mut (),
176        state_store,
177        #[cfg(feature = "experimental-encrypted-state-events")]
178        e2ee,
179    )
180    .await?;
181
182    let timeline = timeline::build(
183        context,
184        &room,
185        &mut room_info,
186        timeline::builder::Timeline::from(left_room.timeline),
187        notification,
188        #[cfg(feature = "e2e-encryption")]
189        e2ee,
190    )
191    .await?;
192
193    // Since we are no longer joined to the room, we cannot be waiting for a key
194    // bundle: clear any flag that we are.
195    #[cfg(feature = "e2e-encryption")]
196    if let Some(olm_machine) = olm_machine {
197        olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await?
198    }
199
200    // Save the new `RoomInfo`.
201    context.state_changes.add_room(room_info);
202
203    account_data::for_room(context, room_id, &left_room.account_data.events, state_store);
204
205    let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default();
206
207    Ok(LeftRoomUpdate::new(timeline, state, left_room.account_data.events, ambiguity_changes))
208}
209
210/// Process updates of an invited room.
211pub async fn update_invited_room(
212    context: &mut Context,
213    room_id: &RoomId,
214    user_id: &UserId,
215    invited_room: InvitedRoom,
216    notification: notification::Notification<'_>,
217    #[cfg(feature = "e2e-encryption")] e2ee: &e2ee::E2EE<'_>,
218) -> Result<InvitedRoomUpdate> {
219    let state_store = notification.state_store;
220
221    let room = state_store.get_or_create_room(room_id, RoomState::Invited);
222
223    let raw_state_events = state_events::stripped::collect(&invited_room.invite_state.events);
224
225    let mut room_info = room.clone_info();
226    room_info.mark_as_invited();
227    room_info.mark_state_fully_synced();
228
229    state_events::stripped::dispatch_invite_or_knock(
230        context,
231        raw_state_events,
232        &room,
233        &mut room_info,
234        user_id,
235        notification,
236    )
237    .await?;
238
239    // Since we are no longer joined to the room, we cannot be waiting for a key
240    // bundle: clear any flag that we are.
241    #[cfg(feature = "e2e-encryption")]
242    if let Some(olm_machine) = e2ee.olm_machine {
243        olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await?
244    }
245
246    context.state_changes.add_room(room_info);
247
248    Ok(invited_room)
249}
250
251/// Process updates of a knocked room.
252pub async fn update_knocked_room(
253    context: &mut Context,
254    room_id: &RoomId,
255    user_id: &UserId,
256    knocked_room: KnockedRoom,
257    notification: notification::Notification<'_>,
258    #[cfg(feature = "e2e-encryption")] e2ee: &e2ee::E2EE<'_>,
259) -> Result<KnockedRoomUpdate> {
260    let state_store = notification.state_store;
261
262    let room = state_store.get_or_create_room(room_id, RoomState::Knocked);
263
264    let raw_state_events = state_events::stripped::collect(&knocked_room.knock_state.events);
265
266    let mut room_info = room.clone_info();
267    room_info.mark_as_knocked();
268    room_info.mark_state_fully_synced();
269
270    state_events::stripped::dispatch_invite_or_knock(
271        context,
272        raw_state_events,
273        &room,
274        &mut room_info,
275        user_id,
276        notification,
277    )
278    .await?;
279
280    // Since we are not joined to the room, we cannot be waiting for a key
281    // bundle: clear any flag that we are.
282    #[cfg(feature = "e2e-encryption")]
283    if let Some(olm_machine) = e2ee.olm_machine {
284        olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await?
285    }
286
287    context.state_changes.add_room(room_info);
288
289    Ok(knocked_room)
290}
291
292impl State {
293    /// Construct a [`State`] from the state changes for a joined or left room
294    /// from a response of the sync v2 endpoint.
295    fn from_sync_v2(state: RumaState) -> Self {
296        match state {
297            RumaState::Before(state) => Self::Before(state.events),
298            RumaState::After(state) => Self::After(state.events),
299            // We shouldn't receive other variants because they are opt-in.
300            state => {
301                error!("Unsupported State variant received for joined room: {state:?}");
302                Self::default()
303            }
304        }
305    }
306}