use std::sync::{Arc, Mutex};
use std::time::Duration;
use async_trait::async_trait;
use dactor::actor::{
cancel_after, Actor, ActorContext, ActorRef, ReduceHandler, Handler, ExpandHandler,
};
use dactor::circuit_breaker::CircuitBreakerInterceptor;
use dactor::dead_letter::CollectingDeadLetterHandler;
use dactor::interceptor::{DropNotice, DropObserver};
use dactor::message::Message;
use dactor::pool::{PoolRef, PoolRouting};
use dactor::stream::{StreamReceiver, StreamSender};
use dactor::timer::send_after;
use dactor::{SpawnOptions, TestActorRef, TestRuntime};
use tokio_stream::StreamExt;
/// Fire-and-forget work item carrying a task label.
struct ProcessTask(String);
impl Message for ProcessTask {
    // Unit reply: tell-style delivery, nothing flows back to the sender.
    type Reply = ();
}
/// Ask-style status query; the worker replies with a summary string.
struct GetStatus;
impl Message for GetStatus {
    type Reply = String;
}
/// Request that opens a server-side stream of `u32` items (see the
/// `ExpandHandler` impl on `TaskProcessor`).
struct StreamItems;
impl Message for StreamItems {
    // Element type of the produced stream.
    type Reply = u32;
}
/// Demo worker actor: tracks how many `ProcessTask` messages it has handled.
struct TaskProcessor {
    id: usize,        // worker index assigned at spawn time
    processed: usize, // running count of handled ProcessTask messages
}
impl Actor for TaskProcessor {
type Args = usize;
type Deps = ();
fn create(id: usize, _: ()) -> Self {
Self { id, processed: 0 }
}
}
#[async_trait]
impl Handler<ProcessTask> for TaskProcessor {
async fn handle(&mut self, msg: ProcessTask, _ctx: &mut ActorContext) {
self.processed += 1;
println!(" [Worker-{}] processed: {}", self.id, msg.0);
}
}
#[async_trait]
impl Handler<GetStatus> for TaskProcessor {
    /// Replies with a human-readable progress summary for this worker.
    async fn handle(&mut self, _msg: GetStatus, _ctx: &mut ActorContext) -> String {
        let done = self.processed;
        let summary = format!("worker-{}: {} tasks done", self.id, done);
        summary
    }
}
#[async_trait]
impl ExpandHandler<StreamItems, u32> for TaskProcessor {
    /// Server-streaming handler: pushes the numbers 1 through 5 to the caller,
    /// stopping early if the receiving side hangs up.
    async fn handle_expand(
        &mut self,
        _msg: StreamItems,
        sender: StreamSender<u32>,
        _ctx: &mut ActorContext,
    ) {
        let mut next: u32 = 1;
        while next <= 5 {
            // A send error means the consumer dropped the stream; give up.
            if sender.send(next).await.is_err() {
                return;
            }
            next += 1;
        }
    }
}
#[async_trait]
impl ReduceHandler<u64, u64> for TaskProcessor {
    /// Client-streaming handler: drains the inbound stream and replies with
    /// the running total once the sender closes it.
    async fn handle_reduce(&mut self, mut rx: StreamReceiver<u64>, _ctx: &mut ActorContext) -> u64 {
        let mut total: u64 = 0;
        loop {
            match rx.recv().await {
                Some(value) => total += value,
                // Stream exhausted: the accumulated sum is the reply.
                None => break total,
            }
        }
    }
}
/// Drop observer that appends a description of every dropped message to a
/// shared, lock-protected log (inspected by `main` in the final summary).
struct LoggingDropObserver {
    drops: Arc<Mutex<Vec<String>>>, // shared with main via Arc clone
}
impl DropObserver for LoggingDropObserver {
    /// Records each intercepted drop as a "<type> dropped by <interceptor>"
    /// entry in the shared log.
    fn on_drop(&self, notice: DropNotice) {
        let entry = format!(
            "{} dropped by {}",
            notice.message_type, notice.interceptor_name
        );
        // Guard scope is the whole fn body; the lock is released on return.
        let mut log = self.drops.lock().unwrap();
        log.push(entry);
    }
}
/// Demo entry point: exercises dactor's major features end to end — pooled
/// tell, ask, server/client streaming, delayed send, cancellable ask, dead
/// letters, metrics, and drop observation — then prints a summary.
#[tokio::main]
async fn main() {
    println!("=== dactor Showcase ===\n");
    // Test runtime with metrics collection switched on.
    let mut runtime = TestRuntime::new();
    runtime.enable_metrics();
    // Capture undeliverable messages so the summary can report them.
    let dead_letters = Arc::new(CollectingDeadLetterHandler::new());
    runtime.set_dead_letter_handler(dead_letters.clone());
    // Shared log that LoggingDropObserver appends to on every interceptor drop.
    let drop_log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    runtime.set_drop_observer(Arc::new(LoggingDropObserver {
        drops: drop_log.clone(),
    }));
    // Spawn four workers, each wrapped in a circuit breaker.
    // NOTE(review): assuming the args are (failure threshold, window, open
    // duration) — confirm against CircuitBreakerInterceptor::new docs.
    let mut workers: Vec<TestActorRef<TaskProcessor>> = Vec::new();
    for i in 0..4 {
        let mut opts = SpawnOptions::default();
        opts.interceptors
            .push(Box::new(CircuitBreakerInterceptor::new(
                3, Duration::from_secs(60), Duration::from_secs(10), )));
        let w = runtime.spawn_with_options::<TaskProcessor>(&format!("worker-{i}"), i, opts).await.unwrap();
        workers.push(w);
    }
    // Round-robin pool over the four workers.
    let pool = PoolRef::new(workers.clone(), PoolRouting::RoundRobin);
    println!("--- Tell: distributing tasks across pool ---");
    for i in 0..8 {
        pool.tell(ProcessTask(format!("task-{i}"))).unwrap();
    }
    // Give the fire-and-forget tells time to be processed before querying.
    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("\n--- Ask: query worker status ---");
    let status = workers[0].ask(GetStatus, None).unwrap().await.unwrap();
    println!(" {status}");
    // Server-streaming: worker 1 pushes items 1..=5 through a bounded stream.
    println!("\n--- Stream: server-streaming items ---");
    let mut stream = workers[1].expand(StreamItems, 8, None, None).unwrap();
    let mut items = Vec::new();
    while let Some(n) = stream.next().await {
        items.push(n);
    }
    println!(" Received {} items: {:?}", items.len(), items);
    // Client-streaming: feed three numbers in, get their sum back.
    println!("\n--- Feed: client-streaming sum ---");
    let input = futures::stream::iter(vec![10u64, 20, 30]);
    let sum = workers[2]
        .reduce::<u64, u64>(Box::pin(input), 8, None, None)
        .unwrap()
        .await
        .unwrap();
    println!(" Feed result (sum): {sum}");
    // Timer: deliver a message after a 100ms delay, then sleep past it.
    println!("\n--- send_after: delayed task ---");
    send_after::<TaskProcessor, _, _>(
        &workers[3],
        ProcessTask("delayed-task".into()),
        Duration::from_millis(100),
    );
    tokio::time::sleep(Duration::from_millis(150)).await;
    // Cancellable ask: the 200ms token should fire well after the reply
    // arrives, so the unwrap below is expected to succeed.
    println!("\n--- cancel_after: cancellable ask ---");
    let token = cancel_after(Duration::from_millis(200));
    let result = workers[0].ask(GetStatus, Some(token)).unwrap().await;
    println!(" Cancellable ask result: {:?}", result.unwrap());
    // Stop a worker, then tell it again — the message should surface through
    // the dead-letter handler rather than vanish (tell result ignored).
    println!("\n--- Dead letters: sending to stopped actor ---");
    workers[3].stop();
    tokio::time::sleep(Duration::from_millis(50)).await;
    let _ = workers[3].tell(ProcessTask("after-stop".into()));
    tokio::time::sleep(Duration::from_millis(50)).await;
    // ---- Summary: metrics, dead letters, breaker state, observed drops ----
    println!("\n========================================");
    println!(" SUMMARY");
    println!("========================================");
    let registry = runtime.metrics().unwrap();
    let rt_metrics = registry.runtime_metrics();
    println!("\n Metrics:");
    println!(" Total messages: {}", rt_metrics.total_messages);
    println!(" Total errors: {}", rt_metrics.total_errors);
    println!(" Message rate: {:.1}/s", rt_metrics.message_rate);
    println!(" Error rate: {:.1}/s", rt_metrics.error_rate);
    println!(" Actor count: {}", rt_metrics.actor_count);
    println!("\n Per-actor breakdown:");
    for (actor_id, snap) in registry.all() {
        println!(
            " {:?}: {} msgs, {} errs, {:.1} msg/s",
            actor_id, snap.message_count, snap.error_count, snap.message_rate
        );
    }
    println!("\n Dead letters: {}", dead_letters.count());
    for dl in dead_letters.events() {
        println!(
            " target={:?} msg={} reason={:?}",
            dl.target_id, dl.message_type, dl.reason
        );
    }
    // No failures were induced above, so every breaker should still be Closed.
    println!("\n Circuit breaker: all Closed (0 errors < threshold 3)");
    let drops = drop_log.lock().unwrap();
    println!("\n Drops observed: {}", drops.len());
    for d in drops.iter() {
        println!(" {d}");
    }
    println!("\n=== Done ===");
}