use anyhow::Result;
use rsactor::{message_handlers, Actor, ActorRef};
use std::time::Duration;
use tokio::task::JoinHandle;
/// Actor that answers task messages by spawning Tokio tasks and returning
/// their `JoinHandle`s to the caller.
#[derive(Actor)]
struct WorkerActor {
    /// Number of tasks spawned so far; incremented by the heavy-computation
    /// and file-processing handlers and reported via `GetTaskCount`.
    task_counter: u32,
}
/// Message asking the worker to spawn a task that deliberately panics,
/// used to demonstrate how a failed join is reported to the caller.
struct PanicTask;
#[message_handlers]
impl WorkerActor {
    /// Handles [`PanicTask`]: spawns a Tokio task that waits 100 ms and then
    /// panics, and hands the task's `JoinHandle` straight back to the caller.
    ///
    /// The spawned task never produces a `String`; awaiting the returned
    /// handle yields a join error caused by the panic.
    #[handler]
    async fn handle_panic_task(&mut self, _: PanicTask, _: &ActorRef<Self>) -> JoinHandle<String> {
        // Short pause before failing so the panic happens inside the
        // spawned task, after this handler has already returned.
        let delay_before_panic = Duration::from_millis(100);
        tokio::spawn(async move {
            tokio::time::sleep(delay_before_panic).await;
            panic!("This task intentionally panics!");
        })
    }
}
/// Message requesting a simulated long-running computation.
struct HeavyComputationTask {
    /// Identifier used in log output and as one factor of the result.
    id: u32,
    /// How long, in seconds, the spawned task sleeps before producing its result.
    duration_secs: u64,
    /// Value multiplied with `id` to form the task's result.
    multiplier: u32,
}
/// Message requesting simulated processing of a named piece of text.
struct FileProcessingTask {
    /// Name used to label the task in log output and in the processed result.
    filename: String,
    /// Text to be "processed"; it is embedded verbatim in the result string.
    content: String,
}
/// Message asking the worker for its current `task_counter` value.
struct GetTaskCount;
#[message_handlers]
impl WorkerActor {
#[handler]
async fn handle_heavy_computation(
&mut self,
msg: HeavyComputationTask,
_: &ActorRef<Self>,
) -> JoinHandle<u64> {
self.task_counter += 1;
let task_id = msg.id;
let duration = Duration::from_secs(msg.duration_secs);
let multiplier = msg.multiplier;
println!(
"WorkerActor: Spawning heavy computation task {} (duration: {}s, multiplier: {})",
task_id, msg.duration_secs, multiplier
);
tokio::spawn(async move {
println!("Task {}: Starting computation...", task_id);
tokio::time::sleep(duration).await;
let result = (task_id as u64) * (multiplier as u64);
println!(
"Task {}: Computation completed, result: {}",
task_id, result
);
result
})
}
#[handler]
async fn handle_file_processing(
&mut self,
msg: FileProcessingTask,
_: &ActorRef<Self>,
) -> JoinHandle<String> {
self.task_counter += 1;
let filename = msg.filename.clone();
let content = msg.content.clone();
println!(
"WorkerActor: Spawning file processing task for '{}' ({} bytes)",
filename,
content.len()
);
tokio::spawn(async move {
println!("File task: Processing file '{}'...", filename);
tokio::time::sleep(Duration::from_millis(800)).await;
let processed_content = format!(
"PROCESSED[{}]: {} (length: {})",
filename,
content,
content.len()
);
println!("File task: Processing completed for '{}'", filename);
processed_content
})
}
#[handler]
async fn handle_get_task_count(&mut self, _: GetTaskCount, _: &ActorRef<Self>) -> u32 {
self.task_counter
}
}
/// Runs the `ask_join` demonstration end to end.
///
/// Steps, in order:
/// 1. Install a DEBUG-level tracing subscriber and spawn a `WorkerActor`.
/// 2. Send three heavy-computation messages from separate client tasks via
///    `ask_join` and print each outcome.
/// 3. Do the same for three file-processing messages.
/// 4. Send a `PanicTask` and show that the failure surfaces as
///    `rsactor::Error::Join`.
/// 5. Contrast `ask` (yields a `JoinHandle` that is awaited by hand) with
///    `ask_join` (yields the task's value directly).
/// 6. Print the actor's task count, stop the actor and wait for it to finish.
///
/// Errors from the client tasks, from `ask`/`ask_join` in step 5 and 6, and
/// from stopping the actor are propagated with `?`.
#[tokio::main]
async fn main() -> Result<()> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_target(false)
        .init();

    let (worker_ref, worker_handle) =
        rsactor::spawn::<WorkerActor>(WorkerActor { task_counter: 0 });

    // ---- Heavy computation tasks -------------------------------------
    println!("=== ask_join Demo: Heavy Computation Tasks ===");
    let computation_tasks = vec![
        HeavyComputationTask {
            id: 1,
            duration_secs: 2,
            multiplier: 10,
        },
        HeavyComputationTask {
            id: 2,
            duration_secs: 1,
            multiplier: 20,
        },
        HeavyComputationTask {
            id: 3,
            duration_secs: 3,
            multiplier: 5,
        },
    ];

    // One client task per message, all started before any result is awaited.
    let computation_handles: Vec<_> = computation_tasks
        .into_iter()
        .map(|task| {
            let worker = worker_ref.clone();
            tokio::spawn(async move {
                let task_id = task.id;
                println!(
                    "Client: Sending computation task {} using ask_join",
                    task_id
                );
                let outcome = worker.ask_join(task).await;
                (task_id, outcome)
            })
        })
        .collect();

    // Results are reported in submission order.
    for pending in computation_handles {
        let (task_id, outcome) = pending.await?;
        match outcome {
            Ok(value) => println!(
                "✅ Computation task {} completed with result: {}",
                task_id, value
            ),
            Err(e) => println!("❌ Computation task {} failed: {}", task_id, e),
        }
    }

    // ---- File processing tasks ---------------------------------------
    println!("\n=== ask_join Demo: File Processing Tasks ===");
    let file_tasks = vec![
        FileProcessingTask {
            filename: String::from("document1.txt"),
            content: String::from("Hello world from document 1"),
        },
        FileProcessingTask {
            filename: String::from("data.json"),
            content: String::from(r#"{"name": "example", "value": 42}"#),
        },
        FileProcessingTask {
            filename: String::from("config.yaml"),
            content: String::from("server:\n port: 8080\n host: localhost"),
        },
    ];

    let file_handles: Vec<_> = file_tasks
        .into_iter()
        .map(|task| {
            let worker = worker_ref.clone();
            // Keep a copy of the name for reporting; the message itself is
            // moved into `ask_join`.
            let filename = task.filename.clone();
            tokio::spawn(async move {
                println!("Client: Processing file '{}' using ask_join", filename);
                let outcome = worker.ask_join(task).await;
                (filename, outcome)
            })
        })
        .collect();

    for pending in file_handles {
        let (filename, outcome) = pending.await?;
        match outcome {
            Ok(content) => println!("✅ File '{}' processed: {}", filename, content),
            Err(e) => println!("❌ File '{}' processing failed: {}", filename, e),
        }
    }

    // ---- Error handling ----------------------------------------------
    println!("\n=== ask_join Demo: Error Handling ===");
    println!("Client: Sending a task that will panic to demonstrate error handling");
    let panic_outcome = worker_ref.ask_join(PanicTask).await;
    match panic_outcome {
        Ok(result) => println!("Unexpected success: {}", result),
        Err(rsactor::Error::Join { identity, source }) => {
            println!(
                "✅ Correctly caught join error from actor {}: {}",
                identity, source
            );
            if source.is_panic() {
                println!(" The task panicked as expected");
            }
        }
        Err(e) => println!("Unexpected error type: {}", e),
    }

    // ---- ask vs. ask_join --------------------------------------------
    println!("\n=== ask_join Demo: Comparison with Regular ask ===");
    println!("Using regular ask (returns JoinHandle):");
    let manual_request = HeavyComputationTask {
        id: 999,
        duration_secs: 1,
        multiplier: 100,
    };
    let join_handle: JoinHandle<u64> = worker_ref.ask(manual_request).await?;
    println!("Got JoinHandle, now manually awaiting...");
    let manual_result = join_handle.await?;
    println!("Manual await result: {}", manual_result);

    println!("\nUsing ask_join (automatically awaits):");
    let joined_request = HeavyComputationTask {
        id: 1000,
        duration_secs: 1,
        multiplier: 100,
    };
    let joined_result: u64 = worker_ref.ask_join(joined_request).await?;
    println!("ask_join result: {}", joined_result);

    // ---- Summary and shutdown ----------------------------------------
    let task_count = worker_ref.ask(GetTaskCount).await?;
    println!("\nTotal tasks processed by the actor: {}", task_count);

    println!("\nStopping actor...");
    worker_ref.stop().await?;
    // Wait for the actor's own task to finish; its result is not inspected.
    let _actor_result = worker_handle.await?;

    println!("ask_join demo completed successfully!");
    Ok(())
}