1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use futures::future::BoxFuture;
8use serde::Deserialize;
9use serde::Serialize;
10use serde_json::{json, Value};
11use tandem_core::{
12 SpawnAgentHook, SpawnAgentToolContext, SpawnAgentToolResult, ToolPolicyContext,
13 ToolPolicyDecision, ToolPolicyHook,
14};
15use tandem_orchestrator::{
16 AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
17 SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
18};
19use tandem_skills::SkillService;
20use tandem_types::{EngineEvent, Session};
21use tokio::fs;
22use tokio::sync::RwLock;
23use uuid::Uuid;
24
25use crate::AppState;
26
/// Shared runtime state for agent-team spawning, budget tracking and spawn approvals.
///
/// Every field is an `Arc<RwLock<..>>`, so clones of this struct share the same
/// underlying state.
#[derive(Clone, Default)]
pub struct AgentTeamRuntime {
    /// Spawn policy parsed from the workspace's `spawn-policy.yaml`; `None` when the file is absent.
    policy: Arc<RwLock<Option<SpawnPolicy>>>,
    /// Templates parsed from the workspace's agent-team `templates` directory, keyed by `template_id`.
    templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
    /// Spawned agent instances keyed by `instance_id`.
    instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
    /// Per-instance budget usage keyed by `instance_id`.
    budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
    /// Aggregate budget usage keyed by mission id.
    mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
    /// Spawn requests waiting for an operator decision, keyed by `approval_id`.
    spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
    /// Trimmed workspace root whose policy and templates are currently loaded.
    loaded_workspace: Arc<RwLock<Option<String>>>,
    /// Location for audit records; replaceable at runtime via `set_audit_path`.
    audit_path: Arc<RwLock<PathBuf>>,
}
38
/// Outcome of a spawn attempt.
///
/// `instance` is `Some` only when the spawn succeeded. Every denial path
/// returns `None` together with the denying `decision`.
#[derive(Debug, Clone)]
pub struct SpawnResult {
    /// Policy decision (allowed flag, deny code, reason, approval requirement).
    pub decision: SpawnDecision,
    /// The newly registered instance on success.
    pub instance: Option<AgentInstance>,
}
44
/// Per-mission aggregate produced by `AgentTeamRuntime::list_mission_summaries`.
///
/// Serialized with camelCase keys (e.g. `missionID`, `runningCount`).
#[derive(Debug, Clone, Serialize)]
pub struct AgentMissionSummary {
    #[serde(rename = "missionID")]
    pub mission_id: String,
    /// Number of instances belonging to the mission.
    #[serde(rename = "instanceCount")]
    pub instance_count: usize,
    /// Instance counts by current status.
    #[serde(rename = "runningCount")]
    pub running_count: usize,
    #[serde(rename = "completedCount")]
    pub completed_count: usize,
    #[serde(rename = "failedCount")]
    pub failed_count: usize,
    #[serde(rename = "cancelledCount")]
    pub cancelled_count: usize,
    #[serde(rename = "queuedCount")]
    pub queued_count: usize,
    /// Usage totals summed from each instance's `budgetUsage` metadata object.
    #[serde(rename = "tokenUsedTotal")]
    pub token_used_total: u64,
    #[serde(rename = "toolCallsUsedTotal")]
    pub tool_calls_used_total: u64,
    #[serde(rename = "stepsUsedTotal")]
    pub steps_used_total: u64,
    #[serde(rename = "costUsedUsdTotal")]
    pub cost_used_usd_total: f64,
}
70
/// Running budget usage for a single agent instance.
#[derive(Debug, Clone, Default)]
struct InstanceBudgetState {
    tokens_used: u64,
    steps_used: u32,
    tool_calls_used: u32,
    cost_used_usd: f64,
    /// Set when the instance is spawned; used to compute elapsed duration.
    started_at: Option<Instant>,
    /// Once true, budget updates stop accumulating and report exhaustion.
    exhausted: bool,
}
80
/// Aggregate budget usage across all instances of one mission.
///
/// Compared against `SpawnPolicy::mission_total_budget` before new spawns.
#[derive(Debug, Clone, Default)]
struct MissionBudgetState {
    tokens_used: u64,
    steps_used: u64,
    tool_calls_used: u64,
    cost_used_usd: f64,
    // NOTE(review): not read in this section; presumably maintained by
    // `apply_mission_budget_delta` — confirm.
    exhausted: bool,
}
89
/// A spawn request that policy denied pending user approval.
///
/// It stays queued until an operator approves (replays) or denies it.
#[derive(Debug, Clone, Serialize)]
pub struct PendingSpawnApproval {
    #[serde(rename = "approvalID")]
    pub approval_id: String,
    /// Queue time in milliseconds (from `crate::now_ms`).
    #[serde(rename = "createdAtMs")]
    pub created_at_ms: u64,
    /// The original request, replayed verbatim on approval.
    pub request: SpawnRequest,
    /// Deny code and reason from the decision that triggered queuing.
    #[serde(rename = "decisionCode")]
    pub decision_code: Option<SpawnDenyCode>,
    pub reason: Option<String>,
}
101
/// `SpawnAgentHook` implementation that spawns agents through the server's `AppState`.
#[derive(Clone)]
pub struct ServerSpawnAgentHook {
    state: AppState,
}
106
/// JSON arguments accepted by the `spawn_agent` tool.
///
/// `role` and `justification` are required; all other fields are optional.
/// A missing `source` is treated as `SpawnSource::ToolCall` by the hook.
#[derive(Debug, Deserialize)]
struct SpawnAgentToolInput {
    #[serde(rename = "missionID")]
    mission_id: Option<String>,
    #[serde(rename = "parentInstanceID")]
    parent_instance_id: Option<String>,
    #[serde(rename = "templateID")]
    template_id: Option<String>,
    role: AgentRole,
    source: Option<SpawnSource>,
    justification: String,
    #[serde(rename = "budgetOverride", default)]
    budget_override: Option<BudgetLimit>,
}
121
122impl ServerSpawnAgentHook {
123 pub fn new(state: AppState) -> Self {
124 Self { state }
125 }
126}
127
128impl SpawnAgentHook for ServerSpawnAgentHook {
129 fn spawn_agent(
130 &self,
131 ctx: SpawnAgentToolContext,
132 ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
133 let state = self.state.clone();
134 Box::pin(async move {
135 let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
136 let input = match parsed {
137 Ok(input) => input,
138 Err(err) => {
139 return Ok(SpawnAgentToolResult {
140 output: format!("spawn_agent denied: invalid args ({err})"),
141 metadata: json!({
142 "ok": false,
143 "code": "SPAWN_INVALID_ARGS",
144 "error": err.to_string(),
145 }),
146 });
147 }
148 };
149 let req = SpawnRequest {
150 mission_id: input.mission_id,
151 parent_instance_id: input.parent_instance_id,
152 source: input.source.unwrap_or(SpawnSource::ToolCall),
153 parent_role: None,
154 role: input.role,
155 template_id: input.template_id,
156 justification: input.justification,
157 budget_override: input.budget_override,
158 };
159
160 let event_ctx = SpawnEventContext {
161 session_id: Some(ctx.session_id.as_str()),
162 message_id: Some(ctx.message_id.as_str()),
163 run_id: None,
164 };
165 emit_spawn_requested_with_context(&state, &req, &event_ctx);
166 let result = state.agent_teams.spawn(&state, req.clone()).await;
167 if !result.decision.allowed || result.instance.is_none() {
168 emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
169 return Ok(SpawnAgentToolResult {
170 output: result
171 .decision
172 .reason
173 .clone()
174 .unwrap_or_else(|| "spawn_agent denied".to_string()),
175 metadata: json!({
176 "ok": false,
177 "code": result.decision.code,
178 "error": result.decision.reason,
179 "requiresUserApproval": result.decision.requires_user_approval,
180 }),
181 });
182 }
183 let instance = result.instance.expect("checked is_some");
184 emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
185 Ok(SpawnAgentToolResult {
186 output: format!(
187 "spawned {} as instance {} (session {})",
188 instance.template_id, instance.instance_id, instance.session_id
189 ),
190 metadata: json!({
191 "ok": true,
192 "missionID": instance.mission_id,
193 "instanceID": instance.instance_id,
194 "sessionID": instance.session_id,
195 "runID": instance.run_id,
196 "status": instance.status,
197 "skillHash": instance.skill_hash,
198 }),
199 })
200 })
201 }
202}
203
/// `ToolPolicyHook` implementation backed by `AppState`.
///
/// It enforces routine tool allow-lists and agent-team instance capabilities.
#[derive(Clone)]
pub struct ServerToolPolicyHook {
    state: AppState,
}
208
209impl ServerToolPolicyHook {
210 pub fn new(state: AppState) -> Self {
211 Self { state }
212 }
213}
214
215impl ToolPolicyHook for ServerToolPolicyHook {
216 fn evaluate_tool(
217 &self,
218 ctx: ToolPolicyContext,
219 ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
220 let state = self.state.clone();
221 Box::pin(async move {
222 let tool = normalize_tool_name(&ctx.tool);
223 if let Some(policy) = state.routine_session_policy(&ctx.session_id).await {
224 if !policy.allowed_tools.is_empty()
225 && !policy
226 .allowed_tools
227 .iter()
228 .any(|name| normalize_tool_name(name) == tool)
229 {
230 let reason = format!(
231 "tool `{}` is not allowed for routine `{}` (run `{}`)",
232 tool, policy.routine_id, policy.run_id
233 );
234 state.event_bus.publish(EngineEvent::new(
235 "routine.tool.denied",
236 json!({
237 "sessionID": ctx.session_id,
238 "messageID": ctx.message_id,
239 "runID": policy.run_id,
240 "routineID": policy.routine_id,
241 "tool": tool,
242 "reason": reason,
243 "timestampMs": crate::now_ms(),
244 }),
245 ));
246 return Ok(ToolPolicyDecision {
247 allowed: false,
248 reason: Some(reason),
249 });
250 }
251 }
252
253 let Some(instance) = state
254 .agent_teams
255 .instance_for_session(&ctx.session_id)
256 .await
257 else {
258 return Ok(ToolPolicyDecision {
259 allowed: true,
260 reason: None,
261 });
262 };
263 let caps = instance.capabilities.clone();
264 let deny = evaluate_capability_deny(
265 &state,
266 &instance,
267 &tool,
268 &ctx.args,
269 &caps,
270 &ctx.session_id,
271 &ctx.message_id,
272 )
273 .await;
274 if let Some(reason) = deny {
275 state.event_bus.publish(EngineEvent::new(
276 "agent_team.capability.denied",
277 json!({
278 "sessionID": ctx.session_id,
279 "messageID": ctx.message_id,
280 "runID": instance.run_id,
281 "missionID": instance.mission_id,
282 "instanceID": instance.instance_id,
283 "tool": tool,
284 "reason": reason,
285 "timestampMs": crate::now_ms(),
286 }),
287 ));
288 return Ok(ToolPolicyDecision {
289 allowed: false,
290 reason: Some(reason),
291 });
292 }
293 Ok(ToolPolicyDecision {
294 allowed: true,
295 reason: None,
296 })
297 })
298 }
299}
300
301impl AgentTeamRuntime {
302 pub fn new(audit_path: PathBuf) -> Self {
303 Self {
304 policy: Arc::new(RwLock::new(None)),
305 templates: Arc::new(RwLock::new(HashMap::new())),
306 instances: Arc::new(RwLock::new(HashMap::new())),
307 budgets: Arc::new(RwLock::new(HashMap::new())),
308 mission_budgets: Arc::new(RwLock::new(HashMap::new())),
309 spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
310 loaded_workspace: Arc::new(RwLock::new(None)),
311 audit_path: Arc::new(RwLock::new(audit_path)),
312 }
313 }
314
315 pub async fn set_audit_path(&self, path: PathBuf) {
316 *self.audit_path.write().await = path;
317 }
318
319 pub async fn list_templates(&self) -> Vec<AgentTemplate> {
320 let mut rows = self
321 .templates
322 .read()
323 .await
324 .values()
325 .cloned()
326 .collect::<Vec<_>>();
327 rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
328 rows
329 }
330
331 pub async fn list_instances(
332 &self,
333 mission_id: Option<&str>,
334 parent_instance_id: Option<&str>,
335 status: Option<AgentInstanceStatus>,
336 ) -> Vec<AgentInstance> {
337 let mut rows = self
338 .instances
339 .read()
340 .await
341 .values()
342 .filter(|instance| {
343 if let Some(mission_id) = mission_id {
344 if instance.mission_id != mission_id {
345 return false;
346 }
347 }
348 if let Some(parent_id) = parent_instance_id {
349 if instance.parent_instance_id.as_deref() != Some(parent_id) {
350 return false;
351 }
352 }
353 if let Some(status) = &status {
354 if &instance.status != status {
355 return false;
356 }
357 }
358 true
359 })
360 .cloned()
361 .collect::<Vec<_>>();
362 rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
363 rows
364 }
365
366 pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
367 let instances = self.instances.read().await;
368 let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
369 for instance in instances.values() {
370 let row = by_mission
371 .entry(instance.mission_id.clone())
372 .or_insert_with(|| AgentMissionSummary {
373 mission_id: instance.mission_id.clone(),
374 instance_count: 0,
375 running_count: 0,
376 completed_count: 0,
377 failed_count: 0,
378 cancelled_count: 0,
379 queued_count: 0,
380 token_used_total: 0,
381 tool_calls_used_total: 0,
382 steps_used_total: 0,
383 cost_used_usd_total: 0.0,
384 });
385 row.instance_count = row.instance_count.saturating_add(1);
386 match instance.status {
387 AgentInstanceStatus::Queued => {
388 row.queued_count = row.queued_count.saturating_add(1)
389 }
390 AgentInstanceStatus::Running => {
391 row.running_count = row.running_count.saturating_add(1)
392 }
393 AgentInstanceStatus::Completed => {
394 row.completed_count = row.completed_count.saturating_add(1)
395 }
396 AgentInstanceStatus::Failed => {
397 row.failed_count = row.failed_count.saturating_add(1)
398 }
399 AgentInstanceStatus::Cancelled => {
400 row.cancelled_count = row.cancelled_count.saturating_add(1)
401 }
402 }
403 if let Some(usage) = instance
404 .metadata
405 .as_ref()
406 .and_then(|m| m.get("budgetUsage"))
407 .and_then(|u| u.as_object())
408 {
409 row.token_used_total = row.token_used_total.saturating_add(
410 usage
411 .get("tokensUsed")
412 .and_then(|v| v.as_u64())
413 .unwrap_or(0),
414 );
415 row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
416 usage
417 .get("toolCallsUsed")
418 .and_then(|v| v.as_u64())
419 .unwrap_or(0),
420 );
421 row.steps_used_total = row
422 .steps_used_total
423 .saturating_add(usage.get("stepsUsed").and_then(|v| v.as_u64()).unwrap_or(0));
424 row.cost_used_usd_total += usage
425 .get("costUsedUsd")
426 .and_then(|v| v.as_f64())
427 .unwrap_or(0.0);
428 }
429 }
430 let mut rows = by_mission.into_values().collect::<Vec<_>>();
431 rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
432 rows
433 }
434
435 pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
436 self.instances
437 .read()
438 .await
439 .values()
440 .find(|instance| instance.session_id == session_id)
441 .cloned()
442 }
443
444 pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
445 let mut rows = self
446 .spawn_approvals
447 .read()
448 .await
449 .values()
450 .cloned()
451 .collect::<Vec<_>>();
452 rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
453 rows
454 }
455
456 pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
457 let normalized = workspace_root.trim().to_string();
458 let already_loaded = self
459 .loaded_workspace
460 .read()
461 .await
462 .as_ref()
463 .map(|s| s == &normalized)
464 .unwrap_or(false);
465 if already_loaded {
466 return Ok(());
467 }
468
469 let root = PathBuf::from(&normalized);
470 let policy_path = root
471 .join(".tandem")
472 .join("agent-team")
473 .join("spawn-policy.yaml");
474 let templates_dir = root.join(".tandem").join("agent-team").join("templates");
475
476 let mut next_policy = None;
477 if policy_path.exists() {
478 let raw = fs::read_to_string(&policy_path)
479 .await
480 .with_context(|| format!("failed reading {}", policy_path.display()))?;
481 let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
482 .with_context(|| format!("failed parsing {}", policy_path.display()))?;
483 next_policy = Some(parsed);
484 }
485
486 let mut next_templates = HashMap::new();
487 if templates_dir.exists() {
488 let mut entries = fs::read_dir(&templates_dir).await?;
489 while let Some(entry) = entries.next_entry().await? {
490 let path = entry.path();
491 if !path.is_file() {
492 continue;
493 }
494 let ext = path
495 .extension()
496 .and_then(|v| v.to_str())
497 .unwrap_or_default()
498 .to_ascii_lowercase();
499 if ext != "yaml" && ext != "yml" && ext != "json" {
500 continue;
501 }
502 let raw = fs::read_to_string(&path).await?;
503 let template = serde_yaml::from_str::<AgentTemplate>(&raw)
504 .with_context(|| format!("failed parsing {}", path.display()))?;
505 next_templates.insert(template.template_id.clone(), template);
506 }
507 }
508
509 *self.policy.write().await = next_policy;
510 *self.templates.write().await = next_templates;
511 *self.loaded_workspace.write().await = Some(normalized);
512 Ok(())
513 }
514
515 pub async fn spawn(&self, state: &AppState, req: SpawnRequest) -> SpawnResult {
516 self.spawn_with_approval_override(state, req, false).await
517 }
518
519 async fn spawn_with_approval_override(
520 &self,
521 state: &AppState,
522 mut req: SpawnRequest,
523 approval_override: bool,
524 ) -> SpawnResult {
525 let workspace_root = state.workspace_index.snapshot().await.root;
526 if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
527 return SpawnResult {
528 decision: SpawnDecision {
529 allowed: false,
530 code: Some(SpawnDenyCode::SpawnPolicyMissing),
531 reason: Some(format!("spawn policy load failed: {}", err)),
532 requires_user_approval: false,
533 },
534 instance: None,
535 };
536 }
537
538 let Some(policy) = self.policy.read().await.clone() else {
539 return SpawnResult {
540 decision: SpawnDecision {
541 allowed: false,
542 code: Some(SpawnDenyCode::SpawnPolicyMissing),
543 reason: Some("spawn policy file missing".to_string()),
544 requires_user_approval: false,
545 },
546 instance: None,
547 };
548 };
549
550 let template = {
551 let templates = self.templates.read().await;
552 req.template_id
553 .as_deref()
554 .and_then(|template_id| templates.get(template_id).cloned())
555 };
556 if req.template_id.is_none() {
557 if let Some(found) = self
558 .templates
559 .read()
560 .await
561 .values()
562 .find(|t| t.role == req.role)
563 .cloned()
564 {
565 req.template_id = Some(found.template_id.clone());
566 }
567 }
568 let template = if template.is_some() {
569 template
570 } else {
571 let templates = self.templates.read().await;
572 req.template_id
573 .as_deref()
574 .and_then(|id| templates.get(id).cloned())
575 };
576
577 if req.parent_role.is_none() {
578 if let Some(parent_id) = req.parent_instance_id.as_deref() {
579 let instances = self.instances.read().await;
580 req.parent_role = instances
581 .get(parent_id)
582 .map(|instance| instance.role.clone());
583 }
584 }
585
586 let instances = self.instances.read().await;
587 let total_agents = instances.len();
588 let running_agents = instances
589 .values()
590 .filter(|instance| instance.status == AgentInstanceStatus::Running)
591 .count();
592 drop(instances);
593
594 let mut decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
595 if approval_override
596 && !decision.allowed
597 && decision.requires_user_approval
598 && matches!(decision.code, Some(SpawnDenyCode::SpawnRequiresApproval))
599 {
600 decision = SpawnDecision {
601 allowed: true,
602 code: None,
603 reason: None,
604 requires_user_approval: false,
605 };
606 }
607 if !decision.allowed {
608 if decision.requires_user_approval && !approval_override {
609 self.queue_spawn_approval(&req, &decision).await;
610 }
611 return SpawnResult {
612 decision,
613 instance: None,
614 };
615 }
616
617 let mission_id = req
618 .mission_id
619 .clone()
620 .unwrap_or_else(|| "mission-default".to_string());
621
622 if let Some(reason) = self
623 .mission_budget_exceeded_reason(&policy, &mission_id)
624 .await
625 {
626 return SpawnResult {
627 decision: SpawnDecision {
628 allowed: false,
629 code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
630 reason: Some(reason),
631 requires_user_approval: false,
632 },
633 instance: None,
634 };
635 }
636
637 let template = template.unwrap_or_else(|| AgentTemplate {
638 template_id: "default-template".to_string(),
639 role: req.role.clone(),
640 system_prompt: None,
641 skills: Vec::new(),
642 default_budget: BudgetLimit::default(),
643 capabilities: Default::default(),
644 });
645
646 let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
647 Ok(hash) => hash,
648 Err(err) => {
649 let lowered = err.to_ascii_lowercase();
650 let code = if lowered.contains("pinned hash mismatch") {
651 SpawnDenyCode::SpawnSkillHashMismatch
652 } else if lowered.contains("skill source denied") {
653 SpawnDenyCode::SpawnSkillSourceDenied
654 } else {
655 SpawnDenyCode::SpawnRequiredSkillMissing
656 };
657 return SpawnResult {
658 decision: SpawnDecision {
659 allowed: false,
660 code: Some(code),
661 reason: Some(err),
662 requires_user_approval: false,
663 },
664 instance: None,
665 };
666 }
667 };
668
669 let parent_snapshot = {
670 let instances = self.instances.read().await;
671 req.parent_instance_id
672 .as_deref()
673 .and_then(|id| instances.get(id).cloned())
674 };
675 let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
676 self.budgets.read().await.get(parent_id).cloned()
677 } else {
678 None
679 };
680
681 let budget = resolve_budget(
682 &policy,
683 parent_snapshot,
684 parent_usage,
685 &template,
686 req.budget_override.clone(),
687 &req.role,
688 );
689
690 let mut session = Session::new(
691 Some(format!("Agent Team {}", template.template_id)),
692 Some(workspace_root.clone()),
693 );
694 session.workspace_root = Some(workspace_root.clone());
695 let session_id = session.id.clone();
696 if let Err(err) = state.storage.save_session(session).await {
697 return SpawnResult {
698 decision: SpawnDecision {
699 allowed: false,
700 code: Some(SpawnDenyCode::SpawnPolicyMissing),
701 reason: Some(format!("failed creating child session: {}", err)),
702 requires_user_approval: false,
703 },
704 instance: None,
705 };
706 }
707
708 let instance = AgentInstance {
709 instance_id: format!("ins_{}", Uuid::new_v4().simple()),
710 mission_id: mission_id.clone(),
711 parent_instance_id: req.parent_instance_id.clone(),
712 role: template.role.clone(),
713 template_id: template.template_id.clone(),
714 session_id: session_id.clone(),
715 run_id: None,
716 status: AgentInstanceStatus::Running,
717 budget,
718 skill_hash: skill_hash.clone(),
719 capabilities: template.capabilities.clone(),
720 metadata: Some(json!({
721 "source": req.source,
722 "justification": req.justification,
723 })),
724 };
725
726 self.instances
727 .write()
728 .await
729 .insert(instance.instance_id.clone(), instance.clone());
730 self.budgets.write().await.insert(
731 instance.instance_id.clone(),
732 InstanceBudgetState {
733 started_at: Some(Instant::now()),
734 ..InstanceBudgetState::default()
735 },
736 );
737 let _ = self.append_audit("spawn.approved", &instance).await;
738
739 SpawnResult {
740 decision: SpawnDecision {
741 allowed: true,
742 code: None,
743 reason: None,
744 requires_user_approval: false,
745 },
746 instance: Some(instance),
747 }
748 }
749
750 pub async fn approve_spawn_approval(
751 &self,
752 state: &AppState,
753 approval_id: &str,
754 reason: Option<&str>,
755 ) -> Option<SpawnResult> {
756 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
757 let result = self
758 .spawn_with_approval_override(state, approval.request.clone(), true)
759 .await;
760 if let Some(instance) = result.instance.as_ref() {
761 let note = reason.unwrap_or("approved by operator");
762 let _ = self
763 .append_approval_audit("spawn.approval.approved", approval_id, Some(instance), note)
764 .await;
765 } else {
766 let note = reason.unwrap_or("approval replay failed policy checks");
767 let _ = self
768 .append_approval_audit("spawn.approval.rejected_on_replay", approval_id, None, note)
769 .await;
770 }
771 Some(result)
772 }
773
774 pub async fn deny_spawn_approval(
775 &self,
776 approval_id: &str,
777 reason: Option<&str>,
778 ) -> Option<PendingSpawnApproval> {
779 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
780 let note = reason.unwrap_or("denied by operator");
781 let _ = self
782 .append_approval_audit("spawn.approval.denied", approval_id, None, note)
783 .await;
784 Some(approval)
785 }
786
787 pub async fn cancel_instance(
788 &self,
789 state: &AppState,
790 instance_id: &str,
791 reason: &str,
792 ) -> Option<AgentInstance> {
793 let mut instances = self.instances.write().await;
794 let instance = instances.get_mut(instance_id)?;
795 if matches!(
796 instance.status,
797 AgentInstanceStatus::Completed
798 | AgentInstanceStatus::Failed
799 | AgentInstanceStatus::Cancelled
800 ) {
801 return Some(instance.clone());
802 }
803 instance.status = AgentInstanceStatus::Cancelled;
804 let snapshot = instance.clone();
805 drop(instances);
806 let _ = state.cancellations.cancel(&snapshot.session_id).await;
807 let _ = self.append_audit("instance.cancelled", &snapshot).await;
808 emit_instance_cancelled(state, &snapshot, reason);
809 Some(snapshot)
810 }
811
812 async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
813 let approval = PendingSpawnApproval {
814 approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
815 created_at_ms: crate::now_ms(),
816 request: req.clone(),
817 decision_code: decision.code.clone(),
818 reason: decision.reason.clone(),
819 };
820 self.spawn_approvals
821 .write()
822 .await
823 .insert(approval.approval_id.clone(), approval);
824 }
825
826 async fn mission_budget_exceeded_reason(
827 &self,
828 policy: &SpawnPolicy,
829 mission_id: &str,
830 ) -> Option<String> {
831 let limit = policy.mission_total_budget.as_ref()?;
832 let usage = self
833 .mission_budgets
834 .read()
835 .await
836 .get(mission_id)
837 .cloned()
838 .unwrap_or_default();
839 if let Some(max) = limit.max_tokens {
840 if usage.tokens_used >= max {
841 return Some(format!(
842 "mission max_tokens exhausted ({}/{})",
843 usage.tokens_used, max
844 ));
845 }
846 }
847 if let Some(max) = limit.max_steps {
848 if usage.steps_used >= u64::from(max) {
849 return Some(format!(
850 "mission max_steps exhausted ({}/{})",
851 usage.steps_used, max
852 ));
853 }
854 }
855 if let Some(max) = limit.max_tool_calls {
856 if usage.tool_calls_used >= u64::from(max) {
857 return Some(format!(
858 "mission max_tool_calls exhausted ({}/{})",
859 usage.tool_calls_used, max
860 ));
861 }
862 }
863 if let Some(max) = limit.max_cost_usd {
864 if usage.cost_used_usd >= max {
865 return Some(format!(
866 "mission max_cost_usd exhausted ({:.6}/{:.6})",
867 usage.cost_used_usd, max
868 ));
869 }
870 }
871 None
872 }
873
874 pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
875 let instance_ids = self
876 .instances
877 .read()
878 .await
879 .values()
880 .filter(|instance| instance.mission_id == mission_id)
881 .map(|instance| instance.instance_id.clone())
882 .collect::<Vec<_>>();
883 let mut count = 0usize;
884 for instance_id in instance_ids {
885 if self
886 .cancel_instance(state, &instance_id, reason)
887 .await
888 .is_some()
889 {
890 count = count.saturating_add(1);
891 }
892 }
893 count
894 }
895
896 async fn mark_instance_terminal(
897 &self,
898 state: &AppState,
899 instance_id: &str,
900 status: AgentInstanceStatus,
901 ) -> Option<AgentInstance> {
902 let mut instances = self.instances.write().await;
903 let instance = instances.get_mut(instance_id)?;
904 if matches!(
905 instance.status,
906 AgentInstanceStatus::Completed
907 | AgentInstanceStatus::Failed
908 | AgentInstanceStatus::Cancelled
909 ) {
910 return Some(instance.clone());
911 }
912 instance.status = status.clone();
913 let snapshot = instance.clone();
914 drop(instances);
915 match status {
916 AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
917 AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
918 _ => {}
919 }
920 Some(snapshot)
921 }
922
923 pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
924 let Some(session_id) = extract_session_id(event) else {
925 return;
926 };
927 let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
928 return;
929 };
930 if event.event_type == "provider.usage" {
931 let total_tokens = event
932 .properties
933 .get("totalTokens")
934 .and_then(|v| v.as_u64())
935 .unwrap_or(0);
936 let cost_used_usd = event
937 .properties
938 .get("costUsd")
939 .and_then(|v| v.as_f64())
940 .unwrap_or(0.0);
941 if total_tokens > 0 {
942 let exhausted = self
943 .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
944 .await;
945 if exhausted {
946 let _ = self
947 .cancel_instance(state, &instance_id, "budget exhausted")
948 .await;
949 }
950 }
951 return;
952 }
953 let mut delta_tokens = 0u64;
954 let mut delta_steps = 0u32;
955 let mut delta_tool_calls = 0u32;
956 if event.event_type == "message.part.updated" {
957 if let Some(part) = event.properties.get("part") {
958 let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
959 if part_type == "tool-invocation" {
960 delta_tool_calls = 1;
961 } else if part_type == "text" {
962 let delta = event
963 .properties
964 .get("delta")
965 .and_then(|v| v.as_str())
966 .unwrap_or("");
967 if !delta.is_empty() {
968 delta_tokens = estimate_tokens(delta);
969 }
970 }
971 }
972 } else if event.event_type == "session.run.finished" {
973 delta_steps = 1;
974 let run_status = event
975 .properties
976 .get("status")
977 .and_then(|v| v.as_str())
978 .unwrap_or("")
979 .to_ascii_lowercase();
980 if run_status == "completed" {
981 let _ = self
982 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
983 .await;
984 } else if run_status == "failed" || run_status == "error" {
985 let _ = self
986 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
987 .await;
988 }
989 }
990 if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
991 return;
992 }
993 let exhausted = self
994 .apply_budget_delta(
995 state,
996 &instance_id,
997 delta_tokens,
998 delta_steps,
999 delta_tool_calls,
1000 )
1001 .await;
1002 if exhausted {
1003 let _ = self
1004 .cancel_instance(state, &instance_id, "budget exhausted")
1005 .await;
1006 }
1007 }
1008
1009 async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
1010 self.instances
1011 .read()
1012 .await
1013 .values()
1014 .find(|instance| instance.session_id == session_id)
1015 .map(|instance| instance.instance_id.clone())
1016 }
1017
    /// Adds incremental usage (tokens, steps, tool calls) to an instance's
    /// budget and reports whether the instance should stop.
    ///
    /// Returns `true` in three cases:
    /// - The instance was already marked exhausted.
    /// - This delta exhausts the mission-wide budget. The whole mission is
    ///   cancelled here.
    /// - One of the instance's own limits is reached: `max_tokens`,
    ///   `max_steps`, `max_tool_calls`, `max_duration_ms` or `max_cost_usd`.
    ///
    /// Returns `false` in three cases:
    /// - The instance has no budget entry.
    /// - The instance is no longer in `instances`. The counters are still
    ///   updated in that case.
    /// - The instance is still within budget.
    ///
    /// NOTE(review): the `budgets` write guard (borrowed through `usage`) is
    /// held across every `.await` below: `instances.write()`,
    /// `apply_mission_budget_delta`, and `cancel_mission` (which performs
    /// cancellation and audit I/O). This serializes all instances' budget
    /// updates for that duration; consider narrowing the lock scope.
    async fn apply_budget_delta(
        &self,
        state: &AppState,
        instance_id: &str,
        delta_tokens: u64,
        delta_steps: u32,
        delta_tool_calls: u32,
    ) -> bool {
        // Without a loaded policy, fall back to a disabled placeholder that has
        // no per-token cost rate and no mission-wide budget.
        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
            enabled: false,
            require_justification: false,
            max_agents: None,
            max_concurrent: None,
            child_budget_percent_of_parent_remaining: None,
            mission_total_budget: None,
            cost_per_1k_tokens_usd: None,
            spawn_edges: HashMap::new(),
            required_skills: HashMap::new(),
            role_defaults: HashMap::new(),
            skill_sources: Default::default(),
        });
        // This guard stays alive (through `usage`) until the function returns.
        let mut budgets = self.budgets.write().await;
        let Some(usage) = budgets.get_mut(instance_id) else {
            return false;
        };
        // Once exhausted, keep reporting exhaustion without counting more usage.
        if usage.exhausted {
            return true;
        }
        // Remember the prior cost so only this call's cost delta reaches the mission.
        let prev_cost_used_usd = usage.cost_used_usd;
        usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
        usage.steps_used = usage.steps_used.saturating_add(delta_steps);
        usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
        if let Some(rate) = policy.cost_per_1k_tokens_usd {
            usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
        }
        let elapsed_ms = usage
            .started_at
            .map(|started| started.elapsed().as_millis() as u64)
            .unwrap_or(0);

        // Only the first limit reached, in the order checked below, is reported.
        let mut exhausted_reason: Option<&'static str> = None;
        let mut snapshot: Option<AgentInstance> = None;
        {
            // Lock order here: `budgets` (still held) then `instances`.
            let mut instances = self.instances.write().await;
            if let Some(instance) = instances.get_mut(instance_id) {
                // Record the running totals in instance metadata (presumably the
                // `budgetUsage` object read by `list_mission_summaries` — confirm).
                instance.metadata = Some(merge_metadata_usage(
                    instance.metadata.take(),
                    usage.tokens_used,
                    usage.steps_used,
                    usage.tool_calls_used,
                    usage.cost_used_usd,
                    elapsed_ms,
                ));
                if let Some(limit) = instance.budget.max_tokens {
                    if usage.tokens_used >= limit {
                        exhausted_reason = Some("max_tokens");
                    }
                }
                if exhausted_reason.is_none() {
                    if let Some(limit) = instance.budget.max_steps {
                        if usage.steps_used >= limit {
                            exhausted_reason = Some("max_steps");
                        }
                    }
                }
                if exhausted_reason.is_none() {
                    if let Some(limit) = instance.budget.max_tool_calls {
                        if usage.tool_calls_used >= limit {
                            exhausted_reason = Some("max_tool_calls");
                        }
                    }
                }
                if exhausted_reason.is_none() {
                    if let Some(limit) = instance.budget.max_duration_ms {
                        if elapsed_ms >= limit {
                            exhausted_reason = Some("max_duration_ms");
                        }
                    }
                }
                if exhausted_reason.is_none() {
                    if let Some(limit) = instance.budget.max_cost_usd {
                        if usage.cost_used_usd >= limit {
                            exhausted_reason = Some("max_cost_usd");
                        }
                    }
                }
                snapshot = Some(instance.clone());
            }
        }
        // The instance is gone; the usage counters above were still updated.
        let Some(instance) = snapshot else {
            return false;
        };
        emit_budget_usage(
            state,
            &instance,
            usage.tokens_used,
            usage.steps_used,
            usage.tool_calls_used,
            usage.cost_used_usd,
            elapsed_ms,
        );
        // Propagate only this call's deltas to the mission-wide totals.
        let mission_exhausted = self
            .apply_mission_budget_delta(
                state,
                &instance.mission_id,
                delta_tokens,
                u64::from(delta_steps),
                u64::from(delta_tool_calls),
                usage.cost_used_usd - prev_cost_used_usd,
                &policy,
            )
            .await;
        // Mission exhaustion cancels every instance in the mission. No
        // per-instance `emit_budget_exhausted` event is published on this path.
        if mission_exhausted {
            usage.exhausted = true;
            let _ = self
                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
                .await;
            return true;
        }
        if let Some(reason) = exhausted_reason {
            usage.exhausted = true;
            emit_budget_exhausted(
                state,
                &instance,
                reason,
                usage.tokens_used,
                usage.steps_used,
                usage.tool_calls_used,
                usage.cost_used_usd,
                elapsed_ms,
            );
            return true;
        }
        false
    }
1153
    /// Records an authoritative cumulative token count (and, optionally, cost) for one agent
    /// instance, then re-checks that instance's limits and its mission's total budget.
    ///
    /// * `total_tokens` is applied as a running total: the stored value only grows (`max`),
    ///   so a smaller report never rolls usage back.
    /// * A positive `cost_used_usd` is applied the same way. Otherwise, when the policy sets
    ///   `cost_per_1k_tokens_usd`, cost is estimated from the token increase alone.
    ///
    /// Returns `true` if the instance is exhausted (already, or by this update) or if the
    /// mission budget is exhausted, in which case `cancel_mission` is invoked. Returns `false`
    /// when the instance has no budget or instance record, or is still within its limits.
    async fn apply_exact_token_usage(
        &self,
        state: &AppState,
        instance_id: &str,
        total_tokens: u64,
        cost_used_usd: f64,
    ) -> bool {
        // No policy loaded: use a disabled policy with no cost rate and no mission budget,
        // so only the instance's own limits can trigger exhaustion below.
        let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
            enabled: false,
            require_justification: false,
            max_agents: None,
            max_concurrent: None,
            child_budget_percent_of_parent_remaining: None,
            mission_total_budget: None,
            cost_per_1k_tokens_usd: None,
            spawn_edges: HashMap::new(),
            required_skills: HashMap::new(),
            role_defaults: HashMap::new(),
            skill_sources: Default::default(),
        });
        // NOTE(review): this write guard lives until the function returns, i.e. across the
        // `instances` lock, `apply_mission_budget_delta`, and `cancel_mission` awaits. tokio's
        // RwLock is not reentrant; confirm `cancel_mission` never locks `self.budgets`, or
        // this path deadlocks.
        let mut budgets = self.budgets.write().await;
        let Some(usage) = budgets.get_mut(instance_id) else {
            return false;
        };
        // Once exhausted, further usage is not recorded; keep reporting exhaustion.
        if usage.exhausted {
            return true;
        }
        // Pre-update totals, used to forward only the increase to the mission budget.
        let prev_tokens = usage.tokens_used;
        let prev_cost_used_usd = usage.cost_used_usd;
        usage.tokens_used = usage.tokens_used.max(total_tokens);
        if cost_used_usd > 0.0 {
            usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
        } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
            // No reported cost: estimate it from the token increase at the policy rate.
            let delta = usage.tokens_used.saturating_sub(prev_tokens);
            usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
        }
        // Milliseconds since `started_at`; 0 when the budget clock was never started.
        let elapsed_ms = usage
            .started_at
            .map(|started| started.elapsed().as_millis() as u64)
            .unwrap_or(0);
        let mut exhausted_reason: Option<&'static str> = None;
        let mut snapshot: Option<AgentInstance> = None;
        {
            let mut instances = self.instances.write().await;
            if let Some(instance) = instances.get_mut(instance_id) {
                // Mirror the latest totals into the instance metadata (`budgetUsage` key).
                instance.metadata = Some(merge_metadata_usage(
                    instance.metadata.take(),
                    usage.tokens_used,
                    usage.steps_used,
                    usage.tool_calls_used,
                    usage.cost_used_usd,
                    elapsed_ms,
                ));
                // Only tokens and cost change on this path, so only those two limits are
                // checked here; the token limit wins when both are reached.
                if let Some(limit) = instance.budget.max_tokens {
                    if usage.tokens_used >= limit {
                        exhausted_reason = Some("max_tokens");
                    }
                }
                if exhausted_reason.is_none() {
                    if let Some(limit) = instance.budget.max_cost_usd {
                        if usage.cost_used_usd >= limit {
                            exhausted_reason = Some("max_cost_usd");
                        }
                    }
                }
                // Clone so events can be emitted after the `instances` lock is released.
                snapshot = Some(instance.clone());
            }
        }
        // Note: the usage totals above were already updated even if no instance exists.
        let Some(instance) = snapshot else {
            return false;
        };
        emit_budget_usage(
            state,
            &instance,
            usage.tokens_used,
            usage.steps_used,
            usage.tool_calls_used,
            usage.cost_used_usd,
            elapsed_ms,
        );
        // Steps and tool calls are not touched by this update, hence the zero deltas.
        let mission_exhausted = self
            .apply_mission_budget_delta(
                state,
                &instance.mission_id,
                usage.tokens_used.saturating_sub(prev_tokens),
                0,
                0,
                usage.cost_used_usd - prev_cost_used_usd,
                &policy,
            )
            .await;
        // Mission exhaustion takes precedence: mark the instance exhausted and cancel the
        // mission (its result is ignored); no per-instance exhausted event is emitted.
        if mission_exhausted {
            usage.exhausted = true;
            let _ = self
                .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
                .await;
            return true;
        }
        if let Some(reason) = exhausted_reason {
            usage.exhausted = true;
            emit_budget_exhausted(
                state,
                &instance,
                reason,
                usage.tokens_used,
                usage.steps_used,
                usage.tool_calls_used,
                usage.cost_used_usd,
                elapsed_ms,
            );
            return true;
        }
        false
    }
1268
1269 async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1270 let path = self.audit_path.read().await.clone();
1271 if let Some(parent) = path.parent() {
1272 fs::create_dir_all(parent).await?;
1273 }
1274 let row = json!({
1275 "action": action,
1276 "missionID": instance.mission_id,
1277 "instanceID": instance.instance_id,
1278 "parentInstanceID": instance.parent_instance_id,
1279 "role": instance.role,
1280 "templateID": instance.template_id,
1281 "sessionID": instance.session_id,
1282 "skillHash": instance.skill_hash,
1283 "timestampMs": crate::now_ms(),
1284 });
1285 let mut existing = if path.exists() {
1286 fs::read_to_string(&path).await.unwrap_or_default()
1287 } else {
1288 String::new()
1289 };
1290 existing.push_str(&serde_json::to_string(&row)?);
1291 existing.push('\n');
1292 fs::write(path, existing).await?;
1293 Ok(())
1294 }
1295
1296 async fn append_approval_audit(
1297 &self,
1298 action: &str,
1299 approval_id: &str,
1300 instance: Option<&AgentInstance>,
1301 reason: &str,
1302 ) -> anyhow::Result<()> {
1303 let path = self.audit_path.read().await.clone();
1304 if let Some(parent) = path.parent() {
1305 fs::create_dir_all(parent).await?;
1306 }
1307 let row = json!({
1308 "action": action,
1309 "approvalID": approval_id,
1310 "reason": reason,
1311 "missionID": instance.map(|v| v.mission_id.clone()),
1312 "instanceID": instance.map(|v| v.instance_id.clone()),
1313 "parentInstanceID": instance.and_then(|v| v.parent_instance_id.clone()),
1314 "role": instance.map(|v| v.role.clone()),
1315 "templateID": instance.map(|v| v.template_id.clone()),
1316 "sessionID": instance.map(|v| v.session_id.clone()),
1317 "skillHash": instance.map(|v| v.skill_hash.clone()),
1318 "timestampMs": crate::now_ms(),
1319 });
1320 let mut existing = if path.exists() {
1321 fs::read_to_string(&path).await.unwrap_or_default()
1322 } else {
1323 String::new()
1324 };
1325 existing.push_str(&serde_json::to_string(&row)?);
1326 existing.push('\n');
1327 fs::write(path, existing).await?;
1328 Ok(())
1329 }
1330
1331 #[allow(clippy::too_many_arguments)]
1332 async fn apply_mission_budget_delta(
1333 &self,
1334 state: &AppState,
1335 mission_id: &str,
1336 delta_tokens: u64,
1337 delta_steps: u64,
1338 delta_tool_calls: u64,
1339 delta_cost_used_usd: f64,
1340 policy: &SpawnPolicy,
1341 ) -> bool {
1342 let mut budgets = self.mission_budgets.write().await;
1343 let row = budgets.entry(mission_id.to_string()).or_default();
1344 row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1345 row.steps_used = row.steps_used.saturating_add(delta_steps);
1346 row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1347 row.cost_used_usd += delta_cost_used_usd.max(0.0);
1348 if row.exhausted {
1349 return true;
1350 }
1351 let Some(limit) = policy.mission_total_budget.as_ref() else {
1352 return false;
1353 };
1354 let mut exhausted_by: Option<&'static str> = None;
1355 if let Some(max) = limit.max_tokens {
1356 if row.tokens_used >= max {
1357 exhausted_by = Some("mission_max_tokens");
1358 }
1359 }
1360 if exhausted_by.is_none() {
1361 if let Some(max) = limit.max_steps {
1362 if row.steps_used >= u64::from(max) {
1363 exhausted_by = Some("mission_max_steps");
1364 }
1365 }
1366 }
1367 if exhausted_by.is_none() {
1368 if let Some(max) = limit.max_tool_calls {
1369 if row.tool_calls_used >= u64::from(max) {
1370 exhausted_by = Some("mission_max_tool_calls");
1371 }
1372 }
1373 }
1374 if exhausted_by.is_none() {
1375 if let Some(max) = limit.max_cost_usd {
1376 if row.cost_used_usd >= max {
1377 exhausted_by = Some("mission_max_cost_usd");
1378 }
1379 }
1380 }
1381 if let Some(exhausted_by) = exhausted_by {
1382 row.exhausted = true;
1383 emit_mission_budget_exhausted(
1384 state,
1385 mission_id,
1386 exhausted_by,
1387 row.tokens_used,
1388 row.steps_used,
1389 row.tool_calls_used,
1390 row.cost_used_usd,
1391 );
1392 return true;
1393 }
1394 false
1395 }
1396
1397 pub async fn set_for_test(
1398 &self,
1399 workspace_root: Option<String>,
1400 policy: Option<SpawnPolicy>,
1401 templates: Vec<AgentTemplate>,
1402 ) {
1403 *self.policy.write().await = policy;
1404 let mut by_id = HashMap::new();
1405 for template in templates {
1406 by_id.insert(template.template_id.clone(), template);
1407 }
1408 *self.templates.write().await = by_id;
1409 self.instances.write().await.clear();
1410 self.budgets.write().await.clear();
1411 self.mission_budgets.write().await.clear();
1412 self.spawn_approvals.write().await.clear();
1413 *self.loaded_workspace.write().await = workspace_root;
1414 }
1415}
1416
1417fn resolve_budget(
1418 policy: &SpawnPolicy,
1419 parent_instance: Option<AgentInstance>,
1420 parent_usage: Option<InstanceBudgetState>,
1421 template: &AgentTemplate,
1422 override_budget: Option<BudgetLimit>,
1423 role: &AgentRole,
1424) -> BudgetLimit {
1425 let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1426 let mut chosen = merge_budget(
1427 merge_budget(role_default, template.default_budget.clone()),
1428 override_budget.unwrap_or_default(),
1429 );
1430
1431 if let Some(parent) = parent_instance {
1432 let usage = parent_usage.unwrap_or_default();
1433 if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1434 if pct > 0 {
1435 chosen.max_tokens = cap_budget_remaining_u64(
1436 chosen.max_tokens,
1437 parent.budget.max_tokens,
1438 usage.tokens_used,
1439 pct,
1440 );
1441 chosen.max_steps = cap_budget_remaining_u32(
1442 chosen.max_steps,
1443 parent.budget.max_steps,
1444 usage.steps_used,
1445 pct,
1446 );
1447 chosen.max_tool_calls = cap_budget_remaining_u32(
1448 chosen.max_tool_calls,
1449 parent.budget.max_tool_calls,
1450 usage.tool_calls_used,
1451 pct,
1452 );
1453 chosen.max_duration_ms = cap_budget_remaining_u64(
1454 chosen.max_duration_ms,
1455 parent.budget.max_duration_ms,
1456 usage
1457 .started_at
1458 .map(|started| started.elapsed().as_millis() as u64)
1459 .unwrap_or(0),
1460 pct,
1461 );
1462 chosen.max_cost_usd = cap_budget_remaining_f64(
1463 chosen.max_cost_usd,
1464 parent.budget.max_cost_usd,
1465 usage.cost_used_usd,
1466 pct,
1467 );
1468 }
1469 }
1470 }
1471 chosen
1472}
1473
1474fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1475 BudgetLimit {
1476 max_tokens: overlay.max_tokens.or(base.max_tokens),
1477 max_steps: overlay.max_steps.or(base.max_steps),
1478 max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1479 max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1480 max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1481 }
1482}
1483
/// Caps a child's `u64` budget to `pct` percent of what the parent has left.
///
/// * Parent unlimited (`parent_limit == None`): the child's own limit is returned as-is.
/// * Parent limited: the allowance is `(parent_limit - parent_used) * pct / 100`, with the
///   remainder clamped at zero. The child gets the smaller of that allowance and its own
///   limit, or the allowance alone when it has none.
///
/// The product is computed in `u128`, so very large limits (e.g. `u64::MAX`) scale
/// exactly instead of saturating before the division. Results above `u64::MAX` (only
/// possible when `pct > 100`) are clamped to `u64::MAX`.
fn cap_budget_remaining_u64(
    child: Option<u64>,
    parent_limit: Option<u64>,
    parent_used: u64,
    pct: u8,
) -> Option<u64> {
    let Some(parent_limit) = parent_limit else {
        return child;
    };
    let remaining = parent_limit.saturating_sub(parent_used);
    let scaled = u128::from(remaining) * u128::from(pct) / 100;
    let allowance = u64::try_from(scaled).unwrap_or(u64::MAX);
    Some(child.map_or(allowance, |child| child.min(allowance)))
}
1503
/// Caps a child's `u32` budget to `pct` percent of what the parent has left.
///
/// * Parent unlimited (`parent_limit == None`): the child's own limit is returned as-is.
/// * Parent limited: the allowance is `(parent_limit - parent_used) * pct / 100`, with the
///   remainder clamped at zero. The child gets the smaller of that allowance and its own
///   limit, or the allowance alone when it has none.
///
/// The product is computed in `u64` (which cannot overflow for `u32 * u8`), so large limits
/// scale exactly instead of saturating before the division. Results above `u32::MAX` (only
/// possible when `pct > 100`) are clamped to `u32::MAX`.
fn cap_budget_remaining_u32(
    child: Option<u32>,
    parent_limit: Option<u32>,
    parent_used: u32,
    pct: u8,
) -> Option<u32> {
    let Some(parent_limit) = parent_limit else {
        return child;
    };
    let remaining = parent_limit.saturating_sub(parent_used);
    let scaled = u64::from(remaining) * u64::from(pct) / 100;
    let allowance = u32::try_from(scaled).unwrap_or(u32::MAX);
    Some(child.map_or(allowance, |child| child.min(allowance)))
}
1523
/// Caps a child's cost budget (USD) to `pct` percent of what the parent has left.
///
/// With no parent limit the child's own limit is returned unchanged. Otherwise the
/// allowance is `max(parent_limit - parent_used, 0) * pct / 100`. The child receives the
/// smaller of that and its own limit, or the allowance alone when it has none.
fn cap_budget_remaining_f64(
    child: Option<f64>,
    parent_limit: Option<f64>,
    parent_used: f64,
    pct: u8,
) -> Option<f64> {
    let Some(limit) = parent_limit else {
        return child;
    };
    let allowance = (limit - parent_used).max(0.0) * f64::from(pct) / 100.0;
    Some(match child {
        Some(own) => own.min(allowance),
        None => allowance,
    })
}
1543
1544async fn compute_skill_hash(
1545 workspace_root: &str,
1546 template: &AgentTemplate,
1547 policy: &SpawnPolicy,
1548) -> Result<String, String> {
1549 use sha2::{Digest, Sha256};
1550 let mut rows = Vec::new();
1551 let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1552 for skill in &template.skills {
1553 if let Some(path) = skill.path.as_deref() {
1554 validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1555 let skill_path = Path::new(workspace_root).join(path);
1556 let raw = fs::read_to_string(&skill_path)
1557 .await
1558 .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1559 let digest = hash_hex(raw.as_bytes());
1560 validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1561 rows.push(format!("path:{}:{}", path, digest));
1562 } else if let Some(id) = skill.id.as_deref() {
1563 validate_skill_source(Some(id), None, policy)?;
1564 let loaded = skill_service
1565 .load_skill(id)
1566 .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1567 let Some(loaded) = loaded else {
1568 return Err(format!("missing required skill id `{id}`"));
1569 };
1570 let digest = hash_hex(loaded.content.as_bytes());
1571 validate_pinned_hash(Some(id), None, &digest, policy)?;
1572 rows.push(format!("id:{}:{}", id, digest));
1573 }
1574 }
1575 rows.sort();
1576 let mut hasher = Sha256::new();
1577 for row in rows {
1578 hasher.update(row.as_bytes());
1579 hasher.update(b"\n");
1580 }
1581 let digest = hasher.finalize();
1582 Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1583}
1584
1585fn validate_skill_source(
1586 id: Option<&str>,
1587 path: Option<&str>,
1588 policy: &SpawnPolicy,
1589) -> Result<(), String> {
1590 use tandem_orchestrator::SkillSourceMode;
1591 match policy.skill_sources.mode {
1592 SkillSourceMode::Any => Ok(()),
1593 SkillSourceMode::ProjectOnly => {
1594 if id.is_some() {
1595 return Err("skill source denied: project_only forbids skill IDs".to_string());
1596 }
1597 let Some(path) = path else {
1598 return Err("skill source denied: project_only requires skill path".to_string());
1599 };
1600 let p = PathBuf::from(path);
1601 if p.is_absolute() {
1602 return Err("skill source denied: absolute skill paths are forbidden".to_string());
1603 }
1604 Ok(())
1605 }
1606 SkillSourceMode::Allowlist => {
1607 if let Some(id) = id {
1608 if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1609 return Ok(());
1610 }
1611 }
1612 if let Some(path) = path {
1613 if policy
1614 .skill_sources
1615 .allowlist_paths
1616 .iter()
1617 .any(|v| v == path)
1618 {
1619 return Ok(());
1620 }
1621 }
1622 Err("skill source denied: not present in allowlist".to_string())
1623 }
1624 }
1625}
1626
1627fn validate_pinned_hash(
1628 id: Option<&str>,
1629 path: Option<&str>,
1630 digest: &str,
1631 policy: &SpawnPolicy,
1632) -> Result<(), String> {
1633 let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1634 let by_path = path.and_then(|path| {
1635 policy
1636 .skill_sources
1637 .pinned_hashes
1638 .get(&format!("path:{path}"))
1639 });
1640 let expected = by_id.or(by_path);
1641 if let Some(expected) = expected {
1642 let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1643 if normalized != digest {
1644 return Err("pinned hash mismatch for skill reference".to_string());
1645 }
1646 }
1647 Ok(())
1648}
1649
/// Encodes `bytes` as lowercase hexadecimal, two characters per byte.
fn hash_hex(bytes: &[u8]) -> String {
    use std::fmt::Write as _;
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut hex, byte| {
            // Writing into a String cannot fail.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}
1658
/// Rough token estimate: one token per four characters (Unicode scalar values, not bytes),
/// rounded down but never below one, so even empty text counts as a single token.
fn estimate_tokens(text: &str) -> u64 {
    let quarter = text.chars().count() as u64 / 4;
    std::cmp::max(quarter, 1)
}
1663
1664fn extract_session_id(event: &EngineEvent) -> Option<String> {
1665 event
1666 .properties
1667 .get("sessionID")
1668 .and_then(|v| v.as_str())
1669 .map(|v| v.to_string())
1670 .or_else(|| {
1671 event
1672 .properties
1673 .get("part")
1674 .and_then(|v| v.get("sessionID"))
1675 .and_then(|v| v.as_str())
1676 .map(|v| v.to_string())
1677 })
1678}
1679
1680fn merge_metadata_usage(
1681 metadata: Option<Value>,
1682 tokens_used: u64,
1683 steps_used: u32,
1684 tool_calls_used: u32,
1685 cost_used_usd: f64,
1686 elapsed_ms: u64,
1687) -> Value {
1688 let mut base = metadata
1689 .and_then(|v| v.as_object().cloned())
1690 .unwrap_or_default();
1691 base.insert(
1692 "budgetUsage".to_string(),
1693 json!({
1694 "tokensUsed": tokens_used,
1695 "stepsUsed": steps_used,
1696 "toolCallsUsed": tool_calls_used,
1697 "costUsedUsd": cost_used_usd,
1698 "elapsedMs": elapsed_ms
1699 }),
1700 );
1701 Value::Object(base)
1702}
1703
/// Canonicalizes a tool name: trims it, lowercases it, and turns `-` into `_`. The
/// to-do aliases (`todowrite`, `update_todo_list`, `update_todos`) collapse to
/// `todo_write`.
fn normalize_tool_name(name: &str) -> String {
    let canonical = name.trim().to_lowercase().replace('-', "_");
    if matches!(
        canonical.as_str(),
        "todowrite" | "update_todo_list" | "update_todos"
    ) {
        "todo_write".to_string()
    } else {
        canonical
    }
}
1710
1711async fn evaluate_capability_deny(
1712 state: &AppState,
1713 instance: &AgentInstance,
1714 tool: &str,
1715 args: &Value,
1716 caps: &tandem_orchestrator::CapabilitySpec,
1717 session_id: &str,
1718 message_id: &str,
1719) -> Option<String> {
1720 if !caps.tool_denylist.is_empty()
1721 && caps
1722 .tool_denylist
1723 .iter()
1724 .any(|name| normalize_tool_name(name) == *tool)
1725 {
1726 return Some(format!("tool `{tool}` denied by agent capability policy"));
1727 }
1728 if !caps.tool_allowlist.is_empty()
1729 && !caps
1730 .tool_allowlist
1731 .iter()
1732 .any(|name| normalize_tool_name(name) == *tool)
1733 {
1734 return Some(format!("tool `{tool}` not in agent allowlist"));
1735 }
1736
1737 if matches!(tool, "websearch" | "webfetch" | "webfetch_html") {
1738 if !caps.net_scopes.enabled {
1739 return Some("network disabled for this agent instance".to_string());
1740 }
1741 if !caps.net_scopes.allow_hosts.is_empty() {
1742 if tool == "websearch" {
1743 return Some(
1744 "websearch blocked: host allowlist cannot be verified for search tool"
1745 .to_string(),
1746 );
1747 }
1748 if let Some(host) = extract_url_host(args) {
1749 let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
1750 let allowed = h.trim().to_ascii_lowercase();
1751 !allowed.is_empty()
1752 && (host == allowed || host.ends_with(&format!(".{allowed}")))
1753 });
1754 if !allowed {
1755 return Some(format!("network host `{host}` not in allow_hosts"));
1756 }
1757 }
1758 }
1759 }
1760
1761 if tool == "bash" {
1762 let cmd = args
1763 .get("command")
1764 .and_then(|v| v.as_str())
1765 .unwrap_or("")
1766 .to_ascii_lowercase();
1767 if cmd.contains("git push") {
1768 if !caps.git_caps.push {
1769 return Some("git push disabled for this agent instance".to_string());
1770 }
1771 if caps.git_caps.push_requires_approval {
1772 let action = state.permissions.evaluate("git_push", "git_push").await;
1773 match action {
1774 tandem_core::PermissionAction::Allow => {}
1775 tandem_core::PermissionAction::Deny => {
1776 return Some("git push denied by policy rule".to_string());
1777 }
1778 tandem_core::PermissionAction::Ask => {
1779 let pending = state
1780 .permissions
1781 .ask_for_session_with_context(
1782 Some(session_id),
1783 "git_push",
1784 args.clone(),
1785 Some(tandem_core::PermissionArgsContext {
1786 args_source: "agent_team.git_push".to_string(),
1787 args_integrity: "runtime-checked".to_string(),
1788 query: Some(format!(
1789 "instanceID={} messageID={}",
1790 instance.instance_id, message_id
1791 )),
1792 }),
1793 )
1794 .await;
1795 return Some(format!(
1796 "git push requires explicit user approval (approvalID={})",
1797 pending.id
1798 ));
1799 }
1800 }
1801 }
1802 }
1803 if cmd.contains("git commit") && !caps.git_caps.commit {
1804 return Some("git commit disabled for this agent instance".to_string());
1805 }
1806 }
1807
1808 let access_kind = tool_fs_access_kind(tool);
1809 if let Some(kind) = access_kind {
1810 let Some(session) = state.storage.get_session(session_id).await else {
1811 return Some("session not found for capability evaluation".to_string());
1812 };
1813 let Some(root) = session.workspace_root.clone() else {
1814 return Some("workspace root missing for capability evaluation".to_string());
1815 };
1816 let requested = extract_tool_candidate_paths(tool, args);
1817 if !requested.is_empty() {
1818 let allowed_scopes = if kind == "read" {
1819 &caps.fs_scopes.read
1820 } else {
1821 &caps.fs_scopes.write
1822 };
1823 if allowed_scopes.is_empty() {
1824 return Some(format!("fs {kind} access blocked: no scopes configured"));
1825 }
1826 for candidate in requested {
1827 if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
1828 return Some(format!("fs {kind} access denied for path `{}`", candidate));
1829 }
1830 }
1831 }
1832 }
1833
1834 denied_secrets_reason(tool, caps, args)
1835}
1836
1837fn denied_secrets_reason(
1838 tool: &str,
1839 caps: &tandem_orchestrator::CapabilitySpec,
1840 args: &Value,
1841) -> Option<String> {
1842 if tool == "auth" {
1843 if caps.secrets_scopes.is_empty() {
1844 return Some("secrets are disabled for this agent instance".to_string());
1845 }
1846 let alias = args
1847 .get("id")
1848 .or_else(|| args.get("provider"))
1849 .or_else(|| args.get("providerID"))
1850 .and_then(|v| v.as_str())
1851 .unwrap_or("")
1852 .trim();
1853 if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
1854 return Some(format!(
1855 "secret alias `{alias}` is not in agent secretsScopes allowlist"
1856 ));
1857 }
1858 }
1859 None
1860}
1861
/// Classifies a (normalized) tool name by the filesystem access it needs: `"read"`,
/// `"write"`, or `None` for tools without scoped filesystem access.
fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
    const READ_TOOLS: [&str; 5] = ["read", "glob", "grep", "codesearch", "lsp"];
    const WRITE_TOOLS: [&str; 3] = ["write", "edit", "apply_patch"];
    if READ_TOOLS.contains(&tool) {
        Some("read")
    } else if WRITE_TOOLS.contains(&tool) {
        Some("write")
    } else {
        None
    }
}
1869
1870fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
1871 let Some(obj) = args.as_object() else {
1872 return Vec::new();
1873 };
1874 let keys: &[&str] = match tool {
1875 "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
1876 "glob" => &["pattern"],
1877 "lsp" => &["filePath", "path"],
1878 "bash" => &["cwd"],
1879 "apply_patch" => &["path"],
1880 _ => &["path", "cwd"],
1881 };
1882 keys.iter()
1883 .filter_map(|key| obj.get(*key))
1884 .filter_map(|value| value.as_str())
1885 .filter(|s| !s.trim().is_empty())
1886 .map(|raw| strip_glob_tokens(raw).to_string())
1887 .collect()
1888}
1889
/// Returns the literal prefix of a glob pattern: everything before the first `*`, `?`,
/// or `[`, or the whole string when it contains none of them.
fn strip_glob_tokens(path: &str) -> &str {
    match path.find(|c: char| matches!(c, '*' | '?' | '[')) {
        Some(cut) => &path[..cut],
        None => path,
    }
}
1900
1901fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
1902 let root_path = PathBuf::from(root);
1903 let candidate_path = resolve_path(&root_path, candidate);
1904 scopes.iter().any(|scope| {
1905 let scope_path = resolve_path(&root_path, scope);
1906 candidate_path.starts_with(scope_path)
1907 })
1908}
1909
/// Resolves a user-supplied path against `root` and lexically normalizes the result.
///
/// * Surrounding whitespace is trimmed, and an empty input yields `root`.
/// * Absolute inputs are used as-is; relative inputs are joined onto `root`.
/// * `.` components are dropped, and each `..` removes the preceding normal component.
///   A `..` at the filesystem root is discarded; leading `..` of a relative path is kept.
///
/// Normalization matters for scope checks: without it, `scope/../../etc/passwd` would
/// pass a `starts_with(scope)` test. This is purely lexical; symlinks are not resolved.
fn resolve_path(root: &Path, raw: &str) -> PathBuf {
    use std::path::Component;

    let raw = raw.trim();
    let joined = if raw.is_empty() {
        root.to_path_buf()
    } else {
        let path = PathBuf::from(raw);
        if path.is_absolute() {
            path
        } else {
            root.join(path)
        }
    };

    let mut normalized = PathBuf::new();
    for component in joined.components() {
        match component {
            Component::CurDir => {}
            Component::ParentDir => match normalized.components().next_back() {
                Some(Component::Normal(_)) => {
                    normalized.pop();
                }
                // Cannot go above the root (or a drive prefix); drop the `..`.
                Some(Component::RootDir) | Some(Component::Prefix(_)) => {}
                // Empty or already `..`-prefixed relative path: keep the `..`.
                _ => normalized.push(".."),
            },
            other => normalized.push(other.as_os_str()),
        }
    }
    normalized
}
1922
1923fn extract_url_host(args: &Value) -> Option<String> {
1924 let url = args
1925 .get("url")
1926 .or_else(|| args.get("uri"))
1927 .or_else(|| args.get("link"))
1928 .and_then(|v| v.as_str())?;
1929 let raw = url.trim();
1930 let (_, after_scheme) = raw.split_once("://")?;
1931 let host_port = after_scheme.split('/').next().unwrap_or_default();
1932 let host = host_port.split('@').next_back().unwrap_or_default();
1933 let host = host
1934 .split(':')
1935 .next()
1936 .unwrap_or_default()
1937 .to_ascii_lowercase();
1938 if host.is_empty() {
1939 None
1940 } else {
1941 Some(host)
1942 }
1943}
1944
1945pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
1946 emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
1947}
1948
1949pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
1950 emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
1951}
1952
1953pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
1954 emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
1955}
1956
/// Optional correlation IDs attached to spawn lifecycle events.
///
/// Every field defaults to `None`, which the `emit_*_with_context` functions publish as
/// JSON `null` unless they fall back to a value from the instance. The struct holds only
/// borrowed `Option<&str>` fields, so it is cheap to copy and is `Debug` for diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct SpawnEventContext<'a> {
    /// Session in which the spawn was requested.
    pub session_id: Option<&'a str>,
    /// Message that triggered the spawn, if any.
    pub message_id: Option<&'a str>,
    /// Run the spawn belongs to, if any.
    pub run_id: Option<&'a str>,
}
1963
1964pub fn emit_spawn_requested_with_context(
1965 state: &AppState,
1966 req: &SpawnRequest,
1967 ctx: &SpawnEventContext<'_>,
1968) {
1969 state.event_bus.publish(EngineEvent::new(
1970 "agent_team.spawn.requested",
1971 json!({
1972 "sessionID": ctx.session_id,
1973 "messageID": ctx.message_id,
1974 "runID": ctx.run_id,
1975 "missionID": req.mission_id,
1976 "instanceID": Value::Null,
1977 "parentInstanceID": req.parent_instance_id,
1978 "source": req.source,
1979 "requestedRole": req.role,
1980 "templateID": req.template_id,
1981 "justification": req.justification,
1982 "timestampMs": crate::now_ms(),
1983 }),
1984 ));
1985}
1986
1987pub fn emit_spawn_denied_with_context(
1988 state: &AppState,
1989 req: &SpawnRequest,
1990 decision: &SpawnDecision,
1991 ctx: &SpawnEventContext<'_>,
1992) {
1993 state.event_bus.publish(EngineEvent::new(
1994 "agent_team.spawn.denied",
1995 json!({
1996 "sessionID": ctx.session_id,
1997 "messageID": ctx.message_id,
1998 "runID": ctx.run_id,
1999 "missionID": req.mission_id,
2000 "instanceID": Value::Null,
2001 "parentInstanceID": req.parent_instance_id,
2002 "source": req.source,
2003 "requestedRole": req.role,
2004 "templateID": req.template_id,
2005 "code": decision.code,
2006 "error": decision.reason,
2007 "timestampMs": crate::now_ms(),
2008 }),
2009 ));
2010}
2011
2012pub fn emit_spawn_approved_with_context(
2013 state: &AppState,
2014 req: &SpawnRequest,
2015 instance: &AgentInstance,
2016 ctx: &SpawnEventContext<'_>,
2017) {
2018 state.event_bus.publish(EngineEvent::new(
2019 "agent_team.spawn.approved",
2020 json!({
2021 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2022 "messageID": ctx.message_id,
2023 "runID": ctx.run_id.or(instance.run_id.as_deref()),
2024 "missionID": instance.mission_id,
2025 "instanceID": instance.instance_id,
2026 "parentInstanceID": instance.parent_instance_id,
2027 "source": req.source,
2028 "requestedRole": req.role,
2029 "templateID": instance.template_id,
2030 "skillHash": instance.skill_hash,
2031 "timestampMs": crate::now_ms(),
2032 }),
2033 ));
2034 state.event_bus.publish(EngineEvent::new(
2035 "agent_team.instance.started",
2036 json!({
2037 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2038 "messageID": ctx.message_id,
2039 "runID": ctx.run_id.or(instance.run_id.as_deref()),
2040 "missionID": instance.mission_id,
2041 "instanceID": instance.instance_id,
2042 "parentInstanceID": instance.parent_instance_id,
2043 "role": instance.role,
2044 "status": instance.status,
2045 "budgetLimit": instance.budget,
2046 "skillHash": instance.skill_hash,
2047 "timestampMs": crate::now_ms(),
2048 }),
2049 ));
2050}
2051
2052pub fn emit_budget_usage(
2053 state: &AppState,
2054 instance: &AgentInstance,
2055 tokens_used: u64,
2056 steps_used: u32,
2057 tool_calls_used: u32,
2058 cost_used_usd: f64,
2059 elapsed_ms: u64,
2060) {
2061 state.event_bus.publish(EngineEvent::new(
2062 "agent_team.budget.usage",
2063 json!({
2064 "sessionID": instance.session_id,
2065 "messageID": Value::Null,
2066 "runID": instance.run_id,
2067 "missionID": instance.mission_id,
2068 "instanceID": instance.instance_id,
2069 "tokensUsed": tokens_used,
2070 "stepsUsed": steps_used,
2071 "toolCallsUsed": tool_calls_used,
2072 "costUsedUsd": cost_used_usd,
2073 "elapsedMs": elapsed_ms,
2074 "timestampMs": crate::now_ms(),
2075 }),
2076 ));
2077}
2078
2079#[allow(clippy::too_many_arguments)]
2080pub fn emit_budget_exhausted(
2081 state: &AppState,
2082 instance: &AgentInstance,
2083 exhausted_by: &str,
2084 tokens_used: u64,
2085 steps_used: u32,
2086 tool_calls_used: u32,
2087 cost_used_usd: f64,
2088 elapsed_ms: u64,
2089) {
2090 state.event_bus.publish(EngineEvent::new(
2091 "agent_team.budget.exhausted",
2092 json!({
2093 "sessionID": instance.session_id,
2094 "messageID": Value::Null,
2095 "runID": instance.run_id,
2096 "missionID": instance.mission_id,
2097 "instanceID": instance.instance_id,
2098 "exhaustedBy": exhausted_by,
2099 "tokensUsed": tokens_used,
2100 "stepsUsed": steps_used,
2101 "toolCallsUsed": tool_calls_used,
2102 "costUsedUsd": cost_used_usd,
2103 "elapsedMs": elapsed_ms,
2104 "timestampMs": crate::now_ms(),
2105 }),
2106 ));
2107}
2108
2109pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
2110 state.event_bus.publish(EngineEvent::new(
2111 "agent_team.instance.cancelled",
2112 json!({
2113 "sessionID": instance.session_id,
2114 "messageID": Value::Null,
2115 "runID": instance.run_id,
2116 "missionID": instance.mission_id,
2117 "instanceID": instance.instance_id,
2118 "parentInstanceID": instance.parent_instance_id,
2119 "role": instance.role,
2120 "status": instance.status,
2121 "reason": reason,
2122 "timestampMs": crate::now_ms(),
2123 }),
2124 ));
2125}
2126
2127pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
2128 state.event_bus.publish(EngineEvent::new(
2129 "agent_team.instance.completed",
2130 json!({
2131 "sessionID": instance.session_id,
2132 "messageID": Value::Null,
2133 "runID": instance.run_id,
2134 "missionID": instance.mission_id,
2135 "instanceID": instance.instance_id,
2136 "parentInstanceID": instance.parent_instance_id,
2137 "role": instance.role,
2138 "status": instance.status,
2139 "timestampMs": crate::now_ms(),
2140 }),
2141 ));
2142}
2143
2144pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
2145 state.event_bus.publish(EngineEvent::new(
2146 "agent_team.instance.failed",
2147 json!({
2148 "sessionID": instance.session_id,
2149 "messageID": Value::Null,
2150 "runID": instance.run_id,
2151 "missionID": instance.mission_id,
2152 "instanceID": instance.instance_id,
2153 "parentInstanceID": instance.parent_instance_id,
2154 "role": instance.role,
2155 "status": instance.status,
2156 "timestampMs": crate::now_ms(),
2157 }),
2158 ));
2159}
2160
2161pub fn emit_mission_budget_exhausted(
2162 state: &AppState,
2163 mission_id: &str,
2164 exhausted_by: &str,
2165 tokens_used: u64,
2166 steps_used: u64,
2167 tool_calls_used: u64,
2168 cost_used_usd: f64,
2169) {
2170 state.event_bus.publish(EngineEvent::new(
2171 "agent_team.mission.budget.exhausted",
2172 json!({
2173 "sessionID": Value::Null,
2174 "messageID": Value::Null,
2175 "runID": Value::Null,
2176 "missionID": mission_id,
2177 "instanceID": Value::Null,
2178 "exhaustedBy": exhausted_by,
2179 "tokensUsed": tokens_used,
2180 "stepsUsed": steps_used,
2181 "toolCallsUsed": tool_calls_used,
2182 "costUsedUsd": cost_used_usd,
2183 "timestampMs": crate::now_ms(),
2184 }),
2185 ));
2186}