Skip to main content

tandem_server/
agent_teams.rs

1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use futures::future::BoxFuture;
8use serde::Deserialize;
9use serde::Serialize;
10use serde_json::{json, Value};
11use tandem_core::{
12    SpawnAgentHook, SpawnAgentToolContext, SpawnAgentToolResult, ToolPolicyContext,
13    ToolPolicyDecision, ToolPolicyHook,
14};
15use tandem_orchestrator::{
16    AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
17    SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
18};
19use tandem_skills::SkillService;
20use tandem_types::{EngineEvent, Session};
21use tokio::fs;
22use tokio::sync::RwLock;
23use uuid::Uuid;
24
25use crate::AppState;
26
/// Shared runtime state for agent teams.
///
/// Every field is wrapped in `Arc<RwLock<..>>`, so cloning this struct yields
/// another handle to the same underlying state.
#[derive(Clone, Default)]
pub struct AgentTeamRuntime {
    /// Spawn policy parsed from the workspace's `spawn-policy.yaml`; `None`
    /// when no policy file was found.
    policy: Arc<RwLock<Option<SpawnPolicy>>>,
    /// Agent templates keyed by `template_id`.
    templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
    /// Registered agent instances keyed by `instance_id`.
    instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
    /// Per-instance budget usage keyed by `instance_id`.
    budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
    /// Per-mission budget usage keyed by mission id.
    mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
    /// Spawn requests waiting for user approval, keyed by approval id.
    spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
    /// Workspace root whose policy/templates are currently loaded, if any.
    loaded_workspace: Arc<RwLock<Option<String>>>,
    /// Audit path supplied at construction or via `set_audit_path`.
    /// NOTE(review): presumably the file `append_audit` writes to; confirm.
    audit_path: Arc<RwLock<PathBuf>>,
}
38
/// Outcome of a spawn attempt: the decision plus the created instance, if any.
#[derive(Debug, Clone)]
pub struct SpawnResult {
    /// Allow/deny decision, including deny code and reason when rejected.
    pub decision: SpawnDecision,
    /// The newly registered instance; `None` whenever the spawn was denied.
    pub instance: Option<AgentInstance>,
}
44
/// Per-mission roll-up of instance counts and budget usage.
///
/// Field names are serialized with the explicit `rename` keys below.
#[derive(Debug, Clone, Serialize)]
pub struct AgentMissionSummary {
    /// Mission the summary describes.
    #[serde(rename = "missionID")]
    pub mission_id: String,
    /// Total number of instances belonging to the mission.
    #[serde(rename = "instanceCount")]
    pub instance_count: usize,
    /// Instances currently in the `Running` status.
    #[serde(rename = "runningCount")]
    pub running_count: usize,
    /// Instances in the `Completed` status.
    #[serde(rename = "completedCount")]
    pub completed_count: usize,
    /// Instances in the `Failed` status.
    #[serde(rename = "failedCount")]
    pub failed_count: usize,
    /// Instances in the `Cancelled` status.
    #[serde(rename = "cancelledCount")]
    pub cancelled_count: usize,
    /// Instances in the `Queued` status.
    #[serde(rename = "queuedCount")]
    pub queued_count: usize,
    /// Sum of `tokensUsed` across the instances' `budgetUsage` metadata.
    #[serde(rename = "tokenUsedTotal")]
    pub token_used_total: u64,
    /// Sum of `toolCallsUsed` across the instances' `budgetUsage` metadata.
    #[serde(rename = "toolCallsUsedTotal")]
    pub tool_calls_used_total: u64,
    /// Sum of `stepsUsed` across the instances' `budgetUsage` metadata.
    #[serde(rename = "stepsUsedTotal")]
    pub steps_used_total: u64,
    /// Sum of `costUsedUsd` across the instances' `budgetUsage` metadata.
    #[serde(rename = "costUsedUsdTotal")]
    pub cost_used_usd_total: f64,
}
70
/// Budget usage counters tracked for a single agent instance.
#[derive(Debug, Clone, Default)]
struct InstanceBudgetState {
    /// Tokens consumed so far.
    tokens_used: u64,
    /// Steps consumed so far.
    steps_used: u32,
    /// Tool calls consumed so far.
    tool_calls_used: u32,
    /// Cost consumed so far, in USD.
    cost_used_usd: f64,
    /// Set to `Instant::now()` when the instance is spawned.
    started_at: Option<Instant>,
    /// NOTE(review): presumably latched once a budget limit is hit; the code
    /// that sets it is outside this view — confirm.
    exhausted: bool,
}
80
/// Budget usage counters tracked for a whole mission.
///
/// Compared against `SpawnPolicy::mission_total_budget` before a spawn is
/// allowed.
#[derive(Debug, Clone, Default)]
struct MissionBudgetState {
    /// Tokens consumed by the mission so far.
    tokens_used: u64,
    /// Steps consumed by the mission so far.
    steps_used: u64,
    /// Tool calls consumed by the mission so far.
    tool_calls_used: u64,
    /// Cost consumed by the mission so far, in USD.
    cost_used_usd: f64,
    /// NOTE(review): presumably latched once a mission limit is hit; the code
    /// that sets it is outside this view — confirm.
    exhausted: bool,
}
89
/// A spawn request that was denied by policy but flagged as requiring user
/// approval, kept until it is resolved.
#[derive(Debug, Clone, Serialize)]
pub struct PendingSpawnApproval {
    /// Unique id of the form `spawn_<uuid>`; also the key in the approvals map.
    #[serde(rename = "approvalID")]
    approval_id: String,
    /// Value of `crate::now_ms()` when the approval was queued.
    #[serde(rename = "createdAtMs")]
    created_at_ms: u64,
    /// The spawn request awaiting approval.
    request: SpawnRequest,
    /// Deny code copied from the policy decision, if one was given.
    #[serde(rename = "decisionCode")]
    decision_code: Option<SpawnDenyCode>,
    /// Human-readable reason copied from the policy decision, if any.
    reason: Option<String>,
}
101
/// `SpawnAgentHook` implementation backed by the server's [`AppState`].
#[derive(Clone)]
pub struct ServerSpawnAgentHook {
    /// Application state used to reach the agent-team runtime.
    state: AppState,
}
106
/// Arguments accepted by the `spawn_agent` tool, deserialized from the tool
/// call's JSON payload.
#[derive(Debug, Deserialize)]
struct SpawnAgentToolInput {
    /// Optional mission to attach the new instance to.
    #[serde(rename = "missionID")]
    mission_id: Option<String>,
    /// Optional id of the instance requesting the spawn.
    #[serde(rename = "parentInstanceID")]
    parent_instance_id: Option<String>,
    /// Optional template to instantiate.
    #[serde(rename = "templateID")]
    template_id: Option<String>,
    /// Role of the agent to spawn (required).
    role: AgentRole,
    /// Origin of the request; the hook falls back to `SpawnSource::ToolCall`
    /// when omitted.
    source: Option<SpawnSource>,
    /// Free-text justification for the spawn (required).
    justification: String,
    /// Optional budget override forwarded to the spawn request.
    #[serde(rename = "budgetOverride", default)]
    budget_override: Option<BudgetLimit>,
}
121
122impl ServerSpawnAgentHook {
123    pub fn new(state: AppState) -> Self {
124        Self { state }
125    }
126}
127
128impl SpawnAgentHook for ServerSpawnAgentHook {
129    fn spawn_agent(
130        &self,
131        ctx: SpawnAgentToolContext,
132    ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
133        let state = self.state.clone();
134        Box::pin(async move {
135            let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
136            let input = match parsed {
137                Ok(input) => input,
138                Err(err) => {
139                    return Ok(SpawnAgentToolResult {
140                        output: format!("spawn_agent denied: invalid args ({err})"),
141                        metadata: json!({
142                            "ok": false,
143                            "code": "SPAWN_INVALID_ARGS",
144                            "error": err.to_string(),
145                        }),
146                    });
147                }
148            };
149            let req = SpawnRequest {
150                mission_id: input.mission_id,
151                parent_instance_id: input.parent_instance_id,
152                source: input.source.unwrap_or(SpawnSource::ToolCall),
153                parent_role: None,
154                role: input.role,
155                template_id: input.template_id,
156                justification: input.justification,
157                budget_override: input.budget_override,
158            };
159
160            let event_ctx = SpawnEventContext {
161                session_id: Some(ctx.session_id.as_str()),
162                message_id: Some(ctx.message_id.as_str()),
163                run_id: None,
164            };
165            emit_spawn_requested_with_context(&state, &req, &event_ctx);
166            let result = state.agent_teams.spawn(&state, req.clone()).await;
167            if !result.decision.allowed || result.instance.is_none() {
168                emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
169                return Ok(SpawnAgentToolResult {
170                    output: result
171                        .decision
172                        .reason
173                        .clone()
174                        .unwrap_or_else(|| "spawn_agent denied".to_string()),
175                    metadata: json!({
176                        "ok": false,
177                        "code": result.decision.code,
178                        "error": result.decision.reason,
179                        "requiresUserApproval": result.decision.requires_user_approval,
180                    }),
181                });
182            }
183            let instance = result.instance.expect("checked is_some");
184            emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
185            Ok(SpawnAgentToolResult {
186                output: format!(
187                    "spawned {} as instance {} (session {})",
188                    instance.template_id, instance.instance_id, instance.session_id
189                ),
190                metadata: json!({
191                    "ok": true,
192                    "missionID": instance.mission_id,
193                    "instanceID": instance.instance_id,
194                    "sessionID": instance.session_id,
195                    "runID": instance.run_id,
196                    "status": instance.status,
197                    "skillHash": instance.skill_hash,
198                }),
199            })
200        })
201    }
202}
203
/// `ToolPolicyHook` implementation backed by the server's [`AppState`].
#[derive(Clone)]
pub struct ServerToolPolicyHook {
    /// Application state used to reach the agent-team runtime and event bus.
    state: AppState,
}
208
209impl ServerToolPolicyHook {
210    pub fn new(state: AppState) -> Self {
211        Self { state }
212    }
213}
214
215impl ToolPolicyHook for ServerToolPolicyHook {
216    fn evaluate_tool(
217        &self,
218        ctx: ToolPolicyContext,
219    ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
220        let state = self.state.clone();
221        Box::pin(async move {
222            let Some(instance) = state
223                .agent_teams
224                .instance_for_session(&ctx.session_id)
225                .await
226            else {
227                return Ok(ToolPolicyDecision {
228                    allowed: true,
229                    reason: None,
230                });
231            };
232            let tool = normalize_tool_name(&ctx.tool);
233            let caps = instance.capabilities.clone();
234            let deny = evaluate_capability_deny(
235                &state,
236                &instance,
237                &tool,
238                &ctx.args,
239                &caps,
240                &ctx.session_id,
241                &ctx.message_id,
242            )
243            .await;
244            if let Some(reason) = deny {
245                state.event_bus.publish(EngineEvent::new(
246                    "agent_team.capability.denied",
247                    json!({
248                        "sessionID": ctx.session_id,
249                        "messageID": ctx.message_id,
250                        "runID": instance.run_id,
251                        "missionID": instance.mission_id,
252                        "instanceID": instance.instance_id,
253                        "tool": tool,
254                        "reason": reason,
255                        "timestampMs": crate::now_ms(),
256                    }),
257                ));
258                return Ok(ToolPolicyDecision {
259                    allowed: false,
260                    reason: Some(reason),
261                });
262            }
263            Ok(ToolPolicyDecision {
264                allowed: true,
265                reason: None,
266            })
267        })
268    }
269}
270
271impl AgentTeamRuntime {
272    pub fn new(audit_path: PathBuf) -> Self {
273        Self {
274            policy: Arc::new(RwLock::new(None)),
275            templates: Arc::new(RwLock::new(HashMap::new())),
276            instances: Arc::new(RwLock::new(HashMap::new())),
277            budgets: Arc::new(RwLock::new(HashMap::new())),
278            mission_budgets: Arc::new(RwLock::new(HashMap::new())),
279            spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
280            loaded_workspace: Arc::new(RwLock::new(None)),
281            audit_path: Arc::new(RwLock::new(audit_path)),
282        }
283    }
284
285    pub async fn set_audit_path(&self, path: PathBuf) {
286        *self.audit_path.write().await = path;
287    }
288
289    pub async fn list_templates(&self) -> Vec<AgentTemplate> {
290        let mut rows = self
291            .templates
292            .read()
293            .await
294            .values()
295            .cloned()
296            .collect::<Vec<_>>();
297        rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
298        rows
299    }
300
301    pub async fn list_instances(
302        &self,
303        mission_id: Option<&str>,
304        parent_instance_id: Option<&str>,
305        status: Option<AgentInstanceStatus>,
306    ) -> Vec<AgentInstance> {
307        let mut rows = self
308            .instances
309            .read()
310            .await
311            .values()
312            .filter(|instance| {
313                if let Some(mission_id) = mission_id {
314                    if instance.mission_id != mission_id {
315                        return false;
316                    }
317                }
318                if let Some(parent_id) = parent_instance_id {
319                    if instance.parent_instance_id.as_deref() != Some(parent_id) {
320                        return false;
321                    }
322                }
323                if let Some(status) = &status {
324                    if &instance.status != status {
325                        return false;
326                    }
327                }
328                true
329            })
330            .cloned()
331            .collect::<Vec<_>>();
332        rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
333        rows
334    }
335
336    pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
337        let instances = self.instances.read().await;
338        let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
339        for instance in instances.values() {
340            let row = by_mission
341                .entry(instance.mission_id.clone())
342                .or_insert_with(|| AgentMissionSummary {
343                    mission_id: instance.mission_id.clone(),
344                    instance_count: 0,
345                    running_count: 0,
346                    completed_count: 0,
347                    failed_count: 0,
348                    cancelled_count: 0,
349                    queued_count: 0,
350                    token_used_total: 0,
351                    tool_calls_used_total: 0,
352                    steps_used_total: 0,
353                    cost_used_usd_total: 0.0,
354                });
355            row.instance_count = row.instance_count.saturating_add(1);
356            match instance.status {
357                AgentInstanceStatus::Queued => row.queued_count = row.queued_count.saturating_add(1),
358                AgentInstanceStatus::Running => {
359                    row.running_count = row.running_count.saturating_add(1)
360                }
361                AgentInstanceStatus::Completed => {
362                    row.completed_count = row.completed_count.saturating_add(1)
363                }
364                AgentInstanceStatus::Failed => row.failed_count = row.failed_count.saturating_add(1),
365                AgentInstanceStatus::Cancelled => {
366                    row.cancelled_count = row.cancelled_count.saturating_add(1)
367                }
368            }
369            if let Some(usage) = instance
370                .metadata
371                .as_ref()
372                .and_then(|m| m.get("budgetUsage"))
373                .and_then(|u| u.as_object())
374            {
375                row.token_used_total = row.token_used_total.saturating_add(
376                    usage
377                        .get("tokensUsed")
378                        .and_then(|v| v.as_u64())
379                        .unwrap_or(0),
380                );
381                row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
382                    usage
383                        .get("toolCallsUsed")
384                        .and_then(|v| v.as_u64())
385                        .unwrap_or(0),
386                );
387                row.steps_used_total = row.steps_used_total.saturating_add(
388                    usage
389                        .get("stepsUsed")
390                        .and_then(|v| v.as_u64())
391                        .unwrap_or(0),
392                );
393                row.cost_used_usd_total += usage
394                    .get("costUsedUsd")
395                    .and_then(|v| v.as_f64())
396                    .unwrap_or(0.0);
397            }
398        }
399        let mut rows = by_mission.into_values().collect::<Vec<_>>();
400        rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
401        rows
402    }
403
404    pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
405        self.instances
406            .read()
407            .await
408            .values()
409            .find(|instance| instance.session_id == session_id)
410            .cloned()
411    }
412
413    pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
414        let mut rows = self
415            .spawn_approvals
416            .read()
417            .await
418            .values()
419            .cloned()
420            .collect::<Vec<_>>();
421        rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
422        rows
423    }
424
425    pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
426        let normalized = workspace_root.trim().to_string();
427        let already_loaded = self
428            .loaded_workspace
429            .read()
430            .await
431            .as_ref()
432            .map(|s| s == &normalized)
433            .unwrap_or(false);
434        if already_loaded {
435            return Ok(());
436        }
437
438        let root = PathBuf::from(&normalized);
439        let policy_path = root
440            .join(".tandem")
441            .join("agent-team")
442            .join("spawn-policy.yaml");
443        let templates_dir = root.join(".tandem").join("agent-team").join("templates");
444
445        let mut next_policy = None;
446        if policy_path.exists() {
447            let raw = fs::read_to_string(&policy_path)
448                .await
449                .with_context(|| format!("failed reading {}", policy_path.display()))?;
450            let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
451                .with_context(|| format!("failed parsing {}", policy_path.display()))?;
452            next_policy = Some(parsed);
453        }
454
455        let mut next_templates = HashMap::new();
456        if templates_dir.exists() {
457            let mut entries = fs::read_dir(&templates_dir).await?;
458            while let Some(entry) = entries.next_entry().await? {
459                let path = entry.path();
460                if !path.is_file() {
461                    continue;
462                }
463                let ext = path
464                    .extension()
465                    .and_then(|v| v.to_str())
466                    .unwrap_or_default()
467                    .to_ascii_lowercase();
468                if ext != "yaml" && ext != "yml" && ext != "json" {
469                    continue;
470                }
471                let raw = fs::read_to_string(&path).await?;
472                let template = serde_yaml::from_str::<AgentTemplate>(&raw)
473                    .with_context(|| format!("failed parsing {}", path.display()))?;
474                next_templates.insert(template.template_id.clone(), template);
475            }
476        }
477
478        *self.policy.write().await = next_policy;
479        *self.templates.write().await = next_templates;
480        *self.loaded_workspace.write().await = Some(normalized);
481        Ok(())
482    }
483
484    pub async fn spawn(&self, state: &AppState, mut req: SpawnRequest) -> SpawnResult {
485        let workspace_root = state.workspace_index.snapshot().await.root;
486        if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
487            return SpawnResult {
488                decision: SpawnDecision {
489                    allowed: false,
490                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
491                    reason: Some(format!("spawn policy load failed: {}", err)),
492                    requires_user_approval: false,
493                },
494                instance: None,
495            };
496        }
497
498        let Some(policy) = self.policy.read().await.clone() else {
499            return SpawnResult {
500                decision: SpawnDecision {
501                    allowed: false,
502                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
503                    reason: Some("spawn policy file missing".to_string()),
504                    requires_user_approval: false,
505                },
506                instance: None,
507            };
508        };
509
510        let template = {
511            let templates = self.templates.read().await;
512            req.template_id
513                .as_deref()
514                .and_then(|template_id| templates.get(template_id).cloned())
515        };
516        if req.template_id.is_none() {
517            if let Some(found) = self
518                .templates
519                .read()
520                .await
521                .values()
522                .find(|t| t.role == req.role)
523                .cloned()
524            {
525                req.template_id = Some(found.template_id.clone());
526            }
527        }
528        let template = if template.is_some() {
529            template
530        } else {
531            let templates = self.templates.read().await;
532            req.template_id
533                .as_deref()
534                .and_then(|id| templates.get(id).cloned())
535        };
536
537        if req.parent_role.is_none() {
538            if let Some(parent_id) = req.parent_instance_id.as_deref() {
539                let instances = self.instances.read().await;
540                req.parent_role = instances
541                    .get(parent_id)
542                    .map(|instance| instance.role.clone());
543            }
544        }
545
546        let instances = self.instances.read().await;
547        let total_agents = instances.len();
548        let running_agents = instances
549            .values()
550            .filter(|instance| instance.status == AgentInstanceStatus::Running)
551            .count();
552        drop(instances);
553
554        let decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
555        if !decision.allowed {
556            if decision.requires_user_approval {
557                self.queue_spawn_approval(&req, &decision).await;
558            }
559            return SpawnResult {
560                decision,
561                instance: None,
562            };
563        }
564
565        let mission_id = req
566            .mission_id
567            .clone()
568            .unwrap_or_else(|| "mission-default".to_string());
569
570        if let Some(reason) = self
571            .mission_budget_exceeded_reason(&policy, &mission_id)
572            .await
573        {
574            return SpawnResult {
575                decision: SpawnDecision {
576                    allowed: false,
577                    code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
578                    reason: Some(reason),
579                    requires_user_approval: false,
580                },
581                instance: None,
582            };
583        }
584
585        let template = template.unwrap_or_else(|| AgentTemplate {
586            template_id: "default-template".to_string(),
587            role: req.role.clone(),
588            system_prompt: None,
589            skills: Vec::new(),
590            default_budget: BudgetLimit::default(),
591            capabilities: Default::default(),
592        });
593
594        let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
595            Ok(hash) => hash,
596            Err(err) => {
597                let lowered = err.to_ascii_lowercase();
598                let code = if lowered.contains("pinned hash mismatch") {
599                    SpawnDenyCode::SpawnSkillHashMismatch
600                } else if lowered.contains("skill source denied") {
601                    SpawnDenyCode::SpawnSkillSourceDenied
602                } else {
603                    SpawnDenyCode::SpawnRequiredSkillMissing
604                };
605                return SpawnResult {
606                    decision: SpawnDecision {
607                        allowed: false,
608                        code: Some(code),
609                        reason: Some(err),
610                        requires_user_approval: false,
611                    },
612                    instance: None,
613                };
614            }
615        };
616
617        let parent_snapshot = {
618            let instances = self.instances.read().await;
619            req.parent_instance_id
620                .as_deref()
621                .and_then(|id| instances.get(id).cloned())
622        };
623        let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
624            self.budgets.read().await.get(parent_id).cloned()
625        } else {
626            None
627        };
628
629        let budget = resolve_budget(
630            &policy,
631            parent_snapshot,
632            parent_usage,
633            &template,
634            req.budget_override.clone(),
635            &req.role,
636        );
637
638        let mut session = Session::new(
639            Some(format!("Agent Team {}", template.template_id)),
640            Some(workspace_root.clone()),
641        );
642        session.workspace_root = Some(workspace_root.clone());
643        let session_id = session.id.clone();
644        if let Err(err) = state.storage.save_session(session).await {
645            return SpawnResult {
646                decision: SpawnDecision {
647                    allowed: false,
648                    code: Some(SpawnDenyCode::SpawnPolicyMissing),
649                    reason: Some(format!("failed creating child session: {}", err)),
650                    requires_user_approval: false,
651                },
652                instance: None,
653            };
654        }
655
656        let instance = AgentInstance {
657            instance_id: format!("ins_{}", Uuid::new_v4().simple()),
658            mission_id: mission_id.clone(),
659            parent_instance_id: req.parent_instance_id.clone(),
660            role: template.role.clone(),
661            template_id: template.template_id.clone(),
662            session_id: session_id.clone(),
663            run_id: None,
664            status: AgentInstanceStatus::Running,
665            budget,
666            skill_hash: skill_hash.clone(),
667            capabilities: template.capabilities.clone(),
668            metadata: Some(json!({
669                "source": req.source,
670                "justification": req.justification,
671            })),
672        };
673
674        self.instances
675            .write()
676            .await
677            .insert(instance.instance_id.clone(), instance.clone());
678        self.budgets.write().await.insert(
679            instance.instance_id.clone(),
680            InstanceBudgetState {
681                started_at: Some(Instant::now()),
682                ..InstanceBudgetState::default()
683            },
684        );
685        let _ = self.append_audit("spawn.approved", &instance).await;
686
687        SpawnResult {
688            decision: SpawnDecision {
689                allowed: true,
690                code: None,
691                reason: None,
692                requires_user_approval: false,
693            },
694            instance: Some(instance),
695        }
696    }
697
698    pub async fn cancel_instance(
699        &self,
700        state: &AppState,
701        instance_id: &str,
702        reason: &str,
703    ) -> Option<AgentInstance> {
704        let mut instances = self.instances.write().await;
705        let instance = instances.get_mut(instance_id)?;
706        if matches!(
707            instance.status,
708            AgentInstanceStatus::Completed
709                | AgentInstanceStatus::Failed
710                | AgentInstanceStatus::Cancelled
711        ) {
712            return Some(instance.clone());
713        }
714        instance.status = AgentInstanceStatus::Cancelled;
715        let snapshot = instance.clone();
716        drop(instances);
717        let _ = state.cancellations.cancel(&snapshot.session_id).await;
718        let _ = self.append_audit("instance.cancelled", &snapshot).await;
719        emit_instance_cancelled(state, &snapshot, reason);
720        Some(snapshot)
721    }
722
723    async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
724        let approval = PendingSpawnApproval {
725            approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
726            created_at_ms: crate::now_ms(),
727            request: req.clone(),
728            decision_code: decision.code.clone(),
729            reason: decision.reason.clone(),
730        };
731        self.spawn_approvals
732            .write()
733            .await
734            .insert(approval.approval_id.clone(), approval);
735    }
736
737    async fn mission_budget_exceeded_reason(
738        &self,
739        policy: &SpawnPolicy,
740        mission_id: &str,
741    ) -> Option<String> {
742        let Some(limit) = policy.mission_total_budget.as_ref() else {
743            return None;
744        };
745        let usage = self
746            .mission_budgets
747            .read()
748            .await
749            .get(mission_id)
750            .cloned()
751            .unwrap_or_default();
752        if let Some(max) = limit.max_tokens {
753            if usage.tokens_used >= max {
754                return Some(format!("mission max_tokens exhausted ({}/{})", usage.tokens_used, max));
755            }
756        }
757        if let Some(max) = limit.max_steps {
758            if usage.steps_used >= u64::from(max) {
759                return Some(format!("mission max_steps exhausted ({}/{})", usage.steps_used, max));
760            }
761        }
762        if let Some(max) = limit.max_tool_calls {
763            if usage.tool_calls_used >= u64::from(max) {
764                return Some(format!(
765                    "mission max_tool_calls exhausted ({}/{})",
766                    usage.tool_calls_used, max
767                ));
768            }
769        }
770        if let Some(max) = limit.max_cost_usd {
771            if usage.cost_used_usd >= max {
772                return Some(format!(
773                    "mission max_cost_usd exhausted ({:.6}/{:.6})",
774                    usage.cost_used_usd, max
775                ));
776            }
777        }
778        None
779    }
780
781    pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
782        let instance_ids = self
783            .instances
784            .read()
785            .await
786            .values()
787            .filter(|instance| instance.mission_id == mission_id)
788            .map(|instance| instance.instance_id.clone())
789            .collect::<Vec<_>>();
790        let mut count = 0usize;
791        for instance_id in instance_ids {
792            if self
793                .cancel_instance(state, &instance_id, reason)
794                .await
795                .is_some()
796            {
797                count = count.saturating_add(1);
798            }
799        }
800        count
801    }
802
803    async fn mark_instance_terminal(
804        &self,
805        state: &AppState,
806        instance_id: &str,
807        status: AgentInstanceStatus,
808    ) -> Option<AgentInstance> {
809        let mut instances = self.instances.write().await;
810        let instance = instances.get_mut(instance_id)?;
811        if matches!(
812            instance.status,
813            AgentInstanceStatus::Completed
814                | AgentInstanceStatus::Failed
815                | AgentInstanceStatus::Cancelled
816        ) {
817            return Some(instance.clone());
818        }
819        instance.status = status.clone();
820        let snapshot = instance.clone();
821        drop(instances);
822        match status {
823            AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
824            AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
825            _ => {}
826        }
827        Some(snapshot)
828    }
829
830    pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
831        let Some(session_id) = extract_session_id(event) else {
832            return;
833        };
834        let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
835            return;
836        };
837        if event.event_type == "provider.usage" {
838            let total_tokens = event
839                .properties
840                .get("totalTokens")
841                .and_then(|v| v.as_u64())
842                .unwrap_or(0);
843            let cost_used_usd = event
844                .properties
845                .get("costUsd")
846                .and_then(|v| v.as_f64())
847                .unwrap_or(0.0);
848            if total_tokens > 0 {
849                let exhausted = self
850                    .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
851                    .await;
852                if exhausted {
853                    let _ = self
854                        .cancel_instance(state, &instance_id, "budget exhausted")
855                        .await;
856                }
857            }
858            return;
859        }
860        let mut delta_tokens = 0u64;
861        let mut delta_steps = 0u32;
862        let mut delta_tool_calls = 0u32;
863        if event.event_type == "message.part.updated" {
864            if let Some(part) = event.properties.get("part") {
865                let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
866                if part_type == "tool-invocation" {
867                    delta_tool_calls = 1;
868                } else if part_type == "text" {
869                    let delta = event
870                        .properties
871                        .get("delta")
872                        .and_then(|v| v.as_str())
873                        .unwrap_or("");
874                    if !delta.is_empty() {
875                        delta_tokens = estimate_tokens(delta);
876                    }
877                }
878            }
879        } else if event.event_type == "session.run.finished" {
880            delta_steps = 1;
881            let run_status = event
882                .properties
883                .get("status")
884                .and_then(|v| v.as_str())
885                .unwrap_or("")
886                .to_ascii_lowercase();
887            if run_status == "completed" {
888                let _ = self
889                    .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
890                    .await;
891            } else if run_status == "failed" || run_status == "error" {
892                let _ = self
893                    .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
894                    .await;
895            }
896        }
897        if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
898            return;
899        }
900        let exhausted = self
901            .apply_budget_delta(
902                state,
903                &instance_id,
904                delta_tokens,
905                delta_steps,
906                delta_tool_calls,
907            )
908            .await;
909        if exhausted {
910            let _ = self
911                .cancel_instance(state, &instance_id, "budget exhausted")
912                .await;
913        }
914    }
915
916    async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
917        self.instances
918            .read()
919            .await
920            .values()
921            .find(|instance| instance.session_id == session_id)
922            .map(|instance| instance.instance_id.clone())
923    }
924
925    async fn apply_budget_delta(
926        &self,
927        state: &AppState,
928        instance_id: &str,
929        delta_tokens: u64,
930        delta_steps: u32,
931        delta_tool_calls: u32,
932    ) -> bool {
933        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
934            enabled: false,
935            require_justification: false,
936            max_agents: None,
937            max_concurrent: None,
938            child_budget_percent_of_parent_remaining: None,
939            mission_total_budget: None,
940            cost_per_1k_tokens_usd: None,
941            spawn_edges: HashMap::new(),
942            required_skills: HashMap::new(),
943            role_defaults: HashMap::new(),
944            skill_sources: Default::default(),
945        });
946        let mut budgets = self.budgets.write().await;
947        let Some(usage) = budgets.get_mut(instance_id) else {
948            return false;
949        };
950        if usage.exhausted {
951            return true;
952        }
953        let prev_cost_used_usd = usage.cost_used_usd;
954        usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
955        usage.steps_used = usage.steps_used.saturating_add(delta_steps);
956        usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
957        if let Some(rate) = policy.cost_per_1k_tokens_usd {
958            usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
959        }
960        let elapsed_ms = usage
961            .started_at
962            .map(|started| started.elapsed().as_millis() as u64)
963            .unwrap_or(0);
964
965        let mut exhausted_reason: Option<&'static str> = None;
966        let mut snapshot: Option<AgentInstance> = None;
967        {
968            let mut instances = self.instances.write().await;
969            if let Some(instance) = instances.get_mut(instance_id) {
970                instance.metadata = Some(merge_metadata_usage(
971                    instance.metadata.take(),
972                    usage.tokens_used,
973                    usage.steps_used,
974                    usage.tool_calls_used,
975                    usage.cost_used_usd,
976                    elapsed_ms,
977                ));
978                if let Some(limit) = instance.budget.max_tokens {
979                    if usage.tokens_used >= limit {
980                        exhausted_reason = Some("max_tokens");
981                    }
982                }
983                if exhausted_reason.is_none() {
984                    if let Some(limit) = instance.budget.max_steps {
985                        if usage.steps_used >= limit {
986                            exhausted_reason = Some("max_steps");
987                        }
988                    }
989                }
990                if exhausted_reason.is_none() {
991                    if let Some(limit) = instance.budget.max_tool_calls {
992                        if usage.tool_calls_used >= limit {
993                            exhausted_reason = Some("max_tool_calls");
994                        }
995                    }
996                }
997                if exhausted_reason.is_none() {
998                    if let Some(limit) = instance.budget.max_duration_ms {
999                        if elapsed_ms >= limit {
1000                            exhausted_reason = Some("max_duration_ms");
1001                        }
1002                    }
1003                }
1004                if exhausted_reason.is_none() {
1005                    if let Some(limit) = instance.budget.max_cost_usd {
1006                        if usage.cost_used_usd >= limit {
1007                            exhausted_reason = Some("max_cost_usd");
1008                        }
1009                    }
1010                }
1011                snapshot = Some(instance.clone());
1012            }
1013        }
1014        let Some(instance) = snapshot else {
1015            return false;
1016        };
1017        emit_budget_usage(
1018            state,
1019            &instance,
1020            usage.tokens_used,
1021            usage.steps_used,
1022            usage.tool_calls_used,
1023            usage.cost_used_usd,
1024            elapsed_ms,
1025        );
1026        let mission_exhausted = self
1027            .apply_mission_budget_delta(
1028                state,
1029                &instance.mission_id,
1030                delta_tokens,
1031                u64::from(delta_steps),
1032                u64::from(delta_tool_calls),
1033                usage.cost_used_usd - prev_cost_used_usd,
1034                &policy,
1035            )
1036            .await;
1037        if mission_exhausted {
1038            usage.exhausted = true;
1039            let _ = self
1040                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1041                .await;
1042            return true;
1043        }
1044        if let Some(reason) = exhausted_reason {
1045            usage.exhausted = true;
1046            emit_budget_exhausted(
1047                state,
1048                &instance,
1049                reason,
1050                usage.tokens_used,
1051                usage.steps_used,
1052                usage.tool_calls_used,
1053                usage.cost_used_usd,
1054                elapsed_ms,
1055            );
1056            return true;
1057        }
1058        false
1059    }
1060
1061    async fn apply_exact_token_usage(
1062        &self,
1063        state: &AppState,
1064        instance_id: &str,
1065        total_tokens: u64,
1066        cost_used_usd: f64,
1067    ) -> bool {
1068        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1069            enabled: false,
1070            require_justification: false,
1071            max_agents: None,
1072            max_concurrent: None,
1073            child_budget_percent_of_parent_remaining: None,
1074            mission_total_budget: None,
1075            cost_per_1k_tokens_usd: None,
1076            spawn_edges: HashMap::new(),
1077            required_skills: HashMap::new(),
1078            role_defaults: HashMap::new(),
1079            skill_sources: Default::default(),
1080        });
1081        let mut budgets = self.budgets.write().await;
1082        let Some(usage) = budgets.get_mut(instance_id) else {
1083            return false;
1084        };
1085        if usage.exhausted {
1086            return true;
1087        }
1088        let prev_tokens = usage.tokens_used;
1089        let prev_cost_used_usd = usage.cost_used_usd;
1090        usage.tokens_used = usage.tokens_used.max(total_tokens);
1091        if cost_used_usd > 0.0 {
1092            usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
1093        } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
1094            let delta = usage.tokens_used.saturating_sub(prev_tokens);
1095            usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
1096        }
1097        let elapsed_ms = usage
1098            .started_at
1099            .map(|started| started.elapsed().as_millis() as u64)
1100            .unwrap_or(0);
1101        let mut exhausted_reason: Option<&'static str> = None;
1102        let mut snapshot: Option<AgentInstance> = None;
1103        {
1104            let mut instances = self.instances.write().await;
1105            if let Some(instance) = instances.get_mut(instance_id) {
1106                instance.metadata = Some(merge_metadata_usage(
1107                    instance.metadata.take(),
1108                    usage.tokens_used,
1109                    usage.steps_used,
1110                    usage.tool_calls_used,
1111                    usage.cost_used_usd,
1112                    elapsed_ms,
1113                ));
1114                if let Some(limit) = instance.budget.max_tokens {
1115                    if usage.tokens_used >= limit {
1116                        exhausted_reason = Some("max_tokens");
1117                    }
1118                }
1119                if exhausted_reason.is_none() {
1120                    if let Some(limit) = instance.budget.max_cost_usd {
1121                        if usage.cost_used_usd >= limit {
1122                            exhausted_reason = Some("max_cost_usd");
1123                        }
1124                    }
1125                }
1126                snapshot = Some(instance.clone());
1127            }
1128        }
1129        let Some(instance) = snapshot else {
1130            return false;
1131        };
1132        emit_budget_usage(
1133            state,
1134            &instance,
1135            usage.tokens_used,
1136            usage.steps_used,
1137            usage.tool_calls_used,
1138            usage.cost_used_usd,
1139            elapsed_ms,
1140        );
1141        let mission_exhausted = self
1142            .apply_mission_budget_delta(
1143                state,
1144                &instance.mission_id,
1145                usage.tokens_used.saturating_sub(prev_tokens),
1146                0,
1147                0,
1148                usage.cost_used_usd - prev_cost_used_usd,
1149                &policy,
1150            )
1151            .await;
1152        if mission_exhausted {
1153            usage.exhausted = true;
1154            let _ = self
1155                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1156                .await;
1157            return true;
1158        }
1159        if let Some(reason) = exhausted_reason {
1160            usage.exhausted = true;
1161            emit_budget_exhausted(
1162                state,
1163                &instance,
1164                reason,
1165                usage.tokens_used,
1166                usage.steps_used,
1167                usage.tool_calls_used,
1168                usage.cost_used_usd,
1169                elapsed_ms,
1170            );
1171            return true;
1172        }
1173        false
1174    }
1175
1176    async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1177        let path = self.audit_path.read().await.clone();
1178        if let Some(parent) = path.parent() {
1179            fs::create_dir_all(parent).await?;
1180        }
1181        let row = json!({
1182            "action": action,
1183            "missionID": instance.mission_id,
1184            "instanceID": instance.instance_id,
1185            "parentInstanceID": instance.parent_instance_id,
1186            "role": instance.role,
1187            "templateID": instance.template_id,
1188            "sessionID": instance.session_id,
1189            "skillHash": instance.skill_hash,
1190            "timestampMs": crate::now_ms(),
1191        });
1192        let mut existing = if path.exists() {
1193            fs::read_to_string(&path).await.unwrap_or_default()
1194        } else {
1195            String::new()
1196        };
1197        existing.push_str(&serde_json::to_string(&row)?);
1198        existing.push('\n');
1199        fs::write(path, existing).await?;
1200        Ok(())
1201    }
1202
1203    async fn apply_mission_budget_delta(
1204        &self,
1205        state: &AppState,
1206        mission_id: &str,
1207        delta_tokens: u64,
1208        delta_steps: u64,
1209        delta_tool_calls: u64,
1210        delta_cost_used_usd: f64,
1211        policy: &SpawnPolicy,
1212    ) -> bool {
1213        let mut budgets = self.mission_budgets.write().await;
1214        let row = budgets.entry(mission_id.to_string()).or_default();
1215        row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1216        row.steps_used = row.steps_used.saturating_add(delta_steps);
1217        row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1218        row.cost_used_usd += delta_cost_used_usd.max(0.0);
1219        if row.exhausted {
1220            return true;
1221        }
1222        let Some(limit) = policy.mission_total_budget.as_ref() else {
1223            return false;
1224        };
1225        let mut exhausted_by: Option<&'static str> = None;
1226        if let Some(max) = limit.max_tokens {
1227            if row.tokens_used >= max {
1228                exhausted_by = Some("mission_max_tokens");
1229            }
1230        }
1231        if exhausted_by.is_none() {
1232            if let Some(max) = limit.max_steps {
1233                if row.steps_used >= u64::from(max) {
1234                    exhausted_by = Some("mission_max_steps");
1235                }
1236            }
1237        }
1238        if exhausted_by.is_none() {
1239            if let Some(max) = limit.max_tool_calls {
1240                if row.tool_calls_used >= u64::from(max) {
1241                    exhausted_by = Some("mission_max_tool_calls");
1242                }
1243            }
1244        }
1245        if exhausted_by.is_none() {
1246            if let Some(max) = limit.max_cost_usd {
1247                if row.cost_used_usd >= max {
1248                    exhausted_by = Some("mission_max_cost_usd");
1249                }
1250            }
1251        }
1252        if let Some(exhausted_by) = exhausted_by {
1253            row.exhausted = true;
1254            emit_mission_budget_exhausted(
1255                state,
1256                mission_id,
1257                exhausted_by,
1258                row.tokens_used,
1259                row.steps_used,
1260                row.tool_calls_used,
1261                row.cost_used_usd,
1262            );
1263            return true;
1264        }
1265        false
1266    }
1267
1268    pub async fn set_for_test(
1269        &self,
1270        workspace_root: Option<String>,
1271        policy: Option<SpawnPolicy>,
1272        templates: Vec<AgentTemplate>,
1273    ) {
1274        *self.policy.write().await = policy;
1275        let mut by_id = HashMap::new();
1276        for template in templates {
1277            by_id.insert(template.template_id.clone(), template);
1278        }
1279        *self.templates.write().await = by_id;
1280        self.instances.write().await.clear();
1281        self.budgets.write().await.clear();
1282        self.mission_budgets.write().await.clear();
1283        self.spawn_approvals.write().await.clear();
1284        *self.loaded_workspace.write().await = workspace_root;
1285    }
1286}
1287
1288fn resolve_budget(
1289    policy: &SpawnPolicy,
1290    parent_instance: Option<AgentInstance>,
1291    parent_usage: Option<InstanceBudgetState>,
1292    template: &AgentTemplate,
1293    override_budget: Option<BudgetLimit>,
1294    role: &AgentRole,
1295) -> BudgetLimit {
1296    let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1297    let mut chosen = merge_budget(
1298        merge_budget(role_default, template.default_budget.clone()),
1299        override_budget.unwrap_or_default(),
1300    );
1301
1302    if let Some(parent) = parent_instance {
1303        let usage = parent_usage.unwrap_or_default();
1304        if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1305            if pct > 0 {
1306                chosen.max_tokens = cap_budget_remaining_u64(
1307                    chosen.max_tokens,
1308                    parent.budget.max_tokens,
1309                    usage.tokens_used,
1310                    pct,
1311                );
1312                chosen.max_steps = cap_budget_remaining_u32(
1313                    chosen.max_steps,
1314                    parent.budget.max_steps,
1315                    usage.steps_used,
1316                    pct,
1317                );
1318                chosen.max_tool_calls = cap_budget_remaining_u32(
1319                    chosen.max_tool_calls,
1320                    parent.budget.max_tool_calls,
1321                    usage.tool_calls_used,
1322                    pct,
1323                );
1324                chosen.max_duration_ms = cap_budget_remaining_u64(
1325                    chosen.max_duration_ms,
1326                    parent.budget.max_duration_ms,
1327                    usage
1328                        .started_at
1329                        .map(|started| started.elapsed().as_millis() as u64)
1330                        .unwrap_or(0),
1331                    pct,
1332                );
1333                chosen.max_cost_usd = cap_budget_remaining_f64(
1334                    chosen.max_cost_usd,
1335                    parent.budget.max_cost_usd,
1336                    usage.cost_used_usd,
1337                    pct,
1338                );
1339            }
1340        }
1341    }
1342    chosen
1343}
1344
1345fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1346    BudgetLimit {
1347        max_tokens: overlay.max_tokens.or(base.max_tokens),
1348        max_steps: overlay.max_steps.or(base.max_steps),
1349        max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1350        max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1351        max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1352    }
1353}
1354
/// Caps a child's `u64` limit at `pct` percent of what the parent has left.
///
/// - Without a parent limit the child's limit is returned unchanged (possibly `None`).
/// - With a parent limit the share is `(parent_limit - parent_used) * pct / 100`, where the
///   subtraction saturates at zero. The result is the smaller of the child's own limit and
///   that share, or the share itself when the child has no limit.
///
/// The share is computed in 128-bit arithmetic so that large remaining budgets are scaled
/// exactly instead of saturating before the division; a share that does not fit in `u64`
/// (only possible when `pct` exceeds 100) is clamped to `u64::MAX`.
fn cap_budget_remaining_u64(
    child: Option<u64>,
    parent_limit: Option<u64>,
    parent_used: u64,
    pct: u8,
) -> Option<u64> {
    let Some(parent_limit) = parent_limit else {
        return child;
    };
    let remaining = parent_limit.saturating_sub(parent_used);
    let wide_share = u128::from(remaining) * u128::from(pct) / 100;
    let share = u64::try_from(wide_share).unwrap_or(u64::MAX);
    Some(child.map_or(share, |requested| requested.min(share)))
}
1374
/// Caps a child's `u32` limit at `pct` percent of what the parent has left.
///
/// - Without a parent limit the child's limit is returned unchanged (possibly `None`).
/// - With a parent limit the share is `(parent_limit - parent_used) * pct / 100`, where the
///   subtraction saturates at zero. The result is the smaller of the child's own limit and
///   that share, or the share itself when the child has no limit.
///
/// The share is computed in 64-bit arithmetic so that large remaining budgets are scaled
/// exactly instead of saturating before the division; a share that does not fit in `u32`
/// (only possible when `pct` exceeds 100) is clamped to `u32::MAX`.
fn cap_budget_remaining_u32(
    child: Option<u32>,
    parent_limit: Option<u32>,
    parent_used: u32,
    pct: u8,
) -> Option<u32> {
    let Some(parent_limit) = parent_limit else {
        return child;
    };
    let remaining = parent_limit.saturating_sub(parent_used);
    let wide_share = u64::from(remaining) * u64::from(pct) / 100;
    let share = u32::try_from(wide_share).unwrap_or(u32::MAX);
    Some(child.map_or(share, |requested| requested.min(share)))
}
1394
/// Caps a child's `f64` limit at `pct` percent of what the parent has left.
///
/// - Without a parent limit the child's limit is returned unchanged (possibly `None`).
/// - With a parent limit the share is `max(parent_limit - parent_used, 0) * pct / 100`.
///   The result is the smaller of the child's own limit and that share, or the share
///   itself when the child has no limit.
fn cap_budget_remaining_f64(
    child: Option<f64>,
    parent_limit: Option<f64>,
    parent_used: f64,
    pct: u8,
) -> Option<f64> {
    let Some(parent_limit) = parent_limit else {
        return child;
    };
    let remaining = (parent_limit - parent_used).max(0.0);
    let share = remaining * f64::from(pct) / 100.0;
    let capped = match child {
        Some(requested) => requested.min(share),
        None => share,
    };
    Some(capped)
}
1414
1415async fn compute_skill_hash(
1416    workspace_root: &str,
1417    template: &AgentTemplate,
1418    policy: &SpawnPolicy,
1419) -> Result<String, String> {
1420    use sha2::{Digest, Sha256};
1421    let mut rows = Vec::new();
1422    let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1423    for skill in &template.skills {
1424        if let Some(path) = skill.path.as_deref() {
1425            validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1426            let skill_path = Path::new(workspace_root).join(path);
1427            let raw = fs::read_to_string(&skill_path)
1428                .await
1429                .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1430            let digest = hash_hex(raw.as_bytes());
1431            validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1432            rows.push(format!("path:{}:{}", path, digest));
1433        } else if let Some(id) = skill.id.as_deref() {
1434            validate_skill_source(Some(id), None, policy)?;
1435            let loaded = skill_service
1436                .load_skill(id)
1437                .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1438            let Some(loaded) = loaded else {
1439                return Err(format!("missing required skill id `{id}`"));
1440            };
1441            let digest = hash_hex(loaded.content.as_bytes());
1442            validate_pinned_hash(Some(id), None, &digest, policy)?;
1443            rows.push(format!("id:{}:{}", id, digest));
1444        }
1445    }
1446    rows.sort();
1447    let mut hasher = Sha256::new();
1448    for row in rows {
1449        hasher.update(row.as_bytes());
1450        hasher.update(b"\n");
1451    }
1452    let digest = hasher.finalize();
1453    Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1454}
1455
1456fn validate_skill_source(
1457    id: Option<&str>,
1458    path: Option<&str>,
1459    policy: &SpawnPolicy,
1460) -> Result<(), String> {
1461    use tandem_orchestrator::SkillSourceMode;
1462    match policy.skill_sources.mode {
1463        SkillSourceMode::Any => Ok(()),
1464        SkillSourceMode::ProjectOnly => {
1465            if id.is_some() {
1466                return Err("skill source denied: project_only forbids skill IDs".to_string());
1467            }
1468            let Some(path) = path else {
1469                return Err("skill source denied: project_only requires skill path".to_string());
1470            };
1471            let p = PathBuf::from(path);
1472            if p.is_absolute() {
1473                return Err("skill source denied: absolute skill paths are forbidden".to_string());
1474            }
1475            Ok(())
1476        }
1477        SkillSourceMode::Allowlist => {
1478            if let Some(id) = id {
1479                if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1480                    return Ok(());
1481                }
1482            }
1483            if let Some(path) = path {
1484                if policy
1485                    .skill_sources
1486                    .allowlist_paths
1487                    .iter()
1488                    .any(|v| v == path)
1489                {
1490                    return Ok(());
1491                }
1492            }
1493            Err("skill source denied: not present in allowlist".to_string())
1494        }
1495    }
1496}
1497
1498fn validate_pinned_hash(
1499    id: Option<&str>,
1500    path: Option<&str>,
1501    digest: &str,
1502    policy: &SpawnPolicy,
1503) -> Result<(), String> {
1504    let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1505    let by_path =
1506        path.and_then(|path| policy.skill_sources.pinned_hashes.get(&format!("path:{path}")));
1507    let expected = by_id.or(by_path);
1508    if let Some(expected) = expected {
1509        let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1510        if normalized != digest {
1511            return Err("pinned hash mismatch for skill reference".to_string());
1512        }
1513    }
1514    Ok(())
1515}
1516
/// Renders bytes as a lowercase hexadecimal string, two digits per byte.
fn hash_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        hex.push(char::from(DIGITS[usize::from(byte >> 4)]));
        hex.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    hex
}
1525
/// Roughly estimates a token count as one token per four characters, never less than one.
fn estimate_tokens(text: &str) -> u64 {
    let char_count = text.chars().count() as u64;
    std::cmp::max(1, char_count / 4)
}
1530
1531fn extract_session_id(event: &EngineEvent) -> Option<String> {
1532    event
1533        .properties
1534        .get("sessionID")
1535        .and_then(|v| v.as_str())
1536        .map(|v| v.to_string())
1537        .or_else(|| {
1538            event
1539                .properties
1540                .get("part")
1541                .and_then(|v| v.get("sessionID"))
1542                .and_then(|v| v.as_str())
1543                .map(|v| v.to_string())
1544        })
1545}
1546
1547fn merge_metadata_usage(
1548    metadata: Option<Value>,
1549    tokens_used: u64,
1550    steps_used: u32,
1551    tool_calls_used: u32,
1552    cost_used_usd: f64,
1553    elapsed_ms: u64,
1554) -> Value {
1555    let mut base = metadata
1556        .and_then(|v| v.as_object().cloned())
1557        .unwrap_or_default();
1558    base.insert(
1559        "budgetUsage".to_string(),
1560        json!({
1561            "tokensUsed": tokens_used,
1562            "stepsUsed": steps_used,
1563            "toolCallsUsed": tool_calls_used,
1564            "costUsedUsd": cost_used_usd,
1565            "elapsedMs": elapsed_ms
1566        }),
1567    );
1568    Value::Object(base)
1569}
1570
/// Canonicalizes a tool name for comparison.
///
/// The name is trimmed, lowercased and has `-` replaced by `_`; the known aliases of the
/// todo-writing tool are then folded into `todo_write`.
fn normalize_tool_name(name: &str) -> String {
    let canonical = name.trim().to_lowercase().replace('-', "_");
    let is_todo_alias = matches!(
        canonical.as_str(),
        "todowrite" | "update_todo_list" | "update_todos"
    );
    if is_todo_alias {
        String::from("todo_write")
    } else {
        canonical
    }
}
1577
1578async fn evaluate_capability_deny(
1579    state: &AppState,
1580    instance: &AgentInstance,
1581    tool: &str,
1582    args: &Value,
1583    caps: &tandem_orchestrator::CapabilitySpec,
1584    session_id: &str,
1585    message_id: &str,
1586) -> Option<String> {
1587    if !caps.tool_denylist.is_empty()
1588        && caps
1589            .tool_denylist
1590            .iter()
1591            .any(|name| normalize_tool_name(name) == *tool)
1592    {
1593        return Some(format!("tool `{tool}` denied by agent capability policy"));
1594    }
1595    if !caps.tool_allowlist.is_empty()
1596        && !caps
1597            .tool_allowlist
1598            .iter()
1599            .any(|name| normalize_tool_name(name) == *tool)
1600    {
1601        return Some(format!("tool `{tool}` not in agent allowlist"));
1602    }
1603
1604    if matches!(tool, "websearch" | "webfetch" | "webfetch_document") {
1605        if !caps.net_scopes.enabled {
1606            return Some("network disabled for this agent instance".to_string());
1607        }
1608        if !caps.net_scopes.allow_hosts.is_empty() {
1609            if tool == "websearch" {
1610                return Some(
1611                    "websearch blocked: host allowlist cannot be verified for search tool"
1612                        .to_string(),
1613                );
1614            }
1615            if let Some(host) = extract_url_host(args) {
1616                let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
1617                    let allowed = h.trim().to_ascii_lowercase();
1618                    !allowed.is_empty()
1619                        && (host == allowed || host.ends_with(&format!(".{allowed}")))
1620                });
1621                if !allowed {
1622                    return Some(format!("network host `{host}` not in allow_hosts"));
1623                }
1624            }
1625        }
1626    }
1627
1628    if tool == "bash" {
1629        let cmd = args
1630            .get("command")
1631            .and_then(|v| v.as_str())
1632            .unwrap_or("")
1633            .to_ascii_lowercase();
1634        if cmd.contains("git push") {
1635            if !caps.git_caps.push {
1636                return Some("git push disabled for this agent instance".to_string());
1637            }
1638            if caps.git_caps.push_requires_approval {
1639                let action = state.permissions.evaluate("git_push", "git_push").await;
1640                match action {
1641                    tandem_core::PermissionAction::Allow => {}
1642                    tandem_core::PermissionAction::Deny => {
1643                        return Some("git push denied by policy rule".to_string());
1644                    }
1645                    tandem_core::PermissionAction::Ask => {
1646                        let pending = state
1647                            .permissions
1648                            .ask_for_session_with_context(
1649                                Some(session_id),
1650                                "git_push",
1651                                args.clone(),
1652                                Some(tandem_core::PermissionArgsContext {
1653                                    args_source: "agent_team.git_push".to_string(),
1654                                    args_integrity: "runtime-checked".to_string(),
1655                                    query: Some(format!(
1656                                        "instanceID={} messageID={}",
1657                                        instance.instance_id, message_id
1658                                    )),
1659                                }),
1660                            )
1661                            .await;
1662                        return Some(format!(
1663                            "git push requires explicit user approval (approvalID={})",
1664                            pending.id
1665                        ));
1666                    }
1667                }
1668            }
1669        }
1670        if cmd.contains("git commit") && !caps.git_caps.commit {
1671            return Some("git commit disabled for this agent instance".to_string());
1672        }
1673    }
1674
1675    let access_kind = tool_fs_access_kind(tool);
1676    if let Some(kind) = access_kind {
1677        let Some(session) = state.storage.get_session(session_id).await else {
1678            return Some("session not found for capability evaluation".to_string());
1679        };
1680        let Some(root) = session.workspace_root.clone() else {
1681            return Some("workspace root missing for capability evaluation".to_string());
1682        };
1683        let requested = extract_tool_candidate_paths(tool, args);
1684        if !requested.is_empty() {
1685            let allowed_scopes = if kind == "read" {
1686                &caps.fs_scopes.read
1687            } else {
1688                &caps.fs_scopes.write
1689            };
1690            if allowed_scopes.is_empty() {
1691                return Some(format!("fs {kind} access blocked: no scopes configured"));
1692            }
1693            for candidate in requested {
1694                if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
1695                    return Some(format!(
1696                        "fs {kind} access denied for path `{}`",
1697                        candidate
1698                    ));
1699                }
1700            }
1701        }
1702    }
1703
1704    denied_secrets_reason(tool, caps, args)
1705}
1706
1707fn denied_secrets_reason(
1708    tool: &str,
1709    caps: &tandem_orchestrator::CapabilitySpec,
1710    args: &Value,
1711) -> Option<String> {
1712    if tool == "auth" {
1713        if caps.secrets_scopes.is_empty() {
1714            return Some("secrets are disabled for this agent instance".to_string());
1715        }
1716        let alias = args
1717            .get("id")
1718            .or_else(|| args.get("provider"))
1719            .or_else(|| args.get("providerID"))
1720            .and_then(|v| v.as_str())
1721            .unwrap_or("")
1722            .trim();
1723        if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
1724            return Some(format!(
1725                "secret alias `{alias}` is not in agent secretsScopes allowlist"
1726            ));
1727        }
1728    }
1729    None
1730}
1731
/// Classifies a tool name by the kind of filesystem access it performs.
///
/// Returns `Some("read")` for the read-style tools, `Some("write")` for the
/// mutating tools, and `None` for any other tool name. Matching is exact and
/// case-sensitive.
fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
    const READ_TOOLS: [&str; 5] = ["read", "glob", "grep", "codesearch", "lsp"];
    const WRITE_TOOLS: [&str; 3] = ["write", "edit", "apply_patch"];

    if READ_TOOLS.contains(&tool) {
        Some("read")
    } else if WRITE_TOOLS.contains(&tool) {
        Some("write")
    } else {
        None
    }
}
1739
1740fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
1741    let Some(obj) = args.as_object() else {
1742        return Vec::new();
1743    };
1744    let keys: &[&str] = match tool {
1745        "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
1746        "glob" => &["pattern"],
1747        "lsp" => &["filePath", "path"],
1748        "bash" => &["cwd"],
1749        "apply_patch" => &["path"],
1750        _ => &["path", "cwd"],
1751    };
1752    keys.iter()
1753        .filter_map(|key| obj.get(*key))
1754        .filter_map(|value| value.as_str())
1755        .filter(|s| !s.trim().is_empty())
1756        .map(|raw| strip_glob_tokens(raw).to_string())
1757        .collect()
1758}
1759
/// Returns the prefix of `path` that precedes the first glob metacharacter
/// (`*`, `?` or `[`), or the whole string when none is present.
fn strip_glob_tokens(path: &str) -> &str {
    match path.find(|ch: char| matches!(ch, '*' | '?' | '[')) {
        Some(cut) => &path[..cut],
        None => path,
    }
}
1770
/// Returns `true` when `candidate` lies inside at least one of `scopes`.
///
/// Both the candidate and every scope are resolved against `root` with
/// [`resolve_path`] and then normalized lexically (`.` removed, `..` collapsed)
/// before the component-wise prefix test. Without that normalization a
/// candidate such as `src/../../etc/passwd` would pass a `src` scope, because
/// `Path::starts_with` compares components literally and does not interpret
/// `..`.
///
/// The normalization is purely lexical: the filesystem is not consulted, so
/// symlinks are not resolved.
fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
    /// Collapses `.` and `..` components without touching the filesystem.
    fn normalize_lexically(path: &Path) -> PathBuf {
        use std::path::Component;

        let mut out = PathBuf::new();
        for component in path.components() {
            match component {
                Component::CurDir => {}
                Component::ParentDir => {
                    let last = out.components().next_back();
                    let can_pop = matches!(last, Some(Component::Normal(_)));
                    let at_root =
                        matches!(last, Some(Component::RootDir | Component::Prefix(_)));
                    if can_pop {
                        out.pop();
                    } else if !at_root {
                        // Nothing left to cancel: keep the leading `..` of a relative path.
                        out.push("..");
                    }
                    // `..` directly under the root stays at the root.
                }
                other => out.push(other.as_os_str()),
            }
        }
        out
    }

    let root_path = PathBuf::from(root);
    let candidate_path = normalize_lexically(&resolve_path(&root_path, candidate));
    scopes.iter().any(|scope| {
        let scope_path = normalize_lexically(&resolve_path(&root_path, scope));
        candidate_path.starts_with(scope_path)
    })
}

/// Resolves `raw` against `root`.
///
/// `raw` is trimmed first. A blank value yields `root` itself, an absolute
/// path is returned unchanged, and a relative path is joined onto `root`.
/// No normalization of `.` or `..` is performed here.
fn resolve_path(root: &Path, raw: &str) -> PathBuf {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return root.to_path_buf();
    }
    let path = Path::new(trimmed);
    if path.is_absolute() {
        path.to_path_buf()
    } else {
        root.join(path)
    }
}
1792
1793fn extract_url_host(args: &Value) -> Option<String> {
1794    let url = args
1795        .get("url")
1796        .or_else(|| args.get("uri"))
1797        .or_else(|| args.get("link"))
1798        .and_then(|v| v.as_str())?;
1799    let raw = url.trim();
1800    let (_, after_scheme) = raw.split_once("://")?;
1801    let host_port = after_scheme.split('/').next().unwrap_or_default();
1802    let host = host_port.split('@').next_back().unwrap_or_default();
1803    let host = host.split(':').next().unwrap_or_default().to_ascii_lowercase();
1804    if host.is_empty() {
1805        None
1806    } else {
1807        Some(host)
1808    }
1809}
1810
1811pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
1812    emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
1813}
1814
1815pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
1816    emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
1817}
1818
1819pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
1820    emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
1821}
1822
/// Optional correlation identifiers attached to spawn events.
///
/// All fields default to `None`. The struct only holds borrowed string
/// slices, so it is cheap to copy and safe to print for diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct SpawnEventContext<'a> {
    /// Emitted as `sessionID` in the event payload.
    pub session_id: Option<&'a str>,
    /// Emitted as `messageID` in the event payload.
    pub message_id: Option<&'a str>,
    /// Emitted as `runID` in the event payload.
    pub run_id: Option<&'a str>,
}
1829
1830pub fn emit_spawn_requested_with_context(
1831    state: &AppState,
1832    req: &SpawnRequest,
1833    ctx: &SpawnEventContext<'_>,
1834) {
1835    state.event_bus.publish(EngineEvent::new(
1836        "agent_team.spawn.requested",
1837        json!({
1838            "sessionID": ctx.session_id,
1839            "messageID": ctx.message_id,
1840            "runID": ctx.run_id,
1841            "missionID": req.mission_id,
1842            "instanceID": Value::Null,
1843            "parentInstanceID": req.parent_instance_id,
1844            "source": req.source,
1845            "requestedRole": req.role,
1846            "templateID": req.template_id,
1847            "justification": req.justification,
1848            "timestampMs": crate::now_ms(),
1849        }),
1850    ));
1851}
1852
1853pub fn emit_spawn_denied_with_context(
1854    state: &AppState,
1855    req: &SpawnRequest,
1856    decision: &SpawnDecision,
1857    ctx: &SpawnEventContext<'_>,
1858) {
1859    state.event_bus.publish(EngineEvent::new(
1860        "agent_team.spawn.denied",
1861        json!({
1862            "sessionID": ctx.session_id,
1863            "messageID": ctx.message_id,
1864            "runID": ctx.run_id,
1865            "missionID": req.mission_id,
1866            "instanceID": Value::Null,
1867            "parentInstanceID": req.parent_instance_id,
1868            "source": req.source,
1869            "requestedRole": req.role,
1870            "templateID": req.template_id,
1871            "code": decision.code,
1872            "error": decision.reason,
1873            "timestampMs": crate::now_ms(),
1874        }),
1875    ));
1876}
1877
1878pub fn emit_spawn_approved_with_context(
1879    state: &AppState,
1880    req: &SpawnRequest,
1881    instance: &AgentInstance,
1882    ctx: &SpawnEventContext<'_>,
1883) {
1884    state.event_bus.publish(EngineEvent::new(
1885        "agent_team.spawn.approved",
1886        json!({
1887            "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
1888            "messageID": ctx.message_id,
1889            "runID": ctx.run_id.or(instance.run_id.as_deref()),
1890            "missionID": instance.mission_id,
1891            "instanceID": instance.instance_id,
1892            "parentInstanceID": instance.parent_instance_id,
1893            "source": req.source,
1894            "requestedRole": req.role,
1895            "templateID": instance.template_id,
1896            "skillHash": instance.skill_hash,
1897            "timestampMs": crate::now_ms(),
1898        }),
1899    ));
1900    state.event_bus.publish(EngineEvent::new(
1901        "agent_team.instance.started",
1902        json!({
1903            "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
1904            "messageID": ctx.message_id,
1905            "runID": ctx.run_id.or(instance.run_id.as_deref()),
1906            "missionID": instance.mission_id,
1907            "instanceID": instance.instance_id,
1908            "parentInstanceID": instance.parent_instance_id,
1909            "role": instance.role,
1910            "status": instance.status,
1911            "budgetLimit": instance.budget,
1912            "skillHash": instance.skill_hash,
1913            "timestampMs": crate::now_ms(),
1914        }),
1915    ));
1916}
1917
1918pub fn emit_budget_usage(
1919    state: &AppState,
1920    instance: &AgentInstance,
1921    tokens_used: u64,
1922    steps_used: u32,
1923    tool_calls_used: u32,
1924    cost_used_usd: f64,
1925    elapsed_ms: u64,
1926) {
1927    state.event_bus.publish(EngineEvent::new(
1928        "agent_team.budget.usage",
1929        json!({
1930            "sessionID": instance.session_id,
1931            "messageID": Value::Null,
1932            "runID": instance.run_id,
1933            "missionID": instance.mission_id,
1934            "instanceID": instance.instance_id,
1935            "tokensUsed": tokens_used,
1936            "stepsUsed": steps_used,
1937            "toolCallsUsed": tool_calls_used,
1938            "costUsedUsd": cost_used_usd,
1939            "elapsedMs": elapsed_ms,
1940            "timestampMs": crate::now_ms(),
1941        }),
1942    ));
1943}
1944
1945pub fn emit_budget_exhausted(
1946    state: &AppState,
1947    instance: &AgentInstance,
1948    exhausted_by: &str,
1949    tokens_used: u64,
1950    steps_used: u32,
1951    tool_calls_used: u32,
1952    cost_used_usd: f64,
1953    elapsed_ms: u64,
1954) {
1955    state.event_bus.publish(EngineEvent::new(
1956        "agent_team.budget.exhausted",
1957        json!({
1958            "sessionID": instance.session_id,
1959            "messageID": Value::Null,
1960            "runID": instance.run_id,
1961            "missionID": instance.mission_id,
1962            "instanceID": instance.instance_id,
1963            "exhaustedBy": exhausted_by,
1964            "tokensUsed": tokens_used,
1965            "stepsUsed": steps_used,
1966            "toolCallsUsed": tool_calls_used,
1967            "costUsedUsd": cost_used_usd,
1968            "elapsedMs": elapsed_ms,
1969            "timestampMs": crate::now_ms(),
1970        }),
1971    ));
1972}
1973
1974pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
1975    state.event_bus.publish(EngineEvent::new(
1976        "agent_team.instance.cancelled",
1977        json!({
1978            "sessionID": instance.session_id,
1979            "messageID": Value::Null,
1980            "runID": instance.run_id,
1981            "missionID": instance.mission_id,
1982            "instanceID": instance.instance_id,
1983            "parentInstanceID": instance.parent_instance_id,
1984            "role": instance.role,
1985            "status": instance.status,
1986            "reason": reason,
1987            "timestampMs": crate::now_ms(),
1988        }),
1989    ));
1990}
1991
1992pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
1993    state.event_bus.publish(EngineEvent::new(
1994        "agent_team.instance.completed",
1995        json!({
1996            "sessionID": instance.session_id,
1997            "messageID": Value::Null,
1998            "runID": instance.run_id,
1999            "missionID": instance.mission_id,
2000            "instanceID": instance.instance_id,
2001            "parentInstanceID": instance.parent_instance_id,
2002            "role": instance.role,
2003            "status": instance.status,
2004            "timestampMs": crate::now_ms(),
2005        }),
2006    ));
2007}
2008
2009pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
2010    state.event_bus.publish(EngineEvent::new(
2011        "agent_team.instance.failed",
2012        json!({
2013            "sessionID": instance.session_id,
2014            "messageID": Value::Null,
2015            "runID": instance.run_id,
2016            "missionID": instance.mission_id,
2017            "instanceID": instance.instance_id,
2018            "parentInstanceID": instance.parent_instance_id,
2019            "role": instance.role,
2020            "status": instance.status,
2021            "timestampMs": crate::now_ms(),
2022        }),
2023    ));
2024}
2025
2026pub fn emit_mission_budget_exhausted(
2027    state: &AppState,
2028    mission_id: &str,
2029    exhausted_by: &str,
2030    tokens_used: u64,
2031    steps_used: u64,
2032    tool_calls_used: u64,
2033    cost_used_usd: f64,
2034) {
2035    state.event_bus.publish(EngineEvent::new(
2036        "agent_team.mission.budget.exhausted",
2037        json!({
2038            "sessionID": Value::Null,
2039            "messageID": Value::Null,
2040            "runID": Value::Null,
2041            "missionID": mission_id,
2042            "instanceID": Value::Null,
2043            "exhaustedBy": exhausted_by,
2044            "tokensUsed": tokens_used,
2045            "stepsUsed": steps_used,
2046            "toolCallsUsed": tool_calls_used,
2047            "costUsedUsd": cost_used_usd,
2048            "timestampMs": crate::now_ms(),
2049        }),
2050    ));
2051}