use std::collections::BTreeSet;
use std::path::{Path, PathBuf};

use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

use crate::contracts::{research_output_contract_policy_seed, ProjectedOutputValidatorKind};
use crate::decomposition::{
    derive_step_decomposition_hints, derive_workflow_decomposition_profile,
    WorkflowDecompositionProfile,
};
// Assumed: the plan types used below (WorkflowPlan, WorkflowPlanStep,
// WorkflowPlanDraftRecord, WorkflowPlanConversation, AutomationV2Schedule,
// AutomationV2ScheduleType) are re-exported by a sibling module of this crate.
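/// Upper bound on the number of steps a *generated* (non-manual) workflow plan
/// may carry before it is compacted or rejected; see
/// `compact_generated_workflow_plan_to_budget`.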
pub const GENERATED_WORKFLOW_MAX_STEPS: usize = 8;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowTaskBudgetReport {
pub status: String,
pub max_generated_steps: usize,
pub generated_step_count: usize,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub original_step_count: Option<usize>,
pub enforcement: String,
}
pub fn plan_save_options() -> Value {
json!({
"can_export_pack": true,
"can_save_skill": true
})
}
#[derive(Debug, Clone)]
pub struct PackBuilderExportOptions {
pub session_id: Option<String>,
pub thread_key: Option<String>,
pub auto_apply: bool,
}
pub fn pack_builder_schedule_value<M>(schedule: &AutomationV2Schedule<M>) -> Value {
match schedule.schedule_type {
AutomationV2ScheduleType::Cron => schedule
.cron_expression
.as_ref()
.map(|expression| {
json!({
"cron": expression,
"timezone": schedule.timezone,
})
})
.unwrap_or(Value::Null),
AutomationV2ScheduleType::Interval => json!({
"interval_seconds": schedule.interval_seconds.unwrap_or(86_400),
"timezone": schedule.timezone,
}),
AutomationV2ScheduleType::Manual => Value::Null,
}
}
pub fn pack_builder_export_goal<M, S>(plan: &WorkflowPlan<AutomationV2Schedule<M>, S>) -> String {
let goal = plan
.original_prompt
.trim()
.split_whitespace()
.collect::<Vec<_>>()
.join(" ");
if goal.is_empty() {
plan.title.clone()
} else {
goal
}
}
pub fn pack_builder_export_args<M, S>(
plan: &WorkflowPlan<AutomationV2Schedule<M>, S>,
options: &PackBuilderExportOptions,
) -> Value {
json!({
"mode": "preview",
"goal": pack_builder_export_goal(plan),
"__session_id": options.session_id,
"thread_key": options.thread_key,
"auto_apply": options.auto_apply,
"schedule": pack_builder_schedule_value(&plan.schedule),
})
}
pub fn workflow_plan_draft_record<Plan: Clone>(
plan: Plan,
plan_id: String,
planner_diagnostics: Option<Value>,
conversation_id: String,
created_at_ms: u64,
) -> WorkflowPlanDraftRecord<Plan> {
WorkflowPlanDraftRecord {
current_plan: plan.clone(),
plan_revision: 1,
conversation: WorkflowPlanConversation {
conversation_id,
plan_id,
created_at_ms,
updated_at_ms: created_at_ms,
messages: Vec::new(),
},
planner_diagnostics,
last_success_materialization: None,
initial_plan: plan,
review: None,
}
}
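/// Heuristically decides whether a step is expected to perform live web
/// research, based on keywords in its id, kind, and objective. Steps whose
/// objective frames web research as optional are never treated as expecting it.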
pub fn workflow_step_expects_web_research(step_id: &str, kind: &str, objective: &str) -> bool {
if workflow_step_allows_optional_web_research(objective) {
return false;
}
let lowered_step_id = step_id.trim().to_ascii_lowercase();
let lowered_kind = kind.trim().to_ascii_lowercase();
let lowered_objective = objective.trim().to_ascii_lowercase();
lowered_step_id.contains("research")
|| lowered_kind.contains("research")
|| lowered_objective.contains("web")
|| lowered_objective.contains("online")
|| lowered_objective.contains("current")
|| lowered_objective.contains("latest")
}
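/// Returns true when the text mentions web research but frames it as optional
/// ("only when useful", "if needed", ...), signalling that the step may
/// legitimately finish without web citations.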
pub fn workflow_step_allows_optional_web_research(text: &str) -> bool {
let lowered = text.trim().to_ascii_lowercase();
if lowered.is_empty() {
return false;
}
let mentions_web = lowered.contains("web research")
|| lowered.contains("web_research")
|| lowered.contains("websearch")
|| lowered.contains("web fetch")
|| lowered.contains("web_fetch")
|| lowered.contains("web context")
|| lowered.contains("external context");
let has_optional_language = lowered.contains("only when useful")
|| lowered.contains("when useful")
|| lowered.contains("if useful")
|| lowered.contains("if needed")
|| lowered.contains("if no web context is needed")
|| lowered.contains("if no web context needed")
|| lowered.contains("no web context is needed")
|| lowered.contains("no web context needed")
|| lowered.contains("empty citations list")
|| lowered.contains("empty citations")
|| lowered.contains("do not replace reddit")
|| lowered.contains("do not replace the primary evidence");
mentions_web && has_optional_language
}
pub fn workflow_step_allows_optional_connector_references(text: &str) -> bool {
let lowered = text.trim().to_ascii_lowercase();
if lowered.is_empty() {
return false;
}
let mentions_connector = workflow_plan_mentions_connector_backed_sources(&lowered);
let has_optional_language = lowered.contains("as reference if needed")
|| lowered.contains("reference if needed")
|| lowered.contains("only when useful")
|| lowered.contains("when useful")
|| lowered.contains("if useful")
|| lowered.contains("if needed")
|| lowered.contains("if no context is needed")
|| lowered.contains("if no connector context is needed")
|| lowered.contains("empty citations list")
|| lowered.contains("empty citations")
|| lowered.contains("do not replace reddit")
|| lowered.contains("do not replace the primary evidence");
mentions_connector && has_optional_language
}
pub fn workflow_step_metadata_defaults(
step_id: &str,
kind: &str,
objective: &str,
validator_is_research_brief: bool,
) -> Option<Value> {
let expects_web_research = workflow_step_expects_web_research(step_id, kind, objective);
let mut builder = json!({
"builder": {
"knowledge": {
"enabled": true,
"reuse_mode": "preflight",
"trust_floor": "promoted",
"read_spaces": [{"scope": "project"}],
"promote_spaces": [{"scope": "project"}],
"subject": objective.trim(),
}
}
});
if validator_is_research_brief {
if let Some(builder_object) = builder.get_mut("builder").and_then(Value::as_object_mut) {
builder_object.insert(
"web_research_expected".to_string(),
Value::Bool(expects_web_research),
);
}
}
Some(builder)
}
pub fn workflow_step_enforcement_defaults(
step_id: &str,
kind: &str,
objective: &str,
validator_is_research_brief: bool,
) -> Option<Value> {
if !validator_is_research_brief {
return None;
}
let expects_web_research = workflow_step_expects_web_research(step_id, kind, objective);
let normalized_kind = kind.trim().to_ascii_lowercase();
serde_json::to_value(research_output_contract_policy_seed(
&normalized_kind,
expects_web_research,
tandem_core::prewrite_repair_retry_max_attempts() as u32,
))
.ok()
}
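/// Seeds default enforcement and builder metadata on a step. Accessors and
/// setters are passed as closures so the same logic works across step
/// representations without a shared trait; existing metadata keys always win
/// over the defaults (the merge is insert-if-absent).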
pub fn normalize_workflow_step_metadata<
Step,
StepId,
Kind,
Objective,
IsBrief,
IsEnforcementNone,
SetEnforcement,
Metadata,
SetMetadata,
>(
step: &mut Step,
step_id: StepId,
kind: Kind,
objective: Objective,
output_contract_is_research_brief: IsBrief,
output_contract_enforcement_is_none: IsEnforcementNone,
mut set_output_contract_enforcement_from_value: SetEnforcement,
metadata: Metadata,
mut set_metadata: SetMetadata,
) where
StepId: Fn(&Step) -> &str,
Kind: Fn(&Step) -> &str,
Objective: Fn(&Step) -> &str,
IsBrief: Fn(&Step) -> bool,
IsEnforcementNone: Fn(&Step) -> bool,
SetEnforcement: FnMut(&mut Step, Value),
Metadata: Fn(&Step) -> Option<&Value>,
SetMetadata: FnMut(&mut Step, Value),
{
let validator_is_research_brief = output_contract_is_research_brief(step);
if let Some(enforcement) = workflow_step_enforcement_defaults(
step_id(step),
kind(step),
objective(step),
validator_is_research_brief,
) {
if output_contract_enforcement_is_none(step) {
set_output_contract_enforcement_from_value(step, enforcement);
}
}
let defaults = workflow_step_metadata_defaults(
step_id(step),
kind(step),
objective(step),
validator_is_research_brief,
);
match (metadata(step).cloned(), defaults) {
(Some(mut metadata), Some(defaults)) => {
let Some(root) = metadata.as_object_mut() else {
set_metadata(step, defaults);
return;
};
let builder = root
.entry("builder".to_string())
.or_insert_with(|| json!({}));
let Some(builder_map) = builder.as_object_mut() else {
*builder = defaults
.get("builder")
.cloned()
.unwrap_or_else(|| json!({}));
set_metadata(step, metadata);
return;
};
if let Some(default_builder) = defaults.get("builder").and_then(Value::as_object) {
for (key, value) in default_builder {
builder_map
.entry(key.clone())
.or_insert_with(|| value.clone());
}
}
set_metadata(step, metadata);
}
(None, Some(defaults)) => {
set_metadata(step, defaults);
}
_ => {}
}
}
fn workflow_step_builder_map_mut(
step: &mut WorkflowPlanStep<impl WorkflowInputRefLike, impl Serialize>,
) -> Option<&mut serde_json::Map<String, Value>> {
let metadata = step.metadata.get_or_insert_with(|| json!({}));
let root = metadata.as_object_mut()?;
let builder = root
.entry("builder".to_string())
.or_insert_with(|| json!({}));
builder.as_object_mut()
}
fn workflow_step_builder_string_array(
step: &WorkflowPlanStep<impl WorkflowInputRefLike, impl Serialize>,
key: &str,
) -> Vec<String> {
step.metadata
.as_ref()
.and_then(|metadata| metadata.get("builder"))
.and_then(Value::as_object)
.and_then(|builder| builder.get(key))
.and_then(Value::as_array)
.map(|items| {
items
.iter()
.filter_map(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
.collect::<Vec<_>>()
})
.unwrap_or_default()
}
fn workflow_step_builder_string(
step: &WorkflowPlanStep<impl WorkflowInputRefLike, impl Serialize>,
key: &str,
) -> Option<String> {
step.metadata
.as_ref()
.and_then(|metadata| metadata.get("builder"))
.and_then(Value::as_object)
.and_then(|builder| builder.get(key))
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
}
fn workflow_step_decomposition_metadata_defaults(
step: &mut WorkflowPlanStep<impl WorkflowInputRefLike, impl Serialize>,
profile: &crate::decomposition::WorkflowDecompositionProfile,
step_index: usize,
step_count: usize,
) {
let step_id = step.step_id.clone();
let kind = step.kind.clone();
let objective = step.objective.clone();
let output_contract_kind = step
.output_contract
.as_ref()
.and_then(|contract| serde_json::to_value(contract).ok())
.and_then(|contract| contract.get("kind").cloned())
.and_then(|value| value.as_str().map(str::to_string));
let hints = derive_step_decomposition_hints(
&step_id,
&kind,
&objective,
output_contract_kind.as_deref(),
&step.depends_on,
step_index,
step_count,
profile,
);
let Some(builder) = workflow_step_builder_map_mut(step) else {
return;
};
builder
.entry("phase_id".to_string())
.or_insert_with(|| Value::String(hints.phase_id.clone()));
builder
.entry("task_class".to_string())
.or_insert_with(|| Value::String(hints.task_class.clone()));
builder
.entry("task_kind".to_string())
.or_insert_with(|| Value::String(hints.task_class.clone()));
builder
.entry("task_family".to_string())
.or_insert_with(|| Value::String(hints.task_family.clone()));
builder
.entry("retry_class".to_string())
.or_insert_with(|| Value::String(hints.retry_class.clone()));
if let Some(parent_step_id) = hints.parent_step_id {
builder
.entry("parent_step_id".to_string())
.or_insert_with(|| Value::String(parent_step_id));
}
}
pub fn derive_workflow_step_file_contracts<S, I, O>(
plan: &mut WorkflowPlan<S, WorkflowPlanStep<I, O>>,
) where
I: WorkflowInputRefLike + Serialize,
O: Serialize,
{
let output_files_by_step_id = plan
.steps
.iter()
.map(|step| {
let mut output_files = workflow_step_builder_string_array(step, "output_files");
if output_files.is_empty() {
if let Some(output_path) = workflow_step_builder_string(step, "output_path") {
output_files.push(output_path);
}
}
output_files.sort();
output_files.dedup();
(step.step_id.clone(), output_files)
})
.collect::<std::collections::HashMap<_, _>>();
for step in &mut plan.steps {
let explicit_input_files = step
.metadata
.as_ref()
.and_then(|metadata| metadata.get("builder"))
.and_then(Value::as_object)
.is_some_and(|builder| builder.contains_key("input_files"));
let explicit_output_files = step
.metadata
.as_ref()
.and_then(|metadata| metadata.get("builder"))
.and_then(Value::as_object)
.is_some_and(|builder| builder.contains_key("output_files"));
let inferred_input_files = step
.input_refs
.iter()
.flat_map(|input_ref| {
output_files_by_step_id
.get(input_ref.from_step_id())
.cloned()
.unwrap_or_default()
})
.collect::<BTreeSet<_>>()
.into_iter()
.collect::<Vec<_>>();
let inferred_output_files = output_files_by_step_id
.get(&step.step_id)
.cloned()
.unwrap_or_default();
let Some(builder) = workflow_step_builder_map_mut(step) else {
continue;
};
if !explicit_input_files && !inferred_input_files.is_empty() {
builder.insert("input_files".to_string(), json!(inferred_input_files));
}
if !explicit_output_files && !inferred_output_files.is_empty() {
builder.insert("output_files".to_string(), json!(inferred_output_files));
}
}
}
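/// Maps a free-form output contract kind onto the validator that should check
/// it, defaulting to the generic artifact validator for unknown kinds.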
pub fn inferred_output_validator_kind(contract_kind: &str) -> ProjectedOutputValidatorKind {
match contract_kind.trim().to_ascii_lowercase().as_str() {
"brief" => ProjectedOutputValidatorKind::ResearchBrief,
"review" | "review_summary" | "approval_gate" => {
ProjectedOutputValidatorKind::ReviewDecision
}
"structured_json" => ProjectedOutputValidatorKind::StructuredJson,
"code_patch" => ProjectedOutputValidatorKind::CodePatch,
_ => ProjectedOutputValidatorKind::GenericArtifact,
}
}
pub fn output_contract_is_code_patch(
contract_kind: &str,
explicit_validator_key: Option<&str>,
) -> bool {
explicit_validator_key
.map(crate::contracts::projected_output_validator_kind_from_key)
.unwrap_or_else(|| inferred_output_validator_kind(contract_kind))
== ProjectedOutputValidatorKind::CodePatch
}
pub fn output_contract_is_research_brief(
contract_kind: &str,
explicit_validator_key: Option<&str>,
) -> bool {
explicit_validator_key
.map(crate::contracts::projected_output_validator_kind_from_key)
.unwrap_or_else(|| inferred_output_validator_kind(contract_kind))
== ProjectedOutputValidatorKind::ResearchBrief
}
pub fn plan_step_with_dep<InputRef, OutputContract>(
step_id: &str,
kind: &str,
objective: &str,
agent_role: &str,
depends_on: &[String],
input_refs: Vec<InputRef>,
output_contract: Option<OutputContract>,
metadata: Option<Value>,
) -> WorkflowPlanStep<InputRef, OutputContract> {
WorkflowPlanStep {
step_id: step_id.to_string(),
kind: kind.to_string(),
objective: objective.to_string(),
depends_on: depends_on.to_vec(),
agent_role: agent_role.to_string(),
input_refs,
output_contract,
metadata,
}
}
pub fn build_minimal_fallback_plan<S, Step>(
plan_id: &str,
planner_version: &str,
plan_source: &str,
prompt: &str,
normalized_prompt: &str,
title: String,
workspace_root: String,
schedule: S,
allowed_mcp_servers: Vec<String>,
operator_preferences: Option<Value>,
description: Option<String>,
fallback_step: Step,
) -> WorkflowPlan<S, Step> {
WorkflowPlan {
plan_id: plan_id.to_string(),
planner_version: planner_version.to_string(),
plan_source: plan_source.to_string(),
original_prompt: prompt.trim().to_string(),
normalized_prompt: normalized_prompt.to_string(),
confidence: "low".to_string(),
title,
description,
schedule,
execution_target: "automation_v2".to_string(),
workspace_root,
steps: vec![fallback_step],
requires_integrations: Vec::new(),
allowed_mcp_servers,
operator_preferences,
save_options: plan_save_options(),
}
}
pub fn workflow_plan_source_exempts_generated_task_budget(plan_source: &str) -> bool {
let lowered = plan_source.trim().to_ascii_lowercase();
!lowered.is_empty()
&& (lowered.contains("import")
|| lowered.contains("imported_bundle")
|| lowered.contains("workflow_studio")
|| lowered.contains("studio_manual")
|| lowered.contains("manual_author"))
}
pub fn workflow_plan_generated_task_budget_exceeded<S, Step>(plan: &WorkflowPlan<S, Step>) -> bool {
!workflow_plan_source_exempts_generated_task_budget(&plan.plan_source)
&& plan.steps.len() > GENERATED_WORKFLOW_MAX_STEPS
}
pub fn workflow_task_budget_report_value(
status: &str,
generated_step_count: usize,
original_step_count: Option<usize>,
enforcement: &str,
) -> Value {
serde_json::to_value(WorkflowTaskBudgetReport {
status: status.to_string(),
max_generated_steps: GENERATED_WORKFLOW_MAX_STEPS,
generated_step_count,
original_step_count,
enforcement: enforcement.to_string(),
})
.unwrap_or_else(|_| {
json!({
"status": status,
"max_generated_steps": GENERATED_WORKFLOW_MAX_STEPS,
"generated_step_count": generated_step_count,
"original_step_count": original_step_count,
"enforcement": enforcement,
})
})
}
pub fn workflow_task_budget_report_for_plan<S, Step>(
plan: &WorkflowPlan<S, Step>,
status_override: Option<&str>,
original_step_count: Option<usize>,
enforcement_override: Option<&str>,
) -> Value {
if workflow_plan_source_exempts_generated_task_budget(&plan.plan_source) {
return workflow_task_budget_report_value(
status_override.unwrap_or("exempt_manual"),
plan.steps.len(),
original_step_count,
enforcement_override.unwrap_or("exempt"),
);
}
let status = status_override.unwrap_or(if plan.steps.len() > GENERATED_WORKFLOW_MAX_STEPS {
"rejected"
} else {
"within_budget"
});
let enforcement = enforcement_override.unwrap_or(if status == "rejected" {
"rejected"
} else {
"accepted"
});
workflow_task_budget_report_value(status, plan.steps.len(), original_step_count, enforcement)
}
pub fn workflow_plan_decomposition_observation_with_task_budget<S, Step>(
profile: &WorkflowDecompositionProfile,
plan: &WorkflowPlan<S, Step>,
task_budget: Option<Value>,
) -> Value {
let mut observation =
crate::decomposition::workflow_plan_decomposition_observation(profile, plan.steps.len());
let budget =
task_budget.unwrap_or_else(|| workflow_task_budget_report_for_plan(plan, None, None, None));
if let Some(object) = observation.as_object_mut() {
object.insert("task_budget".to_string(), budget);
}
observation
}
fn text_has_any(text: &str, needles: &[&str]) -> bool {
needles.iter().any(|needle| text.contains(needle))
}
fn compact_bucket_for_step<I, O>(step: &WorkflowPlanStep<I, O>) -> &'static str {
let text = format!("{} {} {}", step.step_id, step.kind, step.objective).to_ascii_lowercase();
if text_has_any(
&text,
&[
"notion",
"database",
"collection://",
"create page",
"update page",
"verify page",
"page identifier",
"page url",
],
) {
return "create_and_verify_notion_page";
}
if text_has_any(&text, &["reddit", "subreddit", "community", "practitioner"]) {
return "gather_reddit_signals";
}
if text_has_any(&text, &["tandem", "docs", "documentation", "mcp docs"]) {
return "gather_tandem_docs";
}
if text_has_any(
&text,
&[
"web",
"websearch",
"web_fetch",
"market",
"vendor",
"platform",
"enterprise",
"source",
"sources",
],
) {
return "gather_market_sources";
}
if text_has_any(
&text,
&[
"synthes",
"draft",
"brief",
"summary",
"key finding",
"market notes",
"sources section",
"run details",
"final report",
],
) {
return "draft_market_brief";
}
if text_has_any(
&text,
&[
"verify", "validate", "review", "check", "quality", "complete",
],
) {
return "validate_result";
}
if text_has_any(
&text,
&[
"scope",
"criteria",
"confirm",
"inspect",
"destination",
"requirements",
],
) || step.depends_on.is_empty()
{
return "confirm_scope_and_destination";
}
"synthesize_work"
}
fn compact_bucket_label(bucket: &str) -> (&'static str, &'static str, &'static str) {
match bucket {
"confirm_scope_and_destination" => (
"assess",
"agent_research_planner",
"Confirm the workflow scope, destination, required deliverable sections, and available MCP/web research capabilities before research begins.",
),
"gather_tandem_docs" => (
"research",
"agent_docs_researcher",
"Use the connected Tandem MCP documentation tools to gather source-ready notes relevant to the requested topic.",
),
"gather_market_sources" => (
"research",
"agent_market_researcher",
"Use web research to gather current market coverage, vendor examples, operational practices, and source links for the requested topic.",
),
"gather_reddit_signals" => (
"research",
"agent_community_researcher",
"Use the connected Reddit MCP tools to collect relevant practitioner discussions, links, repeated concerns, and market signals.",
),
"draft_market_brief" => (
"synthesize",
"agent_brief_writer",
"Synthesize all upstream evidence into one concise brief; do not split each requested report section into separate workflow tasks.",
),
"create_and_verify_notion_page" => (
"deliver",
"agent_notion_operator",
"Create or update the requested destination item and verify the required sections plus final identifier or URL are captured.",
),
"validate_result" => (
"validate",
"agent_reviewer",
"Validate the completed deliverable against the user's requested scope, sections, source links, and destination.",
),
_ => (
"synthesize",
"agent_workflow_executor",
"Complete the remaining request-specific work using the upstream context and original user intent.",
),
}
}
fn prompt_collection_refs(prompt: &str) -> Vec<String> {
prompt
.split_whitespace()
.map(|token| {
token.trim_matches(|ch: char| ch == '"' || ch == '\'' || ch == ',' || ch == '.')
})
.filter(|token| token.starts_with("collection://"))
.map(str::to_string)
.collect()
}
fn prompt_requested_sections(prompt: &str) -> Vec<String> {
prompt
.lines()
.map(str::trim)
.filter_map(|line| {
line.strip_prefix("- ")
.or_else(|| line.strip_prefix("* "))
.map(str::trim)
})
.filter(|line| !line.is_empty() && line.len() <= 80)
.map(str::to_string)
.collect()
}
fn compact_objective<I, O>(
bucket: &str,
default_objective: &str,
original_steps: &[WorkflowPlanStep<I, O>],
prompt: &str,
) -> String {
let refs = prompt_collection_refs(prompt);
let sections = prompt_requested_sections(prompt);
let mut parts = vec![format!(
"Compact {} original planner task(s). {}",
original_steps.len(),
default_objective
)];
if !refs.is_empty()
&& matches!(
bucket,
"create_and_verify_notion_page" | "confirm_scope_and_destination"
)
{
parts.push(format!("Destination: {}.", refs.join(", ")));
}
if !sections.is_empty()
&& matches!(
bucket,
"draft_market_brief" | "create_and_verify_notion_page" | "validate_result"
)
{
parts.push(format!("Required sections: {}.", sections.join(", ")));
}
let original_intent = original_steps
.iter()
.take(4)
.map(|step| step.objective.trim())
.filter(|objective| !objective.is_empty())
.collect::<Vec<_>>();
if !original_intent.is_empty() {
parts.push(format!(
"Preserve source/tool intent from: {}.",
original_intent.join(" | ")
));
}
parts.join(" ")
}
fn compact_step_dependencies(bucket: &str, available: &[String]) -> Vec<String> {
let has = |id: &str| available.iter().any(|value| value == id);
match bucket {
"confirm_scope_and_destination" => Vec::new(),
"gather_tandem_docs" | "gather_market_sources" | "gather_reddit_signals" => {
if has("confirm_scope_and_destination") {
vec!["confirm_scope_and_destination".to_string()]
} else {
Vec::new()
}
}
"draft_market_brief" => available
.iter()
.filter(|id| {
matches!(
id.as_str(),
"gather_tandem_docs" | "gather_market_sources" | "gather_reddit_signals"
)
})
.cloned()
.collect(),
"create_and_verify_notion_page" => {
if has("draft_market_brief") {
vec!["draft_market_brief".to_string()]
} else {
available.last().cloned().into_iter().collect()
}
}
"validate_result" => {
if has("create_and_verify_notion_page") {
vec!["create_and_verify_notion_page".to_string()]
} else if has("draft_market_brief") {
vec!["draft_market_brief".to_string()]
} else {
available.last().cloned().into_iter().collect()
}
}
_ => available.last().cloned().into_iter().collect(),
}
}
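/// Compacts an over-budget generated plan by folding its steps into a fixed
/// set of ordered buckets (scope, research, synthesis, delivery, validation,
/// plus a catch-all), emitting one runnable step per non-empty bucket. The
/// original step ids and a task budget report are recorded in each step's
/// builder metadata so the compaction stays auditable.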
pub fn compact_generated_workflow_plan_to_budget<M, I, O>(
mut plan: WorkflowPlan<AutomationV2Schedule<M>, WorkflowPlanStep<I, O>>,
profile: &WorkflowDecompositionProfile,
) -> (
WorkflowPlan<AutomationV2Schedule<M>, WorkflowPlanStep<I, O>>,
Option<Value>,
)
where
I: Clone + WorkflowInputRefLike,
O: Clone + Serialize,
{
if workflow_plan_source_exempts_generated_task_budget(&plan.plan_source)
|| plan.steps.len() <= GENERATED_WORKFLOW_MAX_STEPS
{
return (plan, None);
}
let original_step_count = plan.steps.len();
let original_steps = plan.steps.clone();
let bucket_order = [
"confirm_scope_and_destination",
"gather_tandem_docs",
"gather_market_sources",
"gather_reddit_signals",
"draft_market_brief",
"create_and_verify_notion_page",
"validate_result",
"synthesize_work",
];
let mut bucketed = std::collections::BTreeMap::<String, Vec<WorkflowPlanStep<I, O>>>::new();
for step in original_steps {
bucketed
.entry(compact_bucket_for_step(&step).to_string())
.or_default()
.push(step);
}
let prompt = plan.original_prompt.clone();
let mut next_steps = Vec::new();
for bucket in bucket_order {
let Some(steps) = bucketed.remove(bucket) else {
continue;
};
if next_steps.len() >= GENERATED_WORKFLOW_MAX_STEPS {
break;
}
let (kind, agent_role, default_objective) = compact_bucket_label(bucket);
let mut step = steps.first().cloned().unwrap_or_else(|| WorkflowPlanStep {
step_id: bucket.to_string(),
kind: kind.to_string(),
objective: default_objective.to_string(),
depends_on: Vec::new(),
agent_role: agent_role.to_string(),
input_refs: Vec::new(),
output_contract: None,
metadata: None,
});
step.step_id = bucket.to_string();
step.kind = kind.to_string();
step.agent_role = agent_role.to_string();
step.objective = compact_objective(bucket, default_objective, &steps, &prompt);
step.depends_on = compact_step_dependencies(
bucket,
&next_steps
.iter()
.map(|row: &WorkflowPlanStep<I, O>| row.step_id.clone())
.collect::<Vec<_>>(),
);
step.input_refs = Vec::new();
let original_ids = steps
.iter()
.map(|row| row.step_id.clone())
.collect::<Vec<_>>();
let mut metadata = step.metadata.take().unwrap_or_else(|| json!({}));
if !metadata.is_object() {
metadata = json!({});
}
let metadata_obj = metadata.as_object_mut().expect("metadata object");
let builder = metadata_obj
.entry("builder".to_string())
.or_insert_with(|| json!({}));
if !builder.is_object() {
*builder = json!({});
}
if let Some(builder_obj) = builder.as_object_mut() {
builder_obj.insert("compacted_from_step_ids".to_string(), json!(original_ids));
builder_obj.insert(
"compaction_bucket".to_string(),
Value::String(bucket.to_string()),
);
builder_obj.insert(
"task_budget".to_string(),
workflow_task_budget_report_value(
"compacted",
0,
Some(original_step_count),
"compacted",
),
);
}
step.metadata = Some(metadata);
next_steps.push(step);
}
if next_steps.is_empty() {
let report = workflow_task_budget_report_for_plan(
&plan,
Some("rejected"),
Some(original_step_count),
Some("rejected"),
);
return (plan, Some(report));
}
let compacted_step_count = next_steps.len();
let compacted_budget = workflow_task_budget_report_value(
"compacted",
compacted_step_count,
Some(original_step_count),
"compacted",
);
for (step_index, step) in next_steps.iter_mut().enumerate() {
if let Some(builder) = workflow_step_builder_map_mut(step) {
builder.insert("task_budget".to_string(), compacted_budget.clone());
}
workflow_step_decomposition_metadata_defaults(
step,
profile,
step_index,
compacted_step_count,
);
}
plan.steps = next_steps;
plan.description = Some(match plan.description.take() {
Some(description) if !description.trim().is_empty() => format!(
"{}\n\nPlanner compacted {} generated tasks into {} runnable workflow steps.",
description.trim(),
original_step_count,
compacted_step_count
),
_ => format!(
"Planner compacted {} generated tasks into {} runnable workflow steps.",
original_step_count, compacted_step_count
),
});
(plan, Some(compacted_budget))
}
pub fn plan_title(prompt: &str, schedule_type: &AutomationV2ScheduleType) -> String {
let trimmed = prompt
.trim()
.split_whitespace()
.collect::<Vec<_>>()
.join(" ");
if trimmed.is_empty() {
return "Automation Plan".to_string();
}
    let base = if trimmed.len() > 64 {
        // Truncate on a char boundary; byte-index slicing can panic on multibyte text.
        format!("{}...", truncate_text(&trimmed, 61))
    } else {
        trimmed
    };
match schedule_type {
AutomationV2ScheduleType::Manual => base,
_ => format!("Scheduled {}", base),
}
}
pub fn normalize_prompt(prompt: &str) -> String {
prompt
.trim()
.split_whitespace()
.collect::<Vec<_>>()
.join(" ")
.to_ascii_lowercase()
}
pub fn normalize_string_list(raw: Vec<String>) -> Vec<String> {
let mut values = raw
.into_iter()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
.collect::<Vec<_>>();
values.sort();
values.dedup();
values
}
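/// Checks whether `token` appears in the same clause (split on newline, ';',
/// '!', '?', or a sentence-ending '.') as any of `needles`, falling back to a
/// two-line neighborhood scan when the clause check misses.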
fn prompt_token_has_context(prompt: &str, token: &str, needles: &[&str]) -> bool {
let lowered_prompt = prompt.to_ascii_lowercase();
let lowered_token = token.to_ascii_lowercase();
if lowered_token.is_empty() {
return false;
}
let mut clauses = Vec::new();
let mut current = String::new();
let mut chars = lowered_prompt.chars().peekable();
while let Some(ch) = chars.next() {
current.push(ch);
let is_boundary = match ch {
'\n' | ';' | '!' | '?' => true,
'.' => chars.peek().is_none_or(|next| next.is_whitespace()),
_ => false,
};
if is_boundary {
let clause = current.trim();
if !clause.is_empty() {
clauses.push(clause.to_string());
}
current.clear();
}
}
let trailing = current.trim();
if !trailing.is_empty() {
clauses.push(trailing.to_string());
}
if clauses.into_iter().any(|clause| {
clause.contains(&lowered_token) && needles.iter().any(|needle| clause.contains(needle))
}) {
return true;
}
let lines = lowered_prompt.lines().collect::<Vec<_>>();
for (index, line) in lines.iter().enumerate() {
let trimmed = line.trim();
if !trimmed.contains(&lowered_token) {
continue;
}
if needles.iter().any(|needle| trimmed.contains(needle)) {
return true;
}
let start = index.saturating_sub(2);
let end = (index + 2).min(lines.len().saturating_sub(1));
for neighbor_index in start..=end {
if neighbor_index == index {
continue;
}
let neighbor = lines[neighbor_index].trim();
if neighbor.is_empty() {
continue;
}
if needles.iter().any(|needle| neighbor.contains(needle)) {
return true;
}
}
}
false
}
fn prompt_token_has_ordered_context(prompt: &str, token: &str, needles: &[&str]) -> bool {
let lowered_prompt = prompt.to_ascii_lowercase();
let lowered_token = token.to_ascii_lowercase();
if lowered_token.is_empty() {
return false;
}
let mut clauses = Vec::new();
let mut current = String::new();
let mut chars = lowered_prompt.chars().peekable();
while let Some(ch) = chars.next() {
current.push(ch);
let is_boundary = match ch {
'\n' | ';' | '!' | '?' => true,
'.' => chars.peek().is_none_or(|next| next.is_whitespace()),
_ => false,
};
if is_boundary {
let clause = current.trim();
if !clause.is_empty() {
clauses.push(clause.to_string());
}
current.clear();
}
}
let trailing = current.trim();
if !trailing.is_empty() {
clauses.push(trailing.to_string());
}
clauses.into_iter().any(|clause| {
let Some(token_index) = clause.find(&lowered_token) else {
return false;
};
needles
.iter()
.any(|needle| clause[token_index + lowered_token.len()..].contains(needle))
})
}
fn prompt_contains_write_intent(prompt: &str, token: &str) -> bool {
let lowered_prompt = prompt.to_ascii_lowercase();
let lowered_token = token.to_ascii_lowercase();
if lowered_token.is_empty() {
return false;
}
let direct_patterns = [
format!("write {}", lowered_token),
format!("save {}", lowered_token),
format!("create {}", lowered_token),
format!("create or append to {}", lowered_token),
format!("create or append {}", lowered_token),
format!("append to {}", lowered_token),
format!("append {}", lowered_token),
format!("update {}", lowered_token),
format!("generate {}", lowered_token),
format!("produce {}", lowered_token),
format!("emit {}", lowered_token),
format!("store {}", lowered_token),
format!("export {}", lowered_token),
format!("publish {}", lowered_token),
format!("record {}", lowered_token),
format!("materialize {}", lowered_token),
format!("output {}", lowered_token),
format!("artifact {}", lowered_token),
format!("report {}", lowered_token),
format!("file {}", lowered_token),
];
if direct_patterns
.iter()
.any(|pattern| lowered_prompt.contains(pattern))
{
return true;
}
let existence_patterns = [
format!("if {} does not exist", lowered_token),
format!("if {} does not already exist", lowered_token),
format!("if {} is missing", lowered_token),
format!("when {} does not exist", lowered_token),
format!("when {} is missing", lowered_token),
];
if existence_patterns
.iter()
.any(|pattern| lowered_prompt.contains(pattern))
{
return [
"create it",
"create one",
"create the file",
"write it",
"save it",
"append it",
]
.iter()
.any(|pattern| lowered_prompt.contains(pattern));
}
prompt_token_has_context(
prompt,
token,
&[
"write",
"save",
"create",
"create or append",
"append",
"update",
"generate",
"produce",
"emit",
"store",
"export",
"publish",
"record",
"materialize",
"output",
"artifact",
"report",
"file",
],
)
}
fn prompt_contains_read_only_intent(prompt: &str, token: &str) -> bool {
let lowered_prompt = prompt.to_ascii_lowercase();
let lowered_token = token.to_ascii_lowercase();
if lowered_token.is_empty() {
return false;
}
[
format!("read {}", lowered_token),
format!("read from {}", lowered_token),
format!("only read from {}", lowered_token),
format!("read only from {}", lowered_token),
format!("inspect {}", lowered_token),
format!("review {}", lowered_token),
format!("open {}", lowered_token),
format!("never edit {}", lowered_token),
format!("do not edit {}", lowered_token),
format!("don't edit {}", lowered_token),
format!("do not modify {}", lowered_token),
format!("don't modify {}", lowered_token),
format!("do not rewrite {}", lowered_token),
format!("don't rewrite {}", lowered_token),
format!("do not rename {}", lowered_token),
format!("don't rename {}", lowered_token),
format!("do not move {}", lowered_token),
format!("don't move {}", lowered_token),
format!("do not delete {}", lowered_token),
format!("don't delete {}", lowered_token),
format!("{} as the source of truth", lowered_token),
format!("{} as source of truth", lowered_token),
format!("{} is the source of truth", lowered_token),
format!("{} is source of truth", lowered_token),
format!("keep {} untouched", lowered_token),
format!("leave {} untouched", lowered_token),
format!("must remain untouched {}", lowered_token),
]
.iter()
.any(|pattern| lowered_prompt.contains(pattern))
|| prompt_token_has_ordered_context(prompt, token, &["source of truth"])
}
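/// Scans the prompt for file-like tokens (anything with an extension) and
/// returns those that look like paths or carry write intent ("write", "save",
/// "append", ...). Tokens the prompt marks read-only ("source of truth",
/// "do not edit", ...) are excluded.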
pub fn infer_explicit_output_targets(prompt: &str) -> Vec<String> {
let mut targets = BTreeSet::new();
for raw_token in prompt.split_whitespace() {
let token = raw_token
.trim_matches(|ch: char| {
matches!(
ch,
'"' | '\'' | '`' | '(' | ')' | '[' | ']' | '{' | '}' | ',' | ';' | ':'
)
})
.trim_end_matches(|ch: char| matches!(ch, '.' | '!' | '?'))
.trim();
if token.is_empty() || token.contains("://") {
continue;
}
let path = Path::new(token);
let has_extension = path
.extension()
.and_then(|value| value.to_str())
.is_some_and(|value| !value.is_empty());
let looks_like_path = token.starts_with('/')
|| token.starts_with("./")
|| token.starts_with("../")
|| token.contains('/');
if !has_extension {
continue;
}
if prompt_contains_read_only_intent(prompt, token) {
continue;
}
if looks_like_path || prompt_contains_write_intent(prompt, token) {
targets.insert(token.to_string());
}
}
targets.into_iter().collect()
}
pub fn infer_read_only_source_paths(prompt: &str) -> Vec<String> {
let mut sources = BTreeSet::new();
for raw_token in prompt.split_whitespace() {
let token = raw_token
.trim_matches(|ch: char| {
matches!(
ch,
'"' | '\'' | '`' | '(' | ')' | '[' | ']' | '{' | '}' | ',' | ';' | ':'
)
})
.trim_end_matches(|ch: char| matches!(ch, '.' | '!' | '?'))
.trim();
if token.is_empty() || token.contains("://") {
continue;
}
let path = Path::new(token);
let has_extension = path
.extension()
.and_then(|value| value.to_str())
.is_some_and(|value| !value.is_empty());
if !has_extension || !prompt_contains_read_only_intent(prompt, token) {
continue;
}
sources.insert(token.to_string());
}
sources.into_iter().collect()
}
pub fn workflow_plan_mentions_connector_backed_sources(prompt: &str) -> bool {
let lowered = prompt.trim().to_ascii_lowercase();
if lowered.is_empty() {
return false;
}
[
"mcp",
"reddit",
"github issue",
"github issues",
"slack",
"jira",
"linear",
"notion",
"confluence",
"zendesk",
"salesforce",
"airtable",
"google drive",
"google docs",
"google sheets",
"gmail",
"outlook",
"sharepoint",
"dropbox",
"discord",
"intercom",
"figma",
]
.iter()
.any(|needle| lowered.contains(needle))
}
pub fn workflow_plan_mentions_web_research_tools(prompt: &str) -> bool {
let lowered = prompt.trim().to_ascii_lowercase();
if lowered.is_empty() {
return false;
}
[
"websearch",
"web search",
"webfetch",
"web fetch",
"search the web",
"browse the web",
"browser search",
"browser",
]
.iter()
.any(|needle| lowered.contains(needle))
}
pub fn workflow_plan_mentions_email_delivery(prompt: &str) -> bool {
let lowered = prompt.trim().to_ascii_lowercase();
if lowered.is_empty() {
return false;
}
let explicit_email = lowered.contains("email")
|| lowered.contains("mail ")
|| lowered.contains("mailing")
|| lowered.contains("inbox")
|| lowered.contains("send a draft")
|| lowered.contains("send a reply")
|| lowered.contains("send an email")
|| lowered.contains("send email");
let delivery_context = lowered.contains("send")
|| lowered.contains("deliver")
|| lowered.contains("notify")
|| lowered.contains("reply")
|| lowered.contains("draft");
explicit_email && delivery_context
}
pub fn workflow_plan_should_surface_mcp_discovery(
prompt: &str,
allowed_mcp_servers: &[String],
) -> bool {
!allowed_mcp_servers.is_empty() || workflow_plan_mentions_connector_backed_sources(prompt)
}
fn normalized_parallel_agent_count(
execution_mode: Option<&str>,
max_parallel_agents: Option<u64>,
) -> Option<u32> {
let execution_mode = execution_mode
.map(str::trim)
.map(str::to_ascii_lowercase)
.filter(|value| !value.is_empty());
match execution_mode.as_deref() {
Some("single") => Some(1),
Some("team") => Some(
max_parallel_agents
.map(|value| value.clamp(2, 16) as u32)
.unwrap_or(2),
),
Some("swarm") => Some(
max_parallel_agents
.map(|value| value.clamp(4, 16) as u32)
.unwrap_or(4),
),
Some(_) => max_parallel_agents
.map(|value| value.clamp(1, 16) as u32)
.or(Some(1)),
None => max_parallel_agents.map(|value| value.clamp(1, 16) as u32),
}
}
pub fn plan_max_parallel_agents(operator_preferences: Option<&Value>) -> u32 {
let execution_mode = operator_preferences
.and_then(|prefs| prefs.get("execution_mode"))
.and_then(Value::as_str);
let explicit_max = operator_preferences
.and_then(|prefs| prefs.get("max_parallel_agents"))
.and_then(Value::as_u64);
normalized_parallel_agent_count(execution_mode, explicit_max).unwrap_or(1)
}
pub fn workflow_plan_agent_roles<Step, F>(steps: &[Step], agent_role: F) -> Vec<String>
where
F: Fn(&Step) -> &str,
{
let mut roles = BTreeSet::new();
for step in steps {
let role = agent_role(step).trim();
if !role.is_empty() {
roles.insert(role.to_string());
}
}
roles.into_iter().collect()
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlannerMcpServerToolSet {
pub server: String,
#[serde(default)]
pub tool_names: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlannerMcpServerCapabilitySummary {
pub server: String,
pub tool_count: usize,
#[serde(default)]
pub capabilities: Vec<String>,
#[serde(default)]
pub sample_tools: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlannerCapabilitySummary {
#[serde(default)]
pub built_in_capabilities: Vec<String>,
#[serde(default)]
pub mcp_servers: Vec<PlannerMcpServerCapabilitySummary>,
}
pub fn build_planner_capability_summary(server_tools: &[PlannerMcpServerToolSet]) -> Value {
let mut servers = Vec::new();
for server in server_tools {
let mut capabilities = BTreeSet::new();
let mut sample_tools = Vec::new();
for tool_name in server.tool_names.iter().take(8) {
let tool_name = tool_name.trim().to_string();
if !tool_name.is_empty() {
sample_tools.push(tool_name.clone());
}
let lower = tool_name.to_ascii_lowercase();
if lower.contains("gmail_send_email") || lower.contains("send_email") {
capabilities.insert("email_send".to_string());
}
if lower.contains("gmail_send_draft") || lower.contains("send_draft") {
capabilities.insert("email_draft".to_string());
}
if lower.contains("reddit") {
capabilities.insert("reddit_research".to_string());
}
if lower.contains("search") {
capabilities.insert("search".to_string());
}
if lower.contains("docs") || lower.contains("document") {
capabilities.insert("docs".to_string());
}
if lower.contains("slack") {
capabilities.insert("slack_delivery".to_string());
}
}
servers.push(PlannerMcpServerCapabilitySummary {
server: server.server.clone(),
tool_count: server.tool_names.len(),
capabilities: capabilities.into_iter().collect(),
sample_tools,
});
}
json!(PlannerCapabilitySummary {
built_in_capabilities: vec![
"web_research".to_string(),
"web_fetch".to_string(),
"workspace_read".to_string(),
],
mcp_servers: servers,
})
}
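/// Canonicalizes operator preferences in place: lowercases `execution_mode`,
/// clamps `max_parallel_agents` to the mode's allowed range, trims model
/// overrides, and drops `role_models` entries missing a provider or model id.
/// Returns `None` when nothing meaningful remains.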
pub fn normalize_operator_preferences(raw: Option<Value>) -> Option<Value> {
let Some(mut prefs) = raw else {
return None;
};
let Some(map) = prefs.as_object_mut() else {
return None;
};
let execution_mode = map
.get("execution_mode")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(|value| value.to_ascii_lowercase());
if let Some(mode) = execution_mode.as_ref() {
map.insert("execution_mode".to_string(), Value::String(mode.clone()));
} else {
map.remove("execution_mode");
}
let max_parallel = map.get("max_parallel_agents").and_then(Value::as_u64);
match normalized_parallel_agent_count(execution_mode.as_deref(), max_parallel) {
Some(value) => {
map.insert(
"max_parallel_agents".to_string(),
Value::Number(serde_json::Number::from(value)),
);
}
None => {
map.remove("max_parallel_agents");
}
}
if let Some(provider_id) = map
.get("model_provider")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
{
map.insert(
"model_provider".to_string(),
Value::String(provider_id.to_string()),
);
} else {
map.remove("model_provider");
}
if let Some(model_id) = map
.get("model_id")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
{
map.insert("model_id".to_string(), Value::String(model_id.to_string()));
} else {
map.remove("model_id");
}
if let Some(role_models) = map.get_mut("role_models").and_then(Value::as_object_mut) {
let invalid_role_keys = role_models
.iter()
.filter_map(|(key, value)| {
let role = value.as_object()?;
let provider_id = role
.get("provider_id")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty());
let model_id = role
.get("model_id")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty());
if provider_id.is_some() && model_id.is_some() {
None
} else {
Some(key.clone())
}
})
.collect::<Vec<_>>();
for key in invalid_role_keys {
role_models.remove(&key);
}
if role_models.is_empty() {
map.remove("role_models");
}
} else {
map.remove("role_models");
}
if map.is_empty() {
None
} else {
Some(prefs)
}
}
pub fn merge_create_operator_preferences(
explicit: Option<&Value>,
candidate: Option<Value>,
) -> Option<Value> {
let candidate = normalize_operator_preferences(candidate);
let explicit = normalize_operator_preferences(explicit.cloned());
match (candidate, explicit) {
(None, None) => None,
(Some(candidate), None) => Some(candidate),
(None, Some(explicit)) => Some(explicit),
(Some(candidate), Some(explicit)) => {
let mut merged = candidate.as_object().cloned().unwrap_or_default();
for (key, value) in explicit.as_object().cloned().unwrap_or_default() {
merged.insert(key, value);
}
normalize_operator_preferences(Some(Value::Object(merged)))
}
}
}
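/// Extracts a JSON value from free-form model output: first tries the whole
/// text, then each ``` fenced chunk (with or without a `json` language tag),
/// and finally the first balanced `{...}`/`[...]` fragment anywhere in the
/// text.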
pub fn extract_json_value_from_text(text: &str) -> Option<Value> {
serde_json::from_str(text.trim())
.ok()
.or_else(|| {
text.split("```").find_map(|chunk| {
let trimmed = chunk.trim();
if trimmed.starts_with('{') || trimmed.starts_with('[') {
serde_json::from_str(trimmed).ok()
} else if let Some(rest) = trimmed.strip_prefix("json") {
let rest = rest.trim();
(rest.starts_with('{') || rest.starts_with('['))
.then(|| serde_json::from_str(rest).ok())
.flatten()
} else {
None
}
})
})
.or_else(|| {
extract_balanced_json_fragment(text)
.and_then(|fragment| serde_json::from_str::<Value>(fragment).ok())
})
}
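/// Finds the first balanced JSON object or array in `text`, honouring string
/// literals and escapes so brackets inside strings do not affect nesting.
/// Returns `None` on mismatched brackets.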
fn extract_balanced_json_fragment(text: &str) -> Option<&str> {
let mut start = None;
let mut stack: Vec<char> = Vec::new();
let mut in_string = false;
let mut escape = false;
for (idx, ch) in text.char_indices() {
if start.is_none() {
if ch == '{' || ch == '[' {
start = Some(idx);
stack.push(ch);
}
continue;
}
if in_string {
if escape {
escape = false;
continue;
}
match ch {
'\\' => escape = true,
'"' => in_string = false,
_ => {}
}
continue;
}
match ch {
'"' => in_string = true,
'{' | '[' => stack.push(ch),
'}' | ']' => {
let Some(open) = stack.pop() else {
return None;
};
let is_match = matches!((open, ch), ('{', '}') | ('[', ']'));
if !is_match {
return None;
}
if stack.is_empty() {
let start = start?;
return Some(&text[start..idx + ch.len_utf8()]);
}
}
_ => {}
}
}
None
}
pub fn decode_planner_plan_value<Plan: DeserializeOwned>(value: Value) -> Option<Plan> {
serde_json::from_value::<Plan>(value.clone())
.ok()
.or_else(|| decode_planner_plan_value_relaxed(value))
}
pub fn decode_planner_plan_value_relaxed<Plan: DeserializeOwned>(mut value: Value) -> Option<Plan> {
let plan = value.as_object_mut()?;
plan.entry("plan_id".to_string())
.or_insert_with(|| Value::String(String::new()));
plan.entry("planner_version".to_string())
.or_insert_with(|| Value::String(String::new()));
plan.entry("plan_source".to_string())
.or_insert_with(|| Value::String(String::new()));
plan.entry("original_prompt".to_string())
.or_insert_with(|| Value::String(String::new()));
plan.entry("normalized_prompt".to_string())
.or_insert_with(|| Value::String(String::new()));
plan.entry("confidence".to_string())
.or_insert_with(|| Value::String("medium".to_string()));
plan.entry("title".to_string())
.or_insert_with(|| Value::String(String::new()));
plan.entry("save_options".to_string())
.or_insert_with(|| json!({}));
plan.entry("requires_integrations".to_string())
.or_insert_with(|| json!([]));
plan.entry("allowed_mcp_servers".to_string())
.or_insert_with(|| json!([]));
let steps = plan.get_mut("steps")?.as_array_mut()?;
for step in steps.iter_mut() {
let Some(step_obj) = step.as_object_mut() else {
continue;
};
if !step_obj.contains_key("step_id") {
if let Some(id) = step_obj.get("id").cloned() {
step_obj.insert("step_id".to_string(), id);
}
}
if !step_obj.contains_key("kind") {
if let Some(kind) = step_obj.get("type").cloned() {
step_obj.insert("kind".to_string(), kind);
}
}
if !step_obj.contains_key("objective") {
            // The step has no top-level objective; fall back to config.objective, then label.
            let objective = step_obj
                .get("config")
                .and_then(|row| row.get("objective"))
                .and_then(Value::as_str)
                .map(str::to_string)
.or_else(|| {
step_obj
.get("label")
.and_then(Value::as_str)
.map(str::to_string)
});
if let Some(objective) = objective {
step_obj.insert("objective".to_string(), Value::String(objective));
}
}
if !step_obj.contains_key("agent_role") {
            // The step has no top-level agent_role; fall back to config.agent_role.
            let agent_role = step_obj
                .get("config")
                .and_then(|row| row.get("agent_role"))
                .and_then(Value::as_str)
                .map(str::to_string)
.unwrap_or_else(|| "worker".to_string());
step_obj.insert("agent_role".to_string(), Value::String(agent_role));
}
if let Some(input_refs) = step_obj.get_mut("input_refs").and_then(Value::as_array_mut) {
for input_ref in input_refs.iter_mut() {
match input_ref {
Value::String(from_step_id) => {
*input_ref = json!({
"from_step_id": from_step_id,
"alias": from_step_id
});
}
Value::Object(map) => {
if !map.contains_key("from_step_id") {
if let Some(value) = map
.get("from")
.cloned()
.or_else(|| map.get("step_id").cloned())
.or_else(|| map.get("id").cloned())
{
map.insert("from_step_id".to_string(), value);
}
}
if !map.contains_key("alias") {
if let Some(from_step_id) = map
.get("from_step_id")
.and_then(Value::as_str)
.map(str::to_string)
{
map.insert("alias".to_string(), Value::String(from_step_id));
}
}
}
_ => {}
}
}
}
if !step_obj.contains_key("output_contract") {
let inferred_kind = step_obj
.get("config")
.and_then(|row| row.get("format"))
.and_then(Value::as_str)
.and_then(|format| match format.trim().to_ascii_lowercase().as_str() {
"markdown" | "md" => Some("report_markdown".to_string()),
"json" => Some("structured_json".to_string()),
"text" | "summary" => Some("text_summary".to_string()),
_ => None,
});
if let Some(kind) = inferred_kind {
step_obj.insert(
"output_contract".to_string(),
json!({
"kind": kind
}),
);
}
}
}
serde_json::from_value::<Plan>(value).ok()
}
pub fn manual_schedule<M: Clone>(timezone: String, misfire_policy: M) -> AutomationV2Schedule<M> {
AutomationV2Schedule {
schedule_type: AutomationV2ScheduleType::Manual,
cron_expression: None,
interval_seconds: None,
timezone,
misfire_policy,
}
}
pub fn schedule_from_value<M: Clone>(
value: &Value,
default_misfire_policy: M,
) -> Option<AutomationV2Schedule<M>> {
let timezone = value
.get("timezone")
.and_then(Value::as_str)
.unwrap_or("UTC")
.to_string();
if let Some(schedule_type) = value.get("type").and_then(Value::as_str) {
match schedule_type.trim().to_ascii_lowercase().as_str() {
"cron" => {
let expr = value
.get("cron_expression")
.or_else(|| value.get("cronExpression"))
.and_then(Value::as_str)?
.trim()
.to_string();
if expr.is_empty() {
return None;
}
return Some(AutomationV2Schedule {
schedule_type: AutomationV2ScheduleType::Cron,
cron_expression: Some(expr),
interval_seconds: None,
timezone,
misfire_policy: default_misfire_policy,
});
}
"interval" => {
let seconds = value
.get("interval_seconds")
.or_else(|| value.get("intervalSeconds"))
.and_then(Value::as_u64)?;
return Some(AutomationV2Schedule {
schedule_type: AutomationV2ScheduleType::Interval,
cron_expression: None,
interval_seconds: Some(seconds),
timezone,
misfire_policy: default_misfire_policy,
});
}
"manual" => {
return Some(AutomationV2Schedule {
schedule_type: AutomationV2ScheduleType::Manual,
cron_expression: None,
interval_seconds: None,
timezone,
misfire_policy: default_misfire_policy,
});
}
_ => {}
}
}
if let Some(expr) = value
.get("cron")
.and_then(|cron| {
cron.get("expression")
.or_else(|| cron.get("cron_expression"))
})
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
{
return Some(AutomationV2Schedule {
schedule_type: AutomationV2ScheduleType::Cron,
cron_expression: Some(expr.to_string()),
interval_seconds: None,
timezone,
misfire_policy: default_misfire_policy,
});
}
let seconds = value.get("interval_seconds").and_then(|row| {
row.get("seconds")
.and_then(Value::as_u64)
.or_else(|| row.as_u64())
});
seconds.map(|seconds| AutomationV2Schedule {
schedule_type: AutomationV2ScheduleType::Interval,
cron_expression: None,
interval_seconds: Some(seconds),
timezone,
misfire_policy: default_misfire_policy,
})
}
pub fn workflow_steps_equal<I: Serialize, O: Serialize>(
left: &[WorkflowPlanStep<I, O>],
right: &[WorkflowPlanStep<I, O>],
) -> bool {
serde_json::to_value(left).ok() == serde_json::to_value(right).ok()
}
pub fn workflow_schedule_equal<M: Serialize>(
left: &AutomationV2Schedule<M>,
right: &AutomationV2Schedule<M>,
) -> bool {
serde_json::to_value(left).ok() == serde_json::to_value(right).ok()
}
pub trait WorkflowInputRefLike {
fn from_step_id(&self) -> &str;
}
pub fn validate_workflow_plan<S, I, O>(
plan: &WorkflowPlan<S, WorkflowPlanStep<I, O>>,
) -> Result<(), String>
where
I: WorkflowInputRefLike,
{
if plan.execution_target.trim() != "automation_v2" {
return Err("execution_target must be automation_v2".to_string());
}
normalize_absolute_workspace_root(&plan.workspace_root)?;
let normalized_step_ids = plan
.steps
.iter()
.map(|step| normalize_workflow_step_id(step.step_id.as_str()))
.collect::<Vec<_>>();
if normalized_step_ids.is_empty() {
return Err("workflow plan must include at least one step".to_string());
}
let mut step_ids = std::collections::HashSet::new();
for (step, normalized_step_id) in plan.steps.iter().zip(normalized_step_ids.iter()) {
if !workflow_step_id_has_supported_shape(normalized_step_id) {
return Err(format!("invalid workflow step id `{}`", step.step_id));
}
if !step_ids.insert(normalized_step_id.clone()) {
return Err(format!(
"workflow step id `{}` duplicates another step after normalization",
step.step_id
));
}
}
for step in &plan.steps {
for dep in &step.depends_on {
if !step_ids.contains(&normalize_workflow_step_id(dep.as_str())) {
return Err(format!(
"workflow step `{}` depends on unknown step `{}`",
step.step_id, dep
));
}
}
for input in &step.input_refs {
if !step_ids.contains(&normalize_workflow_step_id(input.from_step_id())) {
return Err(format!(
"workflow step `{}` references unknown input step `{}`",
step.step_id,
input.from_step_id()
));
}
}
}
Ok(())
}
pub const WORKFLOW_STEP_ID_EXAMPLES: &[&str] = &[
"assess",
"collect_inputs",
"summarize_inputs",
"organize_workstreams",
"gather_supporting_sources",
"refine_results",
"draft_deliverable",
"deliver_summary",
"finalize_outputs",
];
pub fn workflow_step_is_triage(step_id: &str, kind: &str) -> bool {
let lowered_id = step_id.trim().to_ascii_lowercase();
let lowered_kind = kind.trim().to_ascii_lowercase();
lowered_id.contains("assess")
|| lowered_id.contains("triage")
|| lowered_kind.contains("assess")
|| lowered_kind.contains("triage")
}
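/// Lowercases a raw step id and collapses every run of non-alphanumeric
/// characters into a single underscore, e.g. " Draft--Brief! " becomes
/// "draft_brief".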
fn normalize_workflow_step_id(raw: &str) -> String {
let mut out = String::new();
let mut previous_underscore = false;
for ch in raw.trim().chars() {
if ch.is_ascii_alphanumeric() {
out.push(ch.to_ascii_lowercase());
previous_underscore = false;
} else if !previous_underscore {
out.push('_');
previous_underscore = true;
}
}
out.trim_matches('_').to_string()
}
fn workflow_step_id_has_supported_shape(normalized_step_id: &str) -> bool {
if normalized_step_id.is_empty() || normalized_step_id.len() > 80 {
return false;
}
if normalized_step_id.ends_with('_') || normalized_step_id.contains("__") {
return false;
}
let mut chars = normalized_step_id.chars();
let Some(first) = chars.next() else {
return false;
};
if !first.is_ascii_lowercase() {
return false;
}
chars.all(|ch| ch.is_ascii_lowercase() || ch.is_ascii_digit() || ch == '_')
}
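/// Validates that a workspace root is a non-empty absolute path and normalizes
/// it via `tandem_core::normalize_workspace_path`, returning a human-readable
/// error otherwise.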
pub fn normalize_absolute_workspace_root(raw: &str) -> Result<String, String> {
let trimmed = raw.trim();
if trimmed.is_empty() {
return Err("workspace_root is required".to_string());
}
let as_path = PathBuf::from(trimmed);
if !as_path.is_absolute() {
return Err("workspace_root must be an absolute path".to_string());
}
tandem_core::normalize_workspace_path(trimmed)
.ok_or_else(|| "workspace_root is invalid".to_string())
}
pub fn resolve_workspace_root_candidate(
requested: Option<&str>,
default_root: &str,
cwd: Option<&str>,
) -> Result<String, String> {
let requested = requested.map(str::trim).filter(|value| !value.is_empty());
if let Some(workspace_root) = requested {
return normalize_absolute_workspace_root(workspace_root);
}
match normalize_absolute_workspace_root(default_root) {
Ok(normalized) => Ok(normalized),
Err(error) => {
#[cfg(unix)]
{
if default_root.starts_with('\\') {
let unix_like = default_root.replace('\\', "/");
return normalize_absolute_workspace_root(&unix_like);
}
}
let cwd = cwd.ok_or(error.clone())?;
normalize_absolute_workspace_root(cwd)
}
}
}
pub fn normalize_and_validate_planner_plan<M, I, O>(
mut candidate: WorkflowPlan<AutomationV2Schedule<M>, WorkflowPlanStep<I, O>>,
ctx: &PlannerPlanNormalizationContext<'_, M>,
mut normalize_step: impl FnMut(&mut WorkflowPlanStep<I, O>),
) -> Result<WorkflowPlan<AutomationV2Schedule<M>, WorkflowPlanStep<I, O>>, String>
where
M: Clone,
I: WorkflowInputRefLike,
O: Serialize,
{
candidate.plan_id = ctx.plan_id.to_string();
candidate.planner_version = ctx.planner_version.to_string();
candidate.plan_source = ctx.plan_source.to_string();
candidate.original_prompt = ctx.original_prompt.to_string();
candidate.normalized_prompt = ctx.normalized_prompt.to_string();
candidate.execution_target = "automation_v2".to_string();
candidate.requires_integrations = normalize_string_list(candidate.requires_integrations);
candidate.description = candidate.description.and_then(|value| {
let trimmed = value.trim();
(!trimmed.is_empty()).then_some(trimmed.to_string())
});
candidate.confidence = match candidate.confidence.trim().to_ascii_lowercase().as_str() {
"low" | "medium" | "high" => candidate.confidence.trim().to_ascii_lowercase(),
_ => "medium".to_string(),
};
candidate.title = {
let trimmed = candidate.title.trim();
if trimmed.is_empty() {
plan_title(ctx.original_prompt, &candidate.schedule.schedule_type)
} else {
truncate_text(trimmed, 120)
}
};
candidate.save_options = if candidate.save_options.is_object() {
candidate.save_options
} else {
plan_save_options()
};
match ctx.mode {
PlannerPlanMode::Create => {
candidate.workspace_root = ctx.resolved_workspace_root.to_string();
candidate.allowed_mcp_servers = ctx.request_allowed_mcp_servers.to_vec();
candidate.operator_preferences = merge_create_operator_preferences(
ctx.request_operator_preferences,
candidate.operator_preferences.take(),
);
if let Some(explicit_schedule) = ctx.explicit_schedule {
candidate.schedule = explicit_schedule.clone();
}
}
PlannerPlanMode::Revise => {
candidate.workspace_root =
normalize_absolute_workspace_root(&candidate.workspace_root)?;
candidate.allowed_mcp_servers = normalize_string_list(candidate.allowed_mcp_servers);
candidate.operator_preferences =
normalize_operator_preferences(candidate.operator_preferences.take());
}
}
let decomposition_profile = derive_workflow_decomposition_profile(
ctx.original_prompt,
&candidate.allowed_mcp_servers,
&infer_explicit_output_targets(ctx.original_prompt),
!matches!(
&candidate.schedule.schedule_type,
AutomationV2ScheduleType::Manual
),
);
let step_count = candidate.steps.len();
for step in &mut candidate.steps {
normalize_step(step);
}
for (step_index, step) in candidate.steps.iter_mut().enumerate() {
workflow_step_decomposition_metadata_defaults(
step,
&decomposition_profile,
step_index,
step_count,
);
}
validate_workflow_plan(&candidate)?;
Ok(candidate)
}
pub enum PlannerPlanMode {
Create,
Revise,
}
pub struct PlannerPlanNormalizationContext<'a, M> {
pub mode: PlannerPlanMode,
pub plan_id: &'a str,
pub planner_version: &'a str,
pub plan_source: &'a str,
pub original_prompt: &'a str,
pub normalized_prompt: &'a str,
pub resolved_workspace_root: &'a str,
pub explicit_schedule: Option<&'a AutomationV2Schedule<M>>,
pub request_allowed_mcp_servers: &'a [String],
pub request_operator_preferences: Option<&'a Value>,
}
pub fn normalize_mcp_server_namespace(raw: &str) -> String {
let mut out = String::new();
let mut previous_underscore = false;
for ch in raw.trim().chars() {
if ch.is_ascii_alphanumeric() {
out.push(ch.to_ascii_lowercase());
previous_underscore = false;
} else if !previous_underscore {
out.push('_');
previous_underscore = true;
}
}
let cleaned = out.trim_matches('_');
if cleaned.is_empty() {
"server".to_string()
} else {
cleaned.to_string()
}
}
pub fn compile_workflow_agent_tool_allowlist(
allowed_mcp_servers: &[String],
operator_preferences: Option<&Value>,
normalize_allowed_tools: impl Fn(Vec<String>) -> Vec<String>,
) -> Vec<String> {
let custom_allowlist = operator_preferences
.and_then(|prefs| prefs.get("tool_allowlist"))
.and_then(Value::as_array)
.map(|items| {
items
.iter()
.filter_map(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToOwned::to_owned)
.collect::<Vec<_>>()
})
.unwrap_or_default();
let tool_access_mode = operator_preferences
.and_then(|prefs| prefs.get("tool_access_mode"))
.and_then(Value::as_str)
.map(str::trim)
.unwrap_or("all");
let mut allowlist = if tool_access_mode == "custom" {
custom_allowlist
} else {
vec!["*".to_string()]
};
for server in allowed_mcp_servers {
let namespace = normalize_mcp_server_namespace(server);
allowlist.push(format!("mcp.{namespace}.*"));
}
normalize_allowed_tools(allowlist)
}
pub fn planner_model_spec(operator_preferences: Option<&Value>) -> Option<tandem_types::ModelSpec> {
let prefs = operator_preferences?;
let default_provider = prefs
.get("model_provider")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string);
let default_model = prefs
.get("model_id")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string);
if let Some(planner_model) = prefs
.get("role_models")
.and_then(|row| row.get("planner"))
.and_then(Value::as_object)
{
let provider_id = planner_model
.get("provider_id")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty());
let model_id = planner_model
.get("model_id")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty());
if let (Some(provider_id), Some(model_id)) = (provider_id, model_id) {
return Some(tandem_types::ModelSpec {
provider_id: provider_id.to_string(),
model_id: model_id.to_string(),
});
}
}
match (default_provider, default_model) {
(Some(provider_id), Some(model_id)) => Some(tandem_types::ModelSpec {
provider_id,
model_id,
}),
_ => None,
}
}
pub fn compile_operator_model_policy(operator_preferences: Option<&Value>) -> Option<Value> {
let prefs = operator_preferences?;
let provider_id = prefs
.get("model_provider")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty());
let model_id = prefs
.get("model_id")
.and_then(Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty());
let role_models = prefs
.get("role_models")
.cloned()
.filter(|value| value.is_object());
let mut payload = serde_json::Map::new();
if let (Some(provider_id), Some(model_id)) = (provider_id, model_id) {
payload.insert(
"default_model".to_string(),
json!({
"provider_id": provider_id,
"model_id": model_id,
}),
);
}
if let Some(role_models) = role_models {
payload.insert("role_models".to_string(), role_models);
}
if payload.is_empty() {
None
} else {
Some(Value::Object(payload))
}
}
pub fn agent_id_for_role(role: &str) -> String {
format!("agent_{}", role.trim().replace([' ', '-'], "_"))
}
pub fn display_name_for_role(role: &str) -> String {
role.split(['_', '-', ' '])
.filter(|part| !part.is_empty())
.map(|part| {
let mut chars = part.chars();
match chars.next() {
Some(first) => format!("{}{}", first.to_ascii_uppercase(), chars.as_str()),
None => String::new(),
}
})
.collect::<Vec<_>>()
.join(" ")
}
pub(crate) fn planner_llm_provider_unconfigured_hint(provider_id: &str) -> String {
format!(
"The configured planner model uses provider `{provider_id}`, but that provider is not configured on this engine. Configure the provider first and try again."
)
}
pub(crate) fn planner_diagnostics(
reason: Option<&str>,
detail: Option<String>,
decomposition_observation: Option<Value>,
) -> Option<Value> {
let mut payload = serde_json::Map::new();
if let Some(reason) = reason.map(str::trim).filter(|value| !value.is_empty()) {
payload.insert(
"fallback_reason".to_string(),
Value::String(reason.to_string()),
);
}
if let Some(detail) = detail
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
{
payload.insert("detail".to_string(), Value::String(detail));
}
if let Some(decomposition_observation) = decomposition_observation {
if let Some(observation) = decomposition_observation.as_object() {
for (key, value) in observation {
payload.insert(key.clone(), value.clone());
}
} else {
payload.insert(
"decomposition_profile".to_string(),
decomposition_observation,
);
}
}
if payload.is_empty() {
None
} else {
Some(Value::Object(payload))
}
}
pub(crate) fn truncate_text(input: &str, max_len: usize) -> String {
if max_len == 0 {
return String::new();
}
if input.len() <= max_len {
return input.to_string();
}
let mut end = 0usize;
for (idx, ch) in input.char_indices() {
let next = idx + ch.len_utf8();
if next > max_len {
break;
}
end = next;
}
    input[..end].to_string()
}
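
// Minimal smoke tests for the pure helpers above. These are a sketch of the
// expected behavior (derived from the function bodies in this module, not from
// an external spec) and assume the plan types referenced elsewhere compile.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn normalize_prompt_collapses_whitespace_and_case() {
        assert_eq!(normalize_prompt("  Hello   WORLD "), "hello world");
    }

    #[test]
    fn step_ids_are_normalized_to_snake_case() {
        assert_eq!(normalize_workflow_step_id(" Draft--Brief! "), "draft_brief");
        assert!(workflow_step_id_has_supported_shape("collect_inputs"));
        assert!(!workflow_step_id_has_supported_shape("1_bad"));
    }

    #[test]
    fn balanced_json_fragment_is_extracted_from_noise() {
        let text = "prefix {\"a\": [1, 2]} suffix";
        assert_eq!(extract_balanced_json_fragment(text), Some("{\"a\": [1, 2]}"));
    }

    #[test]
    fn truncate_text_respects_char_boundaries() {
        // "é" is two bytes, so a 3-byte budget must stop after it, not inside it.
        assert_eq!(truncate_text("héllo", 3), "hé");
    }
}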