use std::collections::VecDeque;
use std::time::{Duration, Instant};
use grammers_mtsender::InvocationError;
use grammers_session::Session;
use grammers_session::types::{PeerId, PeerInfo, UpdateState, UpdatesState};
use grammers_session::updates::{MessageBoxes, PrematureEndReason, State, UpdatesLike};
use grammers_tl_types as tl;
use log::{trace, warn};
use tokio::sync::mpsc;
use tokio::time::timeout_at;
use super::{Client, UpdatesConfiguration};
use crate::peer::PeerMap;
use crate::update::Update;
/// Cooldown (5 minutes) applied to the warning that is logged when updates
/// are dropped because the configured update queue limit was exceeded.
const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(300);
/// `limit` used for `GetChannelDifference` requests when the logged-in
/// account is known to be a bot.
const BOT_CHANNEL_DIFF_LIMIT: i32 = 100000;
/// `limit` used for `GetChannelDifference` requests in every other case
/// (regular users, or when the session does not know whether we are a bot).
const USER_CHANNEL_DIFF_LIMIT: i32 = 100;
/// Completes a `GetChannelDifference` request so that it can be sent.
///
/// The channel referenced by `request` is looked up in `session` to obtain
/// its access hash, and `request.limit` is set to [`BOT_CHANNEL_DIFF_LIMIT`]
/// or [`USER_CHANNEL_DIFF_LIMIT`] depending on whether the session records
/// the logged-in account as a bot.
///
/// Returns `None` when the session has no channel entry with authorization
/// data for that channel. In that case the pending channel difference is
/// ended on `message_box` with [`PrematureEndReason::Banned`].
///
/// # Panics
///
/// Panics if `request.channel` is not the `InputChannel::Channel` variant.
async fn prepare_channel_difference(
    mut request: tl::functions::updates::GetChannelDifference,
    session: &dyn Session,
    message_box: &mut MessageBoxes,
) -> Option<tl::functions::updates::GetChannelDifference> {
    let peer_id = match &request.channel {
        tl::enums::InputChannel::Channel(input) => PeerId::channel_unchecked(input.channel_id),
        _ => unreachable!(),
    };

    // Guard clause: without a stored access hash the request cannot be built.
    let (channel_id, access_hash) = match session.peer(peer_id).await {
        Some(PeerInfo::Channel {
            id,
            auth: Some(auth),
            ..
        }) => (id, auth.hash()),
        _ => {
            warn!(
                "cannot getChannelDifference for {:?} as we're missing its hash",
                peer_id
            );
            message_box.end_channel_difference(PrematureEndReason::Banned);
            return None;
        }
    };

    request.channel = tl::enums::InputChannel::Channel(tl::types::InputChannel {
        channel_id,
        access_hash,
    });

    // Only an explicit `bot: Some(true)` on our own user entry counts as a bot.
    let is_bot = matches!(
        session.peer(PeerId::self_user()).await,
        Some(PeerInfo::User {
            bot: Some(true),
            ..
        })
    );
    request.limit = if is_bot {
        BOT_CHANNEL_DIFF_LIMIT
    } else {
        USER_CHANNEL_DIFF_LIMIT
    };

    trace!("requesting {:?}", request);
    Some(request)
}
/// Stream of updates for a [`Client`], created by `Client::stream_updates`.
///
/// Updates are pulled with `next` or `next_raw`.
pub struct UpdateStream {
// Client used to invoke requests and to build peer maps.
client: Client,
// Update-state tracker; decides when a difference must be fetched and
// turns incoming data into individual updates.
message_box: MessageBoxes,
// Timestamp used to rate-limit the "updates were dropped" warning;
// `None` until the queue limit is exceeded for the first time.
last_update_limit_warn: Option<Instant>,
// Updates that are ready to be handed out, in arrival order.
buffer: VecDeque<(tl::enums::Update, State, PeerMap)>,
// Channel on which incoming updates are received.
updates: mpsc::UnboundedReceiver<UpdatesLike>,
// User-provided settings (e.g. `update_queue_limit`).
configuration: UpdatesConfiguration,
// When set, the first call to `next_raw` invokes `updates.getState` once
// and stores the result in the session.
should_get_state: bool,
}
impl UpdateStream {
/// Waits for the next update and returns it wrapped in the high-level
/// [`Update`] type.
///
/// This is `next_raw` followed by `Update::from_raw`; any error produced by
/// `next_raw` is returned unchanged.
pub async fn next(&mut self) -> Result<Update, InvocationError> {
    let raw = self.next_raw().await;
    raw.map(|(update, state, peers)| Update::from_raw(&self.client, update, state, peers))
}
pub async fn next_raw(
&mut self,
) -> Result<(tl::enums::Update, State, PeerMap), InvocationError> {
if self.should_get_state {
self.should_get_state = false;
match self
.client
.invoke(&tl::functions::updates::GetState {})
.await
{
Ok(tl::enums::updates::State::State(state)) => {
self.client
.0
.session
.set_update_state(UpdateState::All(UpdatesState {
pts: state.pts,
qts: state.qts,
date: state.date,
seq: state.seq,
channels: Vec::new(),
}))
.await;
}
Err(_err) => {
}
}
}
loop {
let (deadline, get_diff, get_channel_diff) = {
if let Some(update) = self.buffer.pop_front() {
return Ok(update);
}
(
self.message_box.check_deadlines(), self.message_box.get_difference(),
match self.message_box.get_channel_difference() {
Some(gd) => {
prepare_channel_difference(
gd,
self.client.0.session.as_ref(),
&mut self.message_box,
)
.await
}
None => None,
},
)
};
if let Some(request) = get_diff {
let response = self.client.invoke(&request).await?;
let (updates, users, chats) = self.message_box.apply_difference(response);
let peers = self.client.build_peer_map(users, chats).await;
self.extend_update_queue(updates, peers);
continue;
}
if let Some(request) = get_channel_diff {
let maybe_response = self.client.invoke(&request).await;
let response = match maybe_response {
Ok(r) => r,
Err(e) if e.is("PERSISTENT_TIMESTAMP_OUTDATED") => {
log::warn!(
"Getting difference for channel updates caused PersistentTimestampOutdated; ending getting difference prematurely until server issues are resolved"
);
{
self.message_box
.end_channel_difference(PrematureEndReason::TemporaryServerIssues);
}
continue;
}
Err(e) if e.is("CHANNEL_PRIVATE") => {
log::info!(
"Account is now banned so we can no longer fetch updates with request: {:?}",
request
);
{
self.message_box
.end_channel_difference(PrematureEndReason::Banned);
}
continue;
}
Err(InvocationError::Rpc(rpc_error)) if rpc_error.code == 500 => {
log::warn!("Telegram is having internal issues: {:#?}", rpc_error);
{
self.message_box
.end_channel_difference(PrematureEndReason::TemporaryServerIssues);
}
continue;
}
Err(e) => return Err(e),
};
let (updates, users, chats) = self.message_box.apply_channel_difference(response);
let peers = self.client.build_peer_map(users, chats).await;
self.extend_update_queue(updates, peers);
continue;
}
match timeout_at(deadline.into(), self.updates.recv()).await {
Ok(Some(updates)) => self.process_socket_updates(updates).await,
Ok(None) => break Err(InvocationError::Dropped),
Err(_) => {}
}
}
}
/// Feeds a batch of incoming updates through the message box and queues
/// whatever it yields.
///
/// If the message box returns an error for the batch, nothing is queued and
/// the method returns silently (presumably the message box then arranges
/// for a difference to be fetched; confirm against `MessageBoxes`).
pub(crate) async fn process_socket_updates(&mut self, updates: UpdatesLike) {
    let (updates, users, chats) = match self.message_box.process_updates(updates) {
        Ok(batch) => batch,
        Err(_) => return,
    };
    let peers = self.client.build_peer_map(users, chats).await;
    self.extend_update_queue(updates, peers);
}
/// Appends `updates` to the internal buffer, pairing each of them with a
/// handle to `peer_map`.
///
/// When `configuration.update_queue_limit` is set and the buffer would grow
/// beyond it, the newest updates that do not fit are dropped. A warning
/// with the number of dropped updates is logged at most once per
/// [`UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN`].
fn extend_update_queue(
    &mut self,
    mut updates: Vec<(tl::enums::Update, State)>,
    peer_map: PeerMap,
) {
    if let Some(limit) = self.configuration.update_queue_limit {
        let buffered = self.buffer.len();
        // Saturating arithmetic: a limit of `usize::MAX` must not overflow.
        if buffered.saturating_add(updates.len()) > limit {
            // Free slots left in the buffer; zero if it is already full.
            let kept = limit.saturating_sub(buffered).min(updates.len());
            let dropped = updates.len() - kept;
            updates.truncate(kept);

            let now = Instant::now();
            let notify = self.last_update_limit_warn.map_or(true, |last| {
                now.duration_since(last) > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN
            });
            if notify {
                log::warn!(
                    "{} updates were dropped because the update_queue_limit was exceeded",
                    dropped
                );
                // Record the time only when a warning is emitted. Otherwise
                // a sustained overflow keeps pushing the timestamp forward
                // and the warning is never logged again.
                self.last_update_limit_warn = Some(now);
            }
        }
    }

    self.buffer
        .extend(updates.into_iter().map(|(u, s)| (u, s, peer_map.handle())));
}
/// Stores the message box's current update state in the client's session.
pub async fn sync_update_state(&self) {
    let snapshot = UpdateState::All(self.message_box.session_state());
    self.client.0.session.set_update_state(snapshot).await;
}
}
impl Client {
    /// Creates an [`UpdateStream`] that reads incoming updates from `updates`.
    ///
    /// When `configuration.catch_up` is set, the message box is loaded from
    /// the update state stored in the session; otherwise it starts out fresh.
    ///
    /// If the resulting message box is empty and the session knows the
    /// logged-in user, the stream is flagged to fetch the update state on its
    /// first poll.
    pub async fn stream_updates(
        &self,
        updates: mpsc::UnboundedReceiver<UpdatesLike>,
        configuration: UpdatesConfiguration,
    ) -> UpdateStream {
        let message_box = if !configuration.catch_up {
            MessageBoxes::new()
        } else {
            MessageBoxes::load(self.0.session.updates_state().await)
        };

        // The session is only queried when the message box is empty.
        let should_get_state = if message_box.is_empty() {
            self.0.session.peer(PeerId::self_user()).await.is_some()
        } else {
            false
        };

        UpdateStream {
            client: self.clone(),
            message_box,
            last_update_limit_warn: None,
            buffer: VecDeque::new(),
            updates,
            configuration,
            should_get_state,
        }
    }
}
#[cfg(test)]
mod tests {
    use core::future::Future;

    use super::*;

    /// Only exists to give the type checker an `UpdateStream` value; it is
    /// never actually called.
    fn get_update_stream() -> UpdateStream {
        panic!()
    }

    /// Compile-time check that the future returned by `UpdateStream::next`
    /// is `Send`.
    #[test]
    fn ensure_next_update_future_impls_send() {
        fn assert_send<F: Future + Send>(_future: F) {}

        // Never executed: the call only has to type-check.
        if false {
            assert_send(get_update_stream().next());
        }
    }
}