1use std::{
132 collections::{BTreeMap, HashMap},
133 str::FromStr as _,
134 sync::{
135 atomic::{AtomicBool, Ordering},
136 Arc, RwLock,
137 },
138};
139
140use as_variant::as_variant;
141use matrix_sdk_base::{
142 event_cache::store::EventCacheStoreError,
143 media::MediaRequestParameters,
144 store::{
145 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
146 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
147 SentMediaInfo, SentRequestKey, SerializableEventContent,
148 },
149 store_locks::LockStoreError,
150 RoomState, StoreError,
151};
152use matrix_sdk_common::executor::{spawn, JoinHandle};
153use mime::Mime;
154use ruma::{
155 events::{
156 reaction::ReactionEventContent,
157 relation::Annotation,
158 room::{
159 message::{FormattedBody, RoomMessageEventContent},
160 MediaSource,
161 },
162 AnyMessageLikeEventContent, EventContent as _, Mentions,
163 },
164 serde::Raw,
165 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
166};
167use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
168use tracing::{debug, error, info, instrument, trace, warn};
169
170#[cfg(feature = "e2e-encryption")]
171use crate::crypto::{OlmError, SessionRecipientCollectionError};
172use crate::{
173 client::WeakClient,
174 config::RequestConfig,
175 error::RetryKind,
176 room::{edit::EditedContent, WeakRoom},
177 Client, Media, Room,
178};
179
180mod upload;
181
182pub struct SendQueue {
184 client: Client,
185}
186
187#[cfg(not(tarpaulin_include))]
188impl std::fmt::Debug for SendQueue {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 f.debug_struct("SendQueue").finish_non_exhaustive()
191 }
192}
193
194impl SendQueue {
195 pub(super) fn new(client: Client) -> Self {
196 Self { client }
197 }
198
199 pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
202 if !self.is_enabled() {
203 return;
204 }
205
206 let room_ids =
207 self.client.store().load_rooms_with_unsent_requests().await.unwrap_or_else(|err| {
208 warn!("error when loading rooms with unsent requests: {err}");
209 Vec::new()
210 });
211
212 for room_id in room_ids {
214 if let Some(room) = self.client.get_room(&room_id) {
215 let _ = self.for_room(room);
216 }
217 }
218 }
219
220 #[inline(always)]
222 fn data(&self) -> &SendQueueData {
223 &self.client.inner.send_queue_data
224 }
225
226 fn for_room(&self, room: Room) -> RoomSendQueue {
229 let data = self.data();
230
231 let mut map = data.rooms.write().unwrap();
232
233 let room_id = room.room_id();
234 if let Some(room_q) = map.get(room_id).cloned() {
235 return room_q;
236 }
237
238 let owned_room_id = room_id.to_owned();
239 let room_q = RoomSendQueue::new(
240 self.is_enabled(),
241 data.error_reporter.clone(),
242 data.is_dropping.clone(),
243 &self.client,
244 owned_room_id.clone(),
245 );
246
247 map.insert(owned_room_id, room_q.clone());
248
249 room_q
250 }
251
252 pub async fn set_enabled(&self, enabled: bool) {
262 debug!(?enabled, "setting global send queue enablement");
263
264 self.data().globally_enabled.store(enabled, Ordering::SeqCst);
265
266 for room in self.data().rooms.read().unwrap().values() {
268 room.set_enabled(enabled);
269 }
270
271 self.respawn_tasks_for_rooms_with_unsent_requests().await;
274 }
275
276 pub fn is_enabled(&self) -> bool {
279 self.data().globally_enabled.load(Ordering::SeqCst)
280 }
281
282 pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
285 self.data().error_reporter.subscribe()
286 }
287}
288
289#[derive(Clone, Debug)]
291pub struct SendQueueRoomError {
292 pub room_id: OwnedRoomId,
294
295 pub error: Arc<crate::Error>,
297
298 pub is_recoverable: bool,
304}
305
306impl Client {
307 pub fn send_queue(&self) -> SendQueue {
310 SendQueue::new(self.clone())
311 }
312}
313
314pub(super) struct SendQueueData {
315 rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
317
318 globally_enabled: AtomicBool,
323
324 error_reporter: broadcast::Sender<SendQueueRoomError>,
326
327 is_dropping: Arc<AtomicBool>,
329}
330
331impl SendQueueData {
332 pub fn new(globally_enabled: bool) -> Self {
334 let (sender, _) = broadcast::channel(32);
335
336 Self {
337 rooms: Default::default(),
338 globally_enabled: AtomicBool::new(globally_enabled),
339 error_reporter: sender,
340 is_dropping: Arc::new(false.into()),
341 }
342 }
343}
344
345impl Drop for SendQueueData {
346 fn drop(&mut self) {
347 debug!("globally dropping the send queue");
350 self.is_dropping.store(true, Ordering::SeqCst);
351
352 let rooms = self.rooms.read().unwrap();
353 for room in rooms.values() {
354 room.inner.notifier.notify_one();
355 }
356 }
357}
358
359impl Room {
360 pub fn send_queue(&self) -> RoomSendQueue {
362 self.client.send_queue().for_room(self.clone())
363 }
364}
365
366#[derive(Clone)]
370pub struct RoomSendQueue {
371 inner: Arc<RoomSendQueueInner>,
372}
373
374#[cfg(not(tarpaulin_include))]
375impl std::fmt::Debug for RoomSendQueue {
376 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
377 f.debug_struct("RoomSendQueue").finish_non_exhaustive()
378 }
379}
380
381impl RoomSendQueue {
382 fn new(
383 globally_enabled: bool,
384 global_error_reporter: broadcast::Sender<SendQueueRoomError>,
385 is_dropping: Arc<AtomicBool>,
386 client: &Client,
387 room_id: OwnedRoomId,
388 ) -> Self {
389 let (updates_sender, _) = broadcast::channel(32);
390
391 let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
392 let notifier = Arc::new(Notify::new());
393
394 let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
395 let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
396
397 let task = spawn(Self::sending_task(
398 weak_room.clone(),
399 queue.clone(),
400 notifier.clone(),
401 updates_sender.clone(),
402 locally_enabled.clone(),
403 global_error_reporter,
404 is_dropping,
405 ));
406
407 Self {
408 inner: Arc::new(RoomSendQueueInner {
409 room: weak_room,
410 updates: updates_sender,
411 _task: task,
412 queue,
413 notifier,
414 locally_enabled,
415 }),
416 }
417 }
418
419 pub async fn send_raw(
434 &self,
435 content: Raw<AnyMessageLikeEventContent>,
436 event_type: String,
437 ) -> Result<SendHandle, RoomSendQueueError> {
438 let Some(room) = self.inner.room.get() else {
439 return Err(RoomSendQueueError::RoomDisappeared);
440 };
441 if room.state() != RoomState::Joined {
442 return Err(RoomSendQueueError::RoomNotJoined);
443 }
444
445 let content = SerializableEventContent::from_raw(content, event_type);
446
447 let created_at = MilliSecondsSinceUnixEpoch::now();
448 let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
449 trace!(%transaction_id, "manager sends a raw event to the background task");
450
451 self.inner.notifier.notify_one();
452
453 let send_handle = SendHandle {
454 room: self.clone(),
455 transaction_id: transaction_id.clone(),
456 media_handles: None,
457 created_at,
458 };
459
460 let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
461 transaction_id,
462 content: LocalEchoContent::Event {
463 serialized_event: content,
464 send_handle: send_handle.clone(),
465 send_error: None,
466 },
467 }));
468
469 Ok(send_handle)
470 }
471
472 pub async fn send(
487 &self,
488 content: AnyMessageLikeEventContent,
489 ) -> Result<SendHandle, RoomSendQueueError> {
490 self.send_raw(
491 Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
492 content.event_type().to_string(),
493 )
494 .await
495 }
496
497 pub async fn subscribe(
500 &self,
501 ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
502 {
503 let local_echoes = self.inner.queue.local_echoes(self).await?;
504
505 Ok((local_echoes, self.inner.updates.subscribe()))
506 }
507
508 #[instrument(skip_all, fields(room_id = %room.room_id()))]
514 async fn sending_task(
515 room: WeakRoom,
516 queue: QueueStorage,
517 notifier: Arc<Notify>,
518 updates: broadcast::Sender<RoomSendQueueUpdate>,
519 locally_enabled: Arc<AtomicBool>,
520 global_error_reporter: broadcast::Sender<SendQueueRoomError>,
521 is_dropping: Arc<AtomicBool>,
522 ) {
523 info!("spawned the sending task");
524
525 loop {
526 if is_dropping.load(Ordering::SeqCst) {
528 trace!("shutting down!");
529 break;
530 }
531
532 let mut new_updates = Vec::new();
535 if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
536 warn!("errors when applying dependent requests: {err}");
537 }
538
539 for up in new_updates {
540 let _ = updates.send(up);
541 }
542
543 if !locally_enabled.load(Ordering::SeqCst) {
544 trace!("not enabled, sleeping");
545 notifier.notified().await;
547 continue;
548 }
549
550 let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
551 Ok(Some(request)) => request,
552
553 Ok(None) => {
554 trace!("queue is empty, sleeping");
555 notifier.notified().await;
557 continue;
558 }
559
560 Err(err) => {
561 warn!("error when loading next request to send: {err}");
562 continue;
563 }
564 };
565
566 let txn_id = queued_request.transaction_id.clone();
567 trace!(txn_id = %txn_id, "received a request to send!");
568
569 let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::MediaUpload { related_to, .. } => related_to.clone());
570
571 let Some(room) = room.get() else {
572 if is_dropping.load(Ordering::SeqCst) {
573 break;
574 }
575 error!("the weak room couldn't be upgraded but we're not shutting down?");
576 continue;
577 };
578
579 match Self::handle_request(&room, queued_request, cancel_upload_rx).await {
580 Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
581 {
582 Ok(()) => match parent_key {
583 SentRequestKey::Event(event_id) => {
584 let _ = updates.send(RoomSendQueueUpdate::SentEvent {
585 transaction_id: txn_id,
586 event_id,
587 });
588 }
589
590 SentRequestKey::Media(media_info) => {
591 let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
592 related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
593 file: media_info.file,
594 });
595 }
596 },
597
598 Err(err) => {
599 warn!("unable to mark queued request as sent: {err}");
600 }
601 },
602
603 Ok(None) => {
604 debug!("Request has been aborted while running, continuing.");
605 }
606
607 Err(err) => {
608 let is_recoverable = match err {
609 crate::Error::Http(ref http_err) => {
610 matches!(
612 http_err.retry_kind(),
613 RetryKind::Transient { .. } | RetryKind::NetworkFailure
614 )
615 }
616
617 crate::Error::ConcurrentRequestFailed => true,
622
623 _ => false,
625 };
626
627 locally_enabled.store(false, Ordering::SeqCst);
629
630 if is_recoverable {
631 warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
632
633 queue.mark_as_not_being_sent(&txn_id).await;
636
637 } else {
643 warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
644
645 if let Err(storage_error) =
647 queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
648 {
649 warn!("unable to mark request as wedged: {storage_error}");
650 }
651 }
652
653 let error = Arc::new(err);
654
655 let _ = global_error_reporter.send(SendQueueRoomError {
656 room_id: room.room_id().to_owned(),
657 error: error.clone(),
658 is_recoverable,
659 });
660
661 let _ = updates.send(RoomSendQueueUpdate::SendError {
662 transaction_id: related_txn_id.unwrap_or(txn_id),
663 error,
664 is_recoverable,
665 });
666 }
667 }
668 }
669
670 info!("exited sending task");
671 }
672
673 async fn handle_request(
677 room: &Room,
678 request: QueuedRequest,
679 cancel_upload_rx: Option<oneshot::Receiver<()>>,
680 ) -> Result<Option<SentRequestKey>, crate::Error> {
681 match request.kind {
682 QueuedRequestKind::Event { content } => {
683 let (event, event_type) = content.raw();
684
685 let res = room
686 .send_raw(event_type, event)
687 .with_transaction_id(&request.transaction_id)
688 .with_request_config(RequestConfig::short_retry())
689 .await?;
690
691 trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
692 Ok(Some(SentRequestKey::Event(res.event_id)))
693 }
694
695 QueuedRequestKind::MediaUpload {
696 content_type,
697 cache_key,
698 thumbnail_source,
699 related_to: relates_to,
700 } => {
701 trace!(%relates_to, "uploading media related to event");
702
703 let fut = async move {
704 let mime = Mime::from_str(&content_type).map_err(|_| {
705 crate::Error::SendQueueWedgeError(QueueWedgeError::InvalidMimeType {
706 mime_type: content_type.clone(),
707 })
708 })?;
709
710 let data = room
711 .client()
712 .event_cache_store()
713 .lock()
714 .await?
715 .get_media_content(&cache_key)
716 .await?
717 .ok_or(crate::Error::SendQueueWedgeError(
718 QueueWedgeError::MissingMediaContent,
719 ))?;
720
721 #[cfg(feature = "e2e-encryption")]
722 let media_source = if room.is_encrypted().await? {
723 trace!("upload will be encrypted (encrypted room)");
724 let mut cursor = std::io::Cursor::new(data);
725 let encrypted_file = room
726 .client()
727 .upload_encrypted_file(&mime, &mut cursor)
728 .with_request_config(RequestConfig::short_retry())
729 .await?;
730 MediaSource::Encrypted(Box::new(encrypted_file))
731 } else {
732 trace!("upload will be in clear text (room without encryption)");
733 let request_config = RequestConfig::short_retry()
734 .timeout(Media::reasonable_upload_timeout(&data));
735 let res =
736 room.client().media().upload(&mime, data, Some(request_config)).await?;
737 MediaSource::Plain(res.content_uri)
738 };
739
740 #[cfg(not(feature = "e2e-encryption"))]
741 let media_source = {
742 let request_config = RequestConfig::short_retry()
743 .timeout(Media::reasonable_upload_timeout(&data));
744 let res =
745 room.client().media().upload(&mime, data, Some(request_config)).await?;
746 MediaSource::Plain(res.content_uri)
747 };
748
749 let uri = match &media_source {
750 MediaSource::Plain(uri) => uri,
751 MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
752 };
753 trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
754
755 Ok(SentRequestKey::Media(SentMediaInfo {
756 file: media_source,
757 thumbnail: thumbnail_source,
758 }))
759 };
760
761 let wait_for_cancel = async move {
762 if let Some(rx) = cancel_upload_rx {
763 rx.await
764 } else {
765 std::future::pending().await
766 }
767 };
768
769 tokio::select! {
770 biased;
771
772 _ = wait_for_cancel => {
773 Ok(None)
774 }
775
776 res = fut => {
777 res.map(Some)
778 }
779 }
780 }
781 }
782 }
783
784 pub fn is_enabled(&self) -> bool {
786 self.inner.locally_enabled.load(Ordering::SeqCst)
787 }
788
789 pub fn set_enabled(&self, enabled: bool) {
791 self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
792
793 if enabled {
796 self.inner.notifier.notify_one();
797 }
798 }
799}
800
801impl From<&crate::Error> for QueueWedgeError {
802 fn from(value: &crate::Error) -> Self {
803 match value {
804 #[cfg(feature = "e2e-encryption")]
805 crate::Error::OlmError(OlmError::SessionRecipientCollectionError(error)) => match error
806 {
807 SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
808 QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
809 }
810
811 SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
812 QueueWedgeError::IdentityViolations { users: users.clone() }
813 }
814
815 SessionRecipientCollectionError::CrossSigningNotSetup
816 | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
817 QueueWedgeError::CrossVerificationRequired
818 }
819 },
820
821 crate::Error::SendQueueWedgeError(error) => error.clone(),
823
824 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
825 }
826 }
827}
828
829struct RoomSendQueueInner {
830 room: WeakRoom,
832
833 updates: broadcast::Sender<RoomSendQueueUpdate>,
837
838 queue: QueueStorage,
845
846 notifier: Arc<Notify>,
849
850 locally_enabled: Arc<AtomicBool>,
853
854 _task: JoinHandle<()>,
857}
858
859struct BeingSentInfo {
861 transaction_id: OwnedTransactionId,
863
864 cancel_upload: Option<oneshot::Sender<()>>,
867}
868
869impl BeingSentInfo {
870 fn cancel_upload(self) -> bool {
875 if let Some(cancel_upload) = self.cancel_upload {
876 let _ = cancel_upload.send(());
877 true
878 } else {
879 false
880 }
881 }
882}
883
884#[derive(Clone)]
887struct StoreLock {
888 client: WeakClient,
890
891 being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
896}
897
898impl StoreLock {
899 async fn lock(&self) -> StoreLockGuard {
901 StoreLockGuard {
902 client: self.client.clone(),
903 being_sent: self.being_sent.clone().lock_owned().await,
904 }
905 }
906}
907
908struct StoreLockGuard {
911 client: WeakClient,
913
914 being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
917}
918
919impl StoreLockGuard {
920 fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
922 self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
923 }
924}
925
926#[derive(Clone)]
927struct QueueStorage {
928 store: StoreLock,
931
932 room_id: OwnedRoomId,
934}
935
936impl QueueStorage {
937 const LOW_PRIORITY: usize = 0;
939
940 const HIGH_PRIORITY: usize = 10;
942
943 fn new(client: WeakClient, room: OwnedRoomId) -> Self {
945 Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } }
946 }
947
948 async fn push(
952 &self,
953 request: QueuedRequestKind,
954 created_at: MilliSecondsSinceUnixEpoch,
955 ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
956 let transaction_id = TransactionId::new();
957
958 self.store
959 .lock()
960 .await
961 .client()?
962 .store()
963 .save_send_queue_request(
964 &self.room_id,
965 transaction_id.clone(),
966 created_at,
967 request,
968 Self::LOW_PRIORITY,
969 )
970 .await?;
971
972 Ok(transaction_id)
973 }
974
975 async fn peek_next_to_send(
980 &self,
981 ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
982 {
983 let mut guard = self.store.lock().await;
984 let queued_requests =
985 guard.client()?.store().load_send_queue_requests(&self.room_id).await?;
986
987 if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
988 let (cancel_upload_tx, cancel_upload_rx) =
989 if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
990 let (tx, rx) = oneshot::channel();
991 (Some(tx), Some(rx))
992 } else {
993 Default::default()
994 };
995
996 let prev = guard.being_sent.replace(BeingSentInfo {
997 transaction_id: request.transaction_id.clone(),
998 cancel_upload: cancel_upload_tx,
999 });
1000
1001 if let Some(prev) = prev {
1002 error!(
1003 prev_txn = ?prev.transaction_id,
1004 "a previous request was still active while picking a new one"
1005 );
1006 }
1007
1008 Ok(Some((request.clone(), cancel_upload_rx)))
1009 } else {
1010 Ok(None)
1011 }
1012 }
1013
1014 async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1018 let was_being_sent = self.store.lock().await.being_sent.take();
1019
1020 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1021 if prev_txn != Some(transaction_id) {
1022 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1023 }
1024 }
1025
1026 async fn mark_as_wedged(
1030 &self,
1031 transaction_id: &TransactionId,
1032 reason: QueueWedgeError,
1033 ) -> Result<(), RoomSendQueueStorageError> {
1034 let mut guard = self.store.lock().await;
1036 let was_being_sent = guard.being_sent.take();
1037
1038 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1039 if prev_txn != Some(transaction_id) {
1040 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after permanent error)");
1041 }
1042
1043 Ok(guard
1044 .client()?
1045 .store()
1046 .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1047 .await?)
1048 }
1049
1050 async fn mark_as_unwedged(
1053 &self,
1054 transaction_id: &TransactionId,
1055 ) -> Result<(), RoomSendQueueStorageError> {
1056 Ok(self
1057 .store
1058 .lock()
1059 .await
1060 .client()?
1061 .store()
1062 .update_send_queue_request_status(&self.room_id, transaction_id, None)
1063 .await?)
1064 }
1065
1066 async fn mark_as_sent(
1069 &self,
1070 transaction_id: &TransactionId,
1071 parent_key: SentRequestKey,
1072 ) -> Result<(), RoomSendQueueStorageError> {
1073 let mut guard = self.store.lock().await;
1075 let was_being_sent = guard.being_sent.take();
1076
1077 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1078 if prev_txn != Some(transaction_id) {
1079 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after successful send");
1080 }
1081
1082 let client = guard.client()?;
1083 let store = client.store();
1084
1085 store
1087 .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1088 .await?;
1089
1090 let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1091
1092 if !removed {
1093 warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1094 }
1095
1096 Ok(())
1097 }
1098
1099 async fn cancel_event(
1106 &self,
1107 transaction_id: &TransactionId,
1108 ) -> Result<bool, RoomSendQueueStorageError> {
1109 let guard = self.store.lock().await;
1110
1111 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1112 == Some(transaction_id)
1113 {
1114 guard
1116 .client()?
1117 .store()
1118 .save_dependent_queued_request(
1119 &self.room_id,
1120 transaction_id,
1121 ChildTransactionId::new(),
1122 MilliSecondsSinceUnixEpoch::now(),
1123 DependentQueuedRequestKind::RedactEvent,
1124 )
1125 .await?;
1126
1127 return Ok(true);
1128 }
1129
1130 let removed = guard
1131 .client()?
1132 .store()
1133 .remove_send_queue_request(&self.room_id, transaction_id)
1134 .await?;
1135
1136 Ok(removed)
1137 }
1138
1139 async fn replace_event(
1146 &self,
1147 transaction_id: &TransactionId,
1148 serializable: SerializableEventContent,
1149 ) -> Result<bool, RoomSendQueueStorageError> {
1150 let guard = self.store.lock().await;
1151
1152 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1153 == Some(transaction_id)
1154 {
1155 guard
1157 .client()?
1158 .store()
1159 .save_dependent_queued_request(
1160 &self.room_id,
1161 transaction_id,
1162 ChildTransactionId::new(),
1163 MilliSecondsSinceUnixEpoch::now(),
1164 DependentQueuedRequestKind::EditEvent { new_content: serializable },
1165 )
1166 .await?;
1167
1168 return Ok(true);
1169 }
1170
1171 let edited = guard
1172 .client()?
1173 .store()
1174 .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1175 .await?;
1176
1177 Ok(edited)
1178 }
1179
1180 #[allow(clippy::too_many_arguments)]
1184 async fn push_media(
1185 &self,
1186 event: RoomMessageEventContent,
1187 content_type: Mime,
1188 send_event_txn: OwnedTransactionId,
1189 created_at: MilliSecondsSinceUnixEpoch,
1190 upload_file_txn: OwnedTransactionId,
1191 file_media_request: MediaRequestParameters,
1192 thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
1193 ) -> Result<(), RoomSendQueueStorageError> {
1194 let guard = self.store.lock().await;
1195 let client = guard.client()?;
1196 let store = client.store();
1197 let thumbnail_info =
1198 if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) =
1199 thumbnail
1200 {
1201 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1202
1203 store
1205 .save_send_queue_request(
1206 &self.room_id,
1207 upload_thumbnail_txn.clone(),
1208 created_at,
1209 QueuedRequestKind::MediaUpload {
1210 content_type: thumbnail_content_type.to_string(),
1211 cache_key: thumbnail_media_request,
1212 thumbnail_source: None, related_to: send_event_txn.clone(),
1214 },
1215 Self::LOW_PRIORITY,
1216 )
1217 .await?;
1218
1219 store
1221 .save_dependent_queued_request(
1222 &self.room_id,
1223 &upload_thumbnail_txn,
1224 upload_file_txn.clone().into(),
1225 created_at,
1226 DependentQueuedRequestKind::UploadFileWithThumbnail {
1227 content_type: content_type.to_string(),
1228 cache_key: file_media_request,
1229 related_to: send_event_txn.clone(),
1230 },
1231 )
1232 .await?;
1233
1234 Some(thumbnail_info)
1235 } else {
1236 store
1238 .save_send_queue_request(
1239 &self.room_id,
1240 upload_file_txn.clone(),
1241 created_at,
1242 QueuedRequestKind::MediaUpload {
1243 content_type: content_type.to_string(),
1244 cache_key: file_media_request,
1245 thumbnail_source: None,
1246 related_to: send_event_txn.clone(),
1247 },
1248 Self::LOW_PRIORITY,
1249 )
1250 .await?;
1251
1252 None
1253 };
1254
1255 store
1257 .save_dependent_queued_request(
1258 &self.room_id,
1259 &upload_file_txn,
1260 send_event_txn.into(),
1261 created_at,
1262 DependentQueuedRequestKind::FinishUpload {
1263 local_echo: event,
1264 file_upload: upload_file_txn.clone(),
1265 thumbnail_info,
1266 },
1267 )
1268 .await?;
1269
1270 Ok(())
1271 }
1272
1273 #[instrument(skip(self))]
1275 async fn react(
1276 &self,
1277 transaction_id: &TransactionId,
1278 key: String,
1279 created_at: MilliSecondsSinceUnixEpoch,
1280 ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1281 let guard = self.store.lock().await;
1282 let client = guard.client()?;
1283 let store = client.store();
1284
1285 let requests = store.load_send_queue_requests(&self.room_id).await?;
1286
1287 if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1289 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1292 if !dependent_requests
1293 .into_iter()
1294 .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1295 .any(|child_txn| *child_txn == *transaction_id)
1296 {
1297 return Ok(None);
1299 }
1300 }
1301
1302 let reaction_txn_id = ChildTransactionId::new();
1304 store
1305 .save_dependent_queued_request(
1306 &self.room_id,
1307 transaction_id,
1308 reaction_txn_id.clone(),
1309 created_at,
1310 DependentQueuedRequestKind::ReactEvent { key },
1311 )
1312 .await?;
1313
1314 Ok(Some(reaction_txn_id))
1315 }
1316
1317 async fn local_echoes(
1320 &self,
1321 room: &RoomSendQueue,
1322 ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1323 let guard = self.store.lock().await;
1324 let client = guard.client()?;
1325 let store = client.store();
1326
1327 let local_requests =
1328 store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1329 Some(LocalEcho {
1330 transaction_id: queued.transaction_id.clone(),
1331 content: match queued.kind {
1332 QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1333 serialized_event: content,
1334 send_handle: SendHandle {
1335 room: room.clone(),
1336 transaction_id: queued.transaction_id,
1337 media_handles: None,
1338 created_at: queued.created_at,
1339 },
1340 send_error: queued.error,
1341 },
1342
1343 QueuedRequestKind::MediaUpload { .. } => {
1344 return None;
1347 }
1348 },
1349 })
1350 });
1351
1352 let reactions_and_medias = store
1353 .load_dependent_queued_requests(&self.room_id)
1354 .await?
1355 .into_iter()
1356 .filter_map(|dep| match dep.kind {
1357 DependentQueuedRequestKind::EditEvent { .. }
1358 | DependentQueuedRequestKind::RedactEvent => {
1359 None
1361 }
1362
1363 DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1364 transaction_id: dep.own_transaction_id.clone().into(),
1365 content: LocalEchoContent::React {
1366 key,
1367 send_handle: SendReactionHandle {
1368 room: room.clone(),
1369 transaction_id: dep.own_transaction_id,
1370 },
1371 applies_to: dep.parent_transaction_id,
1372 },
1373 }),
1374
1375 DependentQueuedRequestKind::UploadFileWithThumbnail { .. } => {
1376 None
1378 }
1379
1380 DependentQueuedRequestKind::FinishUpload {
1381 local_echo,
1382 file_upload,
1383 thumbnail_info,
1384 } => {
1385 Some(LocalEcho {
1387 transaction_id: dep.own_transaction_id.clone().into(),
1388 content: LocalEchoContent::Event {
1389 serialized_event: SerializableEventContent::new(&local_echo.into())
1390 .ok()?,
1391 send_handle: SendHandle {
1392 room: room.clone(),
1393 transaction_id: dep.own_transaction_id.into(),
1394 media_handles: Some(MediaHandles {
1395 upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1396 upload_file_txn: file_upload,
1397 }),
1398 created_at: dep.created_at,
1399 },
1400 send_error: None,
1401 },
1402 })
1403 }
1404 });
1405
1406 Ok(local_requests.chain(reactions_and_medias).collect())
1407 }
1408
1409 #[instrument(skip_all)]
1417 async fn try_apply_single_dependent_request(
1418 &self,
1419 client: &Client,
1420 dependent_request: DependentQueuedRequest,
1421 new_updates: &mut Vec<RoomSendQueueUpdate>,
1422 ) -> Result<bool, RoomSendQueueError> {
1423 let store = client.store();
1424
1425 let parent_key = dependent_request.parent_key;
1426
1427 match dependent_request.kind {
1428 DependentQueuedRequestKind::EditEvent { new_content } => {
1429 if let Some(parent_key) = parent_key {
1430 let Some(event_id) = parent_key.into_event_id() else {
1431 return Err(RoomSendQueueError::StorageError(
1432 RoomSendQueueStorageError::InvalidParentKey,
1433 ));
1434 };
1435
1436 let room = client
1438 .get_room(&self.room_id)
1439 .ok_or(RoomSendQueueError::RoomDisappeared)?;
1440
1441 let edited_content = match new_content.deserialize() {
1445 Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
1446 EditedContent::RoomMessage(c.into())
1448 }
1449
1450 Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
1451 let poll_start = c.poll_start().clone();
1452 EditedContent::PollStart {
1453 fallback_text: poll_start.question.text.clone(),
1454 new_content: poll_start,
1455 }
1456 }
1457
1458 Ok(c) => {
1459 warn!("Unsupported edit content type: {:?}", c.event_type());
1460 return Ok(true);
1461 }
1462
1463 Err(err) => {
1464 warn!("Unable to deserialize: {err}");
1465 return Ok(true);
1466 }
1467 };
1468
1469 let edit_event = match room.make_edit_event(&event_id, edited_content).await {
1470 Ok(e) => e,
1471 Err(err) => {
1472 warn!("couldn't create edited event: {err}");
1473 return Ok(true);
1474 }
1475 };
1476
1477 let serializable = SerializableEventContent::from_raw(
1479 Raw::new(&edit_event)
1480 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1481 edit_event.event_type().to_string(),
1482 );
1483
1484 store
1485 .save_send_queue_request(
1486 &self.room_id,
1487 dependent_request.own_transaction_id.into(),
1488 dependent_request.created_at,
1489 serializable.into(),
1490 Self::HIGH_PRIORITY,
1491 )
1492 .await
1493 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1494 } else {
1495 let edited = store
1497 .update_send_queue_request(
1498 &self.room_id,
1499 &dependent_request.parent_transaction_id,
1500 new_content.into(),
1501 )
1502 .await
1503 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1504
1505 if !edited {
1506 warn!("missing local echo upon dependent edit");
1507 }
1508 }
1509 }
1510
1511 DependentQueuedRequestKind::RedactEvent => {
1512 if let Some(parent_key) = parent_key {
1513 let Some(event_id) = parent_key.into_event_id() else {
1514 return Err(RoomSendQueueError::StorageError(
1515 RoomSendQueueStorageError::InvalidParentKey,
1516 ));
1517 };
1518
1519 let room = client
1521 .get_room(&self.room_id)
1522 .ok_or(RoomSendQueueError::RoomDisappeared)?;
1523
1524 if let Err(err) = room
1532 .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
1533 .await
1534 {
1535 warn!("error when sending a redact for {event_id}: {err}");
1536 return Ok(false);
1537 }
1538 } else {
1539 let removed = store
1542 .remove_send_queue_request(
1543 &self.room_id,
1544 &dependent_request.parent_transaction_id,
1545 )
1546 .await
1547 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1548
1549 if !removed {
1550 warn!("missing local echo upon dependent redact");
1551 }
1552 }
1553 }
1554
1555 DependentQueuedRequestKind::ReactEvent { key } => {
1556 if let Some(parent_key) = parent_key {
1557 let Some(parent_event_id) = parent_key.into_event_id() else {
1558 return Err(RoomSendQueueError::StorageError(
1559 RoomSendQueueStorageError::InvalidParentKey,
1560 ));
1561 };
1562
1563 let react_event =
1565 ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
1566 let serializable = SerializableEventContent::from_raw(
1567 Raw::new(&react_event)
1568 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1569 react_event.event_type().to_string(),
1570 );
1571
1572 store
1573 .save_send_queue_request(
1574 &self.room_id,
1575 dependent_request.own_transaction_id.into(),
1576 dependent_request.created_at,
1577 serializable.into(),
1578 Self::HIGH_PRIORITY,
1579 )
1580 .await
1581 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1582 } else {
1583 return Ok(false);
1585 }
1586 }
1587
1588 DependentQueuedRequestKind::UploadFileWithThumbnail {
1589 content_type,
1590 cache_key,
1591 related_to,
1592 } => {
1593 let Some(parent_key) = parent_key else {
1594 return Ok(false);
1596 };
1597 self.handle_dependent_file_upload_with_thumbnail(
1598 client,
1599 dependent_request.own_transaction_id.into(),
1600 parent_key,
1601 content_type,
1602 cache_key,
1603 related_to,
1604 )
1605 .await?;
1606 }
1607
1608 DependentQueuedRequestKind::FinishUpload {
1609 local_echo,
1610 file_upload,
1611 thumbnail_info,
1612 } => {
1613 let Some(parent_key) = parent_key else {
1614 return Ok(false);
1616 };
1617 self.handle_dependent_finish_upload(
1618 client,
1619 dependent_request.own_transaction_id.into(),
1620 parent_key,
1621 local_echo,
1622 file_upload,
1623 thumbnail_info,
1624 new_updates,
1625 )
1626 .await?;
1627 }
1628 }
1629
1630 Ok(true)
1631 }
1632
1633 #[instrument(skip(self))]
1634 async fn apply_dependent_requests(
1635 &self,
1636 new_updates: &mut Vec<RoomSendQueueUpdate>,
1637 ) -> Result<(), RoomSendQueueError> {
1638 let guard = self.store.lock().await;
1639
1640 let client = guard.client()?;
1641 let store = client.store();
1642
1643 let dependent_requests = store
1644 .load_dependent_queued_requests(&self.room_id)
1645 .await
1646 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1647
1648 let num_initial_dependent_requests = dependent_requests.len();
1649 if num_initial_dependent_requests == 0 {
1650 return Ok(());
1652 }
1653
1654 let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
1655
1656 for original in &dependent_requests {
1658 if !canonicalized_dependent_requests
1659 .iter()
1660 .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
1661 {
1662 store
1663 .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
1664 .await
1665 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1666 }
1667 }
1668
1669 let mut num_dependent_requests = canonicalized_dependent_requests.len();
1670
1671 debug!(
1672 num_dependent_requests,
1673 num_initial_dependent_requests, "starting handling of dependent requests"
1674 );
1675
1676 for dependent in canonicalized_dependent_requests {
1677 let dependent_id = dependent.own_transaction_id.clone();
1678
1679 match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
1680 Ok(should_remove) => {
1681 if should_remove {
1682 store
1684 .remove_dependent_queued_request(&self.room_id, &dependent_id)
1685 .await
1686 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1687
1688 num_dependent_requests -= 1;
1689 }
1690 }
1691
1692 Err(err) => {
1693 warn!("error when applying single dependent request: {err}");
1694 }
1695 }
1696 }
1697
1698 debug!(
1699 leftover_dependent_requests = num_dependent_requests,
1700 "stopped handling dependent request"
1701 );
1702
1703 Ok(())
1704 }
1705
1706 async fn remove_dependent_send_queue_request(
1708 &self,
1709 dependent_event_id: &ChildTransactionId,
1710 ) -> Result<bool, RoomSendQueueStorageError> {
1711 Ok(self
1712 .store
1713 .lock()
1714 .await
1715 .client()?
1716 .store()
1717 .remove_dependent_queued_request(&self.room_id, dependent_event_id)
1718 .await?)
1719 }
1720}
1721
1722#[derive(Clone, Debug)]
1724pub enum LocalEchoContent {
1725 Event {
1727 serialized_event: SerializableEventContent,
1730 send_handle: SendHandle,
1732 send_error: Option<QueueWedgeError>,
1735 },
1736
1737 React {
1739 key: String,
1741 send_handle: SendReactionHandle,
1743 applies_to: OwnedTransactionId,
1745 },
1746}
1747
1748#[derive(Clone, Debug)]
1751pub struct LocalEcho {
1752 pub transaction_id: OwnedTransactionId,
1754 pub content: LocalEchoContent,
1756}
1757
1758#[derive(Clone, Debug)]
1761pub enum RoomSendQueueUpdate {
1762 NewLocalEvent(LocalEcho),
1767
1768 CancelledLocalEvent {
1771 transaction_id: OwnedTransactionId,
1773 },
1774
1775 ReplacedLocalEvent {
1777 transaction_id: OwnedTransactionId,
1779
1780 new_content: SerializableEventContent,
1782 },
1783
1784 SendError {
1789 transaction_id: OwnedTransactionId,
1791 error: Arc<crate::Error>,
1793 is_recoverable: bool,
1799 },
1800
1801 RetryEvent {
1803 transaction_id: OwnedTransactionId,
1805 },
1806
1807 SentEvent {
1810 transaction_id: OwnedTransactionId,
1812 event_id: OwnedEventId,
1814 },
1815
1816 UploadedMedia {
1818 related_to: OwnedTransactionId,
1820
1821 file: MediaSource,
1823 },
1824}
1825
1826#[derive(Debug, thiserror::Error)]
1828pub enum RoomSendQueueError {
1829 #[error("the room isn't in the joined state")]
1831 RoomNotJoined,
1832
1833 #[error("the room is now missing from the client")]
1837 RoomDisappeared,
1838
1839 #[error(transparent)]
1841 StorageError(#[from] RoomSendQueueStorageError),
1842}
1843
1844#[derive(Debug, thiserror::Error)]
1846pub enum RoomSendQueueStorageError {
1847 #[error(transparent)]
1849 StateStoreError(#[from] StoreError),
1850
1851 #[error(transparent)]
1853 EventCacheStoreError(#[from] EventCacheStoreError),
1854
1855 #[error(transparent)]
1857 LockError(#[from] LockStoreError),
1858
1859 #[error(transparent)]
1861 JsonSerialization(#[from] serde_json::Error),
1862
1863 #[error("a dependent event had an invalid parent key type")]
1866 InvalidParentKey,
1867
1868 #[error("The client is shutting down.")]
1870 ClientShuttingDown,
1871
1872 #[error("This operation is not implemented for media uploads")]
1874 OperationNotImplementedYet,
1875
1876 #[error("Can't edit a media caption when the underlying event isn't a media")]
1878 InvalidMediaCaptionEdit,
1879}
1880
1881#[derive(Clone, Debug)]
1883struct MediaHandles {
1884 upload_thumbnail_txn: Option<OwnedTransactionId>,
1888
1889 upload_file_txn: OwnedTransactionId,
1891}
1892
1893#[derive(Clone, Debug)]
1895pub struct SendHandle {
1896 room: RoomSendQueue,
1898
1899 transaction_id: OwnedTransactionId,
1904
1905 media_handles: Option<MediaHandles>,
1907
1908 pub created_at: MilliSecondsSinceUnixEpoch,
1910}
1911
1912impl SendHandle {
1913 fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
1914 if self.media_handles.is_some() {
1915 Err(RoomSendQueueStorageError::OperationNotImplementedYet)
1916 } else {
1917 Ok(())
1918 }
1919 }
1920
1921 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
1926 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
1927 trace!("received an abort request");
1928
1929 let queue = &self.room.inner.queue;
1930
1931 if let Some(handles) = &self.media_handles {
1932 if queue.abort_upload(&self.transaction_id, handles).await? {
1933 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
1935 transaction_id: self.transaction_id.clone(),
1936 });
1937
1938 return Ok(true);
1939 }
1940
1941 }
1945
1946 if queue.cancel_event(&self.transaction_id).await? {
1947 trace!("successful abort");
1948
1949 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
1951 transaction_id: self.transaction_id.clone(),
1952 });
1953
1954 Ok(true)
1955 } else {
1956 debug!("local echo didn't exist anymore, can't abort");
1957 Ok(false)
1958 }
1959 }
1960
1961 #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
1966 pub async fn edit_raw(
1967 &self,
1968 new_content: Raw<AnyMessageLikeEventContent>,
1969 event_type: String,
1970 ) -> Result<bool, RoomSendQueueStorageError> {
1971 trace!("received an edit request");
1972 self.nyi_for_uploads()?;
1973
1974 let serializable = SerializableEventContent::from_raw(new_content, event_type);
1975
1976 if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
1977 trace!("successful edit");
1978
1979 self.room.inner.notifier.notify_one();
1981
1982 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
1984 transaction_id: self.transaction_id.clone(),
1985 new_content: serializable,
1986 });
1987
1988 Ok(true)
1989 } else {
1990 debug!("local echo doesn't exist anymore, can't edit");
1991 Ok(false)
1992 }
1993 }
1994
1995 pub async fn edit(
2000 &self,
2001 new_content: AnyMessageLikeEventContent,
2002 ) -> Result<bool, RoomSendQueueStorageError> {
2003 self.edit_raw(
2004 Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2005 new_content.event_type().to_string(),
2006 )
2007 .await
2008 }
2009
2010 pub async fn edit_media_caption(
2015 &self,
2016 caption: Option<String>,
2017 formatted_caption: Option<FormattedBody>,
2018 mentions: Option<Mentions>,
2019 ) -> Result<bool, RoomSendQueueStorageError> {
2020 if let Some(new_content) = self
2021 .room
2022 .inner
2023 .queue
2024 .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2025 .await?
2026 {
2027 trace!("successful edit of media caption");
2028
2029 self.room.inner.notifier.notify_one();
2031
2032 let new_content = SerializableEventContent::new(&new_content)
2033 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2034
2035 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
2037 transaction_id: self.transaction_id.clone(),
2038 new_content,
2039 });
2040
2041 Ok(true)
2042 } else {
2043 debug!("local echo doesn't exist anymore, can't edit media caption");
2044 Ok(false)
2045 }
2046 }
2047
2048 pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2051 let room = &self.room.inner;
2052 room.queue
2053 .mark_as_unwedged(&self.transaction_id)
2054 .await
2055 .map_err(RoomSendQueueError::StorageError)?;
2056
2057 if let Some(handles) = &self.media_handles {
2065 room.queue
2066 .mark_as_unwedged(&handles.upload_file_txn)
2067 .await
2068 .map_err(RoomSendQueueError::StorageError)?;
2069
2070 if let Some(txn) = &handles.upload_thumbnail_txn {
2071 room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2072 }
2073 }
2074
2075 room.notifier.notify_one();
2077
2078 let _ = room
2079 .updates
2080 .send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() });
2081
2082 Ok(())
2083 }
2084
2085 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2090 pub async fn react(
2091 &self,
2092 key: String,
2093 ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2094 trace!("received an intent to react");
2095
2096 let created_at = MilliSecondsSinceUnixEpoch::now();
2097 if let Some(reaction_txn_id) =
2098 self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2099 {
2100 trace!("successfully queued react");
2101
2102 self.room.inner.notifier.notify_one();
2104
2105 let send_handle = SendReactionHandle {
2107 room: self.room.clone(),
2108 transaction_id: reaction_txn_id.clone(),
2109 };
2110
2111 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2112 transaction_id: reaction_txn_id.into(),
2115 content: LocalEchoContent::React {
2116 key,
2117 send_handle: send_handle.clone(),
2118 applies_to: self.transaction_id.clone(),
2119 },
2120 }));
2121
2122 Ok(Some(send_handle))
2123 } else {
2124 debug!("local echo doesn't exist anymore, can't react");
2125 Ok(None)
2126 }
2127 }
2128}
2129
2130#[derive(Clone, Debug)]
2132pub struct SendReactionHandle {
2133 room: RoomSendQueue,
2135 transaction_id: ChildTransactionId,
2137}
2138
2139impl SendReactionHandle {
2140 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2145 if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2146 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2150 transaction_id: self.transaction_id.clone().into(),
2151 });
2152
2153 return Ok(true);
2154 }
2155
2156 let handle = SendHandle {
2159 room: self.room.clone(),
2160 transaction_id: self.transaction_id.clone().into(),
2161 media_handles: None,
2162 created_at: MilliSecondsSinceUnixEpoch::now(),
2163 };
2164
2165 handle.abort().await
2166 }
2167
2168 pub fn transaction_id(&self) -> &TransactionId {
2170 &self.transaction_id
2171 }
2172}
2173
2174fn canonicalize_dependent_requests(
2178 dependent: &[DependentQueuedRequest],
2179) -> Vec<DependentQueuedRequest> {
2180 let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2181
2182 for d in dependent {
2183 let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2184
2185 if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2186 continue;
2189 }
2190
2191 match &d.kind {
2192 DependentQueuedRequestKind::EditEvent { .. } => {
2193 if let Some(prev_edit) = prevs
2195 .iter_mut()
2196 .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2197 {
2198 *prev_edit = d;
2199 } else {
2200 prevs.insert(0, d);
2201 }
2202 }
2203
2204 DependentQueuedRequestKind::UploadFileWithThumbnail { .. }
2205 | DependentQueuedRequestKind::FinishUpload { .. }
2206 | DependentQueuedRequestKind::ReactEvent { .. } => {
2207 prevs.push(d);
2209 }
2210
2211 DependentQueuedRequestKind::RedactEvent => {
2212 prevs.clear();
2214 prevs.push(d);
2215 }
2216 }
2217 }
2218
2219 by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2220}
2221
2222#[cfg(all(test, not(target_arch = "wasm32")))]
2223mod tests {
2224 use std::{sync::Arc, time::Duration};
2225
2226 use assert_matches2::{assert_let, assert_matches};
2227 use matrix_sdk_base::store::{
2228 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2229 SerializableEventContent,
2230 };
2231 use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder};
2232 use ruma::{
2233 events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
2234 room_id, MilliSecondsSinceUnixEpoch, TransactionId,
2235 };
2236
2237 use super::canonicalize_dependent_requests;
2238 use crate::{client::WeakClient, test_utils::logged_in_client};
2239
2240 #[test]
2241 fn test_canonicalize_dependent_events_created_at() {
2242 let txn = TransactionId::new();
2245 let created_at = MilliSecondsSinceUnixEpoch::now();
2246
2247 let edit = DependentQueuedRequest {
2248 own_transaction_id: ChildTransactionId::new(),
2249 parent_transaction_id: txn.clone(),
2250 kind: DependentQueuedRequestKind::EditEvent {
2251 new_content: SerializableEventContent::new(
2252 &RoomMessageEventContent::text_plain("edit").into(),
2253 )
2254 .unwrap(),
2255 },
2256 parent_key: None,
2257 created_at,
2258 };
2259
2260 let res = canonicalize_dependent_requests(&[edit]);
2261
2262 assert_eq!(res.len(), 1);
2263 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2264 assert_let!(
2265 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2266 );
2267 assert_eq!(msg.body(), "edit");
2268 assert_eq!(res[0].parent_transaction_id, txn);
2269 assert_eq!(res[0].created_at, created_at);
2270 }
2271
2272 #[async_test]
2273 async fn test_client_no_cycle_with_send_queue() {
2274 for enabled in [true, false] {
2275 let client = logged_in_client(None).await;
2276 let weak_client = WeakClient::from_client(&client);
2277
2278 {
2279 let mut sync_response_builder = SyncResponseBuilder::new();
2280
2281 let room_id = room_id!("!a:b.c");
2282
2283 client
2285 .base_client()
2286 .receive_sync_response(
2287 sync_response_builder
2288 .add_joined_room(JoinedRoomBuilder::new(room_id))
2289 .build_sync_response(),
2290 )
2291 .await
2292 .unwrap();
2293
2294 let room = client.get_room(room_id).unwrap();
2295 let q = room.send_queue();
2296
2297 let _watcher = q.subscribe().await;
2298
2299 client.send_queue().set_enabled(enabled).await;
2300 }
2301
2302 drop(client);
2303
2304 tokio::time::sleep(Duration::from_millis(500)).await;
2306
2307 let client = weak_client.get();
2309 assert!(
2310 client.is_none(),
2311 "too many strong references to the client: {}",
2312 Arc::strong_count(&client.unwrap().inner)
2313 );
2314 }
2315 }
2316
2317 #[test]
2318 fn test_canonicalize_dependent_events_smoke_test() {
2319 let txn = TransactionId::new();
2321
2322 let edit = DependentQueuedRequest {
2323 own_transaction_id: ChildTransactionId::new(),
2324 parent_transaction_id: txn.clone(),
2325 kind: DependentQueuedRequestKind::EditEvent {
2326 new_content: SerializableEventContent::new(
2327 &RoomMessageEventContent::text_plain("edit").into(),
2328 )
2329 .unwrap(),
2330 },
2331 parent_key: None,
2332 created_at: MilliSecondsSinceUnixEpoch::now(),
2333 };
2334 let res = canonicalize_dependent_requests(&[edit]);
2335
2336 assert_eq!(res.len(), 1);
2337 assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2338 assert_eq!(res[0].parent_transaction_id, txn);
2339 assert!(res[0].parent_key.is_none());
2340 }
2341
2342 #[test]
2343 fn test_canonicalize_dependent_events_redaction_preferred() {
2344 let txn = TransactionId::new();
2346
2347 let mut inputs = Vec::with_capacity(100);
2348 let redact = DependentQueuedRequest {
2349 own_transaction_id: ChildTransactionId::new(),
2350 parent_transaction_id: txn.clone(),
2351 kind: DependentQueuedRequestKind::RedactEvent,
2352 parent_key: None,
2353 created_at: MilliSecondsSinceUnixEpoch::now(),
2354 };
2355
2356 let edit = DependentQueuedRequest {
2357 own_transaction_id: ChildTransactionId::new(),
2358 parent_transaction_id: txn.clone(),
2359 kind: DependentQueuedRequestKind::EditEvent {
2360 new_content: SerializableEventContent::new(
2361 &RoomMessageEventContent::text_plain("edit").into(),
2362 )
2363 .unwrap(),
2364 },
2365 parent_key: None,
2366 created_at: MilliSecondsSinceUnixEpoch::now(),
2367 };
2368
2369 inputs.push({
2370 let mut edit = edit.clone();
2371 edit.own_transaction_id = ChildTransactionId::new();
2372 edit
2373 });
2374
2375 inputs.push(redact);
2376
2377 for _ in 0..98 {
2378 let mut edit = edit.clone();
2379 edit.own_transaction_id = ChildTransactionId::new();
2380 inputs.push(edit);
2381 }
2382
2383 let res = canonicalize_dependent_requests(&inputs);
2384
2385 assert_eq!(res.len(), 1);
2386 assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
2387 assert_eq!(res[0].parent_transaction_id, txn);
2388 }
2389
2390 #[test]
2391 fn test_canonicalize_dependent_events_last_edit_preferred() {
2392 let parent_txn = TransactionId::new();
2393
2394 let inputs = (0..10)
2396 .map(|i| DependentQueuedRequest {
2397 own_transaction_id: ChildTransactionId::new(),
2398 parent_transaction_id: parent_txn.clone(),
2399 kind: DependentQueuedRequestKind::EditEvent {
2400 new_content: SerializableEventContent::new(
2401 &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
2402 )
2403 .unwrap(),
2404 },
2405 parent_key: None,
2406 created_at: MilliSecondsSinceUnixEpoch::now(),
2407 })
2408 .collect::<Vec<_>>();
2409
2410 let txn = inputs[9].parent_transaction_id.clone();
2411
2412 let res = canonicalize_dependent_requests(&inputs);
2413
2414 assert_eq!(res.len(), 1);
2415 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2416 assert_let!(
2417 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2418 );
2419 assert_eq!(msg.body(), "edit9");
2420 assert_eq!(res[0].parent_transaction_id, txn);
2421 }
2422
2423 #[test]
2424 fn test_canonicalize_multiple_local_echoes() {
2425 let txn1 = TransactionId::new();
2426 let txn2 = TransactionId::new();
2427
2428 let child1 = ChildTransactionId::new();
2429 let child2 = ChildTransactionId::new();
2430
2431 let inputs = vec![
2432 DependentQueuedRequest {
2434 own_transaction_id: child1.clone(),
2435 kind: DependentQueuedRequestKind::RedactEvent,
2436 parent_transaction_id: txn1.clone(),
2437 parent_key: None,
2438 created_at: MilliSecondsSinceUnixEpoch::now(),
2439 },
2440 DependentQueuedRequest {
2442 own_transaction_id: child2,
2443 kind: DependentQueuedRequestKind::EditEvent {
2444 new_content: SerializableEventContent::new(
2445 &RoomMessageEventContent::text_plain("edit").into(),
2446 )
2447 .unwrap(),
2448 },
2449 parent_transaction_id: txn2.clone(),
2450 parent_key: None,
2451 created_at: MilliSecondsSinceUnixEpoch::now(),
2452 },
2453 ];
2454
2455 let res = canonicalize_dependent_requests(&inputs);
2456
2457 assert_eq!(res.len(), 2);
2459
2460 for dependent in res {
2461 if dependent.own_transaction_id == child1 {
2462 assert_eq!(dependent.parent_transaction_id, txn1);
2463 assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
2464 } else {
2465 assert_eq!(dependent.parent_transaction_id, txn2);
2466 assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
2467 }
2468 }
2469 }
2470
2471 #[test]
2472 fn test_canonicalize_reactions_after_edits() {
2473 let txn = TransactionId::new();
2475
2476 let react_id = ChildTransactionId::new();
2477 let react = DependentQueuedRequest {
2478 own_transaction_id: react_id.clone(),
2479 kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
2480 parent_transaction_id: txn.clone(),
2481 parent_key: None,
2482 created_at: MilliSecondsSinceUnixEpoch::now(),
2483 };
2484
2485 let edit_id = ChildTransactionId::new();
2486 let edit = DependentQueuedRequest {
2487 own_transaction_id: edit_id.clone(),
2488 kind: DependentQueuedRequestKind::EditEvent {
2489 new_content: SerializableEventContent::new(
2490 &RoomMessageEventContent::text_plain("edit").into(),
2491 )
2492 .unwrap(),
2493 },
2494 parent_transaction_id: txn,
2495 parent_key: None,
2496 created_at: MilliSecondsSinceUnixEpoch::now(),
2497 };
2498
2499 let res = canonicalize_dependent_requests(&[react, edit]);
2500
2501 assert_eq!(res.len(), 2);
2502 assert_eq!(res[0].own_transaction_id, edit_id);
2503 assert_eq!(res[1].own_transaction_id, react_id);
2504 }
2505}