#![cfg(test)]
use std::sync::Arc;
use crate::agent::agent_loop::{
LoopSpawnConfig, retrying_stream_fn, rig_stream_fn_from_model, spawn_loop_runner,
};
use crate::agent::recovery::RecoveryPolicy;
use crate::event::AgentEvent;
/// Picks the first provider whose API-key environment variable holds a usable value.
///
/// Candidates are probed in a fixed priority order: DeepSeek, Anthropic,
/// OpenAI, then OpenRouter. A variable that is unset, not valid Unicode, or
/// blank (empty or whitespace only) counts as absent, so an environment that
/// exports an empty secret does not select a provider that has no real key.
///
/// Returns the provider name, or `None` when no candidate has a key.
fn detect_provider() -> Option<&'static str> {
    const CANDIDATES: [(&str, &str); 4] = [
        ("DEEPSEEK_API_KEY", "deepseek"),
        ("ANTHROPIC_API_KEY", "anthropic"),
        ("OPENAI_API_KEY", "openai"),
        ("OPENROUTER_API_KEY", "openrouter"),
    ];
    CANDIDATES
        .iter()
        .find(|(var, _)| matches!(std::env::var(var), Ok(key) if !key.trim().is_empty()))
        .map(|&(_, name)| name)
}
/// Maps a provider name to the model the smoke tests run against.
///
/// Any provider name that is not listed falls back to `gpt-4o-mini`.
fn default_model(provider: &str) -> &'static str {
    const FALLBACK: &str = "gpt-4o-mini";
    const MODELS: [(&str, &str); 4] = [
        ("deepseek", "deepseek-chat"),
        ("anthropic", "claude-haiku-4-5-20251001"),
        ("openai", "gpt-4o-mini"),
        ("openrouter", "deepseek/deepseek-chat"),
    ];
    MODELS
        .iter()
        .find(|(known, _)| *known == provider)
        .map_or(FALLBACK, |&(_, model)| model)
}
/// Builds a retry-wrapped stream function for the provider found in the environment.
///
/// The provider comes from `detect_provider` and the model from
/// `default_model`. The client is created without an explicit key and with
/// an empty option map.
///
/// Returns `None` when no provider is detected, or when client construction
/// fails (the error is logged to stderr so the caller can skip the test).
fn build_stream_fn() -> Option<crate::agent::agent_loop::StreamFn> {
    use crate::provider::AnyModel;
    use std::collections::HashMap;

    let provider = detect_provider()?;
    let model_name = default_model(provider);
    let client = match crate::provider::create_client(provider, None, &HashMap::new()) {
        Ok(c) => c,
        Err(e) => {
            eprintln!("[h7-smoke] failed to build {provider} client: {e}");
            return None;
        }
    };
    let any_model = client.completion_model(model_name);
    let chunk_timeout = Some(std::time::Duration::from_secs(60));
    // Each variant wraps a different concrete model type, so the arms cannot
    // be merged with an or-pattern even though their bodies read the same.
    // The match is kept exhaustive so a new `AnyModel` variant fails to compile here.
    let stream_fn = match any_model {
        AnyModel::OpenRouter(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
        AnyModel::OpenAI(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
        AnyModel::ChatGptOpenAI(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
        AnyModel::OpenAICodex(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
        AnyModel::Anthropic(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
        AnyModel::AnthropicOauth(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
        AnyModel::Gemini(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
        AnyModel::DeepSeek(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
        AnyModel::Glm(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
        AnyModel::Ollama(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
        AnyModel::Custom(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
    };
    eprintln!("[h7-smoke] using provider={provider} model={model_name}");
    Some(retrying_stream_fn(stream_fn, RecoveryPolicy::default()))
}
/// Receives events from `runner` until `recv` yields `None`, then awaits the runner's task.
///
/// Returns every received event in arrival order, plus the stringified
/// response of the last `Done` event seen (`None` if there was none).
async fn drain_to_done(
    mut runner: crate::agent::runner::AgentRunner,
) -> (Vec<AgentEvent>, Option<String>) {
    let mut collected = Vec::new();
    let mut last_response = None;
    loop {
        let Some(event) = runner.event_rx.recv().await else {
            break;
        };
        if let AgentEvent::Done { response, .. } = &event {
            last_response = Some(response.to_string());
        }
        collected.push(event);
    }
    // The task's outcome is deliberately discarded; callers judge the run by its events.
    let _ = runner.task.await;
    (collected, last_response)
}
/// Writes a readable trace of `events` to stderr for debugging a test run.
///
/// Token text is printed inline and each reasoning event as a single dot.
/// Most other events get a bracketed marker on a new line. `ToolStarted`
/// and `Usage` print nothing. A trailing newline ends the trace.
fn dump_events(events: &[AgentEvent]) {
    for event in events.iter() {
        match event {
            // Inline output: streamed text and reasoning progress.
            AgentEvent::Token(text) => eprint!("{text}"),
            AgentEvent::Reasoning(_) => eprint!("·"),

            // Events that are intentionally silent in the trace.
            AgentEvent::ToolStarted { .. } | AgentEvent::Usage { .. } => {}

            // Turn lifecycle.
            AgentEvent::TurnStart { index } => eprintln!("\n[turn {index} start]"),
            AgentEvent::TurnEnd { index } => eprintln!("\n[turn {index} end]"),
            AgentEvent::Done { response, .. } => {
                eprintln!("\n[done] response={response:?}");
            }

            // Tool traffic.
            AgentEvent::ToolCall { name, args, .. } => {
                eprintln!("\n[tool_call] {name}({args})");
            }
            AgentEvent::ToolResult { output, .. } => {
                let size = output.len();
                eprintln!("\n[tool_result] {} bytes", size);
            }

            // Failures and recovery.
            AgentEvent::Error(s) => eprintln!("\n[ERROR] {s}"),
            AgentEvent::ContextOverflow { error, .. } => {
                eprintln!("\n[context_overflow] {error}");
            }
            AgentEvent::RetryNotice {
                attempt,
                delay_ms,
                error,
            } => {
                eprintln!("\n[retry_notice] attempt={attempt} delay_ms={delay_ms}: {error}");
            }
            AgentEvent::EscalationActivated { provider, reason } => {
                eprintln!("\n[escalation] provider={provider} reason={reason:?}");
            }
            AgentEvent::RepairStats { snapshot } => {
                eprintln!(
                    "\n[repair_stats] total={} invalid={}",
                    snapshot.total_successful(),
                    snapshot.invalid,
                );
            }

            // Messages and notices.
            AgentEvent::Interjected { .. } => eprintln!("\n[interjected]"),
            AgentEvent::CustomMessage { payload } => {
                eprintln!("\n[custom_message] {payload}");
            }
            AgentEvent::UserMessage { content } => {
                eprintln!("\n[user_message] {content}");
            }
            AgentEvent::SystemNotice { content } => {
                eprintln!("\n[system_notice] {content}");
            }

            // Context management.
            AgentEvent::CompactionStarted { .. } => eprintln!("\n[compaction_started]"),
            AgentEvent::ContextCompacted { .. } => eprintln!("\n[context_compacted]"),
            AgentEvent::CheckpointRefresh { .. } => eprintln!("\n[checkpoint_refresh]"),
        }
    }
    eprintln!();
}
#[tokio::test]
async fn h7_scenario_1_simple_text() {
let stream_fn = match build_stream_fn() {
Some(f) => f,
None => {
eprintln!("[skipped] no provider API key in env");
return;
}
};
let cfg = LoopSpawnConfig {
stream_fn,
system_prompt: "You are a helpful assistant. Reply concisely.".to_string(),
history: Vec::new(),
initial_prompt: "What is 2+2? Reply with just the number, nothing else.".to_string(),
tools: Vec::new(),
#[cfg(feature = "plugin")]
plugin_mgr: None,
steering_queue: None,
tool_execution: crate::agent::agent_loop::types::ToolExecutionMode::Parallel,
event_channel_capacity: 256,
provider_name: None,
model_name: None,
summarize_fn: None,
tool_def_filter: None,
dynamic_tool_search: false,
escalation_stream_fn: None,
escalation_provider_name: None,
escalation_max_per_session: None,
file_touch_tracker: None,
verifier: None,
critic_fn: None,
goal_fn: None,
goal: None,
max_turns: None,
bg_store: None,
memory_provider: None,
};
let runner = spawn_loop_runner(cfg).into_agent_runner();
let (events, response) = drain_to_done(runner).await;
dump_events(&events);
let done = response.unwrap_or_default();
assert!(
done.contains('4'),
"expected response to contain '4', got: {done:?}"
);
let token_count = events
.iter()
.filter(|e| matches!(e, AgentEvent::Token(_)))
.count();
assert!(
token_count >= 1,
"expected at least 1 Token event from real stream; got 0 (provider may be returning all-at-once Done — check stream wrapping)"
);
for e in &events {
match e {
AgentEvent::Error(msg) => panic!("unexpected Error: {msg}"),
AgentEvent::ContextOverflow { error, .. } => {
panic!("unexpected ContextOverflow: {error}")
}
_ => {}
}
}
}
/// Scenario 2: a one-shot prompt should produce exactly one turn.
///
/// Checks for exactly one `TurnStart`, one `TurnEnd` and one `Done` event,
/// and that the lower-cased response contains `banana`. Skips (returns
/// early) when `build_stream_fn` yields `None`.
#[tokio::test]
async fn h7_scenario_2_turn_boundaries() {
    let Some(stream_fn) = build_stream_fn() else {
        eprintln!("[skipped] no provider API key in env");
        return;
    };

    let cfg = LoopSpawnConfig {
        stream_fn,
        system_prompt: String::from("Reply briefly."),
        history: vec![],
        initial_prompt: String::from("Say the word 'banana' and nothing else."),
        tools: vec![],
        #[cfg(feature = "plugin")]
        plugin_mgr: None,
        steering_queue: None,
        tool_execution: crate::agent::agent_loop::types::ToolExecutionMode::Parallel,
        event_channel_capacity: 256,
        provider_name: None,
        model_name: None,
        summarize_fn: None,
        tool_def_filter: None,
        dynamic_tool_search: false,
        escalation_stream_fn: None,
        escalation_provider_name: None,
        escalation_max_per_session: None,
        file_touch_tracker: None,
        verifier: None,
        critic_fn: None,
        goal_fn: None,
        goal: None,
        max_turns: None,
        bg_store: None,
        memory_provider: None,
    };

    let (events, response) = drain_to_done(spawn_loop_runner(cfg).into_agent_runner()).await;
    dump_events(&events);

    // Tally the three boundary events in a single pass.
    let (mut turn_starts, mut turn_ends, mut dones) = (0usize, 0usize, 0usize);
    for event in &events {
        match event {
            AgentEvent::TurnStart { .. } => turn_starts += 1,
            AgentEvent::TurnEnd { .. } => turn_ends += 1,
            AgentEvent::Done { .. } => dones += 1,
            _ => {}
        }
    }
    assert_eq!(turn_starts, 1, "expected 1 TurnStart, got {turn_starts}");
    assert_eq!(turn_ends, 1, "expected 1 TurnEnd, got {turn_ends}");
    assert_eq!(dones, 1, "expected 1 Done, got {dones}");

    let reply = response.unwrap_or_default().to_lowercase();
    assert!(reply.contains("banana"), "expected 'banana' in response",);
}
#[tokio::test]
async fn h7_scenario_5_auth_error_surfaces() {
let provider = match detect_provider() {
Some(p) => p,
None => {
eprintln!("[skipped] no API key");
return;
}
};
let model_name = default_model(provider);
let client = match crate::provider::create_client(
provider,
Some("invalid-key-for-h7-test"),
&std::collections::HashMap::new(),
) {
Ok(c) => c,
Err(e) => {
eprintln!("[skipped] create_client failed: {e}");
return;
}
};
let any_model = client.completion_model(model_name);
let chunk_timeout = Some(std::time::Duration::from_secs(60));
let inner = match any_model {
crate::provider::AnyModel::DeepSeek(m) => {
rig_stream_fn_from_model(m, vec![], chunk_timeout)
}
crate::provider::AnyModel::OpenAI(m) => rig_stream_fn_from_model(m, vec![], chunk_timeout),
crate::provider::AnyModel::Anthropic(m) => {
rig_stream_fn_from_model(m, vec![], chunk_timeout)
}
crate::provider::AnyModel::OpenRouter(m) => {
rig_stream_fn_from_model(m, vec![], chunk_timeout)
}
_ => {
eprintln!("[skipped] unsupported provider variant for this scenario");
return;
}
};
let stream_fn = retrying_stream_fn(inner, RecoveryPolicy::default());
let cfg = LoopSpawnConfig {
stream_fn,
system_prompt: String::new(),
history: Vec::new(),
initial_prompt: "hi".to_string(),
tools: Vec::new(),
#[cfg(feature = "plugin")]
plugin_mgr: None,
steering_queue: None,
tool_execution: crate::agent::agent_loop::types::ToolExecutionMode::Parallel,
event_channel_capacity: 256,
provider_name: None,
model_name: None,
summarize_fn: None,
tool_def_filter: None,
dynamic_tool_search: false,
escalation_stream_fn: None,
escalation_provider_name: None,
escalation_max_per_session: None,
file_touch_tracker: None,
verifier: None,
critic_fn: None,
goal_fn: None,
goal: None,
max_turns: None,
bg_store: None,
memory_provider: None,
};
let runner = spawn_loop_runner(cfg).into_agent_runner();
let (events, _) = drain_to_done(runner).await;
dump_events(&events);
let had_error = events
.iter()
.any(|e| matches!(e, AgentEvent::Error(_) | AgentEvent::ContextOverflow { .. }));
assert!(
had_error,
"expected Error or ContextOverflow event for invalid key"
);
}
/// Scenario 3: the model is asked to echo a word and must do so through a tool.
///
/// Registers a local `echo_tool` and checks that at least one `ToolCall`
/// and one `ToolResult` event were emitted, that the two counts match, and
/// that the lower-cased final response contains `pineapple`.
///
/// Skips (returns early) when no provider is detected or the provider is not
/// one of deepseek / openai / openrouter. Panics if the client cannot be
/// created once a supported provider has been detected.
#[tokio::test]
async fn h7_scenario_3_tool_dispatch() {
    use crate::agent::agent_loop::result::LoopToolResult as ResultT;
    use crate::agent::agent_loop::tool::{AbortSignal, LoopToolUpdate};
    use crate::agent::agent_loop::{LoopTool, loop_tool_to_rig_definition};
    use serde_json::Value;
    use std::pin::Pin;

    let provider = match detect_provider() {
        Some(p) => p,
        None => {
            eprintln!("[skipped] no API key");
            return;
        }
    };
    if provider != "deepseek" && provider != "openai" && provider != "openrouter" {
        eprintln!("[skipped] tool-use test prefers deepseek/openai/openrouter; got {provider}");
        return;
    }

    /// Minimal tool that returns its `text` argument prefixed with `ECHO: `.
    #[derive(Debug)]
    struct EchoTool;
    impl LoopTool for EchoTool {
        fn name(&self) -> &str {
            "echo_tool"
        }
        fn description(&self) -> &str {
            "Echo the given text back. Use this when asked to echo something."
        }
        fn label(&self) -> &str {
            "Echo"
        }
        fn parameters(&self) -> &Value {
            // Built once and cached so a reference can be handed out.
            static P: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
            P.get_or_init(|| {
                serde_json::json!({
                    "type": "object",
                    "properties": {
                        "text": {"type": "string", "description": "Text to echo"}
                    },
                    "required": ["text"]
                })
            })
        }
        fn execute<'a>(
            &'a self,
            _id: &'a str,
            args: Value,
            _signal: AbortSignal,
            _on_update: LoopToolUpdate,
        ) -> Pin<Box<dyn Future<Output = Result<ResultT, String>> + Send + 'a>> {
            Box::pin(async move {
                // A missing or non-string `text` falls back to a placeholder.
                let text = args
                    .get("text")
                    .and_then(|v| v.as_str())
                    .unwrap_or("(no text)")
                    .to_string();
                Ok(ResultT {
                    content: vec![serde_json::json!({
                        "type": "text",
                        "text": format!("ECHO: {text}"),
                    })],
                    details: Value::Null,
                    terminate: None,
                })
            })
        }
    }

    let tool = Arc::new(EchoTool) as Arc<dyn LoopTool>;
    let tool_def = loop_tool_to_rig_definition(tool.as_ref());
    let model_name = default_model(provider);
    let client = crate::provider::create_client(provider, None, &std::collections::HashMap::new())
        .expect("client");
    let any_model = client.completion_model(model_name);
    let chunk_timeout = Some(std::time::Duration::from_secs(60));
    // Only one arm runs, so `tool_def` is moved into it rather than cloned.
    let inner_stream_fn = match any_model {
        crate::provider::AnyModel::DeepSeek(m) => {
            rig_stream_fn_from_model(m, vec![tool_def], chunk_timeout)
        }
        crate::provider::AnyModel::OpenAI(m) => {
            rig_stream_fn_from_model(m, vec![tool_def], chunk_timeout)
        }
        crate::provider::AnyModel::OpenRouter(m) => {
            rig_stream_fn_from_model(m, vec![tool_def], chunk_timeout)
        }
        _ => {
            eprintln!("[skipped] this scenario hardcoded to deepseek/openai/openrouter");
            return;
        }
    };
    let stream_fn = retrying_stream_fn(inner_stream_fn, RecoveryPolicy::default());
    eprintln!("[h7-smoke] tool-dispatch test using {provider}/{model_name}");

    let cfg = LoopSpawnConfig {
        stream_fn,
        system_prompt: "You have access to an echo_tool that echoes text back. \
                        When the user asks you to echo something, USE THE TOOL — \
                        do not just reply with the text directly. After calling \
                        the tool, briefly confirm what was echoed."
            .to_string(),
        history: Vec::new(),
        initial_prompt: "Echo the word 'pineapple'.".to_string(),
        tools: vec![tool],
        #[cfg(feature = "plugin")]
        plugin_mgr: None,
        steering_queue: None,
        tool_execution: crate::agent::agent_loop::types::ToolExecutionMode::Sequential,
        event_channel_capacity: 256,
        provider_name: None,
        model_name: None,
        summarize_fn: None,
        tool_def_filter: None,
        dynamic_tool_search: false,
        escalation_stream_fn: None,
        escalation_provider_name: None,
        escalation_max_per_session: None,
        file_touch_tracker: None,
        verifier: None,
        critic_fn: None,
        goal_fn: None,
        goal: None,
        max_turns: None,
        bg_store: None,
        memory_provider: None,
    };
    let runner = spawn_loop_runner(cfg).into_agent_runner();
    let (events, response) = drain_to_done(runner).await;
    dump_events(&events);

    let tool_calls = events
        .iter()
        .filter(|e| matches!(e, AgentEvent::ToolCall { .. }))
        .count();
    let tool_results = events
        .iter()
        .filter(|e| matches!(e, AgentEvent::ToolResult { .. }))
        .count();
    assert!(
        tool_calls >= 1,
        "expected the LLM to call echo_tool, got 0 ToolCall events"
    );
    assert!(
        tool_results >= 1,
        "expected at least 1 ToolResult event, got 0"
    );
    assert_eq!(
        tool_calls, tool_results,
        "expected ToolCall and ToolResult counts to match"
    );
    let final_resp = response.unwrap_or_default();
    assert!(
        final_resp.to_lowercase().contains("pineapple"),
        "expected final response to reference 'pineapple'; got: {final_resp:?}"
    );
}
/// Builds a retry-wrapped stream function for the GLM provider (`glm-5.1`).
///
/// The key is read from `ZHIPU_API_KEY`, falling back to `GLM_API_KEY`.
/// Returns `None`, after logging the reason to stderr, when neither variable
/// can be read, when the client cannot be created, or when the resulting
/// model is not the `Glm` variant.
fn build_glm_stream_fn() -> Option<crate::agent::agent_loop::StreamFn> {
    use crate::provider::AnyModel;
    use std::collections::HashMap;

    let Ok(key) = std::env::var("ZHIPU_API_KEY").or_else(|_| std::env::var("GLM_API_KEY")) else {
        eprintln!("[skipped] need ZHIPU_API_KEY or GLM_API_KEY");
        return None;
    };

    let model_name = "glm-5.1";
    let client = crate::provider::create_client("glm", Some(key.as_str()), &HashMap::new())
        .map_err(|e| eprintln!("[h7-smoke] failed to build glm client: {e}"))
        .ok()?;

    let AnyModel::Glm(model) = client.completion_model(model_name) else {
        eprintln!("[h7-smoke] expected AnyModel::Glm");
        return None;
    };
    let per_chunk_limit = Some(std::time::Duration::from_secs(60));
    let raw_stream = rig_stream_fn_from_model(model, vec![], per_chunk_limit);

    eprintln!("[h7-smoke] using provider=glm model={model_name}");
    Some(retrying_stream_fn(raw_stream, RecoveryPolicy::default()))
}
#[tokio::test]
async fn h7_glm_scenario_1_simple_text() {
let stream_fn = match build_glm_stream_fn() {
Some(f) => f,
None => return,
};
let cfg = LoopSpawnConfig {
stream_fn,
system_prompt: "You are a helpful assistant. Reply concisely.".to_string(),
history: Vec::new(),
initial_prompt: "What is 2+2? Reply with just the number, nothing else.".to_string(),
tools: Vec::new(),
#[cfg(feature = "plugin")]
plugin_mgr: None,
steering_queue: None,
tool_execution: crate::agent::agent_loop::types::ToolExecutionMode::Parallel,
event_channel_capacity: 256,
provider_name: None,
model_name: None,
summarize_fn: None,
tool_def_filter: None,
dynamic_tool_search: false,
escalation_stream_fn: None,
escalation_provider_name: None,
escalation_max_per_session: None,
file_touch_tracker: None,
verifier: None,
critic_fn: None,
goal_fn: None,
goal: None,
max_turns: None,
bg_store: None,
memory_provider: None,
};
let runner = spawn_loop_runner(cfg).into_agent_runner();
let (events, response) = drain_to_done(runner).await;
dump_events(&events);
let done = response.unwrap_or_default();
assert!(
done.contains('4'),
"expected response to contain '4', got: {done:?}"
);
for e in &events {
if let AgentEvent::Error(msg) = e {
panic!("unexpected Error: {msg}");
}
}
}
/// GLM scenario 3: the model is asked to echo a word and must do so through a tool.
///
/// Registers a local `echo_tool` against the `glm-5.1` model and checks that
/// at least one `ToolCall` was emitted, that `ToolCall` and `ToolResult`
/// counts match, and that the lower-cased final response contains
/// `pineapple`.
///
/// Skips (returns early) when neither `ZHIPU_API_KEY` nor `GLM_API_KEY` can
/// be read. Panics if the client cannot be created or the model is not the
/// `Glm` variant.
#[tokio::test]
async fn h7_glm_scenario_3_tool_dispatch() {
    use crate::agent::agent_loop::result::LoopToolResult as ResultT;
    use crate::agent::agent_loop::tool::{AbortSignal, LoopToolUpdate};
    use crate::agent::agent_loop::{LoopTool, loop_tool_to_rig_definition};
    use serde_json::Value;
    use std::pin::Pin;

    let Ok(key) = std::env::var("ZHIPU_API_KEY").or_else(|_| std::env::var("GLM_API_KEY")) else {
        eprintln!("[skipped] need ZHIPU_API_KEY or GLM_API_KEY");
        return;
    };

    /// Minimal tool that returns its `text` argument prefixed with `ECHO: `.
    #[derive(Debug)]
    struct EchoTool;
    impl LoopTool for EchoTool {
        fn name(&self) -> &str {
            "echo_tool"
        }
        fn description(&self) -> &str {
            "Echo the given text back. Use this when asked to echo something."
        }
        fn label(&self) -> &str {
            "Echo"
        }
        fn parameters(&self) -> &Value {
            // Built once and cached so a reference can be handed out.
            static SCHEMA: std::sync::OnceLock<Value> = std::sync::OnceLock::new();
            SCHEMA.get_or_init(|| {
                serde_json::json!({
                    "type": "object",
                    "properties": {
                        "text": {"type": "string", "description": "Text to echo"}
                    },
                    "required": ["text"]
                })
            })
        }
        fn execute<'a>(
            &'a self,
            _id: &'a str,
            args: Value,
            _signal: AbortSignal,
            _on_update: LoopToolUpdate,
        ) -> Pin<Box<dyn Future<Output = Result<ResultT, String>> + Send + 'a>> {
            Box::pin(async move {
                // A missing or non-string `text` falls back to a placeholder.
                let text = args
                    .get("text")
                    .and_then(Value::as_str)
                    .unwrap_or("(none)")
                    .to_owned();
                let payload = serde_json::json!({
                    "type": "text",
                    "text": format!("ECHO: {text}"),
                });
                Ok(ResultT {
                    content: vec![payload],
                    details: Value::Null,
                    terminate: None,
                })
            })
        }
    }

    let tool = Arc::new(EchoTool) as Arc<dyn LoopTool>;
    let tool_def = loop_tool_to_rig_definition(tool.as_ref());

    let no_options = std::collections::HashMap::new();
    let client =
        crate::provider::create_client("glm", Some(key.as_str()), &no_options).expect("client");

    let crate::provider::AnyModel::Glm(model) = client.completion_model("glm-5.1") else {
        panic!("expected Glm variant");
    };
    let per_chunk_limit = Some(std::time::Duration::from_secs(60));
    let inner = rig_stream_fn_from_model(model, vec![tool_def], per_chunk_limit);
    let stream_fn = retrying_stream_fn(inner, RecoveryPolicy::default());
    eprintln!("[h7-smoke] glm tool-dispatch test");

    let cfg = LoopSpawnConfig {
        stream_fn,
        system_prompt: String::from(
            "You have access to an echo_tool that echoes text back. \
             When the user asks you to echo something, USE THE TOOL — \
             do not just reply with the text directly. After calling \
             the tool, briefly confirm what was echoed.",
        ),
        history: vec![],
        initial_prompt: String::from("Echo the word 'pineapple'."),
        tools: vec![tool],
        #[cfg(feature = "plugin")]
        plugin_mgr: None,
        steering_queue: None,
        tool_execution: crate::agent::agent_loop::types::ToolExecutionMode::Sequential,
        event_channel_capacity: 256,
        provider_name: None,
        model_name: None,
        summarize_fn: None,
        tool_def_filter: None,
        dynamic_tool_search: false,
        escalation_stream_fn: None,
        escalation_provider_name: None,
        escalation_max_per_session: None,
        file_touch_tracker: None,
        verifier: None,
        critic_fn: None,
        goal_fn: None,
        goal: None,
        max_turns: None,
        bg_store: None,
        memory_provider: None,
    };

    let (events, response) = drain_to_done(spawn_loop_runner(cfg).into_agent_runner()).await;
    dump_events(&events);

    // Tally tool calls and results in a single pass.
    let (mut tool_calls, mut tool_results) = (0usize, 0usize);
    for event in &events {
        match event {
            AgentEvent::ToolCall { .. } => tool_calls += 1,
            AgentEvent::ToolResult { .. } => tool_results += 1,
            _ => {}
        }
    }
    assert!(tool_calls >= 1, "expected echo_tool call from GLM, got 0");
    assert_eq!(tool_calls, tool_results, "call/result count mismatch");

    let final_resp = response.unwrap_or_default();
    assert!(
        final_resp.to_lowercase().contains("pineapple"),
        "expected 'pineapple' in final response; got: {final_resp:?}"
    );
}
/// No-op function whose only content is a signature that names `Arc`.
///
/// NOTE(review): presumably kept so the file-level `std::sync::Arc` import
/// always has a reference; the tool-dispatch tests in this file also use
/// `Arc` directly, so this may be redundant — confirm before removing.
// The `allow` silences lints: nothing calls this function, so `dead_code`
// would otherwise fire on it.
#[allow(unused_imports, dead_code)]
fn _ensure_arc_used(_: Arc<()>) {}