1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
mod typing_notice;
use matrix_sdk::{
Room, RoomMemberships, RoomState,
ruma::events::{
AnySyncStateEvent, AnySyncTimelineEvent,
room::member::{MembershipState, StrippedRoomMemberEvent},
},
};
use thiserror::Error;
use tracing::Instrument;
use crate::{CallbackError, InvitationDecision};
pub use typing_notice::TypingNoticeGuard;
/// Upper bound, in seconds, passed as `max_delay_seconds` to `join_with_retries` when an
/// invitation is accepted; the retry loop gives up once its backoff delay exceeds this value.
const MAX_JOIN_DELAY_SECONDS: u64 = 3600;
/// Errors that can occur while trying to join a room.
#[derive(Error, Debug)]
pub enum JoinError {
    /// The retry loop gave up because the backoff delay grew beyond the allowed maximum.
    /// The payload is the delay (in seconds) that exceeded the limit.
    #[error(
        "Refusing to retry joining room due to exponential backoff delay being too large: {0}"
    )]
    BackOffTooLarge(u64),
}
/// Room-related helpers built on top of a `MatrixLink`: joining rooms, typing notices and
/// registration of callbacks for invitation and membership events.
#[derive(Clone)]
pub struct Rooms {
    /// Link through which the Matrix client and our own user ID are obtained.
    matrix_link: super::MatrixLink,
}
impl Rooms {
pub(super) fn new(matrix_link: super::MatrixLink) -> Self {
Self { matrix_link }
}
/// Returns the display name that the account user has in `room`.
///
/// The joined members of the room are fetched and searched for the member that is the
/// account user. Yields `Ok(None)` when no such member is found or when that member has no
/// display name.
///
/// # Errors
///
/// Propagates the error returned by `room.members` when the member list cannot be fetched.
#[tracing::instrument(skip_all, name="own_display_name_in_room", fields(room_id = room.room_id().as_str()))]
pub async fn own_display_name_in_room(
    &self,
    room: &Room,
) -> matrix_sdk::Result<Option<String>> {
    let joined_members = room.members(RoomMemberships::JOIN).await?;

    // Only the first member that is the account user is relevant; everyone else is skipped.
    let own_display_name = joined_members
        .iter()
        .find(|candidate| candidate.is_account_user())
        .and_then(|own_member| own_member.display_name())
        .map(|name| name.to_owned());

    Ok(own_display_name)
}
/// Begins sending typing notices for `room` and hands back a guard object.
///
/// When several callers request typing notices for the same room, only the first one starts
/// the sending loop; the loop stays active until every caller has released its guard.
///
/// Once all guards for a room have gone out of scope, the typing notice is turned off.
#[tracing::instrument(skip_all, name="start_typing_notice", fields(room_id = room.room_id().as_str()))]
pub async fn start_typing_notice(&self, room: &Room) -> TypingNoticeGuard {
    let matrix_link = self.matrix_link.clone();
    typing_notice::start_typing_notice(matrix_link, room).await
}
/// Joins `room`, retrying with exponential backoff until the join succeeds.
///
/// The delay starts at 2 seconds and doubles after every failed attempt. When
/// `max_delay_seconds` is `Some`, retrying stops as soon as the upcoming delay would exceed
/// that limit; when it is `None`, retries continue indefinitely.
///
/// # Errors
///
/// Returns [`JoinError::BackOffTooLarge`] (carrying the delay that exceeded the limit) when
/// the backoff delay grows beyond `max_delay_seconds`.
#[tracing::instrument(skip_all, name="join_with_retries", fields(room_id = room.room_id().as_str(), max_delay_seconds = ?max_delay_seconds))]
async fn join_with_retries(
    &self,
    room: &Room,
    max_delay_seconds: Option<u64>,
) -> Result<(), JoinError> {
    tracing::debug!("Joining room");

    let mut delay: u64 = 2;

    while let Err(err) = room.join().await {
        // Decide whether to give up *before* sleeping, so that we never wait out a delay
        // that would not be followed by another join attempt.
        if max_delay_seconds.is_some_and(|max| delay > max) {
            tracing::warn!(?err, ?delay, "Failed to join. Giving up..",);
            return Err(JoinError::BackOffTooLarge(delay));
        }

        // retry autojoin due to synapse sending invites, before the
        // invited user can join for more information see
        // https://github.com/matrix-org/synapse/issues/4345
        tracing::warn!(?err, ?delay, "Failed to join. Retrying..",);

        tokio::time::sleep(std::time::Duration::from_secs(delay)).await;

        // Saturate instead of overflowing when no maximum delay has been configured.
        delay = delay.saturating_mul(2);
    }

    tracing::info!("Successfully joined room");

    Ok(())
}
/// Register a callback to be called when an invitation for the room arrives.
/// The callback is expected to return a decision as to whether the room should be joined or not.
pub fn on_invitation<F, Fut>(&self, callback: F)
where
F: FnOnce(StrippedRoomMemberEvent, Room) -> Fut + Send + 'static + Clone + Sync,
Fut: std::future::Future<Output = Result<InvitationDecision, CallbackError>>
+ Send
+ 'static,
{
let self_ref = self.clone();
let own_user_id = self.matrix_link.user_id().to_owned();
self.matrix_link.client().add_event_handler(
|room_member: StrippedRoomMemberEvent, room: Room| async move {
let event_span = tracing::error_span!(
"on_invitation",
room_id = room.room_id().as_str(),
sender_id = room_member.sender.as_str(),
decision = tracing::field::Empty,
);
{
let _enter = event_span.enter();
if room_member.state_key != own_user_id {
// Invite for someone else. Ignore.
return;
}
if room.state() != RoomState::Invited {
return;
}
tracing::debug!(
"Deciding how to respond to room invitation",
);
}
let decision = callback(room_member.clone(), room.clone()).instrument(event_span.clone()).await;
match decision {
Err(err) => {
let _enter = event_span.enter();
tracing::error!(
?err,
"Error while determining decision for joining. The invitation will be ignored",
);
}
Ok(status) => {
event_span.record("decision", format!("{:?}", status));
tracing::info!(
"Decision for joining {} (due to invitation from {}) is {:?}",
room.room_id(),
room_member.sender.clone().as_str(),
status,
);
match status {
InvitationDecision::Join => {
tokio::spawn(async move {
if let Err(err) = self_ref.join_with_retries(&room, Some(MAX_JOIN_DELAY_SECONDS)).await {
tracing::error!(?err, "Failed to join room");
} else {
tracing::info!("Accepted invitation and joined");
}
}.instrument(event_span));
}
InvitationDecision::Reject => {
tokio::spawn(async move {
let result = room.leave().await;
if let Err(err) = result {
tracing::error!(?err, "Failed to reject invitation");
} else {
tracing::info!("Rejected invitation and left");
}
}.instrument(event_span));
}
}
}
}
},
);
}
/// Register a callback to be called when a room has been joined.
///
/// The handler inspects every sync timeline event and invokes `callback` only for room
/// membership events that:
///
/// - have membership `join`,
/// - have our own user ID as state key,
/// - are not redacted, and
/// - carry a `prev_content` whose membership is something other than `join`.
///
/// Errors returned by `callback` are logged and otherwise ignored.
pub fn on_joined<F, Fut>(&self, callback: F)
where
    F: FnOnce(AnySyncTimelineEvent, Room) -> Fut + Send + 'static + Clone + Sync,
    Fut: std::future::Future<Output = Result<(), CallbackError>> + Send + 'static,
{
    let own_user_id = self.matrix_link.user_id().to_owned();

    self.matrix_link.client().add_event_handler(
        move |ev: AnySyncTimelineEvent, room: Room| async move {
            let event_span = tracing::error_span!(
                "on_joined",
                event_id = ev.event_id().as_str(),
                room_id = room.room_id().as_str(),
                sender_id = ev.sender().as_str()
            );

            {
                // The span guard lives in its own scope so that it is dropped before the
                // `.await` below.
                let _enter = event_span.enter();

                tracing::trace!(
                    "Sync timeline event handler (on_joined_room) for event: {:?}",
                    ev
                );

                // Match by reference: this handler runs for every timeline event, so cloning
                // the whole event just to inspect it would be wasteful.
                let AnySyncTimelineEvent::State(AnySyncStateEvent::RoomMember(membership)) = &ev
                else {
                    tracing::trace!("Ignoring non-state/non-membership event");
                    return;
                };

                match membership.membership() {
                    MembershipState::Join => {}
                    event_type => {
                        tracing::debug!(?event_type, "Ignoring non-join membership event");
                        return;
                    }
                }

                if membership.state_key() != own_user_id.as_str() {
                    tracing::debug!(
                        state_key = membership.state_key().as_str(),
                        "Ignoring join for another user"
                    );
                    return;
                }

                // We wish to ignore events that are a result of the bot's display name changing.
                // When that happens, the event's content still looks like a join event:
                // > "content": {"displayname": "some_display_name", "membership": "join"}
                //
                // The difference is that join events that are a result of an invitation have a `prev_content` field like this:
                // > "prev_content": {"displayname": "some_display_name", "membership": "invite"}
                //
                // Join events that are a result of a display name change have a `prev_content` field like this:
                // > "prev_content": {"displayname": "some_display_name", "membership": "join"}
                //
                // That is.. it's only an actual join event if the `membership` field in `prev_content` was not "join" already.
                let Some(original) = membership.as_original() else {
                    tracing::debug!("Ignoring redacted join event");
                    return;
                };

                let Some(prev_content) = original.prev_content() else {
                    tracing::debug!("Ignoring join event without prev_content");
                    return;
                };

                if matches!(prev_content.membership, MembershipState::Join) {
                    tracing::debug!("Ignoring join event that supersedes another join event (likely a displayname/avatar change, etc.)");
                    return;
                }
            }

            if let Err(err) = callback(ev, room).instrument(event_span.clone()).await {
                // Log inside the span so the error carries the event/room/sender fields.
                event_span.in_scope(|| {
                    tracing::error!(?err, "Error in callback");
                });
            }
        },
    );
}
/// Register a callback to be called when we've determined to be the last member in the room.
/// When this happens, you usually may wish to clean up and leave the room.
pub fn on_being_last_member<F, Fut>(&self, callback: F)
where
F: FnOnce(AnySyncTimelineEvent, Room) -> Fut + Send + 'static + Clone + Sync,
Fut: std::future::Future<Output = Result<(), CallbackError>> + Send + 'static,
{
let own_user_id = self.matrix_link.user_id().to_owned();
self.matrix_link.client().add_event_handler(
move |ev: AnySyncTimelineEvent, room: Room| async move {
let event_span = tracing::error_span!(
"on_being_last_member",
room_id = room.room_id().as_str(),
sender_id = ev.sender().as_str(),
);
{
let _enter = event_span.enter();
tracing::trace!(
"Sync timeline event handler (on_being_last_member_in_room) for event: {:?}",
ev
);
let membership =
if let AnySyncTimelineEvent::State(AnySyncStateEvent::RoomMember(membership)) =
ev.clone()
{
membership
} else {
tracing::trace!("Ignoring non-state/non-membership event");
return;
};
match membership.membership() {
MembershipState::Leave | MembershipState::Ban => {}
_ => {
tracing::debug!("Ignoring non-leave/ban membership event");
return;
}
}
if membership.sender() == own_user_id {
tracing::debug!("Ignoring leave/ban initiated by us");
return;
}
if membership.state_key() == own_user_id.as_str() {
tracing::debug!("Ignoring leave/ban targeting us");
return;
}
}
// RoomMemberships::ACTIVE is another possibility (which includes invited members),
// but we don't care if someone is invited and may possibly join later (or not).
// If we're the only actually-active member right now, it sounds like it's time to leave.
match room.members(RoomMemberships::JOIN).instrument(event_span.clone()).await {
Ok(members) => {
{
let _enter = event_span.enter();
tracing::info!(
count = members.len(),
"Determined room members count",
);
if members.len() != 1 {
// It's more than just us, so we shouldn't leave.
return;
}
}
tokio::spawn(async move {
if let Err(err) = callback(ev, room).await {
tracing::error!(?err, "Error in callback");
}
});
}
Err(err) => {
let _enter = event_span.enter();
tracing::error!(?err, "Failed to get members");
}
}
},
);
}
}