/// Derives the external-action receipts for one node attempt's session and persists them.
///
/// The current capability bindings are loaded first, receipts are built by
/// [`collect_automation_external_action_receipts`], and each receipt is then handed to
/// `state.record_external_action` in collection order.
///
/// # Errors
///
/// Propagates the first error from listing the bindings or from recording an action. Actions
/// recorded before the failing one are not undone by this function.
async fn record_automation_external_actions_for_session(
    state: &AppState,
    run_id: &str,
    automation: &AutomationV2Spec,
    node: &AutomationFlowNode,
    attempt: u32,
    session_id: &str,
    session: &Session,
) -> anyhow::Result<Vec<ExternalActionRecord>> {
    let bindings = state.capability_resolver.list_bindings().await?;
    let pending_receipts = collect_automation_external_action_receipts(
        &bindings, run_id, automation, node, attempt, session_id, session,
    );

    let mut persisted = Vec::with_capacity(pending_receipts.len());
    for receipt in pending_receipts {
        let stored = state.record_external_action(receipt).await?;
        persisted.push(stored);
    }
    Ok(persisted)
}
/// Builds one [`ExternalActionRecord`] per successful, capability-bound tool invocation found
/// in `session`.
///
/// Nothing is produced unless `node` is an outbound-action node. Every message part of the
/// session is numbered in order (tool invocations and other parts alike); that number is the
/// `call_index` used in the idempotency key and in `source_id`. A part is turned into a record
/// only when it is a tool invocation with a result, without a non-blank error, and whose tool
/// name matches one of `bindings` (first match wins). Records sharing an idempotency key are
/// emitted once.
pub(crate) fn collect_automation_external_action_receipts(
    bindings: &capability_resolver::CapabilityBindingsFile,
    run_id: &str,
    automation: &AutomationV2Spec,
    node: &AutomationFlowNode,
    attempt: u32,
    session_id: &str,
    session: &Session,
) -> Vec<ExternalActionRecord> {
    let mut receipts = Vec::new();
    if !automation_node_is_outbound_action(node) {
        return receipts;
    }

    let mut emitted_keys = std::collections::HashSet::new();
    let all_parts = session
        .messages
        .iter()
        .flat_map(|message| message.parts.iter());

    for (call_index, part) in all_parts.enumerate() {
        let MessagePart::ToolInvocation {
            tool,
            args,
            result,
            error,
        } = part
        else {
            continue;
        };

        // Failed or still-pending invocations never count as executed external actions.
        let reported_error = error.as_ref().is_some_and(|text| !text.trim().is_empty());
        if reported_error || result.is_none() {
            continue;
        }

        let matching_binding = bindings
            .bindings
            .iter()
            .find(|candidate| automation_binding_matches_tool_name(candidate, tool));
        let Some(binding) = matching_binding else {
            continue;
        };

        let idempotency_key = automation_external_action_idempotency_key(
            automation,
            run_id,
            node,
            tool,
            args,
            &call_index.to_string(),
        );
        let first_occurrence = emitted_keys.insert(idempotency_key.clone());
        if !first_occurrence {
            continue;
        }

        let source_id = format!("{run_id}:{}:{attempt}:{call_index}", node.node_id);
        let created_at_ms = now_ms();
        let receipt = json!({
            "tool": tool,
            "args": args,
            "result": result,
        });
        let metadata = json!({
            "automationID": automation.automation_id,
            "automationRunID": run_id,
            "nodeID": node.node_id,
            "attempt": attempt,
            "nodeObjective": node.objective,
            "sessionID": session_id,
            "tool": tool,
            "provider": binding.provider,
        });

        receipts.push(ExternalActionRecord {
            // The action id is derived from the first 16 characters of the idempotency key.
            action_id: format!("automation-external-{}", &idempotency_key[..16]),
            operation: binding.capability_id.clone(),
            status: "posted".to_string(),
            source_kind: Some("automation_v2".to_string()),
            source_id: Some(source_id),
            routine_run_id: None,
            context_run_id: Some(format!("automation-v2-{run_id}")),
            capability_id: Some(binding.capability_id.clone()),
            provider: Some(binding.provider.clone()),
            target: automation_external_action_target(args, result.as_ref()),
            approval_state: Some("executed".to_string()),
            idempotency_key: Some(idempotency_key),
            receipt: Some(receipt),
            error: None,
            metadata: Some(metadata),
            created_at_ms,
            updated_at_ms: created_at_ms,
        });
    }
    receipts
}
/// Computes the idempotency key for one external action.
///
/// The key is `crate::sha256_hex` over, in order: the literal `automation_v2`, the automation
/// id, the run id, the node id, the tool name, the serialized tool arguments and the call
/// index.
pub(crate) fn automation_external_action_idempotency_key(
    automation: &AutomationV2Spec,
    run_id: &str,
    node: &AutomationFlowNode,
    tool: &str,
    args: &Value,
    call_index: &str,
) -> String {
    let serialized_args = args.to_string();
    let components: [&str; 7] = [
        "automation_v2",
        &automation.automation_id,
        run_id,
        &node.node_id,
        tool,
        &serialized_args,
        call_index,
    ];
    crate::sha256_hex(&components)
}
/// Reports whether the start of `session_text` contains one of the known "blocked",
/// "provisional" or "verification failed" phrases.
///
/// Always returns `false` when `artifact_validation` carries a string
/// `semantic_block_reason`. Otherwise only the first 1600 characters of the text are
/// inspected, compared ASCII-case-insensitively against the marker list.
pub(crate) fn automation_attempt_uses_legacy_fallback(
    session_text: &str,
    artifact_validation: Option<&Value>,
) -> bool {
    /// Number of leading characters of the session text that are scanned.
    const SCAN_CHAR_LIMIT: usize = 1600;
    /// Lower-case phrases whose presence triggers a `true` result.
    const FALLBACK_MARKERS: &[&str] = &[
        "status: blocked",
        "status blocked",
        "## status blocked",
        "blocked pending",
        "this brief is blocked",
        "brief is blocked",
        "partially blocked",
        "provisional",
        "path-level evidence",
        "based on filenames not content",
        "could not be confirmed from file contents",
        "could not safely cite exact file-derived claims",
        "not approved",
        "approval has not happened",
        "publication is blocked",
        "i’m blocked",
        "i'm blocked",
        "status: verify_failed",
        "status verify_failed",
        "verification failed",
        "tests failed",
        "build failed",
        "lint failed",
        "verify failed",
    ];

    let has_semantic_block_reason = artifact_validation
        .and_then(|validation| validation.get("semantic_block_reason"))
        .and_then(Value::as_str)
        .is_some();
    if has_semantic_block_reason {
        return false;
    }

    let scanned_head: String = session_text
        .chars()
        .take(SCAN_CHAR_LIMIT)
        .map(|ch| ch.to_ascii_lowercase())
        .collect();
    FALLBACK_MARKERS
        .iter()
        .any(|marker| scanned_head.contains(marker))
}
pub(crate) fn automation_publish_editorial_block_reason(
run: &AutomationV2RunRecord,
node: &AutomationFlowNode,
) -> Option<String> {
if !automation_node_is_outbound_action(node) {
return None;
}
let mut upstream_ids = node.depends_on.clone();
for input in &node.input_refs {
if !upstream_ids
.iter()
.any(|value| value == &input.from_step_id)
{
upstream_ids.push(input.from_step_id.clone());
}
}
let blocked_upstreams = upstream_ids
.into_iter()
.filter(|node_id| {
let Some(output) = run.checkpoint.node_outputs.get(node_id) else {
return false;
};
output
.get("failure_kind")
.and_then(Value::as_str)
.is_some_and(|value| value == "editorial_quality_failed")
|| output
.get("phase")
.and_then(Value::as_str)
.is_some_and(|value| value == "editorial_validation")
|| output
.get("validator_summary")
.and_then(|value| value.get("unmet_requirements"))
.and_then(Value::as_array)
.is_some_and(|requirements| {
requirements.iter().any(|value| {
matches!(
value.as_str(),
Some("editorial_substance_missing")
| Some("markdown_structure_missing")
| Some("editorial_clearance_required")
)
})
})
})
.collect::<Vec<_>>();
if blocked_upstreams.is_empty() {
None
} else {
Some(format!(
"publish step blocked until upstream editorial issues are resolved: {}",
blocked_upstreams.join(", ")
))
}
}
/// Returns `true` when `tool_name` equals the binding's tool name or any of its aliases,
/// ignoring ASCII case.
pub(crate) fn automation_binding_matches_tool_name(
    binding: &capability_resolver::CapabilityBinding,
    tool_name: &str,
) -> bool {
    if binding.tool_name.eq_ignore_ascii_case(tool_name) {
        return true;
    }
    for alias in binding.tool_name_aliases.iter() {
        if alias.eq_ignore_ascii_case(tool_name) {
            return true;
        }
    }
    false
}
/// Picks a human-readable target for an external action.
///
/// Candidates are tried in a fixed order: the string fields `owner_repo`, `repo`,
/// `repository`, `channel`, `channel_id` and `thread_ts` of `args`, then `metadata.channel`
/// and `metadata.repo` of `result`. The first candidate that is a non-blank string is returned
/// trimmed; `None` when there is no such candidate.
pub(crate) fn automation_external_action_target(
    args: &Value,
    result: Option<&Value>,
) -> Option<String> {
    const ARG_POINTERS: [&str; 6] = [
        "/owner_repo",
        "/repo",
        "/repository",
        "/channel",
        "/channel_id",
        "/thread_ts",
    ];
    const RESULT_POINTERS: [&str; 2] = ["/metadata/channel", "/metadata/repo"];

    let from_args = ARG_POINTERS.iter().map(|pointer| args.pointer(pointer));
    let from_result = RESULT_POINTERS
        .iter()
        .map(|pointer| result.and_then(|value| value.pointer(pointer)));

    from_args
        .chain(from_result)
        .filter_map(|found| found.and_then(Value::as_str))
        .map(str::trim)
        .find(|text| !text.is_empty())
        .map(str::to_string)
}
/// Runs a connector-preflight node deterministically by executing its required tool calls
/// directly, without prompting a model.
///
/// Returns `Ok(None)` when the node is not a connector-preflight node, declares no required
/// tool calls, or has no `required_output_path`; the caller is then expected to handle the
/// node some other way.
///
/// When any required tool is absent from `effective_offered_tools` (exact string comparison)
/// a blocked output is returned immediately and no tool is executed and no file is written.
///
/// Otherwise every required call is executed in declaration order through
/// `state.tools.execute_for_tenant`. A failing call does not stop the loop; failures of calls
/// flagged `required_success` are collected and turn the overall status into `blocked`.
/// A JSON artifact summarising the calls is written to the resolved output path, one assistant
/// message containing the tool invocations plus a text summary is appended to the session, the
/// session is saved, and the wrapped node output is returned.
///
/// # Errors
///
/// Fails when the output path cannot be resolved, the parent directory cannot be created, the
/// artifact cannot be serialized or written, or the session cannot be saved.
pub(crate) async fn try_execute_connector_preflight_node(
state: &AppState,
run_id: &str,
automation: &AutomationV2Spec,
node: &AutomationFlowNode,
session_id: &str,
workspace_root: &str,
required_output_path: Option<&str>,
requested_tools: &[String],
effective_offered_tools: &[String],
capability_resolution: &Value,
mcp_tool_diagnostics: &Value,
) -> anyhow::Result<Option<Value>> {
// Guard clauses: anything that is not a fully specified connector preflight is declined.
if !automation_node_is_connector_preflight(node) {
return Ok(None);
}
let required_calls = automation_node_required_tool_calls(node);
if required_calls.is_empty() {
return Ok(None);
}
let Some(output_path) = required_output_path else {
return Ok(None);
};
// Required tools that were not actually offered to this node.
let unavailable = required_calls
.iter()
.filter(|call| {
!effective_offered_tools
.iter()
.any(|offered| offered == &call.tool)
})
.map(|call| call.tool.clone())
.collect::<Vec<_>>();
if !unavailable.is_empty() {
return Ok(Some(build_connector_preflight_blocked_output(
node,
requested_tools,
capability_resolution,
mcp_tool_diagnostics,
&format!(
"required connector preflight tool(s) are unavailable: {}",
unavailable.join(", ")
),
json!({
"required_tool_calls": required_calls,
"unavailable_tools": unavailable,
}),
)));
}
let resolved_output =
resolve_automation_output_path_for_run(workspace_root, run_id, output_path)?;
// NOTE(review): std::fs calls below block the async executor thread while they run.
if let Some(parent) = resolved_output.parent() {
std::fs::create_dir_all(parent)?;
}
// Reuse the stored session when there is one, otherwise start a fresh one for this node.
let mut session = state
.storage
.get_session(session_id)
.await
.unwrap_or_else(|| {
Session::new(
Some(format!(
"Automation {} / {}",
automation.automation_id, node.node_id
)),
Some(workspace_root.to_string()),
)
});
session.project_id = Some(automation_workspace_project_id(workspace_root));
session.workspace_root = Some(workspace_root.to_string());
// `invocation_parts` becomes the session message; `call_rows` becomes the artifact body.
let mut invocation_parts = Vec::new();
let mut call_rows = Vec::new();
let mut failed_required = Vec::new();
let tenant_context = automation.tenant_context();
for (index, call) in required_calls.iter().enumerate() {
// Calls without declared arguments are executed with an empty JSON object.
let args = call.args.clone().unwrap_or_else(|| json!({}));
let result = state
.tools
.execute_for_tenant(&call.tool, args.clone(), tenant_context.clone())
.await;
match result {
Ok(result) => {
let result_value = json!({
"output": result.output,
"metadata": result.metadata,
});
invocation_parts.push(MessagePart::ToolInvocation {
tool: call.tool.clone(),
args: args.clone(),
result: Some(result_value.clone()),
error: None,
});
call_rows.push(json!({
"index": index,
"tool": call.tool,
"args": args,
"status": "completed",
"required_success": call.required_success,
"evidence_key": call.evidence_key,
"result_excerpt": truncate_text(&result_value.to_string(), 1200),
}));
}
Err(error) => {
let error_text = error.to_string();
invocation_parts.push(MessagePart::ToolInvocation {
tool: call.tool.clone(),
args: args.clone(),
result: None,
error: Some(error_text.clone()),
});
call_rows.push(json!({
"index": index,
"tool": call.tool,
"args": args,
"status": "failed",
"required_success": call.required_success,
"evidence_key": call.evidence_key,
"error": error_text,
}));
// Only failures of calls that must succeed block the preflight.
if call.required_success {
failed_required.push(call.tool.clone());
}
}
}
}
let preflight_status = if failed_required.is_empty() {
"completed"
} else {
"blocked"
};
let blocked_reason = if failed_required.is_empty() {
Value::Null
} else {
json!(format!(
"required connector preflight call(s) failed: {}",
failed_required.join(", ")
))
};
let artifact = json!({
"status": preflight_status,
"node_id": node.node_id,
"run_id": run_id,
"automation_id": automation.automation_id,
"checked_at_ms": now_ms(),
"required_tool_calls": call_rows,
"blocked_reason": blocked_reason,
"capability_resolution": capability_resolution,
"mcp_tool_diagnostics": mcp_tool_diagnostics,
});
let artifact_text = serde_json::to_string_pretty(&artifact)?;
std::fs::write(&resolved_output, &artifact_text)?;
// Prefer the workspace-relative path for display; fall back to the declared path.
let display_path = resolved_output
.strip_prefix(workspace_root)
.ok()
.and_then(|value| value.to_str().map(str::to_string))
.filter(|value| !value.is_empty())
.unwrap_or_else(|| output_path.to_string());
invocation_parts.push(MessagePart::Text {
text: format!(
"Connector preflight {} for `{}` and wrote `{}`.\n\n{}",
preflight_status, node.node_id, display_path, artifact_text
),
});
session.messages.push(tandem_types::Message::new(
MessageRole::Assistant,
invocation_parts,
));
state.storage.save_session(session.clone()).await?;
let artifact_validation = if failed_required.is_empty() {
json!({
"accepted_candidate_source": "deterministic_connector_preflight",
"validation_outcome": "accepted",
"unmet_requirements": [],
})
} else {
json!({
"accepted_candidate_source": "deterministic_connector_preflight",
"validation_outcome": "blocked",
"semantic_block_reason": blocked_reason,
"unmet_requirements": ["mcp_required_tool_failed"],
})
};
Ok(Some(
node_output::wrap_automation_node_output_with_automation(
automation,
node,
&session,
requested_tools,
session_id,
Some(run_id),
&format!("{{\"status\":\"{}\"}}", preflight_status),
Some((display_path, artifact_text)),
Some(artifact_validation),
),
))
}
/// Runs a workspace-scope preflight node deterministically by inspecting the file system
/// directly, without prompting a model.
///
/// Returns `Ok(None)` when the node is not a workspace-scope preflight node or has no
/// `required_output_path`.
///
/// Path candidates and glob roots are derived from the node objective. Each candidate is
/// joined onto the workspace root and classified as present (a regular file) or missing.
/// Present files are read in full, with excerpts stored in the artifact and in the session.
/// A Markdown "Repository Scope Assessment" artifact is written to the resolved output path,
/// one assistant message is appended to the session, the session is saved, and the wrapped
/// node output is returned. The reported status is always `completed`, even when paths are
/// missing.
///
/// The `read` and `glob` tool invocations recorded in the session are synthetic: no tool is
/// executed, the data comes from `std::fs` and `Path` checks in this function.
///
/// # Errors
///
/// Fails when the output path cannot be resolved, the parent directory cannot be created, the
/// source material cannot be serialized, the artifact cannot be written, or the session cannot
/// be saved.
pub(crate) async fn try_execute_workspace_scope_preflight_node(
state: &AppState,
run_id: &str,
automation: &AutomationV2Spec,
node: &AutomationFlowNode,
session_id: &str,
workspace_root: &str,
required_output_path: Option<&str>,
requested_tools: &[String],
) -> anyhow::Result<Option<Value>> {
if !automation_node_is_workspace_scope_preflight(node) {
return Ok(None);
}
let Some(output_path) = required_output_path else {
return Ok(None);
};
let resolved_output =
resolve_automation_output_path_for_run(workspace_root, run_id, output_path)?;
// NOTE(review): std::fs calls in this function block the async executor thread.
if let Some(parent) = resolved_output.parent() {
std::fs::create_dir_all(parent)?;
}
let path_candidates = automation_workspace_scope_path_candidates(node);
let workspace = PathBuf::from(workspace_root);
let mut present_paths = Vec::new();
let mut missing_paths = Vec::new();
let mut source_material = serde_json::Map::new();
let mut invocation_parts = Vec::new();
for rel in &path_candidates {
// NOTE(review): `rel` comes from objective text and is joined without checking that the
// result stays inside the workspace; an absolute or `..` token would resolve outside it.
// Confirm whether candidates are sanitized elsewhere.
let resolved = workspace.join(rel);
if resolved.is_file() {
present_paths.push(rel.clone());
// Unreadable (for example non-UTF-8) files are treated as empty rather than failing.
let content = std::fs::read_to_string(&resolved).unwrap_or_default();
let excerpt = truncate_text(&content, 2000);
source_material.insert(
rel.clone(),
json!({
"path": rel,
"bytes": content.len(),
"excerpt": excerpt,
}),
);
// The session copy carries a shorter excerpt than the artifact.
invocation_parts.push(MessagePart::ToolInvocation {
tool: "read".to_string(),
args: json!({ "path": rel }),
result: Some(json!({
"path": rel,
"exists": true,
"bytes": content.len(),
"excerpt": truncate_text(&content, 600),
})),
error: None,
});
} else {
missing_paths.push(rel.clone());
invocation_parts.push(MessagePart::ToolInvocation {
tool: "read".to_string(),
args: json!({ "path": rel }),
result: None,
error: Some("file not found".to_string()),
});
}
}
let glob_roots = automation_workspace_scope_glob_roots(node, &present_paths);
for root in &glob_roots {
// Only the directory part before a trailing `/**` is checked for existence.
let resolved = workspace.join(root.trim_end_matches("/**"));
let exists = resolved.exists();
invocation_parts.push(MessagePart::ToolInvocation {
tool: "glob".to_string(),
args: json!({ "pattern": root }),
result: Some(json!({
"pattern": root,
"exists": exists,
})),
error: None,
});
}
// The assessment itself never blocks; missing paths are reported, not treated as failure.
let status = "completed";
let title = format!(
"# Repository Scope Assessment\n\nRun ID: `{run_id}`\nNode ID: `{}`\n",
node.node_id
);
let mut artifact_text = String::new();
artifact_text.push_str(&title);
artifact_text.push_str(&format!("\n## Status\n\n`{status}`\n"));
artifact_text.push_str("\n## Present Paths\n\n");
if present_paths.is_empty() {
artifact_text.push_str("- none\n");
} else {
for path in &present_paths {
artifact_text.push_str(&format!("- `{path}`\n"));
}
}
artifact_text.push_str("\n## Missing Paths\n\n");
if missing_paths.is_empty() {
artifact_text.push_str("- none\n");
} else {
for path in &missing_paths {
artifact_text.push_str(&format!("- `{path}`\n"));
}
}
artifact_text.push_str("\n## Glob Roots\n\n");
if glob_roots.is_empty() {
artifact_text.push_str("- none inferred\n");
} else {
for root in &glob_roots {
artifact_text.push_str(&format!("- `{root}`\n"));
}
}
artifact_text.push_str("\n## Source Material\n\n");
artifact_text.push_str("```json\n");
artifact_text.push_str(&serde_json::to_string_pretty(&json!(source_material))?);
artifact_text.push_str("\n```\n");
std::fs::write(&resolved_output, &artifact_text)?;
// Prefer the workspace-relative path for display; fall back to the declared path.
let display_path = resolved_output
.strip_prefix(workspace_root)
.ok()
.and_then(|value| value.to_str().map(str::to_string))
.filter(|value| !value.is_empty())
.unwrap_or_else(|| output_path.to_string());
// Reuse the stored session when there is one, otherwise start a fresh one for this node.
let mut session = state
.storage
.get_session(session_id)
.await
.unwrap_or_else(|| {
Session::new(
Some(format!(
"Automation {} / {}",
automation.automation_id, node.node_id
)),
Some(workspace_root.to_string()),
)
});
session.project_id = Some(automation_workspace_project_id(workspace_root));
session.workspace_root = Some(workspace_root.to_string());
invocation_parts.push(MessagePart::Text {
text: format!(
"Workspace scope preflight {status} for `{}` and wrote `{}`.\n\n{{\"status\":\"{status}\"}}",
node.node_id, display_path
),
});
session.messages.push(tandem_types::Message::new(
MessageRole::Assistant,
invocation_parts,
));
state.storage.save_session(session.clone()).await?;
let artifact_validation = json!({
"accepted_candidate_source": "deterministic_workspace_scope_preflight",
"validation_outcome": "accepted",
"unmet_requirements": [],
"present_paths": present_paths,
"missing_paths": missing_paths,
"glob_roots": glob_roots,
});
Ok(Some(
node_output::wrap_automation_node_output_with_automation(
automation,
node,
&session,
requested_tools,
session_id,
Some(run_id),
&format!("{{\"status\":\"{status}\"}}"),
Some((display_path, artifact_text)),
Some(artifact_validation),
),
))
}
/// Returns `true` when the node's `metadata.builder` object has a `task_class` or `task_kind`
/// string equal to `scope` (surrounding whitespace and ASCII case ignored).
fn automation_node_is_workspace_scope_preflight(node: &AutomationFlowNode) -> bool {
    let builder = node
        .metadata
        .as_ref()
        .and_then(|metadata| metadata.get("builder"))
        .and_then(Value::as_object);
    let Some(builder) = builder else {
        return false;
    };

    for key in ["task_class", "task_kind"] {
        let names_scope = builder
            .get(key)
            .and_then(Value::as_str)
            .is_some_and(|label| label.trim().eq_ignore_ascii_case("scope"));
        if names_scope {
            return true;
        }
    }
    false
}
/// Recursively searches `value` for an object whose `flag` key is the boolean `false`.
///
/// Objects and arrays are descended into at any depth; scalar values never match.
pub(crate) fn automation_value_contains_false_flag(value: &Value, flag: &str) -> bool {
    if let Some(object) = value.as_object() {
        if object.get(flag).and_then(Value::as_bool) == Some(false) {
            return true;
        }
        return object
            .values()
            .any(|nested| automation_value_contains_false_flag(nested, flag));
    }
    if let Some(items) = value.as_array() {
        return items
            .iter()
            .any(|nested| automation_value_contains_false_flag(nested, flag));
    }
    false
}
/// Returns `true` when every name in `fields` is listed in the `required` array of the node's
/// output-contract schema.
///
/// A node without an output contract or schema yields `false`. A schema without a `required`
/// array is treated as requiring nothing, so only an empty `fields` slice is satisfied.
pub(crate) fn automation_node_schema_has_required_fields(
    node: &AutomationFlowNode,
    fields: &[&str],
) -> bool {
    let schema = node
        .output_contract
        .as_ref()
        .and_then(|contract| contract.schema.as_ref());
    let Some(schema) = schema else {
        return false;
    };

    let required_names: HashSet<&str> = match schema.get("required").and_then(Value::as_array) {
        Some(rows) => rows.iter().filter_map(Value::as_str).collect(),
        None => HashSet::new(),
    };
    fields.iter().all(|field| required_names.contains(field))
}
/// Returns `true` when the node declares every name in `fields` as part of its output.
///
/// A field counts as declared when the output-contract schema lists all `fields` as required,
/// or, per field, when it is a key of the schema's `properties` object or its name occurs
/// anywhere in the `metadata.builder.prompt` text.
fn automation_node_declares_output_fields(node: &AutomationFlowNode, fields: &[&str]) -> bool {
    if automation_node_schema_has_required_fields(node, fields) {
        return true;
    }
    let mut declared = HashSet::new();
    if let Some(schema) = node
        .output_contract
        .as_ref()
        .and_then(|contract| contract.schema.as_ref())
    {
        if let Some(properties) = schema.get("properties").and_then(Value::as_object) {
            declared.extend(properties.keys().map(String::as_str));
        }
    }
    let prompt = node
        .metadata
        .as_ref()
        .and_then(|metadata| metadata.get("builder"))
        .and_then(|builder| builder.get("prompt"))
        .and_then(Value::as_str)
        .unwrap_or_default();
    // A prompt containing the quoted name `"field"` necessarily contains the bare name too, so
    // the bare substring check alone decides the outcome and no per-field `String` is built.
    fields
        .iter()
        .all(|field| declared.contains(field) || prompt.contains(field))
}
/// Produces a ready-made "nothing to do" artifact when an upstream input reports an empty
/// stage and this node declares the matching output fields.
///
/// The upstream flags are checked in the order `has_work`, `has_candidates`,
/// `has_high_value_contacts`, `has_rows_to_write`. The first flag that is `false` somewhere in
/// `upstream_inputs` while the node declares the corresponding output fields wins. The result
/// is the name of that flag together with the empty artifact, or `None` when no rule applies.
pub(crate) fn automation_empty_upstream_artifact_for_node(
    node: &AutomationFlowNode,
    upstream_inputs: &[Value],
) -> Option<(&'static str, Value)> {
    let upstream_reports_false = |flag: &str| {
        upstream_inputs
            .iter()
            .any(|input| automation_value_contains_false_flag(input, flag))
    };

    if upstream_reports_false("has_work")
        && automation_node_declares_output_fields(
            node,
            &["candidates_by_company", "has_candidates"],
        )
    {
        let artifact = json!({
            "schema_version": "1",
            "candidates_by_company": [],
            "has_candidates": false,
        });
        return Some(("has_work", artifact));
    }

    if upstream_reports_false("has_candidates")
        && automation_node_declares_output_fields(
            node,
            &["scored_by_company", "has_high_value_contacts"],
        )
    {
        let artifact = json!({
            "schema_version": "1",
            "scored_by_company": [],
            "has_high_value_contacts": false,
        });
        return Some(("has_candidates", artifact));
    }

    if upstream_reports_false("has_high_value_contacts")
        && automation_node_declares_output_fields(
            node,
            &[
                "ready_to_write",
                "duplicates_or_skipped",
                "has_rows_to_write",
            ],
        )
    {
        let artifact = json!({
            "schema_version": "1",
            "ready_to_write": [],
            "duplicates_or_skipped": [],
            "has_rows_to_write": false,
        });
        return Some(("has_high_value_contacts", artifact));
    }

    if upstream_reports_false("has_rows_to_write")
        && automation_node_declares_output_fields(
            node,
            &["created_pages", "skipped_count", "summary"],
        )
    {
        let artifact = json!({
            "artifact_kind": "notion_write_result",
            "created_pages": [],
            "skipped_count": 0,
            "summary": "No contacts were ready to write, so no external Notion write was performed.",
        });
        return Some(("has_rows_to_write", artifact));
    }

    None
}
/// Completes a node without running it when an upstream stage reported that there is nothing
/// to process.
///
/// Returns `Ok(None)` when [`automation_empty_upstream_artifact_for_node`] yields no artifact
/// or when the node has no `required_output_path`. Otherwise the empty artifact is written to
/// the resolved output path, an explanatory assistant message is appended to the session, the
/// session is saved, and the wrapped node output (status `completed`) is returned.
///
/// # Errors
///
/// Fails when the output path cannot be resolved, the parent directory cannot be created, the
/// artifact cannot be serialized or written, or the session cannot be saved.
pub(crate) async fn try_execute_empty_upstream_structured_short_circuit(
    state: &AppState,
    run_id: &str,
    automation: &AutomationV2Spec,
    node: &AutomationFlowNode,
    session_id: &str,
    workspace_root: &str,
    required_output_path: Option<&str>,
    requested_tools: &[String],
    upstream_inputs: &[Value],
) -> anyhow::Result<Option<Value>> {
    let (empty_flag, artifact) =
        match automation_empty_upstream_artifact_for_node(node, upstream_inputs) {
            Some(found) => found,
            None => return Ok(None),
        };
    let output_path = match required_output_path {
        Some(path) => path,
        None => return Ok(None),
    };

    // Write the placeholder artifact to the run-scoped output location.
    let resolved_output =
        resolve_automation_output_path_for_run(workspace_root, run_id, output_path)?;
    if let Some(parent_dir) = resolved_output.parent() {
        std::fs::create_dir_all(parent_dir)?;
    }
    let artifact_text = serde_json::to_string_pretty(&artifact)?;
    std::fs::write(&resolved_output, &artifact_text)?;

    // Prefer the workspace-relative path for display; fall back to the declared path.
    let relative_display = match resolved_output.strip_prefix(workspace_root) {
        Ok(relative) => relative
            .to_str()
            .filter(|text| !text.is_empty())
            .map(str::to_string),
        Err(_) => None,
    };
    let display_path = relative_display.unwrap_or_else(|| output_path.to_string());

    // Reuse the stored session when there is one, otherwise start a fresh one for this node.
    let mut session = match state.storage.get_session(session_id).await {
        Some(existing) => existing,
        None => Session::new(
            Some(format!(
                "Automation {} / {}",
                automation.automation_id, node.node_id
            )),
            Some(workspace_root.to_string()),
        ),
    };
    session.project_id = Some(automation_workspace_project_id(workspace_root));
    session.workspace_root = Some(workspace_root.to_string());

    let explanation = MessagePart::Text {
        text: format!(
            "Upstream reported `{empty_flag}: false`; skipped connector calls for `{}` and wrote `{}`.\n\n{}",
            node.node_id, display_path, artifact_text
        ),
    };
    session.messages.push(tandem_types::Message::new(
        MessageRole::Assistant,
        vec![explanation],
    ));
    state.storage.save_session(session.clone()).await?;

    let artifact_validation = json!({
        "accepted_candidate_source": "deterministic_empty_upstream_short_circuit",
        "validation_outcome": "accepted",
        "unmet_requirements": [],
    });
    let wrapped = node_output::wrap_automation_node_output_with_automation(
        automation,
        node,
        &session,
        requested_tools,
        session_id,
        Some(run_id),
        "{\"status\":\"completed\"}",
        Some((display_path, artifact_text)),
        Some(artifact_validation),
    );
    Ok(Some(wrapped))
}
fn automation_workspace_scope_path_candidates(node: &AutomationFlowNode) -> Vec<String> {
let mut paths = Vec::new();
let mut seen = HashSet::new();
for token in node.objective.split_whitespace() {
let candidate = automation_clean_scope_path_token(token);
if candidate.is_empty()
|| candidate.contains('*')
|| candidate.starts_with("http://")
|| candidate.starts_with("https://")
|| !candidate.contains('/')
{
continue;
}
if seen.insert(candidate.clone()) {
paths.push(candidate);
}
}
paths
}
fn automation_workspace_scope_glob_roots(
node: &AutomationFlowNode,
present_paths: &[String],
) -> Vec<String> {
let mut roots = Vec::new();
let mut seen = HashSet::new();
for token in node.objective.split_whitespace() {
let candidate = automation_clean_scope_path_token(token);
if candidate.contains('*') && seen.insert(candidate.clone()) {
roots.push(candidate);
}
}
for path in present_paths {
let mut parts = path.split('/').collect::<Vec<_>>();
if parts.len() > 1 {
parts.pop();
let root = format!("{}/**", parts.join("/"));
if seen.insert(root.clone()) {
roots.push(root);
}
}
}
roots
}
/// Strips surrounding quoting and punctuation from one whitespace-delimited objective token.
///
/// Backticks, quotes, commas, semicolons, colons and brackets are removed from both ends.
/// A `.` is removed only from the end, where it is sentence punctuation. Leading dots are
/// kept because they are part of the path: stripping them would turn `.github/workflows`
/// into `github/workflows` and `./src/main.rs` into the absolute path `/src/main.rs`.
fn automation_clean_scope_path_token(token: &str) -> String {
    /// Characters that only ever wrap or terminate a path, never start or end one.
    const WRAPPING_PUNCTUATION: &[char] = &[
        '`', '\'', '"', ',', ';', ':', '(', ')', '[', ']', '{', '}',
    ];

    token
        .trim_start_matches(WRAPPING_PUNCTUATION)
        .trim_end_matches(|ch: char| ch == '.' || WRAPPING_PUNCTUATION.contains(&ch))
        .trim()
        .to_string()
}
/// Builds the node output returned when a connector preflight cannot run.
///
/// Starts from the executor's error output with category `tool_resolution_failed`, attaches
/// tool telemetry, the capability resolution, the MCP tool diagnostics and `extra` (stored
/// under `connector_preflight`), and finally marks the output as blocked with `detail` as the
/// reason.
fn build_connector_preflight_blocked_output(
    node: &AutomationFlowNode,
    requested_tools: &[String],
    capability_resolution: &Value,
    mcp_tool_diagnostics: &Value,
    detail: &str,
    extra: Value,
) -> Value {
    let mut output =
        crate::automation_v2::executor::build_node_execution_error_output_with_category(
            node,
            detail,
            false,
            "tool_resolution_failed",
        );

    if let Some(fields) = output.as_object_mut() {
        let telemetry =
            automation_initialized_attempt_tool_telemetry(requested_tools, capability_resolution);
        let additions = [
            ("tool_telemetry", telemetry),
            ("capability_resolution", capability_resolution.clone()),
            ("mcp_tool_diagnostics", mcp_tool_diagnostics.clone()),
            ("connector_preflight", extra),
        ];
        for (key, value) in additions {
            fields.insert(key.to_string(), value);
        }
    }

    automation_mark_tool_resolution_output_blocked(&mut output, detail);
    output
}
/// Returns how many attempts a node may use.
///
/// An explicit `retry_policy.max_attempts` wins and is clamped to `1..=10`. Without it,
/// standup-update nodes get 3 attempts, research-brief nodes and nodes with required tools
/// get 5, and everything else gets 3.
pub(crate) fn automation_node_max_attempts(node: &AutomationFlowNode) -> u32 {
    let configured = node
        .retry_policy
        .as_ref()
        .and_then(|policy| policy.get("max_attempts"))
        .and_then(Value::as_u64);
    if let Some(requested) = configured {
        // The clamp keeps the value within 1..=10, so the narrowing cast cannot truncate.
        return requested.clamp(1, 10) as u32;
    }

    let validator_kind = automation_output_validator_kind(node);
    if validator_kind == crate::AutomationOutputValidatorKind::StandupUpdate {
        return 3;
    }

    let needs_extra_attempts = validator_kind
        == crate::AutomationOutputValidatorKind::ResearchBrief
        || !automation_node_required_tools(node).is_empty();
    if needs_extra_attempts {
        return 5;
    }
    3
}
/// Returns `true` when the output's `status` string equals `blocked`, ignoring ASCII case.
pub(crate) fn automation_output_is_blocked(output: &Value) -> bool {
    matches!(
        output.get("status").and_then(Value::as_str),
        Some(status) if status.eq_ignore_ascii_case("blocked")
    )
}
/// Returns `true` when the output's `status` string equals `verify_failed`, ignoring ASCII
/// case.
pub(crate) fn automation_output_is_verify_failed(output: &Value) -> bool {
    let Some(status) = output.get("status").and_then(Value::as_str) else {
        return false;
    };
    status.eq_ignore_ascii_case("verify_failed")
}
/// Returns `true` when the output's `status` string equals `needs_repair`, ignoring ASCII
/// case.
pub(crate) fn automation_output_needs_repair(output: &Value) -> bool {
    match output.get("status").and_then(Value::as_str) {
        Some(status) => status.eq_ignore_ascii_case("needs_repair"),
        None => false,
    }
}
/// Returns `true` when the output reports at least one validation warning.
///
/// `validator_summary.warning_count` is used when it is an unsigned integer; otherwise
/// `artifact_validation.warning_count` is consulted, and a missing count is treated as zero.
pub(crate) fn automation_output_has_warnings(output: &Value) -> bool {
    let warning_count_in = |section: &str| {
        output
            .get(section)
            .and_then(|value| value.get("warning_count"))
            .and_then(Value::as_u64)
    };
    let warning_count = warning_count_in("validator_summary")
        .or_else(|| warning_count_in("artifact_validation"))
        .unwrap_or(0);
    warning_count > 0
}
/// Returns `true` only when `artifact_validation.repair_exhausted` is the boolean `true`.
pub(crate) fn automation_output_repair_exhausted(output: &Value) -> bool {
    let exhausted = output
        .get("artifact_validation")
        .and_then(|validation| validation.get("repair_exhausted"))
        .and_then(Value::as_bool);
    matches!(exhausted, Some(true))
}
pub(crate) fn automation_output_failure_reason(output: &Value) -> Option<String> {
output
.get("blocked_reason")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
/// Returns the trimmed `blocked_reason` string of the output, or `None` when the field is
/// missing, not a string, or blank.
pub(crate) fn automation_output_blocked_reason(output: &Value) -> Option<String> {
    let raw_reason = output.get("blocked_reason").and_then(Value::as_str)?;
    let reason = raw_reason.trim();
    (!reason.is_empty()).then(|| reason.to_string())
}
/// Returns `true` when the output's validator summary reports success.
///
/// That requires `validator_summary.outcome` to be `passed` or `accepted_with_warnings`
/// (ASCII case ignored) and `validator_summary.unmet_requirements` to be an empty array.
/// A missing summary, outcome or requirements array yields `false`.
pub(crate) fn automation_output_is_passing(output: &Value) -> bool {
    let Some(summary) = output.get("validator_summary") else {
        return false;
    };

    let outcome_accepted = summary
        .get("outcome")
        .and_then(Value::as_str)
        .is_some_and(|outcome| {
            ["passed", "accepted_with_warnings"]
                .iter()
                .any(|accepted| outcome.eq_ignore_ascii_case(accepted))
        });
    if !outcome_accepted {
        return false;
    }

    summary
        .get("unmet_requirements")
        .and_then(Value::as_array)
        .is_some_and(|requirements| requirements.is_empty())
}
pub(crate) fn automation_node_has_passing_artifact(
node_id: &str,
checkpoint: &crate::automation_v2::types::AutomationRunCheckpoint,
) -> bool {
checkpoint
.node_outputs
.get(node_id)
.map(automation_output_is_passing)
.unwrap_or(false)
}
/// Determines the workspace root an automation should run in.
///
/// Precedence: the automation's own `workspace_root`, then the `workspace_root` string in its
/// metadata, then the root of the current workspace-index snapshot. Blank values are skipped
/// and the chosen value is trimmed.
pub(crate) async fn resolve_automation_v2_workspace_root(
    state: &AppState,
    automation: &AutomationV2Spec,
) -> String {
    /// Trims the value and discards it when nothing is left.
    fn non_blank(raw: Option<&str>) -> Option<String> {
        let trimmed = raw?.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    }

    if let Some(explicit_root) = non_blank(automation.workspace_root.as_deref()) {
        return explicit_root;
    }

    let metadata_root = automation
        .metadata
        .as_ref()
        .and_then(|row| row.get("workspace_root"))
        .and_then(Value::as_str);
    if let Some(root_from_metadata) = non_blank(metadata_root) {
        return root_from_metadata;
    }

    state.workspace_index.snapshot().await.root
}
pub(crate) fn automation_declared_output_paths(automation: &AutomationV2Spec) -> Vec<String> {
let mut paths = Vec::new();
for target in &automation.output_targets {
let trimmed = target.trim();
if !trimmed.is_empty() && !paths.iter().any(|existing| existing == trimmed) {
paths.push(trimmed.to_string());
}
}
for node in &automation.flow.nodes {
if let Some(path) = automation_node_required_output_path(node) {
let trimmed = path.trim();
if !trimmed.is_empty() && !paths.iter().any(|existing| existing == trimmed) {
paths.push(trimmed.to_string());
}
}
}
paths
}
pub(crate) fn automation_declared_output_paths_for_run(
automation: &AutomationV2Spec,
run_id: &str,
) -> Vec<String> {
automation
.flow
.nodes
.iter()
.filter_map(automation_node_required_output_path)
.filter_map(|path| automation_run_scoped_output_path(run_id, &path))
.collect::<Vec<_>>()
}
pub(crate) async fn clear_automation_declared_outputs(
state: &AppState,
automation: &AutomationV2Spec,
run_id: &str,
) -> anyhow::Result<()> {
let workspace_root = resolve_automation_v2_workspace_root(state, automation).await;
for output_path in automation_declared_output_paths_for_run(automation, run_id) {
if let Ok(resolved) = resolve_automation_output_path(&workspace_root, &output_path) {
if resolved.exists() {
let _ = std::fs::remove_file(&resolved);
}
}
}
remove_suspicious_automation_marker_files(&workspace_root);
Ok(())
}
/// Derives the write policy applied to the session that runs `node`.
///
/// Nodes executing in `git_patch` or `filesystem_patch` mode get the unrestricted `RepoEdit`
/// policy. All other nodes may only write their declared outputs: the candidates for the
/// required output path (workspace-relative where possible, or the raw path when candidate
/// resolution fails) plus the node's must-write files. The resulting list is sorted and
/// deduplicated; an empty list selects `ArtifactOnly`, otherwise `ExplicitTargets`.
fn automation_session_write_policy_for_node(
    automation: &AutomationV2Spec,
    node: &AutomationFlowNode,
    workspace_root: &str,
    run_id: &str,
    execution_mode: &str,
    required_output_path: Option<&str>,
    runtime_values: &AutomationPromptRuntimeValues,
) -> tandem_core::SessionWritePolicy {
    let is_code_workflow = matches!(execution_mode, "git_patch" | "filesystem_patch");
    if is_code_workflow {
        return tandem_core::SessionWritePolicy {
            mode: tandem_core::SessionWritePolicyMode::RepoEdit,
            allowed_paths: Vec::new(),
            reason: "automation node is an explicit code workflow".to_string(),
        };
    }

    let mut allowed_paths = Vec::new();
    if let Some(output_path) = required_output_path {
        match automation_output_path_candidates(workspace_root, run_id, node, output_path) {
            Ok(candidates) => {
                for candidate in candidates {
                    let workspace_relative = candidate
                        .strip_prefix(workspace_root)
                        .ok()
                        .and_then(|relative| relative.to_str())
                        .map(str::to_string);
                    let allowed = workspace_relative
                        .unwrap_or_else(|| candidate.to_string_lossy().to_string());
                    allowed_paths.push(allowed);
                }
            }
            Err(_) => allowed_paths.push(output_path.to_string()),
        }
    }
    allowed_paths.extend(automation_node_must_write_files_for_automation(
        automation,
        node,
        Some(runtime_values),
    ));
    allowed_paths.sort();
    allowed_paths.dedup();

    let mode = if allowed_paths.is_empty() {
        tandem_core::SessionWritePolicyMode::ArtifactOnly
    } else {
        tandem_core::SessionWritePolicyMode::ExplicitTargets
    };
    tandem_core::SessionWritePolicy {
        mode,
        allowed_paths,
        reason: "automation artifact node is restricted to declared outputs".to_string(),
    }
}
/// Returns `true` when `tool` is one of the patch, edit or shell-style tool names listed
/// below. The comparison is exact and case-sensitive.
fn automation_source_mutating_tool_is_blocked_for_read_only_node(tool: &str) -> bool {
    const SOURCE_MUTATING_TOOLS: [&str; 6] = [
        "apply_patch",
        "edit",
        "bash",
        "shell",
        "exec",
        "exec_command",
    ];
    SOURCE_MUTATING_TOOLS
        .iter()
        .any(|blocked| *blocked == tool)
}
/// Rewrites a node output in place so that it reports a tool-resolution block.
///
/// Sets `status`, `blocked_reason` and `failure_kind` on the top-level object. When present
/// as objects, `validator_summary` and `artifact_validation` are updated as well; they are
/// never created. Outputs that are not JSON objects are left untouched.
fn automation_mark_tool_resolution_output_blocked(output: &mut Value, reason: &str) {
    let Some(fields) = output.as_object_mut() else {
        return;
    };

    let top_level = [
        ("status", json!("blocked")),
        ("blocked_reason", json!(reason)),
        ("failure_kind", json!("tool_resolution_failed")),
    ];
    for (key, value) in top_level {
        fields.insert(key.to_string(), value);
    }

    let summary = fields
        .get_mut("validator_summary")
        .and_then(Value::as_object_mut);
    if let Some(summary) = summary {
        summary.insert("outcome".to_string(), json!("blocked"));
        summary.insert("reason".to_string(), json!(reason));
    }

    let validation = fields
        .get_mut("artifact_validation")
        .and_then(Value::as_object_mut);
    if let Some(validation) = validation {
        validation.insert("semantic_block_reason".to_string(), json!(reason));
        validation.insert("repair_exhausted".to_string(), json!(true));
        validation.insert(
            "blocking_classification".to_string(),
            json!("tool_unavailable"),
        );
    }
}
/// Deletes the output files of the selected nodes and reports which paths were removed.
///
/// For every flow node whose id is in `node_ids` and that declares a required output path,
/// each existing candidate file is removed. The returned list holds the workspace-relative
/// path of every removed file (or the declared path when it cannot be made relative), sorted
/// and deduplicated. Suspicious marker files in the workspace are removed afterwards when any
/// are present.
///
/// # Errors
///
/// Fails when the candidate paths of a node cannot be computed or when an existing output
/// file cannot be removed; files removed before the failure stay removed.
pub async fn clear_automation_subtree_outputs(
    state: &AppState,
    automation: &AutomationV2Spec,
    run_id: &str,
    node_ids: &std::collections::HashSet<String>,
) -> anyhow::Result<Vec<String>> {
    let workspace_root = resolve_automation_v2_workspace_root(state, automation).await;
    let mut cleared = Vec::new();

    let selected_nodes = automation
        .flow
        .nodes
        .iter()
        .filter(|node| node_ids.contains(&node.node_id));
    for node in selected_nodes {
        let Some(output_path) = automation_node_required_output_path(node) else {
            continue;
        };
        for resolved in
            automation_output_path_candidates(&workspace_root, run_id, node, &output_path)?
        {
            let is_existing_file = resolved.exists() && resolved.is_file();
            if !is_existing_file {
                continue;
            }
            if let Err(error) = std::fs::remove_file(&resolved) {
                return Err(anyhow::anyhow!(
                    "failed to clear subtree output `{}` for automation `{}`: {}",
                    output_path,
                    automation.automation_id,
                    error
                ));
            }
            let workspace_relative = resolved
                .strip_prefix(&workspace_root)
                .ok()
                .and_then(|relative| relative.to_str())
                .filter(|text| !text.is_empty())
                .map(str::to_string);
            cleared.push(workspace_relative.unwrap_or_else(|| output_path.clone()));
        }
    }

    if !list_suspicious_automation_marker_files(&workspace_root).is_empty() {
        remove_suspicious_automation_marker_files(&workspace_root);
    }

    cleared.sort();
    cleared.dedup();
    Ok(cleared)
}
/// Drives `future` to completion while enforcing an idle timeout and an absolute timeout.
///
/// - The idle timeout (`effective_automation_node_timeout_ms`) is re-armed every time the
///   event bus delivers an event accepted by `automation_node_event_is_progress_for_session`
///   for `session_id`.
/// - The absolute timeout (`effective_automation_node_absolute_timeout_ms`) is armed once and
///   never re-armed.
/// - A 3-second ticker calls `run_registry.touch(session_id, run_id)` while waiting.
///
/// # Errors
///
/// Returns whatever `future` resolves to, or a timeout error whose message contains
/// `idle timed out after` / `absolute timed out after` when a timer fires first. On either
/// timeout the session is cancelled through `state.cancellations` before returning.
/// `automation_node_prompt_timeout_error` matches on those message fragments, so the
/// wording of the `bail!` messages below is relied upon elsewhere in this file.
pub(crate) async fn run_automation_node_prompt_with_timeout<F>(
state: &AppState,
session_id: &str,
run_id: &str,
node: &AutomationFlowNode,
future: F,
) -> anyhow::Result<()>
where
F: std::future::Future<Output = anyhow::Result<()>>,
{
let idle_timeout_ms = effective_automation_node_timeout_ms(node);
let absolute_timeout_ms = effective_automation_node_absolute_timeout_ms(node);
// Subscribe before polling the future so progress events emitted early are observed.
let mut event_rx = state.event_bus.subscribe();
// A failed acquire is only logged; the prompt still runs without owning the registry entry.
if let Err(active_run) = state
.run_registry
.acquire(
session_id,
run_id.to_string(),
None,
Some(node.agent_id.clone()),
None,
)
.await
{
tracing::warn!(
session_id = %session_id,
run_id = %run_id,
active_run_id = %active_run.run_id,
"automation node prompt could not acquire run heartbeat due active session registry conflict"
);
}
// Pin the future and both timers so they can be polled by `&mut` across loop iterations.
let mut future = Box::pin(future);
let mut ticker = tokio::time::interval(Duration::from_secs(3));
// Skip missed ticks instead of bursting to catch up.
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let absolute_deadline = tokio::time::Instant::now() + Duration::from_millis(absolute_timeout_ms);
let mut absolute_timeout = Box::pin(tokio::time::sleep_until(absolute_deadline));
let mut idle_timeout = Box::pin(tokio::time::sleep(Duration::from_millis(idle_timeout_ms)));
loop {
tokio::select! {
// Heartbeat: keep the run registry entry fresh while the prompt is running.
_ = ticker.tick() => {
state
.run_registry
.touch(session_id, run_id)
.await;
}
// Progress events for this session push the idle deadline forward.
// Receive errors are ignored here.
// NOTE(review): if `event_rx` is a tokio broadcast receiver, a closed channel would make
// this arm ready on every poll — confirm the bus sender outlives this call.
event = event_rx.recv() => {
if let Ok(event) = event {
if automation_node_event_is_progress_for_session(&event, session_id) {
idle_timeout.as_mut().reset(tokio::time::Instant::now() + Duration::from_millis(idle_timeout_ms));
}
}
}
// No progress within the idle window: cancel the session, release the registry entry, fail.
// The results of the cancel and release calls are deliberately discarded.
_ = &mut idle_timeout => {
let _ = state.cancellations.cancel(session_id).await;
let _ = state
.run_registry
.finish_if_match(session_id, run_id)
.await;
anyhow::bail!(
"automation node `{}` idle timed out after {} ms without progress",
node.node_id,
idle_timeout_ms
);
}
// Hard upper bound regardless of progress: same cleanup as the idle timeout.
_ = &mut absolute_timeout => {
let _ = state.cancellations.cancel(session_id).await;
let _ = state
.run_registry
.finish_if_match(session_id, run_id)
.await;
anyhow::bail!(
"automation node `{}` absolute timed out after {} ms",
node.node_id,
absolute_timeout_ms
);
}
// The prompt finished on its own: release the registry entry and pass its result through.
result = &mut future => {
let _ = state
.run_registry
.finish_if_match(session_id, run_id)
.await;
return result;
}
}
}
}
/// Tells whether `error` is one of the prompt-timeout errors for `node`.
///
/// The check is textual: the error's display string must contain one of the
/// timeout message prefixes (idle, absolute, or the plain `timed out after`
/// form) formatted with this node's id.
pub(crate) fn automation_node_prompt_timeout_error(
    error: &anyhow::Error,
    node: &AutomationFlowNode,
) -> bool {
    let rendered_error = error.to_string();
    let timeout_needles = [
        format!(
            "automation node `{}` idle timed out after",
            node.node_id
        ),
        format!(
            "automation node `{}` absolute timed out after",
            node.node_id
        ),
        format!(
            "automation node `{}` timed out after",
            node.node_id
        ),
    ];
    timeout_needles
        .iter()
        .any(|needle| rendered_error.contains(needle.as_str()))
}
/// Decides whether `event` counts as progress for the session `session_id`.
///
/// The event must carry a string `sessionID` property (or, when that key is
/// absent, a string `session_id` property) equal to `session_id`. Its type must
/// then be one of the listed progress event types or start with `tool.`.
pub(crate) fn automation_node_event_is_progress_for_session(
    event: &tandem_types::EngineEvent,
    session_id: &str,
) -> bool {
    // `session_id` is consulted only when `sessionID` is missing entirely; a
    // present but non-string `sessionID` yields no match.
    let event_session = event
        .properties
        .get("sessionID")
        .or_else(|| event.properties.get("session_id"))
        .and_then(serde_json::Value::as_str);
    if event_session != Some(session_id) {
        return false;
    }

    let event_type = event.event_type.as_str();
    let is_listed_progress_type = matches!(
        event_type,
        "message.part.updated"
            | "provider.call.first_byte"
            | "provider.call.iteration.finish"
            | "provider.call.iteration.retry"
            | "provider.usage"
            | "tool.progress"
            | "tool.call.progress"
            | "tool.effect"
    );
    is_listed_progress_type || event_type.starts_with("tool.")
}
/// Computes the hard upper bound, in milliseconds, for a node prompt.
///
/// The bound is the larger of three times the node's idle timeout and the idle
/// timeout plus 60 seconds; both terms saturate instead of overflowing.
///
/// The idle timeout is resolved once and reused for both terms, so they are
/// always derived from the same base value and the resolver is not run twice.
pub(crate) fn effective_automation_node_absolute_timeout_ms(node: &AutomationFlowNode) -> u64 {
    let idle_timeout_ms = effective_automation_node_timeout_ms(node);
    let tripled = idle_timeout_ms.saturating_mul(3);
    let with_grace_period = idle_timeout_ms.saturating_add(60_000);
    tripled.max(with_grace_period)
}
/// Resolves the idle timeout, in milliseconds, for a node prompt.
///
/// A positive `node.timeout_ms` always wins. Otherwise the default depends on
/// the node's output validator kind:
/// - `StandupUpdate`: 120 000 ms;
/// - any other kind when the node needs the long execute budget: the value from
///   `resolve_automation_execute_node_timeout_ms`;
/// - `StructuredJson`: 180 000 ms;
/// - everything else: 600 000 ms.
pub(crate) fn effective_automation_node_timeout_ms(node: &AutomationFlowNode) -> u64 {
    if let Some(explicit_timeout_ms) = node.timeout_ms.filter(|value| *value > 0) {
        return explicit_timeout_ms;
    }

    let validator_kind = automation_output_validator_kind(node);
    // Standup updates use a fixed budget and never consult the long-budget check.
    if matches!(
        validator_kind,
        crate::AutomationOutputValidatorKind::StandupUpdate
    ) {
        return 120_000;
    }
    if automation_node_needs_long_execute_budget(node) {
        return crate::config::env::resolve_automation_execute_node_timeout_ms();
    }
    match validator_kind {
        crate::AutomationOutputValidatorKind::StructuredJson => 180_000,
        _ => 600_000,
    }
}
/// Tells whether `node` should get the long "execute" timeout budget.
///
/// This is the case when the trimmed, ASCII-lowercased node id is `execute_goal`
/// or ends with `_execute_goal`, when the ASCII-lowercased objective contains
/// one of the "execute the ... automation goal" phrases, or when
/// `automation_node_uses_external_data_source_budget` accepts the normalized id
/// and objective.
fn automation_node_needs_long_execute_budget(node: &AutomationFlowNode) -> bool {
    let normalized_id = node.node_id.trim().to_ascii_lowercase();
    let is_execute_goal_node =
        normalized_id == "execute_goal" || normalized_id.ends_with("_execute_goal");
    if is_execute_goal_node {
        return true;
    }

    let normalized_objective = node.objective.to_ascii_lowercase();
    let execute_goal_phrases = [
        "execute the requested automation goal",
        "execute the automation goal",
    ];
    if execute_goal_phrases
        .iter()
        .any(|phrase| normalized_objective.contains(*phrase))
    {
        return true;
    }
    automation_node_uses_external_data_source_budget(&normalized_id, &normalized_objective)
}
fn automation_node_uses_external_data_source_budget(node_id: &str, objective: &str) -> bool {
let mentions_external_source = objective.contains("mcp.")
|| objective.contains("notion")
|| objective.contains("collection://")
|| objective.contains("reddit")
|| objective.contains("web research")
|| objective.contains("web_research")
|| objective.contains("websearch")
|| objective.contains("webfetch");
let is_inspection_or_fetch = node_id.contains("inspect")
|| node_id.contains("assess")
|| node_id.contains("fetch")
|| node_id.contains("gather")
|| node_id.contains("research")
|| node_id.contains("search")
|| objective.contains("check")
|| objective.contains("fetch")
|| objective.contains("gather")
|| objective.contains("inspect")
|| objective.contains("search")
|| objective.contains("database")
|| objective.contains("collection");
mentions_external_source && is_inspection_or_fetch
}
pub(crate) async fn execute_automation_v2_node(
state: &AppState,
run_id: &str,
automation: &AutomationV2Spec,
node: &AutomationFlowNode,
agent: &AutomationAgentProfile,
) -> anyhow::Result<Value> {
let run = state
.get_automation_v2_run(run_id)
.await
.ok_or_else(|| anyhow::anyhow!("automation run `{}` not found", run_id))?;
let start_cost_usd = run.estimated_cost_usd;
let start_prompt_tokens = run.prompt_tokens;
let start_completion_tokens = run.completion_tokens;
let prevalidated = {
let scheduler = state.automation_scheduler.read().await;
if scheduler
.preexisting_registry
.is_artifact_prevalidated(run_id, &node.node_id)
{
let path = scheduler
.preexisting_registry
.get_prevalidated_path(run_id, &node.node_id)
.map(|s| s.to_string());
let digest = scheduler
.preexisting_registry
.artifacts
.get(run_id)
.and_then(|m| m.get(&node.node_id))
.map(|a| a.content_digest.clone());
Some((path, digest))
} else {
None
}
};
if let Some((Some(output_path), Some(content_digest))) = prevalidated {
let workspace_root = resolve_automation_v2_workspace_root(state, automation).await;
let resolved =
resolve_automation_output_path_for_run(&workspace_root, run_id, &output_path)?;
if resolved.exists() {
let current_content = std::fs::read_to_string(&resolved).ok();
let current_digest = current_content.as_ref().map(|c| sha256_hex(&[c]));
if current_digest.as_ref() == Some(&content_digest) {
tracing::info!(
run_id = %run_id,
node_id = %node.node_id,
path = %output_path,
"reusing prevalidated artifact from registry (MWF-300)"
);
let mut session = Session::new(
Some(format!(
"Automation {} / {} (Reused)",
automation.automation_id, node.node_id
)),
Some(workspace_root.clone()),
);
let session_id = session.id.clone();
session.project_id = Some(automation_workspace_project_id(&workspace_root));
session.workspace_root = Some(workspace_root.clone());
session.messages.push(tandem_types::Message::new(
tandem_types::MessageRole::Assistant,
vec![tandem_types::MessagePart::Text {
text: format!(
"Reusing previously validated artifact `{}`.\n\n{{\"status\":\"completed\"}}",
output_path
),
}],
));
state.storage.save_session(session.clone()).await?;
let mut output = node_output::wrap_automation_node_output_with_automation(
automation,
node,
&session,
&[],
&session_id,
Some(run_id),
"Reusing previously validated artifact.",
Some((output_path, current_content.unwrap())),
Some(json!({
"accepted_candidate_source": "preexisting_output",
"status": "reused_valid"
})),
);
return Ok(output);
}
}
}
let attempt = run
.checkpoint
.node_attempts
.get(&node.node_id)
.copied()
.unwrap_or(1);
let workspace_root = resolve_automation_v2_workspace_root(state, automation).await;
let upstream_inputs = build_automation_v2_upstream_inputs(&run, node, &workspace_root)?;
let workspace_path = PathBuf::from(&workspace_root);
if !workspace_path.exists() {
anyhow::bail!(
"workspace_root `{}` for automation `{}` does not exist",
workspace_root,
automation.automation_id
);
}
if !workspace_path.is_dir() {
anyhow::bail!(
"workspace_root `{}` for automation `{}` is not a directory",
workspace_root,
automation.automation_id
);
}
let run_started_at_ms = run.started_at_ms.unwrap_or_else(now_ms);
let required_output_path = automation_effective_required_output_path_for_run(
automation,
node,
run_id,
run_started_at_ms,
);
let inline_artifact_payload = automation_node_inline_artifact_payload(node);
let inline_output_path = required_output_path.clone().or_else(|| {
inline_artifact_payload.as_ref()?;
Some(format!(
".tandem/artifacts/{}.json",
node.node_id.replace("_", "-")
))
});
if let (Some(output_path), Some(payload)) =
(inline_output_path.as_deref(), inline_artifact_payload)
{
let verified_output =
write_automation_inline_artifact(&workspace_root, run_id, output_path, &payload)?;
let mut session = Session::new(
Some(format!(
"Automation {} / {}",
automation.automation_id, node.node_id
)),
Some(workspace_root.clone()),
);
let session_id = session.id.clone();
session.project_id = Some(automation_workspace_project_id(&workspace_root));
session.workspace_root = Some(workspace_root.clone());
session.messages.push(tandem_types::Message::new(
MessageRole::Assistant,
vec![MessagePart::Text {
text: format!(
"Prepared deterministic workflow artifact `{}` from the node inputs.\n\n{{\"status\":\"completed\"}}",
output_path
),
}],
));
state.storage.save_session(session.clone()).await?;
tracing::info!(
run_id = %run_id,
automation_id = %automation.automation_id,
node_id = %node.node_id,
output_path = %output_path,
"automation node used deterministic inline artifact shortcut"
);
let mut output = node_output::wrap_automation_node_output_with_automation(
automation,
node,
&session,
&[],
&session_id,
Some(run_id),
"Prepared deterministic workflow artifact from inline node inputs.",
Some(verified_output),
Some(json!({
"deterministic_artifact": true,
"deterministic_source": "node_metadata_inputs",
"accepted_candidate_source": "verified_output",
"validation_outcome": "passed",
"unmet_requirements": [],
})),
);
if let Some(object) = output.as_object_mut() {
object.insert("status".to_string(), json!("completed"));
object.insert("blocked_reason".to_string(), Value::Null);
}
return Ok(output);
}
let template = if let Some(template_id) = agent.template_id.as_deref().map(str::trim) {
if template_id.is_empty() {
None
} else {
resolve_automation_agent_template(state, &workspace_root, template_id)
.await?
.ok_or_else(|| anyhow::anyhow!("agent template `{}` not found", template_id))
.map(Some)?
}
} else {
None
};
let mut session = Session::new(
Some(format!(
"Automation {} / {}",
automation.automation_id, node.node_id
)),
Some(workspace_root.clone()),
);
let session_id = session.id.clone();
let project_id = automation_workspace_project_id(&workspace_root);
session.project_id = Some(project_id.clone());
session.workspace_root = Some(workspace_root.clone());
state.storage.save_session(session).await?;
state.add_automation_v2_session(run_id, &session_id).await;
let mut allowlist = merge_automation_agent_allowlist(agent, template.as_ref());
if let Some(mcp_tools) = agent.mcp_policy.allowed_tools.as_ref() {
allowlist.extend(mcp_tools.clone());
}
let mcp_tool_diagnostics = sync_automation_allowed_mcp_servers(
state,
node,
&agent.mcp_policy.allowed_servers,
&allowlist,
)
.await;
let available_tool_schemas = state.tools.list().await;
let available_tool_names = available_tool_schemas
.iter()
.map(|schema| schema.name.clone())
.collect::<HashSet<_>>();
let requested_tools = automation_requested_tools_for_node(
node,
&workspace_root,
allowlist.clone(),
&available_tool_names,
);
let selected_mcp_server_names = mcp_tool_diagnostics
.get("selected_servers")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(str::to_string)
.collect::<Vec<_>>()
})
.unwrap_or_default();
let selected_mcp_wildcard_server_names = mcp_tool_diagnostics
.get("wildcard_selected_servers")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(str::to_string)
.collect::<Vec<_>>()
})
.unwrap_or_else(|| selected_mcp_server_names.clone());
let selected_mcp_source = mcp_tool_diagnostics
.get("selected_source")
.and_then(Value::as_str)
.unwrap_or("none")
.to_string();
let execution_mode = automation_node_execution_mode(node, &workspace_root);
let mut requested_tools = requested_tools;
let has_concrete_mcp_tool_policy = allowlist
.iter()
.any(|tool| tool.starts_with("mcp.") && !tool.ends_with(".*"));
if !has_concrete_mcp_tool_policy {
requested_tools.extend(automation_requested_server_scoped_mcp_tools(
node,
&selected_mcp_wildcard_server_names,
));
}
if node_runtime_impl::automation_node_requires_email_draft_without_send(node) {
requested_tools.retain(|tool| {
!tool.starts_with("mcp.")
|| (automation_tool_name_is_email_draft(tool)
&& !automation_tool_name_is_email_send(tool))
});
}
let mut required_concrete_mcp_tools = automation_node_required_concrete_mcp_tools(node);
required_concrete_mcp_tools.extend(
automation_node_required_tool_calls(node)
.into_iter()
.map(|call| call.tool)
.filter(|tool| tool.starts_with("mcp.") && !tool.ends_with(".*")),
);
required_concrete_mcp_tools.sort();
required_concrete_mcp_tools.dedup();
requested_tools.extend(required_concrete_mcp_tools.clone());
if node_runtime_impl::automation_node_requires_email_draft_without_send(node) {
requested_tools.retain(|tool| {
!tool.starts_with("mcp.")
|| (automation_tool_name_is_email_draft(tool)
&& !automation_tool_name_is_email_send(tool))
});
}
if automation_node_uses_broad_read_only_source_guard(node)
&& !matches!(execution_mode, "git_patch" | "filesystem_patch")
{
requested_tools
.retain(|tool| !automation_source_mutating_tool_is_blocked_for_read_only_node(tool));
}
let denied_tools =
config::channels::normalize_allowed_tools(agent.tool_policy.denylist.clone());
if !denied_tools.is_empty() {
requested_tools.retain(|tool| {
!denied_tools.iter().any(|denied| {
denied == "*"
|| denied == tool
|| denied
.strip_suffix('*')
.is_some_and(|prefix| tool.starts_with(prefix))
})
});
}
requested_tools.sort();
requested_tools.dedup();
let has_selected_mcp_servers_policy = !selected_mcp_server_names.is_empty()
&& selected_mcp_source == "policy"
&& automation_output_validator_kind(node)
!= crate::AutomationOutputValidatorKind::ReviewDecision;
let requested_tools =
automation_add_mcp_list_when_scoped(requested_tools, has_selected_mcp_servers_policy);
let effective_offered_tools =
automation_expand_effective_offered_tools(&requested_tools, &available_tool_names);
let missing_concrete_mcp_tools = required_concrete_mcp_tools
.iter()
.filter(|tool| {
!effective_offered_tools
.iter()
.any(|offered| offered == *tool)
})
.cloned()
.collect::<Vec<_>>();
let mut capability_resolution = automation_resolve_capabilities_with_schemas(
node,
execution_mode,
&effective_offered_tools,
&available_tool_names,
&available_tool_schemas,
);
if automation_node_requires_email_delivery(node) || has_selected_mcp_servers_policy {
automation_merge_mcp_capability_diagnostics(
&mut capability_resolution,
&mcp_tool_diagnostics,
);
}
if let Some(detail) = automation_policy_mcp_preflight_blocker(&mcp_tool_diagnostics) {
let mut output =
crate::automation_v2::executor::build_node_execution_error_output_with_category(
node,
&detail,
false,
"tool_resolution_failed",
);
if let Some(object) = output.as_object_mut() {
object.insert(
"tool_telemetry".to_string(),
automation_initialized_attempt_tool_telemetry(
&requested_tools,
&capability_resolution,
),
);
object.insert(
"capability_resolution".to_string(),
capability_resolution.clone(),
);
object.insert(
"mcp_tool_diagnostics".to_string(),
mcp_tool_diagnostics.clone(),
);
}
automation_mark_tool_resolution_output_blocked(&mut output, &detail);
return Ok(output);
}
if !missing_concrete_mcp_tools.is_empty() {
let detail = format!(
"required concrete MCP tool(s) were unavailable after MCP/tool sync: {}",
missing_concrete_mcp_tools.join(", ")
);
let mut output =
crate::automation_v2::executor::build_node_execution_error_output_with_category(
node,
&detail,
false,
"tool_resolution_failed",
);
if let Some(object) = output.as_object_mut() {
object.insert(
"tool_telemetry".to_string(),
automation_initialized_attempt_tool_telemetry(
&requested_tools,
&capability_resolution,
),
);
object.insert(
"capability_resolution".to_string(),
capability_resolution.clone(),
);
object.insert(
"mcp_tool_diagnostics".to_string(),
mcp_tool_diagnostics.clone(),
);
object.insert(
"missing_concrete_mcp_tools".to_string(),
json!(missing_concrete_mcp_tools),
);
}
automation_mark_tool_resolution_output_blocked(&mut output, &detail);
return Ok(output);
}
let missing_capabilities =
automation_capability_resolution_missing_capabilities(&capability_resolution);
let offered_tool_schemas = available_tool_schemas
.iter()
.filter(|schema| {
effective_offered_tools
.iter()
.any(|tool| tool == &schema.name)
})
.cloned()
.collect::<Vec<_>>();
let mcp_contracts = automation_mcp_contract_summaries(&offered_tool_schemas);
let required_tool_call_arg_validation = automation_required_tool_call_arg_validation(
&automation_node_required_tool_calls(node),
&offered_tool_schemas,
);
if let Some(object) = capability_resolution.as_object_mut() {
object.insert("mcp_contracts".to_string(), mcp_contracts.clone());
object.insert(
"required_tool_call_arg_validation".to_string(),
required_tool_call_arg_validation.clone(),
);
}
if !missing_capabilities.is_empty() {
let offered_tools_summary = if effective_offered_tools.is_empty() {
"none".to_string()
} else {
effective_offered_tools.join(", ")
};
let selected_servers_summary = {
let servers = mcp_tool_diagnostics
.get("selected_servers")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(str::to_string)
.collect::<Vec<_>>()
})
.unwrap_or_default();
if servers.is_empty() {
"none".to_string()
} else {
servers.join(", ")
}
};
let registered_tools_summary = {
let tools = mcp_tool_diagnostics
.get("registered_tools")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(str::to_string)
.collect::<Vec<_>>()
})
.unwrap_or_default();
if tools.is_empty() {
"none".to_string()
} else {
tools.join(", ")
}
};
let detail = format!(
"required automation capabilities were not offered after MCP/tool sync: {}. Offered tools: {}. Selected MCP servers: {}. Registered MCP tools after sync: {}.",
missing_capabilities.join(", "),
offered_tools_summary,
selected_servers_summary,
registered_tools_summary
);
let mut output =
crate::automation_v2::executor::build_node_execution_error_output_with_category(
node,
&detail,
false,
"tool_resolution_failed",
);
if let Some(object) = output.as_object_mut() {
object.insert(
"tool_telemetry".to_string(),
automation_initialized_attempt_tool_telemetry(
&requested_tools,
&capability_resolution,
),
);
object.insert(
"capability_resolution".to_string(),
capability_resolution.clone(),
);
}
return Ok(output);
}
if let Some(output) = try_execute_connector_preflight_node(
state,
run_id,
automation,
node,
&session_id,
&workspace_root,
required_output_path.as_deref(),
&requested_tools,
&effective_offered_tools,
&capability_resolution,
&mcp_tool_diagnostics,
)
.await?
{
state.clear_automation_v2_session(run_id, &session_id).await;
return Ok(output);
}
if let Some(output) = try_execute_workspace_scope_preflight_node(
state,
run_id,
automation,
node,
&session_id,
&workspace_root,
required_output_path.as_deref(),
&requested_tools,
)
.await?
{
state.clear_automation_v2_session(run_id, &session_id).await;
return Ok(output);
}
if let Some(output) = try_execute_empty_upstream_structured_short_circuit(
state,
run_id,
automation,
node,
&session_id,
&workspace_root,
required_output_path.as_deref(),
&requested_tools,
&upstream_inputs,
)
.await?
{
state.clear_automation_v2_session(run_id, &session_id).await;
return Ok(output);
}
let runtime_values = automation_prompt_runtime_values(run.started_at_ms);
let write_policy = automation_session_write_policy_for_node(
automation,
node,
&workspace_root,
run_id,
execution_mode,
required_output_path.as_deref(),
&runtime_values,
);
state
.set_automation_v2_session_mcp_servers(&session_id, selected_mcp_server_names.clone())
.await;
state
.engine_loop
.set_session_allowed_tools(&session_id, requested_tools.clone())
.await;
state
.engine_loop
.set_session_write_policy(&session_id, write_policy)
.await;
state
.engine_loop
.set_session_auto_approve_permissions(&session_id, true)
.await;
let model = resolve_automation_agent_model(state, agent, template.as_ref()).await;
let preexisting_output = required_output_path
.as_deref()
.and_then(|output_path| {
automation_output_path_candidates(&workspace_root, run_id, node, output_path)
.ok()
.and_then(|candidates| {
candidates
.into_iter()
.find(|candidate| candidate.exists() && candidate.is_file())
})
})
.and_then(|resolved| std::fs::read_to_string(resolved).ok());
let read_only_source_guard_paths = automation_read_only_source_guard_paths_for_node(
automation,
node,
&workspace_root,
Some(&runtime_values),
);
let read_only_source_snapshot =
automation_read_only_file_snapshot_for_node(&workspace_root, &read_only_source_guard_paths);
let mut read_only_source_snapshot_rollback =
ReadOnlySourceSnapshotRollback::armed(&workspace_root, &read_only_source_snapshot);
let workspace_snapshot_before = automation_workspace_root_file_snapshot(&workspace_root);
let standup_report_path =
if is_agent_standup_automation(automation) && node.node_id == "standup_synthesis" {
resolve_standup_report_path_for_run(automation, run_started_at_ms)
} else {
None
};
let previous_standup_context: Option<String> = if is_agent_standup_automation(automation) {
let report_path_template = resolve_standup_report_path_template(automation);
let run_ts = run.started_at_ms.unwrap_or_else(now_ms);
let previous_report = report_path_template.and_then(|template| {
for days_back in 1u64..=7 {
let previous_ts = run_ts.saturating_sub(days_back * 24 * 60 * 60 * 1000);
let candidate_path = if template.contains("{{date}}") {
let date = chrono::DateTime::<chrono::Utc>::from_timestamp_millis(
previous_ts as i64,
)
.unwrap_or_else(chrono::Utc::now)
.format("%Y-%m-%d")
.to_string();
template.replace("{{date}}", &date)
} else {
break;
};
if let Ok(resolved) =
resolve_automation_output_path(&workspace_root, &candidate_path)
{
if resolved.is_file() {
if let Ok(content) = std::fs::read_to_string(&resolved) {
let trimmed = content.trim();
if !trimmed.is_empty() {
return Some(format!(
"Previous Standup Report ({}):\n{}\n\nReport only NEW progress since the above. Do not repeat items already listed in the previous standup.",
candidate_path,
trimmed
));
}
}
}
}
}
None
});
previous_report
} else {
None
};
let knowledge_preflight =
automation_knowledge_preflight(state, automation, node, run_id, &project_id).await;
let (approved_learning_ids, workflow_learning_context) = state
.workflow_learning_context_for_automation_node(automation, node)
.await;
let knowledge_context = {
let base = knowledge_preflight.as_ref().and_then(|preflight| {
if !preflight.is_reusable() {
return None;
}
let rendered = preflight.format_for_injection();
if rendered.trim().is_empty() {
None
} else {
Some(rendered)
}
});
match (base, workflow_learning_context, previous_standup_context) {
(Some(base), Some(learning), Some(prev)) => {
Some(format!("{base}\n\n{learning}\n\n{prev}"))
}
(Some(base), Some(learning), None) => Some(format!("{base}\n\n{learning}")),
(Some(base), None, Some(prev)) => Some(format!("{base}\n\n{prev}")),
(None, Some(learning), Some(prev)) => Some(format!("{learning}\n\n{prev}")),
(Some(base), None, None) => Some(base),
(None, Some(learning), None) => Some(learning),
(None, None, Some(prev)) => Some(prev),
(None, None, None) => None,
}
};
let mcp_contract_guidance = automation_mcp_contract_prompt_guidance(&mcp_contracts);
if !approved_learning_ids.is_empty() {
let _ = state
.record_automation_v2_run_learning_usage(run_id, &approved_learning_ids)
.await;
}
let max_attempts = automation_node_max_attempts(node);
let mut prompt = render_automation_v2_prompt_with_options(
automation,
&workspace_root,
run_id,
node,
attempt,
agent,
&upstream_inputs,
&requested_tools,
template
.as_ref()
.and_then(|value| value.system_prompt.as_deref()),
standup_report_path.as_deref(),
if is_agent_standup_automation(automation) {
Some(project_id.as_str())
} else {
None
},
AutomationPromptRenderOptions {
summary_only_upstream: false,
knowledge_context: knowledge_context.clone(),
runtime_values: Some(runtime_values.clone()),
mcp_contract_guidance: mcp_contract_guidance.clone(),
},
);
let preserve_full_upstream_inputs = automation_node_preserves_full_upstream_inputs(node);
let mut preflight = build_automation_prompt_preflight(
&prompt,
&effective_offered_tools,
&offered_tool_schemas,
execution_mode,
&capability_resolution,
"standard",
false,
);
if automation_preflight_should_degrade(&preflight) && !upstream_inputs.is_empty() {
if preserve_full_upstream_inputs {
preflight = build_automation_prompt_preflight(
&prompt,
&effective_offered_tools,
&offered_tool_schemas,
execution_mode,
&capability_resolution,
"full_upstream_preserved",
true,
);
} else {
prompt = render_automation_v2_prompt_with_options(
automation,
&workspace_root,
run_id,
node,
attempt,
agent,
&upstream_inputs,
&requested_tools,
template
.as_ref()
.and_then(|value| value.system_prompt.as_deref()),
standup_report_path.as_deref(),
if is_agent_standup_automation(automation) {
Some(project_id.as_str())
} else {
None
},
AutomationPromptRenderOptions {
summary_only_upstream: true,
knowledge_context: knowledge_context.clone(),
runtime_values: Some(runtime_values.clone()),
mcp_contract_guidance: mcp_contract_guidance.clone(),
},
);
preflight = build_automation_prompt_preflight(
&prompt,
&effective_offered_tools,
&offered_tool_schemas,
execution_mode,
&capability_resolution,
"summary_only_upstream",
true,
);
}
}
if let Some(repair_brief) = render_automation_repair_brief(
node,
run.checkpoint.node_outputs.get(&node.node_id),
attempt,
max_attempts,
Some(run_id),
) {
prompt.push_str("\n\n");
prompt.push_str(&repair_brief);
}
let req = SendMessageRequest {
parts: vec![MessagePartInput::Text { text: prompt }],
model: model.clone(),
agent: None,
tool_mode: Some(ToolMode::Required),
tool_allowlist: Some(requested_tools.clone()),
strict_kb_grounding: None,
context_mode: None,
write_required: required_output_path.as_ref().map(|_| true),
prewrite_requirements: automation_node_prewrite_requirements(node, &requested_tools),
};
let result = run_automation_node_prompt_with_timeout(
state,
&session_id,
run_id,
node,
state.engine_loop.run_prompt_async_with_context(
session_id.clone(),
req,
Some(format!("automation-v2:{run_id}")),
),
)
.await;
tokio::task::yield_now().await;
state
.engine_loop
.clear_session_allowed_tools(&session_id)
.await;
state
.engine_loop
.clear_session_write_policy(&session_id)
.await;
state
.engine_loop
.clear_session_auto_approve_permissions(&session_id)
.await;
state
.clear_automation_v2_session_mcp_servers(&session_id)
.await;
state.clear_automation_v2_session(run_id, &session_id).await;
let mut recovered_after_prompt_timeout = false;
if let Err(error) = result {
if required_output_path.is_some() && automation_node_prompt_timeout_error(&error, node) {
recovered_after_prompt_timeout = true;
tracing::warn!(
run_id = %run_id,
node_id = %node.node_id,
error = %error,
"automation node prompt timed out after artifact-write attempt; reconciling required output before failing node"
);
} else {
let receipt_root = receipts::automation_attempt_receipts_root();
let payload = json!({
"automation_id": automation.automation_id,
"run_id": run_id,
"node_id": node.node_id,
"attempt": attempt,
"session_id": session_id,
"failure_class": "runtime_tool_or_provider_error",
"reason": error.to_string(),
"offered_tools": requested_tools,
"executed_tools": [],
"tool_error": error.to_string(),
"recoverable": true
});
if let Err(receipt_error) = append_automation_attempt_receipts(
&receipt_root,
run_id,
&node.node_id,
attempt,
&session_id,
&[AutomationAttemptReceiptEventInput {
event_type: "runtime_failure".to_string(),
payload,
}],
)
.await
{
tracing::warn!(
run_id = %run_id,
node_id = %node.node_id,
error = %receipt_error,
"failed to append automation runtime failure receipt"
);
}
return Err(error);
}
}
let expect_tool_activity = !requested_tools.is_empty();
let session = load_automation_session_after_run(state, &session_id, expect_tool_activity)
.await
.ok_or_else(|| anyhow::anyhow!("automation session `{}` missing after run", session_id))?;
let session_text = extract_session_text_output(&session);
let read_only_source_mutations =
read_only_source_snapshot_mutations(&workspace_root, &read_only_source_snapshot);
if !read_only_source_mutations.is_empty() {
let restored = revert_read_only_source_snapshot_mutations(
&workspace_root,
&read_only_source_snapshot,
&read_only_source_mutations,
);
let mutation_paths = read_only_source_mutations
.iter()
.filter_map(|value| value.get("path").and_then(Value::as_str))
.map(str::to_string)
.collect::<Vec<_>>();
anyhow::bail!(
"automation node `{}` attempted to modify read-only source files; restored {} file(s): {}",
node.node_id,
restored.len(),
mutation_paths.join(", ")
);
}
let verified_output = if let Some(output_path) = required_output_path.as_deref() {
let resolution = reconcile_automation_resolve_verified_output_path(
&session,
&workspace_root,
run_id,
node,
output_path,
250,
25,
)
.await?
.ok_or_else(|| {
anyhow::anyhow!(
"required output `{}` was not created for node `{}`",
output_path,
node.node_id
)
})?;
let resolved = resolution.path.clone();
if !resolved.is_file() {
anyhow::bail!(
"required output `{}` for node `{}` is not a file",
output_path,
node.node_id
);
}
let file_text = std::fs::read_to_string(&resolved).map_err(|error| {
anyhow::anyhow!(
"required output `{}` for node `{}` could not be read: {}",
output_path,
node.node_id,
error
)
})?;
let display_path = resolved
.strip_prefix(&workspace_root)
.ok()
.and_then(|value| value.to_str().map(str::to_string))
.filter(|value| !value.is_empty())
.unwrap_or_else(|| output_path.to_string());
Some((display_path, file_text, resolution))
} else {
None
};
let tool_telemetry = summarize_automation_tool_activity(node, &session, &requested_tools);
let mut tool_telemetry = tool_telemetry;
let verified_output_resolution = verified_output
.as_ref()
.map(|(_, _, resolution)| resolution.clone());
let verified_output_for_evidence = verified_output
.as_ref()
.map(|(path, text, _)| (path.clone(), text.clone()));
let base_attempt_evidence = node_output::build_automation_attempt_evidence(
node,
attempt,
&session,
&session_id,
&workspace_root,
&tool_telemetry,
&preflight,
&capability_resolution,
required_output_path.as_deref(),
verified_output_resolution.as_ref(),
verified_output_for_evidence.as_ref(),
);
if let Some(object) = tool_telemetry.as_object_mut() {
object.insert("node_attempt".to_string(), json!(attempt));
object.insert("node_max_attempts".to_string(), json!(max_attempts));
object.insert(
"node_attempts_remaining".to_string(),
json!(max_attempts.saturating_sub(attempt)),
);
object.insert("preflight".to_string(), preflight.clone());
object.insert(
"capability_resolution".to_string(),
capability_resolution.clone(),
);
object.insert(
"verified_output_materialized_by_current_attempt".to_string(),
json!(verified_output_resolution
.as_ref()
.map(|resolution| resolution.materialized_by_current_attempt)
.unwrap_or(false)),
);
object.insert(
"recovered_after_prompt_timeout".to_string(),
json!(recovered_after_prompt_timeout),
);
object.insert(
"attempt_evidence".to_string(),
base_attempt_evidence.clone(),
);
}
let upstream_evidence = if automation_node_uses_upstream_validation_evidence(node) {
Some(
collect_automation_upstream_research_evidence(
state,
automation,
&run,
node,
&workspace_root,
)
.await,
)
} else {
None
};
let verified_output = verified_output.map(|(path, text, _)| (path, text));
let (verified_output, mut artifact_validation, artifact_rejected_reason) =
validate_automation_artifact_output_with_context(
automation,
node,
&session,
&workspace_root,
Some(run_id.as_ref()),
Some(&runtime_values),
&session_text,
&tool_telemetry,
preexisting_output.as_deref(),
verified_output,
&workspace_snapshot_before,
upstream_evidence.as_ref(),
Some(&read_only_source_snapshot),
);
let _ = artifact_rejected_reason;
if let Some(promoted_from) = verified_output_resolution
.as_ref()
.and_then(|resolution| resolution.legacy_workspace_artifact_promoted_from.as_ref())
{
if let Some(object) = artifact_validation.as_object_mut() {
object.insert(
"legacy_workspace_artifact_promoted".to_string(),
json!(true),
);
object.insert(
"legacy_workspace_artifact_promoted_from".to_string(),
json!(promoted_from.to_string_lossy().to_string()),
);
object
.entry("accepted_candidate_source".to_string())
.or_insert_with(|| json!("legacy_workspace_artifact_promoted"));
}
}
let editorial_publish_block_reason = state
.get_automation_v2_run(run_id)
.await
.and_then(|run| automation_publish_editorial_block_reason(&run, node));
if let Some(reason) = editorial_publish_block_reason.as_ref() {
if let Some(object) = artifact_validation.as_object_mut() {
let unmet = object
.entry("unmet_requirements".to_string())
.or_insert_with(|| json!([]));
if let Some(rows) = unmet.as_array_mut() {
if !rows
.iter()
.any(|value| value.as_str() == Some("editorial_clearance_required"))
{
rows.push(json!("editorial_clearance_required"));
}
}
object
.entry("semantic_block_reason".to_string())
.or_insert_with(|| Value::String(reason.clone()));
}
}
let artifact_publication = if artifact_validation
.get("semantic_block_reason")
.and_then(Value::as_str)
.is_none()
{
if let Some(verified_output) = verified_output.as_ref() {
if let Some(spec) = automation_node_publish_spec(node) {
Some(
publish_automation_verified_output(
&workspace_root,
automation,
run_id,
node,
verified_output,
&spec,
)
.map_err(|error| {
anyhow::anyhow!(
"durable publication failed for node `{}` after validating `{}`: {}",
node.node_id,
verified_output.0,
error
)
})?,
)
} else if !automation.output_targets.is_empty()
&& automation_node_can_access_declared_output_targets(automation, node)
{
Some(
publish_automation_verified_outputs(
&workspace_root,
automation,
run_id,
node,
verified_output,
)
.map_err(|error| {
anyhow::anyhow!(
"durable publication failed for node `{}` after validating `{}`: {}",
node.node_id,
verified_output.0,
error
)
})?,
)
} else {
None
}
} else {
None
}
} else {
None
};
if let Some(publication) = artifact_publication.clone() {
if let Some(object) = artifact_validation.as_object_mut() {
object.insert("artifact_publication".to_string(), publication);
}
}
let (receipt_status, receipt_blocked_reason, receipt_approved) =
node_output::detect_automation_node_status(
node,
&session_text,
verified_output.as_ref(),
&tool_telemetry,
Some(&artifact_validation),
);
let receipt_blocker_category = node_output::detect_automation_blocker_category(
node,
&receipt_status,
receipt_blocked_reason.as_deref(),
&tool_telemetry,
Some(&artifact_validation),
);
let receipt_fallback_used =
automation_attempt_uses_legacy_fallback(&session_text, Some(&artifact_validation));
let receipt_validator_summary = node_output::build_automation_validator_summary(
automation_output_validator_kind(node),
&receipt_status,
receipt_blocked_reason.as_deref(),
Some(&artifact_validation),
);
let receipt_attempt_evidence = tool_telemetry
.get("attempt_evidence")
.cloned()
.map(|value| {
node_output::augment_automation_attempt_evidence_with_validation(
&value,
Some(&artifact_validation),
verified_output.as_ref(),
artifact_validation
.get("accepted_candidate_source")
.and_then(Value::as_str),
receipt_blocker_category.as_deref(),
receipt_fallback_used,
node_output::automation_backend_actionability_state(&receipt_status),
)
});
let receipt_telemetry_summary = json!({
"receipt_kind": "tool_telemetry_summary",
"automation_id": automation.automation_id,
"automation_run_id": run_id,
"context_run_id": format!("automation-v2-{run_id}"),
"node_id": node.node_id,
"attempt": attempt,
"session_id": session_id,
"preflight": preflight.clone(),
"capability_resolution": capability_resolution.clone(),
"tool_call_counts": tool_telemetry.get("tool_call_counts").cloned().unwrap_or_else(|| json!({})),
"web_research_used": tool_telemetry.get("web_research_used").cloned().unwrap_or_else(|| json!(false)),
"web_research_succeeded": tool_telemetry.get("web_research_succeeded").cloned().unwrap_or_else(|| json!(false)),
"latest_web_research_failure": tool_telemetry.get("latest_web_research_failure").cloned().unwrap_or(Value::Null),
"email_delivery_attempted": tool_telemetry.get("email_delivery_attempted").cloned().unwrap_or_else(|| json!(false)),
"email_delivery_succeeded": tool_telemetry.get("email_delivery_succeeded").cloned().unwrap_or_else(|| json!(false)),
"latest_email_delivery_failure": tool_telemetry.get("latest_email_delivery_failure").cloned().unwrap_or(Value::Null),
});
let receipt_attempt_summary = json!({
"receipt_kind": "attempt_summary",
"automation_id": automation.automation_id,
"automation_run_id": run_id,
"context_run_id": format!("automation-v2-{run_id}"),
"node_id": node.node_id,
"attempt": attempt,
"session_id": session_id,
"status": receipt_status,
"approved": receipt_approved,
"blocked_reason": receipt_blocked_reason,
"blocker_category": receipt_blocker_category,
"fallback_used": receipt_fallback_used,
"attempt_evidence": receipt_attempt_evidence,
});
let receipt_validation_summary = json!({
"receipt_kind": "validation_summary",
"automation_id": automation.automation_id,
"automation_run_id": run_id,
"context_run_id": format!("automation-v2-{run_id}"),
"node_id": node.node_id,
"attempt": attempt,
"session_id": session_id,
"validator_summary": receipt_validator_summary,
});
let mut receipt_events = collect_automation_attempt_receipt_events(
automation,
run_id,
node,
attempt,
&session_id,
&session,
verified_output.as_ref(),
verified_output_resolution.as_ref(),
required_output_path.as_deref(),
Some(&artifact_validation),
);
receipt_events.extend(vec![
AutomationAttemptReceiptEventInput {
event_type: "attempt_summary".to_string(),
payload: receipt_attempt_summary,
},
AutomationAttemptReceiptEventInput {
event_type: "tool_telemetry_summary".to_string(),
payload: receipt_telemetry_summary,
},
AutomationAttemptReceiptEventInput {
event_type: "validation_summary".to_string(),
payload: receipt_validation_summary,
},
]);
let receipt_root = receipts::automation_attempt_receipts_root();
let receipt_ledger = match append_automation_attempt_receipts(
&receipt_root,
run_id,
&node.node_id,
attempt,
&session_id,
&receipt_events,
)
.await
{
Ok(summary) => Some(serde_json::to_value(summary)?),
Err(error) => {
tracing::warn!(
run_id = %run_id,
node_id = %node.node_id,
attempt = attempt,
error = %error,
"failed to append automation attempt receipt ledger"
);
None
}
};
let receipt_timeline = receipt_ledger
.as_ref()
.and_then(|ledger| ledger.get("path").and_then(Value::as_str))
.map(PathBuf::from);
let receipt_timeline = match receipt_timeline {
Some(path) => receipts::read_automation_attempt_receipt_records(&path)
.await
.ok()
.map(|records| {
json!({
"record_count": records.len(),
"records": records,
})
}),
None => None,
};
let attempt_forensic_record = json!({
"version": 1,
"automation_id": automation.automation_id,
"automation_run_id": run_id,
"context_run_id": format!("automation-v2-{run_id}"),
"node_id": node.node_id,
"attempt": attempt,
"session_id": session_id,
"status": receipt_status,
"final_backend_actionability_state": node_output::automation_backend_actionability_state(&receipt_status),
"approved": receipt_approved,
"blocked_reason": receipt_blocked_reason,
"blocker_category": receipt_blocker_category,
"fallback_used": receipt_fallback_used,
"preflight": preflight.clone(),
"capability_resolution": capability_resolution.clone(),
"validator_summary": receipt_validator_summary,
"attempt_evidence": receipt_attempt_evidence.clone(),
"receipt_ledger": receipt_ledger.clone(),
"receipt_timeline": receipt_timeline.clone(),
});
let attempt_forensic_record_path = match receipts::persist_automation_attempt_forensic_record(
&workspace_root,
run_id,
&node.node_id,
attempt,
&attempt_forensic_record,
)
.await
{
Ok(path) => Some(path.to_string_lossy().to_string()),
Err(error) => {
tracing::warn!(
run_id = %run_id,
node_id = %node.node_id,
attempt = attempt,
error = %error,
"failed to persist automation attempt forensic record"
);
None
}
};
let external_actions = if editorial_publish_block_reason.is_some() {
Vec::new()
} else {
record_automation_external_actions_for_session(
state,
run_id,
automation,
node,
attempt,
&session_id,
&session,
)
.await?
};
let mut output = wrap_automation_node_output_with_automation(
automation,
node,
&session,
&requested_tools,
&session_id,
Some(run_id),
&session_text,
verified_output,
Some(artifact_validation),
);
let run_after = state.get_automation_v2_run(run_id).await.unwrap_or(run);
let cost_usd_delta = run_after.estimated_cost_usd - start_cost_usd;
let prompt_tokens_delta = run_after.prompt_tokens.saturating_sub(start_prompt_tokens);
let completion_tokens_delta = run_after
.completion_tokens
.saturating_sub(start_completion_tokens);
let budget_limit_reached = automation
.execution
.max_total_cost_usd
.map(|max| run_after.estimated_cost_usd >= max)
.unwrap_or(false);
let cost_provenance = automation_step_cost_provenance(
&node.node_id,
model.map(|m| m.model_id.clone()),
prompt_tokens_delta,
completion_tokens_delta,
cost_usd_delta,
run_after.estimated_cost_usd,
budget_limit_reached,
);
if let Some(object) = output.as_object_mut() {
object.insert("cost_provenance".to_string(), cost_provenance);
if let Some(knowledge_preflight) = knowledge_preflight.as_ref() {
object.insert(
"knowledge_preflight".to_string(),
serde_json::to_value(knowledge_preflight)?,
);
}
if let Some(publication) = artifact_publication {
object.insert("artifact_publication".to_string(), publication);
}
if let Some(receipt_timeline) = receipt_timeline.clone() {
object.insert("receipt_timeline".to_string(), receipt_timeline);
}
if let Some(receipt_ledger) = receipt_ledger {
if let Some(attempt_evidence) = object
.get_mut("attempt_evidence")
.and_then(Value::as_object_mut)
{
attempt_evidence.insert("receipt_ledger".to_string(), receipt_ledger);
if let Some(receipt_timeline) = receipt_timeline {
attempt_evidence.insert("receipt_timeline".to_string(), receipt_timeline);
}
}
}
if let Some(path) = attempt_forensic_record_path.clone() {
object.insert(
"attempt_forensic_record_path".to_string(),
json!(path.clone()),
);
if let Some(attempt_evidence) = object
.get_mut("attempt_evidence")
.and_then(Value::as_object_mut)
{
attempt_evidence.insert("forensic_record_path".to_string(), json!(path));
}
}
if !external_actions.is_empty() {
object.insert(
"external_actions".to_string(),
serde_json::to_value(&external_actions)?,
);
}
if is_agent_standup_automation(automation) && node.node_id == "standup_synthesis" {
let report_text = object
.get("content")
.and_then(|c| c.get("text").and_then(Value::as_str))
.or_else(|| {
object
.get("content")
.and_then(|c| c.get("raw_assistant_text").and_then(Value::as_str))
})
.unwrap_or(&session_text);
let read_paths = tool_telemetry
.get("read_paths")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(str::to_string)
.collect::<Vec<_>>()
})
.unwrap_or_default();
let assessment = assess_artifact_candidate(
node,
&workspace_root,
"session_write",
report_text,
&read_paths,
&[],
&[],
&[],
);
let assessment_summary = assessment::artifact_candidate_summary(&assessment, true);
object.insert("standup_assessment".to_string(), assessment_summary);
if assessment.score < 500 {
object.insert("standup_quality_warning".to_string(), json!(true));
tracing::warn!(
run_id = %run_id,
node_id = %node.node_id,
score = assessment.score,
substantive = assessment.substantive,
placeholder_like = assessment.placeholder_like,
"standup coordinator output scored below warning threshold (500); \
report may be low-quality"
);
}
if let Some(report_path) = standup_report_path.as_deref() {
if let Some(receipt_json) = build_standup_run_receipt(
&run_after,
automation,
run_id,
report_path,
&assessment,
) {
let receipt_path = standup_receipt_path_for_report(report_path);
let abs_receipt = PathBuf::from(&workspace_root).join(&receipt_path);
if let Some(parent) = abs_receipt.parent() {
let _ = std::fs::create_dir_all(parent);
}
match serde_json::to_string_pretty(&receipt_json) {
Ok(content) => match std::fs::write(&abs_receipt, &content) {
Ok(()) => {
object.insert(
"standup_receipt_path".to_string(),
json!(receipt_path),
);
}
Err(err) => {
tracing::warn!(
run_id = %run_id,
receipt_path = %receipt_path,
error = %err,
"failed to write standup run receipt"
);
}
},
Err(err) => {
tracing::warn!(
run_id = %run_id,
error = %err,
"failed to serialize standup run receipt"
);
}
}
}
}
}
}
read_only_source_snapshot_rollback.disarm();
Ok(output)
}
// The `#[path]` attribute overrides the default module file lookup so that
// the `tasks` submodule is loaded from `../tasks.rs`.
#[path = "../tasks.rs"]
pub mod tasks;
/// Public entry point for the automation v2 executor.
///
/// Takes ownership of `state`, hands it to
/// [`tasks::run_automation_v2_executor`], and awaits that future; all of the
/// actual work lives in the `tasks` module.
pub async fn run_automation_v2_executor(state: AppState) {
    // Futures are lazy: nothing runs until the `.await` below polls it.
    let executor = tasks::run_automation_v2_executor(state);
    executor.await
}