1use tandem_orchestrator::{
2 AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
3 SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
4};
5use tandem_skills::SkillService;
6use tandem_types::{EngineEvent, Session};
7use tokio::fs;
8use tokio::sync::RwLock;
9use uuid::Uuid;
10
11use crate::AppState;
12
/// Shared runtime state for agent teams.
///
/// Every field is wrapped in `Arc<RwLock<..>>`, so clones of the runtime (the
/// type derives `Clone`) refer to the same underlying state.
#[derive(Clone, Default)]
pub struct AgentTeamRuntime {
    /// Spawn policy parsed from the workspace's `spawn-policy.yaml`; `None`
    /// when no policy file was found for the loaded workspace.
    policy: Arc<RwLock<Option<SpawnPolicy>>>,
    /// Agent templates keyed by template id.
    templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
    /// Agent instances keyed by instance id.
    instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
    /// Per-instance budget usage keyed by instance id.
    budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
    /// Per-mission budget usage keyed by mission id.
    mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
    /// Spawn requests waiting for an approve/deny decision, keyed by approval id.
    spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
    /// Trimmed workspace root whose policy and templates are currently loaded.
    loaded_workspace: Arc<RwLock<Option<String>>>,
    /// Audit path supplied at construction and replaceable via `set_audit_path`.
    /// NOTE(review): presumably the audit log file; the writers are not visible here.
    audit_path: Arc<RwLock<PathBuf>>,
}
24
/// Outcome of a spawn attempt: the decision plus the created instance, if any.
#[derive(Debug, Clone)]
pub struct SpawnResult {
    /// Whether the spawn was allowed and, when it was not, the deny code and reason.
    pub decision: SpawnDecision,
    /// The registered instance on success; `None` on every deny/failure path.
    pub instance: Option<AgentInstance>,
}
30
/// Per-mission roll-up of instance counts and budget usage, as produced by
/// `AgentTeamRuntime::list_mission_summaries`.
///
/// Field names are serialized in camelCase via the `serde(rename)` attributes.
#[derive(Debug, Clone, Serialize)]
pub struct AgentMissionSummary {
    /// Mission the summary describes.
    #[serde(rename = "missionID")]
    pub mission_id: String,
    /// Total number of instances in the mission, regardless of status.
    #[serde(rename = "instanceCount")]
    pub instance_count: usize,
    /// Instances with status `Running`.
    #[serde(rename = "runningCount")]
    pub running_count: usize,
    /// Instances with status `Completed`.
    #[serde(rename = "completedCount")]
    pub completed_count: usize,
    /// Instances with status `Failed`.
    #[serde(rename = "failedCount")]
    pub failed_count: usize,
    /// Instances with status `Cancelled`.
    #[serde(rename = "cancelledCount")]
    pub cancelled_count: usize,
    /// Instances with status `Queued`.
    #[serde(rename = "queuedCount")]
    pub queued_count: usize,
    /// Sum of `budgetUsage.tokensUsed` from instance metadata.
    #[serde(rename = "tokenUsedTotal")]
    pub token_used_total: u64,
    /// Sum of `budgetUsage.toolCallsUsed` from instance metadata.
    #[serde(rename = "toolCallsUsedTotal")]
    pub tool_calls_used_total: u64,
    /// Sum of `budgetUsage.stepsUsed` from instance metadata.
    #[serde(rename = "stepsUsedTotal")]
    pub steps_used_total: u64,
    /// Sum of `budgetUsage.costUsedUsd` from instance metadata.
    #[serde(rename = "costUsedUsdTotal")]
    pub cost_used_usd_total: f64,
}
56
/// Budget usage tracked for a single agent instance.
#[derive(Debug, Clone, Default)]
struct InstanceBudgetState {
    /// Tokens consumed so far.
    tokens_used: u64,
    /// Steps consumed so far.
    steps_used: u32,
    /// Tool calls consumed so far.
    tool_calls_used: u32,
    /// Cost consumed so far, in USD.
    cost_used_usd: f64,
    /// Set to `Instant::now()` when the instance is registered by a spawn.
    started_at: Option<Instant>,
    /// NOTE(review): presumably latched once any limit is hit; the writers are
    /// outside this view, so confirm before relying on it.
    exhausted: bool,
}
66
/// Budget usage tracked for a whole mission; compared against the policy's
/// `mission_total_budget` before a new instance is spawned.
#[derive(Debug, Clone, Default)]
struct MissionBudgetState {
    /// Tokens consumed so far.
    tokens_used: u64,
    /// Steps consumed so far.
    steps_used: u64,
    /// Tool calls consumed so far.
    tool_calls_used: u64,
    /// Cost consumed so far, in USD.
    cost_used_usd: f64,
    /// NOTE(review): presumably latched once a mission limit is hit; the
    /// writers are outside this view, so confirm before relying on it.
    exhausted: bool,
}
75
/// A spawn request that was denied with `requires_user_approval` set and is
/// waiting for an approve/deny decision.
#[derive(Debug, Clone, Serialize)]
pub struct PendingSpawnApproval {
    /// Identifier of the form `spawn_<uuid>`, used as the key in the approval queue.
    #[serde(rename = "approvalID")]
    pub approval_id: String,
    /// Value of `crate::now_ms()` when the approval was queued.
    #[serde(rename = "createdAtMs")]
    pub created_at_ms: u64,
    /// The original request, replayed if the approval is granted.
    pub request: SpawnRequest,
    /// Deny code copied from the decision that triggered the approval.
    #[serde(rename = "decisionCode")]
    pub decision_code: Option<SpawnDenyCode>,
    /// Deny reason copied from the decision that triggered the approval.
    pub reason: Option<String>,
}
87
/// `SpawnAgentHook` implementation that routes `spawn_agent` tool calls to the
/// agent-team runtime held in the application state.
#[derive(Clone)]
pub struct ServerSpawnAgentHook {
    /// Application state; cloned into each spawned future.
    state: AppState,
}
92
/// Arguments accepted by the `spawn_agent` tool, deserialized from the tool
/// call's JSON args and copied field-for-field into a `SpawnRequest`.
#[derive(Debug, Deserialize)]
struct SpawnAgentToolInput {
    /// Target mission (JSON key `missionID`).
    #[serde(rename = "missionID")]
    mission_id: Option<String>,
    /// Instance requesting the spawn, if any (JSON key `parentInstanceID`).
    #[serde(rename = "parentInstanceID")]
    parent_instance_id: Option<String>,
    /// Template to instantiate (JSON key `templateID`).
    #[serde(rename = "templateID")]
    template_id: Option<String>,
    /// Role of the agent to spawn; not optional.
    role: AgentRole,
    /// Origin of the request; `SpawnSource::ToolCall` is used when omitted.
    source: Option<SpawnSource>,
    /// Free-text justification forwarded with the request; not optional.
    justification: String,
    /// Optional budget override (JSON key `budgetOverride`).
    #[serde(rename = "budgetOverride", default)]
    budget_override: Option<BudgetLimit>,
}
107
108impl ServerSpawnAgentHook {
109 pub fn new(state: AppState) -> Self {
110 Self { state }
111 }
112}
113
114impl SpawnAgentHook for ServerSpawnAgentHook {
115 fn spawn_agent(
116 &self,
117 ctx: SpawnAgentToolContext,
118 ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
119 let state = self.state.clone();
120 Box::pin(async move {
121 let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
122 let input = match parsed {
123 Ok(input) => input,
124 Err(err) => {
125 return Ok(SpawnAgentToolResult {
126 output: format!("spawn_agent denied: invalid args ({err})"),
127 metadata: json!({
128 "ok": false,
129 "code": "SPAWN_INVALID_ARGS",
130 "error": err.to_string(),
131 }),
132 });
133 }
134 };
135 let req = SpawnRequest {
136 mission_id: input.mission_id,
137 parent_instance_id: input.parent_instance_id,
138 source: input.source.unwrap_or(SpawnSource::ToolCall),
139 parent_role: None,
140 role: input.role,
141 template_id: input.template_id,
142 justification: input.justification,
143 budget_override: input.budget_override,
144 };
145
146 let event_ctx = SpawnEventContext {
147 session_id: Some(ctx.session_id.as_str()),
148 message_id: Some(ctx.message_id.as_str()),
149 run_id: None,
150 };
151 emit_spawn_requested_with_context(&state, &req, &event_ctx);
152 let result = state.agent_teams.spawn(&state, req.clone()).await;
153 if !result.decision.allowed || result.instance.is_none() {
154 emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
155 return Ok(SpawnAgentToolResult {
156 output: result
157 .decision
158 .reason
159 .clone()
160 .unwrap_or_else(|| "spawn_agent denied".to_string()),
161 metadata: json!({
162 "ok": false,
163 "code": result.decision.code,
164 "error": result.decision.reason,
165 "requiresUserApproval": result.decision.requires_user_approval,
166 }),
167 });
168 }
169 let instance = result.instance.expect("checked is_some");
170 emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
171 Ok(SpawnAgentToolResult {
172 output: format!(
173 "spawned {} as instance {} (session {})",
174 instance.template_id, instance.instance_id, instance.session_id
175 ),
176 metadata: json!({
177 "ok": true,
178 "missionID": instance.mission_id,
179 "instanceID": instance.instance_id,
180 "sessionID": instance.session_id,
181 "runID": instance.run_id,
182 "status": instance.status,
183 "skillHash": instance.skill_hash,
184 "workspaceRoot": instance_workspace_root(&instance),
185 "workspaceRepoRoot": instance_workspace_repo_root(&instance),
186 "managedWorktree": instance_managed_worktree(&instance),
187 }),
188 })
189 })
190 }
191}
192
/// `ToolPolicyHook` implementation that decides whether a tool call is allowed,
/// using routine policies, automation read-only rules and agent-team capabilities.
#[derive(Clone)]
pub struct ServerToolPolicyHook {
    /// Application state; cloned into each evaluation future.
    state: AppState,
}
197
198impl ServerToolPolicyHook {
199 pub fn new(state: AppState) -> Self {
200 Self { state }
201 }
202}
203
204fn automation_tool_target_paths(tool: &str, args: &Value) -> Vec<String> {
205 let mut paths = Vec::new();
206 match tool {
207 "write" | "edit" => {
208 if let Some(path) = args.get("path").and_then(Value::as_str) {
209 let trimmed = path.trim();
210 if !trimmed.is_empty() {
211 paths.push(trimmed.to_string());
212 }
213 }
214 }
215 "apply_patch" => {
216 let patch = args
217 .get("patchText")
218 .and_then(Value::as_str)
219 .or_else(|| args.as_str());
220 if let Some(patch) = patch {
221 for line in patch.lines() {
222 for prefix in [
223 "*** Update File: ",
224 "*** Delete File: ",
225 "*** Add File: ",
226 "*** Move to: ",
227 ] {
228 if let Some(path) = line.strip_prefix(prefix) {
229 let trimmed = path.trim();
230 if !trimmed.is_empty() {
231 paths.push(trimmed.to_string());
232 }
233 }
234 }
235 }
236 }
237 }
238 _ => {}
239 }
240 paths.sort();
241 paths.dedup();
242 paths
243}
244
245fn automation_path_references_read_only_source_of_truth(
246 path: &str,
247 read_only_names: &std::collections::HashSet<String>,
248 workspace_root: Option<&str>,
249) -> bool {
250 let trimmed = path.trim().trim_matches('`');
251 if trimmed.is_empty() {
252 return false;
253 }
254 let lowered = trimmed.to_ascii_lowercase();
255 if read_only_names.contains(&lowered) {
256 return true;
257 }
258 if let Some(filename) = std::path::Path::new(trimmed)
259 .file_name()
260 .and_then(|value| value.to_str())
261 {
262 if read_only_names.contains(&filename.to_ascii_lowercase()) {
263 return true;
264 }
265 }
266 workspace_root
267 .and_then(|root| {
268 crate::app::state::automation::normalize_workspace_display_path(root, trimmed)
269 })
270 .is_some_and(|normalized| {
271 let normalized_lower = normalized.to_ascii_lowercase();
272 if read_only_names.contains(&normalized_lower) {
273 return true;
274 }
275 std::path::Path::new(&normalized)
276 .file_name()
277 .and_then(|value| value.to_str())
278 .is_some_and(|filename| read_only_names.contains(&filename.to_ascii_lowercase()))
279 })
280}
281
282async fn evaluate_automation_read_only_write_deny(
283 state: &AppState,
284 session_id: &str,
285 tool: &str,
286 args: &Value,
287) -> Option<String> {
288 if !matches!(tool, "write" | "edit" | "apply_patch") {
289 return None;
290 }
291 let run_id = state
292 .automation_v2_session_runs
293 .read()
294 .await
295 .get(session_id)
296 .cloned()?;
297 let run = state.get_automation_v2_run(&run_id).await?;
298 let automation = run.automation_snapshot?;
299 let read_only_names =
300 crate::app::state::automation::enforcement::automation_read_only_source_of_truth_name_variants_for_automation(
301 &automation,
302 );
303 if read_only_names.is_empty() {
304 return None;
305 }
306 let workspace_root = args
307 .get("__workspace_root")
308 .and_then(Value::as_str)
309 .or(automation.workspace_root.as_deref());
310 let blocked_paths = automation_tool_target_paths(tool, args)
311 .into_iter()
312 .filter(|path| {
313 automation_path_references_read_only_source_of_truth(
314 path,
315 &read_only_names,
316 workspace_root,
317 )
318 })
319 .collect::<Vec<_>>();
320 if blocked_paths.is_empty() {
321 None
322 } else {
323 Some(format!(
324 "write denied for automation `{}` (run `{}`): read-only source-of-truth file(s) cannot be mutated: {}",
325 automation.automation_id,
326 run_id,
327 blocked_paths.join(", ")
328 ))
329 }
330}
331
332impl ToolPolicyHook for ServerToolPolicyHook {
333 fn evaluate_tool(
334 &self,
335 ctx: ToolPolicyContext,
336 ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
337 let state = self.state.clone();
338 Box::pin(async move {
339 let tool = normalize_tool_name(&ctx.tool);
340 if let Some(policy) = state.routine_session_policy(&ctx.session_id).await {
341 let allowed_patterns = policy
342 .allowed_tools
343 .iter()
344 .map(|name| normalize_tool_name(name))
345 .collect::<Vec<_>>();
346 if !policy.allowed_tools.is_empty() && !any_policy_matches(&allowed_patterns, &tool)
347 {
348 let reason = format!(
349 "tool `{}` is not allowed for routine `{}` (run `{}`)",
350 tool, policy.routine_id, policy.run_id
351 );
352 state.event_bus.publish(EngineEvent::new(
353 "routine.tool.denied",
354 json!({
355 "sessionID": ctx.session_id,
356 "messageID": ctx.message_id,
357 "runID": policy.run_id,
358 "routineID": policy.routine_id,
359 "tool": tool,
360 "reason": reason,
361 "timestampMs": crate::now_ms(),
362 }),
363 ));
364 return Ok(ToolPolicyDecision {
365 allowed: false,
366 reason: Some(reason),
367 });
368 }
369 }
370
371 if let Some(reason) =
372 evaluate_automation_read_only_write_deny(&state, &ctx.session_id, &tool, &ctx.args)
373 .await
374 {
375 state.event_bus.publish(EngineEvent::new(
376 "automation.read_only_write.denied",
377 json!({
378 "sessionID": ctx.session_id,
379 "messageID": ctx.message_id,
380 "tool": tool,
381 "reason": reason,
382 "timestampMs": crate::now_ms(),
383 }),
384 ));
385 return Ok(ToolPolicyDecision {
386 allowed: false,
387 reason: Some(reason),
388 });
389 }
390
391 let Some(instance) = state
392 .agent_teams
393 .instance_for_session(&ctx.session_id)
394 .await
395 else {
396 return Ok(ToolPolicyDecision {
397 allowed: true,
398 reason: None,
399 });
400 };
401 let caps = instance.capabilities.clone();
402 let deny = evaluate_capability_deny(
403 &state,
404 &instance,
405 &tool,
406 &ctx.args,
407 &caps,
408 &ctx.session_id,
409 &ctx.message_id,
410 )
411 .await;
412 if let Some(reason) = deny {
413 state.event_bus.publish(EngineEvent::new(
414 "agent_team.capability.denied",
415 json!({
416 "sessionID": ctx.session_id,
417 "messageID": ctx.message_id,
418 "runID": instance.run_id,
419 "missionID": instance.mission_id,
420 "instanceID": instance.instance_id,
421 "tool": tool,
422 "reason": reason,
423 "timestampMs": crate::now_ms(),
424 }),
425 ));
426 return Ok(ToolPolicyDecision {
427 allowed: false,
428 reason: Some(reason),
429 });
430 }
431 Ok(ToolPolicyDecision {
432 allowed: true,
433 reason: None,
434 })
435 })
436 }
437}
438
439impl AgentTeamRuntime {
440 pub fn new(audit_path: PathBuf) -> Self {
441 Self {
442 policy: Arc::new(RwLock::new(None)),
443 templates: Arc::new(RwLock::new(HashMap::new())),
444 instances: Arc::new(RwLock::new(HashMap::new())),
445 budgets: Arc::new(RwLock::new(HashMap::new())),
446 mission_budgets: Arc::new(RwLock::new(HashMap::new())),
447 spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
448 loaded_workspace: Arc::new(RwLock::new(None)),
449 audit_path: Arc::new(RwLock::new(audit_path)),
450 }
451 }
452
453 pub async fn set_audit_path(&self, path: PathBuf) {
454 *self.audit_path.write().await = path;
455 }
456
457 pub async fn list_templates(&self) -> Vec<AgentTemplate> {
458 let mut rows = self
459 .templates
460 .read()
461 .await
462 .values()
463 .cloned()
464 .collect::<Vec<_>>();
465 rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
466 rows
467 }
468
469 async fn templates_dir_for_loaded_workspace(&self) -> anyhow::Result<PathBuf> {
470 let workspace = self
471 .loaded_workspace
472 .read()
473 .await
474 .clone()
475 .ok_or_else(|| anyhow::anyhow!("agent team workspace not loaded"))?;
476 Ok(PathBuf::from(workspace)
477 .join(".tandem")
478 .join("agent-team")
479 .join("templates"))
480 }
481
/// Maps a template id to the file name used to persist it.
///
/// The id is trimmed, every character other than an ASCII letter, ASCII
/// digit, `-` or `_` becomes `_`, and `.yaml` is appended. An id that is
/// empty after trimming becomes `template.yaml`.
fn template_filename(template_id: &str) -> String {
    let mut name = String::with_capacity(template_id.len() + ".yaml".len());
    for ch in template_id.trim().chars() {
        let keep = ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_');
        name.push(if keep { ch } else { '_' });
    }
    if name.is_empty() {
        name.push_str("template");
    }
    name.push_str(".yaml");
    name
}
501
502 pub async fn upsert_template(
503 &self,
504 workspace_root: &str,
505 template: AgentTemplate,
506 ) -> anyhow::Result<AgentTemplate> {
507 self.ensure_loaded_for_workspace(workspace_root).await?;
508 let templates_dir = self.templates_dir_for_loaded_workspace().await?;
509 fs::create_dir_all(&templates_dir).await?;
510 let path = templates_dir.join(Self::template_filename(&template.template_id));
511 let payload = serde_yaml::to_string(&template)?;
512 fs::write(path, payload).await?;
513 self.templates
514 .write()
515 .await
516 .insert(template.template_id.clone(), template.clone());
517 Ok(template)
518 }
519
520 pub async fn delete_template(
521 &self,
522 workspace_root: &str,
523 template_id: &str,
524 ) -> anyhow::Result<bool> {
525 self.ensure_loaded_for_workspace(workspace_root).await?;
526 let templates_dir = self.templates_dir_for_loaded_workspace().await?;
527 let path = templates_dir.join(Self::template_filename(template_id));
528 let existed = self.templates.write().await.remove(template_id).is_some();
529 if path.exists() {
530 let _ = fs::remove_file(path).await;
531 }
532 Ok(existed)
533 }
534
535 pub async fn get_template_for_workspace(
536 &self,
537 workspace_root: &str,
538 template_id: &str,
539 ) -> anyhow::Result<Option<AgentTemplate>> {
540 self.ensure_loaded_for_workspace(workspace_root).await?;
541 Ok(self.templates.read().await.get(template_id).cloned())
542 }
543
544 pub async fn list_instances(
545 &self,
546 mission_id: Option<&str>,
547 parent_instance_id: Option<&str>,
548 status: Option<AgentInstanceStatus>,
549 ) -> Vec<AgentInstance> {
550 let mut rows = self
551 .instances
552 .read()
553 .await
554 .values()
555 .filter(|instance| {
556 if let Some(mission_id) = mission_id {
557 if instance.mission_id != mission_id {
558 return false;
559 }
560 }
561 if let Some(parent_id) = parent_instance_id {
562 if instance.parent_instance_id.as_deref() != Some(parent_id) {
563 return false;
564 }
565 }
566 if let Some(status) = &status {
567 if &instance.status != status {
568 return false;
569 }
570 }
571 true
572 })
573 .cloned()
574 .collect::<Vec<_>>();
575 rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
576 rows
577 }
578
579 pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
580 let instances = self.instances.read().await;
581 let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
582 for instance in instances.values() {
583 let row = by_mission
584 .entry(instance.mission_id.clone())
585 .or_insert_with(|| AgentMissionSummary {
586 mission_id: instance.mission_id.clone(),
587 instance_count: 0,
588 running_count: 0,
589 completed_count: 0,
590 failed_count: 0,
591 cancelled_count: 0,
592 queued_count: 0,
593 token_used_total: 0,
594 tool_calls_used_total: 0,
595 steps_used_total: 0,
596 cost_used_usd_total: 0.0,
597 });
598 row.instance_count = row.instance_count.saturating_add(1);
599 match instance.status {
600 AgentInstanceStatus::Queued => {
601 row.queued_count = row.queued_count.saturating_add(1)
602 }
603 AgentInstanceStatus::Running => {
604 row.running_count = row.running_count.saturating_add(1)
605 }
606 AgentInstanceStatus::Completed => {
607 row.completed_count = row.completed_count.saturating_add(1)
608 }
609 AgentInstanceStatus::Failed => {
610 row.failed_count = row.failed_count.saturating_add(1)
611 }
612 AgentInstanceStatus::Cancelled => {
613 row.cancelled_count = row.cancelled_count.saturating_add(1)
614 }
615 }
616 if let Some(usage) = instance
617 .metadata
618 .as_ref()
619 .and_then(|m| m.get("budgetUsage"))
620 .and_then(|u| u.as_object())
621 {
622 row.token_used_total = row.token_used_total.saturating_add(
623 usage
624 .get("tokensUsed")
625 .and_then(|v| v.as_u64())
626 .unwrap_or(0),
627 );
628 row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
629 usage
630 .get("toolCallsUsed")
631 .and_then(|v| v.as_u64())
632 .unwrap_or(0),
633 );
634 row.steps_used_total = row
635 .steps_used_total
636 .saturating_add(usage.get("stepsUsed").and_then(|v| v.as_u64()).unwrap_or(0));
637 row.cost_used_usd_total += usage
638 .get("costUsedUsd")
639 .and_then(|v| v.as_f64())
640 .unwrap_or(0.0);
641 }
642 }
643 let mut rows = by_mission.into_values().collect::<Vec<_>>();
644 rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
645 rows
646 }
647
648 pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
649 self.instances
650 .read()
651 .await
652 .values()
653 .find(|instance| instance.session_id == session_id)
654 .cloned()
655 }
656
657 pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
658 let mut rows = self
659 .spawn_approvals
660 .read()
661 .await
662 .values()
663 .cloned()
664 .collect::<Vec<_>>();
665 rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
666 rows
667 }
668
669 pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
670 let normalized = workspace_root.trim().to_string();
671 let already_loaded = self
672 .loaded_workspace
673 .read()
674 .await
675 .as_ref()
676 .map(|s| s == &normalized)
677 .unwrap_or(false);
678 if already_loaded {
679 return Ok(());
680 }
681
682 let root = PathBuf::from(&normalized);
683 let policy_path = root
684 .join(".tandem")
685 .join("agent-team")
686 .join("spawn-policy.yaml");
687 let templates_dir = root.join(".tandem").join("agent-team").join("templates");
688
689 let mut next_policy = None;
690 if policy_path.exists() {
691 let raw = fs::read_to_string(&policy_path)
692 .await
693 .with_context(|| format!("failed reading {}", policy_path.display()))?;
694 let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
695 .with_context(|| format!("failed parsing {}", policy_path.display()))?;
696 next_policy = Some(parsed);
697 }
698
699 let mut next_templates = HashMap::new();
700 if templates_dir.exists() {
701 let mut entries = fs::read_dir(&templates_dir).await?;
702 while let Some(entry) = entries.next_entry().await? {
703 let path = entry.path();
704 if !path.is_file() {
705 continue;
706 }
707 let ext = path
708 .extension()
709 .and_then(|v| v.to_str())
710 .unwrap_or_default()
711 .to_ascii_lowercase();
712 if ext != "yaml" && ext != "yml" && ext != "json" {
713 continue;
714 }
715 let raw = fs::read_to_string(&path).await?;
716 let template = serde_yaml::from_str::<AgentTemplate>(&raw)
717 .with_context(|| format!("failed parsing {}", path.display()))?;
718 next_templates.insert(template.template_id.clone(), template);
719 }
720 }
721
722 *self.policy.write().await = next_policy;
723 *self.templates.write().await = next_templates;
724 *self.loaded_workspace.write().await = Some(normalized);
725 Ok(())
726 }
727
728 pub async fn spawn(&self, state: &AppState, req: SpawnRequest) -> SpawnResult {
729 self.spawn_with_approval_override(state, req, false).await
730 }
731
732 async fn spawn_with_approval_override(
733 &self,
734 state: &AppState,
735 mut req: SpawnRequest,
736 approval_override: bool,
737 ) -> SpawnResult {
738 let workspace_root = state.workspace_index.snapshot().await.root;
739 if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
740 return SpawnResult {
741 decision: SpawnDecision {
742 allowed: false,
743 code: Some(SpawnDenyCode::SpawnPolicyMissing),
744 reason: Some(format!("spawn policy load failed: {}", err)),
745 requires_user_approval: false,
746 },
747 instance: None,
748 };
749 }
750
751 let Some(policy) = self.policy.read().await.clone() else {
752 return SpawnResult {
753 decision: SpawnDecision {
754 allowed: false,
755 code: Some(SpawnDenyCode::SpawnPolicyMissing),
756 reason: Some("spawn policy file missing".to_string()),
757 requires_user_approval: false,
758 },
759 instance: None,
760 };
761 };
762
763 let template = {
764 let templates = self.templates.read().await;
765 req.template_id
766 .as_deref()
767 .and_then(|template_id| templates.get(template_id).cloned())
768 };
769 if req.template_id.is_none() {
770 if let Some(found) = self
771 .templates
772 .read()
773 .await
774 .values()
775 .find(|t| t.role == req.role)
776 .cloned()
777 {
778 req.template_id = Some(found.template_id.clone());
779 }
780 }
781 let template = if template.is_some() {
782 template
783 } else {
784 let templates = self.templates.read().await;
785 req.template_id
786 .as_deref()
787 .and_then(|id| templates.get(id).cloned())
788 };
789
790 if req.parent_role.is_none() {
791 if let Some(parent_id) = req.parent_instance_id.as_deref() {
792 let instances = self.instances.read().await;
793 req.parent_role = instances
794 .get(parent_id)
795 .map(|instance| instance.role.clone());
796 }
797 }
798
799 let instances = self.instances.read().await;
800 let total_agents = instances.len();
801 let running_agents = instances
802 .values()
803 .filter(|instance| instance.status == AgentInstanceStatus::Running)
804 .count();
805 drop(instances);
806
807 let mut decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
808 if approval_override
809 && !decision.allowed
810 && decision.requires_user_approval
811 && matches!(decision.code, Some(SpawnDenyCode::SpawnRequiresApproval))
812 {
813 decision = SpawnDecision {
814 allowed: true,
815 code: None,
816 reason: None,
817 requires_user_approval: false,
818 };
819 }
820 if !decision.allowed {
821 if decision.requires_user_approval && !approval_override {
822 self.queue_spawn_approval(&req, &decision).await;
823 }
824 return SpawnResult {
825 decision,
826 instance: None,
827 };
828 }
829
830 let mission_id = req
831 .mission_id
832 .clone()
833 .unwrap_or_else(|| "mission-default".to_string());
834
835 if let Some(reason) = self
836 .mission_budget_exceeded_reason(&policy, &mission_id)
837 .await
838 {
839 return SpawnResult {
840 decision: SpawnDecision {
841 allowed: false,
842 code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
843 reason: Some(reason),
844 requires_user_approval: false,
845 },
846 instance: None,
847 };
848 }
849
850 let template = template.unwrap_or_else(|| AgentTemplate {
851 template_id: "default-template".to_string(),
852 display_name: None,
853 avatar_url: None,
854 role: req.role.clone(),
855 system_prompt: None,
856 default_model: None,
857 skills: Vec::new(),
858 default_budget: BudgetLimit::default(),
859 capabilities: Default::default(),
860 });
861
862 let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
863 Ok(hash) => hash,
864 Err(err) => {
865 let lowered = err.to_ascii_lowercase();
866 let code = if lowered.contains("pinned hash mismatch") {
867 SpawnDenyCode::SpawnSkillHashMismatch
868 } else if lowered.contains("skill source denied") {
869 SpawnDenyCode::SpawnSkillSourceDenied
870 } else {
871 SpawnDenyCode::SpawnRequiredSkillMissing
872 };
873 return SpawnResult {
874 decision: SpawnDecision {
875 allowed: false,
876 code: Some(code),
877 reason: Some(err),
878 requires_user_approval: false,
879 },
880 instance: None,
881 };
882 }
883 };
884
885 let parent_snapshot = {
886 let instances = self.instances.read().await;
887 req.parent_instance_id
888 .as_deref()
889 .and_then(|id| instances.get(id).cloned())
890 };
891 let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
892 self.budgets.read().await.get(parent_id).cloned()
893 } else {
894 None
895 };
896
897 let budget = resolve_budget(
898 &policy,
899 parent_snapshot,
900 parent_usage,
901 &template,
902 req.budget_override.clone(),
903 &req.role,
904 );
905
906 let instance_id = format!("ins_{}", Uuid::new_v4().simple());
907 let managed_worktree = prepare_agent_instance_workspace(
908 state,
909 &workspace_root,
910 req.mission_id.as_deref(),
911 &instance_id,
912 &template.template_id,
913 )
914 .await;
915 let workspace_repo_root = managed_worktree
916 .as_ref()
917 .map(|row| row.record.repo_root.clone())
918 .or_else(|| crate::runtime::worktrees::resolve_git_repo_root(&workspace_root))
919 .unwrap_or_else(|| workspace_root.clone());
920 let worker_workspace_root = managed_worktree
921 .as_ref()
922 .map(|row| row.record.path.clone())
923 .unwrap_or_else(|| workspace_root.clone());
924 let mut session = Session::new(
925 Some(format!("Agent Team {}", template.template_id)),
926 Some(worker_workspace_root.clone()),
927 );
928 session.workspace_root = Some(worker_workspace_root.clone());
929 let session_id = session.id.clone();
930 if let Err(err) = state.storage.save_session(session).await {
931 if let Some(worktree) = managed_worktree.as_ref() {
932 let _ = crate::runtime::worktrees::delete_managed_worktree(state, &worktree.record)
933 .await;
934 }
935 return SpawnResult {
936 decision: SpawnDecision {
937 allowed: false,
938 code: Some(SpawnDenyCode::SpawnPolicyMissing),
939 reason: Some(format!("failed creating child session: {}", err)),
940 requires_user_approval: false,
941 },
942 instance: None,
943 };
944 }
945
946 let instance = AgentInstance {
947 instance_id: instance_id.clone(),
948 mission_id: mission_id.clone(),
949 parent_instance_id: req.parent_instance_id.clone(),
950 role: template.role.clone(),
951 template_id: template.template_id.clone(),
952 session_id: session_id.clone(),
953 run_id: None,
954 status: AgentInstanceStatus::Running,
955 budget,
956 skill_hash: skill_hash.clone(),
957 capabilities: template.capabilities.clone(),
958 metadata: Some(json!({
959 "source": req.source,
960 "justification": req.justification,
961 "workspaceRoot": worker_workspace_root,
962 "workspaceRepoRoot": workspace_repo_root,
963 "managedWorktree": managed_worktree.as_ref().map(|row| json!({
964 "path": row.record.path,
965 "branch": row.record.branch,
966 "repoRoot": row.record.repo_root,
967 "cleanupBranch": row.record.cleanup_branch,
968 "reused": row.reused,
969 })).unwrap_or(Value::Null),
970 })),
971 };
972
973 self.instances
974 .write()
975 .await
976 .insert(instance.instance_id.clone(), instance.clone());
977 self.budgets.write().await.insert(
978 instance.instance_id.clone(),
979 InstanceBudgetState {
980 started_at: Some(Instant::now()),
981 ..InstanceBudgetState::default()
982 },
983 );
984 let _ = self.append_audit("spawn.approved", &instance).await;
985
986 SpawnResult {
987 decision: SpawnDecision {
988 allowed: true,
989 code: None,
990 reason: None,
991 requires_user_approval: false,
992 },
993 instance: Some(instance),
994 }
995 }
996
997 pub async fn approve_spawn_approval(
998 &self,
999 state: &AppState,
1000 approval_id: &str,
1001 reason: Option<&str>,
1002 ) -> Option<SpawnResult> {
1003 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
1004 let result = self
1005 .spawn_with_approval_override(state, approval.request.clone(), true)
1006 .await;
1007 if let Some(instance) = result.instance.as_ref() {
1008 let note = reason.unwrap_or("approved by operator");
1009 let _ = self
1010 .append_approval_audit("spawn.approval.approved", approval_id, Some(instance), note)
1011 .await;
1012 } else {
1013 let note = reason.unwrap_or("approval replay failed policy checks");
1014 let _ = self
1015 .append_approval_audit("spawn.approval.rejected_on_replay", approval_id, None, note)
1016 .await;
1017 }
1018 Some(result)
1019 }
1020
1021 pub async fn deny_spawn_approval(
1022 &self,
1023 approval_id: &str,
1024 reason: Option<&str>,
1025 ) -> Option<PendingSpawnApproval> {
1026 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
1027 let note = reason.unwrap_or("denied by operator");
1028 let _ = self
1029 .append_approval_audit("spawn.approval.denied", approval_id, None, note)
1030 .await;
1031 Some(approval)
1032 }
1033
1034 pub async fn cancel_instance(
1035 &self,
1036 state: &AppState,
1037 instance_id: &str,
1038 reason: &str,
1039 ) -> Option<AgentInstance> {
1040 let mut instances = self.instances.write().await;
1041 let instance = instances.get_mut(instance_id)?;
1042 if matches!(
1043 instance.status,
1044 AgentInstanceStatus::Completed
1045 | AgentInstanceStatus::Failed
1046 | AgentInstanceStatus::Cancelled
1047 ) {
1048 return Some(instance.clone());
1049 }
1050 instance.status = AgentInstanceStatus::Cancelled;
1051 let snapshot = instance.clone();
1052 drop(instances);
1053 let _ = state.cancellations.cancel(&snapshot.session_id).await;
1054 cleanup_instance_managed_worktree(state, &snapshot).await;
1055 let _ = self.append_audit("instance.cancelled", &snapshot).await;
1056 emit_instance_cancelled(state, &snapshot, reason);
1057 Some(snapshot)
1058 }
1059
1060 async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
1061 let approval = PendingSpawnApproval {
1062 approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
1063 created_at_ms: crate::now_ms(),
1064 request: req.clone(),
1065 decision_code: decision.code.clone(),
1066 reason: decision.reason.clone(),
1067 };
1068 self.spawn_approvals
1069 .write()
1070 .await
1071 .insert(approval.approval_id.clone(), approval);
1072 }
1073
1074 async fn mission_budget_exceeded_reason(
1075 &self,
1076 policy: &SpawnPolicy,
1077 mission_id: &str,
1078 ) -> Option<String> {
1079 let Some(limit) = policy.mission_total_budget.as_ref() else {
1080 return None;
1081 };
1082 let usage = self
1083 .mission_budgets
1084 .read()
1085 .await
1086 .get(mission_id)
1087 .cloned()
1088 .unwrap_or_default();
1089 if let Some(max) = limit.max_tokens {
1090 if usage.tokens_used >= max {
1091 return Some(format!(
1092 "mission max_tokens exhausted ({}/{})",
1093 usage.tokens_used, max
1094 ));
1095 }
1096 }
1097 if let Some(max) = limit.max_steps {
1098 if usage.steps_used >= u64::from(max) {
1099 return Some(format!(
1100 "mission max_steps exhausted ({}/{})",
1101 usage.steps_used, max
1102 ));
1103 }
1104 }
1105 if let Some(max) = limit.max_tool_calls {
1106 if usage.tool_calls_used >= u64::from(max) {
1107 return Some(format!(
1108 "mission max_tool_calls exhausted ({}/{})",
1109 usage.tool_calls_used, max
1110 ));
1111 }
1112 }
1113 if let Some(max) = limit.max_cost_usd {
1114 if usage.cost_used_usd >= max {
1115 return Some(format!(
1116 "mission max_cost_usd exhausted ({:.6}/{:.6})",
1117 usage.cost_used_usd, max
1118 ));
1119 }
1120 }
1121 None
1122 }
1123
1124 pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
1125 let instance_ids = self
1126 .instances
1127 .read()
1128 .await
1129 .values()
1130 .filter(|instance| instance.mission_id == mission_id)
1131 .map(|instance| instance.instance_id.clone())
1132 .collect::<Vec<_>>();
1133 let mut count = 0usize;
1134 for instance_id in instance_ids {
1135 if self
1136 .cancel_instance(state, &instance_id, reason)
1137 .await
1138 .is_some()
1139 {
1140 count = count.saturating_add(1);
1141 }
1142 }
1143 count
1144 }
1145
1146 async fn mark_instance_terminal(
1147 &self,
1148 state: &AppState,
1149 instance_id: &str,
1150 status: AgentInstanceStatus,
1151 ) -> Option<AgentInstance> {
1152 let mut instances = self.instances.write().await;
1153 let instance = instances.get_mut(instance_id)?;
1154 if matches!(
1155 instance.status,
1156 AgentInstanceStatus::Completed
1157 | AgentInstanceStatus::Failed
1158 | AgentInstanceStatus::Cancelled
1159 ) {
1160 return Some(instance.clone());
1161 }
1162 instance.status = status.clone();
1163 let snapshot = instance.clone();
1164 drop(instances);
1165 cleanup_instance_managed_worktree(state, &snapshot).await;
1166 match status {
1167 AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
1168 AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
1169 _ => {}
1170 }
1171 Some(snapshot)
1172 }
1173
1174 pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
1175 let Some(session_id) = extract_session_id(event) else {
1176 return;
1177 };
1178 let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
1179 return;
1180 };
1181 if event.event_type == "provider.usage" {
1182 let total_tokens = event
1183 .properties
1184 .get("totalTokens")
1185 .and_then(|v| v.as_u64())
1186 .unwrap_or(0);
1187 let cost_used_usd = event
1188 .properties
1189 .get("costUsd")
1190 .and_then(|v| v.as_f64())
1191 .unwrap_or(0.0);
1192 if total_tokens > 0 {
1193 let exhausted = self
1194 .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
1195 .await;
1196 if exhausted {
1197 let _ = self
1198 .cancel_instance(state, &instance_id, "budget exhausted")
1199 .await;
1200 }
1201 }
1202 return;
1203 }
1204 let mut delta_tokens = 0u64;
1205 let mut delta_steps = 0u32;
1206 let mut delta_tool_calls = 0u32;
1207 if event.event_type == "message.part.updated" {
1208 if let Some(part) = event.properties.get("part") {
1209 let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
1210 if part_type == "tool-invocation" {
1211 delta_tool_calls = 1;
1212 } else if part_type == "text" {
1213 let delta = event
1214 .properties
1215 .get("delta")
1216 .and_then(|v| v.as_str())
1217 .unwrap_or("");
1218 if !delta.is_empty() {
1219 delta_tokens = estimate_tokens(delta);
1220 }
1221 }
1222 }
1223 } else if event.event_type == "session.run.finished" {
1224 delta_steps = 1;
1225 let run_status = event
1226 .properties
1227 .get("status")
1228 .and_then(|v| v.as_str())
1229 .unwrap_or("")
1230 .to_ascii_lowercase();
1231 if run_status == "completed" {
1232 let _ = self
1233 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
1234 .await;
1235 } else if run_status == "failed" || run_status == "error" {
1236 let _ = self
1237 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
1238 .await;
1239 }
1240 }
1241 if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
1242 return;
1243 }
1244 let exhausted = self
1245 .apply_budget_delta(
1246 state,
1247 &instance_id,
1248 delta_tokens,
1249 delta_steps,
1250 delta_tool_calls,
1251 )
1252 .await;
1253 if exhausted {
1254 let _ = self
1255 .cancel_instance(state, &instance_id, "budget exhausted")
1256 .await;
1257 }
1258 }
1259
1260 async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
1261 self.instances
1262 .read()
1263 .await
1264 .values()
1265 .find(|instance| instance.session_id == session_id)
1266 .map(|instance| instance.instance_id.clone())
1267 }
1268
1269 async fn apply_budget_delta(
1270 &self,
1271 state: &AppState,
1272 instance_id: &str,
1273 delta_tokens: u64,
1274 delta_steps: u32,
1275 delta_tool_calls: u32,
1276 ) -> bool {
1277 let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1278 enabled: false,
1279 require_justification: false,
1280 max_agents: None,
1281 max_concurrent: None,
1282 child_budget_percent_of_parent_remaining: None,
1283 mission_total_budget: None,
1284 cost_per_1k_tokens_usd: None,
1285 spawn_edges: HashMap::new(),
1286 required_skills: HashMap::new(),
1287 role_defaults: HashMap::new(),
1288 skill_sources: Default::default(),
1289 });
1290 let mut budgets = self.budgets.write().await;
1291 let Some(usage) = budgets.get_mut(instance_id) else {
1292 return false;
1293 };
1294 if usage.exhausted {
1295 return true;
1296 }
1297 let prev_cost_used_usd = usage.cost_used_usd;
1298 usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
1299 usage.steps_used = usage.steps_used.saturating_add(delta_steps);
1300 usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
1301 if let Some(rate) = policy.cost_per_1k_tokens_usd {
1302 usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
1303 }
1304 let elapsed_ms = usage
1305 .started_at
1306 .map(|started| started.elapsed().as_millis() as u64)
1307 .unwrap_or(0);
1308
1309 let mut exhausted_reason: Option<&'static str> = None;
1310 let mut snapshot: Option<AgentInstance> = None;
1311 {
1312 let mut instances = self.instances.write().await;
1313 if let Some(instance) = instances.get_mut(instance_id) {
1314 instance.metadata = Some(merge_metadata_usage(
1315 instance.metadata.take(),
1316 usage.tokens_used,
1317 usage.steps_used,
1318 usage.tool_calls_used,
1319 usage.cost_used_usd,
1320 elapsed_ms,
1321 ));
1322 if let Some(limit) = instance.budget.max_tokens {
1323 if usage.tokens_used >= limit {
1324 exhausted_reason = Some("max_tokens");
1325 }
1326 }
1327 if exhausted_reason.is_none() {
1328 if let Some(limit) = instance.budget.max_steps {
1329 if usage.steps_used >= limit {
1330 exhausted_reason = Some("max_steps");
1331 }
1332 }
1333 }
1334 if exhausted_reason.is_none() {
1335 if let Some(limit) = instance.budget.max_tool_calls {
1336 if usage.tool_calls_used >= limit {
1337 exhausted_reason = Some("max_tool_calls");
1338 }
1339 }
1340 }
1341 if exhausted_reason.is_none() {
1342 if let Some(limit) = instance.budget.max_duration_ms {
1343 if elapsed_ms >= limit {
1344 exhausted_reason = Some("max_duration_ms");
1345 }
1346 }
1347 }
1348 if exhausted_reason.is_none() {
1349 if let Some(limit) = instance.budget.max_cost_usd {
1350 if usage.cost_used_usd >= limit {
1351 exhausted_reason = Some("max_cost_usd");
1352 }
1353 }
1354 }
1355 snapshot = Some(instance.clone());
1356 }
1357 }
1358 let Some(instance) = snapshot else {
1359 return false;
1360 };
1361 emit_budget_usage(
1362 state,
1363 &instance,
1364 usage.tokens_used,
1365 usage.steps_used,
1366 usage.tool_calls_used,
1367 usage.cost_used_usd,
1368 elapsed_ms,
1369 );
1370 let mission_exhausted = self
1371 .apply_mission_budget_delta(
1372 state,
1373 &instance.mission_id,
1374 delta_tokens,
1375 u64::from(delta_steps),
1376 u64::from(delta_tool_calls),
1377 usage.cost_used_usd - prev_cost_used_usd,
1378 &policy,
1379 )
1380 .await;
1381 if mission_exhausted {
1382 usage.exhausted = true;
1383 let _ = self
1384 .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1385 .await;
1386 return true;
1387 }
1388 if let Some(reason) = exhausted_reason {
1389 usage.exhausted = true;
1390 emit_budget_exhausted(
1391 state,
1392 &instance,
1393 reason,
1394 usage.tokens_used,
1395 usage.steps_used,
1396 usage.tool_calls_used,
1397 usage.cost_used_usd,
1398 elapsed_ms,
1399 );
1400 return true;
1401 }
1402 false
1403 }
1404
1405 async fn apply_exact_token_usage(
1406 &self,
1407 state: &AppState,
1408 instance_id: &str,
1409 total_tokens: u64,
1410 cost_used_usd: f64,
1411 ) -> bool {
1412 let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1413 enabled: false,
1414 require_justification: false,
1415 max_agents: None,
1416 max_concurrent: None,
1417 child_budget_percent_of_parent_remaining: None,
1418 mission_total_budget: None,
1419 cost_per_1k_tokens_usd: None,
1420 spawn_edges: HashMap::new(),
1421 required_skills: HashMap::new(),
1422 role_defaults: HashMap::new(),
1423 skill_sources: Default::default(),
1424 });
1425 let mut budgets = self.budgets.write().await;
1426 let Some(usage) = budgets.get_mut(instance_id) else {
1427 return false;
1428 };
1429 if usage.exhausted {
1430 return true;
1431 }
1432 let prev_tokens = usage.tokens_used;
1433 let prev_cost_used_usd = usage.cost_used_usd;
1434 usage.tokens_used = usage.tokens_used.max(total_tokens);
1435 if cost_used_usd > 0.0 {
1436 usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
1437 } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
1438 let delta = usage.tokens_used.saturating_sub(prev_tokens);
1439 usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
1440 }
1441 let elapsed_ms = usage
1442 .started_at
1443 .map(|started| started.elapsed().as_millis() as u64)
1444 .unwrap_or(0);
1445 let mut exhausted_reason: Option<&'static str> = None;
1446 let mut snapshot: Option<AgentInstance> = None;
1447 {
1448 let mut instances = self.instances.write().await;
1449 if let Some(instance) = instances.get_mut(instance_id) {
1450 instance.metadata = Some(merge_metadata_usage(
1451 instance.metadata.take(),
1452 usage.tokens_used,
1453 usage.steps_used,
1454 usage.tool_calls_used,
1455 usage.cost_used_usd,
1456 elapsed_ms,
1457 ));
1458 if let Some(limit) = instance.budget.max_tokens {
1459 if usage.tokens_used >= limit {
1460 exhausted_reason = Some("max_tokens");
1461 }
1462 }
1463 if exhausted_reason.is_none() {
1464 if let Some(limit) = instance.budget.max_cost_usd {
1465 if usage.cost_used_usd >= limit {
1466 exhausted_reason = Some("max_cost_usd");
1467 }
1468 }
1469 }
1470 snapshot = Some(instance.clone());
1471 }
1472 }
1473 let Some(instance) = snapshot else {
1474 return false;
1475 };
1476 emit_budget_usage(
1477 state,
1478 &instance,
1479 usage.tokens_used,
1480 usage.steps_used,
1481 usage.tool_calls_used,
1482 usage.cost_used_usd,
1483 elapsed_ms,
1484 );
1485 let mission_exhausted = self
1486 .apply_mission_budget_delta(
1487 state,
1488 &instance.mission_id,
1489 usage.tokens_used.saturating_sub(prev_tokens),
1490 0,
1491 0,
1492 usage.cost_used_usd - prev_cost_used_usd,
1493 &policy,
1494 )
1495 .await;
1496 if mission_exhausted {
1497 usage.exhausted = true;
1498 let _ = self
1499 .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1500 .await;
1501 return true;
1502 }
1503 if let Some(reason) = exhausted_reason {
1504 usage.exhausted = true;
1505 emit_budget_exhausted(
1506 state,
1507 &instance,
1508 reason,
1509 usage.tokens_used,
1510 usage.steps_used,
1511 usage.tool_calls_used,
1512 usage.cost_used_usd,
1513 elapsed_ms,
1514 );
1515 return true;
1516 }
1517 false
1518 }
1519
1520 async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1521 let path = self.audit_path.read().await.clone();
1522 if let Some(parent) = path.parent() {
1523 fs::create_dir_all(parent).await?;
1524 }
1525 let row = json!({
1526 "action": action,
1527 "missionID": instance.mission_id,
1528 "instanceID": instance.instance_id,
1529 "parentInstanceID": instance.parent_instance_id,
1530 "role": instance.role,
1531 "templateID": instance.template_id,
1532 "sessionID": instance.session_id,
1533 "skillHash": instance.skill_hash,
1534 "workspaceRoot": instance_workspace_root(instance),
1535 "workspaceRepoRoot": instance_workspace_repo_root(instance),
1536 "managedWorktree": instance_managed_worktree(instance),
1537 "timestampMs": crate::now_ms(),
1538 });
1539 let mut existing = if path.exists() {
1540 fs::read_to_string(&path).await.unwrap_or_default()
1541 } else {
1542 String::new()
1543 };
1544 existing.push_str(&serde_json::to_string(&row)?);
1545 existing.push('\n');
1546 fs::write(path, existing).await?;
1547 Ok(())
1548 }
1549
1550 async fn append_approval_audit(
1551 &self,
1552 action: &str,
1553 approval_id: &str,
1554 instance: Option<&AgentInstance>,
1555 reason: &str,
1556 ) -> anyhow::Result<()> {
1557 let path = self.audit_path.read().await.clone();
1558 if let Some(parent) = path.parent() {
1559 fs::create_dir_all(parent).await?;
1560 }
1561 let row = json!({
1562 "action": action,
1563 "approvalID": approval_id,
1564 "reason": reason,
1565 "missionID": instance.map(|v| v.mission_id.clone()),
1566 "instanceID": instance.map(|v| v.instance_id.clone()),
1567 "parentInstanceID": instance.and_then(|v| v.parent_instance_id.clone()),
1568 "role": instance.map(|v| v.role.clone()),
1569 "templateID": instance.map(|v| v.template_id.clone()),
1570 "sessionID": instance.map(|v| v.session_id.clone()),
1571 "skillHash": instance.map(|v| v.skill_hash.clone()),
1572 "workspaceRoot": instance.and_then(instance_workspace_root),
1573 "workspaceRepoRoot": instance.and_then(instance_workspace_repo_root),
1574 "managedWorktree": instance.and_then(instance_managed_worktree),
1575 "timestampMs": crate::now_ms(),
1576 });
1577 let mut existing = if path.exists() {
1578 fs::read_to_string(&path).await.unwrap_or_default()
1579 } else {
1580 String::new()
1581 };
1582 existing.push_str(&serde_json::to_string(&row)?);
1583 existing.push('\n');
1584 fs::write(path, existing).await?;
1585 Ok(())
1586 }
1587
1588 async fn apply_mission_budget_delta(
1589 &self,
1590 state: &AppState,
1591 mission_id: &str,
1592 delta_tokens: u64,
1593 delta_steps: u64,
1594 delta_tool_calls: u64,
1595 delta_cost_used_usd: f64,
1596 policy: &SpawnPolicy,
1597 ) -> bool {
1598 let mut budgets = self.mission_budgets.write().await;
1599 let row = budgets.entry(mission_id.to_string()).or_default();
1600 row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1601 row.steps_used = row.steps_used.saturating_add(delta_steps);
1602 row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1603 row.cost_used_usd += delta_cost_used_usd.max(0.0);
1604 if row.exhausted {
1605 return true;
1606 }
1607 let Some(limit) = policy.mission_total_budget.as_ref() else {
1608 return false;
1609 };
1610 let mut exhausted_by: Option<&'static str> = None;
1611 if let Some(max) = limit.max_tokens {
1612 if row.tokens_used >= max {
1613 exhausted_by = Some("mission_max_tokens");
1614 }
1615 }
1616 if exhausted_by.is_none() {
1617 if let Some(max) = limit.max_steps {
1618 if row.steps_used >= u64::from(max) {
1619 exhausted_by = Some("mission_max_steps");
1620 }
1621 }
1622 }
1623 if exhausted_by.is_none() {
1624 if let Some(max) = limit.max_tool_calls {
1625 if row.tool_calls_used >= u64::from(max) {
1626 exhausted_by = Some("mission_max_tool_calls");
1627 }
1628 }
1629 }
1630 if exhausted_by.is_none() {
1631 if let Some(max) = limit.max_cost_usd {
1632 if row.cost_used_usd >= max {
1633 exhausted_by = Some("mission_max_cost_usd");
1634 }
1635 }
1636 }
1637 if let Some(exhausted_by) = exhausted_by {
1638 row.exhausted = true;
1639 emit_mission_budget_exhausted(
1640 state,
1641 mission_id,
1642 exhausted_by,
1643 row.tokens_used,
1644 row.steps_used,
1645 row.tool_calls_used,
1646 row.cost_used_usd,
1647 );
1648 return true;
1649 }
1650 false
1651 }
1652
1653 pub async fn set_for_test(
1654 &self,
1655 workspace_root: Option<String>,
1656 policy: Option<SpawnPolicy>,
1657 templates: Vec<AgentTemplate>,
1658 ) {
1659 *self.policy.write().await = policy;
1660 let mut by_id = HashMap::new();
1661 for template in templates {
1662 by_id.insert(template.template_id.clone(), template);
1663 }
1664 *self.templates.write().await = by_id;
1665 self.instances.write().await.clear();
1666 self.budgets.write().await.clear();
1667 self.mission_budgets.write().await.clear();
1668 self.spawn_approvals.write().await.clear();
1669 *self.loaded_workspace.write().await = workspace_root;
1670 }
1671}
1672
1673fn resolve_budget(
1674 policy: &SpawnPolicy,
1675 parent_instance: Option<AgentInstance>,
1676 parent_usage: Option<InstanceBudgetState>,
1677 template: &AgentTemplate,
1678 override_budget: Option<BudgetLimit>,
1679 role: &AgentRole,
1680) -> BudgetLimit {
1681 let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1682 let mut chosen = merge_budget(
1683 merge_budget(role_default, template.default_budget.clone()),
1684 override_budget.unwrap_or_default(),
1685 );
1686
1687 if let Some(parent) = parent_instance {
1688 let usage = parent_usage.unwrap_or_default();
1689 if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1690 if pct > 0 {
1691 chosen.max_tokens = cap_budget_remaining_u64(
1692 chosen.max_tokens,
1693 parent.budget.max_tokens,
1694 usage.tokens_used,
1695 pct,
1696 );
1697 chosen.max_steps = cap_budget_remaining_u32(
1698 chosen.max_steps,
1699 parent.budget.max_steps,
1700 usage.steps_used,
1701 pct,
1702 );
1703 chosen.max_tool_calls = cap_budget_remaining_u32(
1704 chosen.max_tool_calls,
1705 parent.budget.max_tool_calls,
1706 usage.tool_calls_used,
1707 pct,
1708 );
1709 chosen.max_duration_ms = cap_budget_remaining_u64(
1710 chosen.max_duration_ms,
1711 parent.budget.max_duration_ms,
1712 usage
1713 .started_at
1714 .map(|started| started.elapsed().as_millis() as u64)
1715 .unwrap_or(0),
1716 pct,
1717 );
1718 chosen.max_cost_usd = cap_budget_remaining_f64(
1719 chosen.max_cost_usd,
1720 parent.budget.max_cost_usd,
1721 usage.cost_used_usd,
1722 pct,
1723 );
1724 }
1725 }
1726 }
1727 chosen
1728}
1729
1730fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1731 BudgetLimit {
1732 max_tokens: overlay.max_tokens.or(base.max_tokens),
1733 max_steps: overlay.max_steps.or(base.max_steps),
1734 max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1735 max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1736 max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1737 }
1738}
1739
/// Caps a child limit to `pct` percent of what the parent has left.
///
/// * No parent limit: `child` is returned unchanged.
/// * Parent limit, no child limit: the child receives the parent's share.
/// * Both set: the smaller of the child limit and the parent's share.
///
/// The share is `(parent_limit - parent_used) * pct / 100`, rounded down, with
/// the subtraction saturating at zero. The product is computed in 128-bit
/// arithmetic so large remaining budgets are not distorted by an overflowing
/// multiplication; a share above `u64::MAX` (only possible for `pct > 100`)
/// is clamped to `u64::MAX`.
fn cap_budget_remaining_u64(
    child: Option<u64>,
    parent_limit: Option<u64>,
    parent_used: u64,
    pct: u8,
) -> Option<u64> {
    let Some(parent_limit) = parent_limit else {
        return child;
    };
    let remaining = parent_limit.saturating_sub(parent_used);
    let share = u128::from(remaining) * u128::from(pct) / 100;
    let share = u64::try_from(share).unwrap_or(u64::MAX);
    Some(child.map_or(share, |child| child.min(share)))
}
1759
/// Caps a child limit to `pct` percent of what the parent has left.
///
/// * No parent limit: `child` is returned unchanged.
/// * Parent limit, no child limit: the child receives the parent's share.
/// * Both set: the smaller of the child limit and the parent's share.
///
/// The share is `(parent_limit - parent_used) * pct / 100`, rounded down, with
/// the subtraction saturating at zero. The product is computed in 64-bit
/// arithmetic so remaining budgets above `u32::MAX / pct` are not distorted by
/// an overflowing multiplication; a share above `u32::MAX` (only possible for
/// `pct > 100`) is clamped to `u32::MAX`.
fn cap_budget_remaining_u32(
    child: Option<u32>,
    parent_limit: Option<u32>,
    parent_used: u32,
    pct: u8,
) -> Option<u32> {
    let Some(parent_limit) = parent_limit else {
        return child;
    };
    let remaining = parent_limit.saturating_sub(parent_used);
    let share = u64::from(remaining) * u64::from(pct) / 100;
    let share = u32::try_from(share).unwrap_or(u32::MAX);
    Some(child.map_or(share, |child| child.min(share)))
}
1779
/// Caps a child cost limit to `pct` percent of what the parent has left.
///
/// * No parent limit: `child` is returned unchanged.
/// * Parent limit, no child limit: the child receives the parent's share.
/// * Both set: the smaller of the child limit and the parent's share.
///
/// The share is `max(parent_limit - parent_used, 0) * pct / 100`.
fn cap_budget_remaining_f64(
    child: Option<f64>,
    parent_limit: Option<f64>,
    parent_used: f64,
    pct: u8,
) -> Option<f64> {
    let Some(parent_limit) = parent_limit else {
        return child;
    };
    let remaining = (parent_limit - parent_used).max(0.0);
    let share = remaining * f64::from(pct) / 100.0;
    let capped = match child {
        Some(own) => own.min(share),
        None => share,
    };
    Some(capped)
}
1799
1800async fn compute_skill_hash(
1801 workspace_root: &str,
1802 template: &AgentTemplate,
1803 policy: &SpawnPolicy,
1804) -> Result<String, String> {
1805 use sha2::{Digest, Sha256};
1806 let mut rows = Vec::new();
1807 let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1808 for skill in &template.skills {
1809 if let Some(path) = skill.path.as_deref() {
1810 validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1811 let skill_path = Path::new(workspace_root).join(path);
1812 let raw = fs::read_to_string(&skill_path)
1813 .await
1814 .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1815 let digest = hash_hex(raw.as_bytes());
1816 validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1817 rows.push(format!("path:{}:{}", path, digest));
1818 } else if let Some(id) = skill.id.as_deref() {
1819 validate_skill_source(Some(id), None, policy)?;
1820 let loaded = skill_service
1821 .load_skill(id)
1822 .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1823 let Some(loaded) = loaded else {
1824 return Err(format!("missing required skill id `{id}`"));
1825 };
1826 let digest = hash_hex(loaded.content.as_bytes());
1827 validate_pinned_hash(Some(id), None, &digest, policy)?;
1828 rows.push(format!("id:{}:{}", id, digest));
1829 }
1830 }
1831 rows.sort();
1832 let mut hasher = Sha256::new();
1833 for row in rows {
1834 hasher.update(row.as_bytes());
1835 hasher.update(b"\n");
1836 }
1837 let digest = hasher.finalize();
1838 Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1839}
1840
1841fn validate_skill_source(
1842 id: Option<&str>,
1843 path: Option<&str>,
1844 policy: &SpawnPolicy,
1845) -> Result<(), String> {
1846 use tandem_orchestrator::SkillSourceMode;
1847 match policy.skill_sources.mode {
1848 SkillSourceMode::Any => Ok(()),
1849 SkillSourceMode::ProjectOnly => {
1850 if id.is_some() {
1851 return Err("skill source denied: project_only forbids skill IDs".to_string());
1852 }
1853 let Some(path) = path else {
1854 return Err("skill source denied: project_only requires skill path".to_string());
1855 };
1856 let p = PathBuf::from(path);
1857 if p.is_absolute() {
1858 return Err("skill source denied: absolute skill paths are forbidden".to_string());
1859 }
1860 Ok(())
1861 }
1862 SkillSourceMode::Allowlist => {
1863 if let Some(id) = id {
1864 if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1865 return Ok(());
1866 }
1867 }
1868 if let Some(path) = path {
1869 if policy
1870 .skill_sources
1871 .allowlist_paths
1872 .iter()
1873 .any(|v| v == path)
1874 {
1875 return Ok(());
1876 }
1877 }
1878 Err("skill source denied: not present in allowlist".to_string())
1879 }
1880 }
1881}
1882
1883fn validate_pinned_hash(
1884 id: Option<&str>,
1885 path: Option<&str>,
1886 digest: &str,
1887 policy: &SpawnPolicy,
1888) -> Result<(), String> {
1889 let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1890 let by_path = path.and_then(|path| {
1891 policy
1892 .skill_sources
1893 .pinned_hashes
1894 .get(&format!("path:{path}"))
1895 });
1896 let expected = by_id.or(by_path);
1897 if let Some(expected) = expected {
1898 let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1899 if normalized != digest {
1900 return Err("pinned hash mismatch for skill reference".to_string());
1901 }
1902 }
1903 Ok(())
1904}
1905
/// Encodes `bytes` as lower-case hexadecimal, two digits per byte.
fn hash_hex(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
1914
/// Estimates the token count of `text` as one token per four characters.
///
/// Characters are Unicode scalar values, not bytes. The result is rounded down
/// but is never less than 1, including for empty input.
fn estimate_tokens(text: &str) -> u64 {
    let quarters = text.chars().count() as u64 / 4;
    if quarters == 0 {
        1
    } else {
        quarters
    }
}
1919
1920fn extract_session_id(event: &EngineEvent) -> Option<String> {
1921 event
1922 .properties
1923 .get("sessionID")
1924 .and_then(|v| v.as_str())
1925 .map(|v| v.to_string())
1926 .or_else(|| {
1927 event
1928 .properties
1929 .get("part")
1930 .and_then(|v| v.get("sessionID"))
1931 .and_then(|v| v.as_str())
1932 .map(|v| v.to_string())
1933 })
1934}