use phi_core::agent_loop::{agent_loop, AgentLoopConfig};
use phi_core::agents::SubAgentTool;
use phi_core::provider::mock::*;
use phi_core::provider::{MockProvider, ModelConfig};
use phi_core::{LlmMessage, *};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
/// Builds an [`AgentLoopConfig`] for tests that drive `agent_loop` directly.
///
/// `provider` is installed as `provider_override`. Thinking is off, the
/// response format is plain text, cache / tool-execution / retry settings use
/// their defaults, there are no input filters, and the first turn is triggered
/// by the user. Every remaining field is left as `None`.
fn make_config(provider: Arc<dyn phi_core::provider::StreamProvider>) -> AgentLoopConfig {
    AgentLoopConfig {
        // Fields that carry a concrete value.
        model_config: ModelConfig::anthropic("mock", "mock", "test"),
        provider_override: Some(provider),
        thinking_level: ThinkingLevel::Off,
        response_format: phi_core::provider::ResponseFormat::Text,
        cache_config: CacheConfig::default(),
        tool_execution: ToolExecutionStrategy::default(),
        retry_config: phi_core::RetryConfig::default(),
        input_filters: Vec::new(),
        first_turn_trigger: TurnTrigger::User,

        // Everything below is deliberately left unset.
        max_tokens: None,
        temperature: None,
        convert_to_llm: None,
        transform_context: None,
        get_steering_messages: None,
        get_follow_up_messages: None,
        context_config: None,
        execution_limits: None,
        tool_timeout: None,
        before_turn: None,
        after_turn: None,
        on_error: None,
        before_loop: None,
        after_loop: None,
        before_tool_execution: None,
        after_tool_execution: None,
        before_tool_execution_update: None,
        after_tool_execution_update: None,
        before_compaction_start: None,
        after_compaction_end: None,
        config_id: None,
        context_translation: None,
        prun_pending: None,
    }
}
/// Drains every event that is already buffered in `rx` without awaiting.
///
/// Collection stops at the first `try_recv` error, so the returned vector
/// holds exactly the events that were queued at call time, in arrival order.
fn collect_events(mut rx: mpsc::UnboundedReceiver<AgentEvent>) -> Vec<AgentEvent> {
    std::iter::from_fn(|| rx.try_recv().ok()).collect()
}
/// Executes a sub-agent backed by a single-text mock provider and checks that
/// the tool result carries that text and that `details["sub_agent"]` holds
/// the sub-agent's name.
#[tokio::test]
async fn test_sub_agent_basic() {
    let provider = Arc::new(MockProvider::text("Research result: Rust is great"));
    let researcher = SubAgentTool::new("researcher", ModelConfig::anthropic("mock", "mock", "test"))
        .with_provider_override(provider)
        .with_description("Researches topics")
        .with_system_prompt("You are a research assistant.");

    let task = serde_json::json!({"task": "Tell me about Rust"});
    let ctx = ToolContext {
        tool_call_id: "tc-1".into(),
        tool_name: "researcher".into(),
        cancel: CancellationToken::new(),
        on_update: None,
        on_progress: None,
    };
    let outcome = researcher
        .execute(task, ctx)
        .await
        .expect("sub-agent should succeed");

    // The first content item must be text.
    let text = if let Content::Text { text } = &outcome.content[0] {
        text.as_str()
    } else {
        panic!("Expected text content")
    };
    assert_eq!(text, "Research result: Rust is great");
    assert_eq!(outcome.details["sub_agent"], "researcher");
}
/// Minimal [`AgentTool`] for the sub-agent tests: it answers with its `text`
/// argument prefixed by `echoed: `.
struct EchoTool;

#[async_trait::async_trait]
impl AgentTool for EchoTool {
    fn name(&self) -> &str {
        "echo"
    }

    fn label(&self) -> &str {
        "Echo"
    }

    fn description(&self) -> &str {
        "Echoes input"
    }

    /// JSON schema: an object with a single string property `text`.
    fn parameters_schema(&self) -> serde_json::Value {
        serde_json::json!({
            "type": "object",
            "properties": {
                "text": {"type": "string"}
            }
        })
    }

    /// Returns one text content item of the form `echoed: <text>`.
    ///
    /// When `text` is absent or not a JSON string, `(empty)` is echoed
    /// instead. This implementation never returns an error.
    async fn execute(
        &self,
        params: serde_json::Value,
        _ctx: ToolContext,
    ) -> Result<ToolResult, ToolError> {
        let input = params
            .get("text")
            .and_then(|value| value.as_str())
            .unwrap_or("(empty)");
        let reply = Content::Text {
            text: ["echoed: ", input].concat(),
        };
        Ok(ToolResult {
            content: vec![reply],
            details: serde_json::Value::Null,
            child_loop_id: None,
        })
    }
}
/// Gives a sub-agent the `echo` tool and a mock provider scripted with a tool
/// call followed by a final text, then checks that the final text is what the
/// sub-agent returns.
#[tokio::test]
async fn test_sub_agent_with_tools() {
    // Scripted responses: first an `echo` tool call, then the closing text.
    let script = vec![
        MockResponse::ToolCalls(vec![MockToolCall {
            name: "echo".into(),
            arguments: serde_json::json!({"text": "hello"}),
        }]),
        MockResponse::Text("The echo returned: echoed: hello".into()),
    ];
    let provider = Arc::new(MockProvider::new(script));
    let tools: Vec<Arc<dyn AgentTool>> = vec![Arc::new(EchoTool)];

    let echo_agent = SubAgentTool::new("echo_agent", ModelConfig::anthropic("mock", "mock", "test"))
        .with_provider_override(provider)
        .with_description("Agent that echoes")
        .with_system_prompt("Use the echo tool.")
        .with_tools(tools);

    let ctx = ToolContext {
        tool_call_id: "tc-1".into(),
        tool_name: "echo_agent".into(),
        cancel: CancellationToken::new(),
        on_update: None,
        on_progress: None,
    };
    let outcome = echo_agent
        .execute(serde_json::json!({"task": "Echo hello"}), ctx)
        .await
        .expect("sub-agent should succeed");

    let text = if let Content::Text { text } = &outcome.content[0] {
        text.as_str()
    } else {
        panic!("Expected text content")
    };
    assert_eq!(text, "The echo returned: echoed: hello");
}
/// Executes a sub-agent with a token that is cancelled before the call.
///
/// The call is still expected to yield `Ok`, and the returned text must not be
/// the provider's scripted reply.
#[tokio::test]
async fn test_sub_agent_cancellation() {
    let provider = Arc::new(MockProvider::text("Should not appear"));
    let agent = SubAgentTool::new(
        "cancelled_agent",
        ModelConfig::anthropic("mock", "mock", "test"),
    )
    .with_provider_override(provider);

    // Cancel up front so the token is already tripped when `execute` starts.
    let token = CancellationToken::new();
    token.cancel();

    let ctx = ToolContext {
        tool_call_id: "tc-1".into(),
        tool_name: "cancelled_agent".into(),
        cancel: token,
        on_update: None,
        on_progress: None,
    };
    let outcome = agent
        .execute(serde_json::json!({"task": "Do something"}), ctx)
        .await
        .expect("should return a result even when cancelled");

    let text = if let Content::Text { text } = &outcome.content[0] {
        text.as_str()
    } else {
        panic!("Expected text content")
    };
    assert_ne!(
        text, "Should not appear",
        "Sub-agent ran despite cancellation"
    );
}
/// Limits a sub-agent to one turn via `with_max_turns(1)` while its provider
/// is scripted with a tool call followed by a text reply, and checks that the
/// returned text is not that second scripted reply.
#[tokio::test]
async fn test_sub_agent_max_turns() {
    let script = vec![
        MockResponse::ToolCalls(vec![MockToolCall {
            name: "echo".into(),
            arguments: serde_json::json!({"text": "loop"}),
        }]),
        MockResponse::Text("Should not reach".into()),
    ];
    let provider = Arc::new(MockProvider::new(script));
    let tools: Vec<Arc<dyn AgentTool>> = vec![Arc::new(EchoTool)];

    let agent = SubAgentTool::new(
        "limited_agent",
        ModelConfig::anthropic("mock", "mock", "test"),
    )
    .with_provider_override(provider)
    .with_tools(tools)
    .with_max_turns(1);

    let ctx = ToolContext {
        tool_call_id: "tc-1".into(),
        tool_name: "limited_agent".into(),
        cancel: CancellationToken::new(),
        on_update: None,
        on_progress: None,
    };
    let outcome = agent
        .execute(serde_json::json!({"task": "Keep going"}), ctx)
        .await
        .expect("sub-agent should succeed");

    let text = if let Content::Text { text } = &outcome.content[0] {
        text.as_str()
    } else {
        panic!("Expected text content")
    };
    assert_ne!(text, "Should not reach");
}
#[tokio::test]
async fn test_sub_agent_parallel() {
struct SlowProvider {
delay_ms: u64,
text: String,
}
#[async_trait::async_trait]
impl phi_core::provider::StreamProvider for SlowProvider {
fn provider_id(&self) -> &str {
"mock"
}
async fn stream(
&self,
_config: phi_core::provider::StreamConfig,
tx: tokio::sync::mpsc::UnboundedSender<phi_core::provider::StreamEvent>,
cancel: tokio_util::sync::CancellationToken,
) -> Result<Message, phi_core::provider::ProviderError> {
if cancel.is_cancelled() {
return Err(phi_core::provider::ProviderError::Cancelled);
}
tokio::time::sleep(std::time::Duration::from_millis(self.delay_ms)).await;
let _ = tx.send(phi_core::provider::StreamEvent::Start);
let _ = tx.send(phi_core::provider::StreamEvent::TextDelta {
content_index: 0,
delta: self.text.clone(),
});
let msg = Message::Assistant {
content: vec![Content::Text {
text: self.text.clone(),
}],
stop_reason: StopReason::Stop,
model: "slow".into(),
provider: "slow".into(),
usage: Usage::default(),
timestamp: phi_core::now_ms(),
error_message: None,
};
let _ = tx.send(phi_core::provider::StreamEvent::Done {
message: msg.clone(),
});
Ok(msg)
}
}
let sub_a = SubAgentTool::new("agent_a", ModelConfig::anthropic("slow", "slow", "test"))
.with_provider_override(Arc::new(SlowProvider {
delay_ms: 50,
text: "Result A".into(),
}));
let sub_b = SubAgentTool::new("agent_b", ModelConfig::anthropic("slow", "slow", "test"))
.with_provider_override(Arc::new(SlowProvider {
delay_ms: 50,
text: "Result B".into(),
}));
let parent_provider = MockProvider::new(vec![
MockResponse::ToolCalls(vec![
MockToolCall {
name: "agent_a".into(),
arguments: serde_json::json!({"task": "Do A"}),
},
MockToolCall {
name: "agent_b".into(),
arguments: serde_json::json!({"task": "Do B"}),
},
]),
MockResponse::Text("Both sub-agents completed.".into()),
]);
let config = make_config(Arc::new(parent_provider));
let mut context = AgentContext {
system_prompt: "You are a coordinator.".into(),
messages: Vec::new(),
tools: vec![Arc::new(sub_a), Arc::new(sub_b)],
agent_id: None,
session_id: None,
loop_id: None,
parent_loop_id: None,
continuation_kind: None,
session: None,
user_context: Vec::new(),
inrun_context: Vec::new(),
};
let prompt = AgentMessage::Llm(LlmMessage::new(Message::user("Run both agents")));
let (tx, rx) = mpsc::unbounded_channel();
let cancel = CancellationToken::new();
let start = std::time::Instant::now();
let new_messages = agent_loop(vec![prompt], &mut context, &config, tx, cancel).await;
let elapsed = start.elapsed();
let _events = collect_events(rx);
let tool_results: Vec<_> = new_messages
.iter()
.filter(|m| m.role() == "toolResult")
.collect();
assert_eq!(tool_results.len(), 2);
assert!(
elapsed.as_millis() < 130,
"Parallel sub-agents took {}ms, expected <130ms",
elapsed.as_millis()
);
}
/// Passes an `on_update` callback to a sub-agent and checks that it is invoked
/// at least once and that some update's first text item contains the
/// provider's reply.
#[tokio::test]
async fn test_sub_agent_event_forwarding() {
    let provider = Arc::new(MockProvider::text("Sub-agent done"));
    let agent = SubAgentTool::new(
        "streaming_agent",
        ModelConfig::anthropic("mock", "mock", "test"),
    )
    .with_provider_override(provider);

    // Shared log of the first text item of every update the callback sees.
    let updates: Arc<std::sync::Mutex<Vec<String>>> = Arc::default();
    let sink = Arc::clone(&updates);
    let on_update: ToolUpdateFn = Arc::new(move |update: ToolResult| {
        if let Some(Content::Text { text }) = update.content.first() {
            sink.lock().unwrap().push(text.clone());
        }
    });

    let ctx = ToolContext {
        tool_call_id: "tc-1".into(),
        tool_name: "streaming_agent".into(),
        cancel: CancellationToken::new(),
        on_update: Some(on_update),
        on_progress: None,
    };
    let outcome = agent
        .execute(serde_json::json!({"task": "Do work"}), ctx)
        .await
        .expect("sub-agent should succeed");

    let text = if let Content::Text { text } = &outcome.content[0] {
        text.as_str()
    } else {
        panic!("Expected text content")
    };
    assert_eq!(text, "Sub-agent done");

    let collected = updates.lock().unwrap();
    assert!(
        !collected.is_empty(),
        "Expected on_update to receive streaming events"
    );
    assert!(
        collected.iter().any(|entry| entry.contains("Sub-agent done")),
        "Expected text delta in updates, got: {:?}",
        *collected
    );
}
/// Calls a sub-agent with an empty parameter object and checks that it fails
/// with `ToolError::InvalidArgs` whose message mentions `task`.
#[tokio::test]
async fn test_sub_agent_missing_task_parameter() {
    let provider = Arc::new(MockProvider::text("Should not run"));
    let agent = SubAgentTool::new("test_agent", ModelConfig::anthropic("mock", "mock", "test"))
        .with_provider_override(provider);

    let ctx = ToolContext {
        tool_call_id: "tc-1".into(),
        tool_name: "test_agent".into(),
        cancel: CancellationToken::new(),
        on_update: None,
        on_progress: None,
    };
    // No `task` key at all.
    let result = agent.execute(serde_json::json!({}), ctx).await;

    assert!(result.is_err());
    let err = result.unwrap_err();
    match &err {
        ToolError::InvalidArgs(msg) => assert!(msg.contains("task")),
        other => panic!("Expected InvalidArgs, got: {:?}", other),
    }
}
/// Runs a parent `agent_loop` that calls a `calculator` sub-agent once.
///
/// Checks the four resulting messages (user, assistant, toolResult,
/// assistant), that the tool result carries the sub-agent's text, and that
/// `ToolExecutionStart` / `ToolExecutionEnd` events were emitted for
/// `calculator`.
#[tokio::test]
async fn test_sub_agent_in_parent_loop() {
    let calculator = SubAgentTool::new("calculator", ModelConfig::anthropic("mock", "mock", "test"))
        .with_provider_override(Arc::new(MockProvider::text("42 is the answer")))
        .with_description("Calculates things");

    // Parent script: delegate to the calculator, then summarise.
    let parent_provider = MockProvider::new(vec![
        MockResponse::ToolCalls(vec![MockToolCall {
            name: "calculator".into(),
            arguments: serde_json::json!({"task": "What is 6*7?"}),
        }]),
        MockResponse::Text("The calculator says: 42 is the answer".into()),
    ]);

    let config = make_config(Arc::new(parent_provider));
    let mut context = AgentContext {
        system_prompt: "You are a coordinator.".into(),
        tools: vec![Arc::new(calculator)],
        messages: Vec::new(),
        user_context: Vec::new(),
        inrun_context: Vec::new(),
        agent_id: None,
        session_id: None,
        loop_id: None,
        parent_loop_id: None,
        continuation_kind: None,
        session: None,
    };

    let prompt = AgentMessage::Llm(LlmMessage::new(Message::user("What is 6*7?")));
    let (tx, rx) = mpsc::unbounded_channel();
    let cancel = CancellationToken::new();
    let new_messages = agent_loop(vec![prompt], &mut context, &config, tx, cancel).await;
    let events = collect_events(rx);

    // Message sequence produced by the loop.
    assert_eq!(new_messages.len(), 4);
    let expected_roles = ["user", "assistant", "toolResult", "assistant"];
    for (idx, expected) in expected_roles.iter().enumerate() {
        assert_eq!(new_messages[idx].role(), *expected);
    }

    // The third message must be the tool result holding the sub-agent's text.
    let content = match &new_messages[2] {
        AgentMessage::Llm(LlmMessage {
            message: Message::ToolResult { content, .. },
            ..
        }) => content,
        _ => panic!("Expected tool result message"),
    };
    let text = match &content[0] {
        Content::Text { text } => text.as_str(),
        _ => panic!("Expected text content"),
    };
    assert_eq!(text, "42 is the answer");

    let has_tool_start = events.iter().any(|event| {
        matches!(event, AgentEvent::ToolExecutionStart { tool_name, .. } if tool_name == "calculator")
    });
    let has_tool_end = events.iter().any(|event| {
        matches!(event, AgentEvent::ToolExecutionEnd { tool_name, .. } if tool_name == "calculator")
    });
    assert!(has_tool_start);
    assert!(has_tool_end);
}