use omicsx::futures::DistributedCoordinator;
use omicsx::futures::AlignmentTask;
use omicsx::protein::Protein;
use omicsx::scoring::ScoringMatrix;
use std::thread;
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== OMICSX Distributed Multi-Node Alignment Demo ===\n");
let coordinator = DistributedCoordinator::new();
println!("✓ Created distributed coordinator\n");
let node_ids: Vec<_> = (0..4)
.map(|i| {
let node_id = coordinator.register_node().unwrap();
println!(" Registered Node {}: {:?}", i, node_id);
node_id
})
.collect();
println!("\n{} nodes registered\n", node_ids.len());
let seq1 = "MTEITAAMAIDQEAIDAAAGHDKLSLVGANKEDIAAANDLGAWILGHPDNHEAVGVQVKVKALPDAQFEVVHSLAKWKRQTLG";
let seq2 = "MGQVGRQLGHHDLSAGNEEPGAADLSAGCQVGRQLGHHDLSAGNEEPGAADLSAGAHGDVPPEDQVGQQVGHDLSVGGDHLLEHSQGAQ";
let tasks: Vec<AlignmentTask> = (0..20)
.map(|i| {
let query_seq = if i % 2 == 0 { seq1 } else { seq2 };
let subject_seq = if i % 3 == 0 { seq2 } else { seq1 };
AlignmentTask {
task_id: i,
query: Protein::from_string(query_seq)
.unwrap()
.with_id(format!("query_{}", i)),
subject: Protein::from_string(subject_seq)
.unwrap()
.with_id(format!("subject_{}", i)),
matrix: ScoringMatrix::default(),
}
})
.collect();
let submitted = coordinator.submit_batch(tasks)?;
println!("Submitted {} alignment tasks\n", submitted);
let initial_stats = coordinator.get_stats()?;
println!("Initial Distribution Stats:");
println!(" Total nodes: {}", initial_stats.total_nodes);
println!(" Pending tasks: {}\n", initial_stats.pending_tasks);
println!("Simulating distributed work processing...\n");
for (idx, node_id) in node_ids.iter().enumerate() {
println!("Node {}: Starting work...", idx);
loop {
match coordinator.get_task(*node_id) {
Ok(Some(task)) => {
thread::sleep(Duration::from_millis(10));
let result = omicsx::futures::AlignmentResultRecord {
task_id: task.task_id,
node_id: *node_id,
score: 100 + (task.task_id as i32 * 5),
identity: 0.85 + (task.task_id as f32 * 0.01),
gaps: task.task_id,
query_coverage: 0.95,
};
coordinator.record_result(result)?;
println!(" Node {}: Completed task {}", idx, task.task_id);
}
Ok(None) => {
println!(" Node {}: No more tasks\n", idx);
break;
}
Err(e) => {
eprintln!(" Node {}: Error: {}\n", idx, e);
break;
}
}
}
}
println!("Distribution Complete!\n");
let final_stats = coordinator.get_stats()?;
println!("Final Distribution Stats:");
println!(" Total nodes: {}", final_stats.total_nodes);
println!(" Online nodes: {}", final_stats.online_nodes);
println!(" Completed tasks: {}", final_stats.completed_tasks);
println!(" Pending tasks: {}", final_stats.pending_tasks);
println!(" Total compute time: {} ms", final_stats.total_compute_time_ms);
println!(" Average time per task: {} ms\n", final_stats.average_time_per_task_ms);
println!("Node Statistics:");
let node_stats = coordinator.get_node_stats()?;
for stat in node_stats {
println!(" Node {:?}:", stat.node_id);
println!(" Tasks: {}", stat.task_count);
println!(" Completed: {}", stat.completed_tasks);
println!(" Status: {:?}", stat.status);
}
println!("\n// Key Features Demonstrated:");
println!("// 1. Multi-node registration and management");
println!("// 2. Batch task submission");
println!("// 3. Work-stealing load balancing");
println!("// 4. Result aggregation");
println!("// 5. Statistical tracking and reporting");
println!("\n=== Demo Complete ===");
Ok(())
}