#[cfg(unix)]
use ash_flare::distributed::{RemoteSupervisorHandle, SupervisorAddress, SupervisorServer};
#[cfg(unix)]
use ash_flare::{RestartPolicy, RestartStrategy, SupervisorHandle, SupervisorSpec, Worker};
#[cfg(unix)]
use async_trait::async_trait;
#[cfg(unix)]
use std::time::Duration;
#[cfg(unix)]
use tokio::time::sleep;
#[cfg(unix)]
#[derive(Debug)]
struct WorkerError(String);
#[cfg(unix)]
impl std::fmt::Display for WorkerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
#[cfg(unix)]
impl std::error::Error for WorkerError {}
#[cfg(unix)]
struct Counter {
name: String,
}
#[cfg(unix)]
impl Counter {
fn new(name: impl Into<String>) -> Self {
Self { name: name.into() }
}
}
#[cfg(unix)]
#[async_trait]
impl Worker for Counter {
type Error = WorkerError;
async fn run(&mut self) -> Result<(), Self::Error> {
let mut count = 0;
loop {
count += 1;
println!("[{}] count: {}", self.name, count);
sleep(Duration::from_secs(2)).await;
}
}
async fn shutdown(&mut self) -> Result<(), Self::Error> {
println!("[{}] shutdown", self.name);
Ok(())
}
}
#[cfg(unix)]
async fn run_supervisor_server(socket_path: String) {
let spec = SupervisorSpec::new("remote_supervisor")
.with_restart_strategy(RestartStrategy::OneForOne)
.with_worker(
"worker1",
|| Counter::new("worker1"),
RestartPolicy::Permanent,
)
.with_worker(
"worker2",
|| Counter::new("worker2"),
RestartPolicy::Permanent,
)
.with_worker(
"worker3",
|| Counter::new("worker3"),
RestartPolicy::Permanent,
);
let supervisor = SupervisorHandle::start(spec);
let server = SupervisorServer::new(supervisor);
println!("\nđ Supervisor server starting...\n");
if let Err(e) = server.listen_unix(&socket_path).await {
eprintln!("Server error: {}", e);
}
}
#[cfg(unix)]
async fn run_client(socket_path: String) {
sleep(Duration::from_secs(1)).await;
let remote = RemoteSupervisorHandle::new(SupervisorAddress::Unix(socket_path));
println!("\nđĄ Client connecting to remote supervisor...\n");
println!("đ Getting supervisor status...");
match remote.status().await {
Ok(status) => {
println!(" Name: {}", status.name);
println!(" Children: {}", status.children_count);
println!(" Strategy: {}", status.restart_strategy);
println!(" Uptime: {}s", status.uptime_secs);
}
Err(e) => eprintln!(" Error: {}", e),
}
sleep(Duration::from_secs(2)).await;
println!("\nđ Listing children...");
match remote.which_children().await {
Ok(children) => {
for child in children {
println!(" - {} ({:?})", child.id, child.child_type);
}
}
Err(e) => eprintln!(" Error: {}", e),
}
sleep(Duration::from_secs(3)).await;
println!("\nđ§ Terminating worker2...");
match remote.terminate_child("worker2").await {
Ok(_) => println!(" â Terminated"),
Err(e) => eprintln!(" Error: {}", e),
}
sleep(Duration::from_secs(2)).await;
println!("\nđ Listing children after termination...");
match remote.which_children().await {
Ok(children) => {
for child in children {
println!(" - {} ({:?})", child.id, child.child_type);
}
}
Err(e) => eprintln!(" Error: {}", e),
}
sleep(Duration::from_secs(2)).await;
println!("\nđ Shutting down remote supervisor...");
match remote.shutdown().await {
Ok(_) => println!(" â Shutdown complete"),
Err(e) => eprintln!(" Error: {}", e),
}
}
#[tokio::main]
#[cfg(unix)]
async fn main() {
let socket_path = "/tmp/ash-flare-supervisor.sock";
let args: Vec<String> = std::env::args().collect();
if args.len() > 1 && args[1] == "server" {
run_supervisor_server(socket_path.to_string()).await;
} else {
println!("=== Distributed Supervisor Demo ===");
println!("\nâšī¸ To run this demo:");
println!(" Terminal 1: cargo run --example distributed server");
println!(" Terminal 2: cargo run --example distributed\n");
tokio::spawn(run_supervisor_server(socket_path.to_string()));
run_client(socket_path.to_string()).await;
sleep(Duration::from_secs(1)).await;
}
}
#[cfg(not(unix))]
fn main() {
eprintln!("This example requires Unix sockets and is not supported on Windows.");
eprintln!("Please run this example on Linux or macOS.");
std::process::exit(1);
}