1use ferogram_connect::FrameKind;
14use ferogram_mtproto::{
15 EncryptedSession, SeenMsgIds, Session, authentication as auth, new_seen_msg_ids, step2_temp,
16};
17use ferogram_tl_types as tl;
18use ferogram_tl_types::{Cursor, Deserializable, RemoteCall};
19use tokio::io::AsyncReadExt;
20use tokio::net::TcpStream;
21
22use crate::errors::InvocationError;
23use crate::pool::{build_msgs_ack_body, build_msgs_ack_ping_body};
24use ferogram_connect::TransportKind;
25#[allow(unused_imports)]
27use metrics::{counter, histogram};
28
/// Number of collected, not-yet-acknowledged server message ids at which the
/// RPC receive loops flush a `msgs_ack` immediately instead of waiting for the
/// next outgoing request.
const PENDING_ACKS_THRESHOLD: usize = 10;
32
/// `rpc_call` sends the body built by `build_msgs_ack_ping_body` whenever its
/// running call counter is a multiple of this value.
const PING_EVERY_N_CHUNKS: u32 = 5;
36
/// A single connection to a data center: the TCP stream together with the
/// encrypted-session state used to issue RPC calls over it.
pub struct DcConnection {
    /// Underlying TCP socket.
    stream: TcpStream,
    /// Encrypted session used to pack outgoing and unpack incoming messages.
    enc: EncryptedSession,
    /// Ids of received messages that have not been acknowledged yet.
    pending_acks: Vec<i64>,
    /// Number of `rpc_call` invocations so far; drives the periodic ping.
    call_count: u32,
    /// Transport framing in use; selects the branch taken by the frame
    /// send/receive helpers.
    frame_kind: FrameKind,
    /// Seen-message-id set created alongside the session. It is stored but not
    /// read by any code in this type, hence the `dead_code` allowance.
    #[allow(dead_code)]
    seen_msg_ids: SeenMsgIds,
}
48
49impl DcConnection {
50 #[tracing::instrument(skip(socks5), fields(addr = %addr, dc_id = dc_id))]
52 pub async fn connect_fastest(
53 addr: &str,
54 socks5: Option<&ferogram_connect::Socks5Config>,
55 dc_id: i16,
56 ) -> Result<(Self, &'static str), InvocationError> {
57 use tokio::task::JoinSet;
58 let addr = addr.to_owned();
59 let socks5 = socks5.cloned();
60 tracing::debug!(
61 "[ferogram::sender] probing {addr} with Full, Obfuscated, and Abridged transports in parallel"
62 );
63 let mut set: JoinSet<Result<(DcConnection, &'static str), InvocationError>> =
64 JoinSet::new();
65
66 {
67 let a = addr.clone();
68 let s = socks5.clone();
69 set.spawn(async move {
70 Ok((
71 DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Full, dc_id).await?,
72 "Full",
73 ))
74 });
75 }
76 {
77 let a = addr.clone();
78 let s = socks5.clone();
79 set.spawn(async move {
80 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
81 Ok((
82 DcConnection::connect_raw(
83 &a,
84 s.as_ref(),
85 &TransportKind::Obfuscated { secret: None },
86 dc_id,
87 )
88 .await?,
89 "Obfuscated",
90 ))
91 });
92 }
93 {
94 let a = addr.clone();
95 let s = socks5.clone();
96 set.spawn(async move {
97 tokio::time::sleep(std::time::Duration::from_millis(400)).await;
98 Ok((
99 DcConnection::connect_raw(&a, s.as_ref(), &TransportKind::Abridged, dc_id)
100 .await?,
101 "Abridged",
102 ))
103 });
104 }
105 {
106 let a = addr.clone();
107 set.spawn(async move {
108 tokio::time::sleep(std::time::Duration::from_millis(800)).await;
109 Ok((
110 DcConnection::connect_raw(&a, None, &TransportKind::Http, dc_id).await?,
111 "Http",
112 ))
113 });
114 }
115
116 let mut last_err = InvocationError::Deserialize("connect_fastest: no candidates".into());
117 while let Some(outcome) = set.join_next().await {
118 match outcome {
119 Ok(Ok((conn, label))) => {
120 set.abort_all();
121 return Ok((conn, label));
122 }
123 Ok(Err(e)) => {
124 last_err = e;
125 }
126 Err(e) if e.is_cancelled() => {}
127 Err(_) => {}
128 }
129 }
130 Err(last_err)
131 }
132
133 #[tracing::instrument(skip(socks5, transport), fields(addr = %addr, dc_id = dc_id))]
135 pub async fn connect_raw(
136 addr: &str,
137 socks5: Option<&ferogram_connect::Socks5Config>,
138 transport: &TransportKind,
139 dc_id: i16,
140 ) -> Result<Self, InvocationError> {
141 tracing::debug!("[ferogram::sender] connecting to {addr} with known auth key");
142 let (stream, frame_kind, enc) =
143 ferogram_connect::connect_to_dc(addr, dc_id, transport, socks5, None).await?;
144
145 tracing::debug!("[ferogram::sender] DH complete, auth key established for {addr}");
146 let seen = new_seen_msg_ids();
147 Ok(Self {
148 stream,
149 frame_kind,
150 enc: EncryptedSession::with_seen(
151 enc.auth_key_bytes(),
152 enc.salt,
153 enc.time_offset,
154 seen.clone(),
155 ),
156 pending_acks: Vec::new(),
157 call_count: 0,
158 seen_msg_ids: seen,
159 })
160 }
161
162 #[allow(clippy::too_many_arguments)]
165 pub async fn connect_with_key(
166 addr: &str,
167 auth_key: [u8; 256],
168 first_salt: i64,
169 time_offset: i32,
170 socks5: Option<&ferogram_connect::Socks5Config>,
171 mtproxy: Option<&ferogram_connect::MtProxyConfig>,
172 transport: &TransportKind,
173 dc_id: i16,
174 pfs: bool,
175 ) -> Result<Self, InvocationError> {
176 let (mut stream, mut frame_kind) =
178 ferogram_connect::Connection::open_stream_pub(addr, dc_id, transport, socks5, mtproxy)
179 .await?;
180
181 if pfs {
182 tracing::debug!("[ferogram::sender] PFS: binding temporary key for DC{dc_id}");
183 match Self::do_pool_pfs_bind(&mut stream, &mut frame_kind, &auth_key, dc_id).await {
184 Ok(temp_enc) => {
185 tracing::debug!("[ferogram::sender] PFS: temporary key bound for DC{dc_id}");
186 return Ok(Self {
187 stream,
188 frame_kind,
189 enc: temp_enc,
190 pending_acks: Vec::new(),
191 call_count: 0,
192 seen_msg_ids: new_seen_msg_ids(),
193 });
194 }
195 Err(e) => {
196 tracing::warn!(
197 "[ferogram::sender] PFS bind failed for DC{dc_id} ({e}); using permanent key"
198 );
199 return Err(e);
200 }
201 }
202 }
203
204 let seen = new_seen_msg_ids();
205 Ok(Self {
206 stream,
207 frame_kind,
208 enc: EncryptedSession::with_seen(auth_key, first_salt, time_offset, seen.clone()),
209 pending_acks: Vec::new(),
210 call_count: 0,
211 seen_msg_ids: seen,
212 })
213 }
214
    /// Negotiate a temporary auth key on `stream` and bind it to the permanent
    /// key `perm_auth_key`, returning the encrypted session built on the
    /// temporary key.
    ///
    /// The function first runs the plaintext key exchange (`auth::step1`,
    /// `step2_temp`, `auth::step3`, `auth::finish`, with retries), then sends
    /// the bind request produced by `serialize_bind_temp_auth_key` and reads up
    /// to five reply frames until `decode_bind_response` accepts one.
    ///
    /// `dc_id` is passed to `step2_temp` and used in log messages.
    ///
    /// # Errors
    /// Fails on any transport error, on any key-exchange step error, when the
    /// key exchange keeps asking for a retry, when a reply cannot be decrypted,
    /// when the server rejects the bind, or when none of the five reply frames
    /// is accepted.
    ///
    /// # Panics
    /// Panics if the system clock reports a time before the UNIX epoch.
    async fn do_pool_pfs_bind(
        stream: &mut tokio::net::TcpStream,
        kind: &mut FrameKind,
        perm_auth_key: &[u8; 256],
        dc_id: i16,
    ) -> Result<EncryptedSession, InvocationError> {
        use ferogram_mtproto::{
            auth_key_id_from_key, encrypt_bind_inner, gen_msg_id, new_seen_msg_ids,
            serialize_bind_temp_auth_key,
        };
        // Lifetime requested for the temporary key. It is added to a timestamp
        // in seconds below, so this is one day.
        const TEMP_EXPIRES: i32 = 86_400;
        // Plaintext session used to pack the unencrypted key-exchange requests.
        let mut plain = Session::new();

        // Step 1: send the first request, expect `ResPq`.
        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(stream, &plain.pack(&req1).to_plaintext_bytes(), kind).await?;
        let res_pq: tl::enums::ResPq = Self::recv_plain_frame(stream, kind).await?;

        // Step 2 (temporary-key variant): expect `ServerDhParams`.
        let (req2, s2) = step2_temp(s1, res_pq, dc_id as i32, TEMP_EXPIRES)
            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(stream, &plain.pack(&req2).to_plaintext_bytes(), kind).await?;
        let dh: tl::enums::ServerDhParams = Self::recv_plain_frame(stream, kind).await?;

        // Step 3: expect `SetClientDhParamsAnswer`.
        let (req3, s3) =
            auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(stream, &plain.pack(&req3).to_plaintext_bytes(), kind).await?;
        let ans: tl::enums::SetClientDhParamsAnswer = Self::recv_plain_frame(stream, kind).await?;

        // Finish the exchange, repeating step 3 while `finish` asks for a retry.
        let done = {
            let mut result =
                auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            let mut attempts = 0u8;
            loop {
                match result {
                    ferogram_mtproto::FinishResult::Done(d) => break d,
                    ferogram_mtproto::FinishResult::Retry {
                        retry_id,
                        dh_params,
                        nonce,
                        server_nonce,
                        new_nonce,
                    } => {
                        // Give up when a fifth retry is requested.
                        attempts += 1;
                        if attempts >= 5 {
                            return Err(InvocationError::Deserialize(
                                "PFS pool temp DH retry exceeded 5".into(),
                            ));
                        }
                        let (rr, s3r) = ferogram_mtproto::retry_step3(
                            &dh_params,
                            nonce,
                            server_nonce,
                            new_nonce,
                            retry_id,
                        )
                        .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
                        Self::send_plain_frame(stream, &plain.pack(&rr).to_plaintext_bytes(), kind)
                            .await?;
                        let ar: tl::enums::SetClientDhParamsAnswer =
                            Self::recv_plain_frame(stream, kind).await?;
                        result = auth::finish(s3r, ar)
                            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
                    }
                }
            }
        };

        let temp_key = done.auth_key;
        let temp_salt = done.first_salt;
        let temp_offset = done.time_offset;

        let temp_key_id = auth_key_id_from_key(&temp_key);
        let perm_key_id = auth_key_id_from_key(perm_auth_key);

        // Random 64-bit nonce for the bind request.
        let mut nonce_buf = [0u8; 8];
        ferogram_crypto::fill_random(&mut nonce_buf);
        let nonce = i64::from_le_bytes(nonce_buf);

        // Local UNIX time in seconds shifted by the offset reported by the key
        // exchange, then extended by the requested key lifetime.
        let server_now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock is before UNIX epoch")
            .as_secs() as i32
            + temp_offset;
        let expires_at = server_now + TEMP_EXPIRES;

        let seen = new_seen_msg_ids();
        let mut temp_enc = EncryptedSession::with_seen(temp_key, temp_salt, temp_offset, seen);
        let temp_session_id = temp_enc.session_id();

        // The same `msg_id` is used for the inner message encrypted with the
        // permanent key and for the outer message packed by the temporary session.
        let msg_id = gen_msg_id();
        let enc_msg = encrypt_bind_inner(
            perm_auth_key,
            msg_id,
            nonce,
            temp_key_id,
            perm_key_id,
            temp_session_id,
            expires_at,
        );
        let bind_body = serialize_bind_temp_auth_key(perm_key_id, nonce, expires_at, &enc_msg);

        let wire = temp_enc.pack_body_at_msg_id(&bind_body, msg_id);
        Self::send_abridged(stream, &wire, kind).await?;

        // Read up to five frames: the server may send other frames before the
        // actual bind result.
        for attempt in 0u8..5 {
            let mut raw = Self::recv_abridged(stream, kind).await?;
            let decrypted = temp_enc.unpack(&mut raw).map_err(|e| {
                InvocationError::Deserialize(format!("PFS pool bind decrypt: {e:?}"))
            })?;
            match ferogram_connect::decode_bind_response(&decrypted.body) {
                Ok(()) => {
                    return Ok(temp_enc);
                }
                // Sentinel from `decode_bind_response`: this frame was not the
                // bind result, keep reading.
                Err(ref e) if e == "__need_more__" => {
                    tracing::debug!(
                        "[ferogram::sender] PFS (DC{dc_id}): got informational frame on attempt {attempt}, reading next"
                    );
                    continue;
                }
                Err(reason) => {
                    tracing::error!(
                        "[ferogram::sender] PFS bind rejected by server for DC{dc_id}: {reason}"
                    );
                    return Err(InvocationError::Deserialize(format!(
                        "auth.bindTempAuthKey (pool): {reason}"
                    )));
                }
            }
        }
        Err(InvocationError::Deserialize(
            "auth.bindTempAuthKey (pool): no boolTrue after 5 frames".into(),
        ))
    }
358
    /// Returns the 256-byte auth key of the encrypted session held by this
    /// connection.
    pub fn auth_key_bytes(&self) -> [u8; 256] {
        self.enc.auth_key_bytes()
    }
    /// Returns the session's current `salt` value. The RPC loops may overwrite
    /// that field when the server supplies a new salt, so this is not
    /// necessarily the salt the session was created with.
    pub fn first_salt(&self) -> i64 {
        self.enc.salt
    }
    /// Returns the session's `time_offset` field.
    pub fn time_offset(&self) -> i32 {
        self.enc.time_offset
    }
374
375 pub(crate) fn into_parts(self) -> (TcpStream, FrameKind, EncryptedSession) {
385 (self.stream, self.frame_kind, self.enc)
386 }
387
    /// Send `req` over the encrypted session and return the raw bytes of its
    /// result.
    ///
    /// Before sending, the method:
    /// * increments the call counter and, on every `PING_EVERY_N_CHUNKS`-th
    ///   call, sends a ping body (send errors are ignored);
    /// * flushes any pending acks (send errors are ignored).
    ///
    /// It then loops over incoming frames, handing each body to `scan_body`,
    /// until that yields a result. While waiting it reacts to service messages:
    /// * `new_session_created` for a message newer than ours: the request is
    ///   re-sent, at most twice;
    /// * bad-salt / bad-message notifications: the session is corrected and the
    ///   request re-sent; the fifth such notification is an error.
    ///
    /// On success a call counter and a latency histogram are recorded through
    /// the `metrics` macros.
    ///
    /// # Errors
    /// Transport errors, decrypt/unpack errors, an RPC error reported by the
    /// server, or exceeding either retry limit.
    #[tracing::instrument(skip(self, req), fields(method = std::any::type_name::<R>()))]
    pub async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
        // Start time for the latency histogram recorded on success.
        let _t0 = std::time::Instant::now();
        self.call_count += 1;
        if self.call_count.is_multiple_of(PING_EVERY_N_CHUNKS) {
            // The call counter doubles as the ping id.
            let ping_id = self.call_count as i64;
            let ping_body = build_msgs_ack_ping_body(ping_id);
            let (ping_wire, _) = self.enc.pack_body_with_msg_id(&ping_body, true);
            // Best effort: a failed ping must not fail the call.
            let _ = Self::send_abridged(&mut self.stream, &ping_wire, &mut self.frame_kind).await;
        }

        if !self.pending_acks.is_empty() {
            // Best effort: acknowledge what was received during earlier calls.
            let ack_body = build_msgs_ack_body(&self.pending_acks);
            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
            let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
            self.pending_acks.clear();
        }

        // `sent_msg_id` is updated on every resend so replies are matched
        // against the latest copy of the request.
        let (wire, mut sent_msg_id) = self.enc.pack_with_msg_id(req);
        Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
        let mut salt_retries = 0u8;
        let mut session_resets = 0u8;
        loop {
            let mut raw = Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await?;
            let msg = self
                .enc
                .unpack(&mut raw)
                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            self.pending_acks.push(msg.msg_id);
            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
                // Too many unacknowledged messages: flush now (best effort).
                let ack_body = build_msgs_ack_body(&self.pending_acks);
                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                let _ =
                    Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
                self.pending_acks.clear();
            }
            if msg.body.len() < 4 {
                // Too short to carry a constructor id: returned as-is.
                return Ok(msg.body);
            }
            // Out-parameters filled in by `scan_body`.
            let mut need_resend = false;
            let mut need_session_reset = false;
            let mut bad_msg_code: Option<u32> = None;
            let mut bad_msg_server_id: Option<i64> = None;
            let scan_result = Self::scan_body(
                &msg.body,
                &mut self.enc.salt,
                &mut need_resend,
                &mut need_session_reset,
                &mut bad_msg_code,
                &mut bad_msg_server_id,
                Some(sent_msg_id),
                msg.msg_id,
            )?;
            if need_session_reset {
                session_resets += 1;
                if session_resets > 2 {
                    return Err(InvocationError::Deserialize(
                        "new_session_created: exceeded 2 resets".into(),
                    ));
                }
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
                        .await;
                    self.pending_acks.clear();
                }
                // Only resend when the same frame did not also carry the result.
                if scan_result.is_none() {
                    tracing::debug!(
                        "[ferogram::sender] new_session_created: resending request (attempt {session_resets}/2)"
                    );
                    let (wire, new_id) = self.enc.pack_with_msg_id(req);
                    sent_msg_id = new_id;
                    Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
                }
            } else if need_resend {
                // Apply the correction matching the reported code before resending.
                match bad_msg_code {
                    Some(16) | Some(17) => {
                        if let Some(srv_id) = bad_msg_server_id {
                            self.enc.correct_time_offset(srv_id);
                        }
                    }
                    Some(32) | Some(33) => {
                        self.enc
                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
                    }
                    // No code recorded (e.g. a new salt) or any other code.
                    _ => {
                        self.enc.undo_seq_no();
                    }
                }
                salt_retries += 1;
                if salt_retries >= 5 {
                    return Err(InvocationError::Deserialize(
                        "bad_server_salt/bad_msg: exceeded 5 retries".into(),
                    ));
                }
                tracing::debug!(
                    "[ferogram::sender] resending transfer request after bad_msg correction (code={bad_msg_code:?}, attempt {salt_retries}/5)"
                );
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
                        .await;
                    self.pending_acks.clear();
                }
                let (wire, new_id) = self.enc.pack_with_msg_id(req);
                sent_msg_id = new_id;
                Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
            }
            if let Some(result) = scan_result {
                metrics::counter!("ferogram.rpc_calls_total", "result" => "ok").increment(1);
                metrics::histogram!("ferogram.rpc_latency_ms")
                    .record(_t0.elapsed().as_millis() as f64);
                return Ok(result);
            }
        }
    }
    /// Inspect one decrypted message body and extract the RPC result, if any.
    ///
    /// The first four bytes are read as a little-endian constructor id and
    /// dispatched on. Service messages update the out-parameters instead of
    /// producing a result; containers and gzip-packed bodies are scanned
    /// recursively.
    ///
    /// # Parameters
    /// * `body` – decrypted message body.
    /// * `salt` – overwritten when the server supplies a new salt.
    /// * `need_resend` – set when the request should be sent again.
    /// * `need_session_reset` – set when a new session was created after a
    ///   message id greater than `sent_msg_id`.
    /// * `bad_msg_code` – receives the error code for codes 16, 17, 32 and 33.
    /// * `bad_msg_server_id` – receives `server_msg_id` for codes 16 and 17.
    /// * `sent_msg_id` – id of the request being awaited; `None` disables the
    ///   id matching.
    /// * `server_msg_id` – id of the server message that carried `body`.
    ///
    /// # Returns
    /// `Ok(Some(bytes))` with the result payload, or `Ok(None)` when the body
    /// holds no result for the awaited request.
    ///
    /// # Errors
    /// `InvocationError::Rpc` when the body is (or wraps) an RPC error, and
    /// `InvocationError::Deserialize` when a bare RPC error is shorter than
    /// eight bytes.
    #[allow(clippy::too_many_arguments)]
    fn scan_body(
        body: &[u8],
        salt: &mut i64,
        need_resend: &mut bool,
        need_session_reset: &mut bool,
        bad_msg_code: &mut Option<u32>,
        bad_msg_server_id: &mut Option<i64>,
        sent_msg_id: Option<i64>,
        server_msg_id: i64,
    ) -> Result<Option<Vec<u8>>, InvocationError> {
        if body.len() < 4 {
            return Ok(None);
        }
        let cid = u32::from_le_bytes(body[..4].try_into().expect("body.len() >= 4 checked above"));
        match cid {
            // rpc_result: request msg id at 4..12, payload from byte 12.
            0xf35c6d01 => {
                if body.len() >= 12
                    && let Some(expected) = sent_msg_id {
                    let resp_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 checked above"));
                    if resp_id != expected {
                        tracing::debug!(
                            "[ferogram::sender] rpc_result msg_id mismatch (got {resp_id:#018x}, want {expected:#018x}); skipping this frame"
                        );
                        return Ok(None);
                    }
                }
                // A body shorter than 12 bytes is passed through whole.
                let inner = if body.len() >= 12 { &body[12..] } else { body };
                // Payload is gzip-packed.
                if inner.len() >= 4
                    && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 4 checked above")) == 0x3072cfa1
                {
                    // Scan the packed payload with throwaway out-parameters so
                    // nothing found inside changes the caller's flags or salt.
                    let mut dummy_salt = *salt;
                    let mut nr = false; let mut nsr = false;
                    let mut bc = None; let mut bsi = None;
                    if let Some(r) = Self::scan_body(inner, &mut dummy_salt, &mut nr, &mut nsr, &mut bc, &mut bsi, None, server_msg_id)? {
                        return Ok(Some(r));
                    }
                    // Nothing recognised inside: return the inflated bytes.
                    if let Some(compressed) = ferogram_connect::tl_read_bytes(&inner[4..])
                        && let Ok(out) = ferogram_connect::gz_inflate(&compressed)
                    {
                        return Ok(Some(out));
                    }
                    return Ok(None);
                }
                // Payload is an rpc_error: code at 4..8, message string after.
                if inner.len() >= 8
                    && u32::from_le_bytes(inner[..4].try_into().expect("inner.len() >= 8 checked above")) == 0x2144ca19
                {
                    let code = i32::from_le_bytes(inner[4..8].try_into().expect("inner.len() >= 8 checked above"));
                    let message = ferogram_connect::tl_read_string(&inner[8..]).unwrap_or_default();
                    return Err(InvocationError::Rpc(
                        crate::errors::RpcError::from_telegram(code, &message),
                    ));
                }
                Ok(Some(inner.to_vec()))
            }
            // Bare rpc_error (not wrapped in rpc_result).
            0x2144ca19 => {
                if body.len() < 8 {
                    return Err(InvocationError::Deserialize("rpc_error short".into()));
                }
                let code = i32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 checked above"));
                let message = ferogram_connect::tl_read_string(&body[8..]).unwrap_or_default();
                Err(InvocationError::Rpc(crate::errors::RpcError::from_telegram(code, &message)))
            }
            // bad_server_salt: offending msg id at 4..12, new salt at 20..28.
            0xedab447b => {
                if body.len() >= 28 {
                    let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
                    let new_salt = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
                    // Only react when it refers to our request (or no id is tracked).
                    if sent_msg_id.is_none_or(|id| id == bad_msg_id) {
                        *salt = new_salt;
                        *need_resend = true;
                    }
                }
                Ok(None)
            }
            // new_session_created: first msg id 4..12, unique id 12..20, salt 20..28.
            0x9ec20908 => {
                if body.len() >= 28 {
                    let first_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 28 checked above"));
                    let unique_id = i64::from_le_bytes(body[12..20].try_into().expect("body.len() >= 28 checked above"));
                    let server_salt = i64::from_le_bytes(body[20..28].try_into().expect("body.len() >= 28 checked above"));
                    tracing::debug!(
                        unique_id = format_args!("{unique_id:#018x}"),
                        first_msg_id,
                        salt = server_salt,
                        "[ferogram::sender] new_session_created: server opened fresh session"
                    );
                    // The salt is always adopted; a reset is requested only when
                    // our request id is below the session's first message id.
                    *salt = server_salt;
                    if sent_msg_id.is_some_and(|id| id < first_msg_id) {
                        *need_session_reset = true;
                    }
                }
                Ok(None)
            }
            // bad_msg_notification: offending msg id at 4..12, error code at 16..20.
            0xa7eff811 => {
                if body.len() >= 20 {
                    let bad_msg_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 20 checked above"));
                    let error_code = u32::from_le_bytes(body[16..20].try_into().expect("body.len() >= 20 checked above"));
                    tracing::debug!(
                        bad_msg_id = format_args!("{bad_msg_id:#018x}"),
                        error_code,
                        "[ferogram::sender] bad_msg_notification received"
                    );
                    // In every arm `need_resend` is assigned (not OR-ed): true
                    // only when the notification refers to our request, or when
                    // no request id is tracked.
                    match error_code {
                        // Callers correct the time offset from the server msg id.
                        16 | 17 => {
                            *bad_msg_code = Some(error_code);
                            *bad_msg_server_id = Some(server_msg_id);
                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
                        }
                        // Callers correct the sequence number.
                        32 | 33 => {
                            *bad_msg_code = Some(error_code);
                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
                        }
                        48 => {
                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
                            tracing::debug!(
                                "[ferogram::sender] bad_msg code 48 (wrong server salt): will resend with updated salt"
                            );
                        }
                        _ => {
                            *need_resend = sent_msg_id.is_none_or(|id| id == bad_msg_id);
                        }
                    }
                }
                Ok(None)
            }
            // pong: returned as a result only when bytes 4..12 equal our request id.
            0x347773c5 => {
                if body.len() >= 12
                    && let Some(expected) = sent_msg_id
                {
                    let pong_req_id = i64::from_le_bytes(body[4..12].try_into().expect("body.len() >= 12 for pong"));
                    if pong_req_id == expected {
                        return Ok(Some(body.to_vec()));
                    }
                }
                Ok(None)
            }
            // msg_container: count at 4..8, then per message a 16-byte header
            // (body length in its last four bytes) followed by the body.
            0x73f1f8dc => {
                if body.len() < 8 {
                    return Ok(None);
                }
                let count = u32::from_le_bytes(body[4..8].try_into().expect("body.len() >= 8 for msg_container")) as usize;
                let mut pos = 8usize;
                let mut found: Option<Vec<u8>> = None;
                for _ in 0..count {
                    // Stop at the first truncated entry.
                    if pos + 16 > body.len() { break; }
                    let inner_bytes =
                        u32::from_le_bytes(body[pos + 12..pos + 16].try_into().expect("pos+16 <= body.len() checked above")) as usize;
                    pos += 16;
                    if pos + inner_bytes > body.len() { break; }
                    let inner = &body[pos..pos + inner_bytes];
                    pos += inner_bytes;
                    if found.is_none() {
                        if let Some(r) = Self::scan_body(inner, salt, need_resend,
                            need_session_reset, bad_msg_code, bad_msg_server_id, sent_msg_id,
                            server_msg_id)?
                        {
                            found = Some(r);
                        }
                    } else {
                        // A result is already held: keep scanning only for the
                        // side effects on the out-parameters (and for errors).
                        let _ = Self::scan_body(inner, salt, need_resend, need_session_reset,
                            bad_msg_code, bad_msg_server_id, sent_msg_id,
                            server_msg_id)?;
                    }
                }
                Ok(found)
            }
            // gzip-packed body: inflate and scan the contents.
            0x3072cfa1 => {
                if let Some(compressed) = ferogram_connect::tl_read_bytes(&body[4..])
                    && let Ok(decompressed) = ferogram_connect::gz_inflate(&compressed)
                    && !decompressed.is_empty()
                {
                    return Self::scan_body(
                        &decompressed, salt,
                        need_resend, need_session_reset,
                        bad_msg_code, bad_msg_server_id,
                        sent_msg_id,
                        server_msg_id,
                    );
                }
                Ok(None)
            }
            // Anything else is ignored.
            _ => Ok(None),
        }
    }
786
    /// Send a serializable request over the encrypted session and return the
    /// raw bytes of its result.
    ///
    /// This follows the same receive/retry loop as `rpc_call`, but packs the
    /// request with `pack_serializable_with_msg_id`, sends no periodic ping,
    /// does not touch the call counter and records no metrics.
    ///
    /// # Errors
    /// Transport errors, decrypt/unpack errors, an RPC error reported by the
    /// server, more than two session resets, or a fifth bad-salt / bad-message
    /// notification.
    pub async fn rpc_call_serializable<S: ferogram_tl_types::Serializable>(
        &mut self,
        req: &S,
    ) -> Result<Vec<u8>, InvocationError> {
        if !self.pending_acks.is_empty() {
            // Best effort: acknowledge what was received during earlier calls.
            let ack_body = build_msgs_ack_body(&self.pending_acks);
            let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
            let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
            self.pending_acks.clear();
        }
        // `sent_msg_id` is updated on every resend so replies are matched
        // against the latest copy of the request.
        let (wire, mut sent_msg_id) = self.enc.pack_serializable_with_msg_id(req);
        Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
        let mut salt_retries = 0u8;
        let mut session_resets = 0u8;
        loop {
            let mut raw = Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await?;
            let msg = self
                .enc
                .unpack(&mut raw)
                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            self.pending_acks.push(msg.msg_id);
            if self.pending_acks.len() >= PENDING_ACKS_THRESHOLD {
                // Too many unacknowledged messages: flush now (best effort).
                let ack_body = build_msgs_ack_body(&self.pending_acks);
                let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                let _ =
                    Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind).await;
                self.pending_acks.clear();
            }
            if msg.body.len() < 4 {
                // Too short to carry a constructor id: returned as-is.
                return Ok(msg.body);
            }
            // Out-parameters filled in by `scan_body`.
            let mut need_resend = false;
            let mut need_session_reset = false;
            let mut bad_msg_code: Option<u32> = None;
            let mut bad_msg_server_id: Option<i64> = None;
            let scan_result = Self::scan_body(
                &msg.body,
                &mut self.enc.salt,
                &mut need_resend,
                &mut need_session_reset,
                &mut bad_msg_code,
                &mut bad_msg_server_id,
                Some(sent_msg_id),
                msg.msg_id,
            )?;
            if need_session_reset {
                session_resets += 1;
                if session_resets > 2 {
                    return Err(InvocationError::Deserialize(
                        "new_session_created (serializable): exceeded 2 resets".into(),
                    ));
                }
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
                        .await;
                    self.pending_acks.clear();
                }
                // Only resend when the same frame did not also carry the result.
                if scan_result.is_none() {
                    let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
                    sent_msg_id = new_id;
                    Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
                }
            } else if need_resend {
                // Apply the correction matching the reported code before resending.
                match bad_msg_code {
                    Some(16) | Some(17) => {
                        if let Some(srv_id) = bad_msg_server_id {
                            self.enc.correct_time_offset(srv_id);
                        }
                    }
                    Some(32) | Some(33) => {
                        self.enc
                            .correct_seq_no(bad_msg_code.expect("matched Some arm"));
                    }
                    // No code recorded (e.g. a new salt) or any other code.
                    _ => {
                        self.enc.undo_seq_no();
                    }
                }
                salt_retries += 1;
                if salt_retries >= 5 {
                    return Err(InvocationError::Deserialize(
                        "bad_server_salt (serializable): exceeded 5 retries".into(),
                    ));
                }
                tracing::debug!(
                    "[ferogram::sender] resending serializable request after bad_msg correction (code={bad_msg_code:?}, attempt {salt_retries}/5)"
                );
                if !self.pending_acks.is_empty() {
                    let ack_body = build_msgs_ack_body(&self.pending_acks);
                    let (ack_wire, _) = self.enc.pack_body_with_msg_id(&ack_body, false);
                    let _ = Self::send_abridged(&mut self.stream, &ack_wire, &mut self.frame_kind)
                        .await;
                    self.pending_acks.clear();
                }
                let (wire, new_id) = self.enc.pack_serializable_with_msg_id(req);
                sent_msg_id = new_id;
                Self::send_abridged(&mut self.stream, &wire, &mut self.frame_kind).await?;
            }
            if let Some(result) = scan_result {
                return Ok(result);
            }
        }
    }
895
896 pub async fn rpc_call_raw(&mut self, body: &[u8]) -> Result<Vec<u8>, InvocationError> {
899 Self::send_abridged(&mut self.stream, body, &mut self.frame_kind).await?;
900 Self::recv_abridged(&mut self.stream, &mut self.frame_kind).await
901 }
902
903 async fn send_abridged(
906 stream: &mut TcpStream,
907 data: &[u8],
908 kind: &mut FrameKind,
909 ) -> Result<(), InvocationError> {
910 use tokio::io::AsyncWriteExt as _;
911 match kind {
912 FrameKind::Abridged => {
913 let words = data.len() / 4;
914 let mut frame = if words < 0x7f {
915 let mut v = Vec::with_capacity(1 + data.len());
916 v.push(words as u8);
917 v
918 } else {
919 let mut v = Vec::with_capacity(4 + data.len());
920 v.extend_from_slice(&[
921 0x7f,
922 (words & 0xff) as u8,
923 ((words >> 8) & 0xff) as u8,
924 ((words >> 16) & 0xff) as u8,
925 ]);
926 v
927 };
928 frame.extend_from_slice(data);
929 stream.write_all(&frame).await?;
930 }
931 FrameKind::Intermediate => {
932 let mut frame = Vec::with_capacity(4 + data.len());
933 frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
934 frame.extend_from_slice(data);
935 stream.write_all(&frame).await?;
936 }
937 FrameKind::Full { send_seqno, .. } => {
938 let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
939 let total_len = (data.len() as u32) + 12;
940 let mut packet = Vec::with_capacity(total_len as usize);
941 packet.extend_from_slice(&total_len.to_le_bytes());
942 packet.extend_from_slice(&seq.to_le_bytes());
943 packet.extend_from_slice(data);
944 let crc = ferogram_connect::crc32_ieee(&packet);
945 packet.extend_from_slice(&crc.to_le_bytes());
946 stream.write_all(&packet).await?;
947 }
948 FrameKind::Obfuscated { cipher } => {
949 let words = data.len() / 4;
950 let mut frame = if words < 0x7f {
951 let mut v = Vec::with_capacity(1 + data.len());
952 v.push(words as u8);
953 v
954 } else {
955 let mut v = Vec::with_capacity(4 + data.len());
956 v.extend_from_slice(&[
957 0x7f,
958 (words & 0xff) as u8,
959 ((words >> 8) & 0xff) as u8,
960 ((words >> 16) & 0xff) as u8,
961 ]);
962 v
963 };
964 frame.extend_from_slice(data);
965 cipher.lock().await.encrypt(&mut frame);
966 stream.write_all(&frame).await?;
967 }
968 FrameKind::PaddedIntermediate { cipher } => {
969 let mut pad_len_buf = [0u8; 1];
970 ferogram_crypto::fill_random(&mut pad_len_buf);
971 let pad_len = (pad_len_buf[0] & 0x0f) as usize;
972 let total_payload = data.len() + pad_len;
973 let mut frame = Vec::with_capacity(4 + total_payload);
974 frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
975 frame.extend_from_slice(data);
976 let mut pad = vec![0u8; pad_len];
977 ferogram_crypto::fill_random(&mut pad);
978 frame.extend_from_slice(&pad);
979 cipher.lock().await.encrypt(&mut frame);
980 stream.write_all(&frame).await?;
981 }
982 FrameKind::FakeTls { cipher } => {
983 const TLS_APP_DATA: u8 = 0x17;
984 const TLS_VER: [u8; 2] = [0x03, 0x03];
985 const CHUNK: usize = 2878;
986 let mut locked = cipher.lock().await;
987 for chunk in data.chunks(CHUNK) {
988 let chunk_len = chunk.len() as u16;
989 let mut record = Vec::with_capacity(5 + chunk.len());
990 record.push(TLS_APP_DATA);
991 record.extend_from_slice(&TLS_VER);
992 record.extend_from_slice(&chunk_len.to_be_bytes());
993 record.extend_from_slice(chunk);
994 locked.encrypt(&mut record[5..]);
995 stream.write_all(&record).await?;
996 }
997 }
998 }
999 Ok(())
1000 }
1001
    /// Read one frame from `stream`, decoding it according to `kind`, and
    /// return its payload.
    ///
    /// Despite the name, every framing variant is handled. Each individual
    /// read is bounded by a 60-second timeout.
    ///
    /// # Errors
    /// * `InvocationError::Io` on a read error or timeout.
    /// * `InvocationError::Rpc` when the peer sends a transport error code
    ///   (a negative length, or a 4-byte frame holding a negative value).
    /// * `InvocationError::Deserialize` for a malformed `Full` packet (too
    ///   short, CRC mismatch, unexpected sequence number) or an unexpected
    ///   `FakeTls` record type.
    async fn recv_abridged(
        stream: &mut TcpStream,
        kind: &mut FrameKind,
    ) -> Result<Vec<u8>, InvocationError> {
        use tokio::time::{Duration, timeout};
        const RECV_TIMEOUT: Duration = Duration::from_secs(60);

        // `read_exact` with a timeout: the timeout becomes an `Io(TimedOut)`
        // error, and both it and the read error are propagated with `?`.
        macro_rules! tread {
            ($buf:expr) => {
                timeout(RECV_TIMEOUT, stream.read_exact($buf))
                    .await
                    .map_err(|_| {
                        InvocationError::Io(std::io::Error::new(
                            std::io::ErrorKind::TimedOut,
                            "transfer recv: timeout (60 s)",
                        ))
                    })??
            };
        }

        match kind {
            FrameKind::Abridged => {
                // One header byte holds the length in 4-byte words; 0x7f means
                // a 24-bit little-endian word count follows.
                let mut h = [0u8; 1];
                tread!(&mut h);
                let words = if h[0] == 0x7f {
                    let mut b = [0u8; 3];
                    tread!(&mut b);
                    let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
                    // An extended header announcing a single word is reported
                    // as a transport error carrying that word as the code.
                    if w == 1 {
                        let mut code_buf = [0u8; 4];
                        tread!(&mut code_buf);
                        let code = i32::from_le_bytes(code_buf);
                        return Err(InvocationError::Rpc(
                            crate::errors::RpcError::from_telegram(code, "transport error"),
                        ));
                    }
                    w
                } else {
                    h[0] as usize
                };
                let mut buf = vec![0u8; words * 4];
                tread!(&mut buf);
                // A 4-byte payload holding a negative value is a transport error.
                if buf.len() == 4 {
                    let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
                    if code < 0 {
                        return Err(InvocationError::Rpc(
                            crate::errors::RpcError::from_telegram(code, "transport error"),
                        ));
                    }
                }
                Ok(buf)
            }
            FrameKind::Intermediate => {
                // 4-byte little-endian length; a negative value is an error code.
                let mut len_buf = [0u8; 4];
                tread!(&mut len_buf);
                let len_i32 = i32::from_le_bytes(len_buf);
                if len_i32 < 0 {
                    return Err(InvocationError::Rpc(
                        crate::errors::RpcError::from_telegram(len_i32, "transport error"),
                    ));
                }
                let mut buf = vec![0u8; len_i32 as usize];
                tread!(&mut buf);
                Ok(buf)
            }
            FrameKind::Full { recv_seqno, .. } => {
                // Layout: total length (4) | seqno (4) | payload | crc32 (4).
                let mut len_buf = [0u8; 4];
                tread!(&mut len_buf);
                let total_len_i32 = i32::from_le_bytes(len_buf);
                if total_len_i32 < 0 {
                    return Err(InvocationError::Rpc(
                        crate::errors::RpcError::from_telegram(total_len_i32, "transport error"),
                    ));
                }
                let total_len = total_len_i32 as usize;
                if total_len < 12 {
                    return Err(InvocationError::Deserialize(
                        "Full transport: packet too short".into(),
                    ));
                }
                let mut rest = vec![0u8; total_len - 4];
                tread!(&mut rest);
                let (body, crc_bytes) = rest.split_at(rest.len() - 4);
                let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
                // The checksum covers the length field plus everything up to
                // (but excluding) the checksum itself.
                let mut check_input = Vec::with_capacity(4 + body.len());
                check_input.extend_from_slice(&len_buf);
                check_input.extend_from_slice(body);
                let actual_crc = ferogram_connect::crc32_ieee(&check_input);
                if actual_crc != expected_crc {
                    return Err(InvocationError::Deserialize(format!(
                        "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
                    )));
                }
                // The sequence number must match the local receive counter,
                // which is advanced only after a successful check.
                let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
                let expected_seq = recv_seqno.load(std::sync::atomic::Ordering::Relaxed);
                if recv_seq != expected_seq {
                    return Err(InvocationError::Deserialize(format!(
                        "Full transport: seqno mismatch (got {recv_seq}, expected {expected_seq})"
                    )));
                }
                recv_seqno.store(
                    expected_seq.wrapping_add(1),
                    std::sync::atomic::Ordering::Relaxed,
                );
                Ok(body[4..].to_vec())
            }
            FrameKind::Obfuscated { cipher } => {
                // Same layout as `Abridged`, but every chunk is decrypted right
                // after it is read, in the order it arrived.
                let mut h = [0u8; 1];
                tread!(&mut h);
                cipher.lock().await.decrypt(&mut h);
                let words = if h[0] == 0x7f {
                    let mut b = [0u8; 3];
                    tread!(&mut b);
                    cipher.lock().await.decrypt(&mut b);
                    let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
                    if w == 1 {
                        let mut code_buf = [0u8; 4];
                        tread!(&mut code_buf);
                        cipher.lock().await.decrypt(&mut code_buf);
                        let code = i32::from_le_bytes(code_buf);
                        return Err(InvocationError::Rpc(
                            crate::errors::RpcError::from_telegram(code, "transport error"),
                        ));
                    }
                    w
                } else {
                    h[0] as usize
                };
                let mut buf = vec![0u8; words * 4];
                tread!(&mut buf);
                cipher.lock().await.decrypt(&mut buf);
                if buf.len() == 4 {
                    let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
                    if code < 0 {
                        return Err(InvocationError::Rpc(
                            crate::errors::RpcError::from_telegram(code, "transport error"),
                        ));
                    }
                }
                Ok(buf)
            }
            FrameKind::PaddedIntermediate { cipher } => {
                // Encrypted 4-byte length followed by the encrypted payload.
                let mut len_buf = [0u8; 4];
                tread!(&mut len_buf);
                cipher.lock().await.decrypt(&mut len_buf);
                let total_len = i32::from_le_bytes(len_buf);
                if total_len < 0 {
                    return Err(InvocationError::Rpc(
                        crate::errors::RpcError::from_telegram(total_len, "transport error"),
                    ));
                }
                let mut buf = vec![0u8; total_len as usize];
                tread!(&mut buf);
                cipher.lock().await.decrypt(&mut buf);
                // Drop the trailing bytes that do not fit "24 bytes + a whole
                // number of 16-byte blocks".
                // NOTE(review): presumably this removes the sender's random
                // padding from an encrypted message — confirm the layout
                // assumption, since plaintext frames may not follow it.
                if buf.len() >= 24 {
                    let pad = (buf.len() - 24) % 16;
                    buf.truncate(buf.len() - pad);
                }
                Ok(buf)
            }
            FrameKind::FakeTls { cipher } => {
                // 5-byte record header: type, two version bytes, big-endian
                // body length. Only record type 0x17 is accepted.
                // NOTE(review): exactly one record is read per call, while the
                // send side splits payloads into records of at most 2878 bytes
                // — confirm larger replies are reassembled elsewhere.
                let mut hdr = [0u8; 5];
                tread!(&mut hdr);
                if hdr[0] != 0x17 {
                    return Err(InvocationError::Deserialize(format!(
                        "FakeTLS: unexpected record type 0x{:02x}",
                        hdr[0]
                    )));
                }
                let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
                let mut buf = vec![0u8; payload_len];
                tread!(&mut buf);
                cipher.lock().await.decrypt(&mut buf);
                Ok(buf)
            }
        }
    }
1180
1181 async fn send_plain_frame(
1184 stream: &mut TcpStream,
1185 data: &[u8],
1186 kind: &mut FrameKind,
1187 ) -> Result<(), InvocationError> {
1188 let needs_align = matches!(kind, FrameKind::Abridged | FrameKind::Obfuscated { .. });
1191 if needs_align && !data.len().is_multiple_of(4) {
1192 let mut padded = data.to_vec();
1193 let pad = 4 - (data.len() % 4);
1194 padded.resize(data.len() + pad, 0);
1195 Self::send_abridged(stream, &padded, kind).await
1196 } else {
1197 Self::send_abridged(stream, data, kind).await
1198 }
1199 }
1200
1201 async fn recv_plain_frame<T: Deserializable>(
1202 stream: &mut TcpStream,
1203 kind: &mut FrameKind,
1204 ) -> Result<T, InvocationError> {
1205 let raw = Self::recv_abridged(stream, kind).await?;
1206 if raw.len() == 4 {
1207 let code = i32::from_le_bytes(raw[..4].try_into().unwrap());
1208 if code < 0 {
1209 return Err(InvocationError::Deserialize(format!(
1210 "server transport error during DH: code {code}"
1211 )));
1212 }
1213 }
1214 if raw.len() < 20 {
1215 return Err(InvocationError::Deserialize("plain frame too short".into()));
1216 }
1217 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
1218 return Err(InvocationError::Deserialize(
1219 "expected auth_key_id=0 in plaintext".into(),
1220 ));
1221 }
1222 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
1223 if raw.len() < 20 + body_len {
1224 return Err(InvocationError::Deserialize(format!(
1225 "plain frame truncated: have {} bytes, need {}",
1226 raw.len(),
1227 20 + body_len
1228 )));
1229 }
1230 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
1231 T::deserialize(&mut cur).map_err(Into::into)
1232 }
1233}