use std::{future::IntoFuture, time::Duration};
use futures_core::Stream;
use futures_util::StreamExt;
use matrix_sdk_common::boxed_into_future;
use thiserror::Error;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::trace;
use super::{Backups, UploadState};
use crate::utils::ChannelObservable;
/// Errors that awaiting a [`WaitForSteadyState`] can resolve to.
#[derive(Clone, Copy, Debug, Error)]
pub enum SteadyStateError {
/// Backups were not enabled when the wait started, or were found to be
/// disabled after the upload task reported an error.
#[error("The backup got disabled while waiting for the room keys to be uploaded.")]
BackupDisabled,
/// The upload task reported an error while backups were still enabled.
#[error("There was a network connection error.")]
Connection,
/// The progress stream yielded an error instead of an upload state, i.e.
/// status updates could not be read quickly enough.
#[error("We couldn't read status updates from the upload task quickly enough.")]
Lagged,
}
/// Named future that waits for the room key backup upload to settle.
///
/// Await it (via [`IntoFuture`]) to wait until the upload task reports that it
/// is done or that it failed.
#[derive(Debug)]
pub struct WaitForSteadyState<'a> {
/// The backups handle used to check whether backups are enabled, to trigger
/// an upload, and to reach the shared upload delay.
pub(super) backups: &'a Backups,
/// Observable that the upload state updates are read from.
pub(super) progress: ChannelObservable<UploadState>,
/// Optional delay that is written into the shared upload delay for the
/// duration of the wait. Despite the field name, it is not used as a
/// deadline for the wait itself.
pub(super) timeout: Option<Duration>,
}
impl WaitForSteadyState<'_> {
    /// Subscribe to the upload state updates this future observes.
    ///
    /// The returned stream yields either an [`UploadState`] or a
    /// [`BroadcastStreamRecvError`]; it does not borrow from `self`.
    pub fn subscribe_to_progress(
        &self,
    ) -> impl Stream<Item = Result<UploadState, BroadcastStreamRecvError>> + use<> {
        let Self { progress, .. } = self;
        progress.subscribe()
    }

    /// Set the delay that should be applied to the upload task while this
    /// future is being awaited.
    ///
    /// Consumes the builder and returns it with the delay recorded.
    pub fn with_delay(self, delay: Duration) -> Self {
        Self { timeout: Some(delay), ..self }
    }
}
impl<'a> IntoFuture for WaitForSteadyState<'a> {
type Output = Result<(), SteadyStateError>;
boxed_into_future!(extra_bounds: 'a);
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let Self { backups, timeout, progress } = self;
trace!("Creating a stream to wait for the steady state");
let mut progress_stream = progress.subscribe();
let old_delay = if let Some(delay) = timeout {
let mut lock = backups.client.inner.e2ee.backup_state.upload_delay.write().unwrap();
let old_delay = Some(lock.to_owned());
*lock = delay;
old_delay
} else {
None
};
trace!("Waiting for the upload steady state");
let ret = if backups.are_enabled().await {
backups.maybe_trigger_backup();
let mut ret = Ok(());
while let Some(state) = progress_stream.next().await {
trace!(?state, "Update state while waiting for the backup steady state");
match state {
Ok(UploadState::Done) => {
ret = Ok(());
break;
}
Ok(UploadState::Error) => {
if backups.are_enabled().await {
ret = Err(SteadyStateError::Connection);
} else {
ret = Err(SteadyStateError::BackupDisabled);
}
break;
}
Err(_) => {
ret = Err(SteadyStateError::Lagged);
break;
}
_ => (),
}
}
ret
} else {
Err(SteadyStateError::BackupDisabled)
};
if let Some(old_delay) = old_delay {
let mut lock = backups.client.inner.e2ee.backup_state.upload_delay.write().unwrap();
*lock = old_delay;
}
ret
})
}
}