1use matrix_sdk_base::{
18 event_cache::store::media::IgnoreMediaRetentionPolicy,
19 media::{MediaFormat, MediaRequestParameters},
20 store::{
21 ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
22 QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
23 },
24 RoomState,
25};
26use mime::Mime;
27use ruma::{
28 events::{
29 room::message::{FormattedBody, MessageType, RoomMessageEventContent},
30 AnyMessageLikeEventContent, Mentions,
31 },
32 MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
33};
34use tracing::{debug, error, instrument, trace, warn, Span};
35
36use super::{QueueStorage, RoomSendQueue, RoomSendQueueError};
37use crate::{
38 attachment::AttachmentConfig,
39 room::edit::update_media_caption,
40 send_queue::{
41 LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
42 SendHandle,
43 },
44 Client, Media,
45};
46
47fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
50 match &mut echo.msgtype {
53 MessageType::Audio(event) => {
54 event.source = sent.file;
55 }
56 MessageType::File(event) => {
57 event.source = sent.file;
58 if let Some(info) = event.info.as_mut() {
59 info.thumbnail_source = sent.thumbnail;
60 }
61 }
62 MessageType::Image(event) => {
63 event.source = sent.file;
64 if let Some(info) = event.info.as_mut() {
65 info.thumbnail_source = sent.thumbnail;
66 }
67 }
68 MessageType::Video(event) => {
69 event.source = sent.file;
70 if let Some(info) = event.info.as_mut() {
71 info.thumbnail_source = sent.thumbnail;
72 }
73 }
74
75 _ => {
76 error!("Invalid message type in database: {}", echo.msgtype());
80 debug_assert!(false, "invalid message type in database");
82 }
83 }
84}
85
86impl RoomSendQueue {
87 #[instrument(skip_all, fields(event_txn))]
107 pub async fn send_attachment(
108 &self,
109 filename: impl Into<String>,
110 content_type: Mime,
111 data: Vec<u8>,
112 mut config: AttachmentConfig,
113 ) -> Result<SendHandle, RoomSendQueueError> {
114 let Some(room) = self.inner.room.get() else {
115 return Err(RoomSendQueueError::RoomDisappeared);
116 };
117
118 if room.state() != RoomState::Joined {
119 return Err(RoomSendQueueError::RoomNotJoined);
120 }
121
122 let filename = filename.into();
123 let upload_file_txn = TransactionId::new();
124 let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
125
126 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
127 debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
128
129 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
130
131 let (upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info) = {
132 let client = room.client();
133 let cache_store = client
134 .event_cache_store()
135 .lock()
136 .await
137 .map_err(RoomSendQueueStorageError::LockError)?;
138
139 cache_store
141 .add_media_content(
142 &file_media_request,
143 data.clone(),
144 IgnoreMediaRetentionPolicy::Yes,
146 )
147 .await
148 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
149
150 if let Some(thumbnail) = config.thumbnail.take() {
152 let txn = TransactionId::new();
153 trace!(upload_thumbnail_txn = %txn, "attachment has a thumbnail");
154
155 let (data, content_type, thumbnail_info) = thumbnail.into_parts();
158
159 let thumbnail_media_request = Media::make_local_file_media_request(&txn);
161 cache_store
162 .add_media_content(
163 &thumbnail_media_request,
164 data,
165 IgnoreMediaRetentionPolicy::Yes,
167 )
168 .await
169 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
170
171 (
172 Some(txn.clone()),
173 Some((thumbnail_media_request.source.clone(), thumbnail_info)),
174 Some((
175 FinishUploadThumbnailInfo { txn, width: None, height: None },
176 thumbnail_media_request,
177 content_type,
178 )),
179 )
180 } else {
181 Default::default()
182 }
183 };
184
185 let event_content = room
187 .make_attachment_event(
188 room.make_attachment_type(
189 &content_type,
190 filename,
191 file_media_request.source.clone(),
192 config.caption,
193 config.formatted_caption,
194 config.info,
195 event_thumbnail_info,
196 ),
197 config.mentions,
198 config.reply,
199 )
200 .await
201 .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
202
203 let created_at = MilliSecondsSinceUnixEpoch::now();
204
205 self.inner
207 .queue
208 .push_media(
209 event_content.clone(),
210 content_type,
211 send_event_txn.clone().into(),
212 created_at,
213 upload_file_txn.clone(),
214 file_media_request,
215 queue_thumbnail_info,
216 )
217 .await?;
218
219 trace!("manager sends a media to the background task");
220
221 self.inner.notifier.notify_one();
222
223 let send_handle = SendHandle {
224 room: self.clone(),
225 transaction_id: send_event_txn.clone().into(),
226 media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
227 created_at,
228 };
229
230 let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
231 transaction_id: send_event_txn.clone().into(),
232 content: LocalEchoContent::Event {
233 serialized_event: SerializableEventContent::new(&event_content.into())
234 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
235 send_handle: send_handle.clone(),
236 send_error: None,
237 },
238 }));
239
240 Ok(send_handle)
241 }
242}
243
244impl QueueStorage {
245 #[allow(clippy::too_many_arguments)]
247 pub(super) async fn handle_dependent_finish_upload(
248 &self,
249 client: &Client,
250 event_txn: OwnedTransactionId,
251 parent_key: SentRequestKey,
252 mut local_echo: RoomMessageEventContent,
253 file_upload_txn: OwnedTransactionId,
254 thumbnail_info: Option<FinishUploadThumbnailInfo>,
255 new_updates: &mut Vec<RoomSendQueueUpdate>,
256 ) -> Result<(), RoomSendQueueError> {
257 let sent_media = parent_key
259 .into_media()
260 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
261
262 {
264 let from_req = Media::make_local_file_media_request(&file_upload_txn);
266
267 trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
268 let cache_store = client
269 .event_cache_store()
270 .lock()
271 .await
272 .map_err(RoomSendQueueStorageError::LockError)?;
273
274 cache_store
276 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
277 .await
278 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
279
280 cache_store
281 .replace_media_key(
282 &from_req,
283 &MediaRequestParameters {
284 source: sent_media.file.clone(),
285 format: MediaFormat::File,
286 },
287 )
288 .await
289 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
290
291 if let Some((info, new_source)) =
293 thumbnail_info.as_ref().zip(sent_media.thumbnail.clone())
294 {
295 let from_req = if let Some((height, width)) = info.height.zip(info.width) {
298 Media::make_local_thumbnail_media_request(&info.txn, height, width)
299 } else {
300 Media::make_local_file_media_request(&info.txn)
301 };
302
303 trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
304
305 cache_store
307 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
308 .await
309 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
310
311 cache_store
312 .replace_media_key(
313 &from_req,
314 &MediaRequestParameters { source: new_source, format: MediaFormat::File },
315 )
316 .await
317 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
318 }
319 }
320
321 update_media_event_after_upload(&mut local_echo, sent_media);
322
323 let new_content = SerializableEventContent::new(&local_echo.into())
324 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
325
326 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
329 transaction_id: event_txn.clone(),
330 new_content: new_content.clone(),
331 });
332
333 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
334
335 client
336 .state_store()
337 .save_send_queue_request(
338 &self.room_id,
339 event_txn,
340 MilliSecondsSinceUnixEpoch::now(),
341 new_content.into(),
342 Self::HIGH_PRIORITY,
343 )
344 .await
345 .map_err(RoomSendQueueStorageError::StateStoreError)?;
346
347 Ok(())
348 }
349
350 pub(super) async fn handle_dependent_file_upload_with_thumbnail(
352 &self,
353 client: &Client,
354 next_upload_txn: OwnedTransactionId,
355 parent_key: SentRequestKey,
356 content_type: String,
357 cache_key: MediaRequestParameters,
358 event_txn: OwnedTransactionId,
359 ) -> Result<(), RoomSendQueueError> {
360 let sent_media = parent_key
363 .into_media()
364 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
365
366 debug_assert!(sent_media.thumbnail.is_none());
369 if sent_media.thumbnail.is_some() {
370 warn!("unexpected thumbnail for a thumbnail!");
371 }
372
373 trace!(related_to = %event_txn, "done uploading thumbnail, now queuing a request to send the media file itself");
374
375 let request = QueuedRequestKind::MediaUpload {
376 content_type,
377 cache_key,
378 thumbnail_source: Some(sent_media.file),
380 related_to: event_txn,
381 };
382
383 client
384 .state_store()
385 .save_send_queue_request(
386 &self.room_id,
387 next_upload_txn,
388 MilliSecondsSinceUnixEpoch::now(),
389 request,
390 Self::HIGH_PRIORITY,
391 )
392 .await
393 .map_err(RoomSendQueueStorageError::StateStoreError)?;
394
395 Ok(())
396 }
397
398 #[instrument(skip(self, handles))]
405 pub(super) async fn abort_upload(
406 &self,
407 event_txn: &TransactionId,
408 handles: &MediaHandles,
409 ) -> Result<bool, RoomSendQueueStorageError> {
410 let mut guard = self.store.lock().await;
411 let client = guard.client()?;
412
413 debug!("trying to abort an upload");
415
416 let store = client.state_store();
417
418 let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
419 let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
420
421 let mut removed_dependent_upload = false;
422 let mut removed_dependent_event = false;
423
424 if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn {
425 if store.remove_send_queue_request(&self.room_id, thumbnail_txn).await? {
426 trace!("could remove thumbnail request, removing 2 dependent requests now");
429
430 if let Some(info) = guard.being_sent.as_ref() {
432 if info.transaction_id == *thumbnail_txn {
433 let info = guard.being_sent.take().unwrap();
435 if info.cancel_upload() {
436 trace!("aborted ongoing thumbnail upload");
437 }
438 }
439 }
440
441 removed_dependent_upload = store
443 .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
444 .await?;
445
446 if !removed_dependent_upload {
447 warn!("unable to find the dependent file upload request");
448 }
449
450 removed_dependent_event = store
451 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
452 .await?;
453
454 if !removed_dependent_event {
455 warn!("unable to find the dependent media event upload request");
456 }
457 }
458 }
459
460 if !removed_dependent_upload {
467 if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
468 trace!("could remove file upload request, removing 1 dependent request");
471
472 if let Some(info) = guard.being_sent.as_ref() {
474 if info.transaction_id == handles.upload_file_txn {
475 let info = guard.being_sent.take().unwrap();
477 if info.cancel_upload() {
478 trace!("aborted ongoing file upload");
479 }
480 }
481 }
482
483 if !store
485 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
486 .await?
487 {
488 warn!("unable to find the dependent media event upload request");
489 }
490 } else {
491 if !removed_dependent_event
496 && !store
497 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
498 .await?
499 {
500 debug!("uploads already happened => deferring to aborting an event sending");
503 return Ok(false);
504 }
505 }
506 }
507
508 {
511 let event_cache = client.event_cache_store().lock().await?;
512 event_cache
513 .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
514 .await?;
515 if let Some(txn) = &handles.upload_thumbnail_txn {
516 event_cache.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
517 }
518 }
519
520 debug!("successfully aborted!");
521 Ok(true)
522 }
523
524 #[instrument(skip(self, caption, formatted_caption))]
525 pub(super) async fn edit_media_caption(
526 &self,
527 txn: &TransactionId,
528 caption: Option<String>,
529 formatted_caption: Option<FormattedBody>,
530 mentions: Option<Mentions>,
531 ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
532 use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
534
535 let guard = self.store.lock().await;
536 let client = guard.client()?;
537 let store = client.state_store();
538
539 {
547 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
550
551 if let Some(found) =
552 dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
553 {
554 trace!("found the caption to edit in a dependent request");
555
556 let DependentQueuedRequestKind::FinishUpload {
557 mut local_echo,
558 file_upload,
559 thumbnail_info,
560 } = found.kind
561 else {
562 return Err(InvalidMediaCaptionEdit);
563 };
564
565 if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
566 return Err(InvalidMediaCaptionEdit);
567 }
568
569 let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
570 local_echo: local_echo.clone(),
571 file_upload,
572 thumbnail_info,
573 };
574 store
575 .update_dependent_queued_request(
576 &self.room_id,
577 &found.own_transaction_id,
578 new_dependent_request,
579 )
580 .await?;
581
582 trace!("caption successfully updated");
583 return Ok(Some((*local_echo).into()));
584 }
585 }
586
587 let requests = store.load_send_queue_requests(&self.room_id).await?;
588 let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
589 return Ok(None);
591 };
592
593 trace!("found the caption to edit as a request");
594
595 let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
596 return Err(InvalidMediaCaptionEdit);
597 };
598
599 let deserialized = serialized_content.deserialize()?;
600 let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
601 return Err(InvalidMediaCaptionEdit);
602 };
603
604 if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
605 return Err(InvalidMediaCaptionEdit);
606 }
607
608 let any_content: AnyMessageLikeEventContent = content.into();
609 let new_serialized = SerializableEventContent::new(&any_content.clone())?;
610
611 if let Some(being_sent) = guard.being_sent.as_ref() {
613 if being_sent.transaction_id == *txn {
614 store
616 .save_dependent_queued_request(
617 &self.room_id,
618 txn,
619 ChildTransactionId::new(),
620 MilliSecondsSinceUnixEpoch::now(),
621 DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
622 )
623 .await?;
624
625 trace!("media event was being sent, pushed a dependent edit");
626 return Ok(Some(any_content));
627 }
628 }
629
630 store
632 .update_send_queue_request(
633 &self.room_id,
634 txn,
635 QueuedRequestKind::Event { content: new_serialized },
636 )
637 .await?;
638
639 trace!("media event was not being sent, updated local echo");
640 Ok(Some(any_content))
641 }
642}