1use layer_mtproto::{EncryptedSession, Session, authentication as auth};
26use layer_tl_types as tl;
27use layer_tl_types::{Cursor, Deserializable, RemoteCall};
28use std::collections::HashMap;
29use tokio::io::{AsyncReadExt, AsyncWriteExt};
30use tokio::net::TcpStream;
31
32use crate::{InvocationError, TransportKind, session::DcEntry};
33
34pub struct DcConnection {
38 stream: TcpStream,
39 enc: EncryptedSession,
40}
41
42impl DcConnection {
43 pub async fn connect_raw(
45 addr: &str,
46 socks5: Option<&crate::socks5::Socks5Config>,
47 transport: &TransportKind,
48 dc_id: i16,
49 ) -> Result<Self, InvocationError> {
50 tracing::debug!("[dc_pool] Connecting to {addr} …");
51 let mut stream = Self::open_tcp(addr, socks5).await?;
52 Self::send_transport_init(&mut stream, transport, dc_id).await?;
53
54 let mut plain = Session::new();
55
56 let (req1, s1) = auth::step1().map_err(|e| InvocationError::Deserialize(e.to_string()))?;
57 Self::send_plain_frame(&mut stream, &plain.pack(&req1).to_plaintext_bytes()).await?;
58 let res_pq: tl::enums::ResPq = Self::recv_plain_frame(&mut stream).await?;
59
60 let (req2, s2) = auth::step2(s1, res_pq, dc_id as i32)
61 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
62 Self::send_plain_frame(&mut stream, &plain.pack(&req2).to_plaintext_bytes()).await?;
63 let dh: tl::enums::ServerDhParams = Self::recv_plain_frame(&mut stream).await?;
64
65 let (req3, s3) =
66 auth::step3(s2, dh).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
67 Self::send_plain_frame(&mut stream, &plain.pack(&req3).to_plaintext_bytes()).await?;
68 let ans: tl::enums::SetClientDhParamsAnswer = Self::recv_plain_frame(&mut stream).await?;
69
70 let done = {
72 let mut result =
73 auth::finish(s3, ans).map_err(|e| InvocationError::Deserialize(e.to_string()))?;
74 let mut attempts = 0u8;
75 loop {
76 match result {
77 auth::FinishResult::Done(d) => break d,
78 auth::FinishResult::Retry {
79 retry_id,
80 dh_params,
81 nonce,
82 server_nonce,
83 new_nonce,
84 } => {
85 attempts += 1;
86 if attempts >= 5 {
87 return Err(InvocationError::Deserialize(
88 "dh_gen_retry exceeded 5 attempts".into(),
89 ));
90 }
91 let (req_retry, s3_retry) =
92 auth::retry_step3(&dh_params, nonce, server_nonce, new_nonce, retry_id)
93 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
94 Self::send_plain_frame(
95 &mut stream,
96 &plain.pack(&req_retry).to_plaintext_bytes(),
97 )
98 .await?;
99 let ans_retry: tl::enums::SetClientDhParamsAnswer =
100 Self::recv_plain_frame(&mut stream).await?;
101 result = auth::finish(s3_retry, ans_retry)
102 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
103 }
104 }
105 }
106 };
107 tracing::debug!("[dc_pool] DH complete ✓ for {addr}");
108
109 Ok(Self {
110 stream,
111 enc: EncryptedSession::new(done.auth_key, done.first_salt, done.time_offset),
112 })
113 }
114
115 pub async fn connect_with_key(
117 addr: &str,
118 auth_key: [u8; 256],
119 first_salt: i64,
120 time_offset: i32,
121 socks5: Option<&crate::socks5::Socks5Config>,
122 transport: &TransportKind,
123 dc_id: i16,
124 ) -> Result<Self, InvocationError> {
125 let mut stream = Self::open_tcp(addr, socks5).await?;
126 Self::send_transport_init(&mut stream, transport, dc_id).await?;
127 Ok(Self {
128 stream,
129 enc: EncryptedSession::new(auth_key, first_salt, time_offset),
130 })
131 }
132
133 async fn open_tcp(
134 addr: &str,
135 socks5: Option<&crate::socks5::Socks5Config>,
136 ) -> Result<TcpStream, InvocationError> {
137 match socks5 {
138 Some(proxy) => proxy.connect(addr).await,
139 None => Ok(TcpStream::connect(addr).await?),
140 }
141 }
142
143 async fn send_transport_init(
144 stream: &mut TcpStream,
145 transport: &TransportKind,
146 dc_id: i16,
147 ) -> Result<(), InvocationError> {
148 match transport {
149 TransportKind::Abridged => {
150 stream.write_all(&[0xef]).await?;
151 }
152 TransportKind::Intermediate => {
153 stream.write_all(&[0xee, 0xee, 0xee, 0xee]).await?;
154 }
155 TransportKind::Full => {}
156 TransportKind::Obfuscated { secret } => {
157 use sha2::Digest;
158 let mut nonce = [0u8; 64];
159 loop {
160 getrandom::getrandom(&mut nonce)
161 .map_err(|_| InvocationError::Deserialize("getrandom".into()))?;
162 let first = u32::from_le_bytes(nonce[0..4].try_into().unwrap());
163 let second = u32::from_le_bytes(nonce[4..8].try_into().unwrap());
164 let bad = nonce[0] == 0xEF
165 || first == 0x44414548
166 || first == 0x54534F50
167 || first == 0x20544547
168 || first == 0xEEEEEEEE
169 || first == 0xDDDDDDDD
170 || first == 0x02010316
171 || second == 0x00000000;
172 if !bad {
173 break;
174 }
175 }
176 let tx_raw: [u8; 32] = nonce[8..40].try_into().unwrap();
177 let tx_iv: [u8; 16] = nonce[40..56].try_into().unwrap();
178 let mut rev48 = nonce[8..56].to_vec();
179 rev48.reverse();
180 let rx_raw: [u8; 32] = rev48[0..32].try_into().unwrap();
181 let rx_iv: [u8; 16] = rev48[32..48].try_into().unwrap();
182 let (tx_key, rx_key): ([u8; 32], [u8; 32]) = if let Some(s) = secret {
183 let mut h = sha2::Sha256::new();
184 h.update(tx_raw);
185 h.update(s.as_ref());
186 let tx: [u8; 32] = h.finalize().into();
187 let mut h = sha2::Sha256::new();
188 h.update(rx_raw);
189 h.update(s.as_ref());
190 let rx: [u8; 32] = h.finalize().into();
191 (tx, rx)
192 } else {
193 (tx_raw, rx_raw)
194 };
195 nonce[56] = 0xef;
196 nonce[57] = 0xef;
197 nonce[58] = 0xef;
198 nonce[59] = 0xef;
199 let dc_bytes = dc_id.to_le_bytes();
200 nonce[60] = dc_bytes[0];
201 nonce[61] = dc_bytes[1];
202 {
203 let mut enc =
204 layer_crypto::ObfuscatedCipher::from_keys(&tx_key, &tx_iv, &rx_key, &rx_iv);
205 let mut skip = [0u8; 56];
206 enc.encrypt(&mut skip);
207 enc.encrypt(&mut nonce[56..64]);
208 }
209 stream.write_all(&nonce).await?;
210 }
211 TransportKind::PaddedIntermediate { .. } | TransportKind::FakeTls { .. } => {
218 stream.write_all(&[0xef]).await?;
219 }
220 }
221 Ok(())
222 }
223
224 pub fn auth_key_bytes(&self) -> [u8; 256] {
225 self.enc.auth_key_bytes()
226 }
227 pub fn first_salt(&self) -> i64 {
228 self.enc.salt
229 }
230 pub fn time_offset(&self) -> i32 {
231 self.enc.time_offset
232 }
233
234 pub async fn rpc_call<R: RemoteCall>(&mut self, req: &R) -> Result<Vec<u8>, InvocationError> {
235 let wire = self.enc.pack(req);
236 Self::send_abridged(&mut self.stream, &wire).await?;
237 self.recv_rpc().await
238 }
239
240 async fn recv_rpc(&mut self) -> Result<Vec<u8>, InvocationError> {
241 loop {
242 let mut raw = Self::recv_abridged(&mut self.stream).await?;
243 let msg = self
244 .enc
245 .unpack(&mut raw)
246 .map_err(|e| InvocationError::Deserialize(e.to_string()))?;
247 if msg.salt != 0 {
248 self.enc.salt = msg.salt;
249 }
250 if msg.body.len() < 4 {
251 return Ok(msg.body);
252 }
253 let cid = u32::from_le_bytes(msg.body[..4].try_into().unwrap());
254 match cid {
255 0xf35c6d01 => {
256 if msg.body.len() >= 12 { return Ok(msg.body[12..].to_vec()); }
257 return Ok(msg.body);
258 }
259 0x2144ca19 => {
260 if msg.body.len() < 8 {
261 return Err(InvocationError::Deserialize("rpc_error short".into()));
262 }
263 let code = i32::from_le_bytes(msg.body[4..8].try_into().unwrap());
264 let message = tl_read_string(&msg.body[8..]).unwrap_or_default();
265 return Err(InvocationError::Rpc(crate::RpcError::from_telegram(code, &message)));
266 }
267 0x347773c5 | 0x62d6b459 | 0x9ec20908 | 0xedab447b | 0xa7eff811 => continue,
268 _ => return Ok(msg.body),
269 }
270 }
271 }
272
273 async fn send_abridged(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
274 let words = data.len() / 4;
275 if words < 0x7f {
276 stream.write_all(&[words as u8]).await?;
277 } else {
278 stream
279 .write_all(&[
280 0x7f,
281 (words & 0xff) as u8,
282 ((words >> 8) & 0xff) as u8,
283 ((words >> 16) & 0xff) as u8,
284 ])
285 .await?;
286 }
287 stream.write_all(data).await?;
288 Ok(())
289 }
290
291 async fn recv_abridged(stream: &mut TcpStream) -> Result<Vec<u8>, InvocationError> {
292 let mut h = [0u8; 1];
293 stream.read_exact(&mut h).await?;
294 let words = if h[0] < 0x7f {
295 h[0] as usize
296 } else {
297 let mut b = [0u8; 3];
298 stream.read_exact(&mut b).await?;
299 b[0] as usize | (b[1] as usize) << 8 | (b[2] as usize) << 16
300 };
301 let mut buf = vec![0u8; words * 4];
302 stream.read_exact(&mut buf).await?;
303 Ok(buf)
304 }
305
306 async fn send_plain_frame(stream: &mut TcpStream, data: &[u8]) -> Result<(), InvocationError> {
307 Self::send_abridged(stream, data).await
308 }
309
310 async fn recv_plain_frame<T: Deserializable>(
311 stream: &mut TcpStream,
312 ) -> Result<T, InvocationError> {
313 let raw = Self::recv_abridged(stream).await?;
314 if raw.len() < 20 {
315 return Err(InvocationError::Deserialize("plain frame too short".into()));
316 }
317 if u64::from_le_bytes(raw[..8].try_into().unwrap()) != 0 {
318 return Err(InvocationError::Deserialize(
319 "expected auth_key_id=0 in plaintext".into(),
320 ));
321 }
322 let body_len = u32::from_le_bytes(raw[16..20].try_into().unwrap()) as usize;
323 let mut cur = Cursor::from_slice(&raw[20..20 + body_len]);
324 T::deserialize(&mut cur).map_err(Into::into)
325 }
326}
327
/// Decodes one length-prefixed byte string from the start of `data`.
///
/// A first byte below 254 is the length itself, with the payload starting at
/// offset 1. Any other first byte means the length is the next three bytes,
/// little-endian, with the payload starting at offset 4. Bytes after the
/// payload are ignored.
///
/// Returns `Some(vec![])` for empty input and `None` when `data` is too short
/// for the header or for the declared length.
fn tl_read_bytes(data: &[u8]) -> Option<Vec<u8>> {
    let marker = match data.first() {
        Some(&byte) => byte,
        None => return Some(Vec::new()),
    };

    let (header_len, payload_len) = match marker {
        0..=253 => (1usize, usize::from(marker)),
        _ => {
            let ext = data.get(1..4)?;
            let len = usize::from(ext[0]) | usize::from(ext[1]) << 8 | usize::from(ext[2]) << 16;
            (4usize, len)
        }
    };

    data.get(header_len..header_len + payload_len)
        .map(|payload| payload.to_vec())
}
347
348fn tl_read_string(data: &[u8]) -> Option<String> {
349 tl_read_bytes(data).map(|b| String::from_utf8_lossy(&b).into_owned())
350}
351
352pub struct DcPool {
356 conns: HashMap<i32, DcConnection>,
357 addrs: HashMap<i32, String>,
358 #[allow(dead_code)]
359 home_dc_id: i32,
360}
361
362impl DcPool {
363 pub fn new(home_dc_id: i32, dc_entries: &[DcEntry]) -> Self {
364 let addrs = dc_entries
365 .iter()
366 .map(|e| (e.dc_id, e.addr.clone()))
367 .collect();
368 Self {
369 conns: HashMap::new(),
370 addrs,
371 home_dc_id,
372 }
373 }
374
375 pub fn has_connection(&self, dc_id: i32) -> bool {
377 self.conns.contains_key(&dc_id)
378 }
379
380 pub fn insert(&mut self, dc_id: i32, conn: DcConnection) {
382 self.conns.insert(dc_id, conn);
383 }
384
385 pub async fn invoke_on_dc<R: RemoteCall>(
387 &mut self,
388 dc_id: i32,
389 _dc_entries: &[DcEntry],
390 req: &R,
391 ) -> Result<Vec<u8>, InvocationError> {
392 let conn = self
393 .conns
394 .get_mut(&dc_id)
395 .ok_or_else(|| InvocationError::Deserialize(format!("no connection for DC{dc_id}")))?;
396 conn.rpc_call(req).await
397 }
398
399 pub fn update_addrs(&mut self, entries: &[DcEntry]) {
401 for e in entries {
402 self.addrs.insert(e.dc_id, e.addr.clone());
403 }
404 }
405
406 pub fn collect_keys(&self, entries: &mut [DcEntry]) {
408 for e in entries.iter_mut() {
409 if let Some(conn) = self.conns.get(&e.dc_id) {
410 e.auth_key = Some(conn.auth_key_bytes());
411 e.first_salt = conn.first_salt();
412 e.time_offset = conn.time_offset();
413 }
414 }
415 }
416}