1use ferogram_mtproto::{
14 EncryptedSession, SeenMsgIds, Session, authentication as auth, new_seen_msg_ids, step2_temp,
15};
16use ferogram_tl_types as tl;
17use ferogram_tl_types::{Cursor, Deserializable, RemoteCall};
18use tokio::io::{AsyncReadExt, AsyncWriteExt};
19use tokio::net::TcpStream;
20
21use crate::errors::InvocationError;
22use crate::pool::{build_msgs_ack_body, build_msgs_ack_ping_body};
23use ferogram_connect::TransportKind;
24#[allow(unused_imports)]
26use metrics::{counter, histogram};
27
/// Number of queued, not-yet-acknowledged server message ids at which the RPC
/// receive loops flush an acknowledgement immediately instead of waiting for
/// the next outgoing request.
const PENDING_ACKS_THRESHOLD: usize = 10;

/// `rpc_call` sends the body built by `build_msgs_ack_ping_body` ahead of the
/// request whenever its running call counter is a multiple of this value.
const PING_EVERY_N_CHUNKS: u32 = 5;
35
/// One TCP connection to a data centre plus the encrypted session state that
/// is used to pack and unpack messages travelling over it.
pub struct DcConnection {
    /// Underlying TCP socket.
    stream: TcpStream,
    /// Encrypted session used to pack outgoing and unpack incoming messages.
    enc: EncryptedSession,
    /// Ids of received messages that still have to be acknowledged.
    pending_acks: Vec<i64>,
    /// Number of `rpc_call` invocations so far; drives the periodic ping.
    call_count: u32,
    /// Stream cipher returned by the transport init; `None` for transports
    /// that do not obfuscate the byte stream.
    cipher: Option<ferogram_crypto::ObfuscatedCipher>,
    /// Seen-message-id set created together with the session.
    /// NOTE(review): marked `dead_code`, so presumably only kept alive here
    /// and never read through this field — confirm.
    #[allow(dead_code)]
    seen_msg_ids: SeenMsgIds,
}
47
48impl DcConnection {
49 #[tracing::instrument(skip(socks5), fields(addr = %addr, dc_id = dc_id))]
51 pub async fn connect_fastest(
52 addr: &str,
53 socks5: Option<&ferogram_connect::Socks5Config>,
54 dc_id: i16,
55 ) -> Result<(Self, &'static str), InvocationError> {
56 use tokio::task::JoinSet;
57 let addr = addr.to_owned();
58 let socks5 = socks5.cloned();
59 tracing::debug!("[dc_pool] probing {addr} with 3 transports");
60 let mut set: JoinSet<Result<(DcConnection, &'static str), InvocationError>> =
61 JoinSet::new();
62
63 {
64 let a = addr.clone();
65 let s = socks5.clone();
66 set.spawn(async move {
67 Ok((
68 DcConnection::connect_raw(
69 &a,
70 s.as_ref(),
71 &TransportKind::Obfuscated { secret: None },
72 dc_id,
73 )
74 .await?,
75 "Obfuscated",
76 ))
77 });
78 }
79 {
80 let a = addr.clone();
81 let s = socks5.clone();
82 set.spawn(async move {
83 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
84 Ok((
85 DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Abridged, dc_id)
86 .await?,
87 "Abridged",
88 ))
89 });
90 }
91 {
92 let a = addr.clone();
93 set.spawn(async move {
94 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
95 Ok((
96 DcConnection::connect_raw(&a, None, &TransportKind::Http, dc_id).await?,
97 "Http",
98 ))
99 });
100 }
101
102 let mut last_err = InvocationError::Deserialize("connect_fastest: no candidates".into());
103 while let Some(outcome) = set.join_next().await {
104 match outcome {
105 Ok(Ok((conn, label))) => {
106 set.abort_all();
107 return Ok((conn, label));
108 }
109 Ok(Err(e)) => {
110 last_err = e;
111 }
112 Err(e) if e.is_cancelled() => {}
113 Err(_) => {}
114 }
115 }
116 Err(last_err)
117 }
118
119 #[tracing::instrument(skip(socks5, transport), fields(addr = %addr, dc_id = dc_id))]
121 pub async fn connect_raw(
122 addr: &str,
123 socks5: Option<&ferogram_connect::Socks5Config>,
124 transport: &TransportKind,
125 dc_id: i16,
126 ) -> Result<Self, InvocationError> {
127 tracing::debug!("[dc_pool] Connecting to {addr} …");
128 let mut stream = Self::open_tcp(addr, socks5).await?;
129 let mut cipher = Self::send_transport_init(&mut stream, transport, dc_id).await?;
130
131 let mut plain = Session::new();
132
133 let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
134 Self::send_plain_frame(
135 &mut stream,
136 &plain.pack(&req1).to_plaintext_bytes(),
137 cipher.as_mut(),
138 )
139 .await?;
140 let res_pq: tl::enums::ResPq = Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
141
142 let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
143 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
144 Self::send_plain_frame(
145 &mut stream,
146 &plain.pack(&req2).to_plaintext_bytes(),
147 cipher.as_mut(),
148 )
149 .await?;
150 let dh: tl::enums::ServerDhParams =
151 Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
152
153 let (req3, s3) =
154 auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
155 Self::send_plain_frame(
156 &mut stream,
157 &plain.pack(&req3).to_plaintext_bytes(),
158 cipher.as_mut(),
159 )
160 .await?;
161 let ans: tl::enums::SetClientDhParamsAnswer =
162 Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
163
164 let done = {
166 let mut result =
167 auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
168 let mut attempts = 0u8;
169 loop {
170 match result {
171 auth::FinishResult::Done(d) => break d,
172 auth::FinishResult::Retry {
173 retry_id,
174 dh_params,
175 nonce,
176 server_nonce,
177 new_nonce,
178 } => {
179 attempts += 1;
180 if attempts >= 5 {
181 return Err(InvocationError::Deserialize(
182 "dh_gen_retry exceeded 5 attempts".into(),
183 ));
184 }
185 let (req_retry, s3_retry) =
186 auth::retry_step3(&dh_params, nonce, server_nonce, new_nonce, retry_id)
187 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
188 Self::send_plain_frame(
189 &mut stream,
190 &plain.pack(&req_retry).to_plaintext_bytes(),
191 cipher.as_mut(),
192 )
193 .await?;
194 let ans_retry: tl::enums::SetClientDhParamsAnswer =
195 Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
196 result = auth::finish(s3_retry, ans_retry)
197 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
198 }
199 }
200 }
201 };
202 tracing::debug!("[dc_pool] DH complete ✓ for {addr}");
203
204 let seen = new_seen_msg_ids();
205 Ok(Self {
206 stream,
207 cipher,
208 enc: EncryptedSession::with_seen(
209 done.auth_key,
210 done.first_salt,
211 done.time_offset,
212 seen.clone(),
213 ),
214 pending_acks: Vec::new(),
215 call_count: 0,
216 seen_msg_ids: seen,
217 })
218 }
219
220 #[allow(clippy::too_many_arguments)]
223 pub async fn connect_with_key(
224 addr: &str,
225 auth_key: [u8; 256],
226 first_salt: i64,
227 time_offset: i32,
228 socks5: Option<&ferogram_connect::Socks5Config>,
229 mtproxy: Option<&ferogram_connect::MtProxyConfig>,
230 transport: &TransportKind,
231 dc_id: i16,
232 pfs: bool,
233 ) -> Result<Self, InvocationError> {
234 let (mut stream, mut cipher) = if let Some(mp) = mtproxy {
235 let mut s = mp.connect().await?;
236 s.set_nodelay(true)?;
237 let c = Self::send_transport_init(&mut s, &mp.transport, dc_id).await?;
238 (s, c)
239 } else {
240 let mut s = Self::open_tcp(addr, socks5).await?;
241 let c = Self::send_transport_init(&mut s, transport, dc_id).await?;
242 (s, c)
243 };
244
245 if pfs {
246 tracing::debug!("[dc_pool] PFS: temp DH bind for DC{dc_id}");
247 match Self::do_pool_pfs_bind(&mut stream, cipher.as_mut(), &auth_key, dc_id).await {
248 Ok(temp_enc) => {
249 tracing::info!("[dc_pool] PFS bind complete DC{dc_id}");
250 return Ok(Self {
251 stream,
252 cipher,
253 enc: temp_enc,
254 pending_acks: Vec::new(),
255 call_count: 0,
256 seen_msg_ids: new_seen_msg_ids(),
257 });
258 }
259 Err(e) => {
260 tracing::warn!("[dc_pool] PFS bind failed DC{dc_id} ({e}); falling back");
261 return Err(e);
262 }
263 }
264 }
265
266 let seen = new_seen_msg_ids();
267 Ok(Self {
268 stream,
269 cipher,
270 enc: EncryptedSession::with_seen(auth_key, first_salt, time_offset, seen.clone()),
271 pending_acks: Vec::new(),
272 call_count: 0,
273 seen_msg_ids: seen,
274 })
275 }
276
277 #[allow(clippy::needless_option_as_deref)]
279 async fn do_pool_pfs_bind(
280 stream: &mut tokio::net::TcpStream,
281 mut cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
282 perm_auth_key: &[u8; 256],
283 dc_id: i16,
284 ) -> Result<EncryptedSession, InvocationError> {
285 use ferogram_mtproto::{
286 auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
287 serialize_bind_temp_auth_key,
288 };
289 const TEMP_EXPIRES: i32 = 86_400; let mut plain = ferogram_mtproto::Session::new();
293
294 let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
295 Self::send_plain_frame(
296 stream,
297 &plain.pack(&req1).to_plaintext_bytes(),
298 cipher.as_deref_mut(),
299 )
300 .await?;
301 let res_pq: tl::enums::ResPq =
302 Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
303
304 let (req2, s2) = step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
305 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
306 Self::send_plain_frame(
307 stream,
308 &plain.pack(&req2).to_plaintext_bytes(),
309 cipher.as_deref_mut(),
310 )
311 .await?;
312 let dh: tl::enums::ServerDhParams =
313 Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
314
315 let (req3, s3) =
316 auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
317 Self::send_plain_frame(
318 stream,
319 &plain.pack(&req3).to_plaintext_bytes(),
320 cipher.as_deref_mut(),
321 )
322 .await?;
323 let ans: tl::enums::SetClientDhParamsAnswer =
324 Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
325
326 let done = {
327 let mut result =
328 auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
329 let mut attempts = 0u8;
330 loop {
331 match result {
332 ferogram_mtproto::FinishResult::Done(d) => break d,
333 ferogram_mtproto::FinishResult::Retry {
334 retry_id,
335 dh_params,
336 nonce,
337 server_nonce,
338 new_nonce,
339 } => {
340 attempts += 1;
341 if attempts >= 5 {
342 return Err(InvocationError::Deserialize(
343 "PFS pool temp DH retry exceeded 5".into(),
344 ));
345 }
346 let (rr, s3r) = ferogram_mtproto::retry_step3(
347 &dh_params,
348 nonce,
349 server_nonce,
350 new_nonce,
351 retry_id,
352 )
353 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
354 Self::send_plain_frame(
355 stream,
356 &plain.pack(&rr).to_plaintext_bytes(),
357 cipher.as_deref_mut(),
358 )
359 .await?;
360 let ar: tl::enums::SetClientDhParamsAnswer =
361 Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
362 result = auth::finish(s3r, ar)
363 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
364 }
365 }
366 }
367 };
368
369 let temp_key = done.auth_key;
370 let temp_salt = done.first_salt;
371 let temp_offset = done.time_offset;
372
373 let temp_key_id = auth_key_id_from_key(&temp_key);
375 let perm_key_id = auth_key_id_from_key(perm_auth_key);
376
377 let mut nonce_buf = [0u8; 8];
378 getrandom::getrandom(&mut nonce_buf)
379 .map_err(|_| InvocationError::Deserialize("getrandom nonce".into()))?;
380 let nonce = i64::from_le_bytes(nonce_buf);
381
382 let server_now = std::time::SystemTime::now()
383 .duration_since(std::time::UNIX_EPOCH)
384 .expect("system clock is before UNIX epoch")
385 .as_secs() as i32
386 + temp_offset;
387 let expires_at = server_now + TEMP_EXPIRES;
388
389 let seen = new_seen_msg_ids();
390 let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
391 let temp_session_id = temp_enc.session_id();
392
393 let msg_id = gen_msg_id();
394 let enc_msg = encrypt_bind_inner(
395 perm_auth_key,
396 msg_id,
397 nonce,
398 temp_key_id,
399 perm_key_id,
400 temp_session_id,
401 expires_at,
402 );
403 let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);
404
405 let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
407 Self::send_abridged(stream, &wire, cipher.as_deref_mut()).await?;
408
409 for attempt in 0u8..5 {
413 let mut raw = Self::recv_abridged(stream, cipher.as_deref_mut()).await?;
414 let decrypted = temp_enc.unpack(&mut raw).map_err(|e| {
415 InvocationError::Deserialize(format!("PFS pool bind decrypt: {e:?}"))
416 })?;
417 match pfs_pool_decode_bind_response(&decrypted.body) {
418 Ok(()) => {
419 return Ok(temp_enc);
423 }
424 Err(ref e) if e == "__need_more__" => {
425 tracing::debug!(
426 "[ferogram] PFS pool bind (DC{dc_id}): informational frame {attempt}, reading next"
427 );
428 continue;
429 }
430 Err(reason) => {
431 tracing::error!(
432 "[ferogram] PFS pool bind server response (DC{dc_id}): {reason}"
433 );
434 return Err(InvocationError::Deserialize(format!(
435 "auth.bindTempAuthKey (pool): {reason}"
436 )));
437 }
438 }
439 }
440 Err(InvocationError::Deserialize(
441 "auth.bindTempAuthKey (pool): no boolTrue after 5 frames".into(),
442 ))
443 }
444
445 async fn open_tcp(
446 addr: &str,
447 socks5: Option<&ferogram_connect::Socks5Config>,
448 ) -> Result<TcpStream, InvocationError> {
449 let stream = match socks5 {
450 Some(proxy) => proxy.connect(addr).await?,
451 None => TcpStream::connect(addr).await?,
452 };
453 stream.set_nodelay(true)?;
455 {
457 let sock = socket2::SockRef::from(&stream);
458 let ka = socket2::TcpKeepalive::new()
459 .with_time(std::time::Duration::from_secs(10))
460 .with_interval(std::time::Duration::from_secs(5));
461 #[cfg(not(target_os = "windows"))]
462 let ka = ka.with_retries(3);
463 sock.set_tcp_keepalive(&ka).ok();
464 }
465 Ok(stream)
466 }
467
468 async fn send_transport_init(
469 stream: &mut TcpStream,
470 transport: &TransportKind,
471 dc_id: i16,
472 ) -> Result<Option<ferogram_crypto::ObfuscatedCipher>, InvocationError> {
473 match transport {
474 TransportKind::Abridged => {
475 stream.write_all(&[0xef]).await?;
476 }
477 TransportKind::Intermediate => {
478 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
479 }
480 TransportKind::Full => {}
481 TransportKind::Obfuscated { secret } => {
482 use sha2::Digest;
483 let mut nonce = [0u8; 64];
484 loop {
485 getrandom::getrandom(&mut nonce)
486 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
487 let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
488 let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
489 let bad = nonce[0] == 0xEF
490 || first == 0x44414548
491 || first == 0x54534F50
492 || first == 0x20544547
493 || first == 0x4954504f || first == 0xEEEEEEEE
495 || first == 0xDDDDDDDD
496 || first == 0x02010316
497 || second == 0x00000000;
498 if !bad {
499 break;
500 }
501 }
502 let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
503 let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
504 let mut rev48 = nonce[8..56].to_vec();
505 rev48.reverse();
506 let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
507 let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
508 let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
509 let mut h = sha2::Sha256::new();
510 h.update(tx_raw);
511 h.update(s.as_ref());
512 let tx: [u8; 32] = h.finalize().into();
513 let mut h = sha2::Sha256::new();
514 h.update(rx_raw);
515 h.update(s.as_ref());
516 let rx: [u8; 32] = h.finalize().into();
517 (tx, rx)
518 } else {
519 (tx_raw, rx_raw)
520 };
521 nonce[56] = 0xef;
522 nonce[57] = 0xef;
523 nonce[58] = 0xef;
524 nonce[59] = 0xef;
525 let dc_bytes = dc_id.to_le_bytes();
526 nonce[60] = dc_bytes[0];
527 nonce[61] = dc_bytes[1];
528 let mut enc =
529 ferogram_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
530 let mut skip = [0u8; 56];
531 enc.encrypt(&mut skip);
532 enc.encrypt(&mut nonce[56..64]);
533 stream.write_all(&nonce).await?;
534 return Ok(Some(enc));
535 }
536 TransportKind::PaddedIntermediate { secret } => {
537 use sha2::Digest;
538 let mut nonce = [0u8; 64];
539 loop {
540 getrandom::getrandom(&mut nonce)
541 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
542 let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
543 let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
544 let bad = nonce[0] == 0xEF
545 || first == 0x44414548
546 || first == 0x54534F50
547 || first == 0x20544547
548 || first == 0x4954504f
549 || first == 0xEEEEEEEE
550 || first == 0xDDDDDDDD
551 || first == 0x02010316
552 || second == 0x00000000;
553 if !bad {
554 break;
555 }
556 }
557 let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
558 let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
559 let mut rev48 = nonce[8..56].to_vec();
560 rev48.reverse();
561 let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
562 let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
563 let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
564 let mut h = sha2::Sha256::new();
565 h.update(tx_raw);
566 h.update(s.as_ref());
567 let tx: [u8; 32] = h.finalize().into();
568 let mut h = sha2::Sha256::new();
569 h.update(rx_raw);
570 h.update(s.as_ref());
571 let rx: [u8; 32] = h.finalize().into();
572 (tx, rx)
573 } else {
574 (tx_raw, rx_raw)
575 };
576 nonce[56] = 0xdd;
577 nonce[57] = 0xdd;
578 nonce[58] = 0xdd;
579 nonce[59] = 0xdd;
580 let dc_bytes = dc_id.to_le_bytes();
581 nonce[60] = dc_bytes[0];
582 nonce[61] = dc_bytes[1];
583 let mut enc =
584 ferogram_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
585 let mut skip = [0u8; 56];
586 enc.encrypt(&mut skip);
587 enc.encrypt(&mut nonce[56..64]);
588 stream.write_all(&nonce).await?;
589 return Ok(Some(enc));
590 }
591 TransportKind::FakeTls { .. } => {
592 return Err(InvocationError::Deserialize(
596 "FakeTls transport is not supported for DcPool connections".into(),
597 ));
598 }
599 TransportKind::Http => {}
600 }
601 Ok(None)
602 }
603
604 pub fn auth_key_bytes(&self) -> [u8; 256] {
605 self.enc.auth_key_bytes()
606 }
607 pub fn first_salt(&self) -> i64 {
608 self.enc.salt
609 }
610 pub fn time_offset(&self) -> i32 {
611 self.enc.time_offset
612 }
613
    /// Sends `req` over this connection and waits for its result, returning the
    /// raw result bytes.
    ///
    /// Before the request goes out, a ping body is sent on every
    /// `PING_EVERY_N_CHUNKS`-th call and any queued acknowledgements are
    /// flushed; both sends are best-effort. The method then reads frames in a
    /// loop, letting `scan_body` classify each one, and resends the request when
    /// the server reports a new session (at most 2 resets) or a bad salt /
    /// bad message (the call fails when the retry counter reaches 5).
    ///
    /// # Errors
    /// Returns I/O errors from the transport, `InvocationError::Deserialize`
    /// for undecryptable frames or exhausted retries, and whatever error
    /// `scan_body` produces (for example an RPC error sent by the server).
    #[tracing::instrument(skip(self, req), fields(method = std::any::type_name::<R>()))]
    pub async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
        // Start time for the latency histogram recorded on success.
        let _t0 = std::time::Instant::now();
        self.call_count += 1;
        // Periodic ping; the call counter doubles as the ping id. A send failure
        // is ignored here and will surface on the request send below.
        if self.call_count.is_multiple_of(PING_EVERY_N_CHUNKS) {
            let ping_id = self.call_count as i64;
            let ping_body = build_msgs_ack_ping_body(ping_id);
            let (ping_wire, _) = self.enc.pack_body_with_msg_id(&ping_body, true);
            let _ = Self::send_abridged(&mut self.stream, &ping_wire, self.cipher.as_mut()).await;
        }

        // Flush acknowledgements left over from earlier calls (best-effort).
        if !self.pending_acks.is_empty() {
            let ack_body = build_msgs_ack_body(&self.pending_acks);
            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
            let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
            self.pending_acks.clear();
        }

        // `sent_msg_id` is updated on every resend so that responses can be
        // matched against the latest copy of the request.
        let (wire, mut sent_msg_id) = self.enc.pack_with_msg_id(req);
        Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
        let mut salt_retries = 0u8;
        let mut session_resets = 0u8;
        loop {
            let mut raw = Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await?;
            let msg = self
                .enc
                .unpack(&mut raw)
                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            // Every received message is queued for acknowledgement.
            self.pending_acks.push(msg.msg_id);
            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
                let ack_body = build_msgs_ack_body(&self.pending_acks);
                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                let _ =
                    Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
                self.pending_acks.clear();
            }
            // A body too short to carry a constructor id is handed back as is.
            if msg.body.len() < 4 {
                return Ok(msg.body);
            }
            // Out-parameters filled in by `scan_body`.
            let mut need_resend = false;
            let mut need_session_reset = false;
            let mut bad_msg_code: Option<u32> = None;
            let mut bad_msg_server_id: Option<i64> = None;
            let scan_result = Self::scan_body(
                &msg.body,
                &mut self.enc.salt,
                &mut need_resend,
                &mut need_session_reset,
                &mut bad_msg_code,
                &mut bad_msg_server_id,
                Some(sent_msg_id),
                msg.msg_id,
            )?;
            if need_session_reset {
                session_resets += 1;
                if session_resets > 2 {
                    return Err(InvocationError::Deserialize(
                        "new_session_created: exceeded 2 resets".into(),
                    ));
                }
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
                        .await;
                    self.pending_acks.clear();
                }
                // Resend only when the same frame did not already carry the result.
                if scan_result.is_none() {
                    tracing::debug!(
                        "[dc_pool] new_session_created: resending [{session_resets}/2]"
                    );
                    let (wire, new_id) = self.enc.pack_with_msg_id(req);
                    sent_msg_id = new_id;
                    Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
                }
            } else if need_resend {
                // Apply the correction matching the reported error code before
                // packing the request again.
                match bad_msg_code {
                    Some(16) | Some(17) => {
                        if let Some(srv_id) = bad_msg_server_id {
                            self.enc.correct_time_offset(srv_id);
                        }
                    }
                    Some(32) | Some(33) => {
                        self.enc
                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
                    }
                    _ => {
                        self.enc.undo_seq_no();
                    }
                }
                salt_retries += 1;
                if salt_retries >= 5 {
                    return Err(InvocationError::Deserialize(
                        "bad_server_salt/bad_msg: exceeded 5 retries".into(),
                    ));
                }
                tracing::debug!(
                    "[dc_pool] resend in transfer conn (code={bad_msg_code:?}) [{salt_retries}/5]"
                );
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
                        .await;
                    self.pending_acks.clear();
                }
                let (wire, new_id) = self.enc.pack_with_msg_id(req);
                sent_msg_id = new_id;
                Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
            }
            // Metrics are recorded on the success path only.
            if let Some(result) = scan_result {
                metrics::counter!("ferogram.rpc_calls_total", "result" => "ok").increment(1);
                metrics::histogram!("ferogram.rpc_latency_ms")
                    .record(_t0.elapsed().as_millis() as f64);
                return Ok(result);
            }
        }
    }
766 #[allow(clippy::too_many_arguments)]
779 fn scan_body(
780 body: &[u8],
781 salt: &mut i64,
782 need_resend: &mut bool,
783 need_session_reset: &mut bool,
784 bad_msg_code: &mut Option<u32>,
785 bad_msg_server_id: &mut Option<i64>,
786 sent_msg_id: Option<i64>,
787 server_msg_id: i64,
788 ) -> Result<Option<Vec<u8>>, InvocationError> {
789 if body.len() < 4 {
790 return Ok(None);
791 }
792 let cid = u32::from_le_bytes(body[..4].try_into().unwrap());
793 match cid {
794 0xf35c6d01 => {
795 if body.len() >= 12
796 && let Some(expected) = sent_msg_id {
797 let resp_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
798 if resp_id != expected {
799 tracing::debug!(
800 "[dc_pool] rpc_result req_msg_id mismatch \
801 (got {resp_id:#018x}, want {expected:#018x}); skipping"
802 );
803 return Ok(None);
804 }
805 }
806 let inner = if body.len() >= 12 { &body[12..] } else { body };
807 if inner.len() >= 4
809 && u32::from_le_bytes(inner[..4].try_into().unwrap()) == 0x3072cfa1
810 {
811 let mut dummy_salt = *salt;
812 let mut nr = false; let mut nsr = false;
813 let mut bc = None; let mut bsi = None;
814 if let Some(r) = Self::scan_body(inner, &mut dummy_salt, &mut nr, &mut nsr, &mut bc, &mut bsi, None, server_msg_id)? {
815 return Ok(Some(r));
816 }
817 if let Some(compressed) = tl_read_bytes(&inner[4..]) {
819 let dec = flate2::read::GzDecoder::new(compressed.as_slice());
820 let mut limited = std::io::Read::take(dec, 16 * 1024 * 1024);
821 let mut out = Vec::new();
822 if std::io::Read::read_to_end(&mut limited, &mut out).is_ok() {
823 return Ok(Some(out));
824 }
825 }
826 return Ok(None);
827 }
828 if inner.len() >= 8
829 && u32::from_le_bytes(inner[..4].try_into().unwrap()) == 0x2144ca19
830 {
831 let code = i32::from_le_bytes(inner[4..8].try_into().unwrap());
832 let message = tl_read_string(&inner[8..]).unwrap_or_default();
833 return Err(InvocationError::Rpc(
834 crate::errors::RpcError::from_telegram(code, &message),
835 ));
836 }
837 Ok(Some(inner.to_vec()))
838 }
839 0x2144ca19 => {
840 if body.len() < 8 {
841 return Err(InvocationError::Deserialize("rpc_error short".into()));
842 }
843 let code = i32::from_le_bytes(body[4..8].try_into().unwrap());
844 let message = tl_read_string(&body[8..]).unwrap_or_default();
845 Err(InvocationError::Rpc(crate::errors::RpcError::from_telegram(code, &message)))
846 }
847 0xedab447b => {
848 if body.len() >= 28 {
850 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
851 let new_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
852 if sent_msg_id.is_none_or(|id| id == bad_msg_id) {
855 *salt = new_salt;
856 *need_resend = true;
857 }
858 }
859 Ok(None)
860 }
861 0x9ec20908 => {
862 if body.len() >= 28 {
865 let first_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
866 let unique_id = i64::from_le_bytes(body[12..20].try_into().unwrap());
867 let server_salt = i64::from_le_bytes(body[20..28].try_into().unwrap());
868 tracing::debug!(
869 "[dc_pool] new_session_created: unique_id={unique_id:#018x} \
870 first_msg_id={first_msg_id} salt={server_salt}"
871 );
872 *salt = server_salt;
873 if sent_msg_id.is_some_and(|id| id < first_msg_id) {
879 *need_session_reset = true;
880 }
881 }
882 Ok(None)
883 }
884 0xa7eff811 => {
885 if body.len() >= 20 {
891 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
892 let error_code = u32::from_le_bytes(body[16..20].try_into().unwrap());
894 tracing::debug!(
895 "[dc_pool] bad_msg_notification: bad_msg_id={bad_msg_id:#018x} code={error_code}"
896 );
897 match error_code {
898 16 | 17 => {
899 *bad_msg_code = Some(error_code);
903 *bad_msg_server_id = Some(server_msg_id);
904 *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
905 }
906 32 | 33 => {
907 *bad_msg_code = Some(error_code);
909 *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
910 }
911 48 => {
912 *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
919 tracing::debug!(
920 "[dc_pool] bad_msg code 48 (wrong salt): will resend with current salt"
921 );
922 }
923 _ => {
924 *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
926 }
927 }
928 }
929 Ok(None)
930 }
931 0x347773c5 => {
932 if body.len() >= 12
938 && let Some(expected) = sent_msg_id
939 {
940 let pong_req_id = i64::from_le_bytes(body[4..12].try_into().unwrap());
941 if pong_req_id == expected {
942 return Ok(Some(body.to_vec()));
943 }
944 }
945 Ok(None)
947 }
948 0x73f1f8dc => {
949 if body.len() < 8 {
950 return Ok(None);
951 }
952 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
953 let mut pos = 8usize;
954 let mut found: Option<Vec<u8>> = None;
957 for _ in 0..count {
958 if pos + 16 > body.len() { break; }
959 let inner_bytes =
960 u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
961 pos += 16;
962 if pos + inner_bytes > body.len() { break; }
963 let inner = &body[pos..pos + inner_bytes];
964 pos += inner_bytes;
965 if found.is_none() {
966 if let Some(r) = Self::scan_body(inner, salt, need_resend,
967 need_session_reset, bad_msg_code, bad_msg_server_id, sent_msg_id,
968 server_msg_id)?
969 {
970 found = Some(r);
971 }
974 } else {
975 let _ = Self::scan_body(inner, salt, need_resend, need_session_reset,
981 bad_msg_code, bad_msg_server_id, sent_msg_id,
982 server_msg_id)?;
983 }
984 }
985 Ok(found)
986 }
987 0x3072cfa1 => {
988 if let Some(compressed) = tl_read_bytes(&body[4..]) {
990 let decoder = flate2::read::GzDecoder::new(compressed.as_slice());
991 let mut limited = std::io::Read::take(decoder, 16 * 1024 * 1024);
992 let mut decompressed = Vec::new();
993 if std::io::Read::read_to_end(&mut limited, &mut decompressed).is_ok()
994 && !decompressed.is_empty()
995 {
996 return Self::scan_body(
997 &decompressed, salt,
998 need_resend, need_session_reset,
999 bad_msg_code, bad_msg_server_id,
1000 sent_msg_id,
1001 server_msg_id,
1002 );
1003 }
1004 }
1005 Ok(None)
1006 }
1007 _ => Ok(None),
1008 }
1009 }
1010
    /// Sends a serializable request and waits for its result, returning the raw
    /// result bytes.
    ///
    /// The flow matches `rpc_call` except that no ping is sent, the call counter
    /// is not touched and no metrics are recorded: queued acknowledgements are
    /// flushed (best-effort), the request is sent, and frames are read until
    /// `scan_body` yields a result. The request is resent after a session reset
    /// (at most 2) or a bad salt / bad message (the call fails when the retry
    /// counter reaches 5).
    ///
    /// # Errors
    /// Returns transport I/O errors, `InvocationError::Deserialize` for
    /// undecryptable frames or exhausted retries, and any error produced by
    /// `scan_body`.
    pub async fn rpc_call_serializable<S: ferogram_tl_types::Serializable>(
        &mut self,
        req: &S,
    ) -> Result<Vec<u8>, InvocationError> {
        // Flush acknowledgements left over from earlier calls (best-effort).
        if !self.pending_acks.is_empty() {
            let ack_body = build_msgs_ack_body(&self.pending_acks);
            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
            let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
            self.pending_acks.clear();
        }
        // `sent_msg_id` follows the most recent copy of the request.
        let (wire, mut sent_msg_id) = self.enc.pack_serializable_with_msg_id(req);
        Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
        let mut salt_retries = 0u8;
        let mut session_resets = 0u8;
        loop {
            let mut raw = Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await?;
            let msg = self
                .enc
                .unpack(&mut raw)
                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            // Every received message is queued for acknowledgement.
            self.pending_acks.push(msg.msg_id);
            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
                let ack_body = build_msgs_ack_body(&self.pending_acks);
                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                let _ =
                    Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
                self.pending_acks.clear();
            }
            // A body too short to carry a constructor id is handed back as is.
            if msg.body.len() < 4 {
                return Ok(msg.body);
            }
            // Out-parameters filled in by `scan_body`.
            let mut need_resend = false;
            let mut need_session_reset = false;
            let mut bad_msg_code: Option<u32> = None;
            let mut bad_msg_server_id: Option<i64> = None;
            let scan_result = Self::scan_body(
                &msg.body,
                &mut self.enc.salt,
                &mut need_resend,
                &mut need_session_reset,
                &mut bad_msg_code,
                &mut bad_msg_server_id,
                Some(sent_msg_id),
                msg.msg_id,
            )?;
            if need_session_reset {
                session_resets += 1;
                if session_resets > 2 {
                    return Err(InvocationError::Deserialize(
                        "new_session_created (serializable): exceeded 2 resets".into(),
                    ));
                }
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
                        .await;
                    self.pending_acks.clear();
                }
                // Resend only when the same frame did not already carry the result.
                if scan_result.is_none() {
                    let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
                    sent_msg_id = new_id;
                    Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
                }
            } else if need_resend {
                // Apply the correction matching the reported error code before
                // packing the request again.
                match bad_msg_code {
                    Some(16) | Some(17) => {
                        if let Some(srv_id) = bad_msg_server_id {
                            self.enc.correct_time_offset(srv_id);
                        }
                    }
                    Some(32) | Some(33) => {
                        self.enc
                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
                    }
                    _ => {
                        self.enc.undo_seq_no();
                    }
                }
                salt_retries += 1;
                if salt_retries >= 5 {
                    return Err(InvocationError::Deserialize(
                        "bad_server_salt (serializable): exceeded 5 retries".into(),
                    ));
                }
                tracing::debug!(
                    "[dc_pool] resend serializable (code={bad_msg_code:?}) [{salt_retries}/5]"
                );
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
                        .await;
                    self.pending_acks.clear();
                }
                let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
                sent_msg_id = new_id;
                Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
            }
            if let Some(result) = scan_result {
                return Ok(result);
            }
        }
    }
1119
1120 pub async fn rpc_call_raw(&mut self, body: &[u8]) -> Result<Vec<u8>, InvocationError> {
1123 Self::send_abridged(&mut self.stream, body, self.cipher.as_mut()).await?;
1124 Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await
1125 }
1126
1127 async fn send_abridged(
1128 stream: &mut TcpStream,
1129 data: &[u8],
1130 cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1131 ) -> Result<(), InvocationError> {
1132 let words = data.len() / 4;
1134 let mut frame = if words < 0x7f {
1135 let mut v = Vec::with_capacity(1 + data.len());
1136 v.push(words as u8);
1137 v
1138 } else {
1139 let mut v = Vec::with_capacity(4 + data.len());
1140 v.extend_from_slice(&[
1141 0x7f,
1142 (words & 0xff) as u8,
1143 ((words >> 8) & 0xff) as u8,
1144 ((words >> 16) & 0xff) as u8,
1145 ]);
1146 v
1147 };
1148 frame.extend_from_slice(data);
1149 if let Some(c) = cipher {
1150 c.encrypt(&mut frame);
1151 }
1152 stream.write_all(&frame).await?;
1153 Ok(())
1154 }
1155
1156 async fn recv_abridged(
1157 stream: &mut TcpStream,
1158 mut cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1159 ) -> Result<Vec<u8>, InvocationError> {
1160 use tokio::time::{Duration, timeout};
1162 const RECV_TIMEOUT: Duration = Duration::from_secs(60);
1163
1164 let mut h = [0u8; 1];
1165 timeout(RECV_TIMEOUT, stream.read_exact(&mut h))
1166 .await
1167 .map_err(|_| {
1168 InvocationError::Io(std::io::Error::new(
1169 std::io::ErrorKind::TimedOut,
1170 "transfer recv: header timeout (60 s)",
1171 ))
1172 })??;
1173 if let Some(ref mut c) = cipher.as_mut() {
1174 c.decrypt(&mut h);
1175 }
1176
1177 let words = if h[0] == 0x7f {
1179 let mut b = [0u8; 3];
1180 timeout(RECV_TIMEOUT, stream.read_exact(&mut b))
1181 .await
1182 .map_err(|_| {
1183 InvocationError::Io(std::io::Error::new(
1184 std::io::ErrorKind::TimedOut,
1185 "transfer recv: length timeout (60 s)",
1186 ))
1187 })??;
1188 if let Some(ref mut c) = cipher.as_mut() {
1189 c.decrypt(&mut b);
1190 }
1191 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
1192 } else {
1193 h[0] as usize
1194 };
1195
1196 let mut buf = vec![0u8; words * 4];
1197 timeout(RECV_TIMEOUT, stream.read_exact(&mut buf))
1198 .await
1199 .map_err(|_| {
1200 InvocationError::Io(std::io::Error::new(
1201 std::io::ErrorKind::TimedOut,
1202 "transfer recv: body timeout (60 s)",
1203 ))
1204 })??;
1205 if let Some(c) = cipher {
1206 c.decrypt(&mut buf);
1207 }
1208
1209 if buf.len() == 4 {
1217 let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
1218 if code < 0 {
1219 return Err(InvocationError::Io(std::io::Error::new(
1220 std::io::ErrorKind::ConnectionRefused,
1221 format!("transport error from server: {code}"),
1222 )));
1223 }
1224 }
1225
1226 Ok(buf)
1227 }
1228
1229 async fn send_plain_frame(
1230 stream: &mut TcpStream,
1231 data: &[u8],
1232 cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1233 ) -> Result<(), InvocationError> {
1234 if !data.len().is_multiple_of(4) {
1237 let mut padded = data.to_vec();
1238 let pad = 4 - (data.len() % 4);
1239 padded.resize(data.len() + pad, 0);
1240 Self::send_abridged(stream, &padded, cipher).await
1241 } else {
1242 Self::send_abridged(stream, data, cipher).await
1243 }
1244 }
1245
1246 async fn recv_plain_frame<T: Deserializable>(
1247 stream: &mut TcpStream,
1248 cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1249 ) -> Result<T, InvocationError> {
1250 let raw = Self::recv_abridged(stream, cipher).await?;
1251 if raw.len() == 4 {
1254 let code = i32::from_le_bytes(raw[..4].try_into().unwrap());
1255 if code < 0 {
1256 return Err(InvocationError::Deserialize(format!(
1257 "server transport error during DH: code {code}"
1258 )));
1259 }
1260 }
1261 if raw.len() < 20 {
1262 return Err(InvocationError::Deserialize("plain frame too short".into()));
1263 }
1264 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
1265 return Err(InvocationError::Deserialize(
1266 "expected auth_key_id=0 in plaintext".into(),
1267 ));
1268 }
1269 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
1270 if raw.len() < 20 + body_len {
1271 return Err(InvocationError::Deserialize(format!(
1272 "plain frame truncated: have {} bytes, need {}",
1273 raw.len(),
1274 20 + body_len
1275 )));
1276 }
1277 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
1278 T::deserialize(&mut cur).map_err(Into::into)
1279 }
1280}
1281
1282fn pfs_pool_decode_bind_single(body: &[u8]) -> Result<(), String> {
1285 const RPC_RESULT: u32 = 0xf35c6d01;
1286 const BOOL_TRUE: u32 = 0x9972_75b5;
1287 const BOOL_FALSE: u32 = 0xbc79_9737;
1288 const RPC_ERROR: u32 = 0x2144_ca19;
1289 const BAD_MSG: u32 = 0xa7ef_f811;
1290 const BAD_SALT: u32 = 0xedab_447b;
1291 const NEW_SESSION: u32 = 0x9ec2_0908;
1292 const FUTURE_SALTS: u32 = 0xae50_0895;
1293 const MSGS_ACK: u32 = 0x62d6_b459; const PONG: u32 = 0x0347_73c5;
1295
1296 if body.len() < 4 {
1297 return Err("skip".into());
1298 }
1299 let ctor = u32::from_le_bytes(body[..4].try_into().unwrap());
1300
1301 match ctor {
1302 BOOL_TRUE => Ok(()),
1303 BOOL_FALSE => Err("server returned boolFalse (binding rejected)".into()),
1304 NEW_SESSION | FUTURE_SALTS | MSGS_ACK | PONG => Err("skip".into()),
1305
1306 RPC_RESULT if body.len() >= 16 => {
1307 let inner = u32::from_le_bytes(body[12..16].try_into().unwrap());
1308 match inner {
1309 BOOL_TRUE => Ok(()),
1310 BOOL_FALSE => Err("rpc_result{boolFalse} (server rejected binding)".into()),
1311 RPC_ERROR if body.len() >= 20 => {
1312 let code = i32::from_le_bytes(body[16..20].try_into().unwrap());
1313 let msg = tl_read_string(body.get(20..).unwrap_or(&[])).unwrap_or_default();
1314 Err(format!("rpc_error code={code} message={msg:?}"))
1315 }
1316 _ => Err(format!("rpc_result inner ctor={inner:#010x}")),
1317 }
1318 }
1319
1320 BAD_MSG if body.len() >= 16 => {
1321 let code = u32::from_le_bytes(body[12..16].try_into().unwrap());
1322 let desc = match code {
1323 16 => "msg_id too low (clock skew)",
1324 17 => "msg_id too high (clock skew)",
1325 18 => "incorrect lower 2 bits of msg_id",
1326 19 => "duplicate msg_id",
1327 20 => "message too old (>300s)",
1328 32 => "msg_seqno too low",
1329 33 => "msg_seqno too high",
1330 48 => "incorrect server salt",
1331 _ => "unknown code",
1332 };
1333 Err(format!("bad_msg_notification code={code} ({desc})"))
1334 }
1335
1336 BAD_SALT if body.len() >= 24 => {
1337 let new_salt = i64::from_le_bytes(body[16..24].try_into().unwrap());
1338 Err(format!(
1339 "bad_server_salt, server wants salt={new_salt:#018x}"
1340 ))
1341 }
1342
1343 _ => Err(format!("unknown ctor={ctor:#010x}")),
1344 }
1345}
1346
1347fn pfs_pool_decode_bind_response(body: &[u8]) -> Result<(), String> {
1352 const MSG_CONTAINER: u32 = 0x73f1f8dc;
1353
1354 if body.len() < 4 {
1355 return Err(format!("response body too short ({} bytes)", body.len()));
1356 }
1357 let ctor = u32::from_le_bytes(body[..4].try_into().unwrap());
1358
1359 if ctor != MSG_CONTAINER {
1360 return pfs_pool_decode_bind_single(body).map_err(|e| {
1361 if e == "skip" {
1362 "__need_more__".into()
1363 } else {
1364 e
1365 }
1366 });
1367 }
1368
1369 if body.len() < 8 {
1370 return Err("msg_container too short to read count".into());
1371 }
1372 let count = u32::from_le_bytes(body[4..8].try_into().unwrap()) as usize;
1373 let mut pos = 8usize;
1374 let mut last_real_err: Option<String> = None;
1375
1376 for i in 0..count {
1377 if pos + 16 > body.len() {
1378 return Err(format!(
1379 "msg_container truncated at message {i}/{count} (pos={pos} body_len={})",
1380 body.len()
1381 ));
1382 }
1383 let msg_bytes = u32::from_le_bytes(body[pos + 12..pos + 16].try_into().unwrap()) as usize;
1384 pos += 16;
1385
1386 if pos + msg_bytes > body.len() {
1387 return Err(format!(
1388 "msg_container message {i} body overflows (need {msg_bytes}, have {})",
1389 body.len() - pos
1390 ));
1391 }
1392 let msg_body = &body[pos..pos + msg_bytes];
1393 pos += msg_bytes;
1394
1395 match pfs_pool_decode_bind_single(msg_body) {
1396 Ok(()) => return Ok(()),
1397 Err(e) if e == "skip" => continue,
1398 Err(e) => {
1399 last_real_err = Some(e);
1400 }
1401 }
1402 }
1403
1404 Err(last_real_err.unwrap_or_else(|| "__need_more__".into()))
1405}
1406
/// Reads a length-prefixed byte string from the start of `data`.
///
/// A first byte below 254 is the length itself and the content starts at
/// offset 1. Any other first byte selects the long form: the next three bytes
/// are the length in little-endian order and the content starts at offset 4.
/// Trailing bytes after the content (padding included) are ignored.
///
/// Returns `Some(vec![])` for empty input, and `None` when the long-form
/// header is incomplete or the declared length runs past the end of `data`.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let Some((&marker, _)) = data.split_first() else {
        return Some(Vec::new());
    };

    let (start, len) = match marker {
        0..=253 => (1, usize::from(marker)),
        _ => {
            let ext = data.get(1..4)?;
            let len = usize::from(ext[0]) | usize::from(ext[1]) << 8 | usize::from(ext[2]) << 16;
            (4, len)
        }
    };

    data.get(start..start + len).map(<[u8]>::to_vec)
}
1426
1427fn tl_read_string(data: &[u8]) -> Option<String> {
1428 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
1429}