use a3s_code_core::config::{CodeConfig, ModelConfig, ModelModalities, ProviderConfig};
use a3s_code_core::llm::Message;
use a3s_code_core::mcp::{McpServerConfig, McpTransportConfig};
use a3s_code_core::subagent_task_tracker::SubagentStatus;
use a3s_code_core::{Agent, AgentEvent, SessionOptions};
use tokio_util::sync::CancellationToken;
/// Minimal `CodeConfig` shared by the tests in this file.
///
/// Registers a single `anthropic` provider whose API key is the dummy value
/// `offline-key`, carrying one model entry (`claude-sonnet-4-20250514`). That
/// model is also selected as `default_model`. Every other field comes from
/// `CodeConfig::default()`.
fn offline_test_config() -> CodeConfig {
    let sonnet = ModelConfig {
        id: "claude-sonnet-4-20250514".to_string(),
        name: "Claude Sonnet 4".to_string(),
        family: "claude-sonnet".to_string(),
        api_key: None,
        base_url: None,
        headers: std::collections::HashMap::new(),
        session_id_header: None,
        attachment: false,
        reasoning: false,
        tool_call: true,
        temperature: true,
        release_date: None,
        modalities: ModelModalities::default(),
        cost: Default::default(),
        limit: Default::default(),
    };
    let anthropic = ProviderConfig {
        name: "anthropic".to_string(),
        api_key: Some("offline-key".to_string()),
        base_url: None,
        headers: std::collections::HashMap::new(),
        session_id_header: None,
        models: vec![sonnet],
    };
    CodeConfig {
        default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
        providers: vec![anthropic],
        ..Default::default()
    }
}
/// Closing a session while one of its subagents is still in flight must:
/// - fire the subagent's registered `CancellationToken`;
/// - mark the tracked task `Cancelled`, so it no longer appears as pending;
/// - keep the snapshot queryable after close;
/// - keep the status `Cancelled` even if a late `SubagentEnd { success: true }`
///   is recorded afterwards.
#[tokio::test]
async fn close_with_subagent_in_flight_marks_task_cancelled_and_resists_regression() {
    const TASK: &str = "task-abc";
    const CHILD: &str = "child-xyz";

    let agent = Agent::from_config(offline_test_config()).await.unwrap();
    let session = agent
        .session(
            "/tmp/it1-close-subagent-workspace",
            Some(SessionOptions::new().with_session_id("it1-close-subagent")),
        )
        .expect("session");
    let tracker = session.subagent_tracker();
    let token = CancellationToken::new();

    // Seed a subagent that has started but not finished, and hand the tracker
    // a token it can fire when the session closes.
    let started = AgentEvent::SubagentStart {
        task_id: TASK.to_string(),
        session_id: CHILD.to_string(),
        parent_session_id: session.id().to_string(),
        agent: "general".to_string(),
        description: "long-running synthetic task".to_string(),
    };
    tracker.record_event(&started).await;
    tracker.register_canceller(TASK, token.clone()).await;

    // Before close: exactly one running task and an untouched token.
    let pending = session.pending_subagent_tasks().await;
    assert_eq!(pending.len(), 1, "pre-close pending list");
    let only = &pending[0];
    assert_eq!(only.task_id, TASK);
    assert_eq!(only.status, SubagentStatus::Running);
    assert!(
        !token.is_cancelled(),
        "canceller must not be fired before close"
    );

    session.close().await;

    // After close: token fired, task Cancelled and no longer pending.
    assert!(session.is_closed(), "session must report closed");
    assert!(
        token.is_cancelled(),
        "subagent canceller must be fired by close()"
    );
    let closed_snapshot = session
        .subagent_task(TASK)
        .await
        .expect("snapshot still queryable after close");
    assert_eq!(closed_snapshot.status, SubagentStatus::Cancelled);
    assert!(session.pending_subagent_tasks().await.is_empty());

    // A late "successful" end event must not overwrite the Cancelled status.
    let late_end = AgentEvent::SubagentEnd {
        task_id: TASK.to_string(),
        session_id: CHILD.to_string(),
        agent: "general".to_string(),
        output: "would-have-succeeded".to_string(),
        success: true,
    };
    tracker.record_event(&late_end).await;
    let final_snapshot = session
        .subagent_task(TASK)
        .await
        .expect("snapshot remains queryable");
    assert_eq!(
        final_snapshot.status,
        SubagentStatus::Cancelled,
        "late SubagentEnd(success=true) must not regress Cancelled status"
    );
}
/// Builds an MCP server config named `name` that has `enabled: false`.
///
/// Transport is stdio running `/bin/true` with no arguments. There is no
/// extra environment and no OAuth, and the tool timeout is 60 seconds.
fn disabled_mcp_server(name: &str) -> McpServerConfig {
    let transport = McpTransportConfig::Stdio {
        command: String::from("/bin/true"),
        args: Vec::new(),
    };
    McpServerConfig {
        name: name.to_owned(),
        transport,
        enabled: false,
        env: std::collections::HashMap::new(),
        oauth: None,
        tool_timeout_secs: 60,
    }
}
#[tokio::test]
async fn agent_close_handles_global_mcp_branch_and_is_idempotent() {
{
let agent = Agent::from_config(offline_test_config()).await.unwrap();
assert!(!agent.is_closed());
agent.close().await;
assert!(agent.is_closed());
agent.close().await;
assert!(agent.is_closed());
}
{
let mut cfg = offline_test_config();
cfg.mcp_servers = vec![disabled_mcp_server("offline-server")];
let agent = Agent::from_config(cfg).await.unwrap();
agent.close().await;
assert!(agent.is_closed());
let err = agent
.session("/tmp/it2-post-close", None)
.err()
.expect("session() after close must error");
let msg = err.to_string();
assert!(
msg.contains("closed") || msg.contains("Closed"),
"post-close session() error must mention 'closed', got: {msg}"
);
}
}
/// Opens `N` sessions concurrently on one shared agent from separate tasks.
///
/// Even-numbered sessions are dropped straight away; odd-numbered ones are
/// kept. `Agent::list_sessions` must then report exactly the kept IDs, and
/// nothing once those are dropped too.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn session_drop_prunes_registry_under_concurrency() {
    let agent = std::sync::Arc::new(Agent::from_config(offline_test_config()).await.unwrap());
    const N: usize = 32;
    let mut handles = Vec::with_capacity(N);
    for i in 0..N {
        let agent = std::sync::Arc::clone(&agent);
        handles.push(tokio::spawn(async move {
            let id = format!("it3-session-{i:02}");
            let opts = SessionOptions::new().with_session_id(&id);
            let session = agent
                .session(format!("/tmp/it3-ws-{i:02}"), Some(opts))
                .expect("session");
            if i % 2 == 0 {
                // Dropping here exercises pruning concurrently with other
                // tasks still registering sessions.
                drop(session);
                None
            } else {
                Some((id, session))
            }
        }));
    }
    let mut held = Vec::new();
    for h in handles {
        if let Some(kept) = h.await.expect("task should not panic") {
            held.push(kept);
        }
    }
    let mut expected: Vec<String> = held.iter().map(|(id, _)| id.clone()).collect();
    expected.sort();
    // Nothing here establishes the order `list_sessions()` returns IDs in, and
    // this test is about set membership. Sort both sides so the comparison
    // cannot fail spuriously on ordering alone.
    let mut observed = agent.list_sessions().await;
    observed.sort();
    assert_eq!(
        observed, expected,
        "registry must contain exactly the IDs of still-held sessions"
    );
    drop(held);
    let after_drop = agent.list_sessions().await;
    assert!(
        after_drop.is_empty(),
        "after dropping all sessions the registry must prune to empty, got: {after_drop:?}"
    );
}
/// Subagent task snapshots must survive a save and resume across agents.
///
/// Tasks recorded on one agent are saved; a different agent then resumes the
/// same session ID from the shared store and must see them with identical
/// statuses. A restored terminal task has no live canceller, so cancelling it
/// must return `false` and leave it `Completed`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subagent_tasks_persist_across_save_and_resume() {
    use a3s_code_core::store::MemorySessionStore;
    let store: std::sync::Arc<dyn a3s_code_core::store::SessionStore> =
        std::sync::Arc::new(MemorySessionStore::new());

    // ---- Phase A: seed a completed, a failed and a cancelled task, then save.
    let agent_a = Agent::from_config(offline_test_config()).await.unwrap();
    let opts_a = SessionOptions::new()
        .with_session_id("pillar1-subagent-persist")
        .with_session_store(std::sync::Arc::clone(&store))
        .with_auto_save(true);
    let session_a = agent_a
        .session("/tmp/pillar1-subagent-persist", Some(opts_a))
        .expect("phase A session");
    let tracker_a = session_a.subagent_tracker();
    let parent_id = session_a.id().to_string();
    let inject = |task_id: &str, child_id: &str| AgentEvent::SubagentStart {
        task_id: task_id.to_string(),
        session_id: child_id.to_string(),
        parent_session_id: parent_id.clone(),
        agent: "general".to_string(),
        description: format!("seed {task_id}"),
    };
    tracker_a.record_event(&inject("p1-done", "child-1")).await;
    tracker_a
        .record_event(&AgentEvent::SubagentEnd {
            task_id: "p1-done".to_string(),
            session_id: "child-1".to_string(),
            agent: "general".to_string(),
            output: "ok".to_string(),
            success: true,
        })
        .await;
    tracker_a.record_event(&inject("p1-fail", "child-2")).await;
    tracker_a
        .record_event(&AgentEvent::SubagentEnd {
            task_id: "p1-fail".to_string(),
            session_id: "child-2".to_string(),
            agent: "general".to_string(),
            output: "boom".to_string(),
            success: false,
        })
        .await;
    tracker_a
        .record_event(&inject("p1-cancel", "child-3"))
        .await;
    tracker_a
        .register_canceller("p1-cancel", CancellationToken::new())
        .await;
    // The Cancelled-status round trip is only exercised if this cancel
    // actually takes effect, so check the result instead of discarding it.
    assert!(
        session_a.cancel_subagent_task("p1-cancel").await,
        "cancel on a running task with a registered canceller must succeed"
    );
    session_a.save().await.expect("phase A save");
    let mut pre_save: Vec<(String, SubagentStatus)> = session_a
        .subagent_tasks()
        .await
        .into_iter()
        .map(|t| (t.task_id, t.status))
        .collect();
    assert_eq!(pre_save.len(), 3);
    assert!(
        pre_save
            .iter()
            .any(|(id, status)| id == "p1-cancel" && *status == SubagentStatus::Cancelled),
        "p1-cancel must be Cancelled before save, got: {pre_save:?}"
    );
    pre_save.sort_by(|a, b| a.0.cmp(&b.0));
    drop(session_a);
    drop(agent_a);

    // ---- Phase B: resume on a fresh agent from the same store.
    let agent_b = Agent::from_config(offline_test_config()).await.unwrap();
    let resume_opts = SessionOptions::new().with_session_store(std::sync::Arc::clone(&store));
    let session_b = agent_b
        .resume_session("pillar1-subagent-persist", resume_opts)
        .expect("phase B resume");
    let mut post_resume: Vec<(String, SubagentStatus)> = session_b
        .subagent_tasks()
        .await
        .into_iter()
        .map(|t| (t.task_id, t.status))
        .collect();
    post_resume.sort_by(|a, b| a.0.cmp(&b.0));
    assert_eq!(
        post_resume, pre_save,
        "resumed session must observe the same subagent task set & statuses"
    );
    let cancel_attempt = session_b.cancel_subagent_task("p1-done").await;
    assert!(
        !cancel_attempt,
        "cancel on a restored terminal task must return false (no live canceller)"
    );
    let still_done = session_b
        .subagent_task("p1-done")
        .await
        .expect("snapshot still present");
    assert_eq!(still_done.status, SubagentStatus::Completed);
}
/// Identity labels must persist across save and resume.
///
/// The tenant, principal, agent template and correlation ID set on the
/// original session must be restored when the session is resumed from the
/// shared store. A correlation ID supplied at resume time must override the
/// persisted one, while the tenant label stays intact.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn identity_labels_persist_across_save_and_resume() {
    use a3s_code_core::store::MemorySessionStore;
    const SESSION_ID: &str = "pillar5-labels";
    let shared: std::sync::Arc<dyn a3s_code_core::store::SessionStore> =
        std::sync::Arc::new(MemorySessionStore::new());

    // Phase A: create, label and save the session on the first agent.
    {
        let agent = Agent::from_config(offline_test_config()).await.unwrap();
        let labelled = SessionOptions::new()
            .with_session_id(SESSION_ID)
            .with_session_store(std::sync::Arc::clone(&shared))
            .with_auto_save(true)
            .with_tenant_id("acme-prod")
            .with_principal("svc-deploy-bot")
            .with_agent_template_id("ci-runner-v7")
            .with_correlation_id("trace-1234abcd");
        let session = agent
            .session("/tmp/pillar5-labels", Some(labelled))
            .expect("phase A session");
        session.save().await.expect("phase A save");
        assert_eq!(session.tenant_id(), Some("acme-prod"));
        assert_eq!(session.correlation_id(), Some("trace-1234abcd"));
    }

    // Phases B and C share a second agent.
    let agent = Agent::from_config(offline_test_config()).await.unwrap();

    // Phase B: plain resume restores every label.
    {
        let session = agent
            .resume_session(
                SESSION_ID,
                SessionOptions::new().with_session_store(std::sync::Arc::clone(&shared)),
            )
            .expect("phase B resume");
        assert_eq!(session.tenant_id(), Some("acme-prod"));
        assert_eq!(session.principal(), Some("svc-deploy-bot"));
        assert_eq!(session.agent_template_id(), Some("ci-runner-v7"));
        assert_eq!(session.correlation_id(), Some("trace-1234abcd"));
    }

    // Phase C: an explicit correlation ID at resume time wins.
    let relabelled = agent
        .resume_session(
            SESSION_ID,
            SessionOptions::new()
                .with_session_store(std::sync::Arc::clone(&shared))
                .with_correlation_id("trace-followup"),
        )
        .expect("phase C resume");
    assert_eq!(
        relabelled.correlation_id(),
        Some("trace-followup"),
        "caller-supplied correlation_id must override persisted one"
    );
    assert_eq!(relabelled.tenant_id(), Some("acme-prod"));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn cluster_ops_consolidated_session_lifecycle() {
use a3s_code_core::loop_checkpoint::{LoopCheckpoint, LOOP_CHECKPOINT_SCHEMA_VERSION};
use a3s_code_core::retention::SessionRetentionLimits;
use a3s_code_core::store::MemorySessionStore;
let store: std::sync::Arc<dyn a3s_code_core::store::SessionStore> =
std::sync::Arc::new(MemorySessionStore::new());
let agent_a = Agent::from_config(offline_test_config()).await.unwrap();
let limits_a = SessionRetentionLimits::new()
.with_max_runs(50)
.with_max_terminal_subagent_tasks(20);
let opts_a = SessionOptions::new()
.with_session_id("cluster-ops-target")
.with_session_store(std::sync::Arc::clone(&store))
.with_auto_save(true)
.with_tenant_id("acme-prod")
.with_principal("svc-deploy-bot")
.with_agent_template_id("planner-v3")
.with_correlation_id("trace-cluster-ops")
.with_retention_limits(limits_a);
let session_a = agent_a
.session("/tmp/cluster-ops-node-a", Some(opts_a))
.expect("node A session");
let tracker_a = session_a.subagent_tracker();
tracker_a
.record_event(&AgentEvent::SubagentStart {
task_id: "explore-1".to_string(),
session_id: "child-1".to_string(),
parent_session_id: session_a.id().to_string(),
agent: "explore".to_string(),
description: "find auth callsites".to_string(),
})
.await;
tracker_a
.record_event(&AgentEvent::SubagentEnd {
task_id: "explore-1".to_string(),
session_id: "child-1".to_string(),
agent: "explore".to_string(),
output: "found 3 callsites".to_string(),
success: true,
})
.await;
session_a.save().await.expect("node A save");
let seeded_run_id = "in-flight-run-x";
let cp = LoopCheckpoint {
schema_version: LOOP_CHECKPOINT_SCHEMA_VERSION,
run_id: seeded_run_id.to_string(),
session_id: session_a.id().to_string(),
turn: 2,
messages: vec![
Message::user("refactor the auth module"),
Message {
role: "assistant".to_string(),
content: vec![a3s_code_core::llm::ContentBlock::Text {
text: "scanned callsites, planning edits".to_string(),
}],
reasoning_content: None,
},
],
total_usage: a3s_code_core::llm::TokenUsage {
prompt_tokens: 800,
completion_tokens: 200,
total_tokens: 1000,
cache_read_tokens: None,
cache_write_tokens: None,
},
tool_calls_count: 1,
verification_reports: Vec::new(),
checkpoint_ms: 1_700_000_000_000,
};
store
.save_loop_checkpoint(seeded_run_id, &cp)
.await
.expect("seed checkpoint");
drop(session_a);
drop(agent_a);
let agent_b = Agent::from_config(offline_test_config()).await.unwrap();
let resume_opts = SessionOptions::new().with_session_store(std::sync::Arc::clone(&store));
let session_b = agent_b
.resume_session("cluster-ops-target", resume_opts)
.expect("node B resume");
assert_eq!(session_b.tenant_id(), Some("acme-prod"));
assert_eq!(session_b.principal(), Some("svc-deploy-bot"));
assert_eq!(session_b.agent_template_id(), Some("planner-v3"));
assert_eq!(session_b.correlation_id(), Some("trace-cluster-ops"));
let restored_tasks = session_b.subagent_tasks().await;
assert_eq!(restored_tasks.len(), 1);
assert_eq!(restored_tasks[0].task_id, "explore-1");
assert_eq!(
restored_tasks[0].status,
a3s_code_core::subagent_task_tracker::SubagentStatus::Completed
);
let cp_after = {
let s: std::sync::Arc<dyn a3s_code_core::store::SessionStore> =
std::sync::Arc::clone(&store);
s.load_loop_checkpoint(seeded_run_id)
.await
.expect("load checkpoint after migration")
.expect("checkpoint preserved")
};
assert_eq!(cp_after.run_id, seeded_run_id);
assert_eq!(cp_after.turn, 2);
assert_eq!(cp_after.messages.len(), 2);
assert_eq!(cp_after.total_usage.total_tokens, 1000);
}
/// A retention cap passed via `SessionOptions` must reach the subagent tracker.
///
/// The cap is `SessionRetentionLimits::with_max_terminal_subagent_tasks(2)`.
/// After three tasks (`t-a`, `t-b`, `t-c`) start and complete in that order,
/// only `t-b` and `t-c` remain; `t-a` is evicted.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn retention_limits_are_plumbed_into_subagent_tracker() {
    use a3s_code_core::retention::SessionRetentionLimits;
    let agent = Agent::from_config(offline_test_config()).await.unwrap();
    let session = agent
        .session(
            "/tmp/it9-retention-ws",
            Some(
                SessionOptions::new()
                    .with_session_id("it9-retention")
                    .with_retention_limits(
                        SessionRetentionLimits::new().with_max_terminal_subagent_tasks(2),
                    ),
            ),
        )
        .expect("session");
    let tracker = session.subagent_tracker();
    let parent = session.id().to_string();

    // Run each task through a full start -> successful end lifecycle.
    for task in ["t-a", "t-b", "t-c"] {
        let child = format!("{task}-child");
        tracker
            .record_event(&AgentEvent::SubagentStart {
                task_id: task.to_string(),
                session_id: child.clone(),
                parent_session_id: parent.clone(),
                agent: "general".to_string(),
                description: "seed".to_string(),
            })
            .await;
        tracker
            .record_event(&AgentEvent::SubagentEnd {
                task_id: task.to_string(),
                session_id: child,
                agent: "general".to_string(),
                output: "ok".to_string(),
                success: true,
            })
            .await;
    }

    let surviving: Vec<String> = session
        .subagent_tasks()
        .await
        .into_iter()
        .map(|t| t.task_id)
        .collect();
    assert_eq!(surviving.len(), 2, "cap must be enforced");
    let kept = |id: &str| surviving.iter().any(|t| t == id);
    assert!(kept("t-b"));
    assert!(kept("t-c"));
    assert!(
        !kept("t-a"),
        "oldest terminal entry must be evicted by SessionRetentionLimits"
    );
}
/// Loop checkpoints must round-trip through a `MemorySessionStore`.
///
/// - `save_loop_checkpoint` followed by `load_loop_checkpoint` returns every
///   checked field unchanged.
/// - A later save for the same run ID replaces the earlier checkpoint.
/// - Loading an unknown run ID yields `Ok(None)`.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn loop_checkpoint_round_trips_through_session_store() {
    use a3s_code_core::llm::{ContentBlock, TokenUsage};
    use a3s_code_core::loop_checkpoint::{LoopCheckpoint, LOOP_CHECKPOINT_SCHEMA_VERSION};
    use a3s_code_core::store::{MemorySessionStore, SessionStore};

    const RUN: &str = "run-pillar3-roundtrip";
    let store: std::sync::Arc<dyn SessionStore> = std::sync::Arc::new(MemorySessionStore::new());

    let assistant_reply = a3s_code_core::llm::Message {
        role: "assistant".to_string(),
        content: vec![ContentBlock::Text {
            text: "ack".to_string(),
        }],
        reasoning_content: None,
    };
    let original = LoopCheckpoint {
        schema_version: LOOP_CHECKPOINT_SCHEMA_VERSION,
        run_id: RUN.to_string(),
        session_id: "session-pillar3".to_string(),
        turn: 4,
        messages: vec![a3s_code_core::llm::Message::user("seed prompt"), assistant_reply],
        total_usage: TokenUsage {
            prompt_tokens: 120,
            completion_tokens: 30,
            total_tokens: 150,
            cache_read_tokens: None,
            cache_write_tokens: None,
        },
        tool_calls_count: 3,
        verification_reports: Vec::new(),
        checkpoint_ms: 1_700_000_000_000,
    };

    // First save and load: every checked field survives.
    store
        .save_loop_checkpoint(RUN, &original)
        .await
        .expect("save");
    let first = store
        .load_loop_checkpoint(RUN)
        .await
        .expect("load")
        .expect("checkpoint present");
    assert_eq!(first.run_id, RUN);
    assert_eq!(first.session_id, "session-pillar3");
    assert_eq!(first.turn, 4);
    assert_eq!(first.tool_calls_count, 3);
    assert_eq!(first.messages.len(), 2);
    assert_eq!(first.total_usage.total_tokens, 150);
    assert_eq!(first.schema_version, LOOP_CHECKPOINT_SCHEMA_VERSION);

    // Second save under the same run ID overwrites the first.
    let advanced = LoopCheckpoint {
        turn: 5,
        tool_calls_count: 4,
        ..first
    };
    store
        .save_loop_checkpoint(RUN, &advanced)
        .await
        .expect("save second");
    let second = store
        .load_loop_checkpoint(RUN)
        .await
        .expect("load again")
        .expect("checkpoint still present");
    assert_eq!(second.turn, 5);
    assert_eq!(second.tool_calls_count, 4);

    // Unknown run IDs are reported as absent, not as an error.
    let missing = store
        .load_loop_checkpoint("does-not-exist")
        .await
        .expect("load missing");
    assert!(missing.is_none());
}
/// Smoke check for a session configured with a session store and auto-save.
///
/// NOTE(review): despite its name, this test never calls `send`, presumably
/// because a real turn would need a reachable model provider. It therefore
/// cannot observe whether a tool-call-free turn emits a loop checkpoint. What
/// it does verify is narrower:
/// - the session can be created with these options;
/// - the store reports no checkpoint for an arbitrary run ID;
/// - the fresh session is not closed.
///
/// Asserting the behaviour the name promises needs a stubbed LLM client.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn send_without_tool_calls_does_not_emit_loop_checkpoint() {
    use a3s_code_core::store::{MemorySessionStore, SessionStore};
    let store: std::sync::Arc<dyn SessionStore> = std::sync::Arc::new(MemorySessionStore::new());
    let agent = Agent::from_config(offline_test_config()).await.unwrap();
    let opts = SessionOptions::new()
        .with_session_id("pillar3-no-tool-call")
        .with_session_store(std::sync::Arc::clone(&store))
        .with_auto_save(true);
    let session = agent
        .session("/tmp/pillar3-no-tools", Some(opts))
        .expect("session");
    let probe = store
        .load_loop_checkpoint("any-fake-run-id")
        .await
        .expect("probe");
    assert!(probe.is_none());
    assert!(!session.is_closed());
}
/// `Session::resume_run` must fail in two distinguishable ways.
///
/// - With no session store configured, the error mentions `session_store`.
/// - With a store but no checkpoint for the run ID, the error names that run
///   ID and says "no loop checkpoint".
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resume_run_error_paths_are_distinguishable() {
    use a3s_code_core::store::MemorySessionStore;

    // Case 1: the session has no store at all.
    {
        let agent = Agent::from_config(offline_test_config()).await.unwrap();
        let storeless = agent
            .session("/tmp/it8-no-store", None)
            .expect("session no store");
        let failure = storeless.resume_run("any-id").await.unwrap_err();
        let text = failure.to_string();
        assert!(
            text.contains("session_store"),
            "expected store-missing error, got: {text}"
        );
    }

    // Case 2: a store exists, but it holds no checkpoint for the run ID.
    {
        let backing: std::sync::Arc<dyn a3s_code_core::store::SessionStore> =
            std::sync::Arc::new(MemorySessionStore::new());
        let agent = Agent::from_config(offline_test_config()).await.unwrap();
        let stored = agent
            .session(
                "/tmp/it8-no-checkpoint",
                Some(
                    SessionOptions::new()
                        .with_session_id("it8-no-checkpoint")
                        .with_session_store(std::sync::Arc::clone(&backing)),
                ),
            )
            .expect("session with store");
        let failure = stored.resume_run("does-not-exist").await.unwrap_err();
        let text = failure.to_string();
        let names_run = text.contains("does-not-exist");
        let says_missing = text.contains("no loop checkpoint");
        assert!(
            names_run && says_missing,
            "expected checkpoint-missing error naming the run id, got: {text}"
        );
    }
}