1use std::{
2 sync::{Arc, Mutex},
3 time::Duration,
4};
5
6use assign::assign;
7use async_stream::try_stream;
8use futures_core::stream::Stream;
9use ruma::{
10 api::{
11 client::{
12 account::register::{self, RegistrationKind},
13 session::login::{self, v3::LoginInfo},
14 sync::sync_events,
15 uiaa::UserIdentifier,
16 },
17 OutgoingRequest, SendAccessToken, SupportedVersions,
18 },
19 presence::PresenceState,
20 DeviceId, UserId,
21};
22
23use crate::{
24 add_user_id_to_query, send_customized_request, Error, HttpClient, ResponseError, ResponseResult,
25};
26
27mod builder;
28
29pub use self::builder::ClientBuilder;
30
31#[derive(Clone, Debug)]
33pub struct Client<C>(Arc<ClientData<C>>);
34
/// State shared by all clones of a `Client`; `Client` holds it behind an `Arc`.
#[derive(Debug)]
struct ClientData<C> {
    /// Homeserver URL handed to `send_customized_request` for every request.
    homeserver_url: String,

    /// The HTTP client that requests are sent through.
    http_client: C,

    /// Access token of the current session, if any.
    ///
    /// Written by `log_in`, `register_guest` and `register_user`; read by `access_token`.
    access_token: Mutex<Option<String>>,

    /// Matrix versions passed along with every request.
    // NOTE(review): presumably the versions the homeserver is known to support — confirm
    // against the builder that fills this in.
    supported_matrix_versions: SupportedVersions,
}
50
impl Client<()> {
    /// Returns a fresh [`ClientBuilder`], created with [`ClientBuilder::new`].
    ///
    /// NOTE(review): this lives on `Client<()>` presumably so that `Client::builder()` can be
    /// called without naming an HTTP client type — confirm against the builder module.
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }
}
57
58impl<C> Client<C> {
59 pub fn access_token(&self) -> Option<String> {
63 self.0.access_token.lock().expect("session mutex was poisoned").clone()
64 }
65}
66
67impl<C: HttpClient> Client<C> {
68 pub async fn send_request<R: OutgoingRequest>(&self, request: R) -> ResponseResult<C, R> {
70 self.send_customized_request(request, |_| Ok(())).await
71 }
72
73 pub async fn send_customized_request<R, F>(
75 &self,
76 request: R,
77 customize: F,
78 ) -> ResponseResult<C, R>
79 where
80 R: OutgoingRequest,
81 F: FnOnce(&mut http::Request<C::RequestBody>) -> Result<(), ResponseError<C, R>>,
82 {
83 let access_token = self.access_token();
84 let send_access_token = match access_token.as_deref() {
85 Some(at) => SendAccessToken::IfRequired(at),
86 None => SendAccessToken::None,
87 };
88
89 send_customized_request(
90 &self.0.http_client,
91 &self.0.homeserver_url,
92 send_access_token,
93 &self.0.supported_matrix_versions,
94 request,
95 customize,
96 )
97 .await
98 }
99
100 pub async fn send_request_as<R: OutgoingRequest>(
105 &self,
106 user_id: &UserId,
107 request: R,
108 ) -> ResponseResult<C, R> {
109 self.send_customized_request(request, add_user_id_to_query::<C, R>(user_id)).await
110 }
111
112 pub async fn log_in(
117 &self,
118 user: &str,
119 password: &str,
120 device_id: Option<&DeviceId>,
121 initial_device_display_name: Option<&str>,
122 ) -> Result<login::v3::Response, Error<C::Error, ruma::api::client::Error>> {
123 let login_info = LoginInfo::Password(login::v3::Password::new(
124 UserIdentifier::UserIdOrLocalpart(user.to_owned()),
125 password.to_owned(),
126 ));
127 let response = self
128 .send_request(assign!(login::v3::Request::new(login_info), {
129 device_id: device_id.map(ToOwned::to_owned),
130 initial_device_display_name: initial_device_display_name.map(ToOwned::to_owned),
131 }))
132 .await?;
133
134 *self.0.access_token.lock().unwrap() = Some(response.access_token.clone());
135
136 Ok(response)
137 }
138
139 pub async fn register_guest(
144 &self,
145 ) -> Result<register::v3::Response, Error<C::Error, ruma::api::client::uiaa::UiaaResponse>>
146 {
147 let response = self
148 .send_request(assign!(register::v3::Request::new(), { kind: RegistrationKind::Guest }))
149 .await?;
150
151 self.0.access_token.lock().unwrap().clone_from(&response.access_token);
152
153 Ok(response)
154 }
155
156 pub async fn register_user(
164 &self,
165 username: Option<&str>,
166 password: &str,
167 ) -> Result<register::v3::Response, Error<C::Error, ruma::api::client::uiaa::UiaaResponse>>
168 {
169 let response = self
170 .send_request(assign!(register::v3::Request::new(), {
171 username: username.map(ToOwned::to_owned),
172 password: Some(password.to_owned())
173 }))
174 .await?;
175
176 self.0.access_token.lock().unwrap().clone_from(&response.access_token);
177
178 Ok(response)
179 }
180
181 pub fn sync(
210 &self,
211 filter: Option<sync_events::v3::Filter>,
212 mut since: String,
213 set_presence: PresenceState,
214 timeout: Option<Duration>,
215 ) -> impl Stream<
216 Item = Result<sync_events::v3::Response, Error<C::Error, ruma::api::client::Error>>,
217 > + '_ {
218 try_stream! {
219 loop {
220 let response = self
221 .send_request(assign!(sync_events::v3::Request::new(), {
222 filter: filter.clone(),
223 since: Some(since.clone()),
224 set_presence: set_presence.clone(),
225 timeout,
226 }))
227 .await?;
228
229 since.clone_from(&response.next_batch);
230 yield response;
231 }
232 }
233 }
234}