#[cfg(feature = "distributed")]
use repartir::{
error::Result,
executor::remote::RemoteExecutor,
executor::Executor,
task::{Backend, ExecutionResult, Task},
};
#[cfg(feature = "distributed")]
use std::time::Instant;
/// Coordinator entry point: parses optional `--linux`/`--mac` worker
/// addresses from the command line, connects to both workers, and runs the
/// distributed demo — falling back to a local CPU-pool demo when no worker
/// is reachable.
#[cfg(feature = "distributed")]
#[tokio::main]
async fn main() -> Result<()> {
    let args: Vec<String> = std::env::args().collect();
    // Program name + `--linux <addr> --mac <addr>` = 5 arguments minimum;
    // anything less prints the usage banner and runs in local-demo mode.
    let (linux_addr, mac_addr) = if args.len() >= 5 {
        let linux =
            flag_value(&args, "--linux").unwrap_or_else(|| "localhost:9000".to_string());
        let mac = flag_value(&args, "--mac").unwrap_or_else(|| "localhost:9001".to_string());
        (linux, mac)
    } else {
        println!("╔════════════════════════════════════════════════════════════════╗");
        println!("║ MULTI-MACHINE DISTRIBUTED COMPUTING DEMO ║");
        println!("╠════════════════════════════════════════════════════════════════╣");
        println!("║ ║");
        println!("║ To run across Linux + Mac: ║");
        println!("║ ║");
        println!("║ 1. On Linux machine: ║");
        println!("║ cd ~/src/repartir ║");
        println!("║ cargo run --bin repartir-worker --features remote \\ ║");
        println!("║ --release -- --bind 0.0.0.0:9000 ║");
        println!("║ ║");
        println!("║ 2. On Mac machine: ║");
        println!("║ cd ~/src/repartir ║");
        println!("║ cargo run --bin repartir-worker --features remote \\ ║");
        println!("║ --release -- --bind 0.0.0.0:9000 ║");
        println!("║ ║");
        println!("║ 3. Run coordinator with actual IPs: ║");
        println!("║ cargo run --example multi_machine_demo --features \\ ║");
        println!("║ distributed -- --linux 10.0.0.1:9000 \\ ║");
        println!("║ --mac 10.0.0.2:9000 ║");
        println!("║ ║");
        println!("╚════════════════════════════════════════════════════════════════╝");
        println!();
        println!("Running in LOCAL DEMO mode (single machine simulation)...\n");
        ("localhost:9000".to_string(), "localhost:9001".to_string())
    };
    println!("═══════════════════════════════════════════════════════════════");
    println!(" Connecting to workers...");
    println!("═══════════════════════════════════════════════════════════════");
    println!(" Linux worker: {}", linux_addr);
    println!(" Mac worker: {}", mac_addr);
    println!();
    let executor = RemoteExecutor::new().await?;
    // Connection failures are reported but non-fatal: the demo proceeds as
    // long as at least one worker is reachable.
    let mut connected = 0;
    for addr in [&linux_addr, &mac_addr] {
        match executor.add_worker(addr).await {
            Ok(()) => {
                println!(" ✓ Connected to {}", addr);
                connected += 1;
            }
            Err(e) => {
                println!(" ✗ Failed to connect to {}: {}", addr, e);
            }
        }
    }
    println!();
    if connected > 0 {
        println!("Connected to {} workers\n", executor.capacity());
        run_distributed_demo(&executor).await?;
    } else {
        println!("Could not connect to any workers.");
        println!();
        println!("Make sure workers are running:");
        println!(
            " Linux: cargo run --bin repartir-worker --features remote -- --bind 0.0.0.0:9000"
        );
        println!(
            " Mac: cargo run --bin repartir-worker --features remote -- --bind 0.0.0.0:9000"
        );
        println!();
        println!("Falling back to LOCAL CPU pool demo...\n");
        run_local_demo().await?;
    }
    Ok(())
}

/// Returns the argument immediately following `flag` in `args`, if any.
///
/// Replaces the duplicated `.position(..).map(..).flatten()` parsing that
/// `main` previously did inline for each flag.
#[cfg(feature = "distributed")]
fn flag_value(args: &[String], flag: &str) -> Option<String> {
    args.iter().position(|a| a == flag).and_then(|i| args.get(i + 1).cloned())
}
/// Runs the distributed portion of the demo against already-connected
/// workers: per-node system-info queries, a two-shard computation executed
/// concurrently, and a final architecture probe.
#[cfg(feature = "distributed")]
async fn run_distributed_demo(executor: &RemoteExecutor) -> Result<()> {
    println!("═══════════════════════════════════════════════════════════════");
    println!(" Running distributed tasks across machines");
    println!("═══════════════════════════════════════════════════════════════\n");
    println!("1. Querying system info from each node...\n");
    let tasks = vec![
        ("Linux", Task::builder().binary("uname").arg("-a").backend(Backend::Cpu).build()?),
        ("Mac", Task::builder().binary("uname").arg("-a").backend(Backend::Cpu).build()?),
    ];
    for (name, task) in tasks {
        let start = Instant::now();
        let exec_result: Result<ExecutionResult> = executor.execute(task).await;
        match exec_result {
            Ok(result) if result.is_success() => {
                let output = result.stdout_str().unwrap_or_default();
                println!(" {} ({:.1}ms):", name, start.elapsed().as_secs_f64() * 1000.0);
                println!(" └─ {}", output.trim());
            }
            Ok(result) => {
                println!(" {} failed: {:?}", name, result.exit_code());
            }
            Err(e) => {
                println!(" {} error: {}", name, e);
            }
        }
    }
    println!();
    println!("2. Running parallel computation on both nodes...\n");
    let computations = [
        ("Shard 0 (Linux)", "echo 'Computing shard 0...' && sleep 0.5 && echo 'Shard 0 complete'"),
        ("Shard 1 (Mac)", "echo 'Computing shard 1...' && sleep 0.5 && echo 'Shard 1 complete'"),
    ];
    // Build both tasks up front so only execution is timed.
    let task0 =
        Task::builder().binary("sh").arg("-c").arg(computations[0].1).backend(Backend::Cpu).build()?;
    let task1 =
        Task::builder().binary("sh").arg("-c").arg(computations[1].1).backend(Backend::Cpu).build()?;
    let start = Instant::now();
    // BUG FIX: the previous version awaited each execution inside a loop, so
    // the "parallel" section actually ran the shards sequentially (~1.0s).
    // `tokio::join!` drives both futures concurrently as the demo claims.
    let (res0, res1) = tokio::join!(executor.execute(task0), executor.execute(task1));
    let total_time = start.elapsed();
    for (name, result) in [(computations[0].0, res0?), (computations[1].0, res1?)] {
        if result.is_success() {
            let output = result.stdout_str().unwrap_or_default();
            println!(" {}: {}", name, output.trim().replace('\n', " → "));
        }
    }
    println!();
    println!(" Total time: {:.2}s (shards executed in parallel)", total_time.as_secs_f64());
    println!();
    println!("3. Comparing architectures...\n");
    // Portable probe: `nproc` on Linux, `sysctl -n hw.ncpu` on macOS.
    let arch_task = Task::builder()
        .binary("sh")
        .arg("-c")
        .arg("echo \"Arch: $(uname -m), CPUs: $(nproc 2>/dev/null || sysctl -n hw.ncpu)\"")
        .backend(Backend::Cpu)
        .build()?;
    let result: ExecutionResult = executor.execute(arch_task).await?;
    if result.is_success() {
        println!(" {}", result.stdout_str().unwrap_or_default().trim());
    }
    println!();
    println!("═══════════════════════════════════════════════════════════════");
    println!(" Distributed demo complete!");
    println!("═══════════════════════════════════════════════════════════════");
    Ok(())
}
/// Fallback demo: spins up a local CPU worker pool and runs a few shell
/// probes (system, CPU count, memory) through it, printing the first line
/// of each probe's output with its elapsed time.
#[cfg(feature = "distributed")]
async fn run_local_demo() -> Result<()> {
    use repartir::Pool;
    println!("═══════════════════════════════════════════════════════════════");
    println!(" Running local CPU pool demo");
    println!("═══════════════════════════════════════════════════════════════\n");
    // Cap the pool at 8 workers; default to 4 if parallelism is unknown.
    let cores = std::thread::available_parallelism().map_or(4, |p| p.get());
    let pool = Pool::builder().cpu_workers(cores.min(8)).build()?;
    println!(" Pool: {} workers on {} cores\n", pool.capacity(), cores);
    // Each probe command handles both Linux and macOS via `||` fallbacks.
    let probes = [
        ("System", "uname -a"),
        ("CPU Info", "grep -c processor /proc/cpuinfo 2>/dev/null || sysctl -n hw.ncpu"),
        ("Memory", "free -h 2>/dev/null | head -2 || vm_stat | head -5"),
    ];
    for &(label, cmd) in &probes {
        let probe =
            Task::builder().binary("sh").arg("-c").arg(cmd).backend(Backend::Cpu).build()?;
        let timer = Instant::now();
        let outcome = pool.submit(probe).await?;
        let took = timer.elapsed();
        if outcome.is_success() {
            let text = outcome.stdout_str().unwrap_or_default();
            let head = text.lines().next().unwrap_or("").trim();
            println!(" {} ({:.1}ms): {}", label, took.as_secs_f64() * 1000.0, head);
        }
    }
    pool.shutdown().await;
    println!();
    println!("═══════════════════════════════════════════════════════════════");
    println!(" Local demo complete!");
    println!("═══════════════════════════════════════════════════════════════");
    Ok(())
}
/// Stub entry point compiled when the `distributed` feature is disabled:
/// prints instructions for enabling the feature and exits.
#[cfg(not(feature = "distributed"))]
fn main() {
    let notice = [
        "This example requires the 'distributed' feature.",
        "Run with: cargo run --example multi_machine_demo --features distributed",
    ];
    for line in notice {
        println!("{line}");
    }
}