pub mod api;
pub mod error;
/// Re-exports of third-party crates (currently only `reqwest`).
pub mod exports {
pub use reqwest;
}
use crate::api::{
auth::{BeginAuthRequest, NextStepResponse, StepBackResponse},
Endpoint,
};
use api::{auth::*, chat::EventSource, Hmc, HmcFromStrError};
use error::*;
use tracing::Span;
use std::{
convert::TryFrom,
fmt::{self, Debug, Formatter},
future::{self, Future},
sync::Arc,
};
use hrpc::{
client::layer::backoff::Backoff,
common::layer::trace::Trace,
encode::encode_protobuf_message,
exports::{
bytes::{Bytes, BytesMut},
futures_util::{future::Either, TryFutureExt},
tower::Service,
},
proto::Error as HrpcError,
request::BoxRequest,
response::BoxResponse,
Response,
};
use http::Uri;
use reqwest::Client as HttpClient;
use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
/// Signature of the callback that builds the tracing [`Span`] for a request.
type SpanFnPtr = fn(&BoxRequest) -> Span;
/// Signature of the callback that receives a request together with its span.
type OnRequestFnPtr = fn(&BoxRequest, &Span);
/// Signature of the callback that receives a response together with its span.
type OnSuccessFnPtr = fn(&BoxResponse, &Span);
/// Signature of the callback that receives a response, its span and the hRPC error.
type OnErrorFnPtr = fn(&BoxResponse, &Span, &HrpcError);
/// Tracing layer wrapped around [`AddAuth`], with every callback stored as a plain
/// function pointer so the type can be named.
type TraceClient<Transport> =
Trace<AddAuth<Transport>, SpanFnPtr, OnRequestFnPtr, OnSuccessFnPtr, OnErrorFnPtr>;
/// Service stack when the `client_backoff` feature is disabled: the tracing layer only.
#[cfg(not(feature = "client_backoff"))]
type BaseClient<Transport> = TraceClient<Transport>;
/// Service stack when the `client_backoff` feature is enabled: the tracing layer
/// wrapped in [`Backoff`].
#[cfg(feature = "client_backoff")]
type BaseClient<Transport> = Backoff<TraceClient<Transport>>;
/// Authentication state shared between a [`Client`] and its [`AddAuth`] layer: the
/// current [`AuthStatus`] plus the session token bytes sent as the `Authorization` header.
type SharedAuthStatus = Arc<RwLock<(AuthStatus, Bytes)>>;
/// Wraps `transport` in the layers shared by every client.
///
/// Layers are applied inside-out: [`AddAuth`] (sharing `auth_status`), then the tracing
/// layer, and finally [`Backoff`] when the `client_backoff` feature is enabled.
fn add_base_layers<Err, Transport>(
    transport: Transport,
    auth_status: SharedAuthStatus,
) -> BaseClient<Transport>
where
    Transport: Service<
        BoxRequest,
        Response = BoxResponse,
        Error = hrpc::client::transport::TransportError<Err>,
    >,
    Err: std::error::Error + 'static,
{
    let authenticated = AddAuth {
        inner: transport,
        auth_status,
    };
    // The closures coerce to the function pointer types named by `TraceClient`.
    let stack = TraceClient::new(
        authenticated,
        |req| tracing::debug_span!("request", endpoint = %req.endpoint(), headers = ?req.header_map()),
        |_, _| tracing::debug!("processing request"),
        |_, _| tracing::debug!("request successful"),
        |_, _, err| tracing::error!("request failed: {}", err),
    );
    #[cfg(feature = "client_backoff")]
    let stack = Backoff::new(stack)
        .clone_extensions_fn(hrpc::client::transport::http::clone_http_extensions);
    stack
}
/// Transport selection for the `client_web` feature (hRPC's `Wasm` HTTP transport).
#[cfg(feature = "client_web")]
mod transport {
    use super::*;

    /// Raw transport used on the web.
    pub(super) type GenericClientTransport = hrpc::client::transport::http::Wasm;
    /// Raw transport wrapped in the base layers.
    pub(super) type GenericClient = BaseClient<GenericClientTransport>;

    /// Builds the layered client service that talks to `homeserver_url`.
    ///
    /// Fails with an internal transport error if the transport cannot be created.
    pub(super) fn create_client(
        homeserver_url: Uri,
        auth_status: SharedAuthStatus,
    ) -> ClientResult<GenericClient> {
        match GenericClientTransport::new(homeserver_url) {
            Ok(raw) => Ok(add_base_layers(raw, auth_status)),
            Err(err) => Err(ClientError::Internal(InternalClientError::Transport(err))),
        }
    }
}
/// Transport selection for the `client_native` feature when `client_web` is not enabled
/// (hRPC's `Hyper` HTTP transport).
#[cfg(all(feature = "client_native", not(feature = "client_web")))]
mod transport {
    use super::*;
    use hrpc::client::{layer::backoff::Backoff, transport::http};

    /// Raw transport used natively.
    pub(super) type GenericClientTransport = http::Hyper;
    /// Raw transport wrapped in the base layers.
    pub(super) type GenericClient = BaseClient<GenericClientTransport>;

    /// Builds the layered client service that talks to `homeserver_url`.
    ///
    /// Fails with an internal transport error if the transport cannot be created.
    pub(super) fn create_client(
        homeserver_url: Uri,
        auth_status: SharedAuthStatus,
    ) -> ClientResult<GenericClient> {
        match GenericClientTransport::new(homeserver_url) {
            Ok(raw) => Ok(add_base_layers(raw, auth_status)),
            Err(err) => Err(ClientError::Internal(InternalClientError::Transport(err))),
        }
    }
}
use transport::*;
/// Auth service client bound to [`GenericClient`].
type AuthService = crate::api::auth::auth_service_client::AuthServiceClient<GenericClient>;
/// Chat service client bound to [`GenericClient`].
type ChatService = crate::api::chat::chat_service_client::ChatServiceClient<GenericClient>;
/// Media proxy service client bound to [`GenericClient`].
type MediaProxyService =
crate::api::mediaproxy::media_proxy_service_client::MediaProxyServiceClient<GenericClient>;
/// Profile service client bound to [`GenericClient`].
type ProfileService =
crate::api::profile::profile_service_client::ProfileServiceClient<GenericClient>;
/// Emote service client bound to [`GenericClient`].
type EmoteService = crate::api::emote::emote_service_client::EmoteServiceClient<GenericClient>;
/// Batch service client bound to [`GenericClient`].
type BatchService = crate::api::batch::batch_service_client::BatchServiceClient<GenericClient>;
/// Authentication state of a [`Client`].
#[derive(Debug, Clone)]
pub enum AuthStatus {
    /// No authentication has been started.
    None,
    /// Authentication is in progress; holds the auth id of the ongoing flow.
    InProgress(String),
    /// Authentication finished; holds the resulting session.
    Complete(Session),
}

impl AuthStatus {
    /// Returns the session when authentication is complete, `None` otherwise.
    pub fn session(&self) -> Option<&Session> {
        if let AuthStatus::Complete(session) = self {
            Some(session)
        } else {
            None
        }
    }

    /// Returns `true` only when authentication is complete.
    pub fn is_authenticated(&self) -> bool {
        self.session().is_some()
    }
}
/// State shared by every clone of a [`Client`].
struct ClientData {
    /// Normalized homeserver URL the client talks to.
    homeserver_url: Uri,
    /// Authentication state, also shared with the [`AddAuth`] layer.
    auth_status: SharedAuthStatus,
    // One service client per API, each behind its own mutex.
    chat: Mutex<ChatService>,
    auth: Mutex<AuthService>,
    mediaproxy: Mutex<MediaProxyService>,
    profile: Mutex<ProfileService>,
    emote: Mutex<EmoteService>,
    batch: Mutex<BatchService>,
    /// Plain HTTP client built alongside the hRPC transport.
    http: HttpClient,
}

impl Debug for ClientData {
    /// Prints only the URL, HTTP client and auth status; the service clients are omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self {
            homeserver_url,
            auth_status,
            http,
            ..
        } = self;
        let mut out = f.debug_struct("ClientData");
        out.field("homeserver_url", homeserver_url);
        out.field("http", http);
        out.field("auth_status", auth_status);
        out.finish()
    }
}
/// Handle to a homeserver connection.
///
/// Cloning is cheap: all clones share the same underlying data through an [`Arc`].
#[derive(Clone, Debug)]
pub struct Client {
data: Arc<ClientData>,
}
impl Client {
pub async fn new(mut homeserver_url: Uri, session: Option<Session>) -> ClientResult<Self> {
if !matches!(homeserver_url.scheme_str(), Some("http" | "https")) {
homeserver_url = {
let mut parts = homeserver_url.into_parts();
parts.scheme = Some("https".parse().unwrap());
Uri::from_parts(parts).unwrap()
};
}
let http = HttpClient::builder().build()?;
if homeserver_url.port().is_none() {
use serde::Deserialize;
#[derive(Deserialize)]
struct Server {
#[serde(rename(deserialize = "h.server"))]
server: String,
}
let url = {
let mut parts = homeserver_url.clone().into_parts();
parts.path_and_query = Some("/_harmony/server".parse().unwrap());
Uri::from_parts(parts).unwrap()
};
if let Ok(response) = http
.get(&url.to_string())
.send()
.await?
.json::<Server>()
.await
{
let host: Uri = response.server.parse().unwrap();
homeserver_url = host;
}
};
if homeserver_url.port().is_none() {
homeserver_url = {
let mut parts = homeserver_url.into_parts();
parts.authority = Some(
format!("{}:2289", parts.authority.unwrap().as_str())
.parse()
.unwrap(),
);
Uri::from_parts(parts).unwrap()
}
}
#[cfg(debug_assertions)]
tracing::debug!(
"Using homeserver URL {} with session {:?} to create a `Client`",
homeserver_url,
session
);
let session = session.map_or(AuthStatus::None, AuthStatus::Complete);
let token_bytes = session.session().map_or_else(Bytes::new, |s| {
Bytes::copy_from_slice(s.session_token.as_bytes())
});
let auth_status = Arc::new(RwLock::new((session, token_bytes)));
let transport = create_client(homeserver_url.clone(), auth_status.clone())?;
let inner = hrpc::client::Client::new(transport);
let auth = AuthService::new_inner(inner.clone());
let chat = ChatService::new_inner(inner.clone());
let mediaproxy = MediaProxyService::new_inner(inner.clone());
let profile = ProfileService::new_inner(inner.clone());
let emote = EmoteService::new_inner(inner.clone());
let batch = BatchService::new_inner(inner);
let data = ClientData {
homeserver_url,
auth_status,
chat: Mutex::new(chat),
auth: Mutex::new(auth),
mediaproxy: Mutex::new(mediaproxy),
profile: Mutex::new(profile),
emote: Mutex::new(emote),
batch: Mutex::new(batch),
http,
};
Ok(Self {
data: Arc::new(data),
})
}
pub async fn event_loop<'a, Fut, Hndlr>(
&'a self,
subs: Vec<EventSource>,
mut handler: Hndlr,
) -> Result<(), ClientError>
where
Fut: Future<Output = ClientResult<bool>>,
Hndlr: FnMut(&'a Client, crate::api::chat::Event) -> Fut + Send + 'a,
{
let mut sock = self.subscribe_events(subs).await?;
loop {
match sock.get_event().await {
Ok(Some(ev)) => {
let fut = handler(self, ev);
if fut.await? {
return Ok(());
}
}
Err(err) => tracing::error!("{}", err),
_ => std::hint::spin_loop(),
}
}
}
/// Locks and returns the chat service client.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
#[inline(always)]
pub fn chat(&self) -> MutexGuard<'_, ChatService> {
    let locked = self.data.chat.lock();
    locked.expect("poisoned")
}
/// Locks and returns the auth service client.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
#[inline(always)]
pub fn auth(&self) -> MutexGuard<'_, AuthService> {
    let locked = self.data.auth.lock();
    locked.expect("poisoned")
}
/// Locks and returns the media proxy service client.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
#[inline(always)]
pub fn mediaproxy(&self) -> MutexGuard<'_, MediaProxyService> {
    let locked = self.data.mediaproxy.lock();
    locked.expect("poisoned")
}
/// Locks and returns the profile service client.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
#[inline(always)]
pub fn profile(&self) -> MutexGuard<'_, ProfileService> {
    let locked = self.data.profile.lock();
    locked.expect("poisoned")
}
/// Locks and returns the emote service client.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
#[inline(always)]
pub fn emote(&self) -> MutexGuard<'_, EmoteService> {
    let locked = self.data.emote.lock();
    locked.expect("poisoned")
}
/// Locks and returns the batch service client.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
#[inline(always)]
pub fn batch(&self) -> MutexGuard<'_, BatchService> {
    let locked = self.data.batch.lock();
    locked.expect("poisoned")
}
/// Sends `request` to its endpoint and resolves to the decoded response message.
///
/// The request is started while `self` is borrowed; the returned future is `'static`.
pub fn call<Req>(
    &self,
    request: Req,
) -> impl Future<Output = ClientResult<Req::Response>> + Send + 'static
where
    Req: Endpoint,
    Req::Response: prost::Message + Default + 'static,
{
    let pending = request.call_with(self);
    async move {
        let response = pending.await?;
        let message = response.into_message().await?;
        Ok(message)
    }
}
/// Sends `request` to its endpoint and resolves to the raw [`Response`], leaving the
/// message undecoded.
pub fn call_response<Req>(
    &self,
    request: Req,
) -> impl Future<Output = ClientResult<Response<Req::Response>>> + Send + 'static
where
    Req: Endpoint,
    Req::Response: 'static,
{
    let pending = request.call_with(self);
    pending
}
pub fn batch_call<Req>(
&self,
requests: Vec<Req>,
) -> impl Future<Output = ClientResult<Vec<<Req as Endpoint>::Response>>> + Send + 'static
where
Req: Endpoint + prost::Message,
<Req as Endpoint>::Response: prost::Message + Default + 'static,
{
use prost::Message;
let encoded = requests
.iter()
.map(encode_protobuf_message)
.map(BytesMut::freeze);
let batch_req = crate::api::batch::BatchSameRequest {
endpoint: Req::ENDPOINT_PATH.to_string(),
requests: encoded.collect(),
};
let fut = self.batch().batch_same(batch_req);
async move {
let responses = fut.await?.into_message().await?.responses;
let mut decoded = Vec::with_capacity(responses.len());
for response in responses {
let decoded_msg = <Req as Endpoint>::Response::decode(response.as_ref())?;
decoded.push(decoded_msg);
}
Ok(decoded)
}
}
/// Acquires a read lock on the shared authentication state.
///
/// # Panics
///
/// Panics if the lock is poisoned.
#[inline(always)]
fn auth_status_lock(&self) -> RwLockReadGuard<'_, (AuthStatus, Bytes)> {
    // Same poison handling as every other lock in this file.
    self.data.auth_status.read().expect("poisoned")
}
/// Returns a snapshot (clone) of the current authentication status.
pub fn auth_status(&self) -> AuthStatus {
    let guard = self.auth_status_lock();
    guard.0.clone()
}
pub fn homeserver_url(&self) -> &Uri {
&self.data.homeserver_url
}
/// Builds an [`Hmc`] for `id` located on this client's homeserver (`host:port`).
///
/// # Panics
///
/// Panics if the stored homeserver URL has no host or no port.
pub fn make_hmc(&self, id: impl fmt::Display) -> Result<Hmc, HmcFromStrError> {
    let homeserver = &self.data.homeserver_url;
    let host = homeserver.host().unwrap();
    let port = homeserver.port().unwrap();
    let server = format!("{}:{}", host, port);
    Hmc::new(server, id)
}
pub fn begin_auth(&self) -> impl Future<Output = ClientResult<()>> + Send + 'static {
let fut = self.auth().begin_auth(BeginAuthRequest {});
let auth_status_lock = self.data.auth_status.clone();
async move {
let resp = fut.await?.into_message().await?;
auth_status_lock.write().expect("poisoned").0 = AuthStatus::InProgress(resp.auth_id);
Ok(())
}
}
/// Answers the current authentication step with `response`.
///
/// Resolves to `Ok(None)` when the server replies with a session: the shared status is
/// then switched to [`AuthStatus::Complete`] and the token bytes are replaced.
/// Otherwise the server's reply is returned as `Ok(Some(_))`.
///
/// # Errors
///
/// Resolves to [`ClientError::NoAuthId`] when no authentication is in progress, or to
/// the request's error.
pub fn next_auth_step(
    &self,
    response: AuthStepResponse,
) -> impl Future<Output = ClientResult<Option<NextStepResponse>>> + Send + 'static {
    match self.auth_status() {
        AuthStatus::InProgress(auth_id) => {
            let shared_status = self.data.auth_status.clone();
            let request = NextStepRequest::new(auth_id, response.into());
            let pending = self.auth().next_step(request);
            Either::Left(async move {
                let reply = pending.await?.into_message().await?;
                if let Some(AuthStep {
                    step: Some(auth_step::Step::Session(session)),
                    ..
                }) = reply.step
                {
                    let token = Bytes::copy_from_slice(session.session_token.as_bytes());
                    *shared_status.write().expect("poisoned") =
                        (AuthStatus::Complete(session), token);
                    Ok(None)
                } else {
                    Ok(Some(reply))
                }
            })
        }
        AuthStatus::None | AuthStatus::Complete(_) => {
            Either::Right(future::ready(Err(ClientError::NoAuthId)))
        }
    }
}
/// Steps back to the previous authentication step.
///
/// # Errors
///
/// Resolves to [`ClientError::NoAuthId`] when no authentication is in progress, or to
/// the request's error.
pub fn prev_auth_step(
    &self,
) -> impl Future<Output = ClientResult<StepBackResponse>> + Send + 'static {
    match self.auth_status() {
        AuthStatus::InProgress(auth_id) => {
            let pending = self.auth().step_back(StepBackRequest::new(auth_id));
            Either::Left(async move {
                let reply = pending.await?.into_message().await?;
                Ok(reply)
            })
        }
        AuthStatus::None | AuthStatus::Complete(_) => {
            Either::Right(future::ready(Err(ClientError::NoAuthId)))
        }
    }
}
/// Opens a stream of authentication steps for the flow in progress.
///
/// # Errors
///
/// Resolves to [`ClientError::NoAuthId`] when no authentication is in progress, or to
/// the error from opening the stream.
pub fn auth_stream(&self) -> impl Future<Output = ClientResult<AuthSocket>> + Send + 'static {
    match self.auth_status() {
        AuthStatus::InProgress(auth_id) => {
            let pending = self.auth().stream_steps(AuthId::new(auth_id));
            let wrapped = pending
                .map_ok(|inner| AuthSocket { inner })
                .map_err(ClientError::from);
            Either::Left(wrapped)
        }
        AuthStatus::None | AuthStatus::Complete(_) => {
            Either::Right(future::ready(Err(ClientError::NoAuthId)))
        }
    }
}
/// Opens an event stream and subscribes it to every source in `subscriptions`.
///
/// Sources are added one at a time, in order; the first failure aborts and is returned.
pub fn subscribe_events(
    &self,
    subscriptions: Vec<EventSource>,
) -> impl Future<Output = ClientResult<EventsSocket>> + Send + 'static {
    let pending = self.chat().stream_events(());
    async move {
        let (write_half, read_half) = pending.await?.split();
        let mut socket = EventsSocket {
            read: EventsReadSocket { inner: read_half },
            write: EventsWriteSocket { inner: write_half },
        };
        for source in subscriptions {
            socket.add_source(source).await?;
        }
        Ok(socket)
    }
}
}
#[derive(Debug, Clone)]
pub struct AddAuth<S> {
inner: S,
auth_status: SharedAuthStatus,
}
impl<S> Service<BoxRequest> for AddAuth<S>
where
S: Service<BoxRequest>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
Service::poll_ready(&mut self.inner, cx)
}
fn call(&mut self, mut req: BoxRequest) -> Self::Future {
let guard = self.auth_status.read().expect("poisoned");
if guard.0.is_authenticated() {
req.get_or_insert_header_map().insert(
http::header::AUTHORIZATION,
unsafe { http::HeaderValue::from_maybe_shared_unchecked(guard.1.clone()) },
);
}
Service::call(&mut self.inner, req)
}
}
pub struct EventsSocket {
read: EventsReadSocket,
write: EventsWriteSocket,
}
impl EventsSocket {
#[inline]
pub async fn get_event(&mut self) -> ClientResult<Option<crate::api::chat::Event>> {
self.read.get_event().await
}
#[inline]
pub async fn add_source(&mut self, source: EventSource) -> ClientResult<()> {
self.write.add_source(source).await
}
#[inline]
pub async fn close(self) -> ClientResult<()> {
self.write.close().await
}
pub fn split(self) -> (EventsWriteSocket, EventsReadSocket) {
(self.write, self.read)
}
}
impl Debug for EventsSocket {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str("EventsSocket")
}
}
/// Write half of an event stream socket.
pub struct EventsWriteSocket {
    inner: hrpc::client::socket::WriteSocket<crate::api::chat::StreamEventsRequest>,
}

impl EventsWriteSocket {
    /// Sends `source`, converted into a stream request, over the socket.
    pub async fn add_source(&mut self, source: EventSource) -> ClientResult<()> {
        match self.inner.send_message(source.into()).await {
            Ok(()) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }

    /// Closes the socket.
    pub async fn close(self) -> ClientResult<()> {
        match self.inner.close().await {
            Ok(()) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }
}

impl Debug for EventsWriteSocket {
    /// Prints only the type name.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "EventsWriteSocket")
    }
}
/// Read half of an event stream socket.
pub struct EventsReadSocket {
    inner: hrpc::client::socket::ReadSocket<crate::api::chat::StreamEventsResponse>,
}

impl EventsReadSocket {
    /// Receives the next message and converts its event, if any.
    ///
    /// Resolves to `Ok(None)` when the message carries no event or when the event
    /// cannot be converted into [`crate::api::chat::Event`].
    pub async fn get_event(&mut self) -> ClientResult<Option<crate::api::chat::Event>> {
        let message = self.inner.receive_message().await?;
        let event = match message.event {
            Some(raw) => crate::api::chat::Event::try_from(raw).ok(),
            None => None,
        };
        Ok(event)
    }
}

impl Debug for EventsReadSocket {
    /// Prints only the type name.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "EventsReadSocket")
    }
}
/// Socket that streams authentication steps.
pub struct AuthSocket {
    inner: hrpc::client::socket::Socket<
        crate::api::auth::StreamStepsRequest,
        crate::api::auth::StreamStepsResponse,
    >,
}

impl AuthSocket {
    /// Receives the next message and returns the step it carries, if any.
    pub async fn get_step(&mut self) -> ClientResult<Option<crate::api::auth::AuthStep>> {
        match self.inner.receive_message().await {
            Ok(message) => Ok(message.step),
            Err(err) => Err(err.into()),
        }
    }

    /// Closes the socket.
    pub async fn close(self) -> ClientResult<()> {
        match self.inner.close().await {
            Ok(()) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }
}

impl Debug for AuthSocket {
    /// Prints only the type name.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "AuthSocket")
    }
}