1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use futures::future::BoxFuture;
8use serde::Deserialize;
9use serde::Serialize;
10use serde_json::{json, Value};
11use tandem_core::{
12 any_policy_matches, SpawnAgentHook, SpawnAgentToolContext, SpawnAgentToolResult,
13 ToolPolicyContext, ToolPolicyDecision, ToolPolicyHook,
14};
15use tandem_orchestrator::{
16 AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
17 SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
18};
19use tandem_skills::SkillService;
20use tandem_types::{EngineEvent, Session};
21use tokio::fs;
22use tokio::sync::RwLock;
23use uuid::Uuid;
24
25use crate::AppState;
26
27#[derive(Clone, Default)]
28pub struct AgentTeamRuntime {
29 policy: Arc<RwLock<Option<SpawnPolicy>>>,
30 templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
31 instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
32 budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
33 mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
34 spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
35 loaded_workspace: Arc<RwLock<Option<String>>>,
36 audit_path: Arc<RwLock<PathBuf>>,
37}
38
39#[derive(Debug, Clone)]
40pub struct SpawnResult {
41 pub decision: SpawnDecision,
42 pub instance: Option<AgentInstance>,
43}
44
45#[derive(Debug, Clone, Serialize)]
46pub struct AgentMissionSummary {
47 #[serde(rename = "missionID")]
48 pub mission_id: String,
49 #[serde(rename = "instanceCount")]
50 pub instance_count: usize,
51 #[serde(rename = "runningCount")]
52 pub running_count: usize,
53 #[serde(rename = "completedCount")]
54 pub completed_count: usize,
55 #[serde(rename = "failedCount")]
56 pub failed_count: usize,
57 #[serde(rename = "cancelledCount")]
58 pub cancelled_count: usize,
59 #[serde(rename = "queuedCount")]
60 pub queued_count: usize,
61 #[serde(rename = "tokenUsedTotal")]
62 pub token_used_total: u64,
63 #[serde(rename = "toolCallsUsedTotal")]
64 pub tool_calls_used_total: u64,
65 #[serde(rename = "stepsUsedTotal")]
66 pub steps_used_total: u64,
67 #[serde(rename = "costUsedUsdTotal")]
68 pub cost_used_usd_total: f64,
69}
70
/// Running budget counters tracked for a single agent instance.
#[derive(Debug, Clone, Default)]
struct InstanceBudgetState {
    /// Tokens consumed so far.
    tokens_used: u64,
    /// Steps consumed so far.
    steps_used: u32,
    /// Tool calls consumed so far.
    tool_calls_used: u32,
    /// Accumulated cost in USD.
    cost_used_usd: f64,
    /// When tracking started; set to `Instant::now()` at spawn time.
    started_at: Option<Instant>,
    /// Exhaustion flag (maintained by budget code outside this excerpt).
    exhausted: bool,
}
80
/// Running budget counters aggregated across one mission.
#[derive(Debug, Clone, Default)]
struct MissionBudgetState {
    /// Tokens consumed by the mission so far.
    tokens_used: u64,
    /// Steps consumed by the mission so far.
    steps_used: u64,
    /// Tool calls consumed by the mission so far.
    tool_calls_used: u64,
    /// Accumulated mission cost in USD.
    cost_used_usd: f64,
    /// Exhaustion flag (maintained by budget code outside this excerpt).
    exhausted: bool,
}
89
90#[derive(Debug, Clone, Serialize)]
91pub struct PendingSpawnApproval {
92 #[serde(rename = "approvalID")]
93 pub approval_id: String,
94 #[serde(rename = "createdAtMs")]
95 pub created_at_ms: u64,
96 pub request: SpawnRequest,
97 #[serde(rename = "decisionCode")]
98 pub decision_code: Option<SpawnDenyCode>,
99 pub reason: Option<String>,
100}
101
102#[derive(Clone)]
103pub struct ServerSpawnAgentHook {
104 state: AppState,
105}
106
107#[derive(Debug, Deserialize)]
108struct SpawnAgentToolInput {
109 #[serde(rename = "missionID")]
110 mission_id: Option<String>,
111 #[serde(rename = "parentInstanceID")]
112 parent_instance_id: Option<String>,
113 #[serde(rename = "templateID")]
114 template_id: Option<String>,
115 role: AgentRole,
116 source: Option<SpawnSource>,
117 justification: String,
118 #[serde(rename = "budgetOverride", default)]
119 budget_override: Option<BudgetLimit>,
120}
121
122impl ServerSpawnAgentHook {
123 pub fn new(state: AppState) -> Self {
124 Self { state }
125 }
126}
127
128impl SpawnAgentHook for ServerSpawnAgentHook {
129 fn spawn_agent(
130 &self,
131 ctx: SpawnAgentToolContext,
132 ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
133 let state = self.state.clone();
134 Box::pin(async move {
135 let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
136 let input = match parsed {
137 Ok(input) => input,
138 Err(err) => {
139 return Ok(SpawnAgentToolResult {
140 output: format!("spawn_agent denied: invalid args ({err})"),
141 metadata: json!({
142 "ok": false,
143 "code": "SPAWN_INVALID_ARGS",
144 "error": err.to_string(),
145 }),
146 });
147 }
148 };
149 let req = SpawnRequest {
150 mission_id: input.mission_id,
151 parent_instance_id: input.parent_instance_id,
152 source: input.source.unwrap_or(SpawnSource::ToolCall),
153 parent_role: None,
154 role: input.role,
155 template_id: input.template_id,
156 justification: input.justification,
157 budget_override: input.budget_override,
158 };
159
160 let event_ctx = SpawnEventContext {
161 session_id: Some(ctx.session_id.as_str()),
162 message_id: Some(ctx.message_id.as_str()),
163 run_id: None,
164 };
165 emit_spawn_requested_with_context(&state, &req, &event_ctx);
166 let result = state.agent_teams.spawn(&state, req.clone()).await;
167 if !result.decision.allowed || result.instance.is_none() {
168 emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
169 return Ok(SpawnAgentToolResult {
170 output: result
171 .decision
172 .reason
173 .clone()
174 .unwrap_or_else(|| "spawn_agent denied".to_string()),
175 metadata: json!({
176 "ok": false,
177 "code": result.decision.code,
178 "error": result.decision.reason,
179 "requiresUserApproval": result.decision.requires_user_approval,
180 }),
181 });
182 }
183 let instance = result.instance.expect("checked is_some");
184 emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
185 Ok(SpawnAgentToolResult {
186 output: format!(
187 "spawned {} as instance {} (session {})",
188 instance.template_id, instance.instance_id, instance.session_id
189 ),
190 metadata: json!({
191 "ok": true,
192 "missionID": instance.mission_id,
193 "instanceID": instance.instance_id,
194 "sessionID": instance.session_id,
195 "runID": instance.run_id,
196 "status": instance.status,
197 "skillHash": instance.skill_hash,
198 "workspaceRoot": instance_workspace_root(&instance),
199 "workspaceRepoRoot": instance_workspace_repo_root(&instance),
200 "managedWorktree": instance_managed_worktree(&instance),
201 }),
202 })
203 })
204 }
205}
206
207#[derive(Clone)]
208pub struct ServerToolPolicyHook {
209 state: AppState,
210}
211
212impl ServerToolPolicyHook {
213 pub fn new(state: AppState) -> Self {
214 Self { state }
215 }
216}
217
218fn automation_tool_target_paths(tool: &str, args: &Value) -> Vec<String> {
219 let mut paths = Vec::new();
220 match tool {
221 "write" | "edit" => {
222 if let Some(path) = args.get("path").and_then(Value::as_str) {
223 let trimmed = path.trim();
224 if !trimmed.is_empty() {
225 paths.push(trimmed.to_string());
226 }
227 }
228 }
229 "apply_patch" => {
230 let patch = args
231 .get("patchText")
232 .and_then(Value::as_str)
233 .or_else(|| args.as_str());
234 if let Some(patch) = patch {
235 for line in patch.lines() {
236 for prefix in [
237 "*** Update File: ",
238 "*** Delete File: ",
239 "*** Add File: ",
240 "*** Move to: ",
241 ] {
242 if let Some(path) = line.strip_prefix(prefix) {
243 let trimmed = path.trim();
244 if !trimmed.is_empty() {
245 paths.push(trimmed.to_string());
246 }
247 }
248 }
249 }
250 }
251 }
252 _ => {}
253 }
254 paths.sort();
255 paths.dedup();
256 paths
257}
258
259fn automation_path_references_read_only_source_of_truth(
260 path: &str,
261 read_only_names: &std::collections::HashSet<String>,
262 workspace_root: Option<&str>,
263) -> bool {
264 let trimmed = path.trim().trim_matches('`');
265 if trimmed.is_empty() {
266 return false;
267 }
268 let lowered = trimmed.to_ascii_lowercase();
269 if read_only_names.contains(&lowered) {
270 return true;
271 }
272 if let Some(filename) = std::path::Path::new(trimmed)
273 .file_name()
274 .and_then(|value| value.to_str())
275 {
276 if read_only_names.contains(&filename.to_ascii_lowercase()) {
277 return true;
278 }
279 }
280 workspace_root
281 .and_then(|root| {
282 crate::app::state::automation::normalize_workspace_display_path(root, trimmed)
283 })
284 .is_some_and(|normalized| {
285 let normalized_lower = normalized.to_ascii_lowercase();
286 if read_only_names.contains(&normalized_lower) {
287 return true;
288 }
289 std::path::Path::new(&normalized)
290 .file_name()
291 .and_then(|value| value.to_str())
292 .is_some_and(|filename| read_only_names.contains(&filename.to_ascii_lowercase()))
293 })
294}
295
296async fn evaluate_automation_read_only_write_deny(
297 state: &AppState,
298 session_id: &str,
299 tool: &str,
300 args: &Value,
301) -> Option<String> {
302 if !matches!(tool, "write" | "edit" | "apply_patch") {
303 return None;
304 }
305 let run_id = state
306 .automation_v2_session_runs
307 .read()
308 .await
309 .get(session_id)
310 .cloned()?;
311 let run = state.get_automation_v2_run(&run_id).await?;
312 let automation = run.automation_snapshot?;
313 let read_only_names =
314 crate::app::state::automation::enforcement::automation_read_only_source_of_truth_name_variants_for_automation(
315 &automation,
316 );
317 if read_only_names.is_empty() {
318 return None;
319 }
320 let workspace_root = args
321 .get("__workspace_root")
322 .and_then(Value::as_str)
323 .or(automation.workspace_root.as_deref());
324 let blocked_paths = automation_tool_target_paths(tool, args)
325 .into_iter()
326 .filter(|path| {
327 automation_path_references_read_only_source_of_truth(
328 path,
329 &read_only_names,
330 workspace_root,
331 )
332 })
333 .collect::<Vec<_>>();
334 if blocked_paths.is_empty() {
335 None
336 } else {
337 Some(format!(
338 "write denied for automation `{}` (run `{}`): read-only source-of-truth file(s) cannot be mutated: {}",
339 automation.automation_id,
340 run_id,
341 blocked_paths.join(", ")
342 ))
343 }
344}
345
346impl ToolPolicyHook for ServerToolPolicyHook {
347 fn evaluate_tool(
348 &self,
349 ctx: ToolPolicyContext,
350 ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
351 let state = self.state.clone();
352 Box::pin(async move {
353 let tool = normalize_tool_name(&ctx.tool);
354 if let Some(policy) = state.routine_session_policy(&ctx.session_id).await {
355 let allowed_patterns = policy
356 .allowed_tools
357 .iter()
358 .map(|name| normalize_tool_name(name))
359 .collect::<Vec<_>>();
360 if !policy.allowed_tools.is_empty() && !any_policy_matches(&allowed_patterns, &tool)
361 {
362 let reason = format!(
363 "tool `{}` is not allowed for routine `{}` (run `{}`)",
364 tool, policy.routine_id, policy.run_id
365 );
366 state.event_bus.publish(EngineEvent::new(
367 "routine.tool.denied",
368 json!({
369 "sessionID": ctx.session_id,
370 "messageID": ctx.message_id,
371 "runID": policy.run_id,
372 "routineID": policy.routine_id,
373 "tool": tool,
374 "reason": reason,
375 "timestampMs": crate::now_ms(),
376 }),
377 ));
378 return Ok(ToolPolicyDecision {
379 allowed: false,
380 reason: Some(reason),
381 });
382 }
383 }
384
385 if let Some(reason) =
386 evaluate_automation_read_only_write_deny(&state, &ctx.session_id, &tool, &ctx.args)
387 .await
388 {
389 state.event_bus.publish(EngineEvent::new(
390 "automation.read_only_write.denied",
391 json!({
392 "sessionID": ctx.session_id,
393 "messageID": ctx.message_id,
394 "tool": tool,
395 "reason": reason,
396 "timestampMs": crate::now_ms(),
397 }),
398 ));
399 return Ok(ToolPolicyDecision {
400 allowed: false,
401 reason: Some(reason),
402 });
403 }
404
405 let Some(instance) = state
406 .agent_teams
407 .instance_for_session(&ctx.session_id)
408 .await
409 else {
410 return Ok(ToolPolicyDecision {
411 allowed: true,
412 reason: None,
413 });
414 };
415 let caps = instance.capabilities.clone();
416 let deny = evaluate_capability_deny(
417 &state,
418 &instance,
419 &tool,
420 &ctx.args,
421 &caps,
422 &ctx.session_id,
423 &ctx.message_id,
424 )
425 .await;
426 if let Some(reason) = deny {
427 state.event_bus.publish(EngineEvent::new(
428 "agent_team.capability.denied",
429 json!({
430 "sessionID": ctx.session_id,
431 "messageID": ctx.message_id,
432 "runID": instance.run_id,
433 "missionID": instance.mission_id,
434 "instanceID": instance.instance_id,
435 "tool": tool,
436 "reason": reason,
437 "timestampMs": crate::now_ms(),
438 }),
439 ));
440 return Ok(ToolPolicyDecision {
441 allowed: false,
442 reason: Some(reason),
443 });
444 }
445 Ok(ToolPolicyDecision {
446 allowed: true,
447 reason: None,
448 })
449 })
450 }
451}
452
453impl AgentTeamRuntime {
454 pub fn new(audit_path: PathBuf) -> Self {
455 Self {
456 policy: Arc::new(RwLock::new(None)),
457 templates: Arc::new(RwLock::new(HashMap::new())),
458 instances: Arc::new(RwLock::new(HashMap::new())),
459 budgets: Arc::new(RwLock::new(HashMap::new())),
460 mission_budgets: Arc::new(RwLock::new(HashMap::new())),
461 spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
462 loaded_workspace: Arc::new(RwLock::new(None)),
463 audit_path: Arc::new(RwLock::new(audit_path)),
464 }
465 }
466
467 pub async fn set_audit_path(&self, path: PathBuf) {
468 *self.audit_path.write().await = path;
469 }
470
471 pub async fn list_templates(&self) -> Vec<AgentTemplate> {
472 let mut rows = self
473 .templates
474 .read()
475 .await
476 .values()
477 .cloned()
478 .collect::<Vec<_>>();
479 rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
480 rows
481 }
482
483 async fn templates_dir_for_loaded_workspace(&self) -> anyhow::Result<PathBuf> {
484 let workspace = self
485 .loaded_workspace
486 .read()
487 .await
488 .clone()
489 .ok_or_else(|| anyhow::anyhow!("agent team workspace not loaded"))?;
490 Ok(PathBuf::from(workspace)
491 .join(".tandem")
492 .join("agent-team")
493 .join("templates"))
494 }
495
/// Derives the on-disk file name (`<stem>.yaml`) for a template id.
///
/// The id is trimmed and every character other than an ASCII letter, an ASCII
/// digit, `-` or `_` is replaced with `_`. An id that is empty after trimming
/// maps to `template.yaml`.
fn template_filename(template_id: &str) -> String {
    let mut file_name = String::new();
    for ch in template_id.trim().chars() {
        let keep = ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_');
        file_name.push(if keep { ch } else { '_' });
    }
    if file_name.is_empty() {
        file_name.push_str("template");
    }
    file_name.push_str(".yaml");
    file_name
}
515
516 pub async fn upsert_template(
517 &self,
518 workspace_root: &str,
519 template: AgentTemplate,
520 ) -> anyhow::Result<AgentTemplate> {
521 self.ensure_loaded_for_workspace(workspace_root).await?;
522 let templates_dir = self.templates_dir_for_loaded_workspace().await?;
523 fs::create_dir_all(&templates_dir).await?;
524 let path = templates_dir.join(Self::template_filename(&template.template_id));
525 let payload = serde_yaml::to_string(&template)?;
526 fs::write(path, payload).await?;
527 self.templates
528 .write()
529 .await
530 .insert(template.template_id.clone(), template.clone());
531 Ok(template)
532 }
533
534 pub async fn delete_template(
535 &self,
536 workspace_root: &str,
537 template_id: &str,
538 ) -> anyhow::Result<bool> {
539 self.ensure_loaded_for_workspace(workspace_root).await?;
540 let templates_dir = self.templates_dir_for_loaded_workspace().await?;
541 let path = templates_dir.join(Self::template_filename(template_id));
542 let existed = self.templates.write().await.remove(template_id).is_some();
543 if path.exists() {
544 let _ = fs::remove_file(path).await;
545 }
546 Ok(existed)
547 }
548
549 pub async fn get_template_for_workspace(
550 &self,
551 workspace_root: &str,
552 template_id: &str,
553 ) -> anyhow::Result<Option<AgentTemplate>> {
554 self.ensure_loaded_for_workspace(workspace_root).await?;
555 Ok(self.templates.read().await.get(template_id).cloned())
556 }
557
558 pub async fn list_instances(
559 &self,
560 mission_id: Option<&str>,
561 parent_instance_id: Option<&str>,
562 status: Option<AgentInstanceStatus>,
563 ) -> Vec<AgentInstance> {
564 let mut rows = self
565 .instances
566 .read()
567 .await
568 .values()
569 .filter(|instance| {
570 if let Some(mission_id) = mission_id {
571 if instance.mission_id != mission_id {
572 return false;
573 }
574 }
575 if let Some(parent_id) = parent_instance_id {
576 if instance.parent_instance_id.as_deref() != Some(parent_id) {
577 return false;
578 }
579 }
580 if let Some(status) = &status {
581 if &instance.status != status {
582 return false;
583 }
584 }
585 true
586 })
587 .cloned()
588 .collect::<Vec<_>>();
589 rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
590 rows
591 }
592
593 pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
594 let instances = self.instances.read().await;
595 let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
596 for instance in instances.values() {
597 let row = by_mission
598 .entry(instance.mission_id.clone())
599 .or_insert_with(|| AgentMissionSummary {
600 mission_id: instance.mission_id.clone(),
601 instance_count: 0,
602 running_count: 0,
603 completed_count: 0,
604 failed_count: 0,
605 cancelled_count: 0,
606 queued_count: 0,
607 token_used_total: 0,
608 tool_calls_used_total: 0,
609 steps_used_total: 0,
610 cost_used_usd_total: 0.0,
611 });
612 row.instance_count = row.instance_count.saturating_add(1);
613 match instance.status {
614 AgentInstanceStatus::Queued => {
615 row.queued_count = row.queued_count.saturating_add(1)
616 }
617 AgentInstanceStatus::Running => {
618 row.running_count = row.running_count.saturating_add(1)
619 }
620 AgentInstanceStatus::Completed => {
621 row.completed_count = row.completed_count.saturating_add(1)
622 }
623 AgentInstanceStatus::Failed => {
624 row.failed_count = row.failed_count.saturating_add(1)
625 }
626 AgentInstanceStatus::Cancelled => {
627 row.cancelled_count = row.cancelled_count.saturating_add(1)
628 }
629 }
630 if let Some(usage) = instance
631 .metadata
632 .as_ref()
633 .and_then(|m| m.get("budgetUsage"))
634 .and_then(|u| u.as_object())
635 {
636 row.token_used_total = row.token_used_total.saturating_add(
637 usage
638 .get("tokensUsed")
639 .and_then(|v| v.as_u64())
640 .unwrap_or(0),
641 );
642 row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
643 usage
644 .get("toolCallsUsed")
645 .and_then(|v| v.as_u64())
646 .unwrap_or(0),
647 );
648 row.steps_used_total = row
649 .steps_used_total
650 .saturating_add(usage.get("stepsUsed").and_then(|v| v.as_u64()).unwrap_or(0));
651 row.cost_used_usd_total += usage
652 .get("costUsedUsd")
653 .and_then(|v| v.as_f64())
654 .unwrap_or(0.0);
655 }
656 }
657 let mut rows = by_mission.into_values().collect::<Vec<_>>();
658 rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
659 rows
660 }
661
662 pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
663 self.instances
664 .read()
665 .await
666 .values()
667 .find(|instance| instance.session_id == session_id)
668 .cloned()
669 }
670
671 pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
672 let mut rows = self
673 .spawn_approvals
674 .read()
675 .await
676 .values()
677 .cloned()
678 .collect::<Vec<_>>();
679 rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
680 rows
681 }
682
683 pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
684 let normalized = workspace_root.trim().to_string();
685 let already_loaded = self
686 .loaded_workspace
687 .read()
688 .await
689 .as_ref()
690 .map(|s| s == &normalized)
691 .unwrap_or(false);
692 if already_loaded {
693 return Ok(());
694 }
695
696 let root = PathBuf::from(&normalized);
697 let policy_path = root
698 .join(".tandem")
699 .join("agent-team")
700 .join("spawn-policy.yaml");
701 let templates_dir = root.join(".tandem").join("agent-team").join("templates");
702
703 let mut next_policy = None;
704 if policy_path.exists() {
705 let raw = fs::read_to_string(&policy_path)
706 .await
707 .with_context(|| format!("failed reading {}", policy_path.display()))?;
708 let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
709 .with_context(|| format!("failed parsing {}", policy_path.display()))?;
710 next_policy = Some(parsed);
711 }
712
713 let mut next_templates = HashMap::new();
714 if templates_dir.exists() {
715 let mut entries = fs::read_dir(&templates_dir).await?;
716 while let Some(entry) = entries.next_entry().await? {
717 let path = entry.path();
718 if !path.is_file() {
719 continue;
720 }
721 let ext = path
722 .extension()
723 .and_then(|v| v.to_str())
724 .unwrap_or_default()
725 .to_ascii_lowercase();
726 if ext != "yaml" && ext != "yml" && ext != "json" {
727 continue;
728 }
729 let raw = fs::read_to_string(&path).await?;
730 let template = serde_yaml::from_str::<AgentTemplate>(&raw)
731 .with_context(|| format!("failed parsing {}", path.display()))?;
732 next_templates.insert(template.template_id.clone(), template);
733 }
734 }
735
736 *self.policy.write().await = next_policy;
737 *self.templates.write().await = next_templates;
738 *self.loaded_workspace.write().await = Some(normalized);
739 Ok(())
740 }
741
742 pub async fn spawn(&self, state: &AppState, req: SpawnRequest) -> SpawnResult {
743 self.spawn_with_approval_override(state, req, false).await
744 }
745
746 async fn spawn_with_approval_override(
747 &self,
748 state: &AppState,
749 mut req: SpawnRequest,
750 approval_override: bool,
751 ) -> SpawnResult {
752 let workspace_root = state.workspace_index.snapshot().await.root;
753 if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
754 return SpawnResult {
755 decision: SpawnDecision {
756 allowed: false,
757 code: Some(SpawnDenyCode::SpawnPolicyMissing),
758 reason: Some(format!("spawn policy load failed: {}", err)),
759 requires_user_approval: false,
760 },
761 instance: None,
762 };
763 }
764
765 let Some(policy) = self.policy.read().await.clone() else {
766 return SpawnResult {
767 decision: SpawnDecision {
768 allowed: false,
769 code: Some(SpawnDenyCode::SpawnPolicyMissing),
770 reason: Some("spawn policy file missing".to_string()),
771 requires_user_approval: false,
772 },
773 instance: None,
774 };
775 };
776
777 let template = {
778 let templates = self.templates.read().await;
779 req.template_id
780 .as_deref()
781 .and_then(|template_id| templates.get(template_id).cloned())
782 };
783 if req.template_id.is_none() {
784 if let Some(found) = self
785 .templates
786 .read()
787 .await
788 .values()
789 .find(|t| t.role == req.role)
790 .cloned()
791 {
792 req.template_id = Some(found.template_id.clone());
793 }
794 }
795 let template = if template.is_some() {
796 template
797 } else {
798 let templates = self.templates.read().await;
799 req.template_id
800 .as_deref()
801 .and_then(|id| templates.get(id).cloned())
802 };
803
804 if req.parent_role.is_none() {
805 if let Some(parent_id) = req.parent_instance_id.as_deref() {
806 let instances = self.instances.read().await;
807 req.parent_role = instances
808 .get(parent_id)
809 .map(|instance| instance.role.clone());
810 }
811 }
812
813 let instances = self.instances.read().await;
814 let total_agents = instances.len();
815 let running_agents = instances
816 .values()
817 .filter(|instance| instance.status == AgentInstanceStatus::Running)
818 .count();
819 drop(instances);
820
821 let mut decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
822 if approval_override
823 && !decision.allowed
824 && decision.requires_user_approval
825 && matches!(decision.code, Some(SpawnDenyCode::SpawnRequiresApproval))
826 {
827 decision = SpawnDecision {
828 allowed: true,
829 code: None,
830 reason: None,
831 requires_user_approval: false,
832 };
833 }
834 if !decision.allowed {
835 if decision.requires_user_approval && !approval_override {
836 self.queue_spawn_approval(&req, &decision).await;
837 }
838 return SpawnResult {
839 decision,
840 instance: None,
841 };
842 }
843
844 let mission_id = req
845 .mission_id
846 .clone()
847 .unwrap_or_else(|| "mission-default".to_string());
848
849 if let Some(reason) = self
850 .mission_budget_exceeded_reason(&policy, &mission_id)
851 .await
852 {
853 return SpawnResult {
854 decision: SpawnDecision {
855 allowed: false,
856 code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
857 reason: Some(reason),
858 requires_user_approval: false,
859 },
860 instance: None,
861 };
862 }
863
864 let template = template.unwrap_or_else(|| AgentTemplate {
865 template_id: "default-template".to_string(),
866 display_name: None,
867 avatar_url: None,
868 role: req.role.clone(),
869 system_prompt: None,
870 default_model: None,
871 skills: Vec::new(),
872 default_budget: BudgetLimit::default(),
873 capabilities: Default::default(),
874 });
875
876 let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
877 Ok(hash) => hash,
878 Err(err) => {
879 let lowered = err.to_ascii_lowercase();
880 let code = if lowered.contains("pinned hash mismatch") {
881 SpawnDenyCode::SpawnSkillHashMismatch
882 } else if lowered.contains("skill source denied") {
883 SpawnDenyCode::SpawnSkillSourceDenied
884 } else {
885 SpawnDenyCode::SpawnRequiredSkillMissing
886 };
887 return SpawnResult {
888 decision: SpawnDecision {
889 allowed: false,
890 code: Some(code),
891 reason: Some(err),
892 requires_user_approval: false,
893 },
894 instance: None,
895 };
896 }
897 };
898
899 let parent_snapshot = {
900 let instances = self.instances.read().await;
901 req.parent_instance_id
902 .as_deref()
903 .and_then(|id| instances.get(id).cloned())
904 };
905 let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
906 self.budgets.read().await.get(parent_id).cloned()
907 } else {
908 None
909 };
910
911 let budget = resolve_budget(
912 &policy,
913 parent_snapshot,
914 parent_usage,
915 &template,
916 req.budget_override.clone(),
917 &req.role,
918 );
919
920 let instance_id = format!("ins_{}", Uuid::new_v4().simple());
921 let managed_worktree = prepare_agent_instance_workspace(
922 state,
923 &workspace_root,
924 req.mission_id.as_deref(),
925 &instance_id,
926 &template.template_id,
927 )
928 .await;
929 let workspace_repo_root = managed_worktree
930 .as_ref()
931 .map(|row| row.record.repo_root.clone())
932 .or_else(|| crate::runtime::worktrees::resolve_git_repo_root(&workspace_root))
933 .unwrap_or_else(|| workspace_root.clone());
934 let worker_workspace_root = managed_worktree
935 .as_ref()
936 .map(|row| row.record.path.clone())
937 .unwrap_or_else(|| workspace_root.clone());
938 let mut session = Session::new(
939 Some(format!("Agent Team {}", template.template_id)),
940 Some(worker_workspace_root.clone()),
941 );
942 session.workspace_root = Some(worker_workspace_root.clone());
943 let session_id = session.id.clone();
944 if let Err(err) = state.storage.save_session(session).await {
945 if let Some(worktree) = managed_worktree.as_ref() {
946 let _ = crate::runtime::worktrees::delete_managed_worktree(state, &worktree.record)
947 .await;
948 }
949 return SpawnResult {
950 decision: SpawnDecision {
951 allowed: false,
952 code: Some(SpawnDenyCode::SpawnPolicyMissing),
953 reason: Some(format!("failed creating child session: {}", err)),
954 requires_user_approval: false,
955 },
956 instance: None,
957 };
958 }
959
960 let instance = AgentInstance {
961 instance_id: instance_id.clone(),
962 mission_id: mission_id.clone(),
963 parent_instance_id: req.parent_instance_id.clone(),
964 role: template.role.clone(),
965 template_id: template.template_id.clone(),
966 session_id: session_id.clone(),
967 run_id: None,
968 status: AgentInstanceStatus::Running,
969 budget,
970 skill_hash: skill_hash.clone(),
971 capabilities: template.capabilities.clone(),
972 metadata: Some(json!({
973 "source": req.source,
974 "justification": req.justification,
975 "workspaceRoot": worker_workspace_root,
976 "workspaceRepoRoot": workspace_repo_root,
977 "managedWorktree": managed_worktree.as_ref().map(|row| json!({
978 "path": row.record.path,
979 "branch": row.record.branch,
980 "repoRoot": row.record.repo_root,
981 "cleanupBranch": row.record.cleanup_branch,
982 "reused": row.reused,
983 })).unwrap_or(Value::Null),
984 })),
985 };
986
987 self.instances
988 .write()
989 .await
990 .insert(instance.instance_id.clone(), instance.clone());
991 self.budgets.write().await.insert(
992 instance.instance_id.clone(),
993 InstanceBudgetState {
994 started_at: Some(Instant::now()),
995 ..InstanceBudgetState::default()
996 },
997 );
998 let _ = self.append_audit("spawn.approved", &instance).await;
999
1000 SpawnResult {
1001 decision: SpawnDecision {
1002 allowed: true,
1003 code: None,
1004 reason: None,
1005 requires_user_approval: false,
1006 },
1007 instance: Some(instance),
1008 }
1009 }
1010
1011 pub async fn approve_spawn_approval(
1012 &self,
1013 state: &AppState,
1014 approval_id: &str,
1015 reason: Option<&str>,
1016 ) -> Option<SpawnResult> {
1017 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
1018 let result = self
1019 .spawn_with_approval_override(state, approval.request.clone(), true)
1020 .await;
1021 if let Some(instance) = result.instance.as_ref() {
1022 let note = reason.unwrap_or("approved by operator");
1023 let _ = self
1024 .append_approval_audit("spawn.approval.approved", approval_id, Some(instance), note)
1025 .await;
1026 } else {
1027 let note = reason.unwrap_or("approval replay failed policy checks");
1028 let _ = self
1029 .append_approval_audit("spawn.approval.rejected_on_replay", approval_id, None, note)
1030 .await;
1031 }
1032 Some(result)
1033 }
1034
1035 pub async fn deny_spawn_approval(
1036 &self,
1037 approval_id: &str,
1038 reason: Option<&str>,
1039 ) -> Option<PendingSpawnApproval> {
1040 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
1041 let note = reason.unwrap_or("denied by operator");
1042 let _ = self
1043 .append_approval_audit("spawn.approval.denied", approval_id, None, note)
1044 .await;
1045 Some(approval)
1046 }
1047
1048 pub async fn cancel_instance(
1049 &self,
1050 state: &AppState,
1051 instance_id: &str,
1052 reason: &str,
1053 ) -> Option<AgentInstance> {
1054 let mut instances = self.instances.write().await;
1055 let instance = instances.get_mut(instance_id)?;
1056 if matches!(
1057 instance.status,
1058 AgentInstanceStatus::Completed
1059 | AgentInstanceStatus::Failed
1060 | AgentInstanceStatus::Cancelled
1061 ) {
1062 return Some(instance.clone());
1063 }
1064 instance.status = AgentInstanceStatus::Cancelled;
1065 let snapshot = instance.clone();
1066 drop(instances);
1067 let _ = state.cancellations.cancel(&snapshot.session_id).await;
1068 cleanup_instance_managed_worktree(state, &snapshot).await;
1069 let _ = self.append_audit("instance.cancelled", &snapshot).await;
1070 emit_instance_cancelled(state, &snapshot, reason);
1071 Some(snapshot)
1072 }
1073
1074 async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
1075 let approval = PendingSpawnApproval {
1076 approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
1077 created_at_ms: crate::now_ms(),
1078 request: req.clone(),
1079 decision_code: decision.code.clone(),
1080 reason: decision.reason.clone(),
1081 };
1082 self.spawn_approvals
1083 .write()
1084 .await
1085 .insert(approval.approval_id.clone(), approval);
1086 }
1087
1088 async fn mission_budget_exceeded_reason(
1089 &self,
1090 policy: &SpawnPolicy,
1091 mission_id: &str,
1092 ) -> Option<String> {
1093 let Some(limit) = policy.mission_total_budget.as_ref() else {
1094 return None;
1095 };
1096 let usage = self
1097 .mission_budgets
1098 .read()
1099 .await
1100 .get(mission_id)
1101 .cloned()
1102 .unwrap_or_default();
1103 if let Some(max) = limit.max_tokens {
1104 if usage.tokens_used >= max {
1105 return Some(format!(
1106 "mission max_tokens exhausted ({}/{})",
1107 usage.tokens_used, max
1108 ));
1109 }
1110 }
1111 if let Some(max) = limit.max_steps {
1112 if usage.steps_used >= u64::from(max) {
1113 return Some(format!(
1114 "mission max_steps exhausted ({}/{})",
1115 usage.steps_used, max
1116 ));
1117 }
1118 }
1119 if let Some(max) = limit.max_tool_calls {
1120 if usage.tool_calls_used >= u64::from(max) {
1121 return Some(format!(
1122 "mission max_tool_calls exhausted ({}/{})",
1123 usage.tool_calls_used, max
1124 ));
1125 }
1126 }
1127 if let Some(max) = limit.max_cost_usd {
1128 if usage.cost_used_usd >= max {
1129 return Some(format!(
1130 "mission max_cost_usd exhausted ({:.6}/{:.6})",
1131 usage.cost_used_usd, max
1132 ));
1133 }
1134 }
1135 None
1136 }
1137
1138 pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
1139 let instance_ids = self
1140 .instances
1141 .read()
1142 .await
1143 .values()
1144 .filter(|instance| instance.mission_id == mission_id)
1145 .map(|instance| instance.instance_id.clone())
1146 .collect::<Vec<_>>();
1147 let mut count = 0usize;
1148 for instance_id in instance_ids {
1149 if self
1150 .cancel_instance(state, &instance_id, reason)
1151 .await
1152 .is_some()
1153 {
1154 count = count.saturating_add(1);
1155 }
1156 }
1157 count
1158 }
1159
1160 async fn mark_instance_terminal(
1161 &self,
1162 state: &AppState,
1163 instance_id: &str,
1164 status: AgentInstanceStatus,
1165 ) -> Option<AgentInstance> {
1166 let mut instances = self.instances.write().await;
1167 let instance = instances.get_mut(instance_id)?;
1168 if matches!(
1169 instance.status,
1170 AgentInstanceStatus::Completed
1171 | AgentInstanceStatus::Failed
1172 | AgentInstanceStatus::Cancelled
1173 ) {
1174 return Some(instance.clone());
1175 }
1176 instance.status = status.clone();
1177 let snapshot = instance.clone();
1178 drop(instances);
1179 cleanup_instance_managed_worktree(state, &snapshot).await;
1180 match status {
1181 AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
1182 AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
1183 _ => {}
1184 }
1185 Some(snapshot)
1186 }
1187
1188 pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
1189 let Some(session_id) = extract_session_id(event) else {
1190 return;
1191 };
1192 let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
1193 return;
1194 };
1195 if event.event_type == "provider.usage" {
1196 let total_tokens = event
1197 .properties
1198 .get("totalTokens")
1199 .and_then(|v| v.as_u64())
1200 .unwrap_or(0);
1201 let cost_used_usd = event
1202 .properties
1203 .get("costUsd")
1204 .and_then(|v| v.as_f64())
1205 .unwrap_or(0.0);
1206 if total_tokens > 0 {
1207 let exhausted = self
1208 .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
1209 .await;
1210 if exhausted {
1211 let _ = self
1212 .cancel_instance(state, &instance_id, "budget exhausted")
1213 .await;
1214 }
1215 }
1216 return;
1217 }
1218 let mut delta_tokens = 0u64;
1219 let mut delta_steps = 0u32;
1220 let mut delta_tool_calls = 0u32;
1221 if event.event_type == "message.part.updated" {
1222 if let Some(part) = event.properties.get("part") {
1223 let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
1224 if part_type == "tool-invocation" {
1225 delta_tool_calls = 1;
1226 } else if part_type == "text" {
1227 let delta = event
1228 .properties
1229 .get("delta")
1230 .and_then(|v| v.as_str())
1231 .unwrap_or("");
1232 if !delta.is_empty() {
1233 delta_tokens = estimate_tokens(delta);
1234 }
1235 }
1236 }
1237 } else if event.event_type == "session.run.finished" {
1238 delta_steps = 1;
1239 let run_status = event
1240 .properties
1241 .get("status")
1242 .and_then(|v| v.as_str())
1243 .unwrap_or("")
1244 .to_ascii_lowercase();
1245 if run_status == "completed" {
1246 let _ = self
1247 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
1248 .await;
1249 } else if run_status == "failed" || run_status == "error" {
1250 let _ = self
1251 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
1252 .await;
1253 }
1254 }
1255 if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
1256 return;
1257 }
1258 let exhausted = self
1259 .apply_budget_delta(
1260 state,
1261 &instance_id,
1262 delta_tokens,
1263 delta_steps,
1264 delta_tool_calls,
1265 )
1266 .await;
1267 if exhausted {
1268 let _ = self
1269 .cancel_instance(state, &instance_id, "budget exhausted")
1270 .await;
1271 }
1272 }
1273
1274 async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
1275 self.instances
1276 .read()
1277 .await
1278 .values()
1279 .find(|instance| instance.session_id == session_id)
1280 .map(|instance| instance.instance_id.clone())
1281 }
1282
1283 async fn apply_budget_delta(
1284 &self,
1285 state: &AppState,
1286 instance_id: &str,
1287 delta_tokens: u64,
1288 delta_steps: u32,
1289 delta_tool_calls: u32,
1290 ) -> bool {
1291 let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1292 enabled: false,
1293 require_justification: false,
1294 max_agents: None,
1295 max_concurrent: None,
1296 child_budget_percent_of_parent_remaining: None,
1297 mission_total_budget: None,
1298 cost_per_1k_tokens_usd: None,
1299 spawn_edges: HashMap::new(),
1300 required_skills: HashMap::new(),
1301 role_defaults: HashMap::new(),
1302 skill_sources: Default::default(),
1303 });
1304 let mut budgets = self.budgets.write().await;
1305 let Some(usage) = budgets.get_mut(instance_id) else {
1306 return false;
1307 };
1308 if usage.exhausted {
1309 return true;
1310 }
1311 let prev_cost_used_usd = usage.cost_used_usd;
1312 usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
1313 usage.steps_used = usage.steps_used.saturating_add(delta_steps);
1314 usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
1315 if let Some(rate) = policy.cost_per_1k_tokens_usd {
1316 usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
1317 }
1318 let elapsed_ms = usage
1319 .started_at
1320 .map(|started| started.elapsed().as_millis() as u64)
1321 .unwrap_or(0);
1322
1323 let mut exhausted_reason: Option<&'static str> = None;
1324 let mut snapshot: Option<AgentInstance> = None;
1325 {
1326 let mut instances = self.instances.write().await;
1327 if let Some(instance) = instances.get_mut(instance_id) {
1328 instance.metadata = Some(merge_metadata_usage(
1329 instance.metadata.take(),
1330 usage.tokens_used,
1331 usage.steps_used,
1332 usage.tool_calls_used,
1333 usage.cost_used_usd,
1334 elapsed_ms,
1335 ));
1336 if let Some(limit) = instance.budget.max_tokens {
1337 if usage.tokens_used >= limit {
1338 exhausted_reason = Some("max_tokens");
1339 }
1340 }
1341 if exhausted_reason.is_none() {
1342 if let Some(limit) = instance.budget.max_steps {
1343 if usage.steps_used >= limit {
1344 exhausted_reason = Some("max_steps");
1345 }
1346 }
1347 }
1348 if exhausted_reason.is_none() {
1349 if let Some(limit) = instance.budget.max_tool_calls {
1350 if usage.tool_calls_used >= limit {
1351 exhausted_reason = Some("max_tool_calls");
1352 }
1353 }
1354 }
1355 if exhausted_reason.is_none() {
1356 if let Some(limit) = instance.budget.max_duration_ms {
1357 if elapsed_ms >= limit {
1358 exhausted_reason = Some("max_duration_ms");
1359 }
1360 }
1361 }
1362 if exhausted_reason.is_none() {
1363 if let Some(limit) = instance.budget.max_cost_usd {
1364 if usage.cost_used_usd >= limit {
1365 exhausted_reason = Some("max_cost_usd");
1366 }
1367 }
1368 }
1369 snapshot = Some(instance.clone());
1370 }
1371 }
1372 let Some(instance) = snapshot else {
1373 return false;
1374 };
1375 emit_budget_usage(
1376 state,
1377 &instance,
1378 usage.tokens_used,
1379 usage.steps_used,
1380 usage.tool_calls_used,
1381 usage.cost_used_usd,
1382 elapsed_ms,
1383 );
1384 let mission_exhausted = self
1385 .apply_mission_budget_delta(
1386 state,
1387 &instance.mission_id,
1388 delta_tokens,
1389 u64::from(delta_steps),
1390 u64::from(delta_tool_calls),
1391 usage.cost_used_usd - prev_cost_used_usd,
1392 &policy,
1393 )
1394 .await;
1395 if mission_exhausted {
1396 usage.exhausted = true;
1397 let _ = self
1398 .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1399 .await;
1400 return true;
1401 }
1402 if let Some(reason) = exhausted_reason {
1403 usage.exhausted = true;
1404 emit_budget_exhausted(
1405 state,
1406 &instance,
1407 reason,
1408 usage.tokens_used,
1409 usage.steps_used,
1410 usage.tool_calls_used,
1411 usage.cost_used_usd,
1412 elapsed_ms,
1413 );
1414 return true;
1415 }
1416 false
1417 }
1418
1419 async fn apply_exact_token_usage(
1420 &self,
1421 state: &AppState,
1422 instance_id: &str,
1423 total_tokens: u64,
1424 cost_used_usd: f64,
1425 ) -> bool {
1426 let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1427 enabled: false,
1428 require_justification: false,
1429 max_agents: None,
1430 max_concurrent: None,
1431 child_budget_percent_of_parent_remaining: None,
1432 mission_total_budget: None,
1433 cost_per_1k_tokens_usd: None,
1434 spawn_edges: HashMap::new(),
1435 required_skills: HashMap::new(),
1436 role_defaults: HashMap::new(),
1437 skill_sources: Default::default(),
1438 });
1439 let mut budgets = self.budgets.write().await;
1440 let Some(usage) = budgets.get_mut(instance_id) else {
1441 return false;
1442 };
1443 if usage.exhausted {
1444 return true;
1445 }
1446 let prev_tokens = usage.tokens_used;
1447 let prev_cost_used_usd = usage.cost_used_usd;
1448 usage.tokens_used = usage.tokens_used.max(total_tokens);
1449 if cost_used_usd > 0.0 {
1450 usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
1451 } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
1452 let delta = usage.tokens_used.saturating_sub(prev_tokens);
1453 usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
1454 }
1455 let elapsed_ms = usage
1456 .started_at
1457 .map(|started| started.elapsed().as_millis() as u64)
1458 .unwrap_or(0);
1459 let mut exhausted_reason: Option<&'static str> = None;
1460 let mut snapshot: Option<AgentInstance> = None;
1461 {
1462 let mut instances = self.instances.write().await;
1463 if let Some(instance) = instances.get_mut(instance_id) {
1464 instance.metadata = Some(merge_metadata_usage(
1465 instance.metadata.take(),
1466 usage.tokens_used,
1467 usage.steps_used,
1468 usage.tool_calls_used,
1469 usage.cost_used_usd,
1470 elapsed_ms,
1471 ));
1472 if let Some(limit) = instance.budget.max_tokens {
1473 if usage.tokens_used >= limit {
1474 exhausted_reason = Some("max_tokens");
1475 }
1476 }
1477 if exhausted_reason.is_none() {
1478 if let Some(limit) = instance.budget.max_cost_usd {
1479 if usage.cost_used_usd >= limit {
1480 exhausted_reason = Some("max_cost_usd");
1481 }
1482 }
1483 }
1484 snapshot = Some(instance.clone());
1485 }
1486 }
1487 let Some(instance) = snapshot else {
1488 return false;
1489 };
1490 emit_budget_usage(
1491 state,
1492 &instance,
1493 usage.tokens_used,
1494 usage.steps_used,
1495 usage.tool_calls_used,
1496 usage.cost_used_usd,
1497 elapsed_ms,
1498 );
1499 let mission_exhausted = self
1500 .apply_mission_budget_delta(
1501 state,
1502 &instance.mission_id,
1503 usage.tokens_used.saturating_sub(prev_tokens),
1504 0,
1505 0,
1506 usage.cost_used_usd - prev_cost_used_usd,
1507 &policy,
1508 )
1509 .await;
1510 if mission_exhausted {
1511 usage.exhausted = true;
1512 let _ = self
1513 .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1514 .await;
1515 return true;
1516 }
1517 if let Some(reason) = exhausted_reason {
1518 usage.exhausted = true;
1519 emit_budget_exhausted(
1520 state,
1521 &instance,
1522 reason,
1523 usage.tokens_used,
1524 usage.steps_used,
1525 usage.tool_calls_used,
1526 usage.cost_used_usd,
1527 elapsed_ms,
1528 );
1529 return true;
1530 }
1531 false
1532 }
1533
1534 async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1535 let path = self.audit_path.read().await.clone();
1536 if let Some(parent) = path.parent() {
1537 fs::create_dir_all(parent).await?;
1538 }
1539 let row = json!({
1540 "action": action,
1541 "missionID": instance.mission_id,
1542 "instanceID": instance.instance_id,
1543 "parentInstanceID": instance.parent_instance_id,
1544 "role": instance.role,
1545 "templateID": instance.template_id,
1546 "sessionID": instance.session_id,
1547 "skillHash": instance.skill_hash,
1548 "workspaceRoot": instance_workspace_root(instance),
1549 "workspaceRepoRoot": instance_workspace_repo_root(instance),
1550 "managedWorktree": instance_managed_worktree(instance),
1551 "timestampMs": crate::now_ms(),
1552 });
1553 let mut existing = if path.exists() {
1554 fs::read_to_string(&path).await.unwrap_or_default()
1555 } else {
1556 String::new()
1557 };
1558 existing.push_str(&serde_json::to_string(&row)?);
1559 existing.push('\n');
1560 fs::write(path, existing).await?;
1561 Ok(())
1562 }
1563
1564 async fn append_approval_audit(
1565 &self,
1566 action: &str,
1567 approval_id: &str,
1568 instance: Option<&AgentInstance>,
1569 reason: &str,
1570 ) -> anyhow::Result<()> {
1571 let path = self.audit_path.read().await.clone();
1572 if let Some(parent) = path.parent() {
1573 fs::create_dir_all(parent).await?;
1574 }
1575 let row = json!({
1576 "action": action,
1577 "approvalID": approval_id,
1578 "reason": reason,
1579 "missionID": instance.map(|v| v.mission_id.clone()),
1580 "instanceID": instance.map(|v| v.instance_id.clone()),
1581 "parentInstanceID": instance.and_then(|v| v.parent_instance_id.clone()),
1582 "role": instance.map(|v| v.role.clone()),
1583 "templateID": instance.map(|v| v.template_id.clone()),
1584 "sessionID": instance.map(|v| v.session_id.clone()),
1585 "skillHash": instance.map(|v| v.skill_hash.clone()),
1586 "workspaceRoot": instance.and_then(instance_workspace_root),
1587 "workspaceRepoRoot": instance.and_then(instance_workspace_repo_root),
1588 "managedWorktree": instance.and_then(instance_managed_worktree),
1589 "timestampMs": crate::now_ms(),
1590 });
1591 let mut existing = if path.exists() {
1592 fs::read_to_string(&path).await.unwrap_or_default()
1593 } else {
1594 String::new()
1595 };
1596 existing.push_str(&serde_json::to_string(&row)?);
1597 existing.push('\n');
1598 fs::write(path, existing).await?;
1599 Ok(())
1600 }
1601
1602 async fn apply_mission_budget_delta(
1603 &self,
1604 state: &AppState,
1605 mission_id: &str,
1606 delta_tokens: u64,
1607 delta_steps: u64,
1608 delta_tool_calls: u64,
1609 delta_cost_used_usd: f64,
1610 policy: &SpawnPolicy,
1611 ) -> bool {
1612 let mut budgets = self.mission_budgets.write().await;
1613 let row = budgets.entry(mission_id.to_string()).or_default();
1614 row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1615 row.steps_used = row.steps_used.saturating_add(delta_steps);
1616 row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1617 row.cost_used_usd += delta_cost_used_usd.max(0.0);
1618 if row.exhausted {
1619 return true;
1620 }
1621 let Some(limit) = policy.mission_total_budget.as_ref() else {
1622 return false;
1623 };
1624 let mut exhausted_by: Option<&'static str> = None;
1625 if let Some(max) = limit.max_tokens {
1626 if row.tokens_used >= max {
1627 exhausted_by = Some("mission_max_tokens");
1628 }
1629 }
1630 if exhausted_by.is_none() {
1631 if let Some(max) = limit.max_steps {
1632 if row.steps_used >= u64::from(max) {
1633 exhausted_by = Some("mission_max_steps");
1634 }
1635 }
1636 }
1637 if exhausted_by.is_none() {
1638 if let Some(max) = limit.max_tool_calls {
1639 if row.tool_calls_used >= u64::from(max) {
1640 exhausted_by = Some("mission_max_tool_calls");
1641 }
1642 }
1643 }
1644 if exhausted_by.is_none() {
1645 if let Some(max) = limit.max_cost_usd {
1646 if row.cost_used_usd >= max {
1647 exhausted_by = Some("mission_max_cost_usd");
1648 }
1649 }
1650 }
1651 if let Some(exhausted_by) = exhausted_by {
1652 row.exhausted = true;
1653 emit_mission_budget_exhausted(
1654 state,
1655 mission_id,
1656 exhausted_by,
1657 row.tokens_used,
1658 row.steps_used,
1659 row.tool_calls_used,
1660 row.cost_used_usd,
1661 );
1662 return true;
1663 }
1664 false
1665 }
1666
1667 pub async fn set_for_test(
1668 &self,
1669 workspace_root: Option<String>,
1670 policy: Option<SpawnPolicy>,
1671 templates: Vec<AgentTemplate>,
1672 ) {
1673 *self.policy.write().await = policy;
1674 let mut by_id = HashMap::new();
1675 for template in templates {
1676 by_id.insert(template.template_id.clone(), template);
1677 }
1678 *self.templates.write().await = by_id;
1679 self.instances.write().await.clear();
1680 self.budgets.write().await.clear();
1681 self.mission_budgets.write().await.clear();
1682 self.spawn_approvals.write().await.clear();
1683 *self.loaded_workspace.write().await = workspace_root;
1684 }
1685}
1686
1687fn resolve_budget(
1688 policy: &SpawnPolicy,
1689 parent_instance: Option<AgentInstance>,
1690 parent_usage: Option<InstanceBudgetState>,
1691 template: &AgentTemplate,
1692 override_budget: Option<BudgetLimit>,
1693 role: &AgentRole,
1694) -> BudgetLimit {
1695 let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1696 let mut chosen = merge_budget(
1697 merge_budget(role_default, template.default_budget.clone()),
1698 override_budget.unwrap_or_default(),
1699 );
1700
1701 if let Some(parent) = parent_instance {
1702 let usage = parent_usage.unwrap_or_default();
1703 if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1704 if pct > 0 {
1705 chosen.max_tokens = cap_budget_remaining_u64(
1706 chosen.max_tokens,
1707 parent.budget.max_tokens,
1708 usage.tokens_used,
1709 pct,
1710 );
1711 chosen.max_steps = cap_budget_remaining_u32(
1712 chosen.max_steps,
1713 parent.budget.max_steps,
1714 usage.steps_used,
1715 pct,
1716 );
1717 chosen.max_tool_calls = cap_budget_remaining_u32(
1718 chosen.max_tool_calls,
1719 parent.budget.max_tool_calls,
1720 usage.tool_calls_used,
1721 pct,
1722 );
1723 chosen.max_duration_ms = cap_budget_remaining_u64(
1724 chosen.max_duration_ms,
1725 parent.budget.max_duration_ms,
1726 usage
1727 .started_at
1728 .map(|started| started.elapsed().as_millis() as u64)
1729 .unwrap_or(0),
1730 pct,
1731 );
1732 chosen.max_cost_usd = cap_budget_remaining_f64(
1733 chosen.max_cost_usd,
1734 parent.budget.max_cost_usd,
1735 usage.cost_used_usd,
1736 pct,
1737 );
1738 }
1739 }
1740 }
1741 chosen
1742}
1743
1744fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1745 BudgetLimit {
1746 max_tokens: overlay.max_tokens.or(base.max_tokens),
1747 max_steps: overlay.max_steps.or(base.max_steps),
1748 max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1749 max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1750 max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1751 }
1752}
1753
/// Caps a child's `u64` limit to `pct` percent of what the parent has left.
///
/// * No parent limit: the child's own limit (or `None`) is returned unchanged.
/// * Parent limit set: the share is `(parent_limit - parent_used) * pct / 100`, with the
///   subtraction saturating at zero. The child gets the smaller of its own limit and
///   the share, or just the share when it has no limit of its own.
///
/// The share is computed in 128-bit arithmetic so very large limits do not saturate
/// before the division; a share above `u64::MAX` (only possible for `pct > 100`) is
/// clamped to `u64::MAX`.
fn cap_budget_remaining_u64(
    child: Option<u64>,
    parent_limit: Option<u64>,
    parent_used: u64,
    pct: u8,
) -> Option<u64> {
    match parent_limit {
        None => child,
        Some(limit) => {
            let remaining = limit.saturating_sub(parent_used);
            let wide = u128::from(remaining) * u128::from(pct) / 100;
            // Lossless after the clamp.
            let share = wide.min(u128::from(u64::MAX)) as u64;
            Some(child.map_or(share, |own| own.min(share)))
        }
    }
}
1773
/// Caps a child's `u32` limit to `pct` percent of what the parent has left.
///
/// * No parent limit: the child's own limit (or `None`) is returned unchanged.
/// * Parent limit set: the share is `(parent_limit - parent_used) * pct / 100`, with the
///   subtraction saturating at zero. The child gets the smaller of its own limit and
///   the share, or just the share when it has no limit of its own.
///
/// The share is computed in 64-bit arithmetic: multiplying in `u32` would saturate as
/// soon as `remaining * pct` exceeds `u32::MAX` and yield a share that is too small.
/// A share above `u32::MAX` (only possible for `pct > 100`) is clamped to `u32::MAX`.
fn cap_budget_remaining_u32(
    child: Option<u32>,
    parent_limit: Option<u32>,
    parent_used: u32,
    pct: u8,
) -> Option<u32> {
    match parent_limit {
        None => child,
        Some(limit) => {
            let remaining = limit.saturating_sub(parent_used);
            let wide = u64::from(remaining) * u64::from(pct) / 100;
            // Lossless after the clamp.
            let share = wide.min(u64::from(u32::MAX)) as u32;
            Some(child.map_or(share, |own| own.min(share)))
        }
    }
}
1793
/// Caps a child's `f64` limit to `pct` percent of what the parent has left.
///
/// * No parent limit: the child's own limit (or `None`) is returned unchanged.
/// * Parent limit set: the share is `max(parent_limit - parent_used, 0) * pct / 100`.
///   The child gets the smaller of its own limit and the share, or just the share when
///   it has no limit of its own.
fn cap_budget_remaining_f64(
    child: Option<f64>,
    parent_limit: Option<f64>,
    parent_used: f64,
    pct: u8,
) -> Option<f64> {
    let limit = match parent_limit {
        Some(limit) => limit,
        None => return child,
    };
    let headroom = (limit - parent_used).max(0.0);
    let share = headroom * f64::from(pct) / 100.0;
    let capped = match child {
        Some(own) => own.min(share),
        None => share,
    };
    Some(capped)
}
1813
1814async fn compute_skill_hash(
1815 workspace_root: &str,
1816 template: &AgentTemplate,
1817 policy: &SpawnPolicy,
1818) -> Result<String, String> {
1819 use sha2::{Digest, Sha256};
1820 let mut rows = Vec::new();
1821 let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1822 for skill in &template.skills {
1823 if let Some(path) = skill.path.as_deref() {
1824 validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1825 let skill_path = Path::new(workspace_root).join(path);
1826 let raw = fs::read_to_string(&skill_path)
1827 .await
1828 .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1829 let digest = hash_hex(raw.as_bytes());
1830 validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1831 rows.push(format!("path:{}:{}", path, digest));
1832 } else if let Some(id) = skill.id.as_deref() {
1833 validate_skill_source(Some(id), None, policy)?;
1834 let loaded = skill_service
1835 .load_skill(id)
1836 .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1837 let Some(loaded) = loaded else {
1838 return Err(format!("missing required skill id `{id}`"));
1839 };
1840 let digest = hash_hex(loaded.content.as_bytes());
1841 validate_pinned_hash(Some(id), None, &digest, policy)?;
1842 rows.push(format!("id:{}:{}", id, digest));
1843 }
1844 }
1845 rows.sort();
1846 let mut hasher = Sha256::new();
1847 for row in rows {
1848 hasher.update(row.as_bytes());
1849 hasher.update(b"\n");
1850 }
1851 let digest = hasher.finalize();
1852 Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1853}
1854
1855fn validate_skill_source(
1856 id: Option<&str>,
1857 path: Option<&str>,
1858 policy: &SpawnPolicy,
1859) -> Result<(), String> {
1860 use tandem_orchestrator::SkillSourceMode;
1861 match policy.skill_sources.mode {
1862 SkillSourceMode::Any => Ok(()),
1863 SkillSourceMode::ProjectOnly => {
1864 if id.is_some() {
1865 return Err("skill source denied: project_only forbids skill IDs".to_string());
1866 }
1867 let Some(path) = path else {
1868 return Err("skill source denied: project_only requires skill path".to_string());
1869 };
1870 let p = PathBuf::from(path);
1871 if p.is_absolute() {
1872 return Err("skill source denied: absolute skill paths are forbidden".to_string());
1873 }
1874 Ok(())
1875 }
1876 SkillSourceMode::Allowlist => {
1877 if let Some(id) = id {
1878 if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1879 return Ok(());
1880 }
1881 }
1882 if let Some(path) = path {
1883 if policy
1884 .skill_sources
1885 .allowlist_paths
1886 .iter()
1887 .any(|v| v == path)
1888 {
1889 return Ok(());
1890 }
1891 }
1892 Err("skill source denied: not present in allowlist".to_string())
1893 }
1894 }
1895}
1896
1897fn validate_pinned_hash(
1898 id: Option<&str>,
1899 path: Option<&str>,
1900 digest: &str,
1901 policy: &SpawnPolicy,
1902) -> Result<(), String> {
1903 let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1904 let by_path = path.and_then(|path| {
1905 policy
1906 .skill_sources
1907 .pinned_hashes
1908 .get(&format!("path:{path}"))
1909 });
1910 let expected = by_id.or(by_path);
1911 if let Some(expected) = expected {
1912 let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1913 if normalized != digest {
1914 return Err("pinned hash mismatch for skill reference".to_string());
1915 }
1916 }
1917 Ok(())
1918}
1919
/// Renders bytes as a lowercase hexadecimal string, two characters per byte.
fn hash_hex(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut hex = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        hex.push(DIGITS[usize::from(byte >> 4)] as char);
        hex.push(DIGITS[usize::from(byte & 0x0f)] as char);
    }
    hex
}
1928
/// Roughly estimates the token count of `text` as one token per four characters.
///
/// The result is never below 1, so empty or very short text still counts as one token.
fn estimate_tokens(text: &str) -> u64 {
    let char_count = text.chars().count() as u64;
    std::cmp::max(1, char_count / 4)
}
1933
1934fn extract_session_id(event: &EngineEvent) -> Option<String> {
1935 event
1936 .properties
1937 .get("sessionID")
1938 .and_then(|v| v.as_str())
1939 .map(|v| v.to_string())
1940 .or_else(|| {
1941 event
1942 .properties
1943 .get("part")
1944 .and_then(|v| v.get("sessionID"))
1945 .and_then(|v| v.as_str())
1946 .map(|v| v.to_string())
1947 })
1948}
1949
1950fn merge_metadata_usage(
1951 metadata: Option<Value>,
1952 tokens_used: u64,
1953 steps_used: u32,
1954 tool_calls_used: u32,
1955 cost_used_usd: f64,
1956 elapsed_ms: u64,
1957) -> Value {
1958 let mut base = metadata
1959 .and_then(|v| v.as_object().cloned())
1960 .unwrap_or_default();
1961 base.insert(
1962 "budgetUsage".to_string(),
1963 json!({
1964 "tokensUsed": tokens_used,
1965 "stepsUsed": steps_used,
1966 "toolCallsUsed": tool_calls_used,
1967 "costUsedUsd": cost_used_usd,
1968 "elapsedMs": elapsed_ms
1969 }),
1970 );
1971 Value::Object(base)
1972}
1973
1974fn instance_workspace_root(instance: &AgentInstance) -> Option<Value> {
1975 instance
1976 .metadata
1977 .as_ref()
1978 .and_then(|row| row.get("workspaceRoot"))
1979 .cloned()
1980}
1981
1982fn instance_workspace_repo_root(instance: &AgentInstance) -> Option<Value> {
1983 instance
1984 .metadata
1985 .as_ref()
1986 .and_then(|row| row.get("workspaceRepoRoot"))
1987 .cloned()
1988}
1989
1990fn instance_managed_worktree(instance: &AgentInstance) -> Option<Value> {
1991 instance
1992 .metadata
1993 .as_ref()
1994 .and_then(|row| row.get("managedWorktree"))
1995 .cloned()
1996}
1997
1998async fn prepare_agent_instance_workspace(
1999 state: &AppState,
2000 workspace_root: &str,
2001 mission_id: Option<&str>,
2002 instance_id: &str,
2003 template_id: &str,
2004) -> Option<crate::runtime::worktrees::ManagedWorktreeEnsureResult> {
2005 let repo_root = crate::runtime::worktrees::resolve_git_repo_root(workspace_root)?;
2006 crate::runtime::worktrees::ensure_managed_worktree(
2007 state,
2008 crate::runtime::worktrees::ManagedWorktreeEnsureInput {
2009 repo_root,
2010 task_id: mission_id.map(ToString::to_string),
2011 owner_run_id: Some(instance_id.to_string()),
2012 lease_id: None,
2013 branch_hint: Some(template_id.to_string()),
2014 base: "HEAD".to_string(),
2015 cleanup_branch: true,
2016 },
2017 )
2018 .await
2019 .ok()
2020}
2021
2022async fn cleanup_instance_managed_worktree(state: &AppState, instance: &AgentInstance) {
2023 let Some(metadata) = instance.metadata.as_ref() else {
2024 return;
2025 };
2026 let Some(worktree) = metadata.get("managedWorktree").and_then(Value::as_object) else {
2027 return;
2028 };
2029 let Some(path) = worktree.get("path").and_then(Value::as_str) else {
2030 return;
2031 };
2032 let Some(branch) = worktree.get("branch").and_then(Value::as_str) else {
2033 return;
2034 };
2035 let Some(repo_root) = worktree.get("repoRoot").and_then(Value::as_str) else {
2036 return;
2037 };
2038 let record = crate::ManagedWorktreeRecord {
2039 key: crate::runtime::worktrees::managed_worktree_key(
2040 repo_root,
2041 instance.mission_id.as_str().into(),
2042 Some(instance.instance_id.as_str()),
2043 None,
2044 path,
2045 branch,
2046 ),
2047 repo_root: repo_root.to_string(),
2048 path: path.to_string(),
2049 branch: branch.to_string(),
2050 base: "HEAD".to_string(),
2051 managed: true,
2052 task_id: Some(instance.mission_id.clone()),
2053 owner_run_id: Some(instance.instance_id.clone()),
2054 lease_id: None,
2055 cleanup_branch: worktree
2056 .get("cleanupBranch")
2057 .and_then(Value::as_bool)
2058 .unwrap_or(true),
2059 created_at_ms: 0,
2060 updated_at_ms: 0,
2061 };
2062 let _ = crate::runtime::worktrees::delete_managed_worktree(state, &record).await;
2063}
2064
/// Canonicalizes a tool name for policy matching.
///
/// The name is trimmed, lowercased, and has `-` replaced by `_`. The aliases
/// `todowrite`, `update_todo_list` and `update_todos` all map to `todo_write`.
fn normalize_tool_name(name: &str) -> String {
    let canonical = name.trim().to_lowercase().replace('-', "_");
    let is_todo_alias = matches!(
        canonical.as_str(),
        "todowrite" | "update_todo_list" | "update_todos"
    );
    if is_todo_alias {
        "todo_write".to_string()
    } else {
        canonical
    }
}
2071
2072async fn evaluate_capability_deny(
2073 state: &AppState,
2074 instance: &AgentInstance,
2075 tool: &str,
2076 args: &Value,
2077 caps: &tandem_orchestrator::CapabilitySpec,
2078 session_id: &str,
2079 message_id: &str,
2080) -> Option<String> {
2081 let deny_patterns = caps
2082 .tool_denylist
2083 .iter()
2084 .map(|name| normalize_tool_name(name))
2085 .collect::<Vec<_>>();
2086 if !deny_patterns.is_empty() && any_policy_matches(&deny_patterns, tool) {
2087 return Some(format!("tool `{tool}` denied by agent capability policy"));
2088 }
2089 let allow_patterns = caps
2090 .tool_allowlist
2091 .iter()
2092 .map(|name| normalize_tool_name(name))
2093 .collect::<Vec<_>>();
2094 if !allow_patterns.is_empty() && !any_policy_matches(&allow_patterns, tool) {
2095 return Some(format!("tool `{tool}` not in agent allowlist"));
2096 }
2097
2098 let browser_execution_tool = matches!(
2099 tool,
2100 "browser_open"
2101 | "browser_navigate"
2102 | "browser_snapshot"
2103 | "browser_click"
2104 | "browser_type"
2105 | "browser_press"
2106 | "browser_wait"
2107 | "browser_extract"
2108 | "browser_screenshot"
2109 | "browser_close"
2110 );
2111
2112 if matches!(
2113 tool,
2114 "websearch" | "webfetch" | "webfetch_html" | "browser_open" | "browser_navigate"
2115 ) || browser_execution_tool
2116 {
2117 if !caps.net_scopes.enabled {
2118 return Some("network disabled for this agent instance".to_string());
2119 }
2120 if !caps.net_scopes.allow_hosts.is_empty() {
2121 if tool == "websearch" {
2122 return Some(
2123 "websearch blocked: host allowlist cannot be verified for search tool"
2124 .to_string(),
2125 );
2126 }
2127 if let Some(host) = extract_url_host(args) {
2128 let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
2129 let allowed = h.trim().to_ascii_lowercase();
2130 !allowed.is_empty()
2131 && (host == allowed || host.ends_with(&format!(".{allowed}")))
2132 });
2133 if !allowed {
2134 return Some(format!("network host `{host}` not in allow_hosts"));
2135 }
2136 }
2137 }
2138 }
2139
2140 if tool == "bash" {
2141 let cmd = args
2142 .get("command")
2143 .and_then(|v| v.as_str())
2144 .unwrap_or("")
2145 .to_ascii_lowercase();
2146 if cmd.contains("git push") {
2147 if !caps.git_caps.push {
2148 return Some("git push disabled for this agent instance".to_string());
2149 }
2150 if caps.git_caps.push_requires_approval {
2151 let action = state.permissions.evaluate("git_push", "git_push").await;
2152 match action {
2153 tandem_core::PermissionAction::Allow => {}
2154 tandem_core::PermissionAction::Deny => {
2155 return Some("git push denied by policy rule".to_string());
2156 }
2157 tandem_core::PermissionAction::Ask => {
2158 let pending = state
2159 .permissions
2160 .ask_for_session_with_context(
2161 Some(session_id),
2162 "git_push",
2163 args.clone(),
2164 Some(tandem_core::PermissionArgsContext {
2165 args_source: "agent_team.git_push".to_string(),
2166 args_integrity: "runtime-checked".to_string(),
2167 query: Some(format!(
2168 "instanceID={} messageID={}",
2169 instance.instance_id, message_id
2170 )),
2171 }),
2172 )
2173 .await;
2174 return Some(format!(
2175 "git push requires explicit user approval (approvalID={})",
2176 pending.id
2177 ));
2178 }
2179 }
2180 }
2181 }
2182 if cmd.contains("git commit") && !caps.git_caps.commit {
2183 return Some("git commit disabled for this agent instance".to_string());
2184 }
2185 }
2186
2187 let access_kind = tool_fs_access_kind(tool);
2188 if let Some(kind) = access_kind {
2189 let Some(session) = state.storage.get_session(session_id).await else {
2190 return Some("session not found for capability evaluation".to_string());
2191 };
2192 let Some(root) = session.workspace_root.clone() else {
2193 return Some("workspace root missing for capability evaluation".to_string());
2194 };
2195 let requested = extract_tool_candidate_paths(tool, args);
2196 if !requested.is_empty() {
2197 let allowed_scopes = if kind == "read" {
2198 &caps.fs_scopes.read
2199 } else {
2200 &caps.fs_scopes.write
2201 };
2202 if allowed_scopes.is_empty() {
2203 return Some(format!("fs {kind} access blocked: no scopes configured"));
2204 }
2205 for candidate in requested {
2206 if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
2207 return Some(format!("fs {kind} access denied for path `{}`", candidate));
2208 }
2209 }
2210 }
2211 }
2212
2213 denied_secrets_reason(tool, caps, args)
2214}
2215
2216fn denied_secrets_reason(
2217 tool: &str,
2218 caps: &tandem_orchestrator::CapabilitySpec,
2219 args: &Value,
2220) -> Option<String> {
2221 if tool == "auth" {
2222 if caps.secrets_scopes.is_empty() {
2223 return Some("secrets are disabled for this agent instance".to_string());
2224 }
2225 let alias = args
2226 .get("id")
2227 .or_else(|| args.get("provider"))
2228 .or_else(|| args.get("providerID"))
2229 .and_then(|v| v.as_str())
2230 .unwrap_or("")
2231 .trim();
2232 if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
2233 return Some(format!(
2234 "secret alias `{alias}` is not in agent secretsScopes allowlist"
2235 ));
2236 }
2237 }
2238 None
2239}
2240
/// Classifies a tool name by the kind of filesystem access it performs.
///
/// Returns `Some("read")` or `Some("write")` for the known filesystem tools and
/// `None` for every other tool name. Matching is exact and case-sensitive.
fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
    const READ_TOOLS: [&str; 5] = ["read", "glob", "grep", "codesearch", "lsp"];
    const WRITE_TOOLS: [&str; 3] = ["write", "edit", "apply_patch"];

    if READ_TOOLS.contains(&tool) {
        Some("read")
    } else if WRITE_TOOLS.contains(&tool) {
        Some("write")
    } else {
        None
    }
}
2248
2249fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
2250 let Some(obj) = args.as_object() else {
2251 return Vec::new();
2252 };
2253 let keys: &[&str] = match tool {
2254 "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
2255 "glob" => &["pattern"],
2256 "lsp" => &["filePath", "path"],
2257 "bash" => &["cwd"],
2258 "apply_patch" => &["path"],
2259 _ => &["path", "cwd"],
2260 };
2261 keys.iter()
2262 .filter_map(|key| obj.get(*key))
2263 .filter_map(|value| value.as_str())
2264 .filter(|s| !s.trim().is_empty())
2265 .map(|raw| strip_glob_tokens(raw).to_string())
2266 .collect()
2267}
2268
/// Returns the prefix of `path` that precedes its first glob metacharacter.
///
/// The characters treated as glob tokens are `*`, `?` and `[`. When none of them
/// occurs, the whole input is returned unchanged.
fn strip_glob_tokens(path: &str) -> &str {
    match path.find(|ch: char| matches!(ch, '*' | '?' | '[')) {
        Some(cut) => &path[..cut],
        None => path,
    }
}
2279
/// Returns `true` when `candidate` lies inside at least one of `scopes`.
///
/// The candidate and every scope are first resolved against `root` with
/// [`resolve_path`] and then normalized lexically, so `.` and `..` segments cannot
/// be used to step out of an allowed scope: `src/../../etc/passwd` does not match
/// the scope `src`. Containment is checked component-wise with
/// [`Path::starts_with`], so `srcx/file` does not match `src` either.
///
/// Normalization is purely lexical: the filesystem is never consulted and
/// symlinks are not resolved.
fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
    /// Collapses `.` and `..` components without touching the filesystem.
    fn normalize(path: &Path) -> PathBuf {
        use std::path::Component;

        let mut out = PathBuf::new();
        for component in path.components() {
            match component {
                Component::CurDir => {}
                Component::ParentDir => match out.components().next_back() {
                    // A real directory name precedes `..`: the two cancel out.
                    Some(Component::Normal(_)) => {
                        out.pop();
                    }
                    // `..` directly under the root (or a drive prefix) stays there.
                    Some(Component::RootDir) | Some(Component::Prefix(_)) => {}
                    // Relative path with nothing left to pop: keep the `..`.
                    _ => out.push(".."),
                },
                other => out.push(other.as_os_str()),
            }
        }
        out
    }

    let root_path = Path::new(root);
    let candidate_path = normalize(&resolve_path(root_path, candidate));
    scopes.iter().any(|scope| {
        let scope_path = normalize(&resolve_path(root_path, scope));
        candidate_path.starts_with(scope_path)
    })
}

/// Resolves `raw` against `root`.
///
/// Surrounding whitespace is trimmed first. A blank input resolves to `root`
/// itself, an absolute path is returned as-is, and a relative path is joined onto
/// `root`. No normalization of `.` or `..` segments is performed here.
fn resolve_path(root: &Path, raw: &str) -> PathBuf {
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        return root.to_path_buf();
    }
    let requested = Path::new(trimmed);
    if requested.is_absolute() {
        requested.to_path_buf()
    } else {
        root.join(requested)
    }
}
2301
2302fn extract_url_host(args: &Value) -> Option<String> {
2303 let url = args
2304 .get("url")
2305 .or_else(|| args.get("uri"))
2306 .or_else(|| args.get("link"))
2307 .and_then(|v| v.as_str())?;
2308 let raw = url.trim();
2309 let (_, after_scheme) = raw.split_once("://")?;
2310 let host_port = after_scheme.split('/').next().unwrap_or_default();
2311 let host = host_port.split('@').next_back().unwrap_or_default();
2312 let host = host
2313 .split(':')
2314 .next()
2315 .unwrap_or_default()
2316 .to_ascii_lowercase();
2317 if host.is_empty() {
2318 None
2319 } else {
2320 Some(host)
2321 }
2322}
2323
2324pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
2325 emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
2326}
2327
2328pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
2329 emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
2330}
2331
2332pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
2333 emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
2334}
2335
/// Optional correlation identifiers attached to spawn events.
///
/// Every field defaults to `None`. The struct only holds borrowed strings, so it
/// is cheap to copy and can be printed for diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct SpawnEventContext<'a> {
    /// Session the spawn request originated from, if known.
    pub session_id: Option<&'a str>,
    /// Message that triggered the spawn request, if known.
    pub message_id: Option<&'a str>,
    /// Run the spawn request belongs to, if known.
    pub run_id: Option<&'a str>,
}
2342
2343pub fn emit_spawn_requested_with_context(
2344 state: &AppState,
2345 req: &SpawnRequest,
2346 ctx: &SpawnEventContext<'_>,
2347) {
2348 state.event_bus.publish(EngineEvent::new(
2349 "agent_team.spawn.requested",
2350 json!({
2351 "sessionID": ctx.session_id,
2352 "messageID": ctx.message_id,
2353 "runID": ctx.run_id,
2354 "missionID": req.mission_id,
2355 "instanceID": Value::Null,
2356 "parentInstanceID": req.parent_instance_id,
2357 "source": req.source,
2358 "requestedRole": req.role,
2359 "templateID": req.template_id,
2360 "justification": req.justification,
2361 "timestampMs": crate::now_ms(),
2362 }),
2363 ));
2364}
2365
2366pub fn emit_spawn_denied_with_context(
2367 state: &AppState,
2368 req: &SpawnRequest,
2369 decision: &SpawnDecision,
2370 ctx: &SpawnEventContext<'_>,
2371) {
2372 state.event_bus.publish(EngineEvent::new(
2373 "agent_team.spawn.denied",
2374 json!({
2375 "sessionID": ctx.session_id,
2376 "messageID": ctx.message_id,
2377 "runID": ctx.run_id,
2378 "missionID": req.mission_id,
2379 "instanceID": Value::Null,
2380 "parentInstanceID": req.parent_instance_id,
2381 "source": req.source,
2382 "requestedRole": req.role,
2383 "templateID": req.template_id,
2384 "code": decision.code,
2385 "error": decision.reason,
2386 "timestampMs": crate::now_ms(),
2387 }),
2388 ));
2389}
2390
2391pub fn emit_spawn_approved_with_context(
2392 state: &AppState,
2393 req: &SpawnRequest,
2394 instance: &AgentInstance,
2395 ctx: &SpawnEventContext<'_>,
2396) {
2397 state.event_bus.publish(EngineEvent::new(
2398 "agent_team.spawn.approved",
2399 json!({
2400 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2401 "messageID": ctx.message_id,
2402 "runID": ctx.run_id.or(instance.run_id.as_deref()),
2403 "missionID": instance.mission_id,
2404 "instanceID": instance.instance_id,
2405 "parentInstanceID": instance.parent_instance_id,
2406 "source": req.source,
2407 "requestedRole": req.role,
2408 "templateID": instance.template_id,
2409 "skillHash": instance.skill_hash,
2410 "workspaceRoot": instance_workspace_root(instance),
2411 "workspaceRepoRoot": instance_workspace_repo_root(instance),
2412 "managedWorktree": instance_managed_worktree(instance),
2413 "timestampMs": crate::now_ms(),
2414 }),
2415 ));
2416 state.event_bus.publish(EngineEvent::new(
2417 "agent_team.instance.started",
2418 json!({
2419 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2420 "messageID": ctx.message_id,
2421 "runID": ctx.run_id.or(instance.run_id.as_deref()),
2422 "missionID": instance.mission_id,
2423 "instanceID": instance.instance_id,
2424 "parentInstanceID": instance.parent_instance_id,
2425 "role": instance.role,
2426 "status": instance.status,
2427 "budgetLimit": instance.budget,
2428 "skillHash": instance.skill_hash,
2429 "workspaceRoot": instance_workspace_root(instance),
2430 "workspaceRepoRoot": instance_workspace_repo_root(instance),
2431 "managedWorktree": instance_managed_worktree(instance),
2432 "timestampMs": crate::now_ms(),
2433 }),
2434 ));
2435}
2436
2437pub fn emit_budget_usage(
2438 state: &AppState,
2439 instance: &AgentInstance,
2440 tokens_used: u64,
2441 steps_used: u32,
2442 tool_calls_used: u32,
2443 cost_used_usd: f64,
2444 elapsed_ms: u64,
2445) {
2446 state.event_bus.publish(EngineEvent::new(
2447 "agent_team.budget.usage",
2448 json!({
2449 "sessionID": instance.session_id,
2450 "messageID": Value::Null,
2451 "runID": instance.run_id,
2452 "missionID": instance.mission_id,
2453 "instanceID": instance.instance_id,
2454 "tokensUsed": tokens_used,
2455 "stepsUsed": steps_used,
2456 "toolCallsUsed": tool_calls_used,
2457 "costUsedUsd": cost_used_usd,
2458 "elapsedMs": elapsed_ms,
2459 "timestampMs": crate::now_ms(),
2460 }),
2461 ));
2462}
2463
2464pub fn emit_budget_exhausted(
2465 state: &AppState,
2466 instance: &AgentInstance,
2467 exhausted_by: &str,
2468 tokens_used: u64,
2469 steps_used: u32,
2470 tool_calls_used: u32,
2471 cost_used_usd: f64,
2472 elapsed_ms: u64,
2473) {
2474 state.event_bus.publish(EngineEvent::new(
2475 "agent_team.budget.exhausted",
2476 json!({
2477 "sessionID": instance.session_id,
2478 "messageID": Value::Null,
2479 "runID": instance.run_id,
2480 "missionID": instance.mission_id,
2481 "instanceID": instance.instance_id,
2482 "exhaustedBy": exhausted_by,
2483 "tokensUsed": tokens_used,
2484 "stepsUsed": steps_used,
2485 "toolCallsUsed": tool_calls_used,
2486 "costUsedUsd": cost_used_usd,
2487 "elapsedMs": elapsed_ms,
2488 "timestampMs": crate::now_ms(),
2489 }),
2490 ));
2491}
2492
2493pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
2494 state.event_bus.publish(EngineEvent::new(
2495 "agent_team.instance.cancelled",
2496 json!({
2497 "sessionID": instance.session_id,
2498 "messageID": Value::Null,
2499 "runID": instance.run_id,
2500 "missionID": instance.mission_id,
2501 "instanceID": instance.instance_id,
2502 "parentInstanceID": instance.parent_instance_id,
2503 "role": instance.role,
2504 "status": instance.status,
2505 "reason": reason,
2506 "workspaceRoot": instance_workspace_root(instance),
2507 "workspaceRepoRoot": instance_workspace_repo_root(instance),
2508 "managedWorktree": instance_managed_worktree(instance),
2509 "timestampMs": crate::now_ms(),
2510 }),
2511 ));
2512}
2513
2514pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
2515 state.event_bus.publish(EngineEvent::new(
2516 "agent_team.instance.completed",
2517 json!({
2518 "sessionID": instance.session_id,
2519 "messageID": Value::Null,
2520 "runID": instance.run_id,
2521 "missionID": instance.mission_id,
2522 "instanceID": instance.instance_id,
2523 "parentInstanceID": instance.parent_instance_id,
2524 "role": instance.role,
2525 "status": instance.status,
2526 "workspaceRoot": instance_workspace_root(instance),
2527 "workspaceRepoRoot": instance_workspace_repo_root(instance),
2528 "managedWorktree": instance_managed_worktree(instance),
2529 "timestampMs": crate::now_ms(),
2530 }),
2531 ));
2532}
2533
2534pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
2535 state.event_bus.publish(EngineEvent::new(
2536 "agent_team.instance.failed",
2537 json!({
2538 "sessionID": instance.session_id,
2539 "messageID": Value::Null,
2540 "runID": instance.run_id,
2541 "missionID": instance.mission_id,
2542 "instanceID": instance.instance_id,
2543 "parentInstanceID": instance.parent_instance_id,
2544 "role": instance.role,
2545 "status": instance.status,
2546 "workspaceRoot": instance_workspace_root(instance),
2547 "workspaceRepoRoot": instance_workspace_repo_root(instance),
2548 "managedWorktree": instance_managed_worktree(instance),
2549 "timestampMs": crate::now_ms(),
2550 }),
2551 ));
2552}
2553
2554pub fn emit_mission_budget_exhausted(
2555 state: &AppState,
2556 mission_id: &str,
2557 exhausted_by: &str,
2558 tokens_used: u64,
2559 steps_used: u64,
2560 tool_calls_used: u64,
2561 cost_used_usd: f64,
2562) {
2563 state.event_bus.publish(EngineEvent::new(
2564 "agent_team.mission.budget.exhausted",
2565 json!({
2566 "sessionID": Value::Null,
2567 "messageID": Value::Null,
2568 "runID": Value::Null,
2569 "missionID": mission_id,
2570 "instanceID": Value::Null,
2571 "exhaustedBy": exhausted_by,
2572 "tokensUsed": tokens_used,
2573 "stepsUsed": steps_used,
2574 "toolCallsUsed": tool_calls_used,
2575 "costUsedUsd": cost_used_usd,
2576 "timestampMs": crate::now_ms(),
2577 }),
2578 ));
2579}