use crate::crypto::TuyaCipher;
use crate::error::{Result, TuyaError};
use crate::protocol::{self, CommandType, PREFIX_6699, TuyaMessage, Version};
use log::{debug, info, trace, warn};
use parking_lot::RwLock;
use serde_json::Value;
use socket2::{Domain, Protocol, SockAddr, Socket, Type};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use tokio::net::UdpSocket;
use tokio::sync::{Notify, mpsc};
use tokio::time::{interval, sleep, timeout};
use serde::Serialize;
/// A Tuya device seen on the local network, built from a single discovery packet.
///
/// The derived `PartialEq` also compares `discovered_at`; use
/// `is_same_device` to compare only the device-describing fields.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct DiscoveryResult {
/// Device identifier, taken from the packet's `gwId`, `devId` or `id` field.
pub id: String,
/// Content of the packet's `ip` field, stored as received (not validated here).
pub ip: String,
/// Protocol version parsed from the packet's `version` field; `None` when the
/// field is missing or cannot be parsed.
pub version: Option<Version>,
/// Content of the packet's `productKey` field, when present.
pub product_key: Option<String>,
/// Moment the packet was parsed; excluded from serialization.
#[serde(skip)]
pub discovered_at: Instant,
}
impl DiscoveryResult {
    /// Returns `true` when `other` describes the same device at the same address.
    ///
    /// Only `id`, `ip`, `version` and `product_key` are compared; the
    /// `discovered_at` timestamp is deliberately left out, so two sightings of
    /// an unchanged device compare as equal.
    #[must_use]
    pub fn is_same_device(&self, other: &Self) -> bool {
        let mine = (&self.id, &self.ip, &self.version, &self.product_key);
        let theirs = (&other.id, &other.ip, &other.version, &other.product_key);
        mine == theirs
    }
}
/// 16-byte key tried when decoding discovery packets; also used (through
/// `UDP_KEY_35`) to pack the broadcast sent on port 7000.
const UDP_KEY_34: &[u8] = &[
    0x6c, 0x1e, 0xc8, 0xe2, 0xbb, 0x9b, 0xb5, 0x9a, 0xb5, 0x0b, 0x0d, 0xaf, 0x64, 0x9b, 0x41, 0x0a,
];

/// Alias of [`UDP_KEY_34`]; both names refer to the same bytes.
const UDP_KEY_35: &[u8] = UDP_KEY_34;

/// ASCII key tried when decoding discovery packets.
const UDP_KEY_33: &[u8] = b"yG9shRKIBrIBUjc3";

/// Spacing between discovery broadcasts during an active scan.
const BROADCAST_INTERVAL: Duration = Duration::from_secs(6);

/// Minimum age of the previous active scan (or of a cache entry) before a
/// fresh scan is considered necessary: 30 minutes.
const GLOBAL_SCAN_COOLDOWN: Duration = Duration::from_secs(30 * 60);

/// Minimum spacing between scans triggered by single-device lookups.
const SCAN_THROTTLE_INTERVAL: Duration = Duration::from_secs(60);

/// Scan duration used when the caller does not configure a timeout.
const DEFAULT_SCAN_TIMEOUT: Duration = Duration::from_secs(18);

/// Age after which a cached discovery result is evicted: 24 hours.
const CACHE_TTL: Duration = Duration::from_secs(24 * 60 * 60);
/// State shared by every clone of a `Scanner` (held behind an `Arc`).
#[derive(Debug)]
struct ScannerState {
/// Latest discovery result per device id.
cache: RwLock<HashMap<String, DiscoveryResult>>,
/// Signalled with `notify_waiters` after a cache insert and when an active scan ends.
notify: Notify,
/// `true` while an active discovery loop task is running.
active_scanning: AtomicBool,
/// When the most recent active scan was started, if any.
last_scan_time: RwLock<Option<Instant>>,
/// Set once the background listener task has been spawned; cleared by
/// `stop_passive_listener`.
listener_started: AtomicBool,
/// Cancelled on drop and by `stop_passive_listener`; background tasks exit when it fires.
cancel_token: tokio_util::sync::CancellationToken,
/// Bound UDP sockets, keyed by local port.
sockets: RwLock<HashMap<u16, Arc<UdpSocket>>>,
/// Handles of the per-socket receive tasks; aborted when the state is dropped.
receiver_tasks: RwLock<Vec<tokio::task::JoinHandle<()>>>,
}
impl ScannerState {
    /// Builds an idle state: empty cache and socket table, no scan recorded,
    /// no listener running and a fresh (un-cancelled) cancellation token.
    fn new() -> Self {
        let cache = RwLock::new(HashMap::new());
        let sockets = RwLock::new(HashMap::new());
        let receiver_tasks = RwLock::new(Vec::new());
        let last_scan_time = RwLock::new(None);

        Self {
            cache,
            notify: Notify::new(),
            active_scanning: AtomicBool::new(false),
            last_scan_time,
            listener_started: AtomicBool::new(false),
            cancel_token: tokio_util::sync::CancellationToken::new(),
            sockets,
            receiver_tasks,
        }
    }
}
impl Drop for ScannerState {
    /// Signals cancellation to the background tasks and aborts every
    /// receive task whose handle is still stored.
    fn drop(&mut self) {
        self.cancel_token.cancel();
        // `&mut self` guarantees exclusive access, so the lock can be bypassed.
        self.receiver_tasks
            .get_mut()
            .drain(..)
            .for_each(|handle| handle.abort());
    }
}
/// Handle used to discover Tuya devices over UDP.
///
/// Clones share the same `inner` state (cache, sockets, background tasks) but
/// each clone carries its own `timeout`, `bind_addr` and `ports` values.
#[derive(Debug, Clone)]
pub struct Scanner {
/// State shared between all clones of this scanner.
inner: Arc<ScannerState>,
/// Upper bound on how long a scan or a single-device lookup waits.
pub timeout: Duration,
/// Local address the UDP sockets are bound to (`"0.0.0.0"` by default).
pub bind_addr: String,
/// UDP ports to listen and broadcast on (`6666`, `6667`, `7000` by default).
pub ports: Vec<u16>,
}
impl Default for Scanner {
fn default() -> Self {
Self::new()
}
}
/// Process-wide scanner instance, created on first access.
static GLOBAL_SCANNER: OnceLock<Scanner> = OnceLock::new();

/// Returns the process-wide scanner, creating it with default settings on
/// the first call.
pub fn get() -> &'static Scanner {
    GLOBAL_SCANNER.get_or_init(Scanner::default)
}
/// Starts configuring a standalone scanner (one that does not share state
/// with the process-wide instance).
pub fn builder() -> ScannerBuilder {
    ScannerBuilder::default()
}
/// What `spawn_receiver_tasks` hands back: the receiving end of the channel
/// carrying `(datagram bytes, sender address)` pairs, plus the handles of the
/// spawned per-socket receive tasks.
type ReceiverResult = (
mpsc::Receiver<(Vec<u8>, SocketAddr)>,
Vec<tokio::task::JoinHandle<()>>,
);
impl Scanner {
/// Returns the process-wide scanner; same as the module-level `get()`.
pub fn get() -> &'static Self {
    self::get()
}
/// Creates a scanner with fresh state and the defaults (18 s timeout,
/// `0.0.0.0`, ports 6666/6667/7000), then starts the passive listener.
#[must_use]
pub(crate) fn new() -> Self {
    let default_ports = vec![6666, 6667, 7000];
    let created = Self {
        inner: Arc::new(ScannerState::new()),
        timeout: DEFAULT_SCAN_TIMEOUT,
        bind_addr: String::from("0.0.0.0"),
        ports: default_ports,
    };
    created.ensure_passive_listener();
    created
}
fn ensure_passive_listener(&self) {
let state = &self.inner;
let mut ports_to_add = Vec::new();
{
let guard = state.sockets.read();
for &port in &self.ports {
if !guard.contains_key(&port) {
ports_to_add.push(port);
}
}
}
if ports_to_add.is_empty() && state.listener_started.load(Ordering::SeqCst) {
return;
}
let bind_addr = self.bind_addr.clone();
let mut new_sockets = Vec::new();
{
let mut guard = state.sockets.write();
for port in ports_to_add {
if let Ok(socket) = Self::create_udp_socket(&bind_addr, port) {
let arc_socket = Arc::new(socket);
guard.insert(port, arc_socket.clone());
new_sockets.push(arc_socket);
}
}
}
if new_sockets.is_empty() && state.listener_started.load(Ordering::SeqCst) {
return;
}
if new_sockets.is_empty() {
warn!(
"Passive listener failed to bind to any ports: {:?}",
self.ports
);
return;
}
if !state.listener_started.swap(true, Ordering::SeqCst) {
let cancel_token = state.cancel_token.clone();
let state_weak = Arc::downgrade(&self.inner);
let (mut rx, tasks) = Self::spawn_receiver_tasks(new_sockets, cancel_token.clone());
{
let mut guard = state.receiver_tasks.write();
guard.extend(tasks);
}
crate::runtime::spawn(async move {
debug!("Starting background passive listener task...");
loop {
tokio::select! {
() = cancel_token.cancelled() => break,
Some((data, _addr)) = rx.recv() => {
let state = match state_weak.upgrade() {
Some(s) => s,
None => break,
};
if let Some(res) = parse_packet(&data) {
let mut guard = state.cache.write();
guard.retain(|_, v| v.discovered_at.elapsed() < CACHE_TTL);
let should_log = match guard.get(&res.id) {
Some(existing) => !res.is_same_device(existing),
None => true,
};
if should_log {
let mode = if state.active_scanning.load(Ordering::SeqCst) { "A" } else { "P" };
let version = res.version.map_or_else(|| "unknown".to_string(), |v| v.to_string());
info!("Discovered device {}(v{}) at {} - {}", res.id, version, res.ip, mode);
}
guard.insert(res.id.clone(), res.clone());
state.notify.notify_waiters();
}
}
}
}
debug!("Background passive listener task stopped");
});
}
}
/// Spawns one receive task per socket and returns the shared channel receiver
/// together with the task handles.
///
/// Each task copies every datagram it reads (up to 4096 bytes) into the
/// channel along with the sender address. A task ends when `cancel_token`
/// fires, when the channel receiver has been dropped, or when the socket
/// reports a receive error; the error is logged instead of being discarded.
fn spawn_receiver_tasks(
    sockets: Vec<Arc<UdpSocket>>,
    cancel_token: tokio_util::sync::CancellationToken,
) -> ReceiverResult {
    let (tx, rx) = mpsc::channel::<(Vec<u8>, SocketAddr)>(100);
    let tasks: Vec<_> = sockets
        .into_iter()
        .map(|socket| {
            let tx = tx.clone();
            let ct = cancel_token.clone();
            crate::runtime::spawn(async move {
                let mut buf = vec![0u8; 4096];
                loop {
                    tokio::select! {
                        () = ct.cancelled() => break,
                        res = socket.recv_from(&mut buf) => match res {
                            Ok((len, addr)) => {
                                if tx.send((buf[..len].to_vec(), addr)).await.is_err() {
                                    break;
                                }
                            }
                            Err(e) => {
                                warn!("UDP receive failed, stopping receiver task: {e}");
                                break;
                            }
                        },
                    }
                }
            })
        })
        .collect();
    (rx, tasks)
}
fn create_udp_socket(bind_addr: &str, port: u16) -> Result<UdpSocket> {
let addr: SocketAddr = format!("{bind_addr}:{port}")
.parse()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))?;
let socket = Socket::new(Domain::for_address(addr), Type::DGRAM, Some(Protocol::UDP))?;
let _ = socket.set_reuse_address(true);
let _ = socket.set_broadcast(true);
socket.bind(&SockAddr::from(addr))?;
socket.set_nonblocking(true)?;
let std_socket: std::net::UdpSocket = socket.into();
let _guard = crate::runtime::get_runtime().enter();
Ok(UdpSocket::from_std(std_socket)?)
}
/// Stops background listening: cancels the shared token, aborts the stored
/// receive tasks and forgets the bound sockets.
///
/// Aborting and draining the receive-task handles drops the tasks' socket
/// references and keeps stale handles from piling up in the shared state.
///
/// NOTE(review): the cancellation token is never replaced, so listener tasks
/// started after this call see it already cancelled and exit right away.
pub fn stop_passive_listener(&self) {
    let state = &self.inner;
    state.cancel_token.cancel();
    for task in state.receiver_tasks.write().drain(..) {
        task.abort();
    }
    state.listener_started.store(false, Ordering::SeqCst);
    state.sockets.write().clear();
}
/// Sets how long scans and single-device lookups on this handle may wait.
pub fn set_timeout(&mut self, timeout: Duration) {
    self.timeout = timeout;
}
pub fn set_ports(&mut self, ports: Vec<u16>) {
self.ports = ports;
self.ensure_passive_listener();
}
/// Stores `addr` as the bind address for sockets created from now on, then
/// re-runs the passive-listener setup.
///
/// Ports that already have a socket are not rebound to the new address.
///
/// # Errors
/// Currently always returns `Ok(())`; the address is not validated here.
pub fn set_bind_address(&mut self, addr: &str) -> Result<()> {
    self.bind_addr = String::from(addr);
    self.ensure_passive_listener();
    Ok(())
}
#[must_use]
pub fn with_timeout(&self, timeout: Duration) -> Self {
let mut s = self.clone();
s.set_timeout(timeout);
s
}
#[must_use]
pub fn with_ports(&self, ports: Vec<u16>) -> Self {
let mut s = self.clone();
s.set_ports(ports);
s
}
#[must_use]
pub fn with_bind_addr(&self, addr: String) -> Self {
let mut s = self.clone();
let _ = s.set_bind_address(&addr);
s
}
/// Returns a future that completes the next time the shared state signals a
/// cache update or the end of an active scan.
pub(crate) fn notified(&self) -> tokio::sync::futures::Notified<'_> {
    let state = &self.inner;
    state.notify.notified()
}
/// Returns a copy of the cached discovery result for `device_id`, if any.
#[must_use]
pub fn get_cached_result(&self, device_id: &str) -> Option<DiscoveryResult> {
    self.inner.cache.read().get(device_id).cloned()
}
/// Returns `true` when `device_id` is cached and was seen less than `within` ago.
#[must_use]
pub fn is_recently_discovered(&self, device_id: &str, within: Duration) -> bool {
    self.inner
        .cache
        .read()
        .get(device_id)
        .is_some_and(|entry| entry.discovered_at.elapsed() < within)
}
/// Guesses the local IP address by connecting a throwaway UDP socket to
/// `8.8.8.8:80` and reading back the socket's local address.
///
/// Nothing is sent by this function. Returns `None` when binding, connecting
/// or reading the local address fails.
fn get_local_ip(&self) -> Option<String> {
    let probe = std::net::UdpSocket::bind("0.0.0.0:0").ok()?;
    probe.connect("8.8.8.8:80").ok()?;
    let local = probe.local_addr().ok()?;
    Some(local.ip().to_string())
}
/// Packs a discovery request and sends it to `255.255.255.255:{port}` through `socket`.
///
/// Port 7000 gets a `ReqDevInfo` message with the `6699` prefix, a payload
/// naming this host's IP, packed with `UDP_KEY_35`; every other port gets a
/// `UdpNew` message with the `55AA` prefix, empty `gwId`/`devId` and no key.
///
/// # Errors
/// Fails when the payload cannot be serialized or the message cannot be
/// packed. A failed send is only logged and still yields `Ok(())`.
async fn send_discovery_broadcast(&self, socket: &UdpSocket, port: u16) -> Result<()> {
    let local_ip = self.get_local_ip().unwrap_or_else(|| "0.0.0.0".to_string());
    debug!("Sending discovery broadcast on port {port} (local IP: {local_ip})");

    // Everything that depends on the port is chosen in one place.
    let (payload, prefix, cmd, key) = if port == 7000 {
        (
            serde_json::json!({
                "from": "app",
                "ip": local_ip,
            }),
            PREFIX_6699,
            CommandType::ReqDevInfo as u32,
            Some(UDP_KEY_35),
        )
    } else {
        (
            serde_json::json!({
                "gwId": "",
                "devId": "",
            }),
            protocol::PREFIX_55AA,
            CommandType::UdpNew as u32,
            None,
        )
    };

    let msg = TuyaMessage {
        seqno: 0,
        cmd,
        retcode: None,
        payload: serde_json::to_vec(&payload)?,
        prefix,
        iv: None,
    };
    let packed = protocol::pack_message(&msg, key)?;

    let broadcast_addr: SocketAddr = format!("255.255.255.255:{port}")
        .parse()
        .map_err(|_| TuyaError::Offline)?;
    let outcome = socket.send_to(&packed, broadcast_addr).await;
    match outcome {
        Ok(len) => debug!("Sent discovery broadcast to {broadcast_addr}: {len} bytes"),
        Err(e) => warn!("Failed to send discovery broadcast to {broadcast_addr}: {e}"),
    }
    Ok(())
}
/// Streams discovery results from the process-wide scanner; see
/// `scan_stream_instance`.
pub fn scan_stream() -> impl futures_util::Stream<Item = DiscoveryResult> + Send + 'static {
    let global = Self::get();
    global.scan_stream_instance()
}
/// Returns a stream of discovered devices, each device id yielded at most once.
///
/// Starts an active scan in the background unless one is already running or
/// the previous one began less than `GLOBAL_SCAN_COOLDOWN` ago. The stream
/// first yields what is already cached, then yields devices as they are
/// added to the cache. It ends when this scanner's `timeout` has elapsed
/// (measured from this call) or when a wakeup arrives while no active scan
/// is running.
///
/// The scan slot is claimed with an atomic compare-and-swap so that two
/// concurrent callers cannot both start a scan. The wakeup future is created
/// before each cache read and the cache is read once more before the stream
/// ends, so a device cached between a read and the following wait is not lost.
pub fn scan_stream_instance(
    &self,
) -> impl futures_util::Stream<Item = DiscoveryResult> + Send + 'static {
    let state = self.inner.clone();
    let timeout_dur = self.timeout;
    let start_time = Instant::now();

    let cooldown_over = {
        let last_scan = state.last_scan_time.read();
        last_scan.is_none_or(|t| t.elapsed() >= GLOBAL_SCAN_COOLDOWN)
    };
    // Only the caller that flips the flag from `false` to `true` starts the scan.
    let should_start = cooldown_over
        && state
            .active_scanning
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok();
    if should_start {
        *state.last_scan_time.write() = Some(Instant::now());
        let scanner = self.clone();
        let state_clone = state.clone();
        crate::runtime::spawn(async move {
            let _ = scanner.perform_discovery_loop().await;
            state_clone.active_scanning.store(false, Ordering::SeqCst);
            state_clone.notify.notify_waiters();
        });
    }

    async_stream::stream! {
        let mut yielded_ids = std::collections::HashSet::new();
        let mut finished = false;
        loop {
            // Register for wakeups before looking at the cache, so an insert
            // that lands after the read below still wakes the wait further down.
            let notified = state.notify.notified();
            tokio::pin!(notified);

            let fresh: Vec<_> = {
                let guard = state.cache.read();
                guard
                    .values()
                    .filter(|v| !yielded_ids.contains(&v.id))
                    .cloned()
                    .collect()
            };
            for item in fresh {
                yielded_ids.insert(item.id.clone());
                yield item;
            }

            if finished {
                break;
            }
            let remaining = timeout_dur.saturating_sub(start_time.elapsed());
            if remaining.is_zero() {
                break;
            }

            tokio::select! {
                () = sleep(remaining) => {
                    // Loop once more to flush late arrivals, then stop.
                    finished = true;
                }
                () = &mut notified => {
                    finished = !state.active_scanning.load(Ordering::SeqCst);
                }
            }
        }
    }
}
/// Runs a scan on the process-wide scanner and collects the results; see
/// `scan_instance`.
pub async fn scan() -> Result<Vec<DiscoveryResult>> {
    let global = Self::get();
    global.scan_instance().await
}
/// Runs a scan with this scanner and returns everything the result stream
/// yielded, logging the start and the number of devices found.
///
/// # Errors
/// Currently always returns `Ok`; the result type leaves room for failures.
pub async fn scan_instance(&self) -> Result<Vec<DiscoveryResult>> {
    use futures_util::StreamExt;

    info!(
        "Starting Tuya device scan (addr: {}, ports: {:?})...",
        self.bind_addr, self.ports
    );
    let found = self.scan_stream_instance().collect::<Vec<_>>().await;
    info!("Scan finished. Found {} devices.", found.len());
    Ok(found)
}
/// Looks up `device_id` with the process-wide scanner; see
/// `discover_device_instance`.
pub async fn discover_device(device_id: &str) -> Result<Option<DiscoveryResult>> {
    let global = Self::get();
    global.discover_device_instance(device_id).await
}
/// Looks up `device_id` without forcing a scan and without a cancellation
/// token; shorthand for `discover_device_internal(device_id, false, None)`.
pub async fn discover_device_instance(
    &self,
    device_id: &str,
) -> Result<Option<DiscoveryResult>> {
    let force_scan = false;
    self.discover_device_internal(device_id, force_scan, None)
        .await
}
/// Resolves `device_id` to a discovery result.
///
/// Answers from the cache when allowed (see `check_cache_and_cooldown`);
/// otherwise makes sure a background scan is running and waits for the device
/// to appear, until the timeout, the end of the scan, or `cancel` fires.
/// `force_scan` bypasses both the cache shortcut and the scan throttle.
///
/// # Errors
/// Currently always returns `Ok`; `Ok(None)` means the device was not found.
pub async fn discover_device_internal(
    &self,
    device_id: &str,
    force_scan: bool,
    cancel: Option<&tokio_util::sync::CancellationToken>,
) -> Result<Option<DiscoveryResult>> {
    match self.check_cache_and_cooldown(device_id, force_scan) {
        Some(hit) => Ok(Some(hit)),
        None => {
            self.ensure_scan_started(device_id, force_scan).await;
            let found = self.wait_for_cache_result(device_id, cancel).await;
            Ok(found)
        }
    }
}
/// Decides whether a lookup can be answered from the cache alone.
///
/// Returns the cached entry when `force_scan` is off and either the entry is
/// younger than `GLOBAL_SCAN_COOLDOWN`, or the last active scan started less
/// than `GLOBAL_SCAN_COOLDOWN` ago (in which case even an older entry is
/// returned). Returns `None` otherwise.
fn check_cache_and_cooldown(
    &self,
    device_id: &str,
    force_scan: bool,
) -> Option<DiscoveryResult> {
    if force_scan {
        return None;
    }
    let state = &self.inner;
    let cached = state.cache.read().get(device_id).cloned()?;

    if cached.discovered_at.elapsed() < GLOBAL_SCAN_COOLDOWN {
        debug!("Found device {device_id} in discovery cache");
        return Some(cached);
    }

    let scanned_recently = (*state.last_scan_time.read())
        .is_some_and(|last| last.elapsed() < GLOBAL_SCAN_COOLDOWN);
    if scanned_recently {
        debug!("Global scan cooldown active (30m). Returning cached result for {device_id}.");
        return Some(cached);
    }
    None
}
async fn ensure_scan_started(&self, device_id: &str, force_scan: bool) {
let state = self.inner.clone();
let can_scan = {
let last_scan = *state.last_scan_time.read();
match last_scan {
Some(last) if !force_scan && last.elapsed() < SCAN_THROTTLE_INTERVAL => false,
_ => !state.active_scanning.swap(true, Ordering::SeqCst),
}
};
if can_scan {
info!("Initiating background scan for device ID: {device_id}...");
*state.last_scan_time.write() = Some(Instant::now());
let scanner = self.clone();
crate::runtime::spawn(async move {
let _ = scanner.perform_discovery_loop().await;
state.active_scanning.store(false, Ordering::SeqCst);
state.notify.notify_waiters();
});
}
}
/// Waits until `device_id` shows up in the cache and returns its entry.
///
/// Gives up, returning whatever the cache holds at that moment, once this
/// scanner's `timeout` has elapsed or no active scan is running. Returns
/// `None` immediately when `cancel` fires.
///
/// The wakeup future is created before the cache is inspected on every
/// iteration: `notify_waiters` only wakes futures that already exist, so
/// creating it after the check could miss an insert that lands in between and
/// stall the caller for the rest of the timeout.
async fn wait_for_cache_result(
    &self,
    device_id: &str,
    cancel: Option<&tokio_util::sync::CancellationToken>,
) -> Option<DiscoveryResult> {
    let state = &self.inner;
    let start_wait = Instant::now();
    loop {
        let notified = state.notify.notified();
        tokio::pin!(notified);

        if let Some(res) = state.cache.read().get(device_id).cloned() {
            return Some(res);
        }
        let elapsed = start_wait.elapsed();
        if elapsed >= self.timeout || !state.active_scanning.load(Ordering::SeqCst) {
            // The scan may have ended right after the first lookup; check once more.
            return state.cache.read().get(device_id).cloned();
        }
        let remaining = self.timeout.saturating_sub(elapsed);
        if let Some(ct) = cancel {
            tokio::select! {
                () = ct.cancelled() => return None,
                () = sleep(remaining) => {}
                () = &mut notified => {}
            }
        } else {
            let _ = timeout(remaining, &mut notified).await;
        }
    }
}
async fn perform_discovery_loop(self) -> Result<()> {
let state = &self.inner;
let mut target_sockets = Vec::new();
{
let guard = state.sockets.read();
for &port in &self.ports {
if let Some(socket) = guard.get(&port) {
target_sockets.push((socket.clone(), port));
}
}
}
if target_sockets.is_empty() {
self.ensure_passive_listener();
let guard = state.sockets.read();
for &port in &self.ports {
if let Some(socket) = guard.get(&port) {
target_sockets.push((socket.clone(), port));
}
}
}
if target_sockets.is_empty() {
return Err(std::io::Error::other("No available ports for scanning").into());
}
let start = Instant::now();
let mut broadcast_interval = interval(BROADCAST_INTERVAL);
let mut broadcast_count = 0;
while start.elapsed() < self.timeout {
let remaining = self.timeout.saturating_sub(start.elapsed());
if remaining.is_zero() {
break;
}
tokio::select! {
() = sleep(remaining) => break,
_ = broadcast_interval.tick() => {
if broadcast_count < 3 {
broadcast_count += 1;
debug!("Sent broadcast {broadcast_count}/3");
for (socket, port) in &target_sockets {
let _ = self.send_discovery_broadcast(socket, *port).await;
}
}
}
}
}
Ok(())
}
/// Removes the cached entry for `id`; returns `true` when one was present.
#[must_use]
pub fn invalidate_cache(&self, id: &str) -> bool {
    let removed = self.inner.cache.write().remove(id);
    removed.is_some()
}
}
/// Keys passed to `protocol::unpack_message` by `parse_packet`, in the order
/// they are tried; the trailing `None` attempts unpacking without a key.
const UDP_TRY_KEYS: [Option<&[u8]>; 4] =
[Some(UDP_KEY_35), Some(UDP_KEY_34), Some(UDP_KEY_33), None];
/// Values tried, in order, for the last argument of `protocol::unpack_message`
/// (bound to `no_retcode` in `parse_packet`).
// NOTE(review): presumably `None` lets the unpacker decide by itself whether a
// return code is present; confirm against `protocol::unpack_message`.
const UDP_TRY_RETCODES: [Option<bool>; 3] = [Some(true), Some(false), None];
fn parse_packet(data: &[u8]) -> Option<DiscoveryResult> {
trace!("Parsing UDP packet of {} bytes...", data.len());
if let Ok(val) = serde_json::from_slice::<Value>(data) {
trace!("Successfully parsed raw JSON packet");
return parse_json(&val);
}
for key in UDP_TRY_KEYS {
for no_retcode in UDP_TRY_RETCODES {
match protocol::unpack_message(data, key, None, no_retcode) {
Ok(msg) => {
if msg.payload.is_empty() {
continue;
}
if let Ok(val) = serde_json::from_slice::<Value>(&msg.payload) {
trace!("Successfully parsed JSON from Tuya message payload");
return parse_json(&val);
}
let keys_to_try: Vec<&[u8]> = match key {
Some(k) => vec![k],
None => vec![UDP_KEY_33, UDP_KEY_34, UDP_KEY_35],
};
for k in keys_to_try {
if let Ok(cipher) = TuyaCipher::new(k)
&& let Ok(decrypted) =
cipher.decrypt(&msg.payload, false, None, None, None)
&& let Ok(val) = serde_json::from_slice::<Value>(&decrypted)
{
trace!(
"Successfully decrypted and parsed JSON from Tuya message payload"
);
return parse_json(&val);
}
}
}
Err(e) => {
if !matches!(
e,
crate::error::TuyaError::DecodeError(_)
| crate::error::TuyaError::HmacMismatch
| crate::error::TuyaError::CrcMismatch
| crate::error::TuyaError::InvalidHeader
) {
trace!(
"unpack_message failed with key {:?}: {e}",
key.map(hex::encode),
);
}
}
}
}
}
for key in &[UDP_KEY_33, UDP_KEY_34] {
if let Ok(cipher) = TuyaCipher::new(key)
&& let Ok(decrypted) = cipher.decrypt(data, false, None, None, None)
&& let Ok(val) = serde_json::from_slice::<Value>(&decrypted)
{
trace!("Successfully decrypted and parsed JSON from entire packet");
return parse_json(&val);
}
}
if let Some(pos) = data.iter().position(|&b| b == b'{')
&& let Ok(val) = serde_json::from_slice::<Value>(&data[pos..])
{
trace!("Successfully found and parsed JSON from middle of packet");
return parse_json(&val);
}
trace!("Failed to parse UDP packet");
None
}
fn parse_json(val: &Value) -> Option<DiscoveryResult> {
let id = val
.get("gwId")
.or_else(|| val.get("devId"))
.or_else(|| val.get("id"))
.and_then(|v| v.as_str())?;
let ip = val.get("ip").and_then(|v| v.as_str())?;
let ver_s = val.get("version").and_then(|v| v.as_str());
let pk = val.get("productKey").and_then(|v| v.as_str());
Some(DiscoveryResult {
id: id.to_string(),
ip: ip.to_string(),
version: ver_s.and_then(|s| Version::from_str(s).ok()),
product_key: pk.map(std::string::ToString::to_string),
discovered_at: Instant::now(),
})
}
/// Collects optional settings for a standalone `Scanner`; anything left unset
/// falls back to the scanner defaults in `build`.
#[derive(Debug, Default)]
pub struct ScannerBuilder {
/// Scan timeout; defaults to `DEFAULT_SCAN_TIMEOUT`.
timeout: Option<Duration>,
/// Local bind address; defaults to `"0.0.0.0"`.
bind_addr: Option<String>,
/// UDP ports; defaults to `6666`, `6667` and `7000`.
ports: Option<Vec<u16>>,
}
impl ScannerBuilder {
    /// Creates a builder with nothing configured.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets how long scans and lookups may wait.
    pub fn timeout(self, timeout: Duration) -> Self {
        Self {
            timeout: Some(timeout),
            ..self
        }
    }

    /// Sets the local address that sockets are bound to.
    pub fn bind_addr<S: Into<String>>(self, addr: S) -> Self {
        Self {
            bind_addr: Some(addr.into()),
            ..self
        }
    }

    /// Sets the UDP ports to listen and broadcast on.
    pub fn ports(self, ports: Vec<u16>) -> Self {
        Self {
            ports: Some(ports),
            ..self
        }
    }

    /// Builds a scanner with its own fresh state, applies the defaults for
    /// anything left unset, and starts its passive listener.
    pub fn build(self) -> Scanner {
        let Self {
            timeout,
            bind_addr,
            ports,
        } = self;
        let scanner = Scanner {
            inner: Arc::new(ScannerState::new()),
            timeout: timeout.unwrap_or(DEFAULT_SCAN_TIMEOUT),
            bind_addr: bind_addr.unwrap_or_else(|| String::from("0.0.0.0")),
            ports: ports.unwrap_or_else(|| vec![6666, 6667, 7000]),
        };
        scanner.ensure_passive_listener();
        scanner
    }
}