1use crate::error::ErrorKind;
9use crate::mesparse::{CommandType, Message, MessageParser, TuyaVersion};
10use crate::{ControlNewPayload, ControlNewPayloadData, Payload, PayloadStruct, Result};
11use aes::cipher::generic_array::GenericArray;
12use aes::cipher::{BlockEncrypt, KeyInit};
13use aes::Aes128;
14use log::{debug, info};
15use rand::Rng;
16use std::net::{IpAddr, SocketAddr};
17use std::time::{Duration, SystemTime};
18use tokio::io::{AsyncReadExt, AsyncWriteExt};
19use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
20use tokio::net::TcpStream;
21use tokio::sync::mpsc::{channel, Receiver};
22use tokio::time::sleep;
23
24#[derive(Default)]
25pub struct SeqId {
26 seq_id: u32,
27}
28
29impl SeqId {
30 pub fn current(&self) -> u32 {
31 self.seq_id
32 }
33
34 pub fn next_id(&mut self) -> u32 {
35 self.seq_id += 1;
36 self.seq_id
37 }
38}
39
40type RecvChannel = Receiver<Result<Vec<Message>>>;
41
42pub struct TuyaConnection {
43 seq_id: SeqId,
44 tcp_write_half: OwnedWriteHalf,
45 mp: MessageParser,
46}
47
48impl TuyaConnection {
49 async fn send(&mut self, mes: &Message) -> Result<()> {
50 info!(
51 "Writing message to {} ({}):\n",
52 self.tcp_write_half.peer_addr()?,
53 &mes
54 );
55 let mut mes = (*mes).clone();
56 if matches!(mes.seq_nr, None) {
57 mes.seq_nr = Some(self.seq_id.next_id());
58 }
59 self.tcp_write_half
60 .write_all(self.mp.encode(&mes, true)?.as_ref())
61 .await?;
62 Ok(())
66 }
67}
68
69async fn tcp_read(tcp_read_half: &mut OwnedReadHalf, mp: &MessageParser) -> Result<Vec<Message>> {
70 let mut buf = [0; 4096];
71 let mut bts = 0;
72 let mut attempts = 0;
73
74 while bts == 0 && attempts < 3 {
75 bts = tcp_read_half.read(&mut buf).await?;
76 info!("Received {} bytes", bts);
77 attempts += 1;
78 sleep(Duration::from_millis(100)).await;
79 }
80
81 if bts == 0 {
82 return Err(ErrorKind::TcpStreamClosed);
83 } else {
84 debug!("Received response:\n{}", hex::encode(&buf[..bts]));
85 }
86 mp.parse(&buf[..bts])
87}
88pub struct TuyaDevice {
89 addr: SocketAddr,
90 device_id: String,
91 key: Option<String>,
92 version: TuyaVersion,
93 connection: Option<TuyaConnection>,
94}
95
96impl TuyaDevice {
97 pub fn new(ver: &str, device_id: &str, key: Option<&str>, addr: IpAddr) -> Result<TuyaDevice> {
98 let version = ver.parse()?;
99 Ok(TuyaDevice {
100 device_id: device_id.to_string(),
101 addr: SocketAddr::new(addr, 6668),
102 key: key.map(|k| k.to_string()),
103 version,
104 connection: Default::default(),
105 })
106 }
107
108 pub async fn connect(&mut self) -> Result<RecvChannel> {
109 let tcp_stream = TcpStream::connect(&self.addr).await?;
110 tcp_stream.set_nodelay(true)?;
111
112 let (mut tcp_read_half, tcp_write_half) = tcp_stream.into_split();
113 let (tx, rx) = channel(10);
114
115 let mp = MessageParser::create(self.version.clone(), self.key.clone())?;
116 let mut connection = TuyaConnection {
117 mp,
118 seq_id: Default::default(),
119 tcp_write_half,
120 };
121
122 if self.version == TuyaVersion::ThreeFour {
124 let local_nonce: [u8; 16] = rand::rng().random();
126 let local_key = self.key.clone().ok_or(ErrorKind::MissingKey)?;
127
128 let start_negotiation_msg = Message {
129 payload: Payload::Raw(local_nonce.to_vec()),
130 command: Some(CommandType::SessKeyNegStart),
131 seq_nr: Some(connection.seq_id.next_id()),
132 ret_code: None,
133 };
134
135 info!(
136 "Writing SessKeyNegStart msg to {} ({}):\n{}",
137 self.addr,
138 connection.seq_id.current(),
139 &start_negotiation_msg
140 );
141 connection
142 .tcp_write_half
143 .write_all(connection.mp.encode(&start_negotiation_msg, true)?.as_ref())
144 .await?;
145
146 let rkey = tcp_read(&mut tcp_read_half, &connection.mp).await?;
147 let rkey = rkey.into_iter().next().ok_or(ErrorKind::MissingRemoteKey)?;
148 let rkey = match rkey.payload {
149 Payload::Raw(s) if s.len() == 48 => Ok(s),
150 _ => Err(ErrorKind::InvalidRemoteKey),
151 }?;
152
153 let remote_nonce = &rkey[..16];
154 let remote_hmac = &rkey[16..48];
155
156 let expected_hmac = connection.mp.cipher.hmac(&local_nonce)?;
158 if remote_hmac != expected_hmac.as_slice() {
159 debug!(
160 "HMAC mismatch during session negotiation: expected {}, got {}",
161 hex::encode(&expected_hmac),
162 hex::encode(remote_hmac)
163 );
164 }
166
167 let nonce_xor: Vec<u8> = local_nonce
170 .iter()
171 .zip(remote_nonce.iter())
172 .map(|(&a, &b)| a ^ b)
173 .collect();
174
175 debug!("nonce_xor: {}", hex::encode(&nonce_xor));
176 debug!("using local_key for crypter: {}", hex::encode(&local_key));
177
178 let local_key_arr = GenericArray::from_slice(local_key.as_bytes());
179 let cipher = Aes128::new(local_key_arr);
180
181 let mut nonce_xor = nonce_xor;
182 let block = GenericArray::from_mut_slice(nonce_xor.as_mut_slice());
183 cipher.encrypt_block(block);
184
185 debug!("session key: {}", hex::encode(&block));
186
187 if block[0] == 0x00 {
191 return Err(ErrorKind::InvalidSessionKey);
192 }
193
194 let rkey_hmac = connection.mp.cipher.hmac(remote_nonce)?;
196
197 let session_negotiation_finish_msg = Message {
198 payload: Payload::Raw(rkey_hmac),
199 command: Some(CommandType::SessKeyNegFinish),
200 seq_nr: Some(connection.seq_id.next_id()),
201 ret_code: None,
202 };
203
204 info!(
205 "Writing SessKeyNegFinish msg to {} ({}):\n{}",
206 self.addr,
207 connection.seq_id.current(),
208 &session_negotiation_finish_msg
209 );
210 connection
211 .tcp_write_half
212 .write_all(
213 connection
214 .mp
215 .encode(&session_negotiation_finish_msg, true)?
216 .as_ref(),
217 )
218 .await?;
219
220 connection.mp.cipher.set_key(block.to_vec())
221 }
222
223 let mp = connection.mp.clone();
224 self.connection = Some(connection);
225
226 tokio::spawn(async move {
227 loop {
228 let mut buf = [0; 4096];
229 let result = tcp_read_half.read(&mut buf).await;
230
231 let result = match result {
232 Ok(0) => Err(ErrorKind::TcpStreamClosed),
233 Ok(bytes) => {
234 info!("Received {} bytes", bytes);
235 mp.parse(&buf[..bytes])
236 }
237 Err(e) => Err(ErrorKind::TcpError(e)),
238 };
239
240 let send_result = match result {
241 Ok(messages) => tx.send(Ok(messages)).await,
242 Err(e) => {
243 info!("TCP Error: {:?}", e);
244 tx.send(Err(e)).await.ok();
245 break;
246 }
247 };
248
249 if let Err(e) = send_result {
250 info!("Receiver was dropped, disconnecting: {:?}", e);
251 break;
252 }
253 }
254 });
255
256 Ok(rx)
257 }
258
259 pub async fn set(&mut self, tuya_payload: Payload) -> Result<()> {
260 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
261 let command = match self.version {
262 TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::Control,
263 TuyaVersion::ThreeFour => CommandType::ControlNew,
264 };
265 let mes = Message::new(tuya_payload, command);
266 connection.send(&mes).await?;
267
268 Ok(())
269 }
270
271 pub async fn set_values(&mut self, dps: serde_json::Value) -> Result<()> {
272 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
273 let command = match self.version {
274 TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::Control,
275 TuyaVersion::ThreeFour => CommandType::ControlNew,
276 };
277
278 let current_time = SystemTime::now()
279 .duration_since(SystemTime::UNIX_EPOCH)?
280 .as_secs() as u32;
281 let device_id = self.device_id.clone();
284
285 let payload = match self.version {
286 TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => Payload::Struct(PayloadStruct {
287 gw_id: Some(device_id.clone()),
288 dev_id: device_id.clone(),
289 uid: Some(device_id.clone()),
290 t: Some(current_time.to_string()),
291 dp_id: None,
292 dps: Some(dps),
293 }),
294 TuyaVersion::ThreeFour => Payload::ControlNewStruct(ControlNewPayload {
295 protocol: 5,
296 t: current_time,
297 data: ControlNewPayloadData { dps },
298 }),
299 };
300 let mes = Message::new(payload, command);
301 connection.send(&mes).await?;
302
303 Ok(())
304 }
305
306 pub async fn get(&mut self, tuya_payload: Payload) -> Result<()> {
307 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
308 let command = match self.version {
309 TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::DpQuery,
310 TuyaVersion::ThreeFour => CommandType::DpQueryNew,
311 };
312 let mes = Message::new(tuya_payload, command);
313 connection.send(&mes).await?;
314
315 Ok(())
316 }
317
318 pub async fn refresh(&mut self, tuya_payload: Payload) -> Result<()> {
319 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
320 let mes = Message::new(tuya_payload, CommandType::DpRefresh);
321 connection.send(&mes).await?;
322
323 Ok(())
324 }
325
326 pub async fn send_msg(&mut self, msg: Message) -> Result<()> {
327 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
328 connection.send(&msg).await?;
329
330 Ok(())
331 }
332
333 pub async fn heartbeat(&mut self) -> Result<()> {
337 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
338 let mes = Message::new(Payload::Raw(vec![]), CommandType::HeartBeat);
339 connection.send(&mes).await?;
340 Ok(())
341 }
342
343 pub async fn disconnect(&mut self) -> Result<()> {
347 if let Some(mut connection) = self.connection.take() {
348 connection.tcp_write_half.shutdown().await?;
350 info!("Disconnected from {}", self.addr);
351 }
352 Ok(())
353 }
354}