use std::num::NonZeroU32;
use std::ops::ControlFlow;
use std::sync::Arc;
use std::time::Duration;
use grammers_mtsender::{InvocationError, SenderPoolFatHandle};
use grammers_tl_types::{self as tl, Deserializable};
use log::info;
use tokio::{sync::Mutex, time::sleep};
use super::{Client, ClientConfiguration, ClientInner, RetryContext};
impl Client {
pub fn new(sender_pool: SenderPoolFatHandle) -> Self {
Self::with_configuration(sender_pool, Default::default())
}
pub fn with_configuration(
sender_pool: SenderPoolFatHandle,
configuration: ClientConfiguration,
) -> Self {
Self(Arc::new(ClientInner {
session: sender_pool.session,
api_id: sender_pool.api_id,
handle: sender_pool.thin,
configuration,
auth_copied_to_dcs: Mutex::new(Vec::new()),
}))
}
pub async fn invoke<R: tl::RemoteCall>(
&self,
request: &R,
) -> Result<R::Return, InvocationError> {
let dc_id = self.0.session.home_dc_id();
self.do_invoke_in_dc(dc_id, request.to_bytes())
.await
.and_then(|body| R::Return::from_bytes(&body).map_err(|e| e.into()))
}
pub async fn invoke_in_dc<R: tl::RemoteCall>(
&self,
dc_id: i32,
request: &R,
) -> Result<R::Return, InvocationError> {
self.do_invoke_in_dc(dc_id, request.to_bytes())
.await
.and_then(|body| R::Return::from_bytes(&body).map_err(|e| e.into()))
}
async fn do_invoke_in_dc(
&self,
dc_id: i32,
request_body: Vec<u8>,
) -> Result<Vec<u8>, InvocationError> {
let mut retry_context = RetryContext {
fail_count: NonZeroU32::new(1).unwrap(),
slept_so_far: Duration::default(),
error: InvocationError::Dropped,
};
loop {
match self
.0
.handle
.invoke_in_dc(dc_id, request_body.clone())
.await
{
Ok(response) => break Ok(response),
Err(e) => {
retry_context.error = e;
match self
.0
.configuration
.retry_policy
.should_retry(&retry_context)
{
ControlFlow::Continue(delay) => {
info!(
"sleeping on {} for {:?} before retrying",
retry_context.error, delay,
);
sleep(delay).await;
retry_context.fail_count = retry_context.fail_count.saturating_add(1);
retry_context.slept_so_far += delay;
continue;
}
ControlFlow::Break(()) => break Err(retry_context.error),
}
}
}
}
}
pub(crate) async fn copy_auth_to_dc(&self, target_dc_id: i32) -> Result<(), InvocationError> {
let mut auth_copied_to_dcs = self.0.auth_copied_to_dcs.lock().await;
if auth_copied_to_dcs.contains(&target_dc_id) {
return Ok(());
}
let home_dc_id = self.0.session.home_dc_id();
if target_dc_id == home_dc_id {
return Ok(());
}
let tl::enums::auth::ExportedAuthorization::Authorization(exported_auth) = self
.invoke(&tl::functions::auth::ExportAuthorization {
dc_id: target_dc_id,
})
.await?;
self.invoke_in_dc(
target_dc_id,
&tl::functions::auth::ImportAuthorization {
id: exported_auth.id,
bytes: exported_auth.bytes,
},
)
.await?;
auth_copied_to_dcs.push(target_dc_id);
Ok(())
}
}