use std::sync::Arc;
use rsclaw::{
MemoryTier,
gateway::task_queue::{
Completion, DispatchAction, Priority, QueuedMessage, Recommend, StructuredOutcome,
TaskOutcome, TaskQueueManager, TaskStatus, decide_action, drain_pending_outcome,
stage_pending_outcome,
},
store::redb_store::RedbStore,
};
fn open_manager() -> (Arc<TaskQueueManager>, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("tempdir");
let store =
RedbStore::open(&dir.path().join("queue.redb"), MemoryTier::Low).expect("open redb");
let manager = Arc::new(TaskQueueManager::new(Arc::new(store)));
(manager, dir)
}
fn make_message(text: &str) -> QueuedMessage {
QueuedMessage {
text: text.to_string(),
sender: "test-user".to_string(),
channel: "test-channel".to_string(),
account: None,
chat_id: "test-chat".to_string(),
is_group: false,
reply_to: None,
timestamp: 0,
images: vec![],
files: vec![],
}
}
fn make_outcome(
completion: Completion,
recommend: Recommend,
follow_ups: Vec<&str>,
) -> StructuredOutcome {
StructuredOutcome {
completion,
recommend,
verified: false,
verification_log: None,
accomplished: vec!["did stage 1".into()],
skipped: vec![],
blocked_on: vec![],
assumptions: vec![],
follow_up_tasks: follow_ups.into_iter().map(String::from).collect(),
summary: None,
}
}
#[test]
fn outcome_stash_is_session_scoped() {
let a = "test:scope:a";
let b = "test:scope:b";
stage_pending_outcome(a, make_outcome(Completion::Full, Recommend::Ship, vec![]));
stage_pending_outcome(
b,
make_outcome(Completion::Partial, Recommend::Retry, vec![]),
);
let drained_a = drain_pending_outcome(a).expect("a should have outcome");
let drained_b = drain_pending_outcome(b).expect("b should have outcome");
assert_eq!(drained_a.recommend, Recommend::Ship);
assert_eq!(drained_b.recommend, Recommend::Retry);
}
#[test]
fn structured_continue_spawns_follow_up_tasks_into_queue() {
let (manager, _dir) = open_manager();
let session = "test:selfdrive:parent";
let parent_msg = make_message("Refactor the auth module");
let (parent_id, _) = manager
.submit(session, parent_msg, Priority::User)
.expect("submit parent");
assert!(!parent_id.is_empty());
let dequeued = manager.next().expect("next").expect("parent dequeued");
assert_eq!(dequeued.id, parent_id);
let outcome = make_outcome(
Completion::Partial,
Recommend::Continue,
vec![
"Extract auth middleware into its own module",
"Wire the new module into the router",
"Add an integration test for the unified path",
],
);
stage_pending_outcome(session, outcome);
let drained = drain_pending_outcome(session).expect("staged outcome");
let action = decide_action(&TaskOutcome::Structured(drained), 1, 10);
match action {
DispatchAction::Spawn { tasks } => {
assert_eq!(tasks.len(), 3);
let base = make_message("");
for follow_up in tasks {
let mut msg = base.clone();
msg.text = follow_up;
msg.sender = format!("{}:follow_up", base.sender);
manager
.submit_task(session, msg, Priority::System, 10, 3600)
.expect("submit follow-up");
}
}
other => panic!("expected Spawn, got {other:?}"),
}
manager.complete(&parent_id).expect("complete parent");
let stats = manager.stats().expect("stats");
assert_eq!(stats.pending, 1, "follow-ups merge into one pending task");
let drained_task = manager
.next()
.expect("next")
.expect("merged follow-up task");
assert_eq!(drained_task.messages.len(), 3);
let merged = drained_task.merged_text();
assert!(merged.contains("Extract auth middleware"));
assert!(merged.contains("Wire the new module"));
assert!(merged.contains("integration test"));
manager.complete(&drained_task.id).expect("complete child");
}
#[test]
fn structured_abandon_marks_task_failed() {
let (manager, _dir) = open_manager();
let session = "test:selfdrive:abandon";
let (task_id, _) = manager
.submit(session, make_message("Try the impossible"), Priority::User)
.expect("submit");
let outcome = make_outcome(Completion::Failed, Recommend::Abandon, vec![]);
stage_pending_outcome(session, outcome);
let drained = drain_pending_outcome(session).expect("staged");
let action = decide_action(&TaskOutcome::Structured(drained), 1, 10);
assert_eq!(action, DispatchAction::Fail);
manager
.fail(&task_id, "agent abandoned", 0)
.expect("fail call");
let stats = manager.stats().expect("stats");
assert_eq!(
stats.dead, 1,
"abandoned task (max_retries=0) should land in Dead bucket"
);
assert_eq!(stats.failed, 0);
}
#[test]
fn structured_retry_at_max_turns_downgrades_to_fail() {
let outcome = make_outcome(Completion::Minimal, Recommend::Retry, vec![]);
let action = decide_action(&TaskOutcome::Structured(outcome), 5, 5);
assert_eq!(action, DispatchAction::Fail);
}
#[test]
fn structured_retry_under_budget_continues() {
let outcome = make_outcome(Completion::Minimal, Recommend::Retry, vec![]);
match decide_action(&TaskOutcome::Structured(outcome), 1, 10) {
DispatchAction::AutoContinue { prompt, slow } => {
assert!(prompt.contains("Retry"));
assert!(slow, "retry should rate-limit");
}
other => panic!("expected AutoContinue, got {other:?}"),
}
}
#[test]
fn structured_continue_without_followups_completes() {
let outcome = make_outcome(Completion::Partial, Recommend::Continue, vec![]);
assert_eq!(
decide_action(&TaskOutcome::Structured(outcome), 1, 10),
DispatchAction::Complete
);
}
#[test]
fn mixed_turns_partial_then_task_finish_ship() {
let session = "test:selfdrive:mixed";
let action_t1 = decide_action(&TaskOutcome::Partial, 1, 5);
match action_t1 {
DispatchAction::AutoContinue { .. } => {} other => panic!("turn 1 expected AutoContinue, got {other:?}"),
}
stage_pending_outcome(
session,
make_outcome(Completion::Full, Recommend::Ship, vec![]),
);
let drained = drain_pending_outcome(session).expect("staged");
let action_t2 = decide_action(&TaskOutcome::Structured(drained), 2, 5);
assert_eq!(action_t2, DispatchAction::Complete);
}
#[test]
fn outcome_serializes_with_snake_case_keys() {
let mut out = make_outcome(
Completion::Partial,
Recommend::NeedsHuman,
vec!["follow-up 1"],
);
out.blocked_on = vec!["which database?".into()];
out.assumptions = vec!["assumed postgres".into()];
let json = serde_json::to_value(&out).expect("serialize outcome");
assert_eq!(json["completion"], "partial");
assert_eq!(json["recommend"], "needs_human");
assert_eq!(json["follow_up_tasks"][0], "follow-up 1");
assert_eq!(json["blocked_on"][0], "which database?");
assert_eq!(json["assumptions"][0], "assumed postgres");
assert_eq!(json["accomplished"][0], "did stage 1");
}
#[test]
fn drain_consumes_outcome() {
let session = "test:drain:once";
stage_pending_outcome(
session,
make_outcome(Completion::Full, Recommend::Ship, vec![]),
);
assert!(drain_pending_outcome(session).is_some());
assert!(drain_pending_outcome(session).is_none());
let _ = TaskStatus::Pending;
}