emotibit_data/
parser.rs

1//! Parser functions
2use crate::types::{DataPacket, DataType, TimeSync, TimeSyncMap};
3use anyhow::{anyhow, Result};
4use chrono::{offset::TimeZone, DateTime, Local, NaiveDateTime};
5use csv::ReaderBuilder;
6use itertools::izip;
7
8const PARSER_VERSION: &str = "0.1.0";
9const MIN_SYNCS_REQUIRED: usize = 3;
10
11/// Reads a csv file and creates `DataPacket`s
12pub fn get_packets(file_path: &str) -> Result<Vec<Result<DataPacket>>> {
13    let mut vec: Vec<Result<DataPacket>> = Vec::new();
14    let mut reader = ReaderBuilder::new()
15        .flexible(true)
16        .has_headers(false)
17        .from_path(file_path)?;
18    for record in reader.records() {
19        match DataPacket::try_from(record.as_ref().unwrap()) {
20            Ok(packet) => vec.push(Ok(split_tx_or_as_is(packet)?)),
21            Err(e) => vec.push(Err(anyhow!("{}, record: {:?}", e, record))),
22        }
23    }
24    Ok(vec)
25}
26
27fn split_tx_or_as_is(x: DataPacket) -> Result<DataPacket> {
28    if let DataType::TX(data) = &x.data_type {
29        if let (Some(tag1), Some(val1), Some(tag2), Some(val2)) =
30            (data.get(0), data.get(1), data.get(2), data.get(3))
31        {
32            let data_type = match (tag1.as_ref(), tag2.as_ref()) {
33                ("LC", "LM") => Some(DataType::TxLcLm(vec![val1.parse()?, val2.parse()?])),
34                ("TL", "LC") => Some(DataType::TxTlLc((val1.to_owned(), val2.parse()?))),
35                _ => None,
36            };
37
38            if let Some(data_type) = data_type {
39                return Ok(DataPacket {
40                    host_timestamp: f64::NAN,
41                    emotibit_timestamp: x.emotibit_timestamp,
42                    packet_id: x.packet_id,
43                    data_points: x.data_points,
44                    version: x.version,
45                    reliability: x.reliability,
46                    data_type,
47                });
48            }
49            return Err(anyhow!("Invalid data"));
50        }
51    }
52    Ok(x)
53}
54
55/// Finds blocks of RD, TL, and AK and creates `TimeSync`s
56pub fn find_syncs(packets: &[Result<DataPacket>]) -> Result<Vec<TimeSync>> {
57    use DataType::*;
58    let mut vec = vec![];
59    let syncs: Vec<&DataPacket> = packets
60        .iter()
61        .filter_map(|x| {
62            x.as_ref().ok().and_then(|x| match x.data_type {
63                RD(_) | TL(_) | AK(_) => Some(x),
64                _ => None,
65            })
66        })
67        .collect();
68
69    if syncs.len() < MIN_SYNCS_REQUIRED {
70        return Err(anyhow!("Not enough sync data"));
71    }
72
73    let syncs2 = syncs.clone();
74    let syncs3 = syncs.clone();
75    for (rd, tl, ak) in izip!(&syncs, &syncs2[1..], &syncs3[2..]) {
76        if let (RD(_), TL(date_time), AK(_)) = (&rd.data_type, &tl.data_type, &ak.data_type) {
77            vec.push(TimeSync {
78                rd: rd.emotibit_timestamp,
79                ts_received: tl.emotibit_timestamp,
80                ts_sent: date_time.to_owned(),
81                ak: ak.emotibit_timestamp,
82                round_trip: tl.emotibit_timestamp - rd.emotibit_timestamp,
83            });
84        }
85    }
86    Ok(vec)
87}
88
89/// Creates a `TimeSyncMap`
90pub fn generate_sync_map(packets: &[Result<DataPacket>]) -> Result<TimeSyncMap> {
91    let filtered = packets
92        .iter()
93        .filter_map(|result| result.as_ref().ok())
94        .map(|p| p.emotibit_timestamp);
95
96    let emotibit_start_time = filtered.clone().reduce(f64::min).unwrap();
97    let emotibit_end_time = filtered.reduce(f64::max).unwrap();
98
99    let syncs = find_syncs(packets)?;
100
101    let quartiles: Vec<Option<&TimeSync>> = syncs
102        .chunks(num::integer::div_ceil(syncs.len(), 4))
103        .map(|x| {
104            x.iter()
105                .min_by(|a, b| a.round_trip.partial_cmp(&b.round_trip).unwrap())
106        })
107        .collect();
108
109    let best_timestamps = match (
110        quartiles.get(0),
111        quartiles.get(1),
112        quartiles.get(2),
113        quartiles.get(3),
114    ) {
115        (Some(Some(x)), _, _, Some(Some(y))) => Some((x, y)),
116        (_, Some(Some(x)), _, Some(Some(y))) => Some((x, y)),
117        (_, Some(Some(x)), Some(Some(y)), _) => Some((x, y)),
118        (Some(Some(x)), Some(Some(y)), _, _) => Some((x, y)),
119        (_, _, Some(Some(x)), Some(Some(y))) => Some((x, y)),
120        // TODO: add data from the same quartile
121        _ => None,
122    };
123
124    let (p0, p1) = best_timestamps
125        .ok_or_else(|| anyhow!("Cannot generate a time sync map from these:\n{:?}", syncs))?;
126    let (tl0, te0) = get_tl_te(p0)?;
127    let (tl1, te1) = get_tl_te(p1)?;
128
129    Ok(TimeSyncMap {
130        te0,
131        te1,
132        tl0,
133        tl1,
134        syncs_received: syncs.len(),
135        emotibit_start_time,
136        emotibit_end_time,
137        parse_version: PARSER_VERSION.to_owned(),
138    })
139}
140
141fn get_tl_te(sync: &TimeSync) -> Result<(f64, f64)> {
142    let e0 = sync.ts_received;
143    let ts = &sync.ts_sent;
144
145    let pos = ts
146        .rfind('-')
147        .ok_or_else(|| anyhow!("Invalid date string. : {:?}", sync))?;
148    let (head, tail) = ts.split_at(pos);
149
150    let naive_date_time = NaiveDateTime::parse_from_str(head, "%Y-%m-%d_%H-%M-%S")?;
151    let date_time: DateTime<Local> = Local.from_local_datetime(&naive_date_time).unwrap();
152    let c = date_time.timestamp();
153
154    let last_n_char = tail.len() - 1;
155    let m: f64 = tail[1..].parse()?;
156    let m = m / 10_i32.pow(last_n_char.try_into()?) as f64;
157
158    let mut c0 = (c as f64) + m;
159    c0 += sync.round_trip as f64 / 2_f64 / 1000_f64;
160
161    Ok((c0, e0))
162}