use super::energy::{CoherenceEnergy, EdgeEnergy, EdgeId};
use super::engine::{CoherenceEngine, NodeId};
use chrono::{DateTime, Utc};
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
/// Settings that control how `IncrementalEngine` recomputes energy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IncrementalConfig {
/// Master switch: when `false`, `IncrementalEngine::compute` always performs a full recompute.
pub enabled: bool,
/// Dirty-edge fraction (dirty edges / total edges) above which `compute` falls back to a
/// full recompute instead of touching only the dirty edges.
pub full_recompute_threshold: f32,
/// NOTE(review): not read in this part of the file; presumably turns on update batching — confirm.
pub batch_updates: bool,
/// NOTE(review): not read in this part of the file; presumably the largest batch size — confirm.
pub max_batch_size: usize,
/// When `true`, update events and one energy sample per `compute` call are recorded.
pub track_history: bool,
/// Upper bound on retained energy samples; the oldest samples are dropped first.
pub history_size: usize,
}
impl Default for IncrementalConfig {
fn default() -> Self {
Self {
enabled: true,
full_recompute_threshold: 0.3, batch_updates: true,
max_batch_size: 100,
track_history: true,
history_size: 1000,
}
}
}
/// Outcome of one `IncrementalEngine::compute` call.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaResult {
/// `new_energy - old_energy`.
pub energy_delta: f32,
/// Total energy after this computation.
pub new_energy: f32,
/// Cached total energy read at the start of the computation.
pub old_energy: f32,
/// Edge count reported for this computation: the engine's edge count on a full recompute,
/// or the number of dirty edges on an incremental one.
pub edges_recomputed: usize,
/// Edge count reported by the engine when the computation started.
pub total_edges: usize,
/// `true` when the full-recompute path was taken.
pub was_full_recompute: bool,
/// Wall-clock duration of the energy computation, in microseconds.
pub compute_time_us: u64,
/// UTC time at which this result was assembled.
pub timestamp: DateTime<Utc>,
}
impl DeltaResult {
    /// Returns the energy change as a fraction of the previous energy.
    ///
    /// When the previous energy is not above `1e-10` a ratio is not
    /// meaningful; the result is then `1.0` if the new energy is above that
    /// floor and `0.0` otherwise.
    pub fn relative_change(&self) -> f32 {
        const FLOOR: f32 = 1e-10;

        if self.old_energy > FLOOR {
            return self.energy_delta / self.old_energy;
        }
        if self.new_energy > FLOOR {
            1.0
        } else {
            0.0
        }
    }

    /// `true` when the energy went up (strictly positive delta).
    #[inline]
    pub fn energy_increased(&self) -> bool {
        0.0 < self.energy_delta
    }

    /// `true` when the energy went down (strictly negative delta).
    #[inline]
    pub fn energy_decreased(&self) -> bool {
        0.0 > self.energy_delta
    }
}
/// A change to the graph, recorded with the UTC time at which it was noted.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateEvent {
/// A node's state changed; `affected_edges` are the edges incident to it.
NodeUpdated {
node_id: NodeId,
affected_edges: Vec<EdgeId>,
timestamp: DateTime<Utc>,
},
/// An edge was added.
EdgeAdded {
edge_id: EdgeId,
timestamp: DateTime<Utc>,
},
/// An edge was removed; `old_energy` is the energy cached for it at removal time
/// (`0.0` when nothing was cached).
EdgeRemoved {
edge_id: EdgeId,
old_energy: f32,
timestamp: DateTime<Utc>,
},
/// A node was added. NOTE(review): never constructed in this part of the file.
NodeAdded {
node_id: NodeId,
timestamp: DateTime<Utc>,
},
/// A node was removed together with `removed_edges`.
/// NOTE(review): never constructed in this part of the file; `removed_energy` is
/// presumably the summed energy of the removed edges — confirm.
NodeRemoved {
node_id: NodeId,
removed_edges: Vec<EdgeId>,
removed_energy: f32,
timestamp: DateTime<Utc>,
},
}
impl UpdateEvent {
    /// Returns the time at which this event was recorded.
    pub fn timestamp(&self) -> DateTime<Utc> {
        match self {
            UpdateEvent::NodeUpdated { timestamp, .. }
            | UpdateEvent::EdgeAdded { timestamp, .. }
            | UpdateEvent::EdgeRemoved { timestamp, .. }
            | UpdateEvent::NodeAdded { timestamp, .. }
            | UpdateEvent::NodeRemoved { timestamp, .. } => *timestamp,
        }
    }

    /// Reports whether this event names `edge_id`.
    ///
    /// Node events match when the edge is in their edge list, edge events
    /// match on their own id, and `NodeAdded` never matches.
    pub fn affects_edge(&self, edge_id: &str) -> bool {
        match self {
            // Compare by reference; building an owned `String` just to call
            // `contains` would allocate on every query.
            UpdateEvent::NodeUpdated { affected_edges: edges, .. }
            | UpdateEvent::NodeRemoved { removed_edges: edges, .. } => {
                edges.iter().any(|candidate| candidate == edge_id)
            }
            UpdateEvent::EdgeAdded { edge_id: eid, .. }
            | UpdateEvent::EdgeRemoved { edge_id: eid, .. } => eid == edge_id,
            UpdateEvent::NodeAdded { .. } => false,
        }
    }
}
/// Per-edge energy cache with dirty tracking and a running total.
#[derive(Debug, Default)]
pub struct IncrementalCache {
// Last energy stored for each edge.
edge_energies: HashMap<EdgeId, f32>,
// Last residual vector stored for each edge.
edge_residuals: HashMap<EdgeId, Vec<f32>>,
// Running sum of `edge_energies`, adjusted on every update/removal rather than re-summed.
total_energy: f32,
// Fingerprint recorded by `set_fingerprint`; empty until one is set and after `clear`.
last_fingerprint: String,
// Edges whose cached energy is stale and awaits recomputation.
dirty_edges: HashSet<EdgeId>,
// Energies of edges removed since the last `clear_removed`/`clear`.
removed_energies: HashMap<EdgeId, f32>,
}
impl IncrementalCache {
    /// Creates an empty cache with no fingerprint and a total energy of zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns `true` when nothing is dirty and the stored fingerprint
    /// equals `fingerprint`.
    #[inline]
    pub fn is_valid(&self, fingerprint: &str) -> bool {
        self.dirty_edges.is_empty() && self.last_fingerprint == fingerprint
    }

    /// Flags a single edge as needing recomputation.
    pub fn mark_dirty(&mut self, edge_id: impl Into<EdgeId>) {
        let key = edge_id.into();
        self.dirty_edges.insert(key);
    }

    /// Flags every edge in `incident_edges` as needing recomputation.
    pub fn mark_node_dirty(&mut self, incident_edges: &[EdgeId]) {
        self.dirty_edges.extend(incident_edges.iter().cloned());
    }

    /// Stores a freshly computed energy and residual for an edge.
    ///
    /// The edge is no longer dirty afterwards, and the running total is
    /// adjusted by swapping the previous energy (if any) for the new one.
    pub fn update_edge(&mut self, edge_id: impl Into<EdgeId>, energy: f32, residual: Vec<f32>) {
        let key = edge_id.into();
        self.dirty_edges.remove(&key);

        // `insert` hands back the value it replaced, which is exactly the
        // amount that has to leave the running total.
        if let Some(previous) = self.edge_energies.insert(key.clone(), energy) {
            self.total_energy -= previous;
        }
        self.total_energy += energy;

        self.edge_residuals.insert(key, residual);
    }

    /// Drops an edge from the cache.
    ///
    /// If the edge had a cached energy, that energy leaves the running total
    /// and is remembered in the removed-energy map until it is cleared.
    pub fn remove_edge(&mut self, edge_id: &str) {
        let cached = self.edge_energies.remove(edge_id);
        if let Some(energy) = cached {
            self.total_energy -= energy;
            self.removed_energies.insert(edge_id.to_string(), energy);
        }

        self.edge_residuals.remove(edge_id);
        self.dirty_edges.remove(edge_id);
    }

    /// Returns the cached energy of an edge, if there is one.
    pub fn get_energy(&self, edge_id: &str) -> Option<f32> {
        let cached = self.edge_energies.get(edge_id);
        cached.copied()
    }

    /// Returns the cached residual of an edge, if there is one.
    pub fn get_residual(&self, edge_id: &str) -> Option<&Vec<f32>> {
        self.edge_residuals.get(edge_id)
    }

    /// Returns the running total of all cached edge energies.
    #[inline]
    pub fn total_energy(&self) -> f32 {
        self.total_energy
    }

    /// Returns how many edges are currently flagged dirty.
    #[inline]
    pub fn dirty_count(&self) -> usize {
        self.dirty_edges.len()
    }

    /// Returns the set of edges currently flagged dirty.
    pub fn dirty_edges(&self) -> &HashSet<EdgeId> {
        &self.dirty_edges
    }

    /// Records the fingerprint that the cached values correspond to.
    pub fn set_fingerprint(&mut self, fingerprint: impl Into<String>) {
        let fingerprint: String = fingerprint.into();
        self.last_fingerprint = fingerprint;
    }

    /// Forgets the energies remembered for removed edges.
    pub fn clear_removed(&mut self) {
        self.removed_energies.clear();
    }

    /// Resets the cache to its empty state: no edges, no dirty flags, no
    /// fingerprint, and a total energy of zero.
    pub fn clear(&mut self) {
        self.total_energy = 0.0;
        self.last_fingerprint.clear();
        self.edge_energies.clear();
        self.edge_residuals.clear();
        self.dirty_edges.clear();
        self.removed_energies.clear();
    }
}
/// Incremental energy computation layered over a borrowed `CoherenceEngine`.
pub struct IncrementalEngine<'a> {
// Engine that performs the actual energy computations.
engine: &'a CoherenceEngine,
// Settings chosen at construction time.
config: IncrementalConfig,
// Cached per-edge energies, dirty set and running total.
cache: IncrementalCache,
// Events recorded since the last `compute` call (only when `track_history` is on).
pending_events: Vec<UpdateEvent>,
// One sample per `compute` call, oldest first, capped at `config.history_size`.
energy_history: Vec<EnergyHistoryEntry>,
// Running counters over all `compute` calls.
stats: IncrementalStats,
}
/// One energy sample recorded by `IncrementalEngine::compute`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EnergyHistoryEntry {
// Total energy after the computation.
energy: f32,
// UTC time at which the sample was recorded.
timestamp: DateTime<Utc>,
// `true` when the incremental path produced this sample, `false` for a full recompute.
was_incremental: bool,
// Edge count reported for the computation that produced this sample.
edges_recomputed: usize,
}
/// Running counters over every `IncrementalEngine::compute` call.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct IncrementalStats {
// Number of `compute` calls.
total_updates: u64,
// Calls that took the incremental path.
incremental_updates: u64,
// Calls that took the full-recompute path.
full_recomputes: u64,
// Sum of the per-call `edges_recomputed` values.
total_edges_recomputed: u64,
// Sum of the per-call compute times, in microseconds.
total_time_us: u64,
}
impl<'a> IncrementalEngine<'a> {
    /// Creates an incremental wrapper around `engine`.
    ///
    /// The cache starts without a fingerprint, so the first
    /// [`compute`](Self::compute) call always performs a full recompute.
    pub fn new(engine: &'a CoherenceEngine, config: IncrementalConfig) -> Self {
        Self {
            engine,
            config,
            cache: IncrementalCache::new(),
            pending_events: Vec::new(),
            energy_history: Vec::new(),
            stats: IncrementalStats::default(),
        }
    }

    /// Notes that a node changed: every edge incident to it becomes dirty.
    ///
    /// A `NodeUpdated` event is recorded when history tracking is enabled.
    pub fn node_updated(&mut self, node_id: impl Into<NodeId>) {
        let node_id = node_id.into();
        let affected_edges = self.engine.edges_incident_to(&node_id);
        self.cache.mark_node_dirty(&affected_edges);

        if self.config.track_history {
            self.pending_events.push(UpdateEvent::NodeUpdated {
                node_id,
                affected_edges,
                timestamp: Utc::now(),
            });
        }
    }

    /// Notes that an edge was added: the edge becomes dirty so the next
    /// computation picks it up.
    ///
    /// An `EdgeAdded` event is recorded when history tracking is enabled.
    pub fn edge_added(&mut self, edge_id: impl Into<EdgeId>) {
        let edge_id = edge_id.into();
        self.cache.mark_dirty(edge_id.clone());

        if self.config.track_history {
            self.pending_events.push(UpdateEvent::EdgeAdded {
                edge_id,
                timestamp: Utc::now(),
            });
        }
    }

    /// Notes that an edge was removed: its cached energy leaves the running
    /// total immediately.
    ///
    /// An `EdgeRemoved` event carrying the energy that was cached for the
    /// edge (`0.0` if none) is recorded when history tracking is enabled.
    pub fn edge_removed(&mut self, edge_id: impl Into<EdgeId>) {
        let edge_id = edge_id.into();
        let old_energy = self.cache.get_energy(&edge_id).unwrap_or(0.0);
        self.cache.remove_edge(&edge_id);

        if self.config.track_history {
            self.pending_events.push(UpdateEvent::EdgeRemoved {
                edge_id,
                old_energy,
                timestamp: Utc::now(),
            });
        }
    }

    /// Brings the cached energy up to date and reports what changed.
    ///
    /// A full recompute is performed when incremental mode is disabled, when
    /// the dirty fraction exceeds `full_recompute_threshold`, or when the
    /// cache has no fingerprint yet; otherwise only dirty edges are
    /// recomputed. Statistics are updated on every call, an energy sample is
    /// appended when history tracking is enabled, and pending events and
    /// remembered removals are cleared.
    pub fn compute(&mut self) -> DeltaResult {
        let start = std::time::Instant::now();
        let old_energy = self.cache.total_energy();
        let total_edges = self.engine.edge_count();
        let dirty_count = self.cache.dirty_count();

        // With no edges there is no meaningful fraction; 1.0 selects the
        // full-recompute path for any threshold below 1.0.
        let dirty_ratio = if total_edges > 0 {
            dirty_count as f32 / total_edges as f32
        } else {
            1.0
        };

        let needs_full = !self.config.enabled
            || dirty_ratio > self.config.full_recompute_threshold
            || self.cache.last_fingerprint.is_empty();

        let (new_energy, edges_recomputed, was_full) = if needs_full {
            let energy = self.compute_full_internal();
            (energy.total_energy, energy.edge_count, true)
        } else {
            (self.compute_incremental_internal(), dirty_count, false)
        };

        let compute_time_us = start.elapsed().as_micros() as u64;
        let energy_delta = new_energy - old_energy;

        self.stats.total_updates += 1;
        if was_full {
            self.stats.full_recomputes += 1;
        } else {
            self.stats.incremental_updates += 1;
        }
        self.stats.total_edges_recomputed += edges_recomputed as u64;
        self.stats.total_time_us += compute_time_us;

        if self.config.track_history {
            self.energy_history.push(EnergyHistoryEntry {
                energy: new_energy,
                timestamp: Utc::now(),
                was_incremental: !was_full,
                edges_recomputed,
            });

            // Drop the oldest samples in one pass; popping the front one at a
            // time would shift the whole vector once per removed sample.
            let excess = self
                .energy_history
                .len()
                .saturating_sub(self.config.history_size);
            self.energy_history.drain(..excess);
        }

        self.pending_events.clear();
        self.cache.clear_removed();

        DeltaResult {
            energy_delta,
            new_energy,
            old_energy,
            edges_recomputed,
            total_edges,
            was_full_recompute: was_full,
            compute_time_us,
            timestamp: Utc::now(),
        }
    }

    /// Forces a full recompute and rebuilds the cache from its result.
    ///
    /// Unlike [`compute`](Self::compute), this does not touch statistics,
    /// energy history or pending events.
    pub fn compute_full(&mut self) -> CoherenceEnergy {
        self.compute_full_internal()
    }

    /// Returns the cached total energy without recomputing anything.
    #[inline]
    pub fn cached_energy(&self) -> f32 {
        self.cache.total_energy()
    }

    /// Returns how many edges are currently waiting to be recomputed.
    #[inline]
    pub fn dirty_count(&self) -> usize {
        self.cache.dirty_count()
    }

    /// Returns the fraction of `compute` calls that took the incremental
    /// path, or `0.0` before the first call.
    pub fn incremental_ratio(&self) -> f32 {
        if self.stats.total_updates > 0 {
            self.stats.incremental_updates as f32 / self.stats.total_updates as f32
        } else {
            0.0
        }
    }

    /// Returns the least-squares slope of the last `window` energy samples,
    /// in energy units per sample.
    ///
    /// Samples are taken oldest to newest, so a positive slope means the
    /// energy is rising. Returns `None` when fewer than `window` samples are
    /// available or when `window < 2` (a slope needs at least two points).
    pub fn energy_trend(&self, window: usize) -> Option<f32> {
        let available = self.energy_history.len();
        // With fewer than two points the regression denominator is zero and
        // the division would produce NaN.
        if window < 2 || available < window {
            return None;
        }

        // The history is stored oldest first; keeping that order makes the
        // x axis run forward in time so the slope has the intuitive sign.
        let recent = &self.energy_history[available - window..];

        let n = window as f32;
        let (sum_x, sum_y, sum_xy, sum_xx) = recent.iter().enumerate().fold(
            (0.0_f32, 0.0_f32, 0.0_f32, 0.0_f32),
            |(sx, sy, sxy, sxx), (i, entry)| {
                let x = i as f32;
                (sx + x, sy + entry.energy, sxy + x * entry.energy, sxx + x * x)
            },
        );

        Some((n * sum_xy - sum_x * sum_y) / (n * sum_xx - sum_x * sum_x))
    }

    /// Recomputes everything through the engine and rebuilds the cache,
    /// including its fingerprint, from the result.
    fn compute_full_internal(&mut self) -> CoherenceEnergy {
        let energy = self.engine.compute_energy();

        self.cache.clear();
        for (edge_id, edge_energy) in &energy.edge_energies {
            self.cache.update_edge(
                edge_id.clone(),
                edge_energy.energy,
                edge_energy.residual.clone(),
            );
        }
        self.cache.set_fingerprint(&energy.fingerprint);

        energy
    }

    /// Recomputes only the dirty edges and returns the updated total.
    ///
    /// NOTE(review): an edge whose recomputation fails is skipped silently;
    /// it keeps its previous cached energy and stays dirty, so it is retried
    /// on the next call. Confirm that this best-effort behaviour is intended.
    fn compute_incremental_internal(&mut self) -> f32 {
        let dirty_edges: Vec<EdgeId> = self.cache.dirty_edges().iter().cloned().collect();

        // Copy the shared engine reference out so the closure does not hold a
        // borrow of `self` while the cache is updated below.
        let engine = self.engine;
        let recompute = |edge_id: &EdgeId| {
            engine
                .compute_edge_energy(edge_id)
                .ok()
                .map(|edge_energy| (edge_id.clone(), edge_energy))
        };

        #[cfg(feature = "parallel")]
        let new_energies: Vec<(EdgeId, EdgeEnergy)> =
            dirty_edges.par_iter().filter_map(recompute).collect();
        #[cfg(not(feature = "parallel"))]
        let new_energies: Vec<(EdgeId, EdgeEnergy)> =
            dirty_edges.iter().filter_map(recompute).collect();

        for (edge_id, edge_energy) in new_energies {
            self.cache
                .update_edge(edge_id, edge_energy.energy, edge_energy.residual);
        }

        self.cache.set_fingerprint(self.engine.current_fingerprint());
        self.cache.total_energy()
    }
}
// Unit tests for the incremental cache, delta results, update events and the engine wrapper.
#[cfg(test)]
mod tests {
use super::*;
use crate::coherence::engine::CoherenceConfig;
// The running total follows edge updates and removals.
#[test]
fn test_incremental_cache() {
let mut cache = IncrementalCache::new();
cache.update_edge("e1", 1.0, vec![1.0]);
cache.update_edge("e2", 2.0, vec![1.4]);
assert_eq!(cache.total_energy(), 3.0);
assert_eq!(cache.get_energy("e1"), Some(1.0));
cache.remove_edge("e1");
assert_eq!(cache.total_energy(), 2.0);
assert_eq!(cache.get_energy("e1"), None);
}
// Marking an edge dirty invalidates the cache; updating the edge clears the flag.
#[test]
fn test_dirty_tracking() {
let mut cache = IncrementalCache::new();
cache.update_edge("e1", 1.0, vec![]);
cache.set_fingerprint("fp1");
assert_eq!(cache.dirty_count(), 0);
cache.mark_dirty("e1");
assert_eq!(cache.dirty_count(), 1);
assert!(!cache.is_valid("fp1"));
cache.update_edge("e1", 1.5, vec![]);
assert_eq!(cache.dirty_count(), 0);
}
// The first compute on a fresh wrapper takes the full-recompute path (the cache has no
// fingerprint yet) and leaves no dirty edges behind.
#[test]
fn test_incremental_engine() {
let engine = CoherenceEngine::new(CoherenceConfig::default());
engine.add_node("n1", vec![1.0, 0.0]).unwrap();
engine.add_node("n2", vec![0.0, 1.0]).unwrap();
engine.add_edge("n1", "n2", 1.0, None).unwrap();
let mut inc = IncrementalEngine::new(&engine, IncrementalConfig::default());
let result = inc.compute();
assert!(result.was_full_recompute);
assert_eq!(result.new_energy, 2.0);
assert_eq!(inc.dirty_count(), 0);
}
// A delta of +0.5 on an old energy of 2.0 is an increase with relative change 0.25.
#[test]
fn test_delta_result() {
let result = DeltaResult {
energy_delta: 0.5,
new_energy: 2.5,
old_energy: 2.0,
edges_recomputed: 1,
total_edges: 10,
was_full_recompute: false,
compute_time_us: 100,
timestamp: Utc::now(),
};
assert!(result.energy_increased());
assert!(!result.energy_decreased());
assert!((result.relative_change() - 0.25).abs() < 1e-6);
}
// A NodeUpdated event affects exactly the edges in its list.
#[test]
fn test_update_events() {
let event = UpdateEvent::NodeUpdated {
node_id: "n1".to_string(),
affected_edges: vec!["e1".to_string(), "e2".to_string()],
timestamp: Utc::now(),
};
assert!(event.affects_edge("e1"));
assert!(event.affects_edge("e2"));
assert!(!event.affects_edge("e3"));
}
// Samples are pushed oldest first with steadily rising energy, so the trend over the
// last four samples is expected to be positive.
#[test]
fn test_energy_trend() {
let engine = CoherenceEngine::default();
let mut inc = IncrementalEngine::new(
&engine,
IncrementalConfig {
track_history: true,
history_size: 10,
..Default::default()
},
);
for i in 0..5 {
inc.energy_history.push(EnergyHistoryEntry {
energy: i as f32 * 0.5,
timestamp: Utc::now(),
was_incremental: true,
edges_recomputed: 1,
});
}
let trend = inc.energy_trend(4);
assert!(trend.is_some());
assert!(trend.unwrap() > 0.0); }
}