1use ferogram_connect::FrameKind;
14use ferogram_mtproto::{
15 EncryptedSession, SeenMsgIds, Session, authentication as auth, new_seen_msg_ids, step2_temp,
16};
17use ferogram_tl_types as tl;
18use ferogram_tl_types::{Cursor, Deserializable, RemoteCall};
19use tokio::io::AsyncReadExt;
20use tokio::net::TcpStream;
21
22use crate::errors::InvocationError;
23use crate::pool::{build_msgs_ack_body, build_msgs_ack_ping_body};
24use ferogram_connect::TransportKind;
25#[allow(unused_imports)]
27use metrics::{counter, histogram};
28
29const PENDING_ACKS_THRESHOLD: usize = 10;
32
33const PING_EVERY_N_CHUNKS: u32 = 5;
36
37pub struct DcConnection {
38 stream: TcpStream,
39 enc: EncryptedSession,
40 pending_acks: Vec<i64>,
41 call_count: u32,
42 frame_kind: FrameKind,
44 #[allow(dead_code)]
46 seen_msg_ids: SeenMsgIds,
47}
48
49impl DcConnection {
50 #[tracing::instrument(skip(socks5), fields(addr = %addr, dc_id = dc_id))]
52 pub async fn connect_fastest(
53 addr: &str,
54 socks5: Option<&ferogram_connect::Socks5Config>,
55 dc_id: i16,
56 ) -> Result<(Self, &'static str), InvocationError> {
57 use tokio::task::JoinSet;
58 let addr = addr.to_owned();
59 let socks5 = socks5.cloned();
60 tracing::debug!(
61 "[ferogram::sender] probing {addr} with Full, Obfuscated, and Abridged transports in parallel"
62 );
63 let mut set: JoinSet<Result<(DcConnection, &'static str), InvocationError>> =
64 JoinSet::new();
65
66 {
67 let a = addr.clone();
68 let s = socks5.clone();
69 set.spawn(async move {
70 Ok((
71 DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Full, dc_id).await?,
72 "Full",
73 ))
74 });
75 }
76 {
77 let a = addr.clone();
78 let s = socks5.clone();
79 set.spawn(async move {
80 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
81 Ok((
82 DcConnection::connect_raw(
83 &a,
84 s.as_ref(),
85 &TransportKind::Obfuscated { secret: None },
86 dc_id,
87 )
88 .await?,
89 "Obfuscated",
90 ))
91 });
92 }
93 {
94 let a = addr.clone();
95 let s = socks5.clone();
96 set.spawn(async move {
97 tokio::time::sleep(std::time::Duration::from_millis(400)).await;
98 Ok((
99 DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Abridged, dc_id)
100 .await?,
101 "Abridged",
102 ))
103 });
104 }
105 {
106 let a = addr.clone();
107 set.spawn(async move {
108 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
109 Ok((
110 DcConnection::connect_raw(&a, None, &TransportKind::Http, dc_id).await?,
111 "Http",
112 ))
113 });
114 }
115
116 let mut last_err = InvocationError::Deserialize("connect_fastest: no candidates".into());
117 while let Some(outcome) = set.join_next().await {
118 match outcome {
119 Ok(Ok((conn, label))) => {
120 set.abort_all();
121 return Ok((conn, label));
122 }
123 Ok(Err(e)) => {
124 last_err = e;
125 }
126 Err(e) if e.is_cancelled() => {}
127 Err(_) => {}
128 }
129 }
130 Err(last_err)
131 }
132
133 #[tracing::instrument(skip(socks5, transport), fields(addr = %addr, dc_id = dc_id))]
135 pub async fn connect_raw(
136 addr: &str,
137 socks5: Option<&ferogram_connect::Socks5Config>,
138 transport: &TransportKind,
139 dc_id: i16,
140 ) -> Result<Self, InvocationError> {
141 tracing::debug!("[ferogram::sender] connecting to {addr} with known auth key");
142 let (stream, frame_kind, enc) =
143 ferogram_connect::connect_to_dc(addr, dc_id, transport, socks5, None).await?;
144
145 tracing::debug!("[ferogram::sender] DH complete, auth key established for {addr}");
146 let seen = new_seen_msg_ids();
147 Ok(Self {
148 stream,
149 frame_kind,
150 enc: EncryptedSession::with_seen(
151 enc.auth_key_bytes(),
152 enc.salt,
153 enc.time_offset,
154 seen.clone(),
155 ),
156 pending_acks: Vec::new(),
157 call_count: 0,
158 seen_msg_ids: seen,
159 })
160 }
161
162 #[allow(clippy::too_many_arguments)]
165 pub async fn connect_with_key(
166 addr: &str,
167 auth_key: [u8; 256],
168 first_salt: i64,
169 time_offset: i32,
170 socks5: Option<&ferogram_connect::Socks5Config>,
171 mtproxy: Option<&ferogram_connect::MtProxyConfig>,
172 transport: &TransportKind,
173 dc_id: i16,
174 pfs: bool,
175 ) -> Result<Self, InvocationError> {
176 let (mut stream, mut frame_kind) =
178 ferogram_connect::Connection::open_stream_pub(addr, dc_id, transport, socks5, mtproxy)
179 .await?;
180
181 if pfs {
182 tracing::debug!("[ferogram::sender] PFS: binding temporary key for DC{dc_id}");
183 match Self::do_pool_pfs_bind(&mut stream, &mut frame_kind, &auth_key, dc_id).await {
184 Ok(temp_enc) => {
185 tracing::debug!("[ferogram::sender] PFS: temporary key bound for DC{dc_id}");
186 return Ok(Self {
187 stream,
188 frame_kind,
189 enc: temp_enc,
190 pending_acks: Vec::new(),
191 call_count: 0,
192 seen_msg_ids: new_seen_msg_ids(),
193 });
194 }
195 Err(e) => {
196 tracing::warn!(
197 "[ferogram::sender] PFS bind failed for DC{dc_id} ({e}); using permanent key"
198 );
199 return Err(e);
200 }
201 }
202 }
203
204 let seen = new_seen_msg_ids();
205 Ok(Self {
206 stream,
207 frame_kind,
208 enc: EncryptedSession::with_seen(auth_key, first_salt, time_offset, seen.clone()),
209 pending_acks: Vec::new(),
210 call_count: 0,
211 seen_msg_ids: seen,
212 })
213 }
214
215 async fn do_pool_pfs_bind(
217 stream: &mut tokio::net::TcpStream,
218 kind: &mut FrameKind,
219 perm_auth_key: &[u8; 256],
220 dc_id: i16,
221 ) -> Result<EncryptedSession, InvocationError> {
222 use ferogram_mtproto::{
223 auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
224 serialize_bind_temp_auth_key,
225 };
226 const TEMP_EXPIRES: i32 = 86_400; let mut plain = Session::new();
230
231 let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
232 Self::send_plain_frame(stream, &plain.pack(&req1).to_plaintext_bytes(), kind).await?;
233 let res_pq: tl::enums::ResPq = Self::recv_plain_frame(stream, kind).await?;
234
235 let (req2, s2) = step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
236 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
237 Self::send_plain_frame(stream, &plain.pack(&req2).to_plaintext_bytes(), kind).await?;
238 let dh: tl::enums::ServerDhParams = Self::recv_plain_frame(stream, kind).await?;
239
240 let (req3, s3) =
241 auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
242 Self::send_plain_frame(stream, &plain.pack(&req3).to_plaintext_bytes(), kind).await?;
243 let ans: tl::enums::SetClientDhParamsAnswer = Self::recv_plain_frame(stream, kind).await?;
244
245 let done = {
246 let mut result =
247 auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
248 let mut attempts = 0u8;
249 loop {
250 match result {
251 ferogram_mtproto::FinishResult::Done(d) => break d,
252 ferogram_mtproto::FinishResult::Retry {
253 retry_id,
254 dh_params,
255 nonce,
256 server_nonce,
257 new_nonce,
258 } => {
259 attempts += 1;
260 if attempts >= 5 {
261 return Err(InvocationError::Deserialize(
262 "PFS pool temp DH retry exceeded 5".into(),
263 ));
264 }
265 let (rr, s3r) = ferogram_mtproto::retry_step3(
266 &dh_params,
267 nonce,
268 server_nonce,
269 new_nonce,
270 retry_id,
271 )
272 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
273 Self::send_plain_frame(stream, &plain.pack(&rr).to_plaintext_bytes(), kind)
274 .await?;
275 let ar: tl::enums::SetClientDhParamsAnswer =
276 Self::recv_plain_frame(stream, kind).await?;
277 result = auth::finish(s3r, ar)
278 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
279 }
280 }
281 }
282 };
283
284 let temp_key = done.auth_key;
285 let temp_salt = done.first_salt;
286 let temp_offset = done.time_offset;
287
288 let temp_key_id = auth_key_id_from_key(&temp_key);
290 let perm_key_id = auth_key_id_from_key(perm_auth_key);
291
292 let mut nonce_buf = [0u8; 8];
293 ferogram_crypto::fill_random(&mut nonce_buf);
294 let nonce = i64::from_le_bytes(nonce_buf);
295
296 let server_now = std::time::SystemTime::now()
297 .duration_since(std::time::UNIX_EPOCH)
298 .expect("system clock is before UNIX epoch")
299 .as_secs() as i32
300 + temp_offset;
301 let expires_at = server_now + TEMP_EXPIRES;
302
303 let seen = new_seen_msg_ids();
304 let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
305 let temp_session_id = temp_enc.session_id();
306
307 let msg_id = gen_msg_id();
308 let enc_msg = encrypt_bind_inner(
309 perm_auth_key,
310 msg_id,
311 nonce,
312 temp_key_id,
313 perm_key_id,
314 temp_session_id,
315 expires_at,
316 );
317 let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);
318
319 let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
321 Self::send_abridged(stream, &wire, kind).await?;
322
323 for attempt in 0u8..5 {
327 let mut raw = Self::recv_abridged(stream, kind).await?;
328 let decrypted = temp_enc.unpack(&mut raw).map_err(|e| {
329 InvocationError::Deserialize(format!("PFS pool bind decrypt: {e:?}"))
330 })?;
331 match ferogram_connect::decode_bind_response(&decrypted.body) {
332 Ok(()) => {
333 return Ok(temp_enc);
337 }
338 Err(ref e) if e == "__need_more__" => {
339 tracing::debug!(
340 "[ferogram::sender] PFS (DC{dc_id}): got informational frame on attempt {attempt}, reading next"
341 );
342 continue;
343 }
344 Err(reason) => {
345 tracing::error!(
346 "[ferogram::sender] PFS bind rejected by server for DC{dc_id}: {reason}"
347 );
348 return Err(InvocationError::Deserialize(format!(
349 "auth.bindTempAuthKey (pool): {reason}"
350 )));
351 }
352 }
353 }
354 Err(InvocationError::Deserialize(
355 "auth.bindTempAuthKey (pool): no boolTrue after 5 frames".into(),
356 ))
357 }
358
359 pub fn auth_key_bytes(&self) -> [u8; 256] {
360 self.enc.auth_key_bytes()
361 }
362 pub fn first_salt(&self) -> i64 {
363 self.enc.salt
364 }
365 pub fn time_offset(&self) -> i32 {
366 self.enc.time_offset
367 }
368
369 pub(crate) fn into_parts(self) -> (TcpStream, FrameKind, EncryptedSession) {
379 (self.stream, self.frame_kind, self.enc)
380 }
381
382 #[tracing::instrument(skip(self, req), fields(method = std::any::type_name::<R>()))]
383 pub async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
384 let _t0 = std::time::Instant::now();
385 self.call_count += 1;
388 if self.call_count.is_multiple_of(PING_EVERY_N_CHUNKS) {
389 let ping_id = self.call_count as i64;
390 let ping_body = build_msgs_ack_ping_body(ping_id);
391 let (ping_wire, _) = self.enc.pack_body_with_msg_id(&ping_body, true);
393 let _ = Self::send_abridged(&mut self.stream, &ping_wire, &mut self.frame_kind).await;
401 }
402
403 if !self.pending_acks.is_empty() {
405 let ack_body = build_msgs_ack_body(&self.pending_acks);
406 let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
407 let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
408 self.pending_acks.clear();
409 }
410
411 let (wire, mut sent_msg_id) = self.enc.pack_with_msg_id(req);
413 Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
414 let mut salt_retries = 0u8;
415 let mut session_resets = 0u8;
416 loop {
417 let mut raw = Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await?;
418 let msg = self
419 .enc
420 .unpack(&mut raw)
421 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
422 self.pending_acks.push(msg.msg_id);
424 if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
425 let ack_body = build_msgs_ack_body(&self.pending_acks);
428 let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
429 let _ =
430 Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
431 self.pending_acks.clear();
432 }
433 if msg.body.len() < 4 {
435 return Ok(msg.body);
436 }
437 let mut need_resend = false;
438 let mut need_session_reset = false;
439 let mut bad_msg_code: Option<u32> = None;
440 let mut bad_msg_server_id: Option<i64> = None;
441 let scan_result = Self::scan_body(
444 &msg.body,
445 &mut self.enc.salt,
446 &mut need_resend,
447 &mut need_session_reset,
448 &mut bad_msg_code,
449 &mut bad_msg_server_id,
450 Some(sent_msg_id),
451 msg.msg_id,
452 )?;
453 if need_session_reset {
455 session_resets += 1;
456 if session_resets > 2 {
457 return Err(InvocationError::Deserialize(
458 "new_session_created: exceeded 2 resets".into(),
459 ));
460 }
461 if !self.pending_acks.is_empty() {
462 let ack_body = build_msgs_ack_body(&self.pending_acks);
463 let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
464 let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
465 .await;
466 self.pending_acks.clear();
467 }
468 if scan_result.is_none() {
472 tracing::debug!(
474 "[ferogram::sender] new_session_created: resending request (attempt {session_resets}/2)"
475 );
476 let (wire, new_id) = self.enc.pack_with_msg_id(req);
477 sent_msg_id = new_id;
478 Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
479 }
480 } else if need_resend {
484 match bad_msg_code {
486 Some(16) | Some(17) => {
487 if let Some(srv_id) = bad_msg_server_id {
488 self.enc.correct_time_offset(srv_id);
489 }
490 }
495 Some(32) | Some(33) => {
496 self.enc
499 .correct_seq_no(bad_msg_code.expect("matched Some arm"));
500 }
501 _ => {
502 self.enc.undo_seq_no();
504 }
505 }
506 salt_retries += 1;
507 if salt_retries >= 5 {
508 return Err(InvocationError::Deserialize(
509 "bad_server_salt/bad_msg: exceeded 5 retries".into(),
510 ));
511 }
512 tracing::debug!(
513 "[ferogram::sender] resending transfer request after bad_msg correction (code={bad_msg_code:?}, attempt {salt_retries}/5)"
514 );
515 if !self.pending_acks.is_empty() {
516 let ack_body = build_msgs_ack_body(&self.pending_acks);
517 let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
518 let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
519 .await;
520 self.pending_acks.clear();
521 }
522 let (wire, new_id) = self.enc.pack_with_msg_id(req);
523 sent_msg_id = new_id;
524 Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
525 }
526 if let Some(result) = scan_result {
527 metrics::counter!("ferogram.rpc_calls_total", "result" => "ok").increment(1);
528 metrics::histogram!("ferogram.rpc_latency_ms")
529 .record(_t0.elapsed().as_millis() as f64);
530 return Ok(result);
531 }
532 }
533 }
534 #[allow(clippy::too_many_arguments)]
547 fn scan_body(
548 body: &[u8],
549 salt: &mut i64,
550 need_resend: &mut bool,
551 need_session_reset: &mut bool,
552 bad_msg_code: &mut Option<u32>,
553 bad_msg_server_id: &mut Option<i64>,
554 sent_msg_id: Option<i64>,
555 server_msg_id: i64,
556 ) -> Result<Option<Vec<u8>>, InvocationError> {
557 if body.len() < 4 {
558 return Ok(None);
559 }
560 let cid = u32::from_le_bytes(body[..4].try_into().expect("body.len() >= 4 checked above"));
561 match cid {
562 0xf35c6d01 => {
563 if body.len() >= 12
564 && let Some(expected) = sent_msg_id {
565 let resp_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 checked above"));
566 if resp_id != expected {
567 tracing::debug!(
568 "[ferogram::sender] rpc_result msg_id mismatch (got {resp_id:#018x}, want {expected:#018x}); skipping this frame"
569 );
570 return Ok(None);
571 }
572 }
573 let inner = if body.len() >= 12 { &body[12..] } else { body };
574 if inner.len() >= 4
576 && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 4 checked above")) == 0x3072cfa1
577 {
578 let mut dummy_salt = *salt;
579 let mut nr = false; let mut nsr = false;
580 let mut bc = None; let mut bsi = None;
581 if let Some(r) = Self::scan_body(inner, &mut dummy_salt, &mut nr, &mut nsr, &mut bc, &mut bsi, None, server_msg_id)? {
582 return Ok(Some(r));
583 }
584 if let Some(compressed) = ferogram_connect::tl_read_bytes(&inner[4..])
586 && let Ok(out) = ferogram_connect::gz_inflate(&compressed)
587 {
588 return Ok(Some(out));
589 }
590 return Ok(None);
591 }
592 if inner.len() >= 8
593 && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 8 checked above")) == 0x2144ca19
594 {
595 let code = i32::from_le_bytes(inner[4..8].try_into().expect("inner.len() >= 8 checked above"));
596 let message = ferogram_connect::tl_read_string(&inner[8..]).unwrap_or_default();
597 return Err(InvocationError::Rpc(
598 crate::errors::RpcError::from_telegram(code, &message),
599 ));
600 }
601 Ok(Some(inner.to_vec()))
602 }
603 0x2144ca19 => {
604 if body.len() < 8 {
605 return Err(InvocationError::Deserialize("rpc_error short".into()));
606 }
607 let code = i32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 checked above"));
608 let message = ferogram_connect::tl_read_string(&body[8..]).unwrap_or_default();
609 Err(InvocationError::Rpc(crate::errors::RpcError::from_telegram(code, &message)))
610 }
611 0xedab447b => {
612 if body.len() >= 28 {
614 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
615 let new_salt = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
616 if sent_msg_id.is_none_or(|id| id == bad_msg_id) {
619 *salt = new_salt;
620 *need_resend = true;
621 }
622 }
623 Ok(None)
624 }
625 0x9ec20908 => {
626 if body.len() >= 28 {
629 let first_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
630 let unique_id = i64::from_le_bytes(body[12..20].try_into().expect("body.len() >= 28 checked above"));
631 let server_salt = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
632 tracing::debug!(
633 unique_id = format_args!("{unique_id:#018x}"),
634 first_msg_id,
635 salt = server_salt,
636 "[ferogram::sender] new_session_created: server opened fresh session"
637 );
638 *salt = server_salt;
639 if sent_msg_id.is_some_and(|id| id < first_msg_id) {
645 *need_session_reset = true;
646 }
647 }
648 Ok(None)
649 }
650 0xa7eff811 => {
651 if body.len() >= 20 {
657 let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 20 checked above"));
658 let error_code = u32::from_le_bytes(body[16..20].try_into().expect("body.len() >= 20 checked above"));
660 tracing::debug!(
661 bad_msg_id = format_args!("{bad_msg_id:#018x}"),
662 error_code,
663 "[ferogram::sender] bad_msg_notification received"
664 );
665 match error_code {
666 16 | 17 => {
667 *bad_msg_code = Some(error_code);
671 *bad_msg_server_id = Some(server_msg_id);
672 *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
673 }
674 32 | 33 => {
675 *bad_msg_code = Some(error_code);
677 *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
678 }
679 48 => {
680 *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
687 tracing::debug!(
688 "[ferogram::sender] bad_msg code 48 (wrong server salt): will resend with updated salt"
689 );
690 }
691 _ => {
692 *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
694 }
695 }
696 }
697 Ok(None)
698 }
699 0x347773c5 => {
700 if body.len() >= 12
706 && let Some(expected) = sent_msg_id
707 {
708 let pong_req_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 for pong"));
709 if pong_req_id == expected {
710 return Ok(Some(body.to_vec()));
711 }
712 }
713 Ok(None)
715 }
716 0x73f1f8dc => {
717 if body.len() < 8 {
718 return Ok(None);
719 }
720 let count = u32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 for msg_container")) as usize;
721 let mut pos = 8usize;
722 let mut found: Option<Vec<u8>> = None;
725 for _ in 0..count {
726 if pos + 16 > body.len() { break; }
727 let inner_bytes =
728 u32::from_le_bytes(body[pos + 12..pos + 16].try_into().expect("pos+16 <= body.len() checked above")) as usize;
729 pos += 16;
730 if pos + inner_bytes > body.len() { break; }
731 let inner = &body[pos..pos + inner_bytes];
732 pos += inner_bytes;
733 if found.is_none() {
734 if let Some(r) = Self::scan_body(inner, salt, need_resend,
735 need_session_reset, bad_msg_code, bad_msg_server_id, sent_msg_id,
736 server_msg_id)?
737 {
738 found = Some(r);
739 }
742 } else {
743 let _ = Self::scan_body(inner, salt, need_resend, need_session_reset,
749 bad_msg_code, bad_msg_server_id, sent_msg_id,
750 server_msg_id)?;
751 }
752 }
753 Ok(found)
754 }
755 0x3072cfa1 => {
756 if let Some(compressed) = ferogram_connect::tl_read_bytes(&body[4..])
758 && let Ok(decompressed) = ferogram_connect::gz_inflate(&compressed)
759 && !decompressed.is_empty()
760 {
761 return Self::scan_body(
762 &decompressed, salt,
763 need_resend, need_session_reset,
764 bad_msg_code, bad_msg_server_id,
765 sent_msg_id,
766 server_msg_id,
767 );
768 }
769 Ok(None)
770 }
771 _ => Ok(None),
772 }
773 }
774
775 pub async fn rpc_call_serializable<S: ferogram_tl_types::Serializable>(
777 &mut self,
778 req: &S,
779 ) -> Result<Vec<u8>, InvocationError> {
780 if !self.pending_acks.is_empty() {
781 let ack_body = build_msgs_ack_body(&self.pending_acks);
782 let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
783 let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
784 self.pending_acks.clear();
785 }
786 let (wire, mut sent_msg_id) = self.enc.pack_serializable_with_msg_id(req);
787 Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
788 let mut salt_retries = 0u8;
789 let mut session_resets = 0u8;
790 loop {
791 let mut raw = Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await?;
792 let msg = self
793 .enc
794 .unpack(&mut raw)
795 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
796 self.pending_acks.push(msg.msg_id);
797 if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
798 let ack_body = build_msgs_ack_body(&self.pending_acks);
799 let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
800 let _ =
801 Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
802 self.pending_acks.clear();
803 }
804 if msg.body.len() < 4 {
806 return Ok(msg.body);
807 }
808 let mut need_resend = false;
809 let mut need_session_reset = false;
810 let mut bad_msg_code: Option<u32> = None;
811 let mut bad_msg_server_id: Option<i64> = None;
812 let scan_result = Self::scan_body(
814 &msg.body,
815 &mut self.enc.salt,
816 &mut need_resend,
817 &mut need_session_reset,
818 &mut bad_msg_code,
819 &mut bad_msg_server_id,
820 Some(sent_msg_id),
821 msg.msg_id,
822 )?;
823 if need_session_reset {
824 session_resets += 1;
825 if session_resets > 2 {
826 return Err(InvocationError::Deserialize(
827 "new_session_created (serializable): exceeded 2 resets".into(),
828 ));
829 }
830 if !self.pending_acks.is_empty() {
831 let ack_body = build_msgs_ack_body(&self.pending_acks);
832 let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
833 let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
834 .await;
835 self.pending_acks.clear();
836 }
837 if scan_result.is_none() {
838 let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
839 sent_msg_id = new_id;
840 Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
841 }
842 } else if need_resend {
843 match bad_msg_code {
844 Some(16) | Some(17) => {
845 if let Some(srv_id) = bad_msg_server_id {
846 self.enc.correct_time_offset(srv_id);
847 }
848 }
850 Some(32) | Some(33) => {
851 self.enc
852 .correct_seq_no(bad_msg_code.expect("matched Some arm"));
853 }
854 _ => {
855 self.enc.undo_seq_no();
856 }
857 }
858 salt_retries += 1;
859 if salt_retries >= 5 {
860 return Err(InvocationError::Deserialize(
861 "bad_server_salt (serializable): exceeded 5 retries".into(),
862 ));
863 }
864 tracing::debug!(
865 "[ferogram::sender] resending serializable request after bad_msg correction (code={bad_msg_code:?}, attempt {salt_retries}/5)"
866 );
867 if !self.pending_acks.is_empty() {
868 let ack_body = build_msgs_ack_body(&self.pending_acks);
869 let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
870 let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
871 .await;
872 self.pending_acks.clear();
873 }
874 let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
875 sent_msg_id = new_id;
876 Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
877 }
878 if let Some(result) = scan_result {
879 return Ok(result);
880 }
881 }
882 }
883
884 pub async fn rpc_call_raw(&mut self, body: &[u8]) -> Result<Vec<u8>, InvocationError> {
887 Self::send_abridged(&mut self.stream, body, &mut self.frame_kind).await?;
888 Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await
889 }
890
891 async fn send_abridged(
894 stream: &mut TcpStream,
895 data: &[u8],
896 kind: &mut FrameKind,
897 ) -> Result<(), InvocationError> {
898 use tokio::io::AsyncWriteExt as _;
899 match kind {
900 FrameKind::Abridged => {
901 let words = data.len() / 4;
902 let mut frame = if words < 0x7f {
903 let mut v = Vec::with_capacity(1 + data.len());
904 v.push(words as u8);
905 v
906 } else {
907 let mut v = Vec::with_capacity(4 + data.len());
908 v.extend_from_slice(&[
909 0x7f,
910 (words & 0xff) as u8,
911 ((words >> 8) & 0xff) as u8,
912 ((words >> 16) & 0xff) as u8,
913 ]);
914 v
915 };
916 frame.extend_from_slice(data);
917 stream.write_all(&frame).await?;
918 }
919 FrameKind::Intermediate => {
920 let mut frame = Vec::with_capacity(4 + data.len());
921 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
922 frame.extend_from_slice(data);
923 stream.write_all(&frame).await?;
924 }
925 FrameKind::Full { send_seqno, .. } => {
926 let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
927 let total_len = (data.len() as u32) + 12;
928 let mut packet = Vec::with_capacity(total_len as usize);
929 packet.extend_from_slice(&total_len.to_le_bytes());
930 packet.extend_from_slice(&seq.to_le_bytes());
931 packet.extend_from_slice(data);
932 let crc = ferogram_connect::crc32_ieee(&packet);
933 packet.extend_from_slice(&crc.to_le_bytes());
934 stream.write_all(&packet).await?;
935 }
936 FrameKind::Obfuscated { cipher } => {
937 let words = data.len() / 4;
938 let mut frame = if words < 0x7f {
939 let mut v = Vec::with_capacity(1 + data.len());
940 v.push(words as u8);
941 v
942 } else {
943 let mut v = Vec::with_capacity(4 + data.len());
944 v.extend_from_slice(&[
945 0x7f,
946 (words & 0xff) as u8,
947 ((words >> 8) & 0xff) as u8,
948 ((words >> 16) & 0xff) as u8,
949 ]);
950 v
951 };
952 frame.extend_from_slice(data);
953 cipher.lock().await.encrypt(&mut frame);
954 stream.write_all(&frame).await?;
955 }
956 FrameKind::PaddedIntermediate { cipher } => {
957 let mut pad_len_buf = [0u8; 1];
958 ferogram_crypto::fill_random(&mut pad_len_buf);
959 let pad_len = (pad_len_buf[0] & 0x0f) as usize;
960 let total_payload = data.len() + pad_len;
961 let mut frame = Vec::with_capacity(4 + total_payload);
962 frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
963 frame.extend_from_slice(data);
964 let mut pad = vec![0u8; pad_len];
965 ferogram_crypto::fill_random(&mut pad);
966 frame.extend_from_slice(&pad);
967 cipher.lock().await.encrypt(&mut frame);
968 stream.write_all(&frame).await?;
969 }
970 FrameKind::FakeTls { cipher } => {
971 const TLS_APP_DATA: u8 = 0x17;
972 const TLS_VER: [u8; 2] = [0x03, 0x03];
973 const CHUNK: usize = 2878;
974 let mut locked = cipher.lock().await;
975 for chunk in data.chunks(CHUNK) {
976 let chunk_len = chunk.len() as u16;
977 let mut record = Vec::with_capacity(5 + chunk.len());
978 record.push(TLS_APP_DATA);
979 record.extend_from_slice(&TLS_VER);
980 record.extend_from_slice(&chunk_len.to_be_bytes());
981 record.extend_from_slice(chunk);
982 locked.encrypt(&mut record[5..]);
983 stream.write_all(&record).await?;
984 }
985 }
986 }
987 Ok(())
988 }
989
990 async fn recv_abridged(
992 stream: &mut TcpStream,
993 kind: &mut FrameKind,
994 ) -> Result<Vec<u8>, InvocationError> {
995 use tokio::time::{Duration, timeout};
996 const RECV_TIMEOUT: Duration = Duration::from_secs(60);
997
998 macro_rules! tread {
999 ($buf:expr) => {
1000 timeout(RECV_TIMEOUT, stream.read_exact($buf))
1001 .await
1002 .map_err(|_| {
1003 InvocationError::Io(std::io::Error::new(
1004 std::io::ErrorKind::TimedOut,
1005 "transfer recv: timeout (60 s)",
1006 ))
1007 })??
1008 };
1009 }
1010
1011 match kind {
1012 FrameKind::Abridged => {
1013 let mut h = [0u8; 1];
1014 tread!(&mut h);
1015 let words = if h[0] == 0x7f {
1016 let mut b = [0u8; 3];
1017 tread!(&mut b);
1018 let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
1019 if w == 1 {
1020 let mut code_buf = [0u8; 4];
1021 tread!(&mut code_buf);
1022 let code = i32::from_le_bytes(code_buf);
1023 return Err(InvocationError::Rpc(
1024 crate::errors::RpcError::from_telegram(code, "transport error"),
1025 ));
1026 }
1027 w
1028 } else {
1029 h[0] as usize
1030 };
1031 let mut buf = vec![0u8; words * 4];
1032 tread!(&mut buf);
1033 if buf.len() == 4 {
1034 let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
1035 if code < 0 {
1036 return Err(InvocationError::Rpc(
1037 crate::errors::RpcError::from_telegram(code, "transport error"),
1038 ));
1039 }
1040 }
1041 Ok(buf)
1042 }
1043 FrameKind::Intermediate => {
1044 let mut len_buf = [0u8; 4];
1045 tread!(&mut len_buf);
1046 let len_i32 = i32::from_le_bytes(len_buf);
1047 if len_i32 < 0 {
1048 return Err(InvocationError::Rpc(
1049 crate::errors::RpcError::from_telegram(len_i32, "transport error"),
1050 ));
1051 }
1052 let mut buf = vec![0u8; len_i32 as usize];
1053 tread!(&mut buf);
1054 Ok(buf)
1055 }
1056 FrameKind::Full { recv_seqno, .. } => {
1057 let mut len_buf = [0u8; 4];
1058 tread!(&mut len_buf);
1059 let total_len_i32 = i32::from_le_bytes(len_buf);
1060 if total_len_i32 < 0 {
1061 return Err(InvocationError::Rpc(
1062 crate::errors::RpcError::from_telegram(total_len_i32, "transport error"),
1063 ));
1064 }
1065 let total_len = total_len_i32 as usize;
1066 if total_len < 12 {
1067 return Err(InvocationError::Deserialize(
1068 "Full transport: packet too short".into(),
1069 ));
1070 }
1071 let mut rest = vec![0u8; total_len - 4];
1072 tread!(&mut rest);
1073 let (body, crc_bytes) = rest.split_at(rest.len() - 4);
1074 let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
1075 let mut check_input = Vec::with_capacity(4 + body.len());
1076 check_input.extend_from_slice(&len_buf);
1077 check_input.extend_from_slice(body);
1078 let actual_crc = ferogram_connect::crc32_ieee(&check_input);
1079 if actual_crc != expected_crc {
1080 return Err(InvocationError::Deserialize(format!(
1081 "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
1082 )));
1083 }
1084 let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
1085 let expected_seq = recv_seqno.load(std::sync::atomic::Ordering::Relaxed);
1086 if recv_seq != expected_seq {
1087 return Err(InvocationError::Deserialize(format!(
1088 "Full transport: seqno mismatch (got {recv_seq}, expected {expected_seq})"
1089 )));
1090 }
1091 recv_seqno.store(
1092 expected_seq.wrapping_add(1),
1093 std::sync::atomic::Ordering::Relaxed,
1094 );
1095 Ok(body[4..].to_vec())
1096 }
1097 FrameKind::Obfuscated { cipher } => {
1098 let mut h = [0u8; 1];
1099 tread!(&mut h);
1100 cipher.lock().await.decrypt(&mut h);
1101 let words = if h[0] == 0x7f {
1102 let mut b = [0u8; 3];
1103 tread!(&mut b);
1104 cipher.lock().await.decrypt(&mut b);
1105 let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
1106 if w == 1 {
1107 let mut code_buf = [0u8; 4];
1108 tread!(&mut code_buf);
1109 cipher.lock().await.decrypt(&mut code_buf);
1110 let code = i32::from_le_bytes(code_buf);
1111 return Err(InvocationError::Rpc(
1112 crate::errors::RpcError::from_telegram(code, "transport error"),
1113 ));
1114 }
1115 w
1116 } else {
1117 h[0] as usize
1118 };
1119 let mut buf = vec![0u8; words * 4];
1120 tread!(&mut buf);
1121 cipher.lock().await.decrypt(&mut buf);
1122 if buf.len() == 4 {
1123 let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
1124 if code < 0 {
1125 return Err(InvocationError::Rpc(
1126 crate::errors::RpcError::from_telegram(code, "transport error"),
1127 ));
1128 }
1129 }
1130 Ok(buf)
1131 }
1132 FrameKind::PaddedIntermediate { cipher } => {
1133 let mut len_buf = [0u8; 4];
1134 tread!(&mut len_buf);
1135 cipher.lock().await.decrypt(&mut len_buf);
1136 let total_len = i32::from_le_bytes(len_buf);
1137 if total_len < 0 {
1138 return Err(InvocationError::Rpc(
1139 crate::errors::RpcError::from_telegram(total_len, "transport error"),
1140 ));
1141 }
1142 let mut buf = vec![0u8; total_len as usize];
1143 tread!(&mut buf);
1144 cipher.lock().await.decrypt(&mut buf);
1145 if buf.len() >= 24 {
1146 let pad = (buf.len() - 24) % 16;
1147 buf.truncate(buf.len() - pad);
1148 }
1149 Ok(buf)
1150 }
1151 FrameKind::FakeTls { cipher } => {
1152 let mut hdr = [0u8; 5];
1153 tread!(&mut hdr);
1154 if hdr[0] != 0x17 {
1155 return Err(InvocationError::Deserialize(format!(
1156 "FakeTLS: unexpected record type 0x{:02x}",
1157 hdr[0]
1158 )));
1159 }
1160 let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
1161 let mut buf = vec![0u8; payload_len];
1162 tread!(&mut buf);
1163 cipher.lock().await.decrypt(&mut buf);
1164 Ok(buf)
1165 }
1166 }
1167 }
1168
1169 async fn send_plain_frame(
1172 stream: &mut TcpStream,
1173 data: &[u8],
1174 kind: &mut FrameKind,
1175 ) -> Result<(), InvocationError> {
1176 let needs_align = matches!(kind, FrameKind::Abridged | FrameKind::Obfuscated { .. });
1179 if needs_align && !data.len().is_multiple_of(4) {
1180 let mut padded = data.to_vec();
1181 let pad = 4 - (data.len() % 4);
1182 padded.resize(data.len() + pad, 0);
1183 Self::send_abridged(stream, &padded, kind).await
1184 } else {
1185 Self::send_abridged(stream, data, kind).await
1186 }
1187 }
1188
1189 async fn recv_plain_frame<T: Deserializable>(
1190 stream: &mut TcpStream,
1191 kind: &mut FrameKind,
1192 ) -> Result<T, InvocationError> {
1193 let raw = Self::recv_abridged(stream, kind).await?;
1194 if raw.len() == 4 {
1195 let code = i32::from_le_bytes(raw[..4].try_into().unwrap());
1196 if code < 0 {
1197 return Err(InvocationError::Deserialize(format!(
1198 "server transport error during DH: code {code}"
1199 )));
1200 }
1201 }
1202 if raw.len() < 20 {
1203 return Err(InvocationError::Deserialize("plain frame too short".into()));
1204 }
1205 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
1206 return Err(InvocationError::Deserialize(
1207 "expected auth_key_id=0 in plaintext".into(),
1208 ));
1209 }
1210 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
1211 if raw.len() < 20 + body_len {
1212 return Err(InvocationError::Deserialize(format!(
1213 "plain frame truncated: have {} bytes, need {}",
1214 raw.len(),
1215 20 + body_len
1216 )));
1217 }
1218 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
1219 T::deserialize(&mut cur).map_err(Into::into)
1220 }
1221}