matrix_sdk_base/response_processors/room/
sync_v2.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use ruma::{
18    OwnedRoomId, OwnedUserId, RoomId,
19    api::client::sync::sync_events::v3::{
20        InvitedRoom, JoinedRoom, KnockedRoom, LeftRoom, State as RumaState,
21    },
22};
23use tokio::sync::broadcast::Sender;
24use tracing::error;
25
26#[cfg(feature = "e2e-encryption")]
27use super::super::e2ee;
28use super::{
29    super::{Context, account_data, ephemeral_events, notification, state_events, timeline},
30    RoomCreationData,
31};
32use crate::{
33    Result, RoomInfoNotableUpdate, RoomState,
34    sync::{InvitedRoomUpdate, JoinedRoomUpdate, KnockedRoomUpdate, LeftRoomUpdate, State},
35};
36
37/// Process updates of a joined room.
38#[allow(clippy::too_many_arguments)]
39pub async fn update_joined_room(
40    context: &mut Context,
41    room_creation_data: RoomCreationData<'_>,
42    joined_room: JoinedRoom,
43    updated_members_in_room: &mut BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>>,
44    notification: notification::Notification<'_>,
45    #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
46) -> Result<JoinedRoomUpdate> {
47    let RoomCreationData {
48        room_id,
49        room_info_notable_update_sender,
50        requested_required_states,
51        ambiguity_cache,
52    } = room_creation_data;
53
54    let state_store = notification.state_store;
55
56    let room =
57        state_store.get_or_create_room(room_id, RoomState::Joined, room_info_notable_update_sender);
58
59    let mut room_info = room.clone_info();
60
61    room_info.mark_as_joined();
62    room_info.update_from_ruma_summary(&joined_room.summary);
63    room_info.set_prev_batch(joined_room.timeline.prev_batch.as_deref());
64    room_info.mark_state_fully_synced();
65    room_info.handle_encryption_state(requested_required_states.for_room(room_id));
66
67    let mut new_user_ids = BTreeSet::new();
68
69    let state = State::from_sync_v2(joined_room.state);
70    let (raw_state_events, state_events) = state.collect(&joined_room.timeline.events);
71
72    state_events::sync::dispatch(
73        context,
74        (&raw_state_events, &state_events),
75        &mut room_info,
76        ambiguity_cache,
77        &mut new_user_ids,
78        state_store,
79        #[cfg(feature = "experimental-encrypted-state-events")]
80        e2ee.clone(),
81    )
82    .await?;
83
84    ephemeral_events::dispatch(context, &joined_room.ephemeral.events, room_id);
85
86    if joined_room.timeline.limited {
87        room_info.mark_members_missing();
88    }
89
90    #[cfg(feature = "e2e-encryption")]
91    let olm_machine = e2ee.olm_machine;
92
93    let timeline = timeline::build(
94        context,
95        &room,
96        &mut room_info,
97        timeline::builder::Timeline::from(joined_room.timeline),
98        notification,
99        #[cfg(feature = "e2e-encryption")]
100        e2ee,
101    )
102    .await?;
103
104    // Save the new `RoomInfo`.
105    context.state_changes.add_room(room_info);
106
107    account_data::for_room(context, room_id, &joined_room.account_data.events, state_store);
108
109    // `processors::account_data::from_room` might have updated the `RoomInfo`.
110    // Let's fetch it again.
111    //
112    // SAFETY: `expect` is safe because the `RoomInfo` has been inserted 2 lines
113    // above.
114    let mut room_info = context
115        .state_changes
116        .room_infos
117        .get(room_id)
118        .expect("`RoomInfo` must exist in `StateChanges` at this point")
119        .clone();
120
121    #[cfg(feature = "e2e-encryption")]
122    e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
123        olm_machine,
124        &new_user_ids,
125        room_info.encryption_state(),
126        room.encryption_state(),
127        room_id,
128        state_store,
129    )
130    .await?;
131
132    updated_members_in_room.insert(room_id.to_owned(), new_user_ids);
133
134    let notification_count = joined_room.unread_notifications.into();
135    room_info.update_notification_count(notification_count);
136
137    context.state_changes.add_room(room_info);
138
139    Ok(JoinedRoomUpdate::new(
140        timeline,
141        state,
142        joined_room.account_data.events,
143        joined_room.ephemeral.events,
144        notification_count,
145        ambiguity_cache.changes.remove(room_id).unwrap_or_default(),
146    ))
147}
148
149/// Process historical updates of a left room.
150#[allow(clippy::too_many_arguments)]
151pub async fn update_left_room(
152    context: &mut Context,
153    room_creation_data: RoomCreationData<'_>,
154    left_room: LeftRoom,
155    notification: notification::Notification<'_>,
156    #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
157) -> Result<LeftRoomUpdate> {
158    let RoomCreationData {
159        room_id,
160        room_info_notable_update_sender,
161        requested_required_states,
162        ambiguity_cache,
163    } = room_creation_data;
164
165    let state_store = notification.state_store;
166
167    let room =
168        state_store.get_or_create_room(room_id, RoomState::Left, room_info_notable_update_sender);
169
170    let mut room_info = room.clone_info();
171    room_info.mark_as_left();
172    room_info.mark_state_partially_synced();
173    room_info.handle_encryption_state(requested_required_states.for_room(room_id));
174
175    let state = State::from_sync_v2(left_room.state);
176    let (raw_state_events, state_events) = state.collect(&left_room.timeline.events);
177
178    state_events::sync::dispatch(
179        context,
180        (&raw_state_events, &state_events),
181        &mut room_info,
182        ambiguity_cache,
183        &mut (),
184        state_store,
185        #[cfg(feature = "experimental-encrypted-state-events")]
186        e2ee.clone(),
187    )
188    .await?;
189
190    let timeline = timeline::build(
191        context,
192        &room,
193        &mut room_info,
194        timeline::builder::Timeline::from(left_room.timeline),
195        notification,
196        #[cfg(feature = "e2e-encryption")]
197        e2ee,
198    )
199    .await?;
200
201    // Save the new `RoomInfo`.
202    context.state_changes.add_room(room_info);
203
204    account_data::for_room(context, room_id, &left_room.account_data.events, state_store);
205
206    let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default();
207
208    Ok(LeftRoomUpdate::new(timeline, state, left_room.account_data.events, ambiguity_changes))
209}
210
211/// Process updates of an invited room.
212pub async fn update_invited_room(
213    context: &mut Context,
214    room_id: &RoomId,
215    invited_room: InvitedRoom,
216    room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
217    notification: notification::Notification<'_>,
218) -> Result<InvitedRoomUpdate> {
219    let state_store = notification.state_store;
220
221    let room = state_store.get_or_create_room(
222        room_id,
223        RoomState::Invited,
224        room_info_notable_update_sender,
225    );
226
227    let (raw_events, events) = state_events::stripped::collect(&invited_room.invite_state.events);
228
229    let mut room_info = room.clone_info();
230    room_info.mark_as_invited();
231    room_info.mark_state_fully_synced();
232
233    state_events::stripped::dispatch_invite_or_knock(
234        context,
235        (&raw_events, &events),
236        &room,
237        &mut room_info,
238        notification,
239    )
240    .await?;
241
242    context.state_changes.add_room(room_info);
243
244    Ok(invited_room)
245}
246
247/// Process updates of a knocked room.
248pub async fn update_knocked_room(
249    context: &mut Context,
250    room_id: &RoomId,
251    knocked_room: KnockedRoom,
252    room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
253    notification: notification::Notification<'_>,
254) -> Result<KnockedRoomUpdate> {
255    let state_store = notification.state_store;
256
257    let room = state_store.get_or_create_room(
258        room_id,
259        RoomState::Knocked,
260        room_info_notable_update_sender,
261    );
262
263    let (raw_events, events) = state_events::stripped::collect(&knocked_room.knock_state.events);
264
265    let mut room_info = room.clone_info();
266    room_info.mark_as_knocked();
267    room_info.mark_state_fully_synced();
268
269    state_events::stripped::dispatch_invite_or_knock(
270        context,
271        (&raw_events, &events),
272        &room,
273        &mut room_info,
274        notification,
275    )
276    .await?;
277
278    context.state_changes.add_room(room_info);
279
280    Ok(knocked_room)
281}
282
283impl State {
284    /// Construct a [`State`] from the state changes for a joined or left room
285    /// from a response of the sync v2 endpoint.
286    fn from_sync_v2(state: RumaState) -> Self {
287        match state {
288            RumaState::Before(state) => Self::Before(state.events),
289            RumaState::After(state) => Self::After(state.events),
290            // We shouldn't receive other variants because they are opt-in.
291            state => {
292                error!("Unsupported State variant received for joined room: {state:?}");
293                Self::default()
294            }
295        }
296    }
297}