Skip to main content

drmem_drv_tplink/
lib.rs

1// A driver to manage devices that use the TP-Link protocol. This
2// protocol sends and receives JSON data over a TCP connection. Some
3// sample exchanges for the HS220 dimmer:
4//
5//  Get status:
6//
7//   Sent:      {"system":{"get_sysinfo":{}}}
8//   Received:  ???
9//
10//  Turning it on/off:
11//
12//   Turn on:   {"system":{"set_relay_state":{"state":1}}}
13//   Turn off:  {"system":{"set_relay_state":{"state":0}}}
14//   Received:  {"system":{"set_relay_state":{"err_code":0}}}
15//
16//  Setting the brightness to 75%:
17//
18//   Sent:      {"smartlife.iot.dimmer":{"set_brightness":{"brightness":75}}}
19//   Received:  {"smartlife.iot.dimmer":{"set_brightness":{"err_code":0}}}
20//
21//  Controlling LED indicator:
22//
23//   Turn on:   {"system":{"set_led_off":{"off":0}}}
24//   Turn off:  {"system":{"set_led_off":{"off":1}}}
25//   Received:  {"system":{"set_led_off":{"err_code":0}}}
26//
27//  Error reply (example):
28//
29//   Sent:      {"system":{"set_bright":{"bright":75}}}
30//   Received:  {"system":{"set_bright":{"err_code":-2,"err_msg":"member not support"}}}
31
32use drmem_api::{
33    device,
34    driver::{self, DriverConfig},
35    Error, Result,
36};
37use futures::{Future, FutureExt};
38use std::net::SocketAddrV4;
39use std::sync::Arc;
40use std::{convert::Infallible, pin::Pin};
41use tokio::{
42    io::{AsyncReadExt, AsyncWriteExt},
43    net::TcpStream,
44    sync::{Mutex, MutexGuard},
45    time,
46};
47use tracing::{debug, error, warn, Span};
48
49mod tplink_api;
50
51const BUF_TOTAL: usize = 4_096;
52
53pub struct Instance {
54    addr: SocketAddrV4,
55    reported_error: Option<bool>,
56    buf: [u8; BUF_TOTAL],
57}
58
59pub struct Devices {
60    d_error: driver::ReadOnlyDevice<bool>,
61    d_brightness: driver::ReadWriteDevice<f64>,
62    d_led: driver::ReadWriteDevice<bool>,
63}
64
65impl Instance {
66    pub const NAME: &'static str = "tplink";
67
68    pub const SUMMARY: &'static str = "monitors and controls TP-Link devices";
69
70    pub const DESCRIPTION: &'static str = include_str!("../README.md");
71
72    // Pull the hostname/port for the remote process from the
73    // configuration.
74
75    fn get_cfg_address(cfg: &DriverConfig) -> Result<SocketAddrV4> {
76        match cfg.get("addr") {
77            Some(toml::value::Value::String(addr)) => {
78                if let Ok(addr) = addr.parse::<SocketAddrV4>() {
79                    Ok(addr)
80                } else {
81                    Err(Error::ConfigError(String::from(
82                        "'addr' not in hostname:port format",
83                    )))
84                }
85            }
86            Some(_) => Err(Error::ConfigError(String::from(
87                "'addr' config parameter should be a string",
88            ))),
89            None => Err(Error::ConfigError(String::from(
90                "missing 'addr' parameter in config",
91            ))),
92        }
93    }
94
95    // Attempts to read a `tplink_api::Reply` type from the socket.
96    // All replies have a 4-byte length header so we know how much
97    // data to read.
98
99    async fn read_reply<R>(&mut self, s: &mut R) -> Result<tplink_api::Reply>
100    where
101        R: AsyncReadExt + std::marker::Unpin,
102    {
103        if let Ok(sz) = s.read_u32().await {
104            let sz = sz as usize;
105
106            if sz <= BUF_TOTAL {
107                let filled = &mut self.buf[0..sz];
108
109                if let Err(e) = s.read_exact(filled).await {
110                    Err(Error::MissingPeer(e.to_string()))
111                } else {
112                    tplink_api::Reply::decode(filled).ok_or_else(|| {
113                        Error::ParseError(format!(
114                            "bad reply : {}",
115                            String::from_utf8_lossy(filled)
116                        ))
117                    })
118                }
119            } else {
120                Err(Error::ParseError(format!(
121                    "reply size ({sz}) is greater than {BUF_TOTAL}"
122                )))
123            }
124        } else {
125            Err(Error::MissingPeer("error reading header".into()))
126        }
127    }
128
129    // Attempts to send a command to the socket.
130
131    async fn send_cmd<S>(s: &mut S, cmd: tplink_api::Cmd) -> Result<()>
132    where
133        S: AsyncWriteExt + std::marker::Unpin,
134    {
135        const ERR_F: fn(std::io::Error) -> Error =
136            |e| Error::MissingPeer(e.to_string());
137        let out_buf = cmd.encode();
138
139        #[rustfmt::skip]
140	tokio::select! {
141	    result = s.write_all(&out_buf[..]) => {
142		match result {
143		    Ok(_) => s.flush().await.map_err(ERR_F),
144		    Err(e) => Err(ERR_F(e))
145		}
146	    }
147	    _ = time::sleep(time::Duration::from_millis(500)) =>
148		Err(Error::TimeoutError)
149	}
150    }
151
152    // Performs an "RPC" call to the device; it sends the command and
153    // returns the reply.
154
155    async fn rpc<R, S>(
156        &mut self,
157        rx: &mut R,
158        tx: &mut S,
159        cmd: tplink_api::Cmd,
160    ) -> Result<tplink_api::Reply>
161    where
162        R: AsyncReadExt + std::marker::Unpin,
163        S: AsyncWriteExt + std::marker::Unpin,
164    {
165        Instance::send_cmd(tx, cmd)
166            .then(|res| async {
167                match res {
168                    Ok(()) => {
169                        #[rustfmt::skip]
170			tokio::select! {
171			    result = self.read_reply(rx) => result,
172			    _ = time::sleep(time::Duration::from_millis(500)) =>
173				Err(Error::TimeoutError)
174			}
175                    }
176                    Err(e) => Err(e),
177                }
178            })
179            .await
180    }
181
182    // Sets the relay state on or off, depending on the argument.
183
184    async fn relay_state_rpc(
185        &mut self,
186        s: &mut TcpStream,
187        v: bool,
188    ) -> Result<()> {
189        use tplink_api::{active_cmd, ErrorStatus, Reply};
190
191        let (mut rx, mut tx) = s.split();
192
193        match self.rpc(&mut rx, &mut tx, active_cmd(v as u8)).await? {
194            Reply::System {
195                set_relay_state: Some(ErrorStatus { err_code: 0, .. }),
196                ..
197            } => Ok(()),
198
199            Reply::System {
200                set_relay_state:
201                    Some(ErrorStatus {
202                        err_msg: Some(em), ..
203                    }),
204                ..
205            } => Err(Error::ProtocolError(em)),
206
207            reply => Err(Error::ProtocolError(format!(
208                "unexpected reply : {:?}",
209                &reply
210            ))),
211        }
212    }
213
214    // Sets the LED state on or off, depending on the argument.
215
216    async fn led_state_rpc(
217        &mut self,
218        s: &mut TcpStream,
219        v: bool,
220    ) -> Result<()> {
221        use tplink_api::{led_cmd, ErrorStatus, Reply};
222
223        let (mut rx, mut tx) = s.split();
224
225        // Send the request and receive the reply. Use pattern
226        // matching to determine the return value of the function.
227
228        match self.rpc(&mut rx, &mut tx, led_cmd(v)).await? {
229            Reply::System {
230                set_led_off: Some(ErrorStatus { err_code: 0, .. }),
231                ..
232            } => Ok(()),
233
234            Reply::System {
235                set_led_off:
236                    Some(ErrorStatus {
237                        err_msg: Some(em), ..
238                    }),
239                ..
240            } => Err(Error::ProtocolError(em)),
241
242            reply => Err(Error::ProtocolError(format!(
243                "unexpected reply : {:?}",
244                &reply
245            ))),
246        }
247    }
248
249    // Retrieves info.
250
251    async fn info_rpc(&mut self, s: &mut TcpStream) -> Result<(bool, u8)> {
252        use tplink_api::{info_cmd, Reply};
253
254        let (mut rx, mut tx) = s.split();
255
256        // Send the request and receive the reply. Use pattern
257        // matching to determine the return value of the function.
258
259        match self.rpc(&mut rx, &mut tx, info_cmd()).await? {
260            Reply::System {
261                get_sysinfo: Some(info),
262                ..
263            } => {
264                let led = info.led_off.unwrap_or(1) == 0;
265
266                match (info.relay_state.map(|v| v != 0), info.brightness) {
267                    (None, None) => Ok((led, 0)),
268                    (None, Some(br)) => Ok((led, br)),
269                    (Some(false), _) => Ok((led, 0)),
270                    (Some(true), br) => Ok((led, br.unwrap_or(100))),
271                }
272            }
273
274            reply => Err(Error::ProtocolError(format!(
275                "unexpected reply : {:?}",
276                &reply
277            ))),
278        }
279    }
280
281    // Sets the brightness between 0 and 100, depending on the
282    // argument.
283
284    async fn brightness_rpc(&mut self, s: &mut TcpStream, v: u8) -> Result<()> {
285        use tplink_api::{brightness_cmd, ErrorStatus, Reply};
286
287        let (mut rx, mut tx) = s.split();
288
289        match self.rpc(&mut rx, &mut tx, brightness_cmd(v)).await? {
290            Reply::Dimmer {
291                set_brightness: Some(ErrorStatus { err_code: 0, .. }),
292                ..
293            } => Ok(()),
294
295            Reply::Dimmer {
296                set_brightness:
297                    Some(ErrorStatus {
298                        err_msg: Some(em), ..
299                    }),
300                ..
301            } => Err(Error::ProtocolError(em)),
302
303            reply => Err(Error::ProtocolError(format!(
304                "unexpected reply : {:?}",
305                &reply
306            ))),
307        }
308    }
309
310    // Sends commands to change the brightness. NOTE: This function
311    // assumes `v` is in the range 0.0..=100.0.
312
313    async fn set_brightness(
314        &mut self,
315        s: &mut TcpStream,
316        v: f64,
317    ) -> Result<()> {
318        // If the brightness is zero, we turn off the dimmer instead
319        // of setting the brightness to 0.0. If it's greater than 0.0,
320        // set the brightness and then turn on the dimmer.
321
322        if v > 0.0 {
323            self.brightness_rpc(s, v as u8).await?;
324            self.relay_state_rpc(s, true).await
325        } else {
326            self.relay_state_rpc(s, false).await
327        }
328    }
329
330    // Connects to the address. Sets a timeout of 1 second for the
331    // connection.
332
333    async fn connect(addr: &SocketAddrV4) -> Result<TcpStream> {
334        use tokio::net::TcpSocket;
335
336        let fut = time::timeout(time::Duration::from_secs(1), async {
337            match TcpSocket::new_v4() {
338                Ok(s) => {
339                    s.set_recv_buffer_size((BUF_TOTAL * 2) as u32)?;
340
341                    let s = s.connect((*addr).into()).await?;
342
343                    s.set_nodelay(false)?;
344                    Ok(s)
345                }
346                Err(e) => Err(e),
347            }
348        });
349
350        match fut.await {
351            Ok(Ok(s)) => Ok(s),
352            Ok(Err(e)) => Err(Error::MissingPeer(e.to_string())),
353            Err(_) => Err(Error::MissingPeer("timeout".into())),
354        }
355    }
356
357    // Handles incoming settings for brightness.
358
359    async fn handle_brightness_setting<'a>(
360        &mut self,
361        s: &'a mut TcpStream,
362        v: f64,
363        reply: driver::SettingReply<f64>,
364        report: &'a mut driver::ReadWriteDevice<f64>,
365    ) -> Result<Option<f64>> {
366        if !v.is_nan() {
367            // Clip incoming settings to the range 0.0..=100.0. Handle
368            // infinities, too.
369
370            let v = match v {
371                v if v == f64::INFINITY => 100.0,
372                v if v == f64::NEG_INFINITY => 0.0,
373                v if v < 0.0 => 0.0,
374                v if v > 100.0 => 100.0,
375                v => v,
376            };
377
378            // Send an OK reply to the client with the updated value.
379
380            reply(Ok(v));
381
382            // Always log incoming settings. Let the client know there
383            // was a successful setting, and include the value that
384            // was used.
385
386            match self.set_brightness(s, v).await {
387                Ok(()) => {
388                    report.report_update(v).await;
389                    Ok(Some(v))
390                }
391                Err(e) => {
392                    error!("setting brightness : {}", &e);
393                    Err(e)
394                }
395            }
396        } else {
397            reply(Err(Error::InvArgument("device doesn't accept NaN".into())));
398            Ok(None)
399        }
400    }
401
402    // Handles incoming settings for controlling the LED indicator.
403
404    async fn handle_led_setting<'a>(
405        &mut self,
406        s: &'a mut TcpStream,
407        v: bool,
408        reply: driver::SettingReply<bool>,
409        report: &'a mut driver::ReadWriteDevice<bool>,
410    ) -> Result<()> {
411        reply(Ok(v));
412        match self.led_state_rpc(s, v).await {
413            Ok(()) => {
414                report.report_update(v).await;
415                Ok(())
416            }
417            Err(e) => {
418                error!("setting LED : {}", &e);
419                Err(e)
420            }
421        }
422    }
423
424    // Checks to see if the current error state ('value') matches the
425    // previosuly reported error state. If not, it saves the current
426    // state and sends the updated value to the backend.
427
428    async fn sync_error_state(
429        &mut self,
430        report: &mut driver::ReadOnlyDevice<bool>,
431        value: bool,
432    ) {
433        if self.reported_error != Some(value) {
434            self.reported_error = Some(value);
435            report.report_update(value).await;
436        }
437    }
438
439    async fn main_loop<'a>(
440        &mut self,
441        s: &mut TcpStream,
442        devices: &mut MutexGuard<'_, Devices>,
443    ) {
444        // Create a 5-second interval timer which will be used to poll
445        // the device to see if its state was changed by some outside
446        // mechanism.
447
448        let mut timer =
449            tokio::time::interval(tokio::time::Duration::from_secs(5));
450        let mut current_led = false;
451        let mut current_brightness = -1.0f64;
452
453        // Main loop of the driver. This loop never ends.
454
455        'main: loop {
456            self.sync_error_state(&mut devices.d_error, false).await;
457
458            // Get mutable references to the setting channels.
459
460            let Devices {
461                d_brightness: ref mut d_b,
462                d_led: ref mut d_l,
463                ..
464            } = **devices;
465
466            // Now wait for one of three events to occur.
467
468            #[rustfmt::skip]
469            tokio::select! {
470                // If the timer tick expires, it's time to get the
471                // latest state of the device. Since external apps can
472                // modify the device outside of DrMem's control, we
473                // have to periodically poll it to stay in sync.
474
475                _ = timer.tick() => {
476		    if let Ok((led, br)) = self.info_rpc(s).await {
477			let br = br as f64;
478
479			// If the LED state has changed outside of the
480			// driver, update the local state.
481
482			if current_led != led {
483			    debug!("external LED update: {}", led);
484			    current_led = led;
485			    d_l.report_update(led).await;
486			}
487
488			// If the brightness state has changed outside
489			// of the driver, update the local state.
490
491			if current_brightness != br {
492			    debug!("external brightness update: {}", br);
493			    current_brightness = br;
494			    devices.d_brightness.report_update(br).await;
495			}
496		    } else {
497			break 'main
498		    }
499                }
500
501		// Handle settings to the brightness device.
502
503                Some((v, reply)) = d_b.next_setting() => {
504
505		    // If the settings matches the current state, then
506		    // don't actually control the hardware.
507
508		    if current_brightness != v {
509			match self.handle_brightness_setting(
510			    s, v, reply, d_b
511			).await {
512			    Ok(Some(v)) => current_brightness = v,
513			    Ok(None) => (),
514			    Err(_) => break 'main
515			}
516		    } else {
517			debug!("don't need to apply brightness setting");
518
519			// Hardware wasn't updated, but we still need
520			// to log the setting and return a reply to
521			// the client.
522
523			d_b.report_update(v).await;
524			reply(Ok(v))
525		    }
526                }
527
528		// Handle settings to the LED indicator device.
529
530                Some((v, reply)) = d_l.next_setting() => {
531		    debug!("led setting -> {}", &v);
532
533		    // If the settings matches the current state, then
534		    // don't actually control the hardware.
535
536		    if current_led != v {
537			if self.handle_led_setting(
538			    s, v, reply, &mut devices.d_led
539			).await == Ok(()) {
540			    current_led = v;
541			} else {
542			    break 'main
543			}
544		    } else {
545			debug!("don't need to apply led setting");
546
547			// Hardware wasn't updated, but we still need
548			// to log the setting and return a reply to
549			// the client.
550
551			d_l.report_update(v).await;
552			reply(Ok(v))
553		    }
554                }
555            }
556        }
557    }
558}
559
560impl driver::API for Instance {
561    type DeviceSet = Devices;
562
563    // Registers two devices, `error` and `brightness`.
564
565    fn register_devices(
566        core: driver::RequestChan,
567        _cfg: &DriverConfig,
568        max_history: Option<usize>,
569    ) -> Pin<Box<dyn Future<Output = Result<Self::DeviceSet>> + Send>> {
570        let error_name = "error"
571            .parse::<device::Base>()
572            .expect("parsing 'error' should never fail");
573        let brightness_name = "brightness"
574            .parse::<device::Base>()
575            .expect("parsing 'brightness' should never fail");
576        let led_name = "led"
577            .parse::<device::Base>()
578            .expect("parsing 'led' should never fail");
579
580        Box::pin(async move {
581            // Define the devices managed by this driver.
582
583            let d_error =
584                core.add_ro_device(error_name, None, max_history).await?;
585            let d_brightness = core
586                .add_rw_device(brightness_name, None, max_history)
587                .await?;
588            let d_led = core.add_rw_device(led_name, None, max_history).await?;
589
590            Ok(Devices {
591                d_error,
592                d_brightness,
593                d_led,
594            })
595        })
596    }
597
598    // This driver doesn't store any data in its instance; it's all
599    // stored in local variables in the `.run()` method.
600
601    fn create_instance(
602        cfg: &DriverConfig,
603    ) -> Pin<Box<dyn Future<Output = Result<Box<Self>>> + Send>> {
604        let cfg_addr = Instance::get_cfg_address(cfg);
605
606        Box::pin(async {
607            Ok(Box::new(Instance {
608                addr: cfg_addr?,
609                reported_error: None,
610                buf: [0; BUF_TOTAL],
611            }))
612        })
613    }
614
615    // Main run loop for the driver.
616
617    fn run<'a>(
618        &'a mut self,
619        devices: Arc<Mutex<Devices>>,
620    ) -> Pin<Box<dyn Future<Output = Infallible> + Send + 'a>> {
621        let fut = async move {
622            // Lock the mutex for the life of the driver. There is no
623            // other task that wants access to these device handles.
624            // An Arc<Mutex<>> is the only way I know of sharing a
625            // mutable value with async tasks.
626
627            let mut devices = devices.lock().await;
628
629            // Record the devices's address in the "cfg" field of the
630            // span.
631
632            Span::current().record("cfg", self.addr.to_string());
633
634            loop {
635                // First, connect to the device. We'll leave the TCP
636                // connection open so we're ready for the next
637                // transaction. Tests have shown that the HS220
638                // handles multiple client connections.
639
640                match Instance::connect(&self.addr).await {
641                    Ok(mut s) => {
642                        self.main_loop(&mut s, &mut devices).await;
643                    }
644                    Err(e) => {
645                        warn!("couldn't connect : '{}'", e);
646                    }
647                }
648
649                self.sync_error_state(&mut devices.d_error, true).await;
650
651                // Log the error and then sleep for 10 seconds.
652                // Hopefully the device will be available then.
653
654                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await
655            }
656        };
657
658        Box::pin(fut)
659    }
660}
661
662#[cfg(test)]
663mod test {
664    use super::{tplink_api, Instance};
665    use crate::BUF_TOTAL;
666    use std::{
667        io::Write,
668        net::{Ipv4Addr, SocketAddrV4},
669    };
670
671    #[tokio::test]
672    async fn test_read_reply() {
673        // Make sure packets with less than 4 bytes causes an error.
674
675        {
676            let buf: &[u8] = &[0, 0, 0];
677            let mut inst = Instance {
678                addr: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0),
679                reported_error: None,
680                buf: [0u8; BUF_TOTAL],
681            };
682
683            assert!(inst.read_reply(&mut &buf[0..=0]).await.is_err());
684            assert!(inst.read_reply(&mut &buf[0..1]).await.is_err());
685            assert!(inst.read_reply(&mut &buf[0..2]).await.is_err());
686            assert!(inst.read_reply(&mut &buf[0..3]).await.is_err());
687        }
688
689        {
690            const REPLY: &[u8] =
691                b"{\"system\":{\"set_led_off\":{\"err_code\":0}}}";
692
693            let mut buf = vec![0, 0, 0, REPLY.len() as u8];
694
695            {
696                let mut wr = tplink_api::CmdWriter::create(&mut buf);
697
698                assert_eq!(wr.write(REPLY).unwrap(), REPLY.len());
699            }
700
701            assert!(buf.len() == 45);
702            assert!(buf.as_slice().len() == 45);
703
704            let mut inst = Instance {
705                addr: SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 0),
706                reported_error: None,
707                buf: [0u8; BUF_TOTAL],
708            };
709
710            assert!(inst.read_reply(&mut &buf[0..4]).await.is_err());
711            assert!(inst.read_reply(&mut &buf[0..5]).await.is_err());
712            assert!(inst.read_reply(&mut buf.as_slice()).await.is_ok());
713        }
714    }
715}