use crate::types::Atom;
use serde::Serialize;
use std::time::Duration;
/// Strategy used by `temporal_join` to pair each left entry with right-hand data.
///
/// Timestamps are compared as raw `u64` values. The `AsOf` tolerance is converted to
/// microseconds, so timestamps are presumably microsecond counts — confirm with the
/// producers of the data.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum TemporalJoinType {
    /// Match only a right entry whose timestamp equals the left timestamp.
    Exact,
    /// Match the latest right entry at or before the left timestamp, provided it is at
    /// most `tolerance` older than the left entry.
    AsOf { tolerance: Duration },
    /// Estimate the right value at the left timestamp: float neighbours are linearly
    /// interpolated, other values take the nearer neighbour, and timestamps outside the
    /// right range clamp to the nearest endpoint.
    Interpolated,
    /// Carry the latest right entry at or before the left timestamp forward, with no
    /// limit on how old it may be.
    ForwardFill,
}
/// One output row of a temporal join: a left entry paired with its (optional) right match.
///
/// Every join strategy emits exactly one row per left entry.
#[derive(Debug, Clone, PartialEq, Serialize)]
pub struct JoinedRow {
    /// Timestamp of the left entry.
    pub timestamp: u64,
    /// Key of the left entry.
    pub left_key: String,
    /// Value of the left entry.
    pub left_value: Atom,
    /// Key of the matched right entry, or an empty string when nothing matched.
    /// For interpolated joins this is the key of the first right entry.
    pub right_key: String,
    /// Matched (or interpolated) right value; `None` when no right entry qualified.
    pub right_value: Option<Atom>,
}
/// A single time-series observation: `(timestamp, key, value)`.
///
/// The as-of, forward-fill and interpolated strategies assume the right-hand series is
/// sorted by ascending timestamp — TODO confirm callers guarantee this.
pub type TimeEntry = (u64, String, Atom);
pub fn temporal_join(
left: &[TimeEntry],
right: &[TimeEntry],
join_type: &TemporalJoinType,
) -> Vec<JoinedRow> {
match join_type {
TemporalJoinType::Exact => join_exact(left, right),
TemporalJoinType::AsOf { tolerance } => join_asof(left, right, *tolerance),
TemporalJoinType::Interpolated => join_interpolated(left, right),
TemporalJoinType::ForwardFill => join_forward_fill(left, right),
}
}
/// Exact join: pairs each left entry with the right entry carrying the identical timestamp.
///
/// If several right entries share a timestamp, the last one wins. Unmatched rows get an
/// empty `right_key` and `right_value: None`. `left` and `right` may be in any order.
fn join_exact(left: &[TimeEntry], right: &[TimeEntry]) -> Vec<JoinedRow> {
    // Inserting in input order means a later duplicate replaces an earlier one.
    let mut by_timestamp: std::collections::HashMap<u64, &TimeEntry> =
        std::collections::HashMap::with_capacity(right.len());
    for entry in right {
        by_timestamp.insert(entry.0, entry);
    }

    let mut rows = Vec::with_capacity(left.len());
    for (timestamp, key, value) in left {
        let (right_key, right_value) = match by_timestamp.get(timestamp) {
            Some(hit) => (hit.1.clone(), Some(hit.2.clone())),
            None => (String::new(), None),
        };
        rows.push(JoinedRow {
            timestamp: *timestamp,
            left_key: key.clone(),
            left_value: value.clone(),
            right_key,
            right_value,
        });
    }
    rows
}
/// As-of join: pairs each left entry with the most recent right entry whose timestamp is
/// at or before the left timestamp, provided the gap is at most `tolerance`.
///
/// `right` must be sorted by ascending timestamp because the candidate is located by binary
/// search. `left` may be in any order. If several right entries share the candidate
/// timestamp, the last of them is used. The gap is compared against `tolerance` in whole
/// microseconds, which presumes timestamps are microsecond counts — confirm with the
/// producers of the data. Unmatched rows get an empty `right_key` and `right_value: None`.
fn join_asof(left: &[TimeEntry], right: &[TimeEntry], tolerance: Duration) -> Vec<JoinedRow> {
    // `as_micros` yields a u128; clamp before narrowing so an enormous tolerance saturates
    // to "unbounded" instead of silently wrapping around to a small value.
    let tol_micros = tolerance.as_micros().min(u128::from(u64::MAX)) as u64;
    left.iter()
        .map(|(ts, lk, lv)| {
            // Number of right entries at or before `ts`; the last of them is the candidate.
            // A per-row search (instead of a shared cursor) keeps the result correct for
            // any ordering of `left`.
            let at_or_before = right.partition_point(|e| e.0 <= *ts);
            let right_match = at_or_before
                .checked_sub(1)
                .map(|idx| &right[idx])
                // `checked_sub` also rejects a candidate later than `ts` if `right` is unsorted.
                .filter(|r| matches!(ts.checked_sub(r.0), Some(gap) if gap <= tol_micros));
            JoinedRow {
                timestamp: *ts,
                left_key: lk.clone(),
                left_value: lv.clone(),
                right_key: right_match.map(|r| r.1.clone()).unwrap_or_default(),
                right_value: right_match.map(|r| r.2.clone()),
            }
        })
        .collect()
}
/// Interpolated join: every left entry receives the value of `right` estimated at its
/// timestamp by `interpolate_at`.
///
/// `right_key` is the key of the first right entry on every row (empty when `right` is
/// empty), which presumes `right` holds a single keyed series — confirm with callers.
fn join_interpolated(left: &[TimeEntry], right: &[TimeEntry]) -> Vec<JoinedRow> {
    let series_key = match right.first() {
        Some((_, key, _)) => key.clone(),
        None => String::new(),
    };

    let mut rows = Vec::with_capacity(left.len());
    for (timestamp, key, value) in left {
        rows.push(JoinedRow {
            timestamp: *timestamp,
            left_key: key.clone(),
            left_value: value.clone(),
            right_key: series_key.clone(),
            right_value: interpolate_at(right, *timestamp),
        });
    }
    rows
}
/// Forward-fill join: pairs each left entry with the most recent right entry at or before
/// its timestamp, however old that entry is.
///
/// `right` must be sorted by ascending timestamp because the candidate is located by binary
/// search. `left` may be in any order. If several right entries share the candidate
/// timestamp, the last of them is used. Left entries earlier than every right entry get an
/// empty `right_key` and `right_value: None`.
fn join_forward_fill(left: &[TimeEntry], right: &[TimeEntry]) -> Vec<JoinedRow> {
    left.iter()
        .map(|(ts, lk, lv)| {
            // A per-row search (instead of a shared cursor) keeps the result correct for
            // any ordering of `left`.
            let right_match = right
                .partition_point(|e| e.0 <= *ts)
                .checked_sub(1)
                .map(|idx| &right[idx])
                // Never fill from the future, even if `right` turns out to be unsorted.
                .filter(|r| r.0 <= *ts);
            JoinedRow {
                timestamp: *ts,
                left_key: lk.clone(),
                left_value: lv.clone(),
                right_key: right_match.map(|r| r.1.clone()).unwrap_or_default(),
                right_value: right_match.map(|r| r.2.clone()),
            }
        })
        .collect()
}
/// Estimates the value of `series` at timestamp `ts`.
///
/// * Empty `series` returns `None`.
/// * If `ts` lies before the first entry or after the last, that endpoint's value is
///   returned (clamping, no extrapolation).
/// * If `ts` equals an entry's timestamp, that entry's stored value is returned verbatim.
///   With duplicate timestamps, the earliest such entry is used.
/// * Otherwise, two `Atom::Float` neighbours are linearly interpolated. Any other pair
///   yields the nearer neighbour's value, with ties going to the earlier one.
///
/// `series` must be sorted by ascending timestamp; the bracketing pair is found by binary
/// search. For unsorted input the result is unspecified, but the function does not panic.
fn interpolate_at(series: &[TimeEntry], ts: u64) -> Option<Atom> {
    let last = series.last()?;
    // Index of the first entry whose timestamp is >= `ts`.
    let after_idx = series.partition_point(|e| e.0 < ts);
    let (t1, _, v1) = match series.get(after_idx) {
        Some(entry) => entry,
        // `ts` lies beyond the last entry: clamp to the final value.
        None => return Some(last.2.clone()),
    };
    // Before the first entry (clamp), or an exact timestamp hit: hand back the stored value
    // untouched. Recomputing it as `f0 + 1.0 * (f1 - f0)` can lose precision badly
    // (e.g. f0 = 1e20, f1 = 1.0 evaluates to 0.0).
    if after_idx == 0 || *t1 == ts {
        return Some(v1.clone());
    }
    let (t0, _, v0) = &series[after_idx - 1];
    // For sorted input t0 < ts < t1 here. Saturating arithmetic only matters for unsorted
    // input, where it prevents an underflow panic.
    let since_prev = ts.saturating_sub(*t0);
    let until_next = t1.saturating_sub(ts);
    match (v0, v1) {
        (Atom::Float(f0), Atom::Float(f1)) => {
            if t1 > t0 {
                let frac = since_prev as f64 / (t1 - t0) as f64;
                Some(Atom::Float(f0 + frac * (f1 - f0)))
            } else {
                Some(Atom::Float(*f0))
            }
        }
        // Non-float values cannot be blended: take the nearer neighbour, ties to the earlier.
        _ => {
            if since_prev <= until_next {
                Some(v0.clone())
            } else {
                Some(v1.clone())
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a series of `Atom::Float` entries that all share the key `"key"`.
    fn make_float_entries(data: &[(u64, f64)]) -> Vec<TimeEntry> {
        data.iter()
            .map(|(ts, v)| (*ts, "key".to_string(), Atom::Float(*v)))
            .collect()
    }

    /// One instance of every join strategy, for tests that must hold across all of them.
    fn all_join_types() -> [TemporalJoinType; 4] {
        [
            TemporalJoinType::Exact,
            TemporalJoinType::AsOf {
                tolerance: Duration::from_micros(50),
            },
            TemporalJoinType::Interpolated,
            TemporalJoinType::ForwardFill,
        ]
    }

    #[test]
    fn test_exact_join() {
        let left = make_float_entries(&[(100, 1.0), (200, 2.0), (300, 3.0)]);
        let right = make_float_entries(&[(100, 10.0), (300, 30.0)]);
        let result = temporal_join(&left, &right, &TemporalJoinType::Exact);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].right_value, Some(Atom::Float(10.0)));
        assert_eq!(result[1].right_value, None);
        assert_eq!(result[2].right_value, Some(Atom::Float(30.0)));
    }

    #[test]
    fn test_exact_join_unmatched_row_has_empty_right_key() {
        let left = make_float_entries(&[(100, 1.0), (200, 2.0)]);
        let right = make_float_entries(&[(100, 10.0)]);
        let result = temporal_join(&left, &right, &TemporalJoinType::Exact);
        assert_eq!(result[0].right_key, "key");
        assert_eq!(result[1].right_key, "");
    }

    #[test]
    fn test_asof_join() {
        let left = make_float_entries(&[(100, 1.0), (200, 2.0), (300, 3.0)]);
        let right = make_float_entries(&[(90, 9.0), (250, 25.0)]);
        let result = temporal_join(
            &left,
            &right,
            &TemporalJoinType::AsOf {
                tolerance: Duration::from_micros(50),
            },
        );
        assert_eq!(result[0].right_value, Some(Atom::Float(9.0)));
        assert_eq!(result[1].right_value, None);
        assert_eq!(result[2].right_value, Some(Atom::Float(25.0)));
    }

    #[test]
    fn test_interpolated_join() {
        let left = make_float_entries(&[(150, 1.0)]);
        let right = make_float_entries(&[(100, 10.0), (200, 20.0)]);
        let result = temporal_join(&left, &right, &TemporalJoinType::Interpolated);
        assert_eq!(result.len(), 1);
        match &result[0].right_value {
            Some(Atom::Float(v)) => assert!((v - 15.0).abs() < 1e-10),
            other => panic!("expected Float(15.0), got {:?}", other),
        }
    }

    #[test]
    fn test_interpolated_join_clamps_outside_range() {
        let left = make_float_entries(&[(50, 0.0), (250, 0.0)]);
        let right = make_float_entries(&[(100, 10.0), (200, 20.0)]);
        let result = temporal_join(&left, &right, &TemporalJoinType::Interpolated);
        assert_eq!(result[0].right_value, Some(Atom::Float(10.0)));
        assert_eq!(result[1].right_value, Some(Atom::Float(20.0)));
    }

    #[test]
    fn test_forward_fill_join() {
        let left = make_float_entries(&[(100, 1.0), (200, 2.0), (300, 3.0)]);
        let right = make_float_entries(&[(50, 5.0), (250, 25.0)]);
        let result = temporal_join(&left, &right, &TemporalJoinType::ForwardFill);
        assert_eq!(result[0].right_value, Some(Atom::Float(5.0)));
        assert_eq!(result[1].right_value, Some(Atom::Float(5.0)));
        assert_eq!(result[2].right_value, Some(Atom::Float(25.0)));
    }

    #[test]
    fn test_forward_fill_before_first_right_is_none() {
        let left = make_float_entries(&[(10, 1.0)]);
        let right = make_float_entries(&[(50, 5.0)]);
        let result = temporal_join(&left, &right, &TemporalJoinType::ForwardFill);
        assert_eq!(result[0].right_value, None);
        assert_eq!(result[0].right_key, "");
    }

    #[test]
    fn test_empty_right() {
        let left = make_float_entries(&[(100, 1.0)]);
        let right: Vec<TimeEntry> = vec![];
        let result = temporal_join(&left, &right, &TemporalJoinType::Exact);
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].right_value, None);
    }

    #[test]
    fn test_empty_right_every_join_type() {
        let left = make_float_entries(&[(100, 1.0)]);
        for join_type in &all_join_types() {
            let result = temporal_join(&left, &[], join_type);
            assert_eq!(result.len(), 1, "{:?}", join_type);
            assert_eq!(result[0].right_value, None, "{:?}", join_type);
            assert_eq!(result[0].right_key, "", "{:?}", join_type);
        }
    }

    #[test]
    fn test_empty_left_yields_no_rows() {
        let right = make_float_entries(&[(100, 10.0)]);
        for join_type in &all_join_types() {
            assert!(
                temporal_join(&[], &right, join_type).is_empty(),
                "{:?}",
                join_type
            );
        }
    }
}