use phi_core::agent_loop::script_callback::detect_interpreter;
use phi_core::agents::{Agent, BasicAgent};
use phi_core::provider::{MockProvider, ModelConfig};
use phi_core::types::*;
use std::path::Path;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
/// A `RevertRenderPolicy` supplied through the `BasicAgent` builder must show up, field for
/// field, on the config returned by `build_config`.
#[test]
fn with_revert_render_policy_propagates_to_loop_config() {
    let builder = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"))
        .with_revert_tool()
        .with_revert_render_policy(RevertRenderPolicy {
            lesson_window_turns: 1,
            lesson_window_count: 1,
        });

    let loop_config = builder
        .build_config()
        .expect("BasicAgent has a model_config");
    let propagated = &loop_config.revert_render_policy;

    assert_eq!(
        propagated.lesson_window_turns, 1,
        "custom lesson_window_turns must propagate to AgentLoopConfig"
    );
    assert_eq!(
        propagated.lesson_window_count, 1,
        "custom lesson_window_count must propagate to AgentLoopConfig"
    );
}
/// Drives `agent_loop` through a two-response mock script and checks how many `Lesson` tags are
/// still attached to the messages handed to `convert_to_llm`.
///
/// Fixture:
/// * trunk = one user message plus two assistant messages, each carrying one `Lesson` tag
///   (created with tag turns 0 and 1);
/// * policy = `lesson_window_turns: 0`, `lesson_window_count: 1`;
/// * provider script = one `noop` tool call, then the text `"final"`.
///
/// Assertions:
/// * the `convert_to_llm` probe and the provider were each invoked at least once;
/// * exactly one `Lesson` tag remains in the most recent probe snapshot;
/// * the probe ran exactly twice;
/// * that surviving count equals what `build_trunk_context_with_policy(.., 1)` yields when
///   called directly on the final context.
#[tokio::test]
async fn revert_render_policy_strips_old_lesson_tags_from_llm_prompt() {
    use phi_core::agent_loop::{agent_loop, AgentLoopConfig};
    use phi_core::provider::{
        mock::MockResponse, ProviderError, StreamConfig, StreamEvent, StreamProvider,
    };
    use std::sync::Mutex;
    use tokio_util::sync::CancellationToken;

    // Provider wrapper that records the `messages` of every `StreamConfig` it receives and then
    // delegates the actual streaming to the wrapped `MockProvider`.
    struct CapturingMockProvider {
        inner: MockProvider,
        captured: Mutex<Vec<Vec<Message>>>,
    }

    impl CapturingMockProvider {
        fn new(responses: Vec<MockResponse>) -> Self {
            Self {
                inner: MockProvider::new(responses),
                captured: Mutex::new(Vec::new()),
            }
        }

        // Returns a copy of everything recorded so far, one entry per `stream` call.
        fn captured_messages(&self) -> Vec<Vec<Message>> {
            self.captured.lock().unwrap().clone()
        }
    }

    #[async_trait::async_trait]
    impl StreamProvider for CapturingMockProvider {
        fn provider_id(&self) -> &str {
            "capturing-mock"
        }

        async fn stream(
            &self,
            config: StreamConfig,
            tx: mpsc::UnboundedSender<StreamEvent>,
            cancel: CancellationToken,
        ) -> Result<Message, ProviderError> {
            // The guard is a temporary of this statement, so it is released before the await.
            self.captured.lock().unwrap().push(config.messages.clone());
            self.inner.stream(config, tx, cancel).await
        }
    }

    // Builds an assistant text message with the given node identity and attaches a single
    // `TagKind::Lesson` tag created with `tag_turn`.
    fn assistant_with_lesson_tag(
        text: &str,
        ts: u64,
        node: NodeId,
        parent: Option<NodeId>,
        tag_turn: u32,
    ) -> AgentMessage {
        let mut am = AgentMessage::Llm(
            LlmMessage::new(Message::Assistant {
                content: vec![Content::Text {
                    text: text.to_string(),
                }],
                stop_reason: StopReason::Stop,
                model: "test".into(),
                provider: "test".into(),
                usage: Usage::default(),
                timestamp: ts,
                error_message: None,
            })
            .with_node_identity(node, parent),
        );
        if let AgentMessage::Llm(ref mut lm) = am {
            lm.tags.push(NodeTag::new(
                TagKind::Lesson,
                format!("L-at-turn-{}", tag_turn),
                tag_turn,
                vec![],
            ));
        }
        am
    }

    // Scripted provider: first response requests the `noop` tool, second ends with plain text.
    let provider = Arc::new(CapturingMockProvider::new(vec![
        MockResponse::ToolCalls(vec![phi_core::provider::mock::MockToolCall {
            name: "noop".into(),
            arguments: serde_json::json!({}),
        }]),
        MockResponse::Text("final".into()),
    ]));

    let policy = RevertRenderPolicy {
        lesson_window_turns: 0,
        lesson_window_count: 1,
    };

    // `captured_post_policy` holds only the most recent probe input; `turn_snapshots` keeps one
    // entry per probe invocation.
    let captured_post_policy: Arc<Mutex<Vec<AgentMessage>>> = Arc::new(Mutex::new(Vec::new()));
    let captured_clone = captured_post_policy.clone();
    let turn_snapshots: Arc<Mutex<Vec<Vec<AgentMessage>>>> = Arc::new(Mutex::new(Vec::new()));
    let snapshots_clone = turn_snapshots.clone();

    // Probe installed as `convert_to_llm`: records what the loop passes in, then performs a
    // plain conversion that keeps every message for which `as_llm()` returns `Some`.
    let probe_convert = Arc::new(move |msgs: &[AgentMessage]| -> Vec<Message> {
        *captured_clone.lock().unwrap() = msgs.to_vec();
        snapshots_clone.lock().unwrap().push(msgs.to_vec());
        msgs.iter().filter_map(|m| m.as_llm().cloned()).collect()
    });

    let config = AgentLoopConfig {
        model_config: ModelConfig::anthropic("mock", "mock-model", "test-key"),
        provider_override: Some(provider.clone()),
        thinking_level: ThinkingLevel::Off,
        max_tokens: None,
        temperature: None,
        convert_to_llm: Some(probe_convert),
        transform_context: None,
        get_steering_messages: None,
        get_follow_up_messages: None,
        context_config: None,
        execution_limits: None,
        cache_config: CacheConfig::default(),
        tool_execution: ToolExecutionStrategy::default(),
        tool_timeout: None,
        response_format: phi_core::provider::ResponseFormat::Text,
        retry_config: phi_core::RetryConfig::default(),
        before_turn: None,
        after_turn: None,
        on_error: None,
        before_loop: None,
        after_loop: None,
        before_tool_execution: None,
        after_tool_execution: None,
        before_tool_execution_update: None,
        after_tool_execution_update: None,
        before_compaction_start: None,
        after_compaction_end: None,
        input_filters: vec![],
        first_turn_trigger: TurnTrigger::User,
        config_id: None,
        context_translation: None,
        prun_pending: None,
        revert_pending: None,
        current_tool: None,
        revert_render_policy: policy,
    };

    // Minimal tool so the scripted `noop` tool call can be executed.
    struct NoopTool;

    #[async_trait::async_trait]
    impl AgentTool for NoopTool {
        fn name(&self) -> &str {
            "noop"
        }

        fn label(&self) -> &str {
            "Noop"
        }

        fn description(&self) -> &str {
            "no-op"
        }

        fn parameters_schema(&self) -> serde_json::Value {
            serde_json::json!({"type": "object"})
        }

        async fn execute(
            &self,
            _params: serde_json::Value,
            _ctx: ToolContext,
        ) -> Result<ToolResult, ToolError> {
            Ok(ToolResult {
                content: vec![Content::Text { text: "ok".into() }],
                details: serde_json::Value::Null,
                child_loop_id: None,
            })
        }
    }

    // Seeded trunk: user(0) -> assistant "a"(1, lesson tag turn 0) -> assistant "b"(2, lesson
    // tag turn 1). Exactly two lesson tags exist before the loop runs.
    let mut context = AgentContext {
        system_prompt: "system".into(),
        messages: vec![
            AgentMessage::Llm(
                LlmMessage::new(Message::User {
                    content: vec![Content::Text {
                        text: "start".into(),
                    }],
                    timestamp: 0,
                })
                .with_node_identity(NodeId(0), None),
            ),
            assistant_with_lesson_tag("a", 1, NodeId(1), Some(NodeId(0)), 0),
            assistant_with_lesson_tag("b", 2, NodeId(2), Some(NodeId(1)), 1),
        ],
        tools: vec![Arc::new(NoopTool)],
        agent_id: Some("agent-1".into()),
        session_id: Some("session-1".into()),
        loop_id: Some("session-1.test.1".into()),
        parent_loop_id: None,
        continuation_kind: None,
        session: None,
        user_context: Vec::new(),
        inrun_context: Vec::new(),
        active_node_id: Some(NodeId(2)),
        next_node_id: 3,
    };

    // `_rx` is bound (not discarded with `_`) so the receiving half lives until the test ends.
    let (tx, _rx) = mpsc::unbounded_channel();
    let cancel = CancellationToken::new();
    let new_user = vec![AgentMessage::Llm(LlmMessage::new(Message::user(
        "next-turn",
    )))];
    agent_loop(new_user, &mut context, &config, tx, cancel).await;

    let snapshot = captured_post_policy.lock().unwrap().clone();
    assert!(
        !snapshot.is_empty(),
        "convert_to_llm probe must have received at least one message"
    );
    let captured_wire = provider.captured_messages();
    assert!(
        !captured_wire.is_empty(),
        "provider must have received at least one StreamConfig"
    );

    // Count the lesson tags still attached to the messages of the latest probe snapshot.
    let surviving_lesson_count: usize = snapshot
        .iter()
        .filter_map(|m| match m {
            AgentMessage::Llm(lm) => {
                Some(lm.tags.iter().filter(|t| t.kind == TagKind::Lesson).count())
            }
            _ => None,
        })
        .sum();
    // The message describes the fixture as actually seeded above: two lesson tags, policy 0/1.
    assert_eq!(
        surviving_lesson_count, 1,
        "tight RevertRenderPolicy must strip the older Lesson tag and keep only the newest one \
         (lesson_window_turns=0, lesson_window_count=1, seeded trunk lesson turns=[0,1])"
    );

    let all_snapshots = turn_snapshots.lock().unwrap();
    assert_eq!(
        all_snapshots.len(),
        2,
        "the loop must have invoked convert_to_llm once per turn (2 turns: tool call + text)"
    );

    // Cross-check against the standalone filter, borrowing the very policy instance stored in
    // the loop config so the two code paths cannot drift apart.
    let manual = context.build_trunk_context_with_policy(&config.revert_render_policy, 1);
    let manual_lesson_count: usize = manual
        .iter()
        .filter_map(|m| match m {
            AgentMessage::Llm(lm) => {
                Some(lm.tags.iter().filter(|t| t.kind == TagKind::Lesson).count())
            }
            _ => None,
        })
        .sum();
    assert_eq!(
        surviving_lesson_count, manual_lesson_count,
        "policy applied by the loop's streaming dispatch at turn_index=1 must match the standalone filter at current_turn=1"
    );
}
/// Checks the agent's shared `current_tool` slot: a tool samples the slot from inside its own
/// `execute`, and the test asserts that the sampled timeout equals the tool's declared timeout
/// while `current_tool_timeout()` reports `None` both before and after the run.
#[tokio::test]
async fn current_tool_timeout_visible_during_tool_execution() {
    use phi_core::provider::mock::{MockResponse, MockToolCall};

    // Tool that reads the shared slot mid-execution and stores the timeout it found there.
    struct SleepyTool {
        observed_during_exec: Arc<std::sync::Mutex<Option<Duration>>>,
        agent_slot: Arc<std::sync::Mutex<Option<phi_core::context::CurrentToolExecution>>>,
    }

    #[async_trait::async_trait]
    impl AgentTool for SleepyTool {
        fn name(&self) -> &str {
            "sleepy"
        }

        fn label(&self) -> &str {
            "Sleepy"
        }

        fn description(&self) -> &str {
            "sleeps briefly then returns"
        }

        fn parameters_schema(&self) -> serde_json::Value {
            serde_json::json!({"type": "object"})
        }

        fn timeout(&self) -> Option<Duration> {
            Some(Duration::from_secs(7))
        }

        async fn execute(
            &self,
            _params: serde_json::Value,
            _ctx: ToolContext,
        ) -> Result<ToolResult, ToolError> {
            let pause = Duration::from_millis(5);

            tokio::time::sleep(pause).await;
            // Each lock guard is a statement-local temporary, so none is held across an await.
            let seen = self
                .agent_slot
                .lock()
                .unwrap()
                .as_ref()
                .and_then(|running| running.timeout);
            *self.observed_during_exec.lock().unwrap() = seen;
            tokio::time::sleep(pause).await;

            Ok(ToolResult {
                content: vec![Content::Text {
                    text: "done".into(),
                }],
                details: serde_json::Value::Null,
                child_loop_id: None,
            })
        }
    }

    let seen_timeout = Arc::new(std::sync::Mutex::new(None::<Duration>));

    // Scripted provider: one `sleepy` tool call followed by a closing text response.
    let scripted = MockProvider::new(vec![
        MockResponse::ToolCalls(vec![MockToolCall {
            name: "sleepy".into(),
            arguments: serde_json::json!({}),
        }]),
        MockResponse::Text("done".into()),
    ]);
    let mut agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"))
        .with_provider_override(Arc::new(scripted));
    assert!(agent.current_tool_timeout().is_none());

    // Take the slot from a freshly built config and hand it to the tool.
    let shared_slot = agent
        .build_config()
        .unwrap()
        .current_tool
        .expect("BasicAgent installs the shared slot");
    agent.set_tools(vec![Arc::new(SleepyTool {
        observed_during_exec: Arc::clone(&seen_timeout),
        agent_slot: shared_slot,
    })]);

    let (tx, _rx) = mpsc::unbounded_channel();
    let prompt = vec![AgentMessage::Llm(LlmMessage::new(Message::user("go")))];
    agent.prompt_messages_with_sender(prompt, tx).await;

    let during_run = *seen_timeout.lock().unwrap();
    assert_eq!(
        during_run,
        Some(Duration::from_secs(7)),
        "during tool execution the agent's current_tool_timeout slot must reflect the tool's effective timeout"
    );
    assert!(
        agent.current_tool_timeout().is_none(),
        "after tool execution the slot must be cleared back to None"
    );
}
#[tokio::test]
async fn async_update_hooks_fire_through_sync_bridge() {
use phi_core::provider::mock::{MockResponse, MockToolCall};
struct UpdatingTool;
#[async_trait::async_trait]
impl AgentTool for UpdatingTool {
fn name(&self) -> &str {
"updater"
}
fn label(&self) -> &str {
"Updater"
}
fn description(&self) -> &str {
"emits 3 updates"
}
fn parameters_schema(&self) -> serde_json::Value {
serde_json::json!({"type": "object"})
}
async fn execute(
&self,
_params: serde_json::Value,
ctx: ToolContext,
) -> Result<ToolResult, ToolError> {
if let Some(ref on_update) = ctx.on_update {
for i in 0..3 {
on_update(ToolResult {
content: vec![Content::Text {
text: format!("partial-{}", i),
}],
details: serde_json::Value::Null,
child_loop_id: None,
});
}
}
Ok(ToolResult {
content: vec![Content::Text {
text: "done".into(),
}],
details: serde_json::Value::Null,
child_loop_id: None,
})
}
}
let before_count = Arc::new(AtomicU32::new(0));
let after_count = Arc::new(AtomicU32::new(0));
{
let provider = MockProvider::new(vec![
MockResponse::ToolCalls(vec![MockToolCall {
name: "updater".into(),
arguments: serde_json::json!({}),
}]),
MockResponse::Text("done".into()),
]);
let bc = before_count.clone();
let ac = after_count.clone();
let mut agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"))
.with_provider_override(Arc::new(provider))
.on_before_tool_execution_update(move |_name, _id, _text| {
bc.fetch_add(1, Ordering::SeqCst);
true })
.on_after_tool_execution_update(move |_name, _id, _text| {
ac.fetch_add(1, Ordering::SeqCst);
});
agent.set_tools(vec![Arc::new(UpdatingTool)]);
let (tx, _rx) = mpsc::unbounded_channel();
agent
.prompt_messages_with_sender(
vec![AgentMessage::Llm(LlmMessage::new(Message::user("go")))],
tx,
)
.await;
assert_eq!(
before_count.load(Ordering::SeqCst),
3,
"before-update hook must fire once per on_update call"
);
assert_eq!(
after_count.load(Ordering::SeqCst),
3,
"after-update hook must fire once per emitted ToolExecutionUpdate event"
);
}
let veto_before = Arc::new(AtomicU32::new(0));
let veto_after = Arc::new(AtomicU32::new(0));
{
let provider = MockProvider::new(vec![
MockResponse::ToolCalls(vec![MockToolCall {
name: "updater".into(),
arguments: serde_json::json!({}),
}]),
MockResponse::Text("done".into()),
]);
let bc = veto_before.clone();
let ac = veto_after.clone();
let mut agent = BasicAgent::new(ModelConfig::anthropic("mock", "mock-model", "test-key"))
.with_provider_override(Arc::new(provider))
.on_before_tool_execution_update(move |_name, _id, _text| {
bc.fetch_add(1, Ordering::SeqCst);
false })
.on_after_tool_execution_update(move |_name, _id, _text| {
ac.fetch_add(1, Ordering::SeqCst);
});
agent.set_tools(vec![Arc::new(UpdatingTool)]);
let (tx, _rx) = mpsc::unbounded_channel();
agent
.prompt_messages_with_sender(
vec![AgentMessage::Llm(LlmMessage::new(Message::user("go")))],
tx,
)
.await;
assert_eq!(
veto_before.load(Ordering::SeqCst),
3,
"before-update hook fires once per on_update even when vetoing"
);
assert_eq!(
veto_after.load(Ordering::SeqCst),
0,
"after-update hook is skipped when before-update vetoes"
);
}
}
/// Calls `detect_interpreter` through its public path and pins the interpreter command it
/// returns for a `.py` script, a `.sh` script, an extension-less name and an unrecognised
/// extension.
#[test]
fn detect_interpreter_is_publicly_reachable_and_correct() {
    // (script file name, expected single-element interpreter command)
    let expectations = [
        ("hook.py", "python3"),
        ("hook.sh", "sh"),
        ("hook", "sh"),
        ("hook.unknown", "sh"),
    ];

    for &(script, interpreter) in &expectations {
        assert_eq!(
            detect_interpreter(Path::new(script)),
            vec![interpreter.to_string()],
        );
    }
}