matrix_sdk/send_queue/mod.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A send queue facility to serialize the queuing and sending of messages.
//!
//! # [`Room`] send queue
//!
//! Each room gets its own [`RoomSendQueue`], which is available by calling
//! [`Room::send_queue()`]. The first time this method is called, it will spawn
//! a background task that's used to actually send events, in the order they
//! were passed from calls to [`RoomSendQueue::send()`].
//!
//! This queue tries to simplify error management around sending events, using
//! [`RoomSendQueue::send`] or [`RoomSendQueue::send_raw`]: by default, it will
//! retry sending the same event a few times before automatically disabling
//! itself, and emitting a notification that can be listened to with the global
//! send queue (see paragraph below) or using [`RoomSendQueue::subscribe()`].
//!
//! It is possible to control whether a single room is enabled using
//! [`RoomSendQueue::set_enabled()`].
//!
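//! ## Example
//!
//! A minimal sketch of the basic flow, assuming an already joined `room:
//! Room`; the method names come from this module, but the surrounding setup is
//! illustrative only:
//!
//! ```rust,no_run
//! # async fn example(room: matrix_sdk::Room) -> anyhow::Result<()> {
//! use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
//!
//! let queue = room.send_queue();
//!
//! // Watch both the existing local echoes and future updates.
//! let (_local_echoes, mut updates) = queue.subscribe().await?;
//!
//! // Queue an event: this returns quickly, the actual sending happens in the
//! // background.
//! let _handle = queue.send(RoomMessageEventContent::text_plain("hello").into()).await?;
//!
//! // Updates about the sending (sent, error, etc.) arrive on the channel.
//! while let Ok(update) = updates.recv().await {
//!     println!("send queue update: {update:?}");
//! }
//! # Ok(())
//! # }
//! ```
//!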
//! # Global [`SendQueue`] object
//!
//! The [`Client::send_queue()`] method returns an API object that allows
//! controlling all the room send queues:
//!
//! - enable/disable them all at once with [`SendQueue::set_enabled()`].
//! - get notifications about send errors with [`SendQueue::subscribe_errors`].
//! - reload all unsent events that had been persisted in storage using
//!   [`SendQueue::respawn_tasks_for_rooms_with_unsent_requests()`]. It is
//!   recommended to call this method during initialization of a client,
//!   otherwise persisted unsent events will only be re-sent after the send
//!   queue for the given room has been reopened for the first time.
//!
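//! For instance, during client initialization, one might do something along
//! these lines (a sketch: the names come from this module, the surrounding
//! setup is illustrative only):
//!
//! ```rust,no_run
//! # async fn example(client: matrix_sdk::Client) {
//! let send_queue = client.send_queue();
//!
//! // Listen to errors reported by any room's send queue.
//! let mut errors = send_queue.subscribe_errors();
//! tokio::spawn(async move {
//!     while let Ok(report) = errors.recv().await {
//!         eprintln!("send queue error in {}: {}", report.room_id, report.error);
//!     }
//! });
//!
//! // Re-spawn the sending tasks for rooms that still have unsent requests
//! // from a previous session.
//! send_queue.respawn_tasks_for_rooms_with_unsent_requests().await;
//! # }
//! ```
//!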
//! # Send handle
//!
//! Just after queuing a request to send something, a [`SendHandle`] is
//! returned, allowing manipulation of the in-flight request.
//!
//! With a send handle for an event, it's possible to edit the event or abort
//! sending it. If it was still in the queue (i.e. not sent yet, or not being
//! sent), then such an action happens locally (i.e. in the database).
//! Otherwise, it is "too late": the background task may already be sending
//! the event, or may have sent it; in that case, the edit/abort must
//! materialize as an actual event on the server. To accomplish this, the send
//! queue may send such an event, using the dependency system described below.
//!
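//! A sketch of aborting a queued event through its handle (the method exists
//! in this module; whether the abort happens locally or requires a server-side
//! event depends on how far the sending has progressed):
//!
//! ```rust,no_run
//! # async fn example(queue: matrix_sdk::send_queue::RoomSendQueue) -> anyhow::Result<()> {
//! use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
//!
//! let handle = queue.send(RoomMessageEventContent::text_plain("oops").into()).await?;
//!
//! // Try to abort sending the event; the returned boolean indicates whether
//! // the request was actually aborted.
//! let aborted = handle.abort().await?;
//! println!("aborted: {aborted}");
//! # Ok(())
//! # }
//! ```
//!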
//! # Dependency system
//!
//! The send queue includes a simple dependency system, where a
//! [`QueuedRequest`] can have zero or more dependents in the form of
//! [`DependentQueuedRequest`]. A dependent queued request can have at most one
//! depended-upon (parent) queued request.
//!
//! This allows implementing deferred edits/redacts, as hinted at in the
//! previous section.
//!
//! ## Media upload
//!
//! This dependency system also allows uploading media, since the media's
//! *content* must be uploaded before we send the media *event* that describes
//! it.
//!
//! In the simplest case, a media file and its event must be sent (i.e. there
//! is no thumbnail):
//!
//! - The file's content is immediately cached in the
//!   [`matrix_sdk_base::event_cache::store::EventCacheStore`], using an MXC ID
//!   that is temporary and designates a local URI without any possible doubt.
//! - An initial media event is created, uses this temporary MXC ID, and is
//!   propagated as a local echo for an event.
//! - A [`QueuedRequest`] is pushed to upload the file's media
//!   ([`QueuedRequestKind::MediaUpload`]).
//! - A [`DependentQueuedRequest`] is pushed to finish the upload
//!   ([`DependentQueuedRequestKind::FinishUpload`]).
//!
//! What is expected to happen, if all goes well, is the following:
//!
//! - the media is uploaded to the media homeserver, which returns the final MXC
//!   ID.
//! - when marking the upload request as sent, the MXC ID is injected (as a
//!   [`matrix_sdk_base::store::SentRequestKey`]) into the dependent request
//!   [`DependentQueuedRequestKind::FinishUpload`] created in the last step
//!   above.
//! - next time the send queue handles dependent requests, it'll see this one is
//!   ready to be sent, and it will transform it into an event queued request
//!   ([`QueuedRequestKind::Event`]), with the event created in the local echo
//!   before, updated with the MXC ID returned from the server.
//! - this updated local echo is also propagated as an edit of the local echo to
//!   observers, who get the final version with the final MXC IDs at this point
//!   too.
//! - then the event is sent normally, as any event sent with the send queue.
//!
//! When there is a thumbnail, things behave similarly, with some tweaks:
//!
//! - the thumbnail's content is also stored into the cache store immediately,
//! - the thumbnail is sent first as a [`QueuedRequestKind::MediaUpload`]
//!   request,
//! - the file upload is pushed as a dependent request of kind
//!   [`DependentQueuedRequestKind::UploadFileOrThumbnail`] (this variant keeps
//!   the file's key used to look it up in the cache store).
//! - the media event is then sent as a dependent request as described in the
//!   previous section.
//!
//! What's expected to happen is thus the following:
//!
//! - After the thumbnail has been uploaded, the dependent request will retrieve
//!   the final MXC ID returned by the homeserver for the thumbnail, and store
//!   it into the [`QueuedRequestKind::MediaUpload`]'s `thumbnail_source` field,
//!   allowing us to remember the thumbnail MXC ID when it's time to finish the
//!   upload later.
//! - The dependent request is morphed into another
//!   [`QueuedRequestKind::MediaUpload`], for the file itself.
//!
//! The rest of the process is then similar to that of uploading a file without
//! a thumbnail. The only difference is that there's a thumbnail source (MXC ID)
//! remembered and fixed up into the media event, just before sending it.
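//!
//! From the caller's point of view, all of this is hidden behind the upload
//! helper in the `upload` submodule. A hedged sketch follows; the exact
//! `send_attachment` parameters shown here are assumptions, not a reference:
//!
//! ```ignore
//! # async fn example(room: matrix_sdk::Room) -> anyhow::Result<()> {
//! use matrix_sdk::attachment::AttachmentConfig;
//!
//! let queue = room.send_queue();
//! let data = std::fs::read("cat.jpg")?;
//!
//! // Queues the media upload: the content is cached locally, then uploaded,
//! // and the media event is sent as a dependent request once the upload is
//! // done.
//! let _handle = queue
//!     .send_attachment("cat.jpg", mime::IMAGE_JPEG, data, AttachmentConfig::new())
//!     .await?;
//! # Ok(())
//! # }
//! ```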
130
131use std::{
132    collections::{BTreeMap, HashMap},
133    str::FromStr as _,
134    sync::{
135        Arc, RwLock,
136        atomic::{AtomicBool, Ordering},
137    },
138};
139
140use eyeball::SharedObservable;
141#[cfg(feature = "e2e-encryption")]
142use matrix_sdk_base::crypto::{OlmError, SessionRecipientCollectionError};
143#[cfg(feature = "unstable-msc4274")]
144use matrix_sdk_base::store::FinishGalleryItemInfo;
145use matrix_sdk_base::{
146    RoomState, StoreError,
147    cross_process_lock::CrossProcessLockError,
148    deserialized_responses::TimelineEvent,
149    event_cache::store::EventCacheStoreError,
150    media::{MediaRequestParameters, store::MediaStoreError},
151    store::{
152        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
153        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
154        SentMediaInfo, SentRequestKey, SerializableEventContent,
155    },
156};
157use matrix_sdk_common::{
158    executor::{JoinHandle, spawn},
159    locks::Mutex as SyncMutex,
160};
161use mime::Mime;
162use ruma::{
163    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
164    TransactionId,
165    events::{
166        AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
167        reaction::ReactionEventContent,
168        relation::Annotation,
169        room::{
170            MediaSource,
171            message::{FormattedBody, RoomMessageEventContent},
172        },
173    },
174    serde::Raw,
175};
176use tokio::sync::{Mutex, Notify, OwnedMutexGuard, broadcast, oneshot};
177use tracing::{debug, error, info, instrument, trace, warn};
178
179use crate::{
180    Client, Media, Room, TransmissionProgress,
181    client::WeakClient,
182    config::RequestConfig,
183    error::RetryKind,
184    room::{WeakRoom, edit::EditedContent},
185};
186
187mod progress;
188mod upload;
189
190pub use progress::AbstractProgress;
191
192/// A client-wide send queue, for all the rooms known by a client.
193pub struct SendQueue {
194    client: Client,
195}
196
197#[cfg(not(tarpaulin_include))]
198impl std::fmt::Debug for SendQueue {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        f.debug_struct("SendQueue").finish_non_exhaustive()
201    }
202}
203
204impl SendQueue {
205    pub(super) fn new(client: Client) -> Self {
206        Self { client }
207    }
208
209    /// Reload all the rooms which had unsent requests, and respawn tasks for
210    /// those rooms.
211    pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
212        if !self.is_enabled() {
213            return;
214        }
215
216        let room_ids =
217            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
218                |err| {
219                    warn!("error when loading rooms with unsent requests: {err}");
220                    Vec::new()
221                },
222            );
223
224        // Getting the [`RoomSendQueue`] is sufficient to spawn the task if needs be.
225        for room_id in room_ids {
226            if let Some(room) = self.client.get_room(&room_id) {
227                let _ = self.for_room(room);
228            }
229        }
230    }
231
232    /// Tiny helper to get the send queue's global context from the [`Client`].
233    #[inline(always)]
234    fn data(&self) -> &SendQueueData {
235        &self.client.inner.send_queue_data
236    }
237
238    /// Get or create a new send queue for a given room, and insert it into our
239    /// memoized rooms mapping.
240    pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
241        let data = self.data();
242
243        let mut map = data.rooms.write().unwrap();
244
245        let room_id = room.room_id();
246        if let Some(room_q) = map.get(room_id).cloned() {
247            return room_q;
248        }
249
250        let owned_room_id = room_id.to_owned();
251        let room_q = RoomSendQueue::new(
252            self.is_enabled(),
253            data.global_update_sender.clone(),
254            data.error_sender.clone(),
255            data.is_dropping.clone(),
256            &self.client,
257            owned_room_id.clone(),
258            data.report_media_upload_progress.clone(),
259        );
260
261        map.insert(owned_room_id, room_q.clone());
262
263        room_q
264    }
265
266    /// Enable or disable the send queue for the entire client, i.e. all rooms.
267    ///
268    /// If we're disabling the queue, and requests were being sent, they're not
269    /// aborted, and will continue until a status resolves (error responses
270    /// will keep the events in the buffer of events to send later). The
271    /// disablement will happen before the next request is sent.
272    ///
273    /// This may wake up background tasks and resume sending of requests in the
274    /// background.
275    pub async fn set_enabled(&self, enabled: bool) {
276        debug!(?enabled, "setting global send queue enablement");
277
278        self.data().globally_enabled.store(enabled, Ordering::SeqCst);
279
280        // Wake up individual rooms we already know about.
281        for room in self.data().rooms.read().unwrap().values() {
282            room.set_enabled(enabled);
283        }
284
        // Reload some extra rooms that might not have been woken up yet, but could
        // have requests from previous sessions.
287        self.respawn_tasks_for_rooms_with_unsent_requests().await;
288    }
289
290    /// Returns whether the send queue is enabled, at a client-wide
291    /// granularity.
292    pub fn is_enabled(&self) -> bool {
293        self.data().globally_enabled.load(Ordering::SeqCst)
294    }
295
296    /// Enable or disable progress reporting for media uploads.
297    pub fn enable_upload_progress(&self, enabled: bool) {
298        self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
299    }
300
    /// Subscribe to all updates for all rooms.
    ///
    /// Use [`RoomSendQueue::subscribe`] to subscribe to updates for a
    /// _specific room_.
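    ///
    /// # Examples
    ///
    /// A sketch of draining the global update stream (assuming an already
    /// built `client`):
    ///
    /// ```rust,no_run
    /// # async fn example(client: matrix_sdk::Client) {
    /// let mut updates = client.send_queue().subscribe();
    /// while let Ok(update) = updates.recv().await {
    ///     println!("update for room {}: {:?}", update.room_id, update.update);
    /// }
    /// # }
    /// ```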
305    pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
306        self.data().global_update_sender.subscribe()
307    }
308
309    /// Get local echoes from all room send queues.
310    pub async fn local_echoes(
311        &self,
312    ) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
313        let room_ids =
314            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
315                |err| {
316                    warn!("error when loading rooms with unsent requests: {err}");
317                    Vec::new()
318                },
319            );
320
321        let mut local_echoes: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();
322
323        for room_id in room_ids {
324            if let Some(room) = self.client.get_room(&room_id) {
325                let queue = self.for_room(room);
326                local_echoes
327                    .insert(room_id.to_owned(), queue.inner.queue.local_echoes(&queue).await?);
328            }
329        }
330
331        Ok(local_echoes)
332    }
333
334    /// A subscriber to the enablement status (enabled or disabled) of the
335    /// send queue, along with useful errors.
336    pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
337        self.data().error_sender.subscribe()
338    }
339}
340
341/// Metadata about a thumbnail needed when pushing media uploads to the send
342/// queue.
343#[derive(Clone, Debug)]
344struct QueueThumbnailInfo {
345    /// Metadata about the thumbnail needed when finishing a media upload.
346    finish_upload_thumbnail_info: FinishUploadThumbnailInfo,
347
348    /// The parameters for the request to retrieve the thumbnail data.
349    media_request_parameters: MediaRequestParameters,
350
351    /// The thumbnail's mime type.
352    content_type: Mime,
353
354    /// The thumbnail's file size in bytes.
355    file_size: usize,
356}
357
358/// A specific room's send queue ran into an error, and it has disabled itself.
359#[derive(Clone, Debug)]
360pub struct SendQueueRoomError {
361    /// For which room is the send queue failing?
362    pub room_id: OwnedRoomId,
363
    /// The error the room has run into, when trying to send a request.
    pub error: Arc<crate::Error>,

    /// Whether the error is considered recoverable or not.
    ///
    /// A recoverable error will disable the room's send queue, while a request
    /// that failed with an unrecoverable error will be parked (wedged) until
    /// the user decides to do something about it.
    pub is_recoverable: bool,
373}
374
375impl Client {
376    /// Returns a [`SendQueue`] that handles sending, retrying and not
377    /// forgetting about requests that are to be sent.
378    pub fn send_queue(&self) -> SendQueue {
379        SendQueue::new(self.clone())
380    }
381}
382
383pub(super) struct SendQueueData {
384    /// Mapping of room to their unique send queue.
385    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
386
387    /// Is the whole mechanism enabled or disabled?
388    ///
389    /// This is only kept in memory to initialize new room queues with an
390    /// initial enablement state.
391    globally_enabled: AtomicBool,
392
393    /// Global sender to send [`SendQueueUpdate`].
394    ///
395    /// See [`SendQueue::subscribe`].
396    global_update_sender: broadcast::Sender<SendQueueUpdate>,
397
398    /// Global error updates for the send queue.
399    error_sender: broadcast::Sender<SendQueueRoomError>,
400
401    /// Are we currently dropping the Client?
402    is_dropping: Arc<AtomicBool>,
403
404    /// Will media upload progress be reported via send queue updates?
405    report_media_upload_progress: Arc<AtomicBool>,
406}
407
408impl SendQueueData {
409    /// Create the data for a send queue, in the given enabled state.
410    pub fn new(globally_enabled: bool) -> Self {
411        let (global_update_sender, _) = broadcast::channel(32);
412        let (error_sender, _) = broadcast::channel(32);
413
414        Self {
415            rooms: Default::default(),
416            globally_enabled: AtomicBool::new(globally_enabled),
417            global_update_sender,
418            error_sender,
419            is_dropping: Arc::new(false.into()),
420            report_media_upload_progress: Arc::new(false.into()),
421        }
422    }
423}
424
425impl Drop for SendQueueData {
426    fn drop(&mut self) {
427        // Mark the whole send queue as shutting down, then wake up all the room
428        // queues so they're stopped too.
429        debug!("globally dropping the send queue");
430        self.is_dropping.store(true, Ordering::SeqCst);
431
432        let rooms = self.rooms.read().unwrap();
433        for room in rooms.values() {
434            room.inner.notifier.notify_one();
435        }
436    }
437}
438
439impl Room {
440    /// Returns the [`RoomSendQueue`] for this specific room.
441    pub fn send_queue(&self) -> RoomSendQueue {
442        self.client.send_queue().for_room(self.clone())
443    }
444}
445
446/// A per-room send queue.
447///
448/// This is cheap to clone.
449#[derive(Clone)]
450pub struct RoomSendQueue {
451    inner: Arc<RoomSendQueueInner>,
452}
453
454#[cfg(not(tarpaulin_include))]
455impl std::fmt::Debug for RoomSendQueue {
456    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457        f.debug_struct("RoomSendQueue").finish_non_exhaustive()
458    }
459}
460
461impl RoomSendQueue {
462    fn new(
463        globally_enabled: bool,
464        global_update_sender: broadcast::Sender<SendQueueUpdate>,
465        global_error_sender: broadcast::Sender<SendQueueRoomError>,
466        is_dropping: Arc<AtomicBool>,
467        client: &Client,
468        room_id: OwnedRoomId,
469        report_media_upload_progress: Arc<AtomicBool>,
470    ) -> Self {
471        let (update_sender, _) = broadcast::channel(32);
472
473        let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
474        let notifier = Arc::new(Notify::new());
475
476        let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
477        let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
478
479        let task = spawn(Self::sending_task(
480            weak_room.clone(),
481            queue.clone(),
482            notifier.clone(),
483            global_update_sender.clone(),
484            update_sender.clone(),
485            locally_enabled.clone(),
486            global_error_sender,
487            is_dropping,
488            report_media_upload_progress,
489        ));
490
491        Self {
492            inner: Arc::new(RoomSendQueueInner {
493                room: weak_room,
494                global_update_sender,
495                update_sender,
496                _task: task,
497                queue,
498                notifier,
499                locally_enabled,
500            }),
501        }
502    }
503
    /// Queues a raw event for sending to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`]s, by calling
    /// the [`Self::subscribe()`] method, to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, the entire
    /// client's sending queue will be disabled, and it will need to be
    /// manually re-enabled by the caller (e.g. after network is back, or when
    /// something has been done about the faulty requests).
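    ///
    /// # Examples
    ///
    /// A sketch of queuing a custom event from raw JSON (the JSON content
    /// below is illustrative only):
    ///
    /// ```rust,no_run
    /// # async fn example(room: matrix_sdk::Room) -> anyhow::Result<()> {
    /// use matrix_sdk::ruma::{events::AnyMessageLikeEventContent, serde::Raw};
    /// use serde_json::json;
    ///
    /// let content: Raw<AnyMessageLikeEventContent> =
    ///     Raw::from_json(serde_json::value::to_raw_value(&json!({ "body": "hello" }))?);
    ///
    /// let _handle = room
    ///     .send_queue()
    ///     .send_raw(content, "m.room.message".to_owned())
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```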
518    pub async fn send_raw(
519        &self,
520        content: Raw<AnyMessageLikeEventContent>,
521        event_type: String,
522    ) -> Result<SendHandle, RoomSendQueueError> {
523        let Some(room) = self.inner.room.get() else {
524            return Err(RoomSendQueueError::RoomDisappeared);
525        };
526        if room.state() != RoomState::Joined {
527            return Err(RoomSendQueueError::RoomNotJoined);
528        }
529
530        let content = SerializableEventContent::from_raw(content, event_type);
531
532        let created_at = MilliSecondsSinceUnixEpoch::now();
533        let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
534        trace!(%transaction_id, "manager sends a raw event to the background task");
535
536        self.inner.notifier.notify_one();
537
538        let send_handle = SendHandle {
539            room: self.clone(),
540            transaction_id: transaction_id.clone(),
541            media_handles: vec![],
542            created_at,
543        };
544
545        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
546            transaction_id,
547            content: LocalEchoContent::Event {
548                serialized_event: content,
549                send_handle: send_handle.clone(),
550                send_error: None,
551            },
552        }));
553
554        Ok(send_handle)
555    }
556
    /// Queues an event for sending to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`]s, by calling
    /// the [`Self::subscribe()`] method, to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, the entire
    /// client's sending queue will be disabled, and it will need to be
    /// manually re-enabled by the caller (e.g. after network is back, or when
    /// something has been done about the faulty requests).
571    pub async fn send(
572        &self,
573        content: AnyMessageLikeEventContent,
574    ) -> Result<SendHandle, RoomSendQueueError> {
575        self.send_raw(
576            Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
577            content.event_type().to_string(),
578        )
579        .await
580    }
581
    /// Returns the current local requests as well as a receiver to listen to
    /// the send queue updates, as defined in [`RoomSendQueueUpdate`].
    ///
    /// Use [`SendQueue::subscribe`] to subscribe to updates for _all rooms_
    /// with a single receiver.
587    pub async fn subscribe(
588        &self,
589    ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
590    {
591        let local_echoes = self.inner.queue.local_echoes(self).await?;
592
593        Ok((local_echoes, self.inner.update_sender.subscribe()))
594    }
595
596    /// A task that must be spawned in the async runtime, running in the
597    /// background for each room that has a send queue.
598    ///
599    /// It only progresses forward: nothing can be cancelled at any point, which
600    /// makes the implementation not overly complicated to follow.
601    #[allow(clippy::too_many_arguments)]
602    #[instrument(skip_all, fields(room_id = %room.room_id()))]
603    async fn sending_task(
604        room: WeakRoom,
605        queue: QueueStorage,
606        notifier: Arc<Notify>,
607        global_update_sender: broadcast::Sender<SendQueueUpdate>,
608        update_sender: broadcast::Sender<RoomSendQueueUpdate>,
609        locally_enabled: Arc<AtomicBool>,
610        global_error_sender: broadcast::Sender<SendQueueRoomError>,
611        is_dropping: Arc<AtomicBool>,
612        report_media_upload_progress: Arc<AtomicBool>,
613    ) {
614        trace!("spawned the sending task");
615
616        let room_id = room.room_id();
617
618        loop {
619            // A request to shut down should be preferred above everything else.
620            if is_dropping.load(Ordering::SeqCst) {
621                trace!("shutting down!");
622                break;
623            }
624
625            // Try to apply dependent requests now; those applying to previously failed
626            // attempts (local echoes) would succeed now.
627            let mut new_updates = Vec::new();
628            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
629                warn!("errors when applying dependent requests: {err}");
630            }
631
632            for up in new_updates {
633                send_update(&global_update_sender, &update_sender, room_id, up);
634            }
635
636            if !locally_enabled.load(Ordering::SeqCst) {
637                trace!("not enabled, sleeping");
638                // Wait for an explicit wakeup.
639                notifier.notified().await;
640                continue;
641            }
642
643            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
644                Ok(Some(request)) => request,
645
646                Ok(None) => {
647                    trace!("queue is empty, sleeping");
648                    // Wait for an explicit wakeup.
649                    notifier.notified().await;
650                    continue;
651                }
652
653                Err(err) => {
654                    warn!("error when loading next request to send: {err}");
655                    continue;
656                }
657            };
658
659            let txn_id = queued_request.transaction_id.clone();
660            trace!(txn_id = %txn_id, "received a request to send!");
661
662            let Some(room) = room.get() else {
663                if is_dropping.load(Ordering::SeqCst) {
664                    break;
665                }
666                error!("the weak room couldn't be upgraded but we're not shutting down?");
667                continue;
668            };
669
670            // If this is a media/gallery upload, prepare the following:
671            // - transaction id for the related media event request,
672            // - progress metadata to feed the final media upload progress
673            // - an observable to watch the media upload progress.
674            let (related_txn_id, media_upload_progress_info, http_progress) =
675                if let QueuedRequestKind::MediaUpload {
676                    cache_key,
677                    thumbnail_source,
678                    #[cfg(feature = "unstable-msc4274")]
679                    accumulated,
680                    related_to,
681                    ..
682                } = &queued_request.kind
683                {
684                    // Prepare to watch and communicate the request's progress for media uploads, if
685                    // it has been requested.
686                    let (media_upload_progress_info, http_progress) =
687                        if report_media_upload_progress.load(Ordering::SeqCst) {
688                            let media_upload_progress_info =
689                                RoomSendQueue::create_media_upload_progress_info(
690                                    &queued_request.transaction_id,
691                                    related_to,
692                                    cache_key,
693                                    thumbnail_source.as_ref(),
694                                    #[cfg(feature = "unstable-msc4274")]
695                                    accumulated,
696                                    &room,
697                                    &queue,
698                                )
699                                .await;
700
701                            let progress = RoomSendQueue::create_media_upload_progress_observable(
702                                &media_upload_progress_info,
703                                related_to,
704                                &update_sender,
705                            );
706
707                            (Some(media_upload_progress_info), Some(progress))
708                        } else {
709                            Default::default()
710                        };
711
712                    (Some(related_to.clone()), media_upload_progress_info, http_progress)
713                } else {
714                    Default::default()
715                };
716
717            match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
718            {
719                Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
720                {
721                    Ok(()) => match parent_key {
722                        SentRequestKey::Event { event_id, event, event_type } => {
723                            send_update(
724                                &global_update_sender,
725                                &update_sender,
726                                room_id,
727                                RoomSendQueueUpdate::SentEvent {
728                                    transaction_id: txn_id,
729                                    event_id: event_id.clone(),
730                                },
731                            );
732
                            // The event has been sent to the server and the server has received it.
                            // Yippee! Now, we usually wait on the server to give us back the event
                            // via the sync.
                            //
                            // Problem: sometimes the network lags, can be down, or the server may
                            // be slow; well, anything can happen.
                            //
                            // It results in a weird situation where the user sees their event being
                            // sent, which then disappears before it's received again from the
                            // server.
                            //
                            // To avoid this situation, we eagerly save the event in the Event
                            // Cache. It's similar to what would happen if the event was echoed back
                            // from the server via the sync, but we avoid any network issues. The
                            // Event Cache is smart enough to deduplicate events based on the event
                            // ID, so it's safe to do that.
                            //
                            // If this little feature fails, it MUST NOT stop the Send Queue. Any
                            // errors are logged, but the Send Queue will continue as if everything
                            // happened successfully. This feature is not considered “crucial”.
752                            if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
753                            {
754                                let timeline_event = match event_type.as_str() {
755                                    #[cfg(feature = "e2e-encryption")]
756                                    "m.room.encrypted" => {
757                                        use ruma::events::{
758                                            OriginalSyncMessageLikeEvent,
759                                            room::encrypted::RoomEncryptedEventContent,
760                                        };
761
762                                        let push_context = room.push_context().await.ok().flatten();
763
                                        // SAFETY: The event type is `m.room.encrypted`, so we hope
                                        // it can be cast from an `AnyMessageLikeEventContent` to an
                                        // `OriginalSyncMessageLikeEvent<RoomEncryptedEventContent>`.
                                        // It is wrong if and only if the event type doesn't match
                                        // the event content.
770                                        let event: Raw<AnyMessageLikeEventContent> = event;
771                                        let event: Raw<
772                                            OriginalSyncMessageLikeEvent<RoomEncryptedEventContent>,
773                                        > = event.cast_unchecked();
774
775                                        match room
776                                            .decrypt_event(&event, push_context.as_ref())
777                                            .await
778                                        {
779                                            Ok(timeline_event) => Some(timeline_event),
780                                            Err(err) => {
781                                                error!(
782                                                    ?err,
783                                                    "Failed to decrypt the event before the saving in the Event Cache"
784                                                );
785                                                None
786                                            }
787                                        }
788                                    }
789
790                                    event_type => {
791                                        match Raw::from_json_string(
792                                            // Create a compact string: remove all useless spaces.
793                                            format!(
794                                                "{{\
795                                                    \"event_id\":\"{event_id}\",\
796                                                    \"origin_server_ts\":{ts},\
797                                                    \"sender\":\"{sender}\",\
798                                                    \"type\":\"{type}\",\
799                                                    \"content\":{content}\
800                                                }}",
801                                                event_id = event_id,
802                                                ts = MilliSecondsSinceUnixEpoch::now().get(),
803                                                sender = room.client().user_id().expect("Client must be logged-in"),
804                                                type = event_type,
805                                                content = event.into_json(),
806                                            ),
807                                        ) {
808                                            Ok(event) => Some(TimelineEvent::from_plaintext(event)),
809                                            Err(err) => {
810                                                error!(
811                                                    ?err,
812                                                    "Failed to build the (sync) event before the saving in the Event Cache"
813                                                );
814                                                None
815                                            }
816                                        }
817                                    }
818                                };
819
                                // In case of an error, just log it, but don't stop the Send Queue.
                                // This feature is not crucial.
823                                if let Some(timeline_event) = timeline_event
824                                    && let Err(err) = room_event_cache
825                                        .insert_sent_event_from_send_queue(timeline_event)
826                                        .await
827                                {
828                                    error!(
829                                        ?err,
830                                        "Failed to save the sent event in the Event Cache"
831                                    );
832                                }
833                            } else {
834                                info!(
835                                    "Cannot insert the sent event in the Event Cache because \
836                                    either the room no longer exists, or the Room Event Cache cannot be retrieved"
837                                );
838                            }
839                        }
840
841                        SentRequestKey::Media(sent_media_info) => {
842                            // Generate some final progress information, even if incremental
843                            // progress wasn't requested.
844                            let index =
845                                media_upload_progress_info.as_ref().map_or(0, |info| info.index);
846                            let progress = media_upload_progress_info
847                                .as_ref()
848                                .map(|info| {
849                                    AbstractProgress { current: info.bytes, total: info.bytes }
850                                        + info.offsets
851                                })
852                                .unwrap_or(AbstractProgress { current: 1, total: 1 });
853
854                            // Purposefully don't use `send_update` here, because we don't want to
855                            // notify the global listeners about an upload progress update.
856                            let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
857                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
858                                file: Some(sent_media_info.file),
859                                index,
860                                progress,
861                            });
862                        }
863                    },
864
865                    Err(err) => {
866                        warn!("unable to mark queued request as sent: {err}");
867                    }
868                },
869
870                Ok(None) => {
871                    debug!("Request has been aborted while running, continuing.");
872                }
873
874                Err(err) => {
875                    let is_recoverable = match err {
876                        crate::Error::Http(ref http_err) => {
877                            // All transient errors are recoverable.
878                            matches!(
879                                http_err.retry_kind(),
880                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
881                            )
882                        }
883
884                        // `ConcurrentRequestFailed` typically happens because of an HTTP failure;
885                        // since we don't get the underlying error, be lax and consider it
886                        // recoverable, and let observers decide to retry it or not. At some point
887                        // we'll get the actual underlying error.
888                        crate::Error::ConcurrentRequestFailed => true,
889
890                        // As of 2024-06-27, all other error types are considered unrecoverable.
891                        _ => false,
892                    };
893
894                    // Disable the queue for this room after any kind of error happened.
895                    locally_enabled.store(false, Ordering::SeqCst);
896
897                    if is_recoverable {
898                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
899
900                        // In this case, we intentionally keep the request in the queue, but mark it
901                        // as not being sent anymore.
902                        queue.mark_as_not_being_sent(&txn_id).await;
903
904                        // Let observers know about a failure *after* we've
905                        // marked the item as not being sent anymore. Otherwise,
906                        // there's a possible race where a caller might try to
907                        // remove an item, while it's still marked as being
908                        // sent, resulting in a cancellation failure.
909                    } else {
910                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
911
912                        // Mark the request as wedged, so it's not picked at any future point.
913                        if let Err(storage_error) =
914                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
915                        {
916                            warn!("unable to mark request as wedged: {storage_error}");
917                        }
918                    }
919
920                    let error = Arc::new(err);
921
922                    let _ = global_error_sender.send(SendQueueRoomError {
923                        room_id: room_id.to_owned(),
924                        error: error.clone(),
925                        is_recoverable,
926                    });
927
928                    send_update(
929                        &global_update_sender,
930                        &update_sender,
931                        room_id,
932                        RoomSendQueueUpdate::SendError {
933                            transaction_id: related_txn_id.unwrap_or(txn_id),
934                            error,
935                            is_recoverable,
936                        },
937                    );
938                }
939            }
940        }
941
942        info!("exited sending task");
943    }
944
945    /// Handles a single request and returns the [`SentRequestKey`] on success
946    /// (unless the request was cancelled, in which case it'll return
947    /// `None`).
948    async fn handle_request(
949        room: &Room,
950        request: QueuedRequest,
951        cancel_upload_rx: Option<oneshot::Receiver<()>>,
952        progress: Option<SharedObservable<TransmissionProgress>>,
953    ) -> Result<Option<SentRequestKey>, crate::Error> {
954        match request.kind {
955            QueuedRequestKind::Event { content } => {
956                let (event, event_type) = content.into_raw();
957
958                let res = room
959                    .send_raw(&event_type, &event)
960                    .with_transaction_id(&request.transaction_id)
961                    .with_request_config(RequestConfig::short_retry())
962                    .await?;
963
964                trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
965
966                Ok(Some(SentRequestKey::Event { event_id: res.event_id, event, event_type }))
967            }
968
969            QueuedRequestKind::MediaUpload {
970                content_type,
971                cache_key,
972                thumbnail_source,
973                related_to: relates_to,
974                #[cfg(feature = "unstable-msc4274")]
975                accumulated,
976            } => {
977                trace!(%relates_to, "uploading media related to event");
978
979                let fut = async move {
980                    let data = room
981                        .client()
982                        .media_store()
983                        .lock()
984                        .await?
985                        .get_media_content(&cache_key)
986                        .await?
987                        .ok_or(crate::Error::SendQueueWedgeError(Box::new(
988                            QueueWedgeError::MissingMediaContent,
989                        )))?;
990
991                    let mime = Mime::from_str(&content_type).map_err(|_| {
992                        crate::Error::SendQueueWedgeError(Box::new(
993                            QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
994                        ))
995                    })?;
996
997                    #[cfg(feature = "e2e-encryption")]
998                    let media_source = if room.latest_encryption_state().await?.is_encrypted() {
999                        trace!("upload will be encrypted (encrypted room)");
1000
1001                        let mut cursor = std::io::Cursor::new(data);
1002                        let mut req = room
1003                            .client
1004                            .upload_encrypted_file(&mut cursor)
1005                            .with_request_config(RequestConfig::short_retry());
1006                        if let Some(progress) = progress {
1007                            req = req.with_send_progress_observable(progress);
1008                        }
1009                        let encrypted_file = req.await?;
1010
1011                        MediaSource::Encrypted(Box::new(encrypted_file))
1012                    } else {
1013                        trace!("upload will be in clear text (room without encryption)");
1014
1015                        let request_config = RequestConfig::short_retry()
1016                            .timeout(Media::reasonable_upload_timeout(&data));
1017                        let mut req =
1018                            room.client().media().upload(&mime, data, Some(request_config));
1019                        if let Some(progress) = progress {
1020                            req = req.with_send_progress_observable(progress);
1021                        }
1022                        let res = req.await?;
1023
1024                        MediaSource::Plain(res.content_uri)
1025                    };
1026
1027                    #[cfg(not(feature = "e2e-encryption"))]
1028                    let media_source = {
1029                        let request_config = RequestConfig::short_retry()
1030                            .timeout(Media::reasonable_upload_timeout(&data));
1031                        let mut req =
1032                            room.client().media().upload(&mime, data, Some(request_config));
1033                        if let Some(progress) = progress {
1034                            req = req.with_send_progress_observable(progress);
1035                        }
1036                        let res = req.await?;
1037                        MediaSource::Plain(res.content_uri)
1038                    };
1039
1040                    let uri = match &media_source {
1041                        MediaSource::Plain(uri) => uri,
1042                        MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
1043                    };
1044                    trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
1045
1046                    Ok(SentRequestKey::Media(SentMediaInfo {
1047                        file: media_source,
1048                        thumbnail: thumbnail_source,
1049                        #[cfg(feature = "unstable-msc4274")]
1050                        accumulated,
1051                    }))
1052                };
1053
1054                let wait_for_cancel = async move {
1055                    if let Some(rx) = cancel_upload_rx {
1056                        rx.await
1057                    } else {
1058                        std::future::pending().await
1059                    }
1060                };
1061
1062                tokio::select! {
1063                    biased;
1064
1065                    _ = wait_for_cancel => {
1066                        Ok(None)
1067                    }
1068
1069                    res = fut => {
1070                        res.map(Some)
1071                    }
1072                }
1073            }
1074        }
1075    }
1076
    /// Returns whether the send queue is enabled, at the room level.
1078    pub fn is_enabled(&self) -> bool {
1079        self.inner.locally_enabled.load(Ordering::SeqCst)
1080    }
1081
1082    /// Set the locally enabled flag for this room queue.
1083    pub fn set_enabled(&self, enabled: bool) {
1084        self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
1085
1086        // No need to wake a task to tell it it's been disabled, so only notify if we're
1087        // re-enabling the queue.
1088        if enabled {
1089            self.inner.notifier.notify_one();
1090        }
1091    }
1092
1093    /// Send an update on the room send queue channel, and on the global send
1094    /// queue channel, i.e. it sends a [`RoomSendQueueUpdate`] and a
1095    /// [`SendQueueUpdate`].
1096    fn send_update(&self, update: RoomSendQueueUpdate) {
1097        let _ = self.inner.update_sender.send(update.clone());
1098        let _ = self
1099            .inner
1100            .global_update_sender
1101            .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
1102    }
1103}
1104
1105fn send_update(
1106    global_update_sender: &broadcast::Sender<SendQueueUpdate>,
1107    update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
1108    room_id: &RoomId,
1109    update: RoomSendQueueUpdate,
1110) {
1111    let _ = update_sender.send(update.clone());
1112    let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
1113}
1114
1115impl From<&crate::Error> for QueueWedgeError {
1116    fn from(value: &crate::Error) -> Self {
1117        match value {
1118            #[cfg(feature = "e2e-encryption")]
1119            crate::Error::OlmError(error) => match &**error {
1120                OlmError::SessionRecipientCollectionError(error) => match error {
1121                    SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
1122                        QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
1123                    }
1124
1125                    SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
1126                        QueueWedgeError::IdentityViolations { users: users.clone() }
1127                    }
1128
1129                    SessionRecipientCollectionError::CrossSigningNotSetup
1130                    | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
1131                        QueueWedgeError::CrossVerificationRequired
1132                    }
1133                },
1134                _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1135            },
1136
1137            // Flatten errors of `Self` type.
1138            crate::Error::SendQueueWedgeError(error) => *error.clone(),
1139
1140            _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1141        }
1142    }
1143}
1144
1145struct RoomSendQueueInner {
1146    /// The room which this send queue relates to.
1147    room: WeakRoom,
1148
1149    /// Global sender to send [`SendQueueUpdate`].
1150    ///
1151    /// See [`SendQueue::subscribe`].
1152    global_update_sender: broadcast::Sender<SendQueueUpdate>,
1153
1154    /// Broadcaster for notifications about the statuses of requests to be sent.
1155    ///
1156    /// Can be subscribed to from the outside.
1157    ///
1158    /// See [`RoomSendQueue::subscribe`].
1159    update_sender: broadcast::Sender<RoomSendQueueUpdate>,
1160
1161    /// Queue of requests that are either to be sent, or being sent.
1162    ///
1163    /// When a request has been sent to the server, it is removed from that
1164    /// queue *after* being sent. That way, we will retry sending upon
1165    /// failure, in the same order requests have been inserted in the first
1166    /// place.
1167    queue: QueueStorage,
1168
1169    /// A notifier that's updated any time common data is touched (stopped or
1170    /// enabled statuses), or the associated room [`QueueStorage`].
1171    notifier: Arc<Notify>,
1172
1173    /// Should the room process new requests or not (because e.g. it might be
1174    /// running off the network)?
1175    locally_enabled: Arc<AtomicBool>,
1176
    /// Handle to the actual sending task. Unused, but kept alive along with
    /// this data structure.
1179    _task: JoinHandle<()>,
1180}
1181
1182/// Information about a request being sent right this moment.
1183struct BeingSentInfo {
1184    /// Transaction id of the thing being sent.
1185    transaction_id: OwnedTransactionId,
1186
1187    /// For an upload request, a trigger to cancel the upload before it
1188    /// completes.
1189    cancel_upload: Option<oneshot::Sender<()>>,
1190}
1191
1192impl BeingSentInfo {
1193    /// Aborts the upload, if a trigger is available.
1194    ///
1195    /// Consumes the object because the sender is a oneshot and will be consumed
1196    /// upon sending.
1197    fn cancel_upload(self) -> bool {
1198        if let Some(cancel_upload) = self.cancel_upload {
1199            let _ = cancel_upload.send(());
1200            true
1201        } else {
1202            false
1203        }
1204    }
1205}
1206
1207/// A specialized lock that guards both against the state store and the
1208/// [`Self::being_sent`] data.
1209#[derive(Clone)]
1210struct StoreLock {
1211    /// Reference to the client, to get access to the underlying store.
1212    client: WeakClient,
1213
1214    /// The one queued request that is being sent at the moment, along with
1215    /// associated data that can be useful to act upon it.
1216    ///
1217    /// Also used as the lock to access the state store.
1218    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
1219}
1220
1221impl StoreLock {
1222    /// Gets a hold of the locked store and [`Self::being_sent`] pair.
1223    async fn lock(&self) -> StoreLockGuard {
1224        StoreLockGuard {
1225            client: self.client.clone(),
1226            being_sent: self.being_sent.clone().lock_owned().await,
1227        }
1228    }
1229}
1230
/// A lock guard obtained through locking with [`StoreLock`], giving access to
/// the store and the `being_sent` data.
1233struct StoreLockGuard {
1234    /// Reference to the client, to get access to the underlying store.
1235    client: WeakClient,
1236
1237    /// The one queued request that is being sent at the moment, along with
1238    /// associated data that can be useful to act upon it.
1239    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
1240}
1241
1242impl StoreLockGuard {
1243    /// Get a client from the locked state, useful to get a handle on a store.
1244    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
1245        self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
1246    }
1247}
1248
1249#[derive(Clone)]
1250struct QueueStorage {
1251    /// A lock to make sure the state store is only accessed once at a time, to
1252    /// make some store operations atomic.
1253    store: StoreLock,
1254
1255    /// To which room is this storage related.
1256    room_id: OwnedRoomId,
1257
1258    /// In-memory mapping of media transaction IDs to thumbnail sizes for the
1259    /// purpose of progress reporting.
1260    ///
1261    /// The keys are the transaction IDs for sending the media or gallery event
1262    /// after all uploads have finished. This allows us to easily clean up the
1263    /// cache after the event was sent.
1264    ///
1265    /// For media uploads, the value vector will always have a single element.
1266    ///
1267    /// For galleries, some gallery items might not have a thumbnail while
1268    /// others do. Since we access the thumbnails by their index within the
1269    /// gallery, the vector needs to hold optional usize's.
1270    thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
1271}
1272
1273impl QueueStorage {
1274    /// Default priority for a queued request.
1275    const LOW_PRIORITY: usize = 0;
1276
1277    /// High priority for a queued request that must be handled before others.
1278    const HIGH_PRIORITY: usize = 10;
1279
1280    /// Create a new queue for queuing requests to be sent later.
1281    fn new(client: WeakClient, room: OwnedRoomId) -> Self {
1282        Self {
1283            room_id: room,
1284            store: StoreLock { client, being_sent: Default::default() },
1285            thumbnail_file_sizes: Default::default(),
1286        }
1287    }
1288
1289    /// Push a new request to be sent by the queue, with a default priority of 0.
1290    ///
1291    /// Returns the transaction id chosen to identify the request.
1292    async fn push(
1293        &self,
1294        request: QueuedRequestKind,
1295        created_at: MilliSecondsSinceUnixEpoch,
1296    ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1297        let transaction_id = TransactionId::new();
1298
1299        self.store
1300            .lock()
1301            .await
1302            .client()?
1303            .state_store()
1304            .save_send_queue_request(
1305                &self.room_id,
1306                transaction_id.clone(),
1307                created_at,
1308                request,
1309                Self::LOW_PRIORITY,
1310            )
1311            .await?;
1312
1313        Ok(transaction_id)
1314    }
1315
1316    /// Peeks the next request to be sent, marking it as being sent.
1317    ///
1318    /// It is required to call [`Self::mark_as_sent`] after it's been
1319    /// effectively sent.
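    ///
    /// Illustrative lifecycle, as seen from the sending task (`queue` names a
    /// [`QueueStorage`] and `send_over_network` is a stand-in for the actual
    /// network call, not a real function):
    ///
    /// ```ignore
    /// if let Some((request, _cancel_upload_rx)) = queue.peek_next_to_send().await? {
    ///     match send_over_network(&request).await {
    ///         // Success: unblock dependent requests and drop the queued entry.
    ///         Ok(parent_key) => queue.mark_as_sent(&request.transaction_id, parent_key).await?,
    ///         // Transient error: keep the request so it can be retried later.
    ///         Err(_) => queue.mark_as_not_being_sent(&request.transaction_id).await,
    ///         // (A permanent error would use `mark_as_wedged` instead.)
    ///     }
    /// }
    /// ```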
1320    async fn peek_next_to_send(
1321        &self,
1322    ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
1323    {
1324        let mut guard = self.store.lock().await;
1325        let queued_requests =
1326            guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
1327
1328        if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
1329            let (cancel_upload_tx, cancel_upload_rx) =
1330                if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1331                    let (tx, rx) = oneshot::channel();
1332                    (Some(tx), Some(rx))
1333                } else {
1334                    Default::default()
1335                };
1336
1337            let prev = guard.being_sent.replace(BeingSentInfo {
1338                transaction_id: request.transaction_id.clone(),
1339                cancel_upload: cancel_upload_tx,
1340            });
1341
1342            if let Some(prev) = prev {
1343                error!(
1344                    prev_txn = ?prev.transaction_id,
1345                    "a previous request was still active while picking a new one"
1346                );
1347            }
1348
1349            Ok(Some((request.clone(), cancel_upload_rx)))
1350        } else {
1351            Ok(None)
1352        }
1353    }
1354
1355    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1356    /// with the given transaction id as not being sent anymore, so it can
1357    /// be removed from the queue later.
1358    async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1359        let was_being_sent = self.store.lock().await.being_sent.take();
1360
1361        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1362        if prev_txn != Some(transaction_id) {
1363            error!(prev_txn = ?prev_txn, "previous active request didn't match the one we expected (after transient error)");
1364        }
1365    }
1366
1367    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1368    /// with the given transaction id as being wedged (and not being sent
1369    /// anymore), so it can be removed from the queue later.
1370    async fn mark_as_wedged(
1371        &self,
1372        transaction_id: &TransactionId,
1373        reason: QueueWedgeError,
1374    ) -> Result<(), RoomSendQueueStorageError> {
1375        // Keep the lock until we're done touching the storage.
1376        let mut guard = self.store.lock().await;
1377        let was_being_sent = guard.being_sent.take();
1378
1379        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1380        if prev_txn != Some(transaction_id) {
1381            error!(
1382                ?prev_txn,
1383                "previous active request didn't match the one we expected (after permanent error)",
1384            );
1385        }
1386
1387        Ok(guard
1388            .client()?
1389            .state_store()
1390            .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1391            .await?)
1392    }
1393
1394    /// Marks a request identified with the given transaction id as being now
1395    /// unwedged and adds it back to the queue.
1396    async fn mark_as_unwedged(
1397        &self,
1398        transaction_id: &TransactionId,
1399    ) -> Result<(), RoomSendQueueStorageError> {
1400        Ok(self
1401            .store
1402            .lock()
1403            .await
1404            .client()?
1405            .state_store()
1406            .update_send_queue_request_status(&self.room_id, transaction_id, None)
1407            .await?)
1408    }
1409
1410    /// Marks a request pushed with [`Self::push`] and identified with the given
1411    /// transaction id as sent, by removing it from the local queue.
1412    async fn mark_as_sent(
1413        &self,
1414        transaction_id: &TransactionId,
1415        parent_key: SentRequestKey,
1416    ) -> Result<(), RoomSendQueueStorageError> {
1417        // Keep the lock until we're done touching the storage.
1418        let mut guard = self.store.lock().await;
1419        let was_being_sent = guard.being_sent.take();
1420
1421        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1422        if prev_txn != Some(transaction_id) {
1423            error!(
1424                ?prev_txn,
1425                "previous active request didn't match the one we expected (after successful send)",
1426            );
1427        }
1428
1429        let client = guard.client()?;
1430        let store = client.state_store();
1431
1432        // Update all dependent requests.
1433        store
1434            .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1435            .await?;
1436
1437        let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1438
1439        if !removed {
1440            warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1441        }
1442
1443        self.thumbnail_file_sizes.lock().remove(transaction_id);
1444
1445        Ok(())
1446    }
1447
1448    /// Cancel sending of an event that has been queued with [`Self::push`]
1449    /// and is identified by the given transaction id.
1450    ///
1451    /// Returns whether the given transaction has been effectively removed. If
1452    /// false, this either means that the transaction id was unrelated to
1453    /// this queue, or that the request was sent before we cancelled it.
1454    async fn cancel_event(
1455        &self,
1456        transaction_id: &TransactionId,
1457    ) -> Result<bool, RoomSendQueueStorageError> {
1458        let guard = self.store.lock().await;
1459
1460        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1461            == Some(transaction_id)
1462        {
1463            // Save the intent to redact the event.
1464            guard
1465                .client()?
1466                .state_store()
1467                .save_dependent_queued_request(
1468                    &self.room_id,
1469                    transaction_id,
1470                    ChildTransactionId::new(),
1471                    MilliSecondsSinceUnixEpoch::now(),
1472                    DependentQueuedRequestKind::RedactEvent,
1473                )
1474                .await?;
1475
1476            return Ok(true);
1477        }
1478
1479        let removed = guard
1480            .client()?
1481            .state_store()
1482            .remove_send_queue_request(&self.room_id, transaction_id)
1483            .await?;
1484
1485        self.thumbnail_file_sizes.lock().remove(transaction_id);
1486
1487        Ok(removed)
1488    }
1489
1490    /// Replace an event that has been queued with [`Self::push`] and is
1491    /// identified by the given transaction id, before it has actually been sent.
1492    ///
1493    /// Returns whether the given transaction has been effectively edited. If
1494    /// false, this either means that the transaction id was unrelated to
1495    /// this queue, or that the request was sent before we edited it.
1496    async fn replace_event(
1497        &self,
1498        transaction_id: &TransactionId,
1499        serializable: SerializableEventContent,
1500    ) -> Result<bool, RoomSendQueueStorageError> {
1501        let guard = self.store.lock().await;
1502
1503        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1504            == Some(transaction_id)
1505        {
1506            // Save the intent to edit the associated event.
1507            guard
1508                .client()?
1509                .state_store()
1510                .save_dependent_queued_request(
1511                    &self.room_id,
1512                    transaction_id,
1513                    ChildTransactionId::new(),
1514                    MilliSecondsSinceUnixEpoch::now(),
1515                    DependentQueuedRequestKind::EditEvent { new_content: serializable },
1516                )
1517                .await?;
1518
1519            return Ok(true);
1520        }
1521
1522        let edited = guard
1523            .client()?
1524            .state_store()
1525            .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1526            .await?;
1527
1528        Ok(edited)
1529    }
1530
1531    /// Push requests (and dependents) to upload a media.
1532    ///
1533    /// See the module-level description for details of the whole process.
1534    #[allow(clippy::too_many_arguments)]
1535    async fn push_media(
1536        &self,
1537        event: RoomMessageEventContent,
1538        content_type: Mime,
1539        send_event_txn: OwnedTransactionId,
1540        created_at: MilliSecondsSinceUnixEpoch,
1541        upload_file_txn: OwnedTransactionId,
1542        file_media_request: MediaRequestParameters,
1543        thumbnail: Option<QueueThumbnailInfo>,
1544    ) -> Result<(), RoomSendQueueStorageError> {
1545        let guard = self.store.lock().await;
1546        let client = guard.client()?;
1547        let store = client.state_store();
1548
1549        // There's only a single media to be sent, so it has at most one thumbnail.
1550        let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];
1551
1552        let thumbnail_info = self
1553            .push_thumbnail_and_media_uploads(
1554                store,
1555                &content_type,
1556                send_event_txn.clone(),
1557                created_at,
1558                upload_file_txn.clone(),
1559                file_media_request,
1560                thumbnail,
1561            )
1562            .await?;
1563
1564        // Push the dependent request for the event itself.
1565        store
1566            .save_dependent_queued_request(
1567                &self.room_id,
1568                &upload_file_txn,
1569                send_event_txn.clone().into(),
1570                created_at,
1571                DependentQueuedRequestKind::FinishUpload {
1572                    local_echo: Box::new(event),
1573                    file_upload: upload_file_txn.clone(),
1574                    thumbnail_info,
1575                },
1576            )
1577            .await?;
1578
1579        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1580
1581        Ok(())
1582    }
1583
1584    /// Push requests (and dependents) to upload a gallery.
1585    ///
1586    /// See the module-level description for details of the whole process.
1587    #[cfg(feature = "unstable-msc4274")]
1588    #[allow(clippy::too_many_arguments)]
1589    async fn push_gallery(
1590        &self,
1591        event: RoomMessageEventContent,
1592        send_event_txn: OwnedTransactionId,
1593        created_at: MilliSecondsSinceUnixEpoch,
1594        item_queue_infos: Vec<GalleryItemQueueInfo>,
1595    ) -> Result<(), RoomSendQueueStorageError> {
1596        let guard = self.store.lock().await;
1597        let client = guard.client()?;
1598        let store = client.state_store();
1599
1600        let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
1601        let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());
1602
1603        let Some((first, rest)) = item_queue_infos.split_first() else {
1604            return Ok(());
1605        };
1606
1607        let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
1608            first;
1609
1610        let thumbnail_info = self
1611            .push_thumbnail_and_media_uploads(
1612                store,
1613                content_type,
1614                send_event_txn.clone(),
1615                created_at,
1616                upload_file_txn.clone(),
1617                file_media_request.clone(),
1618                thumbnail.clone(),
1619            )
1620            .await?;
1621
1622        finish_item_infos
1623            .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
1624        thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1625
1626        let mut last_upload_file_txn = upload_file_txn.clone();
1627
1628        for item_queue_info in rest {
1629            let GalleryItemQueueInfo {
1630                content_type,
1631                upload_file_txn,
1632                file_media_request,
1633                thumbnail,
1634            } = item_queue_info;
1635
1636            let thumbnail_info = if let Some(QueueThumbnailInfo {
1637                finish_upload_thumbnail_info: thumbnail_info,
1638                media_request_parameters: thumbnail_media_request,
1639                content_type: thumbnail_content_type,
1640                ..
1641            }) = thumbnail
1642            {
1643                let upload_thumbnail_txn = thumbnail_info.txn.clone();
1644
1645                // Save the thumbnail upload request as a dependent request of the last file
1646                // upload.
1647                store
1648                    .save_dependent_queued_request(
1649                        &self.room_id,
1650                        &last_upload_file_txn,
1651                        upload_thumbnail_txn.clone().into(),
1652                        created_at,
1653                        DependentQueuedRequestKind::UploadFileOrThumbnail {
1654                            content_type: thumbnail_content_type.to_string(),
1655                            cache_key: thumbnail_media_request.clone(),
1656                            related_to: send_event_txn.clone(),
1657                            parent_is_thumbnail_upload: false,
1658                        },
1659                    )
1660                    .await?;
1661
1662                last_upload_file_txn = upload_thumbnail_txn;
1663
1664                Some(thumbnail_info)
1665            } else {
1666                None
1667            };
1668
1669            // Save the file upload as a dependent request of the previous upload.
1670            store
1671                .save_dependent_queued_request(
1672                    &self.room_id,
1673                    &last_upload_file_txn,
1674                    upload_file_txn.clone().into(),
1675                    created_at,
1676                    DependentQueuedRequestKind::UploadFileOrThumbnail {
1677                        content_type: content_type.to_string(),
1678                        cache_key: file_media_request.clone(),
1679                        related_to: send_event_txn.clone(),
1680                        parent_is_thumbnail_upload: thumbnail.is_some(),
1681                    },
1682                )
1683                .await?;
1684
1685            finish_item_infos.push(FinishGalleryItemInfo {
1686                file_upload: upload_file_txn.clone(),
1687                thumbnail_info: thumbnail_info.cloned(),
1688            });
1689            thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1690
1691            last_upload_file_txn = upload_file_txn.clone();
1692        }
1693
1694        // Push the request for the event itself as a dependent request of the last file
1695        // upload.
1696        store
1697            .save_dependent_queued_request(
1698                &self.room_id,
1699                &last_upload_file_txn,
1700                send_event_txn.clone().into(),
1701                created_at,
1702                DependentQueuedRequestKind::FinishGallery {
1703                    local_echo: Box::new(event),
1704                    item_infos: finish_item_infos,
1705                },
1706            )
1707            .await?;
1708
1709        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1710
1711        Ok(())
1712    }
1713
1714    /// If a thumbnail exists, pushes a [`QueuedRequestKind::MediaUpload`] to
1715    /// upload it and a
1716    /// [`DependentQueuedRequestKind::UploadFileOrThumbnail`] to upload
1717    /// the media itself. Otherwise, pushes a
1718    /// [`QueuedRequestKind::MediaUpload`] to upload the media directly.
1719    #[allow(clippy::too_many_arguments)]
1720    async fn push_thumbnail_and_media_uploads(
1721        &self,
1722        store: &DynStateStore,
1723        content_type: &Mime,
1724        send_event_txn: OwnedTransactionId,
1725        created_at: MilliSecondsSinceUnixEpoch,
1726        upload_file_txn: OwnedTransactionId,
1727        file_media_request: MediaRequestParameters,
1728        thumbnail: Option<QueueThumbnailInfo>,
1729    ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1730        if let Some(QueueThumbnailInfo {
1731            finish_upload_thumbnail_info: thumbnail_info,
1732            media_request_parameters: thumbnail_media_request,
1733            content_type: thumbnail_content_type,
1734            ..
1735        }) = thumbnail
1736        {
1737            let upload_thumbnail_txn = thumbnail_info.txn.clone();
1738
1739            // Save the thumbnail upload request.
1740            store
1741                .save_send_queue_request(
1742                    &self.room_id,
1743                    upload_thumbnail_txn.clone(),
1744                    created_at,
1745                    QueuedRequestKind::MediaUpload {
1746                        content_type: thumbnail_content_type.to_string(),
1747                        cache_key: thumbnail_media_request,
1748                        thumbnail_source: None, // the thumbnail has no thumbnails :)
1749                        related_to: send_event_txn.clone(),
1750                        #[cfg(feature = "unstable-msc4274")]
1751                        accumulated: vec![],
1752                    },
1753                    Self::LOW_PRIORITY,
1754                )
1755                .await?;
1756
1757            // Save the file upload request as a dependent request of the thumbnail upload.
1758            store
1759                .save_dependent_queued_request(
1760                    &self.room_id,
1761                    &upload_thumbnail_txn,
1762                    upload_file_txn.into(),
1763                    created_at,
1764                    DependentQueuedRequestKind::UploadFileOrThumbnail {
1765                        content_type: content_type.to_string(),
1766                        cache_key: file_media_request,
1767                        related_to: send_event_txn,
1768                        parent_is_thumbnail_upload: true,
1769                    },
1770                )
1771                .await?;
1772
1773            Ok(Some(thumbnail_info))
1774        } else {
1775            // Save the file upload as its own request, not a dependent one.
1776            store
1777                .save_send_queue_request(
1778                    &self.room_id,
1779                    upload_file_txn,
1780                    created_at,
1781                    QueuedRequestKind::MediaUpload {
1782                        content_type: content_type.to_string(),
1783                        cache_key: file_media_request,
1784                        thumbnail_source: None,
1785                        related_to: send_event_txn,
1786                        #[cfg(feature = "unstable-msc4274")]
1787                        accumulated: vec![],
1788                    },
1789                    Self::LOW_PRIORITY,
1790                )
1791                .await?;
1792
1793            Ok(None)
1794        }
1795    }
1796
1797    /// Reacts to the given local echo of an event.
1798    #[instrument(skip(self))]
1799    async fn react(
1800        &self,
1801        transaction_id: &TransactionId,
1802        key: String,
1803        created_at: MilliSecondsSinceUnixEpoch,
1804    ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1805        let guard = self.store.lock().await;
1806        let client = guard.client()?;
1807        let store = client.state_store();
1808
1809        let requests = store.load_send_queue_requests(&self.room_id).await?;
1810
1811        // If the target event has been already sent, abort immediately.
1812        if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1813            // We didn't find it as a queued request; try to find it as a dependent queued
1814            // request.
1815            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1816            if !dependent_requests
1817                .into_iter()
1818                .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1819                .any(|child_txn| *child_txn == *transaction_id)
1820            {
1821                // We didn't find it as either a request or a dependent request, abort.
1822                return Ok(None);
1823            }
1824        }
1825
1826        // Record the dependent request.
1827        let reaction_txn_id = ChildTransactionId::new();
1828        store
1829            .save_dependent_queued_request(
1830                &self.room_id,
1831                transaction_id,
1832                reaction_txn_id.clone(),
1833                created_at,
1834                DependentQueuedRequestKind::ReactEvent { key },
1835            )
1836            .await?;
1837
1838        Ok(Some(reaction_txn_id))
1839    }
1840
1841    /// Returns a list of the local echoes, that is, all the requests that we're
1842    /// about to send but that haven't been sent yet (or are being sent).
1843    async fn local_echoes(
1844        &self,
1845        room: &RoomSendQueue,
1846    ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1847        let guard = self.store.lock().await;
1848        let client = guard.client()?;
1849        let store = client.state_store();
1850
1851        let local_requests =
1852            store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1853                Some(LocalEcho {
1854                    transaction_id: queued.transaction_id.clone(),
1855                    content: match queued.kind {
1856                        QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1857                            serialized_event: content,
1858                            send_handle: SendHandle {
1859                                room: room.clone(),
1860                                transaction_id: queued.transaction_id,
1861                                media_handles: vec![],
1862                                created_at: queued.created_at,
1863                            },
1864                            send_error: queued.error,
1865                        },
1866
1867                        QueuedRequestKind::MediaUpload { .. } => {
1868                            // Don't return uploaded media as their own echoes; the accompanying
1869                            // event represented as a dependent request should be sufficient.
1870                            return None;
1871                        }
1872                    },
1873                })
1874            });
1875
1876        let reactions_and_medias = store
1877            .load_dependent_queued_requests(&self.room_id)
1878            .await?
1879            .into_iter()
1880            .filter_map(|dep| match dep.kind {
1881                DependentQueuedRequestKind::EditEvent { .. }
1882                | DependentQueuedRequestKind::RedactEvent => {
1883                    // TODO: reflect local edits/redacts too?
1884                    None
1885                }
1886
1887                DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1888                    transaction_id: dep.own_transaction_id.clone().into(),
1889                    content: LocalEchoContent::React {
1890                        key,
1891                        send_handle: SendReactionHandle {
1892                            room: room.clone(),
1893                            transaction_id: dep.own_transaction_id,
1894                        },
1895                        applies_to: dep.parent_transaction_id,
1896                    },
1897                }),
1898
1899                DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
1900                    // Don't reflect these: only the associated event is interesting to observers.
1901                    None
1902                }
1903
1904                DependentQueuedRequestKind::FinishUpload {
1905                    local_echo,
1906                    file_upload,
1907                    thumbnail_info,
1908                } => {
1909                    // Materialize as an event local echo.
1910                    Some(LocalEcho {
1911                        transaction_id: dep.own_transaction_id.clone().into(),
1912                        content: LocalEchoContent::Event {
1913                            serialized_event: SerializableEventContent::new(&(*local_echo).into())
1914                                .ok()?,
1915                            send_handle: SendHandle {
1916                                room: room.clone(),
1917                                transaction_id: dep.own_transaction_id.into(),
1918                                media_handles: vec![MediaHandles {
1919                                    upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1920                                    upload_file_txn: file_upload,
1921                                }],
1922                                created_at: dep.created_at,
1923                            },
1924                            send_error: None,
1925                        },
1926                    })
1927                }
1928
1929                #[cfg(feature = "unstable-msc4274")]
1930                DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
1931                    // Materialize as an event local echo.
1932                    self.create_gallery_local_echo(
1933                        dep.own_transaction_id,
1934                        room,
1935                        dep.created_at,
1936                        local_echo,
1937                        item_infos,
1938                    )
1939                }
1940            });
1941
1942        Ok(local_requests.chain(reactions_and_medias).collect())
1943    }
1944
1945    /// Create a local echo for a gallery event.
1946    #[cfg(feature = "unstable-msc4274")]
1947    fn create_gallery_local_echo(
1948        &self,
1949        transaction_id: ChildTransactionId,
1950        room: &RoomSendQueue,
1951        created_at: MilliSecondsSinceUnixEpoch,
1952        local_echo: Box<RoomMessageEventContent>,
1953        item_infos: Vec<FinishGalleryItemInfo>,
1954    ) -> Option<LocalEcho> {
1955        Some(LocalEcho {
1956            transaction_id: transaction_id.clone().into(),
1957            content: LocalEchoContent::Event {
1958                serialized_event: SerializableEventContent::new(&(*local_echo).into()).ok()?,
1959                send_handle: SendHandle {
1960                    room: room.clone(),
1961                    transaction_id: transaction_id.into(),
1962                    media_handles: item_infos
1963                        .into_iter()
1964                        .map(|i| MediaHandles {
1965                            upload_thumbnail_txn: i.thumbnail_info.map(|info| info.txn),
1966                            upload_file_txn: i.file_upload,
1967                        })
1968                        .collect(),
1969                    created_at,
1970                },
1971                send_error: None,
1972            },
1973        })
1974    }
1975
1976    /// Try to apply a single dependent request, whether it's local or remote.
1977    ///
1978    /// This swallows errors that would recur every time we retried applying
1979    /// the dependent request: invalid edit content, etc.
1980    ///
1981    /// Returns true if the dependent request has been sent (or should not be
1982    /// retried later).
1983    #[instrument(skip_all)]
1984    async fn try_apply_single_dependent_request(
1985        &self,
1986        client: &Client,
1987        dependent_request: DependentQueuedRequest,
1988        new_updates: &mut Vec<RoomSendQueueUpdate>,
1989    ) -> Result<bool, RoomSendQueueError> {
1990        let store = client.state_store();
1991
1992        let parent_key = dependent_request.parent_key;
1993
1994        match dependent_request.kind {
1995            DependentQueuedRequestKind::EditEvent { new_content } => {
1996                if let Some(parent_key) = parent_key {
1997                    let Some(event_id) = parent_key.into_event_id() else {
1998                        return Err(RoomSendQueueError::StorageError(
1999                            RoomSendQueueStorageError::InvalidParentKey,
2000                        ));
2001                    };
2002
2003                    // The parent event has been sent, so send an edit event.
2004                    let room = client
2005                        .get_room(&self.room_id)
2006                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
2007
2008                    // Check the event is one we know how to edit with an edit event.
2009
2010                    // It must be deserializable…
2011                    let edited_content = match new_content.deserialize() {
2012                        Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
2013                            // Assume no relationships.
2014                            EditedContent::RoomMessage(c.into())
2015                        }
2016
2017                        Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
2018                            let poll_start = c.poll_start().clone();
2019                            EditedContent::PollStart {
2020                                fallback_text: poll_start.question.text.clone(),
2021                                new_content: poll_start,
2022                            }
2023                        }
2024
2025                        Ok(c) => {
2026                            warn!("Unsupported edit content type: {:?}", c.event_type());
2027                            return Ok(true);
2028                        }
2029
2030                        Err(err) => {
2031                            warn!("Unable to deserialize: {err}");
2032                            return Ok(true);
2033                        }
2034                    };
2035
2036                    let edit_event = match room.make_edit_event(&event_id, edited_content).await {
2037                        Ok(e) => e,
2038                        Err(err) => {
2039                            warn!("couldn't create edited event: {err}");
2040                            return Ok(true);
2041                        }
2042                    };
2043
2044                    // Queue the edit event in the send queue 🧠.
2045                    let serializable = SerializableEventContent::from_raw(
2046                        Raw::new(&edit_event)
2047                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2048                        edit_event.event_type().to_string(),
2049                    );
2050
2051                    store
2052                        .save_send_queue_request(
2053                            &self.room_id,
2054                            dependent_request.own_transaction_id.into(),
2055                            dependent_request.created_at,
2056                            serializable.into(),
2057                            Self::HIGH_PRIORITY,
2058                        )
2059                        .await
2060                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2061                } else {
2062                    // The parent event is still local; update the local echo.
2063                    let edited = store
2064                        .update_send_queue_request(
2065                            &self.room_id,
2066                            &dependent_request.parent_transaction_id,
2067                            new_content.into(),
2068                        )
2069                        .await
2070                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2071
2072                    if !edited {
2073                        warn!("missing local echo upon dependent edit");
2074                    }
2075                }
2076            }
2077
2078            DependentQueuedRequestKind::RedactEvent => {
2079                if let Some(parent_key) = parent_key {
2080                    let Some(event_id) = parent_key.into_event_id() else {
2081                        return Err(RoomSendQueueError::StorageError(
2082                            RoomSendQueueStorageError::InvalidParentKey,
2083                        ));
2084                    };
2085
2086                    // The parent event has been sent; send a redaction.
2087                    let room = client
2088                        .get_room(&self.room_id)
2089                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
2090
2091                    // Ideally we'd use the send queue to send the redaction, but the protocol has
2092                    // changed the shape of a room.redaction after v11, so keep it simple and try
2093                    // once here.
2094
2095                    // Note: no reason is provided because we materialize the intent of "cancel
2096                    // sending the parent event".
2097
2098                    if let Err(err) = room
2099                        .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
2100                        .await
2101                    {
2102                        warn!("error when sending a redact for {event_id}: {err}");
2103                        return Ok(false);
2104                    }
2105                } else {
2106                    // The parent event is still local (sending must have failed); redact the local
2107                    // echo.
2108                    let removed = store
2109                        .remove_send_queue_request(
2110                            &self.room_id,
2111                            &dependent_request.parent_transaction_id,
2112                        )
2113                        .await
2114                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2115
2116                    if !removed {
2117                        warn!("missing local echo upon dependent redact");
2118                    }
2119                }
2120            }
2121
2122            DependentQueuedRequestKind::ReactEvent { key } => {
2123                if let Some(parent_key) = parent_key {
2124                    let Some(parent_event_id) = parent_key.into_event_id() else {
2125                        return Err(RoomSendQueueError::StorageError(
2126                            RoomSendQueueStorageError::InvalidParentKey,
2127                        ));
2128                    };
2129
2130                    // Queue the reaction event in the send queue 🧠.
2131                    let react_event =
2132                        ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
2133                    let serializable = SerializableEventContent::from_raw(
2134                        Raw::new(&react_event)
2135                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2136                        react_event.event_type().to_string(),
2137                    );
2138
2139                    store
2140                        .save_send_queue_request(
2141                            &self.room_id,
2142                            dependent_request.own_transaction_id.into(),
2143                            dependent_request.created_at,
2144                            serializable.into(),
2145                            Self::HIGH_PRIORITY,
2146                        )
2147                        .await
2148                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2149                } else {
2150                    // Not applied yet, we should retry later => false.
2151                    return Ok(false);
2152                }
2153            }
2154
2155            DependentQueuedRequestKind::UploadFileOrThumbnail {
2156                content_type,
2157                cache_key,
2158                related_to,
2159                parent_is_thumbnail_upload,
2160            } => {
2161                let Some(parent_key) = parent_key else {
2162                    // Not finished yet, we should retry later => false.
2163                    return Ok(false);
2164                };
2165                self.handle_dependent_file_or_thumbnail_upload(
2166                    client,
2167                    dependent_request.own_transaction_id.into(),
2168                    parent_key,
2169                    content_type,
2170                    cache_key,
2171                    related_to,
2172                    parent_is_thumbnail_upload,
2173                )
2174                .await?;
2175            }
2176
2177            DependentQueuedRequestKind::FinishUpload {
2178                local_echo,
2179                file_upload,
2180                thumbnail_info,
2181            } => {
2182                let Some(parent_key) = parent_key else {
2183                    // Not finished yet, we should retry later => false.
2184                    return Ok(false);
2185                };
2186                self.handle_dependent_finish_upload(
2187                    client,
2188                    dependent_request.own_transaction_id.into(),
2189                    parent_key,
2190                    *local_echo,
2191                    file_upload,
2192                    thumbnail_info,
2193                    new_updates,
2194                )
2195                .await?;
2196            }
2197
2198            #[cfg(feature = "unstable-msc4274")]
2199            DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
2200                let Some(parent_key) = parent_key else {
2201                    // Not finished yet, we should retry later => false.
2202                    return Ok(false);
2203                };
2204                self.handle_dependent_finish_gallery_upload(
2205                    client,
2206                    dependent_request.own_transaction_id.into(),
2207                    parent_key,
2208                    *local_echo,
2209                    item_infos,
2210                    new_updates,
2211                )
2212                .await?;
2213            }
2214        }
2215
2216        Ok(true)
2217    }
2218
2219    #[instrument(skip(self))]
2220    async fn apply_dependent_requests(
2221        &self,
2222        new_updates: &mut Vec<RoomSendQueueUpdate>,
2223    ) -> Result<(), RoomSendQueueError> {
2224        let guard = self.store.lock().await;
2225
2226        let client = guard.client()?;
2227        let store = client.state_store();
2228
2229        let dependent_requests = store
2230            .load_dependent_queued_requests(&self.room_id)
2231            .await
2232            .map_err(RoomSendQueueStorageError::StateStoreError)?;
2233
2234        let num_initial_dependent_requests = dependent_requests.len();
2235        if num_initial_dependent_requests == 0 {
2236            // Returning early here avoids a bit of useless logging.
2237            return Ok(());
2238        }
2239
2240        let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
2241
2242        // Get rid of all the non-canonical dependent requests.
2243        for original in &dependent_requests {
2244            if !canonicalized_dependent_requests
2245                .iter()
2246                .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
2247            {
2248                store
2249                    .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
2250                    .await
2251                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
2252            }
2253        }
2254
2255        let mut num_dependent_requests = canonicalized_dependent_requests.len();
2256
2257        debug!(
2258            num_dependent_requests,
2259            num_initial_dependent_requests, "starting handling of dependent requests"
2260        );
2261
2262        for dependent in canonicalized_dependent_requests {
2263            let dependent_id = dependent.own_transaction_id.clone();
2264
2265            match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
2266                Ok(should_remove) => {
2267                    if should_remove {
2268                        // The dependent request has been successfully applied, forget about it.
2269                        store
2270                            .remove_dependent_queued_request(&self.room_id, &dependent_id)
2271                            .await
2272                            .map_err(RoomSendQueueStorageError::StateStoreError)?;
2273
2274                        num_dependent_requests -= 1;
2275                    }
2276                }
2277
2278                Err(err) => {
2279                    warn!("error when applying single dependent request: {err}");
2280                }
2281            }
2282        }
2283
2284        debug!(
2285            leftover_dependent_requests = num_dependent_requests,
2286            "stopped handling dependent requests"
2287        );
2288
2289        Ok(())
2290    }
2291
2292    /// Remove a single dependent request from storage.
2293    async fn remove_dependent_send_queue_request(
2294        &self,
2295        dependent_event_id: &ChildTransactionId,
2296    ) -> Result<bool, RoomSendQueueStorageError> {
2297        Ok(self
2298            .store
2299            .lock()
2300            .await
2301            .client()?
2302            .state_store()
2303            .remove_dependent_queued_request(&self.room_id, dependent_event_id)
2304            .await?)
2305    }
2306}
2307
2308#[cfg(feature = "unstable-msc4274")]
2309/// Metadata needed for pushing gallery item uploads onto the send queue.
2310struct GalleryItemQueueInfo {
2311    content_type: Mime,
2312    upload_file_txn: OwnedTransactionId,
2313    file_media_request: MediaRequestParameters,
2314    thumbnail: Option<QueueThumbnailInfo>,
2315}
2316
2317/// The content of a local echo.
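///
/// A sketch of how consumers typically match on it (assuming `local_echoes`
/// is a `Vec<LocalEcho>`, e.g. as obtained alongside a subscription):
///
/// ```ignore
/// for echo in local_echoes {
///     match echo.content {
///         LocalEchoContent::Event { send_handle, send_error, .. } => {
///             // A pending event; it can be aborted or edited via `send_handle`,
///             // and `send_error` indicates whether a previous attempt wedged it.
///         }
///         LocalEchoContent::React { key, applies_to, .. } => {
///             // A pending reaction `key` targeting the local echo `applies_to`.
///         }
///     }
/// }
/// ```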
2318#[derive(Clone, Debug)]
2319pub enum LocalEchoContent {
2320    /// The local echo contains an actual event ready to display.
2321    Event {
2322        /// Content of the event itself (along with its type) that we are about
2323        /// to send.
2324        serialized_event: SerializableEventContent,
2325        /// A handle to manipulate the sending of the associated event.
2326        send_handle: SendHandle,
2327        /// If sending this local echo previously failed with an unrecoverable
2328        /// error, that error (see [`SendQueueRoomError::is_recoverable`]).
2329        send_error: Option<QueueWedgeError>,
2330    },
2331
2332    /// A local echo has been reacted to.
2333    React {
2334        /// The key with which the local echo has been reacted to.
2335        key: String,
2336        /// A handle to manipulate the sending of the reaction.
2337        send_handle: SendReactionHandle,
2338        /// The local echo which has been reacted to.
2339        applies_to: OwnedTransactionId,
2340    },
2341}
2342
2343/// A local representation for a request that hasn't been sent yet to the user's
2344/// homeserver.
2345#[derive(Clone, Debug)]
2346pub struct LocalEcho {
2347    /// Transaction id used to identify the associated request.
2348    pub transaction_id: OwnedTransactionId,
2349    /// The content for the local echo.
2350    pub content: LocalEchoContent,
2351}
2352
2353/// An update to a room send queue, observable with
2354/// [`RoomSendQueue::subscribe`].
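///
/// A hedged sketch of consuming these updates (it assumes the receiver half
/// obtained from [`RoomSendQueue::subscribe`] is a broadcast-style channel):
///
/// ```ignore
/// while let Ok(update) = updates.recv().await {
///     match update {
///         RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
///             // Replace the local echo identified by `transaction_id` with the
///             // remote event `event_id`.
///         }
///         RoomSendQueueUpdate::SendError { is_recoverable, .. } => {
///             // Surface the failure; the queue may need to be re-enabled.
///         }
///         _ => {}
///     }
/// }
/// ```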
2355#[derive(Clone, Debug)]
2356pub enum RoomSendQueueUpdate {
2357    /// A new local event is being sent.
2358    ///
2359    /// There's been a user query to create this event. It is being sent to the
2360    /// server.
2361    NewLocalEvent(LocalEcho),
2362
2363    /// A local event that hadn't been sent to the server yet has been cancelled
2364    /// before sending.
2365    CancelledLocalEvent {
2366        /// Transaction id used to identify this event.
2367        transaction_id: OwnedTransactionId,
2368    },
2369
2370    /// A local event's content has been replaced with something else.
2371    ReplacedLocalEvent {
2372        /// Transaction id used to identify this event.
2373        transaction_id: OwnedTransactionId,
2374
2375        /// The new content replacing the previous one.
2376        new_content: SerializableEventContent,
2377    },
2378
2379    /// An error happened when an event was being sent.
2380    ///
2381    /// The event has not been removed from the queue. All the send queues
2382    /// will be disabled after this happens, and must be manually re-enabled.
2383    SendError {
2384        /// Transaction id used to identify this event.
2385        transaction_id: OwnedTransactionId,
2386        /// Error received while sending the event.
2387        error: Arc<crate::Error>,
2388        /// Whether the error is considered recoverable or not.
2389        ///
2390        /// An error that's recoverable will disable the room's send queue,
2391        /// A recoverable error will disable the room's send queue, while an
2392        /// unrecoverable error will cause the event to be parked (wedged)
2393        /// until the user decides to cancel or retry sending it.
2394    },
2395
2396    /// The event has been unwedged and sending is now being retried.
2397    RetryEvent {
2398        /// Transaction id used to identify this event.
2399        transaction_id: OwnedTransactionId,
2400    },
2401
2402    /// The event has been sent to the server, and the query returned
2403    /// successfully.
2404    SentEvent {
2405        /// Transaction id used to identify this event.
2406        transaction_id: OwnedTransactionId,
2407        /// Received event id from the send response.
2408        event_id: OwnedEventId,
2409    },
2410
2411    /// A media upload (consisting of a file and possibly a thumbnail) has made
2412    /// progress.
2413    MediaUpload {
2414        /// The media event this uploaded media relates to.
2415        related_to: OwnedTransactionId,
2416
2417        /// The final media source for the file if it has finished uploading.
2418        file: Option<MediaSource>,
2419
2420        /// The index of the media within the transaction. A file and its
2421        /// thumbnail share the same index. Will always be 0 for non-gallery
2422        /// media uploads.
2423        index: u64,
2424
2425        /// The combined upload progress across the file and, if existing, its
2426        /// thumbnail. For gallery uploads, the progress is reported per indexed
2427        /// gallery item.
2428        progress: AbstractProgress,
2429    },
2430}
2431
2432/// A [`RoomSendQueueUpdate`] with an associated [`OwnedRoomId`].
2433///
2434/// This is used by [`SendQueue::subscribe`] to get a single channel to receive
2435/// updates for all [`RoomSendQueue`]s.
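///
/// A sketch of routing per-room updates from the single channel (assuming
/// `updates` is the receiver obtained from [`SendQueue::subscribe`]):
///
/// ```ignore
/// while let Ok(SendQueueUpdate { room_id, update }) = updates.recv().await {
///     // Dispatch `update` to whatever is displaying `room_id`.
/// }
/// ```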
2436#[derive(Clone, Debug)]
2437pub struct SendQueueUpdate {
2438    /// The room where the update happened.
2439    pub room_id: OwnedRoomId,
2440
2441    /// The update for this room.
2442    pub update: RoomSendQueueUpdate,
2443}
2444
2445/// An error triggered by the send queue module.
2446#[derive(Debug, thiserror::Error)]
2447pub enum RoomSendQueueError {
2448    /// The room isn't in the joined state.
2449    #[error("the room isn't in the joined state")]
2450    RoomNotJoined,
2451
2452    /// The room is missing from the client.
2453    ///
2454    /// This only happens when the client is shutting down.
2455    #[error("the room is now missing from the client")]
2456    RoomDisappeared,
2457
2458    /// Error coming from storage.
2459    #[error(transparent)]
2460    StorageError(#[from] RoomSendQueueStorageError),
2461
2462    /// The attachment event failed to be created.
2463    #[error("the attachment event could not be created")]
2464    FailedToCreateAttachment,
2465
2466    /// The gallery contains no items.
2467    #[cfg(feature = "unstable-msc4274")]
2468    #[error("the gallery contains no items")]
2469    EmptyGallery,
2470
2471    /// The gallery event failed to be created.
2472    #[cfg(feature = "unstable-msc4274")]
2473    #[error("the gallery event could not be created")]
2474    FailedToCreateGallery,
2475}
2476
2477/// An error triggered by the send queue storage.
2478#[derive(Debug, thiserror::Error)]
2479pub enum RoomSendQueueStorageError {
2480    /// Error caused by the state store.
2481    #[error(transparent)]
2482    StateStoreError(#[from] StoreError),
2483
2484    /// Error caused by the event cache store.
2485    #[error(transparent)]
2486    EventCacheStoreError(#[from] EventCacheStoreError),
2487
2488    /// Error caused by the media store.
2489    #[error(transparent)]
2490    MediaStoreError(#[from] MediaStoreError),
2491
2492    /// Error caused when attempting to get a handle on the event cache store.
2493    #[error(transparent)]
2494    LockError(#[from] CrossProcessLockError),
2495
2496    /// Error caused when (de)serializing into/from json.
2497    #[error(transparent)]
2498    JsonSerialization(#[from] serde_json::Error),
2499
2500    /// A parent key was expected to be of a certain type, and it was another
2501    /// type instead.
2502    #[error("a dependent event had an invalid parent key type")]
2503    InvalidParentKey,
2504
2505    /// The client is shutting down.
2506    #[error("The client is shutting down.")]
2507    ClientShuttingDown,
2508
2509    /// An operation not implemented on a send handle.
2510    #[error("This operation is not implemented for media uploads")]
2511    OperationNotImplementedYet,
2512
2513    /// Trying to edit a media caption for something that's not a media.
2514    #[error("Can't edit a media caption when the underlying event isn't a media")]
2515    InvalidMediaCaptionEdit,
2516}
2517
2518/// Extra transaction IDs useful during an upload.
2519#[derive(Clone, Debug)]
2520struct MediaHandles {
2521    /// Transaction id used when uploading the thumbnail.
2522    ///
2523    /// Optional because a media can be uploaded without a thumbnail.
2524    upload_thumbnail_txn: Option<OwnedTransactionId>,
2525
2526    /// Transaction id used when uploading the media itself.
2527    upload_file_txn: OwnedTransactionId,
2528}
2529
2530/// A handle to manipulate an event that was scheduled to be sent to a room.
2531#[derive(Clone, Debug)]
2532pub struct SendHandle {
2533    /// Link to the send queue used to send this request.
2534    room: RoomSendQueue,
2535
2536    /// Transaction id used for the sent request.
2537    ///
2538    /// If this is a media upload, this is the "main" transaction id, i.e. the
2539    /// one used to send the event, and that will be seen by observers.
2540    transaction_id: OwnedTransactionId,
2541
2542    /// Additional handles for a media upload.
2543    media_handles: Vec<MediaHandles>,
2544
2545    /// The time at which the event to be sent has been created.
2546    pub created_at: MilliSecondsSinceUnixEpoch,
2547}
2548
2549impl SendHandle {
2550    /// Creates a new [`SendHandle`].
2551    #[cfg(test)]
2552    pub(crate) fn new(
2553        room: RoomSendQueue,
2554        transaction_id: OwnedTransactionId,
2555        created_at: MilliSecondsSinceUnixEpoch,
2556    ) -> Self {
2557        Self { room, transaction_id, media_handles: vec![], created_at }
2558    }
2559
2560    fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2561        if !self.media_handles.is_empty() {
2562            Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2563        } else {
2564            Ok(())
2565        }
2566    }
2567
2568    /// Aborts the sending of the event, if it wasn't sent yet.
2569    ///
2570    /// Returns true if the sending could be aborted, false if not (i.e. the
2571    /// event had already been sent).
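    ///
    /// A minimal sketch (not compiled here); it assumes `room` is a joined
    /// room and that [`RoomSendQueue::send`] returned this handle:
    ///
    /// ```ignore
    /// let handle = room.send_queue().send(content).await?;
    /// if handle.abort().await? {
    ///     // The request was still local and has been removed from the queue.
    /// } else {
    ///     // Too late: the event was already sent (or is being sent).
    /// }
    /// ```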
2572    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2573    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2574        trace!("received an abort request");
2575
2576        let queue = &self.room.inner.queue;
2577
2578        for handles in &self.media_handles {
2579            if queue.abort_upload(&self.transaction_id, handles).await? {
2580                // Propagate a cancelled update.
2581                self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2582                    transaction_id: self.transaction_id.clone(),
2583                });
2584
2585                return Ok(true);
2586            }
2587
2588            // If aborting the upload failed, it means the sending of the event
2589            // is no longer a dependent request. Fall back to the regular code
2590            // path below, which handles aborting the sending of an event.
2591        }
2592
2593        if queue.cancel_event(&self.transaction_id).await? {
2594            trace!("successful abort");
2595
2596            // Propagate a cancelled update too.
2597            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2598                transaction_id: self.transaction_id.clone(),
2599            });
2600
2601            Ok(true)
2602        } else {
2603            debug!("local echo didn't exist anymore, can't abort");
2604            Ok(false)
2605        }
2606    }
2607
2608    /// Edits the content of a local echo with a raw event content.
2609    ///
2610    /// Returns true if the event to be sent was replaced, false if not (i.e.
2611    /// the event had already been sent).
2612    #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2613    pub async fn edit_raw(
2614        &self,
2615        new_content: Raw<AnyMessageLikeEventContent>,
2616        event_type: String,
2617    ) -> Result<bool, RoomSendQueueStorageError> {
2618        trace!("received an edit request");
2619        self.nyi_for_uploads()?;
2620
2621        let serializable = SerializableEventContent::from_raw(new_content, event_type);
2622
2623        if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2624            trace!("successful edit");
2625
2626            // Wake up the queue, in case the room was asleep before the edit.
2627            self.room.inner.notifier.notify_one();
2628
2629            // Propagate a replaced update too.
2630            self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2631                transaction_id: self.transaction_id.clone(),
2632                new_content: serializable,
2633            });
2634
2635            Ok(true)
2636        } else {
2637            debug!("local echo doesn't exist anymore, can't edit");
2638            Ok(false)
2639        }
2640    }
2641
2642    /// Edits the content of a local echo with an event content.
2643    ///
2644    /// Returns true if the event to be sent was replaced, false if not (i.e.
2645    /// the event had already been sent).
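    ///
    /// # Examples
    ///
    /// A minimal sketch (marked `ignore`, not compiled as a doctest), where
    /// `handle` is the [`SendHandle`] returned by a previous `send()`:
    ///
    /// ```rust,ignore
    /// let replaced = handle
    ///     .edit(RoomMessageEventContent::text_plain("fixed the typo").into())
    ///     .await?;
    /// if !replaced {
    ///     // `false` means the local echo was gone, i.e. the event had
    ///     // already been sent.
    /// }
    /// ```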
2646    pub async fn edit(
2647        &self,
2648        new_content: AnyMessageLikeEventContent,
2649    ) -> Result<bool, RoomSendQueueStorageError> {
2650        self.edit_raw(
2651            Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2652            new_content.event_type().to_string(),
2653        )
2654        .await
2655    }
2656
2657    /// Edits the content of a local echo with a media caption.
2658    ///
2659    /// Will fail if the event to be sent, represented by this send handle,
2660    /// wasn't a media event.
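    ///
    /// # Examples
    ///
    /// A minimal sketch (marked `ignore`, not compiled as a doctest), where
    /// `handle` points at a queued media event:
    ///
    /// ```rust,ignore
    /// handle
    ///     .edit_media_caption(Some("A cat picture".to_owned()), None, None)
    ///     .await?;
    /// ```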
2661    pub async fn edit_media_caption(
2662        &self,
2663        caption: Option<String>,
2664        formatted_caption: Option<FormattedBody>,
2665        mentions: Option<Mentions>,
2666    ) -> Result<bool, RoomSendQueueStorageError> {
2667        if let Some(new_content) = self
2668            .room
2669            .inner
2670            .queue
2671            .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2672            .await?
2673        {
2674            trace!("successful edit of media caption");
2675
2676            // Wake up the queue, in case the room was asleep before the edit.
2677            self.room.inner.notifier.notify_one();
2678
2679            let new_content = SerializableEventContent::new(&new_content)
2680                .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2681
2682            // Propagate a replaced update too.
2683            self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2684                transaction_id: self.transaction_id.clone(),
2685                new_content,
2686            });
2687
2688            Ok(true)
2689        } else {
2690            debug!("local echo doesn't exist anymore, can't edit media caption");
2691            Ok(false)
2692        }
2693    }
2694
2695    /// Unwedge a local echo identified by its transaction identifier and try to
2696    /// resend it.
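    ///
    /// # Examples
    ///
    /// A minimal sketch (marked `ignore`, not compiled as a doctest), where
    /// `handle` is the [`SendHandle`] kept from the original `send()` call,
    /// after the send queue reported an unrecoverable error for this event:
    ///
    /// ```rust,ignore
    /// handle.unwedge().await?;
    /// ```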
2697    pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2698        let room = &self.room.inner;
2699        room.queue
2700            .mark_as_unwedged(&self.transaction_id)
2701            .await
2702            .map_err(RoomSendQueueError::StorageError)?;
2703
2704        // If we have media handles, also try to unwedge them.
2705        //
2706        // It's fine to always do it to *all* the transaction IDs at once, because only
2707        // one of the three requests will be active at the same time, i.e. only
2708        // one entry will be updated in the store. The other two are either
2709        // done, or dependent requests.
2710
2711        for handles in &self.media_handles {
2712            room.queue
2713                .mark_as_unwedged(&handles.upload_file_txn)
2714                .await
2715                .map_err(RoomSendQueueError::StorageError)?;
2716
2717            if let Some(txn) = &handles.upload_thumbnail_txn {
2718                room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2719            }
2720        }
2721
2722        // Wake up the queue, in case the room was asleep before unwedging the request.
2723        room.notifier.notify_one();
2724
2725        self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2726            transaction_id: self.transaction_id.clone(),
2727        });
2728
2729        Ok(())
2730    }
2731
2732    /// Sends a reaction to the event, as soon as the event itself is sent.
2733    ///
2734    /// Returns `Ok(None)` if the reaction couldn't be sent because the
2735    /// event is already a remote one.
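    ///
    /// # Examples
    ///
    /// A minimal sketch (marked `ignore`, not compiled as a doctest):
    ///
    /// ```rust,ignore
    /// if let Some(reaction) = handle.react("👍".to_owned()).await? {
    ///     // The reaction is sent once the parent event itself has been sent;
    ///     // until then, it can still be aborted.
    ///     reaction.abort().await?;
    /// }
    /// ```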
2736    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2737    pub async fn react(
2738        &self,
2739        key: String,
2740    ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2741        trace!("received an intent to react");
2742
2743        let created_at = MilliSecondsSinceUnixEpoch::now();
2744        if let Some(reaction_txn_id) =
2745            self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2746        {
2747            trace!("successfully queued react");
2748
2749            // Wake up the queue, in case the room was asleep before the sending.
2750            self.room.inner.notifier.notify_one();
2751
2752            // Propagate a new local event.
2753            let send_handle = SendReactionHandle {
2754                room: self.room.clone(),
2755                transaction_id: reaction_txn_id.clone(),
2756            };
2757
2758            self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2759                // Note: we do want to use the `txn_id` we're going to use for the reaction, not
2760                // the one for the event we're reacting to.
2761                transaction_id: reaction_txn_id.into(),
2762                content: LocalEchoContent::React {
2763                    key,
2764                    send_handle: send_handle.clone(),
2765                    applies_to: self.transaction_id.clone(),
2766                },
2767            }));
2768
2769            Ok(Some(send_handle))
2770        } else {
2771            debug!("local echo doesn't exist anymore, can't react");
2772            Ok(None)
2773        }
2774    }
2775}
2776
2777/// A handle to execute actions on the sending of a reaction.
2778#[derive(Clone, Debug)]
2779pub struct SendReactionHandle {
2780    /// Reference to the send queue for the room where this reaction was sent.
2781    room: RoomSendQueue,
2782    /// The own transaction id for the reaction.
2783    transaction_id: ChildTransactionId,
2784}
2785
2786impl SendReactionHandle {
2787    /// Abort the sending of the reaction.
2788    ///
2789    /// Will return true if the reaction could be aborted, false if it's been
2790    /// sent (and there's no matching local echo anymore).
2791    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2792        if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2793            // Simple case: the reaction was found in the dependent event list.
2794
2795            // Propagate a cancelled update too.
2796            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2797                transaction_id: self.transaction_id.clone().into(),
2798            });
2799
2800            return Ok(true);
2801        }
2802
2803        // The reaction has already been queued for sending, try to abort it using a
2804        // regular abort.
2805        let handle = SendHandle {
2806            room: self.room.clone(),
2807            transaction_id: self.transaction_id.clone().into(),
2808            media_handles: vec![],
2809            created_at: MilliSecondsSinceUnixEpoch::now(),
2810        };
2811
2812        handle.abort().await
2813    }
2814
2815    /// The transaction id that will be used to send this reaction later.
2816    pub fn transaction_id(&self) -> &TransactionId {
2817        &self.transaction_id
2818    }
2819}
2820
2821/// From a given source of [`DependentQueuedRequest`], return only the most
2822/// meaningful ones, i.e. those that wouldn't be overridden after applying
2823/// the others.
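///
/// For instance, for a given parent request (an illustrative sketch of the
/// rules implemented below):
///
/// - `[edit1, edit2, redact]` canonicalizes to `[redact]`;
/// - `[edit1, react, edit2]` canonicalizes to `[edit2, react]`;
/// - `[upload, edit1, edit2]` canonicalizes to `[edit2, upload]`.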
2824fn canonicalize_dependent_requests(
2825    dependent: &[DependentQueuedRequest],
2826) -> Vec<DependentQueuedRequest> {
2827    let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2828
2829    for d in dependent {
2830        let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2831
2832        if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2833            // The parent event has already been flagged for redaction, don't consider the
2834            // other dependent events.
2835            continue;
2836        }
2837
2838        match &d.kind {
2839            DependentQueuedRequestKind::EditEvent { .. } => {
2840                // Replace any previous edit with this one.
2841                if let Some(prev_edit) = prevs
2842                    .iter_mut()
2843                    .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2844                {
2845                    *prev_edit = d;
2846                } else {
2847                    prevs.insert(0, d);
2848                }
2849            }
2850
2851            DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
2852            | DependentQueuedRequestKind::FinishUpload { .. }
2853            | DependentQueuedRequestKind::ReactEvent { .. } => {
2854                // These requests can't be canonicalized, push them as is.
2855                prevs.push(d);
2856            }
2857
2858            #[cfg(feature = "unstable-msc4274")]
2859            DependentQueuedRequestKind::FinishGallery { .. } => {
2860                // This request can't be canonicalized, push it as is.
2861                prevs.push(d);
2862            }
2863
2864            DependentQueuedRequestKind::RedactEvent => {
2865                // Remove every other dependent action.
2866                prevs.clear();
2867                prevs.push(d);
2868            }
2869        }
2870    }
2871
2872    by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2873}
2874
2875#[cfg(all(test, not(target_family = "wasm")))]
2876mod tests {
2877    use std::{sync::Arc, time::Duration};
2878
2879    use assert_matches2::{assert_let, assert_matches};
2880    use matrix_sdk_base::store::{
2881        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2882        SerializableEventContent,
2883    };
2884    use matrix_sdk_test::{JoinedRoomBuilder, SyncResponseBuilder, async_test};
2885    use ruma::{
2886        MilliSecondsSinceUnixEpoch, TransactionId,
2887        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
2888        room_id,
2889    };
2890
2891    use super::canonicalize_dependent_requests;
2892    use crate::{client::WeakClient, test_utils::logged_in_client};
2893
2894    #[test]
2895    fn test_canonicalize_dependent_events_created_at() {
2896        // Test to ensure the created_at field is preserved when
2897        // canonicalizing dependent requests.
2898        let txn = TransactionId::new();
2899        let created_at = MilliSecondsSinceUnixEpoch::now();
2900
2901        let edit = DependentQueuedRequest {
2902            own_transaction_id: ChildTransactionId::new(),
2903            parent_transaction_id: txn.clone(),
2904            kind: DependentQueuedRequestKind::EditEvent {
2905                new_content: SerializableEventContent::new(
2906                    &RoomMessageEventContent::text_plain("edit").into(),
2907                )
2908                .unwrap(),
2909            },
2910            parent_key: None,
2911            created_at,
2912        };
2913
2914        let res = canonicalize_dependent_requests(&[edit]);
2915
2916        assert_eq!(res.len(), 1);
2917        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2918        assert_let!(
2919            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2920        );
2921        assert_eq!(msg.body(), "edit");
2922        assert_eq!(res[0].parent_transaction_id, txn);
2923        assert_eq!(res[0].created_at, created_at);
2924    }
2925
2926    #[async_test]
2927    async fn test_client_no_cycle_with_send_queue() {
2928        for enabled in [true, false] {
2929            let client = logged_in_client(None).await;
2930            let weak_client = WeakClient::from_client(&client);
2931
2932            {
2933                let mut sync_response_builder = SyncResponseBuilder::new();
2934
2935                let room_id = room_id!("!a:b.c");
2936
2937                // Make sure the client knows about the room.
2938                client
2939                    .base_client()
2940                    .receive_sync_response(
2941                        sync_response_builder
2942                            .add_joined_room(JoinedRoomBuilder::new(room_id))
2943                            .build_sync_response(),
2944                    )
2945                    .await
2946                    .unwrap();
2947
2948                let room = client.get_room(room_id).unwrap();
2949                let q = room.send_queue();
2950
2951                let _watcher = q.subscribe().await;
2952
2953                client.send_queue().set_enabled(enabled).await;
2954            }
2955
2956            drop(client);
2957
2958            // Give a bit of time for background tasks to die.
2959            tokio::time::sleep(Duration::from_millis(500)).await;
2960
2961            // The weak client must be the last reference to the client now.
2962            let client = weak_client.get();
2963            assert!(
2964                client.is_none(),
2965                "too many strong references to the client: {}",
2966                Arc::strong_count(&client.unwrap().inner)
2967            );
2968        }
2969    }
2970
2971    #[test]
2972    fn test_canonicalize_dependent_events_smoke_test() {
2973        // Smoke test: canonicalizing a single dependent event returns it.
2974        let txn = TransactionId::new();
2975
2976        let edit = DependentQueuedRequest {
2977            own_transaction_id: ChildTransactionId::new(),
2978            parent_transaction_id: txn.clone(),
2979            kind: DependentQueuedRequestKind::EditEvent {
2980                new_content: SerializableEventContent::new(
2981                    &RoomMessageEventContent::text_plain("edit").into(),
2982                )
2983                .unwrap(),
2984            },
2985            parent_key: None,
2986            created_at: MilliSecondsSinceUnixEpoch::now(),
2987        };
2988        let res = canonicalize_dependent_requests(&[edit]);
2989
2990        assert_eq!(res.len(), 1);
2991        assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2992        assert_eq!(res[0].parent_transaction_id, txn);
2993        assert!(res[0].parent_key.is_none());
2994    }
2995
2996    #[test]
2997    fn test_canonicalize_dependent_events_redaction_preferred() {
2998        // A redaction is preferred over any other kind of dependent event.
2999        let txn = TransactionId::new();
3000
3001        let mut inputs = Vec::with_capacity(100);
3002        let redact = DependentQueuedRequest {
3003            own_transaction_id: ChildTransactionId::new(),
3004            parent_transaction_id: txn.clone(),
3005            kind: DependentQueuedRequestKind::RedactEvent,
3006            parent_key: None,
3007            created_at: MilliSecondsSinceUnixEpoch::now(),
3008        };
3009
3010        let edit = DependentQueuedRequest {
3011            own_transaction_id: ChildTransactionId::new(),
3012            parent_transaction_id: txn.clone(),
3013            kind: DependentQueuedRequestKind::EditEvent {
3014                new_content: SerializableEventContent::new(
3015                    &RoomMessageEventContent::text_plain("edit").into(),
3016                )
3017                .unwrap(),
3018            },
3019            parent_key: None,
3020            created_at: MilliSecondsSinceUnixEpoch::now(),
3021        };
3022
3023        inputs.push({
3024            let mut edit = edit.clone();
3025            edit.own_transaction_id = ChildTransactionId::new();
3026            edit
3027        });
3028
3029        inputs.push(redact);
3030
3031        for _ in 0..98 {
3032            let mut edit = edit.clone();
3033            edit.own_transaction_id = ChildTransactionId::new();
3034            inputs.push(edit);
3035        }
3036
3037        let res = canonicalize_dependent_requests(&inputs);
3038
3039        assert_eq!(res.len(), 1);
3040        assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
3041        assert_eq!(res[0].parent_transaction_id, txn);
3042    }
3043
3044    #[test]
3045    fn test_canonicalize_dependent_events_last_edit_preferred() {
3046        let parent_txn = TransactionId::new();
3047
3048        // The latest edit of a list is always preferred.
3049        let inputs = (0..10)
3050            .map(|i| DependentQueuedRequest {
3051                own_transaction_id: ChildTransactionId::new(),
3052                parent_transaction_id: parent_txn.clone(),
3053                kind: DependentQueuedRequestKind::EditEvent {
3054                    new_content: SerializableEventContent::new(
3055                        &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
3056                    )
3057                    .unwrap(),
3058                },
3059                parent_key: None,
3060                created_at: MilliSecondsSinceUnixEpoch::now(),
3061            })
3062            .collect::<Vec<_>>();
3063
3064        let txn = inputs[9].parent_transaction_id.clone();
3065
3066        let res = canonicalize_dependent_requests(&inputs);
3067
3068        assert_eq!(res.len(), 1);
3069        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
3070        assert_let!(
3071            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
3072        );
3073        assert_eq!(msg.body(), "edit9");
3074        assert_eq!(res[0].parent_transaction_id, txn);
3075    }
3076
3077    #[test]
3078    fn test_canonicalize_multiple_local_echoes() {
3079        let txn1 = TransactionId::new();
3080        let txn2 = TransactionId::new();
3081
3082        let child1 = ChildTransactionId::new();
3083        let child2 = ChildTransactionId::new();
3084
3085        let inputs = vec![
3086            // This one pertains to txn1.
3087            DependentQueuedRequest {
3088                own_transaction_id: child1.clone(),
3089                kind: DependentQueuedRequestKind::RedactEvent,
3090                parent_transaction_id: txn1.clone(),
3091                parent_key: None,
3092                created_at: MilliSecondsSinceUnixEpoch::now(),
3093            },
3094            // This one pertains to txn2.
3095            DependentQueuedRequest {
3096                own_transaction_id: child2,
3097                kind: DependentQueuedRequestKind::EditEvent {
3098                    new_content: SerializableEventContent::new(
3099                        &RoomMessageEventContent::text_plain("edit").into(),
3100                    )
3101                    .unwrap(),
3102                },
3103                parent_transaction_id: txn2.clone(),
3104                parent_key: None,
3105                created_at: MilliSecondsSinceUnixEpoch::now(),
3106            },
3107        ];
3108
3109        let res = canonicalize_dependent_requests(&inputs);
3110
3111        // Canonicalization is applied per parent transaction id, so both requests must remain.
3112        assert_eq!(res.len(), 2);
3113
3114        for dependent in res {
3115            if dependent.own_transaction_id == child1 {
3116                assert_eq!(dependent.parent_transaction_id, txn1);
3117                assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
3118            } else {
3119                assert_eq!(dependent.parent_transaction_id, txn2);
3120                assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
3121            }
3122        }
3123    }
3124
3125    #[test]
3126    fn test_canonicalize_reactions_after_edits() {
3127        // Sending reactions should happen after edits to a given event.
3128        let txn = TransactionId::new();
3129
3130        let react_id = ChildTransactionId::new();
3131        let react = DependentQueuedRequest {
3132            own_transaction_id: react_id.clone(),
3133            kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
3134            parent_transaction_id: txn.clone(),
3135            parent_key: None,
3136            created_at: MilliSecondsSinceUnixEpoch::now(),
3137        };
3138
3139        let edit_id = ChildTransactionId::new();
3140        let edit = DependentQueuedRequest {
3141            own_transaction_id: edit_id.clone(),
3142            kind: DependentQueuedRequestKind::EditEvent {
3143                new_content: SerializableEventContent::new(
3144                    &RoomMessageEventContent::text_plain("edit").into(),
3145                )
3146                .unwrap(),
3147            },
3148            parent_transaction_id: txn,
3149            parent_key: None,
3150            created_at: MilliSecondsSinceUnixEpoch::now(),
3151        };
3152
3153        let res = canonicalize_dependent_requests(&[react, edit]);
3154
3155        assert_eq!(res.len(), 2);
3156        assert_eq!(res[0].own_transaction_id, edit_id);
3157        assert_eq!(res[1].own_transaction_id, react_id);
3158    }
3159}