use std::any::Any;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use dactor::actor::{Actor, ActorContext, ActorRef, Handler};
use dactor::interceptor::{
Disposition, InboundContext, InboundInterceptor, OutboundContext, OutboundInterceptor, Outcome,
};
use dactor::mailbox::MailboxConfig;
use dactor::message::{Headers, Message, Priority, RuntimeHeaders};
use dactor::{SpawnOptions, TestRuntime};
/// Stateless actor used by this example as the target of `SayHello` messages.
struct Greeter;

/// `Greeter` takes no construction arguments and no dependencies (both are `()`).
impl Actor for Greeter {
    type Args = ();
    type Deps = ();

    /// Creates the actor; both unit parameters are ignored.
    fn create(_args: (), _deps: ()) -> Self {
        Self
    }
}
/// Request message wrapping the name that should be greeted.
struct SayHello(String);

/// A `SayHello` request is answered with a `String` reply.
impl Message for SayHello {
    type Reply = String;
}
#[async_trait]
impl Handler<SayHello> for Greeter {
async fn handle(&mut self, msg: SayHello, ctx: &mut ActorContext) -> String {
let priority = ctx.headers.get::<Priority>().map(|p| p.0);
println!(
" [Greeter] handling '{}' (priority header: {:?})",
msg.0, priority
);
format!("Hello, {}!", msg.0)
}
}
/// Inbound interceptor that writes one line per received message and one line
/// per completed message, both to stdout and to a shared in-memory log.
struct LoggingInterceptor {
    /// Shared buffer that receives every log line.
    log: Arc<Mutex<Vec<String>>>,
}

impl LoggingInterceptor {
    /// Prints `entry` to stdout and then appends it to the shared log.
    ///
    /// # Panics
    /// Panics if the log mutex has been poisoned.
    fn record(&self, entry: String) {
        println!(" [LoggingInterceptor] {}", entry);
        self.log.lock().unwrap().push(entry);
    }
}

impl InboundInterceptor for LoggingInterceptor {
    /// Name under which this interceptor identifies itself.
    fn name(&self) -> &'static str {
        "logging"
    }

    /// Records a `RECV` line for the message and always lets it continue.
    fn on_receive(
        &self,
        ctx: &InboundContext<'_>,
        _runtime_headers: &RuntimeHeaders,
        _headers: &mut Headers,
        _payload: &dyn Any,
    ) -> Disposition {
        self.record(format!(
            "RECV actor={} msg={}",
            ctx.actor_name, ctx.message_type
        ));
        Disposition::Continue
    }

    /// Records a `DONE` line that includes the debug form of the outcome.
    fn on_complete(
        &self,
        ctx: &InboundContext<'_>,
        _runtime_headers: &RuntimeHeaders,
        _headers: &Headers,
        outcome: &Outcome<'_>,
    ) {
        self.record(format!(
            "DONE actor={} msg={} outcome={:?}",
            ctx.actor_name, ctx.message_type, outcome
        ));
    }
}
/// Interceptor state used to time message handling.
struct TimingInterceptor {
    /// `(message type, start instant)` records, guarded by a mutex because the
    /// interceptor callbacks only receive `&self`.
    starts: Mutex<Vec<(&'static str, Instant)>>,
}

impl TimingInterceptor {
    /// Creates an interceptor with no recorded start times.
    fn new() -> Self {
        // `Mutex<Vec<_>>`'s `Default` wraps an empty vector.
        let starts = Mutex::default();
        Self { starts }
    }
}
impl InboundInterceptor for TimingInterceptor {
    /// Name under which this interceptor identifies itself.
    fn name(&self) -> &'static str {
        "timing"
    }

    /// Stores the current instant under the message's type name and lets the
    /// message continue.
    ///
    /// # Panics
    /// Panics if the `starts` mutex has been poisoned.
    fn on_receive(
        &self,
        ctx: &InboundContext<'_>,
        _rh: &RuntimeHeaders,
        _headers: &mut Headers,
        _msg: &dyn Any,
    ) -> Disposition {
        self.starts
            .lock()
            .unwrap()
            .push((ctx.message_type, Instant::now()));
        Disposition::Continue
    }

    /// Prints how long ago the most recent start for this message type was
    /// recorded, and removes that record.
    ///
    /// Removing the record keeps `starts` from growing by one entry per
    /// message for the lifetime of the interceptor. Nothing is printed when no
    /// start was recorded for the message type.
    ///
    /// # Panics
    /// Panics if the `starts` mutex has been poisoned.
    fn on_complete(
        &self,
        ctx: &InboundContext<'_>,
        _rh: &RuntimeHeaders,
        _headers: &Headers,
        _outcome: &Outcome<'_>,
    ) {
        // Take the matching record out while holding the lock, and release
        // the lock before doing any I/O.
        let started = {
            let mut starts = self.starts.lock().unwrap();
            // Search from the back so the newest start of this type is used.
            let newest = starts
                .iter()
                .rposition(|(name, _)| *name == ctx.message_type);
            // `remove` (not `swap_remove`) keeps the remaining entries in
            // insertion order, which the "newest first" search relies on.
            newest.map(|idx| starts.remove(idx).1)
        };
        if let Some(t) = started {
            println!(
                " [TimingInterceptor] {} took {:?}",
                ctx.message_type,
                t.elapsed()
            );
        }
    }
}
/// Outbound interceptor that adds a `Priority::HIGH` header to every message
/// it sees.
struct HeaderStampInterceptor;

impl OutboundInterceptor for HeaderStampInterceptor {
    /// Name under which this interceptor identifies itself.
    fn name(&self) -> &'static str {
        "header-stamp"
    }

    /// Inserts `Priority::HIGH` into the outgoing headers, prints the message
    /// type and target name, and lets the send continue.
    fn on_send(
        &self,
        ctx: &OutboundContext<'_>,
        _runtime_headers: &RuntimeHeaders,
        headers: &mut Headers,
        _payload: &dyn Any,
    ) -> Disposition {
        headers.insert(Priority::HIGH);
        let (kind, target) = (ctx.message_type, ctx.target_name);
        println!(
            " [HeaderStampInterceptor] stamped Priority::HIGH on {} → {}",
            kind, target
        );
        Disposition::Continue
    }
}
/// Entry point of the example.
///
/// Registers `HeaderStampInterceptor` as an outbound interceptor on a
/// `TestRuntime`, spawns a `Greeter` with the logging and timing inbound
/// interceptors, sends two `SayHello` requests, and finally prints every line
/// the logging interceptor collected.
///
/// # Panics
/// Panics if spawning, sending, or awaiting a reply fails, or if the log
/// mutex has been poisoned.
#[tokio::main]
async fn main() {
    println!("=== Interceptors Example ===\n");

    // Shared with `LoggingInterceptor` so the collected lines can be printed below.
    let log = Arc::new(Mutex::new(Vec::<String>::new()));

    let mut runtime = TestRuntime::new();
    runtime.add_outbound_interceptor(Box::new(HeaderStampInterceptor));

    let options = SpawnOptions {
        interceptors: vec![
            Box::new(LoggingInterceptor {
                log: Arc::clone(&log),
            }),
            Box::new(TimingInterceptor::new()),
        ],
        mailbox: MailboxConfig::Unbounded,
    };
    let greeter = runtime
        .spawn_with_options::<Greeter>("greeter", (), options)
        .await
        .unwrap();

    // Send the two requests one after another, printing each reply.
    for &who in &["World", "Rust"] {
        println!("--- Sending ask(SayHello) ---");
        let pending = greeter.ask(SayHello(String::from(who)), None).unwrap();
        let reply = pending.await.unwrap();
        println!(" [Main] reply: {}\n", reply);
    }

    println!("--- Interceptor log ---");
    // The guard is a temporary of this statement, so the lock is released
    // before the sleep below is awaited.
    log.lock()
        .unwrap()
        .iter()
        .for_each(|entry| println!(" {}", entry));

    tokio::time::sleep(Duration::from_millis(50)).await;
    println!("\n=== Done ===");
}