1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use futures::future::BoxFuture;
8use serde::Deserialize;
9use serde::Serialize;
10use serde_json::{json, Value};
11use tandem_core::{
12 any_policy_matches, SpawnAgentHook, SpawnAgentToolContext, SpawnAgentToolResult,
13 ToolPolicyContext, ToolPolicyDecision, ToolPolicyHook,
14};
15use tandem_orchestrator::{
16 AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
17 SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
18};
19use tandem_skills::SkillService;
20use tandem_types::{EngineEvent, Session};
21use tokio::fs;
22use tokio::sync::RwLock;
23use uuid::Uuid;
24
25use crate::AppState;
26
27#[derive(Clone, Default)]
28pub struct AgentTeamRuntime {
29 policy: Arc<RwLock<Option<SpawnPolicy>>>,
30 templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
31 instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
32 budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
33 mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
34 spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
35 loaded_workspace: Arc<RwLock<Option<String>>>,
36 audit_path: Arc<RwLock<PathBuf>>,
37}
38
39#[derive(Debug, Clone)]
40pub struct SpawnResult {
41 pub decision: SpawnDecision,
42 pub instance: Option<AgentInstance>,
43}
44
45#[derive(Debug, Clone, Serialize)]
46pub struct AgentMissionSummary {
47 #[serde(rename = "missionID")]
48 pub mission_id: String,
49 #[serde(rename = "instanceCount")]
50 pub instance_count: usize,
51 #[serde(rename = "runningCount")]
52 pub running_count: usize,
53 #[serde(rename = "completedCount")]
54 pub completed_count: usize,
55 #[serde(rename = "failedCount")]
56 pub failed_count: usize,
57 #[serde(rename = "cancelledCount")]
58 pub cancelled_count: usize,
59 #[serde(rename = "queuedCount")]
60 pub queued_count: usize,
61 #[serde(rename = "tokenUsedTotal")]
62 pub token_used_total: u64,
63 #[serde(rename = "toolCallsUsedTotal")]
64 pub tool_calls_used_total: u64,
65 #[serde(rename = "stepsUsedTotal")]
66 pub steps_used_total: u64,
67 #[serde(rename = "costUsedUsdTotal")]
68 pub cost_used_usd_total: f64,
69}
70
71#[derive(Debug, Clone, Default)]
72struct InstanceBudgetState {
73 tokens_used: u64,
74 steps_used: u32,
75 tool_calls_used: u32,
76 cost_used_usd: f64,
77 started_at: Option<Instant>,
78 exhausted: bool,
79}
80
81#[derive(Debug, Clone, Default)]
82struct MissionBudgetState {
83 tokens_used: u64,
84 steps_used: u64,
85 tool_calls_used: u64,
86 cost_used_usd: f64,
87 exhausted: bool,
88}
89
90#[derive(Debug, Clone, Serialize)]
91pub struct PendingSpawnApproval {
92 #[serde(rename = "approvalID")]
93 pub approval_id: String,
94 #[serde(rename = "createdAtMs")]
95 pub created_at_ms: u64,
96 pub request: SpawnRequest,
97 #[serde(rename = "decisionCode")]
98 pub decision_code: Option<SpawnDenyCode>,
99 pub reason: Option<String>,
100}
101
102#[derive(Clone)]
103pub struct ServerSpawnAgentHook {
104 state: AppState,
105}
106
107#[derive(Debug, Deserialize)]
108struct SpawnAgentToolInput {
109 #[serde(rename = "missionID")]
110 mission_id: Option<String>,
111 #[serde(rename = "parentInstanceID")]
112 parent_instance_id: Option<String>,
113 #[serde(rename = "templateID")]
114 template_id: Option<String>,
115 role: AgentRole,
116 source: Option<SpawnSource>,
117 justification: String,
118 #[serde(rename = "budgetOverride", default)]
119 budget_override: Option<BudgetLimit>,
120}
121
122impl ServerSpawnAgentHook {
123 pub fn new(state: AppState) -> Self {
124 Self { state }
125 }
126}
127
128impl SpawnAgentHook for ServerSpawnAgentHook {
129 fn spawn_agent(
130 &self,
131 ctx: SpawnAgentToolContext,
132 ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
133 let state = self.state.clone();
134 Box::pin(async move {
135 let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
136 let input = match parsed {
137 Ok(input) => input,
138 Err(err) => {
139 return Ok(SpawnAgentToolResult {
140 output: format!("spawn_agent denied: invalid args ({err})"),
141 metadata: json!({
142 "ok": false,
143 "code": "SPAWN_INVALID_ARGS",
144 "error": err.to_string(),
145 }),
146 });
147 }
148 };
149 let req = SpawnRequest {
150 mission_id: input.mission_id,
151 parent_instance_id: input.parent_instance_id,
152 source: input.source.unwrap_or(SpawnSource::ToolCall),
153 parent_role: None,
154 role: input.role,
155 template_id: input.template_id,
156 justification: input.justification,
157 budget_override: input.budget_override,
158 };
159
160 let event_ctx = SpawnEventContext {
161 session_id: Some(ctx.session_id.as_str()),
162 message_id: Some(ctx.message_id.as_str()),
163 run_id: None,
164 };
165 emit_spawn_requested_with_context(&state, &req, &event_ctx);
166 let result = state.agent_teams.spawn(&state, req.clone()).await;
167 if !result.decision.allowed || result.instance.is_none() {
168 emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
169 return Ok(SpawnAgentToolResult {
170 output: result
171 .decision
172 .reason
173 .clone()
174 .unwrap_or_else(|| "spawn_agent denied".to_string()),
175 metadata: json!({
176 "ok": false,
177 "code": result.decision.code,
178 "error": result.decision.reason,
179 "requiresUserApproval": result.decision.requires_user_approval,
180 }),
181 });
182 }
183 let instance = result.instance.expect("checked is_some");
184 emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
185 Ok(SpawnAgentToolResult {
186 output: format!(
187 "spawned {} as instance {} (session {})",
188 instance.template_id, instance.instance_id, instance.session_id
189 ),
190 metadata: json!({
191 "ok": true,
192 "missionID": instance.mission_id,
193 "instanceID": instance.instance_id,
194 "sessionID": instance.session_id,
195 "runID": instance.run_id,
196 "status": instance.status,
197 "skillHash": instance.skill_hash,
198 "workspaceRoot": instance_workspace_root(&instance),
199 "workspaceRepoRoot": instance_workspace_repo_root(&instance),
200 "managedWorktree": instance_managed_worktree(&instance),
201 }),
202 })
203 })
204 }
205}
206
207#[derive(Clone)]
208pub struct ServerToolPolicyHook {
209 state: AppState,
210}
211
212impl ServerToolPolicyHook {
213 pub fn new(state: AppState) -> Self {
214 Self { state }
215 }
216}
217
218impl ToolPolicyHook for ServerToolPolicyHook {
219 fn evaluate_tool(
220 &self,
221 ctx: ToolPolicyContext,
222 ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
223 let state = self.state.clone();
224 Box::pin(async move {
225 let tool = normalize_tool_name(&ctx.tool);
226 if let Some(policy) = state.routine_session_policy(&ctx.session_id).await {
227 let allowed_patterns = policy
228 .allowed_tools
229 .iter()
230 .map(|name| normalize_tool_name(name))
231 .collect::<Vec<_>>();
232 if !policy.allowed_tools.is_empty() && !any_policy_matches(&allowed_patterns, &tool)
233 {
234 let reason = format!(
235 "tool `{}` is not allowed for routine `{}` (run `{}`)",
236 tool, policy.routine_id, policy.run_id
237 );
238 state.event_bus.publish(EngineEvent::new(
239 "routine.tool.denied",
240 json!({
241 "sessionID": ctx.session_id,
242 "messageID": ctx.message_id,
243 "runID": policy.run_id,
244 "routineID": policy.routine_id,
245 "tool": tool,
246 "reason": reason,
247 "timestampMs": crate::now_ms(),
248 }),
249 ));
250 return Ok(ToolPolicyDecision {
251 allowed: false,
252 reason: Some(reason),
253 });
254 }
255 }
256
257 let Some(instance) = state
258 .agent_teams
259 .instance_for_session(&ctx.session_id)
260 .await
261 else {
262 return Ok(ToolPolicyDecision {
263 allowed: true,
264 reason: None,
265 });
266 };
267 let caps = instance.capabilities.clone();
268 let deny = evaluate_capability_deny(
269 &state,
270 &instance,
271 &tool,
272 &ctx.args,
273 &caps,
274 &ctx.session_id,
275 &ctx.message_id,
276 )
277 .await;
278 if let Some(reason) = deny {
279 state.event_bus.publish(EngineEvent::new(
280 "agent_team.capability.denied",
281 json!({
282 "sessionID": ctx.session_id,
283 "messageID": ctx.message_id,
284 "runID": instance.run_id,
285 "missionID": instance.mission_id,
286 "instanceID": instance.instance_id,
287 "tool": tool,
288 "reason": reason,
289 "timestampMs": crate::now_ms(),
290 }),
291 ));
292 return Ok(ToolPolicyDecision {
293 allowed: false,
294 reason: Some(reason),
295 });
296 }
297 Ok(ToolPolicyDecision {
298 allowed: true,
299 reason: None,
300 })
301 })
302 }
303}
304
305impl AgentTeamRuntime {
306 pub fn new(audit_path: PathBuf) -> Self {
307 Self {
308 policy: Arc::new(RwLock::new(None)),
309 templates: Arc::new(RwLock::new(HashMap::new())),
310 instances: Arc::new(RwLock::new(HashMap::new())),
311 budgets: Arc::new(RwLock::new(HashMap::new())),
312 mission_budgets: Arc::new(RwLock::new(HashMap::new())),
313 spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
314 loaded_workspace: Arc::new(RwLock::new(None)),
315 audit_path: Arc::new(RwLock::new(audit_path)),
316 }
317 }
318
319 pub async fn set_audit_path(&self, path: PathBuf) {
320 *self.audit_path.write().await = path;
321 }
322
323 pub async fn list_templates(&self) -> Vec<AgentTemplate> {
324 let mut rows = self
325 .templates
326 .read()
327 .await
328 .values()
329 .cloned()
330 .collect::<Vec<_>>();
331 rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
332 rows
333 }
334
335 async fn templates_dir_for_loaded_workspace(&self) -> anyhow::Result<PathBuf> {
336 let workspace = self
337 .loaded_workspace
338 .read()
339 .await
340 .clone()
341 .ok_or_else(|| anyhow::anyhow!("agent team workspace not loaded"))?;
342 Ok(PathBuf::from(workspace)
343 .join(".tandem")
344 .join("agent-team")
345 .join("templates"))
346 }
347
348 fn template_filename(template_id: &str) -> String {
349 let safe = template_id
350 .trim()
351 .chars()
352 .map(|ch| {
353 if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
354 ch
355 } else {
356 '_'
357 }
358 })
359 .collect::<String>();
360 let fallback = if safe.is_empty() {
361 "template".to_string()
362 } else {
363 safe
364 };
365 format!("{fallback}.yaml")
366 }
367
368 pub async fn upsert_template(
369 &self,
370 workspace_root: &str,
371 template: AgentTemplate,
372 ) -> anyhow::Result<AgentTemplate> {
373 self.ensure_loaded_for_workspace(workspace_root).await?;
374 let templates_dir = self.templates_dir_for_loaded_workspace().await?;
375 fs::create_dir_all(&templates_dir).await?;
376 let path = templates_dir.join(Self::template_filename(&template.template_id));
377 let payload = serde_yaml::to_string(&template)?;
378 fs::write(path, payload).await?;
379 self.templates
380 .write()
381 .await
382 .insert(template.template_id.clone(), template.clone());
383 Ok(template)
384 }
385
386 pub async fn delete_template(
387 &self,
388 workspace_root: &str,
389 template_id: &str,
390 ) -> anyhow::Result<bool> {
391 self.ensure_loaded_for_workspace(workspace_root).await?;
392 let templates_dir = self.templates_dir_for_loaded_workspace().await?;
393 let path = templates_dir.join(Self::template_filename(template_id));
394 let existed = self.templates.write().await.remove(template_id).is_some();
395 if path.exists() {
396 let _ = fs::remove_file(path).await;
397 }
398 Ok(existed)
399 }
400
401 pub async fn get_template_for_workspace(
402 &self,
403 workspace_root: &str,
404 template_id: &str,
405 ) -> anyhow::Result<Option<AgentTemplate>> {
406 self.ensure_loaded_for_workspace(workspace_root).await?;
407 Ok(self.templates.read().await.get(template_id).cloned())
408 }
409
410 pub async fn list_instances(
411 &self,
412 mission_id: Option<&str>,
413 parent_instance_id: Option<&str>,
414 status: Option<AgentInstanceStatus>,
415 ) -> Vec<AgentInstance> {
416 let mut rows = self
417 .instances
418 .read()
419 .await
420 .values()
421 .filter(|instance| {
422 if let Some(mission_id) = mission_id {
423 if instance.mission_id != mission_id {
424 return false;
425 }
426 }
427 if let Some(parent_id) = parent_instance_id {
428 if instance.parent_instance_id.as_deref() != Some(parent_id) {
429 return false;
430 }
431 }
432 if let Some(status) = &status {
433 if &instance.status != status {
434 return false;
435 }
436 }
437 true
438 })
439 .cloned()
440 .collect::<Vec<_>>();
441 rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
442 rows
443 }
444
445 pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
446 let instances = self.instances.read().await;
447 let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
448 for instance in instances.values() {
449 let row = by_mission
450 .entry(instance.mission_id.clone())
451 .or_insert_with(|| AgentMissionSummary {
452 mission_id: instance.mission_id.clone(),
453 instance_count: 0,
454 running_count: 0,
455 completed_count: 0,
456 failed_count: 0,
457 cancelled_count: 0,
458 queued_count: 0,
459 token_used_total: 0,
460 tool_calls_used_total: 0,
461 steps_used_total: 0,
462 cost_used_usd_total: 0.0,
463 });
464 row.instance_count = row.instance_count.saturating_add(1);
465 match instance.status {
466 AgentInstanceStatus::Queued => {
467 row.queued_count = row.queued_count.saturating_add(1)
468 }
469 AgentInstanceStatus::Running => {
470 row.running_count = row.running_count.saturating_add(1)
471 }
472 AgentInstanceStatus::Completed => {
473 row.completed_count = row.completed_count.saturating_add(1)
474 }
475 AgentInstanceStatus::Failed => {
476 row.failed_count = row.failed_count.saturating_add(1)
477 }
478 AgentInstanceStatus::Cancelled => {
479 row.cancelled_count = row.cancelled_count.saturating_add(1)
480 }
481 }
482 if let Some(usage) = instance
483 .metadata
484 .as_ref()
485 .and_then(|m| m.get("budgetUsage"))
486 .and_then(|u| u.as_object())
487 {
488 row.token_used_total = row.token_used_total.saturating_add(
489 usage
490 .get("tokensUsed")
491 .and_then(|v| v.as_u64())
492 .unwrap_or(0),
493 );
494 row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
495 usage
496 .get("toolCallsUsed")
497 .and_then(|v| v.as_u64())
498 .unwrap_or(0),
499 );
500 row.steps_used_total = row
501 .steps_used_total
502 .saturating_add(usage.get("stepsUsed").and_then(|v| v.as_u64()).unwrap_or(0));
503 row.cost_used_usd_total += usage
504 .get("costUsedUsd")
505 .and_then(|v| v.as_f64())
506 .unwrap_or(0.0);
507 }
508 }
509 let mut rows = by_mission.into_values().collect::<Vec<_>>();
510 rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
511 rows
512 }
513
514 pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
515 self.instances
516 .read()
517 .await
518 .values()
519 .find(|instance| instance.session_id == session_id)
520 .cloned()
521 }
522
523 pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
524 let mut rows = self
525 .spawn_approvals
526 .read()
527 .await
528 .values()
529 .cloned()
530 .collect::<Vec<_>>();
531 rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
532 rows
533 }
534
535 pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
536 let normalized = workspace_root.trim().to_string();
537 let already_loaded = self
538 .loaded_workspace
539 .read()
540 .await
541 .as_ref()
542 .map(|s| s == &normalized)
543 .unwrap_or(false);
544 if already_loaded {
545 return Ok(());
546 }
547
548 let root = PathBuf::from(&normalized);
549 let policy_path = root
550 .join(".tandem")
551 .join("agent-team")
552 .join("spawn-policy.yaml");
553 let templates_dir = root.join(".tandem").join("agent-team").join("templates");
554
555 let mut next_policy = None;
556 if policy_path.exists() {
557 let raw = fs::read_to_string(&policy_path)
558 .await
559 .with_context(|| format!("failed reading {}", policy_path.display()))?;
560 let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
561 .with_context(|| format!("failed parsing {}", policy_path.display()))?;
562 next_policy = Some(parsed);
563 }
564
565 let mut next_templates = HashMap::new();
566 if templates_dir.exists() {
567 let mut entries = fs::read_dir(&templates_dir).await?;
568 while let Some(entry) = entries.next_entry().await? {
569 let path = entry.path();
570 if !path.is_file() {
571 continue;
572 }
573 let ext = path
574 .extension()
575 .and_then(|v| v.to_str())
576 .unwrap_or_default()
577 .to_ascii_lowercase();
578 if ext != "yaml" && ext != "yml" && ext != "json" {
579 continue;
580 }
581 let raw = fs::read_to_string(&path).await?;
582 let template = serde_yaml::from_str::<AgentTemplate>(&raw)
583 .with_context(|| format!("failed parsing {}", path.display()))?;
584 next_templates.insert(template.template_id.clone(), template);
585 }
586 }
587
588 *self.policy.write().await = next_policy;
589 *self.templates.write().await = next_templates;
590 *self.loaded_workspace.write().await = Some(normalized);
591 Ok(())
592 }
593
594 pub async fn spawn(&self, state: &AppState, req: SpawnRequest) -> SpawnResult {
595 self.spawn_with_approval_override(state, req, false).await
596 }
597
598 async fn spawn_with_approval_override(
599 &self,
600 state: &AppState,
601 mut req: SpawnRequest,
602 approval_override: bool,
603 ) -> SpawnResult {
604 let workspace_root = state.workspace_index.snapshot().await.root;
605 if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
606 return SpawnResult {
607 decision: SpawnDecision {
608 allowed: false,
609 code: Some(SpawnDenyCode::SpawnPolicyMissing),
610 reason: Some(format!("spawn policy load failed: {}", err)),
611 requires_user_approval: false,
612 },
613 instance: None,
614 };
615 }
616
617 let Some(policy) = self.policy.read().await.clone() else {
618 return SpawnResult {
619 decision: SpawnDecision {
620 allowed: false,
621 code: Some(SpawnDenyCode::SpawnPolicyMissing),
622 reason: Some("spawn policy file missing".to_string()),
623 requires_user_approval: false,
624 },
625 instance: None,
626 };
627 };
628
629 let template = {
630 let templates = self.templates.read().await;
631 req.template_id
632 .as_deref()
633 .and_then(|template_id| templates.get(template_id).cloned())
634 };
635 if req.template_id.is_none() {
636 if let Some(found) = self
637 .templates
638 .read()
639 .await
640 .values()
641 .find(|t| t.role == req.role)
642 .cloned()
643 {
644 req.template_id = Some(found.template_id.clone());
645 }
646 }
647 let template = if template.is_some() {
648 template
649 } else {
650 let templates = self.templates.read().await;
651 req.template_id
652 .as_deref()
653 .and_then(|id| templates.get(id).cloned())
654 };
655
656 if req.parent_role.is_none() {
657 if let Some(parent_id) = req.parent_instance_id.as_deref() {
658 let instances = self.instances.read().await;
659 req.parent_role = instances
660 .get(parent_id)
661 .map(|instance| instance.role.clone());
662 }
663 }
664
665 let instances = self.instances.read().await;
666 let total_agents = instances.len();
667 let running_agents = instances
668 .values()
669 .filter(|instance| instance.status == AgentInstanceStatus::Running)
670 .count();
671 drop(instances);
672
673 let mut decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
674 if approval_override
675 && !decision.allowed
676 && decision.requires_user_approval
677 && matches!(decision.code, Some(SpawnDenyCode::SpawnRequiresApproval))
678 {
679 decision = SpawnDecision {
680 allowed: true,
681 code: None,
682 reason: None,
683 requires_user_approval: false,
684 };
685 }
686 if !decision.allowed {
687 if decision.requires_user_approval && !approval_override {
688 self.queue_spawn_approval(&req, &decision).await;
689 }
690 return SpawnResult {
691 decision,
692 instance: None,
693 };
694 }
695
696 let mission_id = req
697 .mission_id
698 .clone()
699 .unwrap_or_else(|| "mission-default".to_string());
700
701 if let Some(reason) = self
702 .mission_budget_exceeded_reason(&policy, &mission_id)
703 .await
704 {
705 return SpawnResult {
706 decision: SpawnDecision {
707 allowed: false,
708 code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
709 reason: Some(reason),
710 requires_user_approval: false,
711 },
712 instance: None,
713 };
714 }
715
716 let template = template.unwrap_or_else(|| AgentTemplate {
717 template_id: "default-template".to_string(),
718 display_name: None,
719 avatar_url: None,
720 role: req.role.clone(),
721 system_prompt: None,
722 default_model: None,
723 skills: Vec::new(),
724 default_budget: BudgetLimit::default(),
725 capabilities: Default::default(),
726 });
727
728 let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
729 Ok(hash) => hash,
730 Err(err) => {
731 let lowered = err.to_ascii_lowercase();
732 let code = if lowered.contains("pinned hash mismatch") {
733 SpawnDenyCode::SpawnSkillHashMismatch
734 } else if lowered.contains("skill source denied") {
735 SpawnDenyCode::SpawnSkillSourceDenied
736 } else {
737 SpawnDenyCode::SpawnRequiredSkillMissing
738 };
739 return SpawnResult {
740 decision: SpawnDecision {
741 allowed: false,
742 code: Some(code),
743 reason: Some(err),
744 requires_user_approval: false,
745 },
746 instance: None,
747 };
748 }
749 };
750
751 let parent_snapshot = {
752 let instances = self.instances.read().await;
753 req.parent_instance_id
754 .as_deref()
755 .and_then(|id| instances.get(id).cloned())
756 };
757 let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
758 self.budgets.read().await.get(parent_id).cloned()
759 } else {
760 None
761 };
762
763 let budget = resolve_budget(
764 &policy,
765 parent_snapshot,
766 parent_usage,
767 &template,
768 req.budget_override.clone(),
769 &req.role,
770 );
771
772 let instance_id = format!("ins_{}", Uuid::new_v4().simple());
773 let managed_worktree = prepare_agent_instance_workspace(
774 state,
775 &workspace_root,
776 req.mission_id.as_deref(),
777 &instance_id,
778 &template.template_id,
779 )
780 .await;
781 let workspace_repo_root = managed_worktree
782 .as_ref()
783 .map(|row| row.record.repo_root.clone())
784 .or_else(|| crate::runtime::worktrees::resolve_git_repo_root(&workspace_root))
785 .unwrap_or_else(|| workspace_root.clone());
786 let worker_workspace_root = managed_worktree
787 .as_ref()
788 .map(|row| row.record.path.clone())
789 .unwrap_or_else(|| workspace_root.clone());
790 let mut session = Session::new(
791 Some(format!("Agent Team {}", template.template_id)),
792 Some(worker_workspace_root.clone()),
793 );
794 session.workspace_root = Some(worker_workspace_root.clone());
795 let session_id = session.id.clone();
796 if let Err(err) = state.storage.save_session(session).await {
797 if let Some(worktree) = managed_worktree.as_ref() {
798 let _ = crate::runtime::worktrees::delete_managed_worktree(state, &worktree.record)
799 .await;
800 }
801 return SpawnResult {
802 decision: SpawnDecision {
803 allowed: false,
804 code: Some(SpawnDenyCode::SpawnPolicyMissing),
805 reason: Some(format!("failed creating child session: {}", err)),
806 requires_user_approval: false,
807 },
808 instance: None,
809 };
810 }
811
812 let instance = AgentInstance {
813 instance_id: instance_id.clone(),
814 mission_id: mission_id.clone(),
815 parent_instance_id: req.parent_instance_id.clone(),
816 role: template.role.clone(),
817 template_id: template.template_id.clone(),
818 session_id: session_id.clone(),
819 run_id: None,
820 status: AgentInstanceStatus::Running,
821 budget,
822 skill_hash: skill_hash.clone(),
823 capabilities: template.capabilities.clone(),
824 metadata: Some(json!({
825 "source": req.source,
826 "justification": req.justification,
827 "workspaceRoot": worker_workspace_root,
828 "workspaceRepoRoot": workspace_repo_root,
829 "managedWorktree": managed_worktree.as_ref().map(|row| json!({
830 "path": row.record.path,
831 "branch": row.record.branch,
832 "repoRoot": row.record.repo_root,
833 "cleanupBranch": row.record.cleanup_branch,
834 "reused": row.reused,
835 })).unwrap_or(Value::Null),
836 })),
837 };
838
839 self.instances
840 .write()
841 .await
842 .insert(instance.instance_id.clone(), instance.clone());
843 self.budgets.write().await.insert(
844 instance.instance_id.clone(),
845 InstanceBudgetState {
846 started_at: Some(Instant::now()),
847 ..InstanceBudgetState::default()
848 },
849 );
850 let _ = self.append_audit("spawn.approved", &instance).await;
851
852 SpawnResult {
853 decision: SpawnDecision {
854 allowed: true,
855 code: None,
856 reason: None,
857 requires_user_approval: false,
858 },
859 instance: Some(instance),
860 }
861 }
862
863 pub async fn approve_spawn_approval(
864 &self,
865 state: &AppState,
866 approval_id: &str,
867 reason: Option<&str>,
868 ) -> Option<SpawnResult> {
869 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
870 let result = self
871 .spawn_with_approval_override(state, approval.request.clone(), true)
872 .await;
873 if let Some(instance) = result.instance.as_ref() {
874 let note = reason.unwrap_or("approved by operator");
875 let _ = self
876 .append_approval_audit("spawn.approval.approved", approval_id, Some(instance), note)
877 .await;
878 } else {
879 let note = reason.unwrap_or("approval replay failed policy checks");
880 let _ = self
881 .append_approval_audit("spawn.approval.rejected_on_replay", approval_id, None, note)
882 .await;
883 }
884 Some(result)
885 }
886
887 pub async fn deny_spawn_approval(
888 &self,
889 approval_id: &str,
890 reason: Option<&str>,
891 ) -> Option<PendingSpawnApproval> {
892 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
893 let note = reason.unwrap_or("denied by operator");
894 let _ = self
895 .append_approval_audit("spawn.approval.denied", approval_id, None, note)
896 .await;
897 Some(approval)
898 }
899
900 pub async fn cancel_instance(
901 &self,
902 state: &AppState,
903 instance_id: &str,
904 reason: &str,
905 ) -> Option<AgentInstance> {
906 let mut instances = self.instances.write().await;
907 let instance = instances.get_mut(instance_id)?;
908 if matches!(
909 instance.status,
910 AgentInstanceStatus::Completed
911 | AgentInstanceStatus::Failed
912 | AgentInstanceStatus::Cancelled
913 ) {
914 return Some(instance.clone());
915 }
916 instance.status = AgentInstanceStatus::Cancelled;
917 let snapshot = instance.clone();
918 drop(instances);
919 let _ = state.cancellations.cancel(&snapshot.session_id).await;
920 cleanup_instance_managed_worktree(state, &snapshot).await;
921 let _ = self.append_audit("instance.cancelled", &snapshot).await;
922 emit_instance_cancelled(state, &snapshot, reason);
923 Some(snapshot)
924 }
925
926 async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
927 let approval = PendingSpawnApproval {
928 approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
929 created_at_ms: crate::now_ms(),
930 request: req.clone(),
931 decision_code: decision.code.clone(),
932 reason: decision.reason.clone(),
933 };
934 self.spawn_approvals
935 .write()
936 .await
937 .insert(approval.approval_id.clone(), approval);
938 }
939
940 async fn mission_budget_exceeded_reason(
941 &self,
942 policy: &SpawnPolicy,
943 mission_id: &str,
944 ) -> Option<String> {
945 let Some(limit) = policy.mission_total_budget.as_ref() else {
946 return None;
947 };
948 let usage = self
949 .mission_budgets
950 .read()
951 .await
952 .get(mission_id)
953 .cloned()
954 .unwrap_or_default();
955 if let Some(max) = limit.max_tokens {
956 if usage.tokens_used >= max {
957 return Some(format!(
958 "mission max_tokens exhausted ({}/{})",
959 usage.tokens_used, max
960 ));
961 }
962 }
963 if let Some(max) = limit.max_steps {
964 if usage.steps_used >= u64::from(max) {
965 return Some(format!(
966 "mission max_steps exhausted ({}/{})",
967 usage.steps_used, max
968 ));
969 }
970 }
971 if let Some(max) = limit.max_tool_calls {
972 if usage.tool_calls_used >= u64::from(max) {
973 return Some(format!(
974 "mission max_tool_calls exhausted ({}/{})",
975 usage.tool_calls_used, max
976 ));
977 }
978 }
979 if let Some(max) = limit.max_cost_usd {
980 if usage.cost_used_usd >= max {
981 return Some(format!(
982 "mission max_cost_usd exhausted ({:.6}/{:.6})",
983 usage.cost_used_usd, max
984 ));
985 }
986 }
987 None
988 }
989
990 pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
991 let instance_ids = self
992 .instances
993 .read()
994 .await
995 .values()
996 .filter(|instance| instance.mission_id == mission_id)
997 .map(|instance| instance.instance_id.clone())
998 .collect::<Vec<_>>();
999 let mut count = 0usize;
1000 for instance_id in instance_ids {
1001 if self
1002 .cancel_instance(state, &instance_id, reason)
1003 .await
1004 .is_some()
1005 {
1006 count = count.saturating_add(1);
1007 }
1008 }
1009 count
1010 }
1011
1012 async fn mark_instance_terminal(
1013 &self,
1014 state: &AppState,
1015 instance_id: &str,
1016 status: AgentInstanceStatus,
1017 ) -> Option<AgentInstance> {
1018 let mut instances = self.instances.write().await;
1019 let instance = instances.get_mut(instance_id)?;
1020 if matches!(
1021 instance.status,
1022 AgentInstanceStatus::Completed
1023 | AgentInstanceStatus::Failed
1024 | AgentInstanceStatus::Cancelled
1025 ) {
1026 return Some(instance.clone());
1027 }
1028 instance.status = status.clone();
1029 let snapshot = instance.clone();
1030 drop(instances);
1031 cleanup_instance_managed_worktree(state, &snapshot).await;
1032 match status {
1033 AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
1034 AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
1035 _ => {}
1036 }
1037 Some(snapshot)
1038 }
1039
1040 pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
1041 let Some(session_id) = extract_session_id(event) else {
1042 return;
1043 };
1044 let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
1045 return;
1046 };
1047 if event.event_type == "provider.usage" {
1048 let total_tokens = event
1049 .properties
1050 .get("totalTokens")
1051 .and_then(|v| v.as_u64())
1052 .unwrap_or(0);
1053 let cost_used_usd = event
1054 .properties
1055 .get("costUsd")
1056 .and_then(|v| v.as_f64())
1057 .unwrap_or(0.0);
1058 if total_tokens > 0 {
1059 let exhausted = self
1060 .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
1061 .await;
1062 if exhausted {
1063 let _ = self
1064 .cancel_instance(state, &instance_id, "budget exhausted")
1065 .await;
1066 }
1067 }
1068 return;
1069 }
1070 let mut delta_tokens = 0u64;
1071 let mut delta_steps = 0u32;
1072 let mut delta_tool_calls = 0u32;
1073 if event.event_type == "message.part.updated" {
1074 if let Some(part) = event.properties.get("part") {
1075 let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
1076 if part_type == "tool-invocation" {
1077 delta_tool_calls = 1;
1078 } else if part_type == "text" {
1079 let delta = event
1080 .properties
1081 .get("delta")
1082 .and_then(|v| v.as_str())
1083 .unwrap_or("");
1084 if !delta.is_empty() {
1085 delta_tokens = estimate_tokens(delta);
1086 }
1087 }
1088 }
1089 } else if event.event_type == "session.run.finished" {
1090 delta_steps = 1;
1091 let run_status = event
1092 .properties
1093 .get("status")
1094 .and_then(|v| v.as_str())
1095 .unwrap_or("")
1096 .to_ascii_lowercase();
1097 if run_status == "completed" {
1098 let _ = self
1099 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
1100 .await;
1101 } else if run_status == "failed" || run_status == "error" {
1102 let _ = self
1103 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
1104 .await;
1105 }
1106 }
1107 if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
1108 return;
1109 }
1110 let exhausted = self
1111 .apply_budget_delta(
1112 state,
1113 &instance_id,
1114 delta_tokens,
1115 delta_steps,
1116 delta_tool_calls,
1117 )
1118 .await;
1119 if exhausted {
1120 let _ = self
1121 .cancel_instance(state, &instance_id, "budget exhausted")
1122 .await;
1123 }
1124 }
1125
1126 async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
1127 self.instances
1128 .read()
1129 .await
1130 .values()
1131 .find(|instance| instance.session_id == session_id)
1132 .map(|instance| instance.instance_id.clone())
1133 }
1134
1135 async fn apply_budget_delta(
1136 &self,
1137 state: &AppState,
1138 instance_id: &str,
1139 delta_tokens: u64,
1140 delta_steps: u32,
1141 delta_tool_calls: u32,
1142 ) -> bool {
1143 let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1144 enabled: false,
1145 require_justification: false,
1146 max_agents: None,
1147 max_concurrent: None,
1148 child_budget_percent_of_parent_remaining: None,
1149 mission_total_budget: None,
1150 cost_per_1k_tokens_usd: None,
1151 spawn_edges: HashMap::new(),
1152 required_skills: HashMap::new(),
1153 role_defaults: HashMap::new(),
1154 skill_sources: Default::default(),
1155 });
1156 let mut budgets = self.budgets.write().await;
1157 let Some(usage) = budgets.get_mut(instance_id) else {
1158 return false;
1159 };
1160 if usage.exhausted {
1161 return true;
1162 }
1163 let prev_cost_used_usd = usage.cost_used_usd;
1164 usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
1165 usage.steps_used = usage.steps_used.saturating_add(delta_steps);
1166 usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
1167 if let Some(rate) = policy.cost_per_1k_tokens_usd {
1168 usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
1169 }
1170 let elapsed_ms = usage
1171 .started_at
1172 .map(|started| started.elapsed().as_millis() as u64)
1173 .unwrap_or(0);
1174
1175 let mut exhausted_reason: Option<&'static str> = None;
1176 let mut snapshot: Option<AgentInstance> = None;
1177 {
1178 let mut instances = self.instances.write().await;
1179 if let Some(instance) = instances.get_mut(instance_id) {
1180 instance.metadata = Some(merge_metadata_usage(
1181 instance.metadata.take(),
1182 usage.tokens_used,
1183 usage.steps_used,
1184 usage.tool_calls_used,
1185 usage.cost_used_usd,
1186 elapsed_ms,
1187 ));
1188 if let Some(limit) = instance.budget.max_tokens {
1189 if usage.tokens_used >= limit {
1190 exhausted_reason = Some("max_tokens");
1191 }
1192 }
1193 if exhausted_reason.is_none() {
1194 if let Some(limit) = instance.budget.max_steps {
1195 if usage.steps_used >= limit {
1196 exhausted_reason = Some("max_steps");
1197 }
1198 }
1199 }
1200 if exhausted_reason.is_none() {
1201 if let Some(limit) = instance.budget.max_tool_calls {
1202 if usage.tool_calls_used >= limit {
1203 exhausted_reason = Some("max_tool_calls");
1204 }
1205 }
1206 }
1207 if exhausted_reason.is_none() {
1208 if let Some(limit) = instance.budget.max_duration_ms {
1209 if elapsed_ms >= limit {
1210 exhausted_reason = Some("max_duration_ms");
1211 }
1212 }
1213 }
1214 if exhausted_reason.is_none() {
1215 if let Some(limit) = instance.budget.max_cost_usd {
1216 if usage.cost_used_usd >= limit {
1217 exhausted_reason = Some("max_cost_usd");
1218 }
1219 }
1220 }
1221 snapshot = Some(instance.clone());
1222 }
1223 }
1224 let Some(instance) = snapshot else {
1225 return false;
1226 };
1227 emit_budget_usage(
1228 state,
1229 &instance,
1230 usage.tokens_used,
1231 usage.steps_used,
1232 usage.tool_calls_used,
1233 usage.cost_used_usd,
1234 elapsed_ms,
1235 );
1236 let mission_exhausted = self
1237 .apply_mission_budget_delta(
1238 state,
1239 &instance.mission_id,
1240 delta_tokens,
1241 u64::from(delta_steps),
1242 u64::from(delta_tool_calls),
1243 usage.cost_used_usd - prev_cost_used_usd,
1244 &policy,
1245 )
1246 .await;
1247 if mission_exhausted {
1248 usage.exhausted = true;
1249 let _ = self
1250 .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1251 .await;
1252 return true;
1253 }
1254 if let Some(reason) = exhausted_reason {
1255 usage.exhausted = true;
1256 emit_budget_exhausted(
1257 state,
1258 &instance,
1259 reason,
1260 usage.tokens_used,
1261 usage.steps_used,
1262 usage.tool_calls_used,
1263 usage.cost_used_usd,
1264 elapsed_ms,
1265 );
1266 return true;
1267 }
1268 false
1269 }
1270
1271 async fn apply_exact_token_usage(
1272 &self,
1273 state: &AppState,
1274 instance_id: &str,
1275 total_tokens: u64,
1276 cost_used_usd: f64,
1277 ) -> bool {
1278 let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1279 enabled: false,
1280 require_justification: false,
1281 max_agents: None,
1282 max_concurrent: None,
1283 child_budget_percent_of_parent_remaining: None,
1284 mission_total_budget: None,
1285 cost_per_1k_tokens_usd: None,
1286 spawn_edges: HashMap::new(),
1287 required_skills: HashMap::new(),
1288 role_defaults: HashMap::new(),
1289 skill_sources: Default::default(),
1290 });
1291 let mut budgets = self.budgets.write().await;
1292 let Some(usage) = budgets.get_mut(instance_id) else {
1293 return false;
1294 };
1295 if usage.exhausted {
1296 return true;
1297 }
1298 let prev_tokens = usage.tokens_used;
1299 let prev_cost_used_usd = usage.cost_used_usd;
1300 usage.tokens_used = usage.tokens_used.max(total_tokens);
1301 if cost_used_usd > 0.0 {
1302 usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
1303 } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
1304 let delta = usage.tokens_used.saturating_sub(prev_tokens);
1305 usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
1306 }
1307 let elapsed_ms = usage
1308 .started_at
1309 .map(|started| started.elapsed().as_millis() as u64)
1310 .unwrap_or(0);
1311 let mut exhausted_reason: Option<&'static str> = None;
1312 let mut snapshot: Option<AgentInstance> = None;
1313 {
1314 let mut instances = self.instances.write().await;
1315 if let Some(instance) = instances.get_mut(instance_id) {
1316 instance.metadata = Some(merge_metadata_usage(
1317 instance.metadata.take(),
1318 usage.tokens_used,
1319 usage.steps_used,
1320 usage.tool_calls_used,
1321 usage.cost_used_usd,
1322 elapsed_ms,
1323 ));
1324 if let Some(limit) = instance.budget.max_tokens {
1325 if usage.tokens_used >= limit {
1326 exhausted_reason = Some("max_tokens");
1327 }
1328 }
1329 if exhausted_reason.is_none() {
1330 if let Some(limit) = instance.budget.max_cost_usd {
1331 if usage.cost_used_usd >= limit {
1332 exhausted_reason = Some("max_cost_usd");
1333 }
1334 }
1335 }
1336 snapshot = Some(instance.clone());
1337 }
1338 }
1339 let Some(instance) = snapshot else {
1340 return false;
1341 };
1342 emit_budget_usage(
1343 state,
1344 &instance,
1345 usage.tokens_used,
1346 usage.steps_used,
1347 usage.tool_calls_used,
1348 usage.cost_used_usd,
1349 elapsed_ms,
1350 );
1351 let mission_exhausted = self
1352 .apply_mission_budget_delta(
1353 state,
1354 &instance.mission_id,
1355 usage.tokens_used.saturating_sub(prev_tokens),
1356 0,
1357 0,
1358 usage.cost_used_usd - prev_cost_used_usd,
1359 &policy,
1360 )
1361 .await;
1362 if mission_exhausted {
1363 usage.exhausted = true;
1364 let _ = self
1365 .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1366 .await;
1367 return true;
1368 }
1369 if let Some(reason) = exhausted_reason {
1370 usage.exhausted = true;
1371 emit_budget_exhausted(
1372 state,
1373 &instance,
1374 reason,
1375 usage.tokens_used,
1376 usage.steps_used,
1377 usage.tool_calls_used,
1378 usage.cost_used_usd,
1379 elapsed_ms,
1380 );
1381 return true;
1382 }
1383 false
1384 }
1385
1386 async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1387 let path = self.audit_path.read().await.clone();
1388 if let Some(parent) = path.parent() {
1389 fs::create_dir_all(parent).await?;
1390 }
1391 let row = json!({
1392 "action": action,
1393 "missionID": instance.mission_id,
1394 "instanceID": instance.instance_id,
1395 "parentInstanceID": instance.parent_instance_id,
1396 "role": instance.role,
1397 "templateID": instance.template_id,
1398 "sessionID": instance.session_id,
1399 "skillHash": instance.skill_hash,
1400 "workspaceRoot": instance_workspace_root(instance),
1401 "workspaceRepoRoot": instance_workspace_repo_root(instance),
1402 "managedWorktree": instance_managed_worktree(instance),
1403 "timestampMs": crate::now_ms(),
1404 });
1405 let mut existing = if path.exists() {
1406 fs::read_to_string(&path).await.unwrap_or_default()
1407 } else {
1408 String::new()
1409 };
1410 existing.push_str(&serde_json::to_string(&row)?);
1411 existing.push('\n');
1412 fs::write(path, existing).await?;
1413 Ok(())
1414 }
1415
1416 async fn append_approval_audit(
1417 &self,
1418 action: &str,
1419 approval_id: &str,
1420 instance: Option<&AgentInstance>,
1421 reason: &str,
1422 ) -> anyhow::Result<()> {
1423 let path = self.audit_path.read().await.clone();
1424 if let Some(parent) = path.parent() {
1425 fs::create_dir_all(parent).await?;
1426 }
1427 let row = json!({
1428 "action": action,
1429 "approvalID": approval_id,
1430 "reason": reason,
1431 "missionID": instance.map(|v| v.mission_id.clone()),
1432 "instanceID": instance.map(|v| v.instance_id.clone()),
1433 "parentInstanceID": instance.and_then(|v| v.parent_instance_id.clone()),
1434 "role": instance.map(|v| v.role.clone()),
1435 "templateID": instance.map(|v| v.template_id.clone()),
1436 "sessionID": instance.map(|v| v.session_id.clone()),
1437 "skillHash": instance.map(|v| v.skill_hash.clone()),
1438 "workspaceRoot": instance.and_then(instance_workspace_root),
1439 "workspaceRepoRoot": instance.and_then(instance_workspace_repo_root),
1440 "managedWorktree": instance.and_then(instance_managed_worktree),
1441 "timestampMs": crate::now_ms(),
1442 });
1443 let mut existing = if path.exists() {
1444 fs::read_to_string(&path).await.unwrap_or_default()
1445 } else {
1446 String::new()
1447 };
1448 existing.push_str(&serde_json::to_string(&row)?);
1449 existing.push('\n');
1450 fs::write(path, existing).await?;
1451 Ok(())
1452 }
1453
1454 async fn apply_mission_budget_delta(
1455 &self,
1456 state: &AppState,
1457 mission_id: &str,
1458 delta_tokens: u64,
1459 delta_steps: u64,
1460 delta_tool_calls: u64,
1461 delta_cost_used_usd: f64,
1462 policy: &SpawnPolicy,
1463 ) -> bool {
1464 let mut budgets = self.mission_budgets.write().await;
1465 let row = budgets.entry(mission_id.to_string()).or_default();
1466 row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1467 row.steps_used = row.steps_used.saturating_add(delta_steps);
1468 row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1469 row.cost_used_usd += delta_cost_used_usd.max(0.0);
1470 if row.exhausted {
1471 return true;
1472 }
1473 let Some(limit) = policy.mission_total_budget.as_ref() else {
1474 return false;
1475 };
1476 let mut exhausted_by: Option<&'static str> = None;
1477 if let Some(max) = limit.max_tokens {
1478 if row.tokens_used >= max {
1479 exhausted_by = Some("mission_max_tokens");
1480 }
1481 }
1482 if exhausted_by.is_none() {
1483 if let Some(max) = limit.max_steps {
1484 if row.steps_used >= u64::from(max) {
1485 exhausted_by = Some("mission_max_steps");
1486 }
1487 }
1488 }
1489 if exhausted_by.is_none() {
1490 if let Some(max) = limit.max_tool_calls {
1491 if row.tool_calls_used >= u64::from(max) {
1492 exhausted_by = Some("mission_max_tool_calls");
1493 }
1494 }
1495 }
1496 if exhausted_by.is_none() {
1497 if let Some(max) = limit.max_cost_usd {
1498 if row.cost_used_usd >= max {
1499 exhausted_by = Some("mission_max_cost_usd");
1500 }
1501 }
1502 }
1503 if let Some(exhausted_by) = exhausted_by {
1504 row.exhausted = true;
1505 emit_mission_budget_exhausted(
1506 state,
1507 mission_id,
1508 exhausted_by,
1509 row.tokens_used,
1510 row.steps_used,
1511 row.tool_calls_used,
1512 row.cost_used_usd,
1513 );
1514 return true;
1515 }
1516 false
1517 }
1518
1519 pub async fn set_for_test(
1520 &self,
1521 workspace_root: Option<String>,
1522 policy: Option<SpawnPolicy>,
1523 templates: Vec<AgentTemplate>,
1524 ) {
1525 *self.policy.write().await = policy;
1526 let mut by_id = HashMap::new();
1527 for template in templates {
1528 by_id.insert(template.template_id.clone(), template);
1529 }
1530 *self.templates.write().await = by_id;
1531 self.instances.write().await.clear();
1532 self.budgets.write().await.clear();
1533 self.mission_budgets.write().await.clear();
1534 self.spawn_approvals.write().await.clear();
1535 *self.loaded_workspace.write().await = workspace_root;
1536 }
1537}
1538
1539fn resolve_budget(
1540 policy: &SpawnPolicy,
1541 parent_instance: Option<AgentInstance>,
1542 parent_usage: Option<InstanceBudgetState>,
1543 template: &AgentTemplate,
1544 override_budget: Option<BudgetLimit>,
1545 role: &AgentRole,
1546) -> BudgetLimit {
1547 let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1548 let mut chosen = merge_budget(
1549 merge_budget(role_default, template.default_budget.clone()),
1550 override_budget.unwrap_or_default(),
1551 );
1552
1553 if let Some(parent) = parent_instance {
1554 let usage = parent_usage.unwrap_or_default();
1555 if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1556 if pct > 0 {
1557 chosen.max_tokens = cap_budget_remaining_u64(
1558 chosen.max_tokens,
1559 parent.budget.max_tokens,
1560 usage.tokens_used,
1561 pct,
1562 );
1563 chosen.max_steps = cap_budget_remaining_u32(
1564 chosen.max_steps,
1565 parent.budget.max_steps,
1566 usage.steps_used,
1567 pct,
1568 );
1569 chosen.max_tool_calls = cap_budget_remaining_u32(
1570 chosen.max_tool_calls,
1571 parent.budget.max_tool_calls,
1572 usage.tool_calls_used,
1573 pct,
1574 );
1575 chosen.max_duration_ms = cap_budget_remaining_u64(
1576 chosen.max_duration_ms,
1577 parent.budget.max_duration_ms,
1578 usage
1579 .started_at
1580 .map(|started| started.elapsed().as_millis() as u64)
1581 .unwrap_or(0),
1582 pct,
1583 );
1584 chosen.max_cost_usd = cap_budget_remaining_f64(
1585 chosen.max_cost_usd,
1586 parent.budget.max_cost_usd,
1587 usage.cost_used_usd,
1588 pct,
1589 );
1590 }
1591 }
1592 }
1593 chosen
1594}
1595
1596fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1597 BudgetLimit {
1598 max_tokens: overlay.max_tokens.or(base.max_tokens),
1599 max_steps: overlay.max_steps.or(base.max_steps),
1600 max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1601 max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1602 max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1603 }
1604}
1605
1606fn cap_budget_remaining_u64(
1607 child: Option<u64>,
1608 parent_limit: Option<u64>,
1609 parent_used: u64,
1610 pct: u8,
1611) -> Option<u64> {
1612 match (child, parent_limit) {
1613 (Some(child), Some(parent_limit)) => {
1614 let remaining = parent_limit.saturating_sub(parent_used);
1615 Some(child.min(remaining.saturating_mul(pct as u64) / 100))
1616 }
1617 (None, Some(parent_limit)) => {
1618 let remaining = parent_limit.saturating_sub(parent_used);
1619 Some(remaining.saturating_mul(pct as u64) / 100)
1620 }
1621 (Some(child), None) => Some(child),
1622 (None, None) => None,
1623 }
1624}
1625
1626fn cap_budget_remaining_u32(
1627 child: Option<u32>,
1628 parent_limit: Option<u32>,
1629 parent_used: u32,
1630 pct: u8,
1631) -> Option<u32> {
1632 match (child, parent_limit) {
1633 (Some(child), Some(parent_limit)) => {
1634 let remaining = parent_limit.saturating_sub(parent_used);
1635 Some(child.min(remaining.saturating_mul(pct as u32) / 100))
1636 }
1637 (None, Some(parent_limit)) => {
1638 let remaining = parent_limit.saturating_sub(parent_used);
1639 Some(remaining.saturating_mul(pct as u32) / 100)
1640 }
1641 (Some(child), None) => Some(child),
1642 (None, None) => None,
1643 }
1644}
1645
1646fn cap_budget_remaining_f64(
1647 child: Option<f64>,
1648 parent_limit: Option<f64>,
1649 parent_used: f64,
1650 pct: u8,
1651) -> Option<f64> {
1652 match (child, parent_limit) {
1653 (Some(child), Some(parent_limit)) => {
1654 let remaining = (parent_limit - parent_used).max(0.0);
1655 Some(child.min(remaining * f64::from(pct) / 100.0))
1656 }
1657 (None, Some(parent_limit)) => {
1658 let remaining = (parent_limit - parent_used).max(0.0);
1659 Some(remaining * f64::from(pct) / 100.0)
1660 }
1661 (Some(child), None) => Some(child),
1662 (None, None) => None,
1663 }
1664}
1665
1666async fn compute_skill_hash(
1667 workspace_root: &str,
1668 template: &AgentTemplate,
1669 policy: &SpawnPolicy,
1670) -> Result<String, String> {
1671 use sha2::{Digest, Sha256};
1672 let mut rows = Vec::new();
1673 let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1674 for skill in &template.skills {
1675 if let Some(path) = skill.path.as_deref() {
1676 validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1677 let skill_path = Path::new(workspace_root).join(path);
1678 let raw = fs::read_to_string(&skill_path)
1679 .await
1680 .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1681 let digest = hash_hex(raw.as_bytes());
1682 validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1683 rows.push(format!("path:{}:{}", path, digest));
1684 } else if let Some(id) = skill.id.as_deref() {
1685 validate_skill_source(Some(id), None, policy)?;
1686 let loaded = skill_service
1687 .load_skill(id)
1688 .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1689 let Some(loaded) = loaded else {
1690 return Err(format!("missing required skill id `{id}`"));
1691 };
1692 let digest = hash_hex(loaded.content.as_bytes());
1693 validate_pinned_hash(Some(id), None, &digest, policy)?;
1694 rows.push(format!("id:{}:{}", id, digest));
1695 }
1696 }
1697 rows.sort();
1698 let mut hasher = Sha256::new();
1699 for row in rows {
1700 hasher.update(row.as_bytes());
1701 hasher.update(b"\n");
1702 }
1703 let digest = hasher.finalize();
1704 Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1705}
1706
1707fn validate_skill_source(
1708 id: Option<&str>,
1709 path: Option<&str>,
1710 policy: &SpawnPolicy,
1711) -> Result<(), String> {
1712 use tandem_orchestrator::SkillSourceMode;
1713 match policy.skill_sources.mode {
1714 SkillSourceMode::Any => Ok(()),
1715 SkillSourceMode::ProjectOnly => {
1716 if id.is_some() {
1717 return Err("skill source denied: project_only forbids skill IDs".to_string());
1718 }
1719 let Some(path) = path else {
1720 return Err("skill source denied: project_only requires skill path".to_string());
1721 };
1722 let p = PathBuf::from(path);
1723 if p.is_absolute() {
1724 return Err("skill source denied: absolute skill paths are forbidden".to_string());
1725 }
1726 Ok(())
1727 }
1728 SkillSourceMode::Allowlist => {
1729 if let Some(id) = id {
1730 if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1731 return Ok(());
1732 }
1733 }
1734 if let Some(path) = path {
1735 if policy
1736 .skill_sources
1737 .allowlist_paths
1738 .iter()
1739 .any(|v| v == path)
1740 {
1741 return Ok(());
1742 }
1743 }
1744 Err("skill source denied: not present in allowlist".to_string())
1745 }
1746 }
1747}
1748
1749fn validate_pinned_hash(
1750 id: Option<&str>,
1751 path: Option<&str>,
1752 digest: &str,
1753 policy: &SpawnPolicy,
1754) -> Result<(), String> {
1755 let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1756 let by_path = path.and_then(|path| {
1757 policy
1758 .skill_sources
1759 .pinned_hashes
1760 .get(&format!("path:{path}"))
1761 });
1762 let expected = by_id.or(by_path);
1763 if let Some(expected) = expected {
1764 let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1765 if normalized != digest {
1766 return Err("pinned hash mismatch for skill reference".to_string());
1767 }
1768 }
1769 Ok(())
1770}
1771
1772fn hash_hex(bytes: &[u8]) -> String {
1773 let mut out = String::with_capacity(bytes.len() * 2);
1774 for byte in bytes {
1775 use std::fmt::Write as _;
1776 let _ = write!(&mut out, "{:02x}", byte);
1777 }
1778 out
1779}
1780
1781fn estimate_tokens(text: &str) -> u64 {
1782 let chars = text.chars().count() as u64;
1783 (chars / 4).max(1)
1784}
1785
1786fn extract_session_id(event: &EngineEvent) -> Option<String> {
1787 event
1788 .properties
1789 .get("sessionID")
1790 .and_then(|v| v.as_str())
1791 .map(|v| v.to_string())
1792 .or_else(|| {
1793 event
1794 .properties
1795 .get("part")
1796 .and_then(|v| v.get("sessionID"))
1797 .and_then(|v| v.as_str())
1798 .map(|v| v.to_string())
1799 })
1800}
1801
1802fn merge_metadata_usage(
1803 metadata: Option<Value>,
1804 tokens_used: u64,
1805 steps_used: u32,
1806 tool_calls_used: u32,
1807 cost_used_usd: f64,
1808 elapsed_ms: u64,
1809) -> Value {
1810 let mut base = metadata
1811 .and_then(|v| v.as_object().cloned())
1812 .unwrap_or_default();
1813 base.insert(
1814 "budgetUsage".to_string(),
1815 json!({
1816 "tokensUsed": tokens_used,
1817 "stepsUsed": steps_used,
1818 "toolCallsUsed": tool_calls_used,
1819 "costUsedUsd": cost_used_usd,
1820 "elapsedMs": elapsed_ms
1821 }),
1822 );
1823 Value::Object(base)
1824}
1825
1826fn instance_workspace_root(instance: &AgentInstance) -> Option<Value> {
1827 instance
1828 .metadata
1829 .as_ref()
1830 .and_then(|row| row.get("workspaceRoot"))
1831 .cloned()
1832}
1833
1834fn instance_workspace_repo_root(instance: &AgentInstance) -> Option<Value> {
1835 instance
1836 .metadata
1837 .as_ref()
1838 .and_then(|row| row.get("workspaceRepoRoot"))
1839 .cloned()
1840}
1841
1842fn instance_managed_worktree(instance: &AgentInstance) -> Option<Value> {
1843 instance
1844 .metadata
1845 .as_ref()
1846 .and_then(|row| row.get("managedWorktree"))
1847 .cloned()
1848}
1849
1850async fn prepare_agent_instance_workspace(
1851 state: &AppState,
1852 workspace_root: &str,
1853 mission_id: Option<&str>,
1854 instance_id: &str,
1855 template_id: &str,
1856) -> Option<crate::runtime::worktrees::ManagedWorktreeEnsureResult> {
1857 let repo_root = crate::runtime::worktrees::resolve_git_repo_root(workspace_root)?;
1858 crate::runtime::worktrees::ensure_managed_worktree(
1859 state,
1860 crate::runtime::worktrees::ManagedWorktreeEnsureInput {
1861 repo_root,
1862 task_id: mission_id.map(ToString::to_string),
1863 owner_run_id: Some(instance_id.to_string()),
1864 lease_id: None,
1865 branch_hint: Some(template_id.to_string()),
1866 base: "HEAD".to_string(),
1867 cleanup_branch: true,
1868 },
1869 )
1870 .await
1871 .ok()
1872}
1873
1874async fn cleanup_instance_managed_worktree(state: &AppState, instance: &AgentInstance) {
1875 let Some(metadata) = instance.metadata.as_ref() else {
1876 return;
1877 };
1878 let Some(worktree) = metadata.get("managedWorktree").and_then(Value::as_object) else {
1879 return;
1880 };
1881 let Some(path) = worktree.get("path").and_then(Value::as_str) else {
1882 return;
1883 };
1884 let Some(branch) = worktree.get("branch").and_then(Value::as_str) else {
1885 return;
1886 };
1887 let Some(repo_root) = worktree.get("repoRoot").and_then(Value::as_str) else {
1888 return;
1889 };
1890 let record = crate::ManagedWorktreeRecord {
1891 key: crate::runtime::worktrees::managed_worktree_key(
1892 repo_root,
1893 instance.mission_id.as_str().into(),
1894 Some(instance.instance_id.as_str()),
1895 None,
1896 path,
1897 branch,
1898 ),
1899 repo_root: repo_root.to_string(),
1900 path: path.to_string(),
1901 branch: branch.to_string(),
1902 base: "HEAD".to_string(),
1903 managed: true,
1904 task_id: Some(instance.mission_id.clone()),
1905 owner_run_id: Some(instance.instance_id.clone()),
1906 lease_id: None,
1907 cleanup_branch: worktree
1908 .get("cleanupBranch")
1909 .and_then(Value::as_bool)
1910 .unwrap_or(true),
1911 created_at_ms: 0,
1912 updated_at_ms: 0,
1913 };
1914 let _ = crate::runtime::worktrees::delete_managed_worktree(state, &record).await;
1915}
1916
1917fn normalize_tool_name(name: &str) -> String {
1918 match name.trim().to_lowercase().replace('-', "_").as_str() {
1919 "todowrite" | "update_todo_list" | "update_todos" => "todo_write".to_string(),
1920 other => other.to_string(),
1921 }
1922}
1923
1924async fn evaluate_capability_deny(
1925 state: &AppState,
1926 instance: &AgentInstance,
1927 tool: &str,
1928 args: &Value,
1929 caps: &tandem_orchestrator::CapabilitySpec,
1930 session_id: &str,
1931 message_id: &str,
1932) -> Option<String> {
1933 let deny_patterns = caps
1934 .tool_denylist
1935 .iter()
1936 .map(|name| normalize_tool_name(name))
1937 .collect::<Vec<_>>();
1938 if !deny_patterns.is_empty() && any_policy_matches(&deny_patterns, tool) {
1939 return Some(format!("tool `{tool}` denied by agent capability policy"));
1940 }
1941 let allow_patterns = caps
1942 .tool_allowlist
1943 .iter()
1944 .map(|name| normalize_tool_name(name))
1945 .collect::<Vec<_>>();
1946 if !allow_patterns.is_empty() && !any_policy_matches(&allow_patterns, tool) {
1947 return Some(format!("tool `{tool}` not in agent allowlist"));
1948 }
1949
1950 let browser_execution_tool = matches!(
1951 tool,
1952 "browser_open"
1953 | "browser_navigate"
1954 | "browser_snapshot"
1955 | "browser_click"
1956 | "browser_type"
1957 | "browser_press"
1958 | "browser_wait"
1959 | "browser_extract"
1960 | "browser_screenshot"
1961 | "browser_close"
1962 );
1963
1964 if matches!(
1965 tool,
1966 "websearch" | "webfetch" | "webfetch_html" | "browser_open" | "browser_navigate"
1967 ) || browser_execution_tool
1968 {
1969 if !caps.net_scopes.enabled {
1970 return Some("network disabled for this agent instance".to_string());
1971 }
1972 if !caps.net_scopes.allow_hosts.is_empty() {
1973 if tool == "websearch" {
1974 return Some(
1975 "websearch blocked: host allowlist cannot be verified for search tool"
1976 .to_string(),
1977 );
1978 }
1979 if let Some(host) = extract_url_host(args) {
1980 let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
1981 let allowed = h.trim().to_ascii_lowercase();
1982 !allowed.is_empty()
1983 && (host == allowed || host.ends_with(&format!(".{allowed}")))
1984 });
1985 if !allowed {
1986 return Some(format!("network host `{host}` not in allow_hosts"));
1987 }
1988 }
1989 }
1990 }
1991
1992 if tool == "bash" {
1993 let cmd = args
1994 .get("command")
1995 .and_then(|v| v.as_str())
1996 .unwrap_or("")
1997 .to_ascii_lowercase();
1998 if cmd.contains("git push") {
1999 if !caps.git_caps.push {
2000 return Some("git push disabled for this agent instance".to_string());
2001 }
2002 if caps.git_caps.push_requires_approval {
2003 let action = state.permissions.evaluate("git_push", "git_push").await;
2004 match action {
2005 tandem_core::PermissionAction::Allow => {}
2006 tandem_core::PermissionAction::Deny => {
2007 return Some("git push denied by policy rule".to_string());
2008 }
2009 tandem_core::PermissionAction::Ask => {
2010 let pending = state
2011 .permissions
2012 .ask_for_session_with_context(
2013 Some(session_id),
2014 "git_push",
2015 args.clone(),
2016 Some(tandem_core::PermissionArgsContext {
2017 args_source: "agent_team.git_push".to_string(),
2018 args_integrity: "runtime-checked".to_string(),
2019 query: Some(format!(
2020 "instanceID={} messageID={}",
2021 instance.instance_id, message_id
2022 )),
2023 }),
2024 )
2025 .await;
2026 return Some(format!(
2027 "git push requires explicit user approval (approvalID={})",
2028 pending.id
2029 ));
2030 }
2031 }
2032 }
2033 }
2034 if cmd.contains("git commit") && !caps.git_caps.commit {
2035 return Some("git commit disabled for this agent instance".to_string());
2036 }
2037 }
2038
2039 let access_kind = tool_fs_access_kind(tool);
2040 if let Some(kind) = access_kind {
2041 let Some(session) = state.storage.get_session(session_id).await else {
2042 return Some("session not found for capability evaluation".to_string());
2043 };
2044 let Some(root) = session.workspace_root.clone() else {
2045 return Some("workspace root missing for capability evaluation".to_string());
2046 };
2047 let requested = extract_tool_candidate_paths(tool, args);
2048 if !requested.is_empty() {
2049 let allowed_scopes = if kind == "read" {
2050 &caps.fs_scopes.read
2051 } else {
2052 &caps.fs_scopes.write
2053 };
2054 if allowed_scopes.is_empty() {
2055 return Some(format!("fs {kind} access blocked: no scopes configured"));
2056 }
2057 for candidate in requested {
2058 if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
2059 return Some(format!("fs {kind} access denied for path `{}`", candidate));
2060 }
2061 }
2062 }
2063 }
2064
2065 denied_secrets_reason(tool, caps, args)
2066}
2067
2068fn denied_secrets_reason(
2069 tool: &str,
2070 caps: &tandem_orchestrator::CapabilitySpec,
2071 args: &Value,
2072) -> Option<String> {
2073 if tool == "auth" {
2074 if caps.secrets_scopes.is_empty() {
2075 return Some("secrets are disabled for this agent instance".to_string());
2076 }
2077 let alias = args
2078 .get("id")
2079 .or_else(|| args.get("provider"))
2080 .or_else(|| args.get("providerID"))
2081 .and_then(|v| v.as_str())
2082 .unwrap_or("")
2083 .trim();
2084 if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
2085 return Some(format!(
2086 "secret alias `{alias}` is not in agent secretsScopes allowlist"
2087 ));
2088 }
2089 }
2090 None
2091}
2092
2093fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
2094 match tool {
2095 "read" | "glob" | "grep" | "codesearch" | "lsp" => Some("read"),
2096 "write" | "edit" | "apply_patch" => Some("write"),
2097 _ => None,
2098 }
2099}
2100
2101fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
2102 let Some(obj) = args.as_object() else {
2103 return Vec::new();
2104 };
2105 let keys: &[&str] = match tool {
2106 "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
2107 "glob" => &["pattern"],
2108 "lsp" => &["filePath", "path"],
2109 "bash" => &["cwd"],
2110 "apply_patch" => &["path"],
2111 _ => &["path", "cwd"],
2112 };
2113 keys.iter()
2114 .filter_map(|key| obj.get(*key))
2115 .filter_map(|value| value.as_str())
2116 .filter(|s| !s.trim().is_empty())
2117 .map(|raw| strip_glob_tokens(raw).to_string())
2118 .collect()
2119}
2120
2121fn strip_glob_tokens(path: &str) -> &str {
2122 let mut end = path.len();
2123 for (idx, ch) in path.char_indices() {
2124 if ch == '*' || ch == '?' || ch == '[' {
2125 end = idx;
2126 break;
2127 }
2128 }
2129 &path[..end]
2130}
2131
2132fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
2133 let root_path = PathBuf::from(root);
2134 let candidate_path = resolve_path(&root_path, candidate);
2135 scopes.iter().any(|scope| {
2136 let scope_path = resolve_path(&root_path, scope);
2137 candidate_path.starts_with(scope_path)
2138 })
2139}
2140
2141fn resolve_path(root: &Path, raw: &str) -> PathBuf {
2142 let raw = raw.trim();
2143 if raw.is_empty() {
2144 return root.to_path_buf();
2145 }
2146 let path = PathBuf::from(raw);
2147 if path.is_absolute() {
2148 path
2149 } else {
2150 root.join(path)
2151 }
2152}
2153
2154fn extract_url_host(args: &Value) -> Option<String> {
2155 let url = args
2156 .get("url")
2157 .or_else(|| args.get("uri"))
2158 .or_else(|| args.get("link"))
2159 .and_then(|v| v.as_str())?;
2160 let raw = url.trim();
2161 let (_, after_scheme) = raw.split_once("://")?;
2162 let host_port = after_scheme.split('/').next().unwrap_or_default();
2163 let host = host_port.split('@').next_back().unwrap_or_default();
2164 let host = host
2165 .split(':')
2166 .next()
2167 .unwrap_or_default()
2168 .to_ascii_lowercase();
2169 if host.is_empty() {
2170 None
2171 } else {
2172 Some(host)
2173 }
2174}
2175
2176pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
2177 emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
2178}
2179
2180pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
2181 emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
2182}
2183
2184pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
2185 emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
2186}
2187
2188#[derive(Default)]
2189pub struct SpawnEventContext<'a> {
2190 pub session_id: Option<&'a str>,
2191 pub message_id: Option<&'a str>,
2192 pub run_id: Option<&'a str>,
2193}
2194
2195pub fn emit_spawn_requested_with_context(
2196 state: &AppState,
2197 req: &SpawnRequest,
2198 ctx: &SpawnEventContext<'_>,
2199) {
2200 state.event_bus.publish(EngineEvent::new(
2201 "agent_team.spawn.requested",
2202 json!({
2203 "sessionID": ctx.session_id,
2204 "messageID": ctx.message_id,
2205 "runID": ctx.run_id,
2206 "missionID": req.mission_id,
2207 "instanceID": Value::Null,
2208 "parentInstanceID": req.parent_instance_id,
2209 "source": req.source,
2210 "requestedRole": req.role,
2211 "templateID": req.template_id,
2212 "justification": req.justification,
2213 "timestampMs": crate::now_ms(),
2214 }),
2215 ));
2216}
2217
2218pub fn emit_spawn_denied_with_context(
2219 state: &AppState,
2220 req: &SpawnRequest,
2221 decision: &SpawnDecision,
2222 ctx: &SpawnEventContext<'_>,
2223) {
2224 state.event_bus.publish(EngineEvent::new(
2225 "agent_team.spawn.denied",
2226 json!({
2227 "sessionID": ctx.session_id,
2228 "messageID": ctx.message_id,
2229 "runID": ctx.run_id,
2230 "missionID": req.mission_id,
2231 "instanceID": Value::Null,
2232 "parentInstanceID": req.parent_instance_id,
2233 "source": req.source,
2234 "requestedRole": req.role,
2235 "templateID": req.template_id,
2236 "code": decision.code,
2237 "error": decision.reason,
2238 "timestampMs": crate::now_ms(),
2239 }),
2240 ));
2241}
2242
2243pub fn emit_spawn_approved_with_context(
2244 state: &AppState,
2245 req: &SpawnRequest,
2246 instance: &AgentInstance,
2247 ctx: &SpawnEventContext<'_>,
2248) {
2249 state.event_bus.publish(EngineEvent::new(
2250 "agent_team.spawn.approved",
2251 json!({
2252 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2253 "messageID": ctx.message_id,
2254 "runID": ctx.run_id.or(instance.run_id.as_deref()),
2255 "missionID": instance.mission_id,
2256 "instanceID": instance.instance_id,
2257 "parentInstanceID": instance.parent_instance_id,
2258 "source": req.source,
2259 "requestedRole": req.role,
2260 "templateID": instance.template_id,
2261 "skillHash": instance.skill_hash,
2262 "workspaceRoot": instance_workspace_root(instance),
2263 "workspaceRepoRoot": instance_workspace_repo_root(instance),
2264 "managedWorktree": instance_managed_worktree(instance),
2265 "timestampMs": crate::now_ms(),
2266 }),
2267 ));
2268 state.event_bus.publish(EngineEvent::new(
2269 "agent_team.instance.started",
2270 json!({
2271 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2272 "messageID": ctx.message_id,
2273 "runID": ctx.run_id.or(instance.run_id.as_deref()),
2274 "missionID": instance.mission_id,
2275 "instanceID": instance.instance_id,
2276 "parentInstanceID": instance.parent_instance_id,
2277 "role": instance.role,
2278 "status": instance.status,
2279 "budgetLimit": instance.budget,
2280 "skillHash": instance.skill_hash,
2281 "workspaceRoot": instance_workspace_root(instance),
2282 "workspaceRepoRoot": instance_workspace_repo_root(instance),
2283 "managedWorktree": instance_managed_worktree(instance),
2284 "timestampMs": crate::now_ms(),
2285 }),
2286 ));
2287}
2288
2289pub fn emit_budget_usage(
2290 state: &AppState,
2291 instance: &AgentInstance,
2292 tokens_used: u64,
2293 steps_used: u32,
2294 tool_calls_used: u32,
2295 cost_used_usd: f64,
2296 elapsed_ms: u64,
2297) {
2298 state.event_bus.publish(EngineEvent::new(
2299 "agent_team.budget.usage",
2300 json!({
2301 "sessionID": instance.session_id,
2302 "messageID": Value::Null,
2303 "runID": instance.run_id,
2304 "missionID": instance.mission_id,
2305 "instanceID": instance.instance_id,
2306 "tokensUsed": tokens_used,
2307 "stepsUsed": steps_used,
2308 "toolCallsUsed": tool_calls_used,
2309 "costUsedUsd": cost_used_usd,
2310 "elapsedMs": elapsed_ms,
2311 "timestampMs": crate::now_ms(),
2312 }),
2313 ));
2314}
2315
2316pub fn emit_budget_exhausted(
2317 state: &AppState,
2318 instance: &AgentInstance,
2319 exhausted_by: &str,
2320 tokens_used: u64,
2321 steps_used: u32,
2322 tool_calls_used: u32,
2323 cost_used_usd: f64,
2324 elapsed_ms: u64,
2325) {
2326 state.event_bus.publish(EngineEvent::new(
2327 "agent_team.budget.exhausted",
2328 json!({
2329 "sessionID": instance.session_id,
2330 "messageID": Value::Null,
2331 "runID": instance.run_id,
2332 "missionID": instance.mission_id,
2333 "instanceID": instance.instance_id,
2334 "exhaustedBy": exhausted_by,
2335 "tokensUsed": tokens_used,
2336 "stepsUsed": steps_used,
2337 "toolCallsUsed": tool_calls_used,
2338 "costUsedUsd": cost_used_usd,
2339 "elapsedMs": elapsed_ms,
2340 "timestampMs": crate::now_ms(),
2341 }),
2342 ));
2343}
2344
2345pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
2346 state.event_bus.publish(EngineEvent::new(
2347 "agent_team.instance.cancelled",
2348 json!({
2349 "sessionID": instance.session_id,
2350 "messageID": Value::Null,
2351 "runID": instance.run_id,
2352 "missionID": instance.mission_id,
2353 "instanceID": instance.instance_id,
2354 "parentInstanceID": instance.parent_instance_id,
2355 "role": instance.role,
2356 "status": instance.status,
2357 "reason": reason,
2358 "workspaceRoot": instance_workspace_root(instance),
2359 "workspaceRepoRoot": instance_workspace_repo_root(instance),
2360 "managedWorktree": instance_managed_worktree(instance),
2361 "timestampMs": crate::now_ms(),
2362 }),
2363 ));
2364}
2365
2366pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
2367 state.event_bus.publish(EngineEvent::new(
2368 "agent_team.instance.completed",
2369 json!({
2370 "sessionID": instance.session_id,
2371 "messageID": Value::Null,
2372 "runID": instance.run_id,
2373 "missionID": instance.mission_id,
2374 "instanceID": instance.instance_id,
2375 "parentInstanceID": instance.parent_instance_id,
2376 "role": instance.role,
2377 "status": instance.status,
2378 "workspaceRoot": instance_workspace_root(instance),
2379 "workspaceRepoRoot": instance_workspace_repo_root(instance),
2380 "managedWorktree": instance_managed_worktree(instance),
2381 "timestampMs": crate::now_ms(),
2382 }),
2383 ));
2384}
2385
2386pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
2387 state.event_bus.publish(EngineEvent::new(
2388 "agent_team.instance.failed",
2389 json!({
2390 "sessionID": instance.session_id,
2391 "messageID": Value::Null,
2392 "runID": instance.run_id,
2393 "missionID": instance.mission_id,
2394 "instanceID": instance.instance_id,
2395 "parentInstanceID": instance.parent_instance_id,
2396 "role": instance.role,
2397 "status": instance.status,
2398 "workspaceRoot": instance_workspace_root(instance),
2399 "workspaceRepoRoot": instance_workspace_repo_root(instance),
2400 "managedWorktree": instance_managed_worktree(instance),
2401 "timestampMs": crate::now_ms(),
2402 }),
2403 ));
2404}
2405
2406pub fn emit_mission_budget_exhausted(
2407 state: &AppState,
2408 mission_id: &str,
2409 exhausted_by: &str,
2410 tokens_used: u64,
2411 steps_used: u64,
2412 tool_calls_used: u64,
2413 cost_used_usd: f64,
2414) {
2415 state.event_bus.publish(EngineEvent::new(
2416 "agent_team.mission.budget.exhausted",
2417 json!({
2418 "sessionID": Value::Null,
2419 "messageID": Value::Null,
2420 "runID": Value::Null,
2421 "missionID": mission_id,
2422 "instanceID": Value::Null,
2423 "exhaustedBy": exhausted_by,
2424 "tokensUsed": tokens_used,
2425 "stepsUsed": steps_used,
2426 "toolCallsUsed": tool_calls_used,
2427 "costUsedUsd": cost_used_usd,
2428 "timestampMs": crate::now_ms(),
2429 }),
2430 ));
2431}