1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use futures::future::BoxFuture;
8use serde::Deserialize;
9use serde::Serialize;
10use serde_json::{json, Value};
11use tandem_core::{
12 any_policy_matches, SpawnAgentHook, SpawnAgentToolContext, SpawnAgentToolResult,
13 ToolPolicyContext, ToolPolicyDecision, ToolPolicyHook,
14};
15use tandem_orchestrator::{
16 AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
17 SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
18};
19use tandem_skills::SkillService;
20use tandem_types::{EngineEvent, Session};
21use tokio::fs;
22use tokio::sync::RwLock;
23use uuid::Uuid;
24
25use crate::AppState;
26
27#[derive(Clone, Default)]
28pub struct AgentTeamRuntime {
29 policy: Arc<RwLock<Option<SpawnPolicy>>>,
30 templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
31 instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
32 budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
33 mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
34 spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
35 loaded_workspace: Arc<RwLock<Option<String>>>,
36 audit_path: Arc<RwLock<PathBuf>>,
37}
38
39#[derive(Debug, Clone)]
40pub struct SpawnResult {
41 pub decision: SpawnDecision,
42 pub instance: Option<AgentInstance>,
43}
44
45#[derive(Debug, Clone, Serialize)]
46pub struct AgentMissionSummary {
47 #[serde(rename = "missionID")]
48 pub mission_id: String,
49 #[serde(rename = "instanceCount")]
50 pub instance_count: usize,
51 #[serde(rename = "runningCount")]
52 pub running_count: usize,
53 #[serde(rename = "completedCount")]
54 pub completed_count: usize,
55 #[serde(rename = "failedCount")]
56 pub failed_count: usize,
57 #[serde(rename = "cancelledCount")]
58 pub cancelled_count: usize,
59 #[serde(rename = "queuedCount")]
60 pub queued_count: usize,
61 #[serde(rename = "tokenUsedTotal")]
62 pub token_used_total: u64,
63 #[serde(rename = "toolCallsUsedTotal")]
64 pub tool_calls_used_total: u64,
65 #[serde(rename = "stepsUsedTotal")]
66 pub steps_used_total: u64,
67 #[serde(rename = "costUsedUsdTotal")]
68 pub cost_used_usd_total: f64,
69}
70
71#[derive(Debug, Clone, Default)]
72struct InstanceBudgetState {
73 tokens_used: u64,
74 steps_used: u32,
75 tool_calls_used: u32,
76 cost_used_usd: f64,
77 started_at: Option<Instant>,
78 exhausted: bool,
79}
80
81#[derive(Debug, Clone, Default)]
82struct MissionBudgetState {
83 tokens_used: u64,
84 steps_used: u64,
85 tool_calls_used: u64,
86 cost_used_usd: f64,
87 exhausted: bool,
88}
89
90#[derive(Debug, Clone, Serialize)]
91pub struct PendingSpawnApproval {
92 #[serde(rename = "approvalID")]
93 pub approval_id: String,
94 #[serde(rename = "createdAtMs")]
95 pub created_at_ms: u64,
96 pub request: SpawnRequest,
97 #[serde(rename = "decisionCode")]
98 pub decision_code: Option<SpawnDenyCode>,
99 pub reason: Option<String>,
100}
101
102#[derive(Clone)]
103pub struct ServerSpawnAgentHook {
104 state: AppState,
105}
106
107#[derive(Debug, Deserialize)]
108struct SpawnAgentToolInput {
109 #[serde(rename = "missionID")]
110 mission_id: Option<String>,
111 #[serde(rename = "parentInstanceID")]
112 parent_instance_id: Option<String>,
113 #[serde(rename = "templateID")]
114 template_id: Option<String>,
115 role: AgentRole,
116 source: Option<SpawnSource>,
117 justification: String,
118 #[serde(rename = "budgetOverride", default)]
119 budget_override: Option<BudgetLimit>,
120}
121
122impl ServerSpawnAgentHook {
123 pub fn new(state: AppState) -> Self {
124 Self { state }
125 }
126}
127
128impl SpawnAgentHook for ServerSpawnAgentHook {
129 fn spawn_agent(
130 &self,
131 ctx: SpawnAgentToolContext,
132 ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
133 let state = self.state.clone();
134 Box::pin(async move {
135 let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
136 let input = match parsed {
137 Ok(input) => input,
138 Err(err) => {
139 return Ok(SpawnAgentToolResult {
140 output: format!("spawn_agent denied: invalid args ({err})"),
141 metadata: json!({
142 "ok": false,
143 "code": "SPAWN_INVALID_ARGS",
144 "error": err.to_string(),
145 }),
146 });
147 }
148 };
149 let req = SpawnRequest {
150 mission_id: input.mission_id,
151 parent_instance_id: input.parent_instance_id,
152 source: input.source.unwrap_or(SpawnSource::ToolCall),
153 parent_role: None,
154 role: input.role,
155 template_id: input.template_id,
156 justification: input.justification,
157 budget_override: input.budget_override,
158 };
159
160 let event_ctx = SpawnEventContext {
161 session_id: Some(ctx.session_id.as_str()),
162 message_id: Some(ctx.message_id.as_str()),
163 run_id: None,
164 };
165 emit_spawn_requested_with_context(&state, &req, &event_ctx);
166 let result = state.agent_teams.spawn(&state, req.clone()).await;
167 if !result.decision.allowed || result.instance.is_none() {
168 emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
169 return Ok(SpawnAgentToolResult {
170 output: result
171 .decision
172 .reason
173 .clone()
174 .unwrap_or_else(|| "spawn_agent denied".to_string()),
175 metadata: json!({
176 "ok": false,
177 "code": result.decision.code,
178 "error": result.decision.reason,
179 "requiresUserApproval": result.decision.requires_user_approval,
180 }),
181 });
182 }
183 let instance = result.instance.expect("checked is_some");
184 emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
185 Ok(SpawnAgentToolResult {
186 output: format!(
187 "spawned {} as instance {} (session {})",
188 instance.template_id, instance.instance_id, instance.session_id
189 ),
190 metadata: json!({
191 "ok": true,
192 "missionID": instance.mission_id,
193 "instanceID": instance.instance_id,
194 "sessionID": instance.session_id,
195 "runID": instance.run_id,
196 "status": instance.status,
197 "skillHash": instance.skill_hash,
198 }),
199 })
200 })
201 }
202}
203
204#[derive(Clone)]
205pub struct ServerToolPolicyHook {
206 state: AppState,
207}
208
209impl ServerToolPolicyHook {
210 pub fn new(state: AppState) -> Self {
211 Self { state }
212 }
213}
214
215impl ToolPolicyHook for ServerToolPolicyHook {
216 fn evaluate_tool(
217 &self,
218 ctx: ToolPolicyContext,
219 ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
220 let state = self.state.clone();
221 Box::pin(async move {
222 let tool = normalize_tool_name(&ctx.tool);
223 if let Some(policy) = state.routine_session_policy(&ctx.session_id).await {
224 let allowed_patterns = policy
225 .allowed_tools
226 .iter()
227 .map(|name| normalize_tool_name(name))
228 .collect::<Vec<_>>();
229 if !policy.allowed_tools.is_empty() && !any_policy_matches(&allowed_patterns, &tool)
230 {
231 let reason = format!(
232 "tool `{}` is not allowed for routine `{}` (run `{}`)",
233 tool, policy.routine_id, policy.run_id
234 );
235 state.event_bus.publish(EngineEvent::new(
236 "routine.tool.denied",
237 json!({
238 "sessionID": ctx.session_id,
239 "messageID": ctx.message_id,
240 "runID": policy.run_id,
241 "routineID": policy.routine_id,
242 "tool": tool,
243 "reason": reason,
244 "timestampMs": crate::now_ms(),
245 }),
246 ));
247 return Ok(ToolPolicyDecision {
248 allowed: false,
249 reason: Some(reason),
250 });
251 }
252 }
253
254 let Some(instance) = state
255 .agent_teams
256 .instance_for_session(&ctx.session_id)
257 .await
258 else {
259 return Ok(ToolPolicyDecision {
260 allowed: true,
261 reason: None,
262 });
263 };
264 let caps = instance.capabilities.clone();
265 let deny = evaluate_capability_deny(
266 &state,
267 &instance,
268 &tool,
269 &ctx.args,
270 &caps,
271 &ctx.session_id,
272 &ctx.message_id,
273 )
274 .await;
275 if let Some(reason) = deny {
276 state.event_bus.publish(EngineEvent::new(
277 "agent_team.capability.denied",
278 json!({
279 "sessionID": ctx.session_id,
280 "messageID": ctx.message_id,
281 "runID": instance.run_id,
282 "missionID": instance.mission_id,
283 "instanceID": instance.instance_id,
284 "tool": tool,
285 "reason": reason,
286 "timestampMs": crate::now_ms(),
287 }),
288 ));
289 return Ok(ToolPolicyDecision {
290 allowed: false,
291 reason: Some(reason),
292 });
293 }
294 Ok(ToolPolicyDecision {
295 allowed: true,
296 reason: None,
297 })
298 })
299 }
300}
301
302impl AgentTeamRuntime {
303 pub fn new(audit_path: PathBuf) -> Self {
304 Self {
305 policy: Arc::new(RwLock::new(None)),
306 templates: Arc::new(RwLock::new(HashMap::new())),
307 instances: Arc::new(RwLock::new(HashMap::new())),
308 budgets: Arc::new(RwLock::new(HashMap::new())),
309 mission_budgets: Arc::new(RwLock::new(HashMap::new())),
310 spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
311 loaded_workspace: Arc::new(RwLock::new(None)),
312 audit_path: Arc::new(RwLock::new(audit_path)),
313 }
314 }
315
316 pub async fn set_audit_path(&self, path: PathBuf) {
317 *self.audit_path.write().await = path;
318 }
319
320 pub async fn list_templates(&self) -> Vec<AgentTemplate> {
321 let mut rows = self
322 .templates
323 .read()
324 .await
325 .values()
326 .cloned()
327 .collect::<Vec<_>>();
328 rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
329 rows
330 }
331
332 async fn templates_dir_for_loaded_workspace(&self) -> anyhow::Result<PathBuf> {
333 let workspace = self
334 .loaded_workspace
335 .read()
336 .await
337 .clone()
338 .ok_or_else(|| anyhow::anyhow!("agent team workspace not loaded"))?;
339 Ok(PathBuf::from(workspace)
340 .join(".tandem")
341 .join("agent-team")
342 .join("templates"))
343 }
344
345 fn template_filename(template_id: &str) -> String {
346 let safe = template_id
347 .trim()
348 .chars()
349 .map(|ch| {
350 if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
351 ch
352 } else {
353 '_'
354 }
355 })
356 .collect::<String>();
357 let fallback = if safe.is_empty() {
358 "template".to_string()
359 } else {
360 safe
361 };
362 format!("{fallback}.yaml")
363 }
364
365 pub async fn upsert_template(
366 &self,
367 workspace_root: &str,
368 template: AgentTemplate,
369 ) -> anyhow::Result<AgentTemplate> {
370 self.ensure_loaded_for_workspace(workspace_root).await?;
371 let templates_dir = self.templates_dir_for_loaded_workspace().await?;
372 fs::create_dir_all(&templates_dir).await?;
373 let path = templates_dir.join(Self::template_filename(&template.template_id));
374 let payload = serde_yaml::to_string(&template)?;
375 fs::write(path, payload).await?;
376 self.templates
377 .write()
378 .await
379 .insert(template.template_id.clone(), template.clone());
380 Ok(template)
381 }
382
383 pub async fn delete_template(
384 &self,
385 workspace_root: &str,
386 template_id: &str,
387 ) -> anyhow::Result<bool> {
388 self.ensure_loaded_for_workspace(workspace_root).await?;
389 let templates_dir = self.templates_dir_for_loaded_workspace().await?;
390 let path = templates_dir.join(Self::template_filename(template_id));
391 let existed = self.templates.write().await.remove(template_id).is_some();
392 if path.exists() {
393 let _ = fs::remove_file(path).await;
394 }
395 Ok(existed)
396 }
397
398 pub async fn list_instances(
399 &self,
400 mission_id: Option<&str>,
401 parent_instance_id: Option<&str>,
402 status: Option<AgentInstanceStatus>,
403 ) -> Vec<AgentInstance> {
404 let mut rows = self
405 .instances
406 .read()
407 .await
408 .values()
409 .filter(|instance| {
410 if let Some(mission_id) = mission_id {
411 if instance.mission_id != mission_id {
412 return false;
413 }
414 }
415 if let Some(parent_id) = parent_instance_id {
416 if instance.parent_instance_id.as_deref() != Some(parent_id) {
417 return false;
418 }
419 }
420 if let Some(status) = &status {
421 if &instance.status != status {
422 return false;
423 }
424 }
425 true
426 })
427 .cloned()
428 .collect::<Vec<_>>();
429 rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
430 rows
431 }
432
433 pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
434 let instances = self.instances.read().await;
435 let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
436 for instance in instances.values() {
437 let row = by_mission
438 .entry(instance.mission_id.clone())
439 .or_insert_with(|| AgentMissionSummary {
440 mission_id: instance.mission_id.clone(),
441 instance_count: 0,
442 running_count: 0,
443 completed_count: 0,
444 failed_count: 0,
445 cancelled_count: 0,
446 queued_count: 0,
447 token_used_total: 0,
448 tool_calls_used_total: 0,
449 steps_used_total: 0,
450 cost_used_usd_total: 0.0,
451 });
452 row.instance_count = row.instance_count.saturating_add(1);
453 match instance.status {
454 AgentInstanceStatus::Queued => {
455 row.queued_count = row.queued_count.saturating_add(1)
456 }
457 AgentInstanceStatus::Running => {
458 row.running_count = row.running_count.saturating_add(1)
459 }
460 AgentInstanceStatus::Completed => {
461 row.completed_count = row.completed_count.saturating_add(1)
462 }
463 AgentInstanceStatus::Failed => {
464 row.failed_count = row.failed_count.saturating_add(1)
465 }
466 AgentInstanceStatus::Cancelled => {
467 row.cancelled_count = row.cancelled_count.saturating_add(1)
468 }
469 }
470 if let Some(usage) = instance
471 .metadata
472 .as_ref()
473 .and_then(|m| m.get("budgetUsage"))
474 .and_then(|u| u.as_object())
475 {
476 row.token_used_total = row.token_used_total.saturating_add(
477 usage
478 .get("tokensUsed")
479 .and_then(|v| v.as_u64())
480 .unwrap_or(0),
481 );
482 row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
483 usage
484 .get("toolCallsUsed")
485 .and_then(|v| v.as_u64())
486 .unwrap_or(0),
487 );
488 row.steps_used_total = row
489 .steps_used_total
490 .saturating_add(usage.get("stepsUsed").and_then(|v| v.as_u64()).unwrap_or(0));
491 row.cost_used_usd_total += usage
492 .get("costUsedUsd")
493 .and_then(|v| v.as_f64())
494 .unwrap_or(0.0);
495 }
496 }
497 let mut rows = by_mission.into_values().collect::<Vec<_>>();
498 rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
499 rows
500 }
501
502 pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
503 self.instances
504 .read()
505 .await
506 .values()
507 .find(|instance| instance.session_id == session_id)
508 .cloned()
509 }
510
511 pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
512 let mut rows = self
513 .spawn_approvals
514 .read()
515 .await
516 .values()
517 .cloned()
518 .collect::<Vec<_>>();
519 rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
520 rows
521 }
522
523 pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
524 let normalized = workspace_root.trim().to_string();
525 let already_loaded = self
526 .loaded_workspace
527 .read()
528 .await
529 .as_ref()
530 .map(|s| s == &normalized)
531 .unwrap_or(false);
532 if already_loaded {
533 return Ok(());
534 }
535
536 let root = PathBuf::from(&normalized);
537 let policy_path = root
538 .join(".tandem")
539 .join("agent-team")
540 .join("spawn-policy.yaml");
541 let templates_dir = root.join(".tandem").join("agent-team").join("templates");
542
543 let mut next_policy = None;
544 if policy_path.exists() {
545 let raw = fs::read_to_string(&policy_path)
546 .await
547 .with_context(|| format!("failed reading {}", policy_path.display()))?;
548 let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
549 .with_context(|| format!("failed parsing {}", policy_path.display()))?;
550 next_policy = Some(parsed);
551 }
552
553 let mut next_templates = HashMap::new();
554 if templates_dir.exists() {
555 let mut entries = fs::read_dir(&templates_dir).await?;
556 while let Some(entry) = entries.next_entry().await? {
557 let path = entry.path();
558 if !path.is_file() {
559 continue;
560 }
561 let ext = path
562 .extension()
563 .and_then(|v| v.to_str())
564 .unwrap_or_default()
565 .to_ascii_lowercase();
566 if ext != "yaml" && ext != "yml" && ext != "json" {
567 continue;
568 }
569 let raw = fs::read_to_string(&path).await?;
570 let template = serde_yaml::from_str::<AgentTemplate>(&raw)
571 .with_context(|| format!("failed parsing {}", path.display()))?;
572 next_templates.insert(template.template_id.clone(), template);
573 }
574 }
575
576 *self.policy.write().await = next_policy;
577 *self.templates.write().await = next_templates;
578 *self.loaded_workspace.write().await = Some(normalized);
579 Ok(())
580 }
581
582 pub async fn spawn(&self, state: &AppState, req: SpawnRequest) -> SpawnResult {
583 self.spawn_with_approval_override(state, req, false).await
584 }
585
586 async fn spawn_with_approval_override(
587 &self,
588 state: &AppState,
589 mut req: SpawnRequest,
590 approval_override: bool,
591 ) -> SpawnResult {
592 let workspace_root = state.workspace_index.snapshot().await.root;
593 if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
594 return SpawnResult {
595 decision: SpawnDecision {
596 allowed: false,
597 code: Some(SpawnDenyCode::SpawnPolicyMissing),
598 reason: Some(format!("spawn policy load failed: {}", err)),
599 requires_user_approval: false,
600 },
601 instance: None,
602 };
603 }
604
605 let Some(policy) = self.policy.read().await.clone() else {
606 return SpawnResult {
607 decision: SpawnDecision {
608 allowed: false,
609 code: Some(SpawnDenyCode::SpawnPolicyMissing),
610 reason: Some("spawn policy file missing".to_string()),
611 requires_user_approval: false,
612 },
613 instance: None,
614 };
615 };
616
617 let template = {
618 let templates = self.templates.read().await;
619 req.template_id
620 .as_deref()
621 .and_then(|template_id| templates.get(template_id).cloned())
622 };
623 if req.template_id.is_none() {
624 if let Some(found) = self
625 .templates
626 .read()
627 .await
628 .values()
629 .find(|t| t.role == req.role)
630 .cloned()
631 {
632 req.template_id = Some(found.template_id.clone());
633 }
634 }
635 let template = if template.is_some() {
636 template
637 } else {
638 let templates = self.templates.read().await;
639 req.template_id
640 .as_deref()
641 .and_then(|id| templates.get(id).cloned())
642 };
643
644 if req.parent_role.is_none() {
645 if let Some(parent_id) = req.parent_instance_id.as_deref() {
646 let instances = self.instances.read().await;
647 req.parent_role = instances
648 .get(parent_id)
649 .map(|instance| instance.role.clone());
650 }
651 }
652
653 let instances = self.instances.read().await;
654 let total_agents = instances.len();
655 let running_agents = instances
656 .values()
657 .filter(|instance| instance.status == AgentInstanceStatus::Running)
658 .count();
659 drop(instances);
660
661 let mut decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
662 if approval_override
663 && !decision.allowed
664 && decision.requires_user_approval
665 && matches!(decision.code, Some(SpawnDenyCode::SpawnRequiresApproval))
666 {
667 decision = SpawnDecision {
668 allowed: true,
669 code: None,
670 reason: None,
671 requires_user_approval: false,
672 };
673 }
674 if !decision.allowed {
675 if decision.requires_user_approval && !approval_override {
676 self.queue_spawn_approval(&req, &decision).await;
677 }
678 return SpawnResult {
679 decision,
680 instance: None,
681 };
682 }
683
684 let mission_id = req
685 .mission_id
686 .clone()
687 .unwrap_or_else(|| "mission-default".to_string());
688
689 if let Some(reason) = self
690 .mission_budget_exceeded_reason(&policy, &mission_id)
691 .await
692 {
693 return SpawnResult {
694 decision: SpawnDecision {
695 allowed: false,
696 code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
697 reason: Some(reason),
698 requires_user_approval: false,
699 },
700 instance: None,
701 };
702 }
703
704 let template = template.unwrap_or_else(|| AgentTemplate {
705 template_id: "default-template".to_string(),
706 role: req.role.clone(),
707 system_prompt: None,
708 skills: Vec::new(),
709 default_budget: BudgetLimit::default(),
710 capabilities: Default::default(),
711 });
712
713 let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
714 Ok(hash) => hash,
715 Err(err) => {
716 let lowered = err.to_ascii_lowercase();
717 let code = if lowered.contains("pinned hash mismatch") {
718 SpawnDenyCode::SpawnSkillHashMismatch
719 } else if lowered.contains("skill source denied") {
720 SpawnDenyCode::SpawnSkillSourceDenied
721 } else {
722 SpawnDenyCode::SpawnRequiredSkillMissing
723 };
724 return SpawnResult {
725 decision: SpawnDecision {
726 allowed: false,
727 code: Some(code),
728 reason: Some(err),
729 requires_user_approval: false,
730 },
731 instance: None,
732 };
733 }
734 };
735
736 let parent_snapshot = {
737 let instances = self.instances.read().await;
738 req.parent_instance_id
739 .as_deref()
740 .and_then(|id| instances.get(id).cloned())
741 };
742 let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
743 self.budgets.read().await.get(parent_id).cloned()
744 } else {
745 None
746 };
747
748 let budget = resolve_budget(
749 &policy,
750 parent_snapshot,
751 parent_usage,
752 &template,
753 req.budget_override.clone(),
754 &req.role,
755 );
756
757 let mut session = Session::new(
758 Some(format!("Agent Team {}", template.template_id)),
759 Some(workspace_root.clone()),
760 );
761 session.workspace_root = Some(workspace_root.clone());
762 let session_id = session.id.clone();
763 if let Err(err) = state.storage.save_session(session).await {
764 return SpawnResult {
765 decision: SpawnDecision {
766 allowed: false,
767 code: Some(SpawnDenyCode::SpawnPolicyMissing),
768 reason: Some(format!("failed creating child session: {}", err)),
769 requires_user_approval: false,
770 },
771 instance: None,
772 };
773 }
774
775 let instance = AgentInstance {
776 instance_id: format!("ins_{}", Uuid::new_v4().simple()),
777 mission_id: mission_id.clone(),
778 parent_instance_id: req.parent_instance_id.clone(),
779 role: template.role.clone(),
780 template_id: template.template_id.clone(),
781 session_id: session_id.clone(),
782 run_id: None,
783 status: AgentInstanceStatus::Running,
784 budget,
785 skill_hash: skill_hash.clone(),
786 capabilities: template.capabilities.clone(),
787 metadata: Some(json!({
788 "source": req.source,
789 "justification": req.justification,
790 })),
791 };
792
793 self.instances
794 .write()
795 .await
796 .insert(instance.instance_id.clone(), instance.clone());
797 self.budgets.write().await.insert(
798 instance.instance_id.clone(),
799 InstanceBudgetState {
800 started_at: Some(Instant::now()),
801 ..InstanceBudgetState::default()
802 },
803 );
804 let _ = self.append_audit("spawn.approved", &instance).await;
805
806 SpawnResult {
807 decision: SpawnDecision {
808 allowed: true,
809 code: None,
810 reason: None,
811 requires_user_approval: false,
812 },
813 instance: Some(instance),
814 }
815 }
816
817 pub async fn approve_spawn_approval(
818 &self,
819 state: &AppState,
820 approval_id: &str,
821 reason: Option<&str>,
822 ) -> Option<SpawnResult> {
823 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
824 let result = self
825 .spawn_with_approval_override(state, approval.request.clone(), true)
826 .await;
827 if let Some(instance) = result.instance.as_ref() {
828 let note = reason.unwrap_or("approved by operator");
829 let _ = self
830 .append_approval_audit("spawn.approval.approved", approval_id, Some(instance), note)
831 .await;
832 } else {
833 let note = reason.unwrap_or("approval replay failed policy checks");
834 let _ = self
835 .append_approval_audit("spawn.approval.rejected_on_replay", approval_id, None, note)
836 .await;
837 }
838 Some(result)
839 }
840
841 pub async fn deny_spawn_approval(
842 &self,
843 approval_id: &str,
844 reason: Option<&str>,
845 ) -> Option<PendingSpawnApproval> {
846 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
847 let note = reason.unwrap_or("denied by operator");
848 let _ = self
849 .append_approval_audit("spawn.approval.denied", approval_id, None, note)
850 .await;
851 Some(approval)
852 }
853
854 pub async fn cancel_instance(
855 &self,
856 state: &AppState,
857 instance_id: &str,
858 reason: &str,
859 ) -> Option<AgentInstance> {
860 let mut instances = self.instances.write().await;
861 let instance = instances.get_mut(instance_id)?;
862 if matches!(
863 instance.status,
864 AgentInstanceStatus::Completed
865 | AgentInstanceStatus::Failed
866 | AgentInstanceStatus::Cancelled
867 ) {
868 return Some(instance.clone());
869 }
870 instance.status = AgentInstanceStatus::Cancelled;
871 let snapshot = instance.clone();
872 drop(instances);
873 let _ = state.cancellations.cancel(&snapshot.session_id).await;
874 let _ = self.append_audit("instance.cancelled", &snapshot).await;
875 emit_instance_cancelled(state, &snapshot, reason);
876 Some(snapshot)
877 }
878
879 async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
880 let approval = PendingSpawnApproval {
881 approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
882 created_at_ms: crate::now_ms(),
883 request: req.clone(),
884 decision_code: decision.code.clone(),
885 reason: decision.reason.clone(),
886 };
887 self.spawn_approvals
888 .write()
889 .await
890 .insert(approval.approval_id.clone(), approval);
891 }
892
893 async fn mission_budget_exceeded_reason(
894 &self,
895 policy: &SpawnPolicy,
896 mission_id: &str,
897 ) -> Option<String> {
898 let limit = policy.mission_total_budget.as_ref()?;
899 let usage = self
900 .mission_budgets
901 .read()
902 .await
903 .get(mission_id)
904 .cloned()
905 .unwrap_or_default();
906 if let Some(max) = limit.max_tokens {
907 if usage.tokens_used >= max {
908 return Some(format!(
909 "mission max_tokens exhausted ({}/{})",
910 usage.tokens_used, max
911 ));
912 }
913 }
914 if let Some(max) = limit.max_steps {
915 if usage.steps_used >= u64::from(max) {
916 return Some(format!(
917 "mission max_steps exhausted ({}/{})",
918 usage.steps_used, max
919 ));
920 }
921 }
922 if let Some(max) = limit.max_tool_calls {
923 if usage.tool_calls_used >= u64::from(max) {
924 return Some(format!(
925 "mission max_tool_calls exhausted ({}/{})",
926 usage.tool_calls_used, max
927 ));
928 }
929 }
930 if let Some(max) = limit.max_cost_usd {
931 if usage.cost_used_usd >= max {
932 return Some(format!(
933 "mission max_cost_usd exhausted ({:.6}/{:.6})",
934 usage.cost_used_usd, max
935 ));
936 }
937 }
938 None
939 }
940
941 pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
942 let instance_ids = self
943 .instances
944 .read()
945 .await
946 .values()
947 .filter(|instance| instance.mission_id == mission_id)
948 .map(|instance| instance.instance_id.clone())
949 .collect::<Vec<_>>();
950 let mut count = 0usize;
951 for instance_id in instance_ids {
952 if self
953 .cancel_instance(state, &instance_id, reason)
954 .await
955 .is_some()
956 {
957 count = count.saturating_add(1);
958 }
959 }
960 count
961 }
962
963 async fn mark_instance_terminal(
964 &self,
965 state: &AppState,
966 instance_id: &str,
967 status: AgentInstanceStatus,
968 ) -> Option<AgentInstance> {
969 let mut instances = self.instances.write().await;
970 let instance = instances.get_mut(instance_id)?;
971 if matches!(
972 instance.status,
973 AgentInstanceStatus::Completed
974 | AgentInstanceStatus::Failed
975 | AgentInstanceStatus::Cancelled
976 ) {
977 return Some(instance.clone());
978 }
979 instance.status = status.clone();
980 let snapshot = instance.clone();
981 drop(instances);
982 match status {
983 AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
984 AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
985 _ => {}
986 }
987 Some(snapshot)
988 }
989
990 pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
991 let Some(session_id) = extract_session_id(event) else {
992 return;
993 };
994 let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
995 return;
996 };
997 if event.event_type == "provider.usage" {
998 let total_tokens = event
999 .properties
1000 .get("totalTokens")
1001 .and_then(|v| v.as_u64())
1002 .unwrap_or(0);
1003 let cost_used_usd = event
1004 .properties
1005 .get("costUsd")
1006 .and_then(|v| v.as_f64())
1007 .unwrap_or(0.0);
1008 if total_tokens > 0 {
1009 let exhausted = self
1010 .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
1011 .await;
1012 if exhausted {
1013 let _ = self
1014 .cancel_instance(state, &instance_id, "budget exhausted")
1015 .await;
1016 }
1017 }
1018 return;
1019 }
1020 let mut delta_tokens = 0u64;
1021 let mut delta_steps = 0u32;
1022 let mut delta_tool_calls = 0u32;
1023 if event.event_type == "message.part.updated" {
1024 if let Some(part) = event.properties.get("part") {
1025 let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
1026 if part_type == "tool-invocation" {
1027 delta_tool_calls = 1;
1028 } else if part_type == "text" {
1029 let delta = event
1030 .properties
1031 .get("delta")
1032 .and_then(|v| v.as_str())
1033 .unwrap_or("");
1034 if !delta.is_empty() {
1035 delta_tokens = estimate_tokens(delta);
1036 }
1037 }
1038 }
1039 } else if event.event_type == "session.run.finished" {
1040 delta_steps = 1;
1041 let run_status = event
1042 .properties
1043 .get("status")
1044 .and_then(|v| v.as_str())
1045 .unwrap_or("")
1046 .to_ascii_lowercase();
1047 if run_status == "completed" {
1048 let _ = self
1049 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
1050 .await;
1051 } else if run_status == "failed" || run_status == "error" {
1052 let _ = self
1053 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
1054 .await;
1055 }
1056 }
1057 if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
1058 return;
1059 }
1060 let exhausted = self
1061 .apply_budget_delta(
1062 state,
1063 &instance_id,
1064 delta_tokens,
1065 delta_steps,
1066 delta_tool_calls,
1067 )
1068 .await;
1069 if exhausted {
1070 let _ = self
1071 .cancel_instance(state, &instance_id, "budget exhausted")
1072 .await;
1073 }
1074 }
1075
1076 async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
1077 self.instances
1078 .read()
1079 .await
1080 .values()
1081 .find(|instance| instance.session_id == session_id)
1082 .map(|instance| instance.instance_id.clone())
1083 }
1084
1085 async fn apply_budget_delta(
1086 &self,
1087 state: &AppState,
1088 instance_id: &str,
1089 delta_tokens: u64,
1090 delta_steps: u32,
1091 delta_tool_calls: u32,
1092 ) -> bool {
1093 let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1094 enabled: false,
1095 require_justification: false,
1096 max_agents: None,
1097 max_concurrent: None,
1098 child_budget_percent_of_parent_remaining: None,
1099 mission_total_budget: None,
1100 cost_per_1k_tokens_usd: None,
1101 spawn_edges: HashMap::new(),
1102 required_skills: HashMap::new(),
1103 role_defaults: HashMap::new(),
1104 skill_sources: Default::default(),
1105 });
1106 let mut budgets = self.budgets.write().await;
1107 let Some(usage) = budgets.get_mut(instance_id) else {
1108 return false;
1109 };
1110 if usage.exhausted {
1111 return true;
1112 }
1113 let prev_cost_used_usd = usage.cost_used_usd;
1114 usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
1115 usage.steps_used = usage.steps_used.saturating_add(delta_steps);
1116 usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
1117 if let Some(rate) = policy.cost_per_1k_tokens_usd {
1118 usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
1119 }
1120 let elapsed_ms = usage
1121 .started_at
1122 .map(|started| started.elapsed().as_millis() as u64)
1123 .unwrap_or(0);
1124
1125 let mut exhausted_reason: Option<&'static str> = None;
1126 let mut snapshot: Option<AgentInstance> = None;
1127 {
1128 let mut instances = self.instances.write().await;
1129 if let Some(instance) = instances.get_mut(instance_id) {
1130 instance.metadata = Some(merge_metadata_usage(
1131 instance.metadata.take(),
1132 usage.tokens_used,
1133 usage.steps_used,
1134 usage.tool_calls_used,
1135 usage.cost_used_usd,
1136 elapsed_ms,
1137 ));
1138 if let Some(limit) = instance.budget.max_tokens {
1139 if usage.tokens_used >= limit {
1140 exhausted_reason = Some("max_tokens");
1141 }
1142 }
1143 if exhausted_reason.is_none() {
1144 if let Some(limit) = instance.budget.max_steps {
1145 if usage.steps_used >= limit {
1146 exhausted_reason = Some("max_steps");
1147 }
1148 }
1149 }
1150 if exhausted_reason.is_none() {
1151 if let Some(limit) = instance.budget.max_tool_calls {
1152 if usage.tool_calls_used >= limit {
1153 exhausted_reason = Some("max_tool_calls");
1154 }
1155 }
1156 }
1157 if exhausted_reason.is_none() {
1158 if let Some(limit) = instance.budget.max_duration_ms {
1159 if elapsed_ms >= limit {
1160 exhausted_reason = Some("max_duration_ms");
1161 }
1162 }
1163 }
1164 if exhausted_reason.is_none() {
1165 if let Some(limit) = instance.budget.max_cost_usd {
1166 if usage.cost_used_usd >= limit {
1167 exhausted_reason = Some("max_cost_usd");
1168 }
1169 }
1170 }
1171 snapshot = Some(instance.clone());
1172 }
1173 }
1174 let Some(instance) = snapshot else {
1175 return false;
1176 };
1177 emit_budget_usage(
1178 state,
1179 &instance,
1180 usage.tokens_used,
1181 usage.steps_used,
1182 usage.tool_calls_used,
1183 usage.cost_used_usd,
1184 elapsed_ms,
1185 );
1186 let mission_exhausted = self
1187 .apply_mission_budget_delta(
1188 state,
1189 &instance.mission_id,
1190 delta_tokens,
1191 u64::from(delta_steps),
1192 u64::from(delta_tool_calls),
1193 usage.cost_used_usd - prev_cost_used_usd,
1194 &policy,
1195 )
1196 .await;
1197 if mission_exhausted {
1198 usage.exhausted = true;
1199 let _ = self
1200 .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1201 .await;
1202 return true;
1203 }
1204 if let Some(reason) = exhausted_reason {
1205 usage.exhausted = true;
1206 emit_budget_exhausted(
1207 state,
1208 &instance,
1209 reason,
1210 usage.tokens_used,
1211 usage.steps_used,
1212 usage.tool_calls_used,
1213 usage.cost_used_usd,
1214 elapsed_ms,
1215 );
1216 return true;
1217 }
1218 false
1219 }
1220
1221 async fn apply_exact_token_usage(
1222 &self,
1223 state: &AppState,
1224 instance_id: &str,
1225 total_tokens: u64,
1226 cost_used_usd: f64,
1227 ) -> bool {
1228 let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1229 enabled: false,
1230 require_justification: false,
1231 max_agents: None,
1232 max_concurrent: None,
1233 child_budget_percent_of_parent_remaining: None,
1234 mission_total_budget: None,
1235 cost_per_1k_tokens_usd: None,
1236 spawn_edges: HashMap::new(),
1237 required_skills: HashMap::new(),
1238 role_defaults: HashMap::new(),
1239 skill_sources: Default::default(),
1240 });
1241 let mut budgets = self.budgets.write().await;
1242 let Some(usage) = budgets.get_mut(instance_id) else {
1243 return false;
1244 };
1245 if usage.exhausted {
1246 return true;
1247 }
1248 let prev_tokens = usage.tokens_used;
1249 let prev_cost_used_usd = usage.cost_used_usd;
1250 usage.tokens_used = usage.tokens_used.max(total_tokens);
1251 if cost_used_usd > 0.0 {
1252 usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
1253 } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
1254 let delta = usage.tokens_used.saturating_sub(prev_tokens);
1255 usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
1256 }
1257 let elapsed_ms = usage
1258 .started_at
1259 .map(|started| started.elapsed().as_millis() as u64)
1260 .unwrap_or(0);
1261 let mut exhausted_reason: Option<&'static str> = None;
1262 let mut snapshot: Option<AgentInstance> = None;
1263 {
1264 let mut instances = self.instances.write().await;
1265 if let Some(instance) = instances.get_mut(instance_id) {
1266 instance.metadata = Some(merge_metadata_usage(
1267 instance.metadata.take(),
1268 usage.tokens_used,
1269 usage.steps_used,
1270 usage.tool_calls_used,
1271 usage.cost_used_usd,
1272 elapsed_ms,
1273 ));
1274 if let Some(limit) = instance.budget.max_tokens {
1275 if usage.tokens_used >= limit {
1276 exhausted_reason = Some("max_tokens");
1277 }
1278 }
1279 if exhausted_reason.is_none() {
1280 if let Some(limit) = instance.budget.max_cost_usd {
1281 if usage.cost_used_usd >= limit {
1282 exhausted_reason = Some("max_cost_usd");
1283 }
1284 }
1285 }
1286 snapshot = Some(instance.clone());
1287 }
1288 }
1289 let Some(instance) = snapshot else {
1290 return false;
1291 };
1292 emit_budget_usage(
1293 state,
1294 &instance,
1295 usage.tokens_used,
1296 usage.steps_used,
1297 usage.tool_calls_used,
1298 usage.cost_used_usd,
1299 elapsed_ms,
1300 );
1301 let mission_exhausted = self
1302 .apply_mission_budget_delta(
1303 state,
1304 &instance.mission_id,
1305 usage.tokens_used.saturating_sub(prev_tokens),
1306 0,
1307 0,
1308 usage.cost_used_usd - prev_cost_used_usd,
1309 &policy,
1310 )
1311 .await;
1312 if mission_exhausted {
1313 usage.exhausted = true;
1314 let _ = self
1315 .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1316 .await;
1317 return true;
1318 }
1319 if let Some(reason) = exhausted_reason {
1320 usage.exhausted = true;
1321 emit_budget_exhausted(
1322 state,
1323 &instance,
1324 reason,
1325 usage.tokens_used,
1326 usage.steps_used,
1327 usage.tool_calls_used,
1328 usage.cost_used_usd,
1329 elapsed_ms,
1330 );
1331 return true;
1332 }
1333 false
1334 }
1335
1336 async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1337 let path = self.audit_path.read().await.clone();
1338 if let Some(parent) = path.parent() {
1339 fs::create_dir_all(parent).await?;
1340 }
1341 let row = json!({
1342 "action": action,
1343 "missionID": instance.mission_id,
1344 "instanceID": instance.instance_id,
1345 "parentInstanceID": instance.parent_instance_id,
1346 "role": instance.role,
1347 "templateID": instance.template_id,
1348 "sessionID": instance.session_id,
1349 "skillHash": instance.skill_hash,
1350 "timestampMs": crate::now_ms(),
1351 });
1352 let mut existing = if path.exists() {
1353 fs::read_to_string(&path).await.unwrap_or_default()
1354 } else {
1355 String::new()
1356 };
1357 existing.push_str(&serde_json::to_string(&row)?);
1358 existing.push('\n');
1359 fs::write(path, existing).await?;
1360 Ok(())
1361 }
1362
1363 async fn append_approval_audit(
1364 &self,
1365 action: &str,
1366 approval_id: &str,
1367 instance: Option<&AgentInstance>,
1368 reason: &str,
1369 ) -> anyhow::Result<()> {
1370 let path = self.audit_path.read().await.clone();
1371 if let Some(parent) = path.parent() {
1372 fs::create_dir_all(parent).await?;
1373 }
1374 let row = json!({
1375 "action": action,
1376 "approvalID": approval_id,
1377 "reason": reason,
1378 "missionID": instance.map(|v| v.mission_id.clone()),
1379 "instanceID": instance.map(|v| v.instance_id.clone()),
1380 "parentInstanceID": instance.and_then(|v| v.parent_instance_id.clone()),
1381 "role": instance.map(|v| v.role.clone()),
1382 "templateID": instance.map(|v| v.template_id.clone()),
1383 "sessionID": instance.map(|v| v.session_id.clone()),
1384 "skillHash": instance.map(|v| v.skill_hash.clone()),
1385 "timestampMs": crate::now_ms(),
1386 });
1387 let mut existing = if path.exists() {
1388 fs::read_to_string(&path).await.unwrap_or_default()
1389 } else {
1390 String::new()
1391 };
1392 existing.push_str(&serde_json::to_string(&row)?);
1393 existing.push('\n');
1394 fs::write(path, existing).await?;
1395 Ok(())
1396 }
1397
1398 #[allow(clippy::too_many_arguments)]
1399 async fn apply_mission_budget_delta(
1400 &self,
1401 state: &AppState,
1402 mission_id: &str,
1403 delta_tokens: u64,
1404 delta_steps: u64,
1405 delta_tool_calls: u64,
1406 delta_cost_used_usd: f64,
1407 policy: &SpawnPolicy,
1408 ) -> bool {
1409 let mut budgets = self.mission_budgets.write().await;
1410 let row = budgets.entry(mission_id.to_string()).or_default();
1411 row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1412 row.steps_used = row.steps_used.saturating_add(delta_steps);
1413 row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1414 row.cost_used_usd += delta_cost_used_usd.max(0.0);
1415 if row.exhausted {
1416 return true;
1417 }
1418 let Some(limit) = policy.mission_total_budget.as_ref() else {
1419 return false;
1420 };
1421 let mut exhausted_by: Option<&'static str> = None;
1422 if let Some(max) = limit.max_tokens {
1423 if row.tokens_used >= max {
1424 exhausted_by = Some("mission_max_tokens");
1425 }
1426 }
1427 if exhausted_by.is_none() {
1428 if let Some(max) = limit.max_steps {
1429 if row.steps_used >= u64::from(max) {
1430 exhausted_by = Some("mission_max_steps");
1431 }
1432 }
1433 }
1434 if exhausted_by.is_none() {
1435 if let Some(max) = limit.max_tool_calls {
1436 if row.tool_calls_used >= u64::from(max) {
1437 exhausted_by = Some("mission_max_tool_calls");
1438 }
1439 }
1440 }
1441 if exhausted_by.is_none() {
1442 if let Some(max) = limit.max_cost_usd {
1443 if row.cost_used_usd >= max {
1444 exhausted_by = Some("mission_max_cost_usd");
1445 }
1446 }
1447 }
1448 if let Some(exhausted_by) = exhausted_by {
1449 row.exhausted = true;
1450 emit_mission_budget_exhausted(
1451 state,
1452 mission_id,
1453 exhausted_by,
1454 row.tokens_used,
1455 row.steps_used,
1456 row.tool_calls_used,
1457 row.cost_used_usd,
1458 );
1459 return true;
1460 }
1461 false
1462 }
1463
1464 pub async fn set_for_test(
1465 &self,
1466 workspace_root: Option<String>,
1467 policy: Option<SpawnPolicy>,
1468 templates: Vec<AgentTemplate>,
1469 ) {
1470 *self.policy.write().await = policy;
1471 let mut by_id = HashMap::new();
1472 for template in templates {
1473 by_id.insert(template.template_id.clone(), template);
1474 }
1475 *self.templates.write().await = by_id;
1476 self.instances.write().await.clear();
1477 self.budgets.write().await.clear();
1478 self.mission_budgets.write().await.clear();
1479 self.spawn_approvals.write().await.clear();
1480 *self.loaded_workspace.write().await = workspace_root;
1481 }
1482}
1483
1484fn resolve_budget(
1485 policy: &SpawnPolicy,
1486 parent_instance: Option<AgentInstance>,
1487 parent_usage: Option<InstanceBudgetState>,
1488 template: &AgentTemplate,
1489 override_budget: Option<BudgetLimit>,
1490 role: &AgentRole,
1491) -> BudgetLimit {
1492 let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1493 let mut chosen = merge_budget(
1494 merge_budget(role_default, template.default_budget.clone()),
1495 override_budget.unwrap_or_default(),
1496 );
1497
1498 if let Some(parent) = parent_instance {
1499 let usage = parent_usage.unwrap_or_default();
1500 if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1501 if pct > 0 {
1502 chosen.max_tokens = cap_budget_remaining_u64(
1503 chosen.max_tokens,
1504 parent.budget.max_tokens,
1505 usage.tokens_used,
1506 pct,
1507 );
1508 chosen.max_steps = cap_budget_remaining_u32(
1509 chosen.max_steps,
1510 parent.budget.max_steps,
1511 usage.steps_used,
1512 pct,
1513 );
1514 chosen.max_tool_calls = cap_budget_remaining_u32(
1515 chosen.max_tool_calls,
1516 parent.budget.max_tool_calls,
1517 usage.tool_calls_used,
1518 pct,
1519 );
1520 chosen.max_duration_ms = cap_budget_remaining_u64(
1521 chosen.max_duration_ms,
1522 parent.budget.max_duration_ms,
1523 usage
1524 .started_at
1525 .map(|started| started.elapsed().as_millis() as u64)
1526 .unwrap_or(0),
1527 pct,
1528 );
1529 chosen.max_cost_usd = cap_budget_remaining_f64(
1530 chosen.max_cost_usd,
1531 parent.budget.max_cost_usd,
1532 usage.cost_used_usd,
1533 pct,
1534 );
1535 }
1536 }
1537 }
1538 chosen
1539}
1540
1541fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1542 BudgetLimit {
1543 max_tokens: overlay.max_tokens.or(base.max_tokens),
1544 max_steps: overlay.max_steps.or(base.max_steps),
1545 max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1546 max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1547 max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1548 }
1549}
1550
1551fn cap_budget_remaining_u64(
1552 child: Option<u64>,
1553 parent_limit: Option<u64>,
1554 parent_used: u64,
1555 pct: u8,
1556) -> Option<u64> {
1557 match (child, parent_limit) {
1558 (Some(child), Some(parent_limit)) => {
1559 let remaining = parent_limit.saturating_sub(parent_used);
1560 Some(child.min(remaining.saturating_mul(pct as u64) / 100))
1561 }
1562 (None, Some(parent_limit)) => {
1563 let remaining = parent_limit.saturating_sub(parent_used);
1564 Some(remaining.saturating_mul(pct as u64) / 100)
1565 }
1566 (Some(child), None) => Some(child),
1567 (None, None) => None,
1568 }
1569}
1570
1571fn cap_budget_remaining_u32(
1572 child: Option<u32>,
1573 parent_limit: Option<u32>,
1574 parent_used: u32,
1575 pct: u8,
1576) -> Option<u32> {
1577 match (child, parent_limit) {
1578 (Some(child), Some(parent_limit)) => {
1579 let remaining = parent_limit.saturating_sub(parent_used);
1580 Some(child.min(remaining.saturating_mul(pct as u32) / 100))
1581 }
1582 (None, Some(parent_limit)) => {
1583 let remaining = parent_limit.saturating_sub(parent_used);
1584 Some(remaining.saturating_mul(pct as u32) / 100)
1585 }
1586 (Some(child), None) => Some(child),
1587 (None, None) => None,
1588 }
1589}
1590
1591fn cap_budget_remaining_f64(
1592 child: Option<f64>,
1593 parent_limit: Option<f64>,
1594 parent_used: f64,
1595 pct: u8,
1596) -> Option<f64> {
1597 match (child, parent_limit) {
1598 (Some(child), Some(parent_limit)) => {
1599 let remaining = (parent_limit - parent_used).max(0.0);
1600 Some(child.min(remaining * f64::from(pct) / 100.0))
1601 }
1602 (None, Some(parent_limit)) => {
1603 let remaining = (parent_limit - parent_used).max(0.0);
1604 Some(remaining * f64::from(pct) / 100.0)
1605 }
1606 (Some(child), None) => Some(child),
1607 (None, None) => None,
1608 }
1609}
1610
1611async fn compute_skill_hash(
1612 workspace_root: &str,
1613 template: &AgentTemplate,
1614 policy: &SpawnPolicy,
1615) -> Result<String, String> {
1616 use sha2::{Digest, Sha256};
1617 let mut rows = Vec::new();
1618 let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1619 for skill in &template.skills {
1620 if let Some(path) = skill.path.as_deref() {
1621 validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1622 let skill_path = Path::new(workspace_root).join(path);
1623 let raw = fs::read_to_string(&skill_path)
1624 .await
1625 .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1626 let digest = hash_hex(raw.as_bytes());
1627 validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1628 rows.push(format!("path:{}:{}", path, digest));
1629 } else if let Some(id) = skill.id.as_deref() {
1630 validate_skill_source(Some(id), None, policy)?;
1631 let loaded = skill_service
1632 .load_skill(id)
1633 .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1634 let Some(loaded) = loaded else {
1635 return Err(format!("missing required skill id `{id}`"));
1636 };
1637 let digest = hash_hex(loaded.content.as_bytes());
1638 validate_pinned_hash(Some(id), None, &digest, policy)?;
1639 rows.push(format!("id:{}:{}", id, digest));
1640 }
1641 }
1642 rows.sort();
1643 let mut hasher = Sha256::new();
1644 for row in rows {
1645 hasher.update(row.as_bytes());
1646 hasher.update(b"\n");
1647 }
1648 let digest = hasher.finalize();
1649 Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1650}
1651
1652fn validate_skill_source(
1653 id: Option<&str>,
1654 path: Option<&str>,
1655 policy: &SpawnPolicy,
1656) -> Result<(), String> {
1657 use tandem_orchestrator::SkillSourceMode;
1658 match policy.skill_sources.mode {
1659 SkillSourceMode::Any => Ok(()),
1660 SkillSourceMode::ProjectOnly => {
1661 if id.is_some() {
1662 return Err("skill source denied: project_only forbids skill IDs".to_string());
1663 }
1664 let Some(path) = path else {
1665 return Err("skill source denied: project_only requires skill path".to_string());
1666 };
1667 let p = PathBuf::from(path);
1668 if p.is_absolute() {
1669 return Err("skill source denied: absolute skill paths are forbidden".to_string());
1670 }
1671 Ok(())
1672 }
1673 SkillSourceMode::Allowlist => {
1674 if let Some(id) = id {
1675 if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1676 return Ok(());
1677 }
1678 }
1679 if let Some(path) = path {
1680 if policy
1681 .skill_sources
1682 .allowlist_paths
1683 .iter()
1684 .any(|v| v == path)
1685 {
1686 return Ok(());
1687 }
1688 }
1689 Err("skill source denied: not present in allowlist".to_string())
1690 }
1691 }
1692}
1693
1694fn validate_pinned_hash(
1695 id: Option<&str>,
1696 path: Option<&str>,
1697 digest: &str,
1698 policy: &SpawnPolicy,
1699) -> Result<(), String> {
1700 let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1701 let by_path = path.and_then(|path| {
1702 policy
1703 .skill_sources
1704 .pinned_hashes
1705 .get(&format!("path:{path}"))
1706 });
1707 let expected = by_id.or(by_path);
1708 if let Some(expected) = expected {
1709 let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1710 if normalized != digest {
1711 return Err("pinned hash mismatch for skill reference".to_string());
1712 }
1713 }
1714 Ok(())
1715}
1716
1717fn hash_hex(bytes: &[u8]) -> String {
1718 let mut out = String::with_capacity(bytes.len() * 2);
1719 for byte in bytes {
1720 use std::fmt::Write as _;
1721 let _ = write!(&mut out, "{:02x}", byte);
1722 }
1723 out
1724}
1725
1726fn estimate_tokens(text: &str) -> u64 {
1727 let chars = text.chars().count() as u64;
1728 (chars / 4).max(1)
1729}
1730
1731fn extract_session_id(event: &EngineEvent) -> Option<String> {
1732 event
1733 .properties
1734 .get("sessionID")
1735 .and_then(|v| v.as_str())
1736 .map(|v| v.to_string())
1737 .or_else(|| {
1738 event
1739 .properties
1740 .get("part")
1741 .and_then(|v| v.get("sessionID"))
1742 .and_then(|v| v.as_str())
1743 .map(|v| v.to_string())
1744 })
1745}
1746
1747fn merge_metadata_usage(
1748 metadata: Option<Value>,
1749 tokens_used: u64,
1750 steps_used: u32,
1751 tool_calls_used: u32,
1752 cost_used_usd: f64,
1753 elapsed_ms: u64,
1754) -> Value {
1755 let mut base = metadata
1756 .and_then(|v| v.as_object().cloned())
1757 .unwrap_or_default();
1758 base.insert(
1759 "budgetUsage".to_string(),
1760 json!({
1761 "tokensUsed": tokens_used,
1762 "stepsUsed": steps_used,
1763 "toolCallsUsed": tool_calls_used,
1764 "costUsedUsd": cost_used_usd,
1765 "elapsedMs": elapsed_ms
1766 }),
1767 );
1768 Value::Object(base)
1769}
1770
1771fn normalize_tool_name(name: &str) -> String {
1772 match name.trim().to_lowercase().replace('-', "_").as_str() {
1773 "todowrite" | "update_todo_list" | "update_todos" => "todo_write".to_string(),
1774 other => other.to_string(),
1775 }
1776}
1777
1778async fn evaluate_capability_deny(
1779 state: &AppState,
1780 instance: &AgentInstance,
1781 tool: &str,
1782 args: &Value,
1783 caps: &tandem_orchestrator::CapabilitySpec,
1784 session_id: &str,
1785 message_id: &str,
1786) -> Option<String> {
1787 let deny_patterns = caps
1788 .tool_denylist
1789 .iter()
1790 .map(|name| normalize_tool_name(name))
1791 .collect::<Vec<_>>();
1792 if !deny_patterns.is_empty() && any_policy_matches(&deny_patterns, tool) {
1793 return Some(format!("tool `{tool}` denied by agent capability policy"));
1794 }
1795 let allow_patterns = caps
1796 .tool_allowlist
1797 .iter()
1798 .map(|name| normalize_tool_name(name))
1799 .collect::<Vec<_>>();
1800 if !allow_patterns.is_empty() && !any_policy_matches(&allow_patterns, tool) {
1801 return Some(format!("tool `{tool}` not in agent allowlist"));
1802 }
1803
1804 let browser_execution_tool = matches!(
1805 tool,
1806 "browser_open"
1807 | "browser_navigate"
1808 | "browser_snapshot"
1809 | "browser_click"
1810 | "browser_type"
1811 | "browser_press"
1812 | "browser_wait"
1813 | "browser_extract"
1814 | "browser_screenshot"
1815 | "browser_close"
1816 );
1817
1818 if matches!(
1819 tool,
1820 "websearch" | "webfetch" | "webfetch_html" | "browser_open" | "browser_navigate"
1821 ) || browser_execution_tool
1822 {
1823 if !caps.net_scopes.enabled {
1824 return Some("network disabled for this agent instance".to_string());
1825 }
1826 if !caps.net_scopes.allow_hosts.is_empty() {
1827 if tool == "websearch" {
1828 return Some(
1829 "websearch blocked: host allowlist cannot be verified for search tool"
1830 .to_string(),
1831 );
1832 }
1833 if let Some(host) = extract_url_host(args) {
1834 let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
1835 let allowed = h.trim().to_ascii_lowercase();
1836 !allowed.is_empty()
1837 && (host == allowed || host.ends_with(&format!(".{allowed}")))
1838 });
1839 if !allowed {
1840 return Some(format!("network host `{host}` not in allow_hosts"));
1841 }
1842 }
1843 }
1844 }
1845
1846 if tool == "bash" {
1847 let cmd = args
1848 .get("command")
1849 .and_then(|v| v.as_str())
1850 .unwrap_or("")
1851 .to_ascii_lowercase();
1852 if cmd.contains("git push") {
1853 if !caps.git_caps.push {
1854 return Some("git push disabled for this agent instance".to_string());
1855 }
1856 if caps.git_caps.push_requires_approval {
1857 let action = state.permissions.evaluate("git_push", "git_push").await;
1858 match action {
1859 tandem_core::PermissionAction::Allow => {}
1860 tandem_core::PermissionAction::Deny => {
1861 return Some("git push denied by policy rule".to_string());
1862 }
1863 tandem_core::PermissionAction::Ask => {
1864 let pending = state
1865 .permissions
1866 .ask_for_session_with_context(
1867 Some(session_id),
1868 "git_push",
1869 args.clone(),
1870 Some(tandem_core::PermissionArgsContext {
1871 args_source: "agent_team.git_push".to_string(),
1872 args_integrity: "runtime-checked".to_string(),
1873 query: Some(format!(
1874 "instanceID={} messageID={}",
1875 instance.instance_id, message_id
1876 )),
1877 }),
1878 )
1879 .await;
1880 return Some(format!(
1881 "git push requires explicit user approval (approvalID={})",
1882 pending.id
1883 ));
1884 }
1885 }
1886 }
1887 }
1888 if cmd.contains("git commit") && !caps.git_caps.commit {
1889 return Some("git commit disabled for this agent instance".to_string());
1890 }
1891 }
1892
1893 let access_kind = tool_fs_access_kind(tool);
1894 if let Some(kind) = access_kind {
1895 let Some(session) = state.storage.get_session(session_id).await else {
1896 return Some("session not found for capability evaluation".to_string());
1897 };
1898 let Some(root) = session.workspace_root.clone() else {
1899 return Some("workspace root missing for capability evaluation".to_string());
1900 };
1901 let requested = extract_tool_candidate_paths(tool, args);
1902 if !requested.is_empty() {
1903 let allowed_scopes = if kind == "read" {
1904 &caps.fs_scopes.read
1905 } else {
1906 &caps.fs_scopes.write
1907 };
1908 if allowed_scopes.is_empty() {
1909 return Some(format!("fs {kind} access blocked: no scopes configured"));
1910 }
1911 for candidate in requested {
1912 if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
1913 return Some(format!("fs {kind} access denied for path `{}`", candidate));
1914 }
1915 }
1916 }
1917 }
1918
1919 denied_secrets_reason(tool, caps, args)
1920}
1921
1922fn denied_secrets_reason(
1923 tool: &str,
1924 caps: &tandem_orchestrator::CapabilitySpec,
1925 args: &Value,
1926) -> Option<String> {
1927 if tool == "auth" {
1928 if caps.secrets_scopes.is_empty() {
1929 return Some("secrets are disabled for this agent instance".to_string());
1930 }
1931 let alias = args
1932 .get("id")
1933 .or_else(|| args.get("provider"))
1934 .or_else(|| args.get("providerID"))
1935 .and_then(|v| v.as_str())
1936 .unwrap_or("")
1937 .trim();
1938 if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
1939 return Some(format!(
1940 "secret alias `{alias}` is not in agent secretsScopes allowlist"
1941 ));
1942 }
1943 }
1944 None
1945}
1946
1947fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
1948 match tool {
1949 "read" | "glob" | "grep" | "codesearch" | "lsp" => Some("read"),
1950 "write" | "edit" | "apply_patch" => Some("write"),
1951 _ => None,
1952 }
1953}
1954
1955fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
1956 let Some(obj) = args.as_object() else {
1957 return Vec::new();
1958 };
1959 let keys: &[&str] = match tool {
1960 "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
1961 "glob" => &["pattern"],
1962 "lsp" => &["filePath", "path"],
1963 "bash" => &["cwd"],
1964 "apply_patch" => &["path"],
1965 _ => &["path", "cwd"],
1966 };
1967 keys.iter()
1968 .filter_map(|key| obj.get(*key))
1969 .filter_map(|value| value.as_str())
1970 .filter(|s| !s.trim().is_empty())
1971 .map(|raw| strip_glob_tokens(raw).to_string())
1972 .collect()
1973}
1974
1975fn strip_glob_tokens(path: &str) -> &str {
1976 let mut end = path.len();
1977 for (idx, ch) in path.char_indices() {
1978 if ch == '*' || ch == '?' || ch == '[' {
1979 end = idx;
1980 break;
1981 }
1982 }
1983 &path[..end]
1984}
1985
1986fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
1987 let root_path = PathBuf::from(root);
1988 let candidate_path = resolve_path(&root_path, candidate);
1989 scopes.iter().any(|scope| {
1990 let scope_path = resolve_path(&root_path, scope);
1991 candidate_path.starts_with(scope_path)
1992 })
1993}
1994
1995fn resolve_path(root: &Path, raw: &str) -> PathBuf {
1996 let raw = raw.trim();
1997 if raw.is_empty() {
1998 return root.to_path_buf();
1999 }
2000 let path = PathBuf::from(raw);
2001 if path.is_absolute() {
2002 path
2003 } else {
2004 root.join(path)
2005 }
2006}
2007
2008fn extract_url_host(args: &Value) -> Option<String> {
2009 let url = args
2010 .get("url")
2011 .or_else(|| args.get("uri"))
2012 .or_else(|| args.get("link"))
2013 .and_then(|v| v.as_str())?;
2014 let raw = url.trim();
2015 let (_, after_scheme) = raw.split_once("://")?;
2016 let host_port = after_scheme.split('/').next().unwrap_or_default();
2017 let host = host_port.split('@').next_back().unwrap_or_default();
2018 let host = host
2019 .split(':')
2020 .next()
2021 .unwrap_or_default()
2022 .to_ascii_lowercase();
2023 if host.is_empty() {
2024 None
2025 } else {
2026 Some(host)
2027 }
2028}
2029
2030pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
2031 emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
2032}
2033
2034pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
2035 emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
2036}
2037
2038pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
2039 emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
2040}
2041
2042#[derive(Default)]
2043pub struct SpawnEventContext<'a> {
2044 pub session_id: Option<&'a str>,
2045 pub message_id: Option<&'a str>,
2046 pub run_id: Option<&'a str>,
2047}
2048
2049pub fn emit_spawn_requested_with_context(
2050 state: &AppState,
2051 req: &SpawnRequest,
2052 ctx: &SpawnEventContext<'_>,
2053) {
2054 state.event_bus.publish(EngineEvent::new(
2055 "agent_team.spawn.requested",
2056 json!({
2057 "sessionID": ctx.session_id,
2058 "messageID": ctx.message_id,
2059 "runID": ctx.run_id,
2060 "missionID": req.mission_id,
2061 "instanceID": Value::Null,
2062 "parentInstanceID": req.parent_instance_id,
2063 "source": req.source,
2064 "requestedRole": req.role,
2065 "templateID": req.template_id,
2066 "justification": req.justification,
2067 "timestampMs": crate::now_ms(),
2068 }),
2069 ));
2070}
2071
2072pub fn emit_spawn_denied_with_context(
2073 state: &AppState,
2074 req: &SpawnRequest,
2075 decision: &SpawnDecision,
2076 ctx: &SpawnEventContext<'_>,
2077) {
2078 state.event_bus.publish(EngineEvent::new(
2079 "agent_team.spawn.denied",
2080 json!({
2081 "sessionID": ctx.session_id,
2082 "messageID": ctx.message_id,
2083 "runID": ctx.run_id,
2084 "missionID": req.mission_id,
2085 "instanceID": Value::Null,
2086 "parentInstanceID": req.parent_instance_id,
2087 "source": req.source,
2088 "requestedRole": req.role,
2089 "templateID": req.template_id,
2090 "code": decision.code,
2091 "error": decision.reason,
2092 "timestampMs": crate::now_ms(),
2093 }),
2094 ));
2095}
2096
2097pub fn emit_spawn_approved_with_context(
2098 state: &AppState,
2099 req: &SpawnRequest,
2100 instance: &AgentInstance,
2101 ctx: &SpawnEventContext<'_>,
2102) {
2103 state.event_bus.publish(EngineEvent::new(
2104 "agent_team.spawn.approved",
2105 json!({
2106 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2107 "messageID": ctx.message_id,
2108 "runID": ctx.run_id.or(instance.run_id.as_deref()),
2109 "missionID": instance.mission_id,
2110 "instanceID": instance.instance_id,
2111 "parentInstanceID": instance.parent_instance_id,
2112 "source": req.source,
2113 "requestedRole": req.role,
2114 "templateID": instance.template_id,
2115 "skillHash": instance.skill_hash,
2116 "timestampMs": crate::now_ms(),
2117 }),
2118 ));
2119 state.event_bus.publish(EngineEvent::new(
2120 "agent_team.instance.started",
2121 json!({
2122 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2123 "messageID": ctx.message_id,
2124 "runID": ctx.run_id.or(instance.run_id.as_deref()),
2125 "missionID": instance.mission_id,
2126 "instanceID": instance.instance_id,
2127 "parentInstanceID": instance.parent_instance_id,
2128 "role": instance.role,
2129 "status": instance.status,
2130 "budgetLimit": instance.budget,
2131 "skillHash": instance.skill_hash,
2132 "timestampMs": crate::now_ms(),
2133 }),
2134 ));
2135}
2136
2137pub fn emit_budget_usage(
2138 state: &AppState,
2139 instance: &AgentInstance,
2140 tokens_used: u64,
2141 steps_used: u32,
2142 tool_calls_used: u32,
2143 cost_used_usd: f64,
2144 elapsed_ms: u64,
2145) {
2146 state.event_bus.publish(EngineEvent::new(
2147 "agent_team.budget.usage",
2148 json!({
2149 "sessionID": instance.session_id,
2150 "messageID": Value::Null,
2151 "runID": instance.run_id,
2152 "missionID": instance.mission_id,
2153 "instanceID": instance.instance_id,
2154 "tokensUsed": tokens_used,
2155 "stepsUsed": steps_used,
2156 "toolCallsUsed": tool_calls_used,
2157 "costUsedUsd": cost_used_usd,
2158 "elapsedMs": elapsed_ms,
2159 "timestampMs": crate::now_ms(),
2160 }),
2161 ));
2162}
2163
2164#[allow(clippy::too_many_arguments)]
2165pub fn emit_budget_exhausted(
2166 state: &AppState,
2167 instance: &AgentInstance,
2168 exhausted_by: &str,
2169 tokens_used: u64,
2170 steps_used: u32,
2171 tool_calls_used: u32,
2172 cost_used_usd: f64,
2173 elapsed_ms: u64,
2174) {
2175 state.event_bus.publish(EngineEvent::new(
2176 "agent_team.budget.exhausted",
2177 json!({
2178 "sessionID": instance.session_id,
2179 "messageID": Value::Null,
2180 "runID": instance.run_id,
2181 "missionID": instance.mission_id,
2182 "instanceID": instance.instance_id,
2183 "exhaustedBy": exhausted_by,
2184 "tokensUsed": tokens_used,
2185 "stepsUsed": steps_used,
2186 "toolCallsUsed": tool_calls_used,
2187 "costUsedUsd": cost_used_usd,
2188 "elapsedMs": elapsed_ms,
2189 "timestampMs": crate::now_ms(),
2190 }),
2191 ));
2192}
2193
2194pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
2195 state.event_bus.publish(EngineEvent::new(
2196 "agent_team.instance.cancelled",
2197 json!({
2198 "sessionID": instance.session_id,
2199 "messageID": Value::Null,
2200 "runID": instance.run_id,
2201 "missionID": instance.mission_id,
2202 "instanceID": instance.instance_id,
2203 "parentInstanceID": instance.parent_instance_id,
2204 "role": instance.role,
2205 "status": instance.status,
2206 "reason": reason,
2207 "timestampMs": crate::now_ms(),
2208 }),
2209 ));
2210}
2211
2212pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
2213 state.event_bus.publish(EngineEvent::new(
2214 "agent_team.instance.completed",
2215 json!({
2216 "sessionID": instance.session_id,
2217 "messageID": Value::Null,
2218 "runID": instance.run_id,
2219 "missionID": instance.mission_id,
2220 "instanceID": instance.instance_id,
2221 "parentInstanceID": instance.parent_instance_id,
2222 "role": instance.role,
2223 "status": instance.status,
2224 "timestampMs": crate::now_ms(),
2225 }),
2226 ));
2227}
2228
2229pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
2230 state.event_bus.publish(EngineEvent::new(
2231 "agent_team.instance.failed",
2232 json!({
2233 "sessionID": instance.session_id,
2234 "messageID": Value::Null,
2235 "runID": instance.run_id,
2236 "missionID": instance.mission_id,
2237 "instanceID": instance.instance_id,
2238 "parentInstanceID": instance.parent_instance_id,
2239 "role": instance.role,
2240 "status": instance.status,
2241 "timestampMs": crate::now_ms(),
2242 }),
2243 ));
2244}
2245
2246pub fn emit_mission_budget_exhausted(
2247 state: &AppState,
2248 mission_id: &str,
2249 exhausted_by: &str,
2250 tokens_used: u64,
2251 steps_used: u64,
2252 tool_calls_used: u64,
2253 cost_used_usd: f64,
2254) {
2255 state.event_bus.publish(EngineEvent::new(
2256 "agent_team.mission.budget.exhausted",
2257 json!({
2258 "sessionID": Value::Null,
2259 "messageID": Value::Null,
2260 "runID": Value::Null,
2261 "missionID": mission_id,
2262 "instanceID": Value::Null,
2263 "exhaustedBy": exhausted_by,
2264 "tokensUsed": tokens_used,
2265 "stepsUsed": steps_used,
2266 "toolCallsUsed": tool_calls_used,
2267 "costUsedUsd": cost_used_usd,
2268 "timestampMs": crate::now_ms(),
2269 }),
2270 ));
2271}