Skip to main content

matrix_sdk_base/response_processors/
changes.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use eyeball::SharedObservable;
16use matrix_sdk_common::timer;
17use ruma::{
18    events::{GlobalAccountDataEventType, ignored_user_list::IgnoredUserListEvent},
19    serde::Raw,
20};
21use tokio::sync::MutexGuard;
22use tracing::{error, instrument, trace};
23
24use super::Context;
25use crate::{
26    Result, StoreError,
27    store::{BaseStateStore, StateStoreExt as _},
28};
29
30/// Save the [`StateChanges`] from the [`Context`] inside the [`BaseStateStore`]
31/// only! The changes aren't applied on the in-memory rooms.
32#[instrument(skip_all)]
33pub async fn save_only(
34    context: Context,
35    state_store: &BaseStateStore,
36    state_store_guard: &MutexGuard<'_, ()>,
37) -> Result<()> {
38    let _timer = timer!(tracing::Level::TRACE, "_method");
39
40    save_changes(&context, state_store, state_store_guard, None).await?;
41    broadcast_room_info_notable_updates(&context, state_store, state_store_guard)?;
42
43    Ok(())
44}
45
46/// Save the [`StateChanges`] from the [`Context`] inside the
47/// [`BaseStateStore`], and apply them on the in-memory rooms.
48#[instrument(skip_all)]
49pub async fn save_and_apply(
50    context: Context,
51    state_store: &BaseStateStore,
52    state_store_guard: &MutexGuard<'_, ()>,
53    ignore_user_list_changes: &SharedObservable<Vec<String>>,
54    sync_token: Option<String>,
55) -> Result<()> {
56    let _timer = timer!(tracing::Level::TRACE, "_method");
57
58    trace!("ready to submit changes to store");
59
60    let previous_ignored_user_list =
61        state_store.get_account_data_event_static().await.ok().flatten();
62
63    save_changes(&context, state_store, state_store_guard, sync_token).await?;
64    apply_changes(&context, ignore_user_list_changes, previous_ignored_user_list);
65    broadcast_room_info_notable_updates(&context, state_store, state_store_guard)?;
66
67    trace!("applied changes");
68
69    Ok(())
70}
71
72async fn save_changes(
73    context: &Context,
74    state_store: &BaseStateStore,
75    state_store_guard: &MutexGuard<'_, ()>,
76    sync_token: Option<String>,
77) -> Result<()> {
78    state_store.save_changes_with_guard(state_store_guard, &context.state_changes).await?;
79
80    if let Some(sync_token) = sync_token {
81        *state_store.sync_token.write().await = Some(sync_token);
82    }
83
84    Ok(())
85}
86
87fn apply_changes(
88    context: &Context,
89    ignore_user_list_changes: &SharedObservable<Vec<String>>,
90    previous_ignored_user_list: Option<Raw<IgnoredUserListEvent>>,
91) {
92    if let Some(event) =
93        context.state_changes.account_data.get(&GlobalAccountDataEventType::IgnoredUserList)
94    {
95        match event.deserialize_as_unchecked::<IgnoredUserListEvent>() {
96            Ok(event) => {
97                let user_ids: Vec<String> =
98                    event.content.ignored_users.keys().map(|id| id.to_string()).collect();
99
100                // Try to only trigger the observable if the ignored user list has changed,
101                // from the previous time we've seen it. If we couldn't load the previous event
102                // for any reason, always trigger.
103                if let Some(prev_user_ids) =
104                    previous_ignored_user_list.and_then(|raw| raw.deserialize().ok()).map(|event| {
105                        event
106                            .content
107                            .ignored_users
108                            .keys()
109                            .map(|id| id.to_string())
110                            .collect::<Vec<_>>()
111                    })
112                {
113                    if user_ids != prev_user_ids {
114                        ignore_user_list_changes.set(user_ids);
115                    }
116                } else {
117                    ignore_user_list_changes.set(user_ids);
118                }
119            }
120
121            Err(error) => {
122                error!("Failed to deserialize ignored user list event: {error}")
123            }
124        }
125    }
126}
127
128fn broadcast_room_info_notable_updates(
129    context: &Context,
130    state_store: &BaseStateStore,
131    state_store_guard: &MutexGuard<'_, ()>,
132) -> Result<()> {
133    for (room_id, room_info) in &context.state_changes.room_infos {
134        if let Some(room) = state_store.room(room_id) {
135            let room_info_notable_update_reasons =
136                context.room_info_notable_updates.get(room_id).copied().unwrap_or_default();
137            room.update_room_info_with_store_guard(state_store_guard, |_| {
138                (room_info.clone(), room_info_notable_update_reasons)
139            })
140            .map_err(StoreError::from)?;
141        }
142    }
143    Ok(())
144}