#![deny(unreachable_pub)]
use std::{fmt::Debug, future::IntoFuture};
use eyeball::{SharedObservable, Subscriber};
use js_int::UInt;
use matrix_sdk_common::{SendOutsideWasm, SyncOutsideWasm, boxed_into_future};
use oauth2::{RequestTokenError, basic::BasicErrorResponseType};
use ruma::api::{
OutgoingRequest,
auth_scheme::{AuthScheme, SendAccessToken},
client::{error::ErrorKind, media},
error::FromHttpResponseError,
path_builder::PathBuilder,
};
use tracing::{error, trace};
use super::super::Client;
use crate::{
Error, RefreshTokenError, TransmissionProgress,
authentication::oauth::OAuthError,
config::RequestConfig,
error::{HttpError, HttpResult},
http_client::SupportedPathBuilder,
media::MediaError,
};
/// A not-yet-sent request of type `R`, bundled with the [`Client`] that will
/// send it.
///
/// The request is only sent once the value is turned into a future (see the
/// `IntoFuture` implementation) and awaited. Before that, the builder-style
/// methods on this type can adjust how it will be sent.
// This type does not implement `Debug`, so the lint is silenced explicitly.
#[allow(missing_debug_implementations)]
pub struct SendRequest<R> {
    /// The client used to send the request.
    pub(crate) client: Client,
    /// The request that will be sent.
    pub(crate) request: R,
    /// Optional per-request configuration, forwarded as-is to the client when
    /// the request is sent.
    pub(crate) config: Option<RequestConfig>,
    /// Observable handed to the client when the request is sent; this is what
    /// `subscribe_to_send_progress` subscribes to.
    pub(crate) send_progress: SharedObservable<TransmissionProgress>,
}
impl<R> SendRequest<R> {
    /// Swaps in `send_progress` as the observable used for this request.
    ///
    /// The previously stored observable is discarded; subscribers created
    /// from it are not migrated to the new one.
    pub fn with_send_progress_observable(
        self,
        send_progress: SharedObservable<TransmissionProgress>,
    ) -> Self {
        Self { send_progress, ..self }
    }

    /// Sets (or clears) the [`RequestConfig`] used when sending this request.
    ///
    /// Accepts either a `RequestConfig` directly or an
    /// `Option<RequestConfig>`; passing `None` clears any previously stored
    /// configuration.
    pub fn with_request_config(self, request_config: impl Into<Option<RequestConfig>>) -> Self {
        Self { config: request_config.into(), ..self }
    }

    /// Returns a new subscriber on the send-progress observable currently
    /// stored in this request.
    pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
        let progress = &self.send_progress;
        progress.subscribe()
    }
}
impl<R> IntoFuture for SendRequest<R>
where
R: OutgoingRequest + Clone + Debug + SendOutsideWasm + SyncOutsideWasm + 'static,
for<'a> R::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
R::PathBuilder: SupportedPathBuilder,
for<'a> <R::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
R::IncomingResponse: SendOutsideWasm + SyncOutsideWasm,
HttpError: From<FromHttpResponseError<R::EndpointError>>,
{
type Output = HttpResult<R::IncomingResponse>;
boxed_into_future!();
fn into_future(self) -> Self::IntoFuture {
let Self { client, request, config, send_progress } = self;
Box::pin(async move {
let res =
Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
if let Err(Some(ErrorKind::UnknownToken { soft_logout })) =
res.as_ref().map_err(HttpError::client_api_error_kind)
{
trace!("Token refresh: Unknown token error received.");
if !client.inner.auth_ctx.handle_refresh_tokens {
trace!("Token refresh: Automatic refresh disabled.");
client.broadcast_unknown_token(soft_logout);
return res;
}
if let Err(refresh_error) = client.refresh_access_token().await {
match &refresh_error {
RefreshTokenError::RefreshTokenRequired => {
trace!("Token refresh: The session doesn't have a refresh token.");
client.broadcast_unknown_token(soft_logout);
}
RefreshTokenError::OAuth(oauth_error) => {
match &**oauth_error {
OAuthError::RefreshToken(RequestTokenError::ServerResponse(
error_response,
)) if *error_response.error()
== BasicErrorResponseType::InvalidGrant =>
{
error!(
"Token refresh: OAuth 2.0 refresh_token rejected \
with invalid grant"
);
client.broadcast_unknown_token(soft_logout);
}
_ => {
trace!(
"Token refresh: OAuth 2.0 refresh encountered a problem."
);
}
}
return Err(HttpError::RefreshToken(refresh_error));
}
_ => {
trace!("Token refresh: Token refresh failed.");
client.broadcast_unknown_token(soft_logout);
return Err(HttpError::RefreshToken(refresh_error));
}
}
} else {
trace!("Token refresh: Refresh succeeded, retrying request.");
return Box::pin(client.send_inner(request, config, send_progress)).await;
}
}
res
})
}
}
/// A not-yet-sent media upload (`media::create_content::v3`) request.
///
/// Wraps a [`SendRequest`] for that endpoint; the request is sent when the
/// value is turned into a future (see the `IntoFuture` implementation) and
/// awaited.
// This type does not implement `Debug`, so the lint is silenced explicitly.
#[allow(missing_debug_implementations)]
pub struct SendMediaUploadRequest {
    /// The wrapped upload request.
    send_request: SendRequest<media::create_content::v3::Request>,
}
impl SendMediaUploadRequest {
    /// Wraps an existing upload [`SendRequest`].
    pub fn new(request: SendRequest<media::create_content::v3::Request>) -> Self {
        let send_request = request;
        Self { send_request }
    }

    /// Swaps in `send_progress` as the observable used by the wrapped
    /// request.
    pub fn with_send_progress_observable(
        self,
        send_progress: SharedObservable<TransmissionProgress>,
    ) -> Self {
        let Self { send_request } = self;
        Self { send_request: send_request.with_send_progress_observable(send_progress) }
    }

    /// Returns a new subscriber on the wrapped request's send-progress
    /// observable.
    pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
        self.send_request.subscribe_to_send_progress()
    }
}
/// Sends the upload when awaited, after checking the payload size against
/// the maximum upload size reported by the client.
impl IntoFuture for SendMediaUploadRequest {
    type Output = Result<media::create_content::v3::Response, Error>;
    boxed_into_future!();

    /// Builds the future that validates the payload size and then sends the
    /// wrapped request.
    ///
    /// # Errors
    ///
    /// * Any error returned while loading or fetching the maximum upload
    ///   size.
    /// * [`MediaError::MediaTooLargeToUpload`] (wrapped in [`Error::Media`])
    ///   if the payload is larger than that maximum; the request is not sent
    ///   in that case.
    /// * Any error produced by sending the wrapped request, converted into
    ///   [`Error`].
    fn into_future(self) -> Self::IntoFuture {
        // Capture the payload length and a client handle before the request
        // is moved into the future.
        let request_length = self.send_request.request.file.len();
        let client = self.send_request.client.clone();
        let send_request = self.send_request;

        Box::pin(async move {
            let max_upload_size = client.load_or_fetch_max_upload_size().await?;

            // Saturate instead of wrapping: a length above `UInt::MAX` must
            // compare as "huge", not wrap around to a small value that would
            // slip past the limit check below.
            let request_length = UInt::new_saturating(request_length as u64);

            if request_length > max_upload_size {
                return Err(Error::Media(MediaError::MediaTooLargeToUpload {
                    max: max_upload_size,
                    current: request_length,
                }));
            }

            send_request.into_future().await.map_err(Into::into)
        })
    }
}