#![allow(clippy::pedantic)]
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use apcore::async_task::{AsyncTaskManager, RetryConfig, TaskStatus};
use apcore::config::Config;
use apcore::context::Context;
use apcore::errors::{ErrorCode, ModuleError};
use apcore::module::Module;
use apcore::registry::registry::Registry;
use apcore::Executor;
use async_trait::async_trait;
use serde_json::{json, Value};
fn find_fixtures_root() -> PathBuf {
if let Ok(spec_repo) = std::env::var("APCORE_SPEC_REPO") {
let p = PathBuf::from(&spec_repo)
.join("conformance")
.join("fixtures");
if p.is_dir() {
return p;
}
panic!("APCORE_SPEC_REPO={spec_repo} does not contain conformance/fixtures/");
}
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let sibling = manifest_dir
.parent()
.unwrap()
.join("apcore")
.join("conformance")
.join("fixtures");
if sibling.is_dir() {
return sibling;
}
panic!(
"Cannot find apcore conformance fixtures.\n\
Set APCORE_SPEC_REPO or clone apcore as a sibling of {}",
manifest_dir.parent().unwrap().display()
);
}
fn load_fixture() -> Value {
let path = find_fixtures_root().join("async_task_cancellation.json");
let content = std::fs::read_to_string(&path)
.unwrap_or_else(|_| panic!("Failed to read fixture: {}", path.display()));
serde_json::from_str(&content).unwrap_or_else(|e| panic!("Invalid JSON: {e}"))
}
fn fixture_case<'a>(fixture: &'a Value, id: &str) -> &'a Value {
fixture["test_cases"]
.as_array()
.expect("test_cases must be an array")
.iter()
.find(|c| c["id"].as_str() == Some(id))
.unwrap_or_else(|| panic!("fixture missing test case {id}"))
}
struct LongRunningModule;
#[async_trait]
impl Module for LongRunningModule {
fn input_schema(&self) -> Value {
json!({"type": "object"})
}
fn output_schema(&self) -> Value {
json!({"type": "object"})
}
fn description(&self) -> &'static str {
"long-running module that occupies a task slot"
}
async fn execute(&self, _inputs: Value, _ctx: &Context<Value>) -> Result<Value, ModuleError> {
tokio::time::sleep(Duration::from_secs(30)).await;
Ok(json!({"done": true}))
}
}
struct AlwaysFailingModule {
attempts: Arc<AtomicUsize>,
}
#[async_trait]
impl Module for AlwaysFailingModule {
fn input_schema(&self) -> Value {
json!({"type": "object"})
}
fn output_schema(&self) -> Value {
json!({"type": "object"})
}
fn description(&self) -> &'static str {
"module that always errors, with an observable attempt counter"
}
async fn execute(&self, _inputs: Value, _ctx: &Context<Value>) -> Result<Value, ModuleError> {
self.attempts.fetch_add(1, Ordering::SeqCst);
Err(ModuleError::new(
ErrorCode::GeneralInternalError,
"conformance: always-failing module",
))
}
}
#[tokio::test]
async fn submit_over_capacity_raises_task_limit_exceeded() {
let fixture = load_fixture();
let tc = fixture_case(&fixture, "submit_over_capacity_raises_task_limit_exceeded");
let max_tasks = tc["max_tasks"].as_u64().expect("max_tasks") as usize;
let max_concurrent = tc["max_concurrent"].as_u64().expect("max_concurrent") as usize;
let submit_count = tc["submit_count"].as_u64().expect("submit_count");
let expected_error = tc["expected_error"].as_str().expect("expected_error");
assert_eq!(
expected_error, "TASK_LIMIT_EXCEEDED",
"fixture contract drift: expected_error must be TASK_LIMIT_EXCEEDED"
);
let registry = Arc::new(Registry::new());
registry
.register_module("test.long_running", Box::new(LongRunningModule))
.unwrap();
let executor = Arc::new(Executor::new(registry, Arc::new(Config::default())));
let mgr = AsyncTaskManager::new(executor, max_concurrent, max_tasks);
let first = mgr.submit("test.long_running", json!({}), None).await;
assert!(
first.is_ok(),
"first submit must succeed; got {:?}",
first.err()
);
let mut last_err: Option<ModuleError> = None;
for _ in 1..submit_count {
match mgr.submit("test.long_running", json!({}), None).await {
Ok(_) => {
panic!("submit beyond max_tasks={max_tasks} must be rejected, but it succeeded")
}
Err(e) => last_err = Some(e),
}
}
let err = last_err.expect("over-capacity submit must produce an error");
assert_eq!(
err.code,
ErrorCode::TaskLimitExceeded,
"over-capacity submit must raise the typed TASK_LIMIT_EXCEEDED error, got {:?}",
err.code
);
mgr.shutdown().await;
}
#[tokio::test]
async fn cancel_during_backoff_stops_further_retries() {
let fixture = load_fixture();
let tc = fixture_case(&fixture, "cancel_during_backoff_stops_further_retries");
let max_retries = tc["max_retries"].as_u64().expect("max_retries") as u32;
let retry_delay_ms = tc["retry_delay_ms"].as_u64().expect("retry_delay_ms");
let backoff_multiplier = tc["backoff_multiplier"]
.as_f64()
.expect("backoff_multiplier");
let expected_final_status = tc["expected_final_status"]
.as_str()
.expect("expected_final_status");
assert_eq!(
expected_final_status, "cancelled",
"fixture contract drift: expected_final_status must be cancelled"
);
let attempts = Arc::new(AtomicUsize::new(0));
let registry = Arc::new(Registry::new());
registry
.register_module(
"test.always_fail",
Box::new(AlwaysFailingModule {
attempts: Arc::clone(&attempts),
}),
)
.unwrap();
let executor = Arc::new(Executor::new(registry, Arc::new(Config::default())));
let mgr = AsyncTaskManager::new(executor, 4, 4);
let mut retry = RetryConfig::default();
retry.max_retries = max_retries;
retry.retry_delay_ms = retry_delay_ms;
retry.backoff_multiplier = backoff_multiplier;
let task_id = mgr
.submit_with_retry("test.always_fail", json!({}), None, Some(retry))
.await
.expect("submit_with_retry must succeed");
let deadline = tokio::time::Instant::now() + Duration::from_secs(10);
loop {
if attempts.load(Ordering::SeqCst) >= 1 {
break;
}
assert!(
tokio::time::Instant::now() < deadline,
"module never executed its first attempt"
);
tokio::time::sleep(Duration::from_millis(5)).await;
}
let attempts_at_cancel = attempts.load(Ordering::SeqCst);
let cancelled = mgr.cancel(&task_id).await;
assert!(cancelled, "cancel() must report success for an active task");
tokio::time::sleep(Duration::from_millis(200)).await;
let attempts_after = attempts.load(Ordering::SeqCst);
assert_eq!(
attempts_after, attempts_at_cancel,
"cancelling during backoff MUST stop further retry attempts: \
attempts went from {attempts_at_cancel} to {attempts_after}"
);
let info = mgr
.get_status(&task_id)
.expect("task status must be retrievable");
assert_eq!(
info.status,
TaskStatus::Cancelled,
"cancelled task must end in CANCELLED, got {:?}",
info.status
);
}