rust_async_tuyapi/
tuyadevice.rs

//! # TuyaDevice
//! The TuyaDevice represents a communication channel with a Tuya compatible device. It
//! encapsulates the device key, version and IP address. Call connect() to open the channel, then
//! supply a Payload to the set() or get() functions to send a request; replies from the device
//! are delivered on the receiver returned by connect().
//!
//! The TuyaDevice is the high-level device communication API. To get into the nitty-gritty
//! details, create a MessageParser.
8use crate::error::ErrorKind;
9use crate::mesparse::{CommandType, Message, MessageParser, TuyaVersion};
10use crate::{ControlNewPayload, ControlNewPayloadData, Payload, PayloadStruct, Result};
11use aes::cipher::generic_array::GenericArray;
12use aes::cipher::{BlockEncrypt, KeyInit};
13use aes::Aes128;
14use log::{debug, info};
15use std::net::{IpAddr, SocketAddr};
16use std::time::{Duration, SystemTime};
17use tokio::io::{AsyncReadExt, AsyncWriteExt};
18use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
19use tokio::net::TcpStream;
20use tokio::sync::mpsc::{channel, Receiver};
21use tokio::time::sleep;
22
/// Counter that hands out message sequence numbers.
///
/// A default-constructed counter starts at 0, so the first number returned by
/// [`SeqId::next_id`] is 1.
#[derive(Debug, Default)]
pub struct SeqId {
    /// Most recently issued sequence number (0 until `next_id` is first called).
    seq_id: u32,
}

impl SeqId {
    /// Returns the most recently issued sequence number without advancing the counter.
    pub fn current(&self) -> u32 {
        self.seq_id
    }

    /// Advances the counter by one and returns the new sequence number.
    ///
    /// After `u32::MAX` the counter wraps around to 0. The wrap is explicit so that debug and
    /// release builds behave the same; a plain `+= 1` would panic in debug builds.
    pub fn next_id(&mut self) -> u32 {
        self.seq_id = self.seq_id.wrapping_add(1);
        self.seq_id
    }
}
38
/// Receiving end of the channel handed out by `TuyaDevice::connect`. Each item is either a batch
/// of messages parsed from one socket read, or the error that stopped the background reader.
type RecvChannel = Receiver<Result<Vec<Message>>>;
40
41pub struct TuyaConnection {
42    seq_id: SeqId,
43    tcp_write_half: OwnedWriteHalf,
44    mp: MessageParser,
45}
46
47impl TuyaConnection {
48    async fn send(&mut self, mes: &Message) -> Result<()> {
49        info!(
50            "Writing message to {} ({}):\n",
51            self.tcp_write_half.peer_addr()?,
52            &mes
53        );
54        let mut mes = (*mes).clone();
55        if matches!(mes.seq_nr, None) {
56            mes.seq_nr = Some(self.seq_id.next_id());
57        }
58        self.tcp_write_half
59            .write_all(self.mp.encode(&mes, true)?.as_ref())
60            .await?;
61        // info!("Wrote {} bytes", bts);
62
63        // self.read().await
64        Ok(())
65    }
66}
67
68async fn tcp_read(tcp_read_half: &mut OwnedReadHalf, mp: &MessageParser) -> Result<Vec<Message>> {
69    let mut buf = [0; 1024];
70    let mut bts = 0;
71    let mut attempts = 0;
72
73    while bts == 0 && attempts < 3 {
74        bts = tcp_read_half.read(&mut buf).await?;
75        info!("Received {} bytes", bts);
76        attempts += 1;
77        sleep(Duration::from_millis(100)).await;
78    }
79
80    if bts == 0 {
81        return Err(ErrorKind::TcpStreamClosed);
82    } else {
83        debug!("Received response:\n{}", hex::encode(&buf[..bts]));
84    }
85    mp.parse(&buf[..bts])
86}
87pub struct TuyaDevice {
88    addr: SocketAddr,
89    device_id: String,
90    key: Option<String>,
91    version: TuyaVersion,
92    connection: Option<TuyaConnection>,
93}
94
95impl TuyaDevice {
96    pub fn new(ver: &str, device_id: &str, key: Option<&str>, addr: IpAddr) -> Result<TuyaDevice> {
97        let version = ver.parse()?;
98        Ok(TuyaDevice {
99            device_id: device_id.to_string(),
100            addr: SocketAddr::new(addr, 6668),
101            key: key.map(|k| k.to_string()),
102            version,
103            connection: Default::default(),
104        })
105    }
106
107    pub async fn connect(&mut self) -> Result<RecvChannel> {
108        let tcp_stream = TcpStream::connect(&self.addr).await?;
109        tcp_stream.set_nodelay(true)?;
110
111        let (mut tcp_read_half, tcp_write_half) = tcp_stream.into_split();
112        let (tx, rx) = channel(10);
113
114        let mp = MessageParser::create(self.version.clone(), self.key.clone())?;
115        let mut connection = TuyaConnection {
116            mp,
117            seq_id: Default::default(),
118            tcp_write_half,
119        };
120
121        // Tuya protocol v3.4 requires session key negotiation
122        if self.version == TuyaVersion::ThreeFour {
123            let local_nonce = b"0123456789abcdef";
124            let local_key = self.key.clone().ok_or(ErrorKind::MissingKey)?;
125
126            let start_negotiation_msg = Message {
127                payload: Payload::Raw(local_nonce.to_vec()),
128                command: Some(CommandType::SessKeyNegStart),
129                seq_nr: Some(connection.seq_id.next_id()),
130                ret_code: None,
131            };
132
133            info!(
134                "Writing SessKeyNegStart msg to {} ({}):\n{}",
135                self.addr,
136                connection.seq_id.current(),
137                &start_negotiation_msg
138            );
139            connection
140                .tcp_write_half
141                .write_all(connection.mp.encode(&start_negotiation_msg, true)?.as_ref())
142                .await?;
143
144            let rkey = tcp_read(&mut tcp_read_half, &connection.mp).await?;
145            let rkey = rkey.into_iter().next().ok_or(ErrorKind::MissingRemoteKey)?;
146            let rkey = match rkey.payload {
147                Payload::Raw(s) if s.len() == 48 => Ok(s),
148                _ => Err(ErrorKind::InvalidRemoteKey),
149            }?;
150
151            let remote_nonce = &rkey[..16];
152            // let remote_nonce = b"1123456789abcdef";
153            let _hmac = &rkey[16..48];
154
155            let rkey_hmac = connection.mp.cipher.hmac(remote_nonce)?;
156
157            let session_negotiation_finish_msg = Message {
158                payload: Payload::Raw(rkey_hmac),
159                command: Some(CommandType::SessKeyNegFinish),
160                seq_nr: Some(connection.seq_id.next_id()),
161                ret_code: None,
162            };
163
164            info!(
165                "Writing SessKeyNegFinish msg to {} ({}):\n{}",
166                self.addr,
167                connection.seq_id.current(),
168                &session_negotiation_finish_msg
169            );
170            connection
171                .tcp_write_half
172                .write_all(
173                    connection
174                        .mp
175                        .encode(&session_negotiation_finish_msg, true)?
176                        .as_ref(),
177                )
178                .await?;
179
180            let nonce_xor: Vec<u8> = local_nonce
181                .iter()
182                .zip(remote_nonce.iter())
183                .map(|(&a, &b)| a ^ b)
184                .collect();
185
186            debug!("nonce_xor: {}", hex::encode(&nonce_xor));
187
188            debug!("using local_key for crypter: {}", hex::encode(&local_key));
189
190            let local_key = GenericArray::from_slice(local_key.as_bytes());
191            let cipher = Aes128::new(local_key);
192
193            let mut nonce_xor = nonce_xor;
194            let block = GenericArray::from_mut_slice(nonce_xor.as_mut_slice());
195            cipher.encrypt_block(block);
196
197            debug!("session key: {}", hex::encode(&block));
198
199            connection.mp.cipher.set_key(block.to_vec())
200        }
201
202        let mp = connection.mp.clone();
203        self.connection = Some(connection);
204
205        tokio::spawn(async move {
206            loop {
207                let mut buf = [0; 1024];
208                let result = tcp_read_half.read(&mut buf).await;
209
210                let result = match result {
211                    Ok(0) => Err(ErrorKind::TcpStreamClosed),
212                    Ok(bytes) => {
213                        info!("Received {} bytes", bytes);
214                        mp.parse(&buf[..bytes])
215                    }
216                    Err(e) => Err(ErrorKind::TcpError(e)),
217                };
218
219                let send_result = match result {
220                    Ok(messages) => tx.send(Ok(messages)).await,
221                    Err(e) => {
222                        info!("TCP Error: {:?}", e);
223                        tx.send(Err(e)).await.ok();
224                        break;
225                    }
226                };
227
228                if let Err(e) = send_result {
229                    info!("Receiver was dropped, disconnecting: {:?}", e);
230                    break;
231                }
232            }
233        });
234
235        Ok(rx)
236    }
237
238    pub async fn set(&mut self, tuya_payload: Payload) -> Result<()> {
239        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
240        let command = match self.version {
241            TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::Control,
242            TuyaVersion::ThreeFour => CommandType::ControlNew,
243        };
244        let mes = Message::new(tuya_payload, command);
245        connection.send(&mes).await?;
246
247        Ok(())
248    }
249
250    pub async fn set_values(&mut self, dps: serde_json::Value) -> Result<()> {
251        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
252        let command = match self.version {
253            TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::Control,
254            TuyaVersion::ThreeFour => CommandType::ControlNew,
255        };
256
257        let current_time = SystemTime::now()
258            .duration_since(SystemTime::UNIX_EPOCH)?
259            .as_secs() as u32;
260        // let current_time = 1;
261
262        let device_id = self.device_id.clone();
263
264        let payload = match self.version {
265            TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => Payload::Struct(PayloadStruct {
266                gw_id: Some(device_id.clone()),
267                dev_id: device_id.clone(),
268                uid: Some(device_id.clone()),
269                t: Some(current_time.to_string()),
270                dp_id: None,
271                dps: Some(dps),
272            }),
273            TuyaVersion::ThreeFour => Payload::ControlNewStruct(ControlNewPayload {
274                protocol: 5,
275                t: current_time,
276                data: ControlNewPayloadData { dps },
277            }),
278        };
279        let mes = Message::new(payload, command);
280        connection.send(&mes).await?;
281
282        Ok(())
283    }
284
285    pub async fn get(&mut self, tuya_payload: Payload) -> Result<()> {
286        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
287        let command = match self.version {
288            TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::DpQuery,
289            TuyaVersion::ThreeFour => CommandType::DpQueryNew,
290        };
291        let mes = Message::new(tuya_payload, command);
292        connection.send(&mes).await?;
293
294        Ok(())
295    }
296
297    pub async fn refresh(&mut self, tuya_payload: Payload) -> Result<()> {
298        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
299        let mes = Message::new(tuya_payload, CommandType::DpRefresh);
300        connection.send(&mes).await?;
301
302        Ok(())
303    }
304
305    pub async fn send_msg(&mut self, msg: Message) -> Result<()> {
306        let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
307        connection.send(&msg).await?;
308
309        Ok(())
310    }
311}