1use std::collections::HashMap;
2use std::path::{Path, PathBuf};
3use std::sync::Arc;
4use std::time::Instant;
5
6use anyhow::Context;
7use futures::future::BoxFuture;
8use serde::Deserialize;
9use serde::Serialize;
10use serde_json::{json, Value};
11use tandem_core::{
12 any_policy_matches, SpawnAgentHook, SpawnAgentToolContext, SpawnAgentToolResult,
13 ToolPolicyContext, ToolPolicyDecision, ToolPolicyHook,
14};
15use tandem_orchestrator::{
16 AgentInstance, AgentInstanceStatus, AgentRole, AgentTemplate, BudgetLimit, SpawnDecision,
17 SpawnDenyCode, SpawnPolicy, SpawnRequest, SpawnSource,
18};
19use tandem_skills::SkillService;
20use tandem_types::{EngineEvent, Session};
21use tokio::fs;
22use tokio::sync::RwLock;
23use uuid::Uuid;
24
25use crate::AppState;
26
27#[derive(Clone, Default)]
28pub struct AgentTeamRuntime {
29 policy: Arc<RwLock<Option<SpawnPolicy>>>,
30 templates: Arc<RwLock<HashMap<String, AgentTemplate>>>,
31 instances: Arc<RwLock<HashMap<String, AgentInstance>>>,
32 budgets: Arc<RwLock<HashMap<String, InstanceBudgetState>>>,
33 mission_budgets: Arc<RwLock<HashMap<String, MissionBudgetState>>>,
34 spawn_approvals: Arc<RwLock<HashMap<String, PendingSpawnApproval>>>,
35 loaded_workspace: Arc<RwLock<Option<String>>>,
36 audit_path: Arc<RwLock<PathBuf>>,
37}
38
39#[derive(Debug, Clone)]
40pub struct SpawnResult {
41 pub decision: SpawnDecision,
42 pub instance: Option<AgentInstance>,
43}
44
45#[derive(Debug, Clone, Serialize)]
46pub struct AgentMissionSummary {
47 #[serde(rename = "missionID")]
48 pub mission_id: String,
49 #[serde(rename = "instanceCount")]
50 pub instance_count: usize,
51 #[serde(rename = "runningCount")]
52 pub running_count: usize,
53 #[serde(rename = "completedCount")]
54 pub completed_count: usize,
55 #[serde(rename = "failedCount")]
56 pub failed_count: usize,
57 #[serde(rename = "cancelledCount")]
58 pub cancelled_count: usize,
59 #[serde(rename = "queuedCount")]
60 pub queued_count: usize,
61 #[serde(rename = "tokenUsedTotal")]
62 pub token_used_total: u64,
63 #[serde(rename = "toolCallsUsedTotal")]
64 pub tool_calls_used_total: u64,
65 #[serde(rename = "stepsUsedTotal")]
66 pub steps_used_total: u64,
67 #[serde(rename = "costUsedUsdTotal")]
68 pub cost_used_usd_total: f64,
69}
70
71#[derive(Debug, Clone, Default)]
72struct InstanceBudgetState {
73 tokens_used: u64,
74 steps_used: u32,
75 tool_calls_used: u32,
76 cost_used_usd: f64,
77 started_at: Option<Instant>,
78 exhausted: bool,
79}
80
81#[derive(Debug, Clone, Default)]
82struct MissionBudgetState {
83 tokens_used: u64,
84 steps_used: u64,
85 tool_calls_used: u64,
86 cost_used_usd: f64,
87 exhausted: bool,
88}
89
90#[derive(Debug, Clone, Serialize)]
91pub struct PendingSpawnApproval {
92 #[serde(rename = "approvalID")]
93 pub approval_id: String,
94 #[serde(rename = "createdAtMs")]
95 pub created_at_ms: u64,
96 pub request: SpawnRequest,
97 #[serde(rename = "decisionCode")]
98 pub decision_code: Option<SpawnDenyCode>,
99 pub reason: Option<String>,
100}
101
102#[derive(Clone)]
103pub struct ServerSpawnAgentHook {
104 state: AppState,
105}
106
107#[derive(Debug, Deserialize)]
108struct SpawnAgentToolInput {
109 #[serde(rename = "missionID")]
110 mission_id: Option<String>,
111 #[serde(rename = "parentInstanceID")]
112 parent_instance_id: Option<String>,
113 #[serde(rename = "templateID")]
114 template_id: Option<String>,
115 role: AgentRole,
116 source: Option<SpawnSource>,
117 justification: String,
118 #[serde(rename = "budgetOverride", default)]
119 budget_override: Option<BudgetLimit>,
120}
121
122impl ServerSpawnAgentHook {
123 pub fn new(state: AppState) -> Self {
124 Self { state }
125 }
126}
127
128impl SpawnAgentHook for ServerSpawnAgentHook {
129 fn spawn_agent(
130 &self,
131 ctx: SpawnAgentToolContext,
132 ) -> BoxFuture<'static, anyhow::Result<SpawnAgentToolResult>> {
133 let state = self.state.clone();
134 Box::pin(async move {
135 let parsed = serde_json::from_value::<SpawnAgentToolInput>(ctx.args.clone());
136 let input = match parsed {
137 Ok(input) => input,
138 Err(err) => {
139 return Ok(SpawnAgentToolResult {
140 output: format!("spawn_agent denied: invalid args ({err})"),
141 metadata: json!({
142 "ok": false,
143 "code": "SPAWN_INVALID_ARGS",
144 "error": err.to_string(),
145 }),
146 });
147 }
148 };
149 let req = SpawnRequest {
150 mission_id: input.mission_id,
151 parent_instance_id: input.parent_instance_id,
152 source: input.source.unwrap_or(SpawnSource::ToolCall),
153 parent_role: None,
154 role: input.role,
155 template_id: input.template_id,
156 justification: input.justification,
157 budget_override: input.budget_override,
158 };
159
160 let event_ctx = SpawnEventContext {
161 session_id: Some(ctx.session_id.as_str()),
162 message_id: Some(ctx.message_id.as_str()),
163 run_id: None,
164 };
165 emit_spawn_requested_with_context(&state, &req, &event_ctx);
166 let result = state.agent_teams.spawn(&state, req.clone()).await;
167 if !result.decision.allowed || result.instance.is_none() {
168 emit_spawn_denied_with_context(&state, &req, &result.decision, &event_ctx);
169 return Ok(SpawnAgentToolResult {
170 output: result
171 .decision
172 .reason
173 .clone()
174 .unwrap_or_else(|| "spawn_agent denied".to_string()),
175 metadata: json!({
176 "ok": false,
177 "code": result.decision.code,
178 "error": result.decision.reason,
179 "requiresUserApproval": result.decision.requires_user_approval,
180 }),
181 });
182 }
183 let instance = result.instance.expect("checked is_some");
184 emit_spawn_approved_with_context(&state, &req, &instance, &event_ctx);
185 Ok(SpawnAgentToolResult {
186 output: format!(
187 "spawned {} as instance {} (session {})",
188 instance.template_id, instance.instance_id, instance.session_id
189 ),
190 metadata: json!({
191 "ok": true,
192 "missionID": instance.mission_id,
193 "instanceID": instance.instance_id,
194 "sessionID": instance.session_id,
195 "runID": instance.run_id,
196 "status": instance.status,
197 "skillHash": instance.skill_hash,
198 }),
199 })
200 })
201 }
202}
203
204#[derive(Clone)]
205pub struct ServerToolPolicyHook {
206 state: AppState,
207}
208
209impl ServerToolPolicyHook {
210 pub fn new(state: AppState) -> Self {
211 Self { state }
212 }
213}
214
215impl ToolPolicyHook for ServerToolPolicyHook {
216 fn evaluate_tool(
217 &self,
218 ctx: ToolPolicyContext,
219 ) -> BoxFuture<'static, anyhow::Result<ToolPolicyDecision>> {
220 let state = self.state.clone();
221 Box::pin(async move {
222 let tool = normalize_tool_name(&ctx.tool);
223 if let Some(policy) = state.routine_session_policy(&ctx.session_id).await {
224 let allowed_patterns = policy
225 .allowed_tools
226 .iter()
227 .map(|name| normalize_tool_name(name))
228 .collect::<Vec<_>>();
229 if !policy.allowed_tools.is_empty() && !any_policy_matches(&allowed_patterns, &tool)
230 {
231 let reason = format!(
232 "tool `{}` is not allowed for routine `{}` (run `{}`)",
233 tool, policy.routine_id, policy.run_id
234 );
235 state.event_bus.publish(EngineEvent::new(
236 "routine.tool.denied",
237 json!({
238 "sessionID": ctx.session_id,
239 "messageID": ctx.message_id,
240 "runID": policy.run_id,
241 "routineID": policy.routine_id,
242 "tool": tool,
243 "reason": reason,
244 "timestampMs": crate::now_ms(),
245 }),
246 ));
247 return Ok(ToolPolicyDecision {
248 allowed: false,
249 reason: Some(reason),
250 });
251 }
252 }
253
254 let Some(instance) = state
255 .agent_teams
256 .instance_for_session(&ctx.session_id)
257 .await
258 else {
259 return Ok(ToolPolicyDecision {
260 allowed: true,
261 reason: None,
262 });
263 };
264 let caps = instance.capabilities.clone();
265 let deny = evaluate_capability_deny(
266 &state,
267 &instance,
268 &tool,
269 &ctx.args,
270 &caps,
271 &ctx.session_id,
272 &ctx.message_id,
273 )
274 .await;
275 if let Some(reason) = deny {
276 state.event_bus.publish(EngineEvent::new(
277 "agent_team.capability.denied",
278 json!({
279 "sessionID": ctx.session_id,
280 "messageID": ctx.message_id,
281 "runID": instance.run_id,
282 "missionID": instance.mission_id,
283 "instanceID": instance.instance_id,
284 "tool": tool,
285 "reason": reason,
286 "timestampMs": crate::now_ms(),
287 }),
288 ));
289 return Ok(ToolPolicyDecision {
290 allowed: false,
291 reason: Some(reason),
292 });
293 }
294 Ok(ToolPolicyDecision {
295 allowed: true,
296 reason: None,
297 })
298 })
299 }
300}
301
302impl AgentTeamRuntime {
303 pub fn new(audit_path: PathBuf) -> Self {
304 Self {
305 policy: Arc::new(RwLock::new(None)),
306 templates: Arc::new(RwLock::new(HashMap::new())),
307 instances: Arc::new(RwLock::new(HashMap::new())),
308 budgets: Arc::new(RwLock::new(HashMap::new())),
309 mission_budgets: Arc::new(RwLock::new(HashMap::new())),
310 spawn_approvals: Arc::new(RwLock::new(HashMap::new())),
311 loaded_workspace: Arc::new(RwLock::new(None)),
312 audit_path: Arc::new(RwLock::new(audit_path)),
313 }
314 }
315
316 pub async fn set_audit_path(&self, path: PathBuf) {
317 *self.audit_path.write().await = path;
318 }
319
320 pub async fn list_templates(&self) -> Vec<AgentTemplate> {
321 let mut rows = self
322 .templates
323 .read()
324 .await
325 .values()
326 .cloned()
327 .collect::<Vec<_>>();
328 rows.sort_by(|a, b| a.template_id.cmp(&b.template_id));
329 rows
330 }
331
332 async fn templates_dir_for_loaded_workspace(&self) -> anyhow::Result<PathBuf> {
333 let workspace = self
334 .loaded_workspace
335 .read()
336 .await
337 .clone()
338 .ok_or_else(|| anyhow::anyhow!("agent team workspace not loaded"))?;
339 Ok(PathBuf::from(workspace)
340 .join(".tandem")
341 .join("agent-team")
342 .join("templates"))
343 }
344
345 fn template_filename(template_id: &str) -> String {
346 let safe = template_id
347 .trim()
348 .chars()
349 .map(|ch| {
350 if ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' {
351 ch
352 } else {
353 '_'
354 }
355 })
356 .collect::<String>();
357 let fallback = if safe.is_empty() {
358 "template".to_string()
359 } else {
360 safe
361 };
362 format!("{fallback}.yaml")
363 }
364
365 pub async fn upsert_template(
366 &self,
367 workspace_root: &str,
368 template: AgentTemplate,
369 ) -> anyhow::Result<AgentTemplate> {
370 self.ensure_loaded_for_workspace(workspace_root).await?;
371 let templates_dir = self.templates_dir_for_loaded_workspace().await?;
372 fs::create_dir_all(&templates_dir).await?;
373 let path = templates_dir.join(Self::template_filename(&template.template_id));
374 let payload = serde_yaml::to_string(&template)?;
375 fs::write(path, payload).await?;
376 self.templates
377 .write()
378 .await
379 .insert(template.template_id.clone(), template.clone());
380 Ok(template)
381 }
382
383 pub async fn delete_template(
384 &self,
385 workspace_root: &str,
386 template_id: &str,
387 ) -> anyhow::Result<bool> {
388 self.ensure_loaded_for_workspace(workspace_root).await?;
389 let templates_dir = self.templates_dir_for_loaded_workspace().await?;
390 let path = templates_dir.join(Self::template_filename(template_id));
391 let existed = self.templates.write().await.remove(template_id).is_some();
392 if path.exists() {
393 let _ = fs::remove_file(path).await;
394 }
395 Ok(existed)
396 }
397
398 pub async fn get_template_for_workspace(
399 &self,
400 workspace_root: &str,
401 template_id: &str,
402 ) -> anyhow::Result<Option<AgentTemplate>> {
403 self.ensure_loaded_for_workspace(workspace_root).await?;
404 Ok(self.templates.read().await.get(template_id).cloned())
405 }
406
407 pub async fn list_instances(
408 &self,
409 mission_id: Option<&str>,
410 parent_instance_id: Option<&str>,
411 status: Option<AgentInstanceStatus>,
412 ) -> Vec<AgentInstance> {
413 let mut rows = self
414 .instances
415 .read()
416 .await
417 .values()
418 .filter(|instance| {
419 if let Some(mission_id) = mission_id {
420 if instance.mission_id != mission_id {
421 return false;
422 }
423 }
424 if let Some(parent_id) = parent_instance_id {
425 if instance.parent_instance_id.as_deref() != Some(parent_id) {
426 return false;
427 }
428 }
429 if let Some(status) = &status {
430 if &instance.status != status {
431 return false;
432 }
433 }
434 true
435 })
436 .cloned()
437 .collect::<Vec<_>>();
438 rows.sort_by(|a, b| a.instance_id.cmp(&b.instance_id));
439 rows
440 }
441
442 pub async fn list_mission_summaries(&self) -> Vec<AgentMissionSummary> {
443 let instances = self.instances.read().await;
444 let mut by_mission: HashMap<String, AgentMissionSummary> = HashMap::new();
445 for instance in instances.values() {
446 let row = by_mission
447 .entry(instance.mission_id.clone())
448 .or_insert_with(|| AgentMissionSummary {
449 mission_id: instance.mission_id.clone(),
450 instance_count: 0,
451 running_count: 0,
452 completed_count: 0,
453 failed_count: 0,
454 cancelled_count: 0,
455 queued_count: 0,
456 token_used_total: 0,
457 tool_calls_used_total: 0,
458 steps_used_total: 0,
459 cost_used_usd_total: 0.0,
460 });
461 row.instance_count = row.instance_count.saturating_add(1);
462 match instance.status {
463 AgentInstanceStatus::Queued => {
464 row.queued_count = row.queued_count.saturating_add(1)
465 }
466 AgentInstanceStatus::Running => {
467 row.running_count = row.running_count.saturating_add(1)
468 }
469 AgentInstanceStatus::Completed => {
470 row.completed_count = row.completed_count.saturating_add(1)
471 }
472 AgentInstanceStatus::Failed => {
473 row.failed_count = row.failed_count.saturating_add(1)
474 }
475 AgentInstanceStatus::Cancelled => {
476 row.cancelled_count = row.cancelled_count.saturating_add(1)
477 }
478 }
479 if let Some(usage) = instance
480 .metadata
481 .as_ref()
482 .and_then(|m| m.get("budgetUsage"))
483 .and_then(|u| u.as_object())
484 {
485 row.token_used_total = row.token_used_total.saturating_add(
486 usage
487 .get("tokensUsed")
488 .and_then(|v| v.as_u64())
489 .unwrap_or(0),
490 );
491 row.tool_calls_used_total = row.tool_calls_used_total.saturating_add(
492 usage
493 .get("toolCallsUsed")
494 .and_then(|v| v.as_u64())
495 .unwrap_or(0),
496 );
497 row.steps_used_total = row
498 .steps_used_total
499 .saturating_add(usage.get("stepsUsed").and_then(|v| v.as_u64()).unwrap_or(0));
500 row.cost_used_usd_total += usage
501 .get("costUsedUsd")
502 .and_then(|v| v.as_f64())
503 .unwrap_or(0.0);
504 }
505 }
506 let mut rows = by_mission.into_values().collect::<Vec<_>>();
507 rows.sort_by(|a, b| a.mission_id.cmp(&b.mission_id));
508 rows
509 }
510
511 pub async fn instance_for_session(&self, session_id: &str) -> Option<AgentInstance> {
512 self.instances
513 .read()
514 .await
515 .values()
516 .find(|instance| instance.session_id == session_id)
517 .cloned()
518 }
519
520 pub async fn list_spawn_approvals(&self) -> Vec<PendingSpawnApproval> {
521 let mut rows = self
522 .spawn_approvals
523 .read()
524 .await
525 .values()
526 .cloned()
527 .collect::<Vec<_>>();
528 rows.sort_by(|a, b| a.created_at_ms.cmp(&b.created_at_ms));
529 rows
530 }
531
532 pub async fn ensure_loaded_for_workspace(&self, workspace_root: &str) -> anyhow::Result<()> {
533 let normalized = workspace_root.trim().to_string();
534 let already_loaded = self
535 .loaded_workspace
536 .read()
537 .await
538 .as_ref()
539 .map(|s| s == &normalized)
540 .unwrap_or(false);
541 if already_loaded {
542 return Ok(());
543 }
544
545 let root = PathBuf::from(&normalized);
546 let policy_path = root
547 .join(".tandem")
548 .join("agent-team")
549 .join("spawn-policy.yaml");
550 let templates_dir = root.join(".tandem").join("agent-team").join("templates");
551
552 let mut next_policy = None;
553 if policy_path.exists() {
554 let raw = fs::read_to_string(&policy_path)
555 .await
556 .with_context(|| format!("failed reading {}", policy_path.display()))?;
557 let parsed = serde_yaml::from_str::<SpawnPolicy>(&raw)
558 .with_context(|| format!("failed parsing {}", policy_path.display()))?;
559 next_policy = Some(parsed);
560 }
561
562 let mut next_templates = HashMap::new();
563 if templates_dir.exists() {
564 let mut entries = fs::read_dir(&templates_dir).await?;
565 while let Some(entry) = entries.next_entry().await? {
566 let path = entry.path();
567 if !path.is_file() {
568 continue;
569 }
570 let ext = path
571 .extension()
572 .and_then(|v| v.to_str())
573 .unwrap_or_default()
574 .to_ascii_lowercase();
575 if ext != "yaml" && ext != "yml" && ext != "json" {
576 continue;
577 }
578 let raw = fs::read_to_string(&path).await?;
579 let template = serde_yaml::from_str::<AgentTemplate>(&raw)
580 .with_context(|| format!("failed parsing {}", path.display()))?;
581 next_templates.insert(template.template_id.clone(), template);
582 }
583 }
584
585 *self.policy.write().await = next_policy;
586 *self.templates.write().await = next_templates;
587 *self.loaded_workspace.write().await = Some(normalized);
588 Ok(())
589 }
590
591 pub async fn spawn(&self, state: &AppState, req: SpawnRequest) -> SpawnResult {
592 self.spawn_with_approval_override(state, req, false).await
593 }
594
595 async fn spawn_with_approval_override(
596 &self,
597 state: &AppState,
598 mut req: SpawnRequest,
599 approval_override: bool,
600 ) -> SpawnResult {
601 let workspace_root = state.workspace_index.snapshot().await.root;
602 if let Err(err) = self.ensure_loaded_for_workspace(&workspace_root).await {
603 return SpawnResult {
604 decision: SpawnDecision {
605 allowed: false,
606 code: Some(SpawnDenyCode::SpawnPolicyMissing),
607 reason: Some(format!("spawn policy load failed: {}", err)),
608 requires_user_approval: false,
609 },
610 instance: None,
611 };
612 }
613
614 let Some(policy) = self.policy.read().await.clone() else {
615 return SpawnResult {
616 decision: SpawnDecision {
617 allowed: false,
618 code: Some(SpawnDenyCode::SpawnPolicyMissing),
619 reason: Some("spawn policy file missing".to_string()),
620 requires_user_approval: false,
621 },
622 instance: None,
623 };
624 };
625
626 let template = {
627 let templates = self.templates.read().await;
628 req.template_id
629 .as_deref()
630 .and_then(|template_id| templates.get(template_id).cloned())
631 };
632 if req.template_id.is_none() {
633 if let Some(found) = self
634 .templates
635 .read()
636 .await
637 .values()
638 .find(|t| t.role == req.role)
639 .cloned()
640 {
641 req.template_id = Some(found.template_id.clone());
642 }
643 }
644 let template = if template.is_some() {
645 template
646 } else {
647 let templates = self.templates.read().await;
648 req.template_id
649 .as_deref()
650 .and_then(|id| templates.get(id).cloned())
651 };
652
653 if req.parent_role.is_none() {
654 if let Some(parent_id) = req.parent_instance_id.as_deref() {
655 let instances = self.instances.read().await;
656 req.parent_role = instances
657 .get(parent_id)
658 .map(|instance| instance.role.clone());
659 }
660 }
661
662 let instances = self.instances.read().await;
663 let total_agents = instances.len();
664 let running_agents = instances
665 .values()
666 .filter(|instance| instance.status == AgentInstanceStatus::Running)
667 .count();
668 drop(instances);
669
670 let mut decision = policy.evaluate(&req, total_agents, running_agents, template.as_ref());
671 if approval_override
672 && !decision.allowed
673 && decision.requires_user_approval
674 && matches!(decision.code, Some(SpawnDenyCode::SpawnRequiresApproval))
675 {
676 decision = SpawnDecision {
677 allowed: true,
678 code: None,
679 reason: None,
680 requires_user_approval: false,
681 };
682 }
683 if !decision.allowed {
684 if decision.requires_user_approval && !approval_override {
685 self.queue_spawn_approval(&req, &decision).await;
686 }
687 return SpawnResult {
688 decision,
689 instance: None,
690 };
691 }
692
693 let mission_id = req
694 .mission_id
695 .clone()
696 .unwrap_or_else(|| "mission-default".to_string());
697
698 if let Some(reason) = self
699 .mission_budget_exceeded_reason(&policy, &mission_id)
700 .await
701 {
702 return SpawnResult {
703 decision: SpawnDecision {
704 allowed: false,
705 code: Some(SpawnDenyCode::SpawnMissionBudgetExceeded),
706 reason: Some(reason),
707 requires_user_approval: false,
708 },
709 instance: None,
710 };
711 }
712
713 let template = template.unwrap_or_else(|| AgentTemplate {
714 template_id: "default-template".to_string(),
715 display_name: None,
716 avatar_url: None,
717 role: req.role.clone(),
718 system_prompt: None,
719 default_model: None,
720 skills: Vec::new(),
721 default_budget: BudgetLimit::default(),
722 capabilities: Default::default(),
723 });
724
725 let skill_hash = match compute_skill_hash(&workspace_root, &template, &policy).await {
726 Ok(hash) => hash,
727 Err(err) => {
728 let lowered = err.to_ascii_lowercase();
729 let code = if lowered.contains("pinned hash mismatch") {
730 SpawnDenyCode::SpawnSkillHashMismatch
731 } else if lowered.contains("skill source denied") {
732 SpawnDenyCode::SpawnSkillSourceDenied
733 } else {
734 SpawnDenyCode::SpawnRequiredSkillMissing
735 };
736 return SpawnResult {
737 decision: SpawnDecision {
738 allowed: false,
739 code: Some(code),
740 reason: Some(err),
741 requires_user_approval: false,
742 },
743 instance: None,
744 };
745 }
746 };
747
748 let parent_snapshot = {
749 let instances = self.instances.read().await;
750 req.parent_instance_id
751 .as_deref()
752 .and_then(|id| instances.get(id).cloned())
753 };
754 let parent_usage = if let Some(parent_id) = req.parent_instance_id.as_deref() {
755 self.budgets.read().await.get(parent_id).cloned()
756 } else {
757 None
758 };
759
760 let budget = resolve_budget(
761 &policy,
762 parent_snapshot,
763 parent_usage,
764 &template,
765 req.budget_override.clone(),
766 &req.role,
767 );
768
769 let mut session = Session::new(
770 Some(format!("Agent Team {}", template.template_id)),
771 Some(workspace_root.clone()),
772 );
773 session.workspace_root = Some(workspace_root.clone());
774 let session_id = session.id.clone();
775 if let Err(err) = state.storage.save_session(session).await {
776 return SpawnResult {
777 decision: SpawnDecision {
778 allowed: false,
779 code: Some(SpawnDenyCode::SpawnPolicyMissing),
780 reason: Some(format!("failed creating child session: {}", err)),
781 requires_user_approval: false,
782 },
783 instance: None,
784 };
785 }
786
787 let instance = AgentInstance {
788 instance_id: format!("ins_{}", Uuid::new_v4().simple()),
789 mission_id: mission_id.clone(),
790 parent_instance_id: req.parent_instance_id.clone(),
791 role: template.role.clone(),
792 template_id: template.template_id.clone(),
793 session_id: session_id.clone(),
794 run_id: None,
795 status: AgentInstanceStatus::Running,
796 budget,
797 skill_hash: skill_hash.clone(),
798 capabilities: template.capabilities.clone(),
799 metadata: Some(json!({
800 "source": req.source,
801 "justification": req.justification,
802 })),
803 };
804
805 self.instances
806 .write()
807 .await
808 .insert(instance.instance_id.clone(), instance.clone());
809 self.budgets.write().await.insert(
810 instance.instance_id.clone(),
811 InstanceBudgetState {
812 started_at: Some(Instant::now()),
813 ..InstanceBudgetState::default()
814 },
815 );
816 let _ = self.append_audit("spawn.approved", &instance).await;
817
818 SpawnResult {
819 decision: SpawnDecision {
820 allowed: true,
821 code: None,
822 reason: None,
823 requires_user_approval: false,
824 },
825 instance: Some(instance),
826 }
827 }
828
829 pub async fn approve_spawn_approval(
830 &self,
831 state: &AppState,
832 approval_id: &str,
833 reason: Option<&str>,
834 ) -> Option<SpawnResult> {
835 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
836 let result = self
837 .spawn_with_approval_override(state, approval.request.clone(), true)
838 .await;
839 if let Some(instance) = result.instance.as_ref() {
840 let note = reason.unwrap_or("approved by operator");
841 let _ = self
842 .append_approval_audit("spawn.approval.approved", approval_id, Some(instance), note)
843 .await;
844 } else {
845 let note = reason.unwrap_or("approval replay failed policy checks");
846 let _ = self
847 .append_approval_audit("spawn.approval.rejected_on_replay", approval_id, None, note)
848 .await;
849 }
850 Some(result)
851 }
852
853 pub async fn deny_spawn_approval(
854 &self,
855 approval_id: &str,
856 reason: Option<&str>,
857 ) -> Option<PendingSpawnApproval> {
858 let approval = self.spawn_approvals.write().await.remove(approval_id)?;
859 let note = reason.unwrap_or("denied by operator");
860 let _ = self
861 .append_approval_audit("spawn.approval.denied", approval_id, None, note)
862 .await;
863 Some(approval)
864 }
865
866 pub async fn cancel_instance(
867 &self,
868 state: &AppState,
869 instance_id: &str,
870 reason: &str,
871 ) -> Option<AgentInstance> {
872 let mut instances = self.instances.write().await;
873 let instance = instances.get_mut(instance_id)?;
874 if matches!(
875 instance.status,
876 AgentInstanceStatus::Completed
877 | AgentInstanceStatus::Failed
878 | AgentInstanceStatus::Cancelled
879 ) {
880 return Some(instance.clone());
881 }
882 instance.status = AgentInstanceStatus::Cancelled;
883 let snapshot = instance.clone();
884 drop(instances);
885 let _ = state.cancellations.cancel(&snapshot.session_id).await;
886 let _ = self.append_audit("instance.cancelled", &snapshot).await;
887 emit_instance_cancelled(state, &snapshot, reason);
888 Some(snapshot)
889 }
890
891 async fn queue_spawn_approval(&self, req: &SpawnRequest, decision: &SpawnDecision) {
892 let approval = PendingSpawnApproval {
893 approval_id: format!("spawn_{}", Uuid::new_v4().simple()),
894 created_at_ms: crate::now_ms(),
895 request: req.clone(),
896 decision_code: decision.code.clone(),
897 reason: decision.reason.clone(),
898 };
899 self.spawn_approvals
900 .write()
901 .await
902 .insert(approval.approval_id.clone(), approval);
903 }
904
905 async fn mission_budget_exceeded_reason(
906 &self,
907 policy: &SpawnPolicy,
908 mission_id: &str,
909 ) -> Option<String> {
910 let limit = policy.mission_total_budget.as_ref()?;
911 let usage = self
912 .mission_budgets
913 .read()
914 .await
915 .get(mission_id)
916 .cloned()
917 .unwrap_or_default();
918 if let Some(max) = limit.max_tokens {
919 if usage.tokens_used >= max {
920 return Some(format!(
921 "mission max_tokens exhausted ({}/{})",
922 usage.tokens_used, max
923 ));
924 }
925 }
926 if let Some(max) = limit.max_steps {
927 if usage.steps_used >= u64::from(max) {
928 return Some(format!(
929 "mission max_steps exhausted ({}/{})",
930 usage.steps_used, max
931 ));
932 }
933 }
934 if let Some(max) = limit.max_tool_calls {
935 if usage.tool_calls_used >= u64::from(max) {
936 return Some(format!(
937 "mission max_tool_calls exhausted ({}/{})",
938 usage.tool_calls_used, max
939 ));
940 }
941 }
942 if let Some(max) = limit.max_cost_usd {
943 if usage.cost_used_usd >= max {
944 return Some(format!(
945 "mission max_cost_usd exhausted ({:.6}/{:.6})",
946 usage.cost_used_usd, max
947 ));
948 }
949 }
950 None
951 }
952
953 pub async fn cancel_mission(&self, state: &AppState, mission_id: &str, reason: &str) -> usize {
954 let instance_ids = self
955 .instances
956 .read()
957 .await
958 .values()
959 .filter(|instance| instance.mission_id == mission_id)
960 .map(|instance| instance.instance_id.clone())
961 .collect::<Vec<_>>();
962 let mut count = 0usize;
963 for instance_id in instance_ids {
964 if self
965 .cancel_instance(state, &instance_id, reason)
966 .await
967 .is_some()
968 {
969 count = count.saturating_add(1);
970 }
971 }
972 count
973 }
974
975 async fn mark_instance_terminal(
976 &self,
977 state: &AppState,
978 instance_id: &str,
979 status: AgentInstanceStatus,
980 ) -> Option<AgentInstance> {
981 let mut instances = self.instances.write().await;
982 let instance = instances.get_mut(instance_id)?;
983 if matches!(
984 instance.status,
985 AgentInstanceStatus::Completed
986 | AgentInstanceStatus::Failed
987 | AgentInstanceStatus::Cancelled
988 ) {
989 return Some(instance.clone());
990 }
991 instance.status = status.clone();
992 let snapshot = instance.clone();
993 drop(instances);
994 match status {
995 AgentInstanceStatus::Completed => emit_instance_completed(state, &snapshot),
996 AgentInstanceStatus::Failed => emit_instance_failed(state, &snapshot),
997 _ => {}
998 }
999 Some(snapshot)
1000 }
1001
1002 pub async fn handle_engine_event(&self, state: &AppState, event: &EngineEvent) {
1003 let Some(session_id) = extract_session_id(event) else {
1004 return;
1005 };
1006 let Some(instance_id) = self.instance_id_for_session(&session_id).await else {
1007 return;
1008 };
1009 if event.event_type == "provider.usage" {
1010 let total_tokens = event
1011 .properties
1012 .get("totalTokens")
1013 .and_then(|v| v.as_u64())
1014 .unwrap_or(0);
1015 let cost_used_usd = event
1016 .properties
1017 .get("costUsd")
1018 .and_then(|v| v.as_f64())
1019 .unwrap_or(0.0);
1020 if total_tokens > 0 {
1021 let exhausted = self
1022 .apply_exact_token_usage(state, &instance_id, total_tokens, cost_used_usd)
1023 .await;
1024 if exhausted {
1025 let _ = self
1026 .cancel_instance(state, &instance_id, "budget exhausted")
1027 .await;
1028 }
1029 }
1030 return;
1031 }
1032 let mut delta_tokens = 0u64;
1033 let mut delta_steps = 0u32;
1034 let mut delta_tool_calls = 0u32;
1035 if event.event_type == "message.part.updated" {
1036 if let Some(part) = event.properties.get("part") {
1037 let part_type = part.get("type").and_then(|v| v.as_str()).unwrap_or("");
1038 if part_type == "tool-invocation" {
1039 delta_tool_calls = 1;
1040 } else if part_type == "text" {
1041 let delta = event
1042 .properties
1043 .get("delta")
1044 .and_then(|v| v.as_str())
1045 .unwrap_or("");
1046 if !delta.is_empty() {
1047 delta_tokens = estimate_tokens(delta);
1048 }
1049 }
1050 }
1051 } else if event.event_type == "session.run.finished" {
1052 delta_steps = 1;
1053 let run_status = event
1054 .properties
1055 .get("status")
1056 .and_then(|v| v.as_str())
1057 .unwrap_or("")
1058 .to_ascii_lowercase();
1059 if run_status == "completed" {
1060 let _ = self
1061 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Completed)
1062 .await;
1063 } else if run_status == "failed" || run_status == "error" {
1064 let _ = self
1065 .mark_instance_terminal(state, &instance_id, AgentInstanceStatus::Failed)
1066 .await;
1067 }
1068 }
1069 if delta_tokens == 0 && delta_steps == 0 && delta_tool_calls == 0 {
1070 return;
1071 }
1072 let exhausted = self
1073 .apply_budget_delta(
1074 state,
1075 &instance_id,
1076 delta_tokens,
1077 delta_steps,
1078 delta_tool_calls,
1079 )
1080 .await;
1081 if exhausted {
1082 let _ = self
1083 .cancel_instance(state, &instance_id, "budget exhausted")
1084 .await;
1085 }
1086 }
1087
1088 async fn instance_id_for_session(&self, session_id: &str) -> Option<String> {
1089 self.instances
1090 .read()
1091 .await
1092 .values()
1093 .find(|instance| instance.session_id == session_id)
1094 .map(|instance| instance.instance_id.clone())
1095 }
1096
1097 async fn apply_budget_delta(
1098 &self,
1099 state: &AppState,
1100 instance_id: &str,
1101 delta_tokens: u64,
1102 delta_steps: u32,
1103 delta_tool_calls: u32,
1104 ) -> bool {
1105 let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1106 enabled: false,
1107 require_justification: false,
1108 max_agents: None,
1109 max_concurrent: None,
1110 child_budget_percent_of_parent_remaining: None,
1111 mission_total_budget: None,
1112 cost_per_1k_tokens_usd: None,
1113 spawn_edges: HashMap::new(),
1114 required_skills: HashMap::new(),
1115 role_defaults: HashMap::new(),
1116 skill_sources: Default::default(),
1117 });
1118 let mut budgets = self.budgets.write().await;
1119 let Some(usage) = budgets.get_mut(instance_id) else {
1120 return false;
1121 };
1122 if usage.exhausted {
1123 return true;
1124 }
1125 let prev_cost_used_usd = usage.cost_used_usd;
1126 usage.tokens_used = usage.tokens_used.saturating_add(delta_tokens);
1127 usage.steps_used = usage.steps_used.saturating_add(delta_steps);
1128 usage.tool_calls_used = usage.tool_calls_used.saturating_add(delta_tool_calls);
1129 if let Some(rate) = policy.cost_per_1k_tokens_usd {
1130 usage.cost_used_usd += (delta_tokens as f64 / 1000.0) * rate;
1131 }
1132 let elapsed_ms = usage
1133 .started_at
1134 .map(|started| started.elapsed().as_millis() as u64)
1135 .unwrap_or(0);
1136
1137 let mut exhausted_reason: Option<&'static str> = None;
1138 let mut snapshot: Option<AgentInstance> = None;
1139 {
1140 let mut instances = self.instances.write().await;
1141 if let Some(instance) = instances.get_mut(instance_id) {
1142 instance.metadata = Some(merge_metadata_usage(
1143 instance.metadata.take(),
1144 usage.tokens_used,
1145 usage.steps_used,
1146 usage.tool_calls_used,
1147 usage.cost_used_usd,
1148 elapsed_ms,
1149 ));
1150 if let Some(limit) = instance.budget.max_tokens {
1151 if usage.tokens_used >= limit {
1152 exhausted_reason = Some("max_tokens");
1153 }
1154 }
1155 if exhausted_reason.is_none() {
1156 if let Some(limit) = instance.budget.max_steps {
1157 if usage.steps_used >= limit {
1158 exhausted_reason = Some("max_steps");
1159 }
1160 }
1161 }
1162 if exhausted_reason.is_none() {
1163 if let Some(limit) = instance.budget.max_tool_calls {
1164 if usage.tool_calls_used >= limit {
1165 exhausted_reason = Some("max_tool_calls");
1166 }
1167 }
1168 }
1169 if exhausted_reason.is_none() {
1170 if let Some(limit) = instance.budget.max_duration_ms {
1171 if elapsed_ms >= limit {
1172 exhausted_reason = Some("max_duration_ms");
1173 }
1174 }
1175 }
1176 if exhausted_reason.is_none() {
1177 if let Some(limit) = instance.budget.max_cost_usd {
1178 if usage.cost_used_usd >= limit {
1179 exhausted_reason = Some("max_cost_usd");
1180 }
1181 }
1182 }
1183 snapshot = Some(instance.clone());
1184 }
1185 }
1186 let Some(instance) = snapshot else {
1187 return false;
1188 };
1189 emit_budget_usage(
1190 state,
1191 &instance,
1192 usage.tokens_used,
1193 usage.steps_used,
1194 usage.tool_calls_used,
1195 usage.cost_used_usd,
1196 elapsed_ms,
1197 );
1198 let mission_exhausted = self
1199 .apply_mission_budget_delta(
1200 state,
1201 &instance.mission_id,
1202 delta_tokens,
1203 u64::from(delta_steps),
1204 u64::from(delta_tool_calls),
1205 usage.cost_used_usd - prev_cost_used_usd,
1206 &policy,
1207 )
1208 .await;
1209 if mission_exhausted {
1210 usage.exhausted = true;
1211 let _ = self
1212 .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1213 .await;
1214 return true;
1215 }
1216 if let Some(reason) = exhausted_reason {
1217 usage.exhausted = true;
1218 emit_budget_exhausted(
1219 state,
1220 &instance,
1221 reason,
1222 usage.tokens_used,
1223 usage.steps_used,
1224 usage.tool_calls_used,
1225 usage.cost_used_usd,
1226 elapsed_ms,
1227 );
1228 return true;
1229 }
1230 false
1231 }
1232
1233 async fn apply_exact_token_usage(
1234 &self,
1235 state: &AppState,
1236 instance_id: &str,
1237 total_tokens: u64,
1238 cost_used_usd: f64,
1239 ) -> bool {
1240 let policy = self.policy.read().await.clone().unwrap_or(SpawnPolicy {
1241 enabled: false,
1242 require_justification: false,
1243 max_agents: None,
1244 max_concurrent: None,
1245 child_budget_percent_of_parent_remaining: None,
1246 mission_total_budget: None,
1247 cost_per_1k_tokens_usd: None,
1248 spawn_edges: HashMap::new(),
1249 required_skills: HashMap::new(),
1250 role_defaults: HashMap::new(),
1251 skill_sources: Default::default(),
1252 });
1253 let mut budgets = self.budgets.write().await;
1254 let Some(usage) = budgets.get_mut(instance_id) else {
1255 return false;
1256 };
1257 if usage.exhausted {
1258 return true;
1259 }
1260 let prev_tokens = usage.tokens_used;
1261 let prev_cost_used_usd = usage.cost_used_usd;
1262 usage.tokens_used = usage.tokens_used.max(total_tokens);
1263 if cost_used_usd > 0.0 {
1264 usage.cost_used_usd = usage.cost_used_usd.max(cost_used_usd);
1265 } else if let Some(rate) = policy.cost_per_1k_tokens_usd {
1266 let delta = usage.tokens_used.saturating_sub(prev_tokens);
1267 usage.cost_used_usd += (delta as f64 / 1000.0) * rate;
1268 }
1269 let elapsed_ms = usage
1270 .started_at
1271 .map(|started| started.elapsed().as_millis() as u64)
1272 .unwrap_or(0);
1273 let mut exhausted_reason: Option<&'static str> = None;
1274 let mut snapshot: Option<AgentInstance> = None;
1275 {
1276 let mut instances = self.instances.write().await;
1277 if let Some(instance) = instances.get_mut(instance_id) {
1278 instance.metadata = Some(merge_metadata_usage(
1279 instance.metadata.take(),
1280 usage.tokens_used,
1281 usage.steps_used,
1282 usage.tool_calls_used,
1283 usage.cost_used_usd,
1284 elapsed_ms,
1285 ));
1286 if let Some(limit) = instance.budget.max_tokens {
1287 if usage.tokens_used >= limit {
1288 exhausted_reason = Some("max_tokens");
1289 }
1290 }
1291 if exhausted_reason.is_none() {
1292 if let Some(limit) = instance.budget.max_cost_usd {
1293 if usage.cost_used_usd >= limit {
1294 exhausted_reason = Some("max_cost_usd");
1295 }
1296 }
1297 }
1298 snapshot = Some(instance.clone());
1299 }
1300 }
1301 let Some(instance) = snapshot else {
1302 return false;
1303 };
1304 emit_budget_usage(
1305 state,
1306 &instance,
1307 usage.tokens_used,
1308 usage.steps_used,
1309 usage.tool_calls_used,
1310 usage.cost_used_usd,
1311 elapsed_ms,
1312 );
1313 let mission_exhausted = self
1314 .apply_mission_budget_delta(
1315 state,
1316 &instance.mission_id,
1317 usage.tokens_used.saturating_sub(prev_tokens),
1318 0,
1319 0,
1320 usage.cost_used_usd - prev_cost_used_usd,
1321 &policy,
1322 )
1323 .await;
1324 if mission_exhausted {
1325 usage.exhausted = true;
1326 let _ = self
1327 .cancel_mission(state, &instance.mission_id, "mission budget exhausted")
1328 .await;
1329 return true;
1330 }
1331 if let Some(reason) = exhausted_reason {
1332 usage.exhausted = true;
1333 emit_budget_exhausted(
1334 state,
1335 &instance,
1336 reason,
1337 usage.tokens_used,
1338 usage.steps_used,
1339 usage.tool_calls_used,
1340 usage.cost_used_usd,
1341 elapsed_ms,
1342 );
1343 return true;
1344 }
1345 false
1346 }
1347
1348 async fn append_audit(&self, action: &str, instance: &AgentInstance) -> anyhow::Result<()> {
1349 let path = self.audit_path.read().await.clone();
1350 if let Some(parent) = path.parent() {
1351 fs::create_dir_all(parent).await?;
1352 }
1353 let row = json!({
1354 "action": action,
1355 "missionID": instance.mission_id,
1356 "instanceID": instance.instance_id,
1357 "parentInstanceID": instance.parent_instance_id,
1358 "role": instance.role,
1359 "templateID": instance.template_id,
1360 "sessionID": instance.session_id,
1361 "skillHash": instance.skill_hash,
1362 "timestampMs": crate::now_ms(),
1363 });
1364 let mut existing = if path.exists() {
1365 fs::read_to_string(&path).await.unwrap_or_default()
1366 } else {
1367 String::new()
1368 };
1369 existing.push_str(&serde_json::to_string(&row)?);
1370 existing.push('\n');
1371 fs::write(path, existing).await?;
1372 Ok(())
1373 }
1374
1375 async fn append_approval_audit(
1376 &self,
1377 action: &str,
1378 approval_id: &str,
1379 instance: Option<&AgentInstance>,
1380 reason: &str,
1381 ) -> anyhow::Result<()> {
1382 let path = self.audit_path.read().await.clone();
1383 if let Some(parent) = path.parent() {
1384 fs::create_dir_all(parent).await?;
1385 }
1386 let row = json!({
1387 "action": action,
1388 "approvalID": approval_id,
1389 "reason": reason,
1390 "missionID": instance.map(|v| v.mission_id.clone()),
1391 "instanceID": instance.map(|v| v.instance_id.clone()),
1392 "parentInstanceID": instance.and_then(|v| v.parent_instance_id.clone()),
1393 "role": instance.map(|v| v.role.clone()),
1394 "templateID": instance.map(|v| v.template_id.clone()),
1395 "sessionID": instance.map(|v| v.session_id.clone()),
1396 "skillHash": instance.map(|v| v.skill_hash.clone()),
1397 "timestampMs": crate::now_ms(),
1398 });
1399 let mut existing = if path.exists() {
1400 fs::read_to_string(&path).await.unwrap_or_default()
1401 } else {
1402 String::new()
1403 };
1404 existing.push_str(&serde_json::to_string(&row)?);
1405 existing.push('\n');
1406 fs::write(path, existing).await?;
1407 Ok(())
1408 }
1409
1410 #[allow(clippy::too_many_arguments)]
1411 async fn apply_mission_budget_delta(
1412 &self,
1413 state: &AppState,
1414 mission_id: &str,
1415 delta_tokens: u64,
1416 delta_steps: u64,
1417 delta_tool_calls: u64,
1418 delta_cost_used_usd: f64,
1419 policy: &SpawnPolicy,
1420 ) -> bool {
1421 let mut budgets = self.mission_budgets.write().await;
1422 let row = budgets.entry(mission_id.to_string()).or_default();
1423 row.tokens_used = row.tokens_used.saturating_add(delta_tokens);
1424 row.steps_used = row.steps_used.saturating_add(delta_steps);
1425 row.tool_calls_used = row.tool_calls_used.saturating_add(delta_tool_calls);
1426 row.cost_used_usd += delta_cost_used_usd.max(0.0);
1427 if row.exhausted {
1428 return true;
1429 }
1430 let Some(limit) = policy.mission_total_budget.as_ref() else {
1431 return false;
1432 };
1433 let mut exhausted_by: Option<&'static str> = None;
1434 if let Some(max) = limit.max_tokens {
1435 if row.tokens_used >= max {
1436 exhausted_by = Some("mission_max_tokens");
1437 }
1438 }
1439 if exhausted_by.is_none() {
1440 if let Some(max) = limit.max_steps {
1441 if row.steps_used >= u64::from(max) {
1442 exhausted_by = Some("mission_max_steps");
1443 }
1444 }
1445 }
1446 if exhausted_by.is_none() {
1447 if let Some(max) = limit.max_tool_calls {
1448 if row.tool_calls_used >= u64::from(max) {
1449 exhausted_by = Some("mission_max_tool_calls");
1450 }
1451 }
1452 }
1453 if exhausted_by.is_none() {
1454 if let Some(max) = limit.max_cost_usd {
1455 if row.cost_used_usd >= max {
1456 exhausted_by = Some("mission_max_cost_usd");
1457 }
1458 }
1459 }
1460 if let Some(exhausted_by) = exhausted_by {
1461 row.exhausted = true;
1462 emit_mission_budget_exhausted(
1463 state,
1464 mission_id,
1465 exhausted_by,
1466 row.tokens_used,
1467 row.steps_used,
1468 row.tool_calls_used,
1469 row.cost_used_usd,
1470 );
1471 return true;
1472 }
1473 false
1474 }
1475
1476 pub async fn set_for_test(
1477 &self,
1478 workspace_root: Option<String>,
1479 policy: Option<SpawnPolicy>,
1480 templates: Vec<AgentTemplate>,
1481 ) {
1482 *self.policy.write().await = policy;
1483 let mut by_id = HashMap::new();
1484 for template in templates {
1485 by_id.insert(template.template_id.clone(), template);
1486 }
1487 *self.templates.write().await = by_id;
1488 self.instances.write().await.clear();
1489 self.budgets.write().await.clear();
1490 self.mission_budgets.write().await.clear();
1491 self.spawn_approvals.write().await.clear();
1492 *self.loaded_workspace.write().await = workspace_root;
1493 }
1494}
1495
1496fn resolve_budget(
1497 policy: &SpawnPolicy,
1498 parent_instance: Option<AgentInstance>,
1499 parent_usage: Option<InstanceBudgetState>,
1500 template: &AgentTemplate,
1501 override_budget: Option<BudgetLimit>,
1502 role: &AgentRole,
1503) -> BudgetLimit {
1504 let role_default = policy.role_defaults.get(role).cloned().unwrap_or_default();
1505 let mut chosen = merge_budget(
1506 merge_budget(role_default, template.default_budget.clone()),
1507 override_budget.unwrap_or_default(),
1508 );
1509
1510 if let Some(parent) = parent_instance {
1511 let usage = parent_usage.unwrap_or_default();
1512 if let Some(pct) = policy.child_budget_percent_of_parent_remaining {
1513 if pct > 0 {
1514 chosen.max_tokens = cap_budget_remaining_u64(
1515 chosen.max_tokens,
1516 parent.budget.max_tokens,
1517 usage.tokens_used,
1518 pct,
1519 );
1520 chosen.max_steps = cap_budget_remaining_u32(
1521 chosen.max_steps,
1522 parent.budget.max_steps,
1523 usage.steps_used,
1524 pct,
1525 );
1526 chosen.max_tool_calls = cap_budget_remaining_u32(
1527 chosen.max_tool_calls,
1528 parent.budget.max_tool_calls,
1529 usage.tool_calls_used,
1530 pct,
1531 );
1532 chosen.max_duration_ms = cap_budget_remaining_u64(
1533 chosen.max_duration_ms,
1534 parent.budget.max_duration_ms,
1535 usage
1536 .started_at
1537 .map(|started| started.elapsed().as_millis() as u64)
1538 .unwrap_or(0),
1539 pct,
1540 );
1541 chosen.max_cost_usd = cap_budget_remaining_f64(
1542 chosen.max_cost_usd,
1543 parent.budget.max_cost_usd,
1544 usage.cost_used_usd,
1545 pct,
1546 );
1547 }
1548 }
1549 }
1550 chosen
1551}
1552
1553fn merge_budget(base: BudgetLimit, overlay: BudgetLimit) -> BudgetLimit {
1554 BudgetLimit {
1555 max_tokens: overlay.max_tokens.or(base.max_tokens),
1556 max_steps: overlay.max_steps.or(base.max_steps),
1557 max_tool_calls: overlay.max_tool_calls.or(base.max_tool_calls),
1558 max_duration_ms: overlay.max_duration_ms.or(base.max_duration_ms),
1559 max_cost_usd: overlay.max_cost_usd.or(base.max_cost_usd),
1560 }
1561}
1562
1563fn cap_budget_remaining_u64(
1564 child: Option<u64>,
1565 parent_limit: Option<u64>,
1566 parent_used: u64,
1567 pct: u8,
1568) -> Option<u64> {
1569 match (child, parent_limit) {
1570 (Some(child), Some(parent_limit)) => {
1571 let remaining = parent_limit.saturating_sub(parent_used);
1572 Some(child.min(remaining.saturating_mul(pct as u64) / 100))
1573 }
1574 (None, Some(parent_limit)) => {
1575 let remaining = parent_limit.saturating_sub(parent_used);
1576 Some(remaining.saturating_mul(pct as u64) / 100)
1577 }
1578 (Some(child), None) => Some(child),
1579 (None, None) => None,
1580 }
1581}
1582
1583fn cap_budget_remaining_u32(
1584 child: Option<u32>,
1585 parent_limit: Option<u32>,
1586 parent_used: u32,
1587 pct: u8,
1588) -> Option<u32> {
1589 match (child, parent_limit) {
1590 (Some(child), Some(parent_limit)) => {
1591 let remaining = parent_limit.saturating_sub(parent_used);
1592 Some(child.min(remaining.saturating_mul(pct as u32) / 100))
1593 }
1594 (None, Some(parent_limit)) => {
1595 let remaining = parent_limit.saturating_sub(parent_used);
1596 Some(remaining.saturating_mul(pct as u32) / 100)
1597 }
1598 (Some(child), None) => Some(child),
1599 (None, None) => None,
1600 }
1601}
1602
1603fn cap_budget_remaining_f64(
1604 child: Option<f64>,
1605 parent_limit: Option<f64>,
1606 parent_used: f64,
1607 pct: u8,
1608) -> Option<f64> {
1609 match (child, parent_limit) {
1610 (Some(child), Some(parent_limit)) => {
1611 let remaining = (parent_limit - parent_used).max(0.0);
1612 Some(child.min(remaining * f64::from(pct) / 100.0))
1613 }
1614 (None, Some(parent_limit)) => {
1615 let remaining = (parent_limit - parent_used).max(0.0);
1616 Some(remaining * f64::from(pct) / 100.0)
1617 }
1618 (Some(child), None) => Some(child),
1619 (None, None) => None,
1620 }
1621}
1622
1623async fn compute_skill_hash(
1624 workspace_root: &str,
1625 template: &AgentTemplate,
1626 policy: &SpawnPolicy,
1627) -> Result<String, String> {
1628 use sha2::{Digest, Sha256};
1629 let mut rows = Vec::new();
1630 let skill_service = SkillService::for_workspace(Some(PathBuf::from(workspace_root)));
1631 for skill in &template.skills {
1632 if let Some(path) = skill.path.as_deref() {
1633 validate_skill_source(skill.id.as_deref(), Some(path), policy)?;
1634 let skill_path = Path::new(workspace_root).join(path);
1635 let raw = fs::read_to_string(&skill_path)
1636 .await
1637 .map_err(|_| format!("missing required skill path `{}`", skill_path.display()))?;
1638 let digest = hash_hex(raw.as_bytes());
1639 validate_pinned_hash(skill.id.as_deref(), Some(path), &digest, policy)?;
1640 rows.push(format!("path:{}:{}", path, digest));
1641 } else if let Some(id) = skill.id.as_deref() {
1642 validate_skill_source(Some(id), None, policy)?;
1643 let loaded = skill_service
1644 .load_skill(id)
1645 .map_err(|err| format!("failed loading skill `{id}`: {err}"))?;
1646 let Some(loaded) = loaded else {
1647 return Err(format!("missing required skill id `{id}`"));
1648 };
1649 let digest = hash_hex(loaded.content.as_bytes());
1650 validate_pinned_hash(Some(id), None, &digest, policy)?;
1651 rows.push(format!("id:{}:{}", id, digest));
1652 }
1653 }
1654 rows.sort();
1655 let mut hasher = Sha256::new();
1656 for row in rows {
1657 hasher.update(row.as_bytes());
1658 hasher.update(b"\n");
1659 }
1660 let digest = hasher.finalize();
1661 Ok(format!("sha256:{}", hash_hex(digest.as_slice())))
1662}
1663
1664fn validate_skill_source(
1665 id: Option<&str>,
1666 path: Option<&str>,
1667 policy: &SpawnPolicy,
1668) -> Result<(), String> {
1669 use tandem_orchestrator::SkillSourceMode;
1670 match policy.skill_sources.mode {
1671 SkillSourceMode::Any => Ok(()),
1672 SkillSourceMode::ProjectOnly => {
1673 if id.is_some() {
1674 return Err("skill source denied: project_only forbids skill IDs".to_string());
1675 }
1676 let Some(path) = path else {
1677 return Err("skill source denied: project_only requires skill path".to_string());
1678 };
1679 let p = PathBuf::from(path);
1680 if p.is_absolute() {
1681 return Err("skill source denied: absolute skill paths are forbidden".to_string());
1682 }
1683 Ok(())
1684 }
1685 SkillSourceMode::Allowlist => {
1686 if let Some(id) = id {
1687 if policy.skill_sources.allowlist_ids.iter().any(|v| v == id) {
1688 return Ok(());
1689 }
1690 }
1691 if let Some(path) = path {
1692 if policy
1693 .skill_sources
1694 .allowlist_paths
1695 .iter()
1696 .any(|v| v == path)
1697 {
1698 return Ok(());
1699 }
1700 }
1701 Err("skill source denied: not present in allowlist".to_string())
1702 }
1703 }
1704}
1705
1706fn validate_pinned_hash(
1707 id: Option<&str>,
1708 path: Option<&str>,
1709 digest: &str,
1710 policy: &SpawnPolicy,
1711) -> Result<(), String> {
1712 let by_id = id.and_then(|id| policy.skill_sources.pinned_hashes.get(&format!("id:{id}")));
1713 let by_path = path.and_then(|path| {
1714 policy
1715 .skill_sources
1716 .pinned_hashes
1717 .get(&format!("path:{path}"))
1718 });
1719 let expected = by_id.or(by_path);
1720 if let Some(expected) = expected {
1721 let normalized = expected.strip_prefix("sha256:").unwrap_or(expected);
1722 if normalized != digest {
1723 return Err("pinned hash mismatch for skill reference".to_string());
1724 }
1725 }
1726 Ok(())
1727}
1728
1729fn hash_hex(bytes: &[u8]) -> String {
1730 let mut out = String::with_capacity(bytes.len() * 2);
1731 for byte in bytes {
1732 use std::fmt::Write as _;
1733 let _ = write!(&mut out, "{:02x}", byte);
1734 }
1735 out
1736}
1737
1738fn estimate_tokens(text: &str) -> u64 {
1739 let chars = text.chars().count() as u64;
1740 (chars / 4).max(1)
1741}
1742
1743fn extract_session_id(event: &EngineEvent) -> Option<String> {
1744 event
1745 .properties
1746 .get("sessionID")
1747 .and_then(|v| v.as_str())
1748 .map(|v| v.to_string())
1749 .or_else(|| {
1750 event
1751 .properties
1752 .get("part")
1753 .and_then(|v| v.get("sessionID"))
1754 .and_then(|v| v.as_str())
1755 .map(|v| v.to_string())
1756 })
1757}
1758
1759fn merge_metadata_usage(
1760 metadata: Option<Value>,
1761 tokens_used: u64,
1762 steps_used: u32,
1763 tool_calls_used: u32,
1764 cost_used_usd: f64,
1765 elapsed_ms: u64,
1766) -> Value {
1767 let mut base = metadata
1768 .and_then(|v| v.as_object().cloned())
1769 .unwrap_or_default();
1770 base.insert(
1771 "budgetUsage".to_string(),
1772 json!({
1773 "tokensUsed": tokens_used,
1774 "stepsUsed": steps_used,
1775 "toolCallsUsed": tool_calls_used,
1776 "costUsedUsd": cost_used_usd,
1777 "elapsedMs": elapsed_ms
1778 }),
1779 );
1780 Value::Object(base)
1781}
1782
1783fn normalize_tool_name(name: &str) -> String {
1784 match name.trim().to_lowercase().replace('-', "_").as_str() {
1785 "todowrite" | "update_todo_list" | "update_todos" => "todo_write".to_string(),
1786 other => other.to_string(),
1787 }
1788}
1789
1790async fn evaluate_capability_deny(
1791 state: &AppState,
1792 instance: &AgentInstance,
1793 tool: &str,
1794 args: &Value,
1795 caps: &tandem_orchestrator::CapabilitySpec,
1796 session_id: &str,
1797 message_id: &str,
1798) -> Option<String> {
1799 let deny_patterns = caps
1800 .tool_denylist
1801 .iter()
1802 .map(|name| normalize_tool_name(name))
1803 .collect::<Vec<_>>();
1804 if !deny_patterns.is_empty() && any_policy_matches(&deny_patterns, tool) {
1805 return Some(format!("tool `{tool}` denied by agent capability policy"));
1806 }
1807 let allow_patterns = caps
1808 .tool_allowlist
1809 .iter()
1810 .map(|name| normalize_tool_name(name))
1811 .collect::<Vec<_>>();
1812 if !allow_patterns.is_empty() && !any_policy_matches(&allow_patterns, tool) {
1813 return Some(format!("tool `{tool}` not in agent allowlist"));
1814 }
1815
1816 let browser_execution_tool = matches!(
1817 tool,
1818 "browser_open"
1819 | "browser_navigate"
1820 | "browser_snapshot"
1821 | "browser_click"
1822 | "browser_type"
1823 | "browser_press"
1824 | "browser_wait"
1825 | "browser_extract"
1826 | "browser_screenshot"
1827 | "browser_close"
1828 );
1829
1830 if matches!(
1831 tool,
1832 "websearch" | "webfetch" | "webfetch_html" | "browser_open" | "browser_navigate"
1833 ) || browser_execution_tool
1834 {
1835 if !caps.net_scopes.enabled {
1836 return Some("network disabled for this agent instance".to_string());
1837 }
1838 if !caps.net_scopes.allow_hosts.is_empty() {
1839 if tool == "websearch" {
1840 return Some(
1841 "websearch blocked: host allowlist cannot be verified for search tool"
1842 .to_string(),
1843 );
1844 }
1845 if let Some(host) = extract_url_host(args) {
1846 let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
1847 let allowed = h.trim().to_ascii_lowercase();
1848 !allowed.is_empty()
1849 && (host == allowed || host.ends_with(&format!(".{allowed}")))
1850 });
1851 if !allowed {
1852 return Some(format!("network host `{host}` not in allow_hosts"));
1853 }
1854 }
1855 }
1856 }
1857
1858 if tool == "bash" {
1859 let cmd = args
1860 .get("command")
1861 .and_then(|v| v.as_str())
1862 .unwrap_or("")
1863 .to_ascii_lowercase();
1864 if cmd.contains("git push") {
1865 if !caps.git_caps.push {
1866 return Some("git push disabled for this agent instance".to_string());
1867 }
1868 if caps.git_caps.push_requires_approval {
1869 let action = state.permissions.evaluate("git_push", "git_push").await;
1870 match action {
1871 tandem_core::PermissionAction::Allow => {}
1872 tandem_core::PermissionAction::Deny => {
1873 return Some("git push denied by policy rule".to_string());
1874 }
1875 tandem_core::PermissionAction::Ask => {
1876 let pending = state
1877 .permissions
1878 .ask_for_session_with_context(
1879 Some(session_id),
1880 "git_push",
1881 args.clone(),
1882 Some(tandem_core::PermissionArgsContext {
1883 args_source: "agent_team.git_push".to_string(),
1884 args_integrity: "runtime-checked".to_string(),
1885 query: Some(format!(
1886 "instanceID={} messageID={}",
1887 instance.instance_id, message_id
1888 )),
1889 }),
1890 )
1891 .await;
1892 return Some(format!(
1893 "git push requires explicit user approval (approvalID={})",
1894 pending.id
1895 ));
1896 }
1897 }
1898 }
1899 }
1900 if cmd.contains("git commit") && !caps.git_caps.commit {
1901 return Some("git commit disabled for this agent instance".to_string());
1902 }
1903 }
1904
1905 let access_kind = tool_fs_access_kind(tool);
1906 if let Some(kind) = access_kind {
1907 let Some(session) = state.storage.get_session(session_id).await else {
1908 return Some("session not found for capability evaluation".to_string());
1909 };
1910 let Some(root) = session.workspace_root.clone() else {
1911 return Some("workspace root missing for capability evaluation".to_string());
1912 };
1913 let requested = extract_tool_candidate_paths(tool, args);
1914 if !requested.is_empty() {
1915 let allowed_scopes = if kind == "read" {
1916 &caps.fs_scopes.read
1917 } else {
1918 &caps.fs_scopes.write
1919 };
1920 if allowed_scopes.is_empty() {
1921 return Some(format!("fs {kind} access blocked: no scopes configured"));
1922 }
1923 for candidate in requested {
1924 if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
1925 return Some(format!("fs {kind} access denied for path `{}`", candidate));
1926 }
1927 }
1928 }
1929 }
1930
1931 denied_secrets_reason(tool, caps, args)
1932}
1933
1934fn denied_secrets_reason(
1935 tool: &str,
1936 caps: &tandem_orchestrator::CapabilitySpec,
1937 args: &Value,
1938) -> Option<String> {
1939 if tool == "auth" {
1940 if caps.secrets_scopes.is_empty() {
1941 return Some("secrets are disabled for this agent instance".to_string());
1942 }
1943 let alias = args
1944 .get("id")
1945 .or_else(|| args.get("provider"))
1946 .or_else(|| args.get("providerID"))
1947 .and_then(|v| v.as_str())
1948 .unwrap_or("")
1949 .trim();
1950 if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
1951 return Some(format!(
1952 "secret alias `{alias}` is not in agent secretsScopes allowlist"
1953 ));
1954 }
1955 }
1956 None
1957}
1958
1959fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
1960 match tool {
1961 "read" | "glob" | "grep" | "codesearch" | "lsp" => Some("read"),
1962 "write" | "edit" | "apply_patch" => Some("write"),
1963 _ => None,
1964 }
1965}
1966
1967fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
1968 let Some(obj) = args.as_object() else {
1969 return Vec::new();
1970 };
1971 let keys: &[&str] = match tool {
1972 "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
1973 "glob" => &["pattern"],
1974 "lsp" => &["filePath", "path"],
1975 "bash" => &["cwd"],
1976 "apply_patch" => &["path"],
1977 _ => &["path", "cwd"],
1978 };
1979 keys.iter()
1980 .filter_map(|key| obj.get(*key))
1981 .filter_map(|value| value.as_str())
1982 .filter(|s| !s.trim().is_empty())
1983 .map(|raw| strip_glob_tokens(raw).to_string())
1984 .collect()
1985}
1986
1987fn strip_glob_tokens(path: &str) -> &str {
1988 let mut end = path.len();
1989 for (idx, ch) in path.char_indices() {
1990 if ch == '*' || ch == '?' || ch == '[' {
1991 end = idx;
1992 break;
1993 }
1994 }
1995 &path[..end]
1996}
1997
1998fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
1999 let root_path = PathBuf::from(root);
2000 let candidate_path = resolve_path(&root_path, candidate);
2001 scopes.iter().any(|scope| {
2002 let scope_path = resolve_path(&root_path, scope);
2003 candidate_path.starts_with(scope_path)
2004 })
2005}
2006
2007fn resolve_path(root: &Path, raw: &str) -> PathBuf {
2008 let raw = raw.trim();
2009 if raw.is_empty() {
2010 return root.to_path_buf();
2011 }
2012 let path = PathBuf::from(raw);
2013 if path.is_absolute() {
2014 path
2015 } else {
2016 root.join(path)
2017 }
2018}
2019
2020fn extract_url_host(args: &Value) -> Option<String> {
2021 let url = args
2022 .get("url")
2023 .or_else(|| args.get("uri"))
2024 .or_else(|| args.get("link"))
2025 .and_then(|v| v.as_str())?;
2026 let raw = url.trim();
2027 let (_, after_scheme) = raw.split_once("://")?;
2028 let host_port = after_scheme.split('/').next().unwrap_or_default();
2029 let host = host_port.split('@').next_back().unwrap_or_default();
2030 let host = host
2031 .split(':')
2032 .next()
2033 .unwrap_or_default()
2034 .to_ascii_lowercase();
2035 if host.is_empty() {
2036 None
2037 } else {
2038 Some(host)
2039 }
2040}
2041
2042pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
2043 emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
2044}
2045
2046pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
2047 emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
2048}
2049
2050pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
2051 emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
2052}
2053
2054#[derive(Default)]
2055pub struct SpawnEventContext<'a> {
2056 pub session_id: Option<&'a str>,
2057 pub message_id: Option<&'a str>,
2058 pub run_id: Option<&'a str>,
2059}
2060
2061pub fn emit_spawn_requested_with_context(
2062 state: &AppState,
2063 req: &SpawnRequest,
2064 ctx: &SpawnEventContext<'_>,
2065) {
2066 state.event_bus.publish(EngineEvent::new(
2067 "agent_team.spawn.requested",
2068 json!({
2069 "sessionID": ctx.session_id,
2070 "messageID": ctx.message_id,
2071 "runID": ctx.run_id,
2072 "missionID": req.mission_id,
2073 "instanceID": Value::Null,
2074 "parentInstanceID": req.parent_instance_id,
2075 "source": req.source,
2076 "requestedRole": req.role,
2077 "templateID": req.template_id,
2078 "justification": req.justification,
2079 "timestampMs": crate::now_ms(),
2080 }),
2081 ));
2082}
2083
2084pub fn emit_spawn_denied_with_context(
2085 state: &AppState,
2086 req: &SpawnRequest,
2087 decision: &SpawnDecision,
2088 ctx: &SpawnEventContext<'_>,
2089) {
2090 state.event_bus.publish(EngineEvent::new(
2091 "agent_team.spawn.denied",
2092 json!({
2093 "sessionID": ctx.session_id,
2094 "messageID": ctx.message_id,
2095 "runID": ctx.run_id,
2096 "missionID": req.mission_id,
2097 "instanceID": Value::Null,
2098 "parentInstanceID": req.parent_instance_id,
2099 "source": req.source,
2100 "requestedRole": req.role,
2101 "templateID": req.template_id,
2102 "code": decision.code,
2103 "error": decision.reason,
2104 "timestampMs": crate::now_ms(),
2105 }),
2106 ));
2107}
2108
2109pub fn emit_spawn_approved_with_context(
2110 state: &AppState,
2111 req: &SpawnRequest,
2112 instance: &AgentInstance,
2113 ctx: &SpawnEventContext<'_>,
2114) {
2115 state.event_bus.publish(EngineEvent::new(
2116 "agent_team.spawn.approved",
2117 json!({
2118 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2119 "messageID": ctx.message_id,
2120 "runID": ctx.run_id.or(instance.run_id.as_deref()),
2121 "missionID": instance.mission_id,
2122 "instanceID": instance.instance_id,
2123 "parentInstanceID": instance.parent_instance_id,
2124 "source": req.source,
2125 "requestedRole": req.role,
2126 "templateID": instance.template_id,
2127 "skillHash": instance.skill_hash,
2128 "timestampMs": crate::now_ms(),
2129 }),
2130 ));
2131 state.event_bus.publish(EngineEvent::new(
2132 "agent_team.instance.started",
2133 json!({
2134 "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
2135 "messageID": ctx.message_id,
2136 "runID": ctx.run_id.or(instance.run_id.as_deref()),
2137 "missionID": instance.mission_id,
2138 "instanceID": instance.instance_id,
2139 "parentInstanceID": instance.parent_instance_id,
2140 "role": instance.role,
2141 "status": instance.status,
2142 "budgetLimit": instance.budget,
2143 "skillHash": instance.skill_hash,
2144 "timestampMs": crate::now_ms(),
2145 }),
2146 ));
2147}
2148
2149pub fn emit_budget_usage(
2150 state: &AppState,
2151 instance: &AgentInstance,
2152 tokens_used: u64,
2153 steps_used: u32,
2154 tool_calls_used: u32,
2155 cost_used_usd: f64,
2156 elapsed_ms: u64,
2157) {
2158 state.event_bus.publish(EngineEvent::new(
2159 "agent_team.budget.usage",
2160 json!({
2161 "sessionID": instance.session_id,
2162 "messageID": Value::Null,
2163 "runID": instance.run_id,
2164 "missionID": instance.mission_id,
2165 "instanceID": instance.instance_id,
2166 "tokensUsed": tokens_used,
2167 "stepsUsed": steps_used,
2168 "toolCallsUsed": tool_calls_used,
2169 "costUsedUsd": cost_used_usd,
2170 "elapsedMs": elapsed_ms,
2171 "timestampMs": crate::now_ms(),
2172 }),
2173 ));
2174}
2175
2176#[allow(clippy::too_many_arguments)]
2177pub fn emit_budget_exhausted(
2178 state: &AppState,
2179 instance: &AgentInstance,
2180 exhausted_by: &str,
2181 tokens_used: u64,
2182 steps_used: u32,
2183 tool_calls_used: u32,
2184 cost_used_usd: f64,
2185 elapsed_ms: u64,
2186) {
2187 state.event_bus.publish(EngineEvent::new(
2188 "agent_team.budget.exhausted",
2189 json!({
2190 "sessionID": instance.session_id,
2191 "messageID": Value::Null,
2192 "runID": instance.run_id,
2193 "missionID": instance.mission_id,
2194 "instanceID": instance.instance_id,
2195 "exhaustedBy": exhausted_by,
2196 "tokensUsed": tokens_used,
2197 "stepsUsed": steps_used,
2198 "toolCallsUsed": tool_calls_used,
2199 "costUsedUsd": cost_used_usd,
2200 "elapsedMs": elapsed_ms,
2201 "timestampMs": crate::now_ms(),
2202 }),
2203 ));
2204}
2205
2206pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
2207 state.event_bus.publish(EngineEvent::new(
2208 "agent_team.instance.cancelled",
2209 json!({
2210 "sessionID": instance.session_id,
2211 "messageID": Value::Null,
2212 "runID": instance.run_id,
2213 "missionID": instance.mission_id,
2214 "instanceID": instance.instance_id,
2215 "parentInstanceID": instance.parent_instance_id,
2216 "role": instance.role,
2217 "status": instance.status,
2218 "reason": reason,
2219 "timestampMs": crate::now_ms(),
2220 }),
2221 ));
2222}
2223
2224pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
2225 state.event_bus.publish(EngineEvent::new(
2226 "agent_team.instance.completed",
2227 json!({
2228 "sessionID": instance.session_id,
2229 "messageID": Value::Null,
2230 "runID": instance.run_id,
2231 "missionID": instance.mission_id,
2232 "instanceID": instance.instance_id,
2233 "parentInstanceID": instance.parent_instance_id,
2234 "role": instance.role,
2235 "status": instance.status,
2236 "timestampMs": crate::now_ms(),
2237 }),
2238 ));
2239}
2240
2241pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
2242 state.event_bus.publish(EngineEvent::new(
2243 "agent_team.instance.failed",
2244 json!({
2245 "sessionID": instance.session_id,
2246 "messageID": Value::Null,
2247 "runID": instance.run_id,
2248 "missionID": instance.mission_id,
2249 "instanceID": instance.instance_id,
2250 "parentInstanceID": instance.parent_instance_id,
2251 "role": instance.role,
2252 "status": instance.status,
2253 "timestampMs": crate::now_ms(),
2254 }),
2255 ));
2256}
2257
2258pub fn emit_mission_budget_exhausted(
2259 state: &AppState,
2260 mission_id: &str,
2261 exhausted_by: &str,
2262 tokens_used: u64,
2263 steps_used: u64,
2264 tool_calls_used: u64,
2265 cost_used_usd: f64,
2266) {
2267 state.event_bus.publish(EngineEvent::new(
2268 "agent_team.mission.budget.exhausted",
2269 json!({
2270 "sessionID": Value::Null,
2271 "messageID": Value::Null,
2272 "runID": Value::Null,
2273 "missionID": mission_id,
2274 "instanceID": Value::Null,
2275 "exhaustedBy": exhausted_by,
2276 "tokensUsed": tokens_used,
2277 "stepsUsed": steps_used,
2278 "toolCallsUsed": tool_calls_used,
2279 "costUsedUsd": cost_used_usd,
2280 "timestampMs": crate::now_ms(),
2281 }),
2282 ));
2283}