use rayrust::prelude::*;
/// Returns the sum of `a` and `b`.
///
/// Declared with `#[rayrust::remote]`; `main` submits it through
/// `add_remote_async`, which is presumably generated by that attribute
/// (verify against the `rayrust` macro documentation).
#[rayrust::remote]
fn add(a: i32, b: i32) -> i32 {
    [a, b].iter().sum()
}
/// Builds a greeting string that embeds `name`.
///
/// Declared with `#[rayrust::remote]`; `main` submits it through
/// `greet_remote_async`, which is presumably generated by that attribute
/// (verify against the `rayrust` macro documentation).
#[rayrust::remote]
fn greet(name: String) -> String {
    let recipient = name.as_str();
    format!("Hello, {} from async Rust!", recipient)
}
/// Demo entry point: connects to a Ray cluster, runs a few remote tasks
/// concurrently, then runs a batch of ten `add` tasks and prints their sum.
///
/// Environment variables read:
/// - `RAY_ADDRESS`   — cluster address; falls back to a hard-coded demo value.
/// - `RAY_NODE_IP`   — node IP; falls back to a hard-coded demo value.
/// - `RAY_WORKER_SO` — required; the process exits with status 1 when unset.
///
/// # Panics
///
/// Panics if initialization, any task submission, or any result fetch fails.
#[tokio::main]
async fn main() {
    let address =
        std::env::var("RAY_ADDRESS").unwrap_or_else(|_| "192.168.42.141:6379".to_string());
    let node_ip = std::env::var("RAY_NODE_IP").unwrap_or_else(|_| "192.168.42.106".to_string());
    // There is no default for the worker library path, so bail out early.
    let worker_so = std::env::var("RAY_WORKER_SO").unwrap_or_else(|_| {
        eprintln!("ERROR: RAY_WORKER_SO not set.");
        std::process::exit(1);
    });

    println!("=== Ray Async Demo (GetAsync + eventfd) ===");
    println!("Address: {}, Node: {}", address, node_ip);

    // `worker_so` is not needed after this point, so it is moved rather than cloned.
    let config = RayConfig::new(&address)
        .node_ip(&node_ip)
        .code_search_path(vec![worker_so]);
    rayrust::init_with_config(&config).expect("init failed");
    println!("✓ Ray initialized\n");

    println!("--- Concurrent Remote Tasks ---");
    // Submit all three tasks concurrently, then fetch all three results concurrently.
    let (r1, r2, r3) = tokio::join!(
        add_remote_async(10, 32),
        add_remote_async(100, 200),
        greet_remote_async("Tokio".to_string()),
    );
    let r1: ObjectRef<i32> = r1.expect("add(10,32) failed");
    let r2: ObjectRef<i32> = r2.expect("add(100,200) failed");
    let r3: ObjectRef<String> = r3.expect("greet failed");
    let (v1, v2, v3) = tokio::join!(r1.get_async(), r2.get_async(), r3.get_async());
    println!("add(10, 32) = {} ✓", v1.expect("get add(10,32) failed"));
    println!("add(100, 200) = {} ✓", v2.expect("get add(100,200) failed"));
    println!("greet(\"Tokio\") = {} ✓", v3.expect("get greet failed"));

    println!("\n--- Batch Concurrent (10 tasks) ---");
    let submit_futs: Vec<_> = (0..10i32).map(|i| add_remote_async(i, i * 2)).collect();
    let obj_refs: Vec<ObjectRef<i32>> = join_all(submit_futs)
        .await
        .into_iter()
        .map(|r| r.expect("task failed"))
        .collect();

    let get_futs: Vec<_> = obj_refs
        .into_iter()
        .map(|obj_ref| async move { obj_ref.get_async().await })
        .collect();
    // A failed fetch must not be counted as 0: that would print a wrong sum
    // with a success mark. Fail loudly, like every other step in this demo.
    let results: Vec<i32> = join_all(get_futs)
        .await
        .into_iter()
        .map(|r| r.expect("get failed"))
        .collect();

    let sum: i32 = results.iter().sum();
    for (i, val) in results.iter().enumerate() {
        // Print the first three entries and the last one; elide the middle.
        if i < 3 || i + 1 >= results.len() {
            println!(" task[{}] add({}, {}) = {}", i, i, i * 2, val);
        } else if i == 3 {
            println!(" ...");
        }
    }
    println!("Sum of all results: {} ✓", sum);

    println!("\n--- Shutdown ---");
    rayrust::shutdown();
    println!("✓ Ray shutdown");
}
/// Runs every future in `futs` concurrently as a spawned tokio task and
/// returns their outputs in the same order as the input vector.
///
/// `JoinSet::join_next` yields tasks in completion order, so each task is
/// tagged with its input index and its output is written back to that slot.
/// Callers can therefore rely on `result[i]` belonging to `futs[i]`.
///
/// An empty `futs` yields an empty vector.
///
/// # Panics
///
/// Panics if any spawned task panics or is cancelled.
async fn join_all<F, T>(futs: Vec<F>) -> Vec<T>
where
    F: std::future::Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let total = futs.len();
    let mut set = tokio::task::JoinSet::new();
    for (idx, fut) in futs.into_iter().enumerate() {
        // Carry the submission index alongside the output so order can be restored.
        set.spawn(async move { (idx, fut.await) });
    }

    // One slot per input; `T` need not be `Clone`, hence `repeat_with`.
    let mut slots: Vec<Option<T>> = std::iter::repeat_with(|| None).take(total).collect();
    while let Some(joined) = set.join_next().await {
        let (idx, value) = joined.expect("join_all: spawned task panicked or was cancelled");
        slots[idx] = Some(value);
    }

    slots
        .into_iter()
        .map(|slot| slot.expect("join_all: every spawned task reports exactly once"))
        .collect()
}