use super::*;
/// Returns the node's delivery method, or the literal `"none"` when no method is set.
fn automation_node_delivery_method_value(node: &AutomationFlowNode) -> String {
    match automation_node_delivery_method(node) {
        Some(method) => method,
        None => String::from("none"),
    }
}
pub(crate) fn automation_output_session_id(output: &Value) -> Option<String> {
output
.get("content")
.and_then(Value::as_object)
.and_then(|content| {
content
.get("session_id")
.or_else(|| content.get("sessionId"))
.and_then(Value::as_str)
})
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
pub(crate) fn build_automation_pending_gate(
node: &AutomationFlowNode,
) -> Option<AutomationPendingGate> {
let gate = node.gate.as_ref()?;
Some(AutomationPendingGate {
node_id: node.node_id.clone(),
title: node
.metadata
.as_ref()
.and_then(|metadata| metadata.get("builder"))
.and_then(|builder| builder.get("title"))
.and_then(Value::as_str)
.unwrap_or(node.objective.as_str())
.to_string(),
instructions: gate.instructions.clone(),
decisions: gate.decisions.clone(),
rework_targets: gate.rework_targets.clone(),
requested_at_ms: now_ms(),
upstream_node_ids: node.depends_on.clone(),
})
}
fn automation_node_builder_metadata(node: &AutomationFlowNode, key: &str) -> Option<String> {
node.metadata
.as_ref()
.and_then(|metadata| metadata.get("builder"))
.and_then(|builder| builder.get(key))
.and_then(Value::as_str)
.map(str::to_string)
}
/// Returns the node's `research_stage` builder tag, trimmed and ASCII-lowercased.
///
/// A missing or blank tag yields `None`.
pub(crate) fn automation_node_research_stage(node: &AutomationFlowNode) -> Option<String> {
    let raw = automation_node_builder_metadata(node, "research_stage")?;
    let stage = raw.trim().to_ascii_lowercase();
    (!stage.is_empty()).then_some(stage)
}
/// True when the node's normalized research stage is `research_finalize`.
fn automation_node_is_research_finalize(node: &AutomationFlowNode) -> bool {
    matches!(
        automation_node_research_stage(node).as_deref(),
        Some("research_finalize")
    )
}
/// Reads the integer `metadata.builder.priority` of a node, defaulting to `0`.
///
/// Non-integer values and integers outside the `i32` range are treated as unset.
fn automation_node_builder_priority(node: &AutomationFlowNode) -> i32 {
    let raw = node
        .metadata
        .as_ref()
        .and_then(|metadata| metadata.get("builder"))
        .and_then(|builder| builder.get("priority"))
        .and_then(Value::as_i64);
    match raw.map(i32::try_from) {
        Some(Ok(priority)) => priority,
        _ => 0,
    }
}
/// Normalizes a path list with `normalize_non_empty_list` and keeps at most `limit` entries.
///
/// `Vec::truncate` is a no-op when the list is already within the limit, so no length
/// check is needed.
fn truncate_path_list_for_prompt(paths: Vec<String>, limit: usize) -> Vec<String> {
    let mut normalized = normalize_non_empty_list(paths);
    normalized.truncate(limit);
    normalized
}
fn value_object_path_field(value: &Value, key: &str) -> Option<String> {
value
.get(key)
.and_then(Value::as_str)
.map(str::trim)
.filter(|path| !path.is_empty())
.map(str::to_string)
}
/// Collects path-like entries from an optional JSON array of handoff rows.
///
/// String rows are trimmed. Object rows contribute the first non-blank string found among
/// `object_fields`, checked in order. Other row types, blank entries, and a missing or
/// non-array `rows` value are ignored.
fn collect_handoff_path_rows(rows: Option<&Value>, object_fields: &[&str]) -> Vec<String> {
    let Some(rows) = rows.and_then(Value::as_array) else {
        return Vec::new();
    };
    rows.iter()
        .filter_map(|row| match row {
            Value::String(path) => Some(path.trim().to_string()),
            Value::Object(_) => object_fields
                .iter()
                .find_map(|field| value_object_path_field(row, field)),
            _ => None,
        })
        // A blank string row must never render as an empty bullet in the prompt.
        .filter(|path| !path.is_empty())
        .collect()
}

/// Renders a "Research Coverage Summary" prompt section for the research finalize stage.
///
/// Reads the structured handoffs of the `source_inventory`, `local_source_notes` and
/// `external_research` upstream aliases and lists:
/// - `discovered_paths` and `priority_paths` from the source inventory;
/// - `files_reviewed` and `files_not_reviewed` from the local notes;
/// - `sources_reviewed` (URL, else path) from the external research.
///
/// File lists are capped at 12 entries and web sources at 8 to keep the prompt bounded.
/// Returns `None` when every list is empty.
fn render_research_finalize_upstream_summary(upstream_inputs: &[Value]) -> Option<String> {
    const FILE_LIST_LIMIT: usize = 12;
    const WEB_SOURCE_LIMIT: usize = 8;
    let source_inventory =
        automation_upstream_output_for_alias(upstream_inputs, "source_inventory")
            .and_then(automation_upstream_structured_handoff);
    let local_source_notes =
        automation_upstream_output_for_alias(upstream_inputs, "local_source_notes")
            .and_then(automation_upstream_structured_handoff);
    let external_research =
        automation_upstream_output_for_alias(upstream_inputs, "external_research")
            .and_then(automation_upstream_structured_handoff);
    let discovered_files = truncate_path_list_for_prompt(
        collect_handoff_path_rows(
            source_inventory.and_then(|handoff| handoff.get("discovered_paths")),
            &["path"],
        ),
        FILE_LIST_LIMIT,
    );
    let priority_files = truncate_path_list_for_prompt(
        collect_handoff_path_rows(
            source_inventory.and_then(|handoff| handoff.get("priority_paths")),
            &["path"],
        ),
        FILE_LIST_LIMIT,
    );
    let files_reviewed = truncate_path_list_for_prompt(
        collect_handoff_path_rows(
            local_source_notes.and_then(|handoff| handoff.get("files_reviewed")),
            &["path"],
        ),
        FILE_LIST_LIMIT,
    );
    let files_not_reviewed = truncate_path_list_for_prompt(
        collect_handoff_path_rows(
            local_source_notes.and_then(|handoff| handoff.get("files_not_reviewed")),
            &["path"],
        ),
        FILE_LIST_LIMIT,
    );
    // Web sources are usually recorded as URLs; fall back to a path field when absent.
    let web_sources_reviewed = truncate_path_list_for_prompt(
        collect_handoff_path_rows(
            external_research.and_then(|handoff| handoff.get("sources_reviewed")),
            &["url", "path"],
        ),
        WEB_SOURCE_LIMIT,
    );
    if discovered_files.is_empty()
        && priority_files.is_empty()
        && files_reviewed.is_empty()
        && files_not_reviewed.is_empty()
        && web_sources_reviewed.is_empty()
    {
        return None;
    }
    let list_or_none = |items: &[String]| {
        if items.is_empty() {
            "none recorded".to_string()
        } else {
            items
                .iter()
                .map(|item| format!("- `{}`", item))
                .collect::<Vec<_>>()
                .join("\n")
        }
    };
    Some(format!(
        "Research Coverage Summary:\nRelevant discovered files from upstream:\n{}\nPriority paths from upstream:\n{}\nUpstream files already reviewed:\n{}\nUpstream files already marked not reviewed:\n{}\nUpstream web sources reviewed:\n{}\nFinal brief rule: every relevant discovered file should appear in `Files reviewed` or `Files not reviewed`, and proof points must stay citation-backed.",
        list_or_none(&discovered_files),
        list_or_none(&priority_files),
        list_or_none(&files_reviewed),
        list_or_none(&files_not_reviewed),
        list_or_none(&web_sources_reviewed),
    ))
}
/// Looks up the stage naming and wording used to split a bundled Studio template's single
/// research node into discover / local / external / final stages.
///
/// Only the four bundled templates listed below are supported; any other id yields `None`.
fn split_research_template_config(template_id: &str) -> Option<SplitResearchTemplateConfig> {
    let config = match template_id {
        "marketing-content-pipeline" => SplitResearchTemplateConfig {
            template_id: "marketing-content-pipeline",
            final_node_id: "research-brief",
            final_agent_id: "research",
            discover_node_id: "research-discover-sources",
            discover_agent_id: "research-discover",
            discover_title: "Discover Sources",
            discover_objective: "Enumerate the workspace, identify the relevant source corpus, and prioritize which local files must be read for the marketing brief.",
            discover_display_name: "Research Discover",
            local_node_id: "research-local-sources",
            local_agent_id: "research-local-sources",
            local_title: "Read Local Sources",
            local_objective: "Read the prioritized local product and marketing files and produce source-backed notes for the brief.",
            local_display_name: "Research Local Sources",
            external_node_id: "research-external-research",
            external_agent_id: "research-external",
            external_title: "External Research",
            external_objective: "Perform targeted external research that complements the local source notes and record what web evidence was gathered or unavailable.",
            external_display_name: "Research External",
            final_title: "Research Brief",
            final_objective: "Write `marketing-brief.md` from the structured discovery, local source notes, and external research gathered earlier in the workflow.",
        },
        "competitor-research-pipeline" => SplitResearchTemplateConfig {
            template_id: "competitor-research-pipeline",
            final_node_id: "scan-market",
            final_agent_id: "market-scan",
            discover_node_id: "scan-market-discover",
            discover_agent_id: "market-discover",
            discover_title: "Discover Market Sources",
            discover_objective: "Identify the local source corpus and file inventory that should guide the competitor scan.",
            discover_display_name: "Market Discover",
            local_node_id: "scan-market-local-sources",
            local_agent_id: "market-local-sources",
            local_title: "Read Market Sources",
            local_objective: "Read the prioritized local competitor and strategy sources before external scanning.",
            local_display_name: "Market Local Sources",
            external_node_id: "scan-market-external-research",
            external_agent_id: "market-external",
            external_title: "Research Market",
            external_objective: "Gather current external competitor evidence guided by the local market context.",
            external_display_name: "Market External",
            final_title: "Scan Market",
            final_objective: "Synthesize the discovered local and external evidence into the final competitor scan.",
        },
        "weekly-newsletter-builder" => SplitResearchTemplateConfig {
            template_id: "weekly-newsletter-builder",
            final_node_id: "curate-issue",
            final_agent_id: "curator",
            discover_node_id: "curate-issue-discover",
            discover_agent_id: "curator-discover",
            discover_title: "Discover Issue Sources",
            discover_objective: "Identify the local source corpus and candidate files that should feed this week's issue.",
            discover_display_name: "Curator Discover",
            local_node_id: "curate-issue-local-sources",
            local_agent_id: "curator-local-sources",
            local_title: "Read Issue Sources",
            local_objective: "Read the prioritized local source files and extract the strongest issue candidates.",
            local_display_name: "Curator Local Sources",
            external_node_id: "curate-issue-external-research",
            external_agent_id: "curator-external",
            external_title: "Research Issue",
            external_objective: "Gather timely external signals that should influence this week's issue.",
            external_display_name: "Curator External",
            final_title: "Curate Issue",
            final_objective: "Curate the best items for this week's issue from the staged research handoffs.",
        },
        "sales-prospecting-team" => SplitResearchTemplateConfig {
            template_id: "sales-prospecting-team",
            final_node_id: "research-account",
            final_agent_id: "account-research",
            discover_node_id: "research-account-discover",
            discover_agent_id: "account-discover",
            discover_title: "Discover Account Sources",
            discover_objective: "Identify the source corpus that should guide account research.",
            discover_display_name: "Account Discover",
            local_node_id: "research-account-local-sources",
            local_agent_id: "account-local-sources",
            local_title: "Read Account Sources",
            local_objective: "Read the prioritized local account and ICP files before drafting the account brief.",
            local_display_name: "Account Local Sources",
            external_node_id: "research-account-external-research",
            external_agent_id: "account-external",
            external_title: "Research Account Externally",
            external_objective: "Gather targeted external account context and buying signals to support the brief.",
            external_display_name: "Account External",
            final_title: "Research Account",
            final_objective: "Prepare the final account brief from the staged discovery, local evidence, and external research.",
        },
        // Not a bundled split-research template.
        _ => return None,
    };
    Some(config)
}
fn studio_template_id(automation: &AutomationV2Spec) -> Option<String> {
automation
.metadata
.as_ref()
.and_then(|metadata| metadata.get("studio"))
.and_then(Value::as_object)
.and_then(|studio| {
studio
.get("template_id")
.or_else(|| studio.get("templateId"))
.and_then(Value::as_str)
})
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
/// Builds the `{ "builder": {...}, "studio": {...} }` metadata for a split research stage.
///
/// Both objects record `research_stage` and, when provided, `output_path`. The builder
/// object also carries `title`, `role` and `prompt`. It adds `required_tools` only when
/// the list is non-empty and `write_required: true` only when requested. Always returns
/// `Some`.
fn split_research_stage_metadata(
    title: &str,
    role: &str,
    prompt: String,
    research_stage: &str,
    output_path: Option<&str>,
    required_tools: &[&str],
    write_required: bool,
) -> Option<Value> {
    let mut builder = serde_json::Map::new();
    let mut studio = serde_json::Map::new();
    for (key, value) in [
        ("title", json!(title)),
        ("role", json!(role)),
        ("prompt", json!(prompt)),
        ("research_stage", json!(research_stage)),
    ] {
        builder.insert(key.to_string(), value);
    }
    studio.insert("research_stage".to_string(), json!(research_stage));
    if let Some(path) = output_path {
        builder.insert("output_path".to_string(), json!(path));
        studio.insert("output_path".to_string(), json!(path));
    }
    if !required_tools.is_empty() {
        builder.insert("required_tools".to_string(), json!(required_tools));
    }
    if write_required {
        builder.insert("write_required".to_string(), json!(true));
    }
    let metadata = json!({
        "builder": Value::Object(builder),
        "studio": Value::Object(studio),
    });
    Some(metadata)
}
/// Derives a stage-specific agent profile from `base`.
///
/// The copy gets a new id and display name and has its template link cleared. Its tool
/// allowlist is replaced by the normalized `allowlist`. The inherited denylist is kept but
/// passed through the same normalization.
fn migrated_stage_agent(
    base: &AutomationAgentProfile,
    agent_id: &str,
    display_name: &str,
    allowlist: &[&str],
) -> AutomationAgentProfile {
    let mut profile = base.clone();
    profile.agent_id = String::from(agent_id);
    profile.display_name = String::from(display_name);
    profile.template_id = None;
    profile.tool_policy.allowlist = config::channels::normalize_allowed_tools(
        allowlist.iter().map(|tool| tool.to_string()).collect(),
    );
    // Move the inherited denylist out instead of cloning it; it is reassigned right away.
    let inherited_denylist = std::mem::take(&mut profile.tool_policy.denylist);
    profile.tool_policy.denylist = config::channels::normalize_allowed_tools(inherited_denylist);
    profile
}
fn migrate_split_research_studio_metadata(metadata: &mut Value) {
let Some(root) = metadata.as_object_mut() else {
return;
};
let studio = root
.entry("studio".to_string())
.or_insert_with(|| json!({}));
let Some(studio_obj) = studio.as_object_mut() else {
return;
};
studio_obj.insert("version".to_string(), json!(2));
studio_obj.insert("workflow_structure_version".to_string(), json!(2));
studio_obj.remove("agent_drafts");
studio_obj.remove("node_drafts");
studio_obj.remove("node_layout");
}
/// Migrates a bundled Studio research automation to the split research workflow.
///
/// `metadata.studio.template_id` (or `templateId`) selects a `SplitResearchTemplateConfig`.
/// The single research node (`config.final_node_id`) is then replaced, at its original
/// position, by four nodes: discover -> local sources -> external research -> finalize.
/// The finalize node is a copy of the original node, rewired to consume the three upstream
/// handoffs.
///
/// Three stage agents are cloned from the original research agent; each is added only when
/// no agent with that id exists yet. The Studio metadata is moved to workflow structure
/// version 2, or created if the automation has no metadata.
///
/// Returns `true` only when the node graph was rewritten. Returns `false` when:
/// - the template is unsupported;
/// - the final node or its agent is missing;
/// - the automation already looks migrated. In this case the Studio metadata is still
///   normalized in place, even though `false` is returned.
pub(crate) fn migrate_bundled_studio_research_split_automation(
    automation: &mut AutomationV2Spec,
) -> bool {
    let Some(template_id) = studio_template_id(automation) else {
        return false;
    };
    let Some(config) = split_research_template_config(&template_id) else {
        return false;
    };
    // Already migrated: the discover stage exists, or the final node is already tagged as
    // `research_finalize`. Only the Studio metadata is refreshed in that case.
    // NOTE(review): this metadata mutation is reported as `false`; confirm that callers do
    // not rely on the return value to decide whether to persist it.
    if automation
        .flow
        .nodes
        .iter()
        .any(|node| node.node_id == config.discover_node_id)
        || automation
            .flow
            .nodes
            .iter()
            .find(|node| node.node_id == config.final_node_id)
            .is_some_and(automation_node_is_research_finalize)
    {
        if let Some(metadata) = automation.metadata.as_mut() {
            migrate_split_research_studio_metadata(metadata);
        }
        return false;
    }
    let Some(final_node_index) = automation
        .flow
        .nodes
        .iter()
        .position(|node| node.node_id == config.final_node_id)
    else {
        return false;
    };
    // The original research agent is the template for the three new stage agents.
    let Some(base_agent) = automation
        .agents
        .iter()
        .find(|agent| agent.agent_id == config.final_agent_id)
        .cloned()
    else {
        return false;
    };
    let existing_final_node = automation.flow.nodes[final_node_index].clone();
    let output_path = automation_node_required_output_path(&existing_final_node);
    // Preserve the final node's contract kind; default to "artifact" when it had none.
    let final_contract_kind = existing_final_node
        .output_contract
        .as_ref()
        .map(|contract| contract.kind.clone())
        .unwrap_or_else(|| "artifact".to_string());
    // A "brief" contract additionally requires a citations section and may default to
    // the ResearchBrief validator below.
    let final_is_brief_like = final_contract_kind.trim().eq_ignore_ascii_case("brief");
    let final_summary_guidance = existing_final_node
        .output_contract
        .as_ref()
        .and_then(|contract| contract.summary_guidance.clone());
    // Stage prompts stored in the builder metadata of each new node.
    let discover_prompt = "Enumerate the workspace, identify the relevant source corpus, and return a structured handoff with `workspace_inventory_summary`, `discovered_paths`, `priority_paths`, and `skipped_paths_initial`. If a curated source index such as `SOURCES.md` exists, read it first. Perform at least one concrete `read` before finishing, but read only enough to identify the corpus for the next stage. Do not write final workspace artifacts in this stage.".to_string();
    let local_prompt = "Use the upstream `source_inventory` handoff to decide which concrete local files to read. Perform concrete `read` calls, extract the product or market facts supported by those reads, and return a structured handoff with `read_paths`, `reviewed_facts`, `files_reviewed`, `files_not_reviewed`, and `citations_local`. Do not invent facts from filenames alone.".to_string();
    let external_prompt = "Use the upstream `source_inventory` and `local_source_notes` handoffs to guide targeted external research. Perform `websearch` and fetch result pages when snippets are not enough, then return `external_research_mode`, `queries_attempted`, `sources_reviewed`, `citations_external`, and `research_limitations`. If search is unavailable, record that limitation clearly instead of inventing evidence.".to_string();
    // The finalize prompt is template-specific; the `_` arm is a generic fallback.
    let final_prompt = match config.template_id {
        "marketing-content-pipeline" => "Use the upstream `source_inventory`, `local_source_notes`, and `external_research` handoffs as the source of truth. Read `marketing-brief.md` from disk only as a fallback or verification step. Synthesize the final marketing brief from those handoffs instead of repeating discovery or fresh web research in this stage. Include a workspace source audit, audience, positioning, proof points with citations, `Files reviewed`, `Files not reviewed`, and `Web sources reviewed`, and clearly note any research limitations. In source-audit sections, list only exact concrete workspace-relative file paths or exact reviewed URLs; do not use directory names, wildcard paths, or glob patterns.".to_string(),
        "competitor-research-pipeline" => "Use the upstream `source_inventory`, `local_source_notes`, and `external_research` handoffs as the source of truth for the final competitor scan. Separate observed evidence from inference, keep the scan current and signal-focused, and do not rerun discovery or fresh web research in this stage.".to_string(),
        "weekly-newsletter-builder" => "Use the upstream `source_inventory`, `local_source_notes`, and `external_research` handoffs to curate the final issue. Turn them into the final shortlist and section order without repeating discovery or fresh web research in this stage.".to_string(),
        "sales-prospecting-team" => "Use the upstream `source_inventory`, `local_source_notes`, and `external_research` handoffs as the source of truth for the final account brief. Separate observed facts from hypotheses and do not rerun discovery or fresh web research in this stage.".to_string(),
        _ => "Use the upstream `source_inventory`, `local_source_notes`, and `external_research` handoffs as the source of truth and synthesize the final artifact without repeating discovery or fresh web research in this stage.".to_string(),
    };
    // Stage 1: discover. No dependencies; structured JSON handoff gated on workspace
    // inspection and concrete reads.
    let discover_node = AutomationFlowNode {
        node_id: config.discover_node_id.to_string(),
        agent_id: config.discover_agent_id.to_string(),
        objective: config.discover_objective.to_string(),
        knowledge: Default::default(),
        depends_on: Vec::new(),
        input_refs: Vec::new(),
        output_contract: Some(AutomationFlowOutputContract {
            kind: "structured_json".to_string(),
            validator: Some(crate::AutomationOutputValidatorKind::StructuredJson),
            enforcement: Some(crate::AutomationOutputEnforcement {
                validation_profile: Some("local_research".to_string()),
                required_tools: vec!["read".to_string()],
                required_evidence: vec!["local_source_reads".to_string()],
                required_sections: Vec::new(),
                prewrite_gates: vec![
                    "workspace_inspection".to_string(),
                    "concrete_reads".to_string(),
                ],
                retry_on_missing: vec![
                    "local_source_reads".to_string(),
                    "workspace_inspection".to_string(),
                    "concrete_reads".to_string(),
                ],
                terminal_on: vec![
                    "tool_unavailable".to_string(),
                    "repair_budget_exhausted".to_string(),
                ],
                repair_budget: Some(5),
                session_text_recovery: Some("require_prewrite_satisfied".to_string()),
            }),
            schema: None,
            summary_guidance: Some(
                "Return a structured handoff in the final response instead of writing workspace files."
                    .to_string(),
            ),
        }),
        retry_policy: None,
        timeout_ms: None,
        max_tool_calls: None,
        stage_kind: Some(AutomationNodeStageKind::Workstream),
        gate: None,
        metadata: split_research_stage_metadata(
            config.discover_title,
            "watcher",
            discover_prompt,
            "research_discover",
            None,
            &["glob", "read"],
            false,
        ),
    };
    // Stage 2: local sources. Consumes the discover output as `source_inventory`.
    let local_node = AutomationFlowNode {
        node_id: config.local_node_id.to_string(),
        agent_id: config.local_agent_id.to_string(),
        objective: config.local_objective.to_string(),
        knowledge: Default::default(),
        depends_on: vec![config.discover_node_id.to_string()],
        input_refs: vec![AutomationFlowInputRef {
            from_step_id: config.discover_node_id.to_string(),
            alias: "source_inventory".to_string(),
        }],
        output_contract: Some(AutomationFlowOutputContract {
            kind: "structured_json".to_string(),
            validator: Some(crate::AutomationOutputValidatorKind::StructuredJson),
            enforcement: Some(crate::AutomationOutputEnforcement {
                validation_profile: Some("local_research".to_string()),
                required_tools: vec!["read".to_string()],
                required_evidence: vec!["local_source_reads".to_string()],
                required_sections: Vec::new(),
                prewrite_gates: vec!["concrete_reads".to_string()],
                retry_on_missing: vec![
                    "local_source_reads".to_string(),
                    "concrete_reads".to_string(),
                ],
                terminal_on: vec![
                    "tool_unavailable".to_string(),
                    "repair_budget_exhausted".to_string(),
                ],
                repair_budget: Some(5),
                session_text_recovery: Some("require_prewrite_satisfied".to_string()),
            }),
            schema: None,
            summary_guidance: Some(
                "Return a structured handoff backed by concrete local file reads.".to_string(),
            ),
        }),
        retry_policy: None,
        timeout_ms: None,
        max_tool_calls: None,
        stage_kind: Some(AutomationNodeStageKind::Workstream),
        gate: None,
        metadata: split_research_stage_metadata(
            config.local_title,
            "watcher",
            local_prompt,
            "research_local_sources",
            None,
            &["read"],
            false,
        ),
    };
    // Stage 3: external research. Consumes both the inventory and the local notes.
    let external_node = AutomationFlowNode {
        node_id: config.external_node_id.to_string(),
        agent_id: config.external_agent_id.to_string(),
        objective: config.external_objective.to_string(),
        knowledge: Default::default(),
        depends_on: vec![
            config.discover_node_id.to_string(),
            config.local_node_id.to_string(),
        ],
        input_refs: vec![
            AutomationFlowInputRef {
                from_step_id: config.discover_node_id.to_string(),
                alias: "source_inventory".to_string(),
            },
            AutomationFlowInputRef {
                from_step_id: config.local_node_id.to_string(),
                alias: "local_source_notes".to_string(),
            },
        ],
        output_contract: Some(AutomationFlowOutputContract {
            kind: "structured_json".to_string(),
            validator: Some(crate::AutomationOutputValidatorKind::StructuredJson),
            enforcement: Some(crate::AutomationOutputEnforcement {
                validation_profile: Some("external_research".to_string()),
                required_tools: vec!["websearch".to_string()],
                required_evidence: vec!["external_sources".to_string()],
                required_sections: Vec::new(),
                prewrite_gates: vec!["successful_web_research".to_string()],
                retry_on_missing: vec![
                    "external_sources".to_string(),
                    "successful_web_research".to_string(),
                ],
                terminal_on: vec![
                    "tool_unavailable".to_string(),
                    "repair_budget_exhausted".to_string(),
                ],
                repair_budget: Some(5),
                session_text_recovery: Some("require_prewrite_satisfied".to_string()),
            }),
            schema: None,
            summary_guidance: Some(
                "Return a structured handoff describing external research findings or limitations."
                    .to_string(),
            ),
        }),
        retry_policy: None,
        timeout_ms: None,
        max_tool_calls: None,
        stage_kind: Some(AutomationNodeStageKind::Workstream),
        gate: None,
        metadata: split_research_stage_metadata(
            config.external_title,
            "watcher",
            external_prompt,
            "research_external_sources",
            None,
            &["websearch", "webfetch", "read"],
            false,
        ),
    };
    // Stage 4: finalize. A copy of the original node (keeps id, agent, knowledge, gate,
    // and so on), rewired to depend on all three stages.
    let mut final_node = existing_final_node.clone();
    final_node.objective = config.final_objective.to_string();
    final_node.depends_on = vec![
        config.discover_node_id.to_string(),
        config.local_node_id.to_string(),
        config.external_node_id.to_string(),
    ];
    final_node.input_refs = vec![
        AutomationFlowInputRef {
            from_step_id: config.discover_node_id.to_string(),
            alias: "source_inventory".to_string(),
        },
        AutomationFlowInputRef {
            from_step_id: config.local_node_id.to_string(),
            alias: "local_source_notes".to_string(),
        },
        AutomationFlowInputRef {
            from_step_id: config.external_node_id.to_string(),
            alias: "external_research".to_string(),
        },
    ];
    final_node.stage_kind = Some(AutomationNodeStageKind::Workstream);
    // Keep the original contract's kind, validator, schema and summary guidance. Replace
    // its enforcement with research-synthesis rules; no tools are required here because
    // the evidence comes from the upstream handoffs.
    final_node.output_contract = Some(AutomationFlowOutputContract {
        kind: final_contract_kind,
        validator: existing_final_node
            .output_contract
            .as_ref()
            .and_then(|contract| contract.validator)
            .or(if final_is_brief_like {
                Some(crate::AutomationOutputValidatorKind::ResearchBrief)
            } else {
                None
            }),
        enforcement: Some(crate::AutomationOutputEnforcement {
            validation_profile: Some("research_synthesis".to_string()),
            required_tools: Vec::new(),
            required_evidence: vec![
                "local_source_reads".to_string(),
                "external_sources".to_string(),
            ],
            required_sections: if final_is_brief_like {
                vec!["citations".to_string()]
            } else {
                Vec::new()
            },
            prewrite_gates: Vec::new(),
            retry_on_missing: if final_is_brief_like {
                vec![
                    "local_source_reads".to_string(),
                    "external_sources".to_string(),
                    "citations".to_string(),
                ]
            } else {
                vec![
                    "local_source_reads".to_string(),
                    "external_sources".to_string(),
                ]
            },
            terminal_on: vec![
                "tool_unavailable".to_string(),
                "repair_budget_exhausted".to_string(),
            ],
            repair_budget: Some(5),
            session_text_recovery: Some("require_prewrite_satisfied".to_string()),
        }),
        schema: existing_final_node
            .output_contract
            .as_ref()
            .and_then(|contract| contract.schema.clone()),
        summary_guidance: final_summary_guidance,
    });
    // The finalize stage requires a write only when the original node declared an
    // output path.
    final_node.metadata = split_research_stage_metadata(
        config.final_title,
        "watcher",
        final_prompt,
        "research_finalize",
        output_path.as_deref(),
        &[],
        output_path.is_some(),
    );
    // Rebuild the node list in order. The four stage nodes are inserted where the final
    // node was; any stray nodes that already use a stage id are dropped.
    let mut new_nodes = Vec::with_capacity(automation.flow.nodes.len() + 3);
    let mut inserted = false;
    for node in automation.flow.nodes.clone() {
        if node.node_id == config.final_node_id {
            new_nodes.push(discover_node.clone());
            new_nodes.push(local_node.clone());
            new_nodes.push(external_node.clone());
            new_nodes.push(final_node.clone());
            inserted = true;
        } else if node.node_id != config.discover_node_id
            && node.node_id != config.local_node_id
            && node.node_id != config.external_node_id
        {
            new_nodes.push(node);
        }
    }
    // Defensive: `final_node_index` was found above, so this branch should not trigger.
    if !inserted {
        return false;
    }
    automation.flow.nodes = new_nodes;
    // Add stage agents cloned from the research agent, each with its own tool allowlist;
    // agents whose id already exists are left untouched.
    for candidate in [
        migrated_stage_agent(
            &base_agent,
            config.discover_agent_id,
            config.discover_display_name,
            &["glob", "read"],
        ),
        migrated_stage_agent(
            &base_agent,
            config.local_agent_id,
            config.local_display_name,
            &["read"],
        ),
        migrated_stage_agent(
            &base_agent,
            config.external_agent_id,
            config.external_display_name,
            &["websearch", "webfetch", "read"],
        ),
    ] {
        if !automation
            .agents
            .iter()
            .any(|agent| agent.agent_id == candidate.agent_id)
        {
            automation.agents.push(candidate);
        }
    }
    // Narrow the final agent to read + write, because research is now done upstream.
    // NOTE(review): this is applied even when `output_path` is None (write not required);
    // confirm that is intended.
    if let Some(final_agent) = automation
        .agents
        .iter_mut()
        .find(|agent| agent.agent_id == config.final_agent_id)
    {
        final_agent.tool_policy.allowlist = config::channels::normalize_allowed_tools(vec![
            "read".to_string(),
            "write".to_string(),
        ]);
    }
    if let Some(metadata) = automation.metadata.as_mut() {
        migrate_split_research_studio_metadata(metadata);
    } else {
        automation.metadata = Some(json!({
            "studio": {
                "template_id": config.template_id,
                "version": 2,
                "workflow_structure_version": 2
            }
        }));
    }
    true
}
/// Maps each mission phase id to its execution mode (for example `"barrier"`).
///
/// Reads `metadata.mission.phases[*]`. Phase ids are trimmed, and phases with a missing,
/// non-string, or blank `phase_id` are skipped. A missing or blank `execution_mode`
/// defaults to `"soft"`. If a phase id repeats, the later phase wins.
fn automation_phase_execution_mode_map(
    automation: &AutomationV2Spec,
) -> std::collections::HashMap<String, String> {
    let mut modes = std::collections::HashMap::new();
    let Some(phases) = automation
        .metadata
        .as_ref()
        .and_then(|metadata| metadata.get("mission"))
        .and_then(|mission| mission.get("phases"))
        .and_then(Value::as_array)
    else {
        return modes;
    };
    for phase in phases {
        let Some(phase_id) = phase.get("phase_id").and_then(Value::as_str).map(str::trim) else {
            continue;
        };
        if phase_id.is_empty() {
            continue;
        }
        let declared = phase
            .get("execution_mode")
            .and_then(Value::as_str)
            .map(str::trim);
        let mode = match declared {
            Some(explicit) if !explicit.is_empty() => explicit,
            _ => "soft",
        };
        modes.insert(phase_id.to_string(), mode.to_string());
    }
    modes
}
/// Finds the earliest mission phase that still has an incomplete node.
///
/// Returns `(phase_id, rank, execution_mode)`. The rank is the phase's index in
/// `metadata.mission.phases`, and the mode defaults to `"soft"`. Returns `None` when the
/// mission declares no phases, or when no incomplete node has a known `phase_id`. On rank
/// ties the first node in flow order wins.
pub(crate) fn automation_current_open_phase(
    automation: &AutomationV2Spec,
    run: &AutomationV2RunRecord,
) -> Option<(String, usize, String)> {
    let phase_rank = automation_phase_rank_map(automation);
    if phase_rank.is_empty() {
        return None;
    }
    let phase_modes = automation_phase_execution_mode_map(automation);
    let completed = run
        .checkpoint
        .completed_nodes
        .iter()
        .collect::<std::collections::HashSet<_>>();
    let mut earliest: Option<(String, usize)> = None;
    for node in automation.flow.nodes.iter() {
        if completed.contains(&node.node_id) {
            continue;
        }
        let Some(phase_id) = automation_node_builder_metadata(node, "phase_id") else {
            continue;
        };
        let Some(&rank) = phase_rank.get(&phase_id) else {
            continue;
        };
        // A strict `<` keeps the first node seen among equal ranks.
        let is_earlier = match &earliest {
            Some((_, best_rank)) => rank < *best_rank,
            None => true,
        };
        if is_earlier {
            earliest = Some((phase_id, rank));
        }
    }
    let (phase_id, rank) = earliest?;
    let mode = match phase_modes.get(&phase_id) {
        Some(declared) => declared.clone(),
        None => String::from("soft"),
    };
    Some((phase_id, rank, mode))
}
/// Maps each mission phase id to its position in `metadata.mission.phases`.
///
/// The position counts every phase entry, including entries without a string `phase_id`.
/// Ids are used verbatim (not trimmed). If an id repeats, the later position wins.
pub(crate) fn automation_phase_rank_map(
    automation: &AutomationV2Spec,
) -> std::collections::HashMap<String, usize> {
    let phases = automation
        .metadata
        .as_ref()
        .and_then(|metadata| metadata.get("mission"))
        .and_then(|mission| mission.get("phases"))
        .and_then(Value::as_array);
    let mut ranks = std::collections::HashMap::new();
    for (position, phase) in phases.into_iter().flatten().enumerate() {
        if let Some(phase_id) = phase.get("phase_id").and_then(Value::as_str) {
            ranks.insert(phase_id.to_string(), position);
        }
    }
    ranks
}
/// Computes the scheduling sort key for a runnable node (smaller sorts first).
///
/// Key components, in order:
/// 1. `0` when the node belongs to the currently open phase (or no phase is open), else `1`.
/// 2. The node's phase rank; nodes without a known phase get a large sentinel so they sort
///    after every ranked phase.
/// 3. The negated builder priority, so higher priorities sort first. `saturating_neg` avoids
///    the overflow that plain negation hits for `i32::MIN` (a panic in debug builds, or a
///    wrap that would make the lowest priority sort first).
/// 4. The node id, as a deterministic tie-breaker.
pub(crate) fn automation_node_sort_key(
    node: &AutomationFlowNode,
    phase_rank: &std::collections::HashMap<String, usize>,
    current_open_phase_rank: Option<usize>,
) -> (usize, usize, i32, String) {
    const UNPHASED_ORDER: usize = usize::MAX / 2;
    let phase_order = automation_node_builder_metadata(node, "phase_id")
        .and_then(|phase_id| phase_rank.get(&phase_id).copied())
        .unwrap_or(UNPHASED_ORDER);
    let open_phase_bias = current_open_phase_rank
        .map_or(0, |open_rank| usize::from(phase_order != open_rank));
    (
        open_phase_bias,
        phase_order,
        automation_node_builder_priority(node).saturating_neg(),
        node.node_id.clone(),
    )
}
/// Restricts runnable nodes to those in the currently open mission phase.
///
/// The list is returned unchanged when no phase is open, or when none of the runnable
/// nodes belongs to the open phase. In the second case the list is kept so that work is
/// never starved.
pub(crate) fn automation_filter_runnable_by_open_phase(
    automation: &AutomationV2Spec,
    run: &AutomationV2RunRecord,
    runnable: Vec<AutomationFlowNode>,
) -> Vec<AutomationFlowNode> {
    let open_rank = match automation_current_open_phase(automation, run) {
        Some((_, rank, _)) => rank,
        None => return runnable,
    };
    let phase_rank = automation_phase_rank_map(automation);
    let belongs_to_open_phase = |node: &AutomationFlowNode| {
        automation_node_builder_metadata(node, "phase_id")
            .and_then(|phase_id| phase_rank.get(&phase_id).copied())
            == Some(open_rank)
    };
    let narrowed: Vec<AutomationFlowNode> = runnable
        .iter()
        .filter(|&node| belongs_to_open_phase(node))
        .cloned()
        .collect();
    if narrowed.is_empty() {
        runnable
    } else {
        narrowed
    }
}
/// Splits a node's declared write scope into normalized, comparable entries.
///
/// Entries may be separated by `,`, `;` or newlines. Each entry is normalized as follows:
/// - whitespace is trimmed;
/// - leading and trailing `/` are stripped;
/// - redundant leading `./` segments are removed;
/// - the result is ASCII-lowercased.
///
/// So `./Src/`, `/src` and `src` all compare equal. Without the `./` step, `./src` would
/// never be seen as overlapping `src/...`. A missing or effectively empty scope becomes the
/// sentinel `"__repo__"`, meaning the node may write anywhere in the repository.
fn normalize_write_scope_entries(scope: Option<String>) -> Vec<String> {
    let Some(scope) = scope else {
        return vec!["__repo__".to_string()];
    };
    let entries = scope
        .split(|ch| matches!(ch, ',' | '\n' | ';'))
        .filter_map(|raw| {
            let mut entry = raw.trim().trim_matches('/');
            while let Some(rest) = entry.strip_prefix("./") {
                entry = rest.trim_start_matches('/');
            }
            let entry = entry.to_ascii_lowercase();
            (!entry.is_empty()).then_some(entry)
        })
        .collect::<Vec<_>>();
    if entries.is_empty() {
        vec!["__repo__".to_string()]
    } else {
        entries
    }
}
/// Reports whether two normalized write-scope lists could touch the same files.
///
/// Entries are expected in the form produced by `normalize_write_scope_entries`. Two
/// entries overlap when any of the following holds:
/// - either covers the whole repository (`__repo__`, `.`, `*` or `**`);
/// - they are equal;
/// - one is a path-segment ancestor of the other (`src` vs `src/lib`, but not `srcx`).
///
/// Trailing glob segments (`src/*`, `src/**`) are treated as their parent directory. This
/// is conservative: it can only add conflicts, never hide one. Returns `false` when either
/// list is empty.
fn write_scope_entries_conflict(left: &[String], right: &[String]) -> bool {
    /// True for entries that claim the entire repository.
    fn covers_whole_repo(entry: &str) -> bool {
        matches!(entry, "__repo__" | "." | "*" | "**")
    }
    /// Strips trailing `/*` and `/**` segments so a glob compares as its directory.
    fn strip_trailing_globs(mut entry: &str) -> &str {
        loop {
            match entry
                .strip_suffix("/**")
                .or_else(|| entry.strip_suffix("/*"))
            {
                Some(parent) => entry = parent,
                None => return entry,
            }
        }
    }
    /// True when `path` lies strictly inside directory `ancestor`. No allocation is needed.
    fn is_ancestor(ancestor: &str, path: &str) -> bool {
        path.strip_prefix(ancestor)
            .is_some_and(|rest| rest.starts_with('/'))
    }
    left.iter().any(|a| {
        right.iter().any(|b| {
            let (a, b) = (strip_trailing_globs(a), strip_trailing_globs(b));
            covers_whole_repo(a)
                || covers_whole_repo(b)
                || a == b
                || is_ancestor(a, b)
                || is_ancestor(b, a)
        })
    })
}
/// Picks up to `max_parallel` runnable nodes whose code write scopes do not overlap.
///
/// Nodes are considered in the given order. A code-workflow node is skipped when its
/// normalized write scope conflicts with an already accepted code-workflow node.
/// Non-code nodes never conflict. With `max_parallel <= 1`, only the first runnable node
/// is returned.
pub(crate) fn automation_filter_runnable_by_write_scope_conflicts(
    runnable: Vec<AutomationFlowNode>,
    max_parallel: usize,
) -> Vec<AutomationFlowNode> {
    if max_parallel <= 1 {
        return runnable.into_iter().take(1).collect();
    }
    // Each accepted node paired with the write scope it claims (empty for non-code nodes).
    let mut accepted: Vec<(AutomationFlowNode, Vec<String>)> = Vec::new();
    for candidate in runnable {
        let candidate_is_code = automation_node_is_code_workflow(&candidate);
        let candidate_scope = if candidate_is_code {
            normalize_write_scope_entries(automation_node_write_scope(&candidate))
        } else {
            Vec::new()
        };
        let clashes = candidate_is_code
            && accepted.iter().any(|(existing, existing_scope)| {
                automation_node_is_code_workflow(existing)
                    && write_scope_entries_conflict(&candidate_scope, existing_scope)
            });
        if clashes {
            continue;
        }
        accepted.push((candidate, candidate_scope));
        if accepted.len() >= max_parallel {
            break;
        }
    }
    accepted.into_iter().map(|(node, _)| node).collect()
}
/// Lists the pending nodes that cannot run yet.
///
/// A pending node is blocked when any of its dependencies is not completed. It is also
/// blocked when the currently open phase uses the `"barrier"` execution mode and the node
/// belongs to a later-ranked phase. Result order follows the flow's node order.
pub(crate) fn automation_blocked_nodes(
    automation: &AutomationV2Spec,
    run: &AutomationV2RunRecord,
) -> Vec<String> {
    use std::collections::HashSet;
    let completed: HashSet<&str> = run
        .checkpoint
        .completed_nodes
        .iter()
        .map(String::as_str)
        .collect();
    let pending: HashSet<&str> = run
        .checkpoint
        .pending_nodes
        .iter()
        .map(String::as_str)
        .collect();
    let phase_rank = automation_phase_rank_map(automation);
    // Rank of the open phase, but only when that phase acts as a barrier.
    let barrier_rank = automation_current_open_phase(automation, run)
        .filter(|(_, _, mode)| mode == "barrier")
        .map(|(_, rank, _)| rank);
    automation
        .flow
        .nodes
        .iter()
        .filter(|node| pending.contains(node.node_id.as_str()))
        .filter(|node| {
            let waiting_on_dependency = node
                .depends_on
                .iter()
                .any(|dep| !completed.contains(dep.as_str()));
            waiting_on_dependency
                || barrier_rank.is_some_and(|open_rank| {
                    automation_node_builder_metadata(node, "phase_id")
                        .and_then(|phase_id| phase_rank.get(&phase_id).copied())
                        .is_some_and(|rank| rank > open_rank)
                })
        })
        .map(|node| node.node_id.clone())
        .collect()
}
/// Appends a `phase_opened` lifecycle event for the automation's current open
/// phase.
///
/// Does nothing when there is no open phase. Also does nothing when the most
/// recent `phase_opened` event in `run.checkpoint.lifecycle_history` already
/// carries the same `phase_id` in its metadata. The recorded metadata holds
/// `phase_id`, `phase_rank` and `execution_mode`.
pub(crate) fn record_automation_open_phase_event(
    automation: &AutomationV2Spec,
    run: &mut AutomationV2RunRecord,
) {
    let Some((phase_id, phase_rank, execution_mode)) =
        automation_current_open_phase(automation, run)
    else {
        return;
    };
    // Only the latest `phase_opened` entry matters for de-duplication.
    let already_announced = run
        .checkpoint
        .lifecycle_history
        .iter()
        .rev()
        .find(|entry| entry.event == "phase_opened")
        .and_then(|entry| entry.metadata.as_ref())
        .and_then(|metadata| metadata.get("phase_id"))
        .and_then(Value::as_str)
        .is_some_and(|recorded| recorded == phase_id.as_str());
    if already_announced {
        return;
    }
    let message = format!("phase `{}` is now open", phase_id);
    let details = json!({
        "phase_id": phase_id,
        "phase_rank": phase_rank,
        "execution_mode": execution_mode,
    });
    record_automation_lifecycle_event_with_metadata(
        run,
        "phase_opened",
        Some(message),
        None,
        Some(details),
    );
}
/// Recomputes derived runtime state on `run` for `automation`.
///
/// First it overwrites `run.checkpoint.blocked_nodes` with the result of
/// `automation_blocked_nodes`. Then it calls `record_automation_open_phase_event`,
/// which may append a `phase_opened` lifecycle event.
pub fn refresh_automation_runtime_state(
    automation: &AutomationV2Spec,
    run: &mut AutomationV2RunRecord,
) {
    let blocked = automation_blocked_nodes(automation, run);
    run.checkpoint.blocked_nodes = blocked;
    record_automation_open_phase_event(automation, run);
}
/// Returns a copy of the JSON array at `metadata.mission.milestones`.
///
/// Returns an empty vector when metadata, `mission`, or `milestones` is missing,
/// or when `milestones` is not an array.
fn automation_mission_milestones(automation: &AutomationV2Spec) -> Vec<Value> {
    let milestones = automation
        .metadata
        .as_ref()
        .and_then(|metadata| metadata.get("mission"))
        .and_then(|mission| mission.get("milestones"))
        .and_then(Value::as_array);
    match milestones {
        Some(rows) => rows.clone(),
        None => Vec::new(),
    }
}
/// Returns the trimmed ids of mission milestones whose required stages are all
/// completed in `run`.
///
/// A milestone qualifies when:
/// - its trimmed `milestone_id` is a non-empty string, and
/// - its `required_stage_ids` array contains at least one non-empty string after
///   trimming, and
/// - every such trimmed stage id appears in `run.checkpoint.completed_nodes`.
///
/// Non-string entries in `required_stage_ids` are ignored.
fn completed_mission_milestones(
    automation: &AutomationV2Spec,
    run: &AutomationV2RunRecord,
) -> std::collections::HashSet<String> {
    let finished_nodes: std::collections::HashSet<_> =
        run.checkpoint.completed_nodes.iter().cloned().collect();
    let mut reached = std::collections::HashSet::new();
    for milestone in automation_mission_milestones(automation) {
        let Some(raw_id) = milestone.get("milestone_id").and_then(Value::as_str) else {
            continue;
        };
        let milestone_id = raw_id.trim();
        if milestone_id.is_empty() {
            continue;
        }
        let required: Vec<&str> = match milestone
            .get("required_stage_ids")
            .and_then(Value::as_array)
        {
            Some(rows) => rows
                .iter()
                .filter_map(Value::as_str)
                .map(str::trim)
                .filter(|value| !value.is_empty())
                .collect(),
            None => Vec::new(),
        };
        // A milestone with no usable requirements is never considered reached.
        if required.is_empty() {
            continue;
        }
        if required
            .iter()
            .all(|stage_id| finished_nodes.contains(*stage_id))
        {
            reached.insert(milestone_id.to_string());
        }
    }
    reached
}
/// Records a `milestone_promoted` lifecycle event for each mission milestone
/// that has just become complete.
///
/// Milestones are read from `automation.metadata.mission.milestones`. A milestone
/// is skipped in any of these cases:
/// - its trimmed `milestone_id` is empty;
/// - it is not in the set returned by `completed_mission_milestones`;
/// - a `milestone_promoted` event with the same `milestone_id` already exists in
///   `row.checkpoint.lifecycle_history`;
/// - it was already recorded earlier in this same call.
///
/// The last case means an id listed twice in the mission metadata is promoted
/// only once.
///
/// The event metadata holds `milestone_id`, `title`, `phase_id`,
/// `required_stage_ids` and `promoted_by_node_id`. The `title` falls back to the
/// milestone id when the title is missing or blank.
pub(crate) fn record_milestone_promotions(
    automation: &AutomationV2Spec,
    row: &mut AutomationV2RunRecord,
    promoted_by_node_id: &str,
) {
    // Ids that already have a promotion event. It grows as this call records new ones.
    let mut already_recorded = row
        .checkpoint
        .lifecycle_history
        .iter()
        .filter(|entry| entry.event == "milestone_promoted")
        .filter_map(|entry| {
            entry.metadata.as_ref().and_then(|metadata| {
                metadata
                    .get("milestone_id")
                    .and_then(Value::as_str)
                    .map(str::to_string)
            })
        })
        .collect::<std::collections::HashSet<_>>();
    let completed = completed_mission_milestones(automation, row);
    for milestone in automation_mission_milestones(automation) {
        let milestone_id = milestone
            .get("milestone_id")
            .and_then(Value::as_str)
            .map(str::trim)
            .unwrap_or_default();
        // `insert` returns false for an id that is already known. That covers
        // both history entries and duplicates earlier in this same loop.
        if milestone_id.is_empty()
            || !completed.contains(milestone_id)
            || !already_recorded.insert(milestone_id.to_string())
        {
            continue;
        }
        // Fall back to the id when the title is missing or whitespace-only.
        let title = milestone
            .get("title")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|value| !value.is_empty())
            .unwrap_or(milestone_id);
        let phase_id = milestone
            .get("phase_id")
            .and_then(Value::as_str)
            .map(str::trim)
            .filter(|value| !value.is_empty());
        let required_stage_ids = milestone
            .get("required_stage_ids")
            .and_then(Value::as_array)
            .cloned()
            .unwrap_or_default();
        record_automation_lifecycle_event_with_metadata(
            row,
            "milestone_promoted",
            Some(format!("milestone `{title}` promoted")),
            None,
            Some(json!({
                "milestone_id": milestone_id,
                "title": title,
                "phase_id": phase_id,
                "required_stage_ids": required_stage_ids,
                "promoted_by_node_id": promoted_by_node_id,
            })),
        );
    }
}
/// Returns `root_ids` together with every node that transitively depends on
/// one of them.
///
/// A node is a descendant when any id in its `depends_on` is a root or another
/// descendant. Root ids are always included, even when they do not name a node
/// in the flow.
///
/// The set is computed with a reverse-dependency index and a worklist, so each
/// dependency edge is visited at most once. The previous implementation rescanned
/// every node until no pass added anything.
pub fn collect_automation_descendants(
    automation: &AutomationV2Spec,
    root_ids: &std::collections::HashSet<String>,
) -> std::collections::HashSet<String> {
    // dependency id -> ids of the nodes that list it in `depends_on`.
    let mut dependents: std::collections::HashMap<&str, Vec<&str>> =
        std::collections::HashMap::new();
    for node in &automation.flow.nodes {
        for dep in &node.depends_on {
            dependents
                .entry(dep.as_str())
                .or_default()
                .push(node.node_id.as_str());
        }
    }
    let mut descendants = root_ids.clone();
    let mut frontier: Vec<&str> = root_ids.iter().map(String::as_str).collect();
    while let Some(current) = frontier.pop() {
        let Some(children) = dependents.get(current) else {
            continue;
        };
        for &child in children {
            // Only expand ids seen for the first time; this also makes cycles terminate.
            if !descendants.contains(child) {
                descendants.insert(child.to_string());
                frontier.push(child);
            }
        }
    }
    descendants
}
pub fn collect_automation_ancestors(
automation: &AutomationV2Spec,
node_id: &str,
) -> std::collections::HashSet<String> {
let mut ancestors = std::collections::HashSet::new();
let mut queue = vec![node_id.to_string()];
while let Some(current_id) = queue.pop() {
if let Some(node) = automation
.flow
.nodes
.iter()
.find(|n| n.node_id == current_id)
{
for dep in &node.depends_on {
if ancestors.insert(dep.clone()) {
queue.push(dep.clone());
}
}
}
}
ancestors
}