Skip to main content

tandem_server/
agent_teams.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use futures::future::BoxFuture;
8use serde::Deserialize;
9use serde::Serialize;
10use serde_json::{json, Value};
11use tandem_core::{
12    any_policy_matches, SpawnAgentHook, SpawnAgentToolContext, SpawnAgentToolResult,
13    ToolPolicyContext, ToolPolicyDecision, ToolPolicyHook,
14};
15use tandem_orchestrator::{
16    AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
17    SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
18};
19use tandem_skills::SkillService;
20use tandem_types::{EngineEvent, Session};
21use tokio::fs;
22use tokio::sync::RwLock;
23use uuid::Uuid;
24
25use crate::AppState;
26
27#[derive(Clone, Default)]
28pub struct AgentTeamRuntime {
29    policy: Arc<RwLock<Option<SpawnPolicy>>>,
30    templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
31    instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
32    budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
33    mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
34    spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
35    loaded_workspace: Arc<RwLock<Option<String>>>,
36    audit_path: Arc<RwLock<PathBuf>>,
37}
38
39#[derive(Debug, Clone)]
40pub struct SpawnResult {
41    pub decision: SpawnDecision,
42    pub instance: Option<AgentInstance>,
43}
44
45#[derive(Debug, Clone, Serialize)]
46pub struct AgentMissionSummary {
47    #[serde(rename = "missionID")]
48    pub mission_id: String,
49    #[serde(rename = "instanceCount")]
50    pub instance_count: usize,
51    #[serde(rename = "runningCount")]
52    pub running_count: usize,
53    #[serde(rename = "completedCount")]
54    pub completed_count: usize,
55    #[serde(rename = "failedCount")]
56    pub failed_count: usize,
57    #[serde(rename = "cancelledCount")]
58    pub cancelled_count: usize,
59    #[serde(rename = "queuedCount")]
60    pub queued_count: usize,
61    #[serde(rename = "tokenUsedTotal")]
62    pub token_used_total: u64,
63    #[serde(rename = "toolCallsUsedTotal")]
64    pub tool_calls_used_total: u64,
65    #[serde(rename = "stepsUsedTotal")]
66    pub steps_used_total: u64,
67    #[serde(rename = "costUsedUsdTotal")]
68    pub cost_used_usd_total: f64,
69}
70
71#[derive(Debug, Clone, Default)]
72struct InstanceBudgetState {
73    tokens_used: u64,
74    steps_used: u32,
75    tool_calls_used: u32,
76    cost_used_usd: f64,
77    started_at: Option<Instant>,
78    exhausted: bool,
79}
80
81#[derive(Debug, Clone, Default)]
82struct MissionBudgetState {
83    tokens_used: u64,
84    steps_used: u64,
85    tool_calls_used: u64,
86    cost_used_usd: f64,
87    exhausted: bool,
88}
89
90#[derive(Debug, Clone, Serialize)]
91pub struct PendingSpawnApproval {
92    #[serde(rename = "approvalID")]
93    pub approval_id: String,
94    #[serde(rename = "createdAtMs")]
95    pub created_at_ms: u64,
96    pub request: SpawnRequest,
97    #[serde(rename = "decisionCode")]
98    pub decision_code: Option<SpawnDenyCode>,
99    pub reason: Option<String>,
100}
101
102#[derive(Clone)]
103pub struct ServerSpawnAgentHook {
104    state: AppState,
105}
106
107#[derive(Debug, Deserialize)]
108struct SpawnAgentToolInput {
109    #[serde(rename = "missionID")]
110    mission_id: Option<String>,
111    #[serde(rename = "parentInstanceID")]
112    parent_instance_id: Option<String>,
113    #[serde(rename = "templateID")]
114    template_id: Option<String>,
115    role: AgentRole,
116    source: Option<SpawnSource>,
117    justification: String,
118    #[serde(rename = "budgetOverride", default)]
119    budget_override: Option<BudgetLimit>,
120}
121
122impl ServerSpawnAgentHook {
123    pub fn new(state: AppState) -> Self {
124        Self { state }
125    }
126}
127
128impl SpawnAgentHook for ServerSpawnAgentHook {
129    fn spawn_agent(
130        &self,
131        ctx: SpawnAgentToolContext,
132    ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
133        let state = self.state.clone();
134        Box::pin(async move {
135            let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
136            let input = match parsed {
137                Ok(input) => input,
138                Err(err) => {
139                    return Ok(SpawnAgentToolResult {
140                        output: format!("spawn_agent denied: invalid args ({err})"),
141                        metadata: json!({
142                            "ok": false,
143                            "code": "SPAWN_INVALID_ARGS",
144                            "error": err.to_string(),
145                        }),
146                    });
147                }
148            };
149            let req = SpawnRequest {
150                mission_id: input.mission_id,
151                parent_instance_id: input.parent_instance_id,
152                source: input.source.unwrap_or(SpawnSource::ToolCall),
153                parent_role: None,
154                role: input.role,
155                template_id: input.template_id,
156                justification: input.justification,
157                budget_override: input.budget_override,
158            };
159
160            let event_ctx = SpawnEventContext {
161                session_id: Some(ctx.session_id.as_str()),
162                message_id: Some(ctx.message_id.as_str()),
163                run_id: None,
164            };
165            emit_spawn_requested_with_context(&state, &req, &event_ctx);
166            let result = state.agent_teams.spawn(&state, req.clone()).await;
167            if !result.decision.allowed || result.instance.is_none() {
168                emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
169                return Ok(SpawnAgentToolResult {
170                    output: result
171                        .decision
172                        .reason
173                        .clone()
174                        .unwrap_or_else(|| "spawn_agent denied".to_string()),
175                    metadata: json!({
176                        "ok": false,
177                        "code": result.decision.code,
178                        "error": result.decision.reason,
179                        "requiresUserApproval": result.decision.requires_user_approval,
180                    }),
181                });
182            }
183            let instance = result.instance.expect("checked is_some");
184            emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
185            Ok(SpawnAgentToolResult {
186                output: format!(
187                    "spawned {} as instance {} (session {})",
188                    instance.template_id, instance.instance_id, instance.session_id
189                ),
190                metadata: json!({
191                    "ok": true,
192                    "missionID": instance.mission_id,
193                    "instanceID": instance.instance_id,
194                    "sessionID": instance.session_id,
195                    "runID": instance.run_id,
196                    "status": instance.status,
197                    "skillHash": instance.skill_hash,
198                }),
199            })
200        })
201    }
202}
203
204#[derive(Clone)]
205pub struct ServerToolPolicyHook {
206    state: AppState,
207}
208
209impl ServerToolPolicyHook {
210    pub fn new(state: AppState) -> Self {
211        Self { state }
212    }
213}
214
215impl ToolPolicyHook for ServerToolPolicyHook {
216    fn evaluate_tool(
217        &self,
218        ctx: ToolPolicyContext,
219    ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
220        let state = self.state.clone();
221        Box::pin(async move {
222            let tool = normalize_tool_name(&ctx.tool);
223            if let Some(policy) = state.routine_session_policy(&ctx.session_id).await {
224                let allowed_patterns = policy
225                    .allowed_tools
226                    .iter()
227                    .map(|name| normalize_tool_name(name))
228                    .collect::<Vec<_>>();
229                if !policy.allowed_tools.is_empty() && !any_policy_matches(&allowed_patterns, &tool)
230                {
231                    let reason = format!(
232                        "tool `{}` is not allowed for routine `{}` (run `{}`)",
233                        tool, policy.routine_id, policy.run_id
234                    );
235                    state.event_bus.publish(EngineEvent::new(
236                        "routine.tool.denied",
237                        json!({
238                            "sessionID": ctx.session_id,
239                            "messageID": ctx.message_id,
240                            "runID": policy.run_id,
241                            "routineID": policy.routine_id,
242                            "tool": tool,
243                            "reason": reason,
244                            "timestampMs": crate::now_ms(),
245                        }),
246                    ));
247                    return Ok(ToolPolicyDecision {
248                        allowed: false,
249                        reason: Some(reason),
250                    });
251                }
252            }
253
254            let Some(instance) = state
255                .agent_teams
256                .instance_for_session(&ctx.session_id)
257                .await
258            else {
259                return Ok(ToolPolicyDecision {
260                    allowed: true,
261                    reason: None,
262                });
263            };
264            let caps = instance.capabilities.clone();
265            let deny = evaluate_capability_deny(
266                &state,
267                &instance,
268                &tool,
269                &ctx.args,
270                &caps,
271                &ctx.session_id,
272                &ctx.message_id,
273            )
274            .await;
275            if let Some(reason) = deny {
276                state.event_bus.publish(EngineEvent::new(
277                    "agent_team.capability.denied",
278                    json!({
279                        "sessionID": ctx.session_id,
280                        "messageID": ctx.message_id,
281                        "runID": instance.run_id,
282                        "missionID": instance.mission_id,
283                        "instanceID": instance.instance_id,
284                        "tool": tool,
285                        "reason": reason,
286                        "timestampMs": crate::now_ms(),
287                    }),
288                ));
289                return Ok(ToolPolicyDecision {
290                    allowed: false,
291                    reason: Some(reason),
292                });
293            }
294            Ok(ToolPolicyDecision {
295                allowed: true,
296                reason: None,
297            })
298        })
299    }
300}
301
302impl AgentTeamRuntime {
303    pub fn new(audit_path: PathBuf) -> Self {
304        Self {
305            policy: Arc::new(RwLock::new(None)),
306            templates: Arc::new(RwLock::new(HashMap::new())),
307            instances: Arc::new(RwLock::new(HashMap::new())),
308            budgets: Arc::new(RwLock::new(HashMap::new())),
309            mission_budgets: Arc::new(RwLock::new(HashMap::new())),
310            spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
311            loaded_workspace: Arc::new(RwLock::new(None)),
312            audit_path: Arc::new(RwLock::new(audit_path)),
313        }
314    }
315
316    pub async fn set_audit_path(&self, path: PathBuf) {
317        *self.audit_path.write().await = path;
318    }
319
320    pub async fn list_templates(&self) -> Vec<AgentTemplate> {
321        let mut rows = self
322            .templates
323            .read()
324            .await
325            .values()
326            .cloned()
327            .collect::<Vec<_>>();
328        rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
329        rows
330    }
331
332    async fn templates_dir_for_loaded_workspace(&self) -> anyhow::Result<PathBuf> {
333        let workspace = self
334            .loaded_workspace
335            .read()
336            .await
337            .clone()
338            .ok_or_else(|| anyhow::anyhow!("agent team workspace not loaded"))?;
339        Ok(PathBuf::from(workspace)
340            .join(".tandem")
341            .join("agent-team")
342            .join("templates"))
343    }
344
345    fn template_filename(template_id: &str) -> String {
346        let safe = template_id
347            .trim()
348            .chars()
349            .map(|ch| {
350                if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
351                    ch
352                } else {
353                    '_'
354                }
355            })
356            .collect::<String>();
357        let fallback = if safe.is_empty() {
358            "template".to_string()
359        } else {
360            safe
361        };
362        format!("{fallback}.yaml")
363    }
364
365    pub async fn upsert_template(
366        &self,
367        workspace_root: &str,
368        template: AgentTemplate,
369    ) -> anyhow::Result<AgentTemplate> {
370        self.ensure_loaded_for_workspace(workspace_root).await?;
371        let templates_dir = self.templates_dir_for_loaded_workspace().await?;
372        fs::create_dir_all(&templates_dir).await?;
373        let path = templates_dir.join(Self::template_filename(&template.template_id));
374        let payload = serde_yaml::to_string(&template)?;
375        fs::write(path, payload).await?;
376        self.templates
377            .write()
378            .await
379            .insert(template.template_id.clone(), template.clone());
380        Ok(template)
381    }
382
383    pub async fn delete_template(
384        &self,
385        workspace_root: &str,
386        template_id: &str,
387    ) -> anyhow::Result<bool> {
388        self.ensure_loaded_for_workspace(workspace_root).await?;
389        let templates_dir = self.templates_dir_for_loaded_workspace().await?;
390        let path = templates_dir.join(Self::template_filename(template_id));
391        let existed = self.templates.write().await.remove(template_id).is_some();
392        if path.exists() {
393            let _ = fs::remove_file(path).await;
394        }
395        Ok(existed)
396    }
397
398    pub async fn list_instances(
399        &self,
400        mission_id: Option<&str>,
401        parent_instance_id: Option<&str>,
402        status: Option<AgentInstanceStatus>,
403    ) -> Vec<AgentInstance> {
404        let mut rows = self
405            .instances
406            .read()
407            .await
408            .values()
409            .filter(|instance| {
410                if let Some(mission_id) = mission_id {
411                    if instance.mission_id != mission_id {
412                        return false;
413                    }
414                }
415                if let Some(parent_id) = parent_instance_id {
416                    if instance.parent_instance_id.as_deref() != Some(parent_id) {
417                        return false;
418                    }
419                }
420                if let Some(status) = &status {
421                    if &instance.status != status {
422                        return false;
423                    }
424                }
425                true
426            })
427            .cloned()
428            .collect::<Vec<_>>();
429        rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
430        rows
431    }
432
433    pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
434        let instances = self.instances.read().await;
435        let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
436        for instance in instances.values() {
437            let row = by_mission
438                .entry(instance.mission_id.clone())
439                .or_insert_with(|| AgentMissionSummary {
440                    mission_id: instance.mission_id.clone(),
441                    instance_count: 0,
442                    running_count: 0,
443                    completed_count: 0,
444                    failed_count: 0,
445                    cancelled_count: 0,
446                    queued_count: 0,
447                    token_used_total: 0,
448                    tool_calls_used_total: 0,
449                    steps_used_total: 0,
450                    cost_used_usd_total: 0.0,
451                });
452            row.instance_count = row.instance_count.saturating_add(1);
453            match instance.status {
454                AgentInstanceStatus::Queued => {
455                    row.queued_count = row.queued_count.saturating_add(1)
456                }
457                AgentInstanceStatus::Running => {
458                    row.running_count = row.running_count.saturating_add(1)
459                }
460                AgentInstanceStatus::Completed => {
461                    row.completed_count = row.completed_count.saturating_add(1)
462                }
463                AgentInstanceStatus::Failed => {
464                    row.failed_count = row.failed_count.saturating_add(1)
465                }
466                AgentInstanceStatus::Cancelled => {
467                    row.cancelled_count = row.cancelled_count.saturating_add(1)
468                }
469            }
470            if let Some(usage) = instance
471                .metadata
472                .as_ref()
473                .and_then(|m| m.get("budgetUsage"))
474                .and_then(|u| u.as_object())
475            {
476                row.token_used_total = row.token_used_total.saturating_add(
477                    usage
478                        .get("tokensUsed")
479                        .and_then(|v| v.as_u64())
480                        .unwrap_or(0),
481                );
482                row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
483                    usage
484                        .get("toolCallsUsed")
485                        .and_then(|v| v.as_u64())
486                        .unwrap_or(0),
487                );
488                row.steps_used_total = row
489                    .steps_used_total
490                    .saturating_add(usage.get("stepsUsed").and_then(|v| v.as_u64()).unwrap_or(0));
491                row.cost_used_usd_total += usage
492                    .get("costUsedUsd")
493                    .and_then(|v| v.as_f64())
494                    .unwrap_or(0.0);
495            }
496        }
497        let mut rows = by_mission.into_values().collect::<Vec<_>>();
498        rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
499        rows
500    }
501
502    pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
503        self.instances
504            .read()
505            .await
506            .values()
507            .find(|instance| instance.session_id == session_id)
508            .cloned()
509    }
510
511    pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
512        let mut rows = self
513            .spawn_approvals
514            .read()
515            .await
516            .values()
517            .cloned()
518            .collect::<Vec<_>>();
519        rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
520        rows
521    }
522
523    pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
524        let normalized = workspace_root.trim().to_string();
525        let already_loaded = self
526            .loaded_workspace
527            .read()
528            .await
529            .as_ref()
530            .map(|s| s == &normalized)
531            .unwrap_or(false);
532        if already_loaded {
533            return Ok(());
534        }
535
536        let root = PathBuf::from(&normalized);
537        let policy_path = root
538            .join(".tandem")
539            .join("agent-team")
540            .join("spawn-policy.yaml");
541        let templates_dir = root.join(".tandem").join("agent-team").join("templates");
542
543        let mut next_policy = None;
544        if policy_path.exists() {
545            let raw = fs::read_to_string(&policy_path)
546                .await
547                .with_context(|| format!("failed reading {}", policy_path.display()))?;
548            let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
549                .with_context(|| format!("failed parsing {}", policy_path.display()))?;
550            next_policy = Some(parsed);
551        }
552
553        let mut next_templates = HashMap::new();
554        if templates_dir.exists() {
555            let mut entries = fs::read_dir(&templates_dir).await?;
556            while let Some(entry) = entries.next_entry().await? {
557                let path = entry.path();
558                if !path.is_file() {
559                    continue;
560                }
561                let ext = path
562                    .extension()
563                    .and_then(|v| v.to_str())
564                    .unwrap_or_default()
565                    .to_ascii_lowercase();
566                if ext != "yaml" && ext != "yml" && ext != "json" {
567                    continue;
568                }
569                let raw = fs::read_to_string(&path).await?;
570                let template = serde_yaml::from_str::<AgentTemplate>(&raw)
571                    .with_context(|| format!("failed parsing {}", path.display()))?;
572                next_templates.insert(template.template_id.clone(), template);
573            }
574        }
575
576        *self.policy.write().await = next_policy;
577        *self.templates.write().await = next_templates;
578        *self.loaded_workspace.write().await = Some(normalized);
579        Ok(())
580    }
581
582    pub async fn spawn(&self, state: &AppState, req: SpawnRequest) -> SpawnResult {
583        self.spawn_with_approval_override(state, req, false).await
584    }
585
586    async fn spawn_with_approval_override(
587        &self,
588        state: &AppState,
589        mut req: SpawnRequest,
590        approval_override: bool,
591    ) -> SpawnResult {
592        let workspace_root = state.workspace_index.snapshot().await.root;
593        if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
594            return SpawnResult {
595                decision: SpawnDecision {
596                    allowed: false,
597                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
598                    reason: Some(format!("spawn policy load failed: {}", err)),
599                    requires_user_approval: false,
600                },
601                instance: None,
602            };
603        }
604
605        let Some(policy) = self.policy.read().await.clone() else {
606            return SpawnResult {
607                decision: SpawnDecision {
608                    allowed: false,
609                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
610                    reason: Some("spawn policy file missing".to_string()),
611                    requires_user_approval: false,
612                },
613                instance: None,
614            };
615        };
616
617        let template = {
618            let templates = self.templates.read().await;
619            req.template_id
620                .as_deref()
621                .and_then(|template_id| templates.get(template_id).cloned())
622        };
623        if req.template_id.is_none() {
624            if let Some(found) = self
625                .templates
626                .read()
627                .await
628                .values()
629                .find(|t| t.role == req.role)
630                .cloned()
631            {
632                req.template_id = Some(found.template_id.clone());
633            }
634        }
635        let template = if template.is_some() {
636            template
637        } else {
638            let templates = self.templates.read().await;
639            req.template_id
640                .as_deref()
641                .and_then(|id| templates.get(id).cloned())
642        };
643
644        if req.parent_role.is_none() {
645            if let Some(parent_id) = req.parent_instance_id.as_deref() {
646                let instances = self.instances.read().await;
647                req.parent_role = instances
648                    .get(parent_id)
649                    .map(|instance| instance.role.clone());
650            }
651        }
652
653        let instances = self.instances.read().await;
654        let total_agents = instances.len();
655        let running_agents = instances
656            .values()
657            .filter(|instance| instance.status == AgentInstanceStatus::Running)
658            .count();
659        drop(instances);
660
661        let mut decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
662        if approval_override
663            && !decision.allowed
664            && decision.requires_user_approval
665            && matches!(decision.code, Some(SpawnDenyCode::SpawnRequiresApproval))
666        {
667            decision = SpawnDecision {
668                allowed: true,
669                code: None,
670                reason: None,
671                requires_user_approval: false,
672            };
673        }
674        if !decision.allowed {
675            if decision.requires_user_approval && !approval_override {
676                self.queue_spawn_approval(&req, &decision).await;
677            }
678            return SpawnResult {
679                decision,
680                instance: None,
681            };
682        }
683
684        let mission_id = req
685            .mission_id
686            .clone()
687            .unwrap_or_else(|| "mission-default".to_string());
688
689        if let Some(reason) = self
690            .mission_budget_exceeded_reason(&policy, &mission_id)
691            .await
692        {
693            return SpawnResult {
694                decision: SpawnDecision {
695                    allowed: false,
696                    code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
697                    reason: Some(reason),
698                    requires_user_approval: false,
699                },
700                instance: None,
701            };
702        }
703
704        let template = template.unwrap_or_else(|| AgentTemplate {
705            template_id: "default-template".to_string(),
706            role: req.role.clone(),
707            system_prompt: None,
708            skills: Vec::new(),
709            default_budget: BudgetLimit::default(),
710            capabilities: Default::default(),
711        });
712
713        let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
714            Ok(hash) => hash,
715            Err(err) => {
716                let lowered = err.to_ascii_lowercase();
717                let code = if lowered.contains("pinned hash mismatch") {
718                    SpawnDenyCode::SpawnSkillHashMismatch
719                } else if lowered.contains("skill source denied") {
720                    SpawnDenyCode::SpawnSkillSourceDenied
721                } else {
722                    SpawnDenyCode::SpawnRequiredSkillMissing
723                };
724                return SpawnResult {
725                    decision: SpawnDecision {
726                        allowed: false,
727                        code: Some(code),
728                        reason: Some(err),
729                        requires_user_approval: false,
730                    },
731                    instance: None,
732                };
733            }
734        };
735
736        let parent_snapshot = {
737            let instances = self.instances.read().await;
738            req.parent_instance_id
739                .as_deref()
740                .and_then(|id| instances.get(id).cloned())
741        };
742        let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
743            self.budgets.read().await.get(parent_id).cloned()
744        } else {
745            None
746        };
747
748        let budget = resolve_budget(
749            &policy,
750            parent_snapshot,
751            parent_usage,
752            &template,
753            req.budget_override.clone(),
754            &req.role,
755        );
756
757        let mut session = Session::new(
758            Some(format!("Agent Team {}", template.template_id)),
759            Some(workspace_root.clone()),
760        );
761        session.workspace_root = Some(workspace_root.clone());
762        let session_id = session.id.clone();
763        if let Err(err) = state.storage.save_session(session).await {
764            return SpawnResult {
765                decision: SpawnDecision {
766                    allowed: false,
767                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
768                    reason: Some(format!("failed creating child session: {}", err)),
769                    requires_user_approval: false,
770                },
771                instance: None,
772            };
773        }
774
775        let instance = AgentInstance {
776            instance_id: format!("ins_{}", Uuid::new_v4().simple()),
777            mission_id: mission_id.clone(),
778            parent_instance_id: req.parent_instance_id.clone(),
779            role: template.role.clone(),
780            template_id: template.template_id.clone(),
781            session_id: session_id.clone(),
782            run_id: None,
783            status: AgentInstanceStatus::Running,
784            budget,
785            skill_hash: skill_hash.clone(),
786            capabilities: template.capabilities.clone(),
787            metadata: Some(json!({
788                "source": req.source,
789                "justification": req.justification,
790            })),
791        };
792
793        self.instances
794            .write()
795            .await
796            .insert(instance.instance_id.clone(), instance.clone());
797        self.budgets.write().await.insert(
798            instance.instance_id.clone(),
799            InstanceBudgetState {
800                started_at: Some(Instant::now()),
801                ..InstanceBudgetState::default()
802            },
803        );
804        let _ = self.append_audit("spawn.approved", &instance).await;
805
806        SpawnResult {
807            decision: SpawnDecision {
808                allowed: true,
809                code: None,
810                reason: None,
811                requires_user_approval: false,
812            },
813            instance: Some(instance),
814        }
815    }
816
817    pub async fn approve_spawn_approval(
818        &self,
819        state: &AppState,
820        approval_id: &str,
821        reason: Option<&str>,
822    ) -> Option<SpawnResult> {
823        let approval = self.spawn_approvals.write().await.remove(approval_id)?;
824        let result = self
825            .spawn_with_approval_override(state, approval.request.clone(), true)
826            .await;
827        if let Some(instance) = result.instance.as_ref() {
828            let note = reason.unwrap_or("approved by operator");
829            let _ = self
830                .append_approval_audit("spawn.approval.approved", approval_id, Some(instance), note)
831                .await;
832        } else {
833            let note = reason.unwrap_or("approval replay failed policy checks");
834            let _ = self
835                .append_approval_audit("spawn.approval.rejected_on_replay", approval_id, None, note)
836                .await;
837        }
838        Some(result)
839    }
840
841    pub async fn deny_spawn_approval(
842        &self,
843        approval_id: &str,
844        reason: Option<&str>,
845    ) -> Option<PendingSpawnApproval> {
846        let approval = self.spawn_approvals.write().await.remove(approval_id)?;
847        let note = reason.unwrap_or("denied by operator");
848        let _ = self
849            .append_approval_audit("spawn.approval.denied", approval_id, None, note)
850            .await;
851        Some(approval)
852    }
853
854    pub async fn cancel_instance(
855        &self,
856        state: &AppState,
857        instance_id: &str,
858        reason: &str,
859    ) -> Option<AgentInstance> {
860        let mut instances = self.instances.write().await;
861        let instance = instances.get_mut(instance_id)?;
862        if matches!(
863            instance.status,
864            AgentInstanceStatus::Completed
865                | AgentInstanceStatus::Failed
866                | AgentInstanceStatus::Cancelled
867        ) {
868            return Some(instance.clone());
869        }
870        instance.status = AgentInstanceStatus::Cancelled;
871        let snapshot = instance.clone();
872        drop(instances);
873        let _ = state.cancellations.cancel(&snapshot.session_id).await;
874        let _ = self.append_audit("instance.cancelled", &snapshot).await;
875        emit_instance_cancelled(state, &snapshot, reason);
876        Some(snapshot)
877    }
878
879    async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
880        let approval = PendingSpawnApproval {
881            approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
882            created_at_ms: crate::now_ms(),
883            request: req.clone(),
884            decision_code: decision.code.clone(),
885            reason: decision.reason.clone(),
886        };
887        self.spawn_approvals
888            .write()
889            .await
890            .insert(approval.approval_id.clone(), approval);
891    }
892
893    async fn mission_budget_exceeded_reason(
894        &self,
895        policy: &SpawnPolicy,
896        mission_id: &str,
897    ) -> Option<String> {
898        let limit = policy.mission_total_budget.as_ref()?;
899        let usage = self
900            .mission_budgets
901            .read()
902            .await
903            .get(mission_id)
904            .cloned()
905            .unwrap_or_default();
906        if let Some(max) = limit.max_tokens {
907            if usage.tokens_used >= max {
908                return Some(format!(
909                    "mission max_tokens exhausted ({}/{})",
910                    usage.tokens_used, max
911                ));
912            }
913        }
914        if let Some(max) = limit.max_steps {
915            if usage.steps_used >= u64::from(max) {
916                return Some(format!(
917                    "mission max_steps exhausted ({}/{})",
918                    usage.steps_used, max
919                ));
920            }
921        }
922        if let Some(max) = limit.max_tool_calls {
923            if usage.tool_calls_used >= u64::from(max) {
924                return Some(format!(
925                    "mission max_tool_calls exhausted ({}/{})",
926                    usage.tool_calls_used, max
927                ));
928            }
929        }
930        if let Some(max) = limit.max_cost_usd {
931            if usage.cost_used_usd >= max {
932                return Some(format!(
933                    "mission max_cost_usd exhausted ({:.6}/{:.6})",
934                    usage.cost_used_usd, max
935                ));
936            }
937        }
938        None
939    }
940
941    pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
942        let instance_ids = self
943            .instances
944            .read()
945            .await
946            .values()
947            .filter(|instance| instance.mission_id == mission_id)
948            .map(|instance| instance.instance_id.clone())
949            .collect::<Vec<_>>();
950        let mut count = 0usize;
951        for instance_id in instance_ids {
952            if self
953                .cancel_instance(state, &instance_id, reason)
954                .await
955                .is_some()
956            {
957                count = count.saturating_add(1);
958            }
959        }
960        count
961    }
962
963    async fn mark_instance_terminal(
964        &self,
965        state: &AppState,
966        instance_id: &str,
967        status: AgentInstanceStatus,
968    ) -> Option<AgentInstance> {
969        let mut instances = self.instances.write().await;
970        let instance = instances.get_mut(instance_id)?;
971        if matches!(
972            instance.status,
973            AgentInstanceStatus::Completed
974                | AgentInstanceStatus::Failed
975                | AgentInstanceStatus::Cancelled
976        ) {
977            return Some(instance.clone());
978        }
979        instance.status = status.clone();
980        let snapshot = instance.clone();
981        drop(instances);
982        match status {
983            AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
984            AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
985            _ => {}
986        }
987        Some(snapshot)
988    }
989
990    pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
991        let Some(session_id) = extract_session_id(event) else {
992            return;
993        };
994        let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
995            return;
996        };
997        if event.event_type == "provider.usage" {
998            let total_tokens = event
999                .properties
1000                .get("totalTokens")
1001                .and_then(|v| v.as_u64())
1002                .unwrap_or(0);
1003            let cost_used_usd = event
1004                .properties
1005                .get("costUsd")
1006                .and_then(|v| v.as_f64())
1007                .unwrap_or(0.0);
1008            if total_tokens > 0 {
1009                let exhausted = self
1010                    .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
1011                    .await;
1012                if exhausted {
1013                    let _ = self
1014                        .cancel_instance(state, &instance_id, "budget exhausted")
1015                        .await;
1016                }
1017            }
1018            return;
1019        }
1020        let mut delta_tokens = 0u64;
1021        let mut delta_steps = 0u32;
1022        let mut delta_tool_calls = 0u32;
1023        if event.event_type == "message.part.updated" {
1024            if let Some(part) = event.properties.get("part") {
1025                let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
1026                if part_type == "tool-invocation" {
1027                    delta_tool_calls = 1;
1028                } else if part_type == "text" {
1029                    let delta = event
1030                        .properties
1031                        .get("delta")
1032                        .and_then(|v| v.as_str())
1033                        .unwrap_or("");
1034                    if !delta.is_empty() {
1035                        delta_tokens = estimate_tokens(delta);
1036                    }
1037                }
1038            }
1039        } else if event.event_type == "session.run.finished" {
1040            delta_steps = 1;
1041            let run_status = event
1042                .properties
1043                .get("status")
1044                .and_then(|v| v.as_str())
1045                .unwrap_or("")
1046                .to_ascii_lowercase();
1047            if run_status == "completed" {
1048                let _ = self
1049                    .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
1050                    .await;
1051            } else if run_status == "failed" || run_status == "error" {
1052                let _ = self
1053                    .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
1054                    .await;
1055            }
1056        }
1057        if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
1058            return;
1059        }
1060        let exhausted = self
1061            .apply_budget_delta(
1062                state,
1063                &instance_id,
1064                delta_tokens,
1065                delta_steps,
1066                delta_tool_calls,
1067            )
1068            .await;
1069        if exhausted {
1070            let _ = self
1071                .cancel_instance(state, &instance_id, "budget exhausted")
1072                .await;
1073        }
1074    }
1075
1076    async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
1077        self.instances
1078            .read()
1079            .await
1080            .values()
1081            .find(|instance| instance.session_id == session_id)
1082            .map(|instance| instance.instance_id.clone())
1083    }
1084
1085    async fn apply_budget_delta(
1086        &self,
1087        state: &AppState,
1088        instance_id: &str,
1089        delta_tokens: u64,
1090        delta_steps: u32,
1091        delta_tool_calls: u32,
1092    ) -> bool {
1093        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1094            enabled: false,
1095            require_justification: false,
1096            max_agents: None,
1097            max_concurrent: None,
1098            child_budget_percent_of_parent_remaining: None,
1099            mission_total_budget: None,
1100            cost_per_1k_tokens_usd: None,
1101            spawn_edges: HashMap::new(),
1102            required_skills: HashMap::new(),
1103            role_defaults: HashMap::new(),
1104            skill_sources: Default::default(),
1105        });
1106        let mut budgets = self.budgets.write().await;
1107        let Some(usage) = budgets.get_mut(instance_id) else {
1108            return false;
1109        };
1110        if usage.exhausted {
1111            return true;
1112        }
1113        let prev_cost_used_usd = usage.cost_used_usd;
1114        usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
1115        usage.steps_used = usage.steps_used.saturating_add(delta_steps);
1116        usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
1117        if let Some(rate) = policy.cost_per_1k_tokens_usd {
1118            usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
1119        }
1120        let elapsed_ms = usage
1121            .started_at
1122            .map(|started| started.elapsed().as_millis() as u64)
1123            .unwrap_or(0);
1124
1125        let mut exhausted_reason: Option<&'static str> = None;
1126        let mut snapshot: Option<AgentInstance> = None;
1127        {
1128            let mut instances = self.instances.write().await;
1129            if let Some(instance) = instances.get_mut(instance_id) {
1130                instance.metadata = Some(merge_metadata_usage(
1131                    instance.metadata.take(),
1132                    usage.tokens_used,
1133                    usage.steps_used,
1134                    usage.tool_calls_used,
1135                    usage.cost_used_usd,
1136                    elapsed_ms,
1137                ));
1138                if let Some(limit) = instance.budget.max_tokens {
1139                    if usage.tokens_used >= limit {
1140                        exhausted_reason = Some("max_tokens");
1141                    }
1142                }
1143                if exhausted_reason.is_none() {
1144                    if let Some(limit) = instance.budget.max_steps {
1145                        if usage.steps_used >= limit {
1146                            exhausted_reason = Some("max_steps");
1147                        }
1148                    }
1149                }
1150                if exhausted_reason.is_none() {
1151                    if let Some(limit) = instance.budget.max_tool_calls {
1152                        if usage.tool_calls_used >= limit {
1153                            exhausted_reason = Some("max_tool_calls");
1154                        }
1155                    }
1156                }
1157                if exhausted_reason.is_none() {
1158                    if let Some(limit) = instance.budget.max_duration_ms {
1159                        if elapsed_ms >= limit {
1160                            exhausted_reason = Some("max_duration_ms");
1161                        }
1162                    }
1163                }
1164                if exhausted_reason.is_none() {
1165                    if let Some(limit) = instance.budget.max_cost_usd {
1166                        if usage.cost_used_usd >= limit {
1167                            exhausted_reason = Some("max_cost_usd");
1168                        }
1169                    }
1170                }
1171                snapshot = Some(instance.clone());
1172            }
1173        }
1174        let Some(instance) = snapshot else {
1175            return false;
1176        };
1177        emit_budget_usage(
1178            state,
1179            &instance,
1180            usage.tokens_used,
1181            usage.steps_used,
1182            usage.tool_calls_used,
1183            usage.cost_used_usd,
1184            elapsed_ms,
1185        );
1186        let mission_exhausted = self
1187            .apply_mission_budget_delta(
1188                state,
1189                &instance.mission_id,
1190                delta_tokens,
1191                u64::from(delta_steps),
1192                u64::from(delta_tool_calls),
1193                usage.cost_used_usd - prev_cost_used_usd,
1194                &policy,
1195            )
1196            .await;
1197        if mission_exhausted {
1198            usage.exhausted = true;
1199            let _ = self
1200                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1201                .await;
1202            return true;
1203        }
1204        if let Some(reason) = exhausted_reason {
1205            usage.exhausted = true;
1206            emit_budget_exhausted(
1207                state,
1208                &instance,
1209                reason,
1210                usage.tokens_used,
1211                usage.steps_used,
1212                usage.tool_calls_used,
1213                usage.cost_used_usd,
1214                elapsed_ms,
1215            );
1216            return true;
1217        }
1218        false
1219    }
1220
1221    async fn apply_exact_token_usage(
1222        &self,
1223        state: &AppState,
1224        instance_id: &str,
1225        total_tokens: u64,
1226        cost_used_usd: f64,
1227    ) -> bool {
1228        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1229            enabled: false,
1230            require_justification: false,
1231            max_agents: None,
1232            max_concurrent: None,
1233            child_budget_percent_of_parent_remaining: None,
1234            mission_total_budget: None,
1235            cost_per_1k_tokens_usd: None,
1236            spawn_edges: HashMap::new(),
1237            required_skills: HashMap::new(),
1238            role_defaults: HashMap::new(),
1239            skill_sources: Default::default(),
1240        });
1241        let mut budgets = self.budgets.write().await;
1242        let Some(usage) = budgets.get_mut(instance_id) else {
1243            return false;
1244        };
1245        if usage.exhausted {
1246            return true;
1247        }
1248        let prev_tokens = usage.tokens_used;
1249        let prev_cost_used_usd = usage.cost_used_usd;
1250        usage.tokens_used = usage.tokens_used.max(total_tokens);
1251        if cost_used_usd > 0.0 {
1252            usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
1253        } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
1254            let delta = usage.tokens_used.saturating_sub(prev_tokens);
1255            usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
1256        }
1257        let elapsed_ms = usage
1258            .started_at
1259            .map(|started| started.elapsed().as_millis() as u64)
1260            .unwrap_or(0);
1261        let mut exhausted_reason: Option<&'static str> = None;
1262        let mut snapshot: Option<AgentInstance> = None;
1263        {
1264            let mut instances = self.instances.write().await;
1265            if let Some(instance) = instances.get_mut(instance_id) {
1266                instance.metadata = Some(merge_metadata_usage(
1267                    instance.metadata.take(),
1268                    usage.tokens_used,
1269                    usage.steps_used,
1270                    usage.tool_calls_used,
1271                    usage.cost_used_usd,
1272                    elapsed_ms,
1273                ));
1274                if let Some(limit) = instance.budget.max_tokens {
1275                    if usage.tokens_used >= limit {
1276                        exhausted_reason = Some("max_tokens");
1277                    }
1278                }
1279                if exhausted_reason.is_none() {
1280                    if let Some(limit) = instance.budget.max_cost_usd {
1281                        if usage.cost_used_usd >= limit {
1282                            exhausted_reason = Some("max_cost_usd");
1283                        }
1284                    }
1285                }
1286                snapshot = Some(instance.clone());
1287            }
1288        }
1289        let Some(instance) = snapshot else {
1290            return false;
1291        };
1292        emit_budget_usage(
1293            state,
1294            &instance,
1295            usage.tokens_used,
1296            usage.steps_used,
1297            usage.tool_calls_used,
1298            usage.cost_used_usd,
1299            elapsed_ms,
1300        );
1301        let mission_exhausted = self
1302            .apply_mission_budget_delta(
1303                state,
1304                &instance.mission_id,
1305                usage.tokens_used.saturating_sub(prev_tokens),
1306                0,
1307                0,
1308                usage.cost_used_usd - prev_cost_used_usd,
1309                &policy,
1310            )
1311            .await;
1312        if mission_exhausted {
1313            usage.exhausted = true;
1314            let _ = self
1315                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1316                .await;
1317            return true;
1318        }
1319        if let Some(reason) = exhausted_reason {
1320            usage.exhausted = true;
1321            emit_budget_exhausted(
1322                state,
1323                &instance,
1324                reason,
1325                usage.tokens_used,
1326                usage.steps_used,
1327                usage.tool_calls_used,
1328                usage.cost_used_usd,
1329                elapsed_ms,
1330            );
1331            return true;
1332        }
1333        false
1334    }
1335
1336    async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1337        let path = self.audit_path.read().await.clone();
1338        if let Some(parent) = path.parent() {
1339            fs::create_dir_all(parent).await?;
1340        }
1341        let row = json!({
1342            "action": action,
1343            "missionID": instance.mission_id,
1344            "instanceID": instance.instance_id,
1345            "parentInstanceID": instance.parent_instance_id,
1346            "role": instance.role,
1347            "templateID": instance.template_id,
1348            "sessionID": instance.session_id,
1349            "skillHash": instance.skill_hash,
1350            "timestampMs": crate::now_ms(),
1351        });
1352        let mut existing = if path.exists() {
1353            fs::read_to_string(&path).await.unwrap_or_default()
1354        } else {
1355            String::new()
1356        };
1357        existing.push_str(&serde_json::to_string(&row)?);
1358        existing.push('\n');
1359        fs::write(path, existing).await?;
1360        Ok(())
1361    }
1362
1363    async fn append_approval_audit(
1364        &self,
1365        action: &str,
1366        approval_id: &str,
1367        instance: Option<&AgentInstance>,
1368        reason: &str,
1369    ) -> anyhow::Result<()> {
1370        let path = self.audit_path.read().await.clone();
1371        if let Some(parent) = path.parent() {
1372            fs::create_dir_all(parent).await?;
1373        }
1374        let row = json!({
1375            "action": action,
1376            "approvalID": approval_id,
1377            "reason": reason,
1378            "missionID": instance.map(|v| v.mission_id.clone()),
1379            "instanceID": instance.map(|v| v.instance_id.clone()),
1380            "parentInstanceID": instance.and_then(|v| v.parent_instance_id.clone()),
1381            "role": instance.map(|v| v.role.clone()),
1382            "templateID": instance.map(|v| v.template_id.clone()),
1383            "sessionID": instance.map(|v| v.session_id.clone()),
1384            "skillHash": instance.map(|v| v.skill_hash.clone()),
1385            "timestampMs": crate::now_ms(),
1386        });
1387        let mut existing = if path.exists() {
1388            fs::read_to_string(&path).await.unwrap_or_default()
1389        } else {
1390            String::new()
1391        };
1392        existing.push_str(&serde_json::to_string(&row)?);
1393        existing.push('\n');
1394        fs::write(path, existing).await?;
1395        Ok(())
1396    }
1397
1398    #[allow(clippy::too_many_arguments)]
1399    async fn apply_mission_budget_delta(
1400        &self,
1401        state: &AppState,
1402        mission_id: &str,
1403        delta_tokens: u64,
1404        delta_steps: u64,
1405        delta_tool_calls: u64,
1406        delta_cost_used_usd: f64,
1407        policy: &SpawnPolicy,
1408    ) -> bool {
1409        let mut budgets = self.mission_budgets.write().await;
1410        let row = budgets.entry(mission_id.to_string()).or_default();
1411        row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1412        row.steps_used = row.steps_used.saturating_add(delta_steps);
1413        row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1414        row.cost_used_usd += delta_cost_used_usd.max(0.0);
1415        if row.exhausted {
1416            return true;
1417        }
1418        let Some(limit) = policy.mission_total_budget.as_ref() else {
1419            return false;
1420        };
1421        let mut exhausted_by: Option<&'static str> = None;
1422        if let Some(max) = limit.max_tokens {
1423            if row.tokens_used >= max {
1424                exhausted_by = Some("mission_max_tokens");
1425            }
1426        }
1427        if exhausted_by.is_none() {
1428            if let Some(max) = limit.max_steps {
1429                if row.steps_used >= u64::from(max) {
1430                    exhausted_by = Some("mission_max_steps");
1431                }
1432            }
1433        }
1434        if exhausted_by.is_none() {
1435            if let Some(max) = limit.max_tool_calls {
1436                if row.tool_calls_used >= u64::from(max) {
1437                    exhausted_by = Some("mission_max_tool_calls");
1438                }
1439            }
1440        }
1441        if exhausted_by.is_none() {
1442            if let Some(max) = limit.max_cost_usd {
1443                if row.cost_used_usd >= max {
1444                    exhausted_by = Some("mission_max_cost_usd");
1445                }
1446            }
1447        }
1448        if let Some(exhausted_by) = exhausted_by {
1449            row.exhausted = true;
1450            emit_mission_budget_exhausted(
1451                state,
1452                mission_id,
1453                exhausted_by,
1454                row.tokens_used,
1455                row.steps_used,
1456                row.tool_calls_used,
1457                row.cost_used_usd,
1458            );
1459            return true;
1460        }
1461        false
1462    }
1463
1464    pub async fn set_for_test(
1465        &self,
1466        workspace_root: Option<String>,
1467        policy: Option<SpawnPolicy>,
1468        templates: Vec<AgentTemplate>,
1469    ) {
1470        *self.policy.write().await = policy;
1471        let mut by_id = HashMap::new();
1472        for template in templates {
1473            by_id.insert(template.template_id.clone(), template);
1474        }
1475        *self.templates.write().await = by_id;
1476        self.instances.write().await.clear();
1477        self.budgets.write().await.clear();
1478        self.mission_budgets.write().await.clear();
1479        self.spawn_approvals.write().await.clear();
1480        *self.loaded_workspace.write().await = workspace_root;
1481    }
1482}
1483
1484fn resolve_budget(
1485    policy: &SpawnPolicy,
1486    parent_instance: Option<AgentInstance>,
1487    parent_usage: Option<InstanceBudgetState>,
1488    template: &AgentTemplate,
1489    override_budget: Option<BudgetLimit>,
1490    role: &AgentRole,
1491) -> BudgetLimit {
1492    let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1493    let mut chosen = merge_budget(
1494        merge_budget(role_default, template.default_budget.clone()),
1495        override_budget.unwrap_or_default(),
1496    );
1497
1498    if let Some(parent) = parent_instance {
1499        let usage = parent_usage.unwrap_or_default();
1500        if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1501            if pct > 0 {
1502                chosen.max_tokens = cap_budget_remaining_u64(
1503                    chosen.max_tokens,
1504                    parent.budget.max_tokens,
1505                    usage.tokens_used,
1506                    pct,
1507                );
1508                chosen.max_steps = cap_budget_remaining_u32(
1509                    chosen.max_steps,
1510                    parent.budget.max_steps,
1511                    usage.steps_used,
1512                    pct,
1513                );
1514                chosen.max_tool_calls = cap_budget_remaining_u32(
1515                    chosen.max_tool_calls,
1516                    parent.budget.max_tool_calls,
1517                    usage.tool_calls_used,
1518                    pct,
1519                );
1520                chosen.max_duration_ms = cap_budget_remaining_u64(
1521                    chosen.max_duration_ms,
1522                    parent.budget.max_duration_ms,
1523                    usage
1524                        .started_at
1525                        .map(|started| started.elapsed().as_millis() as u64)
1526                        .unwrap_or(0),
1527                    pct,
1528                );
1529                chosen.max_cost_usd = cap_budget_remaining_f64(
1530                    chosen.max_cost_usd,
1531                    parent.budget.max_cost_usd,
1532                    usage.cost_used_usd,
1533                    pct,
1534                );
1535            }
1536        }
1537    }
1538    chosen
1539}
1540
1541fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1542    BudgetLimit {
1543        max_tokens: overlay.max_tokens.or(base.max_tokens),
1544        max_steps: overlay.max_steps.or(base.max_steps),
1545        max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1546        max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1547        max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1548    }
1549}
1550
1551fn cap_budget_remaining_u64(
1552    child: Option<u64>,
1553    parent_limit: Option<u64>,
1554    parent_used: u64,
1555    pct: u8,
1556) -> Option<u64> {
1557    match (child, parent_limit) {
1558        (Some(child), Some(parent_limit)) => {
1559            let remaining = parent_limit.saturating_sub(parent_used);
1560            Some(child.min(remaining.saturating_mul(pct as u64) / 100))
1561        }
1562        (None, Some(parent_limit)) => {
1563            let remaining = parent_limit.saturating_sub(parent_used);
1564            Some(remaining.saturating_mul(pct as u64) / 100)
1565        }
1566        (Some(child), None) => Some(child),
1567        (None, None) => None,
1568    }
1569}
1570
1571fn cap_budget_remaining_u32(
1572    child: Option<u32>,
1573    parent_limit: Option<u32>,
1574    parent_used: u32,
1575    pct: u8,
1576) -> Option<u32> {
1577    match (child, parent_limit) {
1578        (Some(child), Some(parent_limit)) => {
1579            let remaining = parent_limit.saturating_sub(parent_used);
1580            Some(child.min(remaining.saturating_mul(pct as u32) / 100))
1581        }
1582        (None, Some(parent_limit)) => {
1583            let remaining = parent_limit.saturating_sub(parent_used);
1584            Some(remaining.saturating_mul(pct as u32) / 100)
1585        }
1586        (Some(child), None) => Some(child),
1587        (None, None) => None,
1588    }
1589}
1590
1591fn cap_budget_remaining_f64(
1592    child: Option<f64>,
1593    parent_limit: Option<f64>,
1594    parent_used: f64,
1595    pct: u8,
1596) -> Option<f64> {
1597    match (child, parent_limit) {
1598        (Some(child), Some(parent_limit)) => {
1599            let remaining = (parent_limit - parent_used).max(0.0);
1600            Some(child.min(remaining * f64::from(pct) / 100.0))
1601        }
1602        (None, Some(parent_limit)) => {
1603            let remaining = (parent_limit - parent_used).max(0.0);
1604            Some(remaining * f64::from(pct) / 100.0)
1605        }
1606        (Some(child), None) => Some(child),
1607        (None, None) => None,
1608    }
1609}
1610
1611async fn compute_skill_hash(
1612    workspace_root: &str,
1613    template: &AgentTemplate,
1614    policy: &SpawnPolicy,
1615) -> Result<String, String> {
1616    use sha2::{Digest, Sha256};
1617    let mut rows = Vec::new();
1618    let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1619    for skill in &template.skills {
1620        if let Some(path) = skill.path.as_deref() {
1621            validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1622            let skill_path = Path::new(workspace_root).join(path);
1623            let raw = fs::read_to_string(&skill_path)
1624                .await
1625                .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1626            let digest = hash_hex(raw.as_bytes());
1627            validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1628            rows.push(format!("path:{}:{}", path, digest));
1629        } else if let Some(id) = skill.id.as_deref() {
1630            validate_skill_source(Some(id), None, policy)?;
1631            let loaded = skill_service
1632                .load_skill(id)
1633                .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1634            let Some(loaded) = loaded else {
1635                return Err(format!("missing required skill id `{id}`"));
1636            };
1637            let digest = hash_hex(loaded.content.as_bytes());
1638            validate_pinned_hash(Some(id), None, &digest, policy)?;
1639            rows.push(format!("id:{}:{}", id, digest));
1640        }
1641    }
1642    rows.sort();
1643    let mut hasher = Sha256::new();
1644    for row in rows {
1645        hasher.update(row.as_bytes());
1646        hasher.update(b"\n");
1647    }
1648    let digest = hasher.finalize();
1649    Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1650}
1651
1652fn validate_skill_source(
1653    id: Option<&str>,
1654    path: Option<&str>,
1655    policy: &SpawnPolicy,
1656) -> Result<(), String> {
1657    use tandem_orchestrator::SkillSourceMode;
1658    match policy.skill_sources.mode {
1659        SkillSourceMode::Any => Ok(()),
1660        SkillSourceMode::ProjectOnly => {
1661            if id.is_some() {
1662                return Err("skill source denied: project_only forbids skill IDs".to_string());
1663            }
1664            let Some(path) = path else {
1665                return Err("skill source denied: project_only requires skill path".to_string());
1666            };
1667            let p = PathBuf::from(path);
1668            if p.is_absolute() {
1669                return Err("skill source denied: absolute skill paths are forbidden".to_string());
1670            }
1671            Ok(())
1672        }
1673        SkillSourceMode::Allowlist => {
1674            if let Some(id) = id {
1675                if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1676                    return Ok(());
1677                }
1678            }
1679            if let Some(path) = path {
1680                if policy
1681                    .skill_sources
1682                    .allowlist_paths
1683                    .iter()
1684                    .any(|v| v == path)
1685                {
1686                    return Ok(());
1687                }
1688            }
1689            Err("skill source denied: not present in allowlist".to_string())
1690        }
1691    }
1692}
1693
1694fn validate_pinned_hash(
1695    id: Option<&str>,
1696    path: Option<&str>,
1697    digest: &str,
1698    policy: &SpawnPolicy,
1699) -> Result<(), String> {
1700    let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1701    let by_path = path.and_then(|path| {
1702        policy
1703            .skill_sources
1704            .pinned_hashes
1705            .get(&format!("path:{path}"))
1706    });
1707    let expected = by_id.or(by_path);
1708    if let Some(expected) = expected {
1709        let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1710        if normalized != digest {
1711            return Err("pinned hash mismatch for skill reference".to_string());
1712        }
1713    }
1714    Ok(())
1715}
1716
1717fn hash_hex(bytes: &[u8]) -> String {
1718    let mut out = String::with_capacity(bytes.len() * 2);
1719    for byte in bytes {
1720        use std::fmt::Write as _;
1721        let _ = write!(&mut out, "{:02x}", byte);
1722    }
1723    out
1724}
1725
1726fn estimate_tokens(text: &str) -> u64 {
1727    let chars = text.chars().count() as u64;
1728    (chars / 4).max(1)
1729}
1730
1731fn extract_session_id(event: &EngineEvent) -> Option<String> {
1732    event
1733        .properties
1734        .get("sessionID")
1735        .and_then(|v| v.as_str())
1736        .map(|v| v.to_string())
1737        .or_else(|| {
1738            event
1739                .properties
1740                .get("part")
1741                .and_then(|v| v.get("sessionID"))
1742                .and_then(|v| v.as_str())
1743                .map(|v| v.to_string())
1744        })
1745}
1746
1747fn merge_metadata_usage(
1748    metadata: Option<Value>,
1749    tokens_used: u64,
1750    steps_used: u32,
1751    tool_calls_used: u32,
1752    cost_used_usd: f64,
1753    elapsed_ms: u64,
1754) -> Value {
1755    let mut base = metadata
1756        .and_then(|v| v.as_object().cloned())
1757        .unwrap_or_default();
1758    base.insert(
1759        "budgetUsage".to_string(),
1760        json!({
1761            "tokensUsed": tokens_used,
1762            "stepsUsed": steps_used,
1763            "toolCallsUsed": tool_calls_used,
1764            "costUsedUsd": cost_used_usd,
1765            "elapsedMs": elapsed_ms
1766        }),
1767    );
1768    Value::Object(base)
1769}
1770
1771fn normalize_tool_name(name: &str) -> String {
1772    match name.trim().to_lowercase().replace('-', "_").as_str() {
1773        "todowrite" | "update_todo_list" | "update_todos" => "todo_write".to_string(),
1774        other => other.to_string(),
1775    }
1776}
1777
1778async fn evaluate_capability_deny(
1779    state: &AppState,
1780    instance: &AgentInstance,
1781    tool: &str,
1782    args: &Value,
1783    caps: &tandem_orchestrator::CapabilitySpec,
1784    session_id: &str,
1785    message_id: &str,
1786) -> Option<String> {
1787    let deny_patterns = caps
1788        .tool_denylist
1789        .iter()
1790        .map(|name| normalize_tool_name(name))
1791        .collect::<Vec<_>>();
1792    if !deny_patterns.is_empty() && any_policy_matches(&deny_patterns, tool) {
1793        return Some(format!("tool `{tool}` denied by agent capability policy"));
1794    }
1795    let allow_patterns = caps
1796        .tool_allowlist
1797        .iter()
1798        .map(|name| normalize_tool_name(name))
1799        .collect::<Vec<_>>();
1800    if !allow_patterns.is_empty() && !any_policy_matches(&allow_patterns, tool) {
1801        return Some(format!("tool `{tool}` not in agent allowlist"));
1802    }
1803
1804    let browser_execution_tool = matches!(
1805        tool,
1806        "browser_open"
1807            | "browser_navigate"
1808            | "browser_snapshot"
1809            | "browser_click"
1810            | "browser_type"
1811            | "browser_press"
1812            | "browser_wait"
1813            | "browser_extract"
1814            | "browser_screenshot"
1815            | "browser_close"
1816    );
1817
1818    if matches!(
1819        tool,
1820        "websearch" | "webfetch" | "webfetch_html" | "browser_open" | "browser_navigate"
1821    ) || browser_execution_tool
1822    {
1823        if !caps.net_scopes.enabled {
1824            return Some("network disabled for this agent instance".to_string());
1825        }
1826        if !caps.net_scopes.allow_hosts.is_empty() {
1827            if tool == "websearch" {
1828                return Some(
1829                    "websearch blocked: host allowlist cannot be verified for search tool"
1830                        .to_string(),
1831                );
1832            }
1833            if let Some(host) = extract_url_host(args) {
1834                let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
1835                    let allowed = h.trim().to_ascii_lowercase();
1836                    !allowed.is_empty()
1837                        && (host == allowed || host.ends_with(&format!(".{allowed}")))
1838                });
1839                if !allowed {
1840                    return Some(format!("network host `{host}` not in allow_hosts"));
1841                }
1842            }
1843        }
1844    }
1845
1846    if tool == "bash" {
1847        let cmd = args
1848            .get("command")
1849            .and_then(|v| v.as_str())
1850            .unwrap_or("")
1851            .to_ascii_lowercase();
1852        if cmd.contains("git push") {
1853            if !caps.git_caps.push {
1854                return Some("git push disabled for this agent instance".to_string());
1855            }
1856            if caps.git_caps.push_requires_approval {
1857                let action = state.permissions.evaluate("git_push", "git_push").await;
1858                match action {
1859                    tandem_core::PermissionAction::Allow => {}
1860                    tandem_core::PermissionAction::Deny => {
1861                        return Some("git push denied by policy rule".to_string());
1862                    }
1863                    tandem_core::PermissionAction::Ask => {
1864                        let pending = state
1865                            .permissions
1866                            .ask_for_session_with_context(
1867                                Some(session_id),
1868                                "git_push",
1869                                args.clone(),
1870                                Some(tandem_core::PermissionArgsContext {
1871                                    args_source: "agent_team.git_push".to_string(),
1872                                    args_integrity: "runtime-checked".to_string(),
1873                                    query: Some(format!(
1874                                        "instanceID={} messageID={}",
1875                                        instance.instance_id, message_id
1876                                    )),
1877                                }),
1878                            )
1879                            .await;
1880                        return Some(format!(
1881                            "git push requires explicit user approval (approvalID={})",
1882                            pending.id
1883                        ));
1884                    }
1885                }
1886            }
1887        }
1888        if cmd.contains("git commit") && !caps.git_caps.commit {
1889            return Some("git commit disabled for this agent instance".to_string());
1890        }
1891    }
1892
1893    let access_kind = tool_fs_access_kind(tool);
1894    if let Some(kind) = access_kind {
1895        let Some(session) = state.storage.get_session(session_id).await else {
1896            return Some("session not found for capability evaluation".to_string());
1897        };
1898        let Some(root) = session.workspace_root.clone() else {
1899            return Some("workspace root missing for capability evaluation".to_string());
1900        };
1901        let requested = extract_tool_candidate_paths(tool, args);
1902        if !requested.is_empty() {
1903            let allowed_scopes = if kind == "read" {
1904                &caps.fs_scopes.read
1905            } else {
1906                &caps.fs_scopes.write
1907            };
1908            if allowed_scopes.is_empty() {
1909                return Some(format!("fs {kind} access blocked: no scopes configured"));
1910            }
1911            for candidate in requested {
1912                if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
1913                    return Some(format!("fs {kind} access denied for path `{}`", candidate));
1914                }
1915            }
1916        }
1917    }
1918
1919    denied_secrets_reason(tool, caps, args)
1920}
1921
1922fn denied_secrets_reason(
1923    tool: &str,
1924    caps: &tandem_orchestrator::CapabilitySpec,
1925    args: &Value,
1926) -> Option<String> {
1927    if tool == "auth" {
1928        if caps.secrets_scopes.is_empty() {
1929            return Some("secrets are disabled for this agent instance".to_string());
1930        }
1931        let alias = args
1932            .get("id")
1933            .or_else(|| args.get("provider"))
1934            .or_else(|| args.get("providerID"))
1935            .and_then(|v| v.as_str())
1936            .unwrap_or("")
1937            .trim();
1938        if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
1939            return Some(format!(
1940                "secret alias `{alias}` is not in agent secretsScopes allowlist"
1941            ));
1942        }
1943    }
1944    None
1945}
1946
1947fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
1948    match tool {
1949        "read" | "glob" | "grep" | "codesearch" | "lsp" => Some("read"),
1950        "write" | "edit" | "apply_patch" => Some("write"),
1951        _ => None,
1952    }
1953}
1954
1955fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
1956    let Some(obj) = args.as_object() else {
1957        return Vec::new();
1958    };
1959    let keys: &[&str] = match tool {
1960        "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
1961        "glob" => &["pattern"],
1962        "lsp" => &["filePath", "path"],
1963        "bash" => &["cwd"],
1964        "apply_patch" => &["path"],
1965        _ => &["path", "cwd"],
1966    };
1967    keys.iter()
1968        .filter_map(|key| obj.get(*key))
1969        .filter_map(|value| value.as_str())
1970        .filter(|s| !s.trim().is_empty())
1971        .map(|raw| strip_glob_tokens(raw).to_string())
1972        .collect()
1973}
1974
1975fn strip_glob_tokens(path: &str) -> &str {
1976    let mut end = path.len();
1977    for (idx, ch) in path.char_indices() {
1978        if ch == '*' || ch == '?' || ch == '[' {
1979            end = idx;
1980            break;
1981        }
1982    }
1983    &path[..end]
1984}
1985
1986fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
1987    let root_path = PathBuf::from(root);
1988    let candidate_path = resolve_path(&root_path, candidate);
1989    scopes.iter().any(|scope| {
1990        let scope_path = resolve_path(&root_path, scope);
1991        candidate_path.starts_with(scope_path)
1992    })
1993}
1994
1995fn resolve_path(root: &Path, raw: &str) -> PathBuf {
1996    let raw = raw.trim();
1997    if raw.is_empty() {
1998        return root.to_path_buf();
1999    }
2000    let path = PathBuf::from(raw);
2001    if path.is_absolute() {
2002        path
2003    } else {
2004        root.join(path)
2005    }
2006}
2007
2008fn extract_url_host(args: &Value) -> Option<String> {
2009    let url = args
2010        .get("url")
2011        .or_else(|| args.get("uri"))
2012        .or_else(|| args.get("link"))
2013        .and_then(|v| v.as_str())?;
2014    let raw = url.trim();
2015    let (_, after_scheme) = raw.split_once("://")?;
2016    let host_port = after_scheme.split('/').next().unwrap_or_default();
2017    let host = host_port.split('@').next_back().unwrap_or_default();
2018    let host = host
2019        .split(':')
2020        .next()
2021        .unwrap_or_default()
2022        .to_ascii_lowercase();
2023    if host.is_empty() {
2024        None
2025    } else {
2026        Some(host)
2027    }
2028}
2029
2030pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
2031    emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
2032}
2033
2034pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
2035    emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
2036}
2037
2038pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
2039    emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
2040}
2041
2042#[derive(Default)]
2043pub struct SpawnEventContext<'a> {
2044    pub session_id: Option<&'a str>,
2045    pub message_id: Option<&'a str>,
2046    pub run_id: Option<&'a str>,
2047}
2048
2049pub fn emit_spawn_requested_with_context(
2050    state: &AppState,
2051    req: &SpawnRequest,
2052    ctx: &SpawnEventContext<'_>,
2053) {
2054    state.event_bus.publish(EngineEvent::new(
2055        "agent_team.spawn.requested",
2056        json!({
2057            "sessionID": ctx.session_id,
2058            "messageID": ctx.message_id,
2059            "runID": ctx.run_id,
2060            "missionID": req.mission_id,
2061            "instanceID": Value::Null,
2062            "parentInstanceID": req.parent_instance_id,
2063            "source": req.source,
2064            "requestedRole": req.role,
2065            "templateID": req.template_id,
2066            "justification": req.justification,
2067            "timestampMs": crate::now_ms(),
2068        }),
2069    ));
2070}
2071
2072pub fn emit_spawn_denied_with_context(
2073    state: &AppState,
2074    req: &SpawnRequest,
2075    decision: &SpawnDecision,
2076    ctx: &SpawnEventContext<'_>,
2077) {
2078    state.event_bus.publish(EngineEvent::new(
2079        "agent_team.spawn.denied",
2080        json!({
2081            "sessionID": ctx.session_id,
2082            "messageID": ctx.message_id,
2083            "runID": ctx.run_id,
2084            "missionID": req.mission_id,
2085            "instanceID": Value::Null,
2086            "parentInstanceID": req.parent_instance_id,
2087            "source": req.source,
2088            "requestedRole": req.role,
2089            "templateID": req.template_id,
2090            "code": decision.code,
2091            "error": decision.reason,
2092            "timestampMs": crate::now_ms(),
2093        }),
2094    ));
2095}
2096
2097pub fn emit_spawn_approved_with_context(
2098    state: &AppState,
2099    req: &SpawnRequest,
2100    instance: &AgentInstance,
2101    ctx: &SpawnEventContext<'_>,
2102) {
2103    state.event_bus.publish(EngineEvent::new(
2104        "agent_team.spawn.approved",
2105        json!({
2106            "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2107            "messageID": ctx.message_id,
2108            "runID": ctx.run_id.or(instance.run_id.as_deref()),
2109            "missionID": instance.mission_id,
2110            "instanceID": instance.instance_id,
2111            "parentInstanceID": instance.parent_instance_id,
2112            "source": req.source,
2113            "requestedRole": req.role,
2114            "templateID": instance.template_id,
2115            "skillHash": instance.skill_hash,
2116            "timestampMs": crate::now_ms(),
2117        }),
2118    ));
2119    state.event_bus.publish(EngineEvent::new(
2120        "agent_team.instance.started",
2121        json!({
2122            "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2123            "messageID": ctx.message_id,
2124            "runID": ctx.run_id.or(instance.run_id.as_deref()),
2125            "missionID": instance.mission_id,
2126            "instanceID": instance.instance_id,
2127            "parentInstanceID": instance.parent_instance_id,
2128            "role": instance.role,
2129            "status": instance.status,
2130            "budgetLimit": instance.budget,
2131            "skillHash": instance.skill_hash,
2132            "timestampMs": crate::now_ms(),
2133        }),
2134    ));
2135}
2136
2137pub fn emit_budget_usage(
2138    state: &AppState,
2139    instance: &AgentInstance,
2140    tokens_used: u64,
2141    steps_used: u32,
2142    tool_calls_used: u32,
2143    cost_used_usd: f64,
2144    elapsed_ms: u64,
2145) {
2146    state.event_bus.publish(EngineEvent::new(
2147        "agent_team.budget.usage",
2148        json!({
2149            "sessionID": instance.session_id,
2150            "messageID": Value::Null,
2151            "runID": instance.run_id,
2152            "missionID": instance.mission_id,
2153            "instanceID": instance.instance_id,
2154            "tokensUsed": tokens_used,
2155            "stepsUsed": steps_used,
2156            "toolCallsUsed": tool_calls_used,
2157            "costUsedUsd": cost_used_usd,
2158            "elapsedMs": elapsed_ms,
2159            "timestampMs": crate::now_ms(),
2160        }),
2161    ));
2162}
2163
2164#[allow(clippy::too_many_arguments)]
2165pub fn emit_budget_exhausted(
2166    state: &AppState,
2167    instance: &AgentInstance,
2168    exhausted_by: &str,
2169    tokens_used: u64,
2170    steps_used: u32,
2171    tool_calls_used: u32,
2172    cost_used_usd: f64,
2173    elapsed_ms: u64,
2174) {
2175    state.event_bus.publish(EngineEvent::new(
2176        "agent_team.budget.exhausted",
2177        json!({
2178            "sessionID": instance.session_id,
2179            "messageID": Value::Null,
2180            "runID": instance.run_id,
2181            "missionID": instance.mission_id,
2182            "instanceID": instance.instance_id,
2183            "exhaustedBy": exhausted_by,
2184            "tokensUsed": tokens_used,
2185            "stepsUsed": steps_used,
2186            "toolCallsUsed": tool_calls_used,
2187            "costUsedUsd": cost_used_usd,
2188            "elapsedMs": elapsed_ms,
2189            "timestampMs": crate::now_ms(),
2190        }),
2191    ));
2192}
2193
2194pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
2195    state.event_bus.publish(EngineEvent::new(
2196        "agent_team.instance.cancelled",
2197        json!({
2198            "sessionID": instance.session_id,
2199            "messageID": Value::Null,
2200            "runID": instance.run_id,
2201            "missionID": instance.mission_id,
2202            "instanceID": instance.instance_id,
2203            "parentInstanceID": instance.parent_instance_id,
2204            "role": instance.role,
2205            "status": instance.status,
2206            "reason": reason,
2207            "timestampMs": crate::now_ms(),
2208        }),
2209    ));
2210}
2211
2212pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
2213    state.event_bus.publish(EngineEvent::new(
2214        "agent_team.instance.completed",
2215        json!({
2216            "sessionID": instance.session_id,
2217            "messageID": Value::Null,
2218            "runID": instance.run_id,
2219            "missionID": instance.mission_id,
2220            "instanceID": instance.instance_id,
2221            "parentInstanceID": instance.parent_instance_id,
2222            "role": instance.role,
2223            "status": instance.status,
2224            "timestampMs": crate::now_ms(),
2225        }),
2226    ));
2227}
2228
2229pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
2230    state.event_bus.publish(EngineEvent::new(
2231        "agent_team.instance.failed",
2232        json!({
2233            "sessionID": instance.session_id,
2234            "messageID": Value::Null,
2235            "runID": instance.run_id,
2236            "missionID": instance.mission_id,
2237            "instanceID": instance.instance_id,
2238            "parentInstanceID": instance.parent_instance_id,
2239            "role": instance.role,
2240            "status": instance.status,
2241            "timestampMs": crate::now_ms(),
2242        }),
2243    ));
2244}
2245
2246pub fn emit_mission_budget_exhausted(
2247    state: &AppState,
2248    mission_id: &str,
2249    exhausted_by: &str,
2250    tokens_used: u64,
2251    steps_used: u64,
2252    tool_calls_used: u64,
2253    cost_used_usd: f64,
2254) {
2255    state.event_bus.publish(EngineEvent::new(
2256        "agent_team.mission.budget.exhausted",
2257        json!({
2258            "sessionID": Value::Null,
2259            "messageID": Value::Null,
2260            "runID": Value::Null,
2261            "missionID": mission_id,
2262            "instanceID": Value::Null,
2263            "exhaustedBy": exhausted_by,
2264            "tokensUsed": tokens_used,
2265            "stepsUsed": steps_used,
2266            "toolCallsUsed": tool_calls_used,
2267            "costUsedUsd": cost_used_usd,
2268            "timestampMs": crate::now_ms(),
2269        }),
2270    ));
2271}