use std::any::Any;
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use dactor::actor::{Actor, ActorContext, ActorRef, Handler};
use dactor::interceptor::{
Disposition, DropNotice, DropObserver, OutboundContext, OutboundInterceptor,
};
use dactor::message::{Headers, Message, RuntimeHeaders};
use dactor::TestRuntime;
struct ProcessItem(String);
impl Message for ProcessItem {
type Reply = ();
}
struct Processor {
processed: Vec<String>,
}
impl Actor for Processor {
type Args = ();
type Deps = ();
fn create(_: (), _: ()) -> Self {
Processor {
processed: Vec::new(),
}
}
}
#[async_trait]
impl Handler<ProcessItem> for Processor {
async fn handle(&mut self, msg: ProcessItem, _ctx: &mut ActorContext) {
println!(" [Processor] handled: {}", msg.0);
self.processed.push(msg.0);
}
}
struct CollectingObserver {
notices: Arc<Mutex<Vec<String>>>,
}
impl DropObserver for CollectingObserver {
fn on_drop(&self, notice: DropNotice) {
let entry = format!(
"DROPPED target={} msg={} by={} ctx={}",
notice.target_name, notice.message_type, notice.interceptor_name, notice.context
);
println!(" [DropObserver] {}", entry);
self.notices.lock().unwrap().push(entry);
}
}
struct SpamFilter;
impl OutboundInterceptor for SpamFilter {
fn name(&self) -> &'static str {
"spam-filter"
}
fn on_send(
&self,
_ctx: &OutboundContext<'_>,
_rh: &RuntimeHeaders,
_headers: &mut Headers,
msg: &dyn Any,
) -> Disposition {
if let Some(item) = msg.downcast_ref::<ProcessItem>() {
if item.0.contains("spam") {
println!(" [SpamFilter] dropping: {}", item.0);
return Disposition::Drop;
}
}
Disposition::Continue
}
}
#[tokio::main]
async fn main() {
println!("=== Dead Letters / Drop Observer Example ===\n");
let drop_log: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
let mut runtime = TestRuntime::new();
runtime.set_drop_observer(Arc::new(CollectingObserver {
notices: drop_log.clone(),
}));
runtime.add_outbound_interceptor(Box::new(SpamFilter));
let processor = runtime.spawn::<Processor>("processor", ()).await.unwrap();
println!("--- Sending messages ---");
let items = vec![
"order-001",
"spam-promo",
"order-002",
"spam-newsletter",
"order-003",
];
for item in &items {
processor.tell(ProcessItem(item.to_string())).unwrap();
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
println!("\n--- Drop notices ---");
let notices = drop_log.lock().unwrap();
for notice in notices.iter() {
println!(" {}", notice);
}
assert_eq!(notices.len(), 2, "expected 2 dropped messages");
println!("\n ✓ 2 spam messages dropped, 3 valid messages processed");
println!("\n=== Done ===");
}