1use crate::coordinator::Coordinator;
8use crate::hashing::ConsistentHashRing;
9use crate::query::{DistributedHybridConfig, DistributedQueryEngine};
10use crate::runner::{DistributedRunner, RunnerConfig};
11use crate::shard::ShardedColony;
12use crate::types::*;
13use phago_core::types::Position;
14use phago_runtime::colony::ColonyConfig;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17use tokio::sync::RwLock;
18
/// Parameters for one distributed-colony benchmark run.
///
/// Start from [`BenchConfig::new`] (equivalently [`BenchConfig::default`]) and
/// refine it with the chainable `with_*` methods, or use struct-update syntax.
#[derive(Debug, Clone)]
pub struct BenchConfig {
    /// Number of shards in the simulated cluster.
    pub num_shards: u32,
    /// Number of synthetic documents to ingest.
    pub num_documents: usize,
    /// Number of simulation ticks to run after ingestion.
    pub num_ticks: u64,
    /// Number of query rounds; every round issues each entry of `sample_queries`.
    pub num_queries: usize,
    /// Query strings issued during each query round.
    pub sample_queries: Vec<String>,
}

impl Default for BenchConfig {
    /// Three shards, 100 documents, 20 ticks and 50 query rounds over three
    /// biology-themed sample queries.
    fn default() -> Self {
        let sample_queries = ["cell membrane", "protein transport", "molecular biology"]
            .iter()
            .map(|query| query.to_string())
            .collect();

        Self {
            num_shards: 3,
            num_documents: 100,
            num_ticks: 20,
            num_queries: 50,
            sample_queries,
        }
    }
}

impl BenchConfig {
    /// Returns the default configuration; identical to [`BenchConfig::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Replaces the shard count.
    pub fn with_shards(self, num_shards: u32) -> Self {
        Self { num_shards, ..self }
    }

    /// Replaces the number of documents to ingest.
    pub fn with_documents(self, num_documents: usize) -> Self {
        Self {
            num_documents,
            ..self
        }
    }

    /// Replaces the number of ticks to run.
    pub fn with_ticks(self, num_ticks: u64) -> Self {
        Self { num_ticks, ..self }
    }

    /// Replaces the number of query rounds.
    pub fn with_queries(self, num_queries: usize) -> Self {
        Self {
            num_queries,
            ..self
        }
    }

    /// Replaces the list of query strings issued each round.
    pub fn with_sample_queries(self, queries: Vec<String>) -> Self {
        Self {
            sample_queries: queries,
            ..self
        }
    }
}
86
/// Timing, throughput and graph-size measurements produced by one benchmark run.
#[derive(Debug, Clone)]
pub struct BenchResults {
    /// Wall-clock duration of the setup phase.
    pub setup_time: Duration,
    /// Wall-clock duration of the document-ingestion phase.
    pub ingest_time: Duration,
    /// Wall-clock duration of the tick phase.
    pub tick_time: Duration,
    /// Wall-clock duration of the query phase.
    pub query_time: Duration,
    /// Wall-clock duration of the entire run.
    pub total_time: Duration,
    /// Ingestion throughput, in documents per second.
    pub docs_per_second: f64,
    /// Simulation throughput, in ticks per second.
    pub ticks_per_second: f64,
    /// Query throughput, in queries per second.
    pub queries_per_second: f64,
    /// Graph node count summed over all shards.
    pub total_nodes: usize,
    /// Graph edge count summed over all shards.
    pub total_edges: usize,
    /// Shard count the run was configured with.
    pub num_shards: u32,
    /// Document count the run was configured with.
    pub num_documents: usize,
    /// Tick count the run was configured with.
    pub num_ticks: u64,
}

impl BenchResults {
    /// Writes a human-readable report to stdout: configuration, per-phase
    /// timings with throughput, and final graph size.
    pub fn print_summary(&self) {
        let Self {
            setup_time,
            ingest_time,
            tick_time,
            query_time,
            total_time,
            docs_per_second,
            ticks_per_second,
            queries_per_second,
            total_nodes,
            total_edges,
            num_shards,
            num_documents,
            num_ticks,
        } = self;

        println!("\n=== Distributed Colony Benchmark Results ===\n");

        println!("Configuration:");
        println!(
            " Shards: {}, Documents: {}, Ticks: {}",
            num_shards, num_documents, num_ticks
        );
        println!();

        println!("Timing:");
        println!(" Setup time: {:?}", setup_time);
        println!(" Ingest time: {:?} ({:.1} docs/sec)", ingest_time, docs_per_second);
        println!(" Tick time: {:?} ({:.1} ticks/sec)", tick_time, ticks_per_second);
        println!(
            " Query time: {:?} ({:.1} queries/sec)",
            query_time, queries_per_second
        );
        println!(" Total time: {:?}", total_time);
        println!();

        println!("Graph size: {} nodes, {} edges", total_nodes, total_edges);
    }

    /// Renders this run as a single CSV line whose columns line up with
    /// [`BenchResults::csv_header`]. Rates carry two decimals; total time is
    /// expressed in whole milliseconds.
    pub fn to_csv_row(&self) -> String {
        let columns = [
            self.num_shards.to_string(),
            self.num_documents.to_string(),
            self.num_ticks.to_string(),
            format!("{:.2}", self.docs_per_second),
            format!("{:.2}", self.ticks_per_second),
            format!("{:.2}", self.queries_per_second),
            self.total_nodes.to_string(),
            self.total_edges.to_string(),
            self.total_time.as_millis().to_string(),
        ];
        columns.join(",")
    }

    /// Column names matching [`BenchResults::to_csv_row`].
    pub fn csv_header() -> &'static str {
        const HEADER: &str =
            "shards,documents,ticks,docs_per_sec,ticks_per_sec,queries_per_sec,nodes,edges,total_time_ms";
        HEADER
    }
}
171
172pub fn create_bench_cluster(
176 num_shards: u32,
177) -> (Arc<Coordinator>, Vec<Arc<RwLock<ShardedColony>>>) {
178 let coordinator = Arc::new(Coordinator::new(num_shards));
179 let hash_ring = Arc::new(RwLock::new(ConsistentHashRing::new(num_shards)));
180
181 let shards: Vec<_> = (0..num_shards)
182 .map(|i| {
183 Arc::new(RwLock::new(ShardedColony::new(
184 ShardId::new(i),
185 ColonyConfig::default(),
186 hash_ring.clone(),
187 )))
188 })
189 .collect();
190
191 (coordinator, shards)
192}
193
/// Produces `count` synthetic `(title, content)` documents for benchmarking.
///
/// Documents cycle through eight biology topics in a fixed order. Document
/// `i` is titled `"<Topic> Document <i>"`, and its content is the topic's
/// keyword list followed by a variation suffix that embeds `i`, so every
/// document is distinct.
pub fn generate_documents(count: usize) -> Vec<(String, String)> {
    const TOPICS: [(&str, &str); 8] = [
        (
            "Cell Biology",
            "cell membrane protein transport signaling pathway organelle cytoplasm",
        ),
        (
            "Molecular Biology",
            "DNA RNA transcription translation gene expression nucleotide sequence",
        ),
        (
            "Biochemistry",
            "enzyme substrate reaction kinetics metabolism catalysis activation",
        ),
        (
            "Genetics",
            "chromosome gene mutation inheritance phenotype genotype allele",
        ),
        (
            "Neuroscience",
            "neuron synapse action potential neurotransmitter receptor axon dendrite",
        ),
        (
            "Immunology",
            "antibody antigen immune response lymphocyte cytokine inflammation",
        ),
        (
            "Microbiology",
            "bacteria virus pathogen infection microbiome antimicrobial resistance",
        ),
        (
            "Ecology",
            "ecosystem biodiversity species population habitat conservation environment",
        ),
    ];

    TOPICS
        .iter()
        .cycle()
        .take(count)
        .enumerate()
        .map(|(index, &(subject, keywords))| {
            let title = format!("{} Document {}", subject, index);
            let body = format!(
                "{} - variation {} with unique content about scientific concepts and research findings",
                keywords, index
            );
            (title, body)
        })
        .collect()
}
245
246pub async fn run_benchmark(config: BenchConfig) -> BenchResults {
251 let total_start = Instant::now();
252
253 let setup_start = Instant::now();
255 let (coordinator, shards) = create_bench_cluster(config.num_shards);
256
257 for (i, _shard) in shards.iter().enumerate() {
259 let info = ShardInfo::new(ShardId::new(i as u32), format!("127.0.0.1:{}", 8080 + i));
260 let _ = coordinator.register_shard(info).await;
261 }
262 let setup_time = setup_start.elapsed();
263
264 let ingest_start = Instant::now();
266 let documents = generate_documents(config.num_documents);
267
268 for (i, (title, content)) in documents.iter().enumerate() {
269 let doc_id = phago_core::types::DocumentId::from_seed(i as u64);
270 let shard_id = coordinator.route_document(&doc_id).await;
271
272 for shard in &shards {
274 let mut s = shard.write().await;
275 if s.shard_id() == shard_id {
276 s.ingest_document_direct(
277 title,
278 content,
279 Position::new(i as f64 % 100.0, (i / 100) as f64),
280 );
281 break;
282 }
283 }
284 }
285 let ingest_time = ingest_start.elapsed();
286
287 let tick_start = Instant::now();
289 let runner = DistributedRunner::new(
290 coordinator.clone(),
291 shards.clone(),
292 RunnerConfig {
293 resolve_ghosts: false, ..Default::default()
295 },
296 );
297 let _ = runner.run(config.num_ticks).await;
298 let tick_time = tick_start.elapsed();
299
300 let query_start = Instant::now();
302 let engine = DistributedQueryEngine::new(DistributedHybridConfig::default());
303 let total_queries = config.num_queries * config.sample_queries.len();
304
305 for _ in 0..config.num_queries {
306 for query in &config.sample_queries {
307 let guards: Vec<_> =
309 futures::future::join_all(shards.iter().map(|s| async { s.read().await })).await;
310
311 let refs: Vec<&ShardedColony> = guards.iter().map(|g| &**g).collect();
313 let _ = engine.distributed_query(&refs, query);
314 }
315 }
316 let query_time = query_start.elapsed();
317
318 let mut total_nodes = 0;
320 let mut total_edges = 0;
321 for shard in &shards {
322 let s = shard.read().await;
323 let stats = s.stats();
324 total_nodes += stats.graph_nodes;
325 total_edges += stats.graph_edges;
326 }
327
328 let total_time = total_start.elapsed();
329
330 BenchResults {
331 setup_time,
332 ingest_time,
333 tick_time,
334 query_time,
335 total_time,
336 docs_per_second: config.num_documents as f64 / ingest_time.as_secs_f64(),
337 ticks_per_second: config.num_ticks as f64 / tick_time.as_secs_f64(),
338 queries_per_second: total_queries as f64 / query_time.as_secs_f64(),
339 total_nodes,
340 total_edges,
341 num_shards: config.num_shards,
342 num_documents: config.num_documents,
343 num_ticks: config.num_ticks,
344 }
345}
346
347pub async fn run_quick_benchmark() -> BenchResults {
351 run_benchmark(BenchConfig {
352 num_shards: 2,
353 num_documents: 20,
354 num_ticks: 10,
355 num_queries: 5,
356 sample_queries: vec!["cell".to_string(), "protein".to_string()],
357 })
358 .await
359}
360
361pub async fn compare_single_vs_distributed(num_documents: usize, num_ticks: u64) {
365 println!("\n=== Single-Node vs Distributed Comparison ===\n");
366
367 let base_config = BenchConfig {
368 num_documents,
369 num_ticks,
370 num_queries: 20,
371 ..Default::default()
372 };
373
374 println!("Running single-node benchmark...");
376 let single_result = run_benchmark(BenchConfig {
377 num_shards: 1,
378 ..base_config.clone()
379 })
380 .await;
381
382 println!("Running 3-shard distributed benchmark...");
384 let dist_3_result = run_benchmark(BenchConfig {
385 num_shards: 3,
386 ..base_config.clone()
387 })
388 .await;
389
390 println!("Running 5-shard distributed benchmark...");
392 let dist_5_result = run_benchmark(BenchConfig {
393 num_shards: 5,
394 ..base_config.clone()
395 })
396 .await;
397
398 println!("\n| Shards | Ingest (docs/s) | Ticks/s | Queries/s | Total Time |");
399 println!("|--------|-----------------|---------|-----------|------------|");
400 println!(
401 "| 1 | {:>15.1} | {:>7.1} | {:>9.1} | {:>10?} |",
402 single_result.docs_per_second,
403 single_result.ticks_per_second,
404 single_result.queries_per_second,
405 single_result.total_time
406 );
407 println!(
408 "| 3 | {:>15.1} | {:>7.1} | {:>9.1} | {:>10?} |",
409 dist_3_result.docs_per_second,
410 dist_3_result.ticks_per_second,
411 dist_3_result.queries_per_second,
412 dist_3_result.total_time
413 );
414 println!(
415 "| 5 | {:>15.1} | {:>7.1} | {:>9.1} | {:>10?} |",
416 dist_5_result.docs_per_second,
417 dist_5_result.ticks_per_second,
418 dist_5_result.queries_per_second,
419 dist_5_result.total_time
420 );
421
422 println!("\n| Shards | Nodes | Edges |");
423 println!("|--------|--------|--------|");
424 println!(
425 "| 1 | {:>6} | {:>6} |",
426 single_result.total_nodes, single_result.total_edges
427 );
428 println!(
429 "| 3 | {:>6} | {:>6} |",
430 dist_3_result.total_nodes, dist_3_result.total_edges
431 );
432 println!(
433 "| 5 | {:>6} | {:>6} |",
434 dist_5_result.total_nodes, dist_5_result.total_edges
435 );
436}
437
438pub async fn scaling_benchmark(num_documents: usize, num_ticks: u64) -> Vec<BenchResults> {
442 let shard_counts = [1, 2, 4, 8];
443 let mut results = Vec::new();
444
445 for &num_shards in &shard_counts {
446 println!("Running benchmark with {} shard(s)...", num_shards);
447 let result = run_benchmark(BenchConfig {
448 num_shards,
449 num_documents,
450 num_ticks,
451 num_queries: 20,
452 ..Default::default()
453 })
454 .await;
455 results.push(result);
456 }
457
458 results
459}
460
461pub fn print_scaling_results(results: &[BenchResults]) {
463 println!("\n=== Scaling Benchmark Results ===\n");
464 println!("{}", BenchResults::csv_header());
465 for result in results {
466 println!("{}", result.to_csv_row());
467 }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_create_bench_cluster() {
        let (coordinator, shards) = create_bench_cluster(3);
        assert_eq!(coordinator.shard_count().await, 3);
        assert_eq!(shards.len(), 3);
    }

    #[test]
    fn test_generate_documents() {
        let docs = generate_documents(10);
        assert_eq!(docs.len(), 10);
        assert!(docs[0].0.contains("Document 0"));
        assert!(!docs[0].1.is_empty());
    }

    #[test]
    fn test_generate_documents_cycles_topics() {
        let docs = generate_documents(20);
        assert!(docs[0].0.contains("Cell Biology"));
        assert!(docs[1].0.contains("Molecular Biology"));
        // There are eight topics, so index 8 wraps back to the first one.
        assert!(docs[8].0.contains("Cell Biology"));
    }

    #[test]
    fn test_generate_documents_empty() {
        assert!(generate_documents(0).is_empty());
    }

    #[tokio::test]
    async fn test_small_benchmark() {
        let result = run_benchmark(BenchConfig {
            num_shards: 2,
            num_documents: 10,
            num_ticks: 5,
            num_queries: 5,
            sample_queries: vec!["cell".to_string()],
        })
        .await;

        assert!(result.docs_per_second > 0.0);
        assert!(result.ticks_per_second > 0.0);
        assert!(result.docs_per_second.is_finite());
        assert!(result.ticks_per_second.is_finite());
        assert!(result.queries_per_second.is_finite());
        assert_eq!(result.num_shards, 2);
        assert_eq!(result.num_documents, 10);
        assert_eq!(result.num_ticks, 5);

        // The phases run back-to-back inside the total measurement window.
        let phases =
            result.setup_time + result.ingest_time + result.tick_time + result.query_time;
        assert!(result.total_time >= phases);
    }

    #[tokio::test]
    async fn test_quick_benchmark() {
        let result = run_quick_benchmark().await;

        assert!(result.docs_per_second > 0.0);
        // A fast machine can finish in under a millisecond, so `as_millis() > 0`
        // is flaky; only require that some time elapsed.
        assert!(!result.total_time.is_zero());
    }

    #[test]
    fn test_bench_config_builder() {
        let config = BenchConfig::new()
            .with_shards(5)
            .with_documents(200)
            .with_ticks(50)
            .with_queries(30)
            .with_sample_queries(vec!["test".to_string()]);

        assert_eq!(config.num_shards, 5);
        assert_eq!(config.num_documents, 200);
        assert_eq!(config.num_ticks, 50);
        assert_eq!(config.num_queries, 30);
        assert_eq!(config.sample_queries, vec!["test".to_string()]);
    }

    #[test]
    fn test_bench_results_csv() {
        let results = BenchResults {
            setup_time: Duration::from_millis(10),
            ingest_time: Duration::from_millis(100),
            tick_time: Duration::from_millis(200),
            query_time: Duration::from_millis(50),
            total_time: Duration::from_millis(360),
            docs_per_second: 1000.0,
            ticks_per_second: 100.0,
            queries_per_second: 500.0,
            total_nodes: 50,
            total_edges: 100,
            num_shards: 3,
            num_documents: 100,
            num_ticks: 20,
        };

        let csv = results.to_csv_row();
        assert_eq!(csv, "3,100,20,1000.00,100.00,500.00,50,100,360");
        assert_eq!(
            csv.split(',').count(),
            BenchResults::csv_header().split(',').count()
        );
    }

    #[tokio::test]
    async fn test_benchmark_shard_distribution() {
        let config = BenchConfig {
            num_shards: 3,
            num_documents: 30,
            num_ticks: 5,
            num_queries: 3,
            sample_queries: vec!["cell".to_string()],
        };

        let result = run_benchmark(config).await;

        assert_eq!(result.num_documents, 30);
        assert_eq!(result.num_shards, 3);
        // See test_quick_benchmark: a millisecond threshold is flaky on fast hosts.
        assert!(!result.total_time.is_zero());
    }
}