use std::sync::Arc;
use std::time::Duration;
use apcore::async_task::{
AsyncTaskManager, InMemoryTaskStore, ReaperConfig, TaskInfo, TaskStatus, TaskStore,
};
use apcore::config::Config;
use apcore::context::Context;
use apcore::errors::{ErrorCode, ModuleError};
use apcore::module::Module;
use apcore::registry::registry::Registry;
use apcore::Executor;
use async_trait::async_trait;
use serde_json::{json, Value};
/// Trivial `Module` implementation used as the submission target in these tests.
struct NoopModule;

impl NoopModule {
    /// The schema literal reported for both the input and the output side.
    fn object_schema() -> Value {
        json!({"type": "object"})
    }
}

#[async_trait]
impl Module for NoopModule {
    /// Reports a bare `{"type": "object"}` schema for inputs.
    fn input_schema(&self) -> Value {
        Self::object_schema()
    }

    /// Reports a bare `{"type": "object"}` schema for outputs.
    fn output_schema(&self) -> Value {
        Self::object_schema()
    }

    /// Fixed human-readable label for the module.
    fn description(&self) -> &'static str {
        "noop"
    }

    /// Ignores both the inputs and the context and always yields `{"ok": true}`.
    async fn execute(&self, _inputs: Value, _ctx: &Context<Value>) -> Result<Value, ModuleError> {
        let output = json!({"ok": true});
        Ok(output)
    }
}
/// Builds an `AsyncTaskManager` on top of the supplied `store`.
///
/// A fresh registry is created containing only `NoopModule`, registered under
/// the id `"noop.module"`, and wrapped in an executor using the default config.
/// `max_tasks` is forwarded unchanged to `AsyncTaskManager::with_store`.
///
/// # Panics
///
/// Panics if registering the module returns an error.
fn make_manager_with_store(max_tasks: usize, store: Arc<dyn TaskStore>) -> AsyncTaskManager {
    let registry = Arc::new(Registry::new());
    let registration = registry.register_module("noop.module", Box::new(NoopModule));
    registration.unwrap();

    let config = Arc::new(Config::default());
    let executor = Arc::new(Executor::new(registry, config));

    // NOTE(review): 16 is presumably the concurrency limit — confirm against `with_store`.
    AsyncTaskManager::with_store(executor, 16, max_tasks, store)
}
fn make_manager(max_tasks: usize) -> AsyncTaskManager {
make_manager_with_store(max_tasks, Arc::new(InMemoryTaskStore::new()))
}
/// With three `Completed` records already saved in the store and `max_tasks`
/// set to 3, a new submission must still be accepted.
#[tokio::test]
async fn max_tasks_counts_only_active_statuses() {
    let store = Arc::new(InMemoryTaskStore::new());

    // Pre-seed the store with records that are already in a terminal state.
    for idx in 0..3 {
        let mut finished = TaskInfo::default();
        finished.status = TaskStatus::Completed;
        finished.task_id = format!("done-{idx}");
        finished.module_id = String::from("noop.module");
        finished.started_at = Some(0.0);
        finished.completed_at = Some(0.0);
        finished.result = Some(json!({}));
        store.save(&finished).await.unwrap();
    }

    let shared_store: Arc<dyn TaskStore> = store.clone();
    let manager = make_manager_with_store(3, shared_store);

    let outcome = manager.submit("noop.module", json!({}), None).await;
    assert!(
        outcome.is_ok(),
        "submit must not be rejected by terminal-state records (closes A-D-AT-01); got {:?}",
        outcome.err()
    );
}
/// With two `Running` records already saved in the store and `max_tasks` set
/// to 2, a new submission must fail with `ErrorCode::TaskLimitExceeded`.
#[tokio::test]
async fn max_tasks_still_rejects_when_active_budget_exhausted() {
    let store = Arc::new(InMemoryTaskStore::new());

    // Pre-seed the store with records that are still in flight.
    for idx in 0..2 {
        let mut in_flight = TaskInfo::default();
        in_flight.status = TaskStatus::Running;
        in_flight.task_id = format!("running-{idx}");
        in_flight.module_id = String::from("noop.module");
        in_flight.started_at = Some(0.0);
        store.save(&in_flight).await.unwrap();
    }

    let shared_store: Arc<dyn TaskStore> = store;
    let manager = make_manager_with_store(2, shared_store);

    let submission = manager.submit("noop.module", json!({}), None).await;
    let rejection = submission.expect_err("submit must fail when active >= max_tasks");
    assert_eq!(rejection.code, ErrorCode::TaskLimitExceeded);
}
/// A second `start_reaper` call must fail with `ErrorCode::ReaperAlreadyRunning`
/// while the first handle is live, and must succeed again once that handle has
/// been stopped.
#[tokio::test]
async fn start_reaper_rejects_concurrent_start() {
    let manager = make_manager(100);

    let mut reaper_cfg = ReaperConfig::default();
    reaper_cfg.sweep_interval_ms = 5_000;
    reaper_cfg.ttl_seconds = 60.0;

    // First start: accepted.
    let live = manager
        .start_reaper(reaper_cfg)
        .expect("first start_reaper must succeed");

    // Second start while the first handle is still held: rejected.
    let rejection = manager
        .start_reaper(reaper_cfg)
        .expect_err("second start_reaper must fail while first is live");
    assert_eq!(rejection.code, ErrorCode::ReaperAlreadyRunning);

    // After an explicit stop, starting again is accepted.
    live.stop().await;
    let restarted = manager
        .start_reaper(reaper_cfg)
        .expect("start_reaper must succeed after stop()");
    restarted.stop().await;
}
/// Dropping a reaper handle without calling `stop()` must still allow a later
/// `start_reaper` call to succeed.
#[tokio::test]
async fn dropped_reaper_handle_releases_running_flag() {
    let manager = make_manager(100);

    let mut reaper_cfg = ReaperConfig::default();
    reaper_cfg.sweep_interval_ms = 5_000;
    reaper_cfg.ttl_seconds = 60.0;

    // Start a reaper and let its handle go out of scope immediately.
    drop(manager.start_reaper(reaper_cfg).unwrap());

    // NOTE(review): presumably gives drop-side cleanup time to run — confirm
    // whether the pause is actually required.
    tokio::time::sleep(Duration::from_millis(10)).await;

    let restarted = manager
        .start_reaper(reaper_cfg)
        .expect("drop must release running flag");
    restarted.stop().await;
}