1use std::collections::HashMap;
4
5use chrono::{DateTime, Utc};
6use serde::{Deserialize, Serialize};
7
8use crate::{DataRecord, FrameworkError, Result, Relationship, TemporalWindow};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct CoherenceConfig {
13 pub min_edge_weight: f64,
15
16 pub window_size_secs: i64,
18
19 pub window_step_secs: i64,
21
22 pub approximate: bool,
24
25 pub epsilon: f64,
27
28 pub parallel: bool,
30
31 pub track_boundaries: bool,
33}
34
35impl Default for CoherenceConfig {
36 fn default() -> Self {
37 Self {
38 min_edge_weight: 0.01,
39 window_size_secs: 86400 * 7, window_step_secs: 86400, approximate: true,
42 epsilon: 0.1,
43 parallel: true,
44 track_boundaries: true,
45 }
46 }
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct CoherenceSignal {
52 pub id: String,
54
55 pub window: TemporalWindow,
57
58 pub min_cut_value: f64,
60
61 pub node_count: usize,
63
64 pub edge_count: usize,
66
67 pub partition_sizes: Option<(usize, usize)>,
69
70 pub is_exact: bool,
72
73 pub cut_nodes: Vec<String>,
75
76 pub delta: Option<f64>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct CoherenceEvent {
83 pub event_type: CoherenceEventType,
85
86 pub timestamp: DateTime<Utc>,
88
89 pub nodes: Vec<String>,
91
92 pub magnitude: f64,
94
95 pub context: HashMap<String, serde_json::Value>,
97}
98
99#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
101pub enum CoherenceEventType {
102 Strengthened,
104
105 Weakened,
107
108 Split,
110
111 Merged,
113
114 ThresholdCrossed,
116
117 Anomaly,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
123pub struct CoherenceBoundary {
124 pub id: String,
126
127 pub side_a: Vec<String>,
129
130 pub side_b: Vec<String>,
132
133 pub cut_value: f64,
135
136 pub history: Vec<(DateTime<Utc>, f64)>,
138
139 pub first_seen: DateTime<Utc>,
141
142 pub last_updated: DateTime<Utc>,
144
145 pub stable: bool,
147}
148
149pub struct CoherenceEngine {
151 config: CoherenceConfig,
152
153 nodes: HashMap<String, u64>,
155 node_ids: HashMap<u64, String>,
156 edges: Vec<(u64, u64, f64)>,
157 next_id: u64,
158
159 signals: Vec<CoherenceSignal>,
161
162 boundaries: Vec<CoherenceBoundary>,
164}
165
166impl CoherenceEngine {
167 pub fn new(config: CoherenceConfig) -> Self {
169 Self {
170 config,
171 nodes: HashMap::new(),
172 node_ids: HashMap::new(),
173 edges: Vec::new(),
174 next_id: 0,
175 signals: Vec::new(),
176 boundaries: Vec::new(),
177 }
178 }
179
180 pub fn add_node(&mut self, id: &str) -> u64 {
182 if let Some(&node_id) = self.nodes.get(id) {
183 return node_id;
184 }
185
186 let node_id = self.next_id;
187 self.next_id += 1;
188 self.nodes.insert(id.to_string(), node_id);
189 self.node_ids.insert(node_id, id.to_string());
190 node_id
191 }
192
193 pub fn add_edge(&mut self, source: &str, target: &str, weight: f64) {
195 if weight < self.config.min_edge_weight {
196 return;
197 }
198
199 let source_id = self.add_node(source);
200 let target_id = self.add_node(target);
201 self.edges.push((source_id, target_id, weight));
202 }
203
204 pub fn node_count(&self) -> usize {
206 self.nodes.len()
207 }
208
209 pub fn edge_count(&self) -> usize {
211 self.edges.len()
212 }
213
214 pub fn build_from_records(&mut self, records: &[DataRecord]) {
216 for record in records {
217 self.add_node(&record.id);
218
219 for rel in &record.relationships {
220 self.add_edge(&record.id, &rel.target_id, rel.weight);
221 }
222 }
223 }
224
225 pub fn compute_from_records(&mut self, records: &[DataRecord]) -> Result<Vec<CoherenceSignal>> {
227 self.build_from_records(records);
228 self.compute_signals()
229 }
230
231 pub fn compute_signals(&mut self) -> Result<Vec<CoherenceSignal>> {
233 if self.nodes.is_empty() {
234 return Ok(vec![]);
235 }
236
237 let min_cut_value = self.compute_min_cut()?;
240
241 let signal = CoherenceSignal {
242 id: format!("signal_{}", self.signals.len()),
243 window: TemporalWindow::new(Utc::now(), Utc::now(), self.signals.len() as u64),
244 min_cut_value,
245 node_count: self.node_count(),
246 edge_count: self.edge_count(),
247 partition_sizes: self.compute_partition_sizes(),
248 is_exact: !self.config.approximate,
249 cut_nodes: self.find_cut_nodes(),
250 delta: self.compute_delta(),
251 };
252
253 self.signals.push(signal.clone());
254 Ok(self.signals.clone())
255 }
256
257 fn compute_min_cut(&self) -> Result<f64> {
259 if self.nodes.len() < 2 {
261 return Ok(f64::INFINITY);
262 }
263
264 let total_weight: f64 = self.edges.iter().map(|(_, _, w)| w).sum();
267
268 let approx_cut = if self.edges.is_empty() {
271 0.0
272 } else {
273 let avg_degree = (2.0 * self.edges.len() as f64) / self.nodes.len() as f64;
274 total_weight / (avg_degree.max(1.0))
275 };
276
277 Ok(approx_cut)
278 }
279
280 fn compute_partition_sizes(&self) -> Option<(usize, usize)> {
282 let n = self.nodes.len();
283 if n < 2 {
284 return None;
285 }
286 Some((n / 2, n - n / 2))
288 }
289
290 fn find_cut_nodes(&self) -> Vec<String> {
292 let mut degrees: HashMap<u64, usize> = HashMap::new();
295
296 for (src, tgt, _) in &self.edges {
297 *degrees.entry(*src).or_default() += 1;
298 *degrees.entry(*tgt).or_default() += 1;
299 }
300
301 let avg_degree = if degrees.is_empty() {
302 0
303 } else {
304 degrees.values().sum::<usize>() / degrees.len()
305 };
306
307 degrees
308 .iter()
309 .filter(|(_, &d)| d > avg_degree * 2)
310 .filter_map(|(&id, _)| self.node_ids.get(&id).cloned())
311 .take(10)
312 .collect()
313 }
314
315 fn compute_delta(&self) -> Option<f64> {
317 if self.signals.is_empty() {
318 return None;
319 }
320
321 let prev = &self.signals[self.signals.len() - 1];
322 let current_cut = self.compute_min_cut().unwrap_or(0.0);
323 Some(current_cut - prev.min_cut_value)
324 }
325
326 pub fn detect_events(&self, threshold: f64) -> Vec<CoherenceEvent> {
328 let mut events = Vec::new();
329
330 for i in 1..self.signals.len() {
331 let prev = &self.signals[i - 1];
332 let curr = &self.signals[i];
333
334 if let Some(delta) = curr.delta {
335 if delta.abs() > threshold {
336 let event_type = if delta > 0.0 {
337 CoherenceEventType::Strengthened
338 } else {
339 CoherenceEventType::Weakened
340 };
341
342 events.push(CoherenceEvent {
343 event_type,
344 timestamp: curr.window.start,
345 nodes: curr.cut_nodes.clone(),
346 magnitude: delta.abs(),
347 context: HashMap::new(),
348 });
349 }
350 }
351 }
352
353 events
354 }
355
356 pub fn signals(&self) -> &[CoherenceSignal] {
358 &self.signals
359 }
360
361 pub fn boundaries(&self) -> &[CoherenceBoundary] {
363 &self.boundaries
364 }
365
366 pub fn clear(&mut self) {
368 self.nodes.clear();
369 self.node_ids.clear();
370 self.edges.clear();
371 self.next_id = 0;
372 self.signals.clear();
373 }
374}
375
376pub struct StreamingCoherence {
378 engine: CoherenceEngine,
379 window_size: i64,
380 window_step: i64,
381 current_window: Option<TemporalWindow>,
382 window_records: Vec<DataRecord>,
383}
384
385impl StreamingCoherence {
386 pub fn new(config: CoherenceConfig) -> Self {
388 let window_size = config.window_size_secs;
389 let window_step = config.window_step_secs;
390
391 Self {
392 engine: CoherenceEngine::new(config),
393 window_size,
394 window_step,
395 current_window: None,
396 window_records: Vec::new(),
397 }
398 }
399
400 pub fn process(&mut self, record: DataRecord) -> Option<CoherenceSignal> {
402 let ts = record.timestamp;
403
404 if self.current_window.is_none() {
406 self.current_window = Some(TemporalWindow::new(
407 ts,
408 ts + chrono::Duration::seconds(self.window_size),
409 0,
410 ));
411 }
412
413 {
415 let window = self.current_window.as_ref().unwrap();
416 if window.contains(ts) {
417 self.window_records.push(record);
418 return None;
419 }
420 }
421
422 let (old_start, old_window_id) = {
424 let window = self.current_window.as_ref().unwrap();
425 (window.start, window.window_id)
426 };
427
428 let signal = self.finalize_window();
430
431 let new_start = old_start + chrono::Duration::seconds(self.window_step);
433 self.current_window = Some(TemporalWindow::new(
434 new_start,
435 new_start + chrono::Duration::seconds(self.window_size),
436 old_window_id + 1,
437 ));
438
439 self.window_records.push(record);
441
442 signal
443 }
444
445 pub fn finalize_window(&mut self) -> Option<CoherenceSignal> {
447 if self.window_records.is_empty() {
448 return None;
449 }
450
451 self.engine.clear();
452 let signals = self
453 .engine
454 .compute_from_records(&self.window_records)
455 .ok()?;
456 self.window_records.clear();
457
458 signals.into_iter().last()
459 }
460}
461
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record with the given id and `(target, weight)` relationships,
    /// timestamped now.
    fn make_test_record(id: &str, rels: Vec<(&str, f64)>) -> DataRecord {
        DataRecord {
            id: id.to_string(),
            source: "test".to_string(),
            record_type: "node".to_string(),
            timestamp: Utc::now(),
            data: serde_json::json!({}),
            embedding: None,
            relationships: rels
                .into_iter()
                .map(|(target, weight)| Relationship {
                    target_id: target.to_string(),
                    rel_type: "related".to_string(),
                    weight,
                    properties: HashMap::new(),
                })
                .collect(),
        }
    }

    #[test]
    fn test_coherence_engine_basic() {
        let config = CoherenceConfig::default();
        let mut engine = CoherenceEngine::new(config);

        engine.add_node("A");
        engine.add_node("B");
        engine.add_edge("A", "B", 1.0);

        assert_eq!(engine.node_count(), 2);
        assert_eq!(engine.edge_count(), 1);
    }

    #[test]
    fn test_coherence_from_records() {
        let config = CoherenceConfig::default();
        let mut engine = CoherenceEngine::new(config);

        let records = vec![
            make_test_record("A", vec![("B", 1.0), ("C", 0.5)]),
            make_test_record("B", vec![("C", 1.0)]),
            make_test_record("C", vec![]),
        ];

        let signals = engine.compute_from_records(&records).unwrap();
        assert!(!signals.is_empty());
        assert_eq!(engine.node_count(), 3);
    }

    #[test]
    fn test_event_detection() {
        let config = CoherenceConfig::default();
        let engine = CoherenceEngine::new(config);

        let events = engine.detect_events(0.1);
        assert!(events.is_empty());
    }

    #[test]
    fn test_edges_below_min_weight_are_ignored() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());

        engine.add_edge("A", "B", 0.001);

        assert_eq!(engine.edge_count(), 0);
        assert_eq!(engine.node_count(), 0);
    }

    #[test]
    fn test_add_node_is_idempotent() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());

        let first = engine.add_node("A");
        let second = engine.add_node("A");

        assert_eq!(first, second);
        assert_eq!(engine.node_count(), 1);
    }

    #[test]
    fn test_delta_and_events_across_computations() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());

        engine.add_edge("A", "B", 1.0);
        engine.compute_signals().unwrap();
        engine.add_edge("B", "C", 1.0);
        let signals = engine.compute_signals().unwrap();

        // Heuristic cut: 1.0 for the first graph, 2.0 / (4/3) = 1.5 for the second.
        assert_eq!(signals.len(), 2);
        assert!(signals[0].delta.is_none());
        let delta = signals[1].delta.expect("second signal has a delta");
        assert!((delta - 0.5).abs() < 1e-9);

        let events = engine.detect_events(0.1);
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].event_type, CoherenceEventType::Strengthened);
    }

    #[test]
    fn test_clear_resets_graph_and_signals() {
        let mut engine = CoherenceEngine::new(CoherenceConfig::default());
        engine.add_edge("A", "B", 1.0);
        engine.compute_signals().unwrap();

        engine.clear();

        assert_eq!(engine.node_count(), 0);
        assert_eq!(engine.edge_count(), 0);
        assert!(engine.signals().is_empty());
        assert_eq!(engine.add_node("C"), 0);
    }

    #[test]
    fn test_streaming_buffers_records_within_window() {
        let mut stream = StreamingCoherence::new(CoherenceConfig::default());
        let first = make_test_record("A", vec![("B", 1.0)]);
        let mut second = make_test_record("B", vec![]);
        // Same timestamp so the second record is guaranteed to fall in the first window.
        second.timestamp = first.timestamp;

        assert!(stream.process(first).is_none());
        assert!(stream.process(second).is_none());

        let signal = stream
            .finalize_window()
            .expect("buffered records yield a signal");
        assert_eq!(signal.node_count, 2);
        assert_eq!(signal.edge_count, 1);
        assert!(stream.finalize_window().is_none());
    }
}