Skip to main content

tandem_server/
agent_teams.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use futures::future::BoxFuture;
8use serde::Deserialize;
9use serde::Serialize;
10use serde_json::{json, Value};
11use tandem_core::{
12    any_policy_matches, SpawnAgentHook, SpawnAgentToolContext, SpawnAgentToolResult,
13    ToolPolicyContext, ToolPolicyDecision, ToolPolicyHook,
14};
15use tandem_orchestrator::{
16    AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
17    SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
18};
19use tandem_skills::SkillService;
20use tandem_types::{EngineEvent, Session};
21use tokio::fs;
22use tokio::sync::RwLock;
23use uuid::Uuid;
24
25use crate::AppState;
26
27#[derive(Clone, Default)]
28pub struct AgentTeamRuntime {
29    policy: Arc<RwLock<Option<SpawnPolicy>>>,
30    templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
31    instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
32    budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
33    mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
34    spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
35    loaded_workspace: Arc<RwLock<Option<String>>>,
36    audit_path: Arc<RwLock<PathBuf>>,
37}
38
39#[derive(Debug, Clone)]
40pub struct SpawnResult {
41    pub decision: SpawnDecision,
42    pub instance: Option<AgentInstance>,
43}
44
45#[derive(Debug, Clone, Serialize)]
46pub struct AgentMissionSummary {
47    #[serde(rename = "missionID")]
48    pub mission_id: String,
49    #[serde(rename = "instanceCount")]
50    pub instance_count: usize,
51    #[serde(rename = "runningCount")]
52    pub running_count: usize,
53    #[serde(rename = "completedCount")]
54    pub completed_count: usize,
55    #[serde(rename = "failedCount")]
56    pub failed_count: usize,
57    #[serde(rename = "cancelledCount")]
58    pub cancelled_count: usize,
59    #[serde(rename = "queuedCount")]
60    pub queued_count: usize,
61    #[serde(rename = "tokenUsedTotal")]
62    pub token_used_total: u64,
63    #[serde(rename = "toolCallsUsedTotal")]
64    pub tool_calls_used_total: u64,
65    #[serde(rename = "stepsUsedTotal")]
66    pub steps_used_total: u64,
67    #[serde(rename = "costUsedUsdTotal")]
68    pub cost_used_usd_total: f64,
69}
70
71#[derive(Debug, Clone, Default)]
72struct InstanceBudgetState {
73    tokens_used: u64,
74    steps_used: u32,
75    tool_calls_used: u32,
76    cost_used_usd: f64,
77    started_at: Option<Instant>,
78    exhausted: bool,
79}
80
81#[derive(Debug, Clone, Default)]
82struct MissionBudgetState {
83    tokens_used: u64,
84    steps_used: u64,
85    tool_calls_used: u64,
86    cost_used_usd: f64,
87    exhausted: bool,
88}
89
90#[derive(Debug, Clone, Serialize)]
91pub struct PendingSpawnApproval {
92    #[serde(rename = "approvalID")]
93    pub approval_id: String,
94    #[serde(rename = "createdAtMs")]
95    pub created_at_ms: u64,
96    pub request: SpawnRequest,
97    #[serde(rename = "decisionCode")]
98    pub decision_code: Option<SpawnDenyCode>,
99    pub reason: Option<String>,
100}
101
102#[derive(Clone)]
103pub struct ServerSpawnAgentHook {
104    state: AppState,
105}
106
107#[derive(Debug, Deserialize)]
108struct SpawnAgentToolInput {
109    #[serde(rename = "missionID")]
110    mission_id: Option<String>,
111    #[serde(rename = "parentInstanceID")]
112    parent_instance_id: Option<String>,
113    #[serde(rename = "templateID")]
114    template_id: Option<String>,
115    role: AgentRole,
116    source: Option<SpawnSource>,
117    justification: String,
118    #[serde(rename = "budgetOverride", default)]
119    budget_override: Option<BudgetLimit>,
120}
121
122impl ServerSpawnAgentHook {
123    pub fn new(state: AppState) -> Self {
124        Self { state }
125    }
126}
127
128impl SpawnAgentHook for ServerSpawnAgentHook {
129    fn spawn_agent(
130        &self,
131        ctx: SpawnAgentToolContext,
132    ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
133        let state = self.state.clone();
134        Box::pin(async move {
135            let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
136            let input = match parsed {
137                Ok(input) => input,
138                Err(err) => {
139                    return Ok(SpawnAgentToolResult {
140                        output: format!("spawn_agent denied: invalid args ({err})"),
141                        metadata: json!({
142                            "ok": false,
143                            "code": "SPAWN_INVALID_ARGS",
144                            "error": err.to_string(),
145                        }),
146                    });
147                }
148            };
149            let req = SpawnRequest {
150                mission_id: input.mission_id,
151                parent_instance_id: input.parent_instance_id,
152                source: input.source.unwrap_or(SpawnSource::ToolCall),
153                parent_role: None,
154                role: input.role,
155                template_id: input.template_id,
156                justification: input.justification,
157                budget_override: input.budget_override,
158            };
159
160            let event_ctx = SpawnEventContext {
161                session_id: Some(ctx.session_id.as_str()),
162                message_id: Some(ctx.message_id.as_str()),
163                run_id: None,
164            };
165            emit_spawn_requested_with_context(&state, &req, &event_ctx);
166            let result = state.agent_teams.spawn(&state, req.clone()).await;
167            if !result.decision.allowed || result.instance.is_none() {
168                emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
169                return Ok(SpawnAgentToolResult {
170                    output: result
171                        .decision
172                        .reason
173                        .clone()
174                        .unwrap_or_else(|| "spawn_agent denied".to_string()),
175                    metadata: json!({
176                        "ok": false,
177                        "code": result.decision.code,
178                        "error": result.decision.reason,
179                        "requiresUserApproval": result.decision.requires_user_approval,
180                    }),
181                });
182            }
183            let instance = result.instance.expect("checked is_some");
184            emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
185            Ok(SpawnAgentToolResult {
186                output: format!(
187                    "spawned {} as instance {} (session {})",
188                    instance.template_id, instance.instance_id, instance.session_id
189                ),
190                metadata: json!({
191                    "ok": true,
192                    "missionID": instance.mission_id,
193                    "instanceID": instance.instance_id,
194                    "sessionID": instance.session_id,
195                    "runID": instance.run_id,
196                    "status": instance.status,
197                    "skillHash": instance.skill_hash,
198                    "workspaceRoot": instance_workspace_root(&instance),
199                    "workspaceRepoRoot": instance_workspace_repo_root(&instance),
200                    "managedWorktree": instance_managed_worktree(&instance),
201                }),
202            })
203        })
204    }
205}
206
207#[derive(Clone)]
208pub struct ServerToolPolicyHook {
209    state: AppState,
210}
211
212impl ServerToolPolicyHook {
213    pub fn new(state: AppState) -> Self {
214        Self { state }
215    }
216}
217
218impl ToolPolicyHook for ServerToolPolicyHook {
219    fn evaluate_tool(
220        &self,
221        ctx: ToolPolicyContext,
222    ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
223        let state = self.state.clone();
224        Box::pin(async move {
225            let tool = normalize_tool_name(&ctx.tool);
226            if let Some(policy) = state.routine_session_policy(&ctx.session_id).await {
227                let allowed_patterns = policy
228                    .allowed_tools
229                    .iter()
230                    .map(|name| normalize_tool_name(name))
231                    .collect::<Vec<_>>();
232                if !policy.allowed_tools.is_empty() && !any_policy_matches(&allowed_patterns, &tool)
233                {
234                    let reason = format!(
235                        "tool `{}` is not allowed for routine `{}` (run `{}`)",
236                        tool, policy.routine_id, policy.run_id
237                    );
238                    state.event_bus.publish(EngineEvent::new(
239                        "routine.tool.denied",
240                        json!({
241                            "sessionID": ctx.session_id,
242                            "messageID": ctx.message_id,
243                            "runID": policy.run_id,
244                            "routineID": policy.routine_id,
245                            "tool": tool,
246                            "reason": reason,
247                            "timestampMs": crate::now_ms(),
248                        }),
249                    ));
250                    return Ok(ToolPolicyDecision {
251                        allowed: false,
252                        reason: Some(reason),
253                    });
254                }
255            }
256
257            let Some(instance) = state
258                .agent_teams
259                .instance_for_session(&ctx.session_id)
260                .await
261            else {
262                return Ok(ToolPolicyDecision {
263                    allowed: true,
264                    reason: None,
265                });
266            };
267            let caps = instance.capabilities.clone();
268            let deny = evaluate_capability_deny(
269                &state,
270                &instance,
271                &tool,
272                &ctx.args,
273                &caps,
274                &ctx.session_id,
275                &ctx.message_id,
276            )
277            .await;
278            if let Some(reason) = deny {
279                state.event_bus.publish(EngineEvent::new(
280                    "agent_team.capability.denied",
281                    json!({
282                        "sessionID": ctx.session_id,
283                        "messageID": ctx.message_id,
284                        "runID": instance.run_id,
285                        "missionID": instance.mission_id,
286                        "instanceID": instance.instance_id,
287                        "tool": tool,
288                        "reason": reason,
289                        "timestampMs": crate::now_ms(),
290                    }),
291                ));
292                return Ok(ToolPolicyDecision {
293                    allowed: false,
294                    reason: Some(reason),
295                });
296            }
297            Ok(ToolPolicyDecision {
298                allowed: true,
299                reason: None,
300            })
301        })
302    }
303}
304
305impl AgentTeamRuntime {
306    pub fn new(audit_path: PathBuf) -> Self {
307        Self {
308            policy: Arc::new(RwLock::new(None)),
309            templates: Arc::new(RwLock::new(HashMap::new())),
310            instances: Arc::new(RwLock::new(HashMap::new())),
311            budgets: Arc::new(RwLock::new(HashMap::new())),
312            mission_budgets: Arc::new(RwLock::new(HashMap::new())),
313            spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
314            loaded_workspace: Arc::new(RwLock::new(None)),
315            audit_path: Arc::new(RwLock::new(audit_path)),
316        }
317    }
318
319    pub async fn set_audit_path(&self, path: PathBuf) {
320        *self.audit_path.write().await = path;
321    }
322
323    pub async fn list_templates(&self) -> Vec<AgentTemplate> {
324        let mut rows = self
325            .templates
326            .read()
327            .await
328            .values()
329            .cloned()
330            .collect::<Vec<_>>();
331        rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
332        rows
333    }
334
335    async fn templates_dir_for_loaded_workspace(&self) -> anyhow::Result<PathBuf> {
336        let workspace = self
337            .loaded_workspace
338            .read()
339            .await
340            .clone()
341            .ok_or_else(|| anyhow::anyhow!("agent team workspace not loaded"))?;
342        Ok(PathBuf::from(workspace)
343            .join(".tandem")
344            .join("agent-team")
345            .join("templates"))
346    }
347
348    fn template_filename(template_id: &str) -> String {
349        let safe = template_id
350            .trim()
351            .chars()
352            .map(|ch| {
353                if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
354                    ch
355                } else {
356                    '_'
357                }
358            })
359            .collect::<String>();
360        let fallback = if safe.is_empty() {
361            "template".to_string()
362        } else {
363            safe
364        };
365        format!("{fallback}.yaml")
366    }
367
368    pub async fn upsert_template(
369        &self,
370        workspace_root: &str,
371        template: AgentTemplate,
372    ) -> anyhow::Result<AgentTemplate> {
373        self.ensure_loaded_for_workspace(workspace_root).await?;
374        let templates_dir = self.templates_dir_for_loaded_workspace().await?;
375        fs::create_dir_all(&templates_dir).await?;
376        let path = templates_dir.join(Self::template_filename(&template.template_id));
377        let payload = serde_yaml::to_string(&template)?;
378        fs::write(path, payload).await?;
379        self.templates
380            .write()
381            .await
382            .insert(template.template_id.clone(), template.clone());
383        Ok(template)
384    }
385
386    pub async fn delete_template(
387        &self,
388        workspace_root: &str,
389        template_id: &str,
390    ) -> anyhow::Result<bool> {
391        self.ensure_loaded_for_workspace(workspace_root).await?;
392        let templates_dir = self.templates_dir_for_loaded_workspace().await?;
393        let path = templates_dir.join(Self::template_filename(template_id));
394        let existed = self.templates.write().await.remove(template_id).is_some();
395        if path.exists() {
396            let _ = fs::remove_file(path).await;
397        }
398        Ok(existed)
399    }
400
401    pub async fn get_template_for_workspace(
402        &self,
403        workspace_root: &str,
404        template_id: &str,
405    ) -> anyhow::Result<Option<AgentTemplate>> {
406        self.ensure_loaded_for_workspace(workspace_root).await?;
407        Ok(self.templates.read().await.get(template_id).cloned())
408    }
409
410    pub async fn list_instances(
411        &self,
412        mission_id: Option<&str>,
413        parent_instance_id: Option<&str>,
414        status: Option<AgentInstanceStatus>,
415    ) -> Vec<AgentInstance> {
416        let mut rows = self
417            .instances
418            .read()
419            .await
420            .values()
421            .filter(|instance| {
422                if let Some(mission_id) = mission_id {
423                    if instance.mission_id != mission_id {
424                        return false;
425                    }
426                }
427                if let Some(parent_id) = parent_instance_id {
428                    if instance.parent_instance_id.as_deref() != Some(parent_id) {
429                        return false;
430                    }
431                }
432                if let Some(status) = &status {
433                    if &instance.status != status {
434                        return false;
435                    }
436                }
437                true
438            })
439            .cloned()
440            .collect::<Vec<_>>();
441        rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
442        rows
443    }
444
445    pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
446        let instances = self.instances.read().await;
447        let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
448        for instance in instances.values() {
449            let row = by_mission
450                .entry(instance.mission_id.clone())
451                .or_insert_with(|| AgentMissionSummary {
452                    mission_id: instance.mission_id.clone(),
453                    instance_count: 0,
454                    running_count: 0,
455                    completed_count: 0,
456                    failed_count: 0,
457                    cancelled_count: 0,
458                    queued_count: 0,
459                    token_used_total: 0,
460                    tool_calls_used_total: 0,
461                    steps_used_total: 0,
462                    cost_used_usd_total: 0.0,
463                });
464            row.instance_count = row.instance_count.saturating_add(1);
465            match instance.status {
466                AgentInstanceStatus::Queued => {
467                    row.queued_count = row.queued_count.saturating_add(1)
468                }
469                AgentInstanceStatus::Running => {
470                    row.running_count = row.running_count.saturating_add(1)
471                }
472                AgentInstanceStatus::Completed => {
473                    row.completed_count = row.completed_count.saturating_add(1)
474                }
475                AgentInstanceStatus::Failed => {
476                    row.failed_count = row.failed_count.saturating_add(1)
477                }
478                AgentInstanceStatus::Cancelled => {
479                    row.cancelled_count = row.cancelled_count.saturating_add(1)
480                }
481            }
482            if let Some(usage) = instance
483                .metadata
484                .as_ref()
485                .and_then(|m| m.get("budgetUsage"))
486                .and_then(|u| u.as_object())
487            {
488                row.token_used_total = row.token_used_total.saturating_add(
489                    usage
490                        .get("tokensUsed")
491                        .and_then(|v| v.as_u64())
492                        .unwrap_or(0),
493                );
494                row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
495                    usage
496                        .get("toolCallsUsed")
497                        .and_then(|v| v.as_u64())
498                        .unwrap_or(0),
499                );
500                row.steps_used_total = row
501                    .steps_used_total
502                    .saturating_add(usage.get("stepsUsed").and_then(|v| v.as_u64()).unwrap_or(0));
503                row.cost_used_usd_total += usage
504                    .get("costUsedUsd")
505                    .and_then(|v| v.as_f64())
506                    .unwrap_or(0.0);
507            }
508        }
509        let mut rows = by_mission.into_values().collect::<Vec<_>>();
510        rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
511        rows
512    }
513
514    pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
515        self.instances
516            .read()
517            .await
518            .values()
519            .find(|instance| instance.session_id == session_id)
520            .cloned()
521    }
522
523    pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
524        let mut rows = self
525            .spawn_approvals
526            .read()
527            .await
528            .values()
529            .cloned()
530            .collect::<Vec<_>>();
531        rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
532        rows
533    }
534
535    pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
536        let normalized = workspace_root.trim().to_string();
537        let already_loaded = self
538            .loaded_workspace
539            .read()
540            .await
541            .as_ref()
542            .map(|s| s == &normalized)
543            .unwrap_or(false);
544        if already_loaded {
545            return Ok(());
546        }
547
548        let root = PathBuf::from(&normalized);
549        let policy_path = root
550            .join(".tandem")
551            .join("agent-team")
552            .join("spawn-policy.yaml");
553        let templates_dir = root.join(".tandem").join("agent-team").join("templates");
554
555        let mut next_policy = None;
556        if policy_path.exists() {
557            let raw = fs::read_to_string(&policy_path)
558                .await
559                .with_context(|| format!("failed reading {}", policy_path.display()))?;
560            let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
561                .with_context(|| format!("failed parsing {}", policy_path.display()))?;
562            next_policy = Some(parsed);
563        }
564
565        let mut next_templates = HashMap::new();
566        if templates_dir.exists() {
567            let mut entries = fs::read_dir(&templates_dir).await?;
568            while let Some(entry) = entries.next_entry().await? {
569                let path = entry.path();
570                if !path.is_file() {
571                    continue;
572                }
573                let ext = path
574                    .extension()
575                    .and_then(|v| v.to_str())
576                    .unwrap_or_default()
577                    .to_ascii_lowercase();
578                if ext != "yaml" && ext != "yml" && ext != "json" {
579                    continue;
580                }
581                let raw = fs::read_to_string(&path).await?;
582                let template = serde_yaml::from_str::<AgentTemplate>(&raw)
583                    .with_context(|| format!("failed parsing {}", path.display()))?;
584                next_templates.insert(template.template_id.clone(), template);
585            }
586        }
587
588        *self.policy.write().await = next_policy;
589        *self.templates.write().await = next_templates;
590        *self.loaded_workspace.write().await = Some(normalized);
591        Ok(())
592    }
593
594    pub async fn spawn(&self, state: &AppState, req: SpawnRequest) -> SpawnResult {
595        self.spawn_with_approval_override(state, req, false).await
596    }
597
598    async fn spawn_with_approval_override(
599        &self,
600        state: &AppState,
601        mut req: SpawnRequest,
602        approval_override: bool,
603    ) -> SpawnResult {
604        let workspace_root = state.workspace_index.snapshot().await.root;
605        if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
606            return SpawnResult {
607                decision: SpawnDecision {
608                    allowed: false,
609                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
610                    reason: Some(format!("spawn policy load failed: {}", err)),
611                    requires_user_approval: false,
612                },
613                instance: None,
614            };
615        }
616
617        let Some(policy) = self.policy.read().await.clone() else {
618            return SpawnResult {
619                decision: SpawnDecision {
620                    allowed: false,
621                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
622                    reason: Some("spawn policy file missing".to_string()),
623                    requires_user_approval: false,
624                },
625                instance: None,
626            };
627        };
628
629        let template = {
630            let templates = self.templates.read().await;
631            req.template_id
632                .as_deref()
633                .and_then(|template_id| templates.get(template_id).cloned())
634        };
635        if req.template_id.is_none() {
636            if let Some(found) = self
637                .templates
638                .read()
639                .await
640                .values()
641                .find(|t| t.role == req.role)
642                .cloned()
643            {
644                req.template_id = Some(found.template_id.clone());
645            }
646        }
647        let template = if template.is_some() {
648            template
649        } else {
650            let templates = self.templates.read().await;
651            req.template_id
652                .as_deref()
653                .and_then(|id| templates.get(id).cloned())
654        };
655
656        if req.parent_role.is_none() {
657            if let Some(parent_id) = req.parent_instance_id.as_deref() {
658                let instances = self.instances.read().await;
659                req.parent_role = instances
660                    .get(parent_id)
661                    .map(|instance| instance.role.clone());
662            }
663        }
664
665        let instances = self.instances.read().await;
666        let total_agents = instances.len();
667        let running_agents = instances
668            .values()
669            .filter(|instance| instance.status == AgentInstanceStatus::Running)
670            .count();
671        drop(instances);
672
673        let mut decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
674        if approval_override
675            && !decision.allowed
676            && decision.requires_user_approval
677            && matches!(decision.code, Some(SpawnDenyCode::SpawnRequiresApproval))
678        {
679            decision = SpawnDecision {
680                allowed: true,
681                code: None,
682                reason: None,
683                requires_user_approval: false,
684            };
685        }
686        if !decision.allowed {
687            if decision.requires_user_approval && !approval_override {
688                self.queue_spawn_approval(&req, &decision).await;
689            }
690            return SpawnResult {
691                decision,
692                instance: None,
693            };
694        }
695
696        let mission_id = req
697            .mission_id
698            .clone()
699            .unwrap_or_else(|| "mission-default".to_string());
700
701        if let Some(reason) = self
702            .mission_budget_exceeded_reason(&policy, &mission_id)
703            .await
704        {
705            return SpawnResult {
706                decision: SpawnDecision {
707                    allowed: false,
708                    code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
709                    reason: Some(reason),
710                    requires_user_approval: false,
711                },
712                instance: None,
713            };
714        }
715
716        let template = template.unwrap_or_else(|| AgentTemplate {
717            template_id: "default-template".to_string(),
718            display_name: None,
719            avatar_url: None,
720            role: req.role.clone(),
721            system_prompt: None,
722            default_model: None,
723            skills: Vec::new(),
724            default_budget: BudgetLimit::default(),
725            capabilities: Default::default(),
726        });
727
728        let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
729            Ok(hash) => hash,
730            Err(err) => {
731                let lowered = err.to_ascii_lowercase();
732                let code = if lowered.contains("pinned hash mismatch") {
733                    SpawnDenyCode::SpawnSkillHashMismatch
734                } else if lowered.contains("skill source denied") {
735                    SpawnDenyCode::SpawnSkillSourceDenied
736                } else {
737                    SpawnDenyCode::SpawnRequiredSkillMissing
738                };
739                return SpawnResult {
740                    decision: SpawnDecision {
741                        allowed: false,
742                        code: Some(code),
743                        reason: Some(err),
744                        requires_user_approval: false,
745                    },
746                    instance: None,
747                };
748            }
749        };
750
751        let parent_snapshot = {
752            let instances = self.instances.read().await;
753            req.parent_instance_id
754                .as_deref()
755                .and_then(|id| instances.get(id).cloned())
756        };
757        let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
758            self.budgets.read().await.get(parent_id).cloned()
759        } else {
760            None
761        };
762
763        let budget = resolve_budget(
764            &policy,
765            parent_snapshot,
766            parent_usage,
767            &template,
768            req.budget_override.clone(),
769            &req.role,
770        );
771
772        let instance_id = format!("ins_{}", Uuid::new_v4().simple());
773        let managed_worktree = prepare_agent_instance_workspace(
774            state,
775            &workspace_root,
776            req.mission_id.as_deref(),
777            &instance_id,
778            &template.template_id,
779        )
780        .await;
781        let workspace_repo_root = managed_worktree
782            .as_ref()
783            .map(|row| row.record.repo_root.clone())
784            .or_else(|| crate::runtime::worktrees::resolve_git_repo_root(&workspace_root))
785            .unwrap_or_else(|| workspace_root.clone());
786        let worker_workspace_root = managed_worktree
787            .as_ref()
788            .map(|row| row.record.path.clone())
789            .unwrap_or_else(|| workspace_root.clone());
790        let mut session = Session::new(
791            Some(format!("Agent Team {}", template.template_id)),
792            Some(worker_workspace_root.clone()),
793        );
794        session.workspace_root = Some(worker_workspace_root.clone());
795        let session_id = session.id.clone();
796        if let Err(err) = state.storage.save_session(session).await {
797            if let Some(worktree) = managed_worktree.as_ref() {
798                let _ = crate::runtime::worktrees::delete_managed_worktree(state, &worktree.record)
799                    .await;
800            }
801            return SpawnResult {
802                decision: SpawnDecision {
803                    allowed: false,
804                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
805                    reason: Some(format!("failed creating child session: {}", err)),
806                    requires_user_approval: false,
807                },
808                instance: None,
809            };
810        }
811
812        let instance = AgentInstance {
813            instance_id: instance_id.clone(),
814            mission_id: mission_id.clone(),
815            parent_instance_id: req.parent_instance_id.clone(),
816            role: template.role.clone(),
817            template_id: template.template_id.clone(),
818            session_id: session_id.clone(),
819            run_id: None,
820            status: AgentInstanceStatus::Running,
821            budget,
822            skill_hash: skill_hash.clone(),
823            capabilities: template.capabilities.clone(),
824            metadata: Some(json!({
825                "source": req.source,
826                "justification": req.justification,
827                "workspaceRoot": worker_workspace_root,
828                "workspaceRepoRoot": workspace_repo_root,
829                "managedWorktree": managed_worktree.as_ref().map(|row| json!({
830                    "path": row.record.path,
831                    "branch": row.record.branch,
832                    "repoRoot": row.record.repo_root,
833                    "cleanupBranch": row.record.cleanup_branch,
834                    "reused": row.reused,
835                })).unwrap_or(Value::Null),
836            })),
837        };
838
839        self.instances
840            .write()
841            .await
842            .insert(instance.instance_id.clone(), instance.clone());
843        self.budgets.write().await.insert(
844            instance.instance_id.clone(),
845            InstanceBudgetState {
846                started_at: Some(Instant::now()),
847                ..InstanceBudgetState::default()
848            },
849        );
850        let _ = self.append_audit("spawn.approved", &instance).await;
851
852        SpawnResult {
853            decision: SpawnDecision {
854                allowed: true,
855                code: None,
856                reason: None,
857                requires_user_approval: false,
858            },
859            instance: Some(instance),
860        }
861    }
862
863    pub async fn approve_spawn_approval(
864        &self,
865        state: &AppState,
866        approval_id: &str,
867        reason: Option<&str>,
868    ) -> Option<SpawnResult> {
869        let approval = self.spawn_approvals.write().await.remove(approval_id)?;
870        let result = self
871            .spawn_with_approval_override(state, approval.request.clone(), true)
872            .await;
873        if let Some(instance) = result.instance.as_ref() {
874            let note = reason.unwrap_or("approved by operator");
875            let _ = self
876                .append_approval_audit("spawn.approval.approved", approval_id, Some(instance), note)
877                .await;
878        } else {
879            let note = reason.unwrap_or("approval replay failed policy checks");
880            let _ = self
881                .append_approval_audit("spawn.approval.rejected_on_replay", approval_id, None, note)
882                .await;
883        }
884        Some(result)
885    }
886
887    pub async fn deny_spawn_approval(
888        &self,
889        approval_id: &str,
890        reason: Option<&str>,
891    ) -> Option<PendingSpawnApproval> {
892        let approval = self.spawn_approvals.write().await.remove(approval_id)?;
893        let note = reason.unwrap_or("denied by operator");
894        let _ = self
895            .append_approval_audit("spawn.approval.denied", approval_id, None, note)
896            .await;
897        Some(approval)
898    }
899
900    pub async fn cancel_instance(
901        &self,
902        state: &AppState,
903        instance_id: &str,
904        reason: &str,
905    ) -> Option<AgentInstance> {
906        let mut instances = self.instances.write().await;
907        let instance = instances.get_mut(instance_id)?;
908        if matches!(
909            instance.status,
910            AgentInstanceStatus::Completed
911                | AgentInstanceStatus::Failed
912                | AgentInstanceStatus::Cancelled
913        ) {
914            return Some(instance.clone());
915        }
916        instance.status = AgentInstanceStatus::Cancelled;
917        let snapshot = instance.clone();
918        drop(instances);
919        let _ = state.cancellations.cancel(&snapshot.session_id).await;
920        cleanup_instance_managed_worktree(state, &snapshot).await;
921        let _ = self.append_audit("instance.cancelled", &snapshot).await;
922        emit_instance_cancelled(state, &snapshot, reason);
923        Some(snapshot)
924    }
925
926    async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
927        let approval = PendingSpawnApproval {
928            approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
929            created_at_ms: crate::now_ms(),
930            request: req.clone(),
931            decision_code: decision.code.clone(),
932            reason: decision.reason.clone(),
933        };
934        self.spawn_approvals
935            .write()
936            .await
937            .insert(approval.approval_id.clone(), approval);
938    }
939
940    async fn mission_budget_exceeded_reason(
941        &self,
942        policy: &SpawnPolicy,
943        mission_id: &str,
944    ) -> Option<String> {
945        let Some(limit) = policy.mission_total_budget.as_ref() else {
946            return None;
947        };
948        let usage = self
949            .mission_budgets
950            .read()
951            .await
952            .get(mission_id)
953            .cloned()
954            .unwrap_or_default();
955        if let Some(max) = limit.max_tokens {
956            if usage.tokens_used >= max {
957                return Some(format!(
958                    "mission max_tokens exhausted ({}/{})",
959                    usage.tokens_used, max
960                ));
961            }
962        }
963        if let Some(max) = limit.max_steps {
964            if usage.steps_used >= u64::from(max) {
965                return Some(format!(
966                    "mission max_steps exhausted ({}/{})",
967                    usage.steps_used, max
968                ));
969            }
970        }
971        if let Some(max) = limit.max_tool_calls {
972            if usage.tool_calls_used >= u64::from(max) {
973                return Some(format!(
974                    "mission max_tool_calls exhausted ({}/{})",
975                    usage.tool_calls_used, max
976                ));
977            }
978        }
979        if let Some(max) = limit.max_cost_usd {
980            if usage.cost_used_usd >= max {
981                return Some(format!(
982                    "mission max_cost_usd exhausted ({:.6}/{:.6})",
983                    usage.cost_used_usd, max
984                ));
985            }
986        }
987        None
988    }
989
990    pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
991        let instance_ids = self
992            .instances
993            .read()
994            .await
995            .values()
996            .filter(|instance| instance.mission_id == mission_id)
997            .map(|instance| instance.instance_id.clone())
998            .collect::<Vec<_>>();
999        let mut count = 0usize;
1000        for instance_id in instance_ids {
1001            if self
1002                .cancel_instance(state, &instance_id, reason)
1003                .await
1004                .is_some()
1005            {
1006                count = count.saturating_add(1);
1007            }
1008        }
1009        count
1010    }
1011
1012    async fn mark_instance_terminal(
1013        &self,
1014        state: &AppState,
1015        instance_id: &str,
1016        status: AgentInstanceStatus,
1017    ) -> Option<AgentInstance> {
1018        let mut instances = self.instances.write().await;
1019        let instance = instances.get_mut(instance_id)?;
1020        if matches!(
1021            instance.status,
1022            AgentInstanceStatus::Completed
1023                | AgentInstanceStatus::Failed
1024                | AgentInstanceStatus::Cancelled
1025        ) {
1026            return Some(instance.clone());
1027        }
1028        instance.status = status.clone();
1029        let snapshot = instance.clone();
1030        drop(instances);
1031        cleanup_instance_managed_worktree(state, &snapshot).await;
1032        match status {
1033            AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
1034            AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
1035            _ => {}
1036        }
1037        Some(snapshot)
1038    }
1039
1040    pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
1041        let Some(session_id) = extract_session_id(event) else {
1042            return;
1043        };
1044        let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
1045            return;
1046        };
1047        if event.event_type == "provider.usage" {
1048            let total_tokens = event
1049                .properties
1050                .get("totalTokens")
1051                .and_then(|v| v.as_u64())
1052                .unwrap_or(0);
1053            let cost_used_usd = event
1054                .properties
1055                .get("costUsd")
1056                .and_then(|v| v.as_f64())
1057                .unwrap_or(0.0);
1058            if total_tokens > 0 {
1059                let exhausted = self
1060                    .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
1061                    .await;
1062                if exhausted {
1063                    let _ = self
1064                        .cancel_instance(state, &instance_id, "budget exhausted")
1065                        .await;
1066                }
1067            }
1068            return;
1069        }
1070        let mut delta_tokens = 0u64;
1071        let mut delta_steps = 0u32;
1072        let mut delta_tool_calls = 0u32;
1073        if event.event_type == "message.part.updated" {
1074            if let Some(part) = event.properties.get("part") {
1075                let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
1076                if part_type == "tool-invocation" {
1077                    delta_tool_calls = 1;
1078                } else if part_type == "text" {
1079                    let delta = event
1080                        .properties
1081                        .get("delta")
1082                        .and_then(|v| v.as_str())
1083                        .unwrap_or("");
1084                    if !delta.is_empty() {
1085                        delta_tokens = estimate_tokens(delta);
1086                    }
1087                }
1088            }
1089        } else if event.event_type == "session.run.finished" {
1090            delta_steps = 1;
1091            let run_status = event
1092                .properties
1093                .get("status")
1094                .and_then(|v| v.as_str())
1095                .unwrap_or("")
1096                .to_ascii_lowercase();
1097            if run_status == "completed" {
1098                let _ = self
1099                    .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
1100                    .await;
1101            } else if run_status == "failed" || run_status == "error" {
1102                let _ = self
1103                    .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
1104                    .await;
1105            }
1106        }
1107        if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
1108            return;
1109        }
1110        let exhausted = self
1111            .apply_budget_delta(
1112                state,
1113                &instance_id,
1114                delta_tokens,
1115                delta_steps,
1116                delta_tool_calls,
1117            )
1118            .await;
1119        if exhausted {
1120            let _ = self
1121                .cancel_instance(state, &instance_id, "budget exhausted")
1122                .await;
1123        }
1124    }
1125
1126    async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
1127        self.instances
1128            .read()
1129            .await
1130            .values()
1131            .find(|instance| instance.session_id == session_id)
1132            .map(|instance| instance.instance_id.clone())
1133    }
1134
1135    async fn apply_budget_delta(
1136        &self,
1137        state: &AppState,
1138        instance_id: &str,
1139        delta_tokens: u64,
1140        delta_steps: u32,
1141        delta_tool_calls: u32,
1142    ) -> bool {
1143        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1144            enabled: false,
1145            require_justification: false,
1146            max_agents: None,
1147            max_concurrent: None,
1148            child_budget_percent_of_parent_remaining: None,
1149            mission_total_budget: None,
1150            cost_per_1k_tokens_usd: None,
1151            spawn_edges: HashMap::new(),
1152            required_skills: HashMap::new(),
1153            role_defaults: HashMap::new(),
1154            skill_sources: Default::default(),
1155        });
1156        let mut budgets = self.budgets.write().await;
1157        let Some(usage) = budgets.get_mut(instance_id) else {
1158            return false;
1159        };
1160        if usage.exhausted {
1161            return true;
1162        }
1163        let prev_cost_used_usd = usage.cost_used_usd;
1164        usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
1165        usage.steps_used = usage.steps_used.saturating_add(delta_steps);
1166        usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
1167        if let Some(rate) = policy.cost_per_1k_tokens_usd {
1168            usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
1169        }
1170        let elapsed_ms = usage
1171            .started_at
1172            .map(|started| started.elapsed().as_millis() as u64)
1173            .unwrap_or(0);
1174
1175        let mut exhausted_reason: Option<&'static str> = None;
1176        let mut snapshot: Option<AgentInstance> = None;
1177        {
1178            let mut instances = self.instances.write().await;
1179            if let Some(instance) = instances.get_mut(instance_id) {
1180                instance.metadata = Some(merge_metadata_usage(
1181                    instance.metadata.take(),
1182                    usage.tokens_used,
1183                    usage.steps_used,
1184                    usage.tool_calls_used,
1185                    usage.cost_used_usd,
1186                    elapsed_ms,
1187                ));
1188                if let Some(limit) = instance.budget.max_tokens {
1189                    if usage.tokens_used >= limit {
1190                        exhausted_reason = Some("max_tokens");
1191                    }
1192                }
1193                if exhausted_reason.is_none() {
1194                    if let Some(limit) = instance.budget.max_steps {
1195                        if usage.steps_used >= limit {
1196                            exhausted_reason = Some("max_steps");
1197                        }
1198                    }
1199                }
1200                if exhausted_reason.is_none() {
1201                    if let Some(limit) = instance.budget.max_tool_calls {
1202                        if usage.tool_calls_used >= limit {
1203                            exhausted_reason = Some("max_tool_calls");
1204                        }
1205                    }
1206                }
1207                if exhausted_reason.is_none() {
1208                    if let Some(limit) = instance.budget.max_duration_ms {
1209                        if elapsed_ms >= limit {
1210                            exhausted_reason = Some("max_duration_ms");
1211                        }
1212                    }
1213                }
1214                if exhausted_reason.is_none() {
1215                    if let Some(limit) = instance.budget.max_cost_usd {
1216                        if usage.cost_used_usd >= limit {
1217                            exhausted_reason = Some("max_cost_usd");
1218                        }
1219                    }
1220                }
1221                snapshot = Some(instance.clone());
1222            }
1223        }
1224        let Some(instance) = snapshot else {
1225            return false;
1226        };
1227        emit_budget_usage(
1228            state,
1229            &instance,
1230            usage.tokens_used,
1231            usage.steps_used,
1232            usage.tool_calls_used,
1233            usage.cost_used_usd,
1234            elapsed_ms,
1235        );
1236        let mission_exhausted = self
1237            .apply_mission_budget_delta(
1238                state,
1239                &instance.mission_id,
1240                delta_tokens,
1241                u64::from(delta_steps),
1242                u64::from(delta_tool_calls),
1243                usage.cost_used_usd - prev_cost_used_usd,
1244                &policy,
1245            )
1246            .await;
1247        if mission_exhausted {
1248            usage.exhausted = true;
1249            let _ = self
1250                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1251                .await;
1252            return true;
1253        }
1254        if let Some(reason) = exhausted_reason {
1255            usage.exhausted = true;
1256            emit_budget_exhausted(
1257                state,
1258                &instance,
1259                reason,
1260                usage.tokens_used,
1261                usage.steps_used,
1262                usage.tool_calls_used,
1263                usage.cost_used_usd,
1264                elapsed_ms,
1265            );
1266            return true;
1267        }
1268        false
1269    }
1270
1271    async fn apply_exact_token_usage(
1272        &self,
1273        state: &AppState,
1274        instance_id: &str,
1275        total_tokens: u64,
1276        cost_used_usd: f64,
1277    ) -> bool {
1278        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1279            enabled: false,
1280            require_justification: false,
1281            max_agents: None,
1282            max_concurrent: None,
1283            child_budget_percent_of_parent_remaining: None,
1284            mission_total_budget: None,
1285            cost_per_1k_tokens_usd: None,
1286            spawn_edges: HashMap::new(),
1287            required_skills: HashMap::new(),
1288            role_defaults: HashMap::new(),
1289            skill_sources: Default::default(),
1290        });
1291        let mut budgets = self.budgets.write().await;
1292        let Some(usage) = budgets.get_mut(instance_id) else {
1293            return false;
1294        };
1295        if usage.exhausted {
1296            return true;
1297        }
1298        let prev_tokens = usage.tokens_used;
1299        let prev_cost_used_usd = usage.cost_used_usd;
1300        usage.tokens_used = usage.tokens_used.max(total_tokens);
1301        if cost_used_usd > 0.0 {
1302            usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
1303        } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
1304            let delta = usage.tokens_used.saturating_sub(prev_tokens);
1305            usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
1306        }
1307        let elapsed_ms = usage
1308            .started_at
1309            .map(|started| started.elapsed().as_millis() as u64)
1310            .unwrap_or(0);
1311        let mut exhausted_reason: Option<&'static str> = None;
1312        let mut snapshot: Option<AgentInstance> = None;
1313        {
1314            let mut instances = self.instances.write().await;
1315            if let Some(instance) = instances.get_mut(instance_id) {
1316                instance.metadata = Some(merge_metadata_usage(
1317                    instance.metadata.take(),
1318                    usage.tokens_used,
1319                    usage.steps_used,
1320                    usage.tool_calls_used,
1321                    usage.cost_used_usd,
1322                    elapsed_ms,
1323                ));
1324                if let Some(limit) = instance.budget.max_tokens {
1325                    if usage.tokens_used >= limit {
1326                        exhausted_reason = Some("max_tokens");
1327                    }
1328                }
1329                if exhausted_reason.is_none() {
1330                    if let Some(limit) = instance.budget.max_cost_usd {
1331                        if usage.cost_used_usd >= limit {
1332                            exhausted_reason = Some("max_cost_usd");
1333                        }
1334                    }
1335                }
1336                snapshot = Some(instance.clone());
1337            }
1338        }
1339        let Some(instance) = snapshot else {
1340            return false;
1341        };
1342        emit_budget_usage(
1343            state,
1344            &instance,
1345            usage.tokens_used,
1346            usage.steps_used,
1347            usage.tool_calls_used,
1348            usage.cost_used_usd,
1349            elapsed_ms,
1350        );
1351        let mission_exhausted = self
1352            .apply_mission_budget_delta(
1353                state,
1354                &instance.mission_id,
1355                usage.tokens_used.saturating_sub(prev_tokens),
1356                0,
1357                0,
1358                usage.cost_used_usd - prev_cost_used_usd,
1359                &policy,
1360            )
1361            .await;
1362        if mission_exhausted {
1363            usage.exhausted = true;
1364            let _ = self
1365                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1366                .await;
1367            return true;
1368        }
1369        if let Some(reason) = exhausted_reason {
1370            usage.exhausted = true;
1371            emit_budget_exhausted(
1372                state,
1373                &instance,
1374                reason,
1375                usage.tokens_used,
1376                usage.steps_used,
1377                usage.tool_calls_used,
1378                usage.cost_used_usd,
1379                elapsed_ms,
1380            );
1381            return true;
1382        }
1383        false
1384    }
1385
1386    async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1387        let path = self.audit_path.read().await.clone();
1388        if let Some(parent) = path.parent() {
1389            fs::create_dir_all(parent).await?;
1390        }
1391        let row = json!({
1392            "action": action,
1393            "missionID": instance.mission_id,
1394            "instanceID": instance.instance_id,
1395            "parentInstanceID": instance.parent_instance_id,
1396            "role": instance.role,
1397            "templateID": instance.template_id,
1398            "sessionID": instance.session_id,
1399            "skillHash": instance.skill_hash,
1400            "workspaceRoot": instance_workspace_root(instance),
1401            "workspaceRepoRoot": instance_workspace_repo_root(instance),
1402            "managedWorktree": instance_managed_worktree(instance),
1403            "timestampMs": crate::now_ms(),
1404        });
1405        let mut existing = if path.exists() {
1406            fs::read_to_string(&path).await.unwrap_or_default()
1407        } else {
1408            String::new()
1409        };
1410        existing.push_str(&serde_json::to_string(&row)?);
1411        existing.push('\n');
1412        fs::write(path, existing).await?;
1413        Ok(())
1414    }
1415
1416    async fn append_approval_audit(
1417        &self,
1418        action: &str,
1419        approval_id: &str,
1420        instance: Option<&AgentInstance>,
1421        reason: &str,
1422    ) -> anyhow::Result<()> {
1423        let path = self.audit_path.read().await.clone();
1424        if let Some(parent) = path.parent() {
1425            fs::create_dir_all(parent).await?;
1426        }
1427        let row = json!({
1428            "action": action,
1429            "approvalID": approval_id,
1430            "reason": reason,
1431            "missionID": instance.map(|v| v.mission_id.clone()),
1432            "instanceID": instance.map(|v| v.instance_id.clone()),
1433            "parentInstanceID": instance.and_then(|v| v.parent_instance_id.clone()),
1434            "role": instance.map(|v| v.role.clone()),
1435            "templateID": instance.map(|v| v.template_id.clone()),
1436            "sessionID": instance.map(|v| v.session_id.clone()),
1437            "skillHash": instance.map(|v| v.skill_hash.clone()),
1438            "workspaceRoot": instance.and_then(instance_workspace_root),
1439            "workspaceRepoRoot": instance.and_then(instance_workspace_repo_root),
1440            "managedWorktree": instance.and_then(instance_managed_worktree),
1441            "timestampMs": crate::now_ms(),
1442        });
1443        let mut existing = if path.exists() {
1444            fs::read_to_string(&path).await.unwrap_or_default()
1445        } else {
1446            String::new()
1447        };
1448        existing.push_str(&serde_json::to_string(&row)?);
1449        existing.push('\n');
1450        fs::write(path, existing).await?;
1451        Ok(())
1452    }
1453
1454    async fn apply_mission_budget_delta(
1455        &self,
1456        state: &AppState,
1457        mission_id: &str,
1458        delta_tokens: u64,
1459        delta_steps: u64,
1460        delta_tool_calls: u64,
1461        delta_cost_used_usd: f64,
1462        policy: &SpawnPolicy,
1463    ) -> bool {
1464        let mut budgets = self.mission_budgets.write().await;
1465        let row = budgets.entry(mission_id.to_string()).or_default();
1466        row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1467        row.steps_used = row.steps_used.saturating_add(delta_steps);
1468        row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1469        row.cost_used_usd += delta_cost_used_usd.max(0.0);
1470        if row.exhausted {
1471            return true;
1472        }
1473        let Some(limit) = policy.mission_total_budget.as_ref() else {
1474            return false;
1475        };
1476        let mut exhausted_by: Option<&'static str> = None;
1477        if let Some(max) = limit.max_tokens {
1478            if row.tokens_used >= max {
1479                exhausted_by = Some("mission_max_tokens");
1480            }
1481        }
1482        if exhausted_by.is_none() {
1483            if let Some(max) = limit.max_steps {
1484                if row.steps_used >= u64::from(max) {
1485                    exhausted_by = Some("mission_max_steps");
1486                }
1487            }
1488        }
1489        if exhausted_by.is_none() {
1490            if let Some(max) = limit.max_tool_calls {
1491                if row.tool_calls_used >= u64::from(max) {
1492                    exhausted_by = Some("mission_max_tool_calls");
1493                }
1494            }
1495        }
1496        if exhausted_by.is_none() {
1497            if let Some(max) = limit.max_cost_usd {
1498                if row.cost_used_usd >= max {
1499                    exhausted_by = Some("mission_max_cost_usd");
1500                }
1501            }
1502        }
1503        if let Some(exhausted_by) = exhausted_by {
1504            row.exhausted = true;
1505            emit_mission_budget_exhausted(
1506                state,
1507                mission_id,
1508                exhausted_by,
1509                row.tokens_used,
1510                row.steps_used,
1511                row.tool_calls_used,
1512                row.cost_used_usd,
1513            );
1514            return true;
1515        }
1516        false
1517    }
1518
1519    pub async fn set_for_test(
1520        &self,
1521        workspace_root: Option<String>,
1522        policy: Option<SpawnPolicy>,
1523        templates: Vec<AgentTemplate>,
1524    ) {
1525        *self.policy.write().await = policy;
1526        let mut by_id = HashMap::new();
1527        for template in templates {
1528            by_id.insert(template.template_id.clone(), template);
1529        }
1530        *self.templates.write().await = by_id;
1531        self.instances.write().await.clear();
1532        self.budgets.write().await.clear();
1533        self.mission_budgets.write().await.clear();
1534        self.spawn_approvals.write().await.clear();
1535        *self.loaded_workspace.write().await = workspace_root;
1536    }
1537}
1538
1539fn resolve_budget(
1540    policy: &SpawnPolicy,
1541    parent_instance: Option<AgentInstance>,
1542    parent_usage: Option<InstanceBudgetState>,
1543    template: &AgentTemplate,
1544    override_budget: Option<BudgetLimit>,
1545    role: &AgentRole,
1546) -> BudgetLimit {
1547    let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1548    let mut chosen = merge_budget(
1549        merge_budget(role_default, template.default_budget.clone()),
1550        override_budget.unwrap_or_default(),
1551    );
1552
1553    if let Some(parent) = parent_instance {
1554        let usage = parent_usage.unwrap_or_default();
1555        if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1556            if pct > 0 {
1557                chosen.max_tokens = cap_budget_remaining_u64(
1558                    chosen.max_tokens,
1559                    parent.budget.max_tokens,
1560                    usage.tokens_used,
1561                    pct,
1562                );
1563                chosen.max_steps = cap_budget_remaining_u32(
1564                    chosen.max_steps,
1565                    parent.budget.max_steps,
1566                    usage.steps_used,
1567                    pct,
1568                );
1569                chosen.max_tool_calls = cap_budget_remaining_u32(
1570                    chosen.max_tool_calls,
1571                    parent.budget.max_tool_calls,
1572                    usage.tool_calls_used,
1573                    pct,
1574                );
1575                chosen.max_duration_ms = cap_budget_remaining_u64(
1576                    chosen.max_duration_ms,
1577                    parent.budget.max_duration_ms,
1578                    usage
1579                        .started_at
1580                        .map(|started| started.elapsed().as_millis() as u64)
1581                        .unwrap_or(0),
1582                    pct,
1583                );
1584                chosen.max_cost_usd = cap_budget_remaining_f64(
1585                    chosen.max_cost_usd,
1586                    parent.budget.max_cost_usd,
1587                    usage.cost_used_usd,
1588                    pct,
1589                );
1590            }
1591        }
1592    }
1593    chosen
1594}
1595
1596fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1597    BudgetLimit {
1598        max_tokens: overlay.max_tokens.or(base.max_tokens),
1599        max_steps: overlay.max_steps.or(base.max_steps),
1600        max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1601        max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1602        max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1603    }
1604}
1605
1606fn cap_budget_remaining_u64(
1607    child: Option<u64>,
1608    parent_limit: Option<u64>,
1609    parent_used: u64,
1610    pct: u8,
1611) -> Option<u64> {
1612    match (child, parent_limit) {
1613        (Some(child), Some(parent_limit)) => {
1614            let remaining = parent_limit.saturating_sub(parent_used);
1615            Some(child.min(remaining.saturating_mul(pct as u64) / 100))
1616        }
1617        (None, Some(parent_limit)) => {
1618            let remaining = parent_limit.saturating_sub(parent_used);
1619            Some(remaining.saturating_mul(pct as u64) / 100)
1620        }
1621        (Some(child), None) => Some(child),
1622        (None, None) => None,
1623    }
1624}
1625
1626fn cap_budget_remaining_u32(
1627    child: Option<u32>,
1628    parent_limit: Option<u32>,
1629    parent_used: u32,
1630    pct: u8,
1631) -> Option<u32> {
1632    match (child, parent_limit) {
1633        (Some(child), Some(parent_limit)) => {
1634            let remaining = parent_limit.saturating_sub(parent_used);
1635            Some(child.min(remaining.saturating_mul(pct as u32) / 100))
1636        }
1637        (None, Some(parent_limit)) => {
1638            let remaining = parent_limit.saturating_sub(parent_used);
1639            Some(remaining.saturating_mul(pct as u32) / 100)
1640        }
1641        (Some(child), None) => Some(child),
1642        (None, None) => None,
1643    }
1644}
1645
1646fn cap_budget_remaining_f64(
1647    child: Option<f64>,
1648    parent_limit: Option<f64>,
1649    parent_used: f64,
1650    pct: u8,
1651) -> Option<f64> {
1652    match (child, parent_limit) {
1653        (Some(child), Some(parent_limit)) => {
1654            let remaining = (parent_limit - parent_used).max(0.0);
1655            Some(child.min(remaining * f64::from(pct) / 100.0))
1656        }
1657        (None, Some(parent_limit)) => {
1658            let remaining = (parent_limit - parent_used).max(0.0);
1659            Some(remaining * f64::from(pct) / 100.0)
1660        }
1661        (Some(child), None) => Some(child),
1662        (None, None) => None,
1663    }
1664}
1665
1666async fn compute_skill_hash(
1667    workspace_root: &str,
1668    template: &AgentTemplate,
1669    policy: &SpawnPolicy,
1670) -> Result<String, String> {
1671    use sha2::{Digest, Sha256};
1672    let mut rows = Vec::new();
1673    let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1674    for skill in &template.skills {
1675        if let Some(path) = skill.path.as_deref() {
1676            validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1677            let skill_path = Path::new(workspace_root).join(path);
1678            let raw = fs::read_to_string(&skill_path)
1679                .await
1680                .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1681            let digest = hash_hex(raw.as_bytes());
1682            validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1683            rows.push(format!("path:{}:{}", path, digest));
1684        } else if let Some(id) = skill.id.as_deref() {
1685            validate_skill_source(Some(id), None, policy)?;
1686            let loaded = skill_service
1687                .load_skill(id)
1688                .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1689            let Some(loaded) = loaded else {
1690                return Err(format!("missing required skill id `{id}`"));
1691            };
1692            let digest = hash_hex(loaded.content.as_bytes());
1693            validate_pinned_hash(Some(id), None, &digest, policy)?;
1694            rows.push(format!("id:{}:{}", id, digest));
1695        }
1696    }
1697    rows.sort();
1698    let mut hasher = Sha256::new();
1699    for row in rows {
1700        hasher.update(row.as_bytes());
1701        hasher.update(b"\n");
1702    }
1703    let digest = hasher.finalize();
1704    Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1705}
1706
1707fn validate_skill_source(
1708    id: Option<&str>,
1709    path: Option<&str>,
1710    policy: &SpawnPolicy,
1711) -> Result<(), String> {
1712    use tandem_orchestrator::SkillSourceMode;
1713    match policy.skill_sources.mode {
1714        SkillSourceMode::Any => Ok(()),
1715        SkillSourceMode::ProjectOnly => {
1716            if id.is_some() {
1717                return Err("skill source denied: project_only forbids skill IDs".to_string());
1718            }
1719            let Some(path) = path else {
1720                return Err("skill source denied: project_only requires skill path".to_string());
1721            };
1722            let p = PathBuf::from(path);
1723            if p.is_absolute() {
1724                return Err("skill source denied: absolute skill paths are forbidden".to_string());
1725            }
1726            Ok(())
1727        }
1728        SkillSourceMode::Allowlist => {
1729            if let Some(id) = id {
1730                if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1731                    return Ok(());
1732                }
1733            }
1734            if let Some(path) = path {
1735                if policy
1736                    .skill_sources
1737                    .allowlist_paths
1738                    .iter()
1739                    .any(|v| v == path)
1740                {
1741                    return Ok(());
1742                }
1743            }
1744            Err("skill source denied: not present in allowlist".to_string())
1745        }
1746    }
1747}
1748
1749fn validate_pinned_hash(
1750    id: Option<&str>,
1751    path: Option<&str>,
1752    digest: &str,
1753    policy: &SpawnPolicy,
1754) -> Result<(), String> {
1755    let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1756    let by_path = path.and_then(|path| {
1757        policy
1758            .skill_sources
1759            .pinned_hashes
1760            .get(&format!("path:{path}"))
1761    });
1762    let expected = by_id.or(by_path);
1763    if let Some(expected) = expected {
1764        let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1765        if normalized != digest {
1766            return Err("pinned hash mismatch for skill reference".to_string());
1767        }
1768    }
1769    Ok(())
1770}
1771
1772fn hash_hex(bytes: &[u8]) -> String {
1773    let mut out = String::with_capacity(bytes.len() * 2);
1774    for byte in bytes {
1775        use std::fmt::Write as _;
1776        let _ = write!(&mut out, "{:02x}", byte);
1777    }
1778    out
1779}
1780
1781fn estimate_tokens(text: &str) -> u64 {
1782    let chars = text.chars().count() as u64;
1783    (chars / 4).max(1)
1784}
1785
1786fn extract_session_id(event: &EngineEvent) -> Option<String> {
1787    event
1788        .properties
1789        .get("sessionID")
1790        .and_then(|v| v.as_str())
1791        .map(|v| v.to_string())
1792        .or_else(|| {
1793            event
1794                .properties
1795                .get("part")
1796                .and_then(|v| v.get("sessionID"))
1797                .and_then(|v| v.as_str())
1798                .map(|v| v.to_string())
1799        })
1800}
1801
1802fn merge_metadata_usage(
1803    metadata: Option<Value>,
1804    tokens_used: u64,
1805    steps_used: u32,
1806    tool_calls_used: u32,
1807    cost_used_usd: f64,
1808    elapsed_ms: u64,
1809) -> Value {
1810    let mut base = metadata
1811        .and_then(|v| v.as_object().cloned())
1812        .unwrap_or_default();
1813    base.insert(
1814        "budgetUsage".to_string(),
1815        json!({
1816            "tokensUsed": tokens_used,
1817            "stepsUsed": steps_used,
1818            "toolCallsUsed": tool_calls_used,
1819            "costUsedUsd": cost_used_usd,
1820            "elapsedMs": elapsed_ms
1821        }),
1822    );
1823    Value::Object(base)
1824}
1825
1826fn instance_workspace_root(instance: &AgentInstance) -> Option<Value> {
1827    instance
1828        .metadata
1829        .as_ref()
1830        .and_then(|row| row.get("workspaceRoot"))
1831        .cloned()
1832}
1833
1834fn instance_workspace_repo_root(instance: &AgentInstance) -> Option<Value> {
1835    instance
1836        .metadata
1837        .as_ref()
1838        .and_then(|row| row.get("workspaceRepoRoot"))
1839        .cloned()
1840}
1841
1842fn instance_managed_worktree(instance: &AgentInstance) -> Option<Value> {
1843    instance
1844        .metadata
1845        .as_ref()
1846        .and_then(|row| row.get("managedWorktree"))
1847        .cloned()
1848}
1849
1850async fn prepare_agent_instance_workspace(
1851    state: &AppState,
1852    workspace_root: &str,
1853    mission_id: Option<&str>,
1854    instance_id: &str,
1855    template_id: &str,
1856) -> Option<crate::runtime::worktrees::ManagedWorktreeEnsureResult> {
1857    let repo_root = crate::runtime::worktrees::resolve_git_repo_root(workspace_root)?;
1858    crate::runtime::worktrees::ensure_managed_worktree(
1859        state,
1860        crate::runtime::worktrees::ManagedWorktreeEnsureInput {
1861            repo_root,
1862            task_id: mission_id.map(ToString::to_string),
1863            owner_run_id: Some(instance_id.to_string()),
1864            lease_id: None,
1865            branch_hint: Some(template_id.to_string()),
1866            base: "HEAD".to_string(),
1867            cleanup_branch: true,
1868        },
1869    )
1870    .await
1871    .ok()
1872}
1873
1874async fn cleanup_instance_managed_worktree(state: &AppState, instance: &AgentInstance) {
1875    let Some(metadata) = instance.metadata.as_ref() else {
1876        return;
1877    };
1878    let Some(worktree) = metadata.get("managedWorktree").and_then(Value::as_object) else {
1879        return;
1880    };
1881    let Some(path) = worktree.get("path").and_then(Value::as_str) else {
1882        return;
1883    };
1884    let Some(branch) = worktree.get("branch").and_then(Value::as_str) else {
1885        return;
1886    };
1887    let Some(repo_root) = worktree.get("repoRoot").and_then(Value::as_str) else {
1888        return;
1889    };
1890    let record = crate::ManagedWorktreeRecord {
1891        key: crate::runtime::worktrees::managed_worktree_key(
1892            repo_root,
1893            instance.mission_id.as_str().into(),
1894            Some(instance.instance_id.as_str()),
1895            None,
1896            path,
1897            branch,
1898        ),
1899        repo_root: repo_root.to_string(),
1900        path: path.to_string(),
1901        branch: branch.to_string(),
1902        base: "HEAD".to_string(),
1903        managed: true,
1904        task_id: Some(instance.mission_id.clone()),
1905        owner_run_id: Some(instance.instance_id.clone()),
1906        lease_id: None,
1907        cleanup_branch: worktree
1908            .get("cleanupBranch")
1909            .and_then(Value::as_bool)
1910            .unwrap_or(true),
1911        created_at_ms: 0,
1912        updated_at_ms: 0,
1913    };
1914    let _ = crate::runtime::worktrees::delete_managed_worktree(state, &record).await;
1915}
1916
1917fn normalize_tool_name(name: &str) -> String {
1918    match name.trim().to_lowercase().replace('-', "_").as_str() {
1919        "todowrite" | "update_todo_list" | "update_todos" => "todo_write".to_string(),
1920        other => other.to_string(),
1921    }
1922}
1923
1924async fn evaluate_capability_deny(
1925    state: &AppState,
1926    instance: &AgentInstance,
1927    tool: &str,
1928    args: &Value,
1929    caps: &tandem_orchestrator::CapabilitySpec,
1930    session_id: &str,
1931    message_id: &str,
1932) -> Option<String> {
1933    let deny_patterns = caps
1934        .tool_denylist
1935        .iter()
1936        .map(|name| normalize_tool_name(name))
1937        .collect::<Vec<_>>();
1938    if !deny_patterns.is_empty() && any_policy_matches(&deny_patterns, tool) {
1939        return Some(format!("tool `{tool}` denied by agent capability policy"));
1940    }
1941    let allow_patterns = caps
1942        .tool_allowlist
1943        .iter()
1944        .map(|name| normalize_tool_name(name))
1945        .collect::<Vec<_>>();
1946    if !allow_patterns.is_empty() && !any_policy_matches(&allow_patterns, tool) {
1947        return Some(format!("tool `{tool}` not in agent allowlist"));
1948    }
1949
1950    let browser_execution_tool = matches!(
1951        tool,
1952        "browser_open"
1953            | "browser_navigate"
1954            | "browser_snapshot"
1955            | "browser_click"
1956            | "browser_type"
1957            | "browser_press"
1958            | "browser_wait"
1959            | "browser_extract"
1960            | "browser_screenshot"
1961            | "browser_close"
1962    );
1963
1964    if matches!(
1965        tool,
1966        "websearch" | "webfetch" | "webfetch_html" | "browser_open" | "browser_navigate"
1967    ) || browser_execution_tool
1968    {
1969        if !caps.net_scopes.enabled {
1970            return Some("network disabled for this agent instance".to_string());
1971        }
1972        if !caps.net_scopes.allow_hosts.is_empty() {
1973            if tool == "websearch" {
1974                return Some(
1975                    "websearch blocked: host allowlist cannot be verified for search tool"
1976                        .to_string(),
1977                );
1978            }
1979            if let Some(host) = extract_url_host(args) {
1980                let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
1981                    let allowed = h.trim().to_ascii_lowercase();
1982                    !allowed.is_empty()
1983                        && (host == allowed || host.ends_with(&format!(".{allowed}")))
1984                });
1985                if !allowed {
1986                    return Some(format!("network host `{host}` not in allow_hosts"));
1987                }
1988            }
1989        }
1990    }
1991
1992    if tool == "bash" {
1993        let cmd = args
1994            .get("command")
1995            .and_then(|v| v.as_str())
1996            .unwrap_or("")
1997            .to_ascii_lowercase();
1998        if cmd.contains("git push") {
1999            if !caps.git_caps.push {
2000                return Some("git push disabled for this agent instance".to_string());
2001            }
2002            if caps.git_caps.push_requires_approval {
2003                let action = state.permissions.evaluate("git_push", "git_push").await;
2004                match action {
2005                    tandem_core::PermissionAction::Allow => {}
2006                    tandem_core::PermissionAction::Deny => {
2007                        return Some("git push denied by policy rule".to_string());
2008                    }
2009                    tandem_core::PermissionAction::Ask => {
2010                        let pending = state
2011                            .permissions
2012                            .ask_for_session_with_context(
2013                                Some(session_id),
2014                                "git_push",
2015                                args.clone(),
2016                                Some(tandem_core::PermissionArgsContext {
2017                                    args_source: "agent_team.git_push".to_string(),
2018                                    args_integrity: "runtime-checked".to_string(),
2019                                    query: Some(format!(
2020                                        "instanceID={} messageID={}",
2021                                        instance.instance_id, message_id
2022                                    )),
2023                                }),
2024                            )
2025                            .await;
2026                        return Some(format!(
2027                            "git push requires explicit user approval (approvalID={})",
2028                            pending.id
2029                        ));
2030                    }
2031                }
2032            }
2033        }
2034        if cmd.contains("git commit") && !caps.git_caps.commit {
2035            return Some("git commit disabled for this agent instance".to_string());
2036        }
2037    }
2038
2039    let access_kind = tool_fs_access_kind(tool);
2040    if let Some(kind) = access_kind {
2041        let Some(session) = state.storage.get_session(session_id).await else {
2042            return Some("session not found for capability evaluation".to_string());
2043        };
2044        let Some(root) = session.workspace_root.clone() else {
2045            return Some("workspace root missing for capability evaluation".to_string());
2046        };
2047        let requested = extract_tool_candidate_paths(tool, args);
2048        if !requested.is_empty() {
2049            let allowed_scopes = if kind == "read" {
2050                &caps.fs_scopes.read
2051            } else {
2052                &caps.fs_scopes.write
2053            };
2054            if allowed_scopes.is_empty() {
2055                return Some(format!("fs {kind} access blocked: no scopes configured"));
2056            }
2057            for candidate in requested {
2058                if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
2059                    return Some(format!("fs {kind} access denied for path `{}`", candidate));
2060                }
2061            }
2062        }
2063    }
2064
2065    denied_secrets_reason(tool, caps, args)
2066}
2067
2068fn denied_secrets_reason(
2069    tool: &str,
2070    caps: &tandem_orchestrator::CapabilitySpec,
2071    args: &Value,
2072) -> Option<String> {
2073    if tool == "auth" {
2074        if caps.secrets_scopes.is_empty() {
2075            return Some("secrets are disabled for this agent instance".to_string());
2076        }
2077        let alias = args
2078            .get("id")
2079            .or_else(|| args.get("provider"))
2080            .or_else(|| args.get("providerID"))
2081            .and_then(|v| v.as_str())
2082            .unwrap_or("")
2083            .trim();
2084        if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
2085            return Some(format!(
2086                "secret alias `{alias}` is not in agent secretsScopes allowlist"
2087            ));
2088        }
2089    }
2090    None
2091}
2092
2093fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
2094    match tool {
2095        "read" | "glob" | "grep" | "codesearch" | "lsp" => Some("read"),
2096        "write" | "edit" | "apply_patch" => Some("write"),
2097        _ => None,
2098    }
2099}
2100
2101fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
2102    let Some(obj) = args.as_object() else {
2103        return Vec::new();
2104    };
2105    let keys: &[&str] = match tool {
2106        "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
2107        "glob" => &["pattern"],
2108        "lsp" => &["filePath", "path"],
2109        "bash" => &["cwd"],
2110        "apply_patch" => &["path"],
2111        _ => &["path", "cwd"],
2112    };
2113    keys.iter()
2114        .filter_map(|key| obj.get(*key))
2115        .filter_map(|value| value.as_str())
2116        .filter(|s| !s.trim().is_empty())
2117        .map(|raw| strip_glob_tokens(raw).to_string())
2118        .collect()
2119}
2120
2121fn strip_glob_tokens(path: &str) -> &str {
2122    let mut end = path.len();
2123    for (idx, ch) in path.char_indices() {
2124        if ch == '*' || ch == '?' || ch == '[' {
2125            end = idx;
2126            break;
2127        }
2128    }
2129    &path[..end]
2130}
2131
2132fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
2133    let root_path = PathBuf::from(root);
2134    let candidate_path = resolve_path(&root_path, candidate);
2135    scopes.iter().any(|scope| {
2136        let scope_path = resolve_path(&root_path, scope);
2137        candidate_path.starts_with(scope_path)
2138    })
2139}
2140
2141fn resolve_path(root: &Path, raw: &str) -> PathBuf {
2142    let raw = raw.trim();
2143    if raw.is_empty() {
2144        return root.to_path_buf();
2145    }
2146    let path = PathBuf::from(raw);
2147    if path.is_absolute() {
2148        path
2149    } else {
2150        root.join(path)
2151    }
2152}
2153
2154fn extract_url_host(args: &Value) -> Option<String> {
2155    let url = args
2156        .get("url")
2157        .or_else(|| args.get("uri"))
2158        .or_else(|| args.get("link"))
2159        .and_then(|v| v.as_str())?;
2160    let raw = url.trim();
2161    let (_, after_scheme) = raw.split_once("://")?;
2162    let host_port = after_scheme.split('/').next().unwrap_or_default();
2163    let host = host_port.split('@').next_back().unwrap_or_default();
2164    let host = host
2165        .split(':')
2166        .next()
2167        .unwrap_or_default()
2168        .to_ascii_lowercase();
2169    if host.is_empty() {
2170        None
2171    } else {
2172        Some(host)
2173    }
2174}
2175
2176pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
2177    emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
2178}
2179
2180pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
2181    emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
2182}
2183
2184pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
2185    emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
2186}
2187
2188#[derive(Default)]
2189pub struct SpawnEventContext<'a> {
2190    pub session_id: Option<&'a str>,
2191    pub message_id: Option<&'a str>,
2192    pub run_id: Option<&'a str>,
2193}
2194
2195pub fn emit_spawn_requested_with_context(
2196    state: &AppState,
2197    req: &SpawnRequest,
2198    ctx: &SpawnEventContext<'_>,
2199) {
2200    state.event_bus.publish(EngineEvent::new(
2201        "agent_team.spawn.requested",
2202        json!({
2203            "sessionID": ctx.session_id,
2204            "messageID": ctx.message_id,
2205            "runID": ctx.run_id,
2206            "missionID": req.mission_id,
2207            "instanceID": Value::Null,
2208            "parentInstanceID": req.parent_instance_id,
2209            "source": req.source,
2210            "requestedRole": req.role,
2211            "templateID": req.template_id,
2212            "justification": req.justification,
2213            "timestampMs": crate::now_ms(),
2214        }),
2215    ));
2216}
2217
2218pub fn emit_spawn_denied_with_context(
2219    state: &AppState,
2220    req: &SpawnRequest,
2221    decision: &SpawnDecision,
2222    ctx: &SpawnEventContext<'_>,
2223) {
2224    state.event_bus.publish(EngineEvent::new(
2225        "agent_team.spawn.denied",
2226        json!({
2227            "sessionID": ctx.session_id,
2228            "messageID": ctx.message_id,
2229            "runID": ctx.run_id,
2230            "missionID": req.mission_id,
2231            "instanceID": Value::Null,
2232            "parentInstanceID": req.parent_instance_id,
2233            "source": req.source,
2234            "requestedRole": req.role,
2235            "templateID": req.template_id,
2236            "code": decision.code,
2237            "error": decision.reason,
2238            "timestampMs": crate::now_ms(),
2239        }),
2240    ));
2241}
2242
2243pub fn emit_spawn_approved_with_context(
2244    state: &AppState,
2245    req: &SpawnRequest,
2246    instance: &AgentInstance,
2247    ctx: &SpawnEventContext<'_>,
2248) {
2249    state.event_bus.publish(EngineEvent::new(
2250        "agent_team.spawn.approved",
2251        json!({
2252            "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2253            "messageID": ctx.message_id,
2254            "runID": ctx.run_id.or(instance.run_id.as_deref()),
2255            "missionID": instance.mission_id,
2256            "instanceID": instance.instance_id,
2257            "parentInstanceID": instance.parent_instance_id,
2258            "source": req.source,
2259            "requestedRole": req.role,
2260            "templateID": instance.template_id,
2261            "skillHash": instance.skill_hash,
2262            "workspaceRoot": instance_workspace_root(instance),
2263            "workspaceRepoRoot": instance_workspace_repo_root(instance),
2264            "managedWorktree": instance_managed_worktree(instance),
2265            "timestampMs": crate::now_ms(),
2266        }),
2267    ));
2268    state.event_bus.publish(EngineEvent::new(
2269        "agent_team.instance.started",
2270        json!({
2271            "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2272            "messageID": ctx.message_id,
2273            "runID": ctx.run_id.or(instance.run_id.as_deref()),
2274            "missionID": instance.mission_id,
2275            "instanceID": instance.instance_id,
2276            "parentInstanceID": instance.parent_instance_id,
2277            "role": instance.role,
2278            "status": instance.status,
2279            "budgetLimit": instance.budget,
2280            "skillHash": instance.skill_hash,
2281            "workspaceRoot": instance_workspace_root(instance),
2282            "workspaceRepoRoot": instance_workspace_repo_root(instance),
2283            "managedWorktree": instance_managed_worktree(instance),
2284            "timestampMs": crate::now_ms(),
2285        }),
2286    ));
2287}
2288
2289pub fn emit_budget_usage(
2290    state: &AppState,
2291    instance: &AgentInstance,
2292    tokens_used: u64,
2293    steps_used: u32,
2294    tool_calls_used: u32,
2295    cost_used_usd: f64,
2296    elapsed_ms: u64,
2297) {
2298    state.event_bus.publish(EngineEvent::new(
2299        "agent_team.budget.usage",
2300        json!({
2301            "sessionID": instance.session_id,
2302            "messageID": Value::Null,
2303            "runID": instance.run_id,
2304            "missionID": instance.mission_id,
2305            "instanceID": instance.instance_id,
2306            "tokensUsed": tokens_used,
2307            "stepsUsed": steps_used,
2308            "toolCallsUsed": tool_calls_used,
2309            "costUsedUsd": cost_used_usd,
2310            "elapsedMs": elapsed_ms,
2311            "timestampMs": crate::now_ms(),
2312        }),
2313    ));
2314}
2315
2316pub fn emit_budget_exhausted(
2317    state: &AppState,
2318    instance: &AgentInstance,
2319    exhausted_by: &str,
2320    tokens_used: u64,
2321    steps_used: u32,
2322    tool_calls_used: u32,
2323    cost_used_usd: f64,
2324    elapsed_ms: u64,
2325) {
2326    state.event_bus.publish(EngineEvent::new(
2327        "agent_team.budget.exhausted",
2328        json!({
2329            "sessionID": instance.session_id,
2330            "messageID": Value::Null,
2331            "runID": instance.run_id,
2332            "missionID": instance.mission_id,
2333            "instanceID": instance.instance_id,
2334            "exhaustedBy": exhausted_by,
2335            "tokensUsed": tokens_used,
2336            "stepsUsed": steps_used,
2337            "toolCallsUsed": tool_calls_used,
2338            "costUsedUsd": cost_used_usd,
2339            "elapsedMs": elapsed_ms,
2340            "timestampMs": crate::now_ms(),
2341        }),
2342    ));
2343}
2344
2345pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
2346    state.event_bus.publish(EngineEvent::new(
2347        "agent_team.instance.cancelled",
2348        json!({
2349            "sessionID": instance.session_id,
2350            "messageID": Value::Null,
2351            "runID": instance.run_id,
2352            "missionID": instance.mission_id,
2353            "instanceID": instance.instance_id,
2354            "parentInstanceID": instance.parent_instance_id,
2355            "role": instance.role,
2356            "status": instance.status,
2357            "reason": reason,
2358            "workspaceRoot": instance_workspace_root(instance),
2359            "workspaceRepoRoot": instance_workspace_repo_root(instance),
2360            "managedWorktree": instance_managed_worktree(instance),
2361            "timestampMs": crate::now_ms(),
2362        }),
2363    ));
2364}
2365
2366pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
2367    state.event_bus.publish(EngineEvent::new(
2368        "agent_team.instance.completed",
2369        json!({
2370            "sessionID": instance.session_id,
2371            "messageID": Value::Null,
2372            "runID": instance.run_id,
2373            "missionID": instance.mission_id,
2374            "instanceID": instance.instance_id,
2375            "parentInstanceID": instance.parent_instance_id,
2376            "role": instance.role,
2377            "status": instance.status,
2378            "workspaceRoot": instance_workspace_root(instance),
2379            "workspaceRepoRoot": instance_workspace_repo_root(instance),
2380            "managedWorktree": instance_managed_worktree(instance),
2381            "timestampMs": crate::now_ms(),
2382        }),
2383    ));
2384}
2385
2386pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
2387    state.event_bus.publish(EngineEvent::new(
2388        "agent_team.instance.failed",
2389        json!({
2390            "sessionID": instance.session_id,
2391            "messageID": Value::Null,
2392            "runID": instance.run_id,
2393            "missionID": instance.mission_id,
2394            "instanceID": instance.instance_id,
2395            "parentInstanceID": instance.parent_instance_id,
2396            "role": instance.role,
2397            "status": instance.status,
2398            "workspaceRoot": instance_workspace_root(instance),
2399            "workspaceRepoRoot": instance_workspace_repo_root(instance),
2400            "managedWorktree": instance_managed_worktree(instance),
2401            "timestampMs": crate::now_ms(),
2402        }),
2403    ));
2404}
2405
2406pub fn emit_mission_budget_exhausted(
2407    state: &AppState,
2408    mission_id: &str,
2409    exhausted_by: &str,
2410    tokens_used: u64,
2411    steps_used: u64,
2412    tool_calls_used: u64,
2413    cost_used_usd: f64,
2414) {
2415    state.event_bus.publish(EngineEvent::new(
2416        "agent_team.mission.budget.exhausted",
2417        json!({
2418            "sessionID": Value::Null,
2419            "messageID": Value::Null,
2420            "runID": Value::Null,
2421            "missionID": mission_id,
2422            "instanceID": Value::Null,
2423            "exhaustedBy": exhausted_by,
2424            "tokensUsed": tokens_used,
2425            "stepsUsed": steps_used,
2426            "toolCallsUsed": tool_calls_used,
2427            "costUsedUsd": cost_used_usd,
2428            "timestampMs": crate::now_ms(),
2429        }),
2430    ));
2431}