matrix_sdk/send_queue/
upload.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Private implementations of the media upload mechanism.
16
17#[cfg(feature = "unstable-msc4274")]
18use std::{collections::HashMap, iter::zip};
19
20use matrix_sdk_base::{
21    event_cache::store::media::IgnoreMediaRetentionPolicy,
22    media::{MediaFormat, MediaRequestParameters},
23    store::{
24        ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
25        QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
26    },
27    RoomState,
28};
29#[cfg(feature = "unstable-msc4274")]
30use matrix_sdk_base::{
31    media::UniqueKey,
32    store::{AccumulatedSentMediaInfo, FinishGalleryItemInfo},
33};
34use mime::Mime;
35#[cfg(feature = "unstable-msc4274")]
36use ruma::events::room::message::{GalleryItemType, GalleryMessageEventContent};
37use ruma::{
38    events::{
39        room::{
40            message::{FormattedBody, MessageType, RoomMessageEventContent},
41            MediaSource, ThumbnailInfo,
42        },
43        AnyMessageLikeEventContent, Mentions,
44    },
45    MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
46};
47use tracing::{debug, error, instrument, trace, warn, Span};
48
49use super::{QueueStorage, RoomSendQueue, RoomSendQueueError};
50use crate::{
51    attachment::{AttachmentConfig, Thumbnail},
52    room::edit::update_media_caption,
53    send_queue::{
54        LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
55        SendHandle,
56    },
57    Client, Media, Room,
58};
59#[cfg(feature = "unstable-msc4274")]
60use crate::{
61    attachment::{GalleryConfig, GalleryItemInfo},
62    send_queue::GalleryItemQueueInfo,
63};
64
65/// Replace the source by the final ones in all the media types handled by
66/// [`Room::make_attachment_type()`].
67fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
68    // Some variants look really similar below, but the `event` and `info` are all
69    // different types…
70    match &mut echo.msgtype {
71        MessageType::Audio(event) => {
72            event.source = sent.file;
73        }
74        MessageType::File(event) => {
75            event.source = sent.file;
76            if let Some(info) = event.info.as_mut() {
77                info.thumbnail_source = sent.thumbnail;
78            }
79        }
80        MessageType::Image(event) => {
81            event.source = sent.file;
82            if let Some(info) = event.info.as_mut() {
83                info.thumbnail_source = sent.thumbnail;
84            }
85        }
86        MessageType::Video(event) => {
87            event.source = sent.file;
88            if let Some(info) = event.info.as_mut() {
89                info.thumbnail_source = sent.thumbnail;
90            }
91        }
92
93        _ => {
94            // All `MessageType` created by `Room::make_attachment_type` should be
95            // handled here. The only way to end up here is that a message type has
96            // been tampered with in the database.
97            error!("Invalid message type in database: {}", echo.msgtype());
98            // Only crash debug builds.
99            debug_assert!(false, "invalid message type in database");
100        }
101    }
102}
103
/// Swaps the local media sources of every gallery item for the ones obtained
/// after the upload, for all the item types handled by
/// [`Room::make_gallery_item_type()`].
///
/// `sent` maps the unique key of each item's local source to the media that
/// was uploaded for it. Items without a matching entry are left untouched and
/// an error is logged.
#[cfg(feature = "unstable-msc4274")]
fn update_gallery_event_after_upload(
    echo: &mut RoomMessageEventContent,
    sent: HashMap<String, AccumulatedSentMediaInfo>,
) {
    let MessageType::Gallery(gallery) = &mut echo.msgtype else {
        // All `GalleryItemType` created by `Room::make_gallery_item_type` should be
        // handled here. The only way to end up here is that a item type has
        // been tampered with in the database.
        error!("Invalid gallery item types in database");
        // Only crash debug builds.
        debug_assert!(false, "invalid item type in database {:?}", echo.msgtype());
        return;
    };

    for itemtype in &mut gallery.itemtypes {
        // Each variant wraps a different content type, so first reduce them all to
        // the two places that need patching: the file source and, when the
        // item has an `info` section that can hold one, the thumbnail source.
        let (source, thumbnail_slot): (&mut MediaSource, Option<&mut Option<MediaSource>>) =
            match itemtype {
                GalleryItemType::Audio(event) => (&mut event.source, None),
                GalleryItemType::File(event) => (
                    &mut event.source,
                    event.info.as_mut().map(|info| &mut info.thumbnail_source),
                ),
                GalleryItemType::Image(event) => (
                    &mut event.source,
                    event.info.as_mut().map(|info| &mut info.thumbnail_source),
                ),
                GalleryItemType::Video(event) => (
                    &mut event.source,
                    event.info.as_mut().map(|info| &mut info.thumbnail_source),
                ),

                _ => {
                    // Every `GalleryItemType` built by `Room::make_gallery_item_type`
                    // is covered above, so reaching this arm means the stored item
                    // type has been tampered with in the database.
                    error!("Invalid gallery item types in database");
                    // Only crash debug builds.
                    debug_assert!(false, "invalid gallery item type in database {itemtype:?}");
                    continue;
                }
            };

        let Some(uploaded) = sent.get(&source.unique_key()) else {
            error!("key for item {:?} does not exist on gallery event", source);
            continue;
        };

        *source = uploaded.file.clone();
        if let Some(slot) = thumbnail_slot {
            *slot = uploaded.thumbnail.clone();
        }
    }
}
168
/// Outcome of caching a media file (and its optional thumbnail) with
/// `RoomSendQueue::cache_media`.
///
/// All the fields are `None` when the media has no thumbnail, which is what
/// the derived `Default` produces.
#[derive(Default)]
struct MediaCacheResult {
    /// Transaction id generated for the thumbnail upload, if there is a
    /// thumbnail.
    upload_thumbnail_txn: Option<OwnedTransactionId>,
    /// Local source of the cached thumbnail along with its info, used to fill
    /// the thumbnail section of the event content.
    event_thumbnail_info: Option<(MediaSource, Box<ThumbnailInfo>)>,
    /// Thumbnail data handed over to the queue storage: the finish-upload
    /// info, the cache request for the thumbnail and its content type.
    queue_thumbnail_info: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
}
175
176impl RoomSendQueue {
177    /// Queues an attachment to be sent to the room, using the send queue.
178    ///
179    /// This returns quickly (without sending or uploading anything), and will
180    /// push the event to be sent into a queue, handled in the background.
181    ///
182    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
183    /// the [`Self::subscribe()`] method to get updates about the sending of
184    /// that event.
185    ///
186    /// By default, if sending failed on the first attempt, it will be retried a
187    /// few times. If sending failed after those retries, the entire
188    /// client's sending queue will be disabled, and it will need to be
189    /// manually re-enabled by the caller (e.g. after network is back, or when
190    /// something has been done about the faulty requests).
191    ///
192    /// The attachment and its optional thumbnail are stored in the media cache
193    /// and can be retrieved at any time, by calling
194    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
195    /// in the local or remote echo, and using a `MediaFormat::File`.
196    #[instrument(skip_all, fields(event_txn))]
197    pub async fn send_attachment(
198        &self,
199        filename: impl Into<String>,
200        content_type: Mime,
201        data: Vec<u8>,
202        mut config: AttachmentConfig,
203    ) -> Result<SendHandle, RoomSendQueueError> {
204        let Some(room) = self.inner.room.get() else {
205            return Err(RoomSendQueueError::RoomDisappeared);
206        };
207
208        if room.state() != RoomState::Joined {
209            return Err(RoomSendQueueError::RoomNotJoined);
210        }
211
212        let filename = filename.into();
213        let upload_file_txn = TransactionId::new();
214        let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
215
216        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
217        debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
218
219        let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
220
221        let MediaCacheResult { upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info } =
222            RoomSendQueue::cache_media(&room, data, config.thumbnail.take(), &file_media_request)
223                .await?;
224
225        // Create the content for the media event.
226        let event_content = room
227            .make_media_event(
228                Room::make_attachment_type(
229                    &content_type,
230                    filename,
231                    file_media_request.source.clone(),
232                    config.caption,
233                    config.formatted_caption,
234                    config.info,
235                    event_thumbnail_info,
236                ),
237                config.mentions,
238                config.reply,
239            )
240            .await
241            .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
242
243        let created_at = MilliSecondsSinceUnixEpoch::now();
244
245        // Save requests in the queue storage.
246        self.inner
247            .queue
248            .push_media(
249                event_content.clone(),
250                content_type,
251                send_event_txn.clone().into(),
252                created_at,
253                upload_file_txn.clone(),
254                file_media_request,
255                queue_thumbnail_info,
256            )
257            .await?;
258
259        trace!("manager sends a media to the background task");
260
261        self.inner.notifier.notify_one();
262
263        let send_handle = SendHandle {
264            room: self.clone(),
265            transaction_id: send_event_txn.clone().into(),
266            media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
267            created_at,
268        };
269
270        let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
271            transaction_id: send_event_txn.clone().into(),
272            content: LocalEchoContent::Event {
273                serialized_event: SerializableEventContent::new(&event_content.into())
274                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
275                send_handle: send_handle.clone(),
276                send_error: None,
277            },
278        }));
279
280        Ok(send_handle)
281    }
282
283    /// Queues a gallery to be sent to the room, using the send queue.
284    ///
285    /// This returns quickly (without sending or uploading anything), and will
286    /// push the event to be sent into a queue, handled in the background.
287    ///
288    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
289    /// the [`Self::subscribe()`] method to get updates about the sending of
290    /// that event.
291    ///
292    /// By default, if sending failed on the first attempt, it will be retried a
293    /// few times. If sending failed after those retries, the entire
294    /// client's sending queue will be disabled, and it will need to be
295    /// manually re-enabled by the caller (e.g. after network is back, or when
296    /// something has been done about the faulty requests).
297    ///
298    /// The attachments and their optional thumbnails are stored in the media
299    /// cache and can be retrieved at any time, by calling
300    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
301    /// in the local or remote echo, and using a `MediaFormat::File`.
302    #[cfg(feature = "unstable-msc4274")]
303    #[instrument(skip_all, fields(event_txn))]
304    pub async fn send_gallery(
305        &self,
306        gallery: GalleryConfig,
307    ) -> Result<SendHandle, RoomSendQueueError> {
308        let Some(room) = self.inner.room.get() else {
309            return Err(RoomSendQueueError::RoomDisappeared);
310        };
311
312        if room.state() != RoomState::Joined {
313            return Err(RoomSendQueueError::RoomNotJoined);
314        }
315
316        if gallery.is_empty() {
317            return Err(RoomSendQueueError::EmptyGallery);
318        }
319
320        let send_event_txn =
321            gallery.txn_id.clone().map_or_else(ChildTransactionId::new, Into::into);
322
323        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
324
325        let mut item_types = Vec::with_capacity(gallery.len());
326        let mut item_queue_infos = Vec::with_capacity(gallery.len());
327        let mut media_handles = Vec::with_capacity(gallery.len());
328
329        for item_info in gallery.items {
330            let GalleryItemInfo { filename, content_type, data, .. } = item_info;
331
332            let upload_file_txn = TransactionId::new();
333
334            debug!(filename, %content_type, %upload_file_txn, "uploading a gallery attachment");
335
336            let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
337
338            let MediaCacheResult {
339                upload_thumbnail_txn,
340                event_thumbnail_info,
341                queue_thumbnail_info,
342            } = RoomSendQueue::cache_media(&room, data, item_info.thumbnail, &file_media_request)
343                .await?;
344
345            item_types.push(Room::make_gallery_item_type(
346                &content_type,
347                filename,
348                file_media_request.source.clone(),
349                item_info.caption,
350                item_info.formatted_caption,
351                Some(item_info.attachment_info),
352                event_thumbnail_info,
353            ));
354
355            item_queue_infos.push(GalleryItemQueueInfo {
356                content_type,
357                upload_file_txn: upload_file_txn.clone(),
358                file_media_request,
359                thumbnail: queue_thumbnail_info,
360            });
361
362            media_handles.push(MediaHandles { upload_file_txn, upload_thumbnail_txn });
363        }
364
365        // Create the content for the gallery event.
366        let event_content = room
367            .make_media_event(
368                MessageType::Gallery(GalleryMessageEventContent::new(
369                    gallery.caption.unwrap_or_default(),
370                    gallery.formatted_caption,
371                    item_types,
372                )),
373                gallery.mentions,
374                gallery.reply,
375            )
376            .await
377            .map_err(|_| RoomSendQueueError::FailedToCreateGallery)?;
378
379        let created_at = MilliSecondsSinceUnixEpoch::now();
380
381        // Save requests in the queue storage.
382        self.inner
383            .queue
384            .push_gallery(
385                event_content.clone(),
386                send_event_txn.clone().into(),
387                created_at,
388                item_queue_infos,
389            )
390            .await?;
391
392        trace!("manager sends a gallery to the background task");
393
394        self.inner.notifier.notify_one();
395
396        let send_handle = SendHandle {
397            room: self.clone(),
398            transaction_id: send_event_txn.clone().into(),
399            media_handles,
400            created_at,
401        };
402
403        let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
404            transaction_id: send_event_txn.clone().into(),
405            content: LocalEchoContent::Event {
406                serialized_event: SerializableEventContent::new(&event_content.into())
407                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
408                send_handle: send_handle.clone(),
409                send_error: None,
410            },
411        }));
412
413        Ok(send_handle)
414    }
415
416    async fn cache_media(
417        room: &Room,
418        data: Vec<u8>,
419        thumbnail: Option<Thumbnail>,
420        file_media_request: &MediaRequestParameters,
421    ) -> Result<MediaCacheResult, RoomSendQueueError> {
422        let client = room.client();
423        let cache_store = client
424            .event_cache_store()
425            .lock()
426            .await
427            .map_err(RoomSendQueueStorageError::LockError)?;
428
429        // Cache the file itself in the cache store.
430        cache_store
431            .add_media_content(
432                file_media_request,
433                data,
434                // Make sure that the file is stored until it has been uploaded.
435                IgnoreMediaRetentionPolicy::Yes,
436            )
437            .await
438            .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
439
440        // Process the thumbnail, if it's been provided.
441        if let Some(thumbnail) = thumbnail {
442            let txn = TransactionId::new();
443            trace!(upload_thumbnail_txn = %txn, "media has a thumbnail");
444
445            // Create the information required for filling the thumbnail section of the
446            // event.
447            let (data, content_type, thumbnail_info) = thumbnail.into_parts();
448
449            // Cache thumbnail in the cache store.
450            let thumbnail_media_request = Media::make_local_file_media_request(&txn);
451            cache_store
452                .add_media_content(
453                    &thumbnail_media_request,
454                    data,
455                    // Make sure that the thumbnail is stored until it has been uploaded.
456                    IgnoreMediaRetentionPolicy::Yes,
457                )
458                .await
459                .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
460
461            Ok(MediaCacheResult {
462                upload_thumbnail_txn: Some(txn.clone()),
463                event_thumbnail_info: Some((
464                    thumbnail_media_request.source.clone(),
465                    thumbnail_info,
466                )),
467                queue_thumbnail_info: Some((
468                    FinishUploadThumbnailInfo { txn, width: None, height: None },
469                    thumbnail_media_request,
470                    content_type,
471                )),
472            })
473        } else {
474            Ok(Default::default())
475        }
476    }
477}
478
479impl QueueStorage {
480    /// Consumes a finished upload and queues sending of the final media event.
481    #[allow(clippy::too_many_arguments)]
482    pub(super) async fn handle_dependent_finish_upload(
483        &self,
484        client: &Client,
485        event_txn: OwnedTransactionId,
486        parent_key: SentRequestKey,
487        mut local_echo: RoomMessageEventContent,
488        file_upload_txn: OwnedTransactionId,
489        thumbnail_info: Option<FinishUploadThumbnailInfo>,
490        new_updates: &mut Vec<RoomSendQueueUpdate>,
491    ) -> Result<(), RoomSendQueueError> {
492        // Both uploads are ready: enqueue the event with its final data.
493        let sent_media = parent_key
494            .into_media()
495            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
496
497        update_media_cache_keys_after_upload(client, &file_upload_txn, thumbnail_info, &sent_media)
498            .await?;
499        update_media_event_after_upload(&mut local_echo, sent_media);
500
501        let new_content = SerializableEventContent::new(&local_echo.into())
502            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
503
504        // Indicates observers that the upload finished, by editing the local echo for
505        // the event into its final form before sending.
506        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
507            transaction_id: event_txn.clone(),
508            new_content: new_content.clone(),
509        });
510
511        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
512
513        client
514            .state_store()
515            .save_send_queue_request(
516                &self.room_id,
517                event_txn,
518                MilliSecondsSinceUnixEpoch::now(),
519                new_content.into(),
520                Self::HIGH_PRIORITY,
521            )
522            .await
523            .map_err(RoomSendQueueStorageError::StateStoreError)?;
524
525        Ok(())
526    }
527
528    /// Consumes a finished gallery upload and queues sending of the final
529    /// gallery event.
530    #[cfg(feature = "unstable-msc4274")]
531    #[allow(clippy::too_many_arguments)]
532    pub(super) async fn handle_dependent_finish_gallery_upload(
533        &self,
534        client: &Client,
535        event_txn: OwnedTransactionId,
536        parent_key: SentRequestKey,
537        mut local_echo: RoomMessageEventContent,
538        item_infos: Vec<FinishGalleryItemInfo>,
539        new_updates: &mut Vec<RoomSendQueueUpdate>,
540    ) -> Result<(), RoomSendQueueError> {
541        // All uploads are ready: enqueue the event with its final data.
542        let sent_gallery = parent_key
543            .into_media()
544            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
545
546        let mut sent_media_vec = sent_gallery.accumulated;
547        sent_media_vec.push(AccumulatedSentMediaInfo {
548            file: sent_gallery.file,
549            thumbnail: sent_gallery.thumbnail,
550        });
551
552        let mut sent_infos = HashMap::new();
553
554        for (item_info, sent_media) in zip(item_infos, sent_media_vec) {
555            let FinishGalleryItemInfo { file_upload: file_upload_txn, thumbnail_info } = item_info;
556
557            // Store the sent media under the original cache key for later insertion into
558            // the local echo.
559            let from_req = Media::make_local_file_media_request(&file_upload_txn);
560            sent_infos.insert(from_req.source.unique_key(), sent_media.clone());
561
562            update_media_cache_keys_after_upload(
563                client,
564                &file_upload_txn,
565                thumbnail_info,
566                &sent_media.into(),
567            )
568            .await?;
569        }
570
571        update_gallery_event_after_upload(&mut local_echo, sent_infos);
572
573        let new_content = SerializableEventContent::new(&local_echo.into())
574            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
575
576        // Indicates observers that the upload finished, by editing the local echo for
577        // the event into its final form before sending.
578        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
579            transaction_id: event_txn.clone(),
580            new_content: new_content.clone(),
581        });
582
583        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
584
585        client
586            .state_store()
587            .save_send_queue_request(
588                &self.room_id,
589                event_txn,
590                MilliSecondsSinceUnixEpoch::now(),
591                new_content.into(),
592                Self::HIGH_PRIORITY,
593            )
594            .await
595            .map_err(RoomSendQueueStorageError::StateStoreError)?;
596
597        Ok(())
598    }
599
600    /// Consumes a finished file or thumbnail upload and queues the dependent
601    /// file or thumbnail upload.
602    #[allow(clippy::too_many_arguments)]
603    pub(super) async fn handle_dependent_file_or_thumbnail_upload(
604        &self,
605        client: &Client,
606        next_upload_txn: OwnedTransactionId,
607        parent_key: SentRequestKey,
608        content_type: String,
609        cache_key: MediaRequestParameters,
610        event_txn: OwnedTransactionId,
611        parent_is_thumbnail_upload: bool,
612    ) -> Result<(), RoomSendQueueError> {
613        // The previous file or thumbnail has been sent, now transform the dependent
614        // file or thumbnail upload request into a ready one.
615        let sent_media = parent_key
616            .into_media()
617            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
618
619        // If the previous upload was a thumbnail, it shouldn't have
620        // a thumbnail itself.
621        if parent_is_thumbnail_upload {
622            debug_assert!(sent_media.thumbnail.is_none());
623            if sent_media.thumbnail.is_some() {
624                warn!("unexpected thumbnail for a thumbnail!");
625            }
626        }
627
628        trace!(
629            related_to = %event_txn,
630            "done uploading file or thumbnail, now queuing the dependent file \
631             or thumbnail upload request",
632        );
633
634        // If the parent request was a thumbnail upload, don't add it to the list of
635        // accumulated medias yet because its dependent file upload is still
636        // pending. If the parent request was a file upload, we know that both
637        // the file and its thumbnail (if any) have finished uploading and we
638        // can add them to the accumulated sent media.
639        #[cfg(feature = "unstable-msc4274")]
640        let accumulated = if parent_is_thumbnail_upload {
641            sent_media.accumulated
642        } else {
643            let mut accumulated = sent_media.accumulated;
644            accumulated.push(AccumulatedSentMediaInfo {
645                file: sent_media.file.clone(),
646                thumbnail: sent_media.thumbnail,
647            });
648            accumulated
649        };
650
651        let request = QueuedRequestKind::MediaUpload {
652            content_type,
653            cache_key,
654            // If the previous upload was a thumbnail, it becomes the thumbnail source for the next
655            // upload.
656            thumbnail_source: parent_is_thumbnail_upload.then_some(sent_media.file),
657            related_to: event_txn,
658            #[cfg(feature = "unstable-msc4274")]
659            accumulated,
660        };
661
662        client
663            .state_store()
664            .save_send_queue_request(
665                &self.room_id,
666                next_upload_txn,
667                MilliSecondsSinceUnixEpoch::now(),
668                request,
669                Self::HIGH_PRIORITY,
670            )
671            .await
672            .map_err(RoomSendQueueStorageError::StateStoreError)?;
673
674        Ok(())
675    }
676
677    /// Try to abort an upload that would be ongoing.
678    ///
679    /// Return true if any media (media itself or its thumbnail) was being
680    /// uploaded. In this case, the media event has also been removed from
681    /// the send queue. If it returns false, then the uploads already
682    /// happened, and the event sending *may* have started.
683    #[instrument(skip(self, handles))]
684    pub(super) async fn abort_upload(
685        &self,
686        event_txn: &TransactionId,
687        handles: &MediaHandles,
688    ) -> Result<bool, RoomSendQueueStorageError> {
689        let mut guard = self.store.lock().await;
690        let client = guard.client()?;
691
692        // Keep the lock until we're done touching the storage.
693        debug!("trying to abort an upload");
694
695        let store = client.state_store();
696
697        let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
698        let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
699
700        let mut removed_dependent_upload = false;
701        let mut removed_dependent_event = false;
702
703        if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn {
704            if store.remove_send_queue_request(&self.room_id, thumbnail_txn).await? {
705                // The thumbnail upload existed as a request: either it was pending (something
706                // else was being sent), or it was actively being sent.
707                trace!("could remove thumbnail request, removing 2 dependent requests now");
708
709                // 1. Try to abort sending using the being_sent info, in case it was active.
710                if let Some(info) = guard.being_sent.as_ref() {
711                    if info.transaction_id == *thumbnail_txn {
712                        // SAFETY: we knew it was Some(), two lines above.
713                        let info = guard.being_sent.take().unwrap();
714                        if info.cancel_upload() {
715                            trace!("aborted ongoing thumbnail upload");
716                        }
717                    }
718                }
719
720                // 2. Remove the dependent requests.
721                removed_dependent_upload = store
722                    .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
723                    .await?;
724
725                if !removed_dependent_upload {
726                    warn!("unable to find the dependent file upload request");
727                }
728
729                removed_dependent_event = store
730                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
731                    .await?;
732
733                if !removed_dependent_event {
734                    warn!("unable to find the dependent media event upload request");
735                }
736            }
737        }
738
739        // If we're here:
740        // - either there was no thumbnail to upload,
741        // - or the thumbnail request has terminated already.
742        //
743        // So the next target is the upload request itself, in both cases.
744
745        if !removed_dependent_upload {
746            if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
747                // The upload existed as a request: either it was pending (something else was
748                // being sent), or it was actively being sent.
749                trace!("could remove file upload request, removing 1 dependent request");
750
751                // 1. Try to abort sending using the being_sent info, in case it was active.
752                if let Some(info) = guard.being_sent.as_ref() {
753                    if info.transaction_id == handles.upload_file_txn {
754                        // SAFETY: we knew it was Some(), two lines above.
755                        let info = guard.being_sent.take().unwrap();
756                        if info.cancel_upload() {
757                            trace!("aborted ongoing file upload");
758                        }
759                    }
760                }
761
762                // 2. Remove the dependent request.
763                if !store
764                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
765                    .await?
766                {
767                    warn!("unable to find the dependent media event upload request");
768                }
769            } else {
770                // The upload was not in the send queue, so it's completed.
771                //
772                // It means the event sending is either still queued as a dependent request, or
773                // it's graduated into a request.
774                if !removed_dependent_event
775                    && !store
776                        .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
777                        .await?
778                {
779                    // The media event has been promoted into a request, or the promoted request
780                    // has been sent already: we couldn't abort, let the caller decide what to do.
781                    debug!("uploads already happened => deferring to aborting an event sending");
782                    return Ok(false);
783                }
784            }
785        }
786
787        // At this point, all the requests and dependent requests have been cleaned up.
788        // Perform the final step: empty the cache from the local items.
789        {
790            let event_cache = client.event_cache_store().lock().await?;
791            event_cache
792                .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
793                .await?;
794            if let Some(txn) = &handles.upload_thumbnail_txn {
795                event_cache.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
796            }
797        }
798
799        debug!("successfully aborted!");
800        Ok(true)
801    }
802
803    #[instrument(skip(self, caption, formatted_caption))]
804    pub(super) async fn edit_media_caption(
805        &self,
806        txn: &TransactionId,
807        caption: Option<String>,
808        formatted_caption: Option<FormattedBody>,
809        mentions: Option<Mentions>,
810    ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
811        // This error will be popular here.
812        use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
813
814        let guard = self.store.lock().await;
815        let client = guard.client()?;
816        let store = client.state_store();
817
818        // The media event can be in one of three states:
819        // - still stored as a dependent request,
820        // - stored as a queued request, active (aka it's being sent).
821        // - stored as a queued request, not active yet (aka it's not being sent yet),
822        //
823        // We'll handle each of these cases one by one.
824
825        {
826            // If the event can be found as a dependent event, update the captions, save it
827            // back into the database, and return early.
828            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
829
830            if let Some(found) =
831                dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
832            {
833                trace!("found the caption to edit in a dependent request");
834
835                let DependentQueuedRequestKind::FinishUpload {
836                    mut local_echo,
837                    file_upload,
838                    thumbnail_info,
839                } = found.kind
840                else {
841                    return Err(InvalidMediaCaptionEdit);
842                };
843
844                if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
845                    return Err(InvalidMediaCaptionEdit);
846                }
847
848                let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
849                    local_echo: local_echo.clone(),
850                    file_upload,
851                    thumbnail_info,
852                };
853                store
854                    .update_dependent_queued_request(
855                        &self.room_id,
856                        &found.own_transaction_id,
857                        new_dependent_request,
858                    )
859                    .await?;
860
861                trace!("caption successfully updated");
862                return Ok(Some((*local_echo).into()));
863            }
864        }
865
866        let requests = store.load_send_queue_requests(&self.room_id).await?;
867        let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
868            // Couldn't be found anymore, it's not possible to update captions.
869            return Ok(None);
870        };
871
872        trace!("found the caption to edit as a request");
873
874        let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
875            return Err(InvalidMediaCaptionEdit);
876        };
877
878        let deserialized = serialized_content.deserialize()?;
879        let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
880            return Err(InvalidMediaCaptionEdit);
881        };
882
883        if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
884            return Err(InvalidMediaCaptionEdit);
885        }
886
887        let any_content: AnyMessageLikeEventContent = content.into();
888        let new_serialized = SerializableEventContent::new(&any_content.clone())?;
889
890        // If the request is active (being sent), send a dependent request.
891        if let Some(being_sent) = guard.being_sent.as_ref() {
892            if being_sent.transaction_id == *txn {
893                // Record a dependent request to edit, and exit.
894                store
895                    .save_dependent_queued_request(
896                        &self.room_id,
897                        txn,
898                        ChildTransactionId::new(),
899                        MilliSecondsSinceUnixEpoch::now(),
900                        DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
901                    )
902                    .await?;
903
904                trace!("media event was being sent, pushed a dependent edit");
905                return Ok(Some(any_content));
906            }
907        }
908
909        // The request is not active: edit the local echo.
910        store
911            .update_send_queue_request(
912                &self.room_id,
913                txn,
914                QueuedRequestKind::Event { content: new_serialized },
915            )
916            .await?;
917
918        trace!("media event was not being sent, updated local echo");
919        Ok(Some(any_content))
920    }
921}
922
923/// Update cache keys in the cache store after uploading a media file /
924/// thumbnail.
925async fn update_media_cache_keys_after_upload(
926    client: &Client,
927    file_upload_txn: &OwnedTransactionId,
928    thumbnail_info: Option<FinishUploadThumbnailInfo>,
929    sent_media: &SentMediaInfo,
930) -> Result<(), RoomSendQueueError> {
931    // Do it for the file itself.
932    let from_req = Media::make_local_file_media_request(file_upload_txn);
933
934    trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
935    let cache_store =
936        client.event_cache_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
937
938    // The media can now be removed during cleanups.
939    cache_store
940        .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
941        .await
942        .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
943
944    cache_store
945        .replace_media_key(
946            &from_req,
947            &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File },
948        )
949        .await
950        .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
951
952    // Rename the thumbnail too, if needs be.
953    if let Some((info, new_source)) = thumbnail_info.as_ref().zip(sent_media.thumbnail.clone()) {
954        // Previously the media request used `MediaFormat::Thumbnail`. Handle this case
955        // for send queue requests that were in the state store before the change.
956        let from_req = if let Some((height, width)) = info.height.zip(info.width) {
957            Media::make_local_thumbnail_media_request(&info.txn, height, width)
958        } else {
959            Media::make_local_file_media_request(&info.txn)
960        };
961
962        trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
963
964        // The media can now be removed during cleanups.
965        cache_store
966            .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
967            .await
968            .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
969
970        cache_store
971            .replace_media_key(
972                &from_req,
973                &MediaRequestParameters { source: new_source, format: MediaFormat::File },
974            )
975            .await
976            .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
977    }
978
979    Ok(())
980}