// apcore 0.18.0 — Schema-driven module standard for AI-perceivable interfaces
// (Source listing; the documentation page's line-number gutter has been removed.)
//! Integration tests for AsyncTaskManager.
//!
//! These tests exercise the public API exposed through `apcore::` and cover
//! the scenarios described in the conformance spec:
//!   - submit() returns a non-empty UUID task_id
//!   - get_status() returns TaskInfo with correct initial/terminal status
//!   - cancel() a pending task
//!   - cancel() a task that is already terminal (idempotent)
//!   - list_tasks() with and without status filter
//!   - cleanup() removes old completed/cancelled tasks and leaves recent ones
//!   - max_tasks limit enforcement (submit rejected when at capacity)
//!   - max_concurrent limit (tasks stay Pending when semaphore is exhausted)

use apcore::async_task::{AsyncTaskManager, TaskStatus};
use apcore::config::Config;
use apcore::context::{Context, Identity};
use apcore::errors::ModuleError;
use apcore::module::Module;
use apcore::registry::registry::Registry;
use apcore::Executor;
use async_trait::async_trait;
use serde_json::{json, Value};
use std::collections::HashMap;
use std::sync::Arc;

// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------

/// Construct a shared [`Executor`] on top of a default [`Registry`] (this
/// helper registers no modules) and a default [`Config`].
fn make_executor() -> Arc<Executor> {
    let executor = Executor::new(Arc::new(Registry::default()), Arc::new(Config::default()));
    Arc::new(executor)
}

/// Test double whose `execute` hands its input straight back to the caller.
struct EchoModule;

impl EchoModule {
    /// The schema used for both input and output: `{"type": "object"}`.
    fn object_schema() -> Value {
        json!({ "type": "object" })
    }
}

#[async_trait]
impl Module for EchoModule {
    /// Input schema: any JSON object.
    fn input_schema(&self) -> Value {
        Self::object_schema()
    }

    /// Output schema: any JSON object.
    fn output_schema(&self) -> Value {
        Self::object_schema()
    }

    /// Short human-readable label for this module.
    fn description(&self) -> &'static str {
        "Echo input"
    }

    /// Return `input` unchanged; the context is ignored and no error is ever produced.
    async fn execute(&self, input: Value, _ctx: &Context<Value>) -> Result<Value, ModuleError> {
        Ok(input)
    }
}

/// Construct a shared [`Executor`] whose registry holds one [`EchoModule`]
/// registered under `module_id`.
///
/// # Panics
/// Panics with "register EchoModule" if the registry rejects the registration.
fn make_executor_with_echo(module_id: &str) -> Arc<Executor> {
    let modules = Arc::new(Registry::default());
    let registration = modules.register_module(module_id, Box::new(EchoModule));
    registration.expect("register EchoModule");

    let executor = Executor::new(modules, Arc::new(Config::default()));
    Arc::new(executor)
}

/// Build a [`Context`] around a fixed test [`Identity`].
///
/// The identity is created from the literals `"test"` and `"Test"`, an empty
/// vector and an empty map. Nothing in this file calls the helper; the leading
/// underscore presumably keeps the unused-function lint quiet.
fn _make_ctx() -> Context<Value> {
    let identity = Identity::new(
        String::from("test"),
        String::from("Test"),
        Vec::new(),
        HashMap::new(),
    );
    Context::new(identity)
}

// ---------------------------------------------------------------------------
// submit() — returns a task_id
// ---------------------------------------------------------------------------

/// `submit()` must succeed and hand back an identifier that is not the empty string.
#[tokio::test]
async fn submit_returns_non_empty_task_id() {
    let manager = AsyncTaskManager::new(make_executor(), 4, 100);

    let submission = manager.submit("any.module", json!({}), None);
    let task_id = submission.expect("submit should succeed");

    assert!(!task_id.is_empty(), "task_id must be a non-empty string");
}

/// `task_count()` starts at zero and grows by one with every accepted submission.
#[tokio::test]
async fn submit_increments_task_count() {
    let manager = AsyncTaskManager::new(make_executor(), 4, 100);
    assert_eq!(manager.task_count(), 0);

    // Submit twice, checking the running total after each submission.
    for expected in 1..=2 {
        let _ = manager.submit("m", json!({}), None).unwrap();
        assert_eq!(manager.task_count(), expected);
    }
}

/// Two consecutive submissions must not be assigned the same identifier.
#[tokio::test]
async fn submit_task_ids_are_unique() {
    let manager = AsyncTaskManager::new(make_executor(), 4, 100);

    let first = manager.submit("m", json!({}), None).unwrap();
    let second = manager.submit("m", json!({}), None).unwrap();

    assert_ne!(
        first, second,
        "each submitted task must receive a unique id"
    );
}

// ---------------------------------------------------------------------------
// get_status() — TaskInfo and status progression
// ---------------------------------------------------------------------------

/// A freshly submitted task is immediately visible through `get_status()`.
#[tokio::test]
async fn get_status_returns_some_after_submit() {
    let manager = AsyncTaskManager::new(make_executor(), 4, 100);
    let task_id = manager.submit("m", json!({}), None).unwrap();

    // Which state the task is in depends on scheduling (Pending, Running or
    // Completed); only its presence is checked here.
    let snapshot = manager.get_status(&task_id);
    assert!(
        snapshot.is_some(),
        "get_status should return Some right after submit"
    );
}

/// Looking up an identifier that was never issued yields `None`.
#[tokio::test]
async fn get_status_returns_none_for_unknown_id() {
    let executor = make_executor();
    let mgr = AsyncTaskManager::new(executor, 4, 100);

    assert!(mgr.get_status("no-such-task").is_none());
}

/// The task record reports the module id that was passed to `submit()`.
#[tokio::test]
async fn task_info_contains_correct_module_id() {
    let manager = AsyncTaskManager::new(make_executor(), 4, 100);
    let task_id = manager.submit("echo.module", json!({}), None).unwrap();

    let recorded_module = manager.get_status(&task_id).unwrap().module_id;
    assert_eq!(recorded_module, "echo.module");
}

/// The task record carries a strictly positive `submitted_at` value.
#[tokio::test]
async fn task_info_submitted_at_is_set() {
    let manager = AsyncTaskManager::new(make_executor(), 4, 100);
    let task_id = manager.submit("m", json!({}), None).unwrap();

    let submitted_at = manager.get_status(&task_id).unwrap().submitted_at;
    assert!(
        submitted_at > 0.0,
        "submitted_at must be a positive UNIX timestamp"
    );
}

/// A task aimed at a registered module ends up `Completed`, with both
/// `started_at` and `completed_at` populated.
#[tokio::test]
async fn completed_task_has_completed_status() {
    use std::time::{Duration, Instant};

    // The echo module must be registered, otherwise the task cannot succeed.
    let manager = AsyncTaskManager::new(make_executor_with_echo("echo.v1"), 4, 100);
    let task_id = manager.submit("echo.v1", json!({"x": 1}), None).unwrap();

    // Re-check every 10 ms until the task is Completed or Failed, giving up
    // (by panicking) once one second has elapsed.
    let deadline = Instant::now() + Duration::from_secs(1);
    while !matches!(
        manager.get_status(&task_id).unwrap().status,
        TaskStatus::Completed | TaskStatus::Failed
    ) {
        assert!(
            Instant::now() <= deadline,
            "task did not reach a terminal state within 1 second"
        );
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    let outcome = manager.get_status(&task_id).unwrap();
    assert_eq!(
        outcome.status,
        TaskStatus::Completed,
        "task for a registered module should complete successfully"
    );
    assert!(
        outcome.completed_at.is_some(),
        "completed_at must be set after completion"
    );
    assert!(
        outcome.started_at.is_some(),
        "started_at must be set once execution began"
    );
}

/// A task aimed at an unregistered module ends up `Failed` and carries a
/// non-empty error message.
#[tokio::test]
async fn failed_task_has_failed_status_and_error_message() {
    use std::time::{Duration, Instant};

    // The registry is empty, so the module lookup is expected to fail.
    let manager = AsyncTaskManager::new(make_executor(), 4, 100);
    let task_id = manager
        .submit("nonexistent.module", json!({}), None)
        .unwrap();

    // Poll every 10 ms for up to one second until the task is Failed or Completed.
    let give_up_at = Instant::now() + Duration::from_secs(1);
    loop {
        let current = manager.get_status(&task_id).unwrap().status;
        let is_terminal = current == TaskStatus::Failed || current == TaskStatus::Completed;
        if is_terminal {
            break;
        }
        assert!(
            Instant::now() <= give_up_at,
            "task did not reach a terminal state within 1 second"
        );
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    let outcome = manager.get_status(&task_id).unwrap();
    assert_eq!(outcome.status, TaskStatus::Failed);

    let message = outcome
        .error
        .as_ref()
        .expect("failed task must have an error message");
    assert!(!message.is_empty(), "error message must not be empty");
}

// ---------------------------------------------------------------------------
// cancel() — pending task
// ---------------------------------------------------------------------------

/// Cancelling a task that is still `Pending` reports success.
#[tokio::test]
async fn cancel_pending_task_returns_true() {
    // A concurrency limit of zero means the task never leaves Pending.
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);
    let task_id = manager.submit("m", json!({}), None).unwrap();

    assert!(
        manager.cancel(&task_id),
        "cancel should return true for a Pending task"
    );
}

/// After a pending task is cancelled, its recorded status is `Cancelled`.
#[tokio::test]
async fn cancel_pending_task_sets_cancelled_status() {
    // Concurrency limit zero: the task is still Pending when it is cancelled.
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);
    let task_id = manager.submit("m", json!({}), None).unwrap();

    // The return value is covered by a separate test.
    let _ = manager.cancel(&task_id);

    let status = manager.get_status(&task_id).unwrap().status;
    assert_eq!(status, TaskStatus::Cancelled);
}

/// Cancelling a pending task stamps the record with a `completed_at` value.
#[tokio::test]
async fn cancel_pending_task_sets_completed_at() {
    // Concurrency limit zero: the task is still Pending when it is cancelled.
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);
    let task_id = manager.submit("m", json!({}), None).unwrap();

    let _ = manager.cancel(&task_id);

    let completed_at = manager.get_status(&task_id).unwrap().completed_at;
    assert!(
        completed_at.is_some(),
        "completed_at should be set when task is cancelled"
    );
}

// ---------------------------------------------------------------------------
// cancel() — already terminal task (idempotent)
// ---------------------------------------------------------------------------

/// The first cancel of a pending task succeeds; a repeat cancel reports `false`.
#[tokio::test]
async fn cancel_already_cancelled_task_returns_false() {
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);
    let task_id = manager.submit("m", json!({}), None).unwrap();

    let first_attempt = manager.cancel(&task_id);
    assert!(first_attempt, "first cancel should succeed");

    let second_attempt = manager.cancel(&task_id);
    assert!(
        !second_attempt,
        "second cancel on an already-cancelled task should return false"
    );
}

/// Cancelling an identifier that was never issued reports `false`.
#[tokio::test]
async fn cancel_unknown_task_returns_false() {
    let executor = make_executor();
    let mgr = AsyncTaskManager::new(executor, 4, 100);

    assert!(!mgr.cancel("ghost-task-id"));
}

// ---------------------------------------------------------------------------
// cancel() — running task
// ---------------------------------------------------------------------------

/// Cancelling a task that may already have started leaves it either
/// `Cancelled` (cancel won) or `Failed` (the task finished first).
#[tokio::test]
async fn cancel_running_task_sets_cancelled_status() {
    // One permit lets the task start. Its target module does not exist, so it
    // is expected to fail on its own unless the cancel gets there first.
    let manager = AsyncTaskManager::new(make_executor(), 1, 100);
    let submission = manager.submit("some.module.that.does.not.exist", json!({}), None);
    let task_id = submission.unwrap();

    // Yield once so the runtime can poll the task before the cancel is issued.
    tokio::task::yield_now().await;

    // Cancel whether or not the task has started running.
    let cancel_won = manager.cancel(&task_id);
    let snapshot = manager.get_status(&task_id).unwrap();

    if !cancel_won {
        // The task beat the cancel and reached Failed by itself.
        assert_eq!(
            snapshot.status,
            TaskStatus::Failed,
            "if cancel returns false the task should be in a terminal state"
        );
        return;
    }

    // The cancel took effect, so the recorded status must be Cancelled.
    assert_eq!(snapshot.status, TaskStatus::Cancelled);
}

// ---------------------------------------------------------------------------
// list_tasks() — with and without filter
// ---------------------------------------------------------------------------

/// `list_tasks(None)` is empty on a fresh manager and afterwards returns
/// every submitted task.
#[tokio::test]
async fn list_tasks_without_filter_returns_all() {
    let executor = make_executor();
    let mgr = AsyncTaskManager::new(executor, 0, 100);
    assert!(mgr.list_tasks(None).is_empty());

    let id1 = mgr.submit("m1", json!({}), None).unwrap();
    let id2 = mgr.submit("m2", json!({}), None).unwrap();

    let listed = mgr.list_tasks(None);
    assert_eq!(listed.len(), 2);

    // Compare by id; the order of the listing is not checked.
    let ids: Vec<&str> = listed.iter().map(|info| info.task_id.as_str()).collect();
    assert!(ids.contains(&id1.as_str()));
    assert!(ids.contains(&id2.as_str()));
}

/// Filtering by `Pending` returns both queued tasks; filtering by `Completed`
/// returns nothing.
#[tokio::test]
async fn list_tasks_with_pending_filter_returns_only_pending() {
    // Concurrency limit zero: every submission stays Pending.
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);
    for _ in 0..2 {
        let _ = manager.submit("m", json!({}), None).unwrap();
    }

    let pending_count = manager.list_tasks(Some(TaskStatus::Pending)).len();
    assert_eq!(pending_count, 2, "both tasks should be Pending");

    let finished = manager.list_tasks(Some(TaskStatus::Completed));
    assert!(finished.is_empty(), "no tasks should be Completed yet");
}

/// With one of two pending tasks cancelled, each status filter returns
/// exactly the matching task.
#[tokio::test]
async fn list_tasks_with_cancelled_filter_returns_only_cancelled() {
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);
    let cancelled_id = manager.submit("m", json!({}), None).unwrap();
    let untouched_id = manager.submit("m", json!({}), None).unwrap();

    let _ = manager.cancel(&cancelled_id);

    let cancelled_tasks = manager.list_tasks(Some(TaskStatus::Cancelled));
    assert_eq!(cancelled_tasks.len(), 1);
    assert_eq!(cancelled_tasks[0].task_id, cancelled_id);

    let pending_tasks = manager.list_tasks(Some(TaskStatus::Pending));
    assert_eq!(pending_tasks.len(), 1);
    assert_eq!(pending_tasks[0].task_id, untouched_id);
}

/// A status filter that matches no task produces an empty listing.
#[tokio::test]
async fn list_tasks_empty_when_no_tasks_match_filter() {
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);
    manager
        .submit("m", json!({}), None)
        .map(drop)
        .unwrap();

    // The only task is stuck Pending, so nothing can be Completed.
    let completed = manager.list_tasks(Some(TaskStatus::Completed));
    assert!(completed.is_empty());
}

// ---------------------------------------------------------------------------
// cleanup() — removes old completed tasks
// ---------------------------------------------------------------------------

/// `cleanup()` drops a cancelled task once it counts as older than `max_age`.
#[tokio::test]
async fn cleanup_removes_cancelled_tasks_past_max_age() {
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);
    let task_id = manager.submit("m", json!({}), None).unwrap();
    let _ = manager.cancel(&task_id);

    // With a negative max_age, every terminal task qualifies as old enough.
    let removed_count = manager.cleanup(-1.0);
    assert_eq!(removed_count, 1);

    let lookup = manager.get_status(&task_id);
    assert!(lookup.is_none(), "task should be gone");
}

/// `cleanup()` leaves a just-cancelled task alone when `max_age` is huge.
#[tokio::test]
async fn cleanup_keeps_tasks_within_max_age() {
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);
    let task_id = manager.submit("m", json!({}), None).unwrap();
    let _ = manager.cancel(&task_id);

    // The task was created moments ago, far inside a max_age this large.
    let removed_count = manager.cleanup(9_999_999.0);
    assert_eq!(removed_count, 0);

    let lookup = manager.get_status(&task_id);
    assert!(lookup.is_some(), "task should still exist");
}

/// `cleanup()` never touches a task that has not reached a terminal state.
#[tokio::test]
async fn cleanup_does_not_remove_active_tasks() {
    let executor = make_executor();
    let mgr = AsyncTaskManager::new(executor, 0, 100);
    let task_id = mgr.submit("m", json!({}), None).unwrap();

    // The task is still Pending, so even a negative max_age must leave it.
    let removed_count = mgr.cleanup(-1.0);
    assert_eq!(
        removed_count, 0,
        "active (Pending) tasks must never be cleaned up"
    );
    assert!(mgr.get_status(&task_id).is_some());
}

/// `cleanup()` removes every cancelled task in one pass and keeps the
/// pending one.
#[tokio::test]
async fn cleanup_removes_multiple_terminal_tasks() {
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);

    // Three submissions; only the first two are cancelled below.
    let ids: Vec<_> = (0..3)
        .map(|_| manager.submit("m", json!({}), None).unwrap())
        .collect();
    for id in ids.iter().take(2) {
        let _ = manager.cancel(id);
    }

    let removed_count = manager.cleanup(-1.0);
    assert_eq!(
        removed_count, 2,
        "only the two cancelled tasks should be removed"
    );

    let survivor = manager.get_status(&ids[2]);
    assert!(survivor.is_some(), "pending task must remain");
}

// ---------------------------------------------------------------------------
// max_tasks limit enforcement
// ---------------------------------------------------------------------------

/// Once the manager holds `max_tasks` tasks, a further `submit()` fails with
/// an error whose text mentions "Task limit".
#[tokio::test]
async fn submit_rejected_at_max_tasks_limit() {
    // Third argument is the capacity: room for exactly two tasks.
    let manager = AsyncTaskManager::new(make_executor(), 4, 2);
    for _ in 0..2 {
        let _ = manager.submit("m", json!({}), None).unwrap();
    }

    let overflow = manager.submit("m", json!({}), None);
    let err = overflow.expect_err("third submit should be rejected");

    let rendered = err.to_string();
    assert!(
        rendered.contains("Task limit"),
        "error message should mention task limit; got: {err}"
    );
}

/// A full manager accepts new work again after a cancelled task has been
/// cleaned up.
#[tokio::test]
async fn submit_allowed_after_cleanup_frees_space() {
    // Third argument is the capacity: room for exactly two tasks.
    let mgr = AsyncTaskManager::new(make_executor(), 0, 2);
    let first_id = mgr.submit("m", json!({}), None).unwrap();
    let _ = mgr.submit("m", json!({}), None).unwrap();

    // Both slots are taken, so one more submission must be refused.
    assert!(mgr.submit("m", json!({}), None).is_err());

    // Free a slot: cancel the first task, then purge it.
    let _ = mgr.cancel(&first_id);
    let _ = mgr.cleanup(-1.0);

    // One slot is free again.
    let retry = mgr.submit("m", json!({}), None);
    assert!(
        retry.is_ok(),
        "submit should succeed once cleanup freed space"
    );
}

// ---------------------------------------------------------------------------
// max_concurrent limit — tasks are queued when all permits are held
// ---------------------------------------------------------------------------

/// With no concurrency permits available, submitted tasks remain `Pending`
/// even after the runtime has had a chance to poll them.
#[tokio::test]
async fn tasks_are_queued_when_max_concurrent_reached() {
    // A concurrency limit of zero means no task can obtain a permit, so none
    // of them ever starts running.
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);

    let first_id = manager.submit("m", json!({}), None).unwrap();
    let second_id = manager.submit("m", json!({}), None).unwrap();

    // Let any spawned work run before the statuses are sampled.
    tokio::task::yield_now().await;

    let first_status = manager.get_status(&first_id).unwrap().status;
    let second_status = manager.get_status(&second_id).unwrap().status;

    assert_eq!(
        first_status,
        TaskStatus::Pending,
        "task 1 should be stuck Pending"
    );
    assert_eq!(
        second_status,
        TaskStatus::Pending,
        "task 2 should be stuck Pending"
    );
}

/// With a concurrency limit of one, two submitted tasks must never both be
/// `Running` at the same moment.
#[tokio::test]
async fn max_concurrent_one_limits_parallelism() {
    // A plain value is enough: the manager is not shared with any other task
    // in this test, so no `Arc` wrapper is needed.
    let mgr = AsyncTaskManager::new(make_executor(), 1, 100);

    let _id1 = mgr.submit("m1", json!({}), None).unwrap();
    let _id2 = mgr.submit("m2", json!({}), None).unwrap();

    // Both submissions are tracked, however far each one has progressed.
    assert_eq!(mgr.task_count(), 2);

    // Yield once so the submitted work can be polled before the snapshot.
    // NOTE(review): without this, the check below presumably ran before any
    // task could leave Pending under the test macro's default runtime, so it
    // passed trivially — confirm against the manager's spawning strategy.
    tokio::task::yield_now().await;

    // At most one task may hold the single permit and be Running.
    let running = mgr.list_tasks(Some(TaskStatus::Running)).len();
    assert!(
        running <= 1,
        "at most 1 task should be Running with max_concurrent=1; got {running}"
    );
}

// ---------------------------------------------------------------------------
// shutdown() — cancels all active tasks
// ---------------------------------------------------------------------------

/// `shutdown()` moves every still-pending task to `Cancelled`.
#[tokio::test]
async fn shutdown_cancels_all_pending_tasks() {
    // Concurrency limit zero: both tasks are Pending when shutdown is called.
    let manager = AsyncTaskManager::new(make_executor(), 0, 100);
    let first_id = manager.submit("m1", json!({}), None).unwrap();
    let second_id = manager.submit("m2", json!({}), None).unwrap();

    manager.shutdown();

    for task_id in [&first_id, &second_id] {
        let status = manager.get_status(task_id).unwrap().status;
        assert_eq!(status, TaskStatus::Cancelled);
    }
}