use std::time::Duration;
use async_trait::async_trait;
use dactor::actor::{cancel_after, Actor, ActorContext, ActorRef, Handler, ExpandHandler};
use dactor::message::Message;
use dactor::stream::StreamSender;
use dactor::TestRuntime;
use tokio_stream::StreamExt;
struct GetNumbers;
impl Message for GetNumbers {
type Reply = u32;
}
struct Ping;
impl Message for Ping {
type Reply = String;
}
struct SlowProducer;
impl Actor for SlowProducer {
type Args = ();
type Deps = ();
fn create(_: (), _: ()) -> Self {
SlowProducer
}
}
#[async_trait]
impl ExpandHandler<GetNumbers, u32> for SlowProducer {
async fn handle_expand(
&mut self,
_msg: GetNumbers,
sender: StreamSender<u32>,
_ctx: &mut ActorContext,
) {
for i in 1..=10 {
tokio::time::sleep(Duration::from_millis(50)).await;
if sender.send(i).await.is_err() {
println!(" [Producer] consumer disconnected at item {}", i);
break;
}
println!(" [Producer] sent item {}", i);
}
}
}
struct CancellableWorker;
impl Actor for CancellableWorker {
type Args = ();
type Deps = ();
fn create(_: (), _: ()) -> Self {
CancellableWorker
}
}
#[async_trait]
impl Handler<Ping> for CancellableWorker {
async fn handle(&mut self, _msg: Ping, ctx: &mut ActorContext) -> String {
tokio::select! {
_ = ctx.cancelled() => {
println!(" [Worker] cancelled!");
"cancelled".to_string()
}
_ = tokio::time::sleep(Duration::from_secs(5)) => {
"completed".to_string()
}
}
}
}
#[tokio::main]
async fn main() {
println!("=== Cancellation Example ===\n");
let runtime = TestRuntime::new();
println!("--- Stream with cancel_after(150ms) ---");
let producer = runtime.spawn::<SlowProducer>("producer", ()).await.unwrap();
let token = cancel_after(Duration::from_millis(150));
let mut stream = producer.expand(GetNumbers, 16, None, Some(token)).unwrap();
let mut received = Vec::new();
while let Some(n) = stream.next().await {
received.push(n);
println!(" [Client] received: {}", n);
}
println!(
" [Client] stream ended — received {} items",
received.len()
);
assert!(
received.len() < 10,
"expected early termination but got all 10 items"
);
println!(" ✓ stream terminated early (< 10 items)\n");
println!("--- Handler with ctx.cancelled() ---");
let worker = runtime.spawn::<CancellableWorker>("worker", ()).await.unwrap();
let token = cancel_after(Duration::from_millis(50));
let result = worker.ask(Ping, Some(token)).unwrap().await.unwrap();
println!(" [Client] handler returned: {}", result);
assert_eq!(result, "cancelled");
println!(" ✓ handler detected cancellation\n");
println!("=== Done ===");
}