use crate::data::{DeviceDataParser, DeviceFullMetadata, Sample};
use crate::tio;
use proto::DeviceRoute;
use tio::{proto, proxy, util};
use std::collections::VecDeque;
#[derive(Debug, Clone)]
pub enum DeviceEvent {
Status(proto::ProxyStatus),
RpcInvalidated(proto::RpcMethod),
Heartbeat {
session_id: Option<proto::identifiers::SessionId>,
},
MetadataReady(DeviceFullMetadata),
NewHash(Option<u32>),
}
pub enum DeviceItem {
Sample(Sample),
Event(DeviceEvent),
}
pub struct Device {
dev_port: proxy::Port,
parser: DeviceDataParser,
n_reqs: usize,
metadata_announced: bool,
sample_queue: VecDeque<Sample>,
event_queue: VecDeque<DeviceEvent>,
}
impl Device {
pub fn new(dev_port: proxy::Port) -> Device {
Device {
dev_port: dev_port,
parser: DeviceDataParser::new(false),
n_reqs: 0,
metadata_announced: false,
sample_queue: VecDeque::new(),
event_queue: VecDeque::new(),
}
}
pub fn open(proxy: &proxy::Interface, route: DeviceRoute) -> Result<Device, proxy::PortError> {
let port = proxy.device_full(route)?;
Ok(Self::new(port))
}
fn internal_rpcs(&mut self) -> Result<(), proxy::SendError> {
if self.n_reqs == 0 {
let reqs = self.parser.requests();
for req in reqs {
self.dev_port.send(req)?;
self.n_reqs += 1;
}
}
Ok(())
}
fn process_packet(&mut self, pkt: &tio::Packet) {
match &pkt.payload {
tio::proto::Payload::ProxyStatus(ps) => {
self.event_queue.push_back(DeviceEvent::Status(ps.0));
if matches!(ps.0, proto::ProxyStatus::SensorDisconnected) {
self.metadata_announced = false;
self.parser = DeviceDataParser::new(false);
}
if matches!(ps.0, proto::ProxyStatus::SensorReconnected) {
self.event_queue.push_back(DeviceEvent::NewHash(None));
}
return;
}
tio::proto::Payload::RpcUpdate(ru) => {
self.event_queue
.push_back(DeviceEvent::RpcInvalidated(ru.0.clone()));
return;
}
tio::proto::Payload::Heartbeat(hb) => {
let session_id = match hb {
tio::proto::HeartbeatPayload::Session(sid) => Some(*sid),
tio::proto::HeartbeatPayload::Any(_) => None,
};
self.event_queue
.push_back(DeviceEvent::Heartbeat { session_id });
}
tio::proto::Payload::Settings(set) => {
let hash = match set {
tio::proto::SettingsPayload::RpcHash(h) => Some(*h),
tio::proto::SettingsPayload::Unknown { .. } => None,
};
if let Some(hash) = hash {
self.event_queue.push_back(DeviceEvent::NewHash(Some(hash)));
}
}
tio::proto::Payload::RpcReply(rep) => {
if rep.id == 7855 {
self.n_reqs -= 1
}
}
tio::proto::Payload::RpcError(err) => {
if err.id == 7855 {
self.n_reqs -= 1
}
}
_ => {}
}
self.sample_queue
.append(&mut VecDeque::from(self.parser.process_packet(&pkt)));
if !self.metadata_announced {
if let Ok(full_metadata) = self.parser.get_metadata() {
self.metadata_announced = true;
self.event_queue
.push_back(DeviceEvent::MetadataReady(full_metadata));
}
}
}
pub fn get_metadata(&mut self) -> Result<DeviceFullMetadata, proxy::RpcError> {
loop {
if self.n_reqs == 0 {
match self.parser.get_metadata() {
Ok(full_meta) => return Ok(full_meta),
Err(reqs) => {
for req in reqs {
self.dev_port
.send(req)
.map_err(proxy::RpcError::SendFailed)?;
self.n_reqs += 1;
}
}
}
}
let pkt = self.dev_port.recv().map_err(proxy::RpcError::RecvFailed)?;
self.process_packet(&pkt);
}
}
pub fn next(&mut self) -> Result<Sample, proxy::RpcError> {
loop {
if let Some(sample) = self.sample_queue.pop_front() {
return Ok(sample);
}
self.internal_rpcs().map_err(proxy::RpcError::SendFailed)?;
let pkt = self.dev_port.recv().map_err(proxy::RpcError::RecvFailed)?;
self.process_packet(&pkt);
}
}
pub fn try_next(&mut self) -> Result<Option<Sample>, proxy::RpcError> {
loop {
if let Some(sample) = self.sample_queue.pop_front() {
return Ok(Some(sample));
}
self.internal_rpcs().map_err(proxy::RpcError::SendFailed)?;
let pkt = match self.dev_port.try_recv() {
Ok(pkt) => pkt,
Err(proxy::RecvError::WouldBlock) => return Ok(None),
Err(e) => return Err(proxy::RpcError::RecvFailed(e)),
};
self.process_packet(&pkt);
}
}
pub fn drain(&mut self) -> Result<Vec<Sample>, proxy::RpcError> {
loop {
self.internal_rpcs().map_err(proxy::RpcError::SendFailed)?;
match self.dev_port.try_recv() {
Ok(pkt) => {
self.process_packet(&pkt);
}
Err(proxy::RecvError::WouldBlock) => {
break;
}
Err(e) => {
return Err(proxy::RpcError::RecvFailed(e));
}
}
}
Ok(self.sample_queue.drain(0..).collect())
}
pub fn try_next_event(&mut self) -> Option<DeviceEvent> {
self.event_queue.pop_front()
}
pub fn drain_events(&mut self) -> Vec<DeviceEvent> {
self.event_queue.drain(..).collect()
}
pub fn next_item(&mut self) -> Result<DeviceItem, proxy::RpcError> {
loop {
if let Some(sample) = self.sample_queue.pop_front() {
return Ok(DeviceItem::Sample(sample));
}
if let Some(event) = self.event_queue.pop_front() {
return Ok(DeviceItem::Event(event));
}
self.internal_rpcs()?;
let pkt = self.dev_port.recv()?;
self.process_packet(&pkt);
}
}
pub fn try_next_item(&mut self) -> Result<Option<DeviceItem>, proxy::RpcError> {
loop {
if let Some(sample) = self.sample_queue.pop_front() {
return Ok(Some(DeviceItem::Sample(sample)));
}
if let Some(event) = self.event_queue.pop_front() {
return Ok(Some(DeviceItem::Event(event)));
}
self.internal_rpcs()?;
match self.dev_port.try_recv() {
Ok(pkt) => self.process_packet(&pkt),
Err(proxy::RecvError::WouldBlock) => return Ok(None),
Err(e) => return Err(e.into()),
}
}
}
pub fn raw_rpc(&mut self, name: &str, arg: &[u8]) -> Result<Vec<u8>, proxy::RpcError> {
if let Err(err) = self.dev_port.send(util::PacketBuilder::make_rpc_request(
name,
arg,
0,
DeviceRoute::root(),
)) {
return Err(proxy::RpcError::SendFailed(err));
}
loop {
self.internal_rpcs().map_err(proxy::RpcError::SendFailed)?;
let pkt = match self.dev_port.recv() {
Ok(packet) => packet,
Err(e) => return Err(proxy::RpcError::RecvFailed(e)),
};
self.process_packet(&pkt);
match pkt.payload {
tio::proto::Payload::RpcReply(rep) if rep.id != 7855 => {
return Ok(rep.reply);
}
tio::proto::Payload::RpcError(err) if err.id != 7855 => {
return Err(proxy::RpcError::ExecError(err));
}
_ => {}
}
}
}
pub fn rpc<ReqT: tio::util::TioRpcRequestable<ReqT>, RepT: tio::util::TioRpcReplyable<RepT>>(
&mut self,
name: &str,
arg: ReqT,
) -> Result<RepT, proxy::RpcError> {
let ret = self.raw_rpc(name, &arg.to_request())?;
if let Ok(val) = RepT::from_reply(&ret) {
Ok(val)
} else {
Err(proxy::RpcError::TypeError)
}
}
pub fn action(&mut self, name: &str) -> Result<(), proxy::RpcError> {
self.rpc(name, ())
}
pub fn get<T: tio::util::TioRpcReplyable<T>>(
&mut self,
name: &str,
) -> Result<T, proxy::RpcError> {
self.rpc(name, ())
}
pub fn get_multi(&mut self, name: &str) -> Result<Vec<u8>, proxy::RpcError> {
let mut full_reply = vec![];
for i in 0u16..=65535u16 {
match self.raw_rpc(&name, &i.to_le_bytes().to_vec()) {
Ok(mut rep) => full_reply.append(&mut rep),
Err(err @ proxy::RpcError::ExecError(_)) => {
if let proxy::RpcError::ExecError(payload) = &err {
if let tio::proto::RpcErrorCode::InvalidArgs = payload.error {
break;
}
}
return Err(err);
}
Err(e) => {
return Err(e);
}
}
}
Ok(full_reply)
}
pub fn get_multi_str(&mut self, name: &str) -> Result<String, proxy::RpcError> {
let reply_bytes = self.get_multi(name)?;
let result_string = String::from_utf8_lossy(&reply_bytes).to_string();
Ok(result_string)
}
}