1use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use crate::hnsw::{HnswConfig, HnswIndex, DistanceMetric};
9use crate::ruvector_native::{Domain, SemanticVector};
10use crate::utils::cosine_similarity;
11use crate::{DataRecord, FrameworkError, Result, Relationship, TemporalWindow};
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct CoherenceConfig {
16 pub min_edge_weight: f64,
18
19 pub window_size_secs: i64,
21
22 pub window_step_secs: i64,
24
25 pub approximate: bool,
27
28 pub epsilon: f64,
30
31 pub parallel: bool,
33
34 pub track_boundaries: bool,
36
37 pub similarity_threshold: f64,
39
40 pub use_embeddings: bool,
42
43 pub hnsw_k_neighbors: usize,
45
46 pub hnsw_min_records: usize,
48}
49
50impl Default for CoherenceConfig {
51 fn default() -> Self {
52 Self {
53 min_edge_weight: 0.01,
54 window_size_secs: 86400 * 7, window_step_secs: 86400, approximate: true,
57 epsilon: 0.1,
58 parallel: true,
59 track_boundaries: true,
60 similarity_threshold: 0.5,
61 use_embeddings: true,
62 hnsw_k_neighbors: 50,
63 hnsw_min_records: 100,
64 }
65 }
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct CoherenceSignal {
71 pub id: String,
73
74 pub window: TemporalWindow,
76
77 pub min_cut_value: f64,
79
80 pub node_count: usize,
82
83 pub edge_count: usize,
85
86 pub partition_sizes: Option<(usize, usize)>,
88
89 pub is_exact: bool,
91
92 pub cut_nodes: Vec<String>,
94
95 pub delta: Option<f64>,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct CoherenceEvent {
102 pub event_type: CoherenceEventType,
104
105 pub timestamp: DateTime<Utc>,
107
108 pub nodes: Vec<String>,
110
111 pub magnitude: f64,
113
114 pub context: HashMap<String, serde_json::Value>,
116}
117
118#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
120pub enum CoherenceEventType {
121 Strengthened,
123
124 Weakened,
126
127 Split,
129
130 Merged,
132
133 ThresholdCrossed,
135
136 Anomaly,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct CoherenceBoundary {
143 pub id: String,
145
146 pub side_a: Vec<String>,
148
149 pub side_b: Vec<String>,
151
152 pub cut_value: f64,
154
155 pub history: Vec<(DateTime<Utc>, f64)>,
157
158 pub first_seen: DateTime<Utc>,
160
161 pub last_updated: DateTime<Utc>,
163
164 pub stable: bool,
166}
167
168pub struct CoherenceEngine {
170 config: CoherenceConfig,
171
172 nodes: HashMap<String, u64>,
174 node_ids: HashMap<u64, String>,
175 edges: Vec<(u64, u64, f64)>,
176 next_id: u64,
177
178 signals: Vec<CoherenceSignal>,
180
181 boundaries: Vec<CoherenceBoundary>,
183}
184
185impl CoherenceEngine {
186 pub fn new(config: CoherenceConfig) -> Self {
188 Self {
189 config,
190 nodes: HashMap::new(),
191 node_ids: HashMap::new(),
192 edges: Vec::new(),
193 next_id: 0,
194 signals: Vec::new(),
195 boundaries: Vec::new(),
196 }
197 }
198
199 pub fn add_node(&mut self, id: &str) -> u64 {
201 if let Some(&node_id) = self.nodes.get(id) {
202 return node_id;
203 }
204
205 let node_id = self.next_id;
206 self.next_id += 1;
207 self.nodes.insert(id.to_string(), node_id);
208 self.node_ids.insert(node_id, id.to_string());
209 node_id
210 }
211
212 pub fn add_edge(&mut self, source: &str, target: &str, weight: f64) {
214 if weight < self.config.min_edge_weight {
215 return;
216 }
217
218 let source_id = self.add_node(source);
219 let target_id = self.add_node(target);
220 self.edges.push((source_id, target_id, weight));
221 }
222
223 pub fn node_count(&self) -> usize {
225 self.nodes.len()
226 }
227
228 pub fn edge_count(&self) -> usize {
230 self.edges.len()
231 }
232
233 pub fn build_from_records(&mut self, records: &[DataRecord]) {
235 for record in records {
237 self.add_node(&record.id);
238
239 for rel in &record.relationships {
240 self.add_edge(&record.id, &rel.target_id, rel.weight);
241 }
242 }
243
244 if self.config.use_embeddings {
246 self.connect_by_embeddings(records);
247 }
248 }
249
250 fn connect_by_embeddings(&mut self, records: &[DataRecord]) {
252 let threshold = self.config.similarity_threshold;
253 let min_weight = self.config.min_edge_weight;
254
255 let embedded: Vec<_> = records.iter()
257 .filter(|r| r.embedding.is_some())
258 .collect();
259
260 if embedded.len() < 2 {
261 return;
262 }
263
264 if embedded.len() >= self.config.hnsw_min_records {
266 self.connect_by_embeddings_hnsw(&embedded, threshold, min_weight);
267 } else {
268 self.connect_by_embeddings_bruteforce(&embedded, threshold, min_weight);
269 }
270 }
271
272 fn connect_by_embeddings_hnsw(&mut self, embedded: &[&DataRecord], threshold: f64, min_weight: f64) {
274 let dim = match &embedded[0].embedding {
275 Some(emb) => emb.len(),
276 None => return,
277 };
278
279 let hnsw_config = HnswConfig {
280 dimension: dim,
281 metric: DistanceMetric::Cosine,
282 m: 16,
283 m_max_0: 32,
284 ef_construction: 200,
285 ef_search: self.config.hnsw_k_neighbors.max(50),
286 ..HnswConfig::default()
287 };
288
289 let mut hnsw = HnswIndex::with_config(hnsw_config);
290
291 for record in embedded.iter() {
292 if let Some(embedding) = &record.embedding {
293 let vector = SemanticVector {
294 id: record.id.clone(),
295 embedding: embedding.clone(),
296 timestamp: record.timestamp,
297 domain: Domain::CrossDomain,
298 metadata: std::collections::HashMap::new(),
299 };
300 let _ = hnsw.insert(vector);
301 }
302 }
303
304 let k = self.config.hnsw_k_neighbors;
305 let threshold_f32 = threshold as f32;
306 let min_weight_f32 = min_weight as f32;
307
308 use std::collections::HashSet;
309 let mut seen: HashSet<(String, String)> = HashSet::new();
310
311 for record in embedded.iter() {
312 if let Some(embedding) = &record.embedding {
313 if let Ok(neighbors) = hnsw.search_knn(embedding, k + 1) {
314 for neighbor in neighbors {
315 if neighbor.external_id == record.id {
316 continue;
317 }
318 if let Some(similarity) = neighbor.similarity {
319 if similarity >= threshold_f32 {
320 let key = if record.id < neighbor.external_id {
321 (record.id.clone(), neighbor.external_id.clone())
322 } else {
323 (neighbor.external_id.clone(), record.id.clone())
324 };
325 if seen.insert(key) {
326 self.add_edge(&record.id, &neighbor.external_id, similarity.max(min_weight_f32) as f64);
327 }
328 }
329 }
330 }
331 }
332 }
333 }
334 }
335
336 fn connect_by_embeddings_bruteforce(&mut self, embedded: &[&DataRecord], threshold: f64, min_weight: f64) {
338 let threshold_f32 = threshold as f32;
339 let min_weight_f32 = min_weight as f32;
340
341 for i in 0..embedded.len() {
342 for j in (i + 1)..embedded.len() {
343 if let (Some(emb_a), Some(emb_b)) =
344 (&embedded[i].embedding, &embedded[j].embedding)
345 {
346 let similarity = cosine_similarity(emb_a, emb_b);
347 if similarity >= threshold_f32 {
348 self.add_edge(
349 &embedded[i].id,
350 &embedded[j].id,
351 similarity.max(min_weight_f32) as f64,
352 );
353 }
354 }
355 }
356 }
357 }
358
359 pub fn compute_from_records(&mut self, records: &[DataRecord]) -> Result<Vec<CoherenceSignal>> {
361 self.build_from_records(records);
362 self.compute_signals()
363 }
364
365 pub fn compute_signals(&mut self) -> Result<Vec<CoherenceSignal>> {
367 if self.nodes.is_empty() {
368 return Ok(vec![]);
369 }
370
371 let min_cut_value = self.compute_min_cut()?;
374
375 let signal = CoherenceSignal {
376 id: format!("signal_{}", self.signals.len()),
377 window: TemporalWindow::new(Utc::now(), Utc::now(), self.signals.len() as u64),
378 min_cut_value,
379 node_count: self.node_count(),
380 edge_count: self.edge_count(),
381 partition_sizes: self.compute_partition_sizes(),
382 is_exact: !self.config.approximate,
383 cut_nodes: self.find_cut_nodes(),
384 delta: self.compute_delta(),
385 };
386
387 self.signals.push(signal.clone());
388 Ok(self.signals.clone())
389 }
390
391 fn compute_min_cut(&self) -> Result<f64> {
393 if self.nodes.len() < 2 {
395 return Ok(f64::INFINITY);
396 }
397
398 let total_weight: f64 = self.edges.iter().map(|(_, _, w)| w).sum();
401
402 let approx_cut = if self.edges.is_empty() {
405 0.0
406 } else {
407 let avg_degree = (2.0 * self.edges.len() as f64) / self.nodes.len() as f64;
408 total_weight / (avg_degree.max(1.0))
409 };
410
411 Ok(approx_cut)
412 }
413
414 fn compute_partition_sizes(&self) -> Option<(usize, usize)> {
416 let n = self.nodes.len();
417 if n < 2 {
418 return None;
419 }
420 Some((n / 2, n - n / 2))
422 }
423
424 fn find_cut_nodes(&self) -> Vec<String> {
426 let mut degrees: HashMap<u64, usize> = HashMap::new();
429
430 for (src, tgt, _) in &self.edges {
431 *degrees.entry(*src).or_default() += 1;
432 *degrees.entry(*tgt).or_default() += 1;
433 }
434
435 let avg_degree = if degrees.is_empty() {
436 0
437 } else {
438 degrees.values().sum::<usize>() / degrees.len()
439 };
440
441 degrees
442 .iter()
443 .filter(|(_, &d)| d > avg_degree * 2)
444 .filter_map(|(&id, _)| self.node_ids.get(&id).cloned())
445 .take(10)
446 .collect()
447 }
448
449 fn compute_delta(&self) -> Option<f64> {
451 if self.signals.is_empty() {
452 return None;
453 }
454
455 let prev = &self.signals[self.signals.len() - 1];
456 let current_cut = self.compute_min_cut().unwrap_or(0.0);
457 Some(current_cut - prev.min_cut_value)
458 }
459
460 pub fn detect_events(&self, threshold: f64) -> Vec<CoherenceEvent> {
462 let mut events = Vec::new();
463
464 for i in 1..self.signals.len() {
465 let prev = &self.signals[i - 1];
466 let curr = &self.signals[i];
467
468 if let Some(delta) = curr.delta {
469 if delta.abs() > threshold {
470 let event_type = if delta > 0.0 {
471 CoherenceEventType::Strengthened
472 } else {
473 CoherenceEventType::Weakened
474 };
475
476 events.push(CoherenceEvent {
477 event_type,
478 timestamp: curr.window.start,
479 nodes: curr.cut_nodes.clone(),
480 magnitude: delta.abs(),
481 context: HashMap::new(),
482 });
483 }
484 }
485 }
486
487 events
488 }
489
490 pub fn signals(&self) -> &[CoherenceSignal] {
492 &self.signals
493 }
494
495 pub fn boundaries(&self) -> &[CoherenceBoundary] {
497 &self.boundaries
498 }
499
500 pub fn clear(&mut self) {
502 self.nodes.clear();
503 self.node_ids.clear();
504 self.edges.clear();
505 self.next_id = 0;
506 self.signals.clear();
507 }
508}
509
510pub struct StreamingCoherence {
512 engine: CoherenceEngine,
513 window_size: i64,
514 window_step: i64,
515 current_window: Option<TemporalWindow>,
516 window_records: Vec<DataRecord>,
517}
518
519impl StreamingCoherence {
520 pub fn new(config: CoherenceConfig) -> Self {
522 let window_size = config.window_size_secs;
523 let window_step = config.window_step_secs;
524
525 Self {
526 engine: CoherenceEngine::new(config),
527 window_size,
528 window_step,
529 current_window: None,
530 window_records: Vec::new(),
531 }
532 }
533
534 pub fn process(&mut self, record: DataRecord) -> Option<CoherenceSignal> {
536 let ts = record.timestamp;
537
538 if self.current_window.is_none() {
540 self.current_window = Some(TemporalWindow::new(
541 ts,
542 ts + chrono::Duration::seconds(self.window_size),
543 0,
544 ));
545 }
546
547 {
549 let window = self.current_window.as_ref().unwrap();
550 if window.contains(ts) {
551 self.window_records.push(record);
552 return None;
553 }
554 }
555
556 let (old_start, old_window_id) = {
558 let window = self.current_window.as_ref().unwrap();
559 (window.start, window.window_id)
560 };
561
562 let signal = self.finalize_window();
564
565 let new_start = old_start + chrono::Duration::seconds(self.window_step);
567 self.current_window = Some(TemporalWindow::new(
568 new_start,
569 new_start + chrono::Duration::seconds(self.window_size),
570 old_window_id + 1,
571 ));
572
573 self.window_records.push(record);
575
576 signal
577 }
578
579 pub fn finalize_window(&mut self) -> Option<CoherenceSignal> {
581 if self.window_records.is_empty() {
582 return None;
583 }
584
585 self.engine.clear();
586 let signals = self
587 .engine
588 .compute_from_records(&self.window_records)
589 .ok()?;
590 self.window_records.clear();
591
592 signals.into_iter().last()
593 }
594}
595
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record without an embedding whose relationships are the
    /// given `(target, weight)` pairs.
    fn make_test_record(id: &str, rels: Vec<(&str, f64)>) -> DataRecord {
        let mut relationships = Vec::with_capacity(rels.len());
        for (target, weight) in rels {
            relationships.push(Relationship {
                target_id: target.to_owned(),
                rel_type: "related".to_owned(),
                weight,
                properties: HashMap::new(),
            });
        }

        DataRecord {
            id: id.to_owned(),
            source: "test".to_owned(),
            record_type: "node".to_owned(),
            timestamp: Utc::now(),
            data: serde_json::json!({}),
            embedding: None,
            relationships,
        }
    }

    /// Two nodes joined by one edge are counted once each.
    #[test]
    fn test_coherence_engine_basic() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());

        engine.add_node("A");
        engine.add_node("B");
        engine.add_edge("A", "B", 1.0);

        assert_eq!(engine.node_count(), 2);
        assert_eq!(engine.edge_count(), 1);
    }

    /// Building from three linked records yields a signal and three nodes.
    #[test]
    fn test_coherence_from_records() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());

        let records = vec![
            make_test_record("A", vec![("B", 1.0), ("C", 0.5)]),
            make_test_record("B", vec![("C", 1.0)]),
            make_test_record("C", vec![]),
        ];

        let signals = engine.compute_from_records(&records).unwrap();

        assert!(!signals.is_empty());
        assert_eq!(engine.node_count(), 3);
    }

    /// An engine with no signals reports no events.
    #[test]
    fn test_event_detection() {
        let engine = CoherenceEngine::new(CoherenceConfig::default());

        let events = engine.detect_events(0.1);

        assert!(events.is_empty());
    }
}