// phago_distributed/bench.rs

//! Benchmarking utilities for the distributed colony.
//!
//! This module provides tools for measuring the performance of the distributed
//! colony system, including document ingestion, tick execution, and query
//! performance across multiple shards.

use crate::coordinator::Coordinator;
use crate::hashing::ConsistentHashRing;
use crate::query::{DistributedHybridConfig, DistributedQueryEngine};
use crate::runner::{DistributedRunner, RunnerConfig};
use crate::shard::ShardedColony;
use crate::types::*;
use phago_core::types::Position;
use phago_runtime::colony::ColonyConfig;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;

/// Parameters that control a single benchmark run.
#[derive(Debug, Clone)]
pub struct BenchConfig {
    /// How many shards the benchmark cluster is built with.
    pub num_shards: u32,
    /// How many generated documents are ingested.
    pub num_documents: usize,
    /// How many ticks are executed.
    pub num_ticks: u64,
    /// How many times the whole set of sample queries is executed.
    pub num_queries: usize,
    /// Query strings issued during the query phase.
    pub sample_queries: Vec<String>,
}

impl Default for BenchConfig {
    /// Defaults: 3 shards, 100 documents, 20 ticks, 50 query rounds and three
    /// biology-themed sample queries.
    fn default() -> Self {
        let sample_queries = ["cell membrane", "protein transport", "molecular biology"]
            .iter()
            .copied()
            .map(String::from)
            .collect();

        Self {
            num_shards: 3,
            num_documents: 100,
            num_ticks: 20,
            num_queries: 50,
            sample_queries,
        }
    }
}

impl BenchConfig {
    /// Start from the default configuration; refine it with the `with_*` methods.
    pub fn new() -> Self {
        Default::default()
    }

    /// Return the configuration with `num_shards` replaced.
    pub fn with_shards(self, num_shards: u32) -> Self {
        Self { num_shards, ..self }
    }

    /// Return the configuration with `num_documents` replaced.
    pub fn with_documents(self, num_documents: usize) -> Self {
        Self {
            num_documents,
            ..self
        }
    }

    /// Return the configuration with `num_ticks` replaced.
    pub fn with_ticks(self, num_ticks: u64) -> Self {
        Self { num_ticks, ..self }
    }

    /// Return the configuration with `num_queries` replaced.
    pub fn with_queries(self, num_queries: usize) -> Self {
        Self {
            num_queries,
            ..self
        }
    }

    /// Return the configuration with the sample queries replaced by `queries`.
    pub fn with_sample_queries(self, queries: Vec<String>) -> Self {
        Self {
            sample_queries: queries,
            ..self
        }
    }
}

/// Timings, throughput figures and graph sizes collected by one benchmark run.
#[derive(Debug, Clone)]
pub struct BenchResults {
    /// Duration of the cluster setup phase.
    pub setup_time: Duration,
    /// Duration of the document ingestion phase.
    pub ingest_time: Duration,
    /// Duration of the tick phase.
    pub tick_time: Duration,
    /// Duration of the query phase.
    pub query_time: Duration,
    /// Duration of the whole run.
    pub total_time: Duration,
    /// Ingestion throughput in documents per second.
    pub docs_per_second: f64,
    /// Tick throughput in ticks per second.
    pub ticks_per_second: f64,
    /// Query throughput in queries per second.
    pub queries_per_second: f64,
    /// Graph node count summed over all shards.
    pub total_nodes: usize,
    /// Graph edge count summed over all shards.
    pub total_edges: usize,
    /// Shard count used for the run.
    pub num_shards: u32,
    /// Document count used for the run.
    pub num_documents: usize,
    /// Tick count used for the run.
    pub num_ticks: u64,
}

impl BenchResults {
    /// Write a human-readable report of the run to standard output.
    pub fn print_summary(&self) {
        // Bind every field once so the report below reads top to bottom.
        let Self {
            setup_time,
            ingest_time,
            tick_time,
            query_time,
            total_time,
            docs_per_second,
            ticks_per_second,
            queries_per_second,
            total_nodes,
            total_edges,
            num_shards,
            num_documents,
            num_ticks,
        } = self;

        println!("\n=== Distributed Colony Benchmark Results ===\n");

        println!("Configuration:");
        println!(
            "  Shards: {}, Documents: {}, Ticks: {}",
            num_shards, num_documents, num_ticks
        );
        println!();

        println!("Timing:");
        println!("  Setup time:    {:?}", setup_time);
        println!(
            "  Ingest time:   {:?} ({:.1} docs/sec)",
            ingest_time, docs_per_second
        );
        println!(
            "  Tick time:     {:?} ({:.1} ticks/sec)",
            tick_time, ticks_per_second
        );
        println!(
            "  Query time:    {:?} ({:.1} queries/sec)",
            query_time, queries_per_second
        );
        println!("  Total time:    {:?}", total_time);
        println!();

        println!("Graph size: {} nodes, {} edges", total_nodes, total_edges);
    }

    /// Render the run as one CSV row whose columns match [`BenchResults::csv_header`].
    ///
    /// Rates are written with two decimals and the total time in whole
    /// milliseconds.
    pub fn to_csv_row(&self) -> String {
        let Self {
            num_shards,
            num_documents,
            num_ticks,
            docs_per_second,
            ticks_per_second,
            queries_per_second,
            total_nodes,
            total_edges,
            total_time,
            ..
        } = self;
        let total_time_ms = total_time.as_millis();

        format!(
            "{},{},{},{:.2},{:.2},{:.2},{},{},{}",
            num_shards,
            num_documents,
            num_ticks,
            docs_per_second,
            ticks_per_second,
            queries_per_second,
            total_nodes,
            total_edges,
            total_time_ms
        )
    }

    /// Column names for the rows produced by [`BenchResults::to_csv_row`].
    pub fn csv_header() -> &'static str {
        const HEADER: &str =
            "shards,documents,ticks,docs_per_sec,ticks_per_sec,queries_per_sec,nodes,edges,total_time_ms";
        HEADER
    }
}

172/// Create a test cluster for benchmarking.
173///
174/// Returns the coordinator and a vector of sharded colonies.
175pub fn create_bench_cluster(
176    num_shards: u32,
177) -> (Arc<Coordinator>, Vec<Arc<RwLock<ShardedColony>>>) {
178    let coordinator = Arc::new(Coordinator::new(num_shards));
179    let hash_ring = Arc::new(RwLock::new(ConsistentHashRing::new(num_shards)));
180
181    let shards: Vec<_> = (0..num_shards)
182        .map(|i| {
183            Arc::new(RwLock::new(ShardedColony::new(
184                ShardId::new(i),
185                ColonyConfig::default(),
186                hash_ring.clone(),
187            )))
188        })
189        .collect();
190
191    (coordinator, shards)
192}
193
/// Produce `count` synthetic `(title, content)` documents for benchmarking.
///
/// Documents rotate through eight fixed life-science topics. The zero-based
/// document index is embedded in both the title and the content, so every
/// generated pair is distinct.
pub fn generate_documents(count: usize) -> Vec<(String, String)> {
    // (topic name, keyword text) pairs the generator rotates through.
    const TOPICS: [(&str, &str); 8] = [
        (
            "Cell Biology",
            "cell membrane protein transport signaling pathway organelle cytoplasm",
        ),
        (
            "Molecular Biology",
            "DNA RNA transcription translation gene expression nucleotide sequence",
        ),
        (
            "Biochemistry",
            "enzyme substrate reaction kinetics metabolism catalysis activation",
        ),
        (
            "Genetics",
            "chromosome gene mutation inheritance phenotype genotype allele",
        ),
        (
            "Neuroscience",
            "neuron synapse action potential neurotransmitter receptor axon dendrite",
        ),
        (
            "Immunology",
            "antibody antigen immune response lymphocyte cytokine inflammation",
        ),
        (
            "Microbiology",
            "bacteria virus pathogen infection microbiome antimicrobial resistance",
        ),
        (
            "Ecology",
            "ecosystem biodiversity species population habitat conservation environment",
        ),
    ];

    TOPICS
        .iter()
        .cycle()
        .take(count)
        .enumerate()
        .map(|(index, (subject, keywords))| {
            let heading = format!("{} Document {}", subject, index);
            let body = format!(
                "{} - variation {} with unique content about scientific concepts and research findings",
                keywords, index
            );
            (heading, body)
        })
        .collect()
}

246/// Run the full benchmark suite.
247///
248/// This is an async function that creates a distributed cluster, ingests
249/// documents, runs ticks, and executes queries to measure performance.
250pub async fn run_benchmark(config: BenchConfig) -> BenchResults {
251    let total_start = Instant::now();
252
253    // Setup
254    let setup_start = Instant::now();
255    let (coordinator, shards) = create_bench_cluster(config.num_shards);
256
257    // Register shards with coordinator
258    for (i, _shard) in shards.iter().enumerate() {
259        let info = ShardInfo::new(ShardId::new(i as u32), format!("127.0.0.1:{}", 8080 + i));
260        let _ = coordinator.register_shard(info).await;
261    }
262    let setup_time = setup_start.elapsed();
263
264    // Ingest documents
265    let ingest_start = Instant::now();
266    let documents = generate_documents(config.num_documents);
267
268    for (i, (title, content)) in documents.iter().enumerate() {
269        let doc_id = phago_core::types::DocumentId::from_seed(i as u64);
270        let shard_id = coordinator.route_document(&doc_id).await;
271
272        // Find the correct shard and ingest
273        for shard in &shards {
274            let mut s = shard.write().await;
275            if s.shard_id() == shard_id {
276                s.ingest_document_direct(
277                    title,
278                    content,
279                    Position::new(i as f64 % 100.0, (i / 100) as f64),
280                );
281                break;
282            }
283        }
284    }
285    let ingest_time = ingest_start.elapsed();
286
287    // Run ticks using DistributedRunner for proper phase synchronization
288    let tick_start = Instant::now();
289    let runner = DistributedRunner::new(
290        coordinator.clone(),
291        shards.clone(),
292        RunnerConfig {
293            resolve_ghosts: false, // Skip ghost resolution for benchmarking
294            ..Default::default()
295        },
296    );
297    let _ = runner.run(config.num_ticks).await;
298    let tick_time = tick_start.elapsed();
299
300    // Run queries
301    let query_start = Instant::now();
302    let engine = DistributedQueryEngine::new(DistributedHybridConfig::default());
303    let total_queries = config.num_queries * config.sample_queries.len();
304
305    for _ in 0..config.num_queries {
306        for query in &config.sample_queries {
307            // Collect shard guards
308            let guards: Vec<_> =
309                futures::future::join_all(shards.iter().map(|s| async { s.read().await })).await;
310
311            // Create references for the query
312            let refs: Vec<&ShardedColony> = guards.iter().map(|g| &**g).collect();
313            let _ = engine.distributed_query(&refs, query);
314        }
315    }
316    let query_time = query_start.elapsed();
317
318    // Collect stats
319    let mut total_nodes = 0;
320    let mut total_edges = 0;
321    for shard in &shards {
322        let s = shard.read().await;
323        let stats = s.stats();
324        total_nodes += stats.graph_nodes;
325        total_edges += stats.graph_edges;
326    }
327
328    let total_time = total_start.elapsed();
329
330    BenchResults {
331        setup_time,
332        ingest_time,
333        tick_time,
334        query_time,
335        total_time,
336        docs_per_second: config.num_documents as f64 / ingest_time.as_secs_f64(),
337        ticks_per_second: config.num_ticks as f64 / tick_time.as_secs_f64(),
338        queries_per_second: total_queries as f64 / query_time.as_secs_f64(),
339        total_nodes,
340        total_edges,
341        num_shards: config.num_shards,
342        num_documents: config.num_documents,
343        num_ticks: config.num_ticks,
344    }
345}
346
347/// Run a quick benchmark with minimal parameters.
348///
349/// Useful for testing that the benchmark infrastructure works.
350pub async fn run_quick_benchmark() -> BenchResults {
351    run_benchmark(BenchConfig {
352        num_shards: 2,
353        num_documents: 20,
354        num_ticks: 10,
355        num_queries: 5,
356        sample_queries: vec!["cell".to_string(), "protein".to_string()],
357    })
358    .await
359}
360
361/// Compare single-node vs distributed performance.
362///
363/// Runs benchmarks with 1, 3, and 5 shards and prints a comparison table.
364pub async fn compare_single_vs_distributed(num_documents: usize, num_ticks: u64) {
365    println!("\n=== Single-Node vs Distributed Comparison ===\n");
366
367    let base_config = BenchConfig {
368        num_documents,
369        num_ticks,
370        num_queries: 20,
371        ..Default::default()
372    };
373
374    // Single node (1 shard)
375    println!("Running single-node benchmark...");
376    let single_result = run_benchmark(BenchConfig {
377        num_shards: 1,
378        ..base_config.clone()
379    })
380    .await;
381
382    // Distributed (3 shards)
383    println!("Running 3-shard distributed benchmark...");
384    let dist_3_result = run_benchmark(BenchConfig {
385        num_shards: 3,
386        ..base_config.clone()
387    })
388    .await;
389
390    // Distributed (5 shards)
391    println!("Running 5-shard distributed benchmark...");
392    let dist_5_result = run_benchmark(BenchConfig {
393        num_shards: 5,
394        ..base_config.clone()
395    })
396    .await;
397
398    println!("\n| Shards | Ingest (docs/s) | Ticks/s | Queries/s | Total Time |");
399    println!("|--------|-----------------|---------|-----------|------------|");
400    println!(
401        "| 1      | {:>15.1} | {:>7.1} | {:>9.1} | {:>10?} |",
402        single_result.docs_per_second,
403        single_result.ticks_per_second,
404        single_result.queries_per_second,
405        single_result.total_time
406    );
407    println!(
408        "| 3      | {:>15.1} | {:>7.1} | {:>9.1} | {:>10?} |",
409        dist_3_result.docs_per_second,
410        dist_3_result.ticks_per_second,
411        dist_3_result.queries_per_second,
412        dist_3_result.total_time
413    );
414    println!(
415        "| 5      | {:>15.1} | {:>7.1} | {:>9.1} | {:>10?} |",
416        dist_5_result.docs_per_second,
417        dist_5_result.ticks_per_second,
418        dist_5_result.queries_per_second,
419        dist_5_result.total_time
420    );
421
422    println!("\n| Shards | Nodes  | Edges  |");
423    println!("|--------|--------|--------|");
424    println!(
425        "| 1      | {:>6} | {:>6} |",
426        single_result.total_nodes, single_result.total_edges
427    );
428    println!(
429        "| 3      | {:>6} | {:>6} |",
430        dist_3_result.total_nodes, dist_3_result.total_edges
431    );
432    println!(
433        "| 5      | {:>6} | {:>6} |",
434        dist_5_result.total_nodes, dist_5_result.total_edges
435    );
436}
437
438/// Run a scaling benchmark across different shard counts.
439///
440/// Returns results for 1, 2, 4, and 8 shards.
441pub async fn scaling_benchmark(num_documents: usize, num_ticks: u64) -> Vec<BenchResults> {
442    let shard_counts = [1, 2, 4, 8];
443    let mut results = Vec::new();
444
445    for &num_shards in &shard_counts {
446        println!("Running benchmark with {} shard(s)...", num_shards);
447        let result = run_benchmark(BenchConfig {
448            num_shards,
449            num_documents,
450            num_ticks,
451            num_queries: 20,
452            ..Default::default()
453        })
454        .await;
455        results.push(result);
456    }
457
458    results
459}
460
461/// Print scaling benchmark results as a table.
462pub fn print_scaling_results(results: &[BenchResults]) {
463    println!("\n=== Scaling Benchmark Results ===\n");
464    println!("{}", BenchResults::csv_header());
465    for result in results {
466        println!("{}", result.to_csv_row());
467    }
468}
469
#[cfg(test)]
mod tests {
    use super::*;

    /// The cluster helper reports and returns the requested number of shards.
    #[tokio::test]
    async fn test_create_bench_cluster() {
        let (coordinator, shards) = create_bench_cluster(3);
        assert_eq!(coordinator.shard_count().await, 3);
        assert_eq!(shards.len(), 3);
    }

    /// Generated documents carry their index in the title and have content.
    #[test]
    fn test_generate_documents() {
        let docs = generate_documents(10);
        assert_eq!(docs.len(), 10);
        assert!(docs[0].0.contains("Document 0"));
        assert!(!docs[0].1.is_empty());
    }

    /// Topics repeat once all eight have been used.
    #[test]
    fn test_generate_documents_cycles_topics() {
        let docs = generate_documents(20);
        // Should cycle through 8 topics
        assert!(docs[0].0.contains("Cell Biology"));
        assert!(docs[8].0.contains("Cell Biology")); // Cycles back
    }

    /// A small end-to-end run reports positive throughput and echoes its config.
    #[tokio::test]
    async fn test_small_benchmark() {
        let result = run_benchmark(BenchConfig {
            num_shards: 2,
            num_documents: 10,
            num_ticks: 5,
            num_queries: 5,
            sample_queries: vec!["cell".to_string()],
        })
        .await;

        assert!(result.docs_per_second > 0.0);
        assert!(result.ticks_per_second > 0.0);
        assert!(result.queries_per_second > 0.0);
        assert_eq!(result.num_shards, 2);
        assert_eq!(result.num_documents, 10);
        assert_eq!(result.num_ticks, 5);
    }

    /// The quick benchmark runs to completion and measures some elapsed time.
    #[tokio::test]
    async fn test_quick_benchmark() {
        let result = run_quick_benchmark().await;

        assert!(result.docs_per_second > 0.0);
        // Compare at nanosecond resolution: a fast machine can finish the
        // quick run in under a millisecond, which would make a
        // millisecond-based check fail spuriously.
        assert!(result.total_time.as_nanos() > 0);
    }

    /// Every builder method overrides exactly the field it is named after.
    #[test]
    fn test_bench_config_builder() {
        let config = BenchConfig::new()
            .with_shards(5)
            .with_documents(200)
            .with_ticks(50)
            .with_queries(30)
            .with_sample_queries(vec!["test".to_string()]);

        assert_eq!(config.num_shards, 5);
        assert_eq!(config.num_documents, 200);
        assert_eq!(config.num_ticks, 50);
        assert_eq!(config.num_queries, 30);
        assert_eq!(config.sample_queries, vec!["test".to_string()]);
    }

    /// The CSV row has the documented column order and matches the header width.
    #[test]
    fn test_bench_results_csv() {
        let results = BenchResults {
            setup_time: Duration::from_millis(10),
            ingest_time: Duration::from_millis(100),
            tick_time: Duration::from_millis(200),
            query_time: Duration::from_millis(50),
            total_time: Duration::from_millis(360),
            docs_per_second: 1000.0,
            ticks_per_second: 100.0,
            queries_per_second: 500.0,
            total_nodes: 50,
            total_edges: 100,
            num_shards: 3,
            num_documents: 100,
            num_ticks: 20,
        };

        let csv = results.to_csv_row();
        assert_eq!(csv, "3,100,20,1000.00,100.00,500.00,50,100,360");
        assert_eq!(
            csv.split(',').count(),
            BenchResults::csv_header().split(',').count()
        );
    }

    /// A three-shard run completes and reports the configured workload.
    #[tokio::test]
    async fn test_benchmark_shard_distribution() {
        let config = BenchConfig {
            num_shards: 3,
            num_documents: 30,
            num_ticks: 5,
            num_queries: 3,
            sample_queries: vec!["cell".to_string()],
        };

        let result = run_benchmark(config).await;

        // The result echoes the configured shard and document counts.
        assert_eq!(result.num_shards, 3);
        assert_eq!(result.num_documents, 30);
        // The run took a measurable amount of time (checked in nanoseconds so
        // a sub-millisecond run does not fail the test).
        assert!(result.total_time.as_nanos() > 0);
    }
}