1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use futures::future::BoxFuture;
8use serde::Deserialize;
9use serde::Serialize;
10use serde_json::{json, Value};
11use tandem_core::{
12 SpawnAgentHook, SpawnAgentToolContext, SpawnAgentToolResult, ToolPolicyContext,
13 ToolPolicyDecision, ToolPolicyHook,
14};
15use tandem_orchestrator::{
16 AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
17 SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
18};
19use tandem_skills::SkillService;
20use tandem_types::{EngineEvent, Session};
21use tokio::fs;
22use tokio::sync::RwLock;
23use uuid::Uuid;
24
25use crate::AppState;
26
/// Shared runtime state for agent-team spawning, budgets and lifecycle tracking.
///
/// Every field is an `Arc<RwLock<..>>`, so cloning the runtime yields a handle onto the
/// same underlying state.
#[derive(Clone, Default)]
pub struct AgentTeamRuntime {
    /// Spawn policy parsed from `.tandem/agent-team/spawn-policy.yaml`; `None` when absent.
    policy: Arc<RwLock<Option<SpawnPolicy>>>,
    /// Loaded agent templates keyed by `template_id`.
    templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
    /// Spawned agent instances keyed by `instance_id`.
    instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
    /// Per-instance budget usage keyed by `instance_id`.
    budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
    /// Mission-wide budget usage keyed by mission id.
    mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
    /// Spawn requests awaiting user approval, keyed by approval id.
    spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
    /// Trimmed workspace root whose policy/templates are currently loaded.
    loaded_workspace: Arc<RwLock<Option<String>>>,
    /// File that receives one JSON audit row per line.
    audit_path: Arc<RwLock<PathBuf>>,
}
38
/// Outcome of [`AgentTeamRuntime::spawn`].
#[derive(Debug, Clone)]
pub struct SpawnResult {
    /// Whether the spawn was allowed and, if not, the deny code/reason.
    pub decision: SpawnDecision,
    /// The newly registered instance; `Some` only when the spawn succeeded.
    pub instance: Option<AgentInstance>,
}
44
/// Per-mission aggregate produced by `list_mission_summaries`.
///
/// Status counts are tallied from each instance's status. Usage totals are summed from
/// each instance's `metadata.budgetUsage` object; missing values count as zero.
#[derive(Debug, Clone, Serialize)]
pub struct AgentMissionSummary {
    #[serde(rename = "missionID")]
    pub mission_id: String,
    /// Number of instances in the mission, regardless of status.
    #[serde(rename = "instanceCount")]
    pub instance_count: usize,
    #[serde(rename = "runningCount")]
    pub running_count: usize,
    #[serde(rename = "completedCount")]
    pub completed_count: usize,
    #[serde(rename = "failedCount")]
    pub failed_count: usize,
    #[serde(rename = "cancelledCount")]
    pub cancelled_count: usize,
    #[serde(rename = "queuedCount")]
    pub queued_count: usize,
    /// Sum of `budgetUsage.tokensUsed`.
    #[serde(rename = "tokenUsedTotal")]
    pub token_used_total: u64,
    /// Sum of `budgetUsage.toolCallsUsed`.
    #[serde(rename = "toolCallsUsedTotal")]
    pub tool_calls_used_total: u64,
    /// Sum of `budgetUsage.stepsUsed`.
    #[serde(rename = "stepsUsedTotal")]
    pub steps_used_total: u64,
    /// Sum of `budgetUsage.costUsedUsd`.
    #[serde(rename = "costUsedUsdTotal")]
    pub cost_used_usd_total: f64,
}
70
/// Running usage counters for one agent instance.
#[derive(Debug, Clone, Default)]
struct InstanceBudgetState {
    tokens_used: u64,
    steps_used: u32,
    tool_calls_used: u32,
    /// Either priced from `cost_per_1k_tokens_usd` or taken from provider-reported cost.
    cost_used_usd: f64,
    /// Set when the instance is spawned; used to compute elapsed duration.
    started_at: Option<Instant>,
    /// Sticky flag: once set, further usage updates report exhaustion immediately.
    exhausted: bool,
}
80
/// Usage accumulated across all instances of one mission.
///
/// Compared against `SpawnPolicy::mission_total_budget` before new spawns.
#[derive(Debug, Clone, Default)]
struct MissionBudgetState {
    tokens_used: u64,
    steps_used: u64,
    tool_calls_used: u64,
    cost_used_usd: f64,
    // NOTE(review): presumably set by `apply_mission_budget_delta` once a mission limit
    // is hit — confirm against that function.
    exhausted: bool,
}
89
/// A spawn request that the policy denied but flagged as requiring user approval.
#[derive(Debug, Clone, Serialize)]
pub struct PendingSpawnApproval {
    #[serde(rename = "approvalID")]
    approval_id: String,
    /// Creation time from `crate::now_ms()`; used to order the approval list.
    #[serde(rename = "createdAtMs")]
    created_at_ms: u64,
    /// The original spawn request, as evaluated.
    request: SpawnRequest,
    /// Deny code from the policy decision that triggered the approval.
    #[serde(rename = "decisionCode")]
    decision_code: Option<SpawnDenyCode>,
    reason: Option<String>,
}
101
/// `SpawnAgentHook` implementation that routes `spawn_agent` tool calls into the
/// server's `AgentTeamRuntime` (`state.agent_teams`).
#[derive(Clone)]
pub struct ServerSpawnAgentHook {
    state: AppState,
}
106
/// JSON arguments accepted by the `spawn_agent` tool (camelCase keys).
#[derive(Debug, Deserialize)]
struct SpawnAgentToolInput {
    #[serde(rename = "missionID")]
    mission_id: Option<String>,
    #[serde(rename = "parentInstanceID")]
    parent_instance_id: Option<String>,
    /// Explicit template; when absent the runtime picks one by `role`.
    #[serde(rename = "templateID")]
    template_id: Option<String>,
    role: AgentRole,
    /// Spawn source; the hook substitutes `SpawnSource::ToolCall` when absent.
    source: Option<SpawnSource>,
    justification: String,
    #[serde(rename = "budgetOverride", default)]
    budget_override: Option<BudgetLimit>,
}
121
122impl ServerSpawnAgentHook {
123 pub fn new(state: AppState) -> Self {
124 Self { state }
125 }
126}
127
128impl SpawnAgentHook for ServerSpawnAgentHook {
129 fn spawn_agent(
130 &self,
131 ctx: SpawnAgentToolContext,
132 ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
133 let state = self.state.clone();
134 Box::pin(async move {
135 let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
136 let input = match parsed {
137 Ok(input) => input,
138 Err(err) => {
139 return Ok(SpawnAgentToolResult {
140 output: format!("spawn_agent denied: invalid args ({err})"),
141 metadata: json!({
142 "ok": false,
143 "code": "SPAWN_INVALID_ARGS",
144 "error": err.to_string(),
145 }),
146 });
147 }
148 };
149 let req = SpawnRequest {
150 mission_id: input.mission_id,
151 parent_instance_id: input.parent_instance_id,
152 source: input.source.unwrap_or(SpawnSource::ToolCall),
153 parent_role: None,
154 role: input.role,
155 template_id: input.template_id,
156 justification: input.justification,
157 budget_override: input.budget_override,
158 };
159
160 let event_ctx = SpawnEventContext {
161 session_id: Some(ctx.session_id.as_str()),
162 message_id: Some(ctx.message_id.as_str()),
163 run_id: None,
164 };
165 emit_spawn_requested_with_context(&state, &req, &event_ctx);
166 let result = state.agent_teams.spawn(&state, req.clone()).await;
167 if !result.decision.allowed || result.instance.is_none() {
168 emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
169 return Ok(SpawnAgentToolResult {
170 output: result
171 .decision
172 .reason
173 .clone()
174 .unwrap_or_else(|| "spawn_agent denied".to_string()),
175 metadata: json!({
176 "ok": false,
177 "code": result.decision.code,
178 "error": result.decision.reason,
179 "requiresUserApproval": result.decision.requires_user_approval,
180 }),
181 });
182 }
183 let instance = result.instance.expect("checked is_some");
184 emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
185 Ok(SpawnAgentToolResult {
186 output: format!(
187 "spawned {} as instance {} (session {})",
188 instance.template_id, instance.instance_id, instance.session_id
189 ),
190 metadata: json!({
191 "ok": true,
192 "missionID": instance.mission_id,
193 "instanceID": instance.instance_id,
194 "sessionID": instance.session_id,
195 "runID": instance.run_id,
196 "status": instance.status,
197 "skillHash": instance.skill_hash,
198 }),
199 })
200 })
201 }
202}
203
/// `ToolPolicyHook` implementation that enforces agent-team instance capabilities on
/// tool calls made from instance-owned sessions.
#[derive(Clone)]
pub struct ServerToolPolicyHook {
    state: AppState,
}
208
209impl ServerToolPolicyHook {
210 pub fn new(state: AppState) -> Self {
211 Self { state }
212 }
213}
214
215impl ToolPolicyHook for ServerToolPolicyHook {
216 fn evaluate_tool(
217 &self,
218 ctx: ToolPolicyContext,
219 ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
220 let state = self.state.clone();
221 Box::pin(async move {
222 let Some(instance) = state
223 .agent_teams
224 .instance_for_session(&ctx.session_id)
225 .await
226 else {
227 return Ok(ToolPolicyDecision {
228 allowed: true,
229 reason: None,
230 });
231 };
232 let tool = normalize_tool_name(&ctx.tool);
233 let caps = instance.capabilities.clone();
234 let deny = evaluate_capability_deny(
235 &state,
236 &instance,
237 &tool,
238 &ctx.args,
239 &caps,
240 &ctx.session_id,
241 &ctx.message_id,
242 )
243 .await;
244 if let Some(reason) = deny {
245 state.event_bus.publish(EngineEvent::new(
246 "agent_team.capability.denied",
247 json!({
248 "sessionID": ctx.session_id,
249 "messageID": ctx.message_id,
250 "runID": instance.run_id,
251 "missionID": instance.mission_id,
252 "instanceID": instance.instance_id,
253 "tool": tool,
254 "reason": reason,
255 "timestampMs": crate::now_ms(),
256 }),
257 ));
258 return Ok(ToolPolicyDecision {
259 allowed: false,
260 reason: Some(reason),
261 });
262 }
263 Ok(ToolPolicyDecision {
264 allowed: true,
265 reason: None,
266 })
267 })
268 }
269}
270
271impl AgentTeamRuntime {
272 pub fn new(audit_path: PathBuf) -> Self {
273 Self {
274 policy: Arc::new(RwLock::new(None)),
275 templates: Arc::new(RwLock::new(HashMap::new())),
276 instances: Arc::new(RwLock::new(HashMap::new())),
277 budgets: Arc::new(RwLock::new(HashMap::new())),
278 mission_budgets: Arc::new(RwLock::new(HashMap::new())),
279 spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
280 loaded_workspace: Arc::new(RwLock::new(None)),
281 audit_path: Arc::new(RwLock::new(audit_path)),
282 }
283 }
284
285 pub async fn set_audit_path(&self, path: PathBuf) {
286 *self.audit_path.write().await = path;
287 }
288
289 pub async fn list_templates(&self) -> Vec<AgentTemplate> {
290 let mut rows = self
291 .templates
292 .read()
293 .await
294 .values()
295 .cloned()
296 .collect::<Vec<_>>();
297 rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
298 rows
299 }
300
301 pub async fn list_instances(
302 &self,
303 mission_id: Option<&str>,
304 parent_instance_id: Option<&str>,
305 status: Option<AgentInstanceStatus>,
306 ) -> Vec<AgentInstance> {
307 let mut rows = self
308 .instances
309 .read()
310 .await
311 .values()
312 .filter(|instance| {
313 if let Some(mission_id) = mission_id {
314 if instance.mission_id != mission_id {
315 return false;
316 }
317 }
318 if let Some(parent_id) = parent_instance_id {
319 if instance.parent_instance_id.as_deref() != Some(parent_id) {
320 return false;
321 }
322 }
323 if let Some(status) = &status {
324 if &instance.status != status {
325 return false;
326 }
327 }
328 true
329 })
330 .cloned()
331 .collect::<Vec<_>>();
332 rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
333 rows
334 }
335
336 pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
337 let instances = self.instances.read().await;
338 let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
339 for instance in instances.values() {
340 let row = by_mission
341 .entry(instance.mission_id.clone())
342 .or_insert_with(|| AgentMissionSummary {
343 mission_id: instance.mission_id.clone(),
344 instance_count: 0,
345 running_count: 0,
346 completed_count: 0,
347 failed_count: 0,
348 cancelled_count: 0,
349 queued_count: 0,
350 token_used_total: 0,
351 tool_calls_used_total: 0,
352 steps_used_total: 0,
353 cost_used_usd_total: 0.0,
354 });
355 row.instance_count = row.instance_count.saturating_add(1);
356 match instance.status {
357 AgentInstanceStatus::Queued => row.queued_count = row.queued_count.saturating_add(1),
358 AgentInstanceStatus::Running => {
359 row.running_count = row.running_count.saturating_add(1)
360 }
361 AgentInstanceStatus::Completed => {
362 row.completed_count = row.completed_count.saturating_add(1)
363 }
364 AgentInstanceStatus::Failed => row.failed_count = row.failed_count.saturating_add(1),
365 AgentInstanceStatus::Cancelled => {
366 row.cancelled_count = row.cancelled_count.saturating_add(1)
367 }
368 }
369 if let Some(usage) = instance
370 .metadata
371 .as_ref()
372 .and_then(|m| m.get("budgetUsage"))
373 .and_then(|u| u.as_object())
374 {
375 row.token_used_total = row.token_used_total.saturating_add(
376 usage
377 .get("tokensUsed")
378 .and_then(|v| v.as_u64())
379 .unwrap_or(0),
380 );
381 row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
382 usage
383 .get("toolCallsUsed")
384 .and_then(|v| v.as_u64())
385 .unwrap_or(0),
386 );
387 row.steps_used_total = row.steps_used_total.saturating_add(
388 usage
389 .get("stepsUsed")
390 .and_then(|v| v.as_u64())
391 .unwrap_or(0),
392 );
393 row.cost_used_usd_total += usage
394 .get("costUsedUsd")
395 .and_then(|v| v.as_f64())
396 .unwrap_or(0.0);
397 }
398 }
399 let mut rows = by_mission.into_values().collect::<Vec<_>>();
400 rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
401 rows
402 }
403
404 pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
405 self.instances
406 .read()
407 .await
408 .values()
409 .find(|instance| instance.session_id == session_id)
410 .cloned()
411 }
412
413 pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
414 let mut rows = self
415 .spawn_approvals
416 .read()
417 .await
418 .values()
419 .cloned()
420 .collect::<Vec<_>>();
421 rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
422 rows
423 }
424
425 pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
426 let normalized = workspace_root.trim().to_string();
427 let already_loaded = self
428 .loaded_workspace
429 .read()
430 .await
431 .as_ref()
432 .map(|s| s == &normalized)
433 .unwrap_or(false);
434 if already_loaded {
435 return Ok(());
436 }
437
438 let root = PathBuf::from(&normalized);
439 let policy_path = root
440 .join(".tandem")
441 .join("agent-team")
442 .join("spawn-policy.yaml");
443 let templates_dir = root.join(".tandem").join("agent-team").join("templates");
444
445 let mut next_policy = None;
446 if policy_path.exists() {
447 let raw = fs::read_to_string(&policy_path)
448 .await
449 .with_context(|| format!("failed reading {}", policy_path.display()))?;
450 let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
451 .with_context(|| format!("failed parsing {}", policy_path.display()))?;
452 next_policy = Some(parsed);
453 }
454
455 let mut next_templates = HashMap::new();
456 if templates_dir.exists() {
457 let mut entries = fs::read_dir(&templates_dir).await?;
458 while let Some(entry) = entries.next_entry().await? {
459 let path = entry.path();
460 if !path.is_file() {
461 continue;
462 }
463 let ext = path
464 .extension()
465 .and_then(|v| v.to_str())
466 .unwrap_or_default()
467 .to_ascii_lowercase();
468 if ext != "yaml" && ext != "yml" && ext != "json" {
469 continue;
470 }
471 let raw = fs::read_to_string(&path).await?;
472 let template = serde_yaml::from_str::<AgentTemplate>(&raw)
473 .with_context(|| format!("failed parsing {}", path.display()))?;
474 next_templates.insert(template.template_id.clone(), template);
475 }
476 }
477
478 *self.policy.write().await = next_policy;
479 *self.templates.write().await = next_templates;
480 *self.loaded_workspace.write().await = Some(normalized);
481 Ok(())
482 }
483
484 pub async fn spawn(&self, state: &AppState, mut req: SpawnRequest) -> SpawnResult {
485 let workspace_root = state.workspace_index.snapshot().await.root;
486 if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
487 return SpawnResult {
488 decision: SpawnDecision {
489 allowed: false,
490 code: Some(SpawnDenyCode::SpawnPolicyMissing),
491 reason: Some(format!("spawn policy load failed: {}", err)),
492 requires_user_approval: false,
493 },
494 instance: None,
495 };
496 }
497
498 let Some(policy) = self.policy.read().await.clone() else {
499 return SpawnResult {
500 decision: SpawnDecision {
501 allowed: false,
502 code: Some(SpawnDenyCode::SpawnPolicyMissing),
503 reason: Some("spawn policy file missing".to_string()),
504 requires_user_approval: false,
505 },
506 instance: None,
507 };
508 };
509
510 let template = {
511 let templates = self.templates.read().await;
512 req.template_id
513 .as_deref()
514 .and_then(|template_id| templates.get(template_id).cloned())
515 };
516 if req.template_id.is_none() {
517 if let Some(found) = self
518 .templates
519 .read()
520 .await
521 .values()
522 .find(|t| t.role == req.role)
523 .cloned()
524 {
525 req.template_id = Some(found.template_id.clone());
526 }
527 }
528 let template = if template.is_some() {
529 template
530 } else {
531 let templates = self.templates.read().await;
532 req.template_id
533 .as_deref()
534 .and_then(|id| templates.get(id).cloned())
535 };
536
537 if req.parent_role.is_none() {
538 if let Some(parent_id) = req.parent_instance_id.as_deref() {
539 let instances = self.instances.read().await;
540 req.parent_role = instances
541 .get(parent_id)
542 .map(|instance| instance.role.clone());
543 }
544 }
545
546 let instances = self.instances.read().await;
547 let total_agents = instances.len();
548 let running_agents = instances
549 .values()
550 .filter(|instance| instance.status == AgentInstanceStatus::Running)
551 .count();
552 drop(instances);
553
554 let decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
555 if !decision.allowed {
556 if decision.requires_user_approval {
557 self.queue_spawn_approval(&req, &decision).await;
558 }
559 return SpawnResult {
560 decision,
561 instance: None,
562 };
563 }
564
565 let mission_id = req
566 .mission_id
567 .clone()
568 .unwrap_or_else(|| "mission-default".to_string());
569
570 if let Some(reason) = self
571 .mission_budget_exceeded_reason(&policy, &mission_id)
572 .await
573 {
574 return SpawnResult {
575 decision: SpawnDecision {
576 allowed: false,
577 code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
578 reason: Some(reason),
579 requires_user_approval: false,
580 },
581 instance: None,
582 };
583 }
584
585 let template = template.unwrap_or_else(|| AgentTemplate {
586 template_id: "default-template".to_string(),
587 role: req.role.clone(),
588 system_prompt: None,
589 skills: Vec::new(),
590 default_budget: BudgetLimit::default(),
591 capabilities: Default::default(),
592 });
593
594 let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
595 Ok(hash) => hash,
596 Err(err) => {
597 let lowered = err.to_ascii_lowercase();
598 let code = if lowered.contains("pinned hash mismatch") {
599 SpawnDenyCode::SpawnSkillHashMismatch
600 } else if lowered.contains("skill source denied") {
601 SpawnDenyCode::SpawnSkillSourceDenied
602 } else {
603 SpawnDenyCode::SpawnRequiredSkillMissing
604 };
605 return SpawnResult {
606 decision: SpawnDecision {
607 allowed: false,
608 code: Some(code),
609 reason: Some(err),
610 requires_user_approval: false,
611 },
612 instance: None,
613 };
614 }
615 };
616
617 let parent_snapshot = {
618 let instances = self.instances.read().await;
619 req.parent_instance_id
620 .as_deref()
621 .and_then(|id| instances.get(id).cloned())
622 };
623 let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
624 self.budgets.read().await.get(parent_id).cloned()
625 } else {
626 None
627 };
628
629 let budget = resolve_budget(
630 &policy,
631 parent_snapshot,
632 parent_usage,
633 &template,
634 req.budget_override.clone(),
635 &req.role,
636 );
637
638 let mut session = Session::new(
639 Some(format!("Agent Team {}", template.template_id)),
640 Some(workspace_root.clone()),
641 );
642 session.workspace_root = Some(workspace_root.clone());
643 let session_id = session.id.clone();
644 if let Err(err) = state.storage.save_session(session).await {
645 return SpawnResult {
646 decision: SpawnDecision {
647 allowed: false,
648 code: Some(SpawnDenyCode::SpawnPolicyMissing),
649 reason: Some(format!("failed creating child session: {}", err)),
650 requires_user_approval: false,
651 },
652 instance: None,
653 };
654 }
655
656 let instance = AgentInstance {
657 instance_id: format!("ins_{}", Uuid::new_v4().simple()),
658 mission_id: mission_id.clone(),
659 parent_instance_id: req.parent_instance_id.clone(),
660 role: template.role.clone(),
661 template_id: template.template_id.clone(),
662 session_id: session_id.clone(),
663 run_id: None,
664 status: AgentInstanceStatus::Running,
665 budget,
666 skill_hash: skill_hash.clone(),
667 capabilities: template.capabilities.clone(),
668 metadata: Some(json!({
669 "source": req.source,
670 "justification": req.justification,
671 })),
672 };
673
674 self.instances
675 .write()
676 .await
677 .insert(instance.instance_id.clone(), instance.clone());
678 self.budgets.write().await.insert(
679 instance.instance_id.clone(),
680 InstanceBudgetState {
681 started_at: Some(Instant::now()),
682 ..InstanceBudgetState::default()
683 },
684 );
685 let _ = self.append_audit("spawn.approved", &instance).await;
686
687 SpawnResult {
688 decision: SpawnDecision {
689 allowed: true,
690 code: None,
691 reason: None,
692 requires_user_approval: false,
693 },
694 instance: Some(instance),
695 }
696 }
697
698 pub async fn cancel_instance(
699 &self,
700 state: &AppState,
701 instance_id: &str,
702 reason: &str,
703 ) -> Option<AgentInstance> {
704 let mut instances = self.instances.write().await;
705 let instance = instances.get_mut(instance_id)?;
706 if matches!(
707 instance.status,
708 AgentInstanceStatus::Completed
709 | AgentInstanceStatus::Failed
710 | AgentInstanceStatus::Cancelled
711 ) {
712 return Some(instance.clone());
713 }
714 instance.status = AgentInstanceStatus::Cancelled;
715 let snapshot = instance.clone();
716 drop(instances);
717 let _ = state.cancellations.cancel(&snapshot.session_id).await;
718 let _ = self.append_audit("instance.cancelled", &snapshot).await;
719 emit_instance_cancelled(state, &snapshot, reason);
720 Some(snapshot)
721 }
722
723 async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
724 let approval = PendingSpawnApproval {
725 approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
726 created_at_ms: crate::now_ms(),
727 request: req.clone(),
728 decision_code: decision.code.clone(),
729 reason: decision.reason.clone(),
730 };
731 self.spawn_approvals
732 .write()
733 .await
734 .insert(approval.approval_id.clone(), approval);
735 }
736
737 async fn mission_budget_exceeded_reason(
738 &self,
739 policy: &SpawnPolicy,
740 mission_id: &str,
741 ) -> Option<String> {
742 let Some(limit) = policy.mission_total_budget.as_ref() else {
743 return None;
744 };
745 let usage = self
746 .mission_budgets
747 .read()
748 .await
749 .get(mission_id)
750 .cloned()
751 .unwrap_or_default();
752 if let Some(max) = limit.max_tokens {
753 if usage.tokens_used >= max {
754 return Some(format!("mission max_tokens exhausted ({}/{})", usage.tokens_used, max));
755 }
756 }
757 if let Some(max) = limit.max_steps {
758 if usage.steps_used >= u64::from(max) {
759 return Some(format!("mission max_steps exhausted ({}/{})", usage.steps_used, max));
760 }
761 }
762 if let Some(max) = limit.max_tool_calls {
763 if usage.tool_calls_used >= u64::from(max) {
764 return Some(format!(
765 "mission max_tool_calls exhausted ({}/{})",
766 usage.tool_calls_used, max
767 ));
768 }
769 }
770 if let Some(max) = limit.max_cost_usd {
771 if usage.cost_used_usd >= max {
772 return Some(format!(
773 "mission max_cost_usd exhausted ({:.6}/{:.6})",
774 usage.cost_used_usd, max
775 ));
776 }
777 }
778 None
779 }
780
781 pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
782 let instance_ids = self
783 .instances
784 .read()
785 .await
786 .values()
787 .filter(|instance| instance.mission_id == mission_id)
788 .map(|instance| instance.instance_id.clone())
789 .collect::<Vec<_>>();
790 let mut count = 0usize;
791 for instance_id in instance_ids {
792 if self
793 .cancel_instance(state, &instance_id, reason)
794 .await
795 .is_some()
796 {
797 count = count.saturating_add(1);
798 }
799 }
800 count
801 }
802
803 async fn mark_instance_terminal(
804 &self,
805 state: &AppState,
806 instance_id: &str,
807 status: AgentInstanceStatus,
808 ) -> Option<AgentInstance> {
809 let mut instances = self.instances.write().await;
810 let instance = instances.get_mut(instance_id)?;
811 if matches!(
812 instance.status,
813 AgentInstanceStatus::Completed
814 | AgentInstanceStatus::Failed
815 | AgentInstanceStatus::Cancelled
816 ) {
817 return Some(instance.clone());
818 }
819 instance.status = status.clone();
820 let snapshot = instance.clone();
821 drop(instances);
822 match status {
823 AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
824 AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
825 _ => {}
826 }
827 Some(snapshot)
828 }
829
830 pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
831 let Some(session_id) = extract_session_id(event) else {
832 return;
833 };
834 let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
835 return;
836 };
837 if event.event_type == "provider.usage" {
838 let total_tokens = event
839 .properties
840 .get("totalTokens")
841 .and_then(|v| v.as_u64())
842 .unwrap_or(0);
843 let cost_used_usd = event
844 .properties
845 .get("costUsd")
846 .and_then(|v| v.as_f64())
847 .unwrap_or(0.0);
848 if total_tokens > 0 {
849 let exhausted = self
850 .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
851 .await;
852 if exhausted {
853 let _ = self
854 .cancel_instance(state, &instance_id, "budget exhausted")
855 .await;
856 }
857 }
858 return;
859 }
860 let mut delta_tokens = 0u64;
861 let mut delta_steps = 0u32;
862 let mut delta_tool_calls = 0u32;
863 if event.event_type == "message.part.updated" {
864 if let Some(part) = event.properties.get("part") {
865 let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
866 if part_type == "tool-invocation" {
867 delta_tool_calls = 1;
868 } else if part_type == "text" {
869 let delta = event
870 .properties
871 .get("delta")
872 .and_then(|v| v.as_str())
873 .unwrap_or("");
874 if !delta.is_empty() {
875 delta_tokens = estimate_tokens(delta);
876 }
877 }
878 }
879 } else if event.event_type == "session.run.finished" {
880 delta_steps = 1;
881 let run_status = event
882 .properties
883 .get("status")
884 .and_then(|v| v.as_str())
885 .unwrap_or("")
886 .to_ascii_lowercase();
887 if run_status == "completed" {
888 let _ = self
889 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
890 .await;
891 } else if run_status == "failed" || run_status == "error" {
892 let _ = self
893 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
894 .await;
895 }
896 }
897 if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
898 return;
899 }
900 let exhausted = self
901 .apply_budget_delta(
902 state,
903 &instance_id,
904 delta_tokens,
905 delta_steps,
906 delta_tool_calls,
907 )
908 .await;
909 if exhausted {
910 let _ = self
911 .cancel_instance(state, &instance_id, "budget exhausted")
912 .await;
913 }
914 }
915
916 async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
917 self.instances
918 .read()
919 .await
920 .values()
921 .find(|instance| instance.session_id == session_id)
922 .map(|instance| instance.instance_id.clone())
923 }
924
    /// Adds an incremental usage delta (estimated tokens, steps, tool calls) to an
    /// instance's budget and reports whether the instance should stop.
    ///
    /// Side effects:
    /// - Refreshes the instance's `metadata` usage snapshot (via `merge_metadata_usage`).
    /// - Emits a budget-usage event.
    /// - Forwards the same delta, and this call's cost increase, to the mission budget.
    ///
    /// Returns `true` in any of these cases:
    /// - The instance was already marked exhausted.
    /// - The mission budget became exhausted; every instance of the mission is then
    ///   cancelled here.
    /// - One of the instance's own limits was reached; a budget-exhausted event is emitted.
    ///
    /// Returns `false` when there is no budget or instance record, or nothing is exhausted.
    ///
    /// NOTE(review): `usage` borrows from the `budgets` write guard, so that lock is held
    /// while the `instances` lock is taken and across the awaits on
    /// `apply_mission_budget_delta` / `cancel_mission`. Effective lock order: budgets,
    /// then instances.
    async fn apply_budget_delta(
        &self,
        state: &AppState,
        instance_id: &str,
        delta_tokens: u64,
        delta_steps: u32,
        delta_tool_calls: u32,
    ) -> bool {
        // Without a loaded policy, fall back to an empty policy (no cost rate,
        // `mission_total_budget: None`) so usage is still tracked.
        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
            enabled: false,
            require_justification: false,
            max_agents: None,
            max_concurrent: None,
            child_budget_percent_of_parent_remaining: None,
            mission_total_budget: None,
            cost_per_1k_tokens_usd: None,
            spawn_edges: HashMap::new(),
            required_skills: HashMap::new(),
            role_defaults: HashMap::new(),
            skill_sources: Default::default(),
        });
        let mut budgets = self.budgets.write().await;
        let Some(usage) = budgets.get_mut(instance_id) else {
            return false;
        };
        // Exhaustion is sticky: keep telling the caller to stop.
        if usage.exhausted {
            return true;
        }
        // Remember the cost before this delta so only the increase goes to the mission.
        let prev_cost_used_usd = usage.cost_used_usd;
        usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
        usage.steps_used = usage.steps_used.saturating_add(delta_steps);
        usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
        if let Some(rate) = policy.cost_per_1k_tokens_usd {
            usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
        }
        let elapsed_ms = usage
            .started_at
            .map(|started| started.elapsed().as_millis() as u64)
            .unwrap_or(0);

        // First limit reached wins, checked in order: tokens, steps, tool calls,
        // duration, cost.
        let mut exhausted_reason: Option<&'static str> = None;
        let mut snapshot: Option<AgentInstance> = None;
        {
            let mut instances = self.instances.write().await;
            if let Some(instance) = instances.get_mut(instance_id) {
                instance.metadata = Some(merge_metadata_usage(
                    instance.metadata.take(),
                    usage.tokens_used,
                    usage.steps_used,
                    usage.tool_calls_used,
                    usage.cost_used_usd,
                    elapsed_ms,
                ));
                if let Some(limit) = instance.budget.max_tokens {
                    if usage.tokens_used >= limit {
                        exhausted_reason = Some("max_tokens");
                    }
                }
                if exhausted_reason.is_none() {
                    if let Some(limit) = instance.budget.max_steps {
                        if usage.steps_used >= limit {
                            exhausted_reason = Some("max_steps");
                        }
                    }
                }
                if exhausted_reason.is_none() {
                    if let Some(limit) = instance.budget.max_tool_calls {
                        if usage.tool_calls_used >= limit {
                            exhausted_reason = Some("max_tool_calls");
                        }
                    }
                }
                if exhausted_reason.is_none() {
                    if let Some(limit) = instance.budget.max_duration_ms {
                        if elapsed_ms >= limit {
                            exhausted_reason = Some("max_duration_ms");
                        }
                    }
                }
                if exhausted_reason.is_none() {
                    if let Some(limit) = instance.budget.max_cost_usd {
                        if usage.cost_used_usd >= limit {
                            exhausted_reason = Some("max_cost_usd");
                        }
                    }
                }
                snapshot = Some(instance.clone());
            }
        }
        let Some(instance) = snapshot else {
            return false;
        };
        emit_budget_usage(
            state,
            &instance,
            usage.tokens_used,
            usage.steps_used,
            usage.tool_calls_used,
            usage.cost_used_usd,
            elapsed_ms,
        );
        let mission_exhausted = self
            .apply_mission_budget_delta(
                state,
                &instance.mission_id,
                delta_tokens,
                u64::from(delta_steps),
                u64::from(delta_tool_calls),
                usage.cost_used_usd - prev_cost_used_usd,
                &policy,
            )
            .await;
        // Mission exhaustion takes priority: cancel the whole mission. No instance-level
        // exhausted event is emitted on this path.
        if mission_exhausted {
            usage.exhausted = true;
            let _ = self
                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
                .await;
            return true;
        }
        if let Some(reason) = exhausted_reason {
            usage.exhausted = true;
            emit_budget_exhausted(
                state,
                &instance,
                reason,
                usage.tokens_used,
                usage.steps_used,
                usage.tool_calls_used,
                usage.cost_used_usd,
                elapsed_ms,
            );
            return true;
        }
        false
    }
1060
    /// Applies a provider-reported token total and cost to an instance's budget and
    /// reports whether the instance should stop.
    ///
    /// Token usage only ever grows: `tokens_used = max(tokens_used, total_tokens)`. A
    /// positive `cost_used_usd` is applied the same way (max). Otherwise the token
    /// increase is priced with `cost_per_1k_tokens_usd` when set. Only the token and cost
    /// limits are checked here; steps, tool calls and duration are not.
    ///
    /// Side effects and the `true`/`false` contract match `apply_budget_delta`: a usage
    /// event is emitted, and the token and cost increases are forwarded to the mission
    /// budget (with zero steps and tool calls).
    ///
    /// NOTE(review): assumes `total_tokens`/`cost_used_usd` are cumulative for the
    /// session. If the provider reports per-request values, `max` will undercount;
    /// confirm against the `provider.usage` producer.
    ///
    /// NOTE(review): as in `apply_budget_delta`, the `budgets` write guard is held across
    /// the `instances` lock and the mission-budget / cancel awaits.
    async fn apply_exact_token_usage(
        &self,
        state: &AppState,
        instance_id: &str,
        total_tokens: u64,
        cost_used_usd: f64,
    ) -> bool {
        // Without a loaded policy, fall back to an empty policy (no cost rate,
        // `mission_total_budget: None`).
        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
            enabled: false,
            require_justification: false,
            max_agents: None,
            max_concurrent: None,
            child_budget_percent_of_parent_remaining: None,
            mission_total_budget: None,
            cost_per_1k_tokens_usd: None,
            spawn_edges: HashMap::new(),
            required_skills: HashMap::new(),
            role_defaults: HashMap::new(),
            skill_sources: Default::default(),
        });
        let mut budgets = self.budgets.write().await;
        let Some(usage) = budgets.get_mut(instance_id) else {
            return false;
        };
        // Exhaustion is sticky.
        if usage.exhausted {
            return true;
        }
        // Snapshot previous totals so only the increases are forwarded to the mission.
        let prev_tokens = usage.tokens_used;
        let prev_cost_used_usd = usage.cost_used_usd;
        usage.tokens_used = usage.tokens_used.max(total_tokens);
        if cost_used_usd > 0.0 {
            usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
        } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
            let delta = usage.tokens_used.saturating_sub(prev_tokens);
            usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
        }
        let elapsed_ms = usage
            .started_at
            .map(|started| started.elapsed().as_millis() as u64)
            .unwrap_or(0);
        // Only token and cost limits are evaluated on this path, in that order.
        let mut exhausted_reason: Option<&'static str> = None;
        let mut snapshot: Option<AgentInstance> = None;
        {
            let mut instances = self.instances.write().await;
            if let Some(instance) = instances.get_mut(instance_id) {
                instance.metadata = Some(merge_metadata_usage(
                    instance.metadata.take(),
                    usage.tokens_used,
                    usage.steps_used,
                    usage.tool_calls_used,
                    usage.cost_used_usd,
                    elapsed_ms,
                ));
                if let Some(limit) = instance.budget.max_tokens {
                    if usage.tokens_used >= limit {
                        exhausted_reason = Some("max_tokens");
                    }
                }
                if exhausted_reason.is_none() {
                    if let Some(limit) = instance.budget.max_cost_usd {
                        if usage.cost_used_usd >= limit {
                            exhausted_reason = Some("max_cost_usd");
                        }
                    }
                }
                snapshot = Some(instance.clone());
            }
        }
        let Some(instance) = snapshot else {
            return false;
        };
        emit_budget_usage(
            state,
            &instance,
            usage.tokens_used,
            usage.steps_used,
            usage.tool_calls_used,
            usage.cost_used_usd,
            elapsed_ms,
        );
        let mission_exhausted = self
            .apply_mission_budget_delta(
                state,
                &instance.mission_id,
                usage.tokens_used.saturating_sub(prev_tokens),
                0,
                0,
                usage.cost_used_usd - prev_cost_used_usd,
                &policy,
            )
            .await;
        // Mission exhaustion takes priority and cancels every instance in the mission.
        if mission_exhausted {
            usage.exhausted = true;
            let _ = self
                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
                .await;
            return true;
        }
        if let Some(reason) = exhausted_reason {
            usage.exhausted = true;
            emit_budget_exhausted(
                state,
                &instance,
                reason,
                usage.tokens_used,
                usage.steps_used,
                usage.tool_calls_used,
                usage.cost_used_usd,
                elapsed_ms,
            );
            return true;
        }
        false
    }
1175
1176 async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1177 let path = self.audit_path.read().await.clone();
1178 if let Some(parent) = path.parent() {
1179 fs::create_dir_all(parent).await?;
1180 }
1181 let row = json!({
1182 "action": action,
1183 "missionID": instance.mission_id,
1184 "instanceID": instance.instance_id,
1185 "parentInstanceID": instance.parent_instance_id,
1186 "role": instance.role,
1187 "templateID": instance.template_id,
1188 "sessionID": instance.session_id,
1189 "skillHash": instance.skill_hash,
1190 "timestampMs": crate::now_ms(),
1191 });
1192 let mut existing = if path.exists() {
1193 fs::read_to_string(&path).await.unwrap_or_default()
1194 } else {
1195 String::new()
1196 };
1197 existing.push_str(&serde_json::to_string(&row)?);
1198 existing.push('\n');
1199 fs::write(path, existing).await?;
1200 Ok(())
1201 }
1202
1203 async fn apply_mission_budget_delta(
1204 &self,
1205 state: &AppState,
1206 mission_id: &str,
1207 delta_tokens: u64,
1208 delta_steps: u64,
1209 delta_tool_calls: u64,
1210 delta_cost_used_usd: f64,
1211 policy: &SpawnPolicy,
1212 ) -> bool {
1213 let mut budgets = self.mission_budgets.write().await;
1214 let row = budgets.entry(mission_id.to_string()).or_default();
1215 row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1216 row.steps_used = row.steps_used.saturating_add(delta_steps);
1217 row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1218 row.cost_used_usd += delta_cost_used_usd.max(0.0);
1219 if row.exhausted {
1220 return true;
1221 }
1222 let Some(limit) = policy.mission_total_budget.as_ref() else {
1223 return false;
1224 };
1225 let mut exhausted_by: Option<&'static str> = None;
1226 if let Some(max) = limit.max_tokens {
1227 if row.tokens_used >= max {
1228 exhausted_by = Some("mission_max_tokens");
1229 }
1230 }
1231 if exhausted_by.is_none() {
1232 if let Some(max) = limit.max_steps {
1233 if row.steps_used >= u64::from(max) {
1234 exhausted_by = Some("mission_max_steps");
1235 }
1236 }
1237 }
1238 if exhausted_by.is_none() {
1239 if let Some(max) = limit.max_tool_calls {
1240 if row.tool_calls_used >= u64::from(max) {
1241 exhausted_by = Some("mission_max_tool_calls");
1242 }
1243 }
1244 }
1245 if exhausted_by.is_none() {
1246 if let Some(max) = limit.max_cost_usd {
1247 if row.cost_used_usd >= max {
1248 exhausted_by = Some("mission_max_cost_usd");
1249 }
1250 }
1251 }
1252 if let Some(exhausted_by) = exhausted_by {
1253 row.exhausted = true;
1254 emit_mission_budget_exhausted(
1255 state,
1256 mission_id,
1257 exhausted_by,
1258 row.tokens_used,
1259 row.steps_used,
1260 row.tool_calls_used,
1261 row.cost_used_usd,
1262 );
1263 return true;
1264 }
1265 false
1266 }
1267
1268 pub async fn set_for_test(
1269 &self,
1270 workspace_root: Option<String>,
1271 policy: Option<SpawnPolicy>,
1272 templates: Vec<AgentTemplate>,
1273 ) {
1274 *self.policy.write().await = policy;
1275 let mut by_id = HashMap::new();
1276 for template in templates {
1277 by_id.insert(template.template_id.clone(), template);
1278 }
1279 *self.templates.write().await = by_id;
1280 self.instances.write().await.clear();
1281 self.budgets.write().await.clear();
1282 self.mission_budgets.write().await.clear();
1283 self.spawn_approvals.write().await.clear();
1284 *self.loaded_workspace.write().await = workspace_root;
1285 }
1286}
1287
1288fn resolve_budget(
1289 policy: &SpawnPolicy,
1290 parent_instance: Option<AgentInstance>,
1291 parent_usage: Option<InstanceBudgetState>,
1292 template: &AgentTemplate,
1293 override_budget: Option<BudgetLimit>,
1294 role: &AgentRole,
1295) -> BudgetLimit {
1296 let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1297 let mut chosen = merge_budget(
1298 merge_budget(role_default, template.default_budget.clone()),
1299 override_budget.unwrap_or_default(),
1300 );
1301
1302 if let Some(parent) = parent_instance {
1303 let usage = parent_usage.unwrap_or_default();
1304 if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1305 if pct > 0 {
1306 chosen.max_tokens = cap_budget_remaining_u64(
1307 chosen.max_tokens,
1308 parent.budget.max_tokens,
1309 usage.tokens_used,
1310 pct,
1311 );
1312 chosen.max_steps = cap_budget_remaining_u32(
1313 chosen.max_steps,
1314 parent.budget.max_steps,
1315 usage.steps_used,
1316 pct,
1317 );
1318 chosen.max_tool_calls = cap_budget_remaining_u32(
1319 chosen.max_tool_calls,
1320 parent.budget.max_tool_calls,
1321 usage.tool_calls_used,
1322 pct,
1323 );
1324 chosen.max_duration_ms = cap_budget_remaining_u64(
1325 chosen.max_duration_ms,
1326 parent.budget.max_duration_ms,
1327 usage
1328 .started_at
1329 .map(|started| started.elapsed().as_millis() as u64)
1330 .unwrap_or(0),
1331 pct,
1332 );
1333 chosen.max_cost_usd = cap_budget_remaining_f64(
1334 chosen.max_cost_usd,
1335 parent.budget.max_cost_usd,
1336 usage.cost_used_usd,
1337 pct,
1338 );
1339 }
1340 }
1341 }
1342 chosen
1343}
1344
1345fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1346 BudgetLimit {
1347 max_tokens: overlay.max_tokens.or(base.max_tokens),
1348 max_steps: overlay.max_steps.or(base.max_steps),
1349 max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1350 max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1351 max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1352 }
1353}
1354
/// Caps a child's `u64` limit at `pct` percent of what remains of the parent's limit.
///
/// * `child` – the child's own limit, if any.
/// * `parent_limit` – the parent's limit. `None` means the parent is uncapped, so
///   `child` is returned unchanged.
/// * `parent_used` – how much of the parent's limit is consumed. Usage above the
///   limit leaves nothing remaining.
/// * `pct` – percentage of the parent's remainder the child may receive.
///
/// Returns the smaller of `child` and the parent-derived cap, or just the cap when
/// `child` is `None`. The cap saturates at `u64::MAX` if `pct` exceeds 100.
fn cap_budget_remaining_u64(
    child: Option<u64>,
    parent_limit: Option<u64>,
    parent_used: u64,
    pct: u8,
) -> Option<u64> {
    // Scale in u128 so very large remainders (e.g. `u64::MAX` used as "unlimited") are
    // computed exactly. Saturating the multiply before dividing by 100 shrank them ~100×.
    let parent_cap = parent_limit.map(|limit| {
        let remaining = u128::from(limit.saturating_sub(parent_used));
        u64::try_from(remaining * u128::from(pct) / 100).unwrap_or(u64::MAX)
    });
    match (child, parent_cap) {
        (Some(child), Some(cap)) => Some(child.min(cap)),
        (child, None) => child,
        (None, cap) => cap,
    }
}
1374
/// Caps a child's `u32` limit at `pct` percent of what remains of the parent's limit.
///
/// * `child` – the child's own limit, if any.
/// * `parent_limit` – the parent's limit. `None` means the parent is uncapped, so
///   `child` is returned unchanged.
/// * `parent_used` – how much of the parent's limit is consumed. Usage above the
///   limit leaves nothing remaining.
/// * `pct` – percentage of the parent's remainder the child may receive.
///
/// Returns the smaller of `child` and the parent-derived cap, or just the cap when
/// `child` is `None`. The cap saturates at `u32::MAX` if `pct` exceeds 100.
fn cap_budget_remaining_u32(
    child: Option<u32>,
    parent_limit: Option<u32>,
    parent_used: u32,
    pct: u8,
) -> Option<u32> {
    // Scale in u64: in u32 the multiply saturated for any remainder above ~42.9M
    // (at pct = 100), which then came out ~100× too small after dividing.
    let parent_cap = parent_limit.map(|limit| {
        let remaining = u64::from(limit.saturating_sub(parent_used));
        u32::try_from(remaining * u64::from(pct) / 100).unwrap_or(u32::MAX)
    });
    match (child, parent_cap) {
        (Some(child), Some(cap)) => Some(child.min(cap)),
        (child, None) => child,
        (None, cap) => cap,
    }
}
1394
/// Caps a child's floating-point limit (e.g. USD cost) at `pct` percent of the parent's
/// remaining limit.
///
/// The parent's remainder is `parent_limit - parent_used`, floored at `0.0`. With no
/// parent limit, `child` is returned unchanged. With no child limit, the
/// parent-derived cap is returned.
fn cap_budget_remaining_f64(
    child: Option<f64>,
    parent_limit: Option<f64>,
    parent_used: f64,
    pct: u8,
) -> Option<f64> {
    let share_of_parent =
        parent_limit.map(|limit| (limit - parent_used).max(0.0) * f64::from(pct) / 100.0);
    match (child, share_of_parent) {
        (Some(own), Some(share)) => Some(own.min(share)),
        (own, None) => own,
        (None, share) => share,
    }
}
1414
1415async fn compute_skill_hash(
1416 workspace_root: &str,
1417 template: &AgentTemplate,
1418 policy: &SpawnPolicy,
1419) -> Result<String, String> {
1420 use sha2::{Digest, Sha256};
1421 let mut rows = Vec::new();
1422 let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1423 for skill in &template.skills {
1424 if let Some(path) = skill.path.as_deref() {
1425 validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1426 let skill_path = Path::new(workspace_root).join(path);
1427 let raw = fs::read_to_string(&skill_path)
1428 .await
1429 .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1430 let digest = hash_hex(raw.as_bytes());
1431 validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1432 rows.push(format!("path:{}:{}", path, digest));
1433 } else if let Some(id) = skill.id.as_deref() {
1434 validate_skill_source(Some(id), None, policy)?;
1435 let loaded = skill_service
1436 .load_skill(id)
1437 .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1438 let Some(loaded) = loaded else {
1439 return Err(format!("missing required skill id `{id}`"));
1440 };
1441 let digest = hash_hex(loaded.content.as_bytes());
1442 validate_pinned_hash(Some(id), None, &digest, policy)?;
1443 rows.push(format!("id:{}:{}", id, digest));
1444 }
1445 }
1446 rows.sort();
1447 let mut hasher = Sha256::new();
1448 for row in rows {
1449 hasher.update(row.as_bytes());
1450 hasher.update(b"\n");
1451 }
1452 let digest = hasher.finalize();
1453 Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1454}
1455
1456fn validate_skill_source(
1457 id: Option<&str>,
1458 path: Option<&str>,
1459 policy: &SpawnPolicy,
1460) -> Result<(), String> {
1461 use tandem_orchestrator::SkillSourceMode;
1462 match policy.skill_sources.mode {
1463 SkillSourceMode::Any => Ok(()),
1464 SkillSourceMode::ProjectOnly => {
1465 if id.is_some() {
1466 return Err("skill source denied: project_only forbids skill IDs".to_string());
1467 }
1468 let Some(path) = path else {
1469 return Err("skill source denied: project_only requires skill path".to_string());
1470 };
1471 let p = PathBuf::from(path);
1472 if p.is_absolute() {
1473 return Err("skill source denied: absolute skill paths are forbidden".to_string());
1474 }
1475 Ok(())
1476 }
1477 SkillSourceMode::Allowlist => {
1478 if let Some(id) = id {
1479 if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1480 return Ok(());
1481 }
1482 }
1483 if let Some(path) = path {
1484 if policy
1485 .skill_sources
1486 .allowlist_paths
1487 .iter()
1488 .any(|v| v == path)
1489 {
1490 return Ok(());
1491 }
1492 }
1493 Err("skill source denied: not present in allowlist".to_string())
1494 }
1495 }
1496}
1497
1498fn validate_pinned_hash(
1499 id: Option<&str>,
1500 path: Option<&str>,
1501 digest: &str,
1502 policy: &SpawnPolicy,
1503) -> Result<(), String> {
1504 let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1505 let by_path =
1506 path.and_then(|path| policy.skill_sources.pinned_hashes.get(&format!("path:{path}")));
1507 let expected = by_id.or(by_path);
1508 if let Some(expected) = expected {
1509 let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1510 if normalized != digest {
1511 return Err("pinned hash mismatch for skill reference".to_string());
1512 }
1513 }
1514 Ok(())
1515}
1516
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
fn hash_hex(bytes: &[u8]) -> String {
    use std::fmt::Write as _;
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut hex, byte| {
            // Formatting into a String cannot fail.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}
1525
/// Rough token estimate: one token per four Unicode scalar values (not bytes),
/// rounded down, but never less than 1.
fn estimate_tokens(text: &str) -> u64 {
    let approx = text.chars().count() as u64 / 4;
    std::cmp::max(approx, 1)
}
1530
1531fn extract_session_id(event: &EngineEvent) -> Option<String> {
1532 event
1533 .properties
1534 .get("sessionID")
1535 .and_then(|v| v.as_str())
1536 .map(|v| v.to_string())
1537 .or_else(|| {
1538 event
1539 .properties
1540 .get("part")
1541 .and_then(|v| v.get("sessionID"))
1542 .and_then(|v| v.as_str())
1543 .map(|v| v.to_string())
1544 })
1545}
1546
1547fn merge_metadata_usage(
1548 metadata: Option<Value>,
1549 tokens_used: u64,
1550 steps_used: u32,
1551 tool_calls_used: u32,
1552 cost_used_usd: f64,
1553 elapsed_ms: u64,
1554) -> Value {
1555 let mut base = metadata
1556 .and_then(|v| v.as_object().cloned())
1557 .unwrap_or_default();
1558 base.insert(
1559 "budgetUsage".to_string(),
1560 json!({
1561 "tokensUsed": tokens_used,
1562 "stepsUsed": steps_used,
1563 "toolCallsUsed": tool_calls_used,
1564 "costUsedUsd": cost_used_usd,
1565 "elapsedMs": elapsed_ms
1566 }),
1567 );
1568 Value::Object(base)
1569}
1570
/// Canonicalizes a tool name for policy comparison.
///
/// The name is trimmed, lowercased, and `-` becomes `_`. The todo-list aliases
/// (`todowrite`, `update_todo_list`, `update_todos`) all map to `todo_write`.
fn normalize_tool_name(name: &str) -> String {
    let canonical = name.trim().to_lowercase().replace('-', "_");
    if matches!(
        canonical.as_str(),
        "todowrite" | "update_todo_list" | "update_todos"
    ) {
        return "todo_write".to_string();
    }
    canonical
}
1577
1578async fn evaluate_capability_deny(
1579 state: &AppState,
1580 instance: &AgentInstance,
1581 tool: &str,
1582 args: &Value,
1583 caps: &tandem_orchestrator::CapabilitySpec,
1584 session_id: &str,
1585 message_id: &str,
1586) -> Option<String> {
1587 if !caps.tool_denylist.is_empty()
1588 && caps
1589 .tool_denylist
1590 .iter()
1591 .any(|name| normalize_tool_name(name) == *tool)
1592 {
1593 return Some(format!("tool `{tool}` denied by agent capability policy"));
1594 }
1595 if !caps.tool_allowlist.is_empty()
1596 && !caps
1597 .tool_allowlist
1598 .iter()
1599 .any(|name| normalize_tool_name(name) == *tool)
1600 {
1601 return Some(format!("tool `{tool}` not in agent allowlist"));
1602 }
1603
1604 if matches!(tool, "websearch" | "webfetch" | "webfetch_document") {
1605 if !caps.net_scopes.enabled {
1606 return Some("network disabled for this agent instance".to_string());
1607 }
1608 if !caps.net_scopes.allow_hosts.is_empty() {
1609 if tool == "websearch" {
1610 return Some(
1611 "websearch blocked: host allowlist cannot be verified for search tool"
1612 .to_string(),
1613 );
1614 }
1615 if let Some(host) = extract_url_host(args) {
1616 let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
1617 let allowed = h.trim().to_ascii_lowercase();
1618 !allowed.is_empty()
1619 && (host == allowed || host.ends_with(&format!(".{allowed}")))
1620 });
1621 if !allowed {
1622 return Some(format!("network host `{host}` not in allow_hosts"));
1623 }
1624 }
1625 }
1626 }
1627
1628 if tool == "bash" {
1629 let cmd = args
1630 .get("command")
1631 .and_then(|v| v.as_str())
1632 .unwrap_or("")
1633 .to_ascii_lowercase();
1634 if cmd.contains("git push") {
1635 if !caps.git_caps.push {
1636 return Some("git push disabled for this agent instance".to_string());
1637 }
1638 if caps.git_caps.push_requires_approval {
1639 let action = state.permissions.evaluate("git_push", "git_push").await;
1640 match action {
1641 tandem_core::PermissionAction::Allow => {}
1642 tandem_core::PermissionAction::Deny => {
1643 return Some("git push denied by policy rule".to_string());
1644 }
1645 tandem_core::PermissionAction::Ask => {
1646 let pending = state
1647 .permissions
1648 .ask_for_session_with_context(
1649 Some(session_id),
1650 "git_push",
1651 args.clone(),
1652 Some(tandem_core::PermissionArgsContext {
1653 args_source: "agent_team.git_push".to_string(),
1654 args_integrity: "runtime-checked".to_string(),
1655 query: Some(format!(
1656 "instanceID={} messageID={}",
1657 instance.instance_id, message_id
1658 )),
1659 }),
1660 )
1661 .await;
1662 return Some(format!(
1663 "git push requires explicit user approval (approvalID={})",
1664 pending.id
1665 ));
1666 }
1667 }
1668 }
1669 }
1670 if cmd.contains("git commit") && !caps.git_caps.commit {
1671 return Some("git commit disabled for this agent instance".to_string());
1672 }
1673 }
1674
1675 let access_kind = tool_fs_access_kind(tool);
1676 if let Some(kind) = access_kind {
1677 let Some(session) = state.storage.get_session(session_id).await else {
1678 return Some("session not found for capability evaluation".to_string());
1679 };
1680 let Some(root) = session.workspace_root.clone() else {
1681 return Some("workspace root missing for capability evaluation".to_string());
1682 };
1683 let requested = extract_tool_candidate_paths(tool, args);
1684 if !requested.is_empty() {
1685 let allowed_scopes = if kind == "read" {
1686 &caps.fs_scopes.read
1687 } else {
1688 &caps.fs_scopes.write
1689 };
1690 if allowed_scopes.is_empty() {
1691 return Some(format!("fs {kind} access blocked: no scopes configured"));
1692 }
1693 for candidate in requested {
1694 if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
1695 return Some(format!(
1696 "fs {kind} access denied for path `{}`",
1697 candidate
1698 ));
1699 }
1700 }
1701 }
1702 }
1703
1704 denied_secrets_reason(tool, caps, args)
1705}
1706
1707fn denied_secrets_reason(
1708 tool: &str,
1709 caps: &tandem_orchestrator::CapabilitySpec,
1710 args: &Value,
1711) -> Option<String> {
1712 if tool == "auth" {
1713 if caps.secrets_scopes.is_empty() {
1714 return Some("secrets are disabled for this agent instance".to_string());
1715 }
1716 let alias = args
1717 .get("id")
1718 .or_else(|| args.get("provider"))
1719 .or_else(|| args.get("providerID"))
1720 .and_then(|v| v.as_str())
1721 .unwrap_or("")
1722 .trim();
1723 if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
1724 return Some(format!(
1725 "secret alias `{alias}` is not in agent secretsScopes allowlist"
1726 ));
1727 }
1728 }
1729 None
1730}
1731
/// Classifies a (normalized) tool name by the filesystem access it needs.
///
/// Returns `Some("read")` for read-only tools, `Some("write")` for mutating tools, and
/// `None` for tools with no filesystem scope check. Matching is exact and
/// case-sensitive.
fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
    const READ_TOOLS: [&str; 5] = ["read", "glob", "grep", "codesearch", "lsp"];
    const WRITE_TOOLS: [&str; 3] = ["write", "edit", "apply_patch"];
    if READ_TOOLS.contains(&tool) {
        Some("read")
    } else if WRITE_TOOLS.contains(&tool) {
        Some("write")
    } else {
        None
    }
}
1739
1740fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
1741 let Some(obj) = args.as_object() else {
1742 return Vec::new();
1743 };
1744 let keys: &[&str] = match tool {
1745 "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
1746 "glob" => &["pattern"],
1747 "lsp" => &["filePath", "path"],
1748 "bash" => &["cwd"],
1749 "apply_patch" => &["path"],
1750 _ => &["path", "cwd"],
1751 };
1752 keys.iter()
1753 .filter_map(|key| obj.get(*key))
1754 .filter_map(|value| value.as_str())
1755 .filter(|s| !s.trim().is_empty())
1756 .map(|raw| strip_glob_tokens(raw).to_string())
1757 .collect()
1758}
1759
/// Returns the literal prefix of a glob pattern, i.e. everything before the first
/// glob metacharacter (`*`, `?`, `[` or `{`). Returns `path` unchanged if none occurs.
///
/// `{` is included because brace expansion (`{src,../x}/*`) can name directories
/// outside the literal prefix. Cutting there keeps the scope check conservative.
fn strip_glob_tokens(path: &str) -> &str {
    path.find(|c: char| matches!(c, '*' | '?' | '[' | '{'))
        .map_or(path, |idx| &path[..idx])
}
1770
1771fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
1772 let root_path = PathBuf::from(root);
1773 let candidate_path = resolve_path(&root_path, candidate);
1774 scopes.iter().any(|scope| {
1775 let scope_path = resolve_path(&root_path, scope);
1776 candidate_path.starts_with(scope_path)
1777 })
1778}
1779
/// Resolves `raw` against `root` and normalizes the result lexically.
///
/// A blank `raw` (after trimming) yields `root`. Absolute paths are used as-is;
/// relative ones are joined onto `root`. `.` components are dropped, and `..` removes
/// the preceding normal component. `..` can never climb above a filesystem root;
/// leading `..` of a relative result are kept. Without this, `src/../../etc` still
/// "starts with" `src` component-wise and slipped through scope checks.
///
/// This is purely lexical: symlinks are not resolved and the filesystem is not touched.
fn resolve_path(root: &Path, raw: &str) -> PathBuf {
    use std::path::Component;

    let raw = raw.trim();
    let joined = if raw.is_empty() {
        root.to_path_buf()
    } else {
        let path = Path::new(raw);
        if path.is_absolute() {
            path.to_path_buf()
        } else {
            root.join(path)
        }
    };

    let mut normalized = PathBuf::new();
    for component in joined.components() {
        match component {
            Component::CurDir => {}
            Component::ParentDir => match normalized.components().next_back() {
                Some(Component::Normal(_)) => {
                    normalized.pop();
                }
                // Already at a root or prefix: there is nothing above it.
                Some(Component::RootDir) | Some(Component::Prefix(_)) => {}
                // Empty or only `..` so far (relative path): keep the `..`.
                _ => normalized.push(".."),
            },
            other => normalized.push(other),
        }
    }
    normalized
}
1792
1793fn extract_url_host(args: &Value) -> Option<String> {
1794 let url = args
1795 .get("url")
1796 .or_else(|| args.get("uri"))
1797 .or_else(|| args.get("link"))
1798 .and_then(|v| v.as_str())?;
1799 let raw = url.trim();
1800 let (_, after_scheme) = raw.split_once("://")?;
1801 let host_port = after_scheme.split('/').next().unwrap_or_default();
1802 let host = host_port.split('@').next_back().unwrap_or_default();
1803 let host = host.split(':').next().unwrap_or_default().to_ascii_lowercase();
1804 if host.is_empty() {
1805 None
1806 } else {
1807 Some(host)
1808 }
1809}
1810
1811pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
1812 emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
1813}
1814
1815pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
1816 emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
1817}
1818
1819pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
1820 emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
1821}
1822
/// Optional correlation identifiers attached to `agent_team.spawn.*` events.
///
/// All fields default to `None`. In the requested/denied events a missing value is
/// emitted as JSON `null`. The approved/started events fall back to the instance's
/// own session and run IDs.
#[derive(Debug, Clone, Copy, Default)]
pub struct SpawnEventContext<'a> {
    /// Session that triggered the spawn, if known.
    pub session_id: Option<&'a str>,
    /// Message within that session that triggered the spawn, if known.
    pub message_id: Option<&'a str>,
    /// Run the spawn belongs to, if known.
    pub run_id: Option<&'a str>,
}
1829
1830pub fn emit_spawn_requested_with_context(
1831 state: &AppState,
1832 req: &SpawnRequest,
1833 ctx: &SpawnEventContext<'_>,
1834) {
1835 state.event_bus.publish(EngineEvent::new(
1836 "agent_team.spawn.requested",
1837 json!({
1838 "sessionID": ctx.session_id,
1839 "messageID": ctx.message_id,
1840 "runID": ctx.run_id,
1841 "missionID": req.mission_id,
1842 "instanceID": Value::Null,
1843 "parentInstanceID": req.parent_instance_id,
1844 "source": req.source,
1845 "requestedRole": req.role,
1846 "templateID": req.template_id,
1847 "justification": req.justification,
1848 "timestampMs": crate::now_ms(),
1849 }),
1850 ));
1851}
1852
1853pub fn emit_spawn_denied_with_context(
1854 state: &AppState,
1855 req: &SpawnRequest,
1856 decision: &SpawnDecision,
1857 ctx: &SpawnEventContext<'_>,
1858) {
1859 state.event_bus.publish(EngineEvent::new(
1860 "agent_team.spawn.denied",
1861 json!({
1862 "sessionID": ctx.session_id,
1863 "messageID": ctx.message_id,
1864 "runID": ctx.run_id,
1865 "missionID": req.mission_id,
1866 "instanceID": Value::Null,
1867 "parentInstanceID": req.parent_instance_id,
1868 "source": req.source,
1869 "requestedRole": req.role,
1870 "templateID": req.template_id,
1871 "code": decision.code,
1872 "error": decision.reason,
1873 "timestampMs": crate::now_ms(),
1874 }),
1875 ));
1876}
1877
1878pub fn emit_spawn_approved_with_context(
1879 state: &AppState,
1880 req: &SpawnRequest,
1881 instance: &AgentInstance,
1882 ctx: &SpawnEventContext<'_>,
1883) {
1884 state.event_bus.publish(EngineEvent::new(
1885 "agent_team.spawn.approved",
1886 json!({
1887 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
1888 "messageID": ctx.message_id,
1889 "runID": ctx.run_id.or(instance.run_id.as_deref()),
1890 "missionID": instance.mission_id,
1891 "instanceID": instance.instance_id,
1892 "parentInstanceID": instance.parent_instance_id,
1893 "source": req.source,
1894 "requestedRole": req.role,
1895 "templateID": instance.template_id,
1896 "skillHash": instance.skill_hash,
1897 "timestampMs": crate::now_ms(),
1898 }),
1899 ));
1900 state.event_bus.publish(EngineEvent::new(
1901 "agent_team.instance.started",
1902 json!({
1903 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
1904 "messageID": ctx.message_id,
1905 "runID": ctx.run_id.or(instance.run_id.as_deref()),
1906 "missionID": instance.mission_id,
1907 "instanceID": instance.instance_id,
1908 "parentInstanceID": instance.parent_instance_id,
1909 "role": instance.role,
1910 "status": instance.status,
1911 "budgetLimit": instance.budget,
1912 "skillHash": instance.skill_hash,
1913 "timestampMs": crate::now_ms(),
1914 }),
1915 ));
1916}
1917
1918pub fn emit_budget_usage(
1919 state: &AppState,
1920 instance: &AgentInstance,
1921 tokens_used: u64,
1922 steps_used: u32,
1923 tool_calls_used: u32,
1924 cost_used_usd: f64,
1925 elapsed_ms: u64,
1926) {
1927 state.event_bus.publish(EngineEvent::new(
1928 "agent_team.budget.usage",
1929 json!({
1930 "sessionID": instance.session_id,
1931 "messageID": Value::Null,
1932 "runID": instance.run_id,
1933 "missionID": instance.mission_id,
1934 "instanceID": instance.instance_id,
1935 "tokensUsed": tokens_used,
1936 "stepsUsed": steps_used,
1937 "toolCallsUsed": tool_calls_used,
1938 "costUsedUsd": cost_used_usd,
1939 "elapsedMs": elapsed_ms,
1940 "timestampMs": crate::now_ms(),
1941 }),
1942 ));
1943}
1944
1945pub fn emit_budget_exhausted(
1946 state: &AppState,
1947 instance: &AgentInstance,
1948 exhausted_by: &str,
1949 tokens_used: u64,
1950 steps_used: u32,
1951 tool_calls_used: u32,
1952 cost_used_usd: f64,
1953 elapsed_ms: u64,
1954) {
1955 state.event_bus.publish(EngineEvent::new(
1956 "agent_team.budget.exhausted",
1957 json!({
1958 "sessionID": instance.session_id,
1959 "messageID": Value::Null,
1960 "runID": instance.run_id,
1961 "missionID": instance.mission_id,
1962 "instanceID": instance.instance_id,
1963 "exhaustedBy": exhausted_by,
1964 "tokensUsed": tokens_used,
1965 "stepsUsed": steps_used,
1966 "toolCallsUsed": tool_calls_used,
1967 "costUsedUsd": cost_used_usd,
1968 "elapsedMs": elapsed_ms,
1969 "timestampMs": crate::now_ms(),
1970 }),
1971 ));
1972}
1973
1974pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
1975 state.event_bus.publish(EngineEvent::new(
1976 "agent_team.instance.cancelled",
1977 json!({
1978 "sessionID": instance.session_id,
1979 "messageID": Value::Null,
1980 "runID": instance.run_id,
1981 "missionID": instance.mission_id,
1982 "instanceID": instance.instance_id,
1983 "parentInstanceID": instance.parent_instance_id,
1984 "role": instance.role,
1985 "status": instance.status,
1986 "reason": reason,
1987 "timestampMs": crate::now_ms(),
1988 }),
1989 ));
1990}
1991
1992pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
1993 state.event_bus.publish(EngineEvent::new(
1994 "agent_team.instance.completed",
1995 json!({
1996 "sessionID": instance.session_id,
1997 "messageID": Value::Null,
1998 "runID": instance.run_id,
1999 "missionID": instance.mission_id,
2000 "instanceID": instance.instance_id,
2001 "parentInstanceID": instance.parent_instance_id,
2002 "role": instance.role,
2003 "status": instance.status,
2004 "timestampMs": crate::now_ms(),
2005 }),
2006 ));
2007}
2008
2009pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
2010 state.event_bus.publish(EngineEvent::new(
2011 "agent_team.instance.failed",
2012 json!({
2013 "sessionID": instance.session_id,
2014 "messageID": Value::Null,
2015 "runID": instance.run_id,
2016 "missionID": instance.mission_id,
2017 "instanceID": instance.instance_id,
2018 "parentInstanceID": instance.parent_instance_id,
2019 "role": instance.role,
2020 "status": instance.status,
2021 "timestampMs": crate::now_ms(),
2022 }),
2023 ));
2024}
2025
2026pub fn emit_mission_budget_exhausted(
2027 state: &AppState,
2028 mission_id: &str,
2029 exhausted_by: &str,
2030 tokens_used: u64,
2031 steps_used: u64,
2032 tool_calls_used: u64,
2033 cost_used_usd: f64,
2034) {
2035 state.event_bus.publish(EngineEvent::new(
2036 "agent_team.mission.budget.exhausted",
2037 json!({
2038 "sessionID": Value::Null,
2039 "messageID": Value::Null,
2040 "runID": Value::Null,
2041 "missionID": mission_id,
2042 "instanceID": Value::Null,
2043 "exhaustedBy": exhausted_by,
2044 "tokensUsed": tokens_used,
2045 "stepsUsed": steps_used,
2046 "toolCallsUsed": tool_calls_used,
2047 "costUsedUsd": cost_used_usd,
2048 "timestampMs": crate::now_ms(),
2049 }),
2050 ));
2051}