#![allow(
clippy::doc_markdown,
clippy::manual_assert,
clippy::manual_string_new,
clippy::type_complexity
)]
#[cfg(test)]
pub mod agent_tests {
use super::super::message_queue::{MAX_AUDIO_BYTES, MAX_IMAGE_BYTES, detect_image_mime};
#[allow(unused_imports)]
pub(crate) use super::super::{
Agent, CODE_CONTEXT_PREFIX, CROSS_SESSION_PREFIX, RECALL_PREFIX, SUMMARY_PREFIX,
TOOL_OUTPUT_SUFFIX, format_tool_output, recv_optional, shutdown_signal,
};
pub(crate) use crate::channel::Channel;
use crate::channel::{Attachment, AttachmentKind, ChannelMessage};
pub(crate) use crate::config::{SecurityConfig, TimeoutConfig};
pub(crate) use crate::metrics::MetricsSnapshot;
#[allow(unused_imports)]
pub(crate) use sqlx::prelude::*;
use std::sync::{Arc, Mutex};
pub(crate) use tokio::sync::{Notify, mpsc, watch};
pub(crate) use zeph_llm::any::AnyProvider;
use zeph_llm::mock::MockProvider;
pub(crate) use zeph_llm::provider::{Message, MessageMetadata, Role};
pub(crate) use zeph_memory::semantic::SemanticMemory;
pub(crate) use zeph_skills::registry::SkillRegistry;
pub(crate) use zeph_skills::watcher::SkillEvent;
pub(crate) use zeph_tools::executor::ToolExecutor;
use zeph_tools::executor::{ToolError, ToolOutput};
/// Test convenience wrapper owning an [`Agent`] wired to mock collaborators.
pub(crate) struct QuickTestAgent {
    /// The agent under test, talking through a [`MockChannel`].
    pub(crate) agent: Agent<MockChannel>,
}

impl QuickTestAgent {
    /// Builds an agent whose mock provider is seeded with a single `response`.
    pub(crate) fn minimal(response: &str) -> Self {
        Self::with_responses(vec![response.to_owned()])
    }

    /// Builds an agent whose mock provider is seeded with `responses`.
    ///
    /// The channel starts with no inbound messages, the registry is the one
    /// produced by `create_test_registry`, and the executor is
    /// `MockToolExecutor::no_tools()`.
    pub(crate) fn with_responses(responses: Vec<String>) -> Self {
        let agent = Agent::new(
            mock_provider(responses),
            MockChannel::new(vec![]),
            create_test_registry(),
            None,
            5,
            MockToolExecutor::no_tools(),
        );
        Self { agent }
    }

    /// Returns a copy of everything the agent sent through its channel so far.
    pub(crate) fn sent_messages(&self) -> Vec<String> {
        let channel = &self.agent.channel;
        channel.sent_messages()
    }
}
/// Wraps a `MockProvider` built from `responses` in `AnyProvider::Mock`.
pub(crate) fn mock_provider(responses: Vec<String>) -> AnyProvider {
    let inner = MockProvider::with_responses(responses);
    AnyProvider::Mock(inner)
}
/// Same as a plain mock provider, but with `with_streaming()` applied to the
/// underlying `MockProvider` before wrapping it.
pub(crate) fn mock_provider_streaming(responses: Vec<String>) -> AnyProvider {
    let inner = MockProvider::with_responses(responses).with_streaming();
    AnyProvider::Mock(inner)
}
/// Wraps `MockProvider::failing()` in `AnyProvider::Mock`.
pub(crate) fn mock_provider_failing() -> AnyProvider {
    let inner = MockProvider::failing();
    AnyProvider::Mock(inner)
}
/// Builds a mock provider seeded with `responses` and configured with the
/// given remote `models` via `with_models`.
pub(crate) fn mock_provider_with_models(
    responses: Vec<String>,
    models: Vec<zeph_llm::model_cache::RemoteModelInfo>,
) -> AnyProvider {
    let inner = MockProvider::with_responses(responses).with_models(models);
    AnyProvider::Mock(inner)
}
/// In-memory channel double used by the agent tests.
///
/// All buffers live behind `Arc<Mutex<_>>` so a test can keep a handle to a
/// buffer after the channel itself has been moved into an agent.
pub(crate) struct MockChannel {
    /// Inbound texts still waiting to be handed out.
    messages: Arc<Mutex<Vec<String>>>,
    /// Texts recorded by full sends.
    sent: Arc<Mutex<Vec<String>>>,
    /// Texts recorded by chunk sends.
    chunks: Arc<Mutex<Vec<String>>>,
    /// Scripted answers for confirmation prompts.
    confirmations: Arc<Mutex<Vec<bool>>>,
    /// Texts recorded by status sends.
    pub(crate) statuses: Arc<Mutex<Vec<String>>>,
    /// Value reported by `supports_exit`.
    exit_supported: bool,
}

impl MockChannel {
    /// Creates a channel pre-loaded with `messages`; every recording buffer
    /// starts empty and exit support is enabled.
    pub(crate) fn new(messages: Vec<String>) -> Self {
        Self {
            messages: Arc::new(Mutex::new(messages)),
            sent: Arc::default(),
            chunks: Arc::default(),
            confirmations: Arc::default(),
            statuses: Arc::default(),
            exit_supported: true,
        }
    }

    /// Builder: returns the channel with exit support switched off.
    pub(crate) fn without_exit_support(self) -> Self {
        Self {
            exit_supported: false,
            ..self
        }
    }

    /// Builder: replaces the scripted confirmation answers.
    pub(crate) fn with_confirmations(self, confirmations: Vec<bool>) -> Self {
        Self {
            confirmations: Arc::new(Mutex::new(confirmations)),
            ..self
        }
    }

    /// Snapshot of the texts recorded by full sends.
    pub(crate) fn sent_messages(&self) -> Vec<String> {
        let recorded = self.sent.lock().unwrap();
        recorded.to_vec()
    }

    /// Snapshot of the texts recorded by chunk sends.
    pub(crate) fn sent_chunks(&self) -> Vec<String> {
        let recorded = self.chunks.lock().unwrap();
        recorded.to_vec()
    }
}
impl Channel for MockChannel {
    /// Hands out the oldest queued message, or `Ok(None)` once the queue is empty.
    async fn recv(&mut self) -> Result<Option<ChannelMessage>, crate::channel::ChannelError> {
        Ok(self.try_recv())
    }

    /// Non-blocking variant of `recv`: pops the oldest queued message if any.
    fn try_recv(&mut self) -> Option<ChannelMessage> {
        let mut pending = self.messages.lock().unwrap();
        if pending.is_empty() {
            return None;
        }
        let text = pending.remove(0);
        Some(ChannelMessage {
            text,
            attachments: Vec::new(),
        })
    }

    /// Records `text` in the `sent` buffer.
    async fn send(&mut self, text: &str) -> Result<(), crate::channel::ChannelError> {
        let mut recorded = self.sent.lock().unwrap();
        recorded.push(text.to_owned());
        Ok(())
    }

    /// Records `chunk` in the `chunks` buffer.
    async fn send_chunk(&mut self, chunk: &str) -> Result<(), crate::channel::ChannelError> {
        let mut recorded = self.chunks.lock().unwrap();
        recorded.push(chunk.to_owned());
        Ok(())
    }

    /// Nothing is buffered by the mock, so flushing always succeeds.
    async fn flush_chunks(&mut self) -> Result<(), crate::channel::ChannelError> {
        Ok(())
    }

    /// Records `text` in the `statuses` buffer.
    async fn send_status(&mut self, text: &str) -> Result<(), crate::channel::ChannelError> {
        let mut recorded = self.statuses.lock().unwrap();
        recorded.push(text.to_owned());
        Ok(())
    }

    /// Answers with the next scripted confirmation; defaults to `true` when
    /// no scripted answers remain.
    async fn confirm(&mut self, _prompt: &str) -> Result<bool, crate::channel::ChannelError> {
        let mut scripted = self.confirmations.lock().unwrap();
        let answer = match scripted.is_empty() {
            true => true,
            false => scripted.remove(0),
        };
        Ok(answer)
    }

    /// Reports the configured exit-support flag.
    fn supports_exit(&self) -> bool {
        self.exit_supported
    }
}
/// Tool executor double that replays a scripted list of results.
pub(crate) struct MockToolExecutor {
    /// Results still waiting to be returned, oldest first.
    outputs: Arc<Mutex<Vec<Result<Option<ToolOutput>, ToolError>>>>,
    /// Every value passed to `set_skill_env`, in call order.
    pub(crate) captured_env: Arc<Mutex<Vec<Option<std::collections::HashMap<String, String>>>>>,
}

impl MockToolExecutor {
    /// Creates an executor that will replay `outputs`; the captured
    /// environment list starts empty.
    pub(crate) fn new(outputs: Vec<Result<Option<ToolOutput>, ToolError>>) -> Self {
        let outputs = Arc::new(Mutex::new(outputs));
        Self {
            outputs,
            captured_env: Arc::default(),
        }
    }

    /// Creates an executor scripted with a single `Ok(None)` result.
    pub(crate) fn no_tools() -> Self {
        let script = vec![Ok(None)];
        Self::new(script)
    }
}
impl ToolExecutor for MockToolExecutor {
    /// Returns the next scripted result, or `Ok(None)` once the script is exhausted.
    async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
        let mut script = self.outputs.lock().unwrap();
        if script.is_empty() {
            return Ok(None);
        }
        script.remove(0)
    }

    /// Records the environment it was given so tests can inspect it later.
    fn set_skill_env(&self, env: Option<std::collections::HashMap<String, String>>) {
        let mut captured = self.captured_env.lock().unwrap();
        captured.push(env);
    }
}
/// Loads a `SkillRegistry` from a temporary directory containing a single
/// skill named `test-skill`.
///
/// NOTE(review): the temporary directory is dropped when this function
/// returns, so this relies on `SkillRegistry::load` having read everything
/// the tests need by then — confirm against the registry implementation.
pub(crate) fn create_test_registry() -> SkillRegistry {
    const SKILL_MANIFEST: &str =
        "---\nname: test-skill\ndescription: A test skill\n---\nTest skill body";

    let root = tempfile::tempdir().unwrap();
    let skill_dir = root.path().join("test-skill");
    std::fs::create_dir(&skill_dir).unwrap();

    let manifest_path = skill_dir.join("SKILL.md");
    std::fs::write(manifest_path, SKILL_MANIFEST).unwrap();

    let search_paths = [root.path().to_path_buf()];
    SkillRegistry::load(&search_paths)
}
/// A freshly constructed agent holds exactly one message: a non-empty system prompt.
#[tokio::test]
async fn agent_new_initializes_with_system_prompt() {
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let history = &agent.msg.messages;
    assert_eq!(history.len(), 1);
    let system = &history[0];
    assert_eq!(system.role, Role::System);
    assert!(!system.content.is_empty());
}
/// `with_model_name` and `with_working_dir` are both reflected in the session's
/// environment context.
#[tokio::test]
async fn agent_with_working_dir_updates_environment_context() {
    let tmp = tempfile::tempdir().unwrap();
    let expected_dir = tmp.path().display().to_string();

    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_model_name("test-model")
    .with_working_dir(tmp.path().to_path_buf());

    let env = &agent.session.env_context;
    assert_eq!(env.working_dir, expected_dir);
    assert_eq!(env.model_name, "test-model");
}
/// `with_embedding_model` stores the model name in the skill state.
#[tokio::test]
async fn agent_with_embedding_model_sets_model() {
    let model = "test-embed-model".to_string();
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_embedding_model(model);

    assert_eq!(agent.skill_state.embedding_model, "test-embed-model");
}
/// Smoke test: attaching a shutdown receiver to a new agent must not panic.
/// No state is asserted afterwards.
#[tokio::test]
async fn agent_with_shutdown_sets_receiver() {
    let (_shutdown_tx, shutdown_rx) = watch::channel(false);
    let _agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_shutdown(shutdown_rx);
}
#[tokio::test]
async fn agent_with_security_sets_config() {
let provider = mock_provider(vec![]);
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = MockToolExecutor::no_tools();
let security = SecurityConfig {
redact_secrets: true,
..Default::default()
};
let timeouts = TimeoutConfig {
llm_seconds: 60,
..Default::default()
};
let agent = Agent::new(provider, channel, registry, None, 5, executor)
.with_security(security, timeouts);
assert!(agent.runtime.security.redact_secrets);
assert_eq!(agent.runtime.timeouts.llm_seconds, 60);
}
/// Running the agent against a channel with no inbound messages returns `Ok`.
#[tokio::test]
async fn agent_run_handles_empty_channel() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let outcome = agent.run().await;
    assert!(outcome.is_ok());
}
/// One user message and one provider response leave the history as
/// system, user ("hello"), assistant.
#[tokio::test]
async fn agent_run_processes_user_message() {
    let mut agent = Agent::new(
        mock_provider(vec![String::from("test response")]),
        MockChannel::new(vec![String::from("hello")]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let outcome = agent.run().await;
    assert!(outcome.is_ok());

    let history = &agent.msg.messages;
    assert_eq!(history.len(), 3);
    let (user, assistant) = (&history[1], &history[2]);
    assert_eq!(user.role, Role::User);
    assert_eq!(user.content, "hello");
    assert_eq!(assistant.role, Role::Assistant);
}
/// With the shutdown flag already set to `true`, `run` still returns `Ok`.
#[tokio::test]
async fn agent_run_handles_shutdown_signal() {
    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![String::from("should not process")]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_shutdown(shutdown_rx);

    shutdown_tx.send(true).unwrap();

    let outcome = agent.run().await;
    assert!(outcome.is_ok());
}
/// Feeding `/skills` through the channel makes the agent reply with a message
/// containing "Available skills" as its first sent message.
#[tokio::test]
async fn agent_handles_skills_command() {
    let provider = mock_provider(vec![]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();
    let channel = MockChannel::new(vec!["/skills".to_string()]);
    // Keep a handle to the sent buffer; the channel itself moves into the agent.
    let sent = channel.sent.clone();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let result = agent.run().await;
    assert!(result.is_ok());

    let sent_msgs = sent.lock().unwrap();
    assert!(!sent_msgs.is_empty());
    assert!(sent_msgs[0].contains("Available skills"));
}
/// An empty provider response results in a sent message mentioning "empty response".
#[tokio::test]
async fn agent_process_response_handles_empty_response() {
    let provider = mock_provider(vec![String::new()]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();
    let channel = MockChannel::new(vec!["test".to_string()]);
    // Keep a handle to the sent buffer; the channel itself moves into the agent.
    let sent = channel.sent.clone();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let result = agent.run().await;
    assert!(result.is_ok());

    let sent_msgs = sent.lock().unwrap();
    assert!(sent_msgs.iter().any(|m| m.contains("empty response")));
}
/// A successful tool result has its summary forwarded to the channel.
#[tokio::test]
async fn agent_handles_tool_execution_success() {
    let provider = mock_provider(vec!["response with tool".to_string()]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::new(vec![Ok(Some(ToolOutput {
        tool_name: "bash".to_string(),
        summary: "tool executed successfully".to_string(),
        blocks_executed: 1,
        filter_stats: None,
        diff: None,
        streamed: false,
        terminal_id: None,
        locations: None,
        raw_response: None,
    }))]);
    let channel = MockChannel::new(vec!["execute tool".to_string()]);
    // Keep a handle to the sent buffer; the channel itself moves into the agent.
    let sent = channel.sent.clone();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let result = agent.run().await;
    assert!(result.is_ok());

    let sent_msgs = sent.lock().unwrap();
    assert!(
        sent_msgs
            .iter()
            .any(|m| m.contains("tool executed successfully"))
    );
}
/// A `ToolError::Blocked` result is reported to the user as
/// "blocked by security policy" and does not fail the run.
#[tokio::test]
async fn agent_handles_tool_blocked_error() {
    let provider = mock_provider(vec!["run blocked command".to_string()]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::new(vec![Err(ToolError::Blocked {
        command: "rm -rf /".to_string(),
    })]);
    let channel = MockChannel::new(vec!["test".to_string()]);
    // Keep a handle to the sent buffer; the channel itself moves into the agent.
    let sent = channel.sent.clone();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let result = agent.run().await;
    assert!(result.is_ok());

    let sent_msgs = sent.lock().unwrap();
    assert!(
        sent_msgs
            .iter()
            .any(|m| m.contains("blocked by security policy"))
    );
}
/// A `ToolError::SandboxViolation` result is reported to the user as
/// "outside the sandbox" and does not fail the run.
#[tokio::test]
async fn agent_handles_tool_sandbox_violation() {
    let provider = mock_provider(vec!["access forbidden path".to_string()]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::new(vec![Err(ToolError::SandboxViolation {
        path: "/etc/passwd".to_string(),
    })]);
    let channel = MockChannel::new(vec!["test".to_string()]);
    // Keep a handle to the sent buffer; the channel itself moves into the agent.
    let sent = channel.sent.clone();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let result = agent.run().await;
    assert!(result.is_ok());

    let sent_msgs = sent.lock().unwrap();
    assert!(sent_msgs.iter().any(|m| m.contains("outside the sandbox")));
}
/// When a tool requires confirmation and the channel approves it, the run
/// succeeds and at least one message is sent.
///
/// Only non-emptiness of the sent buffer is asserted, not its content.
#[tokio::test]
async fn agent_handles_tool_confirmation_approved() {
    let provider = mock_provider(vec!["needs confirmation".to_string()]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::new(vec![Err(ToolError::ConfirmationRequired {
        command: "dangerous command".to_string(),
    })]);
    let channel = MockChannel::new(vec!["test".to_string()]).with_confirmations(vec![true]);
    // Keep a handle to the sent buffer; the channel itself moves into the agent.
    let sent = channel.sent.clone();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let result = agent.run().await;
    assert!(result.is_ok());

    let sent_msgs = sent.lock().unwrap();
    assert!(!sent_msgs.is_empty());
}
/// When a tool requires confirmation and the channel denies it, the user is
/// told "Command cancelled" and the run still succeeds.
#[tokio::test]
async fn agent_handles_tool_confirmation_denied() {
    let provider = mock_provider(vec!["needs confirmation".to_string()]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::new(vec![Err(ToolError::ConfirmationRequired {
        command: "dangerous command".to_string(),
    })]);
    let channel = MockChannel::new(vec!["test".to_string()]).with_confirmations(vec![false]);
    // Keep a handle to the sent buffer; the channel itself moves into the agent.
    let sent = channel.sent.clone();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let result = agent.run().await;
    assert!(result.is_ok());

    let sent_msgs = sent.lock().unwrap();
    assert!(sent_msgs.iter().any(|m| m.contains("Command cancelled")));
}
/// With a streaming mock provider, the agent delivers at least one chunk
/// through the channel's chunk path.
#[tokio::test]
async fn agent_handles_streaming_response() {
    let provider = mock_provider_streaming(vec!["streaming response".to_string()]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();
    let channel = MockChannel::new(vec!["test".to_string()]);
    // Keep a handle to the chunk buffer; the channel itself moves into the agent.
    let chunks = channel.chunks.clone();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let result = agent.run().await;
    assert!(result.is_ok());

    let sent_chunks = chunks.lock().unwrap();
    assert!(!sent_chunks.is_empty());
}
#[tokio::test]
async fn agent_maybe_redact_enabled() {
let provider = mock_provider(vec![]);
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = MockToolExecutor::no_tools();
let security = SecurityConfig {
redact_secrets: true,
..Default::default()
};
let agent = Agent::new(provider, channel, registry, None, 5, executor)
.with_security(security, TimeoutConfig::default());
let text = "token: sk-abc123secret";
let redacted = agent.maybe_redact(text);
assert_ne!(AsRef::<str>::as_ref(&redacted), text);
}
#[tokio::test]
async fn agent_maybe_redact_disabled() {
let provider = mock_provider(vec![]);
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = MockToolExecutor::no_tools();
let security = SecurityConfig {
redact_secrets: false,
..Default::default()
};
let agent = Agent::new(provider, channel, registry, None, 5, executor)
.with_security(security, TimeoutConfig::default());
let text = "password=secret123";
let redacted = agent.maybe_redact(text);
assert_eq!(AsRef::<str>::as_ref(&redacted), text);
}
/// Two queued user messages end up as a single user entry whose content is
/// "first\nsecond", giving a history of three messages in total.
#[tokio::test]
async fn agent_handles_multiple_messages() {
    let replies = vec![
        String::from("first response"),
        String::from("second response"),
    ];
    let inbound = vec![String::from("first"), String::from("second")];

    let mut agent = Agent::new(
        mock_provider(replies),
        MockChannel::new(inbound),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::new(vec![Ok(None), Ok(None)]),
    );

    let outcome = agent.run().await;
    assert!(outcome.is_ok());

    let history = &agent.msg.messages;
    assert_eq!(history.len(), 3);
    assert_eq!(history[1].content, "first\nsecond");
}
/// A tool summary carrying an `[error]` marker followed by an empty tool
/// result does not make `run` fail.
#[tokio::test]
async fn agent_handles_tool_output_with_error_marker() {
    let failing_step = ToolOutput {
        tool_name: "bash".to_string(),
        summary: "[error] command failed [exit code 1]".to_string(),
        blocks_executed: 1,
        filter_stats: None,
        diff: None,
        streamed: false,
        terminal_id: None,
        locations: None,
        raw_response: None,
    };

    let mut agent = Agent::new(
        mock_provider(vec![String::from("response"), String::from("retry")]),
        MockChannel::new(vec![String::from("test")]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::new(vec![Ok(Some(failing_step)), Ok(None)]),
    );

    let outcome = agent.run().await;
    assert!(outcome.is_ok());
}
/// A tool result whose summary is only whitespace does not make `run` fail.
#[tokio::test]
async fn agent_handles_empty_tool_output() {
    let blank_step = ToolOutput {
        tool_name: "bash".to_string(),
        summary: "   ".to_string(),
        blocks_executed: 1,
        filter_stats: None,
        diff: None,
        streamed: false,
        terminal_id: None,
        locations: None,
        raw_response: None,
    };

    let mut agent = Agent::new(
        mock_provider(vec![String::from("response")]),
        MockChannel::new(vec![String::from("test")]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::new(vec![Ok(Some(blank_step))]),
    );

    let outcome = agent.run().await;
    assert!(outcome.is_ok());
}
/// `shutdown_signal` must resolve once the watch channel is set to `true`.
///
/// The spawned task is joined under a timeout, and both layers of the result
/// are checked: the outer one fails if the helper never returned in time, the
/// inner one fails if the task panicked.
#[tokio::test]
async fn shutdown_signal_helper_returns_on_true() {
    let (tx, rx) = watch::channel(false);
    let waiter = tokio::spawn(async move {
        let mut rx_clone = rx;
        shutdown_signal(&mut rx_clone).await;
    });

    tx.send(true).unwrap();

    let joined = tokio::time::timeout(std::time::Duration::from_millis(100), waiter).await;
    joined
        .expect("shutdown_signal did not return within the timeout")
        .expect("shutdown_signal task panicked");
}
/// With no receiver, `recv_optional` never resolves, so the surrounding
/// timeout must elapse.
#[tokio::test]
async fn recv_optional_returns_pending_when_no_receiver() {
    let deadline = std::time::Duration::from_millis(10);
    let mut no_receiver = None;

    let outcome = tokio::time::timeout(deadline, recv_optional::<SkillEvent>(&mut no_receiver)).await;

    assert!(outcome.is_err());
}
/// With a receiver that already has an event queued, `recv_optional` yields `Some`.
#[tokio::test]
async fn recv_optional_receives_from_channel() {
    let (tx, rx) = mpsc::channel(1);
    tx.send(SkillEvent::Changed).await.unwrap();

    let mut receiver = Some(rx);
    let received = recv_optional(&mut receiver).await;

    assert!(received.is_some());
}
/// `with_skill_reload` stores the given skill paths in the skill state.
#[tokio::test]
async fn agent_with_skill_reload_sets_paths() {
    let (_events_tx, events_rx) = mpsc::channel(1);
    let expected_paths = vec![std::path::PathBuf::from("/test/path")];

    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_skill_reload(expected_paths.clone(), events_rx);

    assert_eq!(agent.skill_state.skill_paths, expected_paths);
}
/// A `ToolError::Timeout` result is reported to the user as
/// "Tool execution failed" and does not fail the run.
#[tokio::test]
async fn agent_handles_tool_execution_error() {
    let provider = mock_provider(vec!["response".to_string()]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::new(vec![Err(ToolError::Timeout { timeout_secs: 30 })]);
    let channel = MockChannel::new(vec!["test".to_string()]);
    // Keep a handle to the sent buffer; the channel itself moves into the agent.
    let sent = channel.sent.clone();

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let result = agent.run().await;
    assert!(result.is_ok());

    let sent_msgs = sent.lock().unwrap();
    assert!(
        sent_msgs
            .iter()
            .any(|m| m.contains("Tool execution failed"))
    );
}
/// A tool result followed by an empty one grows the history beyond the basic
/// system/user/assistant triple.
#[tokio::test]
async fn agent_processes_multi_turn_tool_execution() {
    let replies = vec![
        String::from("first response"),
        String::from("second response"),
    ];
    let first_step = ToolOutput {
        tool_name: "bash".to_string(),
        summary: "step 1 complete".to_string(),
        blocks_executed: 1,
        filter_stats: None,
        diff: None,
        streamed: false,
        terminal_id: None,
        locations: None,
        raw_response: None,
    };

    let mut agent = Agent::new(
        mock_provider(replies),
        MockChannel::new(vec![String::from("start task")]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::new(vec![Ok(Some(first_step)), Ok(None)]),
    );

    let outcome = agent.run().await;
    assert!(outcome.is_ok());
    assert!(agent.msg.messages.len() > 3);
}
/// With ten scripted responses and ten tool results that all report progress,
/// the run completes and the history holds at most ten assistant messages.
#[tokio::test]
async fn agent_respects_max_shell_iterations() {
    let responses = vec!["response".to_string(); 10];
    let provider = mock_provider(responses);
    let channel = MockChannel::new(vec!["test".to_string()]);
    let registry = create_test_registry();

    let outputs: Vec<_> = (0..10)
        .map(|_| {
            Ok(Some(ToolOutput {
                tool_name: "bash".to_string(),
                summary: "continuing".to_string(),
                blocks_executed: 1,
                filter_stats: None,
                diff: None,
                streamed: false,
                terminal_id: None,
                locations: None,
                raw_response: None,
            }))
        })
        .collect();
    let executor = MockToolExecutor::new(outputs);

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let outcome = agent.run().await;
    assert!(outcome.is_ok());

    let mut assistant_count = 0;
    for message in agent.msg.messages.iter() {
        if message.role == Role::Assistant {
            assistant_count += 1;
        }
    }
    assert!(assistant_count <= 10);
}
/// Attaching a metrics sender publishes an initial snapshot with the provider
/// name, model name, skill count and a non-zero prompt token estimate.
#[tokio::test]
async fn agent_with_metrics_sets_initial_values() {
    let (metrics_tx, metrics_rx) = watch::channel(crate::metrics::MetricsSnapshot::default());

    let _agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_model_name("test-model")
    .with_metrics(metrics_tx);

    let snapshot = metrics_rx.borrow().clone();
    assert_eq!(snapshot.provider_name, "mock");
    assert_eq!(snapshot.model_name, "test-model");
    assert_eq!(snapshot.total_skills, 1);
    assert!(
        snapshot.prompt_tokens > 0,
        "initial prompt estimate should be non-zero"
    );
    assert_eq!(snapshot.total_tokens, snapshot.prompt_tokens);
}
/// After processing one message, the snapshot records one API call and a
/// non-zero total token count.
#[tokio::test]
async fn agent_metrics_update_on_llm_call() {
    let (metrics_tx, metrics_rx) = watch::channel(crate::metrics::MetricsSnapshot::default());

    let mut agent = Agent::new(
        mock_provider(vec![String::from("response")]),
        MockChannel::new(vec![String::from("hello")]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_metrics(metrics_tx);
    agent.run().await.unwrap();

    let snapshot = metrics_rx.borrow().clone();
    assert_eq!(snapshot.api_calls, 1);
    assert!(snapshot.total_tokens > 0);
}
/// With a streaming provider, one processed message yields non-zero
/// completion tokens and exactly one API call in the snapshot.
#[tokio::test]
async fn agent_metrics_streaming_updates_completion_tokens() {
    let (metrics_tx, metrics_rx) = watch::channel(crate::metrics::MetricsSnapshot::default());

    let mut agent = Agent::new(
        mock_provider_streaming(vec![String::from("streaming response")]),
        MockChannel::new(vec![String::from("test")]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_metrics(metrics_tx);
    agent.run().await.unwrap();

    let snapshot = metrics_rx.borrow().clone();
    assert!(snapshot.completion_tokens > 0);
    assert_eq!(snapshot.api_calls, 1);
}
/// Without a memory backend attached, processing a message leaves
/// `sqlite_message_count` at zero.
///
/// NOTE(review): despite the test name, this only covers the "nothing is
/// persisted" case; no increment is exercised here.
#[tokio::test]
async fn agent_metrics_persist_increments_count() {
    let provider = mock_provider(vec!["response".to_string()]);
    let channel = MockChannel::new(vec!["hello".to_string()]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();
    let (tx, rx) = watch::channel(crate::metrics::MetricsSnapshot::default());

    let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_metrics(tx);
    agent.run().await.unwrap();

    let snapshot = rx.borrow().clone();
    // assert_eq! reports the actual count on failure, unlike assert!(a == b).
    assert_eq!(snapshot.sqlite_message_count, 0, "no memory = no persist");
}
/// After processing one message, the snapshot reports one total skill and a
/// non-empty list of active skills.
#[tokio::test]
async fn agent_metrics_skills_updated_on_prompt_rebuild() {
    let (metrics_tx, metrics_rx) = watch::channel(crate::metrics::MetricsSnapshot::default());

    let mut agent = Agent::new(
        mock_provider(vec![String::from("response")]),
        MockChannel::new(vec![String::from("hello")]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_metrics(metrics_tx);
    agent.run().await.unwrap();

    let snapshot = metrics_rx.borrow().clone();
    assert_eq!(snapshot.total_skills, 1);
    assert!(!snapshot.active_skills.is_empty());
}
/// Smoke test: calling `update_metrics` on an agent without a metrics sender
/// must not panic. No state is asserted afterwards.
#[test]
fn update_metrics_noop_when_none() {
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    agent.update_metrics(|m| m.api_calls = 999);
}
/// `update_metrics` applies the closure and publishes a snapshot whose uptime
/// is still under two seconds right after construction.
#[test]
fn update_metrics_sets_uptime_seconds() {
    let (metrics_tx, metrics_rx) = tokio::sync::watch::channel(MetricsSnapshot::default());

    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_metrics(metrics_tx);
    agent.update_metrics(|m| m.api_calls = 1);

    let snapshot = metrics_rx.borrow();
    assert!(snapshot.uptime_seconds < 2);
    assert_eq!(snapshot.api_calls, 1);
}
/// `last_user_query` returns the genuine user text ("hello") even when a later
/// user-role message carries tool output.
#[test]
fn test_last_user_query_finds_original() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let conversation = [
        (Role::User, "hello"),
        (Role::Assistant, "cmd"),
        (Role::User, "[tool output: bash]\nsome output"),
    ];
    for (role, content) in conversation {
        agent.msg.messages.push(Message {
            role,
            content: content.to_string(),
            parts: vec![],
            metadata: MessageMetadata::default(),
        });
    }

    assert_eq!(agent.last_user_query(), "hello");
}
/// On a fresh agent with no user messages, `last_user_query` is the empty string.
#[test]
fn test_last_user_query_empty_messages() {
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    assert_eq!(agent.last_user_query(), "");
}
/// Short tool output is returned unchanged even when summarization is enabled.
#[tokio::test]
async fn test_maybe_summarize_short_output_passthrough() {
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_tool_summarization(true);

    let short = "short output";
    let result = agent.maybe_summarize_tool_output(short).await;

    assert_eq!(result, short);
}
/// With a memory backend attached and summarization off, oversized tool output
/// yields a notice that mentions storage, a byte count and the `read_overflow`
/// tool, and does not contain ".txt".
///
/// NOTE(review): despite the test name, no UUID format is asserted here.
#[tokio::test]
async fn test_overflow_notice_contains_uuid() {
    use std::sync::Arc;
    use zeph_llm::any::AnyProvider;
    use zeph_llm::mock::MockProvider;
    use zeph_memory::semantic::SemanticMemory;

    let memory_provider = AnyProvider::Mock(MockProvider::default());
    let memory =
        SemanticMemory::with_sqlite_backend(":memory:", memory_provider, "test-model", 0.7, 0.3)
            .await
            .unwrap();
    let conversation_id = memory.sqlite().create_conversation().await.unwrap();

    let overflow = zeph_tools::OverflowConfig {
        threshold: 100,
        retention_days: 7,
        max_overflow_bytes: 0,
    };
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_tool_summarization(false)
    .with_overflow_config(overflow)
    .with_memory(Arc::new(memory), conversation_id, 100, 5, 1000);

    let oversized = "x".repeat(zeph_tools::MAX_TOOL_OUTPUT_CHARS + 1000);
    let result = agent.maybe_summarize_tool_output(&oversized).await;

    assert!(
        result.contains("full output stored"),
        "notice must contain overflow storage notice, got: {result}"
    );
    assert!(
        result.contains("bytes"),
        "notice must contain byte count, got: {result}"
    );
    assert!(
        result.contains("read_overflow"),
        "notice must mention read_overflow tool, got: {result}"
    );
    assert!(
        !result.contains(".txt"),
        "notice must not contain filesystem path, got: {result}"
    );
}
/// With summarization off, oversized tool output comes back marked "truncated".
#[tokio::test]
async fn test_maybe_summarize_long_output_disabled_truncates() {
    let overflow = zeph_tools::OverflowConfig {
        threshold: 1000,
        retention_days: 7,
        max_overflow_bytes: 0,
    };
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_tool_summarization(false)
    .with_overflow_config(overflow);

    let oversized = "x".repeat(zeph_tools::MAX_TOOL_OUTPUT_CHARS + 1000);
    let result = agent.maybe_summarize_tool_output(&oversized).await;

    assert!(result.contains("truncated"));
}
/// With summarization on, oversized tool output is replaced by the provider's
/// summary under a "[tool output summary]" header, without a truncation marker.
#[tokio::test]
async fn test_maybe_summarize_long_output_enabled_calls_llm() {
    let overflow = zeph_tools::OverflowConfig {
        threshold: 1000,
        retention_days: 7,
        max_overflow_bytes: 0,
    };
    let agent = Agent::new(
        mock_provider(vec![String::from("summary text")]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_tool_summarization(true)
    .with_overflow_config(overflow);

    let oversized = "x".repeat(zeph_tools::MAX_TOOL_OUTPUT_CHARS + 1000);
    let result = agent.maybe_summarize_tool_output(&oversized).await;

    assert!(result.contains("summary text"));
    assert!(result.contains("[tool output summary]"));
    assert!(!result.contains("truncated"));
}
/// When summarization is on but the provider fails, oversized tool output
/// falls back to the "truncated" form.
#[tokio::test]
async fn test_summarize_tool_output_llm_failure_fallback() {
    let overflow = zeph_tools::OverflowConfig {
        threshold: 1000,
        retention_days: 7,
        max_overflow_bytes: 0,
    };
    let agent = Agent::new(
        mock_provider_failing(),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_tool_summarization(true)
    .with_overflow_config(overflow);

    let oversized = "x".repeat(zeph_tools::MAX_TOOL_OUTPUT_CHARS + 1000);
    let result = agent.maybe_summarize_tool_output(&oversized).await;

    assert!(result.contains("truncated"));
}
/// Output above the overflow threshold with no memory backend attached yields
/// the "could not be saved" fallback message.
#[tokio::test]
async fn test_overflow_no_memory_backend_s3_fallback() {
    let overflow = zeph_tools::OverflowConfig {
        threshold: 100,
        retention_days: 7,
        max_overflow_bytes: 0,
    };
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_tool_summarization(false)
    .with_overflow_config(overflow);

    let over_threshold = "x".repeat(200);
    let result = agent.maybe_summarize_tool_output(&over_threshold).await;

    assert!(
        result.contains("could not be saved — no memory backend or conversation available"),
        "S3 fallback message must appear when no memory backend, got: {result}"
    );
}
/// `with_tool_summarization` stores the given flag on the tool orchestrator,
/// for both `true` and `false`.
#[test]
fn with_tool_summarization_sets_flag() {
    for enabled in [true, false] {
        let agent = Agent::new(
            mock_provider(vec![]),
            MockChannel::new(vec![]),
            create_test_registry(),
            None,
            5,
            MockToolExecutor::no_tools(),
        )
        .with_tool_summarization(enabled);

        assert_eq!(
            agent.tool_orchestrator.summarize_tool_output_enabled,
            enabled
        );
    }
}
/// The formatted output starts with the tool header and an opening code fence,
/// ends with `TOOL_OUTPUT_SUFFIX`, and contains the body text.
#[test]
fn format_tool_output_structure() {
    let rendered = format_tool_output("bash", "hello world");
    let expected_prefix = "[tool output: bash]\n```\n";
    assert!(rendered.starts_with(expected_prefix));
    assert!(rendered.ends_with(TOOL_OUTPUT_SUFFIX));
    assert!(rendered.contains("hello world"));
}
/// An empty body still yields the header plus an empty fenced block.
#[test]
fn format_tool_output_empty_body() {
    let expected = "[tool output: grep]\n```\n\n```";
    let rendered = format_tool_output("grep", "");
    assert_eq!(rendered, expected);
}
/// A task waiting on a shared `Notify` must cancel its `CancellationToken`
/// once the signal fires.
///
/// The spawned task's `JoinHandle` is awaited under a timeout instead of
/// sleeping for a fixed interval, so the final assertion does not depend on
/// how quickly the scheduler runs the task.
#[tokio::test]
async fn cancel_signal_propagates_to_fresh_token() {
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    let signal = Arc::new(Notify::new());
    let token = CancellationToken::new();
    let sig = Arc::clone(&signal);
    let tok = token.clone();
    let waiter = tokio::spawn(async move {
        sig.notified().await;
        tok.cancel();
    });

    // Give the spawned task a chance to start waiting: `notify_waiters` only
    // wakes tasks that are already waiting and stores no permit for later ones.
    tokio::task::yield_now().await;
    assert!(!token.is_cancelled());

    signal.notify_waiters();
    // The task finishes right after calling `cancel()`, so joining it is a
    // precise synchronization point; the timeout only guards against a hang.
    tokio::time::timeout(Duration::from_secs(5), waiter)
        .await
        .expect("cancel task did not finish after the signal")
        .expect("cancel task panicked");
    assert!(token.is_cancelled());
}
/// The same `Notify` must be reusable: a second waiter registered after the
/// first signal stays uncancelled until the signal fires again.
///
/// Each spawned task's `JoinHandle` is awaited under a timeout instead of
/// sleeping for a fixed interval, so the assertions do not depend on
/// scheduler timing.
#[tokio::test]
async fn cancel_signal_works_across_multiple_messages() {
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    let signal = Arc::new(Notify::new());

    // First round: spawn a waiter, fire the signal, expect cancellation.
    let token1 = CancellationToken::new();
    let sig1 = Arc::clone(&signal);
    let tok1 = token1.clone();
    let waiter1 = tokio::spawn(async move {
        sig1.notified().await;
        tok1.cancel();
    });
    // Let the task start waiting; `notify_waiters` stores no permit for tasks
    // that begin waiting afterwards.
    tokio::task::yield_now().await;
    signal.notify_waiters();
    tokio::time::timeout(Duration::from_secs(5), waiter1)
        .await
        .expect("first cancel task did not finish after the signal")
        .expect("first cancel task panicked");
    assert!(token1.is_cancelled());

    // Second round: a fresh token must not be affected by the earlier signal.
    let token2 = CancellationToken::new();
    let sig2 = Arc::clone(&signal);
    let tok2 = token2.clone();
    let waiter2 = tokio::spawn(async move {
        sig2.notified().await;
        tok2.cancel();
    });
    tokio::task::yield_now().await;
    assert!(!token2.is_cancelled());
    signal.notify_waiters();
    tokio::time::timeout(Duration::from_secs(5), waiter2)
        .await
        .expect("second cancel task did not finish after the signal")
        .expect("second cancel task panicked");
    assert!(token2.is_cancelled());
}
/// Tests for `Agent::resolve_message` with audio attachments and an optional
/// speech-to-text provider.
mod resolve_message_tests {
    use super::*;
    use crate::channel::{Attachment, AttachmentKind, ChannelMessage};
    use std::future::Future;
    use std::pin::Pin;
    use zeph_llm::error::LlmError;
    use zeph_llm::stt::{SpeechToText, Transcription};

    /// Speech-to-text stub: yields the stored text, or fails when none is stored.
    struct MockStt {
        text: Option<String>,
    }

    impl MockStt {
        /// Stub that always transcribes to `text`.
        fn ok(text: &str) -> Self {
            Self {
                text: Some(text.to_owned()),
            }
        }

        /// Stub whose every transcription attempt returns an error.
        fn failing() -> Self {
            Self { text: None }
        }
    }

    impl SpeechToText for MockStt {
        /// Ignores the audio and filename; resolves immediately to the canned outcome.
        fn transcribe(
            &self,
            _audio: &[u8],
            _filename: Option<&str>,
        ) -> Pin<Box<dyn Future<Output = Result<Transcription, LlmError>> + Send + '_>> {
            let outcome = self
                .text
                .clone()
                .map(|text| Transcription {
                    text,
                    language: None,
                    duration_secs: None,
                })
                .ok_or_else(|| LlmError::TranscriptionFailed("mock error".into()));
            Box::pin(async move { outcome })
        }
    }

    /// Builds an agent with an empty skill registry and the given STT provider.
    fn make_agent(stt: Option<Box<dyn SpeechToText>>) -> Agent<MockChannel> {
        let provider = mock_provider(vec!["ok".into()]);
        let no_paths: Vec<String> = Vec::new();
        let registry = zeph_skills::registry::SkillRegistry::load(&no_paths);
        let mut agent = Agent::new(
            provider,
            MockChannel::new(vec![]),
            registry,
            None,
            5,
            MockToolExecutor::no_tools(),
        );
        agent.providers.stt = stt;
        agent
    }

    /// Wraps raw bytes in an audio attachment named `test.wav`.
    fn audio_attachment(data: &[u8]) -> Attachment {
        Attachment {
            kind: AttachmentKind::Audio,
            data: data.to_vec(),
            filename: Some("test.wav".into()),
        }
    }

    /// A message without attachments resolves to its own text.
    #[tokio::test]
    async fn no_audio_attachments_returns_text() {
        let agent = make_agent(None);
        let msg = ChannelMessage {
            text: "hello".into(),
            attachments: vec![],
        };
        let (text, _) = agent.resolve_message(msg).await;
        assert_eq!(text, "hello");
    }

    /// Without an STT provider, an audio attachment leaves the text unchanged.
    #[tokio::test]
    async fn audio_without_stt_returns_original_text() {
        let agent = make_agent(None);
        let msg = ChannelMessage {
            text: "hello".into(),
            attachments: vec![audio_attachment(b"audio-data")],
        };
        let (text, _) = agent.resolve_message(msg).await;
        assert_eq!(text, "hello");
    }

    /// With STT, the result contains the audio marker, the transcription, and
    /// the original text.
    #[tokio::test]
    async fn audio_with_stt_prepends_transcription() {
        let stt: Box<dyn SpeechToText> = Box::new(MockStt::ok("transcribed text"));
        let agent = make_agent(Some(stt));
        let msg = ChannelMessage {
            text: "original".into(),
            attachments: vec![audio_attachment(b"audio-data")],
        };
        let (result, _) = agent.resolve_message(msg).await;
        for needle in ["[transcribed audio]", "transcribed text", "original"] {
            assert!(result.contains(needle));
        }
    }

    /// With empty message text, the result is exactly the transcription.
    #[tokio::test]
    async fn audio_with_stt_no_original_text() {
        let stt: Box<dyn SpeechToText> = Box::new(MockStt::ok("transcribed text"));
        let agent = make_agent(Some(stt));
        let msg = ChannelMessage {
            text: String::new(),
            attachments: vec![audio_attachment(b"audio-data")],
        };
        let (result, _) = agent.resolve_message(msg).await;
        assert_eq!(result, "transcribed text");
    }

    /// When every transcription fails, the original text is returned.
    #[tokio::test]
    async fn all_transcriptions_fail_returns_original() {
        let stt: Box<dyn SpeechToText> = Box::new(MockStt::failing());
        let agent = make_agent(Some(stt));
        let msg = ChannelMessage {
            text: "original".into(),
            attachments: vec![audio_attachment(b"audio-data")],
        };
        let (text, _) = agent.resolve_message(msg).await;
        assert_eq!(text, "original");
    }

    /// Three audio attachments produce three transcriptions joined by newlines.
    #[tokio::test]
    async fn multiple_audio_attachments_joined() {
        let stt: Box<dyn SpeechToText> = Box::new(MockStt::ok("chunk"));
        let agent = make_agent(Some(stt));
        let attachments = vec![
            audio_attachment(b"a1"),
            audio_attachment(b"a2"),
            audio_attachment(b"a3"),
        ];
        let msg = ChannelMessage {
            text: String::new(),
            attachments,
        };
        let (result, _) = agent.resolve_message(msg).await;
        assert_eq!(result, "chunk\nchunk\nchunk");
    }

    /// Audio one byte over `MAX_AUDIO_BYTES` is not transcribed.
    #[tokio::test]
    async fn oversized_audio_skipped() {
        let stt: Box<dyn SpeechToText> = Box::new(MockStt::ok("should not appear"));
        let agent = make_agent(Some(stt));
        let oversized = Attachment {
            kind: AttachmentKind::Audio,
            data: vec![0u8; MAX_AUDIO_BYTES + 1],
            filename: None,
        };
        let msg = ChannelMessage {
            text: "original".into(),
            attachments: vec![oversized],
        };
        let (text, _) = agent.resolve_message(msg).await;
        assert_eq!(text, "original");
    }
}
/// Both `.jpg` and `.jpeg` filenames map to `image/jpeg`.
#[test]
fn detect_image_mime_jpeg() {
    for filename in ["photo.jpg", "photo.jpeg"] {
        assert_eq!(detect_image_mime(Some(filename)), "image/jpeg");
    }
}
/// A `.gif` filename maps to `image/gif`.
#[test]
fn detect_image_mime_gif() {
    let mime = detect_image_mime(Some("anim.gif"));
    assert_eq!(mime, "image/gif");
}
/// A `.webp` filename maps to `image/webp`.
#[test]
fn detect_image_mime_webp() {
    let mime = detect_image_mime(Some("img.webp"));
    assert_eq!(mime, "image/webp");
}
/// An unrecognized extension and a missing filename both yield `image/png`.
#[test]
fn detect_image_mime_unknown_defaults_png() {
    for filename in [Some("file.bmp"), None] {
        assert_eq!(detect_image_mime(filename), "image/png");
    }
}
/// A small `.jpg` image attachment is returned as a single `MessagePart::Image`
/// with MIME type `image/jpeg`, and the message text is unchanged.
#[tokio::test]
async fn resolve_message_extracts_image_attachment() {
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    let image = Attachment {
        kind: AttachmentKind::Image,
        data: vec![0u8; 16],
        filename: Some("test.jpg".into()),
    };
    let msg = ChannelMessage {
        text: "look at this".into(),
        attachments: vec![image],
    };

    let (text, parts) = agent.resolve_message(msg).await;
    assert_eq!(text, "look at this");
    assert_eq!(parts.len(), 1);
    let zeph_llm::provider::MessagePart::Image(img) = &parts[0] else {
        panic!("expected Image part");
    };
    assert_eq!(img.mime_type, "image/jpeg");
    assert_eq!(img.data.len(), 16);
}
/// An image one byte over `MAX_IMAGE_BYTES` yields no parts; the text is kept.
#[tokio::test]
async fn resolve_message_drops_oversized_image() {
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    let oversized = Attachment {
        kind: AttachmentKind::Image,
        data: vec![0u8; MAX_IMAGE_BYTES + 1],
        filename: Some("huge.png".into()),
    };
    let msg = ChannelMessage {
        text: "big image".into(),
        attachments: vec![oversized],
    };

    let (text, parts) = agent.resolve_message(msg).await;
    assert_eq!(text, "big image");
    assert!(parts.is_empty());
}
/// A path containing `..` segments returns `Ok`, queues no image part, and
/// sends a message mentioning "traversal".
#[tokio::test]
async fn handle_image_command_rejects_path_traversal() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let outcome = agent.handle_image_command("../../etc/passwd").await;
    assert!(outcome.is_ok());
    assert!(agent.msg.pending_image_parts.is_empty());

    let sent = agent.channel.sent_messages();
    let mentions_traversal = sent.iter().any(|line| line.contains("traversal"));
    assert!(mentions_traversal);
}
/// A nonexistent path returns `Ok`, queues no image part, and sends a
/// "Cannot read image" message.
#[tokio::test]
async fn handle_image_command_missing_file_sends_error() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let outcome = agent.handle_image_command("/nonexistent/image.png").await;
    assert!(outcome.is_ok());
    assert!(agent.msg.pending_image_parts.is_empty());

    let sent = agent.channel.sent_messages();
    let reported = sent.iter().any(|line| line.contains("Cannot read image"));
    assert!(reported);
}
/// A readable `.jpg` temp file is queued as one image part holding the file's
/// bytes with MIME type `image/jpeg`, and an "Image loaded" message is sent.
#[tokio::test]
async fn handle_image_command_loads_valid_file() {
    use std::io::Write;

    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    // Four bytes written to a temp file with a `.jpg` suffix.
    let data = vec![0xFFu8, 0xD8, 0xFF, 0xE0];
    let mut tmp = tempfile::NamedTempFile::with_suffix(".jpg").unwrap();
    tmp.write_all(&data).unwrap();
    let path = tmp.path().to_str().unwrap().to_owned();

    let outcome = agent.handle_image_command(&path).await;
    assert!(outcome.is_ok());
    assert_eq!(agent.msg.pending_image_parts.len(), 1);
    let zeph_llm::provider::MessagePart::Image(img) = &agent.msg.pending_image_parts[0] else {
        panic!("expected Image part");
    };
    assert_eq!(img.data, data);
    assert_eq!(img.mime_type, "image/jpeg");

    let sent = agent.channel.sent_messages();
    assert!(sent.iter().any(|line| line.contains("Image loaded")));
}
use crate::subagent::AgentCommand;
/// Builds an agent whose sub-agent manager (constructed with `new(4)`) holds a
/// single definition named `helper`.
fn make_agent_with_manager() -> Agent<MockChannel> {
    use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
    use crate::subagent::hooks::SubagentHooks;
    use crate::subagent::{SubAgentDef, SubAgentManager};

    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    let helper = SubAgentDef {
        name: "helper".into(),
        description: "A helper bot".into(),
        model: None,
        tools: ToolPolicy::InheritAll,
        disallowed_tools: vec![],
        permissions: SubAgentPermissions::default(),
        skills: SkillFilter::default(),
        system_prompt: "You are helpful.".into(),
        hooks: SubagentHooks::default(),
        memory: None,
        source: None,
        file_path: None,
    };
    let mut manager = SubAgentManager::new(4);
    manager.definitions_mut().push(helper);

    agent.orchestration.subagent_manager = Some(manager);
    agent
}
/// Without a sub-agent manager, `handle_agent_command` returns `None`.
#[tokio::test]
async fn agent_command_no_manager_returns_none() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    let reply = agent.handle_agent_command(AgentCommand::List).await;
    assert!(reply.is_none());
}
/// `List` reports the registered definition's name and description.
#[tokio::test]
async fn agent_command_list_returns_definitions() {
    let mut agent = make_agent_with_manager();
    let listing = agent.handle_agent_command(AgentCommand::List).await;
    let resp = listing.unwrap();
    for needle in ["helper", "A helper bot"] {
        assert!(resp.contains(needle));
    }
}
/// Spawning a name with no matching definition yields a "Failed to spawn" reply.
#[tokio::test]
async fn agent_command_spawn_unknown_name_returns_error() {
    let mut agent = make_agent_with_manager();
    let command = AgentCommand::Background {
        name: "unknown-bot".into(),
        prompt: "do something".into(),
    };
    let resp = agent.handle_agent_command(command).await.unwrap();
    assert!(resp.contains("Failed to spawn"));
}
/// Spawning the registered `helper` yields a reply naming it and saying "started".
#[tokio::test]
async fn agent_command_spawn_known_name_returns_started() {
    let mut agent = make_agent_with_manager();
    let command = AgentCommand::Background {
        name: "helper".into(),
        prompt: "do some work".into(),
    };
    let resp = agent.handle_agent_command(command).await.unwrap();
    for needle in ["helper", "started"] {
        assert!(resp.contains(needle));
    }
}
/// `Status` with nothing spawned replies "No active sub-agents".
#[tokio::test]
async fn agent_command_status_no_agents_returns_empty_message() {
    let mut agent = make_agent_with_manager();
    let status = agent.handle_agent_command(AgentCommand::Status).await;
    let resp = status.unwrap();
    assert!(resp.contains("No active sub-agents"));
}
/// Cancelling an id that matches nothing replies with a "No sub-agent" message.
#[tokio::test]
async fn agent_command_cancel_unknown_id_returns_not_found() {
    let mut agent = make_agent_with_manager();
    let command = AgentCommand::Cancel {
        id: "deadbeef".into(),
    };
    let resp = agent.handle_agent_command(command).await.unwrap();
    assert!(resp.contains("No sub-agent"));
}
/// Spawns `helper`, extracts the id from the spawn reply, and expects the
/// cancel reply to contain "Cancelled".
#[tokio::test]
async fn agent_command_cancel_valid_id_succeeds() {
    let mut agent = make_agent_with_manager();
    let spawn = AgentCommand::Background {
        name: "helper".into(),
        prompt: "cancel this".into(),
    };
    let spawn_resp = agent.handle_agent_command(spawn).await.unwrap();

    // The id is the text after "id: ", minus any trailing ')' and whitespace.
    let after_marker = spawn_resp.split("id: ").nth(1).unwrap();
    let short_id = after_marker.trim_end_matches(')').trim().to_string();

    let cancel = AgentCommand::Cancel { id: short_id };
    let resp = agent.handle_agent_command(cancel).await.unwrap();
    assert!(resp.contains("Cancelled"));
}
/// Approving a freshly spawned sub-agent replies "No pending secret request".
#[tokio::test]
async fn agent_command_approve_no_pending_request() {
    let mut agent = make_agent_with_manager();
    let spawn = AgentCommand::Background {
        name: "helper".into(),
        prompt: "do work".into(),
    };
    let spawn_resp = agent.handle_agent_command(spawn).await.unwrap();

    // The id is the text after "id: ", minus any trailing ')' and whitespace.
    let after_marker = spawn_resp.split("id: ").nth(1).unwrap();
    let short_id = after_marker.trim_end_matches(')').trim().to_string();

    let approve = AgentCommand::Approve { id: short_id };
    let resp = agent.handle_agent_command(approve).await.unwrap();
    assert!(resp.contains("No pending secret request"));
}
/// `set_model` succeeds and stores the name in `runtime.model_name`.
#[test]
fn set_model_updates_model_name() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    let outcome = agent.set_model("claude-opus-4-6");
    assert!(outcome.is_ok());
    assert_eq!(agent.runtime.model_name, "claude-opus-4-6");
}
/// After two consecutive `set_model` calls, the later name is the one stored.
#[test]
fn set_model_overwrites_previous_value() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    for name in ["model-a", "model-b"] {
        agent.set_model(name).unwrap();
    }
    assert_eq!(agent.runtime.model_name, "model-b");
}
/// `/model <name>` sends a message that mentions the requested model name.
#[tokio::test]
async fn model_command_switch_sends_confirmation() {
    let provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec![]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    agent.handle_model_command("/model my-new-model").await;

    let messages = sent.lock().unwrap();
    let confirmed = messages.iter().any(|line| line.contains("my-new-model"));
    assert!(
        confirmed,
        "expected switch confirmation, got: {messages:?}"
    );
}
/// With the "mock" model cache invalidated, a bare `/model` sends a
/// "No models" message.
#[tokio::test]
async fn model_command_list_no_cache_fetches_remote() {
    let provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec![]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    zeph_llm::model_cache::ModelCache::for_slug("mock").invalidate();
    agent.handle_model_command("/model").await;

    let messages = sent.lock().unwrap();
    let reported_empty = messages.iter().any(|line| line.contains("No models"));
    assert!(
        reported_empty,
        "expected empty model list message, got: {messages:?}"
    );
}
/// `/model refresh` sends a message containing "Fetched".
#[tokio::test]
async fn model_command_refresh_sends_result() {
    let provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec![]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    agent.handle_model_command("/model refresh").await;

    let messages = sent.lock().unwrap();
    let fetched = messages.iter().any(|line| line.contains("Fetched"));
    assert!(fetched, "expected fetch confirmation, got: {messages:?}");
}
/// When the provider lists `llama3:8b`, `/model llama3:8b` is confirmed and
/// no "Unknown model" message is sent.
#[tokio::test]
async fn model_command_valid_model_accepted() {
    use zeph_llm::model_cache::{ModelCache, RemoteModelInfo};

    ModelCache::for_slug("mock").invalidate();
    let llama = RemoteModelInfo {
        id: "llama3:8b".to_owned(),
        display_name: "Llama 3 8B".to_owned(),
        context_window: Some(8192),
        created_at: None,
    };
    let qwen = RemoteModelInfo {
        id: "qwen3:8b".to_owned(),
        display_name: "Qwen3 8B".to_owned(),
        context_window: Some(32768),
        created_at: None,
    };
    let provider = mock_provider_with_models(vec![], vec![llama, qwen]);
    let channel = MockChannel::new(vec![]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    agent.handle_model_command("/model llama3:8b").await;

    let messages = sent.lock().unwrap();
    let switched = messages
        .iter()
        .any(|line| line.contains("Switched to model: llama3:8b"));
    assert!(switched, "expected switch confirmation, got: {messages:?}");
    let rejected = messages.iter().any(|line| line.contains("Unknown model"));
    assert!(
        !rejected,
        "unexpected rejection for valid model, got: {messages:?}"
    );
}
/// When the provider lists only `qwen3:8b`, `/model nonexistent-model` is
/// rejected by name, the available model is shown, and no switch happens.
#[tokio::test]
async fn model_command_invalid_model_rejected() {
    use zeph_llm::model_cache::{ModelCache, RemoteModelInfo};

    ModelCache::for_slug("mock").invalidate();
    let only_model = RemoteModelInfo {
        id: "qwen3:8b".to_owned(),
        display_name: "Qwen3 8B".to_owned(),
        context_window: None,
        created_at: None,
    };
    let provider = mock_provider_with_models(vec![], vec![only_model]);
    let channel = MockChannel::new(vec![]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    agent.handle_model_command("/model nonexistent-model").await;

    let messages = sent.lock().unwrap();
    let rejected_by_name = messages
        .iter()
        .any(|line| line.contains("Unknown model") && line.contains("nonexistent-model"));
    assert!(
        rejected_by_name,
        "expected rejection with model name, got: {messages:?}"
    );
    let listed_available = messages.iter().any(|line| line.contains("qwen3:8b"));
    assert!(
        listed_available,
        "expected available models list, got: {messages:?}"
    );
    let switched = messages
        .iter()
        .any(|line| line.contains("Switched to model"));
    assert!(
        !switched,
        "should not switch to invalid model, got: {messages:?}"
    );
}
/// With a provider built from `mock_provider(vec![])`, `/model unknown-model`
/// sends an "unavailable" warning and still confirms the switch.
#[tokio::test]
async fn model_command_empty_model_list_warns_and_proceeds() {
    zeph_llm::model_cache::ModelCache::for_slug("mock").invalidate();
    let provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec![]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    agent.handle_model_command("/model unknown-model").await;

    let messages = sent.lock().unwrap();
    let warned = messages.iter().any(|line| line.contains("unavailable"));
    assert!(
        warned,
        "expected warning about unavailable model list, got: {messages:?}"
    );
    let switched = messages
        .iter()
        .any(|line| line.contains("Switched to model: unknown-model"));
    assert!(
        switched,
        "expected switch to proceed despite missing model list, got: {messages:?}"
    );
}
/// Running the agent on a single `/help` input produces output that mentions
/// `/help`, `/exit`, `/status`, `/skills`, and `/model`.
#[tokio::test]
async fn help_command_lists_commands() {
    let provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec!["/help".to_string()]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    assert!(agent.run().await.is_ok());

    let messages = sent.lock().unwrap();
    assert!(!messages.is_empty(), "expected /help output");
    let output = messages.join("\n");
    for cmd in ["/help", "/exit", "/status", "/skills", "/model"] {
        assert!(output.contains(cmd), "expected {cmd} in output");
    }
}
/// Running the agent on a single `/help` input must not advertise `/ingest`.
///
/// The output is first required to contain `/help`, so the negative
/// assertion cannot pass vacuously when no help text was produced at all.
#[tokio::test]
async fn help_command_does_not_include_unknown_commands() {
    let provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec!["/help".to_string()]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();
    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);

    let result = agent.run().await;
    assert!(result.is_ok());

    let messages = sent.lock().unwrap();
    let output = messages.join("\n");
    // Guard: an empty transcript would trivially "not contain" /ingest.
    assert!(
        output.contains("/help"),
        "expected /help listing before checking exclusions; got: {output}"
    );
    assert!(
        !output.contains("/ingest"),
        "unexpected /ingest in /help output"
    );
}
/// Running the agent on a single `/status` input produces output containing
/// the `Provider:`, `Model:`, `Uptime:`, and `Tokens:` fields.
#[tokio::test]
async fn status_command_includes_provider_and_model() {
    let provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec!["/status".to_string()]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    assert!(agent.run().await.is_ok());

    let messages = sent.lock().unwrap();
    assert!(!messages.is_empty(), "expected /status output");
    let output = messages.join("\n");
    for field in ["Provider:", "Model:", "Uptime:", "Tokens:"] {
        assert!(output.contains(field), "expected {field} field");
    }
}
/// Metrics written through `update_metrics` must appear in the `/status`
/// output: the API call count and the prompt/completion token counts.
#[tokio::test]
async fn status_command_shows_metrics_in_cli_mode() {
    let provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec!["/status".to_string()]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    // The receiver is bound to a name so it stays alive for the whole test.
    let (metrics_tx, _metrics_rx) = watch::channel(MetricsSnapshot::default());
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_metrics(metrics_tx);
    agent.update_metrics(|snapshot| {
        snapshot.api_calls = 3;
        snapshot.prompt_tokens = 100;
        snapshot.completion_tokens = 50;
    });

    assert!(agent.run().await.is_ok());

    let messages = sent.lock().unwrap();
    let output = messages.join("\n");
    let shows_calls = output.contains("API calls: 3");
    assert!(
        shows_calls,
        "expected non-zero api_calls in /status output; got: {output}"
    );
    let shows_tokens = output.contains("100 prompt / 50 completion");
    assert!(
        shows_tokens,
        "expected non-zero tokens in /status output; got: {output}"
    );
}
/// With non-zero orchestration metrics, `/status` output includes an
/// `Orchestration:` section with plan, completion, failure, and skip counts.
#[tokio::test]
async fn status_command_shows_orchestration_stats_when_plans_nonzero() {
    let provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec!["/status".to_string()]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    // The receiver is bound to a name so it stays alive for the whole test.
    let (metrics_tx, _metrics_rx) = watch::channel(MetricsSnapshot::default());
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_metrics(metrics_tx);
    agent.update_metrics(|snapshot| {
        let stats = &mut snapshot.orchestration;
        stats.plans_total = 2;
        stats.tasks_total = 10;
        stats.tasks_completed = 8;
        stats.tasks_failed = 1;
        stats.tasks_skipped = 1;
    });

    assert!(agent.run().await.is_ok());

    let messages = sent.lock().unwrap();
    let output = messages.join("\n");
    assert!(
        output.contains("Orchestration:"),
        "expected Orchestration: section; got: {output}"
    );
    for needle in ["Plans: 2", "8/10 completed", "Failed: 1", "Skipped: 1"] {
        assert!(output.contains(needle), "expected {needle}; got: {output}");
    }
}
/// With default (all-zero) metrics, `/status` output must not contain an
/// `Orchestration:` section.
///
/// The transcript is first required to be non-empty, so the negative
/// assertion cannot pass vacuously when `/status` produced nothing.
#[tokio::test]
async fn status_command_hides_orchestration_when_no_plans() {
    let provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec!["/status".to_string()]);
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();
    // The receiver is bound to a name so it stays alive for the whole test.
    let (tx, _rx) = watch::channel(MetricsSnapshot::default());
    let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_metrics(tx);

    let result = agent.run().await;
    assert!(result.is_ok());

    let messages = sent.lock().unwrap();
    // Guard: an empty transcript would trivially lack the section.
    assert!(!messages.is_empty(), "expected /status output");
    let output = messages.join("\n");
    assert!(
        !output.contains("Orchestration:"),
        "Orchestration: section must be absent when no plans ran; got: {output}"
    );
}
/// A single `/exit` input makes `run` return `Ok` with exactly one message
/// left in the agent's history.
#[tokio::test]
async fn exit_command_breaks_run_loop() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec!["/exit".to_string()]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    assert!(agent.run().await.is_ok());
    let history_len = agent.msg.messages.len();
    assert_eq!(history_len, 1, "expected only system message");
}
/// A single `/quit` input makes `run` return `Ok` with exactly one message
/// left in the agent's history.
#[tokio::test]
async fn quit_command_breaks_run_loop() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec!["/quit".to_string()]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    assert!(agent.run().await.is_ok());
    let history_len = agent.msg.messages.len();
    assert_eq!(history_len, 1, "expected only system message");
}
/// On a channel built with `without_exit_support()`, `/exit` produces an
/// "/exit is not supported" message and `run` still returns `Ok`.
#[tokio::test]
async fn exit_command_sends_info_and_continues_when_not_supported() {
    let provider = mock_provider(vec![]);
    let inputs = vec!["/exit".to_string()];
    let channel = MockChannel::new(inputs).without_exit_support();
    // Keep a handle on the channel's sent-message log before the agent takes the channel.
    let sent = channel.sent.clone();
    let mut agent = Agent::new(
        provider,
        channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    assert!(agent.run().await.is_ok());

    let messages = sent.lock().unwrap();
    let informed = messages
        .iter()
        .any(|line| line.contains("/exit is not supported"));
    assert!(informed, "expected info message, got: {messages:?}");
}
/// The slash-command table must not contain an entry named `/ingest`.
#[test]
fn slash_commands_registry_has_no_ingest() {
    use super::super::slash_commands::COMMANDS;

    let has_ingest = COMMANDS.iter().any(|cmd| cmd.name == "/ingest");
    assert!(
        !has_ingest,
        "/ingest is not implemented and must not appear in COMMANDS"
    );
}
/// Any `/graph` or `/plan` entry in the slash-command table must have
/// `feature_gate` set to `None`.
#[test]
fn slash_commands_graph_and_plan_have_no_feature_gate() {
    use super::super::slash_commands::COMMANDS;

    COMMANDS
        .iter()
        .filter(|cmd| cmd.name == "/graph" || cmd.name == "/plan")
        .for_each(|cmd| {
            assert!(
                cmd.feature_gate.is_none(),
                "{} should have feature_gate: None",
                cmd.name
            );
        });
}
/// A bare `/skill` input sends an "Unknown /skill subcommand" message and
/// leaves no assistant-role message in the history.
#[tokio::test]
async fn bare_skill_command_does_not_invoke_llm() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec!["/skill".to_string()]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    assert!(agent.run().await.is_ok());

    let sent = agent.channel.sent_messages();
    let usage_sent = sent
        .iter()
        .any(|line| line.contains("Unknown /skill subcommand"));
    assert!(usage_sent, "bare /skill must send usage; got: {sent:?}");

    let has_assistant_turn = agent
        .msg
        .messages
        .iter()
        .any(|entry| entry.role == Role::Assistant);
    assert!(
        !has_assistant_turn,
        "bare /skill must not produce an assistant message; messages: {:?}",
        agent.msg.messages
    );
}
/// A bare `/feedback` input sends a "Usage: /feedback" message and leaves no
/// assistant-role message in the history.
#[tokio::test]
async fn bare_feedback_command_does_not_invoke_llm() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec!["/feedback".to_string()]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    assert!(agent.run().await.is_ok());

    let sent = agent.channel.sent_messages();
    let usage_sent = sent.iter().any(|line| line.contains("Usage: /feedback"));
    assert!(usage_sent, "bare /feedback must send usage; got: {sent:?}");

    let has_assistant_turn = agent
        .msg
        .messages
        .iter()
        .any(|entry| entry.role == Role::Assistant);
    assert!(
        !has_assistant_turn,
        "bare /feedback must not produce an assistant message; messages: {:?}",
        agent.msg.messages
    );
}
/// A bare `/image` input sends a "Usage: /image <path>" message and leaves no
/// assistant-role message in the history.
#[tokio::test]
async fn bare_image_command_sends_usage() {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec!["/image".to_string()]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    assert!(agent.run().await.is_ok());

    let sent = agent.channel.sent_messages();
    let usage_sent = sent
        .iter()
        .any(|line| line.contains("Usage: /image <path>"));
    assert!(usage_sent, "bare /image must send usage; got: {sent:?}");

    let has_assistant_turn = agent
        .msg
        .messages
        .iter()
        .any(|entry| entry.role == Role::Assistant);
    assert!(
        !has_assistant_turn,
        "bare /image must not produce an assistant message; messages: {:?}",
        agent.msg.messages
    );
}
#[tokio::test]
async fn feedback_positive_records_user_approval() {
let provider = mock_provider(vec![]);
let memory =
SemanticMemory::new(":memory:", "http://127.0.0.1:1", provider.clone(), "test")
.await
.unwrap();
let cid = memory.sqlite().create_conversation().await.unwrap();
let memory = std::sync::Arc::new(memory);
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = MockToolExecutor::no_tools();
let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
memory.clone(),
cid,
50,
5,
50,
);
agent
.handle_feedback("git great job, works perfectly")
.await
.unwrap();
let row: Option<(String,)> =
sqlx::query_as("SELECT outcome FROM skill_outcomes WHERE skill_name = 'git' LIMIT 1")
.fetch_optional(memory.sqlite().pool())
.await
.unwrap();
assert_eq!(
row.map(|r| r.0).as_deref(),
Some("user_approval"),
"positive feedback must be recorded as user_approval"
);
}
#[tokio::test]
async fn feedback_negative_records_user_rejection() {
let provider = mock_provider(vec![]);
let memory =
SemanticMemory::new(":memory:", "http://127.0.0.1:1", provider.clone(), "test")
.await
.unwrap();
let cid = memory.sqlite().create_conversation().await.unwrap();
let memory = std::sync::Arc::new(memory);
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = MockToolExecutor::no_tools();
let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
memory.clone(),
cid,
50,
5,
50,
);
agent
.handle_feedback("git that was wrong, bad output")
.await
.unwrap();
let row: Option<(String,)> =
sqlx::query_as("SELECT outcome FROM skill_outcomes WHERE skill_name = 'git' LIMIT 1")
.fetch_optional(memory.sqlite().pool())
.await
.unwrap();
assert_eq!(
row.map(|r| r.0).as_deref(),
Some("user_rejection"),
"negative feedback must be recorded as user_rejection"
);
}
#[tokio::test]
async fn feedback_neutral_records_user_approval() {
let provider = mock_provider(vec![]);
let memory =
SemanticMemory::new(":memory:", "http://127.0.0.1:1", provider.clone(), "test")
.await
.unwrap();
let cid = memory.sqlite().create_conversation().await.unwrap();
let memory = std::sync::Arc::new(memory);
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = MockToolExecutor::no_tools();
let mut agent = Agent::new(provider, channel, registry, None, 5, executor).with_memory(
memory.clone(),
cid,
50,
5,
50,
);
agent.handle_feedback("git ok").await.unwrap();
let row: Option<(String,)> =
sqlx::query_as("SELECT outcome FROM skill_outcomes WHERE skill_name = 'git' LIMIT 1")
.fetch_optional(memory.sqlite().pool())
.await
.unwrap();
assert_eq!(
row.map(|r| r.0).as_deref(),
Some("user_approval"),
"neutral/ambiguous feedback must be recorded as user_approval"
);
}
/// `QuickTestAgent::minimal` yields an agent whose history is non-empty and
/// starts with a system-role message.
#[test]
fn agent_test_harness_minimal_constructs_agent() {
    let harness = QuickTestAgent::minimal("hello from mock");
    let history = &harness.agent.msg.messages;
    assert!(!history.is_empty());
    assert_eq!(history[0].role, Role::System);
}
/// `QuickTestAgent::with_responses` yields an agent with a non-empty history.
#[test]
fn agent_test_harness_with_responses_constructs_agent() {
    let responses = vec!["first".into(), "second".into()];
    let harness = QuickTestAgent::with_responses(responses);
    assert!(!harness.agent.msg.messages.is_empty());
}
/// A freshly built harness has sent nothing to its channel.
#[test]
fn agent_test_harness_sent_messages_initially_empty() {
    let harness = QuickTestAgent::minimal("response");
    let sent = harness.sent_messages();
    assert!(sent.is_empty());
}
}
#[cfg(test)]
mod compaction_e2e {
use super::agent_tests::*;
use zeph_llm::LlmError;
use zeph_llm::any::AnyProvider;
use zeph_llm::mock::MockProvider;
use zeph_llm::provider::{Message, MessageMetadata, Role};
/// With a mock provider configured with one `ContextLengthExceeded` error and
/// one response, `call_llm_with_retry(2)` must return that response and
/// forward it to the channel.
#[tokio::test]
async fn agent_recovers_from_context_length_exceeded_and_produces_response() {
    let mock = MockProvider::with_responses(vec!["final answer".into()])
        .with_errors(vec![LlmError::ContextLengthExceeded]);
    let provider = AnyProvider::Mock(mock);
    let mut agent = super::super::Agent::new(
        provider,
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_context_budget(200_000, 0.20, 0.80, 4, 0);

    let user_turn = Message {
        role: Role::User,
        content: "describe the architecture".into(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    };
    agent.msg.messages.push(user_turn);

    let result = agent.call_llm_with_retry(2).await.unwrap();
    assert!(
        result.is_some(),
        "agent must produce a response after recovering from context length error"
    );
    assert_eq!(result.as_deref(), Some("final answer"));

    let sent = agent.channel.sent_messages();
    let forwarded = sent.iter().any(|line| line.contains("final answer"));
    assert!(
        forwarded,
        "recovered response must be forwarded to the channel; got: {sent:?}"
    );
}
/// Spawns a background sub-agent, polls its status until it completes, then collects its output.
///
/// The sub-agent id is recovered from the spawn acknowledgement, which must mention the agent
/// name, the word "started" and an `id: ` marker. The status poll gives up after five seconds.
#[tokio::test]
async fn subagent_spawn_text_collect_e2e() {
    use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
    use crate::subagent::hooks::SubagentHooks;
    use crate::subagent::{AgentCommand, SubAgentDef, SubAgentManager};

    let provider = mock_provider(vec!["task completed successfully".into()]);
    let channel = MockChannel::new(vec![]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();
    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);

    // Register one "worker" definition limited to a single turn.
    let mut mgr = SubAgentManager::new(4);
    mgr.definitions_mut().push(SubAgentDef {
        name: "worker".into(),
        description: "A worker bot".into(),
        model: None,
        tools: ToolPolicy::InheritAll,
        disallowed_tools: vec![],
        permissions: SubAgentPermissions {
            max_turns: 1,
            ..SubAgentPermissions::default()
        },
        skills: SkillFilter::default(),
        system_prompt: "You are a worker.".into(),
        hooks: SubagentHooks::default(),
        memory: None,
        source: None,
        file_path: None,
    });
    agent.orchestration.subagent_manager = Some(mgr);

    let spawn_resp = agent
        .handle_agent_command(AgentCommand::Background {
            name: "worker".into(),
            prompt: "do a task".into(),
        })
        .await
        .expect("Background spawn must return Some");
    assert!(
        spawn_resp.contains("worker"),
        "spawn response must mention agent name; got: {spawn_resp}"
    );
    assert!(
        spawn_resp.contains("started"),
        "spawn response must confirm start; got: {spawn_resp}"
    );

    // Trim whitespace both before and after removing the closing parenthesis. Stripping ')'
    // first is a no-op when the response ends in whitespace (e.g. a trailing newline), which
    // would leave ')' inside the id and make the prefix lookup below spin until the timeout.
    let short_id = spawn_resp
        .split("id: ")
        .nth(1)
        .expect("response must contain 'id: '")
        .trim()
        .trim_end_matches(')')
        .trim()
        .to_string();
    assert!(!short_id.is_empty(), "short_id must not be empty");

    // Poll the manager until the sub-agent whose id starts with `short_id` is Completed.
    let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5);
    let full_id = loop {
        let mgr = agent.orchestration.subagent_manager.as_ref().unwrap();
        let statuses = mgr.statuses();
        let found = statuses.iter().find(|(id, _)| id.starts_with(&short_id));
        if let Some((id, status)) = found {
            match status.state {
                crate::subagent::SubAgentState::Completed => break id.clone(),
                crate::subagent::SubAgentState::Failed => {
                    panic!(
                        "sub-agent reached Failed state unexpectedly: {:?}",
                        status.last_message
                    );
                }
                // Any other state: keep polling.
                _ => {}
            }
        }
        assert!(
            std::time::Instant::now() <= deadline,
            "sub-agent did not complete within timeout"
        );
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    };

    let result = agent
        .orchestration
        .subagent_manager
        .as_mut()
        .unwrap()
        .collect(&full_id)
        .await
        .expect("collect must succeed for completed sub-agent");
    assert!(
        result.contains("task completed successfully"),
        "collected result must contain sub-agent output; got: {result:?}"
    );
}
/// A foreground sub-agent whose first scripted reply is a secret request must run to completion
/// when the channel is scripted to confirm once.
#[tokio::test]
async fn foreground_spawn_secret_bridge_approves() {
    use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
    use crate::subagent::hooks::SubagentHooks;
    use crate::subagent::{AgentCommand, SubAgentDef, SubAgentManager};

    let scripted = mock_provider(vec![
        "[REQUEST_SECRET: api-key]".into(),
        "done with secret".into(),
    ]);
    let approving_channel = MockChannel::new(vec![]).with_confirmations(vec![true]);
    let mut agent = Agent::new(
        scripted,
        approving_channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );

    // Two turns: one for the secret request, one for the final answer.
    let vault_permissions = SubAgentPermissions {
        max_turns: 2,
        secrets: vec!["api-key".into()],
        ..SubAgentPermissions::default()
    };
    let vault_bot = SubAgentDef {
        name: "vault-bot".into(),
        description: "A bot that requests secrets".into(),
        model: None,
        tools: ToolPolicy::InheritAll,
        disallowed_tools: vec![],
        permissions: vault_permissions,
        skills: SkillFilter::default(),
        system_prompt: "You need a secret.".into(),
        hooks: SubagentHooks::default(),
        memory: None,
        source: None,
        file_path: None,
    };
    let mut manager = SubAgentManager::new(4);
    manager.definitions_mut().push(vault_bot);
    agent.orchestration.subagent_manager = Some(manager);

    let spawn = AgentCommand::Spawn {
        name: "vault-bot".into(),
        prompt: "fetch the api key".into(),
    };
    let resp: String = agent
        .handle_agent_command(spawn)
        .await
        .expect("Spawn must return Some");

    assert!(
        resp.contains("vault-bot"),
        "response must mention agent name; got: {resp}"
    );
    assert!(
        resp.contains("completed"),
        "sub-agent must complete successfully; got: {resp}"
    );
}
use crate::orchestration::{
GraphStatus, PlanCommand, TaskGraph, TaskNode, TaskResult, TaskStatus,
};
/// Builds an agent over empty mocks (no scripted responses, no inbound messages, no tools)
/// with orchestration switched on.
fn agent_with_orchestration() -> Agent<MockChannel> {
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    agent.orchestration.orchestration_config.enabled = true;
    agent
}
/// Creates a one-task graph ("test goal" / "task-0") carrying the given graph `status`.
///
/// The task status is `Pending` for a `Created` graph, `Ready` for a `Running` graph and
/// `Completed` for every other graph status. `Running` and `Completed` graphs additionally get
/// a canned "done" result attached to the task.
fn make_simple_graph(status: GraphStatus) -> TaskGraph {
    let mut graph = TaskGraph::new("test goal");
    let mut only_task = TaskNode::new(0, "task-0", "do something");

    only_task.status = match status {
        GraphStatus::Created => TaskStatus::Pending,
        GraphStatus::Running => TaskStatus::Ready,
        // Includes `Completed`, so no separate assignment is needed for it below.
        _ => TaskStatus::Completed,
    };

    if matches!(status, GraphStatus::Running | GraphStatus::Completed) {
        only_task.result = Some(TaskResult {
            output: "done".into(),
            artifacts: vec![],
            duration_ms: 0,
            agent_id: None,
            agent_def: None,
        });
    }

    graph.tasks.push(only_task);
    graph.status = status;
    graph
}
/// Confirming a plan without a sub-agent manager must keep the pending graph and send a
/// message mentioning "sub-agent".
#[tokio::test]
async fn plan_confirm_no_manager_restores_graph() {
    let mut agent = agent_with_orchestration();
    agent.orchestration.pending_graph = Some(make_simple_graph(GraphStatus::Created));

    let confirmed = agent.handle_plan_command(PlanCommand::Confirm).await;
    confirmed.unwrap();

    let still_pending = agent.orchestration.pending_graph.is_some();
    assert!(
        still_pending,
        "graph must be restored when no manager configured"
    );

    let msgs = agent.channel.sent_messages();
    let mentions_subagent = msgs.iter().any(|m| m.contains("sub-agent"));
    assert!(
        mentions_subagent,
        "must send fallback message; got: {msgs:?}"
    );
}
/// Confirming when nothing is pending must tell the user there is no pending plan.
#[tokio::test]
async fn plan_confirm_no_pending_graph_sends_message() {
    let mut agent = agent_with_orchestration();

    let confirmed = agent.handle_plan_command(PlanCommand::Confirm).await;
    confirmed.unwrap();

    let msgs = agent.channel.sent_messages();
    let reported = msgs.iter().any(|m| m.contains("No pending plan"));
    assert!(
        reported,
        "must send 'No pending plan' message; got: {msgs:?}"
    );
}
/// Confirming a graph whose only task is already completed must send the scripted synthesis
/// text to the user and clear `pending_graph`.
#[tokio::test]
async fn plan_confirm_completed_graph_aggregates() {
    use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
    use crate::subagent::hooks::SubagentHooks;
    use crate::subagent::{SubAgentDef, SubAgentManager};

    let mut agent = Agent::new(
        mock_provider(vec!["synthesis result".into()]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    agent.orchestration.orchestration_config.enabled = true;

    let worker = SubAgentDef {
        name: "worker".into(),
        description: "A worker".into(),
        model: None,
        tools: ToolPolicy::InheritAll,
        disallowed_tools: vec![],
        permissions: SubAgentPermissions::default(),
        skills: SkillFilter::default(),
        system_prompt: "You are helpful.".into(),
        hooks: SubagentHooks::default(),
        memory: None,
        source: None,
        file_path: None,
    };
    let mut manager = SubAgentManager::new(4);
    manager.definitions_mut().push(worker);
    agent.orchestration.subagent_manager = Some(manager);

    // Graph is marked Running while its single task is already Completed with a result.
    let mut graph = TaskGraph::new("test goal");
    let mut finished = TaskNode::new(0, "task-0", "already done");
    finished.status = TaskStatus::Completed;
    finished.result = Some(TaskResult {
        output: "task output".into(),
        artifacts: vec![],
        duration_ms: 10,
        agent_id: None,
        agent_def: None,
    });
    graph.tasks.push(finished);
    graph.status = GraphStatus::Running;
    agent.orchestration.pending_graph = Some(graph);

    let confirmed = agent.handle_plan_command(PlanCommand::Confirm).await;
    confirmed.unwrap();

    let msgs = agent.channel.sent_messages();
    let synthesized = msgs.iter().any(|m| m.contains("synthesis result"));
    assert!(
        synthesized,
        "aggregation synthesis must be sent to user; got: {msgs:?}"
    );
    assert!(
        agent.orchestration.pending_graph.is_none(),
        "pending_graph must be cleared after Completed"
    );
}
/// With a failing provider and a manager that has no definitions, confirming a plan must
/// result in a message containing "failed" or "Failed".
#[tokio::test]
async fn plan_confirm_inline_provider_failure_sends_message() {
    use crate::subagent::SubAgentManager;

    let mut agent = Agent::new(
        mock_provider_failing(),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    agent.orchestration.orchestration_config.enabled = true;
    agent.orchestration.subagent_manager = Some(SubAgentManager::new(4));

    let mut graph = TaskGraph::new("failing inline goal");
    graph
        .tasks
        .push(TaskNode::new(0, "task-0", "will fail inline"));
    graph.status = GraphStatus::Created;
    agent.orchestration.pending_graph = Some(graph);

    let confirmed = agent.handle_plan_command(PlanCommand::Confirm).await;
    confirmed.unwrap();

    let msgs = agent.channel.sent_messages();
    let failure_reported = msgs
        .iter()
        .any(|m| m.contains("failed") || m.contains("Failed"));
    assert!(
        failure_reported,
        "failure message must be sent after inline provider error; got: {msgs:?}"
    );
}
/// Listing plans while a `Created` graph is pending must report "awaiting confirmation".
#[tokio::test]
async fn plan_list_with_pending_graph_shows_summary() {
    let mut agent = agent_with_orchestration();
    let pending = make_simple_graph(GraphStatus::Created);
    agent.orchestration.pending_graph = Some(pending);

    let listed = agent.handle_plan_command(PlanCommand::List).await;
    listed.unwrap();

    let msgs = agent.channel.sent_messages();
    let awaiting = msgs.iter().any(|m| m.contains("awaiting confirmation"));
    assert!(
        awaiting,
        "must show 'awaiting confirmation' status; got: {msgs:?}"
    );
}
/// Listing plans with nothing pending must report "No recent plans".
#[tokio::test]
async fn plan_list_no_graph_shows_no_recent() {
    let mut agent = agent_with_orchestration();

    let listed = agent.handle_plan_command(PlanCommand::List).await;
    listed.unwrap();

    let msgs = agent.channel.sent_messages();
    let none_recent = msgs.iter().any(|m| m.contains("No recent plans"));
    assert!(none_recent, "must show 'No recent plans'; got: {msgs:?}");
}
/// `PlanCommand::Retry(None)` on a failed graph must reset both the failed task and a task
/// still marked Running to `Ready`, and clear the latter's `assigned_agent`.
#[tokio::test]
async fn plan_retry_resets_running_tasks_to_ready() {
    let mut agent = agent_with_orchestration();

    let mut graph = TaskGraph::new("retry test");

    let mut broken = TaskNode::new(0, "failed-task", "desc");
    broken.status = TaskStatus::Failed;

    // A task left in Running with an agent handle from an earlier run.
    let mut stale = TaskNode::new(1, "stale-task", "desc");
    stale.status = TaskStatus::Running;
    stale.assigned_agent = Some("old-handle-id".into());

    graph.tasks.push(broken);
    graph.tasks.push(stale);
    graph.status = GraphStatus::Failed;
    agent.orchestration.pending_graph = Some(graph);

    let retried = agent.handle_plan_command(PlanCommand::Retry(None)).await;
    retried.unwrap();

    let after_retry = agent
        .orchestration
        .pending_graph
        .as_ref()
        .expect("graph must be present after retry");
    let tasks = &after_retry.tasks;

    assert_eq!(
        tasks[0].status,
        TaskStatus::Ready,
        "failed task must be reset to Ready"
    );
    assert_eq!(
        tasks[1].status,
        TaskStatus::Ready,
        "stale Running task must be reset to Ready"
    );
    assert!(
        tasks[1].assigned_agent.is_none(),
        "assigned_agent must be cleared for stale Running task"
    );
}
/// A secret request that is already queued on a sub-agent handle when the plan completes
/// immediately must not be left behind after `PlanCommand::Confirm`.
///
/// The test installs a hand-built, already-`Completed` handle with one queued `SecretRequest`,
/// confirms a graph whose only task is already completed, and then checks that
/// `try_recv_secret_request` yields nothing.
#[tokio::test]
async fn test_secret_drain_after_instant_completion() {
use crate::subagent::def::{SkillFilter, SubAgentPermissions, ToolPolicy};
use crate::subagent::hooks::SubagentHooks;
use crate::subagent::{
PermissionGrants, SecretRequest, SubAgentDef, SubAgentHandle, SubAgentManager,
SubAgentState, SubAgentStatus,
};
use tokio_util::sync::CancellationToken;
// The channel is scripted with a single positive confirmation.
let channel = MockChannel::new(vec![]).with_confirmations(vec![true]);
let provider = mock_provider(vec!["synthesis".into()]);
let registry = create_test_registry();
let executor = MockToolExecutor::no_tools();
let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
agent.orchestration.orchestration_config.enabled = true;
let mut mgr = SubAgentManager::new(4);
mgr.definitions_mut().push(SubAgentDef {
name: "worker".into(),
description: "A worker".into(),
model: None,
tools: ToolPolicy::InheritAll,
disallowed_tools: vec![],
permissions: SubAgentPermissions::default(),
skills: SkillFilter::default(),
system_prompt: "You are helpful.".into(),
hooks: SubagentHooks::default(),
memory: None,
source: None,
file_path: None,
});
// Channels that back the fake handle: the request receiver and the status watch are moved
// into the handle below; the secret reply receiver is kept alive but never read.
let (secret_request_tx, pending_secret_rx) = tokio::sync::mpsc::channel::<SecretRequest>(4);
let (secret_tx, _secret_rx) = tokio::sync::mpsc::channel(1);
let (status_tx, status_rx) = watch::channel(SubAgentStatus {
state: SubAgentState::Completed,
last_message: None,
turns_used: 1,
started_at: std::time::Instant::now(),
});
// The status sender is dropped right away, so the watch keeps its initial Completed value.
drop(status_tx);
// Queue exactly one request, then close the sending side before the handle is installed.
secret_request_tx
.send(SecretRequest {
secret_key: "api-key".into(),
reason: Some("test drain".into()),
})
.await
.expect("channel must accept request");
drop(secret_request_tx);
// Install a handle directly (no task is spawned: `join_handle` is None).
let fake_handle_id = "deadbeef-0000-0000-0000-000000000001".to_owned();
let def_clone = mgr.definitions()[0].clone();
mgr.insert_handle_for_test(
fake_handle_id.clone(),
SubAgentHandle {
id: fake_handle_id.clone(),
def: def_clone,
task_id: fake_handle_id.clone(),
state: SubAgentState::Completed,
join_handle: None,
cancel: CancellationToken::new(),
status_rx,
grants: PermissionGrants::default(),
pending_secret_rx,
secret_tx,
started_at_str: "2026-01-01T00:00:00Z".to_owned(),
transcript_dir: None,
},
);
agent.orchestration.subagent_manager = Some(mgr);
// Graph marked Running whose single task is already Completed with a result.
let mut graph = TaskGraph::new("instant goal");
let mut node = TaskNode::new(0, "task-0", "already done");
node.status = TaskStatus::Completed;
node.result = Some(TaskResult {
output: "task output".into(),
artifacts: vec![],
duration_ms: 1,
agent_id: None,
agent_def: None,
});
graph.tasks.push(node);
graph.status = GraphStatus::Running;
agent.orchestration.pending_graph = Some(graph);
agent
.handle_plan_command(PlanCommand::Confirm)
.await
.unwrap();
// Anything still receivable here means the queued request was not consumed during Confirm.
let leftover = agent
.orchestration
.subagent_manager
.as_mut()
.and_then(SubAgentManager::try_recv_secret_request);
assert!(
leftover.is_none(),
"pending secret request must be drained after instant plan completion; \
got: {leftover:?}"
);
}
/// With a manager that has no definitions, confirming a plan must clear `pending_graph` and
/// send the second scripted provider response ("synthesis done") to the user.
#[tokio::test]
async fn plan_confirm_no_subagents_executes_inline() {
    use crate::subagent::SubAgentManager;

    let scripted = mock_provider(vec!["inline task output".into(), "synthesis done".into()]);
    let mut agent = Agent::new(
        scripted,
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    agent.orchestration.orchestration_config.enabled = true;
    agent.orchestration.subagent_manager = Some(SubAgentManager::new(4));

    let mut graph = TaskGraph::new("inline goal");
    graph
        .tasks
        .push(TaskNode::new(0, "task-0", "do something inline"));
    graph.status = GraphStatus::Created;
    agent.orchestration.pending_graph = Some(graph);

    let confirmed = agent.handle_plan_command(PlanCommand::Confirm).await;
    confirmed.unwrap();

    assert!(
        agent.orchestration.pending_graph.is_none(),
        "pending_graph must be cleared after inline plan completion"
    );

    let msgs = agent.channel.sent_messages();
    let synthesized = msgs.iter().any(|m| m.contains("synthesis done"));
    assert!(
        synthesized,
        "aggregation synthesis must appear in messages; got: {msgs:?}"
    );
}
/// When the channel delivers "/plan cancel", `run_scheduler_loop` must return
/// `GraphStatus::Canceled` and a "Canceling plan" status must have been sent.
#[tokio::test]
async fn plan_cancel_during_scheduler_loop_cancels_plan() {
    use crate::orchestration::{DagScheduler, OrchestrationConfig, RuleBasedRouter};
    use crate::subagent::SubAgentManager;

    let cancelling_channel = MockChannel::new(vec!["/plan cancel".to_owned()]);
    let mut agent = Agent::new(
        mock_provider(vec![]),
        cancelling_channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    agent.orchestration.orchestration_config.enabled = true;
    agent.orchestration.subagent_manager = Some(SubAgentManager::new(4));

    // One task already marked Running inside a Running graph.
    let mut graph = TaskGraph::new("cancel test goal");
    let mut in_flight = TaskNode::new(0, "task-0", "will be canceled");
    in_flight.status = TaskStatus::Running;
    graph.tasks.push(in_flight);
    graph.status = GraphStatus::Running;

    let scheduler_config = OrchestrationConfig {
        enabled: true,
        ..OrchestrationConfig::default()
    };
    let router = Box::new(RuleBasedRouter);
    let mut scheduler =
        DagScheduler::resume_from(graph, &scheduler_config, router, vec![]).unwrap();

    let cancel_token = tokio_util::sync::CancellationToken::new();
    let outcome = agent
        .run_scheduler_loop(&mut scheduler, 1, cancel_token)
        .await
        .unwrap();
    assert_eq!(
        outcome,
        GraphStatus::Canceled,
        "run_scheduler_loop must return Canceled when /plan cancel is received"
    );

    let announced = agent
        .channel
        .statuses
        .lock()
        .unwrap()
        .iter()
        .any(|s| s.contains("Canceling plan"));
    assert!(
        announced,
        "must send 'Canceling plan...' status before processing cancel"
    );
}
/// Finalizing a canceled plan (one completed task, one canceled task) must send a cancellation
/// message containing "1/2", leave `pending_graph` empty and count the completed task in metrics.
#[tokio::test]
async fn finalize_plan_execution_canceled_does_not_store_graph() {
    use crate::subagent::SubAgentManager;

    let (metrics_tx, metrics_rx) = watch::channel(MetricsSnapshot::default());
    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_metrics(metrics_tx);
    agent.orchestration.orchestration_config.enabled = true;
    agent.orchestration.subagent_manager = Some(SubAgentManager::new(4));

    let mut graph = TaskGraph::new("cancel finalize test");

    let mut done = TaskNode::new(0, "task-done", "finished");
    done.status = TaskStatus::Completed;
    done.result = Some(TaskResult {
        output: "done".into(),
        artifacts: vec![],
        duration_ms: 10,
        agent_id: None,
        agent_def: None,
    });

    let mut aborted = TaskNode::new(1, "task-canceled", "was running");
    aborted.status = TaskStatus::Canceled;

    graph.tasks.push(done);
    graph.tasks.push(aborted);
    graph.status = GraphStatus::Canceled;

    let finalized = agent
        .finalize_plan_execution(graph, GraphStatus::Canceled)
        .await;
    finalized.unwrap();

    let msgs = agent.channel.sent_messages();
    let cancel_reported = msgs
        .iter()
        .any(|m| m.contains("canceled") || m.contains("Canceled"));
    assert!(
        cancel_reported,
        "must send a cancellation message; got: {msgs:?}"
    );
    let ratio_reported = msgs.iter().any(|m| m.contains("1/2"));
    assert!(
        ratio_reported,
        "must report completed task count (1/2); got: {msgs:?}"
    );
    assert!(
        agent.orchestration.pending_graph.is_none(),
        "canceled plan must NOT be stored in pending_graph"
    );

    let metrics = metrics_rx.borrow().clone();
    assert_eq!(
        metrics.orchestration.tasks_completed, 1,
        "tasks completed before cancellation must be counted in metrics"
    );
}
/// A non-cancel inbound message ("hello") received while the scheduler loop runs must end up
/// as the single entry of `message_queue`.
#[tokio::test]
async fn scheduler_loop_queues_non_cancel_message() {
    use crate::orchestration::{DagScheduler, OrchestrationConfig, RuleBasedRouter};
    use crate::subagent::SubAgentManager;

    let chatty_channel = MockChannel::new(vec!["hello".to_owned()]);
    let mut agent = Agent::new(
        mock_provider(vec![]),
        chatty_channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    agent.orchestration.orchestration_config.enabled = true;
    agent.orchestration.subagent_manager = Some(SubAgentManager::new(4));

    let mut graph = TaskGraph::new("queue test goal");
    let mut in_flight = TaskNode::new(0, "task-0", "long running task");
    in_flight.status = TaskStatus::Running;
    graph.tasks.push(in_flight);
    graph.status = GraphStatus::Running;

    let scheduler_config = OrchestrationConfig {
        enabled: true,
        ..OrchestrationConfig::default()
    };
    let router = Box::new(RuleBasedRouter);
    let mut scheduler =
        DagScheduler::resume_from(graph, &scheduler_config, router, vec![]).unwrap();

    // The returned graph status is irrelevant here; only the queue contents are checked.
    let cancel_token = tokio_util::sync::CancellationToken::new();
    let _ = agent
        .run_scheduler_loop(&mut scheduler, 1, cancel_token)
        .await
        .unwrap();

    let queued_texts = agent
        .msg
        .message_queue
        .iter()
        .map(|m| &m.text)
        .collect::<Vec<_>>();
    assert_eq!(
        agent.msg.message_queue.len(),
        1,
        "non-cancel message must be queued in message_queue; got: {:?}",
        queued_texts
    );
    assert_eq!(
        agent.msg.message_queue[0].text, "hello",
        "queued message text must match the received message"
    );
}
/// With a channel that has no inbound messages, `run_scheduler_loop` must return
/// `GraphStatus::Failed` for a graph whose task is still Running.
#[tokio::test]
async fn scheduler_loop_channel_close_returns_failed() {
    use crate::orchestration::{DagScheduler, OrchestrationConfig, RuleBasedRouter};
    use crate::subagent::SubAgentManager;

    let silent_channel = MockChannel::new(vec![]);
    let mut agent = Agent::new(
        mock_provider(vec![]),
        silent_channel,
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    agent.orchestration.orchestration_config.enabled = true;
    agent.orchestration.subagent_manager = Some(SubAgentManager::new(4));

    let mut graph = TaskGraph::new("channel close test goal");
    let mut in_flight = TaskNode::new(0, "task-0", "will fail on channel close");
    in_flight.status = TaskStatus::Running;
    graph.tasks.push(in_flight);
    graph.status = GraphStatus::Running;

    let scheduler_config = OrchestrationConfig {
        enabled: true,
        ..OrchestrationConfig::default()
    };
    let router = Box::new(RuleBasedRouter);
    let mut scheduler =
        DagScheduler::resume_from(graph, &scheduler_config, router, vec![]).unwrap();

    let cancel_token = tokio_util::sync::CancellationToken::new();
    let outcome = agent
        .run_scheduler_loop(&mut scheduler, 1, cancel_token)
        .await
        .unwrap();
    assert_eq!(
        outcome,
        GraphStatus::Failed,
        "run_scheduler_loop must return Failed when channel is closed (Ok(None))"
    );
}
#[tokio::test]
async fn plan_status_reflects_graph_status() {
let mut agent = agent_with_orchestration();
agent
.handle_plan_command(PlanCommand::Status(None))
.await
.unwrap();
let msgs = agent.channel.sent_messages();
assert!(
msgs.iter().any(|m| m.contains("No active plan")),
"no plan → 'No active plan'; got: {msgs:?}"
);
let mut agent = agent_with_orchestration();
agent.orchestration.pending_graph = Some(make_simple_graph(GraphStatus::Created));
agent
.handle_plan_command(PlanCommand::Status(None))
.await
.unwrap();
let msgs = agent.channel.sent_messages();
assert!(
msgs.iter().any(|m| m.contains("awaiting confirmation")),
"Created graph → 'awaiting confirmation'; got: {msgs:?}"
);
let mut agent = agent_with_orchestration();
let mut failed_graph = make_simple_graph(GraphStatus::Created);
failed_graph.status = GraphStatus::Failed;
agent.orchestration.pending_graph = Some(failed_graph);
agent
.handle_plan_command(PlanCommand::Status(None))
.await
.unwrap();
let msgs = agent.channel.sent_messages();
assert!(
msgs.iter()
.any(|m| m.contains("failed") || m.contains("Failed")),
"Failed graph → failure message; got: {msgs:?}"
);
let mut agent = agent_with_orchestration();
let mut paused_graph = make_simple_graph(GraphStatus::Created);
paused_graph.status = GraphStatus::Paused;
agent.orchestration.pending_graph = Some(paused_graph);
agent
.handle_plan_command(PlanCommand::Status(None))
.await
.unwrap();
let msgs = agent.channel.sent_messages();
assert!(
msgs.iter()
.any(|m| m.contains("paused") || m.contains("Paused")),
"Paused graph → paused message; got: {msgs:?}"
);
let mut agent = agent_with_orchestration();
let mut completed_graph = make_simple_graph(GraphStatus::Created);
completed_graph.status = GraphStatus::Completed;
agent.orchestration.pending_graph = Some(completed_graph);
agent
.handle_plan_command(PlanCommand::Status(None))
.await
.unwrap();
let msgs = agent.channel.sent_messages();
assert!(
msgs.iter()
.any(|m| m.contains("completed") || m.contains("Completed")),
"Completed graph → completed message; got: {msgs:?}"
);
}
/// Finalizing a `Failed` graph in which every task is `Canceled` must say "Plan canceled"
/// with a "2/2" count and must not emit "0/2 tasks failed".
#[tokio::test]
async fn finalize_plan_execution_deadlock_emits_cancelled_message() {
    use crate::subagent::SubAgentManager;

    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    agent.orchestration.orchestration_config.enabled = true;
    agent.orchestration.subagent_manager = Some(SubAgentManager::new(4));

    let mut graph = TaskGraph::new("deadlock goal");
    let mut upstream = TaskNode::new(0, "upstream", "will be blocked");
    upstream.status = TaskStatus::Canceled;
    let mut downstream = TaskNode::new(1, "downstream", "never ran");
    downstream.status = TaskStatus::Canceled;
    graph.tasks.push(upstream);
    graph.tasks.push(downstream);
    graph.status = GraphStatus::Failed;

    let finalized = agent
        .finalize_plan_execution(graph, GraphStatus::Failed)
        .await;
    finalized.unwrap();

    let msgs = agent.channel.sent_messages();
    let no_misleading_failure = msgs.iter().all(|m| !m.contains("0/2 tasks failed"));
    assert!(
        no_misleading_failure,
        "misleading '0/2 tasks failed' message must not appear; got: {msgs:?}"
    );
    let says_canceled = msgs.iter().any(|m| m.contains("Plan canceled"));
    assert!(
        says_canceled,
        "must contain 'Plan canceled' for pure deadlock; got: {msgs:?}"
    );
    let counts_all = msgs.iter().any(|m| m.contains("2/2"));
    assert!(counts_all, "must report 2/2 canceled; got: {msgs:?}");
}
/// `PlanCommand::Goal` with a provider that returns a valid one-task plan must bump
/// `api_calls`, `plans_total` and `tasks_total` to 1 in the published metrics snapshot.
#[tokio::test]
async fn plan_goal_increments_api_calls_and_plans_total() {
    let valid_plan_json = r#"{"tasks": [
{"task_id": "step-one", "title": "Step one", "description": "Do step one", "depends_on": []}
]}"#
    .to_string();

    let (metrics_tx, metrics_rx) = watch::channel(MetricsSnapshot::default());
    let mut agent = Agent::new(
        mock_provider(vec![valid_plan_json]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_metrics(metrics_tx);

    let orchestration_config = &mut agent.orchestration.orchestration_config;
    orchestration_config.enabled = true;
    orchestration_config.confirm_before_execute = true;

    let goal = PlanCommand::Goal("build something".to_owned());
    agent.handle_plan_command(goal).await.unwrap();

    let metrics = metrics_rx.borrow().clone();
    assert_eq!(
        metrics.api_calls, 1,
        "api_calls must be incremented by 1 after a successful plan() call; got: {}",
        metrics.api_calls
    );
    assert_eq!(
        metrics.orchestration.plans_total, 1,
        "plans_total must be incremented by 1 after plan() succeeds; got: {}",
        metrics.orchestration.plans_total
    );
    assert_eq!(
        metrics.orchestration.tasks_total, 1,
        "tasks_total must match the number of tasks in the plan; got: {}",
        metrics.orchestration.tasks_total
    );
}
/// Finalizing a `Completed` graph with one completed and one skipped task must record one API
/// call, one completed task and one skipped task in the metrics snapshot.
#[tokio::test]
async fn finalize_plan_execution_completed_increments_aggregator_metrics() {
    use crate::subagent::SubAgentManager;

    let (metrics_tx, metrics_rx) = watch::channel(MetricsSnapshot::default());
    let mut agent = Agent::new(
        mock_provider(vec!["synthesis".into()]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_metrics(metrics_tx);
    agent.orchestration.orchestration_config.enabled = true;
    agent.orchestration.subagent_manager = Some(SubAgentManager::new(4));

    let mut graph = TaskGraph::new("metrics finalize test");

    let mut done = TaskNode::new(0, "task-done", "desc");
    done.status = TaskStatus::Completed;
    done.result = Some(TaskResult {
        output: "ok".into(),
        artifacts: vec![],
        duration_ms: 5,
        agent_id: None,
        agent_def: None,
    });

    let mut passed_over = TaskNode::new(1, "task-skip", "desc");
    passed_over.status = TaskStatus::Skipped;

    graph.tasks.push(done);
    graph.tasks.push(passed_over);
    graph.status = GraphStatus::Completed;

    let finalized = agent
        .finalize_plan_execution(graph, GraphStatus::Completed)
        .await;
    finalized.unwrap();

    let metrics = metrics_rx.borrow().clone();
    assert_eq!(
        metrics.api_calls, 1,
        "api_calls must be incremented by 1 for the aggregator LLM call; got: {}",
        metrics.api_calls
    );
    assert_eq!(
        metrics.orchestration.tasks_completed, 1,
        "tasks_completed must be 1; got: {}",
        metrics.orchestration.tasks_completed
    );
    assert_eq!(
        metrics.orchestration.tasks_skipped, 1,
        "tasks_skipped must be 1; got: {}",
        metrics.orchestration.tasks_skipped
    );
}
/// Finalizing a `Failed` graph holding one failed and one canceled task must say "Plan failed",
/// mention canceled tasks and name the failed task.
#[tokio::test]
async fn finalize_plan_execution_mixed_failed_and_cancelled() {
    use crate::subagent::SubAgentManager;

    let mut agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    agent.orchestration.orchestration_config.enabled = true;
    agent.orchestration.subagent_manager = Some(SubAgentManager::new(4));

    let mut graph = TaskGraph::new("mixed goal");

    let mut broken = TaskNode::new(0, "failed-task", "really failed");
    broken.status = TaskStatus::Failed;
    broken.result = Some(TaskResult {
        output: "error: something went wrong".into(),
        artifacts: vec![],
        duration_ms: 100,
        agent_id: None,
        agent_def: None,
    });

    let mut never_ran = TaskNode::new(1, "cancelled-task", "never ran");
    never_ran.status = TaskStatus::Canceled;

    graph.tasks.push(broken);
    graph.tasks.push(never_ran);
    graph.status = GraphStatus::Failed;

    let finalized = agent
        .finalize_plan_execution(graph, GraphStatus::Failed)
        .await;
    finalized.unwrap();

    let msgs = agent.channel.sent_messages();
    let says_failed = msgs.iter().any(|m| m.contains("Plan failed"));
    assert!(
        says_failed,
        "mixed state must say 'Plan failed'; got: {msgs:?}"
    );
    let mentions_canceled = msgs.iter().any(|m| m.contains("canceled"));
    assert!(
        mentions_canceled,
        "must mention canceled tasks in mixed state; got: {msgs:?}"
    );
    let names_failed_task = msgs.iter().any(|m| m.contains("failed-task"));
    assert!(
        names_failed_task,
        "must list the failed task; got: {msgs:?}"
    );
}
}
#[cfg(test)]
mod secret_reason_truncation {
    /// Builds the confirmation prompt used by these tests: the secret key is passed through
    /// `truncate_to_chars(_, 100)` and the optional reason through `truncate_to_chars(_, 200)`.
    fn build_prompt(secret_key: &str, reason: Option<&str>) -> String {
        let shown_key = crate::text::truncate_to_chars(secret_key, 100);
        let reason_suffix = match reason {
            Some(r) => format!(" Reason: {}", crate::text::truncate_to_chars(r, 200)),
            None => String::new(),
        };
        format!(
            "Sub-agent requests secret '{}'. Allow?{}",
            shown_key, reason_suffix
        )
    }

    /// Returns the text that follows the first "Reason: " marker in `prompt`.
    fn reason_part(prompt: &str) -> &str {
        prompt.split("Reason: ").nth(1).unwrap()
    }

    /// Returns the secret key as rendered between the quotes in `prompt`.
    fn key_part(prompt: &str) -> &str {
        let after_quote = prompt.split("secret '").nth(1).unwrap();
        after_quote.split("'. Allow?").next().unwrap()
    }

    #[test]
    fn reason_short_ascii_unchanged() {
        let reason = "need access to external API";
        assert!(build_prompt("MY_SECRET", Some(reason)).contains(reason));
    }

    #[test]
    fn reason_over_200_chars_truncated_to_200() {
        let long_reason = "a".repeat(300);
        let prompt = build_prompt("MY_SECRET", Some(&long_reason));
        let shown = reason_part(&prompt);
        // 200 kept characters plus the trailing ellipsis.
        assert_eq!(shown.chars().count(), 201);
        assert!(shown.ends_with('\u{2026}'));
    }

    #[test]
    fn reason_exactly_200_chars_unchanged() {
        let exact_reason = "b".repeat(200);
        let prompt = build_prompt("MY_SECRET", Some(&exact_reason));
        let shown = reason_part(&prompt);
        assert_eq!(shown.chars().count(), 200);
        assert!(!shown.ends_with('\u{2026}'));
    }

    #[test]
    fn reason_multibyte_utf8_truncated_at_char_boundary() {
        let cyrillic_reason = "й".repeat(300);
        let prompt = build_prompt("MY_SECRET", Some(&cyrillic_reason));
        let shown = reason_part(&prompt);
        assert_eq!(shown.chars().count(), 201);
        assert!(shown.ends_with('\u{2026}'));
        assert!(std::str::from_utf8(shown.as_bytes()).is_ok());
    }

    #[test]
    fn reason_none_produces_no_reason_suffix() {
        let prompt = build_prompt("MY_SECRET", None);
        assert!(!prompt.contains("Reason:"));
        assert!(prompt.ends_with("Allow?"));
    }

    #[test]
    fn secret_key_short_unchanged() {
        assert!(build_prompt("MY_API_KEY", None).contains("MY_API_KEY"));
    }

    #[test]
    fn secret_key_over_100_chars_truncated() {
        let long_key = "A".repeat(150);
        let prompt = build_prompt(&long_key, None);
        let shown = key_part(&prompt);
        // 100 kept characters plus the trailing ellipsis.
        assert_eq!(shown.chars().count(), 101);
        assert!(shown.ends_with('\u{2026}'));
    }

    #[test]
    fn secret_key_exactly_100_chars_unchanged() {
        let exact_key = "B".repeat(100);
        let prompt = build_prompt(&exact_key, None);
        let shown = key_part(&prompt);
        assert_eq!(shown.chars().count(), 100);
        assert!(!shown.ends_with('\u{2026}'));
    }
}
#[cfg(test)]
mod inline_tool_loop_tests {
use std::sync::Mutex;
use zeph_llm::any::AnyProvider;
use zeph_llm::mock::MockProvider;
use zeph_llm::provider::{ChatResponse, ToolUseRequest};
use zeph_tools::executor::{ToolCall, ToolError, ToolExecutor, ToolOutput};
use super::super::Agent;
use super::agent_tests::{MockChannel, create_test_registry};
/// One scripted outcome handed back by a single `execute_tool_call` invocation.
type ScriptedOutcome = Result<Option<ToolOutput>, ToolError>;

/// `ToolExecutor` test double backed by a mutex-guarded list of scripted outcomes.
struct CallableToolExecutor {
    outputs: Mutex<Vec<ScriptedOutcome>>,
}
impl CallableToolExecutor {
fn new(outputs: Vec<Result<Option<ToolOutput>, ToolError>>) -> Self {
Self {
outputs: Mutex::new(outputs),
}
}
fn fixed_output(summary: &str) -> Self {
Self::new(vec![Ok(Some(ToolOutput {
tool_name: "test_tool".into(),
summary: summary.to_owned(),
blocks_executed: 1,
filter_stats: None,
diff: None,
streamed: false,
terminal_id: None,
locations: None,
raw_response: None,
}))])
}
fn failing() -> Self {
Self::new(vec![Err(ToolError::InvalidParams {
message: "tool failed".into(),
})])
}
}
impl ToolExecutor for CallableToolExecutor {
    /// Text-based execution is not scripted: always reports that no tool ran.
    async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
        Ok(None)
    }

    /// Pops and returns the oldest scripted outcome, or `Ok(None)` once the script is exhausted.
    async fn execute_tool_call(
        &self,
        _call: &ToolCall,
    ) -> Result<Option<ToolOutput>, ToolError> {
        let mut script = self.outputs.lock().unwrap();
        if script.is_empty() {
            return Ok(None);
        }
        script.remove(0)
    }
}
/// Builds a `ChatResponse::ToolUse` containing exactly one tool call with the
/// given id and name, a fixed `{"arg": "val"}` input, no text and no thinking
/// blocks.
fn tool_use_response(tool_id: &str, tool_name: &str) -> ChatResponse {
    let request = ToolUseRequest {
        id: tool_id.to_owned(),
        name: tool_name.to_owned(),
        input: serde_json::json!({"arg": "val"}),
    };
    ChatResponse::ToolUse {
        text: None,
        tool_calls: vec![request],
        thinking_blocks: vec![],
    }
}
/// When the provider's first reply is plain text, the inline tool loop must
/// return that text.
#[tokio::test]
async fn text_only_response_returns_immediately() {
    let scripted = vec![ChatResponse::Text("the answer".into())];
    let (mock, _counter) = MockProvider::default().with_tool_use(scripted);
    let agent = Agent::new(
        AnyProvider::Mock(mock),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        CallableToolExecutor::new(vec![]),
    );
    let outcome = agent.run_inline_tool_loop("what is 2+2?", 10).await;
    assert_eq!(outcome.unwrap(), "the answer");
}
/// One tool-use reply followed by a text reply: the loop must return the text
/// and the counter handed back by `with_tool_use` must read 2.
#[tokio::test]
async fn single_tool_iteration_returns_final_text() {
    let scripted = vec![
        tool_use_response("call-1", "test_tool"),
        ChatResponse::Text("done".into()),
    ];
    let (mock, counter) = MockProvider::default().with_tool_use(scripted);
    let agent = Agent::new(
        AnyProvider::Mock(mock),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        CallableToolExecutor::fixed_output("tool result"),
    );
    let outcome = agent.run_inline_tool_loop("run a tool", 10).await;
    assert_eq!(outcome.unwrap(), "done");
    let provider_calls = *counter.lock().unwrap();
    assert_eq!(provider_calls, 2);
}
/// With 25 scripted tool-use replies and an iteration cap of 5, the loop must
/// finish with `Ok` and the mock's counter must equal the cap.
#[tokio::test]
async fn loop_terminates_at_max_iterations() {
    let max_iter = 5usize;
    // Script far more tool-use replies than the cap allows.
    let mut responses: Vec<ChatResponse> = Vec::with_capacity(25);
    for i in 0..25 {
        responses.push(tool_use_response(&format!("call-{i}"), "test_tool"));
    }
    let (mock, counter) = MockProvider::default().with_tool_use(responses);
    let agent = Agent::new(
        AnyProvider::Mock(mock),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        CallableToolExecutor::fixed_output("ok"),
    );
    let outcome = agent.run_inline_tool_loop("loop forever", max_iter).await;
    assert!(outcome.is_ok());
    let expected_calls = u32::try_from(max_iter).unwrap();
    assert_eq!(*counter.lock().unwrap(), expected_calls);
}
/// A failing tool call must not abort the loop: the follow-up text reply from
/// the provider is still returned.
#[tokio::test]
async fn tool_error_produces_is_error_result_and_loop_continues() {
    let scripted = vec![
        tool_use_response("call-err", "test_tool"),
        ChatResponse::Text("recovered".into()),
    ];
    let (mock, _counter) = MockProvider::default().with_tool_use(scripted);
    let agent = Agent::new(
        AnyProvider::Mock(mock),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        CallableToolExecutor::failing(),
    );
    let outcome = agent.run_inline_tool_loop("trigger error", 10).await;
    assert_eq!(outcome.unwrap(), "recovered");
}
#[tokio::test]
async fn multiple_tool_iterations_before_text() {
let (mock, counter) = MockProvider::default().with_tool_use(vec![
tool_use_response("call-1", "test_tool"),
tool_use_response("call-2", "test_tool"),
ChatResponse::Text("all done".into()),
]);
let provider = AnyProvider::Mock(mock);
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = CallableToolExecutor::new(vec![
Ok(Some(ToolOutput {
tool_name: "test_tool".into(),
summary: "result-1".into(),
blocks_executed: 1,
filter_stats: None,
diff: None,
streamed: false,
terminal_id: None,
locations: None,
raw_response: None,
})),
Ok(Some(ToolOutput {
tool_name: "test_tool".into(),
summary: "result-2".into(),
blocks_executed: 1,
filter_stats: None,
diff: None,
streamed: false,
terminal_id: None,
locations: None,
raw_response: None,
})),
]);
let agent = Agent::new(provider, channel, registry, None, 5, executor);
let result = agent
.run_inline_tool_loop("two tools then answer", 10)
.await;
assert_eq!(result.unwrap(), "all done");
assert_eq!(*counter.lock().unwrap(), 3);
}
#[tokio::test]
async fn provider_error_is_propagated() {
let provider = AnyProvider::Mock(zeph_llm::mock::MockProvider::failing());
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = CallableToolExecutor::new(vec![]);
let agent = Agent::new(provider, channel, registry, None, 5, executor);
let result = agent.run_inline_tool_loop("this will fail", 10).await;
assert!(result.is_err());
}
}
#[cfg(test)]
mod confirmation_propagation_tests {
use std::collections::HashMap;
use std::sync::Mutex;
use zeph_llm::any::AnyProvider;
use zeph_llm::mock::MockProvider;
use zeph_llm::provider::{ChatResponse, ToolUseRequest};
use zeph_tools::executor::{ToolCall, ToolError, ToolExecutor, ToolOutput};
use zeph_tools::registry::{InvocationHint, ToolDef};
use super::super::Agent;
use super::agent_tests::{MockChannel, create_test_registry};
/// Tool executor stub whose scripted results are looked up by
/// `ToolCall::tool_id`; each entry is handed out at most once.
struct DagAwareToolExecutor {
    results: Mutex<HashMap<String, Result<Option<ToolOutput>, ToolError>>>,
}
impl DagAwareToolExecutor {
    /// Builds the lookup table from `(key, result)` pairs; a later pair with
    /// the same key replaces an earlier one.
    fn new(entries: Vec<(&str, Result<Option<ToolOutput>, ToolError>)>) -> Self {
        let mut by_key = HashMap::with_capacity(entries.len());
        for (key, outcome) in entries {
            by_key.insert(key.to_owned(), outcome);
        }
        Self {
            results: Mutex::new(by_key),
        }
    }
    /// Minimal successful output for `tool_name` with the given summary.
    fn make_output(tool_name: &str, summary: &str) -> ToolOutput {
        let tool_name = tool_name.to_owned();
        let summary = summary.to_owned();
        ToolOutput {
            tool_name,
            summary,
            blocks_executed: 1,
            filter_stats: None,
            diff: None,
            streamed: false,
            terminal_id: None,
            locations: None,
            raw_response: None,
        }
    }
}
impl ToolExecutor for DagAwareToolExecutor {
    /// Text-based execution is not scripted by this stub; always yields `Ok(None)`.
    async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
        Ok(None)
    }
    /// Advertises three tool-call tools: `tool_a`, `tool_b` and `tool_c`.
    fn tool_definitions(&self) -> Vec<ToolDef> {
        /// Tool-call definition with a default schema.
        fn definition(id: &'static str, description: &'static str) -> ToolDef {
            ToolDef {
                id: id.into(),
                description: description.into(),
                schema: schemars::Schema::default(),
                invocation: InvocationHint::ToolCall,
            }
        }
        vec![
            definition("tool_a", "tool a"),
            definition("tool_b", "tool b"),
            definition("tool_c", "tool c"),
        ]
    }
    /// Removes and returns the scripted result stored under `call.tool_id`,
    /// falling back to `Ok(None)` when nothing is scripted for it.
    async fn execute_tool_call(
        &self,
        call: &ToolCall,
    ) -> Result<Option<ToolOutput>, ToolError> {
        let mut scripted = self.results.lock().unwrap();
        match scripted.remove(&call.tool_id) {
            Some(outcome) => outcome,
            None => Ok(None),
        }
    }
    /// The confirmed path always succeeds with a "confirmed" summary.
    async fn execute_tool_call_confirmed(
        &self,
        call: &ToolCall,
    ) -> Result<Option<ToolOutput>, ToolError> {
        let output = Self::make_output(&call.tool_id, "confirmed");
        Ok(Some(output))
    }
}
/// Tool-use reply with two calls, `tool_a` then `tool_b`. The input of
/// `tool_b` carries the string "tool_a_id", i.e. the id of the first call
/// (presumably what marks it as dependent on `tool_a`; verify against the
/// agent's dependency detection).
fn dag_tool_use_response() -> ChatResponse {
    let first = ToolUseRequest {
        id: "tool_a_id".to_owned(),
        name: "tool_a".to_owned(),
        input: serde_json::json!({"arg": "value"}),
    };
    let dependent = ToolUseRequest {
        id: "tool_b_id".to_owned(),
        name: "tool_b".to_owned(),
        input: serde_json::json!({"source": "tool_a_id"}),
    };
    ChatResponse::ToolUse {
        text: None,
        tool_calls: vec![first, dependent],
        thinking_blocks: vec![],
    }
}
/// Tool-use reply with two calls, `tool_a` and `tool_c`, whose inputs do not
/// mention each other's ids.
fn independent_tool_use_response() -> ChatResponse {
    let first = ToolUseRequest {
        id: "tool_a_id".to_owned(),
        name: "tool_a".to_owned(),
        input: serde_json::json!({"arg": "value"}),
    };
    let unrelated = ToolUseRequest {
        id: "tool_c_id".to_owned(),
        name: "tool_c".to_owned(),
        input: serde_json::json!({"arg": "independent"}),
    };
    ChatResponse::ToolUse {
        text: None,
        tool_calls: vec![first, unrelated],
        thinking_blocks: vec![],
    }
}
/// When `tool_a` reports `ConfirmationRequired` and the channel answers the
/// confirmation with `false`, the synthetic "Skipped" notice must show up
/// either in the agent's message history or among the sent channel messages.
#[tokio::test]
async fn confirmation_required_propagates_to_dependent_tier() {
    /// True when `text` carries the synthetic skip notice.
    fn is_skip_notice(text: &str) -> bool {
        text.contains("Skipped: a prerequisite tool failed or requires confirmation")
    }

    let scripted = vec![dag_tool_use_response(), ChatResponse::Text("done".into())];
    let (mock, _counter) = MockProvider::default().with_tool_use(scripted);
    let provider = AnyProvider::Mock(mock);
    let channel = MockChannel::new(vec!["test".to_string()]).with_confirmations(vec![false]);
    let registry = create_test_registry();
    let needs_confirmation = Err(ToolError::ConfirmationRequired {
        command: "cmd".to_owned(),
    });
    let executor = DagAwareToolExecutor::new(vec![("tool_a", needs_confirmation)]);
    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let outcome = agent.run().await;
    assert!(outcome.is_ok());
    let has_skipped = agent
        .msg
        .messages
        .iter()
        .any(|m| is_skip_notice(&m.content));
    let sent = agent.channel.sent_messages();
    let has_skipped_in_sent = sent.iter().any(|m| is_skip_notice(m));
    assert!(
        has_skipped || has_skipped_in_sent,
        "expected synthetic skip message for tool_b; sent={sent:?}, agent_msgs={:?}",
        agent
            .msg
            .messages
            .iter()
            .map(|m| &m.content)
            .collect::<Vec<_>>()
    );
}
/// `tool_a` reporting `ConfirmationRequired` must not prevent the unrelated
/// `tool_c` from running: its "independent result" summary has to appear in
/// the agent's message history.
#[tokio::test]
async fn independent_tool_not_affected_by_confirmation_required() {
    let scripted = vec![
        independent_tool_use_response(),
        ChatResponse::Text("done".into()),
    ];
    let (mock, _counter) = MockProvider::default().with_tool_use(scripted);
    let provider = AnyProvider::Mock(mock);
    let channel = MockChannel::new(vec!["test".to_string()]).with_confirmations(vec![false]);
    let registry = create_test_registry();
    let needs_confirmation = Err(ToolError::ConfirmationRequired {
        command: "cmd".to_owned(),
    });
    let independent_ok = Ok(Some(DagAwareToolExecutor::make_output(
        "tool_c",
        "independent result",
    )));
    let executor = DagAwareToolExecutor::new(vec![
        ("tool_a", needs_confirmation),
        ("tool_c", independent_ok),
    ]);
    let mut agent = Agent::new(provider, channel, registry, None, 5, executor);
    let outcome = agent.run().await;
    assert!(outcome.is_ok());
    let has_independent_result = agent
        .msg
        .messages
        .iter()
        .any(|m| m.content.contains("independent result"));
    assert!(
        has_independent_result,
        "expected tool_c (independent) to execute normally; agent_msgs={:?}",
        agent
            .msg
            .messages
            .iter()
            .map(|m| &m.content)
            .collect::<Vec<_>>()
    );
}
}
#[cfg(test)]
mod shutdown_summary_tests {
use zeph_llm::any::AnyProvider;
use zeph_llm::mock::MockProvider;
use zeph_llm::provider::{Message, MessageMetadata, Role};
use zeph_memory::semantic::SemanticMemory;
use super::super::Agent;
use super::agent_tests::{MockChannel, MockToolExecutor, create_test_registry, mock_provider};
/// With the shutdown summary switched off, `maybe_store_shutdown_summary`
/// must leave the provider's recording empty even though five user messages
/// are present.
#[tokio::test]
async fn shutdown_summary_disabled_skips_llm() {
    let (mock, recorded) = MockProvider::default().with_recording();
    let mut agent = Agent::new(
        AnyProvider::Mock(mock),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_shutdown_summary_config(false, 4, 20, 10);
    agent.msg.messages.extend((0..5).map(|i| Message {
        role: Role::User,
        content: format!("user message {i}"),
        parts: vec![],
        metadata: MessageMetadata::default(),
    }));
    agent.maybe_store_shutdown_summary().await;
    let llm_untouched = recorded.lock().unwrap().is_empty();
    assert!(
        llm_untouched,
        "LLM must not be called when shutdown_summary is disabled"
    );
}
#[tokio::test]
async fn shutdown_summary_no_memory_skips_llm() {
let (mock, recorded) = MockProvider::default().with_recording();
let provider = AnyProvider::Mock(mock);
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = MockToolExecutor::no_tools();
let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
.with_shutdown_summary_config(true, 4, 20, 10);
for i in 0..5 {
agent.msg.messages.push(Message {
role: Role::User,
content: format!("user message {i}"),
parts: vec![],
metadata: MessageMetadata::default(),
});
}
agent.maybe_store_shutdown_summary().await;
assert!(
recorded.lock().unwrap().is_empty(),
"LLM must not be called when no memory backend is attached"
);
}
/// With memory attached and the summary enabled (min_messages = 4), a history
/// holding only two user messages must not trigger an LLM call.
#[tokio::test]
async fn shutdown_summary_too_few_user_messages_skips_llm() {
    use std::sync::Arc;
    let (mock, recorded) = MockProvider::default().with_recording();
    // Move the recording mock into the agent instead of cloning it: `mock` is
    // not needed afterwards, and this way `recorded` is guaranteed to observe
    // the very provider instance the agent talks to.
    let provider = AnyProvider::Mock(mock);
    let channel = MockChannel::new(vec![]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();
    let memory = SemanticMemory::new(
        ":memory:",
        "http://127.0.0.1:1",
        AnyProvider::Mock(MockProvider::default()),
        "test-model",
    )
    .await
    .unwrap();
    let cid = memory.sqlite().create_conversation().await.unwrap();
    let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
        .with_memory(Arc::new(memory), cid, 100, 5, 1000)
        .with_shutdown_summary_config(true, 4, 20, 10);
    // user / assistant / user: two user-role messages, below the threshold of 4.
    agent.msg.messages.push(Message {
        role: Role::User,
        content: "first user message".into(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    });
    agent.msg.messages.push(Message {
        role: Role::Assistant,
        content: "assistant reply".into(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    });
    agent.msg.messages.push(Message {
        role: Role::User,
        content: "second user message".into(),
        parts: vec![],
        metadata: MessageMetadata::default(),
    });
    agent.maybe_store_shutdown_summary().await;
    assert!(
        recorded.lock().unwrap().is_empty(),
        "LLM must not be called when user message count is below min_messages"
    );
}
#[tokio::test]
async fn shutdown_summary_only_counts_user_role_messages() {
use std::sync::Arc;
let (mock, recorded) = MockProvider::default().with_recording();
let provider = AnyProvider::Mock(mock);
let channel = MockChannel::new(vec![]);
let registry = create_test_registry();
let executor = MockToolExecutor::no_tools();
let memory = SemanticMemory::new(
":memory:",
"http://127.0.0.1:1",
AnyProvider::Mock(MockProvider::default()),
"test-model",
)
.await
.unwrap();
let cid = memory.sqlite().create_conversation().await.unwrap();
let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
.with_memory(Arc::new(memory), cid, 100, 5, 1000)
.with_shutdown_summary_config(true, 4, 20, 10);
for _ in 0..8 {
agent.msg.messages.push(Message {
role: Role::Assistant,
content: "assistant reply".into(),
parts: vec![],
metadata: MessageMetadata::default(),
});
}
for i in 0..3 {
agent.msg.messages.push(Message {
role: Role::User,
content: format!("user message {i}"),
parts: vec![],
metadata: MessageMetadata::default(),
});
}
agent.maybe_store_shutdown_summary().await;
assert!(
recorded.lock().unwrap().is_empty(),
"assistant messages must not count toward min_messages threshold"
);
}
/// `with_shutdown_summary_config(false, 7, 15, 10)` must store each argument
/// in the matching `memory_state` field.
#[tokio::test]
async fn with_shutdown_summary_config_builder_sets_fields() {
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    )
    .with_shutdown_summary_config(false, 7, 15, 10);
    let state = &agent.memory_state;
    assert!(!state.shutdown_summary);
    assert_eq!(state.shutdown_summary_min_messages, 7);
    assert_eq!(state.shutdown_summary_max_messages, 15);
    assert_eq!(state.shutdown_summary_timeout_secs, 10);
}
/// A freshly constructed agent must have the shutdown summary enabled with
/// min_messages = 4, max_messages = 20 and timeout_secs = 10.
#[tokio::test]
async fn shutdown_summary_default_config_values() {
    let agent = Agent::new(
        mock_provider(vec![]),
        MockChannel::new(vec![]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    let state = &agent.memory_state;
    assert!(
        state.shutdown_summary,
        "shutdown_summary must be enabled by default"
    );
    assert_eq!(
        state.shutdown_summary_min_messages, 4,
        "default min_messages must be 4"
    );
    assert_eq!(
        state.shutdown_summary_max_messages, 20,
        "default max_messages must be 20"
    );
    assert_eq!(
        state.shutdown_summary_timeout_secs, 10,
        "default timeout_secs must be 10"
    );
}
/// Scripts `DOOM_LOOP_WINDOW + 1` tool-use replies (each with a distinct id
/// and input) and expects the agent to finish with `Ok` after sending the
/// doom-loop stopping message to the channel.
#[tokio::test]
async fn doom_loop_agent_breaks_on_identical_native_tool_outputs() {
    use super::super::DOOM_LOOP_WINDOW;
    use zeph_llm::mock::MockProvider;
    use zeph_llm::provider::{ChatResponse, ToolUseRequest};

    let mut tool_responses: Vec<ChatResponse> = Vec::new();
    for i in 0..=DOOM_LOOP_WINDOW {
        let request = ToolUseRequest {
            id: format!("toolu_{i:06}"),
            name: "stub_tool".to_owned(),
            input: serde_json::json!({ "iteration": i }),
        };
        tool_responses.push(ChatResponse::ToolUse {
            text: None,
            tool_calls: vec![request],
            thinking_blocks: vec![],
        });
    }
    let (mock, _counter) = MockProvider::default().with_tool_use(tool_responses);
    let mut agent = Agent::new(
        AnyProvider::Mock(mock),
        MockChannel::new(vec!["trigger doom loop".to_owned()]),
        create_test_registry(),
        None,
        5,
        MockToolExecutor::no_tools(),
    );
    let outcome = agent.run().await;
    assert!(
        outcome.is_ok(),
        "agent must not return an error on doom loop"
    );
    let sent = agent.channel.sent_messages();
    let stop_notice_sent = sent
        .iter()
        .any(|m| m.contains("Stopping: detected repeated identical tool outputs."));
    assert!(
        stop_notice_sent,
        "agent must send the doom-loop stopping message; got: {sent:?}"
    );
}
/// Runs one `shell` tool call whose output carries `FilterStats`
/// (400 raw chars / 200 filtered chars) and checks that the published
/// metrics snapshot reflects the filter application.
#[tokio::test]
async fn filter_stats_metrics_increment_on_normal_native_tool_path() {
    use crate::metrics::MetricsSnapshot;
    use tokio::sync::watch;
    use zeph_llm::mock::MockProvider;
    use zeph_llm::provider::{ChatResponse, ToolUseRequest};
    use zeph_tools::executor::{FilterStats, ToolCall, ToolError, ToolExecutor, ToolOutput};

    /// Executor whose every tool call returns the same filtered `shell` output.
    struct FilteredToolExecutor;
    impl ToolExecutor for FilteredToolExecutor {
        async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
            Ok(None)
        }
        async fn execute_tool_call(
            &self,
            _call: &ToolCall,
        ) -> Result<Option<ToolOutput>, ToolError> {
            let stats = FilterStats {
                raw_chars: 400,
                filtered_chars: 200,
                raw_lines: 20,
                filtered_lines: 10,
                confidence: None,
                command: None,
                kept_lines: vec![],
            };
            let output = ToolOutput {
                tool_name: "shell".to_owned(),
                summary: "filtered output".to_owned(),
                blocks_executed: 1,
                filter_stats: Some(stats),
                diff: None,
                streamed: false,
                terminal_id: None,
                locations: None,
                raw_response: None,
            };
            Ok(Some(output))
        }
    }

    let shell_request = ToolUseRequest {
        id: "call-1".to_owned(),
        name: "shell".to_owned(),
        input: serde_json::json!({"cmd": "ls"}),
    };
    let scripted = vec![
        ChatResponse::ToolUse {
            text: None,
            tool_calls: vec![shell_request],
            thinking_blocks: vec![],
        },
        ChatResponse::Text("done".to_owned()),
    ];
    let (mock, _counter) = MockProvider::default().with_tool_use(scripted);
    let provider = AnyProvider::Mock(mock);
    let channel = MockChannel::new(vec!["run a tool".to_owned()]);
    let registry = create_test_registry();
    let (tx, rx) = watch::channel(MetricsSnapshot::default());
    let mut agent = Agent::new(provider, channel, registry, None, 5, FilteredToolExecutor)
        .with_metrics(tx);
    agent.run().await.expect("agent run must succeed");
    // Take an owned copy of the latest snapshot published on the watch channel.
    let snap: MetricsSnapshot = rx.borrow().clone();
    assert!(
        snap.filter_applications > 0,
        "filter_applications must be > 0"
    );
    assert!(snap.filter_raw_tokens > 0, "filter_raw_tokens must be > 0");
    assert!(
        snap.filter_saved_tokens > 0,
        "filter_saved_tokens must be > 0"
    );
    assert_eq!(snap.filter_total_commands, 1);
    assert_eq!(snap.filter_filtered_commands, 1);
}
/// Issues two tool calls in one reply with learning enabled and an active
/// skill: the first executed call returns an "[error] ..." summary, the other
/// returns output carrying `FilterStats`. The filter metrics must still be
/// recorded for the second tool.
#[tokio::test]
async fn filter_stats_metrics_recorded_in_self_reflection_remaining_tools_loop() {
    use crate::config::LearningConfig;
    use crate::metrics::MetricsSnapshot;
    use std::sync::Mutex;
    use tokio::sync::watch;
    use zeph_llm::mock::MockProvider;
    use zeph_llm::provider::{ChatResponse, ToolUseRequest};
    use zeph_tools::executor::{FilterStats, ToolCall, ToolError, ToolExecutor, ToolOutput};

    /// Output of the failing `tool_a` call (no filter stats).
    fn failed_output() -> ToolOutput {
        ToolOutput {
            tool_name: "tool_a".to_owned(),
            summary: "[error] command failed [exit code 1]".to_owned(),
            blocks_executed: 1,
            filter_stats: None,
            diff: None,
            streamed: false,
            terminal_id: None,
            locations: None,
            raw_response: None,
        }
    }

    /// Output of the `tool_b` call, carrying filter stats (400 -> 200 chars).
    fn filtered_output() -> ToolOutput {
        let stats = FilterStats {
            raw_chars: 400,
            filtered_chars: 200,
            raw_lines: 20,
            filtered_lines: 10,
            confidence: None,
            command: None,
            kept_lines: vec![],
        };
        ToolOutput {
            tool_name: "tool_b".to_owned(),
            summary: "filtered output".to_owned(),
            blocks_executed: 1,
            filter_stats: Some(stats),
            diff: None,
            streamed: false,
            terminal_id: None,
            locations: None,
            raw_response: None,
        }
    }

    /// Executor that counts invocations to tell the first call from later ones.
    struct TwoToolExecutor {
        call_count: Mutex<u32>,
    }
    impl ToolExecutor for TwoToolExecutor {
        async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
            Ok(None)
        }
        async fn execute_tool_call(
            &self,
            call: &ToolCall,
        ) -> Result<Option<ToolOutput>, ToolError> {
            // Bump the counter inside its own scope so the guard is released
            // before the output is built.
            let call_number = {
                let mut counter = self.call_count.lock().unwrap();
                *counter += 1;
                *counter
            };
            // NOTE(review): "tool_a_id" is the request id scripted below; confirm
            // that `ToolCall::tool_id` carries the request id rather than the tool
            // name, otherwise only the `call_number == 1` arm can ever fire.
            let is_failing_call = call_number == 1 || call.tool_id == "tool_a_id";
            let output = if is_failing_call {
                failed_output()
            } else {
                filtered_output()
            };
            Ok(Some(output))
        }
    }

    let request_a = ToolUseRequest {
        id: "tool_a_id".to_owned(),
        name: "tool_a".to_owned(),
        input: serde_json::json!({}),
    };
    let request_b = ToolUseRequest {
        id: "tool_b_id".to_owned(),
        name: "tool_b".to_owned(),
        input: serde_json::json!({}),
    };
    let (mock, _counter) = MockProvider::with_responses(vec!["reflection ok".to_owned()])
        .with_tool_use(vec![ChatResponse::ToolUse {
            text: None,
            tool_calls: vec![request_a, request_b],
            thinking_blocks: vec![],
        }]);
    let provider = AnyProvider::Mock(mock);
    let channel = MockChannel::new(vec!["run tools".to_owned()]);
    let registry = create_test_registry();
    let executor = TwoToolExecutor {
        call_count: Mutex::new(0),
    };
    let (tx, rx) = watch::channel(MetricsSnapshot::default());
    let learning = LearningConfig {
        enabled: true,
        ..LearningConfig::default()
    };
    let mut agent = Agent::new(provider, channel, registry, None, 5, executor)
        .with_metrics(tx)
        .with_learning(learning);
    agent
        .skill_state
        .active_skill_names
        .push("test-skill".to_owned());
    agent.run().await.expect("agent run must succeed");
    let snap: MetricsSnapshot = rx.borrow().clone();
    assert!(
        snap.filter_applications > 0,
        "filter_applications must be > 0 after remaining-tools loop processes tool_b"
    );
    assert!(
        snap.filter_raw_tokens > 0,
        "filter_raw_tokens must be > 0 after remaining-tools loop processes tool_b"
    );
    assert!(
        snap.filter_saved_tokens > 0,
        "filter_saved_tokens must be > 0 after remaining-tools loop processes tool_b"
    );
}
/// With `LearningConfig::enabled` off but `correction_detection` on, a
/// rejection phrase must still be persisted as exactly one
/// "explicit_rejection" correction row.
#[tokio::test]
async fn correction_stored_when_learning_disabled() {
    use crate::config::LearningConfig;
    use std::sync::Arc;
    use zeph_llm::any::AnyProvider;
    use zeph_llm::mock::MockProvider;
    use zeph_memory::semantic::SemanticMemory;

    let memory_provider = AnyProvider::Mock(MockProvider::default());
    let memory: Arc<SemanticMemory> = Arc::new(
        SemanticMemory::new(
            ":memory:",
            "http://127.0.0.1:1",
            memory_provider,
            "test-model",
        )
        .await
        .expect("in-memory SQLite must init"),
    );
    let agent_provider = mock_provider(vec![]);
    let channel = MockChannel::new(vec![]);
    let registry = create_test_registry();
    let executor = MockToolExecutor::no_tools();
    let conv_id = memory.sqlite().create_conversation().await.unwrap();
    let learning = LearningConfig {
        enabled: false,
        correction_detection: true,
        ..LearningConfig::default()
    };
    let mut agent = Agent::new(agent_provider, channel, registry, None, 5, executor)
        .with_learning(learning)
        .with_memory(Arc::clone(&memory), conv_id, 20, 5, 10);
    agent
        .detect_and_record_corrections("no that's wrong", Some(conv_id))
        .await;
    // Read back through the shared handle to verify what was persisted.
    let rows = memory.sqlite().load_recent_corrections(10).await.unwrap();
    assert_eq!(
        rows.len(),
        1,
        "correction must be stored even when learning is disabled"
    );
    let stored = &rows[0];
    assert_eq!(stored.correction_kind, "explicit_rejection");
    assert_eq!(stored.correction_text, "no that's wrong");
}
}