Skip to main content

openentropy_core/
synchrony.rs

1//! # Synchrony Analysis
2//!
3//! Multi-source coordination methods for byte-stream synchrony and event detection.
4
5use serde::Serialize;
6
7#[derive(Debug, Clone, Serialize)]
8pub struct MutualInfoResult {
9    pub mutual_information: f64,
10    pub normalized_mi: f64,
11    pub entropy_a: f64,
12    pub entropy_b: f64,
13    pub joint_entropy: f64,
14    pub is_valid: bool,
15}
16
17#[derive(Debug, Clone, Serialize)]
18pub struct PhaseCoherenceResult {
19    pub coherence: f64,
20    pub mean_phase_diff: f64,
21    pub phase_std: f64,
22    pub is_coherent: bool,
23    pub is_valid: bool,
24}
25
26#[derive(Debug, Clone, Serialize)]
27pub struct CrossSyncResult {
28    pub correlation: f64,
29    pub lag_at_max: i64,
30    pub max_cross_correlation: f64,
31    pub is_synchronized: bool,
32    pub is_valid: bool,
33}
34
35#[derive(Debug, Clone, Serialize)]
36pub struct GlobalEvent {
37    pub offset: usize,
38    pub n_streams_affected: usize,
39    pub mean_deviation: f64,
40}
41
42#[derive(Debug, Clone, Serialize)]
43pub struct GlobalEventResult {
44    pub events: Vec<GlobalEvent>,
45    pub n_events: usize,
46    pub event_rate: f64,
47    pub is_valid: bool,
48}
49
50#[derive(Debug, Clone, Serialize)]
51pub struct SynchronyAnalysis {
52    pub mutual_info: MutualInfoResult,
53    pub phase_coherence: PhaseCoherenceResult,
54    pub cross_sync: CrossSyncResult,
55}
56
57pub fn synchrony_analysis(data_a: &[u8], data_b: &[u8]) -> SynchronyAnalysis {
58    SynchronyAnalysis {
59        mutual_info: mutual_information(data_a, data_b),
60        phase_coherence: phase_coherence(data_a, data_b),
61        cross_sync: cross_sync(data_a, data_b),
62    }
63}
64
65fn mean(values: &[f64]) -> f64 {
66    values.iter().sum::<f64>() / values.len() as f64
67}
68
69fn std_dev(values: &[f64], avg: f64) -> f64 {
70    let var = values
71        .iter()
72        .map(|v| {
73            let d = *v - avg;
74            d * d
75        })
76        .sum::<f64>()
77        / values.len() as f64;
78    var.sqrt()
79}
80
81fn pearson_corr(a: &[f64], b: &[f64]) -> f64 {
82    let n = a.len().min(b.len());
83    if n < 2 {
84        return 0.0;
85    }
86    let a = &a[..n];
87    let b = &b[..n];
88    let mean_a = mean(a);
89    let mean_b = mean(b);
90
91    let mut num = 0.0;
92    let mut den_a = 0.0;
93    let mut den_b = 0.0;
94    for i in 0..n {
95        let da = a[i] - mean_a;
96        let db = b[i] - mean_b;
97        num += da * db;
98        den_a += da * da;
99        den_b += db * db;
100    }
101
102    let den = (den_a * den_b).sqrt();
103    if den <= f64::EPSILON {
104        0.0
105    } else {
106        (num / den).clamp(-1.0, 1.0)
107    }
108}
109
110pub fn mutual_information(data_a: &[u8], data_b: &[u8]) -> MutualInfoResult {
111    let n = data_a.len().min(data_b.len());
112    if n < 2 {
113        return MutualInfoResult {
114            mutual_information: 0.0,
115            normalized_mi: 0.0,
116            entropy_a: 0.0,
117            entropy_b: 0.0,
118            joint_entropy: 0.0,
119            is_valid: false,
120        };
121    }
122
123    let mut hist_a = [0_u64; 256];
124    let mut hist_b = [0_u64; 256];
125    let mut joint = vec![0_u64; 256 * 256];
126    for i in 0..n {
127        let a = data_a[i] as usize;
128        let b = data_b[i] as usize;
129        hist_a[a] += 1;
130        hist_b[b] += 1;
131        joint[a * 256 + b] += 1;
132    }
133
134    let n_f = n as f64;
135    let entropy_a = hist_a
136        .iter()
137        .filter(|&&c| c > 0)
138        .map(|&c| {
139            let p = c as f64 / n_f;
140            -p * p.ln()
141        })
142        .sum::<f64>();
143
144    let entropy_b = hist_b
145        .iter()
146        .filter(|&&c| c > 0)
147        .map(|&c| {
148            let p = c as f64 / n_f;
149            -p * p.ln()
150        })
151        .sum::<f64>();
152
153    let joint_entropy = joint
154        .iter()
155        .filter(|&&c| c > 0)
156        .map(|&c| {
157            let p = c as f64 / n_f;
158            -p * p.ln()
159        })
160        .sum::<f64>();
161
162    let mutual_information = (entropy_a + entropy_b - joint_entropy).max(0.0);
163    let denom = (entropy_a * entropy_b).sqrt();
164    let normalized_mi = if denom <= f64::EPSILON {
165        0.0
166    } else {
167        (mutual_information / denom).clamp(0.0, 1.0)
168    };
169
170    MutualInfoResult {
171        mutual_information,
172        normalized_mi,
173        entropy_a,
174        entropy_b,
175        joint_entropy,
176        is_valid: true,
177    }
178}
179
180pub fn phase_coherence(data_a: &[u8], data_b: &[u8]) -> PhaseCoherenceResult {
181    let n = data_a.len().min(data_b.len());
182    if n < 2 {
183        return PhaseCoherenceResult {
184            coherence: 0.0,
185            mean_phase_diff: 0.0,
186            phase_std: 0.0,
187            is_coherent: false,
188            is_valid: false,
189        };
190    }
191
192    let mean_a = data_a[..n].iter().map(|&x| x as f64).sum::<f64>() / n as f64;
193    let mean_b = data_b[..n].iter().map(|&x| x as f64).sum::<f64>() / n as f64;
194
195    let mut diffs = Vec::with_capacity(n);
196    for i in 0..n {
197        let sign_a = if data_a[i] as f64 > mean_a { 1.0 } else { -1.0 };
198        let sign_b = if data_b[i] as f64 > mean_b { 1.0 } else { -1.0 };
199        diffs.push(sign_a * sign_b);
200    }
201
202    let mean_phase_diff = mean(&diffs);
203    let phase_std = std_dev(&diffs, mean_phase_diff);
204    let coherence = mean_phase_diff.abs();
205
206    PhaseCoherenceResult {
207        coherence,
208        mean_phase_diff,
209        phase_std,
210        is_coherent: coherence > 0.5,
211        is_valid: true,
212    }
213}
214
215pub fn cross_sync(data_a: &[u8], data_b: &[u8]) -> CrossSyncResult {
216    let n = data_a.len().min(data_b.len());
217    if n < 2 {
218        return CrossSyncResult {
219            correlation: 0.0,
220            lag_at_max: 0,
221            max_cross_correlation: 0.0,
222            is_synchronized: false,
223            is_valid: false,
224        };
225    }
226
227    let step = n.div_ceil(2000);
228    let mut a = Vec::new();
229    let mut b = Vec::new();
230    let mut i = 0;
231    while i < n {
232        a.push(data_a[i] as f64);
233        b.push(data_b[i] as f64);
234        i += step;
235    }
236    let m = a.len().min(b.len());
237    if m < 2 {
238        return CrossSyncResult {
239            correlation: 0.0,
240            lag_at_max: 0,
241            max_cross_correlation: 0.0,
242            is_synchronized: false,
243            is_valid: false,
244        };
245    }
246
247    let a = &a[..m];
248    let b = &b[..m];
249    let correlation = pearson_corr(a, b);
250
251    let mut lag_at_max = 0_i64;
252    let mut max_cross_correlation = correlation;
253    let mut best_abs = correlation.abs();
254
255    for lag in -10_i64..=10_i64 {
256        let (aa, bb): (&[f64], &[f64]) = if lag < 0 {
257            let l = (-lag) as usize;
258            if l >= m {
259                continue;
260            }
261            (&a[..m - l], &b[l..m])
262        } else {
263            let l = lag as usize;
264            if l >= m {
265                continue;
266            }
267            (&a[l..m], &b[..m - l])
268        };
269        if aa.len() < 2 || bb.len() < 2 {
270            continue;
271        }
272        let c = pearson_corr(aa, bb);
273        let abs_c = c.abs();
274        if abs_c > best_abs {
275            best_abs = abs_c;
276            max_cross_correlation = c;
277            lag_at_max = lag;
278        }
279    }
280
281    CrossSyncResult {
282        correlation,
283        lag_at_max,
284        max_cross_correlation,
285        is_synchronized: max_cross_correlation.abs() > 0.3,
286        is_valid: true,
287    }
288}
289
290pub fn global_event_detection(streams: &[&[u8]]) -> GlobalEventResult {
291    if streams.len() < 2 {
292        return GlobalEventResult {
293            events: Vec::new(),
294            n_events: 0,
295            event_rate: 0.0,
296            is_valid: false,
297        };
298    }
299
300    let min_len = streams.iter().map(|s| s.len()).min().unwrap_or(0);
301    if min_len == 0 {
302        return GlobalEventResult {
303            events: Vec::new(),
304            n_events: 0,
305            event_rate: 0.0,
306            is_valid: false,
307        };
308    }
309
310    let window_size = 100;
311    let total_windows = min_len / window_size;
312    if total_windows == 0 {
313        return GlobalEventResult {
314            events: Vec::new(),
315            n_events: 0,
316            event_rate: 0.0,
317            is_valid: false,
318        };
319    }
320
321    let n_streams = streams.len();
322    let mut stream_window_means = vec![vec![0.0_f64; total_windows]; n_streams];
323    for (si, stream) in streams.iter().enumerate() {
324        #[allow(clippy::needless_range_loop)]
325        for w in 0..total_windows {
326            let start = w * window_size;
327            let end = start + window_size;
328            let avg =
329                stream[start..end].iter().map(|&x| x as f64).sum::<f64>() / window_size as f64;
330            stream_window_means[si][w] = avg;
331        }
332    }
333
334    let mut stream_global_means = vec![0.0_f64; n_streams];
335    let mut stream_global_stds = vec![0.0_f64; n_streams];
336    for si in 0..n_streams {
337        let m = mean(&stream_window_means[si]);
338        let s = std_dev(&stream_window_means[si], m);
339        stream_global_means[si] = m;
340        stream_global_stds[si] = s;
341    }
342
343    let mut events = Vec::new();
344    #[allow(clippy::needless_range_loop)]
345    for w in 0..total_windows {
346        let mut affected = 0_usize;
347        let mut dev_sum = 0.0_f64;
348        for si in 0..n_streams {
349            let s = stream_global_stds[si];
350            if s <= f64::EPSILON {
351                continue;
352            }
353            let d = (stream_window_means[si][w] - stream_global_means[si]).abs();
354            if d > 2.0 * s {
355                affected += 1;
356                dev_sum += d;
357            }
358        }
359        if affected * 2 >= n_streams {
360            events.push(GlobalEvent {
361                offset: w * window_size,
362                n_streams_affected: affected,
363                mean_deviation: if affected > 0 {
364                    dev_sum / affected as f64
365                } else {
366                    0.0
367                },
368            });
369        }
370    }
371
372    let n_events = events.len();
373    GlobalEventResult {
374        events,
375        n_events,
376        event_rate: n_events as f64 / total_windows as f64,
377        is_valid: true,
378    }
379}
380
381#[cfg(test)]
382mod tests {
383    use super::*;
384
385    fn random_data_seeded(len: usize, seed: u64) -> Vec<u8> {
386        let mut state = seed;
387        let mut data = Vec::with_capacity(len);
388        for _ in 0..len {
389            state = state
390                .wrapping_mul(6364136223846793005)
391                .wrapping_add(1442695040888963407);
392            data.push((state >> 33) as u8);
393        }
394        data
395    }
396
397    #[test]
398    fn mutual_information_identical_high_nmi() {
399        let data = random_data_seeded(5000, 1);
400        let result = mutual_information(&data, &data);
401        assert!(result.is_valid);
402        assert!((result.normalized_mi - 1.0).abs() < 0.02);
403    }
404
405    #[test]
406    fn mutual_information_independent_low_nmi() {
407        let data_a = random_data_seeded(5000, 1);
408        let data_b = random_data_seeded(5000, 2);
409        let result = mutual_information(&data_a, &data_b);
410        assert!(result.is_valid);
411        assert!(result.normalized_mi < 0.5);
412    }
413
414    #[test]
415    fn mutual_information_empty_invalid() {
416        let result = mutual_information(&[], &[]);
417        assert!(!result.is_valid);
418    }
419
420    #[test]
421    fn mutual_information_single_byte_invalid() {
422        let result = mutual_information(&[1], &[1]);
423        assert!(!result.is_valid);
424    }
425
426    #[test]
427    fn phase_coherence_identical_high() {
428        let data = random_data_seeded(3000, 11);
429        let result = phase_coherence(&data, &data);
430        assert!(result.is_valid);
431        assert!((result.coherence - 1.0).abs() < 1e-9);
432        assert!(result.mean_phase_diff > 0.95);
433    }
434
435    #[test]
436    fn phase_coherence_independent_low() {
437        let data_a = random_data_seeded(3000, 11);
438        let data_b = random_data_seeded(3000, 91);
439        let result = phase_coherence(&data_a, &data_b);
440        assert!(result.is_valid);
441        assert!(result.coherence < 0.3);
442    }
443
444    #[test]
445    fn phase_coherence_empty_invalid() {
446        let result = phase_coherence(&[], &[]);
447        assert!(!result.is_valid);
448    }
449
450    #[test]
451    fn phase_coherence_anti_correlated_high() {
452        let mut data_a = Vec::new();
453        let mut data_b = Vec::new();
454        for i in 0..3000 {
455            if i % 2 == 0 {
456                data_a.push(240);
457                data_b.push(20);
458            } else {
459                data_a.push(20);
460                data_b.push(240);
461            }
462        }
463        let result = phase_coherence(&data_a, &data_b);
464        assert!(result.is_valid);
465        assert!((result.coherence - 1.0).abs() < 1e-9);
466        assert!(result.mean_phase_diff < -0.95);
467    }
468
469    #[test]
470    fn cross_sync_identical_high_corr() {
471        let data = random_data_seeded(4000, 7);
472        let result = cross_sync(&data, &data);
473        assert!(result.is_valid);
474        assert!((result.correlation - 1.0).abs() < 1e-9);
475    }
476
477    #[test]
478    fn cross_sync_independent_low_corr() {
479        let data_a = random_data_seeded(4000, 7);
480        let data_b = random_data_seeded(4000, 13);
481        let result = cross_sync(&data_a, &data_b);
482        assert!(result.is_valid);
483        assert!(result.correlation.abs() < 0.2);
484    }
485
486    #[test]
487    fn cross_sync_empty_invalid() {
488        let result = cross_sync(&[], &[]);
489        assert!(!result.is_valid);
490    }
491
492    #[test]
493    fn cross_sync_lagged_copy_nonzero_lag() {
494        let base = random_data_seeded(5000, 88);
495        let mut lagged = vec![0_u8; 5];
496        lagged.extend(base.iter().copied().take(4995));
497        let result = cross_sync(&base, &lagged);
498        assert!(result.is_valid);
499        assert_ne!(result.lag_at_max, 0);
500    }
501
502    #[test]
503    fn global_event_independent_low_rate() {
504        let a = random_data_seeded(3000, 31);
505        let b = random_data_seeded(3000, 37);
506        let c = random_data_seeded(3000, 41);
507        let streams: [&[u8]; 3] = [&a, &b, &c];
508        let result = global_event_detection(&streams);
509        assert!(result.is_valid);
510        assert!(result.event_rate < 0.2);
511    }
512
513    #[test]
514    fn global_event_injected_spike_detected() {
515        let mut a = random_data_seeded(3000, 31);
516        let mut b = random_data_seeded(3000, 37);
517        let mut c = random_data_seeded(3000, 41);
518        for i in 1000..1100 {
519            a[i] = 255;
520            b[i] = 255;
521            c[i] = 255;
522        }
523        let streams: [&[u8]; 3] = [&a, &b, &c];
524        let result = global_event_detection(&streams);
525        assert!(result.is_valid);
526        assert!(result.n_events >= 1);
527    }
528
529    #[test]
530    fn global_event_less_than_two_streams_invalid() {
531        let a = random_data_seeded(1000, 1);
532        let streams: [&[u8]; 1] = [&a];
533        let result = global_event_detection(&streams);
534        assert!(!result.is_valid);
535    }
536
537    #[test]
538    fn global_event_empty_invalid() {
539        let a: [u8; 0] = [];
540        let b: [u8; 0] = [];
541        let streams: [&[u8]; 2] = [&a, &b];
542        let result = global_event_detection(&streams);
543        assert!(!result.is_valid);
544    }
545
546    #[test]
547    fn test_synchrony_analysis_serializes() {
548        let data_a = random_data_seeded(5000, 0xdeadbeef);
549        let data_b = random_data_seeded(5000, 0xcafebabe);
550        let result = synchrony_analysis(&data_a, &data_b);
551        let json = serde_json::to_string(&result).expect("serialization failed");
552        assert!(json.contains("mutual_info"));
553        assert!(json.contains("cross_sync"));
554    }
555}