use crate::device::SubDevice as AsyncSubDevice;
use crate::device::{
Device as AsyncDevice, DeviceBuilder as AsyncDeviceBuilder, DeviceEvent,
unified_listener as async_unified_listener,
};
use crate::error::Result;
use crate::protocol::{TuyaMessage, Version};
use crate::runtime::{self, get_runtime};
use crate::scanner::{DiscoveryResult, Scanner as AsyncScanner, get as get_async_scanner};
use futures_util::StreamExt;
use log::warn;
use serde::Serialize;
use serde_json::Value;
use std::sync::OnceLock;
use std::sync::mpsc::TrySendError;
use std::time::Duration;
use tokio::sync::mpsc;
const CHAN_SYNC_CAPACITY: usize = 128;
const CHAN_WORKER_COMMAND_CAPACITY: usize = 32;
const CHAN_UNIFIED_CAPACITY: usize = 2048;
#[doc(hidden)]
pub mod internal {
use super::Result;
use super::get_runtime;
use serde_json::Value;
pub fn get_sync_runtime() -> &'static tokio::runtime::Runtime {
get_runtime()
}
pub struct SyncRequest<C, R = Option<String>> {
pub command: C,
pub resp_tx: std::sync::mpsc::Sender<Result<R>>,
}
#[derive(Debug)]
pub enum DeviceCommand {
Status,
SetDps(Value),
SetValue(String, Value),
Request {
command: crate::protocol::CommandType,
data: Option<Value>,
cid: Option<String>,
},
SubDiscover,
Close,
Stop,
}
#[derive(Debug)]
pub enum SubDeviceCommand {
Status,
SetDps(Value),
SetValue(String, Value),
Request {
command: crate::protocol::CommandType,
data: Option<Value>,
},
}
}
use internal::{DeviceCommand, SubDeviceCommand, SyncRequest};
#[inline]
fn check_no_runtime_context() -> Result<()> {
if tokio::runtime::Handle::try_current().is_ok() {
return Err(crate::error::TuyaError::io_other(
"rustuya sync API called from inside a tokio runtime — \
use rustuya::Device (async) instead",
));
}
Ok(())
}
fn send_sync<C, R>(tx: &mpsc::Sender<SyncRequest<C, R>>, command: C) -> Result<R> {
check_no_runtime_context()?;
let (resp_tx, resp_rx) = std::sync::mpsc::channel();
if tx.blocking_send(SyncRequest { command, resp_tx }).is_err() {
return Err(crate::error::TuyaError::io_other("Worker died"));
}
resp_rx
.recv()
.map_err(|_| crate::error::TuyaError::io_other("Worker died"))?
}
fn wait_for_response<C, R, F>(tx: &mpsc::Sender<C>, build: F) -> Result<R>
where
F: FnOnce(std::sync::mpsc::Sender<R>) -> C,
{
check_no_runtime_context()?;
let (resp_tx, resp_rx) = std::sync::mpsc::channel::<R>();
if tx.blocking_send(build(resp_tx)).is_err() {
return Err(crate::error::TuyaError::io_other("Worker died"));
}
resp_rx
.recv()
.map_err(|_| crate::error::TuyaError::io_other("Worker died"))
}
#[derive(Clone)]
pub struct Device {
#[doc(hidden)]
pub inner: AsyncDevice,
#[doc(hidden)]
pub cmd_tx: mpsc::Sender<SyncRequest<DeviceCommand>>,
}
impl Device {
pub fn new<I, K>(id: I, local_key: K) -> Self
where
I: Into<String>,
K: Into<Vec<u8>>,
{
Self::from_async(AsyncDevice::new(id, local_key))
}
pub fn builder<I, K>(id: I, local_key: K) -> DeviceBuilder
where
I: Into<String>,
K: Into<Vec<u8>>,
{
DeviceBuilder::new(id, local_key)
}
pub(crate) fn from_async(device: AsyncDevice) -> Self {
let (tx, mut rx) =
mpsc::channel::<SyncRequest<DeviceCommand>>(CHAN_WORKER_COMMAND_CAPACITY);
let inner_clone = device.clone();
runtime::spawn(async move {
while let Some(req) = rx.recv().await {
let res = match req.command {
DeviceCommand::Status => inner_clone.status().await,
DeviceCommand::SetDps(dps) => inner_clone.set_dps(dps).await,
DeviceCommand::SetValue(dp_id, value) => {
inner_clone.set_value(dp_id, value).await
}
DeviceCommand::Request { command, data, cid } => {
inner_clone.request(command, data, cid).await
}
DeviceCommand::SubDiscover => inner_clone.sub_discover().await,
DeviceCommand::Close => {
inner_clone.close().await;
Ok(None)
}
DeviceCommand::Stop => {
inner_clone.stop().await;
Ok(None)
}
};
let _ = req.resp_tx.send(res);
}
});
Self {
inner: device,
cmd_tx: tx,
}
}
pub fn id(&self) -> &str {
self.inner.id()
}
pub fn status(&self) -> Result<Option<String>> {
send_sync(&self.cmd_tx, DeviceCommand::Status)
}
pub fn set_dps(&self, dps: Value) -> Result<Option<String>> {
send_sync(&self.cmd_tx, DeviceCommand::SetDps(dps))
}
pub fn set_value<I: ToString, T: Serialize>(
&self,
dp_id: I,
value: T,
) -> Result<Option<String>> {
if let Ok(val) = serde_json::to_value(value) {
send_sync(
&self.cmd_tx,
DeviceCommand::SetValue(dp_id.to_string(), val),
)
} else {
Err(crate::error::TuyaError::InvalidPayload)
}
}
pub fn request(
&self,
cmd: crate::protocol::CommandType,
data: Option<Value>,
cid: Option<String>,
) -> Result<Option<String>> {
send_sync(
&self.cmd_tx,
DeviceCommand::Request {
command: cmd,
data,
cid,
},
)
}
pub fn sub_discover(&self) -> Result<Option<String>> {
send_sync(&self.cmd_tx, DeviceCommand::SubDiscover)
}
pub fn sub(&self, cid: &str) -> SubDevice {
SubDevice::new(self.inner.sub(cid))
}
pub fn close(&self) {
self.inner.fire_close();
}
pub fn stop(&self) {
self.inner.fire_stop();
}
pub fn as_async(&self) -> &AsyncDevice {
&self.inner
}
pub fn dev_type(&self) -> crate::protocol::DeviceType {
self.inner.dev_type()
}
pub fn local_key(&self) -> &[u8] {
self.inner.local_key()
}
pub fn address(&self) -> String {
self.inner.address()
}
pub fn config_address(&self) -> String {
self.inner.config_address()
}
pub fn version(&self) -> Version {
self.inner.version()
}
pub fn is_connected(&self) -> bool {
self.inner.is_connected()
}
pub fn is_stopped(&self) -> bool {
self.inner.is_stopped()
}
pub fn timeout(&self) -> std::time::Duration {
self.inner.timeout()
}
pub fn port(&self) -> u16 {
self.inner.port()
}
pub fn persist(&self) -> bool {
self.inner.persist()
}
pub fn nowait(&self) -> bool {
self.inner.nowait()
}
pub fn set_persist(&self, persist: bool) {
self.inner.set_persist(persist);
}
pub fn set_timeout(&self, timeout: std::time::Duration) {
self.inner.set_timeout(timeout);
}
pub fn set_port(&self, port: u16) {
self.inner.set_port(port);
}
pub fn set_nowait(&self, nowait: bool) {
self.inner.set_nowait(nowait);
}
pub fn set_version<V: Into<Version>>(&self, version: V) {
self.inner.set_version(version);
}
pub fn set_dev_type<DT: Into<crate::protocol::DeviceType>>(&self, dev_type: DT) {
self.inner.set_dev_type(dev_type);
}
pub fn set_address<A: Into<String>>(&self, address: A) {
self.inner.set_address(address);
}
pub fn connect_now(&self) {
self.inner.fire_close();
}
pub fn listener(&self) -> std::sync::mpsc::Receiver<Result<TuyaMessage>> {
let (tx, rx) = std::sync::mpsc::sync_channel(CHAN_SYNC_CAPACITY);
let stream = self.inner.listener();
let device_id = self.inner.id().to_string();
runtime::spawn(async move {
tokio::pin!(stream);
let messages = stream;
tokio::pin!(messages);
bridge_to_sync(messages, tx, move |dropped| {
warn!(
"Sync listener for device {device_id} resumed after dropping {dropped} buffered messages"
);
})
.await;
});
rx
}
}
pub struct DeviceBuilder {
inner: AsyncDeviceBuilder,
}
impl DeviceBuilder {
pub fn new<I, K>(id: I, local_key: K) -> Self
where
I: Into<String>,
K: Into<Vec<u8>>,
{
Self {
inner: AsyncDeviceBuilder::new(id, local_key),
}
}
pub fn address<A: Into<String>>(mut self, address: A) -> Self {
self.inner = self.inner.address(address);
self
}
pub fn version<V: Into<Version>>(mut self, version: V) -> Self {
self.inner = self.inner.version(version);
self
}
pub fn dev_type<D: Into<crate::protocol::DeviceType>>(mut self, dev_type: D) -> Self {
self.inner = self.inner.dev_type(dev_type);
self
}
pub fn port(mut self, port: u16) -> Self {
self.inner = self.inner.port(port);
self
}
pub fn persist(mut self, persist: bool) -> Self {
self.inner = self.inner.persist(persist);
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.inner = self.inner.timeout(timeout);
self
}
pub fn nowait(mut self, nowait: bool) -> Self {
self.inner = self.inner.nowait(nowait);
self
}
pub fn build(self) -> Device {
Device::from_async(self.inner.build())
}
}
#[derive(Clone)]
pub struct SubDevice {
#[doc(hidden)]
pub inner: AsyncSubDevice,
#[doc(hidden)]
pub cmd_tx: mpsc::Sender<SyncRequest<SubDeviceCommand>>,
}
impl SubDevice {
pub(crate) fn new(inner: AsyncSubDevice) -> Self {
let (tx, mut rx) =
mpsc::channel::<SyncRequest<SubDeviceCommand>>(CHAN_WORKER_COMMAND_CAPACITY);
let inner_clone = inner.clone();
runtime::spawn(async move {
while let Some(req) = rx.recv().await {
let res = match req.command {
SubDeviceCommand::Status => inner_clone.status().await,
SubDeviceCommand::SetDps(dps) => inner_clone.set_dps(dps).await,
SubDeviceCommand::SetValue(index, value) => {
inner_clone.set_value(index, value).await
}
SubDeviceCommand::Request { command, data } => {
inner_clone.request(command, data).await
}
};
let _ = req.resp_tx.send(res);
}
});
Self { inner, cmd_tx: tx }
}
pub fn id(&self) -> &str {
self.inner.id()
}
pub fn status(&self) -> Result<Option<String>> {
send_sync(&self.cmd_tx, SubDeviceCommand::Status)
}
pub fn set_dps(&self, dps: Value) -> Result<Option<String>> {
send_sync(&self.cmd_tx, SubDeviceCommand::SetDps(dps))
}
pub fn set_value<I: ToString, T: Serialize>(
&self,
index: I,
value: T,
) -> Result<Option<String>> {
if let Ok(val) = serde_json::to_value(value) {
send_sync(
&self.cmd_tx,
SubDeviceCommand::SetValue(index.to_string(), val),
)
} else {
Err(crate::error::TuyaError::InvalidPayload)
}
}
pub fn request(
&self,
cmd: crate::protocol::CommandType,
data: Option<Value>,
) -> Result<Option<String>> {
send_sync(
&self.cmd_tx,
SubDeviceCommand::Request { command: cmd, data },
)
}
pub fn as_async(&self) -> &AsyncSubDevice {
&self.inner
}
}
enum ScannerCommand {
Scan(std::sync::mpsc::Sender<Result<Vec<DiscoveryResult>>>),
Discover(String, std::sync::mpsc::Sender<Option<DiscoveryResult>>),
}
#[derive(Clone)]
pub struct Scanner {
inner: AsyncScanner,
cmd_tx: mpsc::Sender<ScannerCommand>,
}
static SYNC_SCANNER: OnceLock<Scanner> = OnceLock::new();
impl Scanner {
pub fn get() -> &'static Self {
SYNC_SCANNER.get_or_init(Self::new)
}
fn new() -> Self {
Self::from_async(get_async_scanner().clone())
}
pub fn set_timeout(&self, timeout: std::time::Duration) {
self.inner.set_timeout(timeout);
}
pub fn set_ports(&self, ports: Vec<u16>) {
self.inner.set_ports(ports);
}
pub fn set_bind_address(&self, addr: &str) -> Result<()> {
self.inner.set_bind_address(addr)
}
pub(crate) fn from_async(async_scanner: AsyncScanner) -> Self {
let (tx, mut rx) = mpsc::channel::<ScannerCommand>(CHAN_WORKER_COMMAND_CAPACITY);
let scanner_inner = async_scanner.clone();
runtime::spawn(async move {
while let Some(cmd) = rx.recv().await {
match cmd {
ScannerCommand::Scan(resp_tx) => {
let res = scanner_inner.scan_instance().await;
let _ = resp_tx.send(res);
}
ScannerCommand::Discover(id, resp_tx) => {
let res = scanner_inner
.discover_device_instance(&id)
.await
.ok()
.flatten();
let _ = resp_tx.send(res);
}
}
}
});
Self {
inner: async_scanner,
cmd_tx: tx,
}
}
pub fn scan() -> Result<Vec<DiscoveryResult>> {
Self::get().scan_instance()
}
pub fn scan_instance(&self) -> Result<Vec<DiscoveryResult>> {
wait_for_response(&self.cmd_tx, ScannerCommand::Scan)?
}
pub fn discover(id: &str) -> Option<DiscoveryResult> {
Self::get().discover_instance(id)
}
pub fn discover_instance(&self, id: &str) -> Option<DiscoveryResult> {
wait_for_response(&self.cmd_tx, |resp_tx| {
ScannerCommand::Discover(id.to_string(), resp_tx)
})
.ok()
.flatten()
}
pub fn scan_stream() -> std::sync::mpsc::Receiver<DiscoveryResult> {
Self::get().scan_stream_instance()
}
pub fn scan_stream_instance(&self) -> std::sync::mpsc::Receiver<DiscoveryResult> {
let (tx, rx) = std::sync::mpsc::sync_channel(CHAN_SYNC_CAPACITY);
let async_scanner = self.inner.clone();
runtime::spawn(async move {
let stream = async_scanner.scan_stream_instance();
tokio::pin!(stream);
bridge_to_sync(stream, tx, |dropped| {
warn!("Sync scan stream resumed after dropping {dropped} buffered results");
})
.await;
});
rx
}
}
pub fn unified_listener(devices: Vec<Device>) -> std::sync::mpsc::Receiver<Result<DeviceEvent>> {
unified_listener_with_capacity(devices, CHAN_UNIFIED_CAPACITY)
}
pub fn unified_listener_with_capacity(
devices: Vec<Device>,
capacity: usize,
) -> std::sync::mpsc::Receiver<Result<DeviceEvent>> {
let (tx, rx) = std::sync::mpsc::sync_channel(capacity);
let async_devices: Vec<AsyncDevice> = devices.into_iter().map(|d| d.inner.clone()).collect();
runtime::spawn(async move {
let stream = async_unified_listener(async_devices);
tokio::pin!(stream);
bridge_to_sync(stream, tx, |dropped| {
warn!("Sync unified listener resumed after dropping {dropped} buffered events");
})
.await;
});
rx
}
async fn bridge_to_sync<S, T, F>(
mut stream: std::pin::Pin<&mut S>,
tx: std::sync::mpsc::SyncSender<T>,
mut on_resume: F,
) where
S: futures_core::stream::Stream<Item = T>,
F: FnMut(u64),
{
let mut dropped: u64 = 0;
while let Some(item) = stream.next().await {
match tx.try_send(item) {
Ok(()) => {
if dropped > 0 {
on_resume(dropped);
dropped = 0;
}
}
Err(TrySendError::Full(_)) => {
dropped = dropped.saturating_add(1);
}
Err(TrySendError::Disconnected(_)) => break,
}
}
}
#[cfg(test)]
mod tests {
use super::{bridge_to_sync, check_no_runtime_context};
use futures_util::stream;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
#[test]
fn check_no_runtime_context_errors_inside_runtime() {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let res = check_no_runtime_context();
assert!(res.is_err(), "expected error when inside a tokio runtime");
let msg = format!("{}", res.unwrap_err());
assert!(
msg.contains("tokio runtime"),
"error message should mention runtime, got: {msg}"
);
});
}
#[test]
fn check_no_runtime_context_ok_outside_runtime() {
assert!(check_no_runtime_context().is_ok());
}
#[test]
fn bridge_drops_overflow_and_keeps_running() {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let (tx, rx) = std::sync::mpsc::sync_channel::<u32>(2);
let resumed_with = Arc::new(AtomicU64::new(0));
let resumed_clone = resumed_with.clone();
let stream = stream::iter(vec![1u32, 2, 3, 4, 5]);
tokio::pin!(stream);
bridge_to_sync(stream, tx, move |dropped| {
resumed_clone.store(dropped, Ordering::SeqCst);
})
.await;
assert_eq!(rx.try_recv().unwrap(), 1);
assert_eq!(rx.try_recv().unwrap(), 2);
assert!(rx.try_recv().is_err());
assert_eq!(resumed_with.load(Ordering::SeqCst), 0);
});
}
#[test]
fn bridge_reports_dropped_count_when_consumer_catches_up() {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let (tx, rx) = std::sync::mpsc::sync_channel::<u32>(1);
let resumed_with = Arc::new(AtomicU64::new(0));
let resumed_clone = resumed_with.clone();
let (gate_tx, mut gate_rx) = tokio::sync::mpsc::channel::<()>(8);
let producer = async_stream::stream! {
yield 1u32; yield 2; yield 3; gate_rx.recv().await; yield 4; };
tokio::pin!(producer);
let consumer = tokio::spawn(async move {
tokio::task::spawn_blocking(move || {
let first = rx.recv().unwrap();
assert_eq!(first, 1);
let _ = gate_tx.blocking_send(());
let next = rx.recv().unwrap();
assert_eq!(next, 4);
})
.await
.unwrap();
});
bridge_to_sync(producer, tx, move |dropped| {
resumed_clone.store(dropped, Ordering::SeqCst);
})
.await;
consumer.await.unwrap();
assert_eq!(resumed_with.load(Ordering::SeqCst), 2);
});
}
#[test]
fn bridge_exits_on_receiver_disconnect() {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let (tx, rx) = std::sync::mpsc::sync_channel::<u32>(4);
drop(rx);
let stream = stream::iter(vec![1u32, 2, 3]);
tokio::pin!(stream);
bridge_to_sync(stream, tx, |_| {
panic!("on_resume must not fire on disconnect")
})
.await;
});
}
}