use std::ops::Add;
use eyeball::SharedObservable;
#[cfg(feature = "unstable-msc4274")]
use matrix_sdk_base::store::AccumulatedSentMediaInfo;
use matrix_sdk_base::{media::MediaRequestParameters, store::DependentQueuedRequestKind};
use matrix_sdk_common::executor::spawn;
use ruma::{TransactionId, events::room::MediaSource};
use tokio::sync::broadcast;
use tracing::warn;
use crate::{
Room, TransmissionProgress,
send_queue::{QueueStorage, RoomSendQueue, RoomSendQueueStorageError, RoomSendQueueUpdate},
};
#[derive(Clone, Copy, Debug, Default)]
pub struct AbstractProgress {
pub current: usize,
pub total: usize,
}
impl Add for AbstractProgress {
type Output = Self;
fn add(self, other: Self) -> Self::Output {
Self { current: self.current + other.current, total: self.total + other.total }
}
}
#[derive(Clone, Copy, Debug)]
pub(super) struct MediaUploadProgressInfo {
pub index: u64,
pub bytes: usize,
pub offsets: AbstractProgress,
}
impl RoomSendQueue {
pub(super) async fn create_media_upload_progress_info(
own_txn_id: &TransactionId,
related_to: &TransactionId,
cache_key: &MediaRequestParameters,
thumbnail_source: Option<&MediaSource>,
#[cfg(feature = "unstable-msc4274")] accumulated: &[AccumulatedSentMediaInfo],
room: &Room,
queue: &QueueStorage,
) -> MediaUploadProgressInfo {
let index = {
cfg_if::cfg_if! {
if #[cfg(feature = "unstable-msc4274")] {
accumulated.len()
} else {
0 }
}
};
let bytes = match room.client().media_store().lock().await {
Ok(cache) => match cache.get_media_content(cache_key).await {
Ok(Some(content)) => content.len(),
Ok(None) => 0,
Err(err) => {
warn!("error when reading media content from media store: {err}");
0
}
},
Err(err) => {
warn!("couldn't acquire media store lock: {err}");
0
}
};
let offsets = {
let already_uploaded_thumbnail_bytes = if thumbnail_source.is_some() {
queue
.thumbnail_file_sizes
.lock()
.get(related_to)
.and_then(|sizes| sizes.get(index))
.copied()
.flatten()
} else {
None
};
let already_uploaded_thumbnail_bytes = already_uploaded_thumbnail_bytes.unwrap_or(0);
let pending_file_bytes = match RoomSendQueue::get_dependent_pending_file_upload_size(
own_txn_id, room,
)
.await
{
Ok(maybe_size) => maybe_size.unwrap_or(0),
Err(err) => {
warn!(
"error when getting pending file upload size: {err}; using 0 as fallback"
);
0
}
};
AbstractProgress {
current: already_uploaded_thumbnail_bytes,
total: already_uploaded_thumbnail_bytes + pending_file_bytes,
}
};
MediaUploadProgressInfo { index: index as u64, bytes, offsets }
}
async fn get_dependent_pending_file_upload_size(
txn_id: &TransactionId,
room: &Room,
) -> Result<Option<usize>, RoomSendQueueStorageError> {
let client = room.client();
let dependent_requests =
client.state_store().load_dependent_queued_requests(room.room_id()).await?;
let Some((cache_key, parent_is_thumbnail_upload)) =
dependent_requests.into_iter().find_map(|r| {
if r.parent_transaction_id != txn_id {
return None;
}
if let DependentQueuedRequestKind::UploadFileOrThumbnail {
cache_key,
parent_is_thumbnail_upload,
..
} = r.kind
{
Some((cache_key, parent_is_thumbnail_upload))
} else {
None
}
})
else {
return Ok(None);
};
if !parent_is_thumbnail_upload {
return Ok(None);
}
let media_store_guard = client.media_store().lock().await?;
let maybe_content = media_store_guard.get_media_content(&cache_key).await?;
Ok(maybe_content.map(|c| c.len()))
}
pub(super) fn create_media_upload_progress_observable(
media_upload_info: &MediaUploadProgressInfo,
related_txn_id: &TransactionId,
update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
) -> SharedObservable<TransmissionProgress> {
let progress: SharedObservable<TransmissionProgress> = Default::default();
let mut subscriber = progress.subscribe();
let related_txn_id = related_txn_id.to_owned();
let update_sender = update_sender.clone();
let media_upload_info = *media_upload_info;
spawn(async move {
while let Some(progress) = subscriber.next().await {
let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
related_to: related_txn_id.clone(),
file: None,
index: media_upload_info.index,
progress: estimate_media_upload_progress(progress, media_upload_info.bytes)
+ media_upload_info.offsets,
});
}
});
progress
}
}
fn estimate_media_upload_progress(
progress: TransmissionProgress,
bytes: usize,
) -> AbstractProgress {
if progress.total == 0 {
return AbstractProgress { current: 0, total: 0 };
}
if progress.current == progress.total {
return AbstractProgress { current: bytes, total: bytes };
}
AbstractProgress {
current: (progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize,
total: bytes,
}
}