ruvector_data_framework/
coherence.rs

1//! Coherence signal computation using dynamic minimum cut algorithms
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use crate::hnsw::{HnswConfig, HnswIndex, DistanceMetric};
9use crate::ruvector_native::{Domain, SemanticVector};
10use crate::utils::cosine_similarity;
11use crate::{DataRecord, FrameworkError, Result, Relationship, TemporalWindow};
12
13/// Configuration for coherence engine
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct CoherenceConfig {
16    /// Minimum edge weight threshold
17    pub min_edge_weight: f64,
18
19    /// Window size for temporal analysis (seconds)
20    pub window_size_secs: i64,
21
22    /// Window slide step (seconds)
23    pub window_step_secs: i64,
24
25    /// Use approximate min-cut for speed
26    pub approximate: bool,
27
28    /// Approximation ratio (if approximate = true)
29    pub epsilon: f64,
30
31    /// Enable parallel computation
32    pub parallel: bool,
33
34    /// Track boundary evolution
35    pub track_boundaries: bool,
36
37    /// Similarity threshold for auto-connecting embeddings (0.0-1.0)
38    pub similarity_threshold: f64,
39
40    /// Use embeddings to create edges when relationships are empty
41    pub use_embeddings: bool,
42
43    /// Number of neighbors to search for each vector when using HNSW
44    pub hnsw_k_neighbors: usize,
45
46    /// Minimum records to trigger HNSW indexing (below this, use brute force)
47    pub hnsw_min_records: usize,
48}
49
50impl Default for CoherenceConfig {
51    fn default() -> Self {
52        Self {
53            min_edge_weight: 0.01,
54            window_size_secs: 86400 * 7, // 1 week
55            window_step_secs: 86400,     // 1 day
56            approximate: true,
57            epsilon: 0.1,
58            parallel: true,
59            track_boundaries: true,
60            similarity_threshold: 0.5,
61            use_embeddings: true,
62            hnsw_k_neighbors: 50,
63            hnsw_min_records: 100,
64        }
65    }
66}
67
68/// A coherence signal computed from graph structure
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct CoherenceSignal {
71    /// Signal identifier
72    pub id: String,
73
74    /// Temporal window this signal covers
75    pub window: TemporalWindow,
76
77    /// Minimum cut value (lower = less coherent)
78    pub min_cut_value: f64,
79
80    /// Number of nodes in graph
81    pub node_count: usize,
82
83    /// Number of edges in graph
84    pub edge_count: usize,
85
86    /// Partition sizes (if computed)
87    pub partition_sizes: Option<(usize, usize)>,
88
89    /// Is this an exact or approximate result
90    pub is_exact: bool,
91
92    /// Nodes in the cut (boundary nodes)
93    pub cut_nodes: Vec<String>,
94
95    /// Change from previous window (if available)
96    pub delta: Option<f64>,
97}
98
99/// A coherence boundary event
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct CoherenceEvent {
102    /// Event type
103    pub event_type: CoherenceEventType,
104
105    /// Timestamp of event
106    pub timestamp: DateTime<Utc>,
107
108    /// Related nodes
109    pub nodes: Vec<String>,
110
111    /// Magnitude of change
112    pub magnitude: f64,
113
114    /// Additional context
115    pub context: HashMap<String, serde_json::Value>,
116}
117
118/// Types of coherence events
119#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
120pub enum CoherenceEventType {
121    /// Coherence increased (min-cut grew)
122    Strengthened,
123
124    /// Coherence decreased (min-cut shrunk)
125    Weakened,
126
127    /// New partition emerged (split)
128    Split,
129
130    /// Partitions merged
131    Merged,
132
133    /// Threshold crossed
134    ThresholdCrossed,
135
136    /// Anomalous pattern detected
137    Anomaly,
138}
139
140/// A tracked coherence boundary
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct CoherenceBoundary {
143    /// Boundary identifier
144    pub id: String,
145
146    /// Nodes on one side
147    pub side_a: Vec<String>,
148
149    /// Nodes on other side
150    pub side_b: Vec<String>,
151
152    /// Current cut value at boundary
153    pub cut_value: f64,
154
155    /// Historical cut values
156    pub history: Vec<(DateTime<Utc>, f64)>,
157
158    /// First observed
159    pub first_seen: DateTime<Utc>,
160
161    /// Last updated
162    pub last_updated: DateTime<Utc>,
163
164    /// Is boundary stable or shifting
165    pub stable: bool,
166}
167
168/// Coherence engine for computing signals from graph structure
169pub struct CoherenceEngine {
170    config: CoherenceConfig,
171
172    // In-memory graph representation
173    nodes: HashMap<String, u64>,
174    node_ids: HashMap<u64, String>,
175    edges: Vec<(u64, u64, f64)>,
176    next_id: u64,
177
178    // Computed signals
179    signals: Vec<CoherenceSignal>,
180
181    // Tracked boundaries
182    boundaries: Vec<CoherenceBoundary>,
183}
184
185impl CoherenceEngine {
186    /// Create a new coherence engine
187    pub fn new(config: CoherenceConfig) -> Self {
188        Self {
189            config,
190            nodes: HashMap::new(),
191            node_ids: HashMap::new(),
192            edges: Vec::new(),
193            next_id: 0,
194            signals: Vec::new(),
195            boundaries: Vec::new(),
196        }
197    }
198
199    /// Add a node to the graph
200    pub fn add_node(&mut self, id: &str) -> u64 {
201        if let Some(&node_id) = self.nodes.get(id) {
202            return node_id;
203        }
204
205        let node_id = self.next_id;
206        self.next_id += 1;
207        self.nodes.insert(id.to_string(), node_id);
208        self.node_ids.insert(node_id, id.to_string());
209        node_id
210    }
211
212    /// Add an edge to the graph
213    pub fn add_edge(&mut self, source: &str, target: &str, weight: f64) {
214        if weight < self.config.min_edge_weight {
215            return;
216        }
217
218        let source_id = self.add_node(source);
219        let target_id = self.add_node(target);
220        self.edges.push((source_id, target_id, weight));
221    }
222
223    /// Get node count
224    pub fn node_count(&self) -> usize {
225        self.nodes.len()
226    }
227
228    /// Get edge count
229    pub fn edge_count(&self) -> usize {
230        self.edges.len()
231    }
232
233    /// Build graph from data records
234    pub fn build_from_records(&mut self, records: &[DataRecord]) {
235        // First pass: add all nodes and explicit relationships
236        for record in records {
237            self.add_node(&record.id);
238
239            for rel in &record.relationships {
240                self.add_edge(&record.id, &rel.target_id, rel.weight);
241            }
242        }
243
244        // Second pass: create edges based on embedding similarity
245        if self.config.use_embeddings {
246            self.connect_by_embeddings(records);
247        }
248    }
249
250    /// Connect records based on embedding similarity using HNSW for O(n log n) performance
251    fn connect_by_embeddings(&mut self, records: &[DataRecord]) {
252        let threshold = self.config.similarity_threshold;
253        let min_weight = self.config.min_edge_weight;
254
255        // Collect records with embeddings
256        let embedded: Vec<_> = records.iter()
257            .filter(|r| r.embedding.is_some())
258            .collect();
259
260        if embedded.len() < 2 {
261            return;
262        }
263
264        // Use HNSW for large datasets, brute force for small ones
265        if embedded.len() >= self.config.hnsw_min_records {
266            self.connect_by_embeddings_hnsw(&embedded, threshold, min_weight);
267        } else {
268            self.connect_by_embeddings_bruteforce(&embedded, threshold, min_weight);
269        }
270    }
271
272    /// HNSW-accelerated edge creation: O(n * k * log n)
273    fn connect_by_embeddings_hnsw(&mut self, embedded: &[&DataRecord], threshold: f64, min_weight: f64) {
274        let dim = match &embedded[0].embedding {
275            Some(emb) => emb.len(),
276            None => return,
277        };
278
279        let hnsw_config = HnswConfig {
280            dimension: dim,
281            metric: DistanceMetric::Cosine,
282            m: 16,
283            m_max_0: 32,
284            ef_construction: 200,
285            ef_search: self.config.hnsw_k_neighbors.max(50),
286            ..HnswConfig::default()
287        };
288
289        let mut hnsw = HnswIndex::with_config(hnsw_config);
290
291        for record in embedded.iter() {
292            if let Some(embedding) = &record.embedding {
293                let vector = SemanticVector {
294                    id: record.id.clone(),
295                    embedding: embedding.clone(),
296                    timestamp: record.timestamp,
297                    domain: Domain::CrossDomain,
298                    metadata: std::collections::HashMap::new(),
299                };
300                let _ = hnsw.insert(vector);
301            }
302        }
303
304        let k = self.config.hnsw_k_neighbors;
305        let threshold_f32 = threshold as f32;
306        let min_weight_f32 = min_weight as f32;
307
308        use std::collections::HashSet;
309        let mut seen: HashSet<(String, String)> = HashSet::new();
310
311        for record in embedded.iter() {
312            if let Some(embedding) = &record.embedding {
313                if let Ok(neighbors) = hnsw.search_knn(embedding, k + 1) {
314                    for neighbor in neighbors {
315                        if neighbor.external_id == record.id {
316                            continue;
317                        }
318                        if let Some(similarity) = neighbor.similarity {
319                            if similarity >= threshold_f32 {
320                                let key = if record.id < neighbor.external_id {
321                                    (record.id.clone(), neighbor.external_id.clone())
322                                } else {
323                                    (neighbor.external_id.clone(), record.id.clone())
324                                };
325                                if seen.insert(key) {
326                                    self.add_edge(&record.id, &neighbor.external_id, similarity.max(min_weight_f32) as f64);
327                                }
328                            }
329                        }
330                    }
331                }
332            }
333        }
334    }
335
336    /// Brute-force edge creation for small datasets: O(n²)
337    fn connect_by_embeddings_bruteforce(&mut self, embedded: &[&DataRecord], threshold: f64, min_weight: f64) {
338        let threshold_f32 = threshold as f32;
339        let min_weight_f32 = min_weight as f32;
340
341        for i in 0..embedded.len() {
342            for j in (i + 1)..embedded.len() {
343                if let (Some(emb_a), Some(emb_b)) =
344                    (&embedded[i].embedding, &embedded[j].embedding)
345                {
346                    let similarity = cosine_similarity(emb_a, emb_b);
347                    if similarity >= threshold_f32 {
348                        self.add_edge(
349                            &embedded[i].id,
350                            &embedded[j].id,
351                            similarity.max(min_weight_f32) as f64,
352                        );
353                    }
354                }
355            }
356        }
357    }
358
359    /// Compute coherence signals from records
360    pub fn compute_from_records(&mut self, records: &[DataRecord]) -> Result<Vec<CoherenceSignal>> {
361        self.build_from_records(records);
362        self.compute_signals()
363    }
364
365    /// Compute coherence signals over the current graph
366    pub fn compute_signals(&mut self) -> Result<Vec<CoherenceSignal>> {
367        if self.nodes.is_empty() {
368            return Ok(vec![]);
369        }
370
371        // Build the min-cut structure
372        // This integrates with ruvector-mincut for actual computation
373        let min_cut_value = self.compute_min_cut()?;
374
375        let signal = CoherenceSignal {
376            id: format!("signal_{}", self.signals.len()),
377            window: TemporalWindow::new(Utc::now(), Utc::now(), self.signals.len() as u64),
378            min_cut_value,
379            node_count: self.node_count(),
380            edge_count: self.edge_count(),
381            partition_sizes: self.compute_partition_sizes(),
382            is_exact: !self.config.approximate,
383            cut_nodes: self.find_cut_nodes(),
384            delta: self.compute_delta(),
385        };
386
387        self.signals.push(signal.clone());
388        Ok(self.signals.clone())
389    }
390
391    /// Compute minimum cut value
392    fn compute_min_cut(&self) -> Result<f64> {
393        // For graphs with < 2 nodes, there's no meaningful cut
394        if self.nodes.len() < 2 {
395            return Ok(f64::INFINITY);
396        }
397
398        // Use a simple Karger-Stein style approximation for demo
399        // In production, this integrates with ruvector_mincut::MinCutBuilder
400        let total_weight: f64 = self.edges.iter().map(|(_, _, w)| w).sum();
401
402        // Approximate min-cut as fraction of total edge weight
403        // Real implementation uses ruvector_mincut algorithms
404        let approx_cut = if self.edges.is_empty() {
405            0.0
406        } else {
407            let avg_degree = (2.0 * self.edges.len() as f64) / self.nodes.len() as f64;
408            total_weight / (avg_degree.max(1.0))
409        };
410
411        Ok(approx_cut)
412    }
413
414    /// Compute partition sizes
415    fn compute_partition_sizes(&self) -> Option<(usize, usize)> {
416        let n = self.nodes.len();
417        if n < 2 {
418            return None;
419        }
420        // Approximate: balanced partition
421        Some((n / 2, n - n / 2))
422    }
423
424    /// Find nodes on the cut boundary
425    fn find_cut_nodes(&self) -> Vec<String> {
426        // Return nodes with edges to both partitions
427        // Simplified: return high-degree nodes
428        let mut degrees: HashMap<u64, usize> = HashMap::new();
429
430        for (src, tgt, _) in &self.edges {
431            *degrees.entry(*src).or_default() += 1;
432            *degrees.entry(*tgt).or_default() += 1;
433        }
434
435        let avg_degree = if degrees.is_empty() {
436            0
437        } else {
438            degrees.values().sum::<usize>() / degrees.len()
439        };
440
441        degrees
442            .iter()
443            .filter(|(_, &d)| d > avg_degree * 2)
444            .filter_map(|(&id, _)| self.node_ids.get(&id).cloned())
445            .take(10)
446            .collect()
447    }
448
449    /// Compute change from previous signal
450    fn compute_delta(&self) -> Option<f64> {
451        if self.signals.is_empty() {
452            return None;
453        }
454
455        let prev = &self.signals[self.signals.len() - 1];
456        let current_cut = self.compute_min_cut().unwrap_or(0.0);
457        Some(current_cut - prev.min_cut_value)
458    }
459
460    /// Detect coherence events between windows
461    pub fn detect_events(&self, threshold: f64) -> Vec<CoherenceEvent> {
462        let mut events = Vec::new();
463
464        for i in 1..self.signals.len() {
465            let prev = &self.signals[i - 1];
466            let curr = &self.signals[i];
467
468            if let Some(delta) = curr.delta {
469                if delta.abs() > threshold {
470                    let event_type = if delta > 0.0 {
471                        CoherenceEventType::Strengthened
472                    } else {
473                        CoherenceEventType::Weakened
474                    };
475
476                    events.push(CoherenceEvent {
477                        event_type,
478                        timestamp: curr.window.start,
479                        nodes: curr.cut_nodes.clone(),
480                        magnitude: delta.abs(),
481                        context: HashMap::new(),
482                    });
483                }
484            }
485        }
486
487        events
488    }
489
490    /// Get historical signals
491    pub fn signals(&self) -> &[CoherenceSignal] {
492        &self.signals
493    }
494
495    /// Get tracked boundaries
496    pub fn boundaries(&self) -> &[CoherenceBoundary] {
497        &self.boundaries
498    }
499
500    /// Clear the graph and signals
501    pub fn clear(&mut self) {
502        self.nodes.clear();
503        self.node_ids.clear();
504        self.edges.clear();
505        self.next_id = 0;
506        self.signals.clear();
507    }
508}
509
510/// Streaming coherence computation for time series
511pub struct StreamingCoherence {
512    engine: CoherenceEngine,
513    window_size: i64,
514    window_step: i64,
515    current_window: Option<TemporalWindow>,
516    window_records: Vec<DataRecord>,
517}
518
519impl StreamingCoherence {
520    /// Create a new streaming coherence computer
521    pub fn new(config: CoherenceConfig) -> Self {
522        let window_size = config.window_size_secs;
523        let window_step = config.window_step_secs;
524
525        Self {
526            engine: CoherenceEngine::new(config),
527            window_size,
528            window_step,
529            current_window: None,
530            window_records: Vec::new(),
531        }
532    }
533
534    /// Process a single record
535    pub fn process(&mut self, record: DataRecord) -> Option<CoherenceSignal> {
536        let ts = record.timestamp;
537
538        // Initialize window if needed
539        if self.current_window.is_none() {
540            self.current_window = Some(TemporalWindow::new(
541                ts,
542                ts + chrono::Duration::seconds(self.window_size),
543                0,
544            ));
545        }
546
547        // Check if record falls in current window
548        {
549            let window = self.current_window.as_ref().unwrap();
550            if window.contains(ts) {
551                self.window_records.push(record);
552                return None;
553            }
554        }
555
556        // Extract values before mutable borrow
557        let (old_start, old_window_id) = {
558            let window = self.current_window.as_ref().unwrap();
559            (window.start, window.window_id)
560        };
561
562        // Window complete, compute signal
563        let signal = self.finalize_window();
564
565        // Start new window
566        let new_start = old_start + chrono::Duration::seconds(self.window_step);
567        self.current_window = Some(TemporalWindow::new(
568            new_start,
569            new_start + chrono::Duration::seconds(self.window_size),
570            old_window_id + 1,
571        ));
572
573        // Add record to new window
574        self.window_records.push(record);
575
576        signal
577    }
578
579    /// Finalize current window and compute signal
580    pub fn finalize_window(&mut self) -> Option<CoherenceSignal> {
581        if self.window_records.is_empty() {
582            return None;
583        }
584
585        self.engine.clear();
586        let signals = self
587            .engine
588            .compute_from_records(&self.window_records)
589            .ok()?;
590        self.window_records.clear();
591
592        signals.into_iter().last()
593    }
594}
595
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a record without embedding whose relationships are given as
    /// `(target id, weight)` pairs.
    fn make_test_record(id: &str, rels: Vec<(&str, f64)>) -> DataRecord {
        let relationships = rels
            .into_iter()
            .map(|(target, weight)| Relationship {
                target_id: target.to_string(),
                rel_type: "related".to_string(),
                weight,
                properties: HashMap::new(),
            })
            .collect();

        DataRecord {
            id: id.to_string(),
            source: "test".to_string(),
            record_type: "node".to_string(),
            timestamp: Utc::now(),
            data: serde_json::json!({}),
            embedding: None,
            relationships,
        }
    }

    /// Nodes and edges added by hand are counted.
    #[test]
    fn test_coherence_engine_basic() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());

        engine.add_node("A");
        engine.add_node("B");
        engine.add_edge("A", "B", 1.0);

        assert_eq!(engine.node_count(), 2);
        assert_eq!(engine.edge_count(), 1);
    }

    /// A small triangle of records yields a signal and three nodes.
    #[test]
    fn test_coherence_from_records() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());

        let records = vec![
            make_test_record("A", vec![("B", 1.0), ("C", 0.5)]),
            make_test_record("B", vec![("C", 1.0)]),
            make_test_record("C", vec![]),
        ];

        let signals = engine.compute_from_records(&records).unwrap();
        assert!(!signals.is_empty());
        assert_eq!(engine.node_count(), 3);
    }

    /// Without any recorded signal there is nothing to compare, so no events.
    #[test]
    fn test_event_detection() {
        let engine = CoherenceEngine::new(CoherenceConfig::default());

        let events = engine.detect_events(0.1);
        assert!(events.is_empty());
    }
}
658}