Skip to main content

tandem_server/agent_teams_parts/
part02.rs

1
2fn merge_metadata_usage(
3    metadata: Option<Value>,
4    tokens_used: u64,
5    steps_used: u32,
6    tool_calls_used: u32,
7    cost_used_usd: f64,
8    elapsed_ms: u64,
9) -> Value {
10    let mut base = metadata
11        .and_then(|v| v.as_object().cloned())
12        .unwrap_or_default();
13    base.insert(
14        "budgetUsage".to_string(),
15        json!({
16            "tokensUsed": tokens_used,
17            "stepsUsed": steps_used,
18            "toolCallsUsed": tool_calls_used,
19            "costUsedUsd": cost_used_usd,
20            "elapsedMs": elapsed_ms
21        }),
22    );
23    Value::Object(base)
24}
25
26fn instance_workspace_root(instance: &AgentInstance) -> Option<Value> {
27    instance
28        .metadata
29        .as_ref()
30        .and_then(|row| row.get("workspaceRoot"))
31        .cloned()
32}
33
34fn instance_workspace_repo_root(instance: &AgentInstance) -> Option<Value> {
35    instance
36        .metadata
37        .as_ref()
38        .and_then(|row| row.get("workspaceRepoRoot"))
39        .cloned()
40}
41
42fn instance_managed_worktree(instance: &AgentInstance) -> Option<Value> {
43    instance
44        .metadata
45        .as_ref()
46        .and_then(|row| row.get("managedWorktree"))
47        .cloned()
48}
49
50async fn prepare_agent_instance_workspace(
51    state: &AppState,
52    workspace_root: &str,
53    mission_id: Option<&str>,
54    instance_id: &str,
55    template_id: &str,
56) -> Option<crate::runtime::worktrees::ManagedWorktreeEnsureResult> {
57    let repo_root = crate::runtime::worktrees::resolve_git_repo_root(workspace_root)?;
58    crate::runtime::worktrees::ensure_managed_worktree(
59        state,
60        crate::runtime::worktrees::ManagedWorktreeEnsureInput {
61            repo_root,
62            task_id: mission_id.map(ToString::to_string),
63            owner_run_id: Some(instance_id.to_string()),
64            lease_id: None,
65            branch_hint: Some(template_id.to_string()),
66            base: "HEAD".to_string(),
67            cleanup_branch: true,
68        },
69    )
70    .await
71    .ok()
72}
73
74async fn cleanup_instance_managed_worktree(state: &AppState, instance: &AgentInstance) {
75    let Some(metadata) = instance.metadata.as_ref() else {
76        return;
77    };
78    let Some(worktree) = metadata.get("managedWorktree").and_then(Value::as_object) else {
79        return;
80    };
81    let Some(path) = worktree.get("path").and_then(Value::as_str) else {
82        return;
83    };
84    let Some(branch) = worktree.get("branch").and_then(Value::as_str) else {
85        return;
86    };
87    let Some(repo_root) = worktree.get("repoRoot").and_then(Value::as_str) else {
88        return;
89    };
90    let record = crate::ManagedWorktreeRecord {
91        key: crate::runtime::worktrees::managed_worktree_key(
92            repo_root,
93            instance.mission_id.as_str().into(),
94            Some(instance.instance_id.as_str()),
95            None,
96            path,
97            branch,
98        ),
99        repo_root: repo_root.to_string(),
100        path: path.to_string(),
101        branch: branch.to_string(),
102        base: "HEAD".to_string(),
103        managed: true,
104        task_id: Some(instance.mission_id.clone()),
105        owner_run_id: Some(instance.instance_id.clone()),
106        lease_id: None,
107        cleanup_branch: worktree
108            .get("cleanupBranch")
109            .and_then(Value::as_bool)
110            .unwrap_or(true),
111        created_at_ms: 0,
112        updated_at_ms: 0,
113    };
114    let _ = crate::runtime::worktrees::delete_managed_worktree(state, &record).await;
115}
116
117fn normalize_tool_name(name: &str) -> String {
118    match name.trim().to_lowercase().replace('-', "_").as_str() {
119        "todowrite" | "update_todo_list" | "update_todos" => "todo_write".to_string(),
120        other => other.to_string(),
121    }
122}
123
124async fn evaluate_capability_deny(
125    state: &AppState,
126    instance: &AgentInstance,
127    tool: &str,
128    args: &Value,
129    caps: &tandem_orchestrator::CapabilitySpec,
130    session_id: &str,
131    message_id: &str,
132) -> Option<String> {
133    let deny_patterns = caps
134        .tool_denylist
135        .iter()
136        .map(|name| normalize_tool_name(name))
137        .collect::<Vec<_>>();
138    if !deny_patterns.is_empty() && any_policy_matches(&deny_patterns, tool) {
139        return Some(format!("tool `{tool}` denied by agent capability policy"));
140    }
141    let allow_patterns = caps
142        .tool_allowlist
143        .iter()
144        .map(|name| normalize_tool_name(name))
145        .collect::<Vec<_>>();
146    if !allow_patterns.is_empty() && !any_policy_matches(&allow_patterns, tool) {
147        return Some(format!("tool `{tool}` not in agent allowlist"));
148    }
149
150    let browser_execution_tool = matches!(
151        tool,
152        "browser_open"
153            | "browser_navigate"
154            | "browser_snapshot"
155            | "browser_click"
156            | "browser_type"
157            | "browser_press"
158            | "browser_wait"
159            | "browser_extract"
160            | "browser_screenshot"
161            | "browser_close"
162    );
163
164    if matches!(
165        tool,
166        "websearch" | "webfetch" | "webfetch_html" | "browser_open" | "browser_navigate"
167    ) || browser_execution_tool
168    {
169        if !caps.net_scopes.enabled {
170            return Some("network disabled for this agent instance".to_string());
171        }
172        if !caps.net_scopes.allow_hosts.is_empty() {
173            if tool == "websearch" {
174                return Some(
175                    "websearch blocked: host allowlist cannot be verified for search tool"
176                        .to_string(),
177                );
178            }
179            if let Some(host) = extract_url_host(args) {
180                let allowed = caps.net_scopes.allow_hosts.iter().any(|h| {
181                    let allowed = h.trim().to_ascii_lowercase();
182                    !allowed.is_empty()
183                        && (host == allowed || host.ends_with(&format!(".{allowed}")))
184                });
185                if !allowed {
186                    return Some(format!("network host `{host}` not in allow_hosts"));
187                }
188            }
189        }
190    }
191
192    if tool == "bash" {
193        let cmd = args
194            .get("command")
195            .and_then(|v| v.as_str())
196            .unwrap_or("")
197            .to_ascii_lowercase();
198        if cmd.contains("git push") {
199            if !caps.git_caps.push {
200                return Some("git push disabled for this agent instance".to_string());
201            }
202            if caps.git_caps.push_requires_approval {
203                let action = state.permissions.evaluate("git_push", "git_push").await;
204                match action {
205                    tandem_core::PermissionAction::Allow => {}
206                    tandem_core::PermissionAction::Deny => {
207                        return Some("git push denied by policy rule".to_string());
208                    }
209                    tandem_core::PermissionAction::Ask => {
210                        let pending = state
211                            .permissions
212                            .ask_for_session_with_context(
213                                Some(session_id),
214                                "git_push",
215                                args.clone(),
216                                Some(tandem_core::PermissionArgsContext {
217                                    args_source: "agent_team.git_push".to_string(),
218                                    args_integrity: "runtime-checked".to_string(),
219                                    query: Some(format!(
220                                        "instanceID={} messageID={}",
221                                        instance.instance_id, message_id
222                                    )),
223                                }),
224                            )
225                            .await;
226                        return Some(format!(
227                            "git push requires explicit user approval (approvalID={})",
228                            pending.id
229                        ));
230                    }
231                }
232            }
233        }
234        if cmd.contains("git commit") && !caps.git_caps.commit {
235            return Some("git commit disabled for this agent instance".to_string());
236        }
237    }
238
239    let access_kind = tool_fs_access_kind(tool);
240    if let Some(kind) = access_kind {
241        let Some(session) = state.storage.get_session(session_id).await else {
242            return Some("session not found for capability evaluation".to_string());
243        };
244        let Some(root) = session.workspace_root.clone() else {
245            return Some("workspace root missing for capability evaluation".to_string());
246        };
247        let requested = extract_tool_candidate_paths(tool, args);
248        if !requested.is_empty() {
249            let allowed_scopes = if kind == "read" {
250                &caps.fs_scopes.read
251            } else {
252                &caps.fs_scopes.write
253            };
254            if allowed_scopes.is_empty() {
255                return Some(format!("fs {kind} access blocked: no scopes configured"));
256            }
257            for candidate in requested {
258                if !is_path_allowed_by_scopes(&root, &candidate, allowed_scopes) {
259                    return Some(format!("fs {kind} access denied for path `{}`", candidate));
260                }
261            }
262        }
263    }
264
265    denied_secrets_reason(tool, caps, args)
266}
267
268fn denied_secrets_reason(
269    tool: &str,
270    caps: &tandem_orchestrator::CapabilitySpec,
271    args: &Value,
272) -> Option<String> {
273    if tool == "auth" {
274        if caps.secrets_scopes.is_empty() {
275            return Some("secrets are disabled for this agent instance".to_string());
276        }
277        let alias = args
278            .get("id")
279            .or_else(|| args.get("provider"))
280            .or_else(|| args.get("providerID"))
281            .and_then(|v| v.as_str())
282            .unwrap_or("")
283            .trim();
284        if !alias.is_empty() && !caps.secrets_scopes.iter().any(|allowed| allowed == alias) {
285            return Some(format!(
286                "secret alias `{alias}` is not in agent secretsScopes allowlist"
287            ));
288        }
289    }
290    None
291}
292
293fn tool_fs_access_kind(tool: &str) -> Option<&'static str> {
294    match tool {
295        "read" | "glob" | "grep" | "codesearch" | "lsp" => Some("read"),
296        "write" | "edit" | "apply_patch" => Some("write"),
297        _ => None,
298    }
299}
300
301fn extract_tool_candidate_paths(tool: &str, args: &Value) -> Vec<String> {
302    let Some(obj) = args.as_object() else {
303        return Vec::new();
304    };
305    let keys: &[&str] = match tool {
306        "read" | "write" | "edit" | "grep" | "codesearch" => &["path", "filePath", "cwd"],
307        "glob" => &["pattern"],
308        "lsp" => &["filePath", "path"],
309        "bash" => &["cwd"],
310        "apply_patch" => &["path"],
311        _ => &["path", "cwd"],
312    };
313    keys.iter()
314        .filter_map(|key| obj.get(*key))
315        .filter_map(|value| value.as_str())
316        .filter(|s| !s.trim().is_empty())
317        .map(|raw| strip_glob_tokens(raw).to_string())
318        .collect()
319}
320
321fn strip_glob_tokens(path: &str) -> &str {
322    let mut end = path.len();
323    for (idx, ch) in path.char_indices() {
324        if ch == '*' || ch == '?' || ch == '[' {
325            end = idx;
326            break;
327        }
328    }
329    &path[..end]
330}
331
332fn is_path_allowed_by_scopes(root: &str, candidate: &str, scopes: &[String]) -> bool {
333    let root_path = PathBuf::from(root);
334    let candidate_path = resolve_path(&root_path, candidate);
335    scopes.iter().any(|scope| {
336        let scope_path = resolve_path(&root_path, scope);
337        candidate_path.starts_with(scope_path)
338    })
339}
340
341fn resolve_path(root: &Path, raw: &str) -> PathBuf {
342    let raw = raw.trim();
343    if raw.is_empty() {
344        return root.to_path_buf();
345    }
346    let path = PathBuf::from(raw);
347    if path.is_absolute() {
348        path
349    } else {
350        root.join(path)
351    }
352}
353
354fn extract_url_host(args: &Value) -> Option<String> {
355    let url = args
356        .get("url")
357        .or_else(|| args.get("uri"))
358        .or_else(|| args.get("link"))
359        .and_then(|v| v.as_str())?;
360    let raw = url.trim();
361    let (_, after_scheme) = raw.split_once("://")?;
362    let host_port = after_scheme.split('/').next().unwrap_or_default();
363    let host = host_port.split('@').next_back().unwrap_or_default();
364    let host = host
365        .split(':')
366        .next()
367        .unwrap_or_default()
368        .to_ascii_lowercase();
369    if host.is_empty() {
370        None
371    } else {
372        Some(host)
373    }
374}
375
376pub fn emit_spawn_requested(state: &AppState, req: &SpawnRequest) {
377    emit_spawn_requested_with_context(state, req, &SpawnEventContext::default());
378}
379
380pub fn emit_spawn_denied(state: &AppState, req: &SpawnRequest, decision: &SpawnDecision) {
381    emit_spawn_denied_with_context(state, req, decision, &SpawnEventContext::default());
382}
383
384pub fn emit_spawn_approved(state: &AppState, req: &SpawnRequest, instance: &AgentInstance) {
385    emit_spawn_approved_with_context(state, req, instance, &SpawnEventContext::default());
386}
387
388#[derive(Default)]
389pub struct SpawnEventContext<'a> {
390    pub session_id: Option<&'a str>,
391    pub message_id: Option<&'a str>,
392    pub run_id: Option<&'a str>,
393}
394
395pub fn emit_spawn_requested_with_context(
396    state: &AppState,
397    req: &SpawnRequest,
398    ctx: &SpawnEventContext<'_>,
399) {
400    state.event_bus.publish(EngineEvent::new(
401        "agent_team.spawn.requested",
402        json!({
403            "sessionID": ctx.session_id,
404            "messageID": ctx.message_id,
405            "runID": ctx.run_id,
406            "missionID": req.mission_id,
407            "instanceID": Value::Null,
408            "parentInstanceID": req.parent_instance_id,
409            "source": req.source,
410            "requestedRole": req.role,
411            "templateID": req.template_id,
412            "justification": req.justification,
413            "timestampMs": crate::now_ms(),
414        }),
415    ));
416}
417
418pub fn emit_spawn_denied_with_context(
419    state: &AppState,
420    req: &SpawnRequest,
421    decision: &SpawnDecision,
422    ctx: &SpawnEventContext<'_>,
423) {
424    state.event_bus.publish(EngineEvent::new(
425        "agent_team.spawn.denied",
426        json!({
427            "sessionID": ctx.session_id,
428            "messageID": ctx.message_id,
429            "runID": ctx.run_id,
430            "missionID": req.mission_id,
431            "instanceID": Value::Null,
432            "parentInstanceID": req.parent_instance_id,
433            "source": req.source,
434            "requestedRole": req.role,
435            "templateID": req.template_id,
436            "code": decision.code,
437            "error": decision.reason,
438            "timestampMs": crate::now_ms(),
439        }),
440    ));
441}
442
443pub fn emit_spawn_approved_with_context(
444    state: &AppState,
445    req: &SpawnRequest,
446    instance: &AgentInstance,
447    ctx: &SpawnEventContext<'_>,
448) {
449    state.event_bus.publish(EngineEvent::new(
450        "agent_team.spawn.approved",
451        json!({
452            "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
453            "messageID": ctx.message_id,
454            "runID": ctx.run_id.or(instance.run_id.as_deref()),
455            "missionID": instance.mission_id,
456            "instanceID": instance.instance_id,
457            "parentInstanceID": instance.parent_instance_id,
458            "source": req.source,
459            "requestedRole": req.role,
460            "templateID": instance.template_id,
461            "skillHash": instance.skill_hash,
462            "workspaceRoot": instance_workspace_root(instance),
463            "workspaceRepoRoot": instance_workspace_repo_root(instance),
464            "managedWorktree": instance_managed_worktree(instance),
465            "timestampMs": crate::now_ms(),
466        }),
467    ));
468    state.event_bus.publish(EngineEvent::new(
469        "agent_team.instance.started",
470        json!({
471            "sessionID": ctx.session_id.unwrap_or(&instance.session_id),
472            "messageID": ctx.message_id,
473            "runID": ctx.run_id.or(instance.run_id.as_deref()),
474            "missionID": instance.mission_id,
475            "instanceID": instance.instance_id,
476            "parentInstanceID": instance.parent_instance_id,
477            "role": instance.role,
478            "status": instance.status,
479            "budgetLimit": instance.budget,
480            "skillHash": instance.skill_hash,
481            "workspaceRoot": instance_workspace_root(instance),
482            "workspaceRepoRoot": instance_workspace_repo_root(instance),
483            "managedWorktree": instance_managed_worktree(instance),
484            "timestampMs": crate::now_ms(),
485        }),
486    ));
487}
488
489pub fn emit_budget_usage(
490    state: &AppState,
491    instance: &AgentInstance,
492    tokens_used: u64,
493    steps_used: u32,
494    tool_calls_used: u32,
495    cost_used_usd: f64,
496    elapsed_ms: u64,
497) {
498    state.event_bus.publish(EngineEvent::new(
499        "agent_team.budget.usage",
500        json!({
501            "sessionID": instance.session_id,
502            "messageID": Value::Null,
503            "runID": instance.run_id,
504            "missionID": instance.mission_id,
505            "instanceID": instance.instance_id,
506            "tokensUsed": tokens_used,
507            "stepsUsed": steps_used,
508            "toolCallsUsed": tool_calls_used,
509            "costUsedUsd": cost_used_usd,
510            "elapsedMs": elapsed_ms,
511            "timestampMs": crate::now_ms(),
512        }),
513    ));
514}
515
516pub fn emit_budget_exhausted(
517    state: &AppState,
518    instance: &AgentInstance,
519    exhausted_by: &str,
520    tokens_used: u64,
521    steps_used: u32,
522    tool_calls_used: u32,
523    cost_used_usd: f64,
524    elapsed_ms: u64,
525) {
526    state.event_bus.publish(EngineEvent::new(
527        "agent_team.budget.exhausted",
528        json!({
529            "sessionID": instance.session_id,
530            "messageID": Value::Null,
531            "runID": instance.run_id,
532            "missionID": instance.mission_id,
533            "instanceID": instance.instance_id,
534            "exhaustedBy": exhausted_by,
535            "tokensUsed": tokens_used,
536            "stepsUsed": steps_used,
537            "toolCallsUsed": tool_calls_used,
538            "costUsedUsd": cost_used_usd,
539            "elapsedMs": elapsed_ms,
540            "timestampMs": crate::now_ms(),
541        }),
542    ));
543}
544
545pub fn emit_instance_cancelled(state: &AppState, instance: &AgentInstance, reason: &str) {
546    state.event_bus.publish(EngineEvent::new(
547        "agent_team.instance.cancelled",
548        json!({
549            "sessionID": instance.session_id,
550            "messageID": Value::Null,
551            "runID": instance.run_id,
552            "missionID": instance.mission_id,
553            "instanceID": instance.instance_id,
554            "parentInstanceID": instance.parent_instance_id,
555            "role": instance.role,
556            "status": instance.status,
557            "reason": reason,
558            "workspaceRoot": instance_workspace_root(instance),
559            "workspaceRepoRoot": instance_workspace_repo_root(instance),
560            "managedWorktree": instance_managed_worktree(instance),
561            "timestampMs": crate::now_ms(),
562        }),
563    ));
564}
565
566pub fn emit_instance_completed(state: &AppState, instance: &AgentInstance) {
567    state.event_bus.publish(EngineEvent::new(
568        "agent_team.instance.completed",
569        json!({
570            "sessionID": instance.session_id,
571            "messageID": Value::Null,
572            "runID": instance.run_id,
573            "missionID": instance.mission_id,
574            "instanceID": instance.instance_id,
575            "parentInstanceID": instance.parent_instance_id,
576            "role": instance.role,
577            "status": instance.status,
578            "workspaceRoot": instance_workspace_root(instance),
579            "workspaceRepoRoot": instance_workspace_repo_root(instance),
580            "managedWorktree": instance_managed_worktree(instance),
581            "timestampMs": crate::now_ms(),
582        }),
583    ));
584}
585
586pub fn emit_instance_failed(state: &AppState, instance: &AgentInstance) {
587    state.event_bus.publish(EngineEvent::new(
588        "agent_team.instance.failed",
589        json!({
590            "sessionID": instance.session_id,
591            "messageID": Value::Null,
592            "runID": instance.run_id,
593            "missionID": instance.mission_id,
594            "instanceID": instance.instance_id,
595            "parentInstanceID": instance.parent_instance_id,
596            "role": instance.role,
597            "status": instance.status,
598            "workspaceRoot": instance_workspace_root(instance),
599            "workspaceRepoRoot": instance_workspace_repo_root(instance),
600            "managedWorktree": instance_managed_worktree(instance),
601            "timestampMs": crate::now_ms(),
602        }),
603    ));
604}
605
606pub fn emit_mission_budget_exhausted(
607    state: &AppState,
608    mission_id: &str,
609    exhausted_by: &str,
610    tokens_used: u64,
611    steps_used: u64,
612    tool_calls_used: u64,
613    cost_used_usd: f64,
614) {
615    state.event_bus.publish(EngineEvent::new(
616        "agent_team.mission.budget.exhausted",
617        json!({
618            "sessionID": Value::Null,
619            "messageID": Value::Null,
620            "runID": Value::Null,
621            "missionID": mission_id,
622            "instanceID": Value::Null,
623            "exhaustedBy": exhausted_by,
624            "tokensUsed": tokens_used,
625            "stepsUsed": steps_used,
626            "toolCallsUsed": tool_calls_used,
627            "costUsedUsd": cost_used_usd,
628            "timestampMs": crate::now_ms(),
629        }),
630    ));
631}