matrix_sdk_base/response_processors/
changes.rs1use eyeball::SharedObservable;
16use matrix_sdk_common::timer;
17use ruma::{
18 events::{GlobalAccountDataEventType, ignored_user_list::IgnoredUserListEvent},
19 serde::Raw,
20};
21use tokio::sync::MutexGuard;
22use tracing::{error, instrument, trace};
23
24use super::Context;
25use crate::{
26 Result, StoreError,
27 store::{BaseStateStore, StateStoreExt as _},
28};
29
30#[instrument(skip_all)]
33pub async fn save_only(
34 context: Context,
35 state_store: &BaseStateStore,
36 state_store_guard: &MutexGuard<'_, ()>,
37) -> Result<()> {
38 let _timer = timer!(tracing::Level::TRACE, "_method");
39
40 save_changes(&context, state_store, state_store_guard, None).await?;
41 broadcast_room_info_notable_updates(&context, state_store, state_store_guard)?;
42
43 Ok(())
44}
45
46#[instrument(skip_all)]
49pub async fn save_and_apply(
50 context: Context,
51 state_store: &BaseStateStore,
52 state_store_guard: &MutexGuard<'_, ()>,
53 ignore_user_list_changes: &SharedObservable<Vec<String>>,
54 sync_token: Option<String>,
55) -> Result<()> {
56 let _timer = timer!(tracing::Level::TRACE, "_method");
57
58 trace!("ready to submit changes to store");
59
60 let previous_ignored_user_list =
61 state_store.get_account_data_event_static().await.ok().flatten();
62
63 save_changes(&context, state_store, state_store_guard, sync_token).await?;
64 apply_changes(&context, ignore_user_list_changes, previous_ignored_user_list);
65 broadcast_room_info_notable_updates(&context, state_store, state_store_guard)?;
66
67 trace!("applied changes");
68
69 Ok(())
70}
71
72async fn save_changes(
73 context: &Context,
74 state_store: &BaseStateStore,
75 state_store_guard: &MutexGuard<'_, ()>,
76 sync_token: Option<String>,
77) -> Result<()> {
78 state_store.save_changes_with_guard(state_store_guard, &context.state_changes).await?;
79
80 if let Some(sync_token) = sync_token {
81 *state_store.sync_token.write().await = Some(sync_token);
82 }
83
84 Ok(())
85}
86
87fn apply_changes(
88 context: &Context,
89 ignore_user_list_changes: &SharedObservable<Vec<String>>,
90 previous_ignored_user_list: Option<Raw<IgnoredUserListEvent>>,
91) {
92 if let Some(event) =
93 context.state_changes.account_data.get(&GlobalAccountDataEventType::IgnoredUserList)
94 {
95 match event.deserialize_as_unchecked::<IgnoredUserListEvent>() {
96 Ok(event) => {
97 let user_ids: Vec<String> =
98 event.content.ignored_users.keys().map(|id| id.to_string()).collect();
99
100 if let Some(prev_user_ids) =
104 previous_ignored_user_list.and_then(|raw| raw.deserialize().ok()).map(|event| {
105 event
106 .content
107 .ignored_users
108 .keys()
109 .map(|id| id.to_string())
110 .collect::<Vec<_>>()
111 })
112 {
113 if user_ids != prev_user_ids {
114 ignore_user_list_changes.set(user_ids);
115 }
116 } else {
117 ignore_user_list_changes.set(user_ids);
118 }
119 }
120
121 Err(error) => {
122 error!("Failed to deserialize ignored user list event: {error}")
123 }
124 }
125 }
126}
127
128fn broadcast_room_info_notable_updates(
129 context: &Context,
130 state_store: &BaseStateStore,
131 state_store_guard: &MutexGuard<'_, ()>,
132) -> Result<()> {
133 for (room_id, room_info) in &context.state_changes.room_infos {
134 if let Some(room) = state_store.room(room_id) {
135 let room_info_notable_update_reasons =
136 context.room_info_notable_updates.get(room_id).copied().unwrap_or_default();
137 room.update_room_info_with_store_guard(state_store_guard, |_| {
138 (room_info.clone(), room_info_notable_update_reasons)
139 })
140 .map_err(StoreError::from)?;
141 }
142 }
143 Ok(())
144}