// ruvector_data_framework/coherence.rs

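//! Coherence signal computation using dynamic minimum cut algorithms.
//!
//! NOTE: the minimum-cut values computed in this module are lightweight estimates;
//! integration with `ruvector_mincut` for the actual algorithms is still pending.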
2
3use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use crate::{DataRecord, FrameworkError, Result, Relationship, TemporalWindow};
9
10/// Configuration for coherence engine
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct CoherenceConfig {
13    /// Minimum edge weight threshold
14    pub min_edge_weight: f64,
15
16    /// Window size for temporal analysis (seconds)
17    pub window_size_secs: i64,
18
19    /// Window slide step (seconds)
20    pub window_step_secs: i64,
21
22    /// Use approximate min-cut for speed
23    pub approximate: bool,
24
25    /// Approximation ratio (if approximate = true)
26    pub epsilon: f64,
27
28    /// Enable parallel computation
29    pub parallel: bool,
30
31    /// Track boundary evolution
32    pub track_boundaries: bool,
33}
34
35impl Default for CoherenceConfig {
36    fn default() -> Self {
37        Self {
38            min_edge_weight: 0.01,
39            window_size_secs: 86400 * 7, // 1 week
40            window_step_secs: 86400,     // 1 day
41            approximate: true,
42            epsilon: 0.1,
43            parallel: true,
44            track_boundaries: true,
45        }
46    }
47}
48
49/// A coherence signal computed from graph structure
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct CoherenceSignal {
52    /// Signal identifier
53    pub id: String,
54
55    /// Temporal window this signal covers
56    pub window: TemporalWindow,
57
58    /// Minimum cut value (lower = less coherent)
59    pub min_cut_value: f64,
60
61    /// Number of nodes in graph
62    pub node_count: usize,
63
64    /// Number of edges in graph
65    pub edge_count: usize,
66
67    /// Partition sizes (if computed)
68    pub partition_sizes: Option<(usize, usize)>,
69
70    /// Is this an exact or approximate result
71    pub is_exact: bool,
72
73    /// Nodes in the cut (boundary nodes)
74    pub cut_nodes: Vec<String>,
75
76    /// Change from previous window (if available)
77    pub delta: Option<f64>,
78}
79
80/// A coherence boundary event
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct CoherenceEvent {
83    /// Event type
84    pub event_type: CoherenceEventType,
85
86    /// Timestamp of event
87    pub timestamp: DateTime<Utc>,
88
89    /// Related nodes
90    pub nodes: Vec<String>,
91
92    /// Magnitude of change
93    pub magnitude: f64,
94
95    /// Additional context
96    pub context: HashMap<String, serde_json::Value>,
97}
98
99/// Types of coherence events
100#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
101pub enum CoherenceEventType {
102    /// Coherence increased (min-cut grew)
103    Strengthened,
104
105    /// Coherence decreased (min-cut shrunk)
106    Weakened,
107
108    /// New partition emerged (split)
109    Split,
110
111    /// Partitions merged
112    Merged,
113
114    /// Threshold crossed
115    ThresholdCrossed,
116
117    /// Anomalous pattern detected
118    Anomaly,
119}
120
121/// A tracked coherence boundary
122#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct CoherenceBoundary {
124    /// Boundary identifier
125    pub id: String,
126
127    /// Nodes on one side
128    pub side_a: Vec<String>,
129
130    /// Nodes on other side
131    pub side_b: Vec<String>,
132
133    /// Current cut value at boundary
134    pub cut_value: f64,
135
136    /// Historical cut values
137    pub history: Vec<(DateTime<Utc>, f64)>,
138
139    /// First observed
140    pub first_seen: DateTime<Utc>,
141
142    /// Last updated
143    pub last_updated: DateTime<Utc>,
144
145    /// Is boundary stable or shifting
146    pub stable: bool,
147}
148
149/// Coherence engine for computing signals from graph structure
150pub struct CoherenceEngine {
151    config: CoherenceConfig,
152
153    // In-memory graph representation
154    nodes: HashMap<String, u64>,
155    node_ids: HashMap<u64, String>,
156    edges: Vec<(u64, u64, f64)>,
157    next_id: u64,
158
159    // Computed signals
160    signals: Vec<CoherenceSignal>,
161
162    // Tracked boundaries
163    boundaries: Vec<CoherenceBoundary>,
164}
165
166impl CoherenceEngine {
167    /// Create a new coherence engine
168    pub fn new(config: CoherenceConfig) -> Self {
169        Self {
170            config,
171            nodes: HashMap::new(),
172            node_ids: HashMap::new(),
173            edges: Vec::new(),
174            next_id: 0,
175            signals: Vec::new(),
176            boundaries: Vec::new(),
177        }
178    }
179
180    /// Add a node to the graph
181    pub fn add_node(&mut self, id: &str) -> u64 {
182        if let Some(&node_id) = self.nodes.get(id) {
183            return node_id;
184        }
185
186        let node_id = self.next_id;
187        self.next_id += 1;
188        self.nodes.insert(id.to_string(), node_id);
189        self.node_ids.insert(node_id, id.to_string());
190        node_id
191    }
192
193    /// Add an edge to the graph
194    pub fn add_edge(&mut self, source: &str, target: &str, weight: f64) {
195        if weight < self.config.min_edge_weight {
196            return;
197        }
198
199        let source_id = self.add_node(source);
200        let target_id = self.add_node(target);
201        self.edges.push((source_id, target_id, weight));
202    }
203
204    /// Get node count
205    pub fn node_count(&self) -> usize {
206        self.nodes.len()
207    }
208
209    /// Get edge count
210    pub fn edge_count(&self) -> usize {
211        self.edges.len()
212    }
213
214    /// Build graph from data records
215    pub fn build_from_records(&mut self, records: &[DataRecord]) {
216        for record in records {
217            self.add_node(&record.id);
218
219            for rel in &record.relationships {
220                self.add_edge(&record.id, &rel.target_id, rel.weight);
221            }
222        }
223    }
224
225    /// Compute coherence signals from records
226    pub fn compute_from_records(&mut self, records: &[DataRecord]) -> Result<Vec<CoherenceSignal>> {
227        self.build_from_records(records);
228        self.compute_signals()
229    }
230
231    /// Compute coherence signals over the current graph
232    pub fn compute_signals(&mut self) -> Result<Vec<CoherenceSignal>> {
233        if self.nodes.is_empty() {
234            return Ok(vec![]);
235        }
236
237        // Build the min-cut structure
238        // This integrates with ruvector-mincut for actual computation
239        let min_cut_value = self.compute_min_cut()?;
240
241        let signal = CoherenceSignal {
242            id: format!("signal_{}", self.signals.len()),
243            window: TemporalWindow::new(Utc::now(), Utc::now(), self.signals.len() as u64),
244            min_cut_value,
245            node_count: self.node_count(),
246            edge_count: self.edge_count(),
247            partition_sizes: self.compute_partition_sizes(),
248            is_exact: !self.config.approximate,
249            cut_nodes: self.find_cut_nodes(),
250            delta: self.compute_delta(),
251        };
252
253        self.signals.push(signal.clone());
254        Ok(self.signals.clone())
255    }
256
257    /// Compute minimum cut value
258    fn compute_min_cut(&self) -> Result<f64> {
259        // For graphs with < 2 nodes, there's no meaningful cut
260        if self.nodes.len() < 2 {
261            return Ok(f64::INFINITY);
262        }
263
264        // Use a simple Karger-Stein style approximation for demo
265        // In production, this integrates with ruvector_mincut::MinCutBuilder
266        let total_weight: f64 = self.edges.iter().map(|(_, _, w)| w).sum();
267
268        // Approximate min-cut as fraction of total edge weight
269        // Real implementation uses ruvector_mincut algorithms
270        let approx_cut = if self.edges.is_empty() {
271            0.0
272        } else {
273            let avg_degree = (2.0 * self.edges.len() as f64) / self.nodes.len() as f64;
274            total_weight / (avg_degree.max(1.0))
275        };
276
277        Ok(approx_cut)
278    }
279
280    /// Compute partition sizes
281    fn compute_partition_sizes(&self) -> Option<(usize, usize)> {
282        let n = self.nodes.len();
283        if n < 2 {
284            return None;
285        }
286        // Approximate: balanced partition
287        Some((n / 2, n - n / 2))
288    }
289
290    /// Find nodes on the cut boundary
291    fn find_cut_nodes(&self) -> Vec<String> {
292        // Return nodes with edges to both partitions
293        // Simplified: return high-degree nodes
294        let mut degrees: HashMap<u64, usize> = HashMap::new();
295
296        for (src, tgt, _) in &self.edges {
297            *degrees.entry(*src).or_default() += 1;
298            *degrees.entry(*tgt).or_default() += 1;
299        }
300
301        let avg_degree = if degrees.is_empty() {
302            0
303        } else {
304            degrees.values().sum::<usize>() / degrees.len()
305        };
306
307        degrees
308            .iter()
309            .filter(|(_, &d)| d > avg_degree * 2)
310            .filter_map(|(&id, _)| self.node_ids.get(&id).cloned())
311            .take(10)
312            .collect()
313    }
314
315    /// Compute change from previous signal
316    fn compute_delta(&self) -> Option<f64> {
317        if self.signals.is_empty() {
318            return None;
319        }
320
321        let prev = &self.signals[self.signals.len() - 1];
322        let current_cut = self.compute_min_cut().unwrap_or(0.0);
323        Some(current_cut - prev.min_cut_value)
324    }
325
326    /// Detect coherence events between windows
327    pub fn detect_events(&self, threshold: f64) -> Vec<CoherenceEvent> {
328        let mut events = Vec::new();
329
330        for i in 1..self.signals.len() {
331            let prev = &self.signals[i - 1];
332            let curr = &self.signals[i];
333
334            if let Some(delta) = curr.delta {
335                if delta.abs() > threshold {
336                    let event_type = if delta > 0.0 {
337                        CoherenceEventType::Strengthened
338                    } else {
339                        CoherenceEventType::Weakened
340                    };
341
342                    events.push(CoherenceEvent {
343                        event_type,
344                        timestamp: curr.window.start,
345                        nodes: curr.cut_nodes.clone(),
346                        magnitude: delta.abs(),
347                        context: HashMap::new(),
348                    });
349                }
350            }
351        }
352
353        events
354    }
355
356    /// Get historical signals
357    pub fn signals(&self) -> &[CoherenceSignal] {
358        &self.signals
359    }
360
361    /// Get tracked boundaries
362    pub fn boundaries(&self) -> &[CoherenceBoundary] {
363        &self.boundaries
364    }
365
366    /// Clear the graph and signals
367    pub fn clear(&mut self) {
368        self.nodes.clear();
369        self.node_ids.clear();
370        self.edges.clear();
371        self.next_id = 0;
372        self.signals.clear();
373    }
374}
375
376/// Streaming coherence computation for time series
377pub struct StreamingCoherence {
378    engine: CoherenceEngine,
379    window_size: i64,
380    window_step: i64,
381    current_window: Option<TemporalWindow>,
382    window_records: Vec<DataRecord>,
383}
384
385impl StreamingCoherence {
386    /// Create a new streaming coherence computer
387    pub fn new(config: CoherenceConfig) -> Self {
388        let window_size = config.window_size_secs;
389        let window_step = config.window_step_secs;
390
391        Self {
392            engine: CoherenceEngine::new(config),
393            window_size,
394            window_step,
395            current_window: None,
396            window_records: Vec::new(),
397        }
398    }
399
400    /// Process a single record
401    pub fn process(&mut self, record: DataRecord) -> Option<CoherenceSignal> {
402        let ts = record.timestamp;
403
404        // Initialize window if needed
405        if self.current_window.is_none() {
406            self.current_window = Some(TemporalWindow::new(
407                ts,
408                ts + chrono::Duration::seconds(self.window_size),
409                0,
410            ));
411        }
412
413        // Check if record falls in current window
414        {
415            let window = self.current_window.as_ref().unwrap();
416            if window.contains(ts) {
417                self.window_records.push(record);
418                return None;
419            }
420        }
421
422        // Extract values before mutable borrow
423        let (old_start, old_window_id) = {
424            let window = self.current_window.as_ref().unwrap();
425            (window.start, window.window_id)
426        };
427
428        // Window complete, compute signal
429        let signal = self.finalize_window();
430
431        // Start new window
432        let new_start = old_start + chrono::Duration::seconds(self.window_step);
433        self.current_window = Some(TemporalWindow::new(
434            new_start,
435            new_start + chrono::Duration::seconds(self.window_size),
436            old_window_id + 1,
437        ));
438
439        // Add record to new window
440        self.window_records.push(record);
441
442        signal
443    }
444
445    /// Finalize current window and compute signal
446    pub fn finalize_window(&mut self) -> Option<CoherenceSignal> {
447        if self.window_records.is_empty() {
448            return None;
449        }
450
451        self.engine.clear();
452        let signals = self
453            .engine
454            .compute_from_records(&self.window_records)
455            .ok()?;
456        self.window_records.clear();
457
458        signals.into_iter().last()
459    }
460}
461
#[cfg(test)]
mod tests {
    use super::*;

    fn make_test_record(id: &str, rels: Vec<(&str, f64)>) -> DataRecord {
        DataRecord {
            id: id.to_string(),
            source: "test".to_string(),
            record_type: "node".to_string(),
            timestamp: Utc::now(),
            data: serde_json::json!({}),
            embedding: None,
            relationships: rels
                .into_iter()
                .map(|(target, weight)| Relationship {
                    target_id: target.to_string(),
                    rel_type: "related".to_string(),
                    weight,
                    properties: HashMap::new(),
                })
                .collect(),
        }
    }

    /// Engine holding a unit-weight triangle A-B-C.
    fn triangle_engine() -> CoherenceEngine {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());
        engine.add_edge("A", "B", 1.0);
        engine.add_edge("B", "C", 1.0);
        engine.add_edge("C", "A", 1.0);
        engine
    }

    #[test]
    fn test_coherence_engine_basic() {
        let config = CoherenceConfig::default();
        let mut engine = CoherenceEngine::new(config);

        engine.add_node("A");
        engine.add_node("B");
        engine.add_edge("A", "B", 1.0);

        assert_eq!(engine.node_count(), 2);
        assert_eq!(engine.edge_count(), 1);
    }

    #[test]
    fn test_add_node_is_idempotent() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());
        let first = engine.add_node("A");
        let second = engine.add_node("A");

        assert_eq!(first, second);
        assert_eq!(engine.node_count(), 1);
    }

    #[test]
    fn test_edge_below_threshold_is_dropped() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());
        engine.add_edge("A", "B", 0.001);

        // Neither the edge nor its endpoints are added.
        assert_eq!(engine.node_count(), 0);
        assert_eq!(engine.edge_count(), 0);
    }

    #[test]
    fn test_coherence_from_records() {
        let config = CoherenceConfig::default();
        let mut engine = CoherenceEngine::new(config);

        let records = vec![
            make_test_record("A", vec![("B", 1.0), ("C", 0.5)]),
            make_test_record("B", vec![("C", 1.0)]),
            make_test_record("C", vec![]),
        ];

        let signals = engine.compute_from_records(&records).unwrap();
        assert!(!signals.is_empty());
        assert_eq!(engine.node_count(), 3);
    }

    #[test]
    fn test_empty_graph_produces_no_signal() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());

        assert!(engine.compute_signals().unwrap().is_empty());
        assert!(engine.signals().is_empty());
    }

    #[test]
    fn test_delta_tracks_previous_signal() {
        let mut engine = triangle_engine();
        engine.compute_signals().unwrap();
        let signals = engine.compute_signals().unwrap();

        assert_eq!(signals.len(), 2);
        assert!(signals[0].delta.is_none());
        // Same graph twice: the cut did not change.
        assert_eq!(signals[1].delta, Some(0.0));
    }

    #[test]
    fn test_event_detection() {
        // No signals: nothing to compare.
        let engine = CoherenceEngine::new(CoherenceConfig::default());
        assert!(engine.detect_events(0.1).is_empty());

        // Changing the graph between two signals yields exactly one event.
        let mut engine = triangle_engine();
        engine.compute_signals().unwrap();
        engine.add_node("D");
        engine.compute_signals().unwrap();

        let events = engine.detect_events(0.1);
        assert_eq!(events.len(), 1);
        assert!(events[0].magnitude > 0.1);

        // A threshold no change can exceed suppresses the event.
        assert!(engine.detect_events(f64::MAX).is_empty());
    }

    #[test]
    fn test_clear_resets_graph_and_signals() {
        let mut engine = triangle_engine();
        engine.compute_signals().unwrap();
        engine.clear();

        assert_eq!(engine.node_count(), 0);
        assert_eq!(engine.edge_count(), 0);
        assert!(engine.signals().is_empty());
        // Internal ids restart from zero after a clear.
        assert_eq!(engine.add_node("X"), 0);
    }

    #[test]
    fn test_streaming_finalize_flushes_window() {
        let mut streaming = StreamingCoherence::new(CoherenceConfig::default());
        assert!(streaming
            .process(make_test_record("A", vec![("B", 1.0)]))
            .is_none());

        let signal = streaming
            .finalize_window()
            .expect("a buffered record should produce a signal");
        assert_eq!(signal.node_count, 2);
        assert_eq!(signal.edge_count, 1);

        // The buffer was drained, so there is nothing left to flush.
        assert!(streaming.finalize_window().is_none());
    }
}