use apcore::cancel::CancelToken;
use apcore::context::{Context, Identity};
use apcore::errors::ModuleError;
use apcore::module::Module;
use async_trait::async_trait;
use serde_json::{json, Value};
use std::collections::HashMap;
/// Example module that works through a number of short, timed steps and
/// checks the context's cancel token before starting each one.
struct SlowModule;
#[async_trait]
impl Module for SlowModule {
    /// JSON Schema of the accepted input: an object with an integer `steps` property.
    fn input_schema(&self) -> Value {
        json!({ "type": "object", "properties": { "steps": { "type": "integer" } } })
    }

    /// JSON Schema of the produced output: an object with an integer
    /// `completed_steps` property.
    fn output_schema(&self) -> Value {
        json!({ "type": "object", "properties": { "completed_steps": { "type": "integer" } } })
    }

    /// Short human-readable summary of what this module does.
    fn description(&self) -> &'static str {
        "A slow module that checks for cancellation between steps"
    }

    /// Runs `steps` simulated work units of 50 ms each, polling the cancel
    /// token before every unit.
    ///
    /// `input["steps"]` selects the number of units. A missing or non-integer
    /// value falls back to 5; a negative value is treated as 0, so the call
    /// returns immediately with `completed_steps: 0`.
    ///
    /// # Errors
    ///
    /// Returns a [`ModuleError`] with `ErrorCode::ExecutionCancelled` when the
    /// context carries a cancel token that is found cancelled before a step
    /// begins.
    async fn execute(&self, input: Value, ctx: &Context<Value>) -> Result<Value, ModuleError> {
        // A plain `as usize` cast would wrap a negative count into an
        // astronomically large one; clamp to zero first, then convert
        // losslessly (saturating on targets where usize is narrower than i64).
        let steps = input["steps"].as_i64().map_or(5, |requested| {
            usize::try_from(requested.max(0)).unwrap_or(usize::MAX)
        });

        for i in 0..steps {
            // Cancellation is cooperative: it is only observed here, between
            // steps, never in the middle of a sleep.
            if let Some(token) = &ctx.cancel_token {
                if token.is_cancelled() {
                    println!(" [SlowModule] Cancelled at step {i}");
                    return Err(ModuleError::new(
                        apcore::errors::ErrorCode::ExecutionCancelled,
                        format!("Execution cancelled after {i} steps"),
                    ));
                }
            }
            println!(" [SlowModule] Executing step {i}...");
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        }

        Ok(json!({ "completed_steps": steps }))
    }
}
#[tokio::main]
async fn main() {
let identity = Identity::new(
"user-1".to_string(),
"user".to_string(),
vec![],
HashMap::new(),
);
println!("=== Run 1: normal execution ===");
let token = CancelToken::new();
let ctx: Context<Value> = Context::create(
Some(identity.clone()),
None,
Some(token),
None,
Value::Null,
None,
);
let module = SlowModule;
let result = module.execute(json!({"steps": 3}), &ctx).await.unwrap();
println!("Result: {result}\n");
println!("=== Run 2: cancelled mid-flight ===");
let token2 = CancelToken::new();
let ctx2: Context<Value> = Context::create(
Some(identity.clone()),
None,
Some(token2.clone()),
None,
Value::Null,
None,
);
let token2_clone = token2.clone();
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(80)).await;
println!(" [main] Sending cancel signal…");
token2_clone.cancel();
});
match module.execute(json!({"steps": 10}), &ctx2).await {
Ok(r) => println!("Result: {r}"),
Err(e) => println!("Error (expected): {e}"),
}
println!("\n=== CancelToken state demo ===");
let t = CancelToken::new();
println!("Before cancel: is_cancelled = {}", t.is_cancelled()); t.cancel();
println!("After cancel: is_cancelled = {}", t.is_cancelled());
let t2 = t.clone();
println!("Cloned token: is_cancelled = {}", t2.is_cancelled()); }