1use std::collections::VecDeque;
12use std::time::{Duration, Instant};
13
14use grammers_mtsender::InvocationError;
15use grammers_session::ErasedSession;
16use grammers_session::types::{PeerId, PeerInfo, UpdateState, UpdatesState};
17use grammers_session::updates::{MessageBoxes, PrematureEndReason, State, UpdatesLike};
18use grammers_tl_types as tl;
19use log::{trace, warn};
20use tokio::sync::mpsc;
21use tokio::time::timeout_at;
22
23use super::{Client, UpdatesConfiguration};
24use crate::peer::PeerMap;
25use crate::update::Update;
26
/// Cooldown applied to the warning that is logged when updates are dropped
/// because the configured update queue limit was exceeded.
const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(5 * 60);

/// `limit` sent with `updates.getChannelDifference` when the logged-in account is a bot.
const BOT_CHANNEL_DIFF_LIMIT: i32 = 100_000;
/// `limit` sent with `updates.getChannelDifference` for every other account.
const USER_CHANNEL_DIFF_LIMIT: i32 = 100;
33
34async fn prepare_channel_difference(
35 mut request: tl::functions::updates::GetChannelDifference,
36 session: &ErasedSession,
37 message_box: &mut MessageBoxes,
38) -> Result<
39 Option<tl::functions::updates::GetChannelDifference>,
40 Box<dyn std::error::Error + Send + Sync>,
41> {
42 let id = match &request.channel {
43 tl::enums::InputChannel::Channel(channel) => PeerId::channel_unchecked(channel.channel_id),
44 _ => unreachable!(),
45 };
46
47 if let Some(PeerInfo::Channel {
48 id,
49 auth: Some(auth),
50 ..
51 }) = session.peer(id).await?
52 {
53 request.channel = tl::enums::InputChannel::Channel(tl::types::InputChannel {
54 channel_id: id,
55 access_hash: auth.hash(),
56 });
57 request.limit = if session
58 .peer(PeerId::self_user())
59 .await?
60 .map(|user| match user {
61 PeerInfo::User { bot, .. } => bot.unwrap_or(false),
62 _ => false,
63 })
64 .unwrap_or(false)
65 {
66 BOT_CHANNEL_DIFF_LIMIT
67 } else {
68 USER_CHANNEL_DIFF_LIMIT
69 };
70 trace!("requesting {:?}", request);
71 Ok(Some(request))
72 } else {
73 warn!(
74 "cannot getChannelDifference for {:?} as we're missing its hash",
75 id
76 );
77 message_box.end_channel_difference(PrematureEndReason::Banned);
78 Ok(None)
79 }
80}
81
/// Stream of updates, created by `Client::stream_updates`.
///
/// Updates are pulled one at a time with `next` (high-level `Update`) or
/// `next_raw` (raw TL update together with its state and peer map).
pub struct UpdateStream {
    /// Client used to invoke requests and to build peer maps.
    client: Client,
    /// Update-state tracker that decides when differences must be fetched.
    message_box: MessageBoxes,
    /// Timestamp used to rate-limit the "updates were dropped" warning.
    last_update_limit_warn: Option<Instant>,
    /// Updates that are ready to be handed out by `next_raw`.
    buffer: VecDeque<(tl::enums::Update, State, PeerMap)>,
    /// Receiving half of the channel delivering incoming `UpdatesLike` values.
    updates: mpsc::UnboundedReceiver<UpdatesLike>,
    /// Configuration supplied when the stream was created.
    configuration: UpdatesConfiguration,
    /// Whether `next_raw` should issue `updates.getState` before doing anything else.
    should_get_state: bool,
}
94
95impl UpdateStream {
96 pub async fn next(&mut self) -> Result<Update, InvocationError> {
98 let (update, state, peers) = self.next_raw().await?;
99 Ok(Update::from_raw(&self.client, update, state, peers))
100 }
101
102 pub async fn next_raw(
106 &mut self,
107 ) -> Result<(tl::enums::Update, State, PeerMap), InvocationError> {
108 if self.should_get_state {
109 self.should_get_state = false;
110 match self
111 .client
112 .invoke(&tl::functions::updates::GetState {})
113 .await
114 {
115 Ok(tl::enums::updates::State::State(state)) => {
116 self.client
117 .0
118 .session
119 .set_update_state(UpdateState::All(UpdatesState {
120 pts: state.pts,
121 qts: state.qts,
122 date: state.date,
123 seq: state.seq,
124 channels: Vec::new(),
125 }))
126 .await?;
127 self.message_box.set_state(state);
128 }
129 Err(_err) => {
130 }
133 }
134 }
135
136 loop {
137 let (deadline, get_diff, get_channel_diff) = {
138 if let Some(update) = self.buffer.pop_front() {
139 return Ok(update);
140 }
141 (
142 self.message_box.check_deadlines(), self.message_box.get_difference(),
144 match self.message_box.get_channel_difference() {
145 Some(gd) => {
146 prepare_channel_difference(
147 gd,
148 self.client.0.session.as_ref(),
149 &mut self.message_box,
150 )
151 .await?
152 }
153 None => None,
154 },
155 )
156 };
157
158 if let Some(request) = get_diff {
159 let response = self.client.invoke(&request).await?;
160 let (updates, users, chats) = self.message_box.apply_difference(response);
161 let peers = self.client.build_peer_map(users, chats).await;
162 self.extend_update_queue(updates, peers);
163 continue;
164 }
165
166 if let Some(request) = get_channel_diff {
167 let maybe_response = self.client.invoke(&request).await;
168
169 let response = match maybe_response {
170 Ok(r) => r,
171 Err(e) if e.is("PERSISTENT_TIMESTAMP_OUTDATED") => {
172 log::warn!(
184 "Getting difference for channel updates caused PersistentTimestampOutdated; ending getting difference prematurely until server issues are resolved"
185 );
186 {
187 self.message_box
188 .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
189 }
190 continue;
191 }
192 Err(e) if e.is("CHANNEL_PRIVATE") => {
193 log::info!(
194 "Account is now banned so we can no longer fetch updates with request: {:?}",
195 request
196 );
197 {
198 self.message_box
199 .end_channel_difference(PrematureEndReason::Banned);
200 }
201 continue;
202 }
203 Err(InvocationError::Rpc(rpc_error)) if rpc_error.code == 500 => {
204 log::warn!("Telegram is having internal issues: {:#?}", rpc_error);
205 {
206 self.message_box
207 .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
208 }
209 continue;
210 }
211 Err(e) => return Err(e),
212 };
213
214 let (updates, users, chats) = self.message_box.apply_channel_difference(response);
215
216 let peers = self.client.build_peer_map(users, chats).await;
217 self.extend_update_queue(updates, peers);
218 continue;
219 }
220
221 match timeout_at(deadline.into(), self.updates.recv()).await {
222 Ok(Some(updates)) => self.process_socket_updates(updates).await?,
223 Ok(None) => break Err(InvocationError::Dropped),
224 Err(_) => {}
225 }
226 }
227 }
228
229 pub(crate) async fn process_socket_updates(
230 &mut self,
231 updates: UpdatesLike,
232 ) -> Result<(), InvocationError> {
233 let mut result = Option::<(Vec<_>, Vec<_>, Vec<_>)>::None;
234 match self.message_box.process_updates(updates) {
235 Ok(tup) => {
236 if let Some(res) = result.as_mut() {
237 res.0.extend(tup.0);
238 res.1.extend(tup.1);
239 res.2.extend(tup.2);
240 } else {
241 result = Some(tup);
242 }
243 }
244 Err(_) => return Ok(()),
245 }
246
247 if let Some((updates, users, chats)) = result {
248 let peers = self.client.build_peer_map(users, chats).await;
249 self.extend_update_queue(updates, peers);
250 }
251
252 Ok(())
253 }
254
255 fn extend_update_queue(
256 &mut self,
257 mut updates: Vec<(tl::enums::Update, State)>,
258 peer_map: PeerMap,
259 ) {
260 if let Some(limit) = self.configuration.update_queue_limit {
261 if let Some(exceeds) = (self.buffer.len() + updates.len()).checked_sub(limit + 1) {
262 let exceeds = exceeds + 1;
263 let now = Instant::now();
264 let notify = match self.last_update_limit_warn {
265 None => true,
266 Some(instant) => now - instant > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN,
267 };
268
269 updates.truncate(updates.len() - exceeds);
270 if notify {
271 log::warn!(
272 "{} updates were dropped because the update_queue_limit was exceeded",
273 exceeds
274 );
275 }
276
277 self.last_update_limit_warn = Some(now);
278 }
279 }
280
281 self.buffer
282 .extend(updates.into_iter().map(|(u, s)| (u, s, peer_map.handle())));
283 }
284
285 pub async fn sync_update_state(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
289 self.client
290 .0
291 .session
292 .set_update_state(UpdateState::All(self.message_box.session_state()))
293 .await?;
294 Ok(())
295 }
296}
297
298impl Client {
299 pub async fn stream_updates(
309 &self,
310 updates: mpsc::UnboundedReceiver<UpdatesLike>,
311 configuration: UpdatesConfiguration,
312 ) -> Result<UpdateStream, Box<dyn std::error::Error + Send + Sync>> {
313 let message_box = if configuration.catch_up {
314 MessageBoxes::load(self.0.session.updates_state().await?)
315 } else {
316 MessageBoxes::new()
319 };
320 let should_get_state =
322 message_box.is_empty() && self.0.session.peer(PeerId::self_user()).await?.is_some();
323
324 Ok(UpdateStream {
325 client: self.clone(),
326 message_box,
327 last_update_limit_warn: None,
328 buffer: VecDeque::new(),
329 updates,
330 configuration,
331 should_get_state,
332 })
333 }
334}
335
#[cfg(test)]
mod tests {
    use core::future::Future;

    use super::*;

    /// Accepts only futures that are `Send`; used purely for type checking.
    fn assert_send<F: Future + Send>(_future: F) {}

    /// Stand-in that only has to type-check; it is never actually called.
    fn get_update_stream() -> UpdateStream {
        panic!()
    }

    /// Compile-time check that the future returned by `UpdateStream::next` is `Send`.
    #[test]
    fn ensure_next_update_future_impls_send() {
        // The branch is never taken: compiling it is the whole test.
        if false {
            assert_send(get_update_stream().next());
        }
    }
}