matrix_sdk_base/store/send_queue.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All data types related to the send queue.
16
17use std::{collections::BTreeMap, fmt, ops::Deref};
18
19use as_variant::as_variant;
20use ruma::{
21 events::{
22 room::{message::RoomMessageEventContent, MediaSource},
23 AnyMessageLikeEventContent, EventContent as _, RawExt as _,
24 },
25 serde::Raw,
26 MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId,
27 TransactionId, UInt,
28};
29use serde::{Deserialize, Serialize};
30
31use crate::media::MediaRequestParameters;
32
33/// A thin wrapper to serialize a `AnyMessageLikeEventContent`.
34#[derive(Clone, Serialize, Deserialize)]
35pub struct SerializableEventContent {
36 event: Raw<AnyMessageLikeEventContent>,
37 event_type: String,
38}
39
40#[cfg(not(tarpaulin_include))]
41impl fmt::Debug for SerializableEventContent {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 // Don't include the event in the debug display.
44 f.debug_struct("SerializedEventContent")
45 .field("event_type", &self.event_type)
46 .finish_non_exhaustive()
47 }
48}
49
50impl SerializableEventContent {
51 /// Create a [`SerializableEventContent`] from a raw
52 /// [`AnyMessageLikeEventContent`] along with its type.
53 pub fn from_raw(event: Raw<AnyMessageLikeEventContent>, event_type: String) -> Self {
54 Self { event_type, event }
55 }
56
57 /// Create a [`SerializableEventContent`] from an
58 /// [`AnyMessageLikeEventContent`].
59 pub fn new(event: &AnyMessageLikeEventContent) -> Result<Self, serde_json::Error> {
60 Ok(Self::from_raw(Raw::new(event)?, event.event_type().to_string()))
61 }
62
63 /// Convert a [`SerializableEventContent`] back into a
64 /// [`AnyMessageLikeEventContent`].
65 pub fn deserialize(&self) -> Result<AnyMessageLikeEventContent, serde_json::Error> {
66 self.event.deserialize_with_type(self.event_type.clone().into())
67 }
68
69 /// Returns the raw event content along with its type.
70 ///
71 /// Useful for callers manipulating custom events.
72 pub fn raw(&self) -> (&Raw<AnyMessageLikeEventContent>, &str) {
73 (&self.event, &self.event_type)
74 }
75}
76
77/// The kind of a send queue request.
78#[derive(Clone, Debug, Serialize, Deserialize)]
79pub enum QueuedRequestKind {
80 /// An event to be sent via the send queue.
81 Event {
82 /// The content of the message-like event we'd like to send.
83 content: SerializableEventContent,
84 },
85
86 /// Content to upload on the media server.
87 ///
88 /// The bytes must be stored in the media cache, and are identified by the
89 /// cache key.
90 MediaUpload {
91 /// Content type of the media to be uploaded.
92 ///
93 /// Stored as a `String` because `Mime` which we'd really want to use
94 /// here, is not serializable. Oh well.
95 content_type: String,
96
97 /// The cache key used to retrieve the media's bytes in the event cache
98 /// store.
99 cache_key: MediaRequestParameters,
100
101 /// An optional media source for a thumbnail already uploaded.
102 thumbnail_source: Option<MediaSource>,
103
104 /// To which media event transaction does this upload relate?
105 related_to: OwnedTransactionId,
106
107 /// Accumulated list of infos for previously uploaded files and
108 /// thumbnails if used during a gallery transaction. Otherwise empty.
109 #[cfg(feature = "unstable-msc4274")]
110 #[serde(default)]
111 accumulated: Vec<AccumulatedSentMediaInfo>,
112 },
113}
114
115impl From<SerializableEventContent> for QueuedRequestKind {
116 fn from(content: SerializableEventContent) -> Self {
117 Self::Event { content }
118 }
119}
120
121/// A request to be sent with a send queue.
122#[derive(Clone)]
123pub struct QueuedRequest {
124 /// The kind of queued request we're going to send.
125 pub kind: QueuedRequestKind,
126
127 /// Unique transaction id for the queued request, acting as a key.
128 pub transaction_id: OwnedTransactionId,
129
130 /// Error returned when the request couldn't be sent and is stuck in the
131 /// unrecoverable state.
132 ///
133 /// `None` if the request is in the queue, waiting to be sent.
134 pub error: Option<QueueWedgeError>,
135
136 /// At which priority should this be handled?
137 ///
138 /// The bigger the value, the higher the priority at which this request
139 /// should be handled.
140 pub priority: usize,
141
142 /// The time that the request was originally attempted.
143 pub created_at: MilliSecondsSinceUnixEpoch,
144}
145
146impl QueuedRequest {
147 /// Returns `Some` if the queued request is about sending an event.
148 pub fn as_event(&self) -> Option<&SerializableEventContent> {
149 as_variant!(&self.kind, QueuedRequestKind::Event { content } => content)
150 }
151
152 /// True if the request couldn't be sent because of an unrecoverable API
153 /// error. See [`Self::error`] for more details on the reason.
154 pub fn is_wedged(&self) -> bool {
155 self.error.is_some()
156 }
157}
158
159/// Represents a failed to send unrecoverable error of an event sent via the
160/// send queue.
161///
162/// It is a serializable representation of a client error, see
163/// `From` implementation for more details. These errors can not be
164/// automatically retried, but yet some manual action can be taken before retry
165/// sending. If not the only solution is to delete the local event.
166#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
167pub enum QueueWedgeError {
168 /// This error occurs when there are some insecure devices in the room, and
169 /// the current encryption setting prohibits sharing with them.
170 #[error("There are insecure devices in the room")]
171 InsecureDevices {
172 /// The insecure devices as a Map of userID to deviceID.
173 user_device_map: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
174 },
175
176 /// This error occurs when a previously verified user is not anymore, and
177 /// the current encryption setting prohibits sharing when it happens.
178 #[error("Some users that were previously verified are not anymore")]
179 IdentityViolations {
180 /// The users that are expected to be verified but are not.
181 users: Vec<OwnedUserId>,
182 },
183
184 /// It is required to set up cross-signing and properly verify the current
185 /// session before sending.
186 #[error("Own verification is required")]
187 CrossVerificationRequired,
188
189 /// Media content was cached in the media store, but has disappeared before
190 /// we could upload it.
191 #[error("Media content disappeared")]
192 MissingMediaContent,
193
194 /// We tried to upload some media content with an unknown mime type.
195 #[error("Invalid mime type '{mime_type}' for media")]
196 InvalidMimeType {
197 /// The observed mime type that's expected to be invalid.
198 mime_type: String,
199 },
200
201 /// Other errors.
202 #[error("Other unrecoverable error: {msg}")]
203 GenericApiError {
204 /// Description of the error.
205 msg: String,
206 },
207}
208
209/// The specific user intent that characterizes a [`DependentQueuedRequest`].
210#[derive(Clone, Debug, Serialize, Deserialize)]
211pub enum DependentQueuedRequestKind {
212 /// The event should be edited.
213 EditEvent {
214 /// The new event for the content.
215 new_content: SerializableEventContent,
216 },
217
218 /// The event should be redacted/aborted/removed.
219 RedactEvent,
220
221 /// The event should be reacted to, with the given key.
222 ReactEvent {
223 /// Key used for the reaction.
224 key: String,
225 },
226
227 /// Upload a file or thumbnail depending on another file or thumbnail
228 /// upload.
229 #[serde(alias = "UploadFileWithThumbnail")]
230 UploadFileOrThumbnail {
231 /// Content type for the file or thumbnail.
232 content_type: String,
233
234 /// Media request necessary to retrieve the file or thumbnail itself.
235 cache_key: MediaRequestParameters,
236
237 /// To which media transaction id does this upload relate to?
238 related_to: OwnedTransactionId,
239
240 /// Whether the depended upon request was a thumbnail or a file upload.
241 #[cfg(feature = "unstable-msc4274")]
242 #[serde(default = "default_parent_is_thumbnail_upload")]
243 parent_is_thumbnail_upload: bool,
244 },
245
246 /// Finish an upload by updating references to the media cache and sending
247 /// the final media event with the remote MXC URIs.
248 FinishUpload {
249 /// Local echo for the event (containing the local MXC URIs).
250 ///
251 /// `Box` the local echo so that it reduces the size of the whole enum.
252 local_echo: Box<RoomMessageEventContent>,
253
254 /// Transaction id for the file upload.
255 file_upload: OwnedTransactionId,
256
257 /// Information about the thumbnail, if present.
258 thumbnail_info: Option<FinishUploadThumbnailInfo>,
259 },
260
261 /// Finish a gallery upload by updating references to the media cache and
262 /// sending the final gallery event with the remote MXC URIs.
263 #[cfg(feature = "unstable-msc4274")]
264 FinishGallery {
265 /// Local echo for the event (containing the local MXC URIs).
266 ///
267 /// `Box` the local echo so that it reduces the size of the whole enum.
268 local_echo: Box<RoomMessageEventContent>,
269
270 /// Metadata about the gallery items.
271 item_infos: Vec<FinishGalleryItemInfo>,
272 },
273}
274
275/// If parent_is_thumbnail_upload is missing, we assume the request is for a
276/// file upload following a thumbnail upload. This was the only possible case
277/// before parent_is_thumbnail_upload was introduced.
278#[cfg(feature = "unstable-msc4274")]
279fn default_parent_is_thumbnail_upload() -> bool {
280 true
281}
282
283/// Detailed record about a thumbnail used when finishing a media upload.
284#[derive(Clone, Debug, Serialize, Deserialize)]
285pub struct FinishUploadThumbnailInfo {
286 /// Transaction id for the thumbnail upload.
287 pub txn: OwnedTransactionId,
288 /// Thumbnail's width.
289 ///
290 /// Used previously, kept for backwards compatibility.
291 #[serde(default, skip_serializing_if = "Option::is_none")]
292 pub width: Option<UInt>,
293 /// Thumbnail's height.
294 ///
295 /// Used previously, kept for backwards compatibility.
296 #[serde(default, skip_serializing_if = "Option::is_none")]
297 pub height: Option<UInt>,
298}
299
300/// Detailed record about a file and thumbnail. When finishing a gallery
301/// upload, one [`FinishGalleryItemInfo`] will be used for each media in the
302/// gallery.
303#[cfg(feature = "unstable-msc4274")]
304#[derive(Clone, Debug, Serialize, Deserialize)]
305pub struct FinishGalleryItemInfo {
306 /// Transaction id for the file upload.
307 pub file_upload: OwnedTransactionId,
308 /// Information about the thumbnail, if present.
309 pub thumbnail_info: Option<FinishUploadThumbnailInfo>,
310}
311
312/// A transaction id identifying a [`DependentQueuedRequest`] rather than its
313/// parent [`QueuedRequest`].
314///
315/// This thin wrapper adds some safety to some APIs, making it possible to
316/// distinguish between the parent's `TransactionId` and the dependent event's
317/// own `TransactionId`.
318#[repr(transparent)]
319#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
320#[serde(transparent)]
321pub struct ChildTransactionId(OwnedTransactionId);
322
323impl ChildTransactionId {
324 /// Returns a new [`ChildTransactionId`].
325 #[allow(clippy::new_without_default)]
326 pub fn new() -> Self {
327 Self(TransactionId::new())
328 }
329}
330
331impl Deref for ChildTransactionId {
332 type Target = TransactionId;
333
334 fn deref(&self) -> &Self::Target {
335 &self.0
336 }
337}
338
339impl From<String> for ChildTransactionId {
340 fn from(val: String) -> Self {
341 Self(val.into())
342 }
343}
344
345impl From<ChildTransactionId> for OwnedTransactionId {
346 fn from(val: ChildTransactionId) -> Self {
347 val.0
348 }
349}
350
351impl From<OwnedTransactionId> for ChildTransactionId {
352 fn from(val: OwnedTransactionId) -> Self {
353 Self(val)
354 }
355}
356
357/// Information about a media (and its thumbnail) that have been sent to a
358/// homeserver.
359#[derive(Clone, Debug, Serialize, Deserialize)]
360pub struct SentMediaInfo {
361 /// File that was uploaded by this request.
362 ///
363 /// If the request related to a thumbnail upload, this contains the
364 /// thumbnail media source.
365 pub file: MediaSource,
366
367 /// Optional thumbnail previously uploaded, when uploading a file.
368 ///
369 /// When uploading a thumbnail, this is set to `None`.
370 pub thumbnail: Option<MediaSource>,
371
372 /// Accumulated list of infos for previously uploaded files and thumbnails
373 /// if used during a gallery transaction. Otherwise empty.
374 #[cfg(feature = "unstable-msc4274")]
375 #[serde(default)]
376 pub accumulated: Vec<AccumulatedSentMediaInfo>,
377}
378
379/// Accumulated information about a media (and its thumbnail) that have been
380/// sent to a homeserver.
381#[cfg(feature = "unstable-msc4274")]
382#[derive(Clone, Debug, Serialize, Deserialize)]
383pub struct AccumulatedSentMediaInfo {
384 /// File that was uploaded by this request.
385 ///
386 /// If the request related to a thumbnail upload, this contains the
387 /// thumbnail media source.
388 pub file: MediaSource,
389
390 /// Optional thumbnail previously uploaded, when uploading a file.
391 ///
392 /// When uploading a thumbnail, this is set to `None`.
393 pub thumbnail: Option<MediaSource>,
394}
395
396#[cfg(feature = "unstable-msc4274")]
397impl From<AccumulatedSentMediaInfo> for SentMediaInfo {
398 fn from(value: AccumulatedSentMediaInfo) -> Self {
399 Self { file: value.file, thumbnail: value.thumbnail, accumulated: vec![] }
400 }
401}
402
403/// A unique key (identifier) indicating that a transaction has been
404/// successfully sent to the server.
405///
406/// The owning child transactions can now be resolved.
407#[derive(Clone, Debug, Serialize, Deserialize)]
408pub enum SentRequestKey {
409 /// The parent transaction returned an event when it succeeded.
410 Event(OwnedEventId),
411
412 /// The parent transaction returned an uploaded resource URL.
413 Media(SentMediaInfo),
414}
415
416impl SentRequestKey {
417 /// Converts the current parent key into an event id, if possible.
418 pub fn into_event_id(self) -> Option<OwnedEventId> {
419 as_variant!(self, Self::Event)
420 }
421
422 /// Converts the current parent key into information about a sent media, if
423 /// possible.
424 pub fn into_media(self) -> Option<SentMediaInfo> {
425 as_variant!(self, Self::Media)
426 }
427}
428
429/// A request to be sent, depending on a [`QueuedRequest`] to be sent first.
430///
431/// Depending on whether the parent request has been sent or not, this will
432/// either update the local echo in the storage, or materialize an equivalent
433/// request implementing the user intent to the homeserver.
434#[derive(Clone, Debug, Serialize, Deserialize)]
435pub struct DependentQueuedRequest {
436 /// Unique identifier for this dependent queued request.
437 ///
438 /// Useful for deletion.
439 pub own_transaction_id: ChildTransactionId,
440
441 /// The kind of user intent.
442 pub kind: DependentQueuedRequestKind,
443
444 /// Transaction id for the parent's local echo / used in the server request.
445 ///
446 /// Note: this is the transaction id used for the depended-on request, i.e.
447 /// the one that was originally sent and that's being modified with this
448 /// dependent request.
449 pub parent_transaction_id: OwnedTransactionId,
450
451 /// If the parent request has been sent, the parent's request identifier
452 /// returned by the server once the local echo has been sent out.
453 pub parent_key: Option<SentRequestKey>,
454
455 /// The time that the request was originally attempted.
456 pub created_at: MilliSecondsSinceUnixEpoch,
457}
458
459impl DependentQueuedRequest {
460 /// Does the dependent request represent a new event that is *not*
461 /// aggregated, aka it is going to be its own item in a timeline?
462 pub fn is_own_event(&self) -> bool {
463 match self.kind {
464 DependentQueuedRequestKind::EditEvent { .. }
465 | DependentQueuedRequestKind::RedactEvent
466 | DependentQueuedRequestKind::ReactEvent { .. }
467 | DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
468 // These are all aggregated events, or non-visible items (file upload producing
469 // a new MXC ID).
470 false
471 }
472 DependentQueuedRequestKind::FinishUpload { .. } => {
473 // This one graduates into a new media event.
474 true
475 }
476 #[cfg(feature = "unstable-msc4274")]
477 DependentQueuedRequestKind::FinishGallery { .. } => {
478 // This one graduates into a new gallery event.
479 true
480 }
481 }
482 }
483}
484
485#[cfg(not(tarpaulin_include))]
486impl fmt::Debug for QueuedRequest {
487 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
488 // Hide the content from the debug log.
489 f.debug_struct("QueuedRequest")
490 .field("transaction_id", &self.transaction_id)
491 .field("is_wedged", &self.is_wedged())
492 .finish_non_exhaustive()
493 }
494}