matrix_sdk/send_queue/
upload.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Private implementations of the media upload mechanism.
16
17use matrix_sdk_base::{
18    event_cache::store::media::IgnoreMediaRetentionPolicy,
19    media::{MediaFormat, MediaRequestParameters},
20    store::{
21        ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
22        QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
23    },
24    RoomState,
25};
26use mime::Mime;
27use ruma::{
28    events::{
29        room::message::{FormattedBody, MessageType, RoomMessageEventContent},
30        AnyMessageLikeEventContent, Mentions,
31    },
32    MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
33};
34use tracing::{debug, error, instrument, trace, warn, Span};
35
36use super::{QueueStorage, RoomSendQueue, RoomSendQueueError};
37use crate::{
38    attachment::AttachmentConfig,
39    room::edit::update_media_caption,
40    send_queue::{
41        LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
42        SendHandle,
43    },
44    Client, Media,
45};
46
47/// Replace the source by the final ones in all the media types handled by
48/// [`Room::make_attachment_type()`].
49fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
50    // Some variants look really similar below, but the `event` and `info` are all
51    // different types…
52    match &mut echo.msgtype {
53        MessageType::Audio(event) => {
54            event.source = sent.file;
55        }
56        MessageType::File(event) => {
57            event.source = sent.file;
58            if let Some(info) = event.info.as_mut() {
59                info.thumbnail_source = sent.thumbnail;
60            }
61        }
62        MessageType::Image(event) => {
63            event.source = sent.file;
64            if let Some(info) = event.info.as_mut() {
65                info.thumbnail_source = sent.thumbnail;
66            }
67        }
68        MessageType::Video(event) => {
69            event.source = sent.file;
70            if let Some(info) = event.info.as_mut() {
71                info.thumbnail_source = sent.thumbnail;
72            }
73        }
74
75        _ => {
76            // All `MessageType` created by `Room::make_attachment_type` should be
77            // handled here. The only way to end up here is that a message type has
78            // been tampered with in the database.
79            error!("Invalid message type in database: {}", echo.msgtype());
80            // Only crash debug builds.
81            debug_assert!(false, "invalid message type in database");
82        }
83    }
84}
85
86impl RoomSendQueue {
87    /// Queues an attachment to be sent to the room, using the send queue.
88    ///
89    /// This returns quickly (without sending or uploading anything), and will
90    /// push the event to be sent into a queue, handled in the background.
91    ///
92    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
93    /// the [`Self::subscribe()`] method to get updates about the sending of
94    /// that event.
95    ///
96    /// By default, if sending failed on the first attempt, it will be retried a
97    /// few times. If sending failed after those retries, the entire
98    /// client's sending queue will be disabled, and it will need to be
99    /// manually re-enabled by the caller (e.g. after network is back, or when
100    /// something has been done about the faulty requests).
101    ///
102    /// The attachment and its optional thumbnail are stored in the media cache
103    /// and can be retrieved at any time, by calling
104    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
105    /// in the local or remote echo, and using a `MediaFormat::File`.
106    #[instrument(skip_all, fields(event_txn))]
107    pub async fn send_attachment(
108        &self,
109        filename: impl Into<String>,
110        content_type: Mime,
111        data: Vec<u8>,
112        mut config: AttachmentConfig,
113    ) -> Result<SendHandle, RoomSendQueueError> {
114        let Some(room) = self.inner.room.get() else {
115            return Err(RoomSendQueueError::RoomDisappeared);
116        };
117
118        if room.state() != RoomState::Joined {
119            return Err(RoomSendQueueError::RoomNotJoined);
120        }
121
122        let filename = filename.into();
123        let upload_file_txn = TransactionId::new();
124        let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
125
126        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
127        debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
128
129        let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
130
131        let (upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info) = {
132            let client = room.client();
133            let cache_store = client
134                .event_cache_store()
135                .lock()
136                .await
137                .map_err(RoomSendQueueStorageError::LockError)?;
138
139            // Cache the file itself in the cache store.
140            cache_store
141                .add_media_content(
142                    &file_media_request,
143                    data.clone(),
144                    // Make sure that the file is stored until it has been uploaded.
145                    IgnoreMediaRetentionPolicy::Yes,
146                )
147                .await
148                .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
149
150            // Process the thumbnail, if it's been provided.
151            if let Some(thumbnail) = config.thumbnail.take() {
152                let txn = TransactionId::new();
153                trace!(upload_thumbnail_txn = %txn, "attachment has a thumbnail");
154
155                // Create the information required for filling the thumbnail section of the
156                // media event.
157                let (data, content_type, thumbnail_info) = thumbnail.into_parts();
158
159                // Cache thumbnail in the cache store.
160                let thumbnail_media_request = Media::make_local_file_media_request(&txn);
161                cache_store
162                    .add_media_content(
163                        &thumbnail_media_request,
164                        data,
165                        // Make sure that the thumbnail is stored until it has been uploaded.
166                        IgnoreMediaRetentionPolicy::Yes,
167                    )
168                    .await
169                    .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
170
171                (
172                    Some(txn.clone()),
173                    Some((thumbnail_media_request.source.clone(), thumbnail_info)),
174                    Some((
175                        FinishUploadThumbnailInfo { txn, width: None, height: None },
176                        thumbnail_media_request,
177                        content_type,
178                    )),
179                )
180            } else {
181                Default::default()
182            }
183        };
184
185        // Create the content for the media event.
186        let event_content = room
187            .make_attachment_event(
188                room.make_attachment_type(
189                    &content_type,
190                    filename,
191                    file_media_request.source.clone(),
192                    config.caption,
193                    config.formatted_caption,
194                    config.info,
195                    event_thumbnail_info,
196                ),
197                config.mentions,
198                config.reply,
199            )
200            .await
201            .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
202
203        let created_at = MilliSecondsSinceUnixEpoch::now();
204
205        // Save requests in the queue storage.
206        self.inner
207            .queue
208            .push_media(
209                event_content.clone(),
210                content_type,
211                send_event_txn.clone().into(),
212                created_at,
213                upload_file_txn.clone(),
214                file_media_request,
215                queue_thumbnail_info,
216            )
217            .await?;
218
219        trace!("manager sends a media to the background task");
220
221        self.inner.notifier.notify_one();
222
223        let send_handle = SendHandle {
224            room: self.clone(),
225            transaction_id: send_event_txn.clone().into(),
226            media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
227            created_at,
228        };
229
230        let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
231            transaction_id: send_event_txn.clone().into(),
232            content: LocalEchoContent::Event {
233                serialized_event: SerializableEventContent::new(&event_content.into())
234                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
235                send_handle: send_handle.clone(),
236                send_error: None,
237            },
238        }));
239
240        Ok(send_handle)
241    }
242}
243
244impl QueueStorage {
245    /// Consumes a finished upload and queues sending of the final media event.
246    #[allow(clippy::too_many_arguments)]
247    pub(super) async fn handle_dependent_finish_upload(
248        &self,
249        client: &Client,
250        event_txn: OwnedTransactionId,
251        parent_key: SentRequestKey,
252        mut local_echo: RoomMessageEventContent,
253        file_upload_txn: OwnedTransactionId,
254        thumbnail_info: Option<FinishUploadThumbnailInfo>,
255        new_updates: &mut Vec<RoomSendQueueUpdate>,
256    ) -> Result<(), RoomSendQueueError> {
257        // Both uploads are ready: enqueue the event with its final data.
258        let sent_media = parent_key
259            .into_media()
260            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
261
262        // Update cache keys in the cache store.
263        {
264            // Do it for the file itself.
265            let from_req = Media::make_local_file_media_request(&file_upload_txn);
266
267            trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
268            let cache_store = client
269                .event_cache_store()
270                .lock()
271                .await
272                .map_err(RoomSendQueueStorageError::LockError)?;
273
274            // The media can now be removed during cleanups.
275            cache_store
276                .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
277                .await
278                .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
279
280            cache_store
281                .replace_media_key(
282                    &from_req,
283                    &MediaRequestParameters {
284                        source: sent_media.file.clone(),
285                        format: MediaFormat::File,
286                    },
287                )
288                .await
289                .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
290
291            // Rename the thumbnail too, if needs be.
292            if let Some((info, new_source)) =
293                thumbnail_info.as_ref().zip(sent_media.thumbnail.clone())
294            {
295                // Previously the media request used `MediaFormat::Thumbnail`. Handle this case
296                // for send queue requests that were in the state store before the change.
297                let from_req = if let Some((height, width)) = info.height.zip(info.width) {
298                    Media::make_local_thumbnail_media_request(&info.txn, height, width)
299                } else {
300                    Media::make_local_file_media_request(&info.txn)
301                };
302
303                trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
304
305                // The media can now be removed during cleanups.
306                cache_store
307                    .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
308                    .await
309                    .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
310
311                cache_store
312                    .replace_media_key(
313                        &from_req,
314                        &MediaRequestParameters { source: new_source, format: MediaFormat::File },
315                    )
316                    .await
317                    .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
318            }
319        }
320
321        update_media_event_after_upload(&mut local_echo, sent_media);
322
323        let new_content = SerializableEventContent::new(&local_echo.into())
324            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
325
326        // Indicates observers that the upload finished, by editing the local echo for
327        // the event into its final form before sending.
328        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
329            transaction_id: event_txn.clone(),
330            new_content: new_content.clone(),
331        });
332
333        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
334
335        client
336            .state_store()
337            .save_send_queue_request(
338                &self.room_id,
339                event_txn,
340                MilliSecondsSinceUnixEpoch::now(),
341                new_content.into(),
342                Self::HIGH_PRIORITY,
343            )
344            .await
345            .map_err(RoomSendQueueStorageError::StateStoreError)?;
346
347        Ok(())
348    }
349
350    /// Consumes a finished upload of a thumbnail and queues the file upload.
351    pub(super) async fn handle_dependent_file_upload_with_thumbnail(
352        &self,
353        client: &Client,
354        next_upload_txn: OwnedTransactionId,
355        parent_key: SentRequestKey,
356        content_type: String,
357        cache_key: MediaRequestParameters,
358        event_txn: OwnedTransactionId,
359    ) -> Result<(), RoomSendQueueError> {
360        // The thumbnail has been sent, now transform the dependent file upload request
361        // into a ready one.
362        let sent_media = parent_key
363            .into_media()
364            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
365
366        // The media we just uploaded was a thumbnail, so the thumbnail shouldn't have
367        // a thumbnail itself.
368        debug_assert!(sent_media.thumbnail.is_none());
369        if sent_media.thumbnail.is_some() {
370            warn!("unexpected thumbnail for a thumbnail!");
371        }
372
373        trace!(related_to = %event_txn, "done uploading thumbnail, now queuing a request to send the media file itself");
374
375        let request = QueuedRequestKind::MediaUpload {
376            content_type,
377            cache_key,
378            // The thumbnail for the next upload is the file we just uploaded here.
379            thumbnail_source: Some(sent_media.file),
380            related_to: event_txn,
381        };
382
383        client
384            .state_store()
385            .save_send_queue_request(
386                &self.room_id,
387                next_upload_txn,
388                MilliSecondsSinceUnixEpoch::now(),
389                request,
390                Self::HIGH_PRIORITY,
391            )
392            .await
393            .map_err(RoomSendQueueStorageError::StateStoreError)?;
394
395        Ok(())
396    }
397
398    /// Try to abort an upload that would be ongoing.
399    ///
400    /// Return true if any media (media itself or its thumbnail) was being
401    /// uploaded. In this case, the media event has also been removed from
402    /// the send queue. If it returns false, then the uploads already
403    /// happened, and the event sending *may* have started.
404    #[instrument(skip(self, handles))]
405    pub(super) async fn abort_upload(
406        &self,
407        event_txn: &TransactionId,
408        handles: &MediaHandles,
409    ) -> Result<bool, RoomSendQueueStorageError> {
410        let mut guard = self.store.lock().await;
411        let client = guard.client()?;
412
413        // Keep the lock until we're done touching the storage.
414        debug!("trying to abort an upload");
415
416        let store = client.state_store();
417
418        let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
419        let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
420
421        let mut removed_dependent_upload = false;
422        let mut removed_dependent_event = false;
423
424        if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn {
425            if store.remove_send_queue_request(&self.room_id, thumbnail_txn).await? {
426                // The thumbnail upload existed as a request: either it was pending (something
427                // else was being sent), or it was actively being sent.
428                trace!("could remove thumbnail request, removing 2 dependent requests now");
429
430                // 1. Try to abort sending using the being_sent info, in case it was active.
431                if let Some(info) = guard.being_sent.as_ref() {
432                    if info.transaction_id == *thumbnail_txn {
433                        // SAFETY: we knew it was Some(), two lines above.
434                        let info = guard.being_sent.take().unwrap();
435                        if info.cancel_upload() {
436                            trace!("aborted ongoing thumbnail upload");
437                        }
438                    }
439                }
440
441                // 2. Remove the dependent requests.
442                removed_dependent_upload = store
443                    .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
444                    .await?;
445
446                if !removed_dependent_upload {
447                    warn!("unable to find the dependent file upload request");
448                }
449
450                removed_dependent_event = store
451                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
452                    .await?;
453
454                if !removed_dependent_event {
455                    warn!("unable to find the dependent media event upload request");
456                }
457            }
458        }
459
460        // If we're here:
461        // - either there was no thumbnail to upload,
462        // - or the thumbnail request has terminated already.
463        //
464        // So the next target is the upload request itself, in both cases.
465
466        if !removed_dependent_upload {
467            if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
468                // The upload existed as a request: either it was pending (something else was
469                // being sent), or it was actively being sent.
470                trace!("could remove file upload request, removing 1 dependent request");
471
472                // 1. Try to abort sending using the being_sent info, in case it was active.
473                if let Some(info) = guard.being_sent.as_ref() {
474                    if info.transaction_id == handles.upload_file_txn {
475                        // SAFETY: we knew it was Some(), two lines above.
476                        let info = guard.being_sent.take().unwrap();
477                        if info.cancel_upload() {
478                            trace!("aborted ongoing file upload");
479                        }
480                    }
481                }
482
483                // 2. Remove the dependent request.
484                if !store
485                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
486                    .await?
487                {
488                    warn!("unable to find the dependent media event upload request");
489                }
490            } else {
491                // The upload was not in the send queue, so it's completed.
492                //
493                // It means the event sending is either still queued as a dependent request, or
494                // it's graduated into a request.
495                if !removed_dependent_event
496                    && !store
497                        .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
498                        .await?
499                {
500                    // The media event has been promoted into a request, or the promoted request
501                    // has been sent already: we couldn't abort, let the caller decide what to do.
502                    debug!("uploads already happened => deferring to aborting an event sending");
503                    return Ok(false);
504                }
505            }
506        }
507
508        // At this point, all the requests and dependent requests have been cleaned up.
509        // Perform the final step: empty the cache from the local items.
510        {
511            let event_cache = client.event_cache_store().lock().await?;
512            event_cache
513                .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
514                .await?;
515            if let Some(txn) = &handles.upload_thumbnail_txn {
516                event_cache.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
517            }
518        }
519
520        debug!("successfully aborted!");
521        Ok(true)
522    }
523
524    #[instrument(skip(self, caption, formatted_caption))]
525    pub(super) async fn edit_media_caption(
526        &self,
527        txn: &TransactionId,
528        caption: Option<String>,
529        formatted_caption: Option<FormattedBody>,
530        mentions: Option<Mentions>,
531    ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
532        // This error will be popular here.
533        use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
534
535        let guard = self.store.lock().await;
536        let client = guard.client()?;
537        let store = client.state_store();
538
539        // The media event can be in one of three states:
540        // - still stored as a dependent request,
541        // - stored as a queued request, active (aka it's being sent).
542        // - stored as a queued request, not active yet (aka it's not being sent yet),
543        //
544        // We'll handle each of these cases one by one.
545
546        {
547            // If the event can be found as a dependent event, update the captions, save it
548            // back into the database, and return early.
549            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
550
551            if let Some(found) =
552                dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
553            {
554                trace!("found the caption to edit in a dependent request");
555
556                let DependentQueuedRequestKind::FinishUpload {
557                    mut local_echo,
558                    file_upload,
559                    thumbnail_info,
560                } = found.kind
561                else {
562                    return Err(InvalidMediaCaptionEdit);
563                };
564
565                if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
566                    return Err(InvalidMediaCaptionEdit);
567                }
568
569                let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
570                    local_echo: local_echo.clone(),
571                    file_upload,
572                    thumbnail_info,
573                };
574                store
575                    .update_dependent_queued_request(
576                        &self.room_id,
577                        &found.own_transaction_id,
578                        new_dependent_request,
579                    )
580                    .await?;
581
582                trace!("caption successfully updated");
583                return Ok(Some((*local_echo).into()));
584            }
585        }
586
587        let requests = store.load_send_queue_requests(&self.room_id).await?;
588        let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
589            // Couldn't be found anymore, it's not possible to update captions.
590            return Ok(None);
591        };
592
593        trace!("found the caption to edit as a request");
594
595        let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
596            return Err(InvalidMediaCaptionEdit);
597        };
598
599        let deserialized = serialized_content.deserialize()?;
600        let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
601            return Err(InvalidMediaCaptionEdit);
602        };
603
604        if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
605            return Err(InvalidMediaCaptionEdit);
606        }
607
608        let any_content: AnyMessageLikeEventContent = content.into();
609        let new_serialized = SerializableEventContent::new(&any_content.clone())?;
610
611        // If the request is active (being sent), send a dependent request.
612        if let Some(being_sent) = guard.being_sent.as_ref() {
613            if being_sent.transaction_id == *txn {
614                // Record a dependent request to edit, and exit.
615                store
616                    .save_dependent_queued_request(
617                        &self.room_id,
618                        txn,
619                        ChildTransactionId::new(),
620                        MilliSecondsSinceUnixEpoch::now(),
621                        DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
622                    )
623                    .await?;
624
625                trace!("media event was being sent, pushed a dependent edit");
626                return Ok(Some(any_content));
627            }
628        }
629
630        // The request is not active: edit the local echo.
631        store
632            .update_send_queue_request(
633                &self.room_id,
634                txn,
635                QueuedRequestKind::Event { content: new_serialized },
636            )
637            .await?;
638
639        trace!("media event was not being sent, updated local echo");
640        Ok(Some(any_content))
641    }
642}