rust_async_tuyapi/
tuyadevice.rs

1//! # TuyaDevice
//! The TuyaDevice represents a communication channel with a Tuya compatible device. It
//! encapsulates the device key, version and IP address. After connect() has been called,
//! supplying a Payload to either the set() or get() function sends the request to the device;
//! replies from the device arrive on the channel returned by connect().
5//!
//! The TuyaDevice is the high level device communication API. To get into the nitty-gritty
//! details, create a MessageParser.
8use crate::error::ErrorKind;
9use crate::mesparse::{CommandType, Message, MessageParser, TuyaVersion};
10use crate::{ControlNewPayload, ControlNewPayloadData, Payload, PayloadStruct, Result};
11use aes::cipher::generic_array::GenericArray;
12use aes::cipher::{BlockEncrypt, KeyInit};
13use aes::Aes128;
14use log::{debug, info};
15use rand::Rng;
16use std::net::{IpAddr, SocketAddr};
17use std::time::{Duration, SystemTime};
18use tokio::io::{AsyncReadExt, AsyncWriteExt};
19use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
20use tokio::net::TcpStream;
21use tokio::sync::mpsc::{channel, Receiver};
22use tokio::time::sleep;
23
/// Counter used to number outgoing messages.
///
/// A default-constructed counter starts at 0, so the first id handed out by
/// [`SeqId::next_id`] is 1.
#[derive(Default)]
pub struct SeqId {
    seq_id: u32,
}

impl SeqId {
    /// Returns the most recently issued id without advancing the counter.
    pub fn current(&self) -> u32 {
        self.seq_id
    }

    /// Advances the counter by one and returns the new id.
    ///
    /// The counter wraps around to 0 after `u32::MAX`. A plain `+= 1` would panic on
    /// overflow in debug builds and wrap silently in release builds; wrapping explicitly
    /// makes both builds behave the same.
    pub fn next_id(&mut self) -> u32 {
        self.seq_id = self.seq_id.wrapping_add(1);
        self.seq_id
    }
}
39
/// Receiving end of the channel created in `TuyaDevice::connect`. Each item is either a batch
/// of messages parsed from one socket read, or the error that ended the background read loop.
type RecvChannel = Receiver<Result<Vec<Message>>>;
41
42pub struct TuyaConnection {
43    seq_id: SeqId,
44    tcp_write_half: OwnedWriteHalf,
45    mp: MessageParser,
46}
47
48impl TuyaConnection {
49    async fn send(&mut self, mes: &Message) -> Result<()> {
50        info!(
51            "Writing message to {} ({}):\n",
52            self.tcp_write_half.peer_addr()?,
53            &mes
54        );
55        let mut mes = (*mes).clone();
56        if matches!(mes.seq_nr, None) {
57            mes.seq_nr = Some(self.seq_id.next_id());
58        }
59        self.tcp_write_half
60            .write_all(self.mp.encode(&mes, true)?.as_ref())
61            .await?;
62        // info!("Wrote {} bytes", bts);
63
64        // self.read().await
65        Ok(())
66    }
67}
68
69async fn tcp_read(tcp_read_half: &mut OwnedReadHalf, mp: &MessageParser) -> Result<Vec<Message>> {
70    let mut buf = [0; 4096];
71    let mut bts = 0;
72    let mut attempts = 0;
73
74    while bts == 0 && attempts < 3 {
75        bts = tcp_read_half.read(&mut buf).await?;
76        info!("Received {} bytes", bts);
77        attempts += 1;
78        sleep(Duration::from_millis(100)).await;
79    }
80
81    if bts == 0 {
82        return Err(ErrorKind::TcpStreamClosed);
83    } else {
84        debug!("Received response:\n{}", hex::encode(&buf[..bts]));
85    }
86    mp.parse(&buf[..bts])
87}
/// Handle for a single Tuya device: where to reach it, how to talk to it, and (once
/// `connect()` has succeeded) the live connection used by the send methods.
pub struct TuyaDevice {
    /// Socket address of the device; `new()` pairs the supplied IP address with port 6668.
    addr: SocketAddr,
    /// Device identifier; `set_values()` copies it into the payload for the `ThreeOne` and
    /// `ThreeThree` versions.
    device_id: String,
    /// Optional device key; `connect()` requires it for the `ThreeFour` session negotiation.
    key: Option<String>,
    /// Protocol version, parsed from the `ver` string given to `new()`.
    version: TuyaVersion,
    /// Set to `Some` by a successful `connect()`; taken (left as `None`) by `disconnect()`.
    connection: Option<TuyaConnection>,
}
95
96impl TuyaDevice {
97    pub fn new(ver: &str, device_id: &str, key: Option<&str>, addr: IpAddr) -> Result<TuyaDevice> {
98        let version = ver.parse()?;
99        Ok(TuyaDevice {
100            device_id: device_id.to_string(),
101            addr: SocketAddr::new(addr, 6668),
102            key: key.map(|k| k.to_string()),
103            version,
104            connection: Default::default(),
105        })
106    }
107
108    pub async fn connect(&mut self) -> Result<RecvChannel> {
109        let tcp_stream = TcpStream::connect(&self.addr).await?;
110        tcp_stream.set_nodelay(true)?;
111
112        let (mut tcp_read_half, tcp_write_half) = tcp_stream.into_split();
113        let (tx, rx) = channel(10);
114
115        let mp = MessageParser::create(self.version.clone(), self.key.clone())?;
116        let mut connection = TuyaConnection {
117            mp,
118            seq_id: Default::default(),
119            tcp_write_half,
120        };
121
122        // Tuya protocol v3.4 requires session key negotiation
123        if self.version == TuyaVersion::ThreeFour {
124            // Generate random 16-byte nonce for session key negotiation
125            let local_nonce: [u8; 16] = rand::rng().random();
126            let local_key = self.key.clone().ok_or(ErrorKind::MissingKey)?;
127
128            let start_negotiation_msg = Message {
129                payload: Payload::Raw(local_nonce.to_vec()),
130                command: Some(CommandType::SessKeyNegStart),
131                seq_nr: Some(connection.seq_id.next_id()),
132                ret_code: None,
133            };
134
135            info!(
136                "Writing SessKeyNegStart msg to {} ({}):\n{}",
137                self.addr,
138                connection.seq_id.current(),
139                &start_negotiation_msg
140            );
141            connection
142                .tcp_write_half
143                .write_all(connection.mp.encode(&start_negotiation_msg, true)?.as_ref())
144                .await?;
145
146            let rkey = tcp_read(&mut tcp_read_half, &connection.mp).await?;
147            let rkey = rkey.into_iter().next().ok_or(ErrorKind::MissingRemoteKey)?;
148            let rkey = match rkey.payload {
149                Payload::Raw(s) if s.len() == 48 => Ok(s),
150                _ => Err(ErrorKind::InvalidRemoteKey),
151            }?;
152
153            let remote_nonce = &rkey[..16];
154            let remote_hmac = &rkey[16..48];
155
156            // Verify device's HMAC to ensure it knows the local key
157            let expected_hmac = connection.mp.cipher.hmac(&local_nonce)?;
158            if remote_hmac != expected_hmac.as_slice() {
159                debug!(
160                    "HMAC mismatch during session negotiation: expected {}, got {}",
161                    hex::encode(&expected_hmac),
162                    hex::encode(remote_hmac)
163                );
164                // Note: Some devices may not send correct HMAC, so we log but don't fail
165            }
166
167            // Compute session key BEFORE sending FINISH so we can check for 0x00 bug
168            // and abort before sending SessKeyNegFinish
169            let nonce_xor: Vec<u8> = local_nonce
170                .iter()
171                .zip(remote_nonce.iter())
172                .map(|(&a, &b)| a ^ b)
173                .collect();
174
175            debug!("nonce_xor: {}", hex::encode(&nonce_xor));
176            debug!("using local_key for crypter: {}", hex::encode(&local_key));
177
178            let local_key_arr = GenericArray::from_slice(local_key.as_bytes());
179            let cipher = Aes128::new(local_key_arr);
180
181            let mut nonce_xor = nonce_xor;
182            let block = GenericArray::from_mut_slice(nonce_xor.as_mut_slice());
183            cipher.encrypt_block(block);
184
185            debug!("session key: {}", hex::encode(&block));
186
187            // Known v3.4 bug: if first byte of session key is 0x00, device considers it invalid
188            // This causes "Error 914: Check device key or version" and connection failures
189            // https://github.com/jasonacox/tinytuya/discussions/260#:~:text=Bug%2Fquirk%3A%20If%20the%20first%20byte%20of%20the%20resulting%20session%20key%20is%200x00%20then%20the%20device%20will%20not%20consider%20it%20valid%20and%20you%20will%20need%20to%20restart%20the%20negotiation%20over
190            if block[0] == 0x00 {
191                return Err(ErrorKind::InvalidSessionKey);
192            }
193
194            // Session key is valid, now send SessKeyNegFinish to complete the handshake
195            let rkey_hmac = connection.mp.cipher.hmac(remote_nonce)?;
196
197            let session_negotiation_finish_msg = Message {
198                payload: Payload::Raw(rkey_hmac),
199                command: Some(CommandType::SessKeyNegFinish),
200                seq_nr: Some(connection.seq_id.next_id()),
201                ret_code: None,
202            };
203
204            info!(
205                "Writing SessKeyNegFinish msg to {} ({}):\n{}",
206                self.addr,
207                connection.seq_id.current(),
208                &session_negotiation_finish_msg
209            );
210            connection
211                .tcp_write_half
212                .write_all(
213                    connection
214                        .mp
215                        .encode(&session_negotiation_finish_msg, true)?
216                        .as_ref(),
217                )
218                .await?;
219
220            connection.mp.cipher.set_key(block.to_vec())
221        }
222
223        let mp = connection.mp.clone();
224        self.connection = Some(connection);
225
226        tokio::spawn(async move {
227            loop {
228                let mut buf = [0; 4096];
229                let result = tcp_read_half.read(&mut buf).await;
230
231                let result = match result {
232                    Ok(0) => Err(ErrorKind::TcpStreamClosed),
233                    Ok(bytes) => {
234                        info!("Received {} bytes", bytes);
235                        mp.parse(&buf[..bytes])
236                    }
237                    Err(e) => Err(ErrorKind::TcpError(e)),
238                };
239
240                let send_result = match result {
241                    Ok(messages) => tx.send(Ok(messages)).await,
242                    Err(e) => {
243                        info!("TCP Error: {:?}", e);
244                        tx.send(Err(e)).await.ok();
245                        break;
246                    }
247                };
248
249                if let Err(e) = send_result {
250                    info!("Receiver was dropped, disconnecting: {:?}", e);
251                    break;
252                }
253            }
254        });
255
256        Ok(rx)
257    }
258
259    pub async fn set(&mut self, tuya_payload: Payload) -> Result<()> {
260        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
261        let command = match self.version {
262            TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::Control,
263            TuyaVersion::ThreeFour => CommandType::ControlNew,
264        };
265        let mes = Message::new(tuya_payload, command);
266        connection.send(&mes).await?;
267
268        Ok(())
269    }
270
271    pub async fn set_values(&mut self, dps: serde_json::Value) -> Result<()> {
272        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
273        let command = match self.version {
274            TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::Control,
275            TuyaVersion::ThreeFour => CommandType::ControlNew,
276        };
277
278        let current_time = SystemTime::now()
279            .duration_since(SystemTime::UNIX_EPOCH)?
280            .as_secs() as u32;
281        // let current_time = 1;
282
283        let device_id = self.device_id.clone();
284
285        let payload = match self.version {
286            TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => Payload::Struct(PayloadStruct {
287                gw_id: Some(device_id.clone()),
288                dev_id: device_id.clone(),
289                uid: Some(device_id.clone()),
290                t: Some(current_time.to_string()),
291                dp_id: None,
292                dps: Some(dps),
293            }),
294            TuyaVersion::ThreeFour => Payload::ControlNewStruct(ControlNewPayload {
295                protocol: 5,
296                t: current_time,
297                data: ControlNewPayloadData { dps },
298            }),
299        };
300        let mes = Message::new(payload, command);
301        connection.send(&mes).await?;
302
303        Ok(())
304    }
305
306    pub async fn get(&mut self, tuya_payload: Payload) -> Result<()> {
307        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
308        let command = match self.version {
309            TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::DpQuery,
310            TuyaVersion::ThreeFour => CommandType::DpQueryNew,
311        };
312        let mes = Message::new(tuya_payload, command);
313        connection.send(&mes).await?;
314
315        Ok(())
316    }
317
318    pub async fn refresh(&mut self, tuya_payload: Payload) -> Result<()> {
319        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
320        let mes = Message::new(tuya_payload, CommandType::DpRefresh);
321        connection.send(&mes).await?;
322
323        Ok(())
324    }
325
326    pub async fn send_msg(&mut self, msg: Message) -> Result<()> {
327        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
328        connection.send(&msg).await?;
329
330        Ok(())
331    }
332
333    /// Send a heartbeat to keep the connection alive.
334    /// This is especially important for v3.4 devices which may close
335    /// connections that appear idle.
336    pub async fn heartbeat(&mut self) -> Result<()> {
337        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
338        let mes = Message::new(Payload::Raw(vec![]), CommandType::HeartBeat);
339        connection.send(&mes).await?;
340        Ok(())
341    }
342
343    /// Gracefully disconnect from the device.
344    /// This shuts down the TCP write half, signaling to the device that
345    /// we're closing the connection.
346    pub async fn disconnect(&mut self) -> Result<()> {
347        if let Some(mut connection) = self.connection.take() {
348            // Shutdown the write half to signal connection close
349            connection.tcp_write_half.shutdown().await?;
350            info!("Disconnected from {}", self.addr);
351        }
352        Ok(())
353    }
354}