Skip to main content

layer_client/
dc_pool.rs

1// Copyright (c) Ankit Chaubey <ankitchaubey.dev@gmail.com>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4// NOTE:
5// The "Layer" project is no longer maintained or supported.
6// Its original purpose for personal SDK/APK experimentation and learning
7// has been fulfilled.
8//
9// Please use Ferogram instead:
10// https://github.com/ankit-chaubey/ferogram
11// Ferogram will receive future updates and development, although progress
12// may be slower.
13//
14// Ferogram is an async Telegram MTProto client library written in Rust.
15// Its implementation follows the behaviour of the official Telegram clients,
16// particularly Telegram Desktop and TDLib, and aims to provide a clean and
17// modern async interface for building Telegram clients and tools.
18
19//! Multi-DC connection pool.
20//!
21//! Maintains one authenticated [`DcConnection`] per DC ID and routes RPC calls
22//! to the correct DC automatically.  Auth keys are shared from the home DC via
23//! `auth.exportAuthorization` / `auth.importAuthorization`.
24
25use layer_mtproto::{EncryptedSession, Session, authentication as auth};
26use layer_tl_types as tl;
27use layer_tl_types::{Cursor, Deserializable, RemoteCall};
28use std::collections::HashMap;
29use tokio::io::{AsyncReadExt, AsyncWriteExt};
30use tokio::net::TcpStream;
31
32use crate::{InvocationError, TransportKind, session::DcEntry};
33
34// DcConnection
35
/// A single encrypted connection to one Telegram DC.
///
/// Pairs the raw TCP socket with the MTProto encrypted-session state used to
/// pack outgoing requests and unpack incoming messages on that socket.
pub struct DcConnection {
    /// Underlying TCP socket (opened directly or through a SOCKS5 proxy).
    stream: TcpStream,
    /// Encrypted-session state built from the auth key, server salt and time offset.
    enc: EncryptedSession,
}
41
42impl DcConnection {
    /// Connect to `addr` and perform the full DH key-exchange handshake.
    ///
    /// Steps, in order:
    /// 1. open the TCP stream (through `socks5` when given) and write the
    ///    transport init bytes for `transport`;
    /// 2. run `auth::step1` / `step2` / `step3`, sending each request as a
    ///    plaintext frame and reading the matching plaintext answer;
    /// 3. call `auth::finish`, re-sending the client DH params while the
    ///    server keeps answering with a retry request;
    /// 4. wrap the resulting auth key, first salt and time offset in an
    ///    [`EncryptedSession`].
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from the socket. Every failure reported by the
    /// `auth` steps is surfaced as `InvocationError::Deserialize` carrying the
    /// step's error text, as is exhausting the retry budget.
    pub async fn connect_raw(
        addr: &str,
        socks5: Option<&crate::socks5::Socks5Config>,
        transport: &TransportKind,
        dc_id: i16,
    ) -> Result<Self, InvocationError> {
        tracing::debug!("[dc_pool] Connecting to {addr} …");
        let mut stream = Self::open_tcp(addr, socks5).await?;
        Self::send_transport_init(&mut stream, transport, dc_id).await?;

        // Plaintext (unencrypted) session used only to frame the handshake messages.
        let mut plain = Session::new();

        // Step 1: request -> ResPq.
        let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes()).await?;
        let res_pq: tl::enums::ResPq = Self::recv_plain_frame(&mut stream).await?;

        // Step 2: consumes the step-1 state and the server's ResPq -> ServerDhParams.
        let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes()).await?;
        let dh: tl::enums::ServerDhParams = Self::recv_plain_frame(&mut stream).await?;

        // Step 3: consumes the step-2 state and the DH params -> SetClientDhParamsAnswer.
        let (req3, s3) =
            auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
        Self::send_plain_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes()).await?;
        let ans: tl::enums::SetClientDhParamsAnswer = Self::recv_plain_frame(&mut stream).await?;

        // Retry loop for dh_gen_retry (up to 5 attempts, mirroring tDesktop).
        // `attempts` counts Retry answers; the fifth one aborts, so at most
        // four re-sends happen after the initial step-3 request.
        let done = {
            let mut result =
                auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
            let mut attempts = 0u8;
            loop {
                match result {
                    // Handshake finished: `d` carries the key material used below.
                    auth::FinishResult::Done(d) => break d,
                    auth::FinishResult::Retry {
                        retry_id,
                        dh_params,
                        nonce,
                        server_nonce,
                        new_nonce,
                    } => {
                        attempts += 1;
                        if attempts >= 5 {
                            return Err(InvocationError::Deserialize(
                                "dh_gen_retry exceeded 5 attempts".into(),
                            ));
                        }
                        // Rebuild the step-3 request from the values handed back by `finish`.
                        let (req_retry, s3_retry) =
                            auth::retry_step3(&dh_params, nonce, server_nonce, new_nonce, retry_id)
                                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
                        Self::send_plain_frame(
                            &mut stream,
                            &plain.pack(&req_retry).to_plaintext_bytes(),
                        )
                        .await?;
                        let ans_retry: tl::enums::SetClientDhParamsAnswer =
                            Self::recv_plain_frame(&mut stream).await?;
                        result = auth::finish(s3_retry, ans_retry)
                            .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
                    }
                }
            }
        };
        tracing::debug!("[dc_pool] DH complete ✓ for {addr}");

        Ok(Self {
            stream,
            enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
        })
    }
114
115    /// Connect with an already-known auth key (no DH needed).
116    pub async fn connect_with_key(
117        addr: &str,
118        auth_key: [u8; 256],
119        first_salt: i64,
120        time_offset: i32,
121        socks5: Option<&crate::socks5::Socks5Config>,
122        transport: &TransportKind,
123        dc_id: i16,
124    ) -> Result<Self, InvocationError> {
125        let mut stream = Self::open_tcp(addr, socks5).await?;
126        Self::send_transport_init(&mut stream, transport, dc_id).await?;
127        Ok(Self {
128            stream,
129            enc: EncryptedSession::new(auth_key, first_salt, time_offset),
130        })
131    }
132
133    async fn open_tcp(
134        addr: &str,
135        socks5: Option<&crate::socks5::Socks5Config>,
136    ) -> Result<TcpStream, InvocationError> {
137        match socks5 {
138            Some(proxy) => proxy.connect(addr).await,
139            None => Ok(TcpStream::connect(addr).await?),
140        }
141    }
142
    /// Write the one-time transport header that selects the framing on `stream`.
    ///
    /// * `Abridged`: a single `0xef` byte.
    /// * `Intermediate`: four `0xee` bytes.
    /// * `Full`: nothing is written.
    /// * `Obfuscated`: a 64-byte random init packet whose last 8 bytes are
    ///   encrypted and embed the abridged tag plus `dc_id`.
    /// * `PaddedIntermediate` / `FakeTls`: treated as `Abridged` (see the
    ///   comment on that arm).
    ///
    /// NOTE(review): every frame sent afterwards by this type goes through
    /// `send_abridged` / `recv_abridged` regardless of `transport`, so only the
    /// abridged-tagged variants look self-consistent from this file. Confirm
    /// whether `Intermediate` and `Full` are actually used on this path.
    ///
    /// # Errors
    ///
    /// Returns socket write errors, or `InvocationError::Deserialize("getrandom")`
    /// when the OS random source fails.
    async fn send_transport_init(
        stream: &mut TcpStream,
        transport: &TransportKind,
        dc_id: i16,
    ) -> Result<(), InvocationError> {
        match transport {
            TransportKind::Abridged => {
                stream.write_all(&[0xef]).await?;
            }
            TransportKind::Intermediate => {
                stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
            }
            TransportKind::Full => {}
            TransportKind::Obfuscated { secret } => {
                use sha2::Digest;
                let mut nonce = [0u8; 64];
                // Re-roll the nonce until its prefix cannot be mistaken for another
                // protocol: a leading 0xEF (abridged tag), the little-endian words
                // "HEAD", "POST", "GET ", 0xEEEEEEEE, 0xDDDDDDDD, the bytes
                // 16 03 01 02 (a TLS record header), or an all-zero second word.
                loop {
                    getrandom::getrandom(&mut nonce)
                        .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
                    let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
                    let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
                    let bad = nonce[0] == 0xEF
                        || first == 0x44414548
                        || first == 0x54534F50
                        || first == 0x20544547
                        || first == 0xEEEEEEEE
                        || first == 0xDDDDDDDD
                        || first == 0x02010316
                        || second == 0x00000000;
                    if !bad {
                        break;
                    }
                }
                // Send direction: key = nonce[8..40], IV = nonce[40..56].
                let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
                let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
                // Receive direction: the same 48 bytes reversed, split the same way.
                let mut rev48 = nonce[8..56].to_vec();
                rev48.reverse();
                let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
                let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
                // With a proxy secret, each key becomes SHA-256(raw_key || secret);
                // without one the raw nonce slices are used directly.
                let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
                    let mut h = sha2::Sha256::new();
                    h.update(tx_raw);
                    h.update(s.as_ref());
                    let tx: [u8; 32] = h.finalize().into();
                    let mut h = sha2::Sha256::new();
                    h.update(rx_raw);
                    h.update(s.as_ref());
                    let rx: [u8; 32] = h.finalize().into();
                    (tx, rx)
                } else {
                    (tx_raw, rx_raw)
                };
                // Bytes 56..60 carry the abridged protocol tag; the keys above were
                // already captured from bytes 8..56, so this does not affect them.
                nonce[56] = 0xef;
                nonce[57] = 0xef;
                nonce[58] = 0xef;
                nonce[59] = 0xef;
                // Bytes 60..62 carry the DC id, little-endian.
                let dc_bytes = dc_id.to_le_bytes();
                nonce[60] = dc_bytes[0];
                nonce[61] = dc_bytes[1];
                {
                    let mut enc =
                        layer_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
                    // Presumably advances the cipher by 56 bytes so that nonce[56..64]
                    // is encrypted at its real stream offset; confirm against
                    // `ObfuscatedCipher::encrypt`.
                    let mut skip = [0u8; 56];
                    enc.encrypt(&mut skip);
                    enc.encrypt(&mut nonce[56..64]);
                    // NOTE(review): `enc` is dropped here and nothing in this file
                    // keeps a cipher for later frames, which are written to the
                    // socket as-is. Verify that obfuscated transport really works
                    // on this path.
                }
                stream.write_all(&nonce).await?;
            }
            // PaddedIntermediate and FakeTls are handled by the main Connection path
            // (lib.rs apply_transport_init). DcPool connections always use the
            // transport supplied by the caller. If a 0xDD/0xEE proxy is used, the
            // caller should open the stream through Connection::open_stream_mtproxy
            // and not use DcPool::connect_raw. These variants fall back to the
            // Abridged header here so that dc_pool.rs compiles cleanly for
            // non-proxy aux-DC connections.
            TransportKind::PaddedIntermediate { .. } | TransportKind::FakeTls { .. } => {
                stream.write_all(&[0xef]).await?;
            }
        }
        Ok(())
    }
223
224    pub fn auth_key_bytes(&self) -> [u8; 256] {
225        self.enc.auth_key_bytes()
226    }
227    pub fn first_salt(&self) -> i64 {
228        self.enc.salt
229    }
230    pub fn time_offset(&self) -> i32 {
231        self.enc.time_offset
232    }
233
234    pub async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
235        let wire = self.enc.pack(req);
236        Self::send_abridged(&mut self.stream, &wire).await?;
237        self.recv_rpc().await
238    }
239
240    async fn recv_rpc(&mut self) -> Result<Vec<u8>, InvocationError> {
241        loop {
242            let mut raw = Self::recv_abridged(&mut self.stream).await?;
243            let msg = self
244                .enc
245                .unpack(&mut raw)
246                .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
247            if msg.salt != 0 {
248                self.enc.salt = msg.salt;
249            }
250            if msg.body.len() < 4 {
251                return Ok(msg.body);
252            }
253            let cid = u32::from_le_bytes(msg.body[..4].try_into().unwrap());
254            match cid {
255                0xf35c6d01 /* rpc_result */ => {
256                    if msg.body.len() >= 12 { return Ok(msg.body[12..].to_vec()); }
257                    return Ok(msg.body);
258                }
259                0x2144ca19 /* rpc_error */ => {
260                    if msg.body.len() < 8 {
261                        return Err(InvocationError::Deserialize("rpc_error short".into()));
262                    }
263                    let code = i32::from_le_bytes(msg.body[4..8].try_into().unwrap());
264                    let message = tl_read_string(&msg.body[8..]).unwrap_or_default();
265                    return Err(InvocationError::Rpc(crate::RpcError::from_telegram(code, &message)));
266                }
267                0x347773c5 | 0x62d6b459 | 0x9ec20908 | 0xedab447b | 0xa7eff811 => continue,
268                _ => return Ok(msg.body),
269            }
270        }
271    }
272
273    async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
274        let words = data.len() / 4;
275        if words < 0x7f {
276            stream.write_all(&[words as u8]).await?;
277        } else {
278            stream
279                .write_all(&[
280                    0x7f,
281                    (words & 0xff) as u8,
282                    ((words >> 8) & 0xff) as u8,
283                    ((words >> 16) & 0xff) as u8,
284                ])
285                .await?;
286        }
287        stream.write_all(data).await?;
288        Ok(())
289    }
290
291    async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
292        let mut h = [0u8; 1];
293        stream.read_exact(&mut h).await?;
294        let words = if h[0] < 0x7f {
295            h[0] as usize
296        } else {
297            let mut b = [0u8; 3];
298            stream.read_exact(&mut b).await?;
299            b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
300        };
301        let mut buf = vec![0u8; words * 4];
302        stream.read_exact(&mut buf).await?;
303        Ok(buf)
304    }
305
306    async fn send_plain_frame(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
307        Self::send_abridged(stream, data).await
308    }
309
310    async fn recv_plain_frame<T: Deserializable>(
311        stream: &mut TcpStream,
312    ) -> Result<T, InvocationError> {
313        let raw = Self::recv_abridged(stream).await?;
314        if raw.len() < 20 {
315            return Err(InvocationError::Deserialize("plain frame too short".into()));
316        }
317        if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
318            return Err(InvocationError::Deserialize(
319                "expected auth_key_id=0 in plaintext".into(),
320            ));
321        }
322        let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
323        let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
324        T::deserialize(&mut cur).map_err(Into::into)
325    }
326}
327
/// Decode a TL-style length-prefixed byte string from the start of `data`.
///
/// A first byte below 254 is the length itself; otherwise the length is the
/// following three bytes, little-endian. Trailing padding is not inspected.
///
/// Returns `Some(vec![])` for empty input and `None` when the prefix or the
/// payload is truncated.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let marker = match data.first() {
        Some(&byte) => byte,
        None => return Some(Vec::new()),
    };

    let (len, start) = match marker {
        0..=253 => (usize::from(marker), 1),
        _ => {
            // Extended form needs the marker plus three length bytes.
            let ext = data.get(1..4)?;
            let len =
                usize::from(ext[0]) | (usize::from(ext[1]) << 8) | (usize::from(ext[2]) << 16);
            (len, 4)
        }
    };

    data.get(start..start + len).map(<[u8]>::to_vec)
}
347
348fn tl_read_string(data: &[u8]) -> Option<String> {
349    tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
350}
351
352// DcPool
353
/// Pool of per-DC authenticated connections.
///
/// Connections are inserted ready-made via [`DcPool::insert`]; the pool itself
/// never opens sockets in this file.
pub struct DcPool {
    /// Live connections keyed by DC id.
    conns: HashMap<i32, DcConnection>,
    /// Known DC addresses keyed by DC id (filled by `new` and `update_addrs`).
    addrs: HashMap<i32, String>,
    // Stored by `new` but not read anywhere in this file, hence the lint override.
    #[allow(dead_code)]
    home_dc_id: i32,
}
361
362impl DcPool {
363    pub fn new(home_dc_id: i32, dc_entries: &[DcEntry]) -> Self {
364        let addrs = dc_entries
365            .iter()
366            .map(|e| (e.dc_id, e.addr.clone()))
367            .collect();
368        Self {
369            conns: HashMap::new(),
370            addrs,
371            home_dc_id,
372        }
373    }
374
375    /// Returns true if a connection for `dc_id` already exists in the pool.
376    pub fn has_connection(&self, dc_id: i32) -> bool {
377        self.conns.contains_key(&dc_id)
378    }
379
380    /// Insert a pre-built connection into the pool.
381    pub fn insert(&mut self, dc_id: i32, conn: DcConnection) {
382        self.conns.insert(dc_id, conn);
383    }
384
385    /// Invoke a raw RPC call on the given DC.
386    pub async fn invoke_on_dc<R: RemoteCall>(
387        &mut self,
388        dc_id: i32,
389        _dc_entries: &[DcEntry],
390        req: &R,
391    ) -> Result<Vec<u8>, InvocationError> {
392        let conn = self
393            .conns
394            .get_mut(&dc_id)
395            .ok_or_else(|| InvocationError::Deserialize(format!("no connection for DC{dc_id}")))?;
396        conn.rpc_call(req).await
397    }
398
399    /// Update the address table (called after `initConnection`).
400    pub fn update_addrs(&mut self, entries: &[DcEntry]) {
401        for e in entries {
402            self.addrs.insert(e.dc_id, e.addr.clone());
403        }
404    }
405
406    /// Save the auth keys from pool connections back into the DC entry list.
407    pub fn collect_keys(&self, entries: &mut [DcEntry]) {
408        for e in entries.iter_mut() {
409            if let Some(conn) = self.conns.get(&e.dc_id) {
410                e.auth_key = Some(conn.auth_key_bytes());
411                e.first_salt = conn.first_salt();
412                e.time_offset = conn.time_offset();
413            }
414        }
415    }
416}