1#[cfg(feature = "unstable-msc4274")]
18use std::{collections::HashMap, iter::zip};
19
20use matrix_sdk_base::{
21 event_cache::store::media::IgnoreMediaRetentionPolicy,
22 media::{MediaFormat, MediaRequestParameters},
23 store::{
24 ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
25 QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
26 },
27 RoomState,
28};
29#[cfg(feature = "unstable-msc4274")]
30use matrix_sdk_base::{
31 media::UniqueKey,
32 store::{AccumulatedSentMediaInfo, FinishGalleryItemInfo},
33};
34use mime::Mime;
35#[cfg(feature = "unstable-msc4274")]
36use ruma::events::room::message::{GalleryItemType, GalleryMessageEventContent};
37use ruma::{
38 events::{
39 room::{
40 message::{FormattedBody, MessageType, RoomMessageEventContent},
41 MediaSource, ThumbnailInfo,
42 },
43 AnyMessageLikeEventContent, Mentions,
44 },
45 MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
46};
47use tracing::{debug, error, instrument, trace, warn, Span};
48
49use super::{QueueStorage, RoomSendQueue, RoomSendQueueError};
50use crate::{
51 attachment::{AttachmentConfig, Thumbnail},
52 room::edit::update_media_caption,
53 send_queue::{
54 LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
55 SendHandle,
56 },
57 Client, Media, Room,
58};
59#[cfg(feature = "unstable-msc4274")]
60use crate::{
61 attachment::{GalleryConfig, GalleryItemInfo},
62 send_queue::GalleryItemQueueInfo,
63};
64
65fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
68 match &mut echo.msgtype {
71 MessageType::Audio(event) => {
72 event.source = sent.file;
73 }
74 MessageType::File(event) => {
75 event.source = sent.file;
76 if let Some(info) = event.info.as_mut() {
77 info.thumbnail_source = sent.thumbnail;
78 }
79 }
80 MessageType::Image(event) => {
81 event.source = sent.file;
82 if let Some(info) = event.info.as_mut() {
83 info.thumbnail_source = sent.thumbnail;
84 }
85 }
86 MessageType::Video(event) => {
87 event.source = sent.file;
88 if let Some(info) = event.info.as_mut() {
89 info.thumbnail_source = sent.thumbnail;
90 }
91 }
92
93 _ => {
94 error!("Invalid message type in database: {}", echo.msgtype());
98 debug_assert!(false, "invalid message type in database");
100 }
101 }
102}
103
/// Rewrites every item of a gallery local echo so that it points at the
/// uploaded media.
///
/// `sent` is looked up with the `unique_key()` of each item's *current*
/// source. On a hit, the item's `source` is replaced by the uploaded file and,
/// for file, image and video items that carry an `info` section,
/// `info.thumbnail_source` is replaced by the uploaded thumbnail. On a miss an
/// error is logged and the item is left as is.
///
/// If `echo` is not a gallery message, or an item has an unsupported type, an
/// error is logged (and a `debug_assert!` trips in debug builds).
#[cfg(feature = "unstable-msc4274")]
fn update_gallery_event_after_upload(
    echo: &mut RoomMessageEventContent,
    sent: HashMap<String, AccumulatedSentMediaInfo>,
) {
    let MessageType::Gallery(gallery) = &mut echo.msgtype else {
        error!("Invalid gallery item types in database");
        debug_assert!(false, "invalid item type in database {:?}", echo.msgtype());
        return;
    };

    for itemtype in gallery.itemtypes.iter_mut() {
        // Each item variant wraps a different event type, so first reduce them
        // all to the two slots that have to be rewritten.
        let (source_slot, thumbnail_slot): (&mut MediaSource, Option<&mut Option<MediaSource>>) =
            match itemtype {
                GalleryItemType::Audio(audio) => (&mut audio.source, None),
                GalleryItemType::File(file) => (
                    &mut file.source,
                    file.info.as_mut().map(|info| &mut info.thumbnail_source),
                ),
                GalleryItemType::Image(image) => (
                    &mut image.source,
                    image.info.as_mut().map(|info| &mut info.thumbnail_source),
                ),
                GalleryItemType::Video(video) => (
                    &mut video.source,
                    video.info.as_mut().map(|info| &mut info.thumbnail_source),
                ),

                _ => {
                    error!("Invalid gallery item types in database");
                    debug_assert!(false, "invalid gallery item type in database {itemtype:?}");
                    continue;
                }
            };

        let Some(uploaded) = sent.get(&source_slot.unique_key()) else {
            error!("key for item {:?} does not exist on gallery event", source_slot);
            continue;
        };

        *source_slot = uploaded.file.clone();

        // `None` here means either an audio item or a missing `info` section.
        if let Some(slot) = thumbnail_slot {
            *slot = uploaded.thumbnail.clone();
        }
    }
}
168
/// Values produced by `RoomSendQueue::cache_media`.
///
/// The `Default` value (every field `None`) is what `cache_media` returns when
/// no thumbnail was provided; with a thumbnail, all three fields are `Some`.
#[derive(Default)]
struct MediaCacheResult {
    /// Transaction id generated for the thumbnail; callers store it in
    /// `MediaHandles::upload_thumbnail_txn`.
    upload_thumbnail_txn: Option<OwnedTransactionId>,
    /// Local source of the cached thumbnail together with its metadata, used
    /// when building the event content.
    event_thumbnail_info: Option<(MediaSource, Box<ThumbnailInfo>)>,
    /// Thumbnail description handed to the queue storage: the finish-upload
    /// info, the local cache key and the thumbnail's MIME type.
    queue_thumbnail_info: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
}
175
176impl RoomSendQueue {
177 #[instrument(skip_all, fields(event_txn))]
197 pub async fn send_attachment(
198 &self,
199 filename: impl Into<String>,
200 content_type: Mime,
201 data: Vec<u8>,
202 mut config: AttachmentConfig,
203 ) -> Result<SendHandle, RoomSendQueueError> {
204 let Some(room) = self.inner.room.get() else {
205 return Err(RoomSendQueueError::RoomDisappeared);
206 };
207
208 if room.state() != RoomState::Joined {
209 return Err(RoomSendQueueError::RoomNotJoined);
210 }
211
212 let filename = filename.into();
213 let upload_file_txn = TransactionId::new();
214 let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
215
216 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
217 debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
218
219 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
220
221 let MediaCacheResult { upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info } =
222 RoomSendQueue::cache_media(&room, data, config.thumbnail.take(), &file_media_request)
223 .await?;
224
225 let event_content = room
227 .make_media_event(
228 Room::make_attachment_type(
229 &content_type,
230 filename,
231 file_media_request.source.clone(),
232 config.caption,
233 config.formatted_caption,
234 config.info,
235 event_thumbnail_info,
236 ),
237 config.mentions,
238 config.reply,
239 )
240 .await
241 .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
242
243 let created_at = MilliSecondsSinceUnixEpoch::now();
244
245 self.inner
247 .queue
248 .push_media(
249 event_content.clone(),
250 content_type,
251 send_event_txn.clone().into(),
252 created_at,
253 upload_file_txn.clone(),
254 file_media_request,
255 queue_thumbnail_info,
256 )
257 .await?;
258
259 trace!("manager sends a media to the background task");
260
261 self.inner.notifier.notify_one();
262
263 let send_handle = SendHandle {
264 room: self.clone(),
265 transaction_id: send_event_txn.clone().into(),
266 media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
267 created_at,
268 };
269
270 let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
271 transaction_id: send_event_txn.clone().into(),
272 content: LocalEchoContent::Event {
273 serialized_event: SerializableEventContent::new(&event_content.into())
274 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
275 send_handle: send_handle.clone(),
276 send_error: None,
277 },
278 }));
279
280 Ok(send_handle)
281 }
282
283 #[cfg(feature = "unstable-msc4274")]
303 #[instrument(skip_all, fields(event_txn))]
304 pub async fn send_gallery(
305 &self,
306 gallery: GalleryConfig,
307 ) -> Result<SendHandle, RoomSendQueueError> {
308 let Some(room) = self.inner.room.get() else {
309 return Err(RoomSendQueueError::RoomDisappeared);
310 };
311
312 if room.state() != RoomState::Joined {
313 return Err(RoomSendQueueError::RoomNotJoined);
314 }
315
316 if gallery.is_empty() {
317 return Err(RoomSendQueueError::EmptyGallery);
318 }
319
320 let send_event_txn =
321 gallery.txn_id.clone().map_or_else(ChildTransactionId::new, Into::into);
322
323 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
324
325 let mut item_types = Vec::with_capacity(gallery.len());
326 let mut item_queue_infos = Vec::with_capacity(gallery.len());
327 let mut media_handles = Vec::with_capacity(gallery.len());
328
329 for item_info in gallery.items {
330 let GalleryItemInfo { filename, content_type, data, .. } = item_info;
331
332 let upload_file_txn = TransactionId::new();
333
334 debug!(filename, %content_type, %upload_file_txn, "uploading a gallery attachment");
335
336 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
337
338 let MediaCacheResult {
339 upload_thumbnail_txn,
340 event_thumbnail_info,
341 queue_thumbnail_info,
342 } = RoomSendQueue::cache_media(&room, data, item_info.thumbnail, &file_media_request)
343 .await?;
344
345 item_types.push(Room::make_gallery_item_type(
346 &content_type,
347 filename,
348 file_media_request.source.clone(),
349 item_info.caption,
350 item_info.formatted_caption,
351 Some(item_info.attachment_info),
352 event_thumbnail_info,
353 ));
354
355 item_queue_infos.push(GalleryItemQueueInfo {
356 content_type,
357 upload_file_txn: upload_file_txn.clone(),
358 file_media_request,
359 thumbnail: queue_thumbnail_info,
360 });
361
362 media_handles.push(MediaHandles { upload_file_txn, upload_thumbnail_txn });
363 }
364
365 let event_content = room
367 .make_media_event(
368 MessageType::Gallery(GalleryMessageEventContent::new(
369 gallery.caption.unwrap_or_default(),
370 gallery.formatted_caption,
371 item_types,
372 )),
373 gallery.mentions,
374 gallery.reply,
375 )
376 .await
377 .map_err(|_| RoomSendQueueError::FailedToCreateGallery)?;
378
379 let created_at = MilliSecondsSinceUnixEpoch::now();
380
381 self.inner
383 .queue
384 .push_gallery(
385 event_content.clone(),
386 send_event_txn.clone().into(),
387 created_at,
388 item_queue_infos,
389 )
390 .await?;
391
392 trace!("manager sends a gallery to the background task");
393
394 self.inner.notifier.notify_one();
395
396 let send_handle = SendHandle {
397 room: self.clone(),
398 transaction_id: send_event_txn.clone().into(),
399 media_handles,
400 created_at,
401 };
402
403 let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
404 transaction_id: send_event_txn.clone().into(),
405 content: LocalEchoContent::Event {
406 serialized_event: SerializableEventContent::new(&event_content.into())
407 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
408 send_handle: send_handle.clone(),
409 send_error: None,
410 },
411 }));
412
413 Ok(send_handle)
414 }
415
416 async fn cache_media(
417 room: &Room,
418 data: Vec<u8>,
419 thumbnail: Option<Thumbnail>,
420 file_media_request: &MediaRequestParameters,
421 ) -> Result<MediaCacheResult, RoomSendQueueError> {
422 let client = room.client();
423 let cache_store = client
424 .event_cache_store()
425 .lock()
426 .await
427 .map_err(RoomSendQueueStorageError::LockError)?;
428
429 cache_store
431 .add_media_content(
432 file_media_request,
433 data,
434 IgnoreMediaRetentionPolicy::Yes,
436 )
437 .await
438 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
439
440 if let Some(thumbnail) = thumbnail {
442 let txn = TransactionId::new();
443 trace!(upload_thumbnail_txn = %txn, "media has a thumbnail");
444
445 let (data, content_type, thumbnail_info) = thumbnail.into_parts();
448
449 let thumbnail_media_request = Media::make_local_file_media_request(&txn);
451 cache_store
452 .add_media_content(
453 &thumbnail_media_request,
454 data,
455 IgnoreMediaRetentionPolicy::Yes,
457 )
458 .await
459 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
460
461 Ok(MediaCacheResult {
462 upload_thumbnail_txn: Some(txn.clone()),
463 event_thumbnail_info: Some((
464 thumbnail_media_request.source.clone(),
465 thumbnail_info,
466 )),
467 queue_thumbnail_info: Some((
468 FinishUploadThumbnailInfo { txn, width: None, height: None },
469 thumbnail_media_request,
470 content_type,
471 )),
472 })
473 } else {
474 Ok(Default::default())
475 }
476 }
477}
478
479impl QueueStorage {
480 #[allow(clippy::too_many_arguments)]
482 pub(super) async fn handle_dependent_finish_upload(
483 &self,
484 client: &Client,
485 event_txn: OwnedTransactionId,
486 parent_key: SentRequestKey,
487 mut local_echo: RoomMessageEventContent,
488 file_upload_txn: OwnedTransactionId,
489 thumbnail_info: Option<FinishUploadThumbnailInfo>,
490 new_updates: &mut Vec<RoomSendQueueUpdate>,
491 ) -> Result<(), RoomSendQueueError> {
492 let sent_media = parent_key
494 .into_media()
495 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
496
497 update_media_cache_keys_after_upload(client, &file_upload_txn, thumbnail_info, &sent_media)
498 .await?;
499 update_media_event_after_upload(&mut local_echo, sent_media);
500
501 let new_content = SerializableEventContent::new(&local_echo.into())
502 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
503
504 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
507 transaction_id: event_txn.clone(),
508 new_content: new_content.clone(),
509 });
510
511 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
512
513 client
514 .state_store()
515 .save_send_queue_request(
516 &self.room_id,
517 event_txn,
518 MilliSecondsSinceUnixEpoch::now(),
519 new_content.into(),
520 Self::HIGH_PRIORITY,
521 )
522 .await
523 .map_err(RoomSendQueueStorageError::StateStoreError)?;
524
525 Ok(())
526 }
527
528 #[cfg(feature = "unstable-msc4274")]
531 #[allow(clippy::too_many_arguments)]
532 pub(super) async fn handle_dependent_finish_gallery_upload(
533 &self,
534 client: &Client,
535 event_txn: OwnedTransactionId,
536 parent_key: SentRequestKey,
537 mut local_echo: RoomMessageEventContent,
538 item_infos: Vec<FinishGalleryItemInfo>,
539 new_updates: &mut Vec<RoomSendQueueUpdate>,
540 ) -> Result<(), RoomSendQueueError> {
541 let sent_gallery = parent_key
543 .into_media()
544 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
545
546 let mut sent_media_vec = sent_gallery.accumulated;
547 sent_media_vec.push(AccumulatedSentMediaInfo {
548 file: sent_gallery.file,
549 thumbnail: sent_gallery.thumbnail,
550 });
551
552 let mut sent_infos = HashMap::new();
553
554 for (item_info, sent_media) in zip(item_infos, sent_media_vec) {
555 let FinishGalleryItemInfo { file_upload: file_upload_txn, thumbnail_info } = item_info;
556
557 let from_req = Media::make_local_file_media_request(&file_upload_txn);
560 sent_infos.insert(from_req.source.unique_key(), sent_media.clone());
561
562 update_media_cache_keys_after_upload(
563 client,
564 &file_upload_txn,
565 thumbnail_info,
566 &sent_media.into(),
567 )
568 .await?;
569 }
570
571 update_gallery_event_after_upload(&mut local_echo, sent_infos);
572
573 let new_content = SerializableEventContent::new(&local_echo.into())
574 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
575
576 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
579 transaction_id: event_txn.clone(),
580 new_content: new_content.clone(),
581 });
582
583 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
584
585 client
586 .state_store()
587 .save_send_queue_request(
588 &self.room_id,
589 event_txn,
590 MilliSecondsSinceUnixEpoch::now(),
591 new_content.into(),
592 Self::HIGH_PRIORITY,
593 )
594 .await
595 .map_err(RoomSendQueueStorageError::StateStoreError)?;
596
597 Ok(())
598 }
599
600 #[allow(clippy::too_many_arguments)]
603 pub(super) async fn handle_dependent_file_or_thumbnail_upload(
604 &self,
605 client: &Client,
606 next_upload_txn: OwnedTransactionId,
607 parent_key: SentRequestKey,
608 content_type: String,
609 cache_key: MediaRequestParameters,
610 event_txn: OwnedTransactionId,
611 parent_is_thumbnail_upload: bool,
612 ) -> Result<(), RoomSendQueueError> {
613 let sent_media = parent_key
616 .into_media()
617 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
618
619 if parent_is_thumbnail_upload {
622 debug_assert!(sent_media.thumbnail.is_none());
623 if sent_media.thumbnail.is_some() {
624 warn!("unexpected thumbnail for a thumbnail!");
625 }
626 }
627
628 trace!(
629 related_to = %event_txn,
630 "done uploading file or thumbnail, now queuing the dependent file \
631 or thumbnail upload request",
632 );
633
634 #[cfg(feature = "unstable-msc4274")]
640 let accumulated = if parent_is_thumbnail_upload {
641 sent_media.accumulated
642 } else {
643 let mut accumulated = sent_media.accumulated;
644 accumulated.push(AccumulatedSentMediaInfo {
645 file: sent_media.file.clone(),
646 thumbnail: sent_media.thumbnail,
647 });
648 accumulated
649 };
650
651 let request = QueuedRequestKind::MediaUpload {
652 content_type,
653 cache_key,
654 thumbnail_source: parent_is_thumbnail_upload.then_some(sent_media.file),
657 related_to: event_txn,
658 #[cfg(feature = "unstable-msc4274")]
659 accumulated,
660 };
661
662 client
663 .state_store()
664 .save_send_queue_request(
665 &self.room_id,
666 next_upload_txn,
667 MilliSecondsSinceUnixEpoch::now(),
668 request,
669 Self::HIGH_PRIORITY,
670 )
671 .await
672 .map_err(RoomSendQueueStorageError::StateStoreError)?;
673
674 Ok(())
675 }
676
677 #[instrument(skip(self, handles))]
684 pub(super) async fn abort_upload(
685 &self,
686 event_txn: &TransactionId,
687 handles: &MediaHandles,
688 ) -> Result<bool, RoomSendQueueStorageError> {
689 let mut guard = self.store.lock().await;
690 let client = guard.client()?;
691
692 debug!("trying to abort an upload");
694
695 let store = client.state_store();
696
697 let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
698 let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
699
700 let mut removed_dependent_upload = false;
701 let mut removed_dependent_event = false;
702
703 if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn {
704 if store.remove_send_queue_request(&self.room_id, thumbnail_txn).await? {
705 trace!("could remove thumbnail request, removing 2 dependent requests now");
708
709 if let Some(info) = guard.being_sent.as_ref() {
711 if info.transaction_id == *thumbnail_txn {
712 let info = guard.being_sent.take().unwrap();
714 if info.cancel_upload() {
715 trace!("aborted ongoing thumbnail upload");
716 }
717 }
718 }
719
720 removed_dependent_upload = store
722 .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
723 .await?;
724
725 if !removed_dependent_upload {
726 warn!("unable to find the dependent file upload request");
727 }
728
729 removed_dependent_event = store
730 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
731 .await?;
732
733 if !removed_dependent_event {
734 warn!("unable to find the dependent media event upload request");
735 }
736 }
737 }
738
739 if !removed_dependent_upload {
746 if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
747 trace!("could remove file upload request, removing 1 dependent request");
750
751 if let Some(info) = guard.being_sent.as_ref() {
753 if info.transaction_id == handles.upload_file_txn {
754 let info = guard.being_sent.take().unwrap();
756 if info.cancel_upload() {
757 trace!("aborted ongoing file upload");
758 }
759 }
760 }
761
762 if !store
764 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
765 .await?
766 {
767 warn!("unable to find the dependent media event upload request");
768 }
769 } else {
770 if !removed_dependent_event
775 && !store
776 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
777 .await?
778 {
779 debug!("uploads already happened => deferring to aborting an event sending");
782 return Ok(false);
783 }
784 }
785 }
786
787 {
790 let event_cache = client.event_cache_store().lock().await?;
791 event_cache
792 .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
793 .await?;
794 if let Some(txn) = &handles.upload_thumbnail_txn {
795 event_cache.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
796 }
797 }
798
799 debug!("successfully aborted!");
800 Ok(true)
801 }
802
803 #[instrument(skip(self, caption, formatted_caption))]
804 pub(super) async fn edit_media_caption(
805 &self,
806 txn: &TransactionId,
807 caption: Option<String>,
808 formatted_caption: Option<FormattedBody>,
809 mentions: Option<Mentions>,
810 ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
811 use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
813
814 let guard = self.store.lock().await;
815 let client = guard.client()?;
816 let store = client.state_store();
817
818 {
826 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
829
830 if let Some(found) =
831 dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
832 {
833 trace!("found the caption to edit in a dependent request");
834
835 let DependentQueuedRequestKind::FinishUpload {
836 mut local_echo,
837 file_upload,
838 thumbnail_info,
839 } = found.kind
840 else {
841 return Err(InvalidMediaCaptionEdit);
842 };
843
844 if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
845 return Err(InvalidMediaCaptionEdit);
846 }
847
848 let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
849 local_echo: local_echo.clone(),
850 file_upload,
851 thumbnail_info,
852 };
853 store
854 .update_dependent_queued_request(
855 &self.room_id,
856 &found.own_transaction_id,
857 new_dependent_request,
858 )
859 .await?;
860
861 trace!("caption successfully updated");
862 return Ok(Some((*local_echo).into()));
863 }
864 }
865
866 let requests = store.load_send_queue_requests(&self.room_id).await?;
867 let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
868 return Ok(None);
870 };
871
872 trace!("found the caption to edit as a request");
873
874 let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
875 return Err(InvalidMediaCaptionEdit);
876 };
877
878 let deserialized = serialized_content.deserialize()?;
879 let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
880 return Err(InvalidMediaCaptionEdit);
881 };
882
883 if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
884 return Err(InvalidMediaCaptionEdit);
885 }
886
887 let any_content: AnyMessageLikeEventContent = content.into();
888 let new_serialized = SerializableEventContent::new(&any_content.clone())?;
889
890 if let Some(being_sent) = guard.being_sent.as_ref() {
892 if being_sent.transaction_id == *txn {
893 store
895 .save_dependent_queued_request(
896 &self.room_id,
897 txn,
898 ChildTransactionId::new(),
899 MilliSecondsSinceUnixEpoch::now(),
900 DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
901 )
902 .await?;
903
904 trace!("media event was being sent, pushed a dependent edit");
905 return Ok(Some(any_content));
906 }
907 }
908
909 store
911 .update_send_queue_request(
912 &self.room_id,
913 txn,
914 QueuedRequestKind::Event { content: new_serialized },
915 )
916 .await?;
917
918 trace!("media event was not being sent, updated local echo");
919 Ok(Some(any_content))
920 }
921}
922
923async fn update_media_cache_keys_after_upload(
926 client: &Client,
927 file_upload_txn: &OwnedTransactionId,
928 thumbnail_info: Option<FinishUploadThumbnailInfo>,
929 sent_media: &SentMediaInfo,
930) -> Result<(), RoomSendQueueError> {
931 let from_req = Media::make_local_file_media_request(file_upload_txn);
933
934 trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
935 let cache_store =
936 client.event_cache_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
937
938 cache_store
940 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
941 .await
942 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
943
944 cache_store
945 .replace_media_key(
946 &from_req,
947 &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File },
948 )
949 .await
950 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
951
952 if let Some((info, new_source)) = thumbnail_info.as_ref().zip(sent_media.thumbnail.clone()) {
954 let from_req = if let Some((height, width)) = info.height.zip(info.width) {
957 Media::make_local_thumbnail_media_request(&info.txn, height, width)
958 } else {
959 Media::make_local_file_media_request(&info.txn)
960 };
961
962 trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
963
964 cache_store
966 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
967 .await
968 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
969
970 cache_store
971 .replace_media_key(
972 &from_req,
973 &MediaRequestParameters { source: new_source, format: MediaFormat::File },
974 )
975 .await
976 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
977 }
978
979 Ok(())
980}