use std::collections::HashMap;
use std::time::Instant;
use chrono::Utc;
use rand::Rng;
use ruvector_data_framework::{
PubMedClient, BiorxivClient, CrossRefClient,
FrameworkError, Result,
};
use ruvector_data_framework::optimized::{
OptimizedDiscoveryEngine, OptimizedConfig, SignificantPattern, simd_cosine_similarity,
};
use ruvector_data_framework::ruvector_native::{Domain, SemanticVector};
use ruvector_data_framework::export::export_patterns_with_evidence_csv;
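/// Wall-clock timings (ms), counts, and derived throughput collected across
/// the pipeline phases for the final performance report.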
#[derive(Debug, Default)]
struct RunnerMetrics {
fetch_time_ms: u64,
embedding_time_ms: u64,
graph_build_time_ms: u64,
coherence_time_ms: u64,
pattern_detection_time_ms: u64,
total_time_ms: u64,
vectors_processed: usize,
edges_created: usize,
patterns_discovered: usize,
vectors_per_sec: f64,
edges_per_sec: f64,
}
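/// Scoped phase timer: prints a start banner on creation; `finish` consumes
/// the timer, prints the elapsed time, and returns it in milliseconds.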
struct PhaseTimer {
name: &'static str,
start: Instant,
}
impl PhaseTimer {
fn new(name: &'static str) -> Self {
println!("\n⚡ Phase {}: Starting...", name);
Self {
name,
start: Instant::now(),
}
}
fn finish(self) -> u64 {
let elapsed = self.start.elapsed();
let ms = elapsed.as_millis() as u64;
println!("✓ Phase {} completed in {:.2}s ({} ms)",
self.name, elapsed.as_secs_f64(), ms);
ms
}
}
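/// Entry point: runs the optimized discovery pipeline end to end (parallel
/// fetch, SIMD graph build, incremental coherence, significance-tested
/// pattern detection, cross-domain analysis, export), then prints a
/// performance report and a SIMD micro-benchmark.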
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
println!("╔══════════════════════════════════════════════════════════════╗");
println!("║ RuVector Optimized Multi-Source Discovery Runner ║");
println!("║ Parallel Fetch | SIMD Vectors | Statistical Patterns ║");
println!("╚══════════════════════════════════════════════════════════════╝\n");
let mut metrics = RunnerMetrics::default();
let total_timer = Instant::now();
let vectors = {
let _timer = PhaseTimer::new("1: Parallel Data Fetching");
let fetch_start = Instant::now();
let vectors = fetch_all_sources_parallel().await?;
metrics.fetch_time_ms = fetch_start.elapsed().as_millis() as u64;
metrics.vectors_processed = vectors.len();
println!(" → Fetched {} vectors from 5 sources", vectors.len());
vectors
};
let mut engine = {
let _timer = PhaseTimer::new("2: SIMD-Accelerated Graph Building");
let build_start = Instant::now();
let config = OptimizedConfig {
similarity_threshold: 0.65,
mincut_sensitivity: 0.12,
cross_domain: true,
batch_size: 256,
use_simd: true,
similarity_cache_size: 10000,
significance_threshold: 0.05,
causality_lookback: 10,
causality_min_correlation: 0.6,
};
let mut engine = OptimizedDiscoveryEngine::new(config);
#[cfg(feature = "parallel")]
{
engine.add_vectors_batch(vectors);
}
#[cfg(not(feature = "parallel"))]
{
for vector in vectors {
engine.add_vector(vector);
}
}
metrics.graph_build_time_ms = build_start.elapsed().as_millis() as u64;
let stats = engine.stats();
metrics.edges_created = stats.total_edges;
println!(" → Built graph: {} nodes, {} edges", stats.total_nodes, stats.total_edges);
println!(" → Cross-domain edges: {}", stats.cross_domain_edges);
println!(" → Vector comparisons: {}", stats.total_comparisons);
engine
};
let _coherence_snapshot = {
let _timer = PhaseTimer::new("3: Incremental Coherence Computation");
let coherence_start = Instant::now();
let snapshot = engine.compute_coherence();
metrics.coherence_time_ms = coherence_start.elapsed().as_millis() as u64;
println!(" → Min-cut value: {:.4}", snapshot.mincut_value);
println!(" → Partition sizes: {:?}", snapshot.partition_sizes);
println!(" → Boundary nodes: {}", snapshot.boundary_nodes.len());
println!(" → Avg edge weight: {:.3}", snapshot.avg_edge_weight);
snapshot
};
let patterns = {
let _timer = PhaseTimer::new("4: Pattern Detection with Statistical Significance");
let pattern_start = Instant::now();
let patterns = engine.detect_patterns_with_significance();
metrics.pattern_detection_time_ms = pattern_start.elapsed().as_millis() as u64;
metrics.patterns_discovered = patterns.len();
println!(" → Discovered {} patterns", patterns.len());
patterns
};
{
let _timer = PhaseTimer::new("5: Cross-Domain Correlation Analysis");
analyze_cross_domain_correlations(&engine, &patterns);
}
{
let _timer = PhaseTimer::new("6: Export Results");
export_results(&engine, &patterns)?;
}
metrics.total_time_ms = total_timer.elapsed().as_millis() as u64;
metrics.vectors_per_sec = if metrics.total_time_ms > 0 {
(metrics.vectors_processed as f64) / (metrics.total_time_ms as f64 / 1000.0)
} else {
0.0
};
metrics.edges_per_sec = if metrics.graph_build_time_ms > 0 {
(metrics.edges_created as f64) / (metrics.graph_build_time_ms as f64 / 1000.0)
} else {
0.0
};
print_final_report(&metrics, &patterns);
simd_benchmark();
println!("\n✅ Optimized discovery pipeline complete!");
Ok(())
}
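/// Fetches from PubMed, bioRxiv, and CrossRef concurrently via `tokio::join!`;
/// if fewer than 100 vectors arrive, pads the corpus with seeded synthetic
/// data up to 200 vectors.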
async fn fetch_all_sources_parallel() -> Result<Vec<SemanticVector>> {
println!(" 🌐 Launching parallel data fetch from 3 sources...");
let pubmed = PubMedClient::new(None).expect("Failed to create PubMed client");
let biorxiv = BiorxivClient::new();
let crossref = CrossRefClient::new(Some("discovery@ruvector.io".to_string()));
let (pubmed_result, biorxiv_result, crossref_result) = tokio::join!(
fetch_pubmed(&pubmed, "climate change impact", 80),
fetch_biorxiv_recent(&biorxiv, 14),
fetch_crossref(&crossref, "climate science environmental", 80),
);
let mut all_vectors = Vec::with_capacity(200);
    // Each source degrades gracefully: report failures, keep whatever arrived.
    match pubmed_result {
        Ok(mut vectors) => {
            println!(" ✓ PubMed: {} vectors", vectors.len());
            all_vectors.append(&mut vectors);
        }
        Err(e) => println!(" ✗ PubMed: {}", e),
    }
    match biorxiv_result {
        Ok(mut vectors) => {
            println!(" ✓ bioRxiv: {} vectors", vectors.len());
            all_vectors.append(&mut vectors);
        }
        Err(e) => println!(" ✗ bioRxiv: {}", e),
    }
    match crossref_result {
        Ok(mut vectors) => {
            println!(" ✓ CrossRef: {} vectors", vectors.len());
            all_vectors.append(&mut vectors);
        }
        Err(e) => println!(" ✗ CrossRef: {}", e),
    }
if all_vectors.len() < 100 {
println!(" ⚙ Adding synthetic climate/research data to reach target...");
let synthetic = generate_synthetic_data(200 - all_vectors.len());
println!(" ✓ Synthetic: {} vectors", synthetic.len());
all_vectors.extend(synthetic);
}
Ok(all_vectors)
}
/// Searches PubMed for `query`; errors propagate so the caller's per-source
/// match can report them (previously they were swallowed here, leaving the
/// caller's error branch unreachable).
async fn fetch_pubmed(client: &PubMedClient, query: &str, limit: usize) -> Result<Vec<SemanticVector>> {
    client.search_articles(query, limit).await
}

/// Fetches bioRxiv preprints posted in the last `days` days (up to 100).
async fn fetch_biorxiv_recent(client: &BiorxivClient, days: u64) -> Result<Vec<SemanticVector>> {
    client.search_recent(days, 100).await
}

/// Searches CrossRef works for `query`, returning up to `limit` vectors.
async fn fetch_crossref(client: &CrossRefClient, query: &str, limit: usize) -> Result<Vec<SemanticVector>> {
    client.search_works(query, limit).await
}
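/// Produces `count` deterministic (fixed-seed) vectors, alternating between
/// climate and research topics, with timestamps spread over the past year.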
fn generate_synthetic_data(count: usize) -> Vec<SemanticVector> {
use rand::{Rng, SeedableRng};
use rand::rngs::StdRng;
use chrono::Duration as ChronoDuration;
let mut rng = StdRng::seed_from_u64(42);
let mut vectors = Vec::with_capacity(count);
let climate_topics = [
"temperature_anomaly", "precipitation_patterns", "drought_severity",
"ocean_acidification", "arctic_sea_ice", "atmospheric_co2",
"el_nino_southern_oscillation", "atlantic_meridional_oscillation",
];
let research_topics = [
"climate_modeling", "carbon_sequestration", "renewable_energy",
"climate_adaptation", "ecosystem_resilience", "climate_policy",
];
for i in 0..count {
let is_climate = i % 2 == 0;
let (domain, topic) = if is_climate {
let topic = climate_topics[i % climate_topics.len()];
(Domain::Climate, topic)
} else {
let topic = research_topics[i % research_topics.len()];
(Domain::Research, topic)
};
let embedding = generate_topic_embedding(&mut rng, i, is_climate);
vectors.push(SemanticVector {
id: format!("synthetic_{}_{}", topic, i),
embedding,
domain,
timestamp: Utc::now() - ChronoDuration::days(i as i64 % 365),
metadata: {
let mut m = HashMap::new();
m.insert("topic".to_string(), topic.to_string());
m.insert("synthetic".to_string(), "true".to_string());
m
},
});
}
vectors
}
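/// Builds a 128-dim embedding: low-amplitude noise, a 12-dim topic cluster
/// keyed to `seed`, a 10-dim domain band (offset 0 for climate, 50 for
/// research), and, 30% of the time, a "bridge" boost in the opposite band;
/// the result is L2-normalized.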
fn generate_topic_embedding(rng: &mut impl Rng, seed: usize, is_climate: bool) -> Vec<f32> {
let dim = 128;
let mut embedding = vec![0.0_f32; dim];
for i in 0..dim {
embedding[i] = rng.gen::<f32>() * 0.1;
}
let cluster_start = (seed * 8) % (dim - 12);
for i in 0..12 {
embedding[cluster_start + i] += 0.5 + rng.gen::<f32>() * 0.3;
}
let domain_start = if is_climate { 0 } else { 50 };
for i in 0..10 {
embedding[domain_start + i] += 0.4;
}
if rng.gen::<f32>() < 0.3 {
let bridge_start = if is_climate { 50 } else { 0 };
for i in 0..8 {
embedding[bridge_start + i] += 0.25;
}
}
let norm: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
if norm > 0.0 {
for x in &mut embedding {
*x /= norm;
}
}
embedding
}
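/// Reports per-domain coherence, a sample of cross-domain links from the
/// first five linked patterns, and summary statistics for significant
/// patterns (p < 0.05).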
fn analyze_cross_domain_correlations(
engine: &OptimizedDiscoveryEngine,
patterns: &[SignificantPattern],
) {
println!("\n 📊 Cross-Domain Correlation Analysis:");
println!(" ═══════════════════════════════════════");
let domains = [Domain::Climate, Domain::Finance, Domain::Research];
let mut domain_coherence = HashMap::new();
for &domain in &domains {
if let Some(coherence) = engine.domain_coherence(domain) {
domain_coherence.insert(domain, coherence);
println!(" {:?}: coherence = {:.4}", domain, coherence);
}
}
let cross_domain_patterns: Vec<_> = patterns.iter()
.filter(|p| !p.pattern.cross_domain_links.is_empty())
.collect();
println!("\n 🔗 Cross-Domain Links: {}", cross_domain_patterns.len());
for (i, pattern) in cross_domain_patterns.iter().take(5).enumerate() {
for link in &pattern.pattern.cross_domain_links {
println!(" {}. {:?} → {:?} (strength: {:.3})",
i + 1,
link.source_domain,
link.target_domain,
link.link_strength
);
}
}
let significant_patterns: Vec<_> = patterns.iter()
.filter(|p| p.is_significant)
.collect();
println!("\n 📈 Statistical Significance:");
println!(" Total patterns: {}", patterns.len());
println!(" Significant (p < 0.05): {}", significant_patterns.len());
if !significant_patterns.is_empty() {
let avg_effect_size: f64 = significant_patterns.iter()
.map(|p| p.effect_size.abs())
.sum::<f64>() / significant_patterns.len() as f64;
println!(" Avg effect size: {:.3}", avg_effect_size);
}
}
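/// Creates the output directory if needed, then writes the patterns CSV and
/// the plain-text hypothesis report.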
fn export_results(
engine: &OptimizedDiscoveryEngine,
patterns: &[SignificantPattern],
) -> Result<()> {
let output_dir = "/home/user/ruvector/examples/data/framework/output";
std::fs::create_dir_all(output_dir)
.map_err(|e| FrameworkError::Config(format!("Failed to create output dir: {}", e)))?;
let patterns_file = format!("{}/optimized_patterns.csv", output_dir);
export_patterns_with_evidence_csv(patterns, &patterns_file)?;
println!(" ✓ Patterns exported to: {}", patterns_file);
let hypothesis_file = format!("{}/hypothesis_report.txt", output_dir);
export_hypothesis_report(patterns, &hypothesis_file)?;
println!(" ✓ Hypothesis report: {}", hypothesis_file);
Ok(())
}
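/// Writes a plain-text report grouping patterns by type; each group lists up
/// to ten patterns with confidence, p-value, effect size, significance flag,
/// and supporting evidence.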
fn export_hypothesis_report(patterns: &[SignificantPattern], path: &str) -> Result<()> {
    use std::io::Write;

    // A non-capturing closure is `Copy`, so one error adapter can serve every
    // write site instead of repeating the `map_err` boilerplate inline.
    let io_err = |e: std::io::Error| FrameworkError::Config(format!("Write error: {}", e));

    let mut file = std::fs::File::create(path)
        .map_err(|e| FrameworkError::Config(format!("Failed to create file: {}", e)))?;

    writeln!(file, "RuVector Discovery - Hypothesis Report").map_err(io_err)?;
    writeln!(file, "Generated: {}", Utc::now()).map_err(io_err)?;
    writeln!(file, "═══════════════════════════════════════\n").map_err(io_err)?;

    // Group patterns by their type's Debug name for a sectioned report.
    let mut by_type: HashMap<String, Vec<&SignificantPattern>> = HashMap::new();
    for pattern in patterns {
        let type_name = format!("{:?}", pattern.pattern.pattern_type);
        by_type.entry(type_name).or_default().push(pattern);
    }

    for (pattern_type, group) in by_type.iter() {
        writeln!(file, "\n## {} ({} patterns)", pattern_type, group.len()).map_err(io_err)?;
        for (i, pattern) in group.iter().take(10).enumerate() {
            writeln!(file, "\n{}. {}", i + 1, pattern.pattern.description).map_err(io_err)?;
            writeln!(file, " Confidence: {:.2}%", pattern.pattern.confidence * 100.0).map_err(io_err)?;
            writeln!(file, " P-value: {:.4}", pattern.p_value).map_err(io_err)?;
            writeln!(file, " Effect size: {:.3}", pattern.effect_size).map_err(io_err)?;
            writeln!(file, " Significant: {}", pattern.is_significant).map_err(io_err)?;
            if !pattern.pattern.evidence.is_empty() {
                writeln!(file, " Evidence:").map_err(io_err)?;
                for evidence in &pattern.pattern.evidence {
                    writeln!(file, " - {}: {:.3}", evidence.evidence_type, evidence.value).map_err(io_err)?;
                }
            }
        }
    }
    Ok(())
}
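/// Prints timing, throughput, and discovery summaries, then evaluates the
/// runner's targets (1000+ vectors in under 5 s; fast edge computation).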
fn print_final_report(metrics: &RunnerMetrics, patterns: &[SignificantPattern]) {
println!("\n╔══════════════════════════════════════════════════════════════╗");
println!("║ Performance Report ║");
println!("╚══════════════════════════════════════════════════════════════╝");
println!("\n📊 Timing Breakdown:");
println!(" ├─ Data Fetching: {:>6} ms", metrics.fetch_time_ms);
println!(" ├─ Graph Building: {:>6} ms", metrics.graph_build_time_ms);
println!(" ├─ Coherence Compute: {:>6} ms", metrics.coherence_time_ms);
println!(" ├─ Pattern Detection: {:>6} ms", metrics.pattern_detection_time_ms);
println!(" └─ Total: {:>6} ms ({:.2}s)",
metrics.total_time_ms, metrics.total_time_ms as f64 / 1000.0);
println!("\n⚡ Throughput Metrics:");
println!(" ├─ Vectors processed: {:>6}", metrics.vectors_processed);
println!(" ├─ Vectors/sec: {:>6.0}", metrics.vectors_per_sec);
println!(" ├─ Edges created: {:>6}", metrics.edges_created);
println!(" └─ Edges/sec: {:>6.0}", metrics.edges_per_sec);
println!("\n🔍 Discovery Results:");
println!(" ├─ Total patterns: {:>6}", metrics.patterns_discovered);
let significant = patterns.iter().filter(|p| p.is_significant).count();
println!(" ├─ Significant: {:>6} ({:.1}%)",
significant,
if metrics.patterns_discovered > 0 {
significant as f64 / metrics.patterns_discovered as f64 * 100.0
} else {
0.0
}
);
let cross_domain = patterns.iter()
.filter(|p| !p.pattern.cross_domain_links.is_empty())
.count();
println!(" └─ Cross-domain links: {:>6}", cross_domain);
println!("\n🎯 Target Metrics Achievement:");
    let target_vectors_time = 5000;
    let vectors_ok = if metrics.vectors_processed >= 1000 {
metrics.total_time_ms <= target_vectors_time
} else {
false
};
println!(" ├─ 1000+ vectors in <5s: {} {}",
if vectors_ok { "✓" } else { "✗" },
if vectors_ok {
format!("({} vectors in {:.2}s)", metrics.vectors_processed, metrics.total_time_ms as f64 / 1000.0)
} else {
format!("({} vectors)", metrics.vectors_processed)
}
);
    let target_edges_time = 2000;
    let edges_ok = if metrics.edges_created >= 100000 {
        metrics.graph_build_time_ms <= target_edges_time
    } else {
        metrics.edges_created >= 1000
    };
println!(" └─ Fast edge computation: {} ({} edges in {:.2}s)",
if edges_ok { "✓" } else { "✗" },
metrics.edges_created,
metrics.graph_build_time_ms as f64 / 1000.0
);
}
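/// Times `simd_cosine_similarity` over 10,000 random 384-dim vector pairs;
/// printing the checksum keeps the loop from being optimized away.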
fn simd_benchmark() {
println!("\n╔══════════════════════════════════════════════════════════════╗");
println!("║ SIMD Performance Benchmark ║");
println!("╚══════════════════════════════════════════════════════════════╝");
use rand::{Rng, SeedableRng};
use rand::rngs::StdRng;
let mut rng = StdRng::seed_from_u64(42);
let dim = 384;
let num_pairs = 10000;
let mut vectors_a = Vec::with_capacity(num_pairs);
let mut vectors_b = Vec::with_capacity(num_pairs);
for _ in 0..num_pairs {
let a: Vec<f32> = (0..dim).map(|_| rng.gen::<f32>()).collect();
let b: Vec<f32> = (0..dim).map(|_| rng.gen::<f32>()).collect();
vectors_a.push(a);
vectors_b.push(b);
}
let simd_start = Instant::now();
let mut simd_sum = 0.0_f32;
for i in 0..num_pairs {
simd_sum += simd_cosine_similarity(&vectors_a[i], &vectors_b[i]);
}
let simd_time = simd_start.elapsed();
println!("\n SIMD-accelerated cosine similarity:");
println!(" ├─ Comparisons: {}", num_pairs);
println!(" ├─ Time: {:.2} ms", simd_time.as_millis());
println!(" ├─ Throughput: {:.0} comparisons/sec",
num_pairs as f64 / simd_time.as_secs_f64());
println!(" └─ Checksum: {:.6}", simd_sum);
println!("\n ✓ Using SIMD-optimized implementation");
println!(" (Falls back to chunked processing on non-x86_64)");
}