use crate::AletheiaDB;
use crate::core::error::{Error, Result, StorageError};
use crate::core::history::EntityHistory;
use crate::core::id::NodeId;
use crate::core::temporal::TimeRange;
use crate::core::vector::ops::euclidean_distance;
/// Upper bound on the number of time bins in a single flux series; a window
/// that would need more bins is rejected with `StorageError::CapacityExceeded`.
const MAX_BINS: usize = 1_000_000;
/// Parameters for [`RippleDetector::detect_causality`].
#[derive(Debug, Clone)]
pub struct RippleConfig {
    /// Time window analysed. Only versions whose valid-time start lies in
    /// `[start, end)` (compared by wallclock) contribute to the flux series.
    pub window: TimeRange,
    /// Width of each time bin, in the same units as `Timestamp::wallclock`
    /// (microseconds, per the `_us` suffix).
    pub bin_size_us: i64,
    /// Largest lag, in bins, tried when cross-correlating source and target.
    pub max_lag_bins: usize,
    /// Minimum Pearson correlation required to report a ripple.
    pub min_correlation: f32,
}
impl Default for RippleConfig {
    /// One-hour window starting at wallclock `0`, one-second bins, lags of up
    /// to ten bins, and a correlation threshold of `0.5`.
    fn default() -> Self {
        const SECOND_US: i64 = 1_000_000;
        const HOUR_US: i64 = 3_600 * SECOND_US;

        let window = TimeRange::new(0.into(), HOUR_US.into()).unwrap();
        Self {
            window,
            bin_size_us: SECOND_US,
            max_lag_bins: 10,
            min_correlation: 0.5,
        }
    }
}
/// A lagged correlation found between changes on two nodes.
#[derive(Debug, Clone, PartialEq)]
pub struct RippleEffect {
    /// Node whose change series is taken as the leader.
    pub source: NodeId,
    /// Node whose change series is compared `lag_bins` later.
    pub target: NodeId,
    /// Lag, in bins, at which the correlation peaked.
    pub lag_bins: usize,
    /// The same lag expressed as `lag_bins * bin_size_us`.
    pub lag_us: i64,
    /// Pearson correlation between the two flux series at `lag_bins`.
    pub correlation: f32,
    /// Confidence score. NOTE(review): `detect_causality` currently always
    /// sets this to `1.0`; it is a placeholder, not a computed value.
    pub confidence: f32,
}
/// Detects lagged, correlated changes of a vector property between nodes,
/// using the version histories stored in an [`AletheiaDB`].
pub struct RippleDetector<'a> {
    /// Database the node histories are read from.
    db: &'a AletheiaDB,
}
impl<'a> RippleDetector<'a> {
    /// Creates a detector that reads node histories from `db`.
    pub fn new(db: &'a AletheiaDB) -> Self {
        Self { db }
    }

    /// Tests whether changes to `property` on `source` are followed, after some
    /// lag, by correlated changes to the same property on `target`.
    ///
    /// Each node's history is turned into a per-bin "flux" series (see
    /// `compute_flux`). The series are then cross-correlated for lags
    /// `0..=config.max_lag_bins`, where lag `L` pairs `source` bin `i` with
    /// `target` bin `i + L`, so `target` follows `source`.
    ///
    /// Returns `Ok(None)` when either series shows no change inside the window,
    /// or when the best correlation is below `config.min_correlation`.
    ///
    /// # Errors
    ///
    /// - Propagates errors from loading either node's history.
    /// - Propagates any error returned by `euclidean_distance`.
    /// - Returns `StorageError::CapacityExceeded` when the window would need
    ///   more than `MAX_BINS` bins. A non-positive `config.bin_size_us` counts
    ///   as an unbounded number of bins.
    pub fn detect_causality(
        &self,
        source: NodeId,
        target: NodeId,
        config: &RippleConfig,
        property: &str,
    ) -> Result<Option<RippleEffect>> {
        let source_hist = self.db.get_node_history(source)?;
        let target_hist = self.db.get_node_history(target)?;
        let source_flux = self.compute_flux(&source_hist, config, property)?;
        let target_flux = self.compute_flux(&target_hist, config, property)?;

        // A series with no change carries no signal, and its correlation is undefined.
        if source_flux.iter().all(|&x| x == 0.0) || target_flux.iter().all(|&x| x == 0.0) {
            return Ok(None);
        }

        let (best_lag, max_corr) =
            self.cross_correlate(&source_flux, &target_flux, config.max_lag_bins);

        // `>=` (rather than `!(<)`) keeps a NaN threshold from ever matching.
        let effect = (max_corr >= config.min_correlation).then(|| RippleEffect {
            source,
            target,
            lag_bins: best_lag,
            // best_lag < number of bins <= MAX_BINS, so the cast cannot truncate.
            lag_us: best_lag as i64 * config.bin_size_us,
            correlation: max_corr,
            // NOTE(review): placeholder; no confidence model is computed yet.
            confidence: 1.0,
        });
        Ok(effect)
    }

    /// Builds the flux series of `property` over `config.window`.
    ///
    /// Bin `i` covers `[start + i * bin, start + (i + 1) * bin)`, and the last
    /// bin may be partial. For every version whose valid-time start lies in
    /// `[start, end)`, the Euclidean distance between its vector value and the
    /// most recent earlier vector value is added to that version's bin.
    ///
    /// Versions outside the window still update the "previous value". A change
    /// at the very start of the window is therefore measured against the state
    /// that preceded it.
    ///
    /// Returns an empty series for an empty or inverted window.
    ///
    /// NOTE(review): this assumes `history.versions` is in chronological
    /// order. Confirm against `EntityHistory`.
    fn compute_flux(
        &self,
        history: &EntityHistory,
        config: &RippleConfig,
        property: &str,
    ) -> Result<Vec<f32>> {
        let start_time = config.window.start().wallclock();
        let end_time = config.window.end().wallclock();
        let duration = end_time.saturating_sub(start_time);
        if duration <= 0 {
            return Ok(Vec::new());
        }

        let num_bins = Self::bin_count(duration, config.bin_size_us);
        if num_bins > MAX_BINS {
            return Err(Error::Storage(StorageError::CapacityExceeded {
                resource: "Ripple detection bins".to_string(),
                current: num_bins,
                limit: MAX_BINS,
            }));
        }

        let mut flux = vec![0.0; num_bins];
        let mut prev_vec: Option<Vec<f32>> = None;
        let window_span = start_time..end_time;

        for version in &history.versions {
            let ts = version.temporal.valid_time().start().wallclock();
            let current_vec = version
                .properties
                .get(property)
                .and_then(|v| v.as_vector())
                .map(|v| v.to_vec());

            // Distance moved by this version. It exists only if the version lies in
            // the window and there is an earlier vector value to compare against.
            let delta = match (&current_vec, &prev_vec) {
                (Some(curr), Some(prev)) if window_span.contains(&ts) => {
                    Some(euclidean_distance(curr, prev)?)
                }
                _ => None,
            };

            if let Some(dist) = delta {
                // `ts` is in `[start_time, end_time)` and `bin_size_us > 0` (checked
                // via `bin_count`), so the index is non-negative and below `num_bins`.
                let bin_idx = ((ts - start_time) / config.bin_size_us) as usize;
                if let Some(slot) = flux.get_mut(bin_idx) {
                    *slot += dist;
                }
            }

            // Versions without a vector value (property absent or not a vector)
            // keep the previous value in place.
            if current_vec.is_some() {
                prev_vec = current_vec;
            }
        }
        Ok(flux)
    }

    /// Number of `bin_size_us`-wide bins needed to cover `duration` (> 0), rounding up.
    ///
    /// Returns `usize::MAX` when the count is unbounded (non-positive bin size)
    /// or does not fit in `usize`. The caller's capacity check rejects it.
    fn bin_count(duration: i64, bin_size_us: i64) -> usize {
        if bin_size_us <= 0 {
            return usize::MAX;
        }
        // Ceiling division without the overflow risk of `duration + bin - 1`.
        let bins = duration / bin_size_us + i64::from(duration % bin_size_us != 0);
        usize::try_from(bins).unwrap_or(usize::MAX)
    }

    /// Finds the lag in `0..=max_lag` that maximises the Pearson correlation
    /// between `source[i]` and `target[i + lag]`.
    ///
    /// Only the common prefix of the two series is used. Lags that leave no
    /// overlap are skipped, and ties go to the smallest lag. Returns `(0, 0.0)`
    /// when either series is empty.
    fn cross_correlate(&self, source: &[f32], target: &[f32], max_lag: usize) -> (usize, f32) {
        let n = source.len().min(target.len());
        if n == 0 {
            return (0, 0.0);
        }
        let (source, target) = (&source[..n], &target[..n]);

        let mut best = (0, f32::NEG_INFINITY);
        for lag in 0..=max_lag.min(n - 1) {
            let corr = self.pearson_correlation(&source[..n - lag], &target[lag..]);
            if corr > best.1 {
                best = (lag, corr);
            }
        }
        best
    }

    /// Pearson correlation coefficient of `x` and `y` over their common length.
    ///
    /// Sums are accumulated in `f64`, which matters for series of up to
    /// `MAX_BINS` samples, and the result is clamped to `[-1.0, 1.0]`.
    ///
    /// Returns `0.0` when fewer than two samples are available or when either
    /// series is constant, because the coefficient is undefined there.
    fn pearson_correlation(&self, x: &[f32], y: &[f32]) -> f32 {
        let n = x.len().min(y.len());
        if n < 2 {
            return 0.0;
        }
        let (x, y) = (&x[..n], &y[..n]);

        // Test constancy exactly. Rounding in the mean can leave a constant series
        // with a tiny non-zero variance, and dividing two such variances is meaningless.
        let is_constant = |s: &[f32]| s.iter().all(|&v| v == s[0]);
        if is_constant(x) || is_constant(y) {
            return 0.0;
        }

        let len = n as f64;
        let mean_x = x.iter().map(|&v| f64::from(v)).sum::<f64>() / len;
        let mean_y = y.iter().map(|&v| f64::from(v)).sum::<f64>() / len;

        let (mut num, mut den_x, mut den_y) = (0.0_f64, 0.0_f64, 0.0_f64);
        for (&xi, &yi) in x.iter().zip(y) {
            let dx = f64::from(xi) - mean_x;
            let dy = f64::from(yi) - mean_y;
            num += dx * dy;
            den_x += dx * dx;
            den_y += dy * dy;
        }
        if den_x == 0.0 || den_y == 0.0 {
            return 0.0;
        }
        (num / (den_x.sqrt() * den_y.sqrt())).clamp(-1.0, 1.0) as f32
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::transaction::WriteOps;
    use crate::core::property::PropertyMapBuilder;
    use crate::core::temporal::time;

    #[test]
    fn test_ripple_exact_match_zero_lag() {
        let db = AletheiaDB::new().unwrap();
        let t0 = time::now();
        let bin_size: i64 = 1_000;

        let props_a = PropertyMapBuilder::new()
            .insert_vector("vec", &[0.0, 0.0])
            .build();
        let a = db.create_node("Source", props_a).unwrap();
        let props_b = PropertyMapBuilder::new()
            .insert_vector("vec", &[0.0, 0.0])
            .build();
        let b = db.create_node("Target", props_b).unwrap();

        // Keep the update well clear of node creation (about 10 bins at 1 ms each).
        std::thread::sleep(std::time::Duration::from_millis(10));

        // Both nodes change in the same transaction, so their flux spikes share a bin.
        db.write(|tx| {
            tx.update_node(
                a,
                PropertyMapBuilder::new()
                    .insert_vector("vec", &[1.0, 0.0])
                    .build(),
            )?;
            tx.update_node(
                b,
                PropertyMapBuilder::new()
                    .insert_vector("vec", &[1.0, 0.0])
                    .build(),
            )
        })
        .unwrap();

        // Flux binning compares wallclock only and excludes the window end
        // (`ts < end`). Pad the end by one bin so an update that landed in the
        // same microsecond as this `now()` call is still counted.
        let window_end = time::now().wallclock() + bin_size;

        let detector = RippleDetector::new(&db);
        let config = RippleConfig {
            window: TimeRange::new(t0, window_end.into()).unwrap(),
            bin_size_us: bin_size,
            max_lag_bins: 5,
            min_correlation: 0.9,
        };
        let ripple = detector
            .detect_causality(a, b, &config, "vec")
            .unwrap()
            .unwrap();
        assert_eq!(ripple.lag_bins, 0);
        assert!(ripple.correlation > 0.9);
    }

    #[test]
    fn test_ripple_lagged() {
        let db = AletheiaDB::new().unwrap();
        let t_base = time::now().wallclock();
        let bin = 1_000_000;
        let props = PropertyMapBuilder::new()
            .insert_vector("vec", &[0.0])
            .build();

        let update_at = |id: NodeId, offset_us: i64, val: f32| {
            let ts = crate::core::temporal::Timestamp::new(t_base + offset_us, 0).unwrap();
            let mut tx = db.write_transaction().unwrap();
            tx.update_node_with_valid_time(
                id,
                PropertyMapBuilder::new()
                    .insert_vector("vec", &[val])
                    .build(),
                Some(ts),
            )
            .unwrap();
            tx.commit().unwrap();
        };

        let a = db.create_node("S", props.clone()).unwrap();
        let b = db.create_node("T", props.clone()).unwrap();

        // Each node moves 0 -> 1 once and then repeats the value, which adds zero
        // flux. That leaves a single spike: bin 2 for `a` and bin 5 for `b`.
        update_at(a, 2 * bin, 1.0);
        update_at(a, 3 * bin, 1.0);
        update_at(b, 5 * bin, 1.0);
        update_at(b, 6 * bin, 1.0);

        let detector = RippleDetector::new(&db);
        let config = RippleConfig {
            window: TimeRange::new((t_base).into(), (t_base + 10 * bin).into()).unwrap(),
            bin_size_us: bin,
            max_lag_bins: 5,
            min_correlation: 0.8,
        };
        let ripple = detector.detect_causality(a, b, &config, "vec").unwrap();
        assert!(ripple.is_some(), "Should detect ripple");
        let r = ripple.unwrap();
        assert_eq!(r.lag_bins, 3);
        assert!(r.correlation > 0.9);
    }

    #[test]
    fn test_ripple_no_changes_returns_none() {
        let db = AletheiaDB::new().unwrap();
        let t0 = time::now();
        let bin: i64 = 1_000;
        let props = PropertyMapBuilder::new()
            .insert_vector("vec", &[0.0])
            .build();
        let a = db.create_node("S", props.clone()).unwrap();
        let b = db.create_node("T", props).unwrap();

        // Creation alone has no predecessor value, so both flux series stay flat.
        let window_end = time::now().wallclock() + bin;
        let config = RippleConfig {
            window: TimeRange::new(t0, window_end.into()).unwrap(),
            bin_size_us: bin,
            max_lag_bins: 5,
            min_correlation: 0.5,
        };
        let ripple = RippleDetector::new(&db)
            .detect_causality(a, b, &config, "vec")
            .unwrap();
        assert!(ripple.is_none());
    }

    #[test]
    fn test_ripple_rejects_too_many_bins() {
        let db = AletheiaDB::new().unwrap();
        let props = PropertyMapBuilder::new()
            .insert_vector("vec", &[0.0])
            .build();
        let a = db.create_node("S", props.clone()).unwrap();
        let b = db.create_node("T", props).unwrap();

        // A 10 s window with 1 us bins needs 10M bins, which exceeds MAX_BINS.
        let t_base = time::now().wallclock();
        let config = RippleConfig {
            window: TimeRange::new(t_base.into(), (t_base + 10_000_000).into()).unwrap(),
            bin_size_us: 1,
            max_lag_bins: 5,
            min_correlation: 0.5,
        };
        let result = RippleDetector::new(&db).detect_causality(a, b, &config, "vec");
        assert!(matches!(
            result,
            Err(Error::Storage(StorageError::CapacityExceeded { .. }))
        ));
    }
}