// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A send queue facility to serialize the queuing and sending of messages.
//!
//! # [`Room`] send queue
//!
//! Each room gets its own [`RoomSendQueue`], that's available by calling
//! [`Room::send_queue()`]. The first time this method is called, it will spawn
//! a background task that's used to actually send events, in the order they
//! were passed from calls to [`RoomSendQueue::send()`].
//!
//! This queue tries to simplify error management around sending events, using
//! [`RoomSendQueue::send`] or [`RoomSendQueue::send_raw`]: by default, it will
//! retry sending the same event a few times, before automatically disabling
//! itself, and emitting a notification that can be listened to with the global
//! send queue (see paragraph below) or using [`RoomSendQueue::subscribe()`].
//!
//! It is possible to control whether a single room's send queue is enabled
//! using [`RoomSendQueue::set_enabled()`].
//!
//! # Global [`SendQueue`] object
//!
//! The [`Client::send_queue()`] method returns an API object that allows
//! controlling all the room send queues:
//!
//! - enable/disable them all at once with [`SendQueue::set_enabled()`].
//! - get notifications about send errors with [`SendQueue::subscribe_errors`].
//! - reload all unsent events that had been persisted in storage using
//!   [`SendQueue::respawn_tasks_for_rooms_with_unsent_requests()`]. It is
//!   recommended to call this method during initialization of a client,
//!   otherwise persisted unsent events will only be re-sent after the send
//!   queue for the given room has been reopened for the first time.
//!
//! # Send handle
//!
//! Just after queuing a request to send something, a [`SendHandle`] is
//! returned, which allows manipulating the in-flight request.
//!
//! With a send handle for an event, it's possible to edit the event or abort
//! sending it. If it was still in the queue (i.e. neither sent nor being
//! sent), then such an action would happen locally (i.e. in the database).
//! Otherwise, it is "too late": the background task may be sending
//! the event already, or has sent it; in that case, the edit/aborting must
//! happen as an actual event materializing this, on the server. To accomplish
//! this, the send queue may send such an event, using the dependency system
//! described below.
//!
//! # Dependency system
//!
//! The send queue includes a simple dependency system, where a
//! [`QueuedRequest`] can have zero or more dependents in the form of
//! [`DependentQueuedRequest`]. A dependent queued request can have at most one
//! depended-upon (parent) queued request.
//!
//! This allows implementing deferred edits/redacts, as hinted at in the
//! previous section.
//!
//! ## Media upload
//!
//! This dependency system also allows uploading media, since the media's
//! *content* must be uploaded before we send the media *event* that describes
//! it.
//!
//! In the simplest case, that is, a media file and its event must be sent (i.e.
//! no thumbnails):
//!
//! - The file's content is immediately cached in the
//!   [`matrix_sdk_base::event_cache::store::EventCacheStore`], using an MXC ID
//!   that is temporary and designates a local URI without any possible doubt.
//! - An initial media event is created and uses this temporary MXC ID, and
//!   propagated as a local echo for an event.
//! - A [`QueuedRequest`] is pushed to upload the file's media
//!   ([`QueuedRequestKind::MediaUpload`]).
//! - A [`DependentQueuedRequest`] is pushed to finish the upload
//!   ([`DependentQueuedRequestKind::FinishUpload`]).
//!
//! What is expected to happen, if all goes well, is the following:
//!
//! - the media is uploaded to the media homeserver, which returns the final MXC
//!   ID.
//! - when marking the upload request as sent, the MXC ID is injected (as a
//!   [`matrix_sdk_base::store::SentRequestKey`]) into the dependent request
//!   [`DependentQueuedRequestKind::FinishUpload`] created in the last step
//!   above.
//! - next time the send queue handles dependent requests, it'll see this one
//!   is ready to be sent, and it will transform it into an event queued
//!   request ([`QueuedRequestKind::Event`]), with the event created in the
//!   local echo before, updated with the MXC ID returned from the server.
//! - this updated local echo is also propagated as an edit of the local echo to
//!   observers, who get the final version with the final MXC IDs at this point
//!   too.
//! - then the event is sent normally, as any event sent with the send queue.
//!
//! When there is a thumbnail, things behave similarly, with some tweaks:
//!
//! - the thumbnail's content is also stored into the cache store immediately,
//! - the thumbnail is sent first as a [`QueuedRequestKind::MediaUpload`]
//!   request,
//! - the file upload is pushed as a dependent request of kind
//!   [`DependentQueuedRequestKind::UploadFileWithThumbnail`] (this variant
//!   keeps the file's key used to look it up in the cache store).
//! - the media event is then sent as a dependent request as described in the
//!   previous section.
//!
//! What's expected to happen is thus the following:
//!
//! - After the thumbnail has been uploaded, the dependent request will
//!   retrieve the final MXC ID returned by the homeserver for the thumbnail,
//!   and store it into the [`QueuedRequestKind::MediaUpload`]'s
//!   `thumbnail_source` field, so that the thumbnail MXC ID is remembered when
//!   it's time to finish the upload later.
//! - The dependent request is morphed into another
//!   [`QueuedRequestKind::MediaUpload`], for the file itself.
//!
//! The rest of the process is then similar to that of uploading a file without
//! a thumbnail. The only difference is that there's a thumbnail source (MXC ID)
//! remembered and fixed up into the media event, just before sending it.
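
/*
The media upload flow described in the module documentation above can be
illustrated with a small, std-only model. This is only a sketch under stated
assumptions, not the SDK's implementation: `Request`, `Dependent`, `Queue`,
`push_media`, `mark_as_sent` and `promote_ready_dependents` are hypothetical
names standing in for the queued/dependent request types and the store
operations, transaction IDs are plain integers, and MXC IDs are plain strings.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a queued request.
#[derive(Clone, Debug, PartialEq)]
enum Request {
    /// Upload media content cached under a temporary, local MXC ID.
    MediaUpload { txn_id: u32, local_mxc: String },
    /// Send an event that references the final MXC ID.
    Event { txn_id: u32, mxc: String },
}

/// Hypothetical stand-in for a dependent request waiting on its parent.
#[derive(Debug)]
struct Dependent {
    parent_txn_id: u32,
    own_txn_id: u32,
    /// Set once the parent request has been sent.
    parent_key: Option<String>,
}

#[derive(Debug, Default)]
struct Queue {
    requests: VecDeque<Request>,
    dependents: Vec<Dependent>,
}

impl Queue {
    /// Queue a media upload and the dependent request that finishes it.
    fn push_media(&mut self, upload_txn: u32, event_txn: u32, local_mxc: &str) {
        self.requests.push_back(Request::MediaUpload {
            txn_id: upload_txn,
            local_mxc: local_mxc.to_owned(),
        });
        self.dependents.push(Dependent {
            parent_txn_id: upload_txn,
            own_txn_id: event_txn,
            parent_key: None,
        });
    }

    /// Drop the sent upload and inject the final MXC ID into its dependents.
    fn mark_as_sent(&mut self, txn_id: u32, final_mxc: &str) {
        self.requests
            .retain(|r| !matches!(r, Request::MediaUpload { txn_id: t, .. } if *t == txn_id));
        for dep in self.dependents.iter_mut().filter(|d| d.parent_txn_id == txn_id) {
            dep.parent_key = Some(final_mxc.to_owned());
        }
    }

    /// Turn each dependent whose parent key is known into an event request,
    /// and return how many were promoted.
    fn promote_ready_dependents(&mut self) -> usize {
        let (ready, waiting): (Vec<_>, Vec<_>) = std::mem::take(&mut self.dependents)
            .into_iter()
            .partition(|d| d.parent_key.is_some());
        self.dependents = waiting;

        let promoted = ready.len();
        for dep in ready {
            if let Some(mxc) = dep.parent_key {
                self.requests.push_back(Request::Event { txn_id: dep.own_txn_id, mxc });
            }
        }
        promoted
    }
}
```

In this model, a dependent stays parked until its parent has been marked as
sent; only then does it become a regular request carrying the final MXC ID.
*/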

use std::{
    collections::{BTreeMap, HashMap},
    str::FromStr as _,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, RwLock,
    },
};

use as_variant::as_variant;
use matrix_sdk_base::{
    event_cache::store::EventCacheStoreError,
    media::MediaRequestParameters,
    store::{
        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
        SentMediaInfo, SentRequestKey, SerializableEventContent,
    },
    store_locks::LockStoreError,
    RoomState, StoreError,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use mime::Mime;
use ruma::{
    events::{
        reaction::ReactionEventContent,
        relation::Annotation,
        room::{
            message::{FormattedBody, RoomMessageEventContent},
            MediaSource,
        },
        AnyMessageLikeEventContent, EventContent as _, Mentions,
    },
    serde::Raw,
    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
};
use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
use tracing::{debug, error, info, instrument, trace, warn};

#[cfg(feature = "e2e-encryption")]
use crate::crypto::{OlmError, SessionRecipientCollectionError};
use crate::{
    client::WeakClient,
    config::RequestConfig,
    error::RetryKind,
    room::{edit::EditedContent, WeakRoom},
    Client, Media, Room,
};

mod upload;

/// A client-wide send queue, for all the rooms known by a client.
pub struct SendQueue {
    client: Client,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for SendQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SendQueue").finish_non_exhaustive()
    }
}

impl SendQueue {
    pub(super) fn new(client: Client) -> Self {
        Self { client }
    }

    /// Reload all the rooms which had unsent requests, and respawn tasks for
    /// those rooms.
    pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
        if !self.is_enabled() {
            return;
        }

        let room_ids =
            self.client.store().load_rooms_with_unsent_requests().await.unwrap_or_else(|err| {
                warn!("error when loading rooms with unsent requests: {err}");
                Vec::new()
            });

        // Getting the [`RoomSendQueue`] is sufficient to spawn the task if need be.
        for room_id in room_ids {
            if let Some(room) = self.client.get_room(&room_id) {
                let _ = self.for_room(room);
            }
        }
    }

    /// Tiny helper to get the send queue's global context from the [`Client`].
    #[inline(always)]
    fn data(&self) -> &SendQueueData {
        &self.client.inner.send_queue_data
    }

    /// Get or create a new send queue for a given room, and insert it into our
    /// memoized rooms mapping.
    fn for_room(&self, room: Room) -> RoomSendQueue {
        let data = self.data();

        let mut map = data.rooms.write().unwrap();

        let room_id = room.room_id();
        if let Some(room_q) = map.get(room_id).cloned() {
            return room_q;
        }

        let owned_room_id = room_id.to_owned();
        let room_q = RoomSendQueue::new(
            self.is_enabled(),
            data.error_reporter.clone(),
            data.is_dropping.clone(),
            &self.client,
            owned_room_id.clone(),
        );

        map.insert(owned_room_id, room_q.clone());

        room_q
    }

    /// Enable or disable the send queue for the entire client, i.e. all rooms.
    ///
    /// If we're disabling the queue, and requests were being sent, they're not
    /// aborted, and will continue until a status resolves (error responses
    /// will keep the events in the buffer of events to send later). The
    /// disablement will happen before the next request is sent.
    ///
    /// This may wake up background tasks and resume sending of requests in the
    /// background.
    pub async fn set_enabled(&self, enabled: bool) {
        debug!(?enabled, "setting global send queue enablement");

        self.data().globally_enabled.store(enabled, Ordering::SeqCst);

        // Wake up individual rooms we already know about.
        for room in self.data().rooms.read().unwrap().values() {
            room.set_enabled(enabled);
        }

        // Reload some extra rooms that might not have been awakened yet, but could
        // have requests from previous sessions.
        self.respawn_tasks_for_rooms_with_unsent_requests().await;
    }

    /// Returns whether the send queue is enabled, at a client-wide
    /// granularity.
    pub fn is_enabled(&self) -> bool {
        self.data().globally_enabled.load(Ordering::SeqCst)
    }

    /// A subscriber to the enablement status (enabled or disabled) of the
    /// send queue, along with useful errors.
    pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
        self.data().error_reporter.subscribe()
    }
}
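
/*
The get-or-create behaviour of `for_room` above (one memoized queue per room,
created under a write lock) can be sketched with std types only. This is an
illustrative sketch, not the SDK's code: `Registry` and `RoomQueue` are
hypothetical stand-ins for `SendQueueData` and `RoomSendQueue`, room IDs are
plain strings, and the sketch uses the map's `entry` API instead of the
explicit lookup followed by an insert.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a per-room queue handle; cheap to clone.
#[derive(Clone, Debug)]
struct RoomQueue {
    room_id: Arc<str>,
}

/// Hypothetical stand-in for the client-wide data holding all room queues.
#[derive(Default)]
struct Registry {
    rooms: RwLock<BTreeMap<String, RoomQueue>>,
}

impl Registry {
    /// Return the existing queue for `room_id`, or create and remember one.
    ///
    /// The write lock is held across the lookup and the insertion, so two
    /// concurrent callers cannot both create a queue for the same room.
    fn for_room(&self, room_id: &str) -> RoomQueue {
        let mut map = self.rooms.write().unwrap();
        map.entry(room_id.to_owned())
            .or_insert_with(|| RoomQueue { room_id: Arc::from(room_id) })
            .clone()
    }
}
```

Calling `for_room` twice with the same room ID hands back clones of the same
underlying queue, which is what lets every caller share one background task
per room.
*/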

/// A specific room's send queue ran into an error, and it has disabled itself.
#[derive(Clone, Debug)]
pub struct SendQueueRoomError {
    /// For which room is the send queue failing?
    pub room_id: OwnedRoomId,

    /// The error the room has run into, when trying to send a request.
    pub error: Arc<crate::Error>,

    /// Whether the error is considered recoverable or not.
    ///
    /// Any error disables the room's send queue. After a recoverable error,
    /// the request is kept in the queue, so it can be retried once the queue
    /// is re-enabled; after an unrecoverable error, the request is parked
    /// until the user decides to do something about it.
    pub is_recoverable: bool,
}

impl Client {
    /// Returns a [`SendQueue`] that handles sending, retrying and not
    /// forgetting about requests that are to be sent.
    pub fn send_queue(&self) -> SendQueue {
        SendQueue::new(self.clone())
    }
}

pub(super) struct SendQueueData {
    /// Mapping of rooms to their unique send queue.
    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,

    /// Is the whole mechanism enabled or disabled?
    ///
    /// This is only kept in memory to initialize new room queues with an
    /// initial enablement state.
    globally_enabled: AtomicBool,

    /// Global error updates for the send queue.
    error_reporter: broadcast::Sender<SendQueueRoomError>,

    /// Are we currently dropping the Client?
    is_dropping: Arc<AtomicBool>,
}

impl SendQueueData {
    /// Create the data for a send queue, in the given enabled state.
    pub fn new(globally_enabled: bool) -> Self {
        let (sender, _) = broadcast::channel(32);

        Self {
            rooms: Default::default(),
            globally_enabled: AtomicBool::new(globally_enabled),
            error_reporter: sender,
            is_dropping: Arc::new(false.into()),
        }
    }
}

impl Drop for SendQueueData {
    fn drop(&mut self) {
        // Mark the whole send queue as shutting down, then wake up all the room
        // queues so they're stopped too.
        debug!("globally dropping the send queue");
        self.is_dropping.store(true, Ordering::SeqCst);

        let rooms = self.rooms.read().unwrap();
        for room in rooms.values() {
            room.inner.notifier.notify_one();
        }
    }
}

impl Room {
    /// Returns the [`RoomSendQueue`] for this specific room.
    pub fn send_queue(&self) -> RoomSendQueue {
        self.client.send_queue().for_room(self.clone())
    }
}

/// A per-room send queue.
///
/// This is cheap to clone.
#[derive(Clone)]
pub struct RoomSendQueue {
    inner: Arc<RoomSendQueueInner>,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for RoomSendQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RoomSendQueue").finish_non_exhaustive()
    }
}

impl RoomSendQueue {
    fn new(
        globally_enabled: bool,
        global_error_reporter: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
        client: &Client,
        room_id: OwnedRoomId,
    ) -> Self {
        let (updates_sender, _) = broadcast::channel(32);

        let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
        let notifier = Arc::new(Notify::new());

        let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
        let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));

        let task = spawn(Self::sending_task(
            weak_room.clone(),
            queue.clone(),
            notifier.clone(),
            updates_sender.clone(),
            locally_enabled.clone(),
            global_error_reporter,
            is_dropping,
        ));

        Self {
            inner: Arc::new(RoomSendQueueInner {
                room: weak_room,
                updates: updates_sender,
                _task: task,
                queue,
                notifier,
                locally_enabled,
            }),
        }
    }

    /// Queues a raw event to be sent to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, this room's send
    /// queue will be disabled, and it will need to be manually re-enabled by
    /// the caller (e.g. after network is back, or when something has been done
    /// about the faulty requests).
    pub async fn send_raw(
        &self,
        content: Raw<AnyMessageLikeEventContent>,
        event_type: String,
    ) -> Result<SendHandle, RoomSendQueueError> {
        let Some(room) = self.inner.room.get() else {
            return Err(RoomSendQueueError::RoomDisappeared);
        };
        if room.state() != RoomState::Joined {
            return Err(RoomSendQueueError::RoomNotJoined);
        }

        let content = SerializableEventContent::from_raw(content, event_type);

        let created_at = MilliSecondsSinceUnixEpoch::now();
        let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
        trace!(%transaction_id, "manager sends a raw event to the background task");

        self.inner.notifier.notify_one();

        let send_handle = SendHandle {
            room: self.clone(),
            transaction_id: transaction_id.clone(),
            media_handles: None,
            created_at,
        };

        let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
            transaction_id,
            content: LocalEchoContent::Event {
                serialized_event: content,
                send_handle: send_handle.clone(),
                send_error: None,
            },
        }));

        Ok(send_handle)
    }

    /// Queues an event to be sent to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, this room's send
    /// queue will be disabled, and it will need to be manually re-enabled by
    /// the caller (e.g. after network is back, or when something has been done
    /// about the faulty requests).
    pub async fn send(
        &self,
        content: AnyMessageLikeEventContent,
    ) -> Result<SendHandle, RoomSendQueueError> {
        self.send_raw(
            Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
            content.event_type().to_string(),
        )
        .await
    }

    /// Returns the current local requests as well as a receiver to listen to
    /// the send queue updates, as defined in [`RoomSendQueueUpdate`].
    pub async fn subscribe(
        &self,
    ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
    {
        let local_echoes = self.inner.queue.local_echoes(self).await?;

        Ok((local_echoes, self.inner.updates.subscribe()))
    }

    /// A task that must be spawned in the async runtime, running in the
    /// background for each room that has a send queue.
    ///
    /// It only progresses forward: nothing can be cancelled at any point, which
    /// makes the implementation not overly complicated to follow.
    #[instrument(skip_all, fields(room_id = %room.room_id()))]
    async fn sending_task(
        room: WeakRoom,
        queue: QueueStorage,
        notifier: Arc<Notify>,
        updates: broadcast::Sender<RoomSendQueueUpdate>,
        locally_enabled: Arc<AtomicBool>,
        global_error_reporter: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
    ) {
        info!("spawned the sending task");

        loop {
            // A request to shut down should be preferred above everything else.
            if is_dropping.load(Ordering::SeqCst) {
                trace!("shutting down!");
                break;
            }

            // Try to apply dependent requests now; those applying to previously failed
            // attempts (local echoes) would succeed now.
            let mut new_updates = Vec::new();
            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
                warn!("errors when applying dependent requests: {err}");
            }

            for up in new_updates {
                let _ = updates.send(up);
            }

            if !locally_enabled.load(Ordering::SeqCst) {
                trace!("not enabled, sleeping");
                // Wait for an explicit wakeup.
                notifier.notified().await;
                continue;
            }

            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
                Ok(Some(request)) => request,

                Ok(None) => {
                    trace!("queue is empty, sleeping");
                    // Wait for an explicit wakeup.
                    notifier.notified().await;
                    continue;
                }

                Err(err) => {
                    warn!("error when loading next request to send: {err}");
                    continue;
                }
            };

            let txn_id = queued_request.transaction_id.clone();
            trace!(txn_id = %txn_id, "received a request to send!");

            let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::MediaUpload { related_to, .. } => related_to.clone());

            let Some(room) = room.get() else {
                if is_dropping.load(Ordering::SeqCst) {
                    break;
                }
                error!("the weak room couldn't be upgraded but we're not shutting down?");
                continue;
            };

            match Self::handle_request(&room, queued_request, cancel_upload_rx).await {
                Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
                {
                    Ok(()) => match parent_key {
                        SentRequestKey::Event(event_id) => {
                            let _ = updates.send(RoomSendQueueUpdate::SentEvent {
                                transaction_id: txn_id,
                                event_id,
                            });
                        }

                        SentRequestKey::Media(media_info) => {
                            let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
                                file: media_info.file,
                            });
                        }
                    },

                    Err(err) => {
                        warn!("unable to mark queued request as sent: {err}");
                    }
                },

                Ok(None) => {
                    debug!("Request has been aborted while running, continuing.");
                }

                Err(err) => {
                    let is_recoverable = match err {
                        crate::Error::Http(ref http_err) => {
                            // All transient errors are recoverable.
                            matches!(
                                http_err.retry_kind(),
                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
                            )
                        }

                        // `ConcurrentRequestFailed` typically happens because of an HTTP failure;
                        // since we don't get the underlying error, be lax and consider it
                        // recoverable, and let observers decide to retry it or not. At some point
                        // we'll get the actual underlying error.
                        crate::Error::ConcurrentRequestFailed => true,

                        // As of 2024-06-27, all other error types are considered unrecoverable.
                        _ => false,
                    };

                    // Disable the queue for this room after any kind of error happened.
                    locally_enabled.store(false, Ordering::SeqCst);

                    if is_recoverable {
                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");

                        // In this case, we intentionally keep the request in the queue, but mark it
                        // as not being sent anymore.
                        queue.mark_as_not_being_sent(&txn_id).await;

                        // Let observers know about a failure *after* we've
                        // marked the item as not being sent anymore. Otherwise,
                        // there's a possible race where a caller might try to
                        // remove an item, while it's still marked as being
                        // sent, resulting in a cancellation failure.
                    } else {
                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");

                        // Mark the request as wedged, so it's not picked at any future point.
                        if let Err(storage_error) =
                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
                        {
                            warn!("unable to mark request as wedged: {storage_error}");
                        }
                    }

                    let error = Arc::new(err);

                    let _ = global_error_reporter.send(SendQueueRoomError {
                        room_id: room.room_id().to_owned(),
                        error: error.clone(),
                        is_recoverable,
                    });

                    let _ = updates.send(RoomSendQueueUpdate::SendError {
                        transaction_id: related_txn_id.unwrap_or(txn_id),
                        error,
                        is_recoverable,
                    });
                }
            }
        }

        info!("exited sending task");
    }

    /// Handles a single request and returns the [`SentRequestKey`] on success
    /// (unless the request was cancelled, in which case it'll return
    /// `None`).
    async fn handle_request(
        room: &Room,
        request: QueuedRequest,
        cancel_upload_rx: Option<oneshot::Receiver<()>>,
    ) -> Result<Option<SentRequestKey>, crate::Error> {
        match request.kind {
            QueuedRequestKind::Event { content } => {
                let (event, event_type) = content.raw();

                let res = room
                    .send_raw(event_type, event)
                    .with_transaction_id(&request.transaction_id)
                    .with_request_config(RequestConfig::short_retry())
                    .await?;

                trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
                Ok(Some(SentRequestKey::Event(res.event_id)))
            }

            QueuedRequestKind::MediaUpload {
                content_type,
                cache_key,
                thumbnail_source,
                related_to: relates_to,
            } => {
                trace!(%relates_to, "uploading media related to event");

                let fut = async move {
                    let mime = Mime::from_str(&content_type).map_err(|_| {
                        crate::Error::SendQueueWedgeError(QueueWedgeError::InvalidMimeType {
                            mime_type: content_type.clone(),
                        })
                    })?;

                    let data = room
                        .client()
                        .event_cache_store()
                        .lock()
                        .await?
                        .get_media_content(&cache_key)
                        .await?
                        .ok_or(crate::Error::SendQueueWedgeError(
                            QueueWedgeError::MissingMediaContent,
                        ))?;

                    #[cfg(feature = "e2e-encryption")]
                    let media_source = if room.is_encrypted().await? {
                        trace!("upload will be encrypted (encrypted room)");
                        let mut cursor = std::io::Cursor::new(data);
                        let encrypted_file = room
                            .client()
                            .upload_encrypted_file(&mime, &mut cursor)
                            .with_request_config(RequestConfig::short_retry())
                            .await?;
                        MediaSource::Encrypted(Box::new(encrypted_file))
                    } else {
                        trace!("upload will be in clear text (room without encryption)");
                        let request_config = RequestConfig::short_retry()
                            .timeout(Media::reasonable_upload_timeout(&data));
                        let res =
                            room.client().media().upload(&mime, data, Some(request_config)).await?;
                        MediaSource::Plain(res.content_uri)
                    };

                    #[cfg(not(feature = "e2e-encryption"))]
                    let media_source = {
                        let request_config = RequestConfig::short_retry()
                            .timeout(Media::reasonable_upload_timeout(&data));
                        let res =
                            room.client().media().upload(&mime, data, Some(request_config)).await?;
                        MediaSource::Plain(res.content_uri)
                    };

                    let uri = match &media_source {
                        MediaSource::Plain(uri) => uri,
                        MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
                    };
                    trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");

                    Ok(SentRequestKey::Media(SentMediaInfo {
                        file: media_source,
                        thumbnail: thumbnail_source,
                    }))
                };

                let wait_for_cancel = async move {
                    if let Some(rx) = cancel_upload_rx {
                        rx.await
                    } else {
                        std::future::pending().await
                    }
                };

                tokio::select! {
                    biased;

                    _ = wait_for_cancel => {
                        Ok(None)
                    }

                    res = fut => {
                        res.map(Some)
                    }
                }
            }
        }
    }
783
    /// Returns whether this room's send queue is enabled, at the room level.
785    pub fn is_enabled(&self) -> bool {
786        self.inner.locally_enabled.load(Ordering::SeqCst)
787    }
788
789    /// Set the locally enabled flag for this room queue.
790    pub fn set_enabled(&self, enabled: bool) {
791        self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
792
793        // No need to wake a task to tell it it's been disabled, so only notify if we're
794        // re-enabling the queue.
795        if enabled {
796            self.inner.notifier.notify_one();
797        }
798    }
799}
800
801impl From<&crate::Error> for QueueWedgeError {
802    fn from(value: &crate::Error) -> Self {
803        match value {
804            #[cfg(feature = "e2e-encryption")]
805            crate::Error::OlmError(OlmError::SessionRecipientCollectionError(error)) => match error
806            {
807                SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
808                    QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
809                }
810
811                SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
812                    QueueWedgeError::IdentityViolations { users: users.clone() }
813                }
814
815                SessionRecipientCollectionError::CrossSigningNotSetup
816                | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
817                    QueueWedgeError::CrossVerificationRequired
818                }
819            },
820
821            // Flatten errors of `Self` type.
822            crate::Error::SendQueueWedgeError(error) => error.clone(),
823
824            _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
825        }
826    }
827}
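
// Illustration (not part of this module): a minimal, std-only sketch of the
// "flatten, then map, then fall back" conversion pattern used by the `From`
// impl above. `ApiError` and `WedgeReason` are hypothetical stand-ins for
// `crate::Error` and `QueueWedgeError`; their variants are assumptions made
// up for the example, not the real API.

```rust
use std::fmt;

/// Hypothetical wedge reason, standing in for `QueueWedgeError`.
#[derive(Clone, Debug, PartialEq)]
enum WedgeReason {
    CrossVerificationRequired,
    Generic { msg: String },
}

/// Hypothetical low-level error, standing in for `crate::Error`.
#[derive(Debug)]
enum ApiError {
    /// Already carries a wedge reason; it must be passed through unchanged.
    Wedged(WedgeReason),
    /// A trust problem that maps to a specific, actionable reason.
    UnverifiedSender,
    /// Anything else.
    Http(u16),
}

impl fmt::Display for ApiError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ApiError::Wedged(reason) => write!(f, "wedged: {reason:?}"),
            ApiError::UnverifiedSender => write!(f, "sending from an unverified device"),
            ApiError::Http(code) => write!(f, "HTTP error {code}"),
        }
    }
}

impl From<&ApiError> for WedgeReason {
    fn from(value: &ApiError) -> Self {
        match value {
            // Flatten errors that already wrap the target type.
            ApiError::Wedged(reason) => reason.clone(),
            // Map errors that have a dedicated reason.
            ApiError::UnverifiedSender => WedgeReason::CrossVerificationRequired,
            // Fall back to a generic reason carrying the display string.
            _ => WedgeReason::Generic { msg: value.to_string() },
        }
    }
}
```

// Taking `&ApiError` rather than `ApiError` lets the caller keep the original
// error around, e.g. to log it after computing the wedge reason.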
828
829struct RoomSendQueueInner {
830    /// The room which this send queue relates to.
831    room: WeakRoom,
832
833    /// Broadcaster for notifications about the statuses of requests to be sent.
834    ///
835    /// Can be subscribed to from the outside.
836    updates: broadcast::Sender<RoomSendQueueUpdate>,
837
838    /// Queue of requests that are either to be sent, or being sent.
839    ///
840    /// When a request has been sent to the server, it is removed from that
841    /// queue *after* being sent. That way, we will retry sending upon
842    /// failure, in the same order requests have been inserted in the first
843    /// place.
844    queue: QueueStorage,
845
846    /// A notifier that's updated any time common data is touched (stopped or
847    /// enabled statuses), or the associated room [`QueueStorage`].
848    notifier: Arc<Notify>,
849
850    /// Should the room process new requests or not (because e.g. it might be
851    /// running off the network)?
852    locally_enabled: Arc<AtomicBool>,
853
854    /// Handle to the actual sending task. Unused, but kept alive along this
855    /// data structure.
856    _task: JoinHandle<()>,
857}
858
859/// Information about a request being sent right this moment.
860struct BeingSentInfo {
861    /// Transaction id of the thing being sent.
862    transaction_id: OwnedTransactionId,
863
864    /// For an upload request, a trigger to cancel the upload before it
865    /// completes.
866    cancel_upload: Option<oneshot::Sender<()>>,
867}
868
869impl BeingSentInfo {
870    /// Aborts the upload, if a trigger is available.
871    ///
872    /// Consumes the object because the sender is a oneshot and will be consumed
873    /// upon sending.
874    fn cancel_upload(self) -> bool {
875        if let Some(cancel_upload) = self.cancel_upload {
876            let _ = cancel_upload.send(());
877            true
878        } else {
879            false
880        }
881    }
882}
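
// Illustration (not part of this module): a minimal, std-only sketch of the
// consume-on-cancel idea behind `BeingSentInfo::cancel_upload`. It assumes a
// `std::sync::mpsc` channel in place of the oneshot channel used above (so the
// single-use property comes only from taking `self` by value), and `InFlight`
// and `in_flight_upload` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for `BeingSentInfo`.
struct InFlight {
    /// Present only for requests that can be cancelled mid-flight (uploads).
    cancel: Option<Sender<()>>,
}

impl InFlight {
    /// Fires the cancel trigger, if any.
    ///
    /// Takes `self` by value so the trigger cannot be fired twice.
    fn cancel(self) -> bool {
        match self.cancel {
            Some(tx) => {
                // The receiver may already be gone if the work completed;
                // that is not an error from the caller's point of view.
                let _ = tx.send(());
                true
            }
            None => false,
        }
    }
}

/// Creates the in-flight record for an upload, plus the receiving end that
/// the upload task would watch for a cancellation.
fn in_flight_upload() -> (InFlight, Receiver<()>) {
    let (tx, rx) = channel();
    (InFlight { cancel: Some(tx) }, rx)
}
```

// A request without a trigger (`cancel: None`) reports `false`, which lets the
// caller fall back to another way of cancelling it.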
883
/// A specialized lock that guards both the state store and the
/// [`Self::being_sent`] data.
886#[derive(Clone)]
887struct StoreLock {
888    /// Reference to the client, to get access to the underlying store.
889    client: WeakClient,
890
891    /// The one queued request that is being sent at the moment, along with
892    /// associated data that can be useful to act upon it.
893    ///
894    /// Also used as the lock to access the state store.
895    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
896}
897
898impl StoreLock {
899    /// Gets a hold of the locked store and [`Self::being_sent`] pair.
900    async fn lock(&self) -> StoreLockGuard {
901        StoreLockGuard {
902            client: self.client.clone(),
903            being_sent: self.being_sent.clone().lock_owned().await,
904        }
905    }
906}
907
/// A lock guard obtained by locking a [`StoreLock`], giving access to the
/// state store and to the `being_sent` data.
910struct StoreLockGuard {
911    /// Reference to the client, to get access to the underlying store.
912    client: WeakClient,
913
914    /// The one queued request that is being sent at the moment, along with
915    /// associated data that can be useful to act upon it.
916    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
917}
918
919impl StoreLockGuard {
920    /// Get a client from the locked state, useful to get a handle on a store.
921    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
922        self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
923    }
924}
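
// Illustration (not part of this module): a minimal, std-only sketch of a
// lock guard that bundles a weak client reference with the guarded
// `being_sent` slot, as `StoreLock` and `StoreLockGuard` do above. It assumes
// a blocking `std::sync::Mutex` instead of the async mutex, and `SketchClient`,
// `SketchLock`, `SketchGuard` and `SketchError` are hypothetical names.

```rust
use std::sync::{Arc, Mutex, MutexGuard, Weak};

/// Hypothetical stand-in for `Client`.
struct SketchClient {
    name: String,
}

#[derive(Debug, PartialEq)]
enum SketchError {
    ClientShuttingDown,
}

struct SketchLock {
    /// Weak reference, so the lock does not keep the client alive.
    client: Weak<SketchClient>,
    /// Transaction id of the request being sent; doubles as the store lock.
    being_sent: Mutex<Option<String>>,
}

struct SketchGuard<'a> {
    client: Weak<SketchClient>,
    being_sent: MutexGuard<'a, Option<String>>,
}

impl SketchLock {
    fn new(client: Weak<SketchClient>) -> Self {
        Self { client, being_sent: Mutex::new(None) }
    }

    /// Locks the `being_sent` slot and hands out the weak client with it.
    fn lock(&self) -> SketchGuard<'_> {
        SketchGuard {
            client: self.client.clone(),
            being_sent: self.being_sent.lock().unwrap(),
        }
    }
}

impl SketchGuard<'_> {
    /// Upgrades the weak reference; fails once the client has been dropped.
    fn client(&self) -> Result<Arc<SketchClient>, SketchError> {
        self.client.upgrade().ok_or(SketchError::ClientShuttingDown)
    }
}
```

// Because every store access goes through the guard, holding it across
// several store calls makes that sequence of calls atomic with respect to
// other users of the same lock.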
925
926#[derive(Clone)]
927struct QueueStorage {
    /// A lock to make sure the state store is only accessed by one task at a
    /// time, to make some store operations atomic.
930    store: StoreLock,
931
932    /// To which room is this storage related.
933    room_id: OwnedRoomId,
934}
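
// Illustration (not part of this module): a minimal in-memory sketch of the
// peek-then-mark protocol that `QueueStorage` implements on top of the state
// store. `MemoryQueue` and `Request` are hypothetical names, and the sketch
// leaves out locking, priorities, dependent requests and the transaction id
// consistency checks.

```rust
#[derive(Clone, Debug, PartialEq)]
struct Request {
    txn: String,
    wedged: bool,
}

#[derive(Default)]
struct MemoryQueue {
    requests: Vec<Request>,
    /// Transaction id of the request currently being sent, if any.
    being_sent: Option<String>,
}

impl MemoryQueue {
    fn push(&mut self, txn: &str) {
        self.requests.push(Request { txn: txn.to_owned(), wedged: false });
    }

    /// Returns a copy of the first non-wedged request and records it as being
    /// sent. The request stays in the queue until it is marked as sent.
    fn peek_next_to_send(&mut self) -> Option<Request> {
        let next = self.requests.iter().find(|r| !r.wedged)?.clone();
        self.being_sent = Some(next.txn.clone());
        Some(next)
    }

    /// Transient failure: clear the in-flight marker, keep the request.
    fn mark_as_not_being_sent(&mut self) {
        self.being_sent = None;
    }

    /// Permanent failure: keep the request, but skip it when peeking.
    fn mark_as_wedged(&mut self, txn: &str) {
        self.being_sent = None;
        if let Some(r) = self.requests.iter_mut().find(|r| r.txn == txn) {
            r.wedged = true;
        }
    }

    /// Success: remove the request. Returns whether it was still queued.
    fn mark_as_sent(&mut self, txn: &str) -> bool {
        self.being_sent = None;
        let before = self.requests.len();
        self.requests.retain(|r| r.txn != txn);
        self.requests.len() != before
    }
}
```

// Removing a request only after a successful send is what allows a retry to
// pick up the same request again, in the original insertion order.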
935
936impl QueueStorage {
937    /// Default priority for a queued request.
938    const LOW_PRIORITY: usize = 0;
939
940    /// High priority for a queued request that must be handled before others.
941    const HIGH_PRIORITY: usize = 10;
942
943    /// Create a new queue for queuing requests to be sent later.
944    fn new(client: WeakClient, room: OwnedRoomId) -> Self {
945        Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } }
946    }
947
    /// Push a new request to be sent in the queue, with the default priority
    /// ([`Self::LOW_PRIORITY`]).
949    ///
950    /// Returns the transaction id chosen to identify the request.
951    async fn push(
952        &self,
953        request: QueuedRequestKind,
954        created_at: MilliSecondsSinceUnixEpoch,
955    ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
956        let transaction_id = TransactionId::new();
957
958        self.store
959            .lock()
960            .await
961            .client()?
962            .store()
963            .save_send_queue_request(
964                &self.room_id,
965                transaction_id.clone(),
966                created_at,
967                request,
968                Self::LOW_PRIORITY,
969            )
970            .await?;
971
972        Ok(transaction_id)
973    }
974
975    /// Peeks the next request to be sent, marking it as being sent.
976    ///
977    /// It is required to call [`Self::mark_as_sent`] after it's been
978    /// effectively sent.
979    async fn peek_next_to_send(
980        &self,
981    ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
982    {
983        let mut guard = self.store.lock().await;
984        let queued_requests =
985            guard.client()?.store().load_send_queue_requests(&self.room_id).await?;
986
987        if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
988            let (cancel_upload_tx, cancel_upload_rx) =
989                if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
990                    let (tx, rx) = oneshot::channel();
991                    (Some(tx), Some(rx))
992                } else {
993                    Default::default()
994                };
995
996            let prev = guard.being_sent.replace(BeingSentInfo {
997                transaction_id: request.transaction_id.clone(),
998                cancel_upload: cancel_upload_tx,
999            });
1000
1001            if let Some(prev) = prev {
1002                error!(
1003                    prev_txn = ?prev.transaction_id,
1004                    "a previous request was still active while picking a new one"
1005                );
1006            }
1007
1008            Ok(Some((request.clone(), cancel_upload_rx)))
1009        } else {
1010            Ok(None)
1011        }
1012    }
1013
1014    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1015    /// with the given transaction id as not being sent anymore, so it can
1016    /// be removed from the queue later.
1017    async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1018        let was_being_sent = self.store.lock().await.being_sent.take();
1019
1020        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1021        if prev_txn != Some(transaction_id) {
            error!(prev_txn = ?prev_txn, "previous active request didn't match what we expected (after transient error)");
1023        }
1024    }
1025
1026    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1027    /// with the given transaction id as being wedged (and not being sent
1028    /// anymore), so it can be removed from the queue later.
1029    async fn mark_as_wedged(
1030        &self,
1031        transaction_id: &TransactionId,
1032        reason: QueueWedgeError,
1033    ) -> Result<(), RoomSendQueueStorageError> {
1034        // Keep the lock until we're done touching the storage.
1035        let mut guard = self.store.lock().await;
1036        let was_being_sent = guard.being_sent.take();
1037
1038        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1039        if prev_txn != Some(transaction_id) {
            error!(prev_txn = ?prev_txn, "previous active request didn't match what we expected (after permanent error)");
1041        }
1042
1043        Ok(guard
1044            .client()?
1045            .store()
1046            .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1047            .await?)
1048    }
1049
1050    /// Marks a request identified with the given transaction id as being now
1051    /// unwedged and adds it back to the queue.
1052    async fn mark_as_unwedged(
1053        &self,
1054        transaction_id: &TransactionId,
1055    ) -> Result<(), RoomSendQueueStorageError> {
1056        Ok(self
1057            .store
1058            .lock()
1059            .await
1060            .client()?
1061            .store()
1062            .update_send_queue_request_status(&self.room_id, transaction_id, None)
1063            .await?)
1064    }
1065
1066    /// Marks a request pushed with [`Self::push`] and identified with the given
1067    /// transaction id as sent, by removing it from the local queue.
1068    async fn mark_as_sent(
1069        &self,
1070        transaction_id: &TransactionId,
1071        parent_key: SentRequestKey,
1072    ) -> Result<(), RoomSendQueueStorageError> {
1073        // Keep the lock until we're done touching the storage.
1074        let mut guard = self.store.lock().await;
1075        let was_being_sent = guard.being_sent.take();
1076
1077        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1078        if prev_txn != Some(transaction_id) {
            error!(prev_txn = ?prev_txn, "previous active request didn't match what we expected (after successful send)");
1080        }
1081
1082        let client = guard.client()?;
1083        let store = client.store();
1084
1085        // Update all dependent requests.
1086        store
1087            .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1088            .await?;
1089
1090        let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1091
1092        if !removed {
1093            warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1094        }
1095
1096        Ok(())
1097    }
1098
    /// Cancel a sending command for an event that has been queued with
    /// [`Self::push`] with the given transaction id.
1101    ///
1102    /// Returns whether the given transaction has been effectively removed. If
1103    /// false, this either means that the transaction id was unrelated to
1104    /// this queue, or that the request was sent before we cancelled it.
1105    async fn cancel_event(
1106        &self,
1107        transaction_id: &TransactionId,
1108    ) -> Result<bool, RoomSendQueueStorageError> {
1109        let guard = self.store.lock().await;
1110
1111        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1112            == Some(transaction_id)
1113        {
1114            // Save the intent to redact the event.
1115            guard
1116                .client()?
1117                .store()
1118                .save_dependent_queued_request(
1119                    &self.room_id,
1120                    transaction_id,
1121                    ChildTransactionId::new(),
1122                    MilliSecondsSinceUnixEpoch::now(),
1123                    DependentQueuedRequestKind::RedactEvent,
1124                )
1125                .await?;
1126
1127            return Ok(true);
1128        }
1129
1130        let removed = guard
1131            .client()?
1132            .store()
1133            .remove_send_queue_request(&self.room_id, transaction_id)
1134            .await?;
1135
1136        Ok(removed)
1137    }
1138
    /// Replace an event that has been queued with [`Self::push`] with the given
    /// transaction id, before it's actually been sent.
1141    ///
1142    /// Returns whether the given transaction has been effectively edited. If
1143    /// false, this either means that the transaction id was unrelated to
1144    /// this queue, or that the request was sent before we edited it.
1145    async fn replace_event(
1146        &self,
1147        transaction_id: &TransactionId,
1148        serializable: SerializableEventContent,
1149    ) -> Result<bool, RoomSendQueueStorageError> {
1150        let guard = self.store.lock().await;
1151
1152        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1153            == Some(transaction_id)
1154        {
1155            // Save the intent to edit the associated event.
1156            guard
1157                .client()?
1158                .store()
1159                .save_dependent_queued_request(
1160                    &self.room_id,
1161                    transaction_id,
1162                    ChildTransactionId::new(),
1163                    MilliSecondsSinceUnixEpoch::now(),
1164                    DependentQueuedRequestKind::EditEvent { new_content: serializable },
1165                )
1166                .await?;
1167
1168            return Ok(true);
1169        }
1170
1171        let edited = guard
1172            .client()?
1173            .store()
1174            .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1175            .await?;
1176
1177        Ok(edited)
1178    }
1179
    /// Push requests (and dependents) to upload a media file.
    ///
    /// See the module-level description for details of the whole process.
1183    #[allow(clippy::too_many_arguments)]
1184    async fn push_media(
1185        &self,
1186        event: RoomMessageEventContent,
1187        content_type: Mime,
1188        send_event_txn: OwnedTransactionId,
1189        created_at: MilliSecondsSinceUnixEpoch,
1190        upload_file_txn: OwnedTransactionId,
1191        file_media_request: MediaRequestParameters,
1192        thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
1193    ) -> Result<(), RoomSendQueueStorageError> {
1194        let guard = self.store.lock().await;
1195        let client = guard.client()?;
1196        let store = client.store();
1197        let thumbnail_info =
1198            if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) =
1199                thumbnail
1200            {
1201                let upload_thumbnail_txn = thumbnail_info.txn.clone();
1202
1203                // Save the thumbnail upload request.
1204                store
1205                    .save_send_queue_request(
1206                        &self.room_id,
1207                        upload_thumbnail_txn.clone(),
1208                        created_at,
1209                        QueuedRequestKind::MediaUpload {
1210                            content_type: thumbnail_content_type.to_string(),
1211                            cache_key: thumbnail_media_request,
1212                            thumbnail_source: None, // the thumbnail has no thumbnails :)
1213                            related_to: send_event_txn.clone(),
1214                        },
1215                        Self::LOW_PRIORITY,
1216                    )
1217                    .await?;
1218
1219                // Save the file upload request as a dependent request of the thumbnail upload.
1220                store
1221                    .save_dependent_queued_request(
1222                        &self.room_id,
1223                        &upload_thumbnail_txn,
1224                        upload_file_txn.clone().into(),
1225                        created_at,
1226                        DependentQueuedRequestKind::UploadFileWithThumbnail {
1227                            content_type: content_type.to_string(),
1228                            cache_key: file_media_request,
1229                            related_to: send_event_txn.clone(),
1230                        },
1231                    )
1232                    .await?;
1233
1234                Some(thumbnail_info)
1235            } else {
1236                // Save the file upload as its own request, not a dependent one.
1237                store
1238                    .save_send_queue_request(
1239                        &self.room_id,
1240                        upload_file_txn.clone(),
1241                        created_at,
1242                        QueuedRequestKind::MediaUpload {
1243                            content_type: content_type.to_string(),
1244                            cache_key: file_media_request,
1245                            thumbnail_source: None,
1246                            related_to: send_event_txn.clone(),
1247                        },
1248                        Self::LOW_PRIORITY,
1249                    )
1250                    .await?;
1251
1252                None
1253            };
1254
1255        // Push the dependent request for the event itself.
1256        store
1257            .save_dependent_queued_request(
1258                &self.room_id,
1259                &upload_file_txn,
1260                send_event_txn.into(),
1261                created_at,
1262                DependentQueuedRequestKind::FinishUpload {
1263                    local_echo: event,
1264                    file_upload: upload_file_txn.clone(),
1265                    thumbnail_info,
1266                },
1267            )
1268            .await?;
1269
1270        Ok(())
1271    }
1272
1273    /// Reacts to the given local echo of an event.
1274    #[instrument(skip(self))]
1275    async fn react(
1276        &self,
1277        transaction_id: &TransactionId,
1278        key: String,
1279        created_at: MilliSecondsSinceUnixEpoch,
1280    ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1281        let guard = self.store.lock().await;
1282        let client = guard.client()?;
1283        let store = client.store();
1284
1285        let requests = store.load_send_queue_requests(&self.room_id).await?;
1286
1287        // If the target event has been already sent, abort immediately.
1288        if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1289            // We didn't find it as a queued request; try to find it as a dependent queued
1290            // request.
1291            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1292            if !dependent_requests
1293                .into_iter()
1294                .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1295                .any(|child_txn| *child_txn == *transaction_id)
1296            {
1297                // We didn't find it as either a request or a dependent request, abort.
1298                return Ok(None);
1299            }
1300        }
1301
1302        // Record the dependent request.
1303        let reaction_txn_id = ChildTransactionId::new();
1304        store
1305            .save_dependent_queued_request(
1306                &self.room_id,
1307                transaction_id,
1308                reaction_txn_id.clone(),
1309                created_at,
1310                DependentQueuedRequestKind::ReactEvent { key },
1311            )
1312            .await?;
1313
1314        Ok(Some(reaction_txn_id))
1315    }
1316
1317    /// Returns a list of the local echoes, that is, all the requests that we're
1318    /// about to send but that haven't been sent yet (or are being sent).
1319    async fn local_echoes(
1320        &self,
1321        room: &RoomSendQueue,
1322    ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1323        let guard = self.store.lock().await;
1324        let client = guard.client()?;
1325        let store = client.store();
1326
1327        let local_requests =
1328            store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1329                Some(LocalEcho {
1330                    transaction_id: queued.transaction_id.clone(),
1331                    content: match queued.kind {
1332                        QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1333                            serialized_event: content,
1334                            send_handle: SendHandle {
1335                                room: room.clone(),
1336                                transaction_id: queued.transaction_id,
1337                                media_handles: None,
1338                                created_at: queued.created_at,
1339                            },
1340                            send_error: queued.error,
1341                        },
1342
1343                        QueuedRequestKind::MediaUpload { .. } => {
1344                            // Don't return uploaded medias as their own things; the accompanying
1345                            // event represented as a dependent request should be sufficient.
1346                            return None;
1347                        }
1348                    },
1349                })
1350            });
1351
1352        let reactions_and_medias = store
1353            .load_dependent_queued_requests(&self.room_id)
1354            .await?
1355            .into_iter()
1356            .filter_map(|dep| match dep.kind {
1357                DependentQueuedRequestKind::EditEvent { .. }
1358                | DependentQueuedRequestKind::RedactEvent => {
1359                    // TODO: reflect local edits/redacts too?
1360                    None
1361                }
1362
1363                DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1364                    transaction_id: dep.own_transaction_id.clone().into(),
1365                    content: LocalEchoContent::React {
1366                        key,
1367                        send_handle: SendReactionHandle {
1368                            room: room.clone(),
1369                            transaction_id: dep.own_transaction_id,
1370                        },
1371                        applies_to: dep.parent_transaction_id,
1372                    },
1373                }),
1374
1375                DependentQueuedRequestKind::UploadFileWithThumbnail { .. } => {
1376                    // Don't reflect these: only the associated event is interesting to observers.
1377                    None
1378                }
1379
1380                DependentQueuedRequestKind::FinishUpload {
1381                    local_echo,
1382                    file_upload,
1383                    thumbnail_info,
1384                } => {
1385                    // Materialize as an event local echo.
1386                    Some(LocalEcho {
1387                        transaction_id: dep.own_transaction_id.clone().into(),
1388                        content: LocalEchoContent::Event {
1389                            serialized_event: SerializableEventContent::new(&local_echo.into())
1390                                .ok()?,
1391                            send_handle: SendHandle {
1392                                room: room.clone(),
1393                                transaction_id: dep.own_transaction_id.into(),
1394                                media_handles: Some(MediaHandles {
1395                                    upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1396                                    upload_file_txn: file_upload,
1397                                }),
1398                                created_at: dep.created_at,
1399                            },
1400                            send_error: None,
1401                        },
1402                    })
1403                }
1404            });
1405
1406        Ok(local_requests.chain(reactions_and_medias).collect())
1407    }
1408
1409    /// Try to apply a single dependent request, whether it's local or remote.
1410    ///
1411    /// This swallows errors that would retrigger every time if we retried
1412    /// applying the dependent request: invalid edit content, etc.
1413    ///
1414    /// Returns true if the dependent request has been sent (or should not be
1415    /// retried later).
1416    #[instrument(skip_all)]
1417    async fn try_apply_single_dependent_request(
1418        &self,
1419        client: &Client,
1420        dependent_request: DependentQueuedRequest,
1421        new_updates: &mut Vec<RoomSendQueueUpdate>,
1422    ) -> Result<bool, RoomSendQueueError> {
1423        let store = client.store();
1424
1425        let parent_key = dependent_request.parent_key;
1426
1427        match dependent_request.kind {
1428            DependentQueuedRequestKind::EditEvent { new_content } => {
1429                if let Some(parent_key) = parent_key {
1430                    let Some(event_id) = parent_key.into_event_id() else {
1431                        return Err(RoomSendQueueError::StorageError(
1432                            RoomSendQueueStorageError::InvalidParentKey,
1433                        ));
1434                    };
1435
1436                    // The parent event has been sent, so send an edit event.
1437                    let room = client
1438                        .get_room(&self.room_id)
1439                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
1440
1441                    // Check the event is one we know how to edit with an edit event.
1442
1443                    // It must be deserializable…
1444                    let edited_content = match new_content.deserialize() {
1445                        Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
1446                            // Assume no relationships.
1447                            EditedContent::RoomMessage(c.into())
1448                        }
1449
1450                        Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
1451                            let poll_start = c.poll_start().clone();
1452                            EditedContent::PollStart {
1453                                fallback_text: poll_start.question.text.clone(),
1454                                new_content: poll_start,
1455                            }
1456                        }
1457
1458                        Ok(c) => {
1459                            warn!("Unsupported edit content type: {:?}", c.event_type());
1460                            return Ok(true);
1461                        }
1462
1463                        Err(err) => {
1464                            warn!("Unable to deserialize: {err}");
1465                            return Ok(true);
1466                        }
1467                    };
1468
1469                    let edit_event = match room.make_edit_event(&event_id, edited_content).await {
1470                        Ok(e) => e,
1471                        Err(err) => {
1472                            warn!("couldn't create edited event: {err}");
1473                            return Ok(true);
1474                        }
1475                    };
1476
1477                    // Queue the edit event in the send queue 🧠.
1478                    let serializable = SerializableEventContent::from_raw(
1479                        Raw::new(&edit_event)
1480                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1481                        edit_event.event_type().to_string(),
1482                    );
1483
1484                    store
1485                        .save_send_queue_request(
1486                            &self.room_id,
1487                            dependent_request.own_transaction_id.into(),
1488                            dependent_request.created_at,
1489                            serializable.into(),
1490                            Self::HIGH_PRIORITY,
1491                        )
1492                        .await
1493                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1494                } else {
1495                    // The parent event is still local; update the local echo.
1496                    let edited = store
1497                        .update_send_queue_request(
1498                            &self.room_id,
1499                            &dependent_request.parent_transaction_id,
1500                            new_content.into(),
1501                        )
1502                        .await
1503                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1504
1505                    if !edited {
1506                        warn!("missing local echo upon dependent edit");
1507                    }
1508                }
1509            }
1510
            DependentQueuedRequestKind::RedactEvent => {
                if let Some(parent_key) = parent_key {
                    let Some(event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    // The parent event has been sent; send a redaction.
                    let room = client
                        .get_room(&self.room_id)
                        .ok_or(RoomSendQueueError::RoomDisappeared)?;

                    // Ideally we'd use the send queue to send the redaction, but the protocol has
                    // changed the shape of a room.redaction after v11, so keep it simple and try
                    // once here.

                    // Note: no reason is provided because we materialize the intent of "cancel
                    // sending the parent event".

                    if let Err(err) = room
                        .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
                        .await
                    {
                        warn!("error when sending a redact for {event_id}: {err}");
                        return Ok(false);
                    }
                } else {
                    // The parent event is still local (sending must have failed); redact the local
                    // echo.
                    let removed = store
                        .remove_send_queue_request(
                            &self.room_id,
                            &dependent_request.parent_transaction_id,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;

                    if !removed {
                        warn!("missing local echo upon dependent redact");
                    }
                }
            }

            DependentQueuedRequestKind::ReactEvent { key } => {
                if let Some(parent_key) = parent_key {
                    let Some(parent_event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    // Queue the reaction event in the send queue 🧠.
                    let react_event =
                        ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
                    let serializable = SerializableEventContent::from_raw(
                        Raw::new(&react_event)
                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                        react_event.event_type().to_string(),
                    );

                    store
                        .save_send_queue_request(
                            &self.room_id,
                            dependent_request.own_transaction_id.into(),
                            dependent_request.created_at,
                            serializable.into(),
                            Self::HIGH_PRIORITY,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
                } else {
                    // Not applied yet, we should retry later => false.
                    return Ok(false);
                }
            }

            DependentQueuedRequestKind::UploadFileWithThumbnail {
                content_type,
                cache_key,
                related_to,
            } => {
                let Some(parent_key) = parent_key else {
                    // Not finished yet, we should retry later => false.
                    return Ok(false);
                };
                self.handle_dependent_file_upload_with_thumbnail(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    content_type,
                    cache_key,
                    related_to,
                )
                .await?;
            }

            DependentQueuedRequestKind::FinishUpload {
                local_echo,
                file_upload,
                thumbnail_info,
            } => {
                let Some(parent_key) = parent_key else {
                    // Not finished yet, we should retry later => false.
                    return Ok(false);
                };
                self.handle_dependent_finish_upload(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    local_echo,
                    file_upload,
                    thumbnail_info,
                    new_updates,
                )
                .await?;
            }
        }

        Ok(true)
    }

    #[instrument(skip(self))]
    async fn apply_dependent_requests(
        &self,
        new_updates: &mut Vec<RoomSendQueueUpdate>,
    ) -> Result<(), RoomSendQueueError> {
        let guard = self.store.lock().await;

        let client = guard.client()?;
        let store = client.store();

        let dependent_requests = store
            .load_dependent_queued_requests(&self.room_id)
            .await
            .map_err(RoomSendQueueStorageError::StateStoreError)?;

        let num_initial_dependent_requests = dependent_requests.len();
        if num_initial_dependent_requests == 0 {
            // Returning early here avoids a bit of useless logging.
            return Ok(());
        }

        let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);

        // Get rid of all the non-canonical dependent events.
        for original in &dependent_requests {
            if !canonicalized_dependent_requests
                .iter()
                .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
            {
                store
                    .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
            }
        }

        let mut num_dependent_requests = canonicalized_dependent_requests.len();

        debug!(
            num_dependent_requests,
            num_initial_dependent_requests, "starting handling of dependent requests"
        );

        for dependent in canonicalized_dependent_requests {
            let dependent_id = dependent.own_transaction_id.clone();

            match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
                Ok(should_remove) => {
                    if should_remove {
                        // The dependent request has been successfully applied, forget about it.
                        store
                            .remove_dependent_queued_request(&self.room_id, &dependent_id)
                            .await
                            .map_err(RoomSendQueueStorageError::StateStoreError)?;

                        num_dependent_requests -= 1;
                    }
                }

                Err(err) => {
                    warn!("error when applying single dependent request: {err}");
                }
            }
        }

        debug!(
            leftover_dependent_requests = num_dependent_requests,
            "stopped handling dependent request"
        );

        Ok(())
    }

    /// Remove a single dependent request from storage.
    async fn remove_dependent_send_queue_request(
        &self,
        dependent_event_id: &ChildTransactionId,
    ) -> Result<bool, RoomSendQueueStorageError> {
        Ok(self
            .store
            .lock()
            .await
            .client()?
            .store()
            .remove_dependent_queued_request(&self.room_id, dependent_event_id)
            .await?)
    }
}

/// The content of a local echo.
#[derive(Clone, Debug)]
pub enum LocalEchoContent {
    /// The local echo contains an actual event ready to display.
    Event {
        /// Content of the event itself (along with its type) that we are about
        /// to send.
        serialized_event: SerializableEventContent,
        /// A handle to manipulate the sending of the associated event.
        send_handle: SendHandle,
        /// Whether trying to send this local echo failed in the past with an
        /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
        send_error: Option<QueueWedgeError>,
    },

    /// A local echo has been reacted to.
    React {
        /// The key with which the local echo has been reacted to.
        key: String,
        /// A handle to manipulate the sending of the reaction.
        send_handle: SendReactionHandle,
        /// The local echo which has been reacted to.
        applies_to: OwnedTransactionId,
    },
}

/// A local representation for a request that hasn't been sent yet to the user's
/// homeserver.
#[derive(Clone, Debug)]
pub struct LocalEcho {
    /// Transaction id used to identify the associated request.
    pub transaction_id: OwnedTransactionId,
    /// The content for the local echo.
    pub content: LocalEchoContent,
}

/// An update to a room send queue, observable with
/// [`RoomSendQueue::subscribe`].
#[derive(Clone, Debug)]
pub enum RoomSendQueueUpdate {
    /// A new local event is being sent.
    ///
    /// There's been a user query to create this event. It is being sent to the
    /// server.
    NewLocalEvent(LocalEcho),

    /// A local event that hadn't been sent to the server yet has been cancelled
    /// before sending.
    CancelledLocalEvent {
        /// Transaction id used to identify this event.
        transaction_id: OwnedTransactionId,
    },

    /// A local event's content has been replaced with something else.
    ReplacedLocalEvent {
        /// Transaction id used to identify this event.
        transaction_id: OwnedTransactionId,

        /// The new content replacing the previous one.
        new_content: SerializableEventContent,
    },

    /// An error happened when an event was being sent.
    ///
    /// The event has not been removed from the queue. If the error is
    /// recoverable, the room's send queue is disabled and must be manually
    /// re-enabled.
    SendError {
        /// Transaction id used to identify this event.
        transaction_id: OwnedTransactionId,
        /// Error received while sending the event.
        error: Arc<crate::Error>,
        /// Whether the error is considered recoverable or not.
        ///
        /// An error that's recoverable will disable the room's send queue,
        /// while an unrecoverable error will be parked, until the user
        /// decides to cancel sending it.
        is_recoverable: bool,
    },

    /// The event has been unwedged and sending is now being retried.
    RetryEvent {
        /// Transaction id used to identify this event.
        transaction_id: OwnedTransactionId,
    },

    /// The event has been sent to the server, and the query returned
    /// successfully.
    SentEvent {
        /// Transaction id used to identify this event.
        transaction_id: OwnedTransactionId,
        /// Received event id from the send response.
        event_id: OwnedEventId,
    },

    /// A media file has been successfully uploaded.
    UploadedMedia {
        /// The media event this uploaded media relates to.
        related_to: OwnedTransactionId,

        /// The final media source for the file that was just uploaded.
        file: MediaSource,
    },
}

/// An error triggered by the send queue module.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueError {
    /// The room isn't in the joined state.
    #[error("the room isn't in the joined state")]
    RoomNotJoined,

    /// The room is missing from the client.
    ///
    /// This only happens when the client is shutting down.
    #[error("the room is now missing from the client")]
    RoomDisappeared,

    /// Error coming from storage.
    #[error(transparent)]
    StorageError(#[from] RoomSendQueueStorageError),
}

/// An error triggered by the send queue storage.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueStorageError {
    /// Error caused by the state store.
    #[error(transparent)]
    StateStoreError(#[from] StoreError),

    /// Error caused by the event cache store.
    #[error(transparent)]
    EventCacheStoreError(#[from] EventCacheStoreError),

    /// Error caused when attempting to get a handle on the event cache store.
    #[error(transparent)]
    LockError(#[from] LockStoreError),

    /// Error caused when (de)serializing into/from json.
    #[error(transparent)]
    JsonSerialization(#[from] serde_json::Error),

    /// A parent key was expected to be of a certain type, and it was another
    /// type instead.
    #[error("a dependent event had an invalid parent key type")]
    InvalidParentKey,

    /// The client is shutting down.
    #[error("The client is shutting down.")]
    ClientShuttingDown,

    /// An operation not implemented on a send handle.
    #[error("This operation is not implemented for media uploads")]
    OperationNotImplementedYet,

    /// Trying to edit a media caption for something that's not a media.
    #[error("Can't edit a media caption when the underlying event isn't a media")]
    InvalidMediaCaptionEdit,
}

/// Extra transaction IDs useful during an upload.
#[derive(Clone, Debug)]
struct MediaHandles {
    /// Transaction id used when uploading the thumbnail.
    ///
    /// Optional because a media can be uploaded without a thumbnail.
    upload_thumbnail_txn: Option<OwnedTransactionId>,

    /// Transaction id used when uploading the media itself.
    upload_file_txn: OwnedTransactionId,
}

/// A handle to manipulate an event that was scheduled to be sent to a room.
#[derive(Clone, Debug)]
pub struct SendHandle {
    /// Link to the send queue used to send this request.
    room: RoomSendQueue,

    /// Transaction id used for the sent request.
    ///
    /// If this is a media upload, this is the "main" transaction id, i.e. the
    /// one used to send the event, and that will be seen by observers.
    transaction_id: OwnedTransactionId,

    /// Additional handles for a media upload.
    media_handles: Option<MediaHandles>,

    /// The time at which the event to be sent has been created.
    pub created_at: MilliSecondsSinceUnixEpoch,
}

impl SendHandle {
    fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
        if self.media_handles.is_some() {
            Err(RoomSendQueueStorageError::OperationNotImplementedYet)
        } else {
            Ok(())
        }
    }

    /// Aborts the sending of the event, if it wasn't sent yet.
    ///
    /// Returns true if the sending could be aborted, false if not (i.e. the
    /// event had already been sent).
    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
        trace!("received an abort request");

        let queue = &self.room.inner.queue;

        if let Some(handles) = &self.media_handles {
            if queue.abort_upload(&self.transaction_id, handles).await? {
                // Propagate a cancelled update.
                let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
                    transaction_id: self.transaction_id.clone(),
                });

                return Ok(true);
            }

            // If it failed, it means the sending of the event is not a
            // dependent request anymore. Fall back to the regular
            // code path below, that handles aborting sending of an event.
        }

        if queue.cancel_event(&self.transaction_id).await? {
            trace!("successful abort");

            // Propagate a cancelled update too.
            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
                transaction_id: self.transaction_id.clone(),
            });

            Ok(true)
        } else {
            debug!("local echo didn't exist anymore, can't abort");
            Ok(false)
        }
    }

    /// Edits the content of a local echo with a raw event content.
    ///
    /// Returns true if the event to be sent was replaced, false if not (i.e.
    /// the event had already been sent).
    #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
    pub async fn edit_raw(
        &self,
        new_content: Raw<AnyMessageLikeEventContent>,
        event_type: String,
    ) -> Result<bool, RoomSendQueueStorageError> {
        trace!("received an edit request");
        self.nyi_for_uploads()?;

        let serializable = SerializableEventContent::from_raw(new_content, event_type);

        if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
            trace!("successful edit");

            // Wake up the queue, in case the room was asleep before the edit.
            self.room.inner.notifier.notify_one();

            // Propagate a replaced update too.
            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
                transaction_id: self.transaction_id.clone(),
                new_content: serializable,
            });

            Ok(true)
        } else {
            debug!("local echo doesn't exist anymore, can't edit");
            Ok(false)
        }
    }

    /// Edits the content of a local echo with an event content.
    ///
    /// Returns true if the event to be sent was replaced, false if not (i.e.
    /// the event had already been sent).
    pub async fn edit(
        &self,
        new_content: AnyMessageLikeEventContent,
    ) -> Result<bool, RoomSendQueueStorageError> {
        self.edit_raw(
            Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
            new_content.event_type().to_string(),
        )
        .await
    }

    /// Edits the content of a local echo with a media caption.
    ///
    /// Will fail if the event to be sent, represented by this send handle,
    /// wasn't a media.
    pub async fn edit_media_caption(
        &self,
        caption: Option<String>,
        formatted_caption: Option<FormattedBody>,
        mentions: Option<Mentions>,
    ) -> Result<bool, RoomSendQueueStorageError> {
        if let Some(new_content) = self
            .room
            .inner
            .queue
            .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
            .await?
        {
            trace!("successful edit of media caption");

            // Wake up the queue, in case the room was asleep before the edit.
            self.room.inner.notifier.notify_one();

            let new_content = SerializableEventContent::new(&new_content)
                .map_err(RoomSendQueueStorageError::JsonSerialization)?;

            // Propagate a replaced update too.
            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
                transaction_id: self.transaction_id.clone(),
                new_content,
            });

            Ok(true)
        } else {
            debug!("local echo doesn't exist anymore, can't edit media caption");
            Ok(false)
        }
    }

    /// Unwedge a local echo identified by its transaction identifier and try to
    /// resend it.
    pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
        let room = &self.room.inner;
        room.queue
            .mark_as_unwedged(&self.transaction_id)
            .await
            .map_err(RoomSendQueueError::StorageError)?;

        // If we have media handles, also try to unwedge them.
        //
        // It's fine to always do it to *all* the transaction IDs at once, because only
        // one of the three requests will be active at the same time, i.e. only
        // one entry will be updated in the store. The other two are either
        // done, or dependent requests.

        if let Some(handles) = &self.media_handles {
            room.queue
                .mark_as_unwedged(&handles.upload_file_txn)
                .await
                .map_err(RoomSendQueueError::StorageError)?;

            if let Some(txn) = &handles.upload_thumbnail_txn {
                room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
            }
        }

        // Wake up the queue, in case the room was asleep before unwedging the request.
        room.notifier.notify_one();

        let _ = room
            .updates
            .send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() });

        Ok(())
    }

    /// Send a reaction to the event as soon as it's sent.
    ///
    /// Returns `Ok(None)` if the reaction couldn't be sent because the event is
    /// already a remote one.
    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
    pub async fn react(
        &self,
        key: String,
    ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
        trace!("received an intent to react");

        let created_at = MilliSecondsSinceUnixEpoch::now();
        if let Some(reaction_txn_id) =
            self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
        {
            trace!("successfully queued react");

            // Wake up the queue, in case the room was asleep before the sending.
            self.room.inner.notifier.notify_one();

            // Propagate a new local event.
            let send_handle = SendReactionHandle {
                room: self.room.clone(),
                transaction_id: reaction_txn_id.clone(),
            };

            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
                // Note: we do want to use the txn_id we're going to use for the reaction, not the
                // one for the event we're reacting to.
                transaction_id: reaction_txn_id.into(),
                content: LocalEchoContent::React {
                    key,
                    send_handle: send_handle.clone(),
                    applies_to: self.transaction_id.clone(),
                },
            }));

            Ok(Some(send_handle))
        } else {
            debug!("local echo doesn't exist anymore, can't react");
            Ok(None)
        }
    }
}

/// A handle to execute actions on the sending of a reaction.
#[derive(Clone, Debug)]
pub struct SendReactionHandle {
    /// Reference to the send queue for the room where this reaction was sent.
    room: RoomSendQueue,
    /// The own transaction id for the reaction.
    transaction_id: ChildTransactionId,
}

impl SendReactionHandle {
    /// Abort the sending of the reaction.
    ///
    /// Will return true if the reaction could be aborted, false if it's been
    /// sent (and there's no matching local echo anymore).
    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
        if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
            // Simple case: the reaction was found in the dependent event list.

            // Propagate a cancelled update too.
            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
                transaction_id: self.transaction_id.clone().into(),
            });

            return Ok(true);
        }

        // The reaction has already been queued for sending, try to abort it using a
        // regular abort.
        let handle = SendHandle {
            room: self.room.clone(),
            transaction_id: self.transaction_id.clone().into(),
            media_handles: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        handle.abort().await
    }

    /// The transaction id that will be used to send this reaction later.
    pub fn transaction_id(&self) -> &TransactionId {
        &self.transaction_id
    }
}

/// From a given source of [`DependentQueuedRequest`], return only the most
/// meaningful, i.e. the ones that wouldn't be overridden after applying the
/// others.
fn canonicalize_dependent_requests(
    dependent: &[DependentQueuedRequest],
) -> Vec<DependentQueuedRequest> {
    let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();

    for d in dependent {
        let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();

        if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
            // The parent event has already been flagged for redaction, don't consider the
            // other dependent events.
            continue;
        }

        match &d.kind {
            DependentQueuedRequestKind::EditEvent { .. } => {
                // Replace any previous edit with this one.
                if let Some(prev_edit) = prevs
                    .iter_mut()
                    .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
                {
                    *prev_edit = d;
                } else {
                    prevs.insert(0, d);
                }
            }

            DependentQueuedRequestKind::UploadFileWithThumbnail { .. }
            | DependentQueuedRequestKind::FinishUpload { .. }
            | DependentQueuedRequestKind::ReactEvent { .. } => {
                // These requests can't be canonicalized, push them as is.
                prevs.push(d);
            }

            DependentQueuedRequestKind::RedactEvent => {
                // Remove every other dependent action.
                prevs.clear();
                prevs.push(d);
            }
        }
    }

    by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
}

#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use std::{sync::Arc, time::Duration};

    use assert_matches2::{assert_let, assert_matches};
    use matrix_sdk_base::store::{
        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
        SerializableEventContent,
    };
    use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder};
    use ruma::{
        events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
        room_id, MilliSecondsSinceUnixEpoch, TransactionId,
    };

    use super::canonicalize_dependent_requests;
    use crate::{client::WeakClient, test_utils::logged_in_client};

    #[test]
    fn test_canonicalize_dependent_events_created_at() {
        // Test to ensure the created_at field is being serialized and retrieved
        // correctly.
        let txn = TransactionId::new();
        let created_at = MilliSecondsSinceUnixEpoch::now();

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at,
        };

        let res = canonicalize_dependent_requests(&[edit]);

        assert_eq!(res.len(), 1);
        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
        assert_let!(
            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
        );
        assert_eq!(msg.body(), "edit");
        assert_eq!(res[0].parent_transaction_id, txn);
        assert_eq!(res[0].created_at, created_at);
    }

    #[async_test]
    async fn test_client_no_cycle_with_send_queue() {
        for enabled in [true, false] {
            let client = logged_in_client(None).await;
            let weak_client = WeakClient::from_client(&client);

            {
                let mut sync_response_builder = SyncResponseBuilder::new();

                let room_id = room_id!("!a:b.c");

                // Make sure the client knows about the room.
                client
                    .base_client()
                    .receive_sync_response(
                        sync_response_builder
                            .add_joined_room(JoinedRoomBuilder::new(room_id))
                            .build_sync_response(),
                    )
                    .await
                    .unwrap();

                let room = client.get_room(room_id).unwrap();
                let q = room.send_queue();

                let _watcher = q.subscribe().await;

                client.send_queue().set_enabled(enabled).await;
            }

            drop(client);

            // Give a bit of time for background tasks to die.
            tokio::time::sleep(Duration::from_millis(500)).await;

            // The weak client must be the last reference to the client now.
            let client = weak_client.get();
            assert!(
                client.is_none(),
                "too many strong references to the client: {}",
                Arc::strong_count(&client.unwrap().inner)
            );
        }
    }

2317    #[test]
    fn test_canonicalize_dependent_events_smoke_test() {
        // Smoke test: canonicalizing a single dependent event returns it.
        let txn = TransactionId::new();

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };
        let res = canonicalize_dependent_requests(&[edit]);

        assert_eq!(res.len(), 1);
        assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
        assert_eq!(res[0].parent_transaction_id, txn);
        assert!(res[0].parent_key.is_none());
    }

    #[test]
    fn test_canonicalize_dependent_events_redaction_preferred() {
        // A redaction is preferred over any other kind of dependent event.
        let txn = TransactionId::new();

        let mut inputs = Vec::with_capacity(100);
        let redact = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::RedactEvent,
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        inputs.push({
            let mut edit = edit.clone();
            edit.own_transaction_id = ChildTransactionId::new();
            edit
        });

        inputs.push(redact);

        for _ in 0..98 {
            let mut edit = edit.clone();
            edit.own_transaction_id = ChildTransactionId::new();
            inputs.push(edit);
        }

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 1);
        assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
        assert_eq!(res[0].parent_transaction_id, txn);
    }

    #[test]
    fn test_canonicalize_dependent_events_last_edit_preferred() {
        let parent_txn = TransactionId::new();

        // The latest edit of a list is always preferred.
        let inputs = (0..10)
            .map(|i| DependentQueuedRequest {
                own_transaction_id: ChildTransactionId::new(),
                parent_transaction_id: parent_txn.clone(),
                kind: DependentQueuedRequestKind::EditEvent {
                    new_content: SerializableEventContent::new(
                        &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
                    )
                    .unwrap(),
                },
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            })
            .collect::<Vec<_>>();

        let txn = inputs[9].parent_transaction_id.clone();

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 1);
        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
        assert_let!(
            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
        );
        assert_eq!(msg.body(), "edit9");
        assert_eq!(res[0].parent_transaction_id, txn);
    }

    #[test]
    fn test_canonicalize_multiple_local_echoes() {
        let txn1 = TransactionId::new();
        let txn2 = TransactionId::new();

        let child1 = ChildTransactionId::new();
        let child2 = ChildTransactionId::new();

        let inputs = vec![
            // This one pertains to txn1.
            DependentQueuedRequest {
                own_transaction_id: child1.clone(),
                kind: DependentQueuedRequestKind::RedactEvent,
                parent_transaction_id: txn1.clone(),
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            },
            // This one pertains to txn2.
            DependentQueuedRequest {
                own_transaction_id: child2,
                kind: DependentQueuedRequestKind::EditEvent {
                    new_content: SerializableEventContent::new(
                        &RoomMessageEventContent::text_plain("edit").into(),
                    )
                    .unwrap(),
                },
                parent_transaction_id: txn2.clone(),
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            },
        ];

        let res = canonicalize_dependent_requests(&inputs);

        // Canonicalization is done per parent transaction, so requests
        // targeting different parents must both be kept.
        assert_eq!(res.len(), 2);

        for dependent in res {
            if dependent.own_transaction_id == child1 {
                assert_eq!(dependent.parent_transaction_id, txn1);
                assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
            } else {
                assert_eq!(dependent.parent_transaction_id, txn2);
                assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
            }
        }
    }

    #[test]
    fn test_canonicalize_reactions_after_edits() {
        // Sending reactions should happen after edits to a given event.
        let txn = TransactionId::new();

        let react_id = ChildTransactionId::new();
        let react = DependentQueuedRequest {
            own_transaction_id: react_id.clone(),
            kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
            parent_transaction_id: txn.clone(),
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let edit_id = ChildTransactionId::new();
        let edit = DependentQueuedRequest {
            own_transaction_id: edit_id.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_transaction_id: txn,
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let res = canonicalize_dependent_requests(&[react, edit]);

        assert_eq!(res.len(), 2);
        assert_eq!(res[0].own_transaction_id, edit_id);
        assert_eq!(res[1].own_transaction_id, react_id);
    }
}