use super::AgentLoopError;
use crate::contracts::runtime::phase::StepContext;
use crate::contracts::runtime::state::{reduce_state_actions, ScopeContext};
use crate::contracts::runtime::tool_call::tool_call_states_from_state;
use crate::contracts::runtime::tool_call::Tool;
use crate::contracts::runtime::SuspendedCall;
use crate::contracts::thread::{Message, Role};
use crate::contracts::RunAction;
use crate::contracts::RunContext;
use crate::runtime::control::{ToolCallResume, ToolCallState, ToolCallStatus};
use tirea_state::{Patch, TrackedPatch};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
const PENDING_APPROVAL_PLACEHOLDER_PREFIX: &str = "Tool '";
const PENDING_APPROVAL_PLACEHOLDER_SUFFIX: &str = "' is awaiting approval. Execution paused.";
pub(super) fn pending_approval_placeholder_message(tool_name: &str) -> String {
format!("{PENDING_APPROVAL_PLACEHOLDER_PREFIX}{tool_name}{PENDING_APPROVAL_PLACEHOLDER_SUFFIX}")
}
fn is_pending_approval_placeholder_content(content: &str) -> bool {
content
.strip_prefix(PENDING_APPROVAL_PLACEHOLDER_PREFIX)
.and_then(|rest| rest.strip_suffix(PENDING_APPROVAL_PLACEHOLDER_SUFFIX))
.is_some_and(|tool_name| !tool_name.is_empty())
}
fn is_pending_approval_placeholder(msg: &Message) -> bool {
msg.role == Role::Tool && is_pending_approval_placeholder_content(&msg.content)
}
pub(super) fn build_messages(step: &StepContext<'_>, system_prompt: &str) -> Vec<Message> {
let mut messages = Vec::new();
let system_ctx = &step.inference.system_context[..];
let session_ctx = &step.inference.session_context[..];
let system = if system_ctx.is_empty() {
system_prompt.to_string()
} else {
format!("{}\n\n{}", system_prompt, system_ctx.join("\n"))
};
if !system.is_empty() {
messages.push(Message::system(system));
}
for ctx in session_ctx {
messages.push(Message::system(ctx.clone()));
}
let known_tool_call_ids: HashSet<&str> = step
.messages()
.iter()
.filter(|m| m.role == Role::Assistant)
.filter_map(|m| m.tool_calls.as_ref())
.flatten()
.map(|tc| tc.id.as_str())
.collect();
let mut pending_placeholder_ids = HashSet::new();
let mut resolved_result_ids = HashSet::new();
for msg in step.messages() {
let Some(tc_id) = msg.tool_call_id.as_deref() else {
continue;
};
if !known_tool_call_ids.contains(tc_id) {
continue;
}
if is_pending_approval_placeholder(msg) {
pending_placeholder_ids.insert(tc_id.to_string());
} else if msg.role == Role::Tool {
resolved_result_ids.insert(tc_id.to_string());
}
}
let superseded_pending_ids: HashSet<String> = pending_placeholder_ids
.intersection(&resolved_result_ids)
.cloned()
.collect();
for msg in step.messages() {
if msg.role == Role::Tool {
if let Some(ref tc_id) = msg.tool_call_id {
if !known_tool_call_ids.contains(tc_id.as_str()) {
continue;
}
if superseded_pending_ids.contains(tc_id) && is_pending_approval_placeholder(msg) {
continue;
}
}
}
messages.push((**msg).clone());
}
patch_dangling_tool_calls(&mut messages);
messages
}
fn patch_dangling_tool_calls(messages: &mut Vec<Message>) {
let result_ids: HashSet<String> = messages
.iter()
.filter(|m| m.role == Role::Tool)
.filter_map(|m| m.tool_call_id.clone())
.collect();
let mut insertions: Vec<(usize, Vec<Message>)> = Vec::new();
let mut i = 0;
while i < messages.len() {
let msg = &messages[i];
if msg.role == Role::Assistant {
if let Some(ref calls) = msg.tool_calls {
let call_ids: Vec<&str> = calls.iter().map(|tc| tc.id.as_str()).collect();
let missing: Vec<&str> = call_ids
.iter()
.filter(|id| !result_ids.contains(**id))
.copied()
.collect();
if !missing.is_empty() {
let mut insert_at = i + 1;
while insert_at < messages.len() && messages[insert_at].role == Role::Tool {
insert_at += 1;
}
let synthetic: Vec<Message> = missing
.into_iter()
.map(|id| {
Message::tool(
id,
"[Tool execution was interrupted before producing a result.]",
)
})
.collect();
insertions.push((insert_at, synthetic));
}
}
}
i += 1;
}
for (idx, msgs) in insertions.into_iter().rev() {
let idx = idx.min(messages.len());
for (offset, msg) in msgs.into_iter().enumerate() {
messages.insert(idx + offset, msg);
}
}
}
pub(super) type InferenceInputs = (
Vec<Message>,
Vec<String>,
RunAction,
Vec<std::sync::Arc<dyn tirea_contract::runtime::inference::InferenceRequestTransform>>,
);
pub(super) fn inference_inputs_from_step(
step: &mut StepContext<'_>,
system_prompt: &str,
) -> InferenceInputs {
let messages = build_messages(step, system_prompt);
let tools = &step.inference.tools[..];
let filtered_tools = tools.iter().map(|td| td.id.clone()).collect::<Vec<_>>();
let run_action = step.run_action();
let transforms = std::mem::take(&mut step.inference.request_transforms);
(messages, filtered_tools, run_action, transforms)
}
pub(super) fn build_request_for_filtered_tools(
messages: &[Message],
tools: &HashMap<String, Arc<dyn Tool>>,
filtered_tools: &[String],
transforms: &[std::sync::Arc<
dyn tirea_contract::runtime::inference::InferenceRequestTransform,
>],
) -> genai::chat::ChatRequest {
let filtered: HashSet<&str> = filtered_tools.iter().map(String::as_str).collect();
let filtered_tool_refs: Vec<&dyn Tool> = tools
.values()
.filter(|t| filtered.contains(t.descriptor().id.as_str()))
.map(|t| t.as_ref())
.collect();
let (effective_messages, enable_prompt_cache) = if transforms.is_empty() {
(messages.to_vec(), false)
} else {
let tool_descriptors: Vec<_> = filtered_tool_refs.iter().map(|t| t.descriptor()).collect();
let output = tirea_contract::runtime::inference::apply_request_transforms(
messages.to_vec(),
&tool_descriptors,
transforms,
);
(output.messages, output.enable_prompt_cache)
};
let mut request =
crate::engine::convert::build_request(&effective_messages, &filtered_tool_refs);
if enable_prompt_cache {
crate::engine::convert::apply_prompt_cache_hints(&mut request);
}
request
}
#[allow(dead_code)]
pub(super) fn suspended_calls_from_ctx(run_ctx: &RunContext) -> HashMap<String, SuspendedCall> {
run_ctx.suspended_calls()
}
pub(super) fn tool_call_states_from_ctx(run_ctx: &RunContext) -> HashMap<String, ToolCallState> {
run_ctx
.snapshot()
.map(|s| tool_call_states_from_state(&s))
.unwrap_or_default()
}
pub(super) struct ToolCallStateSeed<'a> {
pub(super) call_id: &'a str,
pub(super) tool_name: &'a str,
pub(super) arguments: &'a Value,
pub(super) status: ToolCallStatus,
pub(super) resume_token: Option<String>,
}
pub(super) struct ToolCallStateTransition {
pub(super) status: ToolCallStatus,
pub(super) resume_token: Option<String>,
pub(super) resume: Option<ToolCallResume>,
pub(super) updated_at: u64,
}
pub(super) fn transition_tool_call_state(
current: Option<ToolCallState>,
seed: ToolCallStateSeed<'_>,
transition: ToolCallStateTransition,
) -> Option<ToolCallState> {
let mut tool_state = current.unwrap_or_else(|| ToolCallState {
call_id: seed.call_id.to_string(),
tool_name: seed.tool_name.to_string(),
arguments: seed.arguments.clone(),
status: seed.status,
resume_token: seed.resume_token.clone(),
resume: None,
scratch: Value::Null,
updated_at: transition.updated_at,
});
if !tool_state.status.can_transition_to(transition.status) {
return None;
}
tool_state.call_id = seed.call_id.to_string();
tool_state.tool_name = seed.tool_name.to_string();
tool_state.arguments = seed.arguments.clone();
tool_state.status = transition.status;
tool_state.resume_token = transition.resume_token;
tool_state.resume = transition.resume;
tool_state.updated_at = transition.updated_at;
Some(tool_state)
}
pub(super) fn upsert_tool_call_state(
base_state: &Value,
call_id: &str,
tool_state: ToolCallState,
) -> Result<TrackedPatch, AgentLoopError> {
if call_id.trim().is_empty() {
return Err(AgentLoopError::StateError(
"failed to upsert tool call state: call_id must not be empty".to_string(),
));
}
let mut reduced = reduce_state_actions(
vec![tool_state.into_state_action()],
base_state,
"agent_loop",
&ScopeContext::run(),
)
.map_err(|e| AgentLoopError::StateError(e.to_string()))?;
match reduced.pop() {
Some(patch) => Ok(patch),
None => Ok(TrackedPatch::new(Patch::new()).with_source("agent_loop")),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::runtime::control::{ResumeDecisionAction, ToolCallStatus};
use serde_json::json;
use tirea_state::apply_patch;
fn sample_state(call_id: &str, status: ToolCallStatus) -> ToolCallState {
ToolCallState {
call_id: call_id.to_string(),
tool_name: "echo".to_string(),
arguments: json!({"msg": call_id}),
status,
resume_token: None,
resume: None,
scratch: Value::Null,
updated_at: 1,
}
}
#[test]
fn upsert_tool_call_state_generates_single_call_scoped_patch() {
let state = json!({
"__tool_call_scope": {
"call_a": {
"tool_call_state": sample_state("call_a", ToolCallStatus::Suspended)
},
"call_b": {
"tool_call_state": sample_state("call_b", ToolCallStatus::Suspended)
}
}
});
let updated = sample_state("call_a", ToolCallStatus::Resuming);
let patch = upsert_tool_call_state(&state, "call_a", updated).expect("patch should build");
let ops = patch.patch().ops();
assert_eq!(ops.len(), 1);
assert!(
ops[0]
.path()
.to_string()
.starts_with("$.__tool_call_scope.call_a.tool_call_state"),
"unexpected patch path: {}",
ops[0].path()
);
let merged = apply_patch(&state, patch.patch()).expect("patch should apply");
assert_eq!(
merged["__tool_call_scope"]["call_a"]["tool_call_state"]["status"],
json!("resuming")
);
assert_eq!(
merged["__tool_call_scope"]["call_b"]["tool_call_state"]["status"],
json!("suspended")
);
}
#[test]
fn upsert_tool_call_state_rejects_empty_call_id() {
let error = upsert_tool_call_state(&json!({}), " ", sample_state("x", ToolCallStatus::New))
.expect_err("empty call_id must fail");
let AgentLoopError::StateError(message) = error else {
panic!("unexpected error type");
};
assert!(message.contains("call_id must not be empty"));
}
#[test]
fn transition_tool_call_state_applies_seed_and_runtime_fields() {
let transitioned = transition_tool_call_state(
None,
ToolCallStateSeed {
call_id: "call_1",
tool_name: "echo",
arguments: &json!({"message":"hi"}),
status: ToolCallStatus::Suspended,
resume_token: Some("resume_token_1".to_string()),
},
ToolCallStateTransition {
status: ToolCallStatus::Resuming,
resume_token: Some("resume_token_1".to_string()),
resume: Some(ToolCallResume {
decision_id: "decision_1".to_string(),
action: ResumeDecisionAction::Resume,
result: json!(true),
reason: None,
updated_at: 42,
}),
updated_at: 42,
},
)
.expect("transition should be allowed");
assert_eq!(transitioned.call_id, "call_1");
assert_eq!(transitioned.tool_name, "echo");
assert_eq!(transitioned.arguments, json!({"message":"hi"}));
assert_eq!(transitioned.status, ToolCallStatus::Resuming);
assert_eq!(transitioned.resume_token.as_deref(), Some("resume_token_1"));
assert_eq!(
transitioned
.resume
.as_ref()
.map(|resume| &resume.decision_id),
Some(&"decision_1".to_string())
);
assert_eq!(transitioned.updated_at, 42);
}
#[test]
fn transition_tool_call_state_rejects_invalid_lifecycle_transition() {
let current = ToolCallState {
call_id: "call_1".to_string(),
tool_name: "echo".to_string(),
arguments: json!({"message":"done"}),
status: ToolCallStatus::Succeeded,
resume_token: None,
resume: None,
scratch: Value::Null,
updated_at: 1,
};
let transitioned = transition_tool_call_state(
Some(current),
ToolCallStateSeed {
call_id: "call_1",
tool_name: "echo",
arguments: &json!({"message":"done"}),
status: ToolCallStatus::New,
resume_token: None,
},
ToolCallStateTransition {
status: ToolCallStatus::Running,
resume_token: None,
resume: None,
updated_at: 2,
},
);
assert!(transitioned.is_none(), "terminal state should not reopen");
}
#[test]
fn patch_dangling_tool_calls_adds_synthetic_results() {
use crate::contracts::thread::ToolCall;
let mut messages = vec![
Message::user("hi"),
Message::assistant_with_tool_calls(
"let me check",
vec![
ToolCall::new("c1", "search", json!({"q": "x"})),
ToolCall::new("c2", "fetch", json!({"url": "y"})),
],
),
Message::tool("c1", "found it"),
];
patch_dangling_tool_calls(&mut messages);
assert_eq!(messages.len(), 4);
assert_eq!(messages[3].role, Role::Tool);
assert_eq!(messages[3].tool_call_id.as_deref(), Some("c2"));
assert!(messages[3].content.contains("interrupted"));
}
#[test]
fn patch_dangling_tool_calls_handles_end_of_history() {
use crate::contracts::thread::ToolCall;
let mut messages = vec![
Message::user("do something"),
Message::assistant_with_tool_calls(
"calling tool",
vec![ToolCall::new("c1", "search", json!({}))],
),
];
patch_dangling_tool_calls(&mut messages);
assert_eq!(messages.len(), 3);
assert_eq!(messages[2].role, Role::Tool);
assert_eq!(messages[2].tool_call_id.as_deref(), Some("c1"));
}
#[test]
fn patch_dangling_tool_calls_in_middle_of_history() {
use crate::contracts::thread::ToolCall;
let mut messages = vec![
Message::user("first"),
Message::assistant_with_tool_calls(
"calling",
vec![ToolCall::new("c1", "t", json!({}))],
),
Message::user("second"),
Message::assistant("ok"),
];
patch_dangling_tool_calls(&mut messages);
assert_eq!(messages.len(), 5);
assert_eq!(messages[2].role, Role::Tool);
assert_eq!(messages[2].tool_call_id.as_deref(), Some("c1"));
assert_eq!(messages[3].role, Role::User);
assert_eq!(messages[4].role, Role::Assistant);
}
#[test]
fn patch_dangling_tool_calls_no_op_when_complete() {
use crate::contracts::thread::ToolCall;
let mut messages = vec![
Message::user("hi"),
Message::assistant_with_tool_calls(
"calling",
vec![ToolCall::new("c1", "t", json!({}))],
),
Message::tool("c1", "result"),
Message::assistant("done"),
];
let len_before = messages.len();
patch_dangling_tool_calls(&mut messages);
assert_eq!(messages.len(), len_before, "no synthetic results needed");
}
#[test]
fn pending_approval_placeholder_detection_requires_exact_template() {
let exact = pending_approval_placeholder_message("echo");
assert!(is_pending_approval_placeholder_content(&exact));
assert!(!is_pending_approval_placeholder_content(
"prefix Tool 'echo' is awaiting approval. Execution paused. suffix"
));
assert!(!is_pending_approval_placeholder_content(
"Tool '' is awaiting approval. Execution paused."
));
}
}