1use std::{
132 collections::{BTreeMap, HashMap},
133 str::FromStr as _,
134 sync::{
135 Arc, RwLock,
136 atomic::{AtomicBool, Ordering},
137 },
138};
139
140use eyeball::SharedObservable;
141#[cfg(feature = "e2e-encryption")]
142use matrix_sdk_base::crypto::{OlmError, SessionRecipientCollectionError};
143#[cfg(feature = "unstable-msc4274")]
144use matrix_sdk_base::store::FinishGalleryItemInfo;
145use matrix_sdk_base::{
146 RoomState, StoreError,
147 cross_process_lock::CrossProcessLockError,
148 deserialized_responses::TimelineEvent,
149 event_cache::store::EventCacheStoreError,
150 media::{MediaRequestParameters, store::MediaStoreError},
151 store::{
152 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
153 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
154 SentMediaInfo, SentRequestKey, SerializableEventContent,
155 },
156};
157use matrix_sdk_common::{
158 executor::{JoinHandle, spawn},
159 locks::Mutex as SyncMutex,
160};
161use mime::Mime;
162use ruma::{
163 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
164 TransactionId,
165 events::{
166 AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
167 reaction::ReactionEventContent,
168 relation::Annotation,
169 room::{
170 MediaSource,
171 message::{FormattedBody, RoomMessageEventContent},
172 },
173 },
174 serde::Raw,
175};
176use tokio::sync::{Mutex, Notify, OwnedMutexGuard, broadcast, oneshot};
177use tracing::{debug, error, info, instrument, trace, warn};
178
179use crate::{
180 Client, Media, Room, TransmissionProgress,
181 client::WeakClient,
182 config::RequestConfig,
183 error::RetryKind,
184 room::{WeakRoom, edit::EditedContent},
185};
186
187mod progress;
188mod upload;
189
190pub use progress::AbstractProgress;
191
192pub struct SendQueue {
194 client: Client,
195}
196
197#[cfg(not(tarpaulin_include))]
198impl std::fmt::Debug for SendQueue {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 f.debug_struct("SendQueue").finish_non_exhaustive()
201 }
202}
203
204impl SendQueue {
205 pub(super) fn new(client: Client) -> Self {
206 Self { client }
207 }
208
209 pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
212 if !self.is_enabled() {
213 return;
214 }
215
216 let room_ids =
217 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
218 |err| {
219 warn!("error when loading rooms with unsent requests: {err}");
220 Vec::new()
221 },
222 );
223
224 for room_id in room_ids {
226 if let Some(room) = self.client.get_room(&room_id) {
227 let _ = self.for_room(room);
228 }
229 }
230 }
231
232 #[inline(always)]
234 fn data(&self) -> &SendQueueData {
235 &self.client.inner.send_queue_data
236 }
237
238 pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
241 let data = self.data();
242
243 let mut map = data.rooms.write().unwrap();
244
245 let room_id = room.room_id();
246 if let Some(room_q) = map.get(room_id).cloned() {
247 return room_q;
248 }
249
250 let owned_room_id = room_id.to_owned();
251 let room_q = RoomSendQueue::new(
252 self.is_enabled(),
253 data.global_update_sender.clone(),
254 data.error_sender.clone(),
255 data.is_dropping.clone(),
256 &self.client,
257 owned_room_id.clone(),
258 data.report_media_upload_progress.clone(),
259 );
260
261 map.insert(owned_room_id, room_q.clone());
262
263 room_q
264 }
265
266 pub async fn set_enabled(&self, enabled: bool) {
276 debug!(?enabled, "setting global send queue enablement");
277
278 self.data().globally_enabled.store(enabled, Ordering::SeqCst);
279
280 for room in self.data().rooms.read().unwrap().values() {
282 room.set_enabled(enabled);
283 }
284
285 self.respawn_tasks_for_rooms_with_unsent_requests().await;
288 }
289
290 pub fn is_enabled(&self) -> bool {
293 self.data().globally_enabled.load(Ordering::SeqCst)
294 }
295
296 pub fn enable_upload_progress(&self, enabled: bool) {
298 self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
299 }
300
301 pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
306 self.data().global_update_sender.subscribe()
307 }
308
309 pub async fn local_echoes(
311 &self,
312 ) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
313 let room_ids =
314 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
315 |err| {
316 warn!("error when loading rooms with unsent requests: {err}");
317 Vec::new()
318 },
319 );
320
321 let mut local_echoes: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();
322
323 for room_id in room_ids {
324 if let Some(room) = self.client.get_room(&room_id) {
325 let queue = self.for_room(room);
326 local_echoes
327 .insert(room_id.to_owned(), queue.inner.queue.local_echoes(&queue).await?);
328 }
329 }
330
331 Ok(local_echoes)
332 }
333
334 pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
337 self.data().error_sender.subscribe()
338 }
339}
340
341#[derive(Clone, Debug)]
344struct QueueThumbnailInfo {
345 finish_upload_thumbnail_info: FinishUploadThumbnailInfo,
347
348 media_request_parameters: MediaRequestParameters,
350
351 content_type: Mime,
353
354 file_size: usize,
356}
357
358#[derive(Clone, Debug)]
360pub struct SendQueueRoomError {
361 pub room_id: OwnedRoomId,
363
364 pub error: Arc<crate::Error>,
366
367 pub is_recoverable: bool,
373}
374
375impl Client {
376 pub fn send_queue(&self) -> SendQueue {
379 SendQueue::new(self.clone())
380 }
381}
382
383pub(super) struct SendQueueData {
384 rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
386
387 globally_enabled: AtomicBool,
392
393 global_update_sender: broadcast::Sender<SendQueueUpdate>,
397
398 error_sender: broadcast::Sender<SendQueueRoomError>,
400
401 is_dropping: Arc<AtomicBool>,
403
404 report_media_upload_progress: Arc<AtomicBool>,
406}
407
408impl SendQueueData {
409 pub fn new(globally_enabled: bool) -> Self {
411 let (global_update_sender, _) = broadcast::channel(32);
412 let (error_sender, _) = broadcast::channel(32);
413
414 Self {
415 rooms: Default::default(),
416 globally_enabled: AtomicBool::new(globally_enabled),
417 global_update_sender,
418 error_sender,
419 is_dropping: Arc::new(false.into()),
420 report_media_upload_progress: Arc::new(false.into()),
421 }
422 }
423}
424
425impl Drop for SendQueueData {
426 fn drop(&mut self) {
427 debug!("globally dropping the send queue");
430 self.is_dropping.store(true, Ordering::SeqCst);
431
432 let rooms = self.rooms.read().unwrap();
433 for room in rooms.values() {
434 room.inner.notifier.notify_one();
435 }
436 }
437}
438
439impl Room {
440 pub fn send_queue(&self) -> RoomSendQueue {
442 self.client.send_queue().for_room(self.clone())
443 }
444}
445
446#[derive(Clone)]
450pub struct RoomSendQueue {
451 inner: Arc<RoomSendQueueInner>,
452}
453
454#[cfg(not(tarpaulin_include))]
455impl std::fmt::Debug for RoomSendQueue {
456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457 f.debug_struct("RoomSendQueue").finish_non_exhaustive()
458 }
459}
460
461impl RoomSendQueue {
462 fn new(
463 globally_enabled: bool,
464 global_update_sender: broadcast::Sender<SendQueueUpdate>,
465 global_error_sender: broadcast::Sender<SendQueueRoomError>,
466 is_dropping: Arc<AtomicBool>,
467 client: &Client,
468 room_id: OwnedRoomId,
469 report_media_upload_progress: Arc<AtomicBool>,
470 ) -> Self {
471 let (update_sender, _) = broadcast::channel(32);
472
473 let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
474 let notifier = Arc::new(Notify::new());
475
476 let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
477 let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
478
479 let task = spawn(Self::sending_task(
480 weak_room.clone(),
481 queue.clone(),
482 notifier.clone(),
483 global_update_sender.clone(),
484 update_sender.clone(),
485 locally_enabled.clone(),
486 global_error_sender,
487 is_dropping,
488 report_media_upload_progress,
489 ));
490
491 Self {
492 inner: Arc::new(RoomSendQueueInner {
493 room: weak_room,
494 global_update_sender,
495 update_sender,
496 _task: task,
497 queue,
498 notifier,
499 locally_enabled,
500 }),
501 }
502 }
503
504 pub async fn send_raw(
519 &self,
520 content: Raw<AnyMessageLikeEventContent>,
521 event_type: String,
522 ) -> Result<SendHandle, RoomSendQueueError> {
523 let Some(room) = self.inner.room.get() else {
524 return Err(RoomSendQueueError::RoomDisappeared);
525 };
526 if room.state() != RoomState::Joined {
527 return Err(RoomSendQueueError::RoomNotJoined);
528 }
529
530 let content = SerializableEventContent::from_raw(content, event_type);
531
532 let created_at = MilliSecondsSinceUnixEpoch::now();
533 let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
534 trace!(%transaction_id, "manager sends a raw event to the background task");
535
536 self.inner.notifier.notify_one();
537
538 let send_handle = SendHandle {
539 room: self.clone(),
540 transaction_id: transaction_id.clone(),
541 media_handles: vec![],
542 created_at,
543 };
544
545 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
546 transaction_id,
547 content: LocalEchoContent::Event {
548 serialized_event: content,
549 send_handle: send_handle.clone(),
550 send_error: None,
551 },
552 }));
553
554 Ok(send_handle)
555 }
556
557 pub async fn send(
572 &self,
573 content: AnyMessageLikeEventContent,
574 ) -> Result<SendHandle, RoomSendQueueError> {
575 self.send_raw(
576 Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
577 content.event_type().to_string(),
578 )
579 .await
580 }
581
582 pub async fn subscribe(
588 &self,
589 ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
590 {
591 let local_echoes = self.inner.queue.local_echoes(self).await?;
592
593 Ok((local_echoes, self.inner.update_sender.subscribe()))
594 }
595
/// Background task sending the queued requests of a room, one at a time.
///
/// Each iteration of the loop:
///
/// 1. exits if `is_dropping` is set;
/// 2. applies the dependent requests and forwards the resulting updates;
/// 3. waits on `notifier` if the queue is disabled for the room or empty;
/// 4. picks the next request that is not wedged and sends it with
///    `handle_request`;
/// 5. on success, marks the request as sent and emits the matching update
///    (sent events are also inserted into the room's event cache);
/// 6. on failure, disables the queue for the room, marks the request as not
///    being sent (recoverable error) or as wedged (unrecoverable error),
///    and reports the error to the room and global subscribers.
///
/// # Panics
///
/// Panics when building the event saved in the event cache if the client
/// has no user ID.
596 #[allow(clippy::too_many_arguments)]
602 #[instrument(skip_all, fields(room_id = %room.room_id()))]
603 async fn sending_task(
604 room: WeakRoom,
605 queue: QueueStorage,
606 notifier: Arc<Notify>,
607 global_update_sender: broadcast::Sender<SendQueueUpdate>,
608 update_sender: broadcast::Sender<RoomSendQueueUpdate>,
609 locally_enabled: Arc<AtomicBool>,
610 global_error_sender: broadcast::Sender<SendQueueRoomError>,
611 is_dropping: Arc<AtomicBool>,
612 report_media_upload_progress: Arc<AtomicBool>,
613 ) {
614 trace!("spawned the sending task");
615
616 let room_id = room.room_id();
617
618 loop {
// Stop as soon as the shared send queue data signals it is being dropped.
619 if is_dropping.load(Ordering::SeqCst) {
621 trace!("shutting down!");
622 break;
623 }
624
// Apply the dependent requests first and forward the updates they
// produced; a failure here is only logged.
625 let mut new_updates = Vec::new();
628 if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
629 warn!("errors when applying dependent requests: {err}");
630 }
631
632 for up in new_updates {
633 send_update(&global_update_sender, &update_sender, room_id, up);
634 }
635
// Queue disabled for this room: sleep until notified, then start over.
636 if !locally_enabled.load(Ordering::SeqCst) {
637 trace!("not enabled, sleeping");
638 notifier.notified().await;
640 continue;
641 }
642
// Pick the next request to send, together with the receiver used to
// cancel it when it is a media upload.
643 let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
644 Ok(Some(request)) => request,
645
646 Ok(None) => {
647 trace!("queue is empty, sleeping");
648 notifier.notified().await;
650 continue;
651 }
652
// NOTE(review): the loop retries immediately, without any delay, so a
// persistent storage error makes this task spin. Confirm this is intended.
653 Err(err) => {
654 warn!("error when loading next request to send: {err}");
655 continue;
656 }
657 };
658
659 let txn_id = queued_request.transaction_id.clone();
660 trace!(txn_id = %txn_id, "received a request to send!");
661
// The room cannot be retrieved anymore: exit if shutting down, otherwise
// log and retry.
// NOTE(review): the being-sent marker set by `peek_next_to_send` is not
// reset on this path.
662 let Some(room) = room.get() else {
663 if is_dropping.load(Ordering::SeqCst) {
664 break;
665 }
666 error!("the weak room couldn't be upgraded but we're not shutting down?");
667 continue;
668 };
669
// For media uploads, remember the transaction the upload relates to and,
// when upload progress reporting is enabled, prepare the progress
// information and the observable handed to the HTTP request. All three
// values are `None` for other requests.
670 let (related_txn_id, media_upload_progress_info, http_progress) =
675 if let QueuedRequestKind::MediaUpload {
676 cache_key,
677 thumbnail_source,
678 #[cfg(feature = "unstable-msc4274")]
679 accumulated,
680 related_to,
681 ..
682 } = &queued_request.kind
683 {
684 let (media_upload_progress_info, http_progress) =
687 if report_media_upload_progress.load(Ordering::SeqCst) {
688 let media_upload_progress_info =
689 RoomSendQueue::create_media_upload_progress_info(
690 &queued_request.transaction_id,
691 related_to,
692 cache_key,
693 thumbnail_source.as_ref(),
694 #[cfg(feature = "unstable-msc4274")]
695 accumulated,
696 &room,
697 &queue,
698 )
699 .await;
700
701 let progress = RoomSendQueue::create_media_upload_progress_observable(
702 &media_upload_progress_info,
703 related_to,
704 &update_sender,
705 );
706
707 (Some(media_upload_progress_info), Some(progress))
708 } else {
709 Default::default()
710 };
711
712 (Some(related_to.clone()), media_upload_progress_info, http_progress)
713 } else {
714 Default::default()
715 };
716
717 match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
718 {
// The request was sent: remove it from the queue and unblock the requests
// depending on it, then notify the observers.
719 Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
720 {
721 Ok(()) => match parent_key {
722 SentRequestKey::Event { event_id, event, event_type } => {
723 send_update(
724 &global_update_sender,
725 &update_sender,
726 room_id,
727 RoomSendQueueUpdate::SentEvent {
728 transaction_id: txn_id,
729 event_id: event_id.clone(),
730 },
731 );
732
// Best effort: also save the sent event in the room's event cache.
733 if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
753 {
754 let timeline_event = match event_type.as_str() {
// Encrypted event: the raw value is reinterpreted (unchecked cast) as an
// encrypted sync event and decrypted; a decryption failure is logged and
// nothing is saved.
755 #[cfg(feature = "e2e-encryption")]
756 "m.room.encrypted" => {
757 use ruma::events::{
758 OriginalSyncMessageLikeEvent,
759 room::encrypted::RoomEncryptedEventContent,
760 };
761
762 let push_context = room.push_context().await.ok().flatten();
763
764 let event: Raw<AnyMessageLikeEventContent> = event;
771 let event: Raw<
772 OriginalSyncMessageLikeEvent<RoomEncryptedEventContent>,
773 > = event.cast_unchecked();
774
775 match room
776 .decrypt_event(&event, push_context.as_ref())
777 .await
778 {
779 Ok(timeline_event) => Some(timeline_event),
780 Err(err) => {
781 error!(
782 ?err,
783 "Failed to decrypt the event before the saving in the Event Cache"
784 );
785 None
786 }
787 }
788 }
789
// Any other event: build the JSON of an event by hand from the event ID,
// the current time, our own user ID, the event type and the sent content.
// The values are interpolated without escaping; invalid JSON is logged and
// nothing is saved.
790 event_type => {
791 match Raw::from_json_string(
792 format!(
794 "{{\
795 \"event_id\":\"{event_id}\",\
796 \"origin_server_ts\":{ts},\
797 \"sender\":\"{sender}\",\
798 \"type\":\"{type}\",\
799 \"content\":{content}\
800 }}",
801 event_id = event_id,
802 ts = MilliSecondsSinceUnixEpoch::now().get(),
803 sender = room.client().user_id().expect("Client must be logged-in"),
804 type = event_type,
805 content = event.into_json(),
806 ),
807 ) {
808 Ok(event) => Some(TimelineEvent::from_plaintext(event)),
809 Err(err) => {
810 error!(
811 ?err,
812 "Failed to build the (sync) event before the saving in the Event Cache"
813 );
814 None
815 }
816 }
817 }
818 };
819
820 if let Some(timeline_event) = timeline_event
824 && let Err(err) = room_event_cache
825 .insert_sent_event_from_send_queue(timeline_event)
826 .await
827 {
828 error!(
829 ?err,
830 "Failed to save the sent event in the Event Cache"
831 );
832 }
833 } else {
834 info!(
835 "Cannot insert the sent event in the Event Cache because \
836 either the room no longer exists, or the Room Event Cache cannot be retrieved"
837 );
838 }
839 }
840
// Media uploaded: emit a final progress update. Without progress
// information, index 0 and a 1/1 progress are reported.
841 SentRequestKey::Media(sent_media_info) => {
842 let index =
845 media_upload_progress_info.as_ref().map_or(0, |info| info.index);
846 let progress = media_upload_progress_info
847 .as_ref()
848 .map(|info| {
849 AbstractProgress { current: info.bytes, total: info.bytes }
850 + info.offsets
851 })
852 .unwrap_or(AbstractProgress { current: 1, total: 1 });
853
// This update is sent to the room subscribers only, not to the global ones.
854 let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
857 related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
858 file: Some(sent_media_info.file),
859 index,
860 progress,
861 });
862 }
863 },
864
865 Err(err) => {
866 warn!("unable to mark queued request as sent: {err}");
867 }
868 },
869
870 Ok(None) => {
871 debug!("Request has been aborted while running, continuing.");
872 }
873
874 Err(err) => {
// HTTP errors are recoverable when their retry kind is transient or a
// network failure; a failed concurrent request is recoverable too; every
// other error is not.
875 let is_recoverable = match err {
876 crate::Error::Http(ref http_err) => {
877 matches!(
879 http_err.retry_kind(),
880 RetryKind::Transient { .. } | RetryKind::NetworkFailure
881 )
882 }
883
884 crate::Error::ConcurrentRequestFailed => true,
889
890 _ => false,
892 };
893
// Any error, recoverable or not, disables the queue for this room.
894 locally_enabled.store(false, Ordering::SeqCst);
896
897 if is_recoverable {
898 warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
899
// Reset the being-sent marker; the request itself stays in the queue.
900 queue.mark_as_not_being_sent(&txn_id).await;
903
904 } else {
910 warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
911
// Mark the request as wedged, so that it is not picked again.
912 if let Err(storage_error) =
914 queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
915 {
916 warn!("unable to mark request as wedged: {storage_error}");
917 }
918 }
919
920 let error = Arc::new(err);
921
// Report the error on the global error channel, then as an update. For a
// media upload, the update carries the transaction the upload relates to.
922 let _ = global_error_sender.send(SendQueueRoomError {
923 room_id: room_id.to_owned(),
924 error: error.clone(),
925 is_recoverable,
926 });
927
928 send_update(
929 &global_update_sender,
930 &update_sender,
931 room_id,
932 RoomSendQueueUpdate::SendError {
933 transaction_id: related_txn_id.unwrap_or(txn_id),
934 error,
935 is_recoverable,
936 },
937 );
938 }
939 }
940 }
941
942 info!("exited sending task");
943 }
944
945 async fn handle_request(
949 room: &Room,
950 request: QueuedRequest,
951 cancel_upload_rx: Option<oneshot::Receiver<()>>,
952 progress: Option<SharedObservable<TransmissionProgress>>,
953 ) -> Result<Option<SentRequestKey>, crate::Error> {
954 match request.kind {
955 QueuedRequestKind::Event { content } => {
956 let (event, event_type) = content.into_raw();
957
958 let res = room
959 .send_raw(&event_type, &event)
960 .with_transaction_id(&request.transaction_id)
961 .with_request_config(RequestConfig::short_retry())
962 .await?;
963
964 trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
965
966 Ok(Some(SentRequestKey::Event { event_id: res.event_id, event, event_type }))
967 }
968
969 QueuedRequestKind::MediaUpload {
970 content_type,
971 cache_key,
972 thumbnail_source,
973 related_to: relates_to,
974 #[cfg(feature = "unstable-msc4274")]
975 accumulated,
976 } => {
977 trace!(%relates_to, "uploading media related to event");
978
979 let fut = async move {
980 let data = room
981 .client()
982 .media_store()
983 .lock()
984 .await?
985 .get_media_content(&cache_key)
986 .await?
987 .ok_or(crate::Error::SendQueueWedgeError(Box::new(
988 QueueWedgeError::MissingMediaContent,
989 )))?;
990
991 let mime = Mime::from_str(&content_type).map_err(|_| {
992 crate::Error::SendQueueWedgeError(Box::new(
993 QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
994 ))
995 })?;
996
997 #[cfg(feature = "e2e-encryption")]
998 let media_source = if room.latest_encryption_state().await?.is_encrypted() {
999 trace!("upload will be encrypted (encrypted room)");
1000
1001 let mut cursor = std::io::Cursor::new(data);
1002 let mut req = room
1003 .client
1004 .upload_encrypted_file(&mut cursor)
1005 .with_request_config(RequestConfig::short_retry());
1006 if let Some(progress) = progress {
1007 req = req.with_send_progress_observable(progress);
1008 }
1009 let encrypted_file = req.await?;
1010
1011 MediaSource::Encrypted(Box::new(encrypted_file))
1012 } else {
1013 trace!("upload will be in clear text (room without encryption)");
1014
1015 let request_config = RequestConfig::short_retry()
1016 .timeout(Media::reasonable_upload_timeout(&data));
1017 let mut req =
1018 room.client().media().upload(&mime, data, Some(request_config));
1019 if let Some(progress) = progress {
1020 req = req.with_send_progress_observable(progress);
1021 }
1022 let res = req.await?;
1023
1024 MediaSource::Plain(res.content_uri)
1025 };
1026
1027 #[cfg(not(feature = "e2e-encryption"))]
1028 let media_source = {
1029 let request_config = RequestConfig::short_retry()
1030 .timeout(Media::reasonable_upload_timeout(&data));
1031 let mut req =
1032 room.client().media().upload(&mime, data, Some(request_config));
1033 if let Some(progress) = progress {
1034 req = req.with_send_progress_observable(progress);
1035 }
1036 let res = req.await?;
1037 MediaSource::Plain(res.content_uri)
1038 };
1039
1040 let uri = match &media_source {
1041 MediaSource::Plain(uri) => uri,
1042 MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
1043 };
1044 trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
1045
1046 Ok(SentRequestKey::Media(SentMediaInfo {
1047 file: media_source,
1048 thumbnail: thumbnail_source,
1049 #[cfg(feature = "unstable-msc4274")]
1050 accumulated,
1051 }))
1052 };
1053
1054 let wait_for_cancel = async move {
1055 if let Some(rx) = cancel_upload_rx {
1056 rx.await
1057 } else {
1058 std::future::pending().await
1059 }
1060 };
1061
1062 tokio::select! {
1063 biased;
1064
1065 _ = wait_for_cancel => {
1066 Ok(None)
1067 }
1068
1069 res = fut => {
1070 res.map(Some)
1071 }
1072 }
1073 }
1074 }
1075 }
1076
1077 pub fn is_enabled(&self) -> bool {
1079 self.inner.locally_enabled.load(Ordering::SeqCst)
1080 }
1081
1082 pub fn set_enabled(&self, enabled: bool) {
1084 self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
1085
1086 if enabled {
1089 self.inner.notifier.notify_one();
1090 }
1091 }
1092
1093 fn send_update(&self, update: RoomSendQueueUpdate) {
1097 let _ = self.inner.update_sender.send(update.clone());
1098 let _ = self
1099 .inner
1100 .global_update_sender
1101 .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
1102 }
1103}
1104
1105fn send_update(
1106 global_update_sender: &broadcast::Sender<SendQueueUpdate>,
1107 update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
1108 room_id: &RoomId,
1109 update: RoomSendQueueUpdate,
1110) {
1111 let _ = update_sender.send(update.clone());
1112 let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
1113}
1114
1115impl From<&crate::Error> for QueueWedgeError {
1116 fn from(value: &crate::Error) -> Self {
1117 match value {
1118 #[cfg(feature = "e2e-encryption")]
1119 crate::Error::OlmError(error) => match &**error {
1120 OlmError::SessionRecipientCollectionError(error) => match error {
1121 SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
1122 QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
1123 }
1124
1125 SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
1126 QueueWedgeError::IdentityViolations { users: users.clone() }
1127 }
1128
1129 SessionRecipientCollectionError::CrossSigningNotSetup
1130 | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
1131 QueueWedgeError::CrossVerificationRequired
1132 }
1133 },
1134 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1135 },
1136
1137 crate::Error::SendQueueWedgeError(error) => *error.clone(),
1139
1140 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1141 }
1142 }
1143}
1144
1145struct RoomSendQueueInner {
1146 room: WeakRoom,
1148
1149 global_update_sender: broadcast::Sender<SendQueueUpdate>,
1153
1154 update_sender: broadcast::Sender<RoomSendQueueUpdate>,
1160
1161 queue: QueueStorage,
1168
1169 notifier: Arc<Notify>,
1172
1173 locally_enabled: Arc<AtomicBool>,
1176
1177 _task: JoinHandle<()>,
1180}
1181
1182struct BeingSentInfo {
1184 transaction_id: OwnedTransactionId,
1186
1187 cancel_upload: Option<oneshot::Sender<()>>,
1190}
1191
1192impl BeingSentInfo {
1193 fn cancel_upload(self) -> bool {
1198 if let Some(cancel_upload) = self.cancel_upload {
1199 let _ = cancel_upload.send(());
1200 true
1201 } else {
1202 false
1203 }
1204 }
1205}
1206
1207#[derive(Clone)]
1210struct StoreLock {
1211 client: WeakClient,
1213
1214 being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
1219}
1220
1221impl StoreLock {
1222 async fn lock(&self) -> StoreLockGuard {
1224 StoreLockGuard {
1225 client: self.client.clone(),
1226 being_sent: self.being_sent.clone().lock_owned().await,
1227 }
1228 }
1229}
1230
1231struct StoreLockGuard {
1234 client: WeakClient,
1236
1237 being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
1240}
1241
1242impl StoreLockGuard {
1243 fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
1245 self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
1246 }
1247}
1248
1249#[derive(Clone)]
1250struct QueueStorage {
1251 store: StoreLock,
1254
1255 room_id: OwnedRoomId,
1257
1258 thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
1271}
1272
1273impl QueueStorage {
/// Priority given to the requests saved by `push`.
const LOW_PRIORITY: usize = 0;

/// Priority value greater than [`Self::LOW_PRIORITY`].
// NOTE(review): its users are outside of this excerpt; presumably meant for
// requests that must be sent first. Confirm.
const HIGH_PRIORITY: usize = 10;
1279
1280 fn new(client: WeakClient, room: OwnedRoomId) -> Self {
1282 Self {
1283 room_id: room,
1284 store: StoreLock { client, being_sent: Default::default() },
1285 thumbnail_file_sizes: Default::default(),
1286 }
1287 }
1288
1289 async fn push(
1293 &self,
1294 request: QueuedRequestKind,
1295 created_at: MilliSecondsSinceUnixEpoch,
1296 ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1297 let transaction_id = TransactionId::new();
1298
1299 self.store
1300 .lock()
1301 .await
1302 .client()?
1303 .state_store()
1304 .save_send_queue_request(
1305 &self.room_id,
1306 transaction_id.clone(),
1307 created_at,
1308 request,
1309 Self::LOW_PRIORITY,
1310 )
1311 .await?;
1312
1313 Ok(transaction_id)
1314 }
1315
1316 async fn peek_next_to_send(
1321 &self,
1322 ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
1323 {
1324 let mut guard = self.store.lock().await;
1325 let queued_requests =
1326 guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
1327
1328 if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
1329 let (cancel_upload_tx, cancel_upload_rx) =
1330 if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1331 let (tx, rx) = oneshot::channel();
1332 (Some(tx), Some(rx))
1333 } else {
1334 Default::default()
1335 };
1336
1337 let prev = guard.being_sent.replace(BeingSentInfo {
1338 transaction_id: request.transaction_id.clone(),
1339 cancel_upload: cancel_upload_tx,
1340 });
1341
1342 if let Some(prev) = prev {
1343 error!(
1344 prev_txn = ?prev.transaction_id,
1345 "a previous request was still active while picking a new one"
1346 );
1347 }
1348
1349 Ok(Some((request.clone(), cancel_upload_rx)))
1350 } else {
1351 Ok(None)
1352 }
1353 }
1354
1355 async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1359 let was_being_sent = self.store.lock().await.being_sent.take();
1360
1361 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1362 if prev_txn != Some(transaction_id) {
1363 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1364 }
1365 }
1366
1367 async fn mark_as_wedged(
1371 &self,
1372 transaction_id: &TransactionId,
1373 reason: QueueWedgeError,
1374 ) -> Result<(), RoomSendQueueStorageError> {
1375 let mut guard = self.store.lock().await;
1377 let was_being_sent = guard.being_sent.take();
1378
1379 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1380 if prev_txn != Some(transaction_id) {
1381 error!(
1382 ?prev_txn,
1383 "previous active request didn't match that we expect (after permanent error)",
1384 );
1385 }
1386
1387 Ok(guard
1388 .client()?
1389 .state_store()
1390 .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1391 .await?)
1392 }
1393
1394 async fn mark_as_unwedged(
1397 &self,
1398 transaction_id: &TransactionId,
1399 ) -> Result<(), RoomSendQueueStorageError> {
1400 Ok(self
1401 .store
1402 .lock()
1403 .await
1404 .client()?
1405 .state_store()
1406 .update_send_queue_request_status(&self.room_id, transaction_id, None)
1407 .await?)
1408 }
1409
1410 async fn mark_as_sent(
1413 &self,
1414 transaction_id: &TransactionId,
1415 parent_key: SentRequestKey,
1416 ) -> Result<(), RoomSendQueueStorageError> {
1417 let mut guard = self.store.lock().await;
1419 let was_being_sent = guard.being_sent.take();
1420
1421 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1422 if prev_txn != Some(transaction_id) {
1423 error!(
1424 ?prev_txn,
1425 "previous active request didn't match that we expect (after successful send)",
1426 );
1427 }
1428
1429 let client = guard.client()?;
1430 let store = client.state_store();
1431
1432 store
1434 .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1435 .await?;
1436
1437 let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1438
1439 if !removed {
1440 warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1441 }
1442
1443 self.thumbnail_file_sizes.lock().remove(transaction_id);
1444
1445 Ok(())
1446 }
1447
1448 async fn cancel_event(
1455 &self,
1456 transaction_id: &TransactionId,
1457 ) -> Result<bool, RoomSendQueueStorageError> {
1458 let guard = self.store.lock().await;
1459
1460 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1461 == Some(transaction_id)
1462 {
1463 guard
1465 .client()?
1466 .state_store()
1467 .save_dependent_queued_request(
1468 &self.room_id,
1469 transaction_id,
1470 ChildTransactionId::new(),
1471 MilliSecondsSinceUnixEpoch::now(),
1472 DependentQueuedRequestKind::RedactEvent,
1473 )
1474 .await?;
1475
1476 return Ok(true);
1477 }
1478
1479 let removed = guard
1480 .client()?
1481 .state_store()
1482 .remove_send_queue_request(&self.room_id, transaction_id)
1483 .await?;
1484
1485 self.thumbnail_file_sizes.lock().remove(transaction_id);
1486
1487 Ok(removed)
1488 }
1489
1490 async fn replace_event(
1497 &self,
1498 transaction_id: &TransactionId,
1499 serializable: SerializableEventContent,
1500 ) -> Result<bool, RoomSendQueueStorageError> {
1501 let guard = self.store.lock().await;
1502
1503 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1504 == Some(transaction_id)
1505 {
1506 guard
1508 .client()?
1509 .state_store()
1510 .save_dependent_queued_request(
1511 &self.room_id,
1512 transaction_id,
1513 ChildTransactionId::new(),
1514 MilliSecondsSinceUnixEpoch::now(),
1515 DependentQueuedRequestKind::EditEvent { new_content: serializable },
1516 )
1517 .await?;
1518
1519 return Ok(true);
1520 }
1521
1522 let edited = guard
1523 .client()?
1524 .state_store()
1525 .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1526 .await?;
1527
1528 Ok(edited)
1529 }
1530
1531 #[allow(clippy::too_many_arguments)]
1535 async fn push_media(
1536 &self,
1537 event: RoomMessageEventContent,
1538 content_type: Mime,
1539 send_event_txn: OwnedTransactionId,
1540 created_at: MilliSecondsSinceUnixEpoch,
1541 upload_file_txn: OwnedTransactionId,
1542 file_media_request: MediaRequestParameters,
1543 thumbnail: Option<QueueThumbnailInfo>,
1544 ) -> Result<(), RoomSendQueueStorageError> {
1545 let guard = self.store.lock().await;
1546 let client = guard.client()?;
1547 let store = client.state_store();
1548
1549 let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];
1551
1552 let thumbnail_info = self
1553 .push_thumbnail_and_media_uploads(
1554 store,
1555 &content_type,
1556 send_event_txn.clone(),
1557 created_at,
1558 upload_file_txn.clone(),
1559 file_media_request,
1560 thumbnail,
1561 )
1562 .await?;
1563
1564 store
1566 .save_dependent_queued_request(
1567 &self.room_id,
1568 &upload_file_txn,
1569 send_event_txn.clone().into(),
1570 created_at,
1571 DependentQueuedRequestKind::FinishUpload {
1572 local_echo: Box::new(event),
1573 file_upload: upload_file_txn.clone(),
1574 thumbnail_info,
1575 },
1576 )
1577 .await?;
1578
1579 self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1580
1581 Ok(())
1582 }
1583
/// Queues every upload belonging to a gallery, followed by the gallery event.
///
/// The first item is queued through `push_thumbnail_and_media_uploads`. Each
/// subsequent thumbnail/file upload is stored as a dependent request of the
/// upload that was queued immediately before it, so that uploads are chained
/// one after the other. A final `FinishGallery` dependent request is attached
/// to the last upload of the chain.
///
/// Returns `Ok(())` without queuing anything when `item_queue_infos` is empty.
#[cfg(feature = "unstable-msc4274")]
#[allow(clippy::too_many_arguments)]
async fn push_gallery(
    &self,
    event: RoomMessageEventContent,
    send_event_txn: OwnedTransactionId,
    created_at: MilliSecondsSinceUnixEpoch,
    item_queue_infos: Vec<GalleryItemQueueInfo>,
) -> Result<(), RoomSendQueueStorageError> {
    let locked = self.store.lock().await;
    let client = locked.client()?;
    let store = client.state_store();

    let Some((head, tail)) = item_queue_infos.split_first() else {
        return Ok(());
    };

    let item_count = item_queue_infos.len();
    let mut item_infos = Vec::with_capacity(item_count);
    let mut thumbnail_sizes = Vec::with_capacity(item_count);

    // The first item starts the chain of uploads.
    let head_thumbnail_info = self
        .push_thumbnail_and_media_uploads(
            store,
            &head.content_type,
            send_event_txn.clone(),
            created_at,
            head.upload_file_txn.clone(),
            head.file_media_request.clone(),
            head.thumbnail.clone(),
        )
        .await?;

    item_infos.push(FinishGalleryItemInfo {
        file_upload: head.upload_file_txn.clone(),
        thumbnail_info: head_thumbnail_info,
    });
    thumbnail_sizes.push(head.thumbnail.as_ref().map(|info| info.file_size));

    // Transaction id of the most recently queued upload; the next upload is
    // saved as a dependent request of this one.
    let mut previous_txn = head.upload_file_txn.clone();

    for item in tail {
        let thumbnail_info = match &item.thumbnail {
            Some(queued_thumbnail) => {
                let finish_info = &queued_thumbnail.finish_upload_thumbnail_info;
                let thumbnail_txn = finish_info.txn.clone();

                // The thumbnail upload depends on the previous upload.
                store
                    .save_dependent_queued_request(
                        &self.room_id,
                        &previous_txn,
                        thumbnail_txn.clone().into(),
                        created_at,
                        DependentQueuedRequestKind::UploadFileOrThumbnail {
                            content_type: queued_thumbnail.content_type.to_string(),
                            cache_key: queued_thumbnail.media_request_parameters.clone(),
                            related_to: send_event_txn.clone(),
                            parent_is_thumbnail_upload: false,
                        },
                    )
                    .await?;

                previous_txn = thumbnail_txn;

                Some(finish_info.clone())
            }

            None => None,
        };

        // The file upload depends on the previous upload, which is this
        // item's thumbnail upload when it has one.
        store
            .save_dependent_queued_request(
                &self.room_id,
                &previous_txn,
                item.upload_file_txn.clone().into(),
                created_at,
                DependentQueuedRequestKind::UploadFileOrThumbnail {
                    content_type: item.content_type.to_string(),
                    cache_key: item.file_media_request.clone(),
                    related_to: send_event_txn.clone(),
                    parent_is_thumbnail_upload: item.thumbnail.is_some(),
                },
            )
            .await?;

        item_infos.push(FinishGalleryItemInfo {
            file_upload: item.upload_file_txn.clone(),
            thumbnail_info,
        });
        thumbnail_sizes.push(item.thumbnail.as_ref().map(|info| info.file_size));

        previous_txn = item.upload_file_txn.clone();
    }

    // The gallery event itself depends on the very last upload.
    store
        .save_dependent_queued_request(
            &self.room_id,
            &previous_txn,
            send_event_txn.clone().into(),
            created_at,
            DependentQueuedRequestKind::FinishGallery {
                local_echo: Box::new(event),
                item_infos,
            },
        )
        .await?;

    self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_sizes);

    Ok(())
}
1713
1714 #[allow(clippy::too_many_arguments)]
1720 async fn push_thumbnail_and_media_uploads(
1721 &self,
1722 store: &DynStateStore,
1723 content_type: &Mime,
1724 send_event_txn: OwnedTransactionId,
1725 created_at: MilliSecondsSinceUnixEpoch,
1726 upload_file_txn: OwnedTransactionId,
1727 file_media_request: MediaRequestParameters,
1728 thumbnail: Option<QueueThumbnailInfo>,
1729 ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1730 if let Some(QueueThumbnailInfo {
1731 finish_upload_thumbnail_info: thumbnail_info,
1732 media_request_parameters: thumbnail_media_request,
1733 content_type: thumbnail_content_type,
1734 ..
1735 }) = thumbnail
1736 {
1737 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1738
1739 store
1741 .save_send_queue_request(
1742 &self.room_id,
1743 upload_thumbnail_txn.clone(),
1744 created_at,
1745 QueuedRequestKind::MediaUpload {
1746 content_type: thumbnail_content_type.to_string(),
1747 cache_key: thumbnail_media_request,
1748 thumbnail_source: None, related_to: send_event_txn.clone(),
1750 #[cfg(feature = "unstable-msc4274")]
1751 accumulated: vec![],
1752 },
1753 Self::LOW_PRIORITY,
1754 )
1755 .await?;
1756
1757 store
1759 .save_dependent_queued_request(
1760 &self.room_id,
1761 &upload_thumbnail_txn,
1762 upload_file_txn.into(),
1763 created_at,
1764 DependentQueuedRequestKind::UploadFileOrThumbnail {
1765 content_type: content_type.to_string(),
1766 cache_key: file_media_request,
1767 related_to: send_event_txn,
1768 parent_is_thumbnail_upload: true,
1769 },
1770 )
1771 .await?;
1772
1773 Ok(Some(thumbnail_info))
1774 } else {
1775 store
1777 .save_send_queue_request(
1778 &self.room_id,
1779 upload_file_txn,
1780 created_at,
1781 QueuedRequestKind::MediaUpload {
1782 content_type: content_type.to_string(),
1783 cache_key: file_media_request,
1784 thumbnail_source: None,
1785 related_to: send_event_txn,
1786 #[cfg(feature = "unstable-msc4274")]
1787 accumulated: vec![],
1788 },
1789 Self::LOW_PRIORITY,
1790 )
1791 .await?;
1792
1793 Ok(None)
1794 }
1795 }
1796
1797 #[instrument(skip(self))]
1799 async fn react(
1800 &self,
1801 transaction_id: &TransactionId,
1802 key: String,
1803 created_at: MilliSecondsSinceUnixEpoch,
1804 ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1805 let guard = self.store.lock().await;
1806 let client = guard.client()?;
1807 let store = client.state_store();
1808
1809 let requests = store.load_send_queue_requests(&self.room_id).await?;
1810
1811 if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1813 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1816 if !dependent_requests
1817 .into_iter()
1818 .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1819 .any(|child_txn| *child_txn == *transaction_id)
1820 {
1821 return Ok(None);
1823 }
1824 }
1825
1826 let reaction_txn_id = ChildTransactionId::new();
1828 store
1829 .save_dependent_queued_request(
1830 &self.room_id,
1831 transaction_id,
1832 reaction_txn_id.clone(),
1833 created_at,
1834 DependentQueuedRequestKind::ReactEvent { key },
1835 )
1836 .await?;
1837
1838 Ok(Some(reaction_txn_id))
1839 }
1840
1841 async fn local_echoes(
1844 &self,
1845 room: &RoomSendQueue,
1846 ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1847 let guard = self.store.lock().await;
1848 let client = guard.client()?;
1849 let store = client.state_store();
1850
1851 let local_requests =
1852 store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1853 Some(LocalEcho {
1854 transaction_id: queued.transaction_id.clone(),
1855 content: match queued.kind {
1856 QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1857 serialized_event: content,
1858 send_handle: SendHandle {
1859 room: room.clone(),
1860 transaction_id: queued.transaction_id,
1861 media_handles: vec![],
1862 created_at: queued.created_at,
1863 },
1864 send_error: queued.error,
1865 },
1866
1867 QueuedRequestKind::MediaUpload { .. } => {
1868 return None;
1871 }
1872 },
1873 })
1874 });
1875
1876 let reactions_and_medias = store
1877 .load_dependent_queued_requests(&self.room_id)
1878 .await?
1879 .into_iter()
1880 .filter_map(|dep| match dep.kind {
1881 DependentQueuedRequestKind::EditEvent { .. }
1882 | DependentQueuedRequestKind::RedactEvent => {
1883 None
1885 }
1886
1887 DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1888 transaction_id: dep.own_transaction_id.clone().into(),
1889 content: LocalEchoContent::React {
1890 key,
1891 send_handle: SendReactionHandle {
1892 room: room.clone(),
1893 transaction_id: dep.own_transaction_id,
1894 },
1895 applies_to: dep.parent_transaction_id,
1896 },
1897 }),
1898
1899 DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
1900 None
1902 }
1903
1904 DependentQueuedRequestKind::FinishUpload {
1905 local_echo,
1906 file_upload,
1907 thumbnail_info,
1908 } => {
1909 Some(LocalEcho {
1911 transaction_id: dep.own_transaction_id.clone().into(),
1912 content: LocalEchoContent::Event {
1913 serialized_event: SerializableEventContent::new(&(*local_echo).into())
1914 .ok()?,
1915 send_handle: SendHandle {
1916 room: room.clone(),
1917 transaction_id: dep.own_transaction_id.into(),
1918 media_handles: vec![MediaHandles {
1919 upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1920 upload_file_txn: file_upload,
1921 }],
1922 created_at: dep.created_at,
1923 },
1924 send_error: None,
1925 },
1926 })
1927 }
1928
1929 #[cfg(feature = "unstable-msc4274")]
1930 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
1931 self.create_gallery_local_echo(
1933 dep.own_transaction_id,
1934 room,
1935 dep.created_at,
1936 local_echo,
1937 item_infos,
1938 )
1939 }
1940 });
1941
1942 Ok(local_requests.chain(reactions_and_medias).collect())
1943 }
1944
/// Builds the local echo of a gallery event from its `FinishGallery`
/// dependent request.
///
/// Each gallery item contributes one `MediaHandles` entry to the resulting
/// send handle. Returns `None` when the gallery content can't be serialized.
#[cfg(feature = "unstable-msc4274")]
fn create_gallery_local_echo(
    &self,
    transaction_id: ChildTransactionId,
    room: &RoomSendQueue,
    created_at: MilliSecondsSinceUnixEpoch,
    local_echo: Box<RoomMessageEventContent>,
    item_infos: Vec<FinishGalleryItemInfo>,
) -> Option<LocalEcho> {
    let serialized_event = SerializableEventContent::new(&(*local_echo).into()).ok()?;

    let mut media_handles = Vec::with_capacity(item_infos.len());
    for item in item_infos {
        media_handles.push(MediaHandles {
            upload_thumbnail_txn: item.thumbnail_info.map(|info| info.txn),
            upload_file_txn: item.file_upload,
        });
    }

    let send_handle = SendHandle {
        room: room.clone(),
        transaction_id: transaction_id.clone().into(),
        media_handles,
        created_at,
    };

    Some(LocalEcho {
        transaction_id: transaction_id.into(),
        content: LocalEchoContent::Event { serialized_event, send_handle, send_error: None },
    })
}
1975
1976 #[instrument(skip_all)]
1984 async fn try_apply_single_dependent_request(
1985 &self,
1986 client: &Client,
1987 dependent_request: DependentQueuedRequest,
1988 new_updates: &mut Vec<RoomSendQueueUpdate>,
1989 ) -> Result<bool, RoomSendQueueError> {
1990 let store = client.state_store();
1991
1992 let parent_key = dependent_request.parent_key;
1993
1994 match dependent_request.kind {
1995 DependentQueuedRequestKind::EditEvent { new_content } => {
1996 if let Some(parent_key) = parent_key {
1997 let Some(event_id) = parent_key.into_event_id() else {
1998 return Err(RoomSendQueueError::StorageError(
1999 RoomSendQueueStorageError::InvalidParentKey,
2000 ));
2001 };
2002
2003 let room = client
2005 .get_room(&self.room_id)
2006 .ok_or(RoomSendQueueError::RoomDisappeared)?;
2007
2008 let edited_content = match new_content.deserialize() {
2012 Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
2013 EditedContent::RoomMessage(c.into())
2015 }
2016
2017 Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
2018 let poll_start = c.poll_start().clone();
2019 EditedContent::PollStart {
2020 fallback_text: poll_start.question.text.clone(),
2021 new_content: poll_start,
2022 }
2023 }
2024
2025 Ok(c) => {
2026 warn!("Unsupported edit content type: {:?}", c.event_type());
2027 return Ok(true);
2028 }
2029
2030 Err(err) => {
2031 warn!("Unable to deserialize: {err}");
2032 return Ok(true);
2033 }
2034 };
2035
2036 let edit_event = match room.make_edit_event(&event_id, edited_content).await {
2037 Ok(e) => e,
2038 Err(err) => {
2039 warn!("couldn't create edited event: {err}");
2040 return Ok(true);
2041 }
2042 };
2043
2044 let serializable = SerializableEventContent::from_raw(
2046 Raw::new(&edit_event)
2047 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2048 edit_event.event_type().to_string(),
2049 );
2050
2051 store
2052 .save_send_queue_request(
2053 &self.room_id,
2054 dependent_request.own_transaction_id.into(),
2055 dependent_request.created_at,
2056 serializable.into(),
2057 Self::HIGH_PRIORITY,
2058 )
2059 .await
2060 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2061 } else {
2062 let edited = store
2064 .update_send_queue_request(
2065 &self.room_id,
2066 &dependent_request.parent_transaction_id,
2067 new_content.into(),
2068 )
2069 .await
2070 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2071
2072 if !edited {
2073 warn!("missing local echo upon dependent edit");
2074 }
2075 }
2076 }
2077
2078 DependentQueuedRequestKind::RedactEvent => {
2079 if let Some(parent_key) = parent_key {
2080 let Some(event_id) = parent_key.into_event_id() else {
2081 return Err(RoomSendQueueError::StorageError(
2082 RoomSendQueueStorageError::InvalidParentKey,
2083 ));
2084 };
2085
2086 let room = client
2088 .get_room(&self.room_id)
2089 .ok_or(RoomSendQueueError::RoomDisappeared)?;
2090
2091 if let Err(err) = room
2099 .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
2100 .await
2101 {
2102 warn!("error when sending a redact for {event_id}: {err}");
2103 return Ok(false);
2104 }
2105 } else {
2106 let removed = store
2109 .remove_send_queue_request(
2110 &self.room_id,
2111 &dependent_request.parent_transaction_id,
2112 )
2113 .await
2114 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2115
2116 if !removed {
2117 warn!("missing local echo upon dependent redact");
2118 }
2119 }
2120 }
2121
2122 DependentQueuedRequestKind::ReactEvent { key } => {
2123 if let Some(parent_key) = parent_key {
2124 let Some(parent_event_id) = parent_key.into_event_id() else {
2125 return Err(RoomSendQueueError::StorageError(
2126 RoomSendQueueStorageError::InvalidParentKey,
2127 ));
2128 };
2129
2130 let react_event =
2132 ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
2133 let serializable = SerializableEventContent::from_raw(
2134 Raw::new(&react_event)
2135 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2136 react_event.event_type().to_string(),
2137 );
2138
2139 store
2140 .save_send_queue_request(
2141 &self.room_id,
2142 dependent_request.own_transaction_id.into(),
2143 dependent_request.created_at,
2144 serializable.into(),
2145 Self::HIGH_PRIORITY,
2146 )
2147 .await
2148 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2149 } else {
2150 return Ok(false);
2152 }
2153 }
2154
2155 DependentQueuedRequestKind::UploadFileOrThumbnail {
2156 content_type,
2157 cache_key,
2158 related_to,
2159 parent_is_thumbnail_upload,
2160 } => {
2161 let Some(parent_key) = parent_key else {
2162 return Ok(false);
2164 };
2165 self.handle_dependent_file_or_thumbnail_upload(
2166 client,
2167 dependent_request.own_transaction_id.into(),
2168 parent_key,
2169 content_type,
2170 cache_key,
2171 related_to,
2172 parent_is_thumbnail_upload,
2173 )
2174 .await?;
2175 }
2176
2177 DependentQueuedRequestKind::FinishUpload {
2178 local_echo,
2179 file_upload,
2180 thumbnail_info,
2181 } => {
2182 let Some(parent_key) = parent_key else {
2183 return Ok(false);
2185 };
2186 self.handle_dependent_finish_upload(
2187 client,
2188 dependent_request.own_transaction_id.into(),
2189 parent_key,
2190 *local_echo,
2191 file_upload,
2192 thumbnail_info,
2193 new_updates,
2194 )
2195 .await?;
2196 }
2197
2198 #[cfg(feature = "unstable-msc4274")]
2199 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
2200 let Some(parent_key) = parent_key else {
2201 return Ok(false);
2203 };
2204 self.handle_dependent_finish_gallery_upload(
2205 client,
2206 dependent_request.own_transaction_id.into(),
2207 parent_key,
2208 *local_echo,
2209 item_infos,
2210 new_updates,
2211 )
2212 .await?;
2213 }
2214 }
2215
2216 Ok(true)
2217 }
2218
2219 #[instrument(skip(self))]
2220 async fn apply_dependent_requests(
2221 &self,
2222 new_updates: &mut Vec<RoomSendQueueUpdate>,
2223 ) -> Result<(), RoomSendQueueError> {
2224 let guard = self.store.lock().await;
2225
2226 let client = guard.client()?;
2227 let store = client.state_store();
2228
2229 let dependent_requests = store
2230 .load_dependent_queued_requests(&self.room_id)
2231 .await
2232 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2233
2234 let num_initial_dependent_requests = dependent_requests.len();
2235 if num_initial_dependent_requests == 0 {
2236 return Ok(());
2238 }
2239
2240 let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
2241
2242 for original in &dependent_requests {
2244 if !canonicalized_dependent_requests
2245 .iter()
2246 .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
2247 {
2248 store
2249 .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
2250 .await
2251 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2252 }
2253 }
2254
2255 let mut num_dependent_requests = canonicalized_dependent_requests.len();
2256
2257 debug!(
2258 num_dependent_requests,
2259 num_initial_dependent_requests, "starting handling of dependent requests"
2260 );
2261
2262 for dependent in canonicalized_dependent_requests {
2263 let dependent_id = dependent.own_transaction_id.clone();
2264
2265 match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
2266 Ok(should_remove) => {
2267 if should_remove {
2268 store
2270 .remove_dependent_queued_request(&self.room_id, &dependent_id)
2271 .await
2272 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2273
2274 num_dependent_requests -= 1;
2275 }
2276 }
2277
2278 Err(err) => {
2279 warn!("error when applying single dependent request: {err}");
2280 }
2281 }
2282 }
2283
2284 debug!(
2285 leftover_dependent_requests = num_dependent_requests,
2286 "stopped handling dependent request"
2287 );
2288
2289 Ok(())
2290 }
2291
2292 async fn remove_dependent_send_queue_request(
2294 &self,
2295 dependent_event_id: &ChildTransactionId,
2296 ) -> Result<bool, RoomSendQueueStorageError> {
2297 Ok(self
2298 .store
2299 .lock()
2300 .await
2301 .client()?
2302 .state_store()
2303 .remove_dependent_queued_request(&self.room_id, dependent_event_id)
2304 .await?)
2305 }
2306}
2307
/// Everything needed to queue the upload of one gallery item.
#[cfg(feature = "unstable-msc4274")]
struct GalleryItemQueueInfo {
    /// MIME type of the item's file.
    content_type: Mime,

    /// Transaction id used for the upload of the item's file.
    upload_file_txn: OwnedTransactionId,

    /// Media request parameters stored as the cache key of the file upload.
    file_media_request: MediaRequestParameters,

    /// Queue information about the item's thumbnail, if it has one.
    thumbnail: Option<QueueThumbnailInfo>,
}
2316
2317#[derive(Clone, Debug)]
2319pub enum LocalEchoContent {
2320 Event {
2322 serialized_event: SerializableEventContent,
2325 send_handle: SendHandle,
2327 send_error: Option<QueueWedgeError>,
2330 },
2331
2332 React {
2334 key: String,
2336 send_handle: SendReactionHandle,
2338 applies_to: OwnedTransactionId,
2340 },
2341}
2342
2343#[derive(Clone, Debug)]
2346pub struct LocalEcho {
2347 pub transaction_id: OwnedTransactionId,
2349 pub content: LocalEchoContent,
2351}
2352
2353#[derive(Clone, Debug)]
2356pub enum RoomSendQueueUpdate {
2357 NewLocalEvent(LocalEcho),
2362
2363 CancelledLocalEvent {
2366 transaction_id: OwnedTransactionId,
2368 },
2369
2370 ReplacedLocalEvent {
2372 transaction_id: OwnedTransactionId,
2374
2375 new_content: SerializableEventContent,
2377 },
2378
2379 SendError {
2384 transaction_id: OwnedTransactionId,
2386 error: Arc<crate::Error>,
2388 is_recoverable: bool,
2394 },
2395
2396 RetryEvent {
2398 transaction_id: OwnedTransactionId,
2400 },
2401
2402 SentEvent {
2405 transaction_id: OwnedTransactionId,
2407 event_id: OwnedEventId,
2409 },
2410
2411 MediaUpload {
2414 related_to: OwnedTransactionId,
2416
2417 file: Option<MediaSource>,
2419
2420 index: u64,
2424
2425 progress: AbstractProgress,
2429 },
2430}
2431
2432#[derive(Clone, Debug)]
2437pub struct SendQueueUpdate {
2438 pub room_id: OwnedRoomId,
2440
2441 pub update: RoomSendQueueUpdate,
2443}
2444
2445#[derive(Debug, thiserror::Error)]
2447pub enum RoomSendQueueError {
2448 #[error("the room isn't in the joined state")]
2450 RoomNotJoined,
2451
2452 #[error("the room is now missing from the client")]
2456 RoomDisappeared,
2457
2458 #[error(transparent)]
2460 StorageError(#[from] RoomSendQueueStorageError),
2461
2462 #[error("the attachment event could not be created")]
2464 FailedToCreateAttachment,
2465
2466 #[cfg(feature = "unstable-msc4274")]
2468 #[error("the gallery contains no items")]
2469 EmptyGallery,
2470
2471 #[cfg(feature = "unstable-msc4274")]
2473 #[error("the gallery event could not be created")]
2474 FailedToCreateGallery,
2475}
2476
2477#[derive(Debug, thiserror::Error)]
2479pub enum RoomSendQueueStorageError {
2480 #[error(transparent)]
2482 StateStoreError(#[from] StoreError),
2483
2484 #[error(transparent)]
2486 EventCacheStoreError(#[from] EventCacheStoreError),
2487
2488 #[error(transparent)]
2490 MediaStoreError(#[from] MediaStoreError),
2491
2492 #[error(transparent)]
2494 LockError(#[from] CrossProcessLockError),
2495
2496 #[error(transparent)]
2498 JsonSerialization(#[from] serde_json::Error),
2499
2500 #[error("a dependent event had an invalid parent key type")]
2503 InvalidParentKey,
2504
2505 #[error("The client is shutting down.")]
2507 ClientShuttingDown,
2508
2509 #[error("This operation is not implemented for media uploads")]
2511 OperationNotImplementedYet,
2512
2513 #[error("Can't edit a media caption when the underlying event isn't a media")]
2515 InvalidMediaCaptionEdit,
2516}
2517
2518#[derive(Clone, Debug)]
2520struct MediaHandles {
2521 upload_thumbnail_txn: Option<OwnedTransactionId>,
2525
2526 upload_file_txn: OwnedTransactionId,
2528}
2529
2530#[derive(Clone, Debug)]
2532pub struct SendHandle {
2533 room: RoomSendQueue,
2535
2536 transaction_id: OwnedTransactionId,
2541
2542 media_handles: Vec<MediaHandles>,
2544
2545 pub created_at: MilliSecondsSinceUnixEpoch,
2547}
2548
2549impl SendHandle {
2550 #[cfg(test)]
2552 pub(crate) fn new(
2553 room: RoomSendQueue,
2554 transaction_id: OwnedTransactionId,
2555 created_at: MilliSecondsSinceUnixEpoch,
2556 ) -> Self {
2557 Self { room, transaction_id, media_handles: vec![], created_at }
2558 }
2559
2560 fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2561 if !self.media_handles.is_empty() {
2562 Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2563 } else {
2564 Ok(())
2565 }
2566 }
2567
2568 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2573 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2574 trace!("received an abort request");
2575
2576 let queue = &self.room.inner.queue;
2577
2578 for handles in &self.media_handles {
2579 if queue.abort_upload(&self.transaction_id, handles).await? {
2580 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2582 transaction_id: self.transaction_id.clone(),
2583 });
2584
2585 return Ok(true);
2586 }
2587
2588 }
2592
2593 if queue.cancel_event(&self.transaction_id).await? {
2594 trace!("successful abort");
2595
2596 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2598 transaction_id: self.transaction_id.clone(),
2599 });
2600
2601 Ok(true)
2602 } else {
2603 debug!("local echo didn't exist anymore, can't abort");
2604 Ok(false)
2605 }
2606 }
2607
2608 #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2613 pub async fn edit_raw(
2614 &self,
2615 new_content: Raw<AnyMessageLikeEventContent>,
2616 event_type: String,
2617 ) -> Result<bool, RoomSendQueueStorageError> {
2618 trace!("received an edit request");
2619 self.nyi_for_uploads()?;
2620
2621 let serializable = SerializableEventContent::from_raw(new_content, event_type);
2622
2623 if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2624 trace!("successful edit");
2625
2626 self.room.inner.notifier.notify_one();
2628
2629 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2631 transaction_id: self.transaction_id.clone(),
2632 new_content: serializable,
2633 });
2634
2635 Ok(true)
2636 } else {
2637 debug!("local echo doesn't exist anymore, can't edit");
2638 Ok(false)
2639 }
2640 }
2641
2642 pub async fn edit(
2647 &self,
2648 new_content: AnyMessageLikeEventContent,
2649 ) -> Result<bool, RoomSendQueueStorageError> {
2650 self.edit_raw(
2651 Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2652 new_content.event_type().to_string(),
2653 )
2654 .await
2655 }
2656
2657 pub async fn edit_media_caption(
2662 &self,
2663 caption: Option<String>,
2664 formatted_caption: Option<FormattedBody>,
2665 mentions: Option<Mentions>,
2666 ) -> Result<bool, RoomSendQueueStorageError> {
2667 if let Some(new_content) = self
2668 .room
2669 .inner
2670 .queue
2671 .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2672 .await?
2673 {
2674 trace!("successful edit of media caption");
2675
2676 self.room.inner.notifier.notify_one();
2678
2679 let new_content = SerializableEventContent::new(&new_content)
2680 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2681
2682 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2684 transaction_id: self.transaction_id.clone(),
2685 new_content,
2686 });
2687
2688 Ok(true)
2689 } else {
2690 debug!("local echo doesn't exist anymore, can't edit media caption");
2691 Ok(false)
2692 }
2693 }
2694
2695 pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2698 let room = &self.room.inner;
2699 room.queue
2700 .mark_as_unwedged(&self.transaction_id)
2701 .await
2702 .map_err(RoomSendQueueError::StorageError)?;
2703
2704 for handles in &self.media_handles {
2712 room.queue
2713 .mark_as_unwedged(&handles.upload_file_txn)
2714 .await
2715 .map_err(RoomSendQueueError::StorageError)?;
2716
2717 if let Some(txn) = &handles.upload_thumbnail_txn {
2718 room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2719 }
2720 }
2721
2722 room.notifier.notify_one();
2724
2725 self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2726 transaction_id: self.transaction_id.clone(),
2727 });
2728
2729 Ok(())
2730 }
2731
2732 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2737 pub async fn react(
2738 &self,
2739 key: String,
2740 ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2741 trace!("received an intent to react");
2742
2743 let created_at = MilliSecondsSinceUnixEpoch::now();
2744 if let Some(reaction_txn_id) =
2745 self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2746 {
2747 trace!("successfully queued react");
2748
2749 self.room.inner.notifier.notify_one();
2751
2752 let send_handle = SendReactionHandle {
2754 room: self.room.clone(),
2755 transaction_id: reaction_txn_id.clone(),
2756 };
2757
2758 self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2759 transaction_id: reaction_txn_id.into(),
2762 content: LocalEchoContent::React {
2763 key,
2764 send_handle: send_handle.clone(),
2765 applies_to: self.transaction_id.clone(),
2766 },
2767 }));
2768
2769 Ok(Some(send_handle))
2770 } else {
2771 debug!("local echo doesn't exist anymore, can't react");
2772 Ok(None)
2773 }
2774 }
2775}
2776
2777#[derive(Clone, Debug)]
2779pub struct SendReactionHandle {
2780 room: RoomSendQueue,
2782 transaction_id: ChildTransactionId,
2784}
2785
2786impl SendReactionHandle {
2787 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2792 if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2793 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2797 transaction_id: self.transaction_id.clone().into(),
2798 });
2799
2800 return Ok(true);
2801 }
2802
2803 let handle = SendHandle {
2806 room: self.room.clone(),
2807 transaction_id: self.transaction_id.clone().into(),
2808 media_handles: vec![],
2809 created_at: MilliSecondsSinceUnixEpoch::now(),
2810 };
2811
2812 handle.abort().await
2813 }
2814
2815 pub fn transaction_id(&self) -> &TransactionId {
2817 &self.transaction_id
2818 }
2819}
2820
2821fn canonicalize_dependent_requests(
2825 dependent: &[DependentQueuedRequest],
2826) -> Vec<DependentQueuedRequest> {
2827 let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2828
2829 for d in dependent {
2830 let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2831
2832 if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2833 continue;
2836 }
2837
2838 match &d.kind {
2839 DependentQueuedRequestKind::EditEvent { .. } => {
2840 if let Some(prev_edit) = prevs
2842 .iter_mut()
2843 .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2844 {
2845 *prev_edit = d;
2846 } else {
2847 prevs.insert(0, d);
2848 }
2849 }
2850
2851 DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
2852 | DependentQueuedRequestKind::FinishUpload { .. }
2853 | DependentQueuedRequestKind::ReactEvent { .. } => {
2854 prevs.push(d);
2856 }
2857
2858 #[cfg(feature = "unstable-msc4274")]
2859 DependentQueuedRequestKind::FinishGallery { .. } => {
2860 prevs.push(d);
2862 }
2863
2864 DependentQueuedRequestKind::RedactEvent => {
2865 prevs.clear();
2867 prevs.push(d);
2868 }
2869 }
2870 }
2871
2872 by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2873}
2874
2875#[cfg(all(test, not(target_family = "wasm")))]
2876mod tests {
2877 use std::{sync::Arc, time::Duration};
2878
2879 use assert_matches2::{assert_let, assert_matches};
2880 use matrix_sdk_base::store::{
2881 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2882 SerializableEventContent,
2883 };
2884 use matrix_sdk_test::{JoinedRoomBuilder, SyncResponseBuilder, async_test};
2885 use ruma::{
2886 MilliSecondsSinceUnixEpoch, TransactionId,
2887 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
2888 room_id,
2889 };
2890
2891 use super::canonicalize_dependent_requests;
2892 use crate::{client::WeakClient, test_utils::logged_in_client};
2893
/// Canonicalizing a single edit keeps its content, parent transaction id
/// and creation timestamp untouched.
#[test]
fn test_canonicalize_dependent_events_created_at() {
    let parent_txn = TransactionId::new();
    let created_at = MilliSecondsSinceUnixEpoch::now();

    let edit_content =
        SerializableEventContent::new(&RoomMessageEventContent::text_plain("edit").into())
            .unwrap();

    let edit = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_txn.clone(),
        kind: DependentQueuedRequestKind::EditEvent { new_content: edit_content },
        parent_key: None,
        created_at,
    };

    let canonical = canonicalize_dependent_requests(&[edit]);

    assert_eq!(canonical.len(), 1);
    let only = &canonical[0];

    assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &only.kind);
    assert_let!(
        AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
    );
    assert_eq!(msg.body(), "edit");
    assert_eq!(only.parent_transaction_id, parent_txn);
    assert_eq!(only.created_at, created_at);
}
2925
2926 #[async_test]
2927 async fn test_client_no_cycle_with_send_queue() {
2928 for enabled in [true, false] {
2929 let client = logged_in_client(None).await;
2930 let weak_client = WeakClient::from_client(&client);
2931
2932 {
2933 let mut sync_response_builder = SyncResponseBuilder::new();
2934
2935 let room_id = room_id!("!a:b.c");
2936
2937 client
2939 .base_client()
2940 .receive_sync_response(
2941 sync_response_builder
2942 .add_joined_room(JoinedRoomBuilder::new(room_id))
2943 .build_sync_response(),
2944 )
2945 .await
2946 .unwrap();
2947
2948 let room = client.get_room(room_id).unwrap();
2949 let q = room.send_queue();
2950
2951 let _watcher = q.subscribe().await;
2952
2953 client.send_queue().set_enabled(enabled).await;
2954 }
2955
2956 drop(client);
2957
2958 tokio::time::sleep(Duration::from_millis(500)).await;
2960
2961 let client = weak_client.get();
2963 assert!(
2964 client.is_none(),
2965 "too many strong references to the client: {}",
2966 Arc::strong_count(&client.unwrap().inner)
2967 );
2968 }
2969 }
2970
/// Smoke test: a list holding a single dependent request canonicalizes to
/// that same request.
#[test]
fn test_canonicalize_dependent_events_smoke_test() {
    let parent_id = TransactionId::new();

    let edited_content =
        SerializableEventContent::new(&RoomMessageEventContent::text_plain("edit").into())
            .unwrap();

    let request = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_id.clone(),
        kind: DependentQueuedRequestKind::EditEvent { new_content: edited_content },
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let canonical = canonicalize_dependent_requests(&[request]);
    assert_eq!(canonical.len(), 1);

    let only = &canonical[0];
    assert_matches!(&only.kind, DependentQueuedRequestKind::EditEvent { .. });
    assert_eq!(only.parent_transaction_id, parent_id);
    assert!(only.parent_key.is_none());
}
2995
/// When a redaction sits among many edits of the same parent, the
/// canonical form consists of the redaction alone.
#[test]
fn test_canonicalize_dependent_events_redaction_preferred() {
    let parent_id = TransactionId::new();

    let redaction = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_id.clone(),
        kind: DependentQueuedRequestKind::RedactEvent,
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let edit_template = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_id.clone(),
        kind: DependentQueuedRequestKind::EditEvent {
            new_content: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain("edit").into(),
            )
            .unwrap(),
        },
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    // Each edit is a copy of the template carrying its own fresh child id.
    let fresh_edit = || DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        ..edit_template.clone()
    };

    // Layout: one edit, then the redaction, then 98 more edits.
    let mut inputs = Vec::with_capacity(100);
    inputs.push(fresh_edit());
    inputs.push(redaction);
    inputs.extend((0..98).map(|_| fresh_edit()));

    let canonical = canonicalize_dependent_requests(&inputs);
    assert_eq!(canonical.len(), 1);

    let survivor = &canonical[0];
    assert_matches!(&survivor.kind, DependentQueuedRequestKind::RedactEvent);
    assert_eq!(survivor.parent_transaction_id, parent_id);
}
3043
/// Out of several edits targeting the same parent, only the last one of the
/// list survives canonicalization.
#[test]
fn test_canonicalize_dependent_events_last_edit_preferred() {
    let parent_txn = TransactionId::new();

    // Ten edits of the same parent, with bodies "edit0" through "edit9".
    let mut inputs = Vec::with_capacity(10);
    for i in 0..10 {
        let body = format!("edit{i}");
        inputs.push(DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: parent_txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain(body).into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        });
    }

    let expected_parent = inputs.last().unwrap().parent_transaction_id.clone();

    let canonical = canonicalize_dependent_requests(&inputs);
    assert_eq!(canonical.len(), 1);

    let survivor = &canonical[0];
    assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &survivor.kind);
    assert_let!(
        AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
    );
    assert_eq!(msg.body(), "edit9");
    assert_eq!(survivor.parent_transaction_id, expected_parent);
}
3076
/// Dependent requests that target different parent transactions must be
/// canonicalized independently: a redaction of one parent and an edit of
/// another both survive, each still attached to its own parent.
#[test]
fn test_canonicalize_multiple_local_echoes() {
    let txn1 = TransactionId::new();
    let txn2 = TransactionId::new();

    let child1 = ChildTransactionId::new();
    let child2 = ChildTransactionId::new();

    let inputs = vec![
        // This one pertains to `txn1`.
        DependentQueuedRequest {
            own_transaction_id: child1.clone(),
            kind: DependentQueuedRequestKind::RedactEvent,
            parent_transaction_id: txn1.clone(),
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        },
        // This one pertains to `txn2`.
        DependentQueuedRequest {
            own_transaction_id: child2.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_transaction_id: txn2.clone(),
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        },
    ];

    let res = canonicalize_dependent_requests(&inputs);

    // Requests for two distinct parents must both be kept.
    assert_eq!(res.len(), 2);

    // Track which inputs were found, so that a duplicated or unknown entry
    // cannot slip through a catch-all branch.
    let mut seen_redaction = false;
    let mut seen_edit = false;

    for dependent in res {
        if dependent.own_transaction_id == child1 {
            assert!(!seen_redaction, "the redaction was returned more than once");
            assert_eq!(dependent.parent_transaction_id, txn1);
            assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
            seen_redaction = true;
        } else if dependent.own_transaction_id == child2 {
            assert!(!seen_edit, "the edit was returned more than once");
            assert_eq!(dependent.parent_transaction_id, txn2);
            assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
            seen_edit = true;
        } else {
            panic!(
                "unexpected dependent request in the canonicalized output: {:?}",
                dependent.own_transaction_id
            );
        }
    }

    assert!(seen_redaction, "the redaction is missing from the canonicalized output");
    assert!(seen_edit, "the edit is missing from the canonicalized output");
}
3124
/// Given a reaction followed by an edit of the same parent, the canonical
/// order puts the edit first and the reaction second.
#[test]
fn test_canonicalize_reactions_after_edits() {
    let parent_id = TransactionId::new();

    let react_id = ChildTransactionId::new();
    let reaction = DependentQueuedRequest {
        own_transaction_id: react_id.clone(),
        kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
        parent_transaction_id: parent_id.clone(),
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let edit_id = ChildTransactionId::new();
    let edited_content =
        SerializableEventContent::new(&RoomMessageEventContent::text_plain("edit").into())
            .unwrap();
    let edit = DependentQueuedRequest {
        own_transaction_id: edit_id.clone(),
        kind: DependentQueuedRequestKind::EditEvent { new_content: edited_content },
        parent_transaction_id: parent_id,
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    // The reaction is deliberately placed before the edit in the input.
    let canonical = canonicalize_dependent_requests(&[reaction, edit]);
    assert_eq!(canonical.len(), 2);

    let (first, second) = (&canonical[0], &canonical[1]);
    assert_eq!(first.own_transaction_id, edit_id);
    assert_eq!(second.own_transaction_id, react_id);
}
3159}