use acty::{Actor, ActorExt, AsyncClose};
use futures::{Stream, StreamExt, future::join_all};
use std::pin::pin;
use tokio::sync::oneshot;
/// Actor that sums one inclusive `u64` range: its `run` implementation reads a
/// single `ComputeChunk` from the inbox, replies with the sum, and returns.
struct Worker;
/// Request handled by `Worker`: sum every integer in `start..=end`.
struct ComputeChunk {
/// First value of the range (inclusive).
start: u64,
/// Last value of the range (inclusive).
end: u64,
/// One-shot channel on which the worker sends the computed sum.
responder: oneshot::Sender<u64>,
}
impl Actor for Worker {
type Message = ComputeChunk;
async fn run(self, inbox: impl Stream<Item = Self::Message> + Send) {
let mut inbox = pin!(inbox);
if let Some(msg) = inbox.next().await {
let sum = (msg.start..=msg.end).sum();
msg.responder.send(sum).unwrap_or(());
}
}
}
/// Actor that splits a `ComputeSum` job into chunks, hands each chunk to a
/// freshly started `Worker`, and replies with the combined result.
struct Manager {
/// Upper bound on the number of chunks (and therefore workers) used per job.
num_workers: u64,
}
/// Request handled by `Manager`: sum every integer in `start..=end`.
struct ComputeSum {
/// First value of the range (inclusive).
start: u64,
/// Last value of the range (inclusive).
end: u64,
/// One-shot channel on which the manager sends the total.
responder: oneshot::Sender<u64>,
}
impl Actor for Manager {
    type Message = ComputeSum;

    /// Processes [`ComputeSum`] jobs until the inbox ends.
    ///
    /// For each job the inclusive range `start..=end` is cut into at most
    /// `num_workers` contiguous chunks of equal size (the last one may be
    /// shorter). One [`Worker`] is started per chunk, all replies are awaited
    /// with `join_all`, and the sum of the partial results is sent on the job's
    /// responder.
    ///
    /// Edge cases:
    /// * `num_workers == 0` is treated as one worker instead of dividing by zero.
    /// * An empty range (`start > end`) is answered with `0` without starting
    ///   any worker.
    /// * If any worker fails to reply, the job's responder is dropped without a
    ///   value so the requester observes a receive error rather than a silently
    ///   incomplete total.
    async fn run(self, inbox: impl Stream<Item = Self::Message> + Send) {
        let mut inbox = pin!(inbox);
        // Guard against a zero worker count, which would divide by zero below.
        let num_workers = self.num_workers.max(1);

        while let Some(msg) = inbox.next().await {
            println!(
                "Manager: Received job to sum from {} to {}.",
                msg.start, msg.end
            );

            if msg.start > msg.end {
                // Empty range: `end - start` would underflow, and the sum is 0.
                msg.responder.send(0).unwrap_or(());
                continue;
            }

            // Chunk length minus one. With `len = end - start + 1`, this equals
            // `len.div_ceil(num_workers) - 1` but cannot overflow even when the
            // range covers all of `u64`.
            let chunk_span = (msg.end - msg.start) / num_workers;

            let mut response_rxs = Vec::new();
            // Start of the next chunk; `None` once the previous chunk ended at
            // `u64::MAX` and no further start value exists.
            let mut next_start = Some(msg.start);
            for _ in 0..num_workers {
                let Some(chunk_start) = next_start.filter(|&s| s <= msg.end) else {
                    break;
                };
                let chunk_end = chunk_start.saturating_add(chunk_span).min(msg.end);
                next_start = chunk_end.checked_add(1);

                println!(
                    "Manager: Spawning worker for range {}..={}",
                    chunk_start, chunk_end
                );
                let (worker_tx, worker_rx) = oneshot::channel();
                response_rxs.push(worker_rx);
                let worker = Worker.start();
                let worker_msg = ComputeChunk {
                    start: chunk_start,
                    end: chunk_end,
                    responder: worker_tx,
                };
                // A failed send drops `worker_tx` along with the message, which
                // surfaces below as a receive error for this chunk.
                worker.send(worker_msg).unwrap_or(());
            }

            let partials: Result<Vec<u64>, _> =
                join_all(response_rxs).await.into_iter().collect();
            match partials {
                Ok(parts) => {
                    let total_sum: u64 = parts.into_iter().sum();
                    // The requester may have gone away; nothing to do then.
                    msg.responder.send(total_sum).unwrap_or(());
                }
                Err(_) => {
                    // Replying with a partial total would be a wrong answer, so
                    // `msg.responder` is dropped unsent at the end of this
                    // iteration and the requester sees the failure.
                    eprintln!(
                        "Manager: a worker did not report its result; dropping job {}..={}",
                        msg.start, msg.end
                    );
                }
            }
        }
    }
}
/// Demo entry point: starts a `Manager` with four workers, asks it to sum
/// `1..=100`, checks the reply against a locally computed sum, and then closes
/// the manager.
#[tokio::main]
async fn main() {
    const FIRST: u64 = 1;
    const LAST: u64 = 100;

    let manager = Manager { num_workers: 4 }.start();

    let (reply_tx, reply_rx) = oneshot::channel();
    manager
        .send(ComputeSum {
            start: FIRST,
            end: LAST,
            responder: reply_tx,
        })
        // If the send fails the job (and `reply_tx`) is dropped, so the
        // `expect` below reports the problem.
        .unwrap_or(());

    let result = reply_rx.await.expect("Manager did not respond");
    let expected: u64 = (FIRST..=LAST).sum();
    println!("\nFinal result: {}, Expected: {}", result, expected);
    assert_eq!(result, expected);

    manager.close().await;
}