1use std::{
132 collections::{BTreeMap, HashMap},
133 str::FromStr as _,
134 sync::{
135 atomic::{AtomicBool, Ordering},
136 Arc, RwLock,
137 },
138};
139
140use as_variant::as_variant;
141use matrix_sdk_base::{
142 event_cache::store::EventCacheStoreError,
143 media::MediaRequestParameters,
144 store::{
145 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
146 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
147 SentMediaInfo, SentRequestKey, SerializableEventContent,
148 },
149 store_locks::LockStoreError,
150 RoomState, StoreError,
151};
152use matrix_sdk_common::executor::{spawn, JoinHandle};
153use mime::Mime;
154use ruma::{
155 events::{
156 reaction::ReactionEventContent,
157 relation::Annotation,
158 room::{
159 message::{FormattedBody, RoomMessageEventContent},
160 MediaSource,
161 },
162 AnyMessageLikeEventContent, EventContent as _, Mentions,
163 },
164 serde::Raw,
165 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
166};
167use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
168use tracing::{debug, error, info, instrument, trace, warn};
169
170#[cfg(feature = "e2e-encryption")]
171use crate::crypto::{OlmError, SessionRecipientCollectionError};
172use crate::{
173 client::WeakClient,
174 config::RequestConfig,
175 error::RetryKind,
176 room::{edit::EditedContent, WeakRoom},
177 Client, Media, Room,
178};
179
180mod upload;
181
182pub struct SendQueue {
184 client: Client,
185}
186
187#[cfg(not(tarpaulin_include))]
188impl std::fmt::Debug for SendQueue {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 f.debug_struct("SendQueue").finish_non_exhaustive()
191 }
192}
193
194impl SendQueue {
195 pub(super) fn new(client: Client) -> Self {
196 Self { client }
197 }
198
199 pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
202 if !self.is_enabled() {
203 return;
204 }
205
206 let room_ids =
207 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
208 |err| {
209 warn!("error when loading rooms with unsent requests: {err}");
210 Vec::new()
211 },
212 );
213
214 for room_id in room_ids {
216 if let Some(room) = self.client.get_room(&room_id) {
217 let _ = self.for_room(room);
218 }
219 }
220 }
221
222 #[inline(always)]
224 fn data(&self) -> &SendQueueData {
225 &self.client.inner.send_queue_data
226 }
227
228 fn for_room(&self, room: Room) -> RoomSendQueue {
231 let data = self.data();
232
233 let mut map = data.rooms.write().unwrap();
234
235 let room_id = room.room_id();
236 if let Some(room_q) = map.get(room_id).cloned() {
237 return room_q;
238 }
239
240 let owned_room_id = room_id.to_owned();
241 let room_q = RoomSendQueue::new(
242 self.is_enabled(),
243 data.error_reporter.clone(),
244 data.is_dropping.clone(),
245 &self.client,
246 owned_room_id.clone(),
247 );
248
249 map.insert(owned_room_id, room_q.clone());
250
251 room_q
252 }
253
254 pub async fn set_enabled(&self, enabled: bool) {
264 debug!(?enabled, "setting global send queue enablement");
265
266 self.data().globally_enabled.store(enabled, Ordering::SeqCst);
267
268 for room in self.data().rooms.read().unwrap().values() {
270 room.set_enabled(enabled);
271 }
272
273 self.respawn_tasks_for_rooms_with_unsent_requests().await;
276 }
277
278 pub fn is_enabled(&self) -> bool {
281 self.data().globally_enabled.load(Ordering::SeqCst)
282 }
283
284 pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
287 self.data().error_reporter.subscribe()
288 }
289}
290
291#[derive(Clone, Debug)]
293pub struct SendQueueRoomError {
294 pub room_id: OwnedRoomId,
296
297 pub error: Arc<crate::Error>,
299
300 pub is_recoverable: bool,
306}
307
308impl Client {
309 pub fn send_queue(&self) -> SendQueue {
312 SendQueue::new(self.clone())
313 }
314}
315
316pub(super) struct SendQueueData {
317 rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
319
320 globally_enabled: AtomicBool,
325
326 error_reporter: broadcast::Sender<SendQueueRoomError>,
328
329 is_dropping: Arc<AtomicBool>,
331}
332
333impl SendQueueData {
334 pub fn new(globally_enabled: bool) -> Self {
336 let (sender, _) = broadcast::channel(32);
337
338 Self {
339 rooms: Default::default(),
340 globally_enabled: AtomicBool::new(globally_enabled),
341 error_reporter: sender,
342 is_dropping: Arc::new(false.into()),
343 }
344 }
345}
346
347impl Drop for SendQueueData {
348 fn drop(&mut self) {
349 debug!("globally dropping the send queue");
352 self.is_dropping.store(true, Ordering::SeqCst);
353
354 let rooms = self.rooms.read().unwrap();
355 for room in rooms.values() {
356 room.inner.notifier.notify_one();
357 }
358 }
359}
360
361impl Room {
362 pub fn send_queue(&self) -> RoomSendQueue {
364 self.client.send_queue().for_room(self.clone())
365 }
366}
367
368#[derive(Clone)]
372pub struct RoomSendQueue {
373 inner: Arc<RoomSendQueueInner>,
374}
375
376#[cfg(not(tarpaulin_include))]
377impl std::fmt::Debug for RoomSendQueue {
378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379 f.debug_struct("RoomSendQueue").finish_non_exhaustive()
380 }
381}
382
383impl RoomSendQueue {
384 fn new(
385 globally_enabled: bool,
386 global_error_reporter: broadcast::Sender<SendQueueRoomError>,
387 is_dropping: Arc<AtomicBool>,
388 client: &Client,
389 room_id: OwnedRoomId,
390 ) -> Self {
391 let (updates_sender, _) = broadcast::channel(32);
392
393 let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
394 let notifier = Arc::new(Notify::new());
395
396 let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
397 let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
398
399 let task = spawn(Self::sending_task(
400 weak_room.clone(),
401 queue.clone(),
402 notifier.clone(),
403 updates_sender.clone(),
404 locally_enabled.clone(),
405 global_error_reporter,
406 is_dropping,
407 ));
408
409 Self {
410 inner: Arc::new(RoomSendQueueInner {
411 room: weak_room,
412 updates: updates_sender,
413 _task: task,
414 queue,
415 notifier,
416 locally_enabled,
417 }),
418 }
419 }
420
421 pub async fn send_raw(
436 &self,
437 content: Raw<AnyMessageLikeEventContent>,
438 event_type: String,
439 ) -> Result<SendHandle, RoomSendQueueError> {
440 let Some(room) = self.inner.room.get() else {
441 return Err(RoomSendQueueError::RoomDisappeared);
442 };
443 if room.state() != RoomState::Joined {
444 return Err(RoomSendQueueError::RoomNotJoined);
445 }
446
447 let content = SerializableEventContent::from_raw(content, event_type);
448
449 let created_at = MilliSecondsSinceUnixEpoch::now();
450 let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
451 trace!(%transaction_id, "manager sends a raw event to the background task");
452
453 self.inner.notifier.notify_one();
454
455 let send_handle = SendHandle {
456 room: self.clone(),
457 transaction_id: transaction_id.clone(),
458 media_handles: vec![],
459 created_at,
460 };
461
462 let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
463 transaction_id,
464 content: LocalEchoContent::Event {
465 serialized_event: content,
466 send_handle: send_handle.clone(),
467 send_error: None,
468 },
469 }));
470
471 Ok(send_handle)
472 }
473
474 pub async fn send(
489 &self,
490 content: AnyMessageLikeEventContent,
491 ) -> Result<SendHandle, RoomSendQueueError> {
492 self.send_raw(
493 Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
494 content.event_type().to_string(),
495 )
496 .await
497 }
498
499 pub async fn subscribe(
502 &self,
503 ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
504 {
505 let local_echoes = self.inner.queue.local_echoes(self).await?;
506
507 Ok((local_echoes, self.inner.updates.subscribe()))
508 }
509
510 #[instrument(skip_all, fields(room_id = %room.room_id()))]
516 async fn sending_task(
517 room: WeakRoom,
518 queue: QueueStorage,
519 notifier: Arc<Notify>,
520 updates: broadcast::Sender<RoomSendQueueUpdate>,
521 locally_enabled: Arc<AtomicBool>,
522 global_error_reporter: broadcast::Sender<SendQueueRoomError>,
523 is_dropping: Arc<AtomicBool>,
524 ) {
525 trace!("spawned the sending task");
526
527 loop {
528 if is_dropping.load(Ordering::SeqCst) {
530 trace!("shutting down!");
531 break;
532 }
533
534 let mut new_updates = Vec::new();
537 if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
538 warn!("errors when applying dependent requests: {err}");
539 }
540
541 for up in new_updates {
542 let _ = updates.send(up);
543 }
544
545 if !locally_enabled.load(Ordering::SeqCst) {
546 trace!("not enabled, sleeping");
547 notifier.notified().await;
549 continue;
550 }
551
552 let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
553 Ok(Some(request)) => request,
554
555 Ok(None) => {
556 trace!("queue is empty, sleeping");
557 notifier.notified().await;
559 continue;
560 }
561
562 Err(err) => {
563 warn!("error when loading next request to send: {err}");
564 continue;
565 }
566 };
567
568 let txn_id = queued_request.transaction_id.clone();
569 trace!(txn_id = %txn_id, "received a request to send!");
570
571 let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::MediaUpload { related_to, .. } => related_to.clone());
572
573 let Some(room) = room.get() else {
574 if is_dropping.load(Ordering::SeqCst) {
575 break;
576 }
577 error!("the weak room couldn't be upgraded but we're not shutting down?");
578 continue;
579 };
580
581 match Self::handle_request(&room, queued_request, cancel_upload_rx).await {
582 Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
583 {
584 Ok(()) => match parent_key {
585 SentRequestKey::Event(event_id) => {
586 let _ = updates.send(RoomSendQueueUpdate::SentEvent {
587 transaction_id: txn_id,
588 event_id,
589 });
590 }
591
592 SentRequestKey::Media(media_info) => {
593 let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
594 related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
595 file: media_info.file,
596 });
597 }
598 },
599
600 Err(err) => {
601 warn!("unable to mark queued request as sent: {err}");
602 }
603 },
604
605 Ok(None) => {
606 debug!("Request has been aborted while running, continuing.");
607 }
608
609 Err(err) => {
610 let is_recoverable = match err {
611 crate::Error::Http(ref http_err) => {
612 matches!(
614 http_err.retry_kind(),
615 RetryKind::Transient { .. } | RetryKind::NetworkFailure
616 )
617 }
618
619 crate::Error::ConcurrentRequestFailed => true,
624
625 _ => false,
627 };
628
629 locally_enabled.store(false, Ordering::SeqCst);
631
632 if is_recoverable {
633 warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
634
635 queue.mark_as_not_being_sent(&txn_id).await;
638
639 } else {
645 warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
646
647 if let Err(storage_error) =
649 queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
650 {
651 warn!("unable to mark request as wedged: {storage_error}");
652 }
653 }
654
655 let error = Arc::new(err);
656
657 let _ = global_error_reporter.send(SendQueueRoomError {
658 room_id: room.room_id().to_owned(),
659 error: error.clone(),
660 is_recoverable,
661 });
662
663 let _ = updates.send(RoomSendQueueUpdate::SendError {
664 transaction_id: related_txn_id.unwrap_or(txn_id),
665 error,
666 is_recoverable,
667 });
668 }
669 }
670 }
671
672 info!("exited sending task");
673 }
674
675 async fn handle_request(
679 room: &Room,
680 request: QueuedRequest,
681 cancel_upload_rx: Option<oneshot::Receiver<()>>,
682 ) -> Result<Option<SentRequestKey>, crate::Error> {
683 match request.kind {
684 QueuedRequestKind::Event { content } => {
685 let (event, event_type) = content.raw();
686
687 let res = room
688 .send_raw(event_type, event)
689 .with_transaction_id(&request.transaction_id)
690 .with_request_config(RequestConfig::short_retry())
691 .await?;
692
693 trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
694 Ok(Some(SentRequestKey::Event(res.event_id)))
695 }
696
697 QueuedRequestKind::MediaUpload {
698 content_type,
699 cache_key,
700 thumbnail_source,
701 related_to: relates_to,
702 } => {
703 trace!(%relates_to, "uploading media related to event");
704
705 let fut = async move {
706 let mime = Mime::from_str(&content_type).map_err(|_| {
707 crate::Error::SendQueueWedgeError(Box::new(
708 QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
709 ))
710 })?;
711
712 let data = room
713 .client()
714 .event_cache_store()
715 .lock()
716 .await?
717 .get_media_content(&cache_key)
718 .await?
719 .ok_or(crate::Error::SendQueueWedgeError(Box::new(
720 QueueWedgeError::MissingMediaContent,
721 )))?;
722
723 #[cfg(feature = "e2e-encryption")]
724 let media_source = if room.latest_encryption_state().await?.is_encrypted() {
725 trace!("upload will be encrypted (encrypted room)");
726 let mut cursor = std::io::Cursor::new(data);
727 let encrypted_file = room
728 .client()
729 .upload_encrypted_file(&mime, &mut cursor)
730 .with_request_config(RequestConfig::short_retry())
731 .await?;
732 MediaSource::Encrypted(Box::new(encrypted_file))
733 } else {
734 trace!("upload will be in clear text (room without encryption)");
735 let request_config = RequestConfig::short_retry()
736 .timeout(Media::reasonable_upload_timeout(&data));
737 let res =
738 room.client().media().upload(&mime, data, Some(request_config)).await?;
739 MediaSource::Plain(res.content_uri)
740 };
741
742 #[cfg(not(feature = "e2e-encryption"))]
743 let media_source = {
744 let request_config = RequestConfig::short_retry()
745 .timeout(Media::reasonable_upload_timeout(&data));
746 let res =
747 room.client().media().upload(&mime, data, Some(request_config)).await?;
748 MediaSource::Plain(res.content_uri)
749 };
750
751 let uri = match &media_source {
752 MediaSource::Plain(uri) => uri,
753 MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
754 };
755 trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
756
757 Ok(SentRequestKey::Media(SentMediaInfo {
758 file: media_source,
759 thumbnail: thumbnail_source,
760 }))
761 };
762
763 let wait_for_cancel = async move {
764 if let Some(rx) = cancel_upload_rx {
765 rx.await
766 } else {
767 std::future::pending().await
768 }
769 };
770
771 tokio::select! {
772 biased;
773
774 _ = wait_for_cancel => {
775 Ok(None)
776 }
777
778 res = fut => {
779 res.map(Some)
780 }
781 }
782 }
783 }
784 }
785
786 pub fn is_enabled(&self) -> bool {
788 self.inner.locally_enabled.load(Ordering::SeqCst)
789 }
790
791 pub fn set_enabled(&self, enabled: bool) {
793 self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
794
795 if enabled {
798 self.inner.notifier.notify_one();
799 }
800 }
801}
802
803impl From<&crate::Error> for QueueWedgeError {
804 fn from(value: &crate::Error) -> Self {
805 match value {
806 #[cfg(feature = "e2e-encryption")]
807 crate::Error::OlmError(error) => match &**error {
808 OlmError::SessionRecipientCollectionError(error) => match error {
809 SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
810 QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
811 }
812
813 SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
814 QueueWedgeError::IdentityViolations { users: users.clone() }
815 }
816
817 SessionRecipientCollectionError::CrossSigningNotSetup
818 | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
819 QueueWedgeError::CrossVerificationRequired
820 }
821 },
822 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
823 },
824
825 crate::Error::SendQueueWedgeError(error) => *error.clone(),
827
828 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
829 }
830 }
831}
832
833struct RoomSendQueueInner {
834 room: WeakRoom,
836
837 updates: broadcast::Sender<RoomSendQueueUpdate>,
841
842 queue: QueueStorage,
849
850 notifier: Arc<Notify>,
853
854 locally_enabled: Arc<AtomicBool>,
857
858 _task: JoinHandle<()>,
861}
862
863struct BeingSentInfo {
865 transaction_id: OwnedTransactionId,
867
868 cancel_upload: Option<oneshot::Sender<()>>,
871}
872
873impl BeingSentInfo {
874 fn cancel_upload(self) -> bool {
879 if let Some(cancel_upload) = self.cancel_upload {
880 let _ = cancel_upload.send(());
881 true
882 } else {
883 false
884 }
885 }
886}
887
888#[derive(Clone)]
891struct StoreLock {
892 client: WeakClient,
894
895 being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
900}
901
902impl StoreLock {
903 async fn lock(&self) -> StoreLockGuard {
905 StoreLockGuard {
906 client: self.client.clone(),
907 being_sent: self.being_sent.clone().lock_owned().await,
908 }
909 }
910}
911
912struct StoreLockGuard {
915 client: WeakClient,
917
918 being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
921}
922
923impl StoreLockGuard {
924 fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
926 self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
927 }
928}
929
930#[derive(Clone)]
931struct QueueStorage {
932 store: StoreLock,
935
936 room_id: OwnedRoomId,
938}
939
940impl QueueStorage {
941 const LOW_PRIORITY: usize = 0;
943
944 const HIGH_PRIORITY: usize = 10;
946
947 fn new(client: WeakClient, room: OwnedRoomId) -> Self {
949 Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } }
950 }
951
952 async fn push(
956 &self,
957 request: QueuedRequestKind,
958 created_at: MilliSecondsSinceUnixEpoch,
959 ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
960 let transaction_id = TransactionId::new();
961
962 self.store
963 .lock()
964 .await
965 .client()?
966 .state_store()
967 .save_send_queue_request(
968 &self.room_id,
969 transaction_id.clone(),
970 created_at,
971 request,
972 Self::LOW_PRIORITY,
973 )
974 .await?;
975
976 Ok(transaction_id)
977 }
978
979 async fn peek_next_to_send(
984 &self,
985 ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
986 {
987 let mut guard = self.store.lock().await;
988 let queued_requests =
989 guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
990
991 if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
992 let (cancel_upload_tx, cancel_upload_rx) =
993 if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
994 let (tx, rx) = oneshot::channel();
995 (Some(tx), Some(rx))
996 } else {
997 Default::default()
998 };
999
1000 let prev = guard.being_sent.replace(BeingSentInfo {
1001 transaction_id: request.transaction_id.clone(),
1002 cancel_upload: cancel_upload_tx,
1003 });
1004
1005 if let Some(prev) = prev {
1006 error!(
1007 prev_txn = ?prev.transaction_id,
1008 "a previous request was still active while picking a new one"
1009 );
1010 }
1011
1012 Ok(Some((request.clone(), cancel_upload_rx)))
1013 } else {
1014 Ok(None)
1015 }
1016 }
1017
1018 async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1022 let was_being_sent = self.store.lock().await.being_sent.take();
1023
1024 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1025 if prev_txn != Some(transaction_id) {
1026 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1027 }
1028 }
1029
1030 async fn mark_as_wedged(
1034 &self,
1035 transaction_id: &TransactionId,
1036 reason: QueueWedgeError,
1037 ) -> Result<(), RoomSendQueueStorageError> {
1038 let mut guard = self.store.lock().await;
1040 let was_being_sent = guard.being_sent.take();
1041
1042 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1043 if prev_txn != Some(transaction_id) {
1044 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after permanent error)");
1045 }
1046
1047 Ok(guard
1048 .client()?
1049 .state_store()
1050 .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1051 .await?)
1052 }
1053
1054 async fn mark_as_unwedged(
1057 &self,
1058 transaction_id: &TransactionId,
1059 ) -> Result<(), RoomSendQueueStorageError> {
1060 Ok(self
1061 .store
1062 .lock()
1063 .await
1064 .client()?
1065 .state_store()
1066 .update_send_queue_request_status(&self.room_id, transaction_id, None)
1067 .await?)
1068 }
1069
1070 async fn mark_as_sent(
1073 &self,
1074 transaction_id: &TransactionId,
1075 parent_key: SentRequestKey,
1076 ) -> Result<(), RoomSendQueueStorageError> {
1077 let mut guard = self.store.lock().await;
1079 let was_being_sent = guard.being_sent.take();
1080
1081 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1082 if prev_txn != Some(transaction_id) {
1083 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after successful send");
1084 }
1085
1086 let client = guard.client()?;
1087 let store = client.state_store();
1088
1089 store
1091 .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1092 .await?;
1093
1094 let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1095
1096 if !removed {
1097 warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1098 }
1099
1100 Ok(())
1101 }
1102
1103 async fn cancel_event(
1110 &self,
1111 transaction_id: &TransactionId,
1112 ) -> Result<bool, RoomSendQueueStorageError> {
1113 let guard = self.store.lock().await;
1114
1115 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1116 == Some(transaction_id)
1117 {
1118 guard
1120 .client()?
1121 .state_store()
1122 .save_dependent_queued_request(
1123 &self.room_id,
1124 transaction_id,
1125 ChildTransactionId::new(),
1126 MilliSecondsSinceUnixEpoch::now(),
1127 DependentQueuedRequestKind::RedactEvent,
1128 )
1129 .await?;
1130
1131 return Ok(true);
1132 }
1133
1134 let removed = guard
1135 .client()?
1136 .state_store()
1137 .remove_send_queue_request(&self.room_id, transaction_id)
1138 .await?;
1139
1140 Ok(removed)
1141 }
1142
1143 async fn replace_event(
1150 &self,
1151 transaction_id: &TransactionId,
1152 serializable: SerializableEventContent,
1153 ) -> Result<bool, RoomSendQueueStorageError> {
1154 let guard = self.store.lock().await;
1155
1156 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1157 == Some(transaction_id)
1158 {
1159 guard
1161 .client()?
1162 .state_store()
1163 .save_dependent_queued_request(
1164 &self.room_id,
1165 transaction_id,
1166 ChildTransactionId::new(),
1167 MilliSecondsSinceUnixEpoch::now(),
1168 DependentQueuedRequestKind::EditEvent { new_content: serializable },
1169 )
1170 .await?;
1171
1172 return Ok(true);
1173 }
1174
1175 let edited = guard
1176 .client()?
1177 .state_store()
1178 .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1179 .await?;
1180
1181 Ok(edited)
1182 }
1183
1184 #[allow(clippy::too_many_arguments)]
1188 async fn push_media(
1189 &self,
1190 event: RoomMessageEventContent,
1191 content_type: Mime,
1192 send_event_txn: OwnedTransactionId,
1193 created_at: MilliSecondsSinceUnixEpoch,
1194 upload_file_txn: OwnedTransactionId,
1195 file_media_request: MediaRequestParameters,
1196 thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
1197 ) -> Result<(), RoomSendQueueStorageError> {
1198 let guard = self.store.lock().await;
1199 let client = guard.client()?;
1200 let store = client.state_store();
1201 let thumbnail_info =
1202 if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) =
1203 thumbnail
1204 {
1205 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1206
1207 store
1209 .save_send_queue_request(
1210 &self.room_id,
1211 upload_thumbnail_txn.clone(),
1212 created_at,
1213 QueuedRequestKind::MediaUpload {
1214 content_type: thumbnail_content_type.to_string(),
1215 cache_key: thumbnail_media_request,
1216 thumbnail_source: None, related_to: send_event_txn.clone(),
1218 },
1219 Self::LOW_PRIORITY,
1220 )
1221 .await?;
1222
1223 store
1225 .save_dependent_queued_request(
1226 &self.room_id,
1227 &upload_thumbnail_txn,
1228 upload_file_txn.clone().into(),
1229 created_at,
1230 DependentQueuedRequestKind::UploadFileWithThumbnail {
1231 content_type: content_type.to_string(),
1232 cache_key: file_media_request,
1233 related_to: send_event_txn.clone(),
1234 },
1235 )
1236 .await?;
1237
1238 Some(thumbnail_info)
1239 } else {
1240 store
1242 .save_send_queue_request(
1243 &self.room_id,
1244 upload_file_txn.clone(),
1245 created_at,
1246 QueuedRequestKind::MediaUpload {
1247 content_type: content_type.to_string(),
1248 cache_key: file_media_request,
1249 thumbnail_source: None,
1250 related_to: send_event_txn.clone(),
1251 },
1252 Self::LOW_PRIORITY,
1253 )
1254 .await?;
1255
1256 None
1257 };
1258
1259 store
1261 .save_dependent_queued_request(
1262 &self.room_id,
1263 &upload_file_txn,
1264 send_event_txn.into(),
1265 created_at,
1266 DependentQueuedRequestKind::FinishUpload {
1267 local_echo: Box::new(event),
1268 file_upload: upload_file_txn.clone(),
1269 thumbnail_info,
1270 },
1271 )
1272 .await?;
1273
1274 Ok(())
1275 }
1276
1277 #[instrument(skip(self))]
1279 async fn react(
1280 &self,
1281 transaction_id: &TransactionId,
1282 key: String,
1283 created_at: MilliSecondsSinceUnixEpoch,
1284 ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1285 let guard = self.store.lock().await;
1286 let client = guard.client()?;
1287 let store = client.state_store();
1288
1289 let requests = store.load_send_queue_requests(&self.room_id).await?;
1290
1291 if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1293 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1296 if !dependent_requests
1297 .into_iter()
1298 .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1299 .any(|child_txn| *child_txn == *transaction_id)
1300 {
1301 return Ok(None);
1303 }
1304 }
1305
1306 let reaction_txn_id = ChildTransactionId::new();
1308 store
1309 .save_dependent_queued_request(
1310 &self.room_id,
1311 transaction_id,
1312 reaction_txn_id.clone(),
1313 created_at,
1314 DependentQueuedRequestKind::ReactEvent { key },
1315 )
1316 .await?;
1317
1318 Ok(Some(reaction_txn_id))
1319 }
1320
1321 async fn local_echoes(
1324 &self,
1325 room: &RoomSendQueue,
1326 ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1327 let guard = self.store.lock().await;
1328 let client = guard.client()?;
1329 let store = client.state_store();
1330
1331 let local_requests =
1332 store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1333 Some(LocalEcho {
1334 transaction_id: queued.transaction_id.clone(),
1335 content: match queued.kind {
1336 QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1337 serialized_event: content,
1338 send_handle: SendHandle {
1339 room: room.clone(),
1340 transaction_id: queued.transaction_id,
1341 media_handles: vec![],
1342 created_at: queued.created_at,
1343 },
1344 send_error: queued.error,
1345 },
1346
1347 QueuedRequestKind::MediaUpload { .. } => {
1348 return None;
1351 }
1352 },
1353 })
1354 });
1355
1356 let reactions_and_medias = store
1357 .load_dependent_queued_requests(&self.room_id)
1358 .await?
1359 .into_iter()
1360 .filter_map(|dep| match dep.kind {
1361 DependentQueuedRequestKind::EditEvent { .. }
1362 | DependentQueuedRequestKind::RedactEvent => {
1363 None
1365 }
1366
1367 DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1368 transaction_id: dep.own_transaction_id.clone().into(),
1369 content: LocalEchoContent::React {
1370 key,
1371 send_handle: SendReactionHandle {
1372 room: room.clone(),
1373 transaction_id: dep.own_transaction_id,
1374 },
1375 applies_to: dep.parent_transaction_id,
1376 },
1377 }),
1378
1379 DependentQueuedRequestKind::UploadFileWithThumbnail { .. } => {
1380 None
1382 }
1383
1384 DependentQueuedRequestKind::FinishUpload {
1385 local_echo,
1386 file_upload,
1387 thumbnail_info,
1388 } => {
1389 Some(LocalEcho {
1391 transaction_id: dep.own_transaction_id.clone().into(),
1392 content: LocalEchoContent::Event {
1393 serialized_event: SerializableEventContent::new(&(*local_echo).into())
1394 .ok()?,
1395 send_handle: SendHandle {
1396 room: room.clone(),
1397 transaction_id: dep.own_transaction_id.into(),
1398 media_handles: vec![MediaHandles {
1399 upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1400 upload_file_txn: file_upload,
1401 }],
1402 created_at: dep.created_at,
1403 },
1404 send_error: None,
1405 },
1406 })
1407 }
1408 });
1409
1410 Ok(local_requests.chain(reactions_and_medias).collect())
1411 }
1412
1413 #[instrument(skip_all)]
1421 async fn try_apply_single_dependent_request(
1422 &self,
1423 client: &Client,
1424 dependent_request: DependentQueuedRequest,
1425 new_updates: &mut Vec<RoomSendQueueUpdate>,
1426 ) -> Result<bool, RoomSendQueueError> {
1427 let store = client.state_store();
1428
1429 let parent_key = dependent_request.parent_key;
1430
1431 match dependent_request.kind {
1432 DependentQueuedRequestKind::EditEvent { new_content } => {
1433 if let Some(parent_key) = parent_key {
1434 let Some(event_id) = parent_key.into_event_id() else {
1435 return Err(RoomSendQueueError::StorageError(
1436 RoomSendQueueStorageError::InvalidParentKey,
1437 ));
1438 };
1439
1440 let room = client
1442 .get_room(&self.room_id)
1443 .ok_or(RoomSendQueueError::RoomDisappeared)?;
1444
1445 let edited_content = match new_content.deserialize() {
1449 Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
1450 EditedContent::RoomMessage(c.into())
1452 }
1453
1454 Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
1455 let poll_start = c.poll_start().clone();
1456 EditedContent::PollStart {
1457 fallback_text: poll_start.question.text.clone(),
1458 new_content: poll_start,
1459 }
1460 }
1461
1462 Ok(c) => {
1463 warn!("Unsupported edit content type: {:?}", c.event_type());
1464 return Ok(true);
1465 }
1466
1467 Err(err) => {
1468 warn!("Unable to deserialize: {err}");
1469 return Ok(true);
1470 }
1471 };
1472
1473 let edit_event = match room.make_edit_event(&event_id, edited_content).await {
1474 Ok(e) => e,
1475 Err(err) => {
1476 warn!("couldn't create edited event: {err}");
1477 return Ok(true);
1478 }
1479 };
1480
1481 let serializable = SerializableEventContent::from_raw(
1483 Raw::new(&edit_event)
1484 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1485 edit_event.event_type().to_string(),
1486 );
1487
1488 store
1489 .save_send_queue_request(
1490 &self.room_id,
1491 dependent_request.own_transaction_id.into(),
1492 dependent_request.created_at,
1493 serializable.into(),
1494 Self::HIGH_PRIORITY,
1495 )
1496 .await
1497 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1498 } else {
1499 let edited = store
1501 .update_send_queue_request(
1502 &self.room_id,
1503 &dependent_request.parent_transaction_id,
1504 new_content.into(),
1505 )
1506 .await
1507 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1508
1509 if !edited {
1510 warn!("missing local echo upon dependent edit");
1511 }
1512 }
1513 }
1514
1515 DependentQueuedRequestKind::RedactEvent => {
1516 if let Some(parent_key) = parent_key {
1517 let Some(event_id) = parent_key.into_event_id() else {
1518 return Err(RoomSendQueueError::StorageError(
1519 RoomSendQueueStorageError::InvalidParentKey,
1520 ));
1521 };
1522
1523 let room = client
1525 .get_room(&self.room_id)
1526 .ok_or(RoomSendQueueError::RoomDisappeared)?;
1527
1528 if let Err(err) = room
1536 .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
1537 .await
1538 {
1539 warn!("error when sending a redact for {event_id}: {err}");
1540 return Ok(false);
1541 }
1542 } else {
1543 let removed = store
1546 .remove_send_queue_request(
1547 &self.room_id,
1548 &dependent_request.parent_transaction_id,
1549 )
1550 .await
1551 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1552
1553 if !removed {
1554 warn!("missing local echo upon dependent redact");
1555 }
1556 }
1557 }
1558
1559 DependentQueuedRequestKind::ReactEvent { key } => {
1560 if let Some(parent_key) = parent_key {
1561 let Some(parent_event_id) = parent_key.into_event_id() else {
1562 return Err(RoomSendQueueError::StorageError(
1563 RoomSendQueueStorageError::InvalidParentKey,
1564 ));
1565 };
1566
1567 let react_event =
1569 ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
1570 let serializable = SerializableEventContent::from_raw(
1571 Raw::new(&react_event)
1572 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1573 react_event.event_type().to_string(),
1574 );
1575
1576 store
1577 .save_send_queue_request(
1578 &self.room_id,
1579 dependent_request.own_transaction_id.into(),
1580 dependent_request.created_at,
1581 serializable.into(),
1582 Self::HIGH_PRIORITY,
1583 )
1584 .await
1585 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1586 } else {
1587 return Ok(false);
1589 }
1590 }
1591
1592 DependentQueuedRequestKind::UploadFileWithThumbnail {
1593 content_type,
1594 cache_key,
1595 related_to,
1596 } => {
1597 let Some(parent_key) = parent_key else {
1598 return Ok(false);
1600 };
1601 self.handle_dependent_file_upload_with_thumbnail(
1602 client,
1603 dependent_request.own_transaction_id.into(),
1604 parent_key,
1605 content_type,
1606 cache_key,
1607 related_to,
1608 )
1609 .await?;
1610 }
1611
1612 DependentQueuedRequestKind::FinishUpload {
1613 local_echo,
1614 file_upload,
1615 thumbnail_info,
1616 } => {
1617 let Some(parent_key) = parent_key else {
1618 return Ok(false);
1620 };
1621 self.handle_dependent_finish_upload(
1622 client,
1623 dependent_request.own_transaction_id.into(),
1624 parent_key,
1625 *local_echo,
1626 file_upload,
1627 thumbnail_info,
1628 new_updates,
1629 )
1630 .await?;
1631 }
1632 }
1633
1634 Ok(true)
1635 }
1636
1637 #[instrument(skip(self))]
1638 async fn apply_dependent_requests(
1639 &self,
1640 new_updates: &mut Vec<RoomSendQueueUpdate>,
1641 ) -> Result<(), RoomSendQueueError> {
1642 let guard = self.store.lock().await;
1643
1644 let client = guard.client()?;
1645 let store = client.state_store();
1646
1647 let dependent_requests = store
1648 .load_dependent_queued_requests(&self.room_id)
1649 .await
1650 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1651
1652 let num_initial_dependent_requests = dependent_requests.len();
1653 if num_initial_dependent_requests == 0 {
1654 return Ok(());
1656 }
1657
1658 let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
1659
1660 for original in &dependent_requests {
1662 if !canonicalized_dependent_requests
1663 .iter()
1664 .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
1665 {
1666 store
1667 .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
1668 .await
1669 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1670 }
1671 }
1672
1673 let mut num_dependent_requests = canonicalized_dependent_requests.len();
1674
1675 debug!(
1676 num_dependent_requests,
1677 num_initial_dependent_requests, "starting handling of dependent requests"
1678 );
1679
1680 for dependent in canonicalized_dependent_requests {
1681 let dependent_id = dependent.own_transaction_id.clone();
1682
1683 match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
1684 Ok(should_remove) => {
1685 if should_remove {
1686 store
1688 .remove_dependent_queued_request(&self.room_id, &dependent_id)
1689 .await
1690 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1691
1692 num_dependent_requests -= 1;
1693 }
1694 }
1695
1696 Err(err) => {
1697 warn!("error when applying single dependent request: {err}");
1698 }
1699 }
1700 }
1701
1702 debug!(
1703 leftover_dependent_requests = num_dependent_requests,
1704 "stopped handling dependent request"
1705 );
1706
1707 Ok(())
1708 }
1709
1710 async fn remove_dependent_send_queue_request(
1712 &self,
1713 dependent_event_id: &ChildTransactionId,
1714 ) -> Result<bool, RoomSendQueueStorageError> {
1715 Ok(self
1716 .store
1717 .lock()
1718 .await
1719 .client()?
1720 .state_store()
1721 .remove_dependent_queued_request(&self.room_id, dependent_event_id)
1722 .await?)
1723 }
1724}
1725
1726#[derive(Clone, Debug)]
1728pub enum LocalEchoContent {
1729 Event {
1731 serialized_event: SerializableEventContent,
1734 send_handle: SendHandle,
1736 send_error: Option<QueueWedgeError>,
1739 },
1740
1741 React {
1743 key: String,
1745 send_handle: SendReactionHandle,
1747 applies_to: OwnedTransactionId,
1749 },
1750}
1751
1752#[derive(Clone, Debug)]
1755pub struct LocalEcho {
1756 pub transaction_id: OwnedTransactionId,
1758 pub content: LocalEchoContent,
1760}
1761
1762#[derive(Clone, Debug)]
1765pub enum RoomSendQueueUpdate {
1766 NewLocalEvent(LocalEcho),
1771
1772 CancelledLocalEvent {
1775 transaction_id: OwnedTransactionId,
1777 },
1778
1779 ReplacedLocalEvent {
1781 transaction_id: OwnedTransactionId,
1783
1784 new_content: SerializableEventContent,
1786 },
1787
1788 SendError {
1793 transaction_id: OwnedTransactionId,
1795 error: Arc<crate::Error>,
1797 is_recoverable: bool,
1803 },
1804
1805 RetryEvent {
1807 transaction_id: OwnedTransactionId,
1809 },
1810
1811 SentEvent {
1814 transaction_id: OwnedTransactionId,
1816 event_id: OwnedEventId,
1818 },
1819
1820 UploadedMedia {
1822 related_to: OwnedTransactionId,
1824
1825 file: MediaSource,
1827 },
1828}
1829
1830#[derive(Debug, thiserror::Error)]
1832pub enum RoomSendQueueError {
1833 #[error("the room isn't in the joined state")]
1835 RoomNotJoined,
1836
1837 #[error("the room is now missing from the client")]
1841 RoomDisappeared,
1842
1843 #[error(transparent)]
1845 StorageError(#[from] RoomSendQueueStorageError),
1846
1847 #[error("the attachment event could not be created")]
1849 FailedToCreateAttachment,
1850}
1851
1852#[derive(Debug, thiserror::Error)]
1854pub enum RoomSendQueueStorageError {
1855 #[error(transparent)]
1857 StateStoreError(#[from] StoreError),
1858
1859 #[error(transparent)]
1861 EventCacheStoreError(#[from] EventCacheStoreError),
1862
1863 #[error(transparent)]
1865 LockError(#[from] LockStoreError),
1866
1867 #[error(transparent)]
1869 JsonSerialization(#[from] serde_json::Error),
1870
1871 #[error("a dependent event had an invalid parent key type")]
1874 InvalidParentKey,
1875
1876 #[error("The client is shutting down.")]
1878 ClientShuttingDown,
1879
1880 #[error("This operation is not implemented for media uploads")]
1882 OperationNotImplementedYet,
1883
1884 #[error("Can't edit a media caption when the underlying event isn't a media")]
1886 InvalidMediaCaptionEdit,
1887}
1888
1889#[derive(Clone, Debug)]
1891struct MediaHandles {
1892 upload_thumbnail_txn: Option<OwnedTransactionId>,
1896
1897 upload_file_txn: OwnedTransactionId,
1899}
1900
1901#[derive(Clone, Debug)]
1903pub struct SendHandle {
1904 room: RoomSendQueue,
1906
1907 transaction_id: OwnedTransactionId,
1912
1913 media_handles: Vec<MediaHandles>,
1915
1916 pub created_at: MilliSecondsSinceUnixEpoch,
1918}
1919
1920impl SendHandle {
1921 fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
1922 if !self.media_handles.is_empty() {
1923 Err(RoomSendQueueStorageError::OperationNotImplementedYet)
1924 } else {
1925 Ok(())
1926 }
1927 }
1928
1929 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
1934 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
1935 trace!("received an abort request");
1936
1937 let queue = &self.room.inner.queue;
1938
1939 for handles in &self.media_handles {
1940 if queue.abort_upload(&self.transaction_id, handles).await? {
1941 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
1943 transaction_id: self.transaction_id.clone(),
1944 });
1945
1946 return Ok(true);
1947 }
1948
1949 }
1953
1954 if queue.cancel_event(&self.transaction_id).await? {
1955 trace!("successful abort");
1956
1957 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
1959 transaction_id: self.transaction_id.clone(),
1960 });
1961
1962 Ok(true)
1963 } else {
1964 debug!("local echo didn't exist anymore, can't abort");
1965 Ok(false)
1966 }
1967 }
1968
1969 #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
1974 pub async fn edit_raw(
1975 &self,
1976 new_content: Raw<AnyMessageLikeEventContent>,
1977 event_type: String,
1978 ) -> Result<bool, RoomSendQueueStorageError> {
1979 trace!("received an edit request");
1980 self.nyi_for_uploads()?;
1981
1982 let serializable = SerializableEventContent::from_raw(new_content, event_type);
1983
1984 if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
1985 trace!("successful edit");
1986
1987 self.room.inner.notifier.notify_one();
1989
1990 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
1992 transaction_id: self.transaction_id.clone(),
1993 new_content: serializable,
1994 });
1995
1996 Ok(true)
1997 } else {
1998 debug!("local echo doesn't exist anymore, can't edit");
1999 Ok(false)
2000 }
2001 }
2002
2003 pub async fn edit(
2008 &self,
2009 new_content: AnyMessageLikeEventContent,
2010 ) -> Result<bool, RoomSendQueueStorageError> {
2011 self.edit_raw(
2012 Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2013 new_content.event_type().to_string(),
2014 )
2015 .await
2016 }
2017
2018 pub async fn edit_media_caption(
2023 &self,
2024 caption: Option<String>,
2025 formatted_caption: Option<FormattedBody>,
2026 mentions: Option<Mentions>,
2027 ) -> Result<bool, RoomSendQueueStorageError> {
2028 if let Some(new_content) = self
2029 .room
2030 .inner
2031 .queue
2032 .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2033 .await?
2034 {
2035 trace!("successful edit of media caption");
2036
2037 self.room.inner.notifier.notify_one();
2039
2040 let new_content = SerializableEventContent::new(&new_content)
2041 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2042
2043 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
2045 transaction_id: self.transaction_id.clone(),
2046 new_content,
2047 });
2048
2049 Ok(true)
2050 } else {
2051 debug!("local echo doesn't exist anymore, can't edit media caption");
2052 Ok(false)
2053 }
2054 }
2055
2056 pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2059 let room = &self.room.inner;
2060 room.queue
2061 .mark_as_unwedged(&self.transaction_id)
2062 .await
2063 .map_err(RoomSendQueueError::StorageError)?;
2064
2065 for handles in &self.media_handles {
2073 room.queue
2074 .mark_as_unwedged(&handles.upload_file_txn)
2075 .await
2076 .map_err(RoomSendQueueError::StorageError)?;
2077
2078 if let Some(txn) = &handles.upload_thumbnail_txn {
2079 room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2080 }
2081 }
2082
2083 room.notifier.notify_one();
2085
2086 let _ = room
2087 .updates
2088 .send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() });
2089
2090 Ok(())
2091 }
2092
2093 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2098 pub async fn react(
2099 &self,
2100 key: String,
2101 ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2102 trace!("received an intent to react");
2103
2104 let created_at = MilliSecondsSinceUnixEpoch::now();
2105 if let Some(reaction_txn_id) =
2106 self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2107 {
2108 trace!("successfully queued react");
2109
2110 self.room.inner.notifier.notify_one();
2112
2113 let send_handle = SendReactionHandle {
2115 room: self.room.clone(),
2116 transaction_id: reaction_txn_id.clone(),
2117 };
2118
2119 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2120 transaction_id: reaction_txn_id.into(),
2123 content: LocalEchoContent::React {
2124 key,
2125 send_handle: send_handle.clone(),
2126 applies_to: self.transaction_id.clone(),
2127 },
2128 }));
2129
2130 Ok(Some(send_handle))
2131 } else {
2132 debug!("local echo doesn't exist anymore, can't react");
2133 Ok(None)
2134 }
2135 }
2136}
2137
2138#[derive(Clone, Debug)]
2140pub struct SendReactionHandle {
2141 room: RoomSendQueue,
2143 transaction_id: ChildTransactionId,
2145}
2146
2147impl SendReactionHandle {
2148 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2153 if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2154 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2158 transaction_id: self.transaction_id.clone().into(),
2159 });
2160
2161 return Ok(true);
2162 }
2163
2164 let handle = SendHandle {
2167 room: self.room.clone(),
2168 transaction_id: self.transaction_id.clone().into(),
2169 media_handles: vec![],
2170 created_at: MilliSecondsSinceUnixEpoch::now(),
2171 };
2172
2173 handle.abort().await
2174 }
2175
2176 pub fn transaction_id(&self) -> &TransactionId {
2178 &self.transaction_id
2179 }
2180}
2181
2182fn canonicalize_dependent_requests(
2186 dependent: &[DependentQueuedRequest],
2187) -> Vec<DependentQueuedRequest> {
2188 let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2189
2190 for d in dependent {
2191 let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2192
2193 if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2194 continue;
2197 }
2198
2199 match &d.kind {
2200 DependentQueuedRequestKind::EditEvent { .. } => {
2201 if let Some(prev_edit) = prevs
2203 .iter_mut()
2204 .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2205 {
2206 *prev_edit = d;
2207 } else {
2208 prevs.insert(0, d);
2209 }
2210 }
2211
2212 DependentQueuedRequestKind::UploadFileWithThumbnail { .. }
2213 | DependentQueuedRequestKind::FinishUpload { .. }
2214 | DependentQueuedRequestKind::ReactEvent { .. } => {
2215 prevs.push(d);
2217 }
2218
2219 DependentQueuedRequestKind::RedactEvent => {
2220 prevs.clear();
2222 prevs.push(d);
2223 }
2224 }
2225 }
2226
2227 by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2228}
2229
2230#[cfg(all(test, not(target_arch = "wasm32")))]
2231mod tests {
2232 use std::{sync::Arc, time::Duration};
2233
2234 use assert_matches2::{assert_let, assert_matches};
2235 use matrix_sdk_base::store::{
2236 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2237 SerializableEventContent,
2238 };
2239 use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder};
2240 use ruma::{
2241 events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
2242 room_id, MilliSecondsSinceUnixEpoch, TransactionId,
2243 };
2244
2245 use super::canonicalize_dependent_requests;
2246 use crate::{client::WeakClient, test_utils::logged_in_client};
2247
2248 #[test]
2249 fn test_canonicalize_dependent_events_created_at() {
2250 let txn = TransactionId::new();
2253 let created_at = MilliSecondsSinceUnixEpoch::now();
2254
2255 let edit = DependentQueuedRequest {
2256 own_transaction_id: ChildTransactionId::new(),
2257 parent_transaction_id: txn.clone(),
2258 kind: DependentQueuedRequestKind::EditEvent {
2259 new_content: SerializableEventContent::new(
2260 &RoomMessageEventContent::text_plain("edit").into(),
2261 )
2262 .unwrap(),
2263 },
2264 parent_key: None,
2265 created_at,
2266 };
2267
2268 let res = canonicalize_dependent_requests(&[edit]);
2269
2270 assert_eq!(res.len(), 1);
2271 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2272 assert_let!(
2273 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2274 );
2275 assert_eq!(msg.body(), "edit");
2276 assert_eq!(res[0].parent_transaction_id, txn);
2277 assert_eq!(res[0].created_at, created_at);
2278 }
2279
2280 #[async_test]
2281 async fn test_client_no_cycle_with_send_queue() {
2282 for enabled in [true, false] {
2283 let client = logged_in_client(None).await;
2284 let weak_client = WeakClient::from_client(&client);
2285
2286 {
2287 let mut sync_response_builder = SyncResponseBuilder::new();
2288
2289 let room_id = room_id!("!a:b.c");
2290
2291 client
2293 .base_client()
2294 .receive_sync_response(
2295 sync_response_builder
2296 .add_joined_room(JoinedRoomBuilder::new(room_id))
2297 .build_sync_response(),
2298 )
2299 .await
2300 .unwrap();
2301
2302 let room = client.get_room(room_id).unwrap();
2303 let q = room.send_queue();
2304
2305 let _watcher = q.subscribe().await;
2306
2307 client.send_queue().set_enabled(enabled).await;
2308 }
2309
2310 drop(client);
2311
2312 tokio::time::sleep(Duration::from_millis(500)).await;
2314
2315 let client = weak_client.get();
2317 assert!(
2318 client.is_none(),
2319 "too many strong references to the client: {}",
2320 Arc::strong_count(&client.unwrap().inner)
2321 );
2322 }
2323 }
2324
2325 #[test]
2326 fn test_canonicalize_dependent_events_smoke_test() {
2327 let txn = TransactionId::new();
2329
2330 let edit = DependentQueuedRequest {
2331 own_transaction_id: ChildTransactionId::new(),
2332 parent_transaction_id: txn.clone(),
2333 kind: DependentQueuedRequestKind::EditEvent {
2334 new_content: SerializableEventContent::new(
2335 &RoomMessageEventContent::text_plain("edit").into(),
2336 )
2337 .unwrap(),
2338 },
2339 parent_key: None,
2340 created_at: MilliSecondsSinceUnixEpoch::now(),
2341 };
2342 let res = canonicalize_dependent_requests(&[edit]);
2343
2344 assert_eq!(res.len(), 1);
2345 assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2346 assert_eq!(res[0].parent_transaction_id, txn);
2347 assert!(res[0].parent_key.is_none());
2348 }
2349
2350 #[test]
2351 fn test_canonicalize_dependent_events_redaction_preferred() {
2352 let txn = TransactionId::new();
2354
2355 let mut inputs = Vec::with_capacity(100);
2356 let redact = DependentQueuedRequest {
2357 own_transaction_id: ChildTransactionId::new(),
2358 parent_transaction_id: txn.clone(),
2359 kind: DependentQueuedRequestKind::RedactEvent,
2360 parent_key: None,
2361 created_at: MilliSecondsSinceUnixEpoch::now(),
2362 };
2363
2364 let edit = DependentQueuedRequest {
2365 own_transaction_id: ChildTransactionId::new(),
2366 parent_transaction_id: txn.clone(),
2367 kind: DependentQueuedRequestKind::EditEvent {
2368 new_content: SerializableEventContent::new(
2369 &RoomMessageEventContent::text_plain("edit").into(),
2370 )
2371 .unwrap(),
2372 },
2373 parent_key: None,
2374 created_at: MilliSecondsSinceUnixEpoch::now(),
2375 };
2376
2377 inputs.push({
2378 let mut edit = edit.clone();
2379 edit.own_transaction_id = ChildTransactionId::new();
2380 edit
2381 });
2382
2383 inputs.push(redact);
2384
2385 for _ in 0..98 {
2386 let mut edit = edit.clone();
2387 edit.own_transaction_id = ChildTransactionId::new();
2388 inputs.push(edit);
2389 }
2390
2391 let res = canonicalize_dependent_requests(&inputs);
2392
2393 assert_eq!(res.len(), 1);
2394 assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
2395 assert_eq!(res[0].parent_transaction_id, txn);
2396 }
2397
2398 #[test]
2399 fn test_canonicalize_dependent_events_last_edit_preferred() {
2400 let parent_txn = TransactionId::new();
2401
2402 let inputs = (0..10)
2404 .map(|i| DependentQueuedRequest {
2405 own_transaction_id: ChildTransactionId::new(),
2406 parent_transaction_id: parent_txn.clone(),
2407 kind: DependentQueuedRequestKind::EditEvent {
2408 new_content: SerializableEventContent::new(
2409 &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
2410 )
2411 .unwrap(),
2412 },
2413 parent_key: None,
2414 created_at: MilliSecondsSinceUnixEpoch::now(),
2415 })
2416 .collect::<Vec<_>>();
2417
2418 let txn = inputs[9].parent_transaction_id.clone();
2419
2420 let res = canonicalize_dependent_requests(&inputs);
2421
2422 assert_eq!(res.len(), 1);
2423 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2424 assert_let!(
2425 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2426 );
2427 assert_eq!(msg.body(), "edit9");
2428 assert_eq!(res[0].parent_transaction_id, txn);
2429 }
2430
2431 #[test]
2432 fn test_canonicalize_multiple_local_echoes() {
2433 let txn1 = TransactionId::new();
2434 let txn2 = TransactionId::new();
2435
2436 let child1 = ChildTransactionId::new();
2437 let child2 = ChildTransactionId::new();
2438
2439 let inputs = vec![
2440 DependentQueuedRequest {
2442 own_transaction_id: child1.clone(),
2443 kind: DependentQueuedRequestKind::RedactEvent,
2444 parent_transaction_id: txn1.clone(),
2445 parent_key: None,
2446 created_at: MilliSecondsSinceUnixEpoch::now(),
2447 },
2448 DependentQueuedRequest {
2450 own_transaction_id: child2,
2451 kind: DependentQueuedRequestKind::EditEvent {
2452 new_content: SerializableEventContent::new(
2453 &RoomMessageEventContent::text_plain("edit").into(),
2454 )
2455 .unwrap(),
2456 },
2457 parent_transaction_id: txn2.clone(),
2458 parent_key: None,
2459 created_at: MilliSecondsSinceUnixEpoch::now(),
2460 },
2461 ];
2462
2463 let res = canonicalize_dependent_requests(&inputs);
2464
2465 assert_eq!(res.len(), 2);
2467
2468 for dependent in res {
2469 if dependent.own_transaction_id == child1 {
2470 assert_eq!(dependent.parent_transaction_id, txn1);
2471 assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
2472 } else {
2473 assert_eq!(dependent.parent_transaction_id, txn2);
2474 assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
2475 }
2476 }
2477 }
2478
2479 #[test]
2480 fn test_canonicalize_reactions_after_edits() {
2481 let txn = TransactionId::new();
2483
2484 let react_id = ChildTransactionId::new();
2485 let react = DependentQueuedRequest {
2486 own_transaction_id: react_id.clone(),
2487 kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
2488 parent_transaction_id: txn.clone(),
2489 parent_key: None,
2490 created_at: MilliSecondsSinceUnixEpoch::now(),
2491 };
2492
2493 let edit_id = ChildTransactionId::new();
2494 let edit = DependentQueuedRequest {
2495 own_transaction_id: edit_id.clone(),
2496 kind: DependentQueuedRequestKind::EditEvent {
2497 new_content: SerializableEventContent::new(
2498 &RoomMessageEventContent::text_plain("edit").into(),
2499 )
2500 .unwrap(),
2501 },
2502 parent_transaction_id: txn,
2503 parent_key: None,
2504 created_at: MilliSecondsSinceUnixEpoch::now(),
2505 };
2506
2507 let res = canonicalize_dependent_requests(&[react, edit]);
2508
2509 assert_eq!(res.len(), 2);
2510 assert_eq!(res[0].own_transaction_id, edit_id);
2511 assert_eq!(res[1].own_transaction_id, react_id);
2512 }
2513}