#![allow(clippy::missing_panics_doc)]
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use parking_lot::Mutex;
use serde_json::Value;
use apcore::async_task::{
AsyncTaskManager, InMemoryTaskStore, ReaperConfig, RetryConfig, TaskInfo, TaskStatus, TaskStore,
};
use apcore::config::Config;
use apcore::context::{Context, Identity};
use apcore::errors::ModuleError;
use apcore::module::Module;
use apcore::registry::registry::Registry;
use apcore::Executor;
fn find_fixtures_root() -> PathBuf {
if let Ok(spec_repo) = std::env::var("APCORE_SPEC_REPO") {
let p = PathBuf::from(&spec_repo)
.join("conformance")
.join("fixtures");
if p.is_dir() {
return p;
}
panic!("APCORE_SPEC_REPO={spec_repo} does not contain conformance/fixtures/");
}
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let sibling = manifest_dir
.parent()
.unwrap()
.join("apcore")
.join("conformance")
.join("fixtures");
if sibling.is_dir() {
return sibling;
}
panic!(
"Cannot find apcore conformance fixtures.\n\
Set APCORE_SPEC_REPO or clone apcore as a sibling."
);
}
fn load_fixture() -> Value {
let path = find_fixtures_root().join("async_task_evolution.json");
let content = std::fs::read_to_string(&path)
.unwrap_or_else(|_| panic!("Failed to read fixture: {}", path.display()));
serde_json::from_str(&content).unwrap_or_else(|e| panic!("Invalid JSON: {e}"))
}
fn fixture_case<'a>(fixture: &'a Value, id: &str) -> &'a Value {
fixture["test_cases"]
.as_array()
.expect("test_cases must be an array")
.iter()
.find(|c| c["id"].as_str() == Some(id))
.unwrap_or_else(|| panic!("fixture case '{id}' not present"))
}
fn parse_task_info(value: &Value) -> TaskInfo {
serde_json::from_value(value.clone()).expect("fixture task_info must deserialize as TaskInfo")
}
struct RedisTaskStore {
inner: InMemoryTaskStore,
}
impl RedisTaskStore {
fn new() -> Self {
Self {
inner: InMemoryTaskStore::new(),
}
}
}
#[async_trait]
impl TaskStore for RedisTaskStore {
async fn save(&self, task: &TaskInfo) -> Result<(), ModuleError> {
self.inner.save(task).await
}
async fn get(&self, id: &str) -> Result<Option<TaskInfo>, ModuleError> {
self.inner.get(id).await
}
async fn list(&self, status: Option<TaskStatus>) -> Result<Vec<TaskInfo>, ModuleError> {
self.inner.list(status).await
}
async fn delete(&self, id: &str) -> Result<(), ModuleError> {
self.inner.delete(id).await
}
async fn list_expired(&self, before: f64) -> Result<Vec<TaskInfo>, ModuleError> {
self.inner.list_expired(before).await
}
fn store_type_name(&self) -> &'static str {
"RedisTaskStore"
}
}
struct AlwaysFailModule {
message: String,
calls: Arc<Mutex<u32>>,
}
#[async_trait]
impl Module for AlwaysFailModule {
fn input_schema(&self) -> Value {
serde_json::json!({"type": "object"})
}
fn output_schema(&self) -> Value {
serde_json::json!({"type": "object"})
}
fn description(&self) -> &'static str {
"Always fails (test stub)"
}
async fn execute(&self, _inputs: Value, _ctx: &Context<Value>) -> Result<Value, ModuleError> {
*self.calls.lock() += 1;
Err(ModuleError::new(
apcore::errors::ErrorCode::GeneralInternalError,
self.message.clone(),
))
}
}
fn make_executor_with_failing_module(
module_id: &str,
message: &str,
) -> (Arc<Executor>, Arc<Mutex<u32>>) {
let calls = Arc::new(Mutex::new(0u32));
let registry = Arc::new(Registry::default());
registry
.register_module(
module_id,
Box::new(AlwaysFailModule {
message: message.to_string(),
calls: Arc::clone(&calls),
}),
)
.expect("register failing module");
let config = Arc::new(Config::default());
(Arc::new(Executor::new(registry, config)), calls)
}
fn make_bare_executor() -> Arc<Executor> {
let registry = Arc::new(Registry::default());
let config = Arc::new(Config::default());
Arc::new(Executor::new(registry, config))
}
fn make_ctx() -> Context<Value> {
Context::new(Identity::new(
"test".into(),
"Test".into(),
vec![],
std::collections::HashMap::new(),
))
}
#[tokio::test]
async fn case_in_memory_store_default() {
let fixture = load_fixture();
let case = fixture_case(&fixture, "in_memory_store_default");
let expected_type = case["expected"]["store_type"].as_str().unwrap();
let mgr = AsyncTaskManager::new(make_bare_executor(), 4, 100);
assert_eq!(mgr.store_type_name(), expected_type);
}
#[tokio::test]
async fn case_custom_store_injected() {
let fixture = load_fixture();
let case = fixture_case(&fixture, "custom_store_injected");
let expected_type = case["expected"]["store_type"].as_str().unwrap();
let store: Arc<dyn TaskStore> = Arc::new(RedisTaskStore::new());
let mgr = AsyncTaskManager::with_store(make_bare_executor(), 4, 100, store);
assert_eq!(mgr.store_type_name(), expected_type);
}
#[tokio::test]
async fn case_task_store_save_and_get() {
let fixture = load_fixture();
let case = fixture_case(&fixture, "task_store_save_and_get");
let task = parse_task_info(&case["task_info"]);
let lookup_id = case["lookup_id"].as_str().unwrap();
let store = InMemoryTaskStore::new();
store.save(&task).await.unwrap();
let got = store
.get(lookup_id)
.await
.unwrap()
.expect("task must be found");
let expected = &case["expected"];
assert_eq!(got.task_id, expected["task_id"].as_str().unwrap());
assert_eq!(
format!("{:?}", got.status).to_lowercase(),
expected["status"].as_str().unwrap()
);
assert_eq!(got.result.as_ref().unwrap(), &expected["result"]);
}
#[tokio::test]
async fn case_task_store_list_by_status() {
let fixture = load_fixture();
let case = fixture_case(&fixture, "task_store_list_by_status");
let store = InMemoryTaskStore::new();
for raw in case["stored_tasks"].as_array().unwrap() {
store.save(&parse_task_info(raw)).await.unwrap();
}
let status_filter: TaskStatus = serde_json::from_value(case["status_filter"].clone())
.expect("status_filter must deserialize");
let listed = store.list(Some(status_filter)).await.unwrap();
let expected = &case["expected"];
let expected_count =
usize::try_from(expected["count"].as_u64().unwrap()).expect("count fits in usize");
assert_eq!(listed.len(), expected_count);
let listed_ids: Vec<String> = listed.iter().map(|t| t.task_id.clone()).collect();
let expected_ids: Vec<String> = expected["task_ids"]
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect();
assert_eq!(listed_ids, expected_ids);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn case_retry_scheduled_on_failure() {
let fixture = load_fixture();
let case = fixture_case(&fixture, "retry_scheduled_on_failure");
let cfg = &case["retry_config"];
let retry = RetryConfig {
max_retries: u32::try_from(cfg["max_retries"].as_u64().unwrap()).unwrap(),
retry_delay_ms: cfg["retry_delay_ms"].as_u64().unwrap(),
backoff_multiplier: cfg["backoff_multiplier"].as_f64().unwrap(),
max_retry_delay_ms: cfg["max_retry_delay_ms"].as_u64().unwrap(),
};
let (executor, _calls) =
make_executor_with_failing_module("worker.flaky_job", "connection_error");
let mgr = AsyncTaskManager::new(executor, 4, 100);
let task_id = mgr
.submit_with_retry(
"worker.flaky_job",
serde_json::json!({}),
Some(make_ctx()),
Some(retry),
)
.await
.expect("submit");
let deadline = std::time::Instant::now() + Duration::from_secs(2);
loop {
let info = mgr.get_status(&task_id).expect("task present");
if info.retry_count >= 1 {
assert_eq!(
info.status,
TaskStatus::Pending,
"after first failure, status MUST be Pending awaiting retry"
);
assert_eq!(info.retry_count, 1);
assert_eq!(
retry.compute_delay_ms(0),
case["expected"]["next_retry_delay_ms"].as_u64().unwrap()
);
return;
}
assert!(
std::time::Instant::now() < deadline,
"retry was never scheduled within 2s; status={:?}",
info.status
);
tokio::time::sleep(Duration::from_millis(20)).await;
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn case_max_retries_exhausted_becomes_failed() {
let fixture = load_fixture();
let case = fixture_case(&fixture, "max_retries_exhausted_becomes_failed");
let cfg = &case["retry_config"];
let retry = RetryConfig {
max_retries: u32::try_from(cfg["max_retries"].as_u64().unwrap()).unwrap(),
retry_delay_ms: cfg["retry_delay_ms"].as_u64().unwrap(),
backoff_multiplier: cfg["backoff_multiplier"].as_f64().unwrap(),
max_retry_delay_ms: cfg["max_retry_delay_ms"].as_u64().unwrap(),
};
let (executor, calls) =
make_executor_with_failing_module("worker.always_fails", "persistent_error");
let mgr = AsyncTaskManager::new(executor, 4, 100);
let task_id = mgr
.submit_with_retry(
"worker.always_fails",
serde_json::json!({}),
Some(make_ctx()),
Some(retry),
)
.await
.expect("submit");
let deadline = std::time::Instant::now() + Duration::from_secs(3);
loop {
let info = mgr.get_status(&task_id).expect("task present");
if info.status == TaskStatus::Failed {
let expected = &case["expected"];
assert_eq!(
format!("{:?}", info.status).to_lowercase(),
expected["final_status"].as_str().unwrap()
);
assert_eq!(
info.retry_count,
u32::try_from(expected["retry_count"].as_u64().unwrap()).unwrap()
);
assert_eq!(
info.error.is_some(),
expected["error_populated"].as_bool().unwrap()
);
assert_eq!(*calls.lock(), retry.max_retries + 1);
return;
}
assert!(
std::time::Instant::now() < deadline,
"task never reached Failed; current status={:?}",
info.status
);
tokio::time::sleep(Duration::from_millis(20)).await;
}
}
#[tokio::test]
async fn case_backoff_multiplier_applied() {
let fixture = load_fixture();
let case = fixture_case(&fixture, "backoff_multiplier_applied");
let cfg = &case["retry_config"];
let retry = RetryConfig {
max_retries: 100, retry_delay_ms: cfg["retry_delay_ms"].as_u64().unwrap(),
backoff_multiplier: cfg["backoff_multiplier"].as_f64().unwrap(),
max_retry_delay_ms: cfg["max_retry_delay_ms"].as_u64().unwrap(),
};
let exp = &case["expected"];
assert_eq!(
retry.compute_delay_ms(0),
exp["attempt_0_delay_ms"].as_u64().unwrap()
);
assert_eq!(
retry.compute_delay_ms(1),
exp["attempt_1_delay_ms"].as_u64().unwrap()
);
assert_eq!(
retry.compute_delay_ms(2),
exp["attempt_2_delay_ms"].as_u64().unwrap()
);
assert_eq!(
retry.compute_delay_ms(3),
exp["attempt_3_delay_ms"].as_u64().unwrap()
);
assert_eq!(
retry.compute_delay_ms(4),
exp["attempt_4_delay_ms"].as_u64().unwrap()
);
assert_eq!(
retry.compute_delay_ms(5),
exp["attempt_5_delay_ms"].as_u64().unwrap()
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn case_reaper_disabled_by_default() {
let fixture = load_fixture();
let case = fixture_case(&fixture, "reaper_disabled_by_default");
let store = Arc::new(InMemoryTaskStore::new());
for raw in case["stored_expired_tasks"].as_array().unwrap() {
store.save(&parse_task_info(raw)).await.unwrap();
}
let mgr = AsyncTaskManager::with_store(
make_bare_executor(),
4,
100,
store.clone() as Arc<dyn TaskStore>,
);
let _ = &mgr;
tokio::time::sleep(Duration::from_millis(100)).await;
let remaining = store.list(None).await.unwrap();
assert_eq!(remaining.len(), 1, "no reaper means no automatic cleanup");
assert_eq!(remaining[0].task_id, "old-task-001");
}
async fn reap_once(store: &dyn TaskStore, now: f64, ttl_seconds: f64) -> Vec<String> {
let before = now - ttl_seconds;
let expired = store.list_expired(before).await.unwrap();
let mut deleted = Vec::new();
for info in expired {
if !matches!(
info.status,
TaskStatus::Completed | TaskStatus::Failed | TaskStatus::Cancelled
) {
continue;
}
store.delete(&info.task_id).await.unwrap();
deleted.push(info.task_id);
}
deleted.sort();
deleted
}
#[tokio::test]
async fn case_reaper_deletes_expired_tasks() {
let fixture = load_fixture();
let case = fixture_case(&fixture, "reaper_deletes_expired_tasks");
let store = Arc::new(InMemoryTaskStore::new());
let mut fresh_completed_at = 0.0_f64;
for raw in case["stored_tasks"].as_array().unwrap() {
let info = parse_task_info(raw);
if info.task_id == "fresh-task-001" {
fresh_completed_at = info.completed_at.unwrap();
}
store.save(&info).await.unwrap();
}
let ttl = case["config"]["reaper"]["ttl_seconds"].as_f64().unwrap();
let now = fresh_completed_at + ttl - 1.0;
let deleted = reap_once(&*store, now, ttl).await;
let mut expected: Vec<String> = case["expected"]["deleted_task_ids"]
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect();
expected.sort();
assert_eq!(deleted, expected);
let mut remaining: Vec<String> = store
.list(None)
.await
.unwrap()
.into_iter()
.map(|t| t.task_id)
.collect();
remaining.sort();
let mut expected_remaining: Vec<String> = case["expected"]["remaining_task_ids"]
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect();
expected_remaining.sort();
assert_eq!(remaining, expected_remaining);
}
#[tokio::test]
async fn case_reaper_skips_running_tasks() {
let fixture = load_fixture();
let case = fixture_case(&fixture, "reaper_skips_running_tasks");
let store = Arc::new(InMemoryTaskStore::new());
for raw in case["stored_tasks"].as_array().unwrap() {
store.save(&parse_task_info(raw)).await.unwrap();
}
let ttl = case["config"]["reaper"]["ttl_seconds"].as_f64().unwrap();
let now = case["now_timestamp"].as_f64().unwrap();
let deleted = reap_once(&*store, now, ttl).await;
assert!(
deleted.is_empty(),
"reaper MUST NOT delete pending or running tasks even past TTL"
);
let mut remaining: Vec<String> = store
.list(None)
.await
.unwrap()
.into_iter()
.map(|t| t.task_id)
.collect();
remaining.sort();
let mut expected_remaining: Vec<String> = case["expected"]["remaining_task_ids"]
.as_array()
.unwrap()
.iter()
.map(|v| v.as_str().unwrap().to_string())
.collect();
expected_remaining.sort();
assert_eq!(remaining, expected_remaining);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reaper_handle_starts_and_stops_gracefully() {
let store = Arc::new(InMemoryTaskStore::new());
store
.save(&TaskInfo {
task_id: "stale".into(),
module_id: "m".into(),
status: TaskStatus::Completed,
submitted_at: 0.0,
started_at: Some(0.0),
completed_at: Some(0.0),
result: None,
error: None,
retry_count: 0,
max_retries: 0,
})
.await
.unwrap();
let mgr = AsyncTaskManager::with_store(
make_bare_executor(),
4,
100,
store.clone() as Arc<dyn TaskStore>,
);
let handle = mgr.start_reaper(ReaperConfig {
ttl_seconds: 1.0,
sweep_interval_ms: 50,
});
tokio::time::sleep(Duration::from_millis(200)).await;
let remaining = store.list(None).await.unwrap();
assert!(
remaining.iter().all(|t| t.task_id != "stale"),
"reaper should have removed the expired task"
);
handle.stop().await;
}