use std::{
borrow::Cow,
sync::{Arc, Mutex},
time::Duration,
};
use assign::assign;
use async_stream::try_stream;
use futures_core::stream::Stream;
use ruma::{
DeviceId,
api::{
AppserviceUserIdentity, OutgoingRequest, SupportedVersions,
auth_scheme::{AuthScheme, SendAccessToken},
client::{
account::register::{self, RegistrationKind},
session::login::{self, v3::LoginInfo},
sync::sync_events,
uiaa::{MatrixUserIdentifier, UserIdentifier},
},
path_builder::{PathBuilder, SinglePath, VersionHistory},
},
presence::PresenceState,
};
use crate::{Error, HttpClient, ResponseError, ResponseResult, send_customized_request};
mod builder;
pub use self::builder::ClientBuilder;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum TokenMode {
#[default]
SendIfRequired,
SendAlways,
AppService,
}
#[derive(Clone, Debug)]
pub struct Client<C>(Arc<ClientData<C>>);
#[derive(Debug)]
struct ClientData<C> {
homeserver_url: String,
http_client: C,
access_token: Mutex<Option<String>>,
token_mode: TokenMode,
supported_matrix_versions: SupportedVersions,
}
impl Client<()> {
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
}
impl<C> Client<C> {
pub fn access_token(&self) -> Option<String> {
self.0.access_token.lock().expect("session mutex was poisoned").clone()
}
}
impl<C: HttpClient> Client<C> {
pub async fn send_request<R>(&self, request: R) -> ResponseResult<C, R>
where
R: OutgoingRequest,
for<'a> R::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
R::PathBuilder: SupportedPathBuilder,
{
self.send_customized_request(request, |_| Ok(())).await
}
pub async fn send_customized_request<R, F>(
&self,
request: R,
customize: F,
) -> ResponseResult<C, R>
where
R: OutgoingRequest,
for<'a> R::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
R::PathBuilder: SupportedPathBuilder,
F: FnOnce(&mut http::Request<C::RequestBody>) -> Result<(), ResponseError<C, R>>,
{
let token_mode = self.0.token_mode;
let access_token = self.access_token();
let send_access_token = match (token_mode, access_token.as_deref()) {
(TokenMode::AppService, Some(at)) => SendAccessToken::Appservice(at),
(TokenMode::SendIfRequired, Some(at)) => SendAccessToken::IfRequired(at),
(TokenMode::SendAlways, Some(at)) => SendAccessToken::Always(at),
(_, None) => SendAccessToken::None,
};
send_customized_request(
&self.0.http_client,
&self.0.homeserver_url,
send_access_token,
R::PathBuilder::get_path_builder_input(self),
request,
customize,
)
.await
}
pub async fn send_request_as<R>(
&self,
identity: AppserviceUserIdentity<'_>,
request: R,
) -> ResponseResult<C, R>
where
R: OutgoingRequest,
for<'a> R::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
R::PathBuilder: SupportedPathBuilder,
{
self.send_customized_request(request, |request| {
Ok(identity.maybe_add_to_uri(request.uri_mut())?)
})
.await
}
pub async fn log_in(
&self,
user: &str,
password: &str,
device_id: Option<&DeviceId>,
initial_device_display_name: Option<&str>,
) -> Result<login::v3::Response, Error<C::Error, ruma::api::error::Error>> {
let login_info = LoginInfo::Password(login::v3::Password::new(
UserIdentifier::Matrix(MatrixUserIdentifier::new(user.to_owned())),
password.to_owned(),
));
let response = self
.send_request(assign!(login::v3::Request::new(login_info), {
device_id: device_id.map(ToOwned::to_owned),
initial_device_display_name: initial_device_display_name.map(ToOwned::to_owned),
}))
.await?;
*self.0.access_token.lock().unwrap() = Some(response.access_token.clone());
Ok(response)
}
pub async fn register_guest(
&self,
) -> Result<register::v3::Response, Error<C::Error, ruma::api::client::uiaa::UiaaResponse>>
{
let response = self
.send_request(assign!(register::v3::Request::new(), { kind: RegistrationKind::Guest }))
.await?;
self.0.access_token.lock().unwrap().clone_from(&response.access_token);
Ok(response)
}
pub async fn register_user(
&self,
username: Option<&str>,
password: &str,
) -> Result<register::v3::Response, Error<C::Error, ruma::api::client::uiaa::UiaaResponse>>
{
let response = self
.send_request(assign!(register::v3::Request::new(), {
username: username.map(ToOwned::to_owned),
password: Some(password.to_owned())
}))
.await?;
self.0.access_token.lock().unwrap().clone_from(&response.access_token);
Ok(response)
}
pub fn sync(
&self,
filter: Option<sync_events::v3::Filter>,
mut since: String,
set_presence: PresenceState,
timeout: Option<Duration>,
) -> impl Stream<
Item = Result<sync_events::v3::Response, Error<C::Error, ruma::api::error::Error>>,
> + '_ {
try_stream! {
loop {
let response = self
.send_request(assign!(sync_events::v3::Request::new(), {
filter: filter.clone(),
since: Some(since.clone()),
set_presence: set_presence.clone(),
timeout,
}))
.await?;
since.clone_from(&response.next_batch);
yield response;
}
}
}
}
pub trait SupportedPathBuilder: PathBuilder {
fn get_path_builder_input<C>(client: &Client<C>) -> Self::Input<'_>;
}
impl SupportedPathBuilder for VersionHistory {
fn get_path_builder_input<C>(client: &Client<C>) -> Self::Input<'_> {
Cow::Borrowed(&client.0.supported_matrix_versions)
}
}
impl SupportedPathBuilder for SinglePath {
fn get_path_builder_input<C>(_client: &Client<C>) -> Self::Input<'_> {}
}