Skip to main content

tandem_server/
agent_teams.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use futures::future::BoxFuture;
8use serde::Deserialize;
9use serde::Serialize;
10use serde_json::{json, Value};
11use tandem_core::{
12    any_policy_matches, SpawnAgentHook, SpawnAgentToolContext, SpawnAgentToolResult,
13    ToolPolicyContext, ToolPolicyDecision, ToolPolicyHook,
14};
15use tandem_orchestrator::{
16    AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
17    SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
18};
19use tandem_skills::SkillService;
20use tandem_types::{EngineEvent, Session};
21use tokio::fs;
22use tokio::sync::RwLock;
23use uuid::Uuid;
24
25use crate::AppState;
26
27#[derive(Clone, Default)]
28pub struct AgentTeamRuntime {
29    policy: Arc<RwLock<Option<SpawnPolicy>>>,
30    templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
31    instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
32    budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
33    mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
34    spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
35    loaded_workspace: Arc<RwLock<Option<String>>>,
36    audit_path: Arc<RwLock<PathBuf>>,
37}
38
39#[derive(Debug, Clone)]
40pub struct SpawnResult {
41    pub decision: SpawnDecision,
42    pub instance: Option<AgentInstance>,
43}
44
45#[derive(Debug, Clone, Serialize)]
46pub struct AgentMissionSummary {
47    #[serde(rename = "missionID")]
48    pub mission_id: String,
49    #[serde(rename = "instanceCount")]
50    pub instance_count: usize,
51    #[serde(rename = "runningCount")]
52    pub running_count: usize,
53    #[serde(rename = "completedCount")]
54    pub completed_count: usize,
55    #[serde(rename = "failedCount")]
56    pub failed_count: usize,
57    #[serde(rename = "cancelledCount")]
58    pub cancelled_count: usize,
59    #[serde(rename = "queuedCount")]
60    pub queued_count: usize,
61    #[serde(rename = "tokenUsedTotal")]
62    pub token_used_total: u64,
63    #[serde(rename = "toolCallsUsedTotal")]
64    pub tool_calls_used_total: u64,
65    #[serde(rename = "stepsUsedTotal")]
66    pub steps_used_total: u64,
67    #[serde(rename = "costUsedUsdTotal")]
68    pub cost_used_usd_total: f64,
69}
70
/// Running budget consumption of a single agent instance.
///
/// The derived `Default` is an all-zero state with no start time.
#[derive(Debug, Clone, Default)]
struct InstanceBudgetState {
    /// Tokens consumed so far.
    tokens_used: u64,
    /// Steps consumed so far.
    steps_used: u32,
    /// Tool calls consumed so far.
    tool_calls_used: u32,
    /// Accumulated cost in USD.
    cost_used_usd: f64,
    /// Set to the spawn instant when the instance is created; `None` by default.
    started_at: Option<Instant>,
    /// NOTE(review): presumably latched once a limit is hit — confirm against the budget accounting code.
    exhausted: bool,
}
80
/// Running budget consumption aggregated over a whole mission.
///
/// The derived `Default` is an all-zero, non-exhausted state.
#[derive(Debug, Clone, Default)]
struct MissionBudgetState {
    /// Tokens consumed so far.
    tokens_used: u64,
    /// Steps consumed so far.
    steps_used: u64,
    /// Tool calls consumed so far.
    tool_calls_used: u64,
    /// Accumulated cost in USD.
    cost_used_usd: f64,
    /// NOTE(review): presumably latched once a mission limit is hit — confirm against the budget accounting code.
    exhausted: bool,
}
89
90#[derive(Debug, Clone, Serialize)]
91pub struct PendingSpawnApproval {
92    #[serde(rename = "approvalID")]
93    pub approval_id: String,
94    #[serde(rename = "createdAtMs")]
95    pub created_at_ms: u64,
96    pub request: SpawnRequest,
97    #[serde(rename = "decisionCode")]
98    pub decision_code: Option<SpawnDenyCode>,
99    pub reason: Option<String>,
100}
101
102#[derive(Clone)]
103pub struct ServerSpawnAgentHook {
104    state: AppState,
105}
106
107#[derive(Debug, Deserialize)]
108struct SpawnAgentToolInput {
109    #[serde(rename = "missionID")]
110    mission_id: Option<String>,
111    #[serde(rename = "parentInstanceID")]
112    parent_instance_id: Option<String>,
113    #[serde(rename = "templateID")]
114    template_id: Option<String>,
115    role: AgentRole,
116    source: Option<SpawnSource>,
117    justification: String,
118    #[serde(rename = "budgetOverride", default)]
119    budget_override: Option<BudgetLimit>,
120}
121
122impl ServerSpawnAgentHook {
123    pub fn new(state: AppState) -> Self {
124        Self { state }
125    }
126}
127
128impl SpawnAgentHook for ServerSpawnAgentHook {
129    fn spawn_agent(
130        &self,
131        ctx: SpawnAgentToolContext,
132    ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
133        let state = self.state.clone();
134        Box::pin(async move {
135            let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
136            let input = match parsed {
137                Ok(input) => input,
138                Err(err) => {
139                    return Ok(SpawnAgentToolResult {
140                        output: format!("spawn_agent denied: invalid args ({err})"),
141                        metadata: json!({
142                            "ok": false,
143                            "code": "SPAWN_INVALID_ARGS",
144                            "error": err.to_string(),
145                        }),
146                    });
147                }
148            };
149            let req = SpawnRequest {
150                mission_id: input.mission_id,
151                parent_instance_id: input.parent_instance_id,
152                source: input.source.unwrap_or(SpawnSource::ToolCall),
153                parent_role: None,
154                role: input.role,
155                template_id: input.template_id,
156                justification: input.justification,
157                budget_override: input.budget_override,
158            };
159
160            let event_ctx = SpawnEventContext {
161                session_id: Some(ctx.session_id.as_str()),
162                message_id: Some(ctx.message_id.as_str()),
163                run_id: None,
164            };
165            emit_spawn_requested_with_context(&state, &req, &event_ctx);
166            let result = state.agent_teams.spawn(&state, req.clone()).await;
167            if !result.decision.allowed || result.instance.is_none() {
168                emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
169                return Ok(SpawnAgentToolResult {
170                    output: result
171                        .decision
172                        .reason
173                        .clone()
174                        .unwrap_or_else(|| "spawn_agent denied".to_string()),
175                    metadata: json!({
176                        "ok": false,
177                        "code": result.decision.code,
178                        "error": result.decision.reason,
179                        "requiresUserApproval": result.decision.requires_user_approval,
180                    }),
181                });
182            }
183            let instance = result.instance.expect("checked is_some");
184            emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
185            Ok(SpawnAgentToolResult {
186                output: format!(
187                    "spawned {} as instance {} (session {})",
188                    instance.template_id, instance.instance_id, instance.session_id
189                ),
190                metadata: json!({
191                    "ok": true,
192                    "missionID": instance.mission_id,
193                    "instanceID": instance.instance_id,
194                    "sessionID": instance.session_id,
195                    "runID": instance.run_id,
196                    "status": instance.status,
197                    "skillHash": instance.skill_hash,
198                }),
199            })
200        })
201    }
202}
203
204#[derive(Clone)]
205pub struct ServerToolPolicyHook {
206    state: AppState,
207}
208
209impl ServerToolPolicyHook {
210    pub fn new(state: AppState) -> Self {
211        Self { state }
212    }
213}
214
215impl ToolPolicyHook for ServerToolPolicyHook {
216    fn evaluate_tool(
217        &self,
218        ctx: ToolPolicyContext,
219    ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
220        let state = self.state.clone();
221        Box::pin(async move {
222            let tool = normalize_tool_name(&ctx.tool);
223            if let Some(policy) = state.routine_session_policy(&ctx.session_id).await {
224                let allowed_patterns = policy
225                    .allowed_tools
226                    .iter()
227                    .map(|name| normalize_tool_name(name))
228                    .collect::<Vec<_>>();
229                if !policy.allowed_tools.is_empty() && !any_policy_matches(&allowed_patterns, &tool)
230                {
231                    let reason = format!(
232                        "tool `{}` is not allowed for routine `{}` (run `{}`)",
233                        tool, policy.routine_id, policy.run_id
234                    );
235                    state.event_bus.publish(EngineEvent::new(
236                        "routine.tool.denied",
237                        json!({
238                            "sessionID": ctx.session_id,
239                            "messageID": ctx.message_id,
240                            "runID": policy.run_id,
241                            "routineID": policy.routine_id,
242                            "tool": tool,
243                            "reason": reason,
244                            "timestampMs": crate::now_ms(),
245                        }),
246                    ));
247                    return Ok(ToolPolicyDecision {
248                        allowed: false,
249                        reason: Some(reason),
250                    });
251                }
252            }
253
254            let Some(instance) = state
255                .agent_teams
256                .instance_for_session(&ctx.session_id)
257                .await
258            else {
259                return Ok(ToolPolicyDecision {
260                    allowed: true,
261                    reason: None,
262                });
263            };
264            let caps = instance.capabilities.clone();
265            let deny = evaluate_capability_deny(
266                &state,
267                &instance,
268                &tool,
269                &ctx.args,
270                &caps,
271                &ctx.session_id,
272                &ctx.message_id,
273            )
274            .await;
275            if let Some(reason) = deny {
276                state.event_bus.publish(EngineEvent::new(
277                    "agent_team.capability.denied",
278                    json!({
279                        "sessionID": ctx.session_id,
280                        "messageID": ctx.message_id,
281                        "runID": instance.run_id,
282                        "missionID": instance.mission_id,
283                        "instanceID": instance.instance_id,
284                        "tool": tool,
285                        "reason": reason,
286                        "timestampMs": crate::now_ms(),
287                    }),
288                ));
289                return Ok(ToolPolicyDecision {
290                    allowed: false,
291                    reason: Some(reason),
292                });
293            }
294            Ok(ToolPolicyDecision {
295                allowed: true,
296                reason: None,
297            })
298        })
299    }
300}
301
302impl AgentTeamRuntime {
303    pub fn new(audit_path: PathBuf) -> Self {
304        Self {
305            policy: Arc::new(RwLock::new(None)),
306            templates: Arc::new(RwLock::new(HashMap::new())),
307            instances: Arc::new(RwLock::new(HashMap::new())),
308            budgets: Arc::new(RwLock::new(HashMap::new())),
309            mission_budgets: Arc::new(RwLock::new(HashMap::new())),
310            spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
311            loaded_workspace: Arc::new(RwLock::new(None)),
312            audit_path: Arc::new(RwLock::new(audit_path)),
313        }
314    }
315
316    pub async fn set_audit_path(&self, path: PathBuf) {
317        *self.audit_path.write().await = path;
318    }
319
320    pub async fn list_templates(&self) -> Vec<AgentTemplate> {
321        let mut rows = self
322            .templates
323            .read()
324            .await
325            .values()
326            .cloned()
327            .collect::<Vec<_>>();
328        rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
329        rows
330    }
331
332    async fn templates_dir_for_loaded_workspace(&self) -> anyhow::Result<PathBuf> {
333        let workspace = self
334            .loaded_workspace
335            .read()
336            .await
337            .clone()
338            .ok_or_else(|| anyhow::anyhow!("agent team workspace not loaded"))?;
339        Ok(PathBuf::from(workspace)
340            .join(".tandem")
341            .join("agent-team")
342            .join("templates"))
343    }
344
345    fn template_filename(template_id: &str) -> String {
346        let safe = template_id
347            .trim()
348            .chars()
349            .map(|ch| {
350                if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
351                    ch
352                } else {
353                    '_'
354                }
355            })
356            .collect::<String>();
357        let fallback = if safe.is_empty() {
358            "template".to_string()
359        } else {
360            safe
361        };
362        format!("{fallback}.yaml")
363    }
364
365    pub async fn upsert_template(
366        &self,
367        workspace_root: &str,
368        template: AgentTemplate,
369    ) -> anyhow::Result<AgentTemplate> {
370        self.ensure_loaded_for_workspace(workspace_root).await?;
371        let templates_dir = self.templates_dir_for_loaded_workspace().await?;
372        fs::create_dir_all(&templates_dir).await?;
373        let path = templates_dir.join(Self::template_filename(&template.template_id));
374        let payload = serde_yaml::to_string(&template)?;
375        fs::write(path, payload).await?;
376        self.templates
377            .write()
378            .await
379            .insert(template.template_id.clone(), template.clone());
380        Ok(template)
381    }
382
383    pub async fn delete_template(
384        &self,
385        workspace_root: &str,
386        template_id: &str,
387    ) -> anyhow::Result<bool> {
388        self.ensure_loaded_for_workspace(workspace_root).await?;
389        let templates_dir = self.templates_dir_for_loaded_workspace().await?;
390        let path = templates_dir.join(Self::template_filename(template_id));
391        let existed = self.templates.write().await.remove(template_id).is_some();
392        if path.exists() {
393            let _ = fs::remove_file(path).await;
394        }
395        Ok(existed)
396    }
397
398    pub async fn get_template_for_workspace(
399        &self,
400        workspace_root: &str,
401        template_id: &str,
402    ) -> anyhow::Result<Option<AgentTemplate>> {
403        self.ensure_loaded_for_workspace(workspace_root).await?;
404        Ok(self.templates.read().await.get(template_id).cloned())
405    }
406
407    pub async fn list_instances(
408        &self,
409        mission_id: Option<&str>,
410        parent_instance_id: Option<&str>,
411        status: Option<AgentInstanceStatus>,
412    ) -> Vec<AgentInstance> {
413        let mut rows = self
414            .instances
415            .read()
416            .await
417            .values()
418            .filter(|instance| {
419                if let Some(mission_id) = mission_id {
420                    if instance.mission_id != mission_id {
421                        return false;
422                    }
423                }
424                if let Some(parent_id) = parent_instance_id {
425                    if instance.parent_instance_id.as_deref() != Some(parent_id) {
426                        return false;
427                    }
428                }
429                if let Some(status) = &status {
430                    if &instance.status != status {
431                        return false;
432                    }
433                }
434                true
435            })
436            .cloned()
437            .collect::<Vec<_>>();
438        rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
439        rows
440    }
441
442    pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
443        let instances = self.instances.read().await;
444        let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
445        for instance in instances.values() {
446            let row = by_mission
447                .entry(instance.mission_id.clone())
448                .or_insert_with(|| AgentMissionSummary {
449                    mission_id: instance.mission_id.clone(),
450                    instance_count: 0,
451                    running_count: 0,
452                    completed_count: 0,
453                    failed_count: 0,
454                    cancelled_count: 0,
455                    queued_count: 0,
456                    token_used_total: 0,
457                    tool_calls_used_total: 0,
458                    steps_used_total: 0,
459                    cost_used_usd_total: 0.0,
460                });
461            row.instance_count = row.instance_count.saturating_add(1);
462            match instance.status {
463                AgentInstanceStatus::Queued => {
464                    row.queued_count = row.queued_count.saturating_add(1)
465                }
466                AgentInstanceStatus::Running => {
467                    row.running_count = row.running_count.saturating_add(1)
468                }
469                AgentInstanceStatus::Completed => {
470                    row.completed_count = row.completed_count.saturating_add(1)
471                }
472                AgentInstanceStatus::Failed => {
473                    row.failed_count = row.failed_count.saturating_add(1)
474                }
475                AgentInstanceStatus::Cancelled => {
476                    row.cancelled_count = row.cancelled_count.saturating_add(1)
477                }
478            }
479            if let Some(usage) = instance
480                .metadata
481                .as_ref()
482                .and_then(|m| m.get("budgetUsage"))
483                .and_then(|u| u.as_object())
484            {
485                row.token_used_total = row.token_used_total.saturating_add(
486                    usage
487                        .get("tokensUsed")
488                        .and_then(|v| v.as_u64())
489                        .unwrap_or(0),
490                );
491                row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
492                    usage
493                        .get("toolCallsUsed")
494                        .and_then(|v| v.as_u64())
495                        .unwrap_or(0),
496                );
497                row.steps_used_total = row
498                    .steps_used_total
499                    .saturating_add(usage.get("stepsUsed").and_then(|v| v.as_u64()).unwrap_or(0));
500                row.cost_used_usd_total += usage
501                    .get("costUsedUsd")
502                    .and_then(|v| v.as_f64())
503                    .unwrap_or(0.0);
504            }
505        }
506        let mut rows = by_mission.into_values().collect::<Vec<_>>();
507        rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
508        rows
509    }
510
511    pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
512        self.instances
513            .read()
514            .await
515            .values()
516            .find(|instance| instance.session_id == session_id)
517            .cloned()
518    }
519
520    pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
521        let mut rows = self
522            .spawn_approvals
523            .read()
524            .await
525            .values()
526            .cloned()
527            .collect::<Vec<_>>();
528        rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
529        rows
530    }
531
532    pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
533        let normalized = workspace_root.trim().to_string();
534        let already_loaded = self
535            .loaded_workspace
536            .read()
537            .await
538            .as_ref()
539            .map(|s| s == &normalized)
540            .unwrap_or(false);
541        if already_loaded {
542            return Ok(());
543        }
544
545        let root = PathBuf::from(&normalized);
546        let policy_path = root
547            .join(".tandem")
548            .join("agent-team")
549            .join("spawn-policy.yaml");
550        let templates_dir = root.join(".tandem").join("agent-team").join("templates");
551
552        let mut next_policy = None;
553        if policy_path.exists() {
554            let raw = fs::read_to_string(&policy_path)
555                .await
556                .with_context(|| format!("failed reading {}", policy_path.display()))?;
557            let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
558                .with_context(|| format!("failed parsing {}", policy_path.display()))?;
559            next_policy = Some(parsed);
560        }
561
562        let mut next_templates = HashMap::new();
563        if templates_dir.exists() {
564            let mut entries = fs::read_dir(&templates_dir).await?;
565            while let Some(entry) = entries.next_entry().await? {
566                let path = entry.path();
567                if !path.is_file() {
568                    continue;
569                }
570                let ext = path
571                    .extension()
572                    .and_then(|v| v.to_str())
573                    .unwrap_or_default()
574                    .to_ascii_lowercase();
575                if ext != "yaml" && ext != "yml" && ext != "json" {
576                    continue;
577                }
578                let raw = fs::read_to_string(&path).await?;
579                let template = serde_yaml::from_str::<AgentTemplate>(&raw)
580                    .with_context(|| format!("failed parsing {}", path.display()))?;
581                next_templates.insert(template.template_id.clone(), template);
582            }
583        }
584
585        *self.policy.write().await = next_policy;
586        *self.templates.write().await = next_templates;
587        *self.loaded_workspace.write().await = Some(normalized);
588        Ok(())
589    }
590
591    pub async fn spawn(&self, state: &AppState, req: SpawnRequest) -> SpawnResult {
592        self.spawn_with_approval_override(state, req, false).await
593    }
594
595    async fn spawn_with_approval_override(
596        &self,
597        state: &AppState,
598        mut req: SpawnRequest,
599        approval_override: bool,
600    ) -> SpawnResult {
601        let workspace_root = state.workspace_index.snapshot().await.root;
602        if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
603            return SpawnResult {
604                decision: SpawnDecision {
605                    allowed: false,
606                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
607                    reason: Some(format!("spawn policy load failed: {}", err)),
608                    requires_user_approval: false,
609                },
610                instance: None,
611            };
612        }
613
614        let Some(policy) = self.policy.read().await.clone() else {
615            return SpawnResult {
616                decision: SpawnDecision {
617                    allowed: false,
618                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
619                    reason: Some("spawn policy file missing".to_string()),
620                    requires_user_approval: false,
621                },
622                instance: None,
623            };
624        };
625
626        let template = {
627            let templates = self.templates.read().await;
628            req.template_id
629                .as_deref()
630                .and_then(|template_id| templates.get(template_id).cloned())
631        };
632        if req.template_id.is_none() {
633            if let Some(found) = self
634                .templates
635                .read()
636                .await
637                .values()
638                .find(|t| t.role == req.role)
639                .cloned()
640            {
641                req.template_id = Some(found.template_id.clone());
642            }
643        }
644        let template = if template.is_some() {
645            template
646        } else {
647            let templates = self.templates.read().await;
648            req.template_id
649                .as_deref()
650                .and_then(|id| templates.get(id).cloned())
651        };
652
653        if req.parent_role.is_none() {
654            if let Some(parent_id) = req.parent_instance_id.as_deref() {
655                let instances = self.instances.read().await;
656                req.parent_role = instances
657                    .get(parent_id)
658                    .map(|instance| instance.role.clone());
659            }
660        }
661
662        let instances = self.instances.read().await;
663        let total_agents = instances.len();
664        let running_agents = instances
665            .values()
666            .filter(|instance| instance.status == AgentInstanceStatus::Running)
667            .count();
668        drop(instances);
669
670        let mut decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
671        if approval_override
672            && !decision.allowed
673            && decision.requires_user_approval
674            && matches!(decision.code, Some(SpawnDenyCode::SpawnRequiresApproval))
675        {
676            decision = SpawnDecision {
677                allowed: true,
678                code: None,
679                reason: None,
680                requires_user_approval: false,
681            };
682        }
683        if !decision.allowed {
684            if decision.requires_user_approval && !approval_override {
685                self.queue_spawn_approval(&req, &decision).await;
686            }
687            return SpawnResult {
688                decision,
689                instance: None,
690            };
691        }
692
693        let mission_id = req
694            .mission_id
695            .clone()
696            .unwrap_or_else(|| "mission-default".to_string());
697
698        if let Some(reason) = self
699            .mission_budget_exceeded_reason(&policy, &mission_id)
700            .await
701        {
702            return SpawnResult {
703                decision: SpawnDecision {
704                    allowed: false,
705                    code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
706                    reason: Some(reason),
707                    requires_user_approval: false,
708                },
709                instance: None,
710            };
711        }
712
713        let template = template.unwrap_or_else(|| AgentTemplate {
714            template_id: "default-template".to_string(),
715            display_name: None,
716            avatar_url: None,
717            role: req.role.clone(),
718            system_prompt: None,
719            default_model: None,
720            skills: Vec::new(),
721            default_budget: BudgetLimit::default(),
722            capabilities: Default::default(),
723        });
724
725        let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
726            Ok(hash) => hash,
727            Err(err) => {
728                let lowered = err.to_ascii_lowercase();
729                let code = if lowered.contains("pinned hash mismatch") {
730                    SpawnDenyCode::SpawnSkillHashMismatch
731                } else if lowered.contains("skill source denied") {
732                    SpawnDenyCode::SpawnSkillSourceDenied
733                } else {
734                    SpawnDenyCode::SpawnRequiredSkillMissing
735                };
736                return SpawnResult {
737                    decision: SpawnDecision {
738                        allowed: false,
739                        code: Some(code),
740                        reason: Some(err),
741                        requires_user_approval: false,
742                    },
743                    instance: None,
744                };
745            }
746        };
747
748        let parent_snapshot = {
749            let instances = self.instances.read().await;
750            req.parent_instance_id
751                .as_deref()
752                .and_then(|id| instances.get(id).cloned())
753        };
754        let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
755            self.budgets.read().await.get(parent_id).cloned()
756        } else {
757            None
758        };
759
760        let budget = resolve_budget(
761            &policy,
762            parent_snapshot,
763            parent_usage,
764            &template,
765            req.budget_override.clone(),
766            &req.role,
767        );
768
769        let mut session = Session::new(
770            Some(format!("Agent Team {}", template.template_id)),
771            Some(workspace_root.clone()),
772        );
773        session.workspace_root = Some(workspace_root.clone());
774        let session_id = session.id.clone();
775        if let Err(err) = state.storage.save_session(session).await {
776            return SpawnResult {
777                decision: SpawnDecision {
778                    allowed: false,
779                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
780                    reason: Some(format!("failed creating child session: {}", err)),
781                    requires_user_approval: false,
782                },
783                instance: None,
784            };
785        }
786
787        let instance = AgentInstance {
788            instance_id: format!("ins_{}", Uuid::new_v4().simple()),
789            mission_id: mission_id.clone(),
790            parent_instance_id: req.parent_instance_id.clone(),
791            role: template.role.clone(),
792            template_id: template.template_id.clone(),
793            session_id: session_id.clone(),
794            run_id: None,
795            status: AgentInstanceStatus::Running,
796            budget,
797            skill_hash: skill_hash.clone(),
798            capabilities: template.capabilities.clone(),
799            metadata: Some(json!({
800                "source": req.source,
801                "justification": req.justification,
802            })),
803        };
804
805        self.instances
806            .write()
807            .await
808            .insert(instance.instance_id.clone(), instance.clone());
809        self.budgets.write().await.insert(
810            instance.instance_id.clone(),
811            InstanceBudgetState {
812                started_at: Some(Instant::now()),
813                ..InstanceBudgetState::default()
814            },
815        );
816        let _ = self.append_audit("spawn.approved", &instance).await;
817
818        SpawnResult {
819            decision: SpawnDecision {
820                allowed: true,
821                code: None,
822                reason: None,
823                requires_user_approval: false,
824            },
825            instance: Some(instance),
826        }
827    }
828
829    pub async fn approve_spawn_approval(
830        &self,
831        state: &AppState,
832        approval_id: &str,
833        reason: Option<&str>,
834    ) -> Option<SpawnResult> {
835        let approval = self.spawn_approvals.write().await.remove(approval_id)?;
836        let result = self
837            .spawn_with_approval_override(state, approval.request.clone(), true)
838            .await;
839        if let Some(instance) = result.instance.as_ref() {
840            let note = reason.unwrap_or("approved by operator");
841            let _ = self
842                .append_approval_audit("spawn.approval.approved", approval_id, Some(instance), note)
843                .await;
844        } else {
845            let note = reason.unwrap_or("approval replay failed policy checks");
846            let _ = self
847                .append_approval_audit("spawn.approval.rejected_on_replay", approval_id, None, note)
848                .await;
849        }
850        Some(result)
851    }
852
853    pub async fn deny_spawn_approval(
854        &self,
855        approval_id: &str,
856        reason: Option<&str>,
857    ) -> Option<PendingSpawnApproval> {
858        let approval = self.spawn_approvals.write().await.remove(approval_id)?;
859        let note = reason.unwrap_or("denied by operator");
860        let _ = self
861            .append_approval_audit("spawn.approval.denied", approval_id, None, note)
862            .await;
863        Some(approval)
864    }
865
866    pub async fn cancel_instance(
867        &self,
868        state: &AppState,
869        instance_id: &str,
870        reason: &str,
871    ) -> Option<AgentInstance> {
872        let mut instances = self.instances.write().await;
873        let instance = instances.get_mut(instance_id)?;
874        if matches!(
875            instance.status,
876            AgentInstanceStatus::Completed
877                | AgentInstanceStatus::Failed
878                | AgentInstanceStatus::Cancelled
879        ) {
880            return Some(instance.clone());
881        }
882        instance.status = AgentInstanceStatus::Cancelled;
883        let snapshot = instance.clone();
884        drop(instances);
885        let _ = state.cancellations.cancel(&snapshot.session_id).await;
886        let _ = self.append_audit("instance.cancelled", &snapshot).await;
887        emit_instance_cancelled(state, &snapshot, reason);
888        Some(snapshot)
889    }
890
891    async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
892        let approval = PendingSpawnApproval {
893            approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
894            created_at_ms: crate::now_ms(),
895            request: req.clone(),
896            decision_code: decision.code.clone(),
897            reason: decision.reason.clone(),
898        };
899        self.spawn_approvals
900            .write()
901            .await
902            .insert(approval.approval_id.clone(), approval);
903    }
904
905    async fn mission_budget_exceeded_reason(
906        &self,
907        policy: &SpawnPolicy,
908        mission_id: &str,
909    ) -> Option<String> {
910        let limit = policy.mission_total_budget.as_ref()?;
911        let usage = self
912            .mission_budgets
913            .read()
914            .await
915            .get(mission_id)
916            .cloned()
917            .unwrap_or_default();
918        if let Some(max) = limit.max_tokens {
919            if usage.tokens_used >= max {
920                return Some(format!(
921                    "mission max_tokens exhausted ({}/{})",
922                    usage.tokens_used, max
923                ));
924            }
925        }
926        if let Some(max) = limit.max_steps {
927            if usage.steps_used >= u64::from(max) {
928                return Some(format!(
929                    "mission max_steps exhausted ({}/{})",
930                    usage.steps_used, max
931                ));
932            }
933        }
934        if let Some(max) = limit.max_tool_calls {
935            if usage.tool_calls_used >= u64::from(max) {
936                return Some(format!(
937                    "mission max_tool_calls exhausted ({}/{})",
938                    usage.tool_calls_used, max
939                ));
940            }
941        }
942        if let Some(max) = limit.max_cost_usd {
943            if usage.cost_used_usd >= max {
944                return Some(format!(
945                    "mission max_cost_usd exhausted ({:.6}/{:.6})",
946                    usage.cost_used_usd, max
947                ));
948            }
949        }
950        None
951    }
952
953    pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
954        let instance_ids = self
955            .instances
956            .read()
957            .await
958            .values()
959            .filter(|instance| instance.mission_id == mission_id)
960            .map(|instance| instance.instance_id.clone())
961            .collect::<Vec<_>>();
962        let mut count = 0usize;
963        for instance_id in instance_ids {
964            if self
965                .cancel_instance(state, &instance_id, reason)
966                .await
967                .is_some()
968            {
969                count = count.saturating_add(1);
970            }
971        }
972        count
973    }
974
975    async fn mark_instance_terminal(
976        &self,
977        state: &AppState,
978        instance_id: &str,
979        status: AgentInstanceStatus,
980    ) -> Option<AgentInstance> {
981        let mut instances = self.instances.write().await;
982        let instance = instances.get_mut(instance_id)?;
983        if matches!(
984            instance.status,
985            AgentInstanceStatus::Completed
986                | AgentInstanceStatus::Failed
987                | AgentInstanceStatus::Cancelled
988        ) {
989            return Some(instance.clone());
990        }
991        instance.status = status.clone();
992        let snapshot = instance.clone();
993        drop(instances);
994        match status {
995            AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
996            AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
997            _ => {}
998        }
999        Some(snapshot)
1000    }
1001
1002    pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
1003        let Some(session_id) = extract_session_id(event) else {
1004            return;
1005        };
1006        let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
1007            return;
1008        };
1009        if event.event_type == "provider.usage" {
1010            let total_tokens = event
1011                .properties
1012                .get("totalTokens")
1013                .and_then(|v| v.as_u64())
1014                .unwrap_or(0);
1015            let cost_used_usd = event
1016                .properties
1017                .get("costUsd")
1018                .and_then(|v| v.as_f64())
1019                .unwrap_or(0.0);
1020            if total_tokens > 0 {
1021                let exhausted = self
1022                    .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
1023                    .await;
1024                if exhausted {
1025                    let _ = self
1026                        .cancel_instance(state, &instance_id, "budget exhausted")
1027                        .await;
1028                }
1029            }
1030            return;
1031        }
1032        let mut delta_tokens = 0u64;
1033        let mut delta_steps = 0u32;
1034        let mut delta_tool_calls = 0u32;
1035        if event.event_type == "message.part.updated" {
1036            if let Some(part) = event.properties.get("part") {
1037                let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
1038                if part_type == "tool-invocation" {
1039                    delta_tool_calls = 1;
1040                } else if part_type == "text" {
1041                    let delta = event
1042                        .properties
1043                        .get("delta")
1044                        .and_then(|v| v.as_str())
1045                        .unwrap_or("");
1046                    if !delta.is_empty() {
1047                        delta_tokens = estimate_tokens(delta);
1048                    }
1049                }
1050            }
1051        } else if event.event_type == "session.run.finished" {
1052            delta_steps = 1;
1053            let run_status = event
1054                .properties
1055                .get("status")
1056                .and_then(|v| v.as_str())
1057                .unwrap_or("")
1058                .to_ascii_lowercase();
1059            if run_status == "completed" {
1060                let _ = self
1061                    .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
1062                    .await;
1063            } else if run_status == "failed" || run_status == "error" {
1064                let _ = self
1065                    .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
1066                    .await;
1067            }
1068        }
1069        if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
1070            return;
1071        }
1072        let exhausted = self
1073            .apply_budget_delta(
1074                state,
1075                &instance_id,
1076                delta_tokens,
1077                delta_steps,
1078                delta_tool_calls,
1079            )
1080            .await;
1081        if exhausted {
1082            let _ = self
1083                .cancel_instance(state, &instance_id, "budget exhausted")
1084                .await;
1085        }
1086    }
1087
1088    async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
1089        self.instances
1090            .read()
1091            .await
1092            .values()
1093            .find(|instance| instance.session_id == session_id)
1094            .map(|instance| instance.instance_id.clone())
1095    }
1096
1097    async fn apply_budget_delta(
1098        &self,
1099        state: &AppState,
1100        instance_id: &str,
1101        delta_tokens: u64,
1102        delta_steps: u32,
1103        delta_tool_calls: u32,
1104    ) -> bool {
1105        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1106            enabled: false,
1107            require_justification: false,
1108            max_agents: None,
1109            max_concurrent: None,
1110            child_budget_percent_of_parent_remaining: None,
1111            mission_total_budget: None,
1112            cost_per_1k_tokens_usd: None,
1113            spawn_edges: HashMap::new(),
1114            required_skills: HashMap::new(),
1115            role_defaults: HashMap::new(),
1116            skill_sources: Default::default(),
1117        });
1118        let mut budgets = self.budgets.write().await;
1119        let Some(usage) = budgets.get_mut(instance_id) else {
1120            return false;
1121        };
1122        if usage.exhausted {
1123            return true;
1124        }
1125        let prev_cost_used_usd = usage.cost_used_usd;
1126        usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
1127        usage.steps_used = usage.steps_used.saturating_add(delta_steps);
1128        usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
1129        if let Some(rate) = policy.cost_per_1k_tokens_usd {
1130            usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
1131        }
1132        let elapsed_ms = usage
1133            .started_at
1134            .map(|started| started.elapsed().as_millis() as u64)
1135            .unwrap_or(0);
1136
1137        let mut exhausted_reason: Option<&'static str> = None;
1138        let mut snapshot: Option<AgentInstance> = None;
1139        {
1140            let mut instances = self.instances.write().await;
1141            if let Some(instance) = instances.get_mut(instance_id) {
1142                instance.metadata = Some(merge_metadata_usage(
1143                    instance.metadata.take(),
1144                    usage.tokens_used,
1145                    usage.steps_used,
1146                    usage.tool_calls_used,
1147                    usage.cost_used_usd,
1148                    elapsed_ms,
1149                ));
1150                if let Some(limit) = instance.budget.max_tokens {
1151                    if usage.tokens_used >= limit {
1152                        exhausted_reason = Some("max_tokens");
1153                    }
1154                }
1155                if exhausted_reason.is_none() {
1156                    if let Some(limit) = instance.budget.max_steps {
1157                        if usage.steps_used >= limit {
1158                            exhausted_reason = Some("max_steps");
1159                        }
1160                    }
1161                }
1162                if exhausted_reason.is_none() {
1163                    if let Some(limit) = instance.budget.max_tool_calls {
1164                        if usage.tool_calls_used >= limit {
1165                            exhausted_reason = Some("max_tool_calls");
1166                        }
1167                    }
1168                }
1169                if exhausted_reason.is_none() {
1170                    if let Some(limit) = instance.budget.max_duration_ms {
1171                        if elapsed_ms >= limit {
1172                            exhausted_reason = Some("max_duration_ms");
1173                        }
1174                    }
1175                }
1176                if exhausted_reason.is_none() {
1177                    if let Some(limit) = instance.budget.max_cost_usd {
1178                        if usage.cost_used_usd >= limit {
1179                            exhausted_reason = Some("max_cost_usd");
1180                        }
1181                    }
1182                }
1183                snapshot = Some(instance.clone());
1184            }
1185        }
1186        let Some(instance) = snapshot else {
1187            return false;
1188        };
1189        emit_budget_usage(
1190            state,
1191            &instance,
1192            usage.tokens_used,
1193            usage.steps_used,
1194            usage.tool_calls_used,
1195            usage.cost_used_usd,
1196            elapsed_ms,
1197        );
1198        let mission_exhausted = self
1199            .apply_mission_budget_delta(
1200                state,
1201                &instance.mission_id,
1202                delta_tokens,
1203                u64::from(delta_steps),
1204                u64::from(delta_tool_calls),
1205                usage.cost_used_usd - prev_cost_used_usd,
1206                &policy,
1207            )
1208            .await;
1209        if mission_exhausted {
1210            usage.exhausted = true;
1211            let _ = self
1212                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1213                .await;
1214            return true;
1215        }
1216        if let Some(reason) = exhausted_reason {
1217            usage.exhausted = true;
1218            emit_budget_exhausted(
1219                state,
1220                &instance,
1221                reason,
1222                usage.tokens_used,
1223                usage.steps_used,
1224                usage.tool_calls_used,
1225                usage.cost_used_usd,
1226                elapsed_ms,
1227            );
1228            return true;
1229        }
1230        false
1231    }
1232
1233    async fn apply_exact_token_usage(
1234        &self,
1235        state: &AppState,
1236        instance_id: &str,
1237        total_tokens: u64,
1238        cost_used_usd: f64,
1239    ) -> bool {
1240        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1241            enabled: false,
1242            require_justification: false,
1243            max_agents: None,
1244            max_concurrent: None,
1245            child_budget_percent_of_parent_remaining: None,
1246            mission_total_budget: None,
1247            cost_per_1k_tokens_usd: None,
1248            spawn_edges: HashMap::new(),
1249            required_skills: HashMap::new(),
1250            role_defaults: HashMap::new(),
1251            skill_sources: Default::default(),
1252        });
1253        let mut budgets = self.budgets.write().await;
1254        let Some(usage) = budgets.get_mut(instance_id) else {
1255            return false;
1256        };
1257        if usage.exhausted {
1258            return true;
1259        }
1260        let prev_tokens = usage.tokens_used;
1261        let prev_cost_used_usd = usage.cost_used_usd;
1262        usage.tokens_used = usage.tokens_used.max(total_tokens);
1263        if cost_used_usd > 0.0 {
1264            usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
1265        } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
1266            let delta = usage.tokens_used.saturating_sub(prev_tokens);
1267            usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
1268        }
1269        let elapsed_ms = usage
1270            .started_at
1271            .map(|started| started.elapsed().as_millis() as u64)
1272            .unwrap_or(0);
1273        let mut exhausted_reason: Option<&'static str> = None;
1274        let mut snapshot: Option<AgentInstance> = None;
1275        {
1276            let mut instances = self.instances.write().await;
1277            if let Some(instance) = instances.get_mut(instance_id) {
1278                instance.metadata = Some(merge_metadata_usage(
1279                    instance.metadata.take(),
1280                    usage.tokens_used,
1281                    usage.steps_used,
1282                    usage.tool_calls_used,
1283                    usage.cost_used_usd,
1284                    elapsed_ms,
1285                ));
1286                if let Some(limit) = instance.budget.max_tokens {
1287                    if usage.tokens_used >= limit {
1288                        exhausted_reason = Some("max_tokens");
1289                    }
1290                }
1291                if exhausted_reason.is_none() {
1292                    if let Some(limit) = instance.budget.max_cost_usd {
1293                        if usage.cost_used_usd >= limit {
1294                            exhausted_reason = Some("max_cost_usd");
1295                        }
1296                    }
1297                }
1298                snapshot = Some(instance.clone());
1299            }
1300        }
1301        let Some(instance) = snapshot else {
1302            return false;
1303        };
1304        emit_budget_usage(
1305            state,
1306            &instance,
1307            usage.tokens_used,
1308            usage.steps_used,
1309            usage.tool_calls_used,
1310            usage.cost_used_usd,
1311            elapsed_ms,
1312        );
1313        let mission_exhausted = self
1314            .apply_mission_budget_delta(
1315                state,
1316                &instance.mission_id,
1317                usage.tokens_used.saturating_sub(prev_tokens),
1318                0,
1319                0,
1320                usage.cost_used_usd - prev_cost_used_usd,
1321                &policy,
1322            )
1323            .await;
1324        if mission_exhausted {
1325            usage.exhausted = true;
1326            let _ = self
1327                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1328                .await;
1329            return true;
1330        }
1331        if let Some(reason) = exhausted_reason {
1332            usage.exhausted = true;
1333            emit_budget_exhausted(
1334                state,
1335                &instance,
1336                reason,
1337                usage.tokens_used,
1338                usage.steps_used,
1339                usage.tool_calls_used,
1340                usage.cost_used_usd,
1341                elapsed_ms,
1342            );
1343            return true;
1344        }
1345        false
1346    }
1347
1348    async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1349        let path = self.audit_path.read().await.clone();
1350        if let Some(parent) = path.parent() {
1351            fs::create_dir_all(parent).await?;
1352        }
1353        let row = json!({
1354            "action": action,
1355            "missionID": instance.mission_id,
1356            "instanceID": instance.instance_id,
1357            "parentInstanceID": instance.parent_instance_id,
1358            "role": instance.role,
1359            "templateID": instance.template_id,
1360            "sessionID": instance.session_id,
1361            "skillHash": instance.skill_hash,
1362            "timestampMs": crate::now_ms(),
1363        });
1364        let mut existing = if path.exists() {
1365            fs::read_to_string(&path).await.unwrap_or_default()
1366        } else {
1367            String::new()
1368        };
1369        existing.push_str(&serde_json::to_string(&row)?);
1370        existing.push('\n');
1371        fs::write(path, existing).await?;
1372        Ok(())
1373    }
1374
1375    async fn append_approval_audit(
1376        &self,
1377        action: &str,
1378        approval_id: &str,
1379        instance: Option<&AgentInstance>,
1380        reason: &str,
1381    ) -> anyhow::Result<()> {
1382        let path = self.audit_path.read().await.clone();
1383        if let Some(parent) = path.parent() {
1384            fs::create_dir_all(parent).await?;
1385        }
1386        let row = json!({
1387            "action": action,
1388            "approvalID": approval_id,
1389            "reason": reason,
1390            "missionID": instance.map(|v| v.mission_id.clone()),
1391            "instanceID": instance.map(|v| v.instance_id.clone()),
1392            "parentInstanceID": instance.and_then(|v| v.parent_instance_id.clone()),
1393            "role": instance.map(|v| v.role.clone()),
1394            "templateID": instance.map(|v| v.template_id.clone()),
1395            "sessionID": instance.map(|v| v.session_id.clone()),
1396            "skillHash": instance.map(|v| v.skill_hash.clone()),
1397            "timestampMs": crate::now_ms(),
1398        });
1399        let mut existing = if path.exists() {
1400            fs::read_to_string(&path).await.unwrap_or_default()
1401        } else {
1402            String::new()
1403        };
1404        existing.push_str(&serde_json::to_string(&row)?);
1405        existing.push('\n');
1406        fs::write(path, existing).await?;
1407        Ok(())
1408    }
1409
1410    #[allow(clippy::too_many_arguments)]
1411    async fn apply_mission_budget_delta(
1412        &self,
1413        state: &AppState,
1414        mission_id: &str,
1415        delta_tokens: u64,
1416        delta_steps: u64,
1417        delta_tool_calls: u64,
1418        delta_cost_used_usd: f64,
1419        policy: &SpawnPolicy,
1420    ) -> bool {
1421        let mut budgets = self.mission_budgets.write().await;
1422        let row = budgets.entry(mission_id.to_string()).or_default();
1423        row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1424        row.steps_used = row.steps_used.saturating_add(delta_steps);
1425        row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1426        row.cost_used_usd += delta_cost_used_usd.max(0.0);
1427        if row.exhausted {
1428            return true;
1429        }
1430        let Some(limit) = policy.mission_total_budget.as_ref() else {
1431            return false;
1432        };
1433        let mut exhausted_by: Option<&'static str> = None;
1434        if let Some(max) = limit.max_tokens {
1435            if row.tokens_used >= max {
1436                exhausted_by = Some("mission_max_tokens");
1437            }
1438        }
1439        if exhausted_by.is_none() {
1440            if let Some(max) = limit.max_steps {
1441                if row.steps_used >= u64::from(max) {
1442                    exhausted_by = Some("mission_max_steps");
1443                }
1444            }
1445        }
1446        if exhausted_by.is_none() {
1447            if let Some(max) = limit.max_tool_calls {
1448                if row.tool_calls_used >= u64::from(max) {
1449                    exhausted_by = Some("mission_max_tool_calls");
1450                }
1451            }
1452        }
1453        if exhausted_by.is_none() {
1454            if let Some(max) = limit.max_cost_usd {
1455                if row.cost_used_usd >= max {
1456                    exhausted_by = Some("mission_max_cost_usd");
1457                }
1458            }
1459        }
1460        if let Some(exhausted_by) = exhausted_by {
1461            row.exhausted = true;
1462            emit_mission_budget_exhausted(
1463                state,
1464                mission_id,
1465                exhausted_by,
1466                row.tokens_used,
1467                row.steps_used,
1468                row.tool_calls_used,
1469                row.cost_used_usd,
1470            );
1471            return true;
1472        }
1473        false
1474    }
1475
1476    pub async fn set_for_test(
1477        &self,
1478        workspace_root: Option<String>,
1479        policy: Option<SpawnPolicy>,
1480        templates: Vec<AgentTemplate>,
1481    ) {
1482        *self.policy.write().await = policy;
1483        let mut by_id = HashMap::new();
1484        for template in templates {
1485            by_id.insert(template.template_id.clone(), template);
1486        }
1487        *self.templates.write().await = by_id;
1488        self.instances.write().await.clear();
1489        self.budgets.write().await.clear();
1490        self.mission_budgets.write().await.clear();
1491        self.spawn_approvals.write().await.clear();
1492        *self.loaded_workspace.write().await = workspace_root;
1493    }
1494}
1495
1496fn resolve_budget(
1497    policy: &SpawnPolicy,
1498    parent_instance: Option<AgentInstance>,
1499    parent_usage: Option<InstanceBudgetState>,
1500    template: &AgentTemplate,
1501    override_budget: Option<BudgetLimit>,
1502    role: &AgentRole,
1503) -> BudgetLimit {
1504    let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1505    let mut chosen = merge_budget(
1506        merge_budget(role_default, template.default_budget.clone()),
1507        override_budget.unwrap_or_default(),
1508    );
1509
1510    if let Some(parent) = parent_instance {
1511        let usage = parent_usage.unwrap_or_default();
1512        if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1513            if pct > 0 {
1514                chosen.max_tokens = cap_budget_remaining_u64(
1515                    chosen.max_tokens,
1516                    parent.budget.max_tokens,
1517                    usage.tokens_used,
1518                    pct,
1519                );
1520                chosen.max_steps = cap_budget_remaining_u32(
1521                    chosen.max_steps,
1522                    parent.budget.max_steps,
1523                    usage.steps_used,
1524                    pct,
1525                );
1526                chosen.max_tool_calls = cap_budget_remaining_u32(
1527                    chosen.max_tool_calls,
1528                    parent.budget.max_tool_calls,
1529                    usage.tool_calls_used,
1530                    pct,
1531                );
1532                chosen.max_duration_ms = cap_budget_remaining_u64(
1533                    chosen.max_duration_ms,
1534                    parent.budget.max_duration_ms,
1535                    usage
1536                        .started_at
1537                        .map(|started| started.elapsed().as_millis() as u64)
1538                        .unwrap_or(0),
1539                    pct,
1540                );
1541                chosen.max_cost_usd = cap_budget_remaining_f64(
1542                    chosen.max_cost_usd,
1543                    parent.budget.max_cost_usd,
1544                    usage.cost_used_usd,
1545                    pct,
1546                );
1547            }
1548        }
1549    }
1550    chosen
1551}
1552
1553fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1554    BudgetLimit {
1555        max_tokens: overlay.max_tokens.or(base.max_tokens),
1556        max_steps: overlay.max_steps.or(base.max_steps),
1557        max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1558        max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1559        max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1560    }
1561}
1562
/// Caps a child's `u64` limit to `pct` percent of what the parent has left.
///
/// * No parent limit: `child` is returned unchanged (including `None`).
/// * Parent limit set: the share is `(parent_limit - parent_used) * pct / 100`
///   with the subtraction saturating at zero and the division rounding down.
///   The result is the smaller of `child` and the share, or the share itself
///   when the child has no limit of its own.
///
/// The share is computed in 128-bit arithmetic so that very large parent
/// limits (for example `u64::MAX`) keep their true percentage instead of
/// being distorted by a saturated multiplication. `pct` is not clamped; a
/// share that would exceed `u64::MAX` is reported as `u64::MAX`.
fn cap_budget_remaining_u64(
    child: Option<u64>,
    parent_limit: Option<u64>,
    parent_used: u64,
    pct: u8,
) -> Option<u64> {
    let parent_limit = match parent_limit {
        Some(limit) => limit,
        None => return child,
    };
    let remaining = parent_limit.saturating_sub(parent_used);
    // u64 * u8 always fits in u128, so the product cannot overflow or saturate.
    let share_wide = u128::from(remaining) * u128::from(pct) / 100;
    // Lossless after the clamp: the value is at most u64::MAX.
    let share = share_wide.min(u128::from(u64::MAX)) as u64;
    Some(match child {
        Some(own) => own.min(share),
        None => share,
    })
}
1582
/// Caps a child's `u32` limit to `pct` percent of what the parent has left.
///
/// * No parent limit: `child` is returned unchanged (including `None`).
/// * Parent limit set: the share is `(parent_limit - parent_used) * pct / 100`
///   with the subtraction saturating at zero and the division rounding down.
///   The result is the smaller of `child` and the share, or the share itself
///   when the child has no limit of its own.
///
/// The share is computed in 64-bit arithmetic so that large parent limits
/// keep their true percentage instead of being distorted by a saturated
/// 32-bit multiplication. `pct` is not clamped; a share that would exceed
/// `u32::MAX` is reported as `u32::MAX`.
fn cap_budget_remaining_u32(
    child: Option<u32>,
    parent_limit: Option<u32>,
    parent_used: u32,
    pct: u8,
) -> Option<u32> {
    let parent_limit = match parent_limit {
        Some(limit) => limit,
        None => return child,
    };
    let remaining = parent_limit.saturating_sub(parent_used);
    // u32 * u8 always fits in u64, so the product cannot overflow or saturate.
    let share_wide = u64::from(remaining) * u64::from(pct) / 100;
    // Lossless after the clamp: the value is at most u32::MAX.
    let share = share_wide.min(u64::from(u32::MAX)) as u32;
    Some(match child {
        Some(own) => own.min(share),
        None => share,
    })
}
1602
/// Caps a child's `f64` limit to `pct` percent of what the parent has left.
///
/// * No parent limit: `child` is returned unchanged (including `None`).
/// * Parent limit set: the share is
///   `max(parent_limit - parent_used, 0) * pct / 100`. The result is the
///   smaller of `child` and the share, or the share itself when the child has
///   no limit of its own.
fn cap_budget_remaining_f64(
    child: Option<f64>,
    parent_limit: Option<f64>,
    parent_used: f64,
    pct: u8,
) -> Option<f64> {
    let ceiling = parent_limit.map(|limit| {
        let remaining = (limit - parent_used).max(0.0);
        remaining * f64::from(pct) / 100.0
    });
    match (child, ceiling) {
        (Some(own), Some(cap)) => Some(own.min(cap)),
        (own, None) => own,
        (None, cap) => cap,
    }
}
1622
1623async fn compute_skill_hash(
1624    workspace_root: &str,
1625    template: &AgentTemplate,
1626    policy: &SpawnPolicy,
1627) -> Result<String, String> {
1628    use sha2::{Digest, Sha256};
1629    let mut rows = Vec::new();
1630    let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1631    for skill in &template.skills {
1632        if let Some(path) = skill.path.as_deref() {
1633            validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1634            let skill_path = Path::new(workspace_root).join(path);
1635            let raw = fs::read_to_string(&skill_path)
1636                .await
1637                .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1638            let digest = hash_hex(raw.as_bytes());
1639            validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1640            rows.push(format!("path:{}:{}", path, digest));
1641        } else if let Some(id) = skill.id.as_deref() {
1642            validate_skill_source(Some(id), None, policy)?;
1643            let loaded = skill_service
1644                .load_skill(id)
1645                .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1646            let Some(loaded) = loaded else {
1647                return Err(format!("missing required skill id `{id}`"));
1648            };
1649            let digest = hash_hex(loaded.content.as_bytes());
1650            validate_pinned_hash(Some(id), None, &digest, policy)?;
1651            rows.push(format!("id:{}:{}", id, digest));
1652        }
1653    }
1654    rows.sort();
1655    let mut hasher = Sha256::new();
1656    for row in rows {
1657        hasher.update(row.as_bytes());
1658        hasher.update(b"\n");
1659    }
1660    let digest = hasher.finalize();
1661    Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1662}
1663
1664fn validate_skill_source(
1665    id: Option<&str>,
1666    path: Option<&str>,
1667    policy: &SpawnPolicy,
1668) -> Result<(), String> {
1669    use tandem_orchestrator::SkillSourceMode;
1670    match policy.skill_sources.mode {
1671        SkillSourceMode::Any => Ok(()),
1672        SkillSourceMode::ProjectOnly => {
1673            if id.is_some() {
1674                return Err("skill source denied: project_only forbids skill IDs".to_string());
1675            }
1676            let Some(path) = path else {
1677                return Err("skill source denied: project_only requires skill path".to_string());
1678            };
1679            let p = PathBuf::from(path);
1680            if p.is_absolute() {
1681                return Err("skill source denied: absolute skill paths are forbidden".to_string());
1682            }
1683            Ok(())
1684        }
1685        SkillSourceMode::Allowlist => {
1686            if let Some(id) = id {
1687                if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1688                    return Ok(());
1689                }
1690            }
1691            if let Some(path) = path {
1692                if policy
1693                    .skill_sources
1694                    .allowlist_paths
1695                    .iter()
1696                    .any(|v| v == path)
1697                {
1698                    return Ok(());
1699                }
1700            }
1701            Err("skill source denied: not present in allowlist".to_string())
1702        }
1703    }
1704}
1705
1706fn validate_pinned_hash(
1707    id: Option<&str>,
1708    path: Option<&str>,
1709    digest: &str,
1710    policy: &SpawnPolicy,
1711) -> Result<(), String> {
1712    let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1713    let by_path = path.and_then(|path| {
1714        policy
1715            .skill_sources
1716            .pinned_hashes
1717            .get(&format!("path:{path}"))
1718    });
1719    let expected = by_id.or(by_path);
1720    if let Some(expected) = expected {
1721        let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1722        if normalized != digest {
1723            return Err("pinned hash mismatch for skill reference".to_string());
1724        }
1725    }
1726    Ok(())
1727}
1728
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// Despite its name this function performs no hashing; callers pass in whatever bytes
/// they want rendered.
fn hash_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
1737
/// Roughly estimates the token count of `text` as one token per four characters.
///
/// Characters are counted as Unicode scalar values, and the result is never below 1,
/// even for empty input.
fn estimate_tokens(text: &str) -> u64 {
    let char_count = text.chars().count() as u64;
    std::cmp::max(char_count / 4, 1)
}
1742
1743fn extract_session_id(event: &EngineEvent) -> Option<String> {
1744    event
1745        .properties
1746        .get("sessionID")
1747        .and_then(|v| v.as_str())
1748        .map(|v| v.to_string())
1749        .or_else(|| {
1750            event
1751                .properties
1752                .get("part")
1753                .and_then(|v| v.get("sessionID"))
1754                .and_then(|v| v.as_str())
1755                .map(|v| v.to_string())
1756        })
1757}
1758
1759fn merge_metadata_usage(
1760    metadata: Option<Value>,
1761    tokens_used: u64,
1762    steps_used: u32,
1763    tool_calls_used: u32,
1764    cost_used_usd: f64,
1765    elapsed_ms: u64,
1766) -> Value {
1767    let mut base = metadata
1768        .and_then(|v| v.as_object().cloned())
1769        .unwrap_or_default();
1770    base.insert(
1771        "budgetUsage".to_string(),
1772        json!({
1773            "tokensUsed": tokens_used,
1774            "stepsUsed": steps_used,
1775            "toolCallsUsed": tool_calls_used,
1776            "costUsedUsd": cost_used_usd,
1777            "elapsedMs": elapsed_ms
1778        }),
1779    );
1780    Value::Object(base)
1781}
1782
/// Canonicalises a tool name for policy matching.
///
/// The name is trimmed, lower-cased and has `-` replaced by `_`; the known aliases
/// `todowrite`, `update_todo_list` and `update_todos` all map to `todo_write`.
fn normalize_tool_name(name: &str) -> String {
    let canonical = name.trim().to_lowercase().replace('-', "_");
    if matches!(
        canonical.as_str(),
        "todowrite" | "update_todo_list" | "update_todos"
    ) {
        "todo_write".to_string()
    } else {
        canonical
    }
}
1789
1790async fn evaluate_capability_deny(
1791    state: &AppState,
1792    instance: &AgentInstance,
1793    tool: &str,
1794    args: &Value,
1795    caps: &tandem_orchestrator::CapabilitySpec,
1796    session_id: &str,
1797    message_id: &str,
1798) -> Option<String> {
1799    let deny_patterns = caps
1800        .tool_denylist
1801        .iter()
1802        .map(|name| normalize_tool_name(name))
1803        .collect::<Vec<_>>();
1804    if !deny_patterns.is_empty() && any_policy_matches(&deny_patterns, tool) {
1805        return Some(format!("tool `{tool}` denied by agent capability policy"));
1806    }
1807    let allow_patterns = caps
1808        .tool_allowlist
1809        .iter()
1810        .map(|name| normalize_tool_name(name))
1811        .collect::<Vec<_>>();
1812    if !allow_patterns.is_empty() && !any_policy_matches(&allow_patterns, tool) {
1813        return Some(format!("tool `{tool}` not in agent allowlist"));
1814    }
1815
1816    let browser_execution_tool = matches!(
1817        tool,
1818        "browser_open"
1819            | "browser_navigate"
1820            | "browser_snapshot"
1821            | "browser_click"
1822            | "browser_type"
1823            | "browser_press"
1824            | "browser_wait"
1825            | "browser_extract"
1826            | "browser_screenshot"
1827            | "browser_close"
1828    );
1829
1830    if matches!(
1831        tool,
1832        "websearch" | "webfetch" | "webfetch_html" | "browser_open" | "browser_navigate"
1833    ) || browser_execution_tool
1834    {
1835        if !caps.net_scopes.enabled {
1836            return Some("network disabled for this agent instance".to_string());
1837        }
1838        if !caps.net_scopes.allow_hosts.is_empty() {
1839            if tool == "websearch" {
1840                return Some(
1841                    "websearch blocked: host allowlist cannot be verified for search tool"
1842                        .to_string(),
1843                );
1844            }
1845            if let Some(host) = extract_url_host(args) {
1846                let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
1847                    let allowed = h.trim().to_ascii_lowercase();
1848                    !allowed.is_empty()
1849                        && (host == allowed || host.ends_with(&format!(".{allowed}")))
1850                });
1851                if !allowed {
1852                    return Some(format!("network host `{host}` not in allow_hosts"));
1853                }
1854            }
1855        }
1856    }
1857
1858    if tool == "bash" {
1859        let cmd = args
1860            .get("command")
1861            .and_then(|v| v.as_str())
1862            .unwrap_or("")
1863            .to_ascii_lowercase();
1864        if cmd.contains("git push") {
1865            if !caps.git_caps.push {
1866                return Some("git push disabled for this agent instance".to_string());
1867            }
1868            if caps.git_caps.push_requires_approval {
1869                let action = state.permissions.evaluate("git_push", "git_push").await;
1870                match action {
1871                    tandem_core::PermissionAction::Allow => {}
1872                    tandem_core::PermissionAction::Deny => {
1873                        return Some("git push denied by policy rule".to_string());
1874                    }
1875                    tandem_core::PermissionAction::Ask => {
1876                        let pending = state
1877                            .permissions
1878                            .ask_for_session_with_context(
1879                                Some(session_id),
1880                                "git_push",
1881                                args.clone(),
1882                                Some(tandem_core::PermissionArgsContext {
1883                                    args_source: "agent_team.git_push".to_string(),
1884                                    args_integrity: "runtime-checked".to_string(),
1885                                    query: Some(format!(
1886                                        "instanceID={} messageID={}",
1887                                        instance.instance_id, message_id
1888                                    )),
1889                                }),
1890                            )
1891                            .await;
1892                        return Some(format!(
1893                            "git push requires explicit user approval (approvalID={})",
1894                            pending.id
1895                        ));
1896                    }
1897                }
1898            }
1899        }
1900        if cmd.contains("git commit") && !caps.git_caps.commit {
1901            return Some("git commit disabled for this agent instance".to_string());
1902        }
1903    }
1904
1905    let access_kind = tool_fs_access_kind(tool);
1906    if let Some(kind) = access_kind {
1907        let Some(session) = state.storage.get_session(session_id).await else {
1908            return Some("session not found for capability evaluation".to_string());
1909        };
1910        let Some(root) = session.workspace_root.clone() else {
1911            return Some("workspace root missing for capability evaluation".to_string());
1912        };
1913        let requested = extract_tool_candidate_paths(tool, args);
1914        if !requested.is_empty() {
1915            let allowed_scopes = if kind == "read" {
1916                &caps.fs_scopes.read
1917            } else {
1918                &caps.fs_scopes.write
1919            };
1920            if allowed_scopes.is_empty() {
1921                return Some(format!("fs {kind} access blocked: no scopes configured"));
1922            }
1923            for candidate in requested {
1924                if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
1925                    return Some(format!("fs {kind} access denied for path `{}`", candidate));
1926                }
1927            }
1928        }
1929    }
1930
1931    denied_secrets_reason(tool, caps, args)
1932}
1933
1934fn denied_secrets_reason(
1935    tool: &str,
1936    caps: &tandem_orchestrator::CapabilitySpec,
1937    args: &Value,
1938) -> Option<String> {
1939    if tool == "auth" {
1940        if caps.secrets_scopes.is_empty() {
1941            return Some("secrets are disabled for this agent instance".to_string());
1942        }
1943        let alias = args
1944            .get("id")
1945            .or_else(|| args.get("provider"))
1946            .or_else(|| args.get("providerID"))
1947            .and_then(|v| v.as_str())
1948            .unwrap_or("")
1949            .trim();
1950        if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
1951            return Some(format!(
1952                "secret alias `{alias}` is not in agent secretsScopes allowlist"
1953            ));
1954        }
1955    }
1956    None
1957}
1958
/// Classifies a tool by the kind of filesystem access it performs.
///
/// Returns `Some("read")` for `read`, `glob`, `grep`, `codesearch` and `lsp`,
/// `Some("write")` for `write`, `edit` and `apply_patch`, and `None` for any other tool.
fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
    const READ_TOOLS: [&str; 5] = ["read", "glob", "grep", "codesearch", "lsp"];
    const WRITE_TOOLS: [&str; 3] = ["write", "edit", "apply_patch"];
    if READ_TOOLS.contains(&tool) {
        Some("read")
    } else if WRITE_TOOLS.contains(&tool) {
        Some("write")
    } else {
        None
    }
}
1966
1967fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
1968    let Some(obj) = args.as_object() else {
1969        return Vec::new();
1970    };
1971    let keys: &[&str] = match tool {
1972        "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
1973        "glob" => &["pattern"],
1974        "lsp" => &["filePath", "path"],
1975        "bash" => &["cwd"],
1976        "apply_patch" => &["path"],
1977        _ => &["path", "cwd"],
1978    };
1979    keys.iter()
1980        .filter_map(|key| obj.get(*key))
1981        .filter_map(|value| value.as_str())
1982        .filter(|s| !s.trim().is_empty())
1983        .map(|raw| strip_glob_tokens(raw).to_string())
1984        .collect()
1985}
1986
/// Returns the prefix of `path` that precedes the first glob metacharacter.
///
/// The metacharacters are `*`, `?` and `[`; a path without any of them is returned whole.
fn strip_glob_tokens(path: &str) -> &str {
    match path.find(|ch: char| matches!(ch, '*' | '?' | '[')) {
        Some(first_glob) => &path[..first_glob],
        None => path,
    }
}
1997
1998fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
1999    let root_path = PathBuf::from(root);
2000    let candidate_path = resolve_path(&root_path, candidate);
2001    scopes.iter().any(|scope| {
2002        let scope_path = resolve_path(&root_path, scope);
2003        candidate_path.starts_with(scope_path)
2004    })
2005}
2006
/// Resolves `raw` against `root` and collapses `.` and `..` segments lexically.
///
/// * A blank `raw` resolves to `root` itself.
/// * An absolute `raw` is used as is; a relative one is joined onto `root`.
/// * `.` segments are dropped and each `..` removes the preceding normal segment. A `..`
///   directly under the filesystem root is discarded, and leading `..` segments of a
///   relative result are kept.
///
/// The normalisation is what makes prefix-based scope checks meaningful: without it,
/// `scope/../../elsewhere` would still start with `scope` component-wise. It is purely
/// lexical, so the filesystem is not consulted and symlinks are not followed.
fn resolve_path(root: &Path, raw: &str) -> PathBuf {
    use std::path::Component;

    let raw = raw.trim();
    let joined = if raw.is_empty() {
        root.to_path_buf()
    } else {
        let path = PathBuf::from(raw);
        if path.is_absolute() {
            path
        } else {
            root.join(path)
        }
    };

    let mut resolved = PathBuf::new();
    for component in joined.components() {
        match component {
            Component::CurDir => {}
            Component::ParentDir => match resolved.components().next_back() {
                Some(Component::Normal(_)) => {
                    resolved.pop();
                }
                // Nothing exists above the root, so the segment is dropped.
                Some(Component::RootDir | Component::Prefix(_)) => {}
                // Relative path with nothing left to pop: keep the `..`.
                _ => resolved.push(Component::ParentDir.as_os_str()),
            },
            other => resolved.push(other.as_os_str()),
        }
    }
    resolved
}
2019
2020fn extract_url_host(args: &Value) -> Option<String> {
2021    let url = args
2022        .get("url")
2023        .or_else(|| args.get("uri"))
2024        .or_else(|| args.get("link"))
2025        .and_then(|v| v.as_str())?;
2026    let raw = url.trim();
2027    let (_, after_scheme) = raw.split_once("://")?;
2028    let host_port = after_scheme.split('/').next().unwrap_or_default();
2029    let host = host_port.split('@').next_back().unwrap_or_default();
2030    let host = host
2031        .split(':')
2032        .next()
2033        .unwrap_or_default()
2034        .to_ascii_lowercase();
2035    if host.is_empty() {
2036        None
2037    } else {
2038        Some(host)
2039    }
2040}
2041
2042pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
2043    emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
2044}
2045
2046pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
2047    emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
2048}
2049
2050pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
2051    emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
2052}
2053
/// Optional correlation ids attached to spawn-related events.
///
/// Every field defaults to `None`. The struct only borrows its ids, so it is cheap to
/// copy and can be printed for diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct SpawnEventContext<'a> {
    /// Session the spawn request originated from, if known.
    pub session_id: Option<&'a str>,
    /// Message that triggered the spawn request, if known.
    pub message_id: Option<&'a str>,
    /// Run the spawn request belongs to, if known.
    pub run_id: Option<&'a str>,
}
2060
2061pub fn emit_spawn_requested_with_context(
2062    state: &AppState,
2063    req: &SpawnRequest,
2064    ctx: &SpawnEventContext<'_>,
2065) {
2066    state.event_bus.publish(EngineEvent::new(
2067        "agent_team.spawn.requested",
2068        json!({
2069            "sessionID": ctx.session_id,
2070            "messageID": ctx.message_id,
2071            "runID": ctx.run_id,
2072            "missionID": req.mission_id,
2073            "instanceID": Value::Null,
2074            "parentInstanceID": req.parent_instance_id,
2075            "source": req.source,
2076            "requestedRole": req.role,
2077            "templateID": req.template_id,
2078            "justification": req.justification,
2079            "timestampMs": crate::now_ms(),
2080        }),
2081    ));
2082}
2083
2084pub fn emit_spawn_denied_with_context(
2085    state: &AppState,
2086    req: &SpawnRequest,
2087    decision: &SpawnDecision,
2088    ctx: &SpawnEventContext<'_>,
2089) {
2090    state.event_bus.publish(EngineEvent::new(
2091        "agent_team.spawn.denied",
2092        json!({
2093            "sessionID": ctx.session_id,
2094            "messageID": ctx.message_id,
2095            "runID": ctx.run_id,
2096            "missionID": req.mission_id,
2097            "instanceID": Value::Null,
2098            "parentInstanceID": req.parent_instance_id,
2099            "source": req.source,
2100            "requestedRole": req.role,
2101            "templateID": req.template_id,
2102            "code": decision.code,
2103            "error": decision.reason,
2104            "timestampMs": crate::now_ms(),
2105        }),
2106    ));
2107}
2108
2109pub fn emit_spawn_approved_with_context(
2110    state: &AppState,
2111    req: &SpawnRequest,
2112    instance: &AgentInstance,
2113    ctx: &SpawnEventContext<'_>,
2114) {
2115    state.event_bus.publish(EngineEvent::new(
2116        "agent_team.spawn.approved",
2117        json!({
2118            "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2119            "messageID": ctx.message_id,
2120            "runID": ctx.run_id.or(instance.run_id.as_deref()),
2121            "missionID": instance.mission_id,
2122            "instanceID": instance.instance_id,
2123            "parentInstanceID": instance.parent_instance_id,
2124            "source": req.source,
2125            "requestedRole": req.role,
2126            "templateID": instance.template_id,
2127            "skillHash": instance.skill_hash,
2128            "timestampMs": crate::now_ms(),
2129        }),
2130    ));
2131    state.event_bus.publish(EngineEvent::new(
2132        "agent_team.instance.started",
2133        json!({
2134            "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2135            "messageID": ctx.message_id,
2136            "runID": ctx.run_id.or(instance.run_id.as_deref()),
2137            "missionID": instance.mission_id,
2138            "instanceID": instance.instance_id,
2139            "parentInstanceID": instance.parent_instance_id,
2140            "role": instance.role,
2141            "status": instance.status,
2142            "budgetLimit": instance.budget,
2143            "skillHash": instance.skill_hash,
2144            "timestampMs": crate::now_ms(),
2145        }),
2146    ));
2147}
2148
2149pub fn emit_budget_usage(
2150    state: &AppState,
2151    instance: &AgentInstance,
2152    tokens_used: u64,
2153    steps_used: u32,
2154    tool_calls_used: u32,
2155    cost_used_usd: f64,
2156    elapsed_ms: u64,
2157) {
2158    state.event_bus.publish(EngineEvent::new(
2159        "agent_team.budget.usage",
2160        json!({
2161            "sessionID": instance.session_id,
2162            "messageID": Value::Null,
2163            "runID": instance.run_id,
2164            "missionID": instance.mission_id,
2165            "instanceID": instance.instance_id,
2166            "tokensUsed": tokens_used,
2167            "stepsUsed": steps_used,
2168            "toolCallsUsed": tool_calls_used,
2169            "costUsedUsd": cost_used_usd,
2170            "elapsedMs": elapsed_ms,
2171            "timestampMs": crate::now_ms(),
2172        }),
2173    ));
2174}
2175
2176#[allow(clippy::too_many_arguments)]
2177pub fn emit_budget_exhausted(
2178    state: &AppState,
2179    instance: &AgentInstance,
2180    exhausted_by: &str,
2181    tokens_used: u64,
2182    steps_used: u32,
2183    tool_calls_used: u32,
2184    cost_used_usd: f64,
2185    elapsed_ms: u64,
2186) {
2187    state.event_bus.publish(EngineEvent::new(
2188        "agent_team.budget.exhausted",
2189        json!({
2190            "sessionID": instance.session_id,
2191            "messageID": Value::Null,
2192            "runID": instance.run_id,
2193            "missionID": instance.mission_id,
2194            "instanceID": instance.instance_id,
2195            "exhaustedBy": exhausted_by,
2196            "tokensUsed": tokens_used,
2197            "stepsUsed": steps_used,
2198            "toolCallsUsed": tool_calls_used,
2199            "costUsedUsd": cost_used_usd,
2200            "elapsedMs": elapsed_ms,
2201            "timestampMs": crate::now_ms(),
2202        }),
2203    ));
2204}
2205
2206pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
2207    state.event_bus.publish(EngineEvent::new(
2208        "agent_team.instance.cancelled",
2209        json!({
2210            "sessionID": instance.session_id,
2211            "messageID": Value::Null,
2212            "runID": instance.run_id,
2213            "missionID": instance.mission_id,
2214            "instanceID": instance.instance_id,
2215            "parentInstanceID": instance.parent_instance_id,
2216            "role": instance.role,
2217            "status": instance.status,
2218            "reason": reason,
2219            "timestampMs": crate::now_ms(),
2220        }),
2221    ));
2222}
2223
2224pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
2225    state.event_bus.publish(EngineEvent::new(
2226        "agent_team.instance.completed",
2227        json!({
2228            "sessionID": instance.session_id,
2229            "messageID": Value::Null,
2230            "runID": instance.run_id,
2231            "missionID": instance.mission_id,
2232            "instanceID": instance.instance_id,
2233            "parentInstanceID": instance.parent_instance_id,
2234            "role": instance.role,
2235            "status": instance.status,
2236            "timestampMs": crate::now_ms(),
2237        }),
2238    ));
2239}
2240
2241pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
2242    state.event_bus.publish(EngineEvent::new(
2243        "agent_team.instance.failed",
2244        json!({
2245            "sessionID": instance.session_id,
2246            "messageID": Value::Null,
2247            "runID": instance.run_id,
2248            "missionID": instance.mission_id,
2249            "instanceID": instance.instance_id,
2250            "parentInstanceID": instance.parent_instance_id,
2251            "role": instance.role,
2252            "status": instance.status,
2253            "timestampMs": crate::now_ms(),
2254        }),
2255    ));
2256}
2257
2258pub fn emit_mission_budget_exhausted(
2259    state: &AppState,
2260    mission_id: &str,
2261    exhausted_by: &str,
2262    tokens_used: u64,
2263    steps_used: u64,
2264    tool_calls_used: u64,
2265    cost_used_usd: f64,
2266) {
2267    state.event_bus.publish(EngineEvent::new(
2268        "agent_team.mission.budget.exhausted",
2269        json!({
2270            "sessionID": Value::Null,
2271            "messageID": Value::Null,
2272            "runID": Value::Null,
2273            "missionID": mission_id,
2274            "instanceID": Value::Null,
2275            "exhaustedBy": exhausted_by,
2276            "tokensUsed": tokens_used,
2277            "stepsUsed": steps_used,
2278            "toolCallsUsed": tool_calls_used,
2279            "costUsedUsd": cost_used_usd,
2280            "timestampMs": crate::now_ms(),
2281        }),
2282    ));
2283}