use jugar_probar::brick::distributed::{
BackendSelector, BrickCoordinator, BrickDataTracker, BrickMessage, WorkerId,
};
use std::sync::Arc;
fn main() {
println!("╔══════════════════════════════════════════════════════════════╗");
println!("║ Probar Distributed Worker Demo for Ruchy ║");
println!("╚══════════════════════════════════════════════════════════════╝\n");
demo_backend_selection();
demo_data_locality();
demo_pub_sub_coordination();
demo_compilation_pipeline();
}
fn demo_backend_selection() {
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!(" 1. Backend Selection for Compilation");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
let selector = BackendSelector::new()
.with_gpu_threshold(500_000) .with_simd_threshold(10_000) .with_cpu_max_threshold(50_000_000);
println!(" BackendSelector Configuration:");
println!(" ├─ GPU threshold: >= 500,000 chars");
println!(" ├─ SIMD threshold: >= 10,000 chars");
println!(" └─ CPU max: 50,000,000 chars\n");
let test_cases = [
(500, "Tiny script (500 chars)"),
(5_000, "Small module (5KB)"),
(25_000, "Medium module (25KB)"),
(100_000, "Large module (100KB)"),
(750_000, "Very large module (750KB)"),
];
println!(" Selection Results:");
println!(" ┌────────────────────────────┬──────────────┬──────────┐");
println!(" │ Source File │ Size (chars) │ Backend │");
println!(" ├────────────────────────────┼──────────────┼──────────┤");
for (size, desc) in &test_cases {
let gpu_available = *size > 100_000; let backend = selector.select(*size, gpu_available);
println!(" │ {:<26} │ {:>12} │ {:?} │", desc, size, backend);
}
println!(" └────────────────────────────┴──────────────┴──────────┘");
println!("\n Backend Capabilities:");
println!(" ├─ CPU: Always available, good for small files");
println!(" ├─ SIMD: Vectorized parsing, 2-4x faster for medium files");
println!(" ├─ GPU: Parallel compilation, best for large codebases");
println!(" └─ Remote: Distributed compilation (future)");
println!();
}
fn demo_data_locality() {
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!(" 2. Data Locality for Compilation Resources");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
let tracker = BrickDataTracker::new();
let worker0 = WorkerId::new(0);
let worker1 = WorkerId::new(1);
let worker2 = WorkerId::new(2);
println!(" Registering compilation resources:");
println!(" Worker 0: parser_cache (2MB), type_checker (3MB), stdlib (5MB)");
println!(" Worker 1: parser_cache (2MB), optimizer (4MB)");
println!(" Worker 2: code_gen (6MB), stdlib (5MB)");
println!();
tracker.track_data("parser_cache", worker0, 2 * 1024 * 1024);
tracker.track_data("type_checker", worker0, 3 * 1024 * 1024);
tracker.track_data("stdlib", worker0, 5 * 1024 * 1024);
tracker.track_data("parser_cache", worker1, 2 * 1024 * 1024);
tracker.track_data("optimizer", worker1, 4 * 1024 * 1024);
tracker.track_data("code_gen", worker2, 6 * 1024 * 1024);
tracker.track_data("stdlib", worker2, 5 * 1024 * 1024);
println!(" Data Location Queries:");
for key in &["parser_cache", "type_checker", "stdlib", "optimizer", "code_gen"] {
let workers = tracker.get_workers_for_data(key);
let worker_ids: Vec<_> = workers.iter().map(|w| w.0).collect();
println!(" {} → workers {:?}", key, worker_ids);
}
println!();
println!(" Task Affinity Scores:");
let full_compile_deps = vec![
"parser_cache".into(),
"type_checker".into(),
"code_gen".into(),
];
let full_affinity = tracker.calculate_affinity(&full_compile_deps);
println!("\n Full Compilation (parser + type_checker + code_gen):");
print_affinity_scores(&full_affinity);
let type_check_deps = vec![
"parser_cache".into(),
"type_checker".into(),
"stdlib".into(),
];
let type_affinity = tracker.calculate_affinity(&type_check_deps);
println!("\n Type Checking (parser + type_checker + stdlib):");
print_affinity_scores(&type_affinity);
let optimize_deps = vec!["parser_cache".into(), "optimizer".into()];
let opt_affinity = tracker.calculate_affinity(&optimize_deps);
println!("\n Optimization (parser + optimizer):");
print_affinity_scores(&opt_affinity);
println!();
}
fn demo_pub_sub_coordination() {
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!(" 3. PUB/SUB Coordination");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
let coordinator = BrickCoordinator::new();
let compile_sub = coordinator.subscribe("compilation");
let error_sub = coordinator.subscribe("errors");
let progress_sub = coordinator.subscribe("progress");
println!(" Subscriptions:");
println!(" ├─ Topic: {} (compilation events)", compile_sub.topic());
println!(" ├─ Topic: {} (error events)", error_sub.topic());
println!(" └─ Topic: {} (progress updates)", progress_sub.topic());
println!();
println!(" Publishing compilation events...");
coordinator.publish(
"progress",
BrickMessage::StateChange {
brick_name: "parser".into(),
event: "started".into(),
},
);
coordinator.publish(
"progress",
BrickMessage::StateChange {
brick_name: "parser".into(),
event: "completed".into(),
},
);
coordinator.publish(
"compilation",
BrickMessage::StateChange {
brick_name: "type_checker".into(),
event: "started".into(),
},
);
coordinator.publish(
"errors",
BrickMessage::StateChange {
brick_name: "type_checker".into(),
event: "warning: unused variable 'x'".into(),
},
);
coordinator.publish(
"compilation",
BrickMessage::StateChange {
brick_name: "type_checker".into(),
event: "completed".into(),
},
);
coordinator.publish(
"compilation",
BrickMessage::StateChange {
brick_name: "code_gen".into(),
event: "completed".into(),
},
);
let compile_msgs = compile_sub.drain();
let error_msgs = error_sub.drain();
let progress_msgs = progress_sub.drain();
println!("\n Received Messages:");
println!(" compilation topic: {} messages", compile_msgs.len());
for msg in &compile_msgs {
if let BrickMessage::StateChange { brick_name, event } = msg {
println!(" • {} → {}", brick_name, event);
}
}
println!(" errors topic: {} messages", error_msgs.len());
for msg in &error_msgs {
if let BrickMessage::StateChange { brick_name, event } = msg {
println!(" • {} → {}", brick_name, event);
}
}
println!(" progress topic: {} messages", progress_msgs.len());
for msg in &progress_msgs {
if let BrickMessage::StateChange { brick_name, event } = msg {
println!(" • {} → {}", brick_name, event);
}
}
println!();
}
fn demo_compilation_pipeline() {
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!(" 4. Ruchy Compilation Pipeline");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n");
let tracker = Arc::new(BrickDataTracker::new());
let coordinator = BrickCoordinator::new();
let selector = BackendSelector::new()
.with_simd_threshold(5_000)
.with_gpu_threshold(100_000);
tracker.track_data("lexer_tables", WorkerId::new(0), 1024 * 1024);
tracker.track_data("grammar_cache", WorkerId::new(0), 2 * 1024 * 1024);
tracker.track_data("type_env", WorkerId::new(1), 3 * 1024 * 1024);
tracker.track_data("wasm_templates", WorkerId::new(2), 4 * 1024 * 1024);
let pipeline_sub = coordinator.subscribe("pipeline");
println!(" Pipeline Stages:");
println!(" ┌─────────────┐ ┌─────────────┐ ┌─────────────┐");
println!(" │ Lexer │ -> │ Parser │ -> │ Type Check │");
println!(" │ (Worker 0) │ │ (Worker 0) │ │ (Worker 1) │");
println!(" └─────────────┘ └─────────────┘ └─────────────┘");
println!(" │");
println!(" ┌─────────────┐ ┌─────────────┐ │");
println!(" │ Optimize │ <- │ Code Gen │ <───────┘");
println!(" │ (Worker 1) │ │ (Worker 2) │");
println!(" └─────────────┘ └─────────────┘");
println!();
let source_sizes = [
(1_000, "small.ruchy"),
(20_000, "medium.ruchy"),
(150_000, "large.ruchy"),
];
println!(" Compilation Runs:");
for (size, filename) in &source_sizes {
let backend = selector.select(*size, true);
let deps = vec!["lexer_tables".into(), "grammar_cache".into()];
let affinity = tracker.calculate_affinity(&deps);
let best_worker = affinity
.iter()
.max_by(|a, b| a.1.partial_cmp(b.1).unwrap())
.map(|(w, _)| w.0)
.unwrap_or(0);
coordinator.publish(
"pipeline",
BrickMessage::StateChange {
brick_name: (*filename).into(),
event: format!("compiled on {:?} (worker {})", backend, best_worker),
},
);
println!(
" {} ({} chars): {:?} backend, worker {}",
filename, size, backend, best_worker
);
}
let msgs = pipeline_sub.drain();
println!("\n Pipeline Events: {} total", msgs.len());
println!("\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
println!(" Demo complete!");
println!("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━");
}
fn print_affinity_scores(affinity: &std::collections::HashMap<WorkerId, f64>) {
let mut sorted: Vec<_> = affinity.iter().collect();
sorted.sort_by(|a, b| b.1.partial_cmp(a.1).unwrap());
for (worker, score) in &sorted {
let bar_len = (*score * 20.0) as usize;
let bar: String = "█".repeat(bar_len);
println!(" Worker {}: {:.2} {}", worker.0, score, bar);
}
if let Some((best, _)) = sorted.first() {
println!(" → Best worker: {}", best.0);
}
}