matrix_sdk_base/response_processors/
timeline.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use matrix_sdk_common::{deserialized_responses::TimelineEvent, timer};
16#[cfg(feature = "e2e-encryption")]
17use ruma::events::SyncMessageLikeEvent;
18use ruma::{
19    MilliSecondsSinceUnixEpoch, UInt, UserId, assign,
20    events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent},
21    push::{Action, PushConditionRoomCtx},
22};
23use tracing::{instrument, trace, warn};
24
25use super::{Context, notification};
26#[cfg(feature = "e2e-encryption")]
27use super::{e2ee, verification};
28use crate::{Result, Room, RoomInfo, sync::Timeline};
29
30/// Process a set of sync timeline event, and create a [`Timeline`].
31///
32/// For each event:
33/// - will try to decrypt it,
34/// - will fix the `origin_server_ts` if considered invalid,
35/// - will process verification,
36/// - will process redaction,
37/// - will process notification.
38#[allow(clippy::extra_unused_lifetimes)]
39#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
40pub async fn build<'notification, 'e2ee>(
41    context: &mut Context,
42    room: &Room,
43    room_info: &mut RoomInfo,
44    timeline_inputs: builder::Timeline,
45    mut notification: notification::Notification<'notification>,
46    #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'e2ee>,
47) -> Result<Timeline> {
48    let _timer = timer!(tracing::Level::TRACE, "build a timeline from sync");
49
50    let now = MilliSecondsSinceUnixEpoch::now();
51    let mut timeline = Timeline::new(timeline_inputs.limited, timeline_inputs.prev_batch);
52    let mut push_condition_room_ctx = get_push_room_context(context, room, room_info).await?;
53    let room_id = room.room_id();
54
55    for raw_event in timeline_inputs.raw_events {
56        // Start by assuming we have a plaintext event. We'll replace it with a
57        // decrypted or UTD event below if necessary.
58        let mut timeline_event = TimelineEvent::from_plaintext_with_max_timestamp(raw_event, now);
59
60        // Do some special stuff on the `timeline_event` before collecting it.
61        match timeline_event.raw().deserialize() {
62            Ok(sync_timeline_event) => {
63                match &sync_timeline_event {
64                    // State events are ignored. They must be processed separately.
65                    AnySyncTimelineEvent::State(_) => {
66                        // do nothing
67                    }
68
69                    // A room redaction.
70                    AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
71                        redaction_event,
72                    )) => {
73                        let redaction_rules = room_info.room_version_rules_or_default().redaction;
74
75                        if let Some(redacts) = redaction_event.redacts(&redaction_rules) {
76                            room_info.handle_redaction(
77                                redaction_event,
78                                timeline_event.raw().cast_ref_unchecked(),
79                            );
80
81                            context.state_changes.add_redaction(
82                                room_id,
83                                redacts,
84                                timeline_event.raw().clone().cast_unchecked(),
85                            );
86                        }
87                    }
88
89                    // Decrypt encrypted event, or process verification event.
90                    #[cfg(feature = "e2e-encryption")]
91                    AnySyncTimelineEvent::MessageLike(sync_message_like_event) => {
92                        match sync_message_like_event {
93                            AnySyncMessageLikeEvent::RoomEncrypted(
94                                SyncMessageLikeEvent::Original(_),
95                            ) => {
96                                if let Some(decrypted_timeline_event) =
97                                    Box::pin(e2ee::decrypt::sync_timeline_event(
98                                        e2ee.clone(),
99                                        &timeline_event,
100                                        room_id,
101                                    ))
102                                    .await?
103                                {
104                                    timeline_event = decrypted_timeline_event;
105                                }
106                            }
107
108                            _ => {
109                                Box::pin(verification::process_if_relevant(
110                                    &sync_timeline_event,
111                                    e2ee.clone(),
112                                    room_id,
113                                ))
114                                .await?;
115                            }
116                        }
117                    }
118
119                    // Nothing particular to do.
120                    #[cfg(not(feature = "e2e-encryption"))]
121                    AnySyncTimelineEvent::MessageLike(_) => (),
122                }
123
124                if let Some(push_condition_room_ctx) = &mut push_condition_room_ctx {
125                    update_push_room_context(
126                        context,
127                        push_condition_room_ctx,
128                        room.own_user_id(),
129                        room_info,
130                    )
131                } else {
132                    push_condition_room_ctx =
133                        get_push_room_context(context, room, room_info).await?;
134                }
135
136                if let Some(push_condition_room_ctx) = &push_condition_room_ctx {
137                    let actions = notification
138                        .push_notification_from_event_if(
139                            push_condition_room_ctx,
140                            timeline_event.raw(),
141                            Action::should_notify,
142                        )
143                        .await;
144
145                    timeline_event.set_push_actions(actions.to_owned());
146                }
147            }
148            Err(error) => {
149                warn!("Error deserializing event: {error}");
150            }
151        }
152
153        // Finally, we have process the timeline event. We can collect it.
154        timeline.events.push(timeline_event);
155    }
156
157    Ok(timeline)
158}
159
160/// Set of types used by [`build`] to reduce the number of arguments by grouping
161/// them by thematics.
162pub mod builder {
163    use ruma::{
164        api::client::sync::sync_events::{v3, v5},
165        events::AnySyncTimelineEvent,
166        serde::Raw,
167    };
168
169    pub struct Timeline {
170        pub limited: bool,
171        pub raw_events: Vec<Raw<AnySyncTimelineEvent>>,
172        pub prev_batch: Option<String>,
173    }
174
175    impl From<v3::Timeline> for Timeline {
176        fn from(value: v3::Timeline) -> Self {
177            Self { limited: value.limited, raw_events: value.events, prev_batch: value.prev_batch }
178        }
179    }
180
181    impl From<&v5::response::Room> for Timeline {
182        fn from(value: &v5::response::Room) -> Self {
183            Self {
184                limited: value.limited,
185                raw_events: value.timeline.clone(),
186                prev_batch: value.prev_batch.clone(),
187            }
188        }
189    }
190}
191
192/// Update the push context for the given room.
193///
194/// Updates the context data from `context.state_changes` or `room_info`.
195fn update_push_room_context(
196    context: &Context,
197    push_rules: &mut PushConditionRoomCtx,
198    user_id: &UserId,
199    room_info: &RoomInfo,
200) {
201    let room_id = &*room_info.room_id;
202
203    push_rules.member_count = UInt::new(room_info.active_members_count()).unwrap_or(UInt::MAX);
204
205    // TODO: Use if let chain once stable
206    if let Some(member) = context.state_changes.member(room_id, user_id) {
207        push_rules.user_display_name =
208            member.content.displayname.unwrap_or_else(|| user_id.localpart().to_owned())
209    }
210
211    if let Some(power_levels) = context.state_changes.power_levels(room_id) {
212        push_rules.power_levels = Some(power_levels.into());
213    }
214}
215
216/// Get the push context for the given room.
217///
218/// Tries to get the data from `changes` or the up to date `room_info`.
219/// Loads the data from the store otherwise.
220///
221/// Returns `None` if some data couldn't be found. This should only happen
222/// in brand new rooms, while we process its state.
223pub async fn get_push_room_context(
224    context: &Context,
225    room: &Room,
226    room_info: &RoomInfo,
227) -> Result<Option<PushConditionRoomCtx>> {
228    let room_id = room.room_id();
229    let user_id = room.own_user_id();
230
231    let member_count = room_info.active_members_count();
232
233    // TODO: Use if let chain once stable
234    let user_display_name = if let Some(member) = context.state_changes.member(room_id, user_id) {
235        member.content.displayname.unwrap_or_else(|| user_id.localpart().to_owned())
236    } else if let Some(member) = Box::pin(room.get_member(user_id)).await? {
237        member.name().to_owned()
238    } else {
239        trace!("Couldn't get push context because of missing own member information");
240        return Ok(None);
241    };
242
243    let power_levels = if let Some(power_levels) = context.state_changes.power_levels(room_id) {
244        Some(power_levels)
245    } else {
246        room.power_levels().await.ok()
247    };
248
249    Ok(Some(assign!(
250        PushConditionRoomCtx::new(
251            room_id.to_owned(),
252            UInt::new(member_count).unwrap_or(UInt::MAX),
253            user_id.to_owned(),
254            user_display_name
255        ),
256        { power_levels: power_levels.map(Into::into) }
257    )))
258}