use apcore::async_task::{AsyncTaskManager, TaskStatus};
use apcore::config::Config;
use apcore::context::{Context, Identity};
use apcore::errors::ModuleError;
use apcore::module::Module;
use apcore::registry::registry::Registry;
use apcore::Executor;
use async_trait::async_trait;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;
fn make_executor() -> Arc<Executor> {
let registry = Arc::new(Registry::default());
let config = Arc::new(Config::default());
Arc::new(Executor::new(registry, config))
}
struct EchoModule;
#[async_trait]
impl Module for EchoModule {
fn input_schema(&self) -> Value {
json!({ "type": "object" })
}
fn output_schema(&self) -> Value {
json!({ "type": "object" })
}
fn description(&self) -> &'static str {
"Echo input"
}
async fn execute(&self, inputs: Value, _ctx: &Context<Value>) -> Result<Value, ModuleError> {
Ok(inputs)
}
}
fn make_executor_with_echo(module_id: &str) -> Arc<Executor> {
let registry = Arc::new(Registry::default());
registry
.register_module(module_id, Box::new(EchoModule))
.expect("register EchoModule");
let config = Arc::new(Config::default());
Arc::new(Executor::new(registry, config))
}
fn _make_ctx() -> Context<Value> {
Context::new(Identity::new(
"test".to_string(),
"Test".to_string(),
vec![],
HashMap::new(),
))
}
#[tokio::test]
async fn submit_returns_non_empty_task_id() {
let mgr = AsyncTaskManager::new(make_executor(), 4, 100);
let task_id = mgr
.submit("any.module", json!({}), None)
.expect("submit should succeed");
assert!(!task_id.is_empty(), "task_id must be a non-empty string");
}
#[tokio::test]
async fn submit_increments_task_count() {
let mgr = AsyncTaskManager::new(make_executor(), 4, 100);
assert_eq!(mgr.task_count(), 0);
let _ = mgr.submit("m", json!({}), None).unwrap();
assert_eq!(mgr.task_count(), 1);
let _ = mgr.submit("m", json!({}), None).unwrap();
assert_eq!(mgr.task_count(), 2);
}
#[tokio::test]
async fn submit_task_ids_are_unique() {
let mgr = AsyncTaskManager::new(make_executor(), 4, 100);
let id1 = mgr.submit("m", json!({}), None).unwrap();
let id2 = mgr.submit("m", json!({}), None).unwrap();
assert_ne!(id1, id2, "each submitted task must receive a unique id");
}
#[tokio::test]
async fn get_status_returns_some_after_submit() {
let mgr = AsyncTaskManager::new(make_executor(), 4, 100);
let task_id = mgr.submit("m", json!({}), None).unwrap();
assert!(
mgr.get_status(&task_id).is_some(),
"get_status should return Some right after submit"
);
}
#[tokio::test]
async fn get_status_returns_none_for_unknown_id() {
let mgr = AsyncTaskManager::new(make_executor(), 4, 100);
assert!(mgr.get_status("no-such-task").is_none());
}
#[tokio::test]
async fn task_info_contains_correct_module_id() {
let mgr = AsyncTaskManager::new(make_executor(), 4, 100);
let task_id = mgr.submit("echo.module", json!({}), None).unwrap();
let info = mgr.get_status(&task_id).unwrap();
assert_eq!(info.module_id, "echo.module");
}
#[tokio::test]
async fn task_info_submitted_at_is_set() {
let mgr = AsyncTaskManager::new(make_executor(), 4, 100);
let task_id = mgr.submit("m", json!({}), None).unwrap();
let info = mgr.get_status(&task_id).unwrap();
assert!(
info.submitted_at > 0.0,
"submitted_at must be a positive UNIX timestamp"
);
}
#[tokio::test]
async fn completed_task_has_completed_status() {
let exec = make_executor_with_echo("echo.v1");
let mgr = AsyncTaskManager::new(exec, 4, 100);
let task_id = mgr.submit("echo.v1", json!({"x": 1}), None).unwrap();
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(1);
loop {
let status = mgr.get_status(&task_id).unwrap().status;
if status == TaskStatus::Completed || status == TaskStatus::Failed {
break;
}
assert!(
std::time::Instant::now() <= deadline,
"task did not reach a terminal state within 1 second"
);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
let info = mgr.get_status(&task_id).unwrap();
assert_eq!(
info.status,
TaskStatus::Completed,
"task for a registered module should complete successfully"
);
assert!(
info.completed_at.is_some(),
"completed_at must be set after completion"
);
assert!(
info.started_at.is_some(),
"started_at must be set once execution began"
);
}
#[tokio::test]
async fn failed_task_has_failed_status_and_error_message() {
let mgr = AsyncTaskManager::new(make_executor(), 4, 100);
let task_id = mgr.submit("nonexistent.module", json!({}), None).unwrap();
let deadline = std::time::Instant::now() + std::time::Duration::from_secs(1);
loop {
let status = mgr.get_status(&task_id).unwrap().status;
if matches!(status, TaskStatus::Failed | TaskStatus::Completed) {
break;
}
assert!(
std::time::Instant::now() <= deadline,
"task did not reach a terminal state within 1 second"
);
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
let info = mgr.get_status(&task_id).unwrap();
assert_eq!(info.status, TaskStatus::Failed);
assert!(
info.error.is_some(),
"failed task must have an error message"
);
assert!(
!info.error.as_ref().unwrap().is_empty(),
"error message must not be empty"
);
}
#[tokio::test]
async fn cancel_pending_task_returns_true() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let task_id = mgr.submit("m", json!({}), None).unwrap();
let result = mgr.cancel(&task_id);
assert!(result, "cancel should return true for a Pending task");
}
#[tokio::test]
async fn cancel_pending_task_sets_cancelled_status() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let task_id = mgr.submit("m", json!({}), None).unwrap();
mgr.cancel(&task_id);
let info = mgr.get_status(&task_id).unwrap();
assert_eq!(info.status, TaskStatus::Cancelled);
}
#[tokio::test]
async fn cancel_pending_task_sets_completed_at() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let task_id = mgr.submit("m", json!({}), None).unwrap();
mgr.cancel(&task_id);
let info = mgr.get_status(&task_id).unwrap();
assert!(
info.completed_at.is_some(),
"completed_at should be set when task is cancelled"
);
}
#[tokio::test]
async fn cancel_already_cancelled_task_returns_false() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let task_id = mgr.submit("m", json!({}), None).unwrap();
assert!(mgr.cancel(&task_id), "first cancel should succeed");
assert!(
!mgr.cancel(&task_id),
"second cancel on an already-cancelled task should return false"
);
}
#[tokio::test]
async fn cancel_unknown_task_returns_false() {
let mgr = AsyncTaskManager::new(make_executor(), 4, 100);
assert!(!mgr.cancel("ghost-task-id"));
}
#[tokio::test]
async fn cancel_running_task_sets_cancelled_status() {
let mgr = AsyncTaskManager::new(make_executor(), 1, 100);
let task_id = mgr
.submit("some.module.that.does.not.exist", json!({}), None)
.unwrap();
tokio::task::yield_now().await;
let cancelled = mgr.cancel(&task_id);
let info = mgr.get_status(&task_id).unwrap();
if cancelled {
assert_eq!(info.status, TaskStatus::Cancelled);
} else {
assert_eq!(
info.status,
TaskStatus::Failed,
"if cancel returns false the task should be in a terminal state"
);
}
}
#[tokio::test]
async fn list_tasks_without_filter_returns_all() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
assert!(mgr.list_tasks(None).is_empty());
let id1 = mgr.submit("m1", json!({}), None).unwrap();
let id2 = mgr.submit("m2", json!({}), None).unwrap();
let all = mgr.list_tasks(None);
assert_eq!(all.len(), 2);
let ids: Vec<&str> = all.iter().map(|t| t.task_id.as_str()).collect();
assert!(ids.contains(&id1.as_str()));
assert!(ids.contains(&id2.as_str()));
}
#[tokio::test]
async fn list_tasks_with_pending_filter_returns_only_pending() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let _ = mgr.submit("m", json!({}), None).unwrap();
let _ = mgr.submit("m", json!({}), None).unwrap();
let pending = mgr.list_tasks(Some(TaskStatus::Pending));
assert_eq!(pending.len(), 2, "both tasks should be Pending");
let completed = mgr.list_tasks(Some(TaskStatus::Completed));
assert!(completed.is_empty(), "no tasks should be Completed yet");
}
#[tokio::test]
async fn list_tasks_with_cancelled_filter_returns_only_cancelled() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let id1 = mgr.submit("m", json!({}), None).unwrap();
let id2 = mgr.submit("m", json!({}), None).unwrap();
mgr.cancel(&id1);
let cancelled = mgr.list_tasks(Some(TaskStatus::Cancelled));
assert_eq!(cancelled.len(), 1);
assert_eq!(cancelled[0].task_id, id1);
let pending = mgr.list_tasks(Some(TaskStatus::Pending));
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].task_id, id2);
}
#[tokio::test]
async fn list_tasks_empty_when_no_tasks_match_filter() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let _ = mgr.submit("m", json!({}), None).unwrap();
let completed = mgr.list_tasks(Some(TaskStatus::Completed));
assert!(completed.is_empty());
}
#[tokio::test]
async fn cleanup_removes_cancelled_tasks_past_max_age() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let task_id = mgr.submit("m", json!({}), None).unwrap();
mgr.cancel(&task_id);
let removed = mgr.cleanup(-1.0);
assert_eq!(removed, 1);
assert!(mgr.get_status(&task_id).is_none(), "task should be gone");
}
#[tokio::test]
async fn cleanup_keeps_tasks_within_max_age() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let task_id = mgr.submit("m", json!({}), None).unwrap();
mgr.cancel(&task_id);
let removed = mgr.cleanup(9_999_999.0);
assert_eq!(removed, 0);
assert!(
mgr.get_status(&task_id).is_some(),
"task should still exist"
);
}
#[tokio::test]
async fn cleanup_does_not_remove_active_tasks() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let task_id = mgr.submit("m", json!({}), None).unwrap();
let removed = mgr.cleanup(-1.0);
assert_eq!(
removed, 0,
"active (Pending) tasks must never be cleaned up"
);
assert!(mgr.get_status(&task_id).is_some());
}
#[tokio::test]
async fn cleanup_removes_multiple_terminal_tasks() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let id1 = mgr.submit("m", json!({}), None).unwrap();
let id2 = mgr.submit("m", json!({}), None).unwrap();
let id3 = mgr.submit("m", json!({}), None).unwrap();
mgr.cancel(&id1);
mgr.cancel(&id2);
let removed = mgr.cleanup(-1.0);
assert_eq!(removed, 2, "only the two cancelled tasks should be removed");
assert!(mgr.get_status(&id3).is_some(), "pending task must remain");
}
#[tokio::test]
async fn submit_rejected_at_max_tasks_limit() {
let mgr = AsyncTaskManager::new(make_executor(), 4, 2); let _ = mgr.submit("m", json!({}), None).unwrap();
let _ = mgr.submit("m", json!({}), None).unwrap();
let err = mgr
.submit("m", json!({}), None)
.expect_err("third submit should be rejected");
assert!(
err.to_string().contains("Task limit"),
"error message should mention task limit; got: {err}"
);
}
#[tokio::test]
async fn submit_allowed_after_cleanup_frees_space() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 2); let id1 = mgr.submit("m", json!({}), None).unwrap();
let _ = mgr.submit("m", json!({}), None).unwrap();
assert!(mgr.submit("m", json!({}), None).is_err());
mgr.cancel(&id1);
mgr.cleanup(-1.0);
assert!(
mgr.submit("m", json!({}), None).is_ok(),
"submit should succeed once cleanup freed space"
);
}
#[tokio::test]
async fn tasks_are_queued_when_max_concurrent_reached() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let id1 = mgr.submit("m", json!({}), None).unwrap();
let id2 = mgr.submit("m", json!({}), None).unwrap();
tokio::task::yield_now().await;
let s1 = mgr.get_status(&id1).unwrap().status;
let s2 = mgr.get_status(&id2).unwrap().status;
assert_eq!(s1, TaskStatus::Pending, "task 1 should be stuck Pending");
assert_eq!(s2, TaskStatus::Pending, "task 2 should be stuck Pending");
}
#[tokio::test]
async fn max_concurrent_one_limits_parallelism() {
let mgr = Arc::new(AsyncTaskManager::new(make_executor(), 1, 100));
let _id1 = mgr.submit("m1", json!({}), None).unwrap();
let _id2 = mgr.submit("m2", json!({}), None).unwrap();
assert_eq!(mgr.task_count(), 2);
let running = mgr.list_tasks(Some(TaskStatus::Running)).len();
assert!(
running <= 1,
"at most 1 task should be Running with max_concurrent=1; got {running}"
);
}
#[tokio::test]
async fn shutdown_cancels_all_pending_tasks() {
let mgr = AsyncTaskManager::new(make_executor(), 0, 100);
let id1 = mgr.submit("m1", json!({}), None).unwrap();
let id2 = mgr.submit("m2", json!({}), None).unwrap();
mgr.shutdown();
assert_eq!(mgr.get_status(&id1).unwrap().status, TaskStatus::Cancelled);
assert_eq!(mgr.get_status(&id2).unwrap().status, TaskStatus::Cancelled);
}