use std::collections::{BTreeSet, HashMap, VecDeque};
use std::iter::Iterator;
use std::ops::{Add, Mul, Sub};
use std::time::Duration;
use crate::ring_buffer::Step;
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SynchronizationStrategy {
None,
#[default]
ZeroOrderHold,
Linear,
}
pub trait Interpolatable:
Copy + Add<Output = Self> + Sub<Output = Self> + Mul<f64, Output = Self>
{
}
impl Interpolatable for f64 {}
pub struct Synchronizer<T> {
strategy: SynchronizationStrategy,
last_steps: HashMap<&'static str, Step<T>>,
timeline: BTreeSet<Duration>,
pub pending: VecDeque<Step<T>>,
}
impl<T> Synchronizer<T>
where
T: Interpolatable,
{
pub fn new(strategy: SynchronizationStrategy) -> Self {
Self {
strategy,
last_steps: HashMap::new(),
timeline: BTreeSet::new(),
pending: VecDeque::new(),
}
}
pub fn strategy(&self) -> SynchronizationStrategy {
self.strategy
}
pub fn evaluate(&mut self, current_step: Step<T>) {
let signal_id = current_step.signal;
let current_time = current_step.timestamp;
if let Some(prev_step) = self.last_steps.get(&signal_id)
&& current_time <= prev_step.timestamp
{
eprintln!(
"Warning: Ignoring step for signal '{}' at {:?}. Timestamp must be strictly increasing (last: {:?}).",
signal_id, current_time, prev_step.timestamp
);
return;
}
if self.strategy == SynchronizationStrategy::None {
self.last_steps.insert(signal_id, current_step.clone());
self.pending.push_back(current_step);
return;
}
let current_value = current_step.value;
self.timeline.insert(current_time);
if let Some(prev_step) = self.last_steps.get(&signal_id).cloned() {
let prev_time = prev_step.timestamp;
let prev_val = prev_step.value;
if current_time > prev_time {
let missed_timestamps: Vec<Duration> = self
.timeline
.range((
std::ops::Bound::Excluded(prev_time),
std::ops::Bound::Excluded(current_time),
))
.cloned()
.collect();
for t in missed_timestamps {
let interp_val = match self.strategy {
SynchronizationStrategy::None => current_value, SynchronizationStrategy::ZeroOrderHold => prev_val,
SynchronizationStrategy::Linear => {
let dt_total = current_time.as_secs_f64() - prev_time.as_secs_f64();
let dt_curr = t.as_secs_f64() - prev_time.as_secs_f64();
let alpha = if dt_total != 0.0 {
dt_curr / dt_total
} else {
0.0
};
prev_val + (current_value - prev_val) * alpha
}
};
self.pending.push_back(Step {
signal: signal_id,
timestamp: t,
value: interp_val,
});
}
}
}
self.last_steps.insert(signal_id, current_step.clone());
self.pending.push_back(current_step);
self.prune_history();
}
fn prune_history(&mut self) {
if self.last_steps.is_empty() {
return;
}
let min_frontier = self.last_steps.values().map(|s| s.timestamp).min();
if let Some(frontier) = min_frontier {
let keep = self.timeline.split_off(&frontier);
self.timeline = keep;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
#[test]
fn test_synchronizer_zero_order_hold() {
let steps = vec![
Step {
signal: "B",
value: 0.0,
timestamp: Duration::from_secs(0),
},
Step {
signal: "A",
value: 1.0,
timestamp: Duration::from_secs(1),
},
Step {
signal: "A",
value: 10.0,
timestamp: Duration::from_secs(2),
},
Step {
signal: "A",
value: 3.0,
timestamp: Duration::from_secs(4),
},
Step {
signal: "B",
value: 30.0,
timestamp: Duration::from_secs(5),
},
];
let mut sync = Synchronizer::new(SynchronizationStrategy::ZeroOrderHold);
let mut result = Vec::new();
for step in &steps {
sync.evaluate(step.clone());
while let Some(s) = sync.pending.pop_front() {
println!("Popped step: {:?}", s);
result.push(s);
}
}
assert!(result.iter().any(|s| s.signal == "B"
&& (s.timestamp == Duration::from_secs(1)
|| s.timestamp == Duration::from_secs(2)
|| s.timestamp == Duration::from_secs(4))
&& s.value == 0.0));
}
#[test]
fn test_synchronizer_linear() {
let steps = vec![
Step {
signal: "A",
value: 0.0,
timestamp: Duration::from_secs(0),
},
Step {
signal: "B",
value: 0.0,
timestamp: Duration::from_secs(0),
},
Step {
signal: "A",
value: 10.0,
timestamp: Duration::from_secs(2),
},
Step {
signal: "B",
value: 20.0,
timestamp: Duration::from_secs(4),
},
];
let mut sync = Synchronizer::new(SynchronizationStrategy::Linear);
let mut result = Vec::new();
for step in &steps {
sync.evaluate(step.clone());
while let Some(s) = sync.pending.pop_front() {
result.push(s);
}
}
assert!(result.iter().any(|s| s.signal == "B"
&& s.timestamp == Duration::from_secs(2)
&& (s.value - 10.0).abs() < 1e-6));
}
#[test]
fn test_non_increasing_timestamp_ignored() {
let mut sync = Synchronizer::new(SynchronizationStrategy::ZeroOrderHold);
sync.evaluate(Step {
signal: "A",
value: 10.0,
timestamp: Duration::from_secs(2),
});
assert_eq!(sync.pending.len(), 1);
sync.pending.clear();
sync.evaluate(Step {
signal: "A",
value: 15.0,
timestamp: Duration::from_secs(3),
});
assert_eq!(sync.pending.len(), 1);
sync.pending.clear();
sync.evaluate(Step {
signal: "A",
value: 20.0,
timestamp: Duration::from_secs(3),
});
assert_eq!(sync.pending.len(), 0, "Equal timestamp should be ignored");
sync.evaluate(Step {
signal: "A",
value: 25.0,
timestamp: Duration::from_secs(1),
});
assert_eq!(
sync.pending.len(),
0,
"Decreasing timestamp should be ignored"
);
sync.evaluate(Step {
signal: "A",
value: 30.0,
timestamp: Duration::from_secs(5),
});
assert_eq!(sync.pending.len(), 1);
}
#[test]
fn test_different_signals_independent_timestamps() {
let mut sync = Synchronizer::new(SynchronizationStrategy::None);
sync.evaluate(Step {
signal: "A",
value: 10.0,
timestamp: Duration::from_secs(5),
});
assert_eq!(sync.pending.len(), 1);
sync.pending.clear();
sync.evaluate(Step {
signal: "B",
value: 20.0,
timestamp: Duration::from_secs(2),
});
assert_eq!(sync.pending.len(), 1);
sync.pending.clear();
sync.evaluate(Step {
signal: "A",
value: 15.0,
timestamp: Duration::from_secs(3),
});
assert_eq!(sync.pending.len(), 0, "Signal A timestamp must be > 5");
sync.evaluate(Step {
signal: "B",
value: 25.0,
timestamp: Duration::from_secs(3),
});
assert_eq!(sync.pending.len(), 1);
}
}