#[derive(Debug, Clone)]
pub struct TimestampedRow<T: Clone> {
pub timestamp_ms: i64,
pub data: T,
}
#[derive(Debug, Clone)]
pub struct AsofMatch<L: Clone, R: Clone> {
pub left: TimestampedRow<L>,
pub right: Option<TimestampedRow<R>>,
}
pub fn asof_join<L: Clone, R: Clone>(
left: &[TimestampedRow<L>],
right: &[TimestampedRow<R>],
tolerance_ms: i64,
) -> Vec<AsofMatch<L, R>> {
let mut results = Vec::with_capacity(left.len());
let mut right_cursor = 0usize;
for left_row in left {
while right_cursor < right.len()
&& right[right_cursor].timestamp_ms <= left_row.timestamp_ms
{
right_cursor += 1;
}
let matched = if right_cursor > 0 {
let candidate = &right[right_cursor - 1];
let gap = left_row.timestamp_ms - candidate.timestamp_ms;
if gap >= 0 && (tolerance_ms == i64::MAX || gap <= tolerance_ms) {
Some(candidate.clone())
} else {
None
}
} else {
None
};
results.push(AsofMatch {
left: left_row.clone(),
right: matched,
});
}
results
}
pub fn asof_join_keyed<L, R, K>(
left: &[TimestampedRow<L>],
right: &[TimestampedRow<R>],
tolerance_ms: i64,
left_key: impl Fn(&L) -> K,
right_key: impl Fn(&R) -> K,
) -> Vec<AsofMatch<L, R>>
where
L: Clone,
R: Clone,
K: Eq + std::hash::Hash + Clone,
{
use std::collections::HashMap;
let mut right_groups: HashMap<K, Vec<&TimestampedRow<R>>> = HashMap::new();
for row in right {
right_groups
.entry(right_key(&row.data))
.or_default()
.push(row);
}
let mut results = Vec::with_capacity(left.len());
for left_row in left {
let key = left_key(&left_row.data);
let matched = if let Some(group) = right_groups.get(&key) {
let target = left_row.timestamp_ms;
let pos = group.partition_point(|r| r.timestamp_ms <= target);
if pos > 0 {
let candidate = group[pos - 1];
let gap = target - candidate.timestamp_ms;
if tolerance_ms == i64::MAX || gap <= tolerance_ms {
Some(candidate.clone())
} else {
None
}
} else {
None
}
} else {
None
};
results.push(AsofMatch {
left: left_row.clone(),
right: matched,
});
}
results
}
#[cfg(test)]
mod tests {
use super::*;
fn row(ts: i64, data: &str) -> TimestampedRow<String> {
TimestampedRow {
timestamp_ms: ts,
data: data.to_string(),
}
}
#[test]
fn basic_asof_join() {
let left = vec![row(100, "a"), row(200, "b"), row(300, "c")];
let right = vec![row(90, "x"), row(150, "y"), row(250, "z")];
let results = asof_join(&left, &right, i64::MAX);
assert_eq!(results.len(), 3);
assert_eq!(results[0].right.as_ref().unwrap().data, "x");
assert_eq!(results[1].right.as_ref().unwrap().data, "y");
assert_eq!(results[2].right.as_ref().unwrap().data, "z");
}
#[test]
fn exact_match() {
let left = vec![row(100, "a"), row(200, "b")];
let right = vec![row(100, "x"), row(200, "y")];
let results = asof_join(&left, &right, 0);
assert_eq!(results[0].right.as_ref().unwrap().data, "x");
assert_eq!(results[1].right.as_ref().unwrap().data, "y");
}
#[test]
fn tolerance_window() {
let left = vec![row(100, "a"), row(200, "b")];
let right = vec![row(50, "x"), row(180, "y")];
let results = asof_join(&left, &right, 10);
assert!(results[0].right.is_none());
assert!(results[1].right.is_none());
let results = asof_join(&left, &right, 50);
assert_eq!(results[0].right.as_ref().unwrap().data, "x"); assert_eq!(results[1].right.as_ref().unwrap().data, "y"); }
#[test]
fn no_right_rows() {
let left = vec![row(100, "a")];
let right: Vec<TimestampedRow<String>> = vec![];
let results = asof_join(&left, &right, i64::MAX);
assert!(results[0].right.is_none());
}
#[test]
fn right_all_after_left() {
let left = vec![row(100, "a")];
let right = vec![row(200, "x"), row(300, "y")];
let results = asof_join(&left, &right, i64::MAX);
assert!(results[0].right.is_none());
}
#[test]
fn keyed_asof_join() {
let left = vec![
TimestampedRow {
timestamp_ms: 100,
data: ("host-a", 1.0),
},
TimestampedRow {
timestamp_ms: 200,
data: ("host-b", 2.0),
},
TimestampedRow {
timestamp_ms: 300,
data: ("host-a", 3.0),
},
];
let right = vec![
TimestampedRow {
timestamp_ms: 90,
data: ("host-a", 10.0),
},
TimestampedRow {
timestamp_ms: 150,
data: ("host-b", 20.0),
},
TimestampedRow {
timestamp_ms: 250,
data: ("host-a", 30.0),
},
];
let results = asof_join_keyed(&left, &right, i64::MAX, |d| d.0, |d| d.0);
assert_eq!(results.len(), 3);
assert_eq!(results[0].right.as_ref().unwrap().data.1, 10.0);
assert_eq!(results[1].right.as_ref().unwrap().data.1, 20.0);
assert_eq!(results[2].right.as_ref().unwrap().data.1, 30.0);
}
}