mod actor;
mod decision;
mod listener;
use crate::crypto::TuyaCipher;
use crate::crypto::hex_encode;
use crate::error::{Result, TuyaError};
use crate::protocol::{CommandType, DeviceType, TuyaMessage, Version};
use log::{debug, info, warn};
use parking_lot::RwLock;
use serde::Serialize;
use serde_json::Value;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use actor::DeviceCommand;
pub use listener::{DeviceEvent, unified_listener};
// Timing, channel-capacity and protocol constants shared with the sibling
// modules (`actor`, `decision`, `listener`). Their consumers are not visible in
// this part of the file, so the notes below describe the values and hedge the
// intent.

/// 7 seconds. NOTE(review): by its name, the default heartbeat interval — confirm at the use site.
pub(super) const SLEEP_HEARTBEAT_DEFAULT: Duration = Duration::from_secs(7);
/// 5 seconds. NOTE(review): presumably how often the heartbeat condition is polled — confirm.
pub(super) const SLEEP_HEARTBEAT_CHECK: Duration = Duration::from_secs(5);
/// 16 seconds. NOTE(review): presumably the lower bound of the reconnect back-off — confirm.
pub(super) const SLEEP_RECONNECT_MIN: Duration = Duration::from_secs(16);
/// 4096 seconds. NOTE(review): presumably the upper bound of the reconnect back-off — confirm.
pub(super) const SLEEP_RECONNECT_MAX: Duration = Duration::from_secs(4096);
/// 30 seconds. NOTE(review): presumably the idle time after which a connection is considered inactive — confirm.
pub(super) const SLEEP_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(30);
/// 60 seconds. NOTE(review): presumably the base cooldown between scanner-bypass attempts — confirm.
pub(super) const SCANNER_BYPASS_BASE_COOLDOWN: Duration = Duration::from_secs(60);
/// Sentinel address meaning "no explicit address configured"; `Device::with_builder`
/// treats both this value and the empty string as "leave the real IP empty".
pub(super) const ADDR_AUTO: &str = "Auto";
/// Literal marker string. NOTE(review): the spelling looks intentional (likely matched
/// against text produced elsewhere) — do not "fix" it without checking the consumer.
pub(super) const DATA_UNVALID: &str = "data unvalid";
/// Capacity passed to `tokio::sync::broadcast::channel` when a device is built.
pub(super) const CHAN_BROADCAST_CAPACITY: usize = 128;
/// Capacity passed to `tokio::sync::mpsc::channel` for the device command queue.
pub(super) const CHAN_MPSC_CAPACITY: usize = 64;
/// Command codes (as `u32`) listed here: `LanExtStream`.
/// NOTE(review): by its name, commands that must always carry a data payload — confirm.
pub(super) const MANDATORY_DATA_CMDS: &[u32] = &[CommandType::LanExtStream as u32];
/// Command codes (as `u32`) for the session-key negotiation steps and the heartbeat.
/// NOTE(review): by its name, commands for which no response is awaited — confirm.
pub(super) const NO_RESPONSE_CMDS: &[u32] = &[
CommandType::SessKeyNegStart as u32,
CommandType::SessKeyNegResp as u32,
CommandType::SessKeyNegFinish as u32,
CommandType::HeartBeat as u32,
];
/// String keys used when building or reading JSON objects.
///
/// Only `REQ_TYPE` is used in the visible part of this file (by
/// `Device::sub_discover`); the remaining keys are consumed elsewhere, so their
/// descriptions below are inferred from the names and should be confirmed.
pub(super) mod keys {
/// Key of the request-type field in an outgoing request object.
pub const REQ_TYPE: &str = "reqType";
/// NOTE(review): presumably the key holding a numeric/string error code — confirm.
pub const ERR_CODE: &str = "errorCode";
/// NOTE(review): presumably the key holding a human-readable error message — confirm.
pub const ERR_MSG: &str = "errorMsg";
/// NOTE(review): presumably the key holding the structured error payload — confirm.
pub const ERR_PAYLOAD_OBJ: &str = "errorPayload";
/// NOTE(review): presumably the key holding a payload rendered as text — confirm.
pub const PAYLOAD_STR: &str = "payloadStr";
/// NOTE(review): presumably the key holding a payload in raw form — confirm.
pub const PAYLOAD_RAW: &str = "payloadRaw";
/// NOTE(review): presumably the `data` key looked up in incoming objects — confirm.
pub const IN_DATA: &str = "data";
/// NOTE(review): presumably the `payload` key looked up in incoming objects — confirm.
pub const IN_PAYLOAD: &str = "payload";
}
/// Handle addressing one sub-device (identified by its `cid`) through a parent
/// [`Device`].
///
/// Every request issued through this handle is forwarded to the parent with the
/// stored `cid` attached. Cloning the handle clones the parent handle and the
/// `cid` string.
#[derive(Clone)]
pub struct SubDevice {
// Parent device handle that actually sends the requests.
parent: Device,
// Sub-device identifier attached to each forwarded request.
cid: String,
}
impl SubDevice {
    /// Builds a sub-device handle that routes its requests through `parent`
    /// and tags them with `cid`.
    pub(crate) fn new(parent: Device, cid: &str) -> Self {
        let cid = cid.to_string();
        Self { parent, cid }
    }

    /// Returns the sub-device identifier (`cid`) stored in this handle.
    #[must_use]
    pub fn id(&self) -> &str {
        self.cid.as_str()
    }

    /// Issues a `DpQuery` request (without data) for this sub-device.
    ///
    /// # Errors
    /// Propagates whatever error [`SubDevice::request`] returns.
    pub async fn status(&self) -> Result<Option<String>> {
        let no_data = None;
        self.request(CommandType::DpQuery, no_data).await
    }

    /// Issues a `Control` request carrying `dps` for this sub-device.
    ///
    /// # Errors
    /// Propagates whatever error [`SubDevice::request`] returns.
    pub async fn set_dps(&self, dps: Value) -> Result<Option<String>> {
        let payload = Some(dps);
        self.request(CommandType::Control, payload).await
    }

    /// Sets a single data point: serializes `value` to JSON and sends
    /// `{ "<index>": <value> }` through [`SubDevice::set_dps`].
    ///
    /// # Errors
    /// Returns [`TuyaError::InvalidPayload`] when `value` cannot be converted
    /// to a JSON value; otherwise propagates the error of the request.
    pub async fn set_value<I: ToString, T: Serialize>(
        &self,
        index: I,
        value: T,
    ) -> Result<Option<String>> {
        // The serializer's own error is intentionally replaced by the crate error.
        let val = serde_json::to_value(value).map_err(|_| TuyaError::InvalidPayload)?;
        let dps = serde_json::json!({ index.to_string(): val });
        self.set_dps(dps).await
    }

    /// Forwards `cmd` and `data` to the parent device with this sub-device's
    /// `cid` attached.
    ///
    /// # Errors
    /// Propagates whatever error the parent's `request` returns.
    pub async fn request(&self, cmd: CommandType, data: Option<Value>) -> Result<Option<String>> {
        let cid = Some(self.cid.clone());
        self.parent.request(cmd, data, cid).await
    }
}
/// Connection life-cycle state stored in `DeviceState::state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
/// No active connection. This is the initial state of a freshly built device
/// and the state set by `Device::fire_close` (unless the device is stopped).
Disconnected,
/// NOTE(review): presumably set by the connection task while a connection
/// attempt is in progress — the assignment is not visible in this part of the file.
Connecting,
/// The state tested by `Device::is_connected`.
Connected,
/// Set by `Device::fire_stop`; `Device::fire_close` never overwrites it.
Stopped,
}
/// Mutable per-device state, stored behind the `RwLock` in `DeviceInner::state`.
///
/// Most fields are written by code outside this part of the file; notes marked
/// NOTE(review) are inferred from names and should be confirmed there.
pub(super) struct DeviceState {
/// Address as configured by the user, or `ADDR_AUTO` when none was given.
pub(super) config_address: String,
/// Resolved IP; empty when the device was built without an explicit address.
/// `Device::address` falls back to `config_address` while this is empty.
pub(super) real_ip: String,
/// Protocol version (from the builder or `Device::set_version`).
pub(super) version: Version,
/// Port (builder default is 6668).
pub(super) port: u16,
/// Device type (from the builder or `Device::set_dev_type`).
pub(super) dev_type: DeviceType,
/// Current connection life-cycle state.
pub(super) state: ConnectionState,
/// Initialised to the construction time. NOTE(review): presumably refreshed on every received message — confirm.
pub(super) last_received: Instant,
/// Initialised to the construction time. NOTE(review): presumably refreshed on every sent message — confirm.
pub(super) last_sent: Instant,
/// Persist flag (builder default is `true`). NOTE(review): presumably "keep the connection open between requests" — confirm.
pub(super) persist: bool,
/// Starts as `None`. NOTE(review): presumably filled after session-key negotiation — confirm.
pub(super) session_key: Option<Vec<u8>>,
/// Starts at 0. NOTE(review): presumably counts failed connection attempts — confirm.
pub(super) failure_count: u32,
/// Starts at 0. NOTE(review): presumably counts successful operations — confirm.
pub(super) success_count: u32,
/// Starts as `false`; set to `true` by `Device::set_address`.
pub(super) force_discovery: bool,
/// Timeout value (builder default is 10 seconds).
pub(super) timeout: Duration,
/// Cipher built from the local key at construction; `None` when `TuyaCipher::new` failed.
pub(super) cipher: Option<Arc<TuyaCipher>>,
/// Starts as `None`. NOTE(review): presumably the last discovered IP that was reported/logged — confirm.
pub(super) last_reported_discovered_ip: Option<String>,
/// Starts as `None`. NOTE(review): presumably the time of the last scanner-bypass attempt — confirm.
pub(super) last_scanner_bypass_at: Option<Instant>,
/// Starts at 0. NOTE(review): presumably counts failed scanner-bypass attempts — confirm.
pub(super) scanner_bypass_failures: u32,
}
/// Builder collecting the settings used to construct a [`Device`].
///
/// Created with `DeviceBuilder::new` (or `Device::builder`), adjusted through
/// the consuming setter methods, and finished with `build`.
pub struct DeviceBuilder {
// Device identifier.
id: String,
// Configured address; defaults to `ADDR_AUTO`.
address: String,
// Local key bytes; also used to create the device cipher.
local_key: Vec<u8>,
// Protocol version; defaults to `Version::Auto`.
version: Version,
// Device type; defaults to `DeviceType::Auto`.
dev_type: DeviceType,
// Port; defaults to 6668.
port: u16,
// Persist flag; defaults to `true`.
persist: bool,
// Timeout; defaults to 10 seconds.
timeout: Duration,
// No-wait flag; defaults to `false`.
nowait: bool,
}
impl DeviceBuilder {
    /// Starts a builder for the device `id` with the given `local_key`.
    ///
    /// Defaults: address `ADDR_AUTO`, version `Version::Auto`, device type
    /// `DeviceType::Auto`, port 6668, `persist = true`, timeout 10 seconds,
    /// `nowait = false`.
    pub fn new<I, K>(id: I, local_key: K) -> Self
    where
        I: Into<String>,
        K: Into<Vec<u8>>,
    {
        Self {
            id: id.into(),
            address: ADDR_AUTO.to_string(),
            local_key: local_key.into(),
            version: Version::Auto,
            dev_type: DeviceType::Auto,
            port: 6668,
            persist: true,
            timeout: Duration::from_secs(10),
            nowait: false,
        }
    }

    /// Sets the configured address and returns the updated builder.
    ///
    /// An empty string or `ADDR_AUTO` is treated as "no explicit address" when
    /// the device is built.
    // `#[must_use]` matches the non-generic setters below: the builder is
    // consumed, so dropping the return value silently discards the setting.
    #[must_use]
    pub fn address<A: Into<String>>(mut self, address: A) -> Self {
        self.address = address.into();
        self
    }

    /// Sets the protocol version and returns the updated builder.
    #[must_use]
    pub fn version<V: Into<Version>>(mut self, version: V) -> Self {
        self.version = version.into();
        self
    }

    /// Sets the device type and returns the updated builder.
    #[must_use]
    pub fn dev_type<DT: Into<DeviceType>>(mut self, dev_type: DT) -> Self {
        self.dev_type = dev_type.into();
        self
    }

    /// Sets the port and returns the updated builder.
    #[must_use]
    pub fn port(mut self, port: u16) -> Self {
        self.port = port;
        self
    }

    /// Sets the persist flag and returns the updated builder.
    #[must_use]
    pub fn persist(mut self, persist: bool) -> Self {
        self.persist = persist;
        self
    }

    /// Sets the timeout and returns the updated builder.
    #[must_use]
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = timeout;
        self
    }

    /// Sets the no-wait flag and returns the updated builder.
    #[must_use]
    pub fn nowait(mut self, nowait: bool) -> Self {
        self.nowait = nowait;
        self
    }

    /// Consumes the builder and constructs the [`Device`] via
    /// `Device::with_builder`.
    #[must_use]
    pub fn build(self) -> Device {
        Device::with_builder(self)
    }
}
/// Data shared (through an `Arc`) by every clone of a [`Device`] handle and by
/// the spawned connection task.
pub(super) struct DeviceInner {
/// Device identifier; immutable after construction.
pub(super) id: String,
/// Local key bytes; immutable after construction.
pub(super) local_key: Vec<u8>,
/// Mutable device state guarded by a `parking_lot::RwLock`.
pub(super) state: RwLock<DeviceState>,
/// Broadcast sender; `Device::receive` subscribes to it to obtain messages.
pub(super) broadcast_tx: tokio::sync::broadcast::Sender<TuyaMessage>,
/// Cancelled by `Device::fire_stop` and when this struct is dropped; the
/// spawned connection task exits when it observes the cancellation.
pub(super) cancel_token: CancellationToken,
/// Notified (`notify_waiters`) by `Device::fire_close` and `Device::fire_stop`.
pub(super) close_notify: tokio::sync::Notify,
/// No-wait flag, read and written with `Ordering::Relaxed`.
pub(super) nowait: AtomicBool,
}
impl Drop for DeviceInner {
    /// Cancels the device's cancellation token when the shared state is
    /// released, then records the event at debug level.
    fn drop(&mut self) {
        let token = &self.cancel_token;
        token.cancel();

        let device_id = &self.id;
        debug!(
            "DeviceInner for {} dropped, cancelling connection task",
            device_id
        );
    }
}
/// Cloneable handle to a device.
///
/// All clones share the same `DeviceInner`. Handles returned to users carry
/// the command sender; the handle created inside the spawned connection task
/// carries `None` instead.
#[derive(Clone)]
pub struct Device {
/// Shared device data.
pub(super) inner: Arc<DeviceInner>,
/// Sender side of the command queue consumed by the connection task;
/// `None` for the task's own handle.
pub(super) tx: Option<mpsc::Sender<DeviceCommand>>,
}
impl Device {
pub fn new<I, K>(id: I, local_key: K) -> Self
where
I: Into<String>,
K: Into<Vec<u8>>,
{
DeviceBuilder::new(id, local_key).build()
}
pub fn builder<I, K>(id: I, local_key: K) -> DeviceBuilder
where
I: Into<String>,
K: Into<Vec<u8>>,
{
DeviceBuilder::new(id, local_key)
}
pub(crate) fn with_builder(builder: DeviceBuilder) -> Self {
let (addr, ip) = match builder.address.as_str() {
"" | ADDR_AUTO => (ADDR_AUTO.to_string(), String::new()),
_ => (builder.address.clone(), builder.address),
};
let (broadcast_tx, _) = tokio::sync::broadcast::channel(CHAN_BROADCAST_CAPACITY);
let (tx, rx) = mpsc::channel(CHAN_MPSC_CAPACITY);
let state = DeviceState {
config_address: addr,
real_ip: ip,
version: builder.version,
port: builder.port,
dev_type: builder.dev_type,
state: ConnectionState::Disconnected,
last_received: Instant::now(),
last_sent: Instant::now(),
persist: builder.persist,
session_key: None,
failure_count: 0,
success_count: 0,
force_discovery: false,
timeout: builder.timeout,
cipher: TuyaCipher::new(&builder.local_key).ok().map(Arc::new),
last_reported_discovered_ip: None,
last_scanner_bypass_at: None,
scanner_bypass_failures: 0,
};
let inner = Arc::new(DeviceInner {
id: builder.id,
local_key: builder.local_key,
state: RwLock::new(state),
broadcast_tx,
cancel_token: CancellationToken::new(),
close_notify: tokio::sync::Notify::new(),
nowait: AtomicBool::new(builder.nowait),
});
let device = Self {
inner: Arc::clone(&inner),
tx: Some(tx),
};
let initial_jitter = actor::record_construction_and_compute_jitter();
let inner_weak = Arc::downgrade(&inner);
let d_id = device.inner.id.clone();
crate::runtime::spawn(async move {
if let Some(inner) = inner_weak.upgrade() {
let cancel_token = inner.cancel_token.clone();
let d_task = Device { inner, tx: None };
tokio::select! {
() = cancel_token.cancelled() => {
debug!("Device {d_id} connection task stopped via token");
}
() = d_task.run_connection_task(rx, initial_jitter) => {
debug!("Device {d_id} connection task finished");
}
}
}
});
device
}
#[must_use]
pub fn id(&self) -> &str {
&self.inner.id
}
#[must_use]
pub fn dev_type(&self) -> DeviceType {
self.with_state(|s| s.dev_type)
}
#[must_use]
pub fn local_key(&self) -> &[u8] {
&self.inner.local_key
}
#[must_use]
pub fn address(&self) -> String {
self.with_state(|s| {
if s.real_ip.is_empty() {
s.config_address.clone()
} else {
s.real_ip.clone()
}
})
}
#[must_use]
pub fn config_address(&self) -> String {
self.with_state(|s| s.config_address.clone())
}
#[must_use]
pub fn version(&self) -> Version {
self.with_state(|s| s.version)
}
#[must_use]
pub fn is_connected(&self) -> bool {
self.with_state(|s| s.state == ConnectionState::Connected)
}
#[must_use]
pub fn is_stopped(&self) -> bool {
self.with_state(|s| s.state == ConnectionState::Stopped)
}
#[must_use]
pub fn timeout(&self) -> Duration {
self.with_state(|s| s.timeout)
}
#[must_use]
pub fn port(&self) -> u16 {
self.with_state(|s| s.port)
}
#[must_use]
pub fn persist(&self) -> bool {
self.with_state(|s| s.persist)
}
#[must_use]
pub fn nowait(&self) -> bool {
self.inner.nowait.load(Ordering::Relaxed)
}
}
impl Device {
    /// Stores a new persist flag in the shared state.
    pub fn set_persist(&self, persist: bool) {
        self.with_state_mut(|s| s.persist = persist);
    }

    /// Stores a new timeout in the shared state.
    pub fn set_timeout(&self, timeout: Duration) {
        self.with_state_mut(|s| s.timeout = timeout);
    }

    /// Stores a new port in the shared state.
    pub fn set_port(&self, port: u16) {
        self.with_state_mut(|s| s.port = port);
    }

    /// Stores a new no-wait flag (relaxed atomic store).
    pub fn set_nowait(&self, nowait: bool) {
        self.inner.nowait.store(nowait, Ordering::Relaxed);
    }

    /// Stores a new protocol version in the shared state.
    pub fn set_version<V: Into<Version>>(&self, version: V) {
        let version = version.into();
        self.with_state_mut(|s| s.version = version);
    }

    /// Stores a new device type in the shared state.
    pub fn set_dev_type<DT: Into<DeviceType>>(&self, dev_type: DT) {
        // Run the caller-supplied conversion before entering the state
        // closure, as `set_version` and `set_address` do, so no foreign code
        // executes while the state is being mutated.
        let dev_type = dev_type.into();
        self.with_state_mut(|s| s.dev_type = dev_type);
    }

    /// Stores a new configured address and raises the `force_discovery` flag.
    ///
    /// The stored real IP is not modified here.
    pub fn set_address<A: Into<String>>(&self, address: A) {
        let address = address.into();
        self.with_state_mut(|s| {
            s.config_address = address;
            s.force_discovery = true;
        });
    }
}
impl Device {
    /// Issues a `DpQuery` request without data or sub-device id.
    ///
    /// # Errors
    /// Propagates whatever error [`Device::request`] returns.
    pub async fn status(&self) -> Result<Option<String>> {
        let (no_data, no_cid) = (None, None);
        self.request(CommandType::DpQuery, no_data, no_cid).await
    }

    /// Issues a `Control` request carrying `dps`.
    ///
    /// # Errors
    /// Propagates whatever error [`Device::request`] returns.
    pub async fn set_dps(&self, dps: Value) -> Result<Option<String>> {
        let payload = Some(dps);
        self.request(CommandType::Control, payload, None).await
    }

    /// Sets a single data point: serializes `value` to JSON and sends
    /// `{ "<dp_id>": <value> }` through [`Device::set_dps`].
    ///
    /// # Errors
    /// Returns [`TuyaError::InvalidPayload`] when `value` cannot be converted
    /// to a JSON value; otherwise propagates the error of the request.
    pub async fn set_value<I: ToString, T: Serialize>(
        &self,
        dp_id: I,
        value: T,
    ) -> Result<Option<String>> {
        // The serializer's own error is intentionally replaced by the crate error.
        let val = serde_json::to_value(value).map_err(|_| TuyaError::InvalidPayload)?;
        let dps = serde_json::json!({ dp_id.to_string(): val });
        self.set_dps(dps).await
    }

    /// Sends a `LanExtStream` request asking for the online status of
    /// sub-devices, with an empty `cids` list.
    ///
    /// # Errors
    /// Propagates whatever error [`Device::request`] returns.
    pub async fn sub_discover(&self) -> Result<Option<String>> {
        let query = serde_json::json!({
            "cids": [],
            keys::REQ_TYPE: "subdev_online_stat_query"
        });
        let payload = Some(query);
        self.request(CommandType::LanExtStream, payload, None).await
    }

    /// Waits for the next broadcast message whose payload is not empty.
    ///
    /// A fresh subscription is created on every call, so only messages
    /// broadcast after the call starts can be returned.
    ///
    /// # Errors
    /// * [`TuyaError::BroadcastLagged`] when the subscription fell behind and
    ///   messages were skipped.
    /// * [`TuyaError::Offline`] when the broadcast channel is closed.
    pub async fn receive(&self) -> Result<TuyaMessage> {
        use tokio::sync::broadcast::error::RecvError;

        let mut subscription = self.inner.broadcast_tx.subscribe();
        loop {
            let message = match subscription.recv().await {
                Ok(message) => message,
                Err(RecvError::Lagged(skipped)) => {
                    warn!(
                        "Device {} receive() lagged, skipped {} broadcast messages",
                        self.inner.id, skipped
                    );
                    return Err(TuyaError::BroadcastLagged { skipped });
                }
                Err(RecvError::Closed) => return Err(TuyaError::Offline),
            };
            // Messages with an empty payload are skipped; keep waiting.
            if !message.payload.is_empty() {
                return Ok(message);
            }
        }
    }

    /// Returns a [`SubDevice`] handle for `cid` that routes through a clone
    /// of this device handle.
    #[must_use]
    pub fn sub(&self, cid: &str) -> SubDevice {
        let parent = self.clone();
        SubDevice::new(parent, cid)
    }

    /// Hands a request to the connection task and returns the reply payload
    /// as text.
    ///
    /// The payload is returned as-is when `payload_as_string` can produce a
    /// string, and hex-encoded otherwise. `Ok(None)` means the task answered
    /// without a message.
    ///
    /// # Errors
    /// Propagates whatever error `send_command_to_task` returns.
    pub async fn request(
        &self,
        command: CommandType,
        data: Option<Value>,
        cid: Option<String>,
    ) -> Result<Option<String>> {
        debug!("request: cmd={command:?}, data={data:?}");
        let reply = self
            .send_command_to_task(|resp_tx| DeviceCommand::Request {
                command,
                data,
                cid,
                resp_tx,
            })
            .await?;

        let text = reply.map(|msg| match msg.payload_as_string() {
            Some(s) => s,
            None => hex_encode(&msg.payload),
        });
        Ok(text)
    }
}
impl Device {
    /// Async wrapper around [`Device::fire_close`]; it does not await anything.
    pub async fn close(&self) {
        self.fire_close();
    }

    /// Async wrapper around [`Device::fire_stop`]; it does not await anything.
    pub async fn stop(&self) {
        self.fire_stop();
    }

    /// Marks the device as disconnected (unless it is already stopped) and
    /// wakes the tasks currently waiting on the close notification.
    pub fn fire_close(&self) {
        let inner = &self.inner;
        info!("Closing connection to device {}", inner.id);
        self.with_state_mut(|st| {
            // A stopped device stays stopped; closing must not revive it.
            if !matches!(st.state, ConnectionState::Stopped) {
                st.state = ConnectionState::Disconnected;
            }
        });
        inner.close_notify.notify_waiters();
    }

    /// Marks the device as stopped, wakes the tasks currently waiting on the
    /// close notification, and cancels the device's cancellation token.
    pub fn fire_stop(&self) {
        let inner = &self.inner;
        info!("Stopping device {} (explicit stop called)", inner.id);
        self.with_state_mut(|st| {
            st.state = ConnectionState::Stopped;
        });
        inner.close_notify.notify_waiters();
        inner.cancel_token.cancel();
    }

    /// Sends the `ConnectNow` command to the connection task.
    pub async fn connect_now(&self) {
        let command = DeviceCommand::ConnectNow;
        self.send_to_task(command).await;
    }
}