ai_agent/utils/forked_agent.rs
1// Source: ~/claudecode/openclaudecode/src/utils/forkedAgent.ts
2//! Helper for running forked agent query loops with usage tracking.
3//!
4//! This utility ensures forked agents:
5//! 1. Share identical cache-critical params with the parent to guarantee prompt cache hits
6//! 2. Track full usage metrics across the entire query loop
7//! 3. Log metrics via the tengu_fork_agent_query event when complete
8//! 4. Isolate mutable state to prevent interference with the main agent loop
9
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12
13use crate::tool::{DenialTrackingState, QueryChainTracking, ToolUseContext, ToolUseContextOptions};
14use crate::types::message::Message;
15use crate::utils::abort_controller::{AbortController, create_child_abort_controller};
16use crate::utils::file_state_cache::{FileStateCache, clone_file_state_cache};
17use crate::utils::messages::Usage;
18use crate::utils::uuid::create_agent_id;
19
20// ---------------------------------------------------------------------------
21// CacheSafeParams
22// ---------------------------------------------------------------------------
23
/// Parameters that must be identical between the fork and parent API requests
/// to share the parent's prompt cache. The Anthropic API cache key is composed of:
/// system prompt, tools, model, messages (prefix), and thinking config.
///
/// `CacheSafeParams` carries the first four (tools and model travel inside
/// `tool_use_context`). Thinking config is derived from the
/// inherited `tool_use_context.options.thinking_config` — but can be inadvertently
/// changed if the fork sets `max_output_tokens`, which clamps `budget_tokens` in
/// claude.ts (but only for older models that do not use adaptive thinking).
/// See the `max_output_tokens` doc on `ForkedAgentConfig`.
///
/// Cloning copies both context maps and the message list; `tool_use_context`
/// is shared through its `Arc`.
#[derive(Clone)]
pub struct CacheSafeParams {
    /// System prompt - must match parent for cache hits
    pub system_prompt: String,
    /// User context - prepended to messages, affects cache
    pub user_context: HashMap<String, String>,
    /// System context - appended to system prompt, affects cache
    pub system_context: HashMap<String, String>,
    /// Tool use context containing tools, model, and other options
    pub tool_use_context: Arc<ToolUseContext>,
    /// Parent context messages for prompt cache sharing
    pub fork_context_messages: Vec<Message>,
}
46
// Slot written by handle_stop_hooks after each turn so post-turn forks
// (prompt_suggestion, post_turn_summary, /btw) can share the main loop's
// prompt cache without each caller threading params through.
// Holds at most one snapshot: `None` until the first save. It is written through
// `save_cache_safe_params` and read (as a clone) through `get_last_cache_safe_params`.
static LAST_CACHE_SAFE_PARAMS: std::sync::Mutex<Option<CacheSafeParams>> =
    std::sync::Mutex::new(None);
52
53/// Save cache-safe params for later retrieval by post-turn forks.
54pub fn save_cache_safe_params(params: Option<CacheSafeParams>) {
55 let mut guard = LAST_CACHE_SAFE_PARAMS.lock().unwrap();
56 *guard = params;
57}
58
59/// Get the last saved cache-safe params.
60pub fn get_last_cache_safe_params() -> Option<CacheSafeParams> {
61 LAST_CACHE_SAFE_PARAMS.lock().unwrap().clone()
62}
63
64// ---------------------------------------------------------------------------
65// ForkedAgentConfig / ForkedAgentResult
66// ---------------------------------------------------------------------------
67
/// Identifier recording where a query originated, used for tracking/analytics.
///
/// A thin newtype around `String`; the wrapped value is public.
#[derive(Clone, Debug)]
pub struct QuerySource(pub String);
71
/// CanUseTool function type - determines whether a tool may be executed.
///
/// Arguments, in order: the tool definition (JSON), the tool input (JSON), the
/// shared tool-use context, an assistant message (presumably the one that issued
/// the tool call — confirm with callers), the query source string, and an
/// "is explicit" flag (semantics not established in this file).
///
/// Returns a boxed, `Send` future resolving to a [`PermissionDecision`] or an
/// error string. The callback itself must be `Send + Sync` so it can be shared
/// via `Arc` (see `ForkedAgentConfig::can_use_tool`).
pub type CanUseToolFn = dyn Fn(
        &serde_json::Value, // tool definition
        &serde_json::Value, // input
        Arc<ToolUseContext>,
        Arc<crate::types::message::AssistantMessage>,
        &str, // query source
        bool, // is explicit
    ) -> std::pin::Pin<
        Box<dyn std::future::Future<Output = Result<PermissionDecision, String>> + Send>,
    > + Send
    + Sync;
84
/// Outcome returned by a `can_use_tool` permission check.
#[derive(Clone, Debug)]
pub enum PermissionDecision {
    /// The tool call may proceed.
    Allow,
    /// The tool call is refused, optionally with a human-readable reason.
    Deny { reason: Option<String> },
    /// The decision must be deferred to the user. `expires_at` is an optional
    /// expiry value (presumably a timestamp — units are not established here).
    Ask { expires_at: Option<u64> },
}
92
93/// Options for creating a subagent context.
94///
95/// By default, all mutable state is isolated to prevent interference with the parent.
96/// Use these options to:
97/// - Override specific fields (e.g., custom options, agent_id, messages)
98/// - Explicitly opt-in to sharing specific callbacks (for interactive subagents)
99#[derive(Clone)]
100pub struct SubagentContextOverrides {
101 /// Override the options object (e.g., custom tools, model)
102 pub options: Option<ToolUseContextOptions>,
103 /// Override the agent_id (for subagents with their own ID)
104 pub agent_id: Option<String>,
105 /// Override the agent_type (for subagents with a specific type)
106 pub agent_type: Option<String>,
107 /// Override the messages array
108 pub messages: Option<Vec<Message>>,
109 /// Override the read_file_state (e.g., fresh cache instead of clone)
110 pub read_file_state: Option<Arc<FileStateCache>>,
111 /// Override the abort_controller
112 pub abort_controller: Option<Arc<AbortController>>,
113 /// Override the get_app_state function
114 pub get_app_state: Option<Arc<dyn Fn() -> Box<dyn std::any::Any> + Send + Sync>>,
115
116 /// Explicit opt-in to share parent's set_app_state callback.
117 /// Use for interactive subagents that need to update shared state.
118 /// @default false (isolated no-op)
119 pub share_set_app_state: bool,
120 /// Explicit opt-in to share parent's set_response_length callback.
121 /// Use for subagents that contribute to parent's response metrics.
122 /// @default false (isolated no-op)
123 pub share_set_response_length: bool,
124 /// Explicit opt-in to share parent's abort_controller.
125 /// Use for interactive subagents that should abort with parent.
126 /// Note: Only applies if abort_controller override is not provided.
127 /// @default false (new controller linked to parent)
128 pub share_abort_controller: bool,
129 /// Critical system reminder to re-inject at every user turn
130 pub critical_system_reminder_experimental: Option<String>,
131 /// When true, can_use_tool must always be called even when hooks auto-approve.
132 /// Used by speculation for overlay file path rewriting.
133 pub require_can_use_tool: Option<bool>,
134 /// Override content_replacement_state — used by resumeAgentBackground to thread
135 /// state reconstructed from the resumed sidechain so the same results
136 /// are re-replaced (prompt cache stability).
137 pub content_replacement_state: Option<Arc<dyn std::any::Any + Send + Sync>>,
138}
139
140impl Default for SubagentContextOverrides {
141 fn default() -> Self {
142 Self {
143 options: None,
144 agent_id: None,
145 agent_type: None,
146 messages: None,
147 read_file_state: None,
148 abort_controller: None,
149 get_app_state: None,
150 share_set_app_state: false,
151 share_set_response_length: false,
152 share_abort_controller: false,
153 critical_system_reminder_experimental: None,
154 require_can_use_tool: None,
155 content_replacement_state: None,
156 }
157 }
158}
159
/// Configuration for a forked agent query.
///
/// NOTE(review): `run_forked_agent` in this file does not yet read `can_use_tool`
/// or `on_message` (they fall under the `..` of its destructuring), and its query
/// loop is still a placeholder.
pub struct ForkedAgentConfig {
    /// Messages to start the forked query loop with
    pub prompt_messages: Vec<Message>,
    /// Cache-safe parameters that must match the parent query
    pub cache_safe_params: CacheSafeParams,
    /// Permission check function for the forked agent
    pub can_use_tool: Arc<CanUseToolFn>,
    /// Source identifier for tracking
    pub query_source: QuerySource,
    /// Label for analytics (e.g., 'session_memory', 'supervisor')
    pub fork_label: String,
    /// Optional overrides for the subagent context
    pub overrides: Option<SubagentContextOverrides>,
    /// Optional cap on output tokens. CAUTION: setting this changes both max_tokens
    /// AND budget_tokens (via clamping in claude.ts). If the fork uses cache_safe_params
    /// to share the parent's prompt cache, a different budget_tokens will invalidate
    /// the cache — thinking config is part of the cache key. Only set this when cache
    /// sharing is not a goal (e.g., compact summaries).
    pub max_output_tokens: Option<u64>,
    /// Optional cap on number of turns (API round-trips)
    pub max_turns: Option<u32>,
    /// Optional callback invoked for each message as it arrives (for streaming UI)
    pub on_message: Option<Arc<dyn Fn(Message) + Send + Sync>>,
    /// Skip sidechain transcript recording (e.g., for ephemeral work like speculation).
    /// When set, no agent ID is generated for the fork.
    pub skip_transcript: bool,
    /// Skip writing new prompt cache entries on the last message. For
    /// fire-and-forget forks where no future request will read from this prefix.
    pub skip_cache_write: bool,
}
190
/// Result from a forked agent query.
///
/// NOTE(review): until the query loop is wired in, `run_forked_agent` returns an
/// empty message list and `Usage::default()` here.
pub struct ForkedAgentResult {
    /// All messages yielded during the query loop
    pub messages: Vec<Message>,
    /// Accumulated usage across all API calls in the loop
    pub total_usage: Usage,
}
198
199// ---------------------------------------------------------------------------
200// Helper: create_cache_safe_params
201// ---------------------------------------------------------------------------
202
203/// Creates `CacheSafeParams` from a parent `ToolUseContext`.
204/// Use this helper when forking from a post-sampling hook context.
205///
206/// To override specific fields (e.g., tool_use_context with cloned file state),
207/// clone the result and override the field.
208pub fn create_cache_safe_params(
209 system_prompt: String,
210 user_context: HashMap<String, String>,
211 system_context: HashMap<String, String>,
212 tool_use_context: Arc<ToolUseContext>,
213 fork_context_messages: Vec<Message>,
214) -> CacheSafeParams {
215 CacheSafeParams {
216 system_prompt,
217 user_context,
218 system_context,
219 tool_use_context,
220 fork_context_messages,
221 }
222}
223
224// ---------------------------------------------------------------------------
225// Helper: create_get_app_state_with_allowed_tools
226// ---------------------------------------------------------------------------
227
/// Builds a `get_app_state` accessor meant to grant `allowed_tools` to forked
/// skill/command execution.
///
/// With no allowed tools the base accessor is handed back as-is (same `Arc`).
/// Otherwise a new accessor is returned that currently just forwards to the base:
/// the app state is opaque (`Box<dyn Any>`) here, so the tools cannot yet be
/// added to the permission context's always-allow rules.
pub fn create_get_app_state_with_allowed_tools(
    base_get_app_state: Arc<dyn Fn() -> Box<dyn std::any::Any> + Send + Sync>,
    allowed_tools: Vec<String>,
) -> Arc<dyn Fn() -> Box<dyn std::any::Any> + Send + Sync> {
    let grants_tools = !allowed_tools.is_empty();
    if grants_tools {
        // Placeholder wrapper: a full implementation would add `allowed_tools` to
        // tool_permission_context.always_allow_rules.command before returning.
        Arc::new(move || base_get_app_state())
    } else {
        base_get_app_state
    }
}
245
246// ---------------------------------------------------------------------------
247// PreparedForkedContext
248// ---------------------------------------------------------------------------
249
/// Result from preparing a forked command context.
pub struct PreparedForkedContext {
    /// Skill content with args replaced.
    /// NOTE(review): `prepare_forked_command_context` currently stores the raw
    /// `args` here; no `$ARGUMENTS` substitution happens yet.
    pub skill_content: String,
    /// Modified get_app_state with allowed tools.
    /// NOTE(review): currently built on a placeholder accessor that returns `()`.
    pub modified_get_app_state: Arc<dyn Fn() -> Box<dyn std::any::Any> + Send + Sync>,
    /// The agent definition (JSON) chosen for the fork: the command's requested
    /// agent type, else `general-purpose`, else the first active agent.
    pub base_agent: serde_json::Value,
    /// Initial prompt messages (a single user message carrying the skill content).
    pub prompt_messages: Vec<Message>,
}
261
262/// Prepares the context for executing a forked command/skill.
263/// This handles the common setup that both SkillTool and slash commands need.
264#[allow(dead_code)]
265pub async fn prepare_forked_command_context(
266 command: serde_json::Value, // PromptCommand as JSON
267 args: &str,
268 context: &ToolUseContext,
269) -> Result<PreparedForkedContext, String> {
270 // Get skill content with $ARGUMENTS replaced
271 // In a full implementation, this would call command.get_prompt_for_command(args, context)
272 let skill_content_for_msg = args.to_string();
273 let skill_content_for_result = skill_content_for_msg.clone();
274
275 // Parse and prepare allowed tools
276 let allowed_tools: Vec<String> = command
277 .get("allowed_tools")
278 .and_then(|v| v.as_array())
279 .map(|arr| {
280 arr.iter()
281 .filter_map(|v| v.as_str().map(String::from))
282 .collect()
283 })
284 .unwrap_or_default();
285
286 // Create modified context with allowed tools
287 // We can't capture `context` into a 'static closure, so we just use a no-op wrapper
288 let modified_get_app_state: Arc<dyn Fn() -> Box<dyn std::any::Any> + Send + Sync> =
289 Arc::new(|| Box::new(()) as Box<dyn std::any::Any>);
290 let _ = create_get_app_state_with_allowed_tools; // unused in this simplified version
291
292 // Use command.agent if specified, otherwise 'general-purpose'
293 let agent_type_name = command
294 .get("agent")
295 .and_then(|v| v.as_str())
296 .unwrap_or("general-purpose");
297
298 let agents = context.options.agent_definitions.active_agents.clone();
299 let base_agent = agents
300 .iter()
301 .find(|a: &&serde_json::Value| {
302 a.get("agent_type")
303 .and_then(|v| v.as_str())
304 .map(|s| s == agent_type_name)
305 .unwrap_or(false)
306 })
307 .or_else(|| {
308 agents.iter().find(|a: &&serde_json::Value| {
309 a.get("agent_type")
310 .and_then(|v| v.as_str())
311 .map(|s| s == "general-purpose")
312 .unwrap_or(false)
313 })
314 })
315 .or_else(|| agents.first())
316 .cloned();
317
318 let base_agent =
319 base_agent.ok_or_else(|| "No agent available for forked execution".to_string())?;
320
321 // Prepare prompt messages
322 let prompt_messages = vec![Message::User(crate::types::message::UserMessage {
323 base: crate::types::message::MessageBase {
324 uuid: Some(uuid::Uuid::new_v4().to_string()),
325 parent_uuid: None,
326 timestamp: Some(chrono::Utc::now().to_rfc3339()),
327 created_at: None,
328 is_meta: None,
329 is_virtual: None,
330 is_compact_summary: None,
331 tool_use_result: None,
332 origin: None,
333 extra: HashMap::new(),
334 },
335 message_type: "user".to_string(),
336 message: crate::types::message::UserMessageContent {
337 content: crate::types::message::UserContent::Text(skill_content_for_msg),
338 extra: HashMap::new(),
339 },
340 })];
341
342 Ok(PreparedForkedContext {
343 skill_content: skill_content_for_result,
344 modified_get_app_state,
345 base_agent,
346 prompt_messages,
347 })
348}
349
350// ---------------------------------------------------------------------------
351// Helper: extract_result_text
352// ---------------------------------------------------------------------------
353
354/// Extracts result text from agent messages.
355#[allow(dead_code)]
356pub fn extract_result_text(agent_messages: &[Message], default_text: &str) -> String {
357 // Find the last assistant message and extract text from its content.
358 let last_assistant = agent_messages
359 .iter()
360 .rev()
361 .find(|m| matches!(m, Message::Assistant(_)));
362 match last_assistant {
363 Some(msg) => {
364 if let Ok(json) = serde_json::to_value(msg) {
365 let content = json
366 .get("message")
367 .and_then(|m| m.get("content"))
368 .and_then(|c| c.as_array());
369 if let Some(arr) = content {
370 let text = extract_text_content_json(arr, "\n");
371 if text.is_empty() {
372 return default_text.to_string();
373 }
374 return text;
375 }
376 }
377 default_text.to_string()
378 }
379 None => default_text.to_string(),
380 }
381}
382
383/// Extract text content from a message's content array.
384fn extract_text_content_json(content: &[serde_json::Value], separator: &str) -> String {
385 let texts: Vec<String> = content
386 .iter()
387 .filter(|block| block.get("type").and_then(|t| t.as_str()) == Some("text"))
388 .filter_map(|block| block.get("text").and_then(|t| t.as_str()))
389 .map(|t| t.to_string())
390 .collect();
391 texts.join(separator)
392}
393
394// ---------------------------------------------------------------------------
395// create_subagent_context
396// ---------------------------------------------------------------------------
397
398/// Creates an isolated `ToolUseContext` for subagents.
399///
400/// By default, ALL mutable state is isolated to prevent interference:
401/// - read_file_state: cloned from parent
402/// - abort_controller: new controller linked to parent (parent abort propagates)
403/// - get_app_state: wrapped to set should_avoid_permission_prompts
404/// - All mutation callbacks (set_app_state, etc.): no-op
405/// - Fresh collections: nested_memory_attachment_triggers, tool_decisions
406///
407/// Callers can:
408/// - Override specific fields via the overrides parameter
409/// - Explicitly opt-in to sharing specific callbacks (share_set_app_state, etc.)
410pub fn create_subagent_context(
411 parent_context: &ToolUseContext,
412 overrides: Option<&SubagentContextOverrides>,
413) -> ToolUseContext {
414 let overrides = overrides.cloned().unwrap_or_default();
415
416 // Determine abort_controller: explicit override > share parent's > new child linked to parent
417 // Since ToolUseContext stores abort_signal as Option<()>, we create a new AbortController
418 // linked to a default parent for the subagent context.
419 let child_controller = create_child_abort_controller(&AbortController::default(), None);
420 let _abort_controller = child_controller;
421
422 // Determine get_app_state - wrap to set should_avoid_permission_prompts unless sharing
423 // (if sharing abort_controller, it's an interactive agent that CAN show UI)
424 // Since get_app_state is a Box<dyn Fn...> and can't be cloned, we wrap it in Arc.
425 // We need to move the closure out of parent_context, which requires 'static.
426 // Since ToolUseContext.get_app_state is a Box<dyn Fn() -> Box<dyn Any> + Send + Sync>,
427 // we can't clone it. We use a no-op wrapper for now.
428 let get_app_state: Box<dyn Fn() -> Box<dyn std::any::Any> + Send + Sync> =
429 if let Some(fn_arc) = overrides.get_app_state {
430 Box::new(move || fn_arc())
431 } else {
432 // No-op wrapper - in a full impl, ToolUseContext would use Arc for this field
433 Box::new(|| Box::new(()) as Box<dyn std::any::Any>)
434 };
435
436 // Clone file state cache: cloned from parent (or from override)
437 let read_file_state = if let Some(override_cache) = &overrides.read_file_state {
438 Some(Arc::new(clone_file_state_cache(override_cache))
439 as Arc<dyn std::any::Any + Send + Sync>)
440 } else {
441 parent_context.read_file_state.clone()
442 };
443
444 // Content replacement state: override > clone of parent > None
445 // Clone by default (not fresh): cache-sharing forks process parent
446 // messages containing parent tool_use_ids. A fresh state would see
447 // them as unseen and make divergent replacement decisions → wire
448 // prefix differs → cache miss. A clone makes identical decisions → cache hit.
449 // For non-forking subagents the parent UUIDs never match — clone is harmless.
450 let content_replacement_state = overrides
451 .content_replacement_state
452 .clone()
453 .or_else(|| parent_context.content_replacement_state.clone());
454
455 // Denial tracking: isolated for non-sharing, shared for sharing
456 let local_denial_tracking = if overrides.share_set_app_state {
457 parent_context.local_denial_tracking.clone()
458 } else {
459 Some(Arc::new(std::sync::Mutex::new(
460 DenialTrackingState::default(),
461 )))
462 };
463
464 ToolUseContext {
465 // Mutable state - cloned by default to maintain isolation
466 read_file_state,
467 nested_memory_attachment_triggers: Some(Arc::new(std::sync::Mutex::new(HashSet::new()))),
468 loaded_nested_memory_paths: Some(Arc::new(std::sync::Mutex::new(HashSet::new()))),
469 dynamic_skill_dir_triggers: Some(Arc::new(std::sync::Mutex::new(HashSet::new()))),
470 // Per-subagent: tracks skills surfaced by discovery for was_discovered telemetry
471 discovered_skill_names: Some(Arc::new(std::sync::Mutex::new(HashSet::new()))),
472 tool_decisions: None,
473 // Content replacement state
474 content_replacement_state,
475 // Abort signal
476 abort_signal: None,
477 // AppState access
478 get_app_state,
479 set_app_state: if overrides.share_set_app_state {
480 // Can't clone Box<dyn Fn>, so we use a no-op wrapper that calls parent
481 // Since we can't move parent_context.set_app_state, we use a no-op here.
482 // In a full implementation, ToolUseContext would use Arc for these callbacks.
483 Box::new(|_: Box<dyn Fn(Box<dyn std::any::Any>) -> Box<dyn std::any::Any>>| {})
484 } else {
485 // No-op
486 Box::new(|_: Box<dyn Fn(Box<dyn std::any::Any>) -> Box<dyn std::any::Any>>| {})
487 },
488 // Task registration/kill must always reach the root store
489 // Can't clone Box<dyn Fn>, use no-op
490 set_app_state_for_tasks: Some(Box::new(
491 |_: Box<dyn Fn(Box<dyn std::any::Any>) -> Box<dyn std::any::Any>>| {},
492 )),
493 local_denial_tracking,
494 // Mutation callbacks - no-op by default (Box<dyn Fn> can't be cloned)
495 set_in_progress_tool_use_ids: {
496 type SetIdsFn = dyn Fn(&HashSet<String>) -> HashSet<String>;
497 Box::new(|_: Box<SetIdsFn>| {})
498 },
499 set_response_length: if overrides.share_set_response_length {
500 // Can't clone, use no-op
501 Box::new(|_: Box<dyn Fn(usize) -> usize>| {})
502 } else {
503 Box::new(|_: Box<dyn Fn(usize) -> usize>| {})
504 },
505 push_api_metrics_entry: None, // Can't clone Box<dyn Fn>
506 update_file_history_state: Box::new(
507 |_: Box<dyn Fn(Box<dyn std::any::Any>) -> Box<dyn std::any::Any>>| {},
508 ),
509 // Attribution is scoped and functional (prev => next) — use no-op since we can't clone
510 update_attribution_state: Box::new(
511 |_: Box<dyn Fn(Box<dyn std::any::Any>) -> Box<dyn std::any::Any>>| {},
512 ),
513 // UI callbacks - None for subagents (can't control parent UI)
514 add_notification: None,
515 set_tool_jsx: None,
516 set_stream_mode: None,
517 set_sdk_status: None,
518 open_message_selector: None,
519 // Fields that can be overridden or copied from parent
520 options: overrides
521 .options
522 .clone()
523 .unwrap_or_else(|| parent_context.options.clone()),
524 messages: overrides
525 .messages
526 .clone()
527 .unwrap_or_else(|| parent_context.messages.clone()),
528 // Generate new agent_id for subagents (each subagent should have its own ID)
529 agent_id: overrides
530 .agent_id
531 .clone()
532 .or_else(|| Some(create_agent_id(None))),
533 agent_type: overrides
534 .agent_type
535 .clone()
536 .or_else(|| parent_context.agent_type.clone()),
537 // Create new query tracking chain for subagent with incremented depth
538 query_tracking: Some(QueryChainTracking {
539 chain_id: uuid::Uuid::new_v4().to_string(),
540 depth: parent_context
541 .query_tracking
542 .as_ref()
543 .map(|t| t.depth + 1)
544 .unwrap_or(0),
545 }),
546 file_reading_limits: parent_context.file_reading_limits.clone(),
547 glob_limits: parent_context.glob_limits.clone(),
548 user_modified: parent_context.user_modified,
549 critical_system_reminder_experimental: overrides
550 .critical_system_reminder_experimental
551 .clone()
552 .or_else(|| parent_context.critical_system_reminder_experimental.clone()),
553 require_can_use_tool: overrides
554 .require_can_use_tool
555 .unwrap_or(parent_context.require_can_use_tool),
556 preserve_tool_use_results: parent_context.preserve_tool_use_results,
557 rendered_system_prompt: parent_context.rendered_system_prompt.clone(),
558 request_prompt: None, // Can't clone Arc<dyn Fn...>
559 tool_use_id: parent_context.tool_use_id.clone(),
560 handle_elicitation: None, // Can't clone Arc<dyn Fn...>
561 append_system_message: None, // Can't clone Box<dyn Fn>
562 send_os_notification: None, // Can't clone Box<dyn Fn>
563 set_has_interruptible_tool_in_progress: None, // Can't clone Box<dyn Fn>
564 set_conversation_id: None, // Can't clone Box<dyn Fn>
565 on_compact_progress: None, // Can't clone Box<dyn Fn>
566 }
567}
568
569// ---------------------------------------------------------------------------
570// run_forked_agent
571// ---------------------------------------------------------------------------
572
573/// Runs a forked agent query loop and tracks cache hit metrics.
574///
575/// This function:
576/// 1. Uses identical cache-safe params from parent to enable prompt caching
577/// 2. Accumulates usage across all query iterations
578/// 3. Logs tengu_fork_agent_query with full usage when complete
579///
580/// NOTE: The actual query loop integration depends on the `query` module which
581/// is still being translated. This implementation provides the full structure
582/// and will wire up to the query loop once it's complete.
583pub async fn run_forked_agent(config: ForkedAgentConfig) -> Result<ForkedAgentResult, String> {
584 let start_time = std::time::Instant::now();
585 let fork_label = config.fork_label.clone();
586 let query_source_str = config.query_source.0.clone();
587
588 let ForkedAgentConfig {
589 prompt_messages,
590 cache_safe_params,
591 query_source,
592 overrides,
593 max_output_tokens,
594 max_turns,
595 skip_cache_write,
596 ..
597 } = config;
598
599 let CacheSafeParams {
600 system_prompt,
601 user_context,
602 system_context,
603 tool_use_context,
604 fork_context_messages,
605 } = cache_safe_params;
606
607 // Create isolated context to prevent mutation of parent state
608 let overrides_ref = overrides.as_ref();
609 let _isolated_tool_use_context = create_subagent_context(&tool_use_context, overrides_ref);
610
611 // Do NOT filter_incomplete_tool_calls here — it drops the whole assistant on
612 // partial tool batches, orphaning the paired results (API 400). Dangling
613 // tool_uses are repaired downstream by ensure_tool_result_pairing in claude.ts,
614 // same as the main thread — identical post-repair prefix keeps the cache hit.
615 let mut initial_messages: Vec<Message> =
616 Vec::with_capacity(fork_context_messages.len() + prompt_messages.len());
617 initial_messages.extend_from_slice(&fork_context_messages);
618 initial_messages.extend_from_slice(&prompt_messages);
619
620 // Generate agent ID and record initial messages for transcript
621 // When skip_transcript is set, skip agent ID creation and all transcript I/O
622 let agent_id = if config.skip_transcript {
623 None
624 } else {
625 Some(create_agent_id(Some(&fork_label)))
626 };
627 let _ = agent_id; // reserved for transcript recording
628
629 // In a full implementation, this would call the query engine:
630 // let result = query_engine.submit_message(&prompt).await;
631 // let (output_messages, total_usage) = collect_query_results(result);
632
633 let _ = (
634 system_prompt,
635 user_context,
636 system_context,
637 query_source,
638 max_output_tokens,
639 max_turns,
640 skip_cache_write,
641 initial_messages,
642 );
643
644 // Placeholder result until query loop integration is complete
645 let output_messages: Vec<Message> = Vec::new();
646 let total_usage = Usage::default();
647
648 log::debug!(
649 "Forked agent [{}] finished: {} messages, total_usage: input={} output={} cache_read={} cache_create={}",
650 fork_label,
651 output_messages.len(),
652 total_usage.input_tokens,
653 total_usage.output_tokens,
654 total_usage.cache_read_input_tokens,
655 total_usage.cache_creation_input_tokens,
656 );
657
658 let duration_ms = start_time.elapsed().as_millis() as u64;
659
660 // Log the fork query metrics with full Usage
661 log_fork_agent_query_event(
662 &fork_label,
663 &query_source_str,
664 duration_ms,
665 output_messages.len(),
666 &total_usage,
667 tool_use_context.query_tracking.as_ref(),
668 );
669
670 Ok(ForkedAgentResult {
671 messages: output_messages,
672 total_usage,
673 })
674}
675
676/// Accumulate usage from a new usage entry.
677fn accumulate_usage(acc: &mut Usage, delta: &Usage) {
678 acc.input_tokens += delta.input_tokens;
679 acc.output_tokens += delta.output_tokens;
680 acc.cache_creation_input_tokens += delta.cache_creation_input_tokens;
681 acc.cache_read_input_tokens += delta.cache_read_input_tokens;
682 acc.server_tool_use.web_search_requests += delta.server_tool_use.web_search_requests;
683 acc.server_tool_use.web_fetch_requests += delta.server_tool_use.web_fetch_requests;
684 if let (Some(acc_cache), Some(delta_cache)) = (&mut acc.cache_creation, &delta.cache_creation) {
685 acc_cache.ephemeral_1h_input_tokens += delta_cache.ephemeral_1h_input_tokens;
686 acc_cache.ephemeral_5m_input_tokens += delta_cache.ephemeral_5m_input_tokens;
687 }
688 if acc.cache_creation.is_none() && delta.cache_creation.is_some() {
689 acc.cache_creation = delta.cache_creation.clone();
690 }
691 if delta.service_tier.is_some() {
692 acc.service_tier = delta.service_tier.clone();
693 }
694}
695
696// ---------------------------------------------------------------------------
697// log_fork_agent_query_event
698// ---------------------------------------------------------------------------
699
700/// Logs the tengu_fork_agent_query event with full Usage fields.
701fn log_fork_agent_query_event(
702 fork_label: &str,
703 query_source: &str,
704 duration_ms: u64,
705 message_count: usize,
706 total_usage: &Usage,
707 query_tracking: Option<&QueryChainTracking>,
708) {
709 // Calculate cache hit rate
710 let total_input_tokens = total_usage.input_tokens as u64
711 + total_usage.cache_creation_input_tokens as u64
712 + total_usage.cache_read_input_tokens as u64;
713 let cache_hit_rate = if total_input_tokens > 0 {
714 total_usage.cache_read_input_tokens as f64 / total_input_tokens as f64
715 } else {
716 0.0
717 };
718
719 log::debug!(
720 "tengu_fork_agent_query: fork_label={} query_source={} duration_ms={} message_count={} \
721 input_tokens={} output_tokens={} cache_read={} cache_create={} cache_hit_rate={:.4} \
722 chain_id={} depth={}",
723 fork_label,
724 query_source,
725 duration_ms,
726 message_count,
727 total_usage.input_tokens,
728 total_usage.output_tokens,
729 total_usage.cache_read_input_tokens,
730 total_usage.cache_creation_input_tokens,
731 cache_hit_rate,
732 query_tracking
733 .map(|t| t.chain_id.as_str())
734 .unwrap_or("none"),
735 query_tracking.map(|t| t.depth).unwrap_or(0),
736 );
737}
738
739// ---------------------------------------------------------------------------
740// is_in_fork_child (guard against recursive forking)
741// Source: ~/claudecode/openclaudecode/src/tools/AgentTool/forkSubagent.ts
742// ---------------------------------------------------------------------------
743
744use crate::constants::xml_tags::FORK_BOILERPLATE_TAG;
745
746/// Guard against recursive forking. Fork children keep the Agent tool in their
747/// tool pool for cache-identical tool definitions, so we reject fork attempts
748/// at call time by detecting the fork boilerplate tag in conversation history.
749pub fn is_in_fork_child(messages: &[Message]) -> bool {
750 messages.iter().any(|m| {
751 if let Message::User(user) = m {
752 match &user.message.content {
753 crate::types::message::UserContent::Blocks(content) => {
754 content.iter().any(|block| {
755 // UserContentBlock is a struct with block_type and text fields
756 let is_text = block.block_type == "text";
757 let has_tag = block
758 .text
759 .as_ref()
760 .map(|t| t.contains(FORK_BOILERPLATE_TAG))
761 .unwrap_or(false);
762 is_text && has_tag
763 })
764 }
765 crate::types::message::UserContent::Text(text) => {
766 text.contains(FORK_BOILERPLATE_TAG)
767 }
768 }
769 } else {
770 false
771 }
772 })
773}