use ash_flare::{
Mailbox, RestartPolicy, SupervisorHandle, SupervisorSpec, Worker,
mailbox::{MailboxConfig, mailbox},
};
use async_trait::async_trait;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;
#[derive(Debug)]
struct WorkerError(String);
impl std::fmt::Display for WorkerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl std::error::Error for WorkerError {}
struct PoolWorker {
worker_id: usize,
mailbox: Arc<Mutex<Mailbox>>,
}
impl PoolWorker {
fn new(worker_id: usize, mailbox: Arc<Mutex<Mailbox>>) -> Self {
Self { worker_id, mailbox }
}
async fn process_message(&self, msg: String) -> Result<(), WorkerError> {
println!("[Worker {}] Processing: {}", self.worker_id, msg);
sleep(Duration::from_millis(100)).await;
if msg.contains("fail") {
println!("[Worker {}] Failed processing: {}", self.worker_id, msg);
return Err(WorkerError(format!("Failed to process: {}", msg)));
}
println!("[Worker {}] Completed: {}", self.worker_id, msg);
Ok(())
}
}
#[async_trait]
impl Worker for PoolWorker {
type Error = WorkerError;
async fn run(&mut self) -> Result<(), Self::Error> {
println!("[Worker {}] Started", self.worker_id);
loop {
let msg = {
let mut mailbox = self.mailbox.lock().await;
mailbox.recv().await
};
match msg {
Some(msg) => {
self.process_message(msg).await?;
}
None => {
println!("[Worker {}] Mailbox closed, shutting down", self.worker_id);
break;
}
}
}
Ok(())
}
async fn shutdown(&mut self) -> Result<(), Self::Error> {
println!("[Worker {}] Shutting down", self.worker_id);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== Worker Pool with Shared Mailbox Example ===\n");
let (handle, mailbox) = mailbox(MailboxConfig::bounded(20));
let shared_mailbox = Arc::new(Mutex::new(mailbox));
let mb1 = Arc::clone(&shared_mailbox);
let mb2 = Arc::clone(&shared_mailbox);
let mb3 = Arc::clone(&shared_mailbox);
let mb4 = Arc::clone(&shared_mailbox);
let spec = SupervisorSpec::new("worker-pool")
.with_worker(
"worker-1",
move || PoolWorker::new(1, Arc::clone(&mb1)),
RestartPolicy::Permanent,
)
.with_worker(
"worker-2",
move || PoolWorker::new(2, Arc::clone(&mb2)),
RestartPolicy::Permanent,
)
.with_worker(
"worker-3",
move || PoolWorker::new(3, Arc::clone(&mb3)),
RestartPolicy::Permanent,
)
.with_worker(
"worker-4",
move || PoolWorker::new(4, Arc::clone(&mb4)),
RestartPolicy::Permanent,
);
let supervisor = SupervisorHandle::start(spec);
sleep(Duration::from_millis(100)).await;
println!("Sending messages to worker pool...\n");
for i in 1..=10 {
let msg = if i == 5 {
format!("task-{} (should fail)", i)
} else {
format!("task-{}", i)
};
handle.send(msg).await?;
}
sleep(Duration::from_secs(2)).await;
println!("\n=== Supervisor Status ===");
let children = supervisor.which_children().await?;
for child in children {
println!("Child: {} (type: {:?})", child.id, child.child_type);
}
println!("\n=== Shutting Down ===");
supervisor.shutdown().await?;
println!("\nExample completed!");
Ok(())
}