1use std::collections::VecDeque;
12use std::time::{Duration, Instant};
13
14use grammers_mtsender::InvocationError;
15use grammers_session::Session;
16use grammers_session::types::{PeerId, PeerInfo, UpdateState, UpdatesState};
17use grammers_session::updates::{MessageBoxes, PrematureEndReason, State, UpdatesLike};
18use grammers_tl_types as tl;
19use log::{trace, warn};
20use tokio::sync::mpsc;
21use tokio::time::timeout_at;
22
23use super::{Client, UpdatesConfiguration};
24use crate::peer::PeerMap;
25use crate::update::Update;
26
/// Cooldown used by `UpdateStream::extend_update_queue` to throttle the warning
/// that is logged when updates are dropped because `update_queue_limit` was exceeded.
const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(5 * 60);

/// Value written to `GetChannelDifference::limit` when the logged-in account is a bot.
const BOT_CHANNEL_DIFF_LIMIT: i32 = 100_000;
/// Value written to `GetChannelDifference::limit` when the logged-in account is not
/// known to be a bot.
const USER_CHANNEL_DIFF_LIMIT: i32 = 100;
33
34async fn prepare_channel_difference(
35 mut request: tl::functions::updates::GetChannelDifference,
36 session: &dyn Session,
37 message_box: &mut MessageBoxes,
38) -> Option<tl::functions::updates::GetChannelDifference> {
39 let id = match &request.channel {
40 tl::enums::InputChannel::Channel(channel) => PeerId::channel_unchecked(channel.channel_id),
41 _ => unreachable!(),
42 };
43
44 if let Some(PeerInfo::Channel {
45 id,
46 auth: Some(auth),
47 ..
48 }) = session.peer(id).await
49 {
50 request.channel = tl::enums::InputChannel::Channel(tl::types::InputChannel {
51 channel_id: id,
52 access_hash: auth.hash(),
53 });
54 request.limit = if session
55 .peer(PeerId::self_user())
56 .await
57 .map(|user| match user {
58 PeerInfo::User { bot, .. } => bot.unwrap_or(false),
59 _ => false,
60 })
61 .unwrap_or(false)
62 {
63 BOT_CHANNEL_DIFF_LIMIT
64 } else {
65 USER_CHANNEL_DIFF_LIMIT
66 };
67 trace!("requesting {:?}", request);
68 Some(request)
69 } else {
70 warn!(
71 "cannot getChannelDifference for {:?} as we're missing its hash",
72 id
73 );
74 message_box.end_channel_difference(PrematureEndReason::Banned);
75 None
76 }
77}
78
/// Stream of updates created by `Client::stream_updates`.
///
/// Updates are pulled with [`UpdateStream::next`] (high-level [`Update`] values) or
/// [`UpdateStream::next_raw`] (raw TL updates plus their [`State`] and [`PeerMap`]).
pub struct UpdateStream {
    /// Client used to invoke requests, reach the session and build peer maps.
    client: Client,
    /// Update bookkeeping: it processes incoming updates, reports which difference
    /// requests need to be made and applies their responses.
    message_box: MessageBoxes,
    /// Timestamp consulted by `extend_update_queue` to throttle the
    /// "updates were dropped" warning; `None` until updates are first dropped.
    last_update_limit_warn: Option<Instant>,
    /// Updates that are ready to be handed out by `next_raw`, in FIFO order.
    buffer: VecDeque<(tl::enums::Update, State, PeerMap)>,
    /// Receiving half of the channel delivering `UpdatesLike` values; `next_raw`
    /// passes whatever arrives here to `process_socket_updates`.
    updates: mpsc::UnboundedReceiver<UpdatesLike>,
    /// Configuration given to `Client::stream_updates`; `update_queue_limit` is
    /// read from it when queueing updates.
    configuration: UpdatesConfiguration,
    /// When `true`, the first call to `next_raw` invokes `updates.getState` once
    /// and then clears the flag.
    should_get_state: bool,
}
91
92impl UpdateStream {
93 pub async fn next(&mut self) -> Result<Update, InvocationError> {
95 let (update, state, peers) = self.next_raw().await?;
96 Ok(Update::from_raw(&self.client, update, state, peers))
97 }
98
99 pub async fn next_raw(
103 &mut self,
104 ) -> Result<(tl::enums::Update, State, PeerMap), InvocationError> {
105 if self.should_get_state {
106 self.should_get_state = false;
107 match self
108 .client
109 .invoke(&tl::functions::updates::GetState {})
110 .await
111 {
112 Ok(tl::enums::updates::State::State(state)) => {
113 self.client
114 .0
115 .session
116 .set_update_state(UpdateState::All(UpdatesState {
117 pts: state.pts,
118 qts: state.qts,
119 date: state.date,
120 seq: state.seq,
121 channels: Vec::new(),
122 }))
123 .await;
124 }
125 Err(_err) => {
126 }
129 }
130 }
131
132 loop {
133 let (deadline, get_diff, get_channel_diff) = {
134 if let Some(update) = self.buffer.pop_front() {
135 return Ok(update);
136 }
137 (
138 self.message_box.check_deadlines(), self.message_box.get_difference(),
140 match self.message_box.get_channel_difference() {
141 Some(gd) => {
142 prepare_channel_difference(
143 gd,
144 self.client.0.session.as_ref(),
145 &mut self.message_box,
146 )
147 .await
148 }
149 None => None,
150 },
151 )
152 };
153
154 if let Some(request) = get_diff {
155 let response = self.client.invoke(&request).await?;
156 let (updates, users, chats) = self.message_box.apply_difference(response);
157 let peers = self.client.build_peer_map(users, chats).await;
158 self.extend_update_queue(updates, peers);
159 continue;
160 }
161
162 if let Some(request) = get_channel_diff {
163 let maybe_response = self.client.invoke(&request).await;
164
165 let response = match maybe_response {
166 Ok(r) => r,
167 Err(e) if e.is("PERSISTENT_TIMESTAMP_OUTDATED") => {
168 log::warn!(
180 "Getting difference for channel updates caused PersistentTimestampOutdated; ending getting difference prematurely until server issues are resolved"
181 );
182 {
183 self.message_box
184 .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
185 }
186 continue;
187 }
188 Err(e) if e.is("CHANNEL_PRIVATE") => {
189 log::info!(
190 "Account is now banned so we can no longer fetch updates with request: {:?}",
191 request
192 );
193 {
194 self.message_box
195 .end_channel_difference(PrematureEndReason::Banned);
196 }
197 continue;
198 }
199 Err(InvocationError::Rpc(rpc_error)) if rpc_error.code == 500 => {
200 log::warn!("Telegram is having internal issues: {:#?}", rpc_error);
201 {
202 self.message_box
203 .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
204 }
205 continue;
206 }
207 Err(e) => return Err(e),
208 };
209
210 let (updates, users, chats) = self.message_box.apply_channel_difference(response);
211
212 let peers = self.client.build_peer_map(users, chats).await;
213 self.extend_update_queue(updates, peers);
214 continue;
215 }
216
217 match timeout_at(deadline.into(), self.updates.recv()).await {
218 Ok(Some(updates)) => self.process_socket_updates(updates).await,
219 Ok(None) => break Err(InvocationError::Dropped),
220 Err(_) => {}
221 }
222 }
223 }
224
225 pub(crate) async fn process_socket_updates(&mut self, updates: UpdatesLike) {
226 let mut result = Option::<(Vec<_>, Vec<_>, Vec<_>)>::None;
227 match self.message_box.process_updates(updates) {
228 Ok(tup) => {
229 if let Some(res) = result.as_mut() {
230 res.0.extend(tup.0);
231 res.1.extend(tup.1);
232 res.2.extend(tup.2);
233 } else {
234 result = Some(tup);
235 }
236 }
237 Err(_) => return,
238 }
239
240 if let Some((updates, users, chats)) = result {
241 let peers = self.client.build_peer_map(users, chats).await;
242 self.extend_update_queue(updates, peers);
243 }
244 }
245
246 fn extend_update_queue(
247 &mut self,
248 mut updates: Vec<(tl::enums::Update, State)>,
249 peer_map: PeerMap,
250 ) {
251 if let Some(limit) = self.configuration.update_queue_limit {
252 if let Some(exceeds) = (self.buffer.len() + updates.len()).checked_sub(limit + 1) {
253 let exceeds = exceeds + 1;
254 let now = Instant::now();
255 let notify = match self.last_update_limit_warn {
256 None => true,
257 Some(instant) => now - instant > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN,
258 };
259
260 updates.truncate(updates.len() - exceeds);
261 if notify {
262 log::warn!(
263 "{} updates were dropped because the update_queue_limit was exceeded",
264 exceeds
265 );
266 }
267
268 self.last_update_limit_warn = Some(now);
269 }
270 }
271
272 self.buffer
273 .extend(updates.into_iter().map(|(u, s)| (u, s, peer_map.handle())));
274 }
275
276 pub async fn sync_update_state(&self) {
280 self.client
281 .0
282 .session
283 .set_update_state(UpdateState::All(self.message_box.session_state()))
284 .await;
285 }
286}
287
288impl Client {
289 pub async fn stream_updates(
299 &self,
300 updates: mpsc::UnboundedReceiver<UpdatesLike>,
301 configuration: UpdatesConfiguration,
302 ) -> UpdateStream {
303 let message_box = if configuration.catch_up {
304 MessageBoxes::load(self.0.session.updates_state().await)
305 } else {
306 MessageBoxes::new()
309 };
310 let should_get_state =
312 message_box.is_empty() && self.0.session.peer(PeerId::self_user()).await.is_some();
313
314 UpdateStream {
315 client: self.clone(),
316 message_box,
317 last_update_limit_warn: None,
318 buffer: VecDeque::new(),
319 updates,
320 configuration,
321 should_get_state,
322 }
323 }
324}
325
#[cfg(test)]
mod tests {
    use core::future::Future;

    use super::*;

    /// Never actually called; it only provides an `UpdateStream`-typed expression
    /// for the compile-time check below.
    fn get_update_stream() -> UpdateStream {
        panic!()
    }

    /// Compile-time check that the future returned by `UpdateStream::next` is `Send`.
    #[test]
    fn ensure_next_update_future_impls_send() {
        fn require_send<F>(_future: F)
        where
            F: Future + Send,
        {
        }

        // The branch is never taken: only the type check matters.
        if false {
            require_send(get_update_stream().next());
        }
    }
}