//! Parsing of XDF data.
//!
//! The entry point is [`XDFFile::from_bytes`], which turns a byte slice into an
//! [`XDFFile`] holding the file header and the streams found in the input.

// Crate-wide lint configuration.
#![forbid(unsafe_code)]
#![forbid(clippy::unwrap_used)]
#![deny(nonstandard_style)]
#![warn(array_into_iter)]
#![warn(missing_docs)]
#![warn(rustdoc::all)]
#![deny(clippy::pedantic)]
// Sample counts (`usize`) are deliberately converted with `as f64` when sampling
// rates and timestamps are computed, hence the `cast_precision_loss` allowance.
#![allow(clippy::cast_precision_loss)] #![crate_type = "lib"]
#![crate_name = "xdf"]
use std::collections::HashMap;
use std::iter::Iterator;
use std::sync::Arc;
mod chunk_structs;
mod errors;
mod sample;
pub use sample::Sample;
mod streams;
mod util;
use chunk_structs::{BoundaryChunk, ClockOffsetChunk, FileHeaderChunk, StreamFooterChunk, StreamHeaderChunk};
use errors::{ParseError, StreamError, XDFError};
use log::warn;
use streams::Stream;
use util::FiniteF64;
use crate::chunk_structs::Chunk;
mod parsers;
use crate::parsers::xdf_file::xdf_file_parser;
/// Numeric identifier used to associate headers, footers, clock offsets and samples
/// with one stream.
type StreamID = u32;
/// Owning iterator over the samples of a single samples chunk.
type SampleIter = std::vec::IntoIter<Sample>;
/// A parsed XDF file: the contents of the file header plus the streams built from
/// the stream header chunks.
///
/// Construct it with [`XDFFile::from_bytes`].
#[derive(Debug, Clone, PartialEq)]
pub struct XDFFile {
/// Version number taken from the file header chunk.
pub version: f32,
/// XML element of the file header chunk.
pub header: xmltree::Element,
/// One entry per stream header chunk found in the input, including its samples.
pub streams: Vec<Stream>,
}
/// Tag naming a data format; the variants correspond one-to-one, by name, to the
/// variants of [`Values`].
// NOTE(review): presumably this is the channel format declared in a stream header —
// confirm against the `streams` and `chunk_structs` modules.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Format {
/// Format tag matching [`Values::Int8`].
Int8,
/// Format tag matching [`Values::Int16`].
Int16,
/// Format tag matching [`Values::Int32`].
Int32,
/// Format tag matching [`Values::Int64`].
Int64,
/// Format tag matching [`Values::Float32`].
Float32,
/// Format tag matching [`Values::Float64`].
Float64,
/// Format tag matching [`Values::String`].
String,
}
/// A set of values: either a vector of one numeric primitive type or a single string.
// NOTE(review): presumably the payload carried by a `Sample` — confirm in the
// `sample` module.
// The variant names state their element type, so per-variant docs are waived.
#[allow(missing_docs)]
#[derive(Debug, Clone, PartialEq)]
pub enum Values {
Int8(Vec<i8>),
Int16(Vec<i16>),
Int32(Vec<i32>),
Int64(Vec<i64>),
Float32(Vec<f32>),
Float64(Vec<f64>),
String(String),
}
/// The chunks of a file sorted by kind, as produced by `group_chunks` and consumed
/// by `process_streams`. The file header chunk is handed over separately.
struct GroupedChunks {
/// Every stream header chunk, in input order.
stream_header_chunks: Vec<StreamHeaderChunk>,
/// Every stream footer chunk, in input order.
stream_footer_chunks: Vec<StreamFooterChunk>,
/// Clock offset chunks keyed by stream id; each list is in input order.
clock_offsets: HashMap<StreamID, Vec<ClockOffsetChunk>>,
/// One sample iterator per samples chunk, keyed by stream id, in input order.
sample_map: HashMap<StreamID, Vec<SampleIter>>,
}
impl XDFFile {
pub fn from_bytes(bytes: &[u8]) -> Result<Self, XDFError> {
let (input, chunks) = xdf_file_parser(bytes)
.map_err(|e| match e {
nom::Err::Incomplete(n) => nom::Err::Incomplete(n),
nom::Err::Error(nom::error::Error { input, code }) => nom::Err::Error(nom::error::Error {
input: Arc::from(input.to_owned()),
code,
}),
nom::Err::Failure(nom::error::Error { input, code }) => nom::Err::Failure(nom::error::Error {
input: Arc::from(input.to_owned()),
code,
}),
})
.map_err(ParseError::from)?;
if !input.is_empty() {
warn!("There are {} bytes left in the input after parsing.", input.len());
}
let (file_header_chunk, grouped_chunks) = group_chunks(chunks)?;
let streams = process_streams(grouped_chunks)?;
Ok(Self {
version: file_header_chunk.version,
header: file_header_chunk.xml,
streams,
})
}
}
/// Sorts the parsed chunks into per-kind collections.
///
/// Stream headers and footers are collected in input order; clock offsets and the
/// sample iterators of samples chunks are collected per stream id, also in input
/// order. Boundary chunks are dropped. If several file header chunks are present,
/// the last one is kept.
///
/// # Errors
///
/// Returns `StreamError::MissingFileHeader` (converted into `XDFError`) when the
/// input contains no file header chunk.
fn group_chunks(chunks: Vec<Chunk>) -> Result<(FileHeaderChunk, GroupedChunks), XDFError> {
    let mut file_header = None;
    let mut stream_header_chunks = Vec::new();
    let mut stream_footer_chunks = Vec::new();
    let mut clock_offsets: HashMap<StreamID, Vec<ClockOffsetChunk>> = HashMap::new();
    let mut sample_map: HashMap<StreamID, Vec<SampleIter>> = HashMap::new();

    for chunk in chunks {
        match chunk {
            Chunk::FileHeader(header) => file_header = Some(header),
            Chunk::StreamHeader(header) => stream_header_chunks.push(header),
            Chunk::StreamFooter(footer) => stream_footer_chunks.push(footer),
            Chunk::Samples(samples_chunk) => sample_map
                .entry(samples_chunk.stream_id)
                .or_default()
                .push(samples_chunk.samples.into_iter()),
            Chunk::ClockOffset(offset) => {
                clock_offsets.entry(offset.stream_id).or_default().push(offset);
            }
            // Boundary chunks carry nothing that is needed to build streams.
            Chunk::Boundary(_) => {}
        }
    }

    let Some(file_header_chunk) = file_header else {
        return Err(StreamError::MissingFileHeader.into());
    };

    let grouped = GroupedChunks {
        stream_header_chunks,
        stream_footer_chunks,
        clock_offsets,
        sample_map,
    };
    Ok((file_header_chunk, grouped))
}
/// Assembles one [`Stream`] per stream header chunk from the grouped chunks.
///
/// For every stream header this
/// * attaches the footer with the same stream id, if there is one,
/// * discards clock offsets whose collection time or offset value is not finite,
/// * runs the stream's samples through `process_samples`, and
/// * computes the measured sampling rate.
///
/// Headers without a footer and footers without a header are reported through log
/// warnings only. The returned streams are sorted by stream id so that the same
/// input always produces the same output order.
///
/// # Errors
///
/// Returns `ParseError::InvalidClockOffset` (converted into `XDFError`) when the
/// remaining clock offsets of a stream are not sorted.
fn process_streams(mut grouped_chunks: GroupedChunks) -> Result<Vec<Stream>, XDFError> {
    // If a stream id occurs more than once, the later chunk replaces the earlier one.
    let stream_header_map: HashMap<StreamID, StreamHeaderChunk> = grouped_chunks
        .stream_header_chunks
        .into_iter()
        .map(|s| (s.stream_id, s))
        .collect();
    let mut stream_footer_map: HashMap<StreamID, StreamFooterChunk> = grouped_chunks
        .stream_footer_chunks
        .into_iter()
        .map(|s| (s.stream_id, s))
        .collect();

    for &stream_id in stream_header_map.keys() {
        if !stream_footer_map.contains_key(&stream_id) {
            warn!("Stream header without corresponding stream footer for id: {stream_id}");
        }
    }
    for &stream_id in stream_footer_map.keys() {
        if !stream_header_map.contains_key(&stream_id) {
            warn!("Stream footer without corresponding stream header for id: {stream_id}");
        }
    }

    let mut streams_vec: Vec<Stream> = Vec::with_capacity(stream_header_map.len());
    for (stream_id, stream_header) in stream_header_map {
        let stream_footer = stream_footer_map.remove(&stream_id);

        let name = stream_header.info.name.as_ref().map(|name| Arc::from(name.as_str()));
        let stream_type = stream_header
            .info
            .stream_type
            .as_ref()
            .map(|stream_type| Arc::from(stream_type.as_str()));

        let mut stream_offsets = grouped_chunks
            .clock_offsets
            .remove(&stream_header.stream_id)
            .unwrap_or_default();
        // Non-finite values cannot be interpolated, so they are dropped before the
        // ordering check.
        stream_offsets.retain(|o| o.collection_time.is_finite() && o.offset_value.is_finite());
        if !stream_offsets.is_sorted() {
            return Err(ParseError::InvalidClockOffset.into());
        }

        let samples_vec: Vec<Sample> = process_samples(
            grouped_chunks.sample_map.remove(&stream_id).unwrap_or_default(),
            &stream_offsets,
            stream_header.info.nominal_srate,
        );

        // Measured rate = number of samples / time between the first and last sample.
        // It is only reported for streams that declare a nominal rate and whose time
        // span is positive and finite.
        let measured_srate = if stream_header.info.nominal_srate.is_some() {
            let first_timestamp: Option<f64> = samples_vec.first().and_then(|s| s.timestamp);
            let last_timestamp: Option<f64> = samples_vec.last().and_then(|s| s.timestamp);
            if let (Some(first_timestamp), Some(last_timestamp)) = (first_timestamp, last_timestamp) {
                let delta = last_timestamp - first_timestamp;
                if delta <= 0.0 || !delta.is_finite() {
                    None
                } else {
                    Some(samples_vec.len() as f64 / delta)
                }
            } else {
                None
            }
        } else {
            None
        };

        let stream = Stream {
            id: stream_id,
            channel_count: stream_header.info.channel_count,
            nominal_srate: stream_header.info.nominal_srate,
            format: stream_header.info.channel_format,
            name,
            r#type: stream_type,
            header: stream_header.xml,
            footer: stream_footer.map(|s| s.xml),
            measured_srate,
            samples: samples_vec,
        };
        streams_vec.push(stream);
    }

    // `HashMap` iteration order is arbitrary and differs between runs; sorting by
    // id makes the result reproducible (stream ids are unique here because they
    // were map keys, so an unstable sort is sufficient).
    streams_vec.sort_unstable_by_key(|stream| stream.id);

    Ok(streams_vec)
}
/// Merges the sample chunks of one stream into a single list of samples.
///
/// Chunks are ordered by the timestamp of their first sample. A chunk whose first
/// sample has no usable (present and finite) timestamp stays directly behind the
/// chunk that precedes it in the input; the very first chunk falls back to a
/// timestamp of zero for ordering purposes. Empty chunks contribute nothing.
///
/// When `nominal_srate` is `Some`, samples without a timestamp get one derived from
/// the most recent explicit timestamp and the number of samples since then, and
/// every timestamp is then corrected with `interpolate_and_add_offsets`. When it is
/// `None`, the samples are returned exactly as stored: no timestamps are filled in
/// and no clock offsets are applied.
fn process_samples(
    mut sample_iterators: Vec<SampleIter>,
    stream_offsets: &[ClockOffsetChunk],
    nominal_srate: Option<f64>,
) -> Vec<Sample> {
    debug_assert!(stream_offsets
        .iter()
        .all(|o| o.stream_id == stream_offsets[0].stream_id));

    // Each group is (sort key, chunks that must stay together in this order).
    let mut groups = Vec::new();
    if let Some((head, tail)) = sample_iterators.split_first_mut() {
        let mut head = head.peekable();
        let head_ts = head
            .peek()
            .and_then(|sample| sample.timestamp)
            .and_then(FiniteF64::new)
            .unwrap_or(FiniteF64::zero());
        groups.push((head_ts, vec![head]));

        for chunk in tail {
            let mut chunk = chunk.peekable();
            // Empty chunks are skipped entirely.
            let Some(leading_ts) = chunk.peek().map(|sample| sample.timestamp) else {
                continue;
            };
            match leading_ts.and_then(FiniteF64::new) {
                Some(ts) => groups.push((ts, vec![chunk])),
                None => {
                    // No sort key of its own: keep it glued to the preceding chunk.
                    if let Some(group) = groups.last_mut() {
                        group.1.push(chunk);
                    }
                }
            }
        }
    }
    // Stable sort: groups with equal keys keep their input order.
    groups.sort_by_key(|group| group.0);

    let merged = groups.into_iter().flat_map(|(_, group)| group).flatten();

    // (index, timestamp) of the most recent sample that carried its own timestamp.
    let mut anchor = (0_usize, 0_f64);
    // Cursor into `stream_offsets`, shared across all samples of the stream.
    let mut offset_index = 0_usize;
    let mut processed = Vec::new();
    for (index, sample) in merged.enumerate() {
        let Some(srate) = nominal_srate else {
            processed.push(sample);
            continue;
        };

        let raw_timestamp = if let Some(explicit) = sample.timestamp {
            anchor = (index, explicit);
            explicit
        } else {
            let (anchor_index, anchor_timestamp) = anchor;
            anchor_timestamp + ((index - anchor_index) as f64 / srate)
        };
        let corrected = interpolate_and_add_offsets(raw_timestamp, stream_offsets, &mut offset_index);

        processed.push(Sample {
            timestamp: Some(corrected),
            values: sample.values,
        });
    }
    processed
}
fn interpolate_and_add_offsets(ts: f64, stream_offsets: &[ClockOffsetChunk], offset_index: &mut usize) -> f64 {
if stream_offsets.is_empty() {
ts } else {
let time_or_nan = |i: usize| {
stream_offsets
.get(i + 1)
.map_or(f64::NAN, |c: &ClockOffsetChunk| c.collection_time)
};
if ts < stream_offsets[0].collection_time {
return ts + stream_offsets[0].offset_value;
}
while ts > time_or_nan(*offset_index) {
*offset_index += 1;
}
let prev_offset = stream_offsets.get(*offset_index).or_else(|| stream_offsets.last());
let next_offset = stream_offsets.get(*offset_index + 1).or_else(|| stream_offsets.last());
let interpolated = if let (Some(l), Some(n)) = (prev_offset, next_offset) {
let dt = n.collection_time - l.collection_time;
if dt > 0.0 {
let t_normalised = (ts - l.collection_time) / dt;
l.offset_value * (1.0 - t_normalised) + n.offset_value * t_normalised
} else {
l.offset_value
}
} else {
prev_offset.or(next_offset).map_or(0.0, |c| c.offset_value)
};
ts + interpolated
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance used when comparing floating point results.
    const EPSILON: f64 = 1E-14;

    /// Builds a clock offset for stream 0.
    fn offset(collection_time: f64, offset_value: f64) -> ClockOffsetChunk {
        ClockOffsetChunk {
            collection_time,
            offset_value,
            stream_id: 0,
        }
    }

    /// Asserts that `interpolated` is within [`EPSILON`] of `expected`.
    #[track_caller]
    fn assert_close(interpolated: f64, expected: f64) {
        assert!(
            (interpolated - expected).abs() < EPSILON,
            "expected {interpolated} to be within {EPSILON} of {expected}"
        );
    }

    /// A cursor that points past the relevant segment must neither panic nor change
    /// the result for a timestamp that lies before the first offset.
    #[test]
    fn test_interpolation_bad_offset() {
        let offsets = [offset(0.0, -1.0), offset(1.0, 1.0)];
        let timestamp = offsets[0].collection_time - 1.0;
        let mut offset_index = 1;

        let interpolated = interpolate_and_add_offsets(timestamp, &offsets, &mut offset_index);

        assert_close(interpolated, timestamp + offsets[0].offset_value);
    }

    /// Between two offsets the correction follows the straight line through them.
    #[test]
    fn test_interpolation_inside() {
        const TEST_VALUES: [((f64, f64), (f64, f64)); 4] = [
            ((0.0, -1.0), (1.0, 1.0)),
            ((0.0, 0.0), (1.0, 1.0)),
            ((0.0, -1.0), (1.0, 5.0)),
            ((4.0, -1.0), (5.0, 2.0)),
        ];
        const STEPS: usize = 100;

        for ((s1_t, s1_v), (s2_t, s2_v)) in TEST_VALUES {
            let offsets = [offset(s1_t, s1_v), offset(s2_t, s2_v)];
            let incline = (s2_v - s1_v) / (s2_t - s1_t);

            // Evenly spaced timestamps in [s1_t, s2_t).
            for step in 0..STEPS {
                let timestamp = s1_t + (s2_t - s1_t) * (step as f64) / (STEPS as f64);
                let mut offset_index = 0;

                let interpolated = interpolate_and_add_offsets(timestamp, &offsets, &mut offset_index);

                let expected: f64 = timestamp + ((timestamp - s1_t) * incline + s1_v);
                assert_close(interpolated, expected);
            }
        }
    }

    /// After the last offset, the last offset value is applied unchanged.
    #[test]
    fn test_interpolation_after() {
        let offsets = [offset(0.0, -1.0), offset(1.0, 1.0), offset(3.0, 2.0)];
        let last_offset = &offsets[offsets.len() - 1];
        let timestamp = last_offset.collection_time + 1.0;
        let mut offset_index = 0;

        let interpolated = interpolate_and_add_offsets(timestamp, &offsets, &mut offset_index);

        assert_close(interpolated, timestamp + last_offset.offset_value);
    }

    /// Before the first offset, the first offset value is applied unchanged.
    #[test]
    fn test_interpolation_before() {
        let offsets = [offset(0.0, -1.0), offset(1.0, 1.0)];
        let first_offset = &offsets[0];
        let timestamp = first_offset.collection_time - 1.0;
        let mut offset_index = 0;

        let interpolated = interpolate_and_add_offsets(timestamp, &offsets, &mut offset_index);

        assert_close(interpolated, timestamp + first_offset.offset_value);
    }

    /// Without any offsets, timestamps pass through bit-for-bit.
    #[test]
    #[allow(clippy::float_cmp)]
    fn test_no_offsets() {
        let offsets: &[ClockOffsetChunk] = &[];
        let mut offset_index = 0;
        for i in -20..=20 {
            let timestamp = f64::from(i) / 10.0;
            let res = interpolate_and_add_offsets(timestamp, offsets, &mut offset_index);
            assert_eq!(timestamp, res);
        }
    }

    /// Compile-time check: `XDFFile` can be shared between threads.
    #[test]
    const fn test_is_sync() {
        const fn is_sync<T: Sync>() {}
        is_sync::<XDFFile>();
    }

    /// Compile-time check: `XDFFile` can be moved to another thread.
    #[test]
    const fn test_is_send() {
        const fn is_send<T: Send>() {}
        is_send::<XDFFile>();
    }
}