use super::port;
use super::proto::{self, DeviceRoute, Packet, ProxyStatus};
use super::proxy_core::{ProxyClient, ProxyCore};
use super::util;
use super::util::{TioRpcReplyable, TioRpcRequestable};
use std::env;
use std::thread;
use std::time::Duration;
use crossbeam::channel;
/// Events carried on the proxy status channel.
///
/// This is the message type of the `Sender<Event>` that `Interface::new_proxy`
/// accepts and hands to the proxy core. Some of the connection-related variants
/// are also the targets of the `From<ProxyStatus>` conversion.
///
/// NOTE(review): the code that emits most of these variants lives outside this
/// file; the grouping comments below are inferred from the variant names and
/// should be confirmed against the proxy core.
#[derive(Debug)]
pub enum Event {
    // Sensor link state.
    SensorConnected,
    SensorDisconnected,
    SensorReconnected,
    FailedToConnect,
    FailedToReconnect,
    Exiting,

    /// Wraps a protocol-level error value.
    ProtocolError(proto::Error),
    /// Wraps a port receive error.
    FatalError(port::RecvError),

    // Client and RPC bookkeeping.
    // NOTE(review): `u64` is presumably a client id and `u16` an RPC id — confirm.
    NewClient(u64),
    RpcRemap((u64, u16), u16),
    RpcRestore(u16, (u64, u16)),
    RpcRestoreNotFound(u16),
    RpcClientNotFound(u64),
    RpcTimeout(u16),
    RpcCancel(u16),
    ClientSendFailed(u64),
    ClientTerminated(u64),
    RootDeviceRestarted,

    // Rate negotiation.
    // NOTE(review): the `u32` payloads are presumably rates — confirm units.
    AutoRateGaveUp,
    AutoRateQueried(u32),
    AutoRateRpcError(proto::RpcErrorCode),
    AutoRateRpcInvalid,
    AutoRateIncompatible(u32),
    AutoRateCompatible(u32),
    AutoRateWait,
    AutoRateSet(u32),
    SetRate(u32),
    SetRateFailed,
    NoData,
}
/// Converts a wire-level `ProxyStatus` into the matching `Event`.
///
/// Each named status maps to the event variant of the same name. An
/// unrecognised status (`ProxyStatus::Unknown`) is reported as
/// `SensorDisconnected`.
impl From<ProxyStatus> for super::proxy::Event {
    fn from(status: ProxyStatus) -> Self {
        match status {
            ProxyStatus::FailedToConnect => Self::FailedToConnect,
            ProxyStatus::FailedToReconnect => Self::FailedToReconnect,
            ProxyStatus::SensorReconnected => Self::SensorReconnected,
            // Unknown statuses share the "disconnected" outcome.
            ProxyStatus::SensorDisconnected | ProxyStatus::Unknown(_) => {
                Self::SensorDisconnected
            }
        }
    }
}
/// Client-side endpoint of a proxy connection, created by `Interface::new_port`.
///
/// Holds one channel in each direction plus the routing limits that were
/// requested when the port was opened.
pub struct Port {
    /// Sending half of the client-to-proxy packet channel.
    tx: channel::Sender<Packet>,
    /// Receiving half of the proxy-to-client packet channel.
    rx: channel::Receiver<Packet>,
    /// Longest packet routing (`packet.routing.len()`) that `send`/`try_send` accept.
    depth: usize,
    /// Device route the port was opened on; returned by `scope()`.
    scope: DeviceRoute,
}
/// Reasons a packet could not be handed to the proxy.
///
/// Every variant gives the rejected packet back to the caller.
#[derive(Debug, Clone, thiserror::Error)]
pub enum SendError {
    /// The outgoing channel was full (non-blocking send only).
    #[error("channel full")]
    WouldBlock(Packet),

    /// The outgoing channel is disconnected.
    #[error("proxy disconnected")]
    ProxyDisconnected(Packet),

    /// The packet's routing is longer than the port's depth allows.
    #[error("route exceeds port scope")]
    InvalidRoute(Packet),
}
/// Reasons a packet could not be obtained from the proxy.
#[derive(Debug, Clone, thiserror::Error)]
pub enum RecvError {
    /// The incoming channel was empty (non-blocking receive only).
    #[error("no packet available")]
    WouldBlock,

    /// The incoming channel is disconnected.
    #[error("proxy disconnected")]
    ProxyDisconnected,
}
/// Failure modes of the blocking RPC helpers on `Port`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum RpcError {
    /// The request packet could not be sent.
    #[error("failed to send RPC request: {0}")]
    SendFailed(#[from] SendError),

    /// An RPC error payload came back instead of a reply.
    #[error("device returned error: {0}")]
    ExecError(proto::RpcErrorPayload),

    /// Receiving failed while waiting for the reply.
    #[error("failed to receive RPC reply: {0}")]
    RecvFailed(#[from] RecvError),

    /// The reply bytes could not be decoded into the requested type.
    #[error("RPC reply did not match expected type")]
    TypeError,
}
impl Port {
pub fn scope(&self) -> DeviceRoute {
self.scope.clone()
}
pub fn send(&self, packet: Packet) -> Result<(), SendError> {
if packet.routing.len() > self.depth {
return Err(SendError::InvalidRoute(packet));
}
match self.tx.send(packet) {
Ok(()) => Ok(()),
Err(se) => Err(SendError::ProxyDisconnected(se.into_inner())),
}
}
pub fn try_send(&self, packet: Packet) -> Result<(), SendError> {
if packet.routing.len() > self.depth {
return Err(SendError::InvalidRoute(packet));
}
match self.tx.try_send(packet) {
Ok(()) => Ok(()),
Err(crossbeam::channel::TrySendError::Full(pkt)) => Err(SendError::WouldBlock(pkt)),
Err(crossbeam::channel::TrySendError::Disconnected(pkt)) => {
Err(SendError::ProxyDisconnected(pkt))
}
}
}
pub fn select_send<'a>(&'a self, sel: &mut crossbeam::channel::Select<'a>) -> usize {
sel.send(&self.tx)
}
pub fn recv(&self) -> Result<Packet, RecvError> {
match self.rx.recv() {
Ok(pkt) => Ok(pkt),
Err(crossbeam::channel::RecvError) => Err(RecvError::ProxyDisconnected),
}
}
pub fn try_recv(&self) -> Result<Packet, RecvError> {
match self.rx.try_recv() {
Ok(pkt) => Ok(pkt),
Err(crossbeam::channel::TryRecvError::Empty) => Err(RecvError::WouldBlock),
Err(crossbeam::channel::TryRecvError::Disconnected) => {
Err(RecvError::ProxyDisconnected)
}
}
}
pub fn select_recv<'a>(&'a self, sel: &mut crossbeam::channel::Select<'a>) -> usize {
sel.recv(&self.rx)
}
pub fn receiver<'a>(&'a self) -> &'a crossbeam::channel::Receiver<Packet> {
&self.rx
}
pub fn iter(&self) -> crossbeam::channel::Iter<'_, Packet> {
self.rx.iter()
}
pub fn try_iter(&self) -> crossbeam::channel::TryIter<'_, Packet> {
self.rx.try_iter()
}
pub fn raw_rpc(&self, name: &str, arg: &[u8]) -> Result<Vec<u8>, RpcError> {
if let Err(err) = self.send(util::PacketBuilder::make_rpc_request(
name,
arg,
0,
DeviceRoute::root(),
)) {
return Err(RpcError::SendFailed(err));
}
loop {
match self.recv() {
Ok(pkt) => match pkt.payload {
proto::Payload::RpcReply(rep) => return Ok(rep.reply),
proto::Payload::RpcError(err) => return Err(RpcError::ExecError(err)),
_ => continue,
},
Err(err) => {
return Err(RpcError::RecvFailed(err));
}
}
}
}
pub fn rpc<ReqT: TioRpcRequestable<ReqT>, RepT: TioRpcReplyable<RepT>>(
&self,
name: &str,
arg: ReqT,
) -> Result<RepT, RpcError> {
let ret = self.raw_rpc(name, &arg.to_request())?;
if let Ok(val) = RepT::from_reply(&ret) {
Ok(val)
} else {
Err(RpcError::TypeError)
}
}
pub fn action(&self, name: &str) -> Result<(), RpcError> {
self.rpc(name, ())
}
pub fn get<T: TioRpcReplyable<T>>(&self, name: &str) -> Result<T, RpcError> {
self.rpc(name, ())
}
}
/// Reasons `Interface::new_port` can refuse to open a port.
#[derive(Debug, Clone, thiserror::Error)]
pub enum PortError {
    /// The requested RPC timeout is below the accepted minimum.
    #[error("RPC timeout too short")]
    RpcTimeoutTooShort,

    /// The requested RPC timeout is above the accepted maximum.
    #[error("RPC timeout too long")]
    RpcTimeoutTooLong,

    /// The new client could not be handed to the proxy, or its
    /// confirmation could not be received.
    #[error("failed to set up new proxy client")]
    FailedNewClientSetup,
}
/// Handle to a proxy core running on its own thread; used to open `Port`s.
pub struct Interface {
    /// Queue on which newly created clients are handed to the proxy core.
    new_client_queue: channel::Sender<ProxyClient>,
    /// Receiving end of the internally owned status channel. `None` when the
    /// caller supplied their own status queue to `new_proxy`.
    new_client_confirm: Option<channel::Receiver<Event>>,
    /// Capacity of each port's proxy-to-client channel.
    client_rx_channel_size: usize,
    /// Capacity of each port's client-to-proxy channel.
    client_tx_channel_size: usize,
}
impl Interface {
    /// Starts a proxy core for `url` on a new thread and returns a handle to it.
    ///
    /// * `reconnect_timeout` – passed through to `ProxyCore::new`.
    /// * `status_queue` – where the proxy core should send `Event`s. When
    ///   `None`, an internal bounded channel (capacity 50) is created instead,
    ///   its receiving end is kept for `new_port` to wait on, and the core is
    ///   constructed with its last argument (`only_clients` here) set to `true`.
    pub fn new_proxy(
        url: &str,
        reconnect_timeout: Option<Duration>,
        status_queue: Option<channel::Sender<Event>>,
    ) -> Interface {
        let (client_sender, client_receiver) = channel::bounded::<ProxyClient>(5);

        let (status_sender, status_receiver, only_clients) = match status_queue {
            Some(external) => (external, None, false),
            None => {
                let (internal_tx, internal_rx) = channel::bounded::<Event>(50);
                (internal_tx, Some(internal_rx), true)
            }
        };

        let url_string = url.to_owned();
        thread::spawn(move || {
            // Platform-specific guards, kept alive for the lifetime of the thread.
            #[cfg(target_os = "windows")]
            let _priority = super::os::windows_helpers::ActivityGuard::latency_critical()
                .map_err(|e| eprintln!("proxy core: failed to raise thread priority: {e}"))
                .ok();
            #[cfg(target_os = "macos")]
            let _activity =
                super::os::macos_helpers::ActivityGuard::latency_critical("Twinleaf proxy core");

            let mut proxy_core = ProxyCore::new(
                url_string,
                reconnect_timeout,
                client_receiver,
                status_sender,
                only_clients,
            );
            proxy_core.run();
        });

        Interface {
            new_client_queue: client_sender,
            new_client_confirm: status_receiver,
            client_rx_channel_size: Self::get_client_rx_channel_size(),
            client_tx_channel_size: Self::get_client_tx_channel_size(),
        }
    }

    /// Starts a proxy for `url` with no reconnect timeout and an internal status channel.
    pub fn new(url: &str) -> Interface {
        Self::new_proxy(url, None, None)
    }

    /// Starts a proxy for the URL returned by `util::default_proxy_url()`.
    pub fn default() -> Interface {
        Self::new(util::default_proxy_url())
    }

    /// Reads a channel capacity from the environment variable `var`.
    ///
    /// Returns the parsed value when it is larger than `min_size`; otherwise
    /// (unset, unparsable, or smaller) returns `min_size`.
    fn channel_size_from_env(var: &str, min_size: usize) -> usize {
        match env::var(var) {
            Ok(requested) => requested
                .parse::<usize>()
                .map_or(min_size, |size| size.max(min_size)),
            Err(_) => min_size,
        }
    }

    /// Capacity used for each port's proxy-to-client channel.
    ///
    /// At least `port::DEFAULT_RX_CHANNEL_SIZE`; can be raised through the
    /// `TWINLEAF_PROXY_INTERFACE_RX_BUFSIZE` environment variable.
    pub fn get_client_rx_channel_size() -> usize {
        Self::channel_size_from_env(
            "TWINLEAF_PROXY_INTERFACE_RX_BUFSIZE",
            port::DEFAULT_RX_CHANNEL_SIZE,
        )
    }

    /// Capacity used for each port's client-to-proxy channel.
    ///
    /// At least `port::DEFAULT_TX_CHANNEL_SIZE`; can be raised through the
    /// `TWINLEAF_PROXY_INTERFACE_TX_BUFSIZE` environment variable.
    pub fn get_client_tx_channel_size() -> usize {
        Self::channel_size_from_env(
            "TWINLEAF_PROXY_INTERFACE_TX_BUFSIZE",
            port::DEFAULT_TX_CHANNEL_SIZE,
        )
    }

    /// Opens a new port on the proxy.
    ///
    /// * `rpc_timeout` – defaults to 3000 ms; must lie between 100 ms and 60 s
    ///   (both inclusive).
    /// * `scope` – device route the port is opened on.
    /// * `depth` – longest packet routing the port will accept when sending.
    /// * `forward_data`, `forward_nonrpc` – passed to `ProxyClient::new`.
    ///   NOTE(review): presumably select which non-reply packets the proxy
    ///   delivers to this port — confirm in the proxy core.
    ///
    /// When the interface owns its status channel (no external status queue
    /// was given to `new_proxy`), this waits for one event on that channel
    /// before returning the port.
    ///
    /// # Errors
    /// `PortError::RpcTimeoutTooShort` / `PortError::RpcTimeoutTooLong` for an
    /// out-of-range timeout; `PortError::FailedNewClientSetup` when the client
    /// cannot be queued or the confirmation cannot be received.
    pub fn new_port(
        &self,
        rpc_timeout: Option<Duration>,
        scope: DeviceRoute,
        depth: usize,
        forward_data: bool,
        forward_nonrpc: bool,
    ) -> Result<Port, PortError> {
        let rpc_timeout = rpc_timeout.unwrap_or(Duration::from_millis(3000));
        if rpc_timeout < Duration::from_millis(100) {
            return Err(PortError::RpcTimeoutTooShort);
        }
        if rpc_timeout > Duration::from_secs(60) {
            return Err(PortError::RpcTimeoutTooLong);
        }

        let (client_to_proxy_sender, proxy_from_client_receiver) =
            channel::bounded::<Packet>(self.client_tx_channel_size);
        let (proxy_to_client_sender, client_from_proxy_receiver) =
            channel::bounded::<Packet>(self.client_rx_channel_size);

        let client = ProxyClient::new(
            proxy_to_client_sender,
            proxy_from_client_receiver,
            rpc_timeout,
            scope.clone(),
            depth,
            forward_data,
            forward_nonrpc,
        );
        self.new_client_queue
            .send(client)
            .map_err(|_| PortError::FailedNewClientSetup)?;

        if let Some(confirm) = &self.new_client_confirm {
            // The received event itself is not inspected, only that one arrived.
            confirm
                .recv()
                .map_err(|_| PortError::FailedNewClientSetup)?;
        }

        Ok(Port {
            tx: client_to_proxy_sender,
            rx: client_from_proxy_receiver,
            depth,
            scope,
        })
    }

    /// Opens a port on `subtree_root` with unlimited depth and both forwarding flags enabled.
    pub fn subtree_full(&self, subtree_root: DeviceRoute) -> Result<Port, PortError> {
        self.new_port(None, subtree_root, usize::MAX, true, true)
    }

    /// Opens a port on `subtree_root` with unlimited depth and both forwarding flags disabled.
    pub fn subtree_rpc(&self, subtree_root: DeviceRoute) -> Result<Port, PortError> {
        self.new_port(None, subtree_root, usize::MAX, false, false)
    }

    /// Opens a port on `subtree_root` with unlimited depth, `forward_data` off
    /// and `forward_nonrpc` on.
    pub fn subtree_probe(&self, subtree_root: DeviceRoute) -> Result<Port, PortError> {
        self.new_port(None, subtree_root, usize::MAX, false, true)
    }

    /// [`Interface::subtree_probe`] rooted at `DeviceRoute::root()`.
    pub fn tree_probe(&self) -> Result<Port, PortError> {
        self.subtree_probe(DeviceRoute::root())
    }

    /// [`Interface::subtree_full`] rooted at `DeviceRoute::root()`.
    pub fn tree_full(&self) -> Result<Port, PortError> {
        self.subtree_full(DeviceRoute::root())
    }

    /// [`Interface::subtree_rpc`] rooted at `DeviceRoute::root()`.
    pub fn tree_rpc(&self) -> Result<Port, PortError> {
        self.subtree_rpc(DeviceRoute::root())
    }

    /// Opens a port on `address` with depth 0 and both forwarding flags enabled.
    pub fn device_full(&self, address: DeviceRoute) -> Result<Port, PortError> {
        self.new_port(None, address, 0, true, true)
    }

    /// Opens a port on `address` with depth 0 and both forwarding flags disabled.
    pub fn device_rpc(&self, address: DeviceRoute) -> Result<Port, PortError> {
        self.new_port(None, address, 0, false, false)
    }

    /// [`Interface::device_full`] for `DeviceRoute::root()`.
    pub fn root_full(&self) -> Result<Port, PortError> {
        self.device_full(DeviceRoute::root())
    }

    /// [`Interface::device_rpc`] for `DeviceRoute::root()`.
    pub fn root_rpc(&self) -> Result<Port, PortError> {
        self.device_rpc(DeviceRoute::root())
    }
}