1#![allow(deprecated)]
12
13use super::{Client, UpdatesConfiguration};
14use crate::types::{PeerMap, Update};
15use grammers_mtsender::InvocationError;
16use grammers_session::PeerAuthCache;
17use grammers_session::defs::{PeerId, UpdateState, UpdatesState};
18pub use grammers_session::updates::{MessageBoxes, PrematureEndReason, State, UpdatesLike};
19use grammers_tl_types as tl;
20use log::{trace, warn};
21use std::collections::VecDeque;
22use std::sync::Arc;
23use std::time::{Duration, Instant};
24use tokio::sync::mpsc;
25use tokio::time::timeout_at;
26
/// Minimum time that must have elapsed since updates were last dropped for
/// exceeding the queue limit before another warning is logged.
const UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN: Duration = Duration::from_secs(5 * 60);

/// `limit` sent with `updates.getChannelDifference` when the logged-in account is a bot.
const BOT_CHANNEL_DIFF_LIMIT: i32 = 100_000;
/// `limit` sent with `updates.getChannelDifference` when the logged-in account is a user.
const USER_CHANNEL_DIFF_LIMIT: i32 = 100;
33
34fn prepare_channel_difference(
35 mut request: tl::functions::updates::GetChannelDifference,
36 peer_auths: &PeerAuthCache,
37 message_box: &mut MessageBoxes,
38) -> Option<tl::functions::updates::GetChannelDifference> {
39 let id = match &request.channel {
40 tl::enums::InputChannel::Channel(channel) => PeerId::channel(channel.channel_id),
41 _ => unreachable!(),
42 };
43
44 if let Some(peer) = peer_auths.get(id) {
45 request.channel = peer.into();
46 request.limit = if peer_auths.is_self_bot() {
47 BOT_CHANNEL_DIFF_LIMIT
48 } else {
49 USER_CHANNEL_DIFF_LIMIT
50 };
51 trace!("requesting {:?}", request);
52 Some(request)
53 } else {
54 warn!(
55 "cannot getChannelDifference for {:?} as we're missing its hash",
56 id
57 );
58 message_box.end_channel_difference(PrematureEndReason::Banned);
59 None
60 }
61}
62
/// Stream of updates bound to a [`Client`].
///
/// Updates are obtained one at a time through `next` / `next_raw`. The update
/// state held by the stream is written back to the client's session by
/// `sync_update_state`, which also runs when the stream is dropped.
pub struct UpdateStream {
    /// Client used to invoke requests and to reach the session.
    client: Client,
    /// Update-state tracker: yields deadlines and difference requests, and
    /// turns incoming updates and difference responses into individual updates.
    message_box: MessageBoxes,
    /// Peer cache consulted when building `GetChannelDifference` requests and
    /// extended with the users and chats of every difference response.
    peer_auths: PeerAuthCache,
    /// Instant recorded the last time updates were dropped because the queue
    /// limit was exceeded; used to rate-limit the corresponding warning.
    last_update_limit_warn: Option<Instant>,
    /// Updates ready to be handed out, each with its `State` and the peer map
    /// shared by the batch it arrived in.
    buffer: VecDeque<(tl::enums::Update, State, Arc<crate::types::PeerMap>)>,
    /// Receiving half of the channel on which `UpdatesLike` values arrive.
    updates: mpsc::UnboundedReceiver<UpdatesLike>,
    /// Settings for this stream (`catch_up`, `update_queue_limit`).
    configuration: UpdatesConfiguration,
    /// Whether the next call to `next_raw` should first invoke `updates.getState`.
    should_get_state: bool,
}
75
76impl UpdateStream {
77 pub async fn next(&mut self) -> Result<Update, InvocationError> {
78 let (update, state, peers) = self.next_raw().await?;
79 Ok(Update::new(&self.client, update, state, &peers))
80 }
81
82 pub async fn next_raw(
83 &mut self,
84 ) -> Result<(tl::enums::Update, State, Arc<PeerMap>), InvocationError> {
85 if self.should_get_state {
86 self.should_get_state = false;
87 match self
88 .client
89 .invoke(&tl::functions::updates::GetState {})
90 .await
91 {
92 Ok(tl::enums::updates::State::State(state)) => {
93 self.client
94 .0
95 .session
96 .set_update_state(UpdateState::All(UpdatesState {
97 pts: state.pts,
98 qts: state.qts,
99 date: state.date,
100 seq: state.seq,
101 channels: Vec::new(),
102 }));
103 }
104 Err(_err) => {
105 }
108 }
109 }
110
111 loop {
112 let (deadline, get_diff, get_channel_diff) = {
113 if let Some(update) = self.buffer.pop_front() {
114 return Ok(update);
115 }
116 (
117 self.message_box.check_deadlines(), self.message_box.get_difference(),
119 self.message_box.get_channel_difference().and_then(|gd| {
120 prepare_channel_difference(gd, &self.peer_auths, &mut self.message_box)
121 }),
122 )
123 };
124
125 if let Some(request) = get_diff {
126 let response = self.client.invoke(&request).await?;
127 let (updates, users, chats) = self.message_box.apply_difference(response);
128 let _ = self.peer_auths.extend(&users, &chats);
129 self.extend_update_queue(updates, PeerMap::new(users, chats));
130 continue;
131 }
132
133 if let Some(request) = get_channel_diff {
134 let maybe_response = self.client.invoke(&request).await;
135
136 let response = match maybe_response {
137 Ok(r) => r,
138 Err(e) if e.is("PERSISTENT_TIMESTAMP_OUTDATED") => {
139 log::warn!(
151 "Getting difference for channel updates caused PersistentTimestampOutdated; ending getting difference prematurely until server issues are resolved"
152 );
153 {
154 self.message_box
155 .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
156 }
157 continue;
158 }
159 Err(e) if e.is("CHANNEL_PRIVATE") => {
160 log::info!(
161 "Account is now banned so we can no longer fetch updates with request: {:?}",
162 request
163 );
164 {
165 self.message_box
166 .end_channel_difference(PrematureEndReason::Banned);
167 }
168 continue;
169 }
170 Err(InvocationError::Rpc(rpc_error)) if rpc_error.code == 500 => {
171 log::warn!("Telegram is having internal issues: {:#?}", rpc_error);
172 {
173 self.message_box
174 .end_channel_difference(PrematureEndReason::TemporaryServerIssues);
175 }
176 continue;
177 }
178 Err(e) => return Err(e),
179 };
180
181 let (updates, users, chats) = self.message_box.apply_channel_difference(response);
182 let _ = self.peer_auths.extend(&users, &chats);
183
184 self.extend_update_queue(updates, PeerMap::new(users, chats));
185 continue;
186 }
187
188 match timeout_at(deadline.into(), self.updates.recv()).await {
189 Ok(Some(updates)) => self.process_socket_updates(updates),
190 Ok(None) => break Err(InvocationError::Dropped),
191 Err(_) => {}
192 }
193 }
194 }
195
196 pub(crate) fn process_socket_updates(&mut self, updates: UpdatesLike) {
197 let mut result = Option::<(Vec<_>, Vec<_>, Vec<_>)>::None;
198 match self.message_box.process_updates(updates) {
199 Ok(tup) => {
200 if let Some(res) = result.as_mut() {
201 res.0.extend(tup.0);
202 res.1.extend(tup.1);
203 res.2.extend(tup.2);
204 } else {
205 result = Some(tup);
206 }
207 }
208 Err(_) => return,
209 }
210
211 if let Some((updates, users, chats)) = result {
212 self.extend_update_queue(updates, PeerMap::new(users, chats));
213 }
214 }
215
216 fn extend_update_queue(
217 &mut self,
218 mut updates: Vec<(tl::enums::Update, State)>,
219 peer_map: Arc<PeerMap>,
220 ) {
221 if let Some(limit) = self.configuration.update_queue_limit {
222 if let Some(exceeds) = (self.buffer.len() + updates.len()).checked_sub(limit + 1) {
223 let exceeds = exceeds + 1;
224 let now = Instant::now();
225 let notify = match self.last_update_limit_warn {
226 None => true,
227 Some(instant) => now - instant > UPDATE_LIMIT_EXCEEDED_LOG_COOLDOWN,
228 };
229
230 updates.truncate(updates.len() - exceeds);
231 if notify {
232 log::warn!(
233 "{} updates were dropped because the update_queue_limit was exceeded",
234 exceeds
235 );
236 }
237
238 self.last_update_limit_warn = Some(now);
239 }
240 }
241
242 self.buffer
243 .extend(updates.into_iter().map(|(u, s)| (u, s, peer_map.clone())));
244 }
245
246 pub fn sync_update_state(&self) {
248 self.client
249 .0
250 .session
251 .set_update_state(UpdateState::All(self.message_box.session_state()));
252 }
253}
254
255impl Drop for UpdateStream {
256 fn drop(&mut self) {
257 self.sync_update_state();
258 }
259}
260
261impl Client {
262 pub fn stream_updates(
269 &self,
270 updates: mpsc::UnboundedReceiver<UpdatesLike>,
271 configuration: UpdatesConfiguration,
272 ) -> UpdateStream {
273 let message_box = if configuration.catch_up {
274 MessageBoxes::load(self.0.session.updates_state())
275 } else {
276 MessageBoxes::new()
279 };
280 let should_get_state =
282 message_box.is_empty() && self.0.session.peer(PeerId::self_user()).is_some();
283
284 UpdateStream {
285 client: self.clone(),
286 message_box,
287 peer_auths: PeerAuthCache::new(None),
288 last_update_limit_warn: None,
289 buffer: VecDeque::new(),
290 updates,
291 configuration,
292 should_get_state,
293 }
294 }
295}
296
#[cfg(test)]
mod tests {
    use super::*;
    use core::future::Future;

    /// Stand-in that only exists to name the `UpdateStream` type; it is never
    /// actually executed by the test below.
    fn get_update_stream() -> UpdateStream {
        panic!()
    }

    /// Compile-time check that the future returned by `UpdateStream::next` is `Send`.
    #[test]
    fn ensure_next_update_future_impls_send() {
        fn assert_send(_: impl Future + Send) {}

        // Guarded by `if false`: the call is type-checked but never run.
        if false {
            assert_send(get_update_stream().next());
        }
    }
}