1use drmem_api::{
33 device,
34 driver::{self, DriverConfig},
35 Error, Result,
36};
37use futures::{Future, FutureExt};
38use std::net::SocketAddrV4;
39use std::sync::Arc;
40use std::{convert::Infallible, pin::Pin};
41use tokio::{
42 io::{AsyncReadExt, AsyncWriteExt},
43 net::TcpStream,
44 sync::{Mutex, MutexGuard},
45 time,
46};
47use tracing::{debug, error, warn, Span};
48
49mod tplink_api;
50
/// Capacity, in bytes, of the buffer that receives reply payloads. Replies
/// announcing a larger payload are rejected by `read_reply`.
const BUF_TOTAL: usize = 4 * 1024;
52
53pub struct Instance {
54 addr: SocketAddrV4,
55 reported_error: Option<bool>,
56 buf: [u8; BUF_TOTAL],
57}
58
59pub struct Devices {
60 d_error: driver::ReadOnlyDevice<bool>,
61 d_brightness: driver::ReadWriteDevice<f64>,
62 d_led: driver::ReadWriteDevice<bool>,
63}
64
65impl Instance {
66 pub const NAME: &'static str = "tplink";
67
68 pub const SUMMARY: &'static str = "monitors and controls TP-Link devices";
69
70 pub const DESCRIPTION: &'static str = include_str!("../README.md");
71
72 fn get_cfg_address(cfg: &DriverConfig) -> Result<SocketAddrV4> {
76 match cfg.get("addr") {
77 Some(toml::value::Value::String(addr)) => {
78 if let Ok(addr) = addr.parse::<SocketAddrV4>() {
79 Ok(addr)
80 } else {
81 Err(Error::ConfigError(String::from(
82 "'addr' not in hostname:port format",
83 )))
84 }
85 }
86 Some(_) => Err(Error::ConfigError(String::from(
87 "'addr' config parameter should be a string",
88 ))),
89 None => Err(Error::ConfigError(String::from(
90 "missing 'addr' parameter in config",
91 ))),
92 }
93 }
94
95 async fn read_reply<R>(&mut self, s: &mut R) -> Result<tplink_api::Reply>
100 where
101 R: AsyncReadExt + std::marker::Unpin,
102 {
103 if let Ok(sz) = s.read_u32().await {
104 let sz = sz as usize;
105
106 if sz <= BUF_TOTAL {
107 let filled = &mut self.buf[0..sz];
108
109 if let Err(e) = s.read_exact(filled).await {
110 Err(Error::MissingPeer(e.to_string()))
111 } else {
112 tplink_api::Reply::decode(filled).ok_or_else(|| {
113 Error::ParseError(format!(
114 "bad reply : {}",
115 String::from_utf8_lossy(filled)
116 ))
117 })
118 }
119 } else {
120 Err(Error::ParseError(format!(
121 "reply size ({sz}) is greater than {BUF_TOTAL}"
122 )))
123 }
124 } else {
125 Err(Error::MissingPeer("error reading header".into()))
126 }
127 }
128
129 async fn send_cmd<S>(s: &mut S, cmd: tplink_api::Cmd) -> Result<()>
132 where
133 S: AsyncWriteExt + std::marker::Unpin,
134 {
135 const ERR_F: fn(std::io::Error) -> Error =
136 |e| Error::MissingPeer(e.to_string());
137 let out_buf = cmd.encode();
138
139 #[rustfmt::skip]
140 tokio::select! {
141 result = s.write_all(&out_buf[..]) => {
142 match result {
143 Ok(_) => s.flush().await.map_err(ERR_F),
144 Err(e) => Err(ERR_F(e))
145 }
146 }
147 _ = time::sleep(time::Duration::from_millis(500)) =>
148 Err(Error::TimeoutError)
149 }
150 }
151
152 async fn rpc<R, S>(
156 &mut self,
157 rx: &mut R,
158 tx: &mut S,
159 cmd: tplink_api::Cmd,
160 ) -> Result<tplink_api::Reply>
161 where
162 R: AsyncReadExt + std::marker::Unpin,
163 S: AsyncWriteExt + std::marker::Unpin,
164 {
165 Instance::send_cmd(tx, cmd)
166 .then(|res| async {
167 match res {
168 Ok(()) => {
169 #[rustfmt::skip]
170 tokio::select! {
171 result = self.read_reply(rx) => result,
172 _ = time::sleep(time::Duration::from_millis(500)) =>
173 Err(Error::TimeoutError)
174 }
175 }
176 Err(e) => Err(e),
177 }
178 })
179 .await
180 }
181
182 async fn relay_state_rpc(
185 &mut self,
186 s: &mut TcpStream,
187 v: bool,
188 ) -> Result<()> {
189 use tplink_api::{active_cmd, ErrorStatus, Reply};
190
191 let (mut rx, mut tx) = s.split();
192
193 match self.rpc(&mut rx, &mut tx, active_cmd(v as u8)).await? {
194 Reply::System {
195 set_relay_state: Some(ErrorStatus { err_code: 0, .. }),
196 ..
197 } => Ok(()),
198
199 Reply::System {
200 set_relay_state:
201 Some(ErrorStatus {
202 err_msg: Some(em), ..
203 }),
204 ..
205 } => Err(Error::ProtocolError(em)),
206
207 reply => Err(Error::ProtocolError(format!(
208 "unexpected reply : {:?}",
209 &reply
210 ))),
211 }
212 }
213
214 async fn led_state_rpc(
217 &mut self,
218 s: &mut TcpStream,
219 v: bool,
220 ) -> Result<()> {
221 use tplink_api::{led_cmd, ErrorStatus, Reply};
222
223 let (mut rx, mut tx) = s.split();
224
225 match self.rpc(&mut rx, &mut tx, led_cmd(v)).await? {
229 Reply::System {
230 set_led_off: Some(ErrorStatus { err_code: 0, .. }),
231 ..
232 } => Ok(()),
233
234 Reply::System {
235 set_led_off:
236 Some(ErrorStatus {
237 err_msg: Some(em), ..
238 }),
239 ..
240 } => Err(Error::ProtocolError(em)),
241
242 reply => Err(Error::ProtocolError(format!(
243 "unexpected reply : {:?}",
244 &reply
245 ))),
246 }
247 }
248
249 async fn info_rpc(&mut self, s: &mut TcpStream) -> Result<(bool, u8)> {
252 use tplink_api::{info_cmd, Reply};
253
254 let (mut rx, mut tx) = s.split();
255
256 match self.rpc(&mut rx, &mut tx, info_cmd()).await? {
260 Reply::System {
261 get_sysinfo: Some(info),
262 ..
263 } => {
264 let led = info.led_off.unwrap_or(1) == 0;
265
266 match (info.relay_state.map(|v| v != 0), info.brightness) {
267 (None, None) => Ok((led, 0)),
268 (None, Some(br)) => Ok((led, br)),
269 (Some(false), _) => Ok((led, 0)),
270 (Some(true), br) => Ok((led, br.unwrap_or(100))),
271 }
272 }
273
274 reply => Err(Error::ProtocolError(format!(
275 "unexpected reply : {:?}",
276 &reply
277 ))),
278 }
279 }
280
281 async fn brightness_rpc(&mut self, s: &mut TcpStream, v: u8) -> Result<()> {
285 use tplink_api::{brightness_cmd, ErrorStatus, Reply};
286
287 let (mut rx, mut tx) = s.split();
288
289 match self.rpc(&mut rx, &mut tx, brightness_cmd(v)).await? {
290 Reply::Dimmer {
291 set_brightness: Some(ErrorStatus { err_code: 0, .. }),
292 ..
293 } => Ok(()),
294
295 Reply::Dimmer {
296 set_brightness:
297 Some(ErrorStatus {
298 err_msg: Some(em), ..
299 }),
300 ..
301 } => Err(Error::ProtocolError(em)),
302
303 reply => Err(Error::ProtocolError(format!(
304 "unexpected reply : {:?}",
305 &reply
306 ))),
307 }
308 }
309
310 async fn set_brightness(
314 &mut self,
315 s: &mut TcpStream,
316 v: f64,
317 ) -> Result<()> {
318 if v > 0.0 {
323 self.brightness_rpc(s, v as u8).await?;
324 self.relay_state_rpc(s, true).await
325 } else {
326 self.relay_state_rpc(s, false).await
327 }
328 }
329
330 async fn connect(addr: &SocketAddrV4) -> Result<TcpStream> {
334 use tokio::net::TcpSocket;
335
336 let fut = time::timeout(time::Duration::from_secs(1), async {
337 match TcpSocket::new_v4() {
338 Ok(s) => {
339 s.set_recv_buffer_size((BUF_TOTAL * 2) as u32)?;
340
341 let s = s.connect((*addr).into()).await?;
342
343 s.set_nodelay(false)?;
344 Ok(s)
345 }
346 Err(e) => Err(e),
347 }
348 });
349
350 match fut.await {
351 Ok(Ok(s)) => Ok(s),
352 Ok(Err(e)) => Err(Error::MissingPeer(e.to_string())),
353 Err(_) => Err(Error::MissingPeer("timeout".into())),
354 }
355 }
356
357 async fn handle_brightness_setting<'a>(
360 &mut self,
361 s: &'a mut TcpStream,
362 v: f64,
363 reply: driver::SettingReply<f64>,
364 report: &'a mut driver::ReadWriteDevice<f64>,
365 ) -> Result<Option<f64>> {
366 if !v.is_nan() {
367 let v = match v {
371 v if v == f64::INFINITY => 100.0,
372 v if v == f64::NEG_INFINITY => 0.0,
373 v if v < 0.0 => 0.0,
374 v if v > 100.0 => 100.0,
375 v => v,
376 };
377
378 reply(Ok(v));
381
382 match self.set_brightness(s, v).await {
387 Ok(()) => {
388 report.report_update(v).await;
389 Ok(Some(v))
390 }
391 Err(e) => {
392 error!("setting brightness : {}", &e);
393 Err(e)
394 }
395 }
396 } else {
397 reply(Err(Error::InvArgument("device doesn't accept NaN".into())));
398 Ok(None)
399 }
400 }
401
402 async fn handle_led_setting<'a>(
405 &mut self,
406 s: &'a mut TcpStream,
407 v: bool,
408 reply: driver::SettingReply<bool>,
409 report: &'a mut driver::ReadWriteDevice<bool>,
410 ) -> Result<()> {
411 reply(Ok(v));
412 match self.led_state_rpc(s, v).await {
413 Ok(()) => {
414 report.report_update(v).await;
415 Ok(())
416 }
417 Err(e) => {
418 error!("setting LED : {}", &e);
419 Err(e)
420 }
421 }
422 }
423
424 async fn sync_error_state(
429 &mut self,
430 report: &mut driver::ReadOnlyDevice<bool>,
431 value: bool,
432 ) {
433 if self.reported_error != Some(value) {
434 self.reported_error = Some(value);
435 report.report_update(value).await;
436 }
437 }
438
439 async fn main_loop<'a>(
440 &mut self,
441 s: &mut TcpStream,
442 devices: &mut MutexGuard<'_, Devices>,
443 ) {
444 let mut timer =
449 tokio::time::interval(tokio::time::Duration::from_secs(5));
450 let mut current_led = false;
451 let mut current_brightness = -1.0f64;
452
453 'main: loop {
456 self.sync_error_state(&mut devices.d_error, false).await;
457
458 let Devices {
461 d_brightness: ref mut d_b,
462 d_led: ref mut d_l,
463 ..
464 } = **devices;
465
466 #[rustfmt::skip]
469 tokio::select! {
470 _ = timer.tick() => {
476 if let Ok((led, br)) = self.info_rpc(s).await {
477 let br = br as f64;
478
479 if current_led != led {
483 debug!("external LED update: {}", led);
484 current_led = led;
485 d_l.report_update(led).await;
486 }
487
488 if current_brightness != br {
492 debug!("external brightness update: {}", br);
493 current_brightness = br;
494 devices.d_brightness.report_update(br).await;
495 }
496 } else {
497 break 'main
498 }
499 }
500
501 Some((v, reply)) = d_b.next_setting() => {
504
505 if current_brightness != v {
509 match self.handle_brightness_setting(
510 s, v, reply, d_b
511 ).await {
512 Ok(Some(v)) => current_brightness = v,
513 Ok(None) => (),
514 Err(_) => break 'main
515 }
516 } else {
517 debug!("don't need to apply brightness setting");
518
519 d_b.report_update(v).await;
524 reply(Ok(v))
525 }
526 }
527
528 Some((v, reply)) = d_l.next_setting() => {
531 debug!("led setting -> {}", &v);
532
533 if current_led != v {
537 if self.handle_led_setting(
538 s, v, reply, &mut devices.d_led
539 ).await == Ok(()) {
540 current_led = v;
541 } else {
542 break 'main
543 }
544 } else {
545 debug!("don't need to apply led setting");
546
547 d_l.report_update(v).await;
552 reply(Ok(v))
553 }
554 }
555 }
556 }
557 }
558}
559
560impl driver::API for Instance {
561 type DeviceSet = Devices;
562
563 fn register_devices(
566 core: driver::RequestChan,
567 _cfg: &DriverConfig,
568 max_history: Option<usize>,
569 ) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>> {
570 let error_name = "error"
571 .parse::<device::Base>()
572 .expect("parsing 'error' should never fail");
573 let brightness_name = "brightness"
574 .parse::<device::Base>()
575 .expect("parsing 'brightness' should never fail");
576 let led_name = "led"
577 .parse::<device::Base>()
578 .expect("parsing 'led' should never fail");
579
580 Box::pin(async move {
581 let d_error =
584 core.add_ro_device(error_name, None, max_history).await?;
585 let d_brightness = core
586 .add_rw_device(brightness_name, None, max_history)
587 .await?;
588 let d_led = core.add_rw_device(led_name, None, max_history).await?;
589
590 Ok(Devices {
591 d_error,
592 d_brightness,
593 d_led,
594 })
595 })
596 }
597
598 fn create_instance(
602 cfg: &DriverConfig,
603 ) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>> {
604 let cfg_addr = Instance::get_cfg_address(cfg);
605
606 Box::pin(async {
607 Ok(Box::new(Instance {
608 addr: cfg_addr?,
609 reported_error: None,
610 buf: [0; BUF_TOTAL],
611 }))
612 })
613 }
614
615 fn run<'a>(
618 &'a mut self,
619 devices: Arc<Mutex<Devices>>,
620 ) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>> {
621 let fut = async move {
622 let mut devices = devices.lock().await;
628
629 Span::current().record("cfg", self.addr.to_string());
633
634 loop {
635 match Instance::connect(&self.addr).await {
641 Ok(mut s) => {
642 self.main_loop(&mut s, &mut devices).await;
643 }
644 Err(e) => {
645 warn!("couldn't connect : '{}'", e);
646 }
647 }
648
649 self.sync_error_state(&mut devices.d_error, true).await;
650
651 tokio::time::sleep(tokio::time::Duration::from_secs(10)).await
655 }
656 };
657
658 Box::pin(fut)
659 }
660}
661
#[cfg(test)]
mod test {
    use super::{tplink_api, Instance, BUF_TOTAL};
    use std::{
        io::Write,
        net::{Ipv4Addr, SocketAddrV4},
    };

    // Builds an instance that is only good for exercising `read_reply`;
    // the address is never connected to.
    fn blank_instance() -> Instance {
        Instance {
            addr: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0),
            reported_error: None,
            buf: [0u8; BUF_TOTAL],
        }
    }

    #[tokio::test]
    async fn test_read_reply() {
        // A header that is cut short -- including no data at all -- must
        // be reported as an error.
        {
            let buf: &[u8] = &[0, 0, 0];
            let mut inst = blank_instance();

            for len in 0..=buf.len() {
                assert!(
                    inst.read_reply(&mut &buf[..len]).await.is_err(),
                    "a {len}-byte header should be rejected"
                );
            }
        }

        // A header announcing more data than the buffer can hold must be
        // rejected.
        {
            let header = ((BUF_TOTAL + 1) as u32).to_be_bytes();
            let mut inst = blank_instance();

            assert!(inst.read_reply(&mut &header[..]).await.is_err());
        }

        // A truncated payload is an error; a complete one decodes.
        {
            const REPLY: &[u8] =
                b"{\"system\":{\"set_led_off\":{\"err_code\":0}}}";

            let mut buf = vec![0, 0, 0, REPLY.len() as u8];

            {
                let mut wr = tplink_api::CmdWriter::create(&mut buf);

                assert_eq!(wr.write(REPLY).unwrap(), REPLY.len());
            }

            assert_eq!(buf.len(), 45);

            let mut inst = blank_instance();

            assert!(inst.read_reply(&mut &buf[0..4]).await.is_err());
            assert!(inst.read_reply(&mut &buf[0..5]).await.is_err());
            assert!(inst.read_reply(&mut buf.as_slice()).await.is_ok());
        }
    }
}
715}