matrix_sdk/client/
futures.rs1#![deny(unreachable_pub)]
16
17use std::{fmt::Debug, future::IntoFuture};
18
19use eyeball::SharedObservable;
20#[cfg(not(target_arch = "wasm32"))]
21use eyeball::Subscriber;
22use matrix_sdk_common::boxed_into_future;
23use oauth2::{basic::BasicErrorResponseType, RequestTokenError};
24use ruma::api::{client::error::ErrorKind, error::FromHttpResponseError, OutgoingRequest};
25use tracing::{error, trace};
26
27use super::super::Client;
28use crate::{
29 authentication::oauth::OAuthError,
30 config::RequestConfig,
31 error::{HttpError, HttpResult},
32 RefreshTokenError, TransmissionProgress,
33};
34
35#[allow(missing_debug_implementations)]
37pub struct SendRequest<R> {
38 pub(crate) client: Client,
39 pub(crate) request: R,
40 pub(crate) config: Option<RequestConfig>,
41 pub(crate) send_progress: SharedObservable<TransmissionProgress>,
42}
43
44impl<R> SendRequest<R> {
45 pub fn with_send_progress_observable(
52 mut self,
53 send_progress: SharedObservable<TransmissionProgress>,
54 ) -> Self {
55 self.send_progress = send_progress;
56 self
57 }
58
59 pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
62 self.config = request_config.into();
63 self
64 }
65
66 #[cfg(not(target_arch = "wasm32"))]
69 pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
70 self.send_progress.subscribe()
71 }
72}
73
74impl<R> IntoFuture for SendRequest<R>
75where
76 R: OutgoingRequest + Clone + Debug + Send + Sync + 'static,
77 R::IncomingResponse: Send + Sync,
78 HttpError: From<FromHttpResponseError<R::EndpointError>>,
79{
80 type Output = HttpResult<R::IncomingResponse>;
81 boxed_into_future!();
82
83 fn into_future(self) -> Self::IntoFuture {
84 let Self { client, request, config, send_progress } = self;
85
86 Box::pin(async move {
87 let res =
88 Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
89
90 if let Err(Some(ErrorKind::UnknownToken { soft_logout })) =
92 res.as_ref().map_err(HttpError::client_api_error_kind)
93 {
94 trace!("Token refresh: Unknown token error received.");
95
96 if !client.inner.auth_ctx.handle_refresh_tokens {
98 trace!("Token refresh: Automatic refresh disabled.");
99 client.broadcast_unknown_token(soft_logout);
100 return res;
101 }
102
103 if let Err(refresh_error) = client.refresh_access_token().await {
105 match &refresh_error {
106 RefreshTokenError::RefreshTokenRequired => {
107 trace!("Token refresh: The session doesn't have a refresh token.");
108 client.broadcast_unknown_token(soft_logout);
110 }
111
112 RefreshTokenError::OAuth(oauth_error) => {
113 match &**oauth_error {
114 OAuthError::RefreshToken(RequestTokenError::ServerResponse(
115 error_response,
116 )) if *error_response.error()
117 == BasicErrorResponseType::InvalidGrant =>
118 {
119 error!("Token refresh: OAuth 2.0 refresh_token rejected with invalid grant");
120 client.broadcast_unknown_token(soft_logout);
122 }
123 _ => {
124 trace!(
125 "Token refresh: OAuth 2.0 refresh encountered a problem."
126 );
127 }
130 };
131 return Err(HttpError::RefreshToken(refresh_error));
132 }
133
134 _ => {
135 trace!("Token refresh: Token refresh failed.");
136 client.broadcast_unknown_token(soft_logout);
139 return Err(HttpError::RefreshToken(refresh_error));
140 }
141 }
142 } else {
143 trace!("Token refresh: Refresh succeeded, retrying request.");
144 return Box::pin(client.send_inner(request, config, send_progress)).await;
145 }
146 }
147
148 res
149 })
150 }
151}