matrix_sdk/send_queue/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A send queue facility to serializing queuing and sending of messages.
16//!
17//! # [`Room`] send queue
18//!
19//! Each room gets its own [`RoomSendQueue`], that's available by calling
20//! [`Room::send_queue()`]. The first time this method is called, it will spawn
21//! a background task that's used to actually send events, in the order they
22//! were passed from calls to [`RoomSendQueue::send()`].
23//!
24//! This queue tries to simplify error management around sending events, using
25//! [`RoomSendQueue::send`] or [`RoomSendQueue::send_raw`]: by default, it will retry to send the
26//! same event a few times, before automatically disabling itself, and emitting
27//! a notification that can be listened to with the global send queue (see
28//! paragraph below) or using [`RoomSendQueue::subscribe()`].
29//!
30//! It is possible to control whether a single room is enabled using
31//! [`RoomSendQueue::set_enabled()`].
32//!
33//! # Global [`SendQueue`] object
34//!
35//! The [`Client::send_queue()`] method returns an API object allowing to
36//! control all the room send queues:
37//!
38//! - enable/disable them all at once with [`SendQueue::set_enabled()`].
39//! - get notifications about send errors with [`SendQueue::subscribe_errors`].
40//! - reload all unsent events that had been persisted in storage using
41//!   [`SendQueue::respawn_tasks_for_rooms_with_unsent_requests()`]. It is
42//!   recommended to call this method during initialization of a client,
43//!   otherwise persisted unsent events will only be re-sent after the send
44//!   queue for the given room has been reopened for the first time.
45//!
46//! # Send handle
47//!
48//! Just after queuing a request to send something, a [`SendHandle`] is
49//! returned, allowing manipulating the inflight request.
50//!
51//! For a send handle for an event, it's possible to edit the event / abort
52//! sending it. If it was still in the queue (i.e. not sent yet, or not being
53//! sent), then such an action would happen locally (i.e. in the database).
54//! Otherwise, it is "too late": the background task may be sending
55//! the event already, or has sent it; in that case, the edit/aborting must
56//! happen as an actual event materializing this, on the server. To accomplish
57//! this, the send queue may send such an event, using the dependency system
58//! described below.
59//!
60//! # Dependency system
61//!
62//! The send queue includes a simple dependency system, where a
63//! [`QueuedRequest`] can have zero or more dependents in the form of
64//! [`DependentQueuedRequest`]. A dependent queued request can have at most one
65//! depended-upon (parent) queued request.
66//!
67//! This allows implementing deferred edits/redacts, as hinted to in the
68//! previous section.
69//!
70//! ## Media upload
71//!
72//! This dependency system also allows uploading medias, since the media's
73//! *content* must be uploaded before we send the media *event* that describes
74//! it.
75//!
76//! In the simplest case, that is, a media file and its event must be sent (i.e.
77//! no thumbnails):
78//!
79//! - The file's content is immediately cached in the
80//!   [`matrix_sdk_base::event_cache::store::EventCacheStore`], using an MXC ID
81//!   that is temporary and designates a local URI without any possible doubt.
82//! - An initial media event is created and uses this temporary MXC ID, and
83//!   propagated as a local echo for an event.
84//! - A [`QueuedRequest`] is pushed to upload the file's media
85//!   ([`QueuedRequestKind::MediaUpload`]).
86//! - A [`DependentQueuedRequest`] is pushed to finish the upload
87//!   ([`DependentQueuedRequestKind::FinishUpload`]).
88//!
89//! What is expected to happen, if all goes well, is the following:
90//!
91//! - the media is uploaded to the media homeserver, which returns the final MXC
92//!   ID.
93//! - when marking the upload request as sent, the MXC ID is injected (as a
94//!   [`matrix_sdk_base::store::SentRequestKey`]) into the dependent request
95//!   [`DependentQueuedRequestKind::FinishUpload`] created in the last step
96//!   above.
97//! - next time the send queue handles dependent queries, it'll see this one is
98//!   ready to be sent, and it will transform it into an event queued request
99//!   ([`QueuedRequestKind::Event`]), with the event created in the local echo
100//!   before, updated with the MXC ID returned from the server.
101//! - this updated local echo is also propagated as an edit of the local echo to
102//!   observers, who get the final version with the final MXC IDs at this point
103//!   too.
104//! - then the event is sent normally, as any event sent with the send queue.
105//!
106//! When there is a thumbnail, things behave similarly, with some tweaks:
107//!
108//! - the thumbnail's content is also stored into the cache store immediately,
109//! - the thumbnail is sent first as an [`QueuedRequestKind::MediaUpload`]
110//!   request,
111//! - the file upload is pushed as a dependent request of kind
112//!   [`DependentQueuedRequestKind::UploadFileWithThumbnail`] (this variant
113//!   keeps the file's key used to look it up in the cache store).
114//! - the media event is then sent as a dependent request as described in the
115//!   previous section.
116//!
117//! What's expected to happen is thus the following:
118//!
119//! - After the thumbnail has been uploaded, the dependent query will retrieve
120//!   the final MXC ID returned by the homeserver for the thumbnail, and store
121//!   it into the [`QueuedRequestKind::MediaUpload`]'s `thumbnail_source` field,
122//!   allowing to remember the thumbnail MXC ID when it's time to finish the
123//!   upload later.
124//! - The dependent request is morphed into another
125//!   [`QueuedRequestKind::MediaUpload`], for the file itself.
126//!
127//! The rest of the process is then similar to that of uploading a file without
128//! a thumbnail. The only difference is that there's a thumbnail source (MXC ID)
129//! remembered and fixed up into the media event, just before sending it.
130
131use std::{
132    collections::{BTreeMap, HashMap},
133    str::FromStr as _,
134    sync::{
135        atomic::{AtomicBool, Ordering},
136        Arc, RwLock,
137    },
138};
139
140use as_variant::as_variant;
141use matrix_sdk_base::{
142    event_cache::store::EventCacheStoreError,
143    media::MediaRequestParameters,
144    store::{
145        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
146        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
147        SentMediaInfo, SentRequestKey, SerializableEventContent,
148    },
149    store_locks::LockStoreError,
150    RoomState, StoreError,
151};
152use matrix_sdk_common::executor::{spawn, JoinHandle};
153use mime::Mime;
154use ruma::{
155    events::{
156        reaction::ReactionEventContent,
157        relation::Annotation,
158        room::{
159            message::{FormattedBody, RoomMessageEventContent},
160            MediaSource,
161        },
162        AnyMessageLikeEventContent, EventContent as _, Mentions,
163    },
164    serde::Raw,
165    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
166};
167use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
168use tracing::{debug, error, info, instrument, trace, warn};
169
170#[cfg(feature = "e2e-encryption")]
171use crate::crypto::{OlmError, SessionRecipientCollectionError};
172use crate::{
173    client::WeakClient,
174    config::RequestConfig,
175    error::RetryKind,
176    room::{edit::EditedContent, WeakRoom},
177    Client, Media, Room,
178};
179
180mod upload;
181
182/// A client-wide send queue, for all the rooms known by a client.
183pub struct SendQueue {
184    client: Client,
185}
186
187#[cfg(not(tarpaulin_include))]
188impl std::fmt::Debug for SendQueue {
189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190        f.debug_struct("SendQueue").finish_non_exhaustive()
191    }
192}
193
194impl SendQueue {
195    pub(super) fn new(client: Client) -> Self {
196        Self { client }
197    }
198
199    /// Reload all the rooms which had unsent requests, and respawn tasks for
200    /// those rooms.
201    pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
202        if !self.is_enabled() {
203            return;
204        }
205
206        let room_ids =
207            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
208                |err| {
209                    warn!("error when loading rooms with unsent requests: {err}");
210                    Vec::new()
211                },
212            );
213
214        // Getting the [`RoomSendQueue`] is sufficient to spawn the task if needs be.
215        for room_id in room_ids {
216            if let Some(room) = self.client.get_room(&room_id) {
217                let _ = self.for_room(room);
218            }
219        }
220    }
221
222    /// Tiny helper to get the send queue's global context from the [`Client`].
223    #[inline(always)]
224    fn data(&self) -> &SendQueueData {
225        &self.client.inner.send_queue_data
226    }
227
228    /// Get or create a new send queue for a given room, and insert it into our
229    /// memoized rooms mapping.
230    fn for_room(&self, room: Room) -> RoomSendQueue {
231        let data = self.data();
232
233        let mut map = data.rooms.write().unwrap();
234
235        let room_id = room.room_id();
236        if let Some(room_q) = map.get(room_id).cloned() {
237            return room_q;
238        }
239
240        let owned_room_id = room_id.to_owned();
241        let room_q = RoomSendQueue::new(
242            self.is_enabled(),
243            data.error_reporter.clone(),
244            data.is_dropping.clone(),
245            &self.client,
246            owned_room_id.clone(),
247        );
248
249        map.insert(owned_room_id, room_q.clone());
250
251        room_q
252    }
253
254    /// Enable or disable the send queue for the entire client, i.e. all rooms.
255    ///
256    /// If we're disabling the queue, and requests were being sent, they're not
257    /// aborted, and will continue until a status resolves (error responses
258    /// will keep the events in the buffer of events to send later). The
259    /// disablement will happen before the next request is sent.
260    ///
261    /// This may wake up background tasks and resume sending of requests in the
262    /// background.
263    pub async fn set_enabled(&self, enabled: bool) {
264        debug!(?enabled, "setting global send queue enablement");
265
266        self.data().globally_enabled.store(enabled, Ordering::SeqCst);
267
268        // Wake up individual rooms we already know about.
269        for room in self.data().rooms.read().unwrap().values() {
270            room.set_enabled(enabled);
271        }
272
273        // Reload some extra rooms that might not have been awaken yet, but could have
274        // requests from previous sessions.
275        self.respawn_tasks_for_rooms_with_unsent_requests().await;
276    }
277
278    /// Returns whether the send queue is enabled, at a client-wide
279    /// granularity.
280    pub fn is_enabled(&self) -> bool {
281        self.data().globally_enabled.load(Ordering::SeqCst)
282    }
283
284    /// A subscriber to the enablement status (enabled or disabled) of the
285    /// send queue, along with useful errors.
286    pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
287        self.data().error_reporter.subscribe()
288    }
289}
290
291/// A specific room's send queue ran into an error, and it has disabled itself.
292#[derive(Clone, Debug)]
293pub struct SendQueueRoomError {
294    /// For which room is the send queue failing?
295    pub room_id: OwnedRoomId,
296
297    /// The error the room has ran into, when trying to send a request.
298    pub error: Arc<crate::Error>,
299
300    /// Whether the error is considered recoverable or not.
301    ///
302    /// An error that's recoverable will disable the room's send queue, while an
303    /// unrecoverable error will be parked, until the user decides to do
304    /// something about it.
305    pub is_recoverable: bool,
306}
307
308impl Client {
309    /// Returns a [`SendQueue`] that handles sending, retrying and not
310    /// forgetting about requests that are to be sent.
311    pub fn send_queue(&self) -> SendQueue {
312        SendQueue::new(self.clone())
313    }
314}
315
316pub(super) struct SendQueueData {
317    /// Mapping of room to their unique send queue.
318    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
319
320    /// Is the whole mechanism enabled or disabled?
321    ///
322    /// This is only kept in memory to initialize new room queues with an
323    /// initial enablement state.
324    globally_enabled: AtomicBool,
325
326    /// Global error updates for the send queue.
327    error_reporter: broadcast::Sender<SendQueueRoomError>,
328
329    /// Are we currently dropping the Client?
330    is_dropping: Arc<AtomicBool>,
331}
332
333impl SendQueueData {
334    /// Create the data for a send queue, in the given enabled state.
335    pub fn new(globally_enabled: bool) -> Self {
336        let (sender, _) = broadcast::channel(32);
337
338        Self {
339            rooms: Default::default(),
340            globally_enabled: AtomicBool::new(globally_enabled),
341            error_reporter: sender,
342            is_dropping: Arc::new(false.into()),
343        }
344    }
345}
346
347impl Drop for SendQueueData {
348    fn drop(&mut self) {
349        // Mark the whole send queue as shutting down, then wake up all the room
350        // queues so they're stopped too.
351        debug!("globally dropping the send queue");
352        self.is_dropping.store(true, Ordering::SeqCst);
353
354        let rooms = self.rooms.read().unwrap();
355        for room in rooms.values() {
356            room.inner.notifier.notify_one();
357        }
358    }
359}
360
361impl Room {
362    /// Returns the [`RoomSendQueue`] for this specific room.
363    pub fn send_queue(&self) -> RoomSendQueue {
364        self.client.send_queue().for_room(self.clone())
365    }
366}
367
368/// A per-room send queue.
369///
370/// This is cheap to clone.
371#[derive(Clone)]
372pub struct RoomSendQueue {
373    inner: Arc<RoomSendQueueInner>,
374}
375
376#[cfg(not(tarpaulin_include))]
377impl std::fmt::Debug for RoomSendQueue {
378    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379        f.debug_struct("RoomSendQueue").finish_non_exhaustive()
380    }
381}
382
383impl RoomSendQueue {
384    fn new(
385        globally_enabled: bool,
386        global_error_reporter: broadcast::Sender<SendQueueRoomError>,
387        is_dropping: Arc<AtomicBool>,
388        client: &Client,
389        room_id: OwnedRoomId,
390    ) -> Self {
391        let (updates_sender, _) = broadcast::channel(32);
392
393        let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
394        let notifier = Arc::new(Notify::new());
395
396        let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
397        let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
398
399        let task = spawn(Self::sending_task(
400            weak_room.clone(),
401            queue.clone(),
402            notifier.clone(),
403            updates_sender.clone(),
404            locally_enabled.clone(),
405            global_error_reporter,
406            is_dropping,
407        ));
408
409        Self {
410            inner: Arc::new(RoomSendQueueInner {
411                room: weak_room,
412                updates: updates_sender,
413                _task: task,
414                queue,
415                notifier,
416                locally_enabled,
417            }),
418        }
419    }
420
421    /// Queues a raw event for sending it to this room.
422    ///
423    /// This immediately returns, and will push the event to be sent into a
424    /// queue, handled in the background.
425    ///
426    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
427    /// the [`Self::subscribe()`] method to get updates about the sending of
428    /// that event.
429    ///
430    /// By default, if sending failed on the first attempt, it will be retried a
431    /// few times. If sending failed after those retries, the entire
432    /// client's sending queue will be disabled, and it will need to be
433    /// manually re-enabled by the caller (e.g. after network is back, or when
434    /// something has been done about the faulty requests).
435    pub async fn send_raw(
436        &self,
437        content: Raw<AnyMessageLikeEventContent>,
438        event_type: String,
439    ) -> Result<SendHandle, RoomSendQueueError> {
440        let Some(room) = self.inner.room.get() else {
441            return Err(RoomSendQueueError::RoomDisappeared);
442        };
443        if room.state() != RoomState::Joined {
444            return Err(RoomSendQueueError::RoomNotJoined);
445        }
446
447        let content = SerializableEventContent::from_raw(content, event_type);
448
449        let created_at = MilliSecondsSinceUnixEpoch::now();
450        let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
451        trace!(%transaction_id, "manager sends a raw event to the background task");
452
453        self.inner.notifier.notify_one();
454
455        let send_handle = SendHandle {
456            room: self.clone(),
457            transaction_id: transaction_id.clone(),
458            media_handles: vec![],
459            created_at,
460        };
461
462        let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
463            transaction_id,
464            content: LocalEchoContent::Event {
465                serialized_event: content,
466                send_handle: send_handle.clone(),
467                send_error: None,
468            },
469        }));
470
471        Ok(send_handle)
472    }
473
474    /// Queues an event for sending it to this room.
475    ///
476    /// This immediately returns, and will push the event to be sent into a
477    /// queue, handled in the background.
478    ///
479    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
480    /// the [`Self::subscribe()`] method to get updates about the sending of
481    /// that event.
482    ///
483    /// By default, if sending failed on the first attempt, it will be retried a
484    /// few times. If sending failed after those retries, the entire
485    /// client's sending queue will be disabled, and it will need to be
486    /// manually re-enabled by the caller (e.g. after network is back, or when
487    /// something has been done about the faulty requests).
488    pub async fn send(
489        &self,
490        content: AnyMessageLikeEventContent,
491    ) -> Result<SendHandle, RoomSendQueueError> {
492        self.send_raw(
493            Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
494            content.event_type().to_string(),
495        )
496        .await
497    }
498
499    /// Returns the current local requests as well as a receiver to listen to
500    /// the send queue updates, as defined in [`RoomSendQueueUpdate`].
501    pub async fn subscribe(
502        &self,
503    ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
504    {
505        let local_echoes = self.inner.queue.local_echoes(self).await?;
506
507        Ok((local_echoes, self.inner.updates.subscribe()))
508    }
509
510    /// A task that must be spawned in the async runtime, running in the
511    /// background for each room that has a send queue.
512    ///
513    /// It only progresses forward: nothing can be cancelled at any point, which
514    /// makes the implementation not overly complicated to follow.
515    #[instrument(skip_all, fields(room_id = %room.room_id()))]
516    async fn sending_task(
517        room: WeakRoom,
518        queue: QueueStorage,
519        notifier: Arc<Notify>,
520        updates: broadcast::Sender<RoomSendQueueUpdate>,
521        locally_enabled: Arc<AtomicBool>,
522        global_error_reporter: broadcast::Sender<SendQueueRoomError>,
523        is_dropping: Arc<AtomicBool>,
524    ) {
525        trace!("spawned the sending task");
526
527        loop {
528            // A request to shut down should be preferred above everything else.
529            if is_dropping.load(Ordering::SeqCst) {
530                trace!("shutting down!");
531                break;
532            }
533
534            // Try to apply dependent requests now; those applying to previously failed
535            // attempts (local echoes) would succeed now.
536            let mut new_updates = Vec::new();
537            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
538                warn!("errors when applying dependent requests: {err}");
539            }
540
541            for up in new_updates {
542                let _ = updates.send(up);
543            }
544
545            if !locally_enabled.load(Ordering::SeqCst) {
546                trace!("not enabled, sleeping");
547                // Wait for an explicit wakeup.
548                notifier.notified().await;
549                continue;
550            }
551
552            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
553                Ok(Some(request)) => request,
554
555                Ok(None) => {
556                    trace!("queue is empty, sleeping");
557                    // Wait for an explicit wakeup.
558                    notifier.notified().await;
559                    continue;
560                }
561
562                Err(err) => {
563                    warn!("error when loading next request to send: {err}");
564                    continue;
565                }
566            };
567
568            let txn_id = queued_request.transaction_id.clone();
569            trace!(txn_id = %txn_id, "received a request to send!");
570
571            let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::MediaUpload { related_to, .. } => related_to.clone());
572
573            let Some(room) = room.get() else {
574                if is_dropping.load(Ordering::SeqCst) {
575                    break;
576                }
577                error!("the weak room couldn't be upgraded but we're not shutting down?");
578                continue;
579            };
580
581            match Self::handle_request(&room, queued_request, cancel_upload_rx).await {
582                Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
583                {
584                    Ok(()) => match parent_key {
585                        SentRequestKey::Event(event_id) => {
586                            let _ = updates.send(RoomSendQueueUpdate::SentEvent {
587                                transaction_id: txn_id,
588                                event_id,
589                            });
590                        }
591
592                        SentRequestKey::Media(media_info) => {
593                            let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
594                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
595                                file: media_info.file,
596                            });
597                        }
598                    },
599
600                    Err(err) => {
601                        warn!("unable to mark queued request as sent: {err}");
602                    }
603                },
604
605                Ok(None) => {
606                    debug!("Request has been aborted while running, continuing.");
607                }
608
609                Err(err) => {
610                    let is_recoverable = match err {
611                        crate::Error::Http(ref http_err) => {
612                            // All transient errors are recoverable.
613                            matches!(
614                                http_err.retry_kind(),
615                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
616                            )
617                        }
618
619                        // `ConcurrentRequestFailed` typically happens because of an HTTP failure;
620                        // since we don't get the underlying error, be lax and consider it
621                        // recoverable, and let observers decide to retry it or not. At some point
622                        // we'll get the actual underlying error.
623                        crate::Error::ConcurrentRequestFailed => true,
624
625                        // As of 2024-06-27, all other error types are considered unrecoverable.
626                        _ => false,
627                    };
628
629                    // Disable the queue for this room after any kind of error happened.
630                    locally_enabled.store(false, Ordering::SeqCst);
631
632                    if is_recoverable {
633                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
634
635                        // In this case, we intentionally keep the request in the queue, but mark it
636                        // as not being sent anymore.
637                        queue.mark_as_not_being_sent(&txn_id).await;
638
639                        // Let observers know about a failure *after* we've
640                        // marked the item as not being sent anymore. Otherwise,
641                        // there's a possible race where a caller might try to
642                        // remove an item, while it's still marked as being
643                        // sent, resulting in a cancellation failure.
644                    } else {
645                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
646
647                        // Mark the request as wedged, so it's not picked at any future point.
648                        if let Err(storage_error) =
649                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
650                        {
651                            warn!("unable to mark request as wedged: {storage_error}");
652                        }
653                    }
654
655                    let error = Arc::new(err);
656
657                    let _ = global_error_reporter.send(SendQueueRoomError {
658                        room_id: room.room_id().to_owned(),
659                        error: error.clone(),
660                        is_recoverable,
661                    });
662
663                    let _ = updates.send(RoomSendQueueUpdate::SendError {
664                        transaction_id: related_txn_id.unwrap_or(txn_id),
665                        error,
666                        is_recoverable,
667                    });
668                }
669            }
670        }
671
672        info!("exited sending task");
673    }
674
675    /// Handles a single request and returns the [`SentRequestKey`] on success
676    /// (unless the request was cancelled, in which case it'll return
677    /// `None`).
678    async fn handle_request(
679        room: &Room,
680        request: QueuedRequest,
681        cancel_upload_rx: Option<oneshot::Receiver<()>>,
682    ) -> Result<Option<SentRequestKey>, crate::Error> {
683        match request.kind {
684            QueuedRequestKind::Event { content } => {
685                let (event, event_type) = content.raw();
686
687                let res = room
688                    .send_raw(event_type, event)
689                    .with_transaction_id(&request.transaction_id)
690                    .with_request_config(RequestConfig::short_retry())
691                    .await?;
692
693                trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
694                Ok(Some(SentRequestKey::Event(res.event_id)))
695            }
696
697            QueuedRequestKind::MediaUpload {
698                content_type,
699                cache_key,
700                thumbnail_source,
701                related_to: relates_to,
702            } => {
703                trace!(%relates_to, "uploading media related to event");
704
705                let fut = async move {
706                    let mime = Mime::from_str(&content_type).map_err(|_| {
707                        crate::Error::SendQueueWedgeError(Box::new(
708                            QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
709                        ))
710                    })?;
711
712                    let data = room
713                        .client()
714                        .event_cache_store()
715                        .lock()
716                        .await?
717                        .get_media_content(&cache_key)
718                        .await?
719                        .ok_or(crate::Error::SendQueueWedgeError(Box::new(
720                            QueueWedgeError::MissingMediaContent,
721                        )))?;
722
723                    #[cfg(feature = "e2e-encryption")]
724                    let media_source = if room.latest_encryption_state().await?.is_encrypted() {
725                        trace!("upload will be encrypted (encrypted room)");
726                        let mut cursor = std::io::Cursor::new(data);
727                        let encrypted_file = room
728                            .client()
729                            .upload_encrypted_file(&mime, &mut cursor)
730                            .with_request_config(RequestConfig::short_retry())
731                            .await?;
732                        MediaSource::Encrypted(Box::new(encrypted_file))
733                    } else {
734                        trace!("upload will be in clear text (room without encryption)");
735                        let request_config = RequestConfig::short_retry()
736                            .timeout(Media::reasonable_upload_timeout(&data));
737                        let res =
738                            room.client().media().upload(&mime, data, Some(request_config)).await?;
739                        MediaSource::Plain(res.content_uri)
740                    };
741
742                    #[cfg(not(feature = "e2e-encryption"))]
743                    let media_source = {
744                        let request_config = RequestConfig::short_retry()
745                            .timeout(Media::reasonable_upload_timeout(&data));
746                        let res =
747                            room.client().media().upload(&mime, data, Some(request_config)).await?;
748                        MediaSource::Plain(res.content_uri)
749                    };
750
751                    let uri = match &media_source {
752                        MediaSource::Plain(uri) => uri,
753                        MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
754                    };
755                    trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
756
757                    Ok(SentRequestKey::Media(SentMediaInfo {
758                        file: media_source,
759                        thumbnail: thumbnail_source,
760                    }))
761                };
762
763                let wait_for_cancel = async move {
764                    if let Some(rx) = cancel_upload_rx {
765                        rx.await
766                    } else {
767                        std::future::pending().await
768                    }
769                };
770
771                tokio::select! {
772                    biased;
773
774                    _ = wait_for_cancel => {
775                        Ok(None)
776                    }
777
778                    res = fut => {
779                        res.map(Some)
780                    }
781                }
782            }
783        }
784    }
785
786    /// Returns whether the room is enabled, at the room level.
787    pub fn is_enabled(&self) -> bool {
788        self.inner.locally_enabled.load(Ordering::SeqCst)
789    }
790
791    /// Set the locally enabled flag for this room queue.
792    pub fn set_enabled(&self, enabled: bool) {
793        self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
794
795        // No need to wake a task to tell it it's been disabled, so only notify if we're
796        // re-enabling the queue.
797        if enabled {
798            self.inner.notifier.notify_one();
799        }
800    }
801}
802
803impl From<&crate::Error> for QueueWedgeError {
804    fn from(value: &crate::Error) -> Self {
805        match value {
806            #[cfg(feature = "e2e-encryption")]
807            crate::Error::OlmError(error) => match &**error {
808                OlmError::SessionRecipientCollectionError(error) => match error {
809                    SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
810                        QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
811                    }
812
813                    SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
814                        QueueWedgeError::IdentityViolations { users: users.clone() }
815                    }
816
817                    SessionRecipientCollectionError::CrossSigningNotSetup
818                    | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
819                        QueueWedgeError::CrossVerificationRequired
820                    }
821                },
822                _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
823            },
824
825            // Flatten errors of `Self` type.
826            crate::Error::SendQueueWedgeError(error) => *error.clone(),
827
828            _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
829        }
830    }
831}
832
833struct RoomSendQueueInner {
834    /// The room which this send queue relates to.
835    room: WeakRoom,
836
837    /// Broadcaster for notifications about the statuses of requests to be sent.
838    ///
839    /// Can be subscribed to from the outside.
840    updates: broadcast::Sender<RoomSendQueueUpdate>,
841
842    /// Queue of requests that are either to be sent, or being sent.
843    ///
844    /// When a request has been sent to the server, it is removed from that
845    /// queue *after* being sent. That way, we will retry sending upon
846    /// failure, in the same order requests have been inserted in the first
847    /// place.
848    queue: QueueStorage,
849
850    /// A notifier that's updated any time common data is touched (stopped or
851    /// enabled statuses), or the associated room [`QueueStorage`].
852    notifier: Arc<Notify>,
853
854    /// Should the room process new requests or not (because e.g. it might be
855    /// running off the network)?
856    locally_enabled: Arc<AtomicBool>,
857
858    /// Handle to the actual sending task. Unused, but kept alive along this
859    /// data structure.
860    _task: JoinHandle<()>,
861}
862
863/// Information about a request being sent right this moment.
864struct BeingSentInfo {
865    /// Transaction id of the thing being sent.
866    transaction_id: OwnedTransactionId,
867
868    /// For an upload request, a trigger to cancel the upload before it
869    /// completes.
870    cancel_upload: Option<oneshot::Sender<()>>,
871}
872
873impl BeingSentInfo {
874    /// Aborts the upload, if a trigger is available.
875    ///
876    /// Consumes the object because the sender is a oneshot and will be consumed
877    /// upon sending.
878    fn cancel_upload(self) -> bool {
879        if let Some(cancel_upload) = self.cancel_upload {
880            let _ = cancel_upload.send(());
881            true
882        } else {
883            false
884        }
885    }
886}
887
888/// A specialized lock that guards both against the state store and the
889/// [`Self::being_sent`] data.
890#[derive(Clone)]
891struct StoreLock {
892    /// Reference to the client, to get access to the underlying store.
893    client: WeakClient,
894
895    /// The one queued request that is being sent at the moment, along with
896    /// associated data that can be useful to act upon it.
897    ///
898    /// Also used as the lock to access the state store.
899    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
900}
901
902impl StoreLock {
903    /// Gets a hold of the locked store and [`Self::being_sent`] pair.
904    async fn lock(&self) -> StoreLockGuard {
905        StoreLockGuard {
906            client: self.client.clone(),
907            being_sent: self.being_sent.clone().lock_owned().await,
908        }
909    }
910}
911
912/// A lock guard obtained through locking with [`StoreLock`].
913/// `being_sent` data.
914struct StoreLockGuard {
915    /// Reference to the client, to get access to the underlying store.
916    client: WeakClient,
917
918    /// The one queued request that is being sent at the moment, along with
919    /// associated data that can be useful to act upon it.
920    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
921}
922
923impl StoreLockGuard {
924    /// Get a client from the locked state, useful to get a handle on a store.
925    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
926        self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
927    }
928}
929
930#[derive(Clone)]
931struct QueueStorage {
932    /// A lock to make sure the state store is only accessed once at a time, to
933    /// make some store operations atomic.
934    store: StoreLock,
935
936    /// To which room is this storage related.
937    room_id: OwnedRoomId,
938}
939
940impl QueueStorage {
941    /// Default priority for a queued request.
942    const LOW_PRIORITY: usize = 0;
943
944    /// High priority for a queued request that must be handled before others.
945    const HIGH_PRIORITY: usize = 10;
946
947    /// Create a new queue for queuing requests to be sent later.
948    fn new(client: WeakClient, room: OwnedRoomId) -> Self {
949        Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } }
950    }
951
952    /// Push a new event to be sent in the queue, with a default priority of 0.
953    ///
954    /// Returns the transaction id chosen to identify the request.
955    async fn push(
956        &self,
957        request: QueuedRequestKind,
958        created_at: MilliSecondsSinceUnixEpoch,
959    ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
960        let transaction_id = TransactionId::new();
961
962        self.store
963            .lock()
964            .await
965            .client()?
966            .state_store()
967            .save_send_queue_request(
968                &self.room_id,
969                transaction_id.clone(),
970                created_at,
971                request,
972                Self::LOW_PRIORITY,
973            )
974            .await?;
975
976        Ok(transaction_id)
977    }
978
979    /// Peeks the next request to be sent, marking it as being sent.
980    ///
981    /// It is required to call [`Self::mark_as_sent`] after it's been
982    /// effectively sent.
983    async fn peek_next_to_send(
984        &self,
985    ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
986    {
987        let mut guard = self.store.lock().await;
988        let queued_requests =
989            guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
990
991        if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
992            let (cancel_upload_tx, cancel_upload_rx) =
993                if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
994                    let (tx, rx) = oneshot::channel();
995                    (Some(tx), Some(rx))
996                } else {
997                    Default::default()
998                };
999
1000            let prev = guard.being_sent.replace(BeingSentInfo {
1001                transaction_id: request.transaction_id.clone(),
1002                cancel_upload: cancel_upload_tx,
1003            });
1004
1005            if let Some(prev) = prev {
1006                error!(
1007                    prev_txn = ?prev.transaction_id,
1008                    "a previous request was still active while picking a new one"
1009                );
1010            }
1011
1012            Ok(Some((request.clone(), cancel_upload_rx)))
1013        } else {
1014            Ok(None)
1015        }
1016    }
1017
1018    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1019    /// with the given transaction id as not being sent anymore, so it can
1020    /// be removed from the queue later.
1021    async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1022        let was_being_sent = self.store.lock().await.being_sent.take();
1023
1024        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1025        if prev_txn != Some(transaction_id) {
1026            error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1027        }
1028    }
1029
1030    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1031    /// with the given transaction id as being wedged (and not being sent
1032    /// anymore), so it can be removed from the queue later.
1033    async fn mark_as_wedged(
1034        &self,
1035        transaction_id: &TransactionId,
1036        reason: QueueWedgeError,
1037    ) -> Result<(), RoomSendQueueStorageError> {
1038        // Keep the lock until we're done touching the storage.
1039        let mut guard = self.store.lock().await;
1040        let was_being_sent = guard.being_sent.take();
1041
1042        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1043        if prev_txn != Some(transaction_id) {
1044            error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after permanent error)");
1045        }
1046
1047        Ok(guard
1048            .client()?
1049            .state_store()
1050            .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1051            .await?)
1052    }
1053
1054    /// Marks a request identified with the given transaction id as being now
1055    /// unwedged and adds it back to the queue.
1056    async fn mark_as_unwedged(
1057        &self,
1058        transaction_id: &TransactionId,
1059    ) -> Result<(), RoomSendQueueStorageError> {
1060        Ok(self
1061            .store
1062            .lock()
1063            .await
1064            .client()?
1065            .state_store()
1066            .update_send_queue_request_status(&self.room_id, transaction_id, None)
1067            .await?)
1068    }
1069
1070    /// Marks a request pushed with [`Self::push`] and identified with the given
1071    /// transaction id as sent, by removing it from the local queue.
1072    async fn mark_as_sent(
1073        &self,
1074        transaction_id: &TransactionId,
1075        parent_key: SentRequestKey,
1076    ) -> Result<(), RoomSendQueueStorageError> {
1077        // Keep the lock until we're done touching the storage.
1078        let mut guard = self.store.lock().await;
1079        let was_being_sent = guard.being_sent.take();
1080
1081        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1082        if prev_txn != Some(transaction_id) {
1083            error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after successful send");
1084        }
1085
1086        let client = guard.client()?;
1087        let store = client.state_store();
1088
1089        // Update all dependent requests.
1090        store
1091            .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1092            .await?;
1093
1094        let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1095
1096        if !removed {
1097            warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1098        }
1099
1100        Ok(())
1101    }
1102
1103    /// Cancel a sending command for an event that has been sent with
1104    /// [`Self::push`] with the given transaction id.
1105    ///
1106    /// Returns whether the given transaction has been effectively removed. If
1107    /// false, this either means that the transaction id was unrelated to
1108    /// this queue, or that the request was sent before we cancelled it.
1109    async fn cancel_event(
1110        &self,
1111        transaction_id: &TransactionId,
1112    ) -> Result<bool, RoomSendQueueStorageError> {
1113        let guard = self.store.lock().await;
1114
1115        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1116            == Some(transaction_id)
1117        {
1118            // Save the intent to redact the event.
1119            guard
1120                .client()?
1121                .state_store()
1122                .save_dependent_queued_request(
1123                    &self.room_id,
1124                    transaction_id,
1125                    ChildTransactionId::new(),
1126                    MilliSecondsSinceUnixEpoch::now(),
1127                    DependentQueuedRequestKind::RedactEvent,
1128                )
1129                .await?;
1130
1131            return Ok(true);
1132        }
1133
1134        let removed = guard
1135            .client()?
1136            .state_store()
1137            .remove_send_queue_request(&self.room_id, transaction_id)
1138            .await?;
1139
1140        Ok(removed)
1141    }
1142
1143    /// Replace an event that has been sent with [`Self::push`] with the given
1144    /// transaction id, before it's been actually sent.
1145    ///
1146    /// Returns whether the given transaction has been effectively edited. If
1147    /// false, this either means that the transaction id was unrelated to
1148    /// this queue, or that the request was sent before we edited it.
1149    async fn replace_event(
1150        &self,
1151        transaction_id: &TransactionId,
1152        serializable: SerializableEventContent,
1153    ) -> Result<bool, RoomSendQueueStorageError> {
1154        let guard = self.store.lock().await;
1155
1156        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1157            == Some(transaction_id)
1158        {
1159            // Save the intent to edit the associated event.
1160            guard
1161                .client()?
1162                .state_store()
1163                .save_dependent_queued_request(
1164                    &self.room_id,
1165                    transaction_id,
1166                    ChildTransactionId::new(),
1167                    MilliSecondsSinceUnixEpoch::now(),
1168                    DependentQueuedRequestKind::EditEvent { new_content: serializable },
1169                )
1170                .await?;
1171
1172            return Ok(true);
1173        }
1174
1175        let edited = guard
1176            .client()?
1177            .state_store()
1178            .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1179            .await?;
1180
1181        Ok(edited)
1182    }
1183
1184    /// Push requests (and dependents) to upload a media.
1185    ///
1186    /// See the module-level description for details of the whole processus.
1187    #[allow(clippy::too_many_arguments)]
1188    async fn push_media(
1189        &self,
1190        event: RoomMessageEventContent,
1191        content_type: Mime,
1192        send_event_txn: OwnedTransactionId,
1193        created_at: MilliSecondsSinceUnixEpoch,
1194        upload_file_txn: OwnedTransactionId,
1195        file_media_request: MediaRequestParameters,
1196        thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
1197    ) -> Result<(), RoomSendQueueStorageError> {
1198        let guard = self.store.lock().await;
1199        let client = guard.client()?;
1200        let store = client.state_store();
1201        let thumbnail_info =
1202            if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) =
1203                thumbnail
1204            {
1205                let upload_thumbnail_txn = thumbnail_info.txn.clone();
1206
1207                // Save the thumbnail upload request.
1208                store
1209                    .save_send_queue_request(
1210                        &self.room_id,
1211                        upload_thumbnail_txn.clone(),
1212                        created_at,
1213                        QueuedRequestKind::MediaUpload {
1214                            content_type: thumbnail_content_type.to_string(),
1215                            cache_key: thumbnail_media_request,
1216                            thumbnail_source: None, // the thumbnail has no thumbnails :)
1217                            related_to: send_event_txn.clone(),
1218                        },
1219                        Self::LOW_PRIORITY,
1220                    )
1221                    .await?;
1222
1223                // Save the file upload request as a dependent request of the thumbnail upload.
1224                store
1225                    .save_dependent_queued_request(
1226                        &self.room_id,
1227                        &upload_thumbnail_txn,
1228                        upload_file_txn.clone().into(),
1229                        created_at,
1230                        DependentQueuedRequestKind::UploadFileWithThumbnail {
1231                            content_type: content_type.to_string(),
1232                            cache_key: file_media_request,
1233                            related_to: send_event_txn.clone(),
1234                        },
1235                    )
1236                    .await?;
1237
1238                Some(thumbnail_info)
1239            } else {
1240                // Save the file upload as its own request, not a dependent one.
1241                store
1242                    .save_send_queue_request(
1243                        &self.room_id,
1244                        upload_file_txn.clone(),
1245                        created_at,
1246                        QueuedRequestKind::MediaUpload {
1247                            content_type: content_type.to_string(),
1248                            cache_key: file_media_request,
1249                            thumbnail_source: None,
1250                            related_to: send_event_txn.clone(),
1251                        },
1252                        Self::LOW_PRIORITY,
1253                    )
1254                    .await?;
1255
1256                None
1257            };
1258
1259        // Push the dependent request for the event itself.
1260        store
1261            .save_dependent_queued_request(
1262                &self.room_id,
1263                &upload_file_txn,
1264                send_event_txn.into(),
1265                created_at,
1266                DependentQueuedRequestKind::FinishUpload {
1267                    local_echo: Box::new(event),
1268                    file_upload: upload_file_txn.clone(),
1269                    thumbnail_info,
1270                },
1271            )
1272            .await?;
1273
1274        Ok(())
1275    }
1276
1277    /// Reacts to the given local echo of an event.
1278    #[instrument(skip(self))]
1279    async fn react(
1280        &self,
1281        transaction_id: &TransactionId,
1282        key: String,
1283        created_at: MilliSecondsSinceUnixEpoch,
1284    ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1285        let guard = self.store.lock().await;
1286        let client = guard.client()?;
1287        let store = client.state_store();
1288
1289        let requests = store.load_send_queue_requests(&self.room_id).await?;
1290
1291        // If the target event has been already sent, abort immediately.
1292        if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1293            // We didn't find it as a queued request; try to find it as a dependent queued
1294            // request.
1295            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1296            if !dependent_requests
1297                .into_iter()
1298                .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1299                .any(|child_txn| *child_txn == *transaction_id)
1300            {
1301                // We didn't find it as either a request or a dependent request, abort.
1302                return Ok(None);
1303            }
1304        }
1305
1306        // Record the dependent request.
1307        let reaction_txn_id = ChildTransactionId::new();
1308        store
1309            .save_dependent_queued_request(
1310                &self.room_id,
1311                transaction_id,
1312                reaction_txn_id.clone(),
1313                created_at,
1314                DependentQueuedRequestKind::ReactEvent { key },
1315            )
1316            .await?;
1317
1318        Ok(Some(reaction_txn_id))
1319    }
1320
1321    /// Returns a list of the local echoes, that is, all the requests that we're
1322    /// about to send but that haven't been sent yet (or are being sent).
1323    async fn local_echoes(
1324        &self,
1325        room: &RoomSendQueue,
1326    ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1327        let guard = self.store.lock().await;
1328        let client = guard.client()?;
1329        let store = client.state_store();
1330
1331        let local_requests =
1332            store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1333                Some(LocalEcho {
1334                    transaction_id: queued.transaction_id.clone(),
1335                    content: match queued.kind {
1336                        QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1337                            serialized_event: content,
1338                            send_handle: SendHandle {
1339                                room: room.clone(),
1340                                transaction_id: queued.transaction_id,
1341                                media_handles: vec![],
1342                                created_at: queued.created_at,
1343                            },
1344                            send_error: queued.error,
1345                        },
1346
1347                        QueuedRequestKind::MediaUpload { .. } => {
1348                            // Don't return uploaded medias as their own things; the accompanying
1349                            // event represented as a dependent request should be sufficient.
1350                            return None;
1351                        }
1352                    },
1353                })
1354            });
1355
1356        let reactions_and_medias = store
1357            .load_dependent_queued_requests(&self.room_id)
1358            .await?
1359            .into_iter()
1360            .filter_map(|dep| match dep.kind {
1361                DependentQueuedRequestKind::EditEvent { .. }
1362                | DependentQueuedRequestKind::RedactEvent => {
1363                    // TODO: reflect local edits/redacts too?
1364                    None
1365                }
1366
1367                DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1368                    transaction_id: dep.own_transaction_id.clone().into(),
1369                    content: LocalEchoContent::React {
1370                        key,
1371                        send_handle: SendReactionHandle {
1372                            room: room.clone(),
1373                            transaction_id: dep.own_transaction_id,
1374                        },
1375                        applies_to: dep.parent_transaction_id,
1376                    },
1377                }),
1378
1379                DependentQueuedRequestKind::UploadFileWithThumbnail { .. } => {
1380                    // Don't reflect these: only the associated event is interesting to observers.
1381                    None
1382                }
1383
1384                DependentQueuedRequestKind::FinishUpload {
1385                    local_echo,
1386                    file_upload,
1387                    thumbnail_info,
1388                } => {
1389                    // Materialize as an event local echo.
1390                    Some(LocalEcho {
1391                        transaction_id: dep.own_transaction_id.clone().into(),
1392                        content: LocalEchoContent::Event {
1393                            serialized_event: SerializableEventContent::new(&(*local_echo).into())
1394                                .ok()?,
1395                            send_handle: SendHandle {
1396                                room: room.clone(),
1397                                transaction_id: dep.own_transaction_id.into(),
1398                                media_handles: vec![MediaHandles {
1399                                    upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1400                                    upload_file_txn: file_upload,
1401                                }],
1402                                created_at: dep.created_at,
1403                            },
1404                            send_error: None,
1405                        },
1406                    })
1407                }
1408            });
1409
1410        Ok(local_requests.chain(reactions_and_medias).collect())
1411    }
1412
1413    /// Try to apply a single dependent request, whether it's local or remote.
1414    ///
1415    /// This swallows errors that would retrigger every time if we retried
1416    /// applying the dependent request: invalid edit content, etc.
1417    ///
1418    /// Returns true if the dependent request has been sent (or should not be
1419    /// retried later).
1420    #[instrument(skip_all)]
1421    async fn try_apply_single_dependent_request(
1422        &self,
1423        client: &Client,
1424        dependent_request: DependentQueuedRequest,
1425        new_updates: &mut Vec<RoomSendQueueUpdate>,
1426    ) -> Result<bool, RoomSendQueueError> {
1427        let store = client.state_store();
1428
1429        let parent_key = dependent_request.parent_key;
1430
1431        match dependent_request.kind {
1432            DependentQueuedRequestKind::EditEvent { new_content } => {
1433                if let Some(parent_key) = parent_key {
1434                    let Some(event_id) = parent_key.into_event_id() else {
1435                        return Err(RoomSendQueueError::StorageError(
1436                            RoomSendQueueStorageError::InvalidParentKey,
1437                        ));
1438                    };
1439
1440                    // The parent event has been sent, so send an edit event.
1441                    let room = client
1442                        .get_room(&self.room_id)
1443                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
1444
1445                    // Check the event is one we know how to edit with an edit event.
1446
1447                    // It must be deserializable…
1448                    let edited_content = match new_content.deserialize() {
1449                        Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
1450                            // Assume no relationships.
1451                            EditedContent::RoomMessage(c.into())
1452                        }
1453
1454                        Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
1455                            let poll_start = c.poll_start().clone();
1456                            EditedContent::PollStart {
1457                                fallback_text: poll_start.question.text.clone(),
1458                                new_content: poll_start,
1459                            }
1460                        }
1461
1462                        Ok(c) => {
1463                            warn!("Unsupported edit content type: {:?}", c.event_type());
1464                            return Ok(true);
1465                        }
1466
1467                        Err(err) => {
1468                            warn!("Unable to deserialize: {err}");
1469                            return Ok(true);
1470                        }
1471                    };
1472
1473                    let edit_event = match room.make_edit_event(&event_id, edited_content).await {
1474                        Ok(e) => e,
1475                        Err(err) => {
1476                            warn!("couldn't create edited event: {err}");
1477                            return Ok(true);
1478                        }
1479                    };
1480
1481                    // Queue the edit event in the send queue 🧠.
1482                    let serializable = SerializableEventContent::from_raw(
1483                        Raw::new(&edit_event)
1484                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1485                        edit_event.event_type().to_string(),
1486                    );
1487
1488                    store
1489                        .save_send_queue_request(
1490                            &self.room_id,
1491                            dependent_request.own_transaction_id.into(),
1492                            dependent_request.created_at,
1493                            serializable.into(),
1494                            Self::HIGH_PRIORITY,
1495                        )
1496                        .await
1497                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1498                } else {
1499                    // The parent event is still local; update the local echo.
1500                    let edited = store
1501                        .update_send_queue_request(
1502                            &self.room_id,
1503                            &dependent_request.parent_transaction_id,
1504                            new_content.into(),
1505                        )
1506                        .await
1507                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1508
1509                    if !edited {
1510                        warn!("missing local echo upon dependent edit");
1511                    }
1512                }
1513            }
1514
1515            DependentQueuedRequestKind::RedactEvent => {
1516                if let Some(parent_key) = parent_key {
1517                    let Some(event_id) = parent_key.into_event_id() else {
1518                        return Err(RoomSendQueueError::StorageError(
1519                            RoomSendQueueStorageError::InvalidParentKey,
1520                        ));
1521                    };
1522
1523                    // The parent event has been sent; send a redaction.
1524                    let room = client
1525                        .get_room(&self.room_id)
1526                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
1527
1528                    // Ideally we'd use the send queue to send the redaction, but the protocol has
1529                    // changed the shape of a room.redaction after v11, so keep it simple and try
1530                    // once here.
1531
1532                    // Note: no reason is provided because we materialize the intent of "cancel
1533                    // sending the parent event".
1534
1535                    if let Err(err) = room
1536                        .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
1537                        .await
1538                    {
1539                        warn!("error when sending a redact for {event_id}: {err}");
1540                        return Ok(false);
1541                    }
1542                } else {
1543                    // The parent event is still local (sending must have failed); redact the local
1544                    // echo.
1545                    let removed = store
1546                        .remove_send_queue_request(
1547                            &self.room_id,
1548                            &dependent_request.parent_transaction_id,
1549                        )
1550                        .await
1551                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1552
1553                    if !removed {
1554                        warn!("missing local echo upon dependent redact");
1555                    }
1556                }
1557            }
1558
1559            DependentQueuedRequestKind::ReactEvent { key } => {
1560                if let Some(parent_key) = parent_key {
1561                    let Some(parent_event_id) = parent_key.into_event_id() else {
1562                        return Err(RoomSendQueueError::StorageError(
1563                            RoomSendQueueStorageError::InvalidParentKey,
1564                        ));
1565                    };
1566
1567                    // Queue the reaction event in the send queue 🧠.
1568                    let react_event =
1569                        ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
1570                    let serializable = SerializableEventContent::from_raw(
1571                        Raw::new(&react_event)
1572                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1573                        react_event.event_type().to_string(),
1574                    );
1575
1576                    store
1577                        .save_send_queue_request(
1578                            &self.room_id,
1579                            dependent_request.own_transaction_id.into(),
1580                            dependent_request.created_at,
1581                            serializable.into(),
1582                            Self::HIGH_PRIORITY,
1583                        )
1584                        .await
1585                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1586                } else {
1587                    // Not applied yet, we should retry later => false.
1588                    return Ok(false);
1589                }
1590            }
1591
1592            DependentQueuedRequestKind::UploadFileWithThumbnail {
1593                content_type,
1594                cache_key,
1595                related_to,
1596            } => {
1597                let Some(parent_key) = parent_key else {
1598                    // Not finished yet, we should retry later => false.
1599                    return Ok(false);
1600                };
1601                self.handle_dependent_file_upload_with_thumbnail(
1602                    client,
1603                    dependent_request.own_transaction_id.into(),
1604                    parent_key,
1605                    content_type,
1606                    cache_key,
1607                    related_to,
1608                )
1609                .await?;
1610            }
1611
1612            DependentQueuedRequestKind::FinishUpload {
1613                local_echo,
1614                file_upload,
1615                thumbnail_info,
1616            } => {
1617                let Some(parent_key) = parent_key else {
1618                    // Not finished yet, we should retry later => false.
1619                    return Ok(false);
1620                };
1621                self.handle_dependent_finish_upload(
1622                    client,
1623                    dependent_request.own_transaction_id.into(),
1624                    parent_key,
1625                    *local_echo,
1626                    file_upload,
1627                    thumbnail_info,
1628                    new_updates,
1629                )
1630                .await?;
1631            }
1632        }
1633
1634        Ok(true)
1635    }
1636
1637    #[instrument(skip(self))]
1638    async fn apply_dependent_requests(
1639        &self,
1640        new_updates: &mut Vec<RoomSendQueueUpdate>,
1641    ) -> Result<(), RoomSendQueueError> {
1642        let guard = self.store.lock().await;
1643
1644        let client = guard.client()?;
1645        let store = client.state_store();
1646
1647        let dependent_requests = store
1648            .load_dependent_queued_requests(&self.room_id)
1649            .await
1650            .map_err(RoomSendQueueStorageError::StateStoreError)?;
1651
1652        let num_initial_dependent_requests = dependent_requests.len();
1653        if num_initial_dependent_requests == 0 {
1654            // Returning early here avoids a bit of useless logging.
1655            return Ok(());
1656        }
1657
1658        let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
1659
1660        // Get rid of the all non-canonical dependent events.
1661        for original in &dependent_requests {
1662            if !canonicalized_dependent_requests
1663                .iter()
1664                .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
1665            {
1666                store
1667                    .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
1668                    .await
1669                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
1670            }
1671        }
1672
1673        let mut num_dependent_requests = canonicalized_dependent_requests.len();
1674
1675        debug!(
1676            num_dependent_requests,
1677            num_initial_dependent_requests, "starting handling of dependent requests"
1678        );
1679
1680        for dependent in canonicalized_dependent_requests {
1681            let dependent_id = dependent.own_transaction_id.clone();
1682
1683            match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
1684                Ok(should_remove) => {
1685                    if should_remove {
1686                        // The dependent request has been successfully applied, forget about it.
1687                        store
1688                            .remove_dependent_queued_request(&self.room_id, &dependent_id)
1689                            .await
1690                            .map_err(RoomSendQueueStorageError::StateStoreError)?;
1691
1692                        num_dependent_requests -= 1;
1693                    }
1694                }
1695
1696                Err(err) => {
1697                    warn!("error when applying single dependent request: {err}");
1698                }
1699            }
1700        }
1701
1702        debug!(
1703            leftover_dependent_requests = num_dependent_requests,
1704            "stopped handling dependent request"
1705        );
1706
1707        Ok(())
1708    }
1709
1710    /// Remove a single dependent request from storage.
1711    async fn remove_dependent_send_queue_request(
1712        &self,
1713        dependent_event_id: &ChildTransactionId,
1714    ) -> Result<bool, RoomSendQueueStorageError> {
1715        Ok(self
1716            .store
1717            .lock()
1718            .await
1719            .client()?
1720            .state_store()
1721            .remove_dependent_queued_request(&self.room_id, dependent_event_id)
1722            .await?)
1723    }
1724}
1725
1726/// The content of a local echo.
1727#[derive(Clone, Debug)]
1728pub enum LocalEchoContent {
1729    /// The local echo contains an actual event ready to display.
1730    Event {
1731        /// Content of the event itself (along with its type) that we are about
1732        /// to send.
1733        serialized_event: SerializableEventContent,
1734        /// A handle to manipulate the sending of the associated event.
1735        send_handle: SendHandle,
1736        /// Whether trying to send this local echo failed in the past with an
1737        /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
1738        send_error: Option<QueueWedgeError>,
1739    },
1740
1741    /// A local echo has been reacted to.
1742    React {
1743        /// The key with which the local echo has been reacted to.
1744        key: String,
1745        /// A handle to manipulate the sending of the reaction.
1746        send_handle: SendReactionHandle,
1747        /// The local echo which has been reacted to.
1748        applies_to: OwnedTransactionId,
1749    },
1750}
1751
1752/// A local representation for a request that hasn't been sent yet to the user's
1753/// homeserver.
1754#[derive(Clone, Debug)]
1755pub struct LocalEcho {
1756    /// Transaction id used to identify the associated request.
1757    pub transaction_id: OwnedTransactionId,
1758    /// The content for the local echo.
1759    pub content: LocalEchoContent,
1760}
1761
1762/// An update to a room send queue, observable with
1763/// [`RoomSendQueue::subscribe`].
1764#[derive(Clone, Debug)]
1765pub enum RoomSendQueueUpdate {
1766    /// A new local event is being sent.
1767    ///
1768    /// There's been a user query to create this event. It is being sent to the
1769    /// server.
1770    NewLocalEvent(LocalEcho),
1771
1772    /// A local event that hadn't been sent to the server yet has been cancelled
1773    /// before sending.
1774    CancelledLocalEvent {
1775        /// Transaction id used to identify this event.
1776        transaction_id: OwnedTransactionId,
1777    },
1778
1779    /// A local event's content has been replaced with something else.
1780    ReplacedLocalEvent {
1781        /// Transaction id used to identify this event.
1782        transaction_id: OwnedTransactionId,
1783
1784        /// The new content replacing the previous one.
1785        new_content: SerializableEventContent,
1786    },
1787
1788    /// An error happened when an event was being sent.
1789    ///
1790    /// The event has not been removed from the queue. All the send queues
1791    /// will be disabled after this happens, and must be manually re-enabled.
1792    SendError {
1793        /// Transaction id used to identify this event.
1794        transaction_id: OwnedTransactionId,
1795        /// Error received while sending the event.
1796        error: Arc<crate::Error>,
1797        /// Whether the error is considered recoverable or not.
1798        ///
1799        /// An error that's recoverable will disable the room's send queue,
1800        /// while an unrecoverable error will be parked, until the user
1801        /// decides to cancel sending it.
1802        is_recoverable: bool,
1803    },
1804
1805    /// The event has been unwedged and sending is now being retried.
1806    RetryEvent {
1807        /// Transaction id used to identify this event.
1808        transaction_id: OwnedTransactionId,
1809    },
1810
1811    /// The event has been sent to the server, and the query returned
1812    /// successfully.
1813    SentEvent {
1814        /// Transaction id used to identify this event.
1815        transaction_id: OwnedTransactionId,
1816        /// Received event id from the send response.
1817        event_id: OwnedEventId,
1818    },
1819
1820    /// A media has been successfully uploaded.
1821    UploadedMedia {
1822        /// The media event this uploaded media relates to.
1823        related_to: OwnedTransactionId,
1824
1825        /// The final media source for the file that was just uploaded.
1826        file: MediaSource,
1827    },
1828}
1829
1830/// An error triggered by the send queue module.
1831#[derive(Debug, thiserror::Error)]
1832pub enum RoomSendQueueError {
1833    /// The room isn't in the joined state.
1834    #[error("the room isn't in the joined state")]
1835    RoomNotJoined,
1836
1837    /// The room is missing from the client.
1838    ///
1839    /// This happens only whenever the client is shutting down.
1840    #[error("the room is now missing from the client")]
1841    RoomDisappeared,
1842
1843    /// Error coming from storage.
1844    #[error(transparent)]
1845    StorageError(#[from] RoomSendQueueStorageError),
1846
1847    /// The attachment event failed to be created.
1848    #[error("the attachment event could not be created")]
1849    FailedToCreateAttachment,
1850}
1851
1852/// An error triggered by the send queue storage.
1853#[derive(Debug, thiserror::Error)]
1854pub enum RoomSendQueueStorageError {
1855    /// Error caused by the state store.
1856    #[error(transparent)]
1857    StateStoreError(#[from] StoreError),
1858
1859    /// Error caused by the event cache store.
1860    #[error(transparent)]
1861    EventCacheStoreError(#[from] EventCacheStoreError),
1862
1863    /// Error caused when attempting to get a handle on the event cache store.
1864    #[error(transparent)]
1865    LockError(#[from] LockStoreError),
1866
1867    /// Error caused when (de)serializing into/from json.
1868    #[error(transparent)]
1869    JsonSerialization(#[from] serde_json::Error),
1870
1871    /// A parent key was expected to be of a certain type, and it was another
1872    /// type instead.
1873    #[error("a dependent event had an invalid parent key type")]
1874    InvalidParentKey,
1875
1876    /// The client is shutting down.
1877    #[error("The client is shutting down.")]
1878    ClientShuttingDown,
1879
1880    /// An operation not implemented on a send handle.
1881    #[error("This operation is not implemented for media uploads")]
1882    OperationNotImplementedYet,
1883
1884    /// Trying to edit a media caption for something that's not a media.
1885    #[error("Can't edit a media caption when the underlying event isn't a media")]
1886    InvalidMediaCaptionEdit,
1887}
1888
1889/// Extra transaction IDs useful during an upload.
1890#[derive(Clone, Debug)]
1891struct MediaHandles {
1892    /// Transaction id used when uploading the thumbnail.
1893    ///
1894    /// Optional because a media can be uploaded without a thumbnail.
1895    upload_thumbnail_txn: Option<OwnedTransactionId>,
1896
1897    /// Transaction id used when uploading the media itself.
1898    upload_file_txn: OwnedTransactionId,
1899}
1900
1901/// A handle to manipulate an event that was scheduled to be sent to a room.
1902#[derive(Clone, Debug)]
1903pub struct SendHandle {
1904    /// Link to the send queue used to send this request.
1905    room: RoomSendQueue,
1906
1907    /// Transaction id used for the sent request.
1908    ///
1909    /// If this is a media upload, this is the "main" transaction id, i.e. the
1910    /// one used to send the event, and that will be seen by observers.
1911    transaction_id: OwnedTransactionId,
1912
1913    /// Additional handles for a media upload.
1914    media_handles: Vec<MediaHandles>,
1915
1916    /// The time at which the event to be sent has been created.
1917    pub created_at: MilliSecondsSinceUnixEpoch,
1918}
1919
1920impl SendHandle {
1921    fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
1922        if !self.media_handles.is_empty() {
1923            Err(RoomSendQueueStorageError::OperationNotImplementedYet)
1924        } else {
1925            Ok(())
1926        }
1927    }
1928
1929    /// Aborts the sending of the event, if it wasn't sent yet.
1930    ///
1931    /// Returns true if the sending could be aborted, false if not (i.e. the
1932    /// event had already been sent).
1933    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
1934    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
1935        trace!("received an abort request");
1936
1937        let queue = &self.room.inner.queue;
1938
1939        for handles in &self.media_handles {
1940            if queue.abort_upload(&self.transaction_id, handles).await? {
1941                // Propagate a cancelled update.
1942                let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
1943                    transaction_id: self.transaction_id.clone(),
1944                });
1945
1946                return Ok(true);
1947            }
1948
1949            // If it failed, it means the sending of the event is not a
1950            // dependent request anymore. Fall back to the regular
1951            // code path below, that handles aborting sending of an event.
1952        }
1953
1954        if queue.cancel_event(&self.transaction_id).await? {
1955            trace!("successful abort");
1956
1957            // Propagate a cancelled update too.
1958            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
1959                transaction_id: self.transaction_id.clone(),
1960            });
1961
1962            Ok(true)
1963        } else {
1964            debug!("local echo didn't exist anymore, can't abort");
1965            Ok(false)
1966        }
1967    }
1968
1969    /// Edits the content of a local echo with a raw event content.
1970    ///
1971    /// Returns true if the event to be sent was replaced, false if not (i.e.
1972    /// the event had already been sent).
1973    #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
1974    pub async fn edit_raw(
1975        &self,
1976        new_content: Raw<AnyMessageLikeEventContent>,
1977        event_type: String,
1978    ) -> Result<bool, RoomSendQueueStorageError> {
1979        trace!("received an edit request");
1980        self.nyi_for_uploads()?;
1981
1982        let serializable = SerializableEventContent::from_raw(new_content, event_type);
1983
1984        if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
1985            trace!("successful edit");
1986
1987            // Wake up the queue, in case the room was asleep before the edit.
1988            self.room.inner.notifier.notify_one();
1989
1990            // Propagate a replaced update too.
1991            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
1992                transaction_id: self.transaction_id.clone(),
1993                new_content: serializable,
1994            });
1995
1996            Ok(true)
1997        } else {
1998            debug!("local echo doesn't exist anymore, can't edit");
1999            Ok(false)
2000        }
2001    }
2002
2003    /// Edits the content of a local echo with an event content.
2004    ///
2005    /// Returns true if the event to be sent was replaced, false if not (i.e.
2006    /// the event had already been sent).
2007    pub async fn edit(
2008        &self,
2009        new_content: AnyMessageLikeEventContent,
2010    ) -> Result<bool, RoomSendQueueStorageError> {
2011        self.edit_raw(
2012            Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2013            new_content.event_type().to_string(),
2014        )
2015        .await
2016    }
2017
2018    /// Edits the content of a local echo with a media caption.
2019    ///
2020    /// Will fail if the event to be sent, represented by this send handle,
2021    /// wasn't a media.
2022    pub async fn edit_media_caption(
2023        &self,
2024        caption: Option<String>,
2025        formatted_caption: Option<FormattedBody>,
2026        mentions: Option<Mentions>,
2027    ) -> Result<bool, RoomSendQueueStorageError> {
2028        if let Some(new_content) = self
2029            .room
2030            .inner
2031            .queue
2032            .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2033            .await?
2034        {
2035            trace!("successful edit of media caption");
2036
2037            // Wake up the queue, in case the room was asleep before the edit.
2038            self.room.inner.notifier.notify_one();
2039
2040            let new_content = SerializableEventContent::new(&new_content)
2041                .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2042
2043            // Propagate a replaced update too.
2044            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
2045                transaction_id: self.transaction_id.clone(),
2046                new_content,
2047            });
2048
2049            Ok(true)
2050        } else {
2051            debug!("local echo doesn't exist anymore, can't edit media caption");
2052            Ok(false)
2053        }
2054    }
2055
2056    /// Unwedge a local echo identified by its transaction identifier and try to
2057    /// resend it.
2058    pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2059        let room = &self.room.inner;
2060        room.queue
2061            .mark_as_unwedged(&self.transaction_id)
2062            .await
2063            .map_err(RoomSendQueueError::StorageError)?;
2064
2065        // If we have media handles, also try to unwedge them.
2066        //
2067        // It's fine to always do it to *all* the transaction IDs at once, because only
2068        // one of the three requests will be active at the same time, i.e. only
2069        // one entry will be updated in the store. The other two are either
2070        // done, or dependent requests.
2071
2072        for handles in &self.media_handles {
2073            room.queue
2074                .mark_as_unwedged(&handles.upload_file_txn)
2075                .await
2076                .map_err(RoomSendQueueError::StorageError)?;
2077
2078            if let Some(txn) = &handles.upload_thumbnail_txn {
2079                room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2080            }
2081        }
2082
2083        // Wake up the queue, in case the room was asleep before unwedging the request.
2084        room.notifier.notify_one();
2085
2086        let _ = room
2087            .updates
2088            .send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() });
2089
2090        Ok(())
2091    }
2092
2093    /// Send a reaction to the event as soon as it's sent.
2094    ///
2095    /// If returning `Ok(None)`; this means the reaction couldn't be sent
2096    /// because the event is already a remote one.
2097    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2098    pub async fn react(
2099        &self,
2100        key: String,
2101    ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2102        trace!("received an intent to react");
2103
2104        let created_at = MilliSecondsSinceUnixEpoch::now();
2105        if let Some(reaction_txn_id) =
2106            self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2107        {
2108            trace!("successfully queued react");
2109
2110            // Wake up the queue, in case the room was asleep before the sending.
2111            self.room.inner.notifier.notify_one();
2112
2113            // Propagate a new local event.
2114            let send_handle = SendReactionHandle {
2115                room: self.room.clone(),
2116                transaction_id: reaction_txn_id.clone(),
2117            };
2118
2119            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2120                // Note: we do want to use the txn_id we're going to use for the reaction, not the
2121                // one for the event we're reacting to.
2122                transaction_id: reaction_txn_id.into(),
2123                content: LocalEchoContent::React {
2124                    key,
2125                    send_handle: send_handle.clone(),
2126                    applies_to: self.transaction_id.clone(),
2127                },
2128            }));
2129
2130            Ok(Some(send_handle))
2131        } else {
2132            debug!("local echo doesn't exist anymore, can't react");
2133            Ok(None)
2134        }
2135    }
2136}
2137
2138/// A handle to execute actions on the sending of a reaction.
2139#[derive(Clone, Debug)]
2140pub struct SendReactionHandle {
2141    /// Reference to the send queue for the room where this reaction was sent.
2142    room: RoomSendQueue,
2143    /// The own transaction id for the reaction.
2144    transaction_id: ChildTransactionId,
2145}
2146
2147impl SendReactionHandle {
2148    /// Abort the sending of the reaction.
2149    ///
2150    /// Will return true if the reaction could be aborted, false if it's been
2151    /// sent (and there's no matching local echo anymore).
2152    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2153        if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2154            // Simple case: the reaction was found in the dependent event list.
2155
2156            // Propagate a cancelled update too.
2157            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2158                transaction_id: self.transaction_id.clone().into(),
2159            });
2160
2161            return Ok(true);
2162        }
2163
2164        // The reaction has already been queued for sending, try to abort it using a
2165        // regular abort.
2166        let handle = SendHandle {
2167            room: self.room.clone(),
2168            transaction_id: self.transaction_id.clone().into(),
2169            media_handles: vec![],
2170            created_at: MilliSecondsSinceUnixEpoch::now(),
2171        };
2172
2173        handle.abort().await
2174    }
2175
2176    /// The transaction id that will be used to send this reaction later.
2177    pub fn transaction_id(&self) -> &TransactionId {
2178        &self.transaction_id
2179    }
2180}
2181
2182/// From a given source of [`DependentQueuedRequest`], return only the most
2183/// meaningful, i.e. the ones that wouldn't be overridden after applying the
2184/// others.
2185fn canonicalize_dependent_requests(
2186    dependent: &[DependentQueuedRequest],
2187) -> Vec<DependentQueuedRequest> {
2188    let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2189
2190    for d in dependent {
2191        let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2192
2193        if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2194            // The parent event has already been flagged for redaction, don't consider the
2195            // other dependent events.
2196            continue;
2197        }
2198
2199        match &d.kind {
2200            DependentQueuedRequestKind::EditEvent { .. } => {
2201                // Replace any previous edit with this one.
2202                if let Some(prev_edit) = prevs
2203                    .iter_mut()
2204                    .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2205                {
2206                    *prev_edit = d;
2207                } else {
2208                    prevs.insert(0, d);
2209                }
2210            }
2211
2212            DependentQueuedRequestKind::UploadFileWithThumbnail { .. }
2213            | DependentQueuedRequestKind::FinishUpload { .. }
2214            | DependentQueuedRequestKind::ReactEvent { .. } => {
2215                // These requests can't be canonicalized, push them as is.
2216                prevs.push(d);
2217            }
2218
2219            DependentQueuedRequestKind::RedactEvent => {
2220                // Remove every other dependent action.
2221                prevs.clear();
2222                prevs.push(d);
2223            }
2224        }
2225    }
2226
2227    by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2228}
2229
2230#[cfg(all(test, not(target_arch = "wasm32")))]
2231mod tests {
2232    use std::{sync::Arc, time::Duration};
2233
2234    use assert_matches2::{assert_let, assert_matches};
2235    use matrix_sdk_base::store::{
2236        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2237        SerializableEventContent,
2238    };
2239    use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder};
2240    use ruma::{
2241        events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
2242        room_id, MilliSecondsSinceUnixEpoch, TransactionId,
2243    };
2244
2245    use super::canonicalize_dependent_requests;
2246    use crate::{client::WeakClient, test_utils::logged_in_client};
2247
2248    #[test]
2249    fn test_canonicalize_dependent_events_created_at() {
2250        // Test to ensure the created_at field is being serialized and retrieved
2251        // correctly.
2252        let txn = TransactionId::new();
2253        let created_at = MilliSecondsSinceUnixEpoch::now();
2254
2255        let edit = DependentQueuedRequest {
2256            own_transaction_id: ChildTransactionId::new(),
2257            parent_transaction_id: txn.clone(),
2258            kind: DependentQueuedRequestKind::EditEvent {
2259                new_content: SerializableEventContent::new(
2260                    &RoomMessageEventContent::text_plain("edit").into(),
2261                )
2262                .unwrap(),
2263            },
2264            parent_key: None,
2265            created_at,
2266        };
2267
2268        let res = canonicalize_dependent_requests(&[edit]);
2269
2270        assert_eq!(res.len(), 1);
2271        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2272        assert_let!(
2273            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2274        );
2275        assert_eq!(msg.body(), "edit");
2276        assert_eq!(res[0].parent_transaction_id, txn);
2277        assert_eq!(res[0].created_at, created_at);
2278    }
2279
2280    #[async_test]
2281    async fn test_client_no_cycle_with_send_queue() {
2282        for enabled in [true, false] {
2283            let client = logged_in_client(None).await;
2284            let weak_client = WeakClient::from_client(&client);
2285
2286            {
2287                let mut sync_response_builder = SyncResponseBuilder::new();
2288
2289                let room_id = room_id!("!a:b.c");
2290
2291                // Make sure the client knows about the room.
2292                client
2293                    .base_client()
2294                    .receive_sync_response(
2295                        sync_response_builder
2296                            .add_joined_room(JoinedRoomBuilder::new(room_id))
2297                            .build_sync_response(),
2298                    )
2299                    .await
2300                    .unwrap();
2301
2302                let room = client.get_room(room_id).unwrap();
2303                let q = room.send_queue();
2304
2305                let _watcher = q.subscribe().await;
2306
2307                client.send_queue().set_enabled(enabled).await;
2308            }
2309
2310            drop(client);
2311
2312            // Give a bit of time for background tasks to die.
2313            tokio::time::sleep(Duration::from_millis(500)).await;
2314
2315            // The weak client must be the last reference to the client now.
2316            let client = weak_client.get();
2317            assert!(
2318                client.is_none(),
2319                "too many strong references to the client: {}",
2320                Arc::strong_count(&client.unwrap().inner)
2321            );
2322        }
2323    }
2324
2325    #[test]
2326    fn test_canonicalize_dependent_events_smoke_test() {
2327        // Smoke test: canonicalizing a single dependent event returns it.
2328        let txn = TransactionId::new();
2329
2330        let edit = DependentQueuedRequest {
2331            own_transaction_id: ChildTransactionId::new(),
2332            parent_transaction_id: txn.clone(),
2333            kind: DependentQueuedRequestKind::EditEvent {
2334                new_content: SerializableEventContent::new(
2335                    &RoomMessageEventContent::text_plain("edit").into(),
2336                )
2337                .unwrap(),
2338            },
2339            parent_key: None,
2340            created_at: MilliSecondsSinceUnixEpoch::now(),
2341        };
2342        let res = canonicalize_dependent_requests(&[edit]);
2343
2344        assert_eq!(res.len(), 1);
2345        assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2346        assert_eq!(res[0].parent_transaction_id, txn);
2347        assert!(res[0].parent_key.is_none());
2348    }
2349
2350    #[test]
2351    fn test_canonicalize_dependent_events_redaction_preferred() {
2352        // A redaction is preferred over any other kind of dependent event.
2353        let txn = TransactionId::new();
2354
2355        let mut inputs = Vec::with_capacity(100);
2356        let redact = DependentQueuedRequest {
2357            own_transaction_id: ChildTransactionId::new(),
2358            parent_transaction_id: txn.clone(),
2359            kind: DependentQueuedRequestKind::RedactEvent,
2360            parent_key: None,
2361            created_at: MilliSecondsSinceUnixEpoch::now(),
2362        };
2363
2364        let edit = DependentQueuedRequest {
2365            own_transaction_id: ChildTransactionId::new(),
2366            parent_transaction_id: txn.clone(),
2367            kind: DependentQueuedRequestKind::EditEvent {
2368                new_content: SerializableEventContent::new(
2369                    &RoomMessageEventContent::text_plain("edit").into(),
2370                )
2371                .unwrap(),
2372            },
2373            parent_key: None,
2374            created_at: MilliSecondsSinceUnixEpoch::now(),
2375        };
2376
2377        inputs.push({
2378            let mut edit = edit.clone();
2379            edit.own_transaction_id = ChildTransactionId::new();
2380            edit
2381        });
2382
2383        inputs.push(redact);
2384
2385        for _ in 0..98 {
2386            let mut edit = edit.clone();
2387            edit.own_transaction_id = ChildTransactionId::new();
2388            inputs.push(edit);
2389        }
2390
2391        let res = canonicalize_dependent_requests(&inputs);
2392
2393        assert_eq!(res.len(), 1);
2394        assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
2395        assert_eq!(res[0].parent_transaction_id, txn);
2396    }
2397
2398    #[test]
2399    fn test_canonicalize_dependent_events_last_edit_preferred() {
2400        let parent_txn = TransactionId::new();
2401
2402        // The latest edit of a list is always preferred.
2403        let inputs = (0..10)
2404            .map(|i| DependentQueuedRequest {
2405                own_transaction_id: ChildTransactionId::new(),
2406                parent_transaction_id: parent_txn.clone(),
2407                kind: DependentQueuedRequestKind::EditEvent {
2408                    new_content: SerializableEventContent::new(
2409                        &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
2410                    )
2411                    .unwrap(),
2412                },
2413                parent_key: None,
2414                created_at: MilliSecondsSinceUnixEpoch::now(),
2415            })
2416            .collect::<Vec<_>>();
2417
2418        let txn = inputs[9].parent_transaction_id.clone();
2419
2420        let res = canonicalize_dependent_requests(&inputs);
2421
2422        assert_eq!(res.len(), 1);
2423        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2424        assert_let!(
2425            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2426        );
2427        assert_eq!(msg.body(), "edit9");
2428        assert_eq!(res[0].parent_transaction_id, txn);
2429    }
2430
2431    #[test]
2432    fn test_canonicalize_multiple_local_echoes() {
2433        let txn1 = TransactionId::new();
2434        let txn2 = TransactionId::new();
2435
2436        let child1 = ChildTransactionId::new();
2437        let child2 = ChildTransactionId::new();
2438
2439        let inputs = vec![
2440            // This one pertains to txn1.
2441            DependentQueuedRequest {
2442                own_transaction_id: child1.clone(),
2443                kind: DependentQueuedRequestKind::RedactEvent,
2444                parent_transaction_id: txn1.clone(),
2445                parent_key: None,
2446                created_at: MilliSecondsSinceUnixEpoch::now(),
2447            },
2448            // This one pertains to txn2.
2449            DependentQueuedRequest {
2450                own_transaction_id: child2,
2451                kind: DependentQueuedRequestKind::EditEvent {
2452                    new_content: SerializableEventContent::new(
2453                        &RoomMessageEventContent::text_plain("edit").into(),
2454                    )
2455                    .unwrap(),
2456                },
2457                parent_transaction_id: txn2.clone(),
2458                parent_key: None,
2459                created_at: MilliSecondsSinceUnixEpoch::now(),
2460            },
2461        ];
2462
2463        let res = canonicalize_dependent_requests(&inputs);
2464
2465        // The canonicalization shouldn't depend per event id.
2466        assert_eq!(res.len(), 2);
2467
2468        for dependent in res {
2469            if dependent.own_transaction_id == child1 {
2470                assert_eq!(dependent.parent_transaction_id, txn1);
2471                assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
2472            } else {
2473                assert_eq!(dependent.parent_transaction_id, txn2);
2474                assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
2475            }
2476        }
2477    }
2478
2479    #[test]
2480    fn test_canonicalize_reactions_after_edits() {
2481        // Sending reactions should happen after edits to a given event.
2482        let txn = TransactionId::new();
2483
2484        let react_id = ChildTransactionId::new();
2485        let react = DependentQueuedRequest {
2486            own_transaction_id: react_id.clone(),
2487            kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
2488            parent_transaction_id: txn.clone(),
2489            parent_key: None,
2490            created_at: MilliSecondsSinceUnixEpoch::now(),
2491        };
2492
2493        let edit_id = ChildTransactionId::new();
2494        let edit = DependentQueuedRequest {
2495            own_transaction_id: edit_id.clone(),
2496            kind: DependentQueuedRequestKind::EditEvent {
2497                new_content: SerializableEventContent::new(
2498                    &RoomMessageEventContent::text_plain("edit").into(),
2499                )
2500                .unwrap(),
2501            },
2502            parent_transaction_id: txn,
2503            parent_key: None,
2504            created_at: MilliSecondsSinceUnixEpoch::now(),
2505        };
2506
2507        let res = canonicalize_dependent_requests(&[react, edit]);
2508
2509        assert_eq!(res.len(), 2);
2510        assert_eq!(res[0].own_transaction_id, edit_id);
2511        assert_eq!(res[1].own_transaction_id, react_id);
2512    }
2513}