1use crate::error::ErrorKind;
9use crate::mesparse::{CommandType, Message, MessageParser, TuyaVersion};
10use crate::{ControlNewPayload, ControlNewPayloadData, Payload, PayloadStruct, Result};
11use aes::cipher::generic_array::GenericArray;
12use aes::cipher::{BlockEncrypt, KeyInit};
13use aes::Aes128;
14use log::{debug, info};
15use std::net::{IpAddr, SocketAddr};
16use std::time::{Duration, SystemTime};
17use tokio::io::{AsyncReadExt, AsyncWriteExt};
18use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
19use tokio::net::TcpStream;
20use tokio::sync::mpsc::{channel, Receiver};
21use tokio::time::sleep;
22
23#[derive(Default)]
24pub struct SeqId {
25 seq_id: u32,
26}
27
28impl SeqId {
29 pub fn current(&self) -> u32 {
30 self.seq_id
31 }
32
33 pub fn next_id(&mut self) -> u32 {
34 self.seq_id += 1;
35 self.seq_id
36 }
37}
38
39type RecvChannel = Receiver<Result<Vec<Message>>>;
40
41pub struct TuyaConnection {
42 seq_id: SeqId,
43 tcp_write_half: OwnedWriteHalf,
44 mp: MessageParser,
45}
46
47impl TuyaConnection {
48 async fn send(&mut self, mes: &Message) -> Result<()> {
49 info!(
50 "Writing message to {} ({}):\n",
51 self.tcp_write_half.peer_addr()?,
52 &mes
53 );
54 let mut mes = (*mes).clone();
55 if matches!(mes.seq_nr, None) {
56 mes.seq_nr = Some(self.seq_id.next_id());
57 }
58 self.tcp_write_half
59 .write_all(self.mp.encode(&mes, true)?.as_ref())
60 .await?;
61 Ok(())
65 }
66}
67
68async fn tcp_read(tcp_read_half: &mut OwnedReadHalf, mp: &MessageParser) -> Result<Vec<Message>> {
69 let mut buf = [0; 1024];
70 let mut bts = 0;
71 let mut attempts = 0;
72
73 while bts == 0 && attempts < 3 {
74 bts = tcp_read_half.read(&mut buf).await?;
75 info!("Received {} bytes", bts);
76 attempts += 1;
77 sleep(Duration::from_millis(100)).await;
78 }
79
80 if bts == 0 {
81 return Err(ErrorKind::TcpStreamClosed);
82 } else {
83 debug!("Received response:\n{}", hex::encode(&buf[..bts]));
84 }
85 mp.parse(&buf[..bts])
86}
87pub struct TuyaDevice {
88 addr: SocketAddr,
89 device_id: String,
90 key: Option<String>,
91 version: TuyaVersion,
92 connection: Option<TuyaConnection>,
93}
94
95impl TuyaDevice {
96 pub fn new(ver: &str, device_id: &str, key: Option<&str>, addr: IpAddr) -> Result<TuyaDevice> {
97 let version = ver.parse()?;
98 Ok(TuyaDevice {
99 device_id: device_id.to_string(),
100 addr: SocketAddr::new(addr, 6668),
101 key: key.map(|k| k.to_string()),
102 version,
103 connection: Default::default(),
104 })
105 }
106
107 pub async fn connect(&mut self) -> Result<RecvChannel> {
108 let tcp_stream = TcpStream::connect(&self.addr).await?;
109 tcp_stream.set_nodelay(true)?;
110
111 let (mut tcp_read_half, tcp_write_half) = tcp_stream.into_split();
112 let (tx, rx) = channel(10);
113
114 let mp = MessageParser::create(self.version.clone(), self.key.clone())?;
115 let mut connection = TuyaConnection {
116 mp,
117 seq_id: Default::default(),
118 tcp_write_half,
119 };
120
121 if self.version == TuyaVersion::ThreeFour {
123 let local_nonce = b"0123456789abcdef";
124 let local_key = self.key.clone().ok_or(ErrorKind::MissingKey)?;
125
126 let start_negotiation_msg = Message {
127 payload: Payload::Raw(local_nonce.to_vec()),
128 command: Some(CommandType::SessKeyNegStart),
129 seq_nr: Some(connection.seq_id.next_id()),
130 ret_code: None,
131 };
132
133 info!(
134 "Writing SessKeyNegStart msg to {} ({}):\n{}",
135 self.addr,
136 connection.seq_id.current(),
137 &start_negotiation_msg
138 );
139 connection
140 .tcp_write_half
141 .write_all(connection.mp.encode(&start_negotiation_msg, true)?.as_ref())
142 .await?;
143
144 let rkey = tcp_read(&mut tcp_read_half, &connection.mp).await?;
145 let rkey = rkey.into_iter().next().ok_or(ErrorKind::MissingRemoteKey)?;
146 let rkey = match rkey.payload {
147 Payload::Raw(s) if s.len() == 48 => Ok(s),
148 _ => Err(ErrorKind::InvalidRemoteKey),
149 }?;
150
151 let remote_nonce = &rkey[..16];
152 let _hmac = &rkey[16..48];
154
155 let rkey_hmac = connection.mp.cipher.hmac(remote_nonce)?;
156
157 let session_negotiation_finish_msg = Message {
158 payload: Payload::Raw(rkey_hmac),
159 command: Some(CommandType::SessKeyNegFinish),
160 seq_nr: Some(connection.seq_id.next_id()),
161 ret_code: None,
162 };
163
164 info!(
165 "Writing SessKeyNegFinish msg to {} ({}):\n{}",
166 self.addr,
167 connection.seq_id.current(),
168 &session_negotiation_finish_msg
169 );
170 connection
171 .tcp_write_half
172 .write_all(
173 connection
174 .mp
175 .encode(&session_negotiation_finish_msg, true)?
176 .as_ref(),
177 )
178 .await?;
179
180 let nonce_xor: Vec<u8> = local_nonce
181 .iter()
182 .zip(remote_nonce.iter())
183 .map(|(&a, &b)| a ^ b)
184 .collect();
185
186 debug!("nonce_xor: {}", hex::encode(&nonce_xor));
187
188 debug!("using local_key for crypter: {}", hex::encode(&local_key));
189
190 let local_key = GenericArray::from_slice(local_key.as_bytes());
191 let cipher = Aes128::new(local_key);
192
193 let mut nonce_xor = nonce_xor;
194 let block = GenericArray::from_mut_slice(nonce_xor.as_mut_slice());
195 cipher.encrypt_block(block);
196
197 debug!("session key: {}", hex::encode(&block));
198
199 connection.mp.cipher.set_key(block.to_vec())
200 }
201
202 let mp = connection.mp.clone();
203 self.connection = Some(connection);
204
205 tokio::spawn(async move {
206 loop {
207 let mut buf = [0; 1024];
208 let result = tcp_read_half.read(&mut buf).await;
209
210 let result = match result {
211 Ok(0) => Err(ErrorKind::TcpStreamClosed),
212 Ok(bytes) => {
213 info!("Received {} bytes", bytes);
214 mp.parse(&buf[..bytes])
215 }
216 Err(e) => Err(ErrorKind::TcpError(e)),
217 };
218
219 let send_result = match result {
220 Ok(messages) => tx.send(Ok(messages)).await,
221 Err(e) => {
222 info!("TCP Error: {:?}", e);
223 tx.send(Err(e)).await.ok();
224 break;
225 }
226 };
227
228 if let Err(e) = send_result {
229 info!("Receiver was dropped, disconnecting: {:?}", e);
230 break;
231 }
232 }
233 });
234
235 Ok(rx)
236 }
237
238 pub async fn set(&mut self, tuya_payload: Payload) -> Result<()> {
239 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
240 let command = match self.version {
241 TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::Control,
242 TuyaVersion::ThreeFour => CommandType::ControlNew,
243 };
244 let mes = Message::new(tuya_payload, command);
245 connection.send(&mes).await?;
246
247 Ok(())
248 }
249
250 pub async fn set_values(&mut self, dps: serde_json::Value) -> Result<()> {
251 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
252 let command = match self.version {
253 TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::Control,
254 TuyaVersion::ThreeFour => CommandType::ControlNew,
255 };
256
257 let current_time = SystemTime::now()
258 .duration_since(SystemTime::UNIX_EPOCH)?
259 .as_secs() as u32;
260 let device_id = self.device_id.clone();
263
264 let payload = match self.version {
265 TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => Payload::Struct(PayloadStruct {
266 gw_id: Some(device_id.clone()),
267 dev_id: device_id.clone(),
268 uid: Some(device_id.clone()),
269 t: Some(current_time.to_string()),
270 dp_id: None,
271 dps: Some(dps),
272 }),
273 TuyaVersion::ThreeFour => Payload::ControlNewStruct(ControlNewPayload {
274 protocol: 5,
275 t: current_time,
276 data: ControlNewPayloadData { dps },
277 }),
278 };
279 let mes = Message::new(payload, command);
280 connection.send(&mes).await?;
281
282 Ok(())
283 }
284
285 pub async fn get(&mut self, tuya_payload: Payload) -> Result<()> {
286 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
287 let command = match self.version {
288 TuyaVersion::ThreeOne | TuyaVersion::ThreeThree => CommandType::DpQuery,
289 TuyaVersion::ThreeFour => CommandType::DpQueryNew,
290 };
291 let mes = Message::new(tuya_payload, command);
292 connection.send(&mes).await?;
293
294 Ok(())
295 }
296
297 pub async fn refresh(&mut self, tuya_payload: Payload) -> Result<()> {
298 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
299 let mes = Message::new(tuya_payload, CommandType::DpRefresh);
300 connection.send(&mes).await?;
301
302 Ok(())
303 }
304
305 pub async fn send_msg(&mut self, msg: Message) -> Result<()> {
306 let connection = self.connection.as_mut().ok_or(ErrorKind::NotConnected)?;
307 connection.send(&msg).await?;
308
309 Ok(())
310 }
311}