Skip to main content

tandem_server/agent_teams_parts/
part01.rs

1use tandem_orchestrator::{
2    AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
3    SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
4};
5use tandem_skills::SkillService;
6use tandem_types::{EngineEvent, Session};
7use tokio::fs;
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11use crate::AppState;
12
13#[derive(Clone, Default)]
14pub struct AgentTeamRuntime {
15    policy: Arc<RwLock<Option<SpawnPolicy>>>,
16    templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
17    instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
18    budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
19    mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
20    spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
21    loaded_workspace: Arc<RwLock<Option<String>>>,
22    audit_path: Arc<RwLock<PathBuf>>,
23}
24
25#[derive(Debug, Clone)]
26pub struct SpawnResult {
27    pub decision: SpawnDecision,
28    pub instance: Option<AgentInstance>,
29}
30
31#[derive(Debug, Clone, Serialize)]
32pub struct AgentMissionSummary {
33    #[serde(rename = "missionID")]
34    pub mission_id: String,
35    #[serde(rename = "instanceCount")]
36    pub instance_count: usize,
37    #[serde(rename = "runningCount")]
38    pub running_count: usize,
39    #[serde(rename = "completedCount")]
40    pub completed_count: usize,
41    #[serde(rename = "failedCount")]
42    pub failed_count: usize,
43    #[serde(rename = "cancelledCount")]
44    pub cancelled_count: usize,
45    #[serde(rename = "queuedCount")]
46    pub queued_count: usize,
47    #[serde(rename = "tokenUsedTotal")]
48    pub token_used_total: u64,
49    #[serde(rename = "toolCallsUsedTotal")]
50    pub tool_calls_used_total: u64,
51    #[serde(rename = "stepsUsedTotal")]
52    pub steps_used_total: u64,
53    #[serde(rename = "costUsedUsdTotal")]
54    pub cost_used_usd_total: f64,
55}
56
57#[derive(Debug, Clone, Default)]
58struct InstanceBudgetState {
59    tokens_used: u64,
60    steps_used: u32,
61    tool_calls_used: u32,
62    cost_used_usd: f64,
63    started_at: Option<Instant>,
64    exhausted: bool,
65}
66
/// Budget usage counters tracked for a whole mission.
#[derive(Clone, Debug, Default)]
struct MissionBudgetState {
    /// Tokens consumed so far.
    tokens_used: u64,
    /// Steps consumed so far.
    steps_used: u64,
    /// Tool calls consumed so far.
    tool_calls_used: u64,
    /// Cost accrued so far, in USD.
    cost_used_usd: f64,
    /// Exhaustion flag.
    // NOTE(review): presumably set once the mission budget is hit — confirm against the writers.
    exhausted: bool,
}
75
76#[derive(Debug, Clone, Serialize)]
77pub struct PendingSpawnApproval {
78    #[serde(rename = "approvalID")]
79    pub approval_id: String,
80    #[serde(rename = "createdAtMs")]
81    pub created_at_ms: u64,
82    pub request: SpawnRequest,
83    #[serde(rename = "decisionCode")]
84    pub decision_code: Option<SpawnDenyCode>,
85    pub reason: Option<String>,
86}
87
88#[derive(Clone)]
89pub struct ServerSpawnAgentHook {
90    state: AppState,
91}
92
93#[derive(Debug, Deserialize)]
94struct SpawnAgentToolInput {
95    #[serde(rename = "missionID")]
96    mission_id: Option<String>,
97    #[serde(rename = "parentInstanceID")]
98    parent_instance_id: Option<String>,
99    #[serde(rename = "templateID")]
100    template_id: Option<String>,
101    role: AgentRole,
102    source: Option<SpawnSource>,
103    justification: String,
104    #[serde(rename = "budgetOverride", default)]
105    budget_override: Option<BudgetLimit>,
106}
107
108impl ServerSpawnAgentHook {
109    pub fn new(state: AppState) -> Self {
110        Self { state }
111    }
112}
113
114impl SpawnAgentHook for ServerSpawnAgentHook {
115    fn spawn_agent(
116        &self,
117        ctx: SpawnAgentToolContext,
118    ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
119        let state = self.state.clone();
120        Box::pin(async move {
121            let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
122            let input = match parsed {
123                Ok(input) => input,
124                Err(err) => {
125                    return Ok(SpawnAgentToolResult {
126                        output: format!("spawn_agent denied: invalid args ({err})"),
127                        metadata: json!({
128                            "ok": false,
129                            "code": "SPAWN_INVALID_ARGS",
130                            "error": err.to_string(),
131                        }),
132                    });
133                }
134            };
135            let req = SpawnRequest {
136                mission_id: input.mission_id,
137                parent_instance_id: input.parent_instance_id,
138                source: input.source.unwrap_or(SpawnSource::ToolCall),
139                parent_role: None,
140                role: input.role,
141                template_id: input.template_id,
142                justification: input.justification,
143                budget_override: input.budget_override,
144            };
145
146            let event_ctx = SpawnEventContext {
147                session_id: Some(ctx.session_id.as_str()),
148                message_id: Some(ctx.message_id.as_str()),
149                run_id: None,
150            };
151            emit_spawn_requested_with_context(&state, &req, &event_ctx);
152            let result = state.agent_teams.spawn(&state, req.clone()).await;
153            if !result.decision.allowed || result.instance.is_none() {
154                emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
155                return Ok(SpawnAgentToolResult {
156                    output: result
157                        .decision
158                        .reason
159                        .clone()
160                        .unwrap_or_else(|| "spawn_agent denied".to_string()),
161                    metadata: json!({
162                        "ok": false,
163                        "code": result.decision.code,
164                        "error": result.decision.reason,
165                        "requiresUserApproval": result.decision.requires_user_approval,
166                    }),
167                });
168            }
169            let instance = result.instance.expect("checked is_some");
170            emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
171            Ok(SpawnAgentToolResult {
172                output: format!(
173                    "spawned {} as instance {} (session {})",
174                    instance.template_id, instance.instance_id, instance.session_id
175                ),
176                metadata: json!({
177                    "ok": true,
178                    "missionID": instance.mission_id,
179                    "instanceID": instance.instance_id,
180                    "sessionID": instance.session_id,
181                    "runID": instance.run_id,
182                    "status": instance.status,
183                    "skillHash": instance.skill_hash,
184                    "workspaceRoot": instance_workspace_root(&instance),
185                    "workspaceRepoRoot": instance_workspace_repo_root(&instance),
186                    "managedWorktree": instance_managed_worktree(&instance),
187                }),
188            })
189        })
190    }
191}
192
193#[derive(Clone)]
194pub struct ServerToolPolicyHook {
195    state: AppState,
196}
197
198impl ServerToolPolicyHook {
199    pub fn new(state: AppState) -> Self {
200        Self { state }
201    }
202}
203
204fn automation_tool_target_paths(tool: &str, args: &Value) -> Vec<String> {
205    let mut paths = Vec::new();
206    match tool {
207        "write" | "edit" => {
208            if let Some(path) = args.get("path").and_then(Value::as_str) {
209                let trimmed = path.trim();
210                if !trimmed.is_empty() {
211                    paths.push(trimmed.to_string());
212                }
213            }
214        }
215        "apply_patch" => {
216            let patch = args
217                .get("patchText")
218                .and_then(Value::as_str)
219                .or_else(|| args.as_str());
220            if let Some(patch) = patch {
221                for line in patch.lines() {
222                    for prefix in [
223                        "*** Update File: ",
224                        "*** Delete File: ",
225                        "*** Add File: ",
226                        "*** Move to: ",
227                    ] {
228                        if let Some(path) = line.strip_prefix(prefix) {
229                            let trimmed = path.trim();
230                            if !trimmed.is_empty() {
231                                paths.push(trimmed.to_string());
232                            }
233                        }
234                    }
235                }
236            }
237        }
238        _ => {}
239    }
240    paths.sort();
241    paths.dedup();
242    paths
243}
244
245fn automation_path_references_read_only_source_of_truth(
246    path: &str,
247    read_only_names: &std::collections::HashSet<String>,
248    workspace_root: Option<&str>,
249) -> bool {
250    let trimmed = path.trim().trim_matches('`');
251    if trimmed.is_empty() {
252        return false;
253    }
254    let lowered = trimmed.to_ascii_lowercase();
255    if read_only_names.contains(&lowered) {
256        return true;
257    }
258    if let Some(filename) = std::path::Path::new(trimmed)
259        .file_name()
260        .and_then(|value| value.to_str())
261    {
262        if read_only_names.contains(&filename.to_ascii_lowercase()) {
263            return true;
264        }
265    }
266    workspace_root
267        .and_then(|root| {
268            crate::app::state::automation::normalize_workspace_display_path(root, trimmed)
269        })
270        .is_some_and(|normalized| {
271            let normalized_lower = normalized.to_ascii_lowercase();
272            if read_only_names.contains(&normalized_lower) {
273                return true;
274            }
275            std::path::Path::new(&normalized)
276                .file_name()
277                .and_then(|value| value.to_str())
278                .is_some_and(|filename| read_only_names.contains(&filename.to_ascii_lowercase()))
279        })
280}
281
282async fn evaluate_automation_read_only_write_deny(
283    state: &AppState,
284    session_id: &str,
285    tool: &str,
286    args: &Value,
287) -> Option<String> {
288    if !matches!(tool, "write" | "edit" | "apply_patch") {
289        return None;
290    }
291    let run_id = state
292        .automation_v2_session_runs
293        .read()
294        .await
295        .get(session_id)
296        .cloned()?;
297    let run = state.get_automation_v2_run(&run_id).await?;
298    let automation = run.automation_snapshot?;
299    let read_only_names =
300        crate::app::state::automation::enforcement::automation_read_only_source_of_truth_name_variants_for_automation(
301            &automation,
302        );
303    if read_only_names.is_empty() {
304        return None;
305    }
306    let workspace_root = args
307        .get("__workspace_root")
308        .and_then(Value::as_str)
309        .or(automation.workspace_root.as_deref());
310    let blocked_paths = automation_tool_target_paths(tool, args)
311        .into_iter()
312        .filter(|path| {
313            automation_path_references_read_only_source_of_truth(
314                path,
315                &read_only_names,
316                workspace_root,
317            )
318        })
319        .collect::<Vec<_>>();
320    if blocked_paths.is_empty() {
321        None
322    } else {
323        Some(format!(
324            "write denied for automation `{}` (run `{}`): read-only source-of-truth file(s) cannot be mutated: {}",
325            automation.automation_id,
326            run_id,
327            blocked_paths.join(", ")
328        ))
329    }
330}
331
332impl ToolPolicyHook for ServerToolPolicyHook {
333    fn evaluate_tool(
334        &self,
335        ctx: ToolPolicyContext,
336    ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
337        let state = self.state.clone();
338        Box::pin(async move {
339            let tool = normalize_tool_name(&ctx.tool);
340            if let Some(policy) = state.routine_session_policy(&ctx.session_id).await {
341                let allowed_patterns = policy
342                    .allowed_tools
343                    .iter()
344                    .map(|name| normalize_tool_name(name))
345                    .collect::<Vec<_>>();
346                if !policy.allowed_tools.is_empty() && !any_policy_matches(&allowed_patterns, &tool)
347                {
348                    let reason = format!(
349                        "tool `{}` is not allowed for routine `{}` (run `{}`)",
350                        tool, policy.routine_id, policy.run_id
351                    );
352                    state.event_bus.publish(EngineEvent::new(
353                        "routine.tool.denied",
354                        json!({
355                            "sessionID": ctx.session_id,
356                            "messageID": ctx.message_id,
357                            "runID": policy.run_id,
358                            "routineID": policy.routine_id,
359                            "tool": tool,
360                            "reason": reason,
361                            "timestampMs": crate::now_ms(),
362                        }),
363                    ));
364                    return Ok(ToolPolicyDecision {
365                        allowed: false,
366                        reason: Some(reason),
367                    });
368                }
369            }
370
371            if let Some(reason) =
372                evaluate_automation_read_only_write_deny(&state, &ctx.session_id, &tool, &ctx.args)
373                    .await
374            {
375                state.event_bus.publish(EngineEvent::new(
376                    "automation.read_only_write.denied",
377                    json!({
378                        "sessionID": ctx.session_id,
379                        "messageID": ctx.message_id,
380                        "tool": tool,
381                        "reason": reason,
382                        "timestampMs": crate::now_ms(),
383                    }),
384                ));
385                return Ok(ToolPolicyDecision {
386                    allowed: false,
387                    reason: Some(reason),
388                });
389            }
390
391            let Some(instance) = state
392                .agent_teams
393                .instance_for_session(&ctx.session_id)
394                .await
395            else {
396                return Ok(ToolPolicyDecision {
397                    allowed: true,
398                    reason: None,
399                });
400            };
401            let caps = instance.capabilities.clone();
402            let deny = evaluate_capability_deny(
403                &state,
404                &instance,
405                &tool,
406                &ctx.args,
407                &caps,
408                &ctx.session_id,
409                &ctx.message_id,
410            )
411            .await;
412            if let Some(reason) = deny {
413                state.event_bus.publish(EngineEvent::new(
414                    "agent_team.capability.denied",
415                    json!({
416                        "sessionID": ctx.session_id,
417                        "messageID": ctx.message_id,
418                        "runID": instance.run_id,
419                        "missionID": instance.mission_id,
420                        "instanceID": instance.instance_id,
421                        "tool": tool,
422                        "reason": reason,
423                        "timestampMs": crate::now_ms(),
424                    }),
425                ));
426                return Ok(ToolPolicyDecision {
427                    allowed: false,
428                    reason: Some(reason),
429                });
430            }
431            Ok(ToolPolicyDecision {
432                allowed: true,
433                reason: None,
434            })
435        })
436    }
437}
438
439impl AgentTeamRuntime {
440    pub fn new(audit_path: PathBuf) -> Self {
441        Self {
442            policy: Arc::new(RwLock::new(None)),
443            templates: Arc::new(RwLock::new(HashMap::new())),
444            instances: Arc::new(RwLock::new(HashMap::new())),
445            budgets: Arc::new(RwLock::new(HashMap::new())),
446            mission_budgets: Arc::new(RwLock::new(HashMap::new())),
447            spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
448            loaded_workspace: Arc::new(RwLock::new(None)),
449            audit_path: Arc::new(RwLock::new(audit_path)),
450        }
451    }
452
453    pub async fn set_audit_path(&self, path: PathBuf) {
454        *self.audit_path.write().await = path;
455    }
456
457    pub async fn list_templates(&self) -> Vec<AgentTemplate> {
458        let mut rows = self
459            .templates
460            .read()
461            .await
462            .values()
463            .cloned()
464            .collect::<Vec<_>>();
465        rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
466        rows
467    }
468
469    async fn templates_dir_for_loaded_workspace(&self) -> anyhow::Result<PathBuf> {
470        let workspace = self
471            .loaded_workspace
472            .read()
473            .await
474            .clone()
475            .ok_or_else(|| anyhow::anyhow!("agent team workspace not loaded"))?;
476        Ok(PathBuf::from(workspace)
477            .join(".tandem")
478            .join("agent-team")
479            .join("templates"))
480    }
481
482    fn template_filename(template_id: &str) -> String {
483        let safe = template_id
484            .trim()
485            .chars()
486            .map(|ch| {
487                if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
488                    ch
489                } else {
490                    '_'
491                }
492            })
493            .collect::<String>();
494        let fallback = if safe.is_empty() {
495            "template".to_string()
496        } else {
497            safe
498        };
499        format!("{fallback}.yaml")
500    }
501
502    pub async fn upsert_template(
503        &self,
504        workspace_root: &str,
505        template: AgentTemplate,
506    ) -> anyhow::Result<AgentTemplate> {
507        self.ensure_loaded_for_workspace(workspace_root).await?;
508        let templates_dir = self.templates_dir_for_loaded_workspace().await?;
509        fs::create_dir_all(&templates_dir).await?;
510        let path = templates_dir.join(Self::template_filename(&template.template_id));
511        let payload = serde_yaml::to_string(&template)?;
512        fs::write(path, payload).await?;
513        self.templates
514            .write()
515            .await
516            .insert(template.template_id.clone(), template.clone());
517        Ok(template)
518    }
519
520    pub async fn delete_template(
521        &self,
522        workspace_root: &str,
523        template_id: &str,
524    ) -> anyhow::Result<bool> {
525        self.ensure_loaded_for_workspace(workspace_root).await?;
526        let templates_dir = self.templates_dir_for_loaded_workspace().await?;
527        let path = templates_dir.join(Self::template_filename(template_id));
528        let existed = self.templates.write().await.remove(template_id).is_some();
529        if path.exists() {
530            let _ = fs::remove_file(path).await;
531        }
532        Ok(existed)
533    }
534
535    pub async fn get_template_for_workspace(
536        &self,
537        workspace_root: &str,
538        template_id: &str,
539    ) -> anyhow::Result<Option<AgentTemplate>> {
540        self.ensure_loaded_for_workspace(workspace_root).await?;
541        Ok(self.templates.read().await.get(template_id).cloned())
542    }
543
544    pub async fn list_instances(
545        &self,
546        mission_id: Option<&str>,
547        parent_instance_id: Option<&str>,
548        status: Option<AgentInstanceStatus>,
549    ) -> Vec<AgentInstance> {
550        let mut rows = self
551            .instances
552            .read()
553            .await
554            .values()
555            .filter(|instance| {
556                if let Some(mission_id) = mission_id {
557                    if instance.mission_id != mission_id {
558                        return false;
559                    }
560                }
561                if let Some(parent_id) = parent_instance_id {
562                    if instance.parent_instance_id.as_deref() != Some(parent_id) {
563                        return false;
564                    }
565                }
566                if let Some(status) = &status {
567                    if &instance.status != status {
568                        return false;
569                    }
570                }
571                true
572            })
573            .cloned()
574            .collect::<Vec<_>>();
575        rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
576        rows
577    }
578
579    pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
580        let instances = self.instances.read().await;
581        let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
582        for instance in instances.values() {
583            let row = by_mission
584                .entry(instance.mission_id.clone())
585                .or_insert_with(|| AgentMissionSummary {
586                    mission_id: instance.mission_id.clone(),
587                    instance_count: 0,
588                    running_count: 0,
589                    completed_count: 0,
590                    failed_count: 0,
591                    cancelled_count: 0,
592                    queued_count: 0,
593                    token_used_total: 0,
594                    tool_calls_used_total: 0,
595                    steps_used_total: 0,
596                    cost_used_usd_total: 0.0,
597                });
598            row.instance_count = row.instance_count.saturating_add(1);
599            match instance.status {
600                AgentInstanceStatus::Queued => {
601                    row.queued_count = row.queued_count.saturating_add(1)
602                }
603                AgentInstanceStatus::Running => {
604                    row.running_count = row.running_count.saturating_add(1)
605                }
606                AgentInstanceStatus::Completed => {
607                    row.completed_count = row.completed_count.saturating_add(1)
608                }
609                AgentInstanceStatus::Failed => {
610                    row.failed_count = row.failed_count.saturating_add(1)
611                }
612                AgentInstanceStatus::Cancelled => {
613                    row.cancelled_count = row.cancelled_count.saturating_add(1)
614                }
615            }
616            if let Some(usage) = instance
617                .metadata
618                .as_ref()
619                .and_then(|m| m.get("budgetUsage"))
620                .and_then(|u| u.as_object())
621            {
622                row.token_used_total = row.token_used_total.saturating_add(
623                    usage
624                        .get("tokensUsed")
625                        .and_then(|v| v.as_u64())
626                        .unwrap_or(0),
627                );
628                row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
629                    usage
630                        .get("toolCallsUsed")
631                        .and_then(|v| v.as_u64())
632                        .unwrap_or(0),
633                );
634                row.steps_used_total = row
635                    .steps_used_total
636                    .saturating_add(usage.get("stepsUsed").and_then(|v| v.as_u64()).unwrap_or(0));
637                row.cost_used_usd_total += usage
638                    .get("costUsedUsd")
639                    .and_then(|v| v.as_f64())
640                    .unwrap_or(0.0);
641            }
642        }
643        let mut rows = by_mission.into_values().collect::<Vec<_>>();
644        rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
645        rows
646    }
647
648    pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
649        self.instances
650            .read()
651            .await
652            .values()
653            .find(|instance| instance.session_id == session_id)
654            .cloned()
655    }
656
657    pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
658        let mut rows = self
659            .spawn_approvals
660            .read()
661            .await
662            .values()
663            .cloned()
664            .collect::<Vec<_>>();
665        rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
666        rows
667    }
668
669    pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
670        let normalized = workspace_root.trim().to_string();
671        let already_loaded = self
672            .loaded_workspace
673            .read()
674            .await
675            .as_ref()
676            .map(|s| s == &normalized)
677            .unwrap_or(false);
678        if already_loaded {
679            return Ok(());
680        }
681
682        let root = PathBuf::from(&normalized);
683        let policy_path = root
684            .join(".tandem")
685            .join("agent-team")
686            .join("spawn-policy.yaml");
687        let templates_dir = root.join(".tandem").join("agent-team").join("templates");
688
689        let mut next_policy = None;
690        if policy_path.exists() {
691            let raw = fs::read_to_string(&policy_path)
692                .await
693                .with_context(|| format!("failed reading {}", policy_path.display()))?;
694            let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
695                .with_context(|| format!("failed parsing {}", policy_path.display()))?;
696            next_policy = Some(parsed);
697        }
698
699        let mut next_templates = HashMap::new();
700        if templates_dir.exists() {
701            let mut entries = fs::read_dir(&templates_dir).await?;
702            while let Some(entry) = entries.next_entry().await? {
703                let path = entry.path();
704                if !path.is_file() {
705                    continue;
706                }
707                let ext = path
708                    .extension()
709                    .and_then(|v| v.to_str())
710                    .unwrap_or_default()
711                    .to_ascii_lowercase();
712                if ext != "yaml" && ext != "yml" && ext != "json" {
713                    continue;
714                }
715                let raw = fs::read_to_string(&path).await?;
716                let template = serde_yaml::from_str::<AgentTemplate>(&raw)
717                    .with_context(|| format!("failed parsing {}", path.display()))?;
718                next_templates.insert(template.template_id.clone(), template);
719            }
720        }
721
722        *self.policy.write().await = next_policy;
723        *self.templates.write().await = next_templates;
724        *self.loaded_workspace.write().await = Some(normalized);
725        Ok(())
726    }
727
728    pub async fn spawn(&self, state: &AppState, req: SpawnRequest) -> SpawnResult {
729        self.spawn_with_approval_override(state, req, false).await
730    }
731
732    async fn spawn_with_approval_override(
733        &self,
734        state: &AppState,
735        mut req: SpawnRequest,
736        approval_override: bool,
737    ) -> SpawnResult {
738        let workspace_root = state.workspace_index.snapshot().await.root;
739        if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
740            return SpawnResult {
741                decision: SpawnDecision {
742                    allowed: false,
743                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
744                    reason: Some(format!("spawn policy load failed: {}", err)),
745                    requires_user_approval: false,
746                },
747                instance: None,
748            };
749        }
750
751        let Some(policy) = self.policy.read().await.clone() else {
752            return SpawnResult {
753                decision: SpawnDecision {
754                    allowed: false,
755                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
756                    reason: Some("spawn policy file missing".to_string()),
757                    requires_user_approval: false,
758                },
759                instance: None,
760            };
761        };
762
763        let template = {
764            let templates = self.templates.read().await;
765            req.template_id
766                .as_deref()
767                .and_then(|template_id| templates.get(template_id).cloned())
768        };
769        if req.template_id.is_none() {
770            if let Some(found) = self
771                .templates
772                .read()
773                .await
774                .values()
775                .find(|t| t.role == req.role)
776                .cloned()
777            {
778                req.template_id = Some(found.template_id.clone());
779            }
780        }
781        let template = if template.is_some() {
782            template
783        } else {
784            let templates = self.templates.read().await;
785            req.template_id
786                .as_deref()
787                .and_then(|id| templates.get(id).cloned())
788        };
789
790        if req.parent_role.is_none() {
791            if let Some(parent_id) = req.parent_instance_id.as_deref() {
792                let instances = self.instances.read().await;
793                req.parent_role = instances
794                    .get(parent_id)
795                    .map(|instance| instance.role.clone());
796            }
797        }
798
799        let instances = self.instances.read().await;
800        let total_agents = instances.len();
801        let running_agents = instances
802            .values()
803            .filter(|instance| instance.status == AgentInstanceStatus::Running)
804            .count();
805        drop(instances);
806
807        let mut decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
808        if approval_override
809            && !decision.allowed
810            && decision.requires_user_approval
811            && matches!(decision.code, Some(SpawnDenyCode::SpawnRequiresApproval))
812        {
813            decision = SpawnDecision {
814                allowed: true,
815                code: None,
816                reason: None,
817                requires_user_approval: false,
818            };
819        }
820        if !decision.allowed {
821            if decision.requires_user_approval && !approval_override {
822                self.queue_spawn_approval(&req, &decision).await;
823            }
824            return SpawnResult {
825                decision,
826                instance: None,
827            };
828        }
829
830        let mission_id = req
831            .mission_id
832            .clone()
833            .unwrap_or_else(|| "mission-default".to_string());
834
835        if let Some(reason) = self
836            .mission_budget_exceeded_reason(&policy, &mission_id)
837            .await
838        {
839            return SpawnResult {
840                decision: SpawnDecision {
841                    allowed: false,
842                    code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
843                    reason: Some(reason),
844                    requires_user_approval: false,
845                },
846                instance: None,
847            };
848        }
849
850        let template = template.unwrap_or_else(|| AgentTemplate {
851            template_id: "default-template".to_string(),
852            display_name: None,
853            avatar_url: None,
854            role: req.role.clone(),
855            system_prompt: None,
856            default_model: None,
857            skills: Vec::new(),
858            default_budget: BudgetLimit::default(),
859            capabilities: Default::default(),
860        });
861
862        let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
863            Ok(hash) => hash,
864            Err(err) => {
865                let lowered = err.to_ascii_lowercase();
866                let code = if lowered.contains("pinned hash mismatch") {
867                    SpawnDenyCode::SpawnSkillHashMismatch
868                } else if lowered.contains("skill source denied") {
869                    SpawnDenyCode::SpawnSkillSourceDenied
870                } else {
871                    SpawnDenyCode::SpawnRequiredSkillMissing
872                };
873                return SpawnResult {
874                    decision: SpawnDecision {
875                        allowed: false,
876                        code: Some(code),
877                        reason: Some(err),
878                        requires_user_approval: false,
879                    },
880                    instance: None,
881                };
882            }
883        };
884
885        let parent_snapshot = {
886            let instances = self.instances.read().await;
887            req.parent_instance_id
888                .as_deref()
889                .and_then(|id| instances.get(id).cloned())
890        };
891        let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
892            self.budgets.read().await.get(parent_id).cloned()
893        } else {
894            None
895        };
896
897        let budget = resolve_budget(
898            &policy,
899            parent_snapshot,
900            parent_usage,
901            &template,
902            req.budget_override.clone(),
903            &req.role,
904        );
905
906        let instance_id = format!("ins_{}", Uuid::new_v4().simple());
907        let managed_worktree = prepare_agent_instance_workspace(
908            state,
909            &workspace_root,
910            req.mission_id.as_deref(),
911            &instance_id,
912            &template.template_id,
913        )
914        .await;
915        let workspace_repo_root = managed_worktree
916            .as_ref()
917            .map(|row| row.record.repo_root.clone())
918            .or_else(|| crate::runtime::worktrees::resolve_git_repo_root(&workspace_root))
919            .unwrap_or_else(|| workspace_root.clone());
920        let worker_workspace_root = managed_worktree
921            .as_ref()
922            .map(|row| row.record.path.clone())
923            .unwrap_or_else(|| workspace_root.clone());
924        let mut session = Session::new(
925            Some(format!("Agent Team {}", template.template_id)),
926            Some(worker_workspace_root.clone()),
927        );
928        session.workspace_root = Some(worker_workspace_root.clone());
929        let session_id = session.id.clone();
930        if let Err(err) = state.storage.save_session(session).await {
931            if let Some(worktree) = managed_worktree.as_ref() {
932                let _ = crate::runtime::worktrees::delete_managed_worktree(state, &worktree.record)
933                    .await;
934            }
935            return SpawnResult {
936                decision: SpawnDecision {
937                    allowed: false,
938                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
939                    reason: Some(format!("failed creating child session: {}", err)),
940                    requires_user_approval: false,
941                },
942                instance: None,
943            };
944        }
945
946        let instance = AgentInstance {
947            instance_id: instance_id.clone(),
948            mission_id: mission_id.clone(),
949            parent_instance_id: req.parent_instance_id.clone(),
950            role: template.role.clone(),
951            template_id: template.template_id.clone(),
952            session_id: session_id.clone(),
953            run_id: None,
954            status: AgentInstanceStatus::Running,
955            budget,
956            skill_hash: skill_hash.clone(),
957            capabilities: template.capabilities.clone(),
958            metadata: Some(json!({
959                "source": req.source,
960                "justification": req.justification,
961                "workspaceRoot": worker_workspace_root,
962                "workspaceRepoRoot": workspace_repo_root,
963                "managedWorktree": managed_worktree.as_ref().map(|row| json!({
964                    "path": row.record.path,
965                    "branch": row.record.branch,
966                    "repoRoot": row.record.repo_root,
967                    "cleanupBranch": row.record.cleanup_branch,
968                    "reused": row.reused,
969                })).unwrap_or(Value::Null),
970            })),
971        };
972
973        self.instances
974            .write()
975            .await
976            .insert(instance.instance_id.clone(), instance.clone());
977        self.budgets.write().await.insert(
978            instance.instance_id.clone(),
979            InstanceBudgetState {
980                started_at: Some(Instant::now()),
981                ..InstanceBudgetState::default()
982            },
983        );
984        let _ = self.append_audit("spawn.approved", &instance).await;
985
986        SpawnResult {
987            decision: SpawnDecision {
988                allowed: true,
989                code: None,
990                reason: None,
991                requires_user_approval: false,
992            },
993            instance: Some(instance),
994        }
995    }
996
997    pub async fn approve_spawn_approval(
998        &self,
999        state: &AppState,
1000        approval_id: &str,
1001        reason: Option<&str>,
1002    ) -> Option<SpawnResult> {
1003        let approval = self.spawn_approvals.write().await.remove(approval_id)?;
1004        let result = self
1005            .spawn_with_approval_override(state, approval.request.clone(), true)
1006            .await;
1007        if let Some(instance) = result.instance.as_ref() {
1008            let note = reason.unwrap_or("approved by operator");
1009            let _ = self
1010                .append_approval_audit("spawn.approval.approved", approval_id, Some(instance), note)
1011                .await;
1012        } else {
1013            let note = reason.unwrap_or("approval replay failed policy checks");
1014            let _ = self
1015                .append_approval_audit("spawn.approval.rejected_on_replay", approval_id, None, note)
1016                .await;
1017        }
1018        Some(result)
1019    }
1020
1021    pub async fn deny_spawn_approval(
1022        &self,
1023        approval_id: &str,
1024        reason: Option<&str>,
1025    ) -> Option<PendingSpawnApproval> {
1026        let approval = self.spawn_approvals.write().await.remove(approval_id)?;
1027        let note = reason.unwrap_or("denied by operator");
1028        let _ = self
1029            .append_approval_audit("spawn.approval.denied", approval_id, None, note)
1030            .await;
1031        Some(approval)
1032    }
1033
1034    pub async fn cancel_instance(
1035        &self,
1036        state: &AppState,
1037        instance_id: &str,
1038        reason: &str,
1039    ) -> Option<AgentInstance> {
1040        let mut instances = self.instances.write().await;
1041        let instance = instances.get_mut(instance_id)?;
1042        if matches!(
1043            instance.status,
1044            AgentInstanceStatus::Completed
1045                | AgentInstanceStatus::Failed
1046                | AgentInstanceStatus::Cancelled
1047        ) {
1048            return Some(instance.clone());
1049        }
1050        instance.status = AgentInstanceStatus::Cancelled;
1051        let snapshot = instance.clone();
1052        drop(instances);
1053        let _ = state.cancellations.cancel(&snapshot.session_id).await;
1054        cleanup_instance_managed_worktree(state, &snapshot).await;
1055        let _ = self.append_audit("instance.cancelled", &snapshot).await;
1056        emit_instance_cancelled(state, &snapshot, reason);
1057        Some(snapshot)
1058    }
1059
1060    async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
1061        let approval = PendingSpawnApproval {
1062            approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
1063            created_at_ms: crate::now_ms(),
1064            request: req.clone(),
1065            decision_code: decision.code.clone(),
1066            reason: decision.reason.clone(),
1067        };
1068        self.spawn_approvals
1069            .write()
1070            .await
1071            .insert(approval.approval_id.clone(), approval);
1072    }
1073
1074    async fn mission_budget_exceeded_reason(
1075        &self,
1076        policy: &SpawnPolicy,
1077        mission_id: &str,
1078    ) -> Option<String> {
1079        let Some(limit) = policy.mission_total_budget.as_ref() else {
1080            return None;
1081        };
1082        let usage = self
1083            .mission_budgets
1084            .read()
1085            .await
1086            .get(mission_id)
1087            .cloned()
1088            .unwrap_or_default();
1089        if let Some(max) = limit.max_tokens {
1090            if usage.tokens_used >= max {
1091                return Some(format!(
1092                    "mission max_tokens exhausted ({}/{})",
1093                    usage.tokens_used, max
1094                ));
1095            }
1096        }
1097        if let Some(max) = limit.max_steps {
1098            if usage.steps_used >= u64::from(max) {
1099                return Some(format!(
1100                    "mission max_steps exhausted ({}/{})",
1101                    usage.steps_used, max
1102                ));
1103            }
1104        }
1105        if let Some(max) = limit.max_tool_calls {
1106            if usage.tool_calls_used >= u64::from(max) {
1107                return Some(format!(
1108                    "mission max_tool_calls exhausted ({}/{})",
1109                    usage.tool_calls_used, max
1110                ));
1111            }
1112        }
1113        if let Some(max) = limit.max_cost_usd {
1114            if usage.cost_used_usd >= max {
1115                return Some(format!(
1116                    "mission max_cost_usd exhausted ({:.6}/{:.6})",
1117                    usage.cost_used_usd, max
1118                ));
1119            }
1120        }
1121        None
1122    }
1123
1124    pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
1125        let instance_ids = self
1126            .instances
1127            .read()
1128            .await
1129            .values()
1130            .filter(|instance| instance.mission_id == mission_id)
1131            .map(|instance| instance.instance_id.clone())
1132            .collect::<Vec<_>>();
1133        let mut count = 0usize;
1134        for instance_id in instance_ids {
1135            if self
1136                .cancel_instance(state, &instance_id, reason)
1137                .await
1138                .is_some()
1139            {
1140                count = count.saturating_add(1);
1141            }
1142        }
1143        count
1144    }
1145
1146    async fn mark_instance_terminal(
1147        &self,
1148        state: &AppState,
1149        instance_id: &str,
1150        status: AgentInstanceStatus,
1151    ) -> Option<AgentInstance> {
1152        let mut instances = self.instances.write().await;
1153        let instance = instances.get_mut(instance_id)?;
1154        if matches!(
1155            instance.status,
1156            AgentInstanceStatus::Completed
1157                | AgentInstanceStatus::Failed
1158                | AgentInstanceStatus::Cancelled
1159        ) {
1160            return Some(instance.clone());
1161        }
1162        instance.status = status.clone();
1163        let snapshot = instance.clone();
1164        drop(instances);
1165        cleanup_instance_managed_worktree(state, &snapshot).await;
1166        match status {
1167            AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
1168            AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
1169            _ => {}
1170        }
1171        Some(snapshot)
1172    }
1173
1174    pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
1175        let Some(session_id) = extract_session_id(event) else {
1176            return;
1177        };
1178        let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
1179            return;
1180        };
1181        if event.event_type == "provider.usage" {
1182            let total_tokens = event
1183                .properties
1184                .get("totalTokens")
1185                .and_then(|v| v.as_u64())
1186                .unwrap_or(0);
1187            let cost_used_usd = event
1188                .properties
1189                .get("costUsd")
1190                .and_then(|v| v.as_f64())
1191                .unwrap_or(0.0);
1192            if total_tokens > 0 {
1193                let exhausted = self
1194                    .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
1195                    .await;
1196                if exhausted {
1197                    let _ = self
1198                        .cancel_instance(state, &instance_id, "budget exhausted")
1199                        .await;
1200                }
1201            }
1202            return;
1203        }
1204        let mut delta_tokens = 0u64;
1205        let mut delta_steps = 0u32;
1206        let mut delta_tool_calls = 0u32;
1207        if event.event_type == "message.part.updated" {
1208            if let Some(part) = event.properties.get("part") {
1209                let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
1210                if part_type == "tool-invocation" {
1211                    delta_tool_calls = 1;
1212                } else if part_type == "text" {
1213                    let delta = event
1214                        .properties
1215                        .get("delta")
1216                        .and_then(|v| v.as_str())
1217                        .unwrap_or("");
1218                    if !delta.is_empty() {
1219                        delta_tokens = estimate_tokens(delta);
1220                    }
1221                }
1222            }
1223        } else if event.event_type == "session.run.finished" {
1224            delta_steps = 1;
1225            let run_status = event
1226                .properties
1227                .get("status")
1228                .and_then(|v| v.as_str())
1229                .unwrap_or("")
1230                .to_ascii_lowercase();
1231            if run_status == "completed" {
1232                let _ = self
1233                    .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
1234                    .await;
1235            } else if run_status == "failed" || run_status == "error" {
1236                let _ = self
1237                    .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
1238                    .await;
1239            }
1240        }
1241        if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
1242            return;
1243        }
1244        let exhausted = self
1245            .apply_budget_delta(
1246                state,
1247                &instance_id,
1248                delta_tokens,
1249                delta_steps,
1250                delta_tool_calls,
1251            )
1252            .await;
1253        if exhausted {
1254            let _ = self
1255                .cancel_instance(state, &instance_id, "budget exhausted")
1256                .await;
1257        }
1258    }
1259
1260    async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
1261        self.instances
1262            .read()
1263            .await
1264            .values()
1265            .find(|instance| instance.session_id == session_id)
1266            .map(|instance| instance.instance_id.clone())
1267    }
1268
1269    async fn apply_budget_delta(
1270        &self,
1271        state: &AppState,
1272        instance_id: &str,
1273        delta_tokens: u64,
1274        delta_steps: u32,
1275        delta_tool_calls: u32,
1276    ) -> bool {
1277        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1278            enabled: false,
1279            require_justification: false,
1280            max_agents: None,
1281            max_concurrent: None,
1282            child_budget_percent_of_parent_remaining: None,
1283            mission_total_budget: None,
1284            cost_per_1k_tokens_usd: None,
1285            spawn_edges: HashMap::new(),
1286            required_skills: HashMap::new(),
1287            role_defaults: HashMap::new(),
1288            skill_sources: Default::default(),
1289        });
1290        let mut budgets = self.budgets.write().await;
1291        let Some(usage) = budgets.get_mut(instance_id) else {
1292            return false;
1293        };
1294        if usage.exhausted {
1295            return true;
1296        }
1297        let prev_cost_used_usd = usage.cost_used_usd;
1298        usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
1299        usage.steps_used = usage.steps_used.saturating_add(delta_steps);
1300        usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
1301        if let Some(rate) = policy.cost_per_1k_tokens_usd {
1302            usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
1303        }
1304        let elapsed_ms = usage
1305            .started_at
1306            .map(|started| started.elapsed().as_millis() as u64)
1307            .unwrap_or(0);
1308
1309        let mut exhausted_reason: Option<&'static str> = None;
1310        let mut snapshot: Option<AgentInstance> = None;
1311        {
1312            let mut instances = self.instances.write().await;
1313            if let Some(instance) = instances.get_mut(instance_id) {
1314                instance.metadata = Some(merge_metadata_usage(
1315                    instance.metadata.take(),
1316                    usage.tokens_used,
1317                    usage.steps_used,
1318                    usage.tool_calls_used,
1319                    usage.cost_used_usd,
1320                    elapsed_ms,
1321                ));
1322                if let Some(limit) = instance.budget.max_tokens {
1323                    if usage.tokens_used >= limit {
1324                        exhausted_reason = Some("max_tokens");
1325                    }
1326                }
1327                if exhausted_reason.is_none() {
1328                    if let Some(limit) = instance.budget.max_steps {
1329                        if usage.steps_used >= limit {
1330                            exhausted_reason = Some("max_steps");
1331                        }
1332                    }
1333                }
1334                if exhausted_reason.is_none() {
1335                    if let Some(limit) = instance.budget.max_tool_calls {
1336                        if usage.tool_calls_used >= limit {
1337                            exhausted_reason = Some("max_tool_calls");
1338                        }
1339                    }
1340                }
1341                if exhausted_reason.is_none() {
1342                    if let Some(limit) = instance.budget.max_duration_ms {
1343                        if elapsed_ms >= limit {
1344                            exhausted_reason = Some("max_duration_ms");
1345                        }
1346                    }
1347                }
1348                if exhausted_reason.is_none() {
1349                    if let Some(limit) = instance.budget.max_cost_usd {
1350                        if usage.cost_used_usd >= limit {
1351                            exhausted_reason = Some("max_cost_usd");
1352                        }
1353                    }
1354                }
1355                snapshot = Some(instance.clone());
1356            }
1357        }
1358        let Some(instance) = snapshot else {
1359            return false;
1360        };
1361        emit_budget_usage(
1362            state,
1363            &instance,
1364            usage.tokens_used,
1365            usage.steps_used,
1366            usage.tool_calls_used,
1367            usage.cost_used_usd,
1368            elapsed_ms,
1369        );
1370        let mission_exhausted = self
1371            .apply_mission_budget_delta(
1372                state,
1373                &instance.mission_id,
1374                delta_tokens,
1375                u64::from(delta_steps),
1376                u64::from(delta_tool_calls),
1377                usage.cost_used_usd - prev_cost_used_usd,
1378                &policy,
1379            )
1380            .await;
1381        if mission_exhausted {
1382            usage.exhausted = true;
1383            let _ = self
1384                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1385                .await;
1386            return true;
1387        }
1388        if let Some(reason) = exhausted_reason {
1389            usage.exhausted = true;
1390            emit_budget_exhausted(
1391                state,
1392                &instance,
1393                reason,
1394                usage.tokens_used,
1395                usage.steps_used,
1396                usage.tool_calls_used,
1397                usage.cost_used_usd,
1398                elapsed_ms,
1399            );
1400            return true;
1401        }
1402        false
1403    }
1404
1405    async fn apply_exact_token_usage(
1406        &self,
1407        state: &AppState,
1408        instance_id: &str,
1409        total_tokens: u64,
1410        cost_used_usd: f64,
1411    ) -> bool {
1412        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1413            enabled: false,
1414            require_justification: false,
1415            max_agents: None,
1416            max_concurrent: None,
1417            child_budget_percent_of_parent_remaining: None,
1418            mission_total_budget: None,
1419            cost_per_1k_tokens_usd: None,
1420            spawn_edges: HashMap::new(),
1421            required_skills: HashMap::new(),
1422            role_defaults: HashMap::new(),
1423            skill_sources: Default::default(),
1424        });
1425        let mut budgets = self.budgets.write().await;
1426        let Some(usage) = budgets.get_mut(instance_id) else {
1427            return false;
1428        };
1429        if usage.exhausted {
1430            return true;
1431        }
1432        let prev_tokens = usage.tokens_used;
1433        let prev_cost_used_usd = usage.cost_used_usd;
1434        usage.tokens_used = usage.tokens_used.max(total_tokens);
1435        if cost_used_usd > 0.0 {
1436            usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
1437        } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
1438            let delta = usage.tokens_used.saturating_sub(prev_tokens);
1439            usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
1440        }
1441        let elapsed_ms = usage
1442            .started_at
1443            .map(|started| started.elapsed().as_millis() as u64)
1444            .unwrap_or(0);
1445        let mut exhausted_reason: Option<&'static str> = None;
1446        let mut snapshot: Option<AgentInstance> = None;
1447        {
1448            let mut instances = self.instances.write().await;
1449            if let Some(instance) = instances.get_mut(instance_id) {
1450                instance.metadata = Some(merge_metadata_usage(
1451                    instance.metadata.take(),
1452                    usage.tokens_used,
1453                    usage.steps_used,
1454                    usage.tool_calls_used,
1455                    usage.cost_used_usd,
1456                    elapsed_ms,
1457                ));
1458                if let Some(limit) = instance.budget.max_tokens {
1459                    if usage.tokens_used >= limit {
1460                        exhausted_reason = Some("max_tokens");
1461                    }
1462                }
1463                if exhausted_reason.is_none() {
1464                    if let Some(limit) = instance.budget.max_cost_usd {
1465                        if usage.cost_used_usd >= limit {
1466                            exhausted_reason = Some("max_cost_usd");
1467                        }
1468                    }
1469                }
1470                snapshot = Some(instance.clone());
1471            }
1472        }
1473        let Some(instance) = snapshot else {
1474            return false;
1475        };
1476        emit_budget_usage(
1477            state,
1478            &instance,
1479            usage.tokens_used,
1480            usage.steps_used,
1481            usage.tool_calls_used,
1482            usage.cost_used_usd,
1483            elapsed_ms,
1484        );
1485        let mission_exhausted = self
1486            .apply_mission_budget_delta(
1487                state,
1488                &instance.mission_id,
1489                usage.tokens_used.saturating_sub(prev_tokens),
1490                0,
1491                0,
1492                usage.cost_used_usd - prev_cost_used_usd,
1493                &policy,
1494            )
1495            .await;
1496        if mission_exhausted {
1497            usage.exhausted = true;
1498            let _ = self
1499                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1500                .await;
1501            return true;
1502        }
1503        if let Some(reason) = exhausted_reason {
1504            usage.exhausted = true;
1505            emit_budget_exhausted(
1506                state,
1507                &instance,
1508                reason,
1509                usage.tokens_used,
1510                usage.steps_used,
1511                usage.tool_calls_used,
1512                usage.cost_used_usd,
1513                elapsed_ms,
1514            );
1515            return true;
1516        }
1517        false
1518    }
1519
1520    async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1521        let path = self.audit_path.read().await.clone();
1522        if let Some(parent) = path.parent() {
1523            fs::create_dir_all(parent).await?;
1524        }
1525        let row = json!({
1526            "action": action,
1527            "missionID": instance.mission_id,
1528            "instanceID": instance.instance_id,
1529            "parentInstanceID": instance.parent_instance_id,
1530            "role": instance.role,
1531            "templateID": instance.template_id,
1532            "sessionID": instance.session_id,
1533            "skillHash": instance.skill_hash,
1534            "workspaceRoot": instance_workspace_root(instance),
1535            "workspaceRepoRoot": instance_workspace_repo_root(instance),
1536            "managedWorktree": instance_managed_worktree(instance),
1537            "timestampMs": crate::now_ms(),
1538        });
1539        let mut existing = if path.exists() {
1540            fs::read_to_string(&path).await.unwrap_or_default()
1541        } else {
1542            String::new()
1543        };
1544        existing.push_str(&serde_json::to_string(&row)?);
1545        existing.push('\n');
1546        fs::write(path, existing).await?;
1547        Ok(())
1548    }
1549
1550    async fn append_approval_audit(
1551        &self,
1552        action: &str,
1553        approval_id: &str,
1554        instance: Option<&AgentInstance>,
1555        reason: &str,
1556    ) -> anyhow::Result<()> {
1557        let path = self.audit_path.read().await.clone();
1558        if let Some(parent) = path.parent() {
1559            fs::create_dir_all(parent).await?;
1560        }
1561        let row = json!({
1562            "action": action,
1563            "approvalID": approval_id,
1564            "reason": reason,
1565            "missionID": instance.map(|v| v.mission_id.clone()),
1566            "instanceID": instance.map(|v| v.instance_id.clone()),
1567            "parentInstanceID": instance.and_then(|v| v.parent_instance_id.clone()),
1568            "role": instance.map(|v| v.role.clone()),
1569            "templateID": instance.map(|v| v.template_id.clone()),
1570            "sessionID": instance.map(|v| v.session_id.clone()),
1571            "skillHash": instance.map(|v| v.skill_hash.clone()),
1572            "workspaceRoot": instance.and_then(instance_workspace_root),
1573            "workspaceRepoRoot": instance.and_then(instance_workspace_repo_root),
1574            "managedWorktree": instance.and_then(instance_managed_worktree),
1575            "timestampMs": crate::now_ms(),
1576        });
1577        let mut existing = if path.exists() {
1578            fs::read_to_string(&path).await.unwrap_or_default()
1579        } else {
1580            String::new()
1581        };
1582        existing.push_str(&serde_json::to_string(&row)?);
1583        existing.push('\n');
1584        fs::write(path, existing).await?;
1585        Ok(())
1586    }
1587
1588    async fn apply_mission_budget_delta(
1589        &self,
1590        state: &AppState,
1591        mission_id: &str,
1592        delta_tokens: u64,
1593        delta_steps: u64,
1594        delta_tool_calls: u64,
1595        delta_cost_used_usd: f64,
1596        policy: &SpawnPolicy,
1597    ) -> bool {
1598        let mut budgets = self.mission_budgets.write().await;
1599        let row = budgets.entry(mission_id.to_string()).or_default();
1600        row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1601        row.steps_used = row.steps_used.saturating_add(delta_steps);
1602        row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1603        row.cost_used_usd += delta_cost_used_usd.max(0.0);
1604        if row.exhausted {
1605            return true;
1606        }
1607        let Some(limit) = policy.mission_total_budget.as_ref() else {
1608            return false;
1609        };
1610        let mut exhausted_by: Option<&'static str> = None;
1611        if let Some(max) = limit.max_tokens {
1612            if row.tokens_used >= max {
1613                exhausted_by = Some("mission_max_tokens");
1614            }
1615        }
1616        if exhausted_by.is_none() {
1617            if let Some(max) = limit.max_steps {
1618                if row.steps_used >= u64::from(max) {
1619                    exhausted_by = Some("mission_max_steps");
1620                }
1621            }
1622        }
1623        if exhausted_by.is_none() {
1624            if let Some(max) = limit.max_tool_calls {
1625                if row.tool_calls_used >= u64::from(max) {
1626                    exhausted_by = Some("mission_max_tool_calls");
1627                }
1628            }
1629        }
1630        if exhausted_by.is_none() {
1631            if let Some(max) = limit.max_cost_usd {
1632                if row.cost_used_usd >= max {
1633                    exhausted_by = Some("mission_max_cost_usd");
1634                }
1635            }
1636        }
1637        if let Some(exhausted_by) = exhausted_by {
1638            row.exhausted = true;
1639            emit_mission_budget_exhausted(
1640                state,
1641                mission_id,
1642                exhausted_by,
1643                row.tokens_used,
1644                row.steps_used,
1645                row.tool_calls_used,
1646                row.cost_used_usd,
1647            );
1648            return true;
1649        }
1650        false
1651    }
1652
1653    pub async fn set_for_test(
1654        &self,
1655        workspace_root: Option<String>,
1656        policy: Option<SpawnPolicy>,
1657        templates: Vec<AgentTemplate>,
1658    ) {
1659        *self.policy.write().await = policy;
1660        let mut by_id = HashMap::new();
1661        for template in templates {
1662            by_id.insert(template.template_id.clone(), template);
1663        }
1664        *self.templates.write().await = by_id;
1665        self.instances.write().await.clear();
1666        self.budgets.write().await.clear();
1667        self.mission_budgets.write().await.clear();
1668        self.spawn_approvals.write().await.clear();
1669        *self.loaded_workspace.write().await = workspace_root;
1670    }
1671}
1672
1673fn resolve_budget(
1674    policy: &SpawnPolicy,
1675    parent_instance: Option<AgentInstance>,
1676    parent_usage: Option<InstanceBudgetState>,
1677    template: &AgentTemplate,
1678    override_budget: Option<BudgetLimit>,
1679    role: &AgentRole,
1680) -> BudgetLimit {
1681    let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1682    let mut chosen = merge_budget(
1683        merge_budget(role_default, template.default_budget.clone()),
1684        override_budget.unwrap_or_default(),
1685    );
1686
1687    if let Some(parent) = parent_instance {
1688        let usage = parent_usage.unwrap_or_default();
1689        if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1690            if pct > 0 {
1691                chosen.max_tokens = cap_budget_remaining_u64(
1692                    chosen.max_tokens,
1693                    parent.budget.max_tokens,
1694                    usage.tokens_used,
1695                    pct,
1696                );
1697                chosen.max_steps = cap_budget_remaining_u32(
1698                    chosen.max_steps,
1699                    parent.budget.max_steps,
1700                    usage.steps_used,
1701                    pct,
1702                );
1703                chosen.max_tool_calls = cap_budget_remaining_u32(
1704                    chosen.max_tool_calls,
1705                    parent.budget.max_tool_calls,
1706                    usage.tool_calls_used,
1707                    pct,
1708                );
1709                chosen.max_duration_ms = cap_budget_remaining_u64(
1710                    chosen.max_duration_ms,
1711                    parent.budget.max_duration_ms,
1712                    usage
1713                        .started_at
1714                        .map(|started| started.elapsed().as_millis() as u64)
1715                        .unwrap_or(0),
1716                    pct,
1717                );
1718                chosen.max_cost_usd = cap_budget_remaining_f64(
1719                    chosen.max_cost_usd,
1720                    parent.budget.max_cost_usd,
1721                    usage.cost_used_usd,
1722                    pct,
1723                );
1724            }
1725        }
1726    }
1727    chosen
1728}
1729
1730fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1731    BudgetLimit {
1732        max_tokens: overlay.max_tokens.or(base.max_tokens),
1733        max_steps: overlay.max_steps.or(base.max_steps),
1734        max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1735        max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1736        max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1737    }
1738}
1739
/// Caps a child's `u64` budget to `pct` percent of what the parent has left.
///
/// * `child` - the budget requested for the child, if any.
/// * `parent_limit` - the parent's limit; `None` means the parent is unbounded.
/// * `parent_used` - how much of the limit the parent has already consumed.
/// * `pct` - percentage of the parent's remainder the child may receive.
///
/// When the parent has no limit, `child` is returned unchanged. Otherwise the
/// result is `Some(min(child, cap))`, or `Some(cap)` when the child requested
/// nothing, where `cap = (parent_limit - parent_used) * pct / 100` and the
/// remainder is floored at zero.
///
/// The product is computed in `u128` so that very large parent limits (for
/// example `u64::MAX` used as "unlimited") are not clipped by an overflowing
/// multiplication before the division. The final value saturates at `u64::MAX`,
/// which can only happen for `pct > 100`.
fn cap_budget_remaining_u64(
    child: Option<u64>,
    parent_limit: Option<u64>,
    parent_used: u64,
    pct: u8,
) -> Option<u64> {
    let Some(parent_limit) = parent_limit else {
        return child;
    };
    let remaining = parent_limit.saturating_sub(parent_used);
    // Widen before multiplying: `remaining * pct` can exceed `u64::MAX`.
    let share = u128::from(remaining) * u128::from(pct) / 100;
    let cap = u64::try_from(share).unwrap_or(u64::MAX);
    Some(child.map_or(cap, |requested| requested.min(cap)))
}
1759
/// Caps a child's `u32` budget to `pct` percent of what the parent has left.
///
/// * `child` - the budget requested for the child, if any.
/// * `parent_limit` - the parent's limit; `None` means the parent is unbounded.
/// * `parent_used` - how much of the limit the parent has already consumed.
/// * `pct` - percentage of the parent's remainder the child may receive.
///
/// When the parent has no limit, `child` is returned unchanged. Otherwise the
/// result is `Some(min(child, cap))`, or `Some(cap)` when the child requested
/// nothing, where `cap = (parent_limit - parent_used) * pct / 100` and the
/// remainder is floored at zero.
///
/// The product is computed in `u64`: in `u32` the multiplication overflows as
/// soon as the remainder exceeds roughly 43 million at 100%, which would clip
/// the cap to `u32::MAX / 100`. The final value saturates at `u32::MAX`, which
/// can only happen for `pct > 100`.
fn cap_budget_remaining_u32(
    child: Option<u32>,
    parent_limit: Option<u32>,
    parent_used: u32,
    pct: u8,
) -> Option<u32> {
    let Some(parent_limit) = parent_limit else {
        return child;
    };
    let remaining = parent_limit.saturating_sub(parent_used);
    // Widen before multiplying: `remaining * pct` can exceed `u32::MAX`.
    let share = u64::from(remaining) * u64::from(pct) / 100;
    let cap = u32::try_from(share).unwrap_or(u32::MAX);
    Some(child.map_or(cap, |requested| requested.min(cap)))
}
1779
/// Caps a child's `f64` budget to `pct` percent of what the parent has left.
///
/// When the parent has no limit, `child` is returned unchanged. Otherwise the
/// cap is `max(parent_limit - parent_used, 0) * pct / 100`, and the result is
/// the smaller of the child's request and that cap, or the cap itself when the
/// child requested nothing.
fn cap_budget_remaining_f64(
    child: Option<f64>,
    parent_limit: Option<f64>,
    parent_used: f64,
    pct: u8,
) -> Option<f64> {
    let Some(ceiling) = parent_limit else {
        return child;
    };
    // An overspent parent leaves nothing rather than a negative remainder.
    let headroom = (ceiling - parent_used).max(0.0);
    let share = headroom * f64::from(pct) / 100.0;
    match child {
        Some(requested) => Some(requested.min(share)),
        None => Some(share),
    }
}
1799
1800async fn compute_skill_hash(
1801    workspace_root: &str,
1802    template: &AgentTemplate,
1803    policy: &SpawnPolicy,
1804) -> Result<String, String> {
1805    use sha2::{Digest, Sha256};
1806    let mut rows = Vec::new();
1807    let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1808    for skill in &template.skills {
1809        if let Some(path) = skill.path.as_deref() {
1810            validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1811            let skill_path = Path::new(workspace_root).join(path);
1812            let raw = fs::read_to_string(&skill_path)
1813                .await
1814                .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1815            let digest = hash_hex(raw.as_bytes());
1816            validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1817            rows.push(format!("path:{}:{}", path, digest));
1818        } else if let Some(id) = skill.id.as_deref() {
1819            validate_skill_source(Some(id), None, policy)?;
1820            let loaded = skill_service
1821                .load_skill(id)
1822                .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1823            let Some(loaded) = loaded else {
1824                return Err(format!("missing required skill id `{id}`"));
1825            };
1826            let digest = hash_hex(loaded.content.as_bytes());
1827            validate_pinned_hash(Some(id), None, &digest, policy)?;
1828            rows.push(format!("id:{}:{}", id, digest));
1829        }
1830    }
1831    rows.sort();
1832    let mut hasher = Sha256::new();
1833    for row in rows {
1834        hasher.update(row.as_bytes());
1835        hasher.update(b"\n");
1836    }
1837    let digest = hasher.finalize();
1838    Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1839}
1840
1841fn validate_skill_source(
1842    id: Option<&str>,
1843    path: Option<&str>,
1844    policy: &SpawnPolicy,
1845) -> Result<(), String> {
1846    use tandem_orchestrator::SkillSourceMode;
1847    match policy.skill_sources.mode {
1848        SkillSourceMode::Any => Ok(()),
1849        SkillSourceMode::ProjectOnly => {
1850            if id.is_some() {
1851                return Err("skill source denied: project_only forbids skill IDs".to_string());
1852            }
1853            let Some(path) = path else {
1854                return Err("skill source denied: project_only requires skill path".to_string());
1855            };
1856            let p = PathBuf::from(path);
1857            if p.is_absolute() {
1858                return Err("skill source denied: absolute skill paths are forbidden".to_string());
1859            }
1860            Ok(())
1861        }
1862        SkillSourceMode::Allowlist => {
1863            if let Some(id) = id {
1864                if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1865                    return Ok(());
1866                }
1867            }
1868            if let Some(path) = path {
1869                if policy
1870                    .skill_sources
1871                    .allowlist_paths
1872                    .iter()
1873                    .any(|v| v == path)
1874                {
1875                    return Ok(());
1876                }
1877            }
1878            Err("skill source denied: not present in allowlist".to_string())
1879        }
1880    }
1881}
1882
1883fn validate_pinned_hash(
1884    id: Option<&str>,
1885    path: Option<&str>,
1886    digest: &str,
1887    policy: &SpawnPolicy,
1888) -> Result<(), String> {
1889    let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1890    let by_path = path.and_then(|path| {
1891        policy
1892            .skill_sources
1893            .pinned_hashes
1894            .get(&format!("path:{path}"))
1895    });
1896    let expected = by_id.or(by_path);
1897    if let Some(expected) = expected {
1898        let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1899        if normalized != digest {
1900            return Err("pinned hash mismatch for skill reference".to_string());
1901        }
1902    }
1903    Ok(())
1904}
1905
/// Encodes `bytes` as a lowercase hexadecimal string, two characters per byte.
fn hash_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        hex.push(char::from(DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
1914
/// Estimates the token count of `text` as one token per four characters
/// (Unicode scalar values, not bytes), rounded down, with a minimum of one.
fn estimate_tokens(text: &str) -> u64 {
    let quarter = text.chars().count() as u64 / 4;
    if quarter == 0 {
        1
    } else {
        quarter
    }
}
1919
1920fn extract_session_id(event: &EngineEvent) -> Option<String> {
1921    event
1922        .properties
1923        .get("sessionID")
1924        .and_then(|v| v.as_str())
1925        .map(|v| v.to_string())
1926        .or_else(|| {
1927            event
1928                .properties
1929                .get("part")
1930                .and_then(|v| v.get("sessionID"))
1931                .and_then(|v| v.as_str())
1932                .map(|v| v.to_string())
1933        })
1934}