Skip to main content

ferogram_connect/
frame.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2//
3// ferogram: async Telegram MTProto client in Rust
4// https://github.com/ankit-chaubey/ferogram
5//
6// Licensed under either the MIT License or the Apache License 2.0.
7// See the LICENSE-MIT or LICENSE-APACHE file in this repository:
8// https://github.com/ankit-chaubey/ferogram
9//
10// Feel free to use, modify, and share this code.
11// Please keep this notice when redistributing.
12
13use tokio::io::{AsyncReadExt, AsyncWriteExt};
14use tokio::net::TcpStream;
15use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
16use tokio::time::Duration;
17
18use ferogram_tl_types as tl;
19use ferogram_tl_types::{Cursor, Deserializable};
20
21use crate::connection::{FrameKind, NO_PING_DISCONNECT, PING_DELAY_SECS};
22use crate::error::ConnectError;
23use crate::transport::{recv_abridged, send_abridged};
24use crate::util::random_i64;
25
/// Outcome of a timed frame read attempt.
///
/// Returned by `recv_frame_with_keepalive`, which either reads a frame before
/// its timeout elapses or sends a keepalive ping instead.
pub enum FrameOutcome {
    /// A frame was read before the timeout; carries the bytes returned by
    /// `recv_frame_read`.
    Frame(Vec<u8>),
    /// Reading the frame failed, or the keepalive ping could not be written.
    Error(ConnectError),
    /// The read timed out and the keepalive ping was written successfully.
    Keepalive,
}
32
33/// Send a framed message using the active transport kind.
34pub async fn send_frame(
35    stream: &mut TcpStream,
36    data: &[u8],
37    kind: &FrameKind,
38) -> Result<(), ConnectError> {
39    match kind {
40        FrameKind::Abridged => send_abridged(stream, data).await,
41        FrameKind::Intermediate => {
42            let mut frame = Vec::with_capacity(4 + data.len());
43            frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
44            frame.extend_from_slice(data);
45            stream.write_all(&frame).await?;
46            Ok(())
47        }
48        FrameKind::Full { send_seqno, .. } => {
49            // Full: [total_len(4)][seq(4)][payload][crc32(4)]
50            // total_len covers all 4 fields including itself.
51            let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
52            let total_len = (data.len() as u32) + 12;
53            let mut packet = Vec::with_capacity(total_len as usize);
54            packet.extend_from_slice(&total_len.to_le_bytes());
55            packet.extend_from_slice(&seq.to_le_bytes());
56            packet.extend_from_slice(data);
57            let crc = crate::util::crc32_ieee(&packet);
58            packet.extend_from_slice(&crc.to_le_bytes());
59            stream.write_all(&packet).await?;
60            Ok(())
61        }
62        FrameKind::Obfuscated { cipher } => {
63            // Abridged framing with AES-256-CTR encryption over the whole frame.
64            let words = data.len() / 4;
65            let mut frame = if words < 0x7f {
66                let mut v = Vec::with_capacity(1 + data.len());
67                v.push(words as u8);
68                v
69            } else {
70                let mut v = Vec::with_capacity(4 + data.len());
71                v.extend_from_slice(&[
72                    0x7f,
73                    (words & 0xff) as u8,
74                    ((words >> 8) & 0xff) as u8,
75                    ((words >> 16) & 0xff) as u8,
76                ]);
77                v
78            };
79            frame.extend_from_slice(data);
80            cipher.lock().await.encrypt(&mut frame);
81            stream.write_all(&frame).await?;
82            Ok(())
83        }
84        FrameKind::PaddedIntermediate { cipher } => {
85            // Intermediate framing + 0-15 random padding bytes, encrypted.
86            let mut pad_len_buf = [0u8; 1];
87            getrandom::getrandom(&mut pad_len_buf).ok();
88            let pad_len = (pad_len_buf[0] & 0x0f) as usize;
89            let total_payload = data.len() + pad_len;
90            let mut frame = Vec::with_capacity(4 + total_payload);
91            frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
92            frame.extend_from_slice(data);
93            let mut pad = vec![0u8; pad_len];
94            getrandom::getrandom(&mut pad).ok();
95            frame.extend_from_slice(&pad);
96            cipher.lock().await.encrypt(&mut frame);
97            stream.write_all(&frame).await?;
98            Ok(())
99        }
100        FrameKind::FakeTls { cipher } => {
101            // Wrap each MTProto message as a TLS Application Data record (type 0x17).
102            // Telegram's FakeTLS sends one MTProto frame per TLS record, encrypted
103            // with the Obfuscated2 cipher (no real TLS encryption).
104            const TLS_APP_DATA: u8 = 0x17;
105            const TLS_VER: [u8; 2] = [0x03, 0x03];
106            // Split into 2878-byte chunks per TLS record framing.
107            const CHUNK: usize = 2878;
108            let mut locked = cipher.lock().await;
109            for chunk in data.chunks(CHUNK) {
110                let chunk_len = chunk.len() as u16;
111                let mut record = Vec::with_capacity(5 + chunk.len());
112                record.push(TLS_APP_DATA);
113                record.extend_from_slice(&TLS_VER);
114                record.extend_from_slice(&chunk_len.to_be_bytes());
115                record.extend_from_slice(chunk);
116                // Encrypt only the payload portion (after the 5-byte header).
117                locked.encrypt(&mut record[5..]);
118                stream.write_all(&record).await?;
119            }
120            Ok(())
121        }
122    }
123}
124
125// Split-reader helpers
126
127/// Read one frame with a 60-second keepalive timeout (PING_DELAY_SECS).
128///
129/// If the timeout fires we send a `PingDelayDisconnect`: this tells Telegram
130/// to forcibly close the connection after `NO_PING_DISCONNECT` seconds of
131/// silence, giving us a clean EOF to detect rather than a silently stale socket.
132/// That mirrors what both  and the official Telegram clients do.
133pub async fn recv_frame_with_keepalive(
134    rh: &mut OwnedReadHalf,
135    fk: &FrameKind,
136    writer: &tokio::sync::Mutex<crate::connection::ConnectionWriter>,
137    write_half: &tokio::sync::Mutex<OwnedWriteHalf>,
138) -> FrameOutcome {
139    match tokio::time::timeout(
140        Duration::from_secs(PING_DELAY_SECS),
141        recv_frame_read(rh, fk),
142    )
143    .await
144    {
145        Ok(Ok(raw)) => FrameOutcome::Frame(raw),
146        Ok(Err(e)) => FrameOutcome::Error(e),
147        Err(_) => {
148            // Keepalive timeout: send PingDelayDisconnect so Telegram closes the
149            // connection cleanly (EOF) if it hears nothing for NO_PING_DISCONNECT
150            // seconds, rather than leaving a silently stale socket.
151            let ping_req = tl::functions::PingDelayDisconnect {
152                ping_id: random_i64(),
153                disconnect_delay: NO_PING_DISCONNECT,
154            };
155            let (wire, fk) = {
156                let mut w = writer.lock().await;
157                let fk = w.frame_kind.clone();
158                (w.enc.pack(&ping_req), fk)
159            };
160            match send_frame_write(&mut *write_half.lock().await, &wire, &fk).await {
161                Ok(()) => FrameOutcome::Keepalive,
162                Err(e) => FrameOutcome::Error(e),
163            }
164        }
165    }
166}
167
168/// Send a framed message via an OwnedWriteHalf (split connection).
169///
170/// Header and payload are combined into a single Vec before calling
171/// write_all, reducing write syscalls from 2 -> 1 per frame.  With Abridged
172/// framing this previously sent a 1-byte header then the payload in separate
173/// syscalls (and two TCP segments even with TCP_NODELAY on fast paths).
174pub async fn send_frame_write(
175    stream: &mut OwnedWriteHalf,
176    data: &[u8],
177    kind: &FrameKind,
178) -> Result<(), ConnectError> {
179    match kind {
180        FrameKind::Abridged => {
181            let words = data.len() / 4;
182            // Build header + payload in one allocation -> single syscall.
183            let mut frame = if words < 0x7f {
184                let mut v = Vec::with_capacity(1 + data.len());
185                v.push(words as u8);
186                v
187            } else {
188                let mut v = Vec::with_capacity(4 + data.len());
189                v.extend_from_slice(&[
190                    0x7f,
191                    (words & 0xff) as u8,
192                    ((words >> 8) & 0xff) as u8,
193                    ((words >> 16) & 0xff) as u8,
194                ]);
195                v
196            };
197            frame.extend_from_slice(data);
198            stream.write_all(&frame).await?;
199            Ok(())
200        }
201        FrameKind::Intermediate => {
202            let mut frame = Vec::with_capacity(4 + data.len());
203            frame.extend_from_slice(&(data.len() as u32).to_le_bytes());
204            frame.extend_from_slice(data);
205            stream.write_all(&frame).await?;
206            Ok(())
207        }
208        FrameKind::Full { send_seqno, .. } => {
209            // Full: [total_len(4)][seq(4)][payload][crc32(4)]
210            let seq = send_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
211            let total_len = (data.len() as u32) + 12;
212            let mut packet = Vec::with_capacity(total_len as usize);
213            packet.extend_from_slice(&total_len.to_le_bytes());
214            packet.extend_from_slice(&seq.to_le_bytes());
215            packet.extend_from_slice(data);
216            let crc = crate::util::crc32_ieee(&packet);
217            packet.extend_from_slice(&crc.to_le_bytes());
218            stream.write_all(&packet).await?;
219            Ok(())
220        }
221        FrameKind::Obfuscated { cipher } => {
222            // Abridged framing + AES-256-CTR encryption (cipher stored).
223            let words = data.len() / 4;
224            let mut frame = if words < 0x7f {
225                let mut v = Vec::with_capacity(1 + data.len());
226                v.push(words as u8);
227                v
228            } else {
229                let mut v = Vec::with_capacity(4 + data.len());
230                v.extend_from_slice(&[
231                    0x7f,
232                    (words & 0xff) as u8,
233                    ((words >> 8) & 0xff) as u8,
234                    ((words >> 16) & 0xff) as u8,
235                ]);
236                v
237            };
238            frame.extend_from_slice(data);
239            cipher.lock().await.encrypt(&mut frame);
240            stream.write_all(&frame).await?;
241            Ok(())
242        }
243        FrameKind::PaddedIntermediate { cipher } => {
244            let mut pad_len_buf = [0u8; 1];
245            getrandom::getrandom(&mut pad_len_buf).ok();
246            let pad_len = (pad_len_buf[0] & 0x0f) as usize;
247            let total_payload = data.len() + pad_len;
248            let mut frame = Vec::with_capacity(4 + total_payload);
249            frame.extend_from_slice(&(total_payload as u32).to_le_bytes());
250            frame.extend_from_slice(data);
251            let mut pad = vec![0u8; pad_len];
252            getrandom::getrandom(&mut pad).ok();
253            frame.extend_from_slice(&pad);
254            cipher.lock().await.encrypt(&mut frame);
255            stream.write_all(&frame).await?;
256            Ok(())
257        }
258        FrameKind::FakeTls { cipher } => {
259            const TLS_APP_DATA: u8 = 0x17;
260            const TLS_VER: [u8; 2] = [0x03, 0x03];
261            const CHUNK: usize = 2878;
262            let mut locked = cipher.lock().await;
263            for chunk in data.chunks(CHUNK) {
264                let chunk_len = chunk.len() as u16;
265                let mut record = Vec::with_capacity(5 + chunk.len());
266                record.push(TLS_APP_DATA);
267                record.extend_from_slice(&TLS_VER);
268                record.extend_from_slice(&chunk_len.to_be_bytes());
269                record.extend_from_slice(chunk);
270                locked.encrypt(&mut record[5..]);
271                stream.write_all(&record).await?;
272            }
273            Ok(())
274        }
275    }
276}
277
278/// Receive a framed message via an OwnedReadHalf (split connection).
279pub async fn recv_frame_read(
280    stream: &mut OwnedReadHalf,
281    kind: &FrameKind,
282) -> Result<Vec<u8>, ConnectError> {
283    match kind {
284        FrameKind::Abridged => {
285            // h[0] ranges: 0x00-0x7e = word count, 0x7f = extended, 0x80-0xFF = transport error
286            let mut h = [0u8; 1];
287            stream.read_exact(&mut h).await?;
288            let words = if h[0] < 0x7f {
289                h[0] as usize
290            } else if h[0] == 0x7f {
291                let mut b = [0u8; 3];
292                stream.read_exact(&mut b).await?;
293                let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
294                if w > 4 * 1024 * 1024 {
295                    return Err(ConnectError::other(format!(
296                        "abridged: implausible word count {w}"
297                    )));
298                }
299                w
300            } else {
301                let mut rest = [0u8; 3];
302                stream.read_exact(&mut rest).await?;
303                let code = i32::from_le_bytes([h[0], rest[0], rest[1], rest[2]]);
304                return Err(ConnectError::TransportCode(code));
305            };
306            if words == 0 {
307                return Err(ConnectError::other("abridged: zero-length frame"));
308            }
309            let mut buf = vec![0u8; words * 4];
310            stream.read_exact(&mut buf).await?;
311            if words == 1 {
312                let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
313                if code < 0 {
314                    return Err(ConnectError::TransportCode(code));
315                }
316            }
317            Ok(buf)
318        }
319        FrameKind::Intermediate => {
320            let mut len_buf = [0u8; 4];
321            stream.read_exact(&mut len_buf).await?;
322            let len_i32 = i32::from_le_bytes(len_buf);
323            if len_i32 < 0 {
324                return Err(ConnectError::TransportCode(len_i32));
325            }
326            if len_i32 <= 4 {
327                let mut code_buf = [0u8; 4];
328                stream.read_exact(&mut code_buf).await?;
329                let code = i32::from_le_bytes(code_buf);
330                return Err(ConnectError::TransportCode(code));
331            }
332            let len = len_i32 as usize;
333            let mut buf = vec![0u8; len];
334            stream.read_exact(&mut buf).await?;
335            Ok(buf)
336        }
337        FrameKind::Full { recv_seqno, .. } => {
338            let mut len_buf = [0u8; 4];
339            stream.read_exact(&mut len_buf).await?;
340            let total_len_i32 = i32::from_le_bytes(len_buf);
341            if total_len_i32 < 0 {
342                return Err(ConnectError::TransportCode(total_len_i32));
343            }
344            let total_len = total_len_i32 as usize;
345            if total_len < 12 {
346                return Err(ConnectError::other("Full transport: packet too short"));
347            }
348            let mut rest = vec![0u8; total_len - 4];
349            stream.read_exact(&mut rest).await?;
350            let (body, crc_bytes) = rest.split_at(rest.len() - 4);
351            let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
352            let mut check_input = Vec::with_capacity(4 + body.len());
353            check_input.extend_from_slice(&len_buf);
354            check_input.extend_from_slice(body);
355            let actual_crc = crate::util::crc32_ieee(&check_input);
356            if actual_crc != expected_crc {
357                return Err(ConnectError::other(format!(
358                    "Full transport: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
359                )));
360            }
361            let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
362            let expected_seq = recv_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
363            if recv_seq != expected_seq {
364                return Err(ConnectError::other(format!(
365                    "Full transport: seqno mismatch (got {recv_seq}, expected {expected_seq})"
366                )));
367            }
368            Ok(body[4..].to_vec())
369        }
370        FrameKind::Obfuscated { cipher } => {
371            let mut h = [0u8; 1];
372            stream.read_exact(&mut h).await?;
373            cipher.lock().await.decrypt(&mut h);
374            let words = if h[0] < 0x7f {
375                h[0] as usize
376            } else if h[0] == 0x7f {
377                let mut b = [0u8; 3];
378                stream.read_exact(&mut b).await?;
379                cipher.lock().await.decrypt(&mut b);
380                let w = b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16;
381                if w > 4 * 1024 * 1024 {
382                    return Err(ConnectError::other(format!(
383                        "obfuscated: implausible word count {w}"
384                    )));
385                }
386                w
387            } else {
388                let mut rest = [0u8; 3];
389                stream.read_exact(&mut rest).await?;
390                cipher.lock().await.decrypt(&mut rest);
391                let code = i32::from_le_bytes([h[0], rest[0], rest[1], rest[2]]);
392                return Err(ConnectError::TransportCode(code));
393            };
394            if words == 0 {
395                return Err(ConnectError::other("obfuscated: zero-length frame"));
396            }
397            let mut buf = vec![0u8; words * 4];
398            stream.read_exact(&mut buf).await?;
399            cipher.lock().await.decrypt(&mut buf);
400            if words == 1 {
401                let code = i32::from_le_bytes(buf[..4].try_into().unwrap());
402                if code < 0 {
403                    return Err(ConnectError::TransportCode(code));
404                }
405            }
406            Ok(buf)
407        }
408        FrameKind::PaddedIntermediate { cipher } => {
409            // Read 4-byte encrypted length prefix, then payload+padding.
410            let mut len_buf = [0u8; 4];
411            stream.read_exact(&mut len_buf).await?;
412            cipher.lock().await.decrypt(&mut len_buf);
413            let total_len = i32::from_le_bytes(len_buf);
414            if total_len < 0 {
415                return Err(ConnectError::TransportCode(total_len));
416            }
417            let mut buf = vec![0u8; total_len as usize];
418            stream.read_exact(&mut buf).await?;
419            cipher.lock().await.decrypt(&mut buf);
420            if buf.len() >= 24 {
421                let pad = (buf.len() - 24) % 16;
422                buf.truncate(buf.len() - pad);
423            }
424            Ok(buf)
425        }
426        FrameKind::FakeTls { cipher } => {
427            // Read TLS Application Data record: 5-byte header + payload.
428            let mut hdr = [0u8; 5];
429            stream.read_exact(&mut hdr).await?;
430            if hdr[0] != 0x17 {
431                return Err(ConnectError::other(format!(
432                    "FakeTLS: unexpected record type 0x{:02x}",
433                    hdr[0]
434                )));
435            }
436            let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
437            let mut buf = vec![0u8; payload_len];
438            stream.read_exact(&mut buf).await?;
439            cipher.lock().await.decrypt(&mut buf);
440            Ok(buf)
441        }
442    }
443}
444
445/// Receive a plaintext (pre-auth) frame and deserialize it.
446pub async fn recv_frame_plain<T: Deserializable>(
447    stream: &mut TcpStream,
448    kind: &FrameKind,
449) -> Result<T, ConnectError> {
450    // DH handshake uses the same transport framing as all other frames.
451    let raw = match kind {
452        FrameKind::Abridged => recv_abridged(stream).await?,
453        FrameKind::Intermediate => {
454            let mut len_buf = [0u8; 4];
455            stream.read_exact(&mut len_buf).await?;
456            let len = u32::from_le_bytes(len_buf) as usize;
457            if len == 0 || len > 1 << 24 {
458                return Err(ConnectError::other(format!(
459                    "plaintext frame: implausible length {len}"
460                )));
461            }
462            let mut buf = vec![0u8; len];
463            stream.read_exact(&mut buf).await?;
464            buf
465        }
466        FrameKind::Full { recv_seqno, .. } => {
467            // Full: [total_len(4)][seq(4)][payload][crc32(4)]
468            let mut len_buf = [0u8; 4];
469            stream.read_exact(&mut len_buf).await?;
470            let total_len = u32::from_le_bytes(len_buf) as usize;
471            if !(12..=(1 << 24) + 12).contains(&total_len) {
472                return Err(ConnectError::other(format!(
473                    "Full plaintext frame: implausible total_len {total_len}"
474                )));
475            }
476            let mut rest = vec![0u8; total_len - 4];
477            stream.read_exact(&mut rest).await?;
478
479            // Verify CRC-32.
480            let (body, crc_bytes) = rest.split_at(rest.len() - 4);
481            let expected_crc = u32::from_le_bytes(crc_bytes.try_into().unwrap());
482            let mut check_input = Vec::with_capacity(4 + body.len());
483            check_input.extend_from_slice(&len_buf);
484            check_input.extend_from_slice(body);
485            let actual_crc = crate::util::crc32_ieee(&check_input);
486            if actual_crc != expected_crc {
487                return Err(ConnectError::other(format!(
488                    "Full plaintext: CRC mismatch (got {actual_crc:#010x}, expected {expected_crc:#010x})"
489                )));
490            }
491
492            // Validate and advance seqno.
493            let recv_seq = u32::from_le_bytes(body[..4].try_into().unwrap());
494            let expected_seq = recv_seqno.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
495            if recv_seq != expected_seq {
496                return Err(ConnectError::other(format!(
497                    "Full plaintext: seqno mismatch (got {recv_seq}, expected {expected_seq})"
498                )));
499            }
500
501            body[4..].to_vec()
502        }
503        FrameKind::Obfuscated { cipher } => {
504            // Obfuscated2: Abridged framing with AES-256-CTR decryption.
505            let mut h = [0u8; 1];
506            stream.read_exact(&mut h).await?;
507            cipher.lock().await.decrypt(&mut h);
508            let words = if h[0] < 0x7f {
509                h[0] as usize
510            } else {
511                let mut b = [0u8; 3];
512                stream.read_exact(&mut b).await?;
513                cipher.lock().await.decrypt(&mut b);
514                b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
515            };
516            let mut buf = vec![0u8; words * 4];
517            stream.read_exact(&mut buf).await?;
518            cipher.lock().await.decrypt(&mut buf);
519            buf
520        }
521        FrameKind::PaddedIntermediate { cipher } => {
522            let mut len_buf = [0u8; 4];
523            stream.read_exact(&mut len_buf).await?;
524            cipher.lock().await.decrypt(&mut len_buf);
525            let len = u32::from_le_bytes(len_buf) as usize;
526            if len == 0 || len > 1 << 24 {
527                return Err(ConnectError::other(format!(
528                    "PaddedIntermediate plaintext: implausible length {len}"
529                )));
530            }
531            let mut buf = vec![0u8; len];
532            stream.read_exact(&mut buf).await?;
533            cipher.lock().await.decrypt(&mut buf);
534            buf
535        }
536        FrameKind::FakeTls { cipher } => {
537            let mut hdr = [0u8; 5];
538            stream.read_exact(&mut hdr).await?;
539            if hdr[0] != 0x17 {
540                return Err(ConnectError::other(format!(
541                    "FakeTLS plaintext: unexpected record type 0x{:02x}",
542                    hdr[0]
543                )));
544            }
545            let payload_len = u16::from_be_bytes([hdr[3], hdr[4]]) as usize;
546            let mut buf = vec![0u8; payload_len];
547            stream.read_exact(&mut buf).await?;
548            cipher.lock().await.decrypt(&mut buf);
549            buf
550        }
551    };
552    if raw.len() < 20 {
553        return Err(ConnectError::other("plaintext frame too short"));
554    }
555    if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
556        return Err(ConnectError::other("expected auth_key_id=0 in plaintext"));
557    }
558    let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
559    if 20 + body_len > raw.len() {
560        return Err(ConnectError::other(
561            "plaintext frame: body_len exceeds frame size",
562        ));
563    }
564    let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
565    T::deserialize(&mut cur).map_err(Into::into)
566}