use std::{
collections::{BTreeMap, HashMap},
str::FromStr as _,
sync::{
Arc, RwLock,
atomic::{AtomicBool, Ordering},
},
};
use eyeball::SharedObservable;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_base::crypto::{OlmError, SessionRecipientCollectionError};
#[cfg(feature = "unstable-msc4274")]
use matrix_sdk_base::store::FinishGalleryItemInfo;
use matrix_sdk_base::{
RoomState, StoreError,
cross_process_lock::CrossProcessLockError,
deserialized_responses::TimelineEvent,
event_cache::store::EventCacheStoreError,
media::{MediaRequestParameters, store::MediaStoreError},
store::{
ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
SentMediaInfo, SentRequestKey, SerializableEventContent,
},
};
use matrix_sdk_common::{
executor::{JoinHandle, spawn},
locks::Mutex as SyncMutex,
};
use mime::Mime;
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
TransactionId,
events::{
AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
reaction::ReactionEventContent,
relation::Annotation,
room::{
MediaSource,
message::{FormattedBody, RoomMessageEventContent},
},
},
serde::Raw,
};
use tokio::sync::{Mutex, Notify, OwnedMutexGuard, broadcast, oneshot};
use tracing::{debug, error, info, instrument, trace, warn};
use crate::{
Client, Media, Room, TransmissionProgress,
client::WeakClient,
config::RequestConfig,
error::RetryKind,
room::{WeakRoom, edit::EditedContent},
};
mod progress;
mod upload;
pub use progress::AbstractProgress;
/// Client-wide handle to the send queue.
///
/// This is a thin wrapper around a [`Client`]: all the shared state lives in
/// the client's `send_queue_data`, so creating a `SendQueue` only clones the
/// client handle.
pub struct SendQueue {
// Client whose `inner.send_queue_data` holds the per-room queues and flags.
client: Client,
}
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for SendQueue {
    /// Formats the queue as an opaque `SendQueue { .. }`, without exposing
    /// the wrapped client.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("SendQueue");
        builder.finish_non_exhaustive()
    }
}
impl SendQueue {
pub(super) fn new(client: Client) -> Self {
Self { client }
}
/// Makes sure a room queue (and thus its sending task) exists for every
/// room that still has unsent requests in the state store.
///
/// Does nothing when the send queue is globally disabled. A failure to load
/// the list of rooms is logged and treated as "no rooms".
pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
    if !self.is_enabled() {
        return;
    }

    let room_ids = match self.client.state_store().load_rooms_with_unsent_requests().await {
        Ok(room_ids) => room_ids,
        Err(err) => {
            warn!("error when loading rooms with unsent requests: {err}");
            Vec::new()
        }
    };

    // Rooms unknown to the client are silently skipped.
    for room in room_ids.iter().filter_map(|room_id| self.client.get_room(room_id)) {
        // Creating the room queue is what spawns the task; the handle itself
        // isn't needed here.
        let _ = self.for_room(room);
    }
}
/// Shortcut to the send queue state shared through the client.
#[inline(always)]
fn data(&self) -> &SendQueueData {
    let client_inner = &self.client.inner;
    &client_inner.send_queue_data
}
/// Returns the send queue of the given room, creating and registering it
/// first if it doesn't exist yet.
///
/// The write lock on the room map is held for the whole lookup-or-create
/// sequence, so a given room gets at most one queue.
pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
    let data = self.data();
    let mut rooms = data.rooms.write().unwrap();

    let room_id = room.room_id();
    if let Some(existing) = rooms.get(room_id) {
        return existing.clone();
    }

    let owned_room_id = room_id.to_owned();
    let created = RoomSendQueue::new(
        self.is_enabled(),
        data.global_update_sender.clone(),
        data.error_sender.clone(),
        data.is_dropping.clone(),
        &self.client,
        owned_room_id.clone(),
        data.report_media_upload_progress.clone(),
    );

    rooms.insert(owned_room_id, created.clone());
    created
}
/// Enables or disables the send queue globally and for every room queue
/// created so far, then makes sure rooms with unsent requests have a queue.
pub async fn set_enabled(&self, enabled: bool) {
    debug!(?enabled, "setting global send queue enablement");

    let data = self.data();
    data.globally_enabled.store(enabled, Ordering::SeqCst);

    {
        // Keep the read guard in its own scope so it is released before the
        // await below.
        let rooms = data.rooms.read().unwrap();
        rooms.values().for_each(|room| room.set_enabled(enabled));
    }

    self.respawn_tasks_for_rooms_with_unsent_requests().await;
}
/// Whether the send queue is globally enabled.
pub fn is_enabled(&self) -> bool {
    let flag = &self.data().globally_enabled;
    flag.load(Ordering::SeqCst)
}
/// Turns reporting of media upload progress on or off.
///
/// The flag is shared with the sending tasks, which read it before each
/// media upload.
pub fn enable_upload_progress(&self, enabled: bool) {
    let flag = &self.data().report_media_upload_progress;
    flag.store(enabled, Ordering::SeqCst);
}
/// Subscribes to the updates of all room send queues.
pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
    let sender = &self.data().global_update_sender;
    sender.subscribe()
}
/// Collects the local echoes of every room that has unsent requests,
/// keyed by room id.
///
/// A failure to load the list of rooms is logged and treated as "no rooms";
/// a failure to load the echoes of one room aborts the whole call. Rooms
/// unknown to the client are skipped.
pub async fn local_echoes(
    &self,
) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
    let room_ids = match self.client.state_store().load_rooms_with_unsent_requests().await {
        Ok(room_ids) => room_ids,
        Err(err) => {
            warn!("error when loading rooms with unsent requests: {err}");
            Vec::new()
        }
    };

    let mut echoes_per_room: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();

    for room_id in room_ids {
        let Some(room) = self.client.get_room(&room_id) else {
            continue;
        };

        let room_queue = self.for_room(room);
        let echoes = room_queue.inner.queue.local_echoes(&room_queue).await?;
        echoes_per_room.insert(room_id, echoes);
    }

    Ok(echoes_per_room)
}
/// Subscribes to the sending errors reported by any room send queue.
pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
    let sender = &self.data().error_sender;
    sender.subscribe()
}
}
/// Everything the queue storage needs to enqueue the upload of a thumbnail.
#[derive(Clone, Debug)]
struct QueueThumbnailInfo {
// Thumbnail details forwarded into the dependent "finish upload" request;
// its `txn` is used as the transaction id of the thumbnail upload.
finish_upload_thumbnail_info: FinishUploadThumbnailInfo,
// Used as the `cache_key` of the queued thumbnail upload request.
media_request_parameters: MediaRequestParameters,
// MIME type of the thumbnail, stored stringified in the queued request.
content_type: Mime,
// Recorded in `QueueStorage::thumbnail_file_sizes` when the media is pushed.
// NOTE(review): presumably a size in bytes — confirm against callers.
file_size: usize,
}
/// An error reported by a room's sending task, broadcast to the subscribers
/// of [`SendQueue::subscribe_errors`].
#[derive(Clone, Debug)]
pub struct SendQueueRoomError {
/// The room whose send queue ran into the error.
pub room_id: OwnedRoomId,
/// The error that happened while sending the request.
pub error: Arc<crate::Error>,
/// Whether the sending task classified the error as recoverable.
///
/// The task treats transient HTTP errors, network failures and failed
/// concurrent requests as recoverable. In both cases the room queue has
/// been disabled by the time this is sent; for unrecoverable errors the
/// request is additionally marked as wedged.
pub is_recoverable: bool,
}
impl Client {
    /// Returns a handle to this client's send queue.
    ///
    /// The handle only wraps a clone of the client; the queue state itself is
    /// shared through the client.
    pub fn send_queue(&self) -> SendQueue {
        let client = self.clone();
        SendQueue::new(client)
    }
}
/// Send queue state shared by all the handles of a client.
pub(super) struct SendQueueData {
// Room queues created so far, keyed by room id.
rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
// Global enablement flag; newly created room queues start with this value.
globally_enabled: AtomicBool,
// Broadcasts the updates of every room queue, tagged with the room id.
global_update_sender: broadcast::Sender<SendQueueUpdate>,
// Broadcasts the errors reported by the sending tasks.
error_sender: broadcast::Sender<SendQueueRoomError>,
// Set to true when this data is dropped, so that sending tasks exit.
is_dropping: Arc<AtomicBool>,
// Whether sending tasks should compute and report media upload progress.
report_media_upload_progress: Arc<AtomicBool>,
}
impl SendQueueData {
pub fn new(globally_enabled: bool) -> Self {
let (global_update_sender, _) = broadcast::channel(32);
let (error_sender, _) = broadcast::channel(32);
Self {
rooms: Default::default(),
globally_enabled: AtomicBool::new(globally_enabled),
global_update_sender,
error_sender,
is_dropping: Arc::new(false.into()),
report_media_upload_progress: Arc::new(false.into()),
}
}
}
impl Drop for SendQueueData {
    /// Signals every sending task that the send queue is going away.
    ///
    /// The `is_dropping` flag is raised first, then each room task is woken up
    /// so that it observes the flag and exits its loop.
    fn drop(&mut self) {
        debug!("globally dropping the send queue");
        self.is_dropping.store(true, Ordering::SeqCst);

        // `&mut self` gives exclusive access, so no locking is needed. A
        // poisoned lock is recovered from rather than unwrapped: panicking in
        // `drop` while already unwinding would abort the process, and the
        // tasks still need to be woken up in that case.
        let rooms = self.rooms.get_mut().unwrap_or_else(|poisoned| poisoned.into_inner());
        for room in rooms.values() {
            room.inner.notifier.notify_one();
        }
    }
}
impl Room {
    /// Returns the send queue of this room, creating it if needed.
    pub fn send_queue(&self) -> RoomSendQueue {
        let global_queue = self.client.send_queue();
        global_queue.for_room(self.clone())
    }
}
/// Handle to the send queue of a single room.
///
/// Cloning is cheap: all clones share the same inner state (and thus the
/// same sending task) through an `Arc`.
#[derive(Clone)]
pub struct RoomSendQueue {
// Shared state: queue storage, update channels, notifier and task handle.
inner: Arc<RoomSendQueueInner>,
}
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for RoomSendQueue {
    /// Formats the queue as an opaque `RoomSendQueue { .. }`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("RoomSendQueue");
        builder.finish_non_exhaustive()
    }
}
impl RoomSendQueue {
/// Creates the send queue of a room and spawns its sending task.
///
/// The queue starts enabled or disabled according to `globally_enabled`.
/// Only weak references to the client are handed to the storage and to the
/// task.
fn new(
    globally_enabled: bool,
    global_update_sender: broadcast::Sender<SendQueueUpdate>,
    global_error_sender: broadcast::Sender<SendQueueRoomError>,
    is_dropping: Arc<AtomicBool>,
    client: &Client,
    room_id: OwnedRoomId,
    report_media_upload_progress: Arc<AtomicBool>,
) -> Self {
    let (update_sender, _) = broadcast::channel(32);

    let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
    let room = WeakRoom::new(WeakClient::from_client(client), room_id);

    let notifier = Arc::new(Notify::new());
    let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));

    // The task gets its own handles on everything it shares with the queue.
    let sending_future = Self::sending_task(
        room.clone(),
        queue.clone(),
        Arc::clone(&notifier),
        global_update_sender.clone(),
        update_sender.clone(),
        Arc::clone(&locally_enabled),
        global_error_sender,
        is_dropping,
        report_media_upload_progress,
    );
    let task = spawn(sending_future);

    let inner = RoomSendQueueInner {
        room,
        global_update_sender,
        update_sender,
        _task: task,
        queue,
        notifier,
        locally_enabled,
    };

    Self { inner: Arc::new(inner) }
}
/// Queues a raw event for sending in this room and returns a handle to it.
///
/// The request is persisted, the sending task is woken up, and a
/// `NewLocalEvent` update carrying the local echo is broadcast.
///
/// # Errors
///
/// Fails if the room can no longer be retrieved, if the room isn't joined,
/// or if the request couldn't be saved in the queue storage.
pub async fn send_raw(
    &self,
    content: Raw<AnyMessageLikeEventContent>,
    event_type: String,
) -> Result<SendHandle, RoomSendQueueError> {
    let room = match self.inner.room.get() {
        Some(room) => room,
        None => return Err(RoomSendQueueError::RoomDisappeared),
    };

    if room.state() != RoomState::Joined {
        return Err(RoomSendQueueError::RoomNotJoined);
    }

    let serialized_event = SerializableEventContent::from_raw(content, event_type);
    let created_at = MilliSecondsSinceUnixEpoch::now();

    let transaction_id =
        self.inner.queue.push(serialized_event.clone().into(), created_at).await?;
    trace!(%transaction_id, "manager sends a raw event to the background task");

    self.inner.notifier.notify_one();

    let send_handle = SendHandle {
        room: self.clone(),
        transaction_id: transaction_id.clone(),
        media_handles: Vec::new(),
        created_at,
    };

    let local_echo = LocalEcho {
        transaction_id,
        content: LocalEchoContent::Event {
            serialized_event,
            send_handle: send_handle.clone(),
            send_error: None,
        },
    };
    self.send_update(RoomSendQueueUpdate::NewLocalEvent(local_echo));

    Ok(send_handle)
}
/// Queues a typed message-like event for sending in this room.
///
/// The content is serialized to raw JSON and handed to `send_raw` together
/// with its event type.
///
/// # Errors
///
/// Fails if the content can't be serialized, or for any reason `send_raw`
/// fails.
pub async fn send(
    &self,
    content: AnyMessageLikeEventContent,
) -> Result<SendHandle, RoomSendQueueError> {
    let raw = Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?;
    let event_type = content.event_type().to_string();
    self.send_raw(raw, event_type).await
}
/// Returns the current local echoes of this room, along with a receiver
/// for the subsequent updates of its queue.
///
/// The local echoes are loaded before the receiver is created.
pub async fn subscribe(
    &self,
) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
{
    let inner = &self.inner;
    let local_echoes = inner.queue.local_echoes(self).await?;
    let receiver = inner.update_sender.subscribe();
    Ok((local_echoes, receiver))
}
/// Background task sending the queued requests of a room, one at a time.
///
/// Each iteration of the loop:
/// 1. exits if the send queue is being dropped;
/// 2. applies the dependent requests and broadcasts the resulting updates;
/// 3. sleeps until notified if the room queue is disabled or empty;
/// 4. picks the next non-wedged request and sends it via `handle_request`;
/// 5. on success, marks the request as sent and broadcasts a `SentEvent` or
///    `MediaUpload` update (sent events are also saved in the event cache);
/// 6. on failure, disables the room queue, then either releases the request
///    (recoverable error) or marks it as wedged, and broadcasts the error.
// The argument list mirrors the state shared with `RoomSendQueue::new`.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(room_id = %room.room_id()))]
async fn sending_task(
room: WeakRoom,
queue: QueueStorage,
notifier: Arc<Notify>,
global_update_sender: broadcast::Sender<SendQueueUpdate>,
update_sender: broadcast::Sender<RoomSendQueueUpdate>,
locally_enabled: Arc<AtomicBool>,
global_error_sender: broadcast::Sender<SendQueueRoomError>,
is_dropping: Arc<AtomicBool>,
report_media_upload_progress: Arc<AtomicBool>,
) {
trace!("spawned the sending task");
let room_id = room.room_id();
loop {
// Checked at the top of every iteration, so any wake-up during shutdown
// ends the task.
if is_dropping.load(Ordering::SeqCst) {
trace!("shutting down!");
break;
}
// Dependent requests are applied even when the queue is disabled (the
// enablement check only comes afterwards).
let mut new_updates = Vec::new();
if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
warn!("errors when applying dependent requests: {err}");
}
for up in new_updates {
send_update(&global_update_sender, &update_sender, room_id, up);
}
if !locally_enabled.load(Ordering::SeqCst) {
trace!("not enabled, sleeping");
notifier.notified().await;
continue;
}
let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
Ok(Some(request)) => request,
Ok(None) => {
trace!("queue is empty, sleeping");
notifier.notified().await;
continue;
}
Err(err) => {
// NOTE(review): this retries immediately, without waiting on the
// notifier — a persistent storage error would make the loop spin.
warn!("error when loading next request to send: {err}");
continue;
}
};
let txn_id = queued_request.transaction_id.clone();
trace!(txn_id = %txn_id, "received a request to send!");
let Some(room) = room.get() else {
if is_dropping.load(Ordering::SeqCst) {
break;
}
// NOTE(review): the request picked above stays recorded as "being sent"
// on this path; the next peek will log about it.
error!("the weak room couldn't be upgraded but we're not shutting down?");
continue;
};
// For media uploads, remember the transaction the upload relates to and,
// if progress reporting is on, set up the progress info and observable.
// For plain events all three values are `None`.
let (related_txn_id, media_upload_progress_info, http_progress) =
if let QueuedRequestKind::MediaUpload {
cache_key,
thumbnail_source,
#[cfg(feature = "unstable-msc4274")]
accumulated,
related_to,
..
} = &queued_request.kind
{
let (media_upload_progress_info, http_progress) =
if report_media_upload_progress.load(Ordering::SeqCst) {
let media_upload_progress_info =
RoomSendQueue::create_media_upload_progress_info(
&queued_request.transaction_id,
related_to,
cache_key,
thumbnail_source.as_ref(),
#[cfg(feature = "unstable-msc4274")]
accumulated,
&room,
&queue,
)
.await;
let progress = RoomSendQueue::create_media_upload_progress_observable(
&media_upload_progress_info,
related_to,
&update_sender,
);
(Some(media_upload_progress_info), Some(progress))
} else {
Default::default()
};
(Some(related_to.clone()), media_upload_progress_info, http_progress)
} else {
Default::default()
};
match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
{
Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
{
Ok(()) => match parent_key {
SentRequestKey::Event { event_id, event, event_type } => {
send_update(
&global_update_sender,
&update_sender,
room_id,
RoomSendQueueUpdate::SentEvent {
transaction_id: txn_id,
event_id: event_id.clone(),
},
);
// Best effort: also save the event that was just sent in the event cache.
// Failures here are only logged.
if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
{
let timeline_event = match event_type.as_str() {
// An encrypted event is decrypted before being saved.
#[cfg(feature = "e2e-encryption")]
"m.room.encrypted" => {
use ruma::events::{
OriginalSyncMessageLikeEvent,
room::encrypted::RoomEncryptedEventContent,
};
let push_context = room.push_context().await.ok().flatten();
let event: Raw<AnyMessageLikeEventContent> = event;
let event: Raw<
OriginalSyncMessageLikeEvent<RoomEncryptedEventContent>,
> = event.cast_unchecked();
match room
.decrypt_event(&event, push_context.as_ref())
.await
{
Ok(timeline_event) => Some(timeline_event),
Err(err) => {
error!(
?err,
"Failed to decrypt the event before the saving in the Event Cache"
);
None
}
}
}
// Any other event: build the JSON of a sync event by hand around the sent
// content, using the current time as `origin_server_ts`.
// NOTE(review): the interpolated values are not JSON-escaped; malformed
// output is caught by the `Err` arm below.
event_type => {
match Raw::from_json_string(
format!(
"{{\
\"event_id\":\"{event_id}\",\
\"origin_server_ts\":{ts},\
\"sender\":\"{sender}\",\
\"type\":\"{type}\",\
\"content\":{content}\
}}",
event_id = event_id,
ts = MilliSecondsSinceUnixEpoch::now().get(),
sender = room.client().user_id().expect("Client must be logged-in"),
type = event_type,
content = event.into_json(),
),
) {
Ok(event) => Some(TimelineEvent::from_plaintext(event)),
Err(err) => {
error!(
?err,
"Failed to build the (sync) event before the saving in the Event Cache"
);
None
}
}
}
};
if let Some(timeline_event) = timeline_event
&& let Err(err) = room_event_cache
.insert_sent_event_from_send_queue(timeline_event)
.await
{
error!(
?err,
"Failed to save the sent event in the Event Cache"
);
}
} else {
info!(
"Cannot insert the sent event in the Event Cache because \
either the room no longer exists, or the Room Event Cache cannot be retrieved"
);
}
}
SentRequestKey::Media(sent_media_info) => {
// Report the upload as complete. Without progress info (reporting is
// disabled), fall back to index 0 and a 1/1 progress.
let index =
media_upload_progress_info.as_ref().map_or(0, |info| info.index);
let progress = media_upload_progress_info
.as_ref()
.map(|info| {
AbstractProgress { current: info.bytes, total: info.bytes }
+ info.offsets
})
.unwrap_or(AbstractProgress { current: 1, total: 1 });
// This update only goes to the room channel, not to the global one.
let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
file: Some(sent_media_info.file),
index,
progress,
});
}
},
Err(err) => {
warn!("unable to mark queued request as sent: {err}");
}
},
Ok(None) => {
debug!("Request has been aborted while running, continuing.");
}
Err(err) => {
// Transient HTTP errors, network failures and failed concurrent requests
// are considered recoverable; everything else wedges the request.
let is_recoverable = match err {
crate::Error::Http(ref http_err) => {
matches!(
http_err.retry_kind(),
RetryKind::Transient { .. } | RetryKind::NetworkFailure
)
}
crate::Error::ConcurrentRequestFailed => true,
_ => false,
};
// The room queue is disabled in both cases, until someone re-enables it.
locally_enabled.store(false, Ordering::SeqCst);
if is_recoverable {
warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
queue.mark_as_not_being_sent(&txn_id).await;
} else {
warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
if let Err(storage_error) =
queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
{
warn!("unable to mark request as wedged: {storage_error}");
}
}
let error = Arc::new(err);
let _ = global_error_sender.send(SendQueueRoomError {
room_id: room_id.to_owned(),
error: error.clone(),
is_recoverable,
});
// For a media upload, the error is reported against the related
// transaction rather than the upload's own one.
send_update(
&global_update_sender,
&update_sender,
room_id,
RoomSendQueueUpdate::SendError {
transaction_id: related_txn_id.unwrap_or(txn_id),
error,
is_recoverable,
},
);
}
}
}
info!("exited sending task");
}
/// Sends a single queued request to the server.
///
/// * For an event, the raw content is sent with the request's transaction
///   id and a short-retry request configuration.
/// * For a media upload, the content is read from the media store and
///   uploaded (encrypted if the room is encrypted and the `e2e-encryption`
///   feature is enabled), optionally reporting progress through `progress`.
///   The upload races against `cancel_upload_rx`, which is polled first.
///
/// Returns `Ok(Some(key))` describing what has been sent, or `Ok(None)` if
/// the upload has been aborted through `cancel_upload_rx`.
///
/// # Errors
///
/// Returns the error of the underlying request, or a
/// `SendQueueWedgeError` if the media content is missing from the media
/// store or the stored MIME type can't be parsed.
async fn handle_request(
    room: &Room,
    request: QueuedRequest,
    cancel_upload_rx: Option<oneshot::Receiver<()>>,
    progress: Option<SharedObservable<TransmissionProgress>>,
) -> Result<Option<SentRequestKey>, crate::Error> {
    match request.kind {
        QueuedRequestKind::Event { content } => {
            let (event, event_type) = content.into_raw();

            let res = room
                .send_raw(&event_type, &event)
                .with_transaction_id(&request.transaction_id)
                .with_request_config(RequestConfig::short_retry())
                .await?;

            trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");

            Ok(Some(SentRequestKey::Event { event_id: res.event_id, event, event_type }))
        }

        QueuedRequestKind::MediaUpload {
            content_type,
            cache_key,
            thumbnail_source,
            related_to: relates_to,
            #[cfg(feature = "unstable-msc4274")]
            accumulated,
        } => {
            trace!(%relates_to, "uploading media related to event");

            let fut = async move {
                // Build the "missing content" error lazily, so that nothing is
                // allocated when the media is found.
                let data = room
                    .client()
                    .media_store()
                    .lock()
                    .await?
                    .get_media_content(&cache_key)
                    .await?
                    .ok_or_else(|| {
                        crate::Error::SendQueueWedgeError(Box::new(
                            QueueWedgeError::MissingMediaContent,
                        ))
                    })?;

                let mime = Mime::from_str(&content_type).map_err(|_| {
                    crate::Error::SendQueueWedgeError(Box::new(
                        QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
                    ))
                })?;

                #[cfg(feature = "e2e-encryption")]
                let media_source = if room.latest_encryption_state().await?.is_encrypted() {
                    trace!("upload will be encrypted (encrypted room)");

                    let mut cursor = std::io::Cursor::new(data);
                    let mut req = room
                        .client
                        .upload_encrypted_file(&mut cursor)
                        .with_request_config(RequestConfig::short_retry());
                    if let Some(progress) = progress {
                        req = req.with_send_progress_observable(progress);
                    }
                    let encrypted_file = req.await?;

                    MediaSource::Encrypted(Box::new(encrypted_file))
                } else {
                    trace!("upload will be in clear text (room without encryption)");

                    let request_config = RequestConfig::short_retry()
                        .timeout(Media::reasonable_upload_timeout(&data));
                    let mut req =
                        room.client().media().upload(&mime, data, Some(request_config));
                    if let Some(progress) = progress {
                        req = req.with_send_progress_observable(progress);
                    }
                    let res = req.await?;

                    MediaSource::Plain(res.content_uri)
                };

                #[cfg(not(feature = "e2e-encryption"))]
                let media_source = {
                    let request_config = RequestConfig::short_retry()
                        .timeout(Media::reasonable_upload_timeout(&data));
                    let mut req =
                        room.client().media().upload(&mime, data, Some(request_config));
                    if let Some(progress) = progress {
                        req = req.with_send_progress_observable(progress);
                    }
                    let res = req.await?;

                    MediaSource::Plain(res.content_uri)
                };

                let uri = match &media_source {
                    MediaSource::Plain(uri) => uri,
                    MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
                };
                trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");

                Ok(SentRequestKey::Media(SentMediaInfo {
                    file: media_source,
                    thumbnail: thumbnail_source,
                    #[cfg(feature = "unstable-msc4274")]
                    accumulated,
                }))
            };

            // Without a cancellation channel, this future never completes and
            // the upload always wins the race below.
            let wait_for_cancel = async move {
                if let Some(rx) = cancel_upload_rx {
                    rx.await
                } else {
                    std::future::pending().await
                }
            };

            tokio::select! {
                // Poll the cancellation first, so that an abort takes
                // precedence over a completed upload.
                biased;

                _ = wait_for_cancel => {
                    Ok(None)
                }

                res = fut => {
                    res.map(Some)
                }
            }
        }
    }
}
/// Whether this room's send queue is currently enabled.
pub fn is_enabled(&self) -> bool {
    let flag = &self.inner.locally_enabled;
    flag.load(Ordering::SeqCst)
}
/// Enables or disables this room's send queue.
///
/// When enabling, the sending task is woken up so that it starts handling
/// queued requests again.
pub fn set_enabled(&self, enabled: bool) {
    let inner = &self.inner;
    inner.locally_enabled.store(enabled, Ordering::SeqCst);

    if !enabled {
        return;
    }

    inner.notifier.notify_one();
}
/// Broadcasts an update to this room's subscribers first, then to the
/// global subscribers. Send failures are ignored in both cases.
fn send_update(&self, update: RoomSendQueueUpdate) {
    let inner = &self.inner;

    let global_update =
        SendQueueUpdate { room_id: inner.room.room_id().to_owned(), update: update.clone() };

    let _ = inner.update_sender.send(update);
    let _ = inner.global_update_sender.send(global_update);
}
}
/// Broadcasts a room update to the room's subscribers first, then to the
/// global subscribers, tagged with the room id.
///
/// Send failures are ignored in both cases.
fn send_update(
    global_update_sender: &broadcast::Sender<SendQueueUpdate>,
    update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
    room_id: &RoomId,
    update: RoomSendQueueUpdate,
) {
    let global_update = SendQueueUpdate { room_id: room_id.to_owned(), update: update.clone() };

    let _ = update_sender.send(update);
    let _ = global_update_sender.send(global_update);
}
impl From<&crate::Error> for QueueWedgeError {
fn from(value: &crate::Error) -> Self {
match value {
#[cfg(feature = "e2e-encryption")]
crate::Error::OlmError(error) => match &**error {
OlmError::SessionRecipientCollectionError(error) => match error {
SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
}
SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
QueueWedgeError::IdentityViolations { users: users.clone() }
}
SessionRecipientCollectionError::CrossSigningNotSetup
| SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
QueueWedgeError::CrossVerificationRequired
}
},
_ => QueueWedgeError::GenericApiError { msg: value.to_string() },
},
crate::Error::SendQueueWedgeError(error) => *error.clone(),
_ => QueueWedgeError::GenericApiError { msg: value.to_string() },
}
}
}
/// State shared by all the clones of a [`RoomSendQueue`].
struct RoomSendQueueInner {
// Weak reference to the room this queue sends into.
room: WeakRoom,
// Broadcasts this room's updates to the global subscribers.
global_update_sender: broadcast::Sender<SendQueueUpdate>,
// Broadcasts updates to the subscribers of this room's queue.
update_sender: broadcast::Sender<RoomSendQueueUpdate>,
// Persistent storage of the requests queued for this room.
queue: QueueStorage,
// Wakes up the sending task (new request, re-enablement, shutdown).
notifier: Arc<Notify>,
// Whether this room's queue is enabled; shared with the sending task,
// which clears it after a send error.
locally_enabled: Arc<AtomicBool>,
// Handle of the spawned sending task; only stored, never read here.
_task: JoinHandle<()>,
}
/// Describes the request that the sending task is currently handling.
struct BeingSentInfo {
// Transaction id of the request being sent.
transaction_id: OwnedTransactionId,
// For media uploads, the sending half of the channel used to abort the
// upload; `None` for other requests.
cancel_upload: Option<oneshot::Sender<()>>,
}
impl BeingSentInfo {
    /// Asks for the current upload to be aborted, if the request is a media
    /// upload.
    ///
    /// Returns whether there was a cancellation channel to signal; whether
    /// the signal actually reached a receiver is not reported.
    fn cancel_upload(self) -> bool {
        let Some(sender) = self.cancel_upload else {
            return false;
        };

        let _ = sender.send(());
        true
    }
}
/// Lock guarding access to a room's queue storage.
///
/// Taking the lock gives access both to the client (and thus the state
/// store) and to the description of the request currently being sent.
#[derive(Clone)]
struct StoreLock {
// Weak reference to the client owning the state store.
client: WeakClient,
// The request currently being sent, if any, behind an async mutex.
being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
}
impl StoreLock {
async fn lock(&self) -> StoreLockGuard {
StoreLockGuard {
client: self.client.clone(),
being_sent: self.being_sent.clone().lock_owned().await,
}
}
}
/// Guard returned by [`StoreLock::lock`]; the lock is held for as long as
/// the guard is alive.
struct StoreLockGuard {
// Weak reference to the client, upgraded on demand by `client()`.
client: WeakClient,
// Owned guard over the request currently being sent, if any.
being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
}
impl StoreLockGuard {
    /// Upgrades the weak client reference.
    ///
    /// # Errors
    ///
    /// Returns `ClientShuttingDown` if the client no longer exists.
    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
        match self.client.get() {
            Some(client) => Ok(client),
            None => Err(RoomSendQueueStorageError::ClientShuttingDown),
        }
    }
}
/// Access to the requests queued for a room, persisted in the client's state
/// store.
#[derive(Clone)]
struct QueueStorage {
// Lock serializing the storage operations of this room's queue.
store: StoreLock,
// The room whose requests this storage handles.
room_id: OwnedRoomId,
// Thumbnail file sizes, keyed by the transaction id of the event that
// sends the media (one entry per media item, `None` without thumbnail).
// Entries are removed when a request with that id is sent or cancelled.
thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
}
impl QueueStorage {
/// Priority given to the requests saved by the methods visible in this part
/// of the file (events and media uploads).
const LOW_PRIORITY: usize = 0;
/// Higher priority value. NOTE(review): presumably for requests that must be
/// handled before the low-priority ones — confirm against its users.
const HIGH_PRIORITY: usize = 10;
fn new(client: WeakClient, room: OwnedRoomId) -> Self {
Self {
room_id: room,
store: StoreLock { client, being_sent: Default::default() },
thumbnail_file_sizes: Default::default(),
}
}
/// Saves a new request in the queue, under a freshly generated transaction
/// id, and returns that id.
///
/// # Errors
///
/// Fails if the client is shutting down or the state store can't save the
/// request.
async fn push(
    &self,
    request: QueuedRequestKind,
    created_at: MilliSecondsSinceUnixEpoch,
) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
    let transaction_id = TransactionId::new();

    // The storage lock is held until the request has been saved.
    let guard = self.store.lock().await;
    let client = guard.client()?;

    client
        .state_store()
        .save_send_queue_request(
            &self.room_id,
            transaction_id.clone(),
            created_at,
            request,
            Self::LOW_PRIORITY,
        )
        .await?;

    Ok(transaction_id)
}
/// Picks the next request to send and records it as being sent.
///
/// The next request is the first non-wedged one among those loaded from the
/// state store. For a media upload, a cancellation channel is created: the
/// sender is stored along with the "being sent" information, and the
/// receiver is returned with the request.
///
/// Returns `Ok(None)` if there's no request to send.
///
/// # Errors
///
/// Fails if the client is shutting down or the requests can't be loaded.
async fn peek_next_to_send(
    &self,
) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
{
    let mut guard = self.store.lock().await;

    let queued_requests =
        guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;

    // Consume the loaded requests: the selected one is moved out instead of
    // being cloned.
    let Some(request) = queued_requests.into_iter().find(|queued| !queued.is_wedged()) else {
        return Ok(None);
    };

    let (cancel_upload_tx, cancel_upload_rx) =
        if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
            let (tx, rx) = oneshot::channel();
            (Some(tx), Some(rx))
        } else {
            (None, None)
        };

    let prev = guard.being_sent.replace(BeingSentInfo {
        transaction_id: request.transaction_id.clone(),
        cancel_upload: cancel_upload_tx,
    });

    // There should never be an active request left over at this point.
    if let Some(prev) = prev {
        error!(
            prev_txn = ?prev.transaction_id,
            "a previous request was still active while picking a new one"
        );
    }

    Ok(Some((request, cancel_upload_rx)))
}
/// Clears the "being sent" information, after a recoverable error.
///
/// Logs an error if the request that was being sent isn't the expected one.
async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
    let previous = {
        let mut guard = self.store.lock().await;
        guard.being_sent.take()
    };

    let prev_txn: Option<&TransactionId> =
        previous.as_ref().map(|info| info.transaction_id.as_ref());

    if prev_txn != Some(transaction_id) {
        error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
    }
}
/// Clears the "being sent" information and persists the reason why the
/// request is wedged, after an unrecoverable error.
///
/// Logs an error if the request that was being sent isn't the expected one.
///
/// # Errors
///
/// Fails if the client is shutting down or the state store can't update the
/// request.
async fn mark_as_wedged(
    &self,
    transaction_id: &TransactionId,
    reason: QueueWedgeError,
) -> Result<(), RoomSendQueueStorageError> {
    let mut guard = self.store.lock().await;

    let previous = guard.being_sent.take();
    let prev_txn: Option<&TransactionId> =
        previous.as_ref().map(|info| info.transaction_id.as_ref());

    if prev_txn != Some(transaction_id) {
        error!(
            ?prev_txn,
            "previous active request didn't match that we expect (after permanent error)",
        );
    }

    let client = guard.client()?;
    client
        .state_store()
        .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
        .await?;

    Ok(())
}
/// Clears the persisted wedge reason of a request.
///
/// # Errors
///
/// Fails if the client is shutting down or the state store can't update the
/// request.
async fn mark_as_unwedged(
    &self,
    transaction_id: &TransactionId,
) -> Result<(), RoomSendQueueStorageError> {
    let guard = self.store.lock().await;
    let client = guard.client()?;

    client
        .state_store()
        .update_send_queue_request_status(&self.room_id, transaction_id, None)
        .await?;

    Ok(())
}
/// Finalizes a request that has been successfully sent.
///
/// Clears the "being sent" information, marks the requests depending on
/// this one as ready (handing them `parent_key`), removes the request from
/// the state store, and forgets the thumbnail sizes recorded under its id.
///
/// # Errors
///
/// Fails if the client is shutting down or a state store operation fails.
async fn mark_as_sent(
    &self,
    transaction_id: &TransactionId,
    parent_key: SentRequestKey,
) -> Result<(), RoomSendQueueStorageError> {
    let mut guard = self.store.lock().await;

    let previous = guard.being_sent.take();
    let prev_txn: Option<&TransactionId> =
        previous.as_ref().map(|info| info.transaction_id.as_ref());

    if prev_txn != Some(transaction_id) {
        error!(
            ?prev_txn,
            "previous active request didn't match that we expect (after successful send)",
        );
    }

    let client = guard.client()?;
    let store = client.state_store();

    // Dependent requests are updated before the parent request is removed.
    store
        .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
        .await?;

    if !store.remove_send_queue_request(&self.room_id, transaction_id).await? {
        warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
    }

    self.thumbnail_file_sizes.lock().remove(transaction_id);

    Ok(())
}
/// Cancels a queued request.
///
/// If the request is currently being sent, a dependent redaction request
/// is saved instead and `true` is returned. Otherwise the request is
/// removed from the state store, along with its recorded thumbnail sizes,
/// and the returned flag tells whether it was found.
///
/// # Errors
///
/// Fails if the client is shutting down or a state store operation fails.
async fn cancel_event(
    &self,
    transaction_id: &TransactionId,
) -> Result<bool, RoomSendQueueStorageError> {
    let guard = self.store.lock().await;

    let is_being_sent = guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
        == Some(transaction_id);

    let client = guard.client()?;
    let store = client.state_store();

    if is_being_sent {
        // Too late to remove it from the queue: redact it once it's sent.
        store
            .save_dependent_queued_request(
                &self.room_id,
                transaction_id,
                ChildTransactionId::new(),
                MilliSecondsSinceUnixEpoch::now(),
                DependentQueuedRequestKind::RedactEvent,
            )
            .await?;

        return Ok(true);
    }

    let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;

    self.thumbnail_file_sizes.lock().remove(transaction_id);

    Ok(removed)
}
/// Replaces the content of a queued event.
///
/// If the request is currently being sent, a dependent edit request is
/// saved instead and `true` is returned. Otherwise the queued request is
/// updated in place, and the returned flag is the one reported by the
/// state store.
///
/// # Errors
///
/// Fails if the client is shutting down or a state store operation fails.
async fn replace_event(
    &self,
    transaction_id: &TransactionId,
    serializable: SerializableEventContent,
) -> Result<bool, RoomSendQueueStorageError> {
    let guard = self.store.lock().await;

    let is_being_sent = guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
        == Some(transaction_id);

    let client = guard.client()?;
    let store = client.state_store();

    if is_being_sent {
        // Too late to change the queued content: edit it once it's sent.
        store
            .save_dependent_queued_request(
                &self.room_id,
                transaction_id,
                ChildTransactionId::new(),
                MilliSecondsSinceUnixEpoch::now(),
                DependentQueuedRequestKind::EditEvent { new_content: serializable },
            )
            .await?;

        return Ok(true);
    }

    let edited = store
        .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
        .await?;

    Ok(edited)
}
/// Queues the sending of a media event: the upload(s) of the thumbnail and
/// file, then a dependent request sending the event once the file upload
/// is done.
///
/// The thumbnail size (if any) is recorded under `send_event_txn` once
/// everything has been saved.
///
/// # Errors
///
/// Fails if the client is shutting down or a state store operation fails.
#[allow(clippy::too_many_arguments)]
async fn push_media(
    &self,
    event: RoomMessageEventContent,
    content_type: Mime,
    send_event_txn: OwnedTransactionId,
    created_at: MilliSecondsSinceUnixEpoch,
    upload_file_txn: OwnedTransactionId,
    file_media_request: MediaRequestParameters,
    thumbnail: Option<QueueThumbnailInfo>,
) -> Result<(), RoomSendQueueStorageError> {
    let guard = self.store.lock().await;
    let client = guard.client()?;
    let store = client.state_store();

    // Grab the size before the thumbnail information is moved below.
    let sizes = vec![thumbnail.as_ref().map(|info| info.file_size)];

    let thumbnail_info = self
        .push_thumbnail_and_media_uploads(
            store,
            &content_type,
            send_event_txn.clone(),
            created_at,
            upload_file_txn.clone(),
            file_media_request,
            thumbnail,
        )
        .await?;

    // The event itself is sent after the file has been uploaded.
    let finish_upload = DependentQueuedRequestKind::FinishUpload {
        local_echo: Box::new(event),
        file_upload: upload_file_txn.clone(),
        thumbnail_info,
    };

    store
        .save_dependent_queued_request(
            &self.room_id,
            &upload_file_txn,
            send_event_txn.clone().into(),
            created_at,
            finish_upload,
        )
        .await?;

    self.thumbnail_file_sizes.lock().insert(send_event_txn, sizes);

    Ok(())
}
/// Queues the sending of a gallery event, as a chain of requests:
///
/// * the first item's thumbnail and file uploads are saved as for a single
///   media;
/// * each subsequent item's uploads (thumbnail first, if any, then file) are
///   saved as dependent requests of the previous upload in the chain;
/// * a final dependent request, attached to the last upload, sends the
///   gallery event itself.
///
/// The thumbnail sizes of all items are recorded under `send_event_txn`.
/// With no item at all, nothing is queued and `Ok(())` is returned.
///
/// # Errors
///
/// Fails if the client is shutting down or a state store operation fails.
#[cfg(feature = "unstable-msc4274")]
#[allow(clippy::too_many_arguments)]
async fn push_gallery(
&self,
event: RoomMessageEventContent,
send_event_txn: OwnedTransactionId,
created_at: MilliSecondsSinceUnixEpoch,
item_queue_infos: Vec<GalleryItemQueueInfo>,
) -> Result<(), RoomSendQueueStorageError> {
let guard = self.store.lock().await;
let client = guard.client()?;
let store = client.state_store();
let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());
// An empty gallery queues nothing.
let Some((first, rest)) = item_queue_infos.split_first() else {
return Ok(());
};
let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
first;
// The first item starts the chain.
let thumbnail_info = self
.push_thumbnail_and_media_uploads(
store,
content_type,
send_event_txn.clone(),
created_at,
upload_file_txn.clone(),
file_media_request.clone(),
thumbnail.clone(),
)
.await?;
finish_item_infos
.push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
// Transaction id of the last upload queued so far; each new upload depends
// on it.
let mut last_upload_file_txn = upload_file_txn.clone();
for item_queue_info in rest {
let GalleryItemQueueInfo {
content_type,
upload_file_txn,
file_media_request,
thumbnail,
} = item_queue_info;
let thumbnail_info = if let Some(QueueThumbnailInfo {
finish_upload_thumbnail_info: thumbnail_info,
media_request_parameters: thumbnail_media_request,
content_type: thumbnail_content_type,
..
}) = thumbnail
{
let upload_thumbnail_txn = thumbnail_info.txn.clone();
// The thumbnail upload depends on the previous item's last upload, which
// is a file upload, hence `parent_is_thumbnail_upload: false`.
store
.save_dependent_queued_request(
&self.room_id,
&last_upload_file_txn,
upload_thumbnail_txn.clone().into(),
created_at,
DependentQueuedRequestKind::UploadFileOrThumbnail {
content_type: thumbnail_content_type.to_string(),
cache_key: thumbnail_media_request.clone(),
related_to: send_event_txn.clone(),
parent_is_thumbnail_upload: false,
},
)
.await?;
last_upload_file_txn = upload_thumbnail_txn;
Some(thumbnail_info)
} else {
None
};
// The file upload depends on this item's thumbnail upload if there's one,
// or else on the previous item's last upload.
store
.save_dependent_queued_request(
&self.room_id,
&last_upload_file_txn,
upload_file_txn.clone().into(),
created_at,
DependentQueuedRequestKind::UploadFileOrThumbnail {
content_type: content_type.to_string(),
cache_key: file_media_request.clone(),
related_to: send_event_txn.clone(),
parent_is_thumbnail_upload: thumbnail.is_some(),
},
)
.await?;
finish_item_infos.push(FinishGalleryItemInfo {
file_upload: upload_file_txn.clone(),
thumbnail_info: thumbnail_info.cloned(),
});
thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
last_upload_file_txn = upload_file_txn.clone();
}
// The gallery event is sent once the last upload of the chain is done.
store
.save_dependent_queued_request(
&self.room_id,
&last_upload_file_txn,
send_event_txn.clone().into(),
created_at,
DependentQueuedRequestKind::FinishGallery {
local_echo: Box::new(event),
item_infos: finish_item_infos,
},
)
.await?;
self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
Ok(())
}
/// Queues the uploads of a media file and of its optional thumbnail.
///
/// * Without a thumbnail, the file upload is saved as a regular queued
///   request and `Ok(None)` is returned.
/// * With a thumbnail, the thumbnail upload is saved as a regular queued
///   request, the file upload as a request depending on it, and the
///   thumbnail's finish-upload information is returned.
///
/// # Errors
///
/// Fails if a state store operation fails.
#[allow(clippy::too_many_arguments)]
async fn push_thumbnail_and_media_uploads(
    &self,
    store: &DynStateStore,
    content_type: &Mime,
    send_event_txn: OwnedTransactionId,
    created_at: MilliSecondsSinceUnixEpoch,
    upload_file_txn: OwnedTransactionId,
    file_media_request: MediaRequestParameters,
    thumbnail: Option<QueueThumbnailInfo>,
) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
    let Some(QueueThumbnailInfo {
        finish_upload_thumbnail_info,
        media_request_parameters: thumbnail_media_request,
        content_type: thumbnail_content_type,
        ..
    }) = thumbnail
    else {
        // No thumbnail: the file upload is the only request to queue.
        store
            .save_send_queue_request(
                &self.room_id,
                upload_file_txn,
                created_at,
                QueuedRequestKind::MediaUpload {
                    content_type: content_type.to_string(),
                    cache_key: file_media_request,
                    thumbnail_source: None,
                    related_to: send_event_txn,
                    #[cfg(feature = "unstable-msc4274")]
                    accumulated: vec![],
                },
                Self::LOW_PRIORITY,
            )
            .await?;

        return Ok(None);
    };

    let upload_thumbnail_txn = finish_upload_thumbnail_info.txn.clone();

    // First, the thumbnail upload.
    store
        .save_send_queue_request(
            &self.room_id,
            upload_thumbnail_txn.clone(),
            created_at,
            QueuedRequestKind::MediaUpload {
                content_type: thumbnail_content_type.to_string(),
                cache_key: thumbnail_media_request,
                thumbnail_source: None,
                related_to: send_event_txn.clone(),
                #[cfg(feature = "unstable-msc4274")]
                accumulated: vec![],
            },
            Self::LOW_PRIORITY,
        )
        .await?;

    // Then the file upload, which depends on the thumbnail upload.
    store
        .save_dependent_queued_request(
            &self.room_id,
            &upload_thumbnail_txn,
            upload_file_txn.into(),
            created_at,
            DependentQueuedRequestKind::UploadFileOrThumbnail {
                content_type: content_type.to_string(),
                cache_key: file_media_request,
                related_to: send_event_txn,
                parent_is_thumbnail_upload: true,
            },
        )
        .await?;

    Ok(Some(finish_upload_thumbnail_info))
}
/// Queues a reaction to a local echo, as a request depending on the
/// request that sends the reacted-to event.
///
/// Returns `Ok(None)` if `transaction_id` matches neither a queued request
/// nor a dependent request that sends its own event; otherwise returns the
/// transaction id of the new reaction request.
///
/// # Errors
///
/// Fails if the client is shutting down or a state store operation fails.
#[instrument(skip(self))]
async fn react(
    &self,
    transaction_id: &TransactionId,
    key: String,
    created_at: MilliSecondsSinceUnixEpoch,
) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
    let guard = self.store.lock().await;
    let client = guard.client()?;
    let store = client.state_store();

    let queued_requests = store.load_send_queue_requests(&self.room_id).await?;
    let is_queued = queued_requests.iter().any(|item| item.transaction_id == transaction_id);

    if !is_queued {
        // Not a queued request; maybe a dependent request sending an event.
        let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
        let is_dependent_event = dependent_requests
            .into_iter()
            .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
            .any(|child_txn| *child_txn == *transaction_id);

        if !is_dependent_event {
            return Ok(None);
        }
    }

    let reaction_txn_id = ChildTransactionId::new();

    store
        .save_dependent_queued_request(
            &self.room_id,
            transaction_id,
            reaction_txn_id.clone(),
            created_at,
            DependentQueuedRequestKind::ReactEvent { key },
        )
        .await?;

    Ok(Some(reaction_txn_id))
}
async fn local_echoes(
&self,
room: &RoomSendQueue,
) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
let guard = self.store.lock().await;
let client = guard.client()?;
let store = client.state_store();
let local_requests =
store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
Some(LocalEcho {
transaction_id: queued.transaction_id.clone(),
content: match queued.kind {
QueuedRequestKind::Event { content } => LocalEchoContent::Event {
serialized_event: content,
send_handle: SendHandle {
room: room.clone(),
transaction_id: queued.transaction_id,
media_handles: vec![],
created_at: queued.created_at,
},
send_error: queued.error,
},
QueuedRequestKind::MediaUpload { .. } => {
return None;
}
},
})
});
let reactions_and_medias = store
.load_dependent_queued_requests(&self.room_id)
.await?
.into_iter()
.filter_map(|dep| match dep.kind {
DependentQueuedRequestKind::EditEvent { .. }
| DependentQueuedRequestKind::RedactEvent => {
None
}
DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
transaction_id: dep.own_transaction_id.clone().into(),
content: LocalEchoContent::React {
key,
send_handle: SendReactionHandle {
room: room.clone(),
transaction_id: dep.own_transaction_id,
},
applies_to: dep.parent_transaction_id,
},
}),
DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
None
}
DependentQueuedRequestKind::FinishUpload {
local_echo,
file_upload,
thumbnail_info,
} => {
Some(LocalEcho {
transaction_id: dep.own_transaction_id.clone().into(),
content: LocalEchoContent::Event {
serialized_event: SerializableEventContent::new(&(*local_echo).into())
.ok()?,
send_handle: SendHandle {
room: room.clone(),
transaction_id: dep.own_transaction_id.into(),
media_handles: vec![MediaHandles {
upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
upload_file_txn: file_upload,
}],
created_at: dep.created_at,
},
send_error: None,
},
})
}
#[cfg(feature = "unstable-msc4274")]
DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
self.create_gallery_local_echo(
dep.own_transaction_id,
room,
dep.created_at,
local_echo,
item_infos,
)
}
});
Ok(local_requests.chain(reactions_and_medias).collect())
}
/// Builds the local echo of a gallery event from its dependent request data.
///
/// The resulting send handle carries one [`MediaHandles`] per entry of
/// `item_infos`. Returns `None` when `local_echo` can't be serialized.
#[cfg(feature = "unstable-msc4274")]
fn create_gallery_local_echo(
    &self,
    transaction_id: ChildTransactionId,
    room: &RoomSendQueue,
    created_at: MilliSecondsSinceUnixEpoch,
    local_echo: Box<RoomMessageEventContent>,
    item_infos: Vec<FinishGalleryItemInfo>,
) -> Option<LocalEcho> {
    let serialized_event = SerializableEventContent::new(&(*local_echo).into()).ok()?;

    // One pair of upload transaction ids per gallery item.
    let media_handles = item_infos
        .into_iter()
        .map(|item| MediaHandles {
            upload_thumbnail_txn: item.thumbnail_info.map(|thumbnail| thumbnail.txn),
            upload_file_txn: item.file_upload,
        })
        .collect();

    let event_txn: OwnedTransactionId = transaction_id.into();

    let send_handle = SendHandle {
        room: room.clone(),
        transaction_id: event_txn.clone(),
        media_handles,
        created_at,
    };

    Some(LocalEcho {
        transaction_id: event_txn,
        content: LocalEchoContent::Event { serialized_event, send_handle, send_error: None },
    })
}
/// Tries to apply a single dependent request, whether its parent has already
/// been sent (a parent key is set) or is still local (no parent key).
///
/// Returns `Ok(true)` when the caller should forget the dependent request
/// (it has been applied, or it can never be applied), and `Ok(false)` when
/// it must be kept around and retried later.
///
/// # Errors
///
/// Returns an error if the parent key can't be turned into an event id where
/// one is required, if the room is missing from the client, if serializing an
/// event fails, or if a state store operation fails.
#[instrument(skip_all)]
async fn try_apply_single_dependent_request(
&self,
client: &Client,
dependent_request: DependentQueuedRequest,
new_updates: &mut Vec<RoomSendQueueUpdate>,
) -> Result<bool, RoomSendQueueError> {
let store = client.state_store();
let parent_key = dependent_request.parent_key;
match dependent_request.kind {
DependentQueuedRequestKind::EditEvent { new_content } => {
if let Some(parent_key) = parent_key {
// A parent key is set: an edit event targeting the parent's event id is
// needed.
let Some(event_id) = parent_key.into_event_id() else {
return Err(RoomSendQueueError::StorageError(
RoomSendQueueStorageError::InvalidParentKey,
));
};
let room = client
.get_room(&self.room_id)
.ok_or(RoomSendQueueError::RoomDisappeared)?;
// Only room messages and unstable poll starts can be turned into an edit;
// anything else (or undeserializable content) is dropped by returning
// `Ok(true)`.
let edited_content = match new_content.deserialize() {
Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
EditedContent::RoomMessage(c.into())
}
Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
let poll_start = c.poll_start().clone();
EditedContent::PollStart {
fallback_text: poll_start.question.text.clone(),
new_content: poll_start,
}
}
Ok(c) => {
warn!("Unsupported edit content type: {:?}", c.event_type());
return Ok(true);
}
Err(err) => {
warn!("Unable to deserialize: {err}");
return Ok(true);
}
};
// Failing to build the edit event also drops the dependent request.
let edit_event = match room.make_edit_event(&event_id, edited_content).await {
Ok(e) => e,
Err(err) => {
warn!("couldn't create edited event: {err}");
return Ok(true);
}
};
let serializable = SerializableEventContent::from_raw(
Raw::new(&edit_event)
.map_err(RoomSendQueueStorageError::JsonSerialization)?,
edit_event.event_type().to_string(),
);
// Queue the edit event under the dependent request's own transaction id.
store
.save_send_queue_request(
&self.room_id,
dependent_request.own_transaction_id.into(),
dependent_request.created_at,
serializable.into(),
Self::HIGH_PRIORITY,
)
.await
.map_err(RoomSendQueueStorageError::StateStoreError)?;
} else {
// No parent key: update the content of the parent's queued request in
// place.
let edited = store
.update_send_queue_request(
&self.room_id,
&dependent_request.parent_transaction_id,
new_content.into(),
)
.await
.map_err(RoomSendQueueStorageError::StateStoreError)?;
if !edited {
warn!("missing local echo upon dependent edit");
}
}
}
DependentQueuedRequestKind::RedactEvent => {
if let Some(parent_key) = parent_key {
let Some(event_id) = parent_key.into_event_id() else {
return Err(RoomSendQueueError::StorageError(
RoomSendQueueStorageError::InvalidParentKey,
));
};
let room = client
.get_room(&self.room_id)
.ok_or(RoomSendQueueError::RoomDisappeared)?;
// The redaction is sent directly through the room (no reason given), not
// through the send queue. On failure the dependent request is kept for a
// later retry.
if let Err(err) = room
.redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
.await
{
warn!("error when sending a redact for {event_id}: {err}");
return Ok(false);
}
} else {
// No parent key: remove the parent's queued request from the store.
let removed = store
.remove_send_queue_request(
&self.room_id,
&dependent_request.parent_transaction_id,
)
.await
.map_err(RoomSendQueueStorageError::StateStoreError)?;
if !removed {
warn!("missing local echo upon dependent redact");
}
}
}
DependentQueuedRequestKind::ReactEvent { key } => {
if let Some(parent_key) = parent_key {
let Some(parent_event_id) = parent_key.into_event_id() else {
return Err(RoomSendQueueError::StorageError(
RoomSendQueueStorageError::InvalidParentKey,
));
};
// Build the reaction event annotating the parent's event id, and queue it
// under the dependent request's own transaction id.
let react_event =
ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
let serializable = SerializableEventContent::from_raw(
Raw::new(&react_event)
.map_err(RoomSendQueueStorageError::JsonSerialization)?,
react_event.event_type().to_string(),
);
store
.save_send_queue_request(
&self.room_id,
dependent_request.own_transaction_id.into(),
dependent_request.created_at,
serializable.into(),
Self::HIGH_PRIORITY,
)
.await
.map_err(RoomSendQueueStorageError::StateStoreError)?;
} else {
// A reaction needs the parent's event id; keep the request until a parent
// key is available.
return Ok(false);
}
}
DependentQueuedRequestKind::UploadFileOrThumbnail {
content_type,
cache_key,
related_to,
parent_is_thumbnail_upload,
} => {
// Without a parent key, keep the request for later.
let Some(parent_key) = parent_key else {
return Ok(false);
};
self.handle_dependent_file_or_thumbnail_upload(
client,
dependent_request.own_transaction_id.into(),
parent_key,
content_type,
cache_key,
related_to,
parent_is_thumbnail_upload,
)
.await?;
}
DependentQueuedRequestKind::FinishUpload {
local_echo,
file_upload,
thumbnail_info,
} => {
// Without a parent key, keep the request for later.
let Some(parent_key) = parent_key else {
return Ok(false);
};
self.handle_dependent_finish_upload(
client,
dependent_request.own_transaction_id.into(),
parent_key,
*local_echo,
file_upload,
thumbnail_info,
new_updates,
)
.await?;
}
#[cfg(feature = "unstable-msc4274")]
DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
// Without a parent key, keep the request for later.
let Some(parent_key) = parent_key else {
return Ok(false);
};
self.handle_dependent_finish_gallery_upload(
client,
dependent_request.own_transaction_id.into(),
parent_key,
*local_echo,
item_infos,
new_updates,
)
.await?;
}
}
// Every branch that falls through has been applied: the request can be removed.
Ok(true)
}
/// Loads all the dependent requests of this room, canonicalizes them, and
/// tries to apply each canonical request in turn.
///
/// Requests discarded by canonicalization are removed from the store first.
/// A canonical request is then removed once it has been applied and reports
/// that it should be removed. A request that fails with an error is logged
/// and left in the store.
///
/// # Errors
///
/// Fails if the client can't be obtained from the store guard, or if a state
/// store operation fails.
#[instrument(skip(self))]
async fn apply_dependent_requests(
    &self,
    new_updates: &mut Vec<RoomSendQueueUpdate>,
) -> Result<(), RoomSendQueueError> {
    let guard = self.store.lock().await;
    let client = guard.client()?;
    let store = client.state_store();

    let stored_requests = store
        .load_dependent_queued_requests(&self.room_id)
        .await
        .map_err(RoomSendQueueStorageError::StateStoreError)?;

    // Nothing to do; bail out before logging anything.
    if stored_requests.is_empty() {
        return Ok(());
    }
    let num_initial_dependent_requests = stored_requests.len();

    let canonical_requests = canonicalize_dependent_requests(&stored_requests);

    // Drop from the store every request that didn't survive canonicalization.
    for stored in &stored_requests {
        let is_canonical = canonical_requests
            .iter()
            .any(|canonical| canonical.own_transaction_id == stored.own_transaction_id);
        if is_canonical {
            continue;
        }

        store
            .remove_dependent_queued_request(&self.room_id, &stored.own_transaction_id)
            .await
            .map_err(RoomSendQueueStorageError::StateStoreError)?;
    }

    let mut num_dependent_requests = canonical_requests.len();

    debug!(
        num_dependent_requests,
        num_initial_dependent_requests, "starting handling of dependent requests"
    );

    for request in canonical_requests {
        let request_id = request.own_transaction_id.clone();
        let outcome = self.try_apply_single_dependent_request(&client, request, new_updates).await;

        match outcome {
            Ok(true) => {
                // Applied (or not applicable anymore): forget about it.
                store
                    .remove_dependent_queued_request(&self.room_id, &request_id)
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
                num_dependent_requests -= 1;
            }
            Ok(false) => {
                // Must be retried later; leave it in the store.
            }
            Err(err) => {
                warn!("error when applying single dependent request: {err}");
            }
        }
    }

    debug!(
        leftover_dependent_requests = num_dependent_requests,
        "stopped handling dependent request"
    );

    Ok(())
}
/// Removes the dependent request identified by `dependent_event_id` from the
/// state store, forwarding the boolean the store returns.
///
/// # Errors
///
/// Fails if the client can't be obtained from the store guard, or if the
/// state store operation fails.
async fn remove_dependent_send_queue_request(
    &self,
    dependent_event_id: &ChildTransactionId,
) -> Result<bool, RoomSendQueueStorageError> {
    let guard = self.store.lock().await;
    let client = guard.client()?;

    let removed = client
        .state_store()
        .remove_dependent_queued_request(&self.room_id, dependent_event_id)
        .await?;

    Ok(removed)
}
}
/// Queueing data for a single gallery item.
///
/// NOTE(review): the code consuming this struct is not part of the visible
/// chunk; the field descriptions below are inferred from names and types and
/// should be confirmed against the gallery-sending code.
#[cfg(feature = "unstable-msc4274")]
struct GalleryItemQueueInfo {
/// Presumably the MIME type of the item's file.
content_type: Mime,
/// Presumably the transaction id of the request uploading the item's file.
upload_file_txn: OwnedTransactionId,
/// Presumably the media request parameters identifying the item's file.
file_media_request: MediaRequestParameters,
/// Thumbnail queueing information, if the item has a thumbnail.
thumbnail: Option<QueueThumbnailInfo>,
}
/// The content of a local echo.
#[derive(Clone, Debug)]
pub enum LocalEchoContent {
/// The local echo is an event.
Event {
/// The serialized content of the event.
serialized_event: SerializableEventContent,
/// Handle to manipulate the sending of the event (abort, edit, react, …).
send_handle: SendHandle,
/// The error attached to the queued request, if any.
send_error: Option<QueueWedgeError>,
},
/// The local echo is a reaction to another local echo.
React {
/// The key used for the reaction.
key: String,
/// Handle to abort the sending of the reaction.
send_handle: SendReactionHandle,
/// Transaction id of the local echo the reaction applies to.
applies_to: OwnedTransactionId,
},
}
/// A local echo: a request known to the send queue, identified by its
/// transaction id.
#[derive(Clone, Debug)]
pub struct LocalEcho {
/// Transaction id identifying this local echo.
pub transaction_id: OwnedTransactionId,
/// What the local echo contains (an event or a reaction).
pub content: LocalEchoContent,
}
/// An update emitted by a room's send queue.
#[derive(Clone, Debug)]
pub enum RoomSendQueueUpdate {
/// A new local echo has been added (e.g. after a reaction was queued).
NewLocalEvent(LocalEcho),
/// A local echo has been cancelled (emitted after a successful abort).
CancelledLocalEvent {
/// Transaction id of the cancelled local echo.
transaction_id: OwnedTransactionId,
},
/// The content of a local echo has been replaced (emitted after a
/// successful edit).
ReplacedLocalEvent {
/// Transaction id of the edited local echo.
transaction_id: OwnedTransactionId,
/// The new serialized content.
new_content: SerializableEventContent,
},
/// An error happened while sending a request.
SendError {
/// Transaction id of the request that failed.
transaction_id: OwnedTransactionId,
/// The error that was raised.
error: Arc<crate::Error>,
/// Whether the error is considered recoverable.
// NOTE(review): the exact consequences of this flag are decided by code
// outside this chunk — confirm before relying on it.
is_recoverable: bool,
},
/// A request is being retried (emitted after an unwedge).
RetryEvent {
/// Transaction id of the request being retried.
transaction_id: OwnedTransactionId,
},
/// An event has been sent and now has an event id.
SentEvent {
/// Transaction id of the local echo.
transaction_id: OwnedTransactionId,
/// Event id attributed to the sent event.
event_id: OwnedEventId,
},
/// Progress report about a media upload.
// NOTE(review): the emitter is outside this chunk; field meanings below are
// inferred from names and types — confirm.
MediaUpload {
/// Transaction id of the event the upload relates to.
related_to: OwnedTransactionId,
/// The media source of the file, when available.
file: Option<MediaSource>,
/// Index of the upload (presumably the item's position for multi-item
/// sends — confirm).
index: u64,
/// The upload progress.
progress: AbstractProgress,
},
}
/// A [`RoomSendQueueUpdate`] together with the id of the room it belongs to.
#[derive(Clone, Debug)]
pub struct SendQueueUpdate {
/// The room the update is about.
pub room_id: OwnedRoomId,
/// The update itself.
pub update: RoomSendQueueUpdate,
}
/// An error surfaced by a room's send queue.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueError {
/// The room isn't in the joined state.
#[error("the room isn't in the joined state")]
RoomNotJoined,
/// The room can't be found in the client anymore.
#[error("the room is now missing from the client")]
RoomDisappeared,
/// An error happened while interacting with storage.
#[error(transparent)]
StorageError(#[from] RoomSendQueueStorageError),
/// The attachment event could not be created.
#[error("the attachment event could not be created")]
FailedToCreateAttachment,
/// The gallery to send contains no items.
#[cfg(feature = "unstable-msc4274")]
#[error("the gallery contains no items")]
EmptyGallery,
/// The gallery event could not be created.
#[cfg(feature = "unstable-msc4274")]
#[error("the gallery event could not be created")]
FailedToCreateGallery,
}
/// An error raised while the send queue interacts with its storage.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueStorageError {
/// Error caused by the state store.
#[error(transparent)]
StateStoreError(#[from] StoreError),
/// Error caused by the event cache store.
#[error(transparent)]
EventCacheStoreError(#[from] EventCacheStoreError),
/// Error caused by the media store.
#[error(transparent)]
MediaStoreError(#[from] MediaStoreError),
/// Error caused by a cross-process lock.
#[error(transparent)]
LockError(#[from] CrossProcessLockError),
/// Error caused by serializing or deserializing JSON.
#[error(transparent)]
JsonSerialization(#[from] serde_json::Error),
/// The parent key of a dependent request couldn't be converted into the
/// kind of key the request needs (e.g. an event id).
#[error("a dependent event had an invalid parent key type")]
InvalidParentKey,
/// The client is shutting down.
#[error("The client is shutting down.")]
ClientShuttingDown,
/// The requested operation isn't implemented for media uploads.
#[error("This operation is not implemented for media uploads")]
OperationNotImplementedYet,
/// A media caption edit was attempted on an event that isn't a media.
#[error("Can't edit a media caption when the underlying event isn't a media")]
InvalidMediaCaptionEdit,
}
/// The upload transaction ids attached to one media of a send handle.
#[derive(Clone, Debug)]
struct MediaHandles {
/// Transaction id of the thumbnail upload, if the media has a thumbnail.
upload_thumbnail_txn: Option<OwnedTransactionId>,
/// Transaction id of the file upload.
upload_file_txn: OwnedTransactionId,
}
/// A handle to manipulate a request that is in the send queue: abort it, edit
/// it, unwedge it, or react to it.
#[derive(Clone, Debug)]
pub struct SendHandle {
/// The room send queue the request belongs to.
room: RoomSendQueue,
/// Transaction id of the request this handle controls.
transaction_id: OwnedTransactionId,
/// Upload transaction ids of the medias attached to the request; empty for
/// a request that isn't a media upload.
media_handles: Vec<MediaHandles>,
/// The time at which the request was created.
pub created_at: MilliSecondsSinceUnixEpoch,
}
impl SendHandle {
/// Test-only constructor: creates a handle without any media handle.
#[cfg(test)]
pub(crate) fn new(
    room: RoomSendQueue,
    transaction_id: OwnedTransactionId,
    created_at: MilliSecondsSinceUnixEpoch,
) -> Self {
    let media_handles = Vec::new();
    Self { room, transaction_id, media_handles, created_at }
}
/// Guard for operations that aren't supported on media uploads.
///
/// Returns [`RoomSendQueueStorageError::OperationNotImplementedYet`] when
/// this handle has at least one media handle, `Ok(())` otherwise.
fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
    if self.media_handles.is_empty() {
        return Ok(());
    }

    Err(RoomSendQueueStorageError::OperationNotImplementedYet)
}
/// Aborts the sending of the request.
///
/// Each media handle is first tried with an upload abort; if none succeeds,
/// the request is cancelled as a regular event. On success a
/// [`RoomSendQueueUpdate::CancelledLocalEvent`] is emitted.
///
/// Returns `Ok(true)` if the request was aborted, `Ok(false)` if there was
/// nothing left to abort.
#[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
    trace!("received an abort request");

    let queue = &self.room.inner.queue;
    let txn_id = &self.transaction_id;

    // Try the media upload path first; stop at the first successful abort.
    let mut aborted = false;
    for handles in &self.media_handles {
        if queue.abort_upload(txn_id, handles).await? {
            aborted = true;
            break;
        }
    }

    // No upload was aborted: fall back to cancelling a regular event.
    if !aborted {
        aborted = queue.cancel_event(txn_id).await?;

        if aborted {
            trace!("successful abort");
        } else {
            debug!("local echo didn't exist anymore, can't abort");
        }
    }

    if aborted {
        self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
            transaction_id: txn_id.clone(),
        });
    }

    Ok(aborted)
}
/// Replaces the content of the request with raw, already-serialized content.
///
/// Not supported for media uploads. On success the queue is woken up and a
/// [`RoomSendQueueUpdate::ReplacedLocalEvent`] is emitted.
///
/// Returns `Ok(true)` if the content was replaced, `Ok(false)` if the local
/// echo doesn't exist anymore.
#[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
pub async fn edit_raw(
    &self,
    new_content: Raw<AnyMessageLikeEventContent>,
    event_type: String,
) -> Result<bool, RoomSendQueueStorageError> {
    trace!("received an edit request");
    self.nyi_for_uploads()?;

    let inner = &self.room.inner;
    let serializable = SerializableEventContent::from_raw(new_content, event_type);

    let replaced = inner.queue.replace_event(&self.transaction_id, serializable.clone()).await?;
    if !replaced {
        debug!("local echo doesn't exist anymore, can't edit");
        return Ok(false);
    }

    trace!("successful edit");

    // Wake up the queue, then let observers know about the new content.
    inner.notifier.notify_one();
    self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
        transaction_id: self.transaction_id.clone(),
        new_content: serializable,
    });

    Ok(true)
}
/// Replaces the content of the request with `new_content`.
///
/// Serializes the content and delegates to [`Self::edit_raw`]; see that
/// method for the meaning of the returned boolean.
///
/// # Errors
///
/// Fails if the content can't be serialized, or with any error returned by
/// [`Self::edit_raw`].
pub async fn edit(
    &self,
    new_content: AnyMessageLikeEventContent,
) -> Result<bool, RoomSendQueueStorageError> {
    let raw = Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?;
    let event_type = new_content.event_type().to_string();

    self.edit_raw(raw, event_type).await
}
/// Edits the caption of the media request this handle controls.
///
/// On success the queue is woken up and a
/// [`RoomSendQueueUpdate::ReplacedLocalEvent`] carrying the new content is
/// emitted.
///
/// Returns `Ok(true)` if the caption was edited, `Ok(false)` if the local
/// echo doesn't exist anymore.
///
/// # Errors
///
/// Fails if the queue can't apply the edit, or if the new content can't be
/// serialized.
pub async fn edit_media_caption(
    &self,
    caption: Option<String>,
    formatted_caption: Option<FormattedBody>,
    mentions: Option<Mentions>,
) -> Result<bool, RoomSendQueueStorageError> {
    let inner = &self.room.inner;

    let edited = inner
        .queue
        .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
        .await?;

    let Some(new_content) = edited else {
        debug!("local echo doesn't exist anymore, can't edit media caption");
        return Ok(false);
    };

    trace!("successful edit of media caption");

    // Wake up the queue before notifying observers.
    inner.notifier.notify_one();

    let new_content = SerializableEventContent::new(&new_content)
        .map_err(RoomSendQueueStorageError::JsonSerialization)?;

    self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
        transaction_id: self.transaction_id.clone(),
        new_content,
    });

    Ok(true)
}
pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
let room = &self.room.inner;
room.queue
.mark_as_unwedged(&self.transaction_id)
.await
.map_err(RoomSendQueueError::StorageError)?;
for handles in &self.media_handles {
room.queue
.mark_as_unwedged(&handles.upload_file_txn)
.await
.map_err(RoomSendQueueError::StorageError)?;
if let Some(txn) = &handles.upload_thumbnail_txn {
room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
}
}
room.notifier.notify_one();
self.room.send_update(RoomSendQueueUpdate::RetryEvent {
transaction_id: self.transaction_id.clone(),
});
Ok(())
}
/// Queues a reaction with `key` to the request this handle controls.
///
/// On success the queue is woken up and a
/// [`RoomSendQueueUpdate::NewLocalEvent`] describing the reaction is emitted.
///
/// Returns a handle to the queued reaction, or `Ok(None)` if the local echo
/// to react to doesn't exist anymore.
#[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
pub async fn react(
    &self,
    key: String,
) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
    trace!("received an intent to react");

    let inner = &self.room.inner;
    let created_at = MilliSecondsSinceUnixEpoch::now();

    let queued = inner.queue.react(&self.transaction_id, key.clone(), created_at).await?;
    let Some(reaction_txn_id) = queued else {
        debug!("local echo doesn't exist anymore, can't react");
        return Ok(None);
    };

    trace!("successfully queued react");

    // Wake up the queue before notifying observers.
    inner.notifier.notify_one();

    let send_handle =
        SendReactionHandle { room: self.room.clone(), transaction_id: reaction_txn_id.clone() };

    let echo = LocalEcho {
        transaction_id: reaction_txn_id.into(),
        content: LocalEchoContent::React {
            key,
            send_handle: send_handle.clone(),
            applies_to: self.transaction_id.clone(),
        },
    };
    self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(echo));

    Ok(Some(send_handle))
}
}
/// A handle to a reaction that has been queued for sending; it can be used
/// to abort the reaction.
#[derive(Clone, Debug)]
pub struct SendReactionHandle {
/// The room send queue the reaction belongs to.
room: RoomSendQueue,
/// The reaction's own (child) transaction id.
transaction_id: ChildTransactionId,
}
impl SendReactionHandle {
/// Aborts the sending of the reaction.
///
/// If the reaction is still stored as a dependent request, it is removed
/// and a [`RoomSendQueueUpdate::CancelledLocalEvent`] is emitted. Otherwise
/// the abort is retried through a regular [`SendHandle`] built for the
/// reaction's transaction id.
///
/// Returns `Ok(true)` if the reaction was aborted, `Ok(false)` otherwise.
pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
    let queue = &self.room.inner.queue;

    let removed = queue.remove_dependent_send_queue_request(&self.transaction_id).await?;
    if !removed {
        // Not a dependent request anymore: go through the regular abort path.
        let handle = SendHandle {
            room: self.room.clone(),
            transaction_id: self.transaction_id.clone().into(),
            media_handles: vec![],
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };
        return handle.abort().await;
    }

    // The reaction was still a dependent request; report the cancellation.
    self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
        transaction_id: self.transaction_id.clone().into(),
    });

    Ok(true)
}
/// Returns the reaction's own transaction id.
pub fn transaction_id(&self) -> &TransactionId {
&self.transaction_id
}
}
fn canonicalize_dependent_requests(
dependent: &[DependentQueuedRequest],
) -> Vec<DependentQueuedRequest> {
let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
for d in dependent {
let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
continue;
}
match &d.kind {
DependentQueuedRequestKind::EditEvent { .. } => {
if let Some(prev_edit) = prevs
.iter_mut()
.find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
{
*prev_edit = d;
} else {
prevs.insert(0, d);
}
}
DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
| DependentQueuedRequestKind::FinishUpload { .. }
| DependentQueuedRequestKind::ReactEvent { .. } => {
prevs.push(d);
}
#[cfg(feature = "unstable-msc4274")]
DependentQueuedRequestKind::FinishGallery { .. } => {
prevs.push(d);
}
DependentQueuedRequestKind::RedactEvent => {
prevs.clear();
prevs.push(d);
}
}
}
by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
}
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
use std::{sync::Arc, time::Duration};
use assert_matches2::{assert_let, assert_matches};
use matrix_sdk_base::store::{
ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
SerializableEventContent,
};
use matrix_sdk_test::{JoinedRoomBuilder, SyncResponseBuilder, async_test};
use ruma::{
MilliSecondsSinceUnixEpoch, TransactionId,
events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
room_id,
};
use super::canonicalize_dependent_requests;
use crate::{client::WeakClient, test_utils::logged_in_client};
#[test]
fn test_canonicalize_dependent_events_created_at() {
    // The `created_at` timestamp must survive canonicalization untouched.
    let parent_txn = TransactionId::new();
    let timestamp = MilliSecondsSinceUnixEpoch::now();

    let content =
        SerializableEventContent::new(&RoomMessageEventContent::text_plain("edit").into())
            .unwrap();

    let request = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_txn.clone(),
        kind: DependentQueuedRequestKind::EditEvent { new_content: content },
        parent_key: None,
        created_at: timestamp,
    };

    let canonical = canonicalize_dependent_requests(&[request]);
    assert_eq!(canonical.len(), 1);

    let only = &canonical[0];
    assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &only.kind);
    assert_let!(
        AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
    );
    assert_eq!(msg.body(), "edit");
    assert_eq!(only.parent_transaction_id, parent_txn);
    assert_eq!(only.created_at, timestamp);
}
// Checks that using the send queue doesn't keep the client alive: once the
// last strong `Client` handle held by the test is dropped, the weak client
// must not be upgradable anymore, whether the send queue is enabled or not.
#[async_test]
async fn test_client_no_cycle_with_send_queue() {
for enabled in [true, false] {
let client = logged_in_client(None).await;
let weak_client = WeakClient::from_client(&client);
// Inner scope: everything derived from the client (room, room send queue,
// subscription) is dropped at its end.
{
let mut sync_response_builder = SyncResponseBuilder::new();
let room_id = room_id!("!a:b.c");
// Make the client aware of a joined room.
client
.base_client()
.receive_sync_response(
sync_response_builder
.add_joined_room(JoinedRoomBuilder::new(room_id))
.build_sync_response(),
)
.await
.unwrap();
let room = client.get_room(room_id).unwrap();
let q = room.send_queue();
let _watcher = q.subscribe().await;
client.send_queue().set_enabled(enabled).await;
}
drop(client);
// Leave some time before checking that no strong reference remains.
tokio::time::sleep(Duration::from_millis(500)).await;
let client = weak_client.get();
// The message arguments are only evaluated when the assertion fails, i.e.
// when `client` is `Some`, so the `unwrap()` below can't panic.
assert!(
client.is_none(),
"too many strong references to the client: {}",
Arc::strong_count(&client.unwrap().inner)
);
}
}
#[test]
fn test_canonicalize_dependent_events_smoke_test() {
    // A single edit comes out of canonicalization as that same single edit.
    let parent_txn = TransactionId::new();

    let content =
        SerializableEventContent::new(&RoomMessageEventContent::text_plain("edit").into())
            .unwrap();

    let request = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_txn.clone(),
        kind: DependentQueuedRequestKind::EditEvent { new_content: content },
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let canonical = canonicalize_dependent_requests(&[request]);
    assert_eq!(canonical.len(), 1);

    let only = &canonical[0];
    assert_matches!(&only.kind, DependentQueuedRequestKind::EditEvent { .. });
    assert_eq!(only.parent_transaction_id, parent_txn);
    assert!(only.parent_key.is_none());
}
#[test]
fn test_canonicalize_dependent_events_redaction_preferred() {
    // A redaction wins over any number of edits, wherever it sits in the list.
    let parent_txn = TransactionId::new();

    let redact = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_txn.clone(),
        kind: DependentQueuedRequestKind::RedactEvent,
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let edit_template = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_txn.clone(),
        kind: DependentQueuedRequestKind::EditEvent {
            new_content: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain("edit").into(),
            )
            .unwrap(),
        },
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    // Each edit pushed in the list gets its own fresh child transaction id.
    let fresh_edit = || {
        let mut copy = edit_template.clone();
        copy.own_transaction_id = ChildTransactionId::new();
        copy
    };

    // One edit, then the redaction, then 98 more edits.
    let mut inputs = Vec::with_capacity(100);
    inputs.push(fresh_edit());
    inputs.push(redact);
    inputs.extend((0..98).map(|_| fresh_edit()));

    let canonical = canonicalize_dependent_requests(&inputs);

    assert_eq!(canonical.len(), 1);
    assert_matches!(&canonical[0].kind, DependentQueuedRequestKind::RedactEvent);
    assert_eq!(canonical[0].parent_transaction_id, parent_txn);
}
#[test]
fn test_canonicalize_dependent_events_last_edit_preferred() {
    // Among several edits of the same parent, only the last one is kept.
    let parent_txn = TransactionId::new();

    let mut inputs = Vec::with_capacity(10);
    for i in 0..10 {
        let content = SerializableEventContent::new(
            &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
        )
        .unwrap();

        inputs.push(DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: parent_txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent { new_content: content },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        });
    }

    let expected_parent = inputs.last().unwrap().parent_transaction_id.clone();

    let canonical = canonicalize_dependent_requests(&inputs);
    assert_eq!(canonical.len(), 1);

    let only = &canonical[0];
    assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &only.kind);
    assert_let!(
        AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
    );
    assert_eq!(msg.body(), "edit9");
    assert_eq!(only.parent_transaction_id, expected_parent);
}
#[test]
fn test_canonicalize_multiple_local_echoes() {
    // Requests targeting different parents don't interfere with each other.
    let redacted_parent = TransactionId::new();
    let edited_parent = TransactionId::new();

    let redact_id = ChildTransactionId::new();
    let edit_id = ChildTransactionId::new();

    let redact = DependentQueuedRequest {
        own_transaction_id: redact_id.clone(),
        kind: DependentQueuedRequestKind::RedactEvent,
        parent_transaction_id: redacted_parent.clone(),
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let edit = DependentQueuedRequest {
        own_transaction_id: edit_id,
        kind: DependentQueuedRequestKind::EditEvent {
            new_content: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain("edit").into(),
            )
            .unwrap(),
        },
        parent_transaction_id: edited_parent.clone(),
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let canonical = canonicalize_dependent_requests(&[redact, edit]);
    assert_eq!(canonical.len(), 2);

    // The order of the two entries is unspecified; tell them apart by id.
    for request in canonical {
        if request.own_transaction_id == redact_id {
            assert_eq!(request.parent_transaction_id, redacted_parent);
            assert_matches!(request.kind, DependentQueuedRequestKind::RedactEvent);
        } else {
            assert_eq!(request.parent_transaction_id, edited_parent);
            assert_matches!(request.kind, DependentQueuedRequestKind::EditEvent { .. });
        }
    }
}
#[test]
fn test_canonicalize_reactions_after_edits() {
    // An edit queued after a reaction must still be ordered before it.
    let parent_txn = TransactionId::new();

    let reaction_id = ChildTransactionId::new();
    let edit_id = ChildTransactionId::new();

    let reaction = DependentQueuedRequest {
        own_transaction_id: reaction_id.clone(),
        kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
        parent_transaction_id: parent_txn.clone(),
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let edit_content =
        SerializableEventContent::new(&RoomMessageEventContent::text_plain("edit").into())
            .unwrap();
    let edit = DependentQueuedRequest {
        own_transaction_id: edit_id.clone(),
        kind: DependentQueuedRequestKind::EditEvent { new_content: edit_content },
        parent_transaction_id: parent_txn,
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let canonical = canonicalize_dependent_requests(&[reaction, edit]);

    assert_eq!(canonical.len(), 2);
    assert_eq!(canonical[0].own_transaction_id, edit_id);
    assert_eq!(canonical[1].own_transaction_id, reaction_id);
}
}