use super::super::JsonError;
use super::AppState;
use axum::http::StatusCode;
use roboticus_agent::capability::CapabilitySource;
use roboticus_agent::script_runner::ScriptRunner;
use roboticus_agent::tools::ToolContext;
use roboticus_core::InputAuthority;
use roboticus_pipeline::flight_recorder::ToolSource;
/// Result of a successful tool invocation, as returned by
/// `execute_tool_call_detailed`.
pub(crate) struct ToolExecutionDetails {
/// Text handed back to the caller. For registry tools this is the tool's
/// output after truncation and output filtering; for virtual tools it is
/// the string returned by the virtual handler.
pub output: String,
/// Where the tool implementation came from. Virtual tools and tools without
/// a registered capability are reported as `ToolSource::BuiltIn`.
pub source: ToolSource,
}
pub(crate) use roboticus_pipeline::tool_parsing::classify_provider_error;
fn parse_risk_level(raw: &str) -> Result<roboticus_core::RiskLevel, String> {
match raw.to_ascii_lowercase().as_str() {
"safe" => Ok(roboticus_core::RiskLevel::Safe),
"caution" => Ok(roboticus_core::RiskLevel::Caution),
"dangerous" => Ok(roboticus_core::RiskLevel::Dangerous),
"forbidden" => Ok(roboticus_core::RiskLevel::Forbidden),
_ => Err(format!("invalid skill risk_level '{raw}'")),
}
}
/// Determines the survival tier used for policy evaluation.
///
/// A cached treasury row, when present, always wins: its tier is returned even
/// if the row is older than 600 seconds (a warning is logged in that case, and
/// an unparseable `updated_at` is silently ignored). Without a cached row the
/// tier is derived from the live wallet USDC balance, treating a failed balance
/// lookup as a balance of zero.
async fn resolve_survival_tier(state: &AppState) -> roboticus_core::SurvivalTier {
    match roboticus_db::treasury::get_treasury_state(&state.db) {
        Ok(Some(snapshot)) => {
            let parsed_at = chrono::NaiveDateTime::parse_from_str(
                &snapshot.updated_at,
                "%Y-%m-%d %H:%M:%S",
            );
            if let Ok(updated_at) = parsed_at {
                let age_seconds = (chrono::Utc::now().naive_utc() - updated_at).num_seconds();
                if age_seconds > 600 {
                    tracing::warn!(
                        age_seconds,
                        "treasury state is stale (>600s); returning cached value"
                    );
                }
            }
            return snapshot.survival_tier;
        }
        // No cached row, or the lookup failed: fall through to the wallet.
        Ok(None) | Err(_) => {}
    }

    let balance = state
        .wallet
        .wallet
        .get_usdc_balance()
        .await
        .unwrap_or_else(|e| {
            tracing::warn!(
                error = %e,
                "wallet balance lookup failed; falling back to zero balance tier"
            );
            0.0
        });
    roboticus_core::SurvivalTier::from_balance(balance, 0.0)
}
async fn resolve_run_script_policy(
state: &AppState,
tool_name: &str,
params: &serde_json::Value,
authority: InputAuthority,
default_risk: roboticus_core::RiskLevel,
) -> Result<(roboticus_core::RiskLevel, Option<(String, String, String)>), String> {
if tool_name != "run_script" {
return Ok((default_risk, None));
}
let script_arg = params.get("path").and_then(|v| v.as_str()).unwrap_or("");
let config = state.config.read().await;
let runner = ScriptRunner::new(config.skills.clone(), config.security.filesystem.clone());
let maybe_script_path = runner
.resolve_script_path(std::path::Path::new(script_arg))
.inspect_err(|e| {
tracing::debug!(
script = script_arg,
error = %e,
"run_script policy: script path resolution failed (proceeding with default risk)"
);
})
.ok()
.map(|p| p.to_string_lossy().to_string());
drop(config);
let Some(script_path) = maybe_script_path else {
return Ok((default_risk, None));
};
let skill = roboticus_db::skills::find_skill_by_script_path(&state.db, &script_path)
.map_err(|e| format!("Skill policy lookup failed: {e}"))?;
let Some(skill) = skill else {
return Ok((default_risk, None));
};
if !skill.enabled {
return Err(format!(
"Policy override denied: skill '{}' is disabled",
skill.name
));
}
let effective_risk = parse_risk_level(&skill.risk_level)
.map_err(|e| format!("Policy override denied: skill '{}' has {}", skill.name, e))?;
let matched_skill = Some((
skill.id.clone(),
skill.name.clone(),
skill.content_hash.clone(),
));
if let Some(overrides_raw) = skill.policy_overrides_json.as_deref() {
let overrides = serde_json::from_str::<serde_json::Value>(overrides_raw).map_err(|e| {
format!(
"Policy override parse failed for skill '{}': {e}",
skill.name
)
})?;
let require_creator = overrides
.get("require_creator")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let deny_external = overrides
.get("deny_external")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let disabled = overrides
.get("disabled")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if disabled {
return Err(format!(
"Policy override denied: skill '{}' is disabled",
skill.name
));
}
if require_creator && authority != roboticus_core::InputAuthority::Creator {
return Err(format!(
"Policy override denied: skill '{}' requires Creator authority",
skill.name
));
}
if deny_external && authority == roboticus_core::InputAuthority::External {
return Err(format!(
"Policy override denied: skill '{}' denies External authority",
skill.name
));
}
}
Ok((effective_risk, matched_skill))
}
/// Executes a tool call with the approval gate enforced and returns only the
/// tool's output text.
///
/// Delegates to `execute_tool_call_detailed` and discards the tool source.
pub(crate) async fn execute_tool_call(
    state: &AppState,
    tool_name: &str,
    params: &serde_json::Value,
    turn_id: &str,
    authority: InputAuthority,
    channel: Option<&str>,
) -> Result<String, String> {
    let details =
        execute_tool_call_detailed(state, tool_name, params, turn_id, authority, channel).await?;
    Ok(details.output)
}
/// Executes a tool call with the approval gate switched off and returns only
/// the tool's output text.
///
/// All other checks performed by `execute_tool_call_internal` still run.
pub(crate) async fn execute_tool_call_after_approval(
    state: &AppState,
    tool_name: &str,
    params: &serde_json::Value,
    turn_id: &str,
    authority: InputAuthority,
    channel: Option<&str>,
) -> Result<String, String> {
    let enforce_approval_gate = false;
    let outcome = execute_tool_call_internal(
        state,
        tool_name,
        params,
        turn_id,
        authority,
        channel,
        enforce_approval_gate,
    )
    .await;
    match outcome {
        Ok(details) => Ok(details.output),
        Err(message) => Err(message),
    }
}
/// Maps a capability's origin onto the flight-recorder [`ToolSource`] type.
///
/// For MCP capabilities only the server name is carried over; the remaining
/// fields of the variant are dropped.
fn capability_source_to_tool_source(source: CapabilitySource) -> ToolSource {
    match source {
        CapabilitySource::Mcp {
            server: mcp_server, ..
        } => ToolSource::Mcp { server: mcp_server },
        CapabilitySource::Plugin(plugin_name) => ToolSource::Plugin(plugin_name),
        CapabilitySource::BuiltIn => ToolSource::BuiltIn,
    }
}
/// Executes a tool call with the approval gate enforced and returns both the
/// output text and the source of the tool that produced it.
pub(crate) async fn execute_tool_call_detailed(
    state: &AppState,
    tool_name: &str,
    params: &serde_json::Value,
    turn_id: &str,
    authority: InputAuthority,
    channel: Option<&str>,
) -> Result<ToolExecutionDetails, String> {
    let enforce_approval_gate = true;
    execute_tool_call_internal(
        state,
        tool_name,
        params,
        turn_id,
        authority,
        channel,
        enforce_approval_gate,
    )
    .await
}
async fn execute_tool_call_internal(
state: &AppState,
tool_name: &str,
params: &serde_json::Value,
turn_id: &str,
authority: InputAuthority,
channel: Option<&str>,
enforce_approval_gate: bool,
) -> Result<ToolExecutionDetails, String> {
let tier = resolve_survival_tier(state).await;
if super::is_virtual_delegation_tool(tool_name) {
let start = std::time::Instant::now();
let result = super::execute_virtual_subagent_tool_call(
state, tool_name, params, turn_id, authority, tier,
)
.await;
let duration_ms = start.elapsed().as_millis() as i64;
let (output, status) = match &result {
Ok(out) => (out.clone(), "success"),
Err(err) => (err.clone(), "error"),
};
roboticus_db::tools::record_tool_call_with_skill(
&state.db,
turn_id,
tool_name,
¶ms.to_string(),
Some(&output),
status,
Some(duration_ms),
None,
None,
None,
)
.inspect_err(|e| tracing::warn!(error = %e, "failed to record virtual tool call"))
.ok();
return result.map(|output| ToolExecutionDetails {
output,
source: ToolSource::BuiltIn,
});
}
if super::is_virtual_orchestration_tool(tool_name) {
let start = std::time::Instant::now();
let result = super::execute_virtual_orchestration_tool(
state, tool_name, params, turn_id, authority, tier,
)
.await;
let duration_ms = start.elapsed().as_millis() as i64;
let (output, status) = match &result {
Ok(out) => (out.clone(), "success"),
Err(err) => (err.clone(), "error"),
};
roboticus_db::tools::record_tool_call_with_skill(
&state.db,
turn_id,
tool_name,
¶ms.to_string(),
Some(&output),
status,
Some(duration_ms),
None,
None,
None,
)
.inspect_err(|e| tracing::warn!(error = %e, "failed to record orchestration tool call"))
.ok();
return result.map(|output| ToolExecutionDetails {
output,
source: ToolSource::BuiltIn,
});
}
let tool = match state.tools.get(tool_name) {
Some(t) => t,
None => return Err(format!("Unknown tool: {tool_name}")),
};
let capability = state.capabilities.get(tool_name).await;
let tool_source = capability
.as_ref()
.map(|cap| capability_source_to_tool_source(cap.source()))
.unwrap_or(ToolSource::BuiltIn);
let (effective_risk, matched_skill) =
resolve_run_script_policy(state, tool_name, params, authority, tool.risk_level()).await?;
if authority == InputAuthority::Creator {
roboticus_db::policy::record_policy_decision(
&state.db,
Some(turn_id),
tool_name,
"allow",
Some("creator_override"),
Some("Creator authority bypassed policy/approval gates"),
)
.inspect_err(|e| tracing::warn!(error = %e, "failed to record creator policy decision"))
.ok();
} else {
let policy_result = super::check_tool_policy(
&state.policy_engine,
tool_name,
params,
authority,
tier,
effective_risk,
);
let (decision_str, rule_name, reason) = match &policy_result {
Ok(()) => ("allow".to_string(), None, None),
Err(JsonError(_status, msg)) => (
"deny".to_string(),
Some("policy_engine"),
Some(msg.as_str()),
),
};
roboticus_db::policy::record_policy_decision(
&state.db,
Some(turn_id),
tool_name,
&decision_str,
rule_name,
reason,
)
.inspect_err(|e| tracing::warn!(error = %e, "failed to record policy decision"))
.ok();
if let Err(JsonError(_status, msg)) = policy_result {
return Err(format!("Policy denied: {msg}"));
}
}
if enforce_approval_gate && authority != InputAuthority::Creator {
match state.approvals.check_tool(tool_name) {
Ok(roboticus_agent::approvals::ToolClassification::Gated) => {
let request = state
.approvals
.request_approval(
tool_name,
¶ms.to_string(),
None,
Some(turn_id),
authority,
)
.map_err(|e| format!("Approval error: {e}"))?;
roboticus_db::approvals::record_approval_request(
&state.db,
&request.id,
&request.tool_name,
&request.tool_input,
request.session_id.as_deref(),
request.turn_id.as_deref(),
"pending",
&request.timeout_at.to_rfc3339(),
)
.inspect_err(|e| tracing::warn!(error = %e, "failed to persist approval request"))
.ok();
state.event_bus.publish(
serde_json::json!({
"type": "pending_approval",
"tool": tool_name,
"request_id": request.id,
})
.to_string(),
);
return Err(format!(
"Tool '{tool_name}' requires approval (request: {})",
request.id
));
}
Err(e) => {
return Err(format!("Tool blocked: {e}"));
}
Ok(_) => {}
}
}
let (workspace_root, agent_id, agent_name, tool_allowed_paths, sandbox) = {
let cfg = state.config.read().await;
(
cfg.agent.workspace.clone(),
cfg.agent.id.clone(),
cfg.agent.name.clone(),
cfg.security.filesystem.tool_allowed_paths.clone(),
roboticus_agent::tools::ToolSandboxSnapshot::from_config(
&cfg.security.filesystem,
&cfg.skills,
),
)
};
let ctx = ToolContext {
session_id: turn_id.to_string(),
agent_id,
agent_name,
authority,
workspace_root,
tool_allowed_paths,
channel: channel.map(|s| s.to_string()),
db: Some(state.db.clone()),
sandbox,
};
let ws_agent_id = {
let config = state.config.read().await;
config.agent.id.clone()
};
state.event_bus.publish(
serde_json::json!({
"type": "agent_working",
"agent_id": ws_agent_id,
"workstation": "exec",
"activity": format!("tool:{tool_name}"),
"turn_id": turn_id,
})
.to_string(),
);
if let Some((_, skill_name, _)) = matched_skill.as_ref() {
state.event_bus.publish(
serde_json::json!({
"type": "skill_activated",
"agent_id": ws_agent_id,
"skill": skill_name,
"tool_name": tool_name,
"turn_id": turn_id,
})
.to_string(),
);
}
let start = std::time::Instant::now();
let timeout_duration = std::time::Duration::from_secs(120);
let result = match tokio::time::timeout(timeout_duration, async {
if let Some(cap) = capability {
cap.execute(params.clone(), &ctx).await
} else {
tool.execute(params.clone(), &ctx).await
}
})
.await
{
Ok(result) => result,
Err(_) => Err(roboticus_agent::tools::ToolError {
message: format!("Tool '{tool_name}' timed out after {timeout_duration:?}"),
}),
};
let duration_ms = start.elapsed().as_millis() as i64;
const MAX_TOOL_OUTPUT: usize = 16_384;
let (output, status) = match &result {
Ok(r) => {
tracing::info!(
tool = tool_name,
duration_ms,
output_len = r.output.len(),
"tool executed successfully"
);
let mut out = if r.output.len() > MAX_TOOL_OUTPUT {
let boundary = r.output.floor_char_boundary(MAX_TOOL_OUTPUT);
format!(
"{}...\n[truncated: {} bytes total]",
&r.output[..boundary],
r.output.len()
)
} else {
r.output.clone()
};
let mut status = "success";
if let Some(unreadable) = r
.metadata
.as_ref()
.and_then(|m| m.get("unreadable_files"))
.and_then(|v| v.as_u64())
&& unreadable > 0
{
status = "partial_success";
out = format!("{out}\n\n[warning] Search skipped {unreadable} unreadable file(s).");
}
use roboticus_agent::tool_output_filter::ToolOutputFilterChain;
let out = ToolOutputFilterChain::default_chain().apply(tool_name, &out);
(out, status)
}
Err(e) => {
tracing::warn!(tool = tool_name, duration_ms, error = %e.message, "tool execution failed");
(e.message.clone(), "error")
}
};
let (skill_id, skill_name, skill_hash) = match matched_skill.as_ref() {
Some((id, name, hash)) => (Some(id.as_str()), Some(name.as_str()), Some(hash.as_str())),
None => (None, None, None),
};
roboticus_db::tools::record_tool_call_with_skill(
&state.db,
turn_id,
tool_name,
¶ms.to_string(),
Some(&output),
status,
Some(duration_ms),
skill_id,
skill_name,
skill_hash,
)
.inspect_err(|e| tracing::warn!(error = %e, "failed to record tool call"))
.ok();
if status == "error" {
state.event_bus.publish(
serde_json::json!({
"type": "tool_error",
"tool": tool_name,
"turn_id": turn_id,
"duration_ms": duration_ms,
"error": &output,
})
.to_string(),
);
}
state.event_bus.publish(
serde_json::json!({
"type": "agent_idle",
"agent_id": ws_agent_id,
"workstation": "exec",
"turn_id": turn_id,
})
.to_string(),
);
if let Err(e) = result {
Err(e.message)
} else {
Ok(ToolExecutionDetails {
output,
source: tool_source,
})
}
}
pub(crate) fn check_tool_policy(
engine: &roboticus_agent::policy::PolicyEngine,
tool_name: &str,
params: &serde_json::Value,
authority: InputAuthority,
tier: roboticus_core::SurvivalTier,
risk_level: roboticus_core::RiskLevel,
) -> Result<(), JsonError> {
let call = roboticus_agent::policy::ToolCallRequest {
tool_name: tool_name.into(),
params: params.clone(),
risk_level,
};
let ctx = roboticus_agent::policy::PolicyContext {
authority,
survival_tier: tier,
claim: None,
};
let decision = engine.evaluate_all(&call, &ctx);
match decision {
roboticus_core::PolicyDecision::Allow => Ok(()),
roboticus_core::PolicyDecision::Deny { rule, reason } => {
tracing::warn!(tool = tool_name, rule = %rule, reason = %reason, "Policy denied tool call");
Err(JsonError(
StatusCode::FORBIDDEN,
format!("Policy denied: {reason}"),
))
}
}
}
// Tests for the tool-call parsing helpers re-used from `roboticus_pipeline`.
// They pin which textual shapes of a `tool_call` JSON payload are recognised
// in model output.
#[cfg(test)]
mod tests {
use roboticus_pipeline::tool_parsing::{parse_tool_call, parse_tool_calls};
// A lone `tool_call` object yields exactly one call with the given name.
#[test]
fn parse_tool_calls_single() {
let input = r#"{"tool_call": {"name": "echo", "params": {"message": "hi"}}}"#;
let calls = parse_tool_calls(input);
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].0, "echo");
}
// Two objects on separate lines are both returned, in input order.
#[test]
fn parse_tool_calls_multiple() {
let input = r#"{"tool_call": {"name": "echo", "params": {"message": "hi"}}}
{"tool_call": {"name": "web-search", "params": {"query": "rust"}}}"#;
let calls = parse_tool_calls(input);
assert_eq!(calls.len(), 2);
assert_eq!(calls[0].0, "echo");
assert_eq!(calls[1].0, "web-search");
}
// Prose before and between the JSON objects does not hide the calls.
#[test]
fn parse_tool_calls_with_surrounding_text() {
let input = r#"Let me help you with that.
{"tool_call": {"name": "echo", "params": {"message": "test"}}}
I will also search:
{"tool_call": {"name": "web-search", "params": {"query": "rust lang"}}}"#;
let calls = parse_tool_calls(input);
assert_eq!(calls.len(), 2);
assert_eq!(calls[0].0, "echo");
assert_eq!(calls[1].0, "web-search");
}
// Plain text without any `tool_call` object produces no calls.
#[test]
fn parse_tool_calls_empty() {
let calls = parse_tool_calls("No tool calls here");
assert!(calls.is_empty());
}
// The input is missing its final closing brace; the call and its params
// must still be recovered.
#[test]
fn parse_tool_calls_unterminated_json_recovers() {
let input = r#"{"tool_call": {"name": "echo", "params": {"message": "hi"}}"#;
let calls = parse_tool_calls(input);
assert_eq!(calls.len(), 1, "should recover truncated tool call");
assert_eq!(calls[0].0, "echo");
assert_eq!(calls[0].1["message"], "hi");
}
// The single-call helper `parse_tool_call` finds a call preceded by text.
#[test]
fn parse_tool_call_backward_compat() {
let input = r#"Some text {"tool_call": {"name": "echo", "params": {"message": "hi"}}}"#;
let single = parse_tool_call(input);
assert!(single.is_some());
assert_eq!(single.unwrap().0, "echo");
}
// Shorthand shape: `tool_call` is the tool name string and `params` is a
// sibling key.
#[test]
fn parse_tool_call_shorthand_shape() {
let input = r#"{"tool_call":"bash","params":{"command":"ls -la"}}"#;
let single = parse_tool_call(input).expect("should parse shorthand shape");
assert_eq!(single.0, "bash");
assert_eq!(single.1["command"], "ls -la");
}
// Same shorthand shape through the multi-call parser.
#[test]
fn parse_tool_calls_shorthand_shape() {
let input = r#"{"tool_call":"orchestrate-subagents","params":{"task":"sitrep"}}"#;
let calls = parse_tool_calls(input);
assert_eq!(calls.len(), 1);
assert_eq!(calls[0].0, "orchestrate-subagents");
assert_eq!(calls[0].1["task"], "sitrep");
}
// Alternate key names `tool_name` / `arguments` are accepted.
#[test]
fn parse_tool_call_accepts_tool_name_and_arguments_shape() {
let input = r#"{"tool_call":{"tool_name":"compose-subagent","arguments":{"name":"finance-specialist"}}}"#;
let single = parse_tool_call(input).expect("should parse tool_name/arguments shape");
assert_eq!(single.0, "compose-subagent");
assert_eq!(single.1["name"], "finance-specialist");
}
// Alternate key names `tool` / `input` are accepted.
#[test]
fn parse_tool_call_accepts_tool_and_input_shape() {
let input = r#"{"tool_call":{"tool":"compose-skill","input":{"name":"forecasting"}}}"#;
let single = parse_tool_call(input).expect("should parse tool/input shape");
assert_eq!(single.0, "compose-skill");
assert_eq!(single.1["name"], "forecasting");
}
// Narration containing backticks ahead of the JSON must not break parsing.
#[test]
fn parse_tool_calls_narrated_with_backticks() {
let input = "Perfect! I can see the `ghola` skill is available in my workspace. Let me use it to fetch weather information for Meyrin, Switzerland from meteoswiss.ch.\n{\"tool_call\":{\"name\":\"run_script\",\"params\":{\"path\":\"ghola\",\"args\":[\"https://www.meteoswiss.ch/en/weather/weather-in-your-region/meyrin.html\"]}}}";
let calls = parse_tool_calls(input);
assert_eq!(
calls.len(),
1,
"should parse narrated tool call: {:?}",
calls
);
assert_eq!(calls[0].0, "run_script");
}
// An OpenAI-style response whose function `arguments` is a JSON object
// (not a string) is translated, and the resulting content must still parse
// into a call with its arguments intact.
#[test]
fn parse_openai_structured_tool_call_with_object_arguments() {
let response_body = serde_json::json!({
"id": "chatcmpl-test",
"object": "chat.completion",
"model": "kimi-k2-turbo-preview",
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": "Let me fetch that for you.",
"tool_calls": [{
"id": "call_1",
"type": "function",
"function": {
"name": "run_script",
"arguments": {"path": "ghola", "args": ["https://example.com"]}
}
}]
},
"finish_reason": "tool_calls"
}],
"usage": {"prompt_tokens": 100, "completion_tokens": 50}
});
let resp = roboticus_llm::format::translate_response(
&response_body,
roboticus_core::ApiFormat::OpenAiCompletions,
)
.unwrap();
let calls = parse_tool_calls(&resp.content);
assert_eq!(
calls.len(),
1,
"should find tool call in translated response"
);
assert_eq!(calls[0].0, "run_script");
assert_eq!(
calls[0].1["path"], "ghola",
"arguments should be preserved even when provider returns them as an object, not a string. Got: {}",
calls[0].1
);
}
// Truncated JSON (final brace missing) with escaped quotes inside a param
// value is recovered, and the escapes are decoded.
#[test]
fn parse_tool_calls_truncated_json_recovery() {
let input = "Let me try a different approach:\n{\"tool_call\":{\"name\":\"bash\",\"params\":{\"command\":\"curl -s \\\"wttr.in/Meyrin?format=3\\\"\",\"timeout_seconds\":10}}";
let calls = parse_tool_calls(input);
assert_eq!(
calls.len(),
1,
"should recover truncated tool call: {:?}",
calls
);
assert_eq!(calls[0].0, "bash");
assert_eq!(calls[0].1["command"], "curl -s \"wttr.in/Meyrin?format=3\"");
}
// The injection output scanner must not flag a legitimate tool-call
// response, and the same content must still parse afterwards.
#[test]
fn parse_tool_calls_after_sanitize_simulation() {
let content = "Perfect! I can see the `ghola` skill is available in my workspace. Let me use it to fetch weather information for Meyrin, Switzerland from meteoswiss.ch.\n{\"tool_call\":{\"name\":\"run_script\",\"params\":{\"path\":\"ghola\",\"args\":[\"https://www.meteoswiss.ch/en/weather/weather-in-your-region/meyrin.html\"]}}}";
let blocked = roboticus_agent::injection::scan_output(content);
assert!(
!blocked,
"scan_output should NOT block a legitimate tool call response"
);
let calls = parse_tool_calls(content);
assert_eq!(calls.len(), 1, "tool call should parse after sanitize pass");
assert_eq!(calls[0].0, "run_script");
assert_eq!(calls[0].1["path"], "ghola");
}
}