use super::sample::{Boundary, BoundaryReason, Column, PriorState, Sample};
use crate::tio;
use proto::meta::MetadataType;
use proto::DeviceRoute;
use std::collections::HashMap;
use std::sync::Arc;
use tio::proto::meta::{
ColumnMetadata, DeviceMetadata, MetadataContent, SegmentMetadata, StreamMetadata,
};
use tio::{proto, util};
/// Largest number of metadata requests that may be packed into a single RPC
/// payload: `make_metareq` panics above this count and `metareqs_to_rpcs`
/// splits longer request lists into batches of at most this size.
static TL_STREAMRPC_MAX_META: usize = 16;
/// Request id attached to every "dev.metadata" RPC built by `metareqs_to_rpcs`.
const META_RPC_ID: u16 = 7855;
/// One entry of a metadata request: the kind of metadata wanted and the
/// stream / index it refers to.
///
/// `make_metareq` encodes each entry as three bytes, in field order.
#[derive(Debug, Clone)]
struct StreamRpcMetaReq {
/// Kind of metadata being requested.
mtype: MetadataType,
/// Stream the request refers to; the `device()` constructor sets it to 0.
stream_id: u8,
/// Segment or column index; the `device()` and `stream()` constructors set it to 0.
index: u8,
}
impl StreamRpcMetaReq {
pub fn device() -> StreamRpcMetaReq {
StreamRpcMetaReq {
mtype: MetadataType::Device,
stream_id: 0,
index: 0,
}
}
pub fn stream(id: u8) -> StreamRpcMetaReq {
StreamRpcMetaReq {
mtype: MetadataType::Stream,
stream_id: id,
index: 0,
}
}
pub fn segment(index: u8, stream_id: u8) -> StreamRpcMetaReq {
StreamRpcMetaReq {
mtype: MetadataType::Segment,
stream_id: stream_id,
index: index,
}
}
pub fn column(index: u8, stream_id: u8) -> StreamRpcMetaReq {
StreamRpcMetaReq {
mtype: MetadataType::Column,
stream_id: stream_id,
index: index,
}
}
}
/// Serializes a batch of metadata requests into an RPC payload.
///
/// Each request contributes three bytes: the metadata type, the stream id
/// and the index, in that order.
///
/// # Panics
///
/// Panics if more than `TL_STREAMRPC_MAX_META` requests are supplied.
fn make_metareq(reqs: Vec<StreamRpcMetaReq>) -> Vec<u8> {
    if reqs.len() > TL_STREAMRPC_MAX_META {
        panic!("too many requests")
    }
    let mut payload: Vec<u8> = Vec::with_capacity(reqs.len() * 3);
    for entry in reqs {
        let kind: u8 = entry.mtype.into();
        payload.extend_from_slice(&[kind, entry.stream_id, entry.index]);
    }
    payload
}
/// Turns a list of metadata requests into "dev.metadata" RPC request packets.
///
/// Requests are sent in batches of at most `TL_STREAMRPC_MAX_META`. A final
/// batch consisting of a single device request is sent as an RPC with an
/// empty payload instead of an encoded entry. An empty input yields no
/// packets.
fn metareqs_to_rpcs(metareqs: &Vec<StreamRpcMetaReq>) -> Vec<tio::Packet> {
    let mut packets = Vec::new();
    let mut pending: &[StreamRpcMetaReq] = metareqs.as_slice();
    while !pending.is_empty() {
        let lone_device = pending.len() == 1
            && match pending[0].mtype {
                MetadataType::Device => true,
                _ => false,
            };
        let (batch, rest) = if lone_device {
            // The device request is consumed but not encoded: the RPC goes
            // out with an empty payload.
            (&pending[..0], &pending[1..])
        } else {
            pending.split_at(pending.len().min(TL_STREAMRPC_MAX_META))
        };
        let payload = make_metareq(batch.to_vec());
        packets.push(util::PacketBuilder::make_rpc_request(
            "dev.metadata",
            &payload,
            META_RPC_ID,
            DeviceRoute::root(),
        ));
        pending = rest;
    }
    packets
}
/// Parses the reply of a metadata RPC into individual metadata records.
///
/// The reply is read as a sequence of records, each made of a type byte, a
/// length byte and that many bytes of metadata. Records of a type other than
/// device, stream, segment or column are skipped. Parsing stops at the first
/// record whose declared length runs past the end of the reply; the records
/// decoded up to that point are returned.
///
/// # Panics
///
/// Panics if a record's `deserialize` call fails (its result is unwrapped).
fn parse_metarep(rep: Vec<u8>) -> Vec<tio::proto::meta::MetadataContent> {
    use tio::proto::meta;
    let mut ret = vec![];
    let mut offset: usize = 0;
    while (offset + 2) <= rep.len() {
        let mtype = MetadataType::from(rep[offset]);
        let varlen = usize::from(rep[offset + 1]);
        let end = offset + 2 + varlen;
        // Checked slicing: a truncated reply must not cause an out-of-range
        // slice panic.
        let metadata = match rep.get(offset + 2..end) {
            Some(bytes) => bytes,
            None => break,
        };
        offset = end;
        match mtype {
            MetadataType::Device => {
                let (dm, _, _) = meta::DeviceMetadata::deserialize(metadata, &[]).unwrap();
                ret.push(meta::MetadataContent::Device(dm));
            }
            MetadataType::Stream => {
                let (sm, _, _) = meta::StreamMetadata::deserialize(metadata, &[]).unwrap();
                ret.push(meta::MetadataContent::Stream(sm));
            }
            MetadataType::Segment => {
                let (sm, _, _) = meta::SegmentMetadata::deserialize(metadata, &[]).unwrap();
                ret.push(meta::MetadataContent::Segment(sm));
            }
            MetadataType::Column => {
                let (cm, _, _) = meta::ColumnMetadata::deserialize(metadata, &[]).unwrap();
                ret.push(meta::MetadataContent::Column(cm));
            }
            _ => {}
        }
    }
    ret
}
/// Complete metadata of one stream, as produced by
/// `DeviceStream::get_metadata` once no metadata requests are outstanding
/// for that stream.
#[derive(Debug, Clone)]
pub struct DeviceStreamMetadata {
/// Stream-level metadata.
pub stream: Arc<StreamMetadata>,
/// Metadata of the segment currently held for the stream.
pub segment: Arc<SegmentMetadata>,
/// Column metadata, in the order the parser stores the columns.
pub columns: Vec<Arc<ColumnMetadata>>,
}
/// A column of a stream together with its position inside a raw sample.
#[derive(Debug)]
struct DeviceColumn {
/// Metadata describing the column.
metadata: Arc<ColumnMetadata>,
/// Byte offset of the column within one sample: 0 for the first column,
/// otherwise the previous column's offset plus the size of its data type.
offset: usize,
}
/// Per-stream parser state: the metadata received so far plus bookkeeping
/// about the most recently emitted sample, used for boundary detection.
#[derive(Debug)]
struct DeviceStream {
/// Stream metadata; `None` until received or after `invalidate_metadata`.
stream: Option<Arc<StreamMetadata>>,
/// Segment metadata; `None` until received or after `invalidate_metadata`.
segment: Option<Arc<SegmentMetadata>>,
/// Columns received so far, in index order, with their byte offsets.
columns: Vec<DeviceColumn>,
/// Id of this stream.
id: u8,
/// Segment id taken from the latest data packet, or from the latest
/// segment metadata that arrived as an update.
current_data_seg: u8,
/// Set once at least one sample has been emitted; until then the fields
/// below hold their initial zero values.
established: bool,
/// Segment id of the last emitted sample.
last_seg: u8,
/// Sample number of the last emitted sample.
last_sample_number: u32,
/// End timestamp of the last emitted sample.
last_timestamp: f64,
/// Device session id in effect for the last emitted sample.
last_session_id: u32,
/// Time-reference session id of the segment of the last emitted sample.
last_time_ref_session_id: u32,
/// Sampling rate divided by decimation for the last emitted sample.
effective_rate: f64,
}
impl DeviceStream {
fn new(id: u8) -> Self {
DeviceStream {
stream: None,
segment: None,
columns: vec![],
id,
current_data_seg: 0,
established: false,
last_seg: 0,
last_sample_number: 0,
last_timestamp: 0.0,
last_session_id: 0,
last_time_ref_session_id: 0,
effective_rate: 0.0,
}
}
/// Lists the metadata requests still needed for this stream.
///
/// Without stream metadata, the stream itself is requested; with it, every
/// column not yet received is requested. A segment request for
/// `current_data_seg` is added whenever no segment is held or the held
/// segment has a different id. An empty result means the metadata is
/// complete.
fn requests(&self) -> Vec<StreamRpcMetaReq> {
    let mut wanted = Vec::new();
    if let Some(meta) = &self.stream {
        let missing_columns = self.columns.len()..meta.n_columns;
        wanted.extend(missing_columns.map(|col| StreamRpcMetaReq::column(col as u8, self.id)));
    } else {
        wanted.push(StreamRpcMetaReq::stream(self.id));
    }
    let segment_is_current = self
        .segment
        .as_ref()
        .map_or(false, |seg| seg.segment_id == self.current_data_seg);
    if !segment_is_current {
        wanted.push(StreamRpcMetaReq::segment(self.current_data_seg, self.id));
    }
    wanted
}
fn parse_sample(&self, data: &[u8]) -> Vec<Column> {
let mut ret = vec![];
for col in &self.columns {
ret.push(Column::from_le_bytes(
&data[col.offset..],
col.metadata.clone(),
));
}
ret
}
/// Snapshots the bookkeeping of the last emitted sample.
///
/// Returns `None` while no sample has been emitted yet.
fn capture_prior_state(&self) -> Option<PriorState> {
    if self.established {
        Some(PriorState {
            session_id: self.last_session_id,
            segment_id: self.last_seg,
            time_ref_session_id: self.last_time_ref_session_id,
            sample_number: self.last_sample_number,
            timestamp: self.last_timestamp,
            effective_rate: self.effective_rate,
        })
    } else {
        None
    }
}
/// Decides whether the first sample of an incoming packet starts a new
/// continuity region relative to the last emitted sample.
///
/// The checks run in a fixed order and the first match wins: no sample
/// emitted yet, device session change, time-reference session change,
/// effective rate change (beyond 1e-9), timestamp going backwards by more
/// than half a sample period, segment id change (reported as a rollover
/// when `is_segment_rollover` is set), and finally lost samples. A sample
/// number mismatch only counts as lost samples when the timestamp also
/// differs from the previous end timestamp by more than half a period.
///
/// Returns `None` when the packet continues the previous data. Otherwise
/// the boundary carries the state captured before this packet, which is
/// `None` for the initial boundary.
fn detect_boundary(
    &self,
    first_sample_n: u32,
    first_timestamp: f64,
    dev: &Arc<DeviceMetadata>,
    segment: &Arc<SegmentMetadata>,
    new_rate: f64,
    is_segment_rollover: bool,
) -> Option<Boundary> {
    let half_period = 0.5 / new_rate;
    let reason = if !self.established {
        BoundaryReason::Initial
    } else if dev.session_id != self.last_session_id {
        BoundaryReason::SessionChanged {
            old: self.last_session_id,
            new: dev.session_id,
        }
    } else if segment.time_ref_session_id != self.last_time_ref_session_id {
        BoundaryReason::TimeRefSessionChanged {
            old: self.last_time_ref_session_id,
            new: segment.time_ref_session_id,
        }
    } else if (new_rate - self.effective_rate).abs() > 1e-9 {
        BoundaryReason::RateChanged {
            old_rate: self.effective_rate,
            new_rate,
        }
    } else if first_timestamp < self.last_timestamp - half_period {
        BoundaryReason::TimeBackward {
            gap_seconds: self.last_timestamp - first_timestamp,
        }
    } else if segment.segment_id != self.last_seg {
        let (old_id, new_id) = (self.last_seg, segment.segment_id);
        if is_segment_rollover {
            BoundaryReason::SegmentRollover { old_id, new_id }
        } else {
            BoundaryReason::SegmentChanged { old_id, new_id }
        }
    } else {
        let expected = self.last_sample_number.wrapping_add(1);
        let ts_gap = (first_timestamp - self.last_timestamp).abs();
        // A sample counter that went down while time stayed contiguous is
        // treated as a harmless counter rollover.
        let benign_rollover = first_sample_n < self.last_sample_number && ts_gap < half_period;
        let lost = first_sample_n != expected && !benign_rollover && ts_gap > half_period;
        if !lost {
            return None;
        }
        BoundaryReason::SamplesLost {
            expected,
            received: first_sample_n,
        }
    };
    // Before the first sample there is no prior state, so this is `None`
    // for the initial boundary.
    Some(Boundary {
        reason,
        prior: self.capture_prior_state(),
    })
}
fn process_samples(
&mut self,
data: &tio::proto::StreamDataPayload,
dev: Arc<DeviceMetadata>,
) -> Vec<Sample> {
self.current_data_seg = data.segment_id;
if self.stream.is_none() || self.segment.is_none() {
return vec![];
}
let stream = self.stream.as_ref().unwrap().clone();
if stream.n_columns != self.columns.len() {
return vec![];
}
let expected_sample_size = self
.columns
.last()
.map(|col| col.offset + col.metadata.data_type.size())
.unwrap_or(0);
if expected_sample_size > stream.sample_size {
return vec![];
}
if stream.sample_size == 0 {
return vec![];
}
if data.data.len() % stream.sample_size != 0 {
return vec![];
}
let segment = self.segment.as_ref().unwrap().clone();
if segment.decimation == 0 || segment.sampling_rate == 0 {
return vec![];
}
if stream.n_segments == 0 {
return vec![];
}
let new_rate = segment.sampling_rate as f64 / segment.decimation as f64;
let (segment, is_segment_rollover) = if segment.segment_id != data.segment_id {
let next_sample = self.last_sample_number.wrapping_add(1);
let next_segment = (segment.segment_id + 1).rem_euclid(stream.n_segments as u8);
let rate = segment.sampling_rate / segment.decimation;
if (data.first_sample_n == 0)
&& ((next_sample % rate) == 0)
&& (data.segment_id == next_segment)
{
let mut new_seg = (*segment).clone();
new_seg.segment_id = data.segment_id;
new_seg.start_time += next_sample / rate;
(Arc::new(new_seg), true)
} else {
return vec![];
}
} else {
(segment, false)
};
let period = 1.0 / new_rate;
let first_timestamp =
f64::from(segment.start_time) + period * f64::from(data.first_sample_n);
let boundary = self.detect_boundary(
data.first_sample_n,
first_timestamp,
&dev,
&segment,
new_rate,
is_segment_rollover,
);
let mut ret = vec![];
let mut sample_n = data.first_sample_n;
let mut offset = 0;
let mut is_first = true;
while offset < data.data.len() {
let raw_sample = &data.data.get(offset..(offset + stream.sample_size));
let columns = if let Some(r) = raw_sample {
self.parse_sample(r)
} else {
return Vec::new();
};
let sample = Sample {
n: sample_n,
columns: columns,
segment: segment.clone(),
stream: stream.clone(),
device: dev.clone(),
source: data.clone(),
boundary: if is_first { boundary.clone() } else { None },
};
self.last_sample_number = sample_n;
self.last_timestamp = sample.timestamp_end();
self.last_session_id = dev.session_id;
self.last_time_ref_session_id = segment.time_ref_session_id;
self.last_seg = segment.segment_id;
self.effective_rate = new_rate;
self.established = true;
ret.push(sample);
offset += stream.sample_size;
sample_n = sample_n.wrapping_add(1);
is_first = false;
}
ret
}
/// Forgets all metadata held for this stream (columns, segment and stream
/// metadata) so that it gets requested again. The last-sample bookkeeping
/// is left untouched.
fn invalidate_metadata(&mut self) {
    self.columns.clear();
    self.segment = None;
    self.stream = None;
}
fn get_metadata(&self) -> Result<DeviceStreamMetadata, Vec<StreamRpcMetaReq>> {
let reqs = self.requests();
if reqs.is_empty() {
Ok(DeviceStreamMetadata {
stream: self.stream.as_ref().unwrap().clone(),
segment: self.segment.as_ref().unwrap().clone(),
columns: self.columns.iter().map(|x| x.metadata.clone()).collect(),
})
} else {
Err(reqs)
}
}
}
/// Complete metadata of a device: the device-level metadata plus the
/// metadata of its streams, as returned by `DeviceDataParser::get_metadata`.
#[derive(Debug, Clone)]
pub struct DeviceFullMetadata {
/// Device-level metadata.
pub device: Arc<DeviceMetadata>,
/// Per-stream metadata, keyed by stream id.
pub streams: HashMap<u8, DeviceStreamMetadata>,
}
/// Stateful parser that collects device metadata from incoming packets and
/// turns stream-data packets into samples.
pub struct DeviceDataParser {
/// Device metadata; `None` until received or after it has been discarded.
device: Option<Arc<DeviceMetadata>>,
/// Per-stream state, keyed by stream id and created on first use.
streams: HashMap<u8, DeviceStream>,
/// When set, a session heartbeat carrying a different session id does not
/// invalidate the metadata held so far.
ignore_session: bool,
}
impl DeviceDataParser {
pub fn new(ignore_session: bool) -> DeviceDataParser {
DeviceDataParser {
device: None,
streams: HashMap::new(),
ignore_session,
}
}
/// Returns the state of stream `stream_id`, creating an empty one first if
/// the stream has not been seen before.
fn get_stream(&mut self, stream_id: u8) -> &mut DeviceStream {
    self.streams
        .entry(stream_id)
        .or_insert_with(|| DeviceStream::new(stream_id))
}
fn process_metadata(&mut self, metadata: &MetadataContent, from_update: bool) {
match metadata {
MetadataContent::Device(dm) => {
if let Some(cur) = &self.device {
if cur.serial_number != dm.serial_number {
self.streams.clear();
} else if (cur.session_id != dm.session_id)
|| (cur.firmware_hash != dm.firmware_hash)
{
for stream in self.streams.values_mut() {
stream.invalidate_metadata();
}
}
}
self.device.replace(Arc::new(dm.clone()));
}
MetadataContent::Stream(sm) => {
if let Some(dev) = &self.device {
if usize::from(sm.stream_id) > dev.n_streams {
self.device.take();
self.streams.clear();
}
}
let dstream = self.get_stream(sm.stream_id);
if let Some(stream) = &dstream.stream {
if stream.as_ref() != sm {
self.device.take();
self.streams.clear();
}
} else {
dstream.stream.replace(Arc::new(sm.clone()));
}
}
MetadataContent::Segment(sm) => {
if let Some(dev) = &self.device {
if usize::from(sm.stream_id) > dev.n_streams {
self.device.take();
self.streams.clear();
}
}
let dstream = self.get_stream(sm.stream_id);
if let Some(segment) = &dstream.segment {
if segment.as_ref() != sm {
dstream.segment.replace(Arc::new(sm.clone()));
if from_update {
dstream.current_data_seg = sm.segment_id;
}
}
} else {
dstream.segment.replace(Arc::new(sm.clone()));
if from_update {
dstream.current_data_seg = sm.segment_id;
}
}
}
MetadataContent::Column(cm) => {
if let Some(dev) = &self.device {
if usize::from(cm.stream_id) > dev.n_streams {
self.device.take();
self.streams.clear();
}
}
let dstream = self.get_stream(cm.stream_id);
if usize::from(cm.index) < dstream.columns.len() {
if dstream.columns[cm.index].metadata.as_ref() != cm {
self.device.take();
self.streams.clear();
}
} else if usize::from(cm.index) == dstream.columns.len() {
let offset = if cm.index == 0 {
0
} else {
let prev_col = &dstream.columns[cm.index - 1];
prev_col.offset + prev_col.metadata.data_type.size()
};
dstream.columns.push(DeviceColumn {
metadata: Arc::new(cm.clone()),
offset: offset,
})
}
}
_ => {}
}
}
pub fn process_packet(&mut self, pkt: &tio::Packet) -> Vec<Sample> {
match &pkt.payload {
tio::proto::Payload::RpcReply(rep) => {
for metadata in parse_metarep(rep.reply.clone()) {
self.process_metadata(&metadata, false)
}
}
tio::proto::Payload::Metadata(mp) => self.process_metadata(&mp.content, true),
tio::proto::Payload::Heartbeat(hb) => {
if let tio::proto::HeartbeatPayload::Session(session_id) = hb {
if let Some(dev) = &self.device {
if (dev.session_id != *session_id) && !self.ignore_session {
for stream in self.streams.values_mut() {
stream.invalidate_metadata();
}
self.device.take();
}
}
}
}
tio::proto::Payload::StreamData(data) => {
if let Some(dev) = &self.device {
if usize::from(data.stream_id) > dev.n_streams {
self.device.take();
self.streams.clear();
} else {
let ndev = dev.clone();
let dstream = self.get_stream(data.stream_id);
return dstream.process_samples(data, ndev);
}
}
}
_ => {
}
}
return vec![];
}
pub fn requests(&self) -> Vec<tio::Packet> {
let mut reqs = vec![];
match self.device.as_ref() {
Some(device) => {
for i in 0..device.n_streams {
let stream_id = (i + 1) as u8;
if let Some(stream) = self.streams.get(&stream_id) {
reqs.extend(stream.requests())
} else {
reqs.push(StreamRpcMetaReq::stream(stream_id));
}
}
}
None => {
reqs.push(StreamRpcMetaReq::device());
}
}
metareqs_to_rpcs(&reqs)
}
/// Returns the complete device metadata, or the RPC packets that still have
/// to be sent when it is not complete yet.
///
/// Completeness is judged by `requests`, which only covers stream ids 1
/// through the device's stream count. The stream map can also hold entries
/// outside that range (for example one created by a data packet for stream
/// id 0) whose metadata was never requested; such incomplete entries are
/// left out of the result instead of causing a panic.
pub fn get_metadata(&self) -> Result<DeviceFullMetadata, Vec<tio::Packet>> {
    let reqs = self.requests();
    if !reqs.is_empty() {
        return Err(reqs);
    }
    let streams = self
        .streams
        .iter()
        .filter_map(|(id, stream)| stream.get_metadata().ok().map(|meta| (*id, meta)))
        .collect();
    Ok(DeviceFullMetadata {
        // `requests` is only empty when the device metadata is present.
        device: Arc::clone(self.device.as_ref().unwrap()),
        streams,
    })
}
}