1use ferogram_mtproto::{
14 EncryptedSession, SeenMsgIds, Session, authentication as auth, new_seen_msg_ids, step2_temp,
15};
16use ferogram_tl_types as tl;
17use ferogram_tl_types::{Cursor, Deserializable, RemoteCall};
18use tokio::io::{AsyncReadExt, AsyncWriteExt};
19use tokio::net::TcpStream;
20
21use crate::errors::InvocationError;
22use crate::pool::{build_msgs_ack_body, build_msgs_ack_ping_body};
23use ferogram_connect::TransportKind;
24#[allow(unused_imports)]
26use metrics::{counter, histogram};
27
/// Once this many received server `msg_id`s are waiting in `pending_acks`,
/// the RPC receive loops flush them immediately with a `msgs_ack`.
const PENDING_ACKS_THRESHOLD: usize = 10;

/// `rpc_call` sends a ping (built by `build_msgs_ack_ping_body`) ahead of the
/// request whenever the per-connection call counter is a multiple of this.
const PING_EVERY_N_CHUNKS: u32 = 5;
35
/// One TCP connection to a DC together with the MTProto session state
/// needed to send encrypted requests over it.
pub struct DcConnection {
    /// Underlying socket (opened directly, through SOCKS5, or through MTProxy).
    stream: TcpStream,
    /// Encrypted session used to pack outgoing and unpack incoming messages.
    enc: EncryptedSession,
    /// Server `msg_id`s received but not yet acknowledged.
    pending_acks: Vec<i64>,
    /// Number of `rpc_call` invocations so far; drives the periodic ping.
    call_count: u32,
    /// Obfuscation cipher; `Some` only when the transport init produced one.
    cipher: Option<ferogram_crypto::ObfuscatedCipher>,
    /// Handle to the seen-msg-id set created at connect time. Not read by the
    /// code visible here, hence the `dead_code` allowance.
    #[allow(dead_code)]
    seen_msg_ids: SeenMsgIds,
}
47
48impl DcConnection {
49 #[tracing::instrument(skip(socks5), fields(addr = %addr, dc_id = dc_id))]
51 pub async fn connect_fastest(
52 addr: &str,
53 socks5: Option<&ferogram_connect::Socks5Config>,
54 dc_id: i16,
55 ) -> Result<(Self, &'static str), InvocationError> {
56 use tokio::task::JoinSet;
57 let addr = addr.to_owned();
58 let socks5 = socks5.cloned();
59 tracing::debug!("[dc_pool] probing {addr} with 3 transports");
60 let mut set: JoinSet<Result<(DcConnection, &'static str), InvocationError>> =
61 JoinSet::new();
62
63 {
64 let a = addr.clone();
65 let s = socks5.clone();
66 set.spawn(async move {
67 Ok((
68 DcConnection::connect_raw(
69 &a,
70 s.as_ref(),
71 &TransportKind::Obfuscated { secret: None },
72 dc_id,
73 )
74 .await?,
75 "Obfuscated",
76 ))
77 });
78 }
79 {
80 let a = addr.clone();
81 let s = socks5.clone();
82 set.spawn(async move {
83 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
84 Ok((
85 DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Abridged, dc_id)
86 .await?,
87 "Abridged",
88 ))
89 });
90 }
91 {
92 let a = addr.clone();
93 set.spawn(async move {
94 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
95 Ok((
96 DcConnection::connect_raw(&a, None, &TransportKind::Http, dc_id).await?,
97 "Http",
98 ))
99 });
100 }
101
102 let mut last_err = InvocationError::Deserialize("connect_fastest: no candidates".into());
103 while let Some(outcome) = set.join_next().await {
104 match outcome {
105 Ok(Ok((conn, label))) => {
106 set.abort_all();
107 return Ok((conn, label));
108 }
109 Ok(Err(e)) => {
110 last_err = e;
111 }
112 Err(e) if e.is_cancelled() => {}
113 Err(_) => {}
114 }
115 }
116 Err(last_err)
117 }
118
    /// Opens a new connection to `addr` and runs a complete auth-key
    /// handshake over it, returning a ready-to-use encrypted connection.
    ///
    /// Steps, in order: open the socket (`open_tcp`), send the transport
    /// init (`send_transport_init`), then exchange the plaintext handshake
    /// messages `step1` → `ResPq`, `step2` → `ServerDhParams`,
    /// `step3` → `SetClientDhParamsAnswer`, and finally `auth::finish`.
    ///
    /// # Errors
    /// Any failing auth step is mapped to `InvocationError::Deserialize`
    /// carrying the step's error text. I/O and frame errors propagate from
    /// the helper calls. The handshake aborts when `finish` asks for a retry
    /// for the fifth time.
    #[tracing::instrument(skip(socks5, transport), fields(addr = %addr, dc_id = dc_id))]
    pub async fn connect_raw(
        addr: &str,
        socks5: Option<&ferogram_connect::Socks5Config>,
        transport: &TransportKind,
        dc_id: i16,
    ) -> Result<Self, InvocationError> {
        tracing::debug!("[dc_pool] Connecting to {addr} …");
        let mut stream = Self::open_tcp(addr, socks5).await?;
        // `cipher` is `Some` only for transports whose init returns a cipher.
        let mut cipher = Self::send_transport_init(&mut stream, transport, dc_id).await?;

        // Plaintext session used to pack the handshake messages.
        let mut plain = Session::new();

        // Step 1: request → ResPq.
        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(
            &mut stream,
            &plain.pack(&req1).to_plaintext_bytes(),
            cipher.as_mut(),
        )
        .await?;
        let res_pq: tl::enums::ResPq = Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;

        // Step 2: request built from ResPq → ServerDhParams.
        let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(
            &mut stream,
            &plain.pack(&req2).to_plaintext_bytes(),
            cipher.as_mut(),
        )
        .await?;
        let dh: tl::enums::ServerDhParams =
            Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;

        // Step 3: request built from the DH params → SetClientDhParamsAnswer.
        let (req3, s3) =
            auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(
            &mut stream,
            &plain.pack(&req3).to_plaintext_bytes(),
            cipher.as_mut(),
        )
        .await?;
        let ans: tl::enums::SetClientDhParamsAnswer =
            Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;

        // Finish, re-running step 3 while `finish` reports `Retry`.
        let done = {
            let mut result =
                auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            let mut attempts = 0u8;
            loop {
                match result {
                    auth::FinishResult::Done(d) => break d,
                    auth::FinishResult::Retry {
                        retry_id,
                        dh_params,
                        nonce,
                        server_nonce,
                        new_nonce,
                    } => {
                        attempts += 1;
                        // The fifth `Retry` is treated as fatal.
                        if attempts >= 5 {
                            return Err(InvocationError::Deserialize(
                                "dh_gen_retry exceeded 5 attempts".into(),
                            ));
                        }
                        let (req_retry, s3_retry) =
                            auth::retry_step3(&dh_params, nonce, server_nonce, new_nonce, retry_id)
                                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
                        Self::send_plain_frame(
                            &mut stream,
                            &plain.pack(&req_retry).to_plaintext_bytes(),
                            cipher.as_mut(),
                        )
                        .await?;
                        let ans_retry: tl::enums::SetClientDhParamsAnswer =
                            Self::recv_plain_frame(&mut stream, cipher.as_mut()).await?;
                        result = auth::finish(s3_retry, ans_retry)
                            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
                    }
                }
            }
        };
        tracing::debug!("[dc_pool] DH complete ✓ for {addr}");

        // The same seen-msg-id handle is given to the session and kept on the connection.
        let seen = new_seen_msg_ids();
        Ok(Self {
            stream,
            cipher,
            enc: EncryptedSession::with_seen(
                done.auth_key,
                done.first_salt,
                done.time_offset,
                seen.clone(),
            ),
            pending_acks: Vec::new(),
            call_count: 0,
            seen_msg_ids: seen,
        })
    }
219
220 #[allow(clippy::too_many_arguments)]
223 pub async fn connect_with_key(
224 addr: &str,
225 auth_key: [u8; 256],
226 first_salt: i64,
227 time_offset: i32,
228 socks5: Option<&ferogram_connect::Socks5Config>,
229 mtproxy: Option<&ferogram_connect::MtProxyConfig>,
230 transport: &TransportKind,
231 dc_id: i16,
232 pfs: bool,
233 ) -> Result<Self, InvocationError> {
234 let (mut stream, mut cipher) = if let Some(mp) = mtproxy {
235 let mut s = mp.connect().await?;
236 s.set_nodelay(true)?;
237 let c = Self::send_transport_init(&mut s, &mp.transport, dc_id).await?;
238 (s, c)
239 } else {
240 let mut s = Self::open_tcp(addr, socks5).await?;
241 let c = Self::send_transport_init(&mut s, transport, dc_id).await?;
242 (s, c)
243 };
244
245 if pfs {
246 tracing::debug!("[dc_pool] PFS: temp DH bind for DC{dc_id}");
247 match Self::do_pool_pfs_bind(&mut stream, cipher.as_mut(), &auth_key, dc_id).await {
248 Ok(temp_enc) => {
249 tracing::info!("[dc_pool] PFS bind complete DC{dc_id}");
250 return Ok(Self {
251 stream,
252 cipher,
253 enc: temp_enc,
254 pending_acks: Vec::new(),
255 call_count: 0,
256 seen_msg_ids: new_seen_msg_ids(),
257 });
258 }
259 Err(e) => {
260 tracing::warn!("[dc_pool] PFS bind failed DC{dc_id} ({e}); falling back");
261 return Err(e);
262 }
263 }
264 }
265
266 let seen = new_seen_msg_ids();
267 Ok(Self {
268 stream,
269 cipher,
270 enc: EncryptedSession::with_seen(auth_key, first_salt, time_offset, seen.clone()),
271 pending_acks: Vec::new(),
272 call_count: 0,
273 seen_msg_ids: seen,
274 })
275 }
276
    /// Negotiates a temporary auth key on `stream` and binds it to
    /// `perm_auth_key`, returning the encrypted session for the temporary key.
    ///
    /// The first half mirrors the regular handshake but uses `step2_temp`
    /// with a `TEMP_EXPIRES`-second lifetime. The second half builds a
    /// bind-temp-auth-key message (inner part encrypted with the permanent
    /// key), sends it inside the temporary session, and reads up to five
    /// frames until `pfs_pool_decode_bind_response` accepts one.
    ///
    /// # Errors
    /// Auth-step, decrypt and bind failures are reported as
    /// `InvocationError::Deserialize`; I/O errors propagate from the frame
    /// helpers.
    ///
    /// # Panics
    /// Panics if the system clock reports a time before the UNIX epoch.
    #[allow(clippy::needless_option_as_deref)]
    async fn do_pool_pfs_bind(
        stream: &mut tokio::net::TcpStream,
        mut cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
        perm_auth_key: &[u8; 256],
        dc_id: i16,
    ) -> Result<EncryptedSession, InvocationError> {
        use ferogram_mtproto::{
            auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
            serialize_bind_temp_auth_key,
        };
        // Lifetime of the temporary key in seconds (added to a seconds-based clock below).
        const TEMP_EXPIRES: i32 = 86_400;
        let mut plain = ferogram_mtproto::Session::new();

        // Step 1: request → ResPq.
        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(
            stream,
            &plain.pack(&req1).to_plaintext_bytes(),
            cipher.as_deref_mut(),
        )
        .await?;
        let res_pq: tl::enums::ResPq =
            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;

        // Step 2 (temporary-key variant) → ServerDhParams.
        let (req2, s2) = step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(
            stream,
            &plain.pack(&req2).to_plaintext_bytes(),
            cipher.as_deref_mut(),
        )
        .await?;
        let dh: tl::enums::ServerDhParams =
            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;

        // Step 3 → SetClientDhParamsAnswer.
        let (req3, s3) =
            auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(
            stream,
            &plain.pack(&req3).to_plaintext_bytes(),
            cipher.as_deref_mut(),
        )
        .await?;
        let ans: tl::enums::SetClientDhParamsAnswer =
            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;

        // Finish, re-running step 3 while `finish` reports `Retry`.
        let done = {
            let mut result =
                auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            let mut attempts = 0u8;
            loop {
                match result {
                    ferogram_mtproto::FinishResult::Done(d) => break d,
                    ferogram_mtproto::FinishResult::Retry {
                        retry_id,
                        dh_params,
                        nonce,
                        server_nonce,
                        new_nonce,
                    } => {
                        attempts += 1;
                        // The fifth `Retry` is treated as fatal.
                        if attempts >= 5 {
                            return Err(InvocationError::Deserialize(
                                "PFS pool temp DH retry exceeded 5".into(),
                            ));
                        }
                        let (rr, s3r) = ferogram_mtproto::retry_step3(
                            &dh_params,
                            nonce,
                            server_nonce,
                            new_nonce,
                            retry_id,
                        )
                        .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
                        Self::send_plain_frame(
                            stream,
                            &plain.pack(&rr).to_plaintext_bytes(),
                            cipher.as_deref_mut(),
                        )
                        .await?;
                        let ar: tl::enums::SetClientDhParamsAnswer =
                            Self::recv_plain_frame(stream, cipher.as_deref_mut()).await?;
                        result = auth::finish(s3r, ar)
                            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
                    }
                }
            }
        };

        let temp_key = done.auth_key;
        let temp_salt = done.first_salt;
        let temp_offset = done.time_offset;

        let temp_key_id = auth_key_id_from_key(&temp_key);
        let perm_key_id = auth_key_id_from_key(perm_auth_key);

        // Random 64-bit nonce for the bind request.
        let mut nonce_buf = [0u8; 8];
        getrandom::getrandom(&mut nonce_buf)
            .map_err(|_| InvocationError::Deserialize("getrandom nonce".into()))?;
        let nonce = i64::from_le_bytes(nonce_buf);

        // Local UNIX time shifted by the offset reported by the handshake.
        // NOTE(review): `as_secs() as i32` truncates; fine until the value exceeds i32::MAX.
        let server_now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock is before UNIX epoch")
            .as_secs() as i32
            + temp_offset;
        let expires_at = server_now + TEMP_EXPIRES;

        let seen = new_seen_msg_ids();
        let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
        let temp_session_id = temp_enc.session_id();

        // The same `msg_id` is used for the inner (perm-key) message and the outer packet.
        let msg_id = gen_msg_id();
        let enc_msg = encrypt_bind_inner(
            perm_auth_key,
            msg_id,
            nonce,
            temp_key_id,
            perm_key_id,
            temp_session_id,
            expires_at,
        );
        let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);

        let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
        Self::send_abridged(stream, &wire, cipher.as_deref_mut()).await?;

        // Read up to five frames; the decoder signals "__need_more__" for
        // frames that are not yet the bind answer.
        for attempt in 0u8..5 {
            let mut raw = Self::recv_abridged(stream, cipher.as_deref_mut()).await?;
            let decrypted = temp_enc.unpack(&mut raw).map_err(|e| {
                InvocationError::Deserialize(format!("PFS pool bind decrypt: {e:?}"))
            })?;
            match pfs_pool_decode_bind_response(&decrypted.body) {
                Ok(()) => {
                    return Ok(temp_enc);
                }
                Err(ref e) if e == "__need_more__" => {
                    tracing::debug!(
                        "[ferogram] PFS pool bind (DC{dc_id}): informational frame {attempt}, reading next"
                    );
                    continue;
                }
                Err(reason) => {
                    tracing::error!(
                        "[ferogram] PFS pool bind server response (DC{dc_id}): {reason}"
                    );
                    return Err(InvocationError::Deserialize(format!(
                        "auth.bindTempAuthKey (pool): {reason}"
                    )));
                }
            }
        }
        Err(InvocationError::Deserialize(
            "auth.bindTempAuthKey (pool): no boolTrue after 5 frames".into(),
        ))
    }
444
445 async fn open_tcp(
446 addr: &str,
447 socks5: Option<&ferogram_connect::Socks5Config>,
448 ) -> Result<TcpStream, InvocationError> {
449 let stream = match socks5 {
450 Some(proxy) => proxy.connect(addr).await?,
451 None => TcpStream::connect(addr).await?,
452 };
453 stream.set_nodelay(true)?;
455 {
457 let sock = socket2::SockRef::from(&stream);
458 let ka = socket2::TcpKeepalive::new()
459 .with_time(std::time::Duration::from_secs(10))
460 .with_interval(std::time::Duration::from_secs(5));
461 #[cfg(not(target_os = "windows"))]
462 let ka = ka.with_retries(3);
463 sock.set_tcp_keepalive(&ka).ok();
464 }
465 Ok(stream)
466 }
467
468 async fn send_transport_init(
469 stream: &mut TcpStream,
470 transport: &TransportKind,
471 dc_id: i16,
472 ) -> Result<Option<ferogram_crypto::ObfuscatedCipher>, InvocationError> {
473 match transport {
474 TransportKind::Abridged => {
475 stream.write_all(&[0xef]).await?;
476 }
477 TransportKind::Intermediate => {
478 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
479 }
480 TransportKind::Full => {}
481 TransportKind::Obfuscated { secret } => {
482 use sha2::Digest;
483 let mut nonce = [0u8; 64];
484 loop {
485 getrandom::getrandom(&mut nonce)
486 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
487 let first =
488 u32::from_le_bytes(nonce[0..4].try_into().expect("nonce is [u8;64]"));
489 let second =
490 u32::from_le_bytes(nonce[4..8].try_into().expect("nonce is [u8;64]"));
491 let bad = nonce[0] == 0xEF
492 || first == 0x44414548
493 || first == 0x54534F50
494 || first == 0x20544547
495 || first == 0x4954504f || first == 0xEEEEEEEE
497 || first == 0xDDDDDDDD
498 || first == 0x02010316
499 || second == 0x00000000;
500 if !bad {
501 break;
502 }
503 }
504 let tx_raw: [u8; 32] = nonce[8..40].try_into().expect("nonce is [u8;64]");
505 let tx_iv: [u8; 16] = nonce[40..56].try_into().expect("nonce is [u8;64]");
506 let mut rev48 = nonce[8..56].to_vec();
507 rev48.reverse();
508 let rx_raw: [u8; 32] = rev48[0..32]
509 .try_into()
510 .expect("rev48 is nonce[8..56].reversed(), len=48");
511 let rx_iv: [u8; 16] = rev48[32..48]
512 .try_into()
513 .expect("rev48 is nonce[8..56].reversed(), len=48");
514 let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
515 let mut h = sha2::Sha256::new();
516 h.update(tx_raw);
517 h.update(s.as_ref());
518 let tx: [u8; 32] = h.finalize().into();
519 let mut h = sha2::Sha256::new();
520 h.update(rx_raw);
521 h.update(s.as_ref());
522 let rx: [u8; 32] = h.finalize().into();
523 (tx, rx)
524 } else {
525 (tx_raw, rx_raw)
526 };
527 nonce[56] = 0xef;
528 nonce[57] = 0xef;
529 nonce[58] = 0xef;
530 nonce[59] = 0xef;
531 let dc_bytes = dc_id.to_le_bytes();
532 nonce[60] = dc_bytes[0];
533 nonce[61] = dc_bytes[1];
534 let mut enc =
535 ferogram_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
536 let mut skip = [0u8; 56];
537 enc.encrypt(&mut skip);
538 enc.encrypt(&mut nonce[56..64]);
539 stream.write_all(&nonce).await?;
540 return Ok(Some(enc));
541 }
542 TransportKind::PaddedIntermediate { secret } => {
543 use sha2::Digest;
544 let mut nonce = [0u8; 64];
545 loop {
546 getrandom::getrandom(&mut nonce)
547 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
548 let first =
549 u32::from_le_bytes(nonce[0..4].try_into().expect("nonce is [u8;64]"));
550 let second =
551 u32::from_le_bytes(nonce[4..8].try_into().expect("nonce is [u8;64]"));
552 let bad = nonce[0] == 0xEF
553 || first == 0x44414548
554 || first == 0x54534F50
555 || first == 0x20544547
556 || first == 0x4954504f
557 || first == 0xEEEEEEEE
558 || first == 0xDDDDDDDD
559 || first == 0x02010316
560 || second == 0x00000000;
561 if !bad {
562 break;
563 }
564 }
565 let tx_raw: [u8; 32] = nonce[8..40].try_into().expect("nonce is [u8;64]");
566 let tx_iv: [u8; 16] = nonce[40..56].try_into().expect("nonce is [u8;64]");
567 let mut rev48 = nonce[8..56].to_vec();
568 rev48.reverse();
569 let rx_raw: [u8; 32] = rev48[0..32]
570 .try_into()
571 .expect("rev48 is nonce[8..56].reversed(), len=48");
572 let rx_iv: [u8; 16] = rev48[32..48]
573 .try_into()
574 .expect("rev48 is nonce[8..56].reversed(), len=48");
575 let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
576 let mut h = sha2::Sha256::new();
577 h.update(tx_raw);
578 h.update(s.as_ref());
579 let tx: [u8; 32] = h.finalize().into();
580 let mut h = sha2::Sha256::new();
581 h.update(rx_raw);
582 h.update(s.as_ref());
583 let rx: [u8; 32] = h.finalize().into();
584 (tx, rx)
585 } else {
586 (tx_raw, rx_raw)
587 };
588 nonce[56] = 0xdd;
589 nonce[57] = 0xdd;
590 nonce[58] = 0xdd;
591 nonce[59] = 0xdd;
592 let dc_bytes = dc_id.to_le_bytes();
593 nonce[60] = dc_bytes[0];
594 nonce[61] = dc_bytes[1];
595 let mut enc =
596 ferogram_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
597 let mut skip = [0u8; 56];
598 enc.encrypt(&mut skip);
599 enc.encrypt(&mut nonce[56..64]);
600 stream.write_all(&nonce).await?;
601 return Ok(Some(enc));
602 }
603 TransportKind::FakeTls { .. } => {
604 return Err(InvocationError::Deserialize(
608 "FakeTls transport is not supported for DcPool connections".into(),
609 ));
610 }
611 TransportKind::Http => {}
612 }
613 Ok(None)
614 }
615
    /// Returns the 256-byte auth key reported by the encrypted session.
    pub fn auth_key_bytes(&self) -> [u8; 256] {
        self.enc.auth_key_bytes()
    }
    /// Returns the session's `salt` field.
    ///
    /// Despite the name this is the salt currently stored in the session:
    /// the RPC loops hand `enc.salt` to `scan_body`, which may overwrite it
    /// with a salt sent by the server.
    pub fn first_salt(&self) -> i64 {
        self.enc.salt
    }
    /// Returns the session's `time_offset` field.
    pub fn time_offset(&self) -> i32 {
        self.enc.time_offset
    }
625
    /// Sends `req` over this connection and waits for its result, returning
    /// the raw response body.
    ///
    /// Before the request goes out, a ping is sent on every
    /// `PING_EVERY_N_CHUNKS`-th call and any pending acks are flushed; both
    /// are best-effort (send errors ignored). The receive loop then:
    ///
    /// * records every received `msg_id` for acknowledgement, flushing at
    ///   `PENDING_ACKS_THRESHOLD`;
    /// * lets `scan_body` classify the message;
    /// * re-packs and resends `req` when `scan_body` asks for a session
    ///   reset or a resend, after applying the matching correction
    ///   (time offset for codes 16/17, seq_no for 32/33, `undo_seq_no`
    ///   otherwise);
    /// * returns as soon as `scan_body` yields a payload.
    ///
    /// A message body shorter than 4 bytes is returned as-is. Success
    /// metrics are recorded only on the normal result path.
    ///
    /// # Errors
    /// I/O and unpack errors, RPC errors surfaced by `scan_body`, more than
    /// two session resets, or the fifth resend.
    #[tracing::instrument(skip(self, req), fields(method = std::any::type_name::<R>()))]
    pub async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
        // Start time for the latency histogram recorded on success.
        let _t0 = std::time::Instant::now();
        self.call_count += 1;
        // Periodic ping; the call counter doubles as the ping id.
        if self.call_count.is_multiple_of(PING_EVERY_N_CHUNKS) {
            let ping_id = self.call_count as i64;
            let ping_body = build_msgs_ack_ping_body(ping_id);
            let (ping_wire, _) = self.enc.pack_body_with_msg_id(&ping_body, true);
            let _ = Self::send_abridged(&mut self.stream, &ping_wire, self.cipher.as_mut()).await;
        }

        // Flush acks left over from the previous call.
        if !self.pending_acks.is_empty() {
            let ack_body = build_msgs_ack_body(&self.pending_acks);
            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
            let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
            self.pending_acks.clear();
        }

        // `sent_msg_id` tracks the id of the most recent (re)send of `req`.
        let (wire, mut sent_msg_id) = self.enc.pack_with_msg_id(req);
        Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
        let mut salt_retries = 0u8;
        let mut session_resets = 0u8;
        loop {
            let mut raw = Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await?;
            let msg = self
                .enc
                .unpack(&mut raw)
                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            self.pending_acks.push(msg.msg_id);
            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
                let ack_body = build_msgs_ack_body(&self.pending_acks);
                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                let _ =
                    Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
                self.pending_acks.clear();
            }
            // Too short to carry a constructor id: hand it back untouched.
            if msg.body.len() < 4 {
                return Ok(msg.body);
            }
            // Out-parameters filled in by `scan_body`.
            let mut need_resend = false;
            let mut need_session_reset = false;
            let mut bad_msg_code: Option<u32> = None;
            let mut bad_msg_server_id: Option<i64> = None;
            let scan_result = Self::scan_body(
                &msg.body,
                &mut self.enc.salt,
                &mut need_resend,
                &mut need_session_reset,
                &mut bad_msg_code,
                &mut bad_msg_server_id,
                Some(sent_msg_id),
                msg.msg_id,
            )?;
            if need_session_reset {
                session_resets += 1;
                if session_resets > 2 {
                    return Err(InvocationError::Deserialize(
                        "new_session_created: exceeded 2 resets".into(),
                    ));
                }
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
                        .await;
                    self.pending_acks.clear();
                }
                // Resend only if this message did not also carry our result.
                if scan_result.is_none() {
                    tracing::debug!(
                        "[dc_pool] new_session_created: resending [{session_resets}/2]"
                    );
                    let (wire, new_id) = self.enc.pack_with_msg_id(req);
                    sent_msg_id = new_id;
                    Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
                }
            } else if need_resend {
                // Apply the correction matching the reported code before re-packing.
                match bad_msg_code {
                    Some(16) | Some(17) => {
                        if let Some(srv_id) = bad_msg_server_id {
                            self.enc.correct_time_offset(srv_id);
                        }
                    }
                    Some(32) | Some(33) => {
                        self.enc
                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
                    }
                    _ => {
                        self.enc.undo_seq_no();
                    }
                }
                salt_retries += 1;
                if salt_retries >= 5 {
                    return Err(InvocationError::Deserialize(
                        "bad_server_salt/bad_msg: exceeded 5 retries".into(),
                    ));
                }
                tracing::debug!(
                    "[dc_pool] resend in transfer conn (code={bad_msg_code:?}) [{salt_retries}/5]"
                );
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
                        .await;
                    self.pending_acks.clear();
                }
                let (wire, new_id) = self.enc.pack_with_msg_id(req);
                sent_msg_id = new_id;
                Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
            }
            if let Some(result) = scan_result {
                metrics::counter!("ferogram.rpc_calls_total", "result" => "ok").increment(1);
                metrics::histogram!("ferogram.rpc_latency_ms")
                    .record(_t0.elapsed().as_millis() as f64);
                return Ok(result);
            }
        }
    }
    /// Classifies one decrypted message `body` by its leading constructor id
    /// and extracts the payload for the request identified by `sent_msg_id`.
    ///
    /// Returns:
    /// * `Ok(Some(bytes))` — the payload for the awaited request;
    /// * `Ok(None)` — a service message, a response for another request, or
    ///   an unknown constructor; nothing to hand to the caller yet;
    /// * `Err(InvocationError::Rpc)` — the server answered with an RPC error.
    ///
    /// Side channels are reported through the out-parameters: `salt` is
    /// overwritten when the server supplies a new one, `need_resend` /
    /// `need_session_reset` request a resend, and `bad_msg_code` /
    /// `bad_msg_server_id` carry details of a bad-message notification.
    /// With `sent_msg_id == None` the id checks are skipped.
    #[allow(clippy::too_many_arguments)]
    fn scan_body(
        body: &[u8],
        salt: &mut i64,
        need_resend: &mut bool,
        need_session_reset: &mut bool,
        bad_msg_code: &mut Option<u32>,
        bad_msg_server_id: &mut Option<i64>,
        sent_msg_id: Option<i64>,
        server_msg_id: i64,
    ) -> Result<Option<Vec<u8>>, InvocationError> {
        if body.len() < 4 {
            return Ok(None);
        }
        let cid = u32::from_le_bytes(body[..4].try_into().expect("body.len() >= 4 checked above"));
        match cid {
            // rpc_result: req_msg_id at 4..12, result object from 12.
            0xf35c6d01 => {
                if body.len() >= 12
                    && let Some(expected) = sent_msg_id {
                    let resp_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 checked above"));
                    // Result for a different request: ignore it.
                    if resp_id != expected {
                        tracing::debug!(
                            "[dc_pool] rpc_result req_msg_id mismatch \
                            (got {resp_id:#018x}, want {expected:#018x}); skipping"
                        );
                        return Ok(None);
                    }
                }
                let inner = if body.len() >= 12 { &body[12..] } else { body };
                // Gzip-wrapped result.
                if inner.len() >= 4
                    && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 4 checked above")) == 0x3072cfa1
                {
                    // First let the recursive scan look inside the unpacked
                    // data, using throw-away flags so the caller's state is
                    // not touched.
                    let mut dummy_salt = *salt;
                    let mut nr = false; let mut nsr = false;
                    let mut bc = None; let mut bsi = None;
                    if let Some(r) = Self::scan_body(inner, &mut dummy_salt, &mut nr, &mut nsr, &mut bc, &mut bsi, None, server_msg_id)? {
                        return Ok(Some(r));
                    }
                    // Otherwise return the decompressed bytes themselves
                    // (reading at most 16 MiB).
                    if let Some(compressed) = tl_read_bytes(&inner[4..]) {
                        let dec = flate2::read::GzDecoder::new(compressed.as_slice());
                        let mut limited = std::io::Read::take(dec, 16 * 1024 * 1024);
                        let mut out = Vec::new();
                        if std::io::Read::read_to_end(&mut limited, &mut out).is_ok() {
                            return Ok(Some(out));
                        }
                    }
                    return Ok(None);
                }
                // rpc_error nested in the result: code at 4..8, message after.
                if inner.len() >= 8
                    && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 8 checked above")) == 0x2144ca19
                {
                    let code = i32::from_le_bytes(inner[4..8].try_into().expect("inner.len() >= 8 checked above"));
                    let message = tl_read_string(&inner[8..]).unwrap_or_default();
                    return Err(InvocationError::Rpc(
                        crate::errors::RpcError::from_telegram(code, &message),
                    ));
                }
                Ok(Some(inner.to_vec()))
            }
            // Bare rpc_error.
            0x2144ca19 => {
                if body.len() < 8 {
                    return Err(InvocationError::Deserialize("rpc_error short".into()));
                }
                let code = i32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 checked above"));
                let message = tl_read_string(&body[8..]).unwrap_or_default();
                Err(InvocationError::Rpc(crate::errors::RpcError::from_telegram(code, &message)))
            }
            // Bad server salt: bad_msg_id at 4..12, new salt at 20..28.
            0xedab447b => {
                if body.len() >= 28 {
                    let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
                    let new_salt = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
                    // Only react when it refers to our request (or no id is tracked).
                    if sent_msg_id.is_none_or(|id| id == bad_msg_id) {
                        *salt = new_salt;
                        *need_resend = true;
                    }
                }
                Ok(None)
            }
            // new_session_created: first_msg_id 4..12, unique_id 12..20, salt 20..28.
            0x9ec20908 => {
                if body.len() >= 28 {
                    let first_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
                    let unique_id = i64::from_le_bytes(body[12..20].try_into().expect("body.len() >= 28 checked above"));
                    let server_salt = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
                    tracing::debug!(
                        "[dc_pool] new_session_created: unique_id={unique_id:#018x} \
                        first_msg_id={first_msg_id} salt={server_salt}"
                    );
                    *salt = server_salt;
                    // A reset is requested only when our request id is
                    // below `first_msg_id`.
                    if sent_msg_id.is_some_and(|id| id < first_msg_id) {
                        *need_session_reset = true;
                    }
                }
                Ok(None)
            }
            // bad_msg_notification: bad_msg_id at 4..12, error code at 16..20.
            0xa7eff811 => {
                if body.len() >= 20 {
                    let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 20 checked above"));
                    let error_code = u32::from_le_bytes(body[16..20].try_into().expect("body.len() >= 20 checked above"));
                    tracing::debug!(
                        "[dc_pool] bad_msg_notification: bad_msg_id={bad_msg_id:#018x} code={error_code}"
                    );
                    // In every case a resend is requested only when the
                    // notification targets our request (or no id is tracked).
                    match error_code {
                        // The caller corrects its time offset from the server msg_id.
                        16 | 17 => {
                            *bad_msg_code = Some(error_code);
                            *bad_msg_server_id = Some(server_msg_id);
                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
                        }
                        // The caller corrects its seq_no.
                        32 | 33 => {
                            *bad_msg_code = Some(error_code);
                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
                        }
                        48 => {
                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
                            tracing::debug!(
                                "[dc_pool] bad_msg code 48 (wrong salt): will resend with current salt"
                            );
                        }
                        _ => {
                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
                        }
                    }
                }
                Ok(None)
            }
            // pong: returned whole, but only when its msg_id (4..12) is our request.
            0x347773c5 => {
                if body.len() >= 12
                    && let Some(expected) = sent_msg_id
                {
                    let pong_req_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 for pong"));
                    if pong_req_id == expected {
                        return Ok(Some(body.to_vec()));
                    }
                }
                Ok(None)
            }
            // msg_container: count at 4..8, then per message a 16-byte
            // header (length in its last 4 bytes) followed by the body.
            0x73f1f8dc => {
                if body.len() < 8 {
                    return Ok(None);
                }
                let count = u32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 for msg_container")) as usize;
                let mut pos = 8usize;
                let mut found: Option<Vec<u8>> = None;
                // Every inner message is scanned so service messages still
                // update the flags; only the first payload is kept.
                for _ in 0..count {
                    if pos + 16 > body.len() { break; }
                    let inner_bytes =
                        u32::from_le_bytes(body[pos + 12..pos + 16].try_into().expect("pos+16 <= body.len() checked above")) as usize;
                    pos += 16;
                    if pos + inner_bytes > body.len() { break; }
                    let inner = &body[pos..pos + inner_bytes];
                    pos += inner_bytes;
                    if found.is_none() {
                        if let Some(r) = Self::scan_body(inner, salt, need_resend,
                            need_session_reset, bad_msg_code, bad_msg_server_id, sent_msg_id,
                            server_msg_id)?
                        {
                            found = Some(r);
                        }
                    } else {
                        let _ = Self::scan_body(inner, salt, need_resend, need_session_reset,
                            bad_msg_code, bad_msg_server_id, sent_msg_id,
                            server_msg_id)?;
                    }
                }
                Ok(found)
            }
            // Gzip-wrapped message: decompress (at most 16 MiB read) and scan the result.
            0x3072cfa1 => {
                if let Some(compressed) = tl_read_bytes(&body[4..]) {
                    let decoder = flate2::read::GzDecoder::new(compressed.as_slice());
                    let mut limited = std::io::Read::take(decoder, 16 * 1024 * 1024);
                    let mut decompressed = Vec::new();
                    if std::io::Read::read_to_end(&mut limited, &mut decompressed).is_ok()
                        && !decompressed.is_empty()
                    {
                        return Self::scan_body(
                            &decompressed, salt,
                            need_resend, need_session_reset,
                            bad_msg_code, bad_msg_server_id,
                            sent_msg_id,
                            server_msg_id,
                        );
                    }
                }
                Ok(None)
            }
            _ => Ok(None),
        }
    }
1022
    /// Same request/response loop as `rpc_call`, for any `Serializable`
    /// value rather than a `RemoteCall`.
    ///
    /// Differences visible here: no periodic ping, no metrics, and packing
    /// goes through `pack_serializable_with_msg_id`. Ack flushing, resend
    /// handling and limits (more than two session resets, fifth resend) are
    /// the same.
    ///
    /// # Errors
    /// I/O and unpack errors, RPC errors surfaced by `scan_body`, or an
    /// exceeded reset / resend limit.
    pub async fn rpc_call_serializable<S: ferogram_tl_types::Serializable>(
        &mut self,
        req: &S,
    ) -> Result<Vec<u8>, InvocationError> {
        // Flush acks left over from the previous call (best-effort).
        if !self.pending_acks.is_empty() {
            let ack_body = build_msgs_ack_body(&self.pending_acks);
            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
            let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
            self.pending_acks.clear();
        }
        // `sent_msg_id` tracks the id of the most recent (re)send of `req`.
        let (wire, mut sent_msg_id) = self.enc.pack_serializable_with_msg_id(req);
        Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
        let mut salt_retries = 0u8;
        let mut session_resets = 0u8;
        loop {
            let mut raw = Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await?;
            let msg = self
                .enc
                .unpack(&mut raw)
                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            self.pending_acks.push(msg.msg_id);
            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
                let ack_body = build_msgs_ack_body(&self.pending_acks);
                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                let _ =
                    Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut()).await;
                self.pending_acks.clear();
            }
            // Too short to carry a constructor id: hand it back untouched.
            if msg.body.len() < 4 {
                return Ok(msg.body);
            }
            // Out-parameters filled in by `scan_body`.
            let mut need_resend = false;
            let mut need_session_reset = false;
            let mut bad_msg_code: Option<u32> = None;
            let mut bad_msg_server_id: Option<i64> = None;
            let scan_result = Self::scan_body(
                &msg.body,
                &mut self.enc.salt,
                &mut need_resend,
                &mut need_session_reset,
                &mut bad_msg_code,
                &mut bad_msg_server_id,
                Some(sent_msg_id),
                msg.msg_id,
            )?;
            if need_session_reset {
                session_resets += 1;
                if session_resets > 2 {
                    return Err(InvocationError::Deserialize(
                        "new_session_created (serializable): exceeded 2 resets".into(),
                    ));
                }
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
                        .await;
                    self.pending_acks.clear();
                }
                // Resend only if this message did not also carry our result.
                if scan_result.is_none() {
                    let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
                    sent_msg_id = new_id;
                    Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
                }
            } else if need_resend {
                // Apply the correction matching the reported code before re-packing.
                match bad_msg_code {
                    Some(16) | Some(17) => {
                        if let Some(srv_id) = bad_msg_server_id {
                            self.enc.correct_time_offset(srv_id);
                        }
                    }
                    Some(32) | Some(33) => {
                        self.enc
                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
                    }
                    _ => {
                        self.enc.undo_seq_no();
                    }
                }
                salt_retries += 1;
                if salt_retries >= 5 {
                    return Err(InvocationError::Deserialize(
                        "bad_server_salt (serializable): exceeded 5 retries".into(),
                    ));
                }
                tracing::debug!(
                    "[dc_pool] resend serializable (code={bad_msg_code:?}) [{salt_retries}/5]"
                );
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, self.cipher.as_mut())
                        .await;
                    self.pending_acks.clear();
                }
                let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
                sent_msg_id = new_id;
                Self::send_abridged(&mut self.stream, &wire, self.cipher.as_mut()).await?;
            }
            if let Some(result) = scan_result {
                return Ok(result);
            }
        }
    }
1131
1132 pub async fn rpc_call_raw(&mut self, body: &[u8]) -> Result<Vec<u8>, InvocationError> {
1135 Self::send_abridged(&mut self.stream, body, self.cipher.as_mut()).await?;
1136 Self::recv_abridged(&mut self.stream, self.cipher.as_mut()).await
1137 }
1138
1139 async fn send_abridged(
1140 stream: &mut TcpStream,
1141 data: &[u8],
1142 cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1143 ) -> Result<(), InvocationError> {
1144 let words = data.len() / 4;
1146 let mut frame = if words < 0x7f {
1147 let mut v = Vec::with_capacity(1 + data.len());
1148 v.push(words as u8);
1149 v
1150 } else {
1151 let mut v = Vec::with_capacity(4 + data.len());
1152 v.extend_from_slice(&[
1153 0x7f,
1154 (words & 0xff) as u8,
1155 ((words >> 8) & 0xff) as u8,
1156 ((words >> 16) & 0xff) as u8,
1157 ]);
1158 v
1159 };
1160 frame.extend_from_slice(data);
1161 if let Some(c) = cipher {
1162 c.encrypt(&mut frame);
1163 }
1164 stream.write_all(&frame).await?;
1165 Ok(())
1166 }
1167
1168 async fn recv_abridged(
1169 stream: &mut TcpStream,
1170 mut cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1171 ) -> Result<Vec<u8>, InvocationError> {
1172 use tokio::time::{Duration, timeout};
1174 const RECV_TIMEOUT: Duration = Duration::from_secs(60);
1175
1176 let mut h = [0u8; 1];
1177 timeout(RECV_TIMEOUT, stream.read_exact(&mut h))
1178 .await
1179 .map_err(|_| {
1180 InvocationError::Io(std::io::Error::new(
1181 std::io::ErrorKind::TimedOut,
1182 "transfer recv: header timeout (60 s)",
1183 ))
1184 })??;
1185 if let Some(ref mut c) = cipher.as_mut() {
1186 c.decrypt(&mut h);
1187 }
1188
1189 let words = if h[0] == 0x7f {
1191 let mut b = [0u8; 3];
1192 timeout(RECV_TIMEOUT, stream.read_exact(&mut b))
1193 .await
1194 .map_err(|_| {
1195 InvocationError::Io(std::io::Error::new(
1196 std::io::ErrorKind::TimedOut,
1197 "transfer recv: length timeout (60 s)",
1198 ))
1199 })??;
1200 if let Some(ref mut c) = cipher.as_mut() {
1201 c.decrypt(&mut b);
1202 }
1203 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
1204 } else {
1205 h[0] as usize
1206 };
1207
1208 let mut buf = vec![0u8; words * 4];
1209 timeout(RECV_TIMEOUT, stream.read_exact(&mut buf))
1210 .await
1211 .map_err(|_| {
1212 InvocationError::Io(std::io::Error::new(
1213 std::io::ErrorKind::TimedOut,
1214 "transfer recv: body timeout (60 s)",
1215 ))
1216 })??;
1217 if let Some(c) = cipher {
1218 c.decrypt(&mut buf);
1219 }
1220
1221 if buf.len() == 4 {
1229 let code =
1230 i32::from_le_bytes(buf[..4].try_into().expect("buf.len() == 4 checked above"));
1231 if code < 0 {
1232 return Err(InvocationError::Io(std::io::Error::new(
1233 std::io::ErrorKind::ConnectionRefused,
1234 format!("transport error from server: {code}"),
1235 )));
1236 }
1237 }
1238
1239 Ok(buf)
1240 }
1241
1242 async fn send_plain_frame(
1243 stream: &mut TcpStream,
1244 data: &[u8],
1245 cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1246 ) -> Result<(), InvocationError> {
1247 if !data.len().is_multiple_of(4) {
1250 let mut padded = data.to_vec();
1251 let pad = 4 - (data.len() % 4);
1252 padded.resize(data.len() + pad, 0);
1253 Self::send_abridged(stream, &padded, cipher).await
1254 } else {
1255 Self::send_abridged(stream, data, cipher).await
1256 }
1257 }
1258
1259 async fn recv_plain_frame<T: Deserializable>(
1260 stream: &mut TcpStream,
1261 cipher: Option<&mut ferogram_crypto::ObfuscatedCipher>,
1262 ) -> Result<T, InvocationError> {
1263 let raw = Self::recv_abridged(stream, cipher).await?;
1264 if raw.len() == 4 {
1267 let code =
1268 i32::from_le_bytes(raw[..4].try_into().expect("raw.len() == 4 checked above"));
1269 if code < 0 {
1270 return Err(InvocationError::Deserialize(format!(
1271 "server transport error during DH: code {code}"
1272 )));
1273 }
1274 }
1275 if raw.len() < 20 {
1276 return Err(InvocationError::Deserialize("plain frame too short".into()));
1277 }
1278 if u64::from_le_bytes(
1280 raw[..8]
1281 .try_into()
1282 .expect("raw.len() >= 20 checked above, >= 8 implied"),
1283 ) != 0
1284 {
1285 return Err(InvocationError::Deserialize(
1286 "expected auth_key_id=0 in plaintext".into(),
1287 ));
1288 }
1289 let body_len = u32::from_le_bytes(
1290 raw[16..20]
1291 .try_into()
1292 .expect("raw.len() >= 20 checked above"),
1293 ) as usize;
1294 if raw.len() < 20 + body_len {
1295 return Err(InvocationError::Deserialize(format!(
1296 "plain frame truncated: have {} bytes, need {}",
1297 raw.len(),
1298 20 + body_len
1299 )));
1300 }
1301 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
1302 T::deserialize(&mut cur).map_err(Into::into)
1303 }
1304}
1305
1306fn pfs_pool_decode_bind_single(body: &[u8]) -> Result<(), String> {
1309 const RPC_RESULT: u32 = 0xf35c6d01;
1310 const BOOL_TRUE: u32 = 0x9972_75b5;
1311 const BOOL_FALSE: u32 = 0xbc79_9737;
1312 const RPC_ERROR: u32 = 0x2144_ca19;
1313 const BAD_MSG: u32 = 0xa7ef_f811;
1314 const BAD_SALT: u32 = 0xedab_447b;
1315 const NEW_SESSION: u32 = 0x9ec2_0908;
1316 const FUTURE_SALTS: u32 = 0xae50_0895;
1317 const MSGS_ACK: u32 = 0x62d6_b459; const PONG: u32 = 0x0347_73c5;
1319
1320 if body.len() < 4 {
1321 return Err("skip".into());
1322 }
1323 let ctor = u32::from_le_bytes(body[..4].try_into().expect("body.len() >= 4 checked above"));
1324
1325 match ctor {
1326 BOOL_TRUE => Ok(()),
1327 BOOL_FALSE => Err("server returned boolFalse (binding rejected)".into()),
1328 NEW_SESSION | FUTURE_SALTS | MSGS_ACK | PONG => Err("skip".into()),
1329
1330 RPC_RESULT if body.len() >= 16 => {
1331 let inner = u32::from_le_bytes(
1332 body[12..16]
1333 .try_into()
1334 .expect("body.len() >= 16 from match arm guard"),
1335 );
1336 match inner {
1337 BOOL_TRUE => Ok(()),
1338 BOOL_FALSE => Err("rpc_result{boolFalse} (server rejected binding)".into()),
1339 RPC_ERROR if body.len() >= 20 => {
1340 let code = i32::from_le_bytes(
1341 body[16..20]
1342 .try_into()
1343 .expect("body.len() >= 20 from match arm guard"),
1344 );
1345 let msg = tl_read_string(body.get(20..).unwrap_or(&[])).unwrap_or_default();
1346 Err(format!("rpc_error code={code} message={msg:?}"))
1347 }
1348 _ => Err(format!("rpc_result inner ctor={inner:#010x}")),
1349 }
1350 }
1351
1352 BAD_MSG if body.len() >= 16 => {
1353 let code = u32::from_le_bytes(
1354 body[12..16]
1355 .try_into()
1356 .expect("body.len() >= 16 from match arm guard"),
1357 );
1358 let desc = match code {
1359 16 => "msg_id too low (clock skew)",
1360 17 => "msg_id too high (clock skew)",
1361 18 => "incorrect lower 2 bits of msg_id",
1362 19 => "duplicate msg_id",
1363 20 => "message too old (>300s)",
1364 32 => "msg_seqno too low",
1365 33 => "msg_seqno too high",
1366 48 => "incorrect server salt",
1367 _ => "unknown code",
1368 };
1369 Err(format!("bad_msg_notification code={code} ({desc})"))
1370 }
1371
1372 BAD_SALT if body.len() >= 24 => {
1373 let new_salt = i64::from_le_bytes(
1374 body[16..24]
1375 .try_into()
1376 .expect("body.len() >= 24 from match arm guard"),
1377 );
1378 Err(format!(
1379 "bad_server_salt, server wants salt={new_salt:#018x}"
1380 ))
1381 }
1382
1383 _ => Err(format!("unknown ctor={ctor:#010x}")),
1384 }
1385}
1386
1387fn pfs_pool_decode_bind_response(body: &[u8]) -> Result<(), String> {
1392 const MSG_CONTAINER: u32 = 0x73f1f8dc;
1393
1394 if body.len() < 4 {
1395 return Err(format!("response body too short ({} bytes)", body.len()));
1396 }
1397 let ctor = u32::from_le_bytes(body[..4].try_into().expect("body.len() >= 4 checked above"));
1398
1399 if ctor != MSG_CONTAINER {
1400 return pfs_pool_decode_bind_single(body).map_err(|e| {
1401 if e == "skip" {
1402 "__need_more__".into()
1403 } else {
1404 e
1405 }
1406 });
1407 }
1408
1409 if body.len() < 8 {
1410 return Err("msg_container too short to read count".into());
1411 }
1412 let count = u32::from_le_bytes(
1413 body[4..8]
1414 .try_into()
1415 .expect("body.len() >= 8 checked above"),
1416 ) as usize;
1417 let mut pos = 8usize;
1418 let mut last_real_err: Option<String> = None;
1419
1420 for i in 0..count {
1421 if pos + 16 > body.len() {
1422 return Err(format!(
1423 "msg_container truncated at message {i}/{count} (pos={pos} body_len={})",
1424 body.len()
1425 ));
1426 }
1427 let msg_bytes = u32::from_le_bytes(
1428 body[pos + 12..pos + 16]
1429 .try_into()
1430 .expect("pos+16 <= body.len() checked above"),
1431 ) as usize;
1432 pos += 16;
1433
1434 if pos + msg_bytes > body.len() {
1435 return Err(format!(
1436 "msg_container message {i} body overflows (need {msg_bytes}, have {})",
1437 body.len() - pos
1438 ));
1439 }
1440 let msg_body = &body[pos..pos + msg_bytes];
1441 pos += msg_bytes;
1442
1443 match pfs_pool_decode_bind_single(msg_body) {
1444 Ok(()) => return Ok(()),
1445 Err(e) if e == "skip" => continue,
1446 Err(e) => {
1447 last_real_err = Some(e);
1448 }
1449 }
1450 }
1451
1452 Err(last_real_err.unwrap_or_else(|| "__need_more__".into()))
1453}
1454
/// Extracts a TL-style length-prefixed byte string from the start of `data`.
///
/// * Empty input yields an empty vector.
/// * A first byte below 254 is the length, and the content starts at offset 1.
/// * Any other first byte selects the long form: bytes 1..4 hold the length as
///   a 24-bit little-endian integer, and the content starts at offset 4.
///
/// Trailing bytes after the content (such as alignment padding) are ignored.
/// Returns `None` when `data` is too short for the long-form prefix or for the
/// announced length.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let (len, start) = match *data {
        [] => return Some(Vec::new()),
        [short, ..] if short < 254 => (usize::from(short), 1),
        [_, lo, mid, hi, ..] => (
            usize::from(lo) | usize::from(mid) << 8 | usize::from(hi) << 16,
            4,
        ),
        _ => return None,
    };
    data.get(start..start + len).map(<[u8]>::to_vec)
}
1474
1475fn tl_read_string(data: &[u8]) -> Option<String> {
1476 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
1477}