#[derive(Clone)]
pub struct PackBuilderTool {
state: AppState,
plans: Arc<RwLock<HashMap<String, PreparedPlan>>>,
plans_path: PathBuf,
last_plan_by_session: Arc<RwLock<HashMap<String, String>>>,
workflows: Arc<RwLock<HashMap<String, WorkflowRecord>>>,
workflows_path: PathBuf,
}
impl PackBuilderTool {
pub fn new(state: AppState) -> Self {
let workflows_path = resolve_pack_builder_workflows_path();
let plans_path = resolve_pack_builder_plans_path();
Self {
state,
plans: Arc::new(RwLock::new(load_plans(&plans_path))),
plans_path,
last_plan_by_session: Arc::new(RwLock::new(HashMap::new())),
workflows: Arc::new(RwLock::new(load_workflows(&workflows_path))),
workflows_path,
}
}
async fn upsert_workflow(
&self,
event_type: &str,
status: WorkflowStatus,
plan_id: &str,
session_id: Option<&str>,
thread_key: Option<&str>,
goal: &str,
metadata: &Value,
) {
let now = now_ms();
let workflow_id = format!("wf-{}", plan_id);
let mut workflows = self.workflows.write().await;
let created_at_ms = workflows
.get(plan_id)
.map(|row| row.created_at_ms)
.unwrap_or(now);
workflows.insert(
plan_id.to_string(),
WorkflowRecord {
workflow_id: workflow_id.clone(),
plan_id: plan_id.to_string(),
session_id: session_id.map(ToString::to_string),
thread_key: thread_key.map(ToString::to_string),
goal: goal.to_string(),
status: status.clone(),
metadata: metadata.clone(),
created_at_ms,
updated_at_ms: now,
},
);
retain_recent_workflows(&mut workflows, 256);
save_workflows(&self.workflows_path, &workflows);
drop(workflows);
self.state.event_bus.publish(tandem_types::EngineEvent::new(
event_type,
json!({
"sessionID": session_id.unwrap_or_default(),
"threadKey": thread_key.unwrap_or_default(),
"planID": plan_id,
"status": workflow_status_label(&status),
"metadata": metadata,
}),
));
}
async fn resolve_plan_id_from_session(
&self,
session_id: Option<&str>,
thread_key: Option<&str>,
) -> Option<String> {
if let Some(session) = session_id {
if let Some(thread) = thread_key {
let scoped_key = session_thread_scope_key(session, Some(thread));
if let Some(found) = self
.last_plan_by_session
.read()
.await
.get(&scoped_key)
.cloned()
{
return Some(found);
}
}
}
if let Some(session) = session_id {
if let Some(found) = self.last_plan_by_session.read().await.get(session).cloned() {
return Some(found);
}
}
let workflows = self.workflows.read().await;
let mut best: Option<(&String, u64)> = None;
for (plan_id, wf) in workflows.iter() {
if !matches!(wf.status, WorkflowStatus::PreviewPending) {
continue;
}
if session_id.is_some() && wf.session_id.as_deref() != session_id {
continue;
}
if let Some(thread) = thread_key {
if wf.thread_key.as_deref() != Some(thread) {
continue;
}
}
let ts = wf.updated_at_ms;
if best.map(|(_, b)| ts > b).unwrap_or(true) {
best = Some((plan_id, ts));
}
}
best.map(|(plan_id, _)| plan_id.clone())
}
fn emit_metric(
&self,
metric: &str,
plan_id: &str,
status: &str,
session_id: Option<&str>,
thread_key: Option<&str>,
) {
let surface = infer_surface(thread_key);
self.state.event_bus.publish(tandem_types::EngineEvent::new(
"pack_builder.metric",
json!({
"metric": metric,
"value": 1,
"surface": surface,
"planID": plan_id,
"status": status,
"sessionID": session_id.unwrap_or_default(),
"threadKey": thread_key.unwrap_or_default(),
}),
));
}
}
#[derive(Debug, Clone, Deserialize, Default)]
struct PackBuilderInput {
#[serde(default)]
mode: Option<String>,
#[serde(default)]
goal: Option<String>,
#[serde(default)]
auto_apply: Option<bool>,
#[serde(default)]
selected_connectors: Vec<String>,
#[serde(default)]
plan_id: Option<String>,
#[serde(default)]
approve_connector_registration: Option<bool>,
#[serde(default)]
approve_pack_install: Option<bool>,
#[serde(default)]
approve_enable_routines: Option<bool>,
#[serde(default)]
schedule: Option<PreviewScheduleInput>,
#[serde(default, rename = "__session_id")]
session_id: Option<String>,
#[serde(default)]
thread_key: Option<String>,
#[serde(default)]
secret_refs_confirmed: Option<Value>,
#[serde(default)]
execution_mode: Option<String>,
#[serde(default)]
max_agents: Option<u32>,
}
#[derive(Debug, Clone, Deserialize, Default)]
struct PreviewScheduleInput {
#[serde(default)]
interval_seconds: Option<u64>,
#[serde(default)]
cron: Option<String>,
#[serde(default)]
timezone: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ConnectorCandidate {
slug: String,
name: String,
description: String,
documentation_url: String,
transport_url: String,
requires_auth: bool,
requires_setup: bool,
tool_count: usize,
score: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PreparedPlan {
plan_id: String,
goal: String,
pack_id: String,
pack_name: String,
version: String,
capabilities_required: Vec<String>,
capabilities_optional: Vec<String>,
recommended_connectors: Vec<ConnectorCandidate>,
selected_connector_slugs: Vec<String>,
selected_mcp_tools: Vec<String>,
fallback_warnings: Vec<String>,
required_secrets: Vec<String>,
generated_zip_path: PathBuf,
routine_ids: Vec<String>,
routine_template: RoutineTemplate,
created_at_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum WorkflowStatus {
PreviewPending,
ApplyBlockedMissingSecrets,
ApplyBlockedAuth,
ApplyComplete,
Cancelled,
Error,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WorkflowRecord {
workflow_id: String,
plan_id: String,
session_id: Option<String>,
thread_key: Option<String>,
goal: String,
status: WorkflowStatus,
metadata: Value,
created_at_ms: u64,
updated_at_ms: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RoutineTemplate {
routine_id: String,
name: String,
timezone: String,
schedule: RoutineSchedule,
entrypoint: String,
allowed_tools: Vec<String>,
}
fn automation_v2_schedule_from_routine(
schedule: &RoutineSchedule,
timezone: &str,
) -> crate::AutomationV2Schedule {
match schedule {
RoutineSchedule::IntervalSeconds { seconds } => crate::AutomationV2Schedule {
schedule_type: crate::AutomationV2ScheduleType::Interval,
cron_expression: None,
interval_seconds: Some(*seconds),
timezone: timezone.to_string(),
misfire_policy: crate::RoutineMisfirePolicy::RunOnce,
},
RoutineSchedule::Cron { expression } => crate::AutomationV2Schedule {
schedule_type: crate::AutomationV2ScheduleType::Cron,
cron_expression: Some(expression.clone()),
interval_seconds: None,
timezone: timezone.to_string(),
misfire_policy: crate::RoutineMisfirePolicy::RunOnce,
},
}
}
fn build_pack_builder_automation(
plan: &PreparedPlan,
routine_id: &str,
execution_mode: &str,
max_agents: u32,
registered_servers: &[String],
routine_enabled: bool,
) -> crate::AutomationV2Spec {
let now = now_ms();
let automation_id = format!("automation.{}", routine_id);
crate::AutomationV2Spec {
automation_id: automation_id.clone(),
name: format!("{} automation", plan.pack_name),
description: Some(format!(
"Pack Builder automation for `{}` generated from plan `{}`.",
plan.pack_name, plan.plan_id
)),
status: crate::AutomationV2Status::Paused,
schedule: automation_v2_schedule_from_routine(
&plan.routine_template.schedule,
&plan.routine_template.timezone,
),
knowledge: tandem_orchestrator::KnowledgeBinding::default(),
agents: vec![crate::AutomationAgentProfile {
agent_id: "pack_builder_agent".to_string(),
template_id: None,
display_name: plan.pack_name.clone(),
avatar_url: None,
model_policy: None,
skills: vec![plan.pack_id.clone()],
tool_policy: crate::AutomationAgentToolPolicy {
allowlist: plan.routine_template.allowed_tools.clone(),
denylist: Vec::new(),
},
mcp_policy: crate::AutomationAgentMcpPolicy {
allowed_servers: registered_servers.to_vec(),
allowed_tools: None,
},
approval_policy: None,
}],
flow: crate::AutomationFlowSpec {
nodes: vec![crate::AutomationFlowNode {
node_id: "pack_builder_execute".to_string(),
agent_id: "pack_builder_agent".to_string(),
objective: format!(
"Execute the installed pack `{}` for this goal: {}",
plan.pack_name, plan.goal
),
knowledge: Default::default(),
depends_on: Vec::new(),
input_refs: Vec::new(),
output_contract: Some(crate::AutomationFlowOutputContract {
kind: "report_markdown".to_string(),
validator: Some(crate::AutomationOutputValidatorKind::GenericArtifact),
enforcement: None,
schema: None,
summary_guidance: None,
}),
tool_policy: None,
mcp_policy: None,
retry_policy: Some(json!({ "max_attempts": 3 })),
timeout_ms: None,
max_tool_calls: None,
stage_kind: Some(crate::AutomationNodeStageKind::Workstream),
gate: None,
metadata: Some(json!({
"builder": {
"origin": "pack_builder",
"task_kind": "pack_recipe",
"execution_mode": execution_mode,
},
"pack_builder": {
"pack_id": plan.pack_id,
"pack_name": plan.pack_name,
"plan_id": plan.plan_id,
"routine_id": routine_id,
}
})),
}],
},
execution: crate::AutomationExecutionPolicy {
profile: None,
max_parallel_agents: Some(max_agents.clamp(1, 16)),
max_total_runtime_ms: None,
max_total_tool_calls: None,
max_total_tokens: None,
max_total_cost_usd: None,
},
output_targets: vec![format!("run/{routine_id}/report.md")],
created_at_ms: now,
updated_at_ms: now,
creator_id: "pack_builder".to_string(),
workspace_root: None,
metadata: Some(json!({
"origin": "pack_builder",
"pack_builder_plan_id": plan.plan_id,
"pack_id": plan.pack_id,
"pack_name": plan.pack_name,
"goal": plan.goal,
"execution_mode": execution_mode,
"routine_id": routine_id,
"activation_mode": "routine_wrapper_mirror",
"routine_enabled": routine_enabled,
"registered_servers": registered_servers,
})),
next_fire_at_ms: None,
last_fired_at_ms: None,
scope_policy: None,
watch_conditions: Vec::new(),
handoff_config: None,
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CapabilityNeed {
id: String,
external: bool,
query_terms: Vec<String>,
}
#[derive(Debug, Clone)]
struct CatalogServer {
slug: String,
name: String,
description: String,
documentation_url: String,
transport_url: String,
requires_auth: bool,
requires_setup: bool,
tool_names: Vec<String>,
}
#[derive(Clone)]
struct McpBridgeTool {
schema: ToolSchema,
mcp: tandem_runtime::McpRegistry,
server_name: String,
tool_name: String,
}
#[async_trait]
impl Tool for McpBridgeTool {
fn schema(&self) -> ToolSchema {
self.schema.clone()
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
self.mcp
.call_tool(&self.server_name, &self.tool_name, args)
.await
.map_err(anyhow::Error::msg)
}
}
#[async_trait]
impl Tool for PackBuilderTool {
fn schema(&self) -> ToolSchema {
ToolSchema::new(
"pack_builder",
"MCP-first Tandem pack builder with preview/apply phases",
json!({
"type": "object",
"properties": {
"mode": {"type": "string", "enum": ["preview", "apply", "cancel", "pending"]},
"goal": {"type": "string"},
"auto_apply": {"type": "boolean"},
"plan_id": {"type": "string"},
"thread_key": {"type": "string"},
"secret_refs_confirmed": {"oneOf":[{"type":"boolean"},{"type":"array","items":{"type":"string"}}]},
"selected_connectors": {"type": "array", "items": {"type": "string"}},
"approve_connector_registration": {"type": "boolean"},
"approve_pack_install": {"type": "boolean"},
"approve_enable_routines": {"type": "boolean"},
"execution_mode": {
"type": "string",
"enum": ["single", "team", "swarm"],
"description": "Execution architecture: single agent, orchestrated team, or parallel swarm"
},
"max_agents": {"type": "integer", "minimum": 2, "maximum": 32},
"schedule": {
"type": "object",
"properties": {
"interval_seconds": {"type": "integer", "minimum": 30},
"cron": {"type": "string"},
"timezone": {"type": "string"}
}
}
},
"required": ["mode"]
}),
)
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let mut input: PackBuilderInput = serde_json::from_value(args).unwrap_or_default();
let mut mode = input
.mode
.as_deref()
.unwrap_or("preview")
.trim()
.to_ascii_lowercase();
if mode == "apply" && input.plan_id.is_none() {
input.plan_id = self
.resolve_plan_id_from_session(
input.session_id.as_deref(),
input.thread_key.as_deref(),
)
.await;
}
if mode == "preview" {
let goal_text = input.goal.as_deref().map(str::trim).unwrap_or("");
if is_confirmation_goal_text(goal_text) {
if let Some(last_plan_id) = self
.resolve_plan_id_from_session(
input.session_id.as_deref(),
input.thread_key.as_deref(),
)
.await
{
input.mode = Some("apply".to_string());
input.plan_id = Some(last_plan_id);
input.approve_pack_install = Some(true);
input.approve_connector_registration = Some(true);
input.approve_enable_routines = Some(true);
mode = "apply".to_string();
}
}
}
match mode.as_str() {
"cancel" => self.cancel(input).await,
"pending" => self.pending(input).await,
"apply" => self.apply(input).await,
_ => self.preview(input).await,
}
}
}
impl PackBuilderTool {
async fn preview(&self, input: PackBuilderInput) -> anyhow::Result<ToolResult> {
let goal = input
.goal
.as_deref()
.map(str::trim)
.filter(|v| !v.is_empty())
.unwrap_or("Create a useful automation pack")
.to_string();
let needs = infer_capabilities_from_goal(&goal);
let all_catalog = catalog_servers();
let builtin_tools = available_builtin_tools(&self.state).await;
let mut recommended_connectors = Vec::<ConnectorCandidate>::new();
let mut selected_connector_slugs = BTreeSet::<String>::new();
let mut selected_mcp_tools = BTreeSet::<String>::new();
let mut required = Vec::<String>::new();
let mut optional = Vec::<String>::new();
let mut fallback_warnings = Vec::<String>::new();
let mut unresolved_external_needs = Vec::<String>::new();
let mut resolved_needs = BTreeSet::<String>::new();
for need in &needs {
if need.external {
required.push(need.id.clone());
} else {
optional.push(need.id.clone());
}
if !need.external {
continue;
}
if need_satisfied_by_builtin(&builtin_tools, need) {
resolved_needs.insert(need.id.clone());
continue;
}
unresolved_external_needs.push(need.id.clone());
let mut candidates = score_candidates_for_need(&all_catalog, need);
if candidates.is_empty() {
fallback_warnings.push(format!(
"No MCP connector found for capability `{}`. Falling back to built-in tools.",
need.id
));
continue;
}
candidates.sort_by(|a, b| b.score.cmp(&a.score).then_with(|| a.slug.cmp(&b.slug)));
if let Some(best) = candidates.first() {
if should_auto_select_connector(need, best) {
selected_connector_slugs.insert(best.slug.clone());
resolved_needs.insert(need.id.clone());
if let Some(server) = all_catalog.iter().find(|s| s.slug == best.slug) {
for tool in server.tool_names.iter().take(3) {
selected_mcp_tools.insert(format!(
"mcp.{}.{}",
namespace_segment(&server.slug),
namespace_segment(tool)
));
}
}
}
}
recommended_connectors.extend(candidates.into_iter().take(3));
}
recommended_connectors
.sort_by(|a, b| b.score.cmp(&a.score).then_with(|| a.slug.cmp(&b.slug)));
recommended_connectors.dedup_by(|a, b| a.slug == b.slug);
let schedule = build_schedule(input.schedule.as_ref());
let pack_slug = goal_to_slug(&goal);
let pack_id = format!("tpk_pack_builder_{}", pack_slug);
let pack_name = format!("pack-builder-{}", pack_slug);
let version = "0.4.1".to_string();
let zips_dir = resolve_pack_builder_zips_dir();
fs::create_dir_all(&zips_dir)?;
let stage_id = Uuid::new_v4();
let pack_root = zips_dir.join(format!("stage-{}", stage_id)).join("pack");
fs::create_dir_all(pack_root.join("agents"))?;
fs::create_dir_all(pack_root.join("missions"))?;
fs::create_dir_all(pack_root.join("routines"))?;
let mission_id = "default".to_string();
let routine_id = "default".to_string();
let tool_ids = selected_mcp_tools.iter().cloned().collect::<Vec<_>>();
let routine_template = RoutineTemplate {
routine_id: format!("{}.{}", pack_id, routine_id),
name: format!("{} routine", pack_name),
timezone: schedule.2.clone(),
schedule: schedule.0.clone(),
entrypoint: "mission.default".to_string(),
allowed_tools: build_allowed_tools(&tool_ids, &needs),
};
let mission_yaml = render_mission_yaml(&mission_id, &tool_ids, &needs);
let agent_md = render_agent_md(&tool_ids, &goal);
let routine_yaml = render_routine_yaml(
&routine_id,
&schedule.0,
&schedule.1,
&schedule.2,
&routine_template.allowed_tools,
);
let manifest_yaml = render_manifest_yaml(
&pack_id,
&pack_name,
&version,
&required,
&optional,
&mission_id,
&routine_id,
);
fs::write(pack_root.join("missions/default.yaml"), mission_yaml)?;
fs::write(pack_root.join("agents/default.md"), agent_md)?;
fs::write(pack_root.join("routines/default.yaml"), routine_yaml)?;
fs::write(pack_root.join("tandempack.yaml"), manifest_yaml)?;
fs::write(pack_root.join("README.md"), "# Generated by pack_builder\n")?;
let zip_path = pack_root
.parent()
.expect("pack_root always has a parent staging dir")
.join(format!("{}-{}.zip", pack_name, version));
zip_dir(&pack_root, &zip_path)?;
let plan_id = format!("plan-{}", Uuid::new_v4());
let selected_connector_slugs = selected_connector_slugs.into_iter().collect::<Vec<_>>();
let required_secrets =
derive_required_secret_refs_for_selected(&all_catalog, &selected_connector_slugs);
let connector_selection_required = unresolved_external_needs
.iter()
.any(|need_id| !resolved_needs.contains(need_id));
let auto_apply_requested = input.auto_apply.unwrap_or(true);
let auto_apply_ready = auto_apply_requested
&& !connector_selection_required
&& required_secrets.is_empty()
&& fallback_warnings.is_empty();
let prepared = PreparedPlan {
plan_id: plan_id.clone(),
goal: goal.clone(),
pack_id: pack_id.clone(),
pack_name: pack_name.clone(),
version,
capabilities_required: required.clone(),
capabilities_optional: optional.clone(),
recommended_connectors: recommended_connectors.clone(),
selected_connector_slugs: selected_connector_slugs.clone(),
selected_mcp_tools: tool_ids.clone(),
fallback_warnings: fallback_warnings.clone(),
required_secrets: required_secrets.clone(),
generated_zip_path: zip_path.clone(),
routine_ids: vec![routine_template.routine_id.clone()],
routine_template,
created_at_ms: now_ms(),
};
{
let mut plans = self.plans.write().await;
plans.insert(plan_id.clone(), prepared);
retain_recent_plans(&mut plans, 256);
save_plans(&self.plans_path, &plans);
}
if let Some(session_id) = input
.session_id
.as_deref()
.map(str::trim)
.filter(|v| !v.is_empty())
{
let mut last = self.last_plan_by_session.write().await;
last.insert(session_id.to_string(), plan_id.clone());
if let Some(thread_key) = input
.thread_key
.as_deref()
.map(str::trim)
.filter(|v| !v.is_empty())
{
last.insert(
session_thread_scope_key(session_id, Some(thread_key)),
plan_id.clone(),
);
}
}
let output = json!({
"workflow_id": format!("wf-{}", plan_id),
"mode": "preview",
"plan_id": plan_id,
"session_id": input.session_id,
"thread_key": input.thread_key,
"goal": goal,
"pack": {
"pack_id": pack_id,
"name": pack_name,
"version": "0.4.1"
},
"connector_candidates": recommended_connectors,
"selected_connectors": selected_connector_slugs,
"connector_selection_required": connector_selection_required,
"mcp_mapping": tool_ids,
"fallback_warnings": fallback_warnings,
"required_secrets": required_secrets,
"zip_path": zip_path.to_string_lossy(),
"auto_apply_requested": auto_apply_requested,
"auto_apply_ready": auto_apply_ready,
"status": "preview_pending",
"next_actions": build_preview_next_actions(
connector_selection_required,
&required_secrets,
!selected_connector_slugs.is_empty(),
),
"approval_required": {
"register_connectors": false,
"install_pack": false,
"enable_routines": false
}
});
self.emit_metric(
"pack_builder.preview.count",
plan_id.as_str(),
"preview_pending",
input.session_id.as_deref(),
input.thread_key.as_deref(),
);
if auto_apply_ready {
let applied = self
.apply(PackBuilderInput {
mode: Some("apply".to_string()),
goal: None,
auto_apply: Some(false),
selected_connectors: selected_connector_slugs.clone(),
plan_id: Some(plan_id.clone()),
approve_connector_registration: Some(true),
approve_pack_install: Some(true),
approve_enable_routines: Some(true),
schedule: None,
session_id: input.session_id.clone(),
thread_key: input.thread_key.clone(),
secret_refs_confirmed: Some(json!(true)),
execution_mode: input.execution_mode.clone(),
max_agents: input.max_agents,
})
.await?;
let mut metadata = applied.metadata.clone();
if let Some(obj) = metadata.as_object_mut() {
obj.insert("auto_applied_from_preview".to_string(), json!(true));
obj.insert("preview_plan_id".to_string(), json!(plan_id));
}
self.upsert_workflow(
"pack_builder.apply_completed",
WorkflowStatus::ApplyComplete,
plan_id.as_str(),
input.session_id.as_deref(),
input.thread_key.as_deref(),
goal.as_str(),
&metadata,
)
.await;
return Ok(ToolResult {
output: render_pack_builder_apply_output(&metadata),
metadata,
});
}
self.upsert_workflow(
"pack_builder.preview_ready",
WorkflowStatus::PreviewPending,
plan_id.as_str(),
input.session_id.as_deref(),
input.thread_key.as_deref(),
goal.as_str(),
&output,
)
.await;
Ok(ToolResult {
output: render_pack_builder_preview_output(&output),
metadata: output,
})
}
async fn apply(&self, input: PackBuilderInput) -> anyhow::Result<ToolResult> {
let resolved_plan_id = if input.plan_id.is_none() {
self.resolve_plan_id_from_session(
input.session_id.as_deref(),
input.thread_key.as_deref(),
)
.await
} else {
input.plan_id.clone()
};
let Some(plan_id) = resolved_plan_id.as_deref() else {
self.emit_metric(
"pack_builder.apply.wrong_plan_prevented",
"unknown",
"error",
input.session_id.as_deref(),
input.thread_key.as_deref(),
);
let output = json!({"error":"plan_id is required for apply"});
self.upsert_workflow(
"pack_builder.error",
WorkflowStatus::Error,
"unknown",
input.session_id.as_deref(),
input.thread_key.as_deref(),
input.goal.as_deref().unwrap_or_default(),
&output,
)
.await;
return Ok(ToolResult {
output: render_pack_builder_apply_output(&output),
metadata: output,
});
};
let plan = {
let guard = self.plans.read().await;
guard.get(plan_id).cloned()
};
let Some(plan) = plan else {
self.emit_metric(
"pack_builder.apply.wrong_plan_prevented",
plan_id,
"error",
input.session_id.as_deref(),
input.thread_key.as_deref(),
);
let output = json!({"error":"unknown plan_id", "plan_id": plan_id});
self.upsert_workflow(
"pack_builder.error",
WorkflowStatus::Error,
plan_id,
input.session_id.as_deref(),
input.thread_key.as_deref(),
input.goal.as_deref().unwrap_or_default(),
&output,
)
.await;
return Ok(ToolResult {
output: render_pack_builder_apply_output(&output),
metadata: output,
});
};
let session_id = input.session_id.as_deref();
let thread_key = input.thread_key.as_deref();
if self
.workflows
.read()
.await
.get(plan_id)
.map(|wf| matches!(wf.status, WorkflowStatus::Cancelled))
.unwrap_or(false)
{
let output = json!({
"error":"plan_cancelled",
"plan_id": plan_id,
"status":"cancelled",
"next_actions": ["Create a new preview to continue."]
});
return Ok(ToolResult {
output: render_pack_builder_apply_output(&output),
metadata: output,
});
}
self.emit_metric(
"pack_builder.apply.count",
plan_id,
"apply_started",
session_id,
thread_key,
);
if input.approve_pack_install != Some(true) {
let output = json!({
"error": "approval_required",
"required": {
"approve_pack_install": true
},
"status": "error"
});
self.upsert_workflow(
"pack_builder.error",
WorkflowStatus::Error,
plan_id,
session_id,
thread_key,
&plan.goal,
&output,
)
.await;
return Ok(ToolResult {
output: render_pack_builder_apply_output(&output),
metadata: output,
});
}
let all_catalog = catalog_servers();
let selected = if input.selected_connectors.is_empty() {
plan.selected_connector_slugs.clone()
} else {
input.selected_connectors.clone()
};
if !selected.is_empty() && input.approve_connector_registration != Some(true) {
let output = json!({
"error": "approval_required",
"required": {
"approve_connector_registration": true,
"approve_pack_install": true
},
"status": "error"
});
self.upsert_workflow(
"pack_builder.error",
WorkflowStatus::Error,
plan_id,
session_id,
thread_key,
&plan.goal,
&output,
)
.await;
return Ok(ToolResult {
output: render_pack_builder_apply_output(&output),
metadata: output,
});
}
if !plan.required_secrets.is_empty()
&& !secret_refs_confirmed(&input.secret_refs_confirmed, &plan.required_secrets)
{
let output = json!({
"workflow_id": format!("wf-{}", plan.plan_id),
"mode": "apply",
"plan_id": plan.plan_id,
"session_id": input.session_id,
"thread_key": input.thread_key,
"goal": plan.goal,
"status": "apply_blocked_missing_secrets",
"required_secrets": plan.required_secrets,
"next_actions": [
"Set required secrets in engine settings/environment.",
"Re-run apply with `secret_refs_confirmed` after secrets are set."
],
});
self.upsert_workflow(
"pack_builder.apply_blocked",
WorkflowStatus::ApplyBlockedMissingSecrets,
plan_id,
session_id,
thread_key,
&plan.goal,
&output,
)
.await;
self.emit_metric(
"pack_builder.apply.blocked_missing_secrets",
plan_id,
"apply_blocked_missing_secrets",
session_id,
thread_key,
);
return Ok(ToolResult {
output: render_pack_builder_apply_output(&output),
metadata: output,
});
}
let auth_blocked = selected.iter().any(|slug| {
plan.recommended_connectors
.iter()
.any(|c| &c.slug == slug && (c.requires_setup || c.transport_url.contains('{')))
});
if auth_blocked {
let output = json!({
"workflow_id": format!("wf-{}", plan.plan_id),
"mode": "apply",
"plan_id": plan.plan_id,
"session_id": input.session_id,
"thread_key": input.thread_key,
"goal": plan.goal,
"status": "apply_blocked_auth",
"selected_connectors": selected,
"next_actions": [
"Complete connector setup/auth from the connector documentation.",
"Re-run apply after connector auth is completed."
],
});
self.upsert_workflow(
"pack_builder.apply_blocked",
WorkflowStatus::ApplyBlockedAuth,
plan_id,
session_id,
thread_key,
&plan.goal,
&output,
)
.await;
self.emit_metric(
"pack_builder.apply.blocked_auth",
plan_id,
"apply_blocked_auth",
session_id,
thread_key,
);
return Ok(ToolResult {
output: render_pack_builder_apply_output(&output),
metadata: output,
});
}
self.state.event_bus.publish(tandem_types::EngineEvent::new(
"pack_builder.apply_started",
json!({
"sessionID": session_id.unwrap_or_default(),
"threadKey": thread_key.unwrap_or_default(),
"planID": plan_id,
"status": "apply_started",
}),
));
if !plan.generated_zip_path.exists() {
let output = json!({
"workflow_id": format!("wf-{}", plan.plan_id),
"mode": "apply",
"plan_id": plan.plan_id,
"session_id": input.session_id,
"thread_key": input.thread_key,
"goal": plan.goal,
"status": "apply_blocked_missing_preview_artifacts",
"error": "preview_artifacts_missing",
"next_actions": [
"Run a new Pack Builder preview for this goal.",
"Confirm apply from the new preview."
]
});
self.upsert_workflow(
"pack_builder.apply_blocked",
WorkflowStatus::Error,
plan_id,
session_id,
thread_key,
&plan.goal,
&output,
)
.await;
return Ok(ToolResult {
output: render_pack_builder_apply_output(&output),
metadata: output,
});
}
let mut connector_results = Vec::<Value>::new();
let mut registered_servers = Vec::<String>::new();
for slug in &selected {
let Some(server) = all_catalog.iter().find(|s| &s.slug == slug) else {
connector_results
.push(json!({"slug": slug, "ok": false, "error": "not_in_catalog"}));
continue;
};
let transport = if server.transport_url.contains('{') || server.transport_url.is_empty()
{
connector_results.push(json!({
"slug": server.slug,
"ok": false,
"error": "transport_requires_manual_setup",
"documentation_url": server.documentation_url
}));
continue;
} else {
server.transport_url.clone()
};
let name = server.slug.clone();
self.state
.mcp
.add_or_update(name.clone(), transport, HashMap::new(), true)
.await;
let connected = self.state.mcp.connect(&name).await;
let tool_count = if connected {
sync_mcp_tools_for_server(&self.state, &name).await
} else {
0
};
if connected {
registered_servers.push(name.clone());
}
connector_results.push(json!({
"slug": server.slug,
"ok": connected,
"registered_name": name,
"tool_count": tool_count,
"documentation_url": server.documentation_url,
"requires_auth": server.requires_auth
}));
}
let installed = self
.state
.pack_manager
.install(PackInstallRequest {
path: Some(plan.generated_zip_path.to_string_lossy().to_string()),
url: None,
source: json!({"kind":"pack_builder", "plan_id": plan.plan_id, "goal": plan.goal}),
})
.await?;
let mut routines_registered = Vec::<String>::new();
let mut automations_registered = Vec::<String>::new();
for routine_id in &plan.routine_ids {
let exec_mode = input
.execution_mode
.as_deref()
.map(str::trim)
.filter(|v| !v.is_empty())
.unwrap_or("team");
let max_agents = input.max_agents.unwrap_or(4);
let mut routine = RoutineSpec {
routine_id: routine_id.clone(),
name: plan.routine_template.name.clone(),
status: RoutineStatus::Active,
schedule: plan.routine_template.schedule.clone(),
timezone: plan.routine_template.timezone.clone(),
misfire_policy: RoutineMisfirePolicy::RunOnce,
entrypoint: plan.routine_template.entrypoint.clone(),
args: json!({
"prompt": plan.goal,
"mode": exec_mode,
"uses_external_integrations": true,
"pack_id": plan.pack_id,
"pack_name": plan.pack_name,
"pack_builder_plan_id": plan.plan_id,
"orchestration": {
"execution_mode": exec_mode,
"max_agents": max_agents,
"objective": plan.goal,
},
}),
allowed_tools: plan.routine_template.allowed_tools.clone(),
output_targets: vec![format!("run/{}/report.md", routine_id)],
creator_type: "agent".to_string(),
creator_id: "pack_builder".to_string(),
requires_approval: false,
external_integrations_allowed: true,
next_fire_at_ms: None,
last_fired_at_ms: None,
};
if input.approve_enable_routines == Some(false) {
routine.status = RoutineStatus::Paused;
}
let automation = build_pack_builder_automation(
&plan,
routine_id,
exec_mode,
max_agents,
®istered_servers,
input.approve_enable_routines != Some(false),
);
let stored_automation = self.state.put_automation_v2(automation).await?;
automations_registered.push(stored_automation.automation_id.clone());
let stored = self
.state
.put_routine(routine)
.await
.map_err(|err| anyhow::anyhow!("failed to register routine: {:?}", err))?;
routines_registered.push(stored.routine_id);
}
let preset_path = save_pack_preset(&plan, ®istered_servers)?;
let output = json!({
"workflow_id": format!("wf-{}", plan.plan_id),
"mode": "apply",
"plan_id": plan.plan_id,
"session_id": input.session_id,
"thread_key": input.thread_key,
"capabilities": {
"required": plan.capabilities_required,
"optional": plan.capabilities_optional
},
"pack_installed": {
"pack_id": installed.pack_id,
"name": installed.name,
"version": installed.version,
"install_path": installed.install_path,
},
"connectors": connector_results,
"registered_servers": registered_servers,
"automations_registered": automations_registered,
"routines_registered": routines_registered,
"routines_enabled": input.approve_enable_routines != Some(false),
"fallback_warnings": plan.fallback_warnings,
"status": "apply_complete",
"next_actions": [
"Review the installed pack in Packs view.",
"Routine is enabled by default and will run on schedule."
],
"pack_preset": {
"path": preset_path.to_string_lossy().to_string(),
"required_secrets": plan.required_secrets,
"selected_tools": plan.selected_mcp_tools,
}
});
self.upsert_workflow(
"pack_builder.apply_completed",
WorkflowStatus::ApplyComplete,
plan_id,
session_id,
thread_key,
&plan.goal,
&output,
)
.await;
self.emit_metric(
"pack_builder.apply.success",
plan_id,
"apply_complete",
session_id,
thread_key,
);
Ok(ToolResult {
output: render_pack_builder_apply_output(&output),
metadata: output,
})
}
async fn cancel(&self, input: PackBuilderInput) -> anyhow::Result<ToolResult> {
let plan_id = if let Some(plan_id) = input.plan_id.as_deref().map(str::trim) {
if !plan_id.is_empty() {
Some(plan_id.to_string())
} else {
None
}
} else {
self.resolve_plan_id_from_session(
input.session_id.as_deref(),
input.thread_key.as_deref(),
)
.await
};
let Some(plan_id) = plan_id else {
let output = json!({"error":"plan_id is required for cancel"});
return Ok(ToolResult {
output: render_pack_builder_apply_output(&output),
metadata: output,
});
};
let goal = self
.plans
.read()
.await
.get(&plan_id)
.map(|p| p.goal.clone())
.unwrap_or_default();
let output = json!({
"workflow_id": format!("wf-{}", plan_id),
"mode": "cancel",
"plan_id": plan_id,
"session_id": input.session_id,
"thread_key": input.thread_key,
"goal": goal,
"status": "cancelled",
"next_actions": ["Create a new preview when ready."]
});
self.upsert_workflow(
"pack_builder.cancelled",
WorkflowStatus::Cancelled,
output
.get("plan_id")
.and_then(Value::as_str)
.unwrap_or_default(),
input.session_id.as_deref(),
input.thread_key.as_deref(),
output
.get("goal")
.and_then(Value::as_str)
.unwrap_or_default(),
&output,
)
.await;
self.emit_metric(
"pack_builder.apply.cancelled",
output
.get("plan_id")
.and_then(Value::as_str)
.unwrap_or_default(),
"cancelled",
input.session_id.as_deref(),
input.thread_key.as_deref(),
);
Ok(ToolResult {
output: "Pack Builder Apply Cancelled\n- Pending plan cancelled.".to_string(),
metadata: output,
})
}
async fn pending(&self, input: PackBuilderInput) -> anyhow::Result<ToolResult> {
let plan_id = if let Some(plan_id) = input.plan_id.as_deref().map(str::trim) {
if !plan_id.is_empty() {
Some(plan_id.to_string())
} else {
None
}
} else {
self.resolve_plan_id_from_session(
input.session_id.as_deref(),
input.thread_key.as_deref(),
)
.await
};
let Some(plan_id) = plan_id else {
let output = json!({"status":"none","pending":null});
return Ok(ToolResult {
output: "No pending pack-builder plan for this session.".to_string(),
metadata: output,
});
};
let workflows = self.workflows.read().await;
let Some(record) = workflows.get(&plan_id) else {
let output = json!({"status":"none","plan_id":plan_id});
return Ok(ToolResult {
output: "No pending pack-builder plan found.".to_string(),
metadata: output,
});
};
let output = json!({
"status":"ok",
"pending": record,
"plan_id": plan_id
});
Ok(ToolResult {
output: serde_json::to_string_pretty(&output).unwrap_or_else(|_| output.to_string()),
metadata: output,
})
}
}
fn render_pack_builder_preview_output(meta: &Value) -> String {
let goal = meta
.get("goal")
.and_then(Value::as_str)
.unwrap_or("automation goal");
let plan_id = meta.get("plan_id").and_then(Value::as_str).unwrap_or("-");
let pack_name = meta
.get("pack")
.and_then(|v| v.get("name"))
.and_then(Value::as_str)
.unwrap_or("generated-pack");
let pack_id = meta
.get("pack")
.and_then(|v| v.get("pack_id"))
.and_then(Value::as_str)
.unwrap_or("-");
let auto_apply_ready = meta
.get("auto_apply_ready")
.and_then(Value::as_bool)
.unwrap_or(false);
let connector_selection_required = meta
.get("connector_selection_required")
.and_then(Value::as_bool)
.unwrap_or(false);
let selected_connectors = meta
.get("selected_connectors")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(|v| format!("- {}", v))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let required_secrets = meta
.get("required_secrets")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(|v| format!("- {}", v))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let fallback_warnings = meta
.get("fallback_warnings")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(|v| format!("- {}", v))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let mut lines = vec![
"Pack Builder Preview".to_string(),
format!("- Goal: {}", goal),
format!("- Plan ID: {}", plan_id),
format!("- Pack: {} ({})", pack_name, pack_id),
];
if selected_connectors.is_empty() {
lines.push("- Selected connectors: none".to_string());
} else {
lines.push("- Selected connectors:".to_string());
lines.extend(selected_connectors);
}
if required_secrets.is_empty() {
lines.push("- Required secrets: none".to_string());
} else {
lines.push("- Required secrets:".to_string());
lines.extend(required_secrets);
}
if !fallback_warnings.is_empty() {
lines.push("- Warnings:".to_string());
lines.extend(fallback_warnings);
}
if auto_apply_ready {
lines.push("- Status: ready for automatic apply".to_string());
} else {
lines.push("- Status: waiting for apply confirmation".to_string());
if connector_selection_required {
lines.push("- Action needed: choose connectors before apply.".to_string());
}
}
lines.join("\n")
}
fn render_pack_builder_apply_output(meta: &Value) -> String {
if let Some(status) = meta.get("status").and_then(Value::as_str) {
match status {
"apply_blocked_missing_secrets" => {
let required = meta
.get("required_secrets")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(|v| format!("- {}", v))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let mut lines = vec![
"Pack Builder Apply Blocked".to_string(),
"- Reason: missing required secrets.".to_string(),
];
if !required.is_empty() {
lines.push("- Required secrets:".to_string());
lines.extend(required);
}
lines.push("- Action: set secrets, then apply again.".to_string());
return lines.join("\n");
}
"apply_blocked_auth" => {
let connectors = meta
.get("selected_connectors")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(|v| format!("- {}", v))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let mut lines = vec![
"Pack Builder Apply Blocked".to_string(),
"- Reason: connector authentication/setup required.".to_string(),
];
if !connectors.is_empty() {
lines.push("- Connectors awaiting setup:".to_string());
lines.extend(connectors);
}
lines.push("- Action: complete connector auth, then apply again.".to_string());
return lines.join("\n");
}
"cancelled" => {
return "Pack Builder Apply Cancelled\n- Pending plan cancelled.".to_string();
}
"apply_blocked_missing_preview_artifacts" => {
return "Pack Builder Apply Blocked\n- Preview artifacts expired. Run preview again, then confirm.".to_string();
}
_ => {}
}
}
if let Some(error) = meta.get("error").and_then(Value::as_str) {
return match error {
"approval_required" => {
"Pack Builder Apply Blocked\n- Approval required for this apply step.".to_string()
}
"unknown plan_id" => "Pack Builder Apply Failed\n- Plan not found.".to_string(),
"plan_cancelled" => {
"Pack Builder Apply Failed\n- Plan was already cancelled.".to_string()
}
_ => format!("Pack Builder Apply Failed\n- {}", error),
};
}
let pack_id = meta
.get("pack_installed")
.and_then(|v| v.get("pack_id"))
.and_then(Value::as_str)
.unwrap_or("-");
let pack_name = meta
.get("pack_installed")
.and_then(|v| v.get("name"))
.and_then(Value::as_str)
.unwrap_or("-");
let install_path = meta
.get("pack_installed")
.and_then(|v| v.get("install_path"))
.and_then(Value::as_str)
.unwrap_or("-");
let routines_enabled = meta
.get("routines_enabled")
.and_then(Value::as_bool)
.unwrap_or(false);
let registered_servers = meta
.get("registered_servers")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(|v| format!("- {}", v))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let routines = meta
.get("routines_registered")
.and_then(Value::as_array)
.map(|rows| {
rows.iter()
.filter_map(Value::as_str)
.map(|v| format!("- {}", v))
.collect::<Vec<_>>()
})
.unwrap_or_default();
let mut lines = vec![
"Pack Builder Apply Complete".to_string(),
format!("- Installed pack: {} ({})", pack_name, pack_id),
format!("- Install path: {}", install_path),
format!(
"- Routines: {}",
if routines_enabled {
"enabled"
} else {
"paused"
}
),
];
if registered_servers.is_empty() {
lines.push("- Registered connectors: none".to_string());
} else {
lines.push("- Registered connectors:".to_string());
lines.extend(registered_servers);
}
if !routines.is_empty() {
lines.push("- Registered routines:".to_string());
lines.extend(routines);
}
lines.join("\n")
}
fn resolve_pack_builder_workflows_path() -> PathBuf {
if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
let trimmed = dir.trim();
if !trimmed.is_empty() {
let base = PathBuf::from(trimmed);
return if base.file_name().and_then(|value| value.to_str()) == Some("data") {
base.join("pack-builder").join("workflows.json")
} else {
base.join("data")
.join("pack-builder")
.join("workflows.json")
};
}
}
if let Some(data_dir) = dirs::data_dir() {
return data_dir
.join("tandem")
.join("data")
.join("pack-builder")
.join("workflows.json");
}
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(".tandem")
.join("data")
.join("pack-builder")
.join("workflows.json")
}
fn resolve_pack_builder_plans_path() -> PathBuf {
if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
let trimmed = dir.trim();
if !trimmed.is_empty() {
let base = PathBuf::from(trimmed);
return if base.file_name().and_then(|value| value.to_str()) == Some("data") {
base.join("pack-builder").join("plans.json")
} else {
base.join("data").join("pack-builder").join("plans.json")
};
}
}
if let Some(data_dir) = dirs::data_dir() {
return data_dir
.join("tandem")
.join("data")
.join("pack-builder")
.join("plans.json");
}
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(".tandem")
.join("data")
.join("pack-builder")
.join("plans.json")
}
fn resolve_pack_builder_zips_dir() -> PathBuf {
if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
let trimmed = dir.trim();
if !trimmed.is_empty() {
let base = PathBuf::from(trimmed);
return if base.file_name().and_then(|value| value.to_str()) == Some("data") {
base.join("pack-builder").join("zips")
} else {
base.join("data").join("pack-builder").join("zips")
};
}
}
if let Some(data_dir) = dirs::data_dir() {
return data_dir
.join("tandem")
.join("data")
.join("pack-builder")
.join("zips");
}
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("."))
.join(".tandem")
.join("data")
.join("pack-builder")
.join("zips")
}
fn load_workflows(path: &PathBuf) -> HashMap<String, WorkflowRecord> {
let read_path = if path.exists() {
path.clone()
} else {
legacy_pack_builder_root_file("pack_builder_workflows.json")
};
let Ok(bytes) = fs::read(read_path) else {
return HashMap::new();
};
serde_json::from_slice::<HashMap<String, WorkflowRecord>>(&bytes).unwrap_or_default()
}
fn save_workflows(path: &PathBuf, workflows: &HashMap<String, WorkflowRecord>) {
if let Some(parent) = path.parent() {
let _ = fs::create_dir_all(parent);
}
if let Ok(bytes) = serde_json::to_vec_pretty(workflows) {
let _ = fs::write(path, bytes);
}
}
fn load_plans(path: &PathBuf) -> HashMap<String, PreparedPlan> {
let read_path = if path.exists() {
path.clone()
} else {
legacy_pack_builder_root_file("pack_builder_plans.json")
};
let Ok(bytes) = fs::read(read_path) else {
return HashMap::new();
};
serde_json::from_slice::<HashMap<String, PreparedPlan>>(&bytes).unwrap_or_default()
}
fn legacy_pack_builder_root_file(file_name: &str) -> PathBuf {
if let Ok(dir) = std::env::var("TANDEM_STATE_DIR") {
let trimmed = dir.trim();
if !trimmed.is_empty() {
let base = PathBuf::from(trimmed);
if base.file_name().and_then(|value| value.to_str()) != Some("data") {
return base.join(file_name);
}
return base
.parent()
.map(|parent| parent.join(file_name))
.unwrap_or_else(|| base.join(file_name));
}
}
dirs::data_dir()
.map(|base| base.join("tandem").join(file_name))
.or_else(|| dirs::home_dir().map(|home| home.join(".tandem").join(file_name)))
.unwrap_or_else(|| PathBuf::from(file_name))
}
fn save_plans(path: &PathBuf, plans: &HashMap<String, PreparedPlan>) {
if let Some(parent) = path.parent() {
let _ = fs::create_dir_all(parent);
}
if let Ok(bytes) = serde_json::to_vec_pretty(plans) {
let _ = fs::write(path, bytes);
}
}
fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
fn retain_recent_workflows(workflows: &mut HashMap<String, WorkflowRecord>, keep: usize) {
if workflows.len() <= keep {
return;
}
let mut rows = workflows
.iter()
.map(|(key, value)| (key.clone(), value.updated_at_ms))
.collect::<Vec<_>>();
rows.sort_by(|a, b| b.1.cmp(&a.1));
let keep_keys = rows
.into_iter()
.take(keep)
.map(|(key, _)| key)
.collect::<BTreeSet<_>>();
workflows.retain(|key, _| keep_keys.contains(key));
}
fn retain_recent_plans(plans: &mut HashMap<String, PreparedPlan>, keep: usize) {
if plans.len() <= keep {
return;
}
let mut rows = plans
.iter()
.map(|(key, value)| {
(
key.clone(),
value.created_at_ms,
value.generated_zip_path.clone(),
)
})
.collect::<Vec<_>>();
rows.sort_by(|a, b| b.1.cmp(&a.1));
let mut keep_keys = BTreeSet::<String>::new();
let mut evict_zips = Vec::<PathBuf>::new();
for (i, (key, _, zip_path)) in rows.iter().enumerate() {
if i < keep {
keep_keys.insert(key.clone());
} else {
evict_zips.push(zip_path.clone());
}
}
plans.retain(|key, _| keep_keys.contains(key));
for zip in evict_zips {
if let Some(stage_dir) = zip.parent() {
let _ = fs::remove_dir_all(stage_dir);
}
}
}
fn session_thread_scope_key(session_id: &str, thread_key: Option<&str>) -> String {
let thread = thread_key.unwrap_or_default().trim();
if thread.is_empty() {
return session_id.trim().to_string();
}
format!("{}::{}", session_id.trim(), thread)
}
fn workflow_status_label(status: &WorkflowStatus) -> &'static str {
match status {
WorkflowStatus::PreviewPending => "preview_pending",
WorkflowStatus::ApplyBlockedMissingSecrets => "apply_blocked_missing_secrets",
WorkflowStatus::ApplyBlockedAuth => "apply_blocked_auth",
WorkflowStatus::ApplyComplete => "apply_complete",
WorkflowStatus::Cancelled => "cancelled",
WorkflowStatus::Error => "error",
}
}
fn infer_surface(thread_key: Option<&str>) -> &'static str {
let key = thread_key.unwrap_or_default().to_lowercase();
if key.starts_with("telegram:") {
"telegram"
} else if key.starts_with("discord:") {
"discord"
} else if key.starts_with("slack:") {
"slack"
} else if key.starts_with("desktop:") || key.starts_with("tauri:") {
"tauri"
} else if key.starts_with("web:") || key.starts_with("control-panel:") {
"web"
} else {
"unknown"
}
}
fn build_preview_next_actions(
connector_selection_required: bool,
required_secrets: &[String],
has_connector_registration: bool,
) -> Vec<String> {
let mut actions = Vec::new();
if connector_selection_required {
actions.push("Select connector(s) before applying.".to_string());
}
if !required_secrets.is_empty() {
actions.push("Set required secrets in engine settings/environment.".to_string());
}
if has_connector_registration {
actions.push("Confirm connector registration and pack install.".to_string());
} else {
actions.push("Apply to install the generated pack.".to_string());
}
actions
}
fn secret_refs_confirmed(confirmed: &Option<Value>, required: &[String]) -> bool {
if required.is_empty() {
return true;
}
if env_has_all_required_secrets(required) {
return true;
}
let Some(value) = confirmed else {
return false;
};
if value.as_bool() == Some(true) {
return true;
}
let Some(rows) = value.as_array() else {
return false;
};
let confirmed = rows
.iter()
.filter_map(Value::as_str)
.map(|v| v.trim().to_ascii_uppercase())
.collect::<BTreeSet<_>>();
required
.iter()
.all(|item| confirmed.contains(&item.to_ascii_uppercase()))
}
fn env_has_all_required_secrets(required: &[String]) -> bool {
required.iter().all(|key| {
std::env::var(key)
.ok()
.map(|v| !v.trim().is_empty())
.unwrap_or(false)
})
}
fn build_schedule(input: Option<&PreviewScheduleInput>) -> (RoutineSchedule, String, String) {
let timezone = input
.and_then(|v| v.timezone.as_deref())
.filter(|v| !v.trim().is_empty())
.unwrap_or("UTC")
.to_string();
if let Some(cron) = input
.and_then(|v| v.cron.as_deref())
.map(str::trim)
.filter(|v| !v.is_empty())
{
return (
RoutineSchedule::Cron {
expression: cron.to_string(),
},
"cron".to_string(),
timezone,
);
}
let seconds = input
.and_then(|v| v.interval_seconds)
.unwrap_or(86_400)
.clamp(30, 31_536_000);
(
RoutineSchedule::IntervalSeconds { seconds },
format!("every_{}_seconds", seconds),
timezone,
)
}
fn build_allowed_tools(mcp_tools: &[String], needs: &[CapabilityNeed]) -> Vec<String> {
let mut out = BTreeSet::<String>::new();
for tool in mcp_tools {
out.insert(tool.clone());
}
out.insert("question".to_string());
if needs.iter().any(|n| !n.external) {
out.insert("read".to_string());
out.insert("write".to_string());
}
if needs
.iter()
.any(|n| n.id.contains("news") || n.id.contains("headline"))
{
out.insert("websearch".to_string());
out.insert("webfetch".to_string());
}
out.into_iter().collect()
}
fn render_mission_yaml(mission_id: &str, mcp_tools: &[String], needs: &[CapabilityNeed]) -> String {
let mut lines = vec![
format!("id: {}", mission_id),
"title: Generated Pack Builder Mission".to_string(),
"steps:".to_string(),
];
let mut step_idx = 1usize;
for tool in mcp_tools {
lines.push(format!(" - id: step_{}", step_idx));
lines.push(format!(" action: {}", tool));
step_idx += 1;
}
if mcp_tools.is_empty() {
lines.push(" - id: step_1".to_string());
lines.push(" action: websearch".to_string());
}
for need in needs {
lines.push(format!(" - id: verify_{}", namespace_segment(&need.id)));
lines.push(" action: question".to_string());
lines.push(" optional: true".to_string());
}
lines.join("\n") + "\n"
}
fn render_agent_md(mcp_tools: &[String], goal: &str) -> String {
let mut lines = vec![
"---".to_string(),
"name: default".to_string(),
"description: Generated MCP-first pack agent".to_string(),
"---".to_string(),
"".to_string(),
"You are the Pack Builder runtime agent for this routine.".to_string(),
format!("Mission goal: {}", goal),
"Use the mission steps exactly and invoke the discovered MCP tools explicitly.".to_string(),
"".to_string(),
"Discovered MCP tool IDs: ".to_string(),
];
if mcp_tools.is_empty() {
lines
.push("- (none discovered; fallback to built-ins is allowed for this run)".to_string());
} else {
for tool in mcp_tools {
lines.push(format!("- {}", tool));
}
}
lines.push("".to_string());
lines.push("If a required connector is missing or unauthorized, report it and stop before side effects.".to_string());
lines.join("\n") + "\n"
}