1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
//! Crate `ruma_client` is a [Matrix](https://matrix.org/) client library.
//!
//! # Usage
//!
//! Begin by creating a `Client` type, usually using the `https` method for a client that supports
//! secure connections, and then logging in:
//!
//! ```no_run
//! use ruma_client::Client;
//!
//! let work = async {
//!     let homeserver_url = "https://example.com".parse().unwrap();
//!     let client = Client::https(homeserver_url, None);
//!
//!     let session = client
//!         .log_in("@alice:example.com".to_string(), "secret".to_string(), None, None)
//!         .await?;
//!
//!     // You're now logged in! Write the session to a file if you want to restore it later.
//!     // Then start using the API!
//! # Result::<(), ruma_client::Error<_>>::Ok(())
//! };
//! ```
//!
//! You can also pass an existing session to the `Client` constructor to restore a previous session
//! rather than calling `log_in`. This can also be used to create a session for an application service
//! that does not need to log in, but uses the access_token directly:
//!
//! ```no_run
//! use ruma_client::{Client, Session};
//!
//! let work = async {
//!     let homeserver_url = "https://example.com".parse().unwrap();
//!     let session = Session{access_token: "as_access_token".to_string(), identification: None};
//!     let client = Client::https(homeserver_url, Some(session));
//!
//!     // make calls to the API
//! };
//! ```
//!
//! For the standard use case of synchronizing with the homeserver (i.e. getting all the latest
//! events), use `Client::sync`:
//!
//! ```no_run
//! use std::time::Duration;
//!
//! # use futures_util::stream::{StreamExt as _, TryStreamExt as _};
//! # use ruma_client::{api::r0::sync::sync_events::SetPresence, Client};
//! # let homeserver_url = "https://example.com".parse().unwrap();
//! # let client = Client::https(homeserver_url, None);
//! # let next_batch_token = String::new();
//! # async {
//! let mut sync_stream = Box::pin(client.sync(
//!     None,
//!     Some(next_batch_token),
//!     SetPresence::Online,
//!     Some(Duration::from_secs(30)),
//! ));
//! while let Some(response) = sync_stream.try_next().await? {
//!     // Do something with the data in the response...
//! }
//! # Result::<(), ruma_client::Error<_>>::Ok(())
//! # };
//! ```
//!
//! The `Client` type also provides methods for registering a new account if you don't already have
//! one with the given homeserver.
//!
//! Beyond these basic convenience methods, `ruma-client` gives you access to the entire Matrix
//! client-server API via the `api` module. Each leaf module under this tree of modules contains
//! the necessary types for one API endpoint. Simply pass the relevant `Request` type to the
//! `request` method of a `Client` (logged in, if the endpoint requires authentication).
//! `request` will return a future that will resolve to the relevant `Response` type.
//!
//! For example:
//!
//! ```no_run
//! # use ruma_client::Client;
//! # let homeserver_url = "https://example.com".parse().unwrap();
//! # let client = Client::https(homeserver_url, None);
//! use std::convert::TryFrom;
//!
//! use ruma_client::api::r0::alias::get_alias;
//! use ruma_identifiers::{RoomAliasId, RoomId};
//!
//! async {
//!     let response = client
//!         .request(get_alias::Request {
//!             room_alias: RoomAliasId::try_from("#example_room:example.com").unwrap(),
//!         })
//!         .await?;
//!
//!     assert_eq!(response.room_id, RoomId::try_from("!n8f893n9:example.com").unwrap());
//! #   Result::<(), ruma_client::Error<_>>::Ok(())
//! }
//! # ;
//! ```

#![warn(rust_2018_idioms)]
#![deny(
    missing_copy_implementations,
    missing_debug_implementations,
    missing_docs
)]

use std::{
    convert::TryFrom,
    str::FromStr,
    sync::{Arc, Mutex},
    time::Duration,
};

use futures_core::{
    future::Future,
    stream::{Stream, TryStream},
};
use futures_util::stream;
use http::Response as HttpResponse;
use hyper::{client::HttpConnector, Client as HyperClient, Uri};
#[cfg(feature = "hyper-tls")]
use hyper_tls::HttpsConnector;
use ruma_api::Endpoint;
use ruma_identifiers::DeviceId;
use std::collections::BTreeMap;
use url::Url;

pub use ruma_client_api as api;
pub use ruma_events as events;
pub use ruma_identifiers as identifiers;

mod error;
mod session;

pub use self::{error::Error, session::Identification, session::Session};

/// A client for the Matrix client-server API.
///
/// `C` is the connector type of the underlying `hyper` client.
///
/// The client is a handle around reference-counted shared data (homeserver URL, HTTP client and
/// session), so every clone of a client observes the same session.
#[derive(Debug)]
pub struct Client<C>(Arc<ClientData<C>>);

/// Data contained in `Client`'s `Arc`, shared between all clones of a client.
#[derive(Debug)]
struct ClientData<C> {
    /// The URL of the homeserver to connect to.
    homeserver_url: Url,
    /// The underlying HTTP client.
    hyper: HyperClient<C>,
    /// User session data.
    ///
    /// Kept behind a `Mutex` because logging in or registering replaces it through `&self`.
    session: Mutex<Option<Session>>,
}

/// Non-secured variant of the client (using plain HTTP requests).
///
/// Construct it with `Client::new`.
pub type HttpClient = Client<HttpConnector>;

impl HttpClient {
    /// Builds a client that talks to the given homeserver over plain HTTP.
    ///
    /// Pass `Some(session)` to restore an existing session, or `None` to start without one.
    pub fn new(homeserver_url: Url, session: Option<Session>) -> Self {
        let hyper = HyperClient::builder().build_http();
        let session = Mutex::new(session);

        let shared = ClientData {
            homeserver_url,
            hyper,
            session,
        };

        Self(Arc::new(shared))
    }
}

/// Secured variant of the client (using HTTPS requests).
///
/// Only available when the `tls` feature is enabled. Construct it with `Client::https`.
// NOTE(review): the `HttpsConnector` import at the top of this file is gated on the `hyper-tls`
// feature while this item is gated on `tls` — confirm that `tls` enables `hyper-tls`.
#[cfg(feature = "tls")]
pub type HttpsClient = Client<HttpsConnector<HttpConnector>>;

#[cfg(feature = "tls")]
impl HttpsClient {
    /// Builds a client that talks to the given homeserver over HTTPS.
    ///
    /// Pass `Some(session)` to restore an existing session, or `None` to start without one.
    pub fn https(homeserver_url: Url, session: Option<Session>) -> Self {
        let hyper = HyperClient::builder().build(HttpsConnector::new());

        let shared = ClientData {
            homeserver_url,
            hyper,
            session: Mutex::new(session),
        };

        Self(Arc::new(shared))
    }
}

impl<C> Client<C>
where
    C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
{
    /// Wraps a caller-supplied `hyper::Client` in a Matrix client.
    ///
    /// This lets the caller configure the details of the HTTP layer as desired. Pass
    /// `Some(session)` to restore an existing session, or `None` to start without one.
    pub fn custom(
        hyper_client: HyperClient<C>,
        homeserver_url: Url,
        session: Option<Session>,
    ) -> Self {
        let shared = ClientData {
            homeserver_url,
            hyper: hyper_client,
            session: Mutex::new(session),
        };

        Self(Arc::new(shared))
    }

    /// Returns a copy of the currently stored `Session`, if any.
    ///
    /// Handy for serializing the session so that it can be restored later.
    ///
    /// # Panics
    ///
    /// Panics if the session mutex was poisoned.
    pub fn session(&self) -> Option<Session> {
        let guard = self
            .0
            .session
            .lock()
            .expect("session mutex was poisoned");

        (*guard).clone()
    }

    /// Log in with a username and password.
    ///
    /// In contrast to sending a `login::Request` through `Client::request` yourself, this
    /// method also stores the session data returned by the endpoint in this client. The same
    /// session is returned to the caller.
    ///
    /// `user` is sent as a Matrix ID; `device_id` and `initial_device_display_name` are passed
    /// to the endpoint unchanged.
    ///
    /// # Errors
    ///
    /// Returns the error of the underlying request if it fails; the stored session is left
    /// untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if the session mutex was poisoned.
    pub async fn log_in(
        &self,
        user: String,
        password: String,
        device_id: Option<DeviceId>,
        initial_device_display_name: Option<String>,
    ) -> Result<Session, Error<api::Error>> {
        use api::r0::session::login;

        let response = self
            .request(login::Request {
                user: login::UserInfo::MatrixId(user),
                login_info: login::LoginInfo::Password { password },
                device_id,
                initial_device_display_name,
            })
            .await?;

        let session = Session {
            access_token: response.access_token,
            identification: Some(Identification {
                device_id: response.device_id,
                user_id: response.user_id,
            }),
        };

        // Use the same poisoning message as `session()` so a poisoned lock is reported
        // uniformly. The guard is a temporary and is released at the end of the statement.
        *self
            .0
            .session
            .lock()
            .expect("session mutex was poisoned") = Some(session.clone());

        Ok(session)
    }

    /// Register as a guest.
    ///
    /// In contrast to sending a `register::Request` through `Client::request` yourself, this
    /// method also stores the session data returned by the endpoint in this client. The same
    /// session is returned to the caller.
    ///
    /// # Errors
    ///
    /// Returns the error of the underlying request if it fails; the stored session is left
    /// untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if the response lacks an access token or a device id, or if the session mutex
    /// was poisoned.
    pub async fn register_guest(&self) -> Result<Session, Error<api::r0::uiaa::UiaaResponse>> {
        use api::r0::account::register;

        let response = self
            .request(register::Request {
                auth: None,
                device_id: None,
                inhibit_login: false,
                initial_device_display_name: None,
                kind: Some(register::RegistrationKind::Guest),
                password: None,
                username: None,
            })
            .await?;

        // Since we supply `inhibit_login: false` above, the access token and the device id
        // need to be there.
        // TODO: surface this as an error instead of panicking (needs a suitable `Error`
        // variant).
        let access_token = response
            .access_token
            .expect("server omitted access_token although inhibit_login was false");
        let device_id = response
            .device_id
            .expect("server omitted device_id although inhibit_login was false");

        let session = Session {
            access_token,
            identification: Some(Identification {
                device_id,
                user_id: response.user_id,
            }),
        };

        // Use the same poisoning message as `session()` so a poisoned lock is reported
        // uniformly.
        *self
            .0
            .session
            .lock()
            .expect("session mutex was poisoned") = Some(session.clone());

        Ok(session)
    }

    /// Register as a new user on this server.
    ///
    /// In contrast to sending a `register::Request` through `Client::request` yourself, this
    /// method also stores the session data returned by the endpoint in this client. The same
    /// session is returned to the caller.
    ///
    /// The username is the local part of the returned user_id. If it is
    /// omitted from this request, the server will generate one.
    ///
    /// # Errors
    ///
    /// Returns the error of the underlying request if it fails; the stored session is left
    /// untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if the response lacks an access token or a device id, or if the session mutex
    /// was poisoned.
    pub async fn register_user(
        &self,
        username: Option<String>,
        password: String,
    ) -> Result<Session, Error<api::r0::uiaa::UiaaResponse>> {
        use api::r0::account::register;

        let response = self
            .request(register::Request {
                auth: None,
                device_id: None,
                inhibit_login: false,
                initial_device_display_name: None,
                kind: Some(register::RegistrationKind::User),
                password: Some(password),
                username,
            })
            .await?;

        // Since we supply `inhibit_login: false` above, the access token and the device id
        // need to be there.
        // TODO: surface this as an error instead of panicking (needs a suitable `Error`
        // variant).
        let access_token = response
            .access_token
            .expect("server omitted access_token although inhibit_login was false");
        let device_id = response
            .device_id
            .expect("server omitted device_id although inhibit_login was false");

        let session = Session {
            access_token,
            identification: Some(Identification {
                device_id,
                user_id: response.user_id,
            }),
        };

        // Use the same poisoning message as `session()` so a poisoned lock is reported
        // uniformly.
        *self
            .0
            .session
            .lock()
            .expect("session mutex was poisoned") = Some(session.clone());

        Ok(session)
    }

    /// Exposes repeated calls to the sync_events endpoint as a stream of responses.
    ///
    /// The `next_batch` token of each response becomes the `since` value of the following
    /// request. `filter`, `set_presence` and `timeout` are sent with every request, and
    /// `full_state` is always `false`. A failed request is yielded as an `Err` item.
    ///
    /// If `since` is `None`, the first item might take a significant time to arrive and be
    /// deserialized, because it contains all events that have occurred in the whole lifetime of
    /// the logged-in user's account and are visible to them.
    pub fn sync(
        &self,
        filter: Option<api::r0::sync::sync_events::Filter>,
        since: Option<String>,
        set_presence: api::r0::sync::sync_events::SetPresence,
        timeout: Option<Duration>,
    ) -> impl Stream<Item = Result<api::r0::sync::sync_events::Response, Error<api::Error>>>
           + TryStream<Ok = api::r0::sync::sync_events::Response, Error = Error<api::Error>> {
        use api::r0::sync::sync_events;

        let handle = self.clone();
        stream::try_unfold(since, move |batch_token| {
            // Each round needs its own client handle and request, because both are moved into
            // the future returned below.
            let handle = handle.clone();
            let request = sync_events::Request {
                filter: filter.clone(),
                since: batch_token,
                full_state: false,
                set_presence,
                timeout,
            };

            async move {
                let response = handle.request(request).await?;
                let upcoming = Some(response.next_batch.clone());

                Ok(Some((response, upcoming)))
            }
        })
    }

    /// Sends `request` to its Matrix API endpoint without any additional URL parameters.
    ///
    /// Equivalent to calling `request_with_url_params` with `params` set to `None`.
    pub fn request<Request: Endpoint>(
        &self,
        request: Request,
    ) -> impl Future<Output = Result<Request::Response, Error<Request::ResponseError>>> {
        let extra_params = None;

        self.request_with_url_params(request, extra_params)
    }

    /// Makes a request to a Matrix API endpoint including additional URL parameters.
    ///
    /// The path and query of the converted request replace those of the homeserver URL, then
    /// every entry of `params` is appended as a query pair. If the endpoint's metadata says it
    /// requires authentication, the stored session's access token is appended as the
    /// `access_token` query parameter.
    ///
    /// # Errors
    ///
    /// Resolves to `Error::AuthenticationRequired` if the endpoint requires authentication and
    /// no session is stored. Failures while converting the request, building the URI, sending
    /// the request, reading the response body or converting the response are propagated.
    ///
    /// # Panics
    ///
    /// Panics if the session mutex was poisoned.
    pub fn request_with_url_params<Request: Endpoint>(
        &self,
        request: Request,
        params: Option<BTreeMap<String, String>>,
    ) -> impl Future<Output = Result<Request::Response, Error<Request::ResponseError>>> {
        let client = self.0.clone();

        let mut url = client.homeserver_url.clone();

        async move {
            let mut hyper_request = request.try_into()?.map(hyper::Body::from);

            // Scoped so that the borrow of `hyper_request` and the session lock are both
            // released before the URI is written back and before anything is awaited.
            {
                let uri = hyper_request.uri();

                url.set_path(uri.path());
                url.set_query(uri.query());

                if let Some(params) = params {
                    for (key, value) in params {
                        url.query_pairs_mut().append_pair(&key, &value);
                    }
                }

                if Request::METADATA.requires_authentication {
                    // NOTE(review): the token travels in the query string and may therefore end
                    // up in server or proxy logs; consider an `Authorization` header instead.
                    // Same poisoning message as `session()` for uniform reporting.
                    if let Some(ref session) =
                        *client.session.lock().expect("session mutex was poisoned")
                    {
                        url.query_pairs_mut()
                            .append_pair("access_token", &session.access_token);
                    } else {
                        return Err(Error::AuthenticationRequired);
                    }
                }
            }

            *hyper_request.uri_mut() = Uri::from_str(url.as_ref())?;

            let hyper_response = client.hyper.request(hyper_request).await?;
            let (head, body) = hyper_response.into_parts();

            // FIXME: We read the response into a contiguous buffer here (not actually required for
            // deserialization) and then copy the whole thing to convert from Bytes to Vec<u8>.
            let full_body = hyper::body::to_bytes(body).await?;
            let full_response = HttpResponse::from_parts(head, full_body.as_ref().to_owned());

            Ok(Request::Response::try_from(full_response)?)
        }
    }
}

impl<C> Clone for Client<C> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}