use std::rc::Rc;
use crate::agent_events::AgentEvent;
use crate::bridge::HostBridge;
use crate::orchestration::{AutoCompactConfig, TurnPolicy};
use crate::value::{VmError, VmValue};
use super::super::daemon::{detect_watch_changes, DaemonLoopConfig};
use super::super::helpers::transcript_event;
use super::helpers::{
action_turn_nudge, append_host_messages_to_recorded, append_message_to_contexts,
assistant_history_text, daemon_snapshot_from_state, inject_queued_user_messages,
interpret_post_turn_callback_result, maybe_auto_compact_agent_messages,
maybe_persist_daemon_snapshot, runtime_feedback_message, should_stop_after_successful_tools,
};
use super::llm_call::LlmCallResult;
use super::state::AgentLoopState;
use super::tool_dispatch::ToolDispatchResult;
pub(super) enum IterationOutcome {
Continue,
Break,
}
pub(super) struct PostTurnContext<'a> {
pub bridge: &'a Option<Rc<HostBridge>>,
pub session_id: &'a str,
pub tool_format: &'a str,
pub has_tools: bool,
pub max_nudges: usize,
pub persistent: bool,
pub daemon: bool,
pub turn_policy: Option<&'a TurnPolicy>,
pub stop_after_successful_tools: &'a Option<Vec<String>>,
pub post_turn_callback: &'a Option<VmValue>,
pub auto_compact: &'a Option<AutoCompactConfig>,
pub daemon_config: &'a DaemonLoopConfig,
pub custom_nudge: &'a Option<String>,
pub iteration: usize,
}
async fn emit_post_agent_turn_hook(
session_id: &str,
_iteration: usize,
turn: serde_json::Value,
) -> Result<(), VmError> {
crate::orchestration::run_lifecycle_hooks(
crate::orchestration::HookEvent::PostAgentTurn,
&serde_json::json!({
"event": crate::orchestration::HookEvent::PostAgentTurn.as_str(),
"session": {
"id": session_id,
},
"turn": turn,
}),
)
.await
}
pub(super) async fn run_post_turn(
state: &mut AgentLoopState,
opts: &mut super::super::api::LlmCallOptions,
ctx: &PostTurnContext<'_>,
call_result: &mut LlmCallResult,
dispatch: Option<ToolDispatchResult>,
) -> Result<IterationOutcome, VmError> {
let iteration = ctx.iteration;
if let Some(dispatch) = dispatch {
state.all_tools_used.extend(dispatch.tools_used_this_iter);
if ctx.tool_format != "native" && !dispatch.observations.is_empty() {
append_message_to_contexts(
&mut state.visible_messages,
&mut state.recorded_messages,
runtime_feedback_message("tool_results", dispatch.observations.trim_end()),
);
}
let finish_step_messages = inject_queued_user_messages(
ctx.bridge.as_ref(),
&mut state.visible_messages,
crate::bridge::DeliveryCheckpoint::AfterCurrentOperation,
)
.await?;
append_host_messages_to_recorded(&mut state.recorded_messages, &finish_step_messages);
for message in &finish_step_messages {
state.transcript_events.push(transcript_event(
"host_input",
"user",
"public",
&message.content,
Some(serde_json::json!({"delivery": format!("{:?}", message.mode)})),
));
}
if !finish_step_messages.is_empty() {
state.consecutive_text_only = 0;
}
if call_result.tool_calls.len() == 1 {
state.consecutive_single_tool_turns += 1;
} else {
state.consecutive_single_tool_turns = 0;
}
let successful_tool_names: Vec<&str> = dispatch
.tool_results_this_iter
.iter()
.filter(|result| result["status"].as_str() == Some("ok"))
.filter_map(|result| result["tool_name"].as_str())
.collect();
for tool_name in &successful_tool_names {
if !state
.successful_tools_used
.iter()
.any(|existing| existing == tool_name)
{
state.successful_tools_used.push((*tool_name).to_string());
}
}
let tool_names: Vec<&str> = call_result
.tool_calls
.iter()
.filter_map(|tc| tc["name"].as_str())
.collect();
let turn_info = serde_json::json!({
"tool_names": tool_names,
"tool_results": dispatch.tool_results_this_iter,
"successful_tool_names": successful_tool_names,
"tool_count": call_result.tool_calls.len(),
"iteration": iteration,
"failed": dispatch
.tool_results_this_iter
.iter()
.any(|result| result["status"].as_str() != Some("ok"))
|| !call_result.tool_parse_errors.is_empty(),
"consecutive_single_tool_turns": state.consecutive_single_tool_turns,
"session_tools_used": state.all_tools_used,
"session_successful_tools": state.successful_tools_used,
});
emit_post_agent_turn_hook(ctx.session_id, iteration, turn_info.clone()).await?;
super::emit_agent_event(&AgentEvent::TurnEnd {
session_id: ctx.session_id.to_string(),
iteration,
turn_info: turn_info.clone(),
})
.await;
if let Some(stop_tools) = ctx.stop_after_successful_tools.as_ref() {
if should_stop_after_successful_tools(&dispatch.tool_results_this_iter, stop_tools) {
crate::events::log_debug(
"agent.stop_after_successful_tools",
&format!("iter={iteration} requested stage stop after successful tool turn"),
);
return Ok(IterationOutcome::Break);
}
}
if let Some(VmValue::Closure(closure)) = ctx.post_turn_callback.as_ref() {
let mut cb_vm = crate::vm::clone_async_builtin_child_vm().ok_or_else(|| {
VmError::Runtime(
"post_turn_callback requires an async builtin VM context".to_string(),
)
})?;
let info_vm = crate::stdlib::json_to_vm_value(&turn_info);
let cb_result = cb_vm.call_closure_pub(closure, &[info_vm]).await?;
let (message, stop) = interpret_post_turn_callback_result(&cb_result);
if let Some(msg) = message {
if !msg.trim().is_empty() {
let feedback = runtime_feedback_message("post_turn_callback", msg.as_str());
append_message_to_contexts(
&mut state.visible_messages,
&mut state.recorded_messages,
feedback,
);
}
}
if stop {
crate::events::log_debug(
"agent.post_turn_callback",
&format!("iter={iteration} post_turn_callback requested stage stop"),
);
return Ok(IterationOutcome::Break);
}
}
if let Some(ref ac) = ctx.auto_compact {
let mut est = crate::orchestration::estimate_message_tokens(&state.visible_messages);
if let Some(ref sys) = opts.system {
est += sys.len() / 4;
}
if est > ac.token_threshold {
let mut compact_opts = opts.clone();
compact_opts.messages = state.visible_messages.clone();
let original_message_count = state.visible_messages.len();
if let Some(summary) = crate::orchestration::auto_compact_messages(
&mut state.visible_messages,
ac,
Some(&compact_opts),
)
.await?
{
let estimated_tokens_after =
crate::orchestration::estimate_message_tokens(&state.visible_messages);
let archived_messages = original_message_count
.saturating_sub(state.visible_messages.len())
.saturating_add(1);
super::super::trace::emit_agent_event(
super::super::trace::AgentTraceEvent::ContextCompaction {
archived_messages,
new_summary_len: summary.len(),
iteration,
},
);
state.transcript_events.push(crate::llm::helpers::transcript_event(
"compaction",
"system",
"internal",
"Transcript compacted during agent loop",
Some(serde_json::json!({
"mode": "auto",
"strategy": crate::orchestration::compact_strategy_name(&ac.compact_strategy),
"archived_messages": archived_messages,
"estimated_tokens_before": est,
"estimated_tokens_after": estimated_tokens_after,
})),
));
super::emit_agent_event(
&crate::agent_events::AgentEvent::TranscriptCompacted {
session_id: ctx.session_id.to_string(),
mode: "auto".to_string(),
strategy: crate::orchestration::compact_strategy_name(
&ac.compact_strategy,
)
.to_string(),
archived_messages,
estimated_tokens_before: est,
estimated_tokens_after,
snapshot_asset_id: None,
},
)
.await;
let merged = match state.transcript_summary.take() {
Some(existing)
if !existing.trim().is_empty() && existing.trim() != summary.trim() =>
{
format!("{existing}\n\n{summary}")
}
Some(_) | None => summary,
};
state.transcript_summary = Some(merged);
}
}
}
if !call_result.tool_parse_errors.is_empty() {
let error_msg = call_result.tool_parse_errors.join("\n\n");
append_message_to_contexts(
&mut state.visible_messages,
&mut state.recorded_messages,
runtime_feedback_message("parse_error", error_msg),
);
}
if call_result.sentinel_hit {
if !call_result.tool_parse_errors.is_empty() {
crate::events::log_warn(
"llm.tool",
&format!(
"{} tool-call parse error(s) suppressed by sentinel: {}",
call_result.tool_parse_errors.len(),
call_result.tool_parse_errors.join("; ")
),
);
}
return Ok(IterationOutcome::Break);
}
return Ok(IterationOutcome::Continue);
}
if call_result.sentinel_hit {
let assistant_content_for_history = assistant_history_text(
call_result.canonical_history.as_deref(),
&call_result.text,
call_result.tool_parse_errors.len(),
&call_result.tool_calls,
);
append_message_to_contexts(
&mut state.visible_messages,
&mut state.recorded_messages,
serde_json::json!({
"role": "assistant",
"content": assistant_content_for_history,
}),
);
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": false,
"sentinel_hit": true,
"text": call_result.text.clone(),
}),
)
.await?;
return Ok(IterationOutcome::Break);
}
if !call_result.tool_parse_errors.is_empty() {
let error_msg = call_result.tool_parse_errors.join("\n\n");
append_message_to_contexts(
&mut state.visible_messages,
&mut state.recorded_messages,
runtime_feedback_message("parse_error", error_msg),
);
call_result.tool_parse_errors.clear();
state.consecutive_text_only = 0;
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": true,
"parse_error": true,
"text": call_result.text.clone(),
}),
)
.await?;
return Ok(IterationOutcome::Continue);
}
let action_required_before_answer = ctx.tool_format == "native"
&& ctx
.turn_policy
.is_some_and(|policy| policy.require_action_or_yield)
&& state.all_tools_used.is_empty();
if action_required_before_answer {
state.consecutive_text_only += 1;
if state.consecutive_text_only > ctx.max_nudges {
state.final_status = "stuck";
let tail_excerpt = {
let raw = call_result.text.trim();
if raw.chars().count() > 240 {
let truncated: String = raw.chars().take(240).collect();
format!("{truncated}…")
} else {
raw.to_string()
}
};
super::emit_agent_event(&AgentEvent::LoopStuck {
session_id: ctx.session_id.to_string(),
max_nudges: ctx.max_nudges,
last_iteration: iteration,
tail_excerpt,
})
.await;
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": true,
"status": "stuck",
"text": call_result.text.clone(),
}),
)
.await?;
return Ok(IterationOutcome::Break);
}
let guidance = action_turn_nudge(
ctx.tool_format,
ctx.has_tools,
ctx.turn_policy,
call_result.prose_too_long,
)
.unwrap_or_else(|| {
if ctx.has_tools {
"Use a tool call to make progress.".to_string()
} else {
"Make concrete progress in your reply.".to_string()
}
});
append_message_to_contexts(
&mut state.visible_messages,
&mut state.recorded_messages,
runtime_feedback_message(
"action_required",
format!(
"You returned assistant text/JSON before using any tool. \
This stage requires at least one tool action before an answer counts. \
That response was not accepted. {guidance}"
),
),
);
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": true,
"action_required": true,
"text": call_result.text.clone(),
}),
)
.await?;
return Ok(IterationOutcome::Continue);
}
let assistant_content_for_history = assistant_history_text(
call_result.canonical_history.as_deref(),
&call_result.text,
call_result.tool_parse_errors.len(),
&call_result.tool_calls,
);
append_message_to_contexts(
&mut state.visible_messages,
&mut state.recorded_messages,
serde_json::json!({
"role": "assistant",
"content": assistant_content_for_history,
}),
);
if !ctx.persistent && !ctx.daemon {
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": false,
"text": call_result.text.clone(),
}),
)
.await?;
return Ok(IterationOutcome::Break);
}
if ctx.daemon && !ctx.persistent {
state.daemon_state = "idle".to_string();
if let Some(bridge) = ctx.bridge.as_ref() {
bridge.set_daemon_idle(true);
}
if ctx.daemon_config.consolidate_on_idle {
maybe_auto_compact_agent_messages(
opts,
ctx.auto_compact,
&mut state.visible_messages,
&mut state.transcript_summary,
)
.await?;
}
let idle_snapshot = daemon_snapshot_from_state(
&state.daemon_state,
&state.visible_messages,
&state.recorded_messages,
&state.transcript_summary,
&state.transcript_events,
&state.total_text,
&state.last_iteration_text,
&state.all_tools_used,
&state.rejected_tools,
&state.deferred_user_messages,
state.total_iterations,
state.idle_backoff_ms,
state.last_run_exit_code,
&state.daemon_watch_state,
);
state.daemon_snapshot_path =
maybe_persist_daemon_snapshot(ctx.daemon_config, &idle_snapshot)?
.or(state.daemon_snapshot_path.take());
if !ctx.daemon_config.has_wake_source(ctx.bridge.is_some()) {
state.final_status = "idle";
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": false,
"status": "idle",
"text": call_result.text.clone(),
}),
)
.await?;
if let Some(bridge) = ctx.bridge.as_ref() {
bridge.set_daemon_idle(false);
}
return Ok(IterationOutcome::Break);
}
let watchdog_limit = ctx.daemon_config.idle_watchdog_attempts;
let watchdog_started = std::time::Instant::now();
let mut idle_null_attempts: usize = 0;
loop {
if let Some(bridge) = ctx.bridge.as_ref() {
bridge.notify(
"agent/idle",
serde_json::json!({
"iteration": state.total_iterations,
"backoff_ms": state.idle_backoff_ms,
"persist_path": state.daemon_snapshot_path,
"watch_paths": ctx.daemon_config.watch_paths,
}),
);
}
tokio::time::sleep(tokio::time::Duration::from_millis(
ctx.daemon_config.idle_wait_ms(state.idle_backoff_ms),
))
.await;
let resumed = ctx
.bridge
.as_ref()
.is_some_and(|bridge| bridge.take_resume_signal());
let idle_messages = inject_queued_user_messages(
ctx.bridge.as_ref(),
&mut state.visible_messages,
crate::bridge::DeliveryCheckpoint::InterruptImmediate,
)
.await?;
append_host_messages_to_recorded(&mut state.recorded_messages, &idle_messages);
let changed_paths = if ctx.daemon_config.watch_paths.is_empty() {
Vec::new()
} else {
detect_watch_changes(
&ctx.daemon_config.watch_paths,
&mut state.daemon_watch_state,
)
};
for message in &idle_messages {
state.transcript_events.push(transcript_event(
"host_input",
"user",
"public",
&message.content,
Some(serde_json::json!({"delivery": format!("{:?}", message.mode)})),
));
}
let wake_reason = if !idle_messages.is_empty() {
Some(("message", None))
} else if resumed {
Some(("resume", None))
} else if !changed_paths.is_empty() {
Some((
"watch",
Some(format!(
"Daemon wake: watched paths changed: {}. Re-check the task state and act only if something actually changed.",
changed_paths.join(", ")
)),
))
} else if ctx.daemon_config.wake_interval_ms.is_some() {
Some((
"timer",
Some(
"Daemon timer wake fired. Re-check for background work and only act when there is new information or a pending follow-up."
.to_string(),
),
))
} else {
None
};
if let Some((reason, wake_message)) = wake_reason {
if let Some(message) = wake_message {
append_message_to_contexts(
&mut state.visible_messages,
&mut state.recorded_messages,
runtime_feedback_message(reason, message),
);
}
state.transcript_events.push(transcript_event(
"daemon_wake",
"system",
"internal",
reason,
Some(serde_json::json!({
"reason": reason,
"watch_paths": changed_paths,
"resumed": resumed,
})),
));
state.daemon_state = "active".to_string();
state.consecutive_text_only = 0;
state.idle_backoff_ms = 100;
if let Some(bridge) = ctx.bridge.as_ref() {
bridge.set_daemon_idle(false);
}
break;
}
idle_null_attempts += 1;
if let Some(limit) = watchdog_limit {
if idle_null_attempts >= limit {
let elapsed_ms = watchdog_started.elapsed().as_millis() as u64;
super::emit_agent_event(&AgentEvent::DaemonWatchdogTripped {
session_id: ctx.session_id.to_string(),
attempts: idle_null_attempts,
elapsed_ms,
})
.await;
state.final_status = "watchdog";
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": true,
"status": "watchdog",
"text": call_result.text.clone(),
}),
)
.await?;
if let Some(bridge) = ctx.bridge.as_ref() {
bridge.set_daemon_idle(false);
}
return Ok(IterationOutcome::Break);
}
}
ctx.daemon_config
.update_idle_backoff(&mut state.idle_backoff_ms);
}
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": false,
"status": "daemon_wake",
"text": call_result.text.clone(),
}),
)
.await?;
return Ok(IterationOutcome::Continue);
}
let finish_step_messages = inject_queued_user_messages(
ctx.bridge.as_ref(),
&mut state.visible_messages,
crate::bridge::DeliveryCheckpoint::AfterCurrentOperation,
)
.await?;
append_host_messages_to_recorded(&mut state.recorded_messages, &finish_step_messages);
for message in &finish_step_messages {
state.transcript_events.push(transcript_event(
"host_input",
"user",
"public",
&message.content,
Some(serde_json::json!({"delivery": format!("{:?}", message.mode)})),
));
}
if !finish_step_messages.is_empty() {
state.consecutive_text_only = 0;
state.idle_backoff_ms = 100;
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": false,
"status": "host_input",
"text": call_result.text.clone(),
}),
)
.await?;
return Ok(IterationOutcome::Continue);
}
state.consecutive_text_only += 1;
if state.consecutive_text_only > ctx.max_nudges {
state.final_status = "stuck";
let tail_excerpt = {
let raw = call_result.text.trim();
if raw.chars().count() > 240 {
let truncated: String = raw.chars().take(240).collect();
format!("{truncated}…")
} else {
raw.to_string()
}
};
super::emit_agent_event(&AgentEvent::LoopStuck {
session_id: ctx.session_id.to_string(),
max_nudges: ctx.max_nudges,
last_iteration: iteration,
tail_excerpt,
})
.await;
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": true,
"status": "stuck",
"text": call_result.text.clone(),
}),
)
.await?;
return Ok(IterationOutcome::Break);
}
let nudge = action_turn_nudge(
ctx.tool_format,
ctx.has_tools,
ctx.turn_policy,
call_result.prose_too_long,
)
.or_else(|| ctx.custom_nudge.clone())
.unwrap_or_else(|| {
if ctx.has_tools {
"Continue — use a tool call to make progress.".to_string()
} else {
"Continue — make progress in your reply.".to_string()
}
});
append_message_to_contexts(
&mut state.visible_messages,
&mut state.recorded_messages,
runtime_feedback_message("nudge", nudge),
);
emit_post_agent_turn_hook(
ctx.session_id,
iteration,
serde_json::json!({
"iteration": iteration,
"failed": false,
"status": "continue",
"text": call_result.text.clone(),
}),
)
.await?;
Ok(IterationOutcome::Continue)
}